mirror of
https://github.com/davidhalter/typeshed.git
synced 2026-05-06 21:43:59 +08:00
Python 3.14: PEP-784 compression.zstd (#14129)
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
from _typeshed import Incomplete, WriteableBuffer
|
||||
from collections.abc import Callable
|
||||
from io import DEFAULT_BUFFER_SIZE, BufferedIOBase, RawIOBase
|
||||
from typing import Any, Protocol
|
||||
from typing import Any, Protocol, type_check_only
|
||||
|
||||
BUFFER_SIZE = DEFAULT_BUFFER_SIZE
|
||||
|
||||
@type_check_only
|
||||
class _Reader(Protocol):
|
||||
def read(self, n: int, /) -> bytes: ...
|
||||
def seekable(self) -> bool: ...
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
import enum
|
||||
from _typeshed import ReadableBuffer
|
||||
from collections.abc import Iterable, Mapping
|
||||
from compression.zstd._zstdfile import ZstdFile, open
|
||||
from typing import Final, final
|
||||
|
||||
import _zstd
|
||||
from _zstd import ZstdCompressor, ZstdDecompressor, ZstdDict, ZstdError, get_frame_size, zstd_version
|
||||
|
||||
__all__ = (
|
||||
# compression.zstd
|
||||
"COMPRESSION_LEVEL_DEFAULT",
|
||||
"compress",
|
||||
"CompressionParameter",
|
||||
"decompress",
|
||||
"DecompressionParameter",
|
||||
"finalize_dict",
|
||||
"get_frame_info",
|
||||
"Strategy",
|
||||
"train_dict",
|
||||
# compression.zstd._zstdfile
|
||||
"open",
|
||||
"ZstdFile",
|
||||
# _zstd
|
||||
"get_frame_size",
|
||||
"zstd_version",
|
||||
"zstd_version_info",
|
||||
"ZstdCompressor",
|
||||
"ZstdDecompressor",
|
||||
"ZstdDict",
|
||||
"ZstdError",
|
||||
)
|
||||
|
||||
zstd_version_info: Final[tuple[int, int, int]]
|
||||
COMPRESSION_LEVEL_DEFAULT: Final = _zstd.ZSTD_CLEVEL_DEFAULT
|
||||
|
||||
class FrameInfo:
|
||||
decompressed_size: int
|
||||
dictionary_id: int
|
||||
def __init__(self, decompressed_size: int, dictionary_id: int) -> None: ...
|
||||
|
||||
def get_frame_info(frame_buffer: ReadableBuffer) -> FrameInfo: ...
|
||||
def train_dict(samples: Iterable[ReadableBuffer], dict_size: int) -> ZstdDict: ...
|
||||
def finalize_dict(zstd_dict: ZstdDict, /, samples: Iterable[ReadableBuffer], dict_size: int, level: int) -> ZstdDict: ...
|
||||
def compress(
|
||||
data: ReadableBuffer, level: int | None = None, options: Mapping[int, int] | None = None, zstd_dict: ZstdDict | None = None
|
||||
) -> bytes: ...
|
||||
def decompress(data: ReadableBuffer, zstd_dict: ZstdDict | None = None, options: Mapping[int, int] | None = None) -> bytes: ...
|
||||
@final
|
||||
class CompressionParameter(enum.IntEnum):
|
||||
compression_level = _zstd.ZSTD_c_compressionLevel
|
||||
window_log = _zstd.ZSTD_c_windowLog
|
||||
hash_log = _zstd.ZSTD_c_hashLog
|
||||
chain_log = _zstd.ZSTD_c_chainLog
|
||||
search_log = _zstd.ZSTD_c_searchLog
|
||||
min_match = _zstd.ZSTD_c_minMatch
|
||||
target_length = _zstd.ZSTD_c_targetLength
|
||||
strategy = _zstd.ZSTD_c_strategy
|
||||
enable_long_distance_matching = _zstd.ZSTD_c_enableLongDistanceMatching
|
||||
ldm_hash_log = _zstd.ZSTD_c_ldmHashLog
|
||||
ldm_min_match = _zstd.ZSTD_c_ldmMinMatch
|
||||
ldm_bucket_size_log = _zstd.ZSTD_c_ldmBucketSizeLog
|
||||
ldm_hash_rate_log = _zstd.ZSTD_c_ldmHashRateLog
|
||||
content_size_flag = _zstd.ZSTD_c_contentSizeFlag
|
||||
checksum_flag = _zstd.ZSTD_c_checksumFlag
|
||||
dict_id_flag = _zstd.ZSTD_c_dictIDFlag
|
||||
nb_workers = _zstd.ZSTD_c_nbWorkers
|
||||
job_size = _zstd.ZSTD_c_jobSize
|
||||
overlap_log = _zstd.ZSTD_c_overlapLog
|
||||
def bounds(self) -> tuple[int, int]: ...
|
||||
|
||||
@final
|
||||
class DecompressionParameter(enum.IntEnum):
|
||||
window_log_max = _zstd.ZSTD_d_windowLogMax
|
||||
def bounds(self) -> tuple[int, int]: ...
|
||||
|
||||
@final
|
||||
class Strategy(enum.IntEnum):
|
||||
fast = _zstd.ZSTD_fast
|
||||
dfast = _zstd.ZSTD_dfast
|
||||
greedy = _zstd.ZSTD_greedy
|
||||
lazy = _zstd.ZSTD_lazy
|
||||
lazy2 = _zstd.ZSTD_lazy2
|
||||
btlazy2 = _zstd.ZSTD_btlazy2
|
||||
btopt = _zstd.ZSTD_btopt
|
||||
btultra = _zstd.ZSTD_btultra
|
||||
btultra2 = _zstd.ZSTD_btultra2
|
||||
@@ -0,0 +1,117 @@
|
||||
from _typeshed import ReadableBuffer, StrOrBytesPath, SupportsWrite, WriteableBuffer
|
||||
from collections.abc import Mapping
|
||||
from compression._common import _streams
|
||||
from compression.zstd import ZstdDict
|
||||
from io import TextIOWrapper, _WrappedBuffer
|
||||
from typing import Literal, overload, type_check_only
|
||||
from typing_extensions import TypeAlias
|
||||
|
||||
from _zstd import ZstdCompressor, _ZstdCompressorFlushBlock, _ZstdCompressorFlushFrame
|
||||
|
||||
__all__ = ("ZstdFile", "open")
|
||||
|
||||
_ReadBinaryMode: TypeAlias = Literal["r", "rb"]
|
||||
_WriteBinaryMode: TypeAlias = Literal["w", "wb", "x", "xb", "a", "ab"]
|
||||
_ReadTextMode: TypeAlias = Literal["rt"]
|
||||
_WriteTextMode: TypeAlias = Literal["wt", "xt", "at"]
|
||||
|
||||
@type_check_only
|
||||
class _FileBinaryRead(_streams._Reader):
|
||||
def close(self) -> None: ...
|
||||
|
||||
@type_check_only
|
||||
class _FileBinaryWrite(SupportsWrite[bytes]):
|
||||
def close(self) -> None: ...
|
||||
|
||||
class ZstdFile(_streams.BaseStream):
|
||||
FLUSH_BLOCK = ZstdCompressor.FLUSH_BLOCK
|
||||
FLUSH_FRAME = ZstdCompressor.FLUSH_FRAME
|
||||
|
||||
@overload
|
||||
def __init__(
|
||||
self,
|
||||
file: StrOrBytesPath | _FileBinaryRead,
|
||||
/,
|
||||
mode: _ReadBinaryMode = "r",
|
||||
*,
|
||||
level: None = None,
|
||||
options: Mapping[int, int] | None = None,
|
||||
zstd_dict: ZstdDict | None = None,
|
||||
) -> None: ...
|
||||
@overload
|
||||
def __init__(
|
||||
self,
|
||||
file: StrOrBytesPath | _FileBinaryWrite,
|
||||
/,
|
||||
mode: _WriteBinaryMode,
|
||||
*,
|
||||
level: int | None = None,
|
||||
options: Mapping[int, int] | None = None,
|
||||
zstd_dict: ZstdDict | None = None,
|
||||
) -> None: ...
|
||||
def write(self, data: ReadableBuffer, /) -> int: ...
|
||||
def flush(self, mode: _ZstdCompressorFlushBlock | _ZstdCompressorFlushFrame = 1) -> bytes: ... # type: ignore[override]
|
||||
def read(self, size: int | None = -1) -> bytes: ...
|
||||
def read1(self, size: int | None = -1) -> bytes: ...
|
||||
def readinto(self, b: WriteableBuffer) -> int: ...
|
||||
def readinto1(self, b: WriteableBuffer) -> int: ...
|
||||
def readline(self, size: int | None = -1) -> bytes: ...
|
||||
def seek(self, offset: int, whence: int = 0) -> int: ...
|
||||
def peek(self, size: int = -1) -> bytes: ...
|
||||
@property
|
||||
def name(self) -> str | bytes: ...
|
||||
@property
|
||||
def mode(self) -> Literal["rb", "wb"]: ...
|
||||
|
||||
@overload
|
||||
def open(
|
||||
file: StrOrBytesPath | _FileBinaryRead,
|
||||
/,
|
||||
mode: _ReadBinaryMode = "rb",
|
||||
*,
|
||||
level: None = None,
|
||||
options: Mapping[int, int] | None = None,
|
||||
zstd_dict: ZstdDict | None = None,
|
||||
encoding: str | None = None,
|
||||
errors: str | None = None,
|
||||
newline: str | None = None,
|
||||
) -> ZstdFile: ...
|
||||
@overload
|
||||
def open(
|
||||
file: StrOrBytesPath | _FileBinaryWrite,
|
||||
/,
|
||||
mode: _WriteBinaryMode,
|
||||
*,
|
||||
level: int | None = None,
|
||||
options: Mapping[int, int] | None = None,
|
||||
zstd_dict: ZstdDict | None = None,
|
||||
encoding: str | None = None,
|
||||
errors: str | None = None,
|
||||
newline: str | None = None,
|
||||
) -> ZstdFile: ...
|
||||
@overload
|
||||
def open(
|
||||
file: StrOrBytesPath | _WrappedBuffer,
|
||||
/,
|
||||
mode: _ReadTextMode,
|
||||
*,
|
||||
level: None = None,
|
||||
options: Mapping[int, int] | None = None,
|
||||
zstd_dict: ZstdDict | None = None,
|
||||
encoding: str | None = None,
|
||||
errors: str | None = None,
|
||||
newline: str | None = None,
|
||||
) -> TextIOWrapper: ...
|
||||
@overload
|
||||
def open(
|
||||
file: StrOrBytesPath | _WrappedBuffer,
|
||||
/,
|
||||
mode: _WriteTextMode,
|
||||
*,
|
||||
level: int | None = None,
|
||||
options: Mapping[int, int] | None = None,
|
||||
zstd_dict: ZstdDict | None = None,
|
||||
encoding: str | None = None,
|
||||
errors: str | None = None,
|
||||
newline: str | None = None,
|
||||
) -> TextIOWrapper: ...
|
||||
Reference in New Issue
Block a user