Add core tensorflow.data stubs (#10122)

Co-authored-by: Mehdi Drissi <mdrissi@snapchat.com>
Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
Mehdi Drissi authored on 2023-05-22 08:00:27 -07:00, committed by GitHub
parent 31f4b8cf80
commit 510547ef3c
7 changed files with 380 additions and 4 deletions


@@ -18,9 +18,13 @@ tensorflow.Variable.__getattr__
tensorflow.keras.layers.Layer.__getattr__
tensorflow.python.feature_column.feature_column_v2.SharedEmbeddingColumnCreator.__getattr__
tensorflow.GradientTape.__getattr__
tensorflow.data.Dataset.__getattr__
tensorflow.experimental.Optional.__getattr__
# Internal undocumented API
tensorflow.RaggedTensor.__init__
tensorflow.data.Dataset.__init__
# Has an undocumented extra argument that tf.Variable, which acts like a subclass
# (by dynamically patching tf.Tensor methods), does not preserve.
tensorflow.Tensor.__getitem__


@@ -1,16 +1,26 @@
from _typeshed import Incomplete, Unused
from abc import ABCMeta
from abc import ABC, ABCMeta, abstractmethod
from builtins import bool as _bool
from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
from contextlib import contextmanager
from enum import Enum
from types import TracebackType
from typing import Any, NoReturn, TypeVar, overload
from typing import Any, Generic, NoReturn, TypeVar, overload
from typing_extensions import ParamSpec, Self, TypeAlias
import numpy
from tensorflow import feature_column as feature_column, initializers as initializers, io as io, keras as keras, math as math
from google.protobuf.message import Message
from tensorflow import (
    data as data,
    experimental as experimental,
    feature_column as feature_column,
    initializers as initializers,
    io as io,
    keras as keras,
    math as math,
)
from tensorflow._aliases import ContainerGradients, ContainerTensors, ContainerTensorsLike, Gradients, TensorLike
from tensorflow.core.protobuf import struct_pb2
# Explicit import of DType is covered by the wildcard, but
# is necessary to avoid a crash in pytype.
@@ -332,4 +342,66 @@ class GradientTape:
    def watched_variables(self) -> tuple[Variable, ...]: ...
    def __getattr__(self, name: str) -> Incomplete: ...

_SpecProto = TypeVar("_SpecProto", bound=Message)

class TypeSpec(Generic[_SpecProto], ABC):
    @property
    @abstractmethod
    def value_type(self) -> Any: ...
    def experimental_as_proto(self) -> _SpecProto: ...
    @classmethod
    def experimental_from_proto(cls, proto: _SpecProto) -> Self: ...
    @classmethod
    def experimental_type_proto(cls) -> type[_SpecProto]: ...
    def is_compatible_with(self, spec_or_value: Self | _TensorCompatible | SparseTensor | RaggedTensor) -> _bool: ...
    # Incomplete as tf.types is not yet covered.
    def is_subtype_of(self, other: Incomplete) -> _bool: ...
    def most_specific_common_supertype(self, others: Sequence[Incomplete]) -> Self | None: ...
    def most_specific_compatible_type(self, other: Self) -> Self: ...

class TensorSpec(TypeSpec[struct_pb2.TensorSpecProto]):
    def __init__(self, shape: _ShapeLike, dtype: _DTypeLike = ..., name: str | None = None) -> None: ...
    @property
    def value_type(self) -> Tensor: ...
    @property
    def shape(self) -> TensorShape: ...
    @property
    def dtype(self) -> DType: ...
    @property
    def name(self) -> str | None: ...
    @classmethod
    def from_spec(cls, spec: TypeSpec[Any], name: str | None = None) -> Self: ...
    @classmethod
    def from_tensor(cls, tensor: Tensor, name: str | None = None) -> Self: ...
    def is_compatible_with(self, spec_or_tensor: Self | _TensorCompatible) -> _bool: ...  # type: ignore[override]

class SparseTensorSpec(TypeSpec[struct_pb2.TypeSpecProto]):
    def __init__(self, shape: _ShapeLike | None = None, dtype: _DTypeLike = ...) -> None: ...
    @property
    def value_type(self) -> SparseTensor: ...
    @property
    def shape(self) -> TensorShape: ...
    @property
    def dtype(self) -> DType: ...
    @classmethod
    def from_value(cls, value: SparseTensor) -> Self: ...

class RaggedTensorSpec(TypeSpec[struct_pb2.TypeSpecProto]):
    def __init__(
        self,
        shape: _ShapeLike | None = None,
        dtype: _DTypeLike = ...,
        ragged_rank: int | None = None,
        row_splits_dtype: _DTypeLike = ...,
        flat_values_spec: TypeSpec[Any] | None = None,
    ) -> None: ...
    @property
    def value_type(self) -> RaggedTensor: ...
    @property
    def shape(self) -> TensorShape: ...
    @property
    def dtype(self) -> DType: ...
    @classmethod
    def from_value(cls, value: RaggedTensor) -> Self: ...

def __getattr__(name: str) -> Incomplete: ...
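
A minimal usage sketch (not part of the stubs or this commit) of the kind of code the new TensorSpec annotations are meant to cover; the concrete shape, dtype and the helper function below are illustrative assumptions:

import tensorflow as tf

# A spec for a float32 batch of 3-D points; shape, dtype and name are the
# parameters typed in TensorSpec.__init__ above.
spec = tf.TensorSpec(shape=(None, 3), dtype=tf.float32, name="points")

# Under these stubs, spec.shape is a TensorShape and spec.dtype is a DType.
assert spec.dtype == tf.float32

# TensorSpec values are commonly passed as a tf.function input_signature.
@tf.function(input_signature=[spec])
def row_norms(points: tf.Tensor) -> tf.Tensor:
    return tf.linalg.norm(points, axis=-1)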


@@ -0,0 +1,257 @@
from _typeshed import Incomplete
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterator as _Iterator, Sequence
from typing import Any, Generic, TypeVar, overload
from typing_extensions import Self
import numpy as np
import tensorflow as tf
from tensorflow import TypeSpec, _ScalarTensorCompatible, _TensorCompatible
from tensorflow._aliases import ContainerGeneric
from tensorflow.data import experimental as experimental
from tensorflow.data.experimental import AUTOTUNE as AUTOTUNE
from tensorflow.dtypes import DType
from tensorflow.io import _CompressionTypes
from tensorflow.python.trackable.base import Trackable
_T1 = TypeVar("_T1", covariant=True)
_T2 = TypeVar("_T2")
_T3 = TypeVar("_T3")
class Iterator(_Iterator[_T1], Trackable, ABC):
    @property
    @abstractmethod
    def element_spec(self) -> ContainerGeneric[TypeSpec[Any]]: ...
    @abstractmethod
    def get_next(self) -> _T1: ...
    @abstractmethod
    def get_next_as_optional(self) -> tf.experimental.Optional[_T1]: ...

class Dataset(Generic[_T1], ABC):
    def apply(self, transformation_func: Callable[[Dataset[_T1]], Dataset[_T2]]) -> Dataset[_T2]: ...
    def as_numpy_iterator(self) -> Iterator[np.ndarray[Any, Any]]: ...
    def batch(
        self,
        batch_size: _ScalarTensorCompatible,
        drop_remainder: bool = False,
        num_parallel_calls: int | None = None,
        deterministic: bool | None = None,
        name: str | None = None,
    ) -> Dataset[_T1]: ...
    def bucket_by_sequence_length(
        self,
        element_length_func: Callable[[_T1], _ScalarTensorCompatible],
        bucket_boundaries: Sequence[int],
        bucket_batch_sizes: Sequence[int],
        padded_shapes: ContainerGeneric[tf.TensorShape | _TensorCompatible] | None = None,
        padding_values: ContainerGeneric[_ScalarTensorCompatible] | None = None,
        pad_to_bucket_boundary: bool = False,
        no_padding: bool = False,
        drop_remainder: bool = False,
        name: str | None = None,
    ) -> Dataset[_T1]: ...
    def cache(self, filename: str = "", name: str | None = None) -> Dataset[_T1]: ...
    def cardinality(self) -> int: ...
    @staticmethod
    def choose_from_datasets(
        datasets: Sequence[Dataset[_T2]], choice_dataset: Dataset[tf.Tensor], stop_on_empty_dataset: bool = True
    ) -> Dataset[_T2]: ...
    def concatenate(self, dataset: Dataset[_T1], name: str | None = None) -> Dataset[_T1]: ...
    @staticmethod
    def counter(
        start: _ScalarTensorCompatible = 0, step: _ScalarTensorCompatible = 1, dtype: DType = ..., name: str | None = None
    ) -> Dataset[tf.Tensor]: ...
    @property
    @abstractmethod
    def element_spec(self) -> ContainerGeneric[TypeSpec[Any]]: ...
    def enumerate(self, start: _ScalarTensorCompatible = 0, name: str | None = None) -> Dataset[tuple[int, _T1]]: ...
    def filter(self, predicate: Callable[[_T1], bool | tf.Tensor], name: str | None = None) -> Dataset[_T1]: ...
    def flat_map(self, map_func: Callable[[_T1], Dataset[_T2]], name: str | None = None) -> Dataset[_T2]: ...
    # PEP 646 can be used here for a more precise type when better supported.
    @staticmethod
    def from_generator(
        generator: Callable[..., _T2],
        output_types: ContainerGeneric[DType] | None = None,
        output_shapes: ContainerGeneric[tf.TensorShape | Sequence[int | None]] | None = None,
        args: tuple[object, ...] | None = None,
        output_signature: ContainerGeneric[TypeSpec[Any]] | None = None,
        name: str | None = None,
    ) -> Dataset[_T2]: ...
    @staticmethod
    def from_tensors(tensors: Any, name: str | None = None) -> Dataset[Any]: ...
    @staticmethod
    def from_tensor_slices(tensors: _TensorCompatible, name: str | None = None) -> Dataset[Any]: ...
    def get_single_element(self, name: str | None = None) -> _T1: ...
    def group_by_window(
        self,
        key_func: Callable[[_T1], tf.Tensor],
        reduce_func: Callable[[tf.Tensor, Dataset[_T1]], Dataset[_T2]],
        window_size: _ScalarTensorCompatible | None = None,
        window_size_func: Callable[[tf.Tensor], tf.Tensor] | None = None,
        name: str | None = None,
    ) -> Dataset[_T2]: ...
    def ignore_errors(self, log_warning: bool = False, name: str | None = None) -> Dataset[_T1]: ...
    def interleave(
        self,
        map_func: Callable[[_T1], Dataset[_T2]],
        cycle_length: int | None = None,
        block_length: int | None = None,
        num_parallel_calls: int | None = None,
        deterministic: bool | None = None,
        name: str | None = None,
    ) -> Dataset[_T2]: ...
    def __iter__(self) -> Iterator[_T1]: ...
    @staticmethod
    def list_files(
        file_pattern: str | Sequence[str] | _TensorCompatible,
        shuffle: bool | None = None,
        seed: int | None = None,
        name: str | None = None,
    ) -> Dataset[str]: ...
    @staticmethod
    def load(
        path: str,
        element_spec: ContainerGeneric[tf.TypeSpec[Any]] | None = None,
        compression: _CompressionTypes = None,
        reader_func: Callable[[Dataset[Dataset[Any]]], Dataset[Any]] | None = None,
    ) -> Dataset[Any]: ...
    # PEP 646 could be used here for a more precise type when better supported.
    def map(
        self,
        map_func: Callable[..., _T2],
        num_parallel_calls: int | None = None,
        deterministic: None | bool = None,
        name: str | None = None,
    ) -> Dataset[_T2]: ...
    def options(self) -> Options: ...
    def padded_batch(
        self,
        batch_size: _ScalarTensorCompatible,
        padded_shapes: ContainerGeneric[tf.TensorShape | _TensorCompatible] | None = None,
        padding_values: ContainerGeneric[_ScalarTensorCompatible] | None = None,
        drop_remainder: bool = False,
        name: str | None = None,
    ) -> Dataset[_T1]: ...
    def prefetch(self, buffer_size: _ScalarTensorCompatible, name: str | None = None) -> Dataset[_T1]: ...
    def ragged_batch(
        self,
        batch_size: _ScalarTensorCompatible,
        drop_remainder: bool = False,
        row_splits_dtype: DType = ...,
        name: str | None = None,
    ) -> Dataset[tf.RaggedTensor]: ...
    @staticmethod
    def random(
        seed: int | None = None, rerandomize_each_iteration: bool | None = None, name: str | None = None
    ) -> Dataset[tf.Tensor]: ...
    @staticmethod
    @overload
    def range(__stop: _ScalarTensorCompatible, output_type: DType = ..., name: str | None = None) -> Dataset[tf.Tensor]: ...
    @staticmethod
    @overload
    def range(
        __start: _ScalarTensorCompatible,
        __stop: _ScalarTensorCompatible,
        __step: _ScalarTensorCompatible = 1,
        output_type: DType = ...,
        name: str | None = None,
    ) -> Dataset[tf.Tensor]: ...
    def rebatch(
        self, batch_size: _ScalarTensorCompatible, drop_remainder: bool = False, name: str | None = None
    ) -> Dataset[_T1]: ...
    def reduce(self, initial_state: _T2, reduce_func: Callable[[_T2, _T1], _T2], name: str | None = None) -> _T2: ...
    def rejection_resample(
        self,
        class_func: Callable[[_T1], _ScalarTensorCompatible],
        target_dist: _TensorCompatible,
        initial_dist: _TensorCompatible | None = None,
        seed: int | None = None,
        name: str | None = None,
    ) -> Dataset[_T1]: ...
    def repeat(self, count: _ScalarTensorCompatible | None = None, name: str | None = None) -> Dataset[_T1]: ...
    @staticmethod
    def sample_from_datasets(
        datasets: Sequence[Dataset[_T1]],
        weights: _TensorCompatible | None = None,
        seed: int | None = None,
        stop_on_empty_dataset: bool = False,
        rerandomize_each_iteration: bool | None = None,
    ) -> Dataset[_T1]: ...
    # Incomplete as tf.train.CheckpointOptions not yet covered.
    def save(
        self,
        path: str,
        compression: _CompressionTypes = None,
        shard_func: Callable[[_T1], int] | None = None,
        checkpoint_args: Incomplete | None = None,
    ) -> None: ...
    def scan(
        self, initial_state: _T2, scan_func: Callable[[_T2, _T1], tuple[_T2, _T3]], name: str | None = None
    ) -> Dataset[_T3]: ...
    def shard(
        self, num_shards: _ScalarTensorCompatible, index: _ScalarTensorCompatible, name: str | None = None
    ) -> Dataset[_T1]: ...
    def shuffle(
        self,
        buffer_size: _ScalarTensorCompatible,
        seed: int | None = None,
        reshuffle_each_iteration: bool | None = None,
        name: str | None = None,
    ) -> Dataset[_T1]: ...
    def skip(self, count: _ScalarTensorCompatible, name: str | None = None) -> Dataset[_T1]: ...
    def snapshot(
        self,
        path: str,
        compression: _CompressionTypes = "AUTO",
        reader_func: Callable[[Dataset[Dataset[_T1]]], Dataset[_T1]] | None = None,
        shard_func: Callable[[_T1], _ScalarTensorCompatible] | None = None,
        name: str | None = None,
    ) -> Dataset[_T1]: ...
    def sparse_batch(
        self, batch_size: _ScalarTensorCompatible, row_shape: tf.TensorShape | _TensorCompatible, name: str | None = None
    ) -> Dataset[tf.SparseTensor]: ...
    def take(self, count: _ScalarTensorCompatible, name: str | None = None) -> Dataset[_T1]: ...
    def take_while(self, predicate: Callable[[_T1], _ScalarTensorCompatible], name: str | None = None) -> Dataset[_T1]: ...
    def unbatch(self, name: str | None = None) -> Dataset[_T1]: ...
    def unique(self, name: str | None = None) -> Dataset[_T1]: ...
    def window(
        self,
        size: _ScalarTensorCompatible,
        shift: _ScalarTensorCompatible | None = None,
        stride: _ScalarTensorCompatible = 1,
        drop_remainder: bool = False,
        name: str | None = None,
    ) -> Dataset[Dataset[_T1]]: ...
    def with_options(self, options: Options, name: str | None = None) -> Dataset[_T1]: ...
    @staticmethod
    def zip(datasets: tuple[Dataset[_T2], Dataset[_T3]], name: str | None = None) -> Dataset[tuple[_T2, _T3]]: ...
    def __len__(self) -> int: ...
    def __nonzero__(self) -> bool: ...
    def __getattr__(self, name: str) -> Incomplete: ...

class Options:
    autotune: Incomplete
    deterministic: bool
    experimental_deterministic: bool
    experimental_distribute: Incomplete
    experimental_external_state_policy: Incomplete
    experimental_optimization: Incomplete
    experimental_slack: bool
    experimental_symbolic_checkpoint: bool
    experimental_threading: Incomplete
    threading: Incomplete
    def merge(self, options: Options) -> Self: ...

class TFRecordDataset(Dataset[tf.Tensor]):
    def __init__(
        self,
        filenames: _TensorCompatible | Dataset[str],
        compression_type: _CompressionTypes = None,
        buffer_size: int | None = None,
        num_parallel_reads: int | None = None,
        name: str | None = None,
    ) -> None: ...
    @property
    def element_spec(self) -> tf.TensorSpec: ...

def __getattr__(name: str) -> Incomplete: ...
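
For orientation (not part of the stubs or this commit), a small pipeline of the kind these generic Dataset annotations are intended to type-check; the literal values are arbitrary:

import tensorflow as tf

# Dataset.range yields scalar tensors, so ds starts out as Dataset[tf.Tensor].
ds = tf.data.Dataset.range(10)

# map, filter, batch and prefetch keep (or transform) the element type parameter.
ds = (
    ds.map(lambda x: x * 2, num_parallel_calls=tf.data.AUTOTUNE)
    .filter(lambda x: x < 12)
    .batch(4, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)

# Iterating a Dataset[tf.Tensor] yields tf.Tensor values.
for batch in ds:
    print(batch.numpy())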


@@ -0,0 +1,33 @@
from _typeshed import Incomplete
from collections.abc import Callable, Sequence
from typing import TypeVar
from typing_extensions import Final
from tensorflow import Tensor, _TensorCompatible
from tensorflow.data import Dataset
AUTOTUNE: Final = -1
INFINITE_CARDINALITY: Final = -1
SHARD_HINT: Final = -1
UNKNOWN_CARDINALITY: Final = -2
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
def parallel_interleave(
    map_func: Callable[[_T1], Dataset[_T2]],
    cycle_length: int,
    block_length: int = 1,
    sloppy: bool | None = False,
    buffer_output_elements: int | None = None,
    prefetch_input_elements: int | None = None,
) -> Callable[[Dataset[_T1]], Dataset[_T2]]: ...
def enable_debug_mode() -> None: ...
def cardinality(dataset: Dataset[object]) -> Tensor: ...
def sample_from_datasets(
    datasets: Sequence[Dataset[_T1]],
    weights: _TensorCompatible | None = None,
    seed: int | None = None,
    stop_on_empty_dataset: bool = False,
) -> Dataset[_T1]: ...
def __getattr__(name: str) -> Incomplete: ...
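
A brief sketch (not from this commit) of how the tf.data.experimental constants and functions typed above are used; the datasets, weights and seed are made-up values:

import tensorflow as tf

# Two small datasets to mix; both are Dataset[Any] under these stubs.
a = tf.data.Dataset.from_tensor_slices([1, 2, 3])
b = tf.data.Dataset.from_tensor_slices([10, 20, 30])

# sample_from_datasets keeps the common element type of its inputs.
mixed = tf.data.experimental.sample_from_datasets([a, b], weights=[0.5, 0.5], seed=42)

# cardinality returns a scalar Tensor that can be compared to the typed constants.
if tf.data.experimental.cardinality(mixed) == tf.data.experimental.UNKNOWN_CARDINALITY:
    print("cardinality not statically known")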


@@ -0,0 +1,10 @@
from _typeshed import Incomplete
from abc import ABC
from typing import Generic, TypeVar
_T_co = TypeVar("_T_co", covariant=True)
class Optional(Generic[_T_co], ABC):
    def __getattr__(self, name: str) -> Incomplete: ...

def __getattr__(name: str) -> Incomplete: ...
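
A usage sketch (illustrative only) of where tf.experimental.Optional shows up in tf.data; note that has_value and get_value are not yet typed here and resolve through __getattr__:

import tensorflow as tf

ds = tf.data.Dataset.range(1)
it = iter(ds)  # a tf.data.Iterator[tf.Tensor] under these stubs

# get_next_as_optional returns tf.experimental.Optional[tf.Tensor].
opt = it.get_next_as_optional()
if opt.has_value():
    print(opt.get_value().numpy())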


@@ -10,7 +10,7 @@ from tensorflow.io import gfile as gfile
_FeatureSpecs: TypeAlias = Mapping[str, FixedLenFeature | FixedLenSequenceFeature | VarLenFeature | RaggedFeature | SparseFeature]
_CompressionTypes: TypeAlias = Literal["ZLIB", "GZIP", "", 0, 1, 2] | None
_CompressionTypes: TypeAlias = Literal["ZLIB", "GZIP", "AUTO", "", 0, 1, 2] | None
_CompressionLevels: TypeAlias = Literal[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] | None
_MemoryLevels: TypeAlias = Literal[1, 2, 3, 4, 5, 6, 7, 8, 9] | None
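
As a hedged illustration of why "AUTO" was added to _CompressionTypes (the path below is a placeholder, not from this commit): Dataset.snapshot defaults to compression="AUTO", so the widened literal lets calls like this type-check.

import tensorflow as tf

ds = tf.data.Dataset.range(100)
# "AUTO" is now an accepted _CompressionTypes literal.
ds = ds.snapshot("/tmp/example_snapshot", compression="AUTO")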