diff --git a/README.md b/README.md index aab39d9..6eea143 100644 --- a/README.md +++ b/README.md @@ -124,10 +124,9 @@ The input's file paths and directory structure will be preserved in the [`dag-pb - `chunker` (string, defaults to `"fixed"`): the chunking strategy. Supports: - `fixed` - `rabin` -- `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties: - - `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only) - - `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only) - - `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size +- `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only) +- `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only) +- `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size - `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports: - `flat`: flat list of chunks - `balanced`: builds a balanced tree @@ -144,6 +143,8 @@ The input's file paths and directory structure will be preserved in the [`dag-pb - `cidVersion` (integer, default 0): the CID version to use when storing the data (storage keys are based on the CID, _including_ it's version) - `rawLeaves` (boolean, defaults to false): When a file would span multiple DAGNodes, if this is true the leaf nodes will not be wrapped in `UnixFS` protobufs and will instead contain the raw file bytes - `leafType` (string, defaults to `'file'`) what type of UnixFS node leaves should be - can be `'file'` or `'raw'` (ignored when `rawLeaves` is `true`) +- `blockWriteConcurrency` (positive integer, defaults to 10) How many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50). +- `fileImportConcurrency` (number, defaults to 50) How many files to import concurrently. For large numbers of small files this should be high (e.g. 50). [ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver [UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs diff --git a/package.json b/package.json index e9d2f59..5e5b735 100644 --- a/package.json +++ b/package.json @@ -38,33 +38,35 @@ "homepage": "https://github.com/ipfs/js-ipfs-unixfs-importer#readme", "devDependencies": { "aegir": "^20.0.0", - "async-iterator-buffer-stream": "^1.0.0", - "async-iterator-last": "^1.0.0", "chai": "^4.2.0", "cids": "~0.7.1", + "deep-extend": "~0.6.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", "ipfs-unixfs-exporter": "^0.39.0", "ipld": "^0.25.0", "ipld-in-memory": "^3.0.0", + "it-buffer-stream": "^1.0.0", + "it-last": "^1.0.0", "multihashes": "~0.4.14", "nyc": "^14.0.0", "sinon": "^7.1.0" }, "dependencies": { - "async-iterator-all": "^1.0.0", - "async-iterator-batch": "~0.0.1", - "async-iterator-first": "^1.0.0", "bl": "^4.0.0", - "deep-extend": "~0.6.0", "err-code": "^2.0.0", "hamt-sharding": "~0.0.2", "ipfs-unixfs": "^0.2.0", "ipld-dag-pb": "^0.18.0", + "it-all": "^1.0.1", + "it-batch": "^1.0.3", + "it-first": "^1.0.1", + "it-flat-batch": "^1.0.2", + "it-parallel-batch": "1.0.2", + "merge-options": "^2.0.0", "multicodec": "~0.5.1", "multihashing-async": "^0.8.0", - "rabin-wasm": "~0.0.8", - "superstruct": "^0.8.2" + "rabin-wasm": "~0.0.8" }, "contributors": [ "Alan Shaw ", diff --git a/src/dag-builder/file/balanced.js b/src/dag-builder/file/balanced.js index 72b3cb8..26fd7a2 100644 --- a/src/dag-builder/file/balanced.js +++ b/src/dag-builder/file/balanced.js @@ -1,6 +1,6 @@ 'use strict' -const batch = require('async-iterator-batch') +const batch = require('it-flat-batch') async function * balanced (source, reduce, options) { yield await reduceToParents(source, reduce, options) diff --git a/src/dag-builder/file/flat.js b/src/dag-builder/file/flat.js index c7ba75e..7146d2e 100644 --- a/src/dag-builder/file/flat.js +++ b/src/dag-builder/file/flat.js @@ -1,11 +1,11 @@ 'use strict' -const batch = require('async-iterator-batch') +const batch = require('it-flat-batch') module.exports = async function * (source, reduce) { const roots = [] - for await (const chunk of batch(source, Infinity)) { + for await (const chunk of batch(source, Number.MAX_SAFE_INTEGER)) { roots.push(await reduce(chunk)) } diff --git a/src/dag-builder/file/index.js b/src/dag-builder/file/index.js index f3be403..bfb71a6 100644 --- a/src/dag-builder/file/index.js +++ b/src/dag-builder/file/index.js @@ -7,7 +7,8 @@ const { DAGNode, DAGLink } = require('ipld-dag-pb') -const all = require('async-iterator-all') +const all = require('it-all') +const parallelBatch = require('it-parallel-batch') const dagBuilders = { flat: require('./flat'), @@ -15,46 +16,53 @@ const dagBuilders = { trickle: require('./trickle') } -async function * buildFile (file, source, ipld, options) { - let count = -1 - let previous - +async function * importBuffer (file, source, ipld, options) { for await (const buffer of source) { - count++ - options.progress(buffer.length) - let node - let unixfs + yield async () => { + options.progress(buffer.length) + let node + let unixfs - const opts = { - ...options - } + const opts = { + ...options + } - if (options.rawLeaves) { - node = buffer + if (options.rawLeaves) { + node = buffer - opts.codec = 'raw' - opts.cidVersion = 1 - } else { - unixfs = new UnixFS(options.leafType, buffer) + opts.codec = 'raw' + opts.cidVersion = 1 + } else { + unixfs = new UnixFS(options.leafType, buffer) - if (file.mtime) { - unixfs.mtime = file.mtime - } + if (file.mtime) { + unixfs.mtime = file.mtime + } + + if (file.mode) { + unixfs.mode = file.mode + } - if (file.mode) { - unixfs.mode = file.mode + node = new DAGNode(unixfs.marshal()) } - node = new DAGNode(unixfs.marshal()) + const cid = await persist(node, ipld, opts) + + return { + cid: cid, + unixfs, + node + } } + } +} - const cid = await persist(node, ipld, opts) +async function * buildFileBatch (file, source, ipld, options) { + let count = -1 + let previous - const entry = { - cid: cid, - unixfs, - node - } + for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) { + count++ if (count === 0) { previous = entry @@ -149,7 +157,7 @@ const fileBuilder = async (file, source, ipld, options) => { throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY') } - const roots = await all(dagBuilder(buildFile(file, source, ipld, options), reduce(file, ipld, options), options.builderOptions)) + const roots = await all(dagBuilder(buildFileBatch(file, source, ipld, options), reduce(file, ipld, options), options)) if (roots.length > 1) { throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS') diff --git a/src/dag-builder/file/trickle.js b/src/dag-builder/file/trickle.js index a970517..9941d87 100644 --- a/src/dag-builder/file/trickle.js +++ b/src/dag-builder/file/trickle.js @@ -1,6 +1,6 @@ 'use strict' -const batch = require('async-iterator-batch') +const batch = require('it-flat-batch') module.exports = function * trickleReduceToRoot (source, reduce, options) { yield trickleStream(source, reduce, options) diff --git a/src/dag-builder/index.js b/src/dag-builder/index.js index 013d498..bc2f4b2 100644 --- a/src/dag-builder/index.js +++ b/src/dag-builder/index.js @@ -30,13 +30,13 @@ async function * dagBuilder (source, ipld, options) { } } - const chunker = createChunker(options.chunker, validateChunks(source), options.chunkerOptions) + const chunker = createChunker(options.chunker, validateChunks(source), options) // item is a file - yield fileBuilder(entry, chunker, ipld, options) + yield () => fileBuilder(entry, chunker, ipld, options) } else { // item is a directory - yield dirBuilder(entry, ipld, options) + yield () => dirBuilder(entry, ipld, options) } } } diff --git a/src/dir-sharded.js b/src/dir-sharded.js index e515b8a..97dcf36 100644 --- a/src/dir-sharded.js +++ b/src/dir-sharded.js @@ -9,7 +9,7 @@ const multihashing = require('multihashing-async') const Dir = require('./dir') const persist = require('./utils/persist') const Bucket = require('hamt-sharding') -const extend = require('deep-extend') +const mergeOptions = require('merge-options').bind({ ignoreUndefined: true }) const hashFn = async function (value) { const hash = await multihashing(Buffer.from(value, 'utf8'), 'murmur3-128') @@ -36,7 +36,7 @@ const defaultOptions = { class DirSharded extends Dir { constructor (props, options) { - options = extend({}, defaultOptions, options) + options = mergeOptions(defaultOptions, options) super(props, options) diff --git a/src/index.js b/src/index.js index 4ff621d..fc09074 100644 --- a/src/index.js +++ b/src/index.js @@ -1,78 +1,41 @@ 'use strict' -const { superstruct } = require('superstruct') const dagBuilder = require('./dag-builder') const treeBuilder = require('./tree-builder') -const mh = require('multihashes') +const parallelBatch = require('it-parallel-batch') +const mergeOptions = require('merge-options').bind({ ignoreUndefined: true }) -const struct = superstruct({ - types: { - codec: v => ['dag-pb', 'dag-cbor', 'raw'].includes(v), - hashAlg: v => Object.keys(mh.names).includes(v), - leafType: v => ['file', 'raw'].includes(v) - } -}) - -const ChunkerOptions = struct({ - minChunkSize: 'number?', - maxChunkSize: 'number?', - avgChunkSize: 'number?', - window: 'number?', - polynomial: 'number?' -}, { - maxChunkSize: 262144, - avgChunkSize: 262144, - window: 16, - polynomial: 17437180132763653 // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11 -}) - -const BuilderOptions = struct({ - maxChildrenPerNode: 'number?', - layerRepeat: 'number?' -}, { - maxChildrenPerNode: 174, - layerRepeat: 4 -}) - -const Options = struct({ - chunker: struct.enum(['fixed', 'rabin']), - rawLeaves: 'boolean?', - hashOnly: 'boolean?', - strategy: struct.enum(['balanced', 'flat', 'trickle']), - reduceSingleLeafToSelf: 'boolean?', - codec: 'codec?', - format: 'codec?', - hashAlg: 'hashAlg?', - leafType: 'leafType?', - cidVersion: 'number?', - progress: 'function?', - wrapWithDirectory: 'boolean?', - shardSplitThreshold: 'number?', - onlyHash: 'boolean?', - chunkerOptions: ChunkerOptions, - builderOptions: BuilderOptions, - - wrap: 'boolean?', - pin: 'boolean?', - recursive: 'boolean?', - ignore: 'array?', - hidden: 'boolean?', - preload: 'boolean?' -}, { +const defaultOptions = { chunker: 'fixed', - strategy: 'balanced', + strategy: 'balanced', // 'flat', 'trickle' rawLeaves: false, + onlyHash: false, reduceSingleLeafToSelf: true, codec: 'dag-pb', hashAlg: 'sha2-256', - leafType: 'file', + leafType: 'file', // 'raw' cidVersion: 0, progress: () => () => {}, - shardSplitThreshold: 1000 -}) + shardSplitThreshold: 1000, + fileImportConcurrency: 50, + blockWriteConcurrency: 10, + minChunkSize: 262144, + maxChunkSize: 262144, + avgChunkSize: 262144, + window: 16, + polynomial: 17437180132763653, // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11 + maxChildrenPerNode: 174, + layerRepeat: 4, + wrapWithDirectory: false, + pin: true, + recursive: false, + ignore: null, // [] + hidden: false, + preload: true +} module.exports = async function * (source, ipld, options = {}) { - const opts = Options(options) + const opts = mergeOptions(defaultOptions, options) if (options.cidVersion > 0 && options.rawLeaves === undefined) { // if the cid version is 1 or above, use raw leaves as this is @@ -93,10 +56,10 @@ module.exports = async function * (source, ipld, options = {}) { } if (options.format) { - options.codec = options.format + opts.codec = options.format } - for await (const entry of treeBuilder(dagBuilder(source, ipld, opts), ipld, opts)) { + for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) { yield { cid: entry.cid, path: entry.path, diff --git a/src/tree-builder.js b/src/tree-builder.js index 55bab49..cd43170 100644 --- a/src/tree-builder.js +++ b/src/tree-builder.js @@ -5,7 +5,7 @@ const flatToShard = require('./flat-to-shard') const Dir = require('./dir') const toPathComponents = require('./utils/to-path-components') const errCode = require('err-code') -const first = require('async-iterator-first') +const first = require('it-first') async function addToTree (elem, tree, options) { const pathElems = toPathComponents(elem.path || '') @@ -61,6 +61,9 @@ async function * treeBuilder (source, ipld, options) { }, options) for await (const entry of source) { + if (!entry) { + continue + } tree = await addToTree(entry, tree, options) yield entry diff --git a/test/benchmark.spec.js b/test/benchmark.spec.js index b0b1db2..fae3f48 100644 --- a/test/benchmark.spec.js +++ b/test/benchmark.spec.js @@ -5,8 +5,8 @@ const importer = require('../src') const IPLD = require('ipld') const inMemory = require('ipld-in-memory') -const bufferStream = require('async-iterator-buffer-stream') -const all = require('async-iterator-all') +const bufferStream = require('it-buffer-stream') +const all = require('it-all') const REPEATS = 10 const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB diff --git a/test/builder-balanced.spec.js b/test/builder-balanced.spec.js index f064dd8..17242a3 100644 --- a/test/builder-balanced.spec.js +++ b/test/builder-balanced.spec.js @@ -5,7 +5,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const builder = require('../src/dag-builder/file/balanced') -const all = require('async-iterator-all') +const all = require('it-all') function reduce (leaves) { if (leaves.length > 1) { diff --git a/test/builder-dir-sharding.spec.js b/test/builder-dir-sharding.spec.js index 057eb2e..b52b07b 100644 --- a/test/builder-dir-sharding.spec.js +++ b/test/builder-dir-sharding.spec.js @@ -9,8 +9,8 @@ chai.use(require('dirty-chai')) const expect = chai.expect const IPLD = require('ipld') const inMemory = require('ipld-in-memory') -const all = require('async-iterator-all') -const last = require('async-iterator-last') +const all = require('it-all') +const last = require('it-last') describe('builder: directory sharding', () => { let ipld diff --git a/test/builder-flat.spec.js b/test/builder-flat.spec.js index 1e3acc8..e3f0339 100644 --- a/test/builder-flat.spec.js +++ b/test/builder-flat.spec.js @@ -5,7 +5,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const builder = require('../src/dag-builder/file/flat') -const all = require('async-iterator-all') +const all = require('it-all') function reduce (leaves) { if (leaves.length > 1) { diff --git a/test/builder-only-hash.spec.js b/test/builder-only-hash.spec.js index 45ac5dd..e7e7642 100644 --- a/test/builder-only-hash.spec.js +++ b/test/builder-only-hash.spec.js @@ -7,7 +7,7 @@ const expect = chai.expect const IPLD = require('ipld') const inMemory = require('ipld-in-memory') const builder = require('../src/dag-builder') -const all = require('async-iterator-all') +const all = require('it-all') describe('builder: onlyHash', () => { let ipld @@ -30,18 +30,14 @@ describe('builder: onlyHash', () => { format: 'dag-pb', hashAlg: 'sha2-256', wrap: true, - chunkerOptions: { - maxChunkSize: 1024 - }, - builderOptions: { - maxChildrenPerNode: 254 - } + maxChunkSize: 1024, + maxChildrenPerNode: 254 })) expect(nodes.length).to.equal(1) try { - await ipld.get(nodes[0].cid) + await ipld.get((await nodes[0]()).cid) throw new Error('Should have errored') } catch (err) { diff --git a/test/builder-trickle-dag.spec.js b/test/builder-trickle-dag.spec.js index 8e625b4..ba6c239 100644 --- a/test/builder-trickle-dag.spec.js +++ b/test/builder-trickle-dag.spec.js @@ -5,7 +5,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const builder = require('../src/dag-builder/file/trickle') -const all = require('async-iterator-all') +const all = require('it-all') const createValues = (max) => { const output = [] diff --git a/test/builder.spec.js b/test/builder.spec.js index 1eea492..bbeb9d0 100644 --- a/test/builder.spec.js +++ b/test/builder.spec.js @@ -9,7 +9,7 @@ const IPLD = require('ipld') const inMemory = require('ipld-in-memory') const UnixFS = require('ipfs-unixfs') const builder = require('../src/dag-builder') -const first = require('async-iterator-first') +const first = require('it-first') describe('builder', () => { let ipld @@ -27,9 +27,7 @@ describe('builder', () => { format: 'dag-pb', hashAlg: 'sha2-256', progress: () => {}, - chunkerOptions: { - maxChunkSize: 262144 - } + maxChunkSize: 262144 } it('allows multihash hash algorithm to be specified', async () => { @@ -45,7 +43,7 @@ describe('builder', () => { content: Buffer.from(content) } - const imported = await first(builder([inputFile], ipld, options)) + const imported = await (await first(builder([inputFile], ipld, options)))() expect(imported).to.exist() @@ -76,7 +74,7 @@ describe('builder', () => { content: Buffer.alloc(262144 + 5).fill(1) } - const imported = await first(builder([Object.assign({}, inputFile)], ipld, options)) + const imported = await (await first(builder([inputFile], ipld, options)))() expect(imported).to.exist() expect(mh.decode(imported.cid.multihash).name).to.equal(hashAlg) @@ -96,7 +94,7 @@ describe('builder', () => { content: null } - const imported = await first(builder([Object.assign({}, inputFile)], ipld, options)) + const imported = await (await first(builder([Object.assign({}, inputFile)], ipld, options)))() expect(mh.decode(imported.cid.multihash).name).to.equal(hashAlg) diff --git a/test/chunker-fixed-size.spec.js b/test/chunker-fixed-size.spec.js index e61653e..276702a 100644 --- a/test/chunker-fixed-size.spec.js +++ b/test/chunker-fixed-size.spec.js @@ -6,7 +6,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const isNode = require('detect-node') -const all = require('async-iterator-all') +const all = require('it-all') const loadFixture = require('aegir/fixtures') const rawFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/1MiB.txt') diff --git a/test/chunker-rabin.spec.js b/test/chunker-rabin.spec.js index 6caccc8..9f9a4af 100644 --- a/test/chunker-rabin.spec.js +++ b/test/chunker-rabin.spec.js @@ -7,7 +7,7 @@ chai.use(require('dirty-chai')) const expect = chai.expect const loadFixture = require('aegir/fixtures') const isNode = require('detect-node') -const all = require('async-iterator-all') +const all = require('it-all') const rawFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/1MiB.txt') diff --git a/test/hash-parity-with-go-ipfs.spec.js b/test/hash-parity-with-go-ipfs.spec.js index ce936be..94e44fd 100644 --- a/test/hash-parity-with-go-ipfs.spec.js +++ b/test/hash-parity-with-go-ipfs.spec.js @@ -9,7 +9,7 @@ const expect = chai.expect const IPLD = require('ipld') const inMemory = require('ipld-in-memory') const randomByteStream = require('./helpers/finite-pseudorandom-byte-stream') -const first = require('async-iterator-first') +const first = require('it-first') const strategies = [ 'flat', diff --git a/test/import-export-nested-dir.spec.js b/test/import-export-nested-dir.spec.js index 8d58080..ae60712 100644 --- a/test/import-export-nested-dir.spec.js +++ b/test/import-export-nested-dir.spec.js @@ -6,7 +6,7 @@ chai.use(require('dirty-chai')) const expect = chai.expect const IPLD = require('ipld') const inMemory = require('ipld-in-memory') -const all = require('async-iterator-all') +const all = require('it-all') const importer = require('../src') const exporter = require('ipfs-unixfs-exporter') diff --git a/test/importer.spec.js b/test/importer.spec.js index 6fab88e..321db3a 100644 --- a/test/importer.spec.js +++ b/test/importer.spec.js @@ -17,8 +17,8 @@ const loadFixture = require('aegir/fixtures') const isNode = require('detect-node') const bigFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/1.2MiB.txt') const smallFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/200Bytes.txt') -const all = require('async-iterator-all') -const first = require('async-iterator-first') +const all = require('it-all') +const first = require('it-first') function stringifyMh (files) { return files.map((file) => { @@ -563,9 +563,7 @@ strategies.forEach((strategy) => { const options = { progress: spy(), - chunkerOptions: { - maxChunkSize - } + maxChunkSize } await all(importer([{