Skip to content

Commit

Permalink
Attempt to use Ray cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
ZmeiGorynych committed Dec 18, 2024
1 parent d01c6fd commit 5dae2df
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 8 deletions.
11 changes: 8 additions & 3 deletions causaltune/optimiser.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
from causaltune.dataset_processor import CausalityDatasetProcessor
from causaltune.models.passthrough import feature_filter

# tune.run = run


# Patched from sklearn.linear_model._base to adjust rtol and atol values
def _check_precomputed_gram_matrix(X, precompute, X_offset, X_scale, rtol=1e-4, atol=1e-2):
Expand Down Expand Up @@ -489,10 +487,12 @@ def fit(
self._tune_with_config,
search_space,
metric=self.metric,
# use_ray=self.use_ray,
cost_attr="evaluation_cost",
points_to_evaluate=(init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg),
evaluated_rewards=([] if len(self.resume_scores) == 0 else self.resume_scores),
mode=("min" if self.metric in metrics_to_minimize() else "max"),
# resources_per_trial= {"cpu": 1} if self.use_ray else None,
low_cost_partial_config={},
**self._settings["tuner"],
)
Expand Down Expand Up @@ -544,7 +544,12 @@ def _tune_with_config(self, config: dict) -> dict:
"""
from causaltune.remote import remote_exec

estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)
if self.use_ray:
# flaml.tune handles the interaction with Ray itself
# estimates = self._estimate_effect(config)
estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)
else:
estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)

# Parallel(n_jobs=2, backend="threading")(
# delayed(self._estimate_effect)(config) for i in range(1)
Expand Down
197 changes: 197 additions & 0 deletions notebooks/RunExperiments/cluster_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@



# An unique identifier for the head node and workers of this cluster.
cluster_name: default

# The maximum number of workers nodes to launch in addition to the head
# node.
max_workers: 2

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
image: "rayproject/ray:2.40.0-py310-cpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
# image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull
container_name: "ray_container"
# If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
# if no cached version is present.
pull_before_run: True
run_options: # Extra options to pass into "docker run"
- --ulimit nofile=65536:65536

# Example of running a GPU head with CPU workers
# head_image: "rayproject/ray-ml:latest-gpu"
# Allow Ray to automatically detect GPUs

# worker_image: "rayproject/ray-ml:latest-cpu"
# worker_run_options: []

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
type: gcp
region: us-west1
availability_zone: us-west1-a
project_id: motleys # Globally unique project id

# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
# ssh_private_key: "C:/Users/Egor/.ssh/ray-gcp-key"
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below. This requires that you have added the key into the
# project wide meta-data.
# ssh_private_key: /path/to/your/key.pem

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
ray_head_default:
# The resources provided by this node type.
resources: {"CPU": 2}
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as subnets and ssh-keys.
# For more documentation on available fields, see:
# https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
node_config:
machineType: n1-standard-2
disks:
- boot: true
autoDelete: true
type: PERSISTENT
initializeParams:
diskSizeGb: 50
# See https://cloud.google.com/compute/docs/images for more images
sourceImage: projects/deeplearning-platform-release/global/images/common-cpu-v20240922

# Additional options can be found in in the compute docs at
# https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert

# If the network interface is specified as below in both head and worker
# nodes, the manual network config is used. Otherwise an existing subnet is
# used. To use a shared subnet, ask the subnet owner to grant permission
# for 'compute.subnetworks.use' to the ray autoscaler account...
# networkInterfaces:
# - kind: compute#networkInterface
# subnetwork: path/to/subnet
# aliasIpRanges: []
ray_worker_small:
# The minimum number of worker nodes of this type to launch.
# This number should be >= 0.
min_workers: 1
# The maximum number of worker nodes of this type to launch.
# This takes precedence over min_workers.
max_workers: 5
# The resources provided by this node type.
resources: {"CPU": 2}
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as subnets and ssh-keys.
# For more documentation on available fields, see:
# https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
node_config:
machineType: n1-standard-2
disks:
- boot: true
autoDelete: true
type: PERSISTENT
initializeParams:
diskSizeGb: 50
# See https://cloud.google.com/compute/docs/images for more images
sourceImage: projects/deeplearning-platform-release/global/images/common-cpu-v20240922
# Run workers on preemtible instance by default.
# Comment this out to use on-demand.
scheduling:
- preemptible: true
# Un-Comment this to launch workers with the Service Account of the Head Node
# serviceAccounts:
# - email: ray-autoscaler-sa-v1@<project_id>.iam.gserviceaccount.com
# scopes:
# - https://www.googleapis.com/auth/cloud-platform

# Additional options can be found in in the compute docs at
# https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert

# Specify the node type of the head node (as configured above).
head_node_type: ray_head_default

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
# "/path1/on/remote/machine": "/path1/on/local/machine",
# "/path2/on/remote/machine": "/path2/on/local/machine",
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
- "**/.git"
- "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
- ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []

# List of shell commands to run to set up nodes.
setup_commands:
- pip install causaltune catboost

# Note: if you're developing Ray, you probably want to create a Docker image that
# has your Ray repo pre-cloned. Then, you can replace the pip installs
# below with a git checkout <your_sha> (and possibly a recompile).
# To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
# that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
# - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"


# Custom commands that will be run on the head node after common setup.
head_setup_commands:
- pip install google-api-python-client==1.7.8

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- >-
ray start
--head
--port=6379
--object-manager-port=8076
--autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- >-
ray start
--address=$RAY_HEAD_IP:6379
--object-manager-port=8076
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
warnings.filterwarnings("ignore")

# Ensure CausalTune is in the Python path
root_path = os.path.realpath("../../../..")
root_path = os.path.realpath("../../../../..")
sys.path.append(os.path.join(root_path, "causaltune")) # noqa: E402

# Import CausalTune and other custom modules after setting up the path
Expand Down Expand Up @@ -86,7 +86,7 @@ def get_estimator_list(dataset_name):
return [est for est in estimator_list if "Dummy" not in est]


def run_experiment(args, use_ray: bool = False):
def run_experiment(args, dataset_path: str, use_ray: bool = False):
# Process datasets
data_sets = {}
for dataset in args.datasets:
Expand All @@ -97,7 +97,7 @@ def run_experiment(args, use_ray: bool = False):
)
size = parts[0]
name = " ".join(parts[1:])
file_path = f"RunDatasets/{size}/{name}.pkl"
file_path = f"{dataset_path}/{size}/{name}.pkl"
data_sets[f"{size} {name}"] = load_dataset(file_path)

if args.timestamp_in_dirname:
Expand All @@ -107,7 +107,7 @@ def run_experiment(args, use_ray: bool = False):
out_dir = f"EXPERIMENT_RESULTS_{args.identifier}"

os.makedirs(out_dir, exist_ok=True)
out_dir = os.path.join(out_dir, size)
out_dir = os.path.realpath(os.path.join(out_dir, size))
os.makedirs(out_dir, exist_ok=True)

print(f"Loaded datasets: {list(data_sets.keys())}")
Expand Down Expand Up @@ -460,6 +460,35 @@ def plot_mse_grid(title):
plot_mse_grid("Experiment Results")


def run_batch(
identifier: str,
kind: str,
metrics: List[str],
dataset_path: str,
):
args = parse_arguments()
args.identifier = identifier
args.metrics = metrics
# run_experiment assumes we don't mix large and small datasets in the same call
args.datasets = [f"Large Linear_{kind}", f"Large NonLinear_{kind}"]
args.num_samples = 100
args.timestamp_in_dirname = False
args.outcome_model = "auto" # or use "nested" for the old-style nested model

# os.environ["RAY_ADDRESS"] = "ray://127.0.0.1:8265"

use_ray = True
if use_ray:
import ray

# Assuming we port-mapped already by running ray dashboard
ray.init(
"ray://localhost:10001", runtime_env={"pip": ["causaltune", "catboost"]}
) # "34.82.184.148:6379"
out_dir = run_experiment(args, dataset_path=dataset_path, use_ray=use_ray)
return out_dir


if __name__ == "__main__":

args = parse_arguments()
Expand All @@ -476,7 +505,7 @@ def plot_mse_grid(title):
import ray

ray.init()
out_dir = run_experiment(args, use_ray=use_ray)
out_dir = run_experiment(args, dataset_path="../RunDatasets", use_ray=use_ray)

# plot results
upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
Expand Down
14 changes: 14 additions & 0 deletions notebooks/RunExperiments/runners/iv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import os

from experiment_runner import run_batch, generate_plots

identifier = "Egor_test"
kind = "IV"
metrics = ["energy_distance", "frobenius_norm", "codec"]

out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
# plot results
# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
# lower_bounds = {"erupt": 0.06, "bite": 0.75}
generate_plots(os.path.join(out_dir, kind)) # , upper_bounds, lower_bounds)
print("yay!")
24 changes: 24 additions & 0 deletions notebooks/RunExperiments/runners/kc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os

from experiment_runner import run_batch, generate_plots

identifier = "Egor_test"
kind = "KC"
metrics = [
"erupt",
# "greedy_erupt", # regular erupt was made probabilistic,
"policy_risk", # NEW
"qini",
"auc",
"psw_energy_distance",
"frobenius_norm", # NEW
"codec", # NEW
"bite", # NEW
]

out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
# plot results
# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
# lower_bounds = {"erupt": 0.06, "bite": 0.75}
generate_plots(os.path.join(out_dir, kind)) # , upper_bounds, lower_bounds)
print("yay!")
24 changes: 24 additions & 0 deletions notebooks/RunExperiments/runners/kckp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os

from experiment_runner import run_batch, generate_plots

identifier = "Egor_test"
kind = "KCKP"
metrics = [
"erupt",
# "greedy_erupt", # regular erupt was made probabilistic,
"policy_risk", # NEW
"qini",
"auc",
"psw_energy_distance",
"frobenius_norm", # NEW
"codec", # NEW
"bite", # NEW
]

out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
# plot results
# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
# lower_bounds = {"erupt": 0.06, "bite": 0.75}
generate_plots(os.path.join(out_dir, kind)) # , upper_bounds, lower_bounds)
print("yay!")
Loading

0 comments on commit 5dae2df

Please sign in to comment.