Skip to content

Commit

Permalink
use <schema>_<table> format to name tables in BQ
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Nov 28, 2024
1 parent d891093 commit 84d93f0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pg_replicate/src/clients/bigquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl BigQueryClient {
pub async fn stream_rows(
&mut self,
dataset_id: &str,
table_name: &str,
table_name: String,
table_descriptor: &TableDescriptor,
table_rows: &[TableRow],
) -> Result<(), BQError> {
Expand Down
14 changes: 9 additions & 5 deletions pg_replicate/src/pipeline/sinks/bigquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
clients::bigquery::BigQueryClient,
conversions::{cdc_event::CdcEvent, table_row::TableRow, Cell},
pipeline::PipelineResumptionState,
table::{ColumnSchema, TableId, TableSchema},
table::{ColumnSchema, TableId, TableName, TableSchema},
};

use super::{BatchSink, SinkError};
Expand Down Expand Up @@ -76,6 +76,10 @@ impl BigQueryBatchSink {
.get(&table_id)
.ok_or(BigQuerySinkError::MissingTableId(table_id))
}

fn table_name_in_bq(table_name: &TableName) -> String {
format!("{}_{}", table_name.schema, table_name.name)
}
}

#[async_trait]
Expand Down Expand Up @@ -138,10 +142,11 @@ impl BatchSink for BigQueryBatchSink {
table_schemas: HashMap<TableId, TableSchema>,
) -> Result<(), SinkError> {
for table_schema in table_schemas.values() {
let table_name = Self::table_name_in_bq(&table_schema.table_name);
self.client
.create_table_if_missing(
&self.dataset_id,
&table_schema.table_name.name,
&table_name,
&table_schema.column_schemas,
)
.await?;
Expand All @@ -158,8 +163,7 @@ impl BatchSink for BigQueryBatchSink {
table_id: TableId,
) -> Result<(), SinkError> {
let table_schema = self.get_table_schema(table_id)?;
//TODO: remove this clone
let table_name = &table_schema.table_name.name.clone();
let table_name = Self::table_name_in_bq(&table_schema.table_name);
let table_descriptor = table_schema.into();

for table_row in &mut table_rows {
Expand Down Expand Up @@ -221,7 +225,7 @@ impl BatchSink for BigQueryBatchSink {

for (table_id, table_rows) in table_name_to_table_rows {
let table_schema = self.get_table_schema(table_id)?;
let table_name = &table_schema.table_name.name.clone();
let table_name = Self::table_name_in_bq(&table_schema.table_name);
let table_descriptor = table_schema.into();
self.client
.stream_rows(&self.dataset_id, table_name, &table_descriptor, &table_rows)
Expand Down

0 comments on commit 84d93f0

Please sign in to comment.