Beam helper pipelines covering different conversion use-cases
PiperOrigin-RevId: 579985171
ArrayRecord Team authored and copybara-github committed Nov 7, 2023
1 parent b1403b5 commit bd74dae
Showing 4 changed files with 306 additions and 0 deletions.
20 changes: 20 additions & 0 deletions beam/__init__.py
@@ -0,0 +1,20 @@
"""Apache Beam module for array_record.
This module provides both core components and
helper functions to enable users to convert different file formats to AR.
To keep dependencies light, we'll import Beam on module usage so any errors
occur early.
"""

# Import Beam conditionally so a missing optional dependency fails fast with
# an explicit, actionable message rather than a bare ModuleNotFoundError.
try:
  import apache_beam as beam
except ImportError as e:
  raise ImportError(
      'Beam functionality requires extra dependencies. '
      'Install apache-beam or run "pip install array_record[beam]".'
  ) from e
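
As a quick sanity check (a hypothetical snippet, not part of the commit), importing the package without Beam installed should now fail with the friendlier message:

try:
  from array_record import beam as ar_beam
except ImportError as e:
  print(f'Beam extras missing: {e}')
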
67 changes: 67 additions & 0 deletions beam/arrayrecordio.py
@@ -0,0 +1,67 @@
"""An IO module for ArrayRecord.
CURRENTLY ONLY SINK IS IMPLEMENTED, AND IT DOESN'T WORK WITH NON-DISK WRITES
"""

from apache_beam import io
from apache_beam import transforms
from apache_beam.coders import coders
from apache_beam.io import filebasedsink
from apache_beam.io.filesystem import CompressionTypes
from array_record.python.array_record_module import ArrayRecordWriter


class _ArrayRecordSink(filebasedsink.FileBasedSink):
"""Sink Class for use in Arrayrecord PTransform."""

def __init__(
self,
file_path_prefix,
      file_name_suffix='',
num_shards=0,
shard_name_template=None,
coder=coders.ToBytesCoder(),
      compression_type=CompressionTypes.AUTO):

super().__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
num_shards=num_shards,
shard_name_template=shard_name_template,
coder=coder,
mime_type='application/octet-stream',
compression_type=compression_type)

def open(self, temp_path):
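    # Beam's FileBasedSink supplies a temporary path here and renames the
    # file to its final sharded name on finalization. 'group_size:1' packs
    # one record per group, which favors random access over compression.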
array_writer = ArrayRecordWriter(temp_path, 'group_size:1')
return array_writer

def close(self, file_handle):
file_handle.close()

def write_encoded_record(self, file_handle, value):
file_handle.write(value)


class WriteToArrayRecord(transforms.PTransform):
"""PTransform for a disk-based write to ArrayRecord."""

def __init__(
self,
file_path_prefix,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
coder=coders.ToBytesCoder(),
      compression_type=CompressionTypes.AUTO):

self._sink = _ArrayRecordSink(
file_path_prefix,
file_name_suffix,
num_shards,
shard_name_template,
coder,
compression_type)

def expand(self, pcoll):
return pcoll | io.iobase.Write(self._sink)
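
A minimal usage sketch for the transform (hypothetical local paths; as noted above, only disk-based writes work):

import apache_beam as beam
from array_record.beam.arrayrecordio import WriteToArrayRecord

with beam.Pipeline() as p:
  _ = (p
       | beam.Create([b'first record', b'second record'])
       | WriteToArrayRecord('/tmp/demo', file_name_suffix='.arrayrecord'))
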
57 changes: 57 additions & 0 deletions beam/dofns.py
@@ -0,0 +1,57 @@
"""DoFn's for parallel processing."""

import os
import urllib.parse
import apache_beam as beam
from array_record.python.array_record_module import ArrayRecordWriter
from google.cloud import storage


class ConvertToArrayRecordGCS(beam.DoFn):
"""Write a tuple consisting of a filename and records to GCS ArrayRecords."""

_WRITE_DIR = '/tmp/'

def process(
self,
element,
path,
write_dir=_WRITE_DIR,
file_path_suffix='.arrayrecord',
overwrite_extension=False,
):
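    # Flow: write the ArrayRecord file locally under write_dir, upload it to
    # the parsed GCS bucket/prefix, then delete the local copy.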

## Upload to GCS
    def upload_to_gcs(bucket_name, filename, prefix='', source_dir=write_dir):
source_filename = os.path.join(source_dir, filename)
blob_name = os.path.join(prefix, filename)
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_filename(source_filename)

## Simple logic for stripping a file extension and replacing it
def fix_filename(filename):
base_name = os.path.splitext(filename)[0]
new_filename = base_name + file_path_suffix
return new_filename

parsed_gcs_path = urllib.parse.urlparse(path)
bucket_name = parsed_gcs_path.hostname
gcs_prefix = parsed_gcs_path.path.lstrip('/')

if overwrite_extension:
filename = fix_filename(os.path.basename(element[0]))
else:
filename = '{}{}'.format(os.path.basename(element[0]), file_path_suffix)

write_path = os.path.join(write_dir, filename)
writer = ArrayRecordWriter(write_path, 'group_size:1')

for item in element[1]:
writer.write(item)

writer.close()

upload_to_gcs(bucket_name, filename, prefix=gcs_prefix)
os.remove(os.path.join(write_dir, filename))
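
A hypothetical usage sketch for the DoFn, mirroring the GCS pipeline in pipelines.py (bucket and paths are placeholders):

import apache_beam as beam
from array_record.beam.dofns import ConvertToArrayRecordGCS

with beam.Pipeline() as p:
  _ = (p
       | beam.Create(['gs://my-bucket/tfrecords/part-00000.tfrecord'])
       | beam.io.ReadAllFromTFRecord(with_filename=True)
       | beam.GroupByKey()
       | beam.ParDo(
           ConvertToArrayRecordGCS(),
           'gs://my-bucket/arrayrecords',
           overwrite_extension=True))
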
162 changes: 162 additions & 0 deletions beam/pipelines.py
@@ -0,0 +1,162 @@
"""Various opinionated Beam pipelines for testing different functionality."""

import apache_beam as beam
from apache_beam.coders import coders
from array_record.beam.arrayrecordio import WriteToArrayRecord
from array_record.beam.dofns import ConvertToArrayRecordGCS
from .example import generate_movie_examples
from .options import get_arguments


## Grab CLI arguments.
## Override by passing args/pipeline_options to the function manually.
def_args, def_pipeline_options = get_arguments()


def example_to_tfrecord(
num_shards=1,
args=def_args,
pipeline_options=def_pipeline_options):
"""Beam pipeline for creating example TFRecord data.
Args:
num_shards: Number of files
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Create' >> beam.Create(generate_movie_examples())
| 'Write' >> beam.io.WriteToTFRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.tfrecord'))

return p1, initial


def example_to_arrayrecord(
num_shards=1,
args=def_args,
pipeline_options=def_pipeline_options):
"""Beam pipeline for creating example ArrayRecord data.
Args:
num_shards: Number of files
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Create' >> beam.Create(generate_movie_examples())
| 'Write' >> WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.arrayrecord'))

return p1, initial


def convert_tf_to_arrayrecord_disk(
num_shards=1,
args=def_args,
pipeline_options=def_pipeline_options):
"""Convert TFRecords to ArrayRecords using sink/sharding functionality.
THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
Args:
num_shards: Number of files
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input'])
| 'Write ArrayRecord' >> WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=num_shards,
file_name_suffix='.arrayrecord'))

return p1, initial


def convert_tf_to_arrayrecord_disk_match_shards(
args=def_args,
pipeline_options=def_pipeline_options):
"""Convert TFRecords to matching number of ArrayRecords.
THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES
Args:
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Start' >> beam.Create([args['input']])
| 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True))

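  # Count the distinct input files (one group per filename) and use the count
  # as a side input so the number of output ArrayRecord shards matches the
  # number of input TFRecord files.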
file_count = (initial
| 'Group' >> beam.GroupByKey()
| 'Count Shards' >> beam.combiners.Count.Globally())

write_files = (initial
| 'Drop Filename' >> beam.Map(lambda x: x[1])
| 'Write ArrayRecord' >> WriteToArrayRecord(
args['output'],
coder=coders.ToBytesCoder(),
num_shards=beam.pvalue.AsSingleton(file_count),
file_name_suffix='.arrayrecord'))

return p1, write_files


def convert_tf_to_arrayrecord_gcs(
overwrite_extension=False,
file_path_suffix='.arrayrecord',
args=def_args,
pipeline_options=def_pipeline_options):
"""Convert TFRecords to ArrayRecords in GCS 1:1.
Args:
overwrite_extension: Boolean making DoFn attempt to overwrite extension
file_path_suffix: Intended suffix for overwrite or append
args: Custom arguments
pipeline_options: Beam arguments in dict format
Returns:
Beam Pipeline object
"""

p1 = beam.Pipeline(options=pipeline_options)
initial = (p1
| 'Start' >> beam.Create([args['input']])
| 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True)
| 'Group' >> beam.GroupByKey()
| 'Write to ArrayRecord in GCS' >> beam.ParDo(
ConvertToArrayRecordGCS(),
args['output'],
file_path_suffix=file_path_suffix,
overwrite_extension=overwrite_extension))

return p1, initial
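
Each helper returns the pipeline plus its terminal PCollection, so a driver can run one like this (a sketch; --input/--output normally arrive via the CLI through get_arguments()):

pipeline, _ = convert_tf_to_arrayrecord_disk(num_shards=2)
pipeline.run().wait_until_finish()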
