Merge pull request #5 from eliot-akira/stream-files-by-line
Stream database file line by line to avoid string length limit in Node.js
tex0l authored Oct 19, 2021
2 parents a8ef348 + 84e19ea commit 1d8c888
Showing 16 changed files with 30,275 additions and 42 deletions.
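
For context on the commit message: V8 caps the length of a single JavaScript string (on the order of 2^29 characters, roughly 512 MiB, on recent 64-bit builds), so materializing a large datafile as one string eventually fails. A minimal sketch of the failure mode and the streamed alternative — the `huge.db` filename and the exact limit are illustrative assumptions, not taken from the diff:

```js
const fs = require('fs')
const byline = require('./lib/byline') // the fork added by this PR

// Before: the whole datafile becomes one string; past V8's limit this fails
// with something like ERR_STRING_TOO_LONG ("Cannot create a string longer
// than ... characters").
fs.readFile('huge.db', 'utf8', (err, rawData) => {
  if (err) return console.error(err)
  console.log(rawData.split('\n').length, 'lines')
})

// After: the file is consumed one line at a time, so no single string ever
// has to hold the full file.
const lineStream = byline(fs.createReadStream('huge.db', { encoding: 'utf8' }))
let count = 0
lineStream.on('data', () => { count++ })
lineStream.on('end', () => console.log(count, 'lines'))
```
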
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.1.0-2] - 2021-10-14
### Changed
- properly stream the writing of the database file

## [2.1.0-1] - 2021-10-07
### Changed
- fixed package.json browser field for byline.js
- last-minute improvements on the [PR](https://github.com/seald/nedb/pull/5)

## [2.1.0-0] - 2021-10-05
Thanks to [@eliot-akira](https://github.com/eliot-akira) for the amazing work on this.
### Changed
- [implement file streaming of the database](https://github.com/seald/nedb/pull/5) like [a PR on the original repo](https://github.com/louischatriot/nedb/pull/463) did;
- internalize the [`byline`](https://github.com/jahewson/node-byline) package because it is unmaintained.

## [2.0.4] - 2021-07-12
### Fixed
- switch back to an AVLTree instead of a BinarySearchTree like the original nedb to fix [#1](https://github.com/seald/nedb/issues/1).
1 change: 1 addition & 0 deletions browser-version/lib/byline.js
@@ -0,0 +1 @@
module.exports = {}
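
For context, this stub exists only so browser bundlers can resolve the `byline` require to an empty module; the browser storage layer below never streams files. A hedged sketch of the `package.json` browser-field mapping that wires it up (the exact paths are assumptions, not copied from the repo — see the 2.1.0-1 changelog entry above):

```js
// package.json (sketch; paths assumed):
//   "browser": {
//     "./lib/byline.js": "./browser-version/lib/byline.js",
//     "./lib/storage.js": "./browser-version/lib/storage.js"
//   }
// In a browser bundle, the require in lib/persistence.js then yields this
// empty object, and none of its methods are ever called:
const byline = require('./byline')
console.log(typeof byline.createStream) // -> 'undefined'
```
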
7 changes: 6 additions & 1 deletion browser-version/lib/storage.js
@@ -68,11 +68,16 @@ const mkdir = (dir, options, callback) => callback()
// Nothing to do, no data corruption possible in the browser
const ensureDatafileIntegrity = (filename, callback) => callback(null)

const crashSafeWriteFileLines = (filename, lines, callback) => {
lines.push('') // Add final new line
writeFile(filename, lines.join('\n'), callback)
}

// Interface
module.exports.exists = exists
module.exports.rename = rename
module.exports.writeFile = writeFile
module.exports.crashSafeWriteFile = writeFile // No need for a crash safe function in the browser
module.exports.crashSafeWriteFileLines = crashSafeWriteFileLines
module.exports.appendFile = appendFile
module.exports.readFile = readFile
module.exports.unlink = unlink
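
For illustration, the browser shim above boils down to a join plus a trailing newline; a tiny standalone sketch of what ends up in the written file (sample documents invented):

```js
// Mirrors crashSafeWriteFileLines above: pushing '' before join('\n') yields
// the trailing newline, which is why the last line of a datafile is usually
// blank (persistence.js below accounts for this when counting corrupt lines).
const lines = ['{"_id":"id1","hello":"world"}', '{"_id":"id2","n":2}']
lines.push('') // Add final new line
console.log(JSON.stringify(lines.join('\n')))
// -> "{\"_id\":\"id1\",\"hello\":\"world\"}\n{\"_id\":\"id2\",\"n\":2}\n"
```
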
153 changes: 153 additions & 0 deletions lib/byline.js
@@ -0,0 +1,153 @@
// Forked from https://github.com/jahewson/node-byline

// Copyright (C) 2011-2015 John Hewson
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.

const stream = require('stream')
const util = require('util')
const timers = require('timers')

// convenience API
module.exports = function (readStream, options) {
return module.exports.createStream(readStream, options)
}

// basic API
module.exports.createStream = function (readStream, options) {
if (readStream) {
return createLineStream(readStream, options)
} else {
return new LineStream(options)
}
}

// deprecated API
module.exports.createLineStream = function (readStream) {
console.log('WARNING: byline#createLineStream is deprecated and will be removed soon')
return createLineStream(readStream)
}

function createLineStream (readStream, options) {
if (!readStream) {
throw new Error('expected readStream')
}
if (!readStream.readable) {
throw new Error('readStream must be readable')
}
const ls = new LineStream(options)
readStream.pipe(ls)
return ls
}

//
// using the new node v0.10 "streams2" API
//

module.exports.LineStream = LineStream

function LineStream (options) {
stream.Transform.call(this, options)
options = options || {}

// use objectMode to stop the output from being buffered,
// which would re-concatenate the lines, just without newlines.
this._readableState.objectMode = true
this._lineBuffer = []
this._keepEmptyLines = options.keepEmptyLines || false
this._lastChunkEndedWithCR = false

// take the source's encoding if we don't have one
const self = this
this.on('pipe', function (src) {
if (!self.encoding) {
// but we can't do this for old-style streams
if (src instanceof stream.Readable) {
self.encoding = src._readableState.encoding
}
}
})
}
util.inherits(LineStream, stream.Transform)

LineStream.prototype._transform = function (chunk, encoding, done) {
// decode binary chunks as UTF-8
encoding = encoding || 'utf8'

if (Buffer.isBuffer(chunk)) {
if (encoding === 'buffer') {
chunk = chunk.toString() // utf8
encoding = 'utf8'
} else {
chunk = chunk.toString(encoding)
}
}
this._chunkEncoding = encoding

// see: http://www.unicode.org/reports/tr18/#Line_Boundaries
const lines = chunk.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g)

// don't split CRLF which spans chunks
if (this._lastChunkEndedWithCR && chunk[0] === '\n') {
lines.shift()
}

if (this._lineBuffer.length > 0) {
this._lineBuffer[this._lineBuffer.length - 1] += lines[0]
lines.shift()
}

this._lastChunkEndedWithCR = chunk[chunk.length - 1] === '\r'
this._lineBuffer = this._lineBuffer.concat(lines)
this._pushBuffer(encoding, 1, done)
}

LineStream.prototype._pushBuffer = function (encoding, keep, done) {
// always buffer the last (possibly partial) line
while (this._lineBuffer.length > keep) {
const line = this._lineBuffer.shift()
// skip empty lines
if (this._keepEmptyLines || line.length > 0) {
if (!this.push(this._reencode(line, encoding))) {
// when the high-water mark is reached, defer pushes until the next tick
timers.setImmediate(() => {
this._pushBuffer(encoding, keep, done)
})
return
}
}
}
done()
}

LineStream.prototype._flush = function (done) {
this._pushBuffer(this._chunkEncoding, 0, done)
}

// see Readable::push
LineStream.prototype._reencode = function (line, chunkEncoding) {
if (this.encoding && this.encoding !== chunkEncoding) {
return Buffer.from(line, chunkEncoding).toString(this.encoding)
} else if (this.encoding) {
// this should be the most common case, i.e. we're using an encoded source stream
return line
} else {
return Buffer.from(line, chunkEncoding)
}
}
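
For illustration, a minimal usage sketch of this fork, matching how lib/persistence.js consumes it below (the `example.db` filename is an assumption):

```js
const fs = require('fs')
const byline = require('./byline')

// keepEmptyLines: true mirrors treatRawStream below, so blank lines still
// count toward the total used for the corruption ratio.
const lineStream = byline(
  fs.createReadStream('example.db', { encoding: 'utf8' }),
  { keepEmptyLines: true }
)

lineStream.on('data', line => console.log('line:', line)) // one event per line
lineStream.on('end', () => console.log('done'))
lineStream.on('error', err => console.error(err))
```
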
90 changes: 76 additions & 14 deletions lib/persistence.js
@@ -6,6 +6,7 @@
*/
const path = require('path')
const async = require('async')
const byline = require('./byline')
const customUtils = require('./customUtils.js')
const Index = require('./indexes.js')
const model = require('./model.js')
@@ -72,26 +73,26 @@ class Persistence {
* @param {Function} callback Optional callback, signature: err
*/
persistCachedDatabase (callback = () => {}) {
let toPersist = ''
const lines = []

if (this.inMemoryOnly) return callback(null)

this.db.getAllData().forEach(doc => {
toPersist += this.afterSerialization(model.serialize(doc)) + '\n'
lines.push(this.afterSerialization(model.serialize(doc)))
})
Object.keys(this.db.indexes).forEach(fieldName => {
if (fieldName !== '_id') { // The special _id index is managed by datastore.js, the others need to be persisted
toPersist += this.afterSerialization(model.serialize({
lines.push(this.afterSerialization(model.serialize({
$$indexCreated: {
fieldName: fieldName,
unique: this.db.indexes[fieldName].unique,
sparse: this.db.indexes[fieldName].sparse
}
})) + '\n'
})))
}
})

storage.crashSafeWriteFile(this.filename, toPersist, err => {
storage.crashSafeWriteFileLines(this.filename, lines, err => {
if (err) return callback(err)
this.db.emit('compaction.done')
return callback(null)
@@ -155,8 +156,9 @@
treatRawData (rawData) {
const data = rawData.split('\n')
const dataById = {}
const tdata = []
const indexes = {}

// Last line of every data file is usually blank so not really corrupt
let corruptItems = -1

for (const datum of data) {
@@ -178,11 +180,58 @@
corruptItems / data.length > this.corruptAlertThreshold
) throw new Error(`More than ${Math.floor(100 * this.corruptAlertThreshold)}% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss`)

tdata.push(...Object.values(dataById))
const tdata = Object.values(dataById)

return { data: tdata, indexes: indexes }
}

/**
* From a database's raw stream, return the corresponding
* machine understandable collection
*/
treatRawStream (rawStream, cb) {
const dataById = {}
const indexes = {}

// Last line of every data file is usually blank so not really corrupt
let corruptItems = -1

const lineStream = byline(rawStream, { keepEmptyLines: true })
let length = 0

lineStream.on('data', (line) => {
try {
const doc = model.deserialize(this.beforeDeserialization(line))
if (doc._id) {
if (doc.$$deleted === true) delete dataById[doc._id]
else dataById[doc._id] = doc
} else if (doc.$$indexCreated && doc.$$indexCreated.fieldName != null) indexes[doc.$$indexCreated.fieldName] = doc.$$indexCreated
else if (typeof doc.$$indexRemoved === 'string') delete indexes[doc.$$indexRemoved]
} catch (e) {
corruptItems += 1
}

length++
})

lineStream.on('end', () => {
// A bit lenient on corruption
if (length > 0 && corruptItems / length > this.corruptAlertThreshold) {
const err = new Error(`More than ${Math.floor(100 * this.corruptAlertThreshold)}% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss`)
cb(err, null)
return
}

const data = Object.values(dataById)

cb(null, { data, indexes: indexes })
})

lineStream.on('error', function (err) {
cb(err)
})
}

/**
* Load the database
* 1) Create all indexes
@@ -207,14 +256,8 @@
// eslint-disable-next-line node/handle-callback-err
storage.ensureDatafileIntegrity(this.filename, err => {
// TODO: handle error
storage.readFile(this.filename, 'utf8', (err, rawData) => {
const treatedDataCallback = (err, treatedData) => {
if (err) return cb(err)
let treatedData
try {
treatedData = this.treatRawData(rawData)
} catch (e) {
return cb(e)
}

// Recreate all indexes in the datafile
Object.keys(treatedData.indexes).forEach(key => {
@@ -230,6 +273,25 @@
}

this.db.persistence.persistCachedDatabase(cb)
}

if (storage.readFileStream) {
// Server side
const fileStream = storage.readFileStream(this.filename, { encoding: 'utf8' })
this.treatRawStream(fileStream, treatedDataCallback)
return
}

// Browser
storage.readFile(this.filename, 'utf8', (err, rawData) => {
if (err) return cb(err)

try {
const treatedData = this.treatRawData(rawData)
treatedDataCallback(null, treatedData)
} catch (e) {
return cb(e)
}
})
})
})
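
To make the streamed format concrete: a hedged, standalone sketch of the reduction treatRawStream performs, assuming the default identity serialization hooks (each line is then plain JSON); the sample documents are invented:

```js
const { Readable } = require('stream')
const byline = require('./lib/byline')

const raw = [
  '{"_id":"id1","hello":"world"}', // plain document, stored under its _id
  '{"$$indexCreated":{"fieldName":"hello","unique":false,"sparse":false}}',
  '{"_id":"id1","hello":"earth"}', // same _id again: the later line wins
  '{"_id":"id1","$$deleted":true}', // tombstone: removes id1 entirely
  '' // datafiles end with a blank line (see crashSafeWriteFileLines)
].join('\n')

const dataById = {}
const indexes = {}
const lineStream = byline(Readable.from([raw]), { keepEmptyLines: true })

lineStream.on('data', line => {
  try {
    const doc = JSON.parse(line)
    if (doc._id) {
      if (doc.$$deleted === true) delete dataById[doc._id]
      else dataById[doc._id] = doc
    } else if (doc.$$indexCreated && doc.$$indexCreated.fieldName != null) {
      indexes[doc.$$indexCreated.fieldName] = doc.$$indexCreated
    }
  } catch (e) { /* corrupt line: counted against corruptAlertThreshold */ }
})
lineStream.on('end', () => {
  console.log(Object.values(dataById)) // -> [] (id1 was deleted)
  console.log(indexes) // -> { hello: { fieldName: 'hello', unique: false, sparse: false } }
})
```
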