psycopg2: stub improvements (#7964)

Fixes an entry from #7928 along with a number of other improvements.

I went off the C code:
https://github.com/psycopg/psycopg2/blob/master/psycopg/connection_type.c
This commit is contained in:
Jelle Zijlstra
2022-06-02 18:02:07 -07:00
committed by GitHub
parent 51f97dda15
commit cb9023988d

View File

@@ -2,7 +2,7 @@ from _typeshed import Self
from collections.abc import Callable, Iterable, Mapping, Sequence
from types import TracebackType
from typing import Any, TypeVar, overload
from typing_extensions import TypeAlias
from typing_extensions import Literal, TypeAlias
import psycopg2
import psycopg2.extensions
@@ -82,13 +82,13 @@ class cursor:
row_factory: Any
rowcount: int
rownumber: int
scrollable: Any
scrollable: bool | None
statusmessage: Any
string_types: Any
typecaster: Any
tzinfo_factory: Any
withhold: Any
def __init__(self, *args, **kwargs) -> None: ...
withhold: bool
def __init__(self, conn: connection, name: str | bytes | None = ...) -> None: ...
def callproc(self, procname, parameters=...): ...
def cast(self, oid, s): ...
def close(self): ...
@@ -371,64 +371,99 @@ class connection:
ProgrammingError: Any
Warning: Any
@property
def async_(self) -> Any: ...
autocommit: Any
def async_(self) -> int: ...
autocommit: bool
@property
def binary_types(self) -> Any: ...
@property
def closed(self) -> Any: ...
def closed(self) -> int: ...
cursor_factory: Callable[..., _cursor]
deferrable: Any
@property
def dsn(self) -> Any: ...
def dsn(self) -> str: ...
@property
def encoding(self) -> Any: ...
def encoding(self) -> str: ...
@property
def info(self) -> ConnectionInfo: ...
isolation_level: Any
notices: Any
notifies: Any
@property
def pgconn_ptr(self) -> Any: ...
def isolation_level(self) -> int | None: ...
@isolation_level.setter
def isolation_level(self, __value: str | bytes | int | None) -> None: ...
notices: list[Any]
notifies: list[Any]
@property
def pgconn_ptr(self) -> int | None: ...
@property
def protocol_version(self) -> int: ...
readonly: Any
@property
def deferrable(self) -> bool | None: ...
@deferrable.setter
def deferrable(self, __value: Literal["default"] | bool | None) -> None: ...
@property
def readonly(self) -> bool | None: ...
@readonly.setter
def readonly(self, __value: Literal["default"] | bool | None) -> None: ...
@property
def server_version(self) -> int: ...
@property
def status(self) -> Any: ...
def status(self) -> int: ...
@property
def string_types(self) -> Any: ...
def __init__(self, *args, **kwargs) -> None: ...
def cancel(self, *args, **kwargs): ...
def close(self, *args, **kwargs): ...
def commit(self, *args, **kwargs): ...
# Really it's dsn: str, async: int = ..., async_: int = ..., but
# that would be a syntax error.
def __init__(self, dsn: str, *, async_: int = ...) -> None: ...
def cancel(self) -> None: ...
def close(self) -> None: ...
def commit(self) -> None: ...
@overload
def cursor(self, name=..., *, scrollable=..., withhold=...) -> _cursor: ...
def cursor(self, name: str | bytes | None = ..., *, withhold: bool = ..., scrollable: bool | None = ...) -> _cursor: ...
@overload
def cursor(self, name=..., cursor_factory: Callable[..., _T_cur] = ..., scrollable=..., withhold=...) -> _T_cur: ...
def fileno(self, *args, **kwargs): ...
def cursor(
self,
name: str | bytes | None = ...,
*,
cursor_factory: Callable[..., _T_cur],
withhold: bool = ...,
scrollable: bool | None = ...,
) -> _T_cur: ...
@overload
def cursor(
self, name: str | bytes | None, cursor_factory: Callable[..., _T_cur], withhold: bool = ..., scrollable: bool | None = ...
) -> _T_cur: ...
def fileno(self) -> int: ...
def get_backend_pid(self) -> int: ...
def get_dsn_parameters(self) -> dict[str, str]: ...
def get_native_connection(self, *args, **kwargs): ...
def get_native_connection(self): ...
def get_parameter_status(self, parameter: str) -> str | None: ...
def get_transaction_status(self) -> int: ...
def isexecuting(self, *args, **kwargs): ...
def lobject(self, oid=..., mode=..., new_oid=..., new_file=..., lobject_factory=...): ...
def poll(self, *args, **kwargs): ...
def reset(self): ...
def rollback(self): ...
def set_client_encoding(self, encoding): ...
def set_isolation_level(self, level): ...
def set_session(self, *args, **kwargs): ...
def tpc_begin(self, xid): ...
def tpc_commit(self, *args, **kwargs): ...
def tpc_prepare(self): ...
def tpc_recover(self): ...
def tpc_rollback(self, *args, **kwargs): ...
def xid(self, format_id, gtrid, bqual): ...
def __enter__(self): ...
def __exit__(self, type, value, traceback): ...
def isexecuting(self) -> bool: ...
def lobject(
self,
oid: int = ...,
mode: str | None = ...,
new_oid: int = ...,
new_file: str | None = ...,
lobject_factory: type[lobject] = ...,
) -> lobject: ...
def poll(self) -> int: ...
def reset(self) -> None: ...
def rollback(self) -> None: ...
def set_client_encoding(self, encoding: str) -> None: ...
def set_isolation_level(self, level: int | None) -> None: ...
def set_session(
self,
isolation_level: str | bytes | int | None = ...,
readonly: bool | Literal["default", b"default"] | None = ...,
deferrable: bool | Literal["default", b"default"] | None = ...,
autocommit: bool = ...,
) -> None: ...
def tpc_begin(self, xid: str | bytes | Xid) -> None: ...
def tpc_commit(self, __xid: str | bytes | Xid = ...) -> None: ...
def tpc_prepare(self) -> None: ...
def tpc_recover(self) -> list[Xid]: ...
def tpc_rollback(self, __xid: str | bytes | Xid = ...) -> None: ...
def xid(self, format_id, gtrid, bqual) -> Xid: ...
def __enter__(self: Self) -> Self: ...
def __exit__(self, __type: object, __name: object, __tb: object) -> None: ...
class lobject:
closed: Any