0x523 Dataset
- 1. Serialization
- 1.1. Row Format
- 1.2. Column Format
- 2. TensorFlow Datasets
- 2.1. tf.data
- 2.2. tfds
- 3. Torch Datasets
1. Serialization
1.1. Row Format
1.1.1. ProtoBuffer
A single datapoint can be represented using the tf.train.Example
proto, which serializes a single \((x,y)\) datapoint into binary format.
An Example
essentially represents the following structure:
Dict[str, Union[List[bytes], List[int64], List[float]]]
i.e. it maps each feature name (string key) to a corresponding tf.train.Feature
value, which holds a list of int64, float, or bytes. The proto definition:
message Example {
  Features features = 1;
}

message Features {
  // Map from feature name to feature.
  map<string, Feature> feature = 1;
}

// Containers for non-sequential data.
message Feature {
  // Each feature can be exactly one kind.
  oneof kind {
    BytesList bytes_list = 1;
    FloatList float_list = 2;
    Int64List int64_list = 3;
  }
}

message BytesList {
  repeated bytes value = 1;
}
This section lists a few Example-related conversions.
To convert native/numpy types into tf.train.Feature, we can use the following snippet:
import tensorflow as tf

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()  # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# native to feature
feature = _int64_feature(1)
# feature to native
feature.int64_list.value  # [1]

# combine several features into one Example
feature = {
    'feature0': _int64_feature(0),
    'feature1': _int64_feature(1),
    'feature2': _bytes_feature(b'raw bytes'),
    'feature3': _float_feature(0.5),
}
# Create a Features message using tf.train.Example.
example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
1.1.2. TFRecord
A TFRecord file contains a sequence of records (e.g. serialized tf.train.Example protos). The file can only be read sequentially; there is no random access.
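A minimal sketch of this sequential behavior (the /tmp path is just for illustration); note that any byte string can be stored, not just Example protos:

import tensorflow as tf

# write three raw byte records
with tf.io.TFRecordWriter('/tmp/demo.tfrecord') as w:
    for i in range(3):
        w.write(f'record-{i}'.encode())

# records come back in write order; there is no index for random access
for raw in tf.data.TFRecordDataset('/tmp/demo.tfrecord'):
    print(raw.numpy())  # b'record-0', b'record-1', b'record-2'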
1.1.3. tf.io
Each special byte sequence needs its own encoder/decoder; the tf.io
package provides those.
To convert a tensor into a tf.train.Feature, we can do the following:
# convert a tensor to a bytes feature
t = tf.constant(1)
serialized_tensor = tf.io.serialize_tensor(t)
feature_of_bytes = tf.train.Feature(
    bytes_list=tf.train.BytesList(value=[serialized_tensor.numpy()]))

# convert back; the dtype must be provided and must match
tf.io.parse_tensor(serialized_tensor, tf.int32)
To parse a serialized Example string directly into tensors, provide a feature spec:

# parse a serialized `Example` string (here an empty one) given a feature spec
serialized = tf.train.Example().SerializeToString()
res = tf.io.parse_single_example(
    serialized, {'audio_samples': tf.io.VarLenFeature(tf.float32)})
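A VarLenFeature field comes back as a tf.sparse.SparseTensor; a quick way to inspect it (sketch, reusing res from above):

dense = tf.sparse.to_dense(res['audio_samples'])
print(dense.numpy())  # empty here, since the Example above carries no 'audio_samples'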
1.2. Column Format
1.2.1. Arrow
Apache Arrow is an in-memory, column-oriented data format.
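A tiny sketch using the pyarrow package (assuming it is installed):

import pyarrow as pa

# build an in-memory columnar table; each column is stored contiguously
table = pa.table({'x': [1, 2, 3], 'y': [0.1, 0.2, 0.3]})
print(table.column('x'))  # column access without materializing rows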
1.2.2. Parquet
Apache Parquet is an on-disk, column-oriented storage format.
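A sketch writing the Arrow table above to Parquet via pyarrow.parquet (the path is illustrative):

import pyarrow.parquet as pq

pq.write_table(table, '/tmp/demo.parquet')
# the columnar layout lets us read back only the columns we need
t2 = pq.read_table('/tmp/demo.parquet', columns=['x'])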
2. TensorFlow Datasets
2.1. tf.data
tf.data.Dataset
is the API for building input pipelines.
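For instance, a typical pipeline chains transformations lazily (a minimal sketch):

import tensorflow as tf

# transformations are declared lazily and executed as the dataset is iterated
ds = (tf.data.Dataset.range(10)
      .map(lambda x: x * 2)
      .shuffle(buffer_size=5)
      .batch(4)
      .prefetch(tf.data.AUTOTUNE))
for batch in ds:
    print(batch.numpy())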
2.1.1. Naive Dataset
Use from_tensors
for a single datapoint and from_tensor_slices
for multiple datapoints:
import numpy as np

# tf.data.Dataset.from_tensor_slices((X, y)) or tf.data.Dataset.from_tensor_slices(X)
dataset = tf.data.Dataset.from_tensor_slices(
    ([np.array([1, 2, 3, 4]), np.array([0, 0, 1, 0])], np.array([1, 0])))
print(dataset)
# <_TensorSliceDataset element_spec=(TensorSpec(shape=(4,), dtype=tf.int64, name=None), TensorSpec(shape=(), dtype=tf.int64, name=None))>
for elem in dataset:
    print(elem)
# (<tf.Tensor: shape=(4,), dtype=int64, numpy=array([1, 2, 3, 4])>, <tf.Tensor: shape=(), dtype=int64, numpy=1>)
# (<tf.Tensor: shape=(4,), dtype=int64, numpy=array([0, 0, 1, 0])>, <tf.Tensor: shape=(), dtype=int64, numpy=0>)
Use from_generator
to consume a Python generator:
def count(stop):
    i = 0
    while i < stop:
        yield i
        i += 1

ds_counter = tf.data.Dataset.from_generator(
    count, args=[25], output_types=tf.int32, output_shapes=())
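The result behaves like any other tf.data.Dataset:

for n in ds_counter.take(5):
    print(n.numpy())  # 0 1 2 3 4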
2.1.2. Batching
Use batch
for fixed-shape elements, or padded_batch
when elements have variable lengths:
def gen_series():
    i = 0
    while True:
        size = np.random.randint(0, 9)  # keep length <= the padded shape below
        yield i, np.random.normal(size=(size,))
        i += 1

ds_series = tf.data.Dataset.from_generator(
    gen_series,
    output_types=(tf.int32, tf.float32),
    output_shapes=((), (None,)))

# batch 2 datapoints, padding the variable-length component to length 8;
# if each sample is a dict, padded_shapes can take a dict mapping each key to its
# max length, or [] to indicate a component that needs no padding
ds_series_batch = ds_series.shuffle(20).padded_batch(2, padded_shapes=((), (8,)))
_, sequence_batch = next(iter(ds_series_batch))
print(sequence_batch.numpy())
# [[ 0.0208  0.      0.      0.      0.      0.      0.      0.    ]
#  [-1.2131  0.523   1.083   0.3762 -1.1041 -1.6604 -2.3436  0.    ]]
2.1.3. Tokenize
We can use a tokenizer from the tensorflow_text package:
import tensorflow_text as text

tokenizer = text.WhitespaceTokenizer()
dataset = dataset.map(lambda x: tokenizer.tokenize(x))
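For example, on a dataset of strings (a sketch, reusing the tokenizer above):

ds = tf.data.Dataset.from_tensor_slices(['hello world', 'foo bar'])
ds = ds.map(lambda x: tokenizer.tokenize(x))
for tokens in ds:
    print(tokens.numpy())  # [b'hello' b'world'], then [b'foo' b'bar']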
2.1.4. TFRecordDataset
An Example
proto can be serialized to / deserialized from a string:
# convert example to string
string_example = example_proto.SerializeToString()
# convert back (SerializeToString returns bytes, so no .numpy() is needed)
example_proto = tf.train.Example.FromString(string_example)
TFRecord
is a file format storing a sequence of byte strings; it can be used to store serialized Example strings.
# Write the `tf.train.Example` observations to the file.
# (serialize_example builds an Example from the four features as in section 1.1.1
# and returns example_proto.SerializeToString())
with tf.io.TFRecordWriter(filename) as writer:
    for i in range(n_observations):
        example = serialize_example(feature0[i], feature1[i], feature2[i], feature3[i])
        writer.write(example)

# Read
filenames = [filename]
raw_dataset = tf.data.TFRecordDataset(filenames)
for raw_record in raw_dataset.take(1):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)
2.2. tfds
See the official tfds documentation for details.
tfds provides ready-to-use datasets.
tfds.load
is roughly equivalent to the following:
import tensorflow_datasets as tfds

builder = tfds.builder('mnist')
# 1. Create the tfrecord files (no-op if already exists)
builder.download_and_prepare()
# 2. Load the `tf.data.Dataset`
ds = builder.as_dataset(split='train', shuffle_files=True)
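The one-liner form returns the same kind of dataset (a sketch; mnist elements are dicts):

ds = tfds.load('mnist', split='train', shuffle_files=True)
for ex in ds.take(1):
    print(ex['image'].shape, ex['label'])  # (28, 28, 1) and a scalar label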
3. Torch Datasets
Dataset
(here, from the Hugging Face datasets library) provides the map-style dataset, which supports fast random access by using Apache Arrow as the in-memory column format, cached on disk.
from datasets import load_dataset

# many raw datasets will be converted into an Arrow cache during loading
data_files = {"train": ["path/to/data.csv"]}
my_dataset = load_dataset("csv", data_files=data_files, split="train")
# a transformation triggers processing of all data immediately and builds a new cache
my_dataset = my_dataset.map(process_fn)
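Random access then works by index (a sketch; the columns depend on the CSV):

print(my_dataset[0])      # first row as a dict of column -> value
print(my_dataset[10:12])  # slicing returns a dict of column -> list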
IterableDataset
is the iterable-style dataset: it loads data from disk lazily (without Arrow conversion) and transformations are applied on the fly; see the datasets documentation.
It can be created via load_dataset's streaming mode or from a generator:
from datasets import IterableDataset, load_dataset

# streaming mode
imagenet = load_dataset("imagenet-1k", split="train", streaming=True)  # starts loading data only when iterated over
for example in imagenet:
    print(example)
    break

# from a generator
def my_generator(n):
    for i in range(n):
        yield {"col_1": i}

my_iterable_dataset = IterableDataset.from_generator(my_generator, gen_kwargs={"n": 10})
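Iteration then yields rows lazily:

for ex in my_iterable_dataset:
    print(ex)  # {'col_1': 0}, {'col_1': 1}, ...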