🚑 Scaling fetching data from gdrive #710

Merged: 3 commits, Oct 8, 2024
3 changes: 3 additions & 0 deletions packages/api/src/@core/@core-services/queues/queue.module.ts
@@ -21,6 +21,9 @@ import { Queues } from './types';
{
name: Queues.RAG_DOCUMENT_PROCESSING,
},
{
name: Queues.THIRD_PARTY_DATA_INGESTION,
},
Comment on lines +24 to +26
🧹 Nitpick (assertive)

LGTM! Consider adding a comment for the new queue.

The addition of the THIRD_PARTY_DATA_INGESTION queue is correct and aligns with the PR objective of scaling data fetching from Google Drive. The implementation is consistent with the existing code structure.

Consider adding a brief comment above this queue configuration to explain its purpose, similar to:

// Queue for handling third-party data ingestion tasks, such as fetching data from Google Drive
{
  name: Queues.THIRD_PARTY_DATA_INGESTION,
},

This would improve code readability and make it easier for other developers to understand the purpose of this queue.

),
],
providers: [BullQueueService],
@@ -14,6 +14,8 @@ export class BullQueueService {
public readonly syncJobsQueue: Queue,
@InjectQueue(Queues.FAILED_PASSTHROUGH_REQUESTS_HANDLER)
public readonly failedPassthroughRequestsQueue: Queue,
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION)
public readonly thirdPartyDataIngestionQueue: Queue,
Comment on lines +17 to +18
🧹 Nitpick (assertive)

LGTM! Consider adding a comment for the new queue.

The new queue injection for thirdPartyDataIngestionQueue is well-implemented and follows the existing patterns in the class. It's correctly declared as public readonly and uses the Queues.THIRD_PARTY_DATA_INGESTION constant.

Consider adding a brief comment explaining the purpose of this new queue, similar to:

/**
 * Queue for handling third-party data ingestion tasks
 */
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION)
public readonly thirdPartyDataIngestionQueue: Queue,

This would improve code documentation and make it easier for other developers to understand the queue's purpose at a glance.

@InjectQueue(Queues.RAG_DOCUMENT_PROCESSING)
private ragDocumentQueue: Queue,
) {}
@@ -35,6 +37,9 @@ export class BullQueueService {
getRagDocumentQueue() {
return this.ragDocumentQueue;
}
getThirdPartyDataIngestionQueue() {
return this.thirdPartyDataIngestionQueue;
}

async removeRepeatableJob(jobName: string) {
const jobs = await this.syncJobsQueue.getRepeatableJobs();
1 change: 1 addition & 0 deletions packages/api/src/@core/@core-services/queues/types.ts
@@ -4,4 +4,5 @@ export enum Queues {
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING',
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
🧹 Nitpick (assertive)

LGTM! Consider adding a descriptive comment.

The addition of THIRD_PARTY_DATA_INGESTION to the Queues enum is appropriate and aligns with the PR's objective of scaling data fetching from Google Drive. The naming convention is consistent with other entries.

Consider adding a descriptive comment for this new queue type, similar to the other entries. For example:

-  THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
+  THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', // Queue for ingesting data from third-party sources like Google Drive
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', // Queue for ingesting data from third-party sources like Google Drive

}
@@ -5,7 +5,7 @@
import { PrismaService } from '../prisma/prisma.service';
import {
ApiResponse,
getFileExtensionFromMimeType,

⚠ Check warning on line 8 in packages/api/src/@core/@core-services/unification/ingest-data.service.ts (GitHub Actions / Build and Test (18.x)): 'getFileExtensionFromMimeType' is defined but never used
TargetObject,
} from '@@core/utils/types';
import { UnifySourceType } from '@@core/utils/types/unify.output';
@@ -15,8 +15,8 @@
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { LoggerService } from '../logger/logger.service';
import { RagService } from '@@core/rag/rag.service';
import { FileInfo } from '@@core/rag/types';

⚠ Check warning on line 18 in packages/api/src/@core/@core-services/unification/ingest-data.service.ts (GitHub Actions / Build and Test (18.x)): 'FileInfo' is defined but never used
import { fs_files as FileStorageFile } from '@prisma/client';

⚠ Check warning on line 19 in packages/api/src/@core/@core-services/unification/ingest-data.service.ts (GitHub Actions / Build and Test (18.x)): 'FileStorageFile' is defined but never used

@Injectable()
export class IngestDataService {
@@ -31,7 +31,7 @@
private ragService: RagService,
) {}

async syncForLinkedUser<T, U, V extends IBaseObjectService>(

⚠ Check warning on line 34 in packages/api/src/@core/@core-services/unification/ingest-data.service.ts (GitHub Actions / Build and Test (18.x)): 'T' is defined but never used
integrationId: string,
linkedUserId: string,
vertical: string,
@@ -85,10 +85,16 @@
.filter((p) => p.shouldPassToService)
.map((p) => p.param);

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});
Comment on lines +88 to +90
⚠️ Potential issue

Consider optimizing the ingestParams creation for better performance.

The current implementation using reduce with spread operator on the accumulator can lead to performance issues for large arrays, potentially causing O(n^2) time complexity.

Consider refactoring to use Object.fromEntries for better performance:

const ingestParams = Object.fromEntries(
  params
    .filter((p) => p.shouldPassToIngest)
    .map((p) => [p.paramName, p.param])
);

This approach avoids the spread operator in the accumulator, resulting in O(n) time complexity.

🧰 Tools
🪛 Biome

[error] 90-90: Avoid the use of spread (...) syntax on accumulators.

Spread syntax should be avoided on accumulators (like those in .reduce) because it causes a time complexity of O(n^2).
Consider methods such as .splice or .push instead.

(lint/performance/noAccumulatingSpread)

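The two approaches can be compared in a standalone sketch. The `Param` shape and the sample values below are hypothetical, chosen only to mirror the PR's `params` array:

```typescript
// Hypothetical param shape mirroring the PR's `params` array.
type Param = { paramName: string; param: unknown; shouldPassToIngest: boolean };

const params: Param[] = [
  { paramName: 'folderId', param: 'abc', shouldPassToIngest: true },
  { paramName: 'pageSize', param: 100, shouldPassToIngest: false },
  { paramName: 'mimeType', param: 'application/pdf', shouldPassToIngest: true },
];

// O(n^2): spreading the accumulator copies every key on each iteration.
const viaReduce = params
  .filter((p) => p.shouldPassToIngest)
  .reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {} as Record<string, unknown>);

// O(n): build the [key, value] pairs once, then construct a single object.
const viaFromEntries = Object.fromEntries(
  params.filter((p) => p.shouldPassToIngest).map((p) => [p.paramName, p.param]),
);

console.log(JSON.stringify(viaFromEntries)); // {"folderId":"abc","mimeType":"application/pdf"}
```

Both produce the same object; the difference only matters as the params list grows, but `Object.fromEntries` also reads more directly as "build an object from pairs".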

// Construct the syncParam object dynamically
const syncParam: SyncParam = {
linkedUserId,
custom_properties: remoteProperties,
custom_field_mappings: customFieldMappings,
ingestParams: ingestParams,
};

serviceParams.forEach((param, index) => {
@@ -124,11 +130,7 @@
return;
}

const sourceObject: U[] = resp.data;

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});
/*const sourceObject: U[] = resp.data;

await this.ingestData<T, U>(
sourceObject,
@@ -138,7 +140,7 @@
commonObject,
customFieldMappings,
ingestParams,
);
);*/
} catch (syncError) {
this.logger.error(
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`,
@@ -209,7 +211,7 @@
);

// insert the files in our s3 bucket so we can process them for our RAG
if (vertical === 'filestorage' && commonObject === 'file') {
/*if (vertical === 'filestorage' && commonObject === 'file') {
try {
const filesInfo: FileInfo[] = data
.filter((file: FileStorageFile) => file.mime_type !== null)
@@ -239,7 +241,7 @@
// Optionally, you could create an event to log this error
// await this.prisma.events.create({...});
}
}
}*/

const event = await this.prisma.events.create({
data: {
3 changes: 3 additions & 0 deletions packages/api/src/@core/utils/helpers.ts
@@ -0,0 +1,3 @@
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
Comment on lines +1 to +3
🧹 Nitpick (assertive)

LGTM! Consider adding a brief JSDoc comment.

The sleep function is well-implemented and follows good practices. It provides a useful utility for introducing delays in asynchronous operations, which could be beneficial for rate limiting or managing API requests when fetching data from Google Drive.

Consider adding a brief JSDoc comment to improve documentation:

+/**
+ * Pauses execution for the specified number of milliseconds.
+ * @param ms The number of milliseconds to sleep.
+ * @returns A Promise that resolves after the specified delay.
+ */
export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Pauses execution for the specified number of milliseconds.
* @param ms The number of milliseconds to sleep.
* @returns A Promise that resolves after the specified delay.
*/
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

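As a usage sketch, `sleep` slots naturally into a throttled pagination loop of the kind this PR needs for Google Drive. The `fetchAllPages` helper, its `fetchPage` callback, and the delay value below are hypothetical, for illustration only:

```typescript
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Hypothetical paginated fetcher: pause between pages to stay under a rate limit.
async function fetchAllPages(
  fetchPage: (cursor: number) => Promise<number[]>,
  pages: number,
  delayMs: number,
): Promise<number[]> {
  const all: number[] = [];
  for (let i = 0; i < pages; i++) {
    all.push(...(await fetchPage(i)));
    if (i < pages - 1) await sleep(delayMs); // throttle between requests
  }
  return all;
}

fetchAllPages(async (c) => [c * 2, c * 2 + 1], 3, 10).then((r) => console.log(r)); // [ 0, 1, 2, 3, 4, 5 ]
```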
14 changes: 14 additions & 0 deletions packages/api/src/@core/utils/types/interface.ts
@@ -68,8 +68,22 @@ export interface IBaseSync {

export type SyncParam = {
linkedUserId: string;
custom_field_mappings?: {
slug: string;
remote_id: string;
}[];
ingestParams: { [key: string]: any };
Comment on lines +71 to +75
🧹 Nitpick (assertive)

LGTM with a minor suggestion

The additions to the SyncParam type enhance its flexibility for data integration and ingestion scenarios. This aligns well with the PR's objective of scaling data fetching from Google Drive.

Consider making ingestParams optional to accommodate cases where no additional parameters are needed:

ingestParams?: { [key: string]: any };

This change would maintain flexibility while not requiring empty objects to be passed when no extra parameters are necessary.

[key: string]: any;
};
export interface IBaseObjectService {
sync(data: SyncParam): Promise<ApiResponse<any>>;
ingestData?(
sourceData: any[],
connectionId: string,
customFieldMappings?: {
slug: string;
remote_id: string;
}[],
extraParams?: { [key: string]: any },
): Promise<any[]>;
}
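A minimal standalone sketch of these shapes shows how making `ingestParams` optional (as the review suggests) lets callers omit it entirely. `ApiResponse` is stubbed here and `DemoService` is hypothetical:

```typescript
// Stub of the ApiResponse type for illustration.
type ApiResponse<T> = { data: T; statusCode: number };

// SyncParam with ingestParams made optional, per the review suggestion.
type SyncParam = {
  linkedUserId: string;
  custom_field_mappings?: { slug: string; remote_id: string }[];
  ingestParams?: { [key: string]: any };
  [key: string]: any;
};

interface IBaseObjectService {
  sync(data: SyncParam): Promise<ApiResponse<any>>;
}

// A toy service: callers may omit ingestParams, and the implementation
// falls back to an empty object.
class DemoService implements IBaseObjectService {
  async sync(data: SyncParam): Promise<ApiResponse<any>> {
    const extra = data.ingestParams ?? {};
    return {
      data: { user: data.linkedUserId, extraKeys: Object.keys(extra) },
      statusCode: 200,
    };
  }
}

new DemoService().sync({ linkedUserId: 'u1' }).then((r) => console.log(r.data.extraKeys.length)); // 0
```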
16 changes: 16 additions & 0 deletions packages/api/src/filestorage/file/file.controller.ts
@@ -46,6 +46,22 @@ export class FileController {
this.logger.setContext(FileController.name);
}

/*@Get('count')
async getNumberFiles(
@Headers('x-connection-token') connection_token: string,
@Query() query: QueryDto,
) {
try {
const { linkedUserId, remoteSource, connectionId, projectId } =
await this.connectionUtils.getConnectionMetadataFromConnectionToken(
connection_token,
);
return this.fileService.getCountFiles(connectionId);
} catch (error) {
throw new Error(error);
}
}*/

@ApiOperation({
operationId: 'listFilestorageFile',
summary: 'List Files',
2 changes: 2 additions & 0 deletions packages/api/src/filestorage/file/file.module.ts
@@ -16,6 +16,7 @@ import { ServiceRegistry } from './services/registry.service';
import { SharepointService } from './services/sharepoint';
import { SharepointFileMapper } from './services/sharepoint/mappers';
import { SyncService } from './sync/sync.service';
import { GoogleDriveQueueProcessor } from './services/googledrive/processor';

@Module({
controllers: [FileController],
@@ -39,6 +40,7 @@ import { SyncService } from './sync/sync.service';
DropboxService,
DropboxFileMapper,
GoogleDriveService,
GoogleDriveQueueProcessor,
],
exports: [SyncService, ServiceRegistry],
})
31 changes: 21 additions & 10 deletions packages/api/src/filestorage/file/services/file.service.ts
@@ -1,20 +1,19 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { v4 as uuidv4 } from 'uuid';
import { ApiResponse } from '@@core/utils/types';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { CoreUnification } from '@@core/@core-services/unification/core-unification.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';
import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { ApiResponse } from '@@core/utils/types';
import { OriginalFileOutput } from '@@core/utils/types/original/original.file-storage';
import { FileStorageObject } from '@filestorage/@lib/@types';
import { Injectable } from '@nestjs/common';
import { v4 as uuidv4 } from 'uuid';
import {
UnifiedFilestorageFileInput,
UnifiedFilestorageFileOutput,
} from '../types/model.unified';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { ServiceRegistry } from './registry.service';
import { CoreSyncRegistry } from '@@core/@core-services/registries/core-sync.registry';
import { FileStorageObject } from '@filestorage/@lib/@types';
import { OriginalFileOutput } from '@@core/utils/types/original/original.file-storage';
import { CoreUnification } from '@@core/@core-services/unification/core-unification.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';

@Injectable()
export class FileService {
@@ -441,4 +440,16 @@ export class FileService {
throw error;
}
}
/*async getCountFiles(connection_id: string): Promise<number> {
try {
const fileCount = await this.prisma.fs_files.count({
where: {
id_connection: connection_id,
},
});
return fileCount;
} catch (error) {
throw error;
}
}*/
}