Skip to content

Commit

Permalink
Introduce new cache which is tied up to integrationId (#1200)
Browse files Browse the repository at this point in the history
  • Loading branch information
garrrikkotua authored Aug 1, 2023
1 parent 1b082e2 commit a991c96
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ export default class IntegrationStreamService extends LoggerBase {

const globalCache = new RedisCache(`int-global`, this.redisClient, this.log)

const integrationCache = new RedisCache(
`int-${streamInfo.integrationId}`,
this.redisClient,
this.log,
)

const nangoConfig = NANGO_CONFIG()

const context: IProcessWebhookStreamContext = {
Expand Down Expand Up @@ -286,6 +292,7 @@ export default class IntegrationStreamService extends LoggerBase {
log: this.log,
cache,
globalCache,
integrationCache,

publishData: async (data) => {
await this.publishData(
Expand Down Expand Up @@ -403,6 +410,12 @@ export default class IntegrationStreamService extends LoggerBase {

const globalCache = new RedisCache(`int-global`, this.redisClient, this.log)

const integrationCache = new RedisCache(
`int-${streamInfo.integrationId}`,
this.redisClient,
this.log,
)

const nangoConfig = NANGO_CONFIG()

const context: IProcessStreamContext = {
Expand Down Expand Up @@ -433,6 +446,7 @@ export default class IntegrationStreamService extends LoggerBase {
log: this.log,
cache,
globalCache,
integrationCache,

publishData: async (data) => {
await this.publishData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined {

const getTokenFromCache = async (ctx: IProcessStreamContext) => {
const key = 'github-token-cache'
const cache = ctx.cache
const cache = ctx.integrationCache // this cache is tied up with integrationId

const token = await cache.get(key)

Expand All @@ -77,7 +77,7 @@ const getTokenFromCache = async (ctx: IProcessStreamContext) => {

const setTokenToCache = async (ctx: IProcessStreamContext, token: AppTokenResponse) => {
const key = 'github-token-cache'
const cache = ctx.cache
const cache = ctx.integrationCache // this cache is tied up with integrationId

// cache for 5 minutes
await cache.set(key, JSON.stringify(token), 5 * 60)
Expand All @@ -97,7 +97,7 @@ async function getGithubToken(ctx: IProcessStreamContext): Promise<string> {
appToken = await getAppToken(jwtToken, ctx.integration.identifier)
}

setTokenToCache(ctx, appToken)
await setTokenToCache(ctx, appToken)

return appToken.token
}
Expand All @@ -115,6 +115,9 @@ async function getMemberEmail(ctx: IProcessStreamContext, login: string): Promis
return ''
}

// here we use cache for tenantId-integrationType
// So in LFX case different integration will have access to the same cache
// But this is fine
const cache = ctx.cache

const existing = await cache.get(login)
Expand Down
19 changes: 19 additions & 0 deletions services/libs/integrations/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ export interface IIntegrationContext {
onboarding?: boolean
integration: IIntegration
log: Logger
/**
* Cache that is tied up to the tenantId and integration type
*/
cache: ICache

publishStream: <T>(identifier: string, metadata?: T) => Promise<void>
Expand Down Expand Up @@ -45,8 +48,16 @@ export interface IProcessStreamContext extends IIntegrationContext {

abortWithError: (message: string, metadata?: unknown, error?: Error) => Promise<void>

/**
* Global cache that is shared between all integrations
*/
globalCache: ICache

/**
* Cache that is shared between all streams of the same integration (integrationId)
*/
integrationCache: ICache

getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter
}

Expand All @@ -65,8 +76,16 @@ export interface IProcessWebhookStreamContext {

abortWithError: (message: string, metadata?: unknown, error?: Error) => Promise<void>

/**
* Global cache that is shared between all integrations
*/
globalCache: ICache

/**
* Cache that is shared between all streams of the same integration (integrationId)
*/
integrationCache: ICache

getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter
}

Expand Down

0 comments on commit a991c96

Please sign in to comment.