mirror of
https://github.com/davidhalter/typeshed.git
synced 2025-12-07 20:54:28 +08:00
96 lines
3.1 KiB
Python
96 lines
3.1 KiB
Python
# Stubs for threading
|
|
|
|
from typing import Any, Optional, Callable, TypeVar, Union, List, Mapping, Sequence
|
|
|
|
def active_count() -> int: ...
|
|
def activeCount() -> int: ...
|
|
|
|
def current_thread() -> Thread: ...
|
|
def currentThread() -> Thread: ...
|
|
def enumerate() -> List[Thread]: ...
|
|
|
|
class Thread(object):
|
|
name = ... # type: str
|
|
ident = ... # type: Optional[int]
|
|
daemon = ... # type: bool
|
|
|
|
def __init__(self, group: Any = ..., target: Callable[..., Any] = ...,
|
|
name: str = ..., args: Sequence[Any] = ...,
|
|
kwargs: Mapping[str, Any] = ...) -> None: ...
|
|
def start(self) -> None: ...
|
|
def run(self) -> None: ...
|
|
def join(self, timeout: float = ...) -> None: ...
|
|
def is_alive(self) -> bool: ...
|
|
|
|
# Legacy methods
|
|
def isAlive(self) -> bool: ...
|
|
def getName(self) -> str: ...
|
|
def setName(self, name: str) -> None: ...
|
|
def isDaemon(self) -> bool: ...
|
|
def setDaemon(self, daemon: bool) -> None: ...
|
|
|
|
class Timer(Thread):
|
|
def __init__(self, interval: float, function: Callable[..., Any],
|
|
args: Sequence[Any] = ...,
|
|
kwargs: Mapping[str, Any] = ...) -> None: ...
|
|
def cancel(self) -> None : ...
|
|
|
|
# TODO: better type
|
|
def settrace(func: Callable[[Any, str, Any], Any]) -> None: ...
|
|
def setprofile(func: Any) -> None: ...
|
|
def stack_size(size: int = ...) -> None: ...
|
|
|
|
class ThreadError(Exception):
|
|
pass
|
|
|
|
class local(Any): ...
|
|
|
|
class Event(object):
|
|
def is_set(self) -> bool: ...
|
|
def isSet(self) -> bool: ...
|
|
def set(self) -> None: ...
|
|
def clear(self) -> None: ...
|
|
# TODO can it return None?
|
|
def wait(self, timeout: float = ...) -> bool: ...
|
|
|
|
class Lock(object):
|
|
def acquire(self, blocking: bool = ...) -> bool: ...
|
|
def release(self) -> None: ...
|
|
def locked(self) -> bool: ...
|
|
def __enter__(self) -> bool: ...
|
|
def __exit__(self, *args): ...
|
|
|
|
class RLock(object):
|
|
def acquire(self, blocking: int = ...) -> Optional[bool]: ...
|
|
def release(self) -> None: ...
|
|
def __enter__(self) -> bool: ...
|
|
def __exit__(self, *args): ...
|
|
|
|
class Semaphore(object):
|
|
def acquire(self, blocking: bool = ...) -> Optional[bool]: ...
|
|
def release(self) -> None: ...
|
|
def __init__(self, value: int = ...) -> None: ...
|
|
def __enter__(self) -> bool: ...
|
|
def __exit__(self, *args): ...
|
|
|
|
class BoundedSemaphore(Semaphore):
|
|
def acquire(self, blocking: bool = ...) -> Optional[bool]: ...
|
|
def release(self) -> None: ...
|
|
def __init__(self, value: int = ...) -> None: ...
|
|
def __enter__(self) -> bool: ...
|
|
def __exit__(self, *args): ...
|
|
|
|
_T = TypeVar('_T')
|
|
|
|
class Condition(object):
|
|
def acquire(self, blocking: bool = ...) -> bool: ...
|
|
def release(self) -> None: ...
|
|
def notify(self, n: int = ...) -> None: ...
|
|
def notify_all(self) -> None: ...
|
|
def notifyAll(self) -> None: ...
|
|
def wait(self, timeout: float = ...) -> bool: ...
|
|
def wait_for(self, predicate: Callable[[], _T], timeout: float = ...) -> Union[_T, bool]: ...
|
|
def __enter__(self) -> bool: ...
|
|
def __exit__(self, *args): ...
|
|
def __init__(self, lock: Lock = ...) -> None: ...
|