Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Rotate Catalyst Servers to Retry #127

Merged
merged 7 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 54 additions & 28 deletions processor/src/adapters/badge-context.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import { Entity } from '@dcl/schemas'
import { createContentClient } from 'dcl-catalyst-client'
import { createContentClient, ContentClient } from 'dcl-catalyst-client'
import { getCatalystServersFromCache } from 'dcl-catalyst-client/dist/contracts-snapshots'
import { AppComponents, IBadgeContext } from '../types'
import { getTokenIdAndAssetUrn, isExtendedUrn, parseUrn } from '@dcl/urn-resolver'
import { retry } from '../utils/retryer'
import { shuffleArray } from '../utils/array'

type Options = {
retries?: number
waitTime?: number
contentServerUrl?: string
}

const L1_MAINNET = 'mainnet'
const L1_TESTNET = 'sepolia'

export async function createBadgeContext({
fetch,
config
}: Pick<AppComponents, 'fetch' | 'config'>): Promise<IBadgeContext> {
const loadBalancer = await config.requireString('CATALYST_CONTENT_URL_LOADBALANCER')

const contentClient = createContentClient({
fetcher: fetch,
url: loadBalancer
})
const contractNetwork = (await config.getString('ENV')) === 'prod' ? L1_MAINNET : L1_TESTNET

async function getWearablesWithRarity(wearables: string[]): Promise<Entity[]> {
const wearablesUrns: string[] = []
Expand All @@ -31,30 +38,49 @@ export async function createBadgeContext({
return fetchedWearables
}

async function getEntityById(
entityId: string,
options: { retries?: number; waitTime?: number; contentServerUrl?: string } = {}
): Promise<Entity> {
const retries = options.retries ?? 3
const waitTime = options.waitTime ?? 750
const contentClientToUse = options.contentServerUrl
? createContentClient({ fetcher: fetch, url: options.contentServerUrl })
: contentClient

return retry(() => contentClientToUse.fetchEntityById(entityId), retries, waitTime)
function getContentClientOrDefault(contentServerUrl?: string): ContentClient {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This use case will be very common, we can live with it for now but I think the best thing to do on these cases is to expose a function in catalyst-client's contentClient and lambdaClient like:

function setCatalyst(url: string): void

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return contentServerUrl
? createContentClient({ fetcher: fetch, url: contentServerUrl })
: createContentClient({
fetcher: fetch,
url: loadBalancer
})
}

function rotateContentServerClient<T>(
executeClientRequest: (client: ContentClient) => Promise<T>,
contentServerUrl?: string
) {
const catalystServers = shuffleArray(getCatalystServersFromCache(contractNetwork)).map((server) => server.address)
let contentClientToUse: ContentClient = getContentClientOrDefault(contentServerUrl)

return (attempt: number): Promise<T> => {
if (attempt > 1 && catalystServers.length > 0) {
const [catalystServerUrl] = catalystServers.splice(attempt % catalystServers.length, 1)
contentClientToUse = getContentClientOrDefault(catalystServerUrl)
}

return executeClientRequest(contentClientToUse)
}
}

async function getEntityById(entityId: string, options: Options = {}): Promise<Entity> {
const { retries = 3, waitTime = 750, contentServerUrl } = options
const executeClientRequest = rotateContentServerClient(
(contentClientToUse) => contentClientToUse.fetchEntityById(entityId),
contentServerUrl
)

return retry(executeClientRequest, retries, waitTime)
}

async function getEntitiesByPointers(
pointers: string[],
options: { retries?: number; waitTime?: number; contentServerUrl?: string } = {}
): Promise<Entity[]> {
const retries = options.retries ?? 3
const waitTime = options.waitTime ?? 300
const contentClientToUse = options.contentServerUrl
? createContentClient({ fetcher: fetch, url: options.contentServerUrl })
: contentClient

return retry(() => contentClientToUse.fetchEntitiesByPointers(pointers), retries, waitTime)
async function getEntitiesByPointers(pointers: string[], options: Options = {}): Promise<Entity[]> {
const { retries = 3, waitTime = 300, contentServerUrl } = options
const executeClientRequest = rotateContentServerClient(
(contentClientToUse) => contentClientToUse.fetchEntitiesByPointers(pointers),
contentServerUrl
)
return retry(executeClientRequest, retries, waitTime)
}

return { getWearablesWithRarity, getEntityById, getEntitiesByPointers }
Expand Down
21 changes: 16 additions & 5 deletions processor/src/logic/event-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EthAddress, Event, Events } from '@dcl/schemas'
import { AppComponents, IEventDispatcher, IObserver } from '../types'
import { AppComponents, EventProcessingResult, IEventDispatcher, IObserver } from '../types'
import { BadgeId, UserBadge } from '@badges/common'
import { retry } from '../utils/retryer'

export function createEventDispatcher({
logs,
Expand Down Expand Up @@ -29,10 +30,14 @@ export function createEventDispatcher({
}
}

async function handleEvent(observer: IObserver, event: Event, userProgress: UserBadge | undefined): Promise<any> {
async function handleEvent(
observer: IObserver,
event: Event,
userProgress: UserBadge | undefined
): Promise<EventProcessingResult> {
try {
const result = await observer.handle(event, userProgress)
return result
const result = await retry(() => observer.handle(event, userProgress))
return { ok: true, result }
} catch (error: any) {
metrics.increment('handler_failures_count', {
event_type: event.type,
Expand All @@ -51,6 +56,8 @@ export function createEventDispatcher({
rootStack: JSON.stringify(error?.cause?.stack),
eventKey: event?.key
})

return { ok: false, error }
}
}

Expand Down Expand Up @@ -101,7 +108,11 @@ export function createEventDispatcher({
return handleEvent(observer, event, userProgress)
})

const badgesToGrant = (await Promise.all(asyncResults)).filter(Boolean).flat()
const results = await Promise.all(asyncResults)

const badgesToGrant = results.filter(({ ok, result }) => ok && !!result).map(({ result }) => result)
// If we go with the retry queue, we could use the following to retry only the handlers that fail
// const handlersToRetry = results.filter(({ ok }) => !ok)

metrics.increment('events_correctly_handled_count', {
event_type: event.type,
Expand Down
6 changes: 6 additions & 0 deletions processor/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ export class ParsingEventError extends Error {
}
}

export type EventProcessingResult = {
ok: boolean
result?: BadgeProcessorResult | undefined
error?: Error
}

export type BadgeProcessorResult = {
badgeGranted: Badge
userAddress: string
Expand Down
7 changes: 7 additions & 0 deletions processor/src/utils/array.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export function shuffleArray<T>(array: T[]): T[] {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1))
;[array[i], array[j]] = [array[j], array[i]]
}
return array
}
8 changes: 6 additions & 2 deletions processor/src/utils/retryer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { sleep } from './timer'

export async function retry<T>(action: () => Promise<T>, retries: number = 3, waitTime: number = 300): Promise<T> {
export async function retry<T>(
action: (attempt: number) => Promise<T>,
retries: number = 3,
waitTime: number = 300
): Promise<T> {
for (let attempt = 1; attempt <= retries; attempt++) {
try {
return await action()
return await action(attempt)
} catch (error: any) {
if (attempt === retries) {
throw new Error(`Failed after ${retries} attempts`, { cause: error })
Expand Down
Loading
Loading