Skip to content

Commit

Permalink
🐛 Cleaner badge color + mutex on interactions
Browse files Browse the repository at this point in the history
  • Loading branch information
KONFeature committed Sep 29, 2024
1 parent 732e4a9 commit 66372a8
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function SendInteraction() {
<Button
onClick={() =>
sendInteraction({
interaction: PressInteractionEncoder.openArticle({
interaction: PressInteractionEncoder.readArticle({
articleId: generatePrivateKey(),
}),
productId:
Expand Down
124 changes: 65 additions & 59 deletions packages/backend-elysia/src/domain/business/oracle/jobs/updateOrale.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
addresses,
purchaseOracleAbi,
} from "@frak-labs/app-essentials/blockchain";
import { Mutex } from "async-mutex";
import { and, eq, inArray, isNotNull, isNull } from "drizzle-orm";
import { type Client, type Hex, type LocalAccount, encodePacked } from "viem";
import {
Expand All @@ -18,6 +19,8 @@ import { productOracleTable, purchaseStatusTable } from "../../db/schema";
import type { BusinessOracleContextApp } from "../context";
import type { MerkleTreeRepository } from "../repositories/MerkleTreeRepository";

const oracleMutex = new Mutex();

export const updateMerkleRootJob = (app: BusinessOracleContextApp) =>
app.use(
cron({
Expand All @@ -26,70 +29,73 @@ export const updateMerkleRootJob = (app: BusinessOracleContextApp) =>
protect: true,
catch: true,
interval: 30,
run: async () => {
// Extract some stuff from the app
const {
businessDb,
merkleRepository,
adminWalletsRepository,
client,
} = app.decorator;
run: () =>
oracleMutex.runExclusive(async () => {
// Extract some stuff from the app
const {
businessDb,
merkleRepository,
adminWalletsRepository,
client,
} = app.decorator;

// Get some unsynced products
const notSyncedProductIds = await businessDb
.select({
productId: productOracleTable.productId,
})
.from(productOracleTable)
.where(
and(
eq(productOracleTable.synced, false),
isNotNull(productOracleTable.merkleRoot)
)
// Get some unsynced products
const notSyncedProductIds = await businessDb
.select({
productId: productOracleTable.productId,
})
.from(productOracleTable)
.where(
and(
eq(productOracleTable.synced, false),
isNotNull(productOracleTable.merkleRoot)
)
);
log.debug(
`${notSyncedProductIds.length} products are not synced`
);
log.debug(
`${notSyncedProductIds.length} products are not synced`
);

// Update the empty leafs
const updatedOracleIds = await updateEmptyLeafs({
businessDb,
});
if (
updatedOracleIds.size === 0 &&
notSyncedProductIds.length === 0
) {
log.debug("No oracle to update");
return;
}
// Update the empty leafs
const updatedOracleIds = await updateEmptyLeafs({
businessDb,
});
if (
updatedOracleIds.size === 0 &&
notSyncedProductIds.length === 0
) {
log.debug("No oracle to update");
return;
}

// Invalidate the merkle tree
const productIds = await invalidateOracleTree({
oracleIds: updatedOracleIds,
businessDb,
merkleRepository,
});
log.debug(
`Invalidating oracle for ${productIds.length} products`
);
// Invalidate the merkle tree
const productIds = await invalidateOracleTree({
oracleIds: updatedOracleIds,
businessDb,
merkleRepository,
});
log.debug(
`Invalidating oracle for ${productIds.length} products`
);

const finalProductIds = new Set(
productIds.concat(
notSyncedProductIds.map((product) => product.productId)
)
);
log.debug(
`Will update ${finalProductIds.size} products merkle tree`
);
// Then update each product ids merkle root
await updateProductsMerkleRoot({
productIds: [...finalProductIds],
businessDb,
merkleRepository,
adminRepository: adminWalletsRepository,
client,
});
},
const finalProductIds = new Set(
productIds.concat(
notSyncedProductIds.map(
(product) => product.productId
)
)
);
log.debug(
`Will update ${finalProductIds.size} products merkle tree`
);
// Then update each product ids merkle root
await updateProductsMerkleRoot({
productIds: [...finalProductIds],
businessDb,
merkleRepository,
adminRepository: adminWalletsRepository,
client,
});
}),
})
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { log } from "@backend-common";
import cron, { Patterns } from "@elysiajs/cron";
import { Mutex } from "async-mutex";
import { and, eq, inArray } from "drizzle-orm";
import {
pendingInteractionsTable,
Expand All @@ -10,6 +11,8 @@ import type { InteractionDiamondRepository } from "../repositories/InteractionDi
import type { InteractionSignerRepository } from "../repositories/InteractionSignerRepository";
import type { PreparedInteraction } from "../types/interactions";

const executionMutex = new Mutex();

export const executeInteractionJob = (app: InteractionsContextApp) =>
app.use(
cron({
Expand All @@ -18,39 +21,42 @@ export const executeInteractionJob = (app: InteractionsContextApp) =>
protect: true,
catch: true,
interval: 60,
run: async () => {
// Get some stuff from the app
const {
interactionsDb,
interactionDiamondRepository,
interactionSignerRepository,
} = app.decorator;
run: () =>
executionMutex.runExclusive(async () => {
// Get some stuff from the app
const {
interactionsDb,
interactionDiamondRepository,
interactionSignerRepository,
} = app.decorator;

// Get interactions to simulate
const interactions = await getInteractionsToExecute({
interactionsDb,
});
if (interactions.length === 0) {
log.debug("No interactions to execute");
return;
}
log.debug(`Will execute ${interactions.length} interactions`);
// Get interactions to simulate
const interactions = await getInteractionsToExecute({
interactionsDb,
});
if (interactions.length === 0) {
log.debug("No interactions to execute");
return;
}
log.debug(
`Will execute ${interactions.length} interactions`
);

// Execute them
const txHash = await executeInteractions({
interactions,
interactionsDb,
interactionDiamondRepository,
interactionSignerRepository,
});
// Execute them
const txHash = await executeInteractions({
interactions,
interactionsDb,
interactionDiamondRepository,
interactionSignerRepository,
});

log.info(
{
txHash,
},
`Executed ${interactions.length} interactions`
);
},
log.info(
{
txHash,
},
`Executed ${interactions.length} interactions`
);
}),
})
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { log } from "@backend-common";
import cron, { Patterns } from "@elysiajs/cron";
import { PurchaseInteractionEncoder } from "@frak-labs/nexus-sdk/interactions";
import { Mutex } from "async-mutex";
import { eq } from "drizzle-orm";
import { Elysia } from "elysia";
import { PurchaseProofService } from "../../../business/oracle/services/proofService";
Expand All @@ -16,6 +17,8 @@ const outerPurchaseTracker = new Elysia({ name: "Job.OuterPurchaseTracker" })

type OuterPurchaseTrackerApp = typeof outerPurchaseTracker;

const trackerMutex = new Mutex();

const innerPurchaseTrackerJob = (app: OuterPurchaseTrackerApp) =>
app.use(
cron({
Expand All @@ -24,57 +27,64 @@ const innerPurchaseTrackerJob = (app: OuterPurchaseTrackerApp) =>
protect: true,
catch: true,
interval: 60,
run: async () => {
// Get stuff from the app
const { interactionsDb, getPurchaseProof } = app.decorator;

// Get all the currents tracker (max 50 at the time)
const trackers = await interactionsDb
.select()
.from(interactionsPurchaseTrackerTable)
.where(eq(interactionsPurchaseTrackerTable.pushed, false))
.limit(50);
run: () =>
trackerMutex.runExclusive(async () => {
// Get stuff from the app
const { interactionsDb, getPurchaseProof } = app.decorator;

// For each tracker, try to get the proof, and if done, push the interactions
for (const tracker of trackers) {
const result = await getPurchaseProof({
token: tracker.token,
externalId: tracker.externalPurchaseId,
});
if (result.status !== "success") {
log.debug(
{ result, tracker },
"Proof not available yet for tracker"
);
continue;
}
// Get all the currents tracker (max 50 at the time)
const trackers = await interactionsDb
.select()
.from(interactionsPurchaseTrackerTable)
.where(
eq(interactionsPurchaseTrackerTable.pushed, false)
)
.limit(50);

// If all good, build the interaction and push it
const interaction =
PurchaseInteractionEncoder.completedPurchase({
purchaseId: result.purchase.purchaseId,
proof: result.proof,
// For each tracker, try to get the proof, and if done, push the interactions
for (const tracker of trackers) {
const result = await getPurchaseProof({
token: tracker.token,
externalId: tracker.externalPurchaseId,
});
if (result.status !== "success") {
log.debug(
{ result, tracker },
"Proof not available yet for tracker"
);
continue;
}

// Insert it and mark this interaction as pushed
await interactionsDb
.insert(pendingInteractionsTable)
.values({
wallet: tracker.wallet,
productId: result.oracle.productId,
typeDenominator: interaction.handlerTypeDenominator,
interactionData: interaction.interactionData,
status: "pending",
})
.onConflictDoNothing();
await interactionsDb
.update(interactionsPurchaseTrackerTable)
.set({ pushed: true })
.where(
eq(interactionsPurchaseTrackerTable.id, tracker.id)
);
}
},
// If all good, build the interaction and push it
const interaction =
PurchaseInteractionEncoder.completedPurchase({
purchaseId: result.purchase.purchaseId,
proof: result.proof,
});

// Insert it and mark this interaction as pushed
await interactionsDb
.insert(pendingInteractionsTable)
.values({
wallet: tracker.wallet,
productId: result.oracle.productId,
typeDenominator:
interaction.handlerTypeDenominator,
interactionData: interaction.interactionData,
status: "pending",
})
.onConflictDoNothing();
await interactionsDb
.update(interactionsPurchaseTrackerTable)
.set({ pushed: true })
.where(
eq(
interactionsPurchaseTrackerTable.id,
tracker.id
)
);
}
}),
})
);

Expand Down
Loading

0 comments on commit 66372a8

Please sign in to comment.