Python 3 multiprocesisng synchronization stubs (#1678)

* Add multiprocessing.Array to Python 3 stub

* Add Pipe and Connection type

* Add synchronize type-stub

* Add multiprocessing Exceptions

* Update context with synchronization primitives

* Code review comments

* Add acquire and release from _make_method

* Remove Array stub

* add missing Optional
This commit is contained in:
Alan Du
2017-11-07 02:07:02 -05:00
committed by Jelle Zijlstra
parent f4e3657c57
commit 275d9b5818
4 changed files with 122 additions and 36 deletions

View File

@@ -2,34 +2,39 @@
from typing import (
Any, Callable, ContextManager, Iterable, Mapping, Optional, Dict, List,
Union, TypeVar,
Union, TypeVar, Sequence, Tuple
)
from logging import Logger
from multiprocessing import pool
from multiprocessing.context import BaseContext
from multiprocessing import connection, pool, synchronize
from multiprocessing.context import (
BaseContext,
ProcessError, BufferTooShort, TimeoutError, AuthenticationError)
from multiprocessing.managers import SyncManager
from multiprocessing.pool import AsyncResult
from multiprocessing.process import current_process as current_process
import sys
import queue
import sys
_T = TypeVar('_T')
class Lock(ContextManager[Lock]):
def acquire(self, block: bool = ..., timeout: int = ...) -> None: ...
def release(self) -> None: ...
# N.B. The functions below are generated at runtime by partially applying
# multiprocessing.context.BaseContext's methods, so the two signatures should
# be identical (modulo self).
class Event(object):
def __init__(self, *, ctx: BaseContext) -> None: ...
def is_set(self) -> bool: ...
def set(self) -> None: ...
def clear(self) -> None: ...
def wait(self, timeout: Optional[int] = ...) -> bool: ...
# Sychronization primitives
_LockLike = Union[synchronize.Lock, synchronize.RLock]
def Barrier(parties: int,
action: Optional[Callable] = ...,
timeout: Optional[float] = ...) -> synchronize.Barrier: ...
def BoundedSemaphore(value: int = ...) -> synchronize.BoundedSemaphore: ...
def Condition(lock: Optional[_LockLike] = ...) -> synchronize.Condition: ...
def Event(lock: Optional[_LockLike] = ...) -> synchronize.Event: ...
def Lock() -> synchronize.Lock: ...
def RLock() -> synchronize.RLock: ...
def Semaphore(value: int = ...) -> synchronize.Semaphore: ...
def Pipe(duplex: bool = ...) -> Tuple[connection.Connection, connection.Connection]: ...
# N.B. This is generated at runtime by partially applying
# multiprocessing.context.BaseContext.Pool, so the two signatures should be
# identical (modulo self).
def Pool(processes: Optional[int] = ...,
initializer: Optional[Callable[..., Any]] = ...,
initargs: Iterable[Any] = ...,

View File

@@ -0,0 +1,14 @@
from typing import Any, Optional
class Connection:
def close(self) -> None: ...
def fileno(self) -> int: ...
def poll(self, timeout: float = ...) -> bool: ...
def recv(self) -> Any: ...
def recv_bytes(self, maxlength: Optional[int] = ...) -> bytes: ...
def recv_bytes_into(self, buf: Any, offset: int = ...) -> int: ...
def send(self, obj: Any) -> None: ...
def send_bytes(self,
buf: bytes,
offset: int = ...,
size: Optional[int] = ...) -> None: ...

View File

@@ -2,11 +2,15 @@
from logging import Logger
import multiprocessing
from multiprocessing import synchronize
import sys
from typing import (
Any, Callable, Iterable, Optional, List, Mapping, Sequence, Tuple, Type, Union,
Any, Callable, Iterable, Optional, List, Mapping, Sequence, Tuple, Type,
Union,
)
_LockLike = Union[synchronize.Lock, synchronize.RLock]
class ProcessError(Exception): ...
class BufferTooShort(ProcessError): ...
@@ -21,6 +25,9 @@ class BaseContext(object):
TimeoutError = ... # type: Type[Exception]
AuthenticationError = ... # type: Type[Exception]
# N.B. The methods below are applied at runtime to generate
# multiprocessing.*, so the signatures should be identical (modulo self).
@staticmethod
def current_process() -> multiprocessing.Process: ...
@staticmethod
@@ -30,30 +37,26 @@ class BaseContext(object):
def Manager(self) -> Any: ...
# TODO: change return to Pipe once a stub exists in multiprocessing.connection
def Pipe(self, duplex: bool) -> Any: ...
# TODO: change return to Lock once a stub exists in multiprocessing.synchronize
def Lock(self) -> Any: ...
# TODO: change return to RLock once a stub exists in multiprocessing.synchronize
def RLock(self) -> Any: ...
# TODO: change lock param to Optional[Union[Lock, RLock]] when stubs exists in multiprocessing.synchronize
# TODO: change return to Condition once a stub exists in multiprocessing.synchronize
def Condition(self, lock: Optional[Any] = ...) -> Any: ...
# TODO: change return to Semaphore once a stub exists in multiprocessing.synchronize
def Semaphore(self, value: int = ...) -> Any: ...
# TODO: change return to BoundedSemaphore once a stub exists in multiprocessing.synchronize
def BoundedSemaphore(self, value: int = ...) -> Any: ...
# TODO: change return to Event once a stub exists in multiprocessing.synchronize
def Event(self) -> Any: ...
# TODO: change return to Barrier once a stub exists in multiprocessing.synchronize
def Barrier(self, parties: int, action: Optional[Callable[..., Any]] = ..., timeout: Optional[int] = ...) -> Any: ...
def Barrier(self,
parties: int,
action: Optional[Callable] = ...,
timeout: Optional[float] = ...) -> synchronize.Barrier: ...
def BoundedSemaphore(self,
value: int = ...) -> synchronize.BoundedSemaphore: ...
def Condition(self,
lock: Optional[_LockLike] = ...) -> synchronize.Condition: ...
def Event(self, lock: Optional[_LockLike] = ...) -> synchronize.Event: ...
def Lock(self) -> synchronize.Lock: ...
def RLock(self) -> synchronize.RLock: ...
def Semaphore(self, value: int = ...) -> synchronize.Semaphore: ...
# TODO: change return to Queue once a stub exists in multiprocessing.queues
def Queue(self, maxsize: int = ...) -> Any: ...
# TODO: change return to Queue once a stub exists in multiprocessing.queues
def JoinableQueue(self, maxsize: int = ...) -> Any: ...
# TODO: change return to SimpleQueue once a stub exists in multiprocessing.queues
def SimpleQueue(self) -> Any: ...
# N.B. This method is partially applied at runtime to generate
# multiprocessing.Pool, so the two signatures should be identical (modulo
# self).
def Pool(
self,
processes: Optional[int] = ...,

View File

@@ -0,0 +1,64 @@
from typing import Callable, ContextManager, Optional, Union
from multiprocessing.context import BaseContext
import threading
import sys
_LockLike = Union[Lock, RLock]
class Barrier(threading.Barrier):
def __init__(self,
parties: int,
action: Optional[Callable] = ...,
timeout: Optional[float] = ...,
*
ctx: BaseContext) -> None: ...
class BoundedSemaphore(Semaphore):
def __init__(self, value: int = ..., *, ctx: BaseContext) -> None: ...
class Condition(ContextManager[bool]):
def __init__(self,
lock: Optional[_LockLike] = ...,
*,
ctx: BaseContext) -> None: ...
if sys.version_info >= (3, 7):
def notify(self, n: int = ...) -> None: ...
else:
def notify(self) -> None: ...
def notify_all(self) -> None: ...
def wait(self, timeout: Optional[float] = ...) -> bool: ...
if sys.version_info >= (3, 3):
def wait_for(self,
predicate: Callable[[], bool],
timeout: Optional[float] = ...) -> bool: ...
def acquire(self,
block: bool = ...,
timeout: Optional[float] = ...) -> bool: ...
def release(self) -> None: ...
class Event(ContextManager[bool]):
def __init__(self,
lock: Optional[_LockLike] = ...,
*,
ctx: BaseContext) -> None: ...
def is_set(self) -> bool: ...
def set(self) -> None: ...
def clear(self) -> None: ...
def wait(self, timeout: Optional[float] = ...) -> bool: ...
class Lock(SemLock):
def __init__(self, *, ctx: BaseContext) -> None: ...
class RLock(SemLock):
def __init__(self, *, ctx: BaseContext) -> None: ...
class Semaphore(SemLock):
def __init__(self, value: int = ..., *, ctx: BaseContext) -> None: ...
# Not part of public API
class SemLock(ContextManager[bool]):
def acquire(self,
block: bool = ...,
timeout: Optional[float] = ...) -> bool: ...
def release(self) -> None: ...