Skip to content

Commit

Permalink
Feature: configurable BigQuery offline store export parquet file size (#12)

Browse files Browse the repository at this point in the history

* Update BQ to_remote_storage

* Update sdk/python/feast/infra/offline_stores/bigquery.py

Co-authored-by: Daniel Salvador <[email protected]>

* Add check for BQ_EXPORT_PARQUET_FILE_SIZE_MB value

* Update check

* Raise exception when BQ_EXPORT_PARQUET_FILE_SIZE_MB is not a digit

---------

Co-authored-by: Daniel Salvador <[email protected]>
  • Loading branch information
KarolisKont and danielsalvador authored Nov 8, 2023
1 parent 091b7b1 commit affbc4d
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import os
import tempfile
import uuid
from datetime import date, datetime, timedelta
Expand Down Expand Up @@ -74,7 +75,8 @@ def get_http_client_info():

class BigQueryTableCreateDisposition(ConstrainedStr):
"""Custom constraint for table_create_disposition. To understand more, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition
"""

values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}

Expand Down Expand Up @@ -571,12 +573,37 @@ def to_remote_storage(self) -> List[str]:

table = self.to_bigquery()

parquet_file_size_mb = os.getenv("BQ_EXPORT_PARQUET_FILE_SIZE_MB")
if parquet_file_size_mb is not None:
if not parquet_file_size_mb.isdigit():
raise ValueError(
"The value for the BQ_EXPORT_PARQUET_FILE_SIZE_MB environment variable must "
"be a numeric digit, but it was set to: %s",
parquet_file_size_mb,
)

parquet_file_size_mb_int = int(parquet_file_size_mb)
if parquet_file_size_mb_int > 1000:
raise ValueError(
"The value for the BQ_EXPORT_PARQUET_FILE_SIZE_MB environment variable cannot "
"exceed 1000; however, it was set to: %s.",
parquet_file_size_mb_int,
)

table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
number_of_files = max(1, table_size_in_mb // parquet_file_size_mb_int)
destination_uris = [
f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
]
else:
destination_uris = [f"{self._gcs_path}/*.parquet"]

job_config = bigquery.job.ExtractJobConfig()
job_config.destination_format = "PARQUET"

extract_job = self.client.extract_table(
table,
destination_uris=[f"{self._gcs_path}/*.parquet"],
destination_uris=destination_uris,
location=self.config.offline_store.location,
job_config=job_config,
)
Expand Down

0 comments on commit affbc4d

Please sign in to comment.