Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.write(Buffer) support #174

Merged
merged 7 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ const dummyConsole = new Console(fs.createWriteStream('/dev/null'))

const MAX = 10000

let str = ''

for (let i = 0; i < 10; i++) {
str += 'hello'
}
const buf = Buffer.alloc(50, 'hello', 'utf8')
const str = buf.toString()

setTimeout(doBench, 100)

Expand Down Expand Up @@ -59,6 +56,36 @@ const run = bench([
dummyConsole.log(str)
}
setImmediate(cb)
},
function benchSonicBuf (cb) {
sonic.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonic.write(buf)
}
},
function benchSonicSyncBuf (cb) {
sonicSync.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonicSync.write(buf)
}
},
function benchSonic4kBuf (cb) {
sonic4k.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonic4k.write(buf)
}
},
function benchSonicSync4kBuf (cb) {
sonicSync4k.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonicSync4k.write(buf)
}
},
function benchCoreBuf (cb) {
core.once('drain', cb)
for (let i = 0; i < MAX; i++) {
core.write(buf)
}
}
], 1000)

Expand Down
64 changes: 43 additions & 21 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const path = require('path')
const sleep = require('atomic-sleep')

const BUSY_WRITE_TIMEOUT = 100
const kEmptyBuffer = Buffer.allocUnsafe(0)

// 16 KB. Don't write more than docker buffer size.
// https://github.com/moby/moby/blob/513ec73831269947d38a644c278ce3cac36783b2/daemon/logger/copier.go#L13
Expand Down Expand Up @@ -92,10 +93,11 @@ function SonicBoom (opts) {
fd = fd || dest

this._bufs = []
this._lens = []
this._len = 0
this.fd = -1
this._writing = false
this._writingBuf = ''
this._writingBuf = kEmptyBuffer
this._ending = false
this._reopening = false
this._asyncDrainScheduled = false
Expand All @@ -106,6 +108,7 @@ function SonicBoom (opts) {
this.maxLength = maxLength || 0
this.maxWrite = maxWrite || MAX_WRITE
this.sync = sync || false
this.writable = true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot to mention, this is so that node.js stream util compose function works properly with this

this._fsync = fsync || false
this.append = append || false
this.mode = mode
Expand Down Expand Up @@ -141,7 +144,7 @@ function SonicBoom (opts) {
} else {
// Let's give the destination some time to process the chunk.
setTimeout(() => {
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
fs.write(this.fd, this._writingBuf, this.release)
}, BUSY_WRITE_TIMEOUT)
}
} else {
Expand All @@ -167,20 +170,20 @@ function SonicBoom (opts) {
// TODO if we have a multi-byte character in the buffer, we need to
// n might not be the same as this._writingBuf.length, so we might loose
// characters here. The solution to this problem is to use a Buffer for _writingBuf.
this._writingBuf = this._writingBuf.slice(n)
this._writingBuf = this._writingBuf.subarray(n)

if (this._writingBuf.length) {
if (!this.sync) {
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
fs.write(this.fd, this._writingBuf, this.release)
return
}

try {
do {
const n = fs.writeSync(this.fd, this._writingBuf, 'utf8')
const n = fs.writeSync(this.fd, this._writingBuf)
this._len -= n
this._writingBuf = this._writingBuf.slice(n)
} while (this._writingBuf)
this._writingBuf = this._writingBuf.subarray(n)
} while (this._writingBuf.length)
} catch (err) {
this.release(err)
return
Expand Down Expand Up @@ -234,13 +237,27 @@ function emitDrain (sonic) {

inherits(SonicBoom, EventEmitter)

SonicBoom.prototype.write = function (data) {
function mergeBuf (bufs, len) {
if (bufs.length === 0) {
return kEmptyBuffer
}

if (bufs.length === 1) {
return bufs[0]
}

return Buffer.concat(bufs, len)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the source of the slowdown. We should not merge them, but rather keep them as a list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried having a list here, but tests that expect single flush instead of multiples break. Will work on it further now that there are 2 separate content modes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All in all Buffer.concat seem to be cheaper than multiple fs.write as long as it doesn't have to allocate memory outside of the Buffer.poolSize. Will investigate further how that could be avoided

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried fs.writev to avoid concat, but that made it slower than writeStream

}

SonicBoom.prototype.write = function (_data) {
if (this.destroyed) {
throw new Error('SonicBoom destroyed')
}

const data = Buffer.isBuffer(_data) ? _data : Buffer.from(_data, 'utf8')
const len = this._len + data.length
const bufs = this._bufs
const lens = this._lens

if (this.maxLength && len > this.maxLength) {
this.emit('drop', data)
Expand All @@ -249,11 +266,13 @@ SonicBoom.prototype.write = function (data) {

if (
bufs.length === 0 ||
bufs[bufs.length - 1].length + data.length > this.maxWrite
lens[lens.length - 1] + data.length > this.maxWrite
) {
bufs.push('' + data)
bufs.push([data])
lens.push(data.length)
} else {
bufs[bufs.length - 1] += data
bufs[bufs.length - 1].push(data)
lens[lens.length - 1] += data.length
}

this._len = len
Expand All @@ -275,7 +294,8 @@ SonicBoom.prototype.flush = function () {
}

if (this._bufs.length === 0) {
this._bufs.push('')
this._bufs.push([])
this._lens.push(0)
}

actualWrite(this)
Expand Down Expand Up @@ -360,21 +380,22 @@ SonicBoom.prototype.flushSync = function () {
}

if (!this._writing && this._writingBuf.length > 0) {
this._bufs.unshift(this._writingBuf)
this._writingBuf = ''
this._bufs.unshift([this._writingBuf])
this._writingBuf = kEmptyBuffer
}

let buf = ''
let buf = kEmptyBuffer
while (this._bufs.length || buf.length) {
if (buf.length <= 0) {
buf = this._bufs[0]
buf = mergeBuf(this._bufs[0], this._lens[0])
}
try {
const n = fs.writeSync(this.fd, buf, 'utf8')
buf = buf.slice(n)
const n = fs.writeSync(this.fd, buf)
buf = buf.subarray(n)
this._len = Math.max(this._len - n, 0)
if (buf.length <= 0) {
this._bufs.shift()
this._lens.shift()
}
} catch (err) {
const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY'
Expand All @@ -397,17 +418,17 @@ SonicBoom.prototype.destroy = function () {
function actualWrite (sonic) {
const release = sonic.release
sonic._writing = true
sonic._writingBuf = sonic._writingBuf || sonic._bufs.shift() || ''
sonic._writingBuf = sonic._writingBuf.length ? sonic._writingBuf : mergeBuf(sonic._bufs.shift(), sonic._lens.shift())

if (sonic.sync) {
try {
const written = fs.writeSync(sonic.fd, sonic._writingBuf, 'utf8')
const written = fs.writeSync(sonic.fd, sonic._writingBuf)
release(null, written)
} catch (err) {
release(err)
}
} else {
fs.write(sonic.fd, sonic._writingBuf, 'utf8', release)
fs.write(sonic.fd, sonic._writingBuf, release)
}
}

Expand All @@ -419,6 +440,7 @@ function actualClose (sonic) {

sonic.destroyed = true
sonic._bufs = []
sonic._lens = []

if (sonic.fd !== 1 && sonic.fd !== 2) {
fs.close(sonic.fd, done)
Expand Down
18 changes: 9 additions & 9 deletions test/retry.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function buildTests (test, sync) {
t.plan(7)

const fakeFs = Object.create(fs)
fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('fake fs.write called')
fakeFs.write = fs.write
const err = new Error('EAGAIN')
Expand Down Expand Up @@ -56,7 +56,7 @@ test('emit error on async EAGAIN', (t) => {
t.plan(11)

const fakeFs = Object.create(fs)
fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('fake fs.write called')
fakeFs.write = fs.write
const err = new Error('EAGAIN')
Expand Down Expand Up @@ -109,7 +109,7 @@ test('retry on EAGAIN (sync)', (t) => {
t.plan(7)

const fakeFs = Object.create(fs)
fakeFs.writeSync = function (fd, buf, enc, cb) {
fakeFs.writeSync = function (fd, buf, cb) {
t.pass('fake fs.writeSync called')
fakeFs.writeSync = fs.writeSync
const err = new Error('EAGAIN')
Expand Down Expand Up @@ -148,7 +148,7 @@ test('emit error on EAGAIN (sync)', (t) => {
t.plan(11)

const fakeFs = Object.create(fs)
fakeFs.writeSync = function (fd, buf, enc, cb) {
fakeFs.writeSync = function (fd, buf, cb) {
t.pass('fake fs.writeSync called')
fakeFs.writeSync = fs.writeSync
const err = new Error('EAGAIN')
Expand Down Expand Up @@ -228,7 +228,7 @@ test('retryEAGAIN receives remaining buffer on async if write fails', (t) => {
t.ok(stream.write('done'))
})

fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('fake fs.write called')
fakeFs.write = fs.write
const err = new Error('EAGAIN')
Expand Down Expand Up @@ -279,14 +279,14 @@ test('retryEAGAIN receives remaining buffer if exceeds maxWrite', (t) => {
t.pass('ready emitted')
})

fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('fake fs.write called')
const err = new Error('EAGAIN')
err.code = 'EAGAIN'
process.nextTick(cb, err)
}

fakeFs.writeSync = function (fd, buf, enc, cb) {
fakeFs.writeSync = function (fd, buf, cb) {
t.pass('fake fs.write called')
const err = new Error('EAGAIN')
err.code = 'EAGAIN'
Expand Down Expand Up @@ -325,7 +325,7 @@ test('retry on EBUSY', (t) => {
t.plan(7)

const fakeFs = Object.create(fs)
fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('fake fs.write called')
fakeFs.write = fs.write
const err = new Error('EBUSY')
Expand Down Expand Up @@ -364,7 +364,7 @@ test('emit error on async EBUSY', (t) => {
t.plan(11)

const fakeFs = Object.create(fs)
fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('fake fs.write called')
fakeFs.write = fs.write
const err = new Error('EBUSY')
Expand Down
16 changes: 8 additions & 8 deletions test/write.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ function buildTests (test, sync) {
})

if (sync) {
fakeFs.writeSync = function (fd, buf, enc) {
fakeFs.writeSync = function (fd, buf) {
t.pass('fake fs.writeSync called')
throw new Error('recoverable error')
}
} else {
fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('fake fs.write called')
setTimeout(() => cb(new Error('recoverable error')), 0)
}
Expand Down Expand Up @@ -253,11 +253,11 @@ test('write buffers that are not totally written', (t) => {
t.plan(9)

const fakeFs = Object.create(fs)
fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('fake fs.write called')
fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = function (fd, buf, cb) {
t.pass('calling real fs.write, ' + buf)
fs.write(fd, buf, enc, cb)
fs.write(fd, buf, cb)
}
process.nextTick(cb, null, 0)
}
Expand Down Expand Up @@ -336,7 +336,7 @@ test('write enormously large buffers async atomicly', (t) => {

const buf = Buffer.alloc(1023).fill('x').toString()

fakeFs.write = function (fd, _buf, enc, cb) {
fakeFs.write = function (fd, _buf, cb) {
if (_buf.length % buf.length !== 0) {
t.fail('write called with wrong buffer size')
}
Expand Down Expand Up @@ -375,7 +375,7 @@ test('write should not drop new data if buffer is not full', (t) => {

const buf = Buffer.alloc(100).fill('x').toString()

fakeFs.write = function (fd, _buf, enc, cb) {
fakeFs.write = function (fd, _buf, cb) {
t.equal(_buf.length, buf.length + 2)
setImmediate(cb, null, _buf.length)
fakeFs.write = () => t.error('shouldnt call write again')
Expand Down Expand Up @@ -407,7 +407,7 @@ test('write should drop new data if buffer is full', (t) => {

const buf = Buffer.alloc(100).fill('x').toString()

fakeFs.write = function (fd, _buf, enc, cb) {
fakeFs.write = function (fd, _buf, cb) {
t.equal(_buf.length, buf.length)
setImmediate(cb, null, _buf.length)
fakeFs.write = () => t.error('shouldnt call write more than once')
Expand Down