Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix groups io errors #2616

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 55 additions & 21 deletions backend/src/database/repositories/integrationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -521,32 +521,66 @@ class IntegrationRepository {
const seq = SequelizeRepository.getSequelize(options)

const integrationIds = rows.map((row) => row.id)
let results = []

if (integrationIds.length > 0) {
const query = `select "integrationId", max("processedAt") as "processedAt" from "incomingWebhooks"
where "integrationId" in (:integrationIds) and state = 'PROCESSED'
group by "integrationId"`

results = await seq.query(query, {
replacements: {
integrationIds,
},
type: QueryTypes.SELECT,
transaction: SequelizeRepository.getTransaction(options),
const webhookQuery = `
SELECT "integrationId", MAX("processedAt") AS "webhookProcessedAt"
FROM "incomingWebhooks"
WHERE "integrationId" IN (:integrationIds) AND state = 'PROCESSED'
GROUP BY "integrationId"
`

const runQuery = `
SELECT "integrationId", MAX("processedAt") AS "runProcessedAt"
FROM integration.runs
WHERE "integrationId" IN (:integrationIds)
GROUP BY "integrationId"
`

const [webhookResults, runResults] = await Promise.all([
seq.query(webhookQuery, {
replacements: { integrationIds },
type: QueryTypes.SELECT,
transaction: SequelizeRepository.getTransaction(options),
}),
seq.query(runQuery, {
replacements: { integrationIds },
type: QueryTypes.SELECT,
transaction: SequelizeRepository.getTransaction(options),
}),
])

const processedAtMap = integrationIds.reduce((map, id) => {
const webhookResult: any = webhookResults.find(
(r: { integrationId: string }) => r.integrationId === id,
)
const runResult: any = runResults.find(
(r: { integrationId: string }) => r.integrationId === id,
)
map[id] = {
webhookProcessedAt: webhookResult ? webhookResult.webhookProcessedAt : null,
runProcessedAt: runResult ? runResult.runProcessedAt : null,
}
return map
}, {})

rows.forEach((row) => {
const processedAt = processedAtMap[row.id]
// Use the latest processedAt from either webhook or run, or fall back to updatedAt
row.lastProcessedAt = processedAt
? new Date(
Math.max(
processedAt.webhookProcessedAt
? new Date(processedAt.webhookProcessedAt).getTime()
: 0,
processedAt.runProcessedAt ? new Date(processedAt.runProcessedAt).getTime() : 0,
new Date(row.updatedAt).getTime(),
),
)
: row.updatedAt
})
}

const processedAtMap = results.reduce((map, item) => {
map[item.integrationId] = item.processedAt
return map
}, {})

rows.forEach((row) => {
// Either use the last processedAt, or fall back updatedAt
row.lastProcessedAt = processedAtMap[row.id] || row.updatedAt
})

return { rows, count, limit: parsed.limit, offset: parsed.offset }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getActivityLogs = async (
groupName: GroupName,
Expand All @@ -19,11 +20,21 @@ export const getActivityLogs = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getActivityLogs',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { groupName }, 'Error fetching activity logs from group!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getGroupMember = async (
memberInfoId: string,
Expand All @@ -18,11 +19,21 @@ export const getGroupMember = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getGroupMember',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { memberInfoId }, 'Error fetching member by memberInfoId!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getGroupMembers = async (
groupName: GroupName,
Expand All @@ -18,11 +19,21 @@ export const getGroupMembers = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getGroupMembers',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { groupName }, 'Error fetching members from group!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { RedisSemaphore } from '../utils/lock'

export const getMessagesFromTopic = async (
topicId: string,
Expand All @@ -16,11 +17,21 @@ export const getMessagesFromTopic = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getMessagesFromTopic',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { topic_id: topicId }, 'Error fetching messags from topic_id!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getPastGroupMembers = async (
groupName: GroupName,
Expand All @@ -18,11 +19,21 @@ export const getPastGroupMembers = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getPastGroupMembers',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { groupName }, 'Error fetching past members from group!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig } from 'axios'
import { IProcessStreamContext } from '../../../types'
import { GroupName } from '../types'
import { RedisSemaphore } from '../utils/lock'

export const getTopicsFromGroup = async (
groupName: GroupName,
Expand All @@ -19,11 +20,21 @@ export const getTopicsFromGroup = async (
},
}

const semaphore = new RedisSemaphore({
integrationId: ctx.integration.id,
apiCallType: 'getTopicsFromGroup',
maxConcurrent: 1,
cache: ctx.cache,
})

try {
await semaphore.acquire()
const response = await axios(config)
return response.data
} catch (err) {
ctx.log.error(err, { groupName }, 'Error fetching topics from group!')
throw err
} finally {
await semaphore.release()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,21 @@ const processPastGroupMembersStream: ProcessStreamHandler = async (ctx) => {
const data = ctx.stream.data as GroupsioPastGroupMembersStreamMetadata
const settings = ctx.integration.settings as GroupsioIntegrationSettings

const response = (await getPastGroupMembers(
data.group,
settings.token,
ctx,
data.page,
)) as ListPastMembers
let response: ListPastMembers
try {
response = await getPastGroupMembers(data.group, settings.token, ctx, data.page)
} catch (error) {
if (error.response.status === 400) {
// no access for this endpoint for this group, just ignoring
// we will go to activity logs
response = null
} else {
throw error
}
}

// publish members
if (response.data !== null) {
if (response?.data !== null && response?.data !== undefined) {
for (const pastMember of response.data) {
// we don't process expired member events because these are
// approvals that were not approved on time
Expand Down Expand Up @@ -319,7 +325,7 @@ const processPastGroupMembersStream: ProcessStreamHandler = async (ctx) => {
// publish activity logs streams to get member_join for historic members
// i.e past members
await ctx.publishStream<GroupsioActivityLogsStreamMetadata>(
`${GroupsioStreamType.ACTIVITY_LOGS}:${data.group}-${response.next_page_token}`,
`${GroupsioStreamType.ACTIVITY_LOGS}:${data.group}`,
{
group: data.group,
page: null,
Expand All @@ -340,7 +346,7 @@ const processGroupMembersStream: ProcessStreamHandler = async (ctx) => {
)) as ListMembers

// publish members
if (response.data !== null) {
if (response?.data !== null) {
for (const member of response.data) {
// caching member
await cacheMember(ctx, member)
Expand Down
49 changes: 49 additions & 0 deletions services/libs/integrations/src/integrations/groupsio/utils/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { ICache } from '@crowd/types'

export class RedisSemaphore {
private readonly key: string
private readonly maxConcurrent: number
private readonly cache: ICache
private readonly timeout: number

constructor({
integrationId,
apiCallType,
maxConcurrent,
cache,
timeout = 60000,
}: {
integrationId: string
apiCallType: string
maxConcurrent: number
cache: ICache
timeout?: number
}) {
this.key = `groupsio-semaphore:${integrationId}:${apiCallType}`
this.maxConcurrent = maxConcurrent
this.cache = cache
this.timeout = timeout
}

async acquire(): Promise<boolean> {
const startTime = Date.now()
while (Date.now() - startTime < this.timeout) {
const current = await this.cache.get(this.key)
const currentValue = current ? parseInt(current, 10) : 0

if (currentValue < this.maxConcurrent) {
await this.cache.increment(this.key)
return true
}
await new Promise((resolve) => setTimeout(resolve, 100))
}
throw new Error(`Failed to acquire lock within timeout period for ${this.key}`)
}

async release(): Promise<void> {
const current = await this.cache.get(this.key)
if (current && parseInt(current, 10) > 0) {
await this.cache.decrement(this.key)
}
}
}
Loading