chore(firestore-bigquery-export): temporarily disable GCS
cabljac committed Oct 1, 2024
1 parent 5bdd28c commit abaff8c
Showing 14 changed files with 262 additions and 63 deletions.
4 changes: 0 additions & 4 deletions firestore-bigquery-export/README.md
@@ -162,10 +162,6 @@ essential for the script to insert data into an already partitioned table.)

* Maximum number of enqueue attempts: This parameter will set the maximum number of attempts to enqueue a document to cloud tasks for export to BigQuery. If the maximum number of attempts is reached, the failed export will be handled according to the `LOG_FAILED_EXPORTS` parameter.

* Backup to GCS: If enabled, failed BigQuery updates will be written to a GCS bucket.

* Backup GCS Bucket Name: This (optional) parameter will allow you to specify a GCS bucket for which failed BigQuery updates will be written to, if this feature is enabled.



**Cloud Functions:**
38 changes: 19 additions & 19 deletions firestore-bigquery-export/extension.yaml
@@ -425,25 +425,25 @@ params:
validationErrorMessage: Please select an integer between 1 and 10
default: 3

- param: BACKUP_TO_GCS
label: Backup to GCS
description: >-
If enabled, failed BigQuery updates will be written to a GCS bucket.
type: select
options:
- label: Yes
value: yes
- label: No
value: no
default: no
required: true

- param: BACKUP_GCS_BUCKET
label: Backup GCS Bucket Name
description: >-
This (optional) parameter will allow you to specify a GCS bucket for which
failed BigQuery updates will be written to, if this feature is enabled.
type: string
# - param: BACKUP_TO_GCS
# label: Backup to GCS
# description: >-
# If enabled, failed BigQuery updates will be written to a GCS bucket.
# type: select
# options:
# - label: Yes
# value: yes
# - label: No
# value: no
# default: no
# required: true

# - param: BACKUP_GCS_BUCKET
# label: Backup GCS Bucket Name
# description: >-
# This (optional) parameter will allow you to specify a GCS bucket for which
# failed BigQuery updates will be written to, if this feature is enabled.
# type: string

events:
- type: firebase.extensions.firestore-counter.v1.onStart
@@ -5,6 +5,7 @@ Object {
"backupBucketName": "undefined.appspot.com",
"backupCollectionId": undefined,
"backupDir": "_firestore-bigquery-export",
"backupToGCS": false,
"bqProjectId": undefined,
"clustering": Array [
"data",
Binary file not shown.
5 changes: 3 additions & 2 deletions firestore-bigquery-export/functions/package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion firestore-bigquery-export/functions/package.json
@@ -13,7 +13,7 @@
"author": "Jan Wyszynski <[email protected]>",
"license": "Apache-2.0",
"dependencies": {
"@firebaseextensions/firestore-bigquery-change-tracker": "^1.1.37",
"@firebaseextensions/firestore-bigquery-change-tracker": "file:firebaseextensions-firestore-bigquery-change-tracker-1.1.37.tgz",
"@google-cloud/bigquery": "^7.6.0",
"@types/chai": "^4.1.6",
"@types/express-serve-static-core": "4.17.30",
16 changes: 8 additions & 8 deletions firestore-bigquery-export/functions/src/cloud_storage_backups.ts
@@ -21,8 +21,8 @@ import * as path from "path";
import * as fs from "fs";
import { promisify } from "util";

// Promisify the writeFile function to use async/await
const writeFileAsync = promisify(fs.writeFile);
// TODO: we dont need to promisify in node 18+
const writeFile = promisify(fs.writeFile);

// Initialize Google Cloud Storage client
const storage = new Storage();
@@ -51,11 +51,8 @@ export async function backupToGCS(
context: functions.EventContext;
}
) {
// Create a timestamp for the backup file
const timestamp = new Date().toISOString();

// Define the filename using documentId and timestamp to ensure uniqueness
const fileName = `${dirName}/${documentId}_${timestamp}.csv`;
const fileName = `${dirName}/${documentId}_${context.eventId}.csv`;

// Create a CSV string from the event data
const csvData = `
@@ -68,8 +65,11 @@ timestamp,event_id,document_name,operation,data,old_data,document_id

try {
// Write the CSV data to a temporary local file
const tempFilePath = path.join("/tmp", `${documentId}_${timestamp}.csv`);
await writeFileAsync(tempFilePath, csvData, "utf8");
const tempFilePath = path.join(
"/tmp",
`${documentId}_${context.eventId}.csv`
);
await writeFile(tempFilePath, csvData, "utf8");

// Upload the file to Google Cloud Storage
await storage.bucket(bucketName).upload(tempFilePath, {
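The TODO above notes that `promisify` is unnecessary on Node 18+, where the promise-based fs API can be imported directly. A minimal sketch of that alternative, reusing the same `/tmp` and `${documentId}_${context.eventId}` naming seen in this file (the helper name is illustrative, not part of the commit):

import { writeFile } from "fs/promises";
import * as path from "path";

// Write the CSV payload to a temporary local file before uploading it to GCS.
async function writeTempCsv(
  documentId: string,
  eventId: string,
  csvData: string
): Promise<string> {
  const tempFilePath = path.join("/tmp", `${documentId}_${eventId}.csv`);
  await writeFile(tempFilePath, csvData, "utf8");
  return tempFilePath;
}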
4 changes: 3 additions & 1 deletion firestore-bigquery-export/functions/src/config.ts
@@ -63,7 +63,9 @@ export default {
process.env.MAX_DISPATCHES_PER_SECOND || "10"
),
kmsKeyName: process.env.KMS_KEY_NAME,
maxEnqueueAttempts: parseInt(process.env.MAX_ENQUEUE_ATTEMPTS || "3"),
maxEnqueueAttempts: isNaN(parseInt(process.env.MAX_ENQUEUE_ATTEMPTS))
? 3
: parseInt(process.env.MAX_ENQUEUE_ATTEMPTS),
// backup bucket defaults to default firebase cloud storage bucket
backupToGCS: process.env.BACKUP_TO_GCS === "yes" ? true : false,
backupBucketName:
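A note on the `maxEnqueueAttempts` change above: `parseInt(process.env.MAX_ENQUEUE_ATTEMPTS || "3")` only falls back to 3 when the variable is unset or empty, so a non-numeric value still produces `NaN`. The new form also falls back when parsing fails. A small sketch of the difference (the helper names are made up for this illustration):

function parseAttemptsOld(raw: string | undefined): number {
  // Falls back only when raw is undefined or "": "abc" still yields NaN.
  return parseInt(raw || "3");
}

function parseAttemptsNew(raw: string | undefined): number {
  // Falls back whenever parsing fails: undefined, "", and "abc" all yield 3.
  const parsed = parseInt(raw ?? "");
  return isNaN(parsed) ? 3 : parsed;
}

// parseAttemptsOld("abc") -> NaN; parseAttemptsNew("abc") -> 3
// parseAttemptsOld(undefined) -> 3; parseAttemptsNew(undefined) -> 3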
55 changes: 29 additions & 26 deletions firestore-bigquery-export/functions/src/index.ts
@@ -111,16 +111,16 @@ export const syncBigQuery = functions.tasks
oldData,
});

if (config.backupToGCS) {
// Backup to Google Cloud Storage as a last resort.
await backupToGCS(config.backupBucketName, config.backupDir, {
changeType,
documentId,
serializedData: data,
serializedOldData: oldData,
context,
});
}
// if (config.backupToGCS) {
// // Backup to Google Cloud Storage as a last resort.
// await backupToGCS(config.backupBucketName, config.backupDir, {
// changeType,
// documentId,
// serializedData: data,
// serializedOldData: oldData,
// context,
// });
// }

throw err;
}
@@ -130,9 +130,8 @@
/**
* Cloud Function triggered on Firestore document changes to export data to BigQuery.
*/
export const fsexportbigquery = functions
.runWith({ failurePolicy: true })
.firestore.database(config.databaseId)
export const fsexportbigquery = functions.firestore
.database(config.databaseId)
.document(config.collectionPath)
.onWrite(async (change, context) => {
// Start logging the function execution.
@@ -189,8 +188,12 @@ export const fsexportbigquery = functions
context
);
} catch (err) {
functions.logger.warn(
"Failed to write event to BigQuery Immediately. Will attempt to Enqueue to Cloud Tasks.",
err
);
// Handle enqueue errors with retries and backup to GCS.
await handleEnqueueError(
await attemptToEnqueue(
err,
context,
changeType,
@@ -232,7 +235,7 @@ async function recordEventToBigQuery(
};

// Record the event in the Firestore Event History Tracker and BigQuery.
eventTracker.record([event]);
await eventTracker.record([event]);
}

/**
@@ -245,7 +248,7 @@ async function recordEventToBigQuery(
* @param serializedData - The serialized new data of the document.
* @param serializedOldData - The serialized old data of the document.
*/
async function handleEnqueueError(
async function attemptToEnqueue(
err: Error,
context: functions.EventContext,
changeType: string,
@@ -316,16 +319,16 @@ async function handleEnqueueError(
);
}

if (config.backupToGCS) {
// Backup to Google Cloud Storage as a last resort.
await backupToGCS(config.backupBucketName, config.backupDir, {
changeType,
documentId,
serializedData,
serializedOldData,
context,
});
}
// if (config.backupToGCS) {
// // Backup to Google Cloud Storage as a last resort.
// await backupToGCS(config.backupBucketName, config.backupDir, {
// changeType,
// documentId,
// serializedData,
// serializedOldData,
// context,
// });
// }
}
}

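Taken together, the index.ts changes leave this write path: the BigQuery write is awaited, a failure is logged as a warning, and the renamed `attemptToEnqueue` handles the Cloud Tasks retries, while the GCS fallback stays commented out. A condensed, self-contained sketch of that flow (the stubs stand in for the real implementations in the diff above and are not the commit's code):

// Placeholder for the awaited eventTracker.record([event]) call.
async function recordEventToBigQuery(payload: object): Promise<void> {}

// Placeholder for the Cloud Tasks retry loop (up to config.maxEnqueueAttempts).
async function attemptToEnqueue(err: Error, payload: object): Promise<void> {}

export async function exportChange(payload: object): Promise<void> {
  try {
    await recordEventToBigQuery(payload);
  } catch (err) {
    // functions.logger.warn in the real handler.
    console.warn(
      "Failed to write event to BigQuery immediately. Will attempt to enqueue to Cloud Tasks.",
      err
    );
    await attemptToEnqueue(err as Error, payload);
    // The GCS backup fallback is commented out while BACKUP_TO_GCS is disabled.
  }
}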
33 changes: 33 additions & 0 deletions firestore-bigquery-export/functions/stress_test/count.js
@@ -0,0 +1,33 @@
const admin = require("firebase-admin");

// Initialize Firebase Admin with your credentials
// Make sure you've already set up your Firebase Admin SDK
admin.initializeApp({
projectId: "vertex-testing-1efc3",
});

const firestore = admin.firestore();

async function countDocuments(collectionPath) {
try {
const collectionRef = firestore.collection(collectionPath);

// Perform an aggregate query to count the documents
const snapshot = await collectionRef.count().get();

// Access the count from the snapshot
const docCount = snapshot.data().count;

console.log(
`Number of documents in collection '${collectionPath}':`,
docCount
);
return docCount;
} catch (error) {
console.error("Error counting documents:", error);
throw error;
}
}

// Call the function and pass the collection path
countDocuments("posts_2");
104 changes: 104 additions & 0 deletions firestore-bigquery-export/functions/stress_test/main.js
@@ -0,0 +1,104 @@
const { Worker } = require("worker_threads");
const { performance } = require("perf_hooks");
const path = require("path");

const totalDocs = 10000000; // Total number of documents to write
const maxThreads = 20; // Maximum number of worker threads
const batchSize = 500; // Documents per batch
const rampUpDelay = 2000; // 2 second delay between ramp-ups
const rampUps = 20; // Number of ramp-ups (planned)

const docsPerRampUp = Math.ceil(totalDocs / rampUps); // Documents per ramp-up

// Start measuring total execution time
const totalStartTime = performance.now();

const workerJsPath = path.resolve(__dirname, "worker.js");

// Function to spawn worker threads for a specific ramp-up
const spawnWorkers = async (activeThreads, startDoc, docsPerRampUp) => {
console.log(`Spawning ${activeThreads} worker(s)...`);
let promises = [];
const docsPerThread = Math.ceil(docsPerRampUp / activeThreads);

for (let i = 0; i < activeThreads; i++) {
const docsForThisThread = Math.min(docsPerThread, docsPerRampUp);
const start = startDoc + i * docsPerThread;
const end = Math.min(start + docsForThisThread, startDoc + docsPerRampUp);

promises.push(
new Promise((resolve, reject) => {
const worker = new Worker(workerJsPath, {
workerData: {
start,
end,
batchSize,
},
});

worker.on("message", (message) => {
console.log(`Worker ${i + 1}: ${message}`);
});

worker.on("error", (err) => {
console.error(`Worker ${i + 1} error: ${err}`);
reject(err);
});

worker.on("exit", (code) => {
if (code !== 0) {
reject(new Error(`Worker ${i + 1} stopped with exit code ${code}`));
} else {
resolve();
}
});
})
);
}

try {
await Promise.all(promises);
} catch (error) {
console.error("Error in worker threads: ", error);
throw error;
}
};

// Function to execute ramp-ups
const executeRampUps = async () => {
let activeThreads = 1;
let startDoc = 0;

for (let i = 0; i < rampUps; i++) {
await spawnWorkers(activeThreads, startDoc, docsPerRampUp);
startDoc += docsPerRampUp;

if (activeThreads < maxThreads) {
activeThreads++; // Increase the number of threads for next ramp-up
}

if (i < rampUps - 1) {
console.log(
`Ramping up to ${activeThreads} worker(s) in ${
rampUpDelay / 1000
} seconds...`
);
await new Promise((resolve) => setTimeout(resolve, rampUpDelay));
}
}
};

// Run the ramp-ups
executeRampUps()
.then(() => {
const totalEndTime = performance.now();
const totalDuration = (totalEndTime - totalStartTime) / 1000; // Convert to seconds
console.log(
`Successfully written ${totalDocs} documents to the collection in ${totalDuration.toFixed(
2
)} seconds.`
);
})
.catch((error) => {
console.error("Error in worker threads: ", error);
});
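
main.js expects a sibling `worker.js` that receives `{ start, end, batchSize }` via `workerData` and reports progress back with messages; the worker itself is not rendered on this page. A hypothetical sketch (written in TypeScript for illustration) of the shape such a worker could take, assuming batched Firestore writes to the same `posts_2` collection that count.js queries:

import { parentPort, workerData } from "worker_threads";
import * as admin from "firebase-admin";

admin.initializeApp({ projectId: "vertex-testing-1efc3" });
const firestore = admin.firestore();

async function run(): Promise<void> {
  const { start, end, batchSize } = workerData as {
    start: number;
    end: number;
    batchSize: number;
  };

  for (let i = start; i < end; i += batchSize) {
    const batch = firestore.batch();
    const upper = Math.min(i + batchSize, end);
    for (let j = i; j < upper; j++) {
      // The document payload here is a placeholder; the real stress test
      // writes whatever document shape it is exercising.
      const ref = firestore.collection("posts_2").doc(`doc_${j}`);
      batch.set(ref, { index: j, createdAt: new Date() });
    }
    await batch.commit();
  }

  parentPort?.postMessage(`Wrote documents ${start} to ${end - 1}`);
}

run().catch((err) => {
  console.error("Worker failed:", err);
  process.exit(1);
});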
