Skip to content

Commit

Permalink
1. Add filesSkipRate filter logic
Browse files Browse the repository at this point in the history
2. Revert back the original filter logic
  • Loading branch information
wuhaixian1984 committed Jul 23, 2024
1 parent a368310 commit 316c447
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 92 deletions.
2 changes: 2 additions & 0 deletions src/config/config.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const schedulerConfig = Joi.object().keys({
strategyWeightsSchema,
).default('default'),
minSrdRatio: Joi.number().min(0).max(100).default(70),
filesSkipRate: Joi.number().min(0).max(100).default(0),
enableProbabilityFilter: Joi.boolean().default(false),
maxPendingTasks: Joi.number().min(1).default(32),
minFileSize: Joi.number().min(0).default(0),
maxFileSize: Joi.number().min(0).default(0),
Expand Down
191 changes: 99 additions & 92 deletions src/tasks/pull-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import BigNumber from 'bignumber.js';
import { FileRecord, FileStatus } from '../types/database';
import { PullingStrategy } from '../types/smanager-config';
import IpfsHttpClient from 'ipfs-http-client';
import { formatError } from '../utils';
//import { Dayjs } from '../utils/datetime';
import { BlockAndTime } from '../utils/chain-math';
import { bytesToMb, formatError } from '../utils';
import { Dayjs } from '../utils/datetime';
import { BlockAndTime, estimateTimeAtBlock } from '../utils/chain-math';
import { AppContext } from '../types/context';
//import seedrandom from 'seedrandom';
import seedrandom from 'seedrandom';
import _ from 'lodash';
import SworkerApi from '../sworker';
import { Logger } from 'winston';
//import { logger } from '../utils/logger';
import { logger } from '../utils/logger';

const CID = (IpfsHttpClient as any).CID; // eslint-disable-line
export const SysMinFreeSpace = 50 * 1024; // 50 * 1024 MB
Expand All @@ -34,24 +34,25 @@ type FilterFileResult =
| 'sizeTooLarge'
| 'replicasNotEnough'
| 'tooManyReplicas'
| 'pendingForReplica';
| 'pendingForReplica'
| 'rateSkipped';

// treat a file as invalid once it has gone more than 10 days without replica (expire_at) information
// const MaxNoReplicaDuration = Dayjs.duration({
// days: 10,
// });
// const MinLifeTime = Dayjs.duration({
// months: 4,
// });
// Grace period for 'dbScan' records whose expire_at is still 0: filterFile
// reports 'pendingForReplica' while the time since create_at is within this
// duration, and 'invalidNoReplica' once it is exceeded.
const MaxNoReplicaDuration = Dayjs.duration({
days: 10,
});
// Minimum remaining lifetime for 'dbScan' records: filterFile reports
// 'lifeTimeTooShort' when the estimated expiry time is closer than this.
const MinLifeTime = Dayjs.duration({
months: 4,
});

// TODO: add some tests
export async function filterFile(
record: FileRecord,
_strategey: PullingStrategy,
_lastBlockTime: BlockAndTime,
strategey: PullingStrategy,
lastBlockTime: BlockAndTime,
context: AppContext,
): Promise<FilterFileResult> {
//const config = context.config.scheduler;
const config = context.config.scheduler;
const groupInfo = context.groupInfo;
try {
const bn = cidToBigNumber(record.cid);
Expand All @@ -62,60 +63,66 @@ export async function filterFile(
return 'invalidCID';
}

// 2024/07/08 - Comment out the following filter logic to verify the replica count increase in the whole mainnet

// const maxReplicas = strategey === 'newFilesWeight' ? 300 : 160;
// if (!probabilityFilter(context, maxReplicas)) {
// return 'pfSkipped';
// }
// const fileSizeInMb = bytesToMb(record.size);
// // check min file size limit
// if (config.minFileSize > 0 && fileSizeInMb < config.minFileSize) {
// return 'sizeTooSmall';
// }
// if (config.maxFileSize > 0 && fileSizeInMb > config.maxFileSize) {
// return 'sizeTooLarge';
// }
// if (
// strategey === 'dbFilesWeight' &&
// config.minReplicas > 0 &&
// record.replicas < config.minReplicas
// ) {
// return 'replicasNotEnough';
// }
// if (config.maxReplicas > 0 && record.replicas >= config.maxReplicas) {
// return 'tooManyReplicas';
// }
// if (record.indexer === 'dbScan') {
// // file record has no valid expire_at information
// if (record.expire_at === 0) {
// // check how long the file was indexed
// const createAt = Dayjs.unix(record.create_at);
// if (
// Dayjs.duration(Dayjs().diff(createAt)).asSeconds() >
// MaxNoReplicaDuration.asSeconds()
// ) {
// return 'invalidNoReplica';
// }
// return 'pendingForReplica';
// }
// const expireAt = estimateTimeAtBlock(record.expire_at, lastBlockTime);
// if (
// Dayjs.duration(expireAt.diff(Dayjs())).asSeconds() <
// MinLifeTime.asSeconds()
// ) {
// return 'lifeTimeTooShort';
// }
// }
// const sealCoordinator = context.sealCoordinator;
// if (sealCoordinator != null) {
// const shouldSeal = await sealCoordinator.markSeal(record.cid);
// if (shouldSeal.seal && shouldSeal.reason === 'ok') {
// return 'good';
// }
// logger.info(`seal for file "${record.cid}" skipped by seal coordinator`);
// return 'nodeSkipped';
// }
// Filter by filesSkipRate
if (config.filesSkipRate > 0) {
const randomNumber = rdm(context.config.chain.account) * 100;
if (randomNumber < config.filesSkipRate) {
return 'rateSkipped';
}
}

const maxReplicas = strategey === 'newFilesWeight' ? 300 : 160;
if (config.enableProbabilityFilter && !probabilityFilter(context, maxReplicas)) {
return 'pfSkipped';
}
const fileSizeInMb = bytesToMb(record.size);
// check min file size limit
if (config.minFileSize > 0 && fileSizeInMb < config.minFileSize) {
return 'sizeTooSmall';
}
if (config.maxFileSize > 0 && fileSizeInMb > config.maxFileSize) {
return 'sizeTooLarge';
}
if (
strategey === 'dbFilesWeight' &&
config.minReplicas > 0 &&
record.replicas < config.minReplicas
) {
return 'replicasNotEnough';
}
if (config.maxReplicas > 0 && record.replicas >= config.maxReplicas) {
return 'tooManyReplicas';
}
if (record.indexer === 'dbScan') {
// file record has no valid expire_at information
if (record.expire_at === 0) {
// check how long the file was indexed
const createAt = Dayjs.unix(record.create_at);
if (
Dayjs.duration(Dayjs().diff(createAt)).asSeconds() >
MaxNoReplicaDuration.asSeconds()
) {
return 'invalidNoReplica';
}
return 'pendingForReplica';
}
const expireAt = estimateTimeAtBlock(record.expire_at, lastBlockTime);
if (
Dayjs.duration(expireAt.diff(Dayjs())).asSeconds() <
MinLifeTime.asSeconds()
) {
return 'lifeTimeTooShort';
}
}
const sealCoordinator = context.sealCoordinator;
if (sealCoordinator != null) {
const shouldSeal = await sealCoordinator.markSeal(record.cid);
if (shouldSeal.seal && shouldSeal.reason === 'ok') {
return 'good';
}
logger.info(`seal for file "${record.cid}" skipped by seal coordinator`);
return 'nodeSkipped';
}

return 'good';
}
Expand Down Expand Up @@ -148,29 +155,29 @@ export function estimateIpfsPinTimeout(size: number /** in bytes */): number {
return BasePinTimeout + (size / 1024 / 200) * 1000;
}

// // function probabilityFilter(context: AppContext, maxReplicas: number): boolean {
// // if (!context.nodeInfo) {
// // return false;
// // }
// // // Base probability
// // let pTake = 0.0;
// // const nodeCount = context.nodeInfo.nodeCount;
// // if (nodeCount === 0) {
// // pTake = 0.0;
// // } else {
// // pTake = maxReplicas / nodeCount;
// // }

// // const memberCount = _.max([1, context.groupInfo.totalMembers]);
// // pTake = pTake * memberCount;

// // return pTake > rdm(context.config.chain.account);
// // }

// function rdm(seed: string): number {
// const rng = seedrandom(seed, { entropy: true });
// return rng();
// }
/**
 * Randomly decides whether this node should take a file.
 *
 * The take probability is `maxReplicas / nodeCount` (0 when no nodes are
 * known), multiplied by the group's member count (treated as at least 1).
 * The file is taken when that probability is greater than a random draw
 * from `rdm`, seeded with the configured chain account.
 *
 * @param context - application context; supplies `nodeInfo`, `groupInfo`
 *   and `config.chain.account`
 * @param maxReplicas - replica target used as the numerator of the base
 *   probability
 * @returns `true` when the file should be taken; always `false` when
 *   `context.nodeInfo` is not available
 */
function probabilityFilter(context: AppContext, maxReplicas: number): boolean {
  const nodeInfo = context.nodeInfo;
  if (!nodeInfo) {
    // without node statistics no probability can be computed
    return false;
  }

  // base probability: guard against dividing by a zero node count
  const totalNodes = nodeInfo.nodeCount;
  const baseProbability = totalNodes === 0 ? 0.0 : maxReplicas / totalNodes;

  // scale by the group size, never by less than one member
  const groupSize = _.max([1, context.groupInfo.totalMembers]);
  const takeProbability = baseProbability * groupSize;

  const draw = rdm(context.config.chain.account);
  return draw < takeProbability;
}

/**
 * Draws one random number from a freshly created seedrandom generator.
 *
 * NOTE(review): per seedrandom's documentation the `entropy: true` option
 * mixes extra entropy into the seed, so the result is presumably not
 * reproducible from `seed` alone - confirm if determinism is ever needed.
 *
 * @param seed - seed string passed to seedrandom
 * @returns the first value produced by the new generator
 */
function rdm(seed: string): number {
  return seedrandom(seed, { entropy: true })();
}

export async function isSealDone(
cid: string,
Expand Down
2 changes: 2 additions & 0 deletions src/types/smanager-config.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export interface SchedulerConfig {
strategy: StrategyConfig;
maxPendingTasks: number;
minSrdRatio: number; // percent
filesSkipRate: number; // percent
enableProbabilityFilter: boolean;
minFileSize: number; // in MB
maxFileSize: number; // in MB
minReplicas: number; // min replicas for chainDb indexer
Expand Down

0 comments on commit 316c447

Please sign in to comment.