grpcio: improve client interceptors (#14831)

This commit is contained in:
Robin McCorkell
2025-10-08 11:14:46 +01:00
committed by GitHub
parent af0ee4015b
commit 0e10493a41
2 changed files with 59 additions and 45 deletions
+23 -27
View File
@@ -76,13 +76,10 @@ def secure_channel(
) -> Channel: ...
_Interceptor: TypeAlias = (
UnaryUnaryClientInterceptor[_TRequest, _TResponse]
| UnaryStreamClientInterceptor[_TRequest, _TResponse]
| StreamUnaryClientInterceptor[_TRequest, _TResponse]
| StreamStreamClientInterceptor[_TRequest, _TResponse]
UnaryUnaryClientInterceptor | UnaryStreamClientInterceptor | StreamUnaryClientInterceptor | StreamStreamClientInterceptor
)
def intercept_channel(channel: Channel, *interceptors: _Interceptor[_TRequest, _TResponse]) -> Channel: ...
def intercept_channel(channel: Channel, *interceptors: _Interceptor) -> Channel: ...
# Create Client Credentials:
@@ -378,25 +375,13 @@ class ClientCallDetails(abc.ABC):
@type_check_only
class _CallFuture(Call, Future[_TResponse], metaclass=abc.ABCMeta): ...
class UnaryUnaryClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
class UnaryUnaryClientInterceptor(abc.ABC):
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return continuation(client_call_details, request)`.
@abc.abstractmethod
def intercept_unary_unary(
self,
# FIXME: decode these cryptic runes to confirm the typing mystery of
# this callable's signature that was left for us by past civilisations:
#
# continuation - A function that proceeds with the invocation by
# executing the next interceptor in chain or invoking the actual RPC
# on the underlying Channel. It is the interceptor's responsibility
# to call it if it decides to move the RPC forward. The interceptor
# can use response_future = continuation(client_call_details,
# request) to continue with the RPC. continuation returns an object
# that is both a Call for the RPC and a Future. In the event of RPC
# completion, the return Call-Future's result value will be the
# response message of the RPC. Should the event terminate with non-OK
# status, the returned Call-Future's exception value will be an
# RpcError.
#
continuation: Callable[[ClientCallDetails, _TRequest], _CallFuture[_TResponse]],
client_call_details: ClientCallDetails,
request: _TRequest,
@@ -407,7 +392,10 @@ class _CallIterator(Call, Generic[_TResponse], metaclass=abc.ABCMeta):
def __iter__(self) -> Iterator[_TResponse]: ...
def __next__(self) -> _TResponse: ...
class UnaryStreamClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
class UnaryStreamClientInterceptor(abc.ABC):
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return continuation(client_call_details, request)`.
@abc.abstractmethod
def intercept_unary_stream(
self,
@@ -416,20 +404,26 @@ class UnaryStreamClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
request: _TRequest,
) -> _CallIterator[_TResponse]: ...
class StreamUnaryClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
class StreamUnaryClientInterceptor(abc.ABC):
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return continuation(client_call_details, request_iterator)`.
@abc.abstractmethod
def intercept_stream_unary(
self,
continuation: Callable[[ClientCallDetails, _TRequest], _CallFuture[_TResponse]],
continuation: Callable[[ClientCallDetails, Iterator[_TRequest]], _CallFuture[_TResponse]],
client_call_details: ClientCallDetails,
request_iterator: Iterator[_TRequest],
) -> _CallFuture[_TResponse]: ...
class StreamStreamClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
class StreamStreamClientInterceptor(abc.ABC):
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return continuation(client_call_details, request_iterator)`.
@abc.abstractmethod
def intercept_stream_stream(
self,
continuation: Callable[[ClientCallDetails, _TRequest], _CallIterator[_TResponse]],
continuation: Callable[[ClientCallDetails, Iterator[_TRequest]], _CallIterator[_TResponse]],
client_call_details: ClientCallDetails,
request_iterator: Iterator[_TRequest],
) -> _CallIterator[_TResponse]: ...
@@ -505,7 +499,9 @@ class ServiceRpcHandler(GenericRpcHandler, metaclass=abc.ABCMeta):
# Service-Side Interceptor:
class ServerInterceptor(abc.ABC):
# This method (not the class) is generic over _TRequest and _TResponse.
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return continuation(handler_call_details)`.
@abc.abstractmethod
def intercept_service(
self,
+36 -18
View File
@@ -45,8 +45,6 @@ class AioRpcError(RpcError):
# Create Client:
class ClientInterceptor(metaclass=abc.ABCMeta): ...
def insecure_channel(
target: str,
options: _Options | None = None,
@@ -288,7 +286,7 @@ class InterceptedUnaryUnaryCall(_InterceptedCall[_TRequest, _TResponse], metacla
def __await__(self) -> Generator[Incomplete, None, _TResponse]: ...
def __init__(
self,
interceptors: Sequence[UnaryUnaryClientInterceptor[_TRequest, _TResponse]],
interceptors: Sequence[UnaryUnaryClientInterceptor],
request: _TRequest,
timeout: float | None,
metadata: Metadata,
@@ -304,7 +302,7 @@ class InterceptedUnaryUnaryCall(_InterceptedCall[_TRequest, _TResponse], metacla
# pylint: disable=too-many-arguments
async def _invoke(
self,
interceptors: Sequence[UnaryUnaryClientInterceptor[_TRequest, _TResponse]],
interceptors: Sequence[UnaryUnaryClientInterceptor],
method: bytes,
timeout: float | None,
metadata: Metadata | None,
@@ -316,47 +314,67 @@ class InterceptedUnaryUnaryCall(_InterceptedCall[_TRequest, _TResponse], metacla
) -> UnaryUnaryCall[_TRequest, _TResponse]: ...
def time_remaining(self) -> float | None: ...
class UnaryUnaryClientInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
class ClientInterceptor(metaclass=abc.ABCMeta): ...
class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=abc.ABCMeta):
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return await continuation(client_call_details, request)`.
@abc.abstractmethod
async def intercept_unary_unary(
self,
# XXX: See equivalent function in grpc types for notes about continuation:
continuation: Callable[[ClientCallDetails, _TRequest], UnaryUnaryCall[_TRequest, _TResponse]],
continuation: Callable[[ClientCallDetails, _TRequest], Awaitable[UnaryUnaryCall[_TRequest, _TResponse]]],
client_call_details: ClientCallDetails,
request: _TRequest,
) -> _TResponse: ...
) -> _TResponse | UnaryUnaryCall[_TRequest, _TResponse]: ...
class UnaryStreamClientInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=abc.ABCMeta):
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return await continuation(client_call_details, request)`.
@abc.abstractmethod
async def intercept_unary_stream(
self,
continuation: Callable[[ClientCallDetails, _TRequest], UnaryStreamCall[_TRequest, _TResponse]],
continuation: Callable[[ClientCallDetails, _TRequest], Awaitable[UnaryStreamCall[_TRequest, _TResponse]]],
client_call_details: ClientCallDetails,
request: _TRequest,
) -> AsyncIterable[_TResponse] | UnaryStreamCall[_TRequest, _TResponse]: ...
) -> AsyncIterator[_TResponse] | UnaryStreamCall[_TRequest, _TResponse]: ...
class StreamUnaryClientInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=abc.ABCMeta):
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return await continuation(client_call_details, request_iterator)`.
@abc.abstractmethod
async def intercept_stream_unary(
self,
continuation: Callable[[ClientCallDetails, _TRequest], StreamUnaryCall[_TRequest, _TResponse]],
continuation: Callable[
[ClientCallDetails, AsyncIterable[_TRequest] | Iterable[_TRequest]], Awaitable[StreamUnaryCall[_TRequest, _TResponse]]
],
client_call_details: ClientCallDetails,
request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
) -> AsyncIterable[_TResponse] | UnaryStreamCall[_TRequest, _TResponse]: ...
) -> _TResponse | StreamUnaryCall[_TRequest, _TResponse]: ...
class StreamStreamClientInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
class StreamStreamClientInterceptor(ClientInterceptor, metaclass=abc.ABCMeta):
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return await continuation(client_call_details, request_iterator)`.
@abc.abstractmethod
async def intercept_stream_stream(
self,
continuation: Callable[[ClientCallDetails, _TRequest], StreamStreamCall[_TRequest, _TResponse]],
continuation: Callable[
[ClientCallDetails, AsyncIterable[_TRequest] | Iterable[_TRequest]],
Awaitable[StreamStreamCall[_TRequest, _TResponse]],
],
client_call_details: ClientCallDetails,
request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
) -> AsyncIterable[_TResponse] | StreamStreamCall[_TRequest, _TResponse]: ...
) -> AsyncIterator[_TResponse] | StreamStreamCall[_TRequest, _TResponse]: ...
# Server-Side Interceptor:
class ServerInterceptor(metaclass=abc.ABCMeta):
# This method (not the class) is generic over _TRequest and _TResponse.
# This method (not the class) is generic over _TRequest and _TResponse
# and the types must satisfy the no-op implementation of
# `return await continuation(handler_call_details)`.
@abc.abstractmethod
async def intercept_service(
self,