Add docker-py stubs (#11749)

This commit is contained in:
kasium
2024-04-12 15:30:36 +02:00
committed by GitHub
parent a7e13002d4
commit bc8fc36739
69 changed files with 1935 additions and 0 deletions

View File

@@ -42,6 +42,7 @@
"stubs/commonmark",
"stubs/dateparser",
"stubs/defusedxml",
"stubs/docker",
"stubs/docutils",
"stubs/Flask-SocketIO",
"stubs/fpdf2",

View File

@@ -0,0 +1,7 @@
# additional requirements are needed, e.g. win32 apis
docker.transport.NpipeHTTPAdapter
docker.transport.NpipeSocket
docker.transport.SSHHTTPAdapter
docker.transport.npipeconn
docker.transport.npipesocket
docker.transport.sshconn

View File

@@ -0,0 +1,3 @@
version = "7.0.*"
upstream_repository = "https://github.com/docker/docker-py"
requires = ["types-requests", "urllib3>=2"]

View File

@@ -0,0 +1,7 @@
from .api import APIClient as APIClient
from .client import DockerClient as DockerClient, from_env as from_env
from .context import Context as Context, ContextAPI as ContextAPI
from .tls import TLSConfig as TLSConfig
from .version import __version__ as __version__
__title__: str

View File

@@ -0,0 +1 @@
from .client import APIClient as APIClient

View File

@@ -0,0 +1,39 @@
from _typeshed import Incomplete
log: Incomplete
class BuildApiMixin:
def build(
self,
path: Incomplete | None = None,
tag: str | None = None,
quiet: bool = False,
fileobj: Incomplete | None = None,
nocache: bool = False,
rm: bool = False,
timeout: Incomplete | None = None,
custom_context: bool = False,
encoding: Incomplete | None = None,
pull: bool = False,
forcerm: bool = False,
dockerfile: Incomplete | None = None,
container_limits: Incomplete | None = None,
decode: bool = False,
buildargs: Incomplete | None = None,
gzip: bool = False,
shmsize: Incomplete | None = None,
labels: Incomplete | None = None,
cache_from: Incomplete | None = None,
target: Incomplete | None = None,
network_mode: Incomplete | None = None,
squash: Incomplete | None = None,
extra_hosts: Incomplete | None = None,
platform: Incomplete | None = None,
isolation: Incomplete | None = None,
use_config_proxy: bool = True,
): ...
def prune_builds(
self, filters: Incomplete | None = None, keep_storage: Incomplete | None = None, all: Incomplete | None = None
): ...
def process_dockerfile(dockerfile, path): ...

View File

@@ -0,0 +1,55 @@
from _typeshed import Incomplete
from collections.abc import Mapping, Sequence
import requests
from docker.tls import TLSConfig
from requests.adapters import BaseAdapter
from .build import BuildApiMixin
from .config import ConfigApiMixin
from .container import ContainerApiMixin
from .daemon import DaemonApiMixin
from .exec_api import ExecApiMixin
from .image import ImageApiMixin
from .network import NetworkApiMixin
from .plugin import PluginApiMixin
from .secret import SecretApiMixin
from .service import ServiceApiMixin
from .swarm import SwarmApiMixin
from .volume import VolumeApiMixin
class APIClient(
requests.Session,
BuildApiMixin,
ConfigApiMixin,
ContainerApiMixin,
DaemonApiMixin,
ExecApiMixin,
ImageApiMixin,
NetworkApiMixin,
PluginApiMixin,
SecretApiMixin,
ServiceApiMixin,
SwarmApiMixin,
VolumeApiMixin,
):
__attrs__: Sequence[str]
base_url: str
timeout: int
credstore_env: Mapping[Incomplete, Incomplete] | None
def __init__(
self,
base_url: str | None = None,
version: str | None = None,
timeout: int = 60,
tls: bool | TLSConfig = False,
user_agent: str = "docker-sdk-python/7.0.0",
num_pools: int | None = None,
credstore_env: Mapping[Incomplete, Incomplete] | None = None,
use_ssh_client: bool = False,
max_pool_size: int = 10,
) -> None: ...
def get_adapter(self, url: str) -> BaseAdapter: ...
@property
def api_version(self) -> str: ...
def reload_config(self, dockercfg_path: str | None = None) -> None: ...

View File

@@ -0,0 +1,7 @@
from _typeshed import Incomplete
class ConfigApiMixin:
def create_config(self, name, data, labels: Incomplete | None = None, templating: Incomplete | None = None): ...
def inspect_config(self, id): ...
def remove_config(self, id): ...
def configs(self, filters: Incomplete | None = None): ...

View File

@@ -0,0 +1,109 @@
from _typeshed import Incomplete
class ContainerApiMixin:
def attach(
self, container, stdout: bool = True, stderr: bool = True, stream: bool = False, logs: bool = False, demux: bool = False
): ...
def attach_socket(self, container, params: Incomplete | None = None, ws: bool = False): ...
def commit(
self,
container,
repository: str | None = None,
tag: str | None = None,
message: Incomplete | None = None,
author: Incomplete | None = None,
pause: bool = True,
changes: Incomplete | None = None,
conf: Incomplete | None = None,
): ...
def containers(
self,
quiet: bool = False,
all: bool = False,
trunc: bool = False,
latest: bool = False,
since: Incomplete | None = None,
before: Incomplete | None = None,
limit: int = -1,
size: bool = False,
filters: Incomplete | None = None,
): ...
def create_container(
self,
image,
command: Incomplete | None = None,
hostname: Incomplete | None = None,
user: Incomplete | None = None,
detach: bool = False,
stdin_open: bool = False,
tty: bool = False,
ports: Incomplete | None = None,
environment: Incomplete | None = None,
volumes: Incomplete | None = None,
network_disabled: bool = False,
name: Incomplete | None = None,
entrypoint: Incomplete | None = None,
working_dir: Incomplete | None = None,
domainname: Incomplete | None = None,
host_config: Incomplete | None = None,
mac_address: Incomplete | None = None,
labels: Incomplete | None = None,
stop_signal: Incomplete | None = None,
networking_config: Incomplete | None = None,
healthcheck: Incomplete | None = None,
stop_timeout: Incomplete | None = None,
runtime: Incomplete | None = None,
use_config_proxy: bool = True,
platform: Incomplete | None = None,
): ...
def create_container_config(self, *args, **kwargs): ...
def create_container_from_config(self, config, name: Incomplete | None = None, platform: Incomplete | None = None): ...
def create_host_config(self, *args, **kwargs): ...
def create_networking_config(self, *args, **kwargs): ...
def create_endpoint_config(self, *args, **kwargs): ...
def diff(self, container): ...
def export(self, container, chunk_size=2097152): ...
def get_archive(self, container, path, chunk_size=2097152, encode_stream: bool = False): ...
def inspect_container(self, container): ...
def kill(self, container, signal: Incomplete | None = None) -> None: ...
def logs(
self,
container,
stdout: bool = True,
stderr: bool = True,
stream: bool = False,
timestamps: bool = False,
tail: str = "all",
since: Incomplete | None = None,
follow: Incomplete | None = None,
until: Incomplete | None = None,
): ...
def pause(self, container) -> None: ...
def port(self, container, private_port): ...
def put_archive(self, container, path, data): ...
def prune_containers(self, filters: Incomplete | None = None): ...
def remove_container(self, container, v: bool = False, link: bool = False, force: bool = False) -> None: ...
def rename(self, container, name) -> None: ...
def resize(self, container, height, width) -> None: ...
def restart(self, container, timeout: int = 10) -> None: ...
def start(self, container, *args, **kwargs) -> None: ...
def stats(self, container, decode: Incomplete | None = None, stream: bool = True, one_shot: Incomplete | None = None): ...
def stop(self, container, timeout: Incomplete | None = None) -> None: ...
def top(self, container, ps_args: Incomplete | None = None): ...
def unpause(self, container) -> None: ...
def update_container(
self,
container,
blkio_weight: Incomplete | None = None,
cpu_period: Incomplete | None = None,
cpu_quota: Incomplete | None = None,
cpu_shares: Incomplete | None = None,
cpuset_cpus: Incomplete | None = None,
cpuset_mems: Incomplete | None = None,
mem_limit: Incomplete | None = None,
mem_reservation: Incomplete | None = None,
memswap_limit: Incomplete | None = None,
kernel_memory: Incomplete | None = None,
restart_policy: Incomplete | None = None,
): ...
def wait(self, container, timeout: Incomplete | None = None, condition: Incomplete | None = None): ...

View File

@@ -0,0 +1,23 @@
from _typeshed import Incomplete
class DaemonApiMixin:
def df(self): ...
def events(
self,
since: Incomplete | None = None,
until: Incomplete | None = None,
filters: Incomplete | None = None,
decode: Incomplete | None = None,
): ...
def info(self): ...
def login(
self,
username,
password: Incomplete | None = None,
email: Incomplete | None = None,
registry: Incomplete | None = None,
reauth: bool = False,
dockercfg_path: Incomplete | None = None,
): ...
def ping(self): ...
def version(self, api_version: bool = True): ...

View File

@@ -0,0 +1,22 @@
from _typeshed import Incomplete
class ExecApiMixin:
def exec_create(
self,
container,
cmd,
stdout: bool = True,
stderr: bool = True,
stdin: bool = False,
tty: bool = False,
privileged: bool = False,
user: str = "",
environment: Incomplete | None = None,
workdir: Incomplete | None = None,
detach_keys: Incomplete | None = None,
): ...
def exec_inspect(self, exec_id): ...
def exec_resize(self, exec_id, height: Incomplete | None = None, width: Incomplete | None = None) -> None: ...
def exec_start(
self, exec_id, detach: bool = False, tty: bool = False, stream: bool = False, socket: bool = False, demux: bool = False
): ...

View File

@@ -0,0 +1,59 @@
from _typeshed import Incomplete
log: Incomplete
class ImageApiMixin:
def get_image(self, image: str, chunk_size: int = 2097152): ...
def history(self, image): ...
def images(self, name: str | None = None, quiet: bool = False, all: bool = False, filters: Incomplete | None = None): ...
def import_image(
self,
src: Incomplete | None = None,
repository: str | None = None,
tag: str | None = None,
image: str | None = None,
changes: Incomplete | None = None,
stream_src: bool = False,
): ...
def import_image_from_data(
self, data, repository: str | None = None, tag: str | None = None, changes: Incomplete | None = None
): ...
def import_image_from_file(
self, filename: str, repository: str | None = None, tag: str | None = None, changes: Incomplete | None = None
): ...
def import_image_from_stream(
self, stream, repository: str | None = None, tag: str | None = None, changes: Incomplete | None = None
): ...
def import_image_from_url(
self, url, repository: str | None = None, tag: str | None = None, changes: Incomplete | None = None
): ...
def import_image_from_image(
self, image, repository: str | None = None, tag: str | None = None, changes: Incomplete | None = None
): ...
def inspect_image(self, image): ...
def inspect_distribution(self, image, auth_config: Incomplete | None = None): ...
def load_image(self, data, quiet: Incomplete | None = None): ...
def prune_images(self, filters: Incomplete | None = None): ...
def pull(
self,
repository: str,
tag: str | None = None,
stream: bool = False,
auth_config: Incomplete | None = None,
decode: bool = False,
platform: Incomplete | None = None,
all_tags: bool = False,
): ...
def push(
self,
repository: str,
tag: str | None = None,
stream: bool = False,
auth_config: Incomplete | None = None,
decode: bool = False,
): ...
def remove_image(self, image: str, force: bool = False, noprune: bool = False): ...
def search(self, term: str, limit: int | None = None): ...
def tag(self, image, repository, tag: str | None = None, force: bool = False): ...
def is_file(src: str) -> bool: ...

View File

@@ -0,0 +1,34 @@
from _typeshed import Incomplete
class NetworkApiMixin:
def networks(self, names: Incomplete | None = None, ids: Incomplete | None = None, filters: Incomplete | None = None): ...
def create_network(
self,
name,
driver: Incomplete | None = None,
options: Incomplete | None = None,
ipam: Incomplete | None = None,
check_duplicate: Incomplete | None = None,
internal: bool = False,
labels: Incomplete | None = None,
enable_ipv6: bool = False,
attachable: Incomplete | None = None,
scope: Incomplete | None = None,
ingress: Incomplete | None = None,
): ...
def prune_networks(self, filters: Incomplete | None = None): ...
def remove_network(self, net_id) -> None: ...
def inspect_network(self, net_id, verbose: Incomplete | None = None, scope: Incomplete | None = None): ...
def connect_container_to_network(
self,
container,
net_id,
ipv4_address: Incomplete | None = None,
ipv6_address: Incomplete | None = None,
aliases: Incomplete | None = None,
links: Incomplete | None = None,
link_local_ips: Incomplete | None = None,
driver_opt: Incomplete | None = None,
mac_address: Incomplete | None = None,
) -> None: ...
def disconnect_container_from_network(self, container, net_id, force: bool = False) -> None: ...

View File

@@ -0,0 +1,14 @@
from _typeshed import Incomplete
class PluginApiMixin:
def configure_plugin(self, name, options): ...
def create_plugin(self, name, plugin_data_dir, gzip: bool = False): ...
def disable_plugin(self, name, force: bool = False): ...
def enable_plugin(self, name, timeout: int = 0): ...
def inspect_plugin(self, name): ...
def pull_plugin(self, remote, privileges, name: Incomplete | None = None): ...
def plugins(self): ...
def plugin_privileges(self, name): ...
def push_plugin(self, name): ...
def remove_plugin(self, name, force: bool = False): ...
def upgrade_plugin(self, name, remote, privileges): ...

View File

@@ -0,0 +1,7 @@
from _typeshed import Incomplete
class SecretApiMixin:
def create_secret(self, name, data, labels: Incomplete | None = None, driver: Incomplete | None = None): ...
def inspect_secret(self, id): ...
def remove_secret(self, id): ...
def secrets(self, filters: Incomplete | None = None): ...

View File

@@ -0,0 +1,47 @@
from _typeshed import Incomplete
class ServiceApiMixin:
def create_service(
self,
task_template,
name: Incomplete | None = None,
labels: Incomplete | None = None,
mode: Incomplete | None = None,
update_config: Incomplete | None = None,
networks: Incomplete | None = None,
endpoint_config: Incomplete | None = None,
endpoint_spec: Incomplete | None = None,
rollback_config: Incomplete | None = None,
): ...
def inspect_service(self, service, insert_defaults: Incomplete | None = None): ...
def inspect_task(self, task): ...
def remove_service(self, service): ...
def services(self, filters: Incomplete | None = None, status: Incomplete | None = None): ...
def service_logs(
self,
service,
details: bool = False,
follow: bool = False,
stdout: bool = False,
stderr: bool = False,
since: int = 0,
timestamps: bool = False,
tail: str = "all",
is_tty: Incomplete | None = None,
): ...
def tasks(self, filters: Incomplete | None = None): ...
def update_service(
self,
service,
version,
task_template: Incomplete | None = None,
name: Incomplete | None = None,
labels: Incomplete | None = None,
mode: Incomplete | None = None,
update_config: Incomplete | None = None,
networks: Incomplete | None = None,
endpoint_config: Incomplete | None = None,
endpoint_spec: Incomplete | None = None,
fetch_current_spec: bool = False,
rollback_config: Incomplete | None = None,
): ...

View File

@@ -0,0 +1,41 @@
from _typeshed import Incomplete
log: Incomplete
class SwarmApiMixin:
def create_swarm_spec(self, *args, **kwargs): ...
def get_unlock_key(self): ...
def init_swarm(
self,
advertise_addr: Incomplete | None = None,
listen_addr: str = "0.0.0.0:2377",
force_new_cluster: bool = False,
swarm_spec: Incomplete | None = None,
default_addr_pool: Incomplete | None = None,
subnet_size: Incomplete | None = None,
data_path_addr: Incomplete | None = None,
data_path_port: Incomplete | None = None,
): ...
def inspect_swarm(self): ...
def inspect_node(self, node_id): ...
def join_swarm(
self,
remote_addrs,
join_token,
listen_addr: str = "0.0.0.0:2377",
advertise_addr: Incomplete | None = None,
data_path_addr: Incomplete | None = None,
): ...
def leave_swarm(self, force: bool = False): ...
def nodes(self, filters: Incomplete | None = None): ...
def remove_node(self, node_id, force: bool = False): ...
def unlock_swarm(self, key): ...
def update_node(self, node_id, version, node_spec: Incomplete | None = None): ...
def update_swarm(
self,
version,
swarm_spec: Incomplete | None = None,
rotate_worker_token: bool = False,
rotate_manager_token: bool = False,
rotate_manager_unlock_key: bool = False,
): ...

View File

@@ -0,0 +1,14 @@
from _typeshed import Incomplete
class VolumeApiMixin:
def volumes(self, filters: Incomplete | None = None): ...
def create_volume(
self,
name: Incomplete | None = None,
driver: Incomplete | None = None,
driver_opts: Incomplete | None = None,
labels: Incomplete | None = None,
): ...
def inspect_volume(self, name): ...
def prune_volumes(self, filters: Incomplete | None = None): ...
def remove_volume(self, name, force: bool = False) -> None: ...

View File

@@ -0,0 +1,40 @@
from _typeshed import Incomplete
INDEX_NAME: str
INDEX_URL: Incomplete
TOKEN_USERNAME: str
log: Incomplete
def resolve_repository_name(repo_name): ...
def resolve_index_name(index_name): ...
def get_config_header(client, registry): ...
def split_repo_name(repo_name): ...
def get_credential_store(authconfig, registry): ...
class AuthConfig(dict[str, Incomplete]):
def __init__(self, dct, credstore_env: Incomplete | None = None) -> None: ...
@classmethod
def parse_auth(cls, entries, raise_on_error: bool = False): ...
@classmethod
def load_config(cls, config_path, config_dict, credstore_env: Incomplete | None = None): ...
@property
def auths(self): ...
@property
def creds_store(self): ...
@property
def cred_helpers(self): ...
@property
def is_empty(self): ...
def resolve_authconfig(self, registry: Incomplete | None = None): ...
def get_credential_store(self, registry): ...
def get_all_credentials(self): ...
def add_auth(self, reg, data) -> None: ...
def resolve_authconfig(authconfig, registry: Incomplete | None = None, credstore_env: Incomplete | None = None): ...
def convert_to_hostname(url): ...
def decode_auth(auth): ...
def encode_header(auth): ...
def parse_auth(entries, raise_on_error: bool = False): ...
def load_config(
config_path: Incomplete | None = None, config_dict: Incomplete | None = None, credstore_env: Incomplete | None = None
): ...

View File

@@ -0,0 +1,49 @@
from typing import NoReturn
from docker import APIClient
from docker.models.configs import ConfigCollection
from docker.models.containers import ContainerCollection
from docker.models.images import ImageCollection
from docker.models.networks import NetworkCollection
from docker.models.nodes import NodeCollection
from docker.models.plugins import PluginCollection
from docker.models.secrets import SecretCollection
from docker.models.services import ServiceCollection
from docker.models.swarm import Swarm
from docker.models.volumes import VolumeCollection
class DockerClient:
api: APIClient
def __init__(self, *args, **kwargs) -> None: ...
@classmethod
def from_env(cls, **kwargs) -> DockerClient: ...
@property
def configs(self) -> ConfigCollection: ...
@property
def containers(self) -> ContainerCollection: ...
@property
def images(self) -> ImageCollection: ...
@property
def networks(self) -> NetworkCollection: ...
@property
def nodes(self) -> NodeCollection: ...
@property
def plugins(self) -> PluginCollection: ...
@property
def secrets(self) -> SecretCollection: ...
@property
def services(self) -> ServiceCollection: ...
@property
def swarm(self) -> Swarm: ...
@property
def volumes(self) -> VolumeCollection: ...
def events(self, *args, **kwargs): ...
def df(self): ...
def info(self, *args, **kwargs): ...
def login(self, *args, **kwargs): ...
def ping(self, *args, **kwargs): ...
def version(self, *args, **kwargs): ...
def close(self): ...
def __getattr__(self, name: str) -> NoReturn: ...
from_env = DockerClient.from_env

View File

@@ -0,0 +1,22 @@
from collections.abc import Mapping, Sequence
from typing import Final
DEFAULT_DOCKER_API_VERSION: Final[str]
MINIMUM_DOCKER_API_VERSION: Final[str]
DEFAULT_TIMEOUT_SECONDS: Final[int]
STREAM_HEADER_SIZE_BYTES: Final[int]
CONTAINER_LIMITS_KEYS: Final[Sequence[str]]
DEFAULT_HTTP_HOST: Final[str]
DEFAULT_UNIX_SOCKET: Final[str]
DEFAULT_NPIPE: Final[str]
BYTE_UNITS: Final[Mapping[str, int]]
INSECURE_REGISTRY_DEPRECATION_WARNING: Final[str]
IS_WINDOWS_PLATFORM: Final[bool]
WINDOWS_LONGPATH_PREFIX: Final[str]
DEFAULT_USER_AGENT: Final[str]
DEFAULT_NUM_POOLS: Final[int]
DEFAULT_NUM_POOLS_SSH: Final[int]
DEFAULT_MAX_POOL_SIZE: Final[int]
DEFAULT_DATA_CHUNK_SIZE: Final[int]
DEFAULT_SWARM_ADDR_POOL: Final[Sequence[str]]
DEFAULT_SWARM_SUBNET_SIZE: Final[int]

View File

@@ -0,0 +1,2 @@
from .api import ContextAPI as ContextAPI
from .context import Context as Context

View File

@@ -0,0 +1,30 @@
from _typeshed import Incomplete
from collections.abc import Mapping, Sequence
from docker.context.context import Context
from docker.tls import TLSConfig
class ContextAPI:
DEFAULT_CONTEXT: Context
@classmethod
def create_context(
cls,
name: str,
orchestrator: str | None = None,
host: str | None = None,
tls_cfg: TLSConfig | None = None,
default_namespace: str | None = None,
skip_tls_verify: bool = False,
) -> Context: ...
@classmethod
def get_context(cls, name: str | None = None) -> Context: ...
@classmethod
def contexts(cls) -> Sequence[Context]: ...
@classmethod
def get_current_context(cls) -> Context: ...
@classmethod
def set_current_context(cls, name: str = "default") -> None: ...
@classmethod
def remove_context(cls, name: str) -> None: ...
@classmethod
def inspect_context(cls, name: str = "default") -> Mapping[str, Incomplete]: ...

View File

@@ -0,0 +1,10 @@
METAFILE: str
def get_current_context_name() -> str: ...
def write_context_name_to_docker_config(name: str | None = None) -> Exception | None: ...
def get_context_id(name: str) -> str: ...
def get_context_dir() -> str: ...
def get_meta_dir(name: str | None = None) -> str: ...
def get_meta_file(name: str) -> str: ...
def get_tls_dir(name: str | None = None, endpoint: str = "") -> str: ...
def get_context_host(path: str | None = None, tls: bool = False) -> str: ...

View File

@@ -0,0 +1,47 @@
from _typeshed import Incomplete
class Context:
name: Incomplete
context_type: Incomplete
orchestrator: Incomplete
endpoints: Incomplete
tls_cfg: Incomplete
meta_path: str
tls_path: str
def __init__(
self,
name,
orchestrator: Incomplete | None = None,
host: Incomplete | None = None,
endpoints: Incomplete | None = None,
tls: bool = False,
) -> None: ...
def set_endpoint(
self,
name: str = "docker",
host: Incomplete | None = None,
tls_cfg: Incomplete | None = None,
skip_tls_verify: bool = False,
def_namespace: Incomplete | None = None,
) -> None: ...
def inspect(self): ...
@classmethod
def load_context(cls, name): ...
def save(self) -> None: ...
def remove(self) -> None: ...
def __call__(self): ...
def is_docker_host(self): ...
@property
def Name(self): ...
@property
def Host(self): ...
@property
def Orchestrator(self): ...
@property
def Metadata(self): ...
@property
def TLSConfig(self): ...
@property
def TLSMaterial(self): ...
@property
def Storage(self): ...

View File

@@ -0,0 +1,8 @@
from .constants import (
DEFAULT_LINUX_STORE as DEFAULT_LINUX_STORE,
DEFAULT_OSX_STORE as DEFAULT_OSX_STORE,
DEFAULT_WIN32_STORE as DEFAULT_WIN32_STORE,
PROGRAM_PREFIX as PROGRAM_PREFIX,
)
from .errors import CredentialsNotFound as CredentialsNotFound, StoreError as StoreError
from .store import Store as Store

View File

@@ -0,0 +1,4 @@
PROGRAM_PREFIX: str
DEFAULT_LINUX_STORE: str
DEFAULT_OSX_STORE: str
DEFAULT_WIN32_STORE: str

View File

@@ -0,0 +1,7 @@
from subprocess import CalledProcessError
class StoreError(RuntimeError): ...
class CredentialsNotFound(StoreError): ...
class InitializationError(StoreError): ...
def process_store_error(cpe: CalledProcessError, program: str) -> Exception: ...

View File

@@ -0,0 +1,11 @@
from _typeshed import Incomplete
class Store:
program: Incomplete
exe: Incomplete
environment: Incomplete
def __init__(self, program, environment: Incomplete | None = None) -> None: ...
def get(self, server): ...
def store(self, server, username, secret): ...
def erase(self, server) -> None: ...
def list(self): ...

View File

@@ -0,0 +1 @@
def create_environment_dict(overrides): ...

View File

@@ -0,0 +1,71 @@
from _typeshed import Incomplete
from collections.abc import Mapping
from typing import NoReturn
from docker.models.containers import Container
from requests import HTTPError, Response
class DockerException(Exception): ...
def create_api_error_from_http_exception(e: HTTPError) -> NoReturn: ...
class APIError(HTTPError, DockerException):
response: Response | None
explanation: str | None
def __init__(self, message: str, response: Response | None = None, explanation: str | None = None) -> None: ...
@property
def status_code(self) -> int | None: ...
def is_error(self) -> bool: ...
def is_client_error(self) -> bool: ...
def is_server_error(self) -> bool: ...
class NotFound(APIError): ...
class ImageNotFound(NotFound): ...
class InvalidVersion(DockerException): ...
class InvalidRepository(DockerException): ...
class InvalidConfigFile(DockerException): ...
class InvalidArgument(DockerException): ...
class DeprecatedMethod(DockerException): ...
class TLSParameterError(DockerException):
msg: str
def __init__(self, msg: str) -> None: ...
class NullResource(DockerException, ValueError): ...
class ContainerError(DockerException):
container: Container
exit_status: Incomplete
command: Incomplete
image: Incomplete
stderr: str | None
def __init__(self, container: Container, exit_status, command, image, stderr: str | None) -> None: ...
class StreamParseError(RuntimeError):
msg: str
def __init__(self, reason: str) -> None: ...
class BuildError(DockerException):
msg: str
build_log: str
def __init__(self, reason: str, build_log: str) -> None: ...
class ImageLoadError(DockerException): ...
def create_unexpected_kwargs_error(name, kwargs: Mapping[str, Incomplete]) -> NoReturn: ...
class MissingContextParameter(DockerException):
param: str
def __init__(self, param: str) -> None: ...
class ContextAlreadyExists(DockerException):
name: str
def __init__(self, name: str) -> None: ...
class ContextException(DockerException):
msg: str
def __init__(self, msg: str) -> None: ...
class ContextNotFound(DockerException):
name: str
def __init__(self, name: str) -> None: ...

View File

View File

@@ -0,0 +1,13 @@
from .resource import Collection, Model
class Config(Model):
id_attribute: str
@property
def name(self): ...
def remove(self): ...
class ConfigCollection(Collection):
model: type[Config]
def create(self, **kwargs): ...
def get(self, config_id): ...
def list(self, **kwargs): ...

View File

@@ -0,0 +1,81 @@
from _typeshed import Incomplete
from typing import NamedTuple
from .resource import Collection, Model
class Container(Model):
@property
def name(self): ...
@property
def image(self): ...
@property
def labels(self): ...
@property
def status(self): ...
@property
def health(self): ...
@property
def ports(self): ...
def attach(self, **kwargs): ...
def attach_socket(self, **kwargs): ...
def commit(self, repository: str | None = None, tag: str | None = None, **kwargs): ...
def diff(self): ...
def exec_run(
self,
cmd,
stdout: bool = True,
stderr: bool = True,
stdin: bool = False,
tty: bool = False,
privileged: bool = False,
user: str = "",
detach: bool = False,
stream: bool = False,
socket: bool = False,
environment: Incomplete | None = None,
workdir: Incomplete | None = None,
demux: bool = False,
): ...
def export(self, chunk_size=2097152): ...
def get_archive(self, path, chunk_size=2097152, encode_stream: bool = False): ...
def kill(self, signal: Incomplete | None = None): ...
def logs(self, **kwargs): ...
def pause(self): ...
def put_archive(self, path, data): ...
def remove(self, **kwargs): ...
def rename(self, name): ...
def resize(self, height, width): ...
def restart(self, **kwargs): ...
def start(self, **kwargs): ...
def stats(self, **kwargs): ...
def stop(self, **kwargs): ...
def top(self, **kwargs): ...
def unpause(self): ...
def update(self, **kwargs): ...
def wait(self, **kwargs): ...
class ContainerCollection(Collection):
model: type[Container]
def run(
self, image, command: Incomplete | None = None, stdout: bool = True, stderr: bool = False, remove: bool = False, **kwargs
): ...
def create(self, image, command: Incomplete | None = None, **kwargs): ... # type:ignore[override]
def get(self, container_id): ...
def list(
self,
all: bool = False,
before: Incomplete | None = None,
filters: Incomplete | None = None,
limit: int = -1,
since: Incomplete | None = None,
sparse: bool = False,
ignore_removed: bool = False,
): ...
def prune(self, filters: Incomplete | None = None): ...
RUN_CREATE_KWARGS: Incomplete
RUN_HOST_CONFIG_KWARGS: Incomplete
class ExecResult(NamedTuple):
exit_code: Incomplete
output: Incomplete

View File

@@ -0,0 +1,43 @@
from _typeshed import Incomplete
from .resource import Collection, Model
class Image(Model):
@property
def labels(self): ...
@property
def short_id(self): ...
@property
def tags(self): ...
def history(self): ...
def remove(self, force: bool = False, noprune: bool = False): ...
def save(self, chunk_size=2097152, named: bool = False): ...
def tag(self, repository, tag: str | None = None, **kwargs): ...
class RegistryData(Model):
image_name: Incomplete
def __init__(self, image_name, *args, **kwargs) -> None: ...
@property
def id(self): ...
@property
def short_id(self): ...
def pull(self, platform: Incomplete | None = None): ...
def has_platform(self, platform): ...
attrs: Incomplete
def reload(self) -> None: ...
class ImageCollection(Collection):
model: type[Image]
def build(self, **kwargs): ...
def get(self, name): ...
def get_registry_data(self, name, auth_config: Incomplete | None = None): ...
def list(self, name: Incomplete | None = None, all: bool = False, filters: Incomplete | None = None): ...
def load(self, data): ...
def pull(self, repository, tag: str | None = None, all_tags: bool = False, **kwargs): ...
def push(self, repository, tag: str | None = None, **kwargs): ...
def remove(self, *args, **kwargs) -> None: ...
def search(self, *args, **kwargs): ...
def prune(self, filters: Incomplete | None = None): ...
def prune_builds(self, *args, **kwargs): ...
def normalize_platform(platform, engine_info): ...

View File

@@ -0,0 +1,19 @@
from _typeshed import Incomplete
from .resource import Collection, Model
class Network(Model):
@property
def name(self): ...
@property
def containers(self): ...
def connect(self, container, *args, **kwargs): ...
def disconnect(self, container, *args, **kwargs): ...
def remove(self): ...
class NetworkCollection(Collection):
model: type[Network]
def create(self, name, *args, **kwargs): ...
def get(self, network_id, *args, **kwargs): ...
def list(self, *args, **kwargs): ...
def prune(self, filters: Incomplete | None = None): ...

View File

@@ -0,0 +1,13 @@
from .resource import Collection, Model
class Node(Model):
id_attribute: str
@property
def version(self): ...
def update(self, node_spec): ...
def remove(self, force: bool = False): ...
class NodeCollection(Collection):
model: type[Node]
def get(self, node_id): ...
def list(self, *args, **kwargs): ...

View File

@@ -0,0 +1,25 @@
from _typeshed import Incomplete
from collections.abc import Generator
from .resource import Collection, Model
class Plugin(Model):
@property
def name(self): ...
@property
def enabled(self): ...
@property
def settings(self): ...
def configure(self, options) -> None: ...
def disable(self, force: bool = False) -> None: ...
def enable(self, timeout: int = 0) -> None: ...
def push(self): ...
def remove(self, force: bool = False): ...
def upgrade(self, remote: Incomplete | None = None) -> Generator[Incomplete, Incomplete, None]: ...
class PluginCollection(Collection):
model: type[Plugin]
def create(self, name, plugin_data_dir, gzip: bool = False): ... # type:ignore[override]
def get(self, name): ...
def install(self, remote_name, local_name: Incomplete | None = None): ...
def list(self): ...

View File

@@ -0,0 +1,27 @@
from _typeshed import Incomplete
class Model:
id_attribute: str
client: Incomplete
collection: Incomplete
attrs: Incomplete
def __init__(
self, attrs: Incomplete | None = None, client: Incomplete | None = None, collection: Incomplete | None = None
) -> None: ...
def __eq__(self, other): ...
def __hash__(self): ...
@property
def id(self): ...
@property
def short_id(self): ...
def reload(self) -> None: ...
class Collection:
model: Incomplete
client: Incomplete
def __init__(self, client: Incomplete | None = None) -> None: ...
def __call__(self, *args, **kwargs) -> None: ...
def list(self) -> None: ...
def get(self, key) -> None: ...
def create(self, attrs: Incomplete | None = None) -> None: ...
def prepare_model(self, attrs): ...

View File

@@ -0,0 +1,13 @@
from .resource import Collection, Model
class Secret(Model):
id_attribute: str
@property
def name(self): ...
def remove(self): ...
class SecretCollection(Collection):
model: type[Secret]
def create(self, **kwargs): ...
def get(self, secret_id): ...
def list(self, **kwargs): ...

View File

@@ -0,0 +1,27 @@
from _typeshed import Incomplete
from .resource import Collection, Model
class Service(Model):
id_attribute: str
@property
def name(self): ...
@property
def version(self): ...
def remove(self): ...
def tasks(self, filters: Incomplete | None = None): ...
def update(self, **kwargs): ...
def logs(self, **kwargs): ...
def scale(self, replicas): ...
def force_update(self): ...
class ServiceCollection(Collection):
model: type[Service]
def create(self, image, command: Incomplete | None = None, **kwargs): ... # type:ignore[override]
def get(self, service_id, insert_defaults: Incomplete | None = None): ...
def list(self, **kwargs): ...
CONTAINER_SPEC_KWARGS: Incomplete
TASK_TEMPLATE_KWARGS: Incomplete
CREATE_SERVICE_KWARGS: Incomplete
PLACEMENT_KWARGS: Incomplete

View File

@@ -0,0 +1,33 @@
from _typeshed import Incomplete
from .resource import Model
class Swarm(Model):
id_attribute: str
def __init__(self, *args, **kwargs) -> None: ...
@property
def version(self): ...
def get_unlock_key(self): ...
def init(
self,
advertise_addr: Incomplete | None = None,
listen_addr: str = "0.0.0.0:2377",
force_new_cluster: bool = False,
default_addr_pool: Incomplete | None = None,
subnet_size: Incomplete | None = None,
data_path_addr: Incomplete | None = None,
data_path_port: Incomplete | None = None,
**kwargs,
): ...
def join(self, *args, **kwargs): ...
def leave(self, *args, **kwargs): ...
attrs: Incomplete
def reload(self) -> None: ...
def unlock(self, key): ...
def update(
self,
rotate_worker_token: bool = False,
rotate_manager_token: bool = False,
rotate_manager_unlock_key: bool = False,
**kwargs,
): ...

View File

@@ -0,0 +1,16 @@
from _typeshed import Incomplete
from .resource import Collection, Model
class Volume(Model):
id_attribute: str
@property
def name(self): ...
def remove(self, force: bool = False): ...
class VolumeCollection(Collection):
model: type[Volume]
def create(self, name: Incomplete | None = None, **kwargs): ... # type:ignore[override]
def get(self, volume_id): ...
def list(self, **kwargs): ...
def prune(self, filters: Incomplete | None = None): ...

View File

@@ -0,0 +1,10 @@
from docker import APIClient
class TLSConfig:
cert: tuple[str, str] | None
ca_cert: str | None
verify: bool | str | None
def __init__(
self, client_cert: tuple[str, str] | None = None, ca_cert: str | None = None, verify: bool | str | None = None
) -> None: ...
def configure_client(self, client: APIClient) -> None: ...

View File

@@ -0,0 +1,4 @@
from .npipeconn import NpipeHTTPAdapter as NpipeHTTPAdapter
from .npipesocket import NpipeSocket as NpipeSocket
from .sshconn import SSHHTTPAdapter as SSHHTTPAdapter
from .unixconn import UnixHTTPAdapter as UnixHTTPAdapter

View File

@@ -0,0 +1,4 @@
import requests.adapters
class BaseHTTPAdapter(requests.adapters.HTTPAdapter):
def close(self) -> None: ...

View File

@@ -0,0 +1,29 @@
from _typeshed import Incomplete
import urllib3
import urllib3.connection
from docker.transport.basehttpadapter import BaseHTTPAdapter
RecentlyUsedContainer: Incomplete
class NpipeHTTPConnection(urllib3.connection.HTTPConnection):
npipe_path: Incomplete
timeout: Incomplete
def __init__(self, npipe_path, timeout: int = 60) -> None: ...
sock: Incomplete
def connect(self) -> None: ...
class NpipeHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
npipe_path: Incomplete
timeout: Incomplete
def __init__(self, npipe_path, timeout: int = 60, maxsize: int = 10) -> None: ...
class NpipeHTTPAdapter(BaseHTTPAdapter):
__attrs__: Incomplete
npipe_path: Incomplete
timeout: Incomplete
max_pool_size: Incomplete
pools: Incomplete
def __init__(self, base_url, timeout: int = 60, pool_connections=..., max_pool_size=...) -> None: ...
def get_connection(self, url, proxies: Incomplete | None = None): ...
def request_url(self, request, proxies): ...

View File

@@ -0,0 +1,49 @@
import io
from _typeshed import Incomplete
cERROR_PIPE_BUSY: int
cSECURITY_SQOS_PRESENT: int
cSECURITY_ANONYMOUS: int
MAXIMUM_RETRY_COUNT: int
def check_closed(f): ...
class NpipeSocket:
def __init__(self, handle: Incomplete | None = None) -> None: ...
def accept(self) -> None: ...
def bind(self, address) -> None: ...
def close(self) -> None: ...
flags: Incomplete
def connect(self, address, retry_count: int = 0): ...
def connect_ex(self, address): ...
def detach(self): ...
def dup(self): ...
def getpeername(self): ...
def getsockname(self): ...
def getsockopt(self, level, optname, buflen: Incomplete | None = None) -> None: ...
def ioctl(self, control, option) -> None: ...
def listen(self, backlog) -> None: ...
def makefile(self, mode: Incomplete | None = None, bufsize: Incomplete | None = None): ...
def recv(self, bufsize, flags: int = 0): ...
def recvfrom(self, bufsize, flags: int = 0): ...
def recvfrom_into(self, buf, nbytes: int = 0, flags: int = 0): ...
def recv_into(self, buf, nbytes: int = 0): ...
def send(self, string, flags: int = 0): ...
def sendall(self, string, flags: int = 0): ...
def sendto(self, string, address): ...
def setblocking(self, flag): ...
def settimeout(self, value) -> None: ...
def gettimeout(self): ...
def setsockopt(self, level, optname, value) -> None: ...
def shutdown(self, how): ...
class NpipeFileIOBase(io.RawIOBase):
sock: Incomplete
def __init__(self, npipe_socket) -> None: ...
def close(self) -> None: ...
def fileno(self): ...
def isatty(self): ...
def readable(self): ...
def readinto(self, buf): ...
def seekable(self): ...
def writable(self): ...

View File

@@ -0,0 +1,49 @@
import socket
from _typeshed import Incomplete
import urllib3
import urllib3.connection
from docker.transport.basehttpadapter import BaseHTTPAdapter
RecentlyUsedContainer: Incomplete
class SSHSocket(socket.socket):
host: Incomplete
port: Incomplete
user: Incomplete
proc: Incomplete
def __init__(self, host) -> None: ...
def connect(self, **kwargs) -> None: ... # type:ignore[override]
def sendall(self, data) -> None: ... # type:ignore[override]
def send(self, data): ...
def recv(self, n): ...
def makefile(self, mode): ...
def close(self) -> None: ...
class SSHConnection(urllib3.connection.HTTPConnection):
ssh_transport: Incomplete
timeout: Incomplete
ssh_host: Incomplete
def __init__(self, ssh_transport: Incomplete | None = None, timeout: int = 60, host: Incomplete | None = None) -> None: ...
sock: Incomplete
def connect(self) -> None: ...
class SSHConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
scheme: str
ssh_transport: Incomplete
timeout: Incomplete
ssh_host: Incomplete
def __init__(
self, ssh_client: Incomplete | None = None, timeout: int = 60, maxsize: int = 10, host: Incomplete | None = None
) -> None: ...
class SSHHTTPAdapter(BaseHTTPAdapter):
__attrs__: Incomplete
ssh_client: Incomplete
ssh_host: Incomplete
timeout: Incomplete
max_pool_size: Incomplete
pools: Incomplete
def __init__(self, base_url, timeout: int = 60, pool_connections=..., max_pool_size=..., shell_out: bool = False) -> None: ...
def get_connection(self, url, proxies: Incomplete | None = None): ...
def close(self) -> None: ...

View File

@@ -0,0 +1,31 @@
from _typeshed import Incomplete
import urllib3
import urllib3.connection
from docker.transport.basehttpadapter import BaseHTTPAdapter
RecentlyUsedContainer: Incomplete
class UnixHTTPConnection(urllib3.connection.HTTPConnection):
base_url: Incomplete
unix_socket: Incomplete
timeout: Incomplete
def __init__(self, base_url, unix_socket, timeout: int = 60) -> None: ...
sock: Incomplete
def connect(self) -> None: ...
class UnixHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
base_url: Incomplete
socket_path: Incomplete
timeout: Incomplete
def __init__(self, base_url, socket_path, timeout: int = 60, maxsize: int = 10) -> None: ...
class UnixHTTPAdapter(BaseHTTPAdapter):
__attrs__: Incomplete
socket_path: Incomplete
timeout: Incomplete
max_pool_size: Incomplete
pools: Incomplete
def __init__(self, socket_url, timeout: int = 60, pool_connections=25, max_pool_size=10) -> None: ...
def get_connection(self, url, proxies: Incomplete | None = None): ...
def request_url(self, request, proxies): ...

View File

@@ -0,0 +1,35 @@
from .containers import (
ContainerConfig as ContainerConfig,
DeviceRequest as DeviceRequest,
HostConfig as HostConfig,
LogConfig as LogConfig,
Ulimit as Ulimit,
)
from .daemon import CancellableStream as CancellableStream
from .healthcheck import Healthcheck as Healthcheck
from .networks import (
EndpointConfig as EndpointConfig,
IPAMConfig as IPAMConfig,
IPAMPool as IPAMPool,
NetworkingConfig as NetworkingConfig,
)
from .services import (
ConfigReference as ConfigReference,
ContainerSpec as ContainerSpec,
DNSConfig as DNSConfig,
DriverConfig as DriverConfig,
EndpointSpec as EndpointSpec,
Mount as Mount,
NetworkAttachmentConfig as NetworkAttachmentConfig,
Placement as Placement,
PlacementPreference as PlacementPreference,
Privileges as Privileges,
Resources as Resources,
RestartPolicy as RestartPolicy,
RollbackConfig as RollbackConfig,
SecretReference as SecretReference,
ServiceMode as ServiceMode,
TaskTemplate as TaskTemplate,
UpdateConfig as UpdateConfig,
)
from .swarm import SwarmExternalCA as SwarmExternalCA, SwarmSpec as SwarmSpec

View File

@@ -0,0 +1,5 @@
from _typeshed import Incomplete
from collections.abc import Mapping
class DictType(dict[str, Incomplete]):
def __init__(self, init: Mapping[str, Incomplete]) -> None: ...

View File

@@ -0,0 +1,164 @@
from _typeshed import Incomplete
from .base import DictType
class LogConfigTypesEnum:
JSON: Incomplete
SYSLOG: Incomplete
JOURNALD: Incomplete
GELF: Incomplete
FLUENTD: Incomplete
NONE: Incomplete
class LogConfig(DictType):
types: type[LogConfigTypesEnum]
def __init__(self, **kwargs) -> None: ...
@property
def type(self): ...
@type.setter
def type(self, value) -> None: ...
@property
def config(self): ...
def set_config_value(self, key, value) -> None: ...
def unset_config(self, key) -> None: ...
class Ulimit(DictType):
def __init__(self, **kwargs) -> None: ...
@property
def name(self): ...
@name.setter
def name(self, value) -> None: ...
@property
def soft(self): ...
@soft.setter
def soft(self, value) -> None: ...
@property
def hard(self): ...
@hard.setter
def hard(self, value) -> None: ...
class DeviceRequest(DictType):
def __init__(self, **kwargs) -> None: ...
@property
def driver(self): ...
@driver.setter
def driver(self, value) -> None: ...
@property
def count(self): ...
@count.setter
def count(self, value) -> None: ...
@property
def device_ids(self): ...
@device_ids.setter
def device_ids(self, value) -> None: ...
@property
def capabilities(self): ...
@capabilities.setter
def capabilities(self, value) -> None: ...
@property
def options(self): ...
@options.setter
def options(self, value) -> None: ...
class HostConfig(dict[str, Incomplete]):
def __init__(
self,
version,
binds: Incomplete | None = None,
port_bindings: Incomplete | None = None,
lxc_conf: Incomplete | None = None,
publish_all_ports: bool = False,
links: Incomplete | None = None,
privileged: bool = False,
dns: Incomplete | None = None,
dns_search: Incomplete | None = None,
volumes_from: Incomplete | None = None,
network_mode: Incomplete | None = None,
restart_policy: Incomplete | None = None,
cap_add: Incomplete | None = None,
cap_drop: Incomplete | None = None,
devices: Incomplete | None = None,
extra_hosts: Incomplete | None = None,
read_only: Incomplete | None = None,
pid_mode: Incomplete | None = None,
ipc_mode: Incomplete | None = None,
security_opt: Incomplete | None = None,
ulimits: Incomplete | None = None,
log_config: Incomplete | None = None,
mem_limit: Incomplete | None = None,
memswap_limit: Incomplete | None = None,
mem_reservation: Incomplete | None = None,
kernel_memory: Incomplete | None = None,
mem_swappiness: Incomplete | None = None,
cgroup_parent: Incomplete | None = None,
group_add: Incomplete | None = None,
cpu_quota: Incomplete | None = None,
cpu_period: Incomplete | None = None,
blkio_weight: Incomplete | None = None,
blkio_weight_device: Incomplete | None = None,
device_read_bps: Incomplete | None = None,
device_write_bps: Incomplete | None = None,
device_read_iops: Incomplete | None = None,
device_write_iops: Incomplete | None = None,
oom_kill_disable: bool = False,
shm_size: Incomplete | None = None,
sysctls: Incomplete | None = None,
tmpfs: Incomplete | None = None,
oom_score_adj: Incomplete | None = None,
dns_opt: Incomplete | None = None,
cpu_shares: Incomplete | None = None,
cpuset_cpus: Incomplete | None = None,
userns_mode: Incomplete | None = None,
uts_mode: Incomplete | None = None,
pids_limit: Incomplete | None = None,
isolation: Incomplete | None = None,
auto_remove: bool = False,
storage_opt: Incomplete | None = None,
init: Incomplete | None = None,
init_path: Incomplete | None = None,
volume_driver: Incomplete | None = None,
cpu_count: Incomplete | None = None,
cpu_percent: Incomplete | None = None,
nano_cpus: Incomplete | None = None,
cpuset_mems: Incomplete | None = None,
runtime: Incomplete | None = None,
mounts: Incomplete | None = None,
cpu_rt_period: Incomplete | None = None,
cpu_rt_runtime: Incomplete | None = None,
device_cgroup_rules: Incomplete | None = None,
device_requests: Incomplete | None = None,
cgroupns: Incomplete | None = None,
) -> None: ...
def host_config_type_error(param, param_value, expected): ...
def host_config_version_error(param, version, less_than: bool = True): ...
def host_config_value_error(param, param_value): ...
def host_config_incompatible_error(param, param_value, incompatible_param): ...
class ContainerConfig(dict[str, Incomplete]):
def __init__(
self,
version,
image,
command,
hostname: Incomplete | None = None,
user: Incomplete | None = None,
detach: bool = False,
stdin_open: bool = False,
tty: bool = False,
ports: Incomplete | None = None,
environment: Incomplete | None = None,
volumes: Incomplete | None = None,
network_disabled: bool = False,
entrypoint: Incomplete | None = None,
working_dir: Incomplete | None = None,
domainname: Incomplete | None = None,
host_config: Incomplete | None = None,
mac_address: Incomplete | None = None,
labels: Incomplete | None = None,
stop_signal: Incomplete | None = None,
networking_config: Incomplete | None = None,
healthcheck: Incomplete | None = None,
stop_timeout: Incomplete | None = None,
runtime: Incomplete | None = None,
) -> None: ...

View File

@@ -0,0 +1,6 @@
class CancellableStream:
def __init__(self, stream, response) -> None: ...
def __iter__(self): ...
def __next__(self): ...
next = __next__
def close(self) -> None: ...

View File

@@ -0,0 +1,24 @@
from .base import DictType
class Healthcheck(DictType):
def __init__(self, **kwargs) -> None: ...
@property
def test(self): ...
@test.setter
def test(self, value) -> None: ...
@property
def interval(self): ...
@interval.setter
def interval(self, value) -> None: ...
@property
def timeout(self): ...
@timeout.setter
def timeout(self, value) -> None: ...
@property
def retries(self): ...
@retries.setter
def retries(self, value) -> None: ...
@property
def start_period(self): ...
@start_period.setter
def start_period(self, value) -> None: ...

View File

@@ -0,0 +1,31 @@
from _typeshed import Incomplete
class EndpointConfig(dict[str, Incomplete]):
def __init__(
self,
version,
aliases: Incomplete | None = None,
links: Incomplete | None = None,
ipv4_address: Incomplete | None = None,
ipv6_address: Incomplete | None = None,
link_local_ips: Incomplete | None = None,
driver_opt: Incomplete | None = None,
mac_address: Incomplete | None = None,
) -> None: ...
class NetworkingConfig(dict[str, Incomplete]):
def __init__(self, endpoints_config: Incomplete | None = None) -> None: ...
class IPAMConfig(dict[str, Incomplete]):
def __init__(
self, driver: str = "default", pool_configs: Incomplete | None = None, options: Incomplete | None = None
) -> None: ...
class IPAMPool(dict[str, Incomplete]):
def __init__(
self,
subnet: Incomplete | None = None,
iprange: Incomplete | None = None,
gateway: Incomplete | None = None,
aux_addresses: Incomplete | None = None,
) -> None: ...

View File

@@ -0,0 +1,170 @@
from _typeshed import Incomplete
class TaskTemplate(dict[str, Incomplete]):
def __init__(
self,
container_spec,
resources: Incomplete | None = None,
restart_policy: Incomplete | None = None,
placement: Incomplete | None = None,
log_driver: Incomplete | None = None,
networks: Incomplete | None = None,
force_update: Incomplete | None = None,
) -> None: ...
@property
def container_spec(self): ...
@property
def resources(self): ...
@property
def restart_policy(self): ...
@property
def placement(self): ...
class ContainerSpec(dict[str, Incomplete]):
def __init__(
self,
image,
command: Incomplete | None = None,
args: Incomplete | None = None,
hostname: Incomplete | None = None,
env: Incomplete | None = None,
workdir: Incomplete | None = None,
user: Incomplete | None = None,
labels: Incomplete | None = None,
mounts: Incomplete | None = None,
stop_grace_period: Incomplete | None = None,
secrets: Incomplete | None = None,
tty: Incomplete | None = None,
groups: Incomplete | None = None,
open_stdin: Incomplete | None = None,
read_only: Incomplete | None = None,
stop_signal: Incomplete | None = None,
healthcheck: Incomplete | None = None,
hosts: Incomplete | None = None,
dns_config: Incomplete | None = None,
configs: Incomplete | None = None,
privileges: Incomplete | None = None,
isolation: Incomplete | None = None,
init: Incomplete | None = None,
cap_add: Incomplete | None = None,
cap_drop: Incomplete | None = None,
sysctls: Incomplete | None = None,
) -> None: ...
class Mount(dict[str, Incomplete]):
def __init__(
self,
target,
source,
type: str = "volume",
read_only: bool = False,
consistency: Incomplete | None = None,
propagation: Incomplete | None = None,
no_copy: bool = False,
labels: Incomplete | None = None,
driver_config: Incomplete | None = None,
tmpfs_size: Incomplete | None = None,
tmpfs_mode: Incomplete | None = None,
) -> None: ...
@classmethod
def parse_mount_string(cls, string): ...
class Resources(dict[str, Incomplete]):
def __init__(
self,
cpu_limit: Incomplete | None = None,
mem_limit: Incomplete | None = None,
cpu_reservation: Incomplete | None = None,
mem_reservation: Incomplete | None = None,
generic_resources: Incomplete | None = None,
) -> None: ...
class UpdateConfig(dict[str, Incomplete]):
def __init__(
self,
parallelism: int = 0,
delay: Incomplete | None = None,
failure_action: str = "continue",
monitor: Incomplete | None = None,
max_failure_ratio: Incomplete | None = None,
order: Incomplete | None = None,
) -> None: ...
class RollbackConfig(UpdateConfig): ...
class RestartConditionTypesEnum:
NONE: Incomplete
ON_FAILURE: Incomplete
ANY: Incomplete
class RestartPolicy(dict[str, Incomplete]):
condition_types: type[RestartConditionTypesEnum]
def __init__(self, condition="none", delay: int = 0, max_attempts: int = 0, window: int = 0) -> None: ...
class DriverConfig(dict[str, Incomplete]):
def __init__(self, name, options: Incomplete | None = None) -> None: ...
class EndpointSpec(dict[str, Incomplete]):
def __init__(self, mode: Incomplete | None = None, ports: Incomplete | None = None) -> None: ...
def convert_service_ports(ports): ...
class ServiceMode(dict[str, Incomplete]):
mode: Incomplete
def __init__(self, mode, replicas: Incomplete | None = None, concurrency: Incomplete | None = None) -> None: ...
@property
def replicas(self): ...
class SecretReference(dict[str, Incomplete]):
def __init__(
self,
secret_id,
secret_name,
filename: Incomplete | None = None,
uid: Incomplete | None = None,
gid: Incomplete | None = None,
mode: int = 292,
) -> None: ...
class ConfigReference(dict[str, Incomplete]):
def __init__(
self,
config_id,
config_name,
filename: Incomplete | None = None,
uid: Incomplete | None = None,
gid: Incomplete | None = None,
mode: int = 292,
) -> None: ...
class Placement(dict[str, Incomplete]):
def __init__(
self,
constraints: Incomplete | None = None,
preferences: Incomplete | None = None,
platforms: Incomplete | None = None,
maxreplicas: Incomplete | None = None,
) -> None: ...
class PlacementPreference(dict[str, Incomplete]):
def __init__(self, strategy, descriptor) -> None: ...
class DNSConfig(dict[str, Incomplete]):
def __init__(
self, nameservers: Incomplete | None = None, search: Incomplete | None = None, options: Incomplete | None = None
) -> None: ...
class Privileges(dict[str, Incomplete]):
def __init__(
self,
credentialspec_file: Incomplete | None = None,
credentialspec_registry: Incomplete | None = None,
selinux_disable: Incomplete | None = None,
selinux_user: Incomplete | None = None,
selinux_role: Incomplete | None = None,
selinux_type: Incomplete | None = None,
selinux_level: Incomplete | None = None,
) -> None: ...
class NetworkAttachmentConfig(dict[str, Incomplete]):
def __init__(self, target, aliases: Incomplete | None = None, options: Incomplete | None = None) -> None: ...

View File

@@ -0,0 +1,29 @@
from _typeshed import Incomplete
from typing import Any
class SwarmSpec(dict[str, Any]):
def __init__(
self,
version,
task_history_retention_limit: Incomplete | None = None,
snapshot_interval: Incomplete | None = None,
keep_old_snapshots: Incomplete | None = None,
log_entries_for_slow_followers: Incomplete | None = None,
heartbeat_tick: Incomplete | None = None,
election_tick: Incomplete | None = None,
dispatcher_heartbeat_period: Incomplete | None = None,
node_cert_expiry: Incomplete | None = None,
external_cas: Incomplete | None = None,
name: Incomplete | None = None,
labels: Incomplete | None = None,
signing_ca_cert: Incomplete | None = None,
signing_ca_key: Incomplete | None = None,
ca_force_rotate: Incomplete | None = None,
autolock_managers: Incomplete | None = None,
log_driver: Incomplete | None = None,
) -> None: ...
class SwarmExternalCA(dict[str, Any]):
def __init__(
self, url, protocol: Incomplete | None = None, options: Incomplete | None = None, ca_cert: Incomplete | None = None
) -> None: ...

View File

@@ -0,0 +1,32 @@
from .build import (
create_archive as create_archive,
exclude_paths as exclude_paths,
match_tag as match_tag,
mkbuildcontext as mkbuildcontext,
tar as tar,
)
from .decorators import check_resource as check_resource, minimum_version as minimum_version, update_headers as update_headers
from .utils import (
compare_version as compare_version,
convert_filters as convert_filters,
convert_port_bindings as convert_port_bindings,
convert_service_networks as convert_service_networks,
convert_volume_binds as convert_volume_binds,
create_host_config as create_host_config,
create_ipam_config as create_ipam_config,
create_ipam_pool as create_ipam_pool,
datetime_to_timestamp as datetime_to_timestamp,
decode_json_header as decode_json_header,
format_environment as format_environment,
format_extra_hosts as format_extra_hosts,
kwargs_from_env as kwargs_from_env,
normalize_links as normalize_links,
parse_bytes as parse_bytes,
parse_devices as parse_devices,
parse_env_file as parse_env_file,
parse_host as parse_host,
parse_repository_tag as parse_repository_tag,
split_command as split_command,
version_gte as version_gte,
version_lt as version_lt,
)

View File

@@ -0,0 +1,38 @@
from _typeshed import Incomplete
def match_tag(tag: str) -> bool: ...
def tar(
path,
exclude: Incomplete | None = None,
dockerfile: Incomplete | None = None,
fileobj: Incomplete | None = None,
gzip: bool = False,
): ...
def exclude_paths(root, patterns, dockerfile: Incomplete | None = None): ...
def build_file_list(root): ...
def create_archive(
root,
files: Incomplete | None = None,
fileobj: Incomplete | None = None,
gzip: bool = False,
extra_files: Incomplete | None = None,
): ...
def mkbuildcontext(dockerfile): ...
def split_path(p): ...
def normalize_slashes(p): ...
def walk(root, patterns, default: bool = True): ...
class PatternMatcher:
patterns: Incomplete
def __init__(self, patterns) -> None: ...
def matches(self, filepath): ...
def walk(self, root): ...
class Pattern:
exclusion: bool
dirs: Incomplete
cleaned_pattern: Incomplete
def __init__(self, pattern_str) -> None: ...
@classmethod
def normalize(cls, p): ...
def match(self, filepath): ...

View File

@@ -0,0 +1,10 @@
from _typeshed import Incomplete
DOCKER_CONFIG_FILENAME: Incomplete
LEGACY_DOCKER_CONFIG_FILENAME: str
log: Incomplete
def find_config_file(config_path: Incomplete | None = None): ...
def config_path_from_environment(): ...
def home_dir(): ...
def load_general_config(config_path: Incomplete | None = None): ...

View File

@@ -0,0 +1,3 @@
def check_resource(resource_name): ...
def minimum_version(version): ...
def update_headers(f): ...

View File

@@ -0,0 +1,5 @@
__all__ = ["fnmatch", "fnmatchcase", "translate"]
def fnmatch(name, pat): ...
def fnmatchcase(name, pat): ...
def translate(pat): ...

View File

@@ -0,0 +1,10 @@
from _typeshed import Incomplete
from collections.abc import Generator
json_decoder: Incomplete
def stream_as_text(stream) -> Generator[Incomplete, None, None]: ...
def json_splitter(buffer): ...
def json_stream(stream): ...
def line_splitter(buffer, separator: str = "\n"): ...
def split_buffer(stream, splitter: Incomplete | None = None, decoder=...) -> Generator[Incomplete, None, Incomplete]: ...

View File

@@ -0,0 +1,9 @@
from _typeshed import Incomplete
PORT_SPEC: Incomplete
def add_port_mapping(port_bindings, internal_port, external) -> None: ...
def add_port(port_bindings, internal_port_range, external_range) -> None: ...
def build_port_bindings(ports): ...
def port_range(start, end, proto, randomly_available_port: bool = False): ...
def split_port(port): ...

View File

@@ -0,0 +1,33 @@
from collections.abc import Sequence
from typing import TypedDict
from typing_extensions import NotRequired
class _ProxyConfigDict(TypedDict):
http: NotRequired[str]
https: NotRequired[str]
ftpProxy: NotRequired[str]
noProxy: NotRequired[str]
class _Environment(TypedDict):
http_proxy: NotRequired[str]
HTTP_PROXY: NotRequired[str]
https_proxy: NotRequired[str]
HTTPS_PROXY: NotRequired[str]
ftp_proxy: NotRequired[str]
FTP_PROXY: NotRequired[str]
no_proxy: NotRequired[str]
NO_PROXY: NotRequired[str]
class ProxyConfig(dict[str, str]):
@property
def http(self) -> str | None: ...
@property
def https(self) -> str | None: ...
@property
def ftp(self) -> str | None: ...
@property
def no_proxy(self) -> str | None: ...
@staticmethod
def from_dict(config: _ProxyConfigDict) -> ProxyConfig: ...
def get_environment(self) -> _Environment: ...
def inject_proxy_environment(self, environment: None | Sequence[str]) -> None | Sequence[str]: ...

View File

@@ -0,0 +1,18 @@
from _typeshed import Incomplete
from collections.abc import Generator
STDOUT: int
STDERR: int
class SocketError(Exception): ...
NPIPE_ENDED: int
def read(socket, n: int = 4096): ...
def read_exactly(socket, n): ...
def next_frame_header(socket): ...
def frames_iter(socket, tty): ...
def frames_iter_no_tty(socket) -> Generator[Incomplete, None, None]: ...
def frames_iter_tty(socket) -> Generator[Incomplete, None, None]: ...
def consume_socket_output(frames, demux: bool = False): ...
def demux_adaptor(stream_id, data): ...

View File

@@ -0,0 +1,34 @@
from _typeshed import Incomplete
from typing import NamedTuple
class URLComponents(NamedTuple):
scheme: Incomplete
netloc: Incomplete
url: Incomplete
params: Incomplete
query: Incomplete
fragment: Incomplete
def create_ipam_pool(*args, **kwargs) -> None: ...
def create_ipam_config(*args, **kwargs) -> None: ...
def decode_json_header(header): ...
def compare_version(v1, v2): ...
def version_lt(v1, v2): ...
def version_gte(v1, v2): ...
def convert_port_bindings(port_bindings): ...
def convert_volume_binds(binds): ...
def convert_tmpfs_mounts(tmpfs): ...
def convert_service_networks(networks): ...
def parse_repository_tag(repo_name): ...
def parse_host(addr, is_win32: bool = False, tls: bool = False): ...
def parse_devices(devices): ...
def kwargs_from_env(environment: Incomplete | None = None): ...
def convert_filters(filters): ...
def datetime_to_timestamp(dt): ...
def parse_bytes(s): ...
def normalize_links(links): ...
def parse_env_file(env_file): ...
def split_command(command): ...
def format_environment(environment): ...
def format_extra_hosts(extra_hosts, task: bool = False): ...
def create_host_config(self, *args, **kwargs) -> None: ...

View File

@@ -0,0 +1 @@
__version__: str