Skip to content

Commit

Permalink
Vine: Improve Serverless (#3537)
Browse files Browse the repository at this point in the history
* prelim 1

* save

* work basic

* work ready

* lint

* clean code

* correct function running count

* final fix

* format

* add docs

* fix docs
  • Loading branch information
tphung3 authored Oct 12, 2023
1 parent d455e4c commit 8b9f90d
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 223 deletions.
8 changes: 3 additions & 5 deletions doc/manuals/taskvine/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1512,14 +1512,12 @@ function definitions into a library task `libtask`
libtask = m.create_library_from_functions("my_library", my_sum, my_mul)
```

The library task can be further described by any of options available
to normal tasks, such as resources or additional input files:
You can optionally specify the number of functions the library can
run concurrently by setting the number of function slots (default to 1):

=== "Python"
```python
libtask.set_cores(1)
libtask.set_memory(2000)
libtask.set_disk(2000)
libtask.set_function_slots(4) # maximum 4 concurrent functions
```

Once complete, the library task must be `installed` in the system:
Expand Down
203 changes: 124 additions & 79 deletions poncho/src/poncho/library_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ def library_network_code():
import argparse
import traceback
import cloudpickle


import select
import signal

# self-pipe to turn a sigchld signal when a child finishes execution
# into an I/O event.
r, w = os.pipe()

# This class captures how results from FunctionCalls are conveyed from
# the library to the manager.
# For now, all communication details should use this class to generate responses.
Expand Down Expand Up @@ -54,101 +59,141 @@ def remote_wrapper(event):
return remote_wrapper

# Self-identifying message to send back to the worker, including the name of this library.
def send_configuration(config, out_pipe):
# Send back a SIGCHLD to interrupt worker sleep and get it to work.
def send_configuration(config, out_pipe_fd, worker_pid):
config_string = json.dumps(config)
config_cmd = f"{len(config_string)}\n{config_string}"
out_pipe.write(bytes(config_cmd, 'utf-8'))
out_pipe.flush()
os.writev(out_pipe_fd, [bytes(config_cmd, 'utf-8')])
os.kill(worker_pid, signal.SIGCHLD)

# Handler to sigchld when child exits.
def sigchld_handler(signum, frame):
# write any byte to signal that there's at least 1 child
os.writev(w, [b'a'])

# Read data from worker, start function, and dump result to `outfile`.
def start_function(in_pipe_fd):
# read length of buffer to read
buffer_len = b''
while True:
c = os.read(in_pipe_fd, 1)
if c == b'':
print('Library code: cant get length', file=sys.stderr)
exit(1)
elif c == b'\n':
break
else:
buffer_len += c
buffer_len = int(buffer_len)

# now read the buffer to get invocation details
line = str(os.read(in_pipe_fd, buffer_len), encoding='utf-8')
function_id, function_name, function_sandbox = line.split(" ", maxsplit=2)
function_id = int(function_id)

if function_name:
# exec method for now is fork only, direct will be supported later
exec_method = 'fork'
if exec_method == "direct":
library_sandbox = os.getcwd()
try:
os.chdir(function_sandbox)
response = cloudpickle.dumps(globals()[function_name](event))
except Exception as e:
print(f'Library code: Function call failed due to {e}', file=sys.stderr)
sys.exit(1)
finally:
os.chdir(library_sandbox)
else:
p = os.fork()
if p == 0:
# parameters are represented as infile.
os.chdir(function_sandbox)
with open('infile', 'rb') as f:
event = cloudpickle.load(f)

# output of execution should be dumped to outfile.
with open('outfile', 'wb') as f:
cloudpickle.dump(globals()[function_name](event), f)
os._exit(0)
elif p < 0:
print(f'Library code: unable to fork to execute {function_name}', file=sys.stderr)
result = None
success = False
reason = f'unable to fork-exec function {function_name}'
response = LibraryResponse(result, success, reason).generate()

# return pid and function id of child process to parent.
else:
return p, function_id
else:
# malformed message from worker so we exit
print('malformed message from worker. Exiting..', file=sys.stderr)
exit(1)
return -1

# Send result of a function execution to worker. Wake worker up to do work with SIGCHLD.
def send_result(out_pipe_fd, task_id, worker_pid):
buff = bytes(str(task_id), 'utf-8')
buff = bytes(str(len(buff)), 'utf-8')+b'\n'+buff
os.writev(out_pipe_fd, [buff])
os.kill(worker_pid, signal.SIGCHLD)

def main():
ppid = os.getppid()
parser = argparse.ArgumentParser('Parse input and output file descriptors this process should use. The relevant fds should already be prepared by the vine_worker.')
parser.add_argument('--input-fd', required=True, type=int, help='input fd to receive messages from the vine_worker via a pipe')
parser.add_argument('--output-fd', required=True, type=int, help='output fd to send messages to the vine_worker via a pipe')
parser.add_argument('--worker-pid', required=True, type=int, help='pid of main vine worker to send sigchild to let it know theres some result.')
args = parser.parse_args()

# Open communication pipes to vine_worker.
# The file descriptors are inherited from the vine_worker parent process
# and should already be open for reads and writes.
# Below lines only convert file descriptors into native Python file objects.
in_pipe = os.fdopen(args.input_fd, 'rb')
out_pipe = os.fdopen(args.output_fd, 'wb')
in_pipe_fd = args.input_fd
out_pipe_fd = args.output_fd

# send configuration of library, just its name for now
config = {
"name": name(),
}
send_configuration(config, out_pipe)
send_configuration(config, out_pipe_fd, args.worker_pid)

# A pair of pipes to communicate with the child process if needed.
read, write = os.pipe()
# mapping of child pid to function id of currently running functions
pid_to_func_id = {}

while True:
while True:
# wait for message from worker about what function to execute
try:
# get length of first buffer
# remove trailing \n
buffer_len = int(in_pipe.readline()[:-1])
# if the worker closed the pipe connected to the input of this process, we should just exit
# stderr is already dup2'ed to send error messages to an output file that can be brought back for further analysis.
except Exception as e:
print("Cannot read message from the manager, exiting. ", e, file=sys.stderr)
sys.exit(1)
# register sigchld handler to turn a sigchld signal into an I/O event
signal.signal(signal.SIGCHLD, sigchld_handler)

# 5 seconds to wait for select, any value long enough would probably do
timeout = 5

# read first buffer, this buffer should contain only utf-8 chars.
line = str(in_pipe.read(buffer_len), encoding='utf-8')
function_name, event_size, function_sandbox = line.split(" ", maxsplit=2)

if event_size:
event_size = int(event_size)
event_str = in_pipe.read(event_size)

# load the event into a Python object
event = cloudpickle.loads(event_str)

# see if the user specified an execution method
exec_method = event.get("remote_task_exec_method", None)

# library either directly executes or forks to do so.
if exec_method == "direct":
library_sandbox = os.getcwd()
try:
os.chdir(function_sandbox)
response = cloudpickle.dumps(globals()[function_name](event))
except Exception as e:
print(f'Library code: Function call failed due to {e}', file=sys.stderr)
sys.exit(1)
finally:
os.chdir(library_sandbox)
else:
p = os.fork()

# child executes and pipes result back to parent.
if p == 0:
os.chdir(function_sandbox)
response = cloudpickle.dumps(globals()[function_name](event))
written = 0
buff = len(response).to_bytes(8, sys.byteorder)+response
while written < len(buff):
written += os.write(write, buff[written:])
os._exit(0)
elif p < 0:
print(f'Library code: unable to fork to execute {function_name}', file=sys.stderr)
result = None
success = False
reason = f'unable to fork-exec function {function_name}'
response = LibraryResponse(result, success, reason).generate()

# parent collects result and waits for child to exit.
while True:
# check if parent exits
c_ppid = os.getppid()
if c_ppid != ppid or c_ppid == 1:
exit(0)

# wait for messages from worker or child to return
rlist, wlist, xlist = select.select([in_pipe_fd, r], [], [], timeout)

for re in rlist:
# worker has a function, run it
if re == in_pipe_fd:
pid, func_id = start_function(in_pipe_fd)
pid_to_func_id[pid] = func_id
else:
# at least 1 child exits, reap all.
# read only once as os.read is blocking if there's nothing to read.
# note that there might still be bytes in `r` but it's ok as they will
# be discarded in the next iterations.
os.read(r, 1)
while len(pid_to_func_id) > 0:
c_pid, c_exit_status = os.waitpid(-1, os.WNOHANG)
if c_pid > 0:
send_result(out_pipe_fd, pid_to_func_id[c_pid], args.worker_pid)
del pid_to_func_id[c_pid]
# no exited child to reap, break
else:
response_len = b''
while len(response_len) < 8:
response_len += os.read(read, 8-len(response_len))
response_len = int.from_bytes(response_len, sys.byteorder)
response = b''
while len(response) < response_len:
response += os.read(read, response_len-len(response))
os.waitpid(p, 0)

out_pipe.write(bytes(str(len(response)), 'utf-8')+b'\n'+response)
out_pipe.flush()
break
return 0
Loading

0 comments on commit 8b9f90d

Please sign in to comment.