mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-24 12:51:27 +08:00
Clean up several version-dependent modules (#6885)
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
import sys
|
||||
PY34: bool
|
||||
PY35: bool
|
||||
PY352: bool
|
||||
|
||||
if sys.version_info < (3, 7):
|
||||
PY34: bool
|
||||
PY35: bool
|
||||
PY352: bool
|
||||
def flatten_list_bytes(list_of_data: list[bytes]) -> bytes: ...
|
||||
def flatten_list_bytes(list_of_data: list[bytes]) -> bytes: ...
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
import sys
|
||||
class CancelledError(BaseException): ...
|
||||
class TimeoutError(Exception): ...
|
||||
class InvalidStateError(Exception): ...
|
||||
class SendfileNotAvailableError(RuntimeError): ...
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
class CancelledError(BaseException): ...
|
||||
class TimeoutError(Exception): ...
|
||||
class InvalidStateError(Exception): ...
|
||||
class SendfileNotAvailableError(RuntimeError): ...
|
||||
class IncompleteReadError(EOFError):
|
||||
expected: int | None
|
||||
partial: bytes
|
||||
def __init__(self, partial: bytes, expected: int | None) -> None: ...
|
||||
class LimitOverrunError(Exception):
|
||||
consumed: int
|
||||
def __init__(self, message: str, consumed: int) -> None: ...
|
||||
class IncompleteReadError(EOFError):
|
||||
expected: int | None
|
||||
partial: bytes
|
||||
def __init__(self, partial: bytes, expected: int | None) -> None: ...
|
||||
|
||||
class LimitOverrunError(Exception):
|
||||
consumed: int
|
||||
def __init__(self, message: str, consumed: int) -> None: ...
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import functools
|
||||
import sys
|
||||
import traceback
|
||||
from types import FrameType, FunctionType
|
||||
from typing import Any, Iterable, Union, overload
|
||||
@@ -9,12 +8,11 @@ class _HasWrapper:
|
||||
|
||||
_FuncType = Union[FunctionType, _HasWrapper, functools.partial[Any], functools.partialmethod[Any]]
|
||||
|
||||
if sys.version_info >= (3, 7):
|
||||
@overload
|
||||
def _get_function_source(func: _FuncType) -> tuple[str, int]: ...
|
||||
@overload
|
||||
def _get_function_source(func: object) -> tuple[str, int] | None: ...
|
||||
def _format_callback_source(func: object, args: Iterable[Any]) -> str: ...
|
||||
def _format_args_and_kwargs(args: Iterable[Any], kwargs: dict[str, Any]) -> str: ...
|
||||
def _format_callback(func: object, args: Iterable[Any], kwargs: dict[str, Any], suffix: str = ...) -> str: ...
|
||||
def extract_stack(f: FrameType | None = ..., limit: int | None = ...) -> traceback.StackSummary: ...
|
||||
@overload
|
||||
def _get_function_source(func: _FuncType) -> tuple[str, int]: ...
|
||||
@overload
|
||||
def _get_function_source(func: object) -> tuple[str, int] | None: ...
|
||||
def _format_callback_source(func: object, args: Iterable[Any]) -> str: ...
|
||||
def _format_args_and_kwargs(args: Iterable[Any], kwargs: dict[str, Any]) -> str: ...
|
||||
def _format_callback(func: object, args: Iterable[Any], kwargs: dict[str, Any], suffix: str = ...) -> str: ...
|
||||
def extract_stack(f: FrameType | None = ..., limit: int | None = ...) -> traceback.StackSummary: ...
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import sys
|
||||
from typing import Awaitable, TypeVar
|
||||
|
||||
if sys.version_info >= (3, 7):
|
||||
from typing import Awaitable, TypeVar
|
||||
_T = TypeVar("_T")
|
||||
if sys.version_info >= (3, 8):
|
||||
def run(main: Awaitable[_T], *, debug: bool | None = ...) -> _T: ...
|
||||
|
||||
_T = TypeVar("_T")
|
||||
if sys.version_info >= (3, 8):
|
||||
def run(main: Awaitable[_T], *, debug: bool | None = ...) -> _T: ...
|
||||
else:
|
||||
def run(main: Awaitable[_T], *, debug: bool = ...) -> _T: ...
|
||||
else:
|
||||
def run(main: Awaitable[_T], *, debug: bool = ...) -> _T: ...
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
import sys
|
||||
from typing import Any, Awaitable, Callable, Iterable
|
||||
|
||||
from . import events
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
async def staggered_race(
|
||||
coro_fns: Iterable[Callable[[], Awaitable[Any]]], delay: float | None, *, loop: events.AbstractEventLoop | None = ...
|
||||
) -> tuple[Any, int | None, list[Exception | None]]: ...
|
||||
async def staggered_race(
|
||||
coro_fns: Iterable[Callable[[], Awaitable[Any]]], delay: float | None, *, loop: events.AbstractEventLoop | None = ...
|
||||
) -> tuple[Any, int | None, list[Exception | None]]: ...
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
import sys
|
||||
from typing import Callable, TypeVar
|
||||
from typing_extensions import ParamSpec
|
||||
|
||||
_P = ParamSpec("_P")
|
||||
_R = TypeVar("_R")
|
||||
|
||||
if sys.version_info >= (3, 9):
|
||||
async def to_thread(__func: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> _R: ...
|
||||
async def to_thread(__func: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> _R: ...
|
||||
|
||||
@@ -4,84 +4,84 @@ from builtins import type as Type # alias to avoid name clashes with property n
|
||||
from types import TracebackType
|
||||
from typing import Any, BinaryIO, Iterable, NoReturn, Union, overload
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
# These are based in socket, maybe move them out into _typeshed.pyi or such
|
||||
_Address = Union[tuple[Any, ...], str]
|
||||
_RetAddress = Any
|
||||
_WriteBuffer = Union[bytearray, memoryview]
|
||||
_CMSG = tuple[int, int, bytes]
|
||||
class TransportSocket:
|
||||
def __init__(self, sock: socket.socket) -> None: ...
|
||||
def _na(self, what: str) -> None: ...
|
||||
@property
|
||||
def family(self) -> int: ...
|
||||
@property
|
||||
def type(self) -> int: ...
|
||||
@property
|
||||
def proto(self) -> int: ...
|
||||
def __getstate__(self) -> NoReturn: ...
|
||||
def fileno(self) -> int: ...
|
||||
def dup(self) -> socket.socket: ...
|
||||
def get_inheritable(self) -> bool: ...
|
||||
def shutdown(self, how: int) -> None: ...
|
||||
@overload
|
||||
def getsockopt(self, level: int, optname: int) -> int: ...
|
||||
@overload
|
||||
def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...
|
||||
@overload
|
||||
def setsockopt(self, level: int, optname: int, value: int | bytes) -> None: ...
|
||||
@overload
|
||||
def setsockopt(self, level: int, optname: int, value: None, optlen: int) -> None: ...
|
||||
def getpeername(self) -> _RetAddress: ...
|
||||
def getsockname(self) -> _RetAddress: ...
|
||||
def getsockbyname(self) -> NoReturn: ... # This method doesn't exist on socket, yet is passed through?
|
||||
def accept(self) -> tuple[socket.socket, _RetAddress]: ...
|
||||
def connect(self, address: _Address | bytes) -> None: ...
|
||||
def connect_ex(self, address: _Address | bytes) -> int: ...
|
||||
def bind(self, address: _Address | bytes) -> None: ...
|
||||
if sys.platform == "win32":
|
||||
def ioctl(self, control: int, option: int | tuple[int, int, int] | bool) -> None: ...
|
||||
else:
|
||||
def ioctl(self, control: int, option: int | tuple[int, int, int] | bool) -> NoReturn: ...
|
||||
def listen(self, __backlog: int = ...) -> None: ...
|
||||
def makefile(self) -> BinaryIO: ...
|
||||
def sendfile(self, file: BinaryIO, offset: int = ..., count: int | None = ...) -> int: ...
|
||||
def close(self) -> None: ...
|
||||
def detach(self) -> int: ...
|
||||
if sys.platform == "linux":
|
||||
def sendmsg_afalg(
|
||||
self, msg: Iterable[bytes] = ..., *, op: int, iv: Any = ..., assoclen: int = ..., flags: int = ...
|
||||
) -> int: ...
|
||||
else:
|
||||
def sendmsg_afalg(
|
||||
self, msg: Iterable[bytes] = ..., *, op: int, iv: Any = ..., assoclen: int = ..., flags: int = ...
|
||||
) -> NoReturn: ...
|
||||
def sendmsg(
|
||||
self, __buffers: Iterable[bytes], __ancdata: Iterable[_CMSG] = ..., __flags: int = ..., __address: _Address = ...
|
||||
# These are based in socket, maybe move them out into _typeshed.pyi or such
|
||||
_Address = Union[tuple[Any, ...], str]
|
||||
_RetAddress = Any
|
||||
_WriteBuffer = Union[bytearray, memoryview]
|
||||
_CMSG = tuple[int, int, bytes]
|
||||
|
||||
class TransportSocket:
|
||||
def __init__(self, sock: socket.socket) -> None: ...
|
||||
def _na(self, what: str) -> None: ...
|
||||
@property
|
||||
def family(self) -> int: ...
|
||||
@property
|
||||
def type(self) -> int: ...
|
||||
@property
|
||||
def proto(self) -> int: ...
|
||||
def __getstate__(self) -> NoReturn: ...
|
||||
def fileno(self) -> int: ...
|
||||
def dup(self) -> socket.socket: ...
|
||||
def get_inheritable(self) -> bool: ...
|
||||
def shutdown(self, how: int) -> None: ...
|
||||
@overload
|
||||
def getsockopt(self, level: int, optname: int) -> int: ...
|
||||
@overload
|
||||
def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...
|
||||
@overload
|
||||
def setsockopt(self, level: int, optname: int, value: int | bytes) -> None: ...
|
||||
@overload
|
||||
def setsockopt(self, level: int, optname: int, value: None, optlen: int) -> None: ...
|
||||
def getpeername(self) -> _RetAddress: ...
|
||||
def getsockname(self) -> _RetAddress: ...
|
||||
def getsockbyname(self) -> NoReturn: ... # This method doesn't exist on socket, yet is passed through?
|
||||
def accept(self) -> tuple[socket.socket, _RetAddress]: ...
|
||||
def connect(self, address: _Address | bytes) -> None: ...
|
||||
def connect_ex(self, address: _Address | bytes) -> int: ...
|
||||
def bind(self, address: _Address | bytes) -> None: ...
|
||||
if sys.platform == "win32":
|
||||
def ioctl(self, control: int, option: int | tuple[int, int, int] | bool) -> None: ...
|
||||
else:
|
||||
def ioctl(self, control: int, option: int | tuple[int, int, int] | bool) -> NoReturn: ...
|
||||
def listen(self, __backlog: int = ...) -> None: ...
|
||||
def makefile(self) -> BinaryIO: ...
|
||||
def sendfile(self, file: BinaryIO, offset: int = ..., count: int | None = ...) -> int: ...
|
||||
def close(self) -> None: ...
|
||||
def detach(self) -> int: ...
|
||||
if sys.platform == "linux":
|
||||
def sendmsg_afalg(
|
||||
self, msg: Iterable[bytes] = ..., *, op: int, iv: Any = ..., assoclen: int = ..., flags: int = ...
|
||||
) -> int: ...
|
||||
@overload
|
||||
def sendto(self, data: bytes, address: _Address) -> int: ...
|
||||
@overload
|
||||
def sendto(self, data: bytes, flags: int, address: _Address) -> int: ...
|
||||
def send(self, data: bytes, flags: int = ...) -> int: ...
|
||||
def sendall(self, data: bytes, flags: int = ...) -> None: ...
|
||||
def set_inheritable(self, inheritable: bool) -> None: ...
|
||||
if sys.platform == "win32":
|
||||
def share(self, process_id: int) -> bytes: ...
|
||||
else:
|
||||
def share(self, process_id: int) -> NoReturn: ...
|
||||
def recv_into(self, buffer: _WriteBuffer, nbytes: int = ..., flags: int = ...) -> int: ...
|
||||
def recvfrom_into(self, buffer: _WriteBuffer, nbytes: int = ..., flags: int = ...) -> tuple[int, _RetAddress]: ...
|
||||
def recvmsg_into(
|
||||
self, __buffers: Iterable[_WriteBuffer], __ancbufsize: int = ..., __flags: int = ...
|
||||
) -> tuple[int, list[_CMSG], int, Any]: ...
|
||||
def recvmsg(self, __bufsize: int, __ancbufsize: int = ..., __flags: int = ...) -> tuple[bytes, list[_CMSG], int, Any]: ...
|
||||
def recvfrom(self, bufsize: int, flags: int = ...) -> tuple[bytes, _RetAddress]: ...
|
||||
def recv(self, bufsize: int, flags: int = ...) -> bytes: ...
|
||||
def settimeout(self, value: float | None) -> None: ...
|
||||
def gettimeout(self) -> float | None: ...
|
||||
def setblocking(self, flag: bool) -> None: ...
|
||||
def __enter__(self) -> socket.socket: ...
|
||||
def __exit__(
|
||||
self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
|
||||
) -> None: ...
|
||||
else:
|
||||
def sendmsg_afalg(
|
||||
self, msg: Iterable[bytes] = ..., *, op: int, iv: Any = ..., assoclen: int = ..., flags: int = ...
|
||||
) -> NoReturn: ...
|
||||
def sendmsg(
|
||||
self, __buffers: Iterable[bytes], __ancdata: Iterable[_CMSG] = ..., __flags: int = ..., __address: _Address = ...
|
||||
) -> int: ...
|
||||
@overload
|
||||
def sendto(self, data: bytes, address: _Address) -> int: ...
|
||||
@overload
|
||||
def sendto(self, data: bytes, flags: int, address: _Address) -> int: ...
|
||||
def send(self, data: bytes, flags: int = ...) -> int: ...
|
||||
def sendall(self, data: bytes, flags: int = ...) -> None: ...
|
||||
def set_inheritable(self, inheritable: bool) -> None: ...
|
||||
if sys.platform == "win32":
|
||||
def share(self, process_id: int) -> bytes: ...
|
||||
else:
|
||||
def share(self, process_id: int) -> NoReturn: ...
|
||||
def recv_into(self, buffer: _WriteBuffer, nbytes: int = ..., flags: int = ...) -> int: ...
|
||||
def recvfrom_into(self, buffer: _WriteBuffer, nbytes: int = ..., flags: int = ...) -> tuple[int, _RetAddress]: ...
|
||||
def recvmsg_into(
|
||||
self, __buffers: Iterable[_WriteBuffer], __ancbufsize: int = ..., __flags: int = ...
|
||||
) -> tuple[int, list[_CMSG], int, Any]: ...
|
||||
def recvmsg(self, __bufsize: int, __ancbufsize: int = ..., __flags: int = ...) -> tuple[bytes, list[_CMSG], int, Any]: ...
|
||||
def recvfrom(self, bufsize: int, flags: int = ...) -> tuple[bytes, _RetAddress]: ...
|
||||
def recv(self, bufsize: int, flags: int = ...) -> bytes: ...
|
||||
def settimeout(self, value: float | None) -> None: ...
|
||||
def gettimeout(self) -> float | None: ...
|
||||
def setblocking(self, flag: bool) -> None: ...
|
||||
def __enter__(self) -> socket.socket: ...
|
||||
def __exit__(
|
||||
self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
|
||||
) -> None: ...
|
||||
|
||||
Reference in New Issue
Block a user