Skip to content

Commit

Permalink
Feat/pg sink full streaming fast (#759)
Browse files Browse the repository at this point in the history
* wip

* Add substream and fix plugin issue

* add spl-utils

* fix test
  • Loading branch information
bryzettler authored Dec 17, 2024
1 parent 27076bf commit 0398c7b
Show file tree
Hide file tree
Showing 10 changed files with 601 additions and 459 deletions.
9 changes: 5 additions & 4 deletions packages/account-postgres-sink-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@
"dev": "ts-node-dev --inspect --respawn --project tsconfig.cjs.json src/server.ts"
},
"dependencies": {
"@bufbuild/protobuf": "^1.7.2",
"@connectrpc/connect": "^1.3.0",
"@connectrpc/connect-node": "^1.3.0",
"@bufbuild/protobuf": "^1.10.0",
"@connectrpc/connect": "^1.4.0",
"@connectrpc/connect-node": "^1.4.0",
"@coral-xyz/anchor": "^0.28.0",
"@fastify/cors": "^8.1.1",
"@helium/account-fetch-cache": "^0.9.18",
"@helium/spl-utils": "^0.9.18",
"@metaplex-foundation/mpl-token-metadata": "^2.10.0",
"@solana/web3.js": "^1.78.8",
"@substreams/core": "^0.15.1",
"@substreams/core": "^0.16.0",
"@triton-one/yellowstone-grpc": "^0.4.0",
"async-retry": "^1.3.3",
"aws-sdk": "^2.1344.0",
Expand Down
4 changes: 3 additions & 1 deletion packages/account-postgres-sink-service/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ export const YELLOWSTONE_URL =
export const USE_HELIUS_WEBHOOK = getEnvBoolean("USE_HELIUS_WEBHOOK");
export const HELIUS_AUTH_SECRET = process.env.HELIUS_AUTH_SECRET;

export const USE_SUBSTREAMS = getEnvBoolean("USE_SUBSTREAMS");
export const USE_SUBSTREAM = getEnvBoolean("USE_SUBSTREAM");
export const SUBSTREAM_API_KEY = process.env.SUBSTREAM_API_KEY;
export const SUBSTREAM_URL = process.env.SUBSTREAM_URL;
export const SUBSTREAM = process.env.SUBSTREAM;

export const USE_KAFKA = getEnvBoolean("USE_KAFKA");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import BN from "bn.js";
import { cellToLatLng } from "h3-js";
import { camelize } from "inflection";
import _omit from "lodash/omit";
import { DECIMAL, DataTypes, Model, QueryTypes } from "sequelize";
import { DataTypes, Model, QueryTypes } from "sequelize";
import { IPlugin } from "../types";
import { database } from "../utils/database";
import { MapboxService } from "../utils/mapboxService";
Expand All @@ -11,7 +11,7 @@ const parseH3BNLocation = (location: BN) =>
cellToLatLng(location.toString("hex"));

export class ReverseGeoCache extends Model {
declare h3: string;
declare location: number;
declare street: string;
declare city: string;
declare state: string;
Expand All @@ -20,18 +20,19 @@ export class ReverseGeoCache extends Model {
declare lng: number;
declare raw: Object;
}

ReverseGeoCache.init(
{
location: {
type: DECIMAL,
type: DataTypes.DECIMAL,
primaryKey: true,
},
street: DataTypes.STRING,
lat: DataTypes.DECIMAL(8, 6),
long: DataTypes.DECIMAL(9, 6),
city: DataTypes.STRING,
state: DataTypes.STRING,
country: DataTypes.STRING,
lat: DataTypes.DECIMAL(8, 6),
long: DataTypes.DECIMAL(9, 6),
raw: DataTypes.JSONB,
},
{
Expand All @@ -42,6 +43,7 @@ ReverseGeoCache.init(
timestamps: true,
}
);

const locationFetchCache: { [location: string]: Promise<ReverseGeoCache> } = {};
export const ExtractHexLocationPlugin = ((): IPlugin => {
const name = "ExtractHexLocation";
Expand All @@ -54,6 +56,7 @@ export const ExtractHexLocationPlugin = ((): IPlugin => {
"lat",
"long",
];

const existingColumns = (
await database.query(
`
Expand All @@ -68,13 +71,13 @@ export const ExtractHexLocationPlugin = ((): IPlugin => {
const columns = Object.keys(ReverseGeoCache.getAttributes()).map((att) =>
camelize(att, true)
);

if (
!existingColumns.length ||
!columns.every((col) => existingColumns.includes(col))
) {
ReverseGeoCache.sync({ alter: true });
await ReverseGeoCache.sync({ alter: true });
}
await ReverseGeoCache.sync({ alter: true });

const addFields = (schema: { [key: string]: any }, accountName: string) => {
schema[accountName] = {
Expand Down
Loading

0 comments on commit 0398c7b

Please sign in to comment.