源码: Lib/asyncio/streams.py


流是用于处理网络连接的支持 async/await 的高层级原语。 流允许发送和接收数据,而不需要使用回调或低级协议和传输。

下面是一个使用 asyncio streams 编写的 TCP echo 客户端示例:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

参见下面的 Examples 部分。

Stream 函数

下面的高级 asyncio 函数可以用来创建和处理流:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

建立网络连接并返回一对 (reader, writer) 对象。

返回的 readerwriter 对象是 StreamReaderStreamWriter 类的实例。

limit 确定返回的 StreamReader 实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。

其余的参数直接传递到 loop.create_connection()

备注

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

在 3.7 版更改: 添加了 ssl_handshake_timeout 形参。

3.8 新版功能: 增加了 happy_eyeballs_delayinterleave 形参。

在 3.10 版更改: 移除了 loop 形参。

在 3.11 版更改: Added the ssl_shutdown_timeout parameter.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

启动套接字服务。

当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。

client_connected_cb 即可以是普通的可调用对象也可以是一个 协程函数; 如果它是一个协程函数,它将自动作为 Task 被调度。

limit 确定返回的 StreamReader 实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。

余下的参数将会直接传递给 loop.create_server().

备注

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's close() method.

在 3.7 版更改: 增加了 ssl_handshake_timeoutstart_serving 形参。

在 3.10 版更改: 移除了 loop 形参。

在 3.11 版更改: Added the ssl_shutdown_timeout parameter.

Unix 套接字

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。

open_connection() 相似,但是是在 Unix 套接字上的操作。

请看文档 loop.create_unix_connection().

备注

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

可用性: Unix。

在 3.7 版更改: 增加了 ssl_handshake_timeout 形参。 现在 path 形参可以是一个 path-like object

在 3.10 版更改: 移除了 loop 形参。

在 3.11 版更改: Added the ssl_shutdown_timeout parameter.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

启动一个 Unix 套接字服务。

start_server() 相似,但是是在 Unix 套接字上的操作。

请看文档 loop.create_unix_server().

备注

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's close() method.

可用性: Unix。

在 3.7 版更改: 增加了 ssl_handshake_timeoutstart_serving 形参。 现在 path 形参可以是一个 path-like object

在 3.10 版更改: 移除了 loop 形参。

在 3.11 版更改: Added the ssl_shutdown_timeout parameter.

StreamReader

class asyncio.StreamReader

Represents a reader object that provides APIs to read data from the IO stream. As an asynchronous iterable, the object supports the async for statement.

不推荐直接实例化 StreamReader 对象,建议使用 open_connection()start_server() 来获取 StreamReader 实例。

feed_eof()

Acknowledge the EOF.

coroutine read(n=- 1)

Read up to n bytes from the stream.

If n is not provided or set to -1, read until EOF, then return all read bytes. If EOF was received and the internal buffer is empty, return an empty bytes object.

If n is 0, return an empty bytes object immediately.

If n is positive, return at most n available bytes as soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an empty bytes object.

coroutine readline()

读取一行,其中“行”指的是以 \n 结尾的字节序列。

如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

coroutine readexactly(n)

精确读取 n 个 bytes,不会超过也不能少于。

如果在读取完 n 个byte之前读取到EOF,则会引发 IncompleteReadError 异常。使用 IncompleteReadError.partial 属性来获取到达流结束之前读取的 bytes 字符串。

coroutine readuntil(separator=b'\n')

从流中读取数据直至遇到 separator

成功后,数据和指定的separator将从内部缓冲区中删除(或者说被消费掉)。返回的数据将包括在末尾的指定separator。

如果读取的数据量超过了配置的流限制,将引发 LimitOverrunError 异常,数据将留在内部缓冲区中并可以再次读取。

如果在找到完整的separator之前到达EOF,则会引发 IncompleteReadError 异常,并重置内部缓冲区。 IncompleteReadError.partial 属性可能包含指定separator的一部分。

3.5.2 新版功能.

at_eof()

如果缓冲区为空并且 feed_eof() 被调用,则返回 True

StreamWriter

class asyncio.StreamWriter

这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。

不建议直接实例化 StreamWriter;而应改用 open_connection()start_server()

write(data)

此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

stream.write(data)
await stream.drain()
writelines(data)

此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

stream.writelines(lines)
await stream.drain()
close()

此方法会关闭流以及下层的套接字。

The method should be used, though not mandatory, along with the wait_closed() method:

stream.close()
await stream.wait_closed()
can_write_eof()

如果下层的传输支持 write_eof() 方法则返回``True``,否则返回 False

write_eof()

在已缓冲的写入数据被刷新后关闭流的写入端。

transport

返回下层的 asyncio 传输。

get_extra_info(name, default=None)

访问可选的传输信息;详情参见 BaseTransport.get_extra_info()

coroutine drain()

等待直到可以适当地恢复写入到流。 示例:

writer.write(data)
await writer.drain()

这是一个与下层的 IO 写缓冲区进行交互的流程控制方法。 当缓冲区大小达到最高水位(最大上限)时,drain() 会阻塞直到缓冲区大小减少至最低水位以便恢复写入。 当没有要等待的数据时,drain() 会立即返回。

coroutine start_tls(sslcontext, \*, server_hostname=None, ssl_handshake_timeout=None)

Upgrade an existing stream-based connection to TLS.

Parameters:

  • sslcontext: a configured instance of SSLContext.

  • server_hostname: sets or overrides the host name that the target server's certificate will be matched against.

  • ssl_handshake_timeout is the time in seconds to wait for the TLS handshake to complete before aborting the connection. 60.0 seconds if None (default).

3.11 新版功能.

is_closing()

如果流已被关闭或正在被关闭则返回 True

3.7 新版功能.

coroutine wait_closed()

等待直到流被关闭。

Should be called after close() to wait until the underlying connection is closed, ensuring that all data has been flushed before e.g. exiting the program.

3.7 新版功能.

例子

使用流的 TCP 回显客户端

使用 asyncio.open_connection() 函数的 TCP 回显客户端:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

参见

使用低层级 loop.create_connection() 方法的 TCP 回显客户端协议 示例。

使用流的 TCP 回显服务器

TCP 回显服务器使用 asyncio.start_server() 函数:

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

参见

使用 loop.create_server() 方法的 TCP 回显服务器协议 示例。

获取 HTTP 标头

查询命令行传入 URL 的 HTTP 标头的简单示例:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

用法:

python example.py http://example.com/path/page.html

或使用 HTTPS:

python example.py https://example.com/path/page.html

注册一个打开的套接字以等待使用流的数据

使用 open_connection() 函数实现等待直到套接字接收到数据的协程:

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

参见

使用低层级协议以及 loop.create_connection() 方法的 注册一个打开的套接字以等待使用协议的数据 示例。

使用低层级的 loop.add_reader() 方法来监视文件描述符的 监视文件描述符以读取事件 示例。