waitress: stubtest-complete and update usage of Any (#11796)

This commit is contained in:
Avasam
2024-04-20 08:34:40 -04:00
committed by GitHub
parent 4095e0bada
commit 425db9c122
18 changed files with 203 additions and 222 deletions

View File

@@ -1,33 +1,4 @@
waitress.adjustments.Adjustments.clear_untrusted_proxy_headers
waitress.adjustments.PY2
waitress.adjustments.string_types
waitress.channel.HTTPChannel.addr
waitress.channel.HTTPChannel.error_task_class
waitress.channel.HTTPChannel.parser_class
waitress.channel.HTTPChannel.request
waitress.channel.HTTPChannel.task_class
waitress.compat.PY2
waitress.compat.PY3
waitress.compat.ResourceWarning
waitress.compat.class_types
waitress.compat.exec_
waitress.compat.integer_types
waitress.compat.qualname
waitress.compat.reraise
waitress.compat.set_nonblocking
waitress.compat.string_types
waitress.compat.text_
waitress.compat.tobytes
waitress.compat.tostr
waitress.compat.unquote_bytes_to_wsgi
waitress.rfc7230.tobytes
waitress.server.BaseWSGIServer.channel_class
waitress.server.BaseWSGIServer.get_server_name
waitress.server.MultiSocketServer.__init__
waitress.server.WSGIServer
waitress.task.ErrorTask.content_length
waitress.task.ThreadedTaskDispatcher.start_new_thread
waitress.task.WSGITask.content_length
waitress.rfc7230.BWS
waitress.wasyncore.map
waitress.wasyncore.dispatcher_with_send.handle_write
waitress.__main__
# Leaked loop variable
waitress.utilities.i

View File

@@ -1 +0,0 @@
waitress.server.UnixWSGIServer.get_server_name

View File

@@ -1 +0,0 @@
waitress.server.UnixWSGIServer.get_server_name

View File

@@ -1,9 +1,6 @@
version = "2.1.*"
upstream_repository = "https://github.com/Pylons/waitress"
requires = []
partial_stub = true
[tool.stubtest]
ignore_missing_stub = true
# linux and darwin are equivalent
platforms = ["linux", "win32"]

View File

@@ -1,7 +1,18 @@
from typing import Any
from _typeshed import Unused
from _typeshed.wsgi import WSGIApplication
from collections.abc import Callable, Iterable
from typing import Any, Literal
from waitress.server import create_server as create_server
from waitress.adjustments import _AdjustmentsParams
from waitress.server import BaseWSGIServer
def serve(app: Any, **kw: Any) -> None: ...
def serve_paste(app: Any, global_conf: Any, **kw: Any) -> int: ...
def profile(cmd: Any, globals: Any, locals: Any, sort_order: tuple[str, ...], callers: bool) -> None: ...
def serve(
app: WSGIApplication,
*,
_server: Callable[..., BaseWSGIServer] = ...,
_quiet: bool = False,
_profile: bool = False,
**kw: _AdjustmentsParams,
) -> None: ...
def serve_paste(app: WSGIApplication, global_conf: Unused, **kw: _AdjustmentsParams) -> Literal[0]: ...
def profile(cmd: str, globals: dict[str, Any], locals: dict[str, Any], sort_order: Iterable[str], callers: bool) -> None: ...

View File

@@ -1,12 +1,14 @@
from _typeshed import Incomplete
from collections.abc import Iterable, Sequence
from socket import socket
from typing import Any
from typing import Final
from typing_extensions import TypeAlias
from .compat import HAS_IPV6 as HAS_IPV6, PY2 as PY2, WIN as WIN, string_types as string_types
from .proxy_headers import PROXY_HEADERS as PROXY_HEADERS
# Really complex, consider unpacking a TypedDict
_AdjustmentsParams: TypeAlias = Incomplete
truthy: frozenset[Any]
KNOWN_PROXY_HEADERS: frozenset[Any]
truthy: frozenset[str]
KNOWN_PROXY_HEADERS: Final[frozenset[str]]
def asbool(s: bool | str | int | None) -> bool: ...
def asoctal(s: str) -> int: ...
@@ -30,7 +32,7 @@ class Adjustments:
trusted_proxy_count: int | None
trusted_proxy_headers: set[str]
log_untrusted_proxy_headers: bool
clear_untrusted_proxy_headers: _bool_marker | bool
clear_untrusted_proxy_headers: type[_bool_marker] | bool
url_scheme: str
url_prefix: str
ident: str
@@ -55,8 +57,10 @@ class Adjustments:
ipv4: bool
ipv6: bool
sockets: list[socket]
def __init__(self, **kw: Any) -> None: ...
channel_request_lookahead: int
server_name: str
def __init__(self, **kw: _AdjustmentsParams) -> None: ...
@classmethod
def parse_args(cls, argv: str) -> tuple[dict[str, Any], Any]: ...
def parse_args(cls, argv: str) -> tuple[dict[str, bool], list[str]]: ...
@classmethod
def check_sockets(cls, sockets: Iterable[socket]) -> None: ...

View File

@@ -1,22 +1,22 @@
from _typeshed import ReadableBuffer
from io import BufferedIOBase, BufferedRandom, BytesIO
from typing import Any, Literal
from typing import Final, Literal, NoReturn
COPY_BYTES: int
STRBUF_LIMIT: int
COPY_BYTES: Final = 262144
STRBUF_LIMIT: Final = 8192
class FileBasedBuffer:
remain: int
file: BytesIO
def __init__(self, file: BytesIO, from_buffer: BytesIO | None = None) -> None: ...
def __len__(self) -> int: ...
def __nonzero__(self) -> bool: ...
def __bool__(self) -> Literal[True]: ...
def append(self, s: Any) -> None: ...
def append(self, s: ReadableBuffer) -> None: ...
def get(self, numbytes: int = -1, skip: bool = False) -> bytes: ...
def skip(self, numbytes: int, allow_prune: int = 0) -> None: ...
def newfile(self) -> Any: ...
def newfile(self) -> BufferedIOBase: ...
def prune(self) -> None: ...
def getfile(self) -> Any: ...
def getfile(self) -> BytesIO: ...
def close(self) -> None: ...
class TempfileBasedBuffer(FileBasedBuffer):
@@ -38,7 +38,7 @@ class ReadOnlyFileBasedBuffer(FileBasedBuffer):
def __iter__(self) -> ReadOnlyFileBasedBuffer: ...
def next(self) -> bytes | None: ...
__next__ = next
def append(self, s: Any) -> None: ...
def append(self, s: ReadableBuffer) -> NoReturn: ...
class OverflowableBuffer:
overflowed: bool
@@ -47,7 +47,6 @@ class OverflowableBuffer:
overflow: int
def __init__(self, overflow: int) -> None: ...
def __len__(self) -> int: ...
def __nonzero__(self) -> bool: ...
def __bool__(self) -> bool: ...
def append(self, s: bytes) -> None: ...
def get(self, numbytes: int = -1, skip: bool = False) -> bytes: ...

View File

@@ -1,5 +1,5 @@
from collections.abc import Mapping, Sequence
from socket import socket
from collections.abc import Sequence
from socket import _RetAddress, socket
from threading import Condition, Lock
from waitress.adjustments import Adjustments
@@ -8,15 +8,16 @@ from waitress.parser import HTTPRequestParser
from waitress.server import BaseWSGIServer
from waitress.task import ErrorTask, WSGITask
from . import wasyncore as wasyncore
from . import wasyncore
from .wasyncore import _SocketMap
class ClientDisconnected(Exception): ...
class HTTPChannel(wasyncore.dispatcher):
task_class: WSGITask
error_task_class: ErrorTask
parser_class: HTTPRequestParser
request: HTTPRequestParser
task_class: type[WSGITask]
error_task_class: type[ErrorTask]
parser_class: type[HTTPRequestParser]
request: HTTPRequestParser | None
last_activity: float
will_close: bool
close_when_flushed: bool
@@ -31,19 +32,21 @@ class HTTPChannel(wasyncore.dispatcher):
sendbuf_len: int
task_lock: Lock
outbuf_lock: Condition
addr: tuple[str, int]
addr: _RetAddress
def __init__(
self, server: BaseWSGIServer, sock: socket, addr: str, adj: Adjustments, map: Mapping[int, socket] | None = None
self, server: BaseWSGIServer, sock: socket, addr: _RetAddress, adj: Adjustments, map: _SocketMap | None = None
) -> None: ...
def check_client_disconnected(self) -> None: ...
def writable(self) -> bool: ...
def handle_write(self) -> None: ...
def readable(self) -> bool: ...
def handle_read(self) -> None: ...
def send_continue(self) -> None: ...
def received(self, data: bytes) -> bool: ...
connected: bool
def handle_close(self) -> None: ...
def add_channel(self, map: Mapping[int, socket] | None = None) -> None: ...
def del_channel(self, map: Mapping[int, socket] | None = None) -> None: ...
def add_channel(self, map: _SocketMap | None = None) -> None: ...
def del_channel(self, map: _SocketMap | None = None) -> None: ...
def write_soon(self, data: bytes) -> int: ...
def service(self) -> None: ...
def cancel(self) -> None: ...

View File

@@ -1,29 +1,7 @@
from io import TextIOWrapper
from typing import Any, Literal
from typing import Final
PY2: Literal[False]
PY3: Literal[True]
WIN: bool
string_types: tuple[str]
integer_types: tuple[int]
class_types: tuple[type]
def unquote_bytes_to_wsgi(bytestring: bytes) -> str: ...
def text_(s: str, encoding: str = ..., errors: str = ...) -> str: ...
def tostr(s: str) -> str: ...
def tobytes(s: str) -> bytes: ...
exec_: Any
def reraise(tp: Any, value: BaseException, tb: str | None = ...) -> None: ...
MAXINT: int
HAS_IPV6: bool
IPPROTO_IPV6: int
IPV6_V6ONLY: int
def set_nonblocking(fd: TextIOWrapper) -> None: ...
ResourceWarning: Warning
def qualname(cls: Any) -> str: ...
WIN: Final[bool]
MAXINT: Final[int]
HAS_IPV6: Final[bool]
IPPROTO_IPV6: Final[int]
IPV6_V6ONLY: Final[int]

View File

@@ -1,12 +1,13 @@
from collections.abc import Mapping, Sequence
from io import BytesIO
from re import Pattern
from typing import Any
from waitress.adjustments import Adjustments
from waitress.receiver import ChunkedReceiver, FixedStreamReceiver
from waitress.utilities import Error
def unquote_bytes_to_wsgi(bytestring: str | bytes | bytearray) -> str: ...
class ParsingError(Exception): ...
class TransferEncodingNotImplemented(Exception): ...
@@ -38,6 +39,6 @@ class HTTPRequestParser:
def split_uri(uri: bytes) -> tuple[str, str, bytes, str, str]: ...
def get_header_lines(header: bytes) -> Sequence[bytes]: ...
first_line_re: Pattern[Any]
first_line_re: Pattern[str]
def crack_first_line(line: str) -> tuple[bytes, bytes, bytes]: ...

View File

@@ -1,16 +1,17 @@
from collections.abc import Callable, Mapping, Sequence
from _typeshed.wsgi import WSGIApplication
from collections.abc import Mapping, Sequence
from logging import Logger
from typing import Any, NamedTuple
from typing import Final, NamedTuple, TypeVar
from .utilities import BadRequest as BadRequest
_T = TypeVar("_T")
PROXY_HEADERS: frozenset[Any]
PROXY_HEADERS: Final[frozenset[str]]
class Forwarded(NamedTuple):
by: Any
for_: Any
host: Any
proto: Any
by: str
for_: str
host: str
proto: str
class MalformedProxyHeader(Exception):
header: str
@@ -19,18 +20,18 @@ class MalformedProxyHeader(Exception):
def __init__(self, header: str, reason: str, value: str) -> None: ...
def proxy_headers_middleware(
app: Any,
app: WSGIApplication,
trusted_proxy: str | None = None,
trusted_proxy_count: int = 1,
trusted_proxy_headers: set[str] | None = None,
clear_untrusted: bool = True,
log_untrusted: bool = False,
logger: Logger = ...,
) -> Callable[..., Any]: ...
) -> WSGIApplication: ...
def parse_proxy_headers(
environ: Mapping[str, str], trusted_proxy_count: int, trusted_proxy_headers: set[str], logger: Logger = ...
) -> set[str]: ...
def strip_brackets(addr: str) -> str: ...
def strip_brackets(addr: Sequence[_T]) -> Sequence[_T]: ...
def clear_untrusted_headers(
environ: Mapping[str, str], untrusted_headers: Sequence[str], log_warning: bool = False, logger: Logger = ...
) -> None: ...

View File

@@ -1,13 +1,28 @@
from .compat import tobytes as tobytes
from re import Pattern
from typing import Final
WS: str
OWS: str
RWS: str
BWS = str
TCHAR: str
OBS_TEXT: str
TOKEN: str
VCHAR: str
FIELD_VCHAR: str
FIELD_CONTENT: str
FIELD_VALUE: str
BWS: Final = r"[ \t]{0,}?"
CHUNK_EXT: Final[str]
CHUNK_EXT_NAME: Final = r"[!#$%&'*+\-.^_`|~0-9A-Za-z]{1,}"
CHUNK_EXT_RE: Final[Pattern[str]]
CHUNK_EXT_VAL: Final[str]
DIGIT: Final = "[0-9]"
FIELD_CONTENT: Final[str]
FIELD_VALUE: Final[str]
FIELD_VCHAR: Final = r"[\x21-\x7e\x80-\xff]"
HEADER_FIELD_RE: Final[Pattern[str]]
HEXDIG: Final = r"[0-9a-fA-F]"
OBS_TEXT: Final = r"\x80-\xff"
ONLY_DIGIT_RE: Final[Pattern[str]]
ONLY_HEXDIG_RE: Final[Pattern[str]]
OWS: Final = r"[ \t]{0,}?"
QDTEXT: Final = r"[\t !#-[\]-~\x80-\xff]"
QUOTED_PAIR: Final = r"\\([\t \x21-\x7e\x80-\xff])"
QUOTED_PAIR_RE: Final[Pattern[str]]
QUOTED_STRING: Final[str]
QUOTED_STRING_RE: Final[Pattern[str]]
RWS: Final = r"[ \t]{1,}?"
TCHAR: Final = r"[!#$%&'*+\-.^_`|~0-9A-Za-z]"
TOKEN: Final = r"[!#$%&'*+\-.^_`|~0-9A-Za-z]{1,}"
VCHAR: Final = r"\x21-\x7e"
WS: Final = r"[ \t]"

View File

@@ -1,13 +1,14 @@
from _typeshed import Unused
from collections.abc import Callable, Sequence
from io import TextIOWrapper
from re import Pattern
from typing import Any
from typing import Any, Final
HELP: str
RUNNER_PATTERN: Pattern[Any]
HELP: Final[str]
RUNNER_PATTERN: Final[Pattern[str]]
def match(obj_name: str) -> tuple[str, str]: ...
def resolve(module_name: str, object_name: str) -> Any: ...
def resolve(module_name: str, object_name: str) -> Any: ... # Any module attribute
def show_help(stream: TextIOWrapper, name: str, error: str | None = None) -> None: ...
def show_exception(stream: TextIOWrapper) -> None: ...
def run(argv: Sequence[str] = ..., _serve: Callable[..., object] = ...) -> None: ...
def run(argv: Sequence[str] = ..., _serve: Callable[..., Unused] = ...) -> None: ...

View File

@@ -1,49 +1,54 @@
import sys
from _typeshed import Incomplete
from collections.abc import Sequence
from socket import socket
from typing import Any
from _typeshed import Unused
from _typeshed.wsgi import WSGIApplication
from collections.abc import Callable, Sequence
from socket import _RetAddress, socket
from typing import Literal
from waitress import wasyncore
from waitress.adjustments import Adjustments
from waitress.adjustments import Adjustments, _AdjustmentsParams
from waitress.channel import HTTPChannel
from waitress.task import Task, ThreadedTaskDispatcher
from waitress.wasyncore import _SocketMap
def create_server(
application: Any,
map: Incomplete | None = None,
application: WSGIApplication,
map: _SocketMap | None = None,
_start: bool = True,
_sock: socket | None = None,
_dispatcher: ThreadedTaskDispatcher | None = None,
**kw: Any,
**kw: _AdjustmentsParams,
) -> MultiSocketServer | BaseWSGIServer: ...
class MultiSocketServer:
asyncore: Any
asyncore = wasyncore
adj: Adjustments
map: Any
map: _SocketMap | None
effective_listen: Sequence[tuple[str, int]]
task_dispatcher: ThreadedTaskDispatcher
def __init__(
self,
map: Incomplete | None = None,
map: _SocketMap | None = None,
adj: Adjustments | None = None,
effective_listen: Sequence[tuple[str, int]] | None = None,
dispatcher: ThreadedTaskDispatcher | None = None,
# Can be None, but print_listen will fail
log_info: Callable[[str], Unused] | None = None,
) -> None: ...
def print_listen(self, format_str: str) -> None: ...
def run(self) -> None: ...
def close(self) -> None: ...
class BaseWSGIServer(wasyncore.dispatcher):
channel_class: HTTPChannel
channel_class: type[HTTPChannel]
next_channel_cleanup: int
socketmod: socket
asyncore: Any
sockinfo: tuple[int, int, int, tuple[str, int]]
asyncore = wasyncore
in_connection_overflow: bool
sockinfo: tuple[int, int, int | None, _RetAddress]
family: int
socktype: int
application: Any
application: WSGIApplication
adj: Adjustments
trigger: int
task_dispatcher: ThreadedTaskDispatcher
@@ -51,19 +56,18 @@ class BaseWSGIServer(wasyncore.dispatcher):
active_channels: HTTPChannel
def __init__(
self,
application: Any,
map: Incomplete | None = None,
application: WSGIApplication,
map: _SocketMap | None = None,
_start: bool = True,
_sock: Incomplete | None = None,
_sock: socket | None = None,
dispatcher: ThreadedTaskDispatcher | None = None,
adj: Adjustments | None = None,
sockinfo: Incomplete | None = None,
sockinfo: tuple[int, int, int | None, _RetAddress] | None = None,
bind_socket: bool = True,
**kw: Any,
**kw: _AdjustmentsParams,
) -> None: ...
def bind_server_socket(self) -> None: ...
def get_server_name(self, ip: str) -> str: ...
def getsockname(self) -> Any: ...
def getsockname(self) -> tuple[str, str]: ...
accepting: bool
def accept_connections(self) -> None: ...
def add_task(self, task: Task) -> None: ...
@@ -74,33 +78,32 @@ class BaseWSGIServer(wasyncore.dispatcher):
def handle_accept(self) -> None: ...
def run(self) -> None: ...
def pull_trigger(self) -> None: ...
def set_socket_options(self, conn: Any) -> None: ...
def fix_addr(self, addr: Any) -> Any: ...
def set_socket_options(self, conn: socket) -> None: ...
def fix_addr(self, addr: _RetAddress) -> _RetAddress: ...
def maintenance(self, now: int) -> None: ...
def print_listen(self, format_str: str) -> None: ...
def close(self) -> None: ...
class TcpWSGIServer(BaseWSGIServer):
def bind_server_socket(self) -> None: ...
def getsockname(self) -> tuple[str, tuple[str, int]]: ...
def getsockname(self) -> tuple[str, str]: ...
def set_socket_options(self, conn: socket) -> None: ...
if sys.platform != "win32":
class UnixWSGIServer(BaseWSGIServer):
def __init__(
self,
application: Any,
map: Incomplete | None = ...,
_start: bool = ...,
_sock: Incomplete | None = ...,
dispatcher: Incomplete | None = ...,
adj: Adjustments | None = ...,
sockinfo: Incomplete | None = ...,
**kw: Any,
application: WSGIApplication,
map: _SocketMap | None = None,
_start: bool = True,
_sock: socket | None = None,
dispatcher: ThreadedTaskDispatcher | None = None,
adj: Adjustments | None = None,
sockinfo: tuple[int, int, int | None, _RetAddress] | None = None,
**kw: _AdjustmentsParams,
) -> None: ...
def bind_server_socket(self) -> None: ...
def getsockname(self) -> tuple[str, tuple[str, int]]: ...
def fix_addr(self, addr: Any) -> tuple[str, None]: ...
def get_server_name(self, ip: Any) -> str: ...
def getsockname(self) -> tuple[str, str]: ...
def fix_addr(self, addr: _RetAddress) -> tuple[Literal["localhost"], None]: ...
WSGIServer: TcpWSGIServer
WSGIServer = TcpWSGIServer

View File

@@ -1,6 +1,6 @@
from _typeshed import Incomplete
from _typeshed import Unused
from collections import deque
from collections.abc import Mapping, Sequence
from collections.abc import Callable, Mapping, Sequence
from logging import Logger
from threading import Condition, Lock
from typing import Any
@@ -9,19 +9,19 @@ from .channel import HTTPChannel
from .utilities import Error
rename_headers: Mapping[str, str]
hop_by_hop: frozenset[Any]
hop_by_hop: frozenset[str]
class ThreadedTaskDispatcher:
stop_count: int
active_count: int
logger: Logger
queue_logger: Logger
threads: set[Any]
threads: set[int]
queue: deque[Task]
lock: Lock
queue_cv: Condition
thread_exit_cv: Condition
def start_new_thread(self, target: Any, args: Any) -> None: ...
def start_new_thread(self, target: Callable[[int], Unused], thread_no: int) -> None: ...
def handler_thread(self, thread_no: int) -> None: ...
def set_thread_count(self, count: int) -> None: ...
def add_task(self, task: Task) -> None: ...
@@ -57,15 +57,16 @@ class ErrorTask(Task):
complete: bool
status: str
close_on_finish: bool
content_length: int
content_length: int | None
def execute(self) -> None: ...
class WSGITask(Task):
environ: Incomplete | None
# Environment dict union too complex
environ: dict[str, Any] | None
response_headers: Sequence[tuple[str, str]]
complete: bool
status: str
content_length: int
content_length: int | None
close_on_finish: bool
def execute(self) -> None: ...
def get_environment(self) -> Any: ...
def get_environment(self) -> dict[str, Any]: ...

View File

@@ -1,21 +1,22 @@
import sys
from _typeshed import Unused
from collections.abc import Callable, Mapping
from socket import socket
from threading import Lock
from typing import Literal
from waitress import wasyncore as wasyncore
from waitress import wasyncore
class _triggerbase:
kind: str | None
lock: Lock
thunks: Callable[[None], None]
thunks: list[Callable[[None], None]]
def readable(self) -> Literal[True]: ...
def writable(self) -> Literal[False]: ...
def handle_connect(self) -> None: ...
def handle_close(self) -> None: ...
def close(self) -> None: ...
def pull_trigger(self, thunk: Callable[[None], object] | None = None) -> None: ...
def pull_trigger(self, thunk: Callable[[None], Unused] | None = None) -> None: ...
def handle_read(self) -> None: ...
if sys.platform != "win32":

View File

@@ -1,16 +1,16 @@
from _typeshed import Unused
from _typeshed.wsgi import StartResponse
from collections.abc import Iterator, Mapping, Sequence
from collections.abc import Iterable, Iterator, Mapping, Sequence
from logging import Logger
from re import Match, Pattern
from typing import Any
logger: Logger
queue_logger: Logger
def find_double_newline(s: bytes) -> int: ...
def concat(*args: Any) -> str: ...
def join(seq: Any, field: str = " ") -> str: ...
def group(s: Any) -> str: ...
def concat(*args: str) -> str: ...
def join(seq: Iterable[str], field: str = " ") -> str: ...
def group(s: object) -> str: ...
short_days: Sequence[str]
long_days: Sequence[str]
@@ -22,14 +22,14 @@ months: Sequence[str]
monmap: Mapping[str, int]
months_reg: str
rfc822_date: str
rfc822_reg: Pattern[Any]
rfc822_reg: Pattern[str]
def unpack_rfc822(m: Match[Any]) -> tuple[int, int, int, int, int, int, int, int, int]: ...
def unpack_rfc822(m: Match[str]) -> tuple[int, int, int, int, int, int, int, int, int]: ...
rfc850_date: str
rfc850_reg: Pattern[Any]
rfc850_reg: Pattern[str]
def unpack_rfc850(m: Match[Any]) -> tuple[int, int, int, int, int, int, int, int, int]: ...
def unpack_rfc850(m: Match[str]) -> tuple[int, int, int, int, int, int, int, int, int]: ...
weekdayname: Sequence[str]
monthname: Sequence[str]
@@ -45,7 +45,7 @@ class Error:
body: str
def __init__(self, body: str) -> None: ...
def to_response(self) -> tuple[str, Sequence[tuple[str, str]], str]: ...
def wsgi_response(self, environ: Any, start_response: StartResponse) -> Iterator[str]: ...
def wsgi_response(self, environ: Unused, start_response: StartResponse) -> Iterator[str]: ...
class BadRequest(Error):
code: int

View File

@@ -1,31 +1,27 @@
import sys
from collections.abc import Callable, Mapping
from _typeshed import ReadableBuffer
from collections.abc import Mapping
from io import BytesIO
from logging import Logger
from socket import socket
from typing import Any
from socket import _RetAddress, socket
from typing_extensions import TypeAlias
from waitress import compat as compat, utilities as utilities
_Socket: TypeAlias = socket
_SocketMap: TypeAlias = Mapping[int, socket]
socket_map: Mapping[int, socket]
map: Mapping[int, socket]
socket_map: _SocketMap
class ExitNow(Exception): ...
def read(obj: dispatcher) -> None: ...
def write(obj: dispatcher) -> None: ...
def readwrite(obj: dispatcher, flags: int) -> None: ...
def poll(timeout: float = 0.0, map: Mapping[int, socket] | None = None) -> None: ...
def poll2(timeout: float = 0.0, map: Mapping[int, socket] | None = None) -> None: ...
def poll(timeout: float = 0.0, map: _SocketMap | None = None) -> None: ...
def poll2(timeout: float = 0.0, map: _SocketMap | None = None) -> None: ...
poll3 = poll2
def loop(
timeout: float = 30.0, use_poll: bool = False, map: Mapping[int, socket] | None = None, count: int | None = None
) -> None: ...
def loop(timeout: float = 30.0, use_poll: bool = False, map: _SocketMap | None = None, count: int | None = None) -> None: ...
def compact_traceback() -> tuple[tuple[str, str, str], BaseException, BaseException, str]: ...
class dispatcher:
@@ -34,24 +30,25 @@ class dispatcher:
accepting: bool
connecting: bool
closing: bool
addr: tuple[str, int] | None
ignore_log_types: frozenset[Any]
addr: _RetAddress | None
ignore_log_types: frozenset[str]
logger: Logger
compact_traceback: Callable[[], tuple[tuple[str, str, str], BaseException, BaseException, str]]
socket: _Socket | None
def __init__(self, sock: _Socket | None = None, map: Mapping[int, _Socket] | None = None) -> None: ...
def add_channel(self, map: Mapping[int, _Socket] | None = None) -> None: ...
def del_channel(self, map: Mapping[int, _Socket] | None = None) -> None: ...
@staticmethod
def compact_traceback() -> tuple[tuple[str, str, str], BaseException, BaseException, str]: ...
socket: socket | None
def __init__(self, sock: _Socket | None = None, map: _SocketMap | None = None) -> None: ...
def add_channel(self, map: _SocketMap | None = None) -> None: ...
def del_channel(self, map: _SocketMap | None = None) -> None: ...
family_and_type: tuple[int, int]
def create_socket(self, family: int = ..., type: int = ...) -> None: ...
def set_socket(self, sock: _Socket, map: Mapping[int, _Socket] | None = None) -> None: ...
def set_socket(self, sock: _Socket, map: _SocketMap | None = None) -> None: ...
def set_reuse_addr(self) -> None: ...
def readable(self) -> bool: ...
def writable(self) -> bool: ...
def listen(self, num: int) -> None: ...
def bind(self, addr: tuple[str, int]) -> None: ...
def connect(self, address: tuple[str, int]) -> None: ...
def accept(self) -> tuple[_Socket, tuple[str, int]] | None: ...
def bind(self, addr: _RetAddress) -> None: ...
def connect(self, address: _RetAddress) -> None: ...
def accept(self) -> tuple[_Socket, _RetAddress] | None: ...
def send(self, data: bytes, do_close: bool = True) -> int: ...
def recv(self, buffer_size: int) -> bytes: ...
def close(self) -> None: ...
@@ -67,34 +64,34 @@ class dispatcher:
def handle_write(self) -> None: ...
def handle_connect(self) -> None: ...
def handle_accept(self) -> None: ...
def handle_accepted(self, sock: _Socket, addr: Any) -> None: ...
def handle_accepted(self, sock: _Socket, addr: _RetAddress) -> None: ...
def handle_close(self) -> None: ...
class dispatcher_with_send(dispatcher):
out_buffer: bytes
def __init__(self, sock: socket | None = None, map: Mapping[int, socket] | None = None) -> None: ...
def __init__(self, sock: _Socket | None = None, map: _SocketMap | None = None) -> None: ...
def initiate_send(self) -> None: ...
handle_write: Callable[[], None]
handle_write = initiate_send
def writable(self) -> bool: ...
def send(self, data: bytes) -> None: ... # type: ignore[override]
def close_all(map: Mapping[int, socket] | None = None, ignore_all: bool = False) -> None: ...
def close_all(map: _SocketMap | None = None, ignore_all: bool = False) -> None: ...
if sys.platform != "win32":
class file_wrapper:
fd: BytesIO
def __init__(self, fd: BytesIO) -> None: ...
def __del__(self) -> None: ...
def recv(self, *args: Any) -> bytes: ...
def send(self, *args: Any) -> bytes: ...
def getsockopt(self, level: int, optname: int, buflen: bool | None = ...) -> int: ...
read: Callable[..., bytes]
write: Callable[..., bytes]
def recv(self, length: int, /) -> bytes: ...
def send(self, data: ReadableBuffer, /) -> bytes: ...
def getsockopt(self, level: int, optname: int, buflen: bool | None = None) -> int: ...
read = recv
write = send
def close(self) -> None: ...
def fileno(self) -> BytesIO: ...
class file_dispatcher(dispatcher):
connected: bool
def __init__(self, fd: BytesIO, map: Mapping[int, _Socket] | None = ...) -> None: ...
socket: _Socket
def __init__(self, fd: BytesIO, map: _SocketMap | None = None) -> None: ...
socket: socket
def set_file(self, fd: BytesIO) -> None: ...