# Mirror of https://github.com/davidhalter/typeshed.git (synced 2025-12-08 04:54:47 +08:00).
# Listing metadata: 123 lines, 4.5 KiB, Python.
from _typeshed import FileDescriptorOrPath, SupportsWrite
|
|
from collections.abc import Callable
|
|
from io import BufferedReader
|
|
from multiprocessing import Process
|
|
from multiprocessing.queues import Queue
|
|
from multiprocessing.synchronize import Event
|
|
from signal import Signals
|
|
from types import FrameType, TracebackType
|
|
from typing import IO, Any, NoReturn
|
|
from typing_extensions import Self
|
|
|
|
# Module-level names declared by this stub. Only their types are visible here;
# the concrete values live in the runtime implementation.
__version__: str
# NOTE(review): presumably maps alias names to their targets — confirm against the runtime module.
aliases: dict[str, Any]
DEFAULT_TEXT_ENCODING: str
GRACEFUL_SHUTDOWN_SIGNAL: Signals
# NOTE(review): unit is presumably seconds — confirm against the runtime module.
TERMINATION_DELAY: float
PARTIAL_DEFAULT: bool
STDOUT_FD: int
STDERR_FD: int
|
|
|
|
def enable_old_api() -> None:
    # Stub only: takes no arguments and is declared to return nothing.
    # The runtime behaviour is defined by the implementation, not by this file.
    ...
|
|
def create_proxy_method(name: str) -> Callable[..., Any]:
    # Stub only: accepts a method name and is declared to return a callable
    # with an unconstrained signature and return type.
    ...
|
|
|
|
class MultiProcessHelper:
|
|
processes: list[Process]
|
|
def __init__(self) -> None: ...
|
|
def start_child(self, target: Callable[[Event], Any]) -> None: ...
|
|
def stop_children(self) -> None: ...
|
|
def wait_for_children(self) -> None: ...
|
|
def enable_graceful_shutdown(self) -> None: ...
|
|
def raise_shutdown_request(self, signum: int, frame: FrameType | None) -> NoReturn: ...
|
|
|
|
class CaptureOutput(MultiProcessHelper):
    # Declares `__enter__`/`__exit__`, so instances can be used in a `with`
    # statement; `__enter__` returns the instance itself (`Self`).

    chunk_size: int
    encoding: str
    merged: bool
    relay: bool
    termination_delay: float
    pseudo_terminals: list[PseudoTerminal]
    # Pairs of (int, Stream).
    # NOTE(review): the int is presumably the expected file descriptor — confirm.
    streams: list[tuple[int, Stream]]
    stdout_stream: Stream
    stderr_stream: Stream
    output: PseudoTerminal
    # Queue items are (token, bytes) tuples; the token type is unconstrained.
    output_queue: Queue[tuple[Any, bytes]]
    stdout: PseudoTerminal
    stderr: PseudoTerminal

    def __init__(
        self, merged: bool = True, encoding: str = ..., termination_delay: float = ..., chunk_size: int = 1024, relay: bool = True
    ) -> None: ...
    def initialize_stream(self, file_obj: IO[str], expected_fd: int) -> Stream: ...
    def __enter__(self) -> Self: ...
    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: TracebackType | None = None,
    ) -> bool | None: ...
    # Read-only property (no setter is declared).
    @property
    def is_capturing(self) -> bool: ...
    def start_capture(self) -> None: ...
    def finish_capture(self) -> None: ...
    def allocate_pty(
        self, relay_fd: int | None = None, output_queue: Queue[tuple[Any, bytes]] | None = None, queue_token: Any | None = None
    ) -> PseudoTerminal: ...
    def merge_loop(self, started_event: Event) -> None: ...
    # The accessors below mirror the same-named methods declared on PseudoTerminal.
    def get_handle(self, partial: bool = ...) -> BufferedReader: ...
    def get_bytes(self, partial: bool = ...) -> bytes: ...
    def get_lines(self, interpreted: bool = True, partial: bool = ...) -> list[str]: ...
    def get_text(self, interpreted: bool = ..., partial: bool = ...) -> str: ...
    def save_to_handle(self, handle: SupportsWrite[bytes], partial: bool = ...) -> None: ...
    def save_to_path(self, filename: FileDescriptorOrPath, partial: bool = ...) -> None: ...
|
|
|
|
class OutputBuffer:
    # Pairs an integer file descriptor with a bytes buffer; `add` accepts
    # bytes and `flush` takes no arguments.
    # NOTE(review): presumably `flush` writes the buffered bytes to `fd` — confirm against the runtime module.

    fd: int
    buffer: bytes

    def __init__(self, fd: int) -> None: ...
    def add(self, output: bytes) -> None: ...
    def flush(self) -> None: ...
|
|
|
|
class PseudoTerminal(MultiProcessHelper):
    # Every constructor parameter has a same-named attribute declared below.

    encoding: str
    termination_delay: float
    chunk_size: int
    relay_fd: int | None
    # Queue items are (token, bytes) tuples; the token type is unconstrained.
    output_queue: Queue[tuple[Any, bytes]] | None
    queue_token: Any | None
    streams: list[Stream]
    master_fd: int
    slave_fd: int
    output_fd: int
    output_handle: BufferedReader

    def __init__(
        self,
        encoding: str,
        termination_delay: float,
        chunk_size: int,
        relay_fd: int | None,
        # Typed with `Any` for the token so it agrees with the `output_queue`
        # attribute above and with `CaptureOutput.allocate_pty`.
        output_queue: Queue[tuple[Any, bytes]] | None,
        queue_token: Any | None,
    ) -> None: ...
    def attach(self, stream: Stream) -> None: ...
    def start_capture(self) -> None: ...
    def finish_capture(self) -> None: ...
    def close_pseudo_terminal(self) -> None: ...
    def restore_streams(self) -> None: ...
    def get_handle(self, partial: bool = ...) -> BufferedReader: ...
    def get_bytes(self, partial: bool = ...) -> bytes: ...
    def get_lines(self, interpreted: bool = True, partial: bool = ...) -> list[str]: ...
    def get_text(self, interpreted: bool = ..., partial: bool = ...) -> str: ...
    def save_to_handle(self, handle: SupportsWrite[bytes], partial: bool = ...) -> None: ...
    def save_to_path(self, filename: FileDescriptorOrPath, partial: bool = ...) -> None: ...
    def capture_loop(self, started_event: Event) -> None: ...
|
|
|
|
class Stream:
    # Describes a file descriptor together with a second descriptor
    # (`original_fd`) and a flag recording whether it is currently redirected.
    # NOTE(review): `original_fd` is presumably a saved duplicate used by `restore` — confirm against the runtime module.

    fd: int
    original_fd: int
    is_redirected: bool

    def __init__(self, fd: int) -> None: ...
    def redirect(self, target_fd: int) -> None: ...
    def restore(self) -> None: ...
|
|
|
|
class ShutdownRequested(Exception):
    # Plain Exception subclass; it declares no members of its own.
    ...
|