stdlib: Use Unpack for concurrent.futures.*Executor constructors (#11171)

This commit is contained in:
Ali Hamdan
2023-12-18 13:46:15 +01:00
committed by GitHub
parent 96526875f7
commit cdad9047d5
2 changed files with 76 additions and 14 deletions

View File

@@ -5,12 +5,14 @@ from multiprocessing.context import BaseContext, Process
from multiprocessing.queues import Queue, SimpleQueue
from threading import Lock, Semaphore, Thread
from types import TracebackType
from typing import Any, Generic, TypeVar
from typing import Any, Generic, TypeVar, overload
from typing_extensions import TypeVarTuple, Unpack
from weakref import ref
from ._base import BrokenExecutor, Executor, Future
_T = TypeVar("_T")
_Ts = TypeVarTuple("_Ts")
_threads_wakeups: MutableMapping[Any, Any]
_global_shutdown: bool
@@ -109,8 +111,8 @@ if sys.version_info >= (3, 11):
def _process_worker(
call_queue: Queue[_CallItem],
result_queue: SimpleQueue[_ResultItem],
initializer: Callable[..., object] | None,
initargs: tuple[Any, ...],
initializer: Callable[[Unpack[_Ts]], object] | None,
initargs: tuple[Unpack[_Ts]],
max_tasks: int | None = None,
) -> None: ...
@@ -118,8 +120,8 @@ else:
def _process_worker(
call_queue: Queue[_CallItem],
result_queue: SimpleQueue[_ResultItem],
initializer: Callable[..., object] | None,
initargs: tuple[Any, ...],
initializer: Callable[[Unpack[_Ts]], object] | None,
initargs: tuple[Unpack[_Ts]],
) -> None: ...
if sys.version_info >= (3, 9):
@@ -169,22 +171,61 @@ class ProcessPoolExecutor(Executor):
_result_queue: SimpleQueue[Any]
_work_ids: Queue[Any]
if sys.version_info >= (3, 11):
@overload
def __init__(
self,
max_workers: int | None = None,
mp_context: BaseContext | None = None,
initializer: Callable[..., object] | None = None,
initargs: tuple[Any, ...] = (),
initializer: Callable[[], object] | None = None,
initargs: tuple[()] = (),
*,
max_tasks_per_child: int | None = None,
) -> None: ...
@overload
def __init__(
self,
max_workers: int | None = None,
mp_context: BaseContext | None = None,
*,
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
max_tasks_per_child: int | None = None,
) -> None: ...
@overload
def __init__(
self,
max_workers: int | None,
mp_context: BaseContext | None,
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
*,
max_tasks_per_child: int | None = None,
) -> None: ...
else:
@overload
def __init__(
self,
max_workers: int | None = None,
mp_context: BaseContext | None = None,
initializer: Callable[..., object] | None = None,
initargs: tuple[Any, ...] = (),
initializer: Callable[[], object] | None = None,
initargs: tuple[()] = (),
) -> None: ...
@overload
def __init__(
self,
max_workers: int | None = None,
mp_context: BaseContext | None = None,
*,
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
) -> None: ...
@overload
def __init__(
self,
max_workers: int | None,
mp_context: BaseContext | None,
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
) -> None: ...
if sys.version_info >= (3, 9):
def _start_executor_manager_thread(self) -> None: ...

View File

@@ -2,11 +2,14 @@ import queue
import sys
from collections.abc import Callable, Iterable, Mapping, Set as AbstractSet
from threading import Lock, Semaphore, Thread
from typing import Any, Generic, TypeVar
from typing import Any, Generic, TypeVar, overload
from typing_extensions import TypeVarTuple, Unpack
from weakref import ref
from ._base import BrokenExecutor, Executor, Future
_Ts = TypeVarTuple("_Ts")
_threads_queues: Mapping[Any, Any]
_shutdown: bool
_global_shutdown_lock: Lock
@@ -31,8 +34,8 @@ class _WorkItem(Generic[_S]):
def _worker(
executor_reference: ref[Any],
work_queue: queue.SimpleQueue[Any],
initializer: Callable[..., object],
initargs: tuple[Any, ...],
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
) -> None: ...
class BrokenThreadPool(BrokenExecutor): ...
@@ -48,12 +51,30 @@ class ThreadPoolExecutor(Executor):
_initializer: Callable[..., None] | None
_initargs: tuple[Any, ...]
_work_queue: queue.SimpleQueue[_WorkItem[Any]]
@overload
def __init__(
self,
max_workers: int | None = None,
thread_name_prefix: str = "",
initializer: Callable[..., object] | None = None,
initargs: tuple[Any, ...] = (),
initializer: Callable[[], object] | None = None,
initargs: tuple[()] = (),
) -> None: ...
@overload
def __init__(
self,
max_workers: int | None = None,
thread_name_prefix: str = "",
*,
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
) -> None: ...
@overload
def __init__(
self,
max_workers: int | None,
thread_name_prefix: str,
initializer: Callable[[Unpack[_Ts]], object],
initargs: tuple[Unpack[_Ts]],
) -> None: ...
def _adjust_thread_count(self) -> None: ...
def _initializer_failed(self) -> None: ...