V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
aoscici2000
V2EX  ›  Python

Celery + Rabbitmq 报错:接收删除未知信息?

  •  
  •   aoscici2000 · 2019-12-04 12:06:03 +08:00 · 2106 次点击
    这是一个创建于 783 天前的主题,其中的信息可能已经有所发展或是发生改变。

    /celerytask/myapp.py

    app = Celery('celerytask')
    app.config_from_object('celerytask.config')
    

    /celerytask/config.py

    broker_url = 'amqp://guest:[email protected]:5672//'
    result_backend = 'redis://localhost:6379/0'
    accept_content = ['json', 'application/text']
    
    task_queues = {
        'celery': {
            'exchange': 'celery',
            'routing_key': 'celery',
        },
    }
    
    task_routes = {
        'celerytask.task.hello': {
            'queue': 'celery',
            'routing_key': 'celery'
        },
    }
    
    imports = ['celerytask.task',]
    

    /celerytask/task.py

    @app.task
    def hello(msg='hello'):
        time.sleep(5)
        return msg.upper()
    

    serve.py

    class CallHandler(RequestHandler):
    
        def get(self):
            msg = self.get_argument('msg', 'default')
            hello.apply_async(args=(msg,))
            self.write('ok')
    
    class PushHandler(RequestHandler):
    
        def get(self):
            msg = self.get_argument('msg', 'default')
            self.application.publisher.publish(
                exchange='celery',
                routing_key='celery',
                body={
                    'id': str(uuid.uuid1()),
                    'args': [msg, ],
                    'task': 'celerytask.task.hello'
                }
            )
            self.write('ok')
    

    直接调用(CallHandler)一切正常, 如果是把消息推到队列(PushHandler), celery 就报:

    [ WARNING/MainProcess]Received and deleted unknown message. Wrong destination?!?

    The full contents of the message body was: body: '{"id": "6c50942e-1045-15ea-96c3-5251a82fa45f", "args": ["fsafifds"], "task": "celerytask.task.hello"}' (96b) {content_type:None content_encoding:None delivery_info:{'consumer_tag': 'None3', 'delivery_tag': 1, 'redelivered': False, 'exchange': 'celery', 'routing_key': 'celery'} headers={}}

    这里 task.hello()也并没有执行, 关于这个报错, 网上查了能查到的几个解决方法, 包括 celery 版本降到 3.1, 卸载 librabbitmq 改用 pyamqp, 改 task_protocol=1, 都试了一个遍, 貌似都没用, 不知道该怎么解决?

    另外有一点想知道的是直接调用任务函数和推送到消息队列有什么区别吗?

    1 条回复    2019-12-04 12:38:20 +08:00
    aoscici2000
        1
    aoscici2000  
    OP
       2019-12-04 12:38:20 +08:00
    已解决, content_type 没匹配上..... 折腾了两天偌大的提示都没留意到
    关于   ·   帮助文档   ·   API   ·   FAQ   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2577 人在线   最高记录 5497   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 13:36 · PVG 21:36 · LAX 05:36 · JFK 08:36
    ♥ Do have faith in what you're doing.