Modernize syntax in various stubs (#7469)

Use `str` and `contextlib.AbstractContextManager` instead of `typing.Text` and `typing.ContextManager`.
This commit is contained in:
Alex Waygood
2022-03-09 21:23:26 +00:00
committed by GitHub
parent cdb573b398
commit 9a1f5fb06c
16 changed files with 197 additions and 202 deletions

View File

@@ -1,5 +1,5 @@
import logging
from typing import Any, Text
from typing import Any
from .s3.connection import S3Connection
@@ -23,7 +23,7 @@ perflog: Any
def set_file_logger(name, filepath, level: Any = ..., format_string: Any | None = ...): ...
def set_stream_logger(name, level: Any = ..., format_string: Any | None = ...): ...
def connect_sqs(aws_access_key_id: Any | None = ..., aws_secret_access_key: Any | None = ..., **kwargs): ...
def connect_s3(aws_access_key_id: Text | None = ..., aws_secret_access_key: Text | None = ..., **kwargs) -> S3Connection: ...
def connect_s3(aws_access_key_id: str | None = ..., aws_secret_access_key: str | None = ..., **kwargs) -> S3Connection: ...
def connect_gs(gs_access_key_id: Any | None = ..., gs_secret_access_key: Any | None = ..., **kwargs): ...
def connect_ec2(aws_access_key_id: Any | None = ..., aws_secret_access_key: Any | None = ..., **kwargs): ...
def connect_elb(aws_access_key_id: Any | None = ..., aws_secret_access_key: Any | None = ..., **kwargs): ...

View File

@@ -1,5 +1,3 @@
from typing import Text
from boto.connection import AWSAuthConnection
from boto.regioninfo import RegionInfo
@@ -8,11 +6,11 @@ from .connection import S3Connection
class S3RegionInfo(RegionInfo):
def connect(
self,
name: Text | None = ...,
name: str | None = ...,
endpoint: str | None = ...,
connection_cls: type[AWSAuthConnection] | None = ...,
**kw_params,
) -> S3Connection: ...
def regions() -> list[S3RegionInfo]: ...
def connect_to_region(region_name: Text, **kw_params): ...
def connect_to_region(region_name: str, **kw_params): ...

View File

@@ -1,4 +1,4 @@
from typing import Any, Text
from typing import Any
from .connection import S3Connection
from .user import User
@@ -11,8 +11,8 @@ class Policy:
acl: ACL
def __init__(self, parent: Any | None = ...) -> None: ...
owner: User
def startElement(self, name: Text, attrs: dict[str, Any], connection: S3Connection) -> None | User | ACL: ...
def endElement(self, name: Text, value: Any, connection: S3Connection) -> None: ...
def startElement(self, name: str, attrs: dict[str, Any], connection: S3Connection) -> None | User | ACL: ...
def endElement(self, name: str, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...
class ACL:
@@ -20,29 +20,29 @@ class ACL:
grants: list[Grant]
def __init__(self, policy: Policy | None = ...) -> None: ...
def add_grant(self, grant: Grant) -> None: ...
def add_email_grant(self, permission: Text, email_address: Text) -> None: ...
def add_user_grant(self, permission: Text, user_id: Text, display_name: Text | None = ...) -> None: ...
def add_email_grant(self, permission: str, email_address: str) -> None: ...
def add_user_grant(self, permission: str, user_id: str, display_name: str | None = ...) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name: Text, value: Any, connection: S3Connection) -> None: ...
def endElement(self, name: str, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...
class Grant:
NameSpace: Text
permission: Text
id: Text
display_name: Text
uri: Text
email_address: Text
type: Text
NameSpace: str
permission: str
id: str
display_name: str
uri: str
email_address: str
type: str
def __init__(
self,
permission: Text | None = ...,
type: Text | None = ...,
id: Text | None = ...,
display_name: Text | None = ...,
uri: Text | None = ...,
email_address: Text | None = ...,
permission: str | None = ...,
type: str | None = ...,
id: str | None = ...,
display_name: str | None = ...,
uri: str | None = ...,
email_address: str | None = ...,
) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name: Text, value: Any, connection: S3Connection) -> None: ...
def endElement(self, name: str, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...

View File

@@ -1,4 +1,4 @@
from typing import Any, Text
from typing import Any
from .bucketlistresultset import BucketListResultSet
from .connection import S3Connection
@@ -7,7 +7,7 @@ from .key import Key
class S3WebsiteEndpointTranslate:
trans_region: dict[str, str]
@classmethod
def translate_region(cls, reg: Text) -> str: ...
def translate_region(cls, reg: str) -> str: ...
S3Permissions: list[str]
@@ -17,31 +17,31 @@ class Bucket:
VersioningBody: str
VersionRE: str
MFADeleteRE: str
name: Text
name: str
connection: S3Connection
key_class: type[Key]
def __init__(self, connection: S3Connection | None = ..., name: Text | None = ..., key_class: type[Key] = ...) -> None: ...
def __init__(self, connection: S3Connection | None = ..., name: str | None = ..., key_class: type[Key] = ...) -> None: ...
def __iter__(self): ...
def __contains__(self, key_name) -> bool: ...
def startElement(self, name, attrs, connection): ...
creation_date: Any
def endElement(self, name, value, connection): ...
def set_key_class(self, key_class): ...
def lookup(self, key_name, headers: dict[Text, Text] | None = ...): ...
def lookup(self, key_name, headers: dict[str, str] | None = ...): ...
def get_key(
self,
key_name,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
version_id: Any | None = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
validate: bool = ...,
) -> Key: ...
def list(
self,
prefix: Text = ...,
delimiter: Text = ...,
marker: Text = ...,
headers: dict[Text, Text] | None = ...,
prefix: str = ...,
delimiter: str = ...,
marker: str = ...,
headers: dict[str, str] | None = ...,
encoding_type: Any | None = ...,
) -> BucketListResultSet: ...
def list_versions(
@@ -50,34 +50,34 @@ class Bucket:
delimiter: str = ...,
key_marker: str = ...,
version_id_marker: str = ...,
headers: dict[Text, Text] | None = ...,
encoding_type: Text | None = ...,
headers: dict[str, str] | None = ...,
encoding_type: str | None = ...,
) -> BucketListResultSet: ...
def list_multipart_uploads(
self,
key_marker: str = ...,
upload_id_marker: str = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
encoding_type: Any | None = ...,
): ...
def validate_kwarg_names(self, kwargs, names): ...
def get_all_keys(self, headers: dict[Text, Text] | None = ..., **params): ...
def get_all_versions(self, headers: dict[Text, Text] | None = ..., **params): ...
def get_all_keys(self, headers: dict[str, str] | None = ..., **params): ...
def get_all_versions(self, headers: dict[str, str] | None = ..., **params): ...
def validate_get_all_versions_params(self, params): ...
def get_all_multipart_uploads(self, headers: dict[Text, Text] | None = ..., **params): ...
def get_all_multipart_uploads(self, headers: dict[str, str] | None = ..., **params): ...
def new_key(self, key_name: Any | None = ...): ...
def generate_url(
self,
expires_in,
method: str = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
force_http: bool = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
expires_in_absolute: bool = ...,
): ...
def delete_keys(self, keys, quiet: bool = ..., mfa_token: Any | None = ..., headers: dict[Text, Text] | None = ...): ...
def delete_keys(self, keys, quiet: bool = ..., mfa_token: Any | None = ..., headers: dict[str, str] | None = ...): ...
def delete_key(
self, key_name, headers: dict[Text, Text] | None = ..., version_id: Any | None = ..., mfa_token: Any | None = ...
self, key_name, headers: dict[str, str] | None = ..., version_id: Any | None = ..., mfa_token: Any | None = ...
): ...
def copy_key(
self,
@@ -89,90 +89,90 @@ class Bucket:
storage_class: str = ...,
preserve_acl: bool = ...,
encrypt_key: bool = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
query_args: Any | None = ...,
): ...
def set_canned_acl(
self, acl_str, key_name: str = ..., headers: dict[Text, Text] | None = ..., version_id: Any | None = ...
self, acl_str, key_name: str = ..., headers: dict[str, str] | None = ..., version_id: Any | None = ...
): ...
def get_xml_acl(self, key_name: str = ..., headers: dict[Text, Text] | None = ..., version_id: Any | None = ...): ...
def get_xml_acl(self, key_name: str = ..., headers: dict[str, str] | None = ..., version_id: Any | None = ...): ...
def set_xml_acl(
self,
acl_str,
key_name: str = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
version_id: Any | None = ...,
query_args: str = ...,
): ...
def set_acl(self, acl_or_str, key_name: str = ..., headers: dict[Text, Text] | None = ..., version_id: Any | None = ...): ...
def get_acl(self, key_name: str = ..., headers: dict[Text, Text] | None = ..., version_id: Any | None = ...): ...
def set_acl(self, acl_or_str, key_name: str = ..., headers: dict[str, str] | None = ..., version_id: Any | None = ...): ...
def get_acl(self, key_name: str = ..., headers: dict[str, str] | None = ..., version_id: Any | None = ...): ...
def set_subresource(
self, subresource, value, key_name: str = ..., headers: dict[Text, Text] | None = ..., version_id: Any | None = ...
self, subresource, value, key_name: str = ..., headers: dict[str, str] | None = ..., version_id: Any | None = ...
): ...
def get_subresource(
self, subresource, key_name: str = ..., headers: dict[Text, Text] | None = ..., version_id: Any | None = ...
self, subresource, key_name: str = ..., headers: dict[str, str] | None = ..., version_id: Any | None = ...
): ...
def make_public(self, recursive: bool = ..., headers: dict[Text, Text] | None = ...): ...
def add_email_grant(self, permission, email_address, recursive: bool = ..., headers: dict[Text, Text] | None = ...): ...
def make_public(self, recursive: bool = ..., headers: dict[str, str] | None = ...): ...
def add_email_grant(self, permission, email_address, recursive: bool = ..., headers: dict[str, str] | None = ...): ...
def add_user_grant(
self, permission, user_id, recursive: bool = ..., headers: dict[Text, Text] | None = ..., display_name: Any | None = ...
self, permission, user_id, recursive: bool = ..., headers: dict[str, str] | None = ..., display_name: Any | None = ...
): ...
def list_grants(self, headers: dict[Text, Text] | None = ...): ...
def list_grants(self, headers: dict[str, str] | None = ...): ...
def get_location(self): ...
def set_xml_logging(self, logging_str, headers: dict[Text, Text] | None = ...): ...
def set_xml_logging(self, logging_str, headers: dict[str, str] | None = ...): ...
def enable_logging(
self, target_bucket, target_prefix: str = ..., grants: Any | None = ..., headers: dict[Text, Text] | None = ...
self, target_bucket, target_prefix: str = ..., grants: Any | None = ..., headers: dict[str, str] | None = ...
): ...
def disable_logging(self, headers: dict[Text, Text] | None = ...): ...
def get_logging_status(self, headers: dict[Text, Text] | None = ...): ...
def set_as_logging_target(self, headers: dict[Text, Text] | None = ...): ...
def get_request_payment(self, headers: dict[Text, Text] | None = ...): ...
def set_request_payment(self, payer: str = ..., headers: dict[Text, Text] | None = ...): ...
def disable_logging(self, headers: dict[str, str] | None = ...): ...
def get_logging_status(self, headers: dict[str, str] | None = ...): ...
def set_as_logging_target(self, headers: dict[str, str] | None = ...): ...
def get_request_payment(self, headers: dict[str, str] | None = ...): ...
def set_request_payment(self, payer: str = ..., headers: dict[str, str] | None = ...): ...
def configure_versioning(
self, versioning, mfa_delete: bool = ..., mfa_token: Any | None = ..., headers: dict[Text, Text] | None = ...
self, versioning, mfa_delete: bool = ..., mfa_token: Any | None = ..., headers: dict[str, str] | None = ...
): ...
def get_versioning_status(self, headers: dict[Text, Text] | None = ...): ...
def configure_lifecycle(self, lifecycle_config, headers: dict[Text, Text] | None = ...): ...
def get_lifecycle_config(self, headers: dict[Text, Text] | None = ...): ...
def delete_lifecycle_configuration(self, headers: dict[Text, Text] | None = ...): ...
def get_versioning_status(self, headers: dict[str, str] | None = ...): ...
def configure_lifecycle(self, lifecycle_config, headers: dict[str, str] | None = ...): ...
def get_lifecycle_config(self, headers: dict[str, str] | None = ...): ...
def delete_lifecycle_configuration(self, headers: dict[str, str] | None = ...): ...
def configure_website(
self,
suffix: Any | None = ...,
error_key: Any | None = ...,
redirect_all_requests_to: Any | None = ...,
routing_rules: Any | None = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
): ...
def set_website_configuration(self, config, headers: dict[Text, Text] | None = ...): ...
def set_website_configuration_xml(self, xml, headers: dict[Text, Text] | None = ...): ...
def get_website_configuration(self, headers: dict[Text, Text] | None = ...): ...
def get_website_configuration_obj(self, headers: dict[Text, Text] | None = ...): ...
def get_website_configuration_with_xml(self, headers: dict[Text, Text] | None = ...): ...
def get_website_configuration_xml(self, headers: dict[Text, Text] | None = ...): ...
def delete_website_configuration(self, headers: dict[Text, Text] | None = ...): ...
def set_website_configuration(self, config, headers: dict[str, str] | None = ...): ...
def set_website_configuration_xml(self, xml, headers: dict[str, str] | None = ...): ...
def get_website_configuration(self, headers: dict[str, str] | None = ...): ...
def get_website_configuration_obj(self, headers: dict[str, str] | None = ...): ...
def get_website_configuration_with_xml(self, headers: dict[str, str] | None = ...): ...
def get_website_configuration_xml(self, headers: dict[str, str] | None = ...): ...
def delete_website_configuration(self, headers: dict[str, str] | None = ...): ...
def get_website_endpoint(self): ...
def get_policy(self, headers: dict[Text, Text] | None = ...): ...
def set_policy(self, policy, headers: dict[Text, Text] | None = ...): ...
def delete_policy(self, headers: dict[Text, Text] | None = ...): ...
def set_cors_xml(self, cors_xml, headers: dict[Text, Text] | None = ...): ...
def set_cors(self, cors_config, headers: dict[Text, Text] | None = ...): ...
def get_cors_xml(self, headers: dict[Text, Text] | None = ...): ...
def get_cors(self, headers: dict[Text, Text] | None = ...): ...
def delete_cors(self, headers: dict[Text, Text] | None = ...): ...
def get_policy(self, headers: dict[str, str] | None = ...): ...
def set_policy(self, policy, headers: dict[str, str] | None = ...): ...
def delete_policy(self, headers: dict[str, str] | None = ...): ...
def set_cors_xml(self, cors_xml, headers: dict[str, str] | None = ...): ...
def set_cors(self, cors_config, headers: dict[str, str] | None = ...): ...
def get_cors_xml(self, headers: dict[str, str] | None = ...): ...
def get_cors(self, headers: dict[str, str] | None = ...): ...
def delete_cors(self, headers: dict[str, str] | None = ...): ...
def initiate_multipart_upload(
self,
key_name,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
reduced_redundancy: bool = ...,
metadata: Any | None = ...,
encrypt_key: bool = ...,
policy: Any | None = ...,
): ...
def complete_multipart_upload(self, key_name, upload_id, xml_body, headers: dict[Text, Text] | None = ...): ...
def cancel_multipart_upload(self, key_name, upload_id, headers: dict[Text, Text] | None = ...): ...
def delete(self, headers: dict[Text, Text] | None = ...): ...
def complete_multipart_upload(self, key_name, upload_id, xml_body, headers: dict[str, str] | None = ...): ...
def cancel_multipart_upload(self, key_name, upload_id, headers: dict[str, str] | None = ...): ...
def delete(self, headers: dict[str, str] | None = ...): ...
def get_tags(self): ...
def get_xml_tags(self): ...
def set_xml_tags(self, tag_str, headers: dict[Text, Text] | None = ..., query_args: str = ...): ...
def set_tags(self, tags, headers: dict[Text, Text] | None = ...): ...
def delete_tags(self, headers: dict[Text, Text] | None = ...): ...
def set_xml_tags(self, tag_str, headers: dict[str, str] | None = ..., query_args: str = ...): ...
def set_tags(self, tags, headers: dict[str, str] | None = ...): ...
def delete_tags(self, headers: dict[str, str] | None = ...): ...

View File

@@ -1,4 +1,4 @@
from typing import Any, Text
from typing import Any
from boto.connection import AWSAuthConnection
from boto.exception import BotoClientError
@@ -97,9 +97,9 @@ class S3Connection(AWSAuthConnection):
method,
bucket: str = ...,
key: str = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
force_http: bool = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
version_id: Any | None = ...,
iso_date: Any | None = ...,
): ...
@@ -109,20 +109,18 @@ class S3Connection(AWSAuthConnection):
method,
bucket: str = ...,
key: str = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
query_auth: bool = ...,
force_http: bool = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
expires_in_absolute: bool = ...,
version_id: Any | None = ...,
): ...
def get_all_buckets(self, headers: dict[Text, Text] | None = ...): ...
def get_canonical_user_id(self, headers: dict[Text, Text] | None = ...): ...
def get_bucket(self, bucket_name: Text, validate: bool = ..., headers: dict[Text, Text] | None = ...) -> Bucket: ...
def head_bucket(self, bucket_name, headers: dict[Text, Text] | None = ...): ...
def lookup(self, bucket_name, validate: bool = ..., headers: dict[Text, Text] | None = ...): ...
def create_bucket(
self, bucket_name, headers: dict[Text, Text] | None = ..., location: Any = ..., policy: Any | None = ...
): ...
def delete_bucket(self, bucket, headers: dict[Text, Text] | None = ...): ...
def get_all_buckets(self, headers: dict[str, str] | None = ...): ...
def get_canonical_user_id(self, headers: dict[str, str] | None = ...): ...
def get_bucket(self, bucket_name: str, validate: bool = ..., headers: dict[str, str] | None = ...) -> Bucket: ...
def head_bucket(self, bucket_name, headers: dict[str, str] | None = ...): ...
def lookup(self, bucket_name, validate: bool = ..., headers: dict[str, str] | None = ...): ...
def create_bucket(self, bucket_name, headers: dict[str, str] | None = ..., location: Any = ..., policy: Any | None = ...): ...
def delete_bucket(self, bucket, headers: dict[str, str] | None = ...): ...
def make_request(self, method, bucket: str = ..., key: str = ..., headers: Any | None = ..., data: str = ..., query_args: Any | None = ..., sender: Any | None = ..., override_num_retries: Any | None = ..., retry_handler: Any | None = ..., *args, **kwargs): ... # type: ignore # https://github.com/python/mypy/issues/1237

View File

@@ -1,4 +1,4 @@
from typing import Any, Callable, Text, overload
from typing import Any, Callable, overload
class Key:
DefaultContentType: str
@@ -45,16 +45,16 @@ class Key:
def handle_addl_headers(self, headers): ...
def open_read(
self,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
query_args: str = ...,
override_num_retries: Any | None = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
): ...
def open_write(self, headers: dict[Text, Text] | None = ..., override_num_retries: Any | None = ...): ...
def open_write(self, headers: dict[str, str] | None = ..., override_num_retries: Any | None = ...): ...
def open(
self,
mode: str = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
query_args: Any | None = ...,
override_num_retries: Any | None = ...,
): ...
@@ -76,27 +76,27 @@ class Key:
): ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def exists(self, headers: dict[Text, Text] | None = ...): ...
def delete(self, headers: dict[Text, Text] | None = ...): ...
def exists(self, headers: dict[str, str] | None = ...): ...
def delete(self, headers: dict[str, str] | None = ...): ...
def get_metadata(self, name): ...
def set_metadata(self, name, value): ...
def update_metadata(self, d): ...
def set_acl(self, acl_str, headers: dict[Text, Text] | None = ...): ...
def get_acl(self, headers: dict[Text, Text] | None = ...): ...
def get_xml_acl(self, headers: dict[Text, Text] | None = ...): ...
def set_xml_acl(self, acl_str, headers: dict[Text, Text] | None = ...): ...
def set_canned_acl(self, acl_str, headers: dict[Text, Text] | None = ...): ...
def set_acl(self, acl_str, headers: dict[str, str] | None = ...): ...
def get_acl(self, headers: dict[str, str] | None = ...): ...
def get_xml_acl(self, headers: dict[str, str] | None = ...): ...
def set_xml_acl(self, acl_str, headers: dict[str, str] | None = ...): ...
def set_canned_acl(self, acl_str, headers: dict[str, str] | None = ...): ...
def get_redirect(self): ...
def set_redirect(self, redirect_location, headers: dict[Text, Text] | None = ...): ...
def make_public(self, headers: dict[Text, Text] | None = ...): ...
def set_redirect(self, redirect_location, headers: dict[str, str] | None = ...): ...
def make_public(self, headers: dict[str, str] | None = ...): ...
def generate_url(
self,
expires_in,
method: str = ...,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
query_auth: bool = ...,
force_http: bool = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
expires_in_absolute: bool = ...,
version_id: Any | None = ...,
policy: Any | None = ...,
@@ -106,7 +106,7 @@ class Key:
def send_file(
self,
fp,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
query_args: Any | None = ...,
@@ -118,7 +118,7 @@ class Key:
def set_contents_from_stream(
self,
fp,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
replace: bool = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
@@ -130,7 +130,7 @@ class Key:
def set_contents_from_file(
self,
fp,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
replace: bool = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
@@ -145,7 +145,7 @@ class Key:
def set_contents_from_filename(
self,
filename,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
replace: bool = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
@@ -156,8 +156,8 @@ class Key:
): ...
def set_contents_from_string(
self,
string_data: Text | bytes,
headers: dict[Text, Text] | None = ...,
string_data: str | bytes,
headers: dict[str, str] | None = ...,
replace: bool = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
@@ -169,63 +169,63 @@ class Key:
def get_file(
self,
fp,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Any | None = ...,
override_num_retries: Any | None = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
): ...
def get_torrent_file(
self, fp, headers: dict[Text, Text] | None = ..., cb: Callable[[int, int], Any] | None = ..., num_cb: int = ...
self, fp, headers: dict[str, str] | None = ..., cb: Callable[[int, int], Any] | None = ..., num_cb: int = ...
): ...
def get_contents_to_file(
self,
fp,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Any | None = ...,
res_download_handler: Any | None = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
): ...
def get_contents_to_filename(
self,
filename,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Any | None = ...,
res_download_handler: Any | None = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
): ...
@overload
def get_contents_as_string(
self,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Any | None = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
encoding: None = ...,
) -> bytes: ...
@overload
def get_contents_as_string(
self,
headers: dict[Text, Text] | None = ...,
headers: dict[str, str] | None = ...,
cb: Callable[[int, int], Any] | None = ...,
num_cb: int = ...,
torrent: bool = ...,
version_id: Any | None = ...,
response_headers: dict[Text, Text] | None = ...,
response_headers: dict[str, str] | None = ...,
*,
encoding: Text,
) -> Text: ...
def add_email_grant(self, permission, email_address, headers: dict[Text, Text] | None = ...): ...
def add_user_grant(self, permission, user_id, headers: dict[Text, Text] | None = ..., display_name: Any | None = ...): ...
def set_remote_metadata(self, metadata_plus, metadata_minus, preserve_acl, headers: dict[Text, Text] | None = ...): ...
def restore(self, days, headers: dict[Text, Text] | None = ...): ...
encoding: str,
) -> str: ...
def add_email_grant(self, permission, email_address, headers: dict[str, str] | None = ...): ...
def add_user_grant(self, permission, user_id, headers: dict[str, str] | None = ..., display_name: Any | None = ...): ...
def set_remote_metadata(self, metadata_plus, metadata_minus, preserve_acl, headers: dict[str, str] | None = ...): ...
def restore(self, days, headers: dict[str, str] | None = ...): ...

View File

@@ -3,7 +3,8 @@ import logging.handlers
import subprocess
import sys
import time
from typing import IO, Any, Callable, ContextManager, Iterable, Mapping, Sequence, TypeVar
from contextlib import AbstractContextManager
from typing import IO, Any, Callable, Iterable, Mapping, Sequence, TypeVar
import boto.connection
@@ -68,7 +69,7 @@ ISO8601_MS: str
RFC1123: str
LOCALE_LOCK: _LockType
def setlocale(name: str | tuple[str, str]) -> ContextManager[str]: ...
def setlocale(name: str | tuple[str, str]) -> AbstractContextManager[str]: ...
def get_ts(ts: time.struct_time | None = ...) -> str: ...
def parse_ts(ts: str) -> datetime.datetime: ...
def find_class(module_name: str, class_name: str | None = ...) -> type[Any] | None: ...

View File

@@ -1,9 +1,7 @@
from typing import Text
class InvalidToken(Exception): ...
class Fernet:
def __init__(self, key: bytes | Text) -> None: ...
def __init__(self, key: bytes | str) -> None: ...
def decrypt(self, token: bytes, ttl: int | None = ...) -> bytes: ...
# decrypt_at_time accepts None ttl at runtime but it's an implementtion detail and it doesn't really
# make sense for the client code to use it like that, so the parameter is typed as int as opposed to

View File

@@ -3,7 +3,7 @@ from _typeshed import Self
from abc import ABCMeta, abstractmethod
from enum import Enum
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Any, ClassVar, Generator, Generic, Iterable, Sequence, Text, TypeVar
from typing import Any, ClassVar, Generator, Generic, Iterable, Sequence, TypeVar
from cryptography.hazmat.backends.interfaces import X509Backend
from cryptography.hazmat.primitives.asymmetric.dsa import DSAPrivateKey, DSAPublicKey
@@ -108,8 +108,8 @@ class ExtendedKeyUsageOID:
class NameAttribute:
oid: ObjectIdentifier
value: Text
def __init__(self, oid: ObjectIdentifier, value: Text) -> None: ...
value: str
def __init__(self, oid: ObjectIdentifier, value: str) -> None: ...
def rfc4514_string(self) -> str: ...
class RelativeDistinguishedName:
@@ -254,8 +254,8 @@ class DirectoryName(GeneralName):
def __init__(self, value: Name) -> None: ...
class DNSName(GeneralName):
value: Text
def __init__(self, value: Text) -> None: ...
value: str
def __init__(self, value: str) -> None: ...
class IPAddress(GeneralName):
value: IPv4Address | IPv6Address | IPv4Network | IPv6Network
@@ -271,12 +271,12 @@ class RegisteredID(GeneralName):
def __init__(self, value: ObjectIdentifier) -> None: ...
class RFC822Name(GeneralName):
value: Text
def __init__(self, value: Text) -> None: ...
value: str
def __init__(self, value: str) -> None: ...
class UniformResourceIdentifier(GeneralName):
value: Text
def __init__(self, value: Text) -> None: ...
value: str
def __init__(self, value: str) -> None: ...
# X.509 Extensions

View File

@@ -1,16 +1,14 @@
from threading import Event
from typing import Generic, Text, TypeVar
_T = TypeVar("_T", Text, bytes)
from typing import AnyStr, Generic
class PipeTimeout(IOError): ...
class BufferedPipe(Generic[_T]):
class BufferedPipe(Generic[AnyStr]):
def __init__(self) -> None: ...
def set_event(self, event: Event) -> None: ...
def feed(self, data: _T) -> None: ...
def feed(self, data: AnyStr) -> None: ...
def read_ready(self) -> bool: ...
def read(self, nbytes: int, timeout: float | None = ...) -> _T: ...
def empty(self) -> _T: ...
def read(self, nbytes: int, timeout: float | None = ...) -> AnyStr: ...
def empty(self) -> AnyStr: ...
def close(self) -> None: ...
def __len__(self) -> int: ...

View File

@@ -1,5 +1,5 @@
import sys
from typing import Protocol, Text, Union
from typing import Protocol, Union
MSG_DISCONNECT: int
MSG_IGNORE: int
@@ -109,7 +109,7 @@ else:
class _SupportsAsBytes(Protocol):
def asbytes(self) -> bytes: ...
_LikeBytes = Union[bytes, Text, _SupportsAsBytes]
_LikeBytes = Union[bytes, str, _SupportsAsBytes]
def asbytes(s: _LikeBytes) -> bytes: ...

View File

@@ -1,5 +1,5 @@
import sys
from typing import Any, Iterable, Text
from typing import Any, Iterable
from .common import _LikeBytes
@@ -27,7 +27,7 @@ class Message:
def get_int64(self) -> int: ...
def get_mpint(self) -> int: ...
def get_string(self) -> bytes: ...
def get_text(self) -> Text: ...
def get_text(self) -> str: ...
def get_binary(self) -> bytes: ...
def get_list(self) -> list[str]: ...
def add_bytes(self, b: bytes) -> Message: ...

View File

@@ -1,5 +1,5 @@
import sys
from typing import Any, Iterable, Sequence, Text, TypeVar
from typing import Any, Iterable, Sequence, TypeVar
_T = TypeVar("_T")
@@ -34,7 +34,7 @@ def byte_ord(c: int | str) -> int: ...
def byte_chr(c: int) -> bytes: ...
def byte_mask(c: int, mask: int) -> bytes: ...
def b(s: bytes | str, encoding: str = ...) -> bytes: ...
def u(s: bytes | str, encoding: str = ...) -> Text: ...
def u(s: bytes | str, encoding: str = ...) -> str: ...
def b2s(s: bytes | str) -> str: ...
def is_callable(c: Any) -> bool: ...
def next(c: Iterable[_T]) -> _T: ...

View File

@@ -1,6 +1,6 @@
from _typeshed import Self
from logging import Logger
from typing import IO, Any, Callable, Iterator, Text
from typing import IO, Any, Callable, Iterator
from paramiko.channel import Channel
from paramiko.sftp import BaseSFTP
@@ -27,35 +27,35 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
def get_channel(self) -> Channel | None: ...
def listdir(self, path: str = ...) -> list[str]: ...
def listdir_attr(self, path: str = ...) -> list[SFTPAttributes]: ...
def listdir_iter(self, path: bytes | Text = ..., read_aheads: int = ...) -> Iterator[SFTPAttributes]: ...
def open(self, filename: bytes | Text, mode: str = ..., bufsize: int = ...) -> SFTPFile: ...
def listdir_iter(self, path: bytes | str = ..., read_aheads: int = ...) -> Iterator[SFTPAttributes]: ...
def open(self, filename: bytes | str, mode: str = ..., bufsize: int = ...) -> SFTPFile: ...
file = open
def remove(self, path: bytes | Text) -> None: ...
def remove(self, path: bytes | str) -> None: ...
unlink = remove
def rename(self, oldpath: bytes | Text, newpath: bytes | Text) -> None: ...
def posix_rename(self, oldpath: bytes | Text, newpath: bytes | Text) -> None: ...
def mkdir(self, path: bytes | Text, mode: int = ...) -> None: ...
def rmdir(self, path: bytes | Text) -> None: ...
def stat(self, path: bytes | Text) -> SFTPAttributes: ...
def lstat(self, path: bytes | Text) -> SFTPAttributes: ...
def symlink(self, source: bytes | Text, dest: bytes | Text) -> None: ...
def chmod(self, path: bytes | Text, mode: int) -> None: ...
def chown(self, path: bytes | Text, uid: int, gid: int) -> None: ...
def utime(self, path: bytes | Text, times: tuple[float, float] | None) -> None: ...
def truncate(self, path: bytes | Text, size: int) -> None: ...
def readlink(self, path: bytes | Text) -> Text | None: ...
def normalize(self, path: bytes | Text) -> Text: ...
def chdir(self, path: None | bytes | Text = ...) -> None: ...
def getcwd(self) -> Text | None: ...
def rename(self, oldpath: bytes | str, newpath: bytes | str) -> None: ...
def posix_rename(self, oldpath: bytes | str, newpath: bytes | str) -> None: ...
def mkdir(self, path: bytes | str, mode: int = ...) -> None: ...
def rmdir(self, path: bytes | str) -> None: ...
def stat(self, path: bytes | str) -> SFTPAttributes: ...
def lstat(self, path: bytes | str) -> SFTPAttributes: ...
def symlink(self, source: bytes | str, dest: bytes | str) -> None: ...
def chmod(self, path: bytes | str, mode: int) -> None: ...
def chown(self, path: bytes | str, uid: int, gid: int) -> None: ...
def utime(self, path: bytes | str, times: tuple[float, float] | None) -> None: ...
def truncate(self, path: bytes | str, size: int) -> None: ...
def readlink(self, path: bytes | str) -> str | None: ...
def normalize(self, path: bytes | str) -> str: ...
def chdir(self, path: None | bytes | str = ...) -> None: ...
def getcwd(self) -> str | None: ...
def putfo(
self, fl: IO[bytes], remotepath: bytes | Text, file_size: int = ..., callback: _Callback | None = ..., confirm: bool = ...
self, fl: IO[bytes], remotepath: bytes | str, file_size: int = ..., callback: _Callback | None = ..., confirm: bool = ...
) -> SFTPAttributes: ...
def put(
self, localpath: bytes | Text, remotepath: bytes | Text, callback: _Callback | None = ..., confirm: bool = ...
self, localpath: bytes | str, remotepath: bytes | str, callback: _Callback | None = ..., confirm: bool = ...
) -> SFTPAttributes: ...
def getfo(self, remotepath: bytes | Text, fl: IO[bytes], callback: _Callback | None = ..., prefetch: bool = ...) -> int: ...
def getfo(self, remotepath: bytes | str, fl: IO[bytes], callback: _Callback | None = ..., prefetch: bool = ...) -> int: ...
def get(
self, remotepath: bytes | Text, localpath: bytes | Text, callback: _Callback | None = ..., prefetch: bool = ...
self, remotepath: bytes | str, localpath: bytes | str, callback: _Callback | None = ..., prefetch: bool = ...
) -> None: ...
class SFTP(SFTPClient): ...

View File

@@ -1,7 +1,8 @@
from _typeshed import Self
from contextlib import AbstractContextManager
from stat import S_IMODE as S_IMODE
from types import TracebackType
from typing import IO, Any, Callable, ContextManager, Sequence, Text, Union
from typing import IO, Any, Callable, Sequence, Union
from typing_extensions import Literal
import paramiko
@@ -32,7 +33,7 @@ class CnOpts:
def get_hostkey(self, host: str) -> paramiko.PKey: ...
_Callback = Callable[[int, int], Any]
_Path = Union[Text, bytes]
_Path = Union[str, bytes]
class Connection:
def __init__(
@@ -75,7 +76,7 @@ class Connection:
confirm: bool = ...,
) -> paramiko.SFTPAttributes: ...
def execute(self, command: str) -> list[str]: ...
def cd(self, remotepath: _Path | None = ...) -> ContextManager[None]: ... # noqa: F811
def cd(self, remotepath: _Path | None = ...) -> AbstractContextManager[None]: ... # noqa: F811
def chdir(self, remotepath: _Path) -> None: ...
def cwd(self, remotepath: _Path) -> None: ...
def chmod(self, remotepath: _Path, mode: int = ...) -> None: ...

View File

@@ -1,4 +1,5 @@
from typing import Callable, ContextManager, Iterator
from collections.abc import Callable, Iterator
from contextlib import AbstractContextManager
def known_hosts() -> str: ...
def st_mode_to_int(val: int) -> int: ...
@@ -30,4 +31,4 @@ _PathCallback = Callable[[str], None]
def walktree(
localpath: str, fcallback: _PathCallback, dcallback: _PathCallback, ucallback: _PathCallback, recurse: bool = ...
) -> None: ...
def cd(localpath: str | None = ...) -> ContextManager[None]: ...
def cd(localpath: str | None = ...) -> AbstractContextManager[None]: ...