Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release: firestore-bigquery-export #2184

Merged
merged 4 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions firestore-bigquery-export/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Version 0.1.55

feat - log failed queued tasks

## Version 0.1.54

fixed - bump changetracker and fix more vulnerabilities
Expand Down
2 changes: 2 additions & 0 deletions firestore-bigquery-export/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ To install an extension, your project must be on the [Blaze (pay as you go) plan

* Collection path: What is the path of the collection that you would like to export? You may use `{wildcard}` notation to match a subcollection of all documents in a collection (for example: `chatrooms/{chatid}/posts`). Parent Firestore Document IDs from `{wildcards}` can be returned in `path_params` as a JSON formatted string.

* Enable logging failed exports: If enabled, event exports that fail to enqueue are logged to Cloud Logging, to mitigate data loss.

* Enable Wildcard Column field with Parent Firestore Document IDs: If enabled, creates a column containing a JSON object of all wildcard ids from a document's path.

* Dataset ID: What ID would you like to use for your BigQuery dataset? This extension will create the dataset, if it doesn't already exist.
Expand Down
15 changes: 14 additions & 1 deletion firestore-bigquery-export/extension.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

name: firestore-bigquery-export
version: 0.1.54
version: 0.1.55
specVersion: v1beta

displayName: Stream Firestore to BigQuery
Expand Down Expand Up @@ -206,6 +206,19 @@ params:
default: posts
required: true

- param: LOG_FAILED_EXPORTS
label: Enable logging failed exports
description: >-
If enabled, event exports that fail to enqueue are logged to Cloud Logging,
to mitigate data loss.
type: select
options:
- label: Yes
value: yes
- label: No
value: no
required: true

- param: WILDCARD_IDS
label: Enable Wildcard Column field with Parent Firestore Document IDs
description: >-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Object {
"instanceId": undefined,
"kmsKeyName": "test",
"location": "us-central1",
"logFailedExportData": false,
"maxDispatchesPerSecond": 10,
"tableId": "my_table",
"timePartitioning": null,
Expand Down Expand Up @@ -74,4 +75,4 @@ Object {
",
"validationRegex": "^[a-zA-Z0-9_]+$",
}
`;
`;
1 change: 1 addition & 0 deletions firestore-bigquery-export/functions/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export function clustering(clusters: string | undefined) {
}

export default {
logFailedExportData: process.env.LOG_FAILED_EXPORTS === "yes",
bqProjectId: process.env.BIGQUERY_PROJECT_ID,
databaseId: "(default)",
collectionPath: process.env.COLLECTION_PATH,
Expand Down
121 changes: 72 additions & 49 deletions firestore-bigquery-export/functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,27 @@ import * as logs from "./logs";
import * as events from "./events";
import { getChangeType, getDocumentId, resolveWildcardIds } from "./util";

/**
 * Options for the BigQuery event history tracker, assembled from the
 * extension's `config` module.
 *
 * Kept as a named constant rather than inlined into the tracker's
 * constructor call so that the same object can also be attached to the
 * error log written when enqueueing a sync task fails.
 */
const eventTrackerConfig = {
tableId: config.tableId,
datasetId: config.datasetId,
datasetLocation: config.datasetLocation,
// NOTE(review): populated from `backupCollectionId`, not a table id —
// confirm the naming mismatch is intended.
backupTableId: config.backupCollectionId,
transformFunction: config.transformFunction,
timePartitioning: config.timePartitioning,
timePartitioningField: config.timePartitioningField,
timePartitioningFieldType: config.timePartitioningFieldType,
timePartitioningFirestoreField: config.timePartitioningFirestoreField,
databaseId: config.databaseId,
clustering: config.clustering,
wildcardIds: config.wildcardIds,
bqProjectId: config.bqProjectId,
useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax,
// NOTE(review): presumably tells the tracker to skip its own initialization
// when constructed here — confirm against the tracker implementation.
skipInit: true,
kmsKeyName: config.kmsKeyName,
};

const eventTracker: FirestoreEventHistoryTracker =
new FirestoreBigQueryEventHistoryTracker({
tableId: config.tableId,
datasetId: config.datasetId,
datasetLocation: config.datasetLocation,
backupTableId: config.backupCollectionId,
transformFunction: config.transformFunction,
timePartitioning: config.timePartitioning,
timePartitioningField: config.timePartitioningField,
timePartitioningFieldType: config.timePartitioningFieldType,
timePartitioningFirestoreField: config.timePartitioningFirestoreField,
databaseId: config.databaseId,
clustering: config.clustering,
wildcardIds: config.wildcardIds,
bqProjectId: config.bqProjectId,
useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax,
skipInit: true,
kmsKeyName: config.kmsKeyName,
});
new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig);

logs.init();

Expand Down Expand Up @@ -97,60 +99,81 @@ export const fsexportbigquery = functions
.document(config.collectionPath)
.onWrite(async (change, context) => {
logs.start();
try {
const changeType = getChangeType(change);
const documentId = getDocumentId(change);
const changeType = getChangeType(change);
const documentId = getDocumentId(change);

const isCreated = changeType === ChangeType.CREATE;
const isDeleted = changeType === ChangeType.DELETE;

const isCreated = changeType === ChangeType.CREATE;
const isDeleted = changeType === ChangeType.DELETE;
const data = isDeleted ? undefined : change.after?.data();
const oldData =
isCreated || config.excludeOldData ? undefined : change.before?.data();

const data = isDeleted ? undefined : change.after.data();
const oldData =
isCreated || config.excludeOldData ? undefined : change.before.data();
/**
* Serialize early before queueing in cloud task
* Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum
*/
let serializedData: any;
let serializedOldData: any;

try {
serializedData = eventTracker.serializeData(data);
serializedOldData = eventTracker.serializeData(oldData);
} catch (err) {
logs.error(false, "Failed to serialize data", err, null, null);
throw err;
}

try {
await events.recordStartEvent({
documentId,
changeType,
before: {
data: change.before.data(),
},
after: {
data: change.after.data(),
},
before: { data: change.before.data() },
after: { data: change.after.data() },
context: context.resource,
});
} catch (err) {
logs.error(false, "Failed to record start event", err, null, null);
throw err;
}

try {
const queue = getFunctions().taskQueue(
`locations/${config.location}/functions/syncBigQuery`,
config.instanceId
);

/**
* enqueue data cannot currently handle documentdata
* Serialize early before queueing in clopud task
* Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum
*/
const seializedData = eventTracker.serializeData(data);
const serializedOldData = eventTracker.serializeData(oldData);

await queue.enqueue({
context,
changeType,
documentId,
data: seializedData,
data: serializedData,
oldData: serializedOldData,
});
} catch (err) {
await events.recordErrorEvent(err as Error);
logs.error(err);
const eventAgeMs = Date.now() - Date.parse(context.timestamp);
const eventMaxAgeMs = 10000;
const event = {
timestamp: context.timestamp, // This is a Cloud Firestore commit timestamp with microsecond precision.
operation: changeType,
documentName: context.resource.name,
documentId: documentId,
pathParams: config.wildcardIds ? context.params : null,
eventId: context.eventId,
data: serializedData,
oldData: serializedOldData,
};

if (eventAgeMs > eventMaxAgeMs) {
return;
await events.recordErrorEvent(err as Error);
// Only log the error once here
if (!err.logged) {
logs.error(
config.logFailedExportData,
"Failed to enqueue task to syncBigQuery",
err,
event,
eventTrackerConfig
);
}

throw err;
return;
}

logs.complete();
Expand Down
20 changes: 18 additions & 2 deletions firestore-bigquery-export/functions/src/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,24 @@ export const dataTypeInvalid = (
);
};

export const error = (err: Error) => {
logger.error("Error when mirroring data to BigQuery", err);
/**
 * Writes an error-level log entry for a failure while mirroring data to
 * BigQuery.
 *
 * The entry's message is the fixed prefix
 * "Error when mirroring data to BigQuery: " followed by `message`. The
 * structured payload always carries the error; the event and tracker
 * configuration are attached only when `includeEvent` is true.
 *
 * @param includeEvent - When true, `event` and `eventTrackerConfig` are
 *   included in the logged payload; when false only `err` is logged.
 * @param message - Short description of what failed, appended to the prefix.
 * @param err - The error being reported.
 * @param event - The change event that could not be processed; ignored
 *   unless `includeEvent` is true.
 * @param eventTrackerConfig - The tracker options in effect; ignored unless
 *   `includeEvent` is true.
 */
export const error = (
  includeEvent: boolean,
  message: string,
  err: Error,
  event: any,
  eventTrackerConfig: any
) => {
  const summary = `Error when mirroring data to BigQuery: ${message}`;

  // Build the payload once; only the opt-in path carries the event data.
  const details = includeEvent
    ? { error: err, event, eventTrackerConfig }
    : { error: err };

  logger.error(summary, details);
};

export const init = () => {
Expand Down
Loading