Add cli helper tool
Add postInstall script
Create fast data validation function
Bump rc version
guilledk committed May 21, 2024
1 parent 691d2f4 commit e8a912d
Showing 9 changed files with 480 additions and 17 deletions.
13 changes: 9 additions & 4 deletions package.json
@@ -1,8 +1,11 @@
{
"name": "@guilledk/arrowbatch-nodejs",
"version": "1.0.0-rc13",
"version": "1.0.0-rc14",
"description": "Arrow Batch Storage protocol",
"main": "./build/index.js",
"bin": {
"ab-cli": "./build/tools/ab-cli.js"
},
"type": "module",
"exports": {
".": {
@@ -11,9 +14,9 @@
},
"scripts": {
"bootstrap": "yarn",
"build": "yarn run bootstrap && tsc",
"test-all": "mocha build/tests/test*.spec.js",
"coverage": "c8 mocha build/tests/test*.spec.js"
"build": "yarn run bootstrap && tsc && node build/tools/postInstall.js",
"test-all": "mocha build/src/tests/test*.spec.js",
"coverage": "c8 mocha build/src/tests/test*.spec.js"
},
"repository": {
"type": "git",
@@ -25,6 +28,8 @@
"apache-arrow": "^15.0.2",
"bufferutil": "^4.0.8",
"bytes": "^3.1.2",
"commander": "^12.1.0",
"fast-folder-size": "^2.2.0",
"moment": "^2.30.1",
"rlp": "^3.0.0",
"simple-zstd": "^1.4.2",
2 changes: 1 addition & 1 deletion src/cache.ts
@@ -60,7 +60,7 @@ export class ArrowBatchCache {
return [result, true];
}

private async directLoadTable(
async directLoadTable(
tableName: string,
adjustedOrdinal: number,
batchIndex: number
38 changes: 38 additions & 0 deletions src/reader/index.ts
@@ -487,6 +487,44 @@ export class ArrowBatchReader extends ArrowBatchContext {
iter(params: {from: bigint, to: bigint}) : RowScroller {
return new RowScroller(this, params);
}

async validate() {
for (const adjustedOrdinal of this.tableFileMap.keys()) {
const [bucketMeta, _] = await this.cache.getMetadataFor(adjustedOrdinal, 'root');

for (const [batchIndex, batchMeta] of bucketMeta.meta.batches.entries()) {
this.logger.info(`validating bucket ${adjustedOrdinal} batch ${batchIndex + 1}/${bucketMeta.meta.batches.length}`);

const metaSize = Number(batchMeta.batch.lastOrdinal - batchMeta.batch.startOrdinal) + 1;

const [_, table] = await this.cache.directLoadTable('root', adjustedOrdinal, batchIndex);
const tableSize = table.numRows;
const actualStart = table.get(0).toArray()[0] as bigint;
const actualLast = table.get(tableSize - 1).toArray()[0] as bigint;

if (metaSize !== tableSize) {
this.logger.error(`metaSize (${metaSize}) != tableSize (${tableSize})`);

let lastEval = actualStart - 1n;
for (const [i, row] of table.toArray().entries()) {
const ord = row.toArray()[0] as bigint;
if (ord !== lastEval + 1n)
throw new Error(
`table row size metadata mismatch at table index ${i} expected ${lastEval + 1n} and got ${ord}`);

lastEval = ord;
}
}
// throw new Error(`table row size metadata mismatch!`);

if (batchMeta.batch.startOrdinal !== actualStart)
throw new Error(`batch metadata startOrd mismatch with actual!`);

if (batchMeta.batch.lastOrdinal !== actualLast)
throw new Error(`batch metadata lastOrd mismatch with actual!`);
}
}
}
}

export class RowScroller {
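
For context, a minimal sketch of driving the new validate() pass directly from code rather than through the CLI added below; the construction mirrors tools/ab-cli.ts, and the dataDir value is a made-up example:

import { ArrowBatchConfigSchema, ArrowBatchReader, createLogger } from '@guilledk/arrowbatch-nodejs';

// Assumed example path; point this at an existing ArrowBatch data directory.
const config = ArrowBatchConfigSchema.parse({ dataDir: './arrow-data' });
const reader = new ArrowBatchReader(config, undefined, createLogger('validate-example', 'info'));
await reader.init(0n);

// Scans every bucket/batch of the 'root' table and throws on the first
// row-count or ordinal mismatch between metadata and the on-disk table.
await reader.validate();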
21 changes: 16 additions & 5 deletions src/utils.ts
@@ -8,12 +8,11 @@ import {format, LogEntry, loggers, transports} from "winston";
import Transport from "winston-transport";
import {ZSTDCompress} from 'simple-zstd';
import EventEmitter from "node:events";
import {number} from "zod";


// currentDir == build/ dir
const currentDir = path.dirname(fileURLToPath(import.meta.url));
export const ROOT_DIR = path.join(currentDir, '..')
export const ROOT_DIR = path.join(currentDir, '../..')
export const SRC_DIR = path.join(ROOT_DIR, 'src');

const packageJsonFile = path.join(ROOT_DIR, 'package.json');
@@ -156,11 +155,23 @@ export async function waitEvent(emitter: EventEmitter, event: string): Promise<v
return new Promise(resolve => emitter.once(event, resolve));
}

export function extendedStringify(obj: any): string {
export function extendedStringify(obj: any, indent?: number): string {
return JSON.stringify(obj, (key, value) => {
if (typeof value === "bigint") {
return value.toString();
} else if (typeof value === "object" && value.type === "Buffer") {
return Buffer.from(value).toString('hex')
}
return value;
})
}
}, indent)
}

export function humanizeByteSize(bytes: number): string {
if (bytes === 0) return '0 Bytes';

const k = 1024;
const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
const i = Math.floor(Math.log(Math.abs(bytes)) / Math.log(k));

return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
}
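
A brief usage sketch for the two utility additions (values are illustrative):

import { extendedStringify, humanizeByteSize } from '@guilledk/arrowbatch-nodejs';

// The new optional indent pretty-prints; bigints and Buffers still serialize as strings.
extendedStringify({ ord: 42n, payload: Buffer.from('abcd', 'hex') }, 4);
// -> indented JSON with "ord": "42" and "payload": "abcd"

humanizeByteSize(1536); // '1.5 KB'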
2 changes: 1 addition & 1 deletion src/writer/index.ts
@@ -67,7 +67,7 @@ export class ArrowBatchWriter extends ArrowBatchReader {
streamBufMem = bytes(streamBufMem);

const worker = new Worker(
path.join(ROOT_DIR, 'build/writer/worker.js'),
path.join(ROOT_DIR, 'build/src/writer/worker.js'),
{
workerData: {
tableName: name,
103 changes: 103 additions & 0 deletions tools/ab-cli.ts
@@ -0,0 +1,103 @@
#!/usr/bin/env node
import { program } from 'commander';
import * as fs from 'fs';
import {
ArrowBatchConfig,
ArrowBatchConfigSchema,
ArrowBatchReader,
createLogger,
extendedStringify, humanizeByteSize,
packageInfo
} from "../src/index.js";
import * as process from "node:process";
import * as console from "node:console";
import fastFolderSizeSync from "fast-folder-size/sync.js";

async function readerFromCLIOpts(options: {
config: string, dataDir: string
}): Promise<ArrowBatchReader> {
let config: ArrowBatchConfig;
if (options.config) {
// Check if the config file exists
if (!fs.existsSync(options.config)) {
console.error(`Config file '${options.config}' does not exist.`);
process.exit(1);
}

// Read the config file
const configData = JSON.parse(fs.readFileSync(options.config, 'utf8'));
config = ArrowBatchConfigSchema.parse(configData);
} else if (options.dataDir) {
config = ArrowBatchConfigSchema.parse({
dataDir: options.dataDir
});
} else {
console.error(`Cant figure out data dir with those arguments. Try --help`);
process.exit(1);
}

// Check if the data directory exists
if (!fs.existsSync(config.dataDir)) {
console.error(`Data directory '${config.dataDir}' does not exist.`);
process.exit(1);
}

const logger = createLogger('ab-cli', 'info');
const reader = new ArrowBatchReader(config, undefined, logger);
await reader.init(0n);

return reader;
}

program
.version(packageInfo.version)
.description('AB CLI Tool');

program
.command('stat')
.description('Get statistics for the data directory')
.option('-c, --config <configFile>', 'Path to the config file', undefined)
.option('-d, --data-dir <dataDir>', 'Path to data directory, generate config dynamically', undefined)
.action(async (options: {config: string, dataDir: string}) => {
const reader = await readerFromCLIOpts(options);

const dataDirSize = fastFolderSizeSync(reader.config.dataDir);
const totalRows = reader.lastOrdinal - reader.firstOrdinal;

console.log(`data dir size: ${humanizeByteSize(dataDirSize)}`);

console.log(`start ordinal: ${reader.firstOrdinal.toLocaleString()}`);
console.log(`last ordinal: ${reader.lastOrdinal.toLocaleString()}`);
console.log(`total rows: ${totalRows.toLocaleString()}`);
});

program
.command('validate')
.description('Validate on disk tables')
.option('-c, --config <configFile>', 'Path to the config file', undefined)
.option('-d, --data-dir <dataDir>', 'Path to data directory, generate config dynamically', undefined)
.action(async (options: {config: string, dataDir: string}) => {
const reader = await readerFromCLIOpts(options);

try {
await reader.validate();
} catch (e) {
reader.logger.error(e.message);
process.exit(1);
}
});

program
.command('get <ordinal>')
.description('Get the value at the specified ordinal')
.option('-c, --config <configFile>', 'Path to the config file', undefined)
.option('-d, --data-dir <dataDir>', 'Path to data directory, generate config dynamically', undefined)
.action(async (ordinal: string, options: {config: string, dataDir: string}) => {
const reader = await readerFromCLIOpts(options);

const row = await reader.getRow(BigInt(ordinal));

console.log(extendedStringify(row, 4));
});

program.parse(process.argv);
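
Because the subcommands are registered on commander's shared program object, the same entry point can also be exercised programmatically, e.g. from a test (a hedged sketch; the --data-dir value is a placeholder):

// Equivalent to running: node build/tools/ab-cli.js stat --data-dir ./arrow-data
await program.parseAsync(['node', 'ab-cli', 'stat', '--data-dir', './arrow-data']);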
29 changes: 29 additions & 0 deletions tools/postInstall.ts
@@ -0,0 +1,29 @@
import path from "node:path";
import fs from "node:fs";
import {packageInfo, ROOT_DIR} from "../src/index.js";

const binPath = path.join(ROOT_DIR, 'build', 'tools');
const nodeModulesBinPath = path.join(ROOT_DIR, 'node_modules', '.bin');

const binaries = Object.values(packageInfo.bin).map((binPath: string) => path.basename(binPath));

fs.readdirSync(binPath).forEach(file => {
if (!binaries.includes(file))
return;

const srcPath = path.join(binPath, file);

if (!fs.existsSync(srcPath))
throw new Error(`source binary does not exist!`);

const srcStat = fs.statSync(srcPath);

fs.chmodSync(srcPath, srcStat.mode | fs.constants.S_IXUSR);

const destPath = path.join(nodeModulesBinPath, file);

if (fs.existsSync(destPath))
fs.rmSync(destPath);

fs.symlinkSync(srcPath, destPath, 'file');
});
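
After the build script runs this step, the link can be sanity-checked (a hedged sketch; the working directory is assumed to be the repository root):

import fs from "node:fs";
import path from "node:path";

// postInstall symlinks build/tools/ab-cli.js into node_modules/.bin,
// so the basename should now resolve to an executable symlink.
const link = path.join(process.cwd(), 'node_modules', '.bin', 'ab-cli.js');
console.log(fs.lstatSync(link).isSymbolicLink()); // expected: true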
4 changes: 2 additions & 2 deletions tsconfig.json
@@ -2,7 +2,7 @@
"compilerOptions": {
"allowJs": true,
"allowSyntheticDefaultImports": true,
"baseUrl": "src",
"baseUrl": ".",
"module": "esnext",
"outDir": "build",
"target": "esnext",
@@ -27,5 +27,5 @@
},
"compileOnSave": false,
"exclude": ["node_modules", "build"],
"include": ["src"]
"include": ["src", "tools"]
}