Skip to content

Commit

Permalink
Merge pull request #710 from panoratech/feat/gdrive-batch-scaling
Browse files Browse the repository at this point in the history
🚑 Scaling fetching datta from gdrive
  • Loading branch information
naelob authored Oct 8, 2024
2 parents a12f389 + fc854e1 commit 462c514
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 39 deletions.
3 changes: 3 additions & 0 deletions packages/api/src/@core/@core-services/queues/queue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import { Queues } from './types';
{
name: Queues.RAG_DOCUMENT_PROCESSING,
},
{
name: Queues.THIRD_PARTY_DATA_INGESTION,
},
),
],
providers: [BullQueueService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export class BullQueueService {
public readonly syncJobsQueue: Queue,
@InjectQueue(Queues.FAILED_PASSTHROUGH_REQUESTS_HANDLER)
public readonly failedPassthroughRequestsQueue: Queue,
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION)
public readonly thirdPartyDataIngestionQueue: Queue,
@InjectQueue(Queues.RAG_DOCUMENT_PROCESSING)
private ragDocumentQueue: Queue,
) {}
Expand All @@ -35,6 +37,9 @@ export class BullQueueService {
getRagDocumentQueue() {
return this.ragDocumentQueue;
}
getThirdPartyDataIngestionQueue() {
return this.thirdPartyDataIngestionQueue;
}

async removeRepeatableJob(jobName: string) {
const jobs = await this.syncJobsQueue.getRepeatableJobs();
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/@core/@core-services/queues/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export enum Queues {
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING',
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@ export class IngestDataService {
.filter((p) => p.shouldPassToService)
.map((p) => p.param);

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});

// Construct the syncParam object dynamically
const syncParam: SyncParam = {
linkedUserId,
custom_properties: remoteProperties,
custom_field_mappings: customFieldMappings,
ingestParams: ingestParams,
};

serviceParams.forEach((param, index) => {
Expand Down Expand Up @@ -124,11 +130,7 @@ export class IngestDataService {
return;
}

const sourceObject: U[] = resp.data;

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});
/*const sourceObject: U[] = resp.data;
await this.ingestData<T, U>(
sourceObject,
Expand All @@ -138,7 +140,7 @@ export class IngestDataService {
commonObject,
customFieldMappings,
ingestParams,
);
);*/
} catch (syncError) {
this.logger.error(
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`,
Expand Down Expand Up @@ -209,7 +211,7 @@ export class IngestDataService {
);

// insert the files in our s3 bucket so we can process them for our RAG
if (vertical === 'filestorage' && commonObject === 'file') {
/*if (vertical === 'filestorage' && commonObject === 'file') {
try {
const filesInfo: FileInfo[] = data
.filter((file: FileStorageFile) => file.mime_type !== null)
Expand Down Expand Up @@ -239,7 +241,7 @@ export class IngestDataService {
// Optionally, you could create an event to log this error
// await this.prisma.events.create({...});
}
}
}*/

const event = await this.prisma.events.create({
data: {
Expand Down
3 changes: 3 additions & 0 deletions packages/api/src/@core/utils/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
14 changes: 14 additions & 0 deletions packages/api/src/@core/utils/types/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,22 @@ export interface IBaseSync {

export type SyncParam = {
linkedUserId: string;
custom_field_mappings?: {
slug: string;
remote_id: string;
}[];
ingestParams: { [key: string]: any };
[key: string]: any;
};
export interface IBaseObjectService {
sync(data: SyncParam): Promise<ApiResponse<any>>;
ingestData?(
sourceData: any[],
connectionId: string,
customFieldMappings?: {
slug: string;
remote_id: string;
}[],
extraParams?: { [key: string]: any },
): Promise<any[]>;
}
16 changes: 16 additions & 0 deletions packages/api/src/filestorage/file/file.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ export class FileController {
this.logger.setContext(FileController.name);
}

/*@Get('count')
async getNumberFiles(
@Headers('x-connection-token') connection_token: string,
@Query() query: QueryDto,
) {
try {
const { linkedUserId, remoteSource, connectionId, projectId } =
await this.connectionUtils.getConnectionMetadataFromConnectionToken(
connection_token,
);
return this.fileService.getCountFiles(connectionId);
} catch (error) {
throw new Error(error);
}
}*/

@ApiOperation({
operationId: 'listFilestorageFile',
summary: 'List Files',
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/filestorage/file/file.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { ServiceRegistry } from './services/registry.service';
import { SharepointService } from './services/sharepoint';
import { SharepointFileMapper } from './services/sharepoint/mappers';
import { SyncService } from './sync/sync.service';
import { GoogleDriveQueueProcessor } from './services/googledrive/processor';

@Module({
controllers: [FileController],
Expand All @@ -39,6 +40,7 @@ import { SyncService } from './sync/sync.service';
DropboxService,
DropboxFileMapper,
GoogleDriveService,
GoogleDriveQueueProcessor,
],
exports: [SyncService, ServiceRegistry],
})
Expand Down
31 changes: 21 additions & 10 deletions packages/api/src/filestorage/file/services/file.service.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { v4 as uuidv4 } from 'uuid';
import { ApiResponse } from '@@core/utils/types';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { CoreUnification } from '@@core/@core-services/unification/core-unification.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';
import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { ApiResponse } from '@@core/utils/types';
import { OriginalFileOutput } from '@@core/utils/types/original/original.file-storage';
import { FileStorageObject } from '@filestorage/@lib/@types';
import { Injectable } from '@nestjs/common';
import { v4 as uuidv4 } from 'uuid';
import {
UnifiedFilestorageFileInput,
UnifiedFilestorageFileOutput,
} from '../types/model.unified';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { ServiceRegistry } from './registry.service';
import { CoreSyncRegistry } from '@@core/@core-services/registries/core-sync.registry';
import { FileStorageObject } from '@filestorage/@lib/@types';
import { OriginalFileOutput } from '@@core/utils/types/original/original.file-storage';
import { CoreUnification } from '@@core/@core-services/unification/core-unification.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';

@Injectable()
export class FileService {
Expand Down Expand Up @@ -441,4 +440,16 @@ export class FileService {
throw error;
}
}
/*async getCountFiles(connection_id: string): Promise<number> {
try {
const fileCount = await this.prisma.fs_files.count({
where: {
id_connection: connection_id,
},
});
return fileCount;
} catch (error) {
throw error;
}
}*/
}
Loading

0 comments on commit 462c514

Please sign in to comment.