Stub: asyncore.pyi (#498)

* Stub: asyncore.pyi

I can't really test this stub since I don't use this module. I believe it's not far from the real thing.

It's based on the source code and not on the documentation.

https://hg.python.org/cpython/file/default/Lib/asyncore.py

* Option ->Optional

* add hint to count, remove addr

* make read/write explicit functions

* add synchat.pyi, move to 2and3

* change sys.version_info test

* try reversing the syntax
This commit is contained in:
Elazar
2016-08-26 23:34:42 +03:00
committed by Matthias Kramm
parent 68f8a278fe
commit f1047ec005
2 changed files with 168 additions and 0 deletions

41
stdlib/2and3/asynchat.pyi Normal file
View File

@@ -0,0 +1,41 @@
from typing import Union, Tuple, Sequence
from abc import abstractmethod
import asyncore
import socket
class simple_producer:
def __init__(self, data: str, buffer_size: int = ...) -> None: ...
def more(self) -> str: ...
class async_chat (asyncore.dispatcher):
ac_in_buffer_size = ... # type: int
ac_out_buffer_size = ... # type: int
def __init__(self, sock: socket.socket = None, map: asyncore._maptype = None) -> None: ...
@abstractmethod
def collect_incoming_data(self, data: str) -> None: ...
@abstractmethod
def found_terminator(self) -> None: ...
def set_terminator(self, term: Union[str, int, None]) -> None: ...
def get_terminator(self) -> Union[str, int, None]: ...
def handle_read(self) -> None: ...
def handle_write(self) -> None: ...
def handle_close(self) -> None: ...
def push(self, data: str) -> None: ...
def push_with_producer(self, producer: simple_producer) -> None: ...
def readable(self) -> bool: ...
def writable(self) -> bool: ...
def close_when_done(self) -> None: ...
def initiate_send(self) -> None: ...
def discard_buffers(self) -> None: ...
import sys
if sys.version_info < (3, 0, 0):
class fifo:
def __init__(self, list: Sequence[Union[str, simple_producer]] = ...) -> None: ...
def __len__(self) -> int: ...
def is_empty(self) -> bool: ...
def first(self) -> str: ...
def push(self, data: Union[str, simple_producer]) -> None: ...
def pop(self) -> Tuple[int, str]: ...

127
stdlib/2and3/asyncore.pyi Normal file
View File

@@ -0,0 +1,127 @@
from typing import Tuple, Union, Optional, Any, Dict, overload
import select, socket, sys, time, warnings, os
from errno import (EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL,
ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED,
EPIPE, EAGAIN, errorcode)
# cyclic dependence with asynchat
_maptype = Dict[str, Any]
class ExitNow(Exception): pass
def read(obj: Any) -> None: ...
def write(obj: Any) -> None: ...
def readwrite(obj: Any, flags: int) -> None: ...
def poll(timeout: float = ..., map: _maptype = ...) -> None: ...
def poll2(timeout: float = ..., map: _maptype = ...) -> None: ...
poll3 = poll2
def loop(timeout: float = ..., use_poll: bool = ..., map: _maptype = ..., count: int = None) -> None: ...
# Not really subclass of socket.socket; it's only delegation.
# It is not covariant to it.
class dispatcher:
debug = ... # type: bool
connected = ... # type: bool
accepting = ... # type: bool
connecting = ... # type: bool
closing = ... # type: bool
ignore_log_types = ... # type: frozenset[str]
def __init__(self, sock: socket.socket = None, map: _maptype = ...) -> None: ...
def add_channel(self, map: _maptype = ...) -> None: ...
def del_channel(self, map: _maptype = ...) -> None: ...
def create_socket(self, family: int, type: int) -> None: ...
def set_socket(self, sock: socket.socket, map: _maptype = ...) -> None: ...
def set_reuse_addr(self) -> None: ...
def readable(self) -> bool: ...
def writable(self) -> bool: ...
def accept(self) -> Optional[Tuple[socket.socket, Any]]: ...
def recv(self, buffer_size: int) -> str: ...
def log(self, message: Any) -> None: ...
def log_info(self, message: Any, type: str = ...) -> None: ...
def handle_read_event(self) -> None: ...
def handle_connect_event(self) -> None: ...
def handle_write_event(self) -> None: ...
def handle_expt_event(self) -> None: ...
def handle_error(self) -> None: ...
def handle_expt(self) -> None: ...
def handle_read(self) -> None: ...
def handle_write(self) -> None: ...
def handle_connect(self) -> None: ...
def handle_accept(self) -> None: ...
def handle_close(self) -> None: ...
def detach(self) -> int: ...
def fileno(self) -> int: ...
# return value is an address
def getpeername(self) -> Any: ...
def getsockname(self) -> Any: ...
@overload
def getsockopt(self, level: int, optname: int) -> int: ...
@overload
def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...
def gettimeout(self) -> float: ...
def ioctl(self, control: object,
option: Tuple[int, int, int]) -> None: ...
def listen(self, backlog: int) -> None: ...
# TODO the return value may be BinaryIO or TextIO, depending on mode
def makefile(self, mode: str = ..., buffering: int = ...,
encoding: str = ..., errors: str = ...,
newline: str = ...) -> Any:
...
# return type is an address
def recvfrom(self, bufsize: int, flags: int = ...) -> Any: ...
def recvfrom_into(self, buffer: str, nbytes: int, flags: int = ...) -> Any: ...
def recv_into(self, buffer: str, nbytes: int, flags: int = ...) -> Any: ...
def send(self, data: str, flags: int = ...) -> Optional[int]: ...
def sendall(self, data: str, flags: int = ...) -> None: ...
def sendto(self, data: str, address: Union[tuple, str], flags: int = ...) -> int: ...
def setblocking(self, flag: bool) -> None: ...
def settimeout(self, value: Union[float, None]) -> None: ...
def setsockopt(self, level: int, optname: int, value: Union[int, str]) -> None: ...
def shutdown(self, how: int) -> None: ...
class dispatcher_with_send(dispatcher):
def __init__(self, sock: socket.socket = ..., map: _maptype = ...) -> None: ...
def initiate_send(self) -> None: ...
def handle_write(self) -> None: ...
# incompatible signature:
# def send(self, data: str) -> Optional[int]: ...
def compact_traceback() -> Tuple[Tuple[str, str, str], type, type, str]: ...
def close_all(map: _maptype = ..., ignore_all: bool = ...) -> None: ...
# if os.name == 'posix':
# import fcntl
class file_wrapper:
fd = ... # type: int
def __init__(self, fd: int) -> None: ...
def recv(self, bufsize: int, flags: int = ...) -> str: ...
def send(self, data: str, flags: int = ...) -> int: ...
@overload
def getsockopt(self, level: int, optname: int) -> int: ...
@overload
def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...
def read(self, bufsize: int, flags: int = ...) -> str: ...
def write(self, data: str, flags: int = ...) -> int: ...
def close(self) -> None: ...
def fileno(self) -> int: ...
class file_dispatcher(dispatcher):
def __init__(self, fd: int, map: _maptype = ...) -> None: ...
def set_file(self, fd: int) -> None: ...