-
Notifications
You must be signed in to change notification settings - Fork 1
/
subprocess.hpp
556 lines (493 loc) · 19.9 KB
/
subprocess.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
#pragma once
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstring>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <list>
#include <stdexcept>
#include <string>
#include <system_error>
#include <tuple>
#include <utility>
#include <vector>
// unix process stuff
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <sys/prctl.h>
#include <sys/wait.h>
#include <unistd.h>
namespace subprocess {
namespace internal {
/**
 * A TwoWayPipe connects a parent process with a forked child through two
 * anonymous pipes: one carrying data parent -> child (the child's stdin) and
 * one carrying data child -> parent (the child's stdout and stderr).
 *
 * Usage: call initialize() before fork(); afterwards the child calls
 * setAsChildEnd() and the parent calls setAsParentEnd().
 *
 * Both pipes are created with O_CLOEXEC so that programs exec'd by other
 * threads never inherit them (an inherited write end would stop our child from
 * ever seeing EOF). Only the copies that setAsChildEnd() installs on
 * stdin/stdout/stderr survive the child's exec.
 *
 * Not thread-safe as a whole. Writing (writeP/closeOutput) and reading
 * (readLine/canReadLine) touch disjoint members, so one writer thread and one
 * reader thread may run concurrently.
 *
 * NOTE(review): the parent's read end is never closed. There is no destructor
 * because this class is copyable and copies share descriptors, so each
 * TwoWayPipe leaks one descriptor in the parent.
 * */
class TwoWayPipe {
private:
    // [0] is the read end and [1] is the write end of each pipe; -1 means
    // "not open". After setAsChildEnd() the two arrays are swapped, so in both
    // processes input_* is read from and output_* is written to.
    int input_pipe_file_descriptor[2] = {-1, -1};
    int output_pipe_file_descriptor[2] = {-1, -1};
    std::string internalBuffer;
    bool inStreamGood = true;
    bool endSelected = false;
    bool initialized = false;
    // position in internalBuffer up to which no '\n' exists (avoids rescanning)
    size_t currentSearchPos = 0;

    /** Closes `fd` if it is open and marks it closed, so it is never closed twice. */
    static void closeFd(int& fd) {
        if (fd >= 0) {
            ::close(fd);
            fd = -1;
        }
    }

    /**
     * Makes `target` refer to the same open pipe as `source` and keeps it open
     * across exec. dup2() is a no-op when both numbers are equal, so in that
     * case the close-on-exec flag is cleared explicitly.
     * */
    static void redirectFd(int source, int target) {
        if (source == target) {
            ::fcntl(target, F_SETFD, 0);
        } else {
            ::dup2(source, target);
        }
    }

    /**
     * Closes the ends this side does not use: the write end of the pipe we
     * read from and the read end of the pipe we write to. Without this, EOF
     * would never be observed when the other process closes its end.
     * */
    void closeUnusedEnds() {
        closeFd(input_pipe_file_descriptor[1]);
        closeFd(output_pipe_file_descriptor[0]);
    }

    /**
     * Performs one read() of up to 256 bytes and appends the bytes to
     * internalBuffer, retrying when interrupted by a signal handler.
     * @return the number of bytes appended, 0 at EOF, or -1 on error (in
     * which case inStreamGood is cleared and errno describes the failure)
     * */
    ssize_t readToInternalBuffer() {
        char buf[256];
        ssize_t bytesCounted;
        do {
            bytesCounted = ::read(input_pipe_file_descriptor[0], buf, sizeof buf);
        } while (bytesCounted < 0 && errno == EINTR);
        if (bytesCounted < 0) {
            inStreamGood = false;
            return -1;
        }
        internalBuffer.append(buf, static_cast<size_t>(bytesCounted));
        return bytesCounted;
    }

    /**
     * Polls the read end for up to wait_ms milliseconds (-1 blocks, 0 does
     * not wait), restarting the poll if a signal interrupts it.
     * @return the poll revents: POLLIN means data can be read, POLLHUP means
     * the write end has closed. 0 is returned on timeout or poll() failure.
     * */
    short inPipeState(long wait_ms) {
        pollfd fds{};
        fds.fd = input_pipe_file_descriptor[0];
        fds.events = POLLIN;
        int res;
        do {
            fds.revents = 0;
            res = ::poll(&fds, 1, static_cast<int>(wait_ms));
        } while (res < 0 && errno == EINTR);
        // TODO: report poll() failures and POLLERR/POLLNVAL to the caller
        // instead of folding them into "nothing to read".
        return fds.revents;
    }

public:
    TwoWayPipe() = default;

    /**
     * Creates both pipes. The pipe cannot be used until this is called;
     * repeated calls after a success do nothing.
     * @throws std::system_error if a pipe cannot be created (any pipe
     * already created by this call is closed first)
     * */
    void initialize() {
        if (initialized) {
            return;
        }
        if (::pipe2(input_pipe_file_descriptor, O_CLOEXEC) < 0) {
            throw std::system_error(errno, std::generic_category(), "subprocess: pipe2 failed");
        }
        if (::pipe2(output_pipe_file_descriptor, O_CLOEXEC) < 0) {
            const int err = errno;
            closeFd(input_pipe_file_descriptor[0]);
            closeFd(input_pipe_file_descriptor[1]);
            throw std::system_error(err, std::generic_category(), "subprocess: pipe2 failed");
        }
        initialized = true;
    }

    /**
     * Turns this object into the child end: swaps the roles of the two pipes,
     * connects stdin to the parent's output and stdout/stderr to the parent's
     * input, then closes the unused ends. Only uses async-signal-safe calls,
     * so it may run between fork() and exec().
     * @return false (and does nothing) if an end was already selected
     * */
    bool setAsChildEnd() {
        if (endSelected) return false;
        endSelected = true;
        // The parent's outgoing pipe is the child's incoming pipe and vice versa.
        std::swap(input_pipe_file_descriptor, output_pipe_file_descriptor);
        redirectFd(input_pipe_file_descriptor[0], STDIN_FILENO);
        redirectFd(output_pipe_file_descriptor[1], STDOUT_FILENO);
        redirectFd(output_pipe_file_descriptor[1], STDERR_FILENO);
        closeUnusedEnds();
        return true;
    }

    /**
     * Turns this object into the parent end by closing the unused ends.
     * @return false (and does nothing) if an end was already selected
     * */
    bool setAsParentEnd() {
        if (endSelected) return false;
        endSelected = true;
        closeUnusedEnds();
        return true;
    }

    /**
     * Writes the whole string to the pipe, continuing after partial writes and
     * signal interruptions.
     * @param input - the string to write
     * @return the number of bytes actually written; less than input.size()
     * only if a write error occurred (e.g. the output was already closed)
     * */
    size_t writeP(const std::string& input) {
        size_t written = 0;
        while (written < input.size()) {
            const ssize_t n = ::write(output_pipe_file_descriptor[1], input.data() + written,
                    input.size() - written);
            if (n < 0) {
                if (errno == EINTR) continue;
                break;
            }
            written += static_cast<size_t>(n);
        }
        return written;
    }

    /**
     * @return true unless a read has failed or EOF has been reached
     * */
    bool isGood() const {
        return inStreamGood;
    }

    /**
     * Reads one line from the pipe. Not thread-safe.
     * Blocks until a newline is read or the other end of the pipe is closed.
     * @return the line including its trailing '\n'. At EOF, returns whatever
     * unterminated text remains (possibly empty) and clears the buffer, so
     * later calls return "". On a read error, logs errno to std::cerr and
     * returns "".
     * */
    std::string readLine() {
        size_t firstNewLine;
        while ((firstNewLine = internalBuffer.find('\n', currentSearchPos)) == std::string::npos) {
            currentSearchPos = internalBuffer.size();
            const ssize_t bytesRead = readToInternalBuffer();
            if (bytesRead < 0) {
                std::cerr << "errno " << errno << " occurred" << std::endl;
                return "";
            }
            if (bytesRead == 0) {
                // EOF: hand out the final (unterminated) line exactly once
                inStreamGood = false;
                std::string lastLine;
                lastLine.swap(internalBuffer);
                currentSearchPos = 0;
                return lastLine;
            }
        }
        std::string line = internalBuffer.substr(0, firstNewLine + 1);
        internalBuffer.erase(0, firstNewLine + 1);
        currentSearchPos = 0;
        return line;
    }

    /**
     * Checks, waiting up to wait_ms milliseconds per poll (-1 blocks), whether
     * readLine() can return without blocking.
     * @return true if a complete line is buffered, or if the write end has
     * closed and unterminated text remains; false on timeout, on error, or
     * once the stream is exhausted (which also clears isGood())
     * */
    bool canReadLine(long wait_ms) {
        if (!inStreamGood) {
            return false;
        }
        while (true) {
            const size_t firstNewLine = internalBuffer.find('\n', currentSearchPos);
            if (firstNewLine != std::string::npos) {
                // the next readLine() will find this newline without rescanning
                currentSearchPos = firstNewLine;
                return true;
            }
            currentSearchPos = internalBuffer.size();
            const short pipeState = inPipeState(wait_ms);
            if (!(pipeState & POLLIN)) {
                if (pipeState & POLLHUP) {
                    if (internalBuffer.empty()) {
                        // write end closed and nothing left: this pipe is done
                        inStreamGood = false;
                        return false;
                    }
                    // the buffer can be read as the final string
                    return true;
                }
                // pipe is still good, it just hasn't got anything in it
                return false;
            }
            if (readToInternalBuffer() < 0) {
                // TODO: inspect errno and report the error
                return false;
            }
        }
    }

    /**
     * Closes the write end (the other process then sees EOF). Safe to call
     * more than once; later writes fail instead of hitting a reused descriptor.
     * */
    void closeOutput() {
        closeFd(output_pipe_file_descriptor[1]);
    }
};
/**
 * A Process class that wraps the creation of a separate process and gives
 * access to a TwoWayPipe connected to that process's stdin/stdout/stderr.
 * The Process is not in a valid state until start is called.
 * This class does not own the process, it merely maintains a connection:
 * destroying a Process neither kills nor reaps the child.
 * */
class Process {
    pid_t pid = -1;         // child pid; -1 when not started or already reaped
    TwoWayPipe pipe;
    int cachedStatus = -1;  // waitpid() status once the child has been reaped

public:
    Process() = default;

    /**
     * Starts a separate process with the provided command and arguments.
     * This also initializes the TwoWayPipe.
     * @param commandPath - an absolute path to the program (execv does not
     * search PATH)
     * @param argsItBegin - the begin iterator to strings that will be passed
     * as arguments
     * @param argsItEnd - the end iterator to strings that will be passed as
     * arguments
     * @throws std::system_error if fork() fails; exceptions thrown by
     * TwoWayPipe::initialize() propagate unchanged
     *
     * If execv fails, the child exits with status 1, which is currently
     * indistinguishable from the program itself exiting with 1.
     * TODO: report the child's execv errno through the pipe.
     * */
    template <class InputIT>
    void start(const std::string& commandPath, InputIT argsItBegin, InputIT argsItEnd) {
        pid = -1;
        cachedStatus = -1;
        pipe.initialize();
        // Build argv before fork(): the child must not allocate. The C API is
        // not const-correct, hence the const_cast (see
        // https://stackoverflow.com/a/190208).
        std::vector<char*> cargs;
        // the process name must be first for execv
        cargs.push_back(const_cast<char*>(commandPath.c_str()));
        for (; argsItBegin != argsItEnd; ++argsItBegin) {
            cargs.push_back(const_cast<char*>((*argsItBegin).c_str()));
        }
        // must be terminated with a nullptr for execv
        cargs.push_back(nullptr);

        const pid_t parentPid = getpid();
        const pid_t forked = fork();
        if (forked < 0) {
            throw std::system_error(errno, std::generic_category(), "subprocess: fork failed");
        }
        if (forked == 0) {
            // Child. The parent may be multi-threaded (e.g. via std::async),
            // so only async-signal-safe calls are allowed until exec.
            pipe.setAsChildEnd();
            // ask the kernel to deliver SIGTERM in case the parent dies ...
            prctl(PR_SET_PDEATHSIG, SIGTERM);
            // ... and handle the race where it died before prctl took effect
            if (getppid() != parentPid) {
                _exit(1);
            }
            execv(commandPath.c_str(), cargs.data());
            // execv only returns on failure. _exit (not exit) avoids running
            // the parent's atexit handlers and flushing its duplicated stdio
            // buffers.
            _exit(1);
        }
        pid = forked;
        pipe.setAsParentEnd();
    }

    /**
     * Checks whether a line of output can be read without blocking.
     * @param timeout - how long to wait; any std::chrono duration. Negative
     * waits indefinitely, zero does not wait. Rounded up to whole
     * milliseconds so a short positive timeout never becomes a non-blocking
     * poll.
     * */
    template <typename Rep = long, typename Period = std::ratio<1>>
    bool isReady(std::chrono::duration<Rep, Period> timeout = std::chrono::duration<long>(0)) {
        if (timeout.count() < 0) {
            return pipe.canReadLine(-1);
        }
        return pipe.canReadLine(std::chrono::ceil<std::chrono::milliseconds>(timeout).count());
    }

    /**
     * Reads one line of the child's output (including its trailing '\n').
     * @param timeout - how long to wait for a line; negative (the default)
     * waits indefinitely
     * @return the line, or "" on timeout or once the output is exhausted
     * */
    template <typename Rep = long, typename Period = std::ratio<1>>
    std::string readLine(std::chrono::duration<Rep, Period> timeout = std::chrono::duration<long>(-1)) {
        if (isReady(timeout)) {
            return pipe.readLine();
        }
        return "";
    }

    /** Writes `input` to the child's stdin; returns what TwoWayPipe::writeP reports. */
    size_t write(const std::string& input) {
        return pipe.writeP(input);
    }

    /** Closes the child's stdin so it observes EOF. */
    void sendEOF() {
        pipe.closeOutput();
    }

    /** @return false once the child's output has failed or reached EOF */
    bool isGood() const {
        return pipe.isGood();
    }

    /**
     * Blocks until the process exits and reaps it.
     * @return the raw status word from waitpid() (decode with WIFEXITED /
     * WEXITSTATUS), or -1 if the process was never started or could not be
     * waited for. Later calls return the cached status instead of waiting on
     * a pid that may have been recycled.
     * */
    int waitUntilFinished() {
        if (pid <= 0) {
            // never started or already reaped; waitpid(<=0) would wait for
            // unrelated children
            return cachedStatus;
        }
        int status = 0;
        pid_t result;
        do {
            result = waitpid(pid, &status, 0);
        } while (result < 0 && errno == EINTR);
        if (result < 0) {
            return -1;
        }
        pid = -1;
        cachedStatus = status;
        return status;
    }
};
}
/**
 * Execute a subprocess and optionally call a function per line of stdout.
 * @param commandPath - the path of the executable to execute, e.g. "/bin/cat"
 * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"}
 * @param stdinInput - a list of inputs that will be piped into the processes' stdin
 * @param lambda - a function that is called with every line from the executed process (default NOP function)
 * @param env - a list of environment variables that the process will execute with (default nothing)
 * @return presumably the process's exit status, like the std::list overload below -- TODO confirm
 *
 * NOTE(review): declaration only. No definition of this overload (stdin as a
 * const std::vector) appears in this header; the execute() defined further
 * down takes a std::list<std::string>&. Unless a definition exists in another
 * translation unit, calls that resolve here fail at link time -- verify.
 */
int execute(const std::string& commandPath, const std::vector<std::string>& commandArgs,
const std::vector<std::string>& stdinInput,
const std::function<void(std::string)>& lambda = [](std::string) {},
const std::vector<std::string>& env = {});
/**
 * Execute a subprocess and optionally call a function per line of stdout.
 * @param commandPath - the path of the executable to execute, e.g. "/bin/cat"
 * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"}
 * @param stdinBegin - an InputIterator to provide stdin
 * @param stdinEnd - the end of the InputIterator range for stdin
 * @param lambda - a function that is called with every line from the executed process (default NOP function)
 * @param env - a list of environment variables that the process will execute with (default nothing)
 * @return presumably the process's exit status -- TODO confirm
 *
 * NOTE(review): this template is declared but never defined in this header,
 * so instantiating it cannot link unless a definition is provided elsewhere.
 */
template <class InputIt>
int execute(const std::string& commandPath, const std::vector<std::string>& commandArgs, InputIt stdinBegin,
InputIt stdinEnd, const std::function<void(std::string)>& lambda = [](std::string) {},
const std::vector<std::string>& env = {});
/**
 * Execute a subprocess and retrieve the output of the command
 * @param commandPath - the path of the executable to execute, e.g. "/bin/cat"
 * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"}
 * @param stdioInput - a list of inputs that will be piped into the processes' stdin
 * @param env - a list of environment variables that the process will execute with (default nothing)
 * @return presumably the lines the process printed -- TODO confirm
 *
 * NOTE(review): declaration only; no definition appears in this header (the
 * similarly named checkOutput() below is a different function with a
 * different signature).
 */
std::vector<std::string> check_output(const std::string& commandPath,
const std::vector<std::string>& commandArgs, const std::vector<std::string>& stdioInput,
const std::vector<std::string>& env = {});
/**
 * Execute a subprocess and retrieve the output of the command
 * @param commandPath - the path of the executable to execute, e.g. "/bin/cat"
 * @param commandArgs - the extra arguments for an executable e.g. {"argument 1", "henlo"}
 * @param stdioBegin - an InputIterator to provide stdin
 * @param stdioEnd - the end of the InputIterator range for stdin
 * @param env - a list of environment variables that the process will execute with (default nothing)
 * @return presumably the lines the process printed -- TODO confirm
 *
 * NOTE(review): this template is declared but never defined in this header.
 */
template <class InputIt>
std::vector<std::string> check_output(const std::string& commandPath,
const std::vector<std::string>& commandArgs, InputIt stdioBegin, InputIt stdioEnd,
const std::vector<std::string>& env = {});
// // TODO: what if the process terminates? consider error handling potentials...
// class ProcessStream {
// public:
// ProcessStream(const std::string& commandPath, const std::vector<std::string>& commandArgs);
// // write a line to the subprocess's stdin
// void write(const std::string& inputLine);
// // read a line and block until received (or until timeout reached)
// template<typename Rep>
// std::string read(std::chrono::duration<Rep> timeout=-1);
// // if there is a line for reading
// template<typename Rep>
// bool ready(std::chrono::duration<Rep> timeout=0);
// ProcessStream& operator<<(const std::string& inputLine);
// ProcessStream& operator>>(std::string& outputLine);
// };
/**
* Execute a process, inputting stdin and calling the functor with the stdout
* lines.
* @param commandPath - an absolute string to the program path
* @param commandArgs - a vector of arguments that will be passed to the process
* @param stringInput - a feed of strings that feed into the process (you'll typically want to end them with a
* newline)
* @param lambda - the function to execute with every line output by the process
* @return the exit status of the process
* */
int execute(const std::string& commandPath, const std::vector<std::string>& commandArgs,
std::list<std::string>& stringInput /* what pumps into stdin */,
std::function<void(std::string)> lambda) {
internal::Process childProcess;
childProcess.start(commandPath, commandArgs.begin(), commandArgs.end());
// while our string queue is working,
while (!stringInput.empty()) {
// write our input to the process's stdin pipe
std::string newInput = stringInput.front();
stringInput.pop_front();
childProcess.write(newInput);
}
childProcess.sendEOF();
// iterate over each line output by the child's stdout, and call
// the functor
std::string input;
while ((input = childProcess.readLine()).size() > 0) {
lambda(input);
}
return childProcess.waitUntilFinished();
}
/**
 * Convenience wrapper around execute(): runs the process and collects every
 * line it writes to stdout/stderr, in order.
 *
 * Marked inline because it is defined in a header. Without it, including this
 * header from two translation units produces duplicate-symbol link errors.
 *
 * @param commandPath - an absolute path to the program
 * @param commandArgs - arguments passed to the process
 * @param stringInput - strings fed to the process's stdin; execute() consumes
 * (empties) this list
 * @param status - receives the value returned by execute()
 * @return the collected output lines (each keeps its trailing '\n'; the last
 * may lack one)
 */
inline std::vector<std::string> checkOutput(const std::string& commandPath,
        const std::vector<std::string>& commandArgs,
        std::list<std::string>& stringInput /* what pumps into stdin */, int& status) {
    std::vector<std::string> collected;
    auto collect = [&collected](std::string s) { collected.push_back(std::move(s)); };
    status = execute(commandPath, commandArgs, stringInput, collect);
    return collected;
}
/* spawn the process in the background asynchronously, and return a future of the status code */
std::future<int> async(const std::string commandPath, const std::vector<std::string> commandArgs,
std::list<std::string> stringInput, std::function<void(std::string)> lambda) {
// spawn the function async - we must pass the args by value into the async lambda
// otherwise they may destruct before the execute fn executes!
// whew, that was an annoying bug to find...
return std::async(std::launch::async,
[&](const std::string cp, const std::vector<std::string> ca, std::list<std::string> si,
std::function<void(std::string)> l) { return execute(cp, ca, si, l); },
commandPath, commandArgs, stringInput, lambda);
}
/**
 * Runs a process, writes all of `stringInput` to its stdin on construction
 * (then sends EOF), and exposes the lines it writes to stdout/stderr as an
 * input range:
 *
 *     std::list<std::string> in{"a\n", "b\n"};
 *     subprocess::ProcessStream ps("/bin/cat", {}, in);
 *     for (const std::string& line : ps) { ... }
 *
 * Each line keeps its trailing '\n' (the final line may lack one). Iteration
 * ends at the first empty read, i.e. when the output is exhausted. Reads
 * block until a line is available.
 *
 * NOTE(review): the constructor writes all input before any output is read.
 * If the child produces more output than the pipe buffer holds while it is
 * still consuming input, the constructor deadlocks, so keep inputs small.
 * TODO: share the stdin-feeding code with execute() instead of duplicating it.
 * */
class ProcessStream {
    internal::Process childProcess;

public:
    /**
     * @param commandPath - an absolute path to the program
     * @param commandArgs - arguments passed to the process
     * @param stringInput - strings fed to stdin; consumed (emptied) by the constructor
     */
    ProcessStream(const std::string& commandPath, const std::vector<std::string>& commandArgs,
            std::list<std::string>& stringInput) {
        childProcess.start(commandPath, commandArgs.begin(), commandArgs.end());
        // write our input to the process's stdin pipe, emptying the queue
        while (!stringInput.empty()) {
            std::string newInput = stringInput.front();
            stringInput.pop_front();
            childProcess.write(newInput);
        }
        // now we finished chucking in the strings, send an EOF
        childProcess.sendEOF();
    }

    /**
     * Drains any unread output, then waits for the child to exit.
     * Draining matters when the caller stopped iterating early: a child
     * blocked on a full stdout pipe would otherwise never exit, and the wait
     * would hang forever.
     */
    ~ProcessStream() {
        while (!childProcess.readLine().empty()) {
        }
        childProcess.waitUntilFinished();
    }

    /**
     * Single-pass input iterator over the output lines. Every begin() call
     * consumes a line, and two non-end iterators of the same stream always
     * compare equal (only end-ness is compared).
     */
    struct iterator {
        using iterator_category = std::input_iterator_tag;
        using value_type = std::string;
        using difference_type = std::ptrdiff_t;
        using pointer = const std::string*;
        using reference = const std::string&;

        ProcessStream* ps;
        bool isFinished = false;
        // current read line of the process
        std::string cline;

        iterator(ProcessStream* ps) : ps(ps) {
            // read the first line, because nothing exists initially
            ++(*this);
        }
        // ctor for end()
        iterator(ProcessStream* ps, bool) : ps(ps), isFinished(true) {}

        const std::string& operator*() const {
            return cline;
        }
        const std::string* operator->() const {
            return &cline;
        }
        /* preincrement: read the next line; an empty read marks the end */
        iterator& operator++() {
            cline = ps->childProcess.readLine();
            if (cline.empty()) {
                isFinished = true;
            }
            return *this;
        }
        /* post increment */
        iterator operator++(int) {
            iterator old(*this);
            ++(*this);
            return old;
        }
        bool operator==(const iterator& other) const {
            return other.ps == this->ps && this->isFinished == other.isFinished;
        }
        bool operator!=(const iterator& other) const {
            return !((*this) == other);
        }
    };

    iterator begin() {
        return iterator(this);
    }
    iterator end() {
        return iterator(this, true);
    }
};
} // end namespace subprocess