Improve socketserver (#7073)

Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
This commit is contained in:
Nikita Sobolev
2022-02-12 04:22:00 +03:00
committed by GitHub
parent 846c2dfa4f
commit 418574f82c
2 changed files with 18 additions and 26 deletions

View File

@@ -7,16 +7,23 @@ from typing import Any, BinaryIO, Callable, ClassVar, Union
_RequestType = Union[_socket, tuple[bytes, _socket]]
_AddressType = Union[tuple[str, int], str]
# This can possibly be generic at some point:
class BaseServer:
address_family: int
RequestHandlerClass: Callable[..., BaseRequestHandler]
server_address: tuple[str, int]
socket: _socket
allow_reuse_address: bool
request_queue_size: int
socket_type: int
timeout: float | None
def __init__(self, server_address: Any, RequestHandlerClass: Callable[..., BaseRequestHandler]) -> None: ...
def __init__(
self: Self, server_address: Any, RequestHandlerClass: Callable[[Any, Any, Self], BaseRequestHandler]
) -> None: ...
# It is not actually a `@property`, but we need a `Self` type:
@property
def RequestHandlerClass(self: Self) -> Callable[[Any, Any, Self], BaseRequestHandler]: ...
@RequestHandlerClass.setter
def RequestHandlerClass(self: Self, val: Callable[[Any, Any, Self], BaseRequestHandler]) -> None: ...
def fileno(self) -> int: ...
def handle_request(self) -> None: ...
def serve_forever(self, poll_interval: float = ...) -> None: ...
@@ -39,50 +46,34 @@ class BaseServer:
def close_request(self, request: _RequestType) -> None: ... # undocumented
class TCPServer(BaseServer):
allow_reuse_port: bool
request_queue_size: int
def __init__(
self,
self: Self,
server_address: tuple[str, int],
RequestHandlerClass: Callable[..., BaseRequestHandler],
RequestHandlerClass: Callable[[Any, Any, Self], BaseRequestHandler],
bind_and_activate: bool = ...,
) -> None: ...
def get_request(self) -> tuple[_socket, Any]: ...
def finish_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
def handle_error(self, request: _RequestType, client_address: _AddressType) -> None: ...
def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
def verify_request(self, request: _RequestType, client_address: _AddressType) -> bool: ...
def shutdown_request(self, request: _RequestType) -> None: ... # undocumented
def close_request(self, request: _RequestType) -> None: ... # undocumented
class UDPServer(BaseServer):
max_packet_size: ClassVar[int]
def __init__(
self,
server_address: tuple[str, int],
RequestHandlerClass: Callable[..., BaseRequestHandler],
bind_and_activate: bool = ...,
) -> None: ...
def get_request(self) -> tuple[tuple[bytes, _socket], Any]: ...
def finish_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
def handle_error(self, request: _RequestType, client_address: _AddressType) -> None: ...
def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
def verify_request(self, request: _RequestType, client_address: _AddressType) -> bool: ...
def shutdown_request(self, request: _RequestType) -> None: ... # undocumented
def close_request(self, request: _RequestType) -> None: ... # undocumented
if sys.platform != "win32":
class UnixStreamServer(BaseServer):
def __init__(
self,
self: Self,
server_address: str | bytes,
RequestHandlerClass: Callable[..., BaseRequestHandler],
RequestHandlerClass: Callable[[Any, Any, Self], BaseRequestHandler],
bind_and_activate: bool = ...,
) -> None: ...
class UnixDatagramServer(BaseServer):
def __init__(
self,
self: Self,
server_address: str | bytes,
RequestHandlerClass: Callable[..., BaseRequestHandler],
RequestHandlerClass: Callable[[Any, Any, Self], BaseRequestHandler],
bind_and_activate: bool = ...,
) -> None: ...