diff --git a/packages/api/src/@core/@core-services/queues/queue.module.ts b/packages/api/src/@core/@core-services/queues/queue.module.ts index 8ce85065e..6dce48c5e 100644 --- a/packages/api/src/@core/@core-services/queues/queue.module.ts +++ b/packages/api/src/@core/@core-services/queues/queue.module.ts @@ -21,6 +21,9 @@ import { Queues } from './types'; { name: Queues.RAG_DOCUMENT_PROCESSING, }, + { + name: Queues.THIRD_PARTY_DATA_INGESTION, + }, ), ], providers: [BullQueueService], diff --git a/packages/api/src/@core/@core-services/queues/shared.service.ts b/packages/api/src/@core/@core-services/queues/shared.service.ts index b059e2529..0c208b9f4 100644 --- a/packages/api/src/@core/@core-services/queues/shared.service.ts +++ b/packages/api/src/@core/@core-services/queues/shared.service.ts @@ -14,6 +14,8 @@ export class BullQueueService { public readonly syncJobsQueue: Queue, @InjectQueue(Queues.FAILED_PASSTHROUGH_REQUESTS_HANDLER) public readonly failedPassthroughRequestsQueue: Queue, + @InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION) + public readonly thirdPartyDataIngestionQueue: Queue, @InjectQueue(Queues.RAG_DOCUMENT_PROCESSING) private ragDocumentQueue: Queue, ) {} @@ -35,6 +37,9 @@ export class BullQueueService { getRagDocumentQueue() { return this.ragDocumentQueue; } + getThirdPartyDataIngestionQueue() { + return this.thirdPartyDataIngestionQueue; + } async removeRepeatableJob(jobName: string) { const jobs = await this.syncJobsQueue.getRepeatableJobs(); diff --git a/packages/api/src/@core/@core-services/queues/types.ts b/packages/api/src/@core/@core-services/queues/types.ts index 5cd578e4b..be54c4139 100644 --- a/packages/api/src/@core/@core-services/queues/types.ts +++ b/packages/api/src/@core/@core-services/queues/types.ts @@ -4,4 +4,5 @@ export enum Queues { SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING', + THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', } diff --git a/packages/api/src/@core/@core-services/unification/ingest-data.service.ts b/packages/api/src/@core/@core-services/unification/ingest-data.service.ts index 74f11f6b0..353f5718c 100644 --- a/packages/api/src/@core/@core-services/unification/ingest-data.service.ts +++ b/packages/api/src/@core/@core-services/unification/ingest-data.service.ts @@ -85,10 +85,16 @@ export class IngestDataService { .filter((p) => p.shouldPassToService) .map((p) => p.param); + const ingestParams = params + .filter((p) => p.shouldPassToIngest) + .reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); + // Construct the syncParam object dynamically const syncParam: SyncParam = { linkedUserId, custom_properties: remoteProperties, + custom_field_mappings: customFieldMappings, + ingestParams: ingestParams, }; serviceParams.forEach((param, index) => { @@ -124,11 +130,7 @@ export class IngestDataService { return; } - const sourceObject: U[] = resp.data; - - const ingestParams = params - .filter((p) => p.shouldPassToIngest) - .reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); + /*const sourceObject: U[] = resp.data; await this.ingestData( sourceObject, @@ -138,7 +140,7 @@ export class IngestDataService { commonObject, customFieldMappings, ingestParams, - ); + );*/ } catch (syncError) { this.logger.error( `Error syncing ${integrationId} ${commonObject}: ${syncError.message}`, @@ -209,7 +211,7 @@ export class IngestDataService { ); // insert the files in our s3 bucket so we can process them for our RAG - if (vertical === 'filestorage' && commonObject === 'file') { + /*if (vertical === 'filestorage' && commonObject === 'file') { try { const filesInfo: FileInfo[] = data .filter((file: FileStorageFile) => file.mime_type !== null) @@ -239,7 +241,7 @@ export class IngestDataService { // Optionally, you could create an event to log this error // await this.prisma.events.create({...}); } - } + }*/ const event = await this.prisma.events.create({ data: { diff --git a/packages/api/src/@core/utils/helpers.ts b/packages/api/src/@core/utils/helpers.ts new file mode 100644 index 000000000..421bda080 --- /dev/null +++ b/packages/api/src/@core/utils/helpers.ts @@ -0,0 +1,3 @@ +export function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/api/src/@core/utils/types/interface.ts b/packages/api/src/@core/utils/types/interface.ts index e744718c0..e03406eae 100644 --- a/packages/api/src/@core/utils/types/interface.ts +++ b/packages/api/src/@core/utils/types/interface.ts @@ -68,8 +68,22 @@ export interface IBaseSync { export type SyncParam = { linkedUserId: string; + custom_field_mappings?: { + slug: string; + remote_id: string; + }[]; + ingestParams: { [key: string]: any }; [key: string]: any; }; export interface IBaseObjectService { sync(data: SyncParam): Promise>; + ingestData?( + sourceData: any[], + connectionId: string, + customFieldMappings?: { + slug: string; + remote_id: string; + }[], + extraParams?: { [key: string]: any }, + ): Promise; } diff --git a/packages/api/src/filestorage/file/file.controller.ts b/packages/api/src/filestorage/file/file.controller.ts index a79fbb1dd..8b4320a89 100644 --- a/packages/api/src/filestorage/file/file.controller.ts +++ b/packages/api/src/filestorage/file/file.controller.ts @@ -46,6 +46,22 @@ export class FileController { this.logger.setContext(FileController.name); } + /*@Get('count') + async getNumberFiles( + @Headers('x-connection-token') connection_token: string, + @Query() query: QueryDto, + ) { + try { + const { linkedUserId, remoteSource, connectionId, projectId } = + await this.connectionUtils.getConnectionMetadataFromConnectionToken( + connection_token, + ); + return this.fileService.getCountFiles(connectionId); + } catch (error) { + throw new Error(error); + } + }*/ + @ApiOperation({ operationId: 'listFilestorageFile', summary: 'List Files', diff --git a/packages/api/src/filestorage/file/file.module.ts b/packages/api/src/filestorage/file/file.module.ts index b46829cee..278fbd810 100644 --- a/packages/api/src/filestorage/file/file.module.ts +++ b/packages/api/src/filestorage/file/file.module.ts @@ -16,6 +16,7 @@ import { ServiceRegistry } from './services/registry.service'; import { SharepointService } from './services/sharepoint'; import { SharepointFileMapper } from './services/sharepoint/mappers'; import { SyncService } from './sync/sync.service'; +import { GoogleDriveQueueProcessor } from './services/googledrive/processor'; @Module({ controllers: [FileController], @@ -39,6 +40,7 @@ import { SyncService } from './sync/sync.service'; DropboxService, DropboxFileMapper, GoogleDriveService, + GoogleDriveQueueProcessor, ], exports: [SyncService, ServiceRegistry], }) diff --git a/packages/api/src/filestorage/file/services/file.service.ts b/packages/api/src/filestorage/file/services/file.service.ts index f0f88a3d3..5102fde26 100644 --- a/packages/api/src/filestorage/file/services/file.service.ts +++ b/packages/api/src/filestorage/file/services/file.service.ts @@ -1,20 +1,19 @@ -import { Injectable } from '@nestjs/common'; -import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; import { LoggerService } from '@@core/@core-services/logger/logger.service'; -import { v4 as uuidv4 } from 'uuid'; -import { ApiResponse } from '@@core/utils/types'; +import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; +import { CoreUnification } from '@@core/@core-services/unification/core-unification.service'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service'; +import { FieldMappingService } from '@@core/field-mapping/field-mapping.service'; +import { ApiResponse } from '@@core/utils/types'; +import { OriginalFileOutput } from '@@core/utils/types/original/original.file-storage'; +import { FileStorageObject } from '@filestorage/@lib/@types'; +import { Injectable } from '@nestjs/common'; +import { v4 as uuidv4 } from 'uuid'; import { UnifiedFilestorageFileInput, UnifiedFilestorageFileOutput, } from '../types/model.unified'; -import { FieldMappingService } from '@@core/field-mapping/field-mapping.service'; import { ServiceRegistry } from './registry.service'; -import { CoreSyncRegistry } from '@@core/@core-services/registries/core-sync.registry'; -import { FileStorageObject } from '@filestorage/@lib/@types'; -import { OriginalFileOutput } from '@@core/utils/types/original/original.file-storage'; -import { CoreUnification } from '@@core/@core-services/unification/core-unification.service'; -import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; @Injectable() export class FileService { @@ -441,4 +440,16 @@ export class FileService { throw error; } } + /*async getCountFiles(connection_id: string): Promise { + try { + const fileCount = await this.prisma.fs_files.count({ + where: { + id_connection: connection_id, + }, + }); + return fileCount; + } catch (error) { + throw error; + } + }*/ } diff --git a/packages/api/src/filestorage/file/services/googledrive/index.ts b/packages/api/src/filestorage/file/services/googledrive/index.ts index 6fbf1c0d3..8c19fa217 100644 --- a/packages/api/src/filestorage/file/services/googledrive/index.ts +++ b/packages/api/src/filestorage/file/services/googledrive/index.ts @@ -1,10 +1,12 @@ import { EncryptionService } from '@@core/@core-services/encryption/encryption.service'; import { LoggerService } from '@@core/@core-services/logger/logger.service'; import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; -import { ApiResponse } from '@@core/utils/types'; +import { BullQueueService } from '@@core/@core-services/queues/shared.service'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; import { SyncParam } from '@@core/utils/types/interface'; import { FileStorageObject } from '@filestorage/@lib/@types'; import { IFileService } from '@filestorage/file/types'; +import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified'; import { Injectable } from '@nestjs/common'; import axios from 'axios'; import { OAuth2Client } from 'google-auth-library'; @@ -12,6 +14,9 @@ import { google } from 'googleapis'; import { ServiceRegistry } from '../registry.service'; import { GoogleDriveFileOutput } from './types'; +const BATCH_SIZE = 1000; // Number of files to process in each batch +const API_RATE_LIMIT = 10; // Requests per second + @Injectable() export class GoogleDriveService implements IFileService { constructor( @@ -19,6 +24,8 @@ export class GoogleDriveService implements IFileService { private logger: LoggerService, private cryptoService: EncryptionService, private registry: ServiceRegistry, + private ingestService: IngestDataService, + private bullQueueService: BullQueueService, ) { this.logger.setContext( FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name, @@ -26,7 +33,167 @@ export class GoogleDriveService implements IFileService { this.registry.registerService('googledrive', this); } - async sync(data: SyncParam): Promise> { + async ingestData( + sourceData: GoogleDriveFileOutput[], + connectionId: string, + customFieldMappings?: { + slug: string; + remote_id: string; + }[], + extraParams?: { [key: string]: any }, + ): Promise { + return this.ingestService.ingestData< + UnifiedFilestorageFileOutput, + GoogleDriveFileOutput + >( + sourceData, + 'googledrive', + connectionId, + 'filestorage', + 'file', + customFieldMappings, + extraParams, + ); + } + + async sync(data: SyncParam) { + const { linkedUserId, custom_field_mappings, ingestParams } = data; + const connection = await this.prisma.connections.findFirst({ + where: { + id_linked_user: linkedUserId, + provider_slug: 'googledrive', + vertical: 'filestorage', + }, + }); + + if (!connection) return; + + const auth = new OAuth2Client(); + auth.setCredentials({ + access_token: this.cryptoService.decrypt(connection.access_token), + }); + const drive = google.drive({ version: 'v3', auth }); + + const lastSyncTime = await this.getLastSyncTime(connection.id_connection); + const query = lastSyncTime + ? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'` + : 'trashed = false'; + + let pageToken: string | undefined; + let count = 0; + do { + const response = await this.rateLimitedRequest(() => + drive.files.list({ + q: query, + fields: 'nextPageToken', + pageSize: BATCH_SIZE, + pageToken: pageToken, + }), + ); + + count++; + + await this.bullQueueService + .getThirdPartyDataIngestionQueue() + .add('fs_file_googledrive', { + ...data, + pageToken: (response as any).data.nextPageToken, + query, + connectionId: connection.id_connection, + custom_field_mappings, + ingestParams, + }); + + pageToken = (response as any).data.nextPageToken; + } while (pageToken); + console.log(`it has been called ${count} times`) + return { + data: [], + message: 'Google Drive sync completed', + statusCode: 200, + }; + } + + async processBatch(job: any) { + const { + linkedUserId, + query, + pageToken, + connectionId, + custom_field_mappings, + ingestParams, + } = job.data; + const connection = await this.prisma.connections.findFirst({ + where: { + id_linked_user: linkedUserId, + provider_slug: 'googledrive', + vertical: 'filestorage', + }, + }); + + if (!connection) return; + + const auth = new OAuth2Client(); + auth.setCredentials({ + access_token: this.cryptoService.decrypt(connection.access_token), + }); + const drive = google.drive({ version: 'v3', auth }); + try { + const response = await this.rateLimitedRequest(() => + drive.files.list({ + q: query, + fields: + 'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', + pageSize: BATCH_SIZE, + pageToken: pageToken, + orderBy: 'modifiedTime', + }), + ); + + const files: GoogleDriveFileOutput[] = (response as any).data.files.map((file) => ({ + id: file.id!, + name: file.name!, + mimeType: file.mimeType!, + modifiedTime: file.modifiedTime!, + size: file.size!, + parents: file.parents, + webViewLink: file.webViewLink, + })); + + await this.ingestData( + files, + connectionId, + custom_field_mappings, + ingestParams, + ); + } catch (error) { + this.logger.error('Error in processBatch:', error); + if (error.message === 'Invalid Value') { + this.logger.error('This may be due to an expired or invalid access token.', error); + } + throw error; + } + } + + private async rateLimitedRequest(request: () => Promise): Promise { + return new Promise((resolve, reject) => { + setTimeout(async () => { + try { + const result = await request(); + resolve(result); + } catch (error) { + this.logger.error('Error in rateLimitedRequest:', error); + if (error.response) { + this.logger.error('Response data:', error.response.data); + this.logger.error('Response status:', error.response.status); + } + reject(error); + } + }, 1000 / API_RATE_LIMIT); + }); + } + + /*async sync(data: SyncParam): Promise> { try { const { linkedUserId, id_folder } = data; @@ -46,34 +213,63 @@ export class GoogleDriveService implements IFileService { }); const drive = google.drive({ version: 'v3', auth }); - const response = await drive.files.list({ - q: 'trashed = false', - fields: - 'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', - pageSize: 1000, // Adjust as needed - }); - - const files: GoogleDriveFileOutput[] = response.data.files.map( - (file) => ({ - id: file.id!, - name: file.name!, - mimeType: file.mimeType!, - modifiedTime: file.modifiedTime!, - size: file.size!, - parents: file.parents, - webViewLink: file.webViewLink, - }), + const lastSyncTime = await this.getLastSyncTime(connection.id_connection); + console.log( + 'last updated time for google drive file is ' + + JSON.stringify(lastSyncTime), ); + let pageToken: string | undefined; + let allFiles: GoogleDriveFileOutput[] = []; + + const query = lastSyncTime + ? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'` + : 'trashed = false'; + + do { + const response = await drive.files.list({ + q: query, + fields: + 'nextPageToken, files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', + pageSize: 1000, + pageToken: pageToken, + orderBy: 'modifiedTime', + }); + + const files: GoogleDriveFileOutput[] = response.data.files.map( + (file) => ({ + id: file.id!, + name: file.name!, + mimeType: file.mimeType!, + modifiedTime: file.modifiedTime!, + size: file.size!, + parents: file.parents, + webViewLink: file.webViewLink, + }), + ); + allFiles = allFiles.concat(files); + pageToken = response.data.nextPageToken; + if (pageToken) { + await sleep(100); // Wait 100ms between requests to avoid hitting rate limits + } + } while (pageToken); this.logger.log(`Synced googledrive files !`); return { - data: files, + data: allFiles, message: 'Google Drive files retrieved', statusCode: 200, }; } catch (error) { throw error; } + }*/ + + private async getLastSyncTime(connectionId: string): Promise { + const lastSync = await this.prisma.fs_files.findFirst({ + where: { id_connection: connectionId }, + orderBy: { modified_at: 'desc' }, + }); + return lastSync ? lastSync.modified_at : null; } async downloadFile(fileId: string, connection: any): Promise { diff --git a/packages/api/src/filestorage/file/services/googledrive/processor.ts b/packages/api/src/filestorage/file/services/googledrive/processor.ts new file mode 100644 index 000000000..2ae382e97 --- /dev/null +++ b/packages/api/src/filestorage/file/services/googledrive/processor.ts @@ -0,0 +1,23 @@ +import { Process, Processor } from '@nestjs/bull'; +import { Injectable } from '@nestjs/common'; +import { Job } from 'bull'; +import { Queues } from '@@core/@core-services/queues/types'; +import { GoogleDriveService } from '.'; + +@Injectable() +@Processor(Queues.THIRD_PARTY_DATA_INGESTION) +export class GoogleDriveQueueProcessor { + constructor(private readonly googleDriveService: GoogleDriveService) {} + + @Process('fs_file_googledrive') + async handleGoogleDriveSync(job: Job) { + try { + await this.googleDriveService.processBatch(job); + } catch (error) { + console.error( + `Failed to process Google Drive sync job: ${error.message}`, + ); + throw error; + } + } +} diff --git a/packages/api/src/main.ts b/packages/api/src/main.ts index 4ee9678d4..ca08b7e59 100644 --- a/packages/api/src/main.ts +++ b/packages/api/src/main.ts @@ -95,7 +95,7 @@ async function bootstrap() { extendedSpecs['x-speakeasy-name-override']; addSpeakeasyGroup(document); - await generatePanoraParamsSpec(document); + // TODO: await generatePanoraParamsSpec(document); useContainer(app.select(AppModule), { fallbackOnErrors: true });