paramiko: Improve various bytes-related types (#10109)

This commit is contained in:
Jelle Zijlstra
2023-05-03 06:53:54 -07:00
committed by GitHub
parent 6b5ca0b238
commit 8e24885da7
15 changed files with 61 additions and 43 deletions

View File

@@ -1,9 +1,10 @@
from _typeshed import ReadableBuffer
from socket import _RetAddress, socket
from threading import Thread
from typing import Protocol
from paramiko.channel import Channel
from paramiko.message import Message
from paramiko.message import Message, _LikeBytes
from paramiko.pkey import PKey
from paramiko.transport import Transport
@@ -61,7 +62,7 @@ class AgentKey(PKey):
blob: bytes
public_blob: None
name: str
def __init__(self, agent: AgentSSH, blob: bytes) -> None: ...
def __init__(self, agent: AgentSSH, blob: ReadableBuffer) -> None: ...
def asbytes(self) -> bytes: ...
def get_name(self) -> str: ...
def sign_ssh_data(self, data: bytes, algorithm: str | None = None) -> Message: ...
def sign_ssh_data(self, data: _LikeBytes, algorithm: str | None = None) -> Message: ...

View File

@@ -1,4 +1,5 @@
from collections.abc import Callable, Mapping
from _typeshed import SupportsItems
from collections.abc import Callable
from logging import Logger
from threading import Condition, Event, Lock
from typing import Any, TypeVar
@@ -6,6 +7,7 @@ from typing_extensions import Literal
from paramiko.buffered_pipe import BufferedPipe
from paramiko.file import BufferedFile
from paramiko.message import _LikeBytes
from paramiko.transport import Transport
from paramiko.util import ClosingContextManager
@@ -43,22 +45,22 @@ class Channel(ClosingContextManager):
def __init__(self, chanid: int) -> None: ...
def __del__(self) -> None: ...
def get_pty(
self, term: str | bytes = "vt100", width: int = 80, height: int = 24, width_pixels: int = 0, height_pixels: int = 0
self, term: _LikeBytes = "vt100", width: int = 80, height: int = 24, width_pixels: int = 0, height_pixels: int = 0
) -> None: ...
def invoke_shell(self) -> None: ...
def exec_command(self, command: str | bytes) -> None: ...
def invoke_subsystem(self, subsystem: str | bytes) -> None: ...
def exec_command(self, command: _LikeBytes) -> None: ...
def invoke_subsystem(self, subsystem: _LikeBytes) -> None: ...
def resize_pty(self, width: int = 80, height: int = 24, width_pixels: int = 0, height_pixels: int = 0) -> None: ...
def update_environment(self, environment: Mapping[str | bytes, str | bytes]) -> None: ...
def set_environment_variable(self, name: str | bytes, value: str | bytes) -> None: ...
def update_environment(self, environment: SupportsItems[_LikeBytes, _LikeBytes]) -> None: ...
def set_environment_variable(self, name: _LikeBytes, value: _LikeBytes) -> None: ...
def exit_status_ready(self) -> bool: ...
def recv_exit_status(self) -> int: ...
def send_exit_status(self, status: int) -> None: ...
def request_x11(
self,
screen_number: int = 0,
auth_protocol: str | bytes | None = None,
auth_cookie: str | bytes | None = None,
auth_protocol: _LikeBytes | None = None,
auth_cookie: _LikeBytes | None = None,
single_connection: bool = False,
handler: Callable[[Channel, tuple[str, int]], object] | None = None,
) -> bytes: ...
@@ -78,10 +80,10 @@ class Channel(ClosingContextManager):
def recv_stderr_ready(self) -> bool: ...
def recv_stderr(self, nbytes: int) -> bytes: ...
def send_ready(self) -> bool: ...
def send(self, s: bytes) -> int: ...
def send_stderr(self, s: bytes) -> int: ...
def sendall(self, s: bytes) -> None: ...
def sendall_stderr(self, s: bytes) -> None: ...
def send(self, s: bytes | bytearray) -> int: ...
def send_stderr(self, s: bytes | bytearray) -> int: ...
def sendall(self, s: bytes | bytearray) -> None: ...
def sendall_stderr(self, s: bytes | bytearray) -> None: ...
def makefile(self, *params: Any) -> ChannelFile: ...
def makefile_stderr(self, *params: Any) -> ChannelStderrFile: ...
def makefile_stdin(self, *params: Any) -> ChannelStdinFile: ...

View File

@@ -1,11 +1,12 @@
from _typeshed import ReadableBuffer
from zlib import _Compress, _Decompress
class ZlibCompressor:
z: _Compress
def __init__(self) -> None: ...
def __call__(self, data: bytes) -> bytes: ...
def __call__(self, data: ReadableBuffer) -> bytes: ...
class ZlibDecompressor:
z: _Decompress
def __init__(self) -> None: ...
def __call__(self, data: bytes) -> bytes: ...
def __call__(self, data: ReadableBuffer) -> bytes: ...

View File

@@ -1,3 +1,4 @@
from _typeshed import ReadableBuffer
from collections.abc import Callable
from typing import IO
@@ -15,7 +16,7 @@ class DSSKey(PKey):
def __init__(
self,
msg: Message | None = None,
data: bytes | None = None,
data: ReadableBuffer | None = None,
filename: str | None = None,
password: str | None = None,
vals: tuple[int, int, int, int] | None = None,

View File

@@ -1,3 +1,4 @@
from _typeshed import ReadableBuffer
from collections.abc import Callable, Sequence
from typing import IO, Any
@@ -30,7 +31,7 @@ class ECDSAKey(PKey):
def __init__(
self,
msg: Message | None = None,
data: bytes | None = None,
data: ReadableBuffer | None = None,
filename: str | None = None,
password: str | None = None,
vals: tuple[EllipticCurvePrivateKey, EllipticCurvePublicKey] | None = None,

View File

@@ -1,3 +1,4 @@
from _typeshed import ReadableBuffer
from typing import IO
from paramiko.message import Message
@@ -8,7 +9,7 @@ class Ed25519Key(PKey):
def __init__(
self,
msg: Message | None = None,
data: bytes | None = None,
data: ReadableBuffer | None = None,
filename: str | None = None,
password: str | None = None,
file_obj: IO[str] | None = None,

View File

@@ -1,3 +1,4 @@
from _typeshed import ReadableBuffer
from collections.abc import Iterable
from io import BytesIO
from typing import Any, Protocol
@@ -6,13 +7,13 @@ from typing_extensions import TypeAlias
class _SupportsAsBytes(Protocol):
def asbytes(self) -> bytes: ...
_LikeBytes: TypeAlias = bytes | str | _SupportsAsBytes
_LikeBytes: TypeAlias = bytes | str | _SupportsAsBytes | ReadableBuffer
class Message:
big_int: int
packet: BytesIO
seqno: int # only when packet.Packetizer.read_message() is used
def __init__(self, content: bytes | None = None) -> None: ...
def __init__(self, content: ReadableBuffer | None = None) -> None: ...
def __bytes__(self) -> bytes: ...
def asbytes(self) -> bytes: ...
def rewind(self) -> None: ...
@@ -29,8 +30,8 @@ class Message:
def get_text(self) -> str: ...
def get_binary(self) -> bytes: ...
def get_list(self) -> list[str]: ...
def add_bytes(self, b: bytes) -> Message: ...
def add_byte(self, b: bytes) -> Message: ...
def add_bytes(self, b: ReadableBuffer) -> Message: ...
def add_byte(self, b: ReadableBuffer) -> Message: ...
def add_boolean(self, b: bool) -> Message: ...
def add_int(self, n: int) -> Message: ...
def add_adaptive_int(self, n: int) -> Message: ...

View File

@@ -1,4 +1,4 @@
from _typeshed import Incomplete
from _typeshed import Incomplete, ReadableBuffer
from collections.abc import Callable
from hashlib import _Hash
from logging import Logger
@@ -9,7 +9,7 @@ from cryptography.hazmat.primitives.ciphers import Cipher
from paramiko.compress import ZlibCompressor, ZlibDecompressor
from paramiko.message import Message
def compute_hmac(key: bytes, message: bytes, digest_class: _Hash) -> bytes: ...
def compute_hmac(key: bytes | bytearray, message: ReadableBuffer, digest_class: _Hash) -> bytes: ...
class NeedRekeyException(Exception): ...
@@ -30,7 +30,7 @@ class Packetizer:
block_size: int,
mac_engine: _Hash,
mac_size: int,
mac_key: bytes,
mac_key: bytes | bytearray,
sdctr: bool = False,
etm: bool = False,
) -> None: ...
@@ -40,7 +40,7 @@ class Packetizer:
block_size: int,
mac_engine: _Hash,
mac_size: int,
mac_key: bytes,
mac_key: bytes | bytearray,
etm: bool = False,
) -> None: ...
def set_outbound_compressor(self, compressor: ZlibCompressor) -> None: ...
@@ -57,7 +57,7 @@ class Packetizer:
def handshake_timed_out(self) -> bool: ...
def complete_handshake(self) -> None: ...
def read_all(self, n: int, check_rekey: bool = False) -> bytes: ...
def write_all(self, out: bytes) -> None: ...
def write_all(self, out: ReadableBuffer) -> None: ...
def readline(self, timeout: float) -> str: ...
def send_message(self, data: Message) -> None: ...
def read_message(self) -> tuple[int, Message]: ...

View File

@@ -1,12 +1,14 @@
from re import Pattern
from typing import IO
from typing import IO, TypeVar
from typing_extensions import Self
from paramiko.message import Message
OPENSSH_AUTH_MAGIC: bytes
def _unpad_openssh(data: bytes) -> bytes: ...
_BytesT = TypeVar("_BytesT", bound=bytes | bytearray)
def _unpad_openssh(data: _BytesT) -> _BytesT: ...
class PKey:
public_blob: PublicBlob | None
@@ -33,7 +35,7 @@ class PKey:
class PublicBlob:
key_type: str
key_blob: str
key_blob: bytes
comment: str
def __init__(self, type_: str, blob: bytes, comment: str | None = None) -> None: ...
@classmethod

View File

@@ -1,3 +1,4 @@
from _typeshed import ReadableBuffer
from subprocess import Popen
from typing import Any
@@ -8,7 +9,7 @@ class ProxyCommand(ClosingContextManager):
process: Popen[Any]
timeout: float | None
def __init__(self, command_line: str) -> None: ...
def send(self, content: bytes) -> int: ...
def send(self, content: ReadableBuffer) -> int: ...
def recv(self, size: int) -> bytes: ...
def close(self) -> None: ...
@property

View File

@@ -1,3 +1,4 @@
from _typeshed import ReadableBuffer
from collections.abc import Callable
from typing import IO
@@ -11,7 +12,7 @@ class RSAKey(PKey):
def __init__(
self,
msg: Message | None = None,
data: bytes | None = None,
data: ReadableBuffer | None = None,
filename: str | None = None,
password: str | None = None,
key: None | RSAPublicKey | RSAPrivateKey = None,

View File

@@ -1,3 +1,4 @@
from _typeshed import StrOrBytesPath
from collections.abc import Callable, Iterator
from logging import Logger
from typing import IO
@@ -50,11 +51,11 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
self, fl: IO[bytes], remotepath: bytes | str, file_size: int = 0, callback: _Callback | None = None, confirm: bool = True
) -> SFTPAttributes: ...
def put(
self, localpath: bytes | str, remotepath: bytes | str, callback: _Callback | None = None, confirm: bool = True
self, localpath: StrOrBytesPath, remotepath: bytes | str, callback: _Callback | None = None, confirm: bool = True
) -> SFTPAttributes: ...
def getfo(self, remotepath: bytes | str, fl: IO[bytes], callback: _Callback | None = None, prefetch: bool = True) -> int: ...
def get(
self, remotepath: bytes | str, localpath: bytes | str, callback: _Callback | None = None, prefetch: bool = True
self, remotepath: bytes | str, localpath: StrOrBytesPath, callback: _Callback | None = None, prefetch: bool = True
) -> None: ...
class SFTP(SFTPClient): ...

View File

@@ -2,6 +2,7 @@ from collections.abc import Iterator, Sequence
from typing import Any
from paramiko.file import BufferedFile
from paramiko.message import _LikeBytes
from paramiko.sftp_attr import SFTPAttributes
from paramiko.sftp_client import SFTPClient
from paramiko.sftp_handle import SFTPHandle
@@ -11,7 +12,7 @@ class SFTPFile(BufferedFile[Any]):
sftp: SFTPClient
handle: SFTPHandle
pipelined: bool
def __init__(self, sftp: SFTPClient, handle: bytes, mode: str = "r", bufsize: int = -1) -> None: ...
def __init__(self, sftp: SFTPClient, handle: _LikeBytes, mode: str = "r", bufsize: int = -1) -> None: ...
def __del__(self) -> None: ...
def close(self) -> None: ...
def settimeout(self, timeout: float) -> None: ...

View File

@@ -1,3 +1,5 @@
from _typeshed import ReadableBuffer
from paramiko.sftp_attr import SFTPAttributes
from paramiko.util import ClosingContextManager
@@ -5,6 +7,6 @@ class SFTPHandle(ClosingContextManager):
def __init__(self, flags: int = 0) -> None: ...
def close(self) -> None: ...
def read(self, offset: int, length: int) -> bytes | int: ...
def write(self, offset: int, data: bytes) -> int: ...
def write(self, offset: int, data: ReadableBuffer) -> int: ...
def stat(self) -> int | SFTPAttributes: ...
def chattr(self, attr: SFTPAttributes) -> int: ...

View File

@@ -1,3 +1,5 @@
from _typeshed import ReadableBuffer
from collections.abc import Iterable
from hashlib import _Hash
from logging import Logger, LogRecord
from types import TracebackType
@@ -10,14 +12,14 @@ from paramiko.hostkeys import HostKeys
class SupportsClose(Protocol):
def close(self) -> None: ...
def inflate_long(s: bytes, always_positive: bool = False) -> int: ...
def inflate_long(s: bytes | bytearray, always_positive: bool = False) -> int: ...
def deflate_long(n: int, add_sign_padding: bool = True) -> bytes: ...
def format_binary(data: bytes, prefix: str = "") -> list[str]: ...
def format_binary_line(data: bytes) -> str: ...
def safe_string(s: bytes) -> bytes: ...
def format_binary(data: bytes | bytearray, prefix: str = "") -> list[str]: ...
def format_binary_line(data: bytes | bytearray) -> str: ...
def safe_string(s: Iterable[int | str]) -> bytes: ...
def bit_length(n: int) -> int: ...
def tb_strings() -> list[str]: ...
def generate_key_bytes(hash_alg: type[_Hash], salt: bytes, key: bytes | str, nbytes: int) -> bytes: ...
def generate_key_bytes(hash_alg: type[_Hash], salt: ReadableBuffer, key: bytes | str, nbytes: int) -> bytes: ...
def load_host_keys(filename: str) -> HostKeys: ...
def parse_ssh_config(file_obj: IO[str]) -> SSHConfig: ...
def lookup_ssh_host_config(hostname: str, config: SSHConfig) -> SSHConfigDict: ...