Skip to content

Commit

Permalink
Fix groups io errors (#2616)
Browse files Browse the repository at this point in the history
  • Loading branch information
garrrikkotua authored Sep 19, 2024
1 parent d33b5ed commit 55c7e89
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 30 deletions.
76 changes: 55 additions & 21 deletions backend/src/database/repositories/integrationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -521,32 +521,66 @@ class IntegrationRepository {
const seq = SequelizeRepository.getSequelize(options)

const integrationIds = rows.map((row) => row.id)
let results = []

if (integrationIds.length > 0) {
const query = `select "integrationId", max("processedAt") as "processedAt" from "incomingWebhooks"
where "integrationId" in (:integrationIds) and state = 'PROCESSED'
group by "integrationId"`

results = await seq.query(query, {
replacements: {
integrationIds,
},
type: QueryTypes.SELECT,
transaction: SequelizeRepository.getTransaction(options),
const webhookQuery = `
SELECT "integrationId", MAX("processedAt") AS "webhookProcessedAt"
FROM "incomingWebhooks"
WHERE "integrationId" IN (:integrationIds) AND state = 'PROCESSED'
GROUP BY "integrationId"
`

const runQuery = `
SELECT "integrationId", MAX("processedAt") AS "runProcessedAt"
FROM integration.runs
WHERE "integrationId" IN (:integrationIds)
GROUP BY "integrationId"
`

const [webhookResults, runResults] = await Promise.all([
seq.query(webhookQuery, {
replacements: { integrationIds },
type: QueryTypes.SELECT,
transaction: SequelizeRepository.getTransaction(options),
}),
seq.query(runQuery, {
replacements: { integrationIds },
type: QueryTypes.SELECT,
transaction: SequelizeRepository.getTransaction(options),
}),
])

const processedAtMap = integrationIds.reduce((map, id) => {
const webhookResult: any = webhookResults.find(
(r: { integrationId: string }) => r.integrationId === id,
)
const runResult: any = runResults.find(
(r: { integrationId: string }) => r.integrationId === id,
)
map[id] = {
webhookProcessedAt: webhookResult ? webhookResult.webhookProcessedAt : null,
runProcessedAt: runResult ? runResult.runProcessedAt : null,
}
return map
}, {})

rows.forEach((row) => {
const processedAt = processedAtMap[row.id]
// Use the latest processedAt from either webhook or run, or fall back to updatedAt
row.lastProcessedAt = processedAt
? new Date(
Math.max(
processedAt.webhookProcessedAt
? new Date(processedAt.webhookProcessedAt).getTime()
: 0,
processedAt.runProcessedAt ? new Date(processedAt.runProcessedAt).getTime() : 0,
new Date(row.updatedAt).getTime(),
),
)
: row.updatedAt
})
}

const processedAtMap = results.reduce((map, item) => {
map[item.integrationId] = item.processedAt
return map
}, {})

rows.forEach((row) => {
// Either use the last processedAt, or fall back updatedAt
row.lastProcessedAt = processedAtMap[row.id] || row.updatedAt
})

return { rows, count, limit: parsed.limit, offset: parsed.offset }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getActivityLogs = async (
groupName: GroupName,
Expand All @@ -19,11 +20,21 @@ export const getActivityLogs = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getActivityLogs',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { groupName }, 'Error fetching activity logs from group!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getGroupMember = async (
memberInfoId: string,
Expand All @@ -18,11 +19,21 @@ export const getGroupMember = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getGroupMember',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { memberInfoId }, 'Error fetching member by memberInfoId!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getGroupMembers = async (
groupName: GroupName,
Expand All @@ -18,11 +19,21 @@ export const getGroupMembers = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getGroupMembers',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { groupName }, 'Error fetching members from group!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { RedisSemaphore } from '../utils/lock'

export const getMessagesFromTopic = async (
topicId: string,
Expand All @@ -16,11 +17,21 @@ export const getMessagesFromTopic = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getMessagesFromTopic',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { topic_id: topicId }, 'Error fetching messags from topic_id!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getPastGroupMembers = async (
groupName: GroupName,
Expand All @@ -18,11 +19,21 @@ export const getPastGroupMembers = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getPastGroupMembers',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { groupName }, 'Error fetching past members from group!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getTopicsFromGroup = async (
groupName: GroupName,
Expand All @@ -19,11 +20,21 @@ export const getTopicsFromGroup = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getTopicsFromGroup',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { groupName }, 'Error fetching topics from group!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,21 @@ const processPastGroupMembersStream: ProcessStreamHandler = async (ctx) => {
const data = ctx.stream.data as GroupsioPastGroupMembersStreamMetadata
const settings = ctx.integration.settings as GroupsioIntegrationSettings

const response = (await getPastGroupMembers(
data.group,
settings.token,
ctx,
data.page,
)) as ListPastMembers
let response: ListPastMembers
try {
response = await getPastGroupMembers(data.group, settings.token, ctx, data.page)
} catch (error) {
if (error.response.status === 400) {
// no access for this endpoint for this group, just ignoring
// we will go to activity logs
response = null
} else {
throw error
}
}

// publish members
if (response.data !== null) {
if (response?.data !== null && response?.data !== undefined) {
for (const pastMember of response.data) {
// we don't process expired member events because these are
// approvals that were not approved on time
Expand Down Expand Up @@ -319,7 +325,7 @@ const processPastGroupMembersStream: ProcessStreamHandler = async (ctx) => {
// publish activity logs streams to get member_join for historic members
// i.e past members
await ctx.publishStream<GroupsioActivityLogsStreamMetadata>(
`${GroupsioStreamType.ACTIVITY_LOGS}:${data.group}-${response.next_page_token}`,
`${GroupsioStreamType.ACTIVITY_LOGS}:${data.group}`,
{
group: data.group,
page: null,
Expand All @@ -340,7 +346,7 @@ const processGroupMembersStream: ProcessStreamHandler = async (ctx) => {
)) as ListMembers

// publish members
if (response.data !== null) {
if (response?.data !== null) {
for (const member of response.data) {
// caching member
await cacheMember(ctx, member)
Expand Down
49 changes: 49 additions & 0 deletions services/libs/integrations/src/integrations/groupsio/utils/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { ICache } from '@crowd/types'

export class RedisSemaphore {
private readonly key: string
private readonly maxConcurrent: number
private readonly cache: ICache
private readonly timeout: number

constructor({
integrationId,
apiCallType,
maxConcurrent,
cache,
timeout = 60000,
}: {
integrationId: string
apiCallType: string
maxConcurrent: number
cache: ICache
timeout?: number
}) {
this.key = `groupsio-semaphore:${integrationId}:${apiCallType}`
this.maxConcurrent = maxConcurrent
this.cache = cache
this.timeout = timeout
}

async acquire(): Promise<boolean> {
const startTime = Date.now()
while (Date.now() - startTime < this.timeout) {
const current = await this.cache.get(this.key)
const currentValue = current ? parseInt(current, 10) : 0

if (currentValue < this.maxConcurrent) {
await this.cache.increment(this.key)
return true
}
await new Promise((resolve) => setTimeout(resolve, 100))
}
throw new Error(`Failed to acquire lock within timeout period for ${this.key}`)
}

async release(): Promise<void> {
const current = await this.cache.get(this.key)
if (current && parseInt(current, 10) > 0) {
await this.cache.decrement(this.key)
}
}
}

0 comments on commit 55c7e89

Please sign in to comment.