From b7f29706be1dcc37f7ad1b5c27b702a2e3f29dc6 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 8 Oct 2019 16:59:34 +0100 Subject: [PATCH 1/3] feat: add support for batch puts and benchmarks --- package.json | 13 +- src/dag-builder/file/index.js | 93 +++++++- src/index.js | 4 + test/benchmark.spec.js | 2 +- test/test-speed.browser.js | 414 ++++++++++++++++++++++++++++++++++ test/test-speed.js | 412 +++++++++++++++++++++++++++++++++ 6 files changed, 930 insertions(+), 8 deletions(-) create mode 100644 test/test-speed.browser.js create mode 100644 test/test-speed.js diff --git a/package.json b/package.json index 43c5a4d..9c36517 100644 --- a/package.json +++ b/package.json @@ -40,16 +40,22 @@ "aegir": "^20.0.0", "async-iterator-buffer-stream": "^1.0.0", "async-iterator-last": "^1.0.0", + "benchmark": "^2.1.4", "chai": "^4.2.0", - "cids": "~0.7.1", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", + "idb": "^4.0.4", + "ipfs-block-service": "^0.16.0", + "ipfs-repo": "^0.28.0", "ipfs-unixfs-exporter": "~0.37.0", "ipld": "^0.25.0", "ipld-in-memory": "^3.0.0", + "iso-random-stream": "^1.1.1", + "lodash": "^4.17.15", "multihashes": "~0.4.14", "nyc": "^14.0.0", - "sinon": "^7.1.0" + "sinon": "^7.1.0", + "tempy": "^0.3.0" }, "dependencies": { "async-iterator-all": "^1.0.0", @@ -59,7 +65,10 @@ "deep-extend": "~0.6.0", "err-code": "^2.0.0", "hamt-sharding": "~0.0.2", + "interval-promise": "^1.3.0", + "idb-keyval": "^3.2.0", "ipfs-unixfs": "~0.1.16", + "ipfs-block": "^0.8.1", "ipld-dag-pb": "^0.18.0", "multicodec": "~0.5.1", "multihashing-async": "~0.7.0", diff --git a/src/dag-builder/file/index.js b/src/dag-builder/file/index.js index a3fb565..6a2ad53 100644 --- a/src/dag-builder/file/index.js +++ b/src/dag-builder/file/index.js @@ -1,7 +1,11 @@ 'use strict' +const Block = require('ipfs-block') const errCode = require('err-code') const UnixFS = require('ipfs-unixfs') +const mh = require('multihashes') +const mc = require('multicodec') +const interval = require('interval-promise') const persist = require('../../utils/persist') const { DAGNode, @@ -25,13 +29,10 @@ async function * buildFile (source, ipld, options) { let node let unixfs - const opts = { - ...options - } + const opts = { ...options } if (options.rawLeaves) { node = buffer - opts.codec = 'raw' opts.cidVersion = 1 } else { @@ -64,6 +65,84 @@ async function * buildFile (source, ipld, options) { } } +const serialize = (node, ipld, options) => { + if ((!options.codec && node.length) || options.rawLeaves) { + options.cidVersion = 1 + options.codec = 'raw' + } + + if (isNaN(options.hashAlg)) { + options.hashAlg = mh.names[options.hashAlg] + } + + if (options.hashAlg !== mh.names['sha2-256']) { + options.cidVersion = 1 + } + + if (options.format) { + options.codec = options.format + } + + const format = mc[options.codec.toUpperCase().replace(/-/g, '_')] + + return ipld.serialize(node, format, options) +} + +async function * buildFileBatch (source, ipld, options) { + let count = -1 + let previous + let nodesToPersist = [] + + const save = interval(async (iteration, stop) => { + if (nodesToPersist.length) { + const temp = nodesToPersist + nodesToPersist = [] + await ipld.putBatch(temp, options) + } else { + stop() + } + }, options.batchInterval) + + for await (const buffer of source) { + count++ + options.progress(buffer.length) + let node + let unixfs + + if (options.rawLeaves) { + node = buffer + } else { + unixfs = new UnixFS(options.leafType, buffer) + node = new DAGNode(unixfs.marshal()) + } + + const result = await serialize(node, ipld, 
options) + nodesToPersist.push(new Block(result[1], result[0])) + const entry = { + cid: result[0], + unixfs, + node + } + if (count === 0) { + previous = entry + continue + } else if (count === 1) { + yield previous + previous = null + } + + yield entry + } + + // Wait for everything to be saved + await save + + if (previous) { + previous.single = true + yield previous + } +} + const reduce = (file, ipld, options) => { return async function (leaves) { if (leaves.length === 1 && leaves[0].single && options.reduceSingleLeafToSelf) { @@ -132,7 +211,11 @@ const fileBuilder = async (file, source, ipld, options) => { throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY') } - const roots = await all(dagBuilder(buildFile(source, ipld, options), reduce(file, ipld, options), options.builderOptions)) + const roots = await all(dagBuilder( + options.batch ? buildFileBatch(source, ipld, options) : buildFile(source, ipld, options), + reduce(file, ipld, options), + options.builderOptions + )) if (roots.length > 1) { throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS') diff --git a/src/index.js b/src/index.js index 4ff621d..75a01c6 100644 --- a/src/index.js +++ b/src/index.js @@ -38,6 +38,8 @@ const Options = struct({ chunker: struct.enum(['fixed', 'rabin']), rawLeaves: 'boolean?', hashOnly: 'boolean?', + batch: 'boolean?', + batchInterval: 'number?', strategy: struct.enum(['balanced', 'flat', 'trickle']), reduceSingleLeafToSelf: 'boolean?', codec: 'codec?', @@ -62,6 +64,8 @@ const Options = struct({ chunker: 'fixed', strategy: 'balanced', rawLeaves: false, + batch: true, + batchInterval: 50, reduceSingleLeafToSelf: true, codec: 'dag-pb', hashAlg: 'sha2-256', diff --git a/test/benchmark.spec.js b/test/benchmark.spec.js index 1c96013..b0b1db2 100644 --- a/test/benchmark.spec.js +++ b/test/benchmark.spec.js @@ -24,7 +24,7 @@ describe.skip('benchmark', function () { const times = [] after(() => { - console.info(`Percent\tms`) // eslint-disable-line no-console + console.info('Percent\tms') // eslint-disable-line no-console times.forEach((time, index) => { console.info(`${index}\t${parseInt(time / REPEATS)}`) // eslint-disable-line no-console }) diff --git a/test/test-speed.browser.js b/test/test-speed.browser.js new file mode 100644 index 0000000..5763359 --- /dev/null +++ b/test/test-speed.browser.js @@ -0,0 +1,414 @@ +/* eslint-env mocha */ +'use strict' + +const importer = require('../src') +const IPLD = require('ipld') +const bufferStream = require('async-iterator-buffer-stream') +const all = require('async-iterator-all') +const randomBuffer = require('iso-random-stream/src/random') +const IPFSRepo = require('ipfs-repo') +const BlockService = require('ipfs-block-service') +const { openDB } = require('idb') + +// Run benchmarkjs in the browser https://github.com/bestiejs/benchmark.js/issues/128#issuecomment-271615298 +const _ = require('lodash') +const process = require('process') +const benchmark = require('benchmark') +const Benchmark = benchmark.runInContext({ _, process }) +window.Benchmark = Benchmark + +const createIPLD = async (opts) => { + const repo = new IPFSRepo(`repo-${Math.random()}`, opts) + const blockService = new BlockService(repo) + await repo.init({}) + await repo.open() + return new IPLD({ blockService }) +} + +const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB +const CHUNK_SIZE = 65536 + +describe('benchmark', function () { + this.timeout(0) + it('single run 500mb without batch', async () => 
{ // eslint-disable-line no-loop-func + const ipld = await createIPLD() + const times = [] + let read = 0 + let lastDate = Date.now() + let lastPercent = 0 + + const options = { + progress: (prog) => { + read += prog + const percent = parseInt((read / FILE_SIZE) * 100) + if (percent > lastPercent) { + times[percent] = (times[percent] || 0) + (Date.now() - lastDate) + lastDate = Date.now() + lastPercent = percent + } + } + } + + await all(importer([{ + path: 'single-500.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + console.info('Percent\tms') // eslint-disable-line no-console + // times.forEach((time, index) => { + // console.info(`${index}\t${parseInt(time)}`) // eslint-disable-line no-console + // }) + }) + it('single run 500mb with batch 50ms', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + + const options = { + batchInterval: 50, + batch: true + } + + await all(importer([{ + path: 'single-500-batch-50.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + }) + it('single run 500mb with batch 100ms', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + + const options = { + batchInterval: 100, + batch: true + } + + await all(importer([{ + path: 'single-500-batch.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + }) + it('single run 500mb with batch 150ms', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + + const options = { + batchInterval: 150, + batch: true + } + + await all(importer([{ + path: 'single-500-batch-150.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + }) + it('single run 500mb with batch 200ms', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + + const options = { + batchInterval: 200, + batch: true + } + + await all(importer([{ + path: 'single-500-batch-200.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + }) + + const sizes = [10, 50, 100] + + for (const size of sizes) { + it.only(`benchmark ${size}mb`, (done) => { + const suite = new Benchmark.Suite() + const FILE_SIZE = Math.pow(2, 20) * size + + suite + .add('without batch', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: '200Bytes.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: false } + )) + + deferred.resolve() + } + }) + .add('batch 50ms ', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: 'batch100.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: true, batchInterval: 50 } + )) + + deferred.resolve() + } + }) + .add('batch 100ms ', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: 'batch100.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => 
randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: true, batchInterval: 100 } + )) + + deferred.resolve() + } + }) + .add('batch 150ms ', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: 'batch100.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: true, batchInterval: 150 } + )) + + deferred.resolve() + } + }) + .add('batch 200mb ', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: 'batch100.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: true, batchInterval: 200 } + )) + + deferred.resolve() + } + }) + .on('cycle', function (event) { + console.log(String(event.target)) + }) + .on('complete', function () { + console.log('Fastest is ' + this.filter('fastest').map('name')) + done() + }) + .run({ async: true }) + }) + } +}) + +const { Store, set, get, del } = require('idb-keyval') + +function isStrictTypedArray (arr) { + return ( + arr instanceof Int8Array || + arr instanceof Int16Array || + arr instanceof Int32Array || + arr instanceof Uint8Array || + arr instanceof Uint8ClampedArray || + arr instanceof Uint16Array || + arr instanceof Uint32Array || + arr instanceof Float32Array || + arr instanceof Float64Array + ) +} + +function typedarrayToBuffer (arr) { + if (isStrictTypedArray(arr)) { + // To avoid a copy, use the typed array's underlying ArrayBuffer to back new Buffer + var buf = Buffer.from(arr.buffer) + if (arr.byteLength !== arr.buffer.byteLength) { + // Respect the "view", i.e. byteOffset and byteLength, without doing a copy + buf = buf.slice(arr.byteOffset, arr.byteOffset + arr.byteLength) + } + return buf + } else { + // Pass through all other types to `Buffer.from` + return Buffer.from(arr) + } +} + +class IdbDatastore { + constructor (location) { + this.store = new Store(location, location) + } + + open () { + } + + put (key, val) { + return set(key.toBuffer(), val, this.store) + } + + async get (key, callback) { + const value = await get(key.toBuffer(), this.store) + + if (!value) { + return callback(new Error('No value')) + } + + return typedarrayToBuffer(value) + } + + async has (key) { + const v = await get(key.toBuffer(), this.store) + if (v) { + return true + } + return false + } + + delete (key) { + return del(key.toBuffer(), this.store) + } + + batch () { + const puts = [] + const dels = [] + + return { + put (key, value) { + puts.push([key.toBuffer(), value]) + }, + delete (key) { + dels.push(key.toBuffer()) + }, + commit: () => { + return Promise.all(puts.map(p => this.put(p[0], p[1]))) + } + } + } + + query (q) { + return null + } + + close () { + + } +} + +class IdbDatastoreBatch { + constructor (location) { + this.location = location + } + + open () { + const location = this.location + return openDB(this.location, 1, { + upgrade (db) { + db.createObjectStore(location) + } + }) + } + + async put (key, val) { + const db = await this.open() + return db.put(this.location, val, key.toBuffer()) + } + + async get (key) { + const db = await this.open() + const value = await db.get(this.location, key.toBuffer()) + + if (!value) { + throw new Error('No value') + } + + return typedarrayToBuffer(value) + } + + async has (key) { + const db = await this.open() + return Boolean(await db.get(this.location, key.toBuffer())) + } + + async delete (key) { + const db = await 
this.open() + return db.del(this.location, key.toBuffer()) + } + + batch () { + const puts = [] + const dels = [] + + return { + put (key, value) { + puts.push([key.toBuffer(), value]) + }, + delete (key) { + dels.push(key.toBuffer()) + }, + commit: async () => { + const db = await this.open() + const tx = db.transaction(this.location, 'readwrite') + const store = tx.store + await Promise.all(puts.map(p => store.put(p[1], p[0]))) + await Promise.all(dels.map(p => store.del(p))) + await tx.done + } + } + } + + query (q) { + return null + } + + close () { + this.store.close() + } +} diff --git a/test/test-speed.js b/test/test-speed.js new file mode 100644 index 0000000..5a147e8 --- /dev/null +++ b/test/test-speed.js @@ -0,0 +1,412 @@ +/* eslint-env mocha */ +'use strict' + +const importer = require('../src') +const IPLD = require('ipld') +const bufferStream = require('async-iterator-buffer-stream') +const all = require('async-iterator-all') +const randomBuffer = require('iso-random-stream/src/random') +const IPFSRepo = require('ipfs-repo') +const BlockService = require('ipfs-block-service') +const { openDB } = require('idb') +const tempy = require('tempy') +const Benchmark = require('benchmark') + +const createIPLD = async (opts) => { + const repo = new IPFSRepo(tempy.directory(), opts) + const blockService = new BlockService(repo) + await repo.init({}) + await repo.open() + return new IPLD({ blockService }) +} + +const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB +const CHUNK_SIZE = 65536 + +describe('benchmark', function () { + this.timeout(0) + it.only('single run 500mb without batch', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + const times = [] + let read = 0 + let lastDate = Date.now() + let lastPercent = 0 + + const options = { + progress: (prog) => { + read += prog + const percent = parseInt((read / FILE_SIZE) * 100) + if (percent > lastPercent) { + times[percent] = (times[percent] || 0) + (Date.now() - lastDate) + lastDate = Date.now() + lastPercent = percent + } + } + } + + await all(importer([{ + path: 'single-500.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + // console.info('Percent\tms') // eslint-disable-line no-console + // times.forEach((time, index) => { + // console.info(`${index}\t${parseInt(time)}`) // eslint-disable-line no-console + // }) + }) + + it.only('single run 500mb with batch 50ms', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + + const options = { + batchInterval: 50, + batch: true + } + + await all(importer([{ + path: 'single-500-batch-50.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + }) + + it.only('single run 500mb with batch 100ms', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + + const options = { + batchInterval: 100, + batch: true + } + + await all(importer([{ + path: 'single-500-batch.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + }) + + it('single run 500mb with batch 150ms', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + + const options = { + batchInterval: 150, + batch: true + } + + await all(importer([{ + path: 'single-500-batch-150.txt', + content: bufferStream(FILE_SIZE, { 
+ chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + }) + it('single run 500mb with batch 200ms', async () => { // eslint-disable-line no-loop-func + const ipld = await createIPLD() + + const options = { + batchInterval: 200, + batch: true + } + + await all(importer([{ + path: 'single-500-batch-200.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => { + return randomBuffer(CHUNK_SIZE) + } + }) + }], ipld, options)) + }) + + const sizes = [10, 50, 100, 200] + + for (const size of sizes) { + it(`benchmark ${size}mb`, (done) => { + const suite = new Benchmark.Suite() + const FILE_SIZE = Math.pow(2, 20) * size + + suite + .add('without batch', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: '200Bytes.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: false } + )) + + deferred.resolve() + } + }) + .add('batch 50ms ', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: 'batch100.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: true, batchInterval: 50 } + )) + + deferred.resolve() + } + }) + .add('batch 100ms ', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: 'batch100.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: true, batchInterval: 100 } + )) + + deferred.resolve() + } + }) + .add('batch 150ms ', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: 'batch100.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: true, batchInterval: 150 } + )) + + deferred.resolve() + } + }) + .add('batch 200mb ', { + defer: true, + fn: async (deferred) => { + const ipld = await createIPLD() + await all(importer( + [{ + path: 'batch100.txt', + content: bufferStream(FILE_SIZE, { + chunkSize: CHUNK_SIZE, + generator: () => randomBuffer(CHUNK_SIZE) + }) + }], + ipld, + { batch: true, batchInterval: 200 } + )) + + deferred.resolve() + } + }) + .on('cycle', function (event) { + console.log(String(event.target)) + }) + .on('complete', function () { + console.log('Fastest is ' + this.filter('fastest').map('name')) + done() + }) + .run({ async: true }) + }) + } +}) + +const { Store, set, get, del } = require('idb-keyval') + +function isStrictTypedArray (arr) { + return ( + arr instanceof Int8Array || + arr instanceof Int16Array || + arr instanceof Int32Array || + arr instanceof Uint8Array || + arr instanceof Uint8ClampedArray || + arr instanceof Uint16Array || + arr instanceof Uint32Array || + arr instanceof Float32Array || + arr instanceof Float64Array + ) +} + +function typedarrayToBuffer (arr) { + if (isStrictTypedArray(arr)) { + // To avoid a copy, use the typed array's underlying ArrayBuffer to back new Buffer + var buf = Buffer.from(arr.buffer) + if (arr.byteLength !== arr.buffer.byteLength) { + // Respect the "view", i.e. 
byteOffset and byteLength, without doing a copy + buf = buf.slice(arr.byteOffset, arr.byteOffset + arr.byteLength) + } + return buf + } else { + // Pass through all other types to `Buffer.from` + return Buffer.from(arr) + } +} + +class IdbDatastore { + constructor (location) { + this.store = new Store(location, location) + } + + open () { + } + + put (key, val) { + return set(key.toBuffer(), val, this.store) + } + + async get (key, callback) { + const value = await get(key.toBuffer(), this.store) + + if (!value) { + return callback(new Error('No value')) + } + + return typedarrayToBuffer(value) + } + + async has (key) { + const v = await get(key.toBuffer(), this.store) + if (v) { + return true + } + return false + } + + delete (key) { + return del(key.toBuffer(), this.store) + } + + batch () { + const puts = [] + const dels = [] + + return { + put (key, value) { + puts.push([key.toBuffer(), value]) + }, + delete (key) { + dels.push(key.toBuffer()) + }, + commit: () => { + return Promise.all(puts.map(p => this.put(p[0], p[1]))) + } + } + } + + query (q) { + return null + } + + close () { + + } +} + +class IdbDatastoreBatch { + constructor (location) { + this.location = location + } + + open () { + const location = this.location + return openDB(this.location, 1, { + upgrade (db) { + db.createObjectStore(location) + } + }) + } + + async put (key, val) { + const db = await this.open() + return db.put(this.location, val, key.toBuffer()) + } + + async get (key) { + const db = await this.open() + const value = await db.get(this.location, key.toBuffer()) + + if (!value) { + throw new Error('No value') + } + + return typedarrayToBuffer(value) + } + + async has (key) { + const db = await this.open() + return Boolean(await db.get(this.location, key.toBuffer())) + } + + async delete (key) { + const db = await this.open() + return db.del(this.location, key.toBuffer()) + } + + batch () { + const puts = [] + const dels = [] + + return { + put (key, value) { + puts.push([key.toBuffer(), value]) + }, + delete (key) { + dels.push(key.toBuffer()) + }, + commit: async () => { + const db = await this.open() + const tx = db.transaction(this.location, 'readwrite') + const store = tx.store + await Promise.all(puts.map(p => store.put(p[1], p[0]))) + await Promise.all(dels.map(p => store.del(p))) + await tx.done + } + } + } + + query (q) { + return null + } + + close () { + this.store.close() + } +} From 08cdfd331dc60288ddb8b0541c4c3e7db1baf0e2 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Wed, 9 Oct 2019 10:23:06 +0100 Subject: [PATCH 2/3] chore: improve benchmarks --- package.json | 4 ++-- test/test-speed.browser.js | 5 ++++- test/test-speed.js | 9 ++++++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index 9c36517..34761b6 100644 --- a/package.json +++ b/package.json @@ -65,10 +65,10 @@ "deep-extend": "~0.6.0", "err-code": "^2.0.0", "hamt-sharding": "~0.0.2", - "interval-promise": "^1.3.0", "idb-keyval": "^3.2.0", - "ipfs-unixfs": "~0.1.16", + "interval-promise": "^1.3.0", "ipfs-block": "^0.8.1", + "ipfs-unixfs": "~0.1.16", "ipld-dag-pb": "^0.18.0", "multicodec": "~0.5.1", "multihashing-async": "~0.7.0", diff --git a/test/test-speed.browser.js b/test/test-speed.browser.js index 5763359..ec9ffb3 100644 --- a/test/test-speed.browser.js +++ b/test/test-speed.browser.js @@ -1,3 +1,4 @@ +/* eslint-disable no-console */ /* eslint-env mocha */ 'use strict' @@ -11,6 +12,7 @@ const BlockService = require('ipfs-block-service') const { openDB } = require('idb') // Run 
benchmarkjs in the browser https://github.com/bestiejs/benchmark.js/issues/128#issuecomment-271615298 +// We need to change Karma webpack config according to the above link const _ = require('lodash') const process = require('process') const benchmark = require('benchmark') @@ -38,6 +40,7 @@ describe('benchmark', function () { let lastPercent = 0 const options = { + batch: false, progress: (prog) => { read += prog const percent = parseInt((read / FILE_SIZE) * 100) @@ -139,7 +142,7 @@ describe('benchmark', function () { const sizes = [10, 50, 100] for (const size of sizes) { - it.only(`benchmark ${size}mb`, (done) => { + it(`benchmark ${size}mb`, (done) => { const suite = new Benchmark.Suite() const FILE_SIZE = Math.pow(2, 20) * size diff --git a/test/test-speed.js b/test/test-speed.js index 5a147e8..8aa890d 100644 --- a/test/test-speed.js +++ b/test/test-speed.js @@ -1,3 +1,4 @@ +/* eslint-disable no-console */ /* eslint-env mocha */ 'use strict' @@ -25,7 +26,7 @@ const CHUNK_SIZE = 65536 describe('benchmark', function () { this.timeout(0) - it.only('single run 500mb without batch', async () => { // eslint-disable-line no-loop-func + it('single run 500mb without batch', async () => { // eslint-disable-line no-loop-func const ipld = await createIPLD() const times = [] let read = 0 @@ -33,6 +34,8 @@ describe('benchmark', function () { let lastPercent = 0 const options = { + batchInterval: 50, + batch: false, progress: (prog) => { read += prog const percent = parseInt((read / FILE_SIZE) * 100) @@ -59,7 +62,7 @@ describe('benchmark', function () { // }) }) - it.only('single run 500mb with batch 50ms', async () => { // eslint-disable-line no-loop-func + it('single run 500mb with batch 50ms', async () => { // eslint-disable-line no-loop-func const ipld = await createIPLD() const options = { @@ -78,7 +81,7 @@ describe('benchmark', function () { }], ipld, options)) }) - it.only('single run 500mb with batch 100ms', async () => { // eslint-disable-line no-loop-func + it('single run 500mb with batch 100ms', async () => { // eslint-disable-line no-loop-func const ipld = await createIPLD() const options = { From a0b5bd59cc56533ac5be4d1946295c7c6e3cecc7 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Thu, 10 Oct 2019 14:20:03 +0100 Subject: [PATCH 3/3] fix: fix the timer for batch --- package.json | 6 +- src/dag-builder/file/index.js | 28 ++-- test/{benchmark.spec.js => benchmark.js} | 0 test/test-speed.browser.js | 163 ----------------------- test/test-speed.js | 163 ----------------------- 5 files changed, 16 insertions(+), 344 deletions(-) rename test/{benchmark.spec.js => benchmark.js} (100%) diff --git a/package.json b/package.json index 34761b6..2a22065 100644 --- a/package.json +++ b/package.json @@ -44,11 +44,10 @@ "chai": "^4.2.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", - "idb": "^4.0.4", "ipfs-block-service": "^0.16.0", "ipfs-repo": "^0.28.0", "ipfs-unixfs-exporter": "~0.37.0", - "ipld": "^0.25.0", + "ipld": "ipld/js-ipld#feat/batch-put", "ipld-in-memory": "^3.0.0", "iso-random-stream": "^1.1.1", "lodash": "^4.17.15", @@ -65,14 +64,13 @@ "deep-extend": "~0.6.0", "err-code": "^2.0.0", "hamt-sharding": "~0.0.2", - "idb-keyval": "^3.2.0", - "interval-promise": "^1.3.0", "ipfs-block": "^0.8.1", "ipfs-unixfs": "~0.1.16", "ipld-dag-pb": "^0.18.0", "multicodec": "~0.5.1", "multihashing-async": "~0.7.0", "rabin-wasm": "~0.0.8", + "set-interval-async": "^1.0.29", "superstruct": "~0.6.1" }, "contributors": [ diff --git a/src/dag-builder/file/index.js b/src/dag-builder/file/index.js index 
6a2ad53..3f1ac79 100644 --- a/src/dag-builder/file/index.js +++ b/src/dag-builder/file/index.js @@ -5,7 +5,8 @@ const errCode = require('err-code') const UnixFS = require('ipfs-unixfs') const mh = require('multihashes') const mc = require('multicodec') -const interval = require('interval-promise') +const { setIntervalAsync } = require('set-interval-async/dynamic') +const { clearIntervalAsync } = require('set-interval-async') const persist = require('../../utils/persist') const { DAGNode, @@ -66,7 +67,7 @@ async function * buildFile (source, ipld, options) { } const serialize = (node, ipld, options) => { - if ((!options.codec && node.length) || options.rawLeaves) { + if (!options.codec && node.length) { options.cidVersion = 1 options.codec = 'raw' } @@ -93,14 +94,10 @@ async function * buildFileBatch (source, ipld, options) { let previous let nodesToPersist = [] - const save = interval(async (iteration, stop) => { - if (nodesToPersist.length) { - const temp = nodesToPersist - nodesToPersist = [] - await ipld.putBatch(temp, options) - } else { - stop() - } + const timer = setIntervalAsync(async () => { + const temp = nodesToPersist + nodesToPersist = [] + await ipld.putBatch(temp, options) }, options.batchInterval) for await (const buffer of source) { @@ -108,16 +105,19 @@ async function * buildFileBatch (source, ipld, options) { options.progress(buffer.length) let node let unixfs - + const opts = { ...options } if (options.rawLeaves) { node = buffer + opts.codec = 'raw' + opts.cidVersion = 1 } else { unixfs = new UnixFS(options.leafType, buffer) node = new DAGNode(unixfs.marshal()) } - const result = await serialize(node, ipld, options) + const result = await serialize(node, ipld, opts) nodesToPersist.push(new Block(result[1], result[0])) + const entry = { cid: result[0], unixfs, @@ -133,9 +133,9 @@ async function * buildFileBatch (source, ipld, options) { yield entry } - // Wait for everything to be saved - await save + await clearIntervalAsync(timer) + await ipld.putBatch(nodesToPersist, options) if (previous) { previous.single = true diff --git a/test/benchmark.spec.js b/test/benchmark.js similarity index 100% rename from test/benchmark.spec.js rename to test/benchmark.js diff --git a/test/test-speed.browser.js b/test/test-speed.browser.js index ec9ffb3..8e6b3a1 100644 --- a/test/test-speed.browser.js +++ b/test/test-speed.browser.js @@ -9,7 +9,6 @@ const all = require('async-iterator-all') const randomBuffer = require('iso-random-stream/src/random') const IPFSRepo = require('ipfs-repo') const BlockService = require('ipfs-block-service') -const { openDB } = require('idb') // Run benchmarkjs in the browser https://github.com/bestiejs/benchmark.js/issues/128#issuecomment-271615298 // We need to change Karma webpack config according to the above link @@ -253,165 +252,3 @@ describe('benchmark', function () { }) } }) - -const { Store, set, get, del } = require('idb-keyval') - -function isStrictTypedArray (arr) { - return ( - arr instanceof Int8Array || - arr instanceof Int16Array || - arr instanceof Int32Array || - arr instanceof Uint8Array || - arr instanceof Uint8ClampedArray || - arr instanceof Uint16Array || - arr instanceof Uint32Array || - arr instanceof Float32Array || - arr instanceof Float64Array - ) -} - -function typedarrayToBuffer (arr) { - if (isStrictTypedArray(arr)) { - // To avoid a copy, use the typed array's underlying ArrayBuffer to back new Buffer - var buf = Buffer.from(arr.buffer) - if (arr.byteLength !== arr.buffer.byteLength) { - // Respect the "view", i.e. 
byteOffset and byteLength, without doing a copy - buf = buf.slice(arr.byteOffset, arr.byteOffset + arr.byteLength) - } - return buf - } else { - // Pass through all other types to `Buffer.from` - return Buffer.from(arr) - } -} - -class IdbDatastore { - constructor (location) { - this.store = new Store(location, location) - } - - open () { - } - - put (key, val) { - return set(key.toBuffer(), val, this.store) - } - - async get (key, callback) { - const value = await get(key.toBuffer(), this.store) - - if (!value) { - return callback(new Error('No value')) - } - - return typedarrayToBuffer(value) - } - - async has (key) { - const v = await get(key.toBuffer(), this.store) - if (v) { - return true - } - return false - } - - delete (key) { - return del(key.toBuffer(), this.store) - } - - batch () { - const puts = [] - const dels = [] - - return { - put (key, value) { - puts.push([key.toBuffer(), value]) - }, - delete (key) { - dels.push(key.toBuffer()) - }, - commit: () => { - return Promise.all(puts.map(p => this.put(p[0], p[1]))) - } - } - } - - query (q) { - return null - } - - close () { - - } -} - -class IdbDatastoreBatch { - constructor (location) { - this.location = location - } - - open () { - const location = this.location - return openDB(this.location, 1, { - upgrade (db) { - db.createObjectStore(location) - } - }) - } - - async put (key, val) { - const db = await this.open() - return db.put(this.location, val, key.toBuffer()) - } - - async get (key) { - const db = await this.open() - const value = await db.get(this.location, key.toBuffer()) - - if (!value) { - throw new Error('No value') - } - - return typedarrayToBuffer(value) - } - - async has (key) { - const db = await this.open() - return Boolean(await db.get(this.location, key.toBuffer())) - } - - async delete (key) { - const db = await this.open() - return db.del(this.location, key.toBuffer()) - } - - batch () { - const puts = [] - const dels = [] - - return { - put (key, value) { - puts.push([key.toBuffer(), value]) - }, - delete (key) { - dels.push(key.toBuffer()) - }, - commit: async () => { - const db = await this.open() - const tx = db.transaction(this.location, 'readwrite') - const store = tx.store - await Promise.all(puts.map(p => store.put(p[1], p[0]))) - await Promise.all(dels.map(p => store.del(p))) - await tx.done - } - } - } - - query (q) { - return null - } - - close () { - this.store.close() - } -} diff --git a/test/test-speed.js b/test/test-speed.js index 8aa890d..5cf3239 100644 --- a/test/test-speed.js +++ b/test/test-speed.js @@ -9,7 +9,6 @@ const all = require('async-iterator-all') const randomBuffer = require('iso-random-stream/src/random') const IPFSRepo = require('ipfs-repo') const BlockService = require('ipfs-block-service') -const { openDB } = require('idb') const tempy = require('tempy') const Benchmark = require('benchmark') @@ -251,165 +250,3 @@ describe('benchmark', function () { }) } }) - -const { Store, set, get, del } = require('idb-keyval') - -function isStrictTypedArray (arr) { - return ( - arr instanceof Int8Array || - arr instanceof Int16Array || - arr instanceof Int32Array || - arr instanceof Uint8Array || - arr instanceof Uint8ClampedArray || - arr instanceof Uint16Array || - arr instanceof Uint32Array || - arr instanceof Float32Array || - arr instanceof Float64Array - ) -} - -function typedarrayToBuffer (arr) { - if (isStrictTypedArray(arr)) { - // To avoid a copy, use the typed array's underlying ArrayBuffer to back new Buffer - var buf = Buffer.from(arr.buffer) - if (arr.byteLength !== 
arr.buffer.byteLength) { - // Respect the "view", i.e. byteOffset and byteLength, without doing a copy - buf = buf.slice(arr.byteOffset, arr.byteOffset + arr.byteLength) - } - return buf - } else { - // Pass through all other types to `Buffer.from` - return Buffer.from(arr) - } -} - -class IdbDatastore { - constructor (location) { - this.store = new Store(location, location) - } - - open () { - } - - put (key, val) { - return set(key.toBuffer(), val, this.store) - } - - async get (key, callback) { - const value = await get(key.toBuffer(), this.store) - - if (!value) { - return callback(new Error('No value')) - } - - return typedarrayToBuffer(value) - } - - async has (key) { - const v = await get(key.toBuffer(), this.store) - if (v) { - return true - } - return false - } - - delete (key) { - return del(key.toBuffer(), this.store) - } - - batch () { - const puts = [] - const dels = [] - - return { - put (key, value) { - puts.push([key.toBuffer(), value]) - }, - delete (key) { - dels.push(key.toBuffer()) - }, - commit: () => { - return Promise.all(puts.map(p => this.put(p[0], p[1]))) - } - } - } - - query (q) { - return null - } - - close () { - - } -} - -class IdbDatastoreBatch { - constructor (location) { - this.location = location - } - - open () { - const location = this.location - return openDB(this.location, 1, { - upgrade (db) { - db.createObjectStore(location) - } - }) - } - - async put (key, val) { - const db = await this.open() - return db.put(this.location, val, key.toBuffer()) - } - - async get (key) { - const db = await this.open() - const value = await db.get(this.location, key.toBuffer()) - - if (!value) { - throw new Error('No value') - } - - return typedarrayToBuffer(value) - } - - async has (key) { - const db = await this.open() - return Boolean(await db.get(this.location, key.toBuffer())) - } - - async delete (key) { - const db = await this.open() - return db.del(this.location, key.toBuffer()) - } - - batch () { - const puts = [] - const dels = [] - - return { - put (key, value) { - puts.push([key.toBuffer(), value]) - }, - delete (key) { - dels.push(key.toBuffer()) - }, - commit: async () => { - const db = await this.open() - const tx = db.transaction(this.location, 'readwrite') - const store = tx.store - await Promise.all(puts.map(p => store.put(p[1], p[0]))) - await Promise.all(dels.map(p => store.del(p))) - await tx.done - } - } - } - - query (q) { - return null - } - - close () { - this.store.close() - } -}
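
Editor's note: a minimal usage sketch of the consumer-facing side of this series, i.e. the new `batch` and `batchInterval` options wired into src/index.js by patch 1. It is not part of the patch itself; the repo/BlockService wiring mirrors test/test-speed.js, and the file path, sizes and helper name are illustrative only. It assumes the `ipld` branch with `putBatch` support ("ipld/js-ipld#feat/batch-put") is installed.

// Hypothetical usage sketch of the batch import path added by this series.
// `batch: true` routes leaf creation through buildFileBatch(), which queues
// serialized blocks and flushes them with ipld.putBatch() on a timer.
const importer = require('ipfs-unixfs-importer') // this package
const IPLD = require('ipld')
const IPFSRepo = require('ipfs-repo')
const BlockService = require('ipfs-block-service')
const all = require('async-iterator-all')
const tempy = require('tempy')

async function importWithBatch (content) {
  // Same repo wiring as the benchmarks in test/test-speed.js
  const repo = new IPFSRepo(tempy.directory())
  await repo.init({})
  await repo.open()
  const ipld = new IPLD({ blockService: new BlockService(repo) })

  const entries = await all(importer([{ path: 'file.txt', content }], ipld, {
    batch: true,        // default after this series
    batchInterval: 100  // flush queued blocks every 100ms (default is 50)
  }))

  return entries.map(entry => entry.cid.toString())
}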
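
Editor's note: the core of the fix in patch 3 is swapping interval-promise for set-interval-async, so that a flush never starts while the previous one is still writing, and stopping the timer waits for an in-flight flush. A stripped-down sketch of that loop, isolated from buildFileBatch(), is below; the function and variable names are illustrative, not from the patch.

// Illustrative sketch of the timer-based flush used by buildFileBatch() after patch 3.
// setIntervalAsync (dynamic strategy) waits for the previous tick to finish before
// scheduling the next one; clearIntervalAsync resolves once any pending tick is done.
const { setIntervalAsync } = require('set-interval-async/dynamic')
const { clearIntervalAsync } = require('set-interval-async')

async function consumeWithPeriodicFlush (blocks, ipld, batchInterval = 50) {
  let queue = []

  const timer = setIntervalAsync(async () => {
    const pending = queue
    queue = []
    await ipld.putBatch(pending)
  }, batchInterval)

  for await (const block of blocks) {
    queue.push(block)
  }

  // Stop the timer, then flush whatever the last tick did not pick up.
  await clearIntervalAsync(timer)
  await ipld.putBatch(queue)
}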