Skip to content

Commit

Permalink
Fix file filter bug and add more telemetry data (#65)
Browse files Browse the repository at this point in the history
1. Fix file filter bug:
There're many 'invalid/unused' members inside a group, so use all group members count to do cid % operation will filter most files out, and it would also increase the probability during the probabilityFilter.
So enhance the code to filter out invalid group members.

2. Use queryMulti to retrieve work reports from chain in a batch mode to speed up the node-info-updater-task, which reduce the whole indexing time from ~100 minutes to ~5 minutes.

3. Shorten the node-info-updater-task update interval from 1 hour to 30 minutes, since node-info-updater-task is pretty quick right now after refactor

4. Add more telemtry data:
    * sworker enclave id_info data
    * some os info statistics
  • Loading branch information
wuhaixian1984 authored Jun 21, 2024
1 parent d336a79 commit b1eacff
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 33 deletions.
14 changes: 12 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "crust-smanager",
"version": "1.0.3",
"version": "2.0.0",
"description": "A storage manager integrated with Crust, IPFS and sWorker(storage inspector) of Crust protocol.",
"main": "build/src/main.js",
"repository": "https://github.com/crustio/crust-smanager.git",
"author": "Zikun Fan <kun@crust.network>",
"author": "Brian Wu <brianwu@crust.network>",
"engines": {
"node": ">= 14.16 <15"
},
Expand All @@ -30,7 +30,8 @@
"sqlite3": "^5.0.2",
"tslib": "~2.3.0",
"umzug": "^3.0.0-beta.16",
"winston": "^3.3.3"
"winston": "^3.3.3",
"node-os-utils": "^1.3.7"
},
"devDependencies": {
"@types/bignumber.js": "^5.0.0",
Expand Down
8 changes: 8 additions & 0 deletions src/sworker/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosInstance } from 'axios';
import qs from 'querystring';
import {
EnclaveIdInfo,
QuerySealInfoResult,
SealInfoMap,
SealInfoResp,
Expand Down Expand Up @@ -87,6 +88,13 @@ export default class SworkerApi {
return parseObj(res.data);
}

async getEnclaveIdInfo(): Promise<EnclaveIdInfo> {
const res = await this.sworker.get('/enclave/id_info');
if (!res || res.status !== 200) {
throw new Error(`invalid sworker response: ${res}`);
}
return parseObj(res.data);
}
/// READ methods
/**
* Query local free storage size
Expand Down
46 changes: 39 additions & 7 deletions src/tasks/group-info-updater-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { AppContext } from '../types/context';
import { SimpleTask } from '../types/tasks';
import { formatError } from '../utils';
import { makeIntervalTask } from './task-utils';
import { Identity } from '../chain';
import { ValidNodeAnchors } from './node-info-updater-task';
import _ from 'lodash';

async function handleUpdate(context: AppContext, logger: Logger) {
const { api } = context;
Expand All @@ -26,13 +29,42 @@ async function handleUpdate(context: AppContext, logger: Logger) {

// Get group members
const members = await api.groupMembers(groupOwner);
members.sort();
const nodeIndex = members.indexOf(api.getChainAccount());
context.groupInfo = {
groupAccount: groupOwner,
totalMembers: members.length,
nodeIndex,
};

/// Filter valid members
// First get swork.Identities of the members
const queries = [];
for (const member of members) {
const query = [api.chainApi().query.swork.identities, member];
queries.push(query);
}
const identities = await api.chainApi().queryMulti(queries);

// Perform the filter
const validMembers = [];
for (let i = 0; i < members.length; i++) {
const member = members[i];
const idCodec = identities[i];
if (!_.isNil(idCodec) && !idCodec.isEmpty) {
const id = idCodec.toJSON() as Identity;
const anchor = id.anchor;
if (ValidNodeAnchors.has(anchor)) {
validMembers.push(member);
}
}
}

if (validMembers.length > 0) {
logger.info(`load ${validMembers.length} valid group members`);
validMembers.sort();
const nodeIndex = validMembers.indexOf(api.getChainAccount());
context.groupInfo = {
groupAccount: groupOwner,
totalMembers: validMembers.length,
nodeIndex,
};
} else {
logger.warn(`load ${validMembers.length} valid group members`);
}
} catch (e) {
logger.error('failed updating group info: %s', formatError(e));
context.groupInfo = null;
Expand Down
56 changes: 36 additions & 20 deletions src/tasks/node-info-updater-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ import { AppContext } from '../types/context';
import { SimpleTask } from '../types/tasks';
import { formatError } from '../utils';
import { SLOT_LENGTH } from '../utils/consts';
import { Dayjs } from '../utils/datetime';
import { makeIntervalTask } from './task-utils';

// the storage key for 'swork->workReport'
const WorkReportKey =
'0x2e3b7ab5757e6bbf28d3df3b5e01d6b9b7e949778e4650a54fcc65ad1f1ba39f';

export const ValidNodeAnchors = new Set();

async function handleUpdate(context: AppContext, logger: Logger) {
const { api } = context;
try {
let lastKey = null;
let totalCount = 0;
let tempValidNodeAnchors = new Set();

Check failure on line 21 in src/tasks/node-info-updater-task.ts

View workflow job for this annotation

GitHub Actions / build

'tempValidNodeAnchors' is never reassigned. Use 'const' instead
// eslint-disable-next-line
while (true) {
const keys = await (lastKey
Expand All @@ -30,21 +32,33 @@ async function handleUpdate(context: AppContext, logger: Logger) {
.map(extractReportAnchorFromKey)
.filter()
.value();
const workReports = await Bluebird.mapSeries(validKeys, async (k) => {
// logger.info('loading workreport for key: %s', k);
return api.chainApi().query.swork.workReports(k);
});
const validReports = _.filter(workReports, (r) => {
if (!r) {
return false;
}
const report = r.toJSON() as any; // eslint-disable-line
if (!report) {
logger.error('invalid workreport loaded');
return false;

// Get work reports from chain in multi mode
const queries = [];
for (const anchor of validKeys) {
const query = [api.chainApi().query.swork.workReports, anchor];
queries.push(query);
}
const workReports = await api.chainApi().queryMulti(queries);

// Filter out valid reports
const validReports = [];
for (let i = 0; i < validKeys.length; i++) {
const anchor = validKeys[i];
const reportCodec = workReports[i];
if (!_.isNil(reportCodec) && !reportCodec.isEmpty) {
const report = reportCodec.toJSON() as any;

Check warning on line 50 in src/tasks/node-info-updater-task.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
if (!_.isNil(report)) {
if (report.report_slot >= currentSlot - SLOT_LENGTH) {
validReports.push(report);
tempValidNodeAnchors.add(anchor);
}
} else {
logger.error('invalid workreport loaded');
}
}
return report.report_slot >= currentSlot - SLOT_LENGTH;
});
}

logger.info('load %d valid work reports', _.size(validReports));
totalCount += _.size(validReports);
// wait for a short while to reduce system load
Expand All @@ -59,6 +73,9 @@ async function handleUpdate(context: AppContext, logger: Logger) {
context.nodeInfo = {
nodeCount: totalCount,
};
// Update the global ValidNodeAnchors data which will be used by group-info-updater-task
ValidNodeAnchors.clear();
tempValidNodeAnchors.forEach(anchor => ValidNodeAnchors.add(anchor));
} catch (e) {
logger.error('failed updating node info: %s', formatError(e));
}
Expand All @@ -75,11 +92,10 @@ export async function createNodeInfoUpdateTask(
context: AppContext,
loggerParent: Logger,
): Promise<SimpleTask> {
// update node count every 5 hours
// note: it's slow
const updateInterval = Dayjs.duration({
hours: 1,
}).asMilliseconds();
// After we change to use batch mode, the time cost to retrieve all work reports from chain
// has been reduced from ~100 minutes to ~5 minutes, so we can shorten the updateInterval
// update group info every 30 minutes
const updateInterval = 30 * 60 * 1000;

return makeIntervalTask(
5 * 1000,
Expand Down
48 changes: 47 additions & 1 deletion src/tasks/telemetry-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ import SworkerApi from '../sworker';
import { AppContext } from '../types/context';
import { PinStatus } from '../types/database';
import { NormalizedConfig } from '../types/smanager-config';
import { WorkloadInfo } from '../types/sworker';
import { EnclaveIdInfo, WorkloadInfo } from '../types/sworker';
import { SimpleTask } from '../types/tasks';
import {
PinStats,
QueueInfo,
SManagerInfo,
TelemetryData,
SWorkerStats,
OSInfo,
} from '../types/telemetry';
import { formatError, getTimestamp, toQuotedList } from '../utils';
import { Dayjs } from '../utils/datetime';
import { PendingStatus } from './pull-utils';
import { makeIntervalTask } from './task-utils';
import os from 'os';
import osu from 'node-os-utils';

const ReportSlotDuration = Dayjs.duration({
hours: 24,
Expand Down Expand Up @@ -63,9 +66,11 @@ async function collectStats(
[timeStart],
);
const workload = await getSworkerWorkload(sworkerApi, logger);
const enclaveIdInfo = await getSworkerEnclaveIdInfo(sworkerApi, logger);
let reportWL: SWorkerStats;
if (workload) {
reportWL = {
id_info: enclaveIdInfo,
srd: {
srd_complete: workload.srd.srd_complete,
srd_remaining_task: workload.srd.srd_remaining_task,
Expand All @@ -81,6 +86,8 @@ async function collectStats(
reportWL = null;
}

const osInfo = await collectOSInfo(logger);

return {
chainAccount: account,
smangerInfo,
Expand All @@ -96,6 +103,7 @@ async function collectStats(
deletedCount,
},
hasSealCoordinator: !!context.sealCoordinator,
osInfo
};
}

Expand All @@ -111,6 +119,18 @@ async function getSworkerWorkload(
}
}

async function getSworkerEnclaveIdInfo(
sworkerApi: SworkerApi,
logger: Logger,
): Promise<EnclaveIdInfo | null> {
try {
return await sworkerApi.getEnclaveIdInfo();
} catch (e) {
logger.error('failed to get sworker enclave id_info: %s', formatError(e));
return null;
}
}

async function collectQueueInfo(database): Promise<QueueInfo> {
const { pendingCount } = await database.get(
`select count(*) as pendingCount from file_record
Expand Down Expand Up @@ -164,6 +184,32 @@ async function getPinStats(database, timeStart: number): Promise<PinStats> {
};
}


async function collectOSInfo(logger: Logger): Promise<OSInfo> {

try {
const kernel = os.release();
const uptime = os.uptime();
const cpuModel = osu.cpu.model();
const cpuCount = osu.cpu.count();
const memInfo = await osu.mem.info();

return {
kernel,
uptime,
cpuInfo: {
cpuModel,
cpuCount,
},
memInfo
}
}catch(err){
logger.error(`failed to collect os info: ${err}`);
}

return null;
}

export async function createTelemetryReportTask(
context: AppContext,
loggerParent: Logger,
Expand Down
9 changes: 9 additions & 0 deletions src/types/sworker.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ export interface WorkloadInfo {
};
}

export interface EnclaveIdInfo {
account: string;
attestation_mode: string | null;
mrenclave: string;
pub_key: string;
sworker_version: string;
version: string;
}

export type SealedType = 'valid' | 'lost' | 'pending';

export interface SealInfoData {
Expand Down
Loading

0 comments on commit b1eacff

Please sign in to comment.