
0x523 Dataset

1. Serialization

1.1. Row Format

1.1.1. ProtoBuffer

A single datapoint can be represented with a tf.train.Example proto, which serializes an \((x,y)\) datapoint into a binary format.

An Example essentially represents the following structure:

Dict[str,
     Union[List[bytes],
           List[int64],
           List[float]]]

where it maps a feature name (string key) to a corresponding tf.train.Feature value, which can be a list of ints, floats, or bytes. The proto def:

message Example {
  Features features = 1;
}

message Features {
  // Map from feature name to feature.
  map<string, Feature> feature = 1;
}

// Containers for non-sequential data.
message Feature {
  // Each feature can be exactly one kind.
  oneof kind {
    BytesList bytes_list = 1;
    FloatList float_list = 2;
    Int64List int64_list = 3;
  }
}

message BytesList {
  repeated bytes value = 1;
}

This section lists a few Example-related conversions.

To convert native/numpy types into tf.train.Feature, we can use the following snippet

# native to feature
def _bytes_feature(value):
  """Returns a bytes_list from a string / byte."""
  if isinstance(value, type(tf.constant(0))):
    value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
  """Returns a float_list from a float / double."""
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
  """Returns an int64_list from a bool / enum / int / uint."""
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

feature = _int64_feature(1)

# feature to native
feature.int64_list.value
Converting to and from an Example is similar:

feature = {
    'feature0': _int64_feature(feature0),
    'feature1': _int64_feature(feature1),
    'feature2': _bytes_feature(feature2),
    'feature3': _float_feature(feature3),
}

# Create a Features message using tf.train.Example.
example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
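
To go back from a proto to native values, the nested features map can be accessed directly (a small sketch reusing the feature names above):

# proto to native: index into the features map and pick the matching kind
print(example_proto.features.feature['feature0'].int64_list.value[0])
print(example_proto.features.feature['feature3'].float_list.value[0])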

1.1.2. TFRecord

A TFRecord file contains a sequence of records (e.g. serialized tf.train.Example protos). The file can only be read sequentially.
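
As a minimal sketch (the file path is illustrative), the records are just raw byte strings written and read back in order:

# TFRecord stores opaque byte strings, one per record
with tf.io.TFRecordWriter('/tmp/demo.tfrecord') as writer:
  writer.write(b'record-0')
  writer.write(b'record-1')

# records come back in write order; there is no random access
for raw in tf.data.TFRecordDataset('/tmp/demo.tfrecord'):
  print(raw.numpy())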

1.1.3. tf.io

Each special byte sequence needs its own encoder/decoder; the tf.io package provides these.

Tensor conversion: to convert a tensor into a tf.train.Feature, we can do the following:

# serialize a tensor into bytes and wrap it in a BytesList feature
t = tf.constant(1)
serialized_tensor = tf.io.serialize_tensor(t)

feature_of_bytes = tf.train.Feature(
  bytes_list=tf.train.BytesList(value=[serialized_tensor.numpy()]))

# convert back; out_type must match the original dtype
tf.io.parse_tensor(serialized_tensor, out_type=tf.int32)

Example conversion works similarly; tf.io.parse_single_example parses a serialized Example (a byte string) into a dict of tensors given a feature spec:

serialized = tf.train.Example(features=tf.train.Features(
    feature={'audio_samples': _float_feature(0.5)})).SerializeToString()
res = tf.io.parse_single_example(serialized, {'audio_samples': tf.io.VarLenFeature(tf.float32)})
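
Since VarLenFeature parses into a tf.sparse.SparseTensor, a dense tensor can be recovered if needed:

# densify the variable-length feature
dense = tf.sparse.to_dense(res['audio_samples'])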

1.2. Column Format

1.2.1. Arrow

Arrow is an in-memory columnar data format.

1.2.2. Parquet

Parquet is an on-disk columnar storage format.
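
As an illustration (assuming the pyarrow library, which these notes do not name explicitly), an Arrow table lives in memory while Parquet persists the same columnar data on disk:

import pyarrow as pa
import pyarrow.parquet as pq

# build an in-memory Arrow table (columnar)
table = pa.table({'x': [1, 2, 3], 'y': [0.1, 0.2, 0.3]})

# persist it as Parquet on disk and read it back
pq.write_table(table, '/tmp/demo.parquet')
table2 = pq.read_table('/tmp/demo.parquet')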

2. Tensorflow Datasets

2.1. tf.data

tf.data provides the tf.data.Dataset API for building input pipelines.

2.1.1. Naive Dataset

use from_tensors for a single datapoint and from_tensor_slices for multiple datapoints

# tf.data.Dataset.from_tensor_slices((X,y)) or tf.data.Dataset.from_tensor_slices(X)
dataset = tf.data.Dataset.from_tensor_slices(([np.array([1,2,3,4]), np.array([0,0,1,0])], np.array([1,0])))

# <_TensorSliceDataset element_spec=(TensorSpec(shape=(4,), dtype=tf.int64, name=None), TensorSpec(shape=(), dtype=tf.int64, name=None))>
print(dataset)


# (<tf.Tensor: shape=(4,), dtype=int64, numpy=array([1, 2, 3, 4])>, <tf.Tensor: shape=(), dtype=int64, numpy=1>)
# (<tf.Tensor: shape=(4,), dtype=int64, numpy=array([0, 0, 1, 0])>, <tf.Tensor: shape=(), dtype=int64, numpy=0>)
for elem in iter(dataset):
    print(elem)
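
For contrast, from_tensors keeps the whole input as a single element instead of slicing along the first dimension (a minimal sketch):

# a dataset with exactly one element of shape (4,)
single = tf.data.Dataset.from_tensors(np.array([1, 2, 3, 4]))

# tf.Tensor([1 2 3 4], shape=(4,), dtype=int64)
for elem in single:
    print(elem)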

use from_generator to consume a Python generator

def count(stop):
  i = 0
  while i<stop:
    yield i
    i += 1

ds_counter = tf.data.Dataset.from_generator(count, args=[25], output_types=tf.int32, output_shapes=())
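
The generator-backed dataset is consumed like any other tf.data.Dataset:

# take the first 5 generated values: 0 1 2 3 4
for n in ds_counter.take(5):
  print(n.numpy())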

2.1.2. Batching

use batch or padded_batch

def gen_series():
  i = 0
  while True:
size = np.random.randint(0, 9)  # sequence length 0..8, so it fits the padded length below
    yield i, np.random.normal(size=(size,))
    i += 1

ds_series = tf.data.Dataset.from_generator(
    gen_series,
    output_types=(tf.int32, tf.float32),
    output_shapes=((), (None,)))

# batch 2 datapoints: the scalar id needs no padding, the sequence is padded to length 8
# if each sample is a dict, padded_shapes can take a dict mapping each key to its padded
# length, or [] for a component that needs no padding (see the dict sketch below)
ds_series_batch = ds_series.shuffle(20).padded_batch(2, padded_shapes=((), (8,)))

_, sequence_batch = next(iter(ds_series_batch))
print(sequence_batch.numpy())
# [[ 0.0208  0.      0.      0.      0.      0.      0.      0.    ]
#  [-1.2131  0.523   1.083   0.3762 -1.1041 -1.6604 -2.3436  0.    ]]
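
As a sketch of the dict case mentioned above (the feature names 'ids' and 'label' are made up for illustration):

def gen_dict():
  for i in range(10):
    yield {'ids': np.arange(np.random.randint(1, 5), dtype=np.int32), 'label': i}

ds_dict = tf.data.Dataset.from_generator(
    gen_dict,
    output_types={'ids': tf.int32, 'label': tf.int32},
    output_shapes={'ids': (None,), 'label': ()})

# pad 'ids' to length 8 per batch; 'label' is a scalar and needs no padding
ds_dict_batch = ds_dict.padded_batch(2, padded_shapes={'ids': (8,), 'label': ()})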

2.1.3. Tokenize

can use a tokenizer from tensorflow_text

import tensorflow_text as text

tokenizer = text.WhitespaceTokenizer()
dataset = dataset.map(lambda x: tokenizer.tokenize(x))

2.1.4. TFRecordDataset

an Example proto can be serialized to / deserialized from a string:

# convert example to string
string_example = example_proto.SerializeToString()

# convert back
example_proto = tf.train.Example.FromString(string_example)

TFRecord is a file format that stores a sequence of byte strings; it can be used to store serialized Example strings.

# Write the `tf.train.Example` observations to the file.
# (serialize_example is assumed to build an Example from the four features and
#  return it serialized as a scalar string tensor, hence the .numpy() below)
with tf.io.TFRecordWriter(filename) as writer:
  for i in range(n_observations):
    example = serialize_example(feature0[i], feature1[i], feature2[i], feature3[i])
    writer.write(example.numpy())


# Read
filenames = [filename]
raw_dataset = tf.data.TFRecordDataset(filenames)
for raw_record in raw_dataset.take(1):
  example = tf.train.Example()
  example.ParseFromString(raw_record.numpy())
  print(example)
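
Instead of materializing full protos in Python, records can also be parsed inside the pipeline with a feature spec (a sketch matching the feature0..feature3 layout used earlier):

feature_spec = {
    'feature0': tf.io.FixedLenFeature([], tf.int64),
    'feature1': tf.io.FixedLenFeature([], tf.int64),
    'feature2': tf.io.FixedLenFeature([], tf.string),
    'feature3': tf.io.FixedLenFeature([], tf.float32),
}

# each element becomes a dict of feature name -> tensor
parsed_dataset = raw_dataset.map(
    lambda raw: tf.io.parse_single_example(raw, feature_spec))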

2.2. tfds

See doc here

tfds provides ready-to-use datasets.

tfds.load is roughly equivalent to the following:

builder = tfds.builder('mnist')
# 1. Create the tfrecord files (no-op if already exists)
builder.download_and_prepare()
# 2. Load the `tf.data.Dataset`
ds = builder.as_dataset(split='train', shuffle_files=True)
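
The usual one-liner hides those two steps:

# equivalent high-level call
ds = tfds.load('mnist', split='train', shuffle_files=True)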

3. Torch Datasets

Dataset is the map-style dataset; it supports fast random access by using Apache Arrow as the in-memory column format, which is cached on disk.

from datasets import load_dataset

# many raw datasets are converted into an Arrow cache during loading
data_files = {"train": ["path/to/data.csv"]}
my_dataset = load_dataset("csv", data_files=data_files, split="train")

# a transformation triggers processing on all data immediately and builds a new cache
my_dataset = my_dataset.map(process_fn)
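
Being map-style, the cached dataset supports length queries and integer indexing:

# fast random access backed by the on-disk Arrow cache
print(len(my_dataset))
print(my_dataset[0])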

IterableDataset is the iterable-style dataset; it dynamically loads data from disk (without Arrow conversion) and transformations are applied on the fly. See this doc

It can be created via load_dataset's streaming mode or from a generator:

# streaming mode
imagenet = load_dataset("imagenet-1k", split="train", streaming=True)  # will start loading the data when iterated over
for example in imagenet:
    print(example)
    break


from datasets import IterableDataset

def my_generator(n):
    for i in range(n):
        yield {"col_1": i}

my_iterable_dataset = IterableDataset.from_generator(my_generator, gen_kwargs={"n": 10})
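
Transformations on an IterableDataset are likewise lazy; they run per example during iteration (a small sketch):

# map is applied on the fly; nothing is processed until iteration starts
doubled = my_iterable_dataset.map(lambda x: {"col_1": x["col_1"] * 2})
print(next(iter(doubled)))  # {'col_1': 0}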