V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
nyxsonsleep
V2EX  ›  Node.js

node 怎么实现并行化执行传输不可序列化对象?

  •  
  •   nyxsonsleep · 2024-07-19 14:58:07 +08:00 · 3304 次点击
    这是一个创建于 411 天前的主题,其中的信息可能已经有所发展或是发生改变。

    现在的需求是存在一个类内的方法,原来是串行的,现在需要改并行。现在我需要将一个对象 obj ,或者对象的方法 obj.run 传入子线程,然后回调执行。

    但是我尝试了几种方式,似乎是没办法将复杂的对象进行传递?以至于常规的回调函数的方式没办法在 node 的并行化中实现。

    1. worker_thread
    new Worker(moduleThreadFile, 
    workerData:{'obj':obj}) //ERROR
    

    会报告 Cannot set property code of which has only a getter.

    1. workerpool
    pool=workerpool.pool()
    pool.exec(obj,[])
    

    实际上传入的 obj 在子线程中是 undefined

    第 1 条附言  ·  2024-07-19 16:36:40 +08:00
    计算密集型任务。
    51 条回复    2024-08-10 11:32:35 +08:00
    okakuyang
        1
    okakuyang  
       2024-07-19 15:08:23 +08:00
    维护一个 map ,用 id 来区分哪个任务完成了,触发相应回调。
    nyxsonsleep
        2
    nyxsonsleep  
    OP
       2024-07-19 15:11:20 +08:00
    @okakuyang #1 能简单举例一些伪代码来做示例吗? node 用得少。
    jifengg
        3
    jifengg  
       2024-07-19 15:59:05 +08:00
    没用过 workerpool ,但是,以我的理解
    pool.exec(obj,[])
    exec 第一个参数应该是一个 function ?,第二个参数你写了空数组的[],应该是传给这个 function 的参数列表?
    yaodong0126
        4
    yaodong0126  
       2024-07-19 16:20:20 +08:00
    传不了,看文档,写的清楚的不能更清楚了,用什么工具之前多看文档,多看文档
    shadowyue
        5
    shadowyue  
       2024-07-19 16:22:56 +08:00
    看文档,worker 能传递的数据格式有要求的
    nyxsonsleep
        7
    nyxsonsleep  
    OP
       2024-07-19 16:27:27 +08:00
    @jifengg #3 function 也试过,没区别
    nyxsonsleep
        8
    nyxsonsleep  
    OP
       2024-07-19 16:28:42 +08:00
    @yaodong0126 #4 我想我需要的什么工具能支持传输完整原始对象的方法。
    photon006
        9
    photon006  
       2024-07-19 16:29:32 +08:00
    bluebird.map()比较方便实现并行,还能设置并发量 concurrency

    http://bluebirdjs.com/docs/api/promise.map.html
    nyxsonsleep
        10
    nyxsonsleep  
    OP
       2024-07-19 16:29:38 +08:00
    @shadowyue #5 这说明了 workerData 不能传输这种对象。那么有其他方法可以实现吗?
    luckyscript
        11
    luckyscript  
       2024-07-19 16:33:23 +08:00
    workerData <any> Any JavaScript value that is cloned and made available as require('node:worker_threads').workerData. The cloning occurs as described in the HTML structured clone algorithm, and an error is thrown if the object cannot be cloned (e.g. because it contains functions).

    ---

    一定要通过子线程的方案来实现吗?把需要执行的方法改造成异步的形式是不是也可以。
    nyxsonsleep
        12
    nyxsonsleep  
    OP
       2024-07-19 16:34:14 +08:00
    @luckyscript #11 这个是计算密集任务
    shadowyue
        13
    shadowyue  
       2024-07-19 16:34:24 +08:00
    #10 @nyxsonsleep

    你如果一定要传递这个对象参数,最直接的就是把这个对象,转成符合要求的数据类型。
    比如把对象拍平,把值弄成数组传递
    nyxsonsleep
        14
    nyxsonsleep  
    OP
       2024-07-19 16:36:12 +08:00
    @photon006 #9 看起来是个异步模型。我这个可能需要的是计算密集型加速方案。
    nyxsonsleep
        15
    nyxsonsleep  
    OP
       2024-07-19 16:38:54 +08:00
    @shadowyue #13 无法实现,对象非常复杂。
    shadowyue
        16
    shadowyue  
       2024-07-19 16:43:26 +08:00
    #15 对象可以序列化存储吗?可以的话直接写文件或者写数据库,worker 自己去读
    photon006
        17
    photon006  
       2024-07-19 16:46:44 +08:00
    @nyxsonsleep

    确实,bluebird.map()适合 io 密集型任务的并行执行,计算密集型不合适。

    计算密集型需要调用 cpu 多线程,可以通过 worker_threads 实现

    主线程分配任务给 worker 线程,最简单就是传递字符串

    const tasks = [
    `
    function taskA() {
    // do something
    }
    `
    ,
    `
    function taskB() {
    // do something
    }
    `
    ]

    worker 线程接收到用 eval 语法执行 taskA 、taskB ,把执行结果返回给主线程。
    xiwh
        19
    xiwh  
    PRO
       2024-07-19 17:06:53 +08:00
    题主的需求大概率实现不了,Node 的 Worker 是并行用多进程实现的,那哪些对象是没法序列化的?文件句柄,线程句柄,tcp/udp 连接句柄,而这些资源在进程间都是隔离的,即便是强行序列化传过去也用不了,当然 Linux 似乎有方式实现进程间资源共享,最好的方式还是支持基于内存通信的多线程的语言 go java c++等
    yaodong0126
        20
    yaodong0126  
       2024-07-19 17:10:50 +08:00
    我的天,有这么难吗,为什么一定要让线程去执行回调,线程把任务完成后通知主不可以吗?
    sinalvee
        21
    sinalvee  
       2024-07-19 17:23:22 +08:00
    没实践过,不知道有没有别的坑,思路就是把函数转为字符串,worker 中再把字符串转回来

    ```js
    const { Worker, isMainThread, parentPort, workerData } = require('node:worker_threads');

    if (isMainThread) {
    const obj = {
    name: 'Foo',

    greet(other) {
    return `Hello ${this.name} and ${other}`;
    }
    }

    const objStr = JSON.stringify(obj, (key, value) => {
    if (typeof value === 'function') {
    return value.toString();
    }
    return value;
    });

    const worker = new Worker(__filename, {
    workerData: objStr,
    });
    worker.on('message', (value) => {
    console.log('Receive data from worker =>', value);
    });
    worker.on('error', console.error);
    worker.on('exit', (code) => {
    if (code !== 0)
    console.error(new Error(`Worker stopped with exit code ${code}`));
    });
    } else {
    const objStr = workerData;
    const objParsed = JSON.parse(objStr);

    const run = (obj, funcName, ...args) => {
    if (obj.hasOwnProperty(funcName)) {
    const funcStr = obj[funcName];
    // 提取函数体,忽略函数参数定义
    const funcBody = funcStr.substring(obj.greet.indexOf('{') + 1, obj.greet.lastIndexOf('}'));
    // 使用剩余参数语法来定义一个新的函数,允许接收任意数量的参数
    const funcArgs = funcStr.substring(funcStr.indexOf('(') + 1, funcStr.indexOf(')')).split(',').map(arg => arg.trim()).filter(arg => arg);
    const func = new Function(...funcArgs, funcBody);

    return func.call(obj, ...args);
    }
    }

    const result = run(objParsed, 'greet', 'Bar');

    parentPort.postMessage(result);
    }
    ```
    nyxsonsleep
        22
    nyxsonsleep  
    OP
       2024-07-19 17:28:20 +08:00
    @shadowyue #16 JSON.stringfy 试过,不行,函数丢失了。这个对象有很多静态成员和方法。
    nyxsonsleep
        23
    nyxsonsleep  
    OP
       2024-07-19 17:49:00 +08:00
    @yaodong0126 #20 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
    此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,因此这个工程无法直接导入原工程的内容。所以我尝试进行回调。
    Melting
        24
    Melting  
       2024-07-19 17:49:50 +08:00
    可以用 module.exports 导出方法,在 worker_threads 里调用吧
    nyxsonsleep
        25
    nyxsonsleep  
    OP
       2024-07-19 18:18:03 +08:00
    @Melting #24
    > 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
    此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,来创建新的 js 文件入口,因此这个工程无法直接导入原工程的内容。所以我尝试进行回调。
    okakuyang
        26
    okakuyang  
       2024-07-19 18:37:38 +08:00 via iPhone
    worker 不能传函数,你可以一个任务给一个 id ,任务处理完传 id 给主线程,根据 id 执行对应回调。超级简单。
    nyxsonsleep
        27
    nyxsonsleep  
    OP
       2024-07-19 18:45:43 +08:00
    @okakuyang #26 没看明白。我就是需要将任务传递给 worker ,不然这个任务怎么处理完呢?子进程根本不知道应该做什么吧。
    现在我需要并行化的就是 obj.run 函数,子进程中没有这个函数,也没有这个对象,怎么执行 obj.run 呢?
    momocraft
        28
    momocraft  
       2024-07-19 19:12:47 +08:00
    重写一份不绑定对象的,参数可以序列化的 run
    EchoWhale
        29
    EchoWhale  
       2024-07-19 19:32:28 +08:00 via iPhone
    别想了,自己实现一个序列化/反序列化方法吧。
    nomagick
        30
    nomagick  
       2024-07-19 19:43:32 +08:00
    你就别当 node 有线程,node 相当于没有线程,先序列化再反序列化,多进程模型

    而且 node 里面的 fork, 也不是你认为的 fork, 纯就是重新再启动一个新的
    nomagick
        31
    nomagick  
       2024-07-19 19:44:38 +08:00
    函数和对象引用都是不共享的,也不能传递,只能通信
    mark2025
        32
    mark2025  
       2024-07-19 20:11:51 +08:00
    @nyxsonsleep 在 worker 内加载/初始化这个对象不行么?
    okakuyang
        33
    okakuyang  
       2024-07-19 20:30:38 +08:00
    @nyxsonsleep 你的 worker 里面本身要有 [处理计算] 的代码,主线程只是负责把 [要处理的数据] [id] 传给 worker 线程,worker 计算完成之后只负责把计算好的数据 [字符串/基本类型/字节] [id] 发给主线程,主线程收到处理好的数据根据 [id] 再进行下一步处理 [合并数据] 。
    nyxsonsleep
        34
    nyxsonsleep  
    OP
       2024-07-20 14:16:56 +08:00
    @mark2025 #32 > 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
    此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,来创建新的 js 文件入口,因此这个工程无法直接导入原工程的内容。

    基于以上原因,实际上 worker 里基本没办法初始化这个对象。或者能提供某种方法可以避免上述的限制吗?
    image72
        35
    image72  
       2024-07-20 19:10:09 +08:00
    我看到 workerData 支持 Blob, dataview,arraybuffer 类型(The structured clone algorithm
    )[https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#webapi_types]
    mark2025
        36
    mark2025  
       2024-07-21 15:55:52 +08:00
    @nyxsonsleep 能不能就这一个 js 文件既是主主入口文件也是 worker 入口文件,然后内部根据 mian/worker 分支判断呢?
    nyxsonsleep
        37
    nyxsonsleep  
    OP
       2024-07-21 18:33:53 +08:00
    @mark2025 #36 这个 js 文件有 10+M ,worker 子线程启动的时候会不会有性能问题。
    yaodong0126
        38
    yaodong0126  
       2024-07-22 09:28:31 +08:00
    @nyxsonsleep 你编不编译成单文件不重要啊,你都新建一个工程了,那么你的新工程存在两个文件就不可以?文件 A 作为和主交互的入口,并在里面 require 你的单文件 B 执行,B 执行完毕通过 A 把消息传递给主,不就完了吗,下面 26 楼说的也是这个意思
    accelerator1
        39
    accelerator1  
       2024-07-22 09:35:16 +08:00
    nodejs 中的 worker 是多进程,不是多线程,不能内存共享,正常情况没法传递引用。
    但是 worker 支持 transferable 对象,也就是可以直接传递引用避免进程间的数据拷贝,其实还是要自己实现序列/反序列化。

    如果你的 obj 不可序列化,那就把 obj 的实例化函数放到 worker 中,通过传递 obj 实例化的相关参数来实现。
    kyuuseiryuu
        40
    kyuuseiryuu  
       2024-07-22 10:13:28 +08:00
    源码级传递 —— 你在 worker 侧实现一个一摸一样的类。这样把数据丢过去就能计算了。
    SenseHu
        41
    SenseHu  
       2024-07-22 12:44:31 +08:00
    有没可能方向就错了, node 适合计算密集型任务?
    yaodong0126
        42
    yaodong0126  
       2024-07-22 13:52:44 +08:00
    不懂的能不能不要乱说? worker 什么时候变成进程了?你去 ps 看看进程号不难吧?天天在那误人子弟
    yaodong0126
        43
    yaodong0126  
       2024-07-22 13:55:13 +08:00
    进程间还能避免数据拷贝,说话都有逻辑吗?没事看看书吧
    zhufpy
        44
    zhufpy  
       2024-07-22 18:02:02 +08:00
    想办法序列化对象吧~,或者在通过一些参数,在 worker 里实例化对象
    lmw2616
        45
    lmw2616  
       2024-07-23 00:52:23 +08:00
    切片上传?
    lmw2616
        46
    lmw2616  
       2024-07-23 00:56:27 +08:00
    @lmw2616 理解错了( ̄▽ ̄)"
    yaodong0126
        47
    yaodong0126  
       2024-07-23 09:19:33 +08:00
    官方都说了不允许传递 function ,还在那序列化,序列化,现在程序员的平均水平是真的低
    image72
        48
    image72  
       2024-07-27 11:28:02 +08:00
    @Livid 请求删除屏蔽 @yaodong0126 用户引战,群嘲不友善行为
    yaodong0126
        49
    yaodong0126  
       2024-07-29 09:11:33 +08:00
    @image72 菜就多练
    accelerator1
        50
    accelerator1  
       2024-08-10 11:25:47 +08:00
    @yaodong0126 看起来是在说我,虽然但是,你说的对,是进程不是线程,因为是线程隔离,无法直接传递引用,我的确误人子弟了。
    accelerator1
        51
    accelerator1  
       2024-08-10 11:32:35 +08:00
    @yaodong0126 对不起,还是写反了,能理解意思就好😁
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3076 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 11:39 · PVG 19:39 · LAX 04:39 · JFK 07:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.