V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
fzzff
V2EX  ›  Python

django+ celery 的 gevent 模式使用 sync_to_async 时报错, 请问有什么好的处理方案吗?

  •  
  •   fzzff · 2022-11-02 12:04:06 +08:00 · 2038 次点击
    这是一个创建于 780 天前的主题,其中的信息可能已经有所发展或是发生改变。

    复现方式:

    使用 gevent 模式运行 celery

     celery -A config.celery_app worker -P gevent -l INFO
    

    celery 任务

    from asgiref.sync import sync_to_async
    
    
    async def _test_async():
        print(await sync_to_async(User.objects.get)(pk=1))
    
    
    @shared_task(ignore_result=True)
    def test_async():
        asyncio.run(_test_async())
    

    在 django 中运行调用异步任务将得到以下异常:

    [2022-11-02 11:58:12,883: ERROR/MainProcess] Task apps.dark_chain.tasks.test_async[fbf4f896-0020-4273-90c2-8a4ef8f4cc46] raised unexpected: SynchronousOnlyOperation('You cannot call this from an async context - use a thread or sync_to_async.')
    Traceback (most recent call last):
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/celery/app/trace.py", line 451, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/celery/app/trace.py", line 734, in __protected_call__
        return self.run(*args, **kwargs)
      File "/Users/xxxxx/Desktop/work_shuan/web/YingYan/apps/dark_chain/tasks.py", line 179, in test_async
        asyncio.run(_test_async())
      File "/Users/xxxxx/.pyenv/versions/3.9.7/lib/python3.9/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/Users/xxxxx/.pyenv/versions/3.9.7/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
        return future.result()
      File "/Users/xxxxx/Desktop/work_shuan/web/YingYan/apps/dark_chain/tasks.py", line 174, in _test_async
        print(await sync_to_async(User.objects.get)(pk=1))
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/asgiref/sync.py", line 435, in __call__
        ret = await asyncio.wait_for(future, timeout=None)
      File "/Users/xxxxx/.pyenv/versions/3.9.7/lib/python3.9/asyncio/tasks.py", line 442, in wait_for
        return await fut
      File "/Users/xxxxx/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/thread.py", line 52, in run
        result = self.fn(*self.args, **self.kwargs)
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/asgiref/sync.py", line 476, in thread_handler
        return func(*args, **kwargs)
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        num = len(clone)
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/django/db/models/query.py", line 262, in __len__
        self._fetch_all()
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/django/db/models/query.py", line 1354, in _fetch_all
        self._result_cache = list(self._iterable_class(self))
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/django/db/models/query.py", line 51, in __iter__
        results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/django/db/models/sql/compiler.py", line 1200, in execute_sql
        cursor = self.connection.cursor()
      File "/Users/xxxxx/.pyenv/versions/3.9.7/envs/yingyan3/lib/python3.9/site-packages/django/utils/asyncio.py", line 23, in inner
        raise SynchronousOnlyOperation(message)
    django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
    
    
    2 条回复    2022-11-09 09:50:17 +08:00
    veoco
        1
    veoco  
       2022-11-04 13:19:37 +08:00
    User.objects.get 要放到一个单独的同步函数里,再用 sync_to_async 运行这个同步函数。不过你都用 gevent 了,不用写成异步了吧?
    fzzff
        2
    fzzff  
    OP
       2022-11-09 09:50:17 +08:00
    @veoco 感谢回复, 我测试了一下你说的这种方式, 还是报相同的错误, 因为是把另一个项目的用 asyncio 实现的异步爬虫代码移植到一个 django 项目作为 celery 任务, django 项目的 celery 启动用是用的 gevent 模式所以触发了这个错误, 目前已经解决, 把数据库写入部分改成了使用 aiomysql 操作
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2516 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 03:28 · PVG 11:28 · LAX 19:28 · JFK 22:28
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.