diff --git a/rdagent/scenarios/feature_engineering/developer/data_runner.py b/rdagent/scenarios/feature_engineering/developer/data_runner.py index 27995853..f333da2a 100644 --- a/rdagent/scenarios/feature_engineering/developer/data_runner.py +++ b/rdagent/scenarios/feature_engineering/developer/data_runner.py @@ -1,30 +1,113 @@ +from typing import List +import pandas as pd + from rdagent.components.coder.model_coder.model import ModelExperiment, ModelFBWorkspace from rdagent.components.runner import CachedRunner from rdagent.components.runner.conf import RUNNER_SETTINGS -from rdagent.core.exception import ModelEmptyError +from rdagent.core.exception import FactorEmptyError +from rdagent.core.conf import RD_AGENT_SETTINGS +from rdagent.core.utils import multiprocessing_wrapper from rdagent.log import rdagent_logger as logger from rdagent.scenarios.feature_engineering.experiment.feature_experiment import FEFeatureExperiment class FEFeatureRunner(CachedRunner[FEFeatureExperiment]): def develop(self, exp: FEFeatureExperiment) -> FEFeatureExperiment: + import pickle + with open('/home/v-yuanteli/RD-Agent/git_ignore_folder/test_featexp_data.pkl', 'wb') as f: + pickle.dump(exp, f) + print("Feature Experiment object saved to test_featexp_data.pkl") + + + # 这里考虑把每次实验都加一次原始数据 + if exp.based_experiments and exp.based_experiments[-1].result is None: + exp.based_experiments[-1] = self.develop(exp.based_experiments[-1]) + if RUNNER_SETTINGS.cache_result: cache_hit, result = self.get_cache_result(exp) if cache_hit: exp.result = result return exp + + #TODO 这里对应于SOTA因子库的概念 + if exp.based_experiments: + SOTA_factor = None + if len(exp.based_experiments) > 1: + SOTA_factor = self.process_factor_data(exp.based_experiments) + + # Process the new factors data + new_factors = self.process_factor_data(exp) + + if new_factors.empty: + raise FactorEmptyError("No valid factor data found to merge.") + + # Combine the SOTA factor and new factors if SOTA factor exists + if SOTA_factor is not None and not SOTA_factor.empty: + new_factors = self.deduplicate_new_factors(SOTA_factor, new_factors) + if new_factors.empty: + raise FactorEmptyError("No valid factor data found to merge.") + combined_factors = pd.concat([SOTA_factor, new_factors], axis=1).dropna() + else: + combined_factors = new_factors - if exp.sub_workspace_list[0].code_dict.get("model.py") is None: - raise ModelEmptyError("model.py is empty") - # to replace & inject code - exp.experiment_workspace.inject_code(**{"model.py": exp.sub_workspace_list[0].code_dict["model.py"]}) + # Sort and nest the combined factors under 'feature' + # TODO 这里是去重吧,针对feature的格式处理 kaggle应该不需要 + combined_factors = combined_factors.sort_index() + combined_factors = combined_factors.loc[:, ~combined_factors.columns.duplicated(keep="last")] + new_columns = pd.MultiIndex.from_product([["feature"], combined_factors.columns]) + combined_factors.columns = new_columns - env_to_use = {"PYTHONPATH": "./"} + # Save the combined factors to the workspace + with open(exp.experiment_workspace.workspace_path / "combined_factors_df.pkl", "wb") as f: + pickle.dump(combined_factors, f) - result = exp.experiment_workspace.execute(run_env=env_to_use) + # TODO 这里还是execute,应该是连kaggle的dockers + result = exp.experiment_workspace.execute( + qlib_config_name=f"conf.yaml" if len(exp.based_experiments) == 0 else "conf_combined.yaml" + ) exp.result = result if RUNNER_SETTINGS.cache_result: self.dump_cache_result(exp, result) return exp + + + return exp + + def process_factor_data(self, exp_or_list: List[FEFeatureExperiment] | FEFeatureExperiment) -> pd.DataFrame: + """ + Process and combine factor data from experiment implementations. + + Args: + exp (ASpecificExp): The experiment containing factor data. + + Returns: + pd.DataFrame: Combined factor data without NaN values. + """ + #TODO 这里需要把task的代码执行一遍,得到一个dataframe + if isinstance(exp_or_list, FEFeatureExperiment): + exp_or_list = [exp_or_list] + factor_dfs = [] + + # Collect all exp's dataframes + for exp in exp_or_list: + # Iterate over sub-implementations and execute them to get each factor data + #TODO 这里应当使用feature_execute函数实现 + message_and_df_list = multiprocessing_wrapper( + [(implementation.feature_execute) for implementation in exp.sub_workspace_list], + n=RD_AGENT_SETTINGS.multi_proc_n, + ) + #TODO datatime这些 这里应该不需要了 + for message, df in message_and_df_list: + # Check if factor generation was successful + if df is not None and "datetime" in df.index.names: + time_diff = df.index.get_level_values("datetime").to_series().diff().dropna().unique() + if pd.Timedelta(minutes=1) not in time_diff: + factor_dfs.append(df) + + # Combine all successful factor data + if factor_dfs: + return pd.concat(factor_dfs, axis=1) + else: + raise FactorEmptyError("No valid factor data found to merge.") \ No newline at end of file diff --git a/rdagent/scenarios/feature_engineering/experiment/feature_experiment.py b/rdagent/scenarios/feature_engineering/experiment/feature_experiment.py index fa9eb4c9..6fd3fe97 100644 --- a/rdagent/scenarios/feature_engineering/experiment/feature_experiment.py +++ b/rdagent/scenarios/feature_engineering/experiment/feature_experiment.py @@ -35,7 +35,7 @@ def background(self) -> str: @property def source_data(self) -> str: - # TODO: Add the source data property from kaggle data feature or sota data feature + # TODO Add the source data property from kaggle data feature or sota data feature # 连KGDockerenv来获取原始数据 # 后续怎么做? 对于特征工程来说,数据是不是需要更新 raise NotImplementedError("source_data is not implemented") diff --git a/rdagent/scenarios/feature_engineering/experiment/feature_template/model.py b/rdagent/scenarios/feature_engineering/experiment/feature_template/model.py new file mode 100644 index 00000000..d7c5139e --- /dev/null +++ b/rdagent/scenarios/feature_engineering/experiment/feature_template/model.py @@ -0,0 +1,22 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F + +class HybridFeatureInteractionModel(nn.Module): + def __init__(self, num_features): + super(HybridFeatureInteractionModel, self).__init__() + self.fc1 = nn.Linear(num_features, 128) + self.bn1 = nn.BatchNorm1d(128, momentum=0.1) + self.fc2 = nn.Linear(128, 64) + self.bn2 = nn.BatchNorm1d(64, momentum=0.1) + self.fc3 = nn.Linear(64, 1) + self.dropout = nn.Dropout(0.3) + + def forward(self, x): + x = F.relu(self.bn1(self.fc1(x))) + x = F.relu(self.bn2(self.fc2(x))) + x = self.dropout(x) + x = torch.sigmoid(self.fc3(x)) + return x + +model_cls = HybridFeatureInteractionModel \ No newline at end of file diff --git a/rdagent/scenarios/feature_engineering/experiment/prompts.yaml b/rdagent/scenarios/feature_engineering/experiment/prompts.yaml index a858de94..f392fa72 100644 --- a/rdagent/scenarios/feature_engineering/experiment/prompts.yaml +++ b/rdagent/scenarios/feature_engineering/experiment/prompts.yaml @@ -1,17 +1,19 @@ feature_engineering_background: |- Data preprocessing is a crucial step in the data analysis pipeline, involving the transformation and preparation of raw data into a suitable format for further analysis or modeling. This process is essential to enhance the quality of data and to ensure the accuracy of the results obtained from analytical models - Data preprocessing typically includes some of teh several key activities: Data Cleaning, Data Integration, Data Transformation, Data Reduction, Data Discretization. + Data preprocessing typically includes some of teh several key activities: Data Cleaning, Data Creation, Data Integration, Data Transformation, Data Reduction, Data Discretization. The preprocessing method is defined in the following parts: 1. Name: The name of the data preprocessing method. 2. Description: The description of the method. 3. Code: The code of the preprocessing method. 4. Variables: The steps or functions used for this data preprocessing method. + The method might not provide all the parts of the information above since some might not be applicable. Please specifically give all the hyperparameter in the method like Learning Rate, Regularization parameter, Threshold, etc. - One method should statically define one output with a fixed hyperparameter. For example, a method with a learning rate of 0.01 and a method with a learning rate of 0.1 should be considered two different methods. - + One method should statically define one output with a fixed hyperparameter. + For example, a method with a learning rate of 0.01 and a method with a learning rate of 0.1 should be considered two different methods. +#TODO 是否特征工程需要和模型强相关 feature_engineering_interface: |- Your python code should follow the interface to better interact with the user's system. Your python code should contain the following part: the import part and the function part. @@ -31,7 +33,9 @@ feature_engineering_interface: |- ``` The input data is a pandas dataframe and the output should be a pandas dataframe similar to the original data. -#TODO: Generate a python function (the whole preprocessing process) or generate a transformation each time +#TODO 这里有个问题,输出的data是直接给模型Train,还是要多个task concat起来给模型Train +#TODO Generate a python function (the whole preprocessing process) or generate a transformation each time + feature_engineering_output_format: |- Your output should be a pandas dataframe similar to the original data. diff --git a/rdagent/scenarios/feature_engineering/experiment/workspace.py b/rdagent/scenarios/feature_engineering/experiment/workspace.py index eb8c8971..100cc4aa 100644 --- a/rdagent/scenarios/feature_engineering/experiment/workspace.py +++ b/rdagent/scenarios/feature_engineering/experiment/workspace.py @@ -30,4 +30,9 @@ def execute(self, run_env: dict = {}, *args, **kwargs) -> str: if not csv_path.exists(): logger.error(f"File {csv_path} does not exist.") return None - return pd.read_csv(csv_path, index_col=0).iloc[:, 0] \ No newline at end of file + return pd.read_csv(csv_path, index_col=0).iloc[:, 0] + + def feature_execute(self, run_env: dict = {}, *args, **kwargs) -> pd.DataFrame: + #TODO 基于task的代码得到具体的特征数据,应当返回一个dataframe + #TODO 查看task的代码的案例,看一看是否现在生成的代码是什么样的,再决定下面这个函数怎么写 + pass \ No newline at end of file