Remove boto stubs (#12856)

Closes: #12848
This commit is contained in:
Sebastian Rittau
2024-10-19 18:07:28 +02:00
committed by GitHub
parent a22e35814e
commit b0d57cc03c
34 changed files with 0 additions and 2090 deletions

View File

@@ -34,7 +34,6 @@
"stubs/beautifulsoup4",
"stubs/bleach/bleach/sanitizer.pyi",
"stubs/boltons",
"stubs/boto",
"stubs/braintree",
"stubs/caldav",
"stubs/cffi",

View File

@@ -1,9 +0,0 @@
boto.connection.AWSQueryConnection.make_request
boto.elb
boto.kms.layer1.KMSConnection.make_request
boto.s3.S3RegionInfo.connect
boto.s3.bucket.Bucket.get_location
boto.s3.bucket.Bucket.get_tags
boto.s3.bucket.Bucket.get_xml_tags
boto.s3.connection.S3Connection.make_request
boto.utils.LazyLoadMetadata.get

View File

@@ -1,8 +0,0 @@
version = "2.49.*"
upstream_repository = "https://github.com/boto/boto"
requires = []
partial_stub = true
no_longer_updated = true
[tool.stubtest]
ignore_missing_stub = true

View File

@@ -1,143 +0,0 @@
import logging
from _typeshed import Incomplete
from typing import Any
from .s3.connection import S3Connection
Version: Any
UserAgent: Any
config: Any
BUCKET_NAME_RE: Any
TOO_LONG_DNS_NAME_COMP: Any
GENERATION_RE: Any
VERSION_RE: Any
ENDPOINTS_PATH: Any
def init_logging(): ...
class NullHandler(logging.Handler):
def emit(self, record): ...
log: Any
perflog: Any
def set_file_logger(name, filepath, level: Any = 20, format_string: Incomplete | None = None): ...
def set_stream_logger(name, level: Any = 10, format_string: Incomplete | None = None): ...
def connect_sqs(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_s3(aws_access_key_id: str | None = None, aws_secret_access_key: str | None = None, **kwargs) -> S3Connection: ...
def connect_gs(gs_access_key_id: Incomplete | None = None, gs_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_ec2(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_elb(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_autoscale(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_cloudwatch(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_sdb(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_fps(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_mturk(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_cloudfront(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_vpc(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_rds(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_rds2(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_emr(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_sns(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_iam(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_route53(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_cloudformation(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_euca(
host: Incomplete | None = None,
aws_access_key_id: Incomplete | None = None,
aws_secret_access_key: Incomplete | None = None,
port: int = 8773,
path: str = "/services/Eucalyptus",
is_secure: bool = False,
**kwargs,
): ...
def connect_glacier(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_ec2_endpoint(
url, aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_walrus(
host: Incomplete | None = None,
aws_access_key_id: Incomplete | None = None,
aws_secret_access_key: Incomplete | None = None,
port: int = 8773,
path: str = "/services/Walrus",
is_secure: bool = False,
**kwargs,
): ...
def connect_ses(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_sts(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_ia(
ia_access_key_id: Incomplete | None = None, ia_secret_access_key: Incomplete | None = None, is_secure: bool = False, **kwargs
): ...
def connect_dynamodb(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_swf(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_cloudsearch(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_cloudsearch2(
aws_access_key_id: Incomplete | None = None,
aws_secret_access_key: Incomplete | None = None,
sign_request: bool = False,
**kwargs,
): ...
def connect_cloudsearchdomain(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_beanstalk(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_elastictranscoder(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_opsworks(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_redshift(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_support(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_cloudtrail(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_directconnect(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_kinesis(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_logs(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_route53domains(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_cognito_identity(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_cognito_sync(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_kms(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_awslambda(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_codedeploy(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_configservice(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_cloudhsm(aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs): ...
def connect_ec2containerservice(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def connect_machinelearning(
aws_access_key_id: Incomplete | None = None, aws_secret_access_key: Incomplete | None = None, **kwargs
): ...
def storage_uri(
uri_str,
default_scheme: str = "file",
debug: int = 0,
validate: bool = True,
bucket_storage_uri_class: Any = ...,
suppress_consec_slashes: bool = True,
is_latest: bool = False,
): ...
def storage_uri_for_key(key): ...
# Explicitly mark this package as incomplete.
def __getattr__(name: str) -> Incomplete: ...

View File

@@ -1,112 +0,0 @@
from _typeshed import Incomplete
from typing import Any
from boto.auth_handler import AuthHandler
SIGV4_DETECT: Any
class HmacKeys:
host: Any
def __init__(self, host, config, provider) -> None: ...
def update_provider(self, provider): ...
def algorithm(self): ...
def sign_string(self, string_to_sign): ...
class AnonAuthHandler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def add_auth(self, http_request, **kwargs): ...
class HmacAuthV1Handler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def update_provider(self, provider): ...
def add_auth(self, http_request, **kwargs): ...
class HmacAuthV2Handler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def update_provider(self, provider): ...
def add_auth(self, http_request, **kwargs): ...
class HmacAuthV3Handler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def add_auth(self, http_request, **kwargs): ...
class HmacAuthV3HTTPHandler(AuthHandler, HmacKeys):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def headers_to_sign(self, http_request): ...
def canonical_headers(self, headers_to_sign): ...
def string_to_sign(self, http_request): ...
def add_auth(self, req, **kwargs): ... # type: ignore[override]
class HmacAuthV4Handler(AuthHandler, HmacKeys):
capability: Any
service_name: Any
region_name: Any
def __init__(
self, host, config, provider, service_name: Incomplete | None = None, region_name: Incomplete | None = None
) -> None: ...
def headers_to_sign(self, http_request): ...
def host_header(self, host, http_request): ...
def query_string(self, http_request): ...
def canonical_query_string(self, http_request): ...
def canonical_headers(self, headers_to_sign): ...
def signed_headers(self, headers_to_sign): ...
def canonical_uri(self, http_request): ...
def payload(self, http_request): ...
def canonical_request(self, http_request): ...
def scope(self, http_request): ...
def split_host_parts(self, host): ...
def determine_region_name(self, host): ...
def determine_service_name(self, host): ...
def credential_scope(self, http_request): ...
def string_to_sign(self, http_request, canonical_request): ...
def signature(self, http_request, string_to_sign): ...
def add_auth(self, req, **kwargs): ... # type: ignore[override]
class S3HmacAuthV4Handler(HmacAuthV4Handler, AuthHandler):
capability: Any
region_name: Any
def __init__(self, *args, **kwargs) -> None: ...
def clean_region_name(self, region_name): ...
def canonical_uri(self, http_request): ...
def canonical_query_string(self, http_request): ...
def host_header(self, host, http_request): ...
def headers_to_sign(self, http_request): ...
def determine_region_name(self, host): ...
def determine_service_name(self, host): ...
def mangle_path_and_params(self, req): ...
def payload(self, http_request): ...
def add_auth(self, req, **kwargs): ... # type: ignore[override]
def presign(self, req, expires, iso_date: Incomplete | None = None): ...
class STSAnonHandler(AuthHandler):
capability: Any
def add_auth(self, http_request, **kwargs): ...
class QuerySignatureHelper(HmacKeys):
def add_auth(self, http_request, **kwargs): ...
class QuerySignatureV0AuthHandler(QuerySignatureHelper, AuthHandler):
SignatureVersion: int
capability: Any
class QuerySignatureV1AuthHandler(QuerySignatureHelper, AuthHandler):
SignatureVersion: int
capability: Any
def __init__(self, *args, **kw) -> None: ...
class QuerySignatureV2AuthHandler(QuerySignatureHelper, AuthHandler):
SignatureVersion: int
capability: Any
class POSTPathQSV2AuthHandler(QuerySignatureV2AuthHandler, AuthHandler):
capability: Any
def add_auth(self, req, **kwargs): ... # type: ignore[override]
def get_auth_handler(host, config, provider, requested_capability: Incomplete | None = None): ...
def detect_potential_sigv4(func): ...
def detect_potential_s3sigv4(func): ...

View File

@@ -1,10 +0,0 @@
from typing import Any
from boto.plugin import Plugin
class NotReadyToAuthenticate(Exception): ...
class AuthHandler(Plugin):
capability: Any
def __init__(self, host, config, provider) -> None: ...
def add_auth(self, http_request): ...

View File

@@ -1,8 +0,0 @@
from base64 import encodebytes as encodebytes
from typing import Any
expanduser: Any
StandardError = Exception
long_type: Any
unquote_str: Any
parse_qs_safe: Any

View File

@@ -1,178 +0,0 @@
import http.client
from _typeshed import Incomplete
from typing import Any
HAVE_HTTPS_CONNECTION: bool
ON_APP_ENGINE: Any
PORTS_BY_SECURITY: Any
DEFAULT_CA_CERTS_FILE: Any
class HostConnectionPool:
queue: Any
def __init__(self) -> None: ...
def size(self): ...
def put(self, conn): ...
def get(self): ...
def clean(self): ...
class ConnectionPool:
CLEAN_INTERVAL: float
STALE_DURATION: float
host_to_pool: Any
last_clean_time: float
mutex: Any
def __init__(self) -> None: ...
def size(self): ...
def get_http_connection(self, host, port, is_secure): ...
def put_http_connection(self, host, port, is_secure, conn): ...
def clean(self): ...
class HTTPRequest:
method: Any
protocol: Any
host: Any
port: Any
path: Any
auth_path: Any
params: Any
headers: Any
body: Any
def __init__(self, method, protocol, host, port, path, auth_path, params, headers, body) -> None: ...
def authorize(self, connection, **kwargs): ...
class HTTPResponse(http.client.HTTPResponse):
def __init__(self, *args, **kwargs) -> None: ...
def read(self, amt: Incomplete | None = None): ...
class AWSAuthConnection:
suppress_consec_slashes: Any
num_retries: int
is_secure: Any
https_validate_certificates: Any
ca_certificates_file: Any
port: Any
http_exceptions: Any
http_unretryable_exceptions: Any
socket_exception_values: Any
https_connection_factory: Any
protocol: str
host: Any
path: Any
debug: Any
host_header: Any
http_connection_kwargs: Any
provider: Any
auth_service_name: Any
request_hook: Any
def __init__(
self,
host,
aws_access_key_id: Incomplete | None = None,
aws_secret_access_key: Incomplete | None = None,
is_secure: bool = True,
port: Incomplete | None = None,
proxy: Incomplete | None = None,
proxy_port: Incomplete | None = None,
proxy_user: Incomplete | None = None,
proxy_pass: Incomplete | None = None,
debug: int = 0,
https_connection_factory: Incomplete | None = None,
path: str = "/",
provider: str = "aws",
security_token: Incomplete | None = None,
suppress_consec_slashes: bool = True,
validate_certs: bool = True,
profile_name: Incomplete | None = None,
) -> None: ...
auth_region_name: Any
@property
def connection(self): ...
@property
def aws_access_key_id(self): ...
@property
def gs_access_key_id(self) -> Any: ...
@property
def access_key(self): ...
@property
def aws_secret_access_key(self): ...
@property
def gs_secret_access_key(self): ...
@property
def secret_key(self): ...
@property
def profile_name(self): ...
def get_path(self, path: str = "/"): ...
def server_name(self, port: Incomplete | None = None): ...
proxy: Any
proxy_port: Any
proxy_user: Any
proxy_pass: Any
no_proxy: Any
use_proxy: Any
def handle_proxy(self, proxy, proxy_port, proxy_user, proxy_pass): ...
def get_http_connection(self, host, port, is_secure): ...
def skip_proxy(self, host): ...
def new_http_connection(self, host, port, is_secure): ...
def put_http_connection(self, host, port, is_secure, connection): ...
def proxy_ssl(self, host: Incomplete | None = None, port: Incomplete | None = None): ...
def prefix_proxy_to_path(self, path, host: Incomplete | None = None): ...
def get_proxy_auth_header(self): ...
def get_proxy_url_with_auth(self): ...
def set_host_header(self, request): ...
def set_request_hook(self, hook): ...
def build_base_http_request(
self,
method,
path,
auth_path,
params: Incomplete | None = None,
headers: Incomplete | None = None,
data: str = "",
host: Incomplete | None = None,
): ...
def make_request(
self,
method,
path,
headers: Incomplete | None = None,
data: str = "",
host: Incomplete | None = None,
auth_path: Incomplete | None = None,
sender: Incomplete | None = None,
override_num_retries: Incomplete | None = None,
params: Incomplete | None = None,
retry_handler: Incomplete | None = None,
): ...
def close(self): ...
class AWSQueryConnection(AWSAuthConnection):
APIVersion: str
ResponseError: Any
def __init__(
self,
aws_access_key_id: Incomplete | None = None,
aws_secret_access_key: Incomplete | None = None,
is_secure: bool = True,
port: Incomplete | None = None,
proxy: Incomplete | None = None,
proxy_port: Incomplete | None = None,
proxy_user: Incomplete | None = None,
proxy_pass: Incomplete | None = None,
host: Incomplete | None = None,
debug: int = 0,
https_connection_factory: Incomplete | None = None,
path: str = "/",
security_token: Incomplete | None = None,
validate_certs: bool = True,
profile_name: Incomplete | None = None,
provider: str = "aws",
) -> None: ...
def get_utf8_value(self, value): ...
def make_request( # type: ignore[override]
self, action, params: Incomplete | None = None, path: str = "/", verb: str = "GET", *args, **kwargs
): ...
def build_list_params(self, params, items, label): ...
def build_complex_list_params(self, params, items, label, names): ...
def get_list(self, action, params, markers, path: str = "/", parent: Incomplete | None = None, verb: str = "GET"): ...
def get_object(self, action, params, cls, path: str = "/", parent: Incomplete | None = None, verb: str = "GET"): ...
def get_status(self, action, params, path: str = "/", parent: Incomplete | None = None, verb: str = "GET"): ...

View File

@@ -1,11 +0,0 @@
from _typeshed import Incomplete
from typing import Any
RegionData: Any
def regions(**kw_params): ...
def connect_to_region(region_name, **kw_params): ...
def get_region(region_name, **kw_params): ...
# Explicitly mark this package as incomplete.
def __getattr__(name: str) -> Incomplete: ...

View File

@@ -1,59 +0,0 @@
from typing import Any
from boto.connection import AWSQueryConnection
RegionData: Any
def regions(): ...
def connect_to_region(region_name, **kw_params): ...
class ELBConnection(AWSQueryConnection):
APIVersion: Any
DefaultRegionName: Any
DefaultRegionEndpoint: Any
region: Any
def __init__(
self,
aws_access_key_id=...,
aws_secret_access_key=...,
is_secure=...,
port=...,
proxy=...,
proxy_port=...,
proxy_user=...,
proxy_pass=...,
debug=...,
https_connection_factory=...,
region=...,
path=...,
security_token=...,
validate_certs=...,
profile_name=...,
) -> None: ...
def build_list_params(self, params, items, label): ...
def get_all_load_balancers(self, load_balancer_names=..., marker=...): ...
def create_load_balancer(
self, name, zones, listeners=..., subnets=..., security_groups=..., scheme=..., complex_listeners=...
): ...
def create_load_balancer_listeners(self, name, listeners=..., complex_listeners=...): ...
def delete_load_balancer(self, name): ...
def delete_load_balancer_listeners(self, name, ports): ...
def enable_availability_zones(self, load_balancer_name, zones_to_add): ...
def disable_availability_zones(self, load_balancer_name, zones_to_remove): ...
def modify_lb_attribute(self, load_balancer_name, attribute, value): ...
def get_all_lb_attributes(self, load_balancer_name): ...
def get_lb_attribute(self, load_balancer_name, attribute): ...
def register_instances(self, load_balancer_name, instances): ...
def deregister_instances(self, load_balancer_name, instances): ...
def describe_instance_health(self, load_balancer_name, instances=...): ...
def configure_health_check(self, name, health_check): ...
def set_lb_listener_SSL_certificate(self, lb_name, lb_port, ssl_certificate_id): ...
def create_app_cookie_stickiness_policy(self, name, lb_name, policy_name): ...
def create_lb_cookie_stickiness_policy(self, cookie_expiration_period, lb_name, policy_name): ...
def create_lb_policy(self, lb_name, policy_name, policy_type, policy_attributes): ...
def delete_lb_policy(self, lb_name, policy_name): ...
def set_lb_policies_of_listener(self, lb_name, lb_port, policies): ...
def set_lb_policies_of_backend_server(self, lb_name, instance_port, policies): ...
def apply_security_groups_to_lb(self, name, security_groups): ...
def attach_lb_to_subnets(self, name, subnets): ...
def detach_lb_from_subnets(self, name, subnets): ...

View File

@@ -1,148 +0,0 @@
from _typeshed import Incomplete
from typing import Any
from boto.compat import StandardError
class BotoClientError(StandardError):
reason: Any
def __init__(self, reason, *args) -> None: ...
class SDBPersistenceError(StandardError): ...
class StoragePermissionsError(BotoClientError): ...
class S3PermissionsError(StoragePermissionsError): ...
class GSPermissionsError(StoragePermissionsError): ...
class BotoServerError(StandardError):
status: Any
reason: Any
body: Any
request_id: Any
error_code: Any
message: str
box_usage: Any
def __init__(self, status, reason, body: Incomplete | None = None, *args) -> None: ...
def __getattr__(self, name: str): ...
def __setattr__(self, name: str, value) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class ConsoleOutput:
parent: Any
instance_id: Any
timestamp: Any
comment: Any
output: Any
def __init__(self, parent: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class StorageCreateError(BotoServerError):
bucket: Any
def __init__(self, status, reason, body: Incomplete | None = None) -> None: ...
def endElement(self, name, value, connection): ...
class S3CreateError(StorageCreateError): ...
class GSCreateError(StorageCreateError): ...
class StorageCopyError(BotoServerError): ...
class S3CopyError(StorageCopyError): ...
class GSCopyError(StorageCopyError): ...
class SQSError(BotoServerError):
detail: Any
type: Any
def __init__(self, status, reason, body: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class SQSDecodeError(BotoClientError):
message: Any
def __init__(self, reason, message) -> None: ...
class StorageResponseError(BotoServerError):
resource: Any
def __init__(self, status, reason, body: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class S3ResponseError(StorageResponseError): ...
class GSResponseError(StorageResponseError): ...
class EC2ResponseError(BotoServerError):
errors: Any
def __init__(self, status, reason, body: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
request_id: Any
def endElement(self, name, value, connection): ...
class JSONResponseError(BotoServerError):
status: Any
reason: Any
body: Any
error_message: Any
error_code: Any
def __init__(self, status, reason, body: Incomplete | None = None, *args) -> None: ...
class DynamoDBResponseError(JSONResponseError): ...
class SWFResponseError(JSONResponseError): ...
class EmrResponseError(BotoServerError): ...
class _EC2Error:
connection: Any
error_code: Any
error_message: Any
def __init__(self, connection: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class SDBResponseError(BotoServerError): ...
class AWSConnectionError(BotoClientError): ...
class StorageDataError(BotoClientError): ...
class S3DataError(StorageDataError): ...
class GSDataError(StorageDataError): ...
class InvalidUriError(Exception):
message: Any
def __init__(self, message) -> None: ...
class InvalidAclError(Exception):
message: Any
def __init__(self, message) -> None: ...
class InvalidCorsError(Exception):
message: Any
def __init__(self, message) -> None: ...
class NoAuthHandlerFound(Exception): ...
class InvalidLifecycleConfigError(Exception):
message: Any
def __init__(self, message) -> None: ...
class ResumableTransferDisposition:
START_OVER: str
WAIT_BEFORE_RETRY: str
ABORT_CUR_PROCESS: str
ABORT: str
class ResumableUploadException(Exception):
message: Any
disposition: Any
def __init__(self, message, disposition) -> None: ...
class ResumableDownloadException(Exception):
message: Any
disposition: Any
def __init__(self, message, disposition) -> None: ...
class TooManyRecordsException(Exception):
message: Any
def __init__(self, message) -> None: ...
class PleaseRetryException(Exception):
message: Any
response: Any
def __init__(self, message, response: Incomplete | None = None) -> None: ...
class InvalidInstanceMetadataError(Exception):
MSG: str
def __init__(self, msg) -> None: ...

View File

@@ -1,4 +0,0 @@
import boto.regioninfo
def regions() -> list[boto.regioninfo.RegionInfo]: ...
def connect_to_region(region_name, **kw_params): ...

View File

@@ -1,17 +0,0 @@
from boto.exception import BotoServerError
class InvalidGrantTokenException(BotoServerError): ...
class DisabledException(BotoServerError): ...
class LimitExceededException(BotoServerError): ...
class DependencyTimeoutException(BotoServerError): ...
class InvalidMarkerException(BotoServerError): ...
class AlreadyExistsException(BotoServerError): ...
class InvalidCiphertextException(BotoServerError): ...
class KeyUnavailableException(BotoServerError): ...
class InvalidAliasNameException(BotoServerError): ...
class UnsupportedOperationException(BotoServerError): ...
class InvalidArnException(BotoServerError): ...
class KMSInternalException(BotoServerError): ...
class InvalidKeyUsageException(BotoServerError): ...
class MalformedPolicyDocumentException(BotoServerError): ...
class NotFoundException(BotoServerError): ...

View File

@@ -1,78 +0,0 @@
from collections.abc import Mapping
from typing import Any
from boto.connection import AWSQueryConnection
class KMSConnection(AWSQueryConnection):
APIVersion: str
DefaultRegionName: str
DefaultRegionEndpoint: str
ServiceName: str
TargetPrefix: str
ResponseError: type[Exception]
region: Any
def __init__(self, **kwargs) -> None: ...
def create_alias(self, alias_name: str, target_key_id: str) -> dict[str, Any] | None: ...
def create_grant(
self,
key_id: str,
grantee_principal: str,
retiring_principal: str | None = None,
operations: list[str] | None = None,
constraints: dict[str, dict[str, str]] | None = None,
grant_tokens: list[str] | None = None,
) -> dict[str, Any] | None: ...
def create_key(
self, policy: str | None = None, description: str | None = None, key_usage: str | None = None
) -> dict[str, Any] | None: ...
def decrypt(
self, ciphertext_blob: bytes, encryption_context: Mapping[str, Any] | None = None, grant_tokens: list[str] | None = None
) -> dict[str, Any] | None: ...
def delete_alias(self, alias_name: str) -> dict[str, Any] | None: ...
def describe_key(self, key_id: str) -> dict[str, Any] | None: ...
def disable_key(self, key_id: str) -> dict[str, Any] | None: ...
def disable_key_rotation(self, key_id: str) -> dict[str, Any] | None: ...
def enable_key(self, key_id: str) -> dict[str, Any] | None: ...
def enable_key_rotation(self, key_id: str) -> dict[str, Any] | None: ...
def encrypt(
self,
key_id: str,
plaintext: bytes,
encryption_context: Mapping[str, Any] | None = None,
grant_tokens: list[str] | None = None,
) -> dict[str, Any] | None: ...
def generate_data_key(
self,
key_id: str,
encryption_context: Mapping[str, Any] | None = None,
number_of_bytes: int | None = None,
key_spec: str | None = None,
grant_tokens: list[str] | None = None,
) -> dict[str, Any] | None: ...
def generate_data_key_without_plaintext(
self,
key_id: str,
encryption_context: Mapping[str, Any] | None = None,
key_spec: str | None = None,
number_of_bytes: int | None = None,
grant_tokens: list[str] | None = None,
) -> dict[str, Any] | None: ...
def generate_random(self, number_of_bytes: int | None = None) -> dict[str, Any] | None: ...
def get_key_policy(self, key_id: str, policy_name: str) -> dict[str, Any] | None: ...
def get_key_rotation_status(self, key_id: str) -> dict[str, Any] | None: ...
def list_aliases(self, limit: int | None = None, marker: str | None = None) -> dict[str, Any] | None: ...
def list_grants(self, key_id: str, limit: int | None = None, marker: str | None = None) -> dict[str, Any] | None: ...
def list_key_policies(self, key_id: str, limit: int | None = None, marker: str | None = None) -> dict[str, Any] | None: ...
def list_keys(self, limit: int | None = None, marker: str | None = None) -> dict[str, Any] | None: ...
def put_key_policy(self, key_id: str, policy_name: str, policy: str) -> dict[str, Any] | None: ...
def re_encrypt(
self,
ciphertext_blob: bytes,
destination_key_id: str,
source_encryption_context: Mapping[str, Any] | None = None,
destination_encryption_context: Mapping[str, Any] | None = None,
grant_tokens: list[str] | None = None,
) -> dict[str, Any] | None: ...
def retire_grant(self, grant_token: str) -> dict[str, Any] | None: ...
def revoke_grant(self, key_id: str, grant_id: str) -> dict[str, Any] | None: ...
def update_key_description(self, key_id: str, description: str) -> dict[str, Any] | None: ...

View File

@@ -1,10 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class Plugin:
capability: Any
@classmethod
def is_capable(cls, requested_capability): ...
def get_plugin(cls, requested_capability: Incomplete | None = None): ...
def load_plugins(config): ...

View File

@@ -1,23 +0,0 @@
from _typeshed import Incomplete
from typing import Any
def load_endpoint_json(path): ...
def merge_endpoints(defaults, additions): ...
def load_regions(): ...
def get_regions(service_name, region_cls: Incomplete | None = None, connection_cls: Incomplete | None = None): ...
class RegionInfo:
connection: Any
name: Any
endpoint: Any
connection_cls: Any
def __init__(
self,
connection: Incomplete | None = None,
name: Incomplete | None = None,
endpoint: Incomplete | None = None,
connection_cls: Incomplete | None = None,
) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def connect(self, **kw_params): ...

View File

@@ -1,16 +0,0 @@
from boto.connection import AWSAuthConnection
from boto.regioninfo import RegionInfo
from .connection import S3Connection
class S3RegionInfo(RegionInfo):
def connect(
self,
name: str | None = ...,
endpoint: str | None = ...,
connection_cls: type[AWSAuthConnection] | None = ...,
**kw_params,
) -> S3Connection: ...
def regions() -> list[S3RegionInfo]: ...
def connect_to_region(region_name: str, **kw_params): ...

View File

@@ -1,49 +0,0 @@
from _typeshed import Incomplete
from typing import Any
from .connection import S3Connection
from .user import User
CannedACLStrings: list[str]
class Policy:
parent: Any
namespace: Any
acl: ACL
def __init__(self, parent: Incomplete | None = None) -> None: ...
owner: User
def startElement(self, name: str, attrs: dict[str, Any], connection: S3Connection) -> None | User | ACL: ...
def endElement(self, name: str, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...
class ACL:
policy: Policy
grants: list[Grant]
def __init__(self, policy: Policy | None = None) -> None: ...
def add_grant(self, grant: Grant) -> None: ...
def add_email_grant(self, permission: str, email_address: str) -> None: ...
def add_user_grant(self, permission: str, user_id: str, display_name: str | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name: str, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...
class Grant:
NameSpace: str
permission: str
id: str
display_name: str
uri: str
email_address: str
type: str
def __init__(
self,
permission: str | None = None,
type: str | None = None,
id: str | None = None,
display_name: str | None = None,
uri: str | None = None,
email_address: str | None = None,
) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name: str, value: Any, connection: S3Connection) -> None: ...
def to_xml(self) -> str: ...

View File

@@ -1,192 +0,0 @@
from _typeshed import Incomplete
from typing import Any
from .bucketlistresultset import BucketListResultSet
from .connection import S3Connection
from .key import Key
class S3WebsiteEndpointTranslate:
trans_region: dict[str, str]
@classmethod
def translate_region(cls, reg: str) -> str: ...
S3Permissions: list[str]
class Bucket:
LoggingGroup: str
BucketPaymentBody: str
VersioningBody: str
VersionRE: str
MFADeleteRE: str
name: str
connection: S3Connection
key_class: type[Key]
def __init__(self, connection: S3Connection | None = None, name: str | None = None, key_class: type[Key] = ...) -> None: ...
def __iter__(self): ...
def __contains__(self, key_name) -> bool: ...
def startElement(self, name, attrs, connection): ...
creation_date: Any
def endElement(self, name, value, connection): ...
def set_key_class(self, key_class): ...
def lookup(self, key_name, headers: dict[str, str] | None = None): ...
def get_key(
self,
key_name,
headers: dict[str, str] | None = None,
version_id: Incomplete | None = None,
response_headers: dict[str, str] | None = None,
validate: bool = True,
) -> Key: ...
def list(
self,
prefix: str = "",
delimiter: str = "",
marker: str = "",
headers: dict[str, str] | None = None,
encoding_type: Incomplete | None = None,
) -> BucketListResultSet: ...
def list_versions(
self,
prefix: str = "",
delimiter: str = "",
key_marker: str = "",
version_id_marker: str = "",
headers: dict[str, str] | None = None,
encoding_type: str | None = None,
) -> BucketListResultSet: ...
def list_multipart_uploads(
self,
key_marker: str = "",
upload_id_marker: str = "",
headers: dict[str, str] | None = None,
encoding_type: Incomplete | None = None,
): ...
def validate_kwarg_names(self, kwargs, names): ...
def get_all_keys(self, headers: dict[str, str] | None = None, **params): ...
def get_all_versions(self, headers: dict[str, str] | None = None, **params): ...
def validate_get_all_versions_params(self, params): ...
def get_all_multipart_uploads(self, headers: dict[str, str] | None = None, **params): ...
def new_key(self, key_name: Incomplete | None = None): ...
def generate_url(
self,
expires_in,
method: str = "GET",
headers: dict[str, str] | None = None,
force_http: bool = False,
response_headers: dict[str, str] | None = None,
expires_in_absolute: bool = False,
): ...
def delete_keys(
self, keys, quiet: bool = False, mfa_token: Incomplete | None = None, headers: dict[str, str] | None = None
): ...
def delete_key(
self,
key_name,
headers: dict[str, str] | None = None,
version_id: Incomplete | None = None,
mfa_token: Incomplete | None = None,
): ...
def copy_key(
self,
new_key_name,
src_bucket_name,
src_key_name,
metadata: Incomplete | None = None,
src_version_id: Incomplete | None = None,
storage_class: str = "STANDARD",
preserve_acl: bool = False,
encrypt_key: bool = False,
headers: dict[str, str] | None = None,
query_args: Incomplete | None = None,
): ...
def set_canned_acl(
self, acl_str, key_name: str = "", headers: dict[str, str] | None = None, version_id: Incomplete | None = None
): ...
def get_xml_acl(self, key_name: str = "", headers: dict[str, str] | None = None, version_id: Incomplete | None = None): ...
def set_xml_acl(
self,
acl_str,
key_name: str = "",
headers: dict[str, str] | None = None,
version_id: Incomplete | None = None,
query_args: str = "acl",
): ...
def set_acl(
self, acl_or_str, key_name: str = "", headers: dict[str, str] | None = None, version_id: Incomplete | None = None
): ...
def get_acl(self, key_name: str = "", headers: dict[str, str] | None = None, version_id: Incomplete | None = None): ...
def set_subresource(
self, subresource, value, key_name: str = "", headers: dict[str, str] | None = None, version_id: Incomplete | None = None
): ...
def get_subresource(
self, subresource, key_name: str = "", headers: dict[str, str] | None = None, version_id: Incomplete | None = None
): ...
def make_public(self, recursive: bool = False, headers: dict[str, str] | None = None): ...
def add_email_grant(self, permission, email_address, recursive: bool = False, headers: dict[str, str] | None = None): ...
def add_user_grant(
self,
permission,
user_id,
recursive: bool = False,
headers: dict[str, str] | None = None,
display_name: Incomplete | None = None,
): ...
def list_grants(self, headers: dict[str, str] | None = None): ...
def get_location(self): ...
def set_xml_logging(self, logging_str, headers: dict[str, str] | None = None): ...
def enable_logging(
self, target_bucket, target_prefix: str = "", grants: Incomplete | None = None, headers: dict[str, str] | None = None
): ...
def disable_logging(self, headers: dict[str, str] | None = None): ...
def get_logging_status(self, headers: dict[str, str] | None = None): ...
def set_as_logging_target(self, headers: dict[str, str] | None = None): ...
def get_request_payment(self, headers: dict[str, str] | None = None): ...
def set_request_payment(self, payer: str = "BucketOwner", headers: dict[str, str] | None = None): ...
def configure_versioning(
self, versioning, mfa_delete: bool = False, mfa_token: Incomplete | None = None, headers: dict[str, str] | None = None
): ...
def get_versioning_status(self, headers: dict[str, str] | None = None): ...
def configure_lifecycle(self, lifecycle_config, headers: dict[str, str] | None = None): ...
def get_lifecycle_config(self, headers: dict[str, str] | None = None): ...
def delete_lifecycle_configuration(self, headers: dict[str, str] | None = None): ...
def configure_website(
self,
suffix: Incomplete | None = None,
error_key: Incomplete | None = None,
redirect_all_requests_to: Incomplete | None = None,
routing_rules: Incomplete | None = None,
headers: dict[str, str] | None = None,
): ...
def set_website_configuration(self, config, headers: dict[str, str] | None = None): ...
def set_website_configuration_xml(self, xml, headers: dict[str, str] | None = None): ...
def get_website_configuration(self, headers: dict[str, str] | None = None): ...
def get_website_configuration_obj(self, headers: dict[str, str] | None = None): ...
def get_website_configuration_with_xml(self, headers: dict[str, str] | None = None): ...
def get_website_configuration_xml(self, headers: dict[str, str] | None = None): ...
def delete_website_configuration(self, headers: dict[str, str] | None = None): ...
def get_website_endpoint(self): ...
def get_policy(self, headers: dict[str, str] | None = None): ...
def set_policy(self, policy, headers: dict[str, str] | None = None): ...
def delete_policy(self, headers: dict[str, str] | None = None): ...
def set_cors_xml(self, cors_xml, headers: dict[str, str] | None = None): ...
def set_cors(self, cors_config, headers: dict[str, str] | None = None): ...
def get_cors_xml(self, headers: dict[str, str] | None = None): ...
def get_cors(self, headers: dict[str, str] | None = None): ...
def delete_cors(self, headers: dict[str, str] | None = None): ...
def initiate_multipart_upload(
self,
key_name,
headers: dict[str, str] | None = None,
reduced_redundancy: bool = False,
metadata: Incomplete | None = None,
encrypt_key: bool = False,
policy: Incomplete | None = None,
): ...
def complete_multipart_upload(self, key_name, upload_id, xml_body, headers: dict[str, str] | None = None): ...
def cancel_multipart_upload(self, key_name, upload_id, headers: dict[str, str] | None = None): ...
def delete(self, headers: dict[str, str] | None = None): ...
def get_tags(self): ...
def get_xml_tags(self): ...
def set_xml_tags(self, tag_str, headers: dict[str, str] | None = None, query_args: str = "tagging"): ...
def set_tags(self, tags, headers: dict[str, str] | None = None): ...
def delete_tags(self, headers: dict[str, str] | None = None): ...

View File

@@ -1,86 +0,0 @@
from _typeshed import Incomplete
from collections.abc import Iterable, Iterator
from typing import Any
from .key import Key
def bucket_lister(
bucket,
prefix: str = "",
delimiter: str = "",
marker: str = "",
headers: Incomplete | None = None,
encoding_type: Incomplete | None = None,
): ...
class BucketListResultSet(Iterable[Key]):
bucket: Any
prefix: Any
delimiter: Any
marker: Any
headers: Any
encoding_type: Any
def __init__(
self,
bucket: Incomplete | None = None,
prefix: str = "",
delimiter: str = "",
marker: str = "",
headers: Incomplete | None = None,
encoding_type: Incomplete | None = None,
) -> None: ...
def __iter__(self) -> Iterator[Key]: ...
def versioned_bucket_lister(
bucket,
prefix: str = "",
delimiter: str = "",
key_marker: str = "",
version_id_marker: str = "",
headers: Incomplete | None = None,
encoding_type: Incomplete | None = None,
): ...
class VersionedBucketListResultSet:
bucket: Any
prefix: Any
delimiter: Any
key_marker: Any
version_id_marker: Any
headers: Any
encoding_type: Any
def __init__(
self,
bucket: Incomplete | None = None,
prefix: str = "",
delimiter: str = "",
key_marker: str = "",
version_id_marker: str = "",
headers: Incomplete | None = None,
encoding_type: Incomplete | None = None,
) -> None: ...
def __iter__(self) -> Iterator[Key]: ...
def multipart_upload_lister(
bucket,
key_marker: str = "",
upload_id_marker: str = "",
headers: Incomplete | None = None,
encoding_type: Incomplete | None = None,
): ...
class MultiPartUploadListResultSet:
bucket: Any
key_marker: Any
upload_id_marker: Any
headers: Any
encoding_type: Any
def __init__(
self,
bucket: Incomplete | None = None,
key_marker: str = "",
upload_id_marker: str = "",
headers: Incomplete | None = None,
encoding_type: Incomplete | None = None,
) -> None: ...
def __iter__(self): ...

View File

@@ -1,14 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class BucketLogging:
target: Any
prefix: Any
grants: Any
def __init__(
self, target: Incomplete | None = None, prefix: Incomplete | None = None, grants: Incomplete | None = None
) -> None: ...
def add_grant(self, grant): ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...

View File

@@ -1,142 +0,0 @@
from _typeshed import Incomplete
from typing import Any
from boto.connection import AWSAuthConnection
from boto.exception import BotoClientError
from .bucket import Bucket
def check_lowercase_bucketname(n): ...
def assert_case_insensitive(f): ...
class _CallingFormat:
def get_bucket_server(self, server, bucket): ...
def build_url_base(self, connection, protocol, server, bucket, key: str = ""): ...
def build_host(self, server, bucket): ...
def build_auth_path(self, bucket, key: str = ""): ...
def build_path_base(self, bucket, key: str = ""): ...
class SubdomainCallingFormat(_CallingFormat):
def get_bucket_server(self, server, bucket): ...
class VHostCallingFormat(_CallingFormat):
def get_bucket_server(self, server, bucket): ...
class OrdinaryCallingFormat(_CallingFormat):
def get_bucket_server(self, server, bucket): ...
def build_path_base(self, bucket, key: str = ""): ...
class ProtocolIndependentOrdinaryCallingFormat(OrdinaryCallingFormat):
def build_url_base(self, connection, protocol, server, bucket, key: str = ""): ...
class Location:
DEFAULT: str
EU: str
EUCentral1: str
USWest: str
USWest2: str
SAEast: str
APNortheast: str
APSoutheast: str
APSoutheast2: str
CNNorth1: str
class NoHostProvided: ...
class HostRequiredError(BotoClientError): ...
class S3Connection(AWSAuthConnection):
DefaultHost: Any
DefaultCallingFormat: Any
QueryString: str
calling_format: Any
bucket_class: type[Bucket]
anon: Any
def __init__(
self,
aws_access_key_id: Incomplete | None = None,
aws_secret_access_key: Incomplete | None = None,
is_secure: bool = True,
port: Incomplete | None = None,
proxy: Incomplete | None = None,
proxy_port: Incomplete | None = None,
proxy_user: Incomplete | None = None,
proxy_pass: Incomplete | None = None,
host: Any = ...,
debug: int = 0,
https_connection_factory: Incomplete | None = None,
calling_format: Any = "boto.s3.connection.SubdomainCallingFormat",
path: str = "/",
provider: str = "aws",
bucket_class: type[Bucket] = ...,
security_token: Incomplete | None = None,
suppress_consec_slashes: bool = True,
anon: bool = False,
validate_certs: Incomplete | None = None,
profile_name: Incomplete | None = None,
) -> None: ...
def __iter__(self): ...
def __contains__(self, bucket_name): ...
def set_bucket_class(self, bucket_class: type[Bucket]) -> None: ...
def build_post_policy(self, expiration_time, conditions): ...
def build_post_form_args(
self,
bucket_name,
key,
expires_in: int = 6000,
acl: Incomplete | None = None,
success_action_redirect: Incomplete | None = None,
max_content_length: Incomplete | None = None,
http_method: str = "http",
fields: Incomplete | None = None,
conditions: Incomplete | None = None,
storage_class: str = "STANDARD",
server_side_encryption: Incomplete | None = None,
): ...
def generate_url_sigv4(
self,
expires_in,
method,
bucket: str = "",
key: str = "",
headers: dict[str, str] | None = None,
force_http: bool = False,
response_headers: dict[str, str] | None = None,
version_id: Incomplete | None = None,
iso_date: Incomplete | None = None,
): ...
def generate_url(
self,
expires_in,
method,
bucket: str = "",
key: str = "",
headers: dict[str, str] | None = None,
query_auth: bool = True,
force_http: bool = False,
response_headers: dict[str, str] | None = None,
expires_in_absolute: bool = False,
version_id: Incomplete | None = None,
): ...
def get_all_buckets(self, headers: dict[str, str] | None = None): ...
def get_canonical_user_id(self, headers: dict[str, str] | None = None): ...
def get_bucket(self, bucket_name: str, validate: bool = True, headers: dict[str, str] | None = None) -> Bucket: ...
def head_bucket(self, bucket_name, headers: dict[str, str] | None = None): ...
def lookup(self, bucket_name, validate: bool = True, headers: dict[str, str] | None = None): ...
def create_bucket(
self, bucket_name, headers: dict[str, str] | None = None, location: Any = "", policy: Incomplete | None = None
): ...
def delete_bucket(self, bucket, headers: dict[str, str] | None = None): ...
def make_request( # type: ignore[override]
self,
method,
bucket: str = "",
key: str = "",
headers: Incomplete | None = None,
data: str = "",
query_args: Incomplete | None = None,
sender: Incomplete | None = None,
override_num_retries: Incomplete | None = None,
retry_handler: Incomplete | None = None,
*args,
**kwargs,
): ...

View File

@@ -1,36 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class CORSRule:
allowed_method: Any
allowed_origin: Any
id: Any
allowed_header: Any
max_age_seconds: Any
expose_header: Any
def __init__(
self,
allowed_method: Incomplete | None = None,
allowed_origin: Incomplete | None = None,
id: Incomplete | None = None,
allowed_header: Incomplete | None = None,
max_age_seconds: Incomplete | None = None,
expose_header: Incomplete | None = None,
) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self) -> str: ...
class CORSConfiguration(list[CORSRule]):
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self) -> str: ...
def add_rule(
self,
allowed_method,
allowed_origin,
id: Incomplete | None = None,
allowed_header: Incomplete | None = None,
max_age_seconds: Incomplete | None = None,
expose_header: Incomplete | None = None,
): ...

View File

@@ -1,13 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class DeleteMarker:
bucket: Any
name: Any
version_id: Any
is_latest: bool
last_modified: Any
owner: Any
def __init__(self, bucket: Incomplete | None = None, name: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...

View File

@@ -1,235 +0,0 @@
from _typeshed import Incomplete
from collections.abc import Callable
from typing import Any, overload
class Key:
DefaultContentType: str
RestoreBody: str
BufferSize: Any
base_user_settable_fields: Any
base_fields: Any
bucket: Any
name: str
metadata: Any
cache_control: Any
content_type: Any
content_encoding: Any
content_disposition: Any
content_language: Any
filename: Any
etag: Any
is_latest: bool
last_modified: Any
owner: Any
path: Any
resp: Any
mode: Any
size: Any
version_id: Any
source_version_id: Any
delete_marker: bool
encrypted: Any
ongoing_restore: Any
expiry_date: Any
local_hashes: Any
def __init__(self, bucket: Incomplete | None = None, name: Incomplete | None = None) -> None: ...
def __iter__(self): ...
@property
def provider(self): ...
key: Any
md5: Any
base64md5: Any
storage_class: Any
def get_md5_from_hexdigest(self, md5_hexdigest): ...
def handle_encryption_headers(self, resp): ...
def handle_version_headers(self, resp, force: bool = False): ...
def handle_restore_headers(self, response): ...
def handle_addl_headers(self, headers): ...
def open_read(
self,
headers: dict[str, str] | None = None,
query_args: str = "",
override_num_retries: Incomplete | None = None,
response_headers: dict[str, str] | None = None,
): ...
def open_write(self, headers: dict[str, str] | None = None, override_num_retries: Incomplete | None = None): ...
def open(
self,
mode: str = "r",
headers: dict[str, str] | None = None,
query_args: Incomplete | None = None,
override_num_retries: Incomplete | None = None,
): ...
closed: bool
def close(self, fast: bool = False): ...
def next(self): ...
__next__: Any
def read(self, size: int = 0): ...
def change_storage_class(self, new_storage_class, dst_bucket: Incomplete | None = None, validate_dst_bucket: bool = True): ...
def copy(
self,
dst_bucket,
dst_key,
metadata: Incomplete | None = None,
reduced_redundancy: bool = False,
preserve_acl: bool = False,
encrypt_key: bool = False,
validate_dst_bucket: bool = True,
): ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def exists(self, headers: dict[str, str] | None = None): ...
def delete(self, headers: dict[str, str] | None = None): ...
def get_metadata(self, name): ...
def set_metadata(self, name, value): ...
def update_metadata(self, d): ...
def set_acl(self, acl_str, headers: dict[str, str] | None = None): ...
def get_acl(self, headers: dict[str, str] | None = None): ...
def get_xml_acl(self, headers: dict[str, str] | None = None): ...
def set_xml_acl(self, acl_str, headers: dict[str, str] | None = None): ...
def set_canned_acl(self, acl_str, headers: dict[str, str] | None = None): ...
def get_redirect(self): ...
def set_redirect(self, redirect_location, headers: dict[str, str] | None = None): ...
def make_public(self, headers: dict[str, str] | None = None): ...
def generate_url(
self,
expires_in,
method: str = "GET",
headers: dict[str, str] | None = None,
query_auth: bool = True,
force_http: bool = False,
response_headers: dict[str, str] | None = None,
expires_in_absolute: bool = False,
version_id: Incomplete | None = None,
policy: Incomplete | None = None,
reduced_redundancy: bool = False,
encrypt_key: bool = False,
): ...
def send_file(
self,
fp,
headers: dict[str, str] | None = None,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
query_args: Incomplete | None = None,
chunked_transfer: bool = False,
size: Incomplete | None = None,
): ...
def should_retry(self, response, chunked_transfer: bool = False): ...
def compute_md5(self, fp, size: Incomplete | None = None): ...
def set_contents_from_stream(
self,
fp,
headers: dict[str, str] | None = None,
replace: bool = True,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
policy: Incomplete | None = None,
reduced_redundancy: bool = False,
query_args: Incomplete | None = None,
size: Incomplete | None = None,
): ...
def set_contents_from_file(
self,
fp,
headers: dict[str, str] | None = None,
replace: bool = True,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
policy: Incomplete | None = None,
md5: Incomplete | None = None,
reduced_redundancy: bool = False,
query_args: Incomplete | None = None,
encrypt_key: bool = False,
size: Incomplete | None = None,
rewind: bool = False,
): ...
def set_contents_from_filename(
self,
filename,
headers: dict[str, str] | None = None,
replace: bool = True,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
policy: Incomplete | None = None,
md5: Incomplete | None = None,
reduced_redundancy: bool = False,
encrypt_key: bool = False,
): ...
def set_contents_from_string(
self,
string_data: str | bytes,
headers: dict[str, str] | None = None,
replace: bool = True,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
policy: Incomplete | None = None,
md5: Incomplete | None = None,
reduced_redundancy: bool = False,
encrypt_key: bool = False,
) -> None: ...
def get_file(
self,
fp,
headers: dict[str, str] | None = None,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
torrent: bool = False,
version_id: Incomplete | None = None,
override_num_retries: Incomplete | None = None,
response_headers: dict[str, str] | None = None,
): ...
def get_torrent_file(
self, fp, headers: dict[str, str] | None = None, cb: Callable[[int, int], object] | None = None, num_cb: int = 10
): ...
def get_contents_to_file(
self,
fp,
headers: dict[str, str] | None = None,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
torrent: bool = False,
version_id: Incomplete | None = None,
res_download_handler: Incomplete | None = None,
response_headers: dict[str, str] | None = None,
): ...
def get_contents_to_filename(
self,
filename,
headers: dict[str, str] | None = None,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
torrent: bool = False,
version_id: Incomplete | None = None,
res_download_handler: Incomplete | None = None,
response_headers: dict[str, str] | None = None,
): ...
@overload
def get_contents_as_string(
self,
headers: dict[str, str] | None = None,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
torrent: bool = False,
version_id: Incomplete | None = None,
response_headers: dict[str, str] | None = None,
encoding: None = None,
) -> bytes: ...
@overload
def get_contents_as_string(
self,
headers: dict[str, str] | None = None,
cb: Callable[[int, int], object] | None = None,
num_cb: int = 10,
torrent: bool = False,
version_id: Incomplete | None = None,
response_headers: dict[str, str] | None = None,
*,
encoding: str,
) -> str: ...
def add_email_grant(self, permission, email_address, headers: dict[str, str] | None = None): ...
def add_user_grant(
self, permission, user_id, headers: dict[str, str] | None = None, display_name: Incomplete | None = None
): ...
def set_remote_metadata(self, metadata_plus, metadata_minus, preserve_acl, headers: dict[str, str] | None = None): ...
def restore(self, days, headers: dict[str, str] | None = None): ...

View File

@@ -1,29 +0,0 @@
from typing import Any
class KeyFile:
key: Any
location: int
closed: bool
softspace: int
mode: str
encoding: str
errors: str
newlines: str
name: Any
def __init__(self, key) -> None: ...
def tell(self): ...
def seek(self, pos, whence: Any = 0): ...
def read(self, size): ...
def close(self): ...
def isatty(self): ...
def getkey(self): ...
def write(self, buf): ...
def fileno(self): ...
def flush(self): ...
def next(self): ...
def readinto(self): ...
def readline(self): ...
def readlines(self): ...
def truncate(self): ...
def writelines(self): ...
def xreadlines(self): ...

View File

@@ -1,70 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class Rule:
id: Any
prefix: Any
status: Any
expiration: Any
transition: Any
def __init__(
self,
id: Incomplete | None = None,
prefix: Incomplete | None = None,
status: Incomplete | None = None,
expiration: Incomplete | None = None,
transition: Incomplete | None = None,
) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class Expiration:
days: Any
date: Any
def __init__(self, days: Incomplete | None = None, date: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class Transition:
days: Any
date: Any
storage_class: Any
def __init__(
self, days: Incomplete | None = None, date: Incomplete | None = None, storage_class: Incomplete | None = None
) -> None: ...
def to_xml(self): ...
class Transitions(list[Transition]):
transition_properties: int
current_transition_property: int
temp_days: Any
temp_date: Any
temp_storage_class: Any
def __init__(self) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
def add_transition(
self, days: Incomplete | None = None, date: Incomplete | None = None, storage_class: Incomplete | None = None
): ...
@property
def days(self): ...
@property
def date(self): ...
@property
def storage_class(self): ...
class Lifecycle(list[Rule]):
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
def add_rule(
self,
id: Incomplete | None = None,
prefix: str = "",
status: str = "Enabled",
expiration: Incomplete | None = None,
transition: Incomplete | None = None,
): ...

View File

@@ -1,40 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class Deleted:
key: Any
version_id: Any
delete_marker: Any
delete_marker_version_id: Any
def __init__(
self,
key: Incomplete | None = None,
version_id: Incomplete | None = None,
delete_marker: bool = False,
delete_marker_version_id: Incomplete | None = None,
) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class Error:
key: Any
version_id: Any
code: Any
message: Any
def __init__(
self,
key: Incomplete | None = None,
version_id: Incomplete | None = None,
code: Incomplete | None = None,
message: Incomplete | None = None,
) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class MultiDeleteResult:
bucket: Any
deleted: Any
errors: Any
def __init__(self, bucket: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...

View File

@@ -1,74 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class CompleteMultiPartUpload:
bucket: Any
location: Any
bucket_name: Any
key_name: Any
etag: Any
version_id: Any
encrypted: Any
def __init__(self, bucket: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
class Part:
bucket: Any
part_number: Any
last_modified: Any
etag: Any
size: Any
def __init__(self, bucket: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def part_lister(mpupload, part_number_marker: Incomplete | None = None): ...
class MultiPartUpload:
bucket: Any
bucket_name: Any
key_name: Any
id: Any
initiator: Any
owner: Any
storage_class: Any
initiated: Any
part_number_marker: Any
next_part_number_marker: Any
max_parts: Any
is_truncated: bool
def __init__(self, bucket: Incomplete | None = None) -> None: ...
def __iter__(self): ...
def to_xml(self): ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def get_all_parts(
self,
max_parts: Incomplete | None = None,
part_number_marker: Incomplete | None = None,
encoding_type: Incomplete | None = None,
): ...
def upload_part_from_file(
self,
fp,
part_num,
headers: Incomplete | None = None,
replace: bool = True,
cb: Incomplete | None = None,
num_cb: int = 10,
md5: Incomplete | None = None,
size: Incomplete | None = None,
): ...
def copy_part_from_key(
self,
src_bucket_name,
src_key_name,
part_num,
start: Incomplete | None = None,
end: Incomplete | None = None,
src_version_id: Incomplete | None = None,
headers: Incomplete | None = None,
): ...
def complete_upload(self): ...
def cancel_upload(self): ...

View File

@@ -1,11 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class Prefix:
bucket: Any
name: Any
def __init__(self, bucket: Incomplete | None = None, name: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
@property
def provider(self): ...

View File

@@ -1,23 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class Tag:
key: Any
value: Any
def __init__(self, key: Incomplete | None = None, value: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
def __eq__(self, other): ...
class TagSet(list[Tag]):
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def add_tag(self, key, value): ...
def to_xml(self): ...
class Tags(list[TagSet]):
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
def add_tag_set(self, tag_set): ...

View File

@@ -1,11 +0,0 @@
from _typeshed import Incomplete
from typing import Any
class User:
type: Any
id: Any
display_name: Any
def __init__(self, parent: Incomplete | None = None, id: str = "", display_name: str = "") -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self, element_name: str = "Owner"): ...

View File

@@ -1,83 +0,0 @@
from _typeshed import Incomplete
from typing import Any
def tag(key, value): ...
class WebsiteConfiguration:
suffix: Any
error_key: Any
redirect_all_requests_to: Any
routing_rules: Any
def __init__(
self,
suffix: Incomplete | None = None,
error_key: Incomplete | None = None,
redirect_all_requests_to: Incomplete | None = None,
routing_rules: Incomplete | None = None,
) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class _XMLKeyValue:
translator: Any
container: Any
def __init__(self, translator, container: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class RedirectLocation(_XMLKeyValue):
TRANSLATOR: Any
hostname: Any
protocol: Any
def __init__(self, hostname: Incomplete | None = None, protocol: Incomplete | None = None) -> None: ...
def to_xml(self): ...
class RoutingRules(list[RoutingRule]):
def add_rule(self, rule: RoutingRule) -> RoutingRules: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
class RoutingRule:
condition: Any
redirect: Any
def __init__(self, condition: Incomplete | None = None, redirect: Incomplete | None = None) -> None: ...
def startElement(self, name, attrs, connection): ...
def endElement(self, name, value, connection): ...
def to_xml(self): ...
@classmethod
def when(cls, key_prefix: Incomplete | None = None, http_error_code: Incomplete | None = None): ...
def then_redirect(
self,
hostname: Incomplete | None = None,
protocol: Incomplete | None = None,
replace_key: Incomplete | None = None,
replace_key_prefix: Incomplete | None = None,
http_redirect_code: Incomplete | None = None,
): ...
class Condition(_XMLKeyValue):
TRANSLATOR: Any
key_prefix: Any
http_error_code: Any
def __init__(self, key_prefix: Incomplete | None = None, http_error_code: Incomplete | None = None) -> None: ...
def to_xml(self): ...
class Redirect(_XMLKeyValue):
TRANSLATOR: Any
hostname: Any
protocol: Any
replace_key: Any
replace_key_prefix: Any
http_redirect_code: Any
def __init__(
self,
hostname: Incomplete | None = None,
protocol: Incomplete | None = None,
replace_key: Incomplete | None = None,
replace_key_prefix: Incomplete | None = None,
http_redirect_code: Incomplete | None = None,
) -> None: ...
def to_xml(self): ...

View File

@@ -1,147 +0,0 @@
import datetime
import io
import logging.handlers
import subprocess
import time
from _typeshed import StrOrBytesPath
from collections.abc import Callable, Iterable, Mapping, Sequence
from contextlib import AbstractContextManager
from email.message import Message
from hashlib import _Hash
from typing import IO, Any, TypeVar
from typing_extensions import TypeAlias
import boto.connection
_KT = TypeVar("_KT")
_VT = TypeVar("_VT")
_Provider: TypeAlias = Any # TODO replace this with boto.provider.Provider once stubs exist
_LockType: TypeAlias = Any # TODO replace this with _thread.LockType once stubs exist
JSONDecodeError: type[ValueError]
qsa_of_interest: list[str]
def unquote_v(nv: str) -> str | tuple[str, str]: ...
def canonical_string(
method: str, path: str, headers: Mapping[str, str | None], expires: int | None = None, provider: _Provider | None = None
) -> str: ...
def merge_meta(
headers: Mapping[str, str], metadata: Mapping[str, str], provider: _Provider | None = None
) -> Mapping[str, str]: ...
def get_aws_metadata(headers: Mapping[str, str], provider: _Provider | None = None) -> Mapping[str, str]: ...
def retry_url(url: str, retry_on_404: bool = True, num_retries: int = 10, timeout: int | None = None) -> str: ...
class LazyLoadMetadata(dict[_KT, _VT]):
def __init__(self, url: str, num_retries: int, timeout: int | None = None) -> None: ...
def get_instance_metadata(
version: str = "latest",
url: str = "http://169.254.169.254",
data: str = "meta-data/",
timeout: int | None = None,
num_retries: int = 5,
) -> LazyLoadMetadata[Any, Any] | None: ...
def get_instance_identity(
version: str = "latest", url: str = "http://169.254.169.254", timeout: int | None = None, num_retries: int = 5
) -> Mapping[str, Any] | None: ...
def get_instance_userdata(
version: str = "latest",
sep: str | None = None,
url: str = "http://169.254.169.254",
timeout: int | None = None,
num_retries: int = 5,
) -> Mapping[str, str]: ...
ISO8601: str
ISO8601_MS: str
RFC1123: str
LOCALE_LOCK: _LockType
def setlocale(name: str | tuple[str, str]) -> AbstractContextManager[str]: ...
def get_ts(ts: time.struct_time | None = None) -> str: ...
def parse_ts(ts: str) -> datetime.datetime: ...
def find_class(module_name: str, class_name: str | None = None) -> type[Any] | None: ...
def update_dme(username: str, password: str, dme_id: str, ip_address: str) -> str: ...
def fetch_file(
uri: str, file: IO[str] | None = None, username: str | None = None, password: str | None = None
) -> IO[str] | None: ...
class ShellCommand:
exit_code: int
command: subprocess._CMD
log_fp: io.StringIO
wait: bool
fail_fast: bool
def __init__(
self, command: subprocess._CMD, wait: bool = True, fail_fast: bool = False, cwd: StrOrBytesPath | None = None
) -> None: ...
process: subprocess.Popen[Any]
def run(self, cwd: subprocess._CMD | None = None) -> int | None: ...
def setReadOnly(self, value) -> None: ...
def getStatus(self) -> int | None: ...
status: int | None
def getOutput(self) -> str: ...
output: str
class AuthSMTPHandler(logging.handlers.SMTPHandler):
username: str
password: str
def __init__(
self, mailhost: str, username: str, password: str, fromaddr: str, toaddrs: Sequence[str], subject: str
) -> None: ...
class LRUCache(dict[_KT, _VT]):
class _Item:
previous: LRUCache._Item | None
next: LRUCache._Item | None
key = ...
value = ...
def __init__(self, key, value) -> None: ...
_dict: dict[_KT, LRUCache._Item]
capacity: int
head: LRUCache._Item | None
tail: LRUCache._Item | None
def __init__(self, capacity: int) -> None: ...
# This exists to work around Password.str's name shadowing the str type
_Str: TypeAlias = str
class Password:
hashfunc: Callable[[bytes], _Hash]
str: _Str | None
def __init__(self, str: _Str | None = None, hashfunc: Callable[[bytes], _Hash] | None = None) -> None: ...
def set(self, value: bytes | _Str) -> None: ...
def __eq__(self, other: _Str | bytes | None) -> bool: ... # type: ignore[override]
def __len__(self) -> int: ...
def notify(
subject: str,
body: str | None = None,
html_body: Sequence[str] | str | None = None,
to_string: str | None = None,
attachments: Iterable[Message] | None = None,
append_instance_id: bool = True,
) -> None: ...
def get_utf8_value(value: str) -> bytes: ...
def mklist(value: Any) -> list[Any]: ...
def pythonize_name(name: str) -> str: ...
def write_mime_multipart(
content: list[tuple[str, str]], compress: bool = False, deftype: str = "text/plain", delimiter: str = ":"
) -> str: ...
def guess_mime_type(content: str, deftype: str) -> str: ...
def compute_md5(fp: IO[Any], buf_size: int = 8192, size: int | None = None) -> tuple[str, str, int]: ...
def compute_hash(
fp: IO[Any], buf_size: int = 8192, size: int | None = None, hash_algorithm: Any = ...
) -> tuple[str, str, int]: ...
def find_matching_headers(name: str, headers: Mapping[str, str | None]) -> list[str]: ...
def merge_headers_by_name(name: str, headers: Mapping[str, str | None]) -> str: ...
class RequestHook:
def handle_request_data(
self, request: boto.connection.HTTPRequest, response: boto.connection.HTTPResponse, error: bool = False
) -> Any: ...
def host_is_ipv6(hostname: str) -> bool: ...
def parse_host(hostname: str) -> str: ...