mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-08 13:04:46 +08:00
Improve asyncio.tasks stubs (#2139)
- Use PEP 526-style variable annotations where possible - Use FrameType instead of Any - Remove workaround for mypy issue that was since fixed
This commit is contained in:
committed by
Guido van Rossum
parent
f5a74fd5da
commit
82c930e66c
@@ -1,6 +1,7 @@
|
||||
from typing import (Any, TypeVar, Set, Dict, List, TextIO, Union, Tuple, Generic, Callable,
|
||||
Coroutine, Generator, Iterable, Awaitable, overload, Sequence, Iterator,
|
||||
Optional)
|
||||
from types import FrameType
|
||||
import concurrent.futures
|
||||
from .events import AbstractEventLoop
|
||||
from .futures import Future
|
||||
@@ -15,9 +16,9 @@ _T4 = TypeVar('_T4')
|
||||
_T5 = TypeVar('_T5')
|
||||
_FutureT = Union[Future[_T], Generator[Any, None, _T], Awaitable[_T]]
|
||||
|
||||
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
|
||||
FIRST_COMPLETED = 'FIRST_COMPLETED'
|
||||
ALL_COMPLETED = 'ALL_COMPLETED'
|
||||
FIRST_EXCEPTION: str
|
||||
FIRST_COMPLETED: str
|
||||
ALL_COMPLETED: str
|
||||
|
||||
def as_completed(fs: Sequence[_FutureT[_T]], *, loop: AbstractEventLoop = ...,
|
||||
timeout: Optional[float] = ...) -> Iterator[Generator[Any, None, _T]]: ...
|
||||
@@ -57,22 +58,13 @@ def wait_for(fut: _FutureT[_T], timeout: Optional[float],
|
||||
*, loop: AbstractEventLoop = ...) -> Future[_T]: ...
|
||||
|
||||
class Task(Future[_T], Generic[_T]):
|
||||
_all_tasks = ... # type: Set[Task]
|
||||
_current_tasks = ... # type: Dict[AbstractEventLoop, Task]
|
||||
@classmethod
|
||||
def current_task(cls, loop: AbstractEventLoop = ...) -> Task: ...
|
||||
@classmethod
|
||||
def all_tasks(cls, loop: AbstractEventLoop = ...) -> Set[Task]: ...
|
||||
|
||||
# Can't use a union, see mypy issue #1873.
|
||||
@overload
|
||||
def __init__(self, coro: Generator[Any, None, _T],
|
||||
*, loop: AbstractEventLoop = ...) -> None: ...
|
||||
@overload
|
||||
def __init__(self, coro: Awaitable[_T], *, loop: AbstractEventLoop = ...) -> None: ...
|
||||
|
||||
def __init__(self, coro: Union[Generator[Any, None, _T], Awaitable[_T]], *, loop: AbstractEventLoop = ...) -> None: ...
|
||||
def __repr__(self) -> str: ...
|
||||
def get_stack(self, *, limit: int = ...) -> List[Any]: ... # return List[stackframe]
|
||||
def get_stack(self, *, limit: int = ...) -> List[FrameType]: ...
|
||||
def print_stack(self, *, limit: int = ..., file: TextIO = ...) -> None: ...
|
||||
def cancel(self) -> bool: ...
|
||||
def _step(self, value: Any = ..., exc: Exception = ...) -> None: ...
|
||||
|
||||
Reference in New Issue
Block a user