This repository has been archived by the owner on Apr 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.js
148 lines (128 loc) · 3.62 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
const { ServiceBroker } = require("moleculer");
const { loadConfig } = require("./config");
const { death, nodeid, exit, uuid, pipeline, to, logger, shortname, sleep } = require("./utils");
const s3 = require("./s3");
const fs = require("fs");
// variables
let running = false;
let exiting = false;
// create broker
const config = {
...loadConfig("worker"),
}
config.nodeID = nodeid(config.nodeID);
const broker = new ServiceBroker(config);
// service:
broker.createService({
name: "worker",
events: {
async "worker.wakeup"() {
logger.debug("wakeup");
run();
}
},
});
async function success(task) {
let err;
task.result = "success";
task.hostname = shortname();
logger.info("success:", task);
[err] = await to(broker.call("controller.updateTask", task));
if (err) { logger.error(err); }
}
async function failure(task, error) {
let err;
task.result = "failure";
task.hostname = shortname();
task.error = error.message || error.code || error;
logger.info("failure:", task);
[err] = await to(broker.call("controller.updateTask", task));
if (err) { logger.error(err); }
}
// background job
async function run() {
logger.debug("run called");
// return if already running - placed here are run is called async'd
if (running) return;
// starting loop
logger.debug("run loop started");
running = true;
while (!exiting) {
let err, task;
// get a task to process
[err, task] = await to(broker.call("controller.pullTask"));
if (err) { logger.error(err); }
// if no task, just go to sleep
if (task == null) {
running = false;
logger.debug("run loop stopped");
return;
}
// if task found, process it
logger.info("task:", task);
let input;
let tempName = uuid();
const tempInput = `/tmp/${tempName}.in`;
const tempOutput = `/tmp/${tempName}.out`;
// get stream from s3
if (!err) {
[err, input] = await to(s3.readFile(task.input));
}
// save stream to tempInput
if (!err) {
[err] = await to(pipeline(
input,
fs.createWriteStream(tempInput),
));
}
// run something
if (!err) {
try {
// if (task.name.endsWith("#1")) throw new Error("stop");
await sleep(4000 + 2000*Math.random());
[err] = await to(pipeline(
fs.createReadStream(tempInput),
fs.createWriteStream(tempOutput),
));
} catch (e) {
err = e;
}
}
// save file to s3 from tempOutput
if (!err) {
[err] = await to(s3.writeFile(fs.createReadStream(tempOutput), task.output));
}
// remove temp files
fs.unlink(tempInput, () => { });
fs.unlink(tempOutput, () => { });
// return result async, so we can start next task asap
if (err) {
failure(task, err);
logger.error(err);
} else {
success(task);
}
}
// exiting
exit(5000);
await broker.stop();
process.exit();
}
// start
async function startup() {
await broker.start();
try {
run();
} catch (e) {
console.log(e);
}
setInterval(run, 1000);
}
startup();
// SIGINT
death((_, err) => {
exit(1000);
if (err) { logger.error(err); }
if (broker != null) logger.info("Exiting, waiting for current process to finish");
exiting = true;
});