Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

add support for batch puts #38

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,21 @@
"aegir": "^20.0.0",
"async-iterator-buffer-stream": "^1.0.0",
"async-iterator-last": "^1.0.0",
"benchmark": "^2.1.4",
"chai": "^4.2.0",
"cids": "~0.7.1",
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"ipfs-block-service": "^0.16.0",
"ipfs-repo": "^0.28.0",
"ipfs-unixfs-exporter": "~0.37.0",
"ipld": "^0.25.0",
"ipld": "ipld/js-ipld#feat/batch-put",
"ipld-in-memory": "^3.0.0",
"iso-random-stream": "^1.1.1",
"lodash": "^4.17.15",
"multihashes": "~0.4.14",
"nyc": "^14.0.0",
"sinon": "^7.1.0"
"sinon": "^7.1.0",
"tempy": "^0.3.0"
},
"dependencies": {
"async-iterator-all": "^1.0.0",
Expand All @@ -59,11 +64,13 @@
"deep-extend": "~0.6.0",
"err-code": "^2.0.0",
"hamt-sharding": "~0.0.2",
"ipfs-block": "^0.8.1",
"ipfs-unixfs": "~0.1.16",
"ipld-dag-pb": "^0.18.0",
"multicodec": "~0.5.1",
"multihashing-async": "~0.7.0",
"rabin-wasm": "~0.0.8",
"set-interval-async": "^1.0.29",
"superstruct": "~0.6.1"
},
"contributors": [
Expand Down
93 changes: 88 additions & 5 deletions src/dag-builder/file/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
'use strict'

const Block = require('ipfs-block')
const errCode = require('err-code')
const UnixFS = require('ipfs-unixfs')
const mh = require('multihashes')
const mc = require('multicodec')
const { setIntervalAsync } = require('set-interval-async/dynamic')
const { clearIntervalAsync } = require('set-interval-async')
const persist = require('../../utils/persist')
const {
DAGNode,
Expand All @@ -25,13 +30,10 @@ async function * buildFile (source, ipld, options) {
let node
let unixfs

const opts = {
...options
}
const opts = { ...options }

if (options.rawLeaves) {
node = buffer

opts.codec = 'raw'
opts.cidVersion = 1
} else {
Expand Down Expand Up @@ -64,6 +66,83 @@ async function * buildFile (source, ipld, options) {
}
}

/**
 * Serialize a node ahead of a batch put.
 *
 * Resolves the multicodec format from the supplied options (codec name,
 * optional `format` override, hash algorithm by name or code) and hands
 * the node to `ipld.serialize`.
 *
 * @param {DAGNode|Buffer} node - DAGNode, or a raw Buffer when rawLeaves is set
 * @param {Object} ipld - IPLD resolver
 * @param {Object} options - codec / format / hashAlg / cidVersion settings
 * @returns {*} the result of `ipld.serialize(node, format, opts)`
 *             (presumably `[cid, serializedBuffer]` — see the caller)
 */
const serialize = (node, ipld, options) => {
  // Work on a copy so the caller's options object is never mutated.
  const opts = { ...options }

  // A raw Buffer with no explicit codec is stored as a raw CIDv1 block.
  if (!opts.codec && node.length) {
    opts.cidVersion = 1
    opts.codec = 'raw'
  }

  // Accept the hash algorithm by name ('sha2-256') as well as by code.
  if (isNaN(opts.hashAlg)) {
    opts.hashAlg = mh.names[opts.hashAlg]
  }

  // Anything other than sha2-256 cannot be expressed as a CIDv0.
  if (opts.hashAlg !== mh.names['sha2-256']) {
    opts.cidVersion = 1
  }

  if (opts.format) {
    opts.codec = opts.format
  }

  // Fall back to the importer's default codec rather than crashing on
  // `undefined.toUpperCase()` when no codec was supplied.
  if (!opts.codec) {
    opts.codec = 'dag-pb'
  }

  // Map the codec name onto the multicodec constant, e.g. 'dag-pb' -> DAG_PB.
  const format = mc[opts.codec.toUpperCase().replace(/-/g, '_')]

  return ipld.serialize(node, format, opts)
}

/**
 * Build the leaf nodes of a file, persisting blocks to IPLD in periodic
 * batches rather than one put per block.
 *
 * Yields `{ cid, unixfs, node }` entries for each chunk read from `source`.
 * A file that produced exactly one leaf has that entry marked
 * `single: true` so the reducer can collapse it to the root.
 *
 * @param {AsyncIterable<Buffer>} source - chunked file contents
 * @param {Object} ipld - IPLD resolver with `putBatch` support
 * @param {Object} options - importer options (`batchInterval`, `rawLeaves`,
 *   `leafType`, `progress`, plus serialization settings)
 */
async function * buildFileBatch (source, ipld, options) {
  let count = -1
  let previous
  let nodesToPersist = []
  // First failure from a background batch write. The interval callback
  // cannot propagate a rejection itself, so without capturing it here the
  // error would be unhandled and blocks silently dropped while the import
  // appeared to succeed.
  let batchError = null

  // Periodically flush accumulated blocks in a single batch put.
  const timer = setIntervalAsync(async () => {
    const batch = nodesToPersist
    nodesToPersist = []

    try {
      await ipld.putBatch(batch, options)
    } catch (err) {
      batchError = batchError || err
    }
  }, options.batchInterval)

  try {
    for await (const buffer of source) {
      // Surface a background write failure promptly instead of reading
      // (and losing) the rest of the source first.
      if (batchError) {
        throw batchError
      }

      count++
      options.progress(buffer.length)

      let node
      let unixfs
      const opts = { ...options }

      if (options.rawLeaves) {
        node = buffer
        opts.codec = 'raw'
        opts.cidVersion = 1
      } else {
        unixfs = new UnixFS(options.leafType, buffer)
        node = new DAGNode(unixfs.marshal())
      }

      const result = await serialize(node, ipld, opts)
      nodesToPersist.push(new Block(result[1], result[0]))

      const entry = {
        cid: result[0],
        unixfs,
        node
      }

      // Hold back the first entry so a single-leaf file can be flagged
      // `single` once we know the stream produced nothing else.
      if (count === 0) {
        previous = entry
        continue
      } else if (count === 1) {
        yield previous
        previous = null
      }

      yield entry
    }
  } finally {
    // Always stop the flusher — even if the source threw — and wait for
    // any in-flight batch put to settle so nothing races the final flush.
    await clearIntervalAsync(timer)
  }

  if (batchError) {
    throw batchError
  }

  // Persist whatever accumulated since the last interval tick.
  await ipld.putBatch(nodesToPersist, options)

  if (previous) {
    previous.single = true
    yield previous
  }
}

const reduce = (file, ipld, options) => {
return async function (leaves) {
if (leaves.length === 1 && leaves[0].single && options.reduceSingleLeafToSelf) {
Expand Down Expand Up @@ -132,7 +211,11 @@ const fileBuilder = async (file, source, ipld, options) => {
throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY')
}

const roots = await all(dagBuilder(buildFile(source, ipld, options), reduce(file, ipld, options), options.builderOptions))
const roots = await all(dagBuilder(
options.batch ? buildFileBatch(source, ipld, options) : buildFile(source, ipld, options),
reduce(file, ipld, options),
options.builderOptions
))

if (roots.length > 1) {
throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS')
Expand Down
4 changes: 4 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const Options = struct({
chunker: struct.enum(['fixed', 'rabin']),
rawLeaves: 'boolean?',
hashOnly: 'boolean?',
batch: 'boolean?',
batchInterval: 'number?',
strategy: struct.enum(['balanced', 'flat', 'trickle']),
reduceSingleLeafToSelf: 'boolean?',
codec: 'codec?',
Expand All @@ -62,6 +64,8 @@ const Options = struct({
chunker: 'fixed',
strategy: 'balanced',
rawLeaves: false,
batch: true,
batchInterval: 50,
reduceSingleLeafToSelf: true,
codec: 'dag-pb',
hashAlg: 'sha2-256',
Expand Down
2 changes: 1 addition & 1 deletion test/benchmark.spec.js → test/benchmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe.skip('benchmark', function () {
const times = []

after(() => {
console.info(`Percent\tms`) // eslint-disable-line no-console
console.info('Percent\tms') // eslint-disable-line no-console
times.forEach((time, index) => {
console.info(`${index}\t${parseInt(time / REPEATS)}`) // eslint-disable-line no-console
})
Expand Down
Loading