From 40908cc0df1a6d8fbb0e98eb0ad0c5176c5cb8ab Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 27 Nov 2019 07:50:49 +0000
Subject: [PATCH] perf: concurrent file import

Adds two new options:

`fileImportConcurrency`

This controls the number of files that are imported concurrently.
You may wish to set this high if you are importing lots of small files.

`blockWriteConcurrency`

This controls how many blocks from each file we write to disk at the
same time. Setting this high when writing large files will
significantly increase import speed, though having it high when
`fileImportConcurrency` is also high can swamp the process.

It also:

1. Flattens module options, because validating deep objects was clunky
   and the separation of access to config sub-objects within this
   module isn't very good
1. Replaces `superstruct` and `deep-extend` with `merge-options`, which
   is better suited to merging options and is smaller
1. Replaces `async-iterator-*` modules with the more zeitgeisty `it-*`
   namespace

Supersedes #38, sort of.
---
 README.md                             |  9 +--
 package.json                          | 18 +++---
 src/dag-builder/file/balanced.js      |  2 +-
 src/dag-builder/file/flat.js          |  4 +-
 src/dag-builder/file/index.js         | 70 +++++++++++----------
 src/dag-builder/file/trickle.js       |  2 +-
 src/dag-builder/index.js              |  6 +-
 src/dir-sharded.js                    |  4 +-
 src/index.js                          | 89 ++++++++------------------
 src/tree-builder.js                   |  5 +-
 test/benchmark.spec.js                |  4 +-
 test/builder-balanced.spec.js         |  2 +-
 test/builder-dir-sharding.spec.js     |  4 +-
 test/builder-flat.spec.js             |  2 +-
 test/builder-only-hash.spec.js        | 12 ++--
 test/builder-trickle-dag.spec.js      |  2 +-
 test/builder.spec.js                  | 12 ++--
 test/chunker-fixed-size.spec.js       |  2 +-
 test/chunker-rabin.spec.js            |  2 +-
 test/hash-parity-with-go-ipfs.spec.js |  2 +-
 test/import-export-nested-dir.spec.js |  2 +-
 test/importer.spec.js                 |  8 +--
 22 files changed, 116 insertions(+), 147 deletions(-)

diff --git a/README.md b/README.md
index aab39d9..6eea143 100644
--- a/README.md
+++ b/README.md
@@ -124,10 +124,9 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
 - `chunker` (string, defaults to `"fixed"`): the chunking strategy. Supports:
   - `fixed`
   - `rabin`
-- `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
-  - `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
-  - `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
-  - `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
+- `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
+- `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
+- `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
 - `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports:
   - `flat`: flat list of chunks
   - `balanced`: builds a balanced tree
@@ -144,6 +143,8 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
 - `cidVersion` (integer, default 0): the CID version to use when storing the data (storage keys are based on the CID, _including_ it's version)
 - `rawLeaves` (boolean, defaults to false): When a file would span multiple DAGNodes, if this is true the leaf nodes will not be wrapped in `UnixFS` protobufs and will instead contain the raw file bytes
 - `leafType` (string, defaults to `'file'`) what type of UnixFS node leaves should be - can be `'file'` or `'raw'` (ignored when `rawLeaves` is `true`)
+- `blockWriteConcurrency` (positive integer, defaults to `10`): how many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. `50`).
+- `fileImportConcurrency` (positive integer, defaults to `50`): how many files to import concurrently. For large numbers of small files this should be high (e.g. `50`).
 
 [ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
 [UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
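For illustration, here is a minimal usage sketch of the flattened options documented above (it assumes an instantiated `ipld` instance; the file path and content are hypothetical):

```js
const importer = require('ipfs-unixfs-importer')

async function run (ipld) {
  const files = [{
    path: '/tmp/foo.txt',
    content: Buffer.from('hello world')
  }]

  // Options are now flat: chunker settings and the new concurrency
  // knobs sit alongside each other instead of in nested sub-objects
  for await (const entry of importer(files, ipld, {
    maxChunkSize: 262144, // was chunkerOptions.maxChunkSize
    fileImportConcurrency: 50, // how many files are imported at once
    blockWriteConcurrency: 10 // how many blocks of each file are written at once
  })) {
    console.log(entry.path, entry.cid.toString())
  }
}
```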
diff --git a/package.json b/package.json
index e9d2f59..5e5b735 100644
--- a/package.json
+++ b/package.json
@@ -38,33 +38,35 @@
   "homepage": "https://github.com/ipfs/js-ipfs-unixfs-importer#readme",
   "devDependencies": {
     "aegir": "^20.0.0",
-    "async-iterator-buffer-stream": "^1.0.0",
-    "async-iterator-last": "^1.0.0",
     "chai": "^4.2.0",
     "cids": "~0.7.1",
+    "deep-extend": "~0.6.0",
    "detect-node": "^2.0.4",
     "dirty-chai": "^2.0.1",
     "ipfs-unixfs-exporter": "^0.39.0",
     "ipld": "^0.25.0",
     "ipld-in-memory": "^3.0.0",
+    "it-buffer-stream": "^1.0.0",
+    "it-last": "^1.0.0",
     "multihashes": "~0.4.14",
     "nyc": "^14.0.0",
     "sinon": "^7.1.0"
   },
   "dependencies": {
-    "async-iterator-all": "^1.0.0",
-    "async-iterator-batch": "~0.0.1",
-    "async-iterator-first": "^1.0.0",
     "bl": "^4.0.0",
-    "deep-extend": "~0.6.0",
     "err-code": "^2.0.0",
     "hamt-sharding": "~0.0.2",
     "ipfs-unixfs": "^0.2.0",
     "ipld-dag-pb": "^0.18.0",
+    "it-all": "^1.0.1",
+    "it-batch": "^1.0.3",
+    "it-first": "^1.0.1",
+    "it-flat-batch": "^1.0.2",
+    "it-parallel-batch": "1.0.2",
+    "merge-options": "^2.0.0",
     "multicodec": "~0.5.1",
     "multihashing-async": "^0.8.0",
-    "rabin-wasm": "~0.0.8",
-    "superstruct": "^0.8.2"
+    "rabin-wasm": "~0.0.8"
   },
   "contributors": [
     "Alan Shaw ",
diff --git a/src/dag-builder/file/balanced.js b/src/dag-builder/file/balanced.js
index 72b3cb8..26fd7a2 100644
--- a/src/dag-builder/file/balanced.js
+++ b/src/dag-builder/file/balanced.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const batch = require('async-iterator-batch')
+const batch = require('it-flat-batch')
 
 async function * balanced (source, reduce, options) {
   yield await reduceToParents(source, reduce, options)
diff --git a/src/dag-builder/file/flat.js b/src/dag-builder/file/flat.js
index c7ba75e..7146d2e 100644
--- a/src/dag-builder/file/flat.js
+++ b/src/dag-builder/file/flat.js
@@ -1,11 +1,11 @@
 'use strict'
 
-const batch = require('async-iterator-batch')
+const batch = require('it-flat-batch')
 
 module.exports = async function * (source, reduce) {
   const roots = []
 
-  for await (const chunk of batch(source, Infinity)) {
+  for await (const chunk of batch(source, Number.MAX_SAFE_INTEGER)) {
     roots.push(await reduce(chunk))
   }
 
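A note on the `Infinity` → `Number.MAX_SAFE_INTEGER` change in `flat.js` above: presumably the `it-*` modules expect the batch size to be a finite integer, so `Infinity` is no longer a safe "collect everything" sentinel. A rough sketch of the batching contract `flat.js` relies on (this is illustrative, not the actual `it-flat-batch` source):

```js
// Collects up to `size` items from an async iterable and yields them
// as arrays, flushing whatever remains when the source ends
async function * batch (source, size) {
  let buf = []

  for await (const item of source) {
    buf.push(item)

    if (buf.length >= size) {
      yield buf
      buf = []
    }
  }

  if (buf.length) {
    yield buf
  }
}
```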
diff --git a/src/dag-builder/file/index.js b/src/dag-builder/file/index.js
index f3be403..bfb71a6 100644
--- a/src/dag-builder/file/index.js
+++ b/src/dag-builder/file/index.js
@@ -7,7 +7,8 @@ const {
   DAGNode,
   DAGLink
 } = require('ipld-dag-pb')
-const all = require('async-iterator-all')
+const all = require('it-all')
+const parallelBatch = require('it-parallel-batch')
 
 const dagBuilders = {
   flat: require('./flat'),
@@ -15,46 +16,53 @@
   trickle: require('./trickle')
 }
 
-async function * buildFile (file, source, ipld, options) {
-  let count = -1
-  let previous
-
+async function * importBuffer (file, source, ipld, options) {
   for await (const buffer of source) {
-    count++
-    options.progress(buffer.length)
-    let node
-    let unixfs
+    yield async () => {
+      options.progress(buffer.length)
+      let node
+      let unixfs
 
-    const opts = {
-      ...options
-    }
+      const opts = {
+        ...options
+      }
 
-    if (options.rawLeaves) {
-      node = buffer
+      if (options.rawLeaves) {
+        node = buffer
 
-      opts.codec = 'raw'
-      opts.cidVersion = 1
-    } else {
-      unixfs = new UnixFS(options.leafType, buffer)
+        opts.codec = 'raw'
+        opts.cidVersion = 1
+      } else {
+        unixfs = new UnixFS(options.leafType, buffer)
 
-      if (file.mtime) {
-        unixfs.mtime = file.mtime
-      }
+        if (file.mtime) {
+          unixfs.mtime = file.mtime
+        }
+
+        if (file.mode) {
+          unixfs.mode = file.mode
+        }
 
-      if (file.mode) {
-        unixfs.mode = file.mode
+        node = new DAGNode(unixfs.marshal())
       }
 
-      node = new DAGNode(unixfs.marshal())
+      const cid = await persist(node, ipld, opts)
+
+      return {
+        cid: cid,
+        unixfs,
+        node
+      }
     }
+  }
+}
 
-    const cid = await persist(node, ipld, opts)
+async function * buildFileBatch (file, source, ipld, options) {
+  let count = -1
+  let previous
 
-    const entry = {
-      cid: cid,
-      unixfs,
-      node
-    }
+  for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
+    count++
 
     if (count === 0) {
       previous = entry
@@ -149,7 +157,7 @@ const fileBuilder = async (file, source, ipld, options) => {
     throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY')
   }
 
-  const roots = await all(dagBuilder(buildFile(file, source, ipld, options), reduce(file, ipld, options), options.builderOptions))
+  const roots = await all(dagBuilder(buildFileBatch(file, source, ipld, options), reduce(file, ipld, options), options))
 
   if (roots.length > 1) {
     throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS')
diff --git a/src/dag-builder/file/trickle.js b/src/dag-builder/file/trickle.js
index a970517..9941d87 100644
--- a/src/dag-builder/file/trickle.js
+++ b/src/dag-builder/file/trickle.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const batch = require('async-iterator-batch')
+const batch = require('it-flat-batch')
 
 module.exports = function * trickleReduceToRoot (source, reduce, options) {
   yield trickleStream(source, reduce, options)
diff --git a/src/dag-builder/index.js b/src/dag-builder/index.js
index 013d498..bc2f4b2 100644
--- a/src/dag-builder/index.js
+++ b/src/dag-builder/index.js
@@ -30,13 +30,13 @@ async function * dagBuilder (source, ipld, options) {
       }
     }
 
-    const chunker = createChunker(options.chunker, validateChunks(source), options.chunkerOptions)
+    const chunker = createChunker(options.chunker, validateChunks(source), options)
 
     // item is a file
-    yield fileBuilder(entry, chunker, ipld, options)
+    yield () => fileBuilder(entry, chunker, ipld, options)
   } else {
     // item is a directory
-    yield dirBuilder(entry, ipld, options)
+    yield () => dirBuilder(entry, ipld, options)
   }
 }
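The core of the change is visible above: `importBuffer` (and `dagBuilder`) now yield no-argument async functions rather than finished entries, and `it-parallel-batch` invokes up to N of them at once while still yielding results in input order. A self-contained sketch of that pattern (the job bodies here are stand-ins for block writes):

```js
const parallelBatch = require('it-parallel-batch')

// Yield thunks - no work starts until the consumer invokes them,
// which is what lets parallelBatch cap the concurrency
async function * jobs () {
  for (let i = 0; i < 10; i++) {
    yield async () => {
      await new Promise(resolve => setTimeout(resolve, Math.random() * 100))
      return i
    }
  }
}

async function main () {
  // Up to 3 jobs run concurrently; results still arrive in input order
  for await (const result of parallelBatch(jobs(), 3)) {
    console.log(result) // 0, 1, 2 ... 9
  }
}

main()
```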
diff --git a/src/dir-sharded.js b/src/dir-sharded.js
index e515b8a..97dcf36 100644
--- a/src/dir-sharded.js
+++ b/src/dir-sharded.js
@@ -9,7 +9,7 @@ const multihashing = require('multihashing-async')
 const Dir = require('./dir')
 const persist = require('./utils/persist')
 const Bucket = require('hamt-sharding')
-const extend = require('deep-extend')
+const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })
 
 const hashFn = async function (value) {
   const hash = await multihashing(Buffer.from(value, 'utf8'), 'murmur3-128')
@@ -36,7 +36,7 @@ const defaultOptions = {
 
 class DirSharded extends Dir {
   constructor (props, options) {
-    options = extend({}, defaultOptions, options)
+    options = mergeOptions(defaultOptions, options)
 
     super(props, options)
 
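`merge-options` bound with `ignoreUndefined: true` replaces `deep-extend` here; the binding means an explicitly-`undefined` user option no longer clobbers a default. A small sketch of the difference (the option names are illustrative):

```js
const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

const defaults = { shardSplitThreshold: 1000, hashAlg: 'murmur3-128' }

// deep-extend would have copied the undefined value over the default;
// with ignoreUndefined the default survives the merge
const opts = mergeOptions(defaults, { shardSplitThreshold: undefined })

console.log(opts.shardSplitThreshold) // 1000
```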
diff --git a/src/index.js b/src/index.js
index 4ff621d..fc09074 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1,78 +1,41 @@
 'use strict'
 
-const { superstruct } = require('superstruct')
 const dagBuilder = require('./dag-builder')
 const treeBuilder = require('./tree-builder')
-const mh = require('multihashes')
+const parallelBatch = require('it-parallel-batch')
+const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })
 
-const struct = superstruct({
-  types: {
-    codec: v => ['dag-pb', 'dag-cbor', 'raw'].includes(v),
-    hashAlg: v => Object.keys(mh.names).includes(v),
-    leafType: v => ['file', 'raw'].includes(v)
-  }
-})
-
-const ChunkerOptions = struct({
-  minChunkSize: 'number?',
-  maxChunkSize: 'number?',
-  avgChunkSize: 'number?',
-  window: 'number?',
-  polynomial: 'number?'
-}, {
-  maxChunkSize: 262144,
-  avgChunkSize: 262144,
-  window: 16,
-  polynomial: 17437180132763653 // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
-})
-
-const BuilderOptions = struct({
-  maxChildrenPerNode: 'number?',
-  layerRepeat: 'number?'
-}, {
-  maxChildrenPerNode: 174,
-  layerRepeat: 4
-})
-
-const Options = struct({
-  chunker: struct.enum(['fixed', 'rabin']),
-  rawLeaves: 'boolean?',
-  hashOnly: 'boolean?',
-  strategy: struct.enum(['balanced', 'flat', 'trickle']),
-  reduceSingleLeafToSelf: 'boolean?',
-  codec: 'codec?',
-  format: 'codec?',
-  hashAlg: 'hashAlg?',
-  leafType: 'leafType?',
-  cidVersion: 'number?',
-  progress: 'function?',
-  wrapWithDirectory: 'boolean?',
-  shardSplitThreshold: 'number?',
-  onlyHash: 'boolean?',
-  chunkerOptions: ChunkerOptions,
-  builderOptions: BuilderOptions,
-
-  wrap: 'boolean?',
-  pin: 'boolean?',
-  recursive: 'boolean?',
-  ignore: 'array?',
-  hidden: 'boolean?',
-  preload: 'boolean?'
-}, {
+const defaultOptions = {
   chunker: 'fixed',
-  strategy: 'balanced',
+  strategy: 'balanced', // 'flat', 'trickle'
   rawLeaves: false,
+  onlyHash: false,
   reduceSingleLeafToSelf: true,
   codec: 'dag-pb',
   hashAlg: 'sha2-256',
-  leafType: 'file',
+  leafType: 'file', // 'raw'
   cidVersion: 0,
   progress: () => () => {},
-  shardSplitThreshold: 1000
-})
+  shardSplitThreshold: 1000,
+  fileImportConcurrency: 50,
+  blockWriteConcurrency: 10,
+  minChunkSize: 262144,
+  maxChunkSize: 262144,
+  avgChunkSize: 262144,
+  window: 16,
+  polynomial: 17437180132763653, // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
+  maxChildrenPerNode: 174,
+  layerRepeat: 4,
+  wrapWithDirectory: false,
+  pin: true,
+  recursive: false,
+  ignore: null, // []
+  hidden: false,
+  preload: true
+}
 
 module.exports = async function * (source, ipld, options = {}) {
-  const opts = Options(options)
+  const opts = mergeOptions(defaultOptions, options)
 
   if (options.cidVersion > 0 && options.rawLeaves === undefined) {
     // if the cid version is 1 or above, use raw leaves as this is
@@ -93,10 +56,10 @@ module.exports = async function * (source, ipld, options = {}) {
   }
 
   if (options.format) {
-    options.codec = options.format
+    opts.codec = options.format
   }
 
-  for await (const entry of treeBuilder(dagBuilder(source, ipld, opts), ipld, opts)) {
+  for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
     yield {
       cid: entry.cid,
       path: entry.path,
diff --git a/src/tree-builder.js b/src/tree-builder.js
index 55bab49..cd43170 100644
--- a/src/tree-builder.js
+++ b/src/tree-builder.js
@@ -5,7 +5,7 @@ const flatToShard = require('./flat-to-shard')
 const Dir = require('./dir')
 const toPathComponents = require('./utils/to-path-components')
 const errCode = require('err-code')
-const first = require('async-iterator-first')
+const first = require('it-first')
 
 async function addToTree (elem, tree, options) {
   const pathElems = toPathComponents(elem.path || '')
@@ -61,6 +61,9 @@ async function * treeBuilder (source, ipld, options) {
   }, options)
 
   for await (const entry of source) {
+    if (!entry) {
+      continue
+    }
     tree = await addToTree(entry, tree, options)
 
     yield entry
diff --git a/test/benchmark.spec.js b/test/benchmark.spec.js
index b0b1db2..fae3f48 100644
--- a/test/benchmark.spec.js
+++ b/test/benchmark.spec.js
@@ -5,8 +5,8 @@ const importer = require('../src')
 
 const IPLD = require('ipld')
 const inMemory = require('ipld-in-memory')
-const bufferStream = require('async-iterator-buffer-stream')
-const all = require('async-iterator-all')
+const bufferStream = require('it-buffer-stream')
+const all = require('it-all')
 
 const REPEATS = 10
 const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB
diff --git a/test/builder-balanced.spec.js b/test/builder-balanced.spec.js
index f064dd8..17242a3 100644
--- a/test/builder-balanced.spec.js
+++ b/test/builder-balanced.spec.js
@@ -5,7 +5,7 @@ const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
 const builder = require('../src/dag-builder/file/balanced')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 function reduce (leaves) {
   if (leaves.length > 1) {
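Putting the two knobs together, the top-level flow in `src/index.js` above now composes roughly like this (a schematic under the patch's own module layout, not the file's exact code):

```js
const dagBuilder = require('./dag-builder')
const treeBuilder = require('./tree-builder')
const parallelBatch = require('it-parallel-batch')

// Schematic: dagBuilder yields one thunk per file or directory; each
// file thunk internally writes its blocks blockWriteConcurrency at a
// time, the thunks themselves are evaluated fileImportConcurrency at
// a time, and treeBuilder links the resulting entries into the tree.
async function * importPipeline (source, ipld, opts) {
  const entries = parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency)

  yield * treeBuilder(entries, ipld, opts)
}
```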
diff --git a/test/builder-dir-sharding.spec.js b/test/builder-dir-sharding.spec.js
index 057eb2e..b52b07b 100644
--- a/test/builder-dir-sharding.spec.js
+++ b/test/builder-dir-sharding.spec.js
@@ -9,8 +9,8 @@ chai.use(require('dirty-chai'))
 const expect = chai.expect
 const IPLD = require('ipld')
 const inMemory = require('ipld-in-memory')
-const all = require('async-iterator-all')
-const last = require('async-iterator-last')
+const all = require('it-all')
+const last = require('it-last')
 
 describe('builder: directory sharding', () => {
   let ipld
diff --git a/test/builder-flat.spec.js b/test/builder-flat.spec.js
index 1e3acc8..e3f0339 100644
--- a/test/builder-flat.spec.js
+++ b/test/builder-flat.spec.js
@@ -5,7 +5,7 @@ const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
 const builder = require('../src/dag-builder/file/flat')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 function reduce (leaves) {
   if (leaves.length > 1) {
diff --git a/test/builder-only-hash.spec.js b/test/builder-only-hash.spec.js
index 45ac5dd..e7e7642 100644
--- a/test/builder-only-hash.spec.js
+++ b/test/builder-only-hash.spec.js
@@ -7,7 +7,7 @@ const expect = chai.expect
 const IPLD = require('ipld')
 const inMemory = require('ipld-in-memory')
 const builder = require('../src/dag-builder')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 describe('builder: onlyHash', () => {
   let ipld
@@ -30,18 +30,14 @@ describe('builder: onlyHash', () => {
       format: 'dag-pb',
       hashAlg: 'sha2-256',
       wrap: true,
-      chunkerOptions: {
-        maxChunkSize: 1024
-      },
-      builderOptions: {
-        maxChildrenPerNode: 254
-      }
+      maxChunkSize: 1024,
+      maxChildrenPerNode: 254
     }))
 
     expect(nodes.length).to.equal(1)
 
     try {
-      await ipld.get(nodes[0].cid)
+      await ipld.get((await nodes[0]()).cid)
 
       throw new Error('Should have errored')
     } catch (err) {
diff --git a/test/builder-trickle-dag.spec.js b/test/builder-trickle-dag.spec.js
index 8e625b4..ba6c239 100644
--- a/test/builder-trickle-dag.spec.js
+++ b/test/builder-trickle-dag.spec.js
@@ -5,7 +5,7 @@ const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
 const builder = require('../src/dag-builder/file/trickle')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 const createValues = (max) => {
   const output = []
diff --git a/test/builder.spec.js b/test/builder.spec.js
index 1eea492..bbeb9d0 100644
--- a/test/builder.spec.js
+++ b/test/builder.spec.js
@@ -9,7 +9,7 @@ const IPLD = require('ipld')
 const inMemory = require('ipld-in-memory')
 const UnixFS = require('ipfs-unixfs')
 const builder = require('../src/dag-builder')
-const first = require('async-iterator-first')
+const first = require('it-first')
 
 describe('builder', () => {
   let ipld
@@ -27,9 +27,7 @@ describe('builder', () => {
     format: 'dag-pb',
     hashAlg: 'sha2-256',
     progress: () => {},
-    chunkerOptions: {
-      maxChunkSize: 262144
-    }
+    maxChunkSize: 262144
   }
 
   it('allows multihash hash algorithm to be specified', async () => {
@@ -45,7 +43,7 @@
       content: Buffer.from(content)
     }
 
-    const imported = await first(builder([inputFile], ipld, options))
+    const imported = await (await first(builder([inputFile], ipld, options)))()
 
     expect(imported).to.exist()
 
@@ -76,7 +74,7 @@
       content: Buffer.alloc(262144 + 5).fill(1)
     }
 
-    const imported = await first(builder([Object.assign({}, inputFile)], ipld, options))
+    const imported = await (await first(builder([inputFile], ipld, options)))()
 
     expect(imported).to.exist()
     expect(mh.decode(imported.cid.multihash).name).to.equal(hashAlg)
@@ -96,7 +94,7 @@
       content: null
     }
 
-    const imported = await first(builder([Object.assign({}, inputFile)], ipld, options))
+    const imported = await (await first(builder([Object.assign({}, inputFile)], ipld, options)))()
 
     expect(mh.decode(imported.cid.multihash).name).to.equal(hashAlg)
 
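Because the builder now yields thunks, the test updates above all follow the same double-await shape: one await to get the first yielded function, a call to run it, and an await on the promise it returns. In isolation (inside an async test body):

```js
const first = require('it-first')

// builder yields () => Promise<entry> instead of plain entries, so
// tests grab the first thunk and invoke it to perform the import
const imported = await (await first(builder([inputFile], ipld, options)))()

console.log(imported.cid.toString())
```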
diff --git a/test/chunker-fixed-size.spec.js b/test/chunker-fixed-size.spec.js
index e61653e..276702a 100644
--- a/test/chunker-fixed-size.spec.js
+++ b/test/chunker-fixed-size.spec.js
@@ -6,7 +6,7 @@ const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
 const isNode = require('detect-node')
-const all = require('async-iterator-all')
+const all = require('it-all')
 const loadFixture = require('aegir/fixtures')
 
 const rawFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/1MiB.txt')
diff --git a/test/chunker-rabin.spec.js b/test/chunker-rabin.spec.js
index 6caccc8..9f9a4af 100644
--- a/test/chunker-rabin.spec.js
+++ b/test/chunker-rabin.spec.js
@@ -7,7 +7,7 @@ chai.use(require('dirty-chai'))
 const expect = chai.expect
 const loadFixture = require('aegir/fixtures')
 const isNode = require('detect-node')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 const rawFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/1MiB.txt')
diff --git a/test/hash-parity-with-go-ipfs.spec.js b/test/hash-parity-with-go-ipfs.spec.js
index ce936be..94e44fd 100644
--- a/test/hash-parity-with-go-ipfs.spec.js
+++ b/test/hash-parity-with-go-ipfs.spec.js
@@ -9,7 +9,7 @@ const expect = chai.expect
 const IPLD = require('ipld')
 const inMemory = require('ipld-in-memory')
 const randomByteStream = require('./helpers/finite-pseudorandom-byte-stream')
-const first = require('async-iterator-first')
+const first = require('it-first')
 
 const strategies = [
   'flat',
diff --git a/test/import-export-nested-dir.spec.js b/test/import-export-nested-dir.spec.js
index 8d58080..ae60712 100644
--- a/test/import-export-nested-dir.spec.js
+++ b/test/import-export-nested-dir.spec.js
@@ -6,7 +6,7 @@ chai.use(require('dirty-chai'))
 const expect = chai.expect
 const IPLD = require('ipld')
 const inMemory = require('ipld-in-memory')
-const all = require('async-iterator-all')
+const all = require('it-all')
 const importer = require('../src')
 const exporter = require('ipfs-unixfs-exporter')
diff --git a/test/importer.spec.js b/test/importer.spec.js
index 6fab88e..321db3a 100644
--- a/test/importer.spec.js
+++ b/test/importer.spec.js
@@ -17,8 +17,8 @@ const loadFixture = require('aegir/fixtures')
 const isNode = require('detect-node')
 const bigFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/1.2MiB.txt')
 const smallFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/200Bytes.txt')
-const all = require('async-iterator-all')
-const first = require('async-iterator-first')
+const all = require('it-all')
+const first = require('it-first')
 
 function stringifyMh (files) {
   return files.map((file) => {
@@ -563,9 +563,7 @@ strategies.forEach((strategy) => {
 
       const options = {
         progress: spy(),
-        chunkerOptions: {
-          maxChunkSize
-        }
+        maxChunkSize
       }
 
       await all(importer([{