Skip to content

Commit

Permalink
More load options - assets and throttleMs (#351)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipzeta authored Feb 13, 2024
1 parent a1f0483 commit 17c04d9
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
All notable changes to this project will be documented in this file.
Version changes are pinned to SDK releases.

## [1.20.5]

- More load options - assets and throttleMs. ([#351](https://github.com/zetamarkets/sdk/pull/351))

## [1.20.4]

- Add PNL to TradeEventV3 and PositionSize to ApplyFundingEvent. ([#344](https://github.com/zetamarkets/sdk/pull/344))
Expand Down
5 changes: 4 additions & 1 deletion examples/basic/basic-example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ async function main() {
// Airdropping SOL.
await connection.requestAirdrop(wallet.publicKey, 100_000_000);

const loadExchangeConfig = types.defaultLoadExchangeConfig(
let loadExchangeConfig = types.defaultLoadExchangeConfig(
Network.DEVNET,
connection,
utils.defaultCommitment(),
0,
true
);

// Optionally add a throttle for startup load to prevent rate-limiting on free-tier RPCs
// loadExchangeConfig.throttleMs = 1000;

await fetch(`${SERVER_URL}/faucet/USDC`, {
method: "post",
body: JSON.stringify({
Expand Down
81 changes: 64 additions & 17 deletions src/exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ export class Exchange {
if (this.isSetup) {
throw "Exchange already setup";
}
this._assets = assets.allAssets(loadConfig.network);
if (loadConfig.loadAssets) {
this._assets = loadConfig.loadAssets;
} else {
this._assets = assets.allAssets();
}
this._provider = new anchor.AnchorProvider(
loadConfig.connection,
wallet instanceof types.DummyWallet ? null : wallet,
Expand Down Expand Up @@ -637,10 +641,23 @@ export class Exchange {
this.subscribeOracle(this.assets, callback),
]);

const accFetches = (await Promise.all(allPromises)).slice(
0,
this.assets.length + 1
);
// If throttleMs is passed, do each promise slowly with a delay, else load everything at once
let accFetches = [];
if (loadConfig.throttleMs > 0) {
console.log(
`Fetching ${allPromises.length} core accounts with ${loadConfig.throttleMs}ms sleep inbetween each fetch...`
);
for (var prom of allPromises) {
await utils.sleep(loadConfig.throttleMs);
accFetches.push(await Promise.resolve(prom));
}
accFetches = accFetches.slice(0, this.assets.length + 1);
} else {
accFetches = (await Promise.all(allPromises)).slice(
0,
this.assets.length + 1
);
}

let perpSyncQueueAccs: PerpSyncQueue[] = [];
for (let i = 0; i < accFetches.length - 1; i++) {
Expand All @@ -652,26 +669,51 @@ export class Exchange {
);
}

await Promise.all(
this.assets.map(async (asset, i) => {
return this.getSubExchange(asset).load(
// If throttleMs is passed, load sequentially with some delay, else load as fast as possible
if (loadConfig.throttleMs > 0) {
for (var asset of this.assets) {
await utils.sleep(loadConfig.throttleMs);
await this.getSubExchange(asset).load(
asset,
this.opts,
[perpSyncQueueAccs[i]],
[perpSyncQueueAccs[this.assets.indexOf(asset)]],
loadConfig.loadFromStore,
loadConfig.throttleMs,
callback
);
})
);
}
} else {
await Promise.all(
this.assets.map(async (asset, i) => {
return this.getSubExchange(asset).load(
asset,
this.opts,
[perpSyncQueueAccs[i]],
loadConfig.loadFromStore,
callback
);
})
);
}

await Promise.all(
this._assets.map(async (a) => {
await this.getPerpMarket(a).serumMarket.updateDecoded(
if (loadConfig.throttleMs > 0) {
console.log(
`Updating ${this.assets.length} markets with ${loadConfig.throttleMs}ms sleep inbetween each update...`
);
for (var asset of this.assets) {
await utils.sleep(loadConfig.throttleMs);
await this.getPerpMarket(asset).serumMarket.updateDecoded(
this.connection as unknown as ConnectionZstd
);
})
);
}
} else {
await Promise.all(
this._assets.map(async (a) => {
await this.getPerpMarket(a).serumMarket.updateDecoded(
this.connection as unknown as ConnectionZstd
);
})
);
}

for (var se of this.getAllSubExchanges()) {
// Only subscribe to the orderbook for assets provided in the override
Expand Down Expand Up @@ -700,6 +742,11 @@ export class Exchange {
await this.updateExchangeState();

this._isInitialized = true;

// An extra log because the throttleMs path is slower and logs more along the way
if (loadConfig.throttleMs > 0) {
console.log("Exchange load complete");
}
}

private addSubExchange(asset: Asset, subExchange: SubExchange) {
Expand Down
1 change: 0 additions & 1 deletion src/market.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ export class ZetaGroupMarkets {
public static async load(
asset: Asset,
opts: ConfirmOptions,
throttleMs: number,
loadFromStore: boolean
): Promise<ZetaGroupMarkets> {
let instance = new ZetaGroupMarkets(asset);
Expand Down
10 changes: 2 additions & 8 deletions src/subexchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,13 @@ export class SubExchange {

/**
* Loads a fresh instance of the subExchange object using on chain state.
* @param throttle Whether to sleep on market loading for rate limit reasons.
* @param throttleMs Whether to sleep on market loading for rate limit reasons.
*/
public async load(
asset: Asset,
opts: ConfirmOptions,
fetchedAccs: any[],
loadFromStore: boolean,
throttleMs = 0,
callback?: (asset: Asset, event: EventType, slot: number, data: any) => void
) {
console.info(`Loading ${assetToName(asset)} subExchange.`);
Expand All @@ -144,12 +143,7 @@ export class SubExchange {

this._perpSyncQueue = fetchedAccs[0] as PerpSyncQueue;

this._markets = await ZetaGroupMarkets.load(
asset,
opts,
throttleMs,
loadFromStore
);
this._markets = await ZetaGroupMarkets.load(asset, opts, loadFromStore);

Exchange.riskCalculator.updateMarginRequirements(asset);

Expand Down
5 changes: 4 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ export interface LoadExchangeConfig {
throttleMs: number;
loadFromStore: boolean;
TIFBufferSeconds: number;
loadAssets?: Asset[];
}

export function defaultLoadExchangeConfig(
Expand All @@ -506,7 +507,8 @@ export function defaultLoadExchangeConfig(
loadFromStore = false,
orderbookConnection: Connection = undefined,
orderbookAssetSubscriptionOverride: Asset[] = undefined,
TIFBufferSeconds: number = undefined
TIFBufferSeconds: number = undefined,
loadAssets: Asset[] = undefined
): LoadExchangeConfig {
return {
network,
Expand All @@ -517,6 +519,7 @@ export function defaultLoadExchangeConfig(
throttleMs,
loadFromStore,
TIFBufferSeconds,
loadAssets,
};
}

Expand Down

0 comments on commit 17c04d9

Please sign in to comment.