Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix file filter bug and add more telemetry data #65

Merged
merged 6 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "crust-smanager",
"version": "1.0.3",
"version": "2.0.0",
"description": "A storage manager integrated with Crust, IPFS and sWorker(storage inspector) of Crust protocol.",
"main": "build/src/main.js",
"repository": "https://github.com/crustio/crust-smanager.git",
"author": "Zikun Fan <kun@crust.network>",
"author": "Brian Wu <brianwu@crust.network>",
"engines": {
"node": ">= 14.16 <15"
},
Expand All @@ -30,7 +30,8 @@
"sqlite3": "^5.0.2",
"tslib": "~2.3.0",
"umzug": "^3.0.0-beta.16",
"winston": "^3.3.3"
"winston": "^3.3.3",
"node-os-utils": "^1.3.7"
},
"devDependencies": {
"@types/bignumber.js": "^5.0.0",
Expand Down
8 changes: 8 additions & 0 deletions src/sworker/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosInstance } from 'axios';
import qs from 'querystring';
import {
EnclaveIdInfo,
QuerySealInfoResult,
SealInfoMap,
SealInfoResp,
Expand Down Expand Up @@ -87,6 +88,13 @@ export default class SworkerApi {
return parseObj(res.data);
}

/**
 * Fetch the enclave identity info from the sworker `/enclave/id_info` endpoint.
 *
 * @returns the response payload after it has been passed through `parseObj`
 * @throws Error when no response is received or its HTTP status is not 200
 */
async getEnclaveIdInfo(): Promise<EnclaveIdInfo> {
  const res = await this.sworker.get('/enclave/id_info');
  if (!res || res.status !== 200) {
    // Interpolating the response object itself only renders "[object Object]",
    // so report the HTTP status (or the absence of a response) instead.
    const status = res ? res.status : 'no response';
    throw new Error(`invalid sworker response: ${status}`);
  }
  return parseObj(res.data);
}
/// READ methods
/**
* Query local free storage size
Expand Down
46 changes: 39 additions & 7 deletions src/tasks/group-info-updater-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { AppContext } from '../types/context';
import { SimpleTask } from '../types/tasks';
import { formatError } from '../utils';
import { makeIntervalTask } from './task-utils';
import { Identity } from '../chain';
import { ValidNodeAnchors } from './node-info-updater-task';
import _ from 'lodash';

async function handleUpdate(context: AppContext, logger: Logger) {
const { api } = context;
Expand All @@ -26,13 +29,42 @@ async function handleUpdate(context: AppContext, logger: Logger) {

// Get group members
const members = await api.groupMembers(groupOwner);
members.sort();
const nodeIndex = members.indexOf(api.getChainAccount());
context.groupInfo = {
groupAccount: groupOwner,
totalMembers: members.length,
nodeIndex,
};

/// Filter valid members
// First get swork.Identities of the members
const queries = [];
for (const member of members) {
const query = [api.chainApi().query.swork.identities, member];
queries.push(query);
}
const identities = await api.chainApi().queryMulti(queries);

// Perform the filter
const validMembers = [];
for (let i = 0; i < members.length; i++) {
const member = members[i];
const idCodec = identities[i];
if (!_.isNil(idCodec) && !idCodec.isEmpty) {
const id = idCodec.toJSON() as Identity;
const anchor = id.anchor;
if (ValidNodeAnchors.has(anchor)) {
validMembers.push(member);
}
}
}

if (validMembers.length > 0) {
logger.info(`load ${validMembers.length} valid group members`);
validMembers.sort();
const nodeIndex = validMembers.indexOf(api.getChainAccount());
context.groupInfo = {
groupAccount: groupOwner,
totalMembers: validMembers.length,
nodeIndex,
};
} else {
logger.warn(`load ${validMembers.length} valid group members`);
}
} catch (e) {
logger.error('failed updating group info: %s', formatError(e));
context.groupInfo = null;
Expand Down
56 changes: 36 additions & 20 deletions src/tasks/node-info-updater-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@
import { SimpleTask } from '../types/tasks';
import { formatError } from '../utils';
import { SLOT_LENGTH } from '../utils/consts';
import { Dayjs } from '../utils/datetime';
import { makeIntervalTask } from './task-utils';

// the storage key for 'swork->workReport'
const WorkReportKey =
'0x2e3b7ab5757e6bbf28d3df3b5e01d6b9b7e949778e4650a54fcc65ad1f1ba39f';

export const ValidNodeAnchors = new Set();

async function handleUpdate(context: AppContext, logger: Logger) {
const { api } = context;
try {
let lastKey = null;
let totalCount = 0;
let tempValidNodeAnchors = new Set();

Check failure on line 21 in src/tasks/node-info-updater-task.ts

View workflow job for this annotation

GitHub Actions / build

'tempValidNodeAnchors' is never reassigned. Use 'const' instead
// eslint-disable-next-line
while (true) {
const keys = await (lastKey
Expand All @@ -30,21 +32,33 @@
.map(extractReportAnchorFromKey)
.filter()
.value();
const workReports = await Bluebird.mapSeries(validKeys, async (k) => {
// logger.info('loading workreport for key: %s', k);
return api.chainApi().query.swork.workReports(k);
});
const validReports = _.filter(workReports, (r) => {
if (!r) {
return false;
}
const report = r.toJSON() as any; // eslint-disable-line
if (!report) {
logger.error('invalid workreport loaded');
return false;

// Get work reports from chain in multi mode
const queries = [];
for (const anchor of validKeys) {
const query = [api.chainApi().query.swork.workReports, anchor];
queries.push(query);
}
const workReports = await api.chainApi().queryMulti(queries);

// Filter out valid reports
const validReports = [];
for (let i = 0; i < validKeys.length; i++) {
const anchor = validKeys[i];
const reportCodec = workReports[i];
if (!_.isNil(reportCodec) && !reportCodec.isEmpty) {
const report = reportCodec.toJSON() as any;

Check warning on line 50 in src/tasks/node-info-updater-task.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
if (!_.isNil(report)) {
if (report.report_slot >= currentSlot - SLOT_LENGTH) {
validReports.push(report);
tempValidNodeAnchors.add(anchor);
}
} else {
logger.error('invalid workreport loaded');
}
}
return report.report_slot >= currentSlot - SLOT_LENGTH;
});
}

logger.info('load %d valid work reports', _.size(validReports));
totalCount += _.size(validReports);
// wait for a short while to reduce system load
Expand All @@ -59,6 +73,9 @@
context.nodeInfo = {
nodeCount: totalCount,
};
// Update the global ValidNodeAnchors data which will be used by group-info-updater-task
ValidNodeAnchors.clear();
tempValidNodeAnchors.forEach(anchor => ValidNodeAnchors.add(anchor));
} catch (e) {
logger.error('failed updating node info: %s', formatError(e));
}
Expand All @@ -75,11 +92,10 @@
context: AppContext,
loggerParent: Logger,
): Promise<SimpleTask> {
// update node count every 5 hours
// note: it's slow
const updateInterval = Dayjs.duration({
hours: 1,
}).asMilliseconds();
// After we change to use batch mode, the time cost to retrieve all work reports from chain
// has been reduced from ~100 minutes to ~5 minutes, so we can shorten the updateInterval
// update node info every 30 minutes
const updateInterval = 30 * 60 * 1000;

return makeIntervalTask(
5 * 1000,
Expand Down
48 changes: 47 additions & 1 deletion src/tasks/telemetry-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ import SworkerApi from '../sworker';
import { AppContext } from '../types/context';
import { PinStatus } from '../types/database';
import { NormalizedConfig } from '../types/smanager-config';
import { WorkloadInfo } from '../types/sworker';
import { EnclaveIdInfo, WorkloadInfo } from '../types/sworker';
import { SimpleTask } from '../types/tasks';
import {
PinStats,
QueueInfo,
SManagerInfo,
TelemetryData,
SWorkerStats,
OSInfo,
} from '../types/telemetry';
import { formatError, getTimestamp, toQuotedList } from '../utils';
import { Dayjs } from '../utils/datetime';
import { PendingStatus } from './pull-utils';
import { makeIntervalTask } from './task-utils';
import os from 'os';
import osu from 'node-os-utils';

const ReportSlotDuration = Dayjs.duration({
hours: 24,
Expand Down Expand Up @@ -63,9 +66,11 @@ async function collectStats(
[timeStart],
);
const workload = await getSworkerWorkload(sworkerApi, logger);
const enclaveIdInfo = await getSworkerEnclaveIdInfo(sworkerApi, logger);
let reportWL: SWorkerStats;
if (workload) {
reportWL = {
id_info: enclaveIdInfo,
srd: {
srd_complete: workload.srd.srd_complete,
srd_remaining_task: workload.srd.srd_remaining_task,
Expand All @@ -81,6 +86,8 @@ async function collectStats(
reportWL = null;
}

const osInfo = await collectOSInfo(logger);

return {
chainAccount: account,
smangerInfo,
Expand All @@ -96,6 +103,7 @@ async function collectStats(
deletedCount,
},
hasSealCoordinator: !!context.sealCoordinator,
osInfo
};
}

Expand All @@ -111,6 +119,18 @@ async function getSworkerWorkload(
}
}

/**
 * Best-effort lookup of the sworker enclave id info.
 *
 * A failure of the sworker call is logged and reported as `null` rather than
 * being propagated to the caller.
 *
 * @param sworkerApi - client used to query the sworker
 * @param logger - logger that receives the failure message
 * @returns the enclave id info, or `null` when the query failed
 */
async function getSworkerEnclaveIdInfo(
  sworkerApi: SworkerApi,
  logger: Logger,
): Promise<EnclaveIdInfo | null> {
  let idInfo: EnclaveIdInfo | null = null;
  try {
    idInfo = await sworkerApi.getEnclaveIdInfo();
  } catch (err) {
    logger.error('failed to get sworker enclave id_info: %s', formatError(err));
  }
  return idInfo;
}

async function collectQueueInfo(database): Promise<QueueInfo> {
const { pendingCount } = await database.get(
`select count(*) as pendingCount from file_record
Expand Down Expand Up @@ -164,6 +184,32 @@ async function getPinStats(database, timeStart: number): Promise<PinStats> {
};
}


/**
 * Collect basic information about the host for the telemetry report.
 *
 * Reads the kernel release and uptime from Node's `os` module, and the CPU
 * model, CPU count and memory info from `node-os-utils`.
 *
 * @param logger - logger that receives the failure message
 * @returns the collected OS info, or `null` when collecting it failed
 */
async function collectOSInfo(logger: Logger): Promise<OSInfo | null> {
  try {
    const kernel = os.release();
    const uptime = os.uptime();
    const cpuModel = osu.cpu.model();
    const cpuCount = osu.cpu.count();
    const memInfo = await osu.mem.info();

    return {
      kernel,
      uptime,
      cpuInfo: {
        cpuModel,
        cpuCount,
      },
      memInfo,
    };
  } catch (e) {
    // Best effort: a failure here is logged and reported as null so it does
    // not abort the rest of the telemetry collection.
    logger.error('failed to collect os info: %s', formatError(e));
    return null;
  }
}

export async function createTelemetryReportTask(
context: AppContext,
loggerParent: Logger,
Expand Down
9 changes: 9 additions & 0 deletions src/types/sworker.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ export interface WorkloadInfo {
};
}

/**
 * Enclave identity info reported by the sworker.
 *
 * This is the declared result type of `SworkerApi.getEnclaveIdInfo()`, which
 * queries the sworker's `/enclave/id_info` endpoint.
 * NOTE(review): the field meanings below are inferred from their names only;
 * confirm them against the sworker API documentation.
 */
export interface EnclaveIdInfo {
// presumably the chain account bound to this sworker (TODO confirm)
account: string;
// the only nullable field of this payload
attestation_mode: string | null;
// presumably the enclave measurement (TODO confirm)
mrenclave: string;
// presumably the enclave's public key (TODO confirm)
pub_key: string;
// NOTE(review): how `sworker_version` differs from `version` is not visible here
sworker_version: string;
version: string;
}

export type SealedType = 'valid' | 'lost' | 'pending';

export interface SealInfoData {
Expand Down
Loading
Loading