Skip to content

Commit

Permalink
A full demo implementation showing record generation and conversion
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 579985173
  • Loading branch information
ArrayRecord Team authored and copybara-github committed Nov 13, 2023
1 parent 2544e3b commit a41aefa
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 0 deletions.
43 changes: 43 additions & 0 deletions beam/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""Demo Pipeline.
This file creates a TFrecord dataset and converts it to ArrayRecord on GCS
"""

import apache_beam as beam
from apache_beam.coders import coders
from . import dofns
from . import example
from . import options


## Grab CLI arguments.
## Override by passing args/pipeline_options to the function manually.
args, pipeline_options = options.get_arguments()


def main():
p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Create a set of TFExamples' >> beam.Create(
example.generate_movie_examples()
)
| 'Write TFRecords' >> beam.io.WriteToTFRecord(
args['input'],
coder=coders.ToBytesCoder(),
num_shards=4,
file_name_suffix='.tfrecord'
)
| 'Read shards from GCS' >> beam.io.ReadAllFromTFRecord(
with_filename=True)
| 'Group with Filename' >> beam.GroupByKey()
| 'Write to ArrayRecord in GCS' >> beam.ParDo(
dofns.ConvertToArrayRecordGCS(),
args['output'],
overwrite_extension=True))

return p1, initial


if __name__ == '__main__':
demo_pipeline = main()
demo_pipeline.run()
106 changes: 106 additions & 0 deletions beam/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Helper file for generating TF/ArrayRecords and writing them to disk."""

import os
from array_record.python.array_record_module import ArrayRecordWriter
import tensorflow as tf
from . import testdata


def generate_movie_examples():
"""Create a list of TF examples from the dummy data above and return it.
Returns:
TFExample object
"""

examples = []
for example in testdata.data:
examples.append(
tf.train.Example(
features=tf.train.Features(
feature={
'Age': tf.train.Feature(
int64_list=tf.train.Int64List(value=[example['Age']])),
'Movie': tf.train.Feature(
bytes_list=tf.train.BytesList(
value=[
m.encode('utf-8') for m in example['Movie']])),
'Movie Ratings': tf.train.Feature(
float_list=tf.train.FloatList(
value=example['Movie Ratings'])),
'Suggestion': tf.train.Feature(
bytes_list=tf.train.BytesList(
value=[example['Suggestion'].encode('utf-8')])),
'Suggestion Purchased': tf.train.Feature(
float_list=tf.train.FloatList(
value=[example['Suggestion Purchased']])),
'Purchase Price': tf.train.Feature(
float_list=tf.train.FloatList(
value=[example['Purchase Price']]))
}
)
)
)

return(examples)


def generate_serialized_movie_examples():
"""Return a serialized version of the above data for byte insertion."""

return [example.SerializeToString() for example in generate_movie_examples()]


def write_example_to_tfrecord(example, file_path):
"""Write example(s) to a single TFrecord file."""

with tf.io.TFRecordWriter(file_path) as writer:
writer.write(example.SerializeToString())


# Write example(s) to a single ArrayRecord file
def write_example_to_arrayrecord(example, file_path):
writer = ArrayRecordWriter(file_path, 'group_size:1')
writer.write(example.SerializeToString())
writer.close()


def kitty_tfrecord(prefix=''):
"""Create a TFRecord from a cat pic on the Internet.
This is mainly for testing; probably don't use it.
Args:
prefix: A file directory in string format.
"""

cat_in_snow = tf.keras.utils.get_file(
'320px-Felis_catus-cat_on_snow.jpg',
'https://storage.googleapis.com/download.tensorflow.org/example_images/320px-Felis_catus-cat_on_snow.jpg')

image_labels = {
cat_in_snow: 0
}

image_string = open(cat_in_snow, 'rb').read()
label = image_labels[cat_in_snow]
image_shape = tf.io.decode_jpeg(image_string).shape

feature = {
'height': tf.train.Feature(int64_list=tf.train.Int64List(
value=[image_shape[0]])),
'width': tf.train.Feature(int64_list=tf.train.Int64List(
value=[image_shape[1]])),
'depth': tf.train.Feature(int64_list=tf.train.Int64List(
value=[image_shape[2]])),
'label': tf.train.Feature(int64_list=tf.train.Int64List(
value=[label])),
'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList(
value=[image_string]))
}

example = tf.train.Example(features=tf.train.Features(feature=feature))

record_file = os.path.join(prefix, 'kittymeow.tfrecord')
with tf.io.TFRecordWriter(record_file) as writer:
writer.write(example.SerializeToString())
26 changes: 26 additions & 0 deletions beam/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Handler for Pipeline and Beam options that allows for cleaner importing."""


import argparse
from apache_beam.options import pipeline_options


def get_arguments():
"""Simple external wrapper for argparse that allows for manual construction.
Returns:
1. A dictionary of known args for use in pipelines
2. The remainder of the arguments in PipelineOptions format
"""

parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
help='The file pattern for the input TFRecords.',)
parser.add_argument(
'--output',
help='The path prefix for output ArrayRecords.')

args, beam_args = parser.parse_known_args()
return(args.__dict__, pipeline_options.PipelineOptions(beam_args))
162 changes: 162 additions & 0 deletions beam/pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Various opinionated Beam pipelines for testing different functionality."""

import apache_beam as beam
from apache_beam.coders import coders
from . import arrayrecordio
from . import dofns
from . import example
from . import options


## Grab CLI arguments.
## Override by passing args/pipeline_options to the function manually.
def_args, def_pipeline_options = options.get_arguments()


def example_to_tfrecord(
num_shards=1,
args=def_args,
pipeline_options=def_pipeline_options):
"""Beam pipeline for creating example TFRecord data.
Args:
num_shards: Number of files
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Create' >> beam.Create(example.generate_movie_examples())
| 'Write' >> beam.io.WriteToTFRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.tfrecord'))

return p1, initial


def example_to_arrayrecord(
num_shards=1,
args=def_args,
pipeline_options=def_pipeline_options):
"""Beam pipeline for creating example ArrayRecord data.
Args:
num_shards: Number of files
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Create' >> beam.Create(example.generate_movie_examples())
| 'Write' >> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.arrayrecord'))

return p1, initial


def convert_tf_to_arrayrecord_disk(
num_shards=1,
args=def_args,
pipeline_options=def_pipeline_options):
"""Convert TFRecords to ArrayRecords using sink/sharding functionality.
THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
Args:
num_shards: Number of files
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
| 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.arrayrecord'))

return p1, initial


def convert_tf_to_arrayrecord_disk_match_shards(
args=def_args,
pipeline_options=def_pipeline_options):
"""Convert TFRecords to matching number of ArrayRecords.
THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
Args:
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Start' >> beam.Create([args['input']])
| 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True))

file_count = (initial
| 'Group' >> beam.GroupByKey()
| 'Count Shards' >> beam.combiners.Count.Globally())

write_files = (initial
| 'Drop Filename' >> beam.Map(lambda x: x[1])
| 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=beam.pvalue.AsSingleton(file_count),
file_name_suffix='.arrayrecord'))

return p1, write_files


def convert_tf_to_arrayrecord_gcs(
overwrite_extension=False,
file_path_suffix='.arrayrecord',
args=def_args,
pipeline_options=def_pipeline_options):
"""Convert TFRecords to ArrayRecords in GCS 1:1.
Args:
overwrite_extension: Boolean making DoFn attempt to overwrite extension
file_path_suffix: Intended suffix for overwrite or append
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Start' >> beam.Create([args['input']])
| 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
| 'Group' >> beam.GroupByKey()
| 'Write to ArrayRecord in GCS' >> beam.ParDo(
dofns.ConvertToArrayRecordGCS(),
args['output'],
file_path_suffix=file_path_suffix,
overwrite_extension=overwrite_extension))

return p1, initial
40 changes: 40 additions & 0 deletions beam/testdata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Simple test data wrapper.
Separated to keep Tensorflow and Beam dependencies away from test data
"""

# Hardcoded multirecord dataset in dict format for testing and demo.
data = [
{
'Age': 29,
'Movie': ['The Shawshank Redemption', 'Fight Club'],
'Movie Ratings': [9.0, 9.7],
'Suggestion': 'Inception',
'Suggestion Purchased': 1.0,
'Purchase Price': 9.99
},
{
'Age': 39,
'Movie': ['The Prestige', 'The Big Lebowski', 'The Fall'],
'Movie Ratings': [9.5, 8.5, 8.5],
'Suggestion': 'Interstellar',
'Suggestion Purchased': 1.0,
'Purchase Price': 14.99
},
{
'Age': 19,
'Movie': ['Barbie', 'The Batman', 'Boss Baby', 'Oppenheimer'],
'Movie Ratings': [9.6, 8.2, 10.0, 4.2],
'Suggestion': 'Secret Life of Pets',
'Suggestion Purchased': 0.0,
'Purchase Price': 25.99
},
{
'Age': 35,
'Movie': ['The Mothman Prophecies', 'Sinister'],
'Movie Ratings': [8.3, 9.0],
'Suggestion': 'Hereditary',
'Suggestion Purchased': 1.0,
'Purchase Price': 12.99
}
]

0 comments on commit a41aefa

Please sign in to comment.