Skip to content

Commit

Permalink
Initial framework for docker env (#40)
Browse files Browse the repository at this point in the history
* Initial framework for docker env

* Update test name

* add features

* Download Qlib data with extra_volume

* fix pytest error

* Fix the parameters

---------

Co-authored-by: Young <[email protected]>
  • Loading branch information
SunsetWolf and you-n-g authored Jul 2, 2024
1 parent 5bf6934 commit e618f79
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ celerybeat.pid
# Environments
.env
.venv
env/
^env/
venv/
ENV/
env.bak/
Expand Down
169 changes: 169 additions & 0 deletions rdagent/utils/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""
The motivation of these utils is environment management.
They try to create a uniform environment for the agent to run in;
- All the code and data are expected to be included in one folder
"""
import os
import docker
from abc import abstractmethod
from pydantic import BaseModel
from typing import Generic, TypeVar
from pathlib import Path

# Type variable for the configuration model of an environment; any pydantic `BaseModel` subclass is accepted.
ASpecificBaseModel = TypeVar("ASpecificBaseModel", bound=BaseModel)


class Env(Generic[ASpecificBaseModel]):
    """
    Base interface of an execution environment driven by a configuration model.

    We use BaseModel as the setting due to the features it provides
    - It provides basic typing and checking features.
    - loading and dumping the information will be easier: for example, we can use package like `pydantic-yaml`

    NOTE: this class does not use ``ABCMeta``, so the ``@abstractmethod``
    markers below are advisory only: a subclass that does not override
    `prepare` / `run` can still be instantiated, and calling the inherited
    method simply returns ``None``.
    """

    conf: ASpecificBaseModel  # different env have different conf.

    def __init__(self, conf: ASpecificBaseModel):
        self.conf = conf

    @abstractmethod
    def prepare(self) -> None:
        """
        Prepare the environment based on its configuration.
        """

    @abstractmethod
    def run(self, entry: str | None = None, local_path: str | None = None, env: dict | None = None) -> str:
        """
        Run the folder under the environment.

        Parameters
        ----------
        entry : str | None
            The entry point (command) to execute.
            We may use a different entry point for each kind of run;
            for example, we may have different entries when we run and summarize the project.
            It defaults to None so that implementations can fall back to their own default entry.
        local_path : str | None
            the local path (to project, mainly for code) will be mounted into the docker
            Here are some examples for a None local path
            - for example, run docker for updating the data in the extra_volumes.
            - simply run the image. The results are produced by output or network
        env : dict | None
            Run the code with your specific environment.

        Returns
        -------
            the stdout
        """


## Local Environment -----


# Settings for running directly on the local machine (no container).
class LocalConf(BaseModel):
    # where you can find your python path
    # NOTE(review): presumably the python executable used to run the project; nothing in the visible code reads it yet.
    py_entry: str


class LocalEnv(Env[LocalConf]):
    """
    Sometimes a local environment may be more convenient for testing.

    NOTE: neither `prepare` nor `run` is overridden here, so this class only
    carries its `LocalConf`; the inherited methods have no body besides their docstring.
    """
    conf: LocalConf


## Docker Environment -----


# Settings that describe how a docker image is started.
class DockerConf(BaseModel):
    image: str  # the image you want to run
    mount_path: str  # the path in the docker image to mount the folder
    default_entry: str  # the entry point of the image

    extra_volumes: dict | None = {}
    # Mapping of host path -> path inside the container, mounted in addition to the project folder.
    # Sometimes we need to maintain some extra data for the workspace.
    # The extra data may be shared, and downloading it can be time consuming,
    # so we just want to download it once.


# Default configuration for the Qlib + PyTorch nightly image.
# The host folder `~/.qlib/` (expanded and resolved when this module is imported)
# is mapped to `/root/.qlib/` inside the container.
QLIB_TORCH_IMAGE = DockerConf(image="linlanglv/qlib_image_nightly_pytorch:nightly",
                              mount_path="/workspace",
                              default_entry="qrun conf.yaml",
                              extra_volumes={Path("~/.qlib/").expanduser().resolve(): "/root/.qlib/"})


class DockerEnv(Env[DockerConf]):
    """
    Environment that executes an entry command inside a docker container.

    The image, the mount point of the project folder, the default entry and
    the extra volumes are all taken from `self.conf`.
    """
    # TODO: Save the output into a specific file

    def prepare(self) -> None:
        """
        Download image if it doesn't exist

        Raises
        ------
        RuntimeError
            If the docker daemon reports an error while looking up or pulling the image.
        """
        client = docker.from_env()
        try:
            try:
                client.images.get(self.conf.image)
            except docker.errors.ImageNotFound:
                # The pull happens inside the outer `try` on purpose: an exception raised
                # within an `except` handler is not seen by sibling handlers, so a flat
                # try/except would let pull failures escape unwrapped.
                client.images.pull(self.conf.image)
        except docker.errors.APIError as e:
            raise RuntimeError(f"Error while pulling the image: {e}") from e

    def run(self, entry: str | None = None, local_path: str | None = None, env: dict | None = None) -> str:
        """
        Run `entry` in a fresh container and return everything it printed.

        Parameters
        ----------
        entry : str | None
            Command to execute; `self.conf.default_entry` is used when it is None.
        local_path : str | None
            Local folder mounted read-write at `self.conf.mount_path`; nothing is mounted there when it is None.
        env : dict | None
            Environment variables passed to the container.

        Returns
        -------
            The container log, one stripped line per log chunk, each terminated by a newline.

        Raises
        ------
        RuntimeError
            If docker fails to create, run or inspect the container.
        """
        if env is None:
            env = {}
        if entry is None:
            entry = self.conf.default_entry

        volumes = {}
        if local_path is not None:
            volumes[os.path.abspath(local_path)] = {"bind": self.conf.mount_path, "mode": "rw"}
        if self.conf.extra_volumes is not None:
            for host_path, container_path in self.conf.extra_volumes.items():
                # The keys may be `Path` objects (see QLIB_TORCH_IMAGE); hand plain strings to docker.
                volumes[str(host_path)] = {"bind": container_path, "mode": "rw"}

        client = docker.from_env()
        container = None
        log_lines = []
        try:
            # `auto_remove` is deliberately not used: with `detach=True` the daemon may delete
            # the container as soon as it exits, i.e. while the log is still being streamed
            # or before `wait()` is called. The container is removed explicitly below instead.
            container = client.containers.run(
                image=self.conf.image,
                command=entry,
                volumes=volumes,
                environment=env,
                detach=True,
                working_dir=self.conf.mount_path,
            )
            for raw_log in container.logs(stream=True):
                decoded_log = raw_log.strip().decode()
                print(decoded_log)
                log_lines.append(decoded_log)
            container.wait()
        except docker.errors.ContainerError as e:
            raise RuntimeError(f"Error while running the container: {e}") from e
        except docker.errors.ImageNotFound as e:
            raise RuntimeError("Docker image not found.") from e
        except docker.errors.APIError as e:
            raise RuntimeError(f"Error while running the container: {e}") from e
        finally:
            if container is not None:
                try:
                    container.remove(force=True)
                except docker.errors.APIError:
                    # Best-effort cleanup: a failed removal must not hide the run result
                    # or replace the exception that is already propagating.
                    pass
        return "".join(f"{line}\n" for line in log_lines)


class QTDockerEnv(DockerEnv):
    """Qlib Torch Docker"""

    def __init__(self, conf: DockerConf = QLIB_TORCH_IMAGE):
        super().__init__(conf)

    def prepare(self) -> None:
        """
        Download image & data if it doesn't exist

        The first host path in `conf.extra_volumes` is treated as the Qlib data folder.
        When `<that path>/qlib_data/cn_data` is missing, the download command is run
        inside the container.

        Raises
        ------
        RuntimeError
            If the image cannot be prepared, or if `conf.extra_volumes` is empty / None so
            there is no host folder in which the downloaded data could be kept.
        """
        super().prepare()
        extra_volumes = self.conf.extra_volumes
        if not extra_volumes:
            # Without a mounted host folder the download would have nowhere to persist;
            # fail with an explicit message instead of a bare StopIteration / AttributeError.
            raise RuntimeError("QTDockerEnv requires `extra_volumes` to contain the host path for the Qlib data.")
        qlib_data_path = next(iter(extra_volumes))
        if (Path(qlib_data_path) / "qlib_data" / "cn_data").exists():
            print("Data already exists. Download skipped.")
            return
        cmd = "python -m qlib.run.get_data qlib_data --target_dir ~/.qlib/qlib_data/cn_data --region cn --interval 1d --delete_old False"
        self.run(entry=cmd)
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ tree-sitter
jupyter

python-dotenv

# infrastructure related.
docker
4 changes: 4 additions & 0 deletions test/utils/env_tpl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

# Introduction

It is a template for testing.
71 changes: 71 additions & 0 deletions test/utils/env_tpl/conf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Qlib workflow configuration used as the test template; the test runs it with `qrun conf.yaml`.
qlib_init:
    # Data location as seen by the process that runs qrun.
    provider_uri: "~/.qlib/qlib_data/cn_data"
    region: cn
# Anchors reused further down through aliases.
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
    start_time: 2008-01-01
    end_time: 2020-08-01
    fit_start_time: 2008-01-01
    fit_end_time: 2014-12-31
    instruments: *market
port_analysis_config: &port_analysis_config
    strategy:
        class: TopkDropoutStrategy
        module_path: qlib.contrib.strategy
        kwargs:
            signal: <PRED>
            topk: 50
            n_drop: 5
    backtest:
        start_time: 2017-01-01
        end_time: 2020-08-01
        account: 100000000
        benchmark: *benchmark
        exchange_kwargs:
            limit_threshold: 0.095
            deal_price: close
            open_cost: 0.0005
            close_cost: 0.0015
            min_cost: 5
task:
    model:
        class: LGBModel
        module_path: qlib.contrib.model.gbdt
        kwargs:
            loss: mse
            colsample_bytree: 0.8879
            learning_rate: 0.2
            subsample: 0.8789
            lambda_l1: 205.6999
            lambda_l2: 580.9768
            max_depth: 8
            num_leaves: 210
            num_threads: 20
    dataset:
        class: DatasetH
        module_path: qlib.data.dataset
        kwargs:
            handler:
                class: Alpha158
                module_path: qlib.contrib.data.handler
                kwargs: *data_handler_config
            segments:
                train: [2008-01-01, 2014-12-31]
                valid: [2015-01-01, 2016-12-31]
                test: [2017-01-01, 2020-08-01]
    record:
        - class: SignalRecord
          module_path: qlib.workflow.record_temp
          kwargs:
              model: <MODEL>
              dataset: <DATASET>
        - class: SigAnaRecord
          module_path: qlib.workflow.record_temp
          kwargs:
              ana_long_short: False
              ann_scaler: 252
        - class: PortAnaRecord
          module_path: qlib.workflow.record_temp
          kwargs:
              config: *port_analysis_config
42 changes: 42 additions & 0 deletions test/utils/test_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import sys
import subprocess
import unittest
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent.parent))
from rdagent.utils.env import QTDockerEnv
import shutil


# Absolute, symlink-free directory that contains this test module.
DIRNAME = Path(__file__).resolve().parent


class EnvUtils(unittest.TestCase):
    """Checks for the docker-backed environment utilities."""

    def setUp(self):
        """No fixture is required."""

    def tearDown(self):
        # NOTE: For a docker file, the output are generated with root permission.
        # That is why the `mlruns` folder under `env_tpl` is left in place here
        # instead of being deleted with `shutil.rmtree`.
        pass

    def test_docker(self):
        """
        Mount `env_tpl` into the docker image, run it with `qrun conf.yaml`
        and check that an `mlruns` folder shows up in the mounted directory.
        """
        workspace = DIRNAME / "env_tpl"
        docker_env = QTDockerEnv()
        # Preparing more than once is expected to be handled correctly.
        for _ in range(2):
            docker_env.prepare()
        # The stdout of the run is returned as the result.
        result = docker_env.run(local_path=str(workspace), entry="qrun conf.yaml")

        mlrun_p = workspace / "mlruns"
        self.assertTrue(mlrun_p.exists(), f"Expected output file {mlrun_p} not found")


# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

0 comments on commit e618f79

Please sign in to comment.