Skip to content

Commit

Permalink
bug(#477): Ensure refreshed at is not changed on rent collection, wai…
Browse files Browse the repository at this point in the history
…t for invalidations (#478)
  • Loading branch information
ChewingGlass authored Nov 9, 2023
1 parent fe7f5a0 commit 4d631cd
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,20 @@ export function handleAccountWebhook({
}
}
const model = sequelize.models[accName];
await model.upsert(
{
address: account.pubkey,
refreshed_at: now,
...sanitized,
},
{ transaction: t }
);
const value = await model.findByPk(account.pubkey);
const changed =
!value ||
Object.entries(sanitized).some(([k, v]) => v !== value.dataValues[k]);
if (changed) {
await model.upsert(
{
address: account.pubkey,
refreshed_at: now,
...sanitized,
},
{ transaction: t }
);
}
}

await t.commit();
Expand Down
2 changes: 1 addition & 1 deletion packages/account-postgres-sink-service/vehnt.sql
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ SELECT
approx_fall_rate * 1000000000000 as approx_fall_rate,
approx_fall_rate - real_fall_rate as fall_rate_diff,
approx_ve_tokens - real_ve_tokens as ve_tokens_diff
FROM subdao_delegations;
FROM subdao_delegations;
179 changes: 129 additions & 50 deletions packages/entity-invalidator/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,53 @@
import { decodeEntityKey } from "@helium/helium-entity-manager-sdk";
import AWS from "aws-sdk";
import { Op } from "sequelize";
import { v4 as uuidv4 } from 'uuid';
import { v4 as uuidv4 } from "uuid";
import { IotHotspotInfo, KeyToAsset, MobileHotspotInfo } from "./model";
// @ts-ignore
import { AWS_REGION, CLOUDFRONT_DISTRIBUTION, LOOKBACK_HOURS, DOMAIN } from "./env";
import {
AWS_REGION,
CLOUDFRONT_DISTRIBUTION,
LOOKBACK_HOURS,
DOMAIN,
} from "./env";
import { chunks } from "@helium/spl-utils";

// How long to wait to check invalidation status again
const INVALIDATION_WAIT = 10000;
// 30 minutes
const INVALIDATION_WAIT_LIMIT = 30 * 60 * 1000;

async function run() {
const date = new Date();
date.setHours(date.getHours() - LOOKBACK_HOURS);
AWS.config.update({ region: AWS_REGION });
const cloudfront = new AWS.CloudFront();

// Invalidate metadata-service routes:
// Invalidate metadata-service routes:
// - /v2/hotspots/pagination-metadata?subnetwork=iot
// - /v2/hotspots/pagination-metadata?subnetwork=mobile
// Or /v2/hotspot* if there is an error
try {
const modelMap: any = {
'iot': IotHotspotInfo,
'mobile': MobileHotspotInfo,
}
const subnetworks = ['iot', 'mobile'];
iot: IotHotspotInfo,
mobile: MobileHotspotInfo,
};
const subnetworks = ["iot", "mobile"];

// Fetch pagination data
const responsePromises = subnetworks.map((subnetwork) => {
return fetch(`https://${DOMAIN}/v2/hotspots/pagination-metadata?subnetwork=${subnetwork}`);
return fetch(
`https://${DOMAIN}/v2/hotspots/pagination-metadata?subnetwork=${subnetwork}`
);
});
const jsonPromises = await Promise.all(responsePromises);
const paginationMetadata = await Promise.all(jsonPromises.map((response) => response.json()));
const paginationMetadata = await Promise.all(
jsonPromises.map((response) => response.json())
);
console.log("Fetched pagination metadata for subnetworks");
console.log(paginationMetadata);

// Fetch counts of newly added hotspots
// Fetch counts of newly added hotspots
const totalCountPromises = subnetworks.map((subnetwork) => {
const whereClause = {
created_at: {
Expand All @@ -54,7 +69,7 @@ async function run() {
console.log("Fetched counts of newly added hotspots");
console.log(totalCounts);

// Prepare invalidation paths
// Prepare invalidation paths
const paths: string[] = [];
totalCounts.forEach((count, i) => {
const subnetwork = subnetworks[i];
Expand All @@ -72,39 +87,33 @@ async function run() {
console.log("Invalidation paths");
console.log(paths);

await cloudfront
.createInvalidation({
DistributionId: CLOUDFRONT_DISTRIBUTION,
InvalidationBatch: {
CallerReference: `${uuidv4()}`,
Paths: {
Quantity: paths.length,
Items: paths,
},
},
})
.promise();
await invalidateAndWait({
cloudfront,
DistributionId: CLOUDFRONT_DISTRIBUTION,
Paths: {
Quantity: paths.length,
Items: paths,
},
});
} catch (err) {
console.error("Granular /v2/hotspots invalidation failed, resorting to full invalidation");
console.error(
"Granular /v2/hotspots invalidation failed, resorting to full invalidation"
);
console.error(err);

await cloudfront
.createInvalidation({
DistributionId: CLOUDFRONT_DISTRIBUTION,
InvalidationBatch: {
CallerReference: `${uuidv4()}`,
Paths: {
Quantity: 1,
Items: ["/v2/hotspots*"],
},
},
})
.promise();
await invalidateAndWait({
cloudfront,
DistributionId: CLOUDFRONT_DISTRIBUTION,
Paths: {
Quantity: 1,
Items: ["/v2/hotspots*"],
},
});
}

// Invalidate metadata-service routes:
// - /v2/hotspot/:keyToAssetKey
// - /v1/:keyToAssetKey
// - /v1/:keyToAssetKey
// - /:eccCompact
const limit = 10000;
let lastId = null;
Expand Down Expand Up @@ -145,6 +154,7 @@ async function run() {
});
console.log(`Found ${totalCount} updated records`);
let totalProgress = 0;
const paths: string[] = [];

do {
if (lastId) {
Expand Down Expand Up @@ -177,25 +187,94 @@ async function run() {
});
});

const paths = entities.flatMap((entity) => getPaths(entity));
await cloudfront
.createInvalidation({
DistributionId: CLOUDFRONT_DISTRIBUTION,
InvalidationBatch: {
CallerReference: `${uuidv4()}`, // unique identifier for this invalidation batch
Paths: {
Quantity: paths.length,
Items: paths,
},
},
})
.promise();
paths.push(...entities.flatMap((entity) => getPaths(entity)));

lastId = entities[entities.length - 1].address;
totalProgress += entities.length;
console.log(`Processed ${totalProgress} / ${totalCount}`);
}
} while (entities.length === limit);

// Split the paths into batches of 3000
const batches = chunks(paths, 3000)

// Process each batch of invalidations
let i = 0;
for (const batch of batches) {
await invalidateAndWait({
cloudfront,
DistributionId: CLOUDFRONT_DISTRIBUTION,
Paths: {
Quantity: batch.length,
Items: batch,
},
})

console.log(`Invalidated ${i} / ${batches.length} batches`);
i++
}
}

function delay(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

async function invalidateAndWait({
cloudfront,
DistributionId,
Paths,
}: {
cloudfront: AWS.CloudFront,
DistributionId: string,
Paths: {
Quantity: number;
Items: string[]
}
}) {
const invalidationResponse = await cloudfront
.createInvalidation({
DistributionId,
InvalidationBatch: {
CallerReference: `${uuidv4()}`,
Paths,
},
})
.promise();

if (invalidationResponse?.Invalidation) {
const invalidationId = invalidationResponse.Invalidation.Id;

// Check the status of the invalidation batch periodically
let invalidationStatus = await getInvalidationStatus(
cloudfront,
invalidationId
);
let totalWait = 0;
while (invalidationStatus !== "Completed") {
console.log("Invalidation in progress. Waiting for completion...");
totalWait += INVALIDATION_WAIT
await delay(INVALIDATION_WAIT); // Wait for 10 seconds before checking the status again
invalidationStatus = await getInvalidationStatus(
cloudfront,
invalidationId
);

if (totalWait > INVALIDATION_WAIT_LIMIT) {
throw new Error("Exceeded invalidation wait limit")
}
}
}
}

async function getInvalidationStatus(cloudfront: AWS.CloudFront, invalidationId: string) {
const invalidationResponse = await cloudfront
.getInvalidation({
DistributionId: CLOUDFRONT_DISTRIBUTION,
Id: invalidationId,
})
.promise();

return invalidationResponse?.Invalidation?.Status;
}

function getPaths(entity: KeyToAsset): string[] {
Expand Down

0 comments on commit 4d631cd

Please sign in to comment.