V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
monkeyWie
V2EX  ›  Go 编程语言

求 go 并发限制的最佳实现

  •  
  •   monkeyWie ·
    monkeyWie · 2021-02-08 12:01:29 +08:00 · 7278 次点击
    这是一个创建于 1413 天前的主题,其中的信息可能已经有所发展或是发生改变。

    情况如下: 有 N 个任务,每个任务执行完都会返回结果或者 error,通过固定的(M)协程去执行,如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表。

    我自己写了一版,感觉有点复杂:https://play.golang.org/p/ono1S04XupK

    不知道各位有没有什么更简单的实现。

    第 1 条附言  ·  2021-02-08 18:32:55 +08:00
    大家好好审下题啊,要在有一个任务发送错误时立即结束流程,errgroup 是不行的哦,它会等到所有任务执行完毕才返回。
    第 2 条附言  ·  2021-02-10 12:33:58 +08:00
    感谢各位大佬的思路,目前基于最优实现封装了一个通用的方法: https://play.golang.org/p/Be7vNF4JH4-
    如果各位有更好的思路可以重写下面的 run 方法然后分享出来
    第 3 条附言  ·  2021-02-10 13:27:32 +08:00
    再更新下,上面代码的没有实现发生错误立即返回: https://play.golang.org/p/B3fZZCWwuKT
    70 条回复    2021-02-19 10:21:51 +08:00
    ToPoGE
        1
    ToPoGE  
       2021-02-08 12:16:33 +08:00 via Android
    context 不就可以实现吗
    cloudfstrife
        2
    cloudfstrife  
       2021-02-08 12:26:53 +08:00 via Android
    errgroup 了解一下
    xylophone21
        3
    xylophone21  
       2021-02-08 12:30:21 +08:00
    一起探讨,如果 M 不是大到离谱,是否需要控制 M ?还是无脑 go 就好了,因为这样感觉就是 go 程池 /线程池了,有点像把 go 程当线程用了?
    sunorg
        4
    sunorg  
       2021-02-08 12:44:23 +08:00
    不是 channel 就可以实现了吗?
    monkeyWie
        5
    monkeyWie  
    OP
       2021-02-08 13:22:19 +08:00
    @ToPoGE context 实现不了等待任务全部执行完成吧
    monkeyWie
        6
    monkeyWie  
    OP
       2021-02-08 13:25:00 +08:00
    @xylophone21 现在就是 M 如果太大了服务器会顶不住,比如并发查询 sql
    monkeyWie
        7
    monkeyWie  
    OP
       2021-02-08 13:34:51 +08:00
    @cloudfstrife errgroup 我试了下,第一不能限制并发数,第二不能在发生错误时立即返回。
    baodaren8
        8
    baodaren8  
       2021-02-08 13:35:46 +08:00
    借楼问一下,我怎么不能发帖了。。
    coool
        9
    coool  
       2021-02-08 13:45:42 +08:00
    @baodaren8 我也不能发了
    ToPoGE
        10
    ToPoGE  
       2021-02-08 13:52:03 +08:00
    @monkeyWie context 配合 waitGRoup,就可以做到失败一个全干掉,全完成,返回
    MadbookPro
        11
    MadbookPro  
       2021-02-08 14:00:54 +08:00
    一个 err chan 用来通知 root goroutine 发生错误,然后 root context 退出; wait group 用来做全部完成检查。
    这样何如?
    fatedier
        12
    fatedier  
       2021-02-08 14:04:21 +08:00
    MadbookPro
        13
    MadbookPro  
       2021-02-08 14:04:47 +08:00
    哦还有并发控制。
    taskChannel + workerChannel,这样 wait group 也不需要了
    mogg
        14
    mogg  
       2021-02-08 14:23:54 +08:00
    线程池+channel,收到错误消息停止向队列里发送消息。
    想要立刻回收,主线程直接 stop 线程池里所有线程,否则等待运行中线程跑完
    monkeyWie
        15
    monkeyWie  
    OP
       2021-02-08 14:27:43 +08:00
    @ToPoGE @MadbookPro @mogg
    直接上代码吧,这样说不明白🤣
    LoNeFong
        16
    LoNeFong  
       2021-02-08 14:29:07 +08:00
    errgroup + 1
    Claar
        17
    Claar  
       2021-02-08 14:37:21 +08:00 via iPhone
    并发控制:协程池
    消息传递:channel
    还有等待协程结束再结束 main 函数:sync.
    甚至可以子任务报错直接 exit
    ToPoGE
        18
    ToPoGE  
       2021-02-08 14:38:59 +08:00
    @monkeyWie 那你用 errgroup 把,go 官方标准库中的,看文档直接用,就是 context 和 waitGroup 结合的
    ppphp
        19
    ppphp  
       2021-02-08 14:48:36 +08:00
    看不到代码。。。我的思路是,for 循环几个 goroutine,select 的时候,先从 errchannel 里看 err,然后 default 里从 taskchannel 里拿 task
    mogg
        20
    mogg  
       2021-02-08 15:48:18 +08:00
    ```go
    var wg sync.WaitGroup
    pool := makePool(pollSize)
    func() {
    for i := 0; i < runTimes; i++ {
    select {
    case <-errChan:
    fmt.Println("error")
    pool.Stop()
    return
    default:
    wg.Add(1)
    poll.Submit(task)
    }
    }
    }()
    if pool.IsRunning()
    wg.Wait()
    ```
    我觉得核心就是这样一个结构,不过 go 没用过线程池,可以看看有什么库
    KaynW
        21
    KaynW  
       2021-02-08 15:53:11 +08:00
    caiych
        22
    caiych  
       2021-02-08 16:05:58 +08:00
    一个比较简单的实现
    https://play.golang.org/p/MDU_x7C2npI

    如果有一个 routine 有 error,ctx 会 Done
    如果 ctx 已经 Done 了,semaphore 会 error
    wpf375516041
        23
    wpf375516041  
       2021-02-08 17:03:40 +08:00
    package main

    import (
    "fmt"
    "net/http"
    )

    /*有 N 个任务,每个任务都会返回结果或者 error,通过固定的并发数(M)去执行。
    如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表*/
    func main() {

    n := 10
    m := 5
    result := make([]string, n)
    limitCh := make(chan interface{}, m)
    errCh := make(chan error)
    doneCh := make(chan interface{},1)

    defer func() {
    close(limitCh)
    close(errCh)
    }()

    for state, i := true, 0; i < n; i++ {
    state = true
    for state {
    select {
    case limitCh <- nil:
    fmt.Printf("开始第%d 个任务\n", i)
    go func(i int) {
    var err error
    defer func() {
    if i == n-1 {
    close(doneCh)
    }
    if err != nil {
    errCh <- err
    }
    <-limitCh
    }()
    ret, err := doTask()
    if err != nil {
    return
    }
    result[i] = ret
    }(i)
    state = false
    case <-errCh:
    return
    default:
    }
    }
    }
    <- doneCh
    fmt.Println(result)

    }

    func doTask() (string, error) {
    // 模拟执行任务
    resp, err := http.Get("https://www.baidu.com")
    if err != nil {
    return "", err
    }
    defer resp.Body.Close()
    return resp.Status, nil
    }
    monkeyWie
        24
    monkeyWie  
    OP
       2021-02-08 18:17:31 +08:00
    @caiych #22 这个做不到有一个 task 发生 error 立即结束,例如: https://play.golang.org/p/sqlMbgW7z9Z
    monkeyWie
        25
    monkeyWie  
    OP
       2021-02-08 18:18:55 +08:00
    @caiych #22 比如第一个任务执行已经失败了,需要立即返回,而不是等到所有任务执行完
    monkeyWie
        26
    monkeyWie  
    OP
       2021-02-08 18:28:10 +08:00
    @wpf375516041 #23 这个好像也有点问题哦,就是判断任务全部执行完成的地方

    ```
    if i == n-1 {
    close(doneCh)
    }
    ```
    这里判断最后一个任务执行完成就结束,但是可能会存在还有正在执行的任务并且比最后一个任务执行还慢,就不对了。
    caiych
        27
    caiych  
       2021-02-08 18:54:42 +08:00
    @monkeyWie 你仔细看一下打出来的内容,第一批开始执行的任务里没有 1 ( 1 没有抢到信号量)。当 1 抢到信号量开始执行后整个程序都结束了(其他的任务都取消了,不然如果 i 那个循环大的话整个程序会运行很久)
    rrfeng
        28
    rrfeng  
       2021-02-08 19:36:10 +08:00
    1. 声明一个 buffered chan 当做 goroutine 池子 。
    2. 启动 goroutine 的时候传入 context 用做取消(前提是你的 goroutine 任务可以取消)。
    ginjedoad
        29
    ginjedoad  
       2021-02-08 20:29:53 +08:00
    兄弟,errgroup 就是为了你这个场景设计的。不要重复造轮子了
    ginjedoad
        30
    ginjedoad  
       2021-02-08 20:30:42 +08:00
    说只要一个错误不能马上中断返回的,自己好好审计一下 errgroup
    monkeyWie
        31
    monkeyWie  
    OP
       2021-02-08 22:29:21 +08:00
    @caiych #27 错误发生时不会立刻结束,而是会等正在执行的任务全部完成才返回,你可以跑这个试试: https://play.golang.org/p/66Me2TYbVoK

    错误发生了也要等 5 秒才结束。
    monkeyWie
        32
    monkeyWie  
    OP
       2021-02-08 22:32:04 +08:00
    @ginjedoad #30 老哥贴个代码我跑一下看看,我自己测的 errgroup 是不能发生错误立即中断的
    wpf375516041
        33
    wpf375516041  
       2021-02-08 22:55:03 +08:00
    @monkeyWie 是滴 用 waitgroup 大家说的都对 其实你把原来的封装下 搞个协程池 代码就清晰了
    wpf375516041
        34
    wpf375516041  
       2021-02-08 23:16:58 +08:00   ❤️ 1
    我觉得这是个很好的面试题,既有实际意义也考验基本功,大家可以试试不用三方库实现一下~
    talk is cheap, show me the code
    一起娱乐娱乐,新年快乐~!
    1. 控制并发
    2. 等待所有任务返回
    3. 一个任务错误,立刻结束

    如果不解耦,并发控制和结果处理的逻辑混杂确实屎

    go 实现的时候想当然了,只以最后提交的任务判断是否结束

    kotlin 协程实现的时候发现 io,计算任务无法退出,必须要手动捕捉中止信号

    java 须要手动捕捉中止信号 但是可以通过 thread.stop()强制停止,另外判断线程是否异常退出较难
    caiych
        35
    caiych  
       2021-02-09 00:06:49 +08:00   ❤️ 2
    @monkeyWie

    最开始的版本里注释有写 DoWork 里的操作需要支持 context cancellation (比如如果操作是 http.Get 的话,可以使用 http.NewRequestWithContext)

    这个里面实现了一个如果调用的操作不支持 context cancellation 的情况
    https://play.golang.org/p/Cot1FYgIKLd
    ooh
        36
    ooh  
       2021-02-09 00:11:53 +08:00
    monkeyWie
        37
    monkeyWie  
    OP
       2021-02-09 08:45:30 +08:00
    @wpf375516041 #34 哈哈,搞不好会加入大厂面试题库
    monkeyWie
        38
    monkeyWie  
    OP
       2021-02-09 08:48:38 +08:00
    @caiych #35 实际上很多 work 是不支持 cancel 的,而且也不一定要 cancel 掉,只要不阻塞主协程就行了,发送错误的时候主协程继续执行,其它正在执行的任务让它继续跑。
    monkeyWie
        39
    monkeyWie  
    OP
       2021-02-09 09:47:54 +08:00
    @caiych #35 不好意思前面没看仔细,这个确实可以,赞一个!
    guonaihong
        40
    guonaihong  
       2021-02-09 09:51:10 +08:00
    难道 slice+context.Context+errgroup 的组合不行?

    1.分配 M 容量的 slice 。放 M 个 go 程正常运行的结果。每个 go 程都有自己的 index,所以存结果这块都不要加锁,相当舒服的操作。

    2.context 当作异常终端点。每个 go 程都持有这个 context 变量。任意一个 go 程错误,cancel 。任意 go 程检查 ctx.Done()。所谓 “如果其中有一个任务返回 error 时立即结束” 完美实现。

    3.检查 errgroup 的返回,err != nil,就返回错误,else 部分没有错误,返回第 1 步声明的 slice
    crclz
        41
    crclz  
       2021-02-09 09:54:50 +08:00
    分解问题:
    1. 通过固定的 M 个携程执行。
    解决方案:信号量或者条件变量或者 channel,很多种方法实现。

    2. 只要出现一个任务返回 Error,就立即结束全部任务。
    解决方案:C#的 TAP 的 Task.WhenAny 和 CancellationToken 模式可以很好的解决这些问题。
    那么只需要用 go 来实现 Task.WhenAny 和 CancellationToken 模式即可。

    CancellationToken 很好实现,用 struct {IsCancellationRequired: bool}就可以实现。
    Task.WhenAny 可以用如下方式实现:有一个 chan ch,每个任务完成后,往 ch 里面写一个数字(或者写入任务信息),同时主线程阻塞读取 ch 。
    monkeyWie
        42
    monkeyWie  
    OP
       2021-02-09 09:57:02 +08:00
    @guonaihong #40 应该行的,但是对第一点有点疑问,用 slice 怎么实现 M 个协程的限制呢
    guonaihong
        43
    guonaihong  
       2021-02-09 10:01:11 +08:00
    //放结果伪代码
    // 每个 go 程的 id 已经固定下来,就是 for 循环启动的 index.大家操作自己的私有 index 。为啥会有竞争?
    for i :=0;i < M;i++ {
    i:=i
    go func(){
    slice[i] = result
    }
    }
    jackeliang
        44
    jackeliang  
       2021-02-09 10:09:47 +08:00
    @monkeyWie 发生错误不能立即返回的,需要等待其它协程结束才行,不然协程泄露了。
    aeli
        45
    aeli  
       2021-02-09 10:11:36 +08:00
    所有在并发里要求错误立即中断所有并发返回的,显然是脑抽抽
    dawniii
        46
    dawniii  
       2021-02-09 10:13:53 +08:00
    当其中有一个协程有错误,另一个协程在密集计算,应该是没法打断直接返回的吧
    SignLeung
        47
    SignLeung  
       2021-02-09 11:10:36 +08:00   ❤️ 1
    楼上说的没错,协程好像只能自己结束
    seth19960929
        48
    seth19960929  
       2021-02-09 11:11:06 +08:00
    N 个的容量的 success channel.
    再来一个 err channel

    然后主线程那里 select 这两个 channel 做事情就可以啦.

    ----------------------
    至于你纠结 err 之后协程能不能关闭, 那个不是你关心的事情了. 可以考虑传递一个 context 给 request, err 发生错误的时候进行 cancel context 即可.
    liyunlong41
        49
    liyunlong41  
       2021-02-09 11:24:29 +08:00
    https://play.golang.org/p/YYvynelzIHj
    感兴趣写了下,如果想出现错误后中断其他 goroutine 的处理,其他 goroutine 必须可以被 cancel 掉。
    asAnotherJack
        50
    asAnotherJack  
       2021-02-09 12:47:03 +08:00
    errgroup 可以实现出错时结束流程,前提是你的代码实现了 cancel 逻辑
    Aoang
        51
    Aoang  
       2021-02-09 14:47:03 +08:00 via Android
    并发限制用协程池,退出机制需要自己实现。

    如果懒得实现,就直接摘出来,用系统的实现。main 启动之后开一个协程监听退出信号,收到退出信号之后直接 os.Exit()

    其实还是应该自己在代码中实现出来,那个操作只能玩玩。看看源码,用 runtime.Goexit() 来终止协程。

    errgroup context 都是用来传递消息的,并不是来做终止的。你用这两个来做的话,只能每进行一步就检查一下是否有退出信号,不然就做不到及时退出。
    teawithlife
        52
    teawithlife  
       2021-02-09 14:48:32 +08:00
    @liyunlong41 #49 你好,请教一下,在 61 行的这一部分,为什么两个 channel 之间互相要写数据?
    ```
    select {
    case err := <-errCh:
    handleErr(err, result[1:])
    <-done
    case <-done:
    if len(errCh) > 0 {
    err := <-errCh
    handleErr(err, result[1:])
    return
    }
    fmt.Println("success handle all task:", result[1:])
    }
    ```

    改成这样是否可以?
    ```
    select {
    case err := <-errCh:
    handleErr(err, result[1:])
    case <-done:
    fmt.Println("success handle all task:", result[1:])
    }
    ```
    Aoang
        53
    Aoang  
       2021-02-09 15:01:42 +08:00 via Android
    @teawithlife #52

    因为他代码中 errCh 和 done 作用是一样的,一个 chan 就可以了,检查 chan 中的内容是出错退出的还是最终完成退出就行了。
    liyunlong41
        54
    liyunlong41  
       2021-02-09 15:28:38 +08:00
    @teawithlife 没有相互写数据,是为了一些健壮性考虑吧。第一个 case 如果从 errCh 中读到 err 了,<-done 表示等待其他 goroutine 也都结束,是怕有 goroutine 泄露。第二个 case 是考虑可能有极小概率 errCh 和 done 同时有数据,select 随机选择了 done channel,所以最终再判断下 errCh 是否有 err 。
    monkeyWie
        55
    monkeyWie  
    OP
       2021-02-09 19:50:17 +08:00
    @liyunlong41 #49 目前 35L 这种应该是最优雅的实现,我们用纯标准库实现的还是太复杂了哈哈
    teawithlife
        56
    teawithlife  
       2021-02-09 20:55:51 +08:00
    @liyunlong41 #54 “相互写数据”这个表述不太严谨。不过你说的有道理,从健壮性来说,确实需要考虑这些极端情况。


    @monkeyWie #55 个人觉得 35L 的写法确实比较优雅,但是从效率来说,还是 49L 的写法更好一些,因为 49L 的协程数量是 M 个,而 35L 的协程数量是 N 个,当 N>>M 时,虽然 golang 的协程足够轻量,但是也没必要这么浪费。
    monkeyWie
        57
    monkeyWie  
    OP
       2021-02-09 20:59:03 +08:00
    @teawithlife #56 35L 协程数量其实是 N 个,用信号量做了控制的
    monkeyWie
        58
    monkeyWie  
    OP
       2021-02-09 21:00:12 +08:00
    @monkeyWie #57 说出了,是 M 个
    teawithlife
        59
    teawithlife  
       2021-02-10 08:29:04 +08:00
    @monkeyWie #57 协程有 N 个,只不过通过信号量保证了其中只有 M 个在跑,其他的协程虽然在等待,也需要消耗少量资源
    Nillouise
        60
    Nillouise  
       2021-02-10 09:52:31 +08:00
    我之前研究过 go 的流控怎么写,感觉还是流控好,流控还可以控制几秒才做一个任务,单纯控制并发感觉不如流控。
    monkeyWie
        61
    monkeyWie  
    OP
       2021-02-10 10:55:47 +08:00   ❤️ 1
    @teawithlife #59 额,确实是 N 个协程,不过稍微改下就行了,把信号量控制放在循环里面
    https://play.golang.org/p/SP7a8MaDd8B
    MilletChili
        62
    MilletChili  
       2021-02-10 11:23:42 +08:00
    感觉不需要写太多代码,直接用 channel 控制就好了
    https://play.golang.org/p/JiD2EopqPkL
    monkeyWie
        63
    monkeyWie  
    OP
       2021-02-10 12:10:11 +08:00
    @MilletChili #62 这种思路好像也不错,不过如果要加上参数传递和结果、错误返回也还是挺复杂的
    monkeyWie
        64
    monkeyWie  
    OP
       2021-02-10 12:37:58 +08:00
    @MilletChili #62 可以尝试下用这种思路实现下这里的 run 方法,https://play.golang.org/p/Be7vNF4JH4-
    MilletChili
        65
    MilletChili  
       2021-02-10 13:57:42 +08:00
    @monkeyWie 嗯嗯,真要项目搞,还是用一些开源包好点
    troywinter
        66
    troywinter  
       2021-02-10 16:42:15 +08:00
    控制并发直接用 Semaphore 就行了,至于遇到错误怎么退出 goroutine 应该是你的业务代码自己实现,不同业务场景需要有不同的处理。
    YouLMAO
        67
    YouLMAO  
       2021-02-11 22:50:02 +08:00 via Android
    errgroup.withcontext 楼主是不是没学过 go 呀? 这样只要一个 err,context 就会取消,全部都返回了
    tolerance
        68
    tolerance  
       2021-02-13 00:37:32 +08:00
    把 channel 关了就可以
    kevinwan
        69
    kevinwan  
       2021-02-17 13:09:27 +08:00 via iPhone
    go-zero 下有个 mr 包解决这种场景,遇到 error 可以 cancel 所有任务
    abccccabc
        70
    abccccabc  
       2021-02-19 10:21:51 +08:00
    各位问一下,你们是怎样看 play.golang.org/p/xxxxxx 代码的?难道要 pa 强??
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2715 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 10:05 · PVG 18:05 · LAX 02:05 · JFK 05:05
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.