Import gRPC stubs from the grpc-stubs project (#11204)

This commit is contained in:
Blake Williams
2025-04-25 18:39:43 +10:00
committed by GitHub
parent 9dd2ea67d3
commit 22b55fb7fa
33 changed files with 1519 additions and 0 deletions
+1
View File
@@ -44,6 +44,7 @@
"stubs/gdb",
"stubs/geopandas",
"stubs/google-cloud-ndb",
"stubs/grpcio/grpc/__init__.pyi",
"stubs/hdbcli/hdbcli/dbapi.pyi",
"stubs/html5lib",
"stubs/httplib2",
@@ -0,0 +1,11 @@
# Error: is not present at runtime
# =============================
# Error class attributes that aren't defined.
grpc.RpcError.code
grpc.RpcError.details
grpc.RpcError.trailing_metadata
# Error: is inconsistent
# =============================
# Stub class is incomplete.
grpc_reflection.v1alpha._base.BaseReflectionServicer.__init__
@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Any, cast
from typing_extensions import assert_type
import grpc.aio
# Interceptor casts
client_interceptors: list[grpc.aio.ClientInterceptor] = []
grpc.aio.insecure_channel("target", interceptors=client_interceptors)
server_interceptors: list[grpc.aio.ServerInterceptor[Any, Any]] = []
grpc.aio.server(interceptors=server_interceptors)
# Metadata
async def metadata() -> None:
metadata = await cast(grpc.aio.Call, None).initial_metadata()
assert_type(metadata["foo"], grpc.aio._MetadataValue)
for k in metadata:
assert_type(k, str)
for k, v in metadata.items():
assert_type(k, str)
assert_type(v, grpc.aio._MetadataValue)
@@ -0,0 +1,37 @@
from __future__ import annotations
from typing import Protocol, cast
from typing_extensions import assert_type
import grpc.aio
class DummyRequest:
pass
class DummyReply:
pass
class DummyServiceStub(Protocol):
UnaryUnary: grpc.aio.UnaryUnaryMultiCallable[DummyRequest, DummyReply]
UnaryStream: grpc.aio.UnaryStreamMultiCallable[DummyRequest, DummyReply]
StreamUnary: grpc.aio.StreamUnaryMultiCallable[DummyRequest, DummyReply]
StreamStream: grpc.aio.StreamStreamMultiCallable[DummyRequest, DummyReply]
stub = cast(DummyServiceStub, None)
req = DummyRequest()
async def async_context() -> None:
assert_type(await stub.UnaryUnary(req), DummyReply)
async for resp in stub.UnaryStream(req):
assert_type(resp, DummyReply)
assert_type(await stub.StreamUnary(iter([req])), DummyReply)
async for resp in stub.StreamStream(iter([req])):
assert_type(resp, DummyReply)
@@ -0,0 +1,46 @@
from __future__ import annotations
from typing import Optional, cast
from typing_extensions import assert_type
import grpc
# Channel options:
assert_type(grpc.insecure_channel("target", ()), grpc.Channel)
assert_type(grpc.insecure_channel("target", (("a", "b"),)), grpc.Channel)
assert_type(grpc.insecure_channel("target", (("a", "b"), ("c", "d"))), grpc.Channel)
# Local channel credentials:
creds = grpc.local_channel_credentials(grpc.LocalConnectionType.LOCAL_TCP)
assert_type(creds, grpc.ChannelCredentials)
# Other credential types:
assert_type(grpc.alts_channel_credentials(), grpc.ChannelCredentials)
assert_type(grpc.alts_server_credentials(), grpc.ServerCredentials)
assert_type(grpc.compute_engine_channel_credentials(grpc.CallCredentials("")), grpc.ChannelCredentials)
assert_type(grpc.insecure_server_credentials(), grpc.ServerCredentials)
# XDS credentials:
assert_type(
grpc.xds_channel_credentials(grpc.local_channel_credentials(grpc.LocalConnectionType.LOCAL_TCP)), grpc.ChannelCredentials
)
assert_type(grpc.xds_server_credentials(grpc.insecure_server_credentials()), grpc.ServerCredentials)
# Channel ready future
channel = grpc.insecure_channel("target", ())
assert_type(grpc.channel_ready_future(channel).result(), None)
# Channel options supports list:
assert_type(grpc.insecure_channel("target", []), grpc.Channel)
assert_type(grpc.insecure_channel("target", [("a", "b")]), grpc.Channel)
assert_type(grpc.insecure_channel("target", [("a", "b"), ("c", "d")]), grpc.Channel)
# Client call details optionals:
call_details = grpc.ClientCallDetails()
assert_type(call_details.method, str)
assert_type(call_details.timeout, Optional[float])
# Call iterator
call_iter = cast(grpc._CallIterator[str], None)
for call in call_iter:
assert_type(call, str)
@@ -0,0 +1,36 @@
from __future__ import annotations
from typing import cast
from typing_extensions import assert_type
import grpc
class Request:
pass
class Response:
pass
def unary_unary_call(rq: Request, ctx: grpc.ServicerContext) -> Response:
assert_type(rq, Request)
return Response()
class ServiceHandler(grpc.ServiceRpcHandler[Request, Response]):
def service_name(self) -> str:
return "hello"
def service(self, handler_call_details: grpc.HandlerCallDetails) -> grpc.RpcMethodHandler[Request, Response] | None:
rpc = grpc.RpcMethodHandler[Request, Response]()
rpc.unary_unary = unary_unary_call
return rpc
h = ServiceHandler()
ctx = cast(grpc.ServicerContext, None)
svc = h.service(grpc.HandlerCallDetails())
if svc is not None and svc.unary_unary is not None:
svc.unary_unary(Request(), ctx)
@@ -0,0 +1,35 @@
from __future__ import annotations
from typing import Protocol, cast
from typing_extensions import assert_type
import grpc
class DummyRequest:
pass
class DummyReply:
pass
class DummyServiceStub(Protocol):
UnaryUnary: grpc.UnaryUnaryMultiCallable[DummyRequest, DummyReply]
UnaryStream: grpc.UnaryStreamMultiCallable[DummyRequest, DummyReply]
StreamUnary: grpc.StreamUnaryMultiCallable[DummyRequest, DummyReply]
StreamStream: grpc.StreamStreamMultiCallable[DummyRequest, DummyReply]
stub = cast(DummyServiceStub, None)
req = DummyRequest()
assert_type(stub.UnaryUnary(req), DummyReply)
for resp in stub.UnaryStream(req):
assert_type(resp, DummyReply)
assert_type(stub.StreamUnary(iter([req])), DummyReply)
for resp in stub.StreamStream(iter([req])):
assert_type(resp, DummyReply)
@@ -0,0 +1,9 @@
from __future__ import annotations
from typing import cast
import grpc
from grpc_reflection.v1alpha.reflection import enable_server_reflection
server = cast(grpc.Server, None)
enable_server_reflection(["foo"], server, None)
@@ -0,0 +1,9 @@
from __future__ import annotations
from typing import cast
import grpc.aio
from grpc_reflection.v1alpha.reflection import enable_server_reflection
server = cast(grpc.aio.Server, None)
enable_server_reflection(["foo"], server, None)
@@ -0,0 +1,14 @@
from __future__ import annotations
from typing import Any
import grpc
@grpc.Call.register
class CallProxy:
def __init__(self, target: grpc.Call) -> None:
self._target = target
def __getattr__(self, name: str) -> Any:
return getattr(self._target, name)
@@ -0,0 +1,22 @@
from __future__ import annotations
from collections.abc import Callable
import grpc
class Request:
pass
class Response:
pass
class NoopInterceptor(grpc.ServerInterceptor[Request, Response]):
def intercept_service(
self,
continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler[Request, Response] | None],
handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler[Request, Response] | None:
return continuation(handler_call_details)
@@ -0,0 +1,8 @@
from __future__ import annotations
from grpc import Status
from grpc_status import to_status
# XXX: to_status actually expects a "google.rpc.status.Status",
# but the stubs for that aren't present yet.
status: Status = to_status(None)
+15
View File
@@ -0,0 +1,15 @@
version = "1.*"
upstream_repository = "https://github.com/grpc/grpc"
partial_stub = true
requires = [
"types-protobuf",
]
[tool.stubtest]
ignore_missing_stub = true
stubtest_requirements = [
"grpcio-channelz",
"grpcio-health-checking",
"grpcio-reflection",
"grpcio-status",
]
+640
View File
@@ -0,0 +1,640 @@
import abc
import enum
import threading
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
from concurrent import futures
from types import ModuleType, TracebackType
from typing import Any, Generic, NoReturn, Protocol, TypeVar, type_check_only
from typing_extensions import Self, TypeAlias
__version__: str
# This class encodes an uninhabited type, requiring use of explicit casts or ignores
# in order to satisfy type checkers. This allows grpc-stubs to add proper stubs
# later, allowing those overrides to be removed.
# The alternative is Any, but a future replacement of Any with a proper type
# would result in type errors where previously the type checker was happy, which
# we want to avoid. Forcing the user to use overrides provides forwards-compatibility.
@type_check_only
class _PartialStubMustCastOrIgnore: ...
# XXX: Early attempts to tame this used literals for all the keys (gRPC is
# a bit segfaulty and doesn't adequately validate the option keys), but that
# didn't quite work out. Maybe it's something we can come back to?
_OptionKeyValue: TypeAlias = tuple[str, Any]
_Options: TypeAlias = Sequence[_OptionKeyValue]
class Compression(enum.IntEnum):
NoCompression = 0
Deflate = 1
Gzip = 2
@enum.unique
class LocalConnectionType(enum.Enum):
UDS = 0
LOCAL_TCP = 1
# XXX: not documented, needs more investigation.
# Some evidence:
# - https://github.com/grpc/grpc/blob/0e1984effd7e977ef18f1ad7fde7d10a2a153e1d/src/python/grpcio_tests/tests/unit/_metadata_test.py#L71
# - https://github.com/grpc/grpc/blob/0e1984effd7e977ef18f1ad7fde7d10a2a153e1d/src/python/grpcio_tests/tests/unit/_metadata_test.py#L58
# - https://github.com/grpc/grpc/blob/0e1984effd7e977ef18f1ad7fde7d10a2a153e1d/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py#L66
_Metadata: TypeAlias = tuple[tuple[str, str | bytes], ...]
_TRequest = TypeVar("_TRequest")
_TResponse = TypeVar("_TResponse")
# XXX: These are probably the SerializeToTring/FromString pb2 methods, but
# this needs further investigation
@type_check_only
class _RequestSerializer(Protocol):
def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
@type_check_only
class _RequestDeserializer(Protocol):
def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
@type_check_only
class _ResponseSerializer(Protocol):
def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
@type_check_only
class _ResponseDeserializer(Protocol):
def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
# Future Interfaces:
class FutureTimeoutError(Exception): ...
class FutureCancelledError(Exception): ...
_TFutureValue = TypeVar("_TFutureValue")
class Future(abc.ABC, Generic[_TFutureValue]):
@abc.abstractmethod
def add_done_callback(self, fn: Callable[[Future[_TFutureValue]], None]) -> None: ...
@abc.abstractmethod
def cancel(self) -> bool: ...
@abc.abstractmethod
def cancelled(self) -> bool: ...
@abc.abstractmethod
def done(self) -> bool: ...
@abc.abstractmethod
def exception(self, timeout: float | None = ...) -> Exception | None: ...
@abc.abstractmethod
def result(self, timeout: float | None = ...) -> _TFutureValue: ...
@abc.abstractmethod
def running(self) -> bool: ...
# FIXME: unsure of the exact return type here. Is it a traceback.StackSummary?
@abc.abstractmethod
def traceback(self, timeout: float | None = ...) -> Any: ...
# Create Client:
def insecure_channel(target: str, options: _Options | None = ..., compression: Compression | None = ...) -> Channel: ...
def secure_channel(
target: str, credentials: ChannelCredentials, options: _Options | None = ..., compression: Compression | None = ...
) -> Channel: ...
_Interceptor: TypeAlias = (
UnaryUnaryClientInterceptor[_TRequest, _TResponse]
| UnaryStreamClientInterceptor[_TRequest, _TResponse]
| StreamUnaryClientInterceptor[_TRequest, _TResponse]
| StreamStreamClientInterceptor[_TRequest, _TResponse]
)
def intercept_channel(channel: Channel, *interceptors: _Interceptor[_TRequest, _TResponse]) -> Channel: ...
# Create Client Credentials:
def ssl_channel_credentials(
root_certificates: bytes | None = ..., private_key: bytes | None = ..., certificate_chain: bytes | None = ...
) -> ChannelCredentials: ...
def local_channel_credentials(local_connect_type: LocalConnectionType = ...) -> ChannelCredentials: ...
def metadata_call_credentials(metadata_plugin: AuthMetadataPlugin, name: str | None = ...) -> CallCredentials: ...
def access_token_call_credentials(access_token: str) -> CallCredentials: ...
def alts_channel_credentials(service_accounts: Sequence[str] | None = ...) -> ChannelCredentials: ...
def compute_engine_channel_credentials(call_credentials: CallCredentials) -> ChannelCredentials: ...
def xds_channel_credentials(fallback_credentials: ChannelCredentials | None = ...) -> ChannelCredentials: ...
# GRPC docs say there should be at least two:
def composite_call_credentials(creds1: CallCredentials, creds2: CallCredentials, *rest: CallCredentials) -> CallCredentials: ...
# Compose a ChannelCredentials and one or more CallCredentials objects.
def composite_channel_credentials(
channel_credentials: ChannelCredentials, call_credentials: CallCredentials, *rest: CallCredentials
) -> ChannelCredentials: ...
# Create Server:
def server(
thread_pool: futures.ThreadPoolExecutor,
handlers: list[GenericRpcHandler[Any, Any]] | None = ...,
interceptors: list[ServerInterceptor[Any, Any]] | None = ...,
options: _Options | None = ...,
maximum_concurrent_rpcs: int | None = ...,
compression: Compression | None = ...,
xds: bool = ...,
) -> Server: ...
# Create Server Credentials:
_CertificateChainPair: TypeAlias = tuple[bytes, bytes]
def ssl_server_credentials(
private_key_certificate_chain_pairs: list[_CertificateChainPair],
root_certificates: bytes | None = ...,
require_client_auth: bool = ...,
) -> ServerCredentials: ...
def local_server_credentials(local_connect_type: LocalConnectionType = ...) -> ServerCredentials: ...
def ssl_server_certificate_configuration(
private_key_certificate_chain_pairs: list[_CertificateChainPair], root_certificates: bytes | None = ...
) -> ServerCertificateConfiguration: ...
def dynamic_ssl_server_credentials(
initial_certificate_configuration: ServerCertificateConfiguration,
certificate_configuration_fetcher: Callable[[], ServerCertificateConfiguration],
require_client_authentication: bool = ...,
) -> ServerCredentials: ...
def alts_server_credentials() -> ServerCredentials: ...
def insecure_server_credentials() -> ServerCredentials: ...
def xds_server_credentials(fallback_credentials: ServerCredentials) -> ServerCredentials: ...
# RPC Method Handlers:
# XXX: This is probably what appears in the add_FooServicer_to_server function
# in the _pb2_grpc files that get generated, which points to the FooServicer
# handler functions that get generated, which look like this:
#
# def FloobDoob(self, request, context):
# return response
#
@type_check_only
class _Behaviour(Protocol):
def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
def unary_unary_rpc_method_handler(
behavior: _Behaviour,
request_deserializer: _RequestDeserializer | None = ...,
response_serializer: _ResponseSerializer | None = ...,
) -> RpcMethodHandler[Any, Any]: ...
def unary_stream_rpc_method_handler(
behavior: _Behaviour,
request_deserializer: _RequestDeserializer | None = ...,
response_serializer: _ResponseSerializer | None = ...,
) -> RpcMethodHandler[Any, Any]: ...
def stream_unary_rpc_method_handler(
behavior: _Behaviour,
request_deserializer: _RequestDeserializer | None = ...,
response_serializer: _ResponseSerializer | None = ...,
) -> RpcMethodHandler[Any, Any]: ...
def stream_stream_rpc_method_handler(
behavior: _Behaviour,
request_deserializer: _RequestDeserializer | None = ...,
response_serializer: _ResponseSerializer | None = ...,
) -> RpcMethodHandler[Any, Any]: ...
def method_handlers_generic_handler(
service: str, method_handlers: dict[str, RpcMethodHandler[Any, Any]]
) -> GenericRpcHandler[Any, Any]: ...
# Channel Ready Future:
def channel_ready_future(channel: Channel) -> Future[None]: ...
# Channel Connectivity:
class ChannelConnectivity(enum.Enum):
IDLE = (0, "idle")
CONNECTING = (1, "connecting")
READY = (2, "ready")
TRANSIENT_FAILURE = (3, "transient failure")
SHUTDOWN = (4, "shutdown")
# gRPC Status Code:
class Status(abc.ABC):
code: StatusCode
# XXX: misnamed property, does not align with status.proto, where it is called 'message':
details: str
trailing_metadata: _Metadata
# https://grpc.github.io/grpc/core/md_doc_statuscodes.html
class StatusCode(enum.Enum):
OK = (0, "ok")
CANCELLED = (1, "cancelled")
UNKNOWN = (2, "unknown")
INVALID_ARGUMENT = (3, "invalid argument")
DEADLINE_EXCEEDED = (4, "deadline exceeded")
NOT_FOUND = (5, "not found")
ALREADY_EXISTS = (6, "already exists")
PERMISSION_DENIED = (7, "permission denied")
RESOURCE_EXHAUSTED = (8, "resource exhausted")
FAILED_PRECONDITION = (9, "failed precondition")
ABORTED = (10, "aborted")
OUT_OF_RANGE = (11, "out of range")
UNIMPLEMENTED = (12, "unimplemented")
INTERNAL = (13, "internal")
UNAVAILABLE = (14, "unavailable")
DATA_LOSS = (15, "data loss")
UNAUTHENTICATED = (16, "unauthenticated")
# Channel Object:
class Channel(abc.ABC):
@abc.abstractmethod
def close(self) -> None: ...
@abc.abstractmethod
def stream_stream(
self,
method: str,
request_serializer: _RequestSerializer | None = ...,
response_deserializer: _ResponseDeserializer | None = ...,
) -> StreamStreamMultiCallable[Any, Any]: ...
@abc.abstractmethod
def stream_unary(
self,
method: str,
request_serializer: _RequestSerializer | None = ...,
response_deserializer: _ResponseDeserializer | None = ...,
) -> StreamUnaryMultiCallable[Any, Any]: ...
@abc.abstractmethod
def subscribe(self, callback: Callable[[ChannelConnectivity], None], try_to_connect: bool = ...) -> None: ...
@abc.abstractmethod
def unary_stream(
self,
method: str,
request_serializer: _RequestSerializer | None = ...,
response_deserializer: _ResponseDeserializer | None = ...,
) -> UnaryStreamMultiCallable[Any, Any]: ...
@abc.abstractmethod
def unary_unary(
self,
method: str,
request_serializer: _RequestSerializer | None = ...,
response_deserializer: _ResponseDeserializer | None = ...,
) -> UnaryUnaryMultiCallable[Any, Any]: ...
@abc.abstractmethod
def unsubscribe(self, callback: Callable[[ChannelConnectivity], None]) -> None: ...
def __enter__(self) -> Self: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> bool | None: ...
# Server Object:
class Server(abc.ABC):
@abc.abstractmethod
def add_generic_rpc_handlers(self, generic_rpc_handlers: Iterable[GenericRpcHandler[Any, Any]]) -> None: ...
# Returns an integer port on which server will accept RPC requests.
@abc.abstractmethod
def add_insecure_port(self, address: str) -> int: ...
# Returns an integer port on which server will accept RPC requests.
@abc.abstractmethod
def add_secure_port(self, address: str, server_credentials: ServerCredentials) -> int: ...
@abc.abstractmethod
def start(self) -> None: ...
# Grace period is in seconds.
@abc.abstractmethod
def stop(self, grace: float | None) -> threading.Event: ...
# Block current thread until the server stops. Returns a bool
# indicates if the operation times out. Timeout is in seconds.
def wait_for_termination(self, timeout: float | None = ...) -> bool: ...
# Authentication & Authorization Objects:
# This class has no supported interface
class ChannelCredentials:
def __init__(self, credentials) -> None: ...
# This class has no supported interface
class CallCredentials:
def __init__(self, credentials) -> None: ...
class AuthMetadataContext(abc.ABC):
service_url: str
method_name: str
class AuthMetadataPluginCallback(abc.ABC):
def __call__(self, metadata: _Metadata, error: Exception | None) -> None: ...
class AuthMetadataPlugin(abc.ABC):
def __call__(self, context: AuthMetadataContext, callback: AuthMetadataPluginCallback) -> None: ...
# This class has no supported interface
class ServerCredentials:
def __init__(self, credentials) -> None: ...
# This class has no supported interface
class ServerCertificateConfiguration:
def __init__(self, certificate_configuration) -> None: ...
# gRPC Exceptions:
@type_check_only
class _Metadatum:
key: str
value: bytes
# FIXME: There is scant documentation about what is actually available in this type.
# The properties here are the properties observed in the wild, and may be inaccurate.
# A better source to confirm their presence needs to be found at some point.
class RpcError(Exception):
def code(self) -> StatusCode: ...
# misnamed property, does not align with status.proto, where it is called 'message':
def details(self) -> str | None: ...
# XXX: This has a slightly different return type to all the other metadata:
def trailing_metadata(self) -> tuple[_Metadatum, ...]: ...
# Shared Context:
class RpcContext(abc.ABC):
@abc.abstractmethod
def add_callback(self, callback: Callable[[], None]) -> bool: ...
@abc.abstractmethod
def cancel(self) -> bool: ...
@abc.abstractmethod
def is_active(self) -> bool: ...
@abc.abstractmethod
def time_remaining(self) -> float: ...
# Client-Side Context:
class Call(RpcContext, metaclass=abc.ABCMeta):
@abc.abstractmethod
def code(self) -> StatusCode: ...
# misnamed property, does not align with status.proto, where it is called 'message':
@abc.abstractmethod
def details(self) -> str: ...
@abc.abstractmethod
def initial_metadata(self) -> _Metadata: ...
@abc.abstractmethod
def trailing_metadata(self) -> _Metadata: ...
# Client-Side Interceptor:
class ClientCallDetails(abc.ABC):
method: str
timeout: float | None
metadata: _Metadata | None
credentials: CallCredentials | None
# "This is an EXPERIMENTAL argument. An optional flag t enable wait for ready mechanism."
wait_for_ready: bool | None
compression: Compression | None
# An object that is both a Call for the RPC and a Future. In the event of
# RPC completion, the return Call-Future's result value will be the
# response message of the RPC. Should the event terminate with non-OK
# status, the returned Call-Future's exception value will be an RpcError.
#
@type_check_only
class _CallFuture(Call, Future[_TResponse], metaclass=abc.ABCMeta): ...
class UnaryUnaryClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def intercept_unary_unary(
self,
# FIXME: decode these cryptic runes to confirm the typing mystery of
# this callable's signature that was left for us by past civilisations:
#
# continuation - A function that proceeds with the invocation by
# executing the next interceptor in chain or invoking the actual RPC
# on the underlying Channel. It is the interceptor's responsibility
# to call it if it decides to move the RPC forward. The interceptor
# can use response_future = continuation(client_call_details,
# request) to continue with the RPC. continuation returns an object
# that is both a Call for the RPC and a Future. In the event of RPC
# completion, the return Call-Future's result value will be the
# response message of the RPC. Should the event terminate with non-OK
# status, the returned Call-Future's exception value will be an
# RpcError.
#
continuation: Callable[[ClientCallDetails, _TRequest], _CallFuture[_TResponse]],
client_call_details: ClientCallDetails,
request: _TRequest,
) -> _CallFuture[_TResponse]: ...
@type_check_only
class _CallIterator(Call, Generic[_TResponse], metaclass=abc.ABCMeta):
def __iter__(self) -> Iterator[_TResponse]: ...
class UnaryStreamClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def intercept_unary_stream(
self,
continuation: Callable[[ClientCallDetails, _TRequest], _CallIterator[_TResponse]],
client_call_details: ClientCallDetails,
request: _TRequest,
) -> _CallIterator[_TResponse]: ...
class StreamUnaryClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def intercept_stream_unary(
self,
continuation: Callable[[ClientCallDetails, _TRequest], _CallFuture[_TResponse]],
client_call_details: ClientCallDetails,
request_iterator: Iterator[_TRequest],
) -> _CallFuture[_TResponse]: ...
class StreamStreamClientInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def intercept_stream_stream(
self,
continuation: Callable[[ClientCallDetails, _TRequest], _CallIterator[_TResponse]],
client_call_details: ClientCallDetails,
request_iterator: Iterator[_TRequest],
) -> _CallIterator[_TResponse]: ...
# Service-Side Context:
class ServicerContext(RpcContext, metaclass=abc.ABCMeta):
# misnamed parameter 'details', does not align with status.proto, where it is called 'message':
@abc.abstractmethod
def abort(self, code: StatusCode, details: str) -> NoReturn: ...
@abc.abstractmethod
def abort_with_status(self, status: Status) -> NoReturn: ...
# FIXME: The docs say "A map of strings to an iterable of bytes for each auth property".
# Does that mean 'bytes' (which is iterable), or 'Iterable[bytes]'?
@abc.abstractmethod
def auth_context(self) -> Mapping[str, bytes]: ...
def disable_next_message_compression(self) -> None: ...
@abc.abstractmethod
def invocation_metadata(self) -> _Metadata: ...
@abc.abstractmethod
def peer(self) -> str: ...
@abc.abstractmethod
def peer_identities(self) -> Iterable[bytes] | None: ...
@abc.abstractmethod
def peer_identity_key(self) -> str | None: ...
@abc.abstractmethod
def send_initial_metadata(self, initial_metadata: _Metadata) -> None: ...
@abc.abstractmethod
def set_code(self, code: StatusCode) -> None: ...
def set_compression(self, compression: Compression) -> None: ...
@abc.abstractmethod
def set_trailing_metadata(self, trailing_metadata: _Metadata) -> None: ...
# misnamed function 'details', does not align with status.proto, where it is called 'message':
@abc.abstractmethod
def set_details(self, details: str) -> None: ...
def trailing_metadata(self) -> _Metadata: ...
# Service-Side Handler:
class RpcMethodHandler(abc.ABC, Generic[_TRequest, _TResponse]):
request_streaming: bool
response_streaming: bool
# XXX: not clear from docs whether this is optional or not
request_deserializer: _RequestDeserializer | None
# XXX: not clear from docs whether this is optional or not
response_serializer: _ResponseSerializer | None
unary_unary: Callable[[_TRequest, ServicerContext], _TResponse] | None
unary_stream: Callable[[_TRequest, ServicerContext], Iterator[_TResponse]] | None
stream_unary: Callable[[Iterator[_TRequest], ServicerContext], _TResponse] | None
stream_stream: Callable[[Iterator[_TRequest], ServicerContext], Iterator[_TResponse]] | None
class HandlerCallDetails(abc.ABC):
method: str
invocation_metadata: _Metadata
class GenericRpcHandler(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def service(self, handler_call_details: HandlerCallDetails) -> RpcMethodHandler[_TRequest, _TResponse] | None: ...
class ServiceRpcHandler(GenericRpcHandler[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
def service_name(self) -> str: ...
# Service-Side Interceptor:
class ServerInterceptor(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def intercept_service(
self,
continuation: Callable[[HandlerCallDetails], RpcMethodHandler[_TRequest, _TResponse] | None],
handler_call_details: HandlerCallDetails,
) -> RpcMethodHandler[_TRequest, _TResponse] | None: ...
# Multi-Callable Interfaces:
class UnaryUnaryMultiCallable(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def __call__(
self,
request: _TRequest,
timeout: float | None = ...,
metadata: _Metadata | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> _TResponse: ...
@abc.abstractmethod
def future(
self,
request: _TRequest,
timeout: float | None = ...,
metadata: _Metadata | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> _CallFuture[_TResponse]: ...
@abc.abstractmethod
def with_call(
self,
request: _TRequest,
timeout: float | None = ...,
metadata: _Metadata | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
# FIXME: Return value is documented as "The response value for the RPC and a Call value for the RPC";
# this is slightly unclear so this return type is a best-effort guess.
) -> tuple[_TResponse, Call]: ...
class UnaryStreamMultiCallable(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def __call__(
self,
request: _TRequest,
timeout: float | None = ...,
metadata: _Metadata | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> _CallIterator[_TResponse]: ...
class StreamUnaryMultiCallable(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def __call__(
self,
request_iterator: Iterator[_TRequest],
timeout: float | None = ...,
metadata: _Metadata | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> _TResponse: ...
@abc.abstractmethod
def future(
self,
request_iterator: Iterator[_TRequest],
timeout: float | None = ...,
metadata: _Metadata | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> _CallFuture[_TResponse]: ...
@abc.abstractmethod
def with_call(
self,
request_iterator: Iterator[_TRequest],
timeout: float | None = ...,
metadata: _Metadata | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
# FIXME: Return value is documented as "The response value for the RPC and a Call value for the RPC";
# this is slightly unclear so this return type is a best-effort guess.
) -> tuple[_TResponse, Call]: ...
class StreamStreamMultiCallable(abc.ABC, Generic[_TRequest, _TResponse]):
@abc.abstractmethod
def __call__(
self,
request_iterator: Iterator[_TRequest],
timeout: float | None = ...,
metadata: _Metadata | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> _CallIterator[_TResponse]: ...
# Runtime Protobuf Parsing:
def protos(protobuf_path: str) -> ModuleType: ...
def services(protobuf_path: str) -> ModuleType: ...
def protos_and_services(protobuf_path: str) -> tuple[ModuleType, ModuleType]: ...
+455
View File
@@ -0,0 +1,455 @@
import abc
import asyncio
from _typeshed import Incomplete
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Generator, Iterable, Iterator, Mapping, Sequence
from concurrent import futures
from types import TracebackType
from typing import Any, Generic, NoReturn, TypeVar, overload, type_check_only
from typing_extensions import Self, TypeAlias
from grpc import (
CallCredentials,
ChannelConnectivity,
ChannelCredentials,
Compression,
GenericRpcHandler,
HandlerCallDetails,
RpcError,
RpcMethodHandler,
ServerCredentials,
StatusCode,
_Options,
)
_TRequest = TypeVar("_TRequest")
_TResponse = TypeVar("_TResponse")
# Exceptions:
class BaseError(Exception): ...
class UsageError(BaseError): ...
class AbortError(BaseError): ...
class InternalError(BaseError): ...
class AioRpcError(RpcError):
def __init__(
self,
code: StatusCode,
initial_metadata: Metadata,
trailing_metadata: Metadata,
details: str | None = ...,
debug_error_string: str | None = ...,
) -> None: ...
# FIXME: confirm if these are present in the parent type. The remaining
# methods already exist.
def debug_error_string(self) -> str: ...
def initial_metadata(self) -> Metadata: ...
# Create Client:
class ClientInterceptor(metaclass=abc.ABCMeta): ...
def insecure_channel(
target: str,
options: _Options | None = ...,
compression: Compression | None = ...,
interceptors: Sequence[ClientInterceptor] | None = ...,
) -> Channel: ...
def secure_channel(
target: str,
credentials: ChannelCredentials,
options: _Options | None = ...,
compression: Compression | None = ...,
interceptors: Sequence[ClientInterceptor] | None = ...,
) -> Channel: ...
# Create Server:
def server(
migration_thread_pool: futures.Executor | None = ...,
handlers: Sequence[GenericRpcHandler[Any, Any]] | None = ...,
interceptors: Sequence[ServerInterceptor[Any, Any]] | None = ...,
options: _Options | None = ...,
maximum_concurrent_rpcs: int | None = ...,
compression: Compression | None = ...,
) -> Server: ...
# Channel Object:
# XXX: The docs suggest these type signatures for aio, but not for non-async,
# and it's unclear why;
# https://grpc.github.io/grpc/python/grpc_asyncio.html#grpc.aio.Channel.stream_stream
_RequestSerializer: TypeAlias = Callable[[Any], bytes]
_ResponseDeserializer: TypeAlias = Callable[[bytes], Any]
class Channel(abc.ABC):
@abc.abstractmethod
async def close(self, grace: float | None = ...) -> None: ...
@abc.abstractmethod
def get_state(self, try_to_connect: bool = ...) -> ChannelConnectivity: ...
@abc.abstractmethod
async def wait_for_state_change(self, last_observed_state: ChannelConnectivity) -> None: ...
@abc.abstractmethod
def stream_stream(
self,
method: str,
request_serializer: _RequestSerializer | None = ...,
response_deserializer: _ResponseDeserializer | None = ...,
) -> StreamStreamMultiCallable[Any, Any]: ...
@abc.abstractmethod
def stream_unary(
self,
method: str,
request_serializer: _RequestSerializer | None = ...,
response_deserializer: _ResponseDeserializer | None = ...,
) -> StreamUnaryMultiCallable[Any, Any]: ...
@abc.abstractmethod
def unary_stream(
self,
method: str,
request_serializer: _RequestSerializer | None = ...,
response_deserializer: _ResponseDeserializer | None = ...,
) -> UnaryStreamMultiCallable[Any, Any]: ...
@abc.abstractmethod
def unary_unary(
self,
method: str,
request_serializer: _RequestSerializer | None = ...,
response_deserializer: _ResponseDeserializer | None = ...,
) -> UnaryUnaryMultiCallable[Any, Any]: ...
@abc.abstractmethod
async def __aenter__(self) -> Self: ...
@abc.abstractmethod
async def __aexit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> bool | None: ...
@abc.abstractmethod
async def channel_ready(self) -> None: ...
# Server Object:
class Server(metaclass=abc.ABCMeta):
@abc.abstractmethod
def add_generic_rpc_handlers(self, generic_rpc_handlers: Iterable[GenericRpcHandler[Any, Any]]) -> None: ...
# Returns an integer port on which server will accept RPC requests.
@abc.abstractmethod
def add_insecure_port(self, address: str) -> int: ...
# Returns an integer port on which server will accept RPC requests.
@abc.abstractmethod
def add_secure_port(self, address: str, server_credentials: ServerCredentials) -> int: ...
@abc.abstractmethod
async def start(self) -> None: ...
# Grace period is in seconds.
@abc.abstractmethod
async def stop(self, grace: float | None) -> None: ...
# Returns a bool indicates if the operation times out. Timeout is in seconds.
@abc.abstractmethod
async def wait_for_termination(self, timeout: float | None = ...) -> bool: ...
# Client-Side Context:
_DoneCallbackType: TypeAlias = Callable[[Any], None]
_EOFType: TypeAlias = object
class RpcContext(metaclass=abc.ABCMeta):
@abc.abstractmethod
def cancelled(self) -> bool: ...
@abc.abstractmethod
def done(self) -> bool: ...
@abc.abstractmethod
def time_remaining(self) -> float | None: ...
@abc.abstractmethod
def cancel(self) -> bool: ...
@abc.abstractmethod
def add_done_callback(self, callback: _DoneCallbackType) -> None: ...
class Call(RpcContext, metaclass=abc.ABCMeta):
@abc.abstractmethod
async def initial_metadata(self) -> Metadata: ...
@abc.abstractmethod
async def trailing_metadata(self) -> Metadata: ...
@abc.abstractmethod
async def code(self) -> StatusCode: ...
@abc.abstractmethod
async def details(self) -> str: ...
@abc.abstractmethod
async def wait_for_connection(self) -> None: ...
class UnaryUnaryCall(Call, Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
def __await__(self) -> Generator[None, None, _TResponse]: ...
class UnaryStreamCall(Call, Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
def __aiter__(self) -> AsyncIterator[_TResponse]: ...
@abc.abstractmethod
async def read(self) -> _EOFType | _TResponse: ...
class StreamUnaryCall(Call, Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
async def write(self, request: _TRequest) -> None: ...
@abc.abstractmethod
async def done_writing(self) -> None: ...
@abc.abstractmethod
def __await__(self) -> Generator[None, None, _TResponse]: ...
class StreamStreamCall(Call, Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
def __aiter__(self) -> AsyncIterator[_TResponse]: ...
@abc.abstractmethod
async def read(self) -> _EOFType | _TResponse: ...
@abc.abstractmethod
async def write(self, request: _TRequest) -> None: ...
@abc.abstractmethod
async def done_writing(self) -> None: ...
# Service-Side Context:
@type_check_only
class _DoneCallback(Generic[_TRequest, _TResponse]):
def __call__(self, ctx: ServicerContext[_TRequest, _TResponse]) -> None: ...
class ServicerContext(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
async def abort(self, code: StatusCode, details: str = ..., trailing_metadata: _MetadataType = ...) -> NoReturn: ...
@abc.abstractmethod
async def read(self) -> _TRequest: ...
@abc.abstractmethod
async def write(self, message: _TResponse) -> None: ...
@abc.abstractmethod
async def send_initial_metadata(self, initial_metadata: _MetadataType) -> None: ...
def add_done_callback(self, callback: _DoneCallback[_TRequest, _TResponse]) -> None: ...
@abc.abstractmethod
def set_trailing_metadata(self, trailing_metadata: _MetadataType) -> None: ...
@abc.abstractmethod
def invocation_metadata(self) -> Metadata | None: ...
@abc.abstractmethod
def set_code(self, code: StatusCode) -> None: ...
@abc.abstractmethod
def set_details(self, details: str) -> None: ...
@abc.abstractmethod
def set_compression(self, compression: Compression) -> None: ...
@abc.abstractmethod
def disable_next_message_compression(self) -> None: ...
@abc.abstractmethod
def peer(self) -> str: ...
@abc.abstractmethod
def peer_identities(self) -> Iterable[bytes] | None: ...
@abc.abstractmethod
def peer_identity_key(self) -> str | None: ...
@abc.abstractmethod
def auth_context(self) -> Mapping[str, Iterable[bytes]]: ...
def time_remaining(self) -> float: ...
def trailing_metadata(self) -> Metadata: ...
def code(self) -> StatusCode: ...
def details(self) -> str: ...
def cancelled(self) -> bool: ...
def done(self) -> bool: ...
# Client-Side Interceptor:
class ClientCallDetails(abc.ABC):
def __init__(
self,
method: str,
timeout: float | None,
metadata: Metadata | None,
credentials: CallCredentials | None,
wait_for_ready: bool | None,
) -> None: ...
method: str
timeout: float | None
metadata: Metadata | None
credentials: CallCredentials | None
# "This is an EXPERIMENTAL argument. An optional flag t enable wait for ready mechanism."
wait_for_ready: bool | None
# As at 1.53.0, this is not supported in aio:
# compression: Compression | None
@type_check_only
class _InterceptedCall(Generic[_TRequest, _TResponse]):
def __init__(self, interceptors_task: asyncio.Task[Any]) -> None: ...
def __del__(self) -> None: ...
def cancel(self) -> bool: ...
def cancelled(self) -> bool: ...
def done(self) -> bool: ...
def add_done_callback(self, callback: _DoneCallback[_TRequest, _TResponse]) -> None: ...
def time_remaining(self) -> float | None: ...
async def initial_metadata(self) -> Metadata | None: ...
async def trailing_metadata(self) -> Metadata | None: ...
async def code(self) -> StatusCode: ...
async def details(self) -> str: ...
async def debug_error_string(self) -> str | None: ...
async def wait_for_connection(self) -> None: ...
class InterceptedUnaryUnaryCall(_InterceptedCall[_TRequest, _TResponse], metaclass=abc.ABCMeta):
def __await__(self) -> Generator[Incomplete, None, _TResponse]: ...
def __init__(
self,
interceptors: Sequence[UnaryUnaryClientInterceptor[_TRequest, _TResponse]],
request: _TRequest,
timeout: float | None,
metadata: Metadata,
credentials: CallCredentials | None,
wait_for_ready: bool | None,
channel: Channel,
method: bytes,
request_serializer: _RequestSerializer,
response_deserializer: _ResponseDeserializer,
loop: asyncio.AbstractEventLoop,
) -> None: ...
# pylint: disable=too-many-arguments
async def _invoke(
self,
interceptors: Sequence[UnaryUnaryClientInterceptor[_TRequest, _TResponse]],
method: bytes,
timeout: float | None,
metadata: Metadata | None,
credentials: CallCredentials | None,
wait_for_ready: bool | None,
request: _TRequest,
request_serializer: _RequestSerializer,
response_deserializer: _ResponseDeserializer,
) -> UnaryUnaryCall[_TRequest, _TResponse]: ...
def time_remaining(self) -> float | None: ...
class UnaryUnaryClientInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
async def intercept_unary_unary(
self,
# XXX: See equivalent function in grpc types for notes about continuation:
continuation: Callable[[ClientCallDetails, _TRequest], UnaryUnaryCall[_TRequest, _TResponse]],
client_call_details: ClientCallDetails,
request: _TRequest,
) -> _TResponse: ...
class UnaryStreamClientInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
async def intercept_unary_stream(
self,
continuation: Callable[[ClientCallDetails, _TRequest], UnaryStreamCall[_TRequest, _TResponse]],
client_call_details: ClientCallDetails,
request: _TRequest,
) -> AsyncIterable[_TResponse] | UnaryStreamCall[_TRequest, _TResponse]: ...
class StreamUnaryClientInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
async def intercept_stream_unary(
self,
continuation: Callable[[ClientCallDetails, _TRequest], StreamUnaryCall[_TRequest, _TResponse]],
client_call_details: ClientCallDetails,
request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
) -> AsyncIterable[_TResponse] | UnaryStreamCall[_TRequest, _TResponse]: ...
class StreamStreamClientInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
async def intercept_stream_stream(
self,
continuation: Callable[[ClientCallDetails, _TRequest], StreamStreamCall[_TRequest, _TResponse]],
client_call_details: ClientCallDetails,
request_iterator: AsyncIterable[_TRequest] | Iterable[_TRequest],
) -> AsyncIterable[_TResponse] | StreamStreamCall[_TRequest, _TResponse]: ...
# Server-Side Interceptor:
class ServerInterceptor(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
async def intercept_service(
self,
continuation: Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler[_TRequest, _TResponse]]],
handler_call_details: HandlerCallDetails,
) -> RpcMethodHandler[_TRequest, _TResponse]: ...
# Multi-Callable Interfaces:
class UnaryUnaryMultiCallable(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
def __call__(
self,
request: _TRequest,
*,
timeout: float | None = ...,
metadata: _MetadataType | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> UnaryUnaryCall[_TRequest, _TResponse]: ...
class UnaryStreamMultiCallable(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
def __call__(
self,
request: _TRequest,
*,
timeout: float | None = ...,
metadata: _MetadataType | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> UnaryStreamCall[_TRequest, _TResponse]: ...
class StreamUnaryMultiCallable(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
def __call__(
self,
request_iterator: AsyncIterator[_TRequest] | Iterator[_TRequest] | None = None,
timeout: float | None = ...,
metadata: _MetadataType | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> StreamUnaryCall[_TRequest, _TResponse]: ...
class StreamStreamMultiCallable(Generic[_TRequest, _TResponse], metaclass=abc.ABCMeta):
@abc.abstractmethod
def __call__(
self,
request_iterator: AsyncIterator[_TRequest] | Iterator[_TRequest] | None = None,
timeout: float | None = ...,
metadata: _MetadataType | None = ...,
credentials: CallCredentials | None = ...,
# FIXME: optional bool seems weird, but that's what the docs suggest
wait_for_ready: bool | None = ...,
compression: Compression | None = ...,
) -> StreamStreamCall[_TRequest, _TResponse]: ...
# Metadata:
_MetadataKey: TypeAlias = str
_MetadataValue: TypeAlias = str | bytes
_MetadatumType: TypeAlias = tuple[_MetadataKey, _MetadataValue]
_MetadataType: TypeAlias = Metadata | Sequence[_MetadatumType]
_T = TypeVar("_T")
class Metadata(Mapping[_MetadataKey, _MetadataValue]):
def __init__(self, *args: tuple[_MetadataKey, _MetadataValue]) -> None: ...
@classmethod
def from_tuple(cls, raw_metadata: tuple[_MetadataKey, _MetadataValue]) -> Metadata: ...
def add(self, key: _MetadataKey, value: _MetadataValue) -> None: ...
def __len__(self) -> int: ...
def __getitem__(self, key: _MetadataKey) -> _MetadataValue: ...
def __setitem__(self, key: _MetadataKey, value: _MetadataValue) -> None: ...
def __delitem__(self, key: _MetadataKey) -> None: ...
def delete_all(self, key: _MetadataKey) -> None: ...
def __iter__(self) -> Iterator[_MetadataKey]: ...
@overload
def get(self, key: _MetadataKey) -> _MetadataValue | None: ...
@overload
def get(self, key: _MetadataKey, default: _T) -> _MetadataValue | _T: ...
def get_all(self, key: _MetadataKey) -> list[_MetadataValue]: ...
def set_all(self, key: _MetadataKey, values: list[_MetadataValue]) -> None: ...
def __contains__(self, key: object) -> bool: ...
def __eq__(self, other: object) -> bool: ...
def __add__(self, other: Any) -> Metadata: ...
+3
View File
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
@@ -0,0 +1,25 @@
import grpc_channelz.v1.channelz_pb2 as _channelz_pb2
import grpc_channelz.v1.channelz_pb2_grpc as _channelz_pb2_grpc
from grpc import ServicerContext
class ChannelzServicer(_channelz_pb2_grpc.ChannelzServicer):
@staticmethod
def GetTopChannels(
request: _channelz_pb2.GetTopChannelsRequest, context: ServicerContext
) -> _channelz_pb2.GetTopChannelsResponse: ...
@staticmethod
def GetServers(request: _channelz_pb2.GetServersRequest, context: ServicerContext) -> _channelz_pb2.GetServersResponse: ...
@staticmethod
def GetServer(request: _channelz_pb2.GetServerRequest, context: ServicerContext) -> _channelz_pb2.GetServerResponse: ...
@staticmethod
def GetServerSockets(
request: _channelz_pb2.GetServerSocketsRequest, context: ServicerContext
) -> _channelz_pb2.GetServerSocketsResponse: ...
@staticmethod
def GetChannel(request: _channelz_pb2.GetChannelRequest, context: ServicerContext) -> _channelz_pb2.GetChannelResponse: ...
@staticmethod
def GetSubchannel(
request: _channelz_pb2.GetSubchannelRequest, context: ServicerContext
) -> _channelz_pb2.GetSubchannelResponse: ...
@staticmethod
def GetSocket(request: _channelz_pb2.GetSocketRequest, context: ServicerContext) -> _channelz_pb2.GetSocketResponse: ...
@@ -0,0 +1,3 @@
from grpc import Server
def add_channelz_servicer(server: Server) -> None: ...
@@ -0,0 +1,16 @@
from _typeshed import Incomplete
GetTopChannelsRequest = Incomplete
GetTopChannelsResponse = Incomplete
GetServersRequest = Incomplete
GetServersResponse = Incomplete
GetServerRequest = Incomplete
GetServerResponse = Incomplete
GetServerSocketsRequest = Incomplete
GetServerSocketsResponse = Incomplete
GetChannelRequest = Incomplete
GetChannelResponse = Incomplete
GetSubchannelRequest = Incomplete
GetSubchannelResponse = Incomplete
GetSocketRequest = Incomplete
GetSocketResponse = Incomplete
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
ChannelzServicer = Incomplete
+3
View File
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
+3
View File
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
+34
View File
@@ -0,0 +1,34 @@
from concurrent import futures
from typing import Any, Protocol
from grpc import ServicerContext
from grpc_health.v1 import health_pb2 as _health_pb2, health_pb2_grpc as _health_pb2_grpc
SERVICE_NAME: str
OVERALL_HEALTH: str
class _Watcher:
def __init__(self) -> None: ...
def __iter__(self) -> _Watcher: ...
def next(self) -> _health_pb2.HealthCheckResponse: ...
def __next__(self) -> _health_pb2.HealthCheckResponse: ...
def add(self, response: _health_pb2.HealthCheckResponse) -> None: ...
def close(self) -> None: ...
# FIXME: This needs further investigation
class _SendResponseCallback(Protocol):
def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
class HealthServicer(_health_pb2_grpc.HealthServicer):
def __init__(
self, experimental_non_blocking: bool = ..., experimental_thread_pool: futures.ThreadPoolExecutor | None = ...
) -> None: ...
def Check(self, request: _health_pb2.HealthCheckRequest, context: ServicerContext) -> _health_pb2.HealthCheckResponse: ...
def Watch(
self,
request: _health_pb2.HealthCheckRequest,
context: ServicerContext,
send_response_callback: _SendResponseCallback | None = ...,
) -> _health_pb2.HealthCheckResponse: ...
def set(self, service: str, status: _health_pb2.HealthCheckResponse.ServingStatus) -> None: ...
def enter_graceful_shutdown(self) -> None: ...
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
@@ -0,0 +1,6 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
# FIXME: Incomplete
class HealthServicer: ...
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
@@ -0,0 +1,6 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
# FIXME: Incomplete
class BaseReflectionServicer: ...
@@ -0,0 +1,23 @@
import typing_extensions
from _typeshed import Incomplete
from collections.abc import Iterable
import grpc
from google.protobuf import descriptor_pool
from grpc import aio
from grpc_reflection.v1alpha import reflection_pb2 as _reflection_pb2
from grpc_reflection.v1alpha._base import BaseReflectionServicer
SERVICE_NAME: str
_AnyServer: typing_extensions.TypeAlias = grpc.Server | aio.Server
_AnyServicerContext: typing_extensions.TypeAlias = grpc.ServicerContext | aio.ServicerContext[Incomplete, Incomplete]
class ReflectionServicer(BaseReflectionServicer):
def ServerReflectionInfo(
self, request_iterator: Iterable[_reflection_pb2.ServerReflectionRequest], context: _AnyServicerContext
) -> None: ...
def enable_server_reflection(
service_names: Iterable[str], server: _AnyServer, pool: descriptor_pool.DescriptorPool | None = ...
) -> None: ...
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
+3
View File
@@ -0,0 +1,3 @@
from _typeshed import Incomplete
def __getattr__(name: str) -> Incomplete: ...
+13
View File
@@ -0,0 +1,13 @@
from typing import Any
import grpc
# XXX: don't yet know how to add a stub for google.rpc.status_pb2.Status
# without affecting other stuff; may need to make a stub-only package for
# google.rpc as well.
# Returns a google.rpc.status.Status message corresponding to a given grpc.Call.
def from_call(call: grpc.Call) -> Any: ...
# Convert a google.rpc.status.Status message to grpc.Status.
def to_status(status: Any) -> grpc.Status: ...