From faa119947e328c97710bef4d7114215e0209f175 Mon Sep 17 00:00:00 2001
From: Guillermo Rodriguez
Date: Thu, 27 Jun 2024 23:26:40 -0300
Subject: [PATCH] Add trim to cli utils

---
 package.json                   |  1 +
 src/reader/broadcast.ts        |  5 ++++-
 src/reader/index.ts            |  1 +
 src/tests/testLiveMode.spec.ts |  2 +-
 src/writer/index.ts            | 22 +++++++++++-----------
 tools/ab-cli.ts                | 34 +++++++++++++++++++++++++++++++---
 6 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/package.json b/package.json
index cbf2387..7660d2e 100644
--- a/package.json
+++ b/package.json
@@ -19,6 +19,7 @@
     "test-cache": "mocha build/src/tests/testCache.spec.js --timeout=120000",
     "test-rw": "mocha build/src/tests/testRW.spec.js --timeout=120000",
     "test-serial": "mocha build/src/tests/testSerialization.spec.js --timeout=120000",
+    "test-live": "mocha build/src/tests/testLiveMode.spec.js --timeout=120000",
     "coverage": "c8 mocha build/src/tests/test*.spec.js"
   },
   "repository": {
diff --git a/src/reader/broadcast.ts b/src/reader/broadcast.ts
index ee03023..05f4335 100644
--- a/src/reader/broadcast.ts
+++ b/src/reader/broadcast.ts
@@ -18,6 +18,7 @@ import {DEFAULT_AWK_RANGE} from "../protocol.js";
 export interface BroadcastClientParams {
     url: string,
     logger: Logger,
+    ordinalIndex: number,
     handlers: {
         pushRow: (row: any[]) => void,
         flush: (info: FlushReq['params']) => void
@@ -28,6 +29,7 @@ export class ArrowBatchBroadcastClient {
 
     private readonly url: string;
     private readonly logger: Logger;
+    private readonly ordinalIndex: number;
 
     private ws: ReconnectingWebSocket;
     private pendingRequests: Map<string, (response: any) => void>;
@@ -45,6 +47,7 @@ export class ArrowBatchBroadcastClient {
     constructor(params: BroadcastClientParams) {
         this.url = params.url;
         this.logger = params.logger;
+        this.ordinalIndex = params.ordinalIndex;
 
         this.syncAwkRange = BigInt(params.syncAwkRange ?? DEFAULT_AWK_RANGE);
 
@@ -53,7 +56,7 @@ export class ArrowBatchBroadcastClient {
         this.serverMethodHandlers = new Map<string, (request: any) => void>();
 
         const genericServerRowHandler = (request: SyncRowReq) => {
-            const ordinal = BigInt(request.params.row[0]);
+            const ordinal = BigInt(request.params[this.ordinalIndex]);
             const expected = this.syncTaskInfo.cursor + 1n;
 
             if (ordinal % 1000n == 0n)
diff --git a/src/reader/index.ts b/src/reader/index.ts
index a9ee295..7f099b9 100644
--- a/src/reader/index.ts
+++ b/src/reader/index.ts
@@ -154,6 +154,7 @@ export class ArrowBatchReader extends ArrowBatchContext {
         this.wsClient = new ArrowBatchBroadcastClient({
             url,
             logger: this.logger,
+            ordinalIndex: this.ordinalIndex,
             handlers: {
                 pushRow: (row: any[]) => {
                     this.pushRow(row);
diff --git a/src/tests/testLiveMode.spec.ts b/src/tests/testLiveMode.spec.ts
index acac9a1..70fd4b3 100644
--- a/src/tests/testLiveMode.spec.ts
+++ b/src/tests/testLiveMode.spec.ts
@@ -6,7 +6,7 @@ import {
 
 describe('liveMode', () => {
 
-    const logger = createLogger('testLive', 'info');
+    const logger = createLogger('testLive', 'debug');
 
     const config: ArrowBatchConfig = ArrowBatchConfigSchema.parse({
         dataDir: '../telosevm-translator/arrow-data',
diff --git a/src/writer/index.ts b/src/writer/index.ts
index d8abf1f..f674833 100644
--- a/src/writer/index.ts
+++ b/src/writer/index.ts
@@ -52,9 +52,9 @@ export class ArrowBatchWriter extends ArrowBatchReader {
         this.workerLogger = loggers.add('writer', workerLogOptions);
         this.workerLogger.debug(`logger for writer initialized with level ${this.config.writerLogLevel}`);
 
-        let alias = definition.alias ?? DEFAULT_ALIAS;
+        let alias = this.definition.alias ?? DEFAULT_ALIAS;
 
-        let streamBufMem = definition.stream_size ?? DEFAULT_STREAM_BUF_MEM;
+        let streamBufMem = this.definition.stream_size ?? DEFAULT_STREAM_BUF_MEM;
         if (streamBufMem && typeof streamBufMem === 'string')
             streamBufMem = bytes(streamBufMem);
 
@@ -63,7 +63,7 @@ export class ArrowBatchWriter extends ArrowBatchReader {
             {
                 workerData: {
                     alias,
-                    tableMapping: definition.map,
+                    tableMapping: this.definition.map,
                     compression: this.config.compression,
                     logLevel: this.config.writerLogLevel,
                     streamBufMem
@@ -310,7 +310,7 @@ export class ArrowBatchWriter extends ArrowBatchReader {
         await waitEvent(this.events, 'writer-ready');
     }
 
-    private async trimOnDisk(ordinal: bigint) {
+    async trimOnDisk(ordinal: bigint) {
         const adjustedOrdinal = this.getOrdinal(ordinal);
 
         // delete every bucket bigger than adjustedOrdinal
@@ -339,15 +339,15 @@ export class ArrowBatchWriter extends ArrowBatchReader {
         const fileName = this.tableFileMap.get(adjustedOrdinal);
         await pfs.truncate(fileName, tableIndexEnd + 1);
 
-        // unwrap adjustedOrdinal:tableIndex table into fresh intermediate
-        this._intermediateBuffers = this._createBuffer();
-        const [table, ___] = await this.cache.getTableFor(ordinal);
+        // // unwrap adjustedOrdinal:tableIndex table into fresh intermediate
+        // this._intermediateBuffers = this._createBuffer();
+        // const [table, ___] = await this.cache.getTableFor(ordinal);
 
-        for (let i = 0; i < table.numRows; i++)
-            this.pushRow(table.get(i).toArray());
+        // for (let i = 0; i < table.numRows; i++)
+        //     this.pushRow(table.get(i).toArray());
 
-        // use trim buffers helper
-        await this.trimOnBuffers(ordinal, this.tableMapping[this.ordinalIndex]);
+        // // use trim buffers helper
+        // await this.trimOnBuffers(ordinal, this.tableMapping[this.ordinalIndex]);
     }
 
     // async trimFrom(ordinal: bigint) {
diff --git a/tools/ab-cli.ts b/tools/ab-cli.ts
index 9678e67..4be0a5f 100644
--- a/tools/ab-cli.ts
+++ b/tools/ab-cli.ts
@@ -5,6 +5,7 @@ import {
     ArrowBatchConfig,
     ArrowBatchConfigSchema,
     ArrowBatchReader,
+    ArrowBatchWriter,
     createLogger,
     extendedStringify,
     humanizeByteSize, packageInfo
@@ -12,10 +13,11 @@ import {
 import * as process from "node:process";
 import * as console from "node:console";
 import fastFolderSizeSync from "fast-folder-size/sync.js";
+import {Logger} from "winston";
 
-async function readerFromCLIOpts(options: {
+function contextFromCLIOpts(options: {
     config: string, dataDir: string
-}): Promise<ArrowBatchReader> {
+}): [ArrowBatchConfig, Logger] {
     let config: ArrowBatchConfig;
     if (options.config) {
         // Check if the config file exists
@@ -43,12 +45,28 @@ async function readerFromCLIOpts(options: {
     }
 
     const logger = createLogger('ab-cli', 'info');
+
+    return [config, logger]
+}
+
+async function readerFromCLIOpts(options: {
+    config: string, dataDir: string
+}): Promise<ArrowBatchReader> {
+    const [config, logger] = contextFromCLIOpts(options);
     const reader = new ArrowBatchReader(config, undefined, logger);
     await reader.init();
-
     return reader;
 }
 
+async function writerFromCLIOpts(options: {
+    config: string, dataDir: string
+}): Promise<ArrowBatchWriter> {
+    const [config, logger] = contextFromCLIOpts(options);
+    const writer = new ArrowBatchWriter(config, undefined, logger);
+    await writer.init();
+    return writer;
+}
+
 program
     .version(packageInfo.version)
     .description('AB CLI Tool');
@@ -100,4 +118,14 @@ program
         console.log(extendedStringify(row, 4));
     });
 
+program
+    .command('trim <ordinal>')
+    .description('Trim data from ordinal')
+    .option('-c, --config <configPath>', 'Path to the config file', undefined)
+    .option('-d, --data-dir <dataDir>', 'Path to data directory, generate config dynamically', undefined)
+    .action(async (ordinal: string, options: {config: string, dataDir: string}) => {
+        const writer = await writerFromCLIOpts(options);
+        await writer.trimOnDisk(BigInt(ordinal));
+    });
+
 program.parse(process.argv);
\ No newline at end of file
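
Usage sketch (not part of the diff above): a programmatic equivalent of the new trim command, exercising trimOnDisk, which this patch makes public. The import specifier, data directory and ordinal below are assumptions/placeholders; the CLI form added by the patch is "trim <ordinal>" with -c/--config or -d/--data-dir.

    // Sketch: trim an existing ArrowBatch dataset down to a given ordinal.
    // Assumed package name; inside the repo the same symbols come from the local index module.
    import {
        ArrowBatchConfigSchema,
        ArrowBatchWriter,
        createLogger
    } from '@guilledk/arrowbatch-nodejs';

    const config = ArrowBatchConfigSchema.parse({dataDir: './arrow-data'});  // placeholder data directory
    const writer = new ArrowBatchWriter(config, undefined, createLogger('trim-example', 'info'));

    await writer.init();
    // placeholder ordinal; drops buckets past the adjusted ordinal and truncates the containing table file
    await writer.trimOnDisk(180000000n);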