From 2895b8737a378a6b7e6b31740c98e7b0b212ade6 Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Mon, 24 Jun 2019 14:13:55 -0400 Subject: [PATCH 1/5] Use new BigQuery sink. --- docs/large_inputs.md | 29 +------ .../options/variant_transform_options.py | 7 +- gcp_variant_transforms/pipeline_common.py | 3 + .../huge_tests/test_1000_genomes.json | 1 - .../option_optimize_for_large_inputs.json | 2 - .../large_tests/platinum_no_merge.json | 1 - .../transforms/limit_write.py | 57 -------------- .../transforms/limit_write_test.py | 76 ------------------- .../transforms/sample_info_to_bigquery.py | 5 +- .../transforms/variant_to_bigquery.py | 49 +++--------- gcp_variant_transforms/vcf_to_bq.py | 8 +- setup.py | 3 +- 12 files changed, 26 insertions(+), 215 deletions(-) delete mode 100644 gcp_variant_transforms/transforms/limit_write.py delete mode 100644 gcp_variant_transforms/transforms/limit_write_test.py diff --git a/docs/large_inputs.md b/docs/large_inputs.md index eae4770c1..ccf8adfc9 100644 --- a/docs/large_inputs.md +++ b/docs/large_inputs.md @@ -14,7 +14,6 @@ Default settings: --worker_machine_type \ --disk_size_gb \ --worker_disk_type \ - --num_bigquery_write_shards \ --partition_config_path \ ``` @@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery output rather than just being specified once as in the VCF header), you typically need 3 to 4 times the total size of the raw VCF files. -In addition, if [merging](variant_merging.md) or -[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may +In addition, if [merging](variant_merging.md) is enabled, you may need more disk per worker (e.g. 500GB) as the same variants need to be aggregated together on one machine. @@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`), they can reduce cost as they can avoid idle CPU cycles due to disk IOPS limitations. -As a result, we recommend using SSDs if [merging](variant_merge.md) or -[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these -operations require "shuffling" the data (i.e. redistributing the data among -workers), which require significant disk I/O. +As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled: +this operation requires "shuffling" the data (i.e. redistributing the data +among workers), which requires significant disk I/O. Set `--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd` to use SSDs. -### `--num_bigquery_write_shards` - -Currently, the write operation to BigQuery in Dataflow is performed as a -postprocessing step after the main transforms are done. As a workaround for -BigQuery write limitations (more details -[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)), -we have added "sharding" when writing to BigQuery. This makes the data load -to BigQuery significantly faster as it parallelizes the process and enables -loading large (>5TB) data to BigQuery at once. - -As a result, we recommend setting `--num_bigquery_write_shards 20` when loading -any data that has more than 1 billion rows (after merging) or 1TB of final -output. You may use a smaller number of write shards (e.g. 5) when using -[sharded output](#--sharding_config_path) as each partition also acts as a -"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to -fail as there is a maximum limit on the number of concurrent writes per table. 
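(Illustrative sketch, not part of the patch.) With the new Beam BigQuery sink adopted by this change, the manually sharded write described above collapses into a single `WriteToBigQuery` step backed by load jobs; the table, schema and temp-location values below are placeholders:

```python
import apache_beam as beam


def write_variant_rows(bq_rows, output_table, schema, gcs_temp_location):
  """Minimal sketch of the load-job based write used by the new sink."""
  return (
      bq_rows
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
          output_table,  # e.g. 'my-project:dataset.variants' (placeholder)
          schema=schema,
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          # FILE_LOADS stages rows as files on GCS and issues BigQuery load
          # jobs, avoiding the "Too many sources" limit that motivated the
          # old --num_bigquery_write_shards workaround.
          method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
          custom_gcs_temp_location=gcs_temp_location))
```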
- ### `--sharding_config_path` Sharding the output can save significant query costs once the data is in @@ -146,4 +126,3 @@ partition). As a result, we recommend setting the partition config for very large data where possible. Please see the [documentation](sharding.md) for more details. - diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 94ef2d405..40b92dde6 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -185,12 +185,7 @@ def add_arguments(self, parser): parser.add_argument( '--num_bigquery_write_shards', type=int, default=1, - help=('Before writing the final result to output BigQuery, the data is ' - 'sharded to avoid a known failure for very large inputs (issue ' - '#199). Setting this flag to 1 will avoid this extra sharding.' - 'It is recommended to use 20 for loading large inputs without ' - 'merging. Use a smaller value (2 or 3) if both merging and ' - 'optimize_for_large_inputs are enabled.')) + help=('This flag is deprecated and may be removed in future releases.')) parser.add_argument( '--null_numeric_value_replacement', type=int, diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 525dc047b..378738af6 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -77,6 +77,9 @@ def parse_args(argv, command_line_options): if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'): known_args.all_patterns = _get_all_patterns( known_args.input_pattern, known_args.input_file) + + # Enable new BQ sink experiment. + pipeline_args += ['--experiment', 'use_beam_bq_sink'] return known_args, pipeline_args diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json index 871f3fb01..48ce40d43 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json @@ -10,7 +10,6 @@ "worker_machine_type": "n1-standard-64", "max_num_workers": "64", "num_workers": "20", - "num_bigquery_write_shards": "2", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json index cd1b958bb..7bfd502d8 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json @@ -10,7 +10,6 @@ "worker_machine_type": "n1-standard-16", "max_num_workers": "20", "num_workers": "20", - "num_bigquery_write_shards": "2", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], @@ -69,4 +68,3 @@ ] } ] - diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json index c66767591..6b664dd11 100644 --- 
a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json @@ -8,7 +8,6 @@ "worker_machine_type": "n1-standard-16", "max_num_workers": "20", "num_workers": "20", - "num_bigquery_write_shards": "20", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], diff --git a/gcp_variant_transforms/transforms/limit_write.py b/gcp_variant_transforms/transforms/limit_write.py deleted file mode 100644 index 0f910dc71..000000000 --- a/gcp_variant_transforms/transforms/limit_write.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""A PTransform to limit BQ sink from producing too many files (shards) - -This is a work around to avoid the following failure: - BigQuery execution failed., Error: - Message: Too many sources provided: xxxxx. Limit is 10000. -To limit sink we generate a random dummy key and group by input elements (which -are BigQuery rows) based on that key before writing them to output table. -""" - -from __future__ import absolute_import - -import random -import apache_beam as beam - - -class _RoundRobinKeyFn(beam.DoFn): - def __init__(self, count): - # type: (int) -> None - self._count = count - # This attribute will be properly initiated at each worker by start_bundle() - self._counter = 0 - - def start_bundle(self): - # type: () -> None - self._counter = random.randint(0, self._count - 1) - - def process(self, element): - self._counter += 1 - if self._counter >= self._count: - self._counter -= self._count - yield self._counter, element - - -class LimitWrite(beam.PTransform): - def __init__(self, count): - # type: (int) -> None - self._count = count - - def expand(self, pcoll): - return (pcoll - | beam.ParDo(_RoundRobinKeyFn(self._count)) - | beam.GroupByKey() - | beam.FlatMap(lambda kv: kv[1])) diff --git a/gcp_variant_transforms/transforms/limit_write_test.py b/gcp_variant_transforms/transforms/limit_write_test.py deleted file mode 100644 index 980e1cc87..000000000 --- a/gcp_variant_transforms/transforms/limit_write_test.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Tests for limit_write module.""" - -from __future__ import absolute_import - -import unittest - -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms import Create - -from gcp_variant_transforms.beam_io import vcfio -from gcp_variant_transforms.transforms import limit_write - - -class LimitWriteTest(unittest.TestCase): - """Test cases for the ``LimitWrite`` PTransform.""" - - def _get_sample_variants(self): - variant1 = vcfio.Variant( - reference_name='chr19', start=11, end=12, reference_bases='C') - variant2 = vcfio.Variant( - reference_name='20', start=123, end=125, reference_bases='CT') - variant3 = vcfio.Variant( - reference_name='20', start=None, end=None, reference_bases=None) - variant4 = vcfio.Variant( - reference_name='20', start=123, end=125, reference_bases='CT') - return [variant1, variant2, variant3, variant4] - - - def test_limit_write_default_shard_limit(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(4500)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() - - def test_limit_write_shard_limit_4(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(4)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() - - def test_limit_write_shard_limit_1(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(1)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() diff --git a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py index 7a4fb804f..fde467de1 100644 --- a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py +++ b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py @@ -69,6 +69,7 @@ def __init__(self, output_table_prefix, sample_name_encoding, append=False): self._append = append self._sample_name_encoding = sample_name_encoding self._schema = sample_info_table_schema_generator.generate_schema() + self._temp_location = temp_location def expand(self, pcoll): return (pcoll @@ -82,4 +83,6 @@ def expand(self, pcoll): write_disposition=( beam.io.BigQueryDisposition.WRITE_APPEND if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE)))) + else beam.io.BigQueryDisposition.WRITE_TRUNCATE), + method=beam.io.WriteToBigQuery.Method.FILE_LOADS, + custom_gcs_temp_location=self._temp_location)) diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery.py b/gcp_variant_transforms/transforms/variant_to_bigquery.py index 2d01dc4cf..5a954e502 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery.py @@ -16,7 +16,6 @@ from __future__ import absolute_import -import random from typing import Dict, List # pylint: disable=unused-import import apache_beam as beam @@ -29,13 +28,6 @@ from gcp_variant_transforms.libs import processed_variant from gcp_variant_transforms.libs import vcf_field_conflict_resolver from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import 
-from gcp_variant_transforms.transforms import limit_write - - -# TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK, -# see: https://issues.apache.org/jira/browse/BEAM-2801 -# This has to be less than 10000. -_WRITE_SHARDS_LIMIT = 1000 @beam.typehints.with_input_types(processed_variant.ProcessedVariant) @@ -71,8 +63,8 @@ def __init__( update_schema_on_append=False, # type: bool allow_incompatible_records=False, # type: bool omit_empty_sample_calls=False, # type: bool - num_bigquery_write_shards=1, # type: int - null_numeric_value_replacement=None # type: int + null_numeric_value_replacement=None # type: int, + ): # type: (...) -> None """Initializes the transform. @@ -88,8 +80,6 @@ def __init__( + schema if there is a mismatch. omit_empty_sample_calls: If true, samples that don't have a given call will be omitted. - num_bigquery_write_shards: If > 1, we will limit number of sources which - are used for writing to the output BigQuery table. null_numeric_value_replacement: the value to use instead of null for numeric (float/int/long) lists. For instance, [0, None, 1] will become [0, `null_numeric_value_replacement`, 1]. If not set, the value will set @@ -109,7 +99,6 @@ def __init__( self._allow_incompatible_records = allow_incompatible_records self._omit_empty_sample_calls = omit_empty_sample_calls - self._num_bigquery_write_shards = num_bigquery_write_shards if update_schema_on_append: bigquery_util.update_bigquery_schema_on_append(self._schema.fields, self._output_table) @@ -120,35 +109,15 @@ def expand(self, pcoll): self._bigquery_row_generator, self._allow_incompatible_records, self._omit_empty_sample_calls)) - if self._num_bigquery_write_shards > 1: - # We split data into self._num_bigquery_write_shards random partitions - # and then write each part to final BQ by appending them together. - # Combined with LimitWrite transform, this will avoid the BQ failure. 
- bq_row_partitions = bq_rows | beam.Partition( - lambda _, n: random.randint(0, n - 1), - self._num_bigquery_write_shards) - bq_writes = [] - for i in range(self._num_bigquery_write_shards): - bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >> - limit_write.LimitWrite(_WRITE_SHARDS_LIMIT)) - bq_writes.append( - bq_rows | 'WriteToBigQuery' + str(i) >> - beam.io.Write(beam.io.BigQuerySink( + return (bq_rows + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( self._output_table, schema=self._schema, create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND)))) - return bq_writes - else: - return (bq_rows - | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink( - self._output_table, - schema=self._schema, - create_disposition=( - beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND - if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE)))) + beam.io.BigQueryDisposition.WRITE_APPEND + if self._append + else beam.io.BigQueryDisposition.WRITE_TRUNCATE), + method=beam.io.WriteToBigQuery.Method.FILE_LOADS, + custom_gcs_temp_location=self._temp_location)) diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 056a300b7..58daae61b 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -389,7 +389,8 @@ def _run_annotation_pipeline(known_args, pipeline_args): def _create_sample_info_table(pipeline, # type: beam.Pipeline pipeline_mode, # type: PipelineModes - known_args, # type: argparse.Namespace + known_args, # type: argparse.Namespace, + pipeline_args, # type: List[str] ): # type: (...) -> None headers = pipeline_common.read_headers( @@ -409,7 +410,6 @@ def run(argv=None): logging.info('Command: %s', ' '.join(argv or sys.argv)) known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) - if known_args.auto_flags_experiment: _get_input_dimensions(known_args, pipeline_args) @@ -494,11 +494,11 @@ def run(argv=None): update_schema_on_append=known_args.update_schema_on_append, allow_incompatible_records=known_args.allow_incompatible_records, omit_empty_sample_calls=known_args.omit_empty_sample_calls, - num_bigquery_write_shards=known_args.num_bigquery_write_shards, null_numeric_value_replacement=( known_args.null_numeric_value_replacement))) if known_args.generate_sample_info_table: - _create_sample_info_table(pipeline, pipeline_mode, known_args) + _create_sample_info_table( + pipeline, pipeline_mode, known_args, pipeline_args) if known_args.output_avro_path: # TODO(bashir2): Add an integration test that outputs to Avro files and diff --git a/setup.py b/setup.py index 194843822..b6f139ea4 100644 --- a/setup.py +++ b/setup.py @@ -38,8 +38,7 @@ 'google-api-python-client>=1.6', 'intervaltree>=2.1.0,<2.2.0', 'mmh3<2.6', - # Refer to issue #528 - 'google-cloud-storage<1.23.0', + 'google-cloud-storage', 'pyfarmhash', 'pyyaml' ] From 5dd0a66a7fc021df4b2181bd11886b5b95ec113e Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Mon, 13 Jan 2020 18:44:09 -0500 Subject: [PATCH 2/5] Address first iteration of comments. 
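(Illustrative sketch, not part of the patch.) The `use_beam_bq_sink` experiment appended in `pipeline_common.parse_args` above is a standard Beam pipeline option; the runner and project values here are placeholders:

```python
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

pipeline_args = ['--runner', 'DataflowRunner', '--project', 'my-project']
# parse_args() now appends this pair, so every run opts into the new sink.
pipeline_args += ['--experiment', 'use_beam_bq_sink']

options = PipelineOptions(pipeline_args)
assert 'use_beam_bq_sink' in (options.view_as(DebugOptions).experiments or [])
```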
--- gcp_variant_transforms/vcf_to_bq.py | 13 ++++++++----- setup.py | 3 ++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 58daae61b..5b98a79f0 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -390,7 +390,7 @@ def _run_annotation_pipeline(known_args, pipeline_args): def _create_sample_info_table(pipeline, # type: beam.Pipeline pipeline_mode, # type: PipelineModes known_args, # type: argparse.Namespace, - pipeline_args, # type: List[str] + temp_directory, # str ): # type: (...) -> None headers = pipeline_common.read_headers( @@ -410,6 +410,8 @@ def run(argv=None): logging.info('Command: %s', ' '.join(argv or sys.argv)) known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) + if known_args.output_table and '--temp_location' not in pipeline_args: + raise ValueError('--temp_location is required for BigQuery imports.') if known_args.auto_flags_experiment: _get_input_dimensions(known_args, pipeline_args) @@ -483,9 +485,10 @@ def run(argv=None): file_to_write.write(schema_json) for i in range(num_shards): - table_suffix = sharding.get_output_table_suffix(i) - table_name = sample_info_table_schema_generator.compose_table_name( - known_args.output_table, table_suffix) + table_suffix = '' + if sharding and sharding.get_shard_name(i): + table_suffix = '_' + sharding.get_shard_name(i) + table_name = known_args.output_table + table_suffix _ = (variants[i] | 'VariantToBigQuery' + table_suffix >> variant_to_bigquery.VariantToBigQuery( table_name, @@ -498,7 +501,7 @@ def run(argv=None): known_args.null_numeric_value_replacement))) if known_args.generate_sample_info_table: _create_sample_info_table( - pipeline, pipeline_mode, known_args, pipeline_args) + pipeline, pipeline_mode, known_args) if known_args.output_avro_path: # TODO(bashir2): Add an integration test that outputs to Avro files and diff --git a/setup.py b/setup.py index b6f139ea4..194843822 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,8 @@ 'google-api-python-client>=1.6', 'intervaltree>=2.1.0,<2.2.0', 'mmh3<2.6', - 'google-cloud-storage', + # Refer to issue #528 + 'google-cloud-storage<1.23.0', 'pyfarmhash', 'pyyaml' ] From d7c4271694bb151265e25c8878257293d8ad1427 Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Tue, 14 Jan 2020 16:17:13 -0500 Subject: [PATCH 3/5] Address second iteration of comments. 
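(Illustrative sketch, not part of the patch.) The `--temp_location` requirement introduced in the previous commit, and tightened in this one, amounts to the check below; the helper name is hypothetical, and the real code wires the check into `_raise_error_on_invalid_flags`:

```python
from apache_beam.options import pipeline_options


def validate_temp_location(pipeline_args):
  # FILE_LOADS stages intermediate files on GCS before running BigQuery load
  # jobs, so writing to BigQuery needs a gs:// temp location.
  options = pipeline_options.PipelineOptions(pipeline_args)
  gcp_options = options.view_as(pipeline_options.GoogleCloudOptions)
  if not gcp_options.temp_location:
    raise ValueError('--temp_location is required for BigQuery imports.')
  if not gcp_options.temp_location.startswith('gs://'):
    raise ValueError(
        '--temp_location must be a valid GCS location for BigQuery imports.')
```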
--- .../options/variant_transform_options.py | 3 ++- gcp_variant_transforms/pipeline_common.py | 16 +++++++++++++--- gcp_variant_transforms/pipeline_common_test.py | 18 ++++++++++++++---- .../transforms/sample_info_to_bigquery.py | 4 +--- .../transforms/variant_to_bigquery.py | 3 +-- gcp_variant_transforms/vcf_to_bq.py | 9 +++------ 6 files changed, 34 insertions(+), 19 deletions(-) diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 40b92dde6..086ab6904 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -185,7 +185,8 @@ def add_arguments(self, parser): parser.add_argument( '--num_bigquery_write_shards', type=int, default=1, - help=('This flag is deprecated and may be removed in future releases.')) + help=('This flag is deprecated and will be removed in future ' + 'releases.')) parser.add_argument( '--null_numeric_value_replacement', type=int, diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 378738af6..fcb8308c2 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -73,7 +73,9 @@ def parse_args(argv, command_line_options): known_args, pipeline_args = parser.parse_known_args(argv) for transform_options in options: transform_options.validate(known_args) - _raise_error_on_invalid_flags(pipeline_args) + _raise_error_on_invalid_flags( + pipeline_args, + known_args.output_table if hasattr(known_args, 'output_table') else None) if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'): known_args.all_patterns = _get_all_patterns( known_args.input_pattern, known_args.input_file) @@ -304,8 +306,8 @@ def write_headers(merged_header, file_path): vcf_header_io.WriteVcfHeaders(file_path)) -def _raise_error_on_invalid_flags(pipeline_args): - # type: (List[str]) -> None +def _raise_error_on_invalid_flags(pipeline_args, output_table): + # type: (List[str], Any) -> None """Raises an error if there are unrecognized flags.""" parser = argparse.ArgumentParser() for cls in pipeline_options.PipelineOptions.__subclasses__(): @@ -318,6 +320,14 @@ def _raise_error_on_invalid_flags(pipeline_args): not known_pipeline_args.setup_file): raise ValueError('The --setup_file flag is required for DataflowRunner. ' 'Please provide a path to the setup.py file.') + if output_table: + if (not hasattr(known_pipeline_args, 'temp_location') or + not known_pipeline_args.temp_location): + raise ValueError('--temp_location is required for BigQuery imports.') + if not known_pipeline_args.temp_location.startswith('gs://'): + raise ValueError( + '--temp_location must be valid GCS location for BigQuery imports') + def is_pipeline_direct_runner(pipeline): diff --git a/gcp_variant_transforms/pipeline_common_test.py b/gcp_variant_transforms/pipeline_common_test.py index a5633b615..181c1b851 100644 --- a/gcp_variant_transforms/pipeline_common_test.py +++ b/gcp_variant_transforms/pipeline_common_test.py @@ -94,21 +94,31 @@ def test_fail_on_invalid_flags(self): 'gcp-variant-transforms-test', '--staging_location', 'gs://integration_test_runs/staging'] - pipeline_common._raise_error_on_invalid_flags(pipeline_args) + pipeline_common._raise_error_on_invalid_flags(pipeline_args, None) # Add Dataflow runner (requires --setup_file). 
pipeline_args.extend(['--runner', 'DataflowRunner']) with self.assertRaisesRegexp(ValueError, 'setup_file'): - pipeline_common._raise_error_on_invalid_flags(pipeline_args) + pipeline_common._raise_error_on_invalid_flags(pipeline_args, None) # Add setup.py (required for Variant Transforms run). This is now valid. pipeline_args.extend(['--setup_file', 'setup.py']) - pipeline_common._raise_error_on_invalid_flags(pipeline_args) + pipeline_common._raise_error_on_invalid_flags(pipeline_args, None) + + with self.assertRaisesRegexp(ValueError, '--temp_location is required*'): + pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output') + + pipeline_args.extend(['--temp_location', 'wrong_gcs']) + with self.assertRaisesRegexp(ValueError, '--temp_location must be valid*'): + pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output') + + pipeline_args = pipeline_args[:-1] + ['gs://valid_bucket/temp'] + pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output') # Add an unknown flag. pipeline_args.extend(['--unknown_flag', 'somevalue']) with self.assertRaisesRegexp(ValueError, 'Unrecognized.*unknown_flag'): - pipeline_common._raise_error_on_invalid_flags(pipeline_args) + pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output') def test_get_compression_type(self): vcf_metadata_list = [filesystem.FileMetadata(path, size) for diff --git a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py index fde467de1..fe63c71e1 100644 --- a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py +++ b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py @@ -69,7 +69,6 @@ def __init__(self, output_table_prefix, sample_name_encoding, append=False): self._append = append self._sample_name_encoding = sample_name_encoding self._schema = sample_info_table_schema_generator.generate_schema() - self._temp_location = temp_location def expand(self, pcoll): return (pcoll @@ -84,5 +83,4 @@ def expand(self, pcoll): beam.io.BigQueryDisposition.WRITE_APPEND if self._append else beam.io.BigQueryDisposition.WRITE_TRUNCATE), - method=beam.io.WriteToBigQuery.Method.FILE_LOADS, - custom_gcs_temp_location=self._temp_location)) + method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery.py b/gcp_variant_transforms/transforms/variant_to_bigquery.py index 5a954e502..2f17ba358 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery.py @@ -119,5 +119,4 @@ def expand(self, pcoll): beam.io.BigQueryDisposition.WRITE_APPEND if self._append else beam.io.BigQueryDisposition.WRITE_TRUNCATE), - method=beam.io.WriteToBigQuery.Method.FILE_LOADS, - custom_gcs_temp_location=self._temp_location)) + method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 5b98a79f0..72ede7621 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -390,7 +390,6 @@ def _run_annotation_pipeline(known_args, pipeline_args): def _create_sample_info_table(pipeline, # type: beam.Pipeline pipeline_mode, # type: PipelineModes known_args, # type: argparse.Namespace, - temp_directory, # str ): # type: (...) 
-> None headers = pipeline_common.read_headers( @@ -410,8 +409,6 @@ def run(argv=None): logging.info('Command: %s', ' '.join(argv or sys.argv)) known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) - if known_args.output_table and '--temp_location' not in pipeline_args: - raise ValueError('--temp_location is required for BigQuery imports.') if known_args.auto_flags_experiment: _get_input_dimensions(known_args, pipeline_args) @@ -486,9 +483,9 @@ def run(argv=None): for i in range(num_shards): table_suffix = '' - if sharding and sharding.get_shard_name(i): - table_suffix = '_' + sharding.get_shard_name(i) - table_name = known_args.output_table + table_suffix + table_suffix = sharding.get_output_table_suffix(i) + table_name = sample_info_table_schema_generator.compose_table_name( + known_args.output_table, table_suffix) _ = (variants[i] | 'VariantToBigQuery' + table_suffix >> variant_to_bigquery.VariantToBigQuery( table_name, From 0d69f808bcc140f66b17099dd17d927582447a7a Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Thu, 23 Jan 2020 18:12:26 -0500 Subject: [PATCH 4/5] Remove WRITE_TRUNCATE option when writing to BQ. --- .../transforms/sample_info_to_bigquery.py | 9 +++------ gcp_variant_transforms/transforms/variant_to_bigquery.py | 8 ++------ gcp_variant_transforms/vcf_to_bq.py | 2 +- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py index fe63c71e1..6e7cc120b 100644 --- a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py +++ b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py @@ -73,14 +73,11 @@ def __init__(self, output_table_prefix, sample_name_encoding, append=False): def expand(self, pcoll): return (pcoll | 'ConvertSampleInfoToBigQueryTableRow' >> beam.ParDo( - ConvertSampleInfoToRow(self._sample_name_encoding)) - | 'WriteSampleInfoToBigQuery' >> beam.io.Write(beam.io.BigQuerySink( + ConvertSampleInfoToRow(self.sample_name_encoding)) + | 'WriteSampleInfoToBigQuery' >> beam.io.WriteToBigQuery( self._output_table, schema=self._schema, create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND - if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE), + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery.py b/gcp_variant_transforms/transforms/variant_to_bigquery.py index 2f17ba358..a9697c3ec 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery.py @@ -63,8 +63,7 @@ def __init__( update_schema_on_append=False, # type: bool allow_incompatible_records=False, # type: bool omit_empty_sample_calls=False, # type: bool - null_numeric_value_replacement=None # type: int, - + null_numeric_value_replacement=None # type: int ): # type: (...) -> None """Initializes the transform. 
@@ -115,8 +114,5 @@ def expand(self, pcoll): schema=self._schema, create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND - if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE), + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 72ede7621..a3853f4bf 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -389,7 +389,7 @@ def _run_annotation_pipeline(known_args, pipeline_args): def _create_sample_info_table(pipeline, # type: beam.Pipeline pipeline_mode, # type: PipelineModes - known_args, # type: argparse.Namespace, + known_args # type: argparse.Namespace ): # type: (...) -> None headers = pipeline_common.read_headers( From b755d89748a430bcf99f1af7a39ba9c72ddc9ddf Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Mon, 10 Feb 2020 12:15:31 -0500 Subject: [PATCH 5/5] Play with tests. --- .../large_tests/test_non_splittable_gzip.json | 23 ------------------- 1 file changed, 23 deletions(-) delete mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/test_non_splittable_gzip.json diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/test_non_splittable_gzip.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/test_non_splittable_gzip.json deleted file mode 100644 index cc2d7a8ab..000000000 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/test_non_splittable_gzip.json +++ /dev/null @@ -1,23 +0,0 @@ -[ - { - "test_name": "test-non-splittable-gzip", - "table_name": "test_non_splittable_gzip", - "input_pattern": "gs://gcp-variant-transforms-testfiles/large_tests/non-splittable-gzip/**.bgz", - "sharding_config_path": "gcp_variant_transforms/data/sharding_configs/homo_sapiens_default.yaml", - "runner": "DataflowRunner", - "assertion_configs": [ - { - "query": ["NUM_ROWS_QUERY"], - "expected_result": {"num_rows": 409932} - }, - { - "query": ["SUM_START_QUERY"], - "expected_result": {"sum_start": 32190612292607} - }, - { - "query": ["SUM_END_QUERY"], - "expected_result": {"sum_end": 32190612813885} - } - ] - } -]
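(Illustrative sketch, not part of the patch.) A quick sanity check that the installed `apache_beam` exposes the sink API this series now depends on, useful when revisiting the pinned dependencies in `setup.py`:

```python
import apache_beam as beam

# Symbols the series relies on after switching to the new BigQuery sink.
assert hasattr(beam.io, 'WriteToBigQuery')
assert beam.io.WriteToBigQuery.Method.FILE_LOADS
assert beam.io.BigQueryDisposition.CREATE_IF_NEEDED
assert beam.io.BigQueryDisposition.WRITE_APPEND
print('Beam %s provides the new BigQuery sink API.' % beam.__version__)
```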