mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-08 21:14:48 +08:00
Continuing work towards #8988. The first five commits were created using stubdefaulter on various Python versions; the following commits were all created manually by me to fix various problems.

The main things this adds that weren't present in #9501 are:
- Defaults in Windows-only modules and Windows-only branches (because I'm running a Windows machine)
- Defaults in non-py311 branches
- Defaults for float parameters
- Defaults for overloads
195 lines
7.8 KiB
Python
195 lines
7.8 KiB
Python
import ctypes
|
|
import sys
|
|
from collections.abc import Callable, Iterable, Sequence
|
|
from ctypes import _CData
|
|
from logging import Logger, _Level as _LoggingLevel
|
|
from multiprocessing import popen_fork, popen_forkserver, popen_spawn_posix, popen_spawn_win32, queues, synchronize
|
|
from multiprocessing.managers import SyncManager
|
|
from multiprocessing.pool import Pool as _Pool
|
|
from multiprocessing.process import BaseProcess
|
|
from multiprocessing.sharedctypes import SynchronizedArray, SynchronizedBase
|
|
from typing import Any, ClassVar, TypeVar, overload
|
|
from typing_extensions import Literal, TypeAlias
|
|
|
|
if sys.platform != "win32":
|
|
from multiprocessing.connection import Connection
|
|
else:
|
|
from multiprocessing.connection import PipeConnection
|
|
|
|
# The export list is empty in both branches: an empty tuple on Python 3.8+,
# an annotated empty list on older versions.
if sys.version_info >= (3, 8):
    __all__ = ()
else:
    __all__: list[str] = []
|
|
|
|
# Either lock flavour from multiprocessing.synchronize; used below for the
# `lock` parameters of Condition, Value and Array.
_LockLike: TypeAlias = synchronize.Lock | synchronize.RLock
# Type variable restricted to ctypes data types (bound to ctypes._CData).
_CT = TypeVar("_CT", bound=_CData)
|
|
|
|
# Exception hierarchy of this module: ProcessError derives from Exception and
# is the common base of the three more specific errors below.
# Note that TimeoutError here shadows the builtin of the same name.
class ProcessError(Exception): ...
class BufferTooShort(ProcessError): ...
class TimeoutError(ProcessError): ...
class AuthenticationError(ProcessError): ...
|
|
|
|
class BaseContext:
    # Stub declaration: only signatures are given, every body is `...`.
    # The module-level exception classes are also exposed as class attributes.
    ProcessError: ClassVar[type[ProcessError]]
    BufferTooShort: ClassVar[type[BufferTooShort]]
    TimeoutError: ClassVar[type[TimeoutError]]
    AuthenticationError: ClassVar[type[AuthenticationError]]

    # N.B. The methods below are applied at runtime to generate
    # multiprocessing.*, so the signatures should be identical (modulo self).
    @staticmethod
    def current_process() -> BaseProcess: ...
    if sys.version_info >= (3, 8):
        @staticmethod
        def parent_process() -> BaseProcess | None: ...

    @staticmethod
    def active_children() -> list[BaseProcess]: ...
    def cpu_count(self) -> int: ...
    def Manager(self) -> SyncManager: ...

    # N.B. Keep this in sync with multiprocessing.connection.Pipe.
    # _ConnectionBase is the common base class of Connection and PipeConnection
    # and can be used in cross-platform code.
    if sys.platform != "win32":
        def Pipe(self, duplex: bool = True) -> tuple[Connection, Connection]: ...
    else:
        def Pipe(self, duplex: bool = True) -> tuple[PipeConnection, PipeConnection]: ...

    # Factories for synchronization primitives; the return types are the
    # classes of multiprocessing.synchronize.
    def Barrier(
        self, parties: int, action: Callable[..., object] | None = None, timeout: float | None = None
    ) -> synchronize.Barrier: ...
    def BoundedSemaphore(self, value: int = 1) -> synchronize.BoundedSemaphore: ...
    def Condition(self, lock: _LockLike | None = None) -> synchronize.Condition: ...
    def Event(self) -> synchronize.Event: ...
    def Lock(self) -> synchronize.Lock: ...
    def RLock(self) -> synchronize.RLock: ...
    def Semaphore(self, value: int = 1) -> synchronize.Semaphore: ...

    # Queue factories; the element type is left as Any.
    def Queue(self, maxsize: int = 0) -> queues.Queue[Any]: ...
    def JoinableQueue(self, maxsize: int = 0) -> queues.JoinableQueue[Any]: ...
    def SimpleQueue(self) -> queues.SimpleQueue[Any]: ...
    def Pool(
        self,
        processes: int | None = None,
        initializer: Callable[..., object] | None = None,
        # Concrete default (empty tuple) instead of the `...` placeholder.
        initargs: Iterable[Any] = (),
        maxtasksperchild: int | None = None,
    ) -> _Pool: ...

    # Shared ctypes objects. Passing a ctypes type gives a precisely typed
    # result; passing a str typecode gives Any.
    @overload
    def RawValue(self, typecode_or_type: type[_CT], *args: Any) -> _CT: ...
    @overload
    def RawValue(self, typecode_or_type: str, *args: Any) -> Any: ...
    @overload
    def RawArray(self, typecode_or_type: type[_CT], size_or_initializer: int | Sequence[Any]) -> ctypes.Array[_CT]: ...
    @overload
    def RawArray(self, typecode_or_type: str, size_or_initializer: int | Sequence[Any]) -> Any: ...

    # Value: lock=False yields the bare ctypes object; lock=True or a lock
    # object yields a SynchronizedBase wrapper; the last overload is the
    # catch-all returning Any.
    @overload
    def Value(self, typecode_or_type: type[_CT], *args: Any, lock: Literal[False]) -> _CT: ...
    @overload
    def Value(self, typecode_or_type: type[_CT], *args: Any, lock: Literal[True] | _LockLike = True) -> SynchronizedBase[_CT]: ...
    @overload
    def Value(self, typecode_or_type: str, *args: Any, lock: Literal[True] | _LockLike = True) -> SynchronizedBase[Any]: ...
    @overload
    def Value(self, typecode_or_type: str | type[_CData], *args: Any, lock: bool | _LockLike = True) -> Any: ...

    # Array: same overload layout as Value, with SynchronizedArray as wrapper.
    # NOTE(review): the lock=False overload is annotated as returning _CT,
    # whereas RawArray above returns ctypes.Array[_CT] -- confirm which one
    # is intended before relying on it.
    @overload
    def Array(self, typecode_or_type: type[_CT], size_or_initializer: int | Sequence[Any], *, lock: Literal[False]) -> _CT: ...
    @overload
    def Array(
        self, typecode_or_type: type[_CT], size_or_initializer: int | Sequence[Any], *, lock: Literal[True] | _LockLike = True
    ) -> SynchronizedArray[_CT]: ...
    @overload
    def Array(
        self, typecode_or_type: str, size_or_initializer: int | Sequence[Any], *, lock: Literal[True] | _LockLike = True
    ) -> SynchronizedArray[Any]: ...
    @overload
    def Array(
        self, typecode_or_type: str | type[_CData], size_or_initializer: int | Sequence[Any], *, lock: bool | _LockLike = True
    ) -> Any: ...
    def freeze_support(self) -> None: ...
    def get_logger(self) -> Logger: ...
    def log_to_stderr(self, level: _LoggingLevel | None = None) -> Logger: ...
    def allow_connection_pickling(self) -> None: ...
    def set_executable(self, executable: str) -> None: ...
    def set_forkserver_preload(self, module_names: list[str]) -> None: ...

    # get_context: the "fork" and "forkserver" overloads are declared only
    # when not on Windows. Both branches use `None` as the default for
    # `method`, and any other str falls back to BaseContext.
    if sys.platform != "win32":
        @overload
        def get_context(self, method: None = None) -> DefaultContext: ...
        @overload
        def get_context(self, method: Literal["spawn"]) -> SpawnContext: ...
        @overload
        def get_context(self, method: Literal["fork"]) -> ForkContext: ...
        @overload
        def get_context(self, method: Literal["forkserver"]) -> ForkServerContext: ...
        @overload
        def get_context(self, method: str) -> BaseContext: ...
    else:
        @overload
        def get_context(self, method: None = None) -> DefaultContext: ...
        @overload
        def get_context(self, method: Literal["spawn"]) -> SpawnContext: ...
        @overload
        def get_context(self, method: str) -> BaseContext: ...

    # With allow_none left at (or literally) False the result is always a str;
    # for an arbitrary bool it may be None.
    @overload
    def get_start_method(self, allow_none: Literal[False] = False) -> str: ...
    @overload
    def get_start_method(self, allow_none: bool) -> str | None: ...
    def set_start_method(self, method: str | None, force: bool = False) -> None: ...
    @property
    def reducer(self) -> str: ...
    @reducer.setter
    def reducer(self, reduction: str) -> None: ...
    def _check_available(self) -> None: ...
|
|
|
|
class Process(BaseProcess):
    # Stub declaration of the BaseProcess subclass referenced by
    # DefaultContext.Process; the start method may be unset (None).
    _start_method: str | None
    # NOTE(review): the other *Process classes in this file annotate _Popen as
    # returning a popen_*.Popen object; confirm DefaultContext is intended here.
    @staticmethod
    def _Popen(process_obj: BaseProcess) -> DefaultContext: ...
|
|
|
|
class DefaultContext(BaseContext):
    # Stub declaration: a BaseContext subclass constructed from another
    # BaseContext and exposing Process as its process class.
    Process: ClassVar[type[Process]]
    def __init__(self, context: BaseContext) -> None: ...
    # NOTE(review): declared as always returning str, while the BaseContext
    # overloads return `str | None` for a non-literal bool -- confirm that
    # this narrower override is intended.
    def get_start_method(self, allow_none: bool = False) -> str: ...
    def get_all_start_methods(self) -> list[str]: ...
    # The class-level __all__ attribute is declared only for Python < 3.8.
    if sys.version_info < (3, 8):
        __all__: ClassVar[list[str]]
|
|
|
|
# Module-level variable declared with type DefaultContext; the stub assigns no value.
_default_context: DefaultContext
|
|
|
|
class SpawnProcess(BaseProcess):
    # Stub declaration of the process class used by SpawnContext.
    _start_method: str
    # The Popen type returned depends on the platform: the POSIX spawn
    # implementation off Windows, the win32 implementation on Windows.
    if sys.platform != "win32":
        @staticmethod
        def _Popen(process_obj: BaseProcess) -> popen_spawn_posix.Popen: ...
    else:
        @staticmethod
        def _Popen(process_obj: BaseProcess) -> popen_spawn_win32.Popen: ...
|
|
|
|
class SpawnContext(BaseContext):
    # Stub declaration: a BaseContext whose process class is SpawnProcess.
    _name: str
    Process: ClassVar[type[SpawnProcess]]
|
|
|
|
# The fork and forkserver process/context classes are declared only when the
# platform is not Windows.
if sys.platform != "win32":
    class ForkProcess(BaseProcess):
        _start_method: str
        @staticmethod
        def _Popen(process_obj: BaseProcess) -> popen_fork.Popen: ...

    class ForkServerProcess(BaseProcess):
        _start_method: str
        @staticmethod
        def _Popen(process_obj: BaseProcess) -> popen_forkserver.Popen: ...

    # Each context pairs with the process class of the same start method.
    class ForkContext(BaseContext):
        _name: str
        Process: ClassVar[type[ForkProcess]]

    class ForkServerContext(BaseContext):
        _name: str
        Process: ClassVar[type[ForkServerProcess]]
|
|
|
|
# Module-level helper functions (signatures only; bodies are `...`).
def _force_start_method(method: str) -> None: ...
# The spawning Popen object is typed loosely as Any and may be absent (None).
def get_spawning_popen() -> Any | None: ...
def set_spawning_popen(popen: Any) -> None: ...
def assert_spawning(obj: Any) -> None: ...
|