pika: Make Method generic, annotate DeclareOk (#9360)

Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
This commit is contained in:
Sebastian Rittau
2022-12-14 20:42:57 +01:00
committed by GitHub
parent b356abcbda
commit fe9c7384fd
6 changed files with 46 additions and 28 deletions

View File

@@ -21,6 +21,10 @@ pika.adapters.blocking_connection.BlockingConnection.consumer_cancel_notify
pika.adapters.blocking_connection.BlockingConnection.exchange_exchange_bindings
pika.adapters.blocking_connection.BlockingConnection.publisher_confirms
# The implementation has defaults for the arguments that would make the
# created instances unusable, so we require the arguments in the stub.
pika.spec.Queue.DeclareOk.__init__
# Flagged by stubtest for unknown reasons.
pika.data.PY2
pika.data.basestring

View File

@@ -8,7 +8,10 @@ class AMQPObject:
class Class(AMQPObject): ...
class Method(AMQPObject):
synchronous: ClassVar[bool]
# This is a class attribute in the implementation, but subclasses use @property,
# so it's more convenient to use that here as well.
@property
def synchronous(self) -> bool: ...
def get_properties(self) -> Properties: ...
def get_body(self) -> str: ...

View File

@@ -7,7 +7,7 @@ from .connection import Connection
from .data import _ArgumentMapping
from .exchange_type import ExchangeType
from .frame import Method
from .spec import Basic, BasicProperties
from .spec import Basic, BasicProperties, Exchange
LOGGER: Incomplete
MAX_CHANNELS: int
@@ -29,7 +29,9 @@ class Channel:
def add_on_flow_callback(self, callback) -> None: ...
def add_on_return_callback(self, callback) -> None: ...
def basic_ack(self, delivery_tag: int = ..., multiple: bool = ...): ...
def basic_cancel(self, consumer_tag: str = ..., callback: Callable[[Method], object] | None = ...) -> None: ...
def basic_cancel(
self, consumer_tag: str = ..., callback: Callable[[Method[Basic.CancelOk]], object] | None = ...
) -> None: ...
def basic_consume(
self,
queue: str,
@@ -38,7 +40,7 @@ class Channel:
exclusive: bool = ...,
consumer_tag: str | None = ...,
arguments: _ArgumentMapping | None = ...,
callback: Callable[[Method], object] | None = ...,
callback: Callable[[Method[Basic.ConsumeOk]], object] | None = ...,
) -> str: ...
def basic_get(self, queue, callback, auto_ack: bool = ...) -> None: ...
def basic_nack(self, delivery_tag: int = ..., multiple: bool = ..., requeue: bool = ...): ...
@@ -66,10 +68,13 @@ class Channel:
auto_delete: bool = ...,
internal: bool = ...,
arguments: _ArgumentMapping | None = ...,
callback: Callable[[Method], object] | None = ...,
callback: Callable[[Method[Exchange.DeclareOk]], object] | None = ...,
): ...
def exchange_delete(
self, exchange: str | None = ..., if_unused: bool = ..., callback: Callable[[Method], object] | None = ...
self,
exchange: str | None = ...,
if_unused: bool = ...,
callback: Callable[[Method[Exchange.DeleteOk]], object] | None = ...,
): ...
def exchange_unbind(
self,

View File

@@ -9,6 +9,7 @@ from .channel import Channel
from .compat import AbstractBase
from .credentials import _Credentials
from .frame import Method
from .spec import Connection as SpecConnection
PRODUCT: str
LOGGER: Logger
@@ -157,8 +158,12 @@ class Connection(AbstractBase, metaclass=abc.ABCMeta):
internal_connection_workflow: bool = ...,
) -> None: ...
def add_on_close_callback(self: Self, callback: Callable[[Self, BaseException], object]) -> None: ...
def add_on_connection_blocked_callback(self: Self, callback: Callable[[Self, Method], object]) -> None: ...
def add_on_connection_unblocked_callback(self: Self, callback: Callable[[Self, Method], object]) -> None: ...
def add_on_connection_blocked_callback(
self: Self, callback: Callable[[Self, Method[SpecConnection.Blocked]], object]
) -> None: ...
def add_on_connection_unblocked_callback(
self: Self, callback: Callable[[Self, Method[SpecConnection.Unblocked]], object]
) -> None: ...
def add_on_open_callback(self: Self, callback: Callable[[Self], object]) -> None: ...
def add_on_open_error_callback(
self: Self, callback: Callable[[Self, BaseException], object], remove_default: bool = ...

View File

@@ -1,9 +1,12 @@
from abc import abstractmethod
from logging import Logger
from typing import Generic, TypeVar
from .amqp_object import AMQPObject, Method as AMQPMethod
from .spec import BasicProperties
_M = TypeVar("_M", bound=AMQPMethod)
LOGGER: Logger
class Frame(AMQPObject):
@@ -13,9 +16,9 @@ class Frame(AMQPObject):
@abstractmethod
def marshal(self) -> bytes: ...
class Method(Frame):
method: AMQPMethod
def __init__(self, channel_number: int, method: AMQPMethod) -> None: ...
class Method(Frame, Generic[_M]):
method: _M
def __init__(self, channel_number: int, method: _M) -> None: ...
def marshal(self) -> bytes: ...
class Header(Frame):

View File

@@ -1,6 +1,6 @@
from _typeshed import Incomplete
from _typeshed import Incomplete, Self
from typing import ClassVar
from typing_extensions import TypeAlias
from typing_extensions import Literal, TypeAlias
from .amqp_object import Class, Method, Properties
@@ -338,9 +338,9 @@ class Exchange(Class):
INDEX: ClassVar[int]
def __init__(self) -> None: ...
@property
def synchronous(self): ...
def decode(self, encoded, offset: int = ...): ...
def encode(self): ...
def synchronous(self) -> Literal[False]: ...
def decode(self: Self, encoded: bytes, offset: int = ...) -> Self: ...
def encode(self) -> list[bytes]: ...
class Delete(Method):
INDEX: ClassVar[int]
@@ -449,22 +449,20 @@ class Queue(Class):
arguments: Incomplete | None = ...,
) -> None: ...
@property
def synchronous(self): ...
def decode(self, encoded, offset: int = ...): ...
def encode(self): ...
def synchronous(self) -> Literal[True]: ...
def decode(self: Self, encoded: bytes, offset: int = ...) -> Self: ...
def encode(self) -> list[bytes]: ...
class DeclareOk(Method):
INDEX: ClassVar[int]
queue: Incomplete
message_count: Incomplete
consumer_count: Incomplete
def __init__(
self, queue: Incomplete | None = ..., message_count: Incomplete | None = ..., consumer_count: Incomplete | None = ...
) -> None: ...
queue: str
message_count: int
consumer_count: int
def __init__(self, queue: str, message_count: int, consumer_count: int) -> None: ...
@property
def synchronous(self): ...
def decode(self, encoded, offset: int = ...): ...
def encode(self): ...
def synchronous(self) -> Literal[False]: ...
def decode(self: Self, encoded: bytes, offset: int = ...) -> Self: ...
def encode(self) -> list[bytes]: ...
class Bind(Method):
INDEX: ClassVar[int]