-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbulkStream.js
61 lines (49 loc) · 1.18 KB
/
bulkStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
var stream = require('stream'),
events = require('events'),
util = require('util');
/**
 * Object-mode Writable that buffers every item written to it and re-emits
 * the items in arrays of up to `chunkSize` elements.
 *
 * 'data' and 'close' listeners are stored on a separate internal
 * EventEmitter (`this.emitter`); all other events use the regular Writable
 * listener machinery. See `BulkStream.prototype.on`.
 *
 * @param {number} [chunkSize=1000] Buffered item count that triggers a 'data'
 *   emission. Any falsy value (including 0) falls back to 1000.
 */
function BulkStream(chunkSize) {
  var self = this;

  stream.Writable.call(this, {objectMode: true});
  this.bulk = [];
  this.chunkSize = chunkSize || 1000;
  this.emitter = new events.EventEmitter();

  // When the writable side finishes, flush what is left and signal 'close'.
  this.on('finish', function () {
    self._close();
  });
}
util.inherits(BulkStream, stream.Writable);

/**
 * Writable hook: buffers the incoming chunk and acknowledges it immediately.
 *
 * @param {*} chunk Item written to the stream.
 * @param {string} encoding Unused.
 * @param {Function} callback Invoked synchronously once the chunk is buffered.
 */
BulkStream.prototype._write = function (chunk, encoding, callback) {
  this.push(chunk);
  callback();
};

/**
 * Flushes any buffered items as a final 'data' event, then emits 'close' on
 * the internal emitter.
 *
 * @param {Function} [cb] Accepted for signature compatibility; never called.
 */
BulkStream.prototype._close = function (cb) {
  // Nothing in this file assigns `this.timer`; clearing an undefined handle
  // is a no-op, so this only matters if outside code attached a timer.
  clearTimeout(this.timer);
  this._emit();
  this.emitter.emit('close');
};

/**
 * Emits the buffered items as a single 'data' event and starts a new buffer.
 * Does nothing when the buffer is empty.
 */
BulkStream.prototype._emit = function () {
  var batch = this.bulk;

  if (batch.length === 0) {
    return;
  }

  // Detach the buffer BEFORE notifying listeners. A listener that calls
  // push() synchronously then appends to the fresh buffer instead of
  // re-triggering an emit of the batch that is already being delivered.
  this.bulk = [];
  this.emitter.emit('data', batch);
};

/**
 * Registers an event listener.
 *
 * 'data' and 'close' listeners go to the internal emitter; every other
 * event is delegated to Writable's own `on`.
 *
 * @param {string|symbol} event Event name.
 * @param {Function} cb Listener.
 * @returns {BulkStream} This stream, for chaining (for every event name).
 */
BulkStream.prototype.on = function (event, cb) {
  if (event === 'data' || event === 'close') {
    this.emitter.on(event, cb);
    return this;
  }

  stream.Writable.prototype.on.call(this, event, cb);
  return this;
};

/**
 * Appends one item to the buffer and emits a 'data' batch as soon as the
 * buffer holds at least `chunkSize` items.
 *
 * @param {*} data Item to buffer.
 * @param {Function} [cb] Accepted for signature compatibility; never called.
 */
BulkStream.prototype.push = function (data, cb) {
  this.bulk.push(data);
  if (this.bulk.length >= this.chunkSize) {
    this._emit();
  }
};
module.exports = {
create: create = function(chunkSize) {
return new BulkStream(chunkSize);
}
}