Add stubs for gevent (#10527)

This commit is contained in:
David Salvisberg
2023-12-06 20:32:48 +01:00
committed by GitHub
parent d8c174d26a
commit aded4aa390
65 changed files with 3715 additions and 0 deletions

View File

@@ -0,0 +1,241 @@
# Error: failed to find stubs
# =============================
# testing modules are not included in type stubs
gevent.testing
gevent.testing.*
gevent.tests
gevent.tests.*
# these are only present for monkey patching and should not be used directly
gevent.thread
gevent.threading
# deprecated module which should not be used anymore
gevent.builtins
gevent.contextvars
gevent.core
# part of internal API which is not needed for public type stubs
gevent._ffi.callback
# Error: is not present in stub
# =============================
# internal API stuff we dropped because it wasn't necessary
gevent._config.Config.subclass
gevent._ffi.CRITICAL
gevent._ffi.DEBUG
gevent._ffi.ERROR
gevent._ffi.GEVENT_DEBUG_LEVEL
gevent._ffi.TRACE
gevent._ffi.loop.AbstractLoop.async
gevent._ffi.loop.assign_standard_callbacks
gevent._fileobjectcommon.UniversalNewlineBytesWrapper
gevent._waiter.Waiter.switch_args
# loop local that wasn't deleted
gevent.resolver.blocking.Resolver.method
# isn't actually implemented for libuv, it just raises an exception
gevent.libuv.watcher.watcher.feed
# unnecessary python 2 compatibility stuff
gevent._config.Config.trace_malloc
gevent._imap.IMapUnordered.next
gevent.monkey.patch_builtins
gevent.monkey.patch_sys
gevent.pywsgi.Environ.iteritems
# weird method that doesn't work with this being generic, so we removed it
# it's not necessary for public API
gevent.hub.Waiter.switch_args
# these may be gevent extensions or methods for backwards compatibility
# for now we're fine with pretending it's not there
gevent.socket.SocketType.__enter__
gevent.socket.SocketType.__exit__
gevent.socket.SocketType.accept
gevent.socket.SocketType.closed
gevent.socket.SocketType.dup
gevent.socket.SocketType.get_inheritable
gevent.socket.SocketType.makefile
gevent.socket.SocketType.sendfile
gevent.socket.SocketType.set_inheritable
# zope.interface related attributes we can ignore
gevent.[\w\.]+\.__implemented__
gevent.[\w\.]+\.__providedBy__
gevent.[\w\.]+\.__provides__
# these shouldn't be in __all__ they end up there, due to how gevent imports
# the globals from the stdlib ssl module, For ssl/subprocess we ignore all symbols
# that start with an underscore (i.e. internal symbols)
gevent\.ssl\._[A-Za-z0-9]\w*
gevent.ssl.base64
gevent.ssl.create_connection
gevent.ssl.errno
gevent.ssl.os
gevent.ssl.socket_error
gevent.ssl.warnings
gevent\.subprocess\._[A-Za-z0-9]\w*
# gevent implements its own Popen which doesn't completely match the original
# API, for now we ignore this discrepancy
gevent.subprocess.Popen.rawlink
gevent.subprocess.Popen.__del__
# Error: is not present at runtime
# =============================
# Due to the way gevent copies globals from other modules, there's a few symbols
# that may not end up in the module, that would otherwise end up there with a
# `from x import *`. None of these seem critical, so we ignore them. For socket
# we ignore all constants that start with at least one segment of all uppercase
# letters before the underscore
gevent\.socket\.[A-Z0-9]+(_\w+)?
gevent.socket.AddressInfo
gevent.socket.MsgFlag
gevent.ssl.PROTOCOL_SSLv2
gevent.ssl.PROTOCOL_SSLv3
gevent.ssl.RAND_egd
# Error: differs from runtime type
# ======================
# these are None in the base class, but all settings are a subclass
# so it makes sense to annotate this as not None
gevent._config.Setting.default
gevent._config.Setting.environment_key
gevent._config.Setting.name
gevent._config.Setting.value
# it is set to None on the class but always initialized in __init__
gevent.hub.Hub.thread_ident
gevent.pywsgi.WSGIServer.error_log
gevent.pywsgi.WSGIServer.log
# Error: is inconsistent
# ======================
# minor config validation implementation difference that don't matter for
# the actual subclasses, which are proper settings.
gevent._config.SettingType.__new__
gevent._config._PositiveValueMixin.validate
# internal API implementation detail we don't care about
gevent._ffi.watcher.AbstractWatcherType.__new__
# these are inconsistent due to the ParamSpec hack for positional only callables
gevent._ffi.loop.AbstractLoop.run_callback
gevent._ffi.loop.AbstractLoop.run_callback_threadsafe
gevent._ffi.watcher.watcher.start
gevent._hub_primitives.WaitOperationsGreenlet.cancel_waits_close_and_then
gevent.baseserver.BaseServer.do_close
gevent.baseserver.BaseServer.do_handle
# we don't care about write/writeall allowing a named parameter
gevent._fileobjectcommon.FlushingBufferedWriter.write
gevent._fileobjectcommon.WriteIsWriteallMixin.write
gevent._fileobjectcommon.WriteallMixin.writeall
# these are different because of Cython, without Cython these don't have
# any arguments, so it should be annotated that way
gevent._ident.IdentRegistry.__init__
gevent.event.AsyncResult.__init__
gevent.event.Event.__init__
# positional only arguments due to Cython?
gevent._abstract_linkable.AbstractLinkable.rawlink
gevent._abstract_linkable.AbstractLinkable.unlink
# removed undocumented arguments for internal use
gevent.Greenlet.link
gevent.Greenlet.link_exception
gevent.Greenlet.link_value
gevent.greenlet.Greenlet.link
gevent.greenlet.Greenlet.link_exception
gevent.greenlet.Greenlet.link_value
gevent._ffi.watcher.IoMixin.__init__
gevent._threading.Queue.qsize
gevent.monkey.patch_module
gevent.monkey.patch_ssl
gevent.monkey.patch_thread
# removed deprecated argument
gevent._hub_primitives.wait_readwrite
gevent._hub_primitives.wait_write
gevent.pywsgi.WSGIHandler.__init__
# we have punted on socket, the gevent version of these functions sometimes use
# named parameters, while the base implementation only allows positional arguments
# we're fine with holding the geven implemenation to the same restrictions
# additionally there's some functions with additional optional arguments, that
# we are fine with ignoring for now as well
gevent.socket.SocketType.bind
gevent.socket.SocketType.connect
gevent.socket.SocketType.connect_ex
gevent.socket.SocketType.send
gevent.socket.SocketType.sendall
gevent.socket.SocketType.setblocking
gevent.socket.SocketType.settimeout
gevent.socket.SocketType.shutdown
gevent.socket.cancel_wait
gevent.socket.create_connection
gevent.socket.gethostbyaddr
gevent.socket.gethostbyname
gevent.socket.gethostbyname_ex
gevent.socket.getnameinfo
gevent.socket.socket.closed
gevent.socket.wait_readwrite
gevent.socket.wait_write
# we have punted on ssl, the gevent version of these functions have an additional
# argument for timeouts/blocking and there are some with different default values
# for nbytes/length, for now we ignore that fact
gevent.ssl.Purpose.__new__
gevent.ssl.SSLSocket.__init__
gevent.ssl.SSLSocket.do_handshake
gevent.ssl.SSLSocket.read
gevent.ssl.SSLSocket.send
gevent.ssl.get_server_certificate
# we have punted on subprocess, the gevent version has slightly different arguments
# for now we ignore that fact, most of the ways to call Popen should be compatible
gevent.subprocess.Popen.__init__
# we exclude the undocumented internal argument _raise_exc from the stubs
gevent.subprocess.Popen.wait
# we exclude the undocumented internal argument _format_hub from the stubs
gevent.threadpool.ThreadPool.__repr__
# gevent overwrites with a named parameter for fd, but we're fine with only
# supporting the API of the superclass
gevent.threadpool.ThreadPoolExecutor.submit
# we exclude the undocumented internal argument _one_shot from the stubs
gevent.Timeout.__init__
gevent.Timeout.start_new
gevent.timeout.Timeout.__init__
gevent.timeout.Timeout.start_new
# Error: is not a type/function
# =====================
# zope.interface related errors, these shouldn't matter
gevent._monitor.implementer
gevent.events.implementer
gevent.events.IEventLoopBlocked
gevent.events.IGeventDidPatchAllEvent
gevent.events.IGeventDidPatchBuiltinModulesEvent
gevent.events.IGeventDidPatchEvent
gevent.events.IGeventDidPatchModuleEvent
gevent.events.IGeventPatchEvent
gevent.events.IGeventWillPatchAllEvent
gevent.events.IGeventWillPatchEvent
gevent.events.IGeventWillPatchModuleEvent
gevent.events.IMemoryUsageThresholdExceeded
gevent.events.IMemoryUsageUnderThreshold
gevent.events.IPeriodicMonitorThread
gevent.events.IPeriodicMonitorThreadStartedEvent
# Error: failed to import
# ======================
# internal use module for some complex protocols used across different modules
# so there wasn't really a great place for them
gevent._types

View File

@@ -0,0 +1,33 @@
# Error: is not present in stub
# =============================
# internal API stuff we dropped because it wasn't necessary
gevent.libev.corecext.loop.async
# these shouldn't be in __all__ they end up there, due to how gevent imports
# the globals from the stdlib ssl module
gevent.subprocess.Popen.pipe_cloexec
# Error: is inconsistent
# ======================
# these are inconsistent due to the ParamSpec hack for positional only callables
gevent.libev.corecext.loop.run_callback
gevent.libev.corecext.loop.run_callback_threadsafe
gevent.libev.watcher.watcher.feed
# undocumented argument for internal use only
gevent.libev.watcher.watcher.__init__
# ares_host_result always has the same layout, so we set the arguments on __new__
# to reflect that fact, we don't care that the implementation accepts any number
# of arguments
gevent.resolver.cares.ares_host_result.__new__
# we have punted on socket, the gevent version of these functions sometimes use
# named parameters, while the base implementation only allows positional arguments
# we're fine with holding the geven implemenation to the same restrictions
# additionally there's some functions with additional optional arguments, that
# we are fine with ignoring for now as well
gevent.socket.SocketType.recvmsg_into
gevent.socket.SocketType.sendmsg
gevent.socket.socket.recvmsg_into
gevent.socket.socket.sendmsg

View File

@@ -0,0 +1,45 @@
# Error: is not present in stub
# =============================
# internal API stuff we dropped because it wasn't necessary
gevent.libev.corecext.loop.async
# these shouldn't be in __all__ they end up there, due to how gevent imports
# the globals from the stdlib ssl module
gevent.subprocess.Popen.pipe_cloexec
gevent.subprocess.Popen.rawlink
# Error: is not present at runtime
# =============================
# this is currently missing from the gevent implementation, but we'll ignore
# it for simplicity's sake
gevent.socket.SocketType.sendmsg_afalg
# Due to the way gevent copies globals from other modules, there's a few symbols
# that may not end up in the module, that would otherwise end up there with a
# `from x import *`. None of these seem critical, so we ignore them
gevent.socket.MsgFlag
# Error: is inconsistent
# ======================
# these are inconsistent due to the ParamSpec hack for positional only callables
gevent.libev.corecext.loop.run_callback
gevent.libev.corecext.loop.run_callback_threadsafe
gevent.libev.watcher.watcher.feed
# undocumented argument for internal use only
gevent.libev.watcher.watcher.__init__
# ares_host_result always has the same layout, so we set the arguments on __new__
# to reflect that fact, we don't care that the implementation accepts any number
# of arguments
gevent.resolver.cares.ares_host_result.__new__
# we have punted on socket, the gevent version of these functions sometimes use
# named parameters, while the base implementation only allows positional arguments
# we're fine with holding the geven implemenation to the same restrictions
# additionally there's some functions with additional optional arguments, that
# we are fine with ignoring for now as well
gevent.socket.SocketType.recvmsg_into
gevent.socket.SocketType.sendmsg
gevent.socket.socket.recvmsg_into
gevent.socket.socket.sendmsg

View File

@@ -0,0 +1,26 @@
# Error: is not present in stub
# =============================
# these get exported but don't actually work on win32 so we ignore them
gevent.signal.getsignal
gevent.signal.signal
# these don't exist on win32 in stdlib, but they do in gevent, for now we ignore them
gevent.socket.SocketType.share
# the docs say this doesn't work on windows, so it has been removed
gevent._ffi.loop.AbstractLoop.fork
# for some reason this extension exists even though it is not supported on windows
gevent.libev.corecext.*
# Error: failed to import
# =============================
# these won't work until we find out if we can install libev somehow with choco
gevent.libev.corecffi
gevent.libev.watcher
# these don't work on windows
gevent.ares
gevent.resolver.ares
gevent.resolver.cares
gevent.resolver_ares

View File

@@ -0,0 +1,13 @@
version = "23.9.*"
upstream_repository = "https://github.com/gevent/gevent"
requires = ["types-greenlet", "types-psutil"]
requires_python = ">=3.8"
[tool.stubtest]
# Run stubtest on all platforms, since there is some platform specific stuff
# especially in the stdlib module replacement
platforms = ["linux", "darwin", "win32"]
# for testing the ffi loop implementations on all platforms
stubtest_requirements = ["cffi", "dnspython"]
apt_dependencies = ["libev4", "libev-dev", "libuv1", "libuv1-dev"]
brew_dependencies = ["libev", "libuv"]

View File

@@ -0,0 +1,76 @@
import sys
from gevent._config import config as config
from gevent._hub_local import get_hub as get_hub
from gevent._hub_primitives import iwait_on_objects as iwait, wait_on_objects as wait
from gevent.greenlet import Greenlet as Greenlet, joinall as joinall, killall as killall
from gevent.hub import (
GreenletExit as GreenletExit,
getcurrent as getcurrent,
idle as idle,
kill as kill,
reinit as reinit,
signal as signal_handler,
sleep as sleep,
spawn_raw as spawn_raw,
)
from gevent.timeout import Timeout as Timeout, with_timeout as with_timeout
if sys.platform != "win32":
from gevent.os import fork
__all__ = [
"Greenlet",
"GreenletExit",
"Timeout",
"config",
"fork",
"get_hub",
"getcurrent",
"getswitchinterval",
"idle",
"iwait",
"joinall",
"kill",
"killall",
"reinit",
"setswitchinterval",
"signal_handler",
"sleep",
"spawn",
"spawn_later",
"spawn_raw",
"wait",
"with_timeout",
]
else:
__all__ = [
"Greenlet",
"GreenletExit",
"Timeout",
"config",
"get_hub",
"getcurrent",
"getswitchinterval",
"idle",
"iwait",
"joinall",
"kill",
"killall",
"reinit",
"setswitchinterval",
"signal_handler",
"sleep",
"spawn",
"spawn_later",
"spawn_raw",
"wait",
"with_timeout",
]
__version__: str
getswitchinterval = sys.getswitchinterval
setswitchinterval = sys.setswitchinterval
spawn = Greenlet.spawn
spawn_later = Greenlet.spawn_later

View File

@@ -0,0 +1,13 @@
from collections.abc import Callable
from typing_extensions import Self
from gevent.hub import Hub
class AbstractLinkable:
@property
def hub(self) -> Hub | None: ...
def __init__(self, hub: Hub | None = None) -> None: ...
def linkcount(self) -> int: ...
def rawlink(self, __callback: Callable[[Self], object]) -> None: ...
def ready(self) -> bool: ...
def unlink(self, __callback: Callable[[Self], object]) -> None: ...

View File

@@ -0,0 +1,196 @@
from collections.abc import Callable, Sequence
from typing import Any, Generic, NoReturn, Protocol, TypeVar, overload
from gevent._types import _Loop, _Resolver
from gevent.fileobject import _FileObjectType
from gevent.threadpool import ThreadPool
__all__ = ["config"]
_T = TypeVar("_T")
class _SettingDescriptor(Protocol[_T]):
@overload
def __get__(self, obj: None, owner: type[Config]) -> property: ...
@overload
def __get__(self, obj: Config, owner: type[Config]) -> _T: ...
def __set__(self, obj: Config, value: str | _T) -> None: ...
class SettingType(type):
def fmt_desc(cls, desc: str) -> str: ...
def validate_invalid(value: object) -> NoReturn: ...
def validate_bool(value: str | bool) -> bool: ...
def validate_anything(value: _T) -> _T: ...
convert_str_value_as_is = validate_anything
class Setting(Generic[_T], metaclass=SettingType):
order: int # all subclasses have this
name: str
environment_key: str
value: _T
default: _T
document: bool
desc: str
validate: Callable[[Any], _T]
def get(self) -> _T: ...
def set(self, val: str | _T) -> None: ...
class Config:
settings: dict[str, Setting[Any]]
def __init__(self) -> None: ...
def __getattr__(self, name: str) -> Any: ...
def __setattr__(self, name: str, value: object) -> None: ...
def set(self, name: str, value: object) -> None: ...
def __dir__(self) -> list[str]: ...
def print_help(self) -> None: ...
# we manually add properties for all the settings in this module
# SettingType inserts a property into Config for every subclass of Setting
resolver: _SettingDescriptor[type[_Resolver]]
threadpool: _SettingDescriptor[type[Threadpool]]
threadpool_idle_task_timeout: _SettingDescriptor[float]
loop: _SettingDescriptor[type[_Loop]]
format_context: _SettingDescriptor[Callable[[Any], str]]
libev_backend: _SettingDescriptor[str | None]
fileobject: _SettingDescriptor[_FileObjectType]
disable_watch_children: _SettingDescriptor[bool]
track_greenlet_tree: _SettingDescriptor[bool]
monitor_thread: _SettingDescriptor[bool]
max_blocking_time: _SettingDescriptor[float]
memory_monitor_period: _SettingDescriptor[float]
max_memory_usage: _SettingDescriptor[int | None]
resolver_nameservers: _SettingDescriptor[Sequence[str] | str | None]
resolver_timeout: _SettingDescriptor[float | None]
# these get parsed by gevent.resolver.cares.channel so the Setting does not
# perform any conversion, but we know at least what types can be valid
ares_flags: _SettingDescriptor[str | int | None]
ares_timeout: _SettingDescriptor[str | float | None]
ares_tries: _SettingDescriptor[str | int | None]
ares_ndots: _SettingDescriptor[str | int | None]
ares_udp_port: _SettingDescriptor[str | int | None]
ares_tcp_port: _SettingDescriptor[str | int | None]
ares_servers: _SettingDescriptor[Sequence[str] | str | None]
class ImportableSetting(Generic[_T]):
default: str | Sequence[str]
shortname_map: dict[str, str]
def validate(self, value: str | _T) -> _T: ...
def get_options(self) -> dict[str, _T]: ...
class BoolSettingMixin:
@staticmethod
def validate(value: str | bool) -> bool: ...
class IntSettingMixin:
@staticmethod
def validate(value: int) -> int: ...
class _PositiveValueMixin(Generic[_T]):
@staticmethod
def validate(value: _T) -> _T: ...
class FloatSettingMixin(_PositiveValueMixin[float]): ...
class ByteCountSettingMixin(_PositiveValueMixin[int]): ...
class Resolver(ImportableSetting[type[_Resolver]], Setting[type[_Resolver]]):
desc: str
default: list[str] # type: ignore[assignment]
shortname_map: dict[str, str]
class Threadpool(ImportableSetting[type[ThreadPool]], Setting[type[ThreadPool]]):
desc: str
default: str # type: ignore[assignment]
class ThreadpoolIdleTaskTimeout(FloatSettingMixin, Setting[float]):
document: bool
desc: str
default: float
class Loop(ImportableSetting[type[_Loop]], Setting[type[_Loop]]):
desc: str
default: list[str] # type: ignore[assignment]
shortname_map: dict[str, str]
class FormatContext(ImportableSetting[Callable[[Any], str]], Setting[Callable[[Any], str]]):
default: str # type: ignore[assignment]
class LibevBackend(Setting[str | None]):
desc: str
default: None
class FileObject(ImportableSetting[_FileObjectType], Setting[_FileObjectType]):
desc: str
default: list[str] # type: ignore[assignment]
shortname_map: dict[str, str]
class WatchChildren(BoolSettingMixin, Setting[bool]):
desc: str
default: bool
class TrackGreenletTree(BoolSettingMixin, Setting[bool]):
default: bool
desc: str
class MonitorThread(BoolSettingMixin, Setting[bool]):
default: bool
desc: str
class MaxBlockingTime(FloatSettingMixin, Setting[float]):
default: float
desc: str
class MonitorMemoryPeriod(FloatSettingMixin, Setting[float]):
default: int
desc: str
class MonitorMemoryMaxUsage(ByteCountSettingMixin, Setting[int | None]):
default: None
desc: str
class AresSettingMixin:
document: bool
@property
def kwarg_name(self) -> str: ...
validate: Any # we just want this to mixin without errors
class AresFlags(AresSettingMixin, Setting[str | int | None]):
default: None
class AresTimeout(AresSettingMixin, Setting[str | float | None]):
document: bool
default: None
desc: str
class AresTries(AresSettingMixin, Setting[str | int | None]):
default: None
class AresNdots(AresSettingMixin, Setting[str | int | None]):
default: None
class AresUDPPort(AresSettingMixin, Setting[str | int | None]):
default: None
class AresTCPPort(AresSettingMixin, Setting[str | int | None]):
default: None
class AresServers(AresSettingMixin, Setting[Sequence[str] | str | None]):
document: bool
default: None
desc: str
class ResolverNameservers(AresSettingMixin, Setting[Sequence[str] | str | None]):
document: bool
default: None
desc: str
@property
def kwarg_name(self) -> str: ...
class ResolverTimeout(FloatSettingMixin, AresSettingMixin, Setting[float | None]):
document: bool
desc: str
@property
def kwarg_name(self) -> str: ...
config: Config = ...

View File

View File

@@ -0,0 +1,83 @@
import sys
from _typeshed import FileDescriptor
from collections.abc import Callable
from types import TracebackType
from typing import Protocol
from typing_extensions import ParamSpec, TypeAlias
from gevent._types import _AsyncWatcher, _Callback, _ChildWatcher, _IoWatcher, _StatWatcher, _TimerWatcher, _Watcher
_P = ParamSpec("_P")
_ErrorHandlerFunc: TypeAlias = Callable[
[object | None, type[BaseException] | None, BaseException | None, TracebackType | None], object
]
class _SupportsHandleError(Protocol):
handle_error: _ErrorHandlerFunc
_ErrorHandler: TypeAlias = _ErrorHandlerFunc | _SupportsHandleError
class AbstractLoop:
CALLBACK_CHECK_COUNT: int
error_handler: _ErrorHandler | None
starting_timer_may_update_loop_time: bool
# internal API, this __init__ will only be called from subclasses
def __init__(
self, ffi: object, lib: object, watchers: object, flags: int | None = ..., default: bool | None = ...
) -> None: ...
def destroy(self) -> bool | None: ...
@property
def ptr(self) -> int: ...
@property
def WatcherType(self) -> type[_Watcher]: ...
@property
def MAXPRI(self) -> int: ...
@property
def MINPRI(self) -> int: ...
def handle_error(
self, context: object | None, type: type[BaseException] | None, value: BaseException | None, tb: TracebackType | None
) -> None: ...
def run(self, nowait: bool = False, once: bool = False) -> None: ...
def reinit(self) -> None: ...
def ref(self) -> None: ...
def unref(self) -> None: ...
def break_(self, how: int | None = ...) -> None: ...
def verify(self) -> None: ...
def now(self) -> float: ...
def update_now(self) -> None: ...
update = update_now # deprecated
@property
def default(self) -> bool: ...
@property
def iteration(self) -> int: ...
@property
def depth(self) -> int: ...
@property
def backend_int(self) -> int: ...
@property
def backend(self) -> str | int: ...
@property
def pendingcnt(self) -> int: ...
@property
def activecnt(self) -> int: ...
def io(self, fd: FileDescriptor, events: int, ref: bool = True, priority: int | None = None) -> _IoWatcher: ...
def closing_fd(self, fd: FileDescriptor) -> bool: ...
def timer(self, after: float, repeat: float = 0.0, ref: bool = True, priority: int | None = None) -> _TimerWatcher: ...
def signal(self, signum: int, ref: bool = True, priority: int | None = None) -> _Watcher: ...
def idle(self, ref: bool = True, priority: int | None = None) -> _Watcher: ...
def prepare(self, ref: bool = True, priority: int | None = None) -> _Watcher: ...
def check(self, ref: bool = True, priority: int | None = None) -> _Watcher: ...
if sys.platform != "win32":
def fork(self, ref: bool = True, priority: int | None = None) -> _Watcher: ...
def child(self, pid: int, trace: int = 0, ref: bool = True) -> _ChildWatcher: ...
def install_sigchld(self) -> None: ...
def async_(self, ref: bool = True, priority: int | None = None) -> _AsyncWatcher: ...
def stat(self, path: str, interval: float = 0.0, ref: bool = True, priority: bool | None = ...) -> _StatWatcher: ...
# These technically don't allow the functions arguments to be passed in as kwargs
# but there's no way to express that yet with ParamSpec, however, we would still like
# to verify that the arguments match
def run_callback(self, func: Callable[_P, object], *args: _P.args, **_: _P.kwargs) -> _Callback: ...
def run_callback_threadsafe(self, func: Callable[_P, object], *args: _P.args, **_: _P.kwargs) -> _Callback: ...
def callback(self, priority: float | None = ...) -> _Callback: ...
def fileno(self) -> FileDescriptor | None: ...

View File

@@ -0,0 +1,95 @@
from _typeshed import FileDescriptor, StrOrBytesPath
from collections.abc import Callable
from types import TracebackType
from typing import Any, overload
from typing_extensions import Concatenate, Literal, ParamSpec, Self
from gevent._types import _Loop, _StatResult
_P = ParamSpec("_P")
class AbstractWatcherType(type):
def new_handle(cls, obj: object) -> int: ...
def new(cls, kind: object) -> Any: ...
class watcher(metaclass=AbstractWatcherType):
loop: _Loop
def __init__(self, _loop: _Loop, ref: bool = True, priority: int | None = None, args: tuple[object, ...] = ...) -> None: ...
def close(self) -> None: ...
def __enter__(self) -> Self: ...
def __exit__(self, t: type[BaseException] | None, v: BaseException | None, tb: TracebackType | None) -> None: ...
@property
def ref(self) -> bool: ...
callback: Callable[..., Any]
args: tuple[Any, ...]
def start(self, callback: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> None: ...
def stop(self) -> None: ...
@property
def priority(self) -> int | None: ...
@priority.setter
def priority(self, value: int | None) -> None: ...
@property
def active(self) -> bool: ...
@property
def pending(self) -> bool: ...
class IoMixin:
EVENT_MASK: int
def __init__(self, loop: _Loop, fd: FileDescriptor, events: int, ref: bool = True, priority: int | None = None) -> None: ...
# pass_events means the first argument of the callback needs to be an integer, but we can't
# type check the other passed in args in this case
@overload
def start(self, callback: Callable[Concatenate[int, _P], Any], *args: Any, pass_events: Literal[True]) -> None: ...
@overload
def start(self, callback: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> None: ...
class TimerMixin:
def __init__(
self, loop: _Loop, after: float = 0.0, repeat: float = 0.0, ref: bool = True, priority: int | None = None
) -> None: ...
# this has one specific allowed keyword argument, if it is given we don't try to check
# the passed in arguments, but if it isn't passed in, then we do.
@overload
def start(self, callback: Callable[..., Any], *args: Any, update: bool) -> None: ...
@overload
def start(self, callback: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> None: ...
@overload
def again(self, callback: Callable[..., Any], *args: Any, update: bool) -> None: ...
@overload
def again(self, callback: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> None: ...
class SignalMixin:
def __init__(self, loop: _Loop, signalnum: int, ref: bool = True, priority: int | None = None) -> None: ...
class IdleMixin: ...
class PrepareMixin: ...
class CheckMixin: ...
class ForkMixin: ...
class AsyncMixin:
def send(self) -> None: ...
def send_ignoring_arg(self, _ignored: object) -> None: ...
@property
def pending(self) -> bool: ...
class ChildMixin:
def __init__(self, loop: _Loop, pid: int, trace: int = 0, ref: bool = True) -> None: ...
@property
def pid(self) -> int: ...
@property
def rpid(self) -> int | None: ...
@property
def rstatus(self) -> int: ...
class StatMixin:
def __init__(
self, _loop: _Loop, path: StrOrBytesPath, interval: float = 0.0, ref: bool = True, priority: float | None = None
) -> None: ...
@property
def path(self) -> StrOrBytesPath: ...
@property
def attr(self) -> _StatResult | None: ...
@property
def prev(self) -> _StatResult | None: ...
@property
def interval(self) -> float: ...

View File

@@ -0,0 +1,367 @@
import io
from _typeshed import (
FileDescriptorOrPath,
OpenBinaryMode,
OpenBinaryModeReading,
OpenBinaryModeUpdating,
OpenBinaryModeWriting,
OpenTextMode,
ReadableBuffer,
)
from types import TracebackType
from typing import IO, Any, AnyStr, ClassVar, Generic, TypeVar, overload
from typing_extensions import Literal, Self
from gevent.lock import DummySemaphore, Semaphore
from gevent.threadpool import ThreadPool
_IOT = TypeVar("_IOT", bound=IO[Any])
class cancel_wait_ex(IOError):
def __init__(self) -> None: ...
class FileObjectClosed(IOError):
def __init__(self) -> None: ...
class FlushingBufferedWriter(io.BufferedWriter): ...
class WriteallMixin:
def writeall(self, __b: ReadableBuffer) -> int: ...
class FileIO(io.FileIO): ...
class WriteIsWriteallMixin(WriteallMixin):
def write(self, __b: ReadableBuffer) -> int: ...
class WriteallFileIO(WriteIsWriteallMixin, io.FileIO): ... # type: ignore[misc]
class OpenDescriptor(Generic[_IOT]):
default_buffer_size: ClassVar[int]
fileio_mode: str
mode: str
creating: bool
reading: bool
writing: bool
appending: bool
updating: bool
text: bool
binary: bool
can_write: bool
can_read: bool
native: bool
universal: bool
buffering: int
encoding: str | None
errors: str | None
newline: bool
closefd: bool
atomic_write: bool
# we could add all the necessary overloads here too, but since this is internal API
# I don't think it makes sense to do that
def __init__(
self,
fobj: FileDescriptorOrPath,
mode: str = "r",
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
def is_fd(self) -> bool: ...
def opened(self) -> _IOT: ...
def opened_raw(self) -> FileIO: ...
@staticmethod
def is_buffered(stream: object) -> bool: ...
@classmethod
def buffer_size_for_stream(cls, stream: object) -> int: ...
class FileObjectBase(Generic[_IOT, AnyStr]):
def __init__(self: FileObjectBase[_IOT, AnyStr], descriptor: OpenDescriptor[_IOT]) -> None: ...
io: _IOT
@property
def closed(self) -> bool: ...
def close(self) -> None: ...
def __getattr__(self, name: str) -> Any: ...
def __enter__(self) -> Self: ...
def __exit__(self, __typ: type[BaseException] | None, __value: BaseException | None, __tb: TracebackType | None) -> None: ...
def __iter__(self) -> Self: ...
def __next__(self) -> AnyStr: ...
def __bool__(self) -> bool: ...
next = __next__
class FileObjectBlock(FileObjectBase[_IOT, AnyStr]):
# Text mode: always binds a TextIOWrapper
@overload
def __init__(
self: FileObjectBlock[io.TextIOWrapper, str],
fobj: FileDescriptorOrPath,
mode: OpenTextMode = "r",
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
# Unbuffered binary mode: binds a FileIO
@overload
def __init__(
self: FileObjectBlock[io.FileIO, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: Literal[0],
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[0] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
@overload
def __init__(
self: FileObjectBlock[io.FileIO, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: Literal[0] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
*,
buffering: Literal[0],
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
# Buffering is on: return BufferedRandom, BufferedReader, or BufferedWriter
@overload
def __init__(
self: FileObjectBlock[io.BufferedRandom, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeUpdating,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
@overload
def __init__(
self: FileObjectBlock[io.BufferedWriter, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeWriting,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
@overload
def __init__(
self: FileObjectBlock[io.BufferedReader, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeReading,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
# Buffering cannot be determined: fall back to BinaryIO
@overload
def __init__(
self: FileObjectBlock[IO[bytes], bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
# Fallback if mode is not specified
@overload
def __init__(
self: FileObjectBlock[IO[Any], Any],
fobj: FileDescriptorOrPath,
mode: str,
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
class FileObjectThread(FileObjectBase[_IOT, AnyStr]):
threadpool: ThreadPool
lock: Semaphore | DummySemaphore
# Text mode: always binds a TextIOWrapper
@overload
def __init__(
self: FileObjectThread[io.TextIOWrapper, str],
fobj: FileDescriptorOrPath,
mode: OpenTextMode = "r",
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
*,
lock: bool = True,
threadpool: ThreadPool | None = None,
) -> None: ...
# Unbuffered binary mode: binds a FileIO
@overload
def __init__(
self: FileObjectThread[io.FileIO, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: Literal[0],
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[0] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
*,
lock: bool = True,
threadpool: ThreadPool | None = None,
) -> None: ...
@overload
def __init__(
self: FileObjectThread[io.FileIO, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: Literal[0] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
*,
buffering: Literal[0],
closefd: bool | None = None,
atomic_write: bool = False,
lock: bool = True,
threadpool: ThreadPool | None = None,
) -> None: ...
# Buffering is on: return BufferedRandom, BufferedReader, or BufferedWriter
@overload
def __init__(
self: FileObjectThread[io.BufferedRandom, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeUpdating,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
*,
lock: bool = True,
threadpool: ThreadPool | None = None,
) -> None: ...
@overload
def __init__(
self: FileObjectThread[io.BufferedWriter, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeWriting,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
*,
lock: bool = True,
threadpool: ThreadPool | None = None,
) -> None: ...
@overload
def __init__(
self: FileObjectThread[io.BufferedReader, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeReading,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
*,
lock: bool = True,
threadpool: ThreadPool | None = None,
) -> None: ...
# Buffering cannot be determined: fall back to BinaryIO
@overload
def __init__(
self: FileObjectThread[IO[bytes], bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
*,
lock: bool = True,
threadpool: ThreadPool | None = None,
) -> None: ...
# Fallback if mode is not specified
@overload
def __init__(
self: FileObjectThread[IO[Any], Any],
fobj: FileDescriptorOrPath,
mode: str,
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
*,
lock: bool = True,
threadpool: ThreadPool | None = None,
) -> None: ...

View File

@@ -0,0 +1,16 @@
from abc import abstractmethod
from typing import Any, NoReturn
from gevent._types import _Loop
from greenlet import greenlet
class TrackedRawGreenlet(greenlet): ...
class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
@property
@abstractmethod
def loop(self) -> _Loop: ...
@loop.setter
def loop(self, value: _Loop) -> None: ...
def switch(self) -> Any: ...
def switch_out(self) -> NoReturn: ...

View File

@@ -0,0 +1,15 @@
from gevent._types import _Loop
from gevent.hub import Hub as _Hub
__all__ = ["get_hub", "get_hub_noargs", "get_hub_if_exists"]
Hub: type[_Hub] | None
def get_hub_class() -> type[_Hub] | None: ...
def set_default_hub_class(hubtype: type[_Hub]) -> None: ...
def get_hub() -> _Hub: ...
def get_hub_noargs() -> _Hub: ...
def get_hub_if_exists() -> _Hub | None: ...
def set_hub(hub: _Hub) -> None: ...
def get_loop() -> _Loop: ...
def set_loop(loop: _Loop) -> None: ...

View File

@@ -0,0 +1,72 @@
from _typeshed import FileDescriptor
from collections.abc import Callable, Collection, Iterable
from types import TracebackType
from typing import Any, Generic, Protocol, TypeVar, overload
from typing_extensions import ParamSpec, Self
from gevent._greenlet_primitives import SwitchOutGreenletWithLoop
from gevent._types import _Loop, _Watcher
from gevent.hub import Hub
from gevent.socket import socket
__all__ = ["WaitOperationsGreenlet", "iwait_on_objects", "wait_on_objects", "wait_read", "wait_write", "wait_readwrite"]
_T = TypeVar("_T")
_WaitableT = TypeVar("_WaitableT", bound=_Waitable)
_P = ParamSpec("_P")
class _Waitable(Protocol):
def rawlink(self, __callback: Callable[[Any], object]) -> object: ...
def unlink(self, __callback: Callable[[Any], object]) -> object: ...
class WaitOperationsGreenlet(SwitchOutGreenletWithLoop):
loop: _Loop
def wait(self, watcher: _Watcher) -> None: ...
# These then doesn't allow keyword arguments, but ParamSpec doesn't allow for that
def cancel_waits_close_and_then(
self,
watchers: Iterable[_Watcher],
exc_kind: type[BaseException] | BaseException,
then: Callable[_P, object],
*then_args: _P.args,
**_: _P.kwargs,
) -> None: ...
def cancel_wait(self, watcher: _Watcher, error: type[BaseException] | BaseException, close_watcher: bool = False) -> None: ...
class _WaitIterator(Generic[_T]):
def __init__(self, objects: Collection[_T], hub: Hub, timeout: float, count: None | int) -> None: ...
def __enter__(self) -> Self: ...
def __exit__(self, typ: type[BaseException] | None, value: BaseException | None, tb: TracebackType | None) -> None: ...
def __iter__(self) -> Self: ...
def __next__(self) -> _T: ...
next = __next__
@overload
def iwait_on_objects(objects: None, timeout: float | None = None, count: int | None = None) -> list[bool]: ...
@overload
def iwait_on_objects(
objects: Collection[_WaitableT], timeout: float | None = None, count: int | None = None
) -> _WaitIterator[_WaitableT]: ...
@overload
def wait_on_objects(objects: None = None, timeout: float | None = None, count: int | None = None) -> bool: ...
@overload
def wait_on_objects(
objects: Collection[_WaitableT], timeout: float | None = None, count: int | None = None
) -> list[_WaitableT]: ...
def set_default_timeout_error(e: type[BaseException]) -> None: ...
def wait_on_socket(socket: socket, watcher: _Watcher, timeout_exc: type[BaseException] | BaseException | None = None) -> None: ...
def wait_on_watcher(
watcher: _Watcher,
timeout: float | None = None,
timeout_exc: type[BaseException] | BaseException = ...,
hub: Hub | None = None,
) -> None: ...
def wait_read(
fileno: FileDescriptor, timeout: float | None = None, timeout_exc: type[BaseException] | BaseException = ...
) -> None: ...
def wait_write(
fileno: FileDescriptor, timeout: float | None = None, timeout_exc: type[BaseException] | BaseException = ...
) -> None: ...
def wait_readwrite(
fileno: FileDescriptor, timeout: float | None = None, timeout_exc: type[BaseException] | BaseException = ...
) -> None: ...

View File

@@ -0,0 +1,13 @@
from typing import Any
from typing_extensions import final
from weakref import ref
@final
class ValuedWeakRef(ref):
value: Any
@final
class IdentRegistry:
def __init__(self) -> None: ...
def get_ident(self, obj: object) -> int: ...
def __len__(self) -> int: ...

View File

@@ -0,0 +1,24 @@
from collections.abc import Callable, Iterable
from typing import Any, TypeVar
from typing_extensions import ParamSpec, Self
from gevent.greenlet import Greenlet
from gevent.queue import UnboundQueue
_T = TypeVar("_T")
_P = ParamSpec("_P")
# this matches builtins.map to some degree, but since it is an non-public API type that just gets
# returned by some public API functions, we don't bother adding a whole bunch of overloads to handle
# the case of 1-n Iterables being passed in and just go for the fully unsafe signature
# we do the crazy overloads instead in the functions that create these objects
class IMapUnordered(Greenlet[_P, _T]):
finished: bool
# it may contain an undocumented Failure object
queue: UnboundQueue[_T | object]
def __init__(self, func: Callable[_P, _T], iterable: Iterable[Any], spawn: Callable[_P, Greenlet[_P, _T]]) -> None: ...
def __iter__(self) -> Self: ...
def __next__(self) -> _T: ...
class IMap(IMapUnordered[_P, _T]):
index: int

View File

@@ -0,0 +1,50 @@
from collections.abc import Callable, Sequence
from typing import Any, TypeVar
from gevent.events import IPeriodicMonitorThread, MemoryUsageThresholdExceeded, MemoryUsageUnderThreshold
from gevent.hub import Hub
from greenlet import greenlet
__all__ = ["PeriodicMonitoringThread"]
_T = TypeVar("_T")
# FIXME: While it would be nice to import Interface from zope.interface here so the
# mypy plugin will work correctly for the people that use it, it causes all
# sorts of issues to reference a module that is not stubbed in typeshed, so
# for now we punt and just define an alias for Interface and implementer we
# can get rid of later
def implementer(__interface: Any) -> Callable[[_T], _T]: ...
class MonitorWarning(RuntimeWarning): ...
class _MonitorEntry:
function: Callable[[Hub], object]
period: float
last_run_time: float
def __init__(self, function: Callable[[Hub], object], period: float) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __hash__(self) -> int: ...
@implementer(IPeriodicMonitorThread)
class PeriodicMonitoringThread:
inactive_sleep_time: float
min_sleep_time: float
min_memory_monitor_period: float
should_run: bool
monitor_thread_ident: int
pid: int
def __init__(self, hub: Hub) -> None: ...
@property
def hub(self) -> Hub | None: ...
def monitoring_functions(self) -> list[_MonitorEntry]: ...
def add_monitoring_function(self, function: Callable[[Hub], object], period: float) -> None: ...
def calculate_sleep_time(self) -> float: ...
def kill(self) -> None: ...
def __call__(self) -> None: ...
def monitor_blocking(self, hub: Hub) -> tuple[greenlet, Sequence[str]]: ...
def ignore_current_greenlet_blocking(self) -> None: ...
def monitor_current_greenlet_blocking(self) -> None: ...
def can_monitor_memory_usage(self) -> bool: ...
def install_monitor_memory_usage(self) -> None: ...
def monitor_memory_usage(self, _hub: Hub) -> MemoryUsageThresholdExceeded | MemoryUsageUnderThreshold | None: ...

View File

@@ -0,0 +1,21 @@
from _thread import LockType, allocate_lock as Lock
from typing import Generic, NewType, TypeVar
__all__ = ["Lock", "Queue", "EmptyTimeout"]
_T = TypeVar("_T")
_Cookie = NewType("_Cookie", LockType)
class EmptyTimeout(Exception): ...
class Queue(Generic[_T]):
unfinished_tasks: int
def __init__(self) -> None: ...
def task_done(self) -> None: ...
def qsize(self) -> int: ...
def empty(self) -> bool: ...
def full(self) -> bool: ...
def put(self, item: _T) -> None: ...
def get(self, cookie: _Cookie, timeout: int = -1) -> _T: ...
def allocate_cookie(self) -> _Cookie: ...
def kill(self) -> None: ...

View File

@@ -0,0 +1,151 @@
import sys
from _typeshed import FileDescriptor, StrOrBytesPath
from collections.abc import Callable
from types import TracebackType
from typing import Any, Protocol, overload
from typing_extensions import Concatenate, Literal, ParamSpec, TypeAlias
_P = ParamSpec("_P")
# gevent uses zope.interface interanlly which does not work well with type checkers
# partially due to the odd call signatures without self and partially due to them
# behaving essentially like a Protocol, so a mypy plugin is necessary to type check
# them correctly, and then you still have to give up on some valuable features due
# to the missing self/cls argument.
# To ensure maximum compatibility with other type checkers and so we don't depend
# on mypy-zope we define an equivalent Protocol for each interface, which we will
# use on arguments in place of the interface
# it also looks like ILoop is possibly too strict, since there are additional
# properties and methods that are available on all event loops, so these have
# been added as well, instead of completely mirroring the internal interface
class _Loop(Protocol): # noqa: Y046
@property
def approx_timer_resolution(self) -> float: ...
@property
def default(self) -> bool: ...
@property
def iteration(self) -> int: ...
@property
def depth(self) -> int: ...
@property
def backend_int(self) -> int: ...
@property
def backend(self) -> str | int: ...
@property
def pendingcnt(self) -> int: ...
@property
def activecnt(self) -> int: ...
def handle_error(
self, context: object | None, type: type[BaseException] | None, value: BaseException | None, tb: TracebackType | None
) -> None: ...
def run(self, nowait: bool = False, once: bool = False) -> None: ...
def reinit(self) -> None: ...
def ref(self) -> None: ...
def unref(self) -> None: ...
def break_(self, how: int | None = ...) -> None: ...
def verify(self) -> None: ...
def now(self) -> float: ...
def update_now(self) -> None: ...
def destroy(self) -> None: ...
def io(self, fd: FileDescriptor, events: int, ref: bool = True, priority: int | None = None) -> _IoWatcher: ...
def closing_fd(self, fd: FileDescriptor) -> bool: ...
def timer(self, after: float, repeat: float = 0.0, ref: bool = True, priority: int | None = None) -> _TimerWatcher: ...
def signal(self, signum: int, ref: bool = True, priority: int | None = None) -> _Watcher: ...
def idle(self, ref: bool = True, priority: int | None = None) -> _Watcher: ...
def prepare(self, ref: bool = True, priority: int | None = None) -> _Watcher: ...
def check(self, ref: bool = True, priority: int | None = None) -> _Watcher: ...
if sys.platform != "win32":
def fork(self, ref: bool = True, priority: int | None = None) -> _Watcher: ...
def child(self, pid: int, trace: int = 0, ref: bool = True) -> _ChildWatcher: ...
def install_sigchld(self) -> None: ...
def async_(self, ref: bool = True, priority: int | None = None) -> _AsyncWatcher: ...
def stat(self, path: str, interval: float = 0.0, ref: bool = True, priority: bool | None = ...) -> _StatWatcher: ...
# These technically don't allow the functions arguments to be passed in as kwargs
# but there's no way to express that yet with ParamSpec, however, we would still like
# to verify that the arguments match
def run_callback(self, func: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> _Callback: ...
def run_callback_threadsafe(self, func: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> _Callback: ...
def fileno(self) -> FileDescriptor | None: ...
class _Watcher(Protocol):
# while IWatcher allows for kwargs the actual implementation does not...
def start(self, callback: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> None: ...
def stop(self) -> None: ...
def close(self) -> None: ...
# this matches Intersection[_Watcher, TimerMixin]
class _TimerWatcher(_Watcher, Protocol):
# this has one specific allowed keyword argument, if it is given we don't try to check
# the passed in arguments, but if it isn't passed in, then we do.
@overload
def start(self, callback: Callable[..., Any], *args: Any, update: bool) -> None: ...
@overload
def start(self, callback: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> None: ...
@overload
def again(self, callback: Callable[..., Any], *args: Any, update: bool) -> None: ...
@overload
def again(self, callback: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> None: ...
# this matches Intersection[_Watcher, IoMixin]
class _IoWatcher(_Watcher, Protocol):
EVENT_MASK: int
# pass_events means the first argument of the callback needs to be an integer, but we can't
# type check the other passed in args in this case
@overload
def start(self, callback: Callable[Concatenate[int, _P], Any], *args: Any, pass_events: Literal[True]) -> None: ...
@overload
def start(self, callback: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> None: ...
# this matches Intersection[_Watcher, ChildMixin]
class _ChildWatcher(_Watcher, Protocol):
@property
def pid(self) -> int: ...
@property
def rpid(self) -> int | None: ...
@property
def rstatus(self) -> int: ...
# this matches Intersection[_Watcher, AsyncMixin]
class _AsyncWatcher(_Watcher, Protocol):
def send(self) -> None: ...
def send_ignoring_arg(self, __ignored: object) -> None: ...
@property
def pending(self) -> bool: ...
# all implementations return something of this shape
class _StatResult(Protocol):
@property
def st_nlink(self) -> int: ...
# this matches Intersection[_Watcher, StatMixin]
class _StatWatcher(_Watcher, Protocol):
@property
def path(self) -> StrOrBytesPath: ...
@property
def attr(self) -> _StatResult | None: ...
@property
def prev(self) -> _StatResult | None: ...
@property
def interval(self) -> float: ...
class _Callback(Protocol):
pending: bool
def stop(self) -> None: ...
def close(self) -> None: ...
_FullSockAddr: TypeAlias = tuple[str, int, int, int] # host, port, flowinfo, scopeid
_SockAddr: TypeAlias = _FullSockAddr | tuple[str, int]
_AddrinfoResult: TypeAlias = list[tuple[int, int, int, str, _SockAddr]] # family, type, protocol, cname, sockaddr
_NameinfoResult: TypeAlias = tuple[str, str]
class _Resolver(Protocol): # noqa: Y046
def close(self) -> None: ...
def gethostbyname(self, hostname: str, family: int = 2) -> str: ...
def gethostbyname_ex(self, hostname: str, family: int = 2) -> tuple[str, list[str], list[str]]: ...
def getaddrinfo(
self, host: str, port: int, family: int = 0, socktype: int = 0, proto: int = 0, flags: int = 0
) -> _AddrinfoResult: ...
def gethostbyaddr(self, ip_address: str) -> tuple[str, list[str], list[str]]: ...
def getnameinfo(self, sockaddr: _SockAddr, flags: int) -> _NameinfoResult: ...

View File

@@ -0,0 +1,53 @@
from _typeshed import Incomplete
from collections.abc import Callable, MutableMapping, Sequence
from types import ModuleType
from typing import Any, Generic, TypeVar, overload
from typing_extensions import Self
_T = TypeVar("_T")
WRAPPER_ASSIGNMENTS: tuple[str, ...]
WRAPPER_UPDATES: tuple[str, ...]
def update_wrapper(wrapper: _T, wrapped: object, assigned: Sequence[str] = ..., updated: Sequence[str] = ...) -> _T: ...
def copy_globals(
source: ModuleType,
globs: MutableMapping[str, Any],
only_names: Incomplete | None = None,
ignore_missing_names: bool = False,
names_to_ignore: Sequence[str] = ...,
dunder_names_to_keep: Sequence[str] = ...,
cleanup_globs: bool = True,
) -> list[str]: ...
def import_c_accel(globs: MutableMapping[str, Any], cname: str) -> None: ...
class Lazy(Generic[_T]):
data: _T
def __init__(self, func: Callable[[Any], _T]) -> None: ...
@overload
def __get__(self, inst: None, class_: type[object]) -> Self: ...
@overload
def __get__(self, inst: object, class_: type[object]) -> _T: ...
class readproperty(Generic[_T]):
func: Callable[[Any], _T]
def __init__(self: readproperty[_T], func: Callable[[Any], _T]) -> None: ...
@overload
def __get__(self, inst: None, class_: type[object]) -> Self: ...
@overload
def __get__(self, inst: object, class_: type[object]) -> _T: ...
class LazyOnClass(Generic[_T]):
@classmethod
def lazy(cls, cls_dict: MutableMapping[str, Any], func: Callable[[Any], _T]) -> None: ...
name: str
func: Callable[[Any], _T]
def __init__(self, func: Callable[[Any], _T], name: str | None = None) -> None: ...
@overload
def __get__(self, inst: None, class_: type[object]) -> Self: ...
@overload
def __get__(self, inst: object, class_: type[object]) -> _T: ...
def gmctime() -> str: ...
def prereleaser_middle(data: MutableMapping[str, Any]) -> None: ...
def postreleaser_before(data: MutableMapping[str, Any]) -> None: ...

View File

@@ -0,0 +1,47 @@
from types import TracebackType
from typing import Generic, TypeVar, overload
from typing_extensions import TypeAlias, final
from gevent.event import _ValueSource
from gevent.hub import Hub
from greenlet import greenlet as greenlet_t
__all__ = ["Waiter"]
_T = TypeVar("_T")
# this is annoying, it's due to them using *throw args, rather than just storing them in standardized form
_ThrowArgs: TypeAlias = (
tuple[()]
| tuple[BaseException]
| tuple[BaseException, None]
| tuple[BaseException, None, TracebackType | None]
| tuple[type[BaseException]]
| tuple[type[BaseException], BaseException | object]
| tuple[type[BaseException], BaseException | object, TracebackType | None]
)
class Waiter(Generic[_T]):
@property
def hub(self) -> Hub: ... # readonly in Cython
@property
def greenlet(self) -> greenlet_t | None: ... # readonly in Cython
@property
def value(self) -> _T | None: ... # readonly in Cython
def __init__(self, hub: Hub | None = None) -> None: ...
def clear(self) -> None: ...
def ready(self) -> bool: ...
def successful(self) -> bool: ...
@property
def exc_info(self) -> _ThrowArgs | None: ...
def switch(self, value: _T) -> None: ...
@overload
def throw(
self, __typ: type[BaseException], __val: BaseException | object = None, __tb: TracebackType | None = None
) -> None: ...
@overload
def throw(self, __typ: BaseException = ..., __val: None = None, __tb: TracebackType | None = None) -> None: ...
def get(self) -> _T: ...
def __call__(self, source: _ValueSource[_T]) -> None: ...
@final
class MultipleWaiter(Waiter[_T]): ...

View File

@@ -0,0 +1 @@
from gevent.resolver.cares import *

View File

@@ -0,0 +1,45 @@
from _typeshed import StrOrBytesPath
from typing import Any, overload
from gevent.baseserver import _Spawner
from gevent.server import StreamServer, _Address
from gevent.socket import socket as _GeventSocket
from gevent.ssl import SSLContext
class BackdoorServer(StreamServer):
locals: dict[str, Any]
banner: str | None
@overload
def __init__(
self,
listener: _GeventSocket | tuple[str, int] | str,
locals: dict[str, Any] | None = None,
banner: str | None = None,
*,
backlog: int | None = None,
spawn: _Spawner = "default",
ssl_context: SSLContext,
server_side: bool = True,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
) -> None: ...
@overload
def __init__(
self,
listener: _GeventSocket | tuple[str, int] | str,
locals: dict[str, Any] | None = None,
banner: str | None = None,
*,
backlog: int | None = None,
spawn: _Spawner = "default",
keyfile: StrOrBytesPath = ...,
certfile: StrOrBytesPath = ...,
server_side: bool = True,
cert_reqs: int = ...,
ssl_version: int = ...,
ca_certs: str = ...,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
ciphers: str = ...,
) -> None: ...
def handle(self, conn: _GeventSocket, _address: _Address) -> None: ...

View File

@@ -0,0 +1,65 @@
from collections.abc import Callable, Container
from types import TracebackType
from typing import Any, Generic, Protocol
from typing_extensions import Literal, ParamSpec, Self, TypeAlias
from gevent._types import _Loop
from gevent.pool import Pool
from gevent.socket import socket as _GeventSocket
from greenlet import greenlet
_P = ParamSpec("_P")
class _SpawnFunc(Protocol):
def __call__(self, __func: Callable[_P, object], *args: _P.args, **kwargs: _P.kwargs) -> greenlet: ...
_Spawner: TypeAlias = Pool | _SpawnFunc | int | Literal["default"] | None
class BaseServer(Generic[_P]):
min_delay: float
max_delay: float
max_accept: int
stop_timeout: float
fatal_errors: Container[int]
pool: Pool | None
delay: float
loop: _Loop
family: int
address: str | tuple[str, int]
socket: _GeventSocket
handle: Callable[..., object]
def __init__(
self,
listener: _GeventSocket | tuple[str, int] | str,
handle: Callable[_P, object] | None = None,
spawn: _Spawner = "default",
) -> None: ...
def __enter__(self) -> Self: ...
def __exit__(self, __typ: type[BaseException] | None, __value: BaseException | None, __tb: TracebackType | None) -> None: ...
def set_listener(self, listener: _GeventSocket | tuple[str, int] | str) -> None: ...
def set_spawn(self, spawn: _Spawner) -> None: ...
def set_handle(self, handle: Callable[_P, object]) -> None: ...
def start_accepting(self) -> None: ...
def stop_accepting(self) -> None: ...
# neither of these accept keyword arguments, but if we omit them, then ParamSpec
# won't match the arguments correctly
def do_handle(self, *args: _P.args, **_: _P.kwargs) -> None: ...
def do_close(self, *args: _P.args, **_: _P.kwargs) -> None: ...
# we would like to return _P.args here, however pyright will complain
# mypy doesn't seem to mind
def do_read(self) -> tuple[Any, ...] | None: ...
def full(self) -> bool: ...
@property
def server_host(self) -> str | None: ...
@property
def server_port(self) -> int | None: ...
def init_socket(self) -> None: ...
@property
def started(self) -> bool: ...
def start(self) -> None: ...
def close(self) -> None: ...
@property
def closed(self) -> bool: ...
def stop(self, timeout: float | None = None) -> None: ...
def serve_forever(self, stop_timeout: float | None = None) -> None: ...
def is_fatal_error(self, ex: BaseException) -> bool: ...

View File

@@ -0,0 +1,63 @@
from types import TracebackType
from typing import Generic, Protocol, TypeVar, overload
from typing_extensions import Literal, TypeAlias
from gevent._abstract_linkable import AbstractLinkable
_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)
# gevent generally allows the tracebock to be omitted, it can also fail to serialize
# in which case it will be None as well.
_ExcInfo: TypeAlias = tuple[type[BaseException], BaseException, TracebackType | None]
_OptExcInfo: TypeAlias = _ExcInfo | tuple[None, None, None]
class _ValueSource(Protocol[_T_co]):
def successful(self) -> bool: ...
@property
def value(self) -> _T_co | None: ...
@property
def exception(self) -> BaseException | None: ...
class Event(AbstractLinkable):
def __init__(self) -> None: ...
def is_set(self) -> bool: ...
def isSet(self) -> bool: ...
def ready(self) -> bool: ...
def set(self) -> None: ...
def clear(self) -> None: ...
@overload
def wait(self, timeout: None = None) -> Literal[True]: ...
@overload
def wait(self, timeout: float) -> bool: ...
class AsyncResult(AbstractLinkable, Generic[_T]):
def __init__(self) -> None: ...
@property
def value(self) -> _T | None: ...
@property
def exc_info(self) -> _OptExcInfo | tuple[None, None, None] | tuple[()]: ...
@property
def exception(self) -> BaseException | None: ...
def ready(self) -> bool: ...
def successful(self) -> bool: ...
def set(self, value: _T | None = None) -> None: ...
@overload
def set_exception(self, exception: BaseException, exc_info: None = None) -> None: ...
@overload
def set_exception(self, exception: BaseException | None, exc_info: _OptExcInfo) -> None: ...
# technically get/get_nowait/result should just return _T, but the API is designed in
# such a way that it is perfectly legal for a ValueSource to have neither its value nor
# its exception set, while still being marked successful, at which point None would be
# stored into value, it's also legal to call set without arguments, which has the same
# effect, this is a little annoying, since it will introduce some additional None checks
# that may not be necessary, but it's impossible to annotate this situation, so for now
# we just deal with the possibly redundant None checks...
def get(self, block: bool = True, timeout: float | None = None) -> _T | None: ...
def get_nowait(self) -> _T | None: ...
def wait(self, timeout: float | None = None) -> _T | None: ...
def __call__(self, source: _ValueSource[_T]) -> None: ...
def result(self, timeout: float | None = None) -> _T | None: ...
set_result = set
def done(self) -> bool: ...
def cancel(self) -> Literal[False]: ...
def cancelled(self) -> Literal[False]: ...

View File

@@ -0,0 +1,163 @@
import sys
from collections.abc import Callable, Mapping, Sequence
from types import ModuleType
from typing import Any, Protocol, TypeVar
from typing_extensions import TypeAlias
from gevent.hub import Hub
from greenlet import greenlet as greenlet_t
_T = TypeVar("_T")
# FIXME: While it would be nice to import Interface from zope.interface here so the
# mypy plugin will work correctly for the people that use it, it causes all
# sorts of issues to reference a module that is not stubbed in typeshed, so
# for now we punt and just define an alias for Interface and implementer we
# can get rid of later
Interface: TypeAlias = Any
def implementer(__interface: Interface) -> Callable[[_T], _T]: ...
# this is copied from types-psutil, it would be nice if we could just import this
# but it doesn't seem like we can...
if sys.platform == "linux":
from psutil._pslinux import pmem
elif sys.platform == "darwin":
from psutil._psosx import pmem
elif sys.platform == "win32":
from psutil._pswindows import pmem
else:
class pmem(Any): ...
subscribers: list[Callable[[Any], object]]
class _PeriodicMonitorThread(Protocol):
def add_monitoring_function(self, function: Callable[[Hub], object], period: float | None) -> object: ...
class IPeriodicMonitorThread(Interface):
def add_monitoring_function(function: Callable[[Hub], object], period: float | None) -> object: ...
class IPeriodicMonitorThreadStartedEvent(Interface):
monitor: IPeriodicMonitorThread
@implementer(IPeriodicMonitorThread)
class PeriodicMonitorThreadStartedEvent:
ENTRY_POINT_NAME: str
monitor: _PeriodicMonitorThread
def __init__(self, monitor: _PeriodicMonitorThread) -> None: ...
class IEventLoopBlocked(Interface):
greenlet: greenlet_t
blocking_time: float
info: Sequence[str]
@implementer(IEventLoopBlocked)
class EventLoopBlocked:
greenlet: greenlet_t
blocking_time: float
info: Sequence[str]
def __init__(self, greenlet: greenlet_t, blocking_time: float, info: Sequence[str]) -> None: ...
class IMemoryUsageThresholdExceeded(Interface):
mem_usage: int
max_allowed: int
memory_info: pmem
class _AbstractMemoryEvent:
mem_usage: int
max_allowed: int
memory_info: pmem
def __init__(self, mem_usage: int, max_allowed: int, memory_info: pmem) -> None: ...
@implementer(IMemoryUsageThresholdExceeded)
class MemoryUsageThresholdExceeded(_AbstractMemoryEvent): ...
class IMemoryUsageUnderThreshold(Interface):
mem_usage: int
max_allowed: int
max_memory_usage: int
memory_info: pmem
@implementer(IMemoryUsageUnderThreshold)
class MemoryUsageUnderThreshold(_AbstractMemoryEvent):
max_memory_usage: int
def __init__(self, mem_usage: int, max_allowed: int, memory_info: pmem, max_usage: int) -> None: ...
class IGeventPatchEvent(Interface):
source: object
target: object
@implementer(IGeventPatchEvent)
class GeventPatchEvent:
source: object
target: object
def __init__(self, source: object, target: object) -> None: ...
class IGeventWillPatchEvent(IGeventPatchEvent): ...
class DoNotPatch(BaseException): ...
@implementer(IGeventWillPatchEvent)
class GeventWillPatchEvent(GeventPatchEvent): ...
class IGeventDidPatchEvent(IGeventPatchEvent): ...
@implementer(IGeventWillPatchEvent)
class GeventDidPatchEvent(GeventPatchEvent): ...
class IGeventWillPatchModuleEvent(IGeventWillPatchEvent):
source: ModuleType
target: ModuleType
module_name: str
target_item_names: list[str]
@implementer(IGeventWillPatchModuleEvent)
class GeventWillPatchModuleEvent(GeventWillPatchEvent):
ENTRY_POINT_NAME: str
source: ModuleType
target: ModuleType
module_name: str
target_item_names: list[str]
def __init__(self, module_name: str, source: ModuleType, target: ModuleType, items: list[str]) -> None: ...
class IGeventDidPatchModuleEvent(IGeventDidPatchEvent):
source: ModuleType
target: ModuleType
module_name: str
@implementer(IGeventDidPatchModuleEvent)
class GeventDidPatchModuleEvent(GeventDidPatchEvent):
ENTRY_POINT_NAME: str
source: ModuleType
target: ModuleType
module_name: str
def __init__(self, module_name: str, source: ModuleType, target: ModuleType) -> None: ...
class IGeventWillPatchAllEvent(IGeventWillPatchEvent):
patch_all_arguments: Mapping[str, Any]
patch_all_kwargs: Mapping[str, Any]
def will_patch_module(module_name: str) -> bool: ...
class _PatchAllMixin:
def __init__(self, patch_all_arguments: Mapping[str, Any], patch_all_kwargs: Mapping[str, Any]) -> None: ...
@property
def patch_all_arguments(self) -> dict[str, Any]: ... # safe to mutate, it's a copy
@property
def patch_all_kwargs(self) -> dict[str, Any]: ... # safe to mutate, it's a copy
@implementer(IGeventWillPatchAllEvent)
class GeventWillPatchAllEvent(_PatchAllMixin, GeventWillPatchEvent):
ENTRY_POINT_NAME: str
def will_patch_module(self, module_name: str) -> bool: ...
class IGeventDidPatchBuiltinModulesEvent(IGeventDidPatchEvent):
patch_all_arguments: Mapping[str, Any]
patch_all_kwargs: Mapping[str, Any]
@implementer(IGeventDidPatchBuiltinModulesEvent)
class GeventDidPatchBuiltinModulesEvent(_PatchAllMixin, GeventDidPatchEvent):
ENTRY_POINT_NAME: str
class IGeventDidPatchAllEvent(IGeventDidPatchEvent): ...
@implementer(IGeventDidPatchAllEvent)
class GeventDidPatchAllEvent(_PatchAllMixin, GeventDidPatchEvent):
ENTRY_POINT_NAME: str

View File

@@ -0,0 +1,15 @@
from gevent.hub import Hub
from greenlet import GreenletExit
class LoopExit(Exception):
@property
def hub(self) -> Hub | None: ...
class BlockingSwitchOutError(AssertionError): ...
class InvalidSwitchError(AssertionError): ...
class ConcurrentObjectUseError(AssertionError): ...
class InvalidThreadUseError(RuntimeError): ...
class HubDestroyed(GreenletExit):
destroy_loop: bool
def __init__(self, destroy_loop: bool) -> None: ...

View File

@@ -0,0 +1,156 @@
import sys
from typing import Any
from typing_extensions import TypeAlias
from gevent._fileobjectcommon import FileObjectBlock as FileObjectBlock, FileObjectThread as FileObjectThread
if sys.platform != "win32":
import io
from _typeshed import (
FileDescriptorOrPath,
OpenBinaryMode,
OpenBinaryModeReading,
OpenBinaryModeUpdating,
OpenBinaryModeWriting,
OpenTextMode,
)
from typing import IO, AnyStr, overload
from typing_extensions import Literal
from gevent._fileobjectcommon import _IOT, FileObjectBase
# this is implemented in _fileobjectposix and technically uses an undocumented subclass
# of RawIOBase, but the interface is the same, so it doesn't seem worth it to add
# annotations for it. _fileobjectcommon was barely worth it due to the common base class
# of all three FileObject types
class FileObjectPosix(FileObjectBase[_IOT, AnyStr]):
default_bufsize = io.DEFAULT_BUFFER_SIZE
fileio: io.RawIOBase
# Text mode: always binds a TextIOWrapper
@overload
def __init__(
self: FileObjectPosix[io.TextIOWrapper, str],
fobj: FileDescriptorOrPath,
mode: OpenTextMode = "r",
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
# Unbuffered binary mode: binds a FileIO
@overload
def __init__(
self: FileObjectPosix[io.FileIO, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: Literal[0],
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[0] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
@overload
def __init__(
self: FileObjectPosix[io.FileIO, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: Literal[0] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
*,
buffering: Literal[0],
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
# Buffering is on: return BufferedRandom, BufferedReader, or BufferedWriter
@overload
def __init__(
self: FileObjectPosix[io.BufferedRandom, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeUpdating,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
@overload
def __init__(
self: FileObjectPosix[io.BufferedWriter, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeWriting,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
@overload
def __init__(
self: FileObjectPosix[io.BufferedReader, bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryModeReading,
bufsize: Literal[-1, 1] | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: Literal[-1, 1] | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
# Buffering cannot be determined: fall back to BinaryIO
@overload
def __init__(
self: FileObjectPosix[IO[bytes], bytes],
fobj: FileDescriptorOrPath,
mode: OpenBinaryMode,
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
# Fallback if mode is not specified
@overload
def __init__(
self: FileObjectPosix[IO[Any], Any],
fobj: FileDescriptorOrPath,
mode: str,
bufsize: int | None = None,
close: bool | None = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
buffering: int | None = None,
closefd: bool | None = None,
atomic_write: bool = False,
) -> None: ...
_FileObjectType: TypeAlias = type[FileObjectPosix[Any, Any] | FileObjectBlock[Any, Any] | FileObjectThread[Any, Any]]
else:
_FileObjectType: TypeAlias = type[FileObjectBlock[Any, Any] | FileObjectThread[Any, Any]]
FileObject: _FileObjectType

View File

@@ -0,0 +1,93 @@
import weakref
from collections.abc import Callable, Iterable, Sequence
from types import FrameType, TracebackType
from typing import Any, ClassVar, Generic, TypeVar, overload
from typing_extensions import ParamSpec, Self
import greenlet
from gevent._types import _Loop
from gevent._util import readproperty
_T = TypeVar("_T")
_G = TypeVar("_G", bound=greenlet.greenlet)
_P = ParamSpec("_P")
class Greenlet(greenlet.greenlet, Generic[_P, _T]):
# we can't use _P.args/_P.kwargs here because pyright will complain
# mypy doesn't seem to mind though
args: tuple[Any, ...]
kwargs: dict[str, Any]
value: _T | None
@overload
def __init__(self: Greenlet[_P, _T], run: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs) -> None: ...
@overload
def __init__(self: Greenlet[[], None]) -> None: ...
@readproperty
def name(self) -> str: ...
@property
def minimal_ident(self) -> int: ...
@property
def loop(self) -> _Loop: ...
@property
def dead(self) -> bool: ...
@property
def started(self) -> bool: ...
@property
def exception(self) -> BaseException | None: ...
@property
def exc_info(self) -> tuple[type[BaseException], BaseException, TracebackType | None] | None: ...
@staticmethod
def add_spawn_callback(callback: Callable[[Greenlet[..., Any]], object]) -> None: ...
@staticmethod
def remove_spawn_callback(callback: Callable[[Greenlet[..., Any]], object]) -> None: ...
def get(self, block: bool = True, timeout: float | None = None) -> _T: ...
def has_links(self) -> bool: ...
def join(self, timeout: float | None = None) -> None: ...
def kill(
self, exception: type[BaseException] | BaseException = ..., block: bool = True, timeout: float | None = None
) -> None: ...
def link(self, callback: Callable[[Self], object]) -> None: ...
def link_exception(self, callback: Callable[[Self], object]) -> None: ...
def link_value(self, callback: Callable[[Self], object]) -> None: ...
def rawlink(self, callback: Callable[[Self], object]) -> None: ...
def unlink(self, callback: Callable[[Self], Any]) -> None: ...
def unlink_all(self) -> None: ...
def ready(self) -> bool: ...
def run(self) -> Any: ...
@overload
@classmethod
def spawn(cls, __run: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs) -> Self: ...
@overload
@classmethod
def spawn(cls) -> Greenlet[[], None]: ...
@overload
@classmethod
def spawn_later(cls, seconds: float, run: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs) -> Self: ...
@overload
@classmethod
def spawn_later(cls, seconds: float) -> Greenlet[[], None]: ...
def start(self) -> None: ...
def start_later(self, seconds: float) -> None: ...
def successful(self) -> bool: ...
def __bool__(self) -> bool: ...
def __enter__(self) -> Self: ...
def __exit__(self, t: type[BaseException] | None, v: BaseException | None, tb: TracebackType | None) -> None: ...
# since these are for instrumentation which is disabled by default, we could
# consider just not annotating them...
spawning_stack_limit: ClassVar[int]
spawn_tree_locals: dict[str, Any] | None
spawning_greenlet: weakref.ref[greenlet.greenlet] | None
# not quite accurate, since it may be an internal dummy type instead
# but since it has all the same fields as FrameType we shouldn't care
spawning_stack: FrameType | None
def joinall(
greenlets: Sequence[_G], timeout: float | None = None, raise_error: bool = False, count: int | None = None
) -> list[_G]: ...
def killall(
greenlets: Iterable[greenlet.greenlet],
exception: type[BaseException] | BaseException = ...,
block: bool = True,
timeout: float | None = None,
) -> None: ...

110
stubs/gevent/gevent/hub.pyi Normal file
View File

@@ -0,0 +1,110 @@
from collections.abc import Callable
from types import TracebackType
from typing import Any, Generic, Protocol, TextIO, TypeVar, overload
from typing_extensions import ParamSpec
import gevent._hub_local
import gevent._waiter
import greenlet
from gevent._hub_primitives import WaitOperationsGreenlet
from gevent._ident import IdentRegistry
from gevent._monitor import PeriodicMonitoringThread
from gevent._types import _Loop, _Watcher
from gevent._util import Lazy, readproperty
from gevent.greenlet import Greenlet
from gevent.resolver import AbstractResolver
from gevent.threadpool import ThreadPool
_T = TypeVar("_T")
_P = ParamSpec("_P")
GreenletExit = greenlet.GreenletExit
getcurrent = greenlet.getcurrent
get_hub = gevent._hub_local.get_hub
Waiter = gevent._waiter.Waiter
class _DefaultReturnProperty(Protocol[_T]):
@overload
def __get__(self, obj: None, owner: type[object] | None = None) -> property: ...
@overload
def __get__(self, obj: object, owner: type[object] | None = None) -> _T: ...
def __set__(self, obj: object, value: _T | None) -> None: ...
def __del__(self, obj: object) -> None: ...
def spawn_raw(function: Callable[..., object], *args: object, **kwargs: object) -> greenlet.greenlet: ...
def sleep(seconds: float = 0, ref: bool = True) -> None: ...
def idle(priority: int = 0) -> None: ...
def kill(greenlet: greenlet.greenlet, exception: type[BaseException] | BaseException = ...) -> None: ...
class signal(Generic[_P]):
greenlet_class: type[Greenlet[..., Any]] | None
hub: Hub
watcher: _Watcher
handler: Callable[_P, object]
# we can't use _P.args/_P.kwargs here because pyright will complain
# mypy doesn't seem to mind though
args: tuple[Any, ...]
kwargs: dict[str, Any]
def __init__(self, signalnum: int, handler: Callable[_P, object], *args: _P.args, **kwargs: _P.kwargs) -> None: ...
@property
def ref(self) -> bool: ...
@ref.setter
def ref(self, value: bool) -> None: ...
def cancel(self) -> None: ...
def handle(self) -> None: ...
def reinit(hub: Hub | None = None) -> None: ...
class Hub(WaitOperationsGreenlet):
SYSTEM_ERROR: tuple[type[BaseException], ...]
NOT_ERROR: tuple[type[BaseException], ...]
threadpool_size: int
periodic_monitoring_thread: PeriodicMonitoringThread | None
thread_ident: int
name: str
loop: _Loop
format_context: Callable[[object], str]
minimal_ident: int
@overload
def __init__(self, loop: _Loop, default: None = None) -> None: ...
@overload
def __init__(self, loop: None = None, default: bool | None = None) -> None: ...
@Lazy
def ident_registry(self) -> IdentRegistry: ...
@property
def loop_class(self) -> type[_Loop]: ...
@property
def backend(self) -> int | str: ...
@property
def main_hub(self) -> bool: ...
def handle_error(
self,
context: object | None,
type: type[BaseException] | None,
value: BaseException | str | None,
tb: TracebackType | None,
) -> None: ...
def handle_system_error(
self, type: type[BaseException], value: BaseException | None, tb: TracebackType | None = None
) -> None: ...
@readproperty
def exception_stream(self) -> TextIO | None: ...
def print_exception(
self, context: object | None, t: type[BaseException] | None, v: BaseException | str | None, tb: TracebackType | None
) -> None: ...
def run(self) -> None: ...
def start_periodic_monitoring_thread(self) -> PeriodicMonitoringThread: ...
def join(self, timeout: float | None = None) -> bool: ...
def destroy(self, destroy_loop: bool | None = None) -> None: ...
@property
def resolver_class(self) -> type[AbstractResolver]: ...
resolver: _DefaultReturnProperty[AbstractResolver]
@property
def threadpool_class(self) -> type[ThreadPool]: ...
threadpool: _DefaultReturnProperty[ThreadPool]
class linkproxy:
callback: Callable[[object], object]
obj: object
def __init__(self, callback: Callable[[_T], object], obj: _T) -> None: ...
def __call__(self, *args: object) -> None: ...

View File

View File

@@ -0,0 +1,92 @@
import sys
from _typeshed import FileDescriptor
from collections.abc import Callable, Sequence
from types import TracebackType
from typing import Any
from typing_extensions import ParamSpec
import gevent.libev.watcher as watcher
from gevent._ffi.loop import _ErrorHandler
from gevent._types import _Callback
# this c extension is only available on posix
if sys.platform != "win32":
_P = ParamSpec("_P")
def get_version() -> str: ...
def get_header_version() -> str: ...
# the final item in the list could be an integer if one of the backends did not have a string mapping
def embeddable_backends() -> list[str | int]: ...
def recommended_backends() -> list[str | int]: ...
def supported_backends() -> list[str | int]: ...
def time() -> float: ...
class loop:
starting_timer_may_update_loop_time: bool
error_handler: _ErrorHandler
@property
def approx_timer_resolution(self) -> float: ... # readonly in Cython
def __init__(self, flags: Sequence[str] | str | int | None = None, default: bool | None = None, ptr: int = 0) -> None: ...
def destroy(self) -> None: ...
@property
def ptr(self) -> int: ...
@property
def WatcherType(self) -> type[watcher.watcher]: ...
@property
def MAXPRI(self) -> int: ...
@property
def MINPRI(self) -> int: ...
def handle_error(
self, context: object | None, type: type[BaseException] | None, value: BaseException | None, tb: TracebackType | None
) -> None: ...
def run(self, nowait: bool = False, once: bool = False) -> None: ...
def reinit(self) -> None: ...
def ref(self) -> None: ...
def unref(self) -> None: ...
def break_(self, how: int = ...) -> None: ...
def verify(self) -> None: ...
def now(self) -> float: ...
def update_now(self) -> None: ...
update = update_now # deprecated
@property
def default(self) -> bool: ...
@property
def iteration(self) -> int: ...
@property
def depth(self) -> int: ...
@property
def backend_int(self) -> int: ...
@property
def backend(self) -> str | int: ...
@property
def pendingcnt(self) -> int: ...
def io(self, fd: FileDescriptor, events: int, ref: bool = True, priority: int | None = None) -> watcher.io: ...
def closing_fd(self, fd: FileDescriptor) -> bool: ...
def timer(self, after: float, repeat: float = 0.0, ref: bool = True, priority: int | None = None) -> watcher.timer: ...
def signal(self, signum: int, ref: bool = True, priority: int | None = None) -> watcher.signal: ...
def idle(self, ref: bool = True, priority: int | None = None) -> watcher.idle: ...
def prepare(self, ref: bool = True, priority: int | None = None) -> watcher.prepare: ...
def check(self, ref: bool = True, priority: int | None = None) -> watcher.check: ...
def fork(self, ref: bool = True, priority: int | None = None) -> watcher.fork: ...
def async_(self, ref: bool = True, priority: int | None = None) -> watcher.async_: ...
def child(self, pid: int, trace: int = 0, ref: bool = True) -> watcher.child: ...
def install_sigchld(self) -> None: ...
def reset_sigchld(self) -> None: ...
def stat(self, path: str, interval: float = 0.0, ref: bool = True, priority: bool | None = None) -> watcher.stat: ...
# These technically don't allow the functions arguments to be passed in as kwargs
# but there's no way to express that yet with ParamSpec, however, we would still like
# to verify that the arguments match
def run_callback(self, func: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> _Callback: ...
def run_callback_threadsafe(self, func: Callable[_P, Any], *args: _P.args, **_: _P.kwargs) -> _Callback: ...
def fileno(self) -> FileDescriptor | None: ...
@property
def activecnt(self) -> int: ...
@property
def sig_pending(self) -> int: ...
# the final item in the list could be a integer if some of the flags don't a string mapping
@property
def origflags(self) -> list[str | int]: ...
@property
def origflags_int(self) -> int: ...
@property
def sigfd(self) -> FileDescriptor: ...

View File

@@ -0,0 +1,36 @@
import sys
from _typeshed import FileDescriptor
from collections.abc import Sequence
import gevent.libev.watcher as watcher
from gevent._ffi.loop import AbstractLoop
def get_version() -> str: ...
def get_header_version() -> str: ...
def supported_backends() -> list[str | int]: ...
def recommended_backends() -> list[str | int]: ...
def embeddable_backends() -> list[str | int]: ...
def time() -> float: ...
class loop(AbstractLoop):
approx_timer_resolution: float
error_handler: None
@property
def MAXPRI(self) -> int: ...
@property
def MINPRI(self) -> int: ...
def __init__(self, flags: Sequence[str] | str | int | None = None, default: bool | None = None) -> None: ...
def io(self, fd: FileDescriptor, events: int, ref: bool = True, priority: int | None = None) -> watcher.io: ...
def closing_fd(self, fd: FileDescriptor) -> bool: ...
def timer(self, after: float, repeat: float = 0.0, ref: bool = True, priority: int | None = None) -> watcher.timer: ...
def signal(self, signum: int, ref: bool = True, priority: int | None = None) -> watcher.signal: ...
def idle(self, ref: bool = True, priority: int | None = None) -> watcher.idle: ...
def prepare(self, ref: bool = True, priority: int | None = None) -> watcher.prepare: ...
def check(self, ref: bool = True, priority: int | None = None) -> watcher.check: ...
def async_(self, ref: bool = True, priority: int | None = None) -> watcher.async_: ...
if sys.platform != "win32":
def fork(self, ref: bool = True, priority: int | None = None) -> watcher.fork: ...
def child(self, pid: int, trace: int = 0, ref: bool = True) -> watcher.child: ...
def reset_sigchld(self) -> None: ...
def stat(self, path: str, interval: float = 0.0, ref: bool = True, priority: bool | None = None) -> watcher.stat: ...

View File

@@ -0,0 +1,65 @@
import sys
from _typeshed import FileDescriptor
from collections.abc import Callable
from typing_extensions import ParamSpec, TypeAlias
from gevent._ffi import watcher as _base
from gevent.libev.corecffi import loop as cffi_loop
__all__: list[str] = []
if sys.platform != "win32":
from gevent.libev.corecext import loop as cext_loop
_Loop: TypeAlias = cffi_loop | cext_loop
else:
_Loop: TypeAlias = cffi_loop
_P = ParamSpec("_P")
class watcher(_base.watcher):
def __init__(self, _loop: _Loop, ref: bool = True, priority: int | None = None) -> None: ...
@property
def ref(self) -> bool: ...
@ref.setter
def ref(self, value: bool) -> None: ...
# does not accept keyword arguments
def feed(self, revents: int, callback: Callable[_P, object], *args: _P.args, **_: _P.kwargs) -> None: ...
class io(_base.IoMixin, watcher):
EVENT_MASK: int
@property
def fd(self) -> FileDescriptor: ...
@fd.setter
def fd(self, value: FileDescriptor) -> None: ...
@property
def events(self) -> int: ...
@events.setter
def events(self, events: int) -> None: ...
@property
def events_str(self) -> str: ...
class timer(_base.TimerMixin, watcher):
@property
def at(self) -> float: ...
class signal(_base.SignalMixin, watcher): ...
class idle(_base.IdleMixin, watcher): ...
class prepare(_base.PrepareMixin, watcher): ...
class check(_base.CheckMixin, watcher): ...
class fork(_base.ForkMixin, watcher): ...
class async_(_base.AsyncMixin, watcher): ...
class child(_base.ChildMixin, watcher):
@property
def rpid(self) -> int: ...
@rpid.setter
def rpid(self, value: int) -> None: ...
@property
def rstatus(self) -> int: ...
@rstatus.setter
def rstatus(self, value: int) -> None: ...
class stat(_base.StatMixin, watcher):
@property
def interval(self) -> float: ...

View File

View File

@@ -0,0 +1,41 @@
import sys
from _typeshed import FileDescriptor
from typing import NamedTuple
import gevent.libuv.watcher as watcher
from gevent._ffi.loop import AbstractLoop
from gevent._types import _IoWatcher
def get_version() -> str: ...
def get_header_version() -> str: ...
def supported_backends() -> list[str]: ...
class loop(AbstractLoop):
CALLBACK_CHECK_COUNT: int
SIGNAL_CHECK_INTERVAL_MS: int
approx_timer_resolution: float
error_handler: None
def __init__(self, flags: int | None = None, default: bool | None = None) -> None: ...
class _HandleState(NamedTuple):
handle: int
type: str
watcher: watcher.watcher
ref: bool
active: bool
closing: bool
def debug(self) -> list[_HandleState]: ...
def install_sigchld(self) -> None: ...
def reset_sigchld(self) -> None: ...
# this returns a class private to gevent.libuv.watcher.io, which satisifies the protocol
def io(self, fd: FileDescriptor, events: int, ref: bool = True, priority: int | None = None) -> _IoWatcher: ...
def closing_fd(self, fd: FileDescriptor) -> bool: ...
def timer(self, after: float, repeat: float = 0.0, ref: bool = True, priority: int | None = None) -> watcher.timer: ...
def signal(self, signum: int, ref: bool = True, priority: int | None = None) -> watcher.signal: ...
def idle(self, ref: bool = True, priority: int | None = None) -> watcher.idle: ...
def check(self, ref: bool = True, priority: int | None = None) -> watcher.check: ...
def async_(self, ref: bool = True, priority: int | None = None) -> watcher.async_: ...
if sys.platform != "win32":
def fork(self, ref: bool = True, priority: int | None = None) -> watcher.fork: ...
def child(self, pid: int, trace: int = 0, ref: bool = True) -> watcher.child: ...
# prepare is not supported on libuv yet, but we need type_error to annotate that

View File

@@ -0,0 +1,32 @@
from gevent._ffi import watcher as _base
from gevent._types import _IoWatcher
class watcher(_base.watcher):
@property
def ref(self) -> bool: ...
@ref.setter
def ref(self, value: bool) -> None: ...
class io(_base.IoMixin, watcher):
EVENT_MASK: int
@property
def events(self) -> int: ...
@events.setter
def events(self, value: int) -> None: ...
def multiplex(self, events: int) -> _IoWatcher: ...
class fork(_base.ForkMixin, watcher): ...
class child(_base.ChildMixin, watcher): ...
# for some reason pending on this has been overwritten with None, but we don't
# necessarily want to change our Protocol to reflect that, so for now we ignore it
class async_(_base.AsyncMixin, watcher): ...
class timer(_base.TimerMixin, watcher): ...
class stat(_base.StatMixin, watcher):
MIN_STAT_INTERVAL: float
class signal(_base.SignalMixin, watcher): ...
class idle(_base.IdleMixin, watcher): ...
class check(_base.CheckMixin, watcher): ...
class prepare(_base.PrepareMixin, watcher): ...

View File

@@ -0,0 +1,9 @@
from typing import Any
from typing_extensions import Self
class local:
def __init__(self, *args: object, **kwargs: object) -> None: ...
def __copy__(self) -> Self: ...
def __getattribute__(self, name: str) -> Any: ...
def __delattr__(self, name: str) -> None: ...
def __setattr__(self, name: str, value: Any) -> None: ...

View File

@@ -0,0 +1,41 @@
from collections.abc import Callable
from types import TracebackType
from typing import Any
from typing_extensions import Literal
from gevent._abstract_linkable import AbstractLinkable
from gevent.hub import Hub
__all__ = ["Semaphore", "BoundedSemaphore", "DummySemaphore", "RLock"]
class Semaphore(AbstractLinkable):
counter: int
def __init__(self, value: int = 1, hub: Hub | None = None) -> None: ...
def acquire(self, blocking: bool = True, timeout: float | None = None) -> bool: ...
def locked(self) -> bool: ...
def ready(self) -> bool: ...
def release(self) -> int: ...
def wait(self, timeout: float | None = None) -> int: ...
def __enter__(self) -> None: ...
def __exit__(self, t: type[BaseException] | None, v: BaseException | None, tb: TracebackType | None) -> None: ...
class BoundedSemaphore(Semaphore): ...
class DummySemaphore:
def __init__(self, value: int | None = None) -> None: ...
def locked(self) -> Literal[False]: ...
def ready(self) -> Literal[True]: ...
def release(self) -> None: ...
def rawlink(self, callback: Callable[[Any], object]) -> None: ...
def unlink(self, callback: Callable[[Any], object]) -> None: ...
def wait(self, timeout: float | None = None) -> Literal[1]: ...
def acquire(self, blocking: bool = True, timeout: float | None = None) -> Literal[True]: ...
def __enter__(self) -> None: ...
def __exit__(self, typ: type[BaseException] | None, val: BaseException | None, tb: TracebackType | None) -> None: ...
class RLock:
def __init__(self, hub: Hub | None = None) -> None: ...
def acquire(self, blocking: bool = True, timeout: float | None = None) -> bool: ...
def __enter__(self) -> bool: ...
def release(self) -> None: ...
def __exit__(self, typ: type[BaseException] | None, val: BaseException | None, tb: TracebackType | None) -> None: ...

View File

@@ -0,0 +1,41 @@
from types import ModuleType
from typing import Any
class MonkeyPatchWarning(RuntimeWarning): ...
def is_module_patched(mod_name: str) -> bool: ...
def is_object_patched(mod_name: str, item_name: str) -> bool: ...
def get_original(mod_name: str, item_name: str) -> Any: ...
def patch_module(target_module: ModuleType, source_module: ModuleType, items: list[str] | None = None) -> bool: ...
def patch_os() -> None: ...
def patch_queue() -> None: ...
def patch_time() -> None: ...
def patch_thread(
threading: bool = True, _threading_local: bool = True, Event: bool = True, logging: bool = True, existing_locks: bool = True
) -> None: ...
def patch_socket(dns: bool = True, aggressive: bool = True) -> None: ...
def patch_dns() -> None: ...
def patch_ssl() -> None: ...
def patch_select(aggressive: bool = True) -> None: ...
def patch_selectors(aggressive: bool = True) -> None: ...
def patch_subprocess() -> None: ...
def patch_signal() -> None: ...
def patch_all(
socket: bool = True,
dns: bool = True,
time: bool = True,
select: bool = True,
thread: bool = True,
os: bool = True,
ssl: bool = True,
subprocess: bool = True,
sys: bool = False,
aggressive: bool = True,
Event: bool = True,
builtins: bool = True, # does nothing on Python 3
signal: bool = True,
queue: bool = True,
contextvars: bool = True, # does nothing on Python 3.7+
**kwargs: object,
) -> bool | None: ...
def main() -> dict[str, Any]: ...

View File

@@ -0,0 +1,31 @@
import os
import sys
from _typeshed import FileDescriptor, Incomplete, ReadableBuffer
from collections.abc import Callable
from typing_extensions import Literal
def tp_read(fd: FileDescriptor, n: int) -> bytes: ...
def tp_write(fd: FileDescriptor, buf: ReadableBuffer) -> int: ...
if sys.platform != "win32":
def make_nonblocking(fd: FileDescriptor) -> Literal[True] | None: ...
def nb_read(fd: FileDescriptor, n: int) -> bytes: ...
def nb_write(fd: FileDescriptor, buf: ReadableBuffer) -> int: ...
fork = os.fork
forkpty = os.forkpty
def fork_gevent() -> int: ...
def forkpty_gevent() -> tuple[int, int]: ...
waitpid = os.waitpid
def fork_and_watch(
callback: Incomplete | None = None, loop: Incomplete | None = None, ref: bool = False, fork: Callable[[], int] = ...
) -> int: ...
def forkpty_and_watch(
callback: Incomplete | None = None,
loop: Incomplete | None = None,
ref: bool = False,
forkpty: Callable[[], tuple[int, int]] = ...,
) -> tuple[int, int]: ...
if sys.version_info >= (3, 8):
posix_spawn = os.posix_spawn
posix_spawnp = os.posix_spawnp

View File

@@ -0,0 +1,197 @@
from collections.abc import Callable, Collection, Iterable, Iterator
from typing import Any, TypeVar, overload
from typing_extensions import ParamSpec
from gevent._imap import IMap, IMapUnordered
from gevent.greenlet import Greenlet
from gevent.queue import Full as QueueFull
__all__ = ["Group", "Pool", "PoolFull"]
_T = TypeVar("_T")
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
_T3 = TypeVar("_T3")
_T4 = TypeVar("_T4")
_T5 = TypeVar("_T5")
_S = TypeVar("_S")
_P = ParamSpec("_P")
class GroupMappingMixin:
def spawn(self, func: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs) -> Greenlet[_P, _T]: ...
# we would like to use ParamSpec for these, but since args and kwds are passed in as is
# pyright will complain if we use _P.args/_P.kwargs, it appears to work on mypy though
# we can probably get away with Sequence and Mapping instead of tuple and dict, but for
# now we will be strict, just to be safe
def apply_cb(
self,
func: Callable[..., _T],
args: tuple[Any, ...] | None = None,
kwds: dict[str, Any] | None = None,
callback: Callable[[_T], object] | None = None,
) -> _T: ...
# The ParamSpec of the spawned greenlet can differ from the one being passed in, but the return type will match
def apply_async(
self,
func: Callable[..., _T],
args: tuple[Any, ...] | None = None,
kwds: dict[str, Any] | None = None,
callback: Callable[[_T], object] | None = None,
) -> Greenlet[..., _T]: ...
def apply(self, func: Callable[..., _T], args: tuple[Any, ...] | None = None, kwds: dict[str, Any] | None = None) -> _T: ...
def map(self, func: Callable[[_T], _S], iterable: Iterable[_T]) -> list[_S]: ...
def map_cb(
self, func: Callable[[_T], _S], iterable: Iterable[_T], callback: Callable[[list[_S]], object] | None = None
) -> list[_S]: ...
def map_async(
self, func: Callable[[_T], _S], iterable: Iterable[_T], callback: Callable[[list[_S]], object] | None = None
) -> Greenlet[..., list[_S]]: ...
@overload
def imap(self, func: Callable[[_T1], _S], __iter1: Iterable[_T1], *, maxsize: int | None = None) -> IMap[[_T1], _S]: ...
@overload
def imap(
self, func: Callable[[_T1, _T2], _S], __iter1: Iterable[_T1], __iter2: Iterable[_T2], *, maxsize: int | None = None
) -> IMap[[_T1, _T2], _S]: ...
@overload
def imap(
self,
func: Callable[[_T1, _T2, _T3], _S],
__iter1: Iterable[_T1],
__iter2: Iterable[_T2],
__iter3: Iterable[_T3],
*,
maxsize: int | None = None,
) -> IMap[[_T1, _T2, _T3], _S]: ...
@overload
def imap(
self,
func: Callable[[_T1, _T2, _T3, _T4], _S],
__iter1: Iterable[_T1],
__iter2: Iterable[_T2],
__iter3: Iterable[_T3],
__iter4: Iterable[_T4],
*,
maxsize: int | None = None,
) -> IMap[[_T1, _T2, _T3, _T4], _S]: ...
@overload
def imap(
self,
func: Callable[[_T1, _T2, _T3, _T4, _T5], _S],
__iter1: Iterable[_T1],
__iter2: Iterable[_T2],
__iter3: Iterable[_T3],
__iter4: Iterable[_T4],
__iter5: Iterable[_T5],
*,
maxsize: int | None = None,
) -> IMap[[_T1, _T2, _T3, _T4, _T5], _S]: ...
@overload
def imap(
self,
func: Callable[_P, _S],
__iter1: Iterable[Any],
__iter2: Iterable[Any],
__iter3: Iterable[Any],
__iter4: Iterable[Any],
__iter5: Iterable[Any],
__iter6: Iterable[Any],
*iterables: Iterable[Any],
maxsize: int | None = None,
) -> IMap[_P, _S]: ...
@overload
def imap_unordered(
self, func: Callable[[_T1], _S], __iter1: Iterable[_T1], *, maxsize: int | None = None
) -> IMapUnordered[[_T1], _S]: ...
@overload
def imap_unordered(
self, func: Callable[[_T1, _T2], _S], __iter1: Iterable[_T1], __iter2: Iterable[_T2], *, maxsize: int | None = None
) -> IMapUnordered[[_T1, _T2], _S]: ...
@overload
def imap_unordered(
self,
func: Callable[[_T1, _T2, _T3], _S],
__iter1: Iterable[_T1],
__iter2: Iterable[_T2],
__iter3: Iterable[_T3],
*,
maxsize: int | None = None,
) -> IMapUnordered[[_T1, _T2, _T3], _S]: ...
@overload
def imap_unordered(
self,
func: Callable[[_T1, _T2, _T3, _T4], _S],
__iter1: Iterable[_T1],
__iter2: Iterable[_T2],
__iter3: Iterable[_T3],
__iter4: Iterable[_T4],
*,
maxsize: int | None = None,
) -> IMapUnordered[[_T1, _T2, _T3, _T4], _S]: ...
@overload
def imap_unordered(
self,
func: Callable[[_T1, _T2, _T3, _T4, _T5], _S],
__iter1: Iterable[_T1],
__iter2: Iterable[_T2],
__iter3: Iterable[_T3],
__iter4: Iterable[_T4],
__iter5: Iterable[_T5],
*,
maxsize: int | None = None,
) -> IMapUnordered[[_T1, _T2, _T3, _T4, _T5], _S]: ...
@overload
def imap_unordered(
self,
func: Callable[_P, _S],
__iter1: Iterable[Any],
__iter2: Iterable[Any],
__iter3: Iterable[Any],
__iter4: Iterable[Any],
__iter5: Iterable[Any],
__iter6: Iterable[Any],
*iterables: Iterable[Any],
maxsize: int | None = None,
) -> IMapUnordered[_P, _S]: ...
# TODO: Consider making these generic in Greenlet. The drawback would be, that it
# wouldn't be possible to mix Greenlets with different return values/ParamSpecs
# unless you bind Grenlet[..., object], but in that case all the spawn/apply/map
# methods become less helpful, because the return types cannot be as specific...
# We would need higher-kinded TypeVars if we wanted to give up neither
class Group(GroupMappingMixin):
greenlet_class: type[Greenlet[..., Any]]
greenlets: set[Greenlet[..., Any]]
dying: set[Greenlet[..., Any]]
@overload
def __init__(self) -> None: ...
@overload
def __init__(self, __grenlets: Collection[Greenlet[..., object]]) -> None: ...
def __len__(self) -> int: ...
def __contains__(self, item: Greenlet[..., object]) -> bool: ...
def __iter__(self) -> Iterator[Greenlet[..., object]]: ...
def add(self, greenlet: Greenlet[..., object]) -> None: ...
def discard(self, greenlet: Greenlet[..., object]) -> None: ...
def start(self, greenlet: Greenlet[..., object]) -> None: ...
def join(self, timeout: float | None = None, raise_error: bool = False) -> bool: ...
def kill(
self, exception: type[BaseException] | BaseException = ..., block: bool = True, timeout: float | None = None
) -> None: ...
def killone(
self,
greenlet: Greenlet[..., object],
exception: type[BaseException] | BaseException = ...,
block: bool = True,
timeout: float | None = None,
) -> None: ...
def full(self) -> bool: ...
def wait_available(self, timeout: float | None = None) -> int | None: ...
class PoolFull(QueueFull): ...
class Pool(Group):
size: int | None
def __init__(self, size: int | None = None, greenlet_class: type[Greenlet[..., object]] | None = None) -> None: ...
def wait_available(self, timeout: float | None = None) -> int: ...
def free_count(self) -> int: ...
def start(self, greenlet: Greenlet[..., object], blocking: bool = True, timeout: float | None = None) -> None: ...
def add(self, greenlet: Greenlet[..., object], blocking: bool = True, timeout: float | None = None) -> None: ...

View File

@@ -0,0 +1,178 @@
from _typeshed import OptExcInfo, StrOrBytesPath, SupportsWrite
from _typeshed.wsgi import WSGIApplication, WSGIEnvironment
from collections.abc import Callable, Container, Iterable, Iterator
from http.client import HTTPMessage
from io import BufferedIOBase, BufferedReader
from logging import Logger
from types import TracebackType
from typing import Any, ClassVar, Protocol, TypeVar, overload
from typing_extensions import Literal, Self
from gevent.baseserver import _Spawner
from gevent.server import StreamServer
from gevent.socket import socket as _GeventSocket
from gevent.ssl import SSLContext
__all__ = ["WSGIServer", "WSGIHandler", "LoggingLogAdapter", "Environ", "SecureEnviron", "WSGISecureEnviron"]
_T = TypeVar("_T")
class _LogOutputStream(SupportsWrite[str], Protocol):
def writelines(self, __lines: Iterable[str]) -> None: ...
def flush(self) -> None: ...
class Input:
rfile: BufferedReader
content_length: int | None
socket: _GeventSocket | None
position: int
chunked_input: bool
chunk_length: int
def __init__(
self, rfile: BufferedReader, content_length: int | None, socket: _GeventSocket | None = None, chunked_input: bool = False
) -> None: ...
def read(self, length: int | None = None) -> bytes: ...
def readline(self, size: int | None = None) -> bytes: ...
def readlines(self, hint: object | None = None) -> list[bytes]: ...
def __iter__(self) -> Self: ...
def next(self) -> bytes: ...
__next__ = next
class OldMessage(HTTPMessage):
status: str
def __init__(self) -> None: ...
@overload
def getheader(self, name: str, default: None = None) -> str | None: ...
@overload
def getheader(self, name: str, default: _T) -> str | _T: ...
@property
def headers(self) -> Iterator[str]: ...
@property
def typeheader(self) -> str | None: ...
class WSGIHandler:
protocol_version: str
def MessageClass(self, fp: BufferedIOBase) -> OldMessage: ...
status: str | None
response_headers: list[tuple[str, str]] | None
code: int | None
provided_date: str | None
provided_content_length: str | None
close_connection: bool
time_start: float
time_finish: float
headers_sent: bool
response_use_chunked: bool
connection_upgraded: bool
environ: WSGIEnvironment | None
application: WSGIApplication | None
requestline: str | None
response_length: int
result: Iterable[bytes] | None
wsgi_input: Input | None
content_length: int
headers: OldMessage
request_version: str | None
command: str | None
path: str | None
socket: _GeventSocket
client_address: str
server: WSGIServer
rfile: BufferedReader
def __init__(self, sock: _GeventSocket, address: str, server: WSGIServer) -> None: ...
def handle(self) -> None: ...
def read_request(self, raw_requestline: str) -> OldMessage: ...
def log_error(self, msg: str, *args: object) -> None: ...
def read_requestline(self) -> str: ...
def handle_one_request(self) -> tuple[str, bytes] | Literal[True] | None: ...
def finalize_headers(self) -> None: ...
ApplicationError: type[AssertionError]
def write(self, data: bytes) -> None: ...
def start_response(
self, status: str, headers: list[tuple[str, str]], exc_info: OptExcInfo | None = None
) -> Callable[[bytes], None]: ...
def log_request(self) -> None: ...
def format_request(self) -> str: ...
def process_result(self) -> None: ...
def run_application(self) -> None: ...
ignored_socket_errors: tuple[int, ...]
def handle_one_response(self) -> None: ...
def handle_error(self, t: type[BaseException] | None, v: BaseException | None, tb: TracebackType | None) -> None: ...
def get_environ(self) -> WSGIEnvironment: ...
class LoggingLogAdapter:
def __init__(self, logger: Logger, level: int = 20) -> None: ...
def write(self, msg: str) -> None: ...
def flush(self) -> None: ...
def writelines(self, lines: Iterable[str]) -> None: ...
def __getattr__(self, name: str) -> Any: ...
def __setattr__(self, name: str, value: object) -> None: ...
def __delattr__(self, name: str) -> None: ...
class Environ(WSGIEnvironment): ...
class SecureEnviron(Environ):
default_secure_repr: ClassVar[bool]
default_whitelist_keys: ClassVar[Container[str]]
default_print_masked_keys: ClassVar[bool]
secure_repr: bool
whitelist_keys: Container[str]
print_masked_keys: bool
class WSGISecureEnviron(SecureEnviron): ...
class WSGIServer(StreamServer):
handler_class: type[WSGIHandler]
log: _LogOutputStream
error_log: _LogOutputStream
environ_class: type[WSGIEnvironment]
secure_environ_class: type[SecureEnviron]
base_env: WSGIEnvironment
application: WSGIApplication
@overload
def __init__(
self,
listener: _GeventSocket | tuple[str, int] | str,
application: WSGIApplication | None = None,
backlog: int | None = None,
spawn: _Spawner = "default",
log: str | Logger | _LogOutputStream | None = "default",
error_log: str | Logger | _LogOutputStream | None = "default",
handler_class: type[WSGIHandler] | None = None,
environ: WSGIEnvironment | None = None,
*,
ssl_context: SSLContext,
server_side: bool = True,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
) -> None: ...
@overload
def __init__(
self,
listener: _GeventSocket | tuple[str, int] | str,
application: WSGIApplication | None = None,
backlog: int | None = None,
spawn: _Spawner = "default",
log: str | Logger | _LogOutputStream | None = "default",
error_log: str | Logger | _LogOutputStream | None = "default",
handler_class: type[WSGIHandler] | None = None,
environ: WSGIEnvironment | None = None,
*,
keyfile: StrOrBytesPath = ...,
certfile: StrOrBytesPath = ...,
server_side: bool = True,
cert_reqs: int = ...,
ssl_version: int = ...,
ca_certs: str = ...,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
ciphers: str = ...,
) -> None: ...
environ: WSGIEnvironment
def set_environ(self, environ: WSGIEnvironment | None = None) -> None: ...
max_accept: int
def set_max_accept(self) -> None: ...
def get_environ(self) -> WSGIEnvironment: ...
def init_socket(self) -> None: ...
def update_environ(self) -> None: ...
def handle(self, sock: _GeventSocket, address: str) -> None: ...

View File

@@ -0,0 +1,87 @@
from collections import deque
from collections.abc import Iterable
# technically it is using _PySimpleQueue, which has the same interface as SimpleQueue
from queue import Empty as Empty, Full as Full, SimpleQueue as SimpleQueue
from typing import Any, Generic, TypeVar, overload
from typing_extensions import Literal, Self, final
from gevent._waiter import Waiter
from gevent.hub import Hub
__all__ = ["Queue", "PriorityQueue", "LifoQueue", "SimpleQueue", "JoinableQueue", "Channel", "Empty", "Full"]
_T = TypeVar("_T")
class Queue(Generic[_T]):
@property
def hub(self) -> Hub: ... # readonly in Cython
@property
def queue(self) -> deque[_T]: ... # readonly in Cython
maxsize: int | None
@overload
def __init__(self, maxsize: int | None = None) -> None: ...
@overload
def __init__(self, maxsize: int | None, items: Iterable[_T]) -> None: ...
@overload
def __init__(self, maxsize: int | None = None, *, items: Iterable[_T]) -> None: ...
def copy(self) -> Self: ...
def empty(self) -> bool: ...
def full(self) -> bool: ...
def get(self, block: bool = True, timeout: float | None = None) -> _T: ...
def get_nowait(self) -> _T: ...
def peek(self, block: bool = True, timeout: float | None = None) -> _T: ...
def peek_nowait(self) -> _T: ...
def put(self, item: _T, block: bool = True, timeout: float | None = None) -> None: ...
def put_nowait(self, item: _T) -> None: ...
def qsize(self) -> int: ...
def __bool__(self) -> bool: ...
def __iter__(self) -> Self: ...
def __len__(self) -> int: ...
def __next__(self) -> _T: ...
next = __next__
@final
class UnboundQueue(Queue[_T]):
@overload
def __init__(self, maxsize: None = None) -> None: ...
@overload
def __init__(self, maxsize: None, items: Iterable[_T]) -> None: ...
@overload
def __init__(self, maxsize: None = None, *, items: Iterable[_T]) -> None: ...
class PriorityQueue(Queue[_T]): ...
class LifoQueue(Queue[_T]): ...
class JoinableQueue(Queue[_T]):
@property
def unfinished_tasks(self) -> int: ... # readonly in Cython
@overload
def __init__(self, maxsize: int | None = None, *, unfinished_tasks: int | None = None) -> None: ...
@overload
def __init__(self, maxsize: int | None, items: Iterable[_T], unfinished_tasks: int | None = None) -> None: ...
@overload
def __init__(self, maxsize: int | None = None, *, items: Iterable[_T], unfinished_tasks: int | None = None) -> None: ...
def join(self, timeout: float | None = None) -> bool: ...
def task_done(self) -> None: ...
class Channel(Generic[_T]):
@property
def getters(self) -> deque[Waiter[Any]]: ... # readonly in Cython
@property
def putters(self) -> deque[tuple[_T, Waiter[Any]]]: ... # readonly in Cython
@property
def hub(self) -> Hub: ... # readonly in Cython
def __init__(self, maxsize: Literal[1] = 1) -> None: ...
@property
def balance(self) -> int: ...
def qsize(self) -> Literal[0]: ...
def empty(self) -> Literal[True]: ...
def full(self) -> Literal[True]: ...
def put(self, item: _T, block: bool = True, timeout: float | None = None) -> None: ...
def put_nowait(self, item: _T) -> None: ...
def get(self, block: bool = True, timeout: float | None = None) -> _T: ...
def get_nowait(self) -> _T: ...
def __iter__(self) -> Self: ...
def __next__(self) -> _T: ...
next = __next__

View File

@@ -0,0 +1,21 @@
from collections.abc import Callable
from typing import Any, TypeVar
from gevent._types import _AddrinfoResult, _NameinfoResult, _SockAddr
_F = TypeVar("_F", bound=Callable[..., Any])
class AbstractResolver:
HOSTNAME_ENCODING: str
EAI_NONAME_MSG: str
EAI_FAMILY_MSG: str
def close(self) -> None: ...
@staticmethod
def fixup_gaierror(func: _F) -> _F: ...
def gethostbyname(self, hostname: str, family: int = 2) -> str: ...
def gethostbyname_ex(self, hostname: str, family: int = 2) -> tuple[str, list[str], list[str]]: ...
def getaddrinfo(
self, host: str, port: int, family: int = 0, socktype: int = 0, proto: int = 0, flags: int = 0
) -> _AddrinfoResult: ...
def gethostbyaddr(self, ip_address: str) -> tuple[str, list[str], list[str]]: ...
def getnameinfo(self, sockaddr: _SockAddr, flags: int) -> _NameinfoResult: ...

View File

@@ -0,0 +1,41 @@
import sys
if sys.platform != "win32":
from collections.abc import Sequence
from typing_extensions import TypedDict
from gevent._types import _Watcher
from gevent.hub import Hub
from gevent.resolver import AbstractResolver
from gevent.resolver.cares import channel
class _ChannelArgs(TypedDict):
flags: str | int | None
timeout: str | float | None
tries: str | int | None
ndots: str | int | None
udp_port: str | int | None
tcp_port: str | int | None
servers: Sequence[str] | str | None
class Resolver(AbstractResolver):
cares_class: type[channel]
hub: Hub
cares: channel
pid: int
params: _ChannelArgs
fork_watcher: _Watcher
def __init__(
self,
hub: Hub | None = None,
use_environ: bool = True,
*,
flags: str | int | None = None,
timeout: str | float | None = None,
tries: str | int | None = None,
ndots: str | int | None = None,
udp_port: str | int | None = None,
tcp_port: str | int | None = None,
servers: Sequence[str] | str | None = None,
) -> None: ...
def __del__(self) -> None: ...

View File

@@ -0,0 +1,13 @@
from gevent._types import _AddrinfoResult, _NameinfoResult, _SockAddr
from gevent.hub import Hub
class Resolver:
def __init__(self, hub: Hub | None = None) -> None: ...
def close(self) -> None: ...
def gethostbyname(self, hostname: str, family: int = 2) -> str: ...
def gethostbyname_ex(self, hostname: str, family: int = 2) -> tuple[str, list[str], list[str]]: ...
def getaddrinfo(
self, host: str, port: int, family: int = 0, socktype: int = 0, proto: int = 0, flags: int = 0
) -> _AddrinfoResult: ...
def gethostbyaddr(self, ip_address: str) -> tuple[str, list[str], list[str]]: ...
def getnameinfo(self, sockaddr: _SockAddr, flags: int) -> _NameinfoResult: ...

View File

@@ -0,0 +1,53 @@
import sys
if sys.platform != "win32":
from collections.abc import Callable, Sequence
from typing import Any, Generic, TypeVar
from typing_extensions import Self
from gevent._types import _AddrinfoResult, _Loop, _NameinfoResult, _SockAddr
_T = TypeVar("_T")
class Result(Generic[_T]):
exception: BaseException | None
value: _T | None
def __init__(self, value: _T | None = None, exception: BaseException | None = None) -> None: ...
def get(self) -> Any | None: ...
def successful(self) -> bool: ...
class ares_host_result(tuple[str, list[str], list[str]]):
family: int
def __new__(cls, family: int, __hostname: str, __aliases: list[str], __addr_list: list[str]) -> Self: ...
class channel:
@property
def loop(self) -> _Loop: ...
def __init__(
self,
loop: _Loop,
flags: str | int | None = None,
timeout: str | float | None = None,
tries: str | int | None = None,
ndots: str | int | None = None,
udp_port: str | int | None = None,
tcp_port: str | int | None = None,
servers: Sequence[str] | str | None = None,
) -> None: ...
def destroy(self) -> None: ...
def getaddrinfo(
self,
callback: Callable[[Result[_AddrinfoResult]], object],
name: str,
service: str | None,
family: int = 0,
type: int = 0,
proto: int = 0,
flags: int = 0,
) -> None: ...
def gethostbyaddr(self, callback: Callable[[Result[ares_host_result]], object], addr: str) -> Any: ...
def gethostbyname(self, callback: Callable[[Result[ares_host_result]], object], name: str, family: int = ...) -> None: ...
def getnameinfo(self, callback: Callable[[Result[_NameinfoResult]], object], sockaddr: _SockAddr, flags: int) -> None: ...
def set_servers(self, servers: Sequence[str] | str | None = None) -> None: ...
__all__ = ["channel"]

View File

@@ -0,0 +1,9 @@
from typing import Any
from gevent.hub import Hub
from gevent.resolver import AbstractResolver
class Resolver(AbstractResolver):
def __init__(self, hub: Hub | None = ...) -> None: ...
@property
def resolver(self) -> Any: ... # this is a custom dnspython Resolver

View File

@@ -0,0 +1,15 @@
from gevent._types import _AddrinfoResult, _NameinfoResult, _SockAddr
from gevent.hub import Hub
from gevent.threadpool import ThreadPool
class Resolver:
pool: ThreadPool
def __init__(self, hub: Hub | None = None) -> None: ...
def close(self) -> None: ...
def gethostbyname(self, hostname: str, family: int = 2) -> str: ...
def gethostbyname_ex(self, hostname: str, family: int = 2) -> tuple[str, list[str], list[str]]: ...
def getaddrinfo(
self, host: str, port: int, family: int = 0, socktype: int = 0, proto: int = 0, flags: int = 0
) -> _AddrinfoResult: ...
def gethostbyaddr(self, ip_address: str) -> tuple[str, list[str], list[str]]: ...
def getnameinfo(self, sockaddr: _SockAddr, flags: int) -> _NameinfoResult: ...

View File

@@ -0,0 +1 @@
from gevent.resolver.ares import *

View File

@@ -0,0 +1 @@
from gevent.resolver.thread import *

View File

@@ -0,0 +1,15 @@
import sys
from collections.abc import Iterable
from select import error as error
from typing import Any
def select(
rlist: Iterable[Any], wlist: Iterable[Any], xlist: Iterable[Any], timeout: float | None = None
) -> tuple[list[Any], list[Any], list[Any]]: ...
if sys.platform != "win32":
from select import poll as poll
__all__ = ["error", "poll", "select"]
else:
__all__ = ["error", "select"]

View File

@@ -0,0 +1,27 @@
from _typeshed import FileDescriptorLike
from collections.abc import Mapping
from selectors import BaseSelector, SelectorKey
from typing import Any
from typing_extensions import TypeAlias
from gevent._util import Lazy
from gevent.hub import Hub
__all__ = ["DefaultSelector", "GeventSelector"]
_EventMask: TypeAlias = int
# technically this derives from _BaseSelectorImpl, which does not have type annotations
# but in terms of type checking the only difference is, that we need to add get_map since
# GeventSelector does not override it
class GeventSelector(BaseSelector):
def __init__(self, hub: Hub | None = None) -> None: ...
@Lazy
def hub(self) -> Hub: ...
def register(self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None) -> SelectorKey: ...
def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey: ...
def select(self, timeout: float | None = None) -> list[tuple[SelectorKey, _EventMask]]: ...
def close(self) -> None: ...
def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]: ...
DefaultSelector = GeventSelector

View File

@@ -0,0 +1,84 @@
from _socket import _Address as _StrictAddress
from _typeshed import ReadableBuffer, StrOrBytesPath
from collections.abc import Callable
from typing import Any, ClassVar, overload
from typing_extensions import TypeAlias, TypedDict
from gevent.baseserver import BaseServer, _Spawner
from gevent.socket import socket as _GeventSocket
from gevent.ssl import SSLContext, wrap_socket as ssl_wrap_socket
# For simplicity we treat _Address as Any, we could be more strict and use the definition
# from the stdlib _socket.pyi. But that would exclude some potentially valid handlers.
_Address: TypeAlias = Any
class _SSLArguments(TypedDict, total=False):
keyfile: StrOrBytesPath
certfile: StrOrBytesPath
server_side: bool
cert_reqs: int
ssl_version: int
ca_certs: str
suppress_ragged_eofs: bool
do_handshake_on_connect: bool
ciphers: str
class StreamServer(BaseServer[[_GeventSocket, _Address]]):
backlog: int
reuse_addr: ClassVar[int | None]
wrap_socket = ssl_wrap_socket
ssl_args: _SSLArguments | None
@overload
def __init__(
self,
listener: _GeventSocket | tuple[str, int] | str,
handle: Callable[[_GeventSocket, _Address], object] | None = None,
backlog: int | None = None,
spawn: _Spawner = "default",
*,
ssl_context: SSLContext,
server_side: bool = True,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
) -> None: ...
@overload
def __init__(
self,
listener: _GeventSocket | tuple[str, int] | str,
handle: Callable[[_GeventSocket, _Address], object] | None = None,
backlog: int | None = None,
spawn: _Spawner = "default",
*,
keyfile: StrOrBytesPath = ...,
certfile: StrOrBytesPath = ...,
server_side: bool = True,
cert_reqs: int = ...,
ssl_version: int = ...,
ca_certs: str = ...,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
ciphers: str = ...,
) -> None: ...
@property
def ssl_enabled(self) -> bool: ...
@classmethod
def get_listener(cls, address: _StrictAddress, backlog: int | None = None, family: int | None = None) -> _GeventSocket: ...
def do_read(self) -> tuple[_GeventSocket, _Address]: ...
def do_close(self, sock: _GeventSocket, address: _Address) -> None: ...
def wrap_socket_and_handle(self, client_socket: _GeventSocket, address: _StrictAddress) -> Any: ...
class DatagramServer(BaseServer[[_GeventSocket, _Address]]):
reuse_addr: ClassVar[int | None]
def __init__(
self,
listener: _GeventSocket | tuple[str, int] | str,
handle: Callable[[_GeventSocket, _Address], object] | None = None,
spawn: _Spawner = "default",
) -> None: ...
@classmethod
def get_listener(cls, address: _StrictAddress, family: int | None = None) -> _GeventSocket: ...
def do_read(self) -> tuple[_GeventSocket, _Address]: ...
@overload
def sendto(self, __data: ReadableBuffer, __address: _StrictAddress) -> int: ...
@overload
def sendto(self, __data: ReadableBuffer, __flags: int, __address: _StrictAddress) -> int: ...

View File

@@ -0,0 +1,10 @@
import sys
from signal import _HANDLER, _SIGNUM
# technically the implementations will always be around, but since they always
# throw an exception on windows, due to the missing SIGCHLD, we might as well
# pretent they don't exist, but what is different, is that the parameters are
# named even pre 3.10, so we don't just import the symbol from stdlib signal
if sys.platform != "win32":
def getsignal(signalnum: _SIGNUM) -> _HANDLER: ...
def signal(signalnum: _SIGNUM, handler: _HANDLER) -> _HANDLER: ...

View File

@@ -0,0 +1,23 @@
from socket import *
from gevent._hub_primitives import (
wait_on_watcher,
wait_read as wait_read,
wait_readwrite as wait_readwrite,
wait_write as wait_write,
)
from gevent._types import _Watcher
# This matches the stdlib socket module almost exactly, but contains a couple of extensions
# as a result we just pretend we import everything from socket, which is not entirely correct
# but it gets us most of the way there without having to write a really long list of imports
# with the same platform and version checks, just so we can properly distinguish this module's
# socket class from the native socket class (which could cause issues anyways, since functions
# that accept a socket should still accept the gevent implementation...)
# we can put in the work and do it properly once we have a use-case for it.
# the majority of the gevent implementation can be found in _socket3 and _socketcommon
# which also just imports a lot of symbols from the stdlib socket/_socket module
wait = wait_on_watcher
def cancel_wait(watcher: _Watcher, error: type[BaseException] | BaseException) -> None: ...

View File

@@ -0,0 +1,29 @@
import sys
from _typeshed import StrOrBytesPath
from ssl import *
import gevent.socket
# for simplicity we trust that gevent's implementation matches the stdlib version exactly
# for the most part they just copy all the symbols anyways and re-implment the few that
# need to work differently. The only potentially problematic symbol is SSLSocket, since
# it derives from gevent's socket, rather than the stdlib one. SSLContext derives from
# the stdlib SSLContext. Since we already punted on socket, we don't need to change
# anything here either, until we decide that we can't punt on socket.
if sys.version_info >= (3, 12):
# FIXME: wrap_socket has been removed in 3.12, gevent implements its own, so it
# will probably still be there in 3.12, but until we stub out gevent.ssl
# properly we will have to just pretend it still exists
def wrap_socket(
sock: gevent.socket.socket,
keyfile: StrOrBytesPath | None = None,
certfile: StrOrBytesPath | None = None,
server_side: bool = False,
cert_reqs: int = ...,
ssl_version: int = ...,
ca_certs: str | None = None,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
ciphers: str | None = None,
) -> SSLSocket: ...

View File

@@ -0,0 +1,4 @@
from subprocess import *
# this is another module we decide to just punt on and trust that gevent's implementation
# at the very least satisfies the stdlib interface.

View File

@@ -0,0 +1,55 @@
import concurrent.futures
from collections.abc import Callable
from typing import Any, Generic, TypeVar
from typing_extensions import ParamSpec, TypeAlias
from gevent._threading import Queue
from gevent._types import _AsyncWatcher, _Watcher
from gevent.event import AsyncResult, _OptExcInfo, _ValueSource
from gevent.greenlet import Greenlet
from gevent.hub import Hub
from gevent.pool import GroupMappingMixin
_T = TypeVar("_T")
_P = ParamSpec("_P")
_TaskItem: TypeAlias = tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any], ThreadResult[Any]]
_Receiver: TypeAlias = Callable[[_ValueSource[_T]], object]
class ThreadPool(GroupMappingMixin):
hub: Hub
pid: int
manager: Greenlet[..., Any] | None
task_queue: Queue[_TaskItem]
fork_watcher: _Watcher
def __init__(self, maxsize: int, hub: Hub | None = None, idle_task_timeout: int = -1) -> None: ...
@property
def maxsize(self) -> int: ...
@maxsize.setter
def maxsize(self, value: int) -> None: ...
@property
def size(self) -> int: ...
@size.setter
def size(self, value: int) -> None: ...
def __len__(self) -> int: ...
def join(self) -> None: ...
def kill(self) -> None: ...
def adjust(self) -> None: ...
def spawn(self, func: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs) -> AsyncResult[_T]: ... # type:ignore[override]
class ThreadResult(Generic[_T]):
receiver: _Receiver[_T]
hub: Hub
context: object | None
value: _T | None
exc_info: _OptExcInfo | tuple[()]
async_watcher: _AsyncWatcher
def __init__(self, receiver: _Receiver[_T], hub: Hub, call_when_ready: Callable[[], object]) -> None: ...
@property
def exception(self) -> BaseException | None: ...
def destroy_in_main_thread(self) -> None: ...
def set(self, value: _T) -> None: ...
def handle_error(self, context: object, exc_info: _OptExcInfo) -> None: ...
def successful(self) -> bool: ...
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
kill = concurrent.futures.ThreadPoolExecutor.shutdown

View File

@@ -0,0 +1,3 @@
from gevent.hub import sleep as sleep
__all__ = ["sleep"]

View File

@@ -0,0 +1,48 @@
from collections.abc import Callable
from types import TracebackType
from typing import Any, TypeVar, overload
from typing_extensions import Literal, ParamSpec, Self
from gevent._types import _TimerWatcher
_T = TypeVar("_T")
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
_TimeoutT = TypeVar("_TimeoutT", bound=Timeout)
_P = ParamSpec("_P")
class Timeout(BaseException):
seconds: float | None
exception: type[BaseException] | BaseException | None
timer: _TimerWatcher
def __init__(
self,
seconds: float | None = None,
exception: type[BaseException] | BaseException | None = None,
ref: bool = True,
priority: int = -1,
) -> None: ...
def start(self) -> None: ...
@overload
@classmethod
def start_new(
cls, timeout: None | float = None, exception: type[BaseException] | BaseException | None = None, ref: bool = True
) -> Self: ...
@overload
@classmethod
def start_new(cls, timeout: _TimeoutT) -> _TimeoutT: ...
@property
def pending(self) -> bool: ...
def cancel(self) -> None: ...
def close(self) -> None: ...
def __enter__(self) -> Self: ...
def __exit__(
self, typ: type[BaseException] | None, value: BaseException | None, tb: TracebackType | None
) -> Literal[True] | None: ...
# when timeout_value is provided we unfortunately get no type checking on *args, **kwargs, because
# ParamSpec does not allow mixing in additional keyword arguments
@overload
def with_timeout(seconds: float | None, function: Callable[..., _T1], *args: Any, timeout_value: _T2, **kwds: Any) -> _T1 | _T2: ... # type: ignore[misc]
@overload
def with_timeout(seconds: float | None, function: Callable[_P, _T], *args: _P.args, **kwds: _P.kwargs) -> _T: ...

View File

@@ -0,0 +1,49 @@
from _typeshed import SupportsWrite
from collections.abc import Callable
from types import TracebackType
from typing import Any, Generic, TypeVar
from typing_extensions import ParamSpec, Self
from gevent.hub import Hub
from greenlet import greenlet as greenlet_t
_T = TypeVar("_T")
_P = ParamSpec("_P")
class wrap_errors(Generic[_P, _T]):
def __init__(self, errors: tuple[type[BaseException], ...], func: Callable[_P, _T]) -> None: ...
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: ...
def __getattr__(self, name: str) -> Any: ...
def print_run_info(
thread_stacks: bool = True, greenlet_stacks: bool = True, limit: int | None = ..., file: SupportsWrite[str] | None = None
) -> None: ...
def format_run_info(
thread_stacks: bool = True, greenlet_stacks: bool = True, limit: int | None = ..., current_thread_ident: int | None = None
) -> None: ...
class GreenletTree:
greenlet: greenlet_t | None
is_current_tree: bool
child_trees: list[GreenletTree]
DEFAULT_DETAILS: dict[str, Any]
def __init__(self, greenlet: greenlet_t | None) -> None: ...
def add_child(self, tree: GreenletTree) -> None: ...
@property
def root(self) -> bool: ...
def __getattr__(self, name: str) -> Any: ...
def format_lines(self, details: bool | dict[str, Any] = True) -> str: ...
def format(self, details: bool | dict[str, Any] = True) -> str: ...
@classmethod
def forest(cls) -> list[GreenletTree]: ...
@classmethod
def current_tree(cls) -> GreenletTree: ...
class assert_switches:
hub: Hub | None
tracer: object | None
max_blocking_time: float | None
hub_only: bool
def __init__(self, max_blocking_time: float | None = None, hub_only: bool = False) -> None: ...
def __enter__(self) -> Self: ...
def __exit__(self, t: type[BaseException] | None, v: BaseException | None, tb: TracebackType | None) -> None: ...

View File

@@ -0,0 +1,3 @@
from collections.abc import Callable
formatError: Callable[[object], str]