V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
chaleaochexist
V2EX  ›  Go 编程语言

请教 goroutine 通信写法问题

  •  
  •   chaleaochexist · 2022-08-17 18:22:21 +08:00 · 2594 次点击
    这是一个创建于 853 天前的主题,其中的信息可能已经有所发展或是发生改变。

    image

    第一种想法如上图: main goroutine 遍历这个 chan, 将所有数据读出. 但是问题是. 左边那三个 goroutine 如果结束了. main goroutine 如何结束? 岂不是阻塞了? 也没法关闭, 如果左边的其中一个关闭了. 那另外两个的数据就读不出来了.

    第二种思路是创建三个 chan image

    这个直接 close 就行了.

    我的问题是: 第一种想法中的问题如何解决?

    是不是实践中第二种想法比较符合套路?

    确实是新手. 大佬勿喷.

    第 1 条附言  ·  2022-08-18 11:28:37 +08:00
    谢谢 已经解决. 参考楼下代码即可.
    第 2 条附言  ·  2022-08-18 11:29:11 +08:00
    ns09005264 的代码
    35 条回复    2022-08-31 19:24:19 +08:00
    bruce0
        1
    bruce0  
       2022-08-17 18:27:43 +08:00
    第一个用 sync.waitGroup 应该能解决吧
    ch2
        2
    ch2  
       2022-08-17 18:28:52 +08:00
    waitgroup
    chaleaochexist
        3
    chaleaochexist  
    OP
       2022-08-17 18:30:04 +08:00
    @bruce0
    @ch2

    可以是可以. 那 chan 的 buffer 得足够大.要不然就死锁了吧?
    qq1009479218
        4
    qq1009479218  
       2022-08-17 18:31:48 +08:00   ❤️ 1
    也可以再起一个 chan, 在想结束的时候通知 main 该结束了
    chaleaochexist
        5
    chaleaochexist  
    OP
       2022-08-17 18:33:25 +08:00
    @qq1009479218 明白了. 用 chan 或者 context
    在主线程中用 select 检测那三个 goroutine. 好像可行. 不知道算不算最佳实践.
    ch2
        6
    ch2  
       2022-08-17 18:39:11 +08:00
    @chaleaochexist #3 我记得有第三方写的 infinite chan ,无限大缓冲区的 chan
    lxdlam
        7
    lxdlam  
       2022-08-17 18:41:43 +08:00   ❤️ 4
    Reading Material: https://go.dev/blog/pipelines

    拆成两个问题:
    1. 多个 goroutine 如何读取消息
    - 使用 fan-in 和 fan-out pattern ,将其结果汇总到一个 channel 里,此时原始 goroutine 关闭 channel 不影响;
    - 直接 select 多个 channel 。
    2. 当某个 gorutine 退出时如何通知其他的 goroutine 退出:
    a. (可选)如果需要等待其他 goroutine 退出的话,使用 sync.WaitGroup 等待;
    b. 使用一个 exitChannel ( chan struct{} 就行),接收到退出信号的时候直接由 main close ,其他 goroutine 使用 `for { select { case <- exitChannel: return default: logic} }` 的形式来正确接受退出信号
    lxdlam
        8
    lxdlam  
       2022-08-17 18:42:44 +08:00
    @lxdlam 没有换行写的有点乱,最后一条 case 和 default 是不同的分支
    qq1009479218
        9
    qq1009479218  
       2022-08-17 18:46:51 +08:00   ❤️ 1
    @chaleaochexist 三个 goroutinue 在想结束的时候发一个消息到用来结束的 chan 里,在 main 里面 select 监听,在监听到三次之后,说明三个 groutinue 全部执行完了,return main 就好了
    这种方法,是其他协程通知主协程自己结束了,主协程收到这个通知,再决定下一步怎样做
    而 context 其实是 main 协程管理其他协程的,就是 main 想让其他协程结束时调用 cancel ,其他协程通过监听 ctx.Done(),就可以 return 了
    复杂并发应用中 goutinue 之间的关系,其实是树状的,你想在一个树的节点,结束这个数下面的所有的子 goroutinue 时,就用 context ,在子 goutinue 中传递值也可以
    ilylx2008
        10
    ilylx2008  
       2022-08-17 18:52:36 +08:00
    你们真强,我都没看明白楼主在说啥。
    jitongxi
        11
    jitongxi  
       2022-08-17 18:54:29 +08:00
    一个 tcp 连接一个 goroutine , 结束, 不管客户端还是服务端都是。
    加个 channel 就是脱裤子放屁
    rrfeng
        12
    rrfeng  
       2022-08-17 20:10:59 +08:00
    考虑下 main goroutine 为啥要结束??
    chaleaochexist
        13
    chaleaochexist  
    OP
       2022-08-17 20:21:42 +08:00
    @rrfeng 也不是一定要结束, 而是继续往下走.
    nmap
        14
    nmap  
       2022-08-17 20:45:58 +08:00
    第二种吧,逻辑清晰,实现简单
    fds
        15
    fds  
       2022-08-17 21:09:39 +08:00
    nuk
        16
    nuk  
       2022-08-17 21:24:49 +08:00
    往 channel 里写个结束的标记就行了吧,要不然就加个生存期管理
    haoliang
        17
    haoliang  
       2022-08-17 21:33:14 +08:00
    第一种消耗比较小啊,可以考虑增加规定个独特的终止信息在生产端退出时发出,消费端识别处理下(比较类似于 waitgroup ,消费端处理终止信息时也可以用 atomic 计数)
    joesonw
        18
    joesonw  
       2022-08-17 22:52:55 +08:00 via iPhone
    用 waitgroup 为什么会死锁?只是 routine 里因为 channel 满了,阻塞住,要等 channel 用空位了,才会塞入,然后继续运作。
    chaleaochexist
        19
    chaleaochexist  
    OP
       2022-08-18 07:44:14 +08:00
    @joesonw 三个生产者堵死. 因为 chan 满了.
    消费者堵死, 因为一直在 wait.
    joesonw
        20
    joesonw  
       2022-08-18 09:19:02 +08:00 via iPhone
    @chaleaochexist 都满了消费者怎么堵死,直接消费啊
    wisej
        21
    wisej  
       2022-08-18 09:27:19 +08:00
    @chaleaochexist 你对 waitgroup 的使用有误解,add 和 wait 都是生产者侧调用的。跟 consumer 没关系,consumer 只需要循环读取 chan 消费。

    另外,如果 producer 并发很高,建议多个 chan ;毕竟 chan 底层依赖 mutex ,main 里再 for select 消费
    index90
        22
    index90  
       2022-08-18 09:35:32 +08:00   ❤️ 1
    最简单就是加一个 waitgroup ,再起一个 goroutine 去 wait ,wait 到了就 close channel ,main routine 用 for range 去读。channel 。
    index90
        23
    index90  
       2022-08-18 09:37:25 +08:00
    套路都是 main routine 去 range channel ,剩下的问题就是如何 close channel ,这个建议你搜索“如何优雅关闭 channel”,学习 channel 使用的几个套路。
    ns09005264
        24
    ns09005264  
       2022-08-18 09:37:43 +08:00   ❤️ 1
    ```
    ch := make(chan any)
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(done func()) {
    defer done()
    time.Sleep(time.Second * 3)
    ch <- rand.Int()
    }(wg.Done)
    }
    go func() {
    wg.Wait()
    close(ch)
    }()
    for v := range ch {
    log.Printf("v:%v", v)
    }
    log.Printf("这里等待执行")

    ```
    ns09005264
        25
    ns09005264  
       2022-08-18 09:42:32 +08:00   ❤️ 1
    joesonw
        26
    joesonw  
       2022-08-18 09:58:54 +08:00 via iPhone
    @ns09005264 Done 应该要放在消费者里,不然假如消费者处理时间比较长的,会漏掉最后一条。
    joesonw
        27
    joesonw  
       2022-08-18 10:00:26 +08:00 via iPhone
    @ns09005264 忽略我上面说的,没看到是 unbuffered
    BingoXuan
        28
    BingoXuan  
       2022-08-18 10:51:02 +08:00
    这个方案我研究过,头疼得很。最后选择手写 CAS 得 ringbuffer 的方案。
    loveuer
        29
    loveuer  
       2022-08-18 10:55:53 +08:00
    第一种写法按照官方的说法就是不要过分关注 chan 的关闭了, 毕竟没有数据了, chan 的占用很小, 程序如果结束自然也就交还操作系统了
    seth19960929
        30
    seth19960929  
       2022-08-18 11:18:26 +08:00   ❤️ 2
    close chan 两个原则
    1. 不要在接收端关闭, 也就是你代码里的 main goroutine
    2. 有多个同时写, 不要在写的地方关闭, 也就是你的代码中 goroutine1,2,3

    所以最好的做法, 就是楼上给的那个代码, 先 waitGroup 够三个之后, 直接在 main 关闭就行了
    chaleaochexist
        31
    chaleaochexist  
    OP
       2022-08-18 11:24:18 +08:00
    @index90
    @ns09005264
    谢谢 最后用的这个方法解决了.
    seth19960929
        32
    seth19960929  
       2022-08-18 11:24:42 +08:00
    package main

    import (
    "fmt"
    "math/rand"
    "sync"
    "time"
    )

    func main() {

    // init var
    ch := make(chan int)
    wg := &sync.WaitGroup{}

    // goroutine1, 2, 3
    for i := 0; i < 3; i++ {
    wg.Add(1)
    go task(wg, ch)
    }

    // read chan data
    go func() {
    for val := range ch {
    fmt.Println(val)
    }
    }()

    // wait group
    wg.Wait()
    close(ch)

    // close fast, can`t read all chan data
    time.Sleep(time.Second)
    }

    func task(wg *sync.WaitGroup, ch chan int) {
    defer wg.Done()

    ts := rand.Intn(3) + 1
    time.Sleep(time.Second * time.Duration(ts))

    ch <- ts
    }
    lessMonologue
        33
    lessMonologue  
       2022-08-18 11:35:44 +08:00
    channel 为什么要关闭?
    index90
        34
    index90  
       2022-08-18 20:41:43 +08:00
    这里可以延伸一个问题,你上述例子用了 4 个 goroutine ,如果用 waitgroup ,就要用 5 个 goroutine ,问能否用 3 个 goroutine 解决问题?
    paceewang1
        35
    paceewang1  
       2022-08-31 19:24:19 +08:00
    1 、chan 关闭了里面的数据可以继续读,只是不可以写
    2 、多个 goroutine 关闭用 context
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   4144 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 05:29 · PVG 13:29 · LAX 21:29 · JFK 00:29
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.