
Commit faa1199
Add trim to cli utils
guilledk committed Jun 28, 2024
1 parent fb76d31 commit faa1199
Showing 6 changed files with 49 additions and 16 deletions.
package.json (1 change: 1 addition & 0 deletions)
@@ -19,6 +19,7 @@
 "test-cache": "mocha build/src/tests/testCache.spec.js --timeout=120000",
 "test-rw": "mocha build/src/tests/testRW.spec.js --timeout=120000",
 "test-serial": "mocha build/src/tests/testSerialization.spec.js --timeout=120000",
+"test-live": "mocha build/src/tests/testLiveMode.spec.js --timeout=120000",
 "coverage": "c8 mocha build/src/tests/test*.spec.js"
 },
 "repository": {
src/reader/broadcast.ts (5 changes: 4 additions & 1 deletion)
@@ -18,6 +18,7 @@ import {DEFAULT_AWK_RANGE} from "../protocol.js";
 export interface BroadcastClientParams {
 url: string,
 logger: Logger,
+ordinalIndex: number,
 handlers: {
 pushRow: (row: any[]) => void,
 flush: (info: FlushReq['params']) => void
@@ -28,6 +29,7 @@ export interface BroadcastClientParams {
 export class ArrowBatchBroadcastClient {
 private readonly url: string;
 private readonly logger: Logger;
+private readonly ordinalIndex: number;

 private ws: ReconnectingWebSocket;
 private pendingRequests: Map<string, (response: Response) => void>;
@@ -45,6 +47,7 @@
 constructor(params: BroadcastClientParams) {
 this.url = params.url;
 this.logger = params.logger;
+this.ordinalIndex = params.ordinalIndex;

 this.syncAwkRange = BigInt(params.syncAwkRange ?? DEFAULT_AWK_RANGE);

@@ -53,7 +56,7 @@
 this.serverMethodHandlers = new Map<string, (request: Request) => void>();

 const genericServerRowHandler = (request: SyncRowReq) => {
-const ordinal = BigInt(request.params.row[0]);
+const ordinal = BigInt(request.params[this.ordinalIndex]);
 const expected = this.syncTaskInfo.cursor + 1n;

 if (ordinal % 1000n == 0n)
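
The client now reads the ordinal from whatever column the reader reports, rather than assuming column 0. A minimal sketch of that idea in isolation; the row shape and values below are invented for illustration and are not the library's wire format:

    // Pull the ordinal out of a generic row using a configurable column index.
    function ordinalFromRow(row: any[], ordinalIndex: number): bigint {
        return BigInt(row[ordinalIndex]);
    }

    // e.g. a table whose ordinal (block number) sits in column 2 rather than column 0
    const row: any[] = ['0xabc123', 'transfer', 1234567];
    const ordinal = ordinalFromRow(row, 2);  // 1234567n
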
src/reader/index.ts (1 change: 1 addition & 0 deletions)
@@ -154,6 +154,7 @@ export class ArrowBatchReader extends ArrowBatchContext {
 this.wsClient = new ArrowBatchBroadcastClient({
 url,
 logger: this.logger,
+ordinalIndex: this.ordinalIndex,
 handlers: {
 pushRow: (row: any[]) => {
 this.pushRow(row);
src/tests/testLiveMode.spec.ts (2 changes: 1 addition & 1 deletion)
@@ -6,7 +6,7 @@ import {

 describe('liveMode', () => {

-const logger = createLogger('testLive', 'info');
+const logger = createLogger('testLive', 'debug');

 const config: ArrowBatchConfig = ArrowBatchConfigSchema.parse({
 dataDir: '../telosevm-translator/arrow-data',
src/writer/index.ts (22 changes: 11 additions & 11 deletions)
@@ -52,9 +52,9 @@ export class ArrowBatchWriter extends ArrowBatchReader {
 this.workerLogger = loggers.add('writer', workerLogOptions);
 this.workerLogger.debug(`logger for writer initialized with level ${this.config.writerLogLevel}`);

-let alias = definition.alias ?? DEFAULT_ALIAS;
+let alias = this.definition.alias ?? DEFAULT_ALIAS;

-let streamBufMem = definition.stream_size ?? DEFAULT_STREAM_BUF_MEM;
+let streamBufMem = this.definition.stream_size ?? DEFAULT_STREAM_BUF_MEM;
 if (streamBufMem && typeof streamBufMem === 'string')
 streamBufMem = bytes(streamBufMem);

@@ -63,7 +63,7 @@
 {
 workerData: {
 alias,
-tableMapping: definition.map,
+tableMapping: this.definition.map,
 compression: this.config.compression,
 logLevel: this.config.writerLogLevel,
 streamBufMem
@@ -310,7 +310,7 @@

 await waitEvent(this.events, 'writer-ready');
 }
-private async trimOnDisk(ordinal: bigint) {
+async trimOnDisk(ordinal: bigint) {
 const adjustedOrdinal = this.getOrdinal(ordinal);

 // delete every bucket bigger than adjustedOrdinal
@@ -339,15 +339,15 @@
 const fileName = this.tableFileMap.get(adjustedOrdinal);
 await pfs.truncate(fileName, tableIndexEnd + 1);

-// unwrap adjustedOrdinal:tableIndex table into fresh intermediate
-this._intermediateBuffers = this._createBuffer();
-const [table, ___] = await this.cache.getTableFor(ordinal);
+// // unwrap adjustedOrdinal:tableIndex table into fresh intermediate
+// this._intermediateBuffers = this._createBuffer();
+// const [table, ___] = await this.cache.getTableFor(ordinal);

-for (let i = 0; i < table.numRows; i++)
-this.pushRow(table.get(i).toArray());
+// for (let i = 0; i < table.numRows; i++)
+// this.pushRow(table.get(i).toArray());

-// use trim buffers helper
-await this.trimOnBuffers(ordinal, this.tableMapping[this.ordinalIndex]);
+// // use trim buffers helper
+// await this.trimOnBuffers(ordinal, this.tableMapping[this.ordinalIndex]);
 }

 // async trimFrom(ordinal: bigint) {
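
trimOnDisk loses its private modifier so the new CLI command can call it, and the block that re-filled the intermediate buffers from the truncated table is commented out. A simplified sketch of the on-disk trim shape visible in this hunk, assuming bigint bucket keys; deleteBucketsAbove and findTableIndexEnd are hypothetical stand-ins for logic outside the diff:

    import {promises as pfs} from "node:fs";

    // Illustration only, not the library's actual implementation.
    async function trimOnDiskSketch(
        ordinal: bigint,
        getOrdinal: (o: bigint) => bigint,                              // row ordinal -> bucket ordinal
        tableFileMap: Map<bigint, string>,                              // bucket ordinal -> table file path
        deleteBucketsAbove: (bucket: bigint) => Promise<void>,          // hypothetical helper
        findTableIndexEnd: (file: string, o: bigint) => Promise<number> // hypothetical helper
    ): Promise<void> {
        const adjustedOrdinal = getOrdinal(ordinal);

        // drop every bucket newer than the one containing `ordinal`
        await deleteBucketsAbove(adjustedOrdinal);

        // truncate the surviving table file just past the last byte that should remain
        const fileName = tableFileMap.get(adjustedOrdinal);
        if (!fileName)
            return;

        const tableIndexEnd = await findTableIndexEnd(fileName, ordinal);
        await pfs.truncate(fileName, tableIndexEnd + 1);
    }
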
tools/ab-cli.ts (34 changes: 31 additions & 3 deletions)
@@ -5,17 +5,19 @@
 ArrowBatchConfig,
 ArrowBatchConfigSchema,
 ArrowBatchReader,
+ArrowBatchWriter,
 createLogger,
 extendedStringify, humanizeByteSize,
 packageInfo
 } from "../src/index.js";
 import * as process from "node:process";
 import * as console from "node:console";
 import fastFolderSizeSync from "fast-folder-size/sync.js";
+import {Logger} from "winston";

-async function readerFromCLIOpts(options: {
+function contextFromCLIOpts(options: {
 config: string, dataDir: string
-}): Promise<ArrowBatchReader> {
+}): [ArrowBatchConfig, Logger] {
 let config: ArrowBatchConfig;
 if (options.config) {
 // Check if the config file exists
@@ -43,12 +45,28 @@
 }

 const logger = createLogger('ab-cli', 'info');

+return [config, logger]
+}
+
+async function readerFromCLIOpts(options: {
+config: string, dataDir: string
+}): Promise<ArrowBatchReader> {
+const [config, logger] = contextFromCLIOpts(options);
 const reader = new ArrowBatchReader(config, undefined, logger);
 await reader.init();

 return reader;
 }

+async function writerFromCLIOpts(options: {
+config: string, dataDir: string
+}): Promise<ArrowBatchWriter> {
+const [config, logger] = contextFromCLIOpts(options);
+const writer = new ArrowBatchWriter(config, undefined, logger);
+await writer.init();
+return writer;
+}
+
 program
 .version(packageInfo.version)
 .description('AB CLI Tool');
@@ -100,4 +118,14 @@
 console.log(extendedStringify(row, 4));
 });

+program
+.command('trim <ordinal>')
+.description('Trim data from ordinal')
+.option('-c, --config <configFile>', 'Path to the config file', undefined)
+.option('-d, --data-dir <dataDir>', 'Path to data directory, generate config dynamically', undefined)
+.action(async (ordinal: string, options: {config: string, dataDir: string}) => {
+const writer = await writerFromCLIOpts(options);
+await writer.trimOnDisk(BigInt(ordinal));
+});
+
 program.parse(process.argv);
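
The new trim subcommand builds an ArrowBatchWriter from the same CLI options the read commands use and hands the parsed ordinal to trimOnDisk. A rough programmatic equivalent, as a sketch only; the dataDir value is a placeholder and any config fields beyond dataDir are omitted:

    import {
        ArrowBatchConfigSchema,
        ArrowBatchWriter,
        createLogger
    } from "../src/index.js";

    // Sketch of what the CLI's trim command does, outside the commander wrapper.
    async function trimTo(ordinal: bigint) {
        const config = ArrowBatchConfigSchema.parse({
            dataDir: './arrow-data'   // placeholder path
        });
        const logger = createLogger('trim-example', 'info');

        const writer = new ArrowBatchWriter(config, undefined, logger);
        await writer.init();
        await writer.trimOnDisk(ordinal);
    }

    // e.g. await trimTo(1234567n);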
