Improve types of redis.connection module (#8342)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
This commit is contained in:
Nikita Sobolev
2022-07-20 15:48:13 +03:00
committed by GitHub
parent 85077b273d
commit f178ae0d83
2 changed files with 121 additions and 110 deletions

View File

@@ -6,7 +6,7 @@ from typing import Any, ClassVar, Generic
from redis.client import PubSub
from redis.commands import RedisClusterCommands
from redis.commands.core import _StrType
from redis.connection import DefaultParser, Encoder
from redis.connection import BaseParser, Encoder
from redis.exceptions import RedisError
def get_node_name(host, port): ...
@@ -24,8 +24,8 @@ READ_COMMANDS: Any
def cleanup_kwargs(**kwargs): ...
class ClusterParser(DefaultParser):
EXCEPTION_CLASSES: Any
# It uses `DefaultParser` in real life, but it is a dynamic base class.
class ClusterParser(BaseParser): ...
class RedisCluster(RedisClusterCommands[_StrType], Generic[_StrType]):
RedisClusterRequestTTL: ClassVar[int]

View File

@@ -1,83 +1,101 @@
from _typeshed import Self
from collections.abc import Callable, Mapping
from typing import Any
from _typeshed import Incomplete, Self
from collections.abc import Callable, Iterable, Mapping
from queue import Queue
from socket import socket
from typing import Any, ClassVar
from typing_extensions import TypeAlias
from .retry import Retry
ssl_available: Any
SYM_STAR: Any
SYM_DOLLAR: Any
SYM_CRLF: Any
SYM_EMPTY: Any
SERVER_CLOSED_CONNECTION_ERROR: Any
ssl_available: bool
SYM_STAR: bytes
SYM_DOLLAR: bytes
SYM_CRLF: bytes
SYM_EMPTY: bytes
SERVER_CLOSED_CONNECTION_ERROR: str
NONBLOCKING_EXCEPTIONS: tuple[type[Exception], ...]
NONBLOCKING_EXCEPTION_ERROR_NUMBERS: dict[type[Exception], int]
SENTINEL: object
MODULE_LOAD_ERROR: str
NO_SUCH_MODULE_ERROR: str
MODULE_UNLOAD_NOT_POSSIBLE_ERROR: str
MODULE_EXPORTS_DATA_TYPES_ERROR: str
FALSE_STRINGS: tuple[str, ...]
URL_QUERY_ARGUMENT_PARSERS: dict[str, Callable[[Any], Any]]
# Options as passed to Pool.get_connection().
_ConnectionPoolOptions: TypeAlias = Any
_ConnectFunc: TypeAlias = Callable[[Connection], object]
class BaseParser:
EXCEPTION_CLASSES: Any
def parse_error(self, response): ...
EXCEPTION_CLASSES: ClassVar[dict[str, type[Exception] | dict[str, type[Exception]]]]
def parse_error(self, response: str) -> Exception: ...
class SocketBuffer:
socket_read_size: Any
bytes_written: Any
bytes_read: Any
def __init__(self, socket, socket_read_size, socket_timeout) -> None: ...
socket_read_size: int
bytes_written: int
bytes_read: int
socket_timeout: float | None
def __init__(self, socket: socket, socket_read_size: int, socket_timeout: float | None) -> None: ...
@property
def length(self): ...
def read(self, length): ...
def readline(self): ...
def purge(self): ...
def close(self): ...
def can_read(self, timeout): ...
def length(self) -> int: ...
def read(self, length: int) -> bytes: ...
def readline(self) -> bytes: ...
def purge(self) -> None: ...
def close(self) -> None: ...
def can_read(self, timeout: float | None) -> bool: ...
class PythonParser(BaseParser):
encoding: Any
socket_read_size: Any
def __init__(self, socket_read_size) -> None: ...
def __del__(self): ...
def on_connect(self, connection): ...
def on_disconnect(self): ...
def can_read(self, timeout): ...
def read_response(self, disable_decoding: bool = ...): ...
encoding: str
socket_read_size: int
encoder: Encoder | None
def __init__(self, socket_read_size: int) -> None: ...
def __del__(self) -> None: ...
def on_connect(self, connection: Connection) -> None: ...
def on_disconnect(self) -> None: ...
def can_read(self, timeout: float | None) -> bool: ...
def read_response(self, disable_decoding: bool = ...) -> Any: ... # `str | bytes` or `list[str | bytes]`
class HiredisParser(BaseParser):
socket_read_size: Any
def __init__(self, socket_read_size) -> None: ...
def __del__(self): ...
def on_connect(self, connection, **kwargs): ...
def on_disconnect(self): ...
def can_read(self, timeout): ...
def read_from_socket(self, timeout=..., raise_on_timeout: bool = ...) -> bool: ...
def read_response(self, disable_decoding: bool = ...): ...
socket_read_size: int
def __init__(self, socket_read_size: int) -> None: ...
def __del__(self) -> None: ...
def on_connect(self, connection: Connection, **kwargs) -> None: ...
def on_disconnect(self) -> None: ...
def can_read(self, timeout: float | None) -> bool: ...
def read_from_socket(self, timeout: float | None = ..., raise_on_timeout: bool = ...) -> bool: ...
def read_response(self, disable_decoding: bool = ...) -> Any: ... # `str | bytes` or `list[str | bytes]`
DefaultParser: Any
DefaultParser: type[BaseParser] # Hiredis or PythonParser
class Encoder:
def __init__(self, encoding, encoding_errors, decode_responses: bool) -> None: ...
encoding: str
encoding_errors: str
decode_responses: bool
def __init__(self, encoding: str, encoding_errors: str, decode_responses: bool) -> None: ...
def encode(self, value: str | bytes | memoryview | bool | float) -> bytes: ...
def decode(self, value: str | bytes | memoryview, force: bool = ...) -> str: ...
class Connection:
description_format: Any
pid: Any
host: Any
port: Any
db: Any
password: Any
socket_timeout: Any
socket_connect_timeout: Any
socket_keepalive: Any
socket_keepalive_options: Any
retry_on_timeout: Any
retry_on_error: Any
encoding: Any
encoding_errors: Any
decode_responses: Any
pid: int
host: str
port: int
db: int
username: str | None
password: str | None
client_name: str | None
socket_timeout: float | None
socket_connect_timeout: float | None
socket_keepalive: bool
socket_keepalive_options: Mapping[str, int | str]
socket_type: int
retry_on_timeout: bool
retry_on_error: list[type[Exception]]
retry: Retry
redis_connect_func: _ConnectFunc | None
encoder: Encoder
next_health_check: int
health_check_interval: int
def __init__(
self,
host: str = ...,
@@ -90,7 +108,7 @@ class Connection:
socket_keepalive_options: Mapping[str, int | str] | None = ...,
socket_type: int = ...,
retry_on_timeout: bool = ...,
retry_on_error=...,
retry_on_error: list[type[Exception]] = ...,
encoding: str = ...,
encoding_errors: str = ...,
decode_responses: bool = ...,
@@ -102,24 +120,23 @@ class Connection:
retry: Retry | None = ...,
redis_connect_func: _ConnectFunc | None = ...,
) -> None: ...
def __del__(self): ...
def register_connect_callback(self, callback): ...
def clear_connect_callbacks(self): ...
def set_parser(self, parser_class): ...
def connect(self): ...
def on_connect(self): ...
def __del__(self) -> None: ...
def register_connect_callback(self, callback: _ConnectFunc) -> None: ...
def clear_connect_callbacks(self) -> None: ...
def set_parser(self, parser_class: type[BaseParser]) -> None: ...
def connect(self) -> None: ...
def on_connect(self) -> None: ...
def disconnect(self, *args: object) -> None: ... # 'args' added in redis 4.1.2
def check_health(self) -> None: ...
def send_packed_command(self, command, check_health: bool = ...): ...
def send_command(self, *args): ...
def can_read(self, timeout=...): ...
def read_response(self, disable_decoding: bool = ...): ...
def pack_command(self, *args): ...
def pack_commands(self, commands): ...
def send_packed_command(self, command: str | Iterable[str], check_health: bool = ...) -> None: ...
def send_command(self, *args, **kwargs) -> None: ...
def can_read(self, timeout: float | None = ...) -> bool: ...
def read_response(self, disable_decoding: bool = ...) -> Any: ... # `str | bytes` or `list[str | bytes]`
def pack_command(self, *args) -> list[bytes]: ...
def pack_commands(self, commands: Iterable[Iterable[Incomplete]]) -> list[bytes]: ...
def repr_pieces(self) -> list[tuple[str, str]]: ...
class SSLConnection(Connection):
description_format: Any
keyfile: Any
certfile: Any
cert_reqs: Any
@@ -149,65 +166,59 @@ class SSLConnection(Connection):
) -> None: ...
class UnixDomainSocketConnection(Connection):
description_format: Any
pid: Any
path: Any
db: Any
password: Any
socket_timeout: Any
retry_on_timeout: Any
encoding: Any
encoding_errors: Any
decode_responses: Any
retry: Retry
path: str
def __init__(
self,
path=...,
path: str = ...,
db: int = ...,
username=...,
password=...,
socket_timeout=...,
username: str | None = ...,
password: str | None = ...,
socket_timeout: float | None = ...,
encoding: str = ...,
encoding_errors: str = ...,
decode_responses: bool = ...,
retry_on_timeout: bool = ...,
retry_on_error=...,
parser_class=...,
retry_on_error: list[type[Exception]] = ...,
parser_class: type[BaseParser] = ...,
socket_read_size: int = ...,
health_check_interval: int = ...,
client_name=...,
client_name: str | None = ...,
retry: Retry | None = ...,
redis_connect_func: _ConnectFunc | None = ...,
) -> None: ...
def repr_pieces(self) -> list[tuple[str, str]]: ...
# TODO: make generic on `connection_class`
class ConnectionPool:
connection_class: type[Connection]
connection_kwargs: dict[str, Any]
max_connections: int
pid: int
@classmethod
def from_url(cls: type[Self], url: str, *, db: int = ..., decode_components: bool = ..., **kwargs) -> Self: ...
connection_class: Any
connection_kwargs: Any
max_connections: Any
def __init__(self, connection_class=..., max_connections=..., **connection_kwargs) -> None: ...
pid: Any
def reset(self): ...
def get_connection(self, command_name, *keys, **options: _ConnectionPoolOptions): ...
def make_connection(self): ...
def release(self, connection): ...
def disconnect(self, inuse_connections: bool = ...): ...
def __init__(
self, connection_class: type[Connection] = ..., max_connections: int | None = ..., **connection_kwargs
) -> None: ...
def reset(self) -> None: ...
def get_connection(self, command_name: object, *keys, **options: _ConnectionPoolOptions) -> Connection: ...
def make_connection(self) -> Connection: ...
def release(self, connection: Connection) -> None: ...
def disconnect(self, inuse_connections: bool = ...) -> None: ...
def get_encoder(self) -> Encoder: ...
def owns_connection(self, connection: Connection) -> bool: ...
class BlockingConnectionPool(ConnectionPool):
queue_class: Any
timeout: Any
def __init__(self, max_connections=..., timeout=..., connection_class=..., queue_class=..., **connection_kwargs) -> None: ...
pid: Any
pool: Any
def reset(self): ...
def make_connection(self): ...
def get_connection(self, command_name, *keys, **options: _ConnectionPoolOptions): ...
def release(self, connection): ...
def disconnect(self): ...
queue_class: type[Queue[Any]]
timeout: float
pool: Queue[Connection | None] # might not be defined
def __init__(
self,
max_connections: int = ...,
timeout: float = ...,
connection_class: type[Connection] = ...,
queue_class: type[Queue[Any]] = ...,
**connection_kwargs,
) -> None: ...
def disconnect(self) -> None: ... # type: ignore[override]
def to_bool(value: object) -> bool: ...
def parse_url(url: str) -> dict[str, Any]: ...