diff --git a/stubs/capturer/@tests/stubtest_allowlist.txt b/stubs/capturer/@tests/stubtest_allowlist.txt new file mode 100644 index 000000000..6f3cc357f --- /dev/null +++ b/stubs/capturer/@tests/stubtest_allowlist.txt @@ -0,0 +1,2 @@ +# Tests should not be part of the stubs +capturer.tests.* diff --git a/stubs/capturer/METADATA.toml b/stubs/capturer/METADATA.toml new file mode 100644 index 000000000..01ed91c42 --- /dev/null +++ b/stubs/capturer/METADATA.toml @@ -0,0 +1,2 @@ +version = "3.0.*" +upstream_repository = "https://github.com/xolox/python-capturer" diff --git a/stubs/capturer/capturer.pyi b/stubs/capturer/capturer.pyi new file mode 100644 index 000000000..48759b77a --- /dev/null +++ b/stubs/capturer/capturer.pyi @@ -0,0 +1,122 @@ +from _typeshed import FileDescriptorOrPath, SupportsWrite +from collections.abc import Callable +from io import BufferedReader +from multiprocessing import Process +from multiprocessing.queues import Queue +from multiprocessing.synchronize import Event +from signal import Signals +from types import FrameType, TracebackType +from typing import IO, Any, NoReturn +from typing_extensions import Self + +__version__: str +aliases: dict[str, Any] +DEFAULT_TEXT_ENCODING: str +GRACEFUL_SHUTDOWN_SIGNAL: Signals +TERMINATION_DELAY: float +PARTIAL_DEFAULT: bool +STDOUT_FD: int +STDERR_FD: int + +def enable_old_api() -> None: ... +def create_proxy_method(name: str) -> Callable[..., Any]: ... + +class MultiProcessHelper: + processes: list[Process] + def __init__(self) -> None: ... + def start_child(self, target: Callable[[Event], Any]) -> None: ... + def stop_children(self) -> None: ... + def wait_for_children(self) -> None: ... + def enable_graceful_shutdown(self) -> None: ... + def raise_shutdown_request(self, signum: int, frame: FrameType | None) -> NoReturn: ... + +class CaptureOutput(MultiProcessHelper): + chunk_size: int + encoding: str + merged: bool + relay: bool + termination_delay: float + pseudo_terminals: list[PseudoTerminal] + streams: list[tuple[int, Stream]] + stdout_stream: Stream + stderr_stream: Stream + output: PseudoTerminal + output_queue: Queue[tuple[Any, bytes]] + stdout: PseudoTerminal + stderr: PseudoTerminal + def __init__( + self, merged: bool = True, encoding: str = ..., termination_delay: float = ..., chunk_size: int = 1024, relay: bool = True + ) -> None: ... + def initialize_stream(self, file_obj: IO[str], expected_fd: int) -> Stream: ... + def __enter__(self) -> Self: ... + def __exit__( + self, + exc_type: type[BaseException] | None = None, + exc_val: BaseException | None = None, + exc_tb: TracebackType | None = None, + ) -> bool | None: ... + @property + def is_capturing(self) -> bool: ... + def start_capture(self) -> None: ... + def finish_capture(self) -> None: ... + def allocate_pty( + self, relay_fd: int | None = None, output_queue: Queue[tuple[Any, bytes]] | None = None, queue_token: Any | None = None + ) -> PseudoTerminal: ... + def merge_loop(self, started_event: Event) -> None: ... + def get_handle(self, partial: bool = ...) -> BufferedReader: ... + def get_bytes(self, partial: bool = ...) -> bytes: ... + def get_lines(self, interpreted: bool = True, partial: bool = ...) -> list[str]: ... + def get_text(self, interpreted: bool = ..., partial: bool = ...) -> str: ... + def save_to_handle(self, handle: SupportsWrite[bytes], partial: bool = ...) -> None: ... + def save_to_path(self, filename: FileDescriptorOrPath, partial: bool = ...) -> None: ... + +class OutputBuffer: + fd: int + buffer: bytes + def __init__(self, fd: int) -> None: ... + def add(self, output: bytes) -> None: ... + def flush(self) -> None: ... + +class PseudoTerminal(MultiProcessHelper): + encoding: str + termination_delay: float + chunk_size: int + relay_fd: int | None + output_queue: Queue[tuple[Any, bytes]] | None + queue_token: Any | None + streams: list[Stream] + master_fd: int + slave_fd: int + output_fd: int + output_handle: BufferedReader + def __init__( + self, + encoding: str, + termination_delay: float, + chunk_size: int, + relay_fd: int | None, + output_queue: Queue[tuple[int, bytes]] | None, + queue_token: Any | None, + ) -> None: ... + def attach(self, stream: Stream) -> None: ... + def start_capture(self) -> None: ... + def finish_capture(self) -> None: ... + def close_pseudo_terminal(self) -> None: ... + def restore_streams(self) -> None: ... + def get_handle(self, partial: bool = ...) -> BufferedReader: ... + def get_bytes(self, partial: bool = ...) -> bytes: ... + def get_lines(self, interpreted: bool = True, partial: bool = ...) -> list[str]: ... + def get_text(self, interpreted: bool = ..., partial: bool = ...) -> str: ... + def save_to_handle(self, handle: SupportsWrite[bytes], partial: bool = ...) -> None: ... + def save_to_path(self, filename: FileDescriptorOrPath, partial: bool = ...) -> None: ... + def capture_loop(self, started_event: Event) -> None: ... + +class Stream: + fd: int + original_fd: int + is_redirected: bool + def __init__(self, fd: int) -> None: ... + def redirect(self, target_fd: int) -> None: ... + def restore(self) -> None: ... + +class ShutdownRequested(Exception): ...