From 6a130dce55b83cad0ca8bfc36358424f0bf30fff Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Thu, 5 Sep 2024 21:59:00 +0530 Subject: [PATCH] feat: store ipfs data in db --- src/config.ts | 8 ++ src/database/changeset.ts | 5 + src/database/index.ts | 194 +++++++++++++++++++++++++++++++------- src/database/migrate.ts | 10 ++ src/database/schema.ts | 10 ++ src/http/api/v1/status.ts | 3 +- src/index.ts | 29 +++++- 7 files changed, 223 insertions(+), 36 deletions(-) diff --git a/src/config.ts b/src/config.ts index 0625a969..3e4b23be 100644 --- a/src/config.ts +++ b/src/config.ts @@ -21,6 +21,7 @@ type CoingeckoSupportedChainId = | 1088; const CHAIN_DATA_VERSION = "81"; +const IPFS_DATA_VERSION = "1"; export type Token = { code: string; @@ -1829,6 +1830,8 @@ export type Config = { readOnlyDatabaseUrl: string; dataVersion: string; databaseSchemaName: string; + ipfsDataVersion: string; + ipfsDatabaseSchemaName: string; hostname: string; pinoPretty: boolean; deploymentEnvironment: "local" | "development" | "staging" | "production"; @@ -1989,6 +1992,9 @@ export function getConfig(): Config { const dataVersion = CHAIN_DATA_VERSION; const databaseSchemaName = `chain_data_${dataVersion}`; + const ipfsDataVersion = IPFS_DATA_VERSION; + const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`; + const dropDb = z.boolean().default(false).parse(args["drop-db"]); const removeCache = z.boolean().default(false).parse(args["rm-cache"]); @@ -2041,6 +2047,8 @@ export function getConfig(): Config { removeCache, dataVersion, databaseSchemaName, + ipfsDataVersion, + ipfsDatabaseSchemaName, httpServerWaitForSync, httpServerEnabled, indexerEnabled, diff --git a/src/database/changeset.ts b/src/database/changeset.ts index 5eb7f12e..c53d2a14 100644 --- a/src/database/changeset.ts +++ b/src/database/changeset.ts @@ -16,6 +16,7 @@ import { NewPrice, NewLegacyProject, NewApplicationPayout, + NewIpfsData, } from "./schema.js"; export type DataChange = @@ -140,4 +141,8 @@ export type DataChange = | { type: "InsertApplicationPayout"; payout: NewApplicationPayout; + } + | { + type: "InsertIpfsData"; + ipfs: NewIpfsData; }; diff --git a/src/database/index.ts b/src/database/index.ts index 2b749ff2..2f1016b3 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -14,8 +14,9 @@ import { NewDonation, LegacyProjectTable, ApplicationPayout, + IpfsDataTable, } from "./schema.js"; -import { migrate } from "./migrate.js"; +import { migrate, migrateDataFetcher } from "./migrate.js"; import { encodeJsonWithBigInts } from "../utils/index.js"; import type { DataChange } from "./changeset.js"; import { Logger } from "pino"; @@ -37,6 +38,7 @@ interface Tables { prices: PriceTable; legacyProjects: LegacyProjectTable; applicationsPayouts: ApplicationPayout; + ipfsData: IpfsDataTable; } type KyselyDb = Kysely; @@ -53,13 +55,15 @@ export class Database { #statsTimeout: ReturnType | null = null; #logger: Logger; - readonly databaseSchemaName: string; + readonly chainDataSchemaName: string; + readonly ipfsDataSchemaName: string; constructor(options: { statsUpdaterEnabled: boolean; logger: Logger; connectionPool: Pool; - schemaName: string; + chainDataSchemaName: string; + ipfsDataSchemaName: string; }) { const dialect = new PostgresDialect({ pool: options.connectionPool, @@ -72,10 +76,11 @@ export class Database { plugins: [new CamelCasePlugin()], }); - this.#db = this.#db.withSchema(options.schemaName); + // Initialize schema names + this.chainDataSchemaName = options.chainDataSchemaName; + this.ipfsDataSchemaName = options.ipfsDataSchemaName; this.#logger = options.logger; - this.databaseSchemaName = options.schemaName; this.scheduleDonationQueueFlush(); @@ -87,21 +92,40 @@ export class Database { async acquireWriteLock() { const client = await this.#connectionPool.connect(); - // generate lock id based on schema - const lockId = this.databaseSchemaName.split("").reduce((acc, char) => { - return acc + char.charCodeAt(0); - }, 0); + // Helper function to generate lock ID based on schema name + const generateLockId = (schemaName: string): number => { + return schemaName.split("").reduce((acc, char) => { + return acc + char.charCodeAt(0); + }, 0); + }; - try { + // Helper function to acquire a lock for a specific schema + const acquireLockForSchema = async (lockId: number) => { const result = await client.query( `SELECT pg_try_advisory_lock(${lockId}) as lock` ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - if (result.rows[0].lock === true) { + return result.rows[0].lock === true; + }; + + // Helper function to release a lock for a specific schema + const releaseLockForSchema = async (lockId: number) => { + await client.query(`SELECT pg_advisory_unlock(${lockId})`); + }; + + // Acquire locks for both schemas + const chainDataLockId = generateLockId(this.chainDataSchemaName); + const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); + + try { + const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); + const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + + if (chainDataLockAcquired && ipfsDataLockAcquired) { return { release: async () => { - await client.query(`SELECT pg_advisory_unlock(${lockId})`); + await releaseLockForSchema(chainDataLockId); + await releaseLockForSchema(ipfsDataLockId); client.release(); }, client, @@ -132,12 +156,12 @@ export class Database { } private async updateStats() { - const donationsTableRef = `"${this.databaseSchemaName}"."donations"`; + const donationsTableRef = `"${this.chainDataSchemaName}"."donations"`; await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."rounds" AS r + UPDATE "${this.chainDataSchemaName}"."rounds" AS r SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -160,7 +184,7 @@ export class Database { await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."applications" AS a + UPDATE "${this.chainDataSchemaName}"."applications" AS a SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -225,36 +249,64 @@ export class Database { async dropSchemaIfExists() { await this.#db.schema - .dropSchema(this.databaseSchemaName) + .withSchema(this.chainDataSchemaName) + .dropSchema(this.chainDataSchemaName) .ifExists() .cascade() .execute(); + + // TODO: check if we should drop ipfsDataSchemaName as well + // await this.#db + // .schema + // .withSchema(this.ipfsDataSchemaName) + // .dropSchema(this.ipfsDataSchemaName) + // .ifExists() + // .cascade() + // .execute(); } - async createSchemaIfNotExists(logger: Logger) { + async createSchemaIfNotExists( + schemaName: string, + migrateFn: (tx: any, schemaName: string) => Promise, + logger: Logger + ) { const exists = await sql<{ exists: boolean }>` - SELECT EXISTS ( - SELECT 1 FROM information_schema.schemata - WHERE schema_name = ${this.databaseSchemaName} - )`.execute(this.#db); + SELECT EXISTS ( + SELECT 1 FROM information_schema.schemata + WHERE schema_name = ${schemaName} + )`.execute(this.#db.withSchema(schemaName)); if (exists.rows.length > 0 && exists.rows[0].exists) { logger.info({ - msg: `schema "${this.databaseSchemaName}" exists, skipping creation`, + msg: `schema "${schemaName}" exists, skipping creation`, }); - return; } logger.info({ - msg: `schema "${this.databaseSchemaName}" does not exist, creating schema`, + msg: `schema "${schemaName}" does not exist, creating schema`, }); - await this.#db.transaction().execute(async (tx) => { - await tx.schema.createSchema(this.databaseSchemaName).execute(); + await this.#db + .withSchema(schemaName) + .transaction() + .execute(async (tx) => { + await tx.schema.createSchema(schemaName).execute(); + await migrateFn(tx, schemaName); + }); + } - await migrate(tx, this.databaseSchemaName); - }); + async createAllSchemas(logger: Logger) { + await this.createSchemaIfNotExists( + this.chainDataSchemaName, + migrate, + logger + ); + await this.createSchemaIfNotExists( + this.ipfsDataSchemaName, + migrateDataFetcher, + logger + ); } async applyChanges(changes: DataChange[]): Promise { @@ -267,6 +319,7 @@ export class Database { switch (change.type) { case "InsertPendingProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingProjectRoles") .values(change.pendingProjectRole) .execute(); @@ -275,6 +328,7 @@ export class Database { case "DeletePendingProjectRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingProjectRoles") .where("id", "in", change.ids) .execute(); @@ -283,6 +337,7 @@ export class Database { case "InsertPendingRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingRoundRoles") .values(change.pendingRoundRole) .execute(); @@ -291,6 +346,7 @@ export class Database { case "DeletePendingRoundRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingRoundRoles") .where("id", "in", change.ids) .execute(); @@ -298,12 +354,17 @@ export class Database { } case "InsertProject": { - await this.#db.insertInto("projects").values(change.project).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("projects") + .values(change.project) + .execute(); break; } case "UpdateProject": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("projects") .set(change.project) .where("id", "=", change.projectId) @@ -314,6 +375,7 @@ export class Database { case "InsertProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("projectRoles") .values(change.projectRole) .execute(); @@ -322,6 +384,7 @@ export class Database { case "DeleteAllProjectRolesByRole": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -332,6 +395,7 @@ export class Database { case "DeleteAllProjectRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -342,12 +406,17 @@ export class Database { } case "InsertRound": { - await this.#db.insertInto("rounds").values(change.round).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("rounds") + .values(change.round) + .execute(); break; } case "UpdateRound": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -358,6 +427,7 @@ export class Database { case "IncrementRoundFundedAmount": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ fundedAmount: eb("fundedAmount", "+", change.fundedAmount), @@ -375,6 +445,7 @@ export class Database { case "UpdateRoundByStrategyAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -385,6 +456,7 @@ export class Database { case "InsertRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("roundRoles") .values(change.roundRole) .execute(); @@ -393,6 +465,7 @@ export class Database { case "DeleteAllRoundRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("roundRoles") .where("chainId", "=", change.roundRole.chainId) .where("roundId", "=", change.roundRole.roundId) @@ -411,7 +484,11 @@ export class Database { }; } - await this.#db.insertInto("applications").values(application).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("applications") + .values(application) + .execute(); break; } @@ -425,6 +502,7 @@ export class Database { } await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set(application) .where("chainId", "=", change.chainId) @@ -441,6 +519,7 @@ export class Database { case "InsertManyDonations": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("donations") .values(change.donations) .onConflict((c) => c.column("id").doNothing()) @@ -449,12 +528,17 @@ export class Database { } case "InsertManyPrices": { - await this.#db.insertInto("prices").values(change.prices).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("prices") + .values(change.prices) + .execute(); break; } case "IncrementRoundDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -472,6 +556,7 @@ export class Database { case "IncrementRoundTotalDistributed": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalDistributed: eb("totalDistributed", "+", change.amount), @@ -484,6 +569,7 @@ export class Database { case "IncrementApplicationDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -502,6 +588,7 @@ export class Database { case "NewLegacyProject": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("legacyProjects") .values(change.legacyProject) .execute(); @@ -510,12 +597,22 @@ export class Database { case "InsertApplicationPayout": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("applicationsPayouts") .values(change.payout) .execute(); break; } + case "InsertIpfsData": { + await this.#db + .withSchema(this.ipfsDataSchemaName) + .insertInto("ipfsData") + .values(change.ipfs) + .execute(); + break; + } + default: throw new Error(`Unknown changeset type`); } @@ -523,6 +620,7 @@ export class Database { async getPendingProjectRolesByRole(chainId: ChainId, role: string) { const pendingProjectRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingProjectRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -534,6 +632,7 @@ export class Database { async getPendingRoundRolesByRole(chainId: ChainId, role: string) { const pendingRoundRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingRoundRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -545,6 +644,7 @@ export class Database { async getProjectById(chainId: ChainId, projectId: string) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("id", "=", projectId) @@ -556,6 +656,7 @@ export class Database { async getProjectByAnchor(chainId: ChainId, anchorAddress: Address) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("anchorAddress", "=", anchorAddress) @@ -567,6 +668,7 @@ export class Database { async getRoundById(chainId: ChainId, roundId: string) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -578,6 +680,7 @@ export class Database { async getRoundByStrategyAddress(chainId: ChainId, strategyAddress: Address) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("strategyAddress", "=", strategyAddress) @@ -593,6 +696,7 @@ export class Database { roleValue: string ) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where(`${roleName}Role`, "=", roleValue) @@ -615,6 +719,7 @@ export class Database { } const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -631,6 +736,7 @@ export class Database { async getAllChainRounds(chainId: ChainId) { const rounds = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .selectAll() @@ -641,6 +747,7 @@ export class Database { async getAllRoundApplications(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -650,6 +757,7 @@ export class Database { async getAllRoundDonations(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -663,6 +771,7 @@ export class Database { applicationId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -679,6 +788,7 @@ export class Database { projectId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -695,6 +805,7 @@ export class Database { anchorAddress: Address ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -707,6 +818,7 @@ export class Database { async getLatestPriceTimestampForChain(chainId: ChainId) { const latestPriceTimestamp = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("timestamp", "desc") @@ -723,6 +835,7 @@ export class Database { blockNumber: bigint | "latest" ) { let priceQuery = this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .where("tokenAddress", "=", tokenAddress) @@ -741,6 +854,7 @@ export class Database { async getAllChainPrices(chainId: ChainId) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("blockNumber", "asc") @@ -750,6 +864,7 @@ export class Database { async getAllChainProjects(chainId: ChainId) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .selectAll() @@ -761,6 +876,7 @@ export class Database { donorAddress: Address ) { const donations = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("donations.donorAddress", "=", donorAddress) .where("donations.chainId", "=", chainId) @@ -784,6 +900,7 @@ export class Database { async getV2ProjectIdByV1ProjectId(v1ProjectId: string) { const result = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("legacyProjects") .where("v1ProjectId", "=", v1ProjectId) .select("v2ProjectId") @@ -791,4 +908,15 @@ export class Database { return result ?? null; } + + async getDataByCid(cId: string) { + const metadata = await this.#db + .withSchema(this.ipfsDataSchemaName) + .selectFrom("ipfsData") + .where("cid", "=", cId) + .selectAll() + .executeTakeFirst(); + + return metadata ?? null; + } } diff --git a/src/database/migrate.ts b/src/database/migrate.ts index c473d710..c59d6bd2 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -392,3 +392,13 @@ export async function migrate(db: Kysely, schemaName: string) { $$ language sql stable; `.execute(db); } + +export async function migrateDataFetcher(db: Kysely, schemaName: string) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("ipfs_data") + .addColumn("cid", "text") + .addColumn("data", "jsonb") + .execute(); +} diff --git a/src/database/schema.ts b/src/database/schema.ts index 06b9a906..8e50c133 100644 --- a/src/database/schema.ts +++ b/src/database/schema.ts @@ -125,6 +125,11 @@ export type ProjectTable = { projectType: ProjectType; }; +export type IpfsDataTable = { + cid: string; + data: unknown; +}; + export type Project = Selectable; export type NewProject = Insertable; export type PartialProject = Updateable; @@ -253,3 +258,8 @@ export type ApplicationPayout = { }; export type NewApplicationPayout = Insertable; + +export type NewIpfsData = { + cid: string; + data: unknown; +}; diff --git a/src/http/api/v1/status.ts b/src/http/api/v1/status.ts index e4ad3c08..be6910b1 100644 --- a/src/http/api/v1/status.ts +++ b/src/http/api/v1/status.ts @@ -9,7 +9,8 @@ export const createHandler = (config: HttpApiConfig): express.Router => { res.json({ hostname: config.hostname, buildTag: config.buildTag, - databaseSchema: config.db.databaseSchemaName, + chainDataSchemaName: config.db.chainDataSchemaName, + ipfsDataSchema: config.db.ipfsDataSchemaName, }); }); diff --git a/src/index.ts b/src/index.ts index 4ad25e91..a21237a7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -151,7 +151,8 @@ async function main(): Promise { logger: baseLogger.child({ subsystem: "Database" }), statsUpdaterEnabled: config.indexerEnabled, connectionPool: databaseConnectionPool, - schemaName: config.databaseSchemaName, + chainDataSchemaName: config.databaseSchemaName, + ipfsDataSchemaName: config.ipfsDatabaseSchemaName, }); baseLogger.info({ @@ -248,7 +249,8 @@ async function main(): Promise { await db.dropSchemaIfExists(); } - await db.createSchemaIfNotExists(baseLogger); + console.log("Creating all schemas"); + await db.createAllSchemas(baseLogger); await subscriptionStore.init(); } @@ -465,6 +467,13 @@ async function catchupAndWatchChain( return undefined; } + // Check if data is already in the IPFS database + const ipfsData = await db.getDataByCid(cid); + if (ipfsData) { + chainLogger.info(`Found IPFS data in database for CID: ${cid}`); + return Promise.resolve(ipfsData.data as string as T); + } + // Fetch from a single IPFS gateway const fetchFromGateway = async (url: string): Promise => { try { @@ -510,6 +519,22 @@ async function catchupAndWatchChain( chainLogger.info( `Fetch successful from gateway: ${gateway} for CID: ${cid}` ); + + // Save to IpfsData table + try { + await db.applyChange({ + type: "InsertIpfsData", + ipfs: { + cid, + data: result, // TODO: check is JSON.parse is needed + }, + }); + } catch (err) { + chainLogger.error( + `Error saving IPFS data to database: ${String(err)}` + ); + } + return result; // Return the result if fetched successfully } else { chainLogger.warn(