Remote Functions

You often need some form of distributed or parallel computing when building modern applications at scale. Many Python developers' introduction to parallel computing is through the multiprocessing module. Multiprocessing is limited in its ability to handle the requirements of modern applications. These requirements include the following:

  • Running the same code on multiple cores or machines

  • Tooling to handle machine and processing failures

  • Efficiently handling large parameters

  • Easily passing information between processes

Unlike multiprocessing, Ray’s remote functions satisfy these requirements. It’s important to note that, despite its name, "remote" doesn’t necessarily refer to a separate computer; the function could be running on the same machine. What Ray provides is the mapping of function calls to the right process on your behalf: Ray takes over distributing calls to that function instead of running them in the same process. When calling remote functions, you are effectively running asynchronously[1] on multiple cores or different machines, without having to concern yourself with how or where.

In this chapter, you will learn how to create remote functions, wait for their completion, and fetch results. Once you’ve got the basics down, you will learn to compose remote functions together to create more complex operations. Before you go too far, let’s start with understanding some of what we glossed over in the previous chapter.

Understanding essentials of Ray remote functions

In the previous chapter, you learned how to create a basic Ray remote function, [sleepy_task].

When you call a remote function, it immediately returns an ObjectRef (a future), which is a reference to a remote object. Ray creates and executes the task in the background on a separate worker process and, when finished, writes the result into the original reference. You can then call ray.get on the ObjectRef to obtain the value. Note that ray.get is a blocking method that waits for the task to complete before returning the result.
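To make this concrete, here is a minimal sketch of the pattern (the function body is illustrative, not the exact [sleepy_task] code from the previous chapter):

[source,python]
----
import time

import ray

ray.init()

@ray.remote
def sleepy_task(x):
    time.sleep(x)  # stand-in for real work
    return x

# Returns immediately with an ObjectRef; the task runs on a worker process.
ref = sleepy_task.remote(1)

# Blocks until the task completes, then returns its value.
print(ray.get(ref))
----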

Remote objects in Ray

A remote object is just an object, which may be on another node. ObjectRefs are like pointers or IDs that you can use to get the value or check the status of the remote function. In addition to being created from remote function calls, ObjectRefs can also be created explicitly by using the ray.put function.

We’ll explore remote objects and their fault tolerance further in [ray_objects].
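For example, a minimal sketch of creating and resolving an ObjectRef explicitly:

[source,python]
----
import ray

# Explicitly store a value in the object store; returns an ObjectRef.
ref = ray.put(["some", "value"])

# Resolve the reference back to the underlying value.
print(ray.get(ref))
----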

Some details in the [sleepy_task] example from the previous chapter are worth understanding. The example converts the iterator to a list before passing it to ray.get. You need to do this because ray.get takes either a list of futures or an individual future.[2] ray.get waits until it has all of the objects so that it can return the list in order.

Tip

As with any remote function call, it’s important to think about the amount of work done inside each remote invocation. For example, using ray.remote to compute factorials recursively will be slower than doing it locally, since the work inside each function call is small even though the overall work can be large. The exact threshold depends on how busy your cluster is, but as a general rule, anything that executes in under a few seconds without any special resources is not worth scheduling remotely.

Remote functions lifecycle

The Ray process that invokes a remote function, called the owner, schedules the execution of the submitted task and facilitates the resolution of the returned ObjectRef to its underlying value if needed.

On task submission, the owner waits for all dependencies, i.e. `ObjectRef`s that were passed as an argument to the task, to become available before scheduling. The dependencies can be local or remote, and the owner considers the dependencies to be ready as soon as they are available anywhere in the cluster. When the dependencies are ready, the owner requests resources from the distributed scheduler to execute the task. Once resources are available, the scheduler grants the request and responds with the address of a worker that will execute the function.

At this point the owner sends the task specification over gRPC to the worker. After executing the task, the worker stores the return values. If the return values are small[3], the worker returns the values inline directly to the owner, which copies them to its in-process object store. If the return values are large, the worker stores the objects in its local shared memory store and replies to the owner indicating that the objects are now in distributed memory. This allows the owner to refer to the objects without having to fetch the objects to its local node.

When a task is submitted with an ObjectRef as its argument, the worker must resolve its value before it can start executing the task.

Tasks can end in an error. Ray distinguishes between two types of task errors:

  • Application-level. This is any scenario where the worker process is alive, but the task ends in an error. For example, a task that throws an IndexError in Python.

  • System-level. This is any scenario where the worker process dies unexpectedly. For example, a process that segfaults, or if the worker’s local raylet dies.

Tasks that fail due to application-level errors are never retried. The exception is caught and stored as the return value of the task. Tasks that fail due to system-level errors may be automatically retried up to a specified number of attempts. This is covered in more detail in [fault_tolerance].
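For example, here is a sketch of an application-level error being stored as the task’s result and re-raised when you fetch it (the function is illustrative):

[source,python]
----
import ray

@ray.remote
def faulty(i):
    return [0, 1][i]  # raises IndexError for i > 1

ref = faulty.remote(5)
try:
    # The stored exception is re-raised here, wrapped in a RayTaskError.
    ray.get(ref)
except Exception as e:
    print(f"Task failed: {e}")
----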

In our examples so far, using ray.get was fine because the futures all had the same execution time. If the execution times differ, such as when training a model on different-sized batches of data, and you don’t need all of the results at the same time, waiting on all of them can be quite wasteful. Instead of directly calling ray.get, you should use ray.wait, which returns the requested number of futures that have already completed. To see the performance difference, you will need to modify your remote function to have a variable sleep time, as in Remote function with different execution times.

Example 1. Remote function with different execution times
link:examples/ray_examples/remote/Ray-Remote.py[role=include]
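The example itself is in the book’s repository; a minimal sketch of such a function might look like this (the names are illustrative):

[source,python]
----
import time

import ray

@ray.remote
def remote_task(x):
    time.sleep(x)  # execution time depends on the input
    return x

things = list(range(10))
# Mapping the remote function over things produces a list of futures.
futures = [remote_task.remote(t) for t in things]
----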

As you recall, the example remote function sleeps based on the input argument. Since the range is in ascending order, calling the remote function on it will result in futures that complete in order. To ensure that the futures won’t complete in order, you will need to modify the list; one way to do this is by calling things.sort(reverse=True) prior to mapping your remote function over things.

To see the difference between using ray.get and ray.wait, you can write a function that collects the values from your futures with some time delay on each object to simulate business logic.

The first option, not using ray.wait, is a bit simpler and cleaner to read, as shown in Ray get without the wait, but is not recommended for production use:

Example 2. Ray get without the wait
link:examples/ray_examples/remote/Ray-Remote.py[role=include]
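A sketch of this approach, where the half-second sleep stands in for per-result business logic:

[source,python]
----
import time

import ray

def process_results(futures):
    # Blocks until *every* future has completed, even though some
    # finish much earlier than others.
    for value in ray.get(futures):
        time.sleep(0.5)  # simulated business logic
        print(value)
----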

The second option is a bit more complex, as shown in Using ray wait. This works by calling ray.wait to find the next available future and iterating until all of the futures have been completed. ray.wait returns two lists, one of the object references for completed tasks (of the size requested, which defaults to one) and another list of the rest of the object references.

Example 3. Using ray wait
link:examples/ray_examples/remote/Ray-Remote.py[role=include]
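A sketch of the incremental version using ray.wait:

[source,python]
----
import time

import ray

def process_incremental(futures):
    remaining = futures
    while remaining:
        # Returns as soon as one future (the default num_returns) is ready.
        ready, remaining = ray.wait(remaining)
        for value in ray.get(ready):
            time.sleep(0.5)  # simulated business logic
            print(value)
----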

Running these functions side by side and timing them (e.g., with the timeit module), you can see the difference in performance. It’s important to note that this performance improvement depends on how long the non-parallelized business logic (the logic in the loop) takes. If you’re just summing the results, using ray.get directly could be OK, but if you’re doing something more complex, you should use ray.wait. When we run this, we see roughly a 2x performance improvement with ray.wait. You can try varying the sleep times and see how it works out.

You may wish to specify one of a few optional parameters to ray.wait:

  • num_returns - The number of ObjectRefs for Ray to wait for completion before returning. You should set num_returns to less than or equal to the length of the input list of ObjectRefs; otherwise, the function throws an exception.[4] The default value is 1.

  • timeout - The maximum amount of time in seconds to wait before returning. This defaults to -1 (which is treated as infinite).

  • fetch_local - You can disable fetching of results by setting this to False if you are only interested in ensuring the futures are completed.

Tip

The timeout parameter is extremely important in both ray.get() and ray.wait(). If this parameter is not specified and one of your remote functions misbehaves (i.e., never completes), then ray.get()/ray.wait() will never return, and your program will block forever.[5] As a result, for any production code, we recommend that you use the timeout parameter in both to avoid deadlocks.

Ray’s get and wait functions handle timeouts slightly differently. Ray doesn’t raise an exception on ray.wait when a timeout occurs; instead, it simply returns fewer ready futures than num_returns. However, if ray.get encounters a timeout, Ray will raise a GetTimeoutError. Note that the return of the wait/get function does not mean that your remote function will be terminated; it will still run in the dedicated process. You can explicitly terminate your future (see below) if you want to release the resources.
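A sketch of the two behaviors (slow_task and the timeout values are illustrative):

[source,python]
----
import time

import ray
from ray.exceptions import GetTimeoutError

@ray.remote
def slow_task():
    time.sleep(60)
    return "done"

ref = slow_task.remote()

# ray.wait simply returns fewer ready futures than requested on timeout.
ready, not_ready = ray.wait([ref], timeout=5)

# ray.get raises GetTimeoutError instead.
try:
    ray.get(ref, timeout=5)
except GetTimeoutError:
    print("Timed out; the task itself is still running.")
----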

Tip

Since ray.wait can return results in any order, it’s essential not to depend on the order of the results. If you need to do different processing with different records (e.g., test a mix of group A and group B), you should encode this in the result (often with types).

If you have a task that does not finish in a reasonable time (e.g., a straggler), you can cancel the task using ray.cancel with the same ObjectRef used to wait/get. You can modify the previous ray.wait example to add a timeout and cancel any "bad" tasks, resulting in something like Ray wait with a timeout plus cancel.

Example 4. Ray wait with a timeout plus cancel
link:examples/ray_examples/remote/Ray-Remote.py[role=include]
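A sketch of this pattern, reusing remote_task and things from the earlier sketch (the timeout value is illustrative):

[source,python]
----
futures = [remote_task.remote(t) for t in things]
remaining = futures
while remaining:
    ready, remaining = ray.wait(remaining, timeout=10)
    if not ready:
        # Nothing finished within the timeout; cancel the stragglers.
        for straggler in remaining:
            ray.cancel(straggler)
        break
    for value in ray.get(ready):
        print(value)
----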
Warning

Canceling a task should not be part of your normal program flow. If you find yourself having to kill tasks frequently, you should investigate what’s going on. Any subsequent calls to wait or get for a canceled task are unspecified and could raise an exception or return incorrect results.

Another minor point that we skipped in the previous chapter: while the examples so far return only a single value, Ray remote functions can return multiple values, just as regular Python functions can.
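For example, a sketch using the num_returns option so that each return value gets its own ObjectRef:

[source,python]
----
import ray

@ray.remote(num_returns=2)
def min_max(values):
    return min(values), max(values)

# Two ObjectRefs come back, one per return value.
min_ref, max_ref = min_max.remote([4, 1, 7])
print(ray.get(min_ref), ray.get(max_ref))  # 1 7
----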

Fault tolerance is an important consideration for those running in a distributed environment. If the worker executing the task dies unexpectedly,[6] Ray will rerun the task (after a delay) until either the task succeeds or the maximum number of retries is exceeded. We cover fault tolerance more in [ch05].

Composition of remote Ray functions

You can make your remote functions even more powerful by composing them. The two most common methods of composition with remote functions in Ray are pipelining and nested parallelism. You can compose your functions with nested parallelism to express recursive functions. Ray also allows you to express sequential dependencies without having to block/collect the result in the driver, known as "pipelining."

You can build a pipelined function by using ObjectRefs from an earlier ray.remote call as parameters for a new remote function call. Ray will automatically fetch the ObjectRefs and pass the underlying objects to your function. This approach allows for easy coordination between the function invocations. Additionally, it minimizes data transfer: the result is sent directly to the node where the second remote function executes. A simple example of such a sequential calculation is presented in Ray pipelining/sequential remote execution with tasks dependency.

Example 5. Ray pipelining/sequential remote execution with tasks dependency
link:examples/ray_examples/remote/Ray-Remote.py[role=include]
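A sketch consistent with the description that follows (the function bodies are illustrative):

[source,python]
----
import random

import ray

@ray.remote
def generate_number(seed, limit):
    random.seed(seed)
    return random.randint(0, limit)

@ray.remote
def sum_values(v1, v2, v3):
    # Ray resolves the ObjectRef arguments before this runs,
    # so v1, v2, and v3 arrive as plain integers.
    return v1 + v2 + v3

refs = [generate_number.remote(i, 100) for i in range(3)]
print(ray.get(sum_values.remote(*refs)))
----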

This code defines two remote functions and then starts three instances of the first one. ObjectRefs for all three instances are then used as an input for the second function. In this case, Ray will wait for all three instances to complete before starting to execute sum_values. You can use this approach not only for passing data but also for expressing basic workflow-style dependencies. There is no restriction on the number of ObjectRefs you can pass, and you can also pass "normal" Python objects at the same time.

You cannot use Python structures (for example, lists, dictionaries, or classes) containing ObjectRefs in place of ObjectRefs directly. Ray only waits for and resolves ObjectRefs that are passed directly to a function. If you attempt to pass a structure, you will have to do your own ray.wait + ray.get inside of the function. Here is a variation of the code from Ray pipelining/sequential remote execution with tasks dependency that does not work, shown in Broken sequential remote function execution with tasks dependency:

Example 6. Broken sequential remote function execution with tasks dependency
link:examples/ray_examples/remote/Ray-Remote.py[role=include]
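A sketch of the broken variant, mirroring the previous sketch:

[source,python]
----
@ray.remote
def sum_list(values):
    # values arrives as a list of ObjectRefs; Ray does not look
    # inside the list to resolve them.
    return sum(values)

refs = [generate_number.remote(i, 100) for i in range(3)]
# Fails with TypeError: unsupported operand type(s) for +:
# 'int' and 'ray._raylet.ObjectRef'
ray.get(sum_list.remote(refs))
----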

The Broken sequential remote function execution with tasks dependency example has been modified from Ray pipelining/sequential remote execution with tasks dependency to take a list of ObjectRefs as a parameter instead of ObjectRefs themselves. Ray does not "look inside" any structure being passed in. As such, the function is invoked immediately, and since the types won’t match, it fails with the error TypeError: unsupported operand type(s) for +: 'int' and 'ray._raylet.ObjectRef'. You could fix this error by using ray.wait + ray.get inside the function, but this would still launch the function too early, resulting in unnecessary blocking.

The other composition approach is nested parallelism, where your remote function launches additional remote functions. This can be very useful in many cases, including recursive algorithms, combining hyperparameter tuning with parallel model training,[7] and more. Let’s take a look at two different ways you can implement nested parallelism, shown in Nested parallelism implementation:

Example 7. Nested parallelism implementation
link:examples/ray_examples/remote/Ray-Remote.py[role=include]
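A sketch consistent with the description below (the seeds and ranges are illustrative):

[source,python]
----
import random

import ray

@ray.remote
def generate_numbers(seed, limit):
    random.seed(seed)
    return random.randint(0, limit)

@ray.remote
def remote_objrefs():
    # Launch remote functions from inside a remote function and hand
    # the unresolved ObjectRefs back to the caller.
    return [generate_numbers.remote(n, 100) for n in range(4)]

@ray.remote
def remote_values():
    # Launch remote functions, wait for their values here, and return
    # plain results to the caller.
    refs = [generate_numbers.remote(n, 100) for n in range(4)]
    return ray.get(refs)
----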

This code defines three different remote functions:

  • generate_numbers is a simple function that generates random numbers

  • remote_objrefs invokes several remote functions and returns the resulting ObjectRefs

  • remote_values invokes several remote functions, waits for their completion, and returns the resulting values

As you can see from this example, nested parallelism allows for two different approaches. In the first case (remote_objrefs), you return all of the ObjectRefs to the invoker of the aggregating function, and the invoking code is responsible for waiting for the remote functions’ completion and processing the results. In the second case (remote_values), the aggregating function itself waits for all of the remote functions to complete and returns the actual execution results.

Returning all of the ObjectRefs allows for more flexibility with non-sequential consumption, as described in the earlier discussion of ray.wait, but it is not suitable for many recursive algorithms. With many recursive algorithms (e.g., quicksort or factorial), there are many levels of combination steps, requiring the results to be combined at each level of recursion.

Ray remote best practices

When you are using remote functions, keep in mind that you don’t want to make them too small. If the tasks are very small, using Ray can take longer than if you used Python without Ray. The reason for this is that every task invocation has a non-trivial overhead (e.g., scheduling, data passing, inter-process communication, updating the system state). To get the real advantage from the parallel execution, you need to make sure that this overhead is negligible compared to the execution time of the function itself.[8]

As described in this chapter, one of the most powerful features of Ray remote is the ability to parallelize function execution. Once you call a remote function, the handle to the remote object (future) is returned immediately, and the invoker can continue execution either locally or with additional remote functions. If, at this point, you call ray.get(), your code will block waiting for the remote function to complete, and as a result you will have no parallelism. To ensure parallelization of your code, you should invoke ray.get() only at the point when you absolutely need the data to continue the main thread of execution. Moreover, as described above, it is recommended to use ray.wait rather than ray.get directly. Additionally, if the result of one remote function is required for the execution of other remote functions, consider using pipelining (described above) to leverage Ray’s task coordination.

When you submit parameters to remote functions, Ray does not pass them directly to the remote function; instead, it copies the parameters into object storage and then passes an ObjectRef as a parameter. As a result, if you send the same parameter to multiple remote functions, you pay a (performance) penalty for storing the same data in object storage several times; the larger the data, the larger the penalty. To avoid this, if you need to pass the same data to multiple remote functions, first put the shared data in object storage and use the resulting ObjectRef as a parameter to the function. We illustrate how to do this in [ray_objects].
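A minimal sketch of this pattern (score and the data are illustrative):

[source,python]
----
import ray

@ray.remote
def score(data, i):
    return len(data) + i  # stand-in for work over the shared data

big_data = list(range(1_000_000))

# Stored in the object store once, instead of once per invocation.
data_ref = ray.put(big_data)

refs = [score.remote(data_ref, i) for i in range(10)]
print(ray.get(refs))
----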

As we will show in [ch05], remote function invocation is done by the Raylet component. If you invoke a lot of remote functions from a single client, all of these invocations are handled by a single Raylet, and it takes that Raylet a certain amount of time to process the requests, which can delay starting all of the functions. A better approach, as described in Ray design patterns, is to use an invocation tree: nested function invocation, as described in the previous section. Basically, a client creates several remote functions, each of which, in turn, creates more remote functions, and so on. In this approach, the invocations are spread across multiple Raylets, allowing scheduling to happen faster.

Every time you define a remote function using the @ray.remote decorator, Ray exports the definition to all Ray workers, which takes time (especially if you have a lot of nodes). To reduce the number of function exports, a good practice is to define as many of your remote tasks as possible at the top level, outside of the loops and local functions that use them.
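A sketch contrasting the two placements (the functions are hypothetical):

[source,python]
----
import ray

# Avoid: each iteration defines a new remote function, which Ray must
# export to the workers again.
def run_batches_slow(batches):
    results = []
    for batch in batches:
        @ray.remote
        def process(b):
            return len(b)
        results.append(process.remote(batch))
    return ray.get(results)

# Prefer: define the remote function once at the top level and reuse it.
@ray.remote
def process_batch(b):
    return len(b)

def run_batches(batches):
    return ray.get([process_batch.remote(batch) for batch in batches])
----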

Bringing It Together With An Example

Machine learning models composed of other models, e.g. ensemble models, are well suited to evaluation with Ray. The Ensemble Example shows what it looks like to use Ray’s function composition for a hypothetical spam model for web links.

Example 8. Ensemble Example
link:examples/ray_examples/remote/Ray-Remote.py[role=include]
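A sketch of the idea, with hypothetical model functions standing in for real scoring code:

[source,python]
----
import time

import ray

@ray.remote
def fast_model(link):
    time.sleep(0.5)  # stand-in for a quick heuristic
    return 0.2

@ray.remote
def medium_model(link):
    time.sleep(1)
    return 0.5

@ray.remote
def slow_model(link):
    time.sleep(2)  # stand-in for a heavier model
    return 0.8

link = "http://example.com"
# All three evaluations run in parallel; total wall time is roughly
# that of the slowest model (~2s), not the sum (~3.5s).
scores = ray.get([m.remote(link) for m in (fast_model, medium_model, slow_model)])
is_spam = sum(scores) / len(scores) > 0.5
----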

By using Ray, instead of paying the sum of the time to evaluate all of the models, you only need to wait for the slowest model; all of the models that finish faster are effectively "free." For example, if the models took equal lengths of time to run, evaluating them serially, without Ray, would take almost three times as long.

Conclusion

In this chapter, you learned about a fundamental Ray feature: invoking remote functions and using them to create parallel, asynchronous execution of Python across multiple cores and machines. You also learned multiple approaches for waiting on remote function completion and how to use ray.wait to prevent deadlocks in your code.

Finally, you learned about remote function composition and how you can use it for rudimentary execution control (mini workflows). You also learned how to implement nested parallelism, where you invoke several functions in parallel, with each of those functions in turn invoking more parallel functions. In the next chapter, you will learn how to manage state in Ray using actors.


1. A fancy way of saying running multiple things at the same time without waiting on each other.
2. Ray does not "go inside" classes or structures to resolve futures, so if you have a list of lists of futures, or a class containing a future, Ray will not resolve the "inner" future.
3. Less than 100KiB by default.
4. Currently if the list of ObjectRefs passed in is empty Ray treats it as a special case, and returns immediately regardless of the value of num_returns.
5. If you’re working interactively, you can fix this with a SIGINT or the stop button in Jupyter.
6. Either because the process crashed or because the machine failed.
7. You can then train multiple models in parallel and train each of the models using data parallel gradient computations resulting in nested parallelism.
8. As an exercise, you can remove the sleep from the function in [sleepy_task], and you will see that executing remote functions on Ray takes several times longer than regular function invocation. The overhead is not constant; it depends on your network, the size of the invocation parameters, and so on. For example, if you have only small bits of data to transfer, the overhead will be lower than if you are transferring, say, the entire text of Wikipedia as a parameter.