Commit
fix(firestore-bigquery-export): implemented RC changes including logging keys
cabljac committed Nov 6, 2024
1 parent 211f91b commit 36b23af
Showing 12 changed files with 106 additions and 88 deletions.
8 changes: 8 additions & 0 deletions _emulator/.firebaserc
@@ -1,5 +1,13 @@
{
"projects": {
"default": "demo-test"
},
"targets": {},
"etags": {
"dev-extensions-testing": {
"extensionInstances": {
"firestore-bigquery-export": "02acbd8b443b9635716d52d65758a78db1e51140191caecaaf60d932d314a62a"
}
}
}
}
7 changes: 1 addition & 6 deletions _emulator/firebase.json
@@ -1,11 +1,6 @@
{
"extensions": {
"firestore-send-email": "../firestore-send-email",
"delete-user-data": "../delete-user-data",
"storage-resize-images": "../storage-resize-images",
"firestore-counter": "../firestore-counter",
"firestore-bigquery-export": "../firestore-bigquery-export",
"firestore-send-email-sendgrid": "../firestore-send-email"
"firestore-bigquery-export": "../firestore-bigquery-export"
},
"storage": {
"rules": "storage.rules"
13 changes: 0 additions & 13 deletions firestore-bigquery-export/extension.yaml
@@ -196,19 +196,6 @@ params:
default: posts
required: true

- param: LOG_FAILED_EXPORTS
label: Enable logging failed exports
description: >-
If enabled, the extension will log event exports that failed to enqueue to
Cloud Logging, to mitigate data loss.
type: select
options:
- label: Yes
value: yes
- label: No
value: no
default: yes

- param: WILDCARD_IDS
label: Enable Wildcard Column field with Parent Firestore Document IDs
description: >-
@@ -21,7 +21,6 @@ Object {
"instanceId": undefined,
"kmsKeyName": "test",
"location": "us-central1",
"logFailedExportData": false,
"maxDispatchesPerSecond": 10,
"maxEnqueueAttempts": 3,
"tableId": "my_table",
6 changes: 3 additions & 3 deletions firestore-bigquery-export/functions/__tests__/e2e.test.ts
@@ -3,8 +3,8 @@ import { BigQuery } from "@google-cloud/bigquery";

/** Set defaults */
const bqProjectId = process.env.BQ_PROJECT_ID || "dev-extensions-testing";
const datasetId = process.env.DATASET_ID || "firestore_export_e2e";
const tableId = process.env.TABLE_ID || "posts_raw_changelog";
const datasetId = process.env.DATASET_ID || "firestore_export";
const tableId = process.env.TABLE_ID || "bq_e2e_test_raw_changelog";

/** Init resources */
admin.initializeApp({ projectId: bqProjectId });
@@ -34,7 +34,7 @@ describe("e2e", () => {

/** Get the latest record from this table */
const [changeLogQuery] = await bq.createQueryJob({
query: `SELECT * FROM \`${bqProjectId}.${datasetId}.${tableId}\` ORDER BY timestamp DESC \ LIMIT 1`,
query: `SELECT * FROM \`${bqProjectId}.${datasetId}.${tableId}\` ORDER BY timestamp DESC LIMIT 1`,
});

const [rows] = await changeLogQuery.getQueryResults();
@@ -37,6 +37,7 @@ jest.mock("firebase-admin/functions", () => ({
}));

jest.mock("../src/logs", () => ({
...jest.requireActual("../src/logs"),
start: jest.fn(() =>
logger.log("Started execution of extension with configuration", config)
),
Binary file not shown.
15 changes: 8 additions & 7 deletions firestore-bigquery-export/functions/package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion firestore-bigquery-export/functions/package.json
@@ -13,7 +13,7 @@
"author": "Jan Wyszynski <[email protected]>",
"license": "Apache-2.0",
"dependencies": {
"@firebaseextensions/firestore-bigquery-change-tracker": "file:firebaseextensions-firestore-bigquery-change-tracker-1.1.37.tgz",
"@firebaseextensions/firestore-bigquery-change-tracker": "^1.1.38",
"@google-cloud/bigquery": "^7.6.0",
"@types/chai": "^4.1.6",
"@types/express-serve-static-core": "4.17.30",
1 change: 0 additions & 1 deletion firestore-bigquery-export/functions/src/config.ts
@@ -34,7 +34,6 @@ export function clustering(clusters: string | undefined) {
}

export default {
logFailedExportData: process.env.LOG_FAILED_EXPORTS === "yes",
bqProjectId: process.env.BIGQUERY_PROJECT_ID,
databaseId: "(default)",
collectionPath: process.env.COLLECTION_PATH,
111 changes: 55 additions & 56 deletions firestore-bigquery-export/functions/src/index.ts
@@ -22,13 +22,12 @@ import { getFunctions } from "firebase-admin/functions";
import {
ChangeType,
FirestoreBigQueryEventHistoryTracker,
FirestoreEventHistoryTracker,
FirestoreDocumentChangeEvent,
} from "@firebaseextensions/firestore-bigquery-change-tracker";

import * as logs from "./logs";
import * as events from "./events";
import { getChangeType, getDocumentId, resolveWildcardIds } from "./util";
import { backupToGCS } from "./cloud_storage_backups";
import { getChangeType, getDocumentId } from "./util";

// Configuration for the Firestore Event History Tracker.
const eventTrackerConfig = {
@@ -53,7 +52,7 @@ const eventTrackerConfig = {
};

// Initialize the Firestore Event History Tracker with the given configuration.
const eventTracker: FirestoreEventHistoryTracker =
const eventTracker: FirestoreBigQueryEventHistoryTracker =
new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig);

// Initialize logging.
@@ -74,6 +73,17 @@ export const syncBigQuery = functions.tasks
.taskQueue()
.onDispatch(
async ({ context, changeType, documentId, data, oldData }, ctx) => {
const documentName = context.resource.name;
const eventId = context.eventId;
const operation = changeType;

logs.logEventAction(
"Firestore event received by onDispatch trigger",
documentName,
eventId,
operation
);

try {
// Use the shared function to write the event to BigQuery
await recordEventToBigQuery(
@@ -103,24 +113,13 @@
logs.complete();
} catch (err) {
// Log error and throw it to handle in the calling function.
logs.error(true, "Failed to process syncBigQuery task", err, {
context,
changeType,
documentId,
data,
oldData,
});

// if (config.backupToGCS) {
// // Backup to Google Cloud Storage as a last resort.
// await backupToGCS(config.backupBucketName, config.backupDir, {
// changeType,
// documentId,
// serializedData: data,
// serializedOldData: oldData,
// context,
// });
// }
logs.logFailedEventAction(
"Failed to write event to BigQuery from onDispatch handler",
documentName,
eventId,
operation,
err as Error
);

throw err;
}
@@ -150,6 +149,17 @@ export const fsexportbigquery = functions.firestore
const oldData =
isCreated || config.excludeOldData ? undefined : change.before?.data();

const documentName = context.resource.name;
const eventId = context.eventId;
const operation = changeType;

logs.logEventAction(
"Firestore event received by onWrite trigger",
documentName,
eventId,
operation
);

let serializedData: any;
let serializedOldData: any;

@@ -159,7 +169,13 @@
serializedOldData = eventTracker.serializeData(oldData);
} catch (err) {
// Log serialization error and throw it.
logs.error(true, "Failed to serialize data", err, { data, oldData });
logs.logFailedEventAction(
"Failed to serialize data",
documentName,
eventId,
operation,
err as Error
);
throw err;
}

@@ -217,18 +233,20 @@
* @param context - The event context from Firestore.
*/
async function recordEventToBigQuery(
changeType: string,
changeType: ChangeType,
documentId: string,
serializedData: any,
serializedOldData: any,
context: functions.EventContext
) {
const event = {
const event: FirestoreDocumentChangeEvent = {
timestamp: context.timestamp, // Cloud Firestore commit timestamp
operation: changeType, // The type of operation performed
documentName: context.resource.name, // The document name
documentId, // The document ID
pathParams: config.wildcardIds ? context.params : null, // Path parameters, if any
pathParams: (config.wildcardIds ? context.params : null) as
| FirestoreDocumentChangeEvent["pathParams"]
| null, // Path parameters, if any
eventId: context.eventId, // The event ID from Firestore
data: serializedData, // Serialized new data
oldData: serializedOldData, // Serialized old data
@@ -251,7 +269,7 @@ async function attemptToEnqueue(
async function attemptToEnqueue(
err: Error,
context: functions.EventContext,
changeType: string,
changeType: ChangeType,
documentId: string,
serializedData: any,
serializedOldData: any
@@ -295,40 +313,21 @@ async function attemptToEnqueue(
}
} catch (enqueueErr) {
// Prepare the event object for error logging.
const event = {
timestamp: context.timestamp,
operation: changeType,
documentName: context.resource.name,
documentId,
pathParams: config.wildcardIds ? context.params : null,
eventId: context.eventId,
data: serializedData,
oldData: serializedOldData,
};

// Record the error event.
await events.recordErrorEvent(enqueueErr as Error);

// Log the error if it has not been logged already.
if (!enqueueErr.logged && config.logFailedExportData) {
logs.error(
true,
"Failed to enqueue task to syncBigQuery",
enqueueErr,
event
);
}
const documentName = context.resource.name;
const eventId = context.eventId;
const operation = changeType;

// if (config.backupToGCS) {
// // Backup to Google Cloud Storage as a last resort.
// await backupToGCS(config.backupBucketName, config.backupDir, {
// changeType,
// documentId,
// serializedData,
// serializedOldData,
// context,
// });
// }
logs.logFailedEventAction(
"Failed to enqueue event to Cloud Tasks from onWrite handler",
documentName,
eventId,
operation,
enqueueErr as Error
);
}
}

29 changes: 29 additions & 0 deletions firestore-bigquery-export/functions/src/logs.ts
@@ -15,6 +15,7 @@
*/
import { logger } from "firebase-functions";
import config from "./config";
import { ChangeType } from "@firebaseextensions/firestore-bigquery-change-tracker";

export const arrayFieldInvalid = (fieldName: string) => {
logger.warn(`Array field '${fieldName}' does not contain an array, skipping`);
@@ -182,3 +183,31 @@ export const timestampMissingValue = (fieldName: string) => {
`Missing value for timestamp field: ${fieldName}, using default timestamp instead.`
);
};

export const logEventAction = (
action: string,
document_name: string,
event_id: string,
operation: ChangeType
) => {
logger.info(action, {
document_name,
event_id,
operation,
});
};

export const logFailedEventAction = (
action: string,
document_name: string,
event_id: string,
operation: ChangeType,
error: Error
) => {
logger.error(action, {
document_name,
event_id,
operation,
error,
});
};
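
As a usage sketch (not part of this commit's diff), the two new helpers are meant to be called from the extension's trigger handlers with the document name, event ID, and change type pulled from the Firestore event context, as the index.ts changes above do. The handler name, document path, and literal ChangeType value below are hypothetical; the real handlers derive the operation with getChangeType(change).

import * as functions from "firebase-functions";
import { ChangeType } from "@firebaseextensions/firestore-bigquery-change-tracker";
import * as logs from "./logs";

// Hypothetical onWrite handler illustrating where the structured logging keys are emitted.
export const exampleExportHandler = functions.firestore
  .document("posts/{documentId}")
  .onWrite(async (change, context) => {
    const documentName = context.resource.name; // full Firestore document path
    const eventId = context.eventId; // unique ID for this Firestore event
    const operation = ChangeType.CREATE; // hypothetical; real code uses getChangeType(change)

    // Logged for every received event (logger.info with a structured payload).
    logs.logEventAction(
      "Firestore event received by onWrite trigger",
      documentName,
      eventId,
      operation
    );

    try {
      // ... serialize the change and write it to BigQuery ...
    } catch (err) {
      // Logged when the export fails (logger.error), carrying the same keys plus the error.
      logs.logFailedEventAction(
        "Failed to write event to BigQuery from onWrite handler",
        documentName,
        eventId,
        operation,
        err as Error
      );
      throw err;
    }
  });

Because the keys are passed as a structured payload rather than interpolated into the message, document_name, event_id, and operation should surface as individual fields on the resulting Cloud Logging entries.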
