Add Waitress stubs (#3889)

This commit is contained in:
dosisod
2020-05-13 07:22:13 -07:00
committed by GitHub
parent b8045a3fb2
commit 2ead8123a6
15 changed files with 721 additions and 0 deletions

6
third_party/3/waitress/__init__.pyi vendored Normal file
View File

@@ -0,0 +1,6 @@
from typing import Any, Tuple
from waitress.server import create_server
def serve(app: Any, **kw: Any) -> None: ...
def serve_paste(app: Any, global_conf: Any, **kw: Any) -> int: ...
def profile(cmd: Any, globals: Any, locals: Any, sort_order: Tuple[str, ...], callers: bool) -> None: ...

60
third_party/3/waitress/adjustments.pyi vendored Normal file
View File

@@ -0,0 +1,60 @@
from .compat import HAS_IPV6, PY2, WIN, string_types
from .proxy_headers import PROXY_HEADERS
from socket import SocketType
from typing import Any, Dict, FrozenSet, Iterable, List, Sequence, Optional, Set, Tuple, Union
truthy: FrozenSet
KNOWN_PROXY_HEADERS: FrozenSet
def asbool(s: Optional[Union[bool, str, int]]) -> bool: ...
def asoctal(s: str) -> int: ...
def aslist_cronly(value: str) -> List[str]: ...
def aslist(value: str) -> List[str]: ...
def asset(value: Optional[str]) -> Set[str]: ...
def slash_fixed_str(s: Optional[str]) -> str: ...
def str_iftruthy(s: Optional[str]) -> Optional[str]: ...
def as_socket_list(sockets: Sequence[object]) -> List[SocketType]: ...
class _str_marker(str): ...
class _int_marker(int): ...
class _bool_marker: ...
class Adjustments:
host: _str_marker = ...
port: _int_marker = ...
listen: List[str] = ...
threads: int = ...
trusted_proxy: Optional[str] = ...
trusted_proxy_count: Optional[int] = ...
trusted_proxy_headers: Set[str] = ...
log_untrusted_proxy_headers: bool = ...
clear_untrusted_proxy_headers: Union[_bool_marker, bool] = ...
url_scheme: str = ...
url_prefix: str = ...
ident: str = ...
backlog: int = ...
recv_bytes: int = ...
send_bytes: int = ...
outbuf_overflow: int = ...
outbuf_high_watermark: int = ...
inbuf_overflow: int = ...
connection_limit: int = ...
cleanup_interval: int = ...
channel_timeout: int = ...
log_socket_errors: bool = ...
max_request_header_size: int = ...
max_request_body_size: int = ...
expose_tracebacks: bool = ...
unix_socket: Optional[str] = ...
unix_socket_perms: int = ...
socket_options: List[Tuple[int, int, int]] = ...
asyncore_loop_timeout: int = ...
asyncore_use_poll: bool = ...
ipv4: bool = ...
ipv6: bool = ...
sockets: List[SocketType] = ...
def __init__(self, **kw: Any) -> None: ...
@classmethod
def parse_args(cls, argv: str) -> Tuple[Dict[str, Any], Any]: ...
@classmethod
def check_sockets(cls, sockets: Iterable[SocketType]) -> None: ...

57
third_party/3/waitress/buffers.pyi vendored Normal file
View File

@@ -0,0 +1,57 @@
from io import BufferedIOBase, BufferedRandom, BytesIO
from typing import Any, Callable, Optional
COPY_BYTES: int
STRBUF_LIMIT: int
class FileBasedBuffer:
remain: int = ...
file: BytesIO = ...
def __init__(self, file: BytesIO, from_buffer: Optional[BytesIO] = ...) -> None: ...
def __len__(self) -> int: ...
def __nonzero__(self) -> bool: ...
__bool__: Callable[[], bool] = ...
def append(self, s: Any) -> None: ...
def get(self, numbytes: int = ..., skip: bool = ...) -> bytes: ...
def skip(self, numbytes: int, allow_prune: int = ...) -> None: ...
def newfile(self) -> Any: ...
def prune(self) -> None: ...
def getfile(self) -> Any: ...
def close(self) -> None: ...
class TempfileBasedBuffer(FileBasedBuffer):
def __init__(self, from_buffer: Optional[BytesIO] = ...) -> None: ...
def newfile(self) -> BufferedRandom: ...
class BytesIOBasedBuffer(FileBasedBuffer):
file: BytesIO = ...
def __init__(self, from_buffer: Optional[BytesIO] = ...) -> None: ...
def newfile(self) -> BytesIO: ...
class ReadOnlyFileBasedBuffer(FileBasedBuffer):
file: BytesIO = ...
block_size: int = ...
def __init__(self, file: BytesIO, block_size: int = ...) -> None: ...
remain: int = ...
def prepare(self, size: Optional[int] = ...) -> int: ...
def get(self, numbytes: int = ..., skip: bool = ...) -> bytes: ...
def __iter__(self) -> ReadOnlyFileBasedBuffer: ...
def next(self) -> Optional[bytes]: ...
__next__: Callable[[], Optional[bytes]] = ...
def append(self, s: Any) -> None: ...
class OverflowableBuffer:
overflowed: bool = ...
buf: Optional[BufferedIOBase] = ...
strbuf: bytes = ...
overflow: int = ...
def __init__(self, overflow: int) -> None: ...
def __len__(self) -> int: ...
def __nonzero__(self) -> bool: ...
__bool__: Callable[[], bool] = ...
def append(self, s: bytes) -> None: ...
def get(self, numbytes: int = ..., skip: bool = ...) -> bytes: ...
def skip(self, numbytes: int, allow_prune: bool = ...) -> None: ...
def prune(self) -> None: ...
def getfile(self) -> BytesIO: ...
def close(self) -> None: ...

48
third_party/3/waitress/channel.pyi vendored Normal file
View File

@@ -0,0 +1,48 @@
from . import wasyncore as wasyncore
from socket import SocketType
from threading import Condition, Lock
from typing import Mapping, Sequence, Optional, Tuple
from waitress.adjustments import Adjustments
from waitress.buffers import OverflowableBuffer, ReadOnlyFileBasedBuffer
from waitress.parser import HTTPRequestParser
from waitress.server import BaseWSGIServer
from waitress.task import ErrorTask, WSGITask, Task
from waitress.utilities import InternalServerError
class ClientDisconnected(Exception): ...
class HTTPChannel(wasyncore.dispatcher):
task_class: WSGITask = ...
error_task_class: ErrorTask = ...
parser_class: HTTPRequestParser = ...
request: HTTPRequestParser = ...
last_activity: float = ...
will_close: bool = ...
close_when_flushed: bool = ...
requests: Sequence[HTTPRequestParser] = ...
sent_continue: bool = ...
total_outbufs_len: int = ...
current_outbuf_count: int = ...
server: BaseWSGIServer = ...
adj: Adjustments = ...
outbufs: Sequence[OverflowableBuffer] = ...
creation_time: float = ...
sendbuf_len: int = ...
task_lock: Lock = ...
outbuf_lock: Condition = ...
addr: Tuple[str, int] = ...
def __init__(
self, server: BaseWSGIServer, sock: SocketType, addr: str, adj: Adjustments, map: Optional[Mapping[int, SocketType]] = ...
) -> None: ...
def writable(self) -> bool: ...
def handle_write(self) -> None: ...
def readable(self) -> bool: ...
def handle_read(self) -> None: ...
def received(self, data: bytes) -> bool: ...
connected: bool = ...
def handle_close(self) -> None: ...
def add_channel(self, map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
def del_channel(self, map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
def write_soon(self, data: bytes) -> int: ...
def service(self) -> None: ...
def cancel(self) -> None: ...

38
third_party/3/waitress/compat.pyi vendored Normal file
View File

@@ -0,0 +1,38 @@
from io import TextIOWrapper
import sys
from typing import Any, Optional, Text, Tuple
if sys.version_info[0] == 3:
from urllib import parse as urlparse
else:
import urlparse
PY2: bool
PY3: bool
WIN: bool
string_types: Tuple[str, ]
integer_types: Tuple[int, ]
class_types: Tuple[type, ]
text_type = str
binary_type = bytes
long = int
def unquote_bytes_to_wsgi(bytestring: bytes) -> str: ...
def text_(s: Text, encoding: str = ..., errors: str = ...) -> str: ...
def tostr(s: Text) -> str: ...
def tobytes(s: Text) -> bytes: ...
exec_: Any
def reraise(tp: Any, value: BaseException, tb: Optional[str] = ...) -> None: ...
MAXINT: int
HAS_IPV6: bool
IPPROTO_IPV6: int
IPV6_V6ONLY: int
def set_nonblocking(fd: TextIOWrapper) -> None: ...
ResourceWarning: Warning
def qualname(cls: Any) -> str: ...

50
third_party/3/waitress/parser.pyi vendored Normal file
View File

@@ -0,0 +1,50 @@
from .rfc7230 import HEADER_FIELD
from io import BytesIO
from typing import Mapping, Sequence, Optional, Pattern, Tuple, Union
from waitress.adjustments import Adjustments
from waitress.buffers import OverflowableBuffer
from waitress.compat import tostr, unquote_bytes_to_wsgi, urlparse
from waitress.receiver import ChunkedReceiver, FixedStreamReceiver
from waitress.utilities import (
BadRequest,
RequestEntityTooLarge,
RequestHeaderFieldsTooLarge,
ServerNotImplemented,
find_double_newline,
Error,
)
class ParsingError(Exception): ...
class TransferEncodingNotImplemented(Exception): ...
class HTTPRequestParser:
completed: bool = ...
empty: bool = ...
expect_continue: bool = ...
headers_finished: bool = ...
header_plus: bytes = ...
chunked: bool = ...
content_length: int = ...
header_bytes_received: int = ...
body_bytes_received: int = ...
body_rcv: Optional[Union[ChunkedReceiver, FixedStreamReceiver]] = ...
version: str = ...
error: Optional[Error] = ...
connection_close: bool = ...
headers: Mapping[str, str] = ...
adj: Adjustments = ...
def __init__(self, adj: Adjustments) -> None: ...
def received(self, data: bytes) -> int: ...
first_line: str = ...
command: bytes = ...
url_scheme: str = ...
def parse_header(self, header_plus: bytes) -> None: ...
def get_body_stream(self) -> BytesIO: ...
def close(self) -> None: ...
def split_uri(uri: bytes) -> Tuple[str, str, bytes, str, str]: ...
def get_header_lines(header: bytes) -> Sequence[bytes]: ...
first_line_re: Pattern
def crack_first_line(line: str) -> Tuple[bytes, bytes, bytes]: ...

View File

@@ -0,0 +1,31 @@
from .utilities import BadRequest, logger, undquote
from collections import namedtuple
from logging import Logger
from typing import Any, Callable, Mapping, Sequence, Optional, Set
PROXY_HEADERS: frozenset
Forwarded = namedtuple("Forwarded", ["by", "for_", "host", "proto"])
class MalformedProxyHeader(Exception):
header: str = ...
reason: str = ...
value: str = ...
def __init__(self, header: str, reason: str, value: str) -> None: ...
def proxy_headers_middleware(
app: Any,
trusted_proxy: Optional[str] = ...,
trusted_proxy_count: int = ...,
trusted_proxy_headers: Optional[Set[str]] = ...,
clear_untrusted: bool = ...,
log_untrusted: bool = ...,
logger: Logger = ...,
) -> Callable[..., Any]: ...
def parse_proxy_headers(
environ: Mapping[str, str], trusted_proxy_count: int, trusted_proxy_headers: Set[str], logger: Logger = ...
) -> Set[str]: ...
def strip_brackets(addr: str) -> str: ...
def clear_untrusted_headers(
environ: Mapping[str, str], untrusted_headers: Sequence[str], log_warning: bool = ..., logger: Logger = ...
) -> None: ...

31
third_party/3/waitress/receiver.pyi vendored Normal file
View File

@@ -0,0 +1,31 @@
from io import BytesIO
from typing import Optional
from waitress.buffers import OverflowableBuffer
from waitress.utilities import BadRequest, find_double_newline
class FixedStreamReceiver:
completed: bool = ...
error: None = ...
remain: int = ...
buf: OverflowableBuffer = ...
def __init__(self, cl: int, buf: OverflowableBuffer) -> None: ...
def __len__(self) -> int: ...
def received(self, data: bytes) -> int: ...
def getfile(self) -> BytesIO: ...
def getbuf(self) -> OverflowableBuffer: ...
class ChunkedReceiver:
chunk_remainder: int = ...
validate_chunk_end: bool = ...
control_line: bytes = ...
chunk_end: bytes = ...
all_chunks_received: bool = ...
trailer: bytes = ...
completed: bool = ...
error: Optional[BadRequest] = ...
buf: OverflowableBuffer = ...
def __init__(self, buf: OverflowableBuffer) -> None: ...
def __len__(self) -> int: ...
def received(self, s: bytes) -> int: ...
def getfile(self) -> BytesIO: ...
def getbuf(self) -> OverflowableBuffer: ...

15
third_party/3/waitress/rfc7230.pyi vendored Normal file
View File

@@ -0,0 +1,15 @@
from .compat import tobytes
from typing import Pattern
WS: str
OWS: str
RWS: str
BWS = str
TCHAR: str
OBS_TEXT: str
TOKEN: str
VCHAR: str
FIELD_VCHAR: str
FIELD_CONTENT: str
FIELD_VALUE: str
HEADER_FIELD: Pattern

12
third_party/3/waitress/runner.pyi vendored Normal file
View File

@@ -0,0 +1,12 @@
from io import TextIOWrapper
from typing import Any, Callable, Sequence, Optional, Pattern, Tuple
from waitress import serve
HELP: str
RUNNER_PATTERN: Pattern
def match(obj_name: str) -> Tuple[str, str]: ...
def resolve(module_name: str, object_name: str) -> Any: ...
def show_help(stream: TextIOWrapper, name: str, error: Optional[str] = ...) -> None: ...
def show_exception(stream: TextIOWrapper) -> None: ...
def run(argv: Sequence[str] = ..., _serve: Callable[..., Any] = ...): ...

105
third_party/3/waitress/server.pyi vendored Normal file
View File

@@ -0,0 +1,105 @@
from . import wasyncore
from .proxy_headers import proxy_headers_middleware
from socket import SocketType
from typing import Any, Sequence, Optional, Tuple, Union
from waitress import trigger
from waitress.adjustments import Adjustments
from waitress.channel import HTTPChannel
from waitress.compat import IPPROTO_IPV6, IPV6_V6ONLY
from waitress.task import Task, ThreadedTaskDispatcher
from waitress.utilities import cleanup_unix_socket
def create_server(
application: Any,
map: Optional[Any] = ...,
_start: bool = ...,
_sock: Optional[SocketType] = ...,
_dispatcher: Optional[ThreadedTaskDispatcher] = ...,
**kw: Any,
) -> Union[MultiSocketServer, BaseWSGIServer]: ...
class MultiSocketServer:
asyncore: Any = ...
adj: Adjustments = ...
map: Any = ...
effective_listen: Sequence[Tuple[str, int]] = ...
task_dispatcher: ThreadedTaskDispatcher = ...
def __init__(
self,
map: Optional[Any] = ...,
adj: Optional[Adjustments] = ...,
effective_listen: Optional[Sequence[Tuple[str, int]]] = ...,
dispatcher: Optional[ThreadedTaskDispatcher] = ...,
) -> None: ...
def print_listen(self, format_str: str) -> None: ...
def run(self) -> None: ...
def close(self) -> None: ...
class BaseWSGIServer(wasyncore.dispatcher):
channel_class: HTTPChannel = ...
next_channel_cleanup: int = ...
socketmod: SocketType = ...
asyncore: Any = ...
sockinfo: Tuple[int, int, int, Tuple[str, int]] = ...
family: int = ...
socktype: int = ...
application: Any = ...
adj: Adjustments = ...
trigger: int = ...
task_dispatcher: ThreadedTaskDispatcher = ...
server_name: str = ...
active_channels: HTTPChannel = ...
def __init__(
self,
application: Any,
map: Optional[Any] = ...,
_start: bool = ...,
_sock: Optional[Any] = ...,
dispatcher: Optional[ThreadedTaskDispatcher] = ...,
adj: Optional[Adjustments] = ...,
sockinfo: Optional[Any] = ...,
bind_socket: bool = ...,
**kw: Any,
) -> None: ...
def bind_server_socket(self) -> None: ...
def get_server_name(self, ip: str) -> str: ...
def getsockname(self) -> Any: ...
accepting: bool = ...
def accept_connections(self) -> None: ...
def add_task(self, task: Task) -> None: ...
def readable(self) -> bool: ...
def writable(self) -> bool: ...
def handle_read(self) -> None: ...
def handle_connect(self) -> None: ...
def handle_accept(self) -> None: ...
def run(self) -> None: ...
def pull_trigger(self) -> None: ...
def set_socket_options(self, conn: Any) -> None: ...
def fix_addr(self, addr: Any) -> Any: ...
def maintenance(self, now: int) -> None: ...
def print_listen(self, format_str: str) -> None: ...
def close(self) -> None: ...
class TcpWSGIServer(BaseWSGIServer):
def bind_server_socket(self) -> None: ...
def getsockname(self) -> Tuple[str, Tuple[str, int]]: ...
def set_socket_options(self, conn: SocketType) -> None: ...
class UnixWSGIServer(BaseWSGIServer):
def __init__(
self,
application: Any,
map: Optional[Any] = ...,
_start: bool = ...,
_sock: Optional[Any] = ...,
dispatcher: Optional[Any] = ...,
adj: Optional[Adjustments] = ...,
sockinfo: Optional[Any] = ...,
**kw: Any,
) -> None: ...
def bind_server_socket(self) -> None: ...
def getsockname(self) -> Tuple[str, Tuple[str, int]]: ...
def fix_addr(self, addr: Any) -> Tuple[str, None]: ...
def get_server_name(self, ip: Any) -> str: ...
WSGIServer: TcpWSGIServer

70
third_party/3/waitress/task.pyi vendored Normal file
View File

@@ -0,0 +1,70 @@
from .buffers import ReadOnlyFileBasedBuffer
from .channel import HTTPChannel
from .compat import reraise, tobytes
from .utilities import Error, build_http_date, logger, queue_logger
from logging import Logger
from threading import Condition, Lock
from typing import Any, Deque, Mapping, Sequence, Optional, Set, Tuple
rename_headers: Mapping[str, str]
hop_by_hop: frozenset
class ThreadedTaskDispatcher:
stop_count: int = ...
active_count: int = ...
logger: Logger = ...
queue_logger: Logger = ...
threads: Set = ...
queue: Deque[Task] = ...
lock: Lock = ...
queue_cv: Condition = ...
thread_exit_cv: Condition = ...
def __init__(self) -> None: ...
def start_new_thread(self, target: Any, args: Any) -> None: ...
def handler_thread(self, thread_no: int) -> None: ...
def set_thread_count(self, count: int) -> None: ...
def add_task(self, task: Task) -> None: ...
def shutdown(self, cancel_pending: bool = ..., timeout: int = ...) -> bool: ...
class Task:
close_on_finish: bool = ...
status: str = ...
wrote_header: bool = ...
start_time: int = ...
content_length: Optional[int] = ...
content_bytes_written: int = ...
logged_write_excess: bool = ...
logged_write_no_body: bool = ...
complete: bool = ...
chunked_response: bool = ...
logger: Logger = ...
channel: HTTPChannel = ...
request: Error = ...
response_headers: Sequence[Tuple[str, str]] = ...
version: str = ...
def __init__(self, channel: HTTPChannel, request: Error) -> None: ...
def service(self) -> None: ...
@property
def has_body(self) -> bool: ...
def build_response_header(self) -> bytes: ...
def remove_content_length_header(self) -> None: ...
def start(self) -> None: ...
def finish(self) -> None: ...
def write(self, data: bytes) -> None: ...
class ErrorTask(Task):
complete: bool = ...
status: str = ...
close_on_finish: bool = ...
content_length: int = ...
def execute(self) -> None: ...
class WSGITask(Task):
environ: Optional[Any] = ...
response_headers: Sequence[Tuple[str, str]] = ...
complete: bool = ...
status: str = ...
content_length: int = ...
close_on_finish: bool = ...
def execute(self) -> None: ...
def get_environment(self) -> Any: ...

30
third_party/3/waitress/trigger.pyi vendored Normal file
View File

@@ -0,0 +1,30 @@
from . import wasyncore as wasyncore
from threading import Lock
from socket import SocketType
import sys
from typing import Callable, Mapping, Optional
from typing_extensions import Literal
class _triggerbase:
kind: Optional[str] = ...
lock: Lock = ...
thunks: Callable[[None], None] = ...
def __init__(self) -> None: ...
def readable(self) -> Literal[True]: ...
def writable(self) -> Literal[False]: ...
def handle_connect(self) -> None: ...
def handle_close(self) -> None: ...
def close(self) -> None: ...
def pull_trigger(self, thunk: Optional[Callable[[None], None]] = ...) -> None: ...
def handle_read(self) -> None: ...
if sys.platform == "linux" or sys.platform == "darwin":
class trigger(_triggerbase, wasyncore.file_dispatcher):
kind: str = ...
def __init__(self, map: Mapping[str, _triggerbase]) -> None: ...
else:
class trigger(_triggerbase, wasyncore.dispatcher):
kind: str = ...
trigger: SocketType = ...
def __init__(self, map: Mapping[str, _triggerbase]) -> None: ...

75
third_party/3/waitress/utilities.pyi vendored Normal file
View File

@@ -0,0 +1,75 @@
from .rfc7230 import OBS_TEXT, VCHAR
from logging import Logger
from typing import Any, Callable, Mapping, Sequence, Match, Pattern, Tuple
logger: Logger
queue_logger: Logger
def find_double_newline(s: bytes) -> int: ...
def concat(*args: Any) -> str: ...
def join(seq: Any, field: str = ...) -> str: ...
def group(s: Any) -> str: ...
short_days: Sequence[str]
long_days: Sequence[str]
short_day_reg: str
long_day_reg: str
daymap: Mapping[str, int]
hms_reg: str
months: Sequence[str]
monmap: Mapping[str, int]
months_reg: str
rfc822_date: str
rfc822_reg: Pattern
def unpack_rfc822(m: Match) -> Tuple[int, int, int, int, int, int, int, int, int]: ...
rfc850_date: str
rfc850_reg: Pattern
def unpack_rfc850(m: Match) -> Tuple[int, int, int, int, int, int, int, int, int]: ...
weekdayname: Sequence[str]
monthname: Sequence[str]
def build_http_date(when: int) -> str: ...
def parse_http_date(d: str) -> int: ...
vchar_re: str
obs_text_re: str
qdtext_re: str
quoted_pair_re: str
quoted_string_re: str
quoted_string: Pattern
quoted_pair: Pattern
def undquote(value: str) -> str: ...
def cleanup_unix_socket(path: str) -> None: ...
class Error:
code: int = ...
reason: str = ...
body: str = ...
def __init__(self, body: str) -> None: ...
def to_response(self) -> Tuple[str, Sequence[Tuple[str, str]], str]: ...
def wsgi_response(self, environ: Any, start_response: Callable[[str, Sequence[Tuple[str, str]]], None]) -> str: ...
class BadRequest(Error):
code: int = ...
reason: str = ...
class RequestHeaderFieldsTooLarge(BadRequest):
code: int = ...
reason: str = ...
class RequestEntityTooLarge(BadRequest):
code: int = ...
reason: str = ...
class InternalServerError(Error):
code: int = ...
reason: str = ...
class ServerNotImplemented(Error):
code: int = ...
reason: str = ...

93
third_party/3/waitress/wasyncore.pyi vendored Normal file
View File

@@ -0,0 +1,93 @@
from . import compat, utilities
from io import BytesIO
from logging import Logger
from socket import SocketType
from typing import Any, Callable, Mapping, Optional, Tuple
socket_map: Mapping[int, SocketType]
map: Mapping[int, SocketType]
class ExitNow(Exception): ...
def read(obj: dispatcher) -> None: ...
def write(obj: dispatcher) -> None: ...
def readwrite(obj: dispatcher, flags: int) -> None: ...
def poll(timeout: float = ..., map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
def poll2(timeout: float = ..., map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
poll3 = poll2
def loop(
timeout: float = ..., use_poll: bool = ..., map: Optional[Mapping[int, SocketType]] = ..., count: Optional[int] = ...
) -> None: ...
def compact_traceback() -> Tuple[Tuple[str, str, str], BaseException, BaseException, str]: ...
class dispatcher:
debug: bool = ...
connected: bool = ...
accepting: bool = ...
connecting: bool = ...
closing: bool = ...
addr: Optional[Tuple[str, int]] = ...
ignore_log_types: frozenset = ...
logger: Logger = ...
compact_traceback: Callable[[], Tuple[Tuple[str, str, str], BaseException, BaseException, str]] = ...
socket: Optional[SocketType] = ...
def __init__(self, sock: Optional[SocketType] = ..., map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
def add_channel(self, map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
def del_channel(self, map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
family_and_type: Tuple[int, int] = ...
def create_socket(self, family: int = ..., type: int = ...) -> None: ...
def set_socket(self, sock: SocketType, map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
def set_reuse_addr(self) -> None: ...
def readable(self) -> bool: ...
def writable(self) -> bool: ...
def listen(self, num: int) -> None: ...
def bind(self, addr: Tuple[str, int]) -> None: ...
def connect(self, address: Tuple[str, int]) -> None: ...
def accept(self) -> Optional[Tuple[SocketType, Tuple[str, int]]]: ...
def send(self, data: bytes) -> int: ...
def recv(self, buffer_size: int) -> bytes: ...
def close(self) -> None: ...
def log(self, message: str) -> None: ...
def log_info(self, message: str, type: str = ...) -> None: ...
def handle_read_event(self) -> None: ...
def handle_connect_event(self) -> None: ...
def handle_write_event(self) -> None: ...
def handle_expt_event(self) -> None: ...
def handle_error(self) -> None: ...
def handle_expt(self) -> None: ...
def handle_read(self) -> None: ...
def handle_write(self) -> None: ...
def handle_connect(self) -> None: ...
def handle_accept(self) -> None: ...
def handle_accepted(self, sock: SocketType, addr: Any) -> None: ...
def handle_close(self) -> None: ...
class dispatcher_with_send(dispatcher):
out_buffer: bytes = ...
def __init__(self, sock: Optional[SocketType] = ..., map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
def initiate_send(self) -> None: ...
handle_write: Callable[[], None] = ...
def writable(self) -> bool: ...
def send(self, data: bytes) -> None: ... # type: ignore
def close_all(map: Optional[Mapping[int, SocketType]] = ..., ignore_all: bool = ...) -> None: ...
class file_wrapper:
fd: BytesIO = ...
def __init__(self, fd: BytesIO) -> None: ...
def __del__(self) -> None: ...
def recv(self, *args: Any) -> bytes: ...
def send(self, *args: Any) -> bytes: ...
def getsockopt(self, level: int, optname: int, buflen: Optional[bool] = ...) -> int: ...
read: Callable[..., bytes] = ...
write: Callable[..., bytes] = ...
def close(self) -> None: ...
def fileno(self) -> BytesIO: ...
class file_dispatcher(dispatcher):
connected: bool = ...
def __init__(self, fd: BytesIO, map: Optional[Mapping[int, SocketType]] = ...) -> None: ...
socket: SocketType = ...
def set_file(self, fd: BytesIO) -> None: ...