Skip to content

Commit

Permalink
fix: update msg_index by order in event and log (#327)
Browse files Browse the repository at this point in the history
* fix: update msg_index by order in event and log

* feat: add test mapping event and log

* feat: add test mapping by authz tx

* feat: add test mapping by authz tx

* fix: refactor mapping event to log, update unit test

* fix: comment set index msg with order, use count attribute to compare

* feat: add checkMappingEventToLog after mapped event

* fix: check map event log by flatten array

* fix: update checking map event log

* fix: add null value handle in checkMappingEventToLog

* feat: init job re assign msg_index to event

* fix: use transaction when update event

* fix: use forEach loop in job

* fix: sort graph when query db

* feat: add unittest for generateListUpdateMsgIndex function

* fix: update msg_index in 1 query per tx for each value

* fix: fix null when compare value attribute value

* fix: fix if tx fail, then no need set msg index
  • Loading branch information
fibonacci998 authored Sep 22, 2023
1 parent 8d721fb commit 7aa36bc
Show file tree
Hide file tree
Showing 8 changed files with 1,754 additions and 43 deletions.
4 changes: 4 additions & 0 deletions ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@
"jobRedecodeTx": {
"limitRecordGet": 100
},
"jobReassignMsgIndexToEvent": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
},
"crawlIbcTao": {
"key": "crawlIbcTao",
"millisecondRepeatJob": 2000,
Expand Down
4 changes: 4 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@
"jobRedecodeTx": {
"limitRecordGet": 100
},
"jobReassignMsgIndexToEvent": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
},
"crawlIbcTao": {
"key": "crawlIbcTao",
"millisecondRepeatJob": 2000,
Expand Down
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export const BULL_JOB_NAME = {
REINDEX_CW721_HISTORY: 'reindex:cw721-history',
HANDLE_MIGRATE_CONTRACT: 'handle:migrate-contract',
JOB_REDECODE_TX: 'job:redecode-tx',
JOB_REASSIGN_MSG_INDEX_TO_EVENT: 'job:reassign-msg-index-to-event',
CRAWL_IBC_TAO: 'crawl:ibc-tao',
CRAWL_GENESIS_IBC_TAO: 'crawl:genesis-ibc-tao',
CRAWL_IBC_APP: 'crawl:ibc-app',
Expand Down Expand Up @@ -232,6 +233,10 @@ export const SERVICE = {
key: 'ReDecodeTx',
path: 'v1.ReDecodeTx',
},
ReAssignMsgIndexToEvent: {
key: 'ReAssignMsgIndexToEvent',
path: 'v1.ReAssignMsgIndexToEvent',
},
},
CrawlIBCTaoService: {
key: 'CrawlIBCTaoService',
Expand Down
3 changes: 1 addition & 2 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class Event extends BaseModel {

tx_id!: number;

tx_msg_index: number | undefined;
tx_msg_index?: number | undefined;

type!: string;

Expand All @@ -32,7 +32,6 @@ export class Event extends BaseModel {
required: ['type'],
properties: {
tx_id: { type: 'number' },
tx_msg_index: { type: 'number' },
type: { type: 'string' },
block_height: { type: 'number' },
source: { type: 'string', enum: Object.values(this.SOURCE) },
Expand Down
275 changes: 234 additions & 41 deletions src/services/crawl-tx/crawl_tx.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,58 +333,251 @@ export default class CrawlTxService extends BullableService {
this.logger.debug('result insert tx', resultInsertGraph);
}

private setMsgIndexToEvent(tx: any) {
const mapEventMsgIdx: Map<string, number[]> = new Map();
public mappingFlatEventToLog(
eventHasIndex: any,
eventEncodedFlat: any,
indexMsg: number
) {
const attributesInEvent = eventHasIndex.attributes;
// i, j are index of eventHasIndex and eventEncodedFlat
let i = 0;
let j = 0;
while (i < attributesInEvent.length && j < eventEncodedFlat.length) {
// find last element with same eventIndex
let lastIndexSameEvent = j;
while (
lastIndexSameEvent < eventEncodedFlat.length &&
eventEncodedFlat[lastIndexSameEvent].indexEvent ===
eventEncodedFlat[j].indexEvent
) {
lastIndexSameEvent += 1;
}

if (
attributesInEvent[i].key === eventEncodedFlat[j].key &&
attributesInEvent[i].value === eventEncodedFlat[j].value
) {
const isEventMapped = eventEncodedFlat
.slice(j, lastIndexSameEvent)
// eslint-disable-next-line @typescript-eslint/no-loop-func, no-inner-declarations
.every((item: any, index: number) => {
if (
attributesInEvent[i + index].key === item.key &&
attributesInEvent[i + index].value === item.value
) {
return true;
}
return false;
});
if (isEventMapped) {
// mark event encoded to be mapped
for (let k = j; k < lastIndexSameEvent; k += 1) {
// eslint-disable-next-line no-param-reassign
eventEncodedFlat[k].indexMapped = indexMsg;
}
i += lastIndexSameEvent - j;
}
}
j = lastIndexSameEvent;
}
}

// self check msg index by counting event
private selfCheckByAnotherWay(tx: any) {
// count total attribute for each message, countAttributeInEvent[i] = x mean message i has x attributes
const countAttributeInEvent: number[] = [];
tx.tx_response.logs.forEach((log: any) => {
const countAttribute = log.events.reduce(
(acc: number, curr: any) => acc + curr.attributes.length,
0
);
countAttributeInEvent.push(countAttribute);
});

let reachLastEventTypeTx = false;
let countCurrentAttribute = 0;
let currentCompareEventId = 0;
for (let i = 0; i < tx.tx_response.events.length; i += 1) {
if (tx.tx_response.events[i].type === 'tx') {
reachLastEventTypeTx = true;
}
if (reachLastEventTypeTx && tx.tx_response.events[i].type !== 'tx') {
if (
countCurrentAttribute < countAttributeInEvent[currentCompareEventId]
) {
countCurrentAttribute += tx.tx_response.events[i].attributes.length;
}

// after count, check if count is equal countAttributeInEvent[currentCompareEventId] or not
if (
countCurrentAttribute === countAttributeInEvent[currentCompareEventId]
) {
// if true, count success, then next currentCompareEventId and reset count = 0
currentCompareEventId += 1;
countCurrentAttribute = 0;
} else if (
countCurrentAttribute > countAttributeInEvent[currentCompareEventId]
) {
this.logger.warn('Count event in log is not equal event encoded');
return false;
}
}
}
return true;
}

private checkMappingEventToLog(tx: any) {
this.logger.info('checking mapping log in tx :', tx.tx_response.txhash);
let flattenLog: string[] = [];
let flattenEventEncoded: string[] = [];

// set index_msg from log to mapEventMsgIdx
tx.tx_response.logs?.forEach((log: any, index: number) => {
tx?.tx_response?.logs?.forEach((log: any, index: number) => {
log.events.forEach((event: any) => {
const { type } = event;
event.attributes.forEach((attribute: any) => {
const keyInMap = `${type}_${attribute.key}_${attribute.value}`;
if (mapEventMsgIdx.has(keyInMap)) {
const listIndex = mapEventMsgIdx.get(keyInMap);
listIndex?.push(index);
event.attributes.forEach((attr: any) => {
if (attr.value === undefined) {
flattenLog.push(`${index}-${event.type}-${attr.key}-null`);
} else {
mapEventMsgIdx.set(keyInMap, [index]);
flattenLog.push(`${index}-${event.type}-${attr.key}-${attr.value}`);
}
});
});
});

// set index_msg from mapEventMsgIdx to event
tx.tx_response.events.forEach((event: any) => {
const { type } = event;
event.attributes.forEach((attribute: any) => {
const key = attribute?.key
? fromUtf8(fromBase64(attribute?.key))
: null;
const value = attribute?.value
? fromUtf8(fromBase64(attribute?.value))
: null;
const keyInMap = `${type}_${key}_${value}`;

const listIndex = mapEventMsgIdx.get(keyInMap);
// get first index with this key
const firstIndex = listIndex?.shift();

if (firstIndex != null) {
if (event.msg_index && event.msg_index !== firstIndex) {
this.logger.warn(
`something wrong: setting index ${firstIndex} to existed index ${event.msg_index}`
);
} else {
// eslint-disable-next-line no-param-reassign
event.msg_index = firstIndex;
}
}

// delete key in map if value is empty
if (listIndex?.length === 0) {
mapEventMsgIdx.delete(keyInMap);
tx?.tx_response?.events?.forEach((event: any) => {
event.attributes.forEach((attr: any) => {
if (event.msg_index !== undefined) {
const key = attr.key ? fromUtf8(fromBase64(attr.key)) : null;
const value = attr.value ? fromUtf8(fromBase64(attr.value)) : null;
flattenEventEncoded.push(
`${event.msg_index}-${event.type}-${key}-${value}`
);
}
});
});
// compare 2 array
if (flattenLog.length !== flattenEventEncoded.length) {
this.logger.warn('Length between 2 flatten array is not equal');
}
flattenLog = flattenLog.sort();
flattenEventEncoded = flattenEventEncoded.sort();
const checkResult = flattenLog.every(
(item: string, index: number) => item === flattenEventEncoded[index]
);
if (checkResult === false) {
this.logger.warn('Mapping event to log is wrong');
}
}

public setMsgIndexToEvent(tx: any) {
/*------
DO NOT USE CURRENTLY
MAPPING BY ORDER IN EVENT AND LOG
THIS CASE BASED ON ORDER NOT CHANGED BETWEEN EVENT AND LOG
--------*/
// // flatten event and decode key value
// const eventEncodedFlats: any[] = [];
// tx.tx_response.events.forEach((event: any, index: number) => {
// event.attributes.forEach((attr: any) => {
// eventEncodedFlats.push({
// ...{
// key: attr.key ? fromUtf8(fromBase64(attr.key)) : null,
// value: attr.value ? fromUtf8(fromBase64(attr.value)) : null,
// },
// indexEvent: index,
// type: event.type,
// });
// });
// });
// // loop logs (has order for each messages)
// tx.tx_response.logs.forEach((log: any, index: number) => {
// // loop each event in log to compare with event encoded
// log.events.forEach((eventInLog: any) => {
// // filter list event has type equal eventInLog.type and not be setted index msg
// const filtedEventByTypeFlat = eventEncodedFlats.filter(
// (eventEncoded: any) =>
// eventEncoded.type === eventInLog.type &&
// eventEncoded.msg_index === undefined
// );
// // mapping between log and event
// this.mappingFlatEventToLog(eventInLog, filtedEventByTypeFlat, index);
// });
// });
// // set index msg to event encoded
// eventEncodedFlats
// .filter((item: any) => item.indexMapped !== undefined)
// .forEach((item: any) => {
// if (
// tx.tx_response.events[item.indexEvent].msg_index !== undefined &&
// tx.tx_response.events[item.indexEvent].msg_index !== item.indexMapped
// ) {
// this.logger.warn(
// `something wrong: setting index ${
// item.indexMapped
// } to existed index ${
// tx.tx_response.eventstx.tx_response.events[item.indexEvent]
// .msg_index
// }`
// );
// }
// // eslint-disable-next-line no-param-reassign
// tx.tx_response.events[item.indexEvent].msg_index = item.indexMapped;
// });
// // self check msg index by counting event
// const selfCheck = this.selfCheckByAnotherWay(tx);
// if (!selfCheck) {
// this.logger.warn('selfcheck fail');
// }

/*---------
TESTING
MAPPING EVENT BY COUNT EACH LOG AND EVENT MUST BE SAME
-----------*/

// if this is failed tx, then no need to set index msg
if (!tx.tx_response.logs) {
this.logger.info('Failed tx, no need to set index msg');
return;
}
// count total attribute for each message, countAttributeInEvent[i] = x mean message i has x attributes
const countAttributeInEvent: number[] = tx?.tx_response?.logs?.map(
(log: any) =>
log.events.reduce(
(acc: number, curr: any) => acc + curr.attributes.length,
0
)
);

let reachLastEventTypeTx = false;
let countCurrentAttribute = 0;
let currentCompareEventId = 0;
for (let i = 0; i < tx?.tx_response?.events?.length; i += 1) {
if (tx.tx_response.events[i].type === 'tx') {
reachLastEventTypeTx = true;
}
if (reachLastEventTypeTx && tx.tx_response.events[i].type !== 'tx') {
if (
countCurrentAttribute < countAttributeInEvent[currentCompareEventId]
) {
countCurrentAttribute += tx.tx_response.events[i].attributes.length;
// eslint-disable-next-line no-param-reassign
tx.tx_response.events[i].msg_index = currentCompareEventId;
}

// after count, check if count is equal countAttributeInEvent[currentCompareEventId] or not
if (
countCurrentAttribute === countAttributeInEvent[currentCompareEventId]
) {
// if true, count success, then next currentCompareEventId and reset count = 0
currentCompareEventId += 1;
countCurrentAttribute = 0;
} else if (
countCurrentAttribute > countAttributeInEvent[currentCompareEventId]
) {
this.logger.warn('Count event in log is not equal event encoded');
}
}
}
this.checkMappingEventToLog(tx);
}

private _findAttribute(
Expand Down
Loading

0 comments on commit 7aa36bc

Please sign in to comment.