
Commit

1. Shortened the seal timeout judgment from 30 minutes to 5 minutes for a faster timeout, to avoid a pending file queue jam, which may lead to more file seal timeouts because the file copy may have been removed from the original IPFS node if the file waits too long in the queue.

2. Implemented a retry mechanism for seal-failed records.

3. For the probability filter, the pTake value should be divided by GroupCount instead of nodeCount, because one group can hold only one replica (a condensed sketch follows below).
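For reference, a condensed sketch of the corrected calculation (the helper name `pTakeFor` is hypothetical; the real logic lives in `probabilityFilter` in src/tasks/pull-utils.ts below, which also takes the group's member count into account):

```ts
// Sketch of the fix only, not the full probabilityFilter implementation.
const GroupCount = 80; // current average group count - 2024

function pTakeFor(maxReplicas: number, nodeCount: number): number {
  if (nodeCount === 0) {
    return 0.0;
  }
  // A group keeps at most one replica, so the take probability is spread
  // over groups rather than over individual nodes.
  return maxReplicas / GroupCount;
}

// With the default maxReplicas of 200 this gives 200 / 80 = 2.5, whereas
// dividing by a much larger nodeCount previously made pTake far too small.
```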
wuhaixian1984 committed Sep 10, 2024
1 parent d0b28a8 commit 6b68ed7
Showing 10 changed files with 114 additions and 14 deletions.
2 changes: 2 additions & 0 deletions src/config/config.schema.ts
@@ -44,6 +44,8 @@ const schedulerConfig = Joi.object().keys({
maxFileSize: Joi.number().min(0).default(0),
minReplicas: Joi.number().min(0).default(0),
maxReplicas: Joi.number().min(0).default(200),
sealFailedRetryCount: Joi.number().min(0).default(3),
sealFailedRetryInterval: Joi.number().min(1).default(3), // Unit in hours
});

const sealCoordinatorConfig = Joi.object().keys({
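As a quick illustration of the two new keys (a minimal standalone Joi snippet, not the project's schema file), omitted keys fall back to their defaults of 3 retries at a 3-hour interval:

```ts
import Joi from 'joi';

// Only the new retry keys, reproduced in isolation to show the defaults.
const retryKeys = Joi.object().keys({
  sealFailedRetryCount: Joi.number().min(0).default(3),
  sealFailedRetryInterval: Joi.number().min(1).default(3), // unit in hours
});

const { value } = retryKeys.validate({ sealFailedRetryInterval: 6 });
// value => { sealFailedRetryInterval: 6, sealFailedRetryCount: 3 }
```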
10 changes: 10 additions & 0 deletions src/db/file-record.ts
@@ -197,6 +197,15 @@ export function createFileOrderOperator(db: Database): DbOrderOperator {
);
};

const getRecordById = async (
id: number
): DbResult<FileRecord> => {
return db.get(
'select * from file_record where id = ? limit 1',
[id],
);
};

return {
addFiles,
getFileInfo: async (cid, indexer) => {
@@ -213,5 +222,6 @@ export function createFileOrderOperator(db: Database): DbOrderOperator {
getPendingCleanupRecords,
updateCleanupRecordStatus,
getPendingFileRecord,
getRecordById,
};
}
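A hypothetical caller of the new accessor (the surrounding names are illustrative): since `DbResult<FileRecord>` resolves to the record or null, the result must be null-checked before use, which is exactly what `markRecordAsFailed` does further down:

```ts
// Illustrative usage; pin_record.file_record_id may reference a row that no
// longer exists, so treat a missing record as "no retry bookkeeping available".
const fileRecord = await fileOrderOps.getRecordById(pinRecord.file_record_id);
if (fileRecord) {
  logger.info(
    'file "%s" has been retried %d times',
    fileRecord.cid,
    fileRecord.retry_count ?? 0,
  );
}
```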
41 changes: 41 additions & 0 deletions src/db/migrations/2024.0910.1.pin-failed-retry.ts
@@ -0,0 +1,41 @@
import { DataTypes, QueryInterface } from 'sequelize';
import { MigrationFn } from 'umzug';
import { withTransaction } from '../db-utils';

export const up: MigrationFn<QueryInterface> = async ({
context: sequelize,
}) => {
await withTransaction(sequelize, async (transaction) => {
await sequelize.addColumn(
'file_record',
'retry_count',
{
type: DataTypes.INTEGER,
allowNull: true,
defaultValue: 0,
},
{
transaction,
},
);

await sequelize.addColumn(
'pin_record',
'file_record_id',
{
type: DataTypes.INTEGER,
allowNull: true,
},
{
transaction,
},
);
});
};

export const down: MigrationFn<QueryInterface> = async ({
context: sequelize,
}) => {
await sequelize.removeColumn('pin_record', 'file_record_id');
await sequelize.removeColumn('file_record', 'retry_count');
};
7 changes: 4 additions & 3 deletions src/db/pin-record.ts
@@ -29,12 +29,13 @@ export function createPinRecordOperator(db: Database): PinRecordOperator {
cid: string,
size: number,
pinBy: PullingStrategy,
fileRecordId: number
): DbWriteResult => {
await db.run(
'insert into pin_record ' +
'(`cid`, `size`, `status`, `pin_at`, `last_updated`, `pin_by`) ' +
' values(?, ?, ?, ?, ?, ?)',
[cid, size, 'sealing', getTimestamp(), getTimestamp(), pinBy],
'(`cid`, `size`, `status`, `pin_at`, `last_updated`, `pin_by`, `file_record_id`) ' +
' values(?, ?, ?, ?, ?, ?, ?)',
[cid, size, 'sealing', getTimestamp(), getTimestamp(), pinBy, fileRecordId],
);
};
const getPinRecordsByCid = async (cid: string): DbResult<PinRecord[]> => {
15 changes: 15 additions & 0 deletions src/tasks/file-retry-task.ts
@@ -34,6 +34,21 @@ async function handleRetry(context: AppContext) {
and last_updated < ?`,
[maxRetryTime],
);

// Retry seal failed records
const sealFailedRetryInterval = Dayjs.duration({
hours: context.config.scheduler.sealFailedRetryInterval,
}).asSeconds();
const maxPinFailedRetryTime = now - sealFailedRetryInterval;

await database.run(
`update file_record
set status = "new",
retry_count = COALESCE(retry_count, 0) + 1
where status = 'sealFailedRetry'
and last_updated < ?`,
[maxPinFailedRetryTime],
);
}

export async function createFileRetryTask(
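A rough sketch of the timing check added above (the helper is hypothetical; it assumes timestamps are unix seconds, as the surrounding code does): a `sealFailedRetry` record is flipped back to `new` once `sealFailedRetryInterval` hours have passed since `last_updated`, with `retry_count` incremented.

```ts
import dayjs from 'dayjs';
import duration from 'dayjs/plugin/duration';

dayjs.extend(duration);

// Hypothetical helper: is a sealFailedRetry record old enough to be requeued?
function eligibleForRetry(
  lastUpdated: number, // unix seconds
  now: number,         // unix seconds
  retryIntervalHours: number,
): boolean {
  const intervalSeconds = dayjs.duration({ hours: retryIntervalHours }).asSeconds();
  return lastUpdated < now - intervalSeconds;
}

// With the default 3-hour interval, a record last updated 4 hours ago qualifies:
//   eligibleForRetry(now - 4 * 3600, now, 3) === true
```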
4 changes: 2 additions & 2 deletions src/tasks/pull-scheduler-task.ts
@@ -331,8 +331,8 @@ async function sealFile(
strategey: PullingStrategy,
) {
logger.info('sealing for file "%s"', record.cid);
await pinRecordOps.addPinRecord(record.cid, record.size, strategey);
await fileOrderOps.updateFileInfoStatus(record.id, 'handled');
await pinRecordOps.addPinRecord(record.cid, record.size, strategey, record.id);
await fileOrderOps.updateFileInfoStatus(record.id, 'sealing');
const { ipfsApi } = context;
// timeout is necessary
const [abortCtrl, result] = ipfsApi.pin(
6 changes: 4 additions & 2 deletions src/tasks/pull-utils.ts
@@ -13,14 +13,15 @@ import { Logger } from 'winston';
import { logger } from '../utils/logger';

const CID = (IpfsHttpClient as any).CID; // eslint-disable-line
const GroupCount = 80; // Current average group count - 2024
export const SysMinFreeSpace = 50 * 1024; // 50 * 1024 MB
export const BasePinTimeout = 60 * 60 * 1000; // 60 minutes

export const RetryableStatus: FileStatus[] = [
'pending_replica',
'insufficient_space',
];
export const PendingStatus: FileStatus[] = ['new', ...RetryableStatus];
export const PendingStatus: FileStatus[] = ['new', 'sealFailedRetry', ...RetryableStatus];

type FilterFileResult =
| 'good'
@@ -165,7 +166,8 @@ function probabilityFilter(context: AppContext, maxReplicas: number): boolean {
if (nodeCount === 0) {
pTake = 0.0;
} else {
pTake = maxReplicas / nodeCount;
// Do not divide by nodeCount here: one group can hold only one replica, so divide by GroupCount instead
pTake = maxReplicas / GroupCount;
}

const memberCount = _.max([1, context.groupInfo.totalMembers]);
31 changes: 26 additions & 5 deletions src/tasks/seal-status-updater-task.ts
@@ -2,15 +2,16 @@ import _ from 'lodash';
import { Logger } from 'winston';
import { createPinRecordOperator } from '../db/pin-record';
import { AppContext } from '../types/context';
import { PinRecord, PinRecordOperator } from '../types/database';
import { DbOrderOperator, FileStatus, PinRecord, PinRecordOperator } from '../types/database';
import { SealInfoMap } from '../types/sworker';
import { SimpleTask } from '../types/tasks';
import { getTimestamp } from '../utils';
import { isSealDone } from './pull-utils';
import { IsStopped, makeIntervalTask } from './task-utils';
import { createFileOrderOperator } from '../db/file-record';

const MinSealStartTime = 30; // 30 seconds for a sealing job to start
const SealUpdateTimeout = 30 * 60; // 30 minutes for a sealing job timeout
const SealUpdateTimeout = 5 * 60; // 5 minutes for a sealing job timeout

/**
* task to update the sealing status in the pin records table
@@ -22,14 +23,15 @@ async function handleUpdate(
) {
const { database, sworkerApi } = context;
const pinRecordOps = createPinRecordOperator(database);
const fileOrderOps = createFileOrderOperator(database);
const pendingFiles = await sworkerApi.pendings();
const sealingRecords = await pinRecordOps.getSealingRecords();
logger.info('checking %d sealing records', sealingRecords.length);
for (const r of sealingRecords) {
if (isStopped()) {
break;
}
await checkAndUpdateStatus(r, pendingFiles, context, logger, pinRecordOps);
await checkAndUpdateStatus(r, pendingFiles, context, logger, pinRecordOps, fileOrderOps);
}
}

@@ -39,6 +41,7 @@ async function checkAndUpdateStatus(
context: AppContext,
logger: Logger,
pinRecordOps: PinRecordOperator,
fileOrderOps: DbOrderOperator,
) {
const now = getTimestamp();
const totalTimeUsed = now - record.pin_at;
@@ -76,7 +79,7 @@ async function checkAndUpdateStatus(
'sealing is too slow for file "%s", cancel sealing',
record.cid,
);
await markRecordAsFailed(record, pinRecordOps, context, logger, true);
await markRecordAsFailed(record, pinRecordOps, fileOrderOps, context, logger, true);
}
}
} else {
@@ -85,18 +88,21 @@
if (!done) {
if (sealUpdateInterval > SealUpdateTimeout) {
logger.info('sealing blocked for file "%s", cancel sealing', record.cid);
await markRecordAsFailed(record, pinRecordOps, context, logger, false);
await markRecordAsFailed(record, pinRecordOps, fileOrderOps, context, logger, false);
}
} else {
logger.info('file "%s" is sealed, update the seal status', record.cid);
await pinRecordOps.updatePinRecordStatus(record.id, 'sealed');
// Update file record state from sealing to handled
await fileOrderOps.updateFileInfoStatus(record.file_record_id, 'handled');
}
}
}

async function markRecordAsFailed(
record: PinRecord,
pinRecordOps: PinRecordOperator,
fileOrderOps: DbOrderOperator,
context: AppContext,
logger: Logger,
endSworker: boolean,
@@ -107,7 +113,22 @@
context.cancelationTokens[record.cid].abort();
delete context.cancelationTokens[record.cid];
}

// Check whether the file needs to be retried
let fileStatus: FileStatus = 'failed';
if (!_.isNil(record.file_record_id)) {
const fileRecord = await fileOrderOps.getRecordById(record.file_record_id);
if (!_.isNil(fileRecord)) {
const retry_count = _.isNil(fileRecord.retry_count) ? 0 : fileRecord.retry_count;
if (retry_count < context.config.scheduler.sealFailedRetryCount) {
fileStatus = 'sealFailedRetry';
}
}
}
// Update file_record and pin_record status
await pinRecordOps.updatePinRecordStatus(record.id, 'failed');
await fileOrderOps.updateFileInfoStatus(record.file_record_id, fileStatus);

if (endSworker) {
await sworkerApi.sealEnd(record.cid);
}
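Pulling the pieces together, the retry decision can be read as a small pure function (a sketch, not code from the repository; the real logic is inlined in `markRecordAsFailed` above):

```ts
import _ from 'lodash';

type NextStatus = 'failed' | 'sealFailedRetry';

// Sketch: decide the next file_record status when sealing fails.
// retryCount may be null for rows created before the migration.
function nextStatusAfterSealFailure(
  retryCount: number | null,
  sealFailedRetryCount: number,
): NextStatus {
  const used = _.isNil(retryCount) ? 0 : retryCount;
  return used < sealFailedRetryCount ? 'sealFailedRetry' : 'failed';
}

// Resulting lifecycle (summarised from this commit):
//   new -> sealing -> handled                        (seal succeeded)
//   new -> sealing -> sealFailedRetry -> new -> ...  (seal failed, retries left;
//                                                     the retry task requeues it after
//                                                     sealFailedRetryInterval hours)
//   new -> sealing -> failed                         (seal failed, retries exhausted)
```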
10 changes: 8 additions & 2 deletions src/types/database.d.ts
@@ -7,7 +7,7 @@ export interface SDatabase {
getConfig: (name: string) => Promise<string | null>;
}

type FileStatus =
export type FileStatus =
| 'new'
| 'pending_replica'
| 'insufficient_space'
@@ -22,7 +22,9 @@ type FileStatus =
| 'sizeSmallSkipped'
| 'sizeLargeSkipped'
| 'replicasNotEnoughSkipped'
| 'tooManyReplicasSkipped';
| 'tooManyReplicasSkipped'
| 'sealing'
| 'sealFailedRetry';
type CleanupStatus = 'pending' | 'failed' | 'done';

export interface FileRecord {
@@ -36,6 +38,7 @@
status: FileStatus;
last_updated: number;
create_at: number;
retry_count: number;
}

export interface FileOwnerRecord {
@@ -76,6 +79,7 @@ export interface DbOrderOperator {
indexer: Indexer | null,
smallFile: boolean,
) => DbResult<FileRecord>;
getRecordById: (id: number) => DbResult<FileRecord>;
}

type DbResult<T> = Promise<T | null>;
@@ -107,6 +111,7 @@ export interface PinRecord {
pin_by: PullingStrategy;
sealed_size: number;
last_check_time: number;
file_record_id: number;
}

export interface PinRecordOperator {
@@ -115,6 +120,7 @@ export interface PinRecordOperator {
cid: string,
size: number,
pinBy: PullingStrategy,
fileRecordId: number,
) => DbWriteResult;
getSealingRecords: () => DbResult<PinRecord[]>;
getPinRecordsByCid: (cid: string) => DbResult<PinRecord[]>;
2 changes: 2 additions & 0 deletions src/types/smanager-config.d.ts
@@ -37,6 +37,8 @@ export interface SchedulerConfig {
maxFileSize: number; // in MB
minReplicas: number; // min replicas for chainDb indexer
maxReplicas: number; // max replicas limit for all indexer
sealFailedRetryCount: number; // max seal retry attempts for a failed file
sealFailedRetryInterval: number; // retry interval in hours
}

export interface SealCoordinatorConfig {
