Commit
update vital TODOs and comments in Kaggle scen
WinstonLiyt committed Aug 22, 2024
1 parent d1e2d51 commit 3e0cf74
Showing 5 changed files with 127 additions and 13 deletions.
97 changes: 90 additions & 7 deletions rdagent/scenarios/feature_engineering/developer/data_runner.py
@@ -1,30 +1,113 @@
from typing import List
import pandas as pd

from rdagent.components.coder.model_coder.model import ModelExperiment, ModelFBWorkspace
from rdagent.components.runner import CachedRunner
from rdagent.components.runner.conf import RUNNER_SETTINGS
from rdagent.core.exception import FactorEmptyError, ModelEmptyError
from rdagent.core.conf import RD_AGENT_SETTINGS
from rdagent.core.utils import multiprocessing_wrapper
from rdagent.log import rdagent_logger as logger
from rdagent.scenarios.feature_engineering.experiment.feature_experiment import FEFeatureExperiment


class FEFeatureRunner(CachedRunner[FEFeatureExperiment]):
    def develop(self, exp: FEFeatureExperiment) -> FEFeatureExperiment:
        import pickle

        # Debug snapshot: dump the experiment object so it can be inspected offline.
        with open("/home/v-yuanteli/RD-Agent/git_ignore_folder/test_featexp_data.pkl", "wb") as f:
            pickle.dump(exp, f)
        print("Feature Experiment object saved to test_featexp_data.pkl")

        # Consider re-attaching the raw data on every experiment run here.
        if exp.based_experiments and exp.based_experiments[-1].result is None:
            exp.based_experiments[-1] = self.develop(exp.based_experiments[-1])

        if RUNNER_SETTINGS.cache_result:
            cache_hit, result = self.get_cache_result(exp)
            if cache_hit:
                exp.result = result
                return exp

        # TODO: This corresponds to the concept of a SOTA factor library.
        if exp.based_experiments:
            SOTA_factor = None
            if len(exp.based_experiments) > 1:
                SOTA_factor = self.process_factor_data(exp.based_experiments)

            # Process the new factors data
            new_factors = self.process_factor_data(exp)

            if new_factors.empty:
                raise FactorEmptyError("No valid factor data found to merge.")

            # Combine the SOTA factor and new factors if the SOTA factor exists
            if SOTA_factor is not None and not SOTA_factor.empty:
                new_factors = self.deduplicate_new_factors(SOTA_factor, new_factors)
                if new_factors.empty:
                    raise FactorEmptyError("No valid factor data found to merge.")
                combined_factors = pd.concat([SOTA_factor, new_factors], axis=1).dropna()
            else:
                combined_factors = new_factors

            if exp.sub_workspace_list[0].code_dict.get("model.py") is None:
                raise ModelEmptyError("model.py is empty")
            # to replace & inject code
            exp.experiment_workspace.inject_code(**{"model.py": exp.sub_workspace_list[0].code_dict["model.py"]})

            # Sort and nest the combined factors under 'feature'
            # TODO: This is deduplication plus feature-format handling; Kaggle probably does not need it.
            combined_factors = combined_factors.sort_index()
            combined_factors = combined_factors.loc[:, ~combined_factors.columns.duplicated(keep="last")]
            new_columns = pd.MultiIndex.from_product([["feature"], combined_factors.columns])
            combined_factors.columns = new_columns

            env_to_use = {"PYTHONPATH": "./"}
            # Save the combined factors to the workspace
            with open(exp.experiment_workspace.workspace_path / "combined_factors_df.pkl", "wb") as f:
                pickle.dump(combined_factors, f)

            # TODO: This still calls execute(); it should connect to the Kaggle Docker environment instead.
            result = exp.experiment_workspace.execute(
                qlib_config_name="conf.yaml" if len(exp.based_experiments) == 0 else "conf_combined.yaml"
            )

            exp.result = result
            if RUNNER_SETTINGS.cache_result:
                self.dump_cache_result(exp, result)

            return exp

        return exp

    def process_factor_data(self, exp_or_list: List[FEFeatureExperiment] | FEFeatureExperiment) -> pd.DataFrame:
        """
        Process and combine factor data from experiment implementations.

        Args:
            exp_or_list: A single experiment, or a list of experiments, containing factor data.

        Returns:
            pd.DataFrame: Combined factor data without NaN values.
        """
        # TODO: Execute each task's code here to obtain a DataFrame.
        if isinstance(exp_or_list, FEFeatureExperiment):
            exp_or_list = [exp_or_list]
        factor_dfs = []

        # Collect all experiments' dataframes
        for exp in exp_or_list:
            # Iterate over sub-implementations and execute them to get each factor's data
            # TODO: This should be implemented via the feature_execute function.
            message_and_df_list = multiprocessing_wrapper(
                # multiprocessing_wrapper expects (callable, args) tuples
                [(implementation.feature_execute, ()) for implementation in exp.sub_workspace_list],
                n=RD_AGENT_SETTINGS.multi_proc_n,
            )
            # TODO: The datetime handling below should no longer be needed here.
            for message, df in message_and_df_list:
                # Check if factor generation was successful
                if df is not None and "datetime" in df.index.names:
                    time_diff = df.index.get_level_values("datetime").to_series().diff().dropna().unique()
                    if pd.Timedelta(minutes=1) not in time_diff:
                        factor_dfs.append(df)

        # Combine all successful factor data
        if factor_dfs:
            return pd.concat(factor_dfs, axis=1)
        else:
            raise FactorEmptyError("No valid factor data found to merge.")
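
For reference, a minimal self-contained sketch (toy data and hypothetical factor names) of the sort → dedup → nest step that develop() applies to combined_factors above:

```python
import pandas as pd

# Toy factor frame in which "mom_20" appears twice (e.g. re-generated by a later experiment).
df = pd.DataFrame(
    [[0.1, 0.3, 0.2], [0.2, 0.4, 0.3]],
    index=pd.Index(["2024-01-01", "2024-01-02"], name="datetime"),
    columns=["mom_20", "vol_5", "mom_20"],
)

df = df.sort_index()
# Keep only the most recent implementation of each duplicated factor name.
df = df.loc[:, ~df.columns.duplicated(keep="last")]
# Nest every factor column under a top-level "feature" key.
df.columns = pd.MultiIndex.from_product([["feature"], df.columns])
print(df.columns.tolist())  # [('feature', 'vol_5'), ('feature', 'mom_20')]
```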
@@ -35,7 +35,7 @@ def background(self) -> str:

    @property
    def source_data(self) -> str:
        # TODO: Add the source data property from the Kaggle data feature or the SOTA data feature.
        # Connect to KGDockerEnv to fetch the raw data.
        # Open question: for feature engineering, does the data need to be refreshed later on?
        raise NotImplementedError("source_data is not implemented")
@@ -0,0 +1,22 @@
import torch
import torch.nn as nn
import torch.nn.functional as F

class HybridFeatureInteractionModel(nn.Module):
    def __init__(self, num_features):
        super(HybridFeatureInteractionModel, self).__init__()
        self.fc1 = nn.Linear(num_features, 128)
        self.bn1 = nn.BatchNorm1d(128, momentum=0.1)
        self.fc2 = nn.Linear(128, 64)
        self.bn2 = nn.BatchNorm1d(64, momentum=0.1)
        self.fc3 = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = F.relu(self.bn1(self.fc1(x)))
        x = F.relu(self.bn2(self.fc2(x)))
        x = self.dropout(x)
        x = torch.sigmoid(self.fc3(x))
        return x


model_cls = HybridFeatureInteractionModel
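
The new model can be smoke-tested in isolation; a quick check like the following (batch size and feature count are arbitrary) confirms the expected (batch, 1) sigmoid output:

```python
import torch

model = model_cls(num_features=10)
model.eval()  # eval mode so BatchNorm uses running stats rather than batch stats
with torch.no_grad():
    out = model(torch.randn(32, 10))
print(out.shape)  # torch.Size([32, 1]); values are sigmoid probabilities in (0, 1)
```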
12 changes: 8 additions & 4 deletions rdagent/scenarios/feature_engineering/experiment/prompts.yaml
@@ -1,17 +1,19 @@
feature_engineering_background: |-
  Data preprocessing is a crucial step in the data analysis pipeline, involving the transformation and preparation of raw data into a suitable format for further analysis or modeling.
  This process is essential to enhance the quality of data and to ensure the accuracy of the results obtained from analytical models.
  Data preprocessing typically includes some of the following key activities: Data Cleaning, Data Creation, Data Integration, Data Transformation, Data Reduction, and Data Discretization.
  The preprocessing method is defined in the following parts:
  1. Name: The name of the data preprocessing method.
  2. Description: The description of the method.
  3. Code: The code of the preprocessing method.
  4. Variables: The steps or functions used for this data preprocessing method.
  The method might not provide all the parts of the information above, since some might not be applicable.
  Please explicitly give all the hyperparameters in the method, like the learning rate, regularization parameter, threshold, etc.
  One method should statically define one output with a fixed hyperparameter.
  For example, a method with a learning rate of 0.01 and a method with a learning rate of 0.1 should be considered two different methods.
  # TODO: Should feature engineering be tightly coupled to the model?
feature_engineering_interface: |-
  Your python code should follow the interface to better interact with the user's system.
  Your python code should contain the following parts: the import part and the function part.
@@ -31,7 +33,9 @@ feature_engineering_interface: |-
  ```
  The input data is a pandas dataframe and the output should be a pandas dataframe similar to the original data.
  # TODO: Open question: should the output data be fed directly to model training, or should the outputs of multiple tasks be concatenated first and then fed to training?
  # TODO: Generate a python function (the whole preprocessing process) or generate a transformation each time?

feature_engineering_output_format: |-
  Your output should be a pandas dataframe similar to the original data.
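
For illustration, a minimal function of the shape these prompts describe. The exact interface signature lives in the elided lines above, so the name and signature here are hypothetical:

```python
import pandas as pd

def feat_eng(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical name/signature; the real spec is in the elided interface section.
    out = df.copy()
    # One fixed transformation per method, e.g. z-score normalization of numeric columns.
    num_cols = out.select_dtypes(include="number").columns
    out[num_cols] = (out[num_cols] - out[num_cols].mean()) / out[num_cols].std()
    return out
```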
@@ -30,4 +30,9 @@ def execute(self, run_env: dict = {}, *args, **kwargs) -> str:
        if not csv_path.exists():
            logger.error(f"File {csv_path} does not exist.")
            return None
        return pd.read_csv(csv_path, index_col=0).iloc[:, 0]

    def feature_execute(self, run_env: dict = {}, *args, **kwargs) -> pd.DataFrame:
        # TODO: Execute the task's code to obtain the concrete feature data; this should return a DataFrame.
        # TODO: Look at examples of the generated task code to see what it currently produces, then decide how to write this function.
        pass
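
One possible shape for feature_execute, assuming this workspace exposes workspace_path (as the experiment workspace in the runner above does) and that the generated task code writes its DataFrame to a pickle; the file name below is purely hypothetical:

```python
def feature_execute(self, run_env: dict = {}, *args, **kwargs) -> pd.DataFrame:
    # Sketch only: run the task code, then load the DataFrame it is assumed to produce.
    self.execute(run_env=run_env)  # reuse the existing execute() above for the actual run
    pkl_path = self.workspace_path / "features.pkl"  # hypothetical output file name
    if not pkl_path.exists():
        logger.error(f"File {pkl_path} does not exist.")
        return None
    return pd.read_pickle(pkl_path)
```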
