From 8c632c208afc2c4ded4bb1b514338e6e51d36d00 Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Thu, 23 Jan 2020 18:12:26 -0500 Subject: [PATCH] Remove WRITE_TRUNCATE option when writing to BQ. --- docker/Dockerfile | 2 ++ .../transforms/sample_info_to_bigquery.py | 9 +++------ gcp_variant_transforms/transforms/variant_to_bigquery.py | 5 +---- gcp_variant_transforms/vcf_to_bq.py | 2 -- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index f7d9c4511..928fe6545 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -47,6 +47,8 @@ RUN apt-get update && apt-get install -y \ zlib1g-dev # Install dependencies. + + RUN python -m pip install --upgrade pip && \ python -m pip install --upgrade virtualenv && \ virtualenv /opt/gcp_variant_transforms/venv && \ diff --git a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py index fe63c71e1..6e7cc120b 100644 --- a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py +++ b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py @@ -73,14 +73,11 @@ def __init__(self, output_table_prefix, sample_name_encoding, append=False): def expand(self, pcoll): return (pcoll | 'ConvertSampleInfoToBigQueryTableRow' >> beam.ParDo( - ConvertSampleInfoToRow(self._sample_name_encoding)) - | 'WriteSampleInfoToBigQuery' >> beam.io.Write(beam.io.BigQuerySink( + ConvertSampleInfoToRow(self.sample_name_encoding)) + | 'WriteSampleInfoToBigQuery' >> beam.io.WriteToBigQuery( self._output_table, schema=self._schema, create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND - if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE), + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery.py b/gcp_variant_transforms/transforms/variant_to_bigquery.py index 2f17ba358..7d8e880c4 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery.py @@ -115,8 +115,5 @@ def expand(self, pcoll): schema=self._schema, create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND - if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE), + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 81202c307..45299e3c8 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -483,8 +483,6 @@ def run(argv=None): num_shards = 1 if known_args.output_table: -<<<<<<< HEAD -<<<<<<< HEAD schema_file = tempfile.mkstemp(prefix=known_args.output_table, suffix=_BQ_SCHEMA_FILE_SUFFIX)[1] schema = (