diff --git a/firestore-bigquery-export/CHANGELOG.md b/firestore-bigquery-export/CHANGELOG.md index 501419c4a..06946ab46 100644 --- a/firestore-bigquery-export/CHANGELOG.md +++ b/firestore-bigquery-export/CHANGELOG.md @@ -1,3 +1,7 @@ +## Version 0.1.55 + +feat - log failed queued tasks + ## Version 0.1.54 fixed - bump changetracker and fix more vulnerabilities diff --git a/firestore-bigquery-export/README.md b/firestore-bigquery-export/README.md index 22048b29b..932e71881 100644 --- a/firestore-bigquery-export/README.md +++ b/firestore-bigquery-export/README.md @@ -126,6 +126,8 @@ To install an extension, your project must be on the [Blaze (pay as you go) plan * Collection path: What is the path of the collection that you would like to export? You may use `{wildcard}` notation to match a subcollection of all documents in a collection (for example: `chatrooms/{chatid}/posts`). Parent Firestore Document IDs from `{wildcards}` can be returned in `path_params` as a JSON formatted string. +* Enable logging failed exports: If enabled, the extension will log event exports that failed to enqueue to Cloud Logging, to mitigate data loss. + * Enable Wildcard Column field with Parent Firestore Document IDs: If enabled, creates a column containing a JSON object of all wildcard ids from a documents path. * Dataset ID: What ID would you like to use for your BigQuery dataset? This extension will create the dataset, if it doesn't already exist. diff --git a/firestore-bigquery-export/extension.yaml b/firestore-bigquery-export/extension.yaml index 00bc9bd85..c8045fe97 100644 --- a/firestore-bigquery-export/extension.yaml +++ b/firestore-bigquery-export/extension.yaml @@ -13,7 +13,7 @@ # limitations under the License. name: firestore-bigquery-export -version: 0.1.54 +version: 0.1.55 specVersion: v1beta displayName: Stream Firestore to BigQuery @@ -206,6 +206,19 @@ params: default: posts required: true + - param: LOG_FAILED_EXPORTS + label: Enable logging failed exports + description: >- + If enabled, the extension will log event exports that failed to enqueue to + Cloud Logging, to mitigate data loss. + type: select + options: + - label: Yes + value: yes + - label: No + value: no + required: true + - param: WILDCARD_IDS label: Enable Wildcard Column field with Parent Firestore Document IDs description: >- diff --git a/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap b/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap index 5ceb5035f..d5a48cdab 100644 --- a/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap +++ b/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap @@ -20,6 +20,7 @@ Object { "instanceId": undefined, "kmsKeyName": "test", "location": "us-central1", + "logFailedExportData": false, "maxDispatchesPerSecond": 10, "tableId": "my_table", "timePartitioning": null, @@ -74,4 +75,4 @@ Object { ", "validationRegex": "^[a-zA-Z0-9_]+$", } -`; \ No newline at end of file +`; diff --git a/firestore-bigquery-export/functions/src/config.ts b/firestore-bigquery-export/functions/src/config.ts index 113058c6f..3adb5d05d 100644 --- a/firestore-bigquery-export/functions/src/config.ts +++ b/firestore-bigquery-export/functions/src/config.ts @@ -32,6 +32,7 @@ export function clustering(clusters: string | undefined) { } export default { + logFailedExportData: process.env.LOG_FAILED_EXPORTS === "yes", bqProjectId: process.env.BIGQUERY_PROJECT_ID, databaseId: "(default)", collectionPath: process.env.COLLECTION_PATH, diff --git a/firestore-bigquery-export/functions/src/index.ts b/firestore-bigquery-export/functions/src/index.ts index abb2d3486..89b7cd99a 100644 --- a/firestore-bigquery-export/functions/src/index.ts +++ b/firestore-bigquery-export/functions/src/index.ts @@ -32,25 +32,27 @@ import * as logs from "./logs"; import * as events from "./events"; import { getChangeType, getDocumentId, resolveWildcardIds } from "./util"; +const eventTrackerConfig = { + tableId: config.tableId, + datasetId: config.datasetId, + datasetLocation: config.datasetLocation, + backupTableId: config.backupCollectionId, + transformFunction: config.transformFunction, + timePartitioning: config.timePartitioning, + timePartitioningField: config.timePartitioningField, + timePartitioningFieldType: config.timePartitioningFieldType, + timePartitioningFirestoreField: config.timePartitioningFirestoreField, + databaseId: config.databaseId, + clustering: config.clustering, + wildcardIds: config.wildcardIds, + bqProjectId: config.bqProjectId, + useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax, + skipInit: true, + kmsKeyName: config.kmsKeyName, +}; + const eventTracker: FirestoreEventHistoryTracker = - new FirestoreBigQueryEventHistoryTracker({ - tableId: config.tableId, - datasetId: config.datasetId, - datasetLocation: config.datasetLocation, - backupTableId: config.backupCollectionId, - transformFunction: config.transformFunction, - timePartitioning: config.timePartitioning, - timePartitioningField: config.timePartitioningField, - timePartitioningFieldType: config.timePartitioningFieldType, - timePartitioningFirestoreField: config.timePartitioningFirestoreField, - databaseId: config.databaseId, - clustering: config.clustering, - wildcardIds: config.wildcardIds, - bqProjectId: config.bqProjectId, - useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax, - skipInit: true, - kmsKeyName: config.kmsKeyName, - }); + new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig); logs.init(); @@ -97,60 +99,81 @@ export const fsexportbigquery = functions .document(config.collectionPath) .onWrite(async (change, context) => { logs.start(); - try { - const changeType = getChangeType(change); - const documentId = getDocumentId(change); + const changeType = getChangeType(change); + const documentId = getDocumentId(change); + + const isCreated = changeType === ChangeType.CREATE; + const isDeleted = changeType === ChangeType.DELETE; - const isCreated = changeType === ChangeType.CREATE; - const isDeleted = changeType === ChangeType.DELETE; + const data = isDeleted ? undefined : change.after?.data(); + const oldData = + isCreated || config.excludeOldData ? undefined : change.before?.data(); - const data = isDeleted ? undefined : change.after.data(); - const oldData = - isCreated || config.excludeOldData ? undefined : change.before.data(); + /** + * Serialize early before queueing in cloud task + * Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum + */ + let serializedData: any; + let serializedOldData: any; + try { + serializedData = eventTracker.serializeData(data); + serializedOldData = eventTracker.serializeData(oldData); + } catch (err) { + logs.error(false, "Failed to serialize data", err, null, null); + throw err; + } + + try { await events.recordStartEvent({ documentId, changeType, - before: { - data: change.before.data(), - }, - after: { - data: change.after.data(), - }, + before: { data: change.before.data() }, + after: { data: change.after.data() }, context: context.resource, }); + } catch (err) { + logs.error(false, "Failed to record start event", err, null, null); + throw err; + } + try { const queue = getFunctions().taskQueue( `locations/${config.location}/functions/syncBigQuery`, config.instanceId ); - /** - * enqueue data cannot currently handle documentdata - * Serialize early before queueing in clopud task - * Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum - */ - const seializedData = eventTracker.serializeData(data); - const serializedOldData = eventTracker.serializeData(oldData); - await queue.enqueue({ context, changeType, documentId, - data: seializedData, + data: serializedData, oldData: serializedOldData, }); } catch (err) { - await events.recordErrorEvent(err as Error); - logs.error(err); - const eventAgeMs = Date.now() - Date.parse(context.timestamp); - const eventMaxAgeMs = 10000; + const event = { + timestamp: context.timestamp, // This is a Cloud Firestore commit timestamp with microsecond precision. + operation: changeType, + documentName: context.resource.name, + documentId: documentId, + pathParams: config.wildcardIds ? context.params : null, + eventId: context.eventId, + data: serializedData, + oldData: serializedOldData, + }; - if (eventAgeMs > eventMaxAgeMs) { - return; + await events.recordErrorEvent(err as Error); + // Only log the error once here + if (!err.logged) { + logs.error( + config.logFailedExportData, + "Failed to enqueue task to syncBigQuery", + err, + event, + eventTrackerConfig + ); } - - throw err; + return; } logs.complete(); diff --git a/firestore-bigquery-export/functions/src/logs.ts b/firestore-bigquery-export/functions/src/logs.ts index ae0a931ea..c312cecdf 100644 --- a/firestore-bigquery-export/functions/src/logs.ts +++ b/firestore-bigquery-export/functions/src/logs.ts @@ -149,8 +149,24 @@ export const dataTypeInvalid = ( ); }; -export const error = (err: Error) => { - logger.error("Error when mirroring data to BigQuery", err); +export const error = ( + includeEvent: boolean, + message: string, + err: Error, + event: any, + eventTrackerConfig: any +) => { + if (includeEvent) { + logger.error(`Error when mirroring data to BigQuery: ${message}`, { + error: err, + event, + eventTrackerConfig, + }); + } else { + logger.error(`Error when mirroring data to BigQuery: ${message}`, { + error: err, + }); + } }; export const init = () => {