Mark several asyncio modules as Windows-only (#6796)

This commit is contained in:
Nikita Sobolev
2022-01-03 16:52:39 +03:00
committed by GitHub
parent 96c836fc41
commit 6e45c3196d
2 changed files with 91 additions and 93 deletions

View File

@@ -6,76 +6,72 @@ from typing_extensions import Literal
from . import events, futures, proactor_events, selector_events, streams, windows_utils
if sys.version_info >= (3, 7):
__all__ = (
"SelectorEventLoop",
"ProactorEventLoop",
"IocpProactor",
"DefaultEventLoopPolicy",
"WindowsSelectorEventLoopPolicy",
"WindowsProactorEventLoopPolicy",
)
else:
__all__ = ["SelectorEventLoop", "ProactorEventLoop", "IocpProactor", "DefaultEventLoopPolicy"]
NULL: Literal[0]
INFINITE: Literal[0xFFFFFFFF]
ERROR_CONNECTION_REFUSED: Literal[1225]
ERROR_CONNECTION_ABORTED: Literal[1236]
CONNECT_PIPE_INIT_DELAY: float
CONNECT_PIPE_MAX_DELAY: float
class PipeServer:
def __init__(self, address: str) -> None: ...
def __del__(self) -> None: ...
def closed(self) -> bool: ...
def close(self) -> None: ...
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): ...
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def __init__(self, proactor: IocpProactor | None = ...) -> None: ...
async def create_pipe_connection(
self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
) -> tuple[proactor_events._ProactorDuplexPipeTransport, streams.StreamReaderProtocol]: ...
async def start_serving_pipe(
self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
) -> list[PipeServer]: ...
class IocpProactor:
def __init__(self, concurrency: int = ...) -> None: ...
def __repr__(self) -> str: ...
def __del__(self) -> None: ...
def set_loop(self, loop: events.AbstractEventLoop) -> None: ...
def select(self, timeout: int | None = ...) -> list[futures.Future[Any]]: ...
def recv(self, conn: socket.socket, nbytes: int, flags: int = ...) -> futures.Future[bytes]: ...
if sys.platform == "win32":
if sys.version_info >= (3, 7):
def recv_into(self, conn: socket.socket, buf: WriteableBuffer, flags: int = ...) -> futures.Future[Any]: ...
def send(self, conn: socket.socket, buf: WriteableBuffer, flags: int = ...) -> futures.Future[Any]: ...
def accept(self, listener: socket.socket) -> futures.Future[Any]: ...
def connect(self, conn: socket.socket, address: bytes) -> futures.Future[Any]: ...
__all__ = (
"SelectorEventLoop",
"ProactorEventLoop",
"IocpProactor",
"DefaultEventLoopPolicy",
"WindowsSelectorEventLoopPolicy",
"WindowsProactorEventLoopPolicy",
)
else:
__all__ = ["SelectorEventLoop", "ProactorEventLoop", "IocpProactor", "DefaultEventLoopPolicy"]
NULL: Literal[0]
INFINITE: Literal[0xFFFFFFFF]
ERROR_CONNECTION_REFUSED: Literal[1225]
ERROR_CONNECTION_ABORTED: Literal[1236]
CONNECT_PIPE_INIT_DELAY: float
CONNECT_PIPE_MAX_DELAY: float
class PipeServer:
def __init__(self, address: str) -> None: ...
def __del__(self) -> None: ...
def closed(self) -> bool: ...
def close(self) -> None: ...
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): ...
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def __init__(self, proactor: IocpProactor | None = ...) -> None: ...
async def create_pipe_connection(
self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
) -> tuple[proactor_events._ProactorDuplexPipeTransport, streams.StreamReaderProtocol]: ...
async def start_serving_pipe(
self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
) -> list[PipeServer]: ...
class IocpProactor:
def __init__(self, concurrency: int = ...) -> None: ...
def __repr__(self) -> str: ...
def __del__(self) -> None: ...
def set_loop(self, loop: events.AbstractEventLoop) -> None: ...
def select(self, timeout: int | None = ...) -> list[futures.Future[Any]]: ...
def recv(self, conn: socket.socket, nbytes: int, flags: int = ...) -> futures.Future[bytes]: ...
if sys.version_info >= (3, 7):
def recv_into(self, conn: socket.socket, buf: WriteableBuffer, flags: int = ...) -> futures.Future[Any]: ...
def send(self, conn: socket.socket, buf: WriteableBuffer, flags: int = ...) -> futures.Future[Any]: ...
def accept(self, listener: socket.socket) -> futures.Future[Any]: ...
def connect(self, conn: socket.socket, address: bytes) -> futures.Future[Any]: ...
if sys.version_info >= (3, 7):
def sendfile(self, sock: socket.socket, file: IO[bytes], offset: int, count: int) -> futures.Future[Any]: ...
def accept_pipe(self, pipe: socket.socket) -> futures.Future[Any]: ...
async def connect_pipe(self, address: bytes) -> windows_utils.PipeHandle: ...
def wait_for_handle(self, handle: windows_utils.PipeHandle, timeout: int | None = ...) -> bool: ...
def close(self) -> None: ...
SelectorEventLoop = _WindowsSelectorEventLoop
if sys.version_info >= (3, 7):
def sendfile(self, sock: socket.socket, file: IO[bytes], offset: int, count: int) -> futures.Future[Any]: ...
def accept_pipe(self, pipe: socket.socket) -> futures.Future[Any]: ...
async def connect_pipe(self, address: bytes) -> windows_utils.PipeHandle: ...
def wait_for_handle(self, handle: windows_utils.PipeHandle, timeout: int | None = ...) -> bool: ...
def close(self) -> None: ...
SelectorEventLoop = _WindowsSelectorEventLoop
if sys.version_info >= (3, 7):
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[Type[SelectorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[Type[ProactorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
else:
class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[Type[SelectorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[Type[SelectorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[Type[ProactorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
else:
class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[Type[SelectorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy

View File

@@ -1,27 +1,29 @@
import subprocess
import sys
from _typeshed import Self
from types import TracebackType
from typing import Callable, Protocol, Type
from typing_extensions import Literal
class _WarnFunction(Protocol):
def __call__(self, message: str, category: Type[Warning] = ..., stacklevel: int = ..., source: PipeHandle = ...) -> None: ...
BUFSIZE: int
PIPE: int
STDOUT: int
def pipe(*, duplex: bool = ..., overlapped: tuple[bool, bool] = ..., bufsize: int = ...) -> tuple[int, int]: ...
class PipeHandle:
def __init__(self, handle: int) -> None: ...
def __repr__(self) -> str: ...
if sys.version_info >= (3, 8):
def __del__(self, _warn: _WarnFunction = ...) -> None: ...
else:
def __del__(self) -> None: ...
def __enter__(self: Self) -> Self: ...
def __exit__(self, t: type | None, v: BaseException | None, tb: TracebackType | None) -> None: ...
@property
def handle(self) -> int: ...
def fileno(self) -> int: ...
def close(self, *, CloseHandle: Callable[[int], None] = ...) -> None: ...
if sys.platform == "win32":
class _WarnFunction(Protocol):
def __call__(
self, message: str, category: Type[Warning] = ..., stacklevel: int = ..., source: PipeHandle = ...
) -> None: ...
BUFSIZE: Literal[8192]
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
def pipe(*, duplex: bool = ..., overlapped: tuple[bool, bool] = ..., bufsize: int = ...) -> tuple[int, int]: ...
class PipeHandle:
def __init__(self, handle: int) -> None: ...
def __repr__(self) -> str: ...
if sys.version_info >= (3, 8):
def __del__(self, _warn: _WarnFunction = ...) -> None: ...
else:
def __del__(self) -> None: ...
def __enter__(self: Self) -> Self: ...
def __exit__(self, t: type | None, v: BaseException | None, tb: TracebackType | None) -> None: ...
@property
def handle(self) -> int: ...
def fileno(self) -> int: ...
def close(self, *, CloseHandle: Callable[[int], None] = ...) -> None: ...