Skip to content

Commit

Permalink
add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
plusminushalf committed Nov 7, 2024
1 parent 0ad7913 commit c4cd165
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 14 deletions.
9 changes: 7 additions & 2 deletions src/cli/setupServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,14 @@ export const setupServer = async ({
"dumping mempool before shutdown"
)

for (const wallet of senderManager.getWalletsInUse()) {
console.log("pushing wallet back to pool", wallet.address)
senderManager.pushWallet(wallet)
}

process.exit(0)
}

process.on("SIGINT", gracefulShutdown)
process.on("SIGTERM", gracefulShutdown)
process.once("SIGINT", gracefulShutdown)
process.once("SIGTERM", gracefulShutdown)
}
25 changes: 18 additions & 7 deletions src/executor/senderManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { AltoConfig } from "@alto/config"
import { Redis } from "ioredis"

export type SenderManager = {
getWalletsInUse: () => Account[]
getAllWallets: () => Account[]
getWallet: () => Promise<Account>
pushWallet: (wallet: Account) => void
Expand Down Expand Up @@ -34,7 +35,7 @@ const createLocalSenderManager = ({
}: {
config: AltoConfig
metrics: Metrics
}) => {
}): SenderManager => {
const wallets = getAvailableWallets(config)
const availableWallets = [...wallets]

Expand All @@ -49,11 +50,10 @@ const createLocalSenderManager = ({

return {
getAllWallets: () => [...wallets],
getWalletsInUse: () => [...availableWallets],
async getWallet() {
logger.trace("waiting for semaphore ")
const [result] = await semaphore.acquire()

console.log(result)
await semaphore.acquire()

const wallet = availableWallets.shift()

Expand Down Expand Up @@ -83,6 +83,7 @@ const createLocalSenderManager = ({
{ executor: wallet.address },
"pushed wallet to sender manager"
)

metrics.walletsAvailable.set(availableWallets.length)
return
}
Expand All @@ -102,8 +103,11 @@ async function createRedisQueue({
}) {
const hasElements = await redis.llen(name)

if (!hasElements) {
await redis.lpush(name, ...entries)
if (hasElements === 0) {
const pipeline = redis.pipeline()
pipeline.del(name)
pipeline.lpush(name, ...entries)
await pipeline.exec()
}

return {
Expand All @@ -119,12 +123,13 @@ const createRedisSenderManager = async ({
}: {
config: AltoConfig
metrics: Metrics
}) => {
}): Promise<SenderManager> => {
if (!config.redisQueueEndpoint) {
throw new Error("redisQueueEndpoint is required")
}

const wallets = getAvailableWallets(config)
const walletsInUse: Record<string, Account> = {}

const logger = config.getLogger(
{ module: "sender-manager" },
Expand All @@ -142,6 +147,7 @@ const createRedisSenderManager = async ({

return {
getAllWallets: () => [...wallets],
getWalletsInUse: () => Object.values(walletsInUse),
getWallet: async () => {
logger.trace("waiting for wallet ")

Expand All @@ -167,6 +173,7 @@ const createRedisSenderManager = async ({
redisQueue.llen().then((len) => {
metrics.walletsAvailable.set(len)
})
walletsInUse[wallet.address] = wallet

return wallet
},
Expand All @@ -176,6 +183,10 @@ const createRedisSenderManager = async ({
metrics.walletsAvailable.set(len)
})
})

if (walletsInUse[wallet.address]) {
delete walletsInUse[wallet.address]
}
}
}
}
Expand Down
50 changes: 45 additions & 5 deletions src/store/createRedisStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
type UserOperationInfo
} from "@alto/types"

const outstandingQueueName = "outstanding-mempool"
const outstandingQueueName = "outstanding-mempool-v2"

const createQueue = <T>(url: string, queueName: string) => {
let client: Redis
Expand Down Expand Up @@ -60,6 +60,10 @@ export const createRedisStore = ({
}> => {
const { redisMempoolUrl } = config

const logger = config.getLogger({
module: "redisStore"
})

if (!redisMempoolUrl) {
throw new Error("Redis mempool URL is not set")
}
Expand Down Expand Up @@ -114,6 +118,15 @@ export const createRedisStore = ({
return () => clearInterval(interval)
},
async addOutstanding(op) {
logger.debug(
{ userOpHash: op.userOperationHash, store: "outstanding" },
"added user op to mempool"
)
metrics.userOperationsInMempool
.labels({
status: "outstanding"
})
.inc()
await this.outstandingQueue.add(op)
},
async addProcessing(op) {
Expand All @@ -125,12 +138,28 @@ export const createRedisStore = ({
async removeOutstanding(userOpHash) {
const jobs = await this.outstandingQueue.getWaiting()

const job = jobs.find(
(job) => job.data.userOperationHash === userOpHash
)
const job = jobs.find((job) => {
const parsedData = userOperationInfoSchema.parse(job.data)

return parsedData.userOperationHash === userOpHash
})

if (job) {
await job.remove()
logger.debug(
{ userOpHash, store: "outstanding" },
"removed user op from mempool"
)
metrics.userOperationsInMempool
.labels({
status: "outstanding"
})
.dec()
} else {
logger.warn(
{ userOpHash, store: "outstanding" },
"tried to remove non-existent user op from mempool"
)
}
},
async removeProcessing(userOpHash) {
Expand All @@ -142,7 +171,17 @@ export const createRedisStore = ({
async dumpOutstanding() {
const awaitingJobs = await this.outstandingQueue.getWaiting()

return awaitingJobs.map((job) => job.data)
logger.trace(
{
store: "outstanding",
length: awaitingJobs.length
},
"dumping mempool"
)

return awaitingJobs.map((job) => {
return userOperationInfoSchema.parse(job.data)
})
},
dumpProcessing() {
return this.memoryStore.dumpProcessing()
Expand All @@ -152,6 +191,7 @@ export const createRedisStore = ({
},
async clear(from) {
if (from === "outstanding") {
logger.debug({ store: from }, "clearing mempool")
await this.outstandingQueue.clean(0, "active")
} else {
await this.memoryStore.clear(from)
Expand Down

0 comments on commit c4cd165

Please sign in to comment.