
Adding examples of function usage
PiperOrigin-RevId: 579985172
ArrayRecord Team authored and copybara-github committed Nov 13, 2023
1 parent 2544e3b commit 0ba0b1e
Showing 9 changed files with 450 additions and 0 deletions.
43 changes: 43 additions & 0 deletions beam/demo.py
@@ -0,0 +1,43 @@
"""Demo Pipeline.
This file creates a TFrecord dataset and converts it to ArrayRecord on GCS
"""

import apache_beam as beam
from apache_beam.coders import coders
from . import dofns
from . import example
from . import options


## Grab CLI arguments.
## Override by passing args/pipeline_options to the function manually.
args, pipeline_options = options.get_arguments()


def main():
p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Create a set of TFExamples' >> beam.Create(
example.generate_movie_examples()
)
| 'Write TFRecords' >> beam.io.WriteToTFRecord(
args['input'],
coder=coders.ToBytesCoder(),
num_shards=4,
file_name_suffix='.tfrecord'
)
| 'Read shards from GCS' >> beam.io.ReadAllFromTFRecord(
with_filename=True)
| 'Group with Filename' >> beam.GroupByKey()
| 'Write to ArrayRecord in GCS' >> beam.ParDo(
dofns.ConvertToArrayRecordGCS(),
args['output'],
overwrite_extension=True))

return p1, initial


if __name__ == '__main__':
demo_pipeline = main()
demo_pipeline.run()
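
Note that main() builds the pipeline but does not run it, returning both the pipeline and its terminal PCollection, so a caller can attach further transforms before running. A minimal sketch (the extra step and its label are illustrative, not part of this commit):

pipeline, converted = main()
_ = converted | 'Log output' >> beam.Map(print)  # illustrative extra transform
pipeline.run().wait_until_finish()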
106 changes: 106 additions & 0 deletions beam/example.py
@@ -0,0 +1,106 @@
"""Helper file for generating TF/ArrayRecords and writing them to disk."""

import os
from array_record.python.array_record_module import ArrayRecordWriter
import tensorflow as tf
from . import testdata


def generate_movie_examples():
  """Create a list of TF examples from the dummy data in testdata and return it.

  Returns:
    A list of tf.train.Example objects.
  """

  examples = []
  for example in testdata.data:
    examples.append(
        tf.train.Example(
            features=tf.train.Features(
                feature={
                    'Age': tf.train.Feature(
                        int64_list=tf.train.Int64List(value=[example['Age']])),
                    'Movie': tf.train.Feature(
                        bytes_list=tf.train.BytesList(
                            value=[
                                m.encode('utf-8') for m in example['Movie']])),
                    'Movie Ratings': tf.train.Feature(
                        float_list=tf.train.FloatList(
                            value=example['Movie Ratings'])),
                    'Suggestion': tf.train.Feature(
                        bytes_list=tf.train.BytesList(
                            value=[example['Suggestion'].encode('utf-8')])),
                    'Suggestion Purchased': tf.train.Feature(
                        float_list=tf.train.FloatList(
                            value=[example['Suggestion Purchased']])),
                    'Purchase Price': tf.train.Feature(
                        float_list=tf.train.FloatList(
                            value=[example['Purchase Price']]))
                }
            )
        )
    )

  return examples


def generate_serialized_movie_examples():
  """Return serialized versions of the movie examples above for byte insertion."""

  return [example.SerializeToString() for example in generate_movie_examples()]


def write_example_to_tfrecord(example, file_path):
  """Write example(s) to a single TFRecord file."""

  with tf.io.TFRecordWriter(file_path) as writer:
    writer.write(example.SerializeToString())


def write_example_to_arrayrecord(example, file_path):
  """Write example(s) to a single ArrayRecord file."""

  writer = ArrayRecordWriter(file_path, 'group_size:1')
  writer.write(example.SerializeToString())
  writer.close()


def kitty_tfrecord(prefix=''):
  """Create a TFRecord from a cat pic on the Internet.

  This is mainly for testing; probably don't use it.

  Args:
    prefix: A file directory in string format.
  """

  cat_in_snow = tf.keras.utils.get_file(
      '320px-Felis_catus-cat_on_snow.jpg',
      'https://storage.googleapis.com/download.tensorflow.org/example_images/320px-Felis_catus-cat_on_snow.jpg')

  image_labels = {
      cat_in_snow: 0
  }

  with open(cat_in_snow, 'rb') as f:
    image_string = f.read()
  label = image_labels[cat_in_snow]
  image_shape = tf.io.decode_jpeg(image_string).shape

  feature = {
      'height': tf.train.Feature(int64_list=tf.train.Int64List(
          value=[image_shape[0]])),
      'width': tf.train.Feature(int64_list=tf.train.Int64List(
          value=[image_shape[1]])),
      'depth': tf.train.Feature(int64_list=tf.train.Int64List(
          value=[image_shape[2]])),
      'label': tf.train.Feature(int64_list=tf.train.Int64List(
          value=[label])),
      'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList(
          value=[image_string]))
  }

  example = tf.train.Example(features=tf.train.Features(feature=feature))

  record_file = os.path.join(prefix, 'kittymeow.tfrecord')
  with tf.io.TFRecordWriter(record_file) as writer:
    writer.write(example.SerializeToString())
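
A quick way to sanity-check the ArrayRecord writer above is to read the file back and re-parse the proto. A minimal sketch, assuming an ArrayRecordReader is available from the same module; the reader calls here are illustrative of typical usage, not taken from this commit:

from array_record.python.array_record_module import ArrayRecordReader

## Write one example, then read it back and re-parse it.
write_example_to_arrayrecord(generate_movie_examples()[0], '/tmp/movie.arrayrecord')
reader = ArrayRecordReader('/tmp/movie.arrayrecord')
parsed = tf.train.Example.FromString(reader.read_all()[0])  # read_all -> list of bytes
reader.close()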
13 changes: 13 additions & 0 deletions beam/examples/example_full_demo_cli.sh
@@ -0,0 +1,13 @@
#!/bin/bash
# Execute this via bash to run a full demo that creates TFRecords and converts them.

# Set bucket info below. Uncomment the lower lines (and restore the trailing
# backslash on the --output line) to run on Dataflow.
python -m array_record.beam.demo \
  --input="gs://<YOUR_INPUT_BUCKET>/records/movies" \
  --output="gs://<YOUR_OUTPUT_BUCKET>/records/"
  # --region="<YOUR_REGION>" \
  # --runner="DataflowRunner" \
  # --project="<YOUR_PROJECT>" \
  # --requirements_file="requirements.txt"
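
For reference, the Dataflow variant of the same command with every optional flag enabled would look like this (all bracketed values are placeholders to substitute):

python -m array_record.beam.demo \
  --input="gs://<YOUR_INPUT_BUCKET>/records/movies" \
  --output="gs://<YOUR_OUTPUT_BUCKET>/records/" \
  --region="<YOUR_REGION>" \
  --runner="DataflowRunner" \
  --project="<YOUR_PROJECT>" \
  --requirements_file="requirements.txt"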
29 changes: 29 additions & 0 deletions beam/examples/example_gcs_conversion.py
@@ -0,0 +1,29 @@
"""Execute this to convert an existing set of TFRecords to ArrayRecords."""


from apache_beam.options import pipeline_options
from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs

## Set input and output patterns as specified
input_pattern = 'gs://<YOUR_INPUT_BUCKET>/records/*.tfrecord'
output_path = 'gs://<YOUR_OUTPUT_BUCKET>/records/'

args = {'input': input_pattern, 'output': output_path}

## Set pipeline options and uncomment in main() to run in Dataflow
pipeline_options = pipeline_options.PipelineOptions(
runner='DataflowRunner',
project='<YOUR_PROJECT>',
region='<YOUR_REGION>',
requirements_file='requirements.txt'
)


def main():
convert_tf_to_arrayrecord_gcs(
args=args,
# pipeline_options=pipeline_options
).run()

if __name__ == '__main__':
main()
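
To run this on Dataflow instead of the default local runner, the only change the comment above calls for is passing the options object through; equivalently:

## Equivalent of main() with the Dataflow options enabled.
convert_tf_to_arrayrecord_gcs(
    args=args,
    pipeline_options=pipeline_options
).run()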
29 changes: 29 additions & 0 deletions beam/examples/example_sink_conversion.py
@@ -0,0 +1,29 @@
"""Execute this to convert TFRecords to ArrayRecords using the disk Sink."""


from apache_beam.options import pipeline_options
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk_match_shards

## Set input and output patterns as specified
input_pattern = 'gs://<YOUR_INPUT_BUCKET>/records/*.tfrecord'
output_path = 'records/movies'

args = {'input': input_pattern, 'output': output_path}

## Set pipeline options and uncomment in main() to run in Dataflow
pipeline_options = pipeline_options.PipelineOptions(
runner='DataflowRunner',
project='<YOUR_PROJECT>',
region='<YOUR_REGION>',
requirements_file='requirements.txt'
)


def main():
convert_tf_to_arrayrecord_disk_match_shards(
args=args,
# pipeline_options=pipeline_options
).run()

if __name__ == '__main__':
main()
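
With pipeline_options left commented out, Beam falls back to its default DirectRunner, so a purely local conversion only needs local-style paths. A minimal sketch (the paths are illustrative placeholders):

## Hypothetical local run on the DirectRunner; paths are placeholders.
convert_tf_to_arrayrecord_disk_match_shards(
    args={'input': '/tmp/records/*.tfrecord', 'output': '/tmp/arrayrecords/movies'}
).run()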
2 changes: 2 additions & 0 deletions beam/examples/requirements.txt
@@ -0,0 +1,2 @@
google-cloud-storage==2.11.0
tensorflow==2.14.0
26 changes: 26 additions & 0 deletions beam/options.py
@@ -0,0 +1,26 @@
"""Handler for Pipeline and Beam options that allows for cleaner importing."""


import argparse
from apache_beam.options import pipeline_options


def get_arguments():
"""Simple external wrapper for argparse that allows for manual construction.
Returns:
1. A dictionary of known args for use in pipelines
2. The remainder of the arguments in PipelineOptions format
"""

parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
help='The file pattern for the input TFRecords.',)
parser.add_argument(
'--output',
help='The path prefix for output ArrayRecords.')

args, beam_args = parser.parse_known_args()
return(args.__dict__, pipeline_options.PipelineOptions(beam_args))
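
As demo.py shows, this helper splits a command line into the known pipeline args and everything Beam understands. A quick sketch of calling it directly (the flag values are illustrative):

## e.g. python my_pipeline.py --input='in/*.tfrecord' --output='out/movies' --runner=DirectRunner
args, beam_options = get_arguments()
print(args['input'], args['output'])  # known args as a plain dict
## beam_options is a PipelineOptions built from the leftover flags (e.g. --runner)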