Files
typeshed/stdlib/multiprocessing/util.pyi
Kevin Kirsche 48e502c431 fix: Add missing multiprocessing.util types (#8404)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
2022-07-27 20:22:54 +01:00

88 lines
2.4 KiB
Python

import threading
from _typeshed import Incomplete, ReadableBuffer, SupportsTrunc
from collections.abc import Callable, Iterable, Mapping, MutableMapping, Sequence
from logging import Logger
from typing import Any, SupportsInt
from typing_extensions import SupportsIndex
__all__ = [
"sub_debug",
"debug",
"info",
"sub_warning",
"get_logger",
"log_to_stderr",
"get_temp_dir",
"register_after_fork",
"is_exiting",
"Finalize",
"ForkAwareThreadLock",
"ForkAwareLocal",
"close_all_fds_except",
"SUBDEBUG",
"SUBWARNING",
]
NOTSET: int
SUBDEBUG: int
DEBUG: int
INFO: int
SUBWARNING: int
LOGGER_NAME: str
DEFAULT_LOGGING_FORMAT: str
def sub_debug(msg: object, *args: object) -> None: ...
def debug(msg: object, *args: object) -> None: ...
def info(msg: object, *args: object) -> None: ...
def sub_warning(msg: object, *args: object) -> None: ...
def get_logger() -> Logger: ...
def log_to_stderr(level: int | None = ...) -> Logger: ...
def is_abstract_socket_namespace(address: str | bytes | None) -> bool: ...
abstract_sockets_supported: bool
def get_temp_dir() -> str: ...
def register_after_fork(obj: Incomplete, func: Callable[[Incomplete], object]) -> None: ...
class Finalize:
def __init__(
self,
obj: Incomplete | None,
callback: Callable[..., Incomplete],
args: Sequence[Any] = ...,
kwargs: Mapping[str, Any] | None = ...,
exitpriority: int | None = ...,
) -> None: ...
def __call__(
self,
wr: object = ...,
_finalizer_registry: MutableMapping[Incomplete, Incomplete] = ...,
sub_debug: Callable[..., object] = ...,
getpid: Callable[[], int] = ...,
) -> Incomplete: ...
def cancel(self) -> None: ...
def still_active(self) -> bool: ...
def is_exiting() -> bool: ...
class ForkAwareThreadLock:
acquire: Callable[[bool, float], bool]
release: Callable[[], None]
def __init__(self) -> None: ...
def __enter__(self) -> bool: ...
def __exit__(self, *args: object) -> None: ...
class ForkAwareLocal(threading.local):
def __init__(self) -> None: ...
MAXFD: int
def close_all_fds_except(fds: Iterable[int]) -> None: ...
def spawnv_passfds(
path: bytes,
# args is anything that can be passed to the int constructor
args: Sequence[str | ReadableBuffer | SupportsInt | SupportsIndex | SupportsTrunc],
passfds: Sequence[int],
) -> int: ...