Skip to content

Commit

Permalink
♻️ Lock interaction during simulation
Browse files Browse the repository at this point in the history
  • Loading branch information
KONFeature committed Oct 8, 2024
1 parent 149f794 commit 5cf59af
Showing 1 changed file with 78 additions and 37 deletions.
115 changes: 78 additions & 37 deletions packages/backend-elysia/src/domain/interactions/jobs/simulate.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { log } from "@backend-common";
import { mutexCron } from "@backend-utils";
import { Patterns } from "@elysiajs/cron";
import { and, eq, lt, or, sql } from "drizzle-orm";
import { and, eq, inArray } from "drizzle-orm";
import type { Address } from "viem";
import type { InteractionsContextApp, InteractionsDb } from "../context";
import { pendingInteractionsTable } from "../db/schema";
Expand All @@ -28,7 +28,7 @@ export const simulateInteractionJob = (app: InteractionsContextApp) =>
} = app.decorator;

// Get interactions to simulate
const interactions = await getInteractionsToSimulate({
const interactions = await getAndLockInteractionsToSimulate({
interactionsDb,
});
if (interactions.length === 0) {
Expand All @@ -39,23 +39,32 @@ export const simulateInteractionJob = (app: InteractionsContextApp) =>
`Got ${interactions.length} interactions to simulate`
);

// Perform the simulation and update the interactions
const hasSuccessInteractions =
await simulateAndUpdateInteractions({
interactions,
interactionsDb,
interactionDiamondRepository,
walletSessionRepository,
});
try {
// Perform the simulation and update the interactions
const hasSuccessInteractions =
await simulateAndUpdateInteractions({
interactions,
interactionsDb,
interactionDiamondRepository,
walletSessionRepository,
});

log.debug("Simulated interactions", {
interactions: interactions.length,
hasSuccessInteractions,
});
log.debug("Simulated interactions", {
interactions: interactions.length,
hasSuccessInteractions,
});

// Trigger the execution job
const store = app.store as ExecuteInteractionAppJob["store"];
await store.cron.executeInteraction.trigger();
// Trigger the execution job
const store =
app.store as ExecuteInteractionAppJob["store"];
await store.cron.executeInteraction.trigger();
} finally {
// Unlock the interactions
await unlockInteractions({
interactionsDb,
interactions,
});
}
},
})
);
Expand All @@ -68,31 +77,63 @@ export type SimulateInteractionAppJob = ReturnType<
* Get list of interactions to simulate
* @param interactionsDb
*/
async function getInteractionsToSimulate({
async function getAndLockInteractionsToSimulate({
interactionsDb,
}: { interactionsDb: InteractionsDb }) {
// Create a subquery to pre-select interactions that are pending and not locked
const preFilteredInteractions = interactionsDb
.select()
.from(pendingInteractionsTable)
.where(
and(
eq(pendingInteractionsTable.status, "pending"),
eq(pendingInteractionsTable.locked, false)
)
)
.as("preFilteredInteractions");

const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000);
// Run the selection within a transaction to ensure proper lockin
return await interactionsDb.transaction(async (trx) => {
// Get the interactions
const interactions = await interactionsDb
.select()
.from(pendingInteractionsTable)
.where(
and(
eq(pendingInteractionsTable.status, "pending"),
eq(pendingInteractionsTable.locked, false)
)
);

// Main query using the subquery to apply additional conditions
return await interactionsDb
.select()
.from(preFilteredInteractions)
// If none match our criteria early exit with empty array
const hasInteractions5MinOld = interactions.some((interaction) => {
return interaction.createdAt < fiveMinutesAgo;
});
if (interactions.length < 10 || !hasInteractions5MinOld) {
return [];
}

// Lock them
await trx
.update(pendingInteractionsTable)
.set({ locked: true })
.where(
inArray(
pendingInteractionsTable.id,
interactions.map((i) => i.id)
)
);

return interactions;
});
}

/**
* Unlock interactions post simulation
*/
async function unlockInteractions({
interactionsDb,
interactions,
}: {
interactionsDb: InteractionsDb;
interactions: (typeof pendingInteractionsTable.$inferSelect)[];
}) {
await interactionsDb
.update(pendingInteractionsTable)
.set({ locked: false })
.where(
or(
lt(preFilteredInteractions.createdAt, fiveMinutesAgo),
sql`(SELECT COUNT(*) FROM ${preFilteredInteractions}) > 10`
inArray(
pendingInteractionsTable.id,
interactions.map((i) => i.id)
)
);
}
Expand Down

0 comments on commit 5cf59af

Please sign in to comment.