diff --git a/stdlib/3/multiprocessing/__init__.pyi b/stdlib/3/multiprocessing/__init__.pyi index 59b82daf3..a8a3260ad 100644 --- a/stdlib/3/multiprocessing/__init__.pyi +++ b/stdlib/3/multiprocessing/__init__.pyi @@ -2,34 +2,39 @@ from typing import ( Any, Callable, ContextManager, Iterable, Mapping, Optional, Dict, List, - Union, TypeVar, + Union, TypeVar, Sequence, Tuple ) from logging import Logger -from multiprocessing import pool -from multiprocessing.context import BaseContext +from multiprocessing import connection, pool, synchronize +from multiprocessing.context import ( + BaseContext, + ProcessError, BufferTooShort, TimeoutError, AuthenticationError) from multiprocessing.managers import SyncManager -from multiprocessing.pool import AsyncResult from multiprocessing.process import current_process as current_process -import sys import queue +import sys _T = TypeVar('_T') -class Lock(ContextManager[Lock]): - def acquire(self, block: bool = ..., timeout: int = ...) -> None: ... - def release(self) -> None: ... +# N.B. The functions below are generated at runtime by partially applying +# multiprocessing.context.BaseContext's methods, so the two signatures should +# be identical (modulo self). -class Event(object): - def __init__(self, *, ctx: BaseContext) -> None: ... - def is_set(self) -> bool: ... - def set(self) -> None: ... - def clear(self) -> None: ... - def wait(self, timeout: Optional[int] = ...) -> bool: ... +# Sychronization primitives +_LockLike = Union[synchronize.Lock, synchronize.RLock] +def Barrier(parties: int, + action: Optional[Callable] = ..., + timeout: Optional[float] = ...) -> synchronize.Barrier: ... +def BoundedSemaphore(value: int = ...) -> synchronize.BoundedSemaphore: ... +def Condition(lock: Optional[_LockLike] = ...) -> synchronize.Condition: ... +def Event(lock: Optional[_LockLike] = ...) -> synchronize.Event: ... +def Lock() -> synchronize.Lock: ... +def RLock() -> synchronize.RLock: ... +def Semaphore(value: int = ...) -> synchronize.Semaphore: ... + +def Pipe(duplex: bool = ...) -> Tuple[connection.Connection, connection.Connection]: ... -# N.B. This is generated at runtime by partially applying -# multiprocessing.context.BaseContext.Pool, so the two signatures should be -# identical (modulo self). def Pool(processes: Optional[int] = ..., initializer: Optional[Callable[..., Any]] = ..., initargs: Iterable[Any] = ..., diff --git a/stdlib/3/multiprocessing/connection.pyi b/stdlib/3/multiprocessing/connection.pyi new file mode 100644 index 000000000..0f267a630 --- /dev/null +++ b/stdlib/3/multiprocessing/connection.pyi @@ -0,0 +1,14 @@ +from typing import Any, Optional + +class Connection: + def close(self) -> None: ... + def fileno(self) -> int: ... + def poll(self, timeout: float = ...) -> bool: ... + def recv(self) -> Any: ... + def recv_bytes(self, maxlength: Optional[int] = ...) -> bytes: ... + def recv_bytes_into(self, buf: Any, offset: int = ...) -> int: ... + def send(self, obj: Any) -> None: ... + def send_bytes(self, + buf: bytes, + offset: int = ..., + size: Optional[int] = ...) -> None: ... diff --git a/stdlib/3/multiprocessing/context.pyi b/stdlib/3/multiprocessing/context.pyi index ab095c99d..859a6c3a6 100644 --- a/stdlib/3/multiprocessing/context.pyi +++ b/stdlib/3/multiprocessing/context.pyi @@ -2,11 +2,15 @@ from logging import Logger import multiprocessing +from multiprocessing import synchronize import sys from typing import ( - Any, Callable, Iterable, Optional, List, Mapping, Sequence, Tuple, Type, Union, + Any, Callable, Iterable, Optional, List, Mapping, Sequence, Tuple, Type, + Union, ) +_LockLike = Union[synchronize.Lock, synchronize.RLock] + class ProcessError(Exception): ... class BufferTooShort(ProcessError): ... @@ -21,6 +25,9 @@ class BaseContext(object): TimeoutError = ... # type: Type[Exception] AuthenticationError = ... # type: Type[Exception] + # N.B. The methods below are applied at runtime to generate + # multiprocessing.*, so the signatures should be identical (modulo self). + @staticmethod def current_process() -> multiprocessing.Process: ... @staticmethod @@ -30,30 +37,26 @@ class BaseContext(object): def Manager(self) -> Any: ... # TODO: change return to Pipe once a stub exists in multiprocessing.connection def Pipe(self, duplex: bool) -> Any: ... - # TODO: change return to Lock once a stub exists in multiprocessing.synchronize - def Lock(self) -> Any: ... - # TODO: change return to RLock once a stub exists in multiprocessing.synchronize - def RLock(self) -> Any: ... - # TODO: change lock param to Optional[Union[Lock, RLock]] when stubs exists in multiprocessing.synchronize - # TODO: change return to Condition once a stub exists in multiprocessing.synchronize - def Condition(self, lock: Optional[Any] = ...) -> Any: ... - # TODO: change return to Semaphore once a stub exists in multiprocessing.synchronize - def Semaphore(self, value: int = ...) -> Any: ... - # TODO: change return to BoundedSemaphore once a stub exists in multiprocessing.synchronize - def BoundedSemaphore(self, value: int = ...) -> Any: ... - # TODO: change return to Event once a stub exists in multiprocessing.synchronize - def Event(self) -> Any: ... - # TODO: change return to Barrier once a stub exists in multiprocessing.synchronize - def Barrier(self, parties: int, action: Optional[Callable[..., Any]] = ..., timeout: Optional[int] = ...) -> Any: ... + + def Barrier(self, + parties: int, + action: Optional[Callable] = ..., + timeout: Optional[float] = ...) -> synchronize.Barrier: ... + def BoundedSemaphore(self, + value: int = ...) -> synchronize.BoundedSemaphore: ... + def Condition(self, + lock: Optional[_LockLike] = ...) -> synchronize.Condition: ... + def Event(self, lock: Optional[_LockLike] = ...) -> synchronize.Event: ... + def Lock(self) -> synchronize.Lock: ... + def RLock(self) -> synchronize.RLock: ... + def Semaphore(self, value: int = ...) -> synchronize.Semaphore: ... + # TODO: change return to Queue once a stub exists in multiprocessing.queues def Queue(self, maxsize: int = ...) -> Any: ... # TODO: change return to Queue once a stub exists in multiprocessing.queues def JoinableQueue(self, maxsize: int = ...) -> Any: ... # TODO: change return to SimpleQueue once a stub exists in multiprocessing.queues def SimpleQueue(self) -> Any: ... - # N.B. This method is partially applied at runtime to generate - # multiprocessing.Pool, so the two signatures should be identical (modulo - # self). def Pool( self, processes: Optional[int] = ..., diff --git a/stdlib/3/multiprocessing/synchronize.pyi b/stdlib/3/multiprocessing/synchronize.pyi new file mode 100644 index 000000000..3b8f0fd86 --- /dev/null +++ b/stdlib/3/multiprocessing/synchronize.pyi @@ -0,0 +1,64 @@ +from typing import Callable, ContextManager, Optional, Union + +from multiprocessing.context import BaseContext +import threading +import sys + +_LockLike = Union[Lock, RLock] + +class Barrier(threading.Barrier): + def __init__(self, + parties: int, + action: Optional[Callable] = ..., + timeout: Optional[float] = ..., + * + ctx: BaseContext) -> None: ... + +class BoundedSemaphore(Semaphore): + def __init__(self, value: int = ..., *, ctx: BaseContext) -> None: ... + +class Condition(ContextManager[bool]): + def __init__(self, + lock: Optional[_LockLike] = ..., + *, + ctx: BaseContext) -> None: ... + if sys.version_info >= (3, 7): + def notify(self, n: int = ...) -> None: ... + else: + def notify(self) -> None: ... + def notify_all(self) -> None: ... + def wait(self, timeout: Optional[float] = ...) -> bool: ... + if sys.version_info >= (3, 3): + def wait_for(self, + predicate: Callable[[], bool], + timeout: Optional[float] = ...) -> bool: ... + def acquire(self, + block: bool = ..., + timeout: Optional[float] = ...) -> bool: ... + def release(self) -> None: ... + +class Event(ContextManager[bool]): + def __init__(self, + lock: Optional[_LockLike] = ..., + *, + ctx: BaseContext) -> None: ... + def is_set(self) -> bool: ... + def set(self) -> None: ... + def clear(self) -> None: ... + def wait(self, timeout: Optional[float] = ...) -> bool: ... + +class Lock(SemLock): + def __init__(self, *, ctx: BaseContext) -> None: ... + +class RLock(SemLock): + def __init__(self, *, ctx: BaseContext) -> None: ... + +class Semaphore(SemLock): + def __init__(self, value: int = ..., *, ctx: BaseContext) -> None: ... + +# Not part of public API +class SemLock(ContextManager[bool]): + def acquire(self, + block: bool = ..., + timeout: Optional[float] = ...) -> bool: ... + def release(self) -> None: ...