使用 Timer 的 Channel 来配合的超时机制,怎么都有漏洞。
timer 的超时状态和 channel 的 sendTime 数据,也不做到原子状态。判断出来是超时了,既有可能 channel 中有数据,也有可能没有数据。完全是薛定谔的定时器。
就这种情况下,为什么不 native 提供一个 select 的超时方式呢?
各位大牛们是如何解决这个问题的啊?
容忍(理论上)一定概率的立即超时?
或者干脆不重用,每次循环重新创建一个新的 Timer ?
1
noisyes 2021-08-07 16:17:45 +08:00
ctx + select
|
2
lesismal 2021-08-07 17:01:44 +08:00 5
这根本就不算事:
1. 没人能做到时间的百分百精确 2. 即使是 syacall 的 select/poll/epoll 的 timeout 参数,也可能你本次 loop 刚超时的瞬间、fd 事件就来了,而且超时的瞬间,对于业务而言已经达到了那个可以按超时处理的条件,业务开始处理超时后、超时事件再出现丢弃即可 3. 超时后即使 channel 中又收到了数据而没被读取,也没问题,不是必须读出来才行。另外,chan 也不是必须 close 才会被回收的,所以不用纠结残留相关的问题 并发的边界问题,应该由业务层来保证,是保证指令范围的原子性还是保证过程范围的原子性,要区分清楚 |
3
lujjjh 2021-08-07 19:54:45 +08:00
如果没理解错你的意思,文档里就有正确的解法:先 Stop(),然后消费 channel (如果需要),最后 Reset:
https://pkg.go.dev/time#Timer.Reset |
5
xuantedev OP @lujjjh 感谢兄弟的细心阅读,并且一眼就看出了我的担心,握爪。是这样的,其实问题就消费 Channel 这里。
有一种极端的情况,就是我消费 Channel 的时候,runtime 的 deltimer()还没有来得及调用 sendTime 。但是当我的消费代码走完了以后,它调用了 sendTime,导致了 Channel 里面又有了内容,这样,我下次正儿八经的业务 select 的时候,立马就会返回,导致我错误的认为超时了。 这个问题的出在 Runtime 中 timer 的维护 routine 中调用定时器处理函数 runOneTimer()。该函数的处理分为两部分: * A) 从 timer 堆中处理定时器状态这部分 * B) 调用 f(arg, seq)来 sentTime 到 Channel 中 从 [代码]( https://github.com/golang/go/blob/891547e2d4bc2a23973e2c9f972ce69b2b48478e/src/runtime/time.go#L818)看,这两个部分操作没有放在一个锁中,因此不是原子操作。 ```go func runOneTimer(pp *p, t *timer, now int64) { ....... if raceenabled { // Temporarily use the current P's racectx for g0. gp := getg() if gp.racectx != 0 { throw("runOneTimer: unexpected racectx") } gp.racectx = gp.m.p.ptr().timerRaceCtx } unlock(&pp.timersLock) f(arg, seq) // 就是那个 sendTime 函数 lock(&pp.timersLock) ....... } ``` 设想在用户调用 Stop()的时候,虽然和 runOneTimer()的 A 部分产生了互斥,但是和 sentTime 并没有产生互斥。极端情况下,如果 runtime 的 routine 中把 A 部分执行完了,碰到 CPU 繁忙或者系统调度等极端场景,B 部分一直没有机会执行。那么 User 的 Stop()、消费 Channel (实际上没有数据)、然后 Reset(),到这里 B 部分都没有执行。 这样,Channel 中一直没有数据,但是当用户调用业务 Select()的时候,runtime routine 中的 B 部分执行了,sendTime 将数据发送到了 Channel 中。select()就因为上一次 Timer 的数据而触发了。 这也就是我在问题中说的,`(理论上)一定概率的立即超时`。 其实 Golang 的相关模块作者也注意到了该问题,修改的次数非常多,提供的注释也非常长。 其中作者提到,Stop()并不会等到 f (即 sentTime)执行完毕后才返回,也就是说 Stop()只反馈 Timer 当时的状态,不反馈 channel 的状态。 ``` // Stop prevents the Timer from firing. // It returns true if the call stops the timer, false if the timer has already // expired or been stopped. // Stop does not close the channel, to prevent a read from the channel succeeding // incorrectly. // // To ensure the channel is empty after a call to Stop, check the // return value and drain the channel. // For example, assuming the program has not received from t.C already: // // if !t.Stop() { // <-t.C // } // // This cannot be done concurrent to other receives from the Timer's // channel or other calls to the Timer's Stop method. // // For a timer created with AfterFunc(d, f), if t.Stop returns false, then the timer // has already expired and the function f has been started in its own goroutine; // Stop does not wait for f to complete before returning. // If the caller needs to know whether f is completed, it must coordinate // with f explicitly. ``` 而对于 Reset(),作者也是用了大量篇幅,指出 Reset 的返回值并不是准确的,因为竟态导致的问题。提供返回值只是为了兼容过去的接口。另外其实 time 模块的 Reset()调用的其实就是 runtime 的 resetTimer,而 resetTimer()其实就是内部调用 stopTimer()。 ``` // Reset changes the timer to expire after duration d. // It returns true if the timer had been active, false if the timer had // expired or been stopped. // // For a Timer created with NewTimer, Reset should be invoked only on // stopped or expired timers with drained channels. // // If a program has already received a value from t.C, the timer is known // to have expired and the channel drained, so t.Reset can be used directly. // If a program has not yet received a value from t.C, however, // the timer must be stopped and—if Stop reports that the timer expired // before being stopped—the channel explicitly drained: // // if !t.Stop() { // <-t.C // } // t.Reset(d) // // This should not be done concurrent to other receives from the Timer's // channel. // // Note that it is not possible to use Reset's return value correctly, as there // is a race condition between draining the channel and the new timer expiring. // Reset should always be invoked on stopped or expired channels, as described above. // The return value exists to preserve compatibility with existing programs. // // For a Timer created with AfterFunc(d, f), Reset either reschedules // when f will run, in which case Reset returns true, or schedules f // to run again, in which case it returns false. // When Reset returns false, Reset neither waits for the prior f to // complete before returning nor does it guarantee that the subsequent // goroutine running f does not run concurrently with the prior // one. If the caller needs to know whether the prior execution of // f is completed, it must coordinate with f explicitly. ``` 所以从这些场景看,很难精确的判断出来 Channel 是否有数据,而尝试去 Drain Channel 也只是大概率可以 drain out,极端情况下,drain empty channel 之后,还会立即有数据 send 进来。 |
6
lujjjh 2021-08-08 11:36:26 +08:00
常见的超时处理 pattern 下(操作 timer 和消费 t.C 的是同一个 goroutine )应该没有你说的这种 race condition 。
写了个例子: https://gist.github.com/lujjjh/8f9c4b257654465ed2585dcd4d193a29 可以论证的是,runOneTimer 执行 N 次,f 就会执行 N 次,从而 t.C 里会产生 N 个需要消费的东西。只要确保 runOneTimer 的执行次数(对 Timer 来说要么是 0,要么是 1 )跟 t.C 的消费次数一致即可。 正向看,只要 runOneTimer 被执行了,f 产生的 t.C 要么在 A 处被消费,如果进入了 B,t.Stop() 一定会返回 false,从而 t.C 在 C 处被消费。你说的 f 一直没有被调度的情况,消费 t.C 的 goroutine 也会被阻塞,要么阻塞在 A 处,要么阻塞在 C 处。 反向看,如果 runOneTimer 还没有被执行,就走到了 B,那么 Stop 会阻止 timer 进入 timerRunning 并且返回 true 。此时 runOneTimer 不会被执行,f 也就一定不会被执行。 |
7
xuantedev OP @lujjjh 感谢细心回复。
现在的问题是,在消费 channel 中的内容的时候,您是选择阻塞消耗,还是尝试消耗,这两者只能选择一个。 阻塞消耗,如果 f 没有调用,就会阻塞: ```go select { case <-timer.C: xxx } ``` 尝试性消耗,即使 f 没有调用,也不会阻塞: ```go select { case <-timer.C xxxx default: xxxx ``` 如果选择阻塞性消耗,就是您编写的代码情况,那么假定的前题条件是:Stop()返回 False 表明一定执行了 f 。但是在某些情况下,不能做这样的保证,可以参考这个 bug: ``` https://github.com/golang/go/issues/14038 ``` 如果 Stop()返回了 False,但是没有执行 f,那么上面的阻塞式读取就会出现问题。我们再看看什么场景下会出现 Stop()返回 false,但是 f 没有执行的情况。 只要前面超时过(第一次发送了 channel 数据),然后您的程序从正常逻辑(不是 drain channel 那段)中读取了数据,Channel 会为空。此时您去 Stop(),肯定返回 False,您再去阻塞读取 Channel,那么就会阻塞。 您可以拿代码运行一下看看:第一个匿名函数是生产者,第二个匿名函数是正常的业务逻辑(即消费者)。 正常情况下,读取数据的超时值是 2 秒。 第一次读取就超时,因为生产者 3 秒后才生产数据,超时后,BBB 处的代码会把 channel 读取干净,这也很正常,超时事件发送给我们,我们消耗它。 循环第二次执行,Stop()会返回 False (因为定时器已经 expired 了),但是不会调用 f 去 sendTime,这也就是我前面说过的超时和 f 的调用是两件独立的事情,Stop()的返回值不会反应是否发送 sendTime 。 此时,再 AAA 处的阻塞调用,就会阻塞。因为 Channel 里面的确没有数据。 ```go package main import ( "fmt" "time" ) func main() { c := make(chan bool) go func() { for i := 0; i < 10; i++ { time.Sleep(time.Second * 3) c <- false } }() go func() { t := time.NewTimer(time.Second * 2) for { if !t.Stop() { <-t.C // AAA: will blocked here } t.Reset(time.Second * 2) select { case b := <-c: if !b { fmt.Println(time.Now(), "work...") continue } case <-t.C: // BBB: normal receive from channel timeout event fmt.Println(time.Now(), "timeout") continue } } }() time.Sleep(time.Second * 10000000) } ``` 如果改成非阻塞性读取,那么又有我第一次说的那个多了 f 调用的问题。 |
8
lujjjh 2021-08-08 15:04:03 +08:00
@xuantedev 这段代码是有问题的,可以对比一下我上面提供的代码。并不是 f 没有调用,而是这段代码逻辑里,超时的时候消费了两次 t.C ( case <-t.C: 和下一次循环 if 分支里的 <-t.C ),修复方式也很简单,把 if !t.Stop() { <-t.C } 挪到第一个 case 里就行了。
|
9
xuantedev OP @lujjjh 太感谢了,这应该是正确的做法了。前面看了很多代码片段,都是把 Stop()放在 select 之外的,头脑禁锢了。感谢感谢~
|
10
lesismal 2021-08-08 17:10:15 +08:00
@lujjjh @xuantedev
go func() { for { func() { timer := time.NewTimer(time.Second * 2) defer timer.Stop() select { case b := <-c: if !b { fmt.Println(time.Now(), "work...") } case <-timer.C: // BBB: normal receive from channel timeout event fmt.Println(time.Now(), "timeout") } }() } }() 简洁点就这么写,每次一个新的局部变量 Timer 结构体没压力,非要复用那么写法的可读性太差了,对维护者不友好,而且习惯了不好的写法,哪天一不小心就写出问题了 |
11
lesismal 2021-08-08 17:11:21 +08:00
回帖里头贴代码的格式是真的伤
|
12
bthulu 2021-08-19 09:09:00 +08:00
@lesismal 回帖好像可以用 markdown 代码块的
``` func main() { fmt.Println("Hello, 世界") } ``` ` func main() { fmt.Println("Hello, 世界") } ` |
13
bthulu 2021-08-19 09:09:13 +08:00
`func main() {
fmt.Println("Hello, 世界") }` |
14
bthulu 2021-08-19 09:09:41 +08:00
不行, 用不了`func main() {fmt.Println("Hello, 世界")`
|
15
lesismal 2021-08-19 09:10:53 +08:00
|