diff --git a/package-lock.json b/package-lock.json index d1bc38d..96f905b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "crust-smanager", - "version": "1.0.3", + "version": "2.0.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -6528,7 +6528,7 @@ }, "node-fetch": { "version": "npm:@achingbrain/node-fetch@2.6.7", - "resolved": "https://registry.npmmirror.com/@achingbrain/node-fetch/-/node-fetch-2.6.7.tgz", + "resolved": "https://registry.npmjs.org/@achingbrain/node-fetch/-/node-fetch-2.6.7.tgz", "integrity": "sha512-iTASGs+HTFK5E4ZqcMsHmeJ4zodyq8L38lZV33jwqcBJYoUt3HjN4+ot+O9/0b+ke8ddE7UgOtVuZN/OkV19/g==" }, "node-forge": { @@ -6608,6 +6608,11 @@ "integrity": "sha1-jZ2+KJZKSsVxLpExZCEHxx6Q7EA=", "dev": true }, + "node-os-utils": { + "version": "1.3.7", + "resolved": "https://registry.npmjs.org/node-os-utils/-/node-os-utils-1.3.7.tgz", + "integrity": "sha512-fvnX9tZbR7WfCG5BAy3yO/nCLyjVWD6MghEq0z5FDfN+ZXpLWNITBdbifxQkQ25ebr16G0N7eRWJisOcMEHG3Q==" + }, "node-pre-gyp": { "version": "0.11.0", "resolved": "https://registry.npmjs.org/node-pre-gyp/-/node-pre-gyp-0.11.0.tgz", @@ -7738,6 +7743,11 @@ "integrity": "sha512-9QNk5KwDF+Bvz+PyObkmSYjI5ksVUYtjW7AU22r2NKcfLJcXp96hkDWU3+XndOsUb+AQ9QhfzfCT2O+CNWT5Tw==", "dev": true }, + "systeminformation": { + "version": "5.22.11", + "resolved": "https://registry.npmjs.org/systeminformation/-/systeminformation-5.22.11.tgz", + "integrity": "sha512-aLws5yi4KCHTb0BVvbodQY5bY8eW4asMRDTxTW46hqw9lGjACX6TlLdJrkdoHYRB0qs+MekqEq1zG7WDnWE8Ug==" + }, "table": { "version": "6.7.1", "resolved": "https://registry.npmjs.org/table/-/table-6.7.1.tgz", diff --git a/package.json b/package.json index 335bcef..94c696f 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,10 @@ { "name": "crust-smanager", - "version": "1.0.3", + "version": "2.0.0", "description": "A storage manager integrated with Crust, IPFS and sWorker(storage inspector) of Crust protocol.", "main": "build/src/main.js", "repository": 
"https://github.com/crustio/crust-smanager.git", - "author": "Zikun Fan ", + "author": "Brian Wu ", "engines": { "node": ">= 14.16 <15" }, @@ -30,7 +30,8 @@ "sqlite3": "^5.0.2", "tslib": "~2.3.0", "umzug": "^3.0.0-beta.16", - "winston": "^3.3.3" + "winston": "^3.3.3", + "node-os-utils": "^1.3.7" }, "devDependencies": { "@types/bignumber.js": "^5.0.0", diff --git a/src/sworker/index.ts b/src/sworker/index.ts index 3926297..933d7d0 100644 --- a/src/sworker/index.ts +++ b/src/sworker/index.ts @@ -1,6 +1,7 @@ import axios, { AxiosInstance } from 'axios'; import qs from 'querystring'; import { + EnclaveIdInfo, QuerySealInfoResult, SealInfoMap, SealInfoResp, @@ -87,6 +88,13 @@ return parseObj(res.data); } + async getEnclaveIdInfo(): Promise<EnclaveIdInfo> { + const res = await this.sworker.get('/enclave/id_info'); + if (!res || res.status !== 200) { + throw new Error(`invalid sworker response: ${res}`); + } + return parseObj(res.data); + } /// READ methods /** * Query local free storage size diff --git a/src/tasks/group-info-updater-task.ts b/src/tasks/group-info-updater-task.ts index 206b58c..fe1f63d 100644 --- a/src/tasks/group-info-updater-task.ts +++ b/src/tasks/group-info-updater-task.ts @@ -3,6 +3,9 @@ import { AppContext } from '../types/context'; import { SimpleTask } from '../types/tasks'; import { formatError } from '../utils'; import { makeIntervalTask } from './task-utils'; +import { Identity } from '../chain'; +import { ValidNodeAnchors } from './node-info-updater-task'; +import _ from 'lodash'; async function handleUpdate(context: AppContext, logger: Logger) { const { api } = context; @@ -26,13 +29,42 @@ // Get group members const members = await api.groupMembers(groupOwner); - members.sort(); - const nodeIndex = members.indexOf(api.getChainAccount()); - context.groupInfo = { - groupAccount: groupOwner, - totalMembers: members.length, - nodeIndex, - }; + + /// Filter valid 
members + // First get swork.Identities of the members + const queries = []; + for (const member of members) { + const query = [api.chainApi().query.swork.identities, member]; + queries.push(query); + } + const identities = await api.chainApi().queryMulti(queries); + + // Perform the filter + const validMembers = []; + for (let i = 0; i < members.length; i++) { + const member = members[i]; + const idCodec = identities[i]; + if (!_.isNil(idCodec) && !idCodec.isEmpty) { + const id = idCodec.toJSON() as Identity; + const anchor = id.anchor; + if (ValidNodeAnchors.has(anchor)) { + validMembers.push(member); + } + } + } + + if (validMembers.length > 0) { + logger.info(`load ${validMembers.length} valid group members`); + validMembers.sort(); + const nodeIndex = validMembers.indexOf(api.getChainAccount()); + context.groupInfo = { + groupAccount: groupOwner, + totalMembers: validMembers.length, + nodeIndex, + }; + } else { + logger.warn(`load ${validMembers.length} valid group members`); + } } catch (e) { logger.error('failed updating group info: %s', formatError(e)); context.groupInfo = null; diff --git a/src/tasks/node-info-updater-task.ts b/src/tasks/node-info-updater-task.ts index e5d5900..6eee9fb 100644 --- a/src/tasks/node-info-updater-task.ts +++ b/src/tasks/node-info-updater-task.ts @@ -5,18 +5,20 @@ import { AppContext } from '../types/context'; import { SimpleTask } from '../types/tasks'; import { formatError } from '../utils'; import { SLOT_LENGTH } from '../utils/consts'; -import { Dayjs } from '../utils/datetime'; import { makeIntervalTask } from './task-utils'; // the storage key for 'swork->workReport' const WorkReportKey = '0x2e3b7ab5757e6bbf28d3df3b5e01d6b9b7e949778e4650a54fcc65ad1f1ba39f'; +export const ValidNodeAnchors = new Set(); + async function handleUpdate(context: AppContext, logger: Logger) { const { api } = context; try { let lastKey = null; let totalCount = 0; + let tempValidNodeAnchors = new Set(); // eslint-disable-next-line while (true) { 
const keys = await (lastKey @@ -30,21 +32,33 @@ async function handleUpdate(context: AppContext, logger: Logger) { .map(extractReportAnchorFromKey) .filter() .value(); - const workReports = await Bluebird.mapSeries(validKeys, async (k) => { - // logger.info('loading workreport for key: %s', k); - return api.chainApi().query.swork.workReports(k); - }); - const validReports = _.filter(workReports, (r) => { - if (!r) { - return false; - } - const report = r.toJSON() as any; // eslint-disable-line - if (!report) { - logger.error('invalid workreport loaded'); - return false; + + // Get work reports from chain in multi mode + const queries = []; + for (const anchor of validKeys) { + const query = [api.chainApi().query.swork.workReports, anchor]; + queries.push(query); + } + const workReports = await api.chainApi().queryMulti(queries); + + // Filter out valid reports + const validReports = []; + for (let i = 0; i < validKeys.length; i++) { + const anchor = validKeys[i]; + const reportCodec = workReports[i]; + if (!_.isNil(reportCodec) && !reportCodec.isEmpty) { + const report = reportCodec.toJSON() as any; + if (!_.isNil(report)) { + if (report.report_slot >= currentSlot - SLOT_LENGTH) { + validReports.push(report); + tempValidNodeAnchors.add(anchor); + } + } else { + logger.error('invalid workreport loaded'); + } } - return report.report_slot >= currentSlot - SLOT_LENGTH; - }); + } + logger.info('load %d valid work reports', _.size(validReports)); totalCount += _.size(validReports); // wait for a short while to reduce system load @@ -59,6 +73,9 @@ async function handleUpdate(context: AppContext, logger: Logger) { context.nodeInfo = { nodeCount: totalCount, }; + // Update the global ValidNodeAnchors data which will be used by group-info-updater-task + ValidNodeAnchors.clear(); + tempValidNodeAnchors.forEach(anchor => ValidNodeAnchors.add(anchor)); } catch (e) { logger.error('failed updating node info: %s', formatError(e)); } @@ -75,11 +92,10 @@ export async function 
createNodeInfoUpdateTask( context: AppContext, loggerParent: Logger, ): Promise { - // update node count every 5 hours - // note: it's slow - const updateInterval = Dayjs.duration({ - hours: 1, - }).asMilliseconds(); + // After we change to use batch mode, the time cost to retrieve all work reports from chain + // has been reduced from ~100 minutes to ~5 minutes, so we can shorten the updateInterval + // update node info every 30 minutes + const updateInterval = 30 * 60 * 1000; return makeIntervalTask( 5 * 1000, diff --git a/src/tasks/telemetry-task.ts b/src/tasks/telemetry-task.ts index 699c640..0bcb9ef 100644 --- a/src/tasks/telemetry-task.ts +++ b/src/tasks/telemetry-task.ts @@ -5,7 +5,7 @@ import SworkerApi from '../sworker'; import { AppContext } from '../types/context'; import { PinStatus } from '../types/database'; import { NormalizedConfig } from '../types/smanager-config'; -import { WorkloadInfo } from '../types/sworker'; +import { EnclaveIdInfo, WorkloadInfo } from '../types/sworker'; import { SimpleTask } from '../types/tasks'; import { PinStats, @@ -13,11 +13,14 @@ import { SManagerInfo, TelemetryData, SWorkerStats, + OSInfo, } from '../types/telemetry'; import { formatError, getTimestamp, toQuotedList } from '../utils'; import { Dayjs } from '../utils/datetime'; import { PendingStatus } from './pull-utils'; import { makeIntervalTask } from './task-utils'; +import os from 'os'; +import osu from 'node-os-utils'; const ReportSlotDuration = Dayjs.duration({ hours: 24, @@ -63,9 +66,11 @@ [timeStart], ); const workload = await getSworkerWorkload(sworkerApi, logger); + const enclaveIdInfo = await getSworkerEnclaveIdInfo(sworkerApi, logger); let reportWL: SWorkerStats; if (workload) { reportWL = { + id_info: enclaveIdInfo, srd: { srd_complete: workload.srd.srd_complete, srd_remaining_task: workload.srd.srd_remaining_task, @@ -81,6 +86,8 @@ reportWL = null; } + const osInfo = await 
collectOSInfo(logger); + return { chainAccount: account, smangerInfo, @@ -96,6 +103,7 @@ deletedCount, }, hasSealCoordinator: !!context.sealCoordinator, + osInfo }; } @@ -111,6 +119,18 @@ } } +async function getSworkerEnclaveIdInfo( + sworkerApi: SworkerApi, + logger: Logger, +): Promise<EnclaveIdInfo | null> { + try { + return await sworkerApi.getEnclaveIdInfo(); + } catch (e) { + logger.error('failed to get sworker enclave id_info: %s', formatError(e)); + return null; + } +} + async function collectQueueInfo(database): Promise { const { pendingCount } = await database.get( `select count(*) as pendingCount from file_record @@ -164,6 +184,32 @@ }; } + +async function collectOSInfo(logger: Logger): Promise<OSInfo | null> { + + try { + const kernel = os.release(); + const uptime = os.uptime(); + const cpuModel = osu.cpu.model(); + const cpuCount = osu.cpu.count(); + const memInfo = await osu.mem.info(); + + return { + kernel, + uptime, + cpuInfo: { + cpuModel, + cpuCount, + }, + memInfo + } + }catch(err){ + logger.error(`failed to collect os info: ${err}`); + } + + return null; +} + export async function createTelemetryReportTask( context: AppContext, loggerParent: Logger, diff --git a/src/types/sworker.d.ts b/src/types/sworker.d.ts index 5d578f1..509270b 100644 --- a/src/types/sworker.d.ts +++ b/src/types/sworker.d.ts @@ -33,6 +33,15 @@ }; } +export interface EnclaveIdInfo { + account: string; + attestation_mode: string | null; + mrenclave: string; + pub_key: string; + sworker_version: string; + version: string; +} + export type SealedType = 'valid' | 'lost' | 'pending'; export interface SealInfoData { diff --git a/src/types/telemetry.d.ts b/src/types/telemetry.d.ts index 3dcfdde..e9cdcb3 100644 --- a/src/types/telemetry.d.ts +++ b/src/types/telemetry.d.ts @@ -10,9 +10,18 @@ cleanupStats: CleanupStats; 
groupInfo: GroupInfo; hasSealCoordinator: boolean; + osInfo: OSInfo | null; } export interface SWorkerStats { + id_info: { + account: string; + attestation_mode: string | null; + mrenclave: string; + pub_key: string; + sworker_version: string; + version: string; + }; files: { lost: { num: number; @@ -59,3 +68,19 @@ export interface PinStats { export interface CleanupStats { deletedCount: number; } + +export interface OSInfo { + kernel: string; + uptime: number; + cpuInfo: { + cpuModel: string; + cpuCount: number; + }; + memInfo: { + totalMemMb: number; + usedMemMb: number; + freeMemMb: number; + usedMemPercentage: number; + freeMemPercentage: number; + }; +} \ No newline at end of file