Clean up multiprocessing + shared_memory (#3351)

* Make multiprocessing stubs match implementation

* Add multiprocessing.process.BaseProcess
* Use BaseProcess in multiprocessing.context where applicable
* Remove non-existing BaseContext.Process()
* Derive DefaultContext from BaseContext
* Fix BaseContext/DefaultContext.set_start_method() signatures
* Re-export multiprocessing.context.Process from multiprocessing,
  instead of using a custom definition
* Re-export multiprocessing.active_from from multiprocessing.process
  instead of using a custom definition

* Add parent_process() (Python 3.8)

* Complete BaseManager; add Server

* Add multiprocessing.shared_memory et al
This commit is contained in:
Sebastian Rittau
2019-10-14 09:54:45 +02:00
committed by GitHub
parent 0501e2b329
commit 6b55f5c498
5 changed files with 125 additions and 70 deletions

View File

@@ -1,20 +1,27 @@
# Stubs for multiprocessing
import sys
from typing import Any, Callable, Iterable, Mapping, Optional, List, Union, Sequence, Tuple, Type, overload
from ctypes import _CData
from logging import Logger
from multiprocessing import connection, pool, spawn, synchronize
from multiprocessing.context import (
AuthenticationError as AuthenticationError,
BaseContext,
ProcessError as ProcessError, BufferTooShort as BufferTooShort, TimeoutError as TimeoutError, AuthenticationError as AuthenticationError)
BufferTooShort as BufferTooShort,
Process as Process,
ProcessError as ProcessError,
TimeoutError as TimeoutError,
)
from multiprocessing.managers import SyncManager
from multiprocessing.process import current_process as current_process
from multiprocessing.process import active_children as active_children, current_process as current_process
from multiprocessing.queues import Queue as Queue, SimpleQueue as SimpleQueue, JoinableQueue as JoinableQueue
from multiprocessing.spawn import freeze_support as freeze_support
from multiprocessing.spawn import set_executable as set_executable
import sys
if sys.version_info >= (3, 8):
from multiprocessing.process import parent_process as parent_process
# N.B. The functions below are generated at runtime by partially applying
# multiprocessing.context.BaseContext's methods, so the two signatures should
@@ -39,31 +46,6 @@ def Pool(processes: Optional[int] = ...,
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...) -> pool.Pool: ...
class Process():
name: str
daemon: bool
pid: Optional[int]
exitcode: Optional[int]
authkey: bytes
sentinel: int
# TODO: set type of group to None
def __init__(self,
group: Any = ...,
target: Optional[Callable[..., Any]] = ...,
name: Optional[str] = ...,
args: Iterable[Any] = ...,
kwargs: Mapping[Any, Any] = ...,
*,
daemon: Optional[bool] = ...) -> None: ...
def start(self) -> None: ...
def run(self) -> None: ...
def terminate(self) -> None: ...
if sys.version_info >= (3, 7):
def kill(self) -> None: ...
def close(self) -> None: ...
def is_alive(self) -> bool: ...
def join(self, timeout: Optional[float] = ...) -> None: ...
class Array():
value: Any = ...
@@ -90,7 +72,6 @@ class Value():
def release(self) -> bool: ...
# ----- multiprocessing function stubs -----
def active_children() -> List[Process]: ...
def allow_connection_pickling() -> None: ...
def cpu_count() -> int: ...
def get_logger() -> Logger: ...

View File

@@ -4,6 +4,7 @@ from logging import Logger
import multiprocessing
from multiprocessing import synchronize
from multiprocessing import queues
from multiprocessing.process import BaseProcess
import sys
from typing import Any, Callable, Iterable, Optional, List, Mapping, Sequence, Type, Union
@@ -27,9 +28,12 @@ class BaseContext(object):
# multiprocessing.*, so the signatures should be identical (modulo self).
@staticmethod
def current_process() -> multiprocessing.Process: ...
def current_process() -> BaseProcess: ...
if sys.version_info >= (3, 8):
@staticmethod
def parent_process() -> Optional[BaseProcess]: ...
@staticmethod
def active_children() -> List[multiprocessing.Process]: ...
def active_children() -> List[BaseProcess]: ...
def cpu_count(self) -> int: ...
# TODO: change return to SyncManager once a stub exists in multiprocessing.managers
def Manager(self) -> Any: ...
@@ -59,16 +63,6 @@ class BaseContext(object):
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...
) -> multiprocessing.pool.Pool: ...
def Process(
self,
group: Any = ...,
target: Optional[Callable[..., Any]] = ...,
name: Optional[str] = ...,
args: Iterable[Any] = ...,
kwargs: Mapping[Any, Any] = ...,
*,
daemon: Optional[bool] = ...
) -> multiprocessing.Process: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
# TODO: change return to RawValue once a stub exists in multiprocessing.sharedctypes
@@ -104,46 +98,42 @@ class BaseContext(object):
def set_forkserver_preload(self, module_names: List[str]) -> None: ...
def get_context(self, method: Optional[str] = ...) -> BaseContext: ...
def get_start_method(self, allow_none: bool = ...) -> str: ...
def set_start_method(self, method: Optional[str] = ...) -> None: ...
def set_start_method(self, method: Optional[str], force: bool = ...) -> None: ...
@property
def reducer(self) -> str: ...
@reducer.setter
def reducer(self, reduction: str) -> None: ...
def _check_available(self) -> None: ...
class Process(object):
class Process(BaseProcess):
_start_method: Optional[str]
@staticmethod
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
def _Popen(process_obj: Any) -> DefaultContext: ...
def _Popen(process_obj: BaseProcess) -> DefaultContext: ...
class DefaultContext(object):
class DefaultContext(BaseContext):
Process: Type[multiprocessing.Process]
def __init__(self, context: BaseContext) -> None: ...
def get_context(self, method: Optional[str] = ...) -> BaseContext: ...
def set_start_method(self, method: str, force: bool = ...) -> None: ...
def set_start_method(self, method: Optional[str], force: bool = ...) -> None: ...
def get_start_method(self, allow_none: bool = ...) -> str: ...
def get_all_start_methods(self) -> List[str]: ...
if sys.platform != 'win32':
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
class ForkProcess(Any):
class ForkProcess(BaseProcess):
_start_method: str
@staticmethod
def _Popen(process_obj: Any) -> Any: ...
def _Popen(process_obj: BaseProcess) -> Any: ...
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
class SpawnProcess(Any):
class SpawnProcess(BaseProcess):
_start_method: str
@staticmethod
def _Popen(process_obj: Any) -> SpawnProcess: ...
def _Popen(process_obj: BaseProcess) -> SpawnProcess: ...
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
class ForkServerProcess(Any):
class ForkServerProcess(BaseProcess):
_start_method: str
@staticmethod
def _Popen(process_obj: Any) -> Any: ...
def _Popen(process_obj: BaseProcess) -> Any: ...
class ForkContext(BaseContext):
_name: str
@@ -157,20 +147,16 @@ if sys.platform != 'win32':
_name: str
Process: Type[ForkServerProcess]
else:
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
class SpawnProcess(Any):
class SpawnProcess(BaseProcess):
_start_method: str
@staticmethod
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
def _Popen(process_obj: Process) -> Any: ...
def _Popen(process_obj: BaseProcess) -> Any: ...
class SpawnContext(BaseContext):
_name: str
Process: Type[SpawnProcess]
def _force_start_method(method: str) -> None: ...
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
def get_spawning_popen() -> Optional[Any]: ...
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
def set_spawning_popen(popen: Any) -> None: ...
def assert_spawning(obj: Any) -> None: ...

View File

@@ -3,11 +3,16 @@
# NOTE: These are incomplete!
import queue
import sys
import threading
from typing import (
Any, Callable, ContextManager, Dict, Iterable, Generic, List, Mapping, Optional,
Sequence, Tuple, TypeVar, Union,
)
from .context import BaseContext
if sys.version_info >= (3, 8):
from .shared_memory import ShareableList, SharedMemory, _SLT
_T = TypeVar('_T')
_KT = TypeVar('_KT')
@@ -24,18 +29,32 @@ class ValueProxy(BaseProxy, Generic[_T]):
def set(self, value: _T) -> None: ...
value: _T
# Returned by BaseManager.get_server()
class Server:
address: Any
def serve_forever(self) -> None: ...
class BaseManager(ContextManager[BaseManager]):
address: Union[str, Tuple[str, int]]
def __init__(
self,
address: Optional[Any] = ...,
authkey: Optional[bytes] = ...,
serializer: str = ...,
ctx: Optional[BaseContext] = ...,
) -> None: ...
def get_server(self) -> Server: ...
def connect(self) -> None: ...
def start(self, initializer: Optional[Callable[..., Any]] = ..., initargs: Iterable[Any] = ...) -> None: ...
def shutdown(self) -> None: ... # only available after start() was called
def join(self, timeout: Optional[float] = ...) -> None: ... # undocumented
@property
def address(self) -> Any: ...
@classmethod
def register(cls, typeid: str, callable: Optional[Callable[..., Any]] = ...,
proxytype: Any = ...,
exposed: Optional[Sequence[str]] = ...,
method_to_typeid: Optional[Mapping[str, str]] = ...,
create_method: bool = ...) -> None: ...
def shutdown(self) -> None: ...
def start(self, initializer: Optional[Callable[..., Any]] = ...,
initargs: Iterable[Any] = ...) -> None: ...
class SyncManager(BaseManager, ContextManager[SyncManager]):
def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...
@@ -52,3 +71,10 @@ class SyncManager(BaseManager, ContextManager[SyncManager]):
def list(self, sequence: Sequence[_T] = ...) -> List[_T]: ...
class RemoteError(Exception): ...
if sys.version_info >= (3, 8):
class SharedMemoryServer(Server): ...
class SharedMemoryManager(BaseManager):
def get_server(self) -> SharedMemoryServer: ...
def SharedMemory(self, size: int) -> SharedMemory: ...
def ShareableList(self, sequence: Optional[Iterable[_SLT]]) -> ShareableList[_SLT]: ...

View File

@@ -1,5 +1,38 @@
from typing import List
from multiprocessing import Process
import sys
from typing import Any, Callable, List, Mapping, Optional, Tuple
def current_process() -> Process: ...
def active_children() -> List[Process]: ...
class BaseProcess:
name: str
daemon: bool
authkey: bytes
def __init__(
self,
group: None = ...,
target: Optional[Callable[..., Any]] = ...,
name: Optional[str] = ...,
args: Tuple[Any, ...] = ...,
kwargs: Mapping[str, Any] = ...,
*,
daemon: Optional[bool] = ...,
) -> None: ...
def run(self) -> None: ...
def start(self) -> None: ...
def terminate(self) -> None: ...
if sys.version_info >= (3, 7):
def kill(self) -> None: ...
def close(self) -> None: ...
def join(self, timeout: Optional[float] = ...) -> None: ...
def is_alive(self) -> bool: ...
@property
def exitcode(self) -> Optional[int]: ...
@property
def ident(self) -> Optional[int]: ...
@property
def pid(self) -> Optional[int]: ...
@property
def sentinel(self) -> int: ...
def current_process() -> BaseProcess: ...
def active_children() -> List[BaseProcess]: ...
if sys.version_info >= (3, 8):
def parent_process() -> Optional[BaseProcess]: ...

View File

@@ -0,0 +1,29 @@
import sys
from typing import Generic, Iterable, Optional, Tuple, TypeVar
_S = TypeVar("_S")
_SLT = TypeVar("_SLT", int, float, bool, str, bytes, None)
if sys.version_info >= (3, 8):
class SharedMemory:
def __init__(self, name: Optional[str] = ..., create: bool = ..., size: int = ...) -> None: ...
@property
def buf(self) -> memoryview: ...
@property
def name(self) -> str: ...
@property
def size(self) -> int: ...
def close(self) -> None: ...
def unlink(self) -> None: ...
class ShareableList(Generic[_SLT]):
shm: SharedMemory
def __init__(self, sequence: Optional[Iterable[_SLT]] = ..., *, name: Optional[str] = ...) -> None: ...
def __getitem__(self, position: int) -> _SLT: ...
def __setitem__(self, position: int, value: _SLT) -> None: ...
def __reduce__(self: _S) -> Tuple[_S, Tuple[_SLT, ...]]: ...
def __len__(self) -> int: ...
@property
def format(self) -> str: ...
def count(self, value: _SLT) -> int: ...
def index(self, value: _SLT) -> int: ...