Skip to content

Commit

Permalink
Merge pull request #1188 from multiversx/SERVICES-1832-timescale-db-p…
Browse files Browse the repository at this point in the history
…erformance

[SERVICES-1832] fix logs processor lastProcessedTimestamp
  • Loading branch information
claudiulataretu authored Sep 18, 2023
2 parents 167996a + 94d20d5 commit 398ffc3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/services/analytics/timescaledb/timescaledb.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
database: apiConfig.getTimescaleDbDatabase(),
username: apiConfig.getTimescaleDbUsername(),
password: apiConfig.getTimescaleDbPassword(),
applicationName: 'xExchangeService',
ssl: true,
extra: {
ssl: {
Expand Down
46 changes: 32 additions & 14 deletions src/services/crons/logs.processor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export class LogsProcessorService {

let currentTimestamp: number;

if (lastProcessedTimestamp === null) {
if (!lastProcessedTimestamp || lastProcessedTimestamp === undefined) {
lastProcessedTimestamp =
(await this.apiService.getShardTimestamp(1)) - delay;
currentTimestamp = lastProcessedTimestamp;
Expand All @@ -109,10 +109,21 @@ export class LogsProcessorService {
return [lastProcessedTimestamp, currentTimestamp];
}

private async getFeeBurned(gte: number, lte: number) {
private async getFeeBurned(
currentTimestamp: number,
lastProcessedTimestamp: number,
) {
this.feeMap.clear();
await this.getSwapLogs('swapTokensFixedInput', gte, lte);
await this.getSwapLogs('swapTokensFixedOutput', gte, lte);
await this.getSwapLogs(
'swapTokensFixedInput',
currentTimestamp,
lastProcessedTimestamp,
);
await this.getSwapLogs(
'swapTokensFixedOutput',
currentTimestamp,
lastProcessedTimestamp,
);

const totalWriteRecords = await this.writeRecords(
this.feeMap,
Expand All @@ -122,11 +133,15 @@ export class LogsProcessorService {
this.logger.info(`fee burned records: ${totalWriteRecords}`);
}

private async getSwapLogs(swapType: string, gte: number, lte: number) {
private async getSwapLogs(
swapType: string,
currentTimestamp: number,
lastProcessedTimestamp: number,
) {
const transactionsLogs = await this.getTransactionsLogs(
swapType,
gte,
lte,
currentTimestamp,
lastProcessedTimestamp,
);

for (const transactionLogs of transactionsLogs) {
Expand All @@ -136,11 +151,14 @@ export class LogsProcessorService {
}
}

private async getExitFarmLogs(gte: number, lte: number) {
private async getExitFarmLogs(
currentTimestamp: number,
lastProcessedTimestamp: number,
) {
const transactionsLogs = await this.getTransactionsLogs(
'exitFarm',
gte,
lte,
currentTimestamp,
lastProcessedTimestamp,
);

this.penaltyMap.clear();
Expand All @@ -162,8 +180,8 @@ export class LogsProcessorService {

private async getTransactionsLogs(
eventName: string,
gte: number,
lte: number,
currentTimestamp: number,
lastProcessedTimestamp: number,
): Promise<any[]> {
const elasticQueryAdapter: ElasticQuery = new ElasticQuery();
elasticQueryAdapter.condition.must = [
Expand All @@ -177,11 +195,11 @@ export class LogsProcessorService {
'timestamp',
{
key: 'gte',
value: lte,
value: lastProcessedTimestamp,
},
{
key: 'lte',
value: gte,
value: currentTimestamp,
},
),
];
Expand Down

0 comments on commit 398ffc3

Please sign in to comment.