From 5dae2dfad96ccdd05ca0ecc191bdfee06edd2330 Mon Sep 17 00:00:00 2001
From: Egor Kraev
Date: Wed, 18 Dec 2024 10:16:24 +0100
Subject: [PATCH] Attempt to use Ray cluster

---
 causaltune/optimiser.py                      |  11 +-
 notebooks/RunExperiments/cluster_config.yaml | 197 ++++++++++++++++++
 .../{ => runners}/experiment_runner.py       |  39 +++-
 notebooks/RunExperiments/runners/iv.py       |  14 ++
 notebooks/RunExperiments/runners/kc.py       |  24 +++
 notebooks/RunExperiments/runners/kckp.py     |  24 +++
 notebooks/RunExperiments/runners/rct.py      |  24 +++
 7 files changed, 325 insertions(+), 8 deletions(-)
 create mode 100644 notebooks/RunExperiments/cluster_config.yaml
 rename notebooks/RunExperiments/{ => runners}/experiment_runner.py (93%)
 create mode 100644 notebooks/RunExperiments/runners/iv.py
 create mode 100644 notebooks/RunExperiments/runners/kc.py
 create mode 100644 notebooks/RunExperiments/runners/kckp.py
 create mode 100644 notebooks/RunExperiments/runners/rct.py

diff --git a/causaltune/optimiser.py b/causaltune/optimiser.py
index f2b37e1..ff5f037 100644
--- a/causaltune/optimiser.py
+++ b/causaltune/optimiser.py
@@ -32,8 +32,6 @@ from causaltune.dataset_processor import CausalityDatasetProcessor
 from causaltune.models.passthrough import feature_filter
 
 
-# tune.run = run
-
 
 # Patched from sklearn.linear_model._base to adjust rtol and atol values
 def _check_precomputed_gram_matrix(X, precompute, X_offset, X_scale, rtol=1e-4, atol=1e-2):
@@ -489,10 +487,12 @@ def fit(
             self._tune_with_config,
             search_space,
             metric=self.metric,
+            # use_ray=self.use_ray,
             cost_attr="evaluation_cost",
             points_to_evaluate=(init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg),
             evaluated_rewards=([] if len(self.resume_scores) == 0 else self.resume_scores),
             mode=("min" if self.metric in metrics_to_minimize() else "max"),
+            # resources_per_trial= {"cpu": 1} if self.use_ray else None,
             low_cost_partial_config={},
             **self._settings["tuner"],
         )
@@ -544,7 +544,12 @@ def _tune_with_config(self, config: dict) -> dict:
         """
         from causaltune.remote import remote_exec
 
-        estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)
+        if self.use_ray:
+            # flaml.tune handles the interaction with Ray itself
+            # estimates = self._estimate_effect(config)
+            estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)
+        else:
+            estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)
 
         # Parallel(n_jobs=2, backend="threading")(
         #     delayed(self._estimate_effect)(config) for i in range(1)
diff --git a/notebooks/RunExperiments/cluster_config.yaml b/notebooks/RunExperiments/cluster_config.yaml
new file mode 100644
index 0000000..9e4dbbf
--- /dev/null
+++ b/notebooks/RunExperiments/cluster_config.yaml
@@ -0,0 +1,197 @@
+
+
+
+# A unique identifier for the head node and workers of this cluster.
+cluster_name: default
+
+# The maximum number of worker nodes to launch in addition to the head
+# node.
+max_workers: 2
+
+# The autoscaler will scale up the cluster faster with higher upscaling speed.
+# E.g., if the task requires adding more nodes then the autoscaler will gradually
+# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
+# This number should be > 0.
+upscaling_speed: 1.0
+
+# This executes all commands on all nodes in the docker container,
+# and opens all the necessary ports to support the Ray cluster.
+# Empty string means disabled.
+docker:
+    image: "rayproject/ray:2.40.0-py310-cpu"  # CPU-only image; use a GPU image instead if you need GPU support
+    # image: rayproject/ray:latest-gpu  # use this one if you need GPU support; it is larger and slower to pull
+    container_name: "ray_container"
+    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
+    # if no cached version is present.
+    pull_before_run: True
+    run_options:   # Extra options to pass into "docker run"
+        - --ulimit nofile=65536:65536
+
+    # Example of running a GPU head with CPU workers
+    # head_image: "rayproject/ray-ml:latest-gpu"
+    # Allow Ray to automatically detect GPUs
+
+    # worker_image: "rayproject/ray-ml:latest-cpu"
+    # worker_run_options: []
+
+# If a node is idle for this many minutes, it will be removed.
+idle_timeout_minutes: 5
+
+# Cloud-provider specific configuration.
+provider:
+    type: gcp
+    region: us-west1
+    availability_zone: us-west1-a
+    project_id: motleys # Globally unique project id
+
+# How Ray will authenticate with newly launched nodes.
+auth:
+    ssh_user: ubuntu
+# ssh_private_key: "C:/Users/Egor/.ssh/ray-gcp-key"
+# By default Ray creates a new private keypair, but you can also use your own.
+# If you do so, make sure to also set "KeyName" in the head and worker node
+# configurations below. This requires that you have added the key into the
+# project wide meta-data.
+# ssh_private_key: /path/to/your/key.pem
+
+# Tell the autoscaler the allowed node types and the resources they provide.
+# The key is the name of the node type, which is just for debugging purposes.
+# The node config specifies the launch config and physical instance type.
+available_node_types:
+    ray_head_default:
+        # The resources provided by this node type.
+        resources: {"CPU": 2}
+        # Provider-specific config for the head node, e.g. instance type. By default
+        # Ray will auto-configure unspecified fields such as subnets and ssh-keys.
+        # For more documentation on available fields, see:
+        # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
+        node_config:
+            machineType: n1-standard-2
+            disks:
+              - boot: true
+                autoDelete: true
+                type: PERSISTENT
+                initializeParams:
+                  diskSizeGb: 50
+                  # See https://cloud.google.com/compute/docs/images for more images
+                  sourceImage: projects/deeplearning-platform-release/global/images/common-cpu-v20240922
+
+            # Additional options can be found in the compute docs at
+            # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
+
+            # If the network interface is specified as below in both head and worker
+            # nodes, the manual network config is used. Otherwise an existing subnet is
+            # used. To use a shared subnet, ask the subnet owner to grant permission
+            # for 'compute.subnetworks.use' to the ray autoscaler account...
+            # networkInterfaces:
+            #   - kind: compute#networkInterface
+            #     subnetwork: path/to/subnet
+            #     aliasIpRanges: []
+    ray_worker_small:
+        # The minimum number of worker nodes of this type to launch.
+        # This number should be >= 0.
+        min_workers: 1
+        # The maximum number of worker nodes of this type to launch.
+        # This takes precedence over min_workers.
+        max_workers: 5
+        # The resources provided by this node type.
+        resources: {"CPU": 2}
+        # Provider-specific config for worker nodes of this type, e.g. instance type. By default
+        # Ray will auto-configure unspecified fields such as subnets and ssh-keys.
+        # For more documentation on available fields, see:
+        # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
+        node_config:
+            machineType: n1-standard-2
+            disks:
+              - boot: true
+                autoDelete: true
+                type: PERSISTENT
+                initializeParams:
+                  diskSizeGb: 50
+                  # See https://cloud.google.com/compute/docs/images for more images
+                  sourceImage: projects/deeplearning-platform-release/global/images/common-cpu-v20240922
+            # Run workers on preemptible instances by default.
+            # Comment this out to use on-demand.
+            scheduling:
+              - preemptible: true
+            # Uncomment this to launch workers with the Service Account of the Head Node
+            # serviceAccounts:
+            # - email: ray-autoscaler-sa-v1@.iam.gserviceaccount.com
+            #   scopes:
+            #   - https://www.googleapis.com/auth/cloud-platform
+
+            # Additional options can be found in the compute docs at
+            # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
+
+# Specify the node type of the head node (as configured above).
+head_node_type: ray_head_default
+
+# Files or directories to copy to the head and worker nodes. The format is a
+# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
+file_mounts: {
+#    "/path1/on/remote/machine": "/path1/on/local/machine",
+#    "/path2/on/remote/machine": "/path2/on/local/machine",
+}
+
+# Files or directories to copy from the head node to the worker nodes. The format is a
+# list of paths. The same path on the head node will be copied to the worker node.
+# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
+# you should just use file_mounts. Only use this if you know what you're doing!
+cluster_synced_files: []
+
+# Whether changes to directories in file_mounts or cluster_synced_files in the head node
+# should sync to the worker node continuously
+file_mounts_sync_continuously: False
+
+# Patterns for files to exclude when running rsync up or rsync down
+rsync_exclude:
+    - "**/.git"
+    - "**/.git/**"
+
+# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
+# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
+# as a value, the behavior will match git's behavior for finding and using .gitignore files.
+rsync_filter:
+    - ".gitignore"
+
+# List of commands that will be run before `setup_commands`. If docker is
+# enabled, these commands will run outside the container and before docker
+# is setup.
+initialization_commands: []
+
+# List of shell commands to run to set up nodes.
+setup_commands:
+    - pip install causaltune catboost
+
+    # Note: if you're developing Ray, you probably want to create a Docker image that
+    # has your Ray repo pre-cloned. Then, you can replace the pip installs
+    # below with a git checkout (and possibly a recompile).
+    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
+    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
+    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"
+
+
+# Custom commands that will be run on the head node after common setup.
+head_setup_commands:
+    - pip install google-api-python-client==1.7.8
+
+# Custom commands that will be run on worker nodes after common setup.
+worker_setup_commands: []
+
+# Command to start ray on the head node. You don't need to change this.
+head_start_ray_commands:
+    - ray stop
+    - >-
+      ray start
+      --head
+      --port=6379
+      --object-manager-port=8076
+      --autoscaling-config=~/ray_bootstrap_config.yaml
+
+# Command to start ray on worker nodes. You don't need to change this.
+worker_start_ray_commands:
+    - ray stop
+    - >-
+      ray start
+      --address=$RAY_HEAD_IP:6379
+      --object-manager-port=8076
diff --git a/notebooks/RunExperiments/experiment_runner.py b/notebooks/RunExperiments/runners/experiment_runner.py
similarity index 93%
rename from notebooks/RunExperiments/experiment_runner.py
rename to notebooks/RunExperiments/runners/experiment_runner.py
index 775e180..f26bb89 100644
--- a/notebooks/RunExperiments/experiment_runner.py
+++ b/notebooks/RunExperiments/runners/experiment_runner.py
@@ -18,7 +18,7 @@
 warnings.filterwarnings("ignore")
 
 # Ensure CausalTune is in the Python path
-root_path = os.path.realpath("../../../..")
+root_path = os.path.realpath("../../../../..")
 sys.path.append(os.path.join(root_path, "causaltune"))  # noqa: E402
 
 # Import CausalTune and other custom modules after setting up the path
@@ -86,7 +86,7 @@ def get_estimator_list(dataset_name):
     return [est for est in estimator_list if "Dummy" not in est]
 
 
-def run_experiment(args, use_ray: bool = False):
+def run_experiment(args, dataset_path: str, use_ray: bool = False):
     # Process datasets
     data_sets = {}
     for dataset in args.datasets:
@@ -97,7 +97,7 @@ def run_experiment(args, use_ray: bool = False):
             )
         size = parts[0]
         name = " ".join(parts[1:])
-        file_path = f"RunDatasets/{size}/{name}.pkl"
+        file_path = f"{dataset_path}/{size}/{name}.pkl"
         data_sets[f"{size} {name}"] = load_dataset(file_path)
 
     if args.timestamp_in_dirname:
@@ -107,7 +107,7 @@
         out_dir = f"EXPERIMENT_RESULTS_{args.identifier}"
     os.makedirs(out_dir, exist_ok=True)
 
-    out_dir = os.path.join(out_dir, size)
+    out_dir = os.path.realpath(os.path.join(out_dir, size))
     os.makedirs(out_dir, exist_ok=True)
 
     print(f"Loaded datasets: {list(data_sets.keys())}")
@@ -460,6 +460,35 @@ def plot_mse_grid(title):
     plot_mse_grid("Experiment Results")
 
 
+def run_batch(
+    identifier: str,
+    kind: str,
+    metrics: List[str],
+    dataset_path: str,
+):
+    args = parse_arguments()
+    args.identifier = identifier
+    args.metrics = metrics
+    # run_experiment assumes we don't mix large and small datasets in the same call
+    args.datasets = [f"Large Linear_{kind}", f"Large NonLinear_{kind}"]
+    args.num_samples = 100
+    args.timestamp_in_dirname = False
+    args.outcome_model = "auto"  # or use "nested" for the old-style nested model
+
+    # os.environ["RAY_ADDRESS"] = "ray://127.0.0.1:8265"
+
+    use_ray = True
+    if use_ray:
+        import ray
+
+        # Assumes the port has already been mapped locally, e.g. by running `ray dashboard`
+        ray.init(
+            "ray://localhost:10001", runtime_env={"pip": ["causaltune", "catboost"]}
+        )  # "34.82.184.148:6379"
+    out_dir = run_experiment(args, dataset_path=dataset_path, use_ray=use_ray)
+    return out_dir
+
+
 if __name__ == "__main__":
     args = parse_arguments()
 
@@ -476,7 +505,7 @@ def plot_mse_grid(title):
         import ray
 
         ray.init()
-    out_dir = run_experiment(args, use_ray=use_ray)
+    out_dir = run_experiment(args, dataset_path="../RunDatasets", use_ray=use_ray)
 
     # plot results
     upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
diff --git a/notebooks/RunExperiments/runners/iv.py b/notebooks/RunExperiments/runners/iv.py
new file mode 100644
index 0000000..8e975e8
--- /dev/null
+++ b/notebooks/RunExperiments/runners/iv.py
@@ -0,0 +1,14 @@
+import os
+
+from experiment_runner import run_batch, generate_plots
+
+identifier = "Egor_test"
+kind = "IV"
+metrics = ["energy_distance", "frobenius_norm", "codec"]
+
+out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
+# plot results
+# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
+# lower_bounds = {"erupt": 0.06, "bite": 0.75}
+generate_plots(os.path.join(out_dir, kind))  # , upper_bounds, lower_bounds)
+print("yay!")
diff --git a/notebooks/RunExperiments/runners/kc.py b/notebooks/RunExperiments/runners/kc.py
new file mode 100644
index 0000000..3168719
--- /dev/null
+++ b/notebooks/RunExperiments/runners/kc.py
@@ -0,0 +1,24 @@
+import os
+
+from experiment_runner import run_batch, generate_plots
+
+identifier = "Egor_test"
+kind = "KC"
+metrics = [
+    "erupt",
+    # "greedy_erupt",  # regular erupt was made probabilistic,
+    "policy_risk",  # NEW
+    "qini",
+    "auc",
+    "psw_energy_distance",
+    "frobenius_norm",  # NEW
+    "codec",  # NEW
+    "bite",  # NEW
+]
+
+out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
+# plot results
+# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
+# lower_bounds = {"erupt": 0.06, "bite": 0.75}
+generate_plots(os.path.join(out_dir, kind))  # , upper_bounds, lower_bounds)
+print("yay!")
diff --git a/notebooks/RunExperiments/runners/kckp.py b/notebooks/RunExperiments/runners/kckp.py
new file mode 100644
index 0000000..79955ca
--- /dev/null
+++ b/notebooks/RunExperiments/runners/kckp.py
@@ -0,0 +1,24 @@
+import os
+
+from experiment_runner import run_batch, generate_plots
+
+identifier = "Egor_test"
+kind = "KCKP"
+metrics = [
+    "erupt",
+    # "greedy_erupt",  # regular erupt was made probabilistic,
+    "policy_risk",  # NEW
+    "qini",
+    "auc",
+    "psw_energy_distance",
+    "frobenius_norm",  # NEW
+    "codec",  # NEW
+    "bite",  # NEW
+]
+
+out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
+# plot results
+# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
+# lower_bounds = {"erupt": 0.06, "bite": 0.75}
+generate_plots(os.path.join(out_dir, kind))  # , upper_bounds, lower_bounds)
+print("yay!")
diff --git a/notebooks/RunExperiments/runners/rct.py b/notebooks/RunExperiments/runners/rct.py
new file mode 100644
index 0000000..797bc69
--- /dev/null
+++ b/notebooks/RunExperiments/runners/rct.py
@@ -0,0 +1,24 @@
+import os
+
+from experiment_runner import run_batch, generate_plots
+
+identifier = "Egor_test"
+kind = "RCT"
+metrics = [
+    "erupt",
+    # "greedy_erupt",  # regular erupt was made probabilistic,
+    "policy_risk",  # NEW
+    "qini",
+    "auc",
+    "psw_energy_distance",
+    "frobenius_norm",  # NEW
+    "codec",  # NEW
+    "bite",  # NEW
+]
+
+out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"))
+# plot results
+# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2}
+# lower_bounds = {"erupt": 0.06, "bite": 0.75}
+generate_plots(os.path.join(out_dir, kind))  # , upper_bounds, lower_bounds)
+print("yay!")
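
A minimal connectivity sketch of how the pieces above are intended to fit together, assuming the cluster has been brought up with `ray up notebooks/RunExperiments/cluster_config.yaml` and the Ray Client port 10001 has been forwarded to localhost (e.g. via `ray attach notebooks/RunExperiments/cluster_config.yaml -p 10001`). The file name `smoke_test.py` and the `probe` task are hypothetical and not part of this patch; the connection settings mirror run_batch() in experiment_runner.py.

# smoke_test.py (hypothetical, not included in this patch)
import ray

# Same Ray Client address and runtime_env as run_batch() uses above.
ray.init("ray://localhost:10001", runtime_env={"pip": ["causaltune", "catboost"]})


@ray.remote
def probe() -> str:
    # Executed on a cluster worker, not on the local machine; the import fails
    # here if the runtime_env pip install did not take effect on the workers.
    import socket

    import causaltune  # noqa: F401

    return socket.gethostname()


# Should print a worker hostname from the GCP cluster rather than the local host.
print("probe ran on:", ray.get(probe.remote()))
ray.shutdown()

If this prints a remote hostname, the runner scripts (iv.py, kc.py, kckp.py, rct.py) can be launched locally and will ship their work to the cluster through the same Ray Client connection.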