Adds stubs for uWSGI (#10432)

This adds stubs for the uWSGI Python API.

Similarly to GDB the Python API is only accessible within a uWSGI process, some parts of the API also only exist if certain configuration options are enabled. This makes running stubtest a bit of pain.

Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
Co-authored-by: Akuli <akuviljanen17@gmail.com>
This commit is contained in:
David Salvisberg
2023-07-24 13:32:52 +02:00
committed by GitHub
parent 7d33060e6a
commit 21cb2cb546
7 changed files with 550 additions and 0 deletions

View File

@@ -0,0 +1,14 @@
# Error: is not present in stub
# =============================
# Erlang support is currently broken, so it's better to pretend
# that this decorator doesn't exist
uwsgidecorators.erlang
# This should really only be internal API, so we don't export it
uwsgidecorators.harakiri.real_call
# Error: is inconsistent
# ======================
# This is a limitation of ParamSpec, we can't specify that the
# the ParamSpec does have no keyword arguments, but we need the
# ParamSpec to properly annotate this decorator
uwsgidecorators.thread.__call__

View File

@@ -0,0 +1,12 @@
# Error: is not present at runtime
# =============================
# These functions depend on modules which are not built into
# the MacOS wheel by default, so we just ignore them for now
# we don't want to pretend they don't exist, because people
# could still configure a build with the required modules
# manually, we don't do that for typeshed, since it would
# add a lot of additional complexity to stubtest_third_party
uwsgi.SymbolsImporter
uwsgi.SymbolsZipImporter
uwsgi.ZipImporter
uwsgi.route

View File

@@ -0,0 +1,7 @@
[uwsgi]
master = true
cache2 = name=mycache,items=2
queue = 100
sharedarea = 2
route-run = log:foo
snmp = true

15
stubs/uWSGI/METADATA.toml Normal file
View File

@@ -0,0 +1,15 @@
version = "2.0.*"
upstream_repository = "https://github.com/unbit/uwsgi"
extra_description = """\
Type hints for uWSGI's \
[Python API](https://uwsgi-docs.readthedocs.io/en/latest/PythonModule.html). \
Note that this API is available only when running Python code inside a uWSGI process \
and some parts of the API are only present when corresponding configuration options \
have been enabled.
"""
[tool.stubtest]
# Run stubtest on MacOS as well, to check that the
# uWSGI-specific parts of stubtest_third_party.py
# also work there
platforms = ["linux", "darwin"]

240
stubs/uWSGI/uwsgi.pyi Normal file
View File

@@ -0,0 +1,240 @@
from _typeshed import HasFileno, OptExcInfo, ReadOnlyBuffer
from _typeshed.wsgi import WSGIApplication
from collections.abc import Callable
from types import ModuleType
from typing import Any, Protocol, overload
from typing_extensions import Literal, Self, TypeAlias, final
import uwsgidecorators
_TrueOrNone: TypeAlias = Literal[True] | None
class _RPCCallable(Protocol):
def __call__(self, *args: bytes) -> bytes | None: ...
# FIXME: Technically we know the exact layout of _AppsDict and _WorkerDict
# but TypedDict does not support bytes keys, so for now we use type
# aliases to a more generic dict
_WorkerDict: TypeAlias = dict[bytes, Any]
SPOOL_IGNORE: Literal[0]
SPOOL_OK: Literal[-2]
SPOOL_RETRY: Literal[-1]
applications: dict[str, WSGIApplication | str] | None
buffer_size: int
cores: int
has_threads: int
hostname: bytes
is_a_reload: bool
loop: bytes | None
magic_table: dict[bytes, bytes]
numproc: int
opt: dict[str, bytes | Literal[True] | list[bytes | Literal[True]]]
sockets: list[int]
started_on: int
unbit: _TrueOrNone
version: bytes
version_info: tuple[int, int, int, int, bytes]
spoolers: tuple[bytes, ...]
queue_size: int
decorators = uwsgidecorators
spooler = uwsgidecorators.manage_spool_request
post_fork_hook = uwsgidecorators.postfork_chain_hook
@final
class SymbolsImporter:
def find_module(self, __fullname: str) -> Self | None: ...
def load_module(self, __fullname: str) -> ModuleType | None: ...
@final
class SymbolsZipImporter:
def __init__(self, __name: str) -> None: ...
def find_module(self, __fullname: str) -> Self | None: ...
def load_module(self, __fullname: str) -> ModuleType | None: ...
@final
class ZipImporter:
def __init__(self, __name: str) -> None: ...
def find_module(self, __fullname: str) -> Self | None: ...
def load_module(self, __fullname: str) -> ModuleType | None: ...
def accepting(__accepting: bool = True) -> None: ...
def add_cron(__signum: int, __minute: int, __hour: int, __day: int, __month: int, __weekday: int) -> Literal[True]: ...
def add_file_monitor(__signum: int, __filename: str) -> None: ...
def add_rb_timer(__signum: int, __seconds: int, __iterations: int = 0) -> None: ...
def add_timer(__signum: int, __seconds: int) -> None: ...
def add_var(__key: bytes | str, __val: bytes | str) -> Literal[True]: ...
def alarm(__alarm: str, __msg: bytes | ReadOnlyBuffer | str) -> None: ...
def async_connect(__socket_name: str) -> int: ...
def async_sleep(__timeout: float) -> Literal[b""]: ...
def cache_clear(__cache_name: str = ...) -> _TrueOrNone: ...
def cache_dec(__key: str | bytes, __decrement: int = 1, __expires: int = 0, __cache_name: str = ...) -> _TrueOrNone: ...
def cache_del(__key: str | bytes, __cache_name: str = ...) -> _TrueOrNone: ...
def cache_div(__key: str | bytes, __divisor: int = 2, __expires: int = 0, __cache_name: str = ...) -> _TrueOrNone: ...
def cache_exists(__key: str | bytes, __cache_name: str = ...) -> _TrueOrNone: ...
def cache_get(__key: str | bytes, __cache_name: str = ...) -> bytes | None: ...
def cache_inc(__key: str | bytes, __increment: int = 1, __expires: int = 0, __cache_name: str = ...) -> _TrueOrNone: ...
def cache_keys(__cache_name: str = ...) -> list[bytes]: ...
def cache_mul(__key: str | bytes, __factor: int = 2, __expires: int = 0, __cache_name: str = ...) -> _TrueOrNone: ...
def cache_num(__key: str | bytes, __cache_name: str = ...) -> int: ...
def cache_set(
__key: str | bytes, __value: str | bytes | ReadOnlyBuffer, __expires: int = 0, __cache_name: str = ...
) -> _TrueOrNone: ...
def cache_update(
__key: str | bytes, __value: str | bytes | ReadOnlyBuffer, __expires: int = 0, __cache_name: str = ...
) -> _TrueOrNone: ...
def queue_get(__index: int) -> bytes | None: ...
def queue_set(__index: int, __message: str | bytes | ReadOnlyBuffer) -> _TrueOrNone: ...
@overload
def queue_last(__num: Literal[0] = 0) -> bytes | None: ... # type:ignore[misc]
@overload
def queue_last(__num: int) -> list[bytes | None]: ...
def queue_push(__message: str | bytes | ReadOnlyBuffer) -> _TrueOrNone: ...
def queue_pull() -> bytes | None: ...
def queue_pop() -> bytes | None: ...
def queue_slot() -> int: ...
def queue_pull_slot() -> int: ...
def snmp_set_community(__snmp_community: str) -> Literal[True]: ...
def snmp_set_counter32(__oid_num: int, __value: int) -> _TrueOrNone: ...
def snmp_set_counter64(__oid_num: int, __value: int) -> _TrueOrNone: ...
def snmp_set_gauge(__oid_num: int, __value: int) -> _TrueOrNone: ...
def snmp_incr_counter32(__oid_num: int, __increment: int) -> _TrueOrNone: ...
def snmp_incr_counter64(__oid_num: int, __increment: int) -> _TrueOrNone: ...
def snmp_incr_gauge(__oid_num: int, __increment: int) -> _TrueOrNone: ...
def snmp_decr_counter32(__oid_num: int, __decrement: int) -> _TrueOrNone: ...
def snmp_decr_counter64(__oid_num: int, __decrement: int) -> _TrueOrNone: ...
def snmp_decr_gauge(__oid_num: int, __decrement: int) -> _TrueOrNone: ...
@overload
def send_to_spooler(__mesage_dict: dict[bytes, bytes]) -> bytes | None: ...
@overload
def send_to_spooler(
*, spooler: bytes = ..., priority: bytes = ..., at: bytes = ..., body: bytes = ..., **kwargs: bytes
) -> bytes | None: ...
spool = send_to_spooler
def set_spooler_frequency(__frequency: int) -> Literal[True]: ...
def spooler_jobs() -> list[bytes]: ...
def spooler_pid() -> int: ...
def spooler_pids() -> list[int]: ...
def spooler_get_task(__task_path: str) -> dict[bytes, bytes] | None: ...
def call(__rpc_name: str, *args: bytes) -> bytes | None: ...
def chunked_read(__timeout: int = 0) -> bytes: ...
def chunked_read_nb() -> bytes: ...
def cl() -> int: ...
def close(__fd: int) -> None: ...
def connect(__socket_name: str, timeout: int = 0) -> int: ...
def connection_fd() -> int: ...
def disconnect() -> None: ...
def embedded_data(__name: str) -> bytes: ...
def extract(__name: str) -> bytes | None: ...
def farm_get_msg() -> bytes | None: ...
def farm_msg(__farm_name: str, __message: str | bytes | ReadOnlyBuffer) -> None: ...
def get_logvar(__key: str | bytes) -> bytes | None: ...
def green_schedule() -> Literal[True]: ...
def i_am_the_lord(__legion_name: str) -> bool: ...
def i_am_the_spooler() -> _TrueOrNone: ...
def in_farm(__farm_name: str = ...) -> _TrueOrNone: ...
def is_connected(__fd: int) -> bool: ...
def is_locked(__lock_num: int = 0) -> bool: ...
def listen_queue(__id: int = 0) -> int: ...
def lock(__lock_num: int = 0) -> None: ...
def log(__logline: str) -> Literal[True]: ...
def log_this_request() -> None: ...
def logsize() -> int: ...
def lord_scroll(__legion_name: str) -> bytes | None: ...
def masterpid() -> int: ...
def mem() -> tuple[int, int]: ...
def metric_dec(__key: str, __decrement: int = 1) -> _TrueOrNone: ...
def metric_div(__key: str, __divisor: int = 1) -> _TrueOrNone: ...
def metric_get(__key: str) -> int: ...
def metric_inc(__key: str, __increment: int = 1) -> _TrueOrNone: ...
def metric_mul(__key: str, __factor: int = 1) -> _TrueOrNone: ...
def metric_set(__key: str, __value: int = 1) -> _TrueOrNone: ...
def metric_set_max(__key: str, __value: int = 1) -> _TrueOrNone: ...
def metric_set_min(__key: str, __value: int = 1) -> _TrueOrNone: ...
def micros() -> int: ...
def mule_get_msg(signals: bool = True, farms: bool = True, buffer_size: int = 65536, timeout: int = -1) -> bytes: ...
def mule_id() -> int: ...
@overload
def mule_msg(__mesage: str | bytes | ReadOnlyBuffer) -> bool: ...
@overload
def mule_msg(__mesage: str | bytes | ReadOnlyBuffer, __mule_id: int) -> bool: ...
@overload
def mule_msg(__mesage: str | bytes | ReadOnlyBuffer, __farm_name: str) -> bool: ...
def offload(__filename: str, __len: int = 0) -> Literal[b""]: ...
def parsefile(__filename: str) -> dict[bytes, bytes] | None: ...
def ready() -> _TrueOrNone: ...
def ready_fd() -> int: ...
def recv(__fd: int, __max_size: int = 4096) -> bytes | None: ...
@overload
def register_rpc(__name: str, __func: Callable[[], bytes | None]) -> Literal[True]: ...
@overload
def register_rpc(__name: str, __func: Callable[[bytes], bytes | None], arg_count: Literal[1]) -> Literal[True]: ...
@overload
def register_rpc(__name: str, __func: Callable[[bytes, bytes], bytes | None], arg_count: Literal[2]) -> Literal[True]: ...
@overload
def register_rpc(__name: str, __func: _RPCCallable, arg_count: int) -> Literal[True]: ...
def register_signal(__signum: int, __who: str, __handler: Callable[[int], Any]) -> None: ...
def reload() -> _TrueOrNone: ...
def request_id() -> int: ...
def route(__router_name: str, __router_args: str) -> int: ...
def rpc(__node: str | bytes, __rpc_name: bytes, *rpc_args: bytes) -> bytes | None: ...
def rpc_list() -> tuple[bytes, ...]: ...
def scrolls(__legion_name: str) -> list[bytes] | None: ...
@overload
def send(__data: bytes) -> _TrueOrNone: ...
@overload
def send(__fd: int, __data: bytes) -> _TrueOrNone: ...
def sendfile(
__filename_or_fd: str | bytes | int | HasFileno, __chunk: int = 0, __pos: int = 0, filesize: int = 0
) -> _TrueOrNone: ...
def set_logvar(__key: str | bytes, __val: str | bytes) -> None: ...
def set_user_harakiri(__seconds: int) -> None: ...
def set_warning_message(__message: str) -> Literal[True]: ...
def setprocname(__name: str) -> None: ...
def signal(__signum: int = ..., __node: str = ...) -> None: ...
def signal_received() -> int: ...
def signal_registered(__signum: int) -> _TrueOrNone: ...
def signal_wait(__signum: int = ...) -> Literal[b""]: ...
def start_response(
__status: str, __headers: list[tuple[str, str]], __exc_info: OptExcInfo | None = ...
) -> Callable[[bytes], None]: ...
def stop() -> _TrueOrNone: ...
def suspend() -> Literal[True]: ...
def total_requests() -> int: ...
def unlock(__lock_num: int = 0) -> None: ...
def wait_fd_read(__fd: int, __timeout: int = 0) -> Literal[b""]: ...
def wait_fd_write(__fd: int, __timeout: int = 0) -> Literal[b""]: ...
def websocket_handshake(__key: str | bytes = ..., __origin: str | bytes = ..., __proto: str | bytes = ...) -> None: ...
def websocket_recv() -> bytes: ...
def websocket_recv_nb() -> bytes: ...
def websocket_send(message: str | bytes | ReadOnlyBuffer) -> None: ...
def websocket_send_binary(message: str | bytes | ReadOnlyBuffer) -> None: ...
def worker_id() -> int: ...
def workers() -> tuple[_WorkerDict, ...] | None: ...
def sharedarea_read(__id: int, __position: int, __length: int) -> bytes: ...
def sharedarea_write(__id: int, __position: int, __value: str | bytes | ReadOnlyBuffer) -> None: ...
def sharedarea_readbyte(__id: int, __position: int) -> int: ...
def sharedarea_writebyte(__id: int, __position: int, __value: int) -> None: ...
def sharedarea_read8(__id: int, __position: int) -> int: ...
def sharedarea_write8(__id: int, __position: int, __value: int) -> None: ...
def sharedarea_readlong(__id: int, __position: int) -> int: ...
def sharedarea_writelong(__id: int, __position: int, __value: int) -> None: ...
def sharedarea_read64(__id: int, __position: int) -> int: ...
def sharedarea_write64(__id: int, __position: int, __value: int) -> None: ...
def sharedarea_read32(__id: int, __position: int) -> int: ...
def sharedarea_write32(__id: int, __position: int, __value: int) -> None: ...
def sharedarea_read16(__id: int, __position: int) -> int: ...
def sharedarea_write16(__id: int, __position: int, __value: int) -> None: ...
def sharedarea_inclong(__id: int, __position: int, __increment: int = 1) -> None: ...
def sharedarea_inc64(__id: int, __position: int, __increment: int = 1) -> None: ...
def sharedarea_inc32(__id: int, __position: int, __increment: int = 1) -> None: ...
def sharedarea_dec64(__id: int, __position: int, __decrement: int = 1) -> None: ...
def sharedarea_dec32(__id: int, __position: int, __decrement: int = 1) -> None: ...
def sharedarea_rlock(__id: int) -> None: ...
def sharedarea_wlock(__id: int) -> None: ...
def sharedarea_unlock(__id: int) -> None: ...
def sharedarea_object(__id: int) -> bytearray: ...
def sharedarea_memoryview(__id: int) -> memoryview: ...

View File

@@ -0,0 +1,180 @@
from collections.abc import Callable
from typing import Any, Generic, TypeVar, overload
from typing_extensions import Literal, ParamSpec
from uwsgi import _RPCCallable
_T = TypeVar("_T")
_T2 = TypeVar("_T2")
_SR = TypeVar("_SR", bound=Literal[0, -1, -2] | None)
_SignalCallbackT = TypeVar("_SignalCallbackT", bound=Callable[[int], Any])
_RPCCallableT = TypeVar("_RPCCallableT", bound=_RPCCallable)
_P = ParamSpec("_P")
_P2 = ParamSpec("_P2")
spooler_functions: dict[str, Callable[..., Literal[0, -1, -2] | None]]
mule_functions: dict[str, Callable[..., Any]]
postfork_chain: list[Callable[[], None]]
def get_free_signal() -> int: ...
def manage_spool_request(vars: dict[bytes, Any]) -> Literal[0, -1, -2]: ...
def postfork_chain_hook() -> None: ...
class postfork(Generic[_P, _T]):
wid: int
f: Callable[_P, _T] | None
@overload
def __init__(self: postfork[..., Any], f: int) -> None: ...
@overload
def __init__(self: postfork[_P, _T], f: Callable[_P, _T]) -> None: ...
@overload
def __call__(self, __f: Callable[_P2, _T2]) -> postfork[_P2, _T2]: ...
@overload
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: ...
class _spoolraw(Generic[_P, _SR]):
f: Callable[_P, _SR]
pass_arguments: bool
base_dict: dict[str, Any]
def __init__(self, f: Callable[_P, _SR], pass_arguments: bool) -> None: ...
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _SR: ...
def spool(self, *args: _P.args, **kwargs: _P.kwargs) -> _SR: ...
class _spool(_spoolraw[_P, _SR]): ...
class _spoolforever(_spoolraw[_P, _SR]): ...
@overload
def spool_decorate(
f: Callable[_P, _SR], pass_arguments: bool = False, _class: type[_spoolraw[_P, _SR]] = ...
) -> _spoolraw[_P, _SR]: ...
@overload
def spool_decorate(
f: None = None, pass_arguments: bool = False, _class: type[_spoolraw[..., Any]] = ...
) -> Callable[[Callable[_P, _SR]], _spoolraw[_P, _SR]]: ...
@overload
def spoolraw(f: Callable[_P, _SR], pass_arguments: bool = False) -> _spoolraw[_P, _SR]: ...
@overload
def spoolraw(f: None = None, pass_arguments: bool = False) -> Callable[[Callable[_P, _SR]], _spoolraw[_P, _SR]]: ...
@overload
def spool(f: Callable[_P, _SR], pass_arguments: bool = False) -> _spool[_P, _SR]: ...
@overload
def spool(f: None = None, pass_arguments: bool = False) -> Callable[[Callable[_P, _SR]], _spool[_P, _SR]]: ...
@overload
def spoolforever(f: Callable[_P, _SR], pass_arguments: bool = False) -> _spoolforever[_P, _SR]: ...
@overload
def spoolforever(f: None = None, pass_arguments: bool = False) -> Callable[[Callable[_P, _SR]], _spoolforever[_P, _SR]]: ...
class mulefunc(Generic[_P, _T]):
fname: str | None
mule: int
@overload
def __init__(self: mulefunc[..., Any], f: int) -> None: ...
@overload
def __init__(self: mulefunc[_P, _T], f: Callable[_P, _T]) -> None: ...
def real_call(self, *args: _P.args, **kwargs: _P.kwargs) -> None: ...
@overload
def __call__(self, __f: Callable[_P2, _T2]) -> mulefunc[_P2, _T2]: ...
@overload
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: ...
def mule_msg_dispatcher(message: bytes) -> Any: ...
class rpc:
name: str
def __init__(self, name: str) -> None: ...
def __call__(self, f: _RPCCallableT) -> _RPCCallableT: ...
class farm_loop:
f: Callable[[bytes], Any]
farm: str | None
def __init__(self, f: Callable[[bytes], Any], farm: str | None) -> None: ...
def __call__(self) -> None: ...
class farm:
name: str
def __init__(self, name: str | None = None, **kwargs: Any) -> None: ...
def __call__(self, f: Callable[[bytes], Any]) -> None: ...
class mule_brain:
f: Callable[[], Any]
num: int
def __init__(self, f: Callable[[], Any], num: int) -> None: ...
def __call__(self) -> None: ...
class mule_brainloop(mule_brain): ...
class mule:
num: int
def __init__(self, num: int) -> None: ...
def __call__(self, f: Callable[[], Any]) -> None: ...
class muleloop(mule): ...
class mulemsg_loop:
f: Callable[[bytes], Any]
num: int
def __init__(self, f: Callable[[bytes], Any], num: int) -> None: ...
def __call__(self) -> None: ...
class mulemsg:
num: int
def __init__(self, num: int) -> None: ...
def __call__(self, f: Callable[[bytes], Any]) -> None: ...
class signal:
num: int
target: str
def __init__(self, num: int, *, target: str = "", **kwargs: Any) -> None: ...
def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ...
class timer:
num: int
secs: int
target: str
def __init__(self, secs: int, *, signum: int = ..., target: str = "", **kwargs: Any) -> None: ...
def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ...
class cron:
num: int
minute: int
hour: int
day: int
month: int
dayweek: int
target: str
def __init__(
self, minute: int, hour: int, day: int, month: int, dayweek: int, *, signum: int = ..., target: str = "", **kwargs: Any
) -> None: ...
def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ...
class rbtimer:
num: int
secs: int
target: str
def __init__(self, secs: int, *, signum: int = ..., target: str = "", **kwargs: Any) -> None: ...
def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ...
class filemon:
num: int
fsobj: str
target: str
def __init__(self, fsobj: str, *, signum: int = ..., target: str = "", **kwargs: Any) -> None: ...
def __call__(self, f: _SignalCallbackT) -> _SignalCallbackT: ...
class lock(Generic[_P, _T]):
f: Callable[_P, _T]
def __init__(self, f: Callable[_P, _T]) -> None: ...
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: ...
# FIXME: Technically this only allows positional arguments, but there is not really
# an adequate way yet to express this, once bound on ParamSpec does something
# we could probably enforce this
class thread(Generic[_P, _T]):
f: Callable[_P, _T]
def __init__(self, f: Callable[_P, _T]) -> None: ...
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> Callable[_P, _T]: ...
class harakiri:
s: int
def __init__(self, seconds: int) -> None: ...
def __call__(self, f: Callable[_P, _T]) -> Callable[_P, _T]: ...

View File

@@ -9,6 +9,7 @@ import subprocess
import sys
import tempfile
from pathlib import Path
from textwrap import dedent
from typing import NoReturn
from parse_metadata import get_recursive_requirements, read_metadata
@@ -97,6 +98,11 @@ def run_stubtest(dist: Path, *, verbose: bool = False, specified_platforms_only:
if platform_allowlist.exists():
stubtest_cmd.extend(["--allowlist", str(platform_allowlist)])
# Perform some black magic in order to run stubtest inside uWSGI
if dist_name == "uWSGI":
if not setup_uwsgi_stubtest_command(dist, venv_dir, stubtest_cmd):
return False
try:
subprocess.run(stubtest_cmd, env=stubtest_env, check=True, capture_output=True)
except subprocess.CalledProcessError as e:
@@ -128,6 +134,82 @@ def run_stubtest(dist: Path, *, verbose: bool = False, specified_platforms_only:
return True
def setup_uwsgi_stubtest_command(dist: Path, venv_dir: Path, stubtest_cmd: list[str]) -> bool:
"""Perform some black magic in order to run stubtest inside uWSGI.
We have to write the exit code from stubtest to a surrogate file
because uwsgi --pyrun does not exit with the exitcode from the
python script. We have a second wrapper script that passed the
arguments along to the uWSGI script and retrieves the exit code
from the file, so it behaves like running stubtest normally would.
Both generated wrapper scripts are created inside `venv_dir`,
which itself is a subdirectory inside a temporary directory,
so both scripts will be cleaned up after this function
has been executed.
"""
uwsgi_ini = dist / "@tests/uwsgi.ini"
if sys.platform == "win32":
print_error("uWSGI is not supported on Windows")
return False
uwsgi_script = venv_dir / "uwsgi_stubtest.py"
wrapper_script = venv_dir / "uwsgi_wrapper.py"
exit_code_surrogate = venv_dir / "exit_code"
uwsgi_script_contents = dedent(
f"""
import json
import os
import sys
from mypy.stubtest import main
sys.argv = json.loads(os.environ.get("STUBTEST_ARGS"))
exit_code = main()
with open("{exit_code_surrogate}", mode="w") as fp:
fp.write(str(exit_code))
sys.exit(exit_code)
"""
)
uwsgi_script.write_text(uwsgi_script_contents)
uwsgi_exe = venv_dir / "bin" / "uwsgi"
# It would be nice to reliably separate uWSGI output from
# the stubtest output, on linux it appears that stubtest
# will always go to stdout and uWSGI to stderr, but on
# MacOS they both go to stderr, for now we deal with the
# bit of extra spam
wrapper_script_contents = dedent(
f"""
import json
import os
import subprocess
import sys
stubtest_env = os.environ | {{"STUBTEST_ARGS": json.dumps(sys.argv)}}
uwsgi_cmd = [
"{uwsgi_exe}",
"--ini",
"{uwsgi_ini}",
"--spooler",
"{venv_dir}",
"--pyrun",
"{uwsgi_script}",
]
subprocess.run(uwsgi_cmd, env=stubtest_env)
with open("{exit_code_surrogate}", mode="r") as fp:
sys.exit(int(fp.read()))
"""
)
wrapper_script.write_text(wrapper_script_contents)
# replace "-m mypy.stubtest" in stubtest_cmd with the path to our wrapper script
assert stubtest_cmd[1:3] == ["-m", "mypy.stubtest"]
stubtest_cmd[1:3] = [str(wrapper_script)]
return True
def print_commands(dist: Path, pip_cmd: list[str], stubtest_cmd: list[str], mypypath: str) -> None:
print(file=sys.stderr)
print(" ".join(pip_cmd), file=sys.stderr)