
Refactor: drop ability to configure BigQuery offline store export parquet file size #17

Merged
4 commits merged on Nov 13, 2023
40 changes: 17 additions & 23 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -20,7 +20,7 @@
 import pandas as pd
 import pyarrow
 import pyarrow.parquet
-from pydantic import ConstrainedStr, Field, StrictStr, validator
+from pydantic import ConstrainedStr, StrictStr, validator
 from pydantic.typing import Literal
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

@@ -47,6 +47,7 @@
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
 from feast.usage import get_user_agent, log_exceptions_and_usage
+
 from .bigquery_source import (
     BigQueryLoggingDestination,
     BigQuerySource,
@@ -105,9 +106,6 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
     gcs_staging_location: Optional[str] = None
     """ (optional) GCS location used for offloading BigQuery results as parquet files."""

-    gcs_staging_file_size_mb: Optional[int] = Field(None, ge=1, le=1000)
-    """ (optional) Specify the staging file size in Megabytes. If it is not set, the BigQuery export function will determine the export file size automatically."""
-
     table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
     """ (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""
@@ -579,29 +577,19 @@ def to_remote_storage(self) -> List[str]:

         table = None
         try:
-            logger.info(f"Starting data export to '{self._gcs_path}'")
+            logger.info("Starting data export to '%s'", self._gcs_path)
             table = self.to_bigquery()
-            logger.info(f"Data exported to table '{table}'")
-
-            if self.config.offline_store.gcs_staging_file_size_mb is not None:
-                table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
-                number_of_files = max(
-                    1,
-                    int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
-                )
-                destination_uris = [
-                    f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
-                ]
-            else:
-                destination_uris = [f"{self._gcs_path}/*.parquet"]
+            logger.info("Data exported to table '%s'", table)

             job_config = bigquery.job.ExtractJobConfig()
             job_config.destination_format = "PARQUET"

-            logger.info(f"Starting data extraction from '{table}' to '{self._gcs_path}'")
+            logger.info(
+                "Starting data extraction from '%s' to '%s'", table, self._gcs_path
+            )
             extract_job = self.client.extract_table(
                 table,
-                destination_uris=destination_uris,
+                destination_uris=[f"{self._gcs_path}/*.parquet"],
                 location=self.config.offline_store.location,
                 job_config=job_config,
            )
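With the size-based sharding gone, the export relies entirely on BigQuery wildcard URIs: when a destination URI contains `*`, the extract job picks the shard count itself and replaces the wildcard with a zero-padded sequence number (which is also why the removed code formatted its explicit file names as `{n:0>12}`). A hedged sketch of that call in isolation; project, dataset, table, and bucket names are placeholders:

```python
# Sketch of a wildcard parquet export with google-cloud-bigquery.
# All resource names below are placeholders, not Feast's.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")

job_config = bigquery.job.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.PARQUET

# The "*" lets BigQuery choose how many shards to write; it substitutes
# a zero-padded sequence number (000000000000, 000000000001, ...).
extract_job = client.extract_table(
    "my-project.my_dataset.my_table",
    destination_uris=["gs://my-bucket/staging/*.parquet"],
    job_config=job_config,
)
extract_job.result()  # block until the export job finishes
```

Previously, when `gcs_staging_file_size_mb` was set, the code derived a file count from the table's `num_bytes` and passed an explicit list of numbered URIs; the wildcard form hands that decision back to BigQuery.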
@@ -610,10 +598,14 @@ def to_remote_storage(self) -> List[str]:
             bucket: str
             prefix: str
             if self.config.offline_store.billing_project_id:
-                storage_client = StorageClient(project=self.config.offline_store.project_id)
+                storage_client = StorageClient(
+                    project=self.config.offline_store.project_id
+                )
             else:
                 storage_client = StorageClient(project=self.client.project)
+            # fmt: off
             bucket, prefix = self._gcs_path[len("gs://"):].split("/", 1)
+            # fmt: on
             if prefix.startswith("/"):
                 prefix = prefix[1:]

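The `gs://` handling above is plain string slicing, and the added `# fmt: off` / `# fmt: on` comments presumably shield the slice expression from auto-formatting. A standalone sketch of the same parse followed by the blob listing this method performs next; the bucket, prefix, and project values are placeholders:

```python
# Sketch of the gs:// parsing and blob listing used above.
# Bucket, prefix, and project names are placeholders.
from google.cloud.storage import Client as StorageClient

gcs_path = "gs://my-bucket/staging/run-42"

# Same slicing as the diff: drop the scheme, split on the first "/".
bucket, prefix = gcs_path[len("gs://"):].split("/", 1)
if prefix.startswith("/"):
    prefix = prefix[1:]

storage_client = StorageClient(project="my-project")
blobs = storage_client.list_blobs(bucket, prefix=prefix)

# Rebuild fully qualified URIs for every exported file.
uris = [f"gs://{b.bucket.name}/{b.name}" for b in blobs]
print(uris)
```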
@@ -622,11 +614,13 @@ def to_remote_storage(self) -> List[str]:
             for b in blobs:
                 results.append(f"gs://{b.bucket.name}/{b.name}")

-            logger.info(f"Data extraction completed. Extracted to {len(results)} files")
+            logger.info(
+                "Data extraction completed. Extracted to %s files", len(results)
+            )
             return results
         finally:
             if table:
-                logger.info(f"Cleanup: Deleting temporary table '{table}'")
+                logger.info("Cleanup: Deleting temporary table '%s'", table)
                 self.client.delete_table(table=table, not_found_ok=True)
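The other thread running through this diff is the switch from f-string logging to %-style arguments. Both forms evaluate their arguments eagerly, but %-style defers the string interpolation until a handler actually emits the record, and log aggregators see one stable message template instead of many unique strings. A minimal sketch of the difference:

```python
# Minimal sketch: eager f-string logging vs. lazy %-style logging.
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

table = "tmp_feast_export_1234"

# Eager: the f-string is built even though INFO is disabled here.
logger.info(f"Cleanup: Deleting temporary table '{table}'")

# Lazy: interpolation happens in LogRecord.getMessage(), which never
# runs because the logger filters out the INFO call first.
logger.info("Cleanup: Deleting temporary table '%s'", table)
```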

