V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
noqwerty
V2EX  ›  Python

尝试用 async / await 下载文件失败,求帮助

  •  
  •   noqwerty · 2018-01-15 07:42:29 +08:00 · 5231 次点击
    这是一个创建于 2547 天前的主题,其中的信息可能已经有所发展或是发生改变。

    目前需要从一个 FTP 服务器下载 3 万多个小文件,之前用 multiprocessing 总是下一部分之后就停了,所以尝试用异步加快下载:

    class pdb:
        def __init__(self):
            self.ids = []
            self.dl_id = []
            self.err_id = []
    
        async def download_file(self, session, url):
            try:
                with async_timeout.timeout(10):
                    async with session.get(url) as remotefile:
                        if remotefile.status == 200:
                            data = await remotefile.read()
                            return {"error": "", "data": data}
                        else:
                            return {"error": remotefile.status, "data": ""}
            except Exception as e:
                return {"error": e, "data": ""}
    
        async def unzip(self, session, work_queue):
            while not work_queue.empty():
                queue_url = await work_queue.get()
                print(queue_url)
                data = await self.download_file(session, queue_url)
                id = queue_url[-11:-7]
                ID = id.upper()
                if not data["error"]:
                    saved_pdb = os.path.join("./pdb", ID, f'{ID}.pdb')
                    if ID not in self.dl_id:
                        self.dl_id.append(ID)
                    with open(f"{id}.ent.gz", 'wb') as f:
                        f.write(data["data"].read())
                    with gzip.open(f"{id}.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile:
                        shutil.copyfileobj(inFile, outFile)
                    os.remove(f"{id}.ent.gz")
                else:
                    self.err_id.append(ID)
    
        def download_queue(self, urls):
            loop = asyncio.get_event_loop()
            q = asyncio.Queue(loop=loop)
            [q.put_nowait(url) for url in urls]
            con = aiohttp.TCPConnector(limit=10)
            with aiohttp.ClientSession(loop=loop, connector=con) as session:
                tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))]
                loop.run_until_complete(asyncio.gather(*tasks))
            loop.close()
    
    
    if __name__ == "__main__":
        x = pdb()
        urls = ['ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/nf/pdb4nfn.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ny/pdb4nyj.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/mn/pdb2mnz.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ra/pdb4ra4.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/x5/pdb4x5w.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/dm/pdb2dmq.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/n7/pdb2n7r.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/om/pdb2omv.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/oy/pdb3oy8.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/fe/pdb3fej.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/hw/pdb2hw9.ent.gz']
        x.download_queue(urls)
    

    报错信息如下:

    Traceback (most recent call last):
    File "test.py", line 111, in <module>
    x.download_queue(urls)
    File "test.py", line 99, in download_queue
    loop.run_until_complete(asyncio.gather(*tasks))
    File "/home/yz/miniconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
    return future.result()
    File "test.py", line 73, in unzip
    data = await self.download_file(session, queue_url)
    File "test.py", line 65, in download_file
    return {"error": remotefile.status, "data": ""}
    File "/home/yz/miniconda3/lib/python3.6/site-packages/async_timeout/init.py", line 46, in exit
    raise asyncio.TimeoutError from None
    concurrent.futures._base.TimeoutError

    请大家帮忙看看。谢谢!

    第 1 条附言  ·  2018-01-16 13:11:22 +08:00

    发现了很蠢的一个问题……aiohttp似乎并不支持解析ftp链接,我在urls里面随机插了一些正常链接就跑得动了……还是感谢大家的帮忙!

    24 条回复    2018-06-23 12:12:36 +08:00
    XiaoFaye
        1
    XiaoFaye  
       2018-01-15 07:46:30 +08:00 via Android   ❤️ 1
    lftp 开多线程 10 万文件我也下载过,一点问题没有啊!除非服务器限制吧。
    noqwerty
        2
    noqwerty  
    OP
       2018-01-15 07:47:14 +08:00
    @XiaoFaye #1 哈哈我知道,肯定是我代码的问题不是人家服务器的问题
    hareandlion
        3
    hareandlion  
       2018-01-15 07:57:52 +08:00 via iPhone   ❤️ 1
    timeout 时间设置的太短了?
    noqwerty
        4
    noqwerty  
    OP
       2018-01-15 07:58:43 +08:00
    @hareandlion #3 我试过把那行删了,就会一直卡在那,感觉还是其他地方有问题。
    hareandlion
        5
    hareandlion  
       2018-01-15 08:03:20 +08:00 via iPhone   ❤️ 1
    加个 print 看看是不是哪个 url 无效,卡住了
    noqwerty
        6
    noqwerty  
    OP
       2018-01-15 08:39:58 +08:00
    @hareandlion #5 已经有 print 了呀,会直接把所有链接都打印出来然后卡住……
    shoaly
        7
    shoaly  
       2018-01-15 09:47:22 +08:00   ❤️ 2
    老老实实用 python 做一个下载链接的清单, 然后用 aria2c 下载吧... 省出来的时间都是你的
    ipwx
        8
    ipwx  
       2018-01-15 09:57:53 +08:00   ❤️ 1
    unzip 不要用 async,CPU 密集型。
    bramblex
        9
    bramblex  
       2018-01-15 10:05:47 +08:00 via iPhone
    Python 可以真多线程了吗?
    noqwerty
        10
    noqwerty  
    OP
       2018-01-15 10:13:35 +08:00
    @shoaly #7 以后可能会了……但是这个就是练手的项目,想弄明白异步到底该怎么写。
    noqwerty
        11
    noqwerty  
    OP
       2018-01-15 10:14:20 +08:00
    @ipwx #8 没太明白,unzip 里面只有取数据那一行用了 await,其他步骤都没有,这样也不可以吗?谢谢帮忙!
    noqwerty
        12
    noqwerty  
    OP
       2018-01-15 10:15:45 +08:00
    @bramblex #9 还是有 GIL 的,估计多线程是有生之年系列了……不过现在多核越来越不值钱,多线程意义也没那么大了吧。
    Miksztowi
        13
    Miksztowi  
       2018-01-15 11:20:33 +08:00   ❤️ 1
    是客户端连接出错了把。从队列中取出 url 后会打印,发生了异常会继续拿,这样的话,如果请求有问题,应该是直接打印所有的 url 后结束?
    ipwx
        14
    ipwx  
       2018-01-15 11:27:53 +08:00   ❤️ 1
    @noqwerty 你的问题在于:

    with open(f"{id}.ent.gz", 'wb') as f:
    ....f.write(data["data"].read())
    with gzip.open(f"{id}.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile:
    ....shutil.copyfileobj(inFile, outFile)
    os.remove(f"{id}.ent.gz")

    这几行是没法被 asyncio 通过 Coroutine 并行化的,只能多线程。但是这就产生了两个问题,第一默认的 asyncio 不是多线程并行化的,第二即使设置 asyncio 多线程并行化,考虑到 GIL,Python 多线程也是不够用的。所以总体来说,asyncio 对你这段程序是不够的。还是得上多进程。
    ipwx
        15
    ipwx  
       2018-01-15 11:29:14 +08:00   ❤️ 1
    @noqwerty asyncio 主要针对网络通讯的并行化,用的是非阻塞模型。关键词可以搜索 select, epoll,了解更多非阻塞模型的事情。
    Miksztowi
        16
    Miksztowi  
       2018-01-15 11:43:44 +08:00   ❤️ 1
    @ipwx GIL 在文件 I/O 时不是会释放吗?
    支持 async 的文件 I/O 的有:
    1.aiofiles: https://github.com/Tinche/aiofiles
    2.asyncio 中有 thread pool executor. run_in_executor()也可以处理文件 I/O.
    如果还有别的方法,欢迎补充 :)

    你说 unzip 是 cpu 密集型,那这跟 GIL 有啥关系? 还是要上多进程。
    ivechan
        17
    ivechan  
       2018-01-15 12:12:46 +08:00   ❤️ 1
    @Miksztowi GIL 的存在会使得 Python 里的多线程对 CPU 密集型 程序优化作用有限,而多进程就可以避免这个缺点。
    ivechan
        18
    ivechan  
       2018-01-15 12:23:03 +08:00   ❤️ 1
    @ipwx select, epoll 应该只是 I/O 复用,但其实还是属于阻塞模型吧?只不过是在 select 的时候阻塞,而不是在真正的 IO 调用上。
    ipwx
        19
    ipwx  
       2018-01-15 15:51:42 +08:00   ❤️ 2
    @Miksztowi GIL 的存在导致文件读取的每个原子操作,线程切换的开销增大。而 shutil.copyfileobj,那是个 Python 循环,所以是不可能高效的。aiofiles 那东西在很多平台上面是多线程实现的,你可以 check 一下它的源代码。

    基于这个原因,可以认为楼主的程序上多线程没救,所以 asyncio 就没有救(如果多线程有救,asyncio 还是可以用的)。而因为多线程无法使用,CPU 密集型的 unzip 就没法被 asyncio 搞定。这才导出了我的结论,unzip 是 cpu 密集型,不适合 asyncio。

    至于 select、epoll 这类 I/O 复用,我觉得可以认为它们是非阻塞模型,因为它们避免了多线程模式下的 while { read } 县线程等待,和 callback 效果等同。我觉得并不一定 callback 才可以被认为是非阻塞,只要看是否达到同样的效果就可以了。
    noqwerty
        20
    noqwerty  
    OP
       2018-01-16 00:02:34 +08:00
    @ipwx #19 那请问多进程和 asyncio 可以结合起来使用吗?之前用多进程写的也总是跑到一半就自己停了,也不报错。多谢帮助!
    ipwx
        21
    ipwx  
       2018-01-16 09:45:23 +08:00   ❤️ 1
    @noqwerty 我记得 asyncio 有方法用多进程,不过现在的 api 都很基础很难用。所以你这需求用 python 其实挺麻烦的。
    linw1995
        22
    linw1995  
       2018-01-17 22:17:08 +08:00   ❤️ 1
    把 unzip,和 writefile 写成一个普通函数,用`concurrent.futures.ProcessPoolExecutor`和`loop.run_in_executor`函数运行,这样就可以结合起来

    await loop.run_in_executor(PPExecutor, func, args)

    https://pymotw.com/3/asyncio/executors.html
    alexred
        23
    alexred  
       2018-03-20 20:38:51 +08:00
    为什么我跑你的代码会报
    TypeError: Use async with instead
    的错
    ssikiki
        24
    ssikiki  
       2018-06-23 12:12:36 +08:00
    装 aiohttp 2.3.0 版本, 解决 TypeError: Use async with instead
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   921 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 21:36 · PVG 05:36 · LAX 13:36 · JFK 16:36
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.