Improve docker.utils (#13808)

This commit is contained in:
Semyon Moroz
2025-04-15 15:31:53 +04:00
committed by GitHub
parent b0238c58a2
commit fd155cb31b
10 changed files with 188 additions and 112 deletions
+3 -1
View File
@@ -1,7 +1,9 @@
from typing import Final
from .api import APIClient as APIClient
from .client import DockerClient as DockerClient, from_env as from_env
from .context import Context as Context, ContextAPI as ContextAPI
from .tls import TLSConfig as TLSConfig
from .version import __version__ as __version__
__title__: str
__title__: Final[str]
+38 -25
View File
@@ -1,40 +1,53 @@
from _typeshed import Incomplete
from _typeshed import FileDescriptorOrPath, Incomplete, ReadableBuffer
from collections.abc import Mapping, MutableMapping
from logging import Logger
from typing import Final
from typing_extensions import Self
INDEX_NAME: str
INDEX_URL: Incomplete
TOKEN_USERNAME: str
log: Incomplete
INDEX_NAME: Final[str]
INDEX_URL: Final[str]
TOKEN_USERNAME: Final[str]
log: Logger
def resolve_repository_name(repo_name): ...
def resolve_index_name(index_name): ...
def get_config_header(client, registry): ...
def split_repo_name(repo_name): ...
def get_credential_store(authconfig, registry): ...
def resolve_repository_name(repo_name: str) -> tuple[str, str]: ...
def resolve_index_name(index_name: str) -> str: ...
def get_config_header(client, registry) -> bytes | None: ...
def split_repo_name(repo_name: str) -> tuple[str, str]: ...
def get_credential_store(authconfig: AuthConfig | MutableMapping[str, Incomplete], registry: str | None): ...
class AuthConfig(dict[str, Incomplete]):
def __init__(self, dct, credstore_env: Incomplete | None = None) -> None: ...
def __init__(self, dct: MutableMapping[str, Incomplete], credstore_env: Incomplete | None = None) -> None: ...
@classmethod
def parse_auth(cls, entries, raise_on_error: bool = False): ...
def parse_auth(
cls, entries: Mapping[str, dict[Incomplete, Incomplete]], raise_on_error: bool = False
) -> dict[str, Incomplete]: ...
@classmethod
def load_config(cls, config_path, config_dict, credstore_env: Incomplete | None = None): ...
def load_config(
cls,
config_path: FileDescriptorOrPath | None,
config_dict: dict[str, Incomplete] | None,
credstore_env: Incomplete | None = None,
) -> Self: ...
@property
def auths(self): ...
def auths(self) -> dict[str, Incomplete]: ...
@property
def creds_store(self): ...
@property
def cred_helpers(self): ...
@property
def is_empty(self): ...
def resolve_authconfig(self, registry: Incomplete | None = None): ...
def get_credential_store(self, registry): ...
def is_empty(self) -> bool: ...
def resolve_authconfig(self, registry: str | None = None): ...
def get_credential_store(self, registry: str | None): ...
def get_all_credentials(self): ...
def add_auth(self, reg, data) -> None: ...
def add_auth(self, reg: str, data) -> None: ...
def resolve_authconfig(authconfig, registry: Incomplete | None = None, credstore_env: Incomplete | None = None): ...
def convert_to_hostname(url): ...
def decode_auth(auth): ...
def encode_header(auth): ...
def parse_auth(entries, raise_on_error: bool = False): ...
def resolve_authconfig(authconfig, registry: str | None = None, credstore_env: Incomplete | None = None): ...
def convert_to_hostname(url: str) -> str: ...
def decode_auth(auth: str | ReadableBuffer) -> tuple[str, str]: ...
def encode_header(auth) -> bytes: ...
def parse_auth(entries: Mapping[str, dict[Incomplete, Incomplete]], raise_on_error: bool = False): ...
def load_config(
config_path: Incomplete | None = None, config_dict: Incomplete | None = None, credstore_env: Incomplete | None = None
): ...
config_path: FileDescriptorOrPath | None = None,
config_dict: dict[str, Incomplete] | None = None,
credstore_env: Incomplete | None = None,
) -> AuthConfig: ...
+30 -25
View File
@@ -1,38 +1,43 @@
from _typeshed import Incomplete
import io
from _typeshed import Incomplete, StrOrBytesPath, StrPath
from collections.abc import Generator, Iterable, MutableSequence
from os import PathLike
from tarfile import _Fileobj
from tempfile import _TemporaryFileWrapper
def match_tag(tag: str) -> bool: ...
def tar(
path,
exclude: Incomplete | None = None,
dockerfile: Incomplete | None = None,
fileobj: Incomplete | None = None,
path: PathLike[str],
exclude: list[str] | None = None,
dockerfile: tuple[str | None, str | None] | None = None,
fileobj: _Fileobj | None = None,
gzip: bool = False,
): ...
def exclude_paths(root, patterns, dockerfile: Incomplete | None = None): ...
def build_file_list(root): ...
) -> _TemporaryFileWrapper[bytes] | _Fileobj: ...
def exclude_paths(root: StrPath, patterns: MutableSequence[str], dockerfile: str | None = None) -> set[str]: ...
def build_file_list(root: str) -> list[str]: ...
def create_archive(
root,
files: Incomplete | None = None,
fileobj: Incomplete | None = None,
root: str,
files: Iterable[str] | None = None,
fileobj: _Fileobj | None = None,
gzip: bool = False,
extra_files: Incomplete | None = None,
): ...
def mkbuildcontext(dockerfile): ...
def split_path(p): ...
def normalize_slashes(p): ...
def walk(root, patterns, default: bool = True): ...
) -> _TemporaryFileWrapper[bytes] | _Fileobj: ...
def mkbuildcontext(dockerfile: io.IOBase | StrOrBytesPath) -> _TemporaryFileWrapper[bytes]: ...
def split_path(p: str) -> list[str]: ...
def normalize_slashes(p: str) -> str: ...
def walk(root: StrPath, patterns: Iterable[str], default: bool = True) -> Generator[str]: ...
class PatternMatcher:
patterns: Incomplete
def __init__(self, patterns) -> None: ...
def matches(self, filepath): ...
def walk(self, root): ...
patterns: list[Pattern]
def __init__(self, patterns: Iterable[str]) -> None: ...
def matches(self, filepath: PathLike[str]) -> bool: ...
def walk(self, root: StrPath) -> Generator[str]: ...
class Pattern:
exclusion: bool
dirs: Incomplete
cleaned_pattern: Incomplete
def __init__(self, pattern_str) -> None: ...
dirs: list[str]
cleaned_pattern: str
def __init__(self, pattern_str: str) -> None: ...
@classmethod
def normalize(cls, p): ...
def match(self, filepath): ...
def normalize(cls, p: str) -> list[str]: ...
def match(self, filepath: str) -> bool: ...
+10 -8
View File
@@ -1,10 +1,12 @@
from _typeshed import Incomplete
from _typeshed import FileDescriptorOrPath
from logging import Logger
from typing import Final
DOCKER_CONFIG_FILENAME: Incomplete
LEGACY_DOCKER_CONFIG_FILENAME: str
log: Incomplete
DOCKER_CONFIG_FILENAME: Final[str]
LEGACY_DOCKER_CONFIG_FILENAME: Final[str]
log: Logger
def find_config_file(config_path: Incomplete | None = None): ...
def config_path_from_environment(): ...
def home_dir(): ...
def load_general_config(config_path: Incomplete | None = None): ...
def find_config_file(config_path: FileDescriptorOrPath | None = None) -> FileDescriptorOrPath | None: ...
def config_path_from_environment() -> str | None: ...
def home_dir() -> str: ...
def load_general_config(config_path: FileDescriptorOrPath | None = None): ...
+6 -3
View File
@@ -1,3 +1,6 @@
def check_resource(resource_name): ...
def minimum_version(version): ...
def update_headers(f): ...
from _typeshed import Incomplete
from collections.abc import Callable
def check_resource(resource_name: str): ...
def minimum_version(version: str): ...
def update_headers(f: Callable[..., Incomplete]): ...
+3 -3
View File
@@ -1,5 +1,5 @@
__all__ = ["fnmatch", "fnmatchcase", "translate"]
def fnmatch(name, pat): ...
def fnmatchcase(name, pat): ...
def translate(pat): ...
def fnmatch(name: str, pat: str) -> bool: ...
def fnmatchcase(name: str, pat: str) -> bool: ...
def translate(pat: str) -> str: ...
+3 -3
View File
@@ -6,10 +6,10 @@ from docker._types import JSON
json_decoder: json.JSONDecoder
def stream_as_text(stream: Iterator[str | bytes]) -> Generator[str, None, None]: ...
def stream_as_text(stream: Iterator[str | bytes]) -> Generator[str]: ...
def json_splitter(buffer: str) -> tuple[JSON, str] | None: ...
def json_stream(stream: Iterator[str]) -> Generator[JSON, None, None]: ...
def json_stream(stream: Iterator[str]) -> Generator[JSON]: ...
def line_splitter(buffer: str, separator: str = "\n") -> tuple[str, str] | None: ...
def split_buffer(
stream: Iterator[str | bytes], splitter: Callable[[str], tuple[str, str]] | None = None, decoder: Callable[[str], Any] = ...
) -> Generator[Any, None, None]: ...
) -> Generator[Any]: ...
+5 -3
View File
@@ -1,9 +1,11 @@
import re
from _typeshed import Incomplete
from typing import Final
PORT_SPEC: Incomplete
PORT_SPEC: Final[re.Pattern[str]]
def add_port_mapping(port_bindings, internal_port, external) -> None: ...
def add_port(port_bindings, internal_port_range, external_range) -> None: ...
def build_port_bindings(ports): ...
def build_port_bindings(ports) -> dict[Incomplete, Incomplete]: ...
def port_range(start, end, proto, randomly_available_port: bool = False): ...
def split_port(port): ...
def split_port(port: object) -> tuple[Incomplete, Incomplete]: ...
+22 -11
View File
@@ -1,18 +1,29 @@
from _typeshed import Incomplete
from collections.abc import Generator
from _typeshed import Incomplete, ReadableBuffer
from collections.abc import Generator, Iterable
from typing import Final, Literal, TypeVar, overload
STDOUT: int
STDERR: int
_T = TypeVar("_T")
STDOUT: Final = 1
STDERR: Final = 2
class SocketError(Exception): ...
NPIPE_ENDED: int
NPIPE_ENDED: Final = 109
def read(socket, n: int = 4096): ...
def read_exactly(socket, n): ...
def next_frame_header(socket): ...
def read_exactly(socket, n: int) -> bytes: ...
def next_frame_header(socket) -> tuple[Incomplete, int]: ...
def frames_iter(socket, tty): ...
def frames_iter_no_tty(socket) -> Generator[Incomplete, None, None]: ...
def frames_iter_tty(socket) -> Generator[Incomplete, None, None]: ...
def consume_socket_output(frames, demux: bool = False): ...
def demux_adaptor(stream_id, data): ...
def frames_iter_no_tty(socket) -> Generator[tuple[str | Incomplete, str | bytes | Incomplete]]: ...
def frames_iter_tty(socket) -> Generator[Incomplete]: ...
@overload
def consume_socket_output(
frames: Iterable[tuple[Incomplete, Incomplete]], demux: Literal[True]
) -> tuple[Incomplete, Incomplete]: ...
@overload
def consume_socket_output(frames: Iterable[ReadableBuffer], demux: Literal[False] = False) -> bytes: ...
@overload
def demux_adaptor(stream_id: Literal[1], data: _T) -> tuple[_T, None]: ...
@overload
def demux_adaptor(stream_id: Literal[2], data: _T) -> tuple[None, _T]: ...
+68 -30
View File
@@ -1,34 +1,72 @@
from _typeshed import Incomplete
from typing import NamedTuple
import datetime
from _typeshed import FileDescriptorOrPath, Incomplete, ReadableBuffer
from collections.abc import Iterable, Mapping
from shlex import _ShlexInstream
from typing import Literal, NamedTuple, NoReturn, TypedDict, TypeVar, overload, type_check_only
from typing_extensions import deprecated
from ..tls import TLSConfig
_T = TypeVar("_T")
_K = TypeVar("_K")
_V = TypeVar("_V")
@type_check_only
class _EnvKWArgs(TypedDict, total=False):
base_url: str
tls: TLSConfig
class URLComponents(NamedTuple):
scheme: Incomplete
netloc: Incomplete
url: Incomplete
params: Incomplete
query: Incomplete
fragment: Incomplete
scheme: str | None
netloc: str | None
url: str
params: str | None
query: str | None
fragment: str | None
def create_ipam_pool(*args, **kwargs) -> None: ...
def create_ipam_config(*args, **kwargs) -> None: ...
def decode_json_header(header): ...
def compare_version(v1, v2): ...
def version_lt(v1, v2): ...
def version_gte(v1, v2): ...
def convert_port_bindings(port_bindings): ...
def convert_volume_binds(binds): ...
def convert_tmpfs_mounts(tmpfs): ...
def convert_service_networks(networks): ...
def parse_repository_tag(repo_name): ...
def parse_host(addr, is_win32: bool = False, tls: bool = False): ...
def parse_devices(devices): ...
def kwargs_from_env(environment: Incomplete | None = None): ...
def convert_filters(filters): ...
def datetime_to_timestamp(dt): ...
def parse_bytes(s): ...
@deprecated("utils.create_ipam_pool has been removed. Please use a docker.types.IPAMPool object instead.")
def create_ipam_pool(*args, **kwargs) -> NoReturn: ...
@deprecated("utils.create_ipam_config has been removed. Please use a docker.types.IPAMConfig object instead.")
def create_ipam_config(*args, **kwargs) -> NoReturn: ...
def decode_json_header(header: str | ReadableBuffer): ...
def compare_version(v1: str, v2: str) -> Literal[0, -1, 1]: ...
def version_lt(v1: str, v2: str) -> bool: ...
def version_gte(v1: str, v2: str) -> bool: ...
def convert_port_bindings(
port_bindings: Mapping[object, Incomplete], # keys are converted using str()
) -> dict[str, list[dict[str, str]]]: ...
@overload
def convert_volume_binds(binds: list[_T]) -> list[_T]: ...
@overload
def convert_volume_binds(binds: Mapping[str | bytes, Incomplete]) -> list[str]: ...
@overload
def convert_tmpfs_mounts(tmpfs: dict[_K, _V]) -> dict[_K, _V]: ...
@overload
def convert_tmpfs_mounts(tmpfs: list[str]) -> dict[str, str]: ...
@overload
def convert_service_networks(networks: None) -> None: ...
@overload
def convert_service_networks(networks: list[str] | list[dict[str, str]] | list[str | dict[str, str]]) -> list[dict[str, str]]: ...
def parse_repository_tag(repo_name: str) -> tuple[str, str | None]: ...
@overload
def parse_host(addr: None, is_win32: Literal[True], tls: bool = False) -> Literal["npipe:////./pipe/docker_engine"]: ...
@overload
def parse_host(
addr: None, is_win32: Literal[False] = False, tls: bool = False
) -> Literal["http+unix:///var/run/docker.sock"]: ...
@overload
def parse_host(addr: str | None, is_win32: bool = False, tls: bool = False) -> str | bytes: ...
def parse_devices(devices: Iterable[str | dict[str, Incomplete]]) -> list[dict[str, Incomplete]]: ...
def kwargs_from_env(environment: Mapping[str, Incomplete] | None = None) -> _EnvKWArgs: ...
def convert_filters(filters) -> str: ...
def datetime_to_timestamp(dt: datetime.datetime) -> int: ...
def parse_bytes(s: float | str) -> float: ...
def normalize_links(links): ...
def parse_env_file(env_file): ...
def split_command(command): ...
def format_environment(environment): ...
def format_extra_hosts(extra_hosts, task: bool = False): ...
def create_host_config(self, *args, **kwargs) -> None: ...
def parse_env_file(env_file: FileDescriptorOrPath) -> dict[str, str]: ...
def split_command(command: str | _ShlexInstream) -> list[str]: ...
def format_environment(environment: Mapping[str, object | None]) -> list[str]: ...
def format_extra_hosts(
extra_hosts: Mapping[object, object], task: bool = False # keys and values are converted to str
) -> list[str]: ...
@deprecated("utils.create_host_config has been removed. Please use a docker.types.HostConfig object instead.")
def create_host_config(self, *args, **kwargs) -> NoReturn: ...