mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-07 12:44:28 +08:00
Rework socket (#5545)
* Extract _socket.pyi from socket.pyi. * Extract _socket.socket from socket.socket. * Fix socket.family annotation. * Annotate SocketIO properly. * SocketType is an alias of _socket.socket. * Sort items in socket.pyi in the same order as in socket.py. * Remove socket.EINTR. * Use _typeshed.WriteableBuffer instead of custom alias. * Add errorTab (Windows only). * Add _socket.dup(). * Mark positional-only argments. * Remove constructors from socket exceptions. * socket.timeout is an alias for TimeoutError, starting with Python 3.10. * Use PEP 604 in changed lines. * Add alias for fileno arguments. * getaddrinfo() port can be bytes. * Explicitly override some SSLSocket methods. * Allow ReadableBuffer in _CMSG arguments.
This commit is contained in:
@@ -1,15 +1,17 @@
|
||||
import sys
|
||||
import types
|
||||
from socket import SocketType
|
||||
from socket import socket as _socket
|
||||
from typing import Any, BinaryIO, Callable, ClassVar, Optional, Set, Tuple, Type, TypeVar, Union
|
||||
|
||||
_T = TypeVar("_T")
|
||||
_RequestType = Union[_socket, Tuple[bytes, _socket]]
|
||||
_AddressType = Union[Tuple[str, int], str]
|
||||
|
||||
class BaseServer:
|
||||
address_family: int
|
||||
RequestHandlerClass: Callable[..., BaseRequestHandler]
|
||||
server_address: Tuple[str, int]
|
||||
socket: SocketType
|
||||
server_address: tuple[str, int]
|
||||
socket: _socket
|
||||
allow_reuse_address: bool
|
||||
request_queue_size: int
|
||||
socket_type: int
|
||||
@@ -20,59 +22,51 @@ class BaseServer:
|
||||
def serve_forever(self, poll_interval: float = ...) -> None: ...
|
||||
def shutdown(self) -> None: ...
|
||||
def server_close(self) -> None: ...
|
||||
def finish_request(
|
||||
self, request: Union[SocketType, Tuple[bytes, SocketType]], client_address: Union[Tuple[str, int], str]
|
||||
) -> None: ...
|
||||
def get_request(self) -> Tuple[Any, Any]: ...
|
||||
def handle_error(
|
||||
self, request: Union[SocketType, Tuple[bytes, SocketType]], client_address: Union[Tuple[str, int], str]
|
||||
) -> None: ...
|
||||
def finish_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def get_request(self) -> tuple[Any, Any]: ...
|
||||
def handle_error(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def handle_timeout(self) -> None: ...
|
||||
def process_request(
|
||||
self, request: Union[SocketType, Tuple[bytes, SocketType]], client_address: Union[Tuple[str, int], str]
|
||||
) -> None: ...
|
||||
def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def server_activate(self) -> None: ...
|
||||
def server_bind(self) -> None: ...
|
||||
def verify_request(
|
||||
self, request: Union[SocketType, Tuple[bytes, SocketType]], client_address: Union[Tuple[str, int], str]
|
||||
) -> bool: ...
|
||||
def verify_request(self, request: _RequestType, client_address: _AddressType) -> bool: ...
|
||||
def __enter__(self: _T) -> _T: ...
|
||||
def __exit__(
|
||||
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[types.TracebackType]
|
||||
) -> None: ...
|
||||
def service_actions(self) -> None: ...
|
||||
def shutdown_request(self, request: Union[SocketType, Tuple[bytes, SocketType]]) -> None: ... # undocumented
|
||||
def close_request(self, request: Union[SocketType, Tuple[bytes, SocketType]]) -> None: ... # undocumented
|
||||
def shutdown_request(self, request: _RequestType) -> None: ... # undocumented
|
||||
def close_request(self, request: _RequestType) -> None: ... # undocumented
|
||||
|
||||
class TCPServer(BaseServer):
|
||||
def __init__(
|
||||
self,
|
||||
server_address: Tuple[str, int],
|
||||
server_address: tuple[str, int],
|
||||
RequestHandlerClass: Callable[..., BaseRequestHandler],
|
||||
bind_and_activate: bool = ...,
|
||||
) -> None: ...
|
||||
def get_request(self) -> Tuple[SocketType, Any]: ...
|
||||
def finish_request(self, request: SocketType, client_address: Union[Tuple[str, int], str]) -> None: ...
|
||||
def handle_error(self, request: SocketType, client_address: Union[Tuple[str, int], str]) -> None: ...
|
||||
def process_request(self, request: SocketType, client_address: Union[Tuple[str, int], str]) -> None: ...
|
||||
def verify_request(self, request: SocketType, client_address: Union[Tuple[str, int], str]) -> bool: ...
|
||||
def shutdown_request(self, request: SocketType) -> None: ... # undocumented
|
||||
def close_request(self, request: SocketType) -> None: ... # undocumented
|
||||
def get_request(self) -> tuple[_socket, Any]: ...
|
||||
def finish_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def handle_error(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def verify_request(self, request: _RequestType, client_address: _AddressType) -> bool: ...
|
||||
def shutdown_request(self, request: _RequestType) -> None: ... # undocumented
|
||||
def close_request(self, request: _RequestType) -> None: ... # undocumented
|
||||
|
||||
class UDPServer(BaseServer):
|
||||
def __init__(
|
||||
self,
|
||||
server_address: Tuple[str, int],
|
||||
server_address: tuple[str, int],
|
||||
RequestHandlerClass: Callable[..., BaseRequestHandler],
|
||||
bind_and_activate: bool = ...,
|
||||
) -> None: ...
|
||||
def get_request(self) -> Tuple[Tuple[bytes, SocketType], Any]: ...
|
||||
def finish_request(self, request: Tuple[bytes, SocketType], client_address: Union[Tuple[str, int], str]) -> None: ...
|
||||
def handle_error(self, request: Tuple[bytes, SocketType], client_address: Union[Tuple[str, int], str]) -> None: ...
|
||||
def process_request(self, request: Tuple[bytes, SocketType], client_address: Union[Tuple[str, int], str]) -> None: ...
|
||||
def verify_request(self, request: Tuple[bytes, SocketType], client_address: Union[Tuple[str, int], str]) -> bool: ...
|
||||
def shutdown_request(self, request: Tuple[bytes, SocketType]) -> None: ... # undocumented
|
||||
def close_request(self, request: Tuple[bytes, SocketType]) -> None: ... # undocumented
|
||||
def get_request(self) -> tuple[tuple[bytes, _socket], Any]: ...
|
||||
def finish_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def handle_error(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def verify_request(self, request: _RequestType, client_address: _AddressType) -> bool: ...
|
||||
def shutdown_request(self, request: _RequestType) -> None: ... # undocumented
|
||||
def close_request(self, request: _RequestType) -> None: ... # undocumented
|
||||
|
||||
if sys.platform != "win32":
|
||||
class UnixStreamServer(BaseServer):
|
||||
@@ -100,21 +94,15 @@ if sys.platform != "win32":
|
||||
def collect_children(self, *, blocking: bool = ...) -> None: ... # undocumented
|
||||
def handle_timeout(self) -> None: ... # undocumented
|
||||
def service_actions(self) -> None: ... # undocumented
|
||||
def process_request(
|
||||
self, request: Union[SocketType, Tuple[bytes, SocketType]], client_address: Union[Tuple[str, int], str]
|
||||
) -> None: ...
|
||||
def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def server_close(self) -> None: ...
|
||||
|
||||
class ThreadingMixIn:
|
||||
daemon_threads: bool
|
||||
if sys.version_info >= (3, 7):
|
||||
block_on_close: bool
|
||||
def process_request_thread(
|
||||
self, request: Union[SocketType, Tuple[bytes, SocketType]], client_address: Union[Tuple[str, int], str]
|
||||
) -> None: ... # undocumented
|
||||
def process_request(
|
||||
self, request: Union[SocketType, Tuple[bytes, SocketType]], client_address: Union[Tuple[str, int], str]
|
||||
) -> None: ...
|
||||
def process_request_thread(self, request: _RequestType, client_address: _AddressType) -> None: ... # undocumented
|
||||
def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
|
||||
def server_close(self) -> None: ...
|
||||
|
||||
if sys.platform != "win32":
|
||||
@@ -130,20 +118,15 @@ if sys.platform != "win32":
|
||||
|
||||
class BaseRequestHandler:
|
||||
# Those are technically of types, respectively:
|
||||
# * Union[SocketType, Tuple[bytes, SocketType]]
|
||||
# * Union[Tuple[str, int], str]
|
||||
# * _RequestType
|
||||
# * _AddressType
|
||||
# But there are some concerns that having unions here would cause
|
||||
# too much inconvenience to people using it (see
|
||||
# https://github.com/python/typeshed/pull/384#issuecomment-234649696)
|
||||
request: Any
|
||||
client_address: Any
|
||||
server: BaseServer
|
||||
def __init__(
|
||||
self,
|
||||
request: Union[SocketType, Tuple[bytes, SocketType]],
|
||||
client_address: Union[Tuple[str, int], str],
|
||||
server: BaseServer,
|
||||
) -> None: ...
|
||||
def __init__(self, request: _RequestType, client_address: _AddressType, server: BaseServer) -> None: ...
|
||||
def setup(self) -> None: ...
|
||||
def handle(self) -> None: ...
|
||||
def finish(self) -> None: ...
|
||||
@@ -153,12 +136,12 @@ class StreamRequestHandler(BaseRequestHandler):
|
||||
wbufsize: ClassVar[int] # Undocumented
|
||||
timeout: ClassVar[Optional[float]] # Undocumented
|
||||
disable_nagle_algorithm: ClassVar[bool] # Undocumented
|
||||
connection: SocketType # Undocumented
|
||||
connection: _socket # Undocumented
|
||||
rfile: BinaryIO
|
||||
wfile: BinaryIO
|
||||
|
||||
class DatagramRequestHandler(BaseRequestHandler):
|
||||
packet: SocketType # Undocumented
|
||||
socket: SocketType # Undocumented
|
||||
packet: _socket # Undocumented
|
||||
socket: _socket # Undocumented
|
||||
rfile: BinaryIO
|
||||
wfile: BinaryIO
|
||||
|
||||
Reference in New Issue
Block a user