Fix stubtest failures for asyncio on windows (#4092)

This commit is contained in:
Rune Tynan
2020-05-27 13:08:19 -04:00
committed by GitHub
parent ed4993bd50
commit 713a2729b4
2 changed files with 25 additions and 7 deletions

View File

@@ -1,9 +1,18 @@
import socket
import sys
from typing import IO, Any, Callable, ClassVar, List, NoReturn, Optional, Tuple
from typing import IO, Any, Callable, ClassVar, List, NoReturn, Optional, Tuple, Type
from . import events, futures, proactor_events, selector_events, streams, windows_utils
__all__ = [
"SelectorEventLoop",
"ProactorEventLoop",
"IocpProactor",
"DefaultEventLoopPolicy",
"WindowsSelectorEventLoopPolicy",
"WindowsProactorEventLoopPolicy",
]
NULL: int
INFINITE: int
ERROR_CONNECTION_REFUSED: int
@@ -42,24 +51,24 @@ class IocpProactor:
def sendfile(self, sock: socket.socket, file: IO[bytes], offset: int, count: int) -> futures.Future[Any]: ...
def accept_pipe(self, pipe: socket.socket) -> futures.Future[Any]: ...
async def connect_pipe(self, address: bytes) -> windows_utils.PipeHandle: ...
def wait_for_handle(self, handle: windows_utils.PipeHandle, timeout: int = ...) -> bool: ...
def wait_for_handle(self, handle: windows_utils.PipeHandle, timeout: Optional[int] = ...) -> bool: ...
def close(self) -> None: ...
SelectorEventLoop = _WindowsSelectorEventLoop
if sys.version_info >= (3, 7):
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[events.AbstractEventLoop]
_loop_factory: ClassVar[Type[SelectorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[events.AbstractEventLoop]
_loop_factory: ClassVar[Type[ProactorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
else:
class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[events.AbstractEventLoop]
_loop_factory: ClassVar[Type[SelectorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy

View File

@@ -1,7 +1,13 @@
from typing import Tuple, Callable, Optional
import sys
from typing import Tuple, Callable, Optional, Type, Protocol
from types import TracebackType
class _WarnFunction(Protocol):
def __call__(self, message: str, category: Type[Warning], source: PipeHandle): ...
BUFSIZE: int
PIPE: int
STDOUT: int
@@ -12,7 +18,10 @@ class PipeHandle:
def __init__(self, handle: int) -> None: ...
def __repr__(self) -> str: ...
def __del__(self) -> None: ...
if sys.version_info >= (3, 8):
def __del__(self, _warn: _WarnFunction = ...) -> None: ...
else:
def __del__(self) -> None: ...
def __enter__(self) -> PipeHandle: ...
def __exit__(self, t: Optional[type], v: Optional[BaseException], tb: Optional[TracebackType]) -> None: ...
@property