mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-07 12:44:28 +08:00
Add missing methods to asyncio stubs (#3088)
This commit is contained in:
committed by
Sebastian Rittau
parent
d05a9d3d83
commit
29771aa1ee
@@ -1,8 +1,8 @@
|
||||
import selectors
|
||||
from socket import socket, _Address
|
||||
from socket import socket, _Address, _RetAddress
|
||||
import ssl
|
||||
import sys
|
||||
from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Sequence, Tuple, TypeVar, Union, overload
|
||||
from typing import Any, Awaitable, Callable, Dict, Generator, IO, List, Optional, Sequence, Tuple, TypeVar, Union, overload
|
||||
from asyncio.futures import Future
|
||||
from asyncio.coroutines import coroutine
|
||||
from asyncio.events import AbstractEventLoop, AbstractServer, Handle, TimerHandle
|
||||
@@ -59,37 +59,76 @@ class BaseEventLoop(AbstractEventLoop):
|
||||
flags: int = ...) -> Generator[Any, None, List[Tuple[int, int, int, str, Tuple[Any, ...]]]]: ...
|
||||
@coroutine
|
||||
def getnameinfo(self, sockaddr: tuple, flags: int = ...) -> Generator[Any, None, Tuple[str, int]]: ...
|
||||
@overload
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory: _ProtocolFactory, host: str = ..., port: int = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: None = ...,
|
||||
local_addr: Optional[str] = ..., server_hostname: Optional[str] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@overload
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: socket,
|
||||
local_addr: None = ..., server_hostname: Optional[str] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@overload
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory: _ProtocolFactory, host: Optional[Union[str, Sequence[str]]] = ..., port: int = ..., *,
|
||||
family: int = ..., flags: int = ...,
|
||||
sock: None = ..., backlog: int = ..., ssl: _SSLContext = ...,
|
||||
reuse_address: Optional[bool] = ...,
|
||||
reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@overload
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
family: int = ..., flags: int = ...,
|
||||
sock: socket, backlog: int = ..., ssl: _SSLContext = ...,
|
||||
reuse_address: Optional[bool] = ...,
|
||||
reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@coroutine
|
||||
def create_unix_connection(self, protocol_factory: _ProtocolFactory, path: str, *,
|
||||
ssl: _SSLContext = ..., sock: Optional[socket] = ...,
|
||||
server_hostname: str = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@coroutine
|
||||
def create_unix_server(self, protocol_factory: _ProtocolFactory, path: str, *,
|
||||
sock: Optional[socket] = ..., backlog: int = ..., ssl: _SSLContext = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
if sys.version_info >= (3, 7):
|
||||
async def sock_sendfile(self, sock: socket, file: IO[bytes], offset: int = ..., count: Optional[int] = ..., *,
|
||||
fallback: bool = ...) -> int: ...
|
||||
@overload
|
||||
async def create_connection(self, protocol_factory: _ProtocolFactory, host: str = ..., port: int = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ...,
|
||||
sock: None = ..., local_addr: Optional[str] = ..., server_hostname: Optional[str] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ...) -> _TransProtPair: ...
|
||||
@overload
|
||||
async def create_connection(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ...,
|
||||
sock: socket, local_addr: None = ..., server_hostname: Optional[str] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ...) -> _TransProtPair: ...
|
||||
@overload
|
||||
async def create_server(self, protocol_factory: _ProtocolFactory, host: Optional[Union[str, Sequence[str]]] = ...,
|
||||
port: int = ..., *, family: int = ..., flags: int = ..., sock: None = ..., backlog: int = ...,
|
||||
ssl: _SSLContext = ..., reuse_address: Optional[bool] = ..., reuse_port: Optional[bool] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ..., start_serving: bool = ...) -> AbstractServer: ...
|
||||
@overload
|
||||
async def create_server(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
family: int = ..., flags: int = ..., sock: socket = ..., backlog: int = ...,
|
||||
ssl: _SSLContext = ..., reuse_address: Optional[bool] = ..., reuse_port: Optional[bool] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ..., start_serving: bool = ...) -> AbstractServer: ...
|
||||
async def create_unix_connection(self, protocol_factory: _ProtocolFactory, path: str, *, ssl: _SSLContext = ...,
|
||||
sock: Optional[socket] = ..., server_hostname: str = ...,
|
||||
ssl_handshake_timeout: Optional[float]) -> _TransProtPair: ...
|
||||
async def create_unix_server(self, protocol_factory: _ProtocolFactory, path: str, *, sock: Optional[socket] = ...,
|
||||
backlog: int = ..., ssl: _SSLContext = ..., ssl_handshake_timeout: Optional[float] = ...,
|
||||
start_serving: bool = ...) -> AbstractServer: ...
|
||||
async def connect_accepted_socket(self, protocol_factory: _ProtocolFactory, sock: socket, *, ssl: _SSLContext = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ...) -> _TransProtPair: ...
|
||||
async def sendfile(self, transport: BaseTransport, file: IO[bytes], offset: int = ..., count: Optional[int] = ..., *,
|
||||
fallback: bool = ...) -> int: ...
|
||||
async def start_tls(self, transport: BaseTransport, protocol: BaseProtocol, sslcontext: ssl.SSLContext, *,
|
||||
server_side: bool = ..., server_hostname: Optional[str] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ...) -> BaseTransport: ...
|
||||
else:
|
||||
@overload
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory: _ProtocolFactory, host: str = ..., port: int = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: None = ...,
|
||||
local_addr: Optional[str] = ..., server_hostname: Optional[str] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@overload
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: socket,
|
||||
local_addr: None = ..., server_hostname: Optional[str] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@overload
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory: _ProtocolFactory, host: Optional[Union[str, Sequence[str]]] = ..., port: int = ..., *,
|
||||
family: int = ..., flags: int = ...,
|
||||
sock: None = ..., backlog: int = ..., ssl: _SSLContext = ...,
|
||||
reuse_address: Optional[bool] = ...,
|
||||
reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@overload
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
family: int = ..., flags: int = ...,
|
||||
sock: socket, backlog: int = ..., ssl: _SSLContext = ...,
|
||||
reuse_address: Optional[bool] = ...,
|
||||
reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@coroutine
|
||||
def create_unix_connection(self, protocol_factory: _ProtocolFactory, path: str, *,
|
||||
ssl: _SSLContext = ..., sock: Optional[socket] = ...,
|
||||
server_hostname: str = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@coroutine
|
||||
def create_unix_server(self, protocol_factory: _ProtocolFactory, path: str, *,
|
||||
sock: Optional[socket] = ..., backlog: int = ..., ssl: _SSLContext = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@coroutine
|
||||
def connect_accepted_socket(self, protocol_factory: _ProtocolFactory, sock: socket, *, ssl: _SSLContext = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@coroutine
|
||||
def create_datagram_endpoint(self, protocol_factory: _ProtocolFactory,
|
||||
local_addr: Optional[Tuple[str, int]] = ..., remote_addr: Optional[Tuple[str, int]] = ..., *,
|
||||
@@ -97,8 +136,6 @@ class BaseEventLoop(AbstractEventLoop):
|
||||
reuse_address: Optional[bool] = ..., reuse_port: Optional[bool] = ...,
|
||||
allow_broadcast: Optional[bool] = ...,
|
||||
sock: Optional[socket] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@coroutine
|
||||
def connect_accepted_socket(self, protocol_factory: _ProtocolFactory, sock: socket, *, ssl: _SSLContext = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
# Pipes and subprocesses.
|
||||
@coroutine
|
||||
def connect_read_pipe(self, protocol_factory: _ProtocolFactory, pipe: Any) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@@ -116,15 +153,18 @@ class BaseEventLoop(AbstractEventLoop):
|
||||
def remove_reader(self, fd: selectors._FileObject) -> None: ...
|
||||
def add_writer(self, fd: selectors._FileObject, callback: Callable[..., Any], *args: Any) -> None: ...
|
||||
def remove_writer(self, fd: selectors._FileObject) -> None: ...
|
||||
# Completion based I/O methods returning Futures.
|
||||
@coroutine
|
||||
def sock_recv(self, sock: socket, nbytes: int) -> Generator[Any, None, bytes]: ...
|
||||
@coroutine
|
||||
def sock_sendall(self, sock: socket, data: bytes) -> Generator[Any, None, None]: ...
|
||||
@coroutine
|
||||
def sock_connect(self, sock: socket, address: _Address) -> Generator[Any, None, None]: ...
|
||||
@coroutine
|
||||
def sock_accept(self, sock: socket) -> Generator[Any, None, Tuple[socket, Any]]: ...
|
||||
# Completion based I/O methods returning Futures prior to 3.7
|
||||
if sys.version_info >= (3, 7):
|
||||
async def sock_recv(self, sock: socket, nbytes: int) -> bytes: ...
|
||||
async def sock_recv_into(self, sock: socket, buf: bytearray) -> int: ...
|
||||
async def sock_sendall(self, sock: socket, data: bytes) -> None: ...
|
||||
async def sock_connect(self, sock: socket, address: _Address) -> None: ...
|
||||
async def sock_accept(self, sock: socket) -> Tuple[socket, _RetAddress]: ...
|
||||
else:
|
||||
def sock_recv(self, sock: socket, nbytes: int) -> Future[bytes]: ...
|
||||
def sock_sendall(self, sock: socket, data: bytes) -> Future[None]: ...
|
||||
def sock_connect(self, sock: socket, address: _Address) -> Future[None]: ...
|
||||
def sock_accept(self, sock: socket) -> Future[Tuple[socket, _RetAddress]]: ...
|
||||
# Signal handling.
|
||||
def add_signal_handler(self, sig: int, callback: Callable[..., Any], *args: Any) -> None: ...
|
||||
def remove_signal_handler(self, sig: int) -> None: ...
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import selectors
|
||||
from socket import socket, _Address
|
||||
from socket import socket, _Address, _RetAddress
|
||||
import ssl
|
||||
import sys
|
||||
from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Sequence, Tuple, TypeVar, Union, overload
|
||||
from typing import Any, Awaitable, Callable, Dict, Generator, IO, List, Optional, Sequence, Tuple, TypeVar, Union, overload
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from asyncio.futures import Future
|
||||
from asyncio.coroutines import coroutine
|
||||
@@ -26,6 +26,7 @@ class Handle:
|
||||
def __repr__(self) -> str: ...
|
||||
def cancel(self) -> None: ...
|
||||
def _run(self) -> None: ...
|
||||
def cancelled(self) -> bool: ...
|
||||
|
||||
class TimerHandle(Handle):
|
||||
def __init__(self, when: float, callback: Callable[..., Any], args: List[Any],
|
||||
@@ -35,6 +36,11 @@ class TimerHandle(Handle):
|
||||
class AbstractServer:
|
||||
sockets: Optional[List[socket]]
|
||||
def close(self) -> None: ...
|
||||
if sys.version_info >= (3, 7):
|
||||
def get_loop(self) -> AbstractEventLoop: ...
|
||||
def is_serving(self) -> bool: ...
|
||||
async def start_serving(self) -> None: ...
|
||||
async def serve_forever(self) -> None: ...
|
||||
@coroutine
|
||||
def wait_closed(self) -> Generator[Any, None, None]: ...
|
||||
|
||||
@@ -102,43 +108,93 @@ class AbstractEventLoop(metaclass=ABCMeta):
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def getnameinfo(self, sockaddr: tuple, flags: int = ...) -> Generator[Any, None, Tuple[str, int]]: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory: _ProtocolFactory, host: str = ..., port: int = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: None = ...,
|
||||
local_addr: Optional[str] = ..., server_hostname: Optional[str] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: socket,
|
||||
local_addr: None = ..., server_hostname: Optional[str] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory: _ProtocolFactory, host: Optional[Union[str, Sequence[str]]] = ..., port: int = ..., *,
|
||||
family: int = ..., flags: int = ...,
|
||||
sock: None = ..., backlog: int = ..., ssl: _SSLContext = ...,
|
||||
reuse_address: Optional[bool] = ...,
|
||||
reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
family: int = ..., flags: int = ...,
|
||||
sock: socket, backlog: int = ..., ssl: _SSLContext = ...,
|
||||
reuse_address: Optional[bool] = ...,
|
||||
reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_unix_connection(self, protocol_factory: _ProtocolFactory, path: str, *,
|
||||
ssl: _SSLContext = ..., sock: Optional[socket] = ...,
|
||||
server_hostname: str = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_unix_server(self, protocol_factory: _ProtocolFactory, path: str, *,
|
||||
sock: Optional[socket] = ..., backlog: int = ..., ssl: _SSLContext = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
if sys.version_info >= (3, 7):
|
||||
@abstractmethod
|
||||
async def sock_sendfile(self, sock: socket, file: IO[bytes], offset: int = ..., count: Optional[int] = ..., *,
|
||||
fallback: bool = ...) -> int: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
async def create_connection(self, protocol_factory: _ProtocolFactory, host: str = ..., port: int = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ...,
|
||||
sock: None = ..., local_addr: Optional[str] = ..., server_hostname: Optional[str] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ...) -> _TransProtPair: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
async def create_connection(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ...,
|
||||
sock: socket, local_addr: None = ..., server_hostname: Optional[str] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ...) -> _TransProtPair: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
async def create_server(self, protocol_factory: _ProtocolFactory, host: Optional[Union[str, Sequence[str]]] = ...,
|
||||
port: int = ..., *, family: int = ..., flags: int = ..., sock: None = ..., backlog: int = ...,
|
||||
ssl: _SSLContext = ..., reuse_address: Optional[bool] = ..., reuse_port: Optional[bool] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ..., start_serving: bool = ...) -> AbstractServer: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
async def create_server(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
family: int = ..., flags: int = ..., sock: socket = ..., backlog: int = ...,
|
||||
ssl: _SSLContext = ..., reuse_address: Optional[bool] = ..., reuse_port: Optional[bool] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ..., start_serving: bool = ...) -> AbstractServer: ...
|
||||
@abstractmethod
|
||||
async def create_unix_connection(self, protocol_factory: _ProtocolFactory, path: str, *, ssl: _SSLContext = ...,
|
||||
sock: Optional[socket] = ..., server_hostname: str = ...,
|
||||
ssl_handshake_timeout: Optional[float]) -> _TransProtPair: ...
|
||||
@abstractmethod
|
||||
async def create_unix_server(self, protocol_factory: _ProtocolFactory, path: str, *, sock: Optional[socket] = ...,
|
||||
backlog: int = ..., ssl: _SSLContext = ..., ssl_handshake_timeout: Optional[float] = ...,
|
||||
start_serving: bool = ...) -> AbstractServer: ...
|
||||
@abstractmethod
|
||||
async def connect_accepted_socket(self, protocol_factory: _ProtocolFactory, sock: socket, *, ssl: _SSLContext = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ...) -> _TransProtPair: ...
|
||||
@abstractmethod
|
||||
async def sendfile(self, transport: BaseTransport, file: IO[bytes], offset: int = ..., count: Optional[int] = ..., *,
|
||||
fallback: bool = ...) -> int: ...
|
||||
@abstractmethod
|
||||
async def start_tls(self, transport: BaseTransport, protocol: BaseProtocol, sslcontext: ssl.SSLContext, *,
|
||||
server_side: bool = ..., server_hostname: Optional[str] = ...,
|
||||
ssl_handshake_timeout: Optional[float] = ...) -> BaseTransport: ...
|
||||
else:
|
||||
@overload
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory: _ProtocolFactory, host: str = ..., port: int = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: None = ...,
|
||||
local_addr: Optional[str] = ..., server_hostname: Optional[str] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: socket,
|
||||
local_addr: None = ..., server_hostname: Optional[str] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory: _ProtocolFactory, host: Optional[Union[str, Sequence[str]]] = ..., port: int = ..., *,
|
||||
family: int = ..., flags: int = ...,
|
||||
sock: None = ..., backlog: int = ..., ssl: _SSLContext = ...,
|
||||
reuse_address: Optional[bool] = ...,
|
||||
reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@overload
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory: _ProtocolFactory, host: None = ..., port: None = ..., *,
|
||||
family: int = ..., flags: int = ...,
|
||||
sock: socket, backlog: int = ..., ssl: _SSLContext = ...,
|
||||
reuse_address: Optional[bool] = ...,
|
||||
reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_unix_connection(self, protocol_factory: _ProtocolFactory, path: str, *,
|
||||
ssl: _SSLContext = ..., sock: Optional[socket] = ...,
|
||||
server_hostname: str = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_unix_server(self, protocol_factory: _ProtocolFactory, path: str, *,
|
||||
sock: Optional[socket] = ..., backlog: int = ..., ssl: _SSLContext = ...) -> Generator[Any, None, AbstractServer]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def connect_accepted_socket(self, protocol_factory: _ProtocolFactory, sock: socket, *, ssl: _SSLContext = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def create_datagram_endpoint(self, protocol_factory: _ProtocolFactory,
|
||||
@@ -147,9 +203,6 @@ class AbstractEventLoop(metaclass=ABCMeta):
|
||||
reuse_address: Optional[bool] = ..., reuse_port: Optional[bool] = ...,
|
||||
allow_broadcast: Optional[bool] = ...,
|
||||
sock: Optional[socket] = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def connect_accepted_socket(self, protocol_factory: _ProtocolFactory, sock: socket, *, ssl: _SSLContext = ...) -> Generator[Any, None, _TransProtPair]: ...
|
||||
# Pipes and subprocesses.
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
@@ -175,19 +228,27 @@ class AbstractEventLoop(metaclass=ABCMeta):
|
||||
def add_writer(self, fd: selectors._FileObject, callback: Callable[..., Any], *args: Any) -> None: ...
|
||||
@abstractmethod
|
||||
def remove_writer(self, fd: selectors._FileObject) -> None: ...
|
||||
# Completion based I/O methods returning Futures.
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def sock_recv(self, sock: socket, nbytes: int) -> Generator[Any, None, bytes]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def sock_sendall(self, sock: socket, data: bytes) -> Generator[Any, None, None]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def sock_connect(self, sock: socket, address: _Address) -> Generator[Any, None, None]: ...
|
||||
@abstractmethod
|
||||
@coroutine
|
||||
def sock_accept(self, sock: socket) -> Generator[Any, None, Tuple[socket, Any]]: ...
|
||||
# Completion based I/O methods returning Futures prior to 3.7
|
||||
if sys.version_info >= (3, 7):
|
||||
@abstractmethod
|
||||
async def sock_recv(self, sock: socket, nbytes: int) -> bytes: ...
|
||||
@abstractmethod
|
||||
async def sock_recv_into(self, sock: socket, buf: bytearray) -> int: ...
|
||||
@abstractmethod
|
||||
async def sock_sendall(self, sock: socket, data: bytes) -> None: ...
|
||||
@abstractmethod
|
||||
async def sock_connect(self, sock: socket, address: _Address) -> None: ...
|
||||
@abstractmethod
|
||||
async def sock_accept(self, sock: socket) -> Tuple[socket, _RetAddress]: ...
|
||||
else:
|
||||
@abstractmethod
|
||||
def sock_recv(self, sock: socket, nbytes: int) -> Future[bytes]: ...
|
||||
@abstractmethod
|
||||
def sock_sendall(self, sock: socket, data: bytes) -> Future[None]: ...
|
||||
@abstractmethod
|
||||
def sock_connect(self, sock: socket, address: _Address) -> Future[None]: ...
|
||||
@abstractmethod
|
||||
def sock_accept(self, sock: socket) -> Future[Tuple[socket, _RetAddress]]: ...
|
||||
# Signal handling.
|
||||
@abstractmethod
|
||||
def add_signal_handler(self, sig: int, callback: Callable[..., Any], *args: Any) -> None: ...
|
||||
|
||||
@@ -14,6 +14,10 @@ class Protocol(BaseProtocol):
|
||||
def data_received(self, data: bytes) -> None: ...
|
||||
def eof_received(self) -> Optional[bool]: ...
|
||||
|
||||
class BufferedProtocol(Protocol):
|
||||
def get_buffer(self, sizehint: int) -> bytearray: ...
|
||||
def buffer_updated(self, nbytes: int) -> None: ...
|
||||
|
||||
class DatagramProtocol(BaseProtocol):
|
||||
def datagram_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) -> None: ...
|
||||
def error_received(self, exc: Exception) -> None: ...
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import sys
|
||||
from typing import Dict, Any, TypeVar, Mapping, List, Optional, Tuple
|
||||
from asyncio.protocols import BaseProtocol
|
||||
|
||||
__all__: List[str]
|
||||
|
||||
@@ -7,8 +9,13 @@ class BaseTransport:
|
||||
def get_extra_info(self, name: Any, default: Any = ...) -> Any: ...
|
||||
def is_closing(self) -> bool: ...
|
||||
def close(self) -> None: ...
|
||||
if sys.version_info >= (3, 5):
|
||||
def set_protocol(self, protocol: BaseProtocol) -> None: ...
|
||||
def get_protocol(self) -> BaseProtocol: ...
|
||||
|
||||
class ReadTransport(BaseTransport):
|
||||
if sys.version_info >= (3, 7):
|
||||
def is_reading(self) -> bool: ...
|
||||
def pause_reading(self) -> None: ...
|
||||
def resume_reading(self) -> None: ...
|
||||
|
||||
|
||||
Reference in New Issue
Block a user