fix redis str types (#4783)

This commit is contained in:
Jonathan Schoonhoven
2020-11-28 19:26:05 -08:00
committed by GitHub
parent 9af49c0b69
commit 6d697e7f2c

View File

@@ -1,5 +1,22 @@
from datetime import timedelta
from typing import Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional, Sequence, Set, Text, Tuple, Union, overload
from typing import (
Any,
Callable,
Dict,
Generic,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Set,
Text,
Tuple,
TypeVar,
Union,
overload,
)
from typing_extensions import Literal
from .connection import ConnectionPool
@@ -38,14 +55,82 @@ def parse_slowlog_get(response, **options): ...
_Value = Union[bytes, float, int, Text]
_Key = Union[Text, bytes]
class Redis(object):
# Lib returns str or bytes depending on Python version and value of decode_responses
_StrType = TypeVar("_StrType", bound=Union[Text, bytes])
class Redis(Generic[_StrType]):
RESPONSE_CALLBACKS: Any
@overload
@classmethod
def from_url(cls, url: Text, db: Optional[int] = ..., **kwargs) -> Redis: ...
def from_url(
cls,
url: Text,
host: Optional[Text] = ...,
port: Optional[int] = ...,
db: Optional[int] = ...,
password: Optional[Text] = ...,
socket_timeout: Optional[float] = ...,
socket_connect_timeout: Optional[float] = ...,
socket_keepalive: Optional[bool] = ...,
socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ...,
connection_pool: Optional[ConnectionPool] = ...,
unix_socket_path: Optional[Text] = ...,
encoding: Text = ...,
encoding_errors: Text = ...,
charset: Optional[Text] = ...,
errors: Optional[Text] = ...,
decode_responses: Optional[bool] = ...,
retry_on_timeout: bool = ...,
ssl: bool = ...,
ssl_keyfile: Optional[Text] = ...,
ssl_certfile: Optional[Text] = ...,
ssl_cert_reqs: Optional[Union[str, int]] = ...,
ssl_ca_certs: Optional[Text] = ...,
ssl_check_hostname: bool = ...,
max_connections: Optional[int] = ...,
single_connection_client: bool = ...,
health_check_interval: float = ...,
client_name: Optional[Text] = ...,
username: Optional[Text] = ...,
) -> Redis[bytes]: ...
@overload
@classmethod
def from_url(
cls,
url: Text,
host: Optional[Text] = ...,
port: Optional[int] = ...,
db: Optional[int] = ...,
password: Optional[Text] = ...,
socket_timeout: Optional[float] = ...,
socket_connect_timeout: Optional[float] = ...,
socket_keepalive: Optional[bool] = ...,
socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ...,
connection_pool: Optional[ConnectionPool] = ...,
unix_socket_path: Optional[Text] = ...,
encoding: Text = ...,
encoding_errors: Text = ...,
charset: Optional[Text] = ...,
decode_responses: Literal[True] = ...,
errors: Optional[Text] = ...,
retry_on_timeout: bool = ...,
ssl: bool = ...,
ssl_keyfile: Optional[Text] = ...,
ssl_certfile: Optional[Text] = ...,
ssl_cert_reqs: Optional[Union[str, int]] = ...,
ssl_ca_certs: Optional[Text] = ...,
ssl_check_hostname: bool = ...,
max_connections: Optional[int] = ...,
single_connection_client: bool = ...,
health_check_interval: float = ...,
client_name: Optional[Text] = ...,
username: Optional[Text] = ...,
) -> Redis[str]: ...
connection_pool: Any
response_callbacks: Any
def __init__(
self,
@overload
def __new__(
cls,
host: Text = ...,
port: int = ...,
db: int = ...,
@@ -60,7 +145,99 @@ class Redis(object):
encoding_errors: Text = ...,
charset: Optional[Text] = ...,
errors: Optional[Text] = ...,
decode_responses: bool = ...,
retry_on_timeout: bool = ...,
ssl: bool = ...,
ssl_keyfile: Optional[Text] = ...,
ssl_certfile: Optional[Text] = ...,
ssl_cert_reqs: Optional[Union[str, int]] = ...,
ssl_ca_certs: Optional[Text] = ...,
ssl_check_hostname: bool = ...,
max_connections: Optional[int] = ...,
single_connection_client: bool = ...,
health_check_interval: float = ...,
client_name: Optional[Text] = ...,
username: Optional[Text] = ...,
) -> Redis[bytes]: ...
@overload
def __new__(
cls,
host: Text = ...,
port: int = ...,
db: int = ...,
password: Optional[Text] = ...,
socket_timeout: Optional[float] = ...,
socket_connect_timeout: Optional[float] = ...,
socket_keepalive: Optional[bool] = ...,
socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ...,
connection_pool: Optional[ConnectionPool] = ...,
unix_socket_path: Optional[Text] = ...,
encoding: Text = ...,
encoding_errors: Text = ...,
charset: Optional[Text] = ...,
errors: Optional[Text] = ...,
decode_responses: Literal[True] = ...,
retry_on_timeout: bool = ...,
ssl: bool = ...,
ssl_keyfile: Optional[Text] = ...,
ssl_certfile: Optional[Text] = ...,
ssl_cert_reqs: Optional[Union[str, int]] = ...,
ssl_ca_certs: Optional[Text] = ...,
ssl_check_hostname: bool = ...,
max_connections: Optional[int] = ...,
single_connection_client: bool = ...,
health_check_interval: float = ...,
client_name: Optional[Text] = ...,
username: Optional[Text] = ...,
) -> Redis[str]: ...
@overload
def __init__(
self: Redis[str],
host: Text = ...,
port: int = ...,
db: int = ...,
password: Optional[Text] = ...,
socket_timeout: Optional[float] = ...,
socket_connect_timeout: Optional[float] = ...,
socket_keepalive: Optional[bool] = ...,
socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ...,
connection_pool: Optional[ConnectionPool] = ...,
unix_socket_path: Optional[Text] = ...,
encoding: Text = ...,
encoding_errors: Text = ...,
charset: Optional[Text] = ...,
errors: Optional[Text] = ...,
decode_responses: Literal[True] = ...,
retry_on_timeout: bool = ...,
ssl: bool = ...,
ssl_keyfile: Optional[Text] = ...,
ssl_certfile: Optional[Text] = ...,
ssl_cert_reqs: Optional[Union[str, int]] = ...,
ssl_ca_certs: Optional[Text] = ...,
ssl_check_hostname: bool = ...,
max_connections: Optional[int] = ...,
single_connection_client: bool = ...,
health_check_interval: float = ...,
client_name: Optional[Text] = ...,
username: Optional[Text] = ...,
) -> None: ...
@overload
def __init__(
self: Redis[bytes],
host: Text = ...,
port: int = ...,
db: int = ...,
password: Optional[Text] = ...,
socket_timeout: Optional[float] = ...,
socket_connect_timeout: Optional[float] = ...,
socket_keepalive: Optional[bool] = ...,
socket_keepalive_options: Optional[Mapping[str, Union[int, str]]] = ...,
connection_pool: Optional[ConnectionPool] = ...,
unix_socket_path: Optional[Text] = ...,
encoding: Text = ...,
encoding_errors: Text = ...,
charset: Optional[Text] = ...,
errors: Optional[Text] = ...,
decode_responses: Optional[bool] = ...,
retry_on_timeout: bool = ...,
ssl: bool = ...,
ssl_keyfile: Optional[Text] = ...,
@@ -153,11 +330,11 @@ class Redis(object):
__contains__: Any
def expire(self, name: _Key, time: Union[int, timedelta]) -> bool: ...
def expireat(self, name, when): ...
def get(self, name: _Key) -> Any: ... # Optional[Union[str, bytes]] depending on decode_responses
def get(self, name: _Key) -> Optional[_StrType]: ...
def __getitem__(self, name): ...
def getbit(self, name: _Key, offset: int) -> int: ...
def getrange(self, key, start, end): ...
def getset(self, name, value): ...
def getset(self, name, value) -> Optional[_StrType]: ...
def incr(self, name, amount=...): ...
def incrby(self, name, amount=...): ...
def incrbyfloat(self, name, amount=...): ...
@@ -197,10 +374,10 @@ class Redis(object):
def watch(self, *names): ...
def unlink(self, *names: _Key) -> int: ...
def unwatch(self): ...
def blpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[bytes, bytes]]: ...
def brpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[bytes, bytes]]: ...
def blpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[_StrType, _StrType]]: ...
def brpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[_StrType, _StrType]]: ...
def brpoplpush(self, src, dst, timeout=...): ...
def lindex(self, name: _Key, index: int) -> Optional[bytes]: ...
def lindex(self, name: _Key, index: int) -> Optional[_StrType]: ...
def linsert(
self, name: _Key, where: Literal["BEFORE", "AFTER", "before", "after"], refvalue: _Value, value: _Value
) -> int: ...
@@ -208,7 +385,7 @@ class Redis(object):
def lpop(self, name): ...
def lpush(self, name: _Value, *values: _Value) -> int: ...
def lpushx(self, name, value): ...
def lrange(self, name: _Key, start: int, end: int) -> List[bytes]: ...
def lrange(self, name: _Key, start: int, end: int) -> List[_StrType]: ...
def lrem(self, name: _Key, count: int, value: _Value) -> int: ...
def lset(self, name: _Key, index: int, value: _Value) -> bool: ...
def ltrim(self, name: _Key, start: int, end: int) -> bool: ...
@@ -228,7 +405,7 @@ class Redis(object):
alpha: bool = ...,
store: None = ...,
groups: bool = ...,
) -> List[bytes]: ...
) -> List[_StrType]: ...
@overload
def sort(
self,
@@ -256,15 +433,13 @@ class Redis(object):
store: _Key,
groups: bool = ...,
) -> int: ...
def scan(
self, cursor: int = ..., match: Optional[_Key] = ..., count: Optional[int] = ...
) -> Tuple[int, List[Any]]: ... # Tuple[int, List[_Key]] depending on decode_responses
def scan_iter(
self, match: Optional[Text] = ..., count: Optional[int] = ...
) -> Iterator[Any]: ... # Iterator[_Key] depending on decode_responses
def sscan(self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...) -> Tuple[int, List[bytes]]: ...
def scan(self, cursor: int = ..., match: Optional[_Key] = ..., count: Optional[int] = ...) -> Tuple[int, List[_StrType]]: ...
def scan_iter(self, match: Optional[Text] = ..., count: Optional[int] = ...) -> Iterator[_StrType]: ...
def sscan(self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...) -> Tuple[int, List[_StrType]]: ...
def sscan_iter(self, name, match=..., count=...): ...
def hscan(self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...) -> Tuple[int, Dict[bytes, bytes]]: ...
def hscan(
self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...
) -> Tuple[int, Dict[_StrType, _StrType]]: ...
def hscan_iter(self, name, match=..., count=...): ...
def zscan(self, name, cursor=..., match=..., count=..., score_cast_func=...): ...
def zscan_iter(self, name, match=..., count=..., score_cast_func=...): ...
@@ -346,19 +521,19 @@ class Redis(object):
def pfmerge(self, dest: _Key, *sources: _Key) -> bool: ...
def hdel(self, name: _Key, *keys: _Key) -> int: ...
def hexists(self, name: _Key, key: _Key) -> bool: ...
def hget(self, name: _Key, key: _Key) -> Optional[bytes]: ...
def hgetall(self, name: _Key) -> Dict[bytes, bytes]: ...
def hget(self, name: _Key, key: _Key) -> Optional[_StrType]: ...
def hgetall(self, name: _Key) -> Dict[_StrType, _StrType]: ...
def hincrby(self, name: _Key, key: _Key, amount: int = ...) -> int: ...
def hincrbyfloat(self, name: _Key, key: _Key, amount: float = ...) -> float: ...
def hkeys(self, name: _Key) -> List[bytes]: ...
def hkeys(self, name: _Key) -> List[_StrType]: ...
def hlen(self, name: _Key) -> int: ...
def hset(
self, name: _Key, key: Optional[_Key], value: Optional[_Value], mapping: Optional[Mapping[_Value, _Value]] = ...
) -> int: ...
def hsetnx(self, name: _Key, key: _Key, value: _Value) -> int: ...
def hmset(self, name: _Key, mapping: Mapping[_Value, _Value]) -> bool: ...
def hmget(self, name: _Key, keys: Union[_Key, Iterable[_Key]], *args: _Key) -> List[Optional[bytes]]: ...
def hvals(self, name: _Key) -> List[bytes]: ...
def hmget(self, name: _Key, keys: Union[_Key, Iterable[_Key]], *args: _Key) -> List[Optional[_StrType]]: ...
def hvals(self, name: _Key) -> List[_StrType]: ...
def publish(self, channel: _Key, message: _Key) -> int: ...
def eval(self, script, numkeys, *keys_and_args): ...
def evalsha(self, sha, numkeys, *keys_and_args): ...
@@ -366,7 +541,7 @@ class Redis(object):
def script_flush(self): ...
def script_kill(self): ...
def script_load(self, script): ...
def register_script(self, script: Union[Text, bytes]) -> Script: ...
def register_script(self, script: Union[Text, _StrType]) -> Script: ...
def pubsub_channels(self, pattern: _Key = ...) -> List[Text]: ...
def pubsub_numsub(self, *args: _Key) -> List[Tuple[Text, int]]: ...
def pubsub_numpat(self) -> int: ...