Big diff: Use new "|" union syntax (#5872)

This commit is contained in:
Akuli
2021-08-08 12:05:21 +03:00
committed by GitHub
parent b9adb7a874
commit ee487304d7
578 changed files with 8080 additions and 8966 deletions

View File

@@ -2,14 +2,14 @@ import builtins
import ctypes
import sys
from types import TracebackType
from typing import Any, Optional, Type, TypeVar
from typing import Any, Type, TypeVar
if sys.platform == "win32":
_T = TypeVar("_T")
def format_system_message(errno: int) -> Optional[str]: ...
def format_system_message(errno: int) -> str | None: ...
class WindowsError(builtins.WindowsError):
def __init__(self, value: Optional[int] = ...) -> None: ...
def __init__(self, value: int | None = ...) -> None: ...
@property
def message(self) -> str: ...
@property
@@ -31,13 +31,13 @@ if sys.platform == "win32":
pos: int
filemap: Any = ...
view: Any = ...
def __init__(self, name: str, length: int, security_attributes: Optional[Any] = ...) -> None: ...
def __init__(self, name: str, length: int, security_attributes: Any | None = ...) -> None: ...
def __enter__(self: _T) -> _T: ...
def seek(self, pos: int) -> None: ...
def write(self, msg: bytes) -> None: ...
def read(self, n: int) -> bytes: ...
def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], tb: Optional[TracebackType]
self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, tb: TracebackType | None
) -> None: ...
READ_CONTROL: int
STANDARD_RIGHTS_REQUIRED: int
@@ -82,4 +82,4 @@ if sys.platform == "win32":
def GetTokenInformation(token: Any, information_class: Any) -> Any: ...
def OpenProcessToken(proc_handle: Any, access: Any) -> Any: ...
def get_current_user() -> TOKEN_USER: ...
def get_security_attributes_for_user(user: Optional[TOKEN_USER] = ...) -> SECURITY_ATTRIBUTES: ...
def get_security_attributes_for_user(user: TOKEN_USER | None = ...) -> SECURITY_ATTRIBUTES: ...

View File

@@ -1,5 +1,5 @@
from threading import Event
from typing import Callable, List, Optional, Tuple
from typing import Callable, List, Tuple
from paramiko.pkey import PKey
from paramiko.ssh_gss import _SSH_GSSAuth
@@ -9,22 +9,22 @@ _InteractiveCallback = Callable[[str, str, List[Tuple[str, bool]]], List[str]]
class AuthHandler:
transport: Transport
username: Optional[str]
username: str | None
authenticated: bool
auth_event: Optional[Event]
auth_event: Event | None
auth_method: str
banner: Optional[str]
password: Optional[str]
private_key: Optional[PKey]
interactive_handler: Optional[_InteractiveCallback]
submethods: Optional[str]
auth_username: Optional[str]
banner: str | None
password: str | None
private_key: PKey | None
interactive_handler: _InteractiveCallback | None
submethods: str | None
auth_username: str | None
auth_fail_count: int
gss_host: Optional[str]
gss_host: str | None
gss_deleg_creds: bool
def __init__(self, transport: Transport) -> None: ...
def is_authenticated(self) -> bool: ...
def get_username(self) -> Optional[str]: ...
def get_username(self) -> str | None: ...
def auth_none(self, username: str, event: Event) -> None: ...
def auth_publickey(self, username: str, key: PKey, event: Event) -> None: ...
def auth_password(self, username: str, password: str, event: Event) -> None: ...

View File

@@ -1,4 +1,4 @@
from typing import Any, Iterable, List, Union
from typing import Any, Iterable, List
class BERException(Exception): ...
@@ -7,10 +7,10 @@ class BER:
idx: int
def __init__(self, content: bytes = ...) -> None: ...
def asbytes(self) -> bytes: ...
def decode(self) -> Union[None, int, List[int]]: ...
def decode_next(self) -> Union[None, int, List[int]]: ...
def decode(self) -> None | int | List[int]: ...
def decode_next(self) -> None | int | List[int]: ...
@staticmethod
def decode_sequence(data: bytes) -> List[Union[int, List[int]]]: ...
def decode_sequence(data: bytes) -> List[int | List[int]]: ...
def encode_tlv(self, ident: int, val: bytes) -> None: ...
def encode(self, x: Any) -> None: ...
@staticmethod

View File

@@ -1,5 +1,5 @@
from threading import Event
from typing import Generic, Optional, Text, TypeVar
from typing import Generic, Text, TypeVar
_T = TypeVar("_T", Text, bytes)
@@ -10,7 +10,7 @@ class BufferedPipe(Generic[_T]):
def set_event(self, event: Event) -> None: ...
def feed(self, data: _T) -> None: ...
def read_ready(self) -> bool: ...
def read(self, nbytes: int, timeout: Optional[float] = ...) -> _T: ...
def read(self, nbytes: int, timeout: float | None = ...) -> _T: ...
def empty(self) -> _T: ...
def close(self) -> None: ...
def __len__(self) -> int: ...

View File

@@ -1,6 +1,6 @@
from logging import Logger
from threading import Condition, Event, Lock
from typing import Any, Callable, Mapping, Optional, Tuple, TypeVar
from typing import Any, Callable, Mapping, Tuple, TypeVar
from paramiko.buffered_pipe import BufferedPipe
from paramiko.file import BufferedFile
@@ -14,13 +14,13 @@ def open_only(func: _F) -> Callable[[_F], _F]: ...
class Channel(ClosingContextManager):
chanid: int
remote_chanid: int
transport: Optional[Transport]
transport: Transport | None
active: bool
eof_received: int
eof_sent: int
in_buffer: BufferedPipe[Any]
in_stderr_buffer: BufferedPipe[Any]
timeout: Optional[float]
timeout: float | None
closed: bool
ultra_debug: bool
lock: Lock
@@ -55,10 +55,10 @@ class Channel(ClosingContextManager):
def request_x11(
self,
screen_number: int = ...,
auth_protocol: Optional[str] = ...,
auth_cookie: Optional[str] = ...,
auth_protocol: str | None = ...,
auth_cookie: str | None = ...,
single_connection: bool = ...,
handler: Optional[Callable[[Channel, Tuple[str, int]], None]] = ...,
handler: Callable[[Channel, Tuple[str, int]], None] | None = ...,
) -> bytes: ...
def request_forward_agent(self, handler: Callable[[Channel], None]) -> bool: ...
def get_transport(self) -> Transport: ...
@@ -66,8 +66,8 @@ class Channel(ClosingContextManager):
def get_name(self) -> str: ...
def get_id(self) -> int: ...
def set_combine_stderr(self, combine: bool) -> bool: ...
def settimeout(self, timeout: Optional[float]) -> None: ...
def gettimeout(self) -> Optional[float]: ...
def settimeout(self, timeout: float | None) -> None: ...
def gettimeout(self) -> float | None: ...
def setblocking(self, blocking: bool) -> None: ...
def getpeername(self) -> str: ...
def close(self) -> None: ...

View File

@@ -1,5 +1,5 @@
from socket import socket
from typing import Dict, Iterable, Mapping, NoReturn, Optional, Tuple, Type, Union
from typing import Dict, Iterable, Mapping, NoReturn, Tuple, Type
from paramiko.channel import Channel, ChannelFile, ChannelStderrFile, ChannelStdinFile
from paramiko.hostkeys import HostKeys
@@ -10,43 +10,43 @@ from paramiko.util import ClosingContextManager
class SSHClient(ClosingContextManager):
def __init__(self) -> None: ...
def load_system_host_keys(self, filename: Optional[str] = ...) -> None: ...
def load_system_host_keys(self, filename: str | None = ...) -> None: ...
def load_host_keys(self, filename: str) -> None: ...
def save_host_keys(self, filename: str) -> None: ...
def get_host_keys(self) -> HostKeys: ...
def set_log_channel(self, name: str) -> None: ...
def set_missing_host_key_policy(self, policy: Union[Type[MissingHostKeyPolicy], MissingHostKeyPolicy]) -> None: ...
def set_missing_host_key_policy(self, policy: Type[MissingHostKeyPolicy] | MissingHostKeyPolicy) -> None: ...
def connect(
self,
hostname: str,
port: int = ...,
username: Optional[str] = ...,
password: Optional[str] = ...,
pkey: Optional[PKey] = ...,
key_filename: Optional[str] = ...,
timeout: Optional[float] = ...,
username: str | None = ...,
password: str | None = ...,
pkey: PKey | None = ...,
key_filename: str | None = ...,
timeout: float | None = ...,
allow_agent: bool = ...,
look_for_keys: bool = ...,
compress: bool = ...,
sock: Optional[socket] = ...,
sock: socket | None = ...,
gss_auth: bool = ...,
gss_kex: bool = ...,
gss_deleg_creds: bool = ...,
gss_host: Optional[str] = ...,
banner_timeout: Optional[float] = ...,
auth_timeout: Optional[float] = ...,
gss_host: str | None = ...,
banner_timeout: float | None = ...,
auth_timeout: float | None = ...,
gss_trust_dns: bool = ...,
passphrase: Optional[str] = ...,
disabled_algorithms: Optional[Dict[str, Iterable[str]]] = ...,
passphrase: str | None = ...,
disabled_algorithms: Dict[str, Iterable[str]] | None = ...,
) -> None: ...
def close(self) -> None: ...
def exec_command(
self,
command: str,
bufsize: int = ...,
timeout: Optional[float] = ...,
timeout: float | None = ...,
get_pty: bool = ...,
environment: Optional[Dict[str, str]] = ...,
environment: Dict[str, str] | None = ...,
) -> Tuple[ChannelStdinFile, ChannelFile, ChannelStderrFile]: ...
def invoke_shell(
self,
@@ -55,10 +55,10 @@ class SSHClient(ClosingContextManager):
height: int = ...,
width_pixels: int = ...,
height_pixels: int = ...,
environment: Optional[Mapping[str, str]] = ...,
environment: Mapping[str, str] | None = ...,
) -> Channel: ...
def open_sftp(self) -> SFTPClient: ...
def get_transport(self) -> Optional[Transport]: ...
def get_transport(self) -> Transport | None: ...
class MissingHostKeyPolicy:
def missing_host_key(self, client: SSHClient, hostname: str, key: PKey) -> None: ...

View File

@@ -1,4 +1,4 @@
from typing import IO, Any, Dict, Iterable, List, Optional, Pattern, Set
from typing import IO, Any, Dict, Iterable, List, Pattern, Set
from paramiko.ssh_exception import ConfigParseError as ConfigParseError, CouldNotCanonicalize as CouldNotCanonicalize
@@ -20,10 +20,10 @@ class SSHConfig:
def get_hostnames(self) -> Set[str]: ...
class LazyFqdn:
fqdn: Optional[str]
fqdn: str | None
config: SSHConfig
host: Optional[str]
def __init__(self, config: SSHConfigDict, host: Optional[str] = ...) -> None: ...
host: str | None
def __init__(self, config: SSHConfigDict, host: str | None = ...) -> None: ...
class SSHConfigDict(Dict[str, str]):
def __init__(self, *args: Any, **kwargs: Any) -> None: ...

View File

@@ -1,24 +1,24 @@
from typing import IO, Any, Callable, Optional, Tuple
from typing import IO, Any, Callable, Tuple
from paramiko.message import Message
from paramiko.pkey import PKey
class DSSKey(PKey):
p: Optional[int]
q: Optional[int]
g: Optional[int]
y: Optional[int]
x: Optional[int]
p: int | None
q: int | None
g: int | None
y: int | None
x: int | None
public_blob: None
size: int
def __init__(
self,
msg: Optional[Message] = ...,
data: Optional[bytes] = ...,
filename: Optional[str] = ...,
password: Optional[str] = ...,
vals: Optional[Tuple[int, int, int, int]] = ...,
file_obj: Optional[IO[str]] = ...,
msg: Message | None = ...,
data: bytes | None = ...,
filename: str | None = ...,
password: str | None = ...,
vals: Tuple[int, int, int, int] | None = ...,
file_obj: IO[str] | None = ...,
) -> None: ...
def asbytes(self) -> bytes: ...
def __hash__(self) -> int: ...
@@ -27,7 +27,7 @@ class DSSKey(PKey):
def can_sign(self) -> bool: ...
def sign_ssh_data(self, data: bytes) -> Message: ...
def verify_ssh_sig(self, data: bytes, msg: Message) -> bool: ...
def write_private_key_file(self, filename: str, password: Optional[str] = ...) -> None: ...
def write_private_key(self, file_obj: IO[str], password: Optional[str] = ...) -> None: ...
def write_private_key_file(self, filename: str, password: str | None = ...) -> None: ...
def write_private_key(self, file_obj: IO[str], password: str | None = ...) -> None: ...
@staticmethod
def generate(bits: int = ..., progress_func: Optional[Callable[..., Any]] = ...) -> DSSKey: ...
def generate(bits: int = ..., progress_func: Callable[..., Any] | None = ...) -> DSSKey: ...

View File

@@ -1,4 +1,4 @@
from typing import IO, Any, Callable, List, Optional, Sequence, Tuple, Type
from typing import IO, Any, Callable, List, Sequence, Tuple, Type
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve, EllipticCurvePrivateKey, EllipticCurvePublicKey
from cryptography.hazmat.primitives.hashes import HashAlgorithm
@@ -17,23 +17,23 @@ class _ECDSACurveSet:
ecdsa_curves: Sequence[_ECDSACurve]
def __init__(self, ecdsa_curves: Sequence[_ECDSACurve]) -> None: ...
def get_key_format_identifier_list(self) -> List[str]: ...
def get_by_curve_class(self, curve_class: Type[Any]) -> Optional[_ECDSACurve]: ...
def get_by_key_format_identifier(self, key_format_identifier: str) -> Optional[_ECDSACurve]: ...
def get_by_key_length(self, key_length: int) -> Optional[_ECDSACurve]: ...
def get_by_curve_class(self, curve_class: Type[Any]) -> _ECDSACurve | None: ...
def get_by_key_format_identifier(self, key_format_identifier: str) -> _ECDSACurve | None: ...
def get_by_key_length(self, key_length: int) -> _ECDSACurve | None: ...
class ECDSAKey(PKey):
verifying_key: EllipticCurvePublicKey
signing_key: EllipticCurvePrivateKey
public_blob: None
ecdsa_curve: Optional[_ECDSACurve]
ecdsa_curve: _ECDSACurve | None
def __init__(
self,
msg: Optional[Message] = ...,
data: Optional[bytes] = ...,
filename: Optional[str] = ...,
password: Optional[str] = ...,
vals: Optional[Tuple[EllipticCurvePrivateKey, EllipticCurvePublicKey]] = ...,
file_obj: Optional[IO[str]] = ...,
msg: Message | None = ...,
data: bytes | None = ...,
filename: str | None = ...,
password: str | None = ...,
vals: Tuple[EllipticCurvePrivateKey, EllipticCurvePublicKey] | None = ...,
file_obj: IO[str] | None = ...,
validate_point: bool = ...,
) -> None: ...
@classmethod
@@ -45,9 +45,9 @@ class ECDSAKey(PKey):
def can_sign(self) -> bool: ...
def sign_ssh_data(self, data: bytes) -> Message: ...
def verify_ssh_sig(self, data: bytes, msg: Message) -> bool: ...
def write_private_key_file(self, filename: str, password: Optional[str] = ...) -> None: ...
def write_private_key(self, file_obj: IO[str], password: Optional[str] = ...) -> None: ...
def write_private_key_file(self, filename: str, password: str | None = ...) -> None: ...
def write_private_key(self, file_obj: IO[str], password: str | None = ...) -> None: ...
@classmethod
def generate(
cls, curve: EllipticCurve = ..., progress_func: Optional[Callable[..., Any]] = ..., bits: Optional[int] = ...
cls, curve: EllipticCurve = ..., progress_func: Callable[..., Any] | None = ..., bits: int | None = ...
) -> ECDSAKey: ...

View File

@@ -1,4 +1,4 @@
from typing import IO, Optional
from typing import IO
from paramiko.message import Message
from paramiko.pkey import PKey
@@ -7,11 +7,11 @@ class Ed25519Key(PKey):
public_blob: None
def __init__(
self,
msg: Optional[Message] = ...,
data: Optional[bytes] = ...,
filename: Optional[str] = ...,
password: Optional[str] = ...,
file_obj: Optional[IO[str]] = ...,
msg: Message | None = ...,
data: bytes | None = ...,
filename: str | None = ...,
password: str | None = ...,
file_obj: IO[str] | None = ...,
) -> None: ...
def asbytes(self) -> bytes: ...
def __hash__(self) -> int: ...

View File

@@ -1,4 +1,4 @@
from typing import Any, AnyStr, Generic, Iterable, List, Optional, Tuple, Union
from typing import Any, AnyStr, Generic, Iterable, List, Tuple
from paramiko.util import ClosingContextManager
@@ -15,7 +15,7 @@ class BufferedFile(ClosingContextManager, Generic[AnyStr]):
FLAG_LINE_BUFFERED: int
FLAG_UNIVERSAL_NEWLINE: int
newlines: Union[None, AnyStr, Tuple[AnyStr, ...]]
newlines: None | AnyStr | Tuple[AnyStr, ...]
def __init__(self) -> None: ...
def __del__(self) -> None: ...
def __iter__(self) -> BufferedFile[Any]: ...
@@ -26,9 +26,9 @@ class BufferedFile(ClosingContextManager, Generic[AnyStr]):
def writable(self) -> bool: ...
def seekable(self) -> bool: ...
def readinto(self, buff: bytearray) -> int: ...
def read(self, size: Optional[int] = ...) -> bytes: ...
def readline(self, size: Optional[int] = ...) -> AnyStr: ...
def readlines(self, sizehint: Optional[int] = ...) -> List[AnyStr]: ...
def read(self, size: int | None = ...) -> bytes: ...
def readline(self, size: int | None = ...) -> AnyStr: ...
def readlines(self, sizehint: int | None = ...) -> List[AnyStr]: ...
def seek(self, offset: int, whence: int = ...) -> None: ...
def tell(self) -> int: ...
def write(self, data: AnyStr) -> None: ...

View File

@@ -1,4 +1,4 @@
from typing import Iterator, List, Mapping, MutableMapping, Optional
from typing import Iterator, List, Mapping, MutableMapping
from paramiko.pkey import PKey
@@ -13,11 +13,11 @@ class _SubDict(MutableMapping[str, PKey]):
def keys(self) -> List[str]: ... # type: ignore
class HostKeys(MutableMapping[str, _SubDict]):
def __init__(self, filename: Optional[str] = ...) -> None: ...
def __init__(self, filename: str | None = ...) -> None: ...
def add(self, hostname: str, keytype: str, key: PKey) -> None: ...
def load(self, filename: str) -> None: ...
def save(self, filename: str) -> None: ...
def lookup(self, hostname: str) -> Optional[_SubDict]: ...
def lookup(self, hostname: str) -> _SubDict | None: ...
def check(self, hostname: str, key: PKey) -> bool: ...
def clear(self) -> None: ...
def __iter__(self) -> Iterator[str]: ...
@@ -28,7 +28,7 @@ class HostKeys(MutableMapping[str, _SubDict]):
def keys(self) -> List[str]: ... # type: ignore
def values(self) -> List[_SubDict]: ... # type: ignore
@staticmethod
def hash_host(hostname: str, salt: Optional[str] = ...) -> str: ...
def hash_host(hostname: str, salt: str | None = ...) -> str: ...
class InvalidHostKey(Exception):
line: str
@@ -39,7 +39,7 @@ class HostKeyEntry:
valid: bool
hostnames: str
key: PKey
def __init__(self, hostnames: Optional[List[str]] = ..., key: Optional[PKey] = ...) -> None: ...
def __init__(self, hostnames: List[str] | None = ..., key: PKey | None = ...) -> None: ...
@classmethod
def from_line(cls, line: str, lineno: Optional[int] = ...) -> Optional[HostKeyEntry]: ...
def to_line(self) -> Optional[str]: ...
def from_line(cls, line: str, lineno: int | None = ...) -> HostKeyEntry | None: ...
def to_line(self) -> str | None: ...

View File

@@ -1,6 +1,6 @@
import sys
from _typeshed import ReadableBuffer as ReadableBuffer
from typing import Callable, Optional
from typing import Callable
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from paramiko.message import Message
@@ -17,7 +17,7 @@ c_MSG_KEXECDH_REPLY: bytes
class KexCurve25519:
hash_algo: Callable[[ReadableBuffer], _Hash]
transport: Transport
key: Optional[X25519PrivateKey]
key: X25519PrivateKey | None
def __init__(self, transport: Transport) -> None: ...
@classmethod
def is_available(cls) -> bool: ...

View File

@@ -1,6 +1,6 @@
import sys
from _typeshed import ReadableBuffer
from typing import Callable, Optional, Union
from typing import Callable
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve, EllipticCurvePrivateKey, EllipticCurvePublicKey
from paramiko.message import Message
@@ -19,9 +19,9 @@ class KexNistp256:
hash_algo: Callable[[ReadableBuffer], _Hash]
curve: EllipticCurve
transport: Transport
P: Union[int, EllipticCurvePrivateKey]
Q_C: Optional[EllipticCurvePublicKey]
Q_S: Optional[EllipticCurvePublicKey]
P: int | EllipticCurvePrivateKey
Q_C: EllipticCurvePublicKey | None
Q_S: EllipticCurvePublicKey | None
def __init__(self, transport: Transport) -> None: ...
def start_kex(self) -> None: ...
def parse_next(self, ptype: int, m: Message) -> None: ...

View File

@@ -1,6 +1,6 @@
import sys
from _typeshed import ReadableBuffer
from typing import Callable, Optional
from typing import Callable
from paramiko.message import Message
from paramiko.transport import Transport
@@ -23,12 +23,12 @@ class KexGex:
preferred_bits: int
hash_algo: Callable[[ReadableBuffer], _Hash] = ...
transport: Transport
p: Optional[int]
q: Optional[int]
g: Optional[int]
x: Optional[int]
e: Optional[int]
f: Optional[int]
p: int | None
q: int | None
g: int | None
x: int | None
e: int | None
f: int | None
old_style: bool
def __init__(self, transport: Transport) -> None: ...
def start_kex(self, _test_old_style: bool = ...) -> None: ...

View File

@@ -1,5 +1,3 @@
from typing import Optional
from paramiko.message import Message
from paramiko.ssh_gss import _SSH_GSSAuth
from paramiko.transport import Transport
@@ -28,7 +26,7 @@ class KexGSSGroup1:
NAME: str
transport: Transport
kexgss: _SSH_GSSAuth
gss_host: Optional[str]
gss_host: str | None
x: int
e: int
f: int
@@ -48,13 +46,13 @@ class KexGSSGex:
preferred_bits: int
transport: Transport
kexgss: _SSH_GSSAuth
gss_host: Optional[str]
p: Optional[int]
q: Optional[int]
g: Optional[int]
x: Optional[int]
e: Optional[int]
f: Optional[int]
gss_host: str | None
p: int | None
q: int | None
g: int | None
x: int | None
e: int | None
f: int | None
old_style: bool
def __init__(self, transport: Transport) -> None: ...
def start_kex(self) -> None: ...

View File

@@ -1,5 +1,5 @@
import sys
from typing import Any, Iterable, List, Optional, Text
from typing import Any, Iterable, List, Text
from .common import _LikeBytes
@@ -14,7 +14,7 @@ class Message:
big_int: int
packet: BytesIO
seqno: int # only when packet.Packetizer.read_message() is used
def __init__(self, content: Optional[bytes] = ...) -> None: ...
def __init__(self, content: bytes | None = ...) -> None: ...
def asbytes(self) -> bytes: ...
def rewind(self) -> None: ...
def get_remainder(self) -> bytes: ...

View File

@@ -1,4 +1,4 @@
from typing import IO, Optional, Pattern, Type, TypeVar, Union
from typing import IO, Pattern, Type, TypeVar
from paramiko.message import Message
@@ -9,10 +9,10 @@ def _unpad_openssh(data: bytes) -> bytes: ...
_PK = TypeVar("_PK", bound=PKey)
class PKey:
public_blob: Optional[PublicBlob]
public_blob: PublicBlob | None
BEGIN_TAG: Pattern[str]
END_TAG: Pattern[str]
def __init__(self, msg: Optional[Message] = ..., data: Optional[str] = ...) -> None: ...
def __init__(self, msg: Message | None = ..., data: str | None = ...) -> None: ...
def asbytes(self) -> bytes: ...
def __cmp__(self, other: object) -> int: ...
def __eq__(self, other: object) -> bool: ...
@@ -24,18 +24,18 @@ class PKey:
def sign_ssh_data(self, data: bytes) -> Message: ...
def verify_ssh_sig(self, data: bytes, msg: Message) -> bool: ...
@classmethod
def from_private_key_file(cls: Type[_PK], filename: str, password: Optional[str] = ...) -> _PK: ...
def from_private_key_file(cls: Type[_PK], filename: str, password: str | None = ...) -> _PK: ...
@classmethod
def from_private_key(cls: Type[_PK], file_obj: IO[str], password: Optional[str] = ...) -> _PK: ...
def write_private_key_file(self, filename: str, password: Optional[str] = ...) -> None: ...
def write_private_key(self, file_obj: IO[str], password: Optional[str] = ...) -> None: ...
def load_certificate(self, value: Union[Message, str]) -> None: ...
def from_private_key(cls: Type[_PK], file_obj: IO[str], password: str | None = ...) -> _PK: ...
def write_private_key_file(self, filename: str, password: str | None = ...) -> None: ...
def write_private_key(self, file_obj: IO[str], password: str | None = ...) -> None: ...
def load_certificate(self, value: Message | str) -> None: ...
class PublicBlob:
key_type: str
key_blob: str
comment: str
def __init__(self, type_: str, blob: bytes, comment: Optional[str] = ...) -> None: ...
def __init__(self, type_: str, blob: bytes, comment: str | None = ...) -> None: ...
@classmethod
def from_file(cls, filename: str) -> PublicBlob: ...
@classmethod

View File

@@ -1,12 +1,12 @@
from subprocess import Popen
from typing import Any, List, Optional
from typing import Any, List
from paramiko.util import ClosingContextManager
class ProxyCommand(ClosingContextManager):
cmd: List[str]
process: Popen[Any]
timeout: Optional[float]
timeout: float | None
def __init__(self, command_line: str) -> None: ...
def send(self, content: bytes) -> int: ...
def recv(self, size: int) -> bytes: ...

View File

@@ -1,15 +1,15 @@
import sys
from typing import Any, Iterable, Sequence, Text, Type, TypeVar, Union
from typing import Any, Iterable, Sequence, Text, Type, TypeVar
_T = TypeVar("_T")
PY2: bool
string_types: Union[Type[Any], Sequence[Type[Any]]]
text_type: Union[Type[Any], Sequence[Type[Any]]]
bytes_types: Union[Type[Any], Sequence[Type[Any]]]
string_types: Type[Any] | Sequence[Type[Any]]
text_type: Type[Any] | Sequence[Type[Any]]
bytes_types: Type[Any] | Sequence[Type[Any]]
bytes = bytes
integer_types: Union[Type[Any], Sequence[Type[Any]]]
integer_types: Type[Any] | Sequence[Type[Any]]
long = int
def input(prompt: Any) -> str: ...
@@ -29,12 +29,12 @@ else:
StringIO = cStringIO.StringIO
BytesIO = StringIO
def byte_ord(c: Union[int, str]) -> int: ...
def byte_ord(c: int | str) -> int: ...
def byte_chr(c: int) -> bytes: ...
def byte_mask(c: int, mask: int) -> bytes: ...
def b(s: Union[bytes, str], encoding: str = ...) -> bytes: ...
def u(s: Union[bytes, str], encoding: str = ...) -> Text: ...
def b2s(s: Union[bytes, str]) -> str: ...
def b(s: bytes | str, encoding: str = ...) -> bytes: ...
def u(s: bytes | str, encoding: str = ...) -> Text: ...
def b2s(s: bytes | str) -> str: ...
def is_callable(c: Any) -> bool: ...
def next(c: Iterable[_T]) -> _T: ...

View File

@@ -1,20 +1,20 @@
from typing import IO, Any, Callable, Optional, Union
from typing import IO, Any, Callable
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey, RSAPublicNumbers
from paramiko.message import Message
from paramiko.pkey import PKey
class RSAKey(PKey):
key: Union[None, RSAPublicKey, RSAPrivateKey]
key: None | RSAPublicKey | RSAPrivateKey
public_blob: None
def __init__(
self,
msg: Optional[Message] = ...,
data: Optional[bytes] = ...,
filename: Optional[str] = ...,
password: Optional[str] = ...,
key: Union[None, RSAPublicKey, RSAPrivateKey] = ...,
file_obj: Optional[IO[str]] = ...,
msg: Message | None = ...,
data: bytes | None = ...,
filename: str | None = ...,
password: str | None = ...,
key: None | RSAPublicKey | RSAPrivateKey = ...,
file_obj: IO[str] | None = ...,
) -> None: ...
@property
def size(self) -> int: ...
@@ -27,7 +27,7 @@ class RSAKey(PKey):
def can_sign(self) -> bool: ...
def sign_ssh_data(self, data: bytes) -> Message: ...
def verify_ssh_sig(self, data: bytes, msg: Message) -> bool: ...
def write_private_key_file(self, filename: str, password: Optional[str] = ...) -> None: ...
def write_private_key(self, file_obj: IO[str], password: Optional[str] = ...) -> None: ...
def write_private_key_file(self, filename: str, password: str | None = ...) -> None: ...
def write_private_key(self, file_obj: IO[str], password: str | None = ...) -> None: ...
@staticmethod
def generate(bits: int, progress_func: Optional[Callable[..., Any]] = ...) -> RSAKey: ...
def generate(bits: int, progress_func: Callable[..., Any] | None = ...) -> RSAKey: ...

View File

@@ -1,5 +1,5 @@
import threading
from typing import Any, List, Optional, Tuple, Union
from typing import Any, List, Tuple
from paramiko.channel import Channel
from paramiko.message import Message
@@ -12,14 +12,14 @@ class ServerInterface:
def check_auth_none(self, username: str) -> int: ...
def check_auth_password(self, username: str, password: str) -> int: ...
def check_auth_publickey(self, username: str, key: PKey) -> int: ...
def check_auth_interactive(self, username: str, submethods: str) -> Union[int, InteractiveQuery]: ...
def check_auth_interactive_response(self, responses: List[str]) -> Union[int, InteractiveQuery]: ...
def check_auth_gssapi_with_mic(self, username: str, gss_authenticated: int = ..., cc_file: Optional[str] = ...) -> int: ...
def check_auth_gssapi_keyex(self, username: str, gss_authenticated: int = ..., cc_file: Optional[str] = ...) -> int: ...
def check_auth_interactive(self, username: str, submethods: str) -> int | InteractiveQuery: ...
def check_auth_interactive_response(self, responses: List[str]) -> int | InteractiveQuery: ...
def check_auth_gssapi_with_mic(self, username: str, gss_authenticated: int = ..., cc_file: str | None = ...) -> int: ...
def check_auth_gssapi_keyex(self, username: str, gss_authenticated: int = ..., cc_file: str | None = ...) -> int: ...
def enable_auth_gssapi(self) -> bool: ...
def check_port_forward_request(self, address: str, port: int) -> int: ...
def cancel_port_forward_request(self, address: str, port: int) -> None: ...
def check_global_request(self, kind: str, msg: Message) -> Union[bool, Tuple[Any, ...]]: ...
def check_global_request(self, kind: str, msg: Message) -> bool | Tuple[Any, ...]: ...
def check_channel_pty_request(
self, channel: Channel, term: str, width: int, height: int, pixelwidth: int, pixelheight: int, modes: str
) -> bool: ...
@@ -35,13 +35,13 @@ class ServerInterface:
def check_channel_forward_agent_request(self, channel: Channel) -> bool: ...
def check_channel_direct_tcpip_request(self, chanid: int, origin: Tuple[str, int], destination: Tuple[str, int]) -> int: ...
def check_channel_env_request(self, channel: Channel, name: str, value: str) -> bool: ...
def get_banner(self) -> Tuple[Optional[str], Optional[str]]: ...
def get_banner(self) -> Tuple[str | None, str | None]: ...
class InteractiveQuery:
name: str
instructions: str
prompts: List[Tuple[str, bool]]
def __init__(self, name: str = ..., instructions: str = ..., *prompts: Union[str, Tuple[str, bool]]) -> None: ...
def __init__(self, name: str = ..., instructions: str = ..., *prompts: str | Tuple[str, bool]) -> None: ...
def add_prompt(self, prompt: str, echo: bool = ...) -> None: ...
class SubsystemHandler(threading.Thread):

View File

@@ -1,5 +1,5 @@
from logging import Logger
from typing import Dict, List, Optional
from typing import Dict, List
from paramiko.channel import Channel
@@ -56,6 +56,6 @@ class SFTPError(Exception): ...
class BaseSFTP:
logger: Logger
sock: Optional[Channel]
sock: Channel | None
ultra_debug: bool
def __init__(self) -> None: ...

View File

@@ -1,5 +1,5 @@
from os import stat_result
from typing import Dict, Optional
from typing import Dict
class SFTPAttributes:
FLAG_SIZE: int
@@ -7,16 +7,16 @@ class SFTPAttributes:
FLAG_PERMISSIONS: int
FLAG_AMTIME: int
FLAG_EXTENDED: int
st_size: Optional[int]
st_uid: Optional[int]
st_gid: Optional[int]
st_mode: Optional[int]
st_atime: Optional[int]
st_mtime: Optional[int]
st_size: int | None
st_uid: int | None
st_gid: int | None
st_mode: int | None
st_atime: int | None
st_mtime: int | None
filename: str # only when from_stat() is used
longname: str # only when from_stat() is used
attr: Dict[str, str]
def __init__(self) -> None: ...
@classmethod
def from_stat(cls, obj: stat_result, filename: Optional[str] = ...) -> SFTPAttributes: ...
def from_stat(cls, obj: stat_result, filename: str | None = ...) -> SFTPAttributes: ...
def asbytes(self) -> bytes: ...

View File

@@ -1,5 +1,5 @@
from logging import Logger
from typing import IO, Any, Callable, Iterator, List, Optional, Text, Tuple, Union
from typing import IO, Any, Callable, Iterator, List, Text, Tuple
from paramiko.channel import Channel
from paramiko.sftp import BaseSFTP
@@ -20,48 +20,39 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
def __init__(self, sock: Channel) -> None: ...
@classmethod
def from_transport(
cls, t: Transport, window_size: Optional[int] = ..., max_packet_size: Optional[int] = ...
) -> Optional[SFTPClient]: ...
cls, t: Transport, window_size: int | None = ..., max_packet_size: int | None = ...
) -> SFTPClient | None: ...
def close(self) -> None: ...
def get_channel(self) -> Optional[Channel]: ...
def get_channel(self) -> Channel | None: ...
def listdir(self, path: str = ...) -> List[str]: ...
def listdir_attr(self, path: str = ...) -> List[SFTPAttributes]: ...
def listdir_iter(self, path: Union[bytes, Text] = ..., read_aheads: int = ...) -> Iterator[SFTPAttributes]: ...
def open(self, filename: Union[bytes, Text], mode: str = ..., bufsize: int = ...) -> SFTPFile: ...
def listdir_iter(self, path: bytes | Text = ..., read_aheads: int = ...) -> Iterator[SFTPAttributes]: ...
def open(self, filename: bytes | Text, mode: str = ..., bufsize: int = ...) -> SFTPFile: ...
file = open
def remove(self, path: Union[bytes, Text]) -> None: ...
def remove(self, path: bytes | Text) -> None: ...
unlink = remove
def rename(self, oldpath: Union[bytes, Text], newpath: Union[bytes, Text]) -> None: ...
def posix_rename(self, oldpath: Union[bytes, Text], newpath: Union[bytes, Text]) -> None: ...
def mkdir(self, path: Union[bytes, Text], mode: int = ...) -> None: ...
def rmdir(self, path: Union[bytes, Text]) -> None: ...
def stat(self, path: Union[bytes, Text]) -> SFTPAttributes: ...
def lstat(self, path: Union[bytes, Text]) -> SFTPAttributes: ...
def symlink(self, source: Union[bytes, Text], dest: Union[bytes, Text]) -> None: ...
def chmod(self, path: Union[bytes, Text], mode: int) -> None: ...
def chown(self, path: Union[bytes, Text], uid: int, gid: int) -> None: ...
def utime(self, path: Union[bytes, Text], times: Optional[Tuple[float, float]]) -> None: ...
def truncate(self, path: Union[bytes, Text], size: int) -> None: ...
def readlink(self, path: Union[bytes, Text]) -> Optional[Text]: ...
def normalize(self, path: Union[bytes, Text]) -> Text: ...
def chdir(self, path: Union[None, bytes, Text] = ...) -> None: ...
def getcwd(self) -> Optional[Text]: ...
def rename(self, oldpath: bytes | Text, newpath: bytes | Text) -> None: ...
def posix_rename(self, oldpath: bytes | Text, newpath: bytes | Text) -> None: ...
def mkdir(self, path: bytes | Text, mode: int = ...) -> None: ...
def rmdir(self, path: bytes | Text) -> None: ...
def stat(self, path: bytes | Text) -> SFTPAttributes: ...
def lstat(self, path: bytes | Text) -> SFTPAttributes: ...
def symlink(self, source: bytes | Text, dest: bytes | Text) -> None: ...
def chmod(self, path: bytes | Text, mode: int) -> None: ...
def chown(self, path: bytes | Text, uid: int, gid: int) -> None: ...
def utime(self, path: bytes | Text, times: Tuple[float, float] | None) -> None: ...
def truncate(self, path: bytes | Text, size: int) -> None: ...
def readlink(self, path: bytes | Text) -> Text | None: ...
def normalize(self, path: bytes | Text) -> Text: ...
def chdir(self, path: None | bytes | Text = ...) -> None: ...
def getcwd(self) -> Text | None: ...
def putfo(
self,
fl: IO[bytes],
remotepath: Union[bytes, Text],
file_size: int = ...,
callback: Optional[_Callback] = ...,
confirm: bool = ...,
self, fl: IO[bytes], remotepath: bytes | Text, file_size: int = ..., callback: _Callback | None = ..., confirm: bool = ...
) -> SFTPAttributes: ...
def put(
self,
localpath: Union[bytes, Text],
remotepath: Union[bytes, Text],
callback: Optional[_Callback] = ...,
confirm: bool = ...,
self, localpath: bytes | Text, remotepath: bytes | Text, callback: _Callback | None = ..., confirm: bool = ...
) -> SFTPAttributes: ...
def getfo(self, remotepath: Union[bytes, Text], fl: IO[bytes], callback: Optional[_Callback] = ...) -> int: ...
def get(self, remotepath: Union[bytes, Text], localpath: Union[bytes, Text], callback: Optional[_Callback] = ...) -> None: ...
def getfo(self, remotepath: bytes | Text, fl: IO[bytes], callback: _Callback | None = ...) -> int: ...
def get(self, remotepath: bytes | Text, localpath: bytes | Text, callback: _Callback | None = ...) -> None: ...
class SFTP(SFTPClient): ...

View File

@@ -1,4 +1,4 @@
from typing import Any, Iterator, Optional, Sequence, Tuple
from typing import Any, Iterator, Sequence, Tuple
from paramiko.file import BufferedFile
from paramiko.sftp_attr import SFTPAttributes
@@ -21,9 +21,9 @@ class SFTPFile(BufferedFile[Any]):
def stat(self) -> SFTPAttributes: ...
def chmod(self, mode: int) -> None: ...
def chown(self, uid: int, gid: int) -> None: ...
def utime(self, times: Optional[Tuple[float, float]]) -> None: ...
def utime(self, times: Tuple[float, float] | None) -> None: ...
def truncate(self, size: int) -> None: ...
def check(self, hash_algorithm: str, offset: int = ..., length: int = ..., block_size: int = ...) -> bytes: ...
def set_pipelined(self, pipelined: bool = ...) -> None: ...
def prefetch(self, file_size: Optional[int] = ...) -> None: ...
def prefetch(self, file_size: int | None = ...) -> None: ...
def readv(self, chunks: Sequence[Tuple[int, int]]) -> Iterator[bytes]: ...

View File

@@ -1,12 +1,10 @@
from typing import Union
from paramiko.sftp_attr import SFTPAttributes
from paramiko.util import ClosingContextManager
class SFTPHandle(ClosingContextManager):
def __init__(self, flags: int = ...) -> None: ...
def close(self) -> None: ...
def read(self, offset: int, length: int) -> Union[bytes, int]: ...
def read(self, offset: int, length: int) -> bytes | int: ...
def write(self, offset: int, data: bytes) -> int: ...
def stat(self) -> Union[int, SFTPAttributes]: ...
def stat(self) -> int | SFTPAttributes: ...
def chattr(self, attr: SFTPAttributes) -> int: ...

View File

@@ -1,5 +1,5 @@
from logging import Logger
from typing import Any, Dict, Optional, Type
from typing import Any, Dict, Type
from paramiko.channel import Channel
from paramiko.server import ServerInterface, SubsystemHandler
@@ -16,7 +16,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler):
file_table: Dict[bytes, SFTPHandle]
folder_table: Dict[bytes, SFTPHandle]
server: SFTPServerInterface
sock: Optional[Channel]
sock: Channel | None
def __init__(
self, channel: Channel, name: str, server: ServerInterface, sftp_si: Type[SFTPServerInterface], *largs: Any, **kwargs: Any
) -> None: ...

View File

@@ -1,4 +1,4 @@
from typing import Any, List, Union
from typing import Any, List
from paramiko.server import ServerInterface
from paramiko.sftp_attr import SFTPAttributes
@@ -8,10 +8,10 @@ class SFTPServerInterface:
def __init__(self, server: ServerInterface, *largs: Any, **kwargs: Any) -> None: ...
def session_started(self) -> None: ...
def session_ended(self) -> None: ...
def open(self, path: str, flags: int, attr: SFTPAttributes) -> Union[SFTPHandle, int]: ...
def list_folder(self, path: str) -> Union[List[SFTPAttributes], int]: ...
def stat(self, path: str) -> Union[SFTPAttributes, int]: ...
def lstat(self, path: str) -> Union[SFTPAttributes, int]: ...
def open(self, path: str, flags: int, attr: SFTPAttributes) -> SFTPHandle | int: ...
def list_folder(self, path: str) -> List[SFTPAttributes] | int: ...
def stat(self, path: str) -> SFTPAttributes | int: ...
def lstat(self, path: str) -> SFTPAttributes | int: ...
def remove(self, path: str) -> int: ...
def rename(self, oldpath: str, newpath: str) -> int: ...
def posix_rename(self, oldpath: str, newpath: str) -> int: ...
@@ -19,5 +19,5 @@ class SFTPServerInterface:
def rmdir(self, path: str) -> int: ...
def chattr(self, path: str, attr: SFTPAttributes) -> int: ...
def canonicalize(self, path: str) -> str: ...
def readlink(self, path: str) -> Union[str, int]: ...
def readlink(self, path: str) -> str | int: ...
def symlink(self, target_path: str, path: str) -> int: ...

View File

@@ -1,5 +1,5 @@
import socket
from typing import List, Mapping, Tuple, Union
from typing import List, Mapping, Tuple
from paramiko.pkey import PKey
@@ -33,9 +33,9 @@ class ProxyCommandFailure(SSHException):
def __init__(self, command: str, error: str) -> None: ...
class NoValidConnectionsError(socket.error):
errors: Mapping[Union[Tuple[str, int], Tuple[str, int, int, int]], Exception]
def __init__(self, errors: Mapping[Union[Tuple[str, int], Tuple[str, int, int, int]], Exception]) -> None: ...
def __reduce__(self) -> Tuple[type, Tuple[Mapping[Union[Tuple[str, int], Tuple[str, int, int, int]], Exception]]]: ...
errors: Mapping[Tuple[str, int] | Tuple[str, int, int, int], Exception]
def __init__(self, errors: Mapping[Tuple[str, int] | Tuple[str, int, int, int], Exception]) -> None: ...
def __reduce__(self) -> Tuple[type, Tuple[Mapping[Tuple[str, int] | Tuple[str, int, int, int], Exception]]]: ...
class CouldNotCanonicalize(SSHException): ...
class ConfigParseError(SSHException): ...

View File

@@ -1,4 +1,4 @@
from typing import Any, Optional, Tuple, Type
from typing import Any, Tuple, Type
GSS_AUTH_AVAILABLE: bool
GSS_EXCEPTIONS: Tuple[Type[Exception], ...]
@@ -16,11 +16,11 @@ class _SSH_GSSAuth:
class _SSH_GSSAPI_OLD(_SSH_GSSAuth):
def __init__(self, auth_method: str, gss_deleg_creds: bool) -> None: ...
def ssh_init_sec_context(
self, target: str, desired_mech: Optional[str] = ..., username: Optional[str] = ..., recv_token: Optional[str] = ...
) -> Optional[str]: ...
self, target: str, desired_mech: str | None = ..., username: str | None = ..., recv_token: str | None = ...
) -> str | None: ...
def ssh_get_mic(self, session_id: bytes, gss_kex: bool = ...) -> Any: ...
def ssh_accept_sec_context(self, hostname: str, recv_token: str, username: Optional[str] = ...) -> Optional[str]: ...
def ssh_check_mic(self, mic_token: str, session_id: bytes, username: Optional[str] = ...) -> None: ...
def ssh_accept_sec_context(self, hostname: str, recv_token: str, username: str | None = ...) -> str | None: ...
def ssh_check_mic(self, mic_token: str, session_id: bytes, username: str | None = ...) -> None: ...
@property
def credentials_delegated(self) -> bool: ...
def save_client_creds(self, client_token: str) -> None: ...
@@ -30,11 +30,11 @@ _SSH_GSSAPI = _SSH_GSSAPI_OLD
class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
def __init__(self, auth_method: str, gss_deleg_creds: bool) -> None: ...
def ssh_init_sec_context(
self, target: str, desired_mech: Optional[str] = ..., username: Optional[str] = ..., recv_token: Optional[str] = ...
self, target: str, desired_mech: str | None = ..., username: str | None = ..., recv_token: str | None = ...
) -> str: ...
def ssh_get_mic(self, session_id: bytes, gss_kex: bool = ...) -> Any: ...
def ssh_accept_sec_context(self, hostname: str, recv_token: str, username: Optional[str] = ...) -> Optional[str]: ...
def ssh_check_mic(self, mic_token: str, session_id: bytes, username: Optional[str] = ...) -> None: ...
def ssh_accept_sec_context(self, hostname: str, recv_token: str, username: str | None = ...) -> str | None: ...
def ssh_check_mic(self, mic_token: str, session_id: bytes, username: str | None = ...) -> None: ...
@property
def credentials_delegated(self) -> bool: ...
def save_client_creds(self, client_token: str) -> None: ...
@@ -42,11 +42,11 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
class _SSH_SSPI(_SSH_GSSAuth):
def __init__(self, auth_method: str, gss_deleg_creds: bool) -> None: ...
def ssh_init_sec_context(
self, target: str, desired_mech: Optional[str] = ..., username: Optional[str] = ..., recv_token: Optional[str] = ...
self, target: str, desired_mech: str | None = ..., username: str | None = ..., recv_token: str | None = ...
) -> str: ...
def ssh_get_mic(self, session_id: bytes, gss_kex: bool = ...) -> Any: ...
def ssh_accept_sec_context(self, hostname: str, username: str, recv_token: str) -> Optional[str]: ...
def ssh_check_mic(self, mic_token: str, session_id: bytes, username: Optional[str] = ...) -> None: ...
def ssh_accept_sec_context(self, hostname: str, username: str, recv_token: str) -> str | None: ...
def ssh_check_mic(self, mic_token: str, session_id: bytes, username: str | None = ...) -> None: ...
@property
def credentials_delegated(self) -> bool: ...
def save_client_creds(self, client_token: str) -> None: ...

View File

@@ -2,7 +2,7 @@ from logging import Logger
from socket import socket
from threading import Condition, Event, Lock, Thread
from types import ModuleType
from typing import Any, Callable, Dict, Iterable, List, Optional, Protocol, Sequence, Tuple, Type, Union
from typing import Any, Callable, Dict, Iterable, List, Protocol, Sequence, Tuple, Type
from paramiko.auth_handler import AuthHandler, _InteractiveCallback
from paramiko.channel import Channel
@@ -22,25 +22,25 @@ class _KexEngine(Protocol):
class Transport(Thread, ClosingContextManager):
active: bool
hostname: Optional[str]
hostname: str | None
sock: socket
packetizer: Packetizer
local_version: str
remote_version: str
local_cipher: str
local_kex_init: Optional[bytes]
local_mac: Optional[str]
local_compression: Optional[str]
session_id: Optional[bytes]
host_key_type: Optional[str]
host_key: Optional[PKey]
local_kex_init: bytes | None
local_mac: str | None
local_compression: str | None
session_id: bytes | None
host_key_type: str | None
host_key: PKey | None
use_gss_kex: bool
gss_kex_used: bool
kexgss_ctxt: Optional[_SSH_GSSAuth]
kexgss_ctxt: _SSH_GSSAuth | None
gss_host: str
kex_engine: Optional[_KexEngine]
H: Optional[bytes]
K: Optional[int]
kex_engine: _KexEngine | None
H: bytes | None
K: int | None
initial_kex_done: bool
in_kex: bool
authenticated: bool
@@ -49,21 +49,21 @@ class Transport(Thread, ClosingContextManager):
channels_seen: Dict[int, bool]
default_max_packet_size: int
default_window_size: int
saved_exception: Optional[Exception]
saved_exception: Exception | None
clear_to_send: Event
clear_to_send_lock: Lock
clear_to_send_timeout: float
log_name: str
logger: Logger
auth_handler: Optional[AuthHandler]
global_response: Optional[Message]
completion_event: Optional[Event]
auth_handler: AuthHandler | None
global_response: Message | None
completion_event: Event | None
banner_timeout: float
handshake_timeout: float
auth_timeout: float
disabled_algorithms: Optional[Dict[str, Iterable[str]]]
disabled_algorithms: Dict[str, Iterable[str]] | None
server_mode: bool
server_object: Optional[ServerInterface]
server_object: ServerInterface | None
server_key_dict: Dict[str, PKey]
server_accepts: List[Channel]
server_accept_cv: Condition
@@ -71,12 +71,12 @@ class Transport(Thread, ClosingContextManager):
sys: ModuleType
def __init__(
self,
sock: Union[str, Tuple[str, int], socket],
sock: str | Tuple[str, int] | socket,
default_window_size: int = ...,
default_max_packet_size: int = ...,
gss_kex: bool = ...,
gss_deleg_creds: bool = ...,
disabled_algorithms: Optional[Dict[str, Iterable[str]]] = ...,
disabled_algorithms: Dict[str, Iterable[str]] | None = ...,
) -> None: ...
@property
def preferred_ciphers(self) -> Sequence[str]: ...
@@ -90,18 +90,18 @@ class Transport(Thread, ClosingContextManager):
def preferred_compression(self) -> Sequence[str]: ...
def atfork(self) -> None: ...
def get_security_options(self) -> SecurityOptions: ...
def set_gss_host(self, gss_host: Optional[str], trust_dns: bool = ..., gssapi_requested: bool = ...) -> None: ...
def start_client(self, event: Optional[Event] = ..., timeout: Optional[float] = ...) -> None: ...
def start_server(self, event: Event = ..., server: Optional[ServerInterface] = ...) -> None: ...
def set_gss_host(self, gss_host: str | None, trust_dns: bool = ..., gssapi_requested: bool = ...) -> None: ...
def start_client(self, event: Event | None = ..., timeout: float | None = ...) -> None: ...
def start_server(self, event: Event = ..., server: ServerInterface | None = ...) -> None: ...
def add_server_key(self, key: PKey) -> None: ...
def get_server_key(self) -> Optional[PKey]: ...
def get_server_key(self) -> PKey | None: ...
@staticmethod
def load_server_moduli(filename: Optional[str] = ...) -> bool: ...
def load_server_moduli(filename: str | None = ...) -> bool: ...
def close(self) -> None: ...
def get_remote_server_key(self) -> PKey: ...
def is_active(self) -> bool: ...
def open_session(
self, window_size: Optional[int] = ..., max_packet_size: Optional[int] = ..., timeout: Optional[float] = ...
self, window_size: int | None = ..., max_packet_size: int | None = ..., timeout: float | None = ...
) -> Channel: ...
def open_x11_channel(self, src_addr: _Addr = ...) -> Channel: ...
def open_forward_agent_channel(self) -> Channel: ...
@@ -109,45 +109,45 @@ class Transport(Thread, ClosingContextManager):
def open_channel(
self,
kind: str,
dest_addr: Optional[_Addr] = ...,
src_addr: Optional[_Addr] = ...,
window_size: Optional[int] = ...,
max_packet_size: Optional[int] = ...,
timeout: Optional[float] = ...,
dest_addr: _Addr | None = ...,
src_addr: _Addr | None = ...,
window_size: int | None = ...,
max_packet_size: int | None = ...,
timeout: float | None = ...,
) -> Channel: ...
def request_port_forward(
self, address: str, port: int, handler: Optional[Callable[[Channel, _Addr, _Addr], None]] = ...
self, address: str, port: int, handler: Callable[[Channel, _Addr, _Addr], None] | None = ...
) -> int: ...
def cancel_port_forward(self, address: str, port: int) -> None: ...
def open_sftp_client(self) -> Optional[SFTPClient]: ...
def open_sftp_client(self) -> SFTPClient | None: ...
def send_ignore(self, byte_count: int = ...) -> None: ...
def renegotiate_keys(self) -> None: ...
def set_keepalive(self, interval: int) -> None: ...
def global_request(self, kind: str, data: Optional[Iterable[Any]] = ..., wait: bool = ...) -> Optional[Message]: ...
def accept(self, timeout: Optional[float] = ...) -> Optional[Channel]: ...
def global_request(self, kind: str, data: Iterable[Any] | None = ..., wait: bool = ...) -> Message | None: ...
def accept(self, timeout: float | None = ...) -> Channel | None: ...
def connect(
self,
hostkey: Optional[PKey] = ...,
hostkey: PKey | None = ...,
username: str = ...,
password: Optional[str] = ...,
pkey: Optional[PKey] = ...,
gss_host: Optional[str] = ...,
password: str | None = ...,
pkey: PKey | None = ...,
gss_host: str | None = ...,
gss_auth: bool = ...,
gss_kex: bool = ...,
gss_deleg_creds: bool = ...,
gss_trust_dns: bool = ...,
) -> None: ...
def get_exception(self) -> Optional[Exception]: ...
def get_exception(self) -> Exception | None: ...
def set_subsystem_handler(self, name: str, handler: Type[SubsystemHandler], *larg: Any, **kwarg: Any) -> None: ...
def is_authenticated(self) -> bool: ...
def get_username(self) -> Optional[str]: ...
def get_banner(self) -> Optional[bytes]: ...
def get_username(self) -> str | None: ...
def get_banner(self) -> bytes | None: ...
def auth_none(self, username: str) -> List[str]: ...
def auth_password(self, username: str, password: str, event: Optional[Event] = ..., fallback: bool = ...) -> List[str]: ...
def auth_publickey(self, username: str, key: PKey, event: Optional[Event] = ...) -> List[str]: ...
def auth_password(self, username: str, password: str, event: Event | None = ..., fallback: bool = ...) -> List[str]: ...
def auth_publickey(self, username: str, key: PKey, event: Event | None = ...) -> List[str]: ...
def auth_interactive(self, username: str, handler: _InteractiveCallback, submethods: str = ...) -> List[str]: ...
def auth_interactive_dumb(
self, username: str, handler: Optional[_InteractiveCallback] = ..., submethods: str = ...
self, username: str, handler: _InteractiveCallback | None = ..., submethods: str = ...
) -> List[str]: ...
def auth_gssapi_with_mic(self, username: str, gss_host: str, gss_deleg_creds: bool) -> List[str]: ...
def auth_gssapi_keyex(self, username: str) -> List[str]: ...

View File

@@ -1,7 +1,7 @@
import sys
from logging import Logger, LogRecord
from types import TracebackType
from typing import IO, AnyStr, Callable, List, Optional, Protocol, Type, TypeVar, Union
from typing import IO, AnyStr, Callable, List, Protocol, Type, TypeVar
from paramiko.config import SSHConfig, SSHConfigDict
from paramiko.hostkeys import HostKeys
@@ -28,7 +28,7 @@ def format_binary_line(data: bytes) -> str: ...
def safe_string(s: bytes) -> bytes: ...
def bit_length(n: int) -> int: ...
def tb_strings() -> List[str]: ...
def generate_key_bytes(hash_alg: Type[_Hash], salt: bytes, key: Union[bytes, str], nbytes: int) -> bytes: ...
def generate_key_bytes(hash_alg: Type[_Hash], salt: bytes, key: bytes | str, nbytes: int) -> bytes: ...
def load_host_keys(filename: str) -> HostKeys: ...
def parse_ssh_config(file_obj: IO[str]) -> SSHConfig: ...
def lookup_ssh_host_config(hostname: str, config: SSHConfig) -> SSHConfigDict: ...
@@ -46,7 +46,7 @@ def constant_time_bytes_eq(a: AnyStr, b: AnyStr) -> bool: ...
class ClosingContextManager:
def __enter__(self: _TC) -> _TC: ...
def __exit__(
self, type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]
self, type: Type[BaseException] | None, value: BaseException | None, traceback: TracebackType | None
) -> None: ...
def clamp_value(minimum: int, val: int, maximum: int) -> int: ...