Files
typeshed/stdlib/multiprocessing/connection.pyi
2022-11-04 10:58:58 +01:00

70 lines
2.7 KiB
Python

import socket
import sys
import types
from _typeshed import ReadableBuffer, Self
from collections.abc import Iterable
from typing import Any, Union
from typing_extensions import SupportsIndex, TypeAlias
__all__ = ["Client", "Listener", "Pipe", "wait"]
# https://docs.python.org/3/library/multiprocessing.html#address-formats
_Address: TypeAlias = Union[str, tuple[str, int]]
class _ConnectionBase:
def __init__(self, handle: SupportsIndex, readable: bool = ..., writable: bool = ...) -> None: ...
@property
def closed(self) -> bool: ... # undocumented
@property
def readable(self) -> bool: ... # undocumented
@property
def writable(self) -> bool: ... # undocumented
def fileno(self) -> int: ...
def close(self) -> None: ...
def send_bytes(self, buf: ReadableBuffer, offset: int = ..., size: int | None = ...) -> None: ...
def send(self, obj: Any) -> None: ...
def recv_bytes(self, maxlength: int | None = ...) -> bytes: ...
def recv_bytes_into(self, buf: Any, offset: int = ...) -> int: ...
def recv(self) -> Any: ...
def poll(self, timeout: float | None = ...) -> bool: ...
def __enter__(self: Self) -> Self: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: types.TracebackType | None
) -> None: ...
class Connection(_ConnectionBase): ...
if sys.platform == "win32":
class PipeConnection(_ConnectionBase): ...
class Listener:
def __init__(
self, address: _Address | None = ..., family: str | None = ..., backlog: int = ..., authkey: bytes | None = ...
) -> None: ...
def accept(self) -> Connection: ...
def close(self) -> None: ...
@property
def address(self) -> _Address: ...
@property
def last_accepted(self) -> _Address | None: ...
def __enter__(self: Self) -> Self: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: types.TracebackType | None
) -> None: ...
def deliver_challenge(connection: Connection, authkey: bytes) -> None: ...
def answer_challenge(connection: Connection, authkey: bytes) -> None: ...
def wait(
object_list: Iterable[Connection | socket.socket | int], timeout: float | None = ...
) -> list[Connection | socket.socket | int]: ...
def Client(address: _Address, family: str | None = ..., authkey: bytes | None = ...) -> Connection: ...
# N.B. Keep this in sync with multiprocessing.context.BaseContext.Pipe.
# _ConnectionBase is the common base class of Connection and PipeConnection
# and can be used in cross-platform code.
if sys.platform != "win32":
def Pipe(duplex: bool = ...) -> tuple[Connection, Connection]: ...
else:
def Pipe(duplex: bool = ...) -> tuple[PipeConnection, PipeConnection]: ...