Add multiprocessing.sharedctypes stubs (#5231)

This commit is contained in:
hatal175
2021-05-02 23:20:33 +03:00
committed by GitHub
parent 06709912ad
commit b15c025059
3 changed files with 152 additions and 64 deletions

View File

@@ -1,6 +1,7 @@
import sys
from collections.abc import Callable, Iterable
from logging import Logger
from multiprocessing import connection, pool, sharedctypes, synchronize
from multiprocessing import connection, context, pool, synchronize
from multiprocessing.context import (
AuthenticationError as AuthenticationError,
BaseContext,
@@ -17,7 +18,7 @@ from multiprocessing.process import active_children as active_children, current_
# These are technically functions that return instances of these Queue classes. See #4313 for discussion
from multiprocessing.queues import JoinableQueue as JoinableQueue, Queue as Queue, SimpleQueue as SimpleQueue
from multiprocessing.spawn import freeze_support as freeze_support
from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple, Union, overload
from typing import Any, Optional, Union, overload
from typing_extensions import Literal
if sys.version_info >= (3, 8):
@@ -32,6 +33,10 @@ if sys.platform != "win32":
# Sychronization primitives
_LockLike = Union[synchronize.Lock, synchronize.RLock]
RawValue = context._default_context.RawValue
RawArray = context._default_context.RawArray
Value = context._default_context.Value
Array = context._default_context.Array
def Barrier(parties: int, action: Optional[Callable[..., Any]] = ..., timeout: Optional[float] = ...) -> synchronize.Barrier: ...
def BoundedSemaphore(value: int = ...) -> synchronize.BoundedSemaphore: ...
@@ -40,7 +45,7 @@ def Event() -> synchronize.Event: ...
def Lock() -> synchronize.Lock: ...
def RLock() -> synchronize.RLock: ...
def Semaphore(value: int = ...) -> synchronize.Semaphore: ...
def Pipe(duplex: bool = ...) -> Tuple[connection.Connection, connection.Connection]: ...
def Pipe(duplex: bool = ...) -> tuple[connection.Connection, connection.Connection]: ...
def Pool(
processes: Optional[int] = ...,
initializer: Optional[Callable[..., Any]] = ...,
@@ -48,12 +53,6 @@ def Pool(
maxtasksperchild: Optional[int] = ...,
) -> pool.Pool: ...
# Functions Array and Value are copied from context.pyi.
# See https://github.com/python/typeshed/blob/ac234f25927634e06d9c96df98d72d54dd80dfc4/stdlib/2and3/turtle.pyi#L284-L291
# for rationale
def Array(typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...) -> sharedctypes._Array: ...
def Value(typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: ...
# ----- multiprocessing function stubs -----
def allow_connection_pickling() -> None: ...
def cpu_count() -> int: ...
@@ -61,8 +60,8 @@ def get_logger() -> Logger: ...
def log_to_stderr(level: Optional[Union[str, int]] = ...) -> Logger: ...
def Manager() -> SyncManager: ...
def set_executable(executable: str) -> None: ...
def set_forkserver_preload(module_names: List[str]) -> None: ...
def get_all_start_methods() -> List[str]: ...
def set_forkserver_preload(module_names: list[str]) -> None: ...
def get_all_start_methods() -> list[str]: ...
def get_start_method(allow_none: bool = ...) -> Optional[str]: ...
def set_start_method(method: str, force: Optional[bool] = ...) -> None: ...

View File

@@ -1,12 +1,17 @@
import ctypes
import multiprocessing
import sys
from collections.abc import Callable, Iterable, Sequence
from ctypes import _CData
from logging import Logger
from multiprocessing import queues, sharedctypes, synchronize
from multiprocessing import queues, synchronize
from multiprocessing.process import BaseProcess
from typing import Any, Callable, Iterable, List, Optional, Sequence, Type, Union, overload
from multiprocessing.sharedctypes import SynchronizedArray, SynchronizedBase
from typing import Any, Optional, Type, TypeVar, Union, overload
from typing_extensions import Literal
_LockLike = Union[synchronize.Lock, synchronize.RLock]
_CT = TypeVar("_CT", bound=_CData)
class ProcessError(Exception): ...
class BufferTooShort(ProcessError): ...
@@ -28,7 +33,7 @@ class BaseContext(object):
@staticmethod
def parent_process() -> Optional[BaseProcess]: ...
@staticmethod
def active_children() -> List[BaseProcess]: ...
def active_children() -> list[BaseProcess]: ...
def cpu_count(self) -> int: ...
# TODO: change return to SyncManager once a stub exists in multiprocessing.managers
def Manager(self) -> Any: ...
@@ -53,28 +58,52 @@ class BaseContext(object):
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...,
) -> multiprocessing.pool.Pool: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
# TODO: change return to RawValue once a stub exists in multiprocessing.sharedctypes
def RawValue(self, typecode_or_type: Any, *args: Any) -> Any: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
# TODO: change return to RawArray once a stub exists in multiprocessing.sharedctypes
def RawArray(self, typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]]) -> Any: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
def Value(self, typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
@overload
def RawValue(self, typecode_or_type: Type[_CT], *args: Any) -> _CT: ...
@overload
def RawValue(self, typecode_or_type: str, *args: Any) -> Any: ...
@overload
def RawArray(self, typecode_or_type: Type[_CT], size_or_initializer: Union[int, Sequence[Any]]) -> ctypes.Array[_CT]: ...
@overload
def RawArray(self, typecode_or_type: str, size_or_initializer: Union[int, Sequence[Any]]) -> Any: ...
@overload
def Value(self, typecode_or_type: Type[_CT], *args: Any, lock: Literal[False]) -> _CT: ...
@overload
def Value(self, typecode_or_type: Type[_CT], *args: Any, lock: Union[Literal[True], _LockLike]) -> SynchronizedBase[_CT]: ...
@overload
def Value(self, typecode_or_type: str, *args: Any, lock: Union[Literal[True], _LockLike]) -> SynchronizedBase[Any]: ...
@overload
def Value(self, typecode_or_type: Union[str, Type[_CData]], *args: Any, lock: Union[bool, _LockLike] = ...) -> Any: ...
@overload
def Array(
self, typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...
) -> sharedctypes._Array: ...
self, typecode_or_type: Type[_CT], size_or_initializer: Union[int, Sequence[Any]], *, lock: Literal[False]
) -> _CT: ...
@overload
def Array(
self,
typecode_or_type: Type[_CT],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[Literal[True], _LockLike],
) -> SynchronizedArray[_CT]: ...
@overload
def Array(
self, typecode_or_type: str, size_or_initializer: Union[int, Sequence[Any]], *, lock: Union[Literal[True], _LockLike]
) -> SynchronizedArray[Any]: ...
@overload
def Array(
self,
typecode_or_type: Union[str, Type[_CData]],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[bool, _LockLike] = ...,
) -> Any: ...
def freeze_support(self) -> None: ...
def get_logger(self) -> Logger: ...
def log_to_stderr(self, level: Optional[str] = ...) -> Logger: ...
def allow_connection_pickling(self) -> None: ...
def set_executable(self, executable: str) -> None: ...
def set_forkserver_preload(self, module_names: List[str]) -> None: ...
def set_forkserver_preload(self, module_names: list[str]) -> None: ...
if sys.platform != "win32":
@overload
def get_context(self, method: None = ...) -> DefaultContext: ...
@@ -111,7 +140,9 @@ class DefaultContext(BaseContext):
def __init__(self, context: BaseContext) -> None: ...
def set_start_method(self, method: Optional[str], force: bool = ...) -> None: ...
def get_start_method(self, allow_none: bool = ...) -> str: ...
def get_all_start_methods(self) -> List[str]: ...
def get_all_start_methods(self) -> list[str]: ...
_default_context: DefaultContext
if sys.platform != "win32":
class ForkProcess(BaseProcess):

View File

@@ -1,43 +1,101 @@
from ctypes import _CData
import ctypes
from collections.abc import Callable, Iterable, Sequence
from ctypes import _CData, _SimpleCData, c_char
from multiprocessing.context import BaseContext
from multiprocessing.synchronize import _LockLike
from typing import Any, List, Optional, Sequence, Type, Union, overload
from typing import Any, Generic, Optional, Protocol, Type, TypeVar, Union, overload
from typing_extensions import Literal
class _Array:
value: Any = ...
def __init__(
self,
typecode_or_type: Union[str, Type[_CData]],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[bool, _LockLike] = ...,
) -> None: ...
def acquire(self) -> bool: ...
def release(self) -> bool: ...
def get_lock(self) -> _LockLike: ...
def get_obj(self) -> Any: ...
@overload
def __getitem__(self, key: int) -> Any: ...
@overload
def __getitem__(self, key: slice) -> List[Any]: ...
def __getslice__(self, start: int, stop: int) -> Any: ...
def __setitem__(self, key: int, value: Any) -> None: ...
class _Value:
value: Any = ...
def __init__(self, typecode_or_type: Union[str, Type[_CData]], *args: Any, lock: Union[bool, _LockLike] = ...) -> None: ...
def get_lock(self) -> _LockLike: ...
def get_obj(self) -> Any: ...
def acquire(self) -> bool: ...
def release(self) -> bool: ...
_T = TypeVar("_T")
_CT = TypeVar("_CT", bound=_CData)
@overload
def RawValue(typecode_or_type: Type[_CT], *args: Any) -> _CT: ...
@overload
def RawValue(typecode_or_type: str, *args: Any) -> Any: ...
@overload
def RawArray(typecode_or_type: Type[_CT], size_or_initializer: Union[int, Sequence[Any]]) -> ctypes.Array[_CT]: ...
@overload
def RawArray(typecode_or_type: str, size_or_initializer: Union[int, Sequence[Any]]) -> Any: ...
@overload
def Value(typecode_or_type: Type[_CT], *args: Any, lock: Literal[False], ctx: Optional[BaseContext] = ...) -> _CT: ...
@overload
def Value(
typecode_or_type: Type[_CT], *args: Any, lock: Union[Literal[True], _LockLike], ctx: Optional[BaseContext] = ...
) -> SynchronizedBase[_CT]: ...
@overload
def Value(
typecode_or_type: str, *args: Any, lock: Union[Literal[True], _LockLike], ctx: Optional[BaseContext] = ...
) -> SynchronizedBase[Any]: ...
@overload
def Value(
typecode_or_type: Union[str, Type[_CData]], *args: Any, lock: Union[bool, _LockLike] = ..., ctx: Optional[BaseContext] = ...
) -> Any: ...
@overload
def Array(
typecode_or_type: Type[_CT],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Literal[False],
ctx: Optional[BaseContext] = ...,
) -> _CT: ...
@overload
def Array(
typecode_or_type: Type[_CT],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[Literal[True], _LockLike],
ctx: Optional[BaseContext] = ...,
) -> SynchronizedArray[_CT]: ...
@overload
def Array(
typecode_or_type: str,
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[Literal[True], _LockLike],
ctx: Optional[BaseContext] = ...,
) -> SynchronizedArray[Any]: ...
@overload
def Array(
typecode_or_type: Union[str, Type[_CData]],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[bool, _LockLike] = ...,
ctx: Optional[BaseContext] = ...,
) -> _Array: ...
def Value(
typecode_or_type: Union[str, Type[_CData]], *args: Any, lock: Union[bool, _LockLike] = ..., ctx: Optional[BaseContext] = ...
) -> _Value: ...
) -> Any: ...
def copy(obj: _CT) -> _CT: ...
@overload
def synchronized(obj: _SimpleCData[_T], lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> Synchronized[_T]: ...
@overload
def synchronized(obj: ctypes.Array[c_char], lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> SynchronizedString: ...
@overload
def synchronized(obj: ctypes.Array[_CT], lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> SynchronizedArray[_CT]: ...
@overload
def synchronized(obj: _CT, lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> SynchronizedBase[_CT]: ...
class _AcquireFunc(Protocol):
def __call__(self, block: bool = ..., timeout: Optional[float] = ...) -> bool: ...
class SynchronizedBase(Generic[_CT]):
acquire: _AcquireFunc = ...
release: Callable[[], None] = ...
def __init__(self, obj: Any, lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> None: ...
def __reduce__(self) -> tuple[Callable[..., Any], tuple[Any, _LockLike]]: ...
def get_obj(self) -> _CT: ...
def get_lock(self) -> _LockLike: ...
def __enter__(self) -> bool: ...
def __exit__(self, *args: Any) -> None: ...
class Synchronized(SynchronizedBase[_SimpleCData[_T]], Generic[_T]):
value: _T
class SynchronizedArray(SynchronizedBase[ctypes.Array[_CT]], Generic[_CT]):
def __len__(self) -> int: ...
def __getitem__(self, i: int) -> _CT: ...
def __setitem__(self, i: int, o: _CT) -> None: ...
def __getslice__(self, start: int, stop: int) -> list[_CT]: ...
def __setslice__(self, start: int, stop: int, values: Iterable[_CT]) -> None: ...
class SynchronizedString(SynchronizedArray[c_char]):
value: bytes
raw: bytes