V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
1462326016
V2EX  ›  Python

用 Python 的 asyncio 库写了一个简单的 socket5 代理,莫名其妙的卡住,请大家帮忙看下!

  •  
  •   1462326016 · 2019-06-27 16:49:30 +08:00 · 3795 次点击
    这是一个创建于 2018 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我在 stackoverflow 提问了,暂时还没有人回复。网址在此。 https://stackoverflow.com/questions/56786505/when-i-created-a-socket5-proxy-server-using-asyncio-my-code-was-unexpectedly-bl

    # -*- coding: utf-8 -*-
    import asyncio
    from struct import unpack, pack
    
    
    async def handle_echo(reader, writer):
        data = await reader.read(1024 * 64)
        addr = writer.get_extra_info('peername')
        print(f"connect from {addr!r}")
        if len(data) < 3:
            writer.close()
            return
        result = unpack('!BBB', data[:3])
        writer.write(b'\x05\x00')
        await writer.drain()
        data = await reader.read(1024 * 64)
        result = unpack('!4B', data[:4])
        if result[0] == 5 and result[1] == 1 and result[3] == 3:
            host_len = unpack('!B', data[4:5])[0]
            host = data[5:host_len + 5].decode()
            port = unpack('!H', data[host_len + 5:])[0]
            print(f'len {host_len},host {host},port {port}')
            try:
                reader_remote, writer_remote = await asyncio.open_connection(host, port)
                writer.write(pack('!5B', 5, 0, 0, 3, host_len) + host.encode() + pack('!H', port))
                await writer.drain()
                print(f'connect success !{host}')
            except (TimeoutError, ConnectionRefusedError) as _:
                print(f'connect failed !{host}')
                writer.write(pack('!5B', 5, 3, 0, 3, host_len) + host.encode() + pack('!H', port))
                await writer.drain()
                writer.close()
                return
            while True:
                client_data = await reader.read(1024 * 64)
                print(f'{host} client->local {len(client_data)}')
                if not client_data:
                    writer_remote.close()
                    writer.close()
                    print(f'{host} disconnect')
                    return
                writer_remote.write(client_data)
                await writer_remote.drain()
                print(f'{host} local->remote !{len(client_data)}')
                remote_data = await reader_remote.read(1024 * 64)
                print(f'{host} remote->local! {len(remote_data)}')
                writer.write(remote_data)
                await writer.drain()
                print(f'{host} local->client! {len(remote_data)}')
    
    
    async def main():
        server = await asyncio.start_server(
            handle_echo, '0.0.0.0', 3333)
    
        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')
    
        async with server:
            await server.serve_forever()
    
    
    asyncio.run(main())
    

    如果一个网页只有一个 html 的话就没问题,例如:example.com 。但是像 baidu.com 这种的要加载好多 js 和 css 的就不行了,打印几行日志就卡住了,应该是卡在这里,client_data = await reader.read(1024 * 64),不知道为什么。系统 win10,python3.7.3。

    第 1 条附言  ·  2019-06-28 08:28:39 +08:00

    非常感谢大家的回复,第一次写asyncio的异步socket通信没什么经验,我尝试下全双工的写法

    18 条回复    2019-07-09 19:05:17 +08:00
    qizheshang
        1
    qizheshang  
       2019-06-27 17:32:53 +08:00
    asyncio.run(main()),你这个 run 从哪里来的?跑不通
    # -*- coding: utf-8 -*- python3 这个完全可以去掉了
    wwqgtxx
        2
    wwqgtxx  
       2019-06-27 17:41:03 +08:00 via iPhone
    @qizheshang py3.7 新加的函数
    lcdtyph
        3
    lcdtyph  
       2019-06-27 18:43:49 +08:00
    我觉得问题出在 handle_echo 的那个 while True 里,
    如果是远程先发送数据的话就死在 await reader.read 上了
    janxin
        4
    janxin  
       2019-06-27 18:45:09 +08:00
    @qizheshang 3.7 asyncio 加了很多新函数
    exch4nge
        5
    exch4nge  
       2019-06-27 18:53:33 +08:00
    如果资源大于 64k ( 1024*64 )的时候,一次性读不完所有内容,所以你仅仅把一部分请求内容转发,一端会继续等待,并不会发送数据
    XiaoxiaoPu
        6
    XiaoxiaoPu  
       2019-06-27 18:59:08 +08:00
    代理要全双工的,你的代码只做了一个方向的数据中转。从网上找了个例子,你可以参考下: github
    .com /msoedov/toxic_proxy/blob/master/toxic_proxy/app.py#L41
    2pang
        7
    2pang  
       2019-06-27 19:07:21 +08:00
    从 A 读发给 B 然后从 B 读发给 A 你不觉得这很容易死锁么
    假如应该从 B 读发给 A 的时候 你一直等待在从 A 读
    exch4nge
        8
    exch4nge  
       2019-06-27 19:13:03 +08:00
    @exch4nge #5 补充一下,你这个 while True 里面逻辑是严格有顺序的,就是先从 Client 读数据,然后发送到 Server,然后从 Server 等待回应,然后再把回应发到 Client。这种模式在 HTTP 1.1 却是没什么问题,是请求-响应模式。不过,资源大于 64k 的时候,你紧紧把部分资源回应发到 Client,Client 还会等着你发剩余的部分,不会向你这个 Proxy 发送数据,所以你会在那个语句卡住。
    1、 要么读写的时候读到数据干为止再转发,
    2、要么同时分别处理 C -> S 和 S -> C 两个方向的请求,比如用两个线程(或协程)一个负责从 Client 读数据转发到 Server,另一个负责从 Server 读数据转发到 Client
    exch4nge
        9
    exch4nge  
       2019-06-27 19:16:46 +08:00
    @exch4nge #8 却是 -> 确实, 紧紧 -> 仅仅;
    上面说的两种中,还是 2 好一些,难免遇到 server 卡住,发一部分不给你发剩余的,Proxy 又没有分析 http 响应包不知道包边界在哪。
    1462326016
        10
    1462326016  
    OP
       2019-06-28 08:14:40 +08:00
    @qizheshang 这个是 Python3.7 新添加的函数,跑不通的话可以用
    ```loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(main()))```
    这个
    1462326016
        11
    1462326016  
    OP
       2019-06-28 08:15:56 +08:00
    @lcdtyph 但是远程我在上面只是进行了 tcp 连接,没有其他操作,应该不会是远程先发数据
    1462326016
        12
    1462326016  
    OP
       2019-06-28 08:17:29 +08:00
    @exch4nge 数据大小的话可以看 stackoverflow 里边贴的日志,我打印了数据大小,没有超出。。我怀疑是不是没有全部发送出去,像之前的 sock.send 和 sock.sendall 的区别。
    1462326016
        13
    1462326016  
    OP
       2019-06-28 08:18:47 +08:00
    @exch4nge 好的好的, 非常感谢, 我尝试一下
    1462326016
        14
    1462326016  
    OP
       2019-06-28 08:19:46 +08:00
    @2pang 了解了,感谢提醒,我尝试下全双工的写法。
    1462326016
        15
    1462326016  
    OP
       2019-06-28 08:20:39 +08:00
    @XiaoxiaoPu 我参考一下,尝试下全双工的写法, 感谢
    1462326016
        16
    1462326016  
    OP
       2019-06-28 20:25:52 +08:00
    感谢大家的帮助,改成全双工已经没有问题了。
    ```
    # -*- coding: utf-8 -*-
    import asyncio
    from struct import unpack, pack


    async def handle_echo(reader, writer):
    data = await reader.read(1024 * 64)
    addr = writer.get_extra_info('peername')
    print(f"connect from {addr!r}")
    if len(data) < 3:
    print('too short...')
    writer.close()
    return
    result = unpack('!BBB', data[:3])
    writer.write(b'\x05\x00')
    await writer.drain()
    data = await reader.read(1024 * 64)
    result = unpack('!4B', data[:4])
    if result[0] == 5 and result[1] == 1 and result[3] == 3:
    host_len = unpack('!B', data[4:5])[0]
    host = data[5:host_len + 5].decode()
    port = unpack('!H', data[host_len + 5:])[0]
    print(f'len {host_len},host {host},port {port}')
    try:
    reader_remote, writer_remote = await asyncio.open_connection(host, port)
    writer.write(pack('!5B', 5, 0, 0, 3, host_len) + host.encode() + pack('!H', port))
    await writer.drain()
    print(f'connect success !{host}')
    except (TimeoutError, ConnectionRefusedError) as _:
    print(f'connect failed !{host}')
    writer.write(pack('!5B', 5, 3, 0, 3, host_len) + host.encode() + pack('!H', port))
    await writer.drain()
    writer.close()
    return
    up_stream = _pipe(reader, writer_remote, host)
    down_stream = _pipe(reader_remote, writer, host)
    await asyncio.gather(up_stream, down_stream)
    if result[0] == 5 and result[1] == 1 and result[3] == 1:
    ip = '.'.join([str(a) for a in unpack('!BBBB', data[4:8])])
    port = unpack('H', data[8:10])[0]
    print(f'ip {ip},port {port}')
    try:
    reader_remote, writer_remote = await asyncio.open_connection(ip, port)
    writer.write(pack('!8B', 5, 0, 0, 1, *unpack('!BBBB', data[4:8])) + pack('!H', port))
    await writer.drain()
    print(f'connect success !{ip}')
    except (TimeoutError, ConnectionRefusedError) as _:
    print(f'connect failed !{ip},{repr(_)}')
    writer.write(pack('!8B', 5, 3, 0, 1, *unpack('!BBBB', data[4:8])) + pack('!H', port))
    await writer.drain()
    writer.close()
    return
    up_stream = _pipe(reader, writer_remote, ip)
    down_stream = _pipe(reader_remote, writer, ip)
    await asyncio.gather(up_stream, down_stream)


    async def _pipe(reader, writer, host):
    while reader.at_eof:
    try:
    data = await reader.read(1024 * 64)
    if not data:
    writer.close()
    break
    except (ConnectionAbortedError, ConnectionResetError) as _:
    writer.close()
    print(f'{host} 异常退出 {repr(_)}')
    break
    try:
    writer.write(data)
    await writer.drain()
    except (ConnectionAbortedError, ConnectionResetError) as _:
    writer.close()
    print(f'{host} 异常退出 {repr(_)}')
    break
    print(f'{host} 退出')


    async def main():
    server = await asyncio.start_server(
    handle_echo, '0.0.0.0', 3333)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
    await server.serve_forever()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(main()))

    ```
    代码在此
    1462326016
        17
    1462326016  
    OP
       2019-06-28 20:26:46 +08:00
    啊,格式乱了,不知道怎么格式化成 markdown。。。无语。
    qizheshang
        18
    qizheshang  
       2019-07-09 19:05:17 +08:00
    @janxin 好吧 我一直用的是 3.6
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   983 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 22:54 · PVG 06:54 · LAX 14:54 · JFK 17:54
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.