From bd74dae573466a4bc49aba195dea7d21166b0223 Mon Sep 17 00:00:00 2001
From: ArrayRecord Team
Date: Mon, 6 Nov 2023 16:03:30 -0800
Subject: [PATCH] Beam helper pipelines covering different conversion use-cases

PiperOrigin-RevId: 579985171
---
 beam/__init__.py      |  20 ++++++
 beam/arrayrecordio.py |  67 +++++++++++++++++
 beam/dofns.py         |  57 +++++++++++++++
 beam/pipelines.py     | 162 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 306 insertions(+)
 create mode 100644 beam/__init__.py
 create mode 100644 beam/arrayrecordio.py
 create mode 100644 beam/dofns.py
 create mode 100644 beam/pipelines.py

diff --git a/beam/__init__.py b/beam/__init__.py
new file mode 100644
index 0000000..81841bd
--- /dev/null
+++ b/beam/__init__.py
@@ -0,0 +1,20 @@
+"""Apache Beam module for array_record.
+
+This module provides both core components and helper functions that enable
+users to convert different file formats to ArrayRecord.
+
+To keep dependencies light, Beam is only imported when this module is used, so
+any missing-dependency errors surface early.
+"""
+
+import apache_beam as beam
+
+# I'd really like a PEP8 compatible conditional import here with a more
+# explicit error message. Example below:
+
+# try:
+#   import apache_beam as beam
+# except Exception as e:
+#   raise ImportError(
+#       ('Beam functionality requires extra dependencies. '
+#        'Install apache-beam or run "pip install array_record[beam]".')) from e
diff --git a/beam/arrayrecordio.py b/beam/arrayrecordio.py
new file mode 100644
index 0000000..9316e26
--- /dev/null
+++ b/beam/arrayrecordio.py
@@ -0,0 +1,67 @@
+"""An IO module for ArrayRecord.
+
+CURRENTLY ONLY SINK IS IMPLEMENTED, AND IT DOESN'T WORK WITH NON-DISK WRITES
+"""
+
+from apache_beam import io
+from apache_beam import transforms
+from apache_beam.coders import coders
+from apache_beam.io import filebasedsink
+from apache_beam.io.filesystem import CompressionTypes
+from array_record.python.array_record_module import ArrayRecordWriter
+
+
+class _ArrayRecordSink(filebasedsink.FileBasedSink):
+  """Sink class for use in the ArrayRecord PTransform."""
+
+  def __init__(
+      self,
+      file_path_prefix,
+      file_name_suffix=None,
+      num_shards=0,
+      shard_name_template=None,
+      coder=coders.ToBytesCoder(),
+      compression_type=CompressionTypes.AUTO):
+
+    super().__init__(
+        file_path_prefix,
+        file_name_suffix=file_name_suffix,
+        num_shards=num_shards,
+        shard_name_template=shard_name_template,
+        coder=coder,
+        mime_type='application/octet-stream',
+        compression_type=compression_type)
+
+  def open(self, temp_path):
+    array_writer = ArrayRecordWriter(temp_path, 'group_size:1')
+    return array_writer
+
+  def close(self, file_handle):
+    file_handle.close()
+
+  def write_encoded_record(self, file_handle, value):
+    file_handle.write(value)
+
+
+class WriteToArrayRecord(transforms.PTransform):
+  """PTransform for a disk-based write to ArrayRecord."""
+
+  def __init__(
+      self,
+      file_path_prefix,
+      file_name_suffix='',
+      num_shards=0,
+      shard_name_template=None,
+      coder=coders.ToBytesCoder(),
+      compression_type=CompressionTypes.AUTO):
+
+    self._sink = _ArrayRecordSink(
+        file_path_prefix,
+        file_name_suffix,
+        num_shards,
+        shard_name_template,
+        coder,
+        compression_type)
+
+  def expand(self, pcoll):
+    return pcoll | io.iobase.Write(self._sink)
diff --git a/beam/dofns.py b/beam/dofns.py
new file mode 100644
index 0000000..a966dde
--- /dev/null
+++ b/beam/dofns.py
@@ -0,0 +1,57 @@
+"""DoFn's for parallel processing."""
+
+import os
+import urllib.parse
+import apache_beam as beam
+from array_record.python.array_record_module import ArrayRecordWriter
+from google.cloud import storage
+
+
+class ConvertToArrayRecordGCS(beam.DoFn):
+  """Write a (filename, records) tuple to an ArrayRecord file in GCS."""
+
+  _WRITE_DIR = '/tmp/'
+
+  def process(
+      self,
+      element,
+      path,
+      write_dir=_WRITE_DIR,
+      file_path_suffix='.arrayrecord',
+      overwrite_extension=False,
+  ):
+
+    ## Upload a locally written file to the target GCS bucket/prefix
+    def upload_to_gcs(bucket_name, filename, prefix='', source_dir=self._WRITE_DIR):
+      source_filename = os.path.join(source_dir, filename)
+      blob_name = os.path.join(prefix, filename)
+      storage_client = storage.Client()
+      bucket = storage_client.get_bucket(bucket_name)
+      blob = bucket.blob(blob_name)
+      blob.upload_from_filename(source_filename)
+
+    ## Simple logic for stripping a file extension and replacing it
+    def fix_filename(filename):
+      base_name = os.path.splitext(filename)[0]
+      new_filename = base_name + file_path_suffix
+      return new_filename
+
+    parsed_gcs_path = urllib.parse.urlparse(path)
+    bucket_name = parsed_gcs_path.hostname
+    gcs_prefix = parsed_gcs_path.path.lstrip('/')
+
+    if overwrite_extension:
+      filename = fix_filename(os.path.basename(element[0]))
+    else:
+      filename = '{}{}'.format(os.path.basename(element[0]), file_path_suffix)
+
+    write_path = os.path.join(write_dir, filename)
+    writer = ArrayRecordWriter(write_path, 'group_size:1')
+
+    for item in element[1]:
+      writer.write(item)
+
+    writer.close()
+
+    upload_to_gcs(bucket_name, filename, prefix=gcs_prefix, source_dir=write_dir)
+    os.remove(os.path.join(write_dir, filename))
diff --git a/beam/pipelines.py b/beam/pipelines.py
new file mode 100644
index 0000000..181d3c3
--- /dev/null
+++ b/beam/pipelines.py
@@ -0,0 +1,162 @@
+"""Various opinionated Beam pipelines for testing different functionality."""
+
+import apache_beam as beam
+from apache_beam.coders import coders
+from array_record.beam.arrayrecordio import WriteToArrayRecord
+from array_record.beam.dofns import ConvertToArrayRecordGCS
+from .example import generate_movie_examples
+from .options import get_arguments
+
+
+## Grab CLI arguments.
+## Override by passing args/pipeline_options to the function manually.
+def_args, def_pipeline_options = get_arguments()
+
+
+def example_to_tfrecord(
+    num_shards=1,
+    args=def_args,
+    pipeline_options=def_pipeline_options):
+  """Beam pipeline for creating example TFRecord data.
+
+  Args:
+    num_shards: Number of output files (shards)
+    args: Custom arguments (defaults to parsed CLI arguments)
+    pipeline_options: Beam arguments in dict format
+
+  Returns:
+    Tuple of the Beam pipeline and the output of its final transform
+  """
+
+  p1 = beam.Pipeline(options=pipeline_options)
+  initial = (p1
+             | 'Create' >> beam.Create(generate_movie_examples())
+             | 'Write' >> beam.io.WriteToTFRecord(
+                 args['output'],
+                 coder=coders.ToBytesCoder(),
+                 num_shards=num_shards,
+                 file_name_suffix='.tfrecord'))
+
+  return p1, initial
+
+
+def example_to_arrayrecord(
+    num_shards=1,
+    args=def_args,
+    pipeline_options=def_pipeline_options):
+  """Beam pipeline for creating example ArrayRecord data.
+
+  Args:
+    num_shards: Number of output files (shards)
+    args: Custom arguments (defaults to parsed CLI arguments)
+    pipeline_options: Beam arguments in dict format
+
+  Returns:
+    Tuple of the Beam pipeline and the output of its final transform
+  """
+
+  p1 = beam.Pipeline(options=pipeline_options)
+  initial = (p1
+             | 'Create' >> beam.Create(generate_movie_examples())
+             | 'Write' >> WriteToArrayRecord(
+                 args['output'],
+                 coder=coders.ToBytesCoder(),
+                 num_shards=num_shards,
+                 file_name_suffix='.arrayrecord'))
+
+  return p1, initial
+
+
+def convert_tf_to_arrayrecord_disk(
+    num_shards=1,
+    args=def_args,
+    pipeline_options=def_pipeline_options):
+  """Convert TFRecords to ArrayRecords using sink/sharding functionality.
+
+  THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
+
+  Args:
+    num_shards: Number of output files (shards)
+    args: Custom arguments (defaults to parsed CLI arguments)
+    pipeline_options: Beam arguments in dict format
+
+  Returns:
+    Tuple of the Beam pipeline and the output of its final transform
+  """
+
+  p1 = beam.Pipeline(options=pipeline_options)
+  initial = (p1
+             | 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
+             | 'Write ArrayRecord' >> WriteToArrayRecord(
+                 args['output'],
+                 coder=coders.ToBytesCoder(),
+                 num_shards=num_shards,
+                 file_name_suffix='.arrayrecord'))
+
+  return p1, initial
+
+
+def convert_tf_to_arrayrecord_disk_match_shards(
+    args=def_args,
+    pipeline_options=def_pipeline_options):
+  """Convert TFRecords to a matching number of ArrayRecords.
+
+  THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
+
+  Args:
+    args: Custom arguments (defaults to parsed CLI arguments)
+    pipeline_options: Beam arguments in dict format
+
+  Returns:
+    Tuple of the Beam pipeline and the output of its final transform
+  """
+
+  p1 = beam.Pipeline(options=pipeline_options)
+  initial = (p1
+             | 'Start' >> beam.Create([args['input']])
+             | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True))
+
+  file_count = (initial
+                | 'Group' >> beam.GroupByKey()
+                | 'Count Shards' >> beam.combiners.Count.Globally())
+
+  write_files = (initial
+                 | 'Drop Filename' >> beam.Map(lambda x: x[1])
+                 | 'Write ArrayRecord' >> WriteToArrayRecord(
+                     args['output'],
+                     coder=coders.ToBytesCoder(),
+                     num_shards=beam.pvalue.AsSingleton(file_count),
+                     file_name_suffix='.arrayrecord'))
+
+  return p1, write_files
+
+
+def convert_tf_to_arrayrecord_gcs(
+    overwrite_extension=False,
+    file_path_suffix='.arrayrecord',
+    args=def_args,
+    pipeline_options=def_pipeline_options):
+  """Convert TFRecords to ArrayRecords in GCS 1:1.
+
+  Args:
+    overwrite_extension: Whether the DoFn should replace the original extension
+    file_path_suffix: Suffix to append to (or substitute for) the extension
+    args: Custom arguments (defaults to parsed CLI arguments)
+    pipeline_options: Beam arguments in dict format
+
+  Returns:
+    Tuple of the Beam pipeline and the output of its final transform
+  """
+
+  p1 = beam.Pipeline(options=pipeline_options)
+  initial = (p1
+             | 'Start' >> beam.Create([args['input']])
+             | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
+             | 'Group' >> beam.GroupByKey()
+             | 'Write to ArrayRecord in GCS' >> beam.ParDo(
+                 ConvertToArrayRecordGCS(),
+                 args['output'],
+                 file_path_suffix=file_path_suffix,
+                 overwrite_extension=overwrite_extension))
+
+  return p1, initial
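
A minimal driver for the disk conversion helper added above might look like the sketch below. It assumes the array_record.beam.pipelines module path introduced by this patch and that get_arguments() exposes the input and output locations as command-line flags (the exact flag names are not shown in the patch); it is an illustrative sketch, not part of the change.

# Hypothetical driver script; assumes get_arguments() fills args['input'] and
# args['output'] from command-line flags.
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk


def main():
  # Build the conversion pipeline; args and pipeline_options fall back to the
  # values parsed from the command line by get_arguments().
  pipeline, _ = convert_tf_to_arrayrecord_disk(num_shards=2)

  # Run the pipeline and block until the TFRecord-to-ArrayRecord write finishes.
  pipeline.run().wait_until_finish()


if __name__ == '__main__':
  main()

The other helpers follow the same pattern; convert_tf_to_arrayrecord_gcs additionally expects args['output'] to be a gs:// URI, since ConvertToArrayRecordGCS parses the bucket and prefix out of it with urllib.parse.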