diff --git a/stdlib/@tests/stubtest_allowlists/py314.txt b/stdlib/@tests/stubtest_allowlists/py314.txt index d66b81762..d88a157b4 100644 --- a/stdlib/@tests/stubtest_allowlists/py314.txt +++ b/stdlib/@tests/stubtest_allowlists/py314.txt @@ -60,17 +60,6 @@ compression.gzip.GzipFile.readinto1 compression.gzip.GzipFile.readinto1 compression.gzip.compress compression.zstd -concurrent.futures.__all__ -concurrent.futures.InterpreterPoolExecutor -concurrent.futures.ThreadPoolExecutor.BROKEN -concurrent.futures.ThreadPoolExecutor.prepare_context -concurrent.futures.interpreter -concurrent.futures.thread.ThreadPoolExecutor.BROKEN -concurrent.futures.thread.ThreadPoolExecutor.prepare_context -concurrent.futures.thread.WorkerContext -concurrent.futures.thread._WorkItem.__init__ -concurrent.futures.thread._WorkItem.run -concurrent.futures.thread._worker ctypes.POINTER ctypes.byref ctypes.memoryview_at diff --git a/stdlib/@tests/test_cases/check_concurrent_futures.py b/stdlib/@tests/test_cases/check_concurrent_futures.py index 962ec23c6..ba6ca0845 100644 --- a/stdlib/@tests/test_cases/check_concurrent_futures.py +++ b/stdlib/@tests/test_cases/check_concurrent_futures.py @@ -1,7 +1,9 @@ from __future__ import annotations +import sys from collections.abc import Callable, Iterator from concurrent.futures import Future, ThreadPoolExecutor, as_completed +from typing import Literal from typing_extensions import assert_type @@ -28,3 +30,49 @@ def check_future_invariance() -> None: fut: Future[Child] = Future() execute_callback(lambda: Parent(), fut) # type: ignore assert isinstance(fut.result(), Child) + + +if sys.version_info >= (3, 14): + + def _initializer(x: int) -> None: + pass + + def check_interpreter_pool_executor() -> None: + import concurrent.futures.interpreter + from concurrent.futures import InterpreterPoolExecutor + + with InterpreterPoolExecutor(initializer=_initializer, initargs=(1,)): + ... + + with InterpreterPoolExecutor(initializer=_initializer, initargs=("x",)): # type: ignore + ... + + context = InterpreterPoolExecutor.prepare_context(initializer=_initializer, initargs=(1,), shared={}) + worker_context = context[0]() + assert_type(worker_context, concurrent.futures.interpreter.WorkerContext) + resolve_task = context[1] + # Function should enfore that the arguments are correct. + res = resolve_task(_initializer, 1) + assert_type(res, tuple[bytes, Literal["function"]]) + # When the function is a script, the arguments should be a string. + str_res = resolve_task("print('Hello, world!')") + assert_type(str_res, tuple[bytes, Literal["script"]]) + # When a script is passed, no arguments should be provided. + resolve_task("print('Hello, world!')", 1) # type: ignore + + # `WorkerContext.__init__` should accept the result of a resolved task. + concurrent.futures.interpreter.WorkerContext(initdata=res) + + # Run should also accept the result of a resolved task. + worker_context.run(res) + + def check_thread_worker_context() -> None: + import concurrent.futures.thread + + context = concurrent.futures.thread.WorkerContext.prepare(initializer=_initializer, initargs=(1,)) + worker_context = context[0]() + assert_type(worker_context, concurrent.futures.thread.WorkerContext) + resolve_task = context[1] + res = resolve_task(_initializer, (1,), {"test": 1}) + assert_type(res[1], tuple[int]) + assert_type(res[2], dict[str, int]) diff --git a/stdlib/VERSIONS b/stdlib/VERSIONS index 9defa7c27..d13340ab3 100644 --- a/stdlib/VERSIONS +++ b/stdlib/VERSIONS @@ -121,6 +121,7 @@ colorsys: 3.0- compileall: 3.0- compression: 3.14- concurrent: 3.2- +concurrent.futures.interpreter: 3.14- configparser: 3.0- contextlib: 3.0- contextvars: 3.7- diff --git a/stdlib/concurrent/futures/__init__.pyi b/stdlib/concurrent/futures/__init__.pyi index 68fd0bc5a..dd1f6da80 100644 --- a/stdlib/concurrent/futures/__init__.pyi +++ b/stdlib/concurrent/futures/__init__.pyi @@ -16,7 +16,27 @@ from ._base import ( from .process import ProcessPoolExecutor as ProcessPoolExecutor from .thread import ThreadPoolExecutor as ThreadPoolExecutor -if sys.version_info >= (3, 13): +if sys.version_info >= (3, 14): + from .interpreter import InterpreterPoolExecutor as InterpreterPoolExecutor + + __all__ = ( + "FIRST_COMPLETED", + "FIRST_EXCEPTION", + "ALL_COMPLETED", + "CancelledError", + "TimeoutError", + "InvalidStateError", + "BrokenExecutor", + "Future", + "Executor", + "wait", + "as_completed", + "ProcessPoolExecutor", + "ThreadPoolExecutor", + "InterpreterPoolExecutor", + ) + +elif sys.version_info >= (3, 13): __all__ = ( "FIRST_COMPLETED", "FIRST_EXCEPTION", diff --git a/stdlib/concurrent/futures/interpreter.pyi b/stdlib/concurrent/futures/interpreter.pyi new file mode 100644 index 000000000..c1a29e6b0 --- /dev/null +++ b/stdlib/concurrent/futures/interpreter.pyi @@ -0,0 +1,102 @@ +import sys +from collections.abc import Callable, Mapping +from concurrent.futures import ThreadPoolExecutor +from typing import Final, Literal, Protocol, overload, type_check_only +from typing_extensions import ParamSpec, Self, TypeAlias, TypeVar, TypeVarTuple, Unpack + +_Task: TypeAlias = tuple[bytes, Literal["function", "script"]] + +@type_check_only +class _TaskFunc(Protocol): + @overload + def __call__(self, fn: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> tuple[bytes, Literal["function"]]: ... + @overload + def __call__(self, fn: str) -> tuple[bytes, Literal["script"]]: ... + +_Ts = TypeVarTuple("_Ts") +_P = ParamSpec("_P") +_R = TypeVar("_R") + +# A `type.simplenamespace` with `__name__` attribute. +@type_check_only +class _HasName(Protocol): + __name__: str + +# `_interpreters.exec` technically gives us a simple namespace. +@type_check_only +class _ExcInfo(Protocol): + formatted: str + msg: str + type: _HasName + +if sys.version_info >= (3, 14): + from concurrent.futures.thread import BrokenThreadPool, WorkerContext as ThreadWorkerContext + + from _interpreters import InterpreterError + + class ExecutionFailed(InterpreterError): + def __init__(self, excinfo: _ExcInfo) -> None: ... # type: ignore[override] + + UNBOUND: Final = 2 + + class WorkerContext(ThreadWorkerContext): + # Parent class doesn't have `shared` argument, + @overload # type: ignore[override] + @classmethod + def prepare( + cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]], shared: Mapping[str, object] + ) -> tuple[Callable[[], Self], _TaskFunc]: ... + @overload # type: ignore[override] + @classmethod + def prepare( + cls, initializer: Callable[[], object], initargs: tuple[()], shared: Mapping[str, object] + ) -> tuple[Callable[[], Self], _TaskFunc]: ... + def __init__( + self, initdata: tuple[bytes, Literal["function", "script"]], shared: Mapping[str, object] | None = None + ) -> None: ... # type: ignore[override] + def __del__(self) -> None: ... + def run(self, task: _Task) -> None: ... # type: ignore[override] + + class BrokenInterpreterPool(BrokenThreadPool): ... + + class InterpreterPoolExecutor(ThreadPoolExecutor): + BROKEN: type[BrokenInterpreterPool] + + @overload # type: ignore[override] + @classmethod + def prepare_context( + cls, initializer: Callable[[], object], initargs: tuple[()], shared: Mapping[str, object] + ) -> tuple[Callable[[], WorkerContext], _TaskFunc]: ... + @overload # type: ignore[override] + @classmethod + def prepare_context( + cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]], shared: Mapping[str, object] + ) -> tuple[Callable[[], WorkerContext], _TaskFunc]: ... + @overload + def __init__( + self, + max_workers: int | None = None, + thread_name_prefix: str = "", + initializer: Callable[[], object] | None = None, + initargs: tuple[()] = (), + shared: Mapping[str, object] | None = None, + ) -> None: ... + @overload + def __init__( + self, + max_workers: int | None = None, + thread_name_prefix: str = "", + *, + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], + shared: Mapping[str, object] | None = None, + ) -> None: ... + @overload + def __init__( + self, + max_workers: int | None, + thread_name_prefix: str, + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], + shared: Mapping[str, object] | None = None, + ) -> None: ... diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index da3e006b6..22df0dca5 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -1,9 +1,10 @@ import queue +import sys from collections.abc import Callable, Iterable, Mapping, Set as AbstractSet from threading import Lock, Semaphore, Thread from types import GenericAlias -from typing import Any, Generic, TypeVar, overload -from typing_extensions import TypeVarTuple, Unpack +from typing import Any, Generic, Protocol, TypeVar, overload, type_check_only +from typing_extensions import Self, TypeAlias, TypeVarTuple, Unpack from weakref import ref from ._base import BrokenExecutor, Executor, Future @@ -18,25 +19,71 @@ def _python_exit() -> None: ... _S = TypeVar("_S") -class _WorkItem(Generic[_S]): - future: Future[_S] - fn: Callable[..., _S] - args: Iterable[Any] - kwargs: Mapping[str, Any] - def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... - def run(self) -> None: ... - def __class_getitem__(cls, item: Any, /) -> GenericAlias: ... +_Task: TypeAlias = tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]] -def _worker( - executor_reference: ref[Any], - work_queue: queue.SimpleQueue[Any], - initializer: Callable[[Unpack[_Ts]], object], - initargs: tuple[Unpack[_Ts]], -) -> None: ... +_C = TypeVar("_C", bound=Callable[..., object]) +_KT = TypeVar("_KT", bound=str) +_VT = TypeVar("_VT") + +@type_check_only +class _ResolveTaskFunc(Protocol): + def __call__( + self, func: _C, args: tuple[Unpack[_Ts]], kwargs: dict[_KT, _VT] + ) -> tuple[_C, tuple[Unpack[_Ts]], dict[_KT, _VT]]: ... + +if sys.version_info >= (3, 14): + class WorkerContext: + @overload + @classmethod + def prepare( + cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]] + ) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ... + @overload + @classmethod + def prepare( + cls, initializer: Callable[[], object], initargs: tuple[()] + ) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ... + @overload + def __init__(self, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]]) -> None: ... + @overload + def __init__(self, initializer: Callable[[], object], initargs: tuple[()]) -> None: ... + def initialize(self) -> None: ... + def finalize(self) -> None: ... + def run(self, task: _Task) -> None: ... + +if sys.version_info >= (3, 14): + class _WorkItem(Generic[_S]): + future: Future[Any] + task: _Task + def __init__(self, future: Future[Any], task: _Task) -> None: ... + def run(self, ctx: WorkerContext) -> None: ... + def __class_getitem__(cls, item: Any, /) -> GenericAlias: ... + + def _worker(executor_reference: ref[Any], ctx: WorkerContext, work_queue: queue.SimpleQueue[Any]) -> None: ... + +else: + class _WorkItem(Generic[_S]): + future: Future[_S] + fn: Callable[..., _S] + args: Iterable[Any] + kwargs: Mapping[str, Any] + def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + def run(self) -> None: ... + def __class_getitem__(cls, item: Any, /) -> GenericAlias: ... + + def _worker( + executor_reference: ref[Any], + work_queue: queue.SimpleQueue[Any], + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], + ) -> None: ... class BrokenThreadPool(BrokenExecutor): ... class ThreadPoolExecutor(Executor): + if sys.version_info >= (3, 14): + BROKEN: type[BrokenThreadPool] + _max_workers: int _idle_semaphore: Semaphore _threads: AbstractSet[Thread] @@ -47,6 +94,19 @@ class ThreadPoolExecutor(Executor): _initializer: Callable[..., None] | None _initargs: tuple[Any, ...] _work_queue: queue.SimpleQueue[_WorkItem[Any]] + + if sys.version_info >= (3, 14): + @overload + @classmethod + def prepare_context( + cls, initializer: Callable[[], object], initargs: tuple[()] + ) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ... + @overload + @classmethod + def prepare_context( + cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]] + ) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ... + @overload def __init__( self,