Ray Design Details

Now that you’ve created and worked with remote functions and actors, it’s time to learn what’s happening behind the scenes. In this chapter, you will learn about important distributed system concepts, like fault tolerance, how Ray manages resources, and ways to speed up your remote functions and actors. Many of these details are most important when using Ray in a distributed fashion, but even local users benefit. Having a solid grasp of how Ray is working will help you decide how and when to use it.

Fault Tolerance

Fault tolerance refers to how a system handles failures of everything from user code to the framework itself or the machines it runs on. Ray has a different fault tolerance mechanism tailored to each of its components. Like many systems, Ray cannot recover from the failure of its "head" node.[1]

Overall, Ray’s architecture (Overall Ray architecture) consists of an application layer and a system layer, both of which can handle failures.

Warning

There are some "non-recoverable" errors that exist in Ray, which you can not (at present) configure away. If the head node, global control store, or the connection between your application and the head node fails, your application will fail and not be able to be recovered by Ray. If you require fault tolerance for these situations, you will have to roll your own high availability, likely using Zookeeper or similar lower-level tools.

The system layer consists of three major components: a global control store, a distributed scheduler, and a distributed object-store. Except for the global control store, all components are horizontally scalable and fault-tolerant.

spwr 0501
Figure 1. Overall Ray architecture

At the heart of Ray’s architecture is a global control store (GCS) that maintains the entire control state of the system. Internally, the GCS is a key-value store with pubsub[2] functionality. At present, the GCS is a single point of failure and runs on the head node.

Using the GCS to centrally maintain Ray's state significantly simplifies the overall architecture by enabling the rest of the system-layer components to be stateless. This design is fundamental for fault tolerance (on failure, components simply restart and read the lineage from the GCS) and makes it easy to scale the distributed object store and scheduler independently, since all components share the needed state via the GCS.

Since remote functions do not contain any persistent state, recovering from their failure is relatively simple: Ray tries again until it succeeds or reaches a maximum number of retries. As seen in the previous chapter, you can control the number of retries through the max_retries parameter of the @ray.remote annotation. To try out and better understand Ray's fault tolerance, you can write a "flaky" remote function that fails some percentage of the time, as shown in Auto retry remote functions.

Example 1. Auto retry remote functions
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
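If you don't have the book's example files handy, a minimal sketch of such a flaky function might look like the following; the function name, 40% failure rate, and retry count are all illustrative:

import os
import random

import ray

ray.init(ignore_reinit_error=True)

@ray.remote(max_retries=5)
def flaky():
    # Kill the worker process outright some of the time; Ray treats this as a
    # system failure and automatically retries the task.
    if random.random() < 0.4:
        os._exit(1)
    return 42

# Despite occasional worker deaths, ray.get eventually returns 42.
print(ray.get(flaky.remote()))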

If your flaky function fails, you will see WARNING worker.py:1215 — A worker died or was killed while executing a task by an unexpected system error. output to stderr. You’ll still get back the correct value when you execute ray.get, demonstrating Ray’s fault tolerance.

Tip

Alternatively, to see fault tolerance in action, if you’re running a distributed Ray cluster, you can find the node running your remote function by returning the hostname and then shut down the node while running a request.

Remote actors are a more complicated case for fault tolerance because they contain state. This is why you explored the different options for persisting and recovering that state in [ch04]. Actors can experience failure at any stage: setup, message processing, or between messages.

Unlike Ray remote functions, if an actor fails while processing a message, Ray does not automatically retry it. This is true even if you have set max_restarts. Ray will restart your actor before processing the next message, and for the failed call you will get back a RayActorError exception.

Tip

Ray actors are lazily initialized, so failure during the init stage is the same as failing on the first message.

When an actor fails between messages, Ray automatically attempts to recover the actor the next time it is called, up to max_restarts times. If you've written your state recovery code well, failures between messages are generally invisible aside from slightly slower processing times. If you don't have state recovery, each restart will reset the actor to its initial values.

If your application fails, nearly all of the resources your application was using will eventually be garbage collected. The one exception is detached resources[3], which Ray will restart as configured beyond the life of your current program provided the cluster does not fail.[4]

Ray does not automatically attempt to recreate lost objects after they are first stored. You can configure Ray to try and recreate lost objects when accessed. In the next section, you’ll learn more about the Ray objects and how to configure that resiliency.

Ray Objects

Ray objects can contain anything serializable (covered in the next section), including references to other Ray objects, called ObjectRefs. An ObjectRef is essentially a unique ID that refers to a remote object and is conceptually similar to a future. Ray creates objects automatically for task results and for large arguments to actors and remote functions. You can manually create objects by calling ray.put, which returns an immediately ready ObjectRef, e.g. o = ray.put(1).

Tip

In general, small objects are initially stored in their owner's in-process store, while Ray stores large objects on the worker that generates them. This allows Ray to balance each object's memory footprint and resolution time.

The owner of an object is the worker that created the initial ObjectRef, by submitting the creating task or calling ray.put. The owner manages the lifetime of the object through reference counting.

Tip

Reference counting makes it especially important to set objects to None when you are done with them, or to make sure they go out of scope. Ray's reference counting is susceptible to circular references, where objects refer to each other. Printing the objects stored in the cluster by running ray memory --group-by STACK_TRACE can be a good way to find objects Ray cannot garbage collect.

Ray objects are immutable, meaning they cannot be modified. It's important to note that if you change an object you've read from Ray (e.g. with ray.get) or stored in Ray (e.g. with ray.put), the change won't be reflected in the object store, as shown in Immutable Ray Objects.

Example 2. Immutable Ray Objects
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
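If you are not following along with the example file, a minimal sketch of this behavior (the variable names are our own) looks like:

import ray

ray.init(ignore_reinit_error=True)

# Store a list in the object store.
ref = ray.put([1, 2, 3])

# ray.get returns a deserialized copy; mutating it does not touch the store.
local_copy = ray.get(ref)
local_copy.append(4)

print(ray.get(ref))  # Still [1, 2, 3]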

When you run the above code you can see that while you can mutate a value, the change won’t propagate to the object store.

If a parameter or return value is large and used more than once, or medium-sized and used frequently, then it can be worthwhile to store it as an object explicitly. You can then use the ObjectRef in place of the regular parameter, and it will automatically be translated for you, as shown in Use Ray Put.

Example 3. Use Ray Put
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
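As a sketch of this pattern (the array, its size, and the function name are illustrative):

import numpy as np
import ray

# A large array we expect to reuse across several tasks.
big_array = np.zeros((1024, 1024))

# Store it once; Ray hands back an ObjectRef we can pass around cheaply.
big_ref = ray.put(big_array)

@ray.remote
def column_sum(data):
    # Ray resolves the ObjectRef to the underlying array before calling us.
    return data.sum(axis=0)

# Pass the reference instead of re-serializing the array for every call.
results = ray.get([column_sum.remote(big_ref) for _ in range(4)])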

When another node needs an object, it asks the owner which nodes have copies of the object and then fetches one and creates a local copy. This means there can be many copies of the same object in the object stores of different nodes. Ray does not proactively replicate objects, so it is also possible that Ray has only one copy of an object.

By default, Ray will raise an ObjectLostError when you attempt to get a lost object. You can enable recomputation by providing enable_object_reconstruction=True to ray.init or adding --enable-object-reconstruction to ray start. This recomputation[5] will only happen once the object is needed, i.e. it is lazy on resolution.

There are two ways to lose an object. Since the owner is responsible for reference counting, if the owner is lost, the object is lost regardless of whether other copies exist. Ray also loses the object if there are no copies of it left, e.g., all the nodes storing it die.[6]

Tip

Ray will follow the max_retries limit discussed above during reconstruction.

Ray's object store uses reference-counting garbage collection[7] to clean up objects your program no longer needs. The object store keeps track of both direct references and ObjectRefs that refer to other ObjectRefs[8].

Even with garbage collection, an object store can fill up. When an object store fills up, Ray first runs garbage collection, removing objects with no references. If there is still memory pressure, the object store attempts to spill objects to disk. Spilling copies objects from memory to disk and is called spilling because it happens when memory usage overflows.

Note

Earlier versions of Ray had the ability to configure per-actor object eviction by setting an object_store_memory limit.

You may wish to fine-tune the object store settings. Depending on your use case, you may need more or less memory for the object store. You configure the object store through the _system_config settings. Two important configuration options are the minimum aggregate size to spill to disk, min_spilling_size, and the total memory allocated to the object store, object_store_memory_mb. You can set these when calling ray.init, as shown in Ray Object Store Configuration.

If you have a mixture of fast and slow disks (e.g. SSD, HDD, network), you should consider using the faster storage for spilled objects. Unlike the rest of the storage configs, you configure the spilled object storage location with a nested JSON blob. Like the rest of the object store settings, object_spilling_config is stored under _system_config. This is a bit counterintuitive, but if your machine had fast temporary storage at "/tmp/fast" you would configure Ray to use it as in Ray Object Store Configuration.

Example 4. Ray Object Store Configuration
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
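As a sketch of what such a configuration might look like: the sizes and the /tmp/fast path are illustrative, object_store_memory is the top-level ray.init parameter (in bytes) exposed by recent Ray releases, and the _system_config keys can change between Ray versions.

import json

import ray

ray.init(
    # Total memory for the object store, in bytes (illustrative size).
    object_store_memory=2 * 1024 * 1024 * 1024,
    _system_config={
        # Spill to disk in batches of at least 100 MiB.
        "min_spilling_size": 100 * 1024 * 1024,
        # Put spilled objects on the fast temporary storage mentioned above.
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/fast"}}
        ),
    },
)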

Frameworks like Ray use serialization to pass both data and functions between workers. Before Ray can transfer an object into the object store, it must first serialize the object.

Serialization/Pickling

Ray, and systems like it, depend on serialization to store and move data (and functions) between different processes[9]. Not all objects are serializable, and those that aren't cannot move between workers. In addition to the object store and IPC, fault tolerance depends on serialization, so the same restrictions apply. There are many different kinds of serialization, from multi-language data-only tools like JSON and Arrow to Python's internal pickle. Serializing with pickle is called "pickling." Pickling can handle a wider range of types than JSON but can only be used between Python processes. Pickling does not work for all objects: in some cases there is no good way to serialize the object (such as a network connection), and in others it is simply because no one has had the time to implement one.

In addition to communicating between processes, Ray also has a shared in-memory object store. This object store allows multiple processes on the same computer to share objects.

Ray uses a few different serialization techniques depending on the use case. With some exceptions, Ray’s Python libraries generally use a fork of Cloudpickle, an improved pickle. For Datasets, Ray tries to use Apache Arrow (Arrow) and will fall back to cloudpickle when Arrow does not work. Ray’s Java libraries use a variety of serializers, including "fast-serialization", and msgpack. Internally, Ray uses protocol buffers between workers. As a Ray Python developer, you will benefit the most from an in-depth understanding of the Cloudpickle and Arrow serialization tools.

Cloudpickle

Cloudpickle serializes the functions, actors, and most of the data in Ray. Most non-distributed Python code doesn’t depend on serializing functions. However, cluster computing often does require serializing functions. Cloudpickle is a project designed for cluster computing and can serialize and deserialize more functions than Python’s built-in pickle.

Tip

If you are uncertain why some data is not serializable, you can either try looking at the stack traces or use Ray’s ray.util.inspect_serializability function.
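For example, a quick check of a closure that accidentally captures a lock might look like the following sketch; inspect_serializability prints a breakdown of the offending members:

import threading

from ray.util import inspect_serializability

lock = threading.Lock()  # locks cannot be pickled

def work():
    with lock:
        return 1

# Prints a report pointing at the captured lock as the problem.
inspect_serializability(work, name="work")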

When pickling classes, Cloudpickle still uses the same extension mechanisms (getnewargs, getstate, setstate, etc.) as pickling. You can write a custom serializer if you have a class with non-serializable components, such as a database connection. While this won’t allow you to serialize things like database connections, you can instead serialize the information required to create a similar object. Custom serializer takes this approach by serializing a class containing a thread pool.

Example 5. Custom serializer
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
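The book's example wraps a thread pool; a comparable sketch using pickle's __getstate__/__setstate__ hooks (the class and attribute names are our own) might look like:

from concurrent.futures import ThreadPoolExecutor

class PooledClient:
    """Holds a thread pool, which pickle cannot serialize directly."""

    def __init__(self, pool_size=4):
        self.pool_size = pool_size
        self.pool = ThreadPoolExecutor(self.pool_size)

    def __getstate__(self):
        # Drop the unpicklable pool; keep only what we need to rebuild it.
        state = self.__dict__.copy()
        del state["pool"]
        return state

    def __setstate__(self, state):
        # Restore the plain attributes, then recreate an equivalent pool.
        self.__dict__.update(state)
        self.pool = ThreadPoolExecutor(self.pool_size)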

Alternatively, Ray allows you to register serializers for classes. This approach allows you to change the serialization of classes that are not your own, as shown in Custom serializer, external class.

Example 6. Custom serializer, external class
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
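As a sketch of this approach, assuming a hypothetical ExternalConnection class from a third-party library that holds an unpicklable socket:

import ray
from ray.util import register_serializer

class ExternalConnection:
    # Stand-in for a class we don't control.
    def __init__(self, host):
        self.host = host
        # imagine an unpicklable socket being opened here

def serialize_connection(conn):
    # Capture only the information needed to rebuild the connection.
    return conn.host

def deserialize_connection(host):
    return ExternalConnection(host)

register_serializer(
    ExternalConnection,
    serializer=serialize_connection,
    deserializer=deserialize_connection,
)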

Otherwise you would need to subclass and extend the classes, which can make your code difficult to read when working with external libraries.

Note

Cloudpickle requires that the version of Python writing the pickle and the version of Python reading it be exactly the same. This requirement carries forward, meaning that all of Ray's workers must have the same Python version.

Apache Arrow

As mentioned before, Ray uses Apache Arrow to serialize Datasets when possible. Ray Datasets can contain types that Apache Arrow does not support. Under the hood, Ray performs schema inference or translation when loading data into Datasets. If Arrow cannot represent a type, Ray serializes the dataset as lists via Cloudpickle.

Apache Arrow works with many data processing and machine learning tools, including Pandas, PySpark, TensorFlow, and Dask. Arrow is a columnar format with a strongly typed schema. It is generally more space-efficient than pickling, and it can be used not only between different versions of Python but also between programming languages (e.g. Rust, C, Java, Python, CUDA, etc.).

Note

Not all tools using Apache Arrow support all of the same data types. For example, Arrow supports nested columns, which Pandas does not.

gRPC

While you won't interact directly with gRPC (gRPC Remote Procedure Calls) in the same way you do with Cloudpickle and Arrow, it forms the foundation of communication inside Ray. gRPC uses protocol buffers for serialization, which is incredibly fast for small objects. Larger objects are serialized with Arrow or Cloudpickle and put in Ray's object store. Like Apache Arrow, gRPC and protocol buffers have native implementations in all of the languages used in Ray.

Resources/Vertical Scaling

By default Ray assumes that all functions and actors have the same resource requirements (e.g. 1 CPU). For actors or functions with different resource requirements, you can specify the resources that your function or actor needs. The scheduler will attempt to find a node that has these resources available, and if there are none, the autoscaler, covered next, will attempt to allocate a node that meets those requirements.

The ray.remote decorator takes num_cpus, num_gpus, and memory as parameters to indicate the amount of resources an actor or remote function will consume. The defaults are one CPU and zero GPUs.

Tip

When no CPU requirements are specified, the resource allocation behavior differs between remote functions and actors. For remote functions, the 1-CPU default is used for both scheduling and running. For actors, if no CPU resources are specified, Ray uses 1 CPU for scheduling and 0 CPUs for running. This means the actor cannot be scheduled on a zero-CPU node, but an infinite number of them can run on any non-zero-CPU node. If resources are specified explicitly, on the other hand, they are required for both scheduling and running. We recommend always specifying CPU resource requirements explicitly rather than relying on defaults.

You can override these defaults by specifying required resources in the @ray.remote annotation; for example, @ray.remote(num_cpus=4, num_gpus=2) will request four CPUs and two GPUs for the function's execution.

Tip

Most resource requests in Ray are "soft", which means that Ray does not enforce or guarantee the limits, but does its best to try and meet them.

If you know how much memory a task or actor requires, you can specify it in the resource requirements of its ray.remote annotation to enable memory-aware scheduling[10]. For example, @ray.remote(memory=500 * 1024 * 1024) will request 500 MiB of memory for this task[11].

Ray Memory usage

Ray memory usage is split into two main groups: memory used by Ray itself (Ray system memory) and memory used by applications (Ray application memory). Ray's system memory currently comprises:

  • Redis: memory used for storing the list of nodes and actors present in the cluster. The amount of memory used for these purposes is typically quite small.

  • Raylet: memory used by the C++ raylet process running on each node. This cannot be controlled but is typically quite small.

Ray application memory is comprised of:

  • Worker heap: memory used by your application, best measured as the resident set size (RSS) of your application minus its shared memory usage (SHR) in commands such as top.

  • Object store memory: memory used when your application creates objects in the object store via ray.put and when returning values from remote functions. Objects are evicted when they fall out of scope. There is an object store server running on each node. Objects will be spilled to disk if the object store fills up.

  • Object store shared memory: memory used when your application reads objects via ray.get.

To help you debug memory issues, Ray provides a ray memory command that can be invoked from the command line on the machine where a Ray node is running (at the time of this writing, there is no corresponding API). This command gives you a dump of all of the ObjectRef references currently held by the driver, actors, and tasks in the cluster, which lets you track down which ObjectRef references are in scope and may be causing an ObjectStoreFullError.

Ray can also keep track of and assign custom resources using the same mechanism as memory and CPU resources. When a worker process starts, it needs to know all of the resources that are present. For manually launched workers, you specify custom resources with the --resources argument. For example, on a mixed-architecture cluster, you might add --resources='{"x86": "1"}' to the x86 nodes and --resources='{"arm64": "1"}' to the ARM nodes. See [app_deploying] to learn how to configure resources with your deployment mechanism.
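Once the nodes advertise a custom resource, tasks and actors can request it just like CPUs or GPUs. Here is a sketch using the arm64 resource from the example above:

import ray

@ray.remote(resources={"arm64": 1})
def arm_only_task():
    # This task will only be scheduled on nodes that were started with the
    # arm64 custom resource.
    return "running on an arm64 node"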

Tip

These resources don't need to be limited to hardware. If you have certain libraries or datasets available only on some nodes due to licensing, you can use the same technique.

So far we've focused on horizontal scaling, but you can also use Ray to get more resources for each process. Scaling by using machines with more resources is known as vertical scaling. You can request different amounts of memory, CPU cores, or even GPUs from Ray for your tasks and actors. The default Ray configuration only supports machines of the same size, but as covered in [appB], you can create multiple node types. Creating node or container types of different sizes is how you achieve vertical scaling.

Autoscaler

One of the important components of Ray is the autoscaler, which is responsible for managing the workers. More specifically, the autoscaler is responsible for the following three functions[12]:

  • Launching new workers (based on demand), including:

    • Uploading user-defined files or directories

    • Running init/setup/start commands on the started worker

  • Terminating worker nodes if:

    • The node is idle

    • The node is failing to startup/initialize

    • The node configuration changed

  • Restarting workers if:

    • The Raylet running a worker crashes

    • The worker's setup/startup commands or file mounts change

The autoscaler creates new nodes in response to the following events[13]:

  • Cluster creation with the min-nodes configuration. In this case, the autoscaler creates the required number of nodes.

  • Resource demands. For remote functions with resource requirements, the autoscaler checks whether the cluster can satisfy the additional resource requirements and, if not, creates new worker node(s).

  • Placement groups. Similar to resource demand, for new placement groups the autoscaler checks whether the cluster has enough resources and, if not, creates new worker node(s).

  • An SDK request_resources function call, which is similar to the cluster creation request except that these resources are never released for the life of the cluster.

Ray's autoscaler works with different node/computer types, which can map to different physical instance types (for example, different AWS node types) or accelerators (for example, GPUs).

Placement Groups - Organizing your tasks & actors

Ray applications use placement groups to organize tasks as well as pre-allocate resources. Organizing tasks is sometimes important for reusing resources and increasing data locality.

Tip

Ray uses node-based data storage, so running multiple functions with large data exchange on the same node improves data locality and thus can often improve overall execution performance.

Data locality can reduce the amount of data that must be transferred and is based on the idea that it's often faster to serialize a function than your data.[14] On the flip side, placement groups can also be used to minimize the impact of hardware failure by ensuring work is spread out across many computers. Pre-allocating resources can speed up your work by allowing the autoscaler to request multiple machines before they are needed.

When you start a remote function or actor, Ray may need to start an additional node to meet its resource needs, which delays the function or actor creation. If you try to create several large functions/actors in series, Ray creates the workers sequentially, which slows down your job even more. You can force parallel allocation with Ray's placement groups, which often reduces the time spent waiting for resources.

Tip

Ray creates placement groups atomically, so if you have a minimum number of resources required before your task can run, you can also use placement groups for this effect. Note though that placement groups can experience partial restarts.

You can use placement groups for a few different purposes:

  • Pre-allocating resources

  • Gang Scheduling - ensuring that all tasks/actors will be scheduled and start at the same time.

  • Organizing your tasks and actors inside your cluster:

    • Maximizing data locality - placing all of your tasks and actors close to your data to avoid object transfer overheads.

    • Load balancing - improving application availability by placing your actors or tasks on as many different physical machines as possible.

Placement groups consist of both the desired resources for each worker, as well as the placement strategy.

Since a placement group can span multiple workers, you must specify the desired resources for each worker. Each group of resources for a worker is known as a resource bundle: a collection of resources (CPU, GPU, etc.) defined with the same arguments as other resource requests. Each resource bundle must fit on a single machine; otherwise, the autoscaler will be unable to create the required node type and the placement group will never be scheduled.

You control how Ray schedules your placement group by setting a placement strategy. Your placement strategy can either try to reduce the number of nodes (improving locality) or spread the work out more (improving reliability and load balancing). You have a few variations on these core strategies to choose from:

  • STRICT_PACK - all bundles must be placed into a single node on the cluster.

  • PACK - all provided bundles are packed onto a single node on a best-effort basis. If strict packing is not feasible, bundles can be placed onto other nodes. This is the default placement group strategy.

  • STRICT_SPREAD - each bundle must be scheduled in a separate node.

  • SPREAD - Each bundle will be spread onto separate nodes on a best effort basis. If strict spreading is not feasible, some bundles can be collocated on nodes.

Tip

Multiple remote functions or actors can be in the same resource bundle. Any functions/actors using the same bundle will always be on the same node.

The lifecycle of placement groups has the following stages:

  • Creation: a placement group creation request is sent to the Global Control Store (GCS), which calculates how to distribute the bundles and sends resource reservation requests to all the nodes. Ray guarantees that placement groups are placed atomically.

  • Allocation: placement groups are pending creation. If the existing Ray nodes can satisfy the resource requirements of the given strategy, the placement groups are allocated and success is returned. Otherwise, the result depends on whether Ray is able to add nodes. If the autoscaler is not present or the node limit is reached, placement group allocation fails and an error is returned; otherwise, the autoscaler scales the cluster to ensure that the pending groups can be allocated.

  • Node failure: when worker nodes[15] containing some bundles of a placement group die, the GCS reschedules all of those bundles onto different nodes. This means that placement group creation atomicity only applies to the initial placement; once a placement group is created, it can become partial due to node failures.

  • Cleanup: Ray automatically removes placement groups when the job that created them finishes. If you'd like to keep a placement group alive regardless of the job that created it, you should specify lifetime="detached" during placement group creation. You can also explicitly free a placement group at any time by calling remove_placement_group.

To make a placement group you will need a few extra imports, shown in Placement group imports. If you are working with Ray in local mode, it's harder to see the effect of placement groups since there is only one node. You can still group CPU-only bundles into a placement group. Once you've created the placement group, you can use options to run a function or actor in a specific bundle, as shown in CPU Only Placement group.

Example 7. Placement group imports
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
Example 8. CPU Only Placement group
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
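If you are not following along with the book's example files, a sketch of a CPU-only placement group might look like the following; the bundle sizes are illustrative, and the options-based placement arguments shown here match the Ray versions this book targets and may differ in newer releases:

import ray
from ray.util.placement_group import placement_group

ray.init(ignore_reinit_error=True)

# Two CPU-only bundles; in local mode they both land on the single node.
pg = placement_group([{"CPU": 1}, {"CPU": 1}], strategy="PACK")
ray.get(pg.ready())  # Block until the whole group has been reserved.

@ray.remote(num_cpus=1)
def in_group():
    return "scheduled inside the placement group"

# Run the task inside bundle 0 of the group.
result = ray.get(
    in_group.options(
        placement_group=pg, placement_group_bundle_index=0
    ).remote()
)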

If you are running Ray on a cluster with GPU nodes, you can create more complex placement groups. When we ran the Mixed CPU and GPU Placement group on our test cluster, the autoscaler allocated a node with a GPU. Once you're finished with your placement group, you can delete it with remove_placement_group(pg).

Example 9. Mixed CPU and GPU Placement group
link:examples/ray_examples/concepts/Ray-Concepts-Remote.py[role=include]
Ray Scheduler

Ray uses a bottom-up distributed scheduler, which consists of a global scheduler and per-node local schedulers. On task creation, the task is always first submitted to the node's local scheduler, which encourages task locality. If the local node is overloaded (its local task queue exceeds a predefined threshold) or it cannot satisfy the task's requirements (for example, it lacks a GPU), the local scheduler calls the global scheduler to take over.

The global scheduler first identifies the set of nodes that have enough resources to satisfy the task's requirements and then selects the one that provides the lowest estimated waiting time. This is the sum of the estimated time the task will be queued at that node (task queue size times average task execution time) and the estimated transfer time of the task's remote inputs (total size of remote inputs divided by average bandwidth).

Each worker sends a periodic heartbeat with resource availability and queue depth to the global scheduler. The global scheduler also has access to the location of the task’s inputs and their sizes from the GCS when deciding where to schedule. Once the global scheduler picks the node, it calls the node’s local scheduler, which schedules the task.

Additional improvements to this basic algorithm are described here and include:

  • Parallel retrieval of task arguments

  • Pre-emptive local object handling - if the object is used locally it is available even before it is available in the Global Control store

  • Taking into account node resource imbalances for the global scheduler

  • Dependency aware task scheduling

You can assign names to placement groups by specifying the parameter name="desired_name" at placement group creation. This allows you to retrieve and use the placement group from any job in the Ray cluster by name rather than passing around a placement group handle.

Namespaces

A namespace is a logical grouping of jobs and actors that provides limited isolation. By default, each Ray program runs in its own anonymous namespace, which cannot be accessed from another Ray program. To share actors between your Ray applications, you'll need to put both of your programs in the same namespace. When constructing your Ray context with ray.init, just add the namespace named parameter, e.g. ray.init(namespace="timbit").

Note

Namespaces are not intended to provide security isolation.

You can get the current namespace by calling ray.get_runtime_context().namespace.
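As a sketch, one program could create a named, detached actor in a shared namespace, and a second program could look it up by name; the namespace and actor names here are illustrative:

import ray

# Program A: join the "timbit" namespace and create a named, detached actor.
ray.init(namespace="timbit")

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def inc(self):
        self.value += 1
        return self.value

Counter.options(name="shared_counter", lifetime="detached").remote()

# Program B (a separate driver) can reach the same actor by joining the same
# namespace and looking it up by name:
#   ray.init(namespace="timbit")
#   counter = ray.get_actor("shared_counter")
#   ray.get(counter.inc.remote())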

Managing Dependencies with Runtime Environments

One of the big draws of Python is the amazing ecosystem of tools available. Ray supports managing dependencies with both conda and virtualenv. Ray dynamically creates these virtual environments inside of your larger container as needed and launches workers using the matching environment.

The fastest way to add a few packages to your runtime context is to specify a list of the packages you need from the Python Package Index (PyPI). Looking at the web scraping example from [ch02] where you used the Beautiful Soup library, you can ensure this package is available in a distributed environment by creating an execution context with it, as shown in pip package list.

Example 10. pip package list
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
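A minimal sketch of such a runtime environment (the package list is illustrative):

import ray

runtime_env = {"pip": ["bs4"]}
ray.init(runtime_env=runtime_env)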

This works well for a few dependencies, but if you've got a requirements.txt, like Holden's print the world project, you can also just point to your local requirements.txt, as shown in pip package requirements file.

Example 11. pip package requirements file
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
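A sketch of pointing the runtime environment at a requirements file (the path is illustrative):

import ray

runtime_env = {"pip": "./requirements.txt"}
ray.init(runtime_env=runtime_env)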
Tip

If you have an even more complex setup using Conda, you can make a runtime context by passing the path to your Conda environment file or package list with conda= instead of pip=.

Once you've created a runtime context, you can specify it either globally when creating your Ray client, as in Using a runtime environment for an entire program, or inside the ray.remote decorator, as in Using a runtime environment for a specific function.

Example 12. Using a runtime environment for an entire program.
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
Example 13. Using a runtime environment for a specific function.
link:examples/ray_examples/concepts/Ray-Concepts.py[role=include]
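A sketch of both approaches; the package list and the function are illustrative:

import ray

runtime_env = {"pip": ["bs4"]}

# Option 1: apply the environment to the entire program at connection time.
ray.init(runtime_env=runtime_env)

# Option 2: apply it only to a specific remote function.
@ray.remote(runtime_env=runtime_env)
def page_title(url):
    import urllib.request

    from bs4 import BeautifulSoup  # available thanks to the runtime env

    html = urllib.request.urlopen(url).read()
    return str(BeautifulSoup(html, "html.parser").title)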
Warning

Not all dependencies are well suited to the dynamically created execution context. Anything involving large native code compilation without a pre-existing wheel takes too long, for example Tensorflow on ARM.

Adding certain packages to a runtime execution context can result in a slower start and scale-up. Think, for example, of how long it takes to install TensorFlow without a wheel. If Ray had to do that each time it started another worker, it would be much slower. You can solve this by creating conda environments in your cluster or container. We discuss how to do this in [appB].

Deploying Ray Applications with the Ray Job API

In addition to connecting your job to an existing cluster with ray.init, Ray offers a job API. The job API provides a lightweight mechanism to submit jobs without having to worry about library mismatches, and it avoids the issue of flaky network connections between your client and the head node. The three main operations of the job API are:

  • Submitting a new job to the cluster, which returns a job ID

  • Getting a job's status by ID, which returns the status of the submitted job

  • Getting the execution logs for a job by ID

Why another API?

Although you can theoretically attach your program to an existing Ray cluster using the Ray client, it does not always work, especially when Ray is deployed on a Kubernetes cluster. The issue here is that the Ray node's gRPC interface uses insecure gRPC, which the majority of Kubernetes Ingress implementations do not support. To overcome this issue, Ray has introduced a new Ray job SDK that uses HTTP instead of gRPC. The job API listens on port 8265; see [appa_deploying] for more on deploying it.

A job request consists of:

  1. A directory containing a collection of files and configurations that defines an application

  2. An entrypoint for execution

  3. A runtime environment consisting of any needed files, Python libraries, environment variables, etc.

The code below shows you how to[16] run your code on a Ray cluster with the job API. Here is an example of the Ray code that we want to submit to the cluster:

import argparse
import os

import ray
import requests
import qiskit


# Parse key=value pairs passed on the command line into a dict.
class ParseKwargs(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, dict())
        for value in values:
            key, value = value.split('=')
            getattr(namespace, self.dest)[key] = value


parser = argparse.ArgumentParser()
parser.add_argument('-k', '--kwargs', nargs='*', action=ParseKwargs)
args = parser.parse_args()

numberOfIterations = int(args.kwargs["iterations"])
print(f"Requested number of iterations is: {numberOfIterations}")

print(f'Environment variable MY_VARIABLE has a value of {os.getenv("MY_VARIABLE")}')

ray.init()


@ray.remote
class Counter:
    def __init__(self):
        self.counter = 0

    def inc(self):
        self.counter += 1

    def get_counter(self):
        return self.counter


counter = Counter.remote()

for _ in range(numberOfIterations):
    ray.get(counter.inc.remote())
    print(ray.get(counter.get_counter.remote()))

print("Requests", requests.__version__)
print("Qiskit", qiskit.__version__)

In addition to the Ray code itself, the example above shows several additional things:

  • How to get parameters that can be passed during job submission

  • How to access environment variables that can be set during job submission

  • How to check the versions of libraries that are installed during job submission

With this in place you can now submit your job to the Ray cluster as follows:

import time

from ray.job_submission import JobSubmissionClient, JobStatus

client = JobSubmissionClient("<your Ray URL>")

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script_with_parameters.py --kwargs iterations=7",
    # Working directory and runtime environment for the job
    runtime_env={
        "working_dir": ".",
        "pip": ["requests==2.26.0", "qiskit==0.34.2"],
        "env_vars": {"MY_VARIABLE": "foo"}
    }
)

print(f"Submitted job with ID : {job_id}")

# Poll until the job reaches a terminal state.
while True:
    status = client.get_job_status(job_id)
    print(f"status: {status}")
    if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
        break
    time.sleep(5)

logs = client.get_job_logs(job_id)
print(f"logs: {logs}")

Conclusion

In this chapter, you've gained a deeper understanding of how Ray works. Your knowledge of serialization will help you understand which work to distribute and which to keep in the same process. You now know your different options and how to choose the right scaling technique. You have a few different techniques to manage Python dependencies, even conflicting ones, on your Ray cluster. You are well set up to learn about the higher-level building blocks covered in the next part of the book.


1. Some distributed systems can survive the failure of head nodes; these use systems such as Zookeeper and algorithms like Paxos or Raft, which rely on multiple computers to monitor and restart jobs with a voting system. If you need to handle head node failure, you can write your own recovery logic, but this is very complicated to do right; instead, a system like Spark, which has integrated job restarts, may be a better option.
2. Pubsub systems allow processes to subscribe to updates by category.
3. Like detached actors and detached placement groups.
4. This can prevent your cluster from scaling down as Ray will not release the resources.
5. This recomputation is done using information in the global control store.
6. This case is distinct since the object may be stored only on nodes different from the owner.
7. The same algorithm as Python.
8. Same cycle problem as Python.
9. These processes can be on the same node or different ones
10. Specifying a memory requirement does NOT impose any limits on memory usage. The requirements are used for admission control during scheduling only (similar to how CPU scheduling works in Ray). It is up to the task itself to not use more memory than it requested.
11. Note that Ray itself does not impose any limits on the memory usage. It is up to the task itself to not use more memory than it requested.
12. For more detailed information on the autoscaler, refer to this excellent video.
13. For more information on the worker nodes creation for different platforms refer to this documentation.
14. Systems before Ray, like Spark and Hadoop, take advantage of data locality.
15. Ray’s cluster head node is a single point of failure, so if it fails, the whole cluster will fail, as mentioned in Fault Tolerance.
16. The complete code is available on GitHub.