Use new BigQuery sink.
tneymanov committed Feb 10, 2020
1 parent dd79dc5 commit 2895b87
Showing 12 changed files with 26 additions and 215 deletions.
29 changes: 4 additions & 25 deletions docs/large_inputs.md
@@ -14,7 +14,6 @@ Default settings:
--worker_machine_type <default n1-standard-1> \
--disk_size_gb <default 250> \
--worker_disk_type <default PD> \
--num_bigquery_write_shards <default 1> \
--partition_config_path <default None> \
```

@@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
output rather than just being specified once as in the VCF header), you
typically need 3 to 4 times the total size of the raw VCF files.

In addition, if [merging](variant_merging.md) or
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
In addition, if [merging](variant_merging.md) is enabled, you may
need more disk per worker (e.g. 500GB) as the same variants need to be
aggregated together on one machine.

@@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
limitations.

As a result, we recommend using SSDs if [merging](variant_merge.md) or
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
operations require "shuffling" the data (i.e. redistributing the data among
workers), which require significant disk I/O.
As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
this operation requires "shuffling" the data (i.e. redistributing the data
among workers), which requires significant disk I/O.

Set
`--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
to use SSDs.

### `--num_bigquery_write_shards`

Currently, the write operation to BigQuery in Dataflow is performed as a
postprocessing step after the main transforms are done. As a workaround for
BigQuery write limitations (more details
[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
we have added "sharding" when writing to BigQuery. This makes the data load
to BigQuery significantly faster as it parallelizes the process and enables
loading large (>5TB) data to BigQuery at once.

As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
any data that has more than 1 billion rows (after merging) or 1TB of final
output. You may use a smaller number of write shards (e.g. 5) when using
[sharded output](#--sharding_config_path) as each partition also acts as a
"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
fail as there is a maximum limit on the number of concurrent writes per table.

### `--sharding_config_path`

Sharding the output can save significant query costs once the data is in
@@ -146,4 +126,3 @@ partition).
As a result, we recommend setting the partition config for very large data
where possible. Please see the [documentation](sharding.md) for more
details.
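
As a rough illustration of how the disk and machine recommendations above come together, here is a hypothetical flag set for a large run, expressed as the argv-style list the pipeline ultimately receives. The values, project, and zone are placeholders, not settings taken from this commit.

```python
# Illustrative only: example worker/disk flags for a large load. Every value
# below (machine type, disk size, <project>, <zone>) is a placeholder.
pipeline_args = [
    '--worker_machine_type', 'n1-standard-16',
    '--disk_size_gb', '500',
    '--worker_disk_type',
    'compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd',
]
```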

7 changes: 1 addition & 6 deletions gcp_variant_transforms/options/variant_transform_options.py
@@ -185,12 +185,7 @@ def add_arguments(self, parser):
parser.add_argument(
'--num_bigquery_write_shards',
type=int, default=1,
help=('Before writing the final result to output BigQuery, the data is '
'sharded to avoid a known failure for very large inputs (issue '
'#199). Setting this flag to 1 will avoid this extra sharding.'
'It is recommended to use 20 for loading large inputs without '
'merging. Use a smaller value (2 or 3) if both merging and '
'optimize_for_large_inputs are enabled.'))
help=('This flag is deprecated and may be removed in future releases.'))
parser.add_argument(
'--null_numeric_value_replacement',
type=int,
3 changes: 3 additions & 0 deletions gcp_variant_transforms/pipeline_common.py
@@ -77,6 +77,9 @@ def parse_args(argv, command_line_options):
if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'):
known_args.all_patterns = _get_all_patterns(
known_args.input_pattern, known_args.input_file)

# Enable new BQ sink experiment.
pipeline_args += ['--experiment', 'use_beam_bq_sink']
return known_args, pipeline_args
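
For context, a minimal sketch (not part of the commit) of how an `--experiment` flag appended to `pipeline_args` is later seen by the Beam SDK. It assumes Beam's standard option classes; nothing here is repo-specific.

```python
# Minimal sketch: Beam collects repeated --experiment flags via DebugOptions;
# the SDK/runner consults this list when deciding whether the new BigQuery
# sink implementation should be used by WriteToBigQuery.
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

pipeline_args = ['--experiment', 'use_beam_bq_sink']
options = PipelineOptions(pipeline_args)
experiments = options.view_as(DebugOptions).experiments or []
assert 'use_beam_bq_sink' in experiments
```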


@@ -10,7 +10,6 @@
"worker_machine_type": "n1-standard-64",
"max_num_workers": "64",
"num_workers": "20",
"num_bigquery_write_shards": "2",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
@@ -10,7 +10,6 @@
"worker_machine_type": "n1-standard-16",
"max_num_workers": "20",
"num_workers": "20",
"num_bigquery_write_shards": "2",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
@@ -69,4 +68,3 @@
]
}
]

@@ -8,7 +8,6 @@
"worker_machine_type": "n1-standard-16",
"max_num_workers": "20",
"num_workers": "20",
"num_bigquery_write_shards": "20",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
57 changes: 0 additions & 57 deletions gcp_variant_transforms/transforms/limit_write.py

This file was deleted.

76 changes: 0 additions & 76 deletions gcp_variant_transforms/transforms/limit_write_test.py

This file was deleted.

5 changes: 4 additions & 1 deletion gcp_variant_transforms/transforms/sample_info_to_bigquery.py
@@ -69,6 +69,7 @@ def __init__(self, output_table_prefix, sample_name_encoding, append=False):
self._append = append
self._sample_name_encoding = sample_name_encoding
self._schema = sample_info_table_schema_generator.generate_schema()
self._temp_location = temp_location

def expand(self, pcoll):
return (pcoll
@@ -82,4 +83,6 @@ def expand(self, pcoll):
write_disposition=(
beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
custom_gcs_temp_location=self._temp_location))
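
For reference, a self-contained sketch of the write pattern adopted above: `beam.io.WriteToBigQuery` with file loads and an explicit GCS temp location. The table reference, schema string, and bucket are hypothetical placeholders, not values from this repository.

```python
# Sketch only: mirrors the FILE_LOADS-based write used in this commit.
# All resource names below are hypothetical.
import apache_beam as beam


def write_sample_info(rows):
  return rows | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
      'my-project:my_dataset.sample_info',             # hypothetical table
      schema='sample_id:INTEGER,sample_name:STRING',   # hypothetical schema
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
      # FILE_LOADS stages rows as files in GCS and issues BigQuery load jobs,
      # avoiding the per-table limits the old sharding workaround existed for.
      method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
      custom_gcs_temp_location='gs://my-bucket/bq_temp')  # hypothetical bucket
```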
49 changes: 9 additions & 40 deletions gcp_variant_transforms/transforms/variant_to_bigquery.py
@@ -16,7 +16,6 @@

from __future__ import absolute_import

import random
from typing import Dict, List # pylint: disable=unused-import

import apache_beam as beam
@@ -29,13 +28,6 @@
from gcp_variant_transforms.libs import processed_variant
from gcp_variant_transforms.libs import vcf_field_conflict_resolver
from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import
from gcp_variant_transforms.transforms import limit_write


# TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK,
# see: https://issues.apache.org/jira/browse/BEAM-2801
# This has to be less than 10000.
_WRITE_SHARDS_LIMIT = 1000


@beam.typehints.with_input_types(processed_variant.ProcessedVariant)
@@ -71,8 +63,8 @@ def __init__(
update_schema_on_append=False, # type: bool
allow_incompatible_records=False, # type: bool
omit_empty_sample_calls=False, # type: bool
num_bigquery_write_shards=1, # type: int
null_numeric_value_replacement=None # type: int
null_numeric_value_replacement=None # type: int,

):
# type: (...) -> None
"""Initializes the transform.
@@ -88,8 +80,6 @@ def __init__(
+ schema if there is a mismatch.
omit_empty_sample_calls: If true, samples that don't have a given call
will be omitted.
num_bigquery_write_shards: If > 1, we will limit number of sources which
are used for writing to the output BigQuery table.
null_numeric_value_replacement: the value to use instead of null for
numeric (float/int/long) lists. For instance, [0, None, 1] will become
[0, `null_numeric_value_replacement`, 1]. If not set, the value will set
@@ -109,7 +99,6 @@ def __init__(

self._allow_incompatible_records = allow_incompatible_records
self._omit_empty_sample_calls = omit_empty_sample_calls
self._num_bigquery_write_shards = num_bigquery_write_shards
if update_schema_on_append:
bigquery_util.update_bigquery_schema_on_append(self._schema.fields,
self._output_table)
@@ -120,35 +109,15 @@ def expand(self, pcoll):
self._bigquery_row_generator,
self._allow_incompatible_records,
self._omit_empty_sample_calls))
if self._num_bigquery_write_shards > 1:
# We split data into self._num_bigquery_write_shards random partitions
# and then write each part to final BQ by appending them together.
# Combined with LimitWrite transform, this will avoid the BQ failure.
bq_row_partitions = bq_rows | beam.Partition(
lambda _, n: random.randint(0, n - 1),
self._num_bigquery_write_shards)
bq_writes = []
for i in range(self._num_bigquery_write_shards):
bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >>
limit_write.LimitWrite(_WRITE_SHARDS_LIMIT))
bq_writes.append(
bq_rows | 'WriteToBigQuery' + str(i) >>
beam.io.Write(beam.io.BigQuerySink(
return (bq_rows
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
self._output_table,
schema=self._schema,
create_disposition=(
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
write_disposition=(
beam.io.BigQueryDisposition.WRITE_APPEND))))
return bq_writes
else:
return (bq_rows
| 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
self._output_table,
schema=self._schema,
create_disposition=(
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
write_disposition=(
beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
custom_gcs_temp_location=self._temp_location))
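
The `self._temp_location` passed to `custom_gcs_temp_location` above has to come from the pipeline's GCS temp settings, which is also why `pipeline_args` is threaded through in the `vcf_to_bq.py` change below. A hedged sketch of one way to derive it from the standard options (the helper name is hypothetical, not the repo's actual implementation):

```python
# Sketch under assumptions: --temp_location is the standard Dataflow/GCP
# pipeline option (e.g. gs://my-bucket/tmp); this helper is illustrative.
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions


def get_temp_location(pipeline_args):
  # type: (list) -> str
  options = PipelineOptions(pipeline_args)
  return options.view_as(GoogleCloudOptions).temp_location
```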
8 changes: 4 additions & 4 deletions gcp_variant_transforms/vcf_to_bq.py
@@ -389,7 +389,8 @@ def _run_annotation_pipeline(known_args, pipeline_args):

def _create_sample_info_table(pipeline, # type: beam.Pipeline
pipeline_mode, # type: PipelineModes
known_args, # type: argparse.Namespace
known_args, # type: argparse.Namespace,
pipeline_args, # type: List[str]
):
# type: (...) -> None
headers = pipeline_common.read_headers(
@@ -409,7 +410,6 @@ def run(argv=None):
logging.info('Command: %s', ' '.join(argv or sys.argv))
known_args, pipeline_args = pipeline_common.parse_args(argv,
_COMMAND_LINE_OPTIONS)

if known_args.auto_flags_experiment:
_get_input_dimensions(known_args, pipeline_args)

@@ -494,11 +494,11 @@ def run(argv=None):
update_schema_on_append=known_args.update_schema_on_append,
allow_incompatible_records=known_args.allow_incompatible_records,
omit_empty_sample_calls=known_args.omit_empty_sample_calls,
num_bigquery_write_shards=known_args.num_bigquery_write_shards,
null_numeric_value_replacement=(
known_args.null_numeric_value_replacement)))
if known_args.generate_sample_info_table:
_create_sample_info_table(pipeline, pipeline_mode, known_args)
_create_sample_info_table(
pipeline, pipeline_mode, known_args, pipeline_args)

if known_args.output_avro_path:
# TODO(bashir2): Add an integration test that outputs to Avro files and
3 changes: 1 addition & 2 deletions setup.py
@@ -38,8 +38,7 @@
'google-api-python-client>=1.6',
'intervaltree>=2.1.0,<2.2.0',
'mmh3<2.6',
# Refer to issue #528
'google-cloud-storage<1.23.0',
'google-cloud-storage',
'pyfarmhash',
'pyyaml'
]
