Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental sync for Attio provider #701

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions packages/api/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1300,21 +1300,22 @@ model connection_strategies {

/// This model or at least one of its fields has comments in the database, and requires an additional setup for migrations: Read more: https://pris.ly/d/database-comments
model connections {
id_connection String @id(map: "pk_connections") @db.Uuid
status String
provider_slug String
vertical String
account_url String?
token_type String
access_token String?
refresh_token String?
expiration_timestamp DateTime? @db.Timestamptz(6)
created_at DateTime @db.Timestamptz(6)
connection_token String?
id_project String @db.Uuid
id_linked_user String @db.Uuid
linked_users linked_users @relation(fields: [id_linked_user], references: [id_linked_user], onDelete: NoAction, onUpdate: NoAction, map: "fk_11")
projects projects @relation(fields: [id_project], references: [id_project], onDelete: NoAction, onUpdate: NoAction, map: "fk_9")
id_connection String @id(map: "pk_connections") @db.Uuid
status String
provider_slug String
vertical String
account_url String?
token_type String
access_token String?
refresh_token String?
expiration_timestamp DateTime? @db.Timestamptz(6)
created_at DateTime @db.Timestamptz(6)
connection_token String?
id_project String @db.Uuid
id_linked_user String @db.Uuid
linked_users linked_users @relation(fields: [id_linked_user], references: [id_linked_user], onDelete: NoAction, onUpdate: NoAction, map: "fk_11")
projects projects @relation(fields: [id_project], references: [id_project], onDelete: NoAction, onUpdate: NoAction, map: "fk_9")
vertical_objects_sync_track_data vertical_objects_sync_track_data[]

@@index([id_project], map: "fk_1")
@@index([id_linked_user], map: "fk_connections_to_linkedusersid")
Expand Down Expand Up @@ -1346,6 +1347,11 @@ model connector_sets {
ats_ashby Boolean?
ecom_webflow Boolean?
crm_microsoftdynamicssales Boolean?
fs_dropbox Boolean?
fs_googledrive Boolean?
fs_sharepoint Boolean?
fs_onedrive Boolean?
crm_salesforce Boolean?
projects projects[]
}

Expand Down Expand Up @@ -2144,3 +2150,14 @@ model projects_pull_frequency {
id_project String @unique(map: "uq_projects_pull_frequency_project") @db.Uuid
projects projects @relation(fields: [id_project], references: [id_project], onDelete: NoAction, onUpdate: NoAction, map: "fk_projects_pull_frequency_project")
}

model vertical_objects_sync_track_data {
id_vertical_objects_sync_track_data String @id(map: "pk_vertical_objects_sync_track_data") @db.Uuid
vertical String
provider_slug String
object String
pagination_type String
id_connection String @db.Uuid
data Json? @db.Json
connections connections @relation(fields: [id_connection], references: [id_connection], onDelete: NoAction, onUpdate: NoAction, map: "fk_connection")
}
14 changes: 14 additions & 0 deletions packages/api/scripts/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2900,6 +2900,20 @@ COMMENT ON COLUMN connections.token_type IS 'The type of the token, such as "Bea
COMMENT ON COLUMN connections.connection_token IS 'Connection token users will put in their header to identify which service / linked_User they make request for';


-- ************************************** vertical_objects_sync_track_data
CREATE TABLE vertical_objects_sync_track_data
(
id_vertical_objects_sync_track_data uuid NOT NULL,
vertical text NOT NULL,
provider_slug text NOT NULL,
object text NOT NULL,
pagination_type text NOT NULL,
id_connection uuid NOT NULL,
data json,
CONSTRAINT PK_vertical_objects_sync_track_data PRIMARY KEY ( id_vertical_objects_sync_track_data ),
CONSTRAINT FK_connection FOREIGN KEY ( id_connection ) REFERENCES connections ( id_connection )
);

-- ************************************** ats_scorecards
CREATE TABLE ats_scorecards
(
Expand Down
141 changes: 128 additions & 13 deletions packages/api/src/crm/company/services/attio/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import { EncryptionService } from '@@core/@core-services/encryption/encryption.s
import { ApiResponse } from '@@core/utils/types';
import { ICompanyService } from '@crm/company/types';
import { ServiceRegistry } from '../registry.service';
import { AttioCompanyInput, AttioCompanyOutput } from './types';
import { AttioCompanyInput, AttioCompanyOutput, paginationType } from './types';
import { SyncParam } from '@@core/utils/types/interface';
import { OriginalCompanyOutput } from '@@core/utils/types/original/original.crm';
import { v4 as uuidv4 } from 'uuid';

@Injectable()
export class AttioService implements ICompanyService {
Expand Down Expand Up @@ -74,20 +75,134 @@ export class AttioService implements ICompanyService {
vertical: 'crm',
},
});
const resp = await axios.post(
`${connection.account_url}/v2/objects/companies/records/query`,
{},
{
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},

const paginationTrackInfo = await this.prisma.vertical_objects_sync_track_data.findFirst({
where: {
id_connection: connection.id_connection,
vertical: 'crm',
provider_slug: 'attio',
object: 'company',
},
);
});

let respData: AttioCompanyOutput[] = [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use const instead of let.

The variable respData is only assigned once during its initialization and is not reassigned throughout the code. Using const instead of let would ensure that respData cannot be accidentally reassigned, improving code quality and readability.

Apply this diff to use const:

-let respData = [];
+const respData = [];
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let respData: AttioCompanyOutput[] = [];
const respData: AttioCompanyOutput[] = [];
Tools
Biome

[error] 88-88: This let declares a variable that is only assigned once.

'respData' is never reassigned.

Safe fix: Use const instead.

(lint/style/useConst)


Consider removing the type annotation.

The type annotation AttioCompanyOutput[] is not necessary as TypeScript can infer the type of respData based on its initialization to an empty array []. Removing the type annotation would not affect the functionality of the code and would improve readability.

Apply this diff to remove the type annotation:

-let respData: AttioCompanyOutput[] = [];
+let respData = [];
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let respData: AttioCompanyOutput[] = [];
let respData = [];
Tools
Biome

[error] 88-88: This let declares a variable that is only assigned once.

'respData' is never reassigned.

Safe fix: Use const instead.

(lint/style/useConst)

let initialOffset: number = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider removing the type annotation.

The type annotation number is not necessary as TypeScript can infer the type of initialOffset based on its initialization to 0. Removing the type annotation would not affect the functionality of the code and would improve readability.

Apply this diff to remove the type annotation:

-let initialOffset: number = 0;
+let initialOffset = 0;
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let initialOffset: number = 0;
let initialOffset = 0;
Tools
Biome

[error] 89-89: This type annotation is trivially inferred from its initialization.

Safe fix: Remove the type annotation.

(lint/style/noInferrableTypes)


if (!paginationTrackInfo) {
// Intial sync
try {
while (true) {
const resp = await axios.post(
`${connection.account_url}/v2/objects/companies/records/query`,
{
"sorts": [
{
"attribute": "created_at",
"direction": "asc"
}
],
"offset": initialOffset,
"limit": 500
},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},
},
);


respData.push(...resp.data.data);
initialOffset = initialOffset + resp.data.data.length;

if (resp.data.data.length < 500) {
break;
}
}
}
catch (error) {
this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`);
}

}
else {
// Incremental sync
const currentPaginationData = paginationTrackInfo.data as paginationType;
initialOffset = currentPaginationData.offset;

try {
while (true) {
const resp = await axios.post(
`${connection.account_url}/v2/objects/companies/records/query`,
{
"sorts": [
{
"attribute": "created_at",
"direction": "asc"
}
],
"offset": initialOffset,
"limit": 500
},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`
}
}
);

respData.push(...resp.data.data);
initialOffset = initialOffset + resp.data.data.length;

if (resp.data.data.length < 500) {
break;
}
}
}
catch (error) {
this.logger.log(`Error in incremental sync ${error.message} and last offset is ${initialOffset}`);
}
}

// create or update records
if (paginationTrackInfo) {
await this.prisma.vertical_objects_sync_track_data.update({
where: {
id_vertical_objects_sync_track_data: paginationTrackInfo.id_vertical_objects_sync_track_data,
},
data: {
data: {
offset: initialOffset,
},
},
});
}
else {
await this.prisma.vertical_objects_sync_track_data.create({
data: {
id_vertical_objects_sync_track_data: uuidv4(),
vertical: 'crm',
provider_slug: 'attio',
object: 'company',
pagination_type: 'offset',
id_connection: connection.id_connection,
data: {
offset: initialOffset,
},
},
});
}

return {
data: resp.data.data,
data: respData,
message: 'Attio companies retrieved',
statusCode: 200,
};
Expand Down
4 changes: 4 additions & 0 deletions packages/api/src/crm/company/services/attio/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ export interface AttioCompany {

export type AttioCompanyInput = Partial<AttioCompany>;
export type AttioCompanyOutput = AttioCompanyInput;

export type paginationType = {
offset: number;
};
Loading
Loading