As we’ve discussed, Ray is useful for managing resources from a single computer up to a cluster. It is simpler to get started with a local installation, which takes advantage of the parallelism of a multi-core or multi-CPU machine. Even when deploying to a cluster, you’ll want Ray installed locally for development. Once you’ve got Ray installed, we’ll show you how to make and call your first asynchronous parallelized function and store state in an actor.
Tip
|
If you are in a hurry, you can also use Gitpod on the example repo to get a web environment with the examples, or check out Anyscale’s managed Ray. |
Installing Ray, even on a single machine, can range from relatively straightforward to fairly complicated. Ray publishes wheels to PyPI on a normal release cadence, as well as nightly releases. These wheels are currently only available for x86 users, so ARM users will mostly need to build Ray from source[1].
Tip
|
M1 ARM users on macOS can use the x86 packages with Rosetta. There is some performance impact, but it’s a much simpler setup. To use the x86 packages, install Anaconda Python for macOS. |
Most users can run pip install -U ray to install Ray from PyPI automatically. When you go to distribute your computation on multiple machines, it’s easier if you’ve been working in a Conda environment, so you can match the Python version with your cluster and know your package dependencies. The commands in Install Ray inside a Conda environment set up a fresh Conda environment with Python and install Ray with some minimal dependencies:
link:examples/ray/installRay/ray_setup_on_x86.sh[role=include]
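Once the installation finishes, a quick sanity check (our own snippet, not part of the book’s example repo) is to start and stop a local Ray runtime from Python:

[source,python]
----
# Verify that Ray imports cleanly and can start a local runtime
import ray

ray.init()
print(ray.__version__)
ray.shutdown()
----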
ARM users, or users on any system architecture without a prebuilt wheel available, will need to build Ray from source.
On our ARM Ubuntu system, we need to install some additional packages, as shown below:
link:examples/ray/installRay/ray_setup_on_arm.sh[role=include]
If you are an M1 Mac user who doesn’t want to use Rosetta, you’ll need to install some dependencies. You can install them with Homebrew and pip, as shown below:
link:examples/ray/installRay/ray_setup_on_arm_osx.sh[role=include]
You need to build some of Ray’s components separately because they are written in different languages. This does make the build more complicated, but you can follow the steps in Installing the build tools for Ray’s native build toolchain:
link:examples/ray/installRay/ray_setup_on_arm.sh[role=include]
Tip
|
The slowest part of the build is compiling the C++ code, which can easily take up to an hour even on modern machines. If you have a cluster with a number of ARM machines, it’s often worth building the wheel once and reusing it across your cluster. |
Now that you have Ray installed, it’s time to learn about some of the Ray APIs. We’ll cover these APIs in more detail later, so don’t get too hung up on the details for now.
One of the core building blocks of Ray is the "remote" function, which returns a future. The term "remote" here means remote to our main process; the function can run on the same machine or a different one.
To understand this better, you can write a function that returns where it is running. Ray distributes work across multiple processes and, when in distributed mode, across multiple hosts. A local (non-Ray) version of this function is shown in A local/regular function.
link:examples/ray_examples/helloWorld/Ray-Ch2-Hello-Worlds.py[role=include]
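If you don’t have the example repo handy, a minimal sketch of such a function might look like the following (the function name hi is our own):

[source,python]
----
import os
import socket


def hi():
    # Report which host and process this function runs in
    return f"Running on {socket.gethostname()} in pid {os.getpid()}"


print(hi())
----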
You can use the ray.remote decorator to create a remote function. Calling a remote function is a bit different: it’s done by calling .remote on the function. Ray will immediately return a future when you call a remote function instead of blocking for the result, and you can use ray.get to fetch the values from those futures. To convert A local/regular function to a remote function, all you need to do is add the ray.remote decorator, as shown in Turning the previous function into a remote function.
link:examples/ray_examples/helloWorld/Ray-Ch2-Hello-Worlds.py[role=include]
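As a sketch of the remote version (again with our own function name, and assuming a local ray.init):

[source,python]
----
import os
import socket

import ray

ray.init()  # Start a local Ray runtime


@ray.remote
def remote_hi():
    # Same body as before, but Ray schedules it in another process
    return f"Running on {socket.gethostname()} in pid {os.getpid()}"


future = remote_hi.remote()  # Returns a future (ObjectRef) immediately
print(ray.get(future))       # Blocks until the result is ready
----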
When you run these two examples, you’ll see that the first is executed in the same process, and that Ray schedules the second one in another process. When we run the two examples, we get "Running on jupyter-holdenk in pid 33" and "Running on jupyter-holdenk in pid 173", respectively.
While artificial, an easy way to understand how remote futures can help is to write an intentionally slow function, in our case slow_task, and compare regular Python function calls with Ray remote calls.
link:./examples/ray_examples/helloWorld/Ray-Ch2-Hello-Worlds.py[role=include]
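A rough sketch of this comparison (our own code, assuming a running local Ray instance) might look like:

[source,python]
----
import time

import ray

ray.init()


def slow_task(x):
    time.sleep(1)  # Simulate an expensive computation
    return x


@ray.remote
def remote_slow_task(x):
    return slow_task(x)


# Regular calls run one after another: roughly 10 seconds total
start = time.time()
serial = [slow_task(i) for i in range(10)]
print(f"Serial took {time.time() - start:.1f}s")

# Remote calls all start immediately; ray.get waits on all the futures
start = time.time()
parallel = ray.get([remote_slow_task.remote(i) for i in range(10)])
print(f"Parallel took {time.time() - start:.1f}s")
----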
When you run the Using Ray to Parallelize an intentionally slow function example, you’ll see that Ray remote functions allow your code to execute multiple calls at the same time. While you can do this without Ray by using the multiprocessing module, Ray handles all of the details for you and can eventually scale up to multiple machines.
Ray is notable in the distributed processing world for allowing nested and chained tasks: launching tasks from inside other tasks can make certain kinds of recursive algorithms easier to implement. One of the more straightforward examples of nested tasks is a web crawler, in which each page we visit can launch additional tasks to visit the links on that page.
link:examples/ray_examples/helloWorld/Ray-Ch2-Hello-Worlds.py[role=include]
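To keep a sketch of this idea self-contained, we can substitute a hypothetical in-memory link graph for real HTTP fetches; the nested task structure is what matters:

[source,python]
----
import ray

ray.init()

# Hypothetical "web": page name -> pages it links to.
# A real crawler would fetch and parse actual URLs instead.
LINKS = {
    "index": ["a", "b"],
    "a": ["b", "c"],
    "b": ["c"],
    "c": [],
}


@ray.remote
def crawl(page, depth=0, max_depth=2):
    # Each task launches further tasks for the links it finds,
    # without going back through a central coordinator.
    found = {page}
    if depth < max_depth:
        children = [crawl.remote(link, depth + 1, max_depth)
                    for link in LINKS[page]]
        for pages in ray.get(children):
            found |= pages
    return found


print(ray.get(crawl.remote("index")))
----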
Many other systems require that all tasks launch on a central coordinator node. Even those that support launching tasks in a nested fashion still usually depend on a central scheduler.
Ray has a somewhat limited Dataset API for working with structured data, powered by Apache Arrow. Arrow is a column-oriented, language-independent data format, and many popular tools (such as Spark, Dask, and TensorFlow) support it, allowing easy data transfer between them and Ray.
Ray added keyed aggregations on datasets only recently, in version 1.9. The most popular distributed data example, word count, requires such aggregations. Before using them, we can perform embarrassingly parallel tasks, such as map transformations, by constructing a dataset of web pages as shown in Construct dataset of web pages.
link:examples/ray_examples/helloWorld/Ray-Ch2-Hello-Worlds.py[role=include]
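As a sketch of the idea (using made-up page contents and assuming the Ray 1.9-era Datasets API):

[source,python]
----
import ray

ray.init()

# Made-up page contents; a real version might fetch each URL
pages = ["the cat sat", "the dog sat", "the cat ran"]
ds = ray.data.from_items(pages)

# map is embarrassingly parallel: each item is transformed independently
word_lists = ds.map(lambda page: page.split())
print(word_lists.take(3))
----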
Ray 1.9 added GroupedDataset to support different kinds of aggregations. By calling groupby with either a column name or a function that returns a key, you get a GroupedDataset. GroupedDatasets have built-in support for count, max, min, and other common aggregations. You can use GroupedDatasets to extend Construct dataset of web pages into the following word-count example.
link:examples/ray_examples/helloWorld/Ray-Ch2-Hello-Worlds.py[role=include]
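A sketch of such a word count (again with made-up input, assuming the Ray 1.9-era API) might be:

[source,python]
----
import ray

ray.init()

pages = ["the cat sat", "the dog sat", "the cat ran"]

# Split each page into words, one dataset row per word
words = ray.data.from_items(pages).flat_map(lambda page: page.split())

# groupby with a key function returns a GroupedDataset;
# count() aggregates the number of rows in each group
word_counts = words.groupby(lambda word: word).count()
print(word_counts.take(10))
----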
When you need to go beyond the built-in operations, Ray supports custom aggregations provided you implement its interface. We will cover more on Datasets, including aggregate functions, in [ch09].
Note
|
Ray uses blocking evaluation for its Dataset API. This means that when you call a function on a Ray dataset, it waits until the result is complete instead of returning a future. The rest of the Ray core API uses futures. |
If you want a full-featured DataFrame API, you can convert your Ray dataset into a Dask DataFrame, as sketched below. [ch09] covers how you can use Dask for more complex operations. If you are interested in learning more about Dask, check out Holden’s book Scaling Python with Dask (O’Reilly).
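A minimal sketch of the conversion, assuming Dask is installed alongside Ray:

[source,python]
----
import ray

ray.init()

ds = ray.data.from_items(
    [{"word": "cat", "count": 2}, {"word": "dog", "count": 1}]
)

# to_dask returns a Dask DataFrame backed by the dataset's blocks
df = ds.to_dask()
print(df.head())
----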
One of the unique parts of Ray is its emphasis on actors. Actors give you tools to manage execution state, which is one of the more challenging parts of scaling systems. Actors send and receive messages, updating their state in response. These messages can come from other actors, programs, or your "main" execution thread with the Ray client. For every actor, Ray starts a dedicated process. Each actor has a mailbox of messages waiting to be processed; when you call an actor, Ray adds a message to the corresponding mailbox. This allows Ray to serialize message processing, avoiding expensive distributed locks. Actors can return values in response to messages, so when you send a message to an actor, Ray immediately returns a future that you can use to fetch the value when the actor is done.
Actors have a long history predating Ray; the actor model was introduced in 1973. The actor model is an excellent solution to concurrency with state and can replace complicated locking structures. Some other notable implementations of actors are Akka in Scala and Erlang.
The actor model can be used for everything from real-world systems like email and flight booking to IoT applications like temperature tracking. A common use case for Ray actors is managing state (e.g., model weights) while performing distributed machine learning, without requiring expensive locking.[2]
The actor model has challenges with multiple events that need to be processed in order and rolled back as a group. A classic example of this is banking, where transactions need to touch multiple accounts and be rolled back as a group.
Ray actors are created and called similarly to remote functions, but using Python classes, which gives the actor a place to store state. You can see this in action by modifying the classic "Hello World" example to greet you in sequence, as shown in Actor Hello World.
link:examples/ray_examples/helloWorld/Ray-Ch2-Hello-Worlds.py[role=include]
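A sketch of such a greeting actor (the class name HelloActor is our own) might look like:

[source,python]
----
import ray

ray.init()


@ray.remote
class HelloActor:
    def __init__(self):
        # State lives inside the actor's dedicated process
        self.count = 0

    def greet(self, name):
        self.count += 1
        return f"Hello {name}! You are greeter #{self.count}."


actor = HelloActor.remote()  # Starts the actor's process
print(ray.get(actor.greet.remote("world")))
print(ray.get(actor.greet.remote("again")))
----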
This example is fairly basic; it lacks any fault tolerance or concurrency within each actor. We’ll explore those more in [ch04].
In this chapter, you have installed Ray on your local machine and used many of its core APIs. For the most part, you can continue to run the examples we’ve picked for this book in local mode. Naturally, local mode can limit your scale or make examples take longer to run. In the next chapter, we’ll look at some of the core concepts behind Ray. One of those concepts, fault tolerance, is easier to illustrate with a cluster or cloud, so if you have access to a cloud account or a cluster, now would be an excellent time to jump over to [appB] and look at the different deployment options.