mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-08 13:04:46 +08:00
- Change the return type of create_connection, start_tls, connect_accepted_socket, create_unix_connection to Transport rather than BaseTransport (closes #9199). - Change the return type of create_datagram_endpoint to DatagramTransport rather than BaseTransport. - Change the argument of sendfile to WriteTransport rather than BaseTransport. I considered also changing the argument of start_tls to Transport, but I think that will give false positives for code that implements a custom transport class that inherits from both ReadTransport and WriteTransport but not from Transport, and I'm not sure if typing has a way to express an intersection of types. Since users are not normally expected to implement transports that may be overthinking things.
631 lines
23 KiB
Python
631 lines
23 KiB
Python
import ssl
|
|
import sys
|
|
from _typeshed import FileDescriptorLike, ReadableBuffer, Self, StrPath, WriteableBuffer
|
|
from abc import ABCMeta, abstractmethod
|
|
from collections.abc import Awaitable, Callable, Coroutine, Generator, Sequence
|
|
from contextvars import Context
|
|
from socket import AddressFamily, SocketKind, _Address, _RetAddress, socket
|
|
from typing import IO, Any, Protocol, TypeVar, overload
|
|
from typing_extensions import Literal, TypeAlias
|
|
|
|
from .base_events import Server
|
|
from .futures import Future
|
|
from .protocols import BaseProtocol
|
|
from .tasks import Task
|
|
from .transports import BaseTransport, DatagramTransport, ReadTransport, SubprocessTransport, Transport, WriteTransport
|
|
from .unix_events import AbstractChildWatcher
|
|
|
|
if sys.version_info >= (3, 8):
|
|
__all__ = (
|
|
"AbstractEventLoopPolicy",
|
|
"AbstractEventLoop",
|
|
"AbstractServer",
|
|
"Handle",
|
|
"TimerHandle",
|
|
"get_event_loop_policy",
|
|
"set_event_loop_policy",
|
|
"get_event_loop",
|
|
"set_event_loop",
|
|
"new_event_loop",
|
|
"get_child_watcher",
|
|
"set_child_watcher",
|
|
"_set_running_loop",
|
|
"get_running_loop",
|
|
"_get_running_loop",
|
|
)
|
|
|
|
else:
|
|
__all__ = (
|
|
"AbstractEventLoopPolicy",
|
|
"AbstractEventLoop",
|
|
"AbstractServer",
|
|
"Handle",
|
|
"TimerHandle",
|
|
"SendfileNotAvailableError",
|
|
"get_event_loop_policy",
|
|
"set_event_loop_policy",
|
|
"get_event_loop",
|
|
"set_event_loop",
|
|
"new_event_loop",
|
|
"get_child_watcher",
|
|
"set_child_watcher",
|
|
"_set_running_loop",
|
|
"get_running_loop",
|
|
"_get_running_loop",
|
|
)
|
|
|
|
_T = TypeVar("_T")
|
|
_ProtocolT = TypeVar("_ProtocolT", bound=BaseProtocol)
|
|
_Context: TypeAlias = dict[str, Any]
|
|
_ExceptionHandler: TypeAlias = Callable[[AbstractEventLoop, _Context], object]
|
|
_ProtocolFactory: TypeAlias = Callable[[], BaseProtocol]
|
|
_SSLContext: TypeAlias = bool | None | ssl.SSLContext
|
|
|
|
class _TaskFactory(Protocol):
|
|
def __call__(
|
|
self, __loop: AbstractEventLoop, __factory: Coroutine[Any, Any, _T] | Generator[Any, None, _T]
|
|
) -> Future[_T]: ...
|
|
|
|
class Handle:
|
|
_cancelled: bool
|
|
_args: Sequence[Any]
|
|
def __init__(
|
|
self, callback: Callable[..., object], args: Sequence[Any], loop: AbstractEventLoop, context: Context | None = ...
|
|
) -> None: ...
|
|
def cancel(self) -> None: ...
|
|
def _run(self) -> None: ...
|
|
def cancelled(self) -> bool: ...
|
|
|
|
class TimerHandle(Handle):
|
|
def __init__(
|
|
self,
|
|
when: float,
|
|
callback: Callable[..., object],
|
|
args: Sequence[Any],
|
|
loop: AbstractEventLoop,
|
|
context: Context | None = ...,
|
|
) -> None: ...
|
|
def when(self) -> float: ...
|
|
def __lt__(self, other: TimerHandle) -> bool: ...
|
|
def __le__(self, other: TimerHandle) -> bool: ...
|
|
def __gt__(self, other: TimerHandle) -> bool: ...
|
|
def __ge__(self, other: TimerHandle) -> bool: ...
|
|
def __eq__(self, other: object) -> bool: ...
|
|
|
|
class AbstractServer:
|
|
@abstractmethod
|
|
def close(self) -> None: ...
|
|
async def __aenter__(self: Self) -> Self: ...
|
|
async def __aexit__(self, *exc: object) -> None: ...
|
|
@abstractmethod
|
|
def get_loop(self) -> AbstractEventLoop: ...
|
|
@abstractmethod
|
|
def is_serving(self) -> bool: ...
|
|
@abstractmethod
|
|
async def start_serving(self) -> None: ...
|
|
@abstractmethod
|
|
async def serve_forever(self) -> None: ...
|
|
@abstractmethod
|
|
async def wait_closed(self) -> None: ...
|
|
|
|
class AbstractEventLoop:
|
|
slow_callback_duration: float
|
|
@abstractmethod
|
|
def run_forever(self) -> None: ...
|
|
# Can't use a union, see mypy issue # 1873.
|
|
@overload
|
|
@abstractmethod
|
|
def run_until_complete(self, future: Generator[Any, None, _T]) -> _T: ...
|
|
@overload
|
|
@abstractmethod
|
|
def run_until_complete(self, future: Awaitable[_T]) -> _T: ...
|
|
@abstractmethod
|
|
def stop(self) -> None: ...
|
|
@abstractmethod
|
|
def is_running(self) -> bool: ...
|
|
@abstractmethod
|
|
def is_closed(self) -> bool: ...
|
|
@abstractmethod
|
|
def close(self) -> None: ...
|
|
@abstractmethod
|
|
async def shutdown_asyncgens(self) -> None: ...
|
|
# Methods scheduling callbacks. All these return Handles.
|
|
if sys.version_info >= (3, 9): # "context" added in 3.9.10/3.10.2
|
|
@abstractmethod
|
|
def call_soon(self, callback: Callable[..., object], *args: Any, context: Context | None = ...) -> Handle: ...
|
|
@abstractmethod
|
|
def call_later(
|
|
self, delay: float, callback: Callable[..., object], *args: Any, context: Context | None = ...
|
|
) -> TimerHandle: ...
|
|
@abstractmethod
|
|
def call_at(
|
|
self, when: float, callback: Callable[..., object], *args: Any, context: Context | None = ...
|
|
) -> TimerHandle: ...
|
|
else:
|
|
@abstractmethod
|
|
def call_soon(self, callback: Callable[..., object], *args: Any) -> Handle: ...
|
|
@abstractmethod
|
|
def call_later(self, delay: float, callback: Callable[..., object], *args: Any) -> TimerHandle: ...
|
|
@abstractmethod
|
|
def call_at(self, when: float, callback: Callable[..., object], *args: Any) -> TimerHandle: ...
|
|
|
|
@abstractmethod
|
|
def time(self) -> float: ...
|
|
# Future methods
|
|
@abstractmethod
|
|
def create_future(self) -> Future[Any]: ...
|
|
# Tasks methods
|
|
if sys.version_info >= (3, 11):
|
|
@abstractmethod
|
|
def create_task(
|
|
self,
|
|
coro: Coroutine[Any, Any, _T] | Generator[Any, None, _T],
|
|
*,
|
|
name: str | None = ...,
|
|
context: Context | None = ...,
|
|
) -> Task[_T]: ...
|
|
elif sys.version_info >= (3, 8):
|
|
@abstractmethod
|
|
def create_task(
|
|
self, coro: Coroutine[Any, Any, _T] | Generator[Any, None, _T], *, name: str | None = ...
|
|
) -> Task[_T]: ...
|
|
else:
|
|
@abstractmethod
|
|
def create_task(self, coro: Coroutine[Any, Any, _T] | Generator[Any, None, _T]) -> Task[_T]: ...
|
|
|
|
@abstractmethod
|
|
def set_task_factory(self, factory: _TaskFactory | None) -> None: ...
|
|
@abstractmethod
|
|
def get_task_factory(self) -> _TaskFactory | None: ...
|
|
# Methods for interacting with threads
|
|
if sys.version_info >= (3, 9): # "context" added in 3.9.10/3.10.2
|
|
@abstractmethod
|
|
def call_soon_threadsafe(self, callback: Callable[..., object], *args: Any, context: Context | None = ...) -> Handle: ...
|
|
else:
|
|
@abstractmethod
|
|
def call_soon_threadsafe(self, callback: Callable[..., object], *args: Any) -> Handle: ...
|
|
|
|
@abstractmethod
|
|
def run_in_executor(self, executor: Any, func: Callable[..., _T], *args: Any) -> Future[_T]: ...
|
|
@abstractmethod
|
|
def set_default_executor(self, executor: Any) -> None: ...
|
|
# Network I/O methods returning Futures.
|
|
@abstractmethod
|
|
async def getaddrinfo(
|
|
self,
|
|
host: bytes | str | None,
|
|
port: bytes | str | int | None,
|
|
*,
|
|
family: int = ...,
|
|
type: int = ...,
|
|
proto: int = ...,
|
|
flags: int = ...,
|
|
) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int] | tuple[str, int, int, int]]]: ...
|
|
@abstractmethod
|
|
async def getnameinfo(self, sockaddr: tuple[str, int] | tuple[str, int, int, int], flags: int = ...) -> tuple[str, str]: ...
|
|
if sys.version_info >= (3, 11):
|
|
@overload
|
|
@abstractmethod
|
|
async def create_connection(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
host: str = ...,
|
|
port: int = ...,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
family: int = ...,
|
|
proto: int = ...,
|
|
flags: int = ...,
|
|
sock: None = ...,
|
|
local_addr: tuple[str, int] | None = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
ssl_shutdown_timeout: float | None = ...,
|
|
happy_eyeballs_delay: float | None = ...,
|
|
interleave: int | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
@overload
|
|
@abstractmethod
|
|
async def create_connection(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
host: None = ...,
|
|
port: None = ...,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
family: int = ...,
|
|
proto: int = ...,
|
|
flags: int = ...,
|
|
sock: socket,
|
|
local_addr: None = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
ssl_shutdown_timeout: float | None = ...,
|
|
happy_eyeballs_delay: float | None = ...,
|
|
interleave: int | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
elif sys.version_info >= (3, 8):
|
|
@overload
|
|
@abstractmethod
|
|
async def create_connection(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
host: str = ...,
|
|
port: int = ...,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
family: int = ...,
|
|
proto: int = ...,
|
|
flags: int = ...,
|
|
sock: None = ...,
|
|
local_addr: tuple[str, int] | None = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
happy_eyeballs_delay: float | None = ...,
|
|
interleave: int | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
@overload
|
|
@abstractmethod
|
|
async def create_connection(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
host: None = ...,
|
|
port: None = ...,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
family: int = ...,
|
|
proto: int = ...,
|
|
flags: int = ...,
|
|
sock: socket,
|
|
local_addr: None = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
happy_eyeballs_delay: float | None = ...,
|
|
interleave: int | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
else:
|
|
@overload
|
|
@abstractmethod
|
|
async def create_connection(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
host: str = ...,
|
|
port: int = ...,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
family: int = ...,
|
|
proto: int = ...,
|
|
flags: int = ...,
|
|
sock: None = ...,
|
|
local_addr: tuple[str, int] | None = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
@overload
|
|
@abstractmethod
|
|
async def create_connection(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
host: None = ...,
|
|
port: None = ...,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
family: int = ...,
|
|
proto: int = ...,
|
|
flags: int = ...,
|
|
sock: socket,
|
|
local_addr: None = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
if sys.version_info >= (3, 11):
|
|
@overload
|
|
@abstractmethod
|
|
async def create_server(
|
|
self,
|
|
protocol_factory: _ProtocolFactory,
|
|
host: str | Sequence[str] | None = ...,
|
|
port: int = ...,
|
|
*,
|
|
family: int = ...,
|
|
flags: int = ...,
|
|
sock: None = ...,
|
|
backlog: int = ...,
|
|
ssl: _SSLContext = ...,
|
|
reuse_address: bool | None = ...,
|
|
reuse_port: bool | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
ssl_shutdown_timeout: float | None = ...,
|
|
start_serving: bool = ...,
|
|
) -> Server: ...
|
|
@overload
|
|
@abstractmethod
|
|
async def create_server(
|
|
self,
|
|
protocol_factory: _ProtocolFactory,
|
|
host: None = ...,
|
|
port: None = ...,
|
|
*,
|
|
family: int = ...,
|
|
flags: int = ...,
|
|
sock: socket = ...,
|
|
backlog: int = ...,
|
|
ssl: _SSLContext = ...,
|
|
reuse_address: bool | None = ...,
|
|
reuse_port: bool | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
ssl_shutdown_timeout: float | None = ...,
|
|
start_serving: bool = ...,
|
|
) -> Server: ...
|
|
@abstractmethod
|
|
async def start_tls(
|
|
self,
|
|
transport: WriteTransport,
|
|
protocol: BaseProtocol,
|
|
sslcontext: ssl.SSLContext,
|
|
*,
|
|
server_side: bool = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
ssl_shutdown_timeout: float | None = ...,
|
|
) -> Transport: ...
|
|
async def create_unix_server(
|
|
self,
|
|
protocol_factory: _ProtocolFactory,
|
|
path: StrPath | None = ...,
|
|
*,
|
|
sock: socket | None = ...,
|
|
backlog: int = ...,
|
|
ssl: _SSLContext = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
ssl_shutdown_timeout: float | None = ...,
|
|
start_serving: bool = ...,
|
|
) -> Server: ...
|
|
else:
|
|
@overload
|
|
@abstractmethod
|
|
async def create_server(
|
|
self,
|
|
protocol_factory: _ProtocolFactory,
|
|
host: str | Sequence[str] | None = ...,
|
|
port: int = ...,
|
|
*,
|
|
family: int = ...,
|
|
flags: int = ...,
|
|
sock: None = ...,
|
|
backlog: int = ...,
|
|
ssl: _SSLContext = ...,
|
|
reuse_address: bool | None = ...,
|
|
reuse_port: bool | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
start_serving: bool = ...,
|
|
) -> Server: ...
|
|
@overload
|
|
@abstractmethod
|
|
async def create_server(
|
|
self,
|
|
protocol_factory: _ProtocolFactory,
|
|
host: None = ...,
|
|
port: None = ...,
|
|
*,
|
|
family: int = ...,
|
|
flags: int = ...,
|
|
sock: socket = ...,
|
|
backlog: int = ...,
|
|
ssl: _SSLContext = ...,
|
|
reuse_address: bool | None = ...,
|
|
reuse_port: bool | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
start_serving: bool = ...,
|
|
) -> Server: ...
|
|
@abstractmethod
|
|
async def start_tls(
|
|
self,
|
|
transport: BaseTransport,
|
|
protocol: BaseProtocol,
|
|
sslcontext: ssl.SSLContext,
|
|
*,
|
|
server_side: bool = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
) -> Transport: ...
|
|
async def create_unix_server(
|
|
self,
|
|
protocol_factory: _ProtocolFactory,
|
|
path: StrPath | None = ...,
|
|
*,
|
|
sock: socket | None = ...,
|
|
backlog: int = ...,
|
|
ssl: _SSLContext = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
start_serving: bool = ...,
|
|
) -> Server: ...
|
|
if sys.version_info >= (3, 11):
|
|
async def connect_accepted_socket(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
sock: socket,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
ssl_shutdown_timeout: float | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
elif sys.version_info >= (3, 10):
|
|
async def connect_accepted_socket(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
sock: socket,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
if sys.version_info >= (3, 11):
|
|
async def create_unix_connection(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
path: str | None = ...,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
sock: socket | None = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
ssl_shutdown_timeout: float | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
else:
|
|
async def create_unix_connection(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
path: str | None = ...,
|
|
*,
|
|
ssl: _SSLContext = ...,
|
|
sock: socket | None = ...,
|
|
server_hostname: str | None = ...,
|
|
ssl_handshake_timeout: float | None = ...,
|
|
) -> tuple[Transport, _ProtocolT]: ...
|
|
|
|
@abstractmethod
|
|
async def sock_sendfile(
|
|
self, sock: socket, file: IO[bytes], offset: int = ..., count: int | None = ..., *, fallback: bool | None = ...
|
|
) -> int: ...
|
|
@abstractmethod
|
|
async def sendfile(
|
|
self, transport: WriteTransport, file: IO[bytes], offset: int = ..., count: int | None = ..., *, fallback: bool = ...
|
|
) -> int: ...
|
|
@abstractmethod
|
|
async def create_datagram_endpoint(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
local_addr: tuple[str, int] | None = ...,
|
|
remote_addr: tuple[str, int] | None = ...,
|
|
*,
|
|
family: int = ...,
|
|
proto: int = ...,
|
|
flags: int = ...,
|
|
reuse_address: bool | None = ...,
|
|
reuse_port: bool | None = ...,
|
|
allow_broadcast: bool | None = ...,
|
|
sock: socket | None = ...,
|
|
) -> tuple[DatagramTransport, _ProtocolT]: ...
|
|
# Pipes and subprocesses.
|
|
@abstractmethod
|
|
async def connect_read_pipe(
|
|
self, protocol_factory: Callable[[], _ProtocolT], pipe: Any
|
|
) -> tuple[ReadTransport, _ProtocolT]: ...
|
|
@abstractmethod
|
|
async def connect_write_pipe(
|
|
self, protocol_factory: Callable[[], _ProtocolT], pipe: Any
|
|
) -> tuple[WriteTransport, _ProtocolT]: ...
|
|
@abstractmethod
|
|
async def subprocess_shell(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
cmd: bytes | str,
|
|
*,
|
|
stdin: int | IO[Any] | None = ...,
|
|
stdout: int | IO[Any] | None = ...,
|
|
stderr: int | IO[Any] | None = ...,
|
|
universal_newlines: Literal[False] = ...,
|
|
shell: Literal[True] = ...,
|
|
bufsize: Literal[0] = ...,
|
|
encoding: None = ...,
|
|
errors: None = ...,
|
|
text: Literal[False, None] = ...,
|
|
**kwargs: Any,
|
|
) -> tuple[SubprocessTransport, _ProtocolT]: ...
|
|
@abstractmethod
|
|
async def subprocess_exec(
|
|
self,
|
|
protocol_factory: Callable[[], _ProtocolT],
|
|
program: Any,
|
|
*args: Any,
|
|
stdin: int | IO[Any] | None = ...,
|
|
stdout: int | IO[Any] | None = ...,
|
|
stderr: int | IO[Any] | None = ...,
|
|
universal_newlines: Literal[False] = ...,
|
|
shell: Literal[True] = ...,
|
|
bufsize: Literal[0] = ...,
|
|
encoding: None = ...,
|
|
errors: None = ...,
|
|
**kwargs: Any,
|
|
) -> tuple[SubprocessTransport, _ProtocolT]: ...
|
|
@abstractmethod
|
|
def add_reader(self, fd: FileDescriptorLike, callback: Callable[..., Any], *args: Any) -> None: ...
|
|
@abstractmethod
|
|
def remove_reader(self, fd: FileDescriptorLike) -> bool: ...
|
|
@abstractmethod
|
|
def add_writer(self, fd: FileDescriptorLike, callback: Callable[..., Any], *args: Any) -> None: ...
|
|
@abstractmethod
|
|
def remove_writer(self, fd: FileDescriptorLike) -> bool: ...
|
|
# Completion based I/O methods returning Futures prior to 3.7
|
|
@abstractmethod
|
|
async def sock_recv(self, sock: socket, nbytes: int) -> bytes: ...
|
|
@abstractmethod
|
|
async def sock_recv_into(self, sock: socket, buf: WriteableBuffer) -> int: ...
|
|
@abstractmethod
|
|
async def sock_sendall(self, sock: socket, data: ReadableBuffer) -> None: ...
|
|
@abstractmethod
|
|
async def sock_connect(self, sock: socket, address: _Address) -> None: ...
|
|
@abstractmethod
|
|
async def sock_accept(self, sock: socket) -> tuple[socket, _RetAddress]: ...
|
|
if sys.version_info >= (3, 11):
|
|
@abstractmethod
|
|
async def sock_recvfrom(self, sock: socket, bufsize: int) -> bytes: ...
|
|
@abstractmethod
|
|
async def sock_recvfrom_into(self, sock: socket, buf: WriteableBuffer, nbytes: int = ...) -> int: ...
|
|
@abstractmethod
|
|
async def sock_sendto(self, sock: socket, data: ReadableBuffer, address: _Address) -> None: ...
|
|
# Signal handling.
|
|
@abstractmethod
|
|
def add_signal_handler(self, sig: int, callback: Callable[..., object], *args: Any) -> None: ...
|
|
@abstractmethod
|
|
def remove_signal_handler(self, sig: int) -> bool: ...
|
|
# Error handlers.
|
|
@abstractmethod
|
|
def set_exception_handler(self, handler: _ExceptionHandler | None) -> None: ...
|
|
@abstractmethod
|
|
def get_exception_handler(self) -> _ExceptionHandler | None: ...
|
|
@abstractmethod
|
|
def default_exception_handler(self, context: _Context) -> None: ...
|
|
@abstractmethod
|
|
def call_exception_handler(self, context: _Context) -> None: ...
|
|
# Debug flag management.
|
|
@abstractmethod
|
|
def get_debug(self) -> bool: ...
|
|
@abstractmethod
|
|
def set_debug(self, enabled: bool) -> None: ...
|
|
if sys.version_info >= (3, 9):
|
|
@abstractmethod
|
|
async def shutdown_default_executor(self) -> None: ...
|
|
|
|
class AbstractEventLoopPolicy:
|
|
@abstractmethod
|
|
def get_event_loop(self) -> AbstractEventLoop: ...
|
|
@abstractmethod
|
|
def set_event_loop(self, loop: AbstractEventLoop | None) -> None: ...
|
|
@abstractmethod
|
|
def new_event_loop(self) -> AbstractEventLoop: ...
|
|
# Child processes handling (Unix only).
|
|
@abstractmethod
|
|
def get_child_watcher(self) -> AbstractChildWatcher: ...
|
|
@abstractmethod
|
|
def set_child_watcher(self, watcher: AbstractChildWatcher) -> None: ...
|
|
|
|
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy, metaclass=ABCMeta):
|
|
def get_event_loop(self) -> AbstractEventLoop: ...
|
|
def set_event_loop(self, loop: AbstractEventLoop | None) -> None: ...
|
|
def new_event_loop(self) -> AbstractEventLoop: ...
|
|
|
|
def get_event_loop_policy() -> AbstractEventLoopPolicy: ...
|
|
def set_event_loop_policy(policy: AbstractEventLoopPolicy | None) -> None: ...
|
|
def get_event_loop() -> AbstractEventLoop: ...
|
|
def set_event_loop(loop: AbstractEventLoop | None) -> None: ...
|
|
def new_event_loop() -> AbstractEventLoop: ...
|
|
def get_child_watcher() -> AbstractChildWatcher: ...
|
|
def set_child_watcher(watcher: AbstractChildWatcher) -> None: ...
|
|
def _set_running_loop(__loop: AbstractEventLoop | None) -> None: ...
|
|
def _get_running_loop() -> AbstractEventLoop: ...
|
|
def get_running_loop() -> AbstractEventLoop: ...
|
|
|
|
if sys.version_info < (3, 8):
|
|
class SendfileNotAvailableError(RuntimeError): ...
|