Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add flush callback #182

Merged
merged 5 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ For `sync:true` this is not relevant because the `'ready'` event will be fired w
Writes the string to the file.
It will return false to signal the producer to slow down.

### SonicBoom#flush()
### SonicBoom#flush([cb])

Writes the current buffer to the file if a write was not in progress.
Do nothing if `minLength` is zero or if it is already writing.

call the callback when the flush operation is completed. when failed the callback is called with an error.

### SonicBoom#reopen([file])

Reopen the file in place, useful for log rotation.
Expand Down
49 changes: 44 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,40 @@ function writeBuffer (data) {
return this._len < this._hwm
}

function flush () {
function callFlushCallbackOnDrain (cb) {
const onDrain = () => {
cb()
this.off('error', onError)
}
const onError = (err) => {
cb(err)
this.off('drain', onDrain)
}
this.once('drain', onDrain)
this.once('error', onError)
}
rluvaton marked this conversation as resolved.
Show resolved Hide resolved

function flush (cb) {
if (cb != null && typeof cb !== 'function') {
throw new Error('flush cb must be a function')
}

if (this.destroyed) {
throw new Error('SonicBoom destroyed')
const error = new Error('SonicBoom destroyed')
cb?.(error)
throw error
rluvaton marked this conversation as resolved.
Show resolved Hide resolved
}

if (this._writing || this.minLength <= 0) {
if (this.minLength <= 0) {
cb?.()
return
}

if (cb) {
callFlushCallbackOnDrain.call(this, cb)
}

if (this._writing) {
return
}

Expand All @@ -352,15 +380,26 @@ function flush () {
this._actualWrite()
}

function flushBuffer () {
function flushBuffer (cb) {
if (cb != null && typeof cb !== 'function') {
throw new Error('flush cb must be a function')
}

if (this.destroyed) {
throw new Error('SonicBoom destroyed')
const error = new Error('SonicBoom destroyed')
cb?.(error)
throw error
}

if (this._writing || this.minLength <= 0) {
cb?.()
return
}

if (cb) {
callFlushCallbackOnDrain.call(this, cb)
}

if (this._bufs.length === 0) {
this._bufs.push([])
this._lens.push(0)
Expand Down
145 changes: 145 additions & 0 deletions test/flush.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const fs = require('fs')
const path = require('path')
const SonicBoom = require('../')
const { file, runTests } = require('./helper')
const proxyquire = require('proxyquire')

runTests(buildTests)

Expand Down Expand Up @@ -100,4 +101,148 @@ function buildTests (test, sync) {
t.pass('drain emitted')
})
})

test('call flush cb after flushed', (t) => {
t.plan(4)

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({ fd, minLength: 4096, sync })

stream.on('ready', () => {
t.pass('ready emitted')
})

t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))

stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')
})
})

test('call flush cb even when have no data', (t) => {
t.plan(2)

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({ fd, minLength: 4096, sync })

stream.on('ready', () => {
t.pass('ready emitted')

stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')
})
})
})

test('call flush cb even when minLength is 0', (t) => {
t.plan(1)

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({ fd, minLength: 0, sync })

stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')
})
})

test('call flush cb with an error when trying to flush destroyed stream', (t) => {
t.plan(1)

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({ fd, minLength: 4096, sync })
stream.destroy()

try {
stream.flush((err) => {
if (err) t.pass(err)
else t.fail('flush cb called without an error')
})
} catch {
// ignore
}
})

test('call flush cb with an error when failed to flush', (t) => {
t.plan(5)

const fakeFs = Object.create(fs)
const SonicBoom = proxyquire('../', {
fs: fakeFs
})

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({
fd,
sync: false,
minLength: 1000
})

stream.on('ready', () => {
t.pass('ready emitted')
})

const err = new Error('other')
err.code = 'other'
fakeFs.write = function (fd, buf, enc, cb) {
fakeFs.write = fs.write
Error.captureStackTrace(err)
t.pass('fake fs.write called')
cb(err)
}

t.ok(stream.write('hello world\n'))
stream.flush((err) => {
if (err) t.equal(err.code, 'other')
else t.fail('flush cb called without an error')
})

stream.end()

stream.on('close', () => {
t.pass('close emitted')
})
})

test('call flush cb when finish writing when currently in the middle', (t) => {
t.plan(4)

const fakeFs = Object.create(fs)
const SonicBoom = proxyquire('../', {
fs: fakeFs
})

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({
fd,
sync: false,
minLength: 1
})

stream.on('ready', () => {
t.pass('ready emitted')
})

fakeFs.write = function (...args) {
stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')
})

t.pass('fake fs.write called')
fakeFs.write = fs.write
return fakeFs.write(...args)
}

t.ok(stream.write('hello world\n'))
})
}
2 changes: 1 addition & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class SonicBoom extends EventEmitter {
* Writes the current buffer to the file if a write was not in progress.
* Do nothing if minLength is zero or if it is already writing.
*/
flush(): void;
flush(cb?: (err?: Error) => unknown): void;
rluvaton marked this conversation as resolved.
Show resolved Hide resolved

/**
* Reopen the file in place, useful for log rotation.
Expand Down