diff --git a/stubs/uWSGI/@tests/stubtest_allowlist.txt b/stubs/uWSGI/@tests/stubtest_allowlist.txt new file mode 100644 index 000000000..4abf30e06 --- /dev/null +++ b/stubs/uWSGI/@tests/stubtest_allowlist.txt @@ -0,0 +1,14 @@ +# Error: is not present in stub +# ============================= +# Erlang support is currently broken, so it's better to pretend +# that this decorator doesn't exist +uwsgidecorators.erlang +# This should really only be internal API, so we don't export it +uwsgidecorators.harakiri.real_call + +# Error: is inconsistent +# ====================== +# This is a limitation of ParamSpec, we can't specify that the +# the ParamSpec does have no keyword arguments, but we need the +# ParamSpec to properly annotate this decorator +uwsgidecorators.thread.__call__ diff --git a/stubs/uWSGI/@tests/stubtest_allowlist_darwin.txt b/stubs/uWSGI/@tests/stubtest_allowlist_darwin.txt new file mode 100644 index 000000000..e0ce32895 --- /dev/null +++ b/stubs/uWSGI/@tests/stubtest_allowlist_darwin.txt @@ -0,0 +1,12 @@ +# Error: is not present at runtime +# ============================= +# These functions depend on modules which are not built into +# the MacOS wheel by default, so we just ignore them for now +# we don't want to pretend they don't exist, because people +# could still configure a build with the required modules +# manually, we don't do that for typeshed, since it would +# add a lot of additional complexity to stubtest_third_party +uwsgi.SymbolsImporter +uwsgi.SymbolsZipImporter +uwsgi.ZipImporter +uwsgi.route diff --git a/stubs/uWSGI/@tests/uwsgi.ini b/stubs/uWSGI/@tests/uwsgi.ini new file mode 100644 index 000000000..aa1ae5c2b --- /dev/null +++ b/stubs/uWSGI/@tests/uwsgi.ini @@ -0,0 +1,7 @@ +[uwsgi] +master = true +cache2 = name=mycache,items=2 +queue = 100 +sharedarea = 2 +route-run = log:foo +snmp = true diff --git a/stubs/uWSGI/METADATA.toml b/stubs/uWSGI/METADATA.toml new file mode 100644 index 000000000..9807d0b14 --- /dev/null +++ b/stubs/uWSGI/METADATA.toml @@ -0,0 +1,15 @@ +version = "2.0.*" +upstream_repository = "https://github.com/unbit/uwsgi" +extra_description = """\ + Type hints for uWSGI's \ + [Python API](https://uwsgi-docs.readthedocs.io/en/latest/PythonModule.html). \ + Note that this API is available only when running Python code inside a uWSGI process \ + and some parts of the API are only present when corresponding configuration options \ + have been enabled. +""" + +[tool.stubtest] +# Run stubtest on MacOS as well, to check that the +# uWSGI-specific parts of stubtest_third_party.py +# also work there +platforms = ["linux", "darwin"] diff --git a/stubs/uWSGI/uwsgi.pyi b/stubs/uWSGI/uwsgi.pyi new file mode 100644 index 000000000..7a7bd2dca --- /dev/null +++ b/stubs/uWSGI/uwsgi.pyi @@ -0,0 +1,240 @@ +from _typeshed import HasFileno, OptExcInfo, ReadOnlyBuffer +from _typeshed.wsgi import WSGIApplication +from collections.abc import Callable +from types import ModuleType +from typing import Any, Protocol, overload +from typing_extensions import Literal, Self, TypeAlias, final + +import uwsgidecorators + +_TrueOrNone: TypeAlias = Literal[True] | None + +class _RPCCallable(Protocol): + def __call__(self, *args: bytes) -> bytes | None: ... + +# FIXME: Technically we know the exact layout of _AppsDict and _WorkerDict +# but TypedDict does not support bytes keys, so for now we use type +# aliases to a more generic dict +_WorkerDict: TypeAlias = dict[bytes, Any] + +SPOOL_IGNORE: Literal[0] +SPOOL_OK: Literal[-2] +SPOOL_RETRY: Literal[-1] +applications: dict[str, WSGIApplication | str] | None +buffer_size: int +cores: int +has_threads: int +hostname: bytes +is_a_reload: bool +loop: bytes | None +magic_table: dict[bytes, bytes] +numproc: int +opt: dict[str, bytes | Literal[True] | list[bytes | Literal[True]]] +sockets: list[int] +started_on: int +unbit: _TrueOrNone +version: bytes +version_info: tuple[int, int, int, int, bytes] +spoolers: tuple[bytes, ...] +queue_size: int + +decorators = uwsgidecorators +spooler = uwsgidecorators.manage_spool_request +post_fork_hook = uwsgidecorators.postfork_chain_hook + +@final +class SymbolsImporter: + def find_module(self, __fullname: str) -> Self | None: ... + def load_module(self, __fullname: str) -> ModuleType | None: ... + +@final +class SymbolsZipImporter: + def __init__(self, __name: str) -> None: ... + def find_module(self, __fullname: str) -> Self | None: ... + def load_module(self, __fullname: str) -> ModuleType | None: ... + +@final +class ZipImporter: + def __init__(self, __name: str) -> None: ... + def find_module(self, __fullname: str) -> Self | None: ... + def load_module(self, __fullname: str) -> ModuleType | None: ... + +def accepting(__accepting: bool = True) -> None: ... +def add_cron(__signum: int, __minute: int, __hour: int, __day: int, __month: int, __weekday: int) -> Literal[True]: ... +def add_file_monitor(__signum: int, __filename: str) -> None: ... +def add_rb_timer(__signum: int, __seconds: int, __iterations: int = 0) -> None: ... +def add_timer(__signum: int, __seconds: int) -> None: ... +def add_var(__key: bytes | str, __val: bytes | str) -> Literal[True]: ... +def alarm(__alarm: str, __msg: bytes | ReadOnlyBuffer | str) -> None: ... +def async_connect(__socket_name: str) -> int: ... +def async_sleep(__timeout: float) -> Literal[b""]: ... +def cache_clear(__cache_name: str = ...) -> _TrueOrNone: ... +def cache_dec(__key: str | bytes, __decrement: int = 1, __expires: int = 0, __cache_name: str = ...) -> _TrueOrNone: ... +def cache_del(__key: str | bytes, __cache_name: str = ...) -> _TrueOrNone: ... +def cache_div(__key: str | bytes, __divisor: int = 2, __expires: int = 0, __cache_name: str = ...) -> _TrueOrNone: ... +def cache_exists(__key: str | bytes, __cache_name: str = ...) -> _TrueOrNone: ... +def cache_get(__key: str | bytes, __cache_name: str = ...) -> bytes | None: ... +def cache_inc(__key: str | bytes, __increment: int = 1, __expires: int = 0, __cache_name: str = ...) -> _TrueOrNone: ... +def cache_keys(__cache_name: str = ...) -> list[bytes]: ... +def cache_mul(__key: str | bytes, __factor: int = 2, __expires: int = 0, __cache_name: str = ...) -> _TrueOrNone: ... +def cache_num(__key: str | bytes, __cache_name: str = ...) -> int: ... +def cache_set( + __key: str | bytes, __value: str | bytes | ReadOnlyBuffer, __expires: int = 0, __cache_name: str = ... +) -> _TrueOrNone: ... +def cache_update( + __key: str | bytes, __value: str | bytes | ReadOnlyBuffer, __expires: int = 0, __cache_name: str = ... +) -> _TrueOrNone: ... +def queue_get(__index: int) -> bytes | None: ... +def queue_set(__index: int, __message: str | bytes | ReadOnlyBuffer) -> _TrueOrNone: ... +@overload +def queue_last(__num: Literal[0] = 0) -> bytes | None: ... # type:ignore[misc] +@overload +def queue_last(__num: int) -> list[bytes | None]: ... +def queue_push(__message: str | bytes | ReadOnlyBuffer) -> _TrueOrNone: ... +def queue_pull() -> bytes | None: ... +def queue_pop() -> bytes | None: ... +def queue_slot() -> int: ... +def queue_pull_slot() -> int: ... +def snmp_set_community(__snmp_community: str) -> Literal[True]: ... +def snmp_set_counter32(__oid_num: int, __value: int) -> _TrueOrNone: ... +def snmp_set_counter64(__oid_num: int, __value: int) -> _TrueOrNone: ... +def snmp_set_gauge(__oid_num: int, __value: int) -> _TrueOrNone: ... +def snmp_incr_counter32(__oid_num: int, __increment: int) -> _TrueOrNone: ... +def snmp_incr_counter64(__oid_num: int, __increment: int) -> _TrueOrNone: ... +def snmp_incr_gauge(__oid_num: int, __increment: int) -> _TrueOrNone: ... +def snmp_decr_counter32(__oid_num: int, __decrement: int) -> _TrueOrNone: ... +def snmp_decr_counter64(__oid_num: int, __decrement: int) -> _TrueOrNone: ... +def snmp_decr_gauge(__oid_num: int, __decrement: int) -> _TrueOrNone: ... +@overload +def send_to_spooler(__mesage_dict: dict[bytes, bytes]) -> bytes | None: ... +@overload +def send_to_spooler( + *, spooler: bytes = ..., priority: bytes = ..., at: bytes = ..., body: bytes = ..., **kwargs: bytes +) -> bytes | None: ... + +spool = send_to_spooler + +def set_spooler_frequency(__frequency: int) -> Literal[True]: ... +def spooler_jobs() -> list[bytes]: ... +def spooler_pid() -> int: ... +def spooler_pids() -> list[int]: ... +def spooler_get_task(__task_path: str) -> dict[bytes, bytes] | None: ... +def call(__rpc_name: str, *args: bytes) -> bytes | None: ... +def chunked_read(__timeout: int = 0) -> bytes: ... +def chunked_read_nb() -> bytes: ... +def cl() -> int: ... +def close(__fd: int) -> None: ... +def connect(__socket_name: str, timeout: int = 0) -> int: ... +def connection_fd() -> int: ... +def disconnect() -> None: ... +def embedded_data(__name: str) -> bytes: ... +def extract(__name: str) -> bytes | None: ... +def farm_get_msg() -> bytes | None: ... +def farm_msg(__farm_name: str, __message: str | bytes | ReadOnlyBuffer) -> None: ... +def get_logvar(__key: str | bytes) -> bytes | None: ... +def green_schedule() -> Literal[True]: ... +def i_am_the_lord(__legion_name: str) -> bool: ... +def i_am_the_spooler() -> _TrueOrNone: ... +def in_farm(__farm_name: str = ...) -> _TrueOrNone: ... +def is_connected(__fd: int) -> bool: ... +def is_locked(__lock_num: int = 0) -> bool: ... +def listen_queue(__id: int = 0) -> int: ... +def lock(__lock_num: int = 0) -> None: ... +def log(__logline: str) -> Literal[True]: ... +def log_this_request() -> None: ... +def logsize() -> int: ... +def lord_scroll(__legion_name: str) -> bytes | None: ... +def masterpid() -> int: ... +def mem() -> tuple[int, int]: ... +def metric_dec(__key: str, __decrement: int = 1) -> _TrueOrNone: ... +def metric_div(__key: str, __divisor: int = 1) -> _TrueOrNone: ... +def metric_get(__key: str) -> int: ... +def metric_inc(__key: str, __increment: int = 1) -> _TrueOrNone: ... +def metric_mul(__key: str, __factor: int = 1) -> _TrueOrNone: ... +def metric_set(__key: str, __value: int = 1) -> _TrueOrNone: ... +def metric_set_max(__key: str, __value: int = 1) -> _TrueOrNone: ... +def metric_set_min(__key: str, __value: int = 1) -> _TrueOrNone: ... +def micros() -> int: ... +def mule_get_msg(signals: bool = True, farms: bool = True, buffer_size: int = 65536, timeout: int = -1) -> bytes: ... +def mule_id() -> int: ... +@overload +def mule_msg(__mesage: str | bytes | ReadOnlyBuffer) -> bool: ... +@overload +def mule_msg(__mesage: str | bytes | ReadOnlyBuffer, __mule_id: int) -> bool: ... +@overload +def mule_msg(__mesage: str | bytes | ReadOnlyBuffer, __farm_name: str) -> bool: ... +def offload(__filename: str, __len: int = 0) -> Literal[b""]: ... +def parsefile(__filename: str) -> dict[bytes, bytes] | None: ... +def ready() -> _TrueOrNone: ... +def ready_fd() -> int: ... +def recv(__fd: int, __max_size: int = 4096) -> bytes | None: ... +@overload +def register_rpc(__name: str, __func: Callable[[], bytes | None]) -> Literal[True]: ... +@overload +def register_rpc(__name: str, __func: Callable[[bytes], bytes | None], arg_count: Literal[1]) -> Literal[True]: ... +@overload +def register_rpc(__name: str, __func: Callable[[bytes, bytes], bytes | None], arg_count: Literal[2]) -> Literal[True]: ... +@overload +def register_rpc(__name: str, __func: _RPCCallable, arg_count: int) -> Literal[True]: ... +def register_signal(__signum: int, __who: str, __handler: Callable[[int], Any]) -> None: ... +def reload() -> _TrueOrNone: ... +def request_id() -> int: ... +def route(__router_name: str, __router_args: str) -> int: ... +def rpc(__node: str | bytes, __rpc_name: bytes, *rpc_args: bytes) -> bytes | None: ... +def rpc_list() -> tuple[bytes, ...]: ... +def scrolls(__legion_name: str) -> list[bytes] | None: ... +@overload +def send(__data: bytes) -> _TrueOrNone: ... +@overload +def send(__fd: int, __data: bytes) -> _TrueOrNone: ... +def sendfile( + __filename_or_fd: str | bytes | int | HasFileno, __chunk: int = 0, __pos: int = 0, filesize: int = 0 +) -> _TrueOrNone: ... +def set_logvar(__key: str | bytes, __val: str | bytes) -> None: ... +def set_user_harakiri(__seconds: int) -> None: ... +def set_warning_message(__message: str) -> Literal[True]: ... +def setprocname(__name: str) -> None: ... +def signal(__signum: int = ..., __node: str = ...) -> None: ... +def signal_received() -> int: ... +def signal_registered(__signum: int) -> _TrueOrNone: ... +def signal_wait(__signum: int = ...) -> Literal[b""]: ... +def start_response( + __status: str, __headers: list[tuple[str, str]], __exc_info: OptExcInfo | None = ... +) -> Callable[[bytes], None]: ... +def stop() -> _TrueOrNone: ... +def suspend() -> Literal[True]: ... +def total_requests() -> int: ... +def unlock(__lock_num: int = 0) -> None: ... +def wait_fd_read(__fd: int, __timeout: int = 0) -> Literal[b""]: ... +def wait_fd_write(__fd: int, __timeout: int = 0) -> Literal[b""]: ... +def websocket_handshake(__key: str | bytes = ..., __origin: str | bytes = ..., __proto: str | bytes = ...) -> None: ... +def websocket_recv() -> bytes: ... +def websocket_recv_nb() -> bytes: ... +def websocket_send(message: str | bytes | ReadOnlyBuffer) -> None: ... +def websocket_send_binary(message: str | bytes | ReadOnlyBuffer) -> None: ... +def worker_id() -> int: ... +def workers() -> tuple[_WorkerDict, ...] | None: ... +def sharedarea_read(__id: int, __position: int, __length: int) -> bytes: ... +def sharedarea_write(__id: int, __position: int, __value: str | bytes | ReadOnlyBuffer) -> None: ... +def sharedarea_readbyte(__id: int, __position: int) -> int: ... +def sharedarea_writebyte(__id: int, __position: int, __value: int) -> None: ... +def sharedarea_read8(__id: int, __position: int) -> int: ... +def sharedarea_write8(__id: int, __position: int, __value: int) -> None: ... +def sharedarea_readlong(__id: int, __position: int) -> int: ... +def sharedarea_writelong(__id: int, __position: int, __value: int) -> None: ... +def sharedarea_read64(__id: int, __position: int) -> int: ... +def sharedarea_write64(__id: int, __position: int, __value: int) -> None: ... +def sharedarea_read32(__id: int, __position: int) -> int: ... +def sharedarea_write32(__id: int, __position: int, __value: int) -> None: ... +def sharedarea_read16(__id: int, __position: int) -> int: ... +def sharedarea_write16(__id: int, __position: int, __value: int) -> None: ... +def sharedarea_inclong(__id: int, __position: int, __increment: int = 1) -> None: ... +def sharedarea_inc64(__id: int, __position: int, __increment: int = 1) -> None: ... +def sharedarea_inc32(__id: int, __position: int, __increment: int = 1) -> None: ... +def sharedarea_dec64(__id: int, __position: int, __decrement: int = 1) -> None: ... +def sharedarea_dec32(__id: int, __position: int, __decrement: int = 1) -> None: ... +def sharedarea_rlock(__id: int) -> None: ... +def sharedarea_wlock(__id: int) -> None: ... +def sharedarea_unlock(__id: int) -> None: ... +def sharedarea_object(__id: int) -> bytearray: ... +def sharedarea_memoryview(__id: int) -> memoryview: ... diff --git a/stubs/uWSGI/uwsgidecorators.pyi b/stubs/uWSGI/uwsgidecorators.pyi new file mode 100644 index 000000000..4936f57e0 --- /dev/null +++ b/stubs/uWSGI/uwsgidecorators.pyi @@ -0,0 +1,180 @@ +from collections.abc import Callable +from typing import Any, Generic, TypeVar, overload +from typing_extensions import Literal, ParamSpec + +from uwsgi import _RPCCallable + +_T = TypeVar("_T") +_T2 = TypeVar("_T2") +_SR = TypeVar("_SR", bound=Literal[0, -1, -2] | None) +_SignalCallbackT = TypeVar("_SignalCallbackT", bound=Callable[[int], Any]) +_RPCCallableT = TypeVar("_RPCCallableT", bound=_RPCCallable) +_P = ParamSpec("_P") +_P2 = ParamSpec("_P2") + +spooler_functions: dict[str, Callable[..., Literal[0, -1, -2] | None]] +mule_functions: dict[str, Callable[..., Any]] +postfork_chain: list[Callable[[], None]] + +def get_free_signal() -> int: ... +def manage_spool_request(vars: dict[bytes, Any]) -> Literal[0, -1, -2]: ... +def postfork_chain_hook() -> None: ... + +class postfork(Generic[_P, _T]): + wid: int + f: Callable[_P, _T] | None + @overload + def __init__(self: postfork[..., Any], f: int) -> None: ... + @overload + def __init__(self: postfork[_P, _T], f: Callable[_P, _T]) -> None: ... + @overload + def __call__(self, __f: Callable[_P2, _T2]) -> postfork[_P2, _T2]: ... + @overload + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: ... + +class _spoolraw(Generic[_P, _SR]): + f: Callable[_P, _SR] + pass_arguments: bool + base_dict: dict[str, Any] + def __init__(self, f: Callable[_P, _SR], pass_arguments: bool) -> None: ... + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _SR: ... + def spool(self, *args: _P.args, **kwargs: _P.kwargs) -> _SR: ... + +class _spool(_spoolraw[_P, _SR]): ... +class _spoolforever(_spoolraw[_P, _SR]): ... + +@overload +def spool_decorate( + f: Callable[_P, _SR], pass_arguments: bool = False, _class: type[_spoolraw[_P, _SR]] = ... +) -> _spoolraw[_P, _SR]: ... +@overload +def spool_decorate( + f: None = None, pass_arguments: bool = False, _class: type[_spoolraw[..., Any]] = ... +) -> Callable[[Callable[_P, _SR]], _spoolraw[_P, _SR]]: ... +@overload +def spoolraw(f: Callable[_P, _SR], pass_arguments: bool = False) -> _spoolraw[_P, _SR]: ... +@overload +def spoolraw(f: None = None, pass_arguments: bool = False) -> Callable[[Callable[_P, _SR]], _spoolraw[_P, _SR]]: ... +@overload +def spool(f: Callable[_P, _SR], pass_arguments: bool = False) -> _spool[_P, _SR]: ... +@overload +def spool(f: None = None, pass_arguments: bool = False) -> Callable[[Callable[_P, _SR]], _spool[_P, _SR]]: ... +@overload +def spoolforever(f: Callable[_P, _SR], pass_arguments: bool = False) -> _spoolforever[_P, _SR]: ... +@overload +def spoolforever(f: None = None, pass_arguments: bool = False) -> Callable[[Callable[_P, _SR]], _spoolforever[_P, _SR]]: ... + +class mulefunc(Generic[_P, _T]): + fname: str | None + mule: int + @overload + def __init__(self: mulefunc[..., Any], f: int) -> None: ... + @overload + def __init__(self: mulefunc[_P, _T], f: Callable[_P, _T]) -> None: ... + def real_call(self, *args: _P.args, **kwargs: _P.kwargs) -> None: ... + @overload + def __call__(self, __f: Callable[_P2, _T2]) -> mulefunc[_P2, _T2]: ... + @overload + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: ... + +def mule_msg_dispatcher(message: bytes) -> Any: ... + +class rpc: + name: str + def __init__(self, name: str) -> None: ... + def __call__(self, f: _RPCCallableT) -> _RPCCallableT: ... + +class farm_loop: + f: Callable[[bytes], Any] + farm: str | None + def __init__(self, f: Callable[[bytes], Any], farm: str | None) -> None: ... + def __call__(self) -> None: ... + +class farm: + name: str + def __init__(self, name: str | None = None, **kwargs: Any) -> None: ... + def __call__(self, f: Callable[[bytes], Any]) -> None: ... + +class mule_brain: + f: Callable[[], Any] + num: int + def __init__(self, f: Callable[[], Any], num: int) -> None: ... + def __call__(self) -> None: ... + +class mule_brainloop(mule_brain): ... + +class mule: + num: int + def __init__(self, num: int) -> None: ... + def __call__(self, f: Callable[[], Any]) -> None: ... + +class muleloop(mule): ... + +class mulemsg_loop: + f: Callable[[bytes], Any] + num: int + def __init__(self, f: Callable[[bytes], Any], num: int) -> None: ... + def __call__(self) -> None: ... + +class mulemsg: + num: int + def __init__(self, num: int) -> None: ... + def __call__(self, f: Callable[[bytes], Any]) -> None: ... + +class signal: + num: int + target: str + def __init__(self, num: int, *, target: str = "", **kwargs: Any) -> None: ... + def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ... + +class timer: + num: int + secs: int + target: str + def __init__(self, secs: int, *, signum: int = ..., target: str = "", **kwargs: Any) -> None: ... + def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ... + +class cron: + num: int + minute: int + hour: int + day: int + month: int + dayweek: int + target: str + def __init__( + self, minute: int, hour: int, day: int, month: int, dayweek: int, *, signum: int = ..., target: str = "", **kwargs: Any + ) -> None: ... + def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ... + +class rbtimer: + num: int + secs: int + target: str + def __init__(self, secs: int, *, signum: int = ..., target: str = "", **kwargs: Any) -> None: ... + def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ... + +class filemon: + num: int + fsobj: str + target: str + def __init__(self, fsobj: str, *, signum: int = ..., target: str = "", **kwargs: Any) -> None: ... + def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ... + +class lock(Generic[_P, _T]): + f: Callable[_P, _T] + def __init__(self, f: Callable[_P, _T]) -> None: ... + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: ... + +# FIXME: Technically this only allows positional arguments, but there is not really +# an adequate way yet to express this, once bound on ParamSpec does something +# we could probably enforce this +class thread(Generic[_P, _T]): + f: Callable[_P, _T] + def __init__(self, f: Callable[_P, _T]) -> None: ... + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> Callable[_P, _T]: ... + +class harakiri: + s: int + def __init__(self, seconds: int) -> None: ... + def __call__(self, f: Callable[_P, _T]) -> Callable[_P, _T]: ... diff --git a/tests/stubtest_third_party.py b/tests/stubtest_third_party.py index 00e9660fc..0a13050c5 100755 --- a/tests/stubtest_third_party.py +++ b/tests/stubtest_third_party.py @@ -9,6 +9,7 @@ import subprocess import sys import tempfile from pathlib import Path +from textwrap import dedent from typing import NoReturn from parse_metadata import get_recursive_requirements, read_metadata @@ -97,6 +98,11 @@ def run_stubtest(dist: Path, *, verbose: bool = False, specified_platforms_only: if platform_allowlist.exists(): stubtest_cmd.extend(["--allowlist", str(platform_allowlist)]) + # Perform some black magic in order to run stubtest inside uWSGI + if dist_name == "uWSGI": + if not setup_uwsgi_stubtest_command(dist, venv_dir, stubtest_cmd): + return False + try: subprocess.run(stubtest_cmd, env=stubtest_env, check=True, capture_output=True) except subprocess.CalledProcessError as e: @@ -128,6 +134,82 @@ def run_stubtest(dist: Path, *, verbose: bool = False, specified_platforms_only: return True +def setup_uwsgi_stubtest_command(dist: Path, venv_dir: Path, stubtest_cmd: list[str]) -> bool: + """Perform some black magic in order to run stubtest inside uWSGI. + + We have to write the exit code from stubtest to a surrogate file + because uwsgi --pyrun does not exit with the exitcode from the + python script. We have a second wrapper script that passed the + arguments along to the uWSGI script and retrieves the exit code + from the file, so it behaves like running stubtest normally would. + + Both generated wrapper scripts are created inside `venv_dir`, + which itself is a subdirectory inside a temporary directory, + so both scripts will be cleaned up after this function + has been executed. + """ + uwsgi_ini = dist / "@tests/uwsgi.ini" + + if sys.platform == "win32": + print_error("uWSGI is not supported on Windows") + return False + + uwsgi_script = venv_dir / "uwsgi_stubtest.py" + wrapper_script = venv_dir / "uwsgi_wrapper.py" + exit_code_surrogate = venv_dir / "exit_code" + uwsgi_script_contents = dedent( + f""" + import json + import os + import sys + from mypy.stubtest import main + + sys.argv = json.loads(os.environ.get("STUBTEST_ARGS")) + exit_code = main() + with open("{exit_code_surrogate}", mode="w") as fp: + fp.write(str(exit_code)) + sys.exit(exit_code) + """ + ) + uwsgi_script.write_text(uwsgi_script_contents) + + uwsgi_exe = venv_dir / "bin" / "uwsgi" + + # It would be nice to reliably separate uWSGI output from + # the stubtest output, on linux it appears that stubtest + # will always go to stdout and uWSGI to stderr, but on + # MacOS they both go to stderr, for now we deal with the + # bit of extra spam + wrapper_script_contents = dedent( + f""" + import json + import os + import subprocess + import sys + + stubtest_env = os.environ | {{"STUBTEST_ARGS": json.dumps(sys.argv)}} + uwsgi_cmd = [ + "{uwsgi_exe}", + "--ini", + "{uwsgi_ini}", + "--spooler", + "{venv_dir}", + "--pyrun", + "{uwsgi_script}", + ] + subprocess.run(uwsgi_cmd, env=stubtest_env) + with open("{exit_code_surrogate}", mode="r") as fp: + sys.exit(int(fp.read())) + """ + ) + wrapper_script.write_text(wrapper_script_contents) + + # replace "-m mypy.stubtest" in stubtest_cmd with the path to our wrapper script + assert stubtest_cmd[1:3] == ["-m", "mypy.stubtest"] + stubtest_cmd[1:3] = [str(wrapper_script)] + return True + + def print_commands(dist: Path, pip_cmd: list[str], stubtest_cmd: list[str], mypypath: str) -> None: print(file=sys.stderr) print(" ".join(pip_cmd), file=sys.stderr)