Skip to content

Commit

Permalink
Exponential backoff in throttler and changing visibility timeout for …
Browse files Browse the repository at this point in the history
…long running streams (#1546)
  • Loading branch information
epipav authored Sep 26, 2023
1 parent 2d2aab4 commit f49be45
Show file tree
Hide file tree
Showing 19 changed files with 144 additions and 24 deletions.
7 changes: 5 additions & 2 deletions services/apps/integration_stream_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
super(client, INTEGRATION_STREAM_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog)
}

override async processMessage(message: IQueueMessage): Promise<void> {
override async processMessage(message: IQueueMessage, receiptHandle: string): Promise<void> {
try {
this.log.trace({ messageType: message.type }, 'Processing message!')

Expand All @@ -55,7 +55,10 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
)
break
case IntegrationStreamWorkerQueueMessageType.PROCESS_STREAM:
await service.processStream((message as ProcessStreamQueueMessage).streamId)
await service.processStream(
(message as ProcessStreamQueueMessage).streamId,
receiptHandle,
)
break
case IntegrationStreamWorkerQueueMessageType.PROCESS_WEBHOOK_STREAM:
await service.processWebhookStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ export default class IntegrationStreamService extends LoggerBase {
}
}

public async processStream(streamId: string): Promise<boolean> {
public async processStream(streamId: string, receiptHandle?: string): Promise<boolean> {
this.log.debug({ streamId }, 'Trying to process stream!')

const streamInfo = await this.repo.getStreamData(streamId)
Expand Down Expand Up @@ -508,6 +508,10 @@ export default class IntegrationStreamService extends LoggerBase {
undefined,
)
},
setMessageVisibilityTimeout: async (newTimeout: number) => {
this.log.trace(`Changing message visibility of ${receiptHandle} to ${newTimeout}!`)
await this.streamWorkerEmitter.setMessageVisibilityTimeout(receiptHandle, newTimeout)
},
updateIntegrationSettings: async (settings) => {
await this.updateIntegrationSettings(streamId, settings)
},
Expand Down
58 changes: 54 additions & 4 deletions services/libs/common/src/requestThrottler.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,82 @@
import { timeout } from './timing'

/* eslint-disable @typescript-eslint/no-explicit-any */
export class RequestThrottler {
private requests: number
private totalRequests: number
private interval: number
private logger: any
private backoff: number
private backoffFactor: number
private backoffStart: number
private backoffRetries: number
private MAX_BACKOFF_RETRIES = 4

constructor(totalRequests: number, interval: number, logger: any) {
constructor(
totalRequests: number,
interval: number,
logger: any,
backoffStart = 1,
backOffFactor = 2,
) {
this.totalRequests = totalRequests
this.requests = totalRequests
this.interval = interval
this.logger = logger
this.backoffStart = backoffStart
this.backoff = backoffStart
this.backoffFactor = backOffFactor
this.backoffRetries = 0
setInterval(() => this.replenish(), this.interval)
}

private replenish() {
private replenish(): void {
this.requests = this.totalRequests // Replenishes requests every interval
}

private refreshBackoff(): void {
this.backoff = this.backoffStart
this.backoffRetries = 0
}

private advanceBackoff(): void {
this.backoff = this.backoff * this.backoffFactor
this.backoffRetries += 1
}

async throttle<T>(func: () => Promise<T>): Promise<T> {
if (this.requests > 0) {
this.requests--
return await func()

try {
const value = await func()
this.refreshBackoff()
return value
} catch (error) {
this.logger.warn(`Error while executing throttling function!`)
if (error) {
this.logger.info(
`Starting exponential backoff with: ${this.backoff} seconds and factor: ${this.backoffFactor}!`,
)

if (this.backoffRetries < this.MAX_BACKOFF_RETRIES) {
await timeout(this.backoff)
this.advanceBackoff()
return this.throttle(func)
} else {
this.logger.warn(
`Request failed to resolve after ${this.MAX_BACKOFF_RETRIES} exponential backoff retries!`,
)
}
throw error
}
}
} else {
// Delay by the replenishment interval if out of requests
this.logger.debug(
`Throttling api requests limit ${this.totalRequests}, waiting ${this.interval}ms`,
)
await new Promise((resolve) => setTimeout(resolve, this.interval))
await timeout(this.interval)
return this.throttle(func)
}
}
Expand Down
10 changes: 9 additions & 1 deletion services/libs/integrations/src/integrations/nango.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { RequestThrottler } from '@crowd/common'
import { IProcessStreamContext, IGenerateStreamsContext } from '../types'
import axios from 'axios'

export const getNangoToken = async (
connectionId: string,
providerConfigKey: string,
ctx: IProcessStreamContext | IGenerateStreamsContext,
throttler?: RequestThrottler,
): Promise<string> => {
try {
const url = `${ctx.serviceSettings.nangoUrl}/connection/${connectionId}`
Expand All @@ -20,7 +22,13 @@ export const getNangoToken = async (
provider_config_key: providerConfigKey,
}

const response = await axios.get(url, { params, headers })
let response

if (throttler) {
response = await throttler.throttle(() => axios.get(url, { params, headers }))
} else {
response = await axios.get(url, { params, headers })
}

return response.data.credentials.access_token
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import axios, { AxiosRequestConfig } from 'axios'
import { PlatformType } from '@crowd/types'
import { getNangoToken } from '../../../nango'
import { IGenerateStreamsContext, IProcessStreamContext } from '@/types'
import { RequestThrottler } from '@crowd/common'

export const addContactsToList = async (
nangoId: string,
listId: string,
contactIds: string[],
ctx: IProcessStreamContext | IGenerateStreamsContext,
throttler: RequestThrottler,
): Promise<void> => {
const config: AxiosRequestConfig<unknown> = {
method: 'post',
Expand All @@ -17,14 +19,14 @@ export const addContactsToList = async (
try {
ctx.log.debug({ nangoId }, `Adding contacts to list ${listId} in HubSpot!`)

const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)
config.headers = { Authorization: `Bearer ${accessToken}` }

config.data = {
vids: contactIds,
}

await axios(config)
await throttler.throttle(() => axios(config))
} catch (err) {
ctx.log.error({ err }, `Error while adding contacts to hubspot list!`)
throw err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export const batchCreateMembers = async (
}

// Get an access token from Nango
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)

ctx.log.debug({ nangoId, accessToken, data: config.data }, 'Creating bulk contacts in HubSpot')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export const batchCreateOrganizations = async (
}

// Get an access token from Nango
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)

ctx.log.debug(
{ nangoId, accessToken, data: config.data },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export const batchUpdateMembers = async (
}

// Get an access token from Nango
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)

ctx.log.debug({ nangoId, accessToken, data: config.data }, 'Updating bulk contacts in HubSpot')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export const batchUpdateOrganizations = async (
}

// Get an access token from Nango
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)

ctx.log.debug({ nangoId, accessToken, data: config.data }, 'Updating bulk companies in HubSpot')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const getCompanies = async (

try {
// Get an access token from Nango
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)

ctx.log.debug({ nangoId, accessToken }, 'Fetching contacts from HubSpot')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const getCompanyById = async (
ctx.log.debug({ nangoId, companyId }, 'Fetching company by id from HubSpot')

// Get an access token from Nango
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)

ctx.log.debug({ accessToken }, `nango token`)
config.headers.Authorization = `Bearer ${accessToken}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ export const getContactAssociations = async (
url: `https://api.hubapi.com/crm/v3/objects/contacts/${contactId}/associations/${association}`,
}
try {
ctx.log.debug({ nangoId }, `Fetching contact associations [${association}] from HubSpot`)
ctx.log.debug(
{ nangoId },
`Fetching contact associations [${association}] of contact ${contactId} from HubSpot`,
)

const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)
config.headers = { Authorization: `Bearer ${accessToken}` }

const result = await throttler.throttle(() => axios(config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const getContactById = async (
ctx.log.debug({ nangoId, contactId }, 'Fetching contact by id from HubSpot')

// Get an access token from Nango
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)

ctx.log.debug({ accessToken }, `nango token`)
config.headers.Authorization = `Bearer ${accessToken}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const getContacts = async (
ctx.log.debug({ nangoId }, 'Fetching contacts from HubSpot')

// Get an access token from Nango
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx)
const accessToken = await getNangoToken(nangoId, PlatformType.HUBSPOT, ctx, throttler)

ctx.log.debug({ accessToken }, `nango token`)
config.headers.Authorization = `Bearer ${accessToken}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ import { getAllCompanies } from './api/companies'
import { RequestThrottler } from '@crowd/common'

const processRootStream: ProcessStreamHandler = async (ctx) => {
const throttler = new RequestThrottler(9, 1100, ctx.log)
const throttler = new RequestThrottler(99, 11000, ctx.log)

const settings = ctx.integration.settings as IHubspotIntegrationSettings

// hubspot might have long running root stream, change stream queue message visibility to 7 hours
await ctx.setMessageVisibilityTimeout(60 * 60 * 7)

const streams = ctx.stream.data as HubspotStream[]

if (streams.includes(HubspotStream.MEMBERS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const handler: ProcessIntegrationSyncHandler = async <T>(
ctx.automation.settings.contactList,
vids,
integrationContext,
throttler,
)
}

Expand Down
2 changes: 2 additions & 0 deletions services/libs/integrations/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export interface IProcessStreamContext extends IIntegrationContext {
serviceSettings: IIntegrationServiceSettings
platformSettings?: unknown

setMessageVisibilityTimeout: (newTimeout: number) => Promise<void>

publishData: <T>(data: T) => Promise<void>

abortWithError: (message: string, metadata?: unknown, error?: Error) => Promise<void>
Expand Down
25 changes: 25 additions & 0 deletions services/libs/sqs/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import {
ChangeMessageVisibilityCommand,
ChangeMessageVisibilityCommandOutput,
ChangeMessageVisibilityRequest,
DeleteMessageCommand,
DeleteMessageRequest,
ReceiveMessageCommand,
Expand Down Expand Up @@ -143,3 +146,25 @@ export const sendMessagesBulk = async (
throw err
}
}

export const changeMessageVisibility = async (
client: SqsClient,
params: ChangeMessageVisibilityRequest,
retry = 0,
): Promise<ChangeMessageVisibilityCommandOutput> => {
try {
return client.send(new ChangeMessageVisibilityCommand(params))
} catch (err) {
if (
(err.message === 'Request is throttled.' ||
err.message === 'Queue Throttled' ||
(err.code === 'EAI_AGAIN' && err.syscall === 'getaddrinfo')) &&
retry < 10
) {
await timeout(1000)
return await changeMessageVisibility(client, params, retry + 1)
}

throw err
}
}
25 changes: 22 additions & 3 deletions services/libs/sqs/src/queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
ChangeMessageVisibilityRequest,
CreateQueueCommand,
DeleteMessageRequest,
GetQueueUrlCommand,
Expand All @@ -8,7 +9,13 @@ import {
} from '@aws-sdk/client-sqs'
import { IS_PROD_ENV, IS_STAGING_ENV, generateUUIDv1, timeout } from '@crowd/common'
import { Logger, LoggerBase } from '@crowd/logging'
import { deleteMessage, receiveMessage, sendMessage, sendMessagesBulk } from './client'
import {
deleteMessage,
receiveMessage,
sendMessage,
sendMessagesBulk,
changeMessageVisibility,
} from './client'
import { ISqsQueueConfig, SqsClient, SqsMessage, SqsQueueType } from './types'
import { IQueueMessage, ISqsQueueEmitter } from '@crowd/types'

Expand Down Expand Up @@ -134,7 +141,7 @@ export abstract class SqsQueueReceiver extends SqsQueueBase {
if (this.isAvailable()) {
this.log.trace({ messageId: message.MessageId }, 'Received message from queue!')
this.addJob()
this.processMessage(JSON.parse(message.Body))
this.processMessage(JSON.parse(message.Body), message.ReceiptHandle)
// when the message is processed, delete it from the queue
.then(async () => {
this.log.trace(
Expand Down Expand Up @@ -169,7 +176,7 @@ export abstract class SqsQueueReceiver extends SqsQueueBase {
this.started = false
}

protected abstract processMessage(data: IQueueMessage): Promise<void>
protected abstract processMessage(data: IQueueMessage, receiptHandle?: string): Promise<void>

private async receiveMessage(): Promise<SqsMessage[]> {
try {
Expand Down Expand Up @@ -250,4 +257,16 @@ export abstract class SqsQueueEmitter extends SqsQueueBase implements ISqsQueueE
Entries: entries,
})
}

public async setMessageVisibilityTimeout(
receiptHandle: string,
newVisibility: number,
): Promise<void> {
const params: ChangeMessageVisibilityRequest = {
QueueUrl: this.getQueueUrl(),
ReceiptHandle: receiptHandle,
VisibilityTimeout: newVisibility,
}
await changeMessageVisibility(this.sqsClient, params)
}
}

0 comments on commit f49be45

Please sign in to comment.