-
Notifications
You must be signed in to change notification settings - Fork 653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT-#7202: Use custom resources for Ray #7205
Changes from all commits
aa839bd
c3dc911
3b9e665
6bb9847
c763161
36f5d7b
ee077b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ | |
from ray._private.services import get_node_ip_address | ||
from ray.util.client.common import ClientObjectRef | ||
|
||
from modin.config import RayTaskCustomResources | ||
from modin.core.execution.ray.common import MaterializationHook, RayWrapper | ||
from modin.logging import get_logger | ||
|
||
|
@@ -155,9 +156,9 @@ def exec( | |
and self.flat_kwargs | ||
and self.num_returns == 1 | ||
): | ||
result, length, width, ip = remote_exec_func.remote( | ||
self.func, self.data, *self.args, **self.kwargs | ||
) | ||
result, length, width, ip = remote_exec_func.options( | ||
resources=RayTaskCustomResources.get() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should just call it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer to be explicit here as Ray itself calls it as custom resources - https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources. |
||
).remote(self.func, self.data, *self.args, **self.kwargs) | ||
meta = MetaList([length, width, ip]) | ||
self._set_result(result, meta, 0) | ||
return result, meta, 0 | ||
|
@@ -435,11 +436,13 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]: | |
# Prefer _remote_exec_single_chain(). It has fewer arguments and | ||
# does not require the num_returns to be specified in options. | ||
if num_returns == 2: | ||
return _remote_exec_single_chain.remote(*args) | ||
return _remote_exec_single_chain.options( | ||
resources=RayTaskCustomResources.get() | ||
).remote(*args) | ||
else: | ||
return _remote_exec_multi_chain.options(num_returns=num_returns).remote( | ||
num_returns, *args | ||
) | ||
return _remote_exec_multi_chain.options( | ||
num_returns=num_returns, resources=RayTaskCustomResources.get() | ||
).remote(num_returns, *args) | ||
|
||
def _set_result( | ||
self, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
IsRayCluster, | ||
Memory, | ||
NPartitions, | ||
RayInitCustomResources, | ||
RayRedisAddress, | ||
RayRedisPassword, | ||
StorageFormat, | ||
|
@@ -126,6 +127,7 @@ def initialize_ray( | |
"object_store_memory": object_store_memory, | ||
"_redis_password": redis_password, | ||
"_memory": object_store_memory, | ||
"resources": RayInitCustomResources.get(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a test for this case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What exactly would you like to test with this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not test the situation when this config is different from |
||
**extra_init_kw, | ||
} | ||
# It should be enough to simply set the required variables for the main process | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's good that there is now an option to limit concurrency, but this only works for Ray. Let's create an issue for the rest of the engines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This config is not generic but only specific to Ray. I am not sure if there is a way to limit concurrency for other engines. I think if we will want to have something similar for other engines, we will open an issue and explore options if they are there. Do you still think we should create an issue now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Limiting concurrency in context looks like a good feature for an advanced user. We can create a low priority issue, but now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#7211