Add more stubs for redis-py (#4480)

This commit is contained in:
Rajiv Bakulesh Shah
2020-09-17 07:25:16 -07:00
committed by GitHub
parent 822e427693
commit 98667b18c2

View File

@@ -1,5 +1,6 @@
from datetime import timedelta
from typing import Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional, Set, Text, Tuple, Union
from typing import Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional, Sequence, Set, Text, Tuple, Union
from typing_extensions import Literal
from .connection import ConnectionPool
@@ -114,7 +115,7 @@ class Redis(object):
def slowlog_reset(self): ...
def time(self): ...
def append(self, key, value): ...
def bitcount(self, key, start=..., end=...): ...
def bitcount(self, key: _Key, start: Optional[int] = ..., end: Optional[int] = ...) -> int: ...
def bitop(self, operation, dest, *keys): ...
def bitpos(self, key, bit, start=..., end=...): ...
def decr(self, name, amount=...): ...
@@ -127,7 +128,7 @@ class Redis(object):
def expireat(self, name, when): ...
def get(self, name: _Key) -> Any: ... # Optional[Union[str, bytes]] depending on decode_responses
def __getitem__(self, name): ...
def getbit(self, name, offset): ...
def getbit(self, name: _Key, offset: int) -> int: ...
def getrange(self, key, start, end): ...
def getset(self, name, value): ...
def incr(self, name, amount=...): ...
@@ -157,7 +158,7 @@ class Redis(object):
xx: bool = ...,
) -> Optional[bool]: ...
def __setitem__(self, name, value): ...
def setbit(self, name, offset, value): ...
def setbit(self, name: _Key, offset: int, value: int) -> int: ...
def setex(self, name, time, value): ...
def setnx(self, name, value): ...
def setrange(self, name, offset, value): ...
@@ -170,40 +171,53 @@ class Redis(object):
def blpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[bytes, bytes]]: ...
def brpop(self, keys: Union[_Value, Iterable[_Value]], timeout: int = ...) -> Optional[Tuple[bytes, bytes]]: ...
def brpoplpush(self, src, dst, timeout=...): ...
def lindex(self, name, index): ...
def linsert(self, name, where, refvalue, value): ...
def llen(self, name): ...
def lindex(self, name: _Key, index: int) -> Optional[bytes]: ...
def linsert(
self, name: _Key, where: Literal["BEFORE", "AFTER", "before", "after"], refvalue: _Value, value: _Value
) -> int: ...
def llen(self, name: _Key) -> int: ...
def lpop(self, name): ...
def lpush(self, name: _Value, *values: _Value) -> int: ...
def lpushx(self, name, value): ...
def lrange(self, name, start, end): ...
def lrem(self, name, count, value): ...
def lset(self, name, index, value): ...
def ltrim(self, name, start, end): ...
def lrange(self, name: _Key, start: int, end: int) -> List[bytes]: ...
def lrem(self, name: _Key, count: int, value: _Value) -> int: ...
def lset(self, name: _Key, index: int, value: _Value) -> bool: ...
def ltrim(self, name: _Key, start: int, end: int) -> bool: ...
def rpop(self, name): ...
def rpoplpush(self, src, dst): ...
def rpush(self, name: _Value, *values: _Value) -> int: ...
def rpushx(self, name, value): ...
def sort(self, name, start=..., num=..., by=..., get=..., desc=..., alpha=..., store=..., groups=...): ...
def sort(
self,
name: _Key,
start: Optional[int] = ...,
num: Optional[int] = ...,
by: Optional[_Key] = ...,
get: Optional[Union[_Key, Sequence[_Key]]] = ...,
desc: bool = ...,
alpha: bool = ...,
store: Optional[_Key] = ...,
groups: bool = ...,
) -> Union[List[bytes], int]: ...
def scan(
self, cursor: int = ..., match: Optional[_Key] = ..., count: Optional[int] = ...
) -> Tuple[int, List[Any]]: ... # Tuple[int, List[_Key]] depending on decode_responses
def scan_iter(
self, match: Optional[Text] = ..., count: Optional[int] = ...
) -> Iterator[Any]: ... # Iterator[_Key] depending on decode_responses
def sscan(self, name, cursor=..., match=..., count=...): ...
def sscan(self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...) -> Tuple[int, List[bytes]]: ...
def sscan_iter(self, name, match=..., count=...): ...
def hscan(self, name, cursor=..., match=..., count=...): ...
def hscan(self, name: _Key, cursor: int = ..., match: Text = ..., count: int = ...) -> Tuple[int, Dict[bytes, bytes]]: ...
def hscan_iter(self, name, match=..., count=...): ...
def zscan(self, name, cursor=..., match=..., count=..., score_cast_func=...): ...
def zscan_iter(self, name, match=..., count=..., score_cast_func=...): ...
def sadd(self, name, *values): ...
def sadd(self, name: _Key, *values: _Value) -> int: ...
def scard(self, name: _Key) -> int: ...
def sdiff(self, keys, *args): ...
def sdiffstore(self, dest, keys, *args): ...
def sinter(self, keys, *args): ...
def sinter(self, keys: _Key, *args: _Key) -> Set[_Value]: ...
def sinterstore(self, dest, keys, *args): ...
def sismember(self, name, value): ...
def sismember(self, name: _Key, value: _Value) -> bool: ...
def smembers(self, name: _Key) -> Set[_Value]: ...
def smove(self, src, dst, value): ...
def spop(self, name, count: Optional[int] = ...): ...
@@ -263,9 +277,9 @@ class Redis(object):
def zrevrank(self, name, value): ...
def zscore(self, name, value): ...
def zunionstore(self, dest, keys, aggregate=...): ...
def pfadd(self, name, *values): ...
def pfcount(self, name): ...
def pfmerge(self, dest, *sources): ...
def pfadd(self, name: _Key, *values: _Value) -> int: ...
def pfcount(self, name: _Key) -> int: ...
def pfmerge(self, dest: _Key, *sources: _Key) -> bool: ...
def hdel(self, name: _Key, *keys: _Key) -> int: ...
def hexists(self, name: _Key, key: _Key) -> bool: ...
def hget(self, name: _Key, key: _Key) -> Optional[bytes]: ...
@@ -346,7 +360,7 @@ class BasePipeline:
scripts: Any
explicit_transaction: Any
def reset(self): ...
def multi(self): ...
def multi(self) -> None: ...
def execute_command(self, *args, **kwargs): ...
def immediate_execute_command(self, *args, **options): ...
def pipeline_execute_command(self, *args, **options): ...
@@ -354,7 +368,7 @@ class BasePipeline:
def annotate_exception(self, exception, number, command): ...
def parse_response(self, connection, command_name, **options): ...
def load_scripts(self): ...
def execute(self, raise_on_error=...): ...
def execute(self, raise_on_error: bool = ...) -> List[Any]: ...
def watch(self, *names): ...
def unwatch(self): ...
def script_load_for_pipeline(self, script): ...