V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
xiaolinjia
V2EX  ›  Python

请教下 Python 多进程类方法 pickle 的问题

  •  
  •   xiaolinjia · 2019-11-19 16:23:33 +08:00 · 4957 次点击
    这是一个创建于 1886 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我用 multiprocessing.Pool 来对类方法进行多进程。 然后以下报错: cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup builtin.instancemethod failed

    之后查了一下,说 multiprocessing 默认只支持不在类里面的函数或者类里面的 @staticmethod,于是用

    import copy_reg
    import types
    def _pickle_method(m):
        if m.im_self is None:
            return getattr, (m.im_class, m.im_func.func_name)
        else:
            return getattr, (m.im_self, m.im_func.func_name)
    
    
    copy_reg.pickle(types.MethodType, _pickle_method)
    

    这段代码,解决了上面的错误,但是又以下报错: TypeError: can't pickle thread.lock objects

    然后又查了貌似是类中有些变量不能序列化导致的,然后想请教下这种情况怎么解决啊。 我看了些资料说是可以重写__getstate__和__setstate__魔法方法,但也不知道其参数是怎么传的。也有说将类设成全局变量,然后 pool.apply 的时候就不需要把类传进去,但是这样发现,类会初始化多次,请问要怎么解决呢?

    第 1 条附言  ·  2019-11-20 09:46:34 +08:00
    补充下,我的那个类方法里面大致如下:
    ```
    def received_message(self, socket_url):
    ----ws = create_connection(socket_url) # 有建了一个 websocket 连接
    ----.....
    ----rabbitConn = rabbit.rabbitConnect()
    ----rabbitConn.connect(type=self.rmq_exchange_type) # 有连一个消息队列
    ----.....
    ----self.logger.info("开始获取推送数据..") # 有写一个日志
    ----.....
    ----self.db..... # 有执行些数据库操作
    ```
    请问我是需要取消一些类成员,而是将其作为变量传进去吗。
    第 2 条附言  ·  2019-11-20 10:24:56 +08:00
    再补充一下,其实本身这代码在单进程里是能跑的。后来说改成连多个 websocket,然后我就想到多进程去连。因为那些连 socket,获取数据,分析处理数据都在 received_message 方法里,然后就弄成多进程跑上面的那个 received_message 的类方法,然后就发现上面的问题了,或许我的思路有点问题?
    17 条回复    2019-11-22 14:00:55 +08:00
    wuwukai007
        1
    wuwukai007  
       2019-11-20 09:21:21 +08:00
    pip install dill
    dill.dumps(obj)
    xiaolinjia
        2
    xiaolinjia  
    OP
       2019-11-20 09:30:34 +08:00
    @wuwukai007 这个 obj 指的是?
    wuwukai007
        3
    wuwukai007  
       2019-11-20 09:34:03 +08:00
    你要序列化的对象啊,可以是函数,类,内嵌函数,用的时候在 dill.loads(obj)
    wuwukai007
        4
    wuwukai007  
       2019-11-20 09:34:53 +08:00
    a = dill.dumps(obj)
    把 a 当作参数放到 进程的 args 里面
    xiaolinjia
        5
    xiaolinjia  
    OP
       2019-11-20 09:36:51 +08:00
    @wuwukai007 我进程的 args 里面的参数是可以 pickle 的,可能是进程里的类方法里有些类成员不能 pickle。
    wuwukai007
        6
    wuwukai007  
       2019-11-20 09:46:54 +08:00
    你开多进程,把这个类序列化,传进去当参数,类里面的方法会报错?
    xiaolinjia
        7
    xiaolinjia  
    OP
       2019-11-20 09:53:19 +08:00
    @wuwukai007 我补充了一下问题,我调用的时候是在 main 方法中:
    client = Client()
    from multiprocessing import Pool
    pool = Pool(processes=2)
    for url in ['ws://192.168.0.18:9988/websocket/37', 'ws://192.168.0.18:9988/websocket/44']:
    pool.apply_async(func=client.received_message, args=(url,))
    pool.close()
    pool.join()
    xiaolinjia
        8
    xiaolinjia  
    OP
       2019-11-20 09:54:24 +08:00
    @xiaolinjia 那个空格被吃了,
    client = Client()
    from multiprocessing import Pool
    pool = Pool(processes=2)
    for url in ['ws://192.168.0.18:9988/websocket/37', 'ws://192.168.0.18:9988/websocket/44']:
    ----pool.apply_async(func=client.received_message, args=(url,))
    pool.close()
    pool.join()
    hustlibraco
        9
    hustlibraco  
       2019-11-20 10:00:21 +08:00
    一般不建议这么搞,进程之间共享的数据最好只是整型、字符串,每个进程单独创建一个 client 比较好。强行修改序列化对象一个是不安全,一个是复杂不可重用。
    xiaolinjia
        10
    xiaolinjia  
    OP
       2019-11-20 10:14:13 +08:00
    @hustlibraco 其实我也有想过这个,不过就是单独创建一个实例的话,会多次初始化,但是我又想只初始化一次。
    aaronhua
        11
    aaronhua  
       2019-11-20 10:18:25 +08:00
    看着自己堆的一坨坨 shi 山,不禁感慨,有时候代码简单点,效率低点也没有关系。
    wuwukai007
        12
    wuwukai007  
       2019-11-20 10:33:44 +08:00
    socket 对象好像不能被序列化把
    xiaolinjia
        13
    xiaolinjia  
    OP
       2019-11-20 10:56:30 +08:00
    @wuwukai007 socket 这里,我刚才单独测试的时候应该是没问题,因为他是每个进程内,才创建的。应该不会参与序列化这块。。
    wuwukai007
        14
    wuwukai007  
       2019-11-20 11:33:31 +08:00
    copy_reg.pickle(types.MethodType, _pickle_method)
    你这一步报错
    把这一步换成
    dill.dumps(_pickle_method)
    试一下,
    xiaolinjia
        15
    xiaolinjia  
    OP
       2019-11-21 14:47:17 +08:00
    @wuwukai007 这个连第一步都失败了,现在总结得出,应该是有个数据库的持续化链接的问题,就是 self.db 。其他的 websocket,rabbitmq,因为都是在进程内创建的都没问题,应该就是剩下这个 oracle 的数据库链接不能 pickle 了
    wuwukai007
        16
    wuwukai007  
       2019-11-21 15:01:29 +08:00
    oracle 数据库连接不能序列化, 用连接池 pip install DBUtils
    hustlibraco
        17
    hustlibraco  
       2019-11-22 14:00:55 +08:00
    @xiaolinjia 不知道你有没有测试过,序列化对象的性能开销和对象重新创建的性能开销,谁更大
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2970 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 07:04 · PVG 15:04 · LAX 23:04 · JFK 02:04
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.