Add InterpreterPoolExecutor (3.14) (#14008)

This commit is contained in:
Max Muoto
2025-05-12 11:58:29 -05:00
committed by GitHub
parent 81fc4a7b20
commit 8b12b1664b
6 changed files with 248 additions and 28 deletions
@@ -60,17 +60,6 @@ compression.gzip.GzipFile.readinto1
compression.gzip.GzipFile.readinto1
compression.gzip.compress
compression.zstd
concurrent.futures.__all__
concurrent.futures.InterpreterPoolExecutor
concurrent.futures.ThreadPoolExecutor.BROKEN
concurrent.futures.ThreadPoolExecutor.prepare_context
concurrent.futures.interpreter
concurrent.futures.thread.ThreadPoolExecutor.BROKEN
concurrent.futures.thread.ThreadPoolExecutor.prepare_context
concurrent.futures.thread.WorkerContext
concurrent.futures.thread._WorkItem.__init__
concurrent.futures.thread._WorkItem.run
concurrent.futures.thread._worker
ctypes.POINTER
ctypes.byref
ctypes.memoryview_at
@@ -1,7 +1,9 @@
from __future__ import annotations
import sys
from collections.abc import Callable, Iterator
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Literal
from typing_extensions import assert_type
@@ -28,3 +30,49 @@ def check_future_invariance() -> None:
fut: Future[Child] = Future()
execute_callback(lambda: Parent(), fut) # type: ignore
assert isinstance(fut.result(), Child)
if sys.version_info >= (3, 14):
def _initializer(x: int) -> None:
pass
def check_interpreter_pool_executor() -> None:
import concurrent.futures.interpreter
from concurrent.futures import InterpreterPoolExecutor
with InterpreterPoolExecutor(initializer=_initializer, initargs=(1,)):
...
with InterpreterPoolExecutor(initializer=_initializer, initargs=("x",)): # type: ignore
...
context = InterpreterPoolExecutor.prepare_context(initializer=_initializer, initargs=(1,), shared={})
worker_context = context[0]()
assert_type(worker_context, concurrent.futures.interpreter.WorkerContext)
resolve_task = context[1]
# Function should enfore that the arguments are correct.
res = resolve_task(_initializer, 1)
assert_type(res, tuple[bytes, Literal["function"]])
# When the function is a script, the arguments should be a string.
str_res = resolve_task("print('Hello, world!')")
assert_type(str_res, tuple[bytes, Literal["script"]])
# When a script is passed, no arguments should be provided.
resolve_task("print('Hello, world!')", 1) # type: ignore
# `WorkerContext.__init__` should accept the result of a resolved task.
concurrent.futures.interpreter.WorkerContext(initdata=res)
# Run should also accept the result of a resolved task.
worker_context.run(res)
def check_thread_worker_context() -> None:
import concurrent.futures.thread
context = concurrent.futures.thread.WorkerContext.prepare(initializer=_initializer, initargs=(1,))
worker_context = context[0]()
assert_type(worker_context, concurrent.futures.thread.WorkerContext)
resolve_task = context[1]
res = resolve_task(_initializer, (1,), {"test": 1})
assert_type(res[1], tuple[int])
assert_type(res[2], dict[str, int])
+1
View File
@@ -121,6 +121,7 @@ colorsys: 3.0-
compileall: 3.0-
compression: 3.14-
concurrent: 3.2-
concurrent.futures.interpreter: 3.14-
configparser: 3.0-
contextlib: 3.0-
contextvars: 3.7-
+21 -1
View File
@@ -16,7 +16,27 @@ from ._base import (
from .process import ProcessPoolExecutor as ProcessPoolExecutor
from .thread import ThreadPoolExecutor as ThreadPoolExecutor
if sys.version_info >= (3, 13):
if sys.version_info >= (3, 14):
from .interpreter import InterpreterPoolExecutor as InterpreterPoolExecutor
__all__ = (
"FIRST_COMPLETED",
"FIRST_EXCEPTION",
"ALL_COMPLETED",
"CancelledError",
"TimeoutError",
"InvalidStateError",
"BrokenExecutor",
"Future",
"Executor",
"wait",
"as_completed",
"ProcessPoolExecutor",
"ThreadPoolExecutor",
"InterpreterPoolExecutor",
)
elif sys.version_info >= (3, 13):
__all__ = (
"FIRST_COMPLETED",
"FIRST_EXCEPTION",
+102
View File
@@ -0,0 +1,102 @@
import sys
from collections.abc import Callable, Mapping
from concurrent.futures import ThreadPoolExecutor
from typing import Final, Literal, Protocol, overload, type_check_only
from typing_extensions import ParamSpec, Self, TypeAlias, TypeVar, TypeVarTuple, Unpack
_Task: TypeAlias = tuple[bytes, Literal["function", "script"]]
@type_check_only
class _TaskFunc(Protocol):
@overload
def __call__(self, fn: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> tuple[bytes, Literal["function"]]: ...
@overload
def __call__(self, fn: str) -> tuple[bytes, Literal["script"]]: ...
_Ts = TypeVarTuple("_Ts")
_P = ParamSpec("_P")
_R = TypeVar("_R")
# A `type.simplenamespace` with `__name__` attribute.
@type_check_only
class _HasName(Protocol):
__name__: str
# `_interpreters.exec` technically gives us a simple namespace.
@type_check_only
class _ExcInfo(Protocol):
formatted: str
msg: str
type: _HasName
if sys.version_info >= (3, 14):
from concurrent.futures.thread import BrokenThreadPool, WorkerContext as ThreadWorkerContext
from _interpreters import InterpreterError
class ExecutionFailed(InterpreterError):
def __init__(self, excinfo: _ExcInfo) -> None: ... # type: ignore[override]
UNBOUND: Final = 2
class WorkerContext(ThreadWorkerContext):
# Parent class doesn't have `shared` argument,
@overload # type: ignore[override]
@classmethod
def prepare(
cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]], shared: Mapping[str, object]
) -> tuple[Callable[[], Self], _TaskFunc]: ...
@overload # type: ignore[override]
@classmethod
def prepare(
cls, initializer: Callable[[], object], initargs: tuple[()], shared: Mapping[str, object]
) -> tuple[Callable[[], Self], _TaskFunc]: ...
def __init__(
self, initdata: tuple[bytes, Literal["function", "script"]], shared: Mapping[str, object] | None = None
) -> None: ... # type: ignore[override]
def __del__(self) -> None: ...
def run(self, task: _Task) -> None: ... # type: ignore[override]
class BrokenInterpreterPool(BrokenThreadPool): ...
class InterpreterPoolExecutor(ThreadPoolExecutor):
BROKEN: type[BrokenInterpreterPool]
@overload # type: ignore[override]
@classmethod
def prepare_context(
cls, initializer: Callable[[], object], initargs: tuple[()], shared: Mapping[str, object]
) -> tuple[Callable[[], WorkerContext], _TaskFunc]: ...
@overload # type: ignore[override]
@classmethod
def prepare_context(
cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]], shared: Mapping[str, object]
) -> tuple[Callable[[], WorkerContext], _TaskFunc]: ...
@overload
def __init__(
self,
max_workers: int | None = None,
thread_name_prefix: str = "",
initializer: Callable[[], object] | None = None,
initargs: tuple[()] = (),
shared: Mapping[str, object] | None = None,
) -> None: ...
@overload
def __init__(
self,
max_workers: int | None = None,
thread_name_prefix: str = "",
*,
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
shared: Mapping[str, object] | None = None,
) -> None: ...
@overload
def __init__(
self,
max_workers: int | None,
thread_name_prefix: str,
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
shared: Mapping[str, object] | None = None,
) -> None: ...
+76 -16
View File
@@ -1,9 +1,10 @@
import queue
import sys
from collections.abc import Callable, Iterable, Mapping, Set as AbstractSet
from threading import Lock, Semaphore, Thread
from types import GenericAlias
from typing import Any, Generic, TypeVar, overload
from typing_extensions import TypeVarTuple, Unpack
from typing import Any, Generic, Protocol, TypeVar, overload, type_check_only
from typing_extensions import Self, TypeAlias, TypeVarTuple, Unpack
from weakref import ref
from ._base import BrokenExecutor, Executor, Future
@@ -18,25 +19,71 @@ def _python_exit() -> None: ...
_S = TypeVar("_S")
class _WorkItem(Generic[_S]):
future: Future[_S]
fn: Callable[..., _S]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
def run(self) -> None: ...
def __class_getitem__(cls, item: Any, /) -> GenericAlias: ...
_Task: TypeAlias = tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]
def _worker(
executor_reference: ref[Any],
work_queue: queue.SimpleQueue[Any],
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
) -> None: ...
_C = TypeVar("_C", bound=Callable[..., object])
_KT = TypeVar("_KT", bound=str)
_VT = TypeVar("_VT")
@type_check_only
class _ResolveTaskFunc(Protocol):
def __call__(
self, func: _C, args: tuple[Unpack[_Ts]], kwargs: dict[_KT, _VT]
) -> tuple[_C, tuple[Unpack[_Ts]], dict[_KT, _VT]]: ...
if sys.version_info >= (3, 14):
class WorkerContext:
@overload
@classmethod
def prepare(
cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]]
) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ...
@overload
@classmethod
def prepare(
cls, initializer: Callable[[], object], initargs: tuple[()]
) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ...
@overload
def __init__(self, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]]) -> None: ...
@overload
def __init__(self, initializer: Callable[[], object], initargs: tuple[()]) -> None: ...
def initialize(self) -> None: ...
def finalize(self) -> None: ...
def run(self, task: _Task) -> None: ...
if sys.version_info >= (3, 14):
class _WorkItem(Generic[_S]):
future: Future[Any]
task: _Task
def __init__(self, future: Future[Any], task: _Task) -> None: ...
def run(self, ctx: WorkerContext) -> None: ...
def __class_getitem__(cls, item: Any, /) -> GenericAlias: ...
def _worker(executor_reference: ref[Any], ctx: WorkerContext, work_queue: queue.SimpleQueue[Any]) -> None: ...
else:
class _WorkItem(Generic[_S]):
future: Future[_S]
fn: Callable[..., _S]
args: Iterable[Any]
kwargs: Mapping[str, Any]
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
def run(self) -> None: ...
def __class_getitem__(cls, item: Any, /) -> GenericAlias: ...
def _worker(
executor_reference: ref[Any],
work_queue: queue.SimpleQueue[Any],
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
) -> None: ...
class BrokenThreadPool(BrokenExecutor): ...
class ThreadPoolExecutor(Executor):
if sys.version_info >= (3, 14):
BROKEN: type[BrokenThreadPool]
_max_workers: int
_idle_semaphore: Semaphore
_threads: AbstractSet[Thread]
@@ -47,6 +94,19 @@ class ThreadPoolExecutor(Executor):
_initializer: Callable[..., None] | None
_initargs: tuple[Any, ...]
_work_queue: queue.SimpleQueue[_WorkItem[Any]]
if sys.version_info >= (3, 14):
@overload
@classmethod
def prepare_context(
cls, initializer: Callable[[], object], initargs: tuple[()]
) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ...
@overload
@classmethod
def prepare_context(
cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]]
) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ...
@overload
def __init__(
self,