Skip to content

Commit

Permalink
Merge pull request #8577 from romayalon/romy-file-writer
Browse files Browse the repository at this point in the history
NSFS | Replace ChunkFS with FileWriter
  • Loading branch information
romayalon authored Dec 9, 2024
2 parents dee7a5e + a451243 commit cf0daca
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 194 deletions.
2 changes: 2 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,8 @@ config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;
// anonymous account name
config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';

config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;

////////////////////////////
// NSFS NON CONTAINERIZED //
////////////////////////////
Expand Down
35 changes: 16 additions & 19 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const stream_utils = require('../util/stream_utils');
const buffer_utils = require('../util/buffer_utils');
const size_utils = require('../util/size_utils');
const native_fs_utils = require('../util/native_fs_utils');
const ChunkFS = require('../util/chunk_fs');
const FileWriter = require('../util/file_writer');
const LRUCache = require('../util/lru_cache');
const nb_native = require('../util/nb_native');
const RpcError = require('../rpc/rpc_error');
Expand Down Expand Up @@ -1564,36 +1564,33 @@ class NamespaceFS {
// Can be finetuned further on if needed and inserting the Semaphore logic inside
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
const { source_stream, copy_source } = params;
const { copy_source } = params;
try {
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
const md5_enabled = this._is_force_md5_enabled(object_sdk);
const chunk_fs = new ChunkFS({
const file_writer = new FileWriter({
target_file,
fs_context,
stats: this.stats,
namespace_resource_id: this.namespace_resource_id,
md5_enabled,
offset,
md5_enabled,
stats: this.stats,
bucket: params.bucket,
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size,
namespace_resource_id: this.namespace_resource_id,
});
chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
chunk_fs.on('finish', arg => dbg.error('namespace_fs._upload_stream: finish occured on stream ChunkFS: ', arg));
chunk_fs.on('close', arg => dbg.error('namespace_fs._upload_stream: close occured on stream ChunkFS: ', arg));
file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occured on FileWriter: ', err));
file_writer.on('finish', arg => dbg.log1('namespace_fs._upload_stream: finish occured on stream FileWriter: ', arg));
file_writer.on('close', arg => dbg.log1('namespace_fs._upload_stream: close occured on stream FileWriter: ', arg));

if (copy_source) {
// ChunkFS is a Transform stream, however read_object_stream expects a write stream. call resume to close the read part
// we need to close both read and write parts for Transform stream to properly close and release resources
chunk_fs.resume();
await this.read_object_stream(copy_source, object_sdk, chunk_fs);
await this.read_object_stream(copy_source, object_sdk, file_writer);
} else if (params.source_params) {
chunk_fs.resume();
await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer);
} else {
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
await stream_utils.pipeline([params.source_stream, file_writer]);
await stream_utils.wait_finished(file_writer);
}
return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
} catch (error) {
dbg.error('_upload_stream had error: ', error);
throw error;
Expand Down
2 changes: 1 addition & 1 deletion src/test/unit_tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require('./test_bucket_chunks_builder');
require('./test_mirror_writer');
require('./test_namespace_fs');
require('./test_ns_list_objects');
require('./test_chunk_fs');
require('./test_file_writer');
require('./test_namespace_fs_mpu');
require('./test_nb_native_fs');
require('./test_s3select');
Expand Down
2 changes: 1 addition & 1 deletion src/test/unit_tests/nc_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ coretest.setup();

require('./test_namespace_fs');
require('./test_ns_list_objects');
require('./test_chunk_fs');
require('./test_file_writer');
require('./test_namespace_fs_mpu');
require('./test_nb_native_fs');
require('./test_nc_nsfs_cli');
Expand Down
34 changes: 0 additions & 34 deletions src/test/unit_tests/test_chunk_fs.js

This file was deleted.

69 changes: 69 additions & 0 deletions src/test/unit_tests/test_file_writer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright (C) 2020 NooBaa */
/* eslint-disable no-invalid-this */
'use strict';

const mocha = require('mocha');
const config = require('../../../config');
const file_writer_hashing = require('../../tools/file_writer_hashing');
const orig_iov_max = config.NSFS_DEFAULT_IOV_MAX;

// In the small iov_max tests (iov_max=1 and iov_max=2) we use a smaller number of parts so that the tests finish
// in a reasonable period of time, because at most 1 or 2 buffers are flushed at a time.
const small_iov_num_parts = 20;


mocha.describe('FileWriter', function() {
    // Per-test timeout: 10 minutes.
    const RUN_TIMEOUT = 10 * 60 * 1000;

    // hash_target()/file_target() overwrite config.NSFS_DEFAULT_IOV_MAX,
    // so put the original value back after every test.
    mocha.afterEach(function() {
        config.NSFS_DEFAULT_IOV_MAX = orig_iov_max;
    });

    // Each entry describes one test: its title, which file_writer_hashing
    // entry point to call, and the positional arguments (chunk_size, parts, iov_max).
    // Omitted / undefined arguments fall back to the defaults of the hashing tool.
    const test_cases = [
        {
            title: 'Concurrent FileWriter with hash target',
            target: 'hash_target',
            args: [],
        },
        {
            title: 'Concurrent FileWriter with file target',
            target: 'file_target',
            args: [],
        },
        {
            title: 'Concurrent FileWriter with hash target - iov_max=1',
            target: 'hash_target',
            args: [undefined, small_iov_num_parts, 1],
        },
        {
            title: 'Concurrent FileWriter with file target - iov_max=1',
            target: 'file_target',
            args: [undefined, small_iov_num_parts, 1],
        },
        {
            title: 'Concurrent FileWriter with hash target - iov_max=2',
            target: 'hash_target',
            args: [undefined, small_iov_num_parts, 2],
        },
        {
            title: 'Concurrent FileWriter with file target - iov_max=2',
            target: 'file_target',
            args: [undefined, small_iov_num_parts, 2],
        },
        {
            // Goal: produce num_chunks > 1024 while total_chunks_size < config.NSFS_BUF_SIZE_L,
            // so that buffers are flushed because the max number of buffers was reached
            // and not because the max NSFS buffer size was reached.
            // With chunk_size = 100, 1024 buffered chunks add up to only 100 * 1024 bytes,
            // which is expected to be below config.NSFS_BUF_SIZE_L.
            title: 'Concurrent FileWriter with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L',
            target: 'file_target',
            args: [100, 50],
        },
    ];

    test_cases.forEach(({ title, target, args }) => {
        // A regular function (not an arrow) is required so that `this` is the mocha context.
        mocha.it(title, async function() {
            this.timeout(RUN_TIMEOUT);
            await file_writer_hashing[target](...args);
        });
    });
});
51 changes: 23 additions & 28 deletions src/tools/chunk_fs_hashing.js → src/tools/file_writer_hashing.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

const crypto = require('crypto');
const assert = require('assert');
const ChunkFS = require('../util/chunk_fs');
const FileWriter = require('../util/file_writer');
const config = require('../../config');
const nb_native = require('../util/nb_native');
const stream_utils = require('../util/stream_utils');
Expand All @@ -19,7 +19,8 @@ const PARTS = Number(argv.parts) || 1000;
const CONCURRENCY = Number(argv.concurrency) || 20;
const CHUNK = Number(argv.chunk) || 16 * 1024;
const PART_SIZE = Number(argv.part_size) || 20 * 1024 * 1024;
const F_PREFIX = argv.dst_folder || '/tmp/chunk_fs_hashing/';
const F_PREFIX = argv.dst_folder || '/tmp/file_writer_hashing/';
const IOV_MAX = argv.iov_max || config.NSFS_DEFAULT_IOV_MAX;

const DEFAULT_FS_CONFIG = {
uid: Number(argv.uid) || process.getuid(),
Expand All @@ -28,12 +29,6 @@ const DEFAULT_FS_CONFIG = {
warn_threshold_ms: 100,
};

const DUMMY_RPC = {
object: {
update_endpoint_stats: (...params) => null
}
};

const XATTR_USER_PREFIX = 'user.';
// TODO: In order to verify validity add content_md5_mtime as well
const XATTR_MD5_KEY = XATTR_USER_PREFIX + 'content_md5';
Expand Down Expand Up @@ -64,41 +59,42 @@ function assign_md5_to_fs_xattr(md5_digest, fs_xattr) {
return fs_xattr;
}

async function hash_target() {
await P.map_with_concurrency(CONCURRENCY, Array(PARTS).fill(), async () => {
async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
config.NSFS_DEFAULT_IOV_MAX = iov_max;
await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
const data = crypto.randomBytes(PART_SIZE);
const content_md5 = crypto.createHash('md5').update(data).digest('hex');
// Using async generator function in order to push data in small chunks
const source_stream = stream.Readable.from(async function*() {
for (let i = 0; i < data.length; i += CHUNK) {
yield data.slice(i, i + CHUNK);
for (let i = 0; i < data.length; i += chunk_size) {
yield data.slice(i, i + chunk_size);
}
}());
const target = new TargetHash();
const chunk_fs = new ChunkFS({
const file_writer = new FileWriter({
target_file: target,
fs_context: DEFAULT_FS_CONFIG,
rpc_client: DUMMY_RPC,
namespace_resource_id: 'MajesticSloth'
});
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
await stream_utils.pipeline([source_stream, file_writer]);
await stream_utils.wait_finished(file_writer);
const write_hash = target.digest();
console.log(
'Hash target',
`NativeMD5=${chunk_fs.digest}`,
`NativeMD5=${file_writer.digest}`,
`DataWriteCryptoMD5=${write_hash}`,
`DataOriginMD5=${content_md5}`,
);
assert.strictEqual(content_md5, write_hash);
if (config.NSFS_CALCULATE_MD5) {
assert.strictEqual(chunk_fs.digest, content_md5);
assert.strictEqual(chunk_fs.digest, write_hash);
assert.strictEqual(file_writer.digest, content_md5);
assert.strictEqual(file_writer.digest, write_hash);
}
});
}

async function file_target(chunk_size = CHUNK, parts = PARTS) {
async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
config.NSFS_DEFAULT_IOV_MAX = iov_max;
fs.mkdirSync(F_PREFIX);
await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
let target_file;
Expand All @@ -113,32 +109,31 @@ async function file_target(chunk_size = CHUNK, parts = PARTS) {
yield data.slice(i, i + chunk_size);
}
}());
const chunk_fs = new ChunkFS({
const file_writer = new FileWriter({
target_file,
fs_context: DEFAULT_FS_CONFIG,
rpc_client: DUMMY_RPC,
namespace_resource_id: 'MajesticSloth'
});
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
await stream_utils.pipeline([source_stream, file_writer]);
await stream_utils.wait_finished(file_writer);
if (XATTR) {
await target_file.replacexattr(
DEFAULT_FS_CONFIG,
assign_md5_to_fs_xattr(chunk_fs.digest, {})
assign_md5_to_fs_xattr(file_writer.digest, {})
);
}
if (FSYNC) await target_file.fsync(DEFAULT_FS_CONFIG);
const write_hash = crypto.createHash('md5').update(fs.readFileSync(F_TARGET)).digest('hex');
console.log(
'File target',
`NativeMD5=${chunk_fs.digest}`,
`NativeMD5=${file_writer.digest}`,
`DataWriteMD5=${write_hash}`,
`DataOriginMD5=${content_md5}`,
);
assert.strictEqual(content_md5, write_hash);
if (config.NSFS_CALCULATE_MD5) {
assert.strictEqual(chunk_fs.digest, content_md5);
assert.strictEqual(chunk_fs.digest, write_hash);
assert.strictEqual(file_writer.digest, content_md5);
assert.strictEqual(file_writer.digest, write_hash);
}
// Leave parts on error
fs.rmSync(F_TARGET);
Expand Down
Loading

0 comments on commit cf0daca

Please sign in to comment.