fix beam pipeline for newer python & beam
- Pin Apache Beam version to 2.53.0
- Fix bugs in arrayrecordio.py and pipelines.py
- Add a quickstart section in README.md

PiperOrigin-RevId: 693797378
ArrayRecord Team authored and copybara-github committed Nov 6, 2024
1 parent 1fa615c commit b61ff7c
Showing 7 changed files with 109 additions and 74 deletions.
13 changes: 13 additions & 0 deletions beam/README.md
@@ -1,5 +1,18 @@
## Apache Beam Integration for ArrayRecord

+### Quickstart
+
+#### Convert TFRecord in a GCS bucket to ArrayRecord
+```
+pip install array-record[beam]
+git clone https://github.com/google/array_record.git
+cd array_record/beam/examples
+# Fill in the required fields in example_gcs_conversion.py
+# If using Dataflow, set pipeline_options as instructed in example_gcs_conversion.py
+python example_gcs_conversion.py
+```
+If Dataflow is used, you can monitor the run from the Dataflow job monitoring UI (https://cloud.google.com/dataflow/docs/guides/monitoring-overview).
+
### Summary

This submodule provides Apache Beam components and lightweight pipelines for converting other file formats (currently TFRecord) into ArrayRecord. The goal is to make migrating existing TFRecord datasets largely seamless, with choices for sharding and write location.
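
As a sketch of programmatic use (assuming the pipelines module is importable as `array_record.beam.pipelines`; the paths below are placeholders):

```
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk

# Convert local TFRecord shards into two ArrayRecord shards.
pipeline = convert_tf_to_arrayrecord_disk(
    num_shards=2,
    args={'input': '/tmp/data/*.tfrecord', 'output': '/tmp/data/converted'},
)
pipeline.run()
```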
12 changes: 7 additions & 5 deletions beam/arrayrecordio.py
@@ -7,8 +7,8 @@
from apache_beam import transforms
from apache_beam.coders import coders
from apache_beam.io import filebasedsink
-from apache_beam.io.filesystem.CompressionTypes import AUTO
-from array_record.python.array_record_module import ArrayRecordWriter
+from apache_beam.io import filesystem
+from array_record.python import array_record_module


class _ArrayRecordSink(filebasedsink.FileBasedSink):
@@ -21,7 +21,7 @@ def __init__(
num_shards=0,
shard_name_template=None,
coder=coders.ToBytesCoder(),
-      compression_type=AUTO):
+      compression_type=filesystem.CompressionTypes.AUTO):

super().__init__(
file_path_prefix,
@@ -33,7 +33,9 @@ def __init__(
compression_type=compression_type)

def open(self, temp_path):
-    array_writer = ArrayRecordWriter(temp_path, 'group_size:1')
+    array_writer = array_record_module.ArrayRecordWriter(
+        temp_path, 'group_size:1'
+    )
return array_writer

def close(self, file_handle):
@@ -53,7 +55,7 @@ def __init__(
num_shards=0,
shard_name_template=None,
coder=coders.ToBytesCoder(),
-      compression_type=AUTO):
+      compression_type=filesystem.CompressionTypes.AUTO):

self._sink = _ArrayRecordSink(
file_path_prefix,
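
The sink delegates byte-level writing to `ArrayRecordWriter` from `array_record.python.array_record_module`, as the `open` method above shows. A minimal sketch of that writer used directly, outside Beam (the path and records are illustrative):

```
from array_record.python import array_record_module

# Mirrors what _ArrayRecordSink.open/close do around Beam's bundle writes.
writer = array_record_module.ArrayRecordWriter(
    '/tmp/example.arrayrecord', 'group_size:1'
)
writer.write(b'first record')   # records are raw bytes, matching ToBytesCoder
writer.write(b'second record')
writer.close()                  # finalizes the file, as close(file_handle) does
```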
5 changes: 3 additions & 2 deletions beam/examples/example_gcs_conversion.py
@@ -10,7 +10,8 @@

args = {'input': input_pattern, 'output': output_path}

-## Set pipeline options and uncomment in main() to run in Dataflow
+## If running on Dataflow, set pipeline options and uncomment them in main()
+## If pipeline_options is not set, the pipeline uses a local runner
pipeline_options = pipeline_options.PipelineOptions(
runner='DataflowRunner',
project='<YOUR_PROJECT>',
@@ -22,7 +23,7 @@
def main():
convert_tf_to_arrayrecord_gcs(
args=args,
-      # pipeline_options=pipeline_options
+      # pipeline_options=pipeline_options,
).run()

if __name__ == '__main__':
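
As the updated comments note, the example runs locally or on Dataflow. A sketch of both modes (assuming `convert_tf_to_arrayrecord_gcs` and `args` are defined as in the example; `region` and `temp_location` are standard Dataflow options shown as placeholders, not lines from this file):

```
from apache_beam.options import pipeline_options

# Local run: leave pipeline_options unset and Beam uses a local runner.
convert_tf_to_arrayrecord_gcs(args=args).run()

# Dataflow run: build options with your project settings and pass them through.
options = pipeline_options.PipelineOptions(
    runner='DataflowRunner',
    project='<YOUR_PROJECT>',
    region='<YOUR_REGION>',
    temp_location='gs://<YOUR_BUCKET>/tmp',
)
convert_tf_to_arrayrecord_gcs(args=args, pipeline_options=options).run()
```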
5 changes: 3 additions & 2 deletions beam/examples/example_sink_conversion.py
@@ -10,7 +10,8 @@

args = {'input': input_pattern, 'output': output_path}

-## Set pipeline options and uncomment in main() to run in Dataflow
+## If running on Dataflow, set pipeline options and uncomment them in main()
+## If pipeline_options is not set, the pipeline uses a local runner
pipeline_options = pipeline_options.PipelineOptions(
runner='DataflowRunner',
project='<YOUR_PROJECT>',
@@ -22,7 +23,7 @@
def main():
convert_tf_to_arrayrecord_disk_match_shards(
args=args,
-      # pipeline_options=pipeline_options
+      # pipeline_options=pipeline_options,
).run()

if __name__ == '__main__':
3 changes: 2 additions & 1 deletion beam/examples/requirements.txt
@@ -1,2 +1,3 @@
+array-record[beam]
google-cloud-storage==2.11.0
-tensorflow==2.14.0
\ No newline at end of file
+tensorflow==2.14.0
143 changes: 80 additions & 63 deletions beam/pipelines.py
@@ -29,21 +29,23 @@ def example_to_tfrecord(
"""

p1 = beam.Pipeline(options=pipeline_options)
-  initial = (p1
-             | 'Create' >> beam.Create(example.generate_movie_examples())
-             | 'Write' >> beam.io.WriteToTFRecord(
-                 args['output'],
-                 coder=coders.ToBytesCoder(),
-                 num_shards=num_shards,
-                 file_name_suffix='.tfrecord'))
-
-  return p1, initial
+  _ = (
+      p1
+      | 'Create' >> beam.Create(example.generate_movie_examples())
+      | 'Write'
+      >> beam.io.WriteToTFRecord(
+          args['output'],
+          coder=coders.ToBytesCoder(),
+          num_shards=num_shards,
+          file_name_suffix='.tfrecord',
+      )
+  )
+  return p1


def example_to_arrayrecord(
-    num_shards=1,
-    args=def_args,
-    pipeline_options=def_pipeline_options):
+    num_shards=1, args=def_args, pipeline_options=def_pipeline_options
+):
"""Beam pipeline for creating example ArrayRecord data.
Args:
@@ -56,21 +58,23 @@ def example_to_arrayrecord(
"""

p1 = beam.Pipeline(options=pipeline_options)
-  initial = (p1
-             | 'Create' >> beam.Create(example.generate_movie_examples())
-             | 'Write' >> arrayrecordio.WriteToArrayRecord(
-                 args['output'],
-                 coder=coders.ToBytesCoder(),
-                 num_shards=num_shards,
-                 file_name_suffix='.arrayrecord'))
-
-  return p1, initial
+  _ = (
+      p1
+      | 'Create' >> beam.Create(example.generate_movie_examples())
+      | 'Write'
+      >> arrayrecordio.WriteToArrayRecord(
+          args['output'],
+          coder=coders.ToBytesCoder(),
+          num_shards=num_shards,
+          file_name_suffix='.arrayrecord',
+      )
+  )
+  return p1


def convert_tf_to_arrayrecord_disk(
-    num_shards=1,
-    args=def_args,
-    pipeline_options=def_pipeline_options):
+    num_shards=1, args=def_args, pipeline_options=def_pipeline_options
+):
"""Convert TFRecords to ArrayRecords using sink/sharding functionality.
THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
@@ -85,20 +89,23 @@
"""

p1 = beam.Pipeline(options=pipeline_options)
-  initial = (p1
-             | 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
-             | 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
-                 args['output'],
-                 coder=coders.ToBytesCoder(),
-                 num_shards=num_shards,
-                 file_name_suffix='.arrayrecord'))
-
-  return p1, initial
+  _ = (
+      p1
+      | 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
+      | 'Write ArrayRecord'
+      >> arrayrecordio.WriteToArrayRecord(
+          args['output'],
+          coder=coders.ToBytesCoder(),
+          num_shards=num_shards,
+          file_name_suffix='.arrayrecord',
+      )
+  )
+  return p1


def convert_tf_to_arrayrecord_disk_match_shards(
-    args=def_args,
-    pipeline_options=def_pipeline_options):
+    args=def_args, pipeline_options=def_pipeline_options
+):
"""Convert TFRecords to matching number of ArrayRecords.
THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
@@ -112,23 +119,30 @@
"""

p1 = beam.Pipeline(options=pipeline_options)
-  initial = (p1
-             | 'Start' >> beam.Create([args['input']])
-             | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True))
-
-  file_count = (initial
-                | 'Group' >> beam.GroupByKey()
-                | 'Count Shards' >> beam.combiners.Count.Globally())
-
-  write_files = (initial
-                 | 'Drop Filename' >> beam.Map(lambda x: x[1])
-                 | 'Write ArrayRecord' >> arrayrecordio.WriteToArrayRecord(
-                     args['output'],
-                     coder=coders.ToBytesCoder(),
-                     num_shards=beam.pvalue.AsSingleton(file_count),
-                     file_name_suffix='.arrayrecord'))
-
-  return p1, write_files
+  initial = (
+      p1
+      | 'Start' >> beam.Create([args['input']])
+      | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
+  )
+
+  file_count = (
+      initial
+      | 'Group' >> beam.GroupByKey()
+      | 'Count Shards' >> beam.combiners.Count.Globally()
+  )
+
+  _ = (
+      initial
+      | 'Drop Filename' >> beam.Map(lambda x: x[1])
+      | 'Write ArrayRecord'
+      >> arrayrecordio.WriteToArrayRecord(
+          args['output'],
+          coder=coders.ToBytesCoder(),
+          num_shards=beam.pvalue.AsSingleton(file_count),
+          file_name_suffix='.arrayrecord',
+      )
+  )
+  return p1


def convert_tf_to_arrayrecord_gcs(
@@ -149,14 +163,17 @@
"""

p1 = beam.Pipeline(options=pipeline_options)
-  initial = (p1
-             | 'Start' >> beam.Create([args['input']])
-             | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
-             | 'Group' >> beam.GroupByKey()
-             | 'Write to ArrayRecord in GCS' >> beam.ParDo(
-                 dofns.ConvertToArrayRecordGCS(),
-                 args['output'],
-                 file_path_suffix=file_path_suffix,
-                 overwrite_extension=overwrite_extension))
-
-  return p1, initial
+  _ = (
+      p1
+      | 'Start' >> beam.Create([args['input']])
+      | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
+      | 'Group' >> beam.GroupByKey()
+      | 'Write to ArrayRecord in GCS'
+      >> beam.ParDo(
+          dofns.ConvertToArrayRecordGCS(),
+          args['output'],
+          file_path_suffix=file_path_suffix,
+          overwrite_extension=overwrite_extension,
+      )
+  )
+  return p1
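
Note how `convert_tf_to_arrayrecord_disk_match_shards` matches output shards to input files: the file count from `Count.Globally()` is wrapped in `beam.pvalue.AsSingleton` and resolved as a side input only when the write executes. A self-contained sketch of that side-input pattern (names and data are illustrative):

```
import apache_beam as beam

with beam.Pipeline() as p:
  files = p | 'Files' >> beam.Create(['a.tfrecord', 'b.tfrecord', 'c.tfrecord'])
  count = files | 'Count' >> beam.combiners.Count.Globally()
  _ = (
      files
      | 'Tag' >> beam.Map(
          lambda f, n: f'{f} is one of {n} files',
          n=beam.pvalue.AsSingleton(count),  # resolved at execution time
      )
      | 'Print' >> beam.Map(print)
  )
```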
2 changes: 1 addition & 1 deletion setup.py
@@ -10,7 +10,7 @@
]

BEAM_EXTRAS = [
-    'apache-beam[gcp]>=2.50.0',
+    'apache-beam[gcp]==2.53.0',
'google-cloud-storage>=2.11.0',
'tensorflow>=2.14.0'
]
