diff --git a/packages/api/prisma/schema.prisma b/packages/api/prisma/schema.prisma index 45ad8e9b0..72078d483 100644 --- a/packages/api/prisma/schema.prisma +++ b/packages/api/prisma/schema.prisma @@ -1300,21 +1300,22 @@ model connection_strategies { /// This model or at least one of its fields has comments in the database, and requires an additional setup for migrations: Read more: https://pris.ly/d/database-comments model connections { - id_connection String @id(map: "pk_connections") @db.Uuid - status String - provider_slug String - vertical String - account_url String? - token_type String - access_token String? - refresh_token String? - expiration_timestamp DateTime? @db.Timestamptz(6) - created_at DateTime @db.Timestamptz(6) - connection_token String? - id_project String @db.Uuid - id_linked_user String @db.Uuid - linked_users linked_users @relation(fields: [id_linked_user], references: [id_linked_user], onDelete: NoAction, onUpdate: NoAction, map: "fk_11") - projects projects @relation(fields: [id_project], references: [id_project], onDelete: NoAction, onUpdate: NoAction, map: "fk_9") + id_connection String @id(map: "pk_connections") @db.Uuid + status String + provider_slug String + vertical String + account_url String? + token_type String + access_token String? + refresh_token String? + expiration_timestamp DateTime? @db.Timestamptz(6) + created_at DateTime @db.Timestamptz(6) + connection_token String? + id_project String @db.Uuid + id_linked_user String @db.Uuid + linked_users linked_users @relation(fields: [id_linked_user], references: [id_linked_user], onDelete: NoAction, onUpdate: NoAction, map: "fk_11") + projects projects @relation(fields: [id_project], references: [id_project], onDelete: NoAction, onUpdate: NoAction, map: "fk_9") + vertical_objects_sync_track_data vertical_objects_sync_track_data[] @@index([id_project], map: "fk_1") @@index([id_linked_user], map: "fk_connections_to_linkedusersid") @@ -1346,6 +1347,11 @@ model connector_sets { ats_ashby Boolean? ecom_webflow Boolean? crm_microsoftdynamicssales Boolean? + fs_dropbox Boolean? + fs_googledrive Boolean? + fs_sharepoint Boolean? + fs_onedrive Boolean? + crm_salesforce Boolean? projects projects[] } @@ -2144,3 +2150,14 @@ model projects_pull_frequency { id_project String @unique(map: "uq_projects_pull_frequency_project") @db.Uuid projects projects @relation(fields: [id_project], references: [id_project], onDelete: NoAction, onUpdate: NoAction, map: "fk_projects_pull_frequency_project") } + +model vertical_objects_sync_track_data { + id_vertical_objects_sync_track_data String @id(map: "pk_vertical_objects_sync_track_data") @db.Uuid + vertical String + provider_slug String + object String + pagination_type String + id_connection String @db.Uuid + data Json? @db.Json + connections connections @relation(fields: [id_connection], references: [id_connection], onDelete: NoAction, onUpdate: NoAction, map: "fk_connection") +} diff --git a/packages/api/scripts/init.sql b/packages/api/scripts/init.sql index 4bdb1db6e..9daef6355 100644 --- a/packages/api/scripts/init.sql +++ b/packages/api/scripts/init.sql @@ -2900,6 +2900,20 @@ COMMENT ON COLUMN connections.token_type IS 'The type of the token, such as "Bea COMMENT ON COLUMN connections.connection_token IS 'Connection token users will put in their header to identify which service / linked_User they make request for'; +-- ************************************** vertical_objects_sync_track_data +CREATE TABLE vertical_objects_sync_track_data +( + id_vertical_objects_sync_track_data uuid NOT NULL, + vertical text NOT NULL, + provider_slug text NOT NULL, + object text NOT NULL, + pagination_type text NOT NULL, + id_connection uuid NOT NULL, + data json, + CONSTRAINT PK_vertical_objects_sync_track_data PRIMARY KEY ( id_vertical_objects_sync_track_data ), + CONSTRAINT FK_connection FOREIGN KEY ( id_connection ) REFERENCES connections ( id_connection ) +); + -- ************************************** ats_scorecards CREATE TABLE ats_scorecards ( diff --git a/packages/api/src/crm/company/services/attio/index.ts b/packages/api/src/crm/company/services/attio/index.ts index c47485406..77e306411 100644 --- a/packages/api/src/crm/company/services/attio/index.ts +++ b/packages/api/src/crm/company/services/attio/index.ts @@ -9,9 +9,10 @@ import { EncryptionService } from '@@core/@core-services/encryption/encryption.s import { ApiResponse } from '@@core/utils/types'; import { ICompanyService } from '@crm/company/types'; import { ServiceRegistry } from '../registry.service'; -import { AttioCompanyInput, AttioCompanyOutput } from './types'; +import { AttioCompanyInput, AttioCompanyOutput, paginationType } from './types'; import { SyncParam } from '@@core/utils/types/interface'; import { OriginalCompanyOutput } from '@@core/utils/types/original/original.crm'; +import { v4 as uuidv4 } from 'uuid'; @Injectable() export class AttioService implements ICompanyService { @@ -74,20 +75,134 @@ export class AttioService implements ICompanyService { vertical: 'crm', }, }); - const resp = await axios.post( - `${connection.account_url}/v2/objects/companies/records/query`, - {}, - { - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.cryptoService.decrypt( - connection.access_token, - )}`, - }, + + const paginationTrackInfo = await this.prisma.vertical_objects_sync_track_data.findFirst({ + where: { + id_connection: connection.id_connection, + vertical: 'crm', + provider_slug: 'attio', + object: 'company', }, - ); + }); + + let respData: AttioCompanyOutput[] = []; + let initialOffset: number = 0; + + if (!paginationTrackInfo) { + // Intial sync + try { + while (true) { + const resp = await axios.post( + `${connection.account_url}/v2/objects/companies/records/query`, + { + "sorts": [ + { + "attribute": "created_at", + "direction": "asc" + } + ], + "offset": initialOffset, + "limit": 500 + }, + { + headers: { + accept: 'application/json', + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }, + ); + + + respData.push(...resp.data.data); + initialOffset = initialOffset + resp.data.data.length; + + if (resp.data.data.length < 500) { + break; + } + } + } + catch (error) { + this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`); + } + + } + else { + // Incremental sync + const currentPaginationData = paginationTrackInfo.data as paginationType; + initialOffset = currentPaginationData.offset; + + try { + while (true) { + const resp = await axios.post( + `${connection.account_url}/v2/objects/companies/records/query`, + { + "sorts": [ + { + "attribute": "created_at", + "direction": "asc" + } + ], + "offset": initialOffset, + "limit": 500 + }, + { + headers: { + accept: 'application/json', + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}` + } + } + ); + + respData.push(...resp.data.data); + initialOffset = initialOffset + resp.data.data.length; + + if (resp.data.data.length < 500) { + break; + } + } + } + catch (error) { + this.logger.log(`Error in incremental sync ${error.message} and last offset is ${initialOffset}`); + } + } + + // create or update records + if (paginationTrackInfo) { + await this.prisma.vertical_objects_sync_track_data.update({ + where: { + id_vertical_objects_sync_track_data: paginationTrackInfo.id_vertical_objects_sync_track_data, + }, + data: { + data: { + offset: initialOffset, + }, + }, + }); + } + else { + await this.prisma.vertical_objects_sync_track_data.create({ + data: { + id_vertical_objects_sync_track_data: uuidv4(), + vertical: 'crm', + provider_slug: 'attio', + object: 'company', + pagination_type: 'offset', + id_connection: connection.id_connection, + data: { + offset: initialOffset, + }, + }, + }); + } + return { - data: resp.data.data, + data: respData, message: 'Attio companies retrieved', statusCode: 200, }; diff --git a/packages/api/src/crm/company/services/attio/types.ts b/packages/api/src/crm/company/services/attio/types.ts index c9da6922a..033b69790 100644 --- a/packages/api/src/crm/company/services/attio/types.ts +++ b/packages/api/src/crm/company/services/attio/types.ts @@ -127,3 +127,7 @@ export interface AttioCompany { export type AttioCompanyInput = Partial; export type AttioCompanyOutput = AttioCompanyInput; + +export type paginationType = { + offset: number; +}; \ No newline at end of file diff --git a/packages/api/src/crm/contact/services/attio/index.ts b/packages/api/src/crm/contact/services/attio/index.ts index 3c98594f2..bdae2c46a 100644 --- a/packages/api/src/crm/contact/services/attio/index.ts +++ b/packages/api/src/crm/contact/services/attio/index.ts @@ -8,7 +8,8 @@ import { ActionType, handle3rdPartyServiceError } from '@@core/utils/errors'; import { EncryptionService } from '@@core/@core-services/encryption/encryption.service'; import { ApiResponse } from '@@core/utils/types'; import { ServiceRegistry } from '../registry.service'; -import { AttioContactInput, AttioContactOutput } from './types'; +import { v4 as uuidv4 } from 'uuid'; +import { AttioContactInput, AttioContactOutput, paginationType } from './types'; import { SyncParam } from '@@core/utils/types/interface'; @Injectable() @@ -74,22 +75,133 @@ export class AttioService implements IContactService { }, }); - const resp = await axios.post( - `${connection.account_url}/v2/objects/people/records/query`, - {}, - { - headers: { - accept: 'application/json', - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.cryptoService.decrypt( - connection.access_token, - )}`, - }, + const paginationTrackInfo = await this.prisma.vertical_objects_sync_track_data.findFirst({ + where: { + id_connection: connection.id_connection, + vertical: 'crm', + provider_slug: 'attio', + object: 'contact', }, - ); + }); + + let respData: AttioContactOutput[] = []; + let initialOffset: number = 0; + + if (!paginationTrackInfo) { + // Intial sync + try { + while (true) { + const resp = await axios.post( + `${connection.account_url}/v2/objects/people/records/query`, + { + "sorts": [ + { + "attribute": "created_at", + "direction": "asc" + } + ], + "offset": initialOffset, + "limit": 500 + }, + { + headers: { + accept: 'application/json', + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }, + ); + + + respData.push(...resp.data.data); + initialOffset = initialOffset + resp.data.data.length; + + if (resp.data.data.length < 500) { + break; + } + } + } + catch (error) { + this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`); + } + + } + else { + // Incremental sync + const currentPaginationData = paginationTrackInfo.data as paginationType; + initialOffset = currentPaginationData.offset; + + try { + while (true) { + const resp = await axios.post( + `${connection.account_url}/v2/objects/people/records/query`, + { + "sorts": [ + { + "attribute": "created_at", + "direction": "asc" + } + ], + "offset": initialOffset, + "limit": 500 + }, + { + headers: { + accept: 'application/json', + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}` + } + } + ); + + respData.push(...resp.data.data); + initialOffset = initialOffset + resp.data.data.length; + + if (resp.data.data.length < 500) { + break; + } + } + } + catch (error) { + this.logger.log(`Error in incremental sync ${error.message} and last offset is ${initialOffset}`); + } + } + + // create or update records + if (paginationTrackInfo) { + await this.prisma.vertical_objects_sync_track_data.update({ + where: { + id_vertical_objects_sync_track_data: paginationTrackInfo.id_vertical_objects_sync_track_data, + }, + data: { + data: { + offset: initialOffset, + }, + }, + }); + } + else { + await this.prisma.vertical_objects_sync_track_data.create({ + data: { + id_vertical_objects_sync_track_data: uuidv4(), + vertical: 'crm', + provider_slug: 'attio', + object: 'contact', + pagination_type: 'offset', + id_connection: connection.id_connection, + data: { + offset: initialOffset, + }, + }, + }); + } return { - data: resp.data.data, + data: respData, message: 'Attio contacts retrieved', statusCode: 200, }; diff --git a/packages/api/src/crm/contact/services/attio/types.ts b/packages/api/src/crm/contact/services/attio/types.ts index c4bb4d423..6a0ee1d72 100644 --- a/packages/api/src/crm/contact/services/attio/types.ts +++ b/packages/api/src/crm/contact/services/attio/types.ts @@ -126,3 +126,8 @@ export interface AttioContact { export type AttioContactInput = Partial; export type AttioContactOutput = AttioContactInput; + + +export type paginationType = { + offset: number; +}; \ No newline at end of file diff --git a/packages/api/src/crm/deal/services/attio/index.ts b/packages/api/src/crm/deal/services/attio/index.ts index 09ec6e3eb..4ac7233a5 100644 --- a/packages/api/src/crm/deal/services/attio/index.ts +++ b/packages/api/src/crm/deal/services/attio/index.ts @@ -8,9 +8,10 @@ import { EncryptionService } from '@@core/@core-services/encryption/encryption.s import { ApiResponse } from '@@core/utils/types'; import { IDealService } from '@crm/deal/types'; import { ServiceRegistry } from '../registry.service'; -import { AttioDealInput, AttioDealOutput } from './types'; +import { AttioDealInput, AttioDealOutput, paginationType } from './types'; import { SyncParam } from '@@core/utils/types/interface'; import { OriginalDealOutput } from '@@core/utils/types/original/original.crm'; +import { v4 as uuidv4 } from 'uuid'; @Injectable() export class AttioService implements IDealService { @@ -73,21 +74,134 @@ export class AttioService implements IDealService { vertical: 'crm', }, }); - const resp = await axios.post( - `${connection.account_url}/v2/objects/deals/records/query`, - {}, - { - headers: { - accept: 'application/json', - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.cryptoService.decrypt( - connection.access_token, - )}`, - }, + + const paginationTrackInfo = await this.prisma.vertical_objects_sync_track_data.findFirst({ + where: { + id_connection: connection.id_connection, + vertical: 'crm', + provider_slug: 'attio', + object: 'deal', }, - ); + }); + + let respData: AttioDealOutput[] = []; + let initialOffset: number = 0; + + if (!paginationTrackInfo) { + // Intial sync + try { + while (true) { + const resp = await axios.post( + `${connection.account_url}/v2/objects/deals/records/query`, + { + "sorts": [ + { + "attribute": "created_at", + "direction": "asc" + } + ], + "offset": initialOffset, + "limit": 500 + }, + { + headers: { + accept: 'application/json', + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }, + ); + + + respData.push(...resp.data.data); + initialOffset = initialOffset + resp.data.data.length; + + if (resp.data.data.length < 500) { + break; + } + } + } + catch (error) { + this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`); + } + + } + else { + // Incremental sync + const currentPaginationData = paginationTrackInfo.data as paginationType; + initialOffset = currentPaginationData.offset; + + try { + while (true) { + const resp = await axios.post( + `${connection.account_url}/v2/objects/deals/records/query`, + { + "sorts": [ + { + "attribute": "created_at", + "direction": "asc" + } + ], + "offset": initialOffset, + "limit": 500 + }, + { + headers: { + accept: 'application/json', + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}` + } + } + ); + + respData.push(...resp.data.data); + initialOffset = initialOffset + resp.data.data.length; + + if (resp.data.data.length < 500) { + break; + } + } + } + catch (error) { + this.logger.log(`Error in incremental sync ${error.message} and last offset is ${initialOffset}`); + } + } + + // create or update records + if (paginationTrackInfo) { + await this.prisma.vertical_objects_sync_track_data.update({ + where: { + id_vertical_objects_sync_track_data: paginationTrackInfo.id_vertical_objects_sync_track_data, + }, + data: { + data: { + offset: initialOffset, + }, + }, + }); + } + else { + await this.prisma.vertical_objects_sync_track_data.create({ + data: { + id_vertical_objects_sync_track_data: uuidv4(), + vertical: 'crm', + provider_slug: 'attio', + object: 'deal', + pagination_type: 'offset', + id_connection: connection.id_connection, + data: { + offset: initialOffset, + }, + }, + }); + } + return { - data: resp.data.data, + data: respData, message: 'Attio deals retrieved', statusCode: 200, }; diff --git a/packages/api/src/crm/deal/services/attio/types.ts b/packages/api/src/crm/deal/services/attio/types.ts index 398a637db..97a55173d 100644 --- a/packages/api/src/crm/deal/services/attio/types.ts +++ b/packages/api/src/crm/deal/services/attio/types.ts @@ -102,3 +102,8 @@ export type AttioDealInput = { }[]; }>; }; + + +export type paginationType = { + offset: number; +}; \ No newline at end of file diff --git a/packages/api/src/crm/task/services/attio/index.ts b/packages/api/src/crm/task/services/attio/index.ts index c55541a40..943b1208f 100644 --- a/packages/api/src/crm/task/services/attio/index.ts +++ b/packages/api/src/crm/task/services/attio/index.ts @@ -8,7 +8,8 @@ import { ITaskService } from '@crm/task/types'; import { Injectable } from '@nestjs/common'; import axios from 'axios'; import { ServiceRegistry } from '../registry.service'; -import { AttioTaskInput, AttioTaskOutput } from './types'; +import { AttioTaskInput, AttioTaskOutput, paginationType } from './types'; +import { v4 as uuidv4 } from 'uuid'; @Injectable() export class AttioService implements ITaskService { @@ -91,19 +92,109 @@ export class AttioService implements ITaskService { }, }); - const baseURL = `${connection.account_url}/v2/tasks`; - - const resp = await axios.get(baseURL, { - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.cryptoService.decrypt( - connection.access_token, - )}`, + const paginationTrackInfo = await this.prisma.vertical_objects_sync_track_data.findFirst({ + where: { + id_connection: connection.id_connection, + vertical: 'crm', + provider_slug: 'attio', + object: 'task', }, }); - this.logger.log(`Synced attio tasks !`); + + let respData: AttioTaskOutput[] = []; + let initialOffset: number = 0; + + if (!paginationTrackInfo) { + // Intial sync + try { + while (true) { + const resp = await axios.get( + `${connection.account_url}/v2/tasks?limit=500&offset=${initialOffset}`, + { + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }); + + + respData.push(...resp.data.data); + initialOffset = initialOffset + resp.data.data.length; + + if (resp.data.data.length < 500) { + break; + } + } + } + catch (error) { + this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`); + } + + } + else { + // Incremental sync + const currentPaginationData = paginationTrackInfo.data as paginationType; + initialOffset = currentPaginationData.offset; + + try { + while (true) { + const resp = await axios.get( + `${connection.account_url}/v2/tasks?limit=500&offset=${initialOffset}`, + { + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }); + + respData.push(...resp.data.data); + initialOffset = initialOffset + resp.data.data.length; + + if (resp.data.data.length < 500) { + break; + } + } + } + catch (error) { + this.logger.log(`Error in incremental sync ${error.message} and last offset is ${initialOffset}`); + } + } + + // create or update records + if (paginationTrackInfo) { + await this.prisma.vertical_objects_sync_track_data.update({ + where: { + id_vertical_objects_sync_track_data: paginationTrackInfo.id_vertical_objects_sync_track_data, + }, + data: { + data: { + offset: initialOffset, + }, + }, + }); + } + else { + await this.prisma.vertical_objects_sync_track_data.create({ + data: { + id_vertical_objects_sync_track_data: uuidv4(), + vertical: 'crm', + provider_slug: 'attio', + object: 'task', + pagination_type: 'offset', + id_connection: connection.id_connection, + data: { + offset: initialOffset, + }, + }, + }); + } + return { - data: resp?.data?.data, + data: respData, message: 'Attio tasks retrieved', statusCode: 200, }; diff --git a/packages/api/src/crm/task/services/attio/types.ts b/packages/api/src/crm/task/services/attio/types.ts index 6ac7b4c67..13733f0cb 100644 --- a/packages/api/src/crm/task/services/attio/types.ts +++ b/packages/api/src/crm/task/services/attio/types.ts @@ -45,3 +45,7 @@ export type AttioTaskInput = { }[]; }; }; + +export type paginationType = { + offset: number; +};