diff --git a/config.js b/config.js index b7429fe55f..1dd31806ff 100644 --- a/config.js +++ b/config.js @@ -882,6 +882,8 @@ config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10; // anonymous account name config.ANONYMOUS_ACCOUNT_NAME = 'anonymous'; +config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024; + //////////////////////////// // NSFS NON CONTAINERIZED // //////////////////////////// diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index 53e12965f7..fabe042ed3 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -19,7 +19,7 @@ const stream_utils = require('../util/stream_utils'); const buffer_utils = require('../util/buffer_utils'); const size_utils = require('../util/size_utils'); const native_fs_utils = require('../util/native_fs_utils'); -const ChunkFS = require('../util/chunk_fs'); +const FileWriter = require('../util/file_writer'); const LRUCache = require('../util/lru_cache'); const nb_native = require('../util/nb_native'); const RpcError = require('../rpc/rpc_error'); @@ -1564,36 +1564,33 @@ class NamespaceFS { // Can be finetuned further on if needed and inserting the Semaphore logic inside // Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream) async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) { - const { source_stream, copy_source } = params; + const { copy_source } = params; try { // Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy const md5_enabled = this._is_force_md5_enabled(object_sdk); - const chunk_fs = new ChunkFS({ + const file_writer = new FileWriter({ target_file, fs_context, - stats: this.stats, - namespace_resource_id: this.namespace_resource_id, - md5_enabled, offset, + md5_enabled, + stats: this.stats, bucket: params.bucket, - large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size + large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size, + namespace_resource_id: this.namespace_resource_id, }); - chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1)); - chunk_fs.on('finish', arg => dbg.error('namespace_fs._upload_stream: finish occured on stream ChunkFS: ', arg)); - chunk_fs.on('close', arg => dbg.error('namespace_fs._upload_stream: close occured on stream ChunkFS: ', arg)); + file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occured on FileWriter: ', err)); + file_writer.on('finish', arg => dbg.log1('namespace_fs._upload_stream: finish occured on stream FileWriter: ', arg)); + file_writer.on('close', arg => dbg.log1('namespace_fs._upload_stream: close occured on stream FileWriter: ', arg)); + if (copy_source) { - // ChunkFS is a Transform stream, however read_object_stream expects a write stream. call resume to close the read part - // we need to close both read and write parts for Transform stream to properly close and release resorces - chunk_fs.resume(); - await this.read_object_stream(copy_source, object_sdk, chunk_fs); + await this.read_object_stream(copy_source, object_sdk, file_writer); } else if (params.source_params) { - chunk_fs.resume(); - await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs); + await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer); } else { - await stream_utils.pipeline([source_stream, chunk_fs]); - await stream_utils.wait_finished(chunk_fs); + await stream_utils.pipeline([params.source_stream, file_writer]); + await stream_utils.wait_finished(file_writer); } - return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes }; + return { digest: file_writer.digest, total_bytes: file_writer.total_bytes }; } catch (error) { dbg.error('_upload_stream had error: ', error); throw error; diff --git a/src/test/unit_tests/index.js b/src/test/unit_tests/index.js index ac34e80579..22ecd77f58 100644 --- a/src/test/unit_tests/index.js +++ b/src/test/unit_tests/index.js @@ -57,7 +57,7 @@ require('./test_bucket_chunks_builder'); require('./test_mirror_writer'); require('./test_namespace_fs'); require('./test_ns_list_objects'); -require('./test_chunk_fs'); +require('./test_file_writer'); require('./test_namespace_fs_mpu'); require('./test_nb_native_fs'); require('./test_s3select'); diff --git a/src/test/unit_tests/nc_index.js b/src/test/unit_tests/nc_index.js index 49cc875263..d8d38e5241 100644 --- a/src/test/unit_tests/nc_index.js +++ b/src/test/unit_tests/nc_index.js @@ -7,7 +7,7 @@ coretest.setup(); require('./test_namespace_fs'); require('./test_ns_list_objects'); -require('./test_chunk_fs'); +require('./test_file_writer'); require('./test_namespace_fs_mpu'); require('./test_nb_native_fs'); require('./test_nc_nsfs_cli'); diff --git a/src/test/unit_tests/test_chunk_fs.js b/src/test/unit_tests/test_chunk_fs.js deleted file mode 100644 index 3885e21ed7..0000000000 --- a/src/test/unit_tests/test_chunk_fs.js +++ /dev/null @@ -1,34 +0,0 @@ -/* Copyright (C) 2020 NooBaa */ -/* eslint-disable no-invalid-this */ -'use strict'; - -const mocha = require('mocha'); -const chunk_fs_hashing = require('../../tools/chunk_fs_hashing'); - -mocha.describe('ChunkFS', function() { - const RUN_TIMEOUT = 10 * 60 * 1000; - - mocha.it('Concurrent ChunkFS with hash target', async function() { - const self = this; - self.timeout(RUN_TIMEOUT); - await chunk_fs_hashing.hash_target(); - }); - - mocha.it('Concurrent ChunkFS with file target', async function() { - const self = this; - self.timeout(RUN_TIMEOUT); - await chunk_fs_hashing.file_target(); - }); - - mocha.it('Concurrent ChunkFS with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L', async function() { - const self = this; - self.timeout(RUN_TIMEOUT); - // The goal of this test is to produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L - // so we will flush buffers because of reaching max num of buffers and not because we reached the max NSFS buf size - // chunk size = 100, num_chunks = (10 * 1024 * 1024)/100 < 104587, 104587 = num_chunks > 1024 - // chunk size = 100, total_chunks_size after having 1024 chunks is = 100 * 1024 < config.NSFS_BUF_SIZE_L - const chunk_size = 100; - const parts_s = 50; - await chunk_fs_hashing.file_target(chunk_size, parts_s); - }); -}); diff --git a/src/test/unit_tests/test_file_writer.js b/src/test/unit_tests/test_file_writer.js new file mode 100644 index 0000000000..c3773de62b --- /dev/null +++ b/src/test/unit_tests/test_file_writer.js @@ -0,0 +1,69 @@ +/* Copyright (C) 2020 NooBaa */ +/* eslint-disable no-invalid-this */ +'use strict'; + +const mocha = require('mocha'); +const config = require('../../../config'); +const file_writer_hashing = require('../../tools/file_writer_hashing'); +const orig_iov_max = config.NSFS_DEFAULT_IOV_MAX; + +// on iov_max small tests we need to use smaller amount of parts and chunks to ensure that the test will finish +// in a reasonable period of time because we will flush max 1/2 buffers at a time. +const small_iov_num_parts = 20; + + +mocha.describe('FileWriter', function() { + const RUN_TIMEOUT = 10 * 60 * 1000; + + mocha.afterEach(function() { + config.NSFS_DEFAULT_IOV_MAX = orig_iov_max; + }); + + mocha.it('Concurrent FileWriter with hash target', async function() { + const self = this; + self.timeout(RUN_TIMEOUT); + await file_writer_hashing.hash_target(); + }); + + mocha.it('Concurrent FileWriter with file target', async function() { + const self = this; + self.timeout(RUN_TIMEOUT); + await file_writer_hashing.file_target(); + }); + + mocha.it('Concurrent FileWriter with hash target - iov_max=1', async function() { + const self = this; + self.timeout(RUN_TIMEOUT); + await file_writer_hashing.hash_target(undefined, small_iov_num_parts, 1); + }); + + mocha.it('Concurrent FileWriter with file target - iov_max=1', async function() { + const self = this; + self.timeout(RUN_TIMEOUT); + await file_writer_hashing.file_target(undefined, small_iov_num_parts, 1); + }); + + mocha.it('Concurrent FileWriter with hash target - iov_max=2', async function() { + const self = this; + self.timeout(RUN_TIMEOUT); + await file_writer_hashing.hash_target(undefined, small_iov_num_parts, 2); + }); + + mocha.it('Concurrent FileWriter with file target - iov_max=2', async function() { + const self = this; + self.timeout(RUN_TIMEOUT); + await file_writer_hashing.file_target(undefined, small_iov_num_parts, 2); + }); + + mocha.it('Concurrent FileWriter with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L', async function() { + const self = this; + self.timeout(RUN_TIMEOUT); + // The goal of this test is to produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L + // so we will flush buffers because of reaching max num of buffers and not because we reached the max NSFS buf size + // chunk size = 100, num_chunks = (10 * 1024 * 1024)/100 < 104587, 104587 = num_chunks > 1024 + // chunk size = 100, total_chunks_size after having 1024 chunks is = 100 * 1024 < config.NSFS_BUF_SIZE_L + const chunk_size = 100; + const parts_s = 50; + await file_writer_hashing.file_target(chunk_size, parts_s); + }); +}); diff --git a/src/tools/chunk_fs_hashing.js b/src/tools/file_writer_hashing.js similarity index 76% rename from src/tools/chunk_fs_hashing.js rename to src/tools/file_writer_hashing.js index 09a3465720..e3f2c980dc 100644 --- a/src/tools/chunk_fs_hashing.js +++ b/src/tools/file_writer_hashing.js @@ -3,7 +3,7 @@ const crypto = require('crypto'); const assert = require('assert'); -const ChunkFS = require('../util/chunk_fs'); +const FileWriter = require('../util/file_writer'); const config = require('../../config'); const nb_native = require('../util/nb_native'); const stream_utils = require('../util/stream_utils'); @@ -19,7 +19,8 @@ const PARTS = Number(argv.parts) || 1000; const CONCURRENCY = Number(argv.concurrency) || 20; const CHUNK = Number(argv.chunk) || 16 * 1024; const PART_SIZE = Number(argv.part_size) || 20 * 1024 * 1024; -const F_PREFIX = argv.dst_folder || '/tmp/chunk_fs_hashing/'; +const F_PREFIX = argv.dst_folder || '/tmp/file_writer_hashing/'; +const IOV_MAX = argv.iov_max || config.NSFS_DEFAULT_IOV_MAX; const DEFAULT_FS_CONFIG = { uid: Number(argv.uid) || process.getuid(), @@ -28,12 +29,6 @@ const DEFAULT_FS_CONFIG = { warn_threshold_ms: 100, }; -const DUMMY_RPC = { - object: { - update_endpoint_stats: (...params) => null - } -}; - const XATTR_USER_PREFIX = 'user.'; // TODO: In order to verify validity add content_md5_mtime as well const XATTR_MD5_KEY = XATTR_USER_PREFIX + 'content_md5'; @@ -64,41 +59,42 @@ function assign_md5_to_fs_xattr(md5_digest, fs_xattr) { return fs_xattr; } -async function hash_target() { - await P.map_with_concurrency(CONCURRENCY, Array(PARTS).fill(), async () => { +async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) { + config.NSFS_DEFAULT_IOV_MAX = iov_max; + await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => { const data = crypto.randomBytes(PART_SIZE); const content_md5 = crypto.createHash('md5').update(data).digest('hex'); // Using async generator function in order to push data in small chunks const source_stream = stream.Readable.from(async function*() { - for (let i = 0; i < data.length; i += CHUNK) { - yield data.slice(i, i + CHUNK); + for (let i = 0; i < data.length; i += chunk_size) { + yield data.slice(i, i + chunk_size); } }()); const target = new TargetHash(); - const chunk_fs = new ChunkFS({ + const file_writer = new FileWriter({ target_file: target, fs_context: DEFAULT_FS_CONFIG, - rpc_client: DUMMY_RPC, namespace_resource_id: 'MajesticSloth' }); - await stream_utils.pipeline([source_stream, chunk_fs]); - await stream_utils.wait_finished(chunk_fs); + await stream_utils.pipeline([source_stream, file_writer]); + await stream_utils.wait_finished(file_writer); const write_hash = target.digest(); console.log( 'Hash target', - `NativeMD5=${chunk_fs.digest}`, + `NativeMD5=${file_writer.digest}`, `DataWriteCryptoMD5=${write_hash}`, `DataOriginMD5=${content_md5}`, ); assert.strictEqual(content_md5, write_hash); if (config.NSFS_CALCULATE_MD5) { - assert.strictEqual(chunk_fs.digest, content_md5); - assert.strictEqual(chunk_fs.digest, write_hash); + assert.strictEqual(file_writer.digest, content_md5); + assert.strictEqual(file_writer.digest, write_hash); } }); } -async function file_target(chunk_size = CHUNK, parts = PARTS) { +async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) { + config.NSFS_DEFAULT_IOV_MAX = iov_max; fs.mkdirSync(F_PREFIX); await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => { let target_file; @@ -113,32 +109,31 @@ async function file_target(chunk_size = CHUNK, parts = PARTS) { yield data.slice(i, i + chunk_size); } }()); - const chunk_fs = new ChunkFS({ + const file_writer = new FileWriter({ target_file, fs_context: DEFAULT_FS_CONFIG, - rpc_client: DUMMY_RPC, namespace_resource_id: 'MajesticSloth' }); - await stream_utils.pipeline([source_stream, chunk_fs]); - await stream_utils.wait_finished(chunk_fs); + await stream_utils.pipeline([source_stream, file_writer]); + await stream_utils.wait_finished(file_writer); if (XATTR) { await target_file.replacexattr( DEFAULT_FS_CONFIG, - assign_md5_to_fs_xattr(chunk_fs.digest, {}) + assign_md5_to_fs_xattr(file_writer.digest, {}) ); } if (FSYNC) await target_file.fsync(DEFAULT_FS_CONFIG); const write_hash = crypto.createHash('md5').update(fs.readFileSync(F_TARGET)).digest('hex'); console.log( 'File target', - `NativeMD5=${chunk_fs.digest}`, + `NativeMD5=${file_writer.digest}`, `DataWriteMD5=${write_hash}`, `DataOriginMD5=${content_md5}`, ); assert.strictEqual(content_md5, write_hash); if (config.NSFS_CALCULATE_MD5) { - assert.strictEqual(chunk_fs.digest, content_md5); - assert.strictEqual(chunk_fs.digest, write_hash); + assert.strictEqual(file_writer.digest, content_md5); + assert.strictEqual(file_writer.digest, write_hash); } // Leave parts on error fs.rmSync(F_TARGET); diff --git a/src/util/chunk_fs.js b/src/util/chunk_fs.js deleted file mode 100644 index e60c9aa9e0..0000000000 --- a/src/util/chunk_fs.js +++ /dev/null @@ -1,111 +0,0 @@ -/* Copyright (C) 2016 NooBaa */ -'use strict'; - -const stream = require('stream'); -const config = require('../../config'); -const nb_native = require('./nb_native'); -const dbg = require('../util/debug_module')(__filename); - -/** - * - * ChunkFS - * - * Calculates etag and writes stream data to the filesystem batching data buffers - * - */ -class ChunkFS extends stream.Transform { - - /** - * @param {{ - * target_file: object, - * fs_context: object, - * namespace_resource_id: string, - * md5_enabled: boolean, - * stats: import('../sdk/endpoint_stats_collector').EndpointStatsCollector, - * offset?: number, - * bucket?: string, - * large_buf_size?: number, - * }} params - */ - constructor({ target_file, fs_context, namespace_resource_id, md5_enabled, stats, offset, bucket, large_buf_size }) { - super(); - this.q_buffers = []; - this.q_size = 0; - this.MD5Async = md5_enabled ? new (nb_native().crypto.MD5Async)() : undefined; - this.target_file = target_file; - this.fs_context = fs_context; - this.count = 1; - this.total_bytes = 0; - this.offset = offset; - this.namespace_resource_id = namespace_resource_id; - this.stats = stats; - this._total_num_buffers = 0; - const platform_iov_max = nb_native().fs.PLATFORM_IOV_MAX; - this.iov_max = platform_iov_max ? Math.min(platform_iov_max, config.NSFS_DEFAULT_IOV_MAX) : config.NSFS_DEFAULT_IOV_MAX; - this.bucket = bucket; - this.large_buf_size = large_buf_size || config.NSFS_BUF_SIZE_L; - } - - async _transform(chunk, encoding, callback) { - try { - if (this.MD5Async) await this.MD5Async.update(chunk); - this.stats?.update_nsfs_write_stats({ - namespace_resource_id: this.namespace_resource_id, - size: chunk.length, - count: this.count, - bucket_name: this.bucket, - }); - this.count = 0; - while (chunk && chunk.length) { - const available_size = this.large_buf_size - this.q_size; - const buf = (available_size < chunk.length) ? chunk.slice(0, available_size) : chunk; - this.q_buffers.push(buf); - this.q_size += buf.length; - // Should flush when num of chunks equals to max iov which is the limit according to https://linux.die.net/man/2/writev - // or when q_size equals to config.NSFS_BUF_SIZE_L, but added greater than just in case - if (this.q_buffers.length === this.iov_max || this.q_size >= config.NSFS_BUF_SIZE_L) await this._flush_buffers(); - chunk = (available_size < chunk.length) ? chunk.slice(available_size) : null; - } - return callback(); - } catch (error) { - console.error('ChunkFS _transform failed', this.q_size, this._total_num_buffers, error); - return callback(error); - } - } - - async _flush(callback) { - // wait before the last writev to finish - await this._flush_buffers(callback); - } - - // callback will be passed only at the end of the stream by _flush() - // while this function is called without callback during _transform() and returns a promise. - async _flush_buffers(callback) { - try { - if (this.q_buffers.length) { - const buffers_to_write = this.q_buffers; - const size_to_write = this.q_size; - this.q_buffers = []; - this.q_size = 0; - dbg.log1(`Chunk_fs._flush_buffers: writing ${buffers_to_write.length} buffers, total size is ${size_to_write}`); - await this.target_file.writev(this.fs_context, buffers_to_write, this.offset); - // Hold the ref on the buffers from the JS side - this._total_num_buffers += buffers_to_write.length; - this.total_bytes += size_to_write; - if (this.offset >= 0) this.offset += size_to_write; - } - if (callback) { - if (this.MD5Async) this.digest = (await this.MD5Async.digest()).toString('hex'); - return callback(); - } - } catch (error) { - console.error('ChunkFS _flush_buffers failed', this.q_size, this._total_num_buffers, error); - if (callback) { - return callback(error); - } - throw error; - } - } -} - -module.exports = ChunkFS; diff --git a/src/util/file_writer.js b/src/util/file_writer.js new file mode 100644 index 0000000000..c8df126719 --- /dev/null +++ b/src/util/file_writer.js @@ -0,0 +1,140 @@ +/* Copyright (C) 2016 NooBaa */ +'use strict'; + +const stream = require('stream'); +const config = require('../../config'); +const nb_native = require('./nb_native'); +const dbg = require('../util/debug_module')(__filename); + +/** + * FileWriter is a Writable stream that write data to a filesystem file, + * with optional calculation of md5 for etag. + */ +class FileWriter extends stream.Writable { + + /** + * @param {{ + * target_file: object, + * fs_context: object, + * namespace_resource_id: string, + * md5_enabled: boolean, + * stats: import('../sdk/endpoint_stats_collector').EndpointStatsCollector, + * offset?: number, + * bucket?: string, + * large_buf_size?: number, + * }} params + */ + constructor({ target_file, fs_context, namespace_resource_id, md5_enabled, stats, offset, bucket, large_buf_size }) { + super({ highWaterMark: config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD }); + this.target_file = target_file; + this.fs_context = fs_context; + this.offset = offset; + this.total_bytes = 0; + this.count_once = 1; + this.stats = stats; + this.bucket = bucket; + this.namespace_resource_id = namespace_resource_id; + this.large_buf_size = large_buf_size || config.NSFS_BUF_SIZE_L; + this.MD5Async = md5_enabled ? new (nb_native().crypto.MD5Async)() : undefined; + const platform_iov_max = nb_native().fs.PLATFORM_IOV_MAX; + this.iov_max = platform_iov_max ? Math.min(platform_iov_max, config.NSFS_DEFAULT_IOV_MAX) : config.NSFS_DEFAULT_IOV_MAX; + } + + /** + * @param {number} size + */ + _update_stats(size) { + const count = this.count_once; + this.count_once = 0; // counting the entire operation just once + this.stats?.update_nsfs_write_stats({ + namespace_resource_id: this.namespace_resource_id, + size, + count, + bucket_name: this.bucket, + }); + } + + /** + * @param {Buffer[]} buffers + * @param {number} size + */ + async _update_md5(buffers, size) { + // TODO optimize by calling once with all buffers + for (const buf of buffers) { + await this.MD5Async.update(buf); + } + } + + /** + * @param {Buffer[]} buffers + * @param {number} size + */ + async _write_all_buffers(buffers, size) { + if (buffers.length <= this.iov_max) { + await this._write_to_file(buffers, size); + } else { + let iov_start = 0; + while (iov_start < buffers.length) { + const iov_end = Math.min(buffers.length, iov_start + this.iov_max); + const buffers_to_write = buffers.slice(iov_start, iov_end); + const size_to_write = buffers_to_write.reduce((s, b) => s + b.length, 0); + await this._write_to_file(buffers_to_write, size_to_write); + iov_start = iov_end; + } + } + } + + /** + * @param {Buffer[]} buffers + * @param {number} size + */ + async _write_to_file(buffers, size) { + dbg.log1(`FileWriter._write_to_file: buffers ${buffers.length} size ${size} offset ${this.offset}`); + await this.target_file.writev(this.fs_context, buffers, this.offset); + if (this.offset >= 0) this.offset += size; // when offset<0 we just append + this.total_bytes += size; + } + + /** + * @param {Array<{ chunk: Buffer; encoding: BufferEncoding; }>} chunks + * @param {(error?: Error | null) => void} callback + */ + async _writev(chunks, callback) { + try { + let size = 0; + const buffers = chunks.map(it => { + size += it.chunk.length; + return it.chunk; + }); + await Promise.all([ + this.MD5Async && this._update_md5(buffers, size), + this._write_all_buffers(buffers, size), + ]); + this._update_stats(size); + return callback(); + } catch (err) { + console.error('FileWriter._writev: failed', err); + return callback(err); + } + } + + /** + * @param {(error?: Error | null) => void} callback + */ + async _final(callback) { + try { + if (this.MD5Async) { + const digest = await this.MD5Async.digest(); + this.digest = digest.toString('hex'); + } + + return callback(); + } catch (err) { + console.error('FileWriter._final: failed', err); + return callback(err); + } + } + +} + +module.exports = FileWriter;