From b61ff7cd0b15cbe9ddc9ceb5da2422f880a270b8 Mon Sep 17 00:00:00 2001
From: ArrayRecord Team
Date: Wed, 6 Nov 2024 11:17:05 -0800
Subject: [PATCH] fix beam pipeline for newer python & beam

- Pin Apache Beam version to 2.53.0
- Fix bugs in arrayrecordio.py and pipelines.py
- Add a quickstart section in README.md

PiperOrigin-RevId: 693797378
---
 beam/README.md                           |  13 +++
 beam/arrayrecordio.py                    |  12 +-
 beam/examples/example_gcs_conversion.py  |   5 +-
 beam/examples/example_sink_conversion.py |   5 +-
 beam/examples/requirements.txt           |   3 +-
 beam/pipelines.py                        | 143 +++++++++++++----
 setup.py                                 |   2 +-
 7 files changed, 109 insertions(+), 74 deletions(-)

diff --git a/beam/README.md b/beam/README.md
index 6818964..a0f1a56 100644
--- a/beam/README.md
+++ b/beam/README.md
@@ -1,5 +1,18 @@
 ## Apache Beam Integration for ArrayRecord
 
+### Quickstart
+
+#### Convert TFRecord in a GCS bucket to ArrayRecord
+```
+pip install array-record[beam]
+git clone https://github.com/google/array_record.git
+cd array_record/beam/examples
+# Fill in the required fields in example_gcs_conversion.py
+# If using Dataflow, set pipeline_options as instructed in example_gcs_conversion.py
+python example_gcs_conversion.py
+```
+If Dataflow is used, you can monitor the run from the Dataflow job monitoring UI (https://cloud.google.com/dataflow/docs/guides/monitoring-overview).
+
 ### Summary
 
 This submodule provides some Apache Beam components and lightweight pipelines for converting different file formats (TFRecord at present) into ArrayRecords. The intention is to provide a variety of fairly seamless tools for migrating existing TFRecord datasets, allowing a few different choices regarding sharding and write location.
diff --git a/beam/arrayrecordio.py b/beam/arrayrecordio.py
index 9316e26..32a5fe4 100644
--- a/beam/arrayrecordio.py
+++ b/beam/arrayrecordio.py
@@ -7,8 +7,8 @@
 from apache_beam import transforms
 from apache_beam.coders import coders
 from apache_beam.io import filebasedsink
-from apache_beam.io.filesystem.CompressionTypes import AUTO
-from array_record.python.array_record_module import ArrayRecordWriter
+from apache_beam.io import filesystem
+from array_record.python import array_record_module
 
 
 class _ArrayRecordSink(filebasedsink.FileBasedSink):
@@ -21,7 +21,7 @@ def __init__(
       num_shards=0,
       shard_name_template=None,
       coder=coders.ToBytesCoder(),
-      compression_type=AUTO):
+      compression_type=filesystem.CompressionTypes.AUTO):
 
     super().__init__(
         file_path_prefix,
@@ -33,7 +33,9 @@ def __init__(
         compression_type=compression_type)
 
   def open(self, temp_path):
-    array_writer = ArrayRecordWriter(temp_path, 'group_size:1')
+    array_writer = array_record_module.ArrayRecordWriter(
+        temp_path, 'group_size:1'
+    )
     return array_writer
 
   def close(self, file_handle):
@@ -53,7 +55,7 @@ def __init__(
      num_shards=0,
      shard_name_template=None,
      coder=coders.ToBytesCoder(),
-      compression_type=AUTO):
+      compression_type=filesystem.CompressionTypes.AUTO):
 
     self._sink = _ArrayRecordSink(
         file_path_prefix,
diff --git a/beam/examples/example_gcs_conversion.py b/beam/examples/example_gcs_conversion.py
index 2da526c..1355bcc 100644
--- a/beam/examples/example_gcs_conversion.py
+++ b/beam/examples/example_gcs_conversion.py
@@ -10,7 +10,8 @@
 
 args = {'input': input_pattern, 'output': output_path}
 
-## Set pipeline options and uncomment in main() to run in Dataflow
+## To run on Dataflow, set the pipeline options and uncomment pipeline_options in main()
+## If pipeline_options is not set, the pipeline runs with a local runner
 pipeline_options = pipeline_options.PipelineOptions(
     runner='DataflowRunner',
     project='',
@@ -22,7 +23,7 @@ def main():
   convert_tf_to_arrayrecord_gcs(
       args=args,
-      # pipeline_options=pipeline_options
+      # pipeline_options=pipeline_options,
   ).run()
 
 
 if __name__ == '__main__':
diff --git a/beam/examples/example_sink_conversion.py b/beam/examples/example_sink_conversion.py
index 2d595bb..674a45c 100644
--- a/beam/examples/example_sink_conversion.py
+++ b/beam/examples/example_sink_conversion.py
@@ -10,7 +10,8 @@
 
 args = {'input': input_pattern, 'output': output_path}
 
-## Set pipeline options and uncomment in main() to run in Dataflow
+## To run on Dataflow, set the pipeline options and uncomment pipeline_options in main()
+## If pipeline_options is not set, the pipeline runs with a local runner
 pipeline_options = pipeline_options.PipelineOptions(
     runner='DataflowRunner',
     project='',
@@ -22,7 +23,7 @@ def main():
   convert_tf_to_arrayrecord_disk_match_shards(
       args=args,
-      # pipeline_options=pipeline_options
+      # pipeline_options=pipeline_options,
   ).run()
 
 
 if __name__ == '__main__':
diff --git a/beam/examples/requirements.txt b/beam/examples/requirements.txt
index 100aa2c..d0cb7fa 100644
--- a/beam/examples/requirements.txt
+++ b/beam/examples/requirements.txt
@@ -1,2 +1,3 @@
+array-record[beam]
 google-cloud-storage==2.11.0
-tensorflow==2.14.0
+tensorflow==2.14.0
\ No newline at end of file
diff --git a/beam/pipelines.py b/beam/pipelines.py
index 2563d7f..effffbe 100644
--- a/beam/pipelines.py
+++ b/beam/pipelines.py
@@ -29,21 +29,23 @@ def example_to_tfrecord(
   """
   p1 = beam.Pipeline(options=pipeline_options)
 
-  initial = (p1
-             | 'Create' >> beam.Create(example.generate_movie_examples())
-             | 'Write' >> beam.io.WriteToTFRecord(
-                 args['output'],
-                 coder=coders.ToBytesCoder(),
-                 num_shards=num_shards,
-                 file_name_suffix='.tfrecord'))
-
-  return p1, initial
+  _ = (
+      p1
+      | 'Create' >> beam.Create(example.generate_movie_examples())
+      | 'Write'
+      >> beam.io.WriteToTFRecord(
+          args['output'],
+          coder=coders.ToBytesCoder(),
+          num_shards=num_shards,
+          file_name_suffix='.tfrecord',
+      )
+  )
+  return p1
 
 
 def example_to_arrayrecord(
-    num_shards=1,
-    args=def_args,
-    pipeline_options=def_pipeline_options):
+    num_shards=1, args=def_args, pipeline_options=def_pipeline_options
+):
   """Beam pipeline for creating example ArrayRecord data.
 
   Args:
@@ -56,21 +58,23 @@ def example_to_arrayrecord(
   """
   p1 = beam.Pipeline(options=pipeline_options)
 
-  initial = (p1
-             | 'Create' >> beam.Create(example.generate_movie_examples())
-             | 'Write' >> arrayrecordio.WriteToArrayRecord(
-                 args['output'],
-                 coder=coders.ToBytesCoder(),
-                 num_shards=num_shards,
-                 file_name_suffix='.arrayrecord'))
-
-  return p1, initial
+  _ = (
+      p1
+      | 'Create' >> beam.Create(example.generate_movie_examples())
+      | 'Write'
+      >> arrayrecordio.WriteToArrayRecord(
+          args['output'],
+          coder=coders.ToBytesCoder(),
+          num_shards=num_shards,
+          file_name_suffix='.arrayrecord',
+      )
+  )
+  return p1
 
 
 def convert_tf_to_arrayrecord_disk(
-    num_shards=1,
-    args=def_args,
-    pipeline_options=def_pipeline_options):
+    num_shards=1, args=def_args, pipeline_options=def_pipeline_options
+):
   """Convert TFRecords to ArrayRecords using sink/sharding functionality.
 
   THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
 
@@ -85,20 +89,23 @@
   """
   p1 = beam.Pipeline(options=pipeline_options)
 
-  initial = (p1
-             | 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
-             | 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
-                 args['output'],
-                 coder=coders.ToBytesCoder(),
-                 num_shards=num_shards,
-                 file_name_suffix='.arrayrecord'))
-
-  return p1, initial
+  _ = (
+      p1
+      | 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
+      | 'Write ArrayRecord'
+      >> arrayrecordio.WriteToArrayRecord(
+          args['output'],
+          coder=coders.ToBytesCoder(),
+          num_shards=num_shards,
+          file_name_suffix='.arrayrecord',
+      )
+  )
+  return p1
 
 
 def convert_tf_to_arrayrecord_disk_match_shards(
-    args=def_args,
-    pipeline_options=def_pipeline_options):
+    args=def_args, pipeline_options=def_pipeline_options
+):
   """Convert TFRecords to matching number of ArrayRecords.
 
   THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
 
@@ -112,23 +119,30 @@
   """
   p1 = beam.Pipeline(options=pipeline_options)
 
-  initial = (p1
-             | 'Start' >> beam.Create([args['input']])
-             | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True))
-
-  file_count = (initial
-                | 'Group' >> beam.GroupByKey()
-                | 'Count Shards' >> beam.combiners.Count.Globally())
-
-  write_files = (initial
-                 | 'Drop Filename' >> beam.Map(lambda x: x[1])
-                 | 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
-                     args['output'],
-                     coder=coders.ToBytesCoder(),
-                     num_shards=beam.pvalue.AsSingleton(file_count),
-                     file_name_suffix='.arrayrecord'))
-
-  return p1, write_files
+  initial = (
+      p1
+      | 'Start' >> beam.Create([args['input']])
+      | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
+  )
+
+  file_count = (
+      initial
+      | 'Group' >> beam.GroupByKey()
+      | 'Count Shards' >> beam.combiners.Count.Globally()
+  )
+
+  _ = (
+      initial
+      | 'Drop Filename' >> beam.Map(lambda x: x[1])
+      | 'Write ArrayRecord'
+      >> arrayrecordio.WriteToArrayRecord(
+          args['output'],
+          coder=coders.ToBytesCoder(),
+          num_shards=beam.pvalue.AsSingleton(file_count),
+          file_name_suffix='.arrayrecord',
+      )
+  )
+  return p1
 
 
 def convert_tf_to_arrayrecord_gcs(
@@ -149,14 +163,17 @@
   """
   p1 = beam.Pipeline(options=pipeline_options)
 
-  initial = (p1
-             | 'Start' >> beam.Create([args['input']])
-             | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
-             | 'Group' >> beam.GroupByKey()
-             | 'Write to ArrayRecord in GCS' >> beam.ParDo(
-                 dofns.ConvertToArrayRecordGCS(),
-                 args['output'],
-                 file_path_suffix=file_path_suffix,
-                 overwrite_extension=overwrite_extension))
-
-  return p1, initial
+  _ = (
+      p1
+      | 'Start' >> beam.Create([args['input']])
+      | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
+      | 'Group' >> beam.GroupByKey()
+      | 'Write to ArrayRecord in GCS'
+      >> beam.ParDo(
+          dofns.ConvertToArrayRecordGCS(),
+          args['output'],
+          file_path_suffix=file_path_suffix,
+          overwrite_extension=overwrite_extension,
+      )
+  )
+  return p1
diff --git a/setup.py b/setup.py
index cfb0bac..f5d6dda 100644
--- a/setup.py
+++ b/setup.py
@@ -10,7 +10,7 @@
 ]
 
 BEAM_EXTRAS = [
-    'apache-beam[gcp]>=2.50.0',
+    'apache-beam[gcp]==2.53.0',
     'google-cloud-storage>=2.11.0',
     'tensorflow>=2.14.0'
 ]
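
Usage sketch (illustrative; not part of the diff): with this change, each pipeline builder in beam/pipelines.py returns the beam.Pipeline itself rather than a (pipeline, PCollection) tuple, so callers construct and run it in one step. In the sketch below, the input/output paths are hypothetical placeholders and the array_record.beam.pipelines import path is assumed from the repository layout.

```
# Minimal local-runner invocation of one of the fixed pipelines.
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk

# Hypothetical placeholder paths; any readable TFRecord shards will do.
args = {
    'input': '/tmp/tfrecords/data*.tfrecord',
    'output': '/tmp/arrayrecords/data',  # prefix; '.arrayrecord' suffix is appended
}

# With no pipeline_options passed, Beam falls back to a local runner,
# mirroring the behavior of the example scripts.
convert_tf_to_arrayrecord_disk(num_shards=2, args=args).run()
```

Passing a PipelineOptions configured as in the example scripts (runner='DataflowRunner' plus project and staging settings) would run the same conversion on Dataflow instead of the local runner.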