From a41aefa0b8122e6e98f06667af98a45db51f5ae7 Mon Sep 17 00:00:00 2001 From: ArrayRecord Team Date: Mon, 6 Nov 2023 16:03:30 -0800 Subject: [PATCH] A full demo implementation showing record generation and conversion PiperOrigin-RevId: 579985173 --- beam/demo.py | 43 ++++++++++++ beam/example.py | 106 ++++++++++++++++++++++++++++++ beam/options.py | 26 ++++++++ beam/pipelines.py | 162 ++++++++++++++++++++++++++++++++++++++++++++++ beam/testdata.py | 40 ++++++++++++ 5 files changed, 377 insertions(+) create mode 100644 beam/demo.py create mode 100644 beam/example.py create mode 100644 beam/options.py create mode 100644 beam/pipelines.py create mode 100644 beam/testdata.py diff --git a/beam/demo.py b/beam/demo.py new file mode 100644 index 0000000..2951dea --- /dev/null +++ b/beam/demo.py @@ -0,0 +1,43 @@ +"""Demo Pipeline. + +This file creates a TFrecord dataset and converts it to ArrayRecord on GCS +""" + +import apache_beam as beam +from apache_beam.coders import coders +from . import dofns +from . import example +from . import options + + +## Grab CLI arguments. +## Override by passing args/pipeline_options to the function manually. 
# ============================================================================
# beam/demo.py (continued)
# ============================================================================

# Grab CLI arguments once at import time.
# Override by passing args/pipeline_options to the function manually.
args, pipeline_options = options.get_arguments()


def main():
    """Build the demo pipeline: write TFRecords, then convert to ArrayRecord on GCS.

    Returns:
        A tuple of (beam.Pipeline, terminal PCollection). The caller is
        responsible for calling .run() on the pipeline object.
    """
    p1 = beam.Pipeline(options=pipeline_options)
    initial = (
        p1
        | 'Create a set of TFExamples' >> beam.Create(
            example.generate_movie_examples())
        | 'Write TFRecords' >> beam.io.WriteToTFRecord(
            args['input'],
            coder=coders.ToBytesCoder(),
            num_shards=4,
            file_name_suffix='.tfrecord')
        | 'Read shards from GCS' >> beam.io.ReadAllFromTFRecord(
            with_filename=True)
        | 'Group with Filename' >> beam.GroupByKey()
        | 'Write to ArrayRecord in GCS' >> beam.ParDo(
            dofns.ConvertToArrayRecordGCS(),
            args['output'],
            overwrite_extension=True))

    return p1, initial


if __name__ == '__main__':
    # BUG FIX: main() returns a (pipeline, pcollection) tuple; the original
    # code called .run() on the tuple itself, which raises AttributeError.
    demo_pipeline, _ = main()
    demo_pipeline.run()


# ============================================================================
# beam/example.py
# ============================================================================
"""Helper file for generating TF/ArrayRecords and writing them to disk."""

import os
from array_record.python.array_record_module import ArrayRecordWriter
import tensorflow as tf
from . import testdata


def generate_movie_examples():
    """Create a list of TF examples from testdata.data.

    Returns:
        A list of tf.train.Example objects, one per record in testdata.data.
    """
    examples = []
    for record in testdata.data:
        examples.append(
            tf.train.Example(
                features=tf.train.Features(
                    feature={
                        'Age': tf.train.Feature(
                            int64_list=tf.train.Int64List(
                                value=[record['Age']])),
                        'Movie': tf.train.Feature(
                            bytes_list=tf.train.BytesList(
                                value=[m.encode('utf-8')
                                       for m in record['Movie']])),
                        'Movie Ratings': tf.train.Feature(
                            float_list=tf.train.FloatList(
                                value=record['Movie Ratings'])),
                        'Suggestion': tf.train.Feature(
                            bytes_list=tf.train.BytesList(
                                value=[record['Suggestion'].encode('utf-8')])),
                        'Suggestion Purchased': tf.train.Feature(
                            float_list=tf.train.FloatList(
                                value=[record['Suggestion Purchased']])),
                        'Purchase Price': tf.train.Feature(
                            float_list=tf.train.FloatList(
                                value=[record['Purchase Price']])),
                    })))
    return examples


def generate_serialized_movie_examples():
    """Return a serialized version of the above data for byte insertion."""
    return [ex.SerializeToString() for ex in generate_movie_examples()]


def write_example_to_tfrecord(example, file_path):
    """Write example(s) to a single TFRecord file at file_path."""
    with tf.io.TFRecordWriter(file_path) as writer:
        writer.write(example.SerializeToString())


def write_example_to_arrayrecord(example, file_path):
    """Write example(s) to a single ArrayRecord file at file_path."""
    writer = ArrayRecordWriter(file_path, 'group_size:1')
    writer.write(example.SerializeToString())
    writer.close()


def kitty_tfrecord(prefix=''):
    """Create a TFRecord from a cat pic on the Internet.

    This is mainly for testing; probably don't use it.

    Args:
        prefix: A file directory in string format.
    """
    cat_in_snow = tf.keras.utils.get_file(
        '320px-Felis_catus-cat_on_snow.jpg',
        'https://storage.googleapis.com/download.tensorflow.org/example_images/320px-Felis_catus-cat_on_snow.jpg')

    image_labels = {cat_in_snow: 0}

    # FIX: close the image file deterministically instead of leaking the
    # handle via open(...).read().
    with open(cat_in_snow, 'rb') as img:
        image_string = img.read()
    label = image_labels[cat_in_snow]
    image_shape = tf.io.decode_jpeg(image_string).shape

    feature = {
        'height': tf.train.Feature(int64_list=tf.train.Int64List(
            value=[image_shape[0]])),
        'width': tf.train.Feature(int64_list=tf.train.Int64List(
            value=[image_shape[1]])),
        'depth': tf.train.Feature(int64_list=tf.train.Int64List(
            value=[image_shape[2]])),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(
            value=[label])),
        'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList(
            value=[image_string])),
    }

    example = tf.train.Example(features=tf.train.Features(feature=feature))

    record_file = os.path.join(prefix, 'kittymeow.tfrecord')
    with tf.io.TFRecordWriter(record_file) as writer:
        writer.write(example.SerializeToString())


# ============================================================================
# beam/options.py
# ============================================================================
"""Handler for Pipeline and Beam options that allows for cleaner importing."""

import argparse
from apache_beam.options import pipeline_options


def get_arguments():
    """Simple external wrapper for argparse that allows for manual construction.

    Returns:
        1. A dictionary of known args for use in pipelines.
        2. The remainder of the arguments wrapped in a PipelineOptions object.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        help='The file pattern for the input TFRecords.')
    parser.add_argument(
        '--output',
        help='The path prefix for output ArrayRecords.')

    known, beam_args = parser.parse_known_args()
    return vars(known), pipeline_options.PipelineOptions(beam_args)


# ============================================================================
# beam/pipelines.py
# ============================================================================
"""Various opinionated Beam pipelines for testing different functionality."""

import apache_beam as beam
from apache_beam.coders import coders
from . import arrayrecordio
from . import dofns
from . import example
from . import options


# Grab CLI arguments once at import time.
# Override by passing args/pipeline_options to the functions manually.
def_args, def_pipeline_options = options.get_arguments()


def example_to_tfrecord(
        num_shards=1,
        args=def_args,
        pipeline_options=def_pipeline_options):
    """Beam pipeline for creating example TFRecord data.

    Args:
        num_shards: Number of output file shards.
        args: Dict of custom arguments; must contain an 'output' path prefix.
        pipeline_options: apache_beam PipelineOptions for the runner.

    Returns:
        A tuple of (beam.Pipeline, terminal PCollection); caller runs it.
    """
    p1 = beam.Pipeline(options=pipeline_options)
    initial = (
        p1
        | 'Create' >> beam.Create(example.generate_movie_examples())
        | 'Write' >> beam.io.WriteToTFRecord(
            args['output'],
            coder=coders.ToBytesCoder(),
            num_shards=num_shards,
            file_name_suffix='.tfrecord'))

    return p1, initial


def example_to_arrayrecord(
        num_shards=1,
        args=def_args,
        pipeline_options=def_pipeline_options):
    """Beam pipeline for creating example ArrayRecord data.

    Args:
        num_shards: Number of output file shards.
        args: Dict of custom arguments; must contain an 'output' path prefix.
        pipeline_options: apache_beam PipelineOptions for the runner.

    Returns:
        A tuple of (beam.Pipeline, terminal PCollection); caller runs it.
    """
    p1 = beam.Pipeline(options=pipeline_options)
    initial = (
        p1
        | 'Create' >> beam.Create(example.generate_movie_examples())
        | 'Write' >> arrayrecordio.WriteToArrayRecord(
            args['output'],
            coder=coders.ToBytesCoder(),
            num_shards=num_shards,
            file_name_suffix='.arrayrecord'))

    return p1, initial


def convert_tf_to_arrayrecord_disk(
        num_shards=1,
        args=def_args,
        pipeline_options=def_pipeline_options):
    """Convert TFRecords to ArrayRecords using sink/sharding functionality.

    THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES.

    Args:
        num_shards: Number of output file shards.
        args: Dict of custom arguments; needs 'input' and 'output' paths.
        pipeline_options: apache_beam PipelineOptions for the runner.

    Returns:
        A tuple of (beam.Pipeline, terminal PCollection); caller runs it.
    """
    p1 = beam.Pipeline(options=pipeline_options)
    initial = (
        p1
        | 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
        | 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
            args['output'],
            coder=coders.ToBytesCoder(),
            num_shards=num_shards,
            file_name_suffix='.arrayrecord'))

    return p1, initial


def convert_tf_to_arrayrecord_disk_match_shards(
        args=def_args,
        pipeline_options=def_pipeline_options):
    """Convert TFRecords to a matching number of ArrayRecords.

    THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES.

    Args:
        args: Dict of custom arguments; needs 'input' and 'output' paths.
        pipeline_options: apache_beam PipelineOptions for the runner.

    Returns:
        A tuple of (beam.Pipeline, terminal PCollection); caller runs it.
    """
    p1 = beam.Pipeline(options=pipeline_options)
    initial = (
        p1
        | 'Start' >> beam.Create([args['input']])
        | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True))

    # Count distinct input shards (by filename) so the output shard count
    # can match 1:1 via a side input.
    file_count = (
        initial
        | 'Group' >> beam.GroupByKey()
        | 'Count Shards' >> beam.combiners.Count.Globally())

    write_files = (
        initial
        | 'Drop Filename' >> beam.Map(lambda kv: kv[1])
        | 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
            args['output'],
            coder=coders.ToBytesCoder(),
            num_shards=beam.pvalue.AsSingleton(file_count),
            file_name_suffix='.arrayrecord'))

    return p1, write_files


def convert_tf_to_arrayrecord_gcs(
        overwrite_extension=False,
        file_path_suffix='.arrayrecord',
        args=def_args,
        pipeline_options=def_pipeline_options):
    """Convert TFRecords to ArrayRecords in GCS 1:1.

    Args:
        overwrite_extension: Boolean making the DoFn attempt to overwrite the
            source file extension rather than append to it.
        file_path_suffix: Intended suffix for overwrite or append.
        args: Dict of custom arguments; needs 'input' and 'output' paths.
        pipeline_options: apache_beam PipelineOptions for the runner.

    Returns:
        A tuple of (beam.Pipeline, terminal PCollection); caller runs it.
    """
    p1 = beam.Pipeline(options=pipeline_options)
    initial = (
        p1
        | 'Start' >> beam.Create([args['input']])
        | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
        | 'Group' >> beam.GroupByKey()
        | 'Write to ArrayRecord in GCS' >> beam.ParDo(
            dofns.ConvertToArrayRecordGCS(),
            args['output'],
            file_path_suffix=file_path_suffix,
            overwrite_extension=overwrite_extension))

    return p1, initial
# Hardcoded multi-record dataset in dict format for testing and demo use.
# Each record pairs an int age with a list of movie titles, a parallel list
# of float ratings, a single suggested title, and two scalar floats — which
# together exercise int64, bytes-list, and float-list TFExample features.
data = [
    {
        'Age': 29,
        'Movie': ['The Shawshank Redemption', 'Fight Club'],
        'Movie Ratings': [9.0, 9.7],
        'Suggestion': 'Inception',
        'Suggestion Purchased': 1.0,
        'Purchase Price': 9.99,
    },
    {
        'Age': 39,
        'Movie': ['The Prestige', 'The Big Lebowski', 'The Fall'],
        'Movie Ratings': [9.5, 8.5, 8.5],
        'Suggestion': 'Interstellar',
        'Suggestion Purchased': 1.0,
        'Purchase Price': 14.99,
    },
    {
        'Age': 19,
        'Movie': ['Barbie', 'The Batman', 'Boss Baby', 'Oppenheimer'],
        'Movie Ratings': [9.6, 8.2, 10.0, 4.2],
        'Suggestion': 'Secret Life of Pets',
        'Suggestion Purchased': 0.0,
        'Purchase Price': 25.99,
    },
    {
        'Age': 35,
        'Movie': ['The Mothman Prophecies', 'Sinister'],
        'Movie Ratings': [8.3, 9.0],
        'Suggestion': 'Hereditary',
        'Suggestion Purchased': 1.0,
        'Purchase Price': 12.99,
    },
]