From cdad9047d58c091319c8d999bbd3530623970727 Mon Sep 17 00:00:00 2001 From: Ali Hamdan Date: Mon, 18 Dec 2023 13:46:15 +0100 Subject: [PATCH] stdlib: Use `Unpack` for concurrent.futures.*Executor constructors (#11171) --- stdlib/concurrent/futures/process.pyi | 59 +++++++++++++++++++++++---- stdlib/concurrent/futures/thread.pyi | 31 +++++++++++--- 2 files changed, 76 insertions(+), 14 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 000e7a435..d3706a9c1 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -5,12 +5,14 @@ from multiprocessing.context import BaseContext, Process from multiprocessing.queues import Queue, SimpleQueue from threading import Lock, Semaphore, Thread from types import TracebackType -from typing import Any, Generic, TypeVar +from typing import Any, Generic, TypeVar, overload +from typing_extensions import TypeVarTuple, Unpack from weakref import ref from ._base import BrokenExecutor, Executor, Future _T = TypeVar("_T") +_Ts = TypeVarTuple("_Ts") _threads_wakeups: MutableMapping[Any, Any] _global_shutdown: bool @@ -109,8 +111,8 @@ if sys.version_info >= (3, 11): def _process_worker( call_queue: Queue[_CallItem], result_queue: SimpleQueue[_ResultItem], - initializer: Callable[..., object] | None, - initargs: tuple[Any, ...], + initializer: Callable[[Unpack[_Ts]], object] | None, + initargs: tuple[Unpack[_Ts]], max_tasks: int | None = None, ) -> None: ... @@ -118,8 +120,8 @@ else: def _process_worker( call_queue: Queue[_CallItem], result_queue: SimpleQueue[_ResultItem], - initializer: Callable[..., object] | None, - initargs: tuple[Any, ...], + initializer: Callable[[Unpack[_Ts]], object] | None, + initargs: tuple[Unpack[_Ts]], ) -> None: ... if sys.version_info >= (3, 9): @@ -169,22 +171,61 @@ class ProcessPoolExecutor(Executor): _result_queue: SimpleQueue[Any] _work_ids: Queue[Any] if sys.version_info >= (3, 11): + @overload def __init__( self, max_workers: int | None = None, mp_context: BaseContext | None = None, - initializer: Callable[..., object] | None = None, - initargs: tuple[Any, ...] = (), + initializer: Callable[[], object] | None = None, + initargs: tuple[()] = (), + *, + max_tasks_per_child: int | None = None, + ) -> None: ... + @overload + def __init__( + self, + max_workers: int | None = None, + mp_context: BaseContext | None = None, + *, + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], + max_tasks_per_child: int | None = None, + ) -> None: ... + @overload + def __init__( + self, + max_workers: int | None, + mp_context: BaseContext | None, + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], *, max_tasks_per_child: int | None = None, ) -> None: ... else: + @overload def __init__( self, max_workers: int | None = None, mp_context: BaseContext | None = None, - initializer: Callable[..., object] | None = None, - initargs: tuple[Any, ...] = (), + initializer: Callable[[], object] | None = None, + initargs: tuple[()] = (), + ) -> None: ... + @overload + def __init__( + self, + max_workers: int | None = None, + mp_context: BaseContext | None = None, + *, + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], + ) -> None: ... + @overload + def __init__( + self, + max_workers: int | None, + mp_context: BaseContext | None, + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], ) -> None: ... if sys.version_info >= (3, 9): def _start_executor_manager_thread(self) -> None: ... diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 0b00d524a..f38cf2c57 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -2,11 +2,14 @@ import queue import sys from collections.abc import Callable, Iterable, Mapping, Set as AbstractSet from threading import Lock, Semaphore, Thread -from typing import Any, Generic, TypeVar +from typing import Any, Generic, TypeVar, overload +from typing_extensions import TypeVarTuple, Unpack from weakref import ref from ._base import BrokenExecutor, Executor, Future +_Ts = TypeVarTuple("_Ts") + _threads_queues: Mapping[Any, Any] _shutdown: bool _global_shutdown_lock: Lock @@ -31,8 +34,8 @@ class _WorkItem(Generic[_S]): def _worker( executor_reference: ref[Any], work_queue: queue.SimpleQueue[Any], - initializer: Callable[..., object], - initargs: tuple[Any, ...], + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], ) -> None: ... class BrokenThreadPool(BrokenExecutor): ... @@ -48,12 +51,30 @@ class ThreadPoolExecutor(Executor): _initializer: Callable[..., None] | None _initargs: tuple[Any, ...] _work_queue: queue.SimpleQueue[_WorkItem[Any]] + @overload def __init__( self, max_workers: int | None = None, thread_name_prefix: str = "", - initializer: Callable[..., object] | None = None, - initargs: tuple[Any, ...] = (), + initializer: Callable[[], object] | None = None, + initargs: tuple[()] = (), + ) -> None: ... + @overload + def __init__( + self, + max_workers: int | None = None, + thread_name_prefix: str = "", + *, + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], + ) -> None: ... + @overload + def __init__( + self, + max_workers: int | None, + thread_name_prefix: str, + initializer: Callable[[Unpack[_Ts]], object], + initargs: tuple[Unpack[_Ts]], ) -> None: ... def _adjust_thread_count(self) -> None: ... def _initializer_failed(self) -> None: ...