[redis] Initial stubs for redis.asyncio.cluster (#9468)

Co-authored-by: Nikita Sobolev <mail@sobolevn.me>
Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
Co-authored-by: Sebastian Rittau <srittau@rittau.biz>
This commit is contained in:
Juan Amari
2023-02-03 10:27:39 -03:00
committed by GitHub
parent f94d496626
commit ce8c82a27c
2 changed files with 165 additions and 0 deletions

View File

@@ -70,3 +70,5 @@ redis.commands.core.SortedSetCommands
redis.commands.core.StreamCommands
redis.commands.json.Pipeline
redis.commands.timeseries.Pipeline
redis.asyncio.cluster.ClusterPipeline
redis.asyncio.cluster.RedisCluster

View File

@@ -0,0 +1,163 @@
from _typeshed import Incomplete, Self
from collections.abc import Awaitable, Callable, Mapping
from typing import Any, Generic
from redis.asyncio.client import ResponseCallbackT
from redis.asyncio.connection import BaseParser, Connection, Encoder
from redis.asyncio.parser import CommandsParser
from redis.client import AbstractRedis
from redis.cluster import AbstractRedisCluster, LoadBalancer
# TODO: add AsyncRedisClusterCommands stubs
# from redis.commands import AsyncRedisClusterCommands
from redis.commands.core import _StrType
from redis.credentials import CredentialProvider
from redis.retry import Retry
from redis.typing import AnyKeyT, EncodableT, KeyT
# It uses `DefaultParser` in real life, but it is a dynamic base class.
class ClusterParser(BaseParser): ...
class RedisCluster(AbstractRedis, AbstractRedisCluster, Generic[_StrType]): # TODO: AsyncRedisClusterCommands
retry: Retry | None
connection_kwargs: dict[str, Any]
nodes_manager: NodesManager
encoder: Encoder
read_from_replicas: bool
reinitialize_steps: int
cluster_error_retry_attempts: int
reinitialize_counter: int
commands_parser: CommandsParser
node_flags: set[str]
command_flags: dict[str, str]
response_callbacks: Incomplete
result_callbacks: dict[str, Callable[[Incomplete, Incomplete], Incomplete]]
def __init__(
self,
host: str | None = ...,
port: str | int = ...,
# Cluster related kwargs
startup_nodes: list[ClusterNode] | None = ...,
require_full_coverage: bool = ...,
read_from_replicas: bool = ...,
reinitialize_steps: int = ...,
cluster_error_retry_attempts: int = ...,
connection_error_retry_attempts: int = ...,
max_connections: int = ...,
# Client related kwargs
db: str | int = ...,
path: str | None = ...,
credential_provider: CredentialProvider | None = ...,
username: str | None = ...,
password: str | None = ...,
client_name: str | None = ...,
# Encoding related kwargs
encoding: str = ...,
encoding_errors: str = ...,
decode_responses: bool = ...,
# Connection related kwargs
health_check_interval: float = ...,
socket_connect_timeout: float | None = ...,
socket_keepalive: bool = ...,
socket_keepalive_options: Mapping[int, int | bytes] | None = ...,
socket_timeout: float | None = ...,
retry: Retry | None = ...,
retry_on_error: list[Exception] | None = ...,
# SSL related kwargs
ssl: bool = ...,
ssl_ca_certs: str | None = ...,
ssl_ca_data: str | None = ...,
ssl_cert_reqs: str = ...,
ssl_certfile: str | None = ...,
ssl_check_hostname: bool = ...,
ssl_keyfile: str | None = ...,
) -> None: ...
async def initialize(self: Self) -> Self: ...
async def close(self) -> None: ...
async def __aenter__(self: Self) -> Self: ...
async def __aexit__(self, exc_type: object, exc_value: object, traceback: object) -> None: ...
def __await__(self: Self) -> Awaitable[Self]: ...
def __del__(self) -> None: ...
async def on_connect(self, connection: Connection) -> None: ...
def get_nodes(self) -> list[ClusterNode]: ...
def get_primaries(self) -> list[ClusterNode]: ...
def get_replicas(self) -> list[ClusterNode]: ...
def get_random_node(self) -> ClusterNode: ...
def get_default_node(self) -> ClusterNode: ...
def set_default_node(self, node: ClusterNode) -> None: ...
def get_node(self, host: str | None = ..., port: int | None = ..., node_name: str | None = ...) -> ClusterNode | None: ...
def get_node_from_key(self, key: str, replica: bool = ...) -> ClusterNode | None: ...
def keyslot(self, key: EncodableT) -> int: ...
def get_encoder(self) -> Encoder: ...
def get_connection_kwargs(self) -> dict[str, Any | None]: ...
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: ...
async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: ...
def pipeline(self, transaction: Any | None = ..., shard_hint: Any | None = ...) -> ClusterPipeline[_StrType]: ...
@classmethod
def from_url(cls: type[Self], url: str, **kwargs) -> Self: ...
class ClusterNode:
host: str
port: str | int
name: str
server_type: str | None
max_connections: int
connection_class: type[Connection]
connection_kwargs: dict[str, Any]
response_callbacks: dict[Incomplete, Incomplete]
def __init__(
self,
host: str,
port: str | int,
server_type: str | None = ...,
*,
max_connections: int = ...,
connection_class: type[Connection] = ...,
**connection_kwargs: Any,
) -> None: ...
def __eq__(self, obj: object) -> bool: ...
def __del__(self) -> None: ...
async def disconnect(self) -> None: ...
def acquire_connection(self) -> Connection: ...
async def parse_response(self, connection: Connection, command: str, **kwargs: Any) -> Any: ...
async def execute_command(self, *args: Any, **kwargs: Any) -> Any: ...
async def execute_pipeline(self, commands: list[PipelineCommand]) -> bool: ...
class NodesManager:
startup_nodes: dict[str, ClusterNode]
require_full_coverage: bool
connection_kwargs: dict[str, Any]
default_node: ClusterNode | None
nodes_cache: dict[str, ClusterNode]
slots_cache: dict[int, list[ClusterNode]]
read_load_balancer: LoadBalancer
def __init__(
self, startup_nodes: list[ClusterNode], require_full_coverage: bool, connection_kwargs: dict[str, Any]
) -> None: ...
def get_node(self, host: str | None = ..., port: int | None = ..., node_name: str | None = ...) -> ClusterNode | None: ...
def set_nodes(self, old: dict[str, ClusterNode], new: dict[str, ClusterNode], remove_old: bool = ...) -> None: ...
def get_node_from_slot(self, slot: int, read_from_replicas: bool = ...) -> ClusterNode: ...
def get_nodes_by_server_type(self, server_type: str) -> list[ClusterNode]: ...
async def initialize(self) -> None: ...
async def close(self, attr: str = ...) -> None: ...
class ClusterPipeline(AbstractRedis, AbstractRedisCluster, Generic[_StrType]): # TODO: AsyncRedisClusterCommands
def __init__(self, client: RedisCluster[_StrType]) -> None: ...
async def initialize(self: Self) -> Self: ...
async def __aenter__(self: Self) -> Self: ...
async def __aexit__(self, exc_type: object, exc_value: object, traceback: object) -> None: ...
def __await__(self: Self) -> Awaitable[Self]: ...
def __enter__(self: Self) -> Self: ...
def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None: ...
def __bool__(self) -> bool: ...
def __len__(self) -> int: ...
def execute_command(self: Self, *args: KeyT | EncodableT, **kwargs: Any) -> Self: ...
async def execute(self, raise_on_error: bool = ..., allow_redirections: bool = ...) -> list[Any]: ...
def mset_nonatomic(self: Self, mapping: Mapping[AnyKeyT, EncodableT]) -> Self: ...
class PipelineCommand:
args: Any
kwargs: Any
position: int
result: Exception | None | Any
def __init__(self, position: int, *args: Any, **kwargs: Any) -> None: ...