Skip to content

Commit

Permalink
Fix async execution after transaction commit
Browse files Browse the repository at this point in the history
  • Loading branch information
loicsaintroch committed Jan 5, 2024
1 parent 3788897 commit b4627b1
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 42 deletions.
42 changes: 0 additions & 42 deletions backend/src/database/repositories/memberRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
SegmentData,
SegmentProjectGroupNestedData,
SegmentProjectNestedData,
TemporalWorkflowId,
} from '@crowd/types'
import lodash, { chunk } from 'lodash'
import moment from 'moment'
Expand All @@ -20,7 +19,6 @@ import Sequelize, { QueryTypes } from 'sequelize'
import { Error400, Error404 } from '@crowd/common'
import { FieldTranslatorFactory, OpensearchQueryParser } from '@crowd/opensearch'
import { ActivityDisplayService } from '@crowd/integrations'
import { WorkflowIdReusePolicy } from '@crowd/temporal'
import { KUBE_MODE, SERVICE } from '@/conf'
import { ServiceType } from '../../conf/configTypes'
import isFeatureEnabled from '../../feature-flags/isFeatureEnabled'
Expand Down Expand Up @@ -946,24 +944,6 @@ class MemberRepository {
): Promise<void> {
const affiliationRepository = new MemberSegmentAffiliationRepository(options)
await affiliationRepository.setForMember(memberId, data)
await options.temporal.workflow.start('memberUpdate', {
taskQueue: 'profiles',
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${options.currentTenant.id}/${memberId}`,
workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
retry: {
maximumAttempts: 10,
},
args: [
{
member: {
id: memberId,
},
},
],
searchAttributes: {
TenantId: [options.currentTenant.id],
},
})
}

static async getAffiliations(
Expand Down Expand Up @@ -3474,7 +3454,6 @@ class MemberRepository {
title = null,
dateStart = null,
dateEnd = null,
updateAffiliation = true,
},
options: IRepositoryOptions,
) {
Expand Down Expand Up @@ -3558,27 +3537,6 @@ class MemberRepository {
transaction,
},
)

if (updateAffiliation) {
await options.temporal.workflow.start('memberUpdate', {
taskQueue: 'profiles',
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${options.currentTenant.id}/${memberId}`,
workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
retry: {
maximumAttempts: 10,
},
args: [
{
member: {
id: memberId,
},
},
],
searchAttributes: {
TenantId: [options.currentTenant.id],
},
})
}
}

static async deleteWorkExperience(id, options: IRepositoryOptions) {
Expand Down
18 changes: 18 additions & 0 deletions backend/src/services/memberService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,24 @@ export default class MemberService extends LoggerBase {
const record = await MemberRepository.update(id, data, repoOptions)

await SequelizeRepository.commitTransaction(transaction)
await this.options.temporal.workflow.start('memberUpdate', {
taskQueue: 'profiles',
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${this.options.currentTenant.id}/${id}`,
workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
retry: {
maximumAttempts: 10,
},
args: [
{
member: {
id,
},
},
],
searchAttributes: {
TenantId: [this.options.currentTenant.id],
},
})

if (syncToOpensearch) {
try {
Expand Down
23 changes: 23 additions & 0 deletions backend/src/services/premium/enrichment/memberEnrichmentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
OrganizationSource,
SyncMode,
IOrganizationIdentity,
TemporalWorkflowId,
} from '@crowd/types'
import {
EnrichmentAPICertification,
Expand All @@ -24,6 +25,7 @@ import {
EnrichmentAPISkills,
EnrichmentAPIWorkExperience,
} from '@crowd/types/premium'
import { WorkflowIdReusePolicy } from '@crowd/temporal'
import { ENRICHMENT_CONFIG, REDIS_CONFIG } from '../../../conf'
import { AttributeData } from '../../../database/attributes/attribute'
import MemberEnrichmentCacheRepository from '../../../database/repositories/memberEnrichmentCacheRepository'
Expand Down Expand Up @@ -389,6 +391,27 @@ export default class MemberEnrichmentService extends LoggerBase {
}

await SequelizeRepository.commitTransaction(transaction)
if (enrichmentData.work_experiences) {
await this.options.temporal.workflow.start('memberUpdate', {
taskQueue: 'profiles',
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${this.options.currentTenant.id}/${result.id}`,
workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
retry: {
maximumAttempts: 10,
},
args: [
{
member: {
id: result.id,
},
},
],
searchAttributes: {
TenantId: [this.options.currentTenant.id],
},
})
}

await searchSyncService.triggerMemberSync(this.options.currentTenant.id, result.id)

result = await MemberRepository.findByIdOpensearch(result.id, this.options)
Expand Down

0 comments on commit b4627b1

Please sign in to comment.