concurrent.futures: allow as_completed to act covariantly (#11291)

This commit is contained in:
Shantanu
2024-01-30 20:14:29 -08:00
committed by GitHub
parent 98ba275d5e
commit 21f84d09c3
2 changed files with 50 additions and 2 deletions

View File

@@ -4,7 +4,7 @@ from _typeshed import Unused
from collections.abc import Callable, Iterable, Iterator
from logging import Logger
from types import TracebackType
from typing import Any, Generic, Literal, NamedTuple, TypeVar
from typing import Any, Generic, Literal, NamedTuple, Protocol, TypeVar
from typing_extensions import ParamSpec, Self
if sys.version_info >= (3, 9):
@@ -34,9 +34,15 @@ class InvalidStateError(Error): ...
class BrokenExecutor(RuntimeError): ...
_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)
_P = ParamSpec("_P")
class Future(Generic[_T]):
_condition: threading.Condition
_state: str
_result: _T | None
_exception: BaseException | None
_waiters: list[_Waiter]
def cancel(self) -> bool: ...
def cancelled(self) -> bool: ...
def running(self) -> bool: ...
@@ -69,7 +75,17 @@ class Executor:
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> bool | None: ...
def as_completed(fs: Iterable[Future[_T]], timeout: float | None = None) -> Iterator[Future[_T]]: ...
class _AsCompletedFuture(Protocol[_T_co]):
# as_completed only mutates non-generic aspects of passed Futures and does not do any nominal
# checks. Therefore, we can use a Protocol here to allow as_completed to act covariantly.
# See the tests for concurrent.futures
_condition: threading.Condition
_state: str
_waiters: list[_Waiter]
# Not used by as_completed, but needed to propagate the generic type
def result(self, timeout: float | None = None) -> _T_co: ...
def as_completed(fs: Iterable[_AsCompletedFuture[_T]], timeout: float | None = None) -> Iterator[Future[_T]]: ...
class DoneAndNotDoneFutures(NamedTuple, Generic[_T]):
done: set[Future[_T]]

View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from collections.abc import Callable, Iterator
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing_extensions import assert_type
class Parent:
...
class Child(Parent):
...
def check_as_completed_covariance() -> None:
with ThreadPoolExecutor() as executor:
f1 = executor.submit(lambda: Parent())
f2 = executor.submit(lambda: Child())
fs: list[Future[Parent] | Future[Child]] = [f1, f2]
assert_type(as_completed(fs), Iterator[Future[Parent]])
for future in as_completed(fs):
assert_type(future.result(), Parent)
def check_future_invariance() -> None:
def execute_callback(callback: Callable[[], Parent], future: Future[Parent]) -> None:
future.set_result(callback())
fut: Future[Child] = Future()
execute_callback(lambda: Parent(), fut) # type: ignore
assert isinstance(fut.result(), Child)