on windows, resolve proactor and selector event loop policies only for python 3.7 and newer (#3866)

Signed-off-by: Oleg Höfling <oleg.hoefling@gmail.com>
This commit is contained in:
Oleg Höfling
2020-03-19 01:42:01 +01:00
committed by GitHub
parent 56e2e475e9
commit dc060fac2a

View File

@@ -1,7 +1,8 @@
from typing import Callable, Tuple, List, IO, Any, Optional
import socket
from . import proactor_events, events, futures, windows_utils, selector_events, streams
import sys
from typing import IO, Any, Callable, ClassVar, List, NoReturn, Optional, Tuple
from . import events, futures, proactor_events, selector_events, streams, windows_utils
NULL: int
INFINITE: int
@@ -11,7 +12,6 @@ CONNECT_PIPE_INIT_DELAY: float
CONNECT_PIPE_MAX_DELAY: float
class PipeServer:
def __init__(self, address: str) -> None: ...
def __del__(self) -> None: ...
def closed(self) -> bool: ...
@@ -20,13 +20,15 @@ class PipeServer:
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): ...
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def __init__(self, proactor: Optional[IocpProactor] = ...) -> None: ...
async def create_pipe_connection(self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str) -> Tuple[proactor_events._ProactorDuplexPipeTransport, streams.StreamReaderProtocol]: ...
async def start_serving_pipe(self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str) -> List[PipeServer]: ...
async def create_pipe_connection(
self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
) -> Tuple[proactor_events._ProactorDuplexPipeTransport, streams.StreamReaderProtocol]: ...
async def start_serving_pipe(
self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
) -> List[PipeServer]: ...
class IocpProactor:
def __init__(self, concurrency: int = ...) -> None: ...
def __repr__(self) -> str: ...
def __del__(self) -> None: ...
@@ -45,14 +47,19 @@ class IocpProactor:
SelectorEventLoop = _WindowsSelectorEventLoop
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: events.AbstractEventLoop = ...
def get_child_watcher(self) -> Any: ...
def set_child_watcher(self, watcher: Any) -> None: ...
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: events.AbstractEventLoop = ...
def get_child_watcher(self) -> Any: ...
def set_child_watcher(self, watcher: Any) -> None: ...
DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
if sys.version_info >= (3, 7):
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[events.AbstractEventLoop]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[events.AbstractEventLoop]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
else:
class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[events.AbstractEventLoop]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy