mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-09 05:24:52 +08:00
Continuing work towards #8988. The first five commits were created using stubdefaulter on various Python versions; the following commits were all created manually by me to fix various problems. The main things this adds that weren't present in #9501 are: - Defaults in Windows-only modules and Windows-only branches (because I'm running a Windows machine) - Defaults in non-py311 branches - Defaults for float parameters - Defaults for overloads
70 lines
2.7 KiB
Python
70 lines
2.7 KiB
Python
import socket
|
|
import sys
|
|
import types
|
|
from _typeshed import ReadableBuffer, Self
|
|
from collections.abc import Iterable
|
|
from typing import Any, Union
|
|
from typing_extensions import SupportsIndex, TypeAlias
|
|
|
|
__all__ = ["Client", "Listener", "Pipe", "wait"]
|
|
|
|
# https://docs.python.org/3/library/multiprocessing.html#address-formats
|
|
_Address: TypeAlias = Union[str, tuple[str, int]]
|
|
|
|
class _ConnectionBase:
|
|
def __init__(self, handle: SupportsIndex, readable: bool = True, writable: bool = True) -> None: ...
|
|
@property
|
|
def closed(self) -> bool: ... # undocumented
|
|
@property
|
|
def readable(self) -> bool: ... # undocumented
|
|
@property
|
|
def writable(self) -> bool: ... # undocumented
|
|
def fileno(self) -> int: ...
|
|
def close(self) -> None: ...
|
|
def send_bytes(self, buf: ReadableBuffer, offset: int = 0, size: int | None = None) -> None: ...
|
|
def send(self, obj: Any) -> None: ...
|
|
def recv_bytes(self, maxlength: int | None = None) -> bytes: ...
|
|
def recv_bytes_into(self, buf: Any, offset: int = 0) -> int: ...
|
|
def recv(self) -> Any: ...
|
|
def poll(self, timeout: float | None = 0.0) -> bool: ...
|
|
def __enter__(self: Self) -> Self: ...
|
|
def __exit__(
|
|
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: types.TracebackType | None
|
|
) -> None: ...
|
|
|
|
class Connection(_ConnectionBase): ...
|
|
|
|
if sys.platform == "win32":
|
|
class PipeConnection(_ConnectionBase): ...
|
|
|
|
class Listener:
|
|
def __init__(
|
|
self, address: _Address | None = None, family: str | None = None, backlog: int = 1, authkey: bytes | None = None
|
|
) -> None: ...
|
|
def accept(self) -> Connection: ...
|
|
def close(self) -> None: ...
|
|
@property
|
|
def address(self) -> _Address: ...
|
|
@property
|
|
def last_accepted(self) -> _Address | None: ...
|
|
def __enter__(self: Self) -> Self: ...
|
|
def __exit__(
|
|
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: types.TracebackType | None
|
|
) -> None: ...
|
|
|
|
def deliver_challenge(connection: Connection, authkey: bytes) -> None: ...
|
|
def answer_challenge(connection: Connection, authkey: bytes) -> None: ...
|
|
def wait(
|
|
object_list: Iterable[Connection | socket.socket | int], timeout: float | None = None
|
|
) -> list[Connection | socket.socket | int]: ...
|
|
def Client(address: _Address, family: str | None = None, authkey: bytes | None = None) -> Connection: ...
|
|
|
|
# N.B. Keep this in sync with multiprocessing.context.BaseContext.Pipe.
|
|
# _ConnectionBase is the common base class of Connection and PipeConnection
|
|
# and can be used in cross-platform code.
|
|
if sys.platform != "win32":
|
|
def Pipe(duplex: bool = True) -> tuple[Connection, Connection]: ...
|
|
|
|
else:
|
|
def Pipe(duplex: bool = True) -> tuple[PipeConnection, PipeConnection]: ...
|