Add stubs for gunicorn (#14690)

This commit is contained in:
Eugene Liukin
2025-09-22 13:09:52 +03:00
committed by GitHub
parent fb852f99f0
commit da93072ceb
37 changed files with 2280 additions and 0 deletions
+1
View File
@@ -44,6 +44,7 @@
"stubs/grpcio-reflection/grpc_reflection/v1alpha",
"stubs/grpcio-status/grpc_status",
"stubs/grpcio/grpc/__init__.pyi",
"stubs/gunicorn",
"stubs/hdbcli/hdbcli/dbapi.pyi",
"stubs/html5lib",
"stubs/httplib2",
@@ -0,0 +1,5 @@
# Is not present in runtime. For more details, see the file itself.
gunicorn._types
# .pyi file doesn't exist
gunicorn.__main__
+8
View File
@@ -0,0 +1,8 @@
version = "23.0.*"
upstream_repository = "https://github.com/benoitc/gunicorn"
requires = ["types-gevent"]
[tool.stubtest]
supported_platforms = ["linux", "darwin"]
ci_platforms = ["linux", "darwin"]
stubtest_requirements = ["gevent>=1.4.0", "eventlet>=0.24.1,!=0.36.0", "tornado>=0.2", "setproctitle", "PasteDeploy", "inotify"]
+4
View File
@@ -0,0 +1,4 @@
version_info: tuple[int, int, int]
__version__: str
SERVER: str
SERVER_SOFTWARE: str
+23
View File
@@ -0,0 +1,23 @@
### This .pyi file is a helper for centralized storage types that are reused across different runtime modules. ###
from _typeshed import FileDescriptor
from collections.abc import Awaitable, Callable, Iterable, MutableMapping
from typing import Any
from typing_extensions import LiteralString, TypeAlias
_StatusType: TypeAlias = str
_HeadersType: TypeAlias = Iterable[tuple[str, str]]
_EnvironType: TypeAlias = MutableMapping[str, Any] # See https://peps.python.org/pep-0333/
_StartResponseType: TypeAlias = Callable[[_StatusType, _HeadersType], None]
_ResponseBodyType: TypeAlias = Iterable[bytes]
_WSGIAppType: TypeAlias = Callable[[_EnvironType, _StartResponseType], _ResponseBodyType] # noqa: Y047
_ScopeType: TypeAlias = MutableMapping[str, Any]
_MessageType: TypeAlias = MutableMapping[str, Any]
_ReceiveType: TypeAlias = Callable[[], Awaitable[_MessageType]]
_SendType: TypeAlias = Callable[[_MessageType], Awaitable[None]]
_ASGIAppType: TypeAlias = Callable[[_ScopeType, _ReceiveType, _SendType], Awaitable[None]] # noqa: Y047
_UnixSocketPathType: TypeAlias = str
_TcpAddressType: TypeAlias = tuple[LiteralString, int] # noqa: Y047
_AddressType: TypeAlias = _UnixSocketPathType | FileDescriptor | _TcpAddressType # noqa: Y047
+36
View File
@@ -0,0 +1,36 @@
from argparse import ArgumentParser, Namespace
from typing import Any
from typing_extensions import override
from gunicorn.config import Config
from gunicorn.glogging import Logger as GLogger
from .._types import _ASGIAppType, _WSGIAppType
class BaseApplication:
usage: str | None
cfg: Config
callable: _WSGIAppType | _ASGIAppType | None
prog: str | None
logger: GLogger
def __init__(self, usage: str | None = None, prog: str | None = None) -> None: ...
def do_load_config(self) -> None: ...
def load_default_config(self) -> None: ...
def init(self, parser: ArgumentParser, opts: Namespace, args: list[str]) -> dict[str, Any] | None: ...
def load(self) -> _WSGIAppType | _ASGIAppType: ...
def load_config(self) -> None: ...
def reload(self) -> None: ...
def wsgi(self) -> _WSGIAppType | _ASGIAppType: ...
def run(self) -> None: ...
class Application(BaseApplication):
def chdir(self) -> None: ...
def get_config_from_filename(self, filename: str) -> dict[str, Any]: ...
def get_config_from_module_name(self, module_name: str) -> dict[str, Any]: ...
def load_config_from_module_name_or_filename(self, location: str) -> dict[str, Any]: ...
def load_config_from_file(self, filename: str) -> dict[str, Any]: ...
@override
def load_config(self) -> None: ...
@override
def run(self) -> None: ...
@@ -0,0 +1,7 @@
from typing import Any
from .._types import _WSGIAppType
def get_wsgi_app(config_uri: str, name: str | None = None, defaults: dict[str, Any] | None = None) -> _WSGIAppType: ...
def has_logging_config(config_file: str) -> bool: ...
def serve(app: _WSGIAppType, global_conf: dict[str, Any], **local_conf: Any) -> None: ...
+20
View File
@@ -0,0 +1,20 @@
from argparse import ArgumentParser, Namespace
from typing_extensions import override
from gunicorn.app.base import Application
from .._types import _WSGIAppType
class WSGIApplication(Application):
app_uri: str | None
@override
def init(self, parser: ArgumentParser, opts: Namespace, args: list[str]) -> None: ...
@override
def load_config(self) -> None: ...
def load_wsgiapp(self) -> _WSGIAppType: ...
def load_pasteapp(self) -> _WSGIAppType: ...
@override
def load(self) -> _WSGIAppType: ...
def run(prog: str | None = None) -> None: ...
+68
View File
@@ -0,0 +1,68 @@
import socket
from types import FrameType
from typing import ClassVar
from gunicorn.app.base import BaseApplication
from gunicorn.config import Config
from gunicorn.glogging import Logger as GLogger
from gunicorn.workers.base import Worker
from ._types import _AddressType
from .pidfile import Pidfile
class Arbiter:
WORKER_BOOT_ERROR: ClassVar[int]
APP_LOAD_ERROR: ClassVar[int]
START_CTX: ClassVar[dict[int | str, str | list[str]]]
LISTENERS: ClassVar[list[socket.socket]]
WORKERS: ClassVar[dict[int, Worker]]
PIPE: ClassVar[list[int]]
SIG_QUEUE: ClassVar[list[int]]
SIGNALS: ClassVar[list[int]]
SIG_NAMES: ClassVar[dict[int, str]]
log: GLogger | None
pidfile: Pidfile | None
systemd: bool
worker_age: int
reexec_pid: int
master_pid: int
master_name: str
pid: int
app: BaseApplication
cfg: Config
worker_class: type[Worker]
address: list[_AddressType]
timeout: int
proc_name: str
num_workers: int
def __init__(self, app: BaseApplication) -> None: ...
def setup(self, app: BaseApplication) -> None: ...
def start(self) -> None: ...
def init_signals(self) -> None: ...
def signal(self, sig: int, frame: FrameType | None) -> None: ...
def run(self) -> None: ...
def handle_chld(self, sig: int, frame: FrameType | None) -> None: ...
def handle_hup(self) -> None: ...
def handle_term(self) -> None: ...
def handle_int(self) -> None: ...
def handle_quit(self) -> None: ...
def handle_ttin(self) -> None: ...
def handle_ttou(self) -> None: ...
def handle_usr1(self) -> None: ...
def handle_usr2(self) -> None: ...
def handle_winch(self) -> None: ...
def maybe_promote_master(self) -> None: ...
def wakeup(self) -> None: ...
def halt(self, reason: str | None = None, exit_status: int = 0) -> None: ...
def sleep(self) -> None: ...
def stop(self, graceful: bool = True) -> None: ...
def reexec(self) -> None: ...
def reload(self) -> None: ...
def murder_workers(self) -> None: ...
def reap_workers(self) -> None: ...
def manage_workers(self) -> None: ...
def spawn_worker(self) -> int: ...
def spawn_workers(self) -> None: ...
def kill_workers(self, sig: int) -> None: ...
def kill_worker(self, pid: int, sig: int) -> None: ...
File diff suppressed because it is too large Load Diff
+18
View File
@@ -0,0 +1,18 @@
__all__ = ["spew", "unspew"]
from collections.abc import Container
from types import FrameType
from typing import Any
from typing_extensions import Self
class Spew:
trace_names: Container[str] | None = None
show_values: bool
def __init__(self, trace_names: Container[str] | None = None, show_values: bool = True) -> None: ...
def __call__(
self, frame: FrameType, event: str, arg: Any # `arg` is not used inside the function, stub is set Any
) -> Self: ...
def spew(trace_names: Container[str] | None = None, show_values: bool = False) -> None: ...
def unspew() -> None: ...
+8
View File
@@ -0,0 +1,8 @@
class HaltServer(BaseException):
reason: str
exit_status: int
def __init__(self, reason: str, exit_status: int = 1) -> None: ...
class ConfigError(Exception): ...
class AppImportError(Exception): ...
+158
View File
@@ -0,0 +1,158 @@
import logging
import threading
from collections.abc import Mapping
from datetime import timedelta
from logging.config import _DictConfigArgs
from socket import SocketKind
from typing import Annotated, Any, ClassVar, Literal, TypedDict, type_check_only
from typing_extensions import TypeAlias, override
from gunicorn.http import Request
from gunicorn.http.wsgi import Response
from ._types import _EnvironType
from .config import Config
SYSLOG_FACILITIES: dict[str, int]
@type_check_only
class _AtomsDict(TypedDict, total=False):
h: str
l: str
u: str
t: str
r: str
s: str
m: str | None
U: str | None
q: str | None
H: str | None
b: str
B: int | None
f: str
a: str
T: int
D: int
M: int
L: str
p: str
_CriticalIntType: TypeAlias = Annotated[int, "50"]
_ErrorIntType: TypeAlias = Annotated[int, "40"]
_WarningIntType: TypeAlias = Annotated[int, "30"]
_InfoIntType: TypeAlias = Annotated[int, "20"]
_DebugIntType: TypeAlias = Annotated[int, "10"]
_LogLevelIntType: TypeAlias = _CriticalIntType | _ErrorIntType | _WarningIntType | _InfoIntType | _DebugIntType
_LogLevelStrType: TypeAlias = Literal["critical", "error", "warning", "info", "debug"]
_LogLevelType: TypeAlias = _LogLevelIntType | _LogLevelStrType
CONFIG_DEFAULTS: _DictConfigArgs
def loggers() -> list[logging.Logger]: ...
class SafeAtoms(dict[str, Any]):
def __init__(self, atoms: dict[str, Any]) -> None: ...
@override
def __getitem__(self, k: str) -> str: ...
_SyslogAddressType: TypeAlias = (
tuple[Literal[SocketKind.SOCK_DGRAM] | None, str] # Unix Socket
| tuple[Literal[SocketKind.SOCK_DGRAM, SocketKind.SOCK_STREAM], tuple[str, int]] # TCP/UDP Socket
)
def parse_syslog_address(addr: str) -> _SyslogAddressType: ...
@type_check_only
class _LogLevels(TypedDict):
critical: _CriticalIntType
error: _ErrorIntType
warning: _WarningIntType
info: _InfoIntType
debug: _DebugIntType
class Logger:
LOG_LEVELS: ClassVar[_LogLevels]
loglevel: ClassVar[_LogLevelIntType]
error_fmt: ClassVar[str]
datefmt: ClassVar[str]
access_fmt: ClassVar[str]
syslog_fmt: ClassVar[str]
atoms_wrapper_class: ClassVar[type[SafeAtoms]]
error_log: logging.Logger
access_log: logging.Logger
error_handlers: list[logging.Handler]
access_handlers: list[logging.Handler]
logfile: Any | None
lock: threading.Lock
cfg: Config
def __init__(self, cfg: Config) -> None: ...
def setup(self, cfg: Config) -> None: ...
def critical(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
def error(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
def warning(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
def info(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
def debug(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
def exception(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = True,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
def log(
self,
lvl: _LogLevelType,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
def atoms(self, resp: Response, req: Request, environ: _EnvironType, request_time: timedelta) -> _AtomsDict: ...
def access(self, resp: Response, req: Request, environ: _EnvironType, request_time: timedelta) -> None: ...
def now(self) -> str: ...
def reopen_files(self) -> None: ...
def close_on_exec(self) -> None: ...
@@ -0,0 +1,4 @@
from gunicorn.http.message import Message as Message, Request as Request
from gunicorn.http.parser import RequestParser as RequestParser
__all__ = ["Message", "Request", "RequestParser"]
+48
View File
@@ -0,0 +1,48 @@
import io
from collections.abc import Callable, Generator, Iterator
from typing_extensions import TypeAlias
from gunicorn.http.message import Request
from gunicorn.http.unreader import Unreader
class ChunkedReader:
req: Request
parser: Generator[bytes, None, None] | None
buf: io.BytesIO
def __init__(self, req: Request, unreader: Unreader) -> None: ...
def read(self, size: int) -> bytes: ...
def parse_trailers(self, unreader: Unreader, data: bytes) -> None: ...
def parse_chunked(self, unreader: Unreader) -> Generator[bytes]: ...
def parse_chunk_size(self, unreader: Unreader, data: bytes | None = None) -> tuple[int, bytes | None]: ...
def get_data(self, unreader: Unreader, buf: io.BytesIO) -> None: ...
class LengthReader:
unreader: Unreader
length: int
def __init__(self, unreader: Unreader, length: int) -> None: ...
def read(self, size: int) -> bytes: ...
class EOFReader:
unreader: Unreader
buf: io.BytesIO
finished: bool
def __init__(self, unreader: Unreader) -> None: ...
def read(self, size: int) -> bytes: ...
_ReaderType: TypeAlias = ChunkedReader | LengthReader | EOFReader
class Body:
reader: _ReaderType
buf: io.BytesIO
def __init__(self, reader: _ReaderType) -> None: ...
def __iter__(self) -> Iterator[bytes]: ...
def __next__(self) -> bytes: ...
next: Callable[[Body], bytes]
def getsize(self, size: int | None) -> int: ...
def read(self, size: int | None = None) -> bytes: ...
def readline(self, size: int | None = None) -> bytes: ...
def readlines(self, size: int | None = None) -> list[bytes]: ...
+89
View File
@@ -0,0 +1,89 @@
from typing_extensions import Buffer
from gunicorn.http import Message
class ParseException(Exception): ...
class NoMoreData(IOError):
buf: Buffer
def __init__(self, buf: Buffer | None = None) -> None: ...
class ConfigurationProblem(ParseException):
info: str
code: int
def __init__(self, info: str) -> None: ...
class InvalidRequestLine(ParseException):
req: str
code: int
def __init__(self, req: str) -> None: ...
class InvalidRequestMethod(ParseException):
method: str
def __init__(self, method: str) -> None: ...
class InvalidHTTPVersion(ParseException):
version: str | tuple[int, int]
def __init__(self, version: str | tuple[int, int]) -> None: ...
class InvalidHeader(ParseException):
hdr: str
req: Message | None
def __init__(self, hdr: str, req: Message | None = None) -> None: ...
class ObsoleteFolding(ParseException):
hdr: str
def __init__(self, hdr: str) -> None: ...
class InvalidHeaderName(ParseException):
hdr: str
def __init__(self, hdr: str) -> None: ...
class UnsupportedTransferCoding(ParseException):
hdr: str
code: int
def __init__(self, hdr: str) -> None: ...
class InvalidChunkSize(IOError):
data: bytes
def __init__(self, data: bytes) -> None: ...
class ChunkMissingTerminator(IOError):
term: bytes
def __init__(self, term: bytes) -> None: ...
class LimitRequestLine(ParseException):
size: int
max_size: int
def __init__(self, size: int, max_size: int) -> None: ...
class LimitRequestHeaders(ParseException):
msg: str
def __init__(self, msg: str) -> None: ...
class InvalidProxyLine(ParseException):
line: str
code: int
def __init__(self, line: str) -> None: ...
class ForbiddenProxyRequest(ParseException):
host: str
code: int
def __init__(self, host: str) -> None: ...
class InvalidSchemeHeaders(ParseException): ...
+62
View File
@@ -0,0 +1,62 @@
import io
import re
from typing_extensions import override
from gunicorn.config import Config
from gunicorn.http.body import Body
from gunicorn.http.unreader import Unreader
from .._types import _AddressType
MAX_REQUEST_LINE: int
MAX_HEADERS: int
DEFAULT_MAX_HEADERFIELD_SIZE: int
RFC9110_5_6_2_TOKEN_SPECIALS: str
TOKEN_RE: re.Pattern[str]
METHOD_BADCHAR_RE: re.Pattern[str]
VERSION_RE: re.Pattern[str]
RFC9110_5_5_INVALID_AND_DANGEROUS: re.Pattern[str]
class Message:
cfg: Config
unreader: Unreader
peer_addr: _AddressType
remote_addr: _AddressType
version: tuple[int, int] | None
headers: list[tuple[str, str]]
trailers: list[tuple[str, str]]
body: Body | None
scheme: str
must_close: bool
limit_request_fields: int
limit_request_field_size: int
max_buffer_headers: int
def __init__(self, cfg: Config, unreader: Unreader, peer_addr: _AddressType) -> None: ...
def force_close(self) -> None: ...
def parse(self, unreader: Unreader) -> bytes: ...
def parse_headers(self, data: bytes, from_trailer: bool = False) -> list[tuple[str, str]]: ...
def set_body_reader(self) -> None: ...
def should_close(self) -> bool: ...
class Request(Message):
method: str | None
uri: str | None
path: str | None
query: str | None
fragment: str | None
limit_request_line: int
req_number: int
proxy_protocol_info: dict[str, str | int] | None
def __init__(self, cfg: Config, unreader: Unreader, peer_addr: _AddressType, req_number: int = 1) -> None: ...
def get_data(self, unreader: Unreader, buf: io.BytesIO, stop: bool = False) -> None: ...
@override
def parse(self, unreader: Unreader) -> bytes: ...
def read_line(self, unreader: Unreader, buf: io.BytesIO, limit: int = 0) -> tuple[bytes, bytes]: ...
def proxy_protocol(self, line: str) -> bool: ...
def proxy_protocol_access_check(self) -> None: ...
def parse_proxy_protocol(self, line: str) -> None: ...
def parse_request_line(self, line_bytes: bytes) -> None: ...
@override
def set_body_reader(self) -> None: ...
+26
View File
@@ -0,0 +1,26 @@
import socket
from collections.abc import Callable, Iterable, Iterator
from typing import ClassVar
from gunicorn.config import Config
from gunicorn.http.message import Request
from gunicorn.http.unreader import Unreader
from .._types import _AddressType
class Parser:
mesg_class: ClassVar[type[Request] | None]
cfg: Config
unreader: Unreader
mesg: Request | None
source_addr: _AddressType
req_count: int
def __init__(self, cfg: Config, source: socket.socket | Iterable[bytes], source_addr: _AddressType) -> None: ...
def __iter__(self) -> Iterator[Request]: ...
def __next__(self) -> Request: ...
next: Callable[[Parser], Request]
class RequestParser(Parser):
mesg_class: ClassVar[type[Request]]
+28
View File
@@ -0,0 +1,28 @@
import io
import socket
from _typeshed import ReadableBuffer
from collections.abc import Iterable, Iterator
from typing_extensions import override
class Unreader:
buf: io.BytesIO
def __init__(self) -> None: ...
def chunk(self) -> bytes: ...
def read(self, size: int | None = None) -> bytes: ...
def unread(self, data: ReadableBuffer) -> None: ...
class SocketUnreader(Unreader):
sock: socket.socket
mxchunk: int
def __init__(self, sock: socket.socket, max_chunk: int = 8192) -> None: ...
@override
def chunk(self) -> bytes: ...
class IterUnreader(Unreader):
iter: Iterator[bytes] | None
def __init__(self, iterable: Iterable[bytes]) -> None: ...
@override
def chunk(self) -> bytes: ...
+70
View File
@@ -0,0 +1,70 @@
import io
import logging
import re
import socket
from _typeshed import ReadableBuffer
from collections.abc import Callable
from typing import Any
from typing_extensions import override
from gunicorn.config import Config
from gunicorn.http import Request
from .._types import _AddressType, _EnvironType, _HeadersType, _StatusType
BLKSIZE: int
HEADER_VALUE_RE: re.Pattern[str]
log: logging.Logger
class FileWrapper:
filelike: io.IOBase
blksize: int
close: Callable[[], None] | None
def __init__(self, filelike: io.IOBase, blksize: int = 8192) -> None: ...
def __getitem__(self, key: Any) -> bytes: ...
class WSGIErrorsWrapper(io.RawIOBase):
streams: list[io.TextIOBase]
def __init__(self, cfg: Config) -> None: ...
@override
def write(self, data: ReadableBuffer) -> None: ...
def base_environ(cfg: Config) -> _EnvironType: ...
def default_environ(req: Request, sock: socket.socket, cfg: Config) -> _EnvironType: ...
def proxy_environ(req: Request) -> _EnvironType: ...
def create(
req: Request, sock: socket.socket, client: _AddressType, server: _AddressType, cfg: Config
) -> tuple[Response, _EnvironType]: ...
class Response:
req: Request
sock: socket.socket
version: str
status: str | None
chunked: bool
must_close: bool
headers: _HeadersType
headers_sent: bool
response_length: int | None
sent: int
upgrade: bool
cfg: Config
status_code: int | None
def __init__(self, req: Request, sock: socket.socket, cfg: Config) -> None: ...
def force_close(self) -> None: ...
def should_close(self) -> bool: ...
def start_response(
self, status: _StatusType, headers: _HeadersType, exc_info: tuple[type, BaseException, Any] | None = None
) -> Callable[[bytes], None]: ...
def process_headers(self, headers: _HeadersType) -> None: ...
def is_chunked(self) -> bool: ...
def default_headers(self) -> list[str]: ...
def send_headers(self) -> None: ...
def write(self, arg: bytes) -> None: ...
def can_sendfile(self) -> bool: ...
def sendfile(self, respiter: FileWrapper) -> bool: ...
def write_file(self, respiter: FileWrapper) -> None: ...
def close(self) -> None: ...
@@ -0,0 +1,105 @@
import logging
import socket
from collections.abc import Mapping
from datetime import timedelta
from typing_extensions import override
from gunicorn.config import Config
from gunicorn.glogging import Logger
from gunicorn.http import Request
from gunicorn.http.wsgi import Response
from .._types import _EnvironType
from ..glogging import _LogLevelType
METRIC_VAR: str
VALUE_VAR: str
MTYPE_VAR: str
GAUGE_TYPE: str
COUNTER_TYPE: str
HISTOGRAM_TYPE: str
class Statsd(Logger):
prefix: str
sock: socket.socket | None
dogstatsd_tags: str | None
cfg: Config
def __init__(self, cfg: Config) -> None: ...
@override
def critical(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
@override
def error(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
@override
def warning(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
@override
def info(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
@override
def debug(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
@override
def exception(
self,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = True,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
@override
def log(
self,
lvl: _LogLevelType,
msg: object,
*args: object,
exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Mapping[str, object] | None = None,
) -> None: ...
@override
def access(self, resp: Response, req: Request, environ: _EnvironType, request_time: timedelta) -> None: ...
def gauge(self, name: str, value: float) -> None: ...
def increment(self, name: str, value: int, sampling_rate: float = 1.0) -> None: ...
def decrement(self, name: str, value: int, sampling_rate: float = 1.0) -> None: ...
def histogram(self, name: str, value: float) -> None: ...
+8
View File
@@ -0,0 +1,8 @@
from _typeshed import StrOrBytesPath
class Pidfile:
def __init__(self, fname: StrOrBytesPath) -> None: ...
def create(self, pid: int) -> None: ...
def rename(self, path: StrOrBytesPath) -> None: ...
def unlink(self) -> None: ...
def validate(self) -> None: ...
+50
View File
@@ -0,0 +1,50 @@
import sys
import threading
from collections.abc import Callable, Iterable
from re import Pattern
from typing import NoReturn, TypedDict, type_check_only
from typing_extensions import TypeAlias, override
COMPILED_EXT_RE: Pattern[str]
class Reloader(threading.Thread):
daemon: bool
def __init__(
self, extra_files: Iterable[str] | None = None, interval: int = 1, callback: Callable[[str], None] | None = None
) -> None: ...
def add_extra_file(self, filename: str) -> None: ...
def get_files(self) -> list[str]: ...
@override
def run(self) -> None: ...
has_inotify: bool
if sys.platform == "linux":
class InotifyReloader(threading.Thread):
event_mask: int
daemon: bool
def __init__(self, extra_files: Iterable[str] | None = None, callback: Callable[[str], None] | None = None) -> None: ...
def add_extra_file(self, filename: str) -> None: ...
def get_dirs(self) -> set[str]: ...
@override
def run(self) -> None: ...
else:
class InotifyReloader:
def __init__(
self, extra_files: Iterable[str] | None = None, callback: Callable[[str], None] | None = None
) -> NoReturn: ...
_PreferredReloaderType: TypeAlias = type[InotifyReloader | Reloader]
_ReloaderType: TypeAlias = InotifyReloader | Reloader # noqa: Y047
@type_check_only
class _ReloadedEngines(TypedDict):
auto: _PreferredReloaderType
pool: type[Reloader]
inotify: type[InotifyReloader]
preferred_reloader: _PreferredReloaderType
reloader_engines: _ReloadedEngines
+41
View File
@@ -0,0 +1,41 @@
import socket
import sys
from collections.abc import Iterable
from ssl import SSLContext, SSLSocket
from typing import Any, ClassVar, Literal, SupportsIndex
from typing_extensions import override
from gunicorn.glogging import Logger as GLogger
from .config import Config
class BaseSocket:
def __init__(self, address: str, conf: Config, log: GLogger, fd: SupportsIndex | None = None) -> None: ...
def __getattr__(self, name: str) -> Any: ...
def set_options(self, sock: socket.socket, bound: bool = False) -> socket.socket: ...
def bind(self, sock: socket.socket) -> None: ...
def close(self) -> None: ...
class TCPSocket(BaseSocket):
FAMILY: ClassVar[Literal[socket.AddressFamily.AF_INET, socket.AddressFamily.AF_INET6]]
@override
def set_options(self, sock: socket.socket, bound: bool = False) -> socket.socket: ...
class TCP6Socket(TCPSocket):
FAMILY: ClassVar[Literal[socket.AddressFamily.AF_INET6]]
class UnixSocket(BaseSocket):
if sys.platform != "win32":
FAMILY: ClassVar[Literal[socket.AddressFamily.AF_UNIX]]
else:
FAMILY: ClassVar[Literal[0]] # Stub for windows
def __init__(self, addr: str, conf: Config, log: GLogger, fd: SupportsIndex | None = None) -> None: ...
@override
def bind(self, sock: socket.socket) -> None: ...
def create_sockets(conf: Config, log: GLogger, fds: Iterable[SupportsIndex] | None = None) -> list[BaseSocket]: ...
def close_sockets(listeners: Iterable[socket.socket], unlink: bool = True) -> None: ...
def ssl_context(conf: Config) -> SSLContext: ...
def ssl_wrap_socket(sock: socket.socket, conf: Config) -> SSLSocket: ...
+6
View File
@@ -0,0 +1,6 @@
from gunicorn.glogging import Logger as GLogger
SD_LISTEN_FDS_START: int
def listen_fds(unset_environment: bool = True) -> int: ...
def sd_notify(state: str, logger: GLogger, unset_environment: bool = False) -> None: ...
+48
View File
@@ -0,0 +1,48 @@
import types
from _typeshed import FileDescriptorLike, FileDescriptorOrPath, HasFileno, StrOrBytesPath
from inspect import _IntrospectableCallable, _ParameterKind
from socket import socket
from typing import Any, Literal, NoReturn
from urllib.parse import SplitResult
from ._types import _AddressType, _WSGIAppType
REDIRECT_TO: str
hop_headers: set[str]
def load_entry_point(distribution: str, group: str, name: str) -> type[object]: ...
def load_class(
uri: str | object, default: str = "gunicorn.workers.sync.SyncWorker", section: str = "gunicorn.workers"
) -> type[Any]: ...
positionals: tuple[Literal[_ParameterKind.POSITIONAL_ONLY], Literal[_ParameterKind.POSITIONAL_OR_KEYWORD]]
def get_arity(f: _IntrospectableCallable) -> int: ...
def get_username(uid: int) -> str: ...
def set_owner_process(uid: int, gid: int, initgroups: bool = False) -> None: ...
def chown(path: FileDescriptorOrPath, uid: int, gid: int) -> None: ...
def unlink(filename: StrOrBytesPath) -> None: ...
def is_ipv6(addr: str) -> bool: ...
def parse_address(netloc: str, default_port: str = "8000") -> _AddressType: ...
def close_on_exec(fd: FileDescriptorLike) -> None: ...
def set_non_blocking(fd: FileDescriptorLike) -> None: ...
def close(sock: socket) -> None: ...
def write_chunk(sock: socket, data: bytes) -> None: ...
def write(sock: socket, data: bytes, chunked: bool = False) -> None: ...
def write_nonblock(sock: socket, data: bytes, chunked: bool = False) -> None: ...
def write_error(sock: socket, status_int: int, reason: str, mesg: str) -> None: ...
def import_app(module: str) -> _WSGIAppType: ...
def getcwd() -> str: ...
def http_date(timestamp: float | None = None) -> str: ...
def is_hoppish(header: str) -> bool: ...
def daemonize(enable_stdio_inheritance: bool = False) -> None: ...
def seed() -> None: ...
def check_is_writable(path: FileDescriptorOrPath) -> None: ...
def to_bytestring(value: str | bytes, encoding: str = "utf8") -> bytes: ...
def has_fileno(obj: HasFileno) -> bool: ...
def warn(msg: str) -> None: ...
def make_fail_app(msg: str) -> _WSGIAppType: ...
def split_request_uri(uri: str) -> SplitResult: ...
def reraise(tp: type[BaseException], value: BaseException | None, tb: types.TracebackType | None = None) -> NoReturn: ...
def bytes_to_str(b: bytes) -> str: ...
def unquote_to_wsgi_str(string: str) -> str: ...
@@ -0,0 +1,13 @@
from typing import TypedDict, type_check_only
@type_check_only
class _SupportedWorkers(TypedDict):
sync: str
eventlet: str
gevent: str
gevent_wsgi: str
gevent_pywsgi: str
tornado: str
gthread: str
SUPPORTED_WORKERS: _SupportedWorkers
+48
View File
@@ -0,0 +1,48 @@
import socket
from types import FrameType
from typing import ClassVar
from gunicorn.app.base import BaseApplication
from gunicorn.config import Config
from gunicorn.glogging import Logger as GLogger
from gunicorn.http import Request
from gunicorn.workers.workertmp import WorkerTmp
from .._types import _AddressType, _WSGIAppType
from ..reloader import _ReloaderType
class Worker:
SIGNALS: ClassVar[list[int]]
PIPE: ClassVar[list[int]]
age: int
pid: str
ppid: int
sockets: list[socket.socket]
app: BaseApplication
timeout: int
cfg: Config
booted: bool
aborted: bool
reloader: _ReloaderType | None
nr: int
max_requests: int
alive: bool
log: GLogger
tmp: WorkerTmp
wait_fds: list[socket.socket | int]
wsgi: _WSGIAppType
def __init__(
self, age: int, ppid: int, sockets: list[socket.socket], app: BaseApplication, timeout: int, cfg: Config, log: GLogger
) -> None: ...
def notify(self) -> None: ...
def run(self) -> None: ...
def init_process(self) -> None: ...
def load_wsgi(self) -> None: ...
def init_signals(self) -> None: ...
def handle_usr1(self, sig: int, frame: FrameType | None) -> None: ...
def handle_exit(self, sig: int, frame: FrameType | None) -> None: ...
def handle_quit(self, sig: int, frame: FrameType | None) -> None: ...
def handle_abort(self, sig: int, frame: FrameType | None) -> None: ...
def handle_error(self, req: Request | None, client: socket.socket, addr: _AddressType, exc: BaseException) -> None: ...
def handle_winch(self, sig: int, fname: str | None) -> None: ...
@@ -0,0 +1,17 @@
import socket
from gunicorn.http import Request
from gunicorn.workers import base
from .._types import _AddressType
ALREADY_HANDLED: object
class AsyncWorker(base.Worker):
worker_connections: int
alive: bool
def timeout_ctx(self) -> None: ...
def is_already_handled(self, respiter: object) -> bool: ...
def handle(self, listener: socket.socket, client: socket.socket, addr: _AddressType) -> None: ...
def handle_request(self, listener_name: str, req: Request, sock: socket.socket, addr: _AddressType) -> bool: ...
@@ -0,0 +1,31 @@
from types import FrameType
from typing import Any
from typing_extensions import TypeAlias, override
from gunicorn.workers.base_async import AsyncWorker
from .._types import _AddressType
GreenSocket: TypeAlias = Any # eventlet GreenSocket class
EVENTLET_WSGI_LOCAL: Any # eventlet local instance
EVENTLET_ALREADY_HANDLED: bool | None
def patch_sendfile() -> None: ...
class EventletWorker(AsyncWorker):
def patch(self) -> None: ...
@override
def is_already_handled(self, respiter: object) -> bool: ...
@override
def init_process(self) -> None: ...
@override
def handle_quit(self, sig: int, frame: FrameType | None) -> None: ...
@override
def handle_usr1(self, sig: int, frame: FrameType | None) -> None: ...
@override
def timeout_ctx(self) -> None: ...
@override
def handle(self, listener: GreenSocket, client: GreenSocket, addr: _AddressType) -> None: ...
@override
def run(self) -> None: ...
@@ -0,0 +1,54 @@
from types import FrameType
from typing import Any, ClassVar
from typing_extensions import override
from gevent import pywsgi
from gevent.pywsgi import WSGIHandler
from gevent.server import StreamServer
from gevent.socket import socket as GeventSocket
from gunicorn.http import Request
from gunicorn.workers.base_async import AsyncWorker
from .._types import _AddressType
VERSION: str
class GeventWorker(AsyncWorker):
server_class: ClassVar[type[StreamServer] | None]
wsgi_handler: ClassVar[type[WSGIHandler] | None]
sockets: list[GeventSocket]
def patch(self) -> None: ...
@override
def notify(self) -> None: ...
@override
def timeout_ctx(self) -> None: ...
@override
def run(self) -> None: ...
@override
def handle(self, listener: GeventSocket, client: GeventSocket, addr: _AddressType) -> None: ...
@override
def handle_request(self, listener_name: str, req: Request, sock: GeventSocket, addr: _AddressType) -> bool: ...
@override
def handle_quit(self, sig: int, frame: FrameType | None) -> None: ...
@override
def handle_usr1(self, sig: int, frame: FrameType | None) -> None: ...
@override
def init_process(self) -> None: ...
class GeventResponse:
status: ClassVar[str | None]
headers: ClassVar[dict[str, str] | None]
sent: ClassVar[int | None]
def __init__(self, status: str, headers: dict[str, str], clength: int | None) -> None: ...
class PyWSGIHandler(pywsgi.WSGIHandler):
def log_request(self) -> None: ...
def get_environ(self) -> dict[str, Any]: ...
class PyWSGIServer(pywsgi.WSGIServer): ...
class GeventPyWSGIWorker(GeventWorker):
server_class: ClassVar[type[PyWSGIServer] | None]
wsgi_handler: ClassVar[type[PyWSGIHandler] | None]
@@ -0,0 +1,50 @@
import socket
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from selectors import DefaultSelector
from types import FrameType
from gunicorn.config import Config
from gunicorn.glogging import Logger as GLogger
from gunicorn.http import RequestParser
from .._types import _AddressType
from . import base
class TConn:
cfg: Config
sock: socket.socket
client: _AddressType
server: _AddressType
timeout: float | None
parser: RequestParser | None
initialized: bool
def __init__(self, cfg: Config, sock: socket.socket, client: _AddressType, server: _AddressType) -> None: ...
def init(self) -> None: ...
def set_timeout(self) -> None: ...
def close(self) -> None: ...
class ThreadWorker(base.Worker):
worker_connections: int
max_keepalived: int
tpool: ThreadPoolExecutor
poller: DefaultSelector
futures: deque[Future[tuple[bool, TConn]]]
nr_conns: int
alive: bool
@classmethod
def check_config(cls, cfg: Config, log: GLogger) -> None: ...
def init_process(self) -> None: ...
def get_thread_pool(self) -> ThreadPoolExecutor: ...
def handle_quit(self, sig: int, frame: FrameType | None) -> None: ...
def enqueue_req(self, conn: TConn) -> None: ...
def accept(self, server: _AddressType, listener: socket.socket) -> None: ...
def on_client_socket_readable(self, conn: TConn, client: socket.socket) -> None: ...
def murder_keepalived(self) -> None: ...
def is_parent_alive(self) -> bool: ...
def run(self) -> None: ...
def finish_request(self, fs: Future[tuple[bool, TConn]]) -> None: ...
def handle(self, conn: TConn) -> tuple[bool, TConn]: ...
def handle_request(self, req: RequestParser, conn: TConn) -> bool: ...
@@ -0,0 +1,28 @@
from types import FrameType
from typing import Any
from typing_extensions import TypeAlias, override
from gunicorn.workers.base import Worker
IOLoop: TypeAlias = Any # tornado IOLoop class
PeriodicCallback: TypeAlias = Any # tornado PeriodicCallback class
TORNADO5: bool
class TornadoWorker(Worker):
alive: bool
server_alive: bool
ioloop: IOLoop
callbacks: list[PeriodicCallback]
@classmethod
def setup(cls) -> None: ...
@override
def handle_exit(self, sig: int, frame: FrameType | None) -> None: ...
def handle_request(self) -> None: ...
def watchdog(self) -> None: ...
def heartbeat(self) -> None: ...
@override
def init_process(self) -> None: ...
@override
def run(self) -> None: ...
+22
View File
@@ -0,0 +1,22 @@
import socket
from typing_extensions import override
from gunicorn.http import Request
from gunicorn.workers.base import Worker
from .._types import _AddressType
class StopWaiting(Exception): ...
class SyncWorker(Worker):
def accept(self, listener: socket.socket) -> None: ...
def wait(self, timeout: int) -> list[socket.socket | int] | None: ...
def is_parent_alive(self) -> bool: ...
def run_for_one(self, timeout: int) -> None: ...
def run_for_multiple(self, timeout: int) -> None: ...
@override
def run(self) -> None: ...
def handle(self, listener: socket.socket, client: socket.socket, addr: _AddressType) -> None: ...
def handle_request(self, listener: socket.socket, req: Request, client: socket.socket, addr: tuple[str, int]) -> bool: ...
@override
def handle_error(self, req: Request | None, client: socket.socket, addr: _AddressType, exc: BaseException) -> None: ...
@@ -0,0 +1,11 @@
from gunicorn.config import Config
PLATFORM: str
IS_CYGWIN: bool
class WorkerTmp:
def __init__(self, cfg: Config) -> None: ...
def notify(self) -> None: ...
def last_update(self) -> float: ...
def fileno(self) -> int: ...
def close(self) -> None: ...