Skip to content

Commit

Permalink
Merge branch 'main' into crowd-linux
Browse files Browse the repository at this point in the history
  • Loading branch information
Uroš Marolt committed Aug 25, 2023
2 parents 988833b + 6c35017 commit 3dff772
Show file tree
Hide file tree
Showing 41 changed files with 571 additions and 133 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX CONCURRENTLY IF EXISTS activities_tenant_segment_source_id_idx;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX IF EXISTS segments_tenant_subprojects;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX CONCURRENTLY IF EXISTS conversations_tenant_segment_slug;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS activities_tenant_segment_source_id_idx ON activities ("tenantId", "segmentId", "sourceId") WHERE "deletedAt" IS NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS segments_tenant_subprojects ON segments ("tenantId") WHERE "grandparentSlug" IS NOT NULL AND "parentSlug" IS NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS conversations_tenant_segment_slug ON conversations ("tenantId", "segmentId", MD5(slug));
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ export default class IncomingWebhookRepository extends RepositoryBase<
from "incomingWebhooks"
where state = :pending
and "createdAt" < now() - interval '1 hour'
and type not in ('GITHUB', 'DISCORD')
limit ${perPage} offset ${(page - 1) * perPage};
`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class WebhookProcessor extends LoggerBase {
integrationId: webhook.integrationId,
})

logger.info('Webhook found!')
logger.debug('Webhook found!')

if (!(force === true) && webhook.state !== WebhookState.PENDING) {
logger.error({ state: webhook.state }, 'Webhook is not in pending state!')
Expand All @@ -55,6 +55,9 @@ export class WebhookProcessor extends LoggerBase {
userContext.log = logger

const integration = await IntegrationRepository.findById(webhook.integrationId, userContext)
if (integration.platform === 'github' || integration.platform === 'discord') {
return
}
const segment = await new SegmentRepository(userContext).findById(integration.segmentId)
userContext.currentSegments = [segment]

Expand Down
2 changes: 1 addition & 1 deletion backend/src/serverless/utils/nodeWorkerSQS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export const sendNodeWorkerMessage = async (
DelaySeconds: delay,
}

log.info(
log.debug(
{
messageType: body.type,
body,
Expand Down
2 changes: 1 addition & 1 deletion services/apps/data_sink_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
parentLog: Logger,
maxConcurrentProcessing: number,
) {
super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog, true)
super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog)
}

override async processMessage(message: IQueueMessage): Promise<void> {
Expand Down
8 changes: 6 additions & 2 deletions services/apps/data_sink_worker/src/repo/activity.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ export default class ActivityRepository extends RepositoryBase<ActivityRepositor
channel,
url,
sentiment
from activities where "tenantId" = $(tenantId) and "segmentId" = $(segmentId) and "sourceId" = $(sourceId)
from activities
where "tenantId" = $(tenantId)
and "segmentId" = $(segmentId)
and "sourceId" = $(sourceId)
and "deletedAt" is null
`
public async findExisting(
tenantId: string,
Expand Down Expand Up @@ -83,7 +87,7 @@ export default class ActivityRepository extends RepositoryBase<ActivityRepositor
promises.push(
this.db().none(
`
update activities set "parentId" = (select id from activities where "tenantId" = $(tenantId) and "sourceId" = $(sourceParentId) limit 1)
update activities set "parentId" = (select id from activities where "tenantId" = $(tenantId) and "sourceId" = $(sourceParentId) and "deletedAt" IS NULL limit 1)
where "id" = $(id) and "tenantId" = $(tenantId)
`,
{
Expand Down
14 changes: 11 additions & 3 deletions services/apps/data_sink_worker/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,23 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
}

public async update(id: string, tenantId: string, data: IDbMemberUpdateData): Promise<void> {
const keys = Object.keys(data)
keys.push('updatedAt')
// construct custom column set
const dynamicColumnSet = new this.dbInstance.helpers.ColumnSet(keys, {
table: {
table: 'members',
},
})

const prepared = RepositoryBase.prepare(
{
...data,
weakIdentities: JSON.stringify(data.weakIdentities || []),
updatedAt: new Date(),
},
this.updateMemberColumnSet,
dynamicColumnSet,
)
const query = this.dbInstance.helpers.update(prepared, this.updateMemberColumnSet)
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)

const condition = this.format('where id = $(id) and "tenantId" = $(tenantId)', {
id,
Expand Down
11 changes: 2 additions & 9 deletions services/apps/data_sink_worker/src/service/dataSink.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,8 @@ export default class DataSinkService extends LoggerBase {
return
}

this.log.debug('Marking result as in progress.')
await this.repo.markResultInProgress(resultId)
if (resultInfo.runId) {
await this.repo.touchRun(resultInfo.runId)
}
// this.log.debug('Marking result as in progress.')
// await this.repo.markResultInProgress(resultId)

try {
const data = resultInfo.data
Expand Down Expand Up @@ -170,10 +167,6 @@ export default class DataSinkService extends LoggerBase {
log: this.log,
frameworkVersion: 'new',
})
} finally {
if (resultInfo.runId) {
await this.repo.touchRun(resultInfo.runId)
}
}
}
}
39 changes: 26 additions & 13 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,19 @@ export default class MemberService extends LoggerBase {

if (!isObjectEmpty(toUpdate)) {
this.log.debug({ memberId: id }, 'Updating a member!')
await txRepo.update(id, tenantId, {
emails: toUpdate.emails || original.emails,
joinedAt: toUpdate.joinedAt || original.joinedAt,
attributes: toUpdate.attributes || original.attributes,
weakIdentities: toUpdate.weakIdentities || original.weakIdentities,
// leave this one empty if nothing changed - we are only adding up new identities not removing them
identities: toUpdate.identities,
displayName: toUpdate.displayName || original.displayName,
})

const dateToUpdate = Object.entries(toUpdate).reduce((acc, [key, value]) => {
if (key === 'identities') {
return acc
}

if (value) {
acc[key] = value
}
return acc
}, {} as IDbMemberUpdateData)

await txRepo.update(id, tenantId, dateToUpdate)
await txRepo.addToSegment(id, tenantId, segmentId)

updated = true
Expand Down Expand Up @@ -272,6 +276,12 @@ export default class MemberService extends LoggerBase {
await this.store.transactionally(async (txStore) => {
const txRepo = new MemberRepository(txStore, this.log)
const txIntegrationRepo = new IntegrationRepository(txStore, this.log)
const txService = new MemberService(
txStore,
this.nodejsWorkerEmitter,
this.searchSyncWorkerEmitter,
this.log,
)

const dbIntegration = await txIntegrationRepo.findById(integrationId)
const segmentId = dbIntegration.segmentId
Expand Down Expand Up @@ -300,7 +310,7 @@ export default class MemberService extends LoggerBase {
)
}

await this.update(
await txService.update(
dbMember.id,
tenantId,
segmentId,
Expand Down Expand Up @@ -374,9 +384,12 @@ export default class MemberService extends LoggerBase {
const newDate = member.joinedAt
const oldDate = new Date(dbMember.joinedAt)

if (newDate.getTime() !== oldDate.getTime()) {
// pick the oldest
joinedAt = newDate < oldDate ? newDate.toISOString() : oldDate.toISOString()
if (oldDate <= newDate) {
// we already have the oldest date in the db, so we don't need to update it
joinedAt = undefined
} else {
// we have a new date and it's older, so we need to update it
joinedAt = newDate.toISOString()
}
}

Expand Down
81 changes: 53 additions & 28 deletions services/apps/data_sink_worker/src/service/settings.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,39 +62,64 @@ export default class SettingsRepository extends RepositoryBase<SettingsRepositor
platform: string,
channel: string,
): Promise<void> {
const result = await this.db().result(
`
update segments
set "activityChannels" =
case
-- If platform exists, and channel does not exist, add it
when "activityChannels" ? $(platform)
and not ($(channel) = any (select jsonb_array_elements_text("activityChannels" -> $(platform)))) then
jsonb_set(
"activityChannels",
array [$(platform)::text],
"activityChannels" -> $(platform) || jsonb_build_array($(channel))
)
-- If platform does not exist, create it
when not ("activityChannels" ? $(platform)) or "activityChannels" is null then
coalesce("activityChannels", '{}'::jsonb) ||
jsonb_build_object($(platform), jsonb_build_array($(channel)))
-- Else, do nothing
else
"activityChannels"
end
where "tenantId" = $(tenantId)
and id = $(segmentId)
`,
const existingData = await this.db().oneOrNone(
`select "activityChannels" from "segments" where "tenantId" = $(tenantId) and id = $(segmentId)`,
{
tenantId,
segmentId,
platform,
channel,
},
)

this.checkUpdateRowCount(result.rowCount, 1)
if (existingData) {
const channels = existingData.activityChannels

if (channels && channels[platform] && channels[platform].includes(channel)) {
return
} else {
await this.db().result(
`
update segments
set "activityChannels" =
case
-- If platform exists, and channel does not exist, add it
when "activityChannels" ? $(platform)
and not ($(channel) = any (select jsonb_array_elements_text("activityChannels" -> $(platform)))) then
jsonb_set(
"activityChannels",
array [$(platform)::text],
"activityChannels" -> $(platform) || jsonb_build_array($(channel))
)
-- If platform does not exist, create it
when not ("activityChannels" ? $(platform)) or "activityChannels" is null then
coalesce("activityChannels", '{}'::jsonb) ||
jsonb_build_object($(platform), jsonb_build_array($(channel)))
-- Else, do nothing
else
"activityChannels"
end
where "tenantId" = $(tenantId)
and id = $(segmentId)
and case
-- If platform exists, and channel does not exist, add it
when "activityChannels" ? $(platform)
and not ($(channel) = any (select jsonb_array_elements_text("activityChannels" -> $(platform)))) then
1
-- If platform does not exist, create it
when not ("activityChannels" ? $(platform)) or "activityChannels" is null then
1
-- Else, do nothing
else
0
end = 1
`,
{
tenantId,
segmentId,
platform,
channel,
},
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,11 @@ export default class IntegrationDataRepository extends RepositoryBase<Integratio
this.checkUpdateRowCount(result.rowCount, 1)
}

public async markDataProcessed(dataId: string): Promise<void> {
public async deleteData(dataId: string): Promise<void> {
const result = await this.db().result(
`update integration."apiData"
set state = $(state),
"processedAt" = now(),
"updatedAt" = now()
where id = $(dataId)`,
`delete from integration."apiData" where id = $(dataId)`,
{
dataId,
state: IntegrationStreamDataState.PROCESSED,
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,19 @@ export default class IntegrationDataService extends LoggerBase {
: undefined,
}

this.log.debug('Marking data as in progress!')
await this.repo.markDataInProgress(dataId)
if (dataInfo.runId) {
await this.repo.touchRun(dataInfo.runId)
}
// this.log.debug('Marking data as in progress!')
// await this.repo.markDataInProgress(dataId)

// TODO we might need that later to check for stuck runs
// if (dataInfo.runId) {
// await this.repo.touchRun(dataInfo.runId)
// }

this.log.debug('Processing data!')
try {
await integrationService.processData(context)
this.log.debug('Finished processing data!')
await this.repo.markDataProcessed(dataId)
await this.repo.deleteData(dataId)
} catch (err) {
this.log.error(err, 'Error while processing stream!')
await this.triggerDataError(
Expand Down Expand Up @@ -232,11 +234,14 @@ export default class IntegrationDataService extends LoggerBase {
frameworkVersion: 'new',
})
}
} finally {
if (dataInfo.runId) {
await this.repo.touchRun(dataInfo.runId)
}
}

// TODO we might need that later to check for stuck runs
// finally {
// if (dataInfo.runId) {
// await this.repo.touchRun(dataInfo.runId)
// }
// }
}

private async publishCustom(
Expand All @@ -253,7 +258,12 @@ export default class IntegrationDataService extends LoggerBase {
type,
data: entity,
})
await this.dataSinkWorkerEmitter.triggerResultProcessing(tenantId, platform, resultId)
await this.dataSinkWorkerEmitter.triggerResultProcessing(
tenantId,
platform,
resultId,
resultId,
)
} catch (err) {
await this.triggerDataError(
dataId,
Expand All @@ -278,7 +288,12 @@ export default class IntegrationDataService extends LoggerBase {
type: IntegrationResultType.ACTIVITY,
data: activity,
})
await this.dataSinkWorkerEmitter.triggerResultProcessing(tenantId, platform, resultId)
await this.dataSinkWorkerEmitter.triggerResultProcessing(
tenantId,
platform,
resultId,
activity.sourceId,
)
} catch (err) {
await this.triggerDataError(
dataId,
Expand Down
3 changes: 2 additions & 1 deletion services/apps/integration_run_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"format-check": "./node_modules/.bin/prettier --check .",
"tsc-check": "./node_modules/.bin/tsc --noEmit",
"script:onboard-integration": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/onboard-integration.ts",
"script:process-repo": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-repo.ts"
"script:process-repo": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-repo.ts",
"script:trigger-stream-processed": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/trigger-stream-processed.ts"
},
"dependencies": {
"@crowd/common": "file:../../libs/common",
Expand Down
Loading

0 comments on commit 3dff772

Please sign in to comment.