[simple-websocket] Add stubs for simple-websocket (#14868)

This commit is contained in:
lev-blit
2025-11-01 17:27:32 +02:00
committed by GitHub
parent be34e9201d
commit 62dc403abc
7 changed files with 329 additions and 0 deletions
+1
View File
@@ -88,6 +88,7 @@
"stubs/seaborn",
"stubs/setuptools/setuptools",
"stubs/shapely",
"stubs/simple-websocket",
"stubs/tensorflow",
"stubs/tqdm",
"stubs/vobject",
+3
View File
@@ -0,0 +1,3 @@
version = "1.1.*"
upstream_repository = "https://github.com/miguelgrinberg/simple-websocket"
requires = ["wsproto"]
@@ -0,0 +1,3 @@
from .aiows import AioClient as AioClient, AioServer as AioServer
from .errors import ConnectionClosed as ConnectionClosed, ConnectionError as ConnectionError
from .ws import Client as Client, Server as Server
@@ -0,0 +1,130 @@
import asyncio
import socket
from _typeshed import Incomplete, Unused
from _typeshed.wsgi import WSGIEnvironment
from collections.abc import Awaitable, Callable
from ssl import SSLContext
from typing import Any, Literal, TypedDict, type_check_only
from wsproto import ConnectionType, WSConnection
from wsproto.events import Request
from wsproto.frame_protocol import CloseReason
from .asgi import WebSocketASGI, _SocketDataBase, _SocketDataBytes, _SocketDataProtocol, _SocketDataStr
class AioBase:
subprotocol: str | None
connection_type: ConnectionType
receive_bytes: int
ping_interval: float | None
max_message_size: int | None
pong_received: bool
input_buffer: list[bytes | str]
incoming_message: bytes | str | None
incoming_message_len: int
connected: bool
is_server: bool
close_reason: CloseReason
close_message: str
rsock: asyncio.StreamReader
wsock: asyncio.StreamWriter
event: asyncio.Event
ws: WSConnection | None
task: asyncio.Task[None]
def __init__(
self,
connection_type: ConnectionType | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
) -> None: ...
async def connect(self) -> None: ...
async def handshake(self) -> None: ...
# data can be antyhing. a special case is made for `bytes`, anything else is converted to `str`.
async def send(self, data: bytes | Any) -> None: ...
async def receive(self, timeout: float | None = None) -> bytes | str | None: ...
async def close(self, reason: CloseReason | None = None, message: str | None = None) -> None: ...
def choose_subprotocol(self, request: Request) -> str | None: ...
@type_check_only
class _AioServerRequest(TypedDict):
# this is `aiohttp.web.Request`
aiohttp: Incomplete
sock: None
headers: None
class AioServer(AioBase):
request: _AioServerRequest
headers: dict[str, Any]
subprotocols: list[str]
is_server: Literal[True]
mode: str
connected: bool
def __init__(
self,
request: _AioServerRequest,
subprotocols: list[str] | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
) -> None: ...
@classmethod
async def accept(
cls,
# this is `aiohttp.web.Request`
aiohttp=None,
asgi: (
tuple[
WSGIEnvironment,
Callable[[], Awaitable[_SocketDataBytes | _SocketDataStr]],
Callable[[_SocketDataBase | _SocketDataProtocol | _SocketDataBytes | _SocketDataStr], Awaitable[None]],
]
| None
) = None,
sock: socket.socket | None = None,
headers: dict[str, Any] | None = None,
subprotocols: list[str] | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
) -> WebSocketASGI | AioServer: ...
async def handshake(self) -> None: ...
def choose_subprotocol(self, request: Request) -> str | None: ...
class AioClient(AioBase):
url: str
ssl_context: SSLContext | None
is_secure: bool
host: str
port: int
path: str
subprotocols: list[str]
extra_headeers: list[tuple[bytes, bytes]]
subprotocol: str | None
connected: bool
def __init__(
self,
url: str,
subprotocols: list[str] | None = None,
headers: dict[str, Any] | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
ssl_context: SSLContext | None = None,
) -> None: ...
# the source code itself has this override
@classmethod
async def connect( # type: ignore[override]
cls,
url: str,
subprotocols: list[str] | None = None,
headers: dict[str, Any] | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
ssl_context: SSLContext | None = None,
thread_class: Unused = None,
event_class: Unused = None,
) -> AioClient: ...
async def handshake(self) -> None: ...
async def close(self, reason: CloseReason | None = None, message: str | None = None) -> None: ...
@@ -0,0 +1,44 @@
from _typeshed.wsgi import WSGIEnvironment
from collections.abc import Awaitable, Callable
from typing import TypedDict, type_check_only
@type_check_only
class _SocketDataBase(TypedDict):
type: str
@type_check_only
class _SocketDataProtocol(_SocketDataBase):
subprotocol: str | None
@type_check_only
class _SocketDataStr(_SocketDataBase):
text: str
@type_check_only
class _SocketDataBytes(_SocketDataBase):
bytes: bytes
class WebSocketASGI:
subprotocols: list[str]
subprotocol: str
connected: bool
# this is set in `close` to `False`
conncted: bool
def __init__(
self,
scope: WSGIEnvironment,
receive: Callable[[], Awaitable[_SocketDataBytes | _SocketDataStr]],
send: Callable[[_SocketDataBase | _SocketDataProtocol | _SocketDataBytes | _SocketDataStr], Awaitable[None]],
subprotocols: list[str] | None = None,
) -> None: ...
@classmethod
async def accept(
cls,
scope: WSGIEnvironment,
receive: Callable[[], Awaitable[_SocketDataBytes | _SocketDataStr]],
send: Callable[[_SocketDataBase | _SocketDataProtocol | _SocketDataBytes | _SocketDataStr], Awaitable[None]],
subprotocols: list[str] | None = None,
) -> WebSocketASGI: ...
async def receive(self) -> bytes | str: ...
async def send(self, data: bytes | str) -> None: ...
async def close(self) -> None: ...
@@ -0,0 +1,12 @@
from wsproto.frame_protocol import CloseReason
class SimpleWebsocketError(RuntimeError): ...
class ConnectionError(SimpleWebsocketError):
status_code: int | None
def __init__(self, status_code: int | None = None) -> None: ...
class ConnectionClosed(SimpleWebsocketError):
reason: CloseReason
message: str | None
def __init__(self, reason: CloseReason = ..., message: str | None = None) -> None: ...
@@ -0,0 +1,136 @@
import socket
import threading
from _typeshed import FileDescriptorLike
from _typeshed.wsgi import WSGIEnvironment
from collections.abc import Callable
from selectors import SelectorKey, _EventMask
from ssl import SSLContext
from typing import Any, Protocol, type_check_only
from wsproto import ConnectionType, WSConnection
from wsproto.events import Request
from wsproto.frame_protocol import CloseReason
@type_check_only
class _ThreadClassProtocol(Protocol):
name: str
# this accepts any callable as the target, like `threading.Thread`
def __init__(self, target: Callable[..., Any]) -> None: ...
def start(self) -> None: ...
@type_check_only
class _EventClassProtocol(Protocol):
def clear(self) -> None: ...
def set(self) -> None: ...
def wait(self, timeout: float | None = None) -> bool: ...
@type_check_only
class _SelectorClassProtocol(Protocol):
# the signature of `register` here is the same as `selectors._BaseSelectorImpl` from the stdlib
def register(self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None) -> SelectorKey: ...
# the signature of `select` here is the same as `selectors.DefaultSelector` from the stdlib
def select(self, timeout: float | None = None) -> list[tuple[SelectorKey, _EventMask]]: ...
def close(self) -> None: ...
class Base:
subprotocol: str | None
sock: socket.socket | None
receive_bytes: int
ping_interval: float | None
max_message_size: int | None
pong_received: bool
input_buffer: list[bytes | str]
incoming_message: bytes | str | None
incoming_message_len: int
connected: bool
is_server: bool
close_reason: CloseReason
close_message: str | None
selector_class: type[_SelectorClassProtocol]
event: _EventClassProtocol | threading.Event
ws: WSConnection
thread: _ThreadClassProtocol | threading.Thread
def __init__(
self,
sock: socket.socket | None = None,
connection_type: ConnectionType | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
thread_class: type[_ThreadClassProtocol] | None = None,
event_class: type[_EventClassProtocol] | None = None,
selector_class: type[_SelectorClassProtocol] | None = None,
) -> None: ...
def handshake(self) -> None: ...
# data can be antyhing. a special case is made for `bytes`, anything else is converted to `str`.
def send(self, data: bytes | Any) -> None: ...
def receive(self, timeout: float | None = None) -> bytes | str | None: ...
def close(self, reason: CloseReason | None = None, message: str | None = None) -> None: ...
def choose_subprotocol(self, request: Request) -> str | None: ...
class Server(Base):
environ: WSGIEnvironment
subprotocols: list[str]
mode: str
connected: bool
def __init__(
self,
environ: WSGIEnvironment,
subprotocols: list[str] | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
thread_class: type[_ThreadClassProtocol] | None = None,
event_class: type[_EventClassProtocol] | None = None,
selector_class: type[_SelectorClassProtocol] | None = None,
) -> None: ...
@classmethod
def accept(
cls,
environ: WSGIEnvironment,
subprotocols: list[str] | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
thread_class: type[_ThreadClassProtocol] | None = None,
event_class: type[_EventClassProtocol] | None = None,
selector_class: type[_SelectorClassProtocol] | None = None,
) -> Server: ...
def handshake(self) -> None: ...
def choose_subprotocol(self, request: Request) -> str | None: ...
class Client(Base):
host: str
port: int
path: str
subprotocols: list[str]
extra_headeers: list[tuple[bytes, bytes]]
subprotocol: str | None
connected: bool
def __init__(
self,
url: str,
subprotocols: list[str] | None = None,
headers: dict[bytes, bytes] | list[tuple[bytes, bytes]] | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
ssl_context: SSLContext | None = None,
thread_class: type[_ThreadClassProtocol] | None = None,
event_class: type[_EventClassProtocol] | None = None,
) -> None: ...
@classmethod
def connect(
cls,
url: str,
subprotocols: list[str] | None = None,
headers: dict[bytes, bytes] | list[tuple[bytes, bytes]] | None = None,
receive_bytes: int = 4096,
ping_interval: float | None = None,
max_message_size: int | None = None,
ssl_context: SSLContext | None = None,
thread_class: type[_ThreadClassProtocol] | None = None,
event_class: type[_EventClassProtocol] | None = None,
) -> Client: ...
def handshake(self) -> None: ...
def close(self, reason: CloseReason | None = None, message: str | None = None) -> None: ...