[aws-xray-sdk] Improve stubs (#14277)

This commit is contained in:
Semyon Moroz
2025-07-07 14:01:33 +00:00
committed by GitHub
parent 1c022395b0
commit 0a26a8320b
36 changed files with 326 additions and 269 deletions
@@ -1,6 +1,5 @@
aws_xray_sdk.core.models.subsegment.subsegment_decorator
aws_xray_sdk.core.sampling.connector.ServiceConnector.fetch_sampling_rules
aws_xray_sdk.core.sampling.sampler.ServiceConnector.fetch_sampling_rules
# We can not import 3rd-party libraries in teststubs runtime,
# but we can use Protocol to replace this types:
@@ -1,18 +1,17 @@
from logging import Logger
from typing import Final
from aws_xray_sdk.core.daemon_config import DaemonConfig as DaemonConfig
from ..exceptions.exceptions import InvalidDaemonAddressException as InvalidDaemonAddressException
from aws_xray_sdk.core.models.entity import Entity
log: Logger
PROTOCOL_HEADER: str
PROTOCOL_DELIMITER: str
DEFAULT_DAEMON_ADDRESS: str
PROTOCOL_HEADER: Final[str]
PROTOCOL_DELIMITER: Final[str]
DEFAULT_DAEMON_ADDRESS: Final[str]
class UDPEmitter:
def __init__(self, daemon_address="127.0.0.1:2000") -> None: ...
def send_entity(self, entity) -> None: ...
def set_daemon_address(self, address) -> None: ...
def __init__(self, daemon_address: str = "127.0.0.1:2000") -> None: ...
def send_entity(self, entity: Entity) -> None: ...
def set_daemon_address(self, address: str | None) -> None: ...
@property
def ip(self): ...
@property
@@ -1,5 +1,3 @@
from ..utils.search_pattern import wildcard_match as wildcard_match
class DefaultDynamicNaming:
def __init__(self, pattern, fallback) -> None: ...
def get_name(self, host_name): ...
def __init__(self, pattern: str, fallback: str) -> None: ...
def get_name(self, host_name: str | None) -> str: ...
@@ -1,29 +1,8 @@
from .noop_traceid import NoOpTraceId as NoOpTraceId
from .segment import Segment as Segment
from .subsegment import Subsegment as Subsegment
from .traceid import TraceId as TraceId
from .segment import Segment
from .subsegment import Subsegment
class DummySegment(Segment):
sampled: bool
def __init__(self, name: str = "dummy") -> None: ...
def set_aws(self, aws_meta) -> None: ...
def put_http_meta(self, key, value) -> None: ...
def put_annotation(self, key, value) -> None: ...
def put_metadata(self, key, value, namespace: str = "default") -> None: ...
def set_user(self, user) -> None: ...
def set_service(self, service_info) -> None: ...
def apply_status_code(self, status_code) -> None: ...
def add_exception(self, exception, stack, remote: bool = False) -> None: ...
def serialize(self) -> None: ...
class DummySubsegment(Subsegment):
sampled: bool
def __init__(self, segment, name: str = "dummy") -> None: ...
def set_aws(self, aws_meta) -> None: ...
def put_http_meta(self, key, value) -> None: ...
def put_annotation(self, key, value) -> None: ...
def put_metadata(self, key, value, namespace: str = "default") -> None: ...
def set_sql(self, sql) -> None: ...
def apply_status_code(self, status_code) -> None: ...
def add_exception(self, exception, stack, remote: bool = False) -> None: ...
def serialize(self) -> None: ...
@@ -1,31 +1,40 @@
from _typeshed import Incomplete
from logging import Logger
from traceback import StackSummary
from typing import Any
from typing import Any, Final, Literal, overload
from .subsegment import Subsegment
from .throwable import Throwable
log: Logger
ORIGIN_TRACE_HEADER_ATTR_KEY: str
ORIGIN_TRACE_HEADER_ATTR_KEY: Final[str]
class Entity:
id: Any
name: Any
start_time: Any
parent_id: Any
id: str
name: str
start_time: float
parent_id: str | None
sampled: bool
in_progress: bool
http: Any
annotations: Any
metadata: Any
aws: Any
cause: Any
subsegments: Any
end_time: Any
def __init__(self, name, entity_id=None) -> None: ...
def close(self, end_time=None) -> None: ...
def add_subsegment(self, subsegment) -> None: ...
def remove_subsegment(self, subsegment) -> None: ...
def put_http_meta(self, key, value) -> None: ...
def put_annotation(self, key, value) -> None: ...
def put_metadata(self, key, value, namespace: str = "default") -> None: ...
http: dict[str, dict[str, str | int]]
annotations: dict[str, float | str | bool]
metadata: dict[str, dict[str, Any]] # value is any object that can be serialized into JSON string
aws: dict[str, Incomplete]
cause: dict[str, str | list[Throwable]]
subsegments: list[Subsegment]
end_time: float
def __init__(self, name: str, entity_id: str | None = None) -> None: ...
def close(self, end_time: float | None = None) -> None: ...
def add_subsegment(self, subsegment: Subsegment) -> None: ...
def remove_subsegment(self, subsegment: Subsegment) -> None: ...
@overload
def put_http_meta(self, key: Literal["status", "content_length"], value: int) -> None: ...
@overload
def put_http_meta(self, key: Literal["url", "method", "user_agent", "client_ip", "x_forwarded_for"], value: str) -> None: ...
def put_annotation(self, key: str, value: float | str | bool) -> None: ...
def put_metadata(
self, key: str, value: Any, namespace: str = "default" # value is any object that can be serialized into JSON string
) -> None: ...
def set_aws(self, aws_meta) -> None: ...
throttle: bool
def add_throttle_flag(self) -> None: ...
@@ -33,9 +42,9 @@ class Entity:
def add_fault_flag(self) -> None: ...
error: bool
def add_error_flag(self) -> None: ...
def apply_status_code(self, status_code) -> None: ...
def apply_status_code(self, status_code: int | None) -> None: ...
def add_exception(self, exception: Exception, stack: StackSummary, remote: bool = False) -> None: ...
def save_origin_trace_header(self, trace_header) -> None: ...
def get_origin_trace_header(self): ...
def serialize(self): ...
def to_dict(self): ...
def serialize(self) -> str: ...
def to_dict(self) -> dict[str, Incomplete]: ...
@@ -1,24 +1,9 @@
from typing import Any
from typing import Final
from .segment import Segment
MUTATION_UNSUPPORTED_MESSAGE: str
MUTATION_UNSUPPORTED_MESSAGE: Final[str]
class FacadeSegment(Segment):
initializing: Any
def __init__(self, name, entityid, traceid, sampled) -> None: ...
def close(self, end_time=None) -> None: ...
def put_http_meta(self, key, value) -> None: ...
def put_annotation(self, key, value) -> None: ...
def put_metadata(self, key, value, namespace: str = "default") -> None: ...
def set_aws(self, aws_meta) -> None: ...
def set_user(self, user) -> None: ...
def add_throttle_flag(self) -> None: ...
def add_fault_flag(self) -> None: ...
def add_error_flag(self) -> None: ...
def add_exception(self, exception, stack, remote: bool = False) -> None: ...
def apply_status_code(self, status_code) -> None: ...
def serialize(self) -> None: ...
def ready_to_send(self): ...
def increment(self) -> None: ...
def decrement_ref_counter(self) -> None: ...
initializing: bool
def __init__(self, name: str, entityid: str | None, traceid: str | None, sampled: bool | None) -> None: ...
@@ -1,13 +1,13 @@
from typing import Any
from typing import Final
URL: str
METHOD: str
USER_AGENT: str
CLIENT_IP: str
X_FORWARDED_FOR: str
STATUS: str
CONTENT_LENGTH: str
XRAY_HEADER: str
ALT_XRAY_HEADER: str
request_keys: Any
response_keys: Any
URL: Final[str]
METHOD: Final[str]
USER_AGENT: Final[str]
CLIENT_IP: Final[str]
X_FORWARDED_FOR: Final[str]
STATUS: Final[str]
CONTENT_LENGTH: Final[str]
XRAY_HEADER: Final[str]
ALT_XRAY_HEADER: Final[str]
request_keys: tuple[str, ...]
response_keys: tuple[str, ...]
@@ -1,6 +1,8 @@
from typing import ClassVar
class NoOpTraceId:
VERSION: str
DELIMITER: str
VERSION: ClassVar[str]
DELIMITER: ClassVar[str]
start_time: str
def __init__(self) -> None: ...
def to_id(self): ...
def to_id(self) -> str: ...
@@ -1,44 +1,59 @@
from _typeshed import Incomplete
from types import TracebackType
from typing import Any
from typing import Final
from ..recorder import AWSXRayRecorder
from ..utils.atomic_counter import AtomicCounter
from .dummy_entities import DummySegment
from .entity import Entity
from .subsegment import Subsegment
ORIGIN_TRACE_HEADER_ATTR_KEY: str
ORIGIN_TRACE_HEADER_ATTR_KEY: Final[str]
class SegmentContextManager:
name: str
segment_kwargs: dict[str, Any]
name: str | None
segment_kwargs: dict[str, str | bool | None]
recorder: AWSXRayRecorder
segment: Segment
def __init__(self, recorder: AWSXRayRecorder, name: str | None = None, **segment_kwargs) -> None: ...
def __enter__(self): ...
segment: Segment | None
def __init__(
self,
recorder: AWSXRayRecorder,
name: str | None = None,
*,
traceid: str | None = None,
parent_id: str | None = None,
sampling: bool | None = None,
) -> None: ...
def __enter__(self) -> DummySegment | Segment: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None: ...
class Segment(Entity):
trace_id: str | None
id: str | None
trace_id: str
id: str
in_progress: bool
sampled: bool
user: str | None
ref_counter: AtomicCounter
parent_id: str | None
parent_id: str
service: dict[str, str]
def __init__(
self, name, entityid: str | None = None, traceid: str | None = None, parent_id: str | None = None, sampled: bool = True
self,
name: str,
entityid: str | None = None,
traceid: str | None = None,
parent_id: str | None = None,
sampled: bool = True,
) -> None: ...
def add_subsegment(self, subsegment: Subsegment) -> None: ...
def increment(self) -> None: ...
def decrement_ref_counter(self) -> None: ...
def ready_to_send(self): ...
def get_total_subsegments_size(self): ...
def decrement_subsegments_size(self): ...
def remove_subsegment(self, subsegment) -> None: ...
def ready_to_send(self) -> bool: ...
def get_total_subsegments_size(self) -> int: ...
def decrement_subsegments_size(self) -> int: ...
def remove_subsegment(self, subsegment: Subsegment) -> None: ...
def set_user(self, user) -> None: ...
def set_service(self, service_info) -> None: ...
def set_rule_name(self, rule_name) -> None: ...
def to_dict(self): ...
def set_service(self, service_info: dict[str, str]) -> None: ...
def set_rule_name(self, rule_name: str) -> None: ...
def to_dict(self) -> dict[str, Incomplete]: ...
@@ -1,12 +1,13 @@
import time
from _typeshed import Incomplete
from types import TracebackType
from typing import Any
from typing import Final
from ...core import AWSXRayRecorder
from ..recorder import AWSXRayRecorder
from .dummy_entities import DummySubsegment
from .entity import Entity
from .segment import Segment
SUBSEGMENT_RECORDING_ATTRIBUTE: str
SUBSEGMENT_RECORDING_ATTRIBUTE: Final[str]
def set_as_recording(decorated_func, wrapped) -> None: ...
def is_already_recording(func): ...
@@ -14,12 +15,12 @@ def subsegment_decorator(wrapped, instance, args, kwargs): ...
class SubsegmentContextManager:
name: str | None
subsegment_kwargs: dict[str, Any] | None
subsegment_kwargs: dict[str, str]
recorder: AWSXRayRecorder
subsegment: Subsegment
def __init__(self, recorder: AWSXRayRecorder, name=None, **subsegment_kwargs) -> None: ...
def __call__(self, wrapped, instance, args: list[Any], kwargs: dict[str, Any]): ...
def __enter__(self) -> Subsegment | None: ...
subsegment: Subsegment | None
def __init__(self, recorder: AWSXRayRecorder, name: str | None = None, *, namespace: str = "local") -> None: ...
def __call__(self, wrapped, instance, args: list[Incomplete], kwargs: dict[str, Incomplete]): ...
def __enter__(self) -> DummySubsegment | Subsegment | None: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None: ...
@@ -29,10 +30,10 @@ class Subsegment(Entity):
trace_id: str
type: str
namespace: str
sql: dict[str, Any]
sql: dict[str, Incomplete]
def __init__(self, name: str, namespace: str, segment: Segment) -> None: ...
def add_subsegment(self, subsegment: Subsegment) -> None: ...
def remove_subsegment(self, subsegment: Subsegment) -> None: ...
def close(self, end_time: time.struct_time | None = None) -> None: ...
def set_sql(self, sql: dict[str, Any]) -> None: ...
def to_dict(self) -> dict[str, Any]: ...
def close(self, end_time: float | None = None) -> None: ...
def set_sql(self, sql: dict[str, Incomplete]) -> None: ...
def to_dict(self) -> dict[str, Incomplete]: ...
@@ -1,12 +1,21 @@
from typing import Any
from _typeshed import Incomplete
from logging import Logger
from traceback import StackSummary
from typing import TypedDict, type_check_only
log: Any
@type_check_only
class _StackInfo(TypedDict):
path: str
line: int
label: str
log: Logger
class Throwable:
id: Any
message: Any
type: Any
remote: Any
stack: Any
def __init__(self, exception, stack, remote: bool = False) -> None: ...
def to_dict(self): ...
id: str
message: str
type: str
remote: bool
stack: list[_StackInfo] | None
def __init__(self, exception: Exception, stack: StackSummary, remote: bool = False) -> None: ...
def to_dict(self) -> dict[str, Incomplete]: ...
@@ -1,25 +1,36 @@
from typing import Any
from typing_extensions import Self
from _typeshed import Incomplete
from logging import Logger
from typing import Final, Literal
from typing_extensions import Self, TypeAlias
log: Any
ROOT: str
PARENT: str
SAMPLE: str
SELF: str
HEADER_DELIMITER: str
_SampledTrue: TypeAlias = Literal[True, "1", 1]
_SampledFalse: TypeAlias = Literal[False, "0", 0]
_SampledUnknown: TypeAlias = Literal["?"]
_Sampled: TypeAlias = _SampledTrue | _SampledFalse | _SampledUnknown
log: Logger
ROOT: Final = "Root"
PARENT: Final = "Parent"
SAMPLE: Final = "Sampled"
SELF: Final = "Self"
HEADER_DELIMITER: Final = ";"
class TraceHeader:
def __init__(
self, root: str | None = None, parent: str | None = None, sampled: bool | None = None, data: dict[str, Any] | None = None
self,
root: str | None = None,
parent: str | None = None,
sampled: _Sampled | None = None,
data: dict[str, Incomplete] | None = None,
) -> None: ...
@classmethod
def from_header_str(cls, header) -> Self: ...
def to_header_str(self): ...
def from_header_str(cls, header: str | None) -> Self: ...
def to_header_str(self) -> str: ...
@property
def root(self): ...
def root(self) -> str | None: ...
@property
def parent(self): ...
def parent(self) -> str | None: ...
@property
def sampled(self): ...
def sampled(self) -> Literal[1, 0, "?"] | None: ...
@property
def data(self): ...
def data(self) -> dict[str, Incomplete]: ...
@@ -1,8 +1,8 @@
from typing import Any
from typing import ClassVar
class TraceId:
VERSION: str
DELIMITER: str
start_time: Any
VERSION: ClassVar[str]
DELIMITER: ClassVar[str]
start_time: int
def __init__(self) -> None: ...
def to_id(self): ...
def to_id(self) -> str: ...
@@ -1,10 +1,11 @@
from collections.abc import Iterable
from logging import Logger
from typing import Any
log: Logger
SUPPORTED_MODULES: Any
NO_DOUBLE_PATCH: Any
SUPPORTED_MODULES: tuple[str, ...]
NO_DOUBLE_PATCH: tuple[str, ...]
def patch_all(double_patch: bool = False) -> None: ...
def patch(modules_to_patch: Iterable[str], raise_errors: bool = True, ignore_module_patterns: str | None = None) -> None: ...
def patch(
modules_to_patch: Iterable[str], raise_errors: bool = True, ignore_module_patterns: Iterable[str] | None = None
) -> None: ...
@@ -1,12 +1,17 @@
from typing import Any
from collections.abc import MutableMapping
from logging import Logger
from typing import Any, Final, overload
log: Any
SERVICE_NAME: str
ORIGIN: str
IMDS_URL: str
log: Logger
SERVICE_NAME: Final[str]
ORIGIN: Final[str]
IMDS_URL: Final[str]
def initialize() -> None: ...
def get_token(): ...
def get_metadata(token=None): ...
def parse_metadata_json(json_str): ...
def do_request(url, headers=None, method: str = "GET"): ...
def get_token() -> str | None: ...
def get_metadata(token: str | None = None) -> dict[str, Any]: ... # result of parse_metadata_json()
def parse_metadata_json(json_str: str | bytes | bytearray) -> dict[str, Any]: ... # result of json.loads()
@overload
def do_request(url: str, headers: MutableMapping[str, str] | None = None, method: str = "GET") -> str: ...
@overload
def do_request(url: None, headers: MutableMapping[str, str] | None = None, method: str = "GET") -> None: ...
@@ -1,7 +1,8 @@
from typing import Any
from logging import Logger
from typing import Final
log: Any
SERVICE_NAME: str
ORIGIN: str
log: Logger
SERVICE_NAME: Final[str]
ORIGIN: Final[str]
def initialize() -> None: ...
@@ -1,8 +1,9 @@
from typing import Any
from logging import Logger
from typing import Final
log: Any
CONF_PATH: str
SERVICE_NAME: str
ORIGIN: str
log: Logger
CONF_PATH: Final[str]
SERVICE_NAME: Final[str]
ORIGIN: Final[str]
def initialize() -> None: ...
@@ -1,6 +1,8 @@
from typing import Any
from collections.abc import Iterable
from types import ModuleType
from typing import Final
module_prefix: str
PLUGIN_MAPPING: Any
module_prefix: Final[str]
PLUGIN_MAPPING: Final[dict[str, str]]
def get_plugin_modules(plugins): ...
def get_plugin_modules(plugins: Iterable[str]) -> tuple[ModuleType, ...]: ...
@@ -1,4 +1,5 @@
import time
from _typeshed import FileDescriptorOrPath
from collections.abc import Callable, Iterable
from logging import Logger
from typing import Any
@@ -17,8 +18,8 @@ log: Logger
TRACING_NAME_KEY: str
DAEMON_ADDR_KEY: str
CONTEXT_MISSING_KEY: str
XRAY_META: Any
SERVICE_INFO: Any
XRAY_META: dict[str, dict[str, str]]
SERVICE_INFO: dict[str, str]
class AWSXRayRecorder:
def __init__(self) -> None: ...
@@ -27,7 +28,7 @@ class AWSXRayRecorder:
sampling: bool | None = None,
plugins: Iterable[str] | None = None,
context_missing: str | None = None,
sampling_rules: dict[str, Any] | str | None = None,
sampling_rules: dict[str, Any] | FileDescriptorOrPath | None = None,
daemon_address: str | None = None,
service: str | None = None,
context: Context | None = None,
@@ -85,9 +86,9 @@ class AWSXRayRecorder:
@service.setter
def service(self, value: str) -> None: ...
@property
def dynamic_naming(self) -> Any | DefaultDynamicNaming: ...
def dynamic_naming(self) -> DefaultDynamicNaming | None: ...
@dynamic_naming.setter
def dynamic_naming(self, value: Any | DefaultDynamicNaming) -> None: ...
def dynamic_naming(self, value: DefaultDynamicNaming | str) -> None: ...
@property
def context(self) -> Context: ...
@context.setter
@@ -1,9 +1,15 @@
from _typeshed import Incomplete
from aws_xray_sdk.core.context import Context
from .sampling_rule import SamplingRule
class ServiceConnector:
def __init__(self) -> None: ...
def fetch_sampling_rules(self): ...
def fetch_sampling_target(self, rules): ...
def setup_xray_client(self, ip, port, client) -> None: ...
def fetch_sampling_rules(self) -> list[SamplingRule]: ...
def fetch_sampling_target(self, rules) -> tuple[Incomplete, int]: ...
def setup_xray_client(self, ip: str, port: str | int, client) -> None: ...
@property
def context(self): ...
def context(self) -> Context: ...
@context.setter
def context(self, v) -> None: ...
def context(self, v: Context) -> None: ...
@@ -1,8 +1,6 @@
from typing import Any
class Reservoir:
traces_per_sec: Any
traces_per_sec: int
used_this_sec: int
this_sec: Any
this_sec: int
def __init__(self, traces_per_sec: int = 0) -> None: ...
def take(self): ...
def take(self) -> bool: ...
@@ -1,12 +1,18 @@
from typing import Any
from typing import TypedDict, type_check_only
from typing_extensions import NotRequired
from ...exceptions.exceptions import InvalidSamplingManifestError as InvalidSamplingManifestError
from .sampling_rule import SamplingRule as SamplingRule
from .sampling_rule import SamplingRule, _Rule
local_sampling_rule: Any
SUPPORTED_RULE_VERSION: Any
@type_check_only
class _SamplingRule(TypedDict):
version: NotRequired[int]
default: _Rule
rules: list[_Rule]
local_sampling_rule: _SamplingRule
SUPPORTED_RULE_VERSION: tuple[int, ...]
class LocalSampler:
def __init__(self, rules=...) -> None: ...
def should_trace(self, sampling_req=None): ...
def load_local_rules(self, rules) -> None: ...
def __init__(self, rules: _SamplingRule = ...) -> None: ...
def should_trace(self, sampling_req: SamplingRule | None = None) -> bool: ...
def load_local_rules(self, rules: _SamplingRule) -> None: ...
@@ -1,28 +1,38 @@
from aws_xray_sdk.core.utils.search_pattern import wildcard_match as wildcard_match
from typing import ClassVar, TypedDict, type_check_only
from typing_extensions import NotRequired
from ...exceptions.exceptions import InvalidSamplingManifestError as InvalidSamplingManifestError
from .reservoir import Reservoir as Reservoir
from .reservoir import Reservoir
@type_check_only
class _Rule(TypedDict):
description: NotRequired[str]
host: NotRequired[str]
service_name: NotRequired[str]
http_method: NotRequired[str]
url_path: NotRequired[str]
fixed_target: NotRequired[int]
rate: NotRequired[float]
class SamplingRule:
FIXED_TARGET: str
RATE: str
HOST: str
METHOD: str
PATH: str
SERVICE_NAME: str
def __init__(self, rule_dict, version: int = 2, default: bool = False) -> None: ...
def applies(self, host, method, path): ...
FIXED_TARGET: ClassVar[str]
RATE: ClassVar[str]
HOST: ClassVar[str]
METHOD: ClassVar[str]
PATH: ClassVar[str]
SERVICE_NAME: ClassVar[str]
def __init__(self, rule_dict: _Rule, version: int = 2, default: bool = False) -> None: ...
def applies(self, host: str | None, method: str | None, path: str | None) -> bool: ...
@property
def fixed_target(self): ...
def fixed_target(self) -> int | None: ...
@property
def rate(self): ...
def rate(self) -> float | None: ...
@property
def host(self): ...
def host(self) -> str | None: ...
@property
def method(self): ...
def method(self) -> str | None: ...
@property
def path(self): ...
def path(self) -> str | None: ...
@property
def reservoir(self): ...
def reservoir(self) -> Reservoir: ...
@property
def version(self): ...
@@ -2,12 +2,12 @@ from enum import Enum
class Reservoir:
def __init__(self) -> None: ...
def borrow_or_take(self, now, can_borrow): ...
def load_quota(self, quota, TTL, interval) -> None: ...
def borrow_or_take(self, now: int, can_borrow: bool | None) -> ReservoirDecision | None: ...
def load_quota(self, quota: int | None, TTL: int | None, interval: int | None) -> None: ...
@property
def quota(self): ...
def quota(self) -> int | None: ...
@property
def TTL(self): ...
def TTL(self) -> int | None: ...
class ReservoirDecision(Enum):
TAKE = "take"
@@ -1,6 +1,6 @@
from typing import Any
from typing import Final
TTL: Any
TTL: Final = 3600
class RuleCache:
def __init__(self) -> None: ...
@@ -12,6 +12,6 @@ class RuleCache:
@rules.setter
def rules(self, v) -> None: ...
@property
def last_updated(self): ...
def last_updated(self) -> int | None: ...
@last_updated.setter
def last_updated(self, v) -> None: ...
def last_updated(self, v: int | None) -> None: ...
@@ -1,9 +1,13 @@
from typing import Any
from logging import Logger
from typing import Final
log: Any
DEFAULT_INTERVAL: Any
from .connector import ServiceConnector
from .rule_cache import RuleCache
log: Logger
DEFAULT_INTERVAL: Final = 300
class RulePoller:
def __init__(self, cache, connector) -> None: ...
def __init__(self, cache: RuleCache, connector: ServiceConnector) -> None: ...
def start(self) -> None: ...
def wake_up(self) -> None: ...
@@ -1,13 +1,6 @@
from typing import Any
from logging import Logger
from .connector import ServiceConnector as ServiceConnector
from .local.sampler import LocalSampler as LocalSampler
from .reservoir import ReservoirDecision as ReservoirDecision
from .rule_cache import RuleCache as RuleCache
from .rule_poller import RulePoller as RulePoller
from .target_poller import TargetPoller as TargetPoller
log: Any
log: Logger
class DefaultSampler:
def __init__(self) -> None: ...
@@ -1,6 +1,6 @@
from typing import Any
from logging import Logger
log: Any
log: Logger
class TargetPoller:
def __init__(self, cache, rule_poller, connector) -> None: ...
@@ -1,8 +1,14 @@
from _typeshed import Unused
from collections.abc import Callable
from aws_xray_sdk.core.models.entity import Entity
from aws_xray_sdk.core.models.segment import Segment
class DefaultStreaming:
def __init__(self, streaming_threshold: int = 30) -> None: ...
def is_eligible(self, segment): ...
def stream(self, entity, callback) -> None: ...
def is_eligible(self, segment: Segment) -> bool: ...
def stream(self, entity: Entity, callback: Callable[..., Unused]) -> None: ...
@property
def streaming_threshold(self): ...
def streaming_threshold(self) -> int: ...
@streaming_threshold.setter
def streaming_threshold(self, value) -> None: ...
def streaming_threshold(self, value: int) -> None: ...
@@ -1,9 +1,7 @@
from typing import Any
class AtomicCounter:
value: Any
value: int
def __init__(self, initial: int = 0) -> None: ...
def increment(self, num: int = 1): ...
def decrement(self, num: int = 1): ...
def get_current(self): ...
def reset(self): ...
def increment(self, num: int = 1) -> int: ...
def decrement(self, num: int = 1) -> int: ...
def get_current(self) -> int: ...
def reset(self) -> int: ...
@@ -1,6 +1,4 @@
from typing import Any
annotation_value_types: tuple[type, ...]
annotation_value_types: Any
def is_classmethod(func): ...
def is_instance_method(parent_class, func_name, func): ...
def is_classmethod(func: object) -> bool: ... # argument func is passing to getattr() function
def is_instance_method(parent_class: type, func_name: str, func: object) -> bool: ...
@@ -1,5 +1,13 @@
from typing import Any
from logging import Logger
from typing import Any, TypeVar, overload
log: Any
_K = TypeVar("_K")
def metadata_to_dict(obj): ...
log: Logger
@overload
def metadata_to_dict(obj: dict[_K, Any]) -> dict[_K, Any]: ...
@overload
def metadata_to_dict(obj: type) -> str: ...
@overload
def metadata_to_dict(obj: Any) -> Any: ...
@@ -1 +1,8 @@
def wildcard_match(pattern, text, case_insensitive: bool = True): ...
from typing import Literal, overload
@overload
def wildcard_match(pattern: None, text: str, case_insensitive: bool = True) -> Literal[False]: ...
@overload
def wildcard_match(pattern: str, text: None, case_insensitive: bool = True) -> Literal[False]: ...
@overload
def wildcard_match(pattern: str, text: str, case_insensitive: bool = True) -> bool: ...
@@ -1 +1,3 @@
def get_stacktrace(limit=None): ...
import traceback
def get_stacktrace(limit: int | None = None) -> list[traceback.FrameSummary]: ...
@@ -1,11 +1,12 @@
from logging import Logger
from typing import ClassVar
log: Logger
class SDKConfig:
XRAY_ENABLED_KEY: str
DISABLED_ENTITY_NAME: str
XRAY_ENABLED_KEY: ClassVar[str]
DISABLED_ENTITY_NAME: ClassVar[str]
@classmethod
def sdk_enabled(cls): ...
def sdk_enabled(cls) -> bool: ...
@classmethod
def set_sdk_enabled(cls, value) -> None: ...
def set_sdk_enabled(cls, value: bool | None) -> None: ...
+3 -1
View File
@@ -1 +1,3 @@
VERSION: str
from typing import Final
VERSION: Final[str]