From a4b7c363669ed6067bb54cc109b5937ae2c99de9 Mon Sep 17 00:00:00 2001 From: Stephen Morton Date: Fri, 27 Dec 2024 21:13:30 -0800 Subject: [PATCH] add missing things in multiprocessing (#13153) --- stdlib/@tests/stubtest_allowlists/common.txt | 28 ------- stdlib/multiprocessing/managers.pyi | 79 +++++++++++++++----- stdlib/multiprocessing/pool.pyi | 3 + stdlib/multiprocessing/synchronize.pyi | 1 + 4 files changed, 66 insertions(+), 45 deletions(-) diff --git a/stdlib/@tests/stubtest_allowlists/common.txt b/stdlib/@tests/stubtest_allowlists/common.txt index a9ef5e479..ed6466b50 100644 --- a/stdlib/@tests/stubtest_allowlists/common.txt +++ b/stdlib/@tests/stubtest_allowlists/common.txt @@ -37,34 +37,6 @@ turtledemo\..+ # ====================================================================== _thread.RLock -multiprocessing.managers.Server.accepter -multiprocessing.managers.Server.create -multiprocessing.managers.Server.debug_info -multiprocessing.managers.Server.decref -multiprocessing.managers.Server.dummy -multiprocessing.managers.Server.fallback_getvalue -multiprocessing.managers.Server.fallback_mapping -multiprocessing.managers.Server.fallback_repr -multiprocessing.managers.Server.fallback_str -multiprocessing.managers.Server.get_methods -multiprocessing.managers.Server.handle_request -multiprocessing.managers.Server.incref -multiprocessing.managers.Server.number_of_objects -multiprocessing.managers.Server.public -multiprocessing.managers.Server.serve_client -multiprocessing.managers.Server.shutdown -multiprocessing.managers.SharedMemoryServer.create -multiprocessing.managers.SharedMemoryServer.list_segments -multiprocessing.managers.SharedMemoryServer.public -multiprocessing.managers.SharedMemoryServer.release_segment -multiprocessing.managers.SharedMemoryServer.shutdown -multiprocessing.managers.SharedMemoryServer.track_segment -multiprocessing.managers.SyncManager.Barrier -multiprocessing.managers.SyncManager.JoinableQueue -multiprocessing.managers.SyncManager.Pool -multiprocessing.pool.Pool.Process -multiprocessing.pool.ThreadPool.Process -multiprocessing.synchronize.Semaphore.get_value tkinter.Misc.config diff --git a/stdlib/multiprocessing/managers.pyi b/stdlib/multiprocessing/managers.pyi index 3a798e1e5..ad5697e0a 100644 --- a/stdlib/multiprocessing/managers.pyi +++ b/stdlib/multiprocessing/managers.pyi @@ -1,13 +1,14 @@ import queue import sys import threading -from _typeshed import Incomplete, SupportsKeysAndGetItem, SupportsRichComparison, SupportsRichComparisonT +from _typeshed import SupportsKeysAndGetItem, SupportsRichComparison, SupportsRichComparisonT from collections.abc import Callable, Iterable, Iterator, Mapping, MutableMapping, MutableSequence, Sequence from types import TracebackType from typing import Any, AnyStr, ClassVar, Generic, SupportsIndex, TypeVar, overload from typing_extensions import Self, TypeAlias -from .connection import Connection +from . import pool +from .connection import Connection, _Address from .context import BaseContext from .shared_memory import _SLT, ShareableList as _ShareableList, SharedMemory as _SharedMemory from .util import Finalize as _Finalize @@ -30,14 +31,14 @@ _Namespace: TypeAlias = Namespace class Token: typeid: str | bytes | None - address: tuple[str | bytes, int] + address: _Address | None id: str | bytes | int | None - def __init__(self, typeid: bytes | str | None, address: tuple[str | bytes, int], id: str | bytes | int | None) -> None: ... + def __init__(self, typeid: bytes | str | None, address: _Address | None, id: str | bytes | int | None) -> None: ... def __getstate__(self) -> tuple[str | bytes | None, tuple[str | bytes, int], str | bytes | int | None]: ... def __setstate__(self, state: tuple[str | bytes | None, tuple[str | bytes, int], str | bytes | int | None]) -> None: ... class BaseProxy: - _address_to_local: dict[Any, Any] + _address_to_local: dict[_Address, Any] _mutex: Any def __init__( self, @@ -151,22 +152,50 @@ class ListProxy(BaseListProxy[_T]): if sys.version_info >= (3, 13): def __class_getitem__(cls, args: Any, /) -> Any: ... +# Send is (kind, result) +# Receive is (id, methodname, args, kwds) +_ServerConnection: TypeAlias = Connection[tuple[str, Any], tuple[str, str, Iterable[Any], Mapping[str, Any]]] + # Returned by BaseManager.get_server() class Server: - address: Any + address: _Address | None + id_to_obj: dict[str, tuple[Any, set[str], dict[str, str]]] + fallback_mapping: dict[str, Callable[[_ServerConnection, str, Any], Any]] + public: list[str] + # Registry values are (callable, exposed, method_to_typeid, proxytype) def __init__( - self, registry: dict[str, tuple[Callable[..., Any], Any, Any, Any]], address: Any, authkey: bytes, serializer: str + self, + registry: dict[str, tuple[Callable[..., Any], Iterable[str], dict[str, str], Any]], + address: _Address | None, + authkey: bytes, + serializer: str, ) -> None: ... def serve_forever(self) -> None: ... - def accept_connection( - self, c: Connection[tuple[str, str | None], tuple[str, str, Iterable[Incomplete], Mapping[str, Incomplete]]], name: str - ) -> None: ... + def accepter(self) -> None: ... + if sys.version_info >= (3, 10): + def handle_request(self, conn: _ServerConnection) -> None: ... + else: + def handle_request(self, c: _ServerConnection) -> None: ... + + def serve_client(self, conn: _ServerConnection) -> None: ... + def fallback_getvalue(self, conn: _ServerConnection, ident: str, obj: _T) -> _T: ... + def fallback_str(self, conn: _ServerConnection, ident: str, obj: Any) -> str: ... + def fallback_repr(self, conn: _ServerConnection, ident: str, obj: Any) -> str: ... + def dummy(self, c: _ServerConnection) -> None: ... + def debug_info(self, c: _ServerConnection) -> str: ... + def number_of_objects(self, c: _ServerConnection) -> int: ... + def shutdown(self, c: _ServerConnection) -> None: ... + def create(self, c: _ServerConnection, typeid: str, /, *args: Any, **kwds: Any) -> tuple[str, tuple[str, ...]]: ... + def get_methods(self, c: _ServerConnection, token: Token) -> set[str]: ... + def accept_connection(self, c: _ServerConnection, name: str) -> None: ... + def incref(self, c: _ServerConnection, ident: str) -> None: ... + def decref(self, c: _ServerConnection, ident: str) -> None: ... class BaseManager: if sys.version_info >= (3, 11): def __init__( self, - address: Any | None = None, + address: _Address | None = None, authkey: bytes | None = None, serializer: str = "pickle", ctx: BaseContext | None = None, @@ -176,7 +205,7 @@ class BaseManager: else: def __init__( self, - address: Any | None = None, + address: _Address | None = None, authkey: bytes | None = None, serializer: str = "pickle", ctx: BaseContext | None = None, @@ -188,7 +217,7 @@ class BaseManager: shutdown: _Finalize # only available after start() was called def join(self, timeout: float | None = None) -> None: ... # undocumented @property - def address(self) -> Any: ... + def address(self) -> _Address | None: ... @classmethod def register( cls, @@ -205,14 +234,26 @@ class BaseManager: ) -> None: ... class SyncManager(BaseManager): - def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ... - def Condition(self, lock: Any = ...) -> threading.Condition: ... + def Barrier( + self, parties: int, action: Callable[[], None] | None = None, timeout: float | None = None + ) -> threading.Barrier: ... + def BoundedSemaphore(self, value: int = 1) -> threading.BoundedSemaphore: ... + def Condition(self, lock: threading.Lock | threading._RLock | None = None) -> threading.Condition: ... def Event(self) -> threading.Event: ... def Lock(self) -> threading.Lock: ... def Namespace(self) -> _Namespace: ... + def Pool( + self, + processes: int | None = None, + initializer: Callable[..., object] | None = None, + initargs: Iterable[Any] = (), + maxtasksperchild: int | None = None, + context: Any | None = None, + ) -> pool.Pool: ... def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ... + def JoinableQueue(self, maxsize: int = ...) -> queue.Queue[Any]: ... def RLock(self) -> threading.RLock: ... - def Semaphore(self, value: Any = ...) -> threading.Semaphore: ... + def Semaphore(self, value: int = 1) -> threading.Semaphore: ... def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ... def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ... # Overloads are copied from builtins.dict.__init__ @@ -238,7 +279,11 @@ class SyncManager(BaseManager): def list(self) -> ListProxy[Any]: ... class RemoteError(Exception): ... -class SharedMemoryServer(Server): ... + +class SharedMemoryServer(Server): + def track_segment(self, c: _ServerConnection, segment_name: str) -> None: ... + def release_segment(self, c: _ServerConnection, segment_name: str) -> None: ... + def list_segments(self, c: _ServerConnection) -> list[str]: ... class SharedMemoryManager(BaseManager): def get_server(self) -> SharedMemoryServer: ... diff --git a/stdlib/multiprocessing/pool.pyi b/stdlib/multiprocessing/pool.pyi index 61d6d0781..2937d45e3 100644 --- a/stdlib/multiprocessing/pool.pyi +++ b/stdlib/multiprocessing/pool.pyi @@ -1,5 +1,6 @@ import sys from collections.abc import Callable, Iterable, Mapping +from multiprocessing.context import DefaultContext, Process from types import TracebackType from typing import Any, Final, Generic, TypeVar from typing_extensions import Self @@ -53,6 +54,8 @@ class Pool: maxtasksperchild: int | None = None, context: Any | None = None, ) -> None: ... + @staticmethod + def Process(ctx: DefaultContext, *args: Any, **kwds: Any) -> Process: ... def apply(self, func: Callable[..., _T], args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) -> _T: ... def apply_async( self, diff --git a/stdlib/multiprocessing/synchronize.pyi b/stdlib/multiprocessing/synchronize.pyi index e3cbfbc0e..a0d97baa0 100644 --- a/stdlib/multiprocessing/synchronize.pyi +++ b/stdlib/multiprocessing/synchronize.pyi @@ -54,6 +54,7 @@ class RLock(SemLock): class Semaphore(SemLock): def __init__(self, value: int = 1, *, ctx: BaseContext) -> None: ... + def get_value(self) -> int: ... class BoundedSemaphore(Semaphore): def __init__(self, value: int = 1, *, ctx: BaseContext) -> None: ...