Improve pika (#13739)

This commit is contained in:
Semyon Moroz
2025-03-31 15:30:55 +04:00
committed by GitHub
parent 38a5859d72
commit d26a889962
17 changed files with 438 additions and 387 deletions
+4
View File
@@ -1,3 +1,5 @@
from typing import Final
from pika import adapters as adapters
from pika.adapters import (
BaseConnection as BaseConnection,
@@ -9,3 +11,5 @@ from pika.connection import ConnectionParameters as ConnectionParameters, SSLOpt
from pika.credentials import PlainCredentials as PlainCredentials
from pika.delivery_mode import DeliveryMode as DeliveryMode
from pika.spec import BasicProperties as BasicProperties
__version__: Final[str]
@@ -1,4 +1,3 @@
from _typeshed import Incomplete
from asyncio import AbstractEventLoop
from collections.abc import Callable
from logging import Logger
@@ -6,7 +5,7 @@ from typing_extensions import Self
from ..connection import Parameters
from .base_connection import BaseConnection
from .utils import io_services_utils, nbio_interface
from .utils import connection_workflow, io_services_utils, nbio_interface
LOGGER: Logger
@@ -22,7 +21,11 @@ class AsyncioConnection(BaseConnection):
) -> None: ...
@classmethod
def create_connection(
cls, connection_configs, on_done, custom_ioloop: AbstractEventLoop | None = None, workflow: Incomplete | None = None
cls,
connection_configs,
on_done,
custom_ioloop: AbstractEventLoop | None = None,
workflow: connection_workflow.AbstractAMQPConnectionWorkflow | None = None,
): ...
class _AsyncioIOServicesAdapter(
@@ -31,7 +34,7 @@ class _AsyncioIOServicesAdapter(
nbio_interface.AbstractIOServices,
nbio_interface.AbstractFileDescriptorServices,
):
def __init__(self, loop: Incomplete | None = None) -> None: ...
def __init__(self, loop: AbstractEventLoop | None = None) -> None: ...
def get_native_ioloop(self): ...
def close(self) -> None: ...
def run(self) -> None: ...
+2 -1
View File
@@ -1,12 +1,13 @@
import abc
from _typeshed import Incomplete
from collections.abc import Callable
from logging import Logger
from typing_extensions import Self
from ..adapters.utils import nbio_interface
from ..connection import Connection
LOGGER: Incomplete
LOGGER: Logger
class BaseConnection(Connection, metaclass=abc.ABCMeta):
def __init__(
@@ -1,5 +1,6 @@
from _typeshed import Incomplete, Unused
from collections.abc import Generator, Sequence
from logging import Logger
from types import TracebackType
from typing import NamedTuple
from typing_extensions import Self
@@ -9,7 +10,7 @@ from ..data import _ArgumentMapping
from ..exchange_type import ExchangeType
from ..spec import BasicProperties
LOGGER: Incomplete
LOGGER: Logger
class _CallbackResult:
def __init__(self, value_class: Incomplete | None = None) -> None: ...
@@ -1,10 +1,11 @@
from _typeshed import Incomplete
from logging import Logger
from pika.adapters.base_connection import BaseConnection
from pika.adapters.utils.nbio_interface import AbstractIOReference
from pika.adapters.utils.selector_ioloop_adapter import AbstractSelectorIOLoop, SelectorIOServicesAdapter
LOGGER: Incomplete
LOGGER: Logger
class GeventConnection(BaseConnection):
def __init__(
@@ -1,11 +1,12 @@
import abc
from _typeshed import Incomplete
from logging import Logger
import pika.compat
from pika.adapters.base_connection import BaseConnection
from pika.adapters.utils.selector_ioloop_adapter import AbstractSelectorIOLoop
LOGGER: Incomplete
LOGGER: Logger
SELECT_TYPE: Incomplete
class SelectConnection(BaseConnection):
@@ -1,8 +1,9 @@
from _typeshed import Incomplete
from logging import Logger
from pika.adapters import base_connection
LOGGER: Incomplete
LOGGER: Logger
class TornadoConnection(base_connection.BaseConnection):
def __init__(
@@ -2,6 +2,7 @@
# We don't want to force it as a dependency but that means we also can't test it with type-checkers given the current setup.
from _typeshed import Incomplete
from logging import Logger
from typing import Generic, NamedTuple, TypeVar
import pika.connection
@@ -17,7 +18,7 @@ from twisted.python.failure import Failure # type: ignore[import-not-found] #
_T = TypeVar("_T")
LOGGER: Incomplete
LOGGER: Logger
class ClosableDeferredQueue(DeferredQueue[_T], Generic[_T]): # pyright: ignore[reportUntypedBaseClass] # noqa: Y060
closed: Failure | BaseException | None
@@ -1,9 +1,10 @@
import abc
from _typeshed import Incomplete
from logging import Logger
from pika.adapters.utils import io_services_utils, nbio_interface
LOGGER: Incomplete
LOGGER: Logger
class AbstractSelectorIOLoop(metaclass=abc.ABCMeta):
@property
+23 -12
View File
@@ -1,8 +1,13 @@
from _typeshed import Incomplete
from collections.abc import Callable
from logging import Logger
from typing import Literal
LOGGER: Incomplete
from pika import amqp_object, frame
def name_or_value(value): ...
LOGGER: Logger
def name_or_value(value: amqp_object.AMQPObject | frame.Frame | int | str) -> str: ...
def sanitize_prefix(function): ...
def check_for_prefix_and_key(function): ...
@@ -16,16 +21,22 @@ class CallbackManager:
def __init__(self) -> None: ...
def add(
self,
prefix,
key,
callback,
prefix: str | int,
key: str | object,
callback: Callable[[Incomplete], Incomplete],
one_shot: bool = True,
only_caller: Incomplete | None = None,
only_caller: object | None = None,
arguments: Incomplete | None = None,
): ...
) -> tuple[str | int, str | object]: ...
def clear(self) -> None: ...
def cleanup(self, prefix): ...
def pending(self, prefix, key): ...
def process(self, prefix, key, caller, *args, **keywords): ...
def remove(self, prefix, key, callback_value: Incomplete | None = None, arguments: Incomplete | None = None): ...
def remove_all(self, prefix, key) -> None: ...
def cleanup(self, prefix: str | int) -> bool: ...
def pending(self, prefix: str | int, key: str | object) -> int | None: ...
def process(self, prefix: str | int, key: str | object, caller, *args, **keywords) -> bool: ...
def remove(
self,
prefix: str | int,
key: str | object,
callback_value: Callable[[Incomplete], Incomplete] | None = None,
arguments: Incomplete | None = None,
) -> Literal[True]: ...
def remove_all(self, prefix: str | int, key: str | object) -> None: ...
+11 -10
View File
@@ -1,4 +1,5 @@
import abc
import ssl
from _typeshed import Incomplete
from collections.abc import Callable
from logging import Logger
@@ -18,13 +19,13 @@ LOGGER: Logger
class Parameters:
DEFAULT_USERNAME: str
DEFAULT_PASSWORD: str
DEFAULT_BLOCKED_CONNECTION_TIMEOUT: Incomplete
DEFAULT_CHANNEL_MAX: Incomplete
DEFAULT_CLIENT_PROPERTIES: Incomplete
DEFAULT_BLOCKED_CONNECTION_TIMEOUT: None
DEFAULT_CHANNEL_MAX: int
DEFAULT_CLIENT_PROPERTIES: None
DEFAULT_CREDENTIALS: Incomplete
DEFAULT_CONNECTION_ATTEMPTS: int
DEFAULT_FRAME_MAX: Incomplete
DEFAULT_HEARTBEAT_TIMEOUT: Incomplete
DEFAULT_FRAME_MAX: int
DEFAULT_HEARTBEAT_TIMEOUT: None
DEFAULT_HOST: str
DEFAULT_LOCALE: str
DEFAULT_PORT: int
@@ -32,10 +33,10 @@ class Parameters:
DEFAULT_SOCKET_TIMEOUT: float
DEFAULT_STACK_TIMEOUT: float
DEFAULT_SSL: bool
DEFAULT_SSL_OPTIONS: Incomplete
DEFAULT_SSL_OPTIONS: None
DEFAULT_SSL_PORT: int
DEFAULT_VIRTUAL_HOST: str
DEFAULT_TCP_OPTIONS: Incomplete
DEFAULT_TCP_OPTIONS: None
def __init__(self) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __ne__(self, other: object) -> bool: ...
@@ -129,9 +130,9 @@ class URLParameters(Parameters):
def __init__(self, url: str) -> None: ...
class SSLOptions:
context: Incomplete
server_hostname: Incomplete
def __init__(self, context, server_hostname: Incomplete | None = None) -> None: ...
context: ssl.SSLContext
server_hostname: str | None
def __init__(self, context: ssl.SSLContext, server_hostname: str | None = None) -> None: ...
class Connection(AbstractBase, metaclass=abc.ABCMeta):
ON_CONNECTION_CLOSED: Final[str]
+7 -1
View File
@@ -1 +1,7 @@
def create_log_exception_decorator(logger): ...
from collections.abc import Callable
from logging import Logger
from typing import Any, TypeVar
_F = TypeVar("_F", bound=Callable[..., Any])
def create_log_exception_decorator(logger: Logger) -> Callable[[_F], _F]: ...
+13 -11
View File
@@ -1,4 +1,6 @@
from _typeshed import Incomplete
from collections.abc import Sequence
from pika.adapters.blocking_connection import ReturnedMessage
class AMQPError(Exception): ...
class AMQPConnectionError(AMQPError): ...
@@ -12,11 +14,11 @@ class NoFreeChannels(AMQPConnectionError): ...
class ConnectionWrongStateError(AMQPConnectionError): ...
class ConnectionClosed(AMQPConnectionError):
def __init__(self, reply_code, reply_text) -> None: ...
def __init__(self, reply_code: int, reply_text: str) -> None: ...
@property
def reply_code(self): ...
def reply_code(self) -> int: ...
@property
def reply_text(self): ...
def reply_text(self) -> str: ...
class ConnectionClosedByBroker(ConnectionClosed): ...
class ConnectionClosedByClient(ConnectionClosed): ...
@@ -26,11 +28,11 @@ class AMQPChannelError(AMQPError): ...
class ChannelWrongStateError(AMQPChannelError): ...
class ChannelClosed(AMQPChannelError):
def __init__(self, reply_code, reply_text) -> None: ...
def __init__(self, reply_code: int, reply_text: str) -> None: ...
@property
def reply_code(self): ...
def reply_code(self) -> int: ...
@property
def reply_text(self): ...
def reply_text(self) -> str: ...
class ChannelClosedByBroker(ChannelClosed): ...
class ChannelClosedByClient(ChannelClosed): ...
@@ -38,12 +40,12 @@ class DuplicateConsumerTag(AMQPChannelError): ...
class ConsumerCancelled(AMQPChannelError): ...
class UnroutableError(AMQPChannelError):
messages: Incomplete
def __init__(self, messages) -> None: ...
messages: Sequence[ReturnedMessage]
def __init__(self, messages: Sequence[ReturnedMessage]) -> None: ...
class NackError(AMQPChannelError):
messages: Incomplete
def __init__(self, messages) -> None: ...
messages: Sequence[ReturnedMessage]
def __init__(self, messages: Sequence[ReturnedMessage]) -> None: ...
class InvalidChannelNumber(AMQPError): ...
class ProtocolSyntaxError(AMQPError): ...
+2 -2
View File
@@ -1,6 +1,6 @@
from _typeshed import Incomplete
from logging import Logger
LOGGER: Incomplete
LOGGER: Logger
class HeartbeatChecker:
def __init__(self, connection, timeout) -> None: ...
+338 -332
View File
File diff suppressed because it is too large Load Diff
+7 -4
View File
@@ -1,6 +1,9 @@
from _typeshed import Incomplete
from _socket import SocketType
from logging import Logger
LOGGER: Incomplete
LOGGER: Logger
def socket_requires_keepalive(tcp_options): ...
def set_sock_opts(tcp_options, sock) -> None: ...
_SUPPORTED_TCP_OPTIONS: dict[str, int]
def socket_requires_keepalive(tcp_options: dict[str, int]) -> bool: ...
def set_sock_opts(tcp_options: dict[str, int] | None, sock: SocketType) -> None: ...
+12 -4
View File
@@ -1,4 +1,12 @@
def require_string(value, value_name) -> None: ...
def require_callback(callback, callback_name: str = "callback") -> None: ...
def rpc_completion_callback(callback): ...
def zero_or_greater(name, value) -> None: ...
from collections.abc import Callable
from typing import Literal, overload
def require_string(value: object, value_name: str) -> None: ... # raise TypeError if value is not string
def require_callback(
callback: object, callback_name: str = "callback"
) -> None: ... # raise TypeError if callback is not callable
@overload
def rpc_completion_callback(callback: None) -> Literal[True]: ...
@overload
def rpc_completion_callback(callback: Callable[..., object]) -> Literal[False]: ...
def zero_or_greater(name: str, value: str | float) -> None: ...