forked from awslabs/lambda-streams-to-firehose
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransformer.js
120 lines (105 loc) · 3.71 KB
/
transformer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/**
* Example transformer that adds a newline to each event
*
* Args:
*
* data - Object or string containing the data to be transformed
*
* callback(err, Buffer) - callback to be called once transformation is
* completed. If supplied callback is with a null/undefined output (such as
* filtering) then nothing will be sent to Firehose
*/
var async = require('async');
require('./constants');
function addNewlineTransformer(data, callback) {
// emitting a new buffer as text with newline
callback(null, new Buffer(data + "\n", targetEncoding));
};
exports.addNewlineTransformer = addNewlineTransformer;
/** Convert JSON data to its String representation */
function jsonToStringTransformer(data, callback) {
// emitting a new buffer as text with newline
callback(null, new Buffer(JSON.stringify(data) + "\n", targetEncoding));
};
exports.jsonToStringTransformer = jsonToStringTransformer;
/** literally nothing at all transformer - just wrap the object in a buffer */
function doNothingTransformer(data, callback) {
// emitting a new buffer as text with newline
callback(null, new Buffer(data));
};
exports.doNothingTransformer = doNothingTransformer;
//DOGADDED
var zlib = require('zlib');
/**
* Example transformer that converts a regular expression to delimited text
*/
function regexToDelimiter(regex, delimiter, data, callback) {
var tokens = JSON.stringify(data).match(regex);
if (tokens) {
// emitting a new buffer as delimited text whose contents are the regex
// character classes
callback(null, new Buffer(tokens.slice(1).join(delimiter) + "\n"));
} else {
callback("Configured Regular Expression does not match any tokens", null);
}
};
exports.regexToDelimiter = regexToDelimiter;
//
// example regex transformer
// var transformer = exports.regexToDelimiter.bind(undefined, /(myregex) (.*)/,
// "|");
function transformRecords(serviceName, transformer, userRecords, callback) {
async.map(userRecords, function(userRecord, userRecordCallback) {
if (serviceName === KINESIS_SERVICE_NAME){
new Buffer(userRecord.data, 'base64').toString(targetEncoding)
var rdata=new Buffer(userRecord.data,"base64")
zlib.unzip(rdata, function (err, dataItem) {
transformer.call(undefined, dataItem, function(err, transformed) {
if (err) {
console.log(JSON.stringify(err));
userRecordCallback(err);
} else {
if (transformed && transformed instanceof Buffer) {
// call the map callback with the
// transformed Buffer decorated for use as a
// Firehose batch entry
userRecordCallback(null, transformed);
} else {
// don't know what this transformed
// object is
userRecordCallback("Output of Transformer was malformed. Must be instance of Buffer or routable Object");
}
}
});
});
}else{
var dataItem=userRecord;
transformer.call(undefined, dataItem, function(err, transformed) {
if (err) {
console.log(JSON.stringify(err));
userRecordCallback(err);
} else {
if (transformed && transformed instanceof Buffer) {
// call the map callback with the
// transformed Buffer decorated for use as a
// Firehose batch entry
userRecordCallback(null, transformed);
} else {
// don't know what this transformed
// object is
userRecordCallback("Output of Transformer was malformed. Must be instance of Buffer or routable Object");
}
}
});
}
}, function(err, transformed) {
// user records have now been transformed, so call
// errors or invoke the transformed record processor
if (err) {
callback(err);
} else {
callback(null, transformed);
}
});
};
exports.transformRecords = transformRecords;