From ce8c82a27c277784a981c113b5d9267ed38d409f Mon Sep 17 00:00:00 2001 From: Juan Amari <11861490+juanamari94@users.noreply.github.com> Date: Fri, 3 Feb 2023 10:27:39 -0300 Subject: [PATCH] [redis] Initial stubs for `redis.asyncio.cluster` (#9468) Co-authored-by: Nikita Sobolev Co-authored-by: Alex Waygood Co-authored-by: Sebastian Rittau --- stubs/redis/@tests/stubtest_allowlist.txt | 2 + stubs/redis/redis/asyncio/cluster.pyi | 163 ++++++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 stubs/redis/redis/asyncio/cluster.pyi diff --git a/stubs/redis/@tests/stubtest_allowlist.txt b/stubs/redis/@tests/stubtest_allowlist.txt index 43aea1424..4256e9c48 100644 --- a/stubs/redis/@tests/stubtest_allowlist.txt +++ b/stubs/redis/@tests/stubtest_allowlist.txt @@ -70,3 +70,5 @@ redis.commands.core.SortedSetCommands redis.commands.core.StreamCommands redis.commands.json.Pipeline redis.commands.timeseries.Pipeline +redis.asyncio.cluster.ClusterPipeline +redis.asyncio.cluster.RedisCluster diff --git a/stubs/redis/redis/asyncio/cluster.pyi b/stubs/redis/redis/asyncio/cluster.pyi new file mode 100644 index 000000000..a68e81c66 --- /dev/null +++ b/stubs/redis/redis/asyncio/cluster.pyi @@ -0,0 +1,163 @@ +from _typeshed import Incomplete, Self +from collections.abc import Awaitable, Callable, Mapping +from typing import Any, Generic + +from redis.asyncio.client import ResponseCallbackT +from redis.asyncio.connection import BaseParser, Connection, Encoder +from redis.asyncio.parser import CommandsParser +from redis.client import AbstractRedis +from redis.cluster import AbstractRedisCluster, LoadBalancer + +# TODO: add AsyncRedisClusterCommands stubs +# from redis.commands import AsyncRedisClusterCommands +from redis.commands.core import _StrType +from redis.credentials import CredentialProvider +from redis.retry import Retry +from redis.typing import AnyKeyT, EncodableT, KeyT + +# It uses `DefaultParser` in real life, but it is a dynamic base class. +class ClusterParser(BaseParser): ... + +class RedisCluster(AbstractRedis, AbstractRedisCluster, Generic[_StrType]): # TODO: AsyncRedisClusterCommands + retry: Retry | None + connection_kwargs: dict[str, Any] + nodes_manager: NodesManager + encoder: Encoder + read_from_replicas: bool + reinitialize_steps: int + cluster_error_retry_attempts: int + reinitialize_counter: int + commands_parser: CommandsParser + node_flags: set[str] + command_flags: dict[str, str] + response_callbacks: Incomplete + result_callbacks: dict[str, Callable[[Incomplete, Incomplete], Incomplete]] + def __init__( + self, + host: str | None = ..., + port: str | int = ..., + # Cluster related kwargs + startup_nodes: list[ClusterNode] | None = ..., + require_full_coverage: bool = ..., + read_from_replicas: bool = ..., + reinitialize_steps: int = ..., + cluster_error_retry_attempts: int = ..., + connection_error_retry_attempts: int = ..., + max_connections: int = ..., + # Client related kwargs + db: str | int = ..., + path: str | None = ..., + credential_provider: CredentialProvider | None = ..., + username: str | None = ..., + password: str | None = ..., + client_name: str | None = ..., + # Encoding related kwargs + encoding: str = ..., + encoding_errors: str = ..., + decode_responses: bool = ..., + # Connection related kwargs + health_check_interval: float = ..., + socket_connect_timeout: float | None = ..., + socket_keepalive: bool = ..., + socket_keepalive_options: Mapping[int, int | bytes] | None = ..., + socket_timeout: float | None = ..., + retry: Retry | None = ..., + retry_on_error: list[Exception] | None = ..., + # SSL related kwargs + ssl: bool = ..., + ssl_ca_certs: str | None = ..., + ssl_ca_data: str | None = ..., + ssl_cert_reqs: str = ..., + ssl_certfile: str | None = ..., + ssl_check_hostname: bool = ..., + ssl_keyfile: str | None = ..., + ) -> None: ... + async def initialize(self: Self) -> Self: ... + async def close(self) -> None: ... + async def __aenter__(self: Self) -> Self: ... + async def __aexit__(self, exc_type: object, exc_value: object, traceback: object) -> None: ... + def __await__(self: Self) -> Awaitable[Self]: ... + def __del__(self) -> None: ... + async def on_connect(self, connection: Connection) -> None: ... + def get_nodes(self) -> list[ClusterNode]: ... + def get_primaries(self) -> list[ClusterNode]: ... + def get_replicas(self) -> list[ClusterNode]: ... + def get_random_node(self) -> ClusterNode: ... + def get_default_node(self) -> ClusterNode: ... + def set_default_node(self, node: ClusterNode) -> None: ... + def get_node(self, host: str | None = ..., port: int | None = ..., node_name: str | None = ...) -> ClusterNode | None: ... + def get_node_from_key(self, key: str, replica: bool = ...) -> ClusterNode | None: ... + def keyslot(self, key: EncodableT) -> int: ... + def get_encoder(self) -> Encoder: ... + def get_connection_kwargs(self) -> dict[str, Any | None]: ... + def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: ... + async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: ... + def pipeline(self, transaction: Any | None = ..., shard_hint: Any | None = ...) -> ClusterPipeline[_StrType]: ... + @classmethod + def from_url(cls: type[Self], url: str, **kwargs) -> Self: ... + +class ClusterNode: + host: str + port: str | int + name: str + server_type: str | None + max_connections: int + connection_class: type[Connection] + connection_kwargs: dict[str, Any] + response_callbacks: dict[Incomplete, Incomplete] + def __init__( + self, + host: str, + port: str | int, + server_type: str | None = ..., + *, + max_connections: int = ..., + connection_class: type[Connection] = ..., + **connection_kwargs: Any, + ) -> None: ... + def __eq__(self, obj: object) -> bool: ... + def __del__(self) -> None: ... + async def disconnect(self) -> None: ... + def acquire_connection(self) -> Connection: ... + async def parse_response(self, connection: Connection, command: str, **kwargs: Any) -> Any: ... + async def execute_command(self, *args: Any, **kwargs: Any) -> Any: ... + async def execute_pipeline(self, commands: list[PipelineCommand]) -> bool: ... + +class NodesManager: + startup_nodes: dict[str, ClusterNode] + require_full_coverage: bool + connection_kwargs: dict[str, Any] + default_node: ClusterNode | None + nodes_cache: dict[str, ClusterNode] + slots_cache: dict[int, list[ClusterNode]] + read_load_balancer: LoadBalancer + def __init__( + self, startup_nodes: list[ClusterNode], require_full_coverage: bool, connection_kwargs: dict[str, Any] + ) -> None: ... + def get_node(self, host: str | None = ..., port: int | None = ..., node_name: str | None = ...) -> ClusterNode | None: ... + def set_nodes(self, old: dict[str, ClusterNode], new: dict[str, ClusterNode], remove_old: bool = ...) -> None: ... + def get_node_from_slot(self, slot: int, read_from_replicas: bool = ...) -> ClusterNode: ... + def get_nodes_by_server_type(self, server_type: str) -> list[ClusterNode]: ... + async def initialize(self) -> None: ... + async def close(self, attr: str = ...) -> None: ... + +class ClusterPipeline(AbstractRedis, AbstractRedisCluster, Generic[_StrType]): # TODO: AsyncRedisClusterCommands + def __init__(self, client: RedisCluster[_StrType]) -> None: ... + async def initialize(self: Self) -> Self: ... + async def __aenter__(self: Self) -> Self: ... + async def __aexit__(self, exc_type: object, exc_value: object, traceback: object) -> None: ... + def __await__(self: Self) -> Awaitable[Self]: ... + def __enter__(self: Self) -> Self: ... + def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None: ... + def __bool__(self) -> bool: ... + def __len__(self) -> int: ... + def execute_command(self: Self, *args: KeyT | EncodableT, **kwargs: Any) -> Self: ... + async def execute(self, raise_on_error: bool = ..., allow_redirections: bool = ...) -> list[Any]: ... + def mset_nonatomic(self: Self, mapping: Mapping[AnyKeyT, EncodableT]) -> Self: ... + +class PipelineCommand: + args: Any + kwargs: Any + position: int + result: Exception | None | Any + def __init__(self, position: int, *args: Any, **kwargs: Any) -> None: ...