This was done to update the qs module to a version that doesn't have the Denial of Service (Memory Exhaustion) issue reported by Snyk.
Use pumps to import, export, transform or transfer data. A data pump will read from its input stream, array or datapumps Buffer and will write to its output buffers. A pump will finish when all data is consumed from its output buffers. Make a group of pumps to handle complex ETL tasks.
$ npm install datapumps --save
var
  datapumps = require('datapumps'),
  Pump = datapumps.Pump,
  MongodbMixin = datapumps.mixin.MongodbMixin,
  ExcelWriterMixin = datapumps.mixin.ExcelWriterMixin,
  pump = new Pump();

pump
  .mixin(MongodbMixin('mongodb://localhost/marketing'))
  .useCollection('Contact')
  .from(pump.find({ country: "US" }))

  .mixin(ExcelWriterMixin())
  .createWorkbook('/tmp/ContactsInUs.xlsx')
  .createWorksheet('Contacts')
  .writeHeaders(['Name', 'Email'])

  .process(function(contact) {
    return pump.writeRow([ contact.name, contact.email ]);
  })
  .logErrorsToConsole()
  .run()
  .then(function() {
    console.log("Done writing contacts to file");
  });
Usage example with more details:
- First, we create a pump and set up reading from mongodb:
var pump = new Pump();
pump
  .mixin(MongodbMixin('mongodb://localhost/marketing'))
  .useCollection('Contact')
  .from(pump.find({ country: "US" }))
Mixins extend the functionality of a pump. The MongodbMixin adds a .find() method, which executes a query on the collection specified with the .useCollection() method. The pump reads the query results and controls the data flow, i.e. it pauses reading when it cannot write excel rows.
- Write data to excel with ExcelWriterMixin:
pump
  .mixin(ExcelWriterMixin())
  .createWorkbook('/tmp/ContactsInUs.xlsx')
  .createWorksheet('Contacts')
  .writeHeaders(['Name', 'Email'])
  .process(function(contact) {
    return pump.writeRow([ contact.name, contact.email ]);
  })
The excel workbook, worksheet and header row are created after adding ExcelWriterMixin to the pump. Each pump has a .process() callback that may transform or filter data. The callback is called for every data item of the buffer and should return a promise (we use the bluebird library) that fulfills when the data is processed. In this example, the default processing callback (which copies data to the output buffer) is overridden to write rows to the excel worksheet.
- Finally, start the pump and write to the console when it's done:
pump
  .logErrorsToConsole()
  .run()
  .then(function() {
    console.log("Done writing contacts to file");
  });
The .logErrorsToConsole() method will, unsurprisingly, log any error to the console. The pump starts when .run() is called, which returns a promise that resolves when the pump has finished.
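The flow control mentioned in the walkthrough (the pump pauses reading when it cannot write) can be pictured with a bounded buffer. The following is a minimal sketch in plain Node.js, not the datapumps implementation: `createBoundedBuffer`, its `size` argument and its `read()` and `isFull()` methods are hypothetical names chosen for illustration, and native promises are used instead of bluebird.

```javascript
// Hypothetical bounded buffer: accepts at most `size` items. A write to a
// full buffer returns a promise that stays pending until a read frees a slot.
function createBoundedBuffer(size) {
  const content = [];
  const pendingWrites = [];
  return {
    isFull: function () {
      return content.length >= size;
    },
    writeAsync: function (item) {
      if (content.length < size) {
        content.push(item);
        return Promise.resolve();
      }
      return new Promise(function (resolve) {
        pendingWrites.push(function () {
          content.push(item);
          resolve();
        });
      });
    },
    read: function () {
      const item = content.shift();
      // A slot is free now, so let the oldest waiting writer proceed.
      const nextWrite = pendingWrites.shift();
      if (nextWrite) {
        nextWrite();
      }
      return item;
    }
  };
}

const rows = createBoundedBuffer(2);
const trace = [];

rows.writeAsync('row 1');
rows.writeAsync('row 2');
// The buffer is full, so this write waits: the producer is "paused".
const thirdWrite = rows.writeAsync('row 3').then(function () {
  trace.push('row 3 accepted');
});

trace.push('full: ' + rows.isFull());
trace.push('read: ' + rows.read()); // frees a slot and resumes the writer

thirdWrite.then(function () {
  console.log(trace.join('\n'));
});
```

A producer that waits for each `writeAsync()` promise before reading its next item never gets further ahead of the consumer than the buffer size allows.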
A pump reads data from its input buffer or stream and copies it to the output buffer by default:
datapumps = require('datapumps');
(pump = new datapumps.Pump())
  .from(<put a nodejs stream or datapumps buffer here>)
  .run()
To access the output buffer, use the .buffer() method, which returns a Buffer instance:
buffer = pump.buffer('output');
buffer = pump.buffer(); // equivalent to the previous line, as the default
                        // buffer of the pump is called 'output'
Use the .buffers() method when you need to write data into multiple output buffers:
ticketsPump
  .buffers({
    openTickets: ticketsPump.createBuffer(),
    closedTickets: ticketsPump.createBuffer(),
  });

reminderMailer = new datapumps.Pump()
reminderMailer
  .from(ticketsPump.buffer('openTickets'))
  ...
Note that the ticketsPump pump has two output buffers: openTickets and closedTickets. The reminderMailer pump reads data from the openTickets buffer of the tickets pump.
Use the .process() method to set the function which processes data:
ticketsPump
  .process(function(ticket) {
    ticket.title = 'URGENT: ' + ticket.title;
    return this.buffer('openTickets').writeAsync(ticket);
  });
The argument of .process() is a function that will be executed after the pump reads a data item. The function is executed in the context of the pump object, i.e. this refers to the pump itself. The function should return a Promise that fulfills when the data is processed (i.e. written into a buffer or stored elsewhere).
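To see why the callback should return a promise, here is a minimal sketch in plain Node.js. It is an illustration under assumptions, not how datapumps is implemented: `createPumpLike` and `runSketchPump` are hypothetical helpers, and native promises stand in for bluebird. The callback is invoked with `this` bound to a pump-like object, and the next item is read only after the returned promise fulfills.

```javascript
// Hypothetical stand-in for a pump: it only has an output array and a
// writeAsync-like method that resolves on a later tick.
function createPumpLike() {
  const output = [];
  return {
    output: output,
    writeAsync: function (item) {
      return new Promise(function (resolve) {
        setImmediate(function () {
          output.push(item);
          resolve();
        });
      });
    }
  };
}

// Calls `processItem` for every item, one at a time, with `this` bound to
// the pump-like object. Each call waits for the previous promise to fulfill.
function runSketchPump(items, processItem) {
  const pumpLike = createPumpLike();
  return items
    .reduce(function (chain, item) {
      return chain.then(function () {
        return processItem.call(pumpLike, item);
      });
    }, Promise.resolve())
    .then(function () {
      return pumpLike.output;
    });
}

const sketchRun = runSketchPump(
  [{ title: 'Printer on fire' }, { title: 'Coffee machine empty' }],
  function (ticket) {
    ticket.title = 'URGENT: ' + ticket.title;
    return this.writeAsync(ticket);
  }
);

sketchRun.then(function (output) {
  console.log(output.map(function (ticket) { return ticket.title; }));
});
```

Because `this` is bound at call time, the callback has to be a regular function; an arrow function would not see the pump-like object.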
A pump is started by calling the .start() method. The end event will be emitted when the input stream or buffer has ended and all output buffers have become empty.
pump.on('end', function() {
  console.log('Pumped everything, and all my output buffers are empty. Bye.')
})
You often need multiple pumps to complete an ETL task. Pump groups help you start multiple pumps in one step, and also let you handle the event when every pump has ended:
sendMails = datapumps.group();
sendMails.addPump('tickets')
  ...;
sendMails.addPump('reminderMailer')
  ...;
sendMails
  .start()
  .whenFinished().then(function() {
    console.log('Tickets processed.');
  });
The .addPump() method creates a new pump with the given name and returns it for configuration. .start() will start all pumps in the group, while .whenFinished() returns a Promise that fulfills when every pump has ended (Note: the end event is also emitted).
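The "fulfills when every pump has ended" behaviour can be pictured as combining one promise per pump. The sketch below is plain Node.js under stated assumptions, not the datapumps Group class: `createSketchGroup` and `endsAfter` are hypothetical, each "pump" is reduced to a function returning a promise, and, unlike the real .addPump(), the sketch's `addPump` returns the group so the calls can be chained.

```javascript
// Hypothetical mini-group: each "pump" here is just a function that returns
// a promise fulfilling when that pump has ended.
function createSketchGroup() {
  const pumps = {};
  let finished = null;
  return {
    addPump: function (name, runPump) {
      pumps[name] = runPump;
      return this;
    },
    start: function () {
      // Start every pump in one step and remember the combined promise.
      finished = Promise.all(
        Object.keys(pumps).map(function (name) {
          return pumps[name]().then(function () {
            return name;
          });
        })
      );
      return this;
    },
    whenFinished: function () {
      return finished;
    }
  };
}

// Hypothetical pump body that simply ends after `ms` milliseconds.
function endsAfter(ms) {
  return function () {
    return new Promise(function (resolve) {
      setTimeout(resolve, ms);
    });
  };
}

const sketchGroup = createSketchGroup()
  .addPump('tickets', endsAfter(20))
  .addPump('reminderMailer', endsAfter(5));

const groupFinished = sketchGroup.start().whenFinished();

groupFinished.then(function (names) {
  console.log('All pumps ended: ' + names.join(', '));
});
```

`Promise.all` fulfills only after the slowest pump has ended, and it rejects as soon as any one of them fails.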
Sometimes you wish to encapsulate a part of an ETL process and also use it elsewhere. It is possible to set an input pump and expose buffers from the group, so it will provide the same interface as a simple pump (i.e. it has .from(), .start(), .buffer() methods and emits the end event).
Most likely, you want to extend the datapumps.Group class (the example is written in CoffeeScript):
{ Group, mixin: { MysqlMixin } } = require 'datapumps'

class Notifier extends Group
  constructor: ->
    super()
    @addPump 'emailLookup'
      .mixin MysqlMixin connection
      .process (data) ->
        @query('SELECT email FROM user where username = ?', [ data.username ])
          .then (result) =>
            data.emailAddress = result.email
            @buffer().writeAsync data
    @addPump 'sendMail'
      .from @pump 'emailLookup'
      .process (data) ->
        ... # send email to data.emailAddress
        @buffer().writeAsync
          recipient:
            name: data.name
            email: data.emailAddress

    @setInputPump 'emailLookup'
    @expose 'output', 'sendMail/output'
The Notifier will behave like a pump, but on the inside it does an email address lookup using mysql and sends mail to those addresses. The output buffer of the sendMail pump is filled with recipient data.
Use the created class like this:
etlProcess = datapumps.group()
etlProcess
  .addPump 'notifier', new Notifier
    .from <node stream or datapumps buffer>

etlProcess
  .addPump 'logger'
    .from etlProcess.pump('notifier').buffer()
    .process (data) ->
      # the sendMail pump writes objects of the form { recipient: { name, email } }
      console.log "Email sent to #{data.recipient.name} (#{data.recipient.email})"
Please note that you cannot use the .process method on a group.
Errors may occur while data is transferred between systems. Most of the time, you don't want to stop on the first error but complete the transfer and re-run after fixing problems. Therefore the pump group has an error buffer (.errorBuffer()), which can hold ten error messages by default. When the error buffer fills up, an error event is triggered and the .whenFinished() promise is rejected:
group
  .start()
  .whenFinished()
  .then(function() {
    if (!group.errorBuffer().isEmpty()) {
      console.log("Transfer finished, but with errors.");
      // errors list will be at group.errorBuffer().getContent()
    }
  })
  .catch(function() {
    console.log("Pump group failed with errors");
    // errors list will be at group.errorBuffer().getContent()
  });
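The two outcomes above (finished with some errors, or failed because the error buffer filled up) can be sketched in plain Node.js. This is an illustration under assumptions rather than the datapumps code: `runWithErrorBuffer` and `parsePositive` are hypothetical, processing is synchronous for brevity, and "fills up" is taken to mean that the number of collected errors reaches the capacity.

```javascript
// Hypothetical error buffer: collects up to `capacity` errors and rejects
// the run as soon as the buffer is full.
function runWithErrorBuffer(items, processItem, capacity) {
  const errors = [];
  const processed = [];
  for (const item of items) {
    try {
      processed.push(processItem(item));
    } catch (err) {
      errors.push({ message: err.message, item: item });
      if (errors.length >= capacity) {
        return Promise.reject({ errors: errors, processed: processed });
      }
    }
  }
  return Promise.resolve({ errors: errors, processed: processed });
}

// Hypothetical processing step that fails on non-positive numbers.
function parsePositive(value) {
  if (value <= 0) {
    throw new Error('not positive: ' + value);
  }
  return value * 2;
}

// One bad item out of four: the run finishes, with one collected error.
const tolerantRun = runWithErrorBuffer([1, -2, 3, 4], parsePositive, 10);
// Capacity of two: the second error fills the buffer and the run is rejected.
const strictRun = runWithErrorBuffer([1, -2, -3, 4], parsePositive, 2);

tolerantRun.then(function (result) {
  console.log('Finished with ' + result.errors.length + ' error(s)');
});
strictRun.catch(function (failure) {
  console.log('Failed after ' + failure.errors.length + ' errors');
});
```

In both cases the collected errors stay available for inspection afterwards, which is what makes a "fix and re-run" workflow practical.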
Use the .logErrorsToConsole() helper method to configure the pump or group to print errors when processing has finished:
group
  .logErrorsToConsole()
  .start();
The following example shows fingers-crossed type logging, i.e. debug logging is turned on after the first error occurred:
{ group } = require('datapumps')

(d = group())
  .addPump 'test'
    .from d.createBuffer
      sealed: true,
      content: [ 'first', 'second', 'third', 'fourth' ]
    .process (data) ->
      throw new Error 'Start debugging', data if data == 'second'
      @copy data

d.errorBuffer().on 'write', (data) ->
  console.log data
  d.buffer('test/output').on 'write', (data) ->
    console.log "#{data} was written to test/output buffer"

d.start()
The output:
{ message: [Error: Start debugging], pump: 'test' }
third was written to test/output buffer
fourth was written to test/output buffer
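The same fingers-crossed idea can be shown without datapumps, using only Node's built-in EventEmitter. This is a sketch under assumptions: `errorEvents`, `outputEvents` and `logLines` are hypothetical names standing in for the error buffer, the output buffer and the console. It uses `once` so that a later error would not attach a second debug listener.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-ins for the error buffer and the output buffer.
const errorEvents = new EventEmitter();
const outputEvents = new EventEmitter();
const logLines = [];

// Debug logging of output writes is attached only when the first error arrives.
errorEvents.once('write', function (err) {
  logLines.push('error: ' + err.message);
  outputEvents.on('write', function (data) {
    logLines.push(data + ' was written to output');
  });
});

['first', 'second', 'third', 'fourth'].forEach(function (data) {
  if (data === 'second') {
    errorEvents.emit('write', new Error('Start debugging'));
    return;
  }
  outputEvents.emit('write', data);
});

console.log(logLines.join('\n'));
```

As in the output shown above, nothing is logged for the item written before the error, because no listener was attached yet.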
The core components of datapumps are only responsible for passing data in a flow-controlled manner. The features required for import, export or transfer are provided by mixins:
- BatchMixin - Processes input in batches. Useful with MysqlMixin or other database writing mixins (batch insert can be much faster than inserting one by one).
- MergeMixin - Enables pump to read from multiple input buffers.
- ObjectTransformMixin - Common object transformation and validation methods
- CsvWriterMixin - Writes csv files using fast-csv package
- ExcelWriterMixin - Writes excel xlsx workbooks
- ExcelReaderMixin - Reads excel xlsx workbooks
- MysqlMixin - Queries and writes mysql databases
- MongodbMixin - Queries and writes mongodb
- RestMixin - Interact with REST services
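As a rough picture of what a mixin does, the sketch below shows a function that adds a method to a target object, with batching as the added feature. It is plain Node.js under assumptions and does not reproduce the datapumps mixin API: `createTarget`, `SketchBatchMixin` and `toBatches` are hypothetical names.

```javascript
// Hypothetical target with a chainable mixin() method.
function createTarget() {
  return {
    mixin: function (applyMixin) {
      applyMixin(this);
      return this;
    }
  };
}

// Hypothetical mixin factory: takes an option and returns a function that
// adds a toBatches() method to the target.
function SketchBatchMixin(size) {
  return function (target) {
    target.toBatches = function (items) {
      const batches = [];
      for (let i = 0; i < items.length; i += size) {
        batches.push(items.slice(i, i + size));
      }
      return batches;
    };
  };
}

const target = createTarget().mixin(SketchBatchMixin(2));
const batches = target.toBatches(['a', 'b', 'c', 'd', 'e']);

console.log(batches);
```

Grouping items this way is what lets a database writer issue one insert per batch instead of one per item.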
When you implement new mixins, please fork datapumps and make a pull request.