Add stubs for package capturer (#11551)

This commit is contained in:
ChenMoFeiJin
2024-04-07 19:32:41 +08:00
committed by GitHub
parent 26c218818b
commit 7e01591643
3 changed files with 126 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
# Tests should not be part of the stubs
capturer.tests.*

View File

@@ -0,0 +1,2 @@
version = "3.0.*"
upstream_repository = "https://github.com/xolox/python-capturer"

122
stubs/capturer/capturer.pyi Normal file
View File

@@ -0,0 +1,122 @@
from _typeshed import FileDescriptorOrPath, SupportsWrite
from collections.abc import Callable
from io import BufferedReader
from multiprocessing import Process
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event
from signal import Signals
from types import FrameType, TracebackType
from typing import IO, Any, NoReturn
from typing_extensions import Self
__version__: str
aliases: dict[str, Any]
DEFAULT_TEXT_ENCODING: str
GRACEFUL_SHUTDOWN_SIGNAL: Signals
TERMINATION_DELAY: float
PARTIAL_DEFAULT: bool
STDOUT_FD: int
STDERR_FD: int
def enable_old_api() -> None: ...
def create_proxy_method(name: str) -> Callable[..., Any]: ...
class MultiProcessHelper:
processes: list[Process]
def __init__(self) -> None: ...
def start_child(self, target: Callable[[Event], Any]) -> None: ...
def stop_children(self) -> None: ...
def wait_for_children(self) -> None: ...
def enable_graceful_shutdown(self) -> None: ...
def raise_shutdown_request(self, signum: int, frame: FrameType | None) -> NoReturn: ...
class CaptureOutput(MultiProcessHelper):
chunk_size: int
encoding: str
merged: bool
relay: bool
termination_delay: float
pseudo_terminals: list[PseudoTerminal]
streams: list[tuple[int, Stream]]
stdout_stream: Stream
stderr_stream: Stream
output: PseudoTerminal
output_queue: Queue[tuple[Any, bytes]]
stdout: PseudoTerminal
stderr: PseudoTerminal
def __init__(
self, merged: bool = True, encoding: str = ..., termination_delay: float = ..., chunk_size: int = 1024, relay: bool = True
) -> None: ...
def initialize_stream(self, file_obj: IO[str], expected_fd: int) -> Stream: ...
def __enter__(self) -> Self: ...
def __exit__(
self,
exc_type: type[BaseException] | None = None,
exc_val: BaseException | None = None,
exc_tb: TracebackType | None = None,
) -> bool | None: ...
@property
def is_capturing(self) -> bool: ...
def start_capture(self) -> None: ...
def finish_capture(self) -> None: ...
def allocate_pty(
self, relay_fd: int | None = None, output_queue: Queue[tuple[Any, bytes]] | None = None, queue_token: Any | None = None
) -> PseudoTerminal: ...
def merge_loop(self, started_event: Event) -> None: ...
def get_handle(self, partial: bool = ...) -> BufferedReader: ...
def get_bytes(self, partial: bool = ...) -> bytes: ...
def get_lines(self, interpreted: bool = True, partial: bool = ...) -> list[str]: ...
def get_text(self, interpreted: bool = ..., partial: bool = ...) -> str: ...
def save_to_handle(self, handle: SupportsWrite[bytes], partial: bool = ...) -> None: ...
def save_to_path(self, filename: FileDescriptorOrPath, partial: bool = ...) -> None: ...
class OutputBuffer:
fd: int
buffer: bytes
def __init__(self, fd: int) -> None: ...
def add(self, output: bytes) -> None: ...
def flush(self) -> None: ...
class PseudoTerminal(MultiProcessHelper):
encoding: str
termination_delay: float
chunk_size: int
relay_fd: int | None
output_queue: Queue[tuple[Any, bytes]] | None
queue_token: Any | None
streams: list[Stream]
master_fd: int
slave_fd: int
output_fd: int
output_handle: BufferedReader
def __init__(
self,
encoding: str,
termination_delay: float,
chunk_size: int,
relay_fd: int | None,
output_queue: Queue[tuple[int, bytes]] | None,
queue_token: Any | None,
) -> None: ...
def attach(self, stream: Stream) -> None: ...
def start_capture(self) -> None: ...
def finish_capture(self) -> None: ...
def close_pseudo_terminal(self) -> None: ...
def restore_streams(self) -> None: ...
def get_handle(self, partial: bool = ...) -> BufferedReader: ...
def get_bytes(self, partial: bool = ...) -> bytes: ...
def get_lines(self, interpreted: bool = True, partial: bool = ...) -> list[str]: ...
def get_text(self, interpreted: bool = ..., partial: bool = ...) -> str: ...
def save_to_handle(self, handle: SupportsWrite[bytes], partial: bool = ...) -> None: ...
def save_to_path(self, filename: FileDescriptorOrPath, partial: bool = ...) -> None: ...
def capture_loop(self, started_event: Event) -> None: ...
class Stream:
fd: int
original_fd: int
is_redirected: bool
def __init__(self, fd: int) -> None: ...
def redirect(self, target_fd: int) -> None: ...
def restore(self) -> None: ...
class ShutdownRequested(Exception): ...