From eb9edf1bd6bd2e7e2f1b7e3ed4a79c56db9da469 Mon Sep 17 00:00:00 2001 From: Nikita Sobolev Date: Sun, 24 Jul 2022 18:48:41 +0300 Subject: [PATCH] Add missing methods to `OpenSSL.SSL.Connection` (#8374) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Alex Waygood --- stubs/pyOpenSSL/OpenSSL/SSL.pyi | 61 +++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/stubs/pyOpenSSL/OpenSSL/SSL.pyi b/stubs/pyOpenSSL/OpenSSL/SSL.pyi index 8857e80a4..9881f9115 100644 --- a/stubs/pyOpenSSL/OpenSSL/SSL.pyi +++ b/stubs/pyOpenSSL/OpenSSL/SSL.pyi @@ -1,8 +1,10 @@ import socket -from collections.abc import Callable, Sequence +from _typeshed import Incomplete, ReadableBuffer +from collections.abc import Callable, MutableSequence, Sequence from typing import Any, TypeVar -from OpenSSL.crypto import X509, PKey +from _socket import _Address, _RetAddress +from OpenSSL.crypto import X509, PKey, X509Name OPENSSL_VERSION_NUMBER: int SSLEAY_VERSION: int @@ -115,25 +117,64 @@ def SSLeay_version(type: int) -> str: ... class Session: ... class Connection: - def __getattr__(self, name: str) -> Any: ... # incomplete + def __getattr__(self, name: str) -> Any: ... # takes attributes from `self._socket` def __init__(self, context: Context, socket: socket.socket | None = ...) -> None: ... - def connect(self, addr: str | bytes | Sequence[str | int]) -> None: ... - def do_handshake(self) -> None: ... - def get_peer_certificate(self) -> X509: ... + def get_context(self) -> Context: ... + def set_context(self, context: Context) -> None: ... + def get_servername(self) -> bytes | None: ... def set_tlsext_host_name(self, name: bytes) -> None: ... + def pending(self) -> int: ... + def send(self, buf: ReadableBuffer | str, flags: int = ...) -> int: ... + write = send + def sendall(self, buf: ReadableBuffer | str, flags: int = ...) -> int: ... + def recv(self, bufsiz: int, flags: int | None = ...) -> bytes: ... + read = recv + def recv_into(self, buffer: MutableSequence[int], nbytes: int | None = ..., flags: int | None = ...) -> int: ... + def connect(self, addr: str | bytes | Sequence[str | int]) -> None: ... + def connect_ex(self, addr: _Address | bytes) -> int: ... + def accept(self) -> tuple[Connection, _RetAddress]: ... + def shutdown(self) -> bool: ... + def do_handshake(self) -> None: ... + def get_certificate(self) -> X509 | None: ... + def get_peer_certificate(self) -> X509 | None: ... + def get_peer_cert_chain(self) -> list[X509] | None: ... + def get_verified_chain(self) -> list[X509] | None: ... def bio_read(self, bufsiz: int) -> bytes: ... def bio_write(self, buf: bytes) -> int: ... - def recv(self, bufsiz: int, flags: int | None = ...) -> bytes: ... - def sendall(self, buf: bytes, flags: int = ...) -> int: ... + def bio_shutdown(self) -> None: ... + def renegotiate(self) -> bool: ... + def renegotiate_pending(self) -> bool: ... + def total_renegotiations(self) -> int: ... def set_accept_state(self) -> None: ... def set_connect_state(self) -> None: ... - def get_peer_cert_chain(self) -> list[X509]: ... - def get_alpn_proto_negotiated(self) -> bytes: ... + def get_client_ca_list(self) -> list[X509Name]: ... + def get_cipher_list(self) -> list[str]: ... def get_cipher_name(self) -> str | None: ... + def get_cipher_bits(self) -> int | None: ... + def get_cipher_version(self) -> str | None: ... def get_protocol_version_name(self) -> str: ... + def get_protocol_version(self) -> int: ... def get_shutdown(self) -> int: ... + def set_shutdown(self, state: int) -> None: ... + def get_state_string(self) -> bytes: ... + def server_random(self) -> bytes | None: ... + def client_random(self) -> bytes | None: ... + def master_key(self) -> bytes | None: ... + def export_keying_material( + self, label: Incomplete, olen: Incomplete, context: Incomplete = ... + ) -> Incomplete: ... # TODO: type, see RFC-5705 def get_app_data(self) -> Any: ... def set_app_data(self, data: Any) -> None: ... + def sock_shutdown(self, __how: int) -> None: ... # alias to `_socket.socket.shutdown` + def want_read(self) -> bool: ... + def want_write(self) -> bool: ... + def get_session(self) -> Session | None: ... + def set_session(self, session: Session) -> None: ... + def get_finished(self) -> bytes | None: ... + def get_peer_finished(self) -> bytes | None: ... + def set_alpn_protos(self, protos: Sequence[bytes]) -> None: ... + def get_alpn_proto_negotiated(self) -> bytes: ... + def request_ocsp(self) -> None: ... _T = TypeVar("_T")