diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md index 805914ebff..14a4605589 100644 --- a/doc/manuals/taskvine/index.md +++ b/doc/manuals/taskvine/index.md @@ -1512,14 +1512,12 @@ function definitions into a library task `libtask` libtask = m.create_library_from_functions("my_library", my_sum, my_mul) ``` -The library task can be further described by any of options available -to normal tasks, such as resources or additional input files: +You can optionally specify the number of functions the library can +run concurrently by setting the number of function slots (default to 1): === "Python" ```python - libtask.set_cores(1) - libtask.set_memory(2000) - libtask.set_disk(2000) + libtask.set_function_slots(4) # maximum 4 concurrent functions ``` Once complete, the library task must be `installed` in the system: diff --git a/poncho/src/poncho/library_network_code.py b/poncho/src/poncho/library_network_code.py index 0ce09dffdb..fa4f1e94af 100755 --- a/poncho/src/poncho/library_network_code.py +++ b/poncho/src/poncho/library_network_code.py @@ -18,8 +18,13 @@ def library_network_code(): import argparse import traceback import cloudpickle - - + import select + import signal + + # self-pipe to turn a sigchld signal when a child finishes execution + # into an I/O event. + r, w = os.pipe() + # This class captures how results from FunctionCalls are conveyed from # the library to the manager. # For now, all communication details should use this class to generate responses. @@ -54,101 +59,141 @@ def remote_wrapper(event): return remote_wrapper # Self-identifying message to send back to the worker, including the name of this library. - def send_configuration(config, out_pipe): + # Send back a SIGCHLD to interrupt worker sleep and get it to work. + def send_configuration(config, out_pipe_fd, worker_pid): config_string = json.dumps(config) config_cmd = f"{len(config_string)}\n{config_string}" - out_pipe.write(bytes(config_cmd, 'utf-8')) - out_pipe.flush() + os.writev(out_pipe_fd, [bytes(config_cmd, 'utf-8')]) + os.kill(worker_pid, signal.SIGCHLD) + + # Handler to sigchld when child exits. + def sigchld_handler(signum, frame): + # write any byte to signal that there's at least 1 child + os.writev(w, [b'a']) + + # Read data from worker, start function, and dump result to `outfile`. + def start_function(in_pipe_fd): + # read length of buffer to read + buffer_len = b'' + while True: + c = os.read(in_pipe_fd, 1) + if c == b'': + print('Library code: cant get length', file=sys.stderr) + exit(1) + elif c == b'\n': + break + else: + buffer_len += c + buffer_len = int(buffer_len) + + # now read the buffer to get invocation details + line = str(os.read(in_pipe_fd, buffer_len), encoding='utf-8') + function_id, function_name, function_sandbox = line.split(" ", maxsplit=2) + function_id = int(function_id) + + if function_name: + # exec method for now is fork only, direct will be supported later + exec_method = 'fork' + if exec_method == "direct": + library_sandbox = os.getcwd() + try: + os.chdir(function_sandbox) + response = cloudpickle.dumps(globals()[function_name](event)) + except Exception as e: + print(f'Library code: Function call failed due to {e}', file=sys.stderr) + sys.exit(1) + finally: + os.chdir(library_sandbox) + else: + p = os.fork() + if p == 0: + # parameters are represented as infile. + os.chdir(function_sandbox) + with open('infile', 'rb') as f: + event = cloudpickle.load(f) + + # output of execution should be dumped to outfile. + with open('outfile', 'wb') as f: + cloudpickle.dump(globals()[function_name](event), f) + os._exit(0) + elif p < 0: + print(f'Library code: unable to fork to execute {function_name}', file=sys.stderr) + result = None + success = False + reason = f'unable to fork-exec function {function_name}' + response = LibraryResponse(result, success, reason).generate() + + # return pid and function id of child process to parent. + else: + return p, function_id + else: + # malformed message from worker so we exit + print('malformed message from worker. Exiting..', file=sys.stderr) + exit(1) + return -1 + + # Send result of a function execution to worker. Wake worker up to do work with SIGCHLD. + def send_result(out_pipe_fd, task_id, worker_pid): + buff = bytes(str(task_id), 'utf-8') + buff = bytes(str(len(buff)), 'utf-8')+b'\n'+buff + os.writev(out_pipe_fd, [buff]) + os.kill(worker_pid, signal.SIGCHLD) def main(): + ppid = os.getppid() parser = argparse.ArgumentParser('Parse input and output file descriptors this process should use. The relevant fds should already be prepared by the vine_worker.') parser.add_argument('--input-fd', required=True, type=int, help='input fd to receive messages from the vine_worker via a pipe') parser.add_argument('--output-fd', required=True, type=int, help='output fd to send messages to the vine_worker via a pipe') + parser.add_argument('--worker-pid', required=True, type=int, help='pid of main vine worker to send sigchild to let it know theres some result.') args = parser.parse_args() # Open communication pipes to vine_worker. # The file descriptors are inherited from the vine_worker parent process # and should already be open for reads and writes. - # Below lines only convert file descriptors into native Python file objects. - in_pipe = os.fdopen(args.input_fd, 'rb') - out_pipe = os.fdopen(args.output_fd, 'wb') + in_pipe_fd = args.input_fd + out_pipe_fd = args.output_fd + # send configuration of library, just its name for now config = { "name": name(), } - send_configuration(config, out_pipe) + send_configuration(config, out_pipe_fd, args.worker_pid) - # A pair of pipes to communicate with the child process if needed. - read, write = os.pipe() + # mapping of child pid to function id of currently running functions + pid_to_func_id = {} - while True: - while True: - # wait for message from worker about what function to execute - try: - # get length of first buffer - # remove trailing \n - buffer_len = int(in_pipe.readline()[:-1]) - # if the worker closed the pipe connected to the input of this process, we should just exit - # stderr is already dup2'ed to send error messages to an output file that can be brought back for further analysis. - except Exception as e: - print("Cannot read message from the manager, exiting. ", e, file=sys.stderr) - sys.exit(1) + # register sigchld handler to turn a sigchld signal into an I/O event + signal.signal(signal.SIGCHLD, sigchld_handler) + + # 5 seconds to wait for select, any value long enough would probably do + timeout = 5 - # read first buffer, this buffer should contain only utf-8 chars. - line = str(in_pipe.read(buffer_len), encoding='utf-8') - function_name, event_size, function_sandbox = line.split(" ", maxsplit=2) - - if event_size: - event_size = int(event_size) - event_str = in_pipe.read(event_size) - - # load the event into a Python object - event = cloudpickle.loads(event_str) - - # see if the user specified an execution method - exec_method = event.get("remote_task_exec_method", None) - - # library either directly executes or forks to do so. - if exec_method == "direct": - library_sandbox = os.getcwd() - try: - os.chdir(function_sandbox) - response = cloudpickle.dumps(globals()[function_name](event)) - except Exception as e: - print(f'Library code: Function call failed due to {e}', file=sys.stderr) - sys.exit(1) - finally: - os.chdir(library_sandbox) - else: - p = os.fork() - - # child executes and pipes result back to parent. - if p == 0: - os.chdir(function_sandbox) - response = cloudpickle.dumps(globals()[function_name](event)) - written = 0 - buff = len(response).to_bytes(8, sys.byteorder)+response - while written < len(buff): - written += os.write(write, buff[written:]) - os._exit(0) - elif p < 0: - print(f'Library code: unable to fork to execute {function_name}', file=sys.stderr) - result = None - success = False - reason = f'unable to fork-exec function {function_name}' - response = LibraryResponse(result, success, reason).generate() - - # parent collects result and waits for child to exit. + while True: + # check if parent exits + c_ppid = os.getppid() + if c_ppid != ppid or c_ppid == 1: + exit(0) + + # wait for messages from worker or child to return + rlist, wlist, xlist = select.select([in_pipe_fd, r], [], [], timeout) + + for re in rlist: + # worker has a function, run it + if re == in_pipe_fd: + pid, func_id = start_function(in_pipe_fd) + pid_to_func_id[pid] = func_id + else: + # at least 1 child exits, reap all. + # read only once as os.read is blocking if there's nothing to read. + # note that there might still be bytes in `r` but it's ok as they will + # be discarded in the next iterations. + os.read(r, 1) + while len(pid_to_func_id) > 0: + c_pid, c_exit_status = os.waitpid(-1, os.WNOHANG) + if c_pid > 0: + send_result(out_pipe_fd, pid_to_func_id[c_pid], args.worker_pid) + del pid_to_func_id[c_pid] + # no exited child to reap, break else: - response_len = b'' - while len(response_len) < 8: - response_len += os.read(read, 8-len(response_len)) - response_len = int.from_bytes(response_len, sys.byteorder) - response = b'' - while len(response) < response_len: - response += os.read(read, response_len-len(response)) - os.waitpid(p, 0) - - out_pipe.write(bytes(str(len(response)), 'utf-8')+b'\n'+response) - out_pipe.flush() + break return 0 diff --git a/taskvine/src/worker/vine_process.c b/taskvine/src/worker/vine_process.c index 6f73b2d8c7..6b5e54c429 100644 --- a/taskvine/src/worker/vine_process.c +++ b/taskvine/src/worker/vine_process.c @@ -49,10 +49,6 @@ See the file COPYING for details. extern char *workspace; -static char *vine_process_invoke_function(struct vine_process *library_process, const char *function_name, - const char *function_input, const int function_input_length, const char *sandbox_path, - int *function_output_len); - /* Give the letter code used for the process sandbox dir. */ @@ -204,29 +200,6 @@ static void set_resources_vars(struct vine_process *p) } } -static char *load_input_file(struct vine_task *t, int *len) -{ - FILE *fp = fopen("infile", "r"); - if (!fp) { - fatal("coprocess could not open file 'infile' for reading: %s", strerror(errno)); - } - - fseek(fp, 0L, SEEK_END); - size_t fsize = ftell(fp); - fseek(fp, 0L, SEEK_SET); - - char *buf = malloc(fsize); - - int bytes_read = full_fread(fp, buf, fsize); - if (bytes_read < 0) { - fatal("error reading file: %s", strerror(errno)); - } - - fclose(fp); - *len = fsize; - return buf; -} - /* After a process exit has been observed, record the completion in the process structure. */ @@ -248,12 +221,6 @@ static void vine_process_complete(struct vine_process *p, int status) p->pid, p->exit_code); } - - /* If this is a completed function, then decrease the number of funcs on that library. */ - - if (p->type == VINE_PROCESS_TYPE_FUNCTION) { - p->library_process->functions_running--; - } } /* @@ -310,19 +277,12 @@ pid_t vine_process_execute(struct vine_process *p) } else { /* For other task types, read input from null and send output to assigned file. */ input_fd = open("/dev/null", O_RDONLY); - if (p->type == VINE_PROCESS_TYPE_FUNCTION) { - char *output_file = string_format("%s/outfile", p->sandbox); - output_fd = open(output_file, O_WRONLY | O_TRUNC | O_CREAT, 0777); - error_fd = open(p->output_file_name, O_WRONLY | O_TRUNC | O_CREAT, 0777); - free(output_file); - } else { - output_fd = open(p->output_file_name, O_WRONLY | O_TRUNC | O_CREAT, 0777); - if (output_fd < 0) { - debug(D_VINE, "Could not open worker stdout: %s", strerror(errno)); - return 0; - } - error_fd = output_fd; + output_fd = open(p->output_file_name, O_WRONLY | O_TRUNC | O_CREAT, 0777); + if (output_fd < 0) { + debug(D_VINE, "Could not open worker stdout: %s", strerror(errno)); + return 0; } + error_fd = output_fd; } /* Start the performance clock just prior to forking the task. */ @@ -337,11 +297,6 @@ pid_t vine_process_execute(struct vine_process *p) debug(D_VINE, "started task %d pid %d: %s", p->task->task_id, p->pid, p->task->command_line); - /* If we just started a function, increase the number assigned to this library. */ - if (p->type == VINE_PROCESS_TYPE_FUNCTION) { - p->library_process->functions_running++; - } - /* If we just started a library, then retain links to communicate with it. */ if (p->type == VINE_PROCESS_TYPE_LIBRARY) { @@ -382,33 +337,6 @@ pid_t vine_process_execute(struct vine_process *p) printf("The sandbox dir is %s", p->sandbox); fatal("could not change directory into %s: %s", p->sandbox, strerror(errno)); } - - /* In the special case of a function-call-task, just load data, communicate with the library, and exit. - */ - - if (p->type == VINE_PROCESS_TYPE_FUNCTION) { - - change_process_title("vine_worker [function]"); - - // load data from input file - int input_len = 0; - char *input = load_input_file(p->task, &input_len); - - int output_len = 0; - - // communicate with library to invoke the function - char *output = vine_process_invoke_function(p->library_process, - p->task->command_line, - input, - input_len, - p->sandbox, - &output_len); - - // write data to output file - full_write(output_fd, output, output_len); - - _exit(0); - } /* For process types other than library, set up file desciptors. * The library will use the input_fd and output_fd to talk to the manager instead. */ if (p->type != VINE_PROCESS_TYPE_LIBRARY) { @@ -451,12 +379,16 @@ pid_t vine_process_execute(struct vine_process *p) export_environment(p); /* Library task passes the file descriptors to talk to the manager via - * the command line so it requires a special execl. */ + * the command line plus the worker pid to wake the worker up + * so it requires a special execl. */ if (p->type != VINE_PROCESS_TYPE_LIBRARY) { execl("/bin/sh", "sh", "-c", p->task->command_line, (char *)0); } else { - char *final_command = string_format( - "%s --input-fd %d --output-fd %d", p->task->command_line, input_fd, output_fd); + char *final_command = string_format("%s --input-fd %d --output-fd %d --worker-pid %d", + p->task->command_line, + input_fd, + output_fd, + getppid()); execl("/bin/sh", "sh", "-c", final_command, (char *)0); } _exit(127); // Failed to execute the cmd. @@ -465,54 +397,6 @@ pid_t vine_process_execute(struct vine_process *p) return 0; } -/* -Invoke a function against a library by sending the invocation message, -and then reading back the result from the necessary pipe. -*/ - -static char *vine_process_invoke_function(struct vine_process *library_process, const char *function_name, - const char *function_input, const int function_input_len, const char *sandbox_path, - int *function_output_len) -{ - /* If this function has a valid maximum runtime, we'll wait for that long for some result. - * Otherwise wait forever. */ - int64_t max_run_time = library_process->task->resources_requested->wall_time; - time_t stoptime = LINK_FOREVER; - if (max_run_time >= 1) { - stoptime = time(0) + max_run_time; - } - - int length = function_input_len; - - /* Send the function name, length of data, and sandbox directory. - * The length of the buffer goes first (marked by `\n`) then followed - * by the buffer. */ - char *buffer = string_format("%s %d %s", function_name, length, sandbox_path); - int buffer_len = strlen(buffer); - link_printf(library_process->library_write_link, stoptime, "%d\n%s", buffer_len, buffer); - - /* Then send the function data itself. */ - link_write(library_process->library_write_link, function_input, length, stoptime); - - /* Read back function result by first reading the length of the result buffer (`\n` terminated), - * then read the result buffer. Return nothing if any error happens.*/ - char response_len_str[VINE_LINE_MAX]; - if (link_readline(library_process->library_read_link, response_len_str, VINE_LINE_MAX, stoptime) == 0) { - *function_output_len = 0; - return 0; - } - - int response_len = atoi(response_len_str); - char *line = malloc(sizeof(char) * response_len); - if (link_read(library_process->library_read_link, line, response_len, stoptime)) { - *function_output_len = response_len; - return line; - } else { - *function_output_len = 0; - return 0; - } -} - /* Non-blocking check to see if a process has completed. Returns true if complete, false otherwise. diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index b56ab6a524..9ba14a4f97 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -490,6 +490,24 @@ static void report_worker_ready(struct link *manager) send_keepalive(manager, 1); } +/* Send a message containing details of a function call to the relevant library to execute it. + * @param p The relevant vine_process structure encapsulating a function call. + * @return 1 if the message is successfully sent to the library, 0 otherwise. */ +static int start_function(struct vine_process *p) +{ + char *buffer = string_format("%d %s %s", p->task->task_id, p->task->command_line, p->sandbox); + ssize_t result = link_printf(p->library_process->library_write_link, + time(0) + active_timeout, + "%ld\n%s", + strlen(buffer), + buffer); + free(buffer); + if (result < 0) { + return 0; + } + return 1; +} + /* Start executing the given process on the local host, accounting for the resources as necessary. @@ -501,6 +519,7 @@ static int start_process(struct vine_process *p, struct link *manager) pid_t pid; struct vine_task *t = p->task; + int ok = 1; /* Create the sandbox environment for the task. */ if (!vine_sandbox_stagein(p, global_cache)) { @@ -520,19 +539,35 @@ static int start_process(struct vine_process *p, struct link *manager) vine_gpus_allocate(t->resources_requested->gpus, t->task_id); } - /* Now start the actual process. */ - pid = vine_process_execute(p); - if (pid < 0) - fatal("unable to fork process for task_id %d!", p->task->task_id); - - /* If this process represents a library, notify the manager that it is running. */ - if (p->task->provides_library) { - send_message(manager, "info library-update %d %d\n", p->task->task_id, VINE_LIBRARY_STARTED); + /* Starting a function call is different from starting a standard task, as a function call is not + * launched through vine_process_exeute. */ + if (t->needs_library) { + debug(D_VINE, "started function call %d: %s", p->task->task_id, p->task->command_line); + ok = start_function(p); + if (ok) { + p->library_process->functions_running++; + } + } else { + /* Now start the actual process. */ + pid = vine_process_execute(p); + if (pid < 0) + fatal("unable to fork process for task_id %d!", p->task->task_id); + + /* If this process represents a library, notify the manager that it is running. + * Also set the sigchld flag so the worker will immediately check for the library + * startup instead of sleeping and waiting for the manager. */ + if (p->task->provides_library) { + send_message(manager, "info library-update %d %d\n", p->task->task_id, VINE_LIBRARY_STARTED); + sigchld_received_flag = 1; + } } - itable_insert(procs_running, p->task->task_id, p); + /* Only insert process to procs_running if everything runs fine. */ + if (ok) { + itable_insert(procs_running, p->task->task_id, p); + } - return 1; + return ok; } /* @@ -557,6 +592,10 @@ static void reap_process(struct vine_process *p, struct link *manager) p->exit_code = 1; } + if (p->type == VINE_PROCESS_TYPE_FUNCTION) { + p->library_process->functions_running--; + } + itable_remove(procs_running, p->task->task_id); itable_insert(procs_complete, p->task->task_id, p); } @@ -636,6 +675,38 @@ static void expire_procs_running() } } +/* Receive a message containing a function call id from the library and + * reap the completed function call. + * @param p The vine process encapsulating the function call. + * @param manager The link to the manager. + * return 1 if the operation succeeds, 0 otherwise. */ +static int reap_completed_function_call(struct vine_process *p, struct link *manager) +{ + char buffer[VINE_LINE_MAX]; // Buffer to store length of data from library. + int ok = 1; + + /* read number of bytes of data first. */ + ok = link_readline(p->library_read_link, buffer, VINE_LINE_MAX, time(0) + active_timeout); + if (!ok) { + return 0; + } + int len_buffer = atoi(buffer); + + /* now read the buffer, which is the task id of the done function invocation. */ + char buffer_data[len_buffer]; + ok = link_read(p->library_read_link, buffer_data, len_buffer, time(0) + active_timeout); + if (ok <= 0) { + return 0; + } + uint64_t done_task_id = (uint64_t)strtoul(buffer_data, NULL, 10); + debug(D_VINE, "Received result for function %" PRIu64, done_task_id); + + /* Reap the completed function call. */ + struct vine_process *pp = (struct vine_process *)itable_lookup(procs_table, done_task_id); + reap_process(pp, manager); + return ok; +} + /* Scan over all of the processes known by the worker, and if they have exited, move them into the procs_complete table @@ -649,14 +720,33 @@ static int handle_completed_tasks(struct link *manager) ITABLE_ITERATE(procs_running, task_id, p) { - if (vine_process_is_complete(p)) { + /* Skip functions, we take care of them later by checking if the library sends any message. */ + if (p->type != VINE_PROCESS_TYPE_FUNCTION && vine_process_is_complete(p)) { /* collect the resources associated with the process */ reap_process(p, manager); /* must reset the table iterator because an item was removed. */ itable_firstkey(procs_running); } + /* Check if the library has any function invocations done, noblock. */ + if (p->library_ready) { + /* Retrieve all results. */ + int result_retrieved = 0; + while (link_usleep(p->library_read_link, 0, 1, 0)) { + result_retrieved = reap_completed_function_call(p, manager); + if (!result_retrieved) { + fatal("Cannot retrieve result for function call %d - %s", + p->task->task_id, + p->task->command_line); + } + } + if (result_retrieved) { + /* Reset iterator in case a function call is reaped. */ + itable_firstkey(procs_running); + } + } } + return 1; }