V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
xuantedev
V2EX  ›  Go 编程语言

吐槽一下 golang 的 select 模型,居然不自带超时机制

  •  
  •   xuantedev · 2021-08-07 15:45:06 +08:00 · 2589 次点击
    这是一个创建于 1195 天前的主题,其中的信息可能已经有所发展或是发生改变。

    使用 Timer 的 Channel 来配合的超时机制,怎么都有漏洞。

    timer 的超时状态和 channel 的 sendTime 数据,也不做到原子状态。判断出来是超时了,既有可能 channel 中有数据,也有可能没有数据。完全是薛定谔的定时器。

    就这种情况下,为什么不 native 提供一个 select 的超时方式呢?

    各位大牛们是如何解决这个问题的啊?

    容忍(理论上)一定概率的立即超时?

    或者干脆不重用,每次循环重新创建一个新的 Timer ?

    16 条回复    2021-08-19 09:11:17 +08:00
    noisyes
        1
    noisyes  
       2021-08-07 16:17:45 +08:00
    ctx + select
    lesismal
        2
    lesismal  
       2021-08-07 17:01:44 +08:00   ❤️ 5
    这根本就不算事:

    1. 没人能做到时间的百分百精确
    2. 即使是 syacall 的 select/poll/epoll 的 timeout 参数,也可能你本次 loop 刚超时的瞬间、fd 事件就来了,而且超时的瞬间,对于业务而言已经达到了那个可以按超时处理的条件,业务开始处理超时后、超时事件再出现丢弃即可
    3. 超时后即使 channel 中又收到了数据而没被读取,也没问题,不是必须读出来才行。另外,chan 也不是必须 close 才会被回收的,所以不用纠结残留相关的问题

    并发的边界问题,应该由业务层来保证,是保证指令范围的原子性还是保证过程范围的原子性,要区分清楚
    lujjjh
        3
    lujjjh  
       2021-08-07 19:54:45 +08:00
    如果没理解错你的意思,文档里就有正确的解法:先 Stop(),然后消费 channel (如果需要),最后 Reset:
    https://pkg.go.dev/time#Timer.Reset
    xuantedev
        4
    xuantedev  
    OP
       2021-08-08 10:37:34 +08:00
    @lesismal 感谢大神赐教,受益匪浅。
    xuantedev
        5
    xuantedev  
    OP
       2021-08-08 10:37:42 +08:00
    @lujjjh 感谢兄弟的细心阅读,并且一眼就看出了我的担心,握爪。是这样的,其实问题就消费 Channel 这里。
    有一种极端的情况,就是我消费 Channel 的时候,runtime 的 deltimer()还没有来得及调用 sendTime 。但是当我的消费代码走完了以后,它调用了 sendTime,导致了 Channel 里面又有了内容,这样,我下次正儿八经的业务 select 的时候,立马就会返回,导致我错误的认为超时了。

    这个问题的出在 Runtime 中 timer 的维护 routine 中调用定时器处理函数 runOneTimer()。该函数的处理分为两部分:
    * A) 从 timer 堆中处理定时器状态这部分
    * B) 调用 f(arg, seq)来 sentTime 到 Channel 中
    从 [代码]( https://github.com/golang/go/blob/891547e2d4bc2a23973e2c9f972ce69b2b48478e/src/runtime/time.go#L818)看,这两个部分操作没有放在一个锁中,因此不是原子操作。
    ```go
    func runOneTimer(pp *p, t *timer, now int64) {
    .......
    if raceenabled {
    // Temporarily use the current P's racectx for g0.
    gp := getg()
    if gp.racectx != 0 {
    throw("runOneTimer: unexpected racectx")
    }
    gp.racectx = gp.m.p.ptr().timerRaceCtx
    }

    unlock(&pp.timersLock)

    f(arg, seq) // 就是那个 sendTime 函数

    lock(&pp.timersLock)

    .......
    }
    ```

    设想在用户调用 Stop()的时候,虽然和 runOneTimer()的 A 部分产生了互斥,但是和 sentTime 并没有产生互斥。极端情况下,如果 runtime 的 routine 中把 A 部分执行完了,碰到 CPU 繁忙或者系统调度等极端场景,B 部分一直没有机会执行。那么 User 的 Stop()、消费 Channel (实际上没有数据)、然后 Reset(),到这里 B 部分都没有执行。

    这样,Channel 中一直没有数据,但是当用户调用业务 Select()的时候,runtime routine 中的 B 部分执行了,sendTime 将数据发送到了 Channel 中。select()就因为上一次 Timer 的数据而触发了。

    这也就是我在问题中说的,`(理论上)一定概率的立即超时`。


    其实 Golang 的相关模块作者也注意到了该问题,修改的次数非常多,提供的注释也非常长。


    其中作者提到,Stop()并不会等到 f (即 sentTime)执行完毕后才返回,也就是说 Stop()只反馈 Timer 当时的状态,不反馈 channel 的状态。
    ```
    // Stop prevents the Timer from firing.
    // It returns true if the call stops the timer, false if the timer has already
    // expired or been stopped.
    // Stop does not close the channel, to prevent a read from the channel succeeding
    // incorrectly.
    //
    // To ensure the channel is empty after a call to Stop, check the
    // return value and drain the channel.
    // For example, assuming the program has not received from t.C already:
    //
    // if !t.Stop() {
    // <-t.C
    // }
    //
    // This cannot be done concurrent to other receives from the Timer's
    // channel or other calls to the Timer's Stop method.
    //
    // For a timer created with AfterFunc(d, f), if t.Stop returns false, then the timer
    // has already expired and the function f has been started in its own goroutine;
    // Stop does not wait for f to complete before returning.
    // If the caller needs to know whether f is completed, it must coordinate
    // with f explicitly.
    ```

    而对于 Reset(),作者也是用了大量篇幅,指出 Reset 的返回值并不是准确的,因为竟态导致的问题。提供返回值只是为了兼容过去的接口。另外其实 time 模块的 Reset()调用的其实就是 runtime 的 resetTimer,而 resetTimer()其实就是内部调用 stopTimer()。
    ```
    // Reset changes the timer to expire after duration d.
    // It returns true if the timer had been active, false if the timer had
    // expired or been stopped.
    //
    // For a Timer created with NewTimer, Reset should be invoked only on
    // stopped or expired timers with drained channels.
    //
    // If a program has already received a value from t.C, the timer is known
    // to have expired and the channel drained, so t.Reset can be used directly.
    // If a program has not yet received a value from t.C, however,
    // the timer must be stopped and—if Stop reports that the timer expired
    // before being stopped—the channel explicitly drained:
    //
    // if !t.Stop() {
    // <-t.C
    // }
    // t.Reset(d)
    //
    // This should not be done concurrent to other receives from the Timer's
    // channel.
    //
    // Note that it is not possible to use Reset's return value correctly, as there
    // is a race condition between draining the channel and the new timer expiring.
    // Reset should always be invoked on stopped or expired channels, as described above.
    // The return value exists to preserve compatibility with existing programs.
    //
    // For a Timer created with AfterFunc(d, f), Reset either reschedules
    // when f will run, in which case Reset returns true, or schedules f
    // to run again, in which case it returns false.
    // When Reset returns false, Reset neither waits for the prior f to
    // complete before returning nor does it guarantee that the subsequent
    // goroutine running f does not run concurrently with the prior
    // one. If the caller needs to know whether the prior execution of
    // f is completed, it must coordinate with f explicitly.
    ```

    所以从这些场景看,很难精确的判断出来 Channel 是否有数据,而尝试去 Drain Channel 也只是大概率可以 drain out,极端情况下,drain empty channel 之后,还会立即有数据 send 进来。
    lujjjh
        6
    lujjjh  
       2021-08-08 11:36:26 +08:00
    常见的超时处理 pattern 下(操作 timer 和消费 t.C 的是同一个 goroutine )应该没有你说的这种 race condition 。

    写了个例子:

    https://gist.github.com/lujjjh/8f9c4b257654465ed2585dcd4d193a29

    可以论证的是,runOneTimer 执行 N 次,f 就会执行 N 次,从而 t.C 里会产生 N 个需要消费的东西。只要确保 runOneTimer 的执行次数(对 Timer 来说要么是 0,要么是 1 )跟 t.C 的消费次数一致即可。

    正向看,只要 runOneTimer 被执行了,f 产生的 t.C 要么在 A 处被消费,如果进入了 B,t.Stop() 一定会返回 false,从而 t.C 在 C 处被消费。你说的 f 一直没有被调度的情况,消费 t.C 的 goroutine 也会被阻塞,要么阻塞在 A 处,要么阻塞在 C 处。

    反向看,如果 runOneTimer 还没有被执行,就走到了 B,那么 Stop 会阻止 timer 进入 timerRunning 并且返回 true 。此时 runOneTimer 不会被执行,f 也就一定不会被执行。
    xuantedev
        7
    xuantedev  
    OP
       2021-08-08 14:48:48 +08:00
    @lujjjh 感谢细心回复。

    现在的问题是,在消费 channel 中的内容的时候,您是选择阻塞消耗,还是尝试消耗,这两者只能选择一个。

    阻塞消耗,如果 f 没有调用,就会阻塞:
    ```go
    select {
    case <-timer.C:
    xxx
    }
    ```
    尝试性消耗,即使 f 没有调用,也不会阻塞:
    ```go
    select {
    case <-timer.C
    xxxx
    default:
    xxxx
    ```

    如果选择阻塞性消耗,就是您编写的代码情况,那么假定的前题条件是:Stop()返回 False 表明一定执行了 f 。但是在某些情况下,不能做这样的保证,可以参考这个 bug:
    ```
    https://github.com/golang/go/issues/14038
    ```

    如果 Stop()返回了 False,但是没有执行 f,那么上面的阻塞式读取就会出现问题。我们再看看什么场景下会出现 Stop()返回 false,但是 f 没有执行的情况。

    只要前面超时过(第一次发送了 channel 数据),然后您的程序从正常逻辑(不是 drain channel 那段)中读取了数据,Channel 会为空。此时您去 Stop(),肯定返回 False,您再去阻塞读取 Channel,那么就会阻塞。

    您可以拿代码运行一下看看:第一个匿名函数是生产者,第二个匿名函数是正常的业务逻辑(即消费者)。
    正常情况下,读取数据的超时值是 2 秒。
    第一次读取就超时,因为生产者 3 秒后才生产数据,超时后,BBB 处的代码会把 channel 读取干净,这也很正常,超时事件发送给我们,我们消耗它。
    循环第二次执行,Stop()会返回 False (因为定时器已经 expired 了),但是不会调用 f 去 sendTime,这也就是我前面说过的超时和 f 的调用是两件独立的事情,Stop()的返回值不会反应是否发送 sendTime 。
    此时,再 AAA 处的阻塞调用,就会阻塞。因为 Channel 里面的确没有数据。

    ```go
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    c := make(chan bool)
    go func() {
    for i := 0; i < 10; i++ {
    time.Sleep(time.Second * 3)
    c <- false
    }
    }()

    go func() {
    t := time.NewTimer(time.Second * 2)
    for {
    if !t.Stop() {
    <-t.C // AAA: will blocked here
    }
    t.Reset(time.Second * 2)
    select {
    case b := <-c:
    if !b {
    fmt.Println(time.Now(), "work...")
    continue
    }
    case <-t.C: // BBB: normal receive from channel timeout event
    fmt.Println(time.Now(), "timeout")
    continue
    }
    }
    }()

    time.Sleep(time.Second * 10000000)
    }
    ```

    如果改成非阻塞性读取,那么又有我第一次说的那个多了 f 调用的问题。
    lujjjh
        8
    lujjjh  
       2021-08-08 15:04:03 +08:00
    @xuantedev 这段代码是有问题的,可以对比一下我上面提供的代码。并不是 f 没有调用,而是这段代码逻辑里,超时的时候消费了两次 t.C ( case <-t.C: 和下一次循环 if 分支里的 <-t.C ),修复方式也很简单,把 if !t.Stop() { <-t.C } 挪到第一个 case 里就行了。
    xuantedev
        9
    xuantedev  
    OP
       2021-08-08 15:20:14 +08:00
    @lujjjh 太感谢了,这应该是正确的做法了。前面看了很多代码片段,都是把 Stop()放在 select 之外的,头脑禁锢了。感谢感谢~
    lesismal
        10
    lesismal  
       2021-08-08 17:10:15 +08:00
    @lujjjh @xuantedev

    go func() {
    for {
    func() {
    timer := time.NewTimer(time.Second * 2)
    defer timer.Stop()

    select {
    case b := <-c:
    if !b {
    fmt.Println(time.Now(), "work...")
    }
    case <-timer.C: // BBB: normal receive from channel timeout event
    fmt.Println(time.Now(), "timeout")
    }
    }()
    }
    }()

    简洁点就这么写,每次一个新的局部变量 Timer 结构体没压力,非要复用那么写法的可读性太差了,对维护者不友好,而且习惯了不好的写法,哪天一不小心就写出问题了
    lesismal
        11
    lesismal  
       2021-08-08 17:11:21 +08:00
    回帖里头贴代码的格式是真的伤
    bthulu
        12
    bthulu  
       2021-08-19 09:09:00 +08:00
    @lesismal 回帖好像可以用 markdown 代码块的
    ```
    func main() {
    fmt.Println("Hello, 世界")
    }
    ```
    `
    func main() {
    fmt.Println("Hello, 世界")
    }
    `
    bthulu
        13
    bthulu  
       2021-08-19 09:09:13 +08:00
    `func main() {
    fmt.Println("Hello, 世界")
    }`
    bthulu
        14
    bthulu  
       2021-08-19 09:09:41 +08:00
    不行, 用不了`func main() {fmt.Println("Hello, 世界")`
    lesismal
        15
    lesismal  
       2021-08-19 09:10:53 +08:00
    @bthulu 看你回复的好像还是不行呀,我试试

    ```golang
    println("test markdown response"
    ```
    lesismal
        16
    lesismal  
       2021-08-19 09:11:17 +08:00
    @bthulu 果然不行,白高兴了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5953 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 02:02 · PVG 10:02 · LAX 18:02 · JFK 21:02
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.