From ec877a56ec5314cb3f6cc8c7e752b047469a665c Mon Sep 17 00:00:00 2001 From: river Date: Mon, 15 Apr 2024 11:08:57 +0800 Subject: [PATCH] zte ragas and longchat dataset --- configs/datasets/opseval/datasets.py | 13 +++- configs/datasets/opseval/qa_gen.py | 42 ++++++++++- configs/datasets/opseval/qa_ppl.py | 39 ++++++++++ configs/tests/test_ragas.py | 73 +++++++++++++++++++ configs/xz/run_qwen1_5_72b_chat.sh | 22 ++++++ configs/xz/runconfig.py | 23 +++--- configs/xz/runconfig_base.py | 11 ++- .../icl_evaluator/opseval_gen_evaluator.py | 29 ++++++-- opencompass/ragas/config.py | 11 ++- opencompass/ragas/judge.py | 13 ++-- opencompass/runners/local.py | 59 +++++++++++++-- opencompass/tasks/openicl_eval.py | 29 +++++++- opencompass/tasks/openicl_infer.py | 2 +- 13 files changed, 323 insertions(+), 43 deletions(-) create mode 100644 configs/tests/test_ragas.py create mode 100755 configs/xz/run_qwen1_5_72b_chat.sh diff --git a/configs/datasets/opseval/datasets.py b/configs/datasets/opseval/datasets.py index 89a9b12..e7eb425 100644 --- a/configs/datasets/opseval/datasets.py +++ b/configs/datasets/opseval/datasets.py @@ -2,8 +2,8 @@ with read_base(): from .mc_gen import get_mc_gen_datasets from .mc_ppl import get_mc_ppl_datasets - from .qa_gen import get_qa_gen_datasets - from .qa_ppl import get_qa_ppl_datasets + from .qa_gen import get_qa_gen_datasets, get_qa_long_gen_datasets + from .qa_ppl import get_qa_ppl_datasets, get_qa_long_ppl_datasets from ...paths import ROOT_DIR def get_all_datasets(name, path, langs, qtypes): @@ -102,11 +102,16 @@ def get_selected_datasets(setting_func, name, path, langs, qtypes): rzy_qa_gen = get_selected_datasets([get_qa_gen_datasets], 'rzy', f'{ROOT_DIR}data/opseval/rzy/splitted_v2', langs=['zh'], qtypes=None) rzy_qa = rzy_qa_ppl + rzy_qa_gen +zedx_qa_ppl = get_selected_datasets([get_qa_long_ppl_datasets], 'zedx', f'{ROOT_DIR}data/opseval/zedx', langs=['zh'], qtypes=None) +zedx_qa_gen = get_selected_datasets([get_qa_long_gen_datasets], 'zedx', f'{ROOT_DIR}data/opseval/zedx', langs=['zh'], qtypes=None) +zedx_qa = zedx_qa_ppl + zedx_qa_gen + all_ppl_mc = zte_mc_ppl + oracle_mc_ppl + owl_mc_ppl + network_mc_ppl + company_mc_ppl all_gen_mc = zte_mc_gen + oracle_mc_gen + owl_mc_gen + network_mc_gen + company_mc_gen -all_ppl_qa = owl_qa_ppl + company_qa_ppl -all_gen_qa = owl_qa_gen + company_qa_gen +all_ppl_qa = owl_qa_ppl + company_qa_ppl + zedx_qa_ppl +all_gen_qa = owl_qa_gen + company_qa_gen + zedx_qa_gen +all_qa = all_ppl_qa + all_gen_qa ceval_mc_ppl = get_selected_datasets([get_mc_ppl_datasets], 'ceval', f'{ROOT_DIR}data/opseval/ceval/splitted_dev', langs=['zh'], qtypes=['single']) ceval_mc_gen = get_selected_datasets([get_mc_gen_datasets], 'ceval', f'{ROOT_DIR}data/opseval/ceval/splitted_dev', langs=['zh'], qtypes=['single']) diff --git a/configs/datasets/opseval/qa_gen.py b/configs/datasets/opseval/qa_gen.py index ebfa76d..30522ca 100644 --- a/configs/datasets/opseval/qa_gen.py +++ b/configs/datasets/opseval/qa_gen.py @@ -35,7 +35,7 @@ def get_qa_gen_datasets(dataset_name, path, langs=['zh'], qtypes=None): inferencer=get_gen_inferencer(sc=False), ), # eval_cfg=dict(evaluator=dict(type=BleuRougeEvaluator)) - eval_cfg=dict(evaluator=dict(type=OpsEvalGenQAEvaluator, language=lang)) + eval_cfg=dict(evaluator=dict(type=OpsEvalGenQAEvaluator, language=lang), need_ragas=True) ) for shot_abbr, shot_hint_id, retriever_dict in zip( ['Zero-shot', '3-shot'], @@ -60,5 +60,45 @@ def get_qa_gen_datasets(dataset_name, path, langs=['zh'], qtypes=None): selected.extend([d for d in 
datasets if f'{lang}' in d['abbr'].replace('_', '-').split('-')]) return selected +def get_qa_long_gen_datasets(dataset_name, path, langs=['zh'], qtypes=None): + naive_gen_datasets = [ + dict( + type=OpsEvalQADataset, + abbr=f'{dataset_name}-qa-{shot_abbr}-{lang}-sc-gen', + path=path, + name=f'{dataset_name}_qa_{lang}', + reader_cfg=qa_gen_reader_cfg, + infer_cfg=dict( + ice_template=qa_gen_ice_template(prompt_hint, answer_hint), + prompt_template=qa_gen_prompt_template(prompt_hint, answer_hint), + retriever=retriever_dict, + inferencer=get_gen_inferencer(sc=False), + ), + # eval_cfg=dict(evaluator=dict(type=BleuRougeEvaluator)) + eval_cfg=dict(evaluator=dict(type=OpsEvalGenQAEvaluator, language=lang), need_ragas=True) + ) + for shot_abbr, shot_hint_id, retriever_dict in zip( + ['Zero-shot', '3-shot'], + [0, 1], + [dict(type=ZeroRetriever), dict(type=FixKRetriever, fix_id_list=[0,1,2])] + ) + for lang, prompt_hint, answer_hint in zip( + ['zh', 'en'], + [ + f"你是一名运维专家,请回答下面这个问题:\n", + f"You are an IT operations expert, please answer the following question: \n" + ], + [ + "答案:", + "Answer:" + ] + ) + ] + datasets = naive_gen_datasets + selected = [] + for lang in langs: + selected.extend([d for d in datasets if f'{lang}' in d['abbr'].replace('_', '-').split('-')]) + return selected + zjyd_qa_gen_datasets = get_qa_gen_datasets('zjyd', f'{ROOT_DIR}data/opseval/zjyd/') \ No newline at end of file diff --git a/configs/datasets/opseval/qa_ppl.py b/configs/datasets/opseval/qa_ppl.py index 9616681..5ce778a 100644 --- a/configs/datasets/opseval/qa_ppl.py +++ b/configs/datasets/opseval/qa_ppl.py @@ -59,5 +59,44 @@ def get_qa_ppl_datasets(dataset_name, path, langs=['zh'], qtypes=None): selected.extend([d for d in datasets if f'{lang}' in d['abbr'].replace('_', '-').split('-')]) return selected +def get_qa_long_ppl_datasets(dataset_name, path, langs=['zh'], qtypes=None): + naive_ppl_datasets = [ + dict( + type=OpsEvalQADataset, + abbr=f'{dataset_name}-qa-{shot_abbr}-{lang}-sc-ppl', + path=path, + name=f'{dataset_name}_qa_{lang}', + reader_cfg=qa_ppl_reader_cfg, + infer_cfg=dict( + ice_template=qa_ppl_ice_template(prompt_hint, answer_hint), + prompt_template=qa_ppl_prompt_template(prompt_hint, answer_hint), + retriever=retriever_dict, + inferencer=get_ppl_qa_inferencer(), + ), + eval_cfg=dict(evaluator=dict(type=MeanEvaluator, field_name='PPL')) + ) + for shot_abbr, shot_hint_id, retriever_dict in zip( + ['Zero-shot', '3-shot'], + [0, 1], + [dict(type=ZeroRetriever), dict(type=FixKRetriever, fix_id_list=[0,1,2])] + ) + for lang, prompt_hint, answer_hint in zip( + ['zh', 'en'], + [ + f"你是一名运维专家,请回答下面这个问题:\n", + f"You are an IT operations expert, please answer the following question: \n" + ], + [ + "答案:", + "Answer:" + ] + ) + ] + datasets = naive_ppl_datasets + selected = [] + for lang in langs: + selected.extend([d for d in datasets if f'{lang}' in d['abbr'].replace('_', '-').split('-')]) + return selected + zjyd_qa_ppl_datasets = get_qa_ppl_datasets('zjyd', f'{ROOT_DIR}data/opseval/zjyd/') \ No newline at end of file diff --git a/configs/tests/test_ragas.py b/configs/tests/test_ragas.py new file mode 100644 index 0000000..a98a13b --- /dev/null +++ b/configs/tests/test_ragas.py @@ -0,0 +1,73 @@ +from mmengine.config import read_base +from opencompass.partitioners import SizePartitioner, NaivePartitioner +from opencompass.runners import LocalRunner +from opencompass.tasks import OpenICLInferTask, OpenICLEvalTask + +with read_base(): + # Datasets + from ..datasets.opseval.datasets import owl_qa_gen, 
rzy_qa_gen, zedx_qa_gen, owl_qa_ppl, rzy_qa_ppl + # Models + from ..local_models.google.t5 import t5_base + from ..local_models.bert.bert import bert_large_cased + from ..local_models.qwen.qwen import qwen1_5_chat_models + + from ..paths import ROOT_DIR + + +datasets = [ + *owl_qa_gen, + *owl_qa_ppl, + *rzy_qa_gen, + *rzy_qa_ppl, + *zedx_qa_gen, +] + +datasets = [ + dataset for dataset in datasets if 'Zero-shot' in dataset['abbr'] and 'zh' in dataset['abbr'] +] + +models = [ + # t5_base, + # bert_large_cased, + model for model in qwen1_5_chat_models if '14' in model['abbr'] + # *vicuna_bases, + # *internlm2_bases, + # *yi_bases, + # mistral_7b +] + +for model in models: + model['run_cfg'] = dict(num_gpus=1, num_procs=1) + pass + +for dataset in datasets: + dataset['sample_setting'] = dict() + dataset['infer_cfg']['inferencer']['save_every'] = 8 + dataset['infer_cfg']['inferencer']['sc_size'] = 2 + dataset['infer_cfg']['inferencer']['max_token_len'] = 200 + dataset['eval_cfg']['sc_size'] = 2 + dataset['sample_setting'] = dict(sample_size=5) # !!!WARNING: Use for testing only!!! + + +infer = dict( + partitioner=dict( + # type=SizePartitioner, + # max_task_size=100, + # gen_task_coef=1, + type=NaivePartitioner + ), + runner=dict( + type=LocalRunner, + max_num_workers=16, + max_workers_per_gpu=1, + task=dict(type=OpenICLInferTask), + ), +) + +eval = dict( + partitioner=dict(type=NaivePartitioner), + runner=dict( + type=LocalRunner, + max_num_workers=16, + task=dict(type=OpenICLEvalTask)), +) diff --git a/configs/xz/run_qwen1_5_72b_chat.sh b/configs/xz/run_qwen1_5_72b_chat.sh new file mode 100755 index 0000000..0d0ddc5 --- /dev/null +++ b/configs/xz/run_qwen1_5_72b_chat.sh @@ -0,0 +1,22 @@ +#!/bin/sh +set -x +set -e + +MODEL_PATH="/mnt/tenant-home_speed/gaozhengwei/projects/LLM/models/Qwen/Qwen1.5-72B-Chat" +PORT=12310 +GPUS=("0,1" "2,3" "4,5" "6,7") + +source /root/miniconda3/etc/profile.d/conda.sh && conda activate vllm +for i in {0..3}; do + CUDA_VISIBLE_DEVICES=${GPUS[$i]} ray start --head --port $((8012 + $i)) --num-cpus 2 + CUDA_VISIBLE_DEVICES=${GPUS[$i]} ray start --address=localhost:$((8012 + $i)) --num-cpus 2 + CUDA_VISIBLE_DEVICES=${GPUS[$i]} RAY_ADDRESS=localhost:$((8012 + $i)) python -m vllm.entrypoints.openai.api_server \ + --model $MODEL_PATH --host 127.0.0.1 --port $(($PORT + $i)) --tensor-parallel-size 2 --gpu-memory-utilization 0.98 --trust-remote-code --max-model-len 2048 & pid[$i]=$! + echo "port=$(($PORT + $i)), pid=${pid[$i]}" +done +echo "[VLLM] All backend servers have been started!!!" + +wait +echo "[VLLM] All backend services have been successfully killed!!!" 
+ray stop +echo "[VLLM] Ray stoped" diff --git a/configs/xz/runconfig.py b/configs/xz/runconfig.py index 22dbcbb..c5e70f1 100644 --- a/configs/xz/runconfig.py +++ b/configs/xz/runconfig.py @@ -6,7 +6,7 @@ with read_base(): # Datasets - from ..datasets.opseval.datasets import ceval_mc_ppl, network_mc_ppl, zte_mc_ppl, owl_mc_ppl, ceval_mc_gen, network_mc_gen, zte_mc_gen, owl_mc_gen, owl_qa_gen, owl_qa_ppl, rzy_qa_gen, rzy_qa_ppl, oracle_mc_gen, oracle_mc_ppl, company_mc_gen, company_mc_ppl + from ..datasets.opseval.datasets import ceval_mc_ppl, network_mc_ppl, zte_mc_ppl, owl_mc_ppl, ceval_mc_gen, network_mc_gen, zte_mc_gen, owl_mc_gen, owl_qa_gen, owl_qa_ppl, rzy_qa_gen, rzy_qa_ppl, zedx_qa_gen, zedx_qa_ppl, oracle_mc_gen, oracle_mc_ppl, company_mc_gen, company_mc_ppl from ..datasets.simple_qa.owl_qa import owl_qa_datasets from ..datasets.ppl_qa.owl_qa import owl_ppl_qa_datasets from ..datasets.simple_qa.rzy_qa import rzy_qa_datasets @@ -25,8 +25,8 @@ model_dataset_combinations = [{ 'models': [dict( type=VLLM, - abbr='nm_qwen1.5_32b_dsir_new_10000_full_owl_network_sft_800steps', - path='/mnt/home/opsfm-xz/sft_checkpoint/xz/qwen1.5-32b-dsir_new_10000-full-owl-network-sft-2000steps/checkpoint-800/merged_model', + abbr='nm_qwen1.5_32b_zedx_full_2000step_sft_2000step', + path='/mnt/tenant-home_speed/xz/sft_checkpoint/qwen1.5-32b-zedx-full-2000step-sft-2000step/merged_model', max_out_len=400, max_seq_len=2048, batch_size=8, @@ -47,9 +47,9 @@ }, { 'models': [dict( type=HuggingFaceCausalLM, - abbr='nm_qwen1.5_32b_dsir_new_10000_full_owl_network_sft_800steps', - path='/mnt/home/opsfm-xz/sft_checkpoint/xz/qwen1.5-32b-dsir_new_10000-full-owl-network-sft-2000steps/checkpoint-800/merged_model', - tokenizer_path='/mnt/home/opsfm-xz/sft_checkpoint/xz/qwen1.5-32b-dsir_new_10000-full-owl-network-sft-2000steps/checkpoint-800/merged_model', + abbr='nm_qwen1.5_32b_zedx_full_2000step_sft_2000step', + path='/mnt/tenant-home_speed/xz/sft_checkpoint/qwen1.5-32b-zedx-full-2000step-sft-2000step/merged_model', + tokenizer_path='/mnt/tenant-home_speed/xz/sft_checkpoint/qwen1.5-32b-zedx-full-2000step-sft-2000step/merged_model', tokenizer_kwargs=dict(padding_side='left', truncation_side='left', trust_remote_code=True, @@ -74,7 +74,7 @@ zeroshot_datasets = [] fewshot_datasets = [] -for dataset in [*ceval_mc_ppl,*network_mc_ppl,*zte_mc_ppl,*owl_mc_ppl,*oracle_mc_ppl,*company_mc_ppl,*ceval_mc_gen,*network_mc_gen,*zte_mc_gen,*owl_mc_gen,*oracle_mc_gen,*company_mc_gen,*owl_qa_gen,*owl_qa_ppl,*rzy_qa_gen,*rzy_qa_ppl]: +for dataset in [*ceval_mc_ppl,*network_mc_ppl,*zte_mc_ppl,*owl_mc_ppl,*oracle_mc_ppl,*company_mc_ppl,*ceval_mc_gen,*network_mc_gen,*zte_mc_gen,*owl_mc_gen,*oracle_mc_gen,*company_mc_gen,*zedx_qa_gen,*zedx_qa_ppl]: # dataset['path'] = dataset['path'].replace('/mnt/mfs/opsgpt/evaluation','/mnt/home/opseval/evaluation/') dataset['sample_setting'] = dict() dataset['infer_cfg']['inferencer']['save_every'] = 8 @@ -82,14 +82,17 @@ dataset['infer_cfg']['inferencer']['max_out_len'] = 20 # dataset['infer_cfg']['inferencer']['generation_kwargs'] = {'stopping_criteria': ['<|im_end|>', '<|endoftext|>']} if 'qa' in dataset['abbr'].replace('-', '_').split('_'): - dataset['infer_cfg']['inferencer']['max_out_len'] = 50 + if 'zedx' in dataset['abbr']: + dataset['infer_cfg']['inferencer']['max_out_len'] = 100 + else: + dataset['infer_cfg']['inferencer']['max_out_len'] = 50 if 'en' in dataset['abbr'].replace('-', '_').split('_'): continue if 'sc+cot' in dataset['abbr']: continue if 'mc' in dataset['abbr'].replace('-', 
'_').split('_'): dataset['sample_setting']= dict(sample_size=500) - if 'owl_qa' in dataset['abbr']: + if 'qa' in dataset['abbr'].replace('-', '_').split('_') and 'ppl' not in dataset['abbr']: dataset['sample_setting']= dict(sample_size=500) dataset['eval_cfg']['sc_size'] = 1 if 'network' in dataset['abbr']: @@ -127,6 +130,6 @@ partitioner=dict(type=NaivePartitioner), runner=dict( type=LocalRunner, - max_num_workers=32, + max_num_workers=8, task=dict(type=OpenICLEvalTask)), ) diff --git a/configs/xz/runconfig_base.py b/configs/xz/runconfig_base.py index c314d85..ba3abee 100644 --- a/configs/xz/runconfig_base.py +++ b/configs/xz/runconfig_base.py @@ -6,7 +6,7 @@ with read_base(): # Datasets - from ..datasets.opseval.datasets import ceval_mc_ppl, network_mc_ppl, zte_mc_ppl, owl_mc_ppl, ceval_mc_gen, network_mc_gen, zte_mc_gen, owl_mc_gen, owl_qa_gen, owl_qa_ppl, rzy_qa_gen, rzy_qa_ppl, oracle_mc_gen, oracle_mc_ppl, company_mc_gen, company_mc_ppl + from ..datasets.opseval.datasets import ceval_mc_ppl, network_mc_ppl, zte_mc_ppl, owl_mc_ppl, ceval_mc_gen, network_mc_gen, zte_mc_gen, owl_mc_gen, owl_qa_gen, owl_qa_ppl, rzy_qa_gen, rzy_qa_ppl, zedx_qa_gen, zedx_qa_ppl, oracle_mc_gen, oracle_mc_ppl, company_mc_gen, company_mc_ppl from ..datasets.simple_qa.owl_qa import owl_qa_datasets from ..datasets.ppl_qa.owl_qa import owl_ppl_qa_datasets from ..datasets.simple_qa.rzy_qa import rzy_qa_datasets @@ -41,14 +41,17 @@ dataset['infer_cfg']['inferencer']['max_out_len'] = 20 # dataset['infer_cfg']['inferencer']['generation_kwargs'] = {'stopping_criteria': ['<|im_end|>', '<|endoftext|>']} if 'qa' in dataset['abbr'].replace('-', '_').split('_'): - dataset['infer_cfg']['inferencer']['max_out_len'] = 50 + if 'zedx' in dataset['abbr']: + dataset['infer_cfg']['inferencer']['max_out_len'] = 100 + else: + dataset['infer_cfg']['inferencer']['max_out_len'] = 50 if 'en' in dataset['abbr'].replace('-', '_').split('_'): continue if 'sc+cot' in dataset['abbr']: continue if 'mc' in dataset['abbr'].replace('-', '_').split('_'): dataset['sample_setting']= dict(sample_size=500) - if 'owl_qa' in dataset['abbr']: + if 'qa' in dataset['abbr'].replace('-', '_').split('_') and 'ppl' not in dataset['abbr']: dataset['sample_setting']= dict(sample_size=500) dataset['eval_cfg']['sc_size'] = 1 if 'network' in dataset['abbr']: @@ -86,6 +89,6 @@ partitioner=dict(type=NaivePartitioner), runner=dict( type=LocalRunner, - max_num_workers=32, + max_num_workers=8, task=dict(type=OpenICLEvalTask)), ) diff --git a/opencompass/openicl/icl_evaluator/opseval_gen_evaluator.py b/opencompass/openicl/icl_evaluator/opseval_gen_evaluator.py index 3a4e76c..76258c1 100644 --- a/opencompass/openicl/icl_evaluator/opseval_gen_evaluator.py +++ b/opencompass/openicl/icl_evaluator/opseval_gen_evaluator.py @@ -116,15 +116,24 @@ def score(self, predictions: List, references: List) -> dict: correct += not_sc(pred, ans) sc_correct += sc_cot(pred, ans) return { - 'Accuracy': correct / tot * 100, - 'SC-Accuracy': sc_correct / tot * 100, + 'accuracy': correct / tot * 100, + 'sc-accuracy': sc_correct / tot * 100, } class OpsEvalGenQAEvaluator(BaseEvaluator): - def __init__(self, language='en'): + def __init__(self, + language='en', + ragas_config=None, + ): super().__init__() self.language = language + self.ragas_config = ragas_config + if self.ragas_config == None: + self.ragas_config = dict( + api_ip='localhost', + api_port=12310, # 12310 ~ 12313 + ) def score(self, predictions: List, references: List, test_set: List) -> dict: tot_bleu, tot_rouge = 0, 0 @@ -136,7 
+145,9 @@ def score(self, predictions: List, references: List, test_set: List) -> dict: report = { "bleu": tot_bleu / len(predictions), "rouge": tot_rouge / len(predictions), - "ragas": ragas_report + "ragas_score": ragas_report["score"], + "ragas_acc": ragas_report["accuracy"], + "ragas_report": ragas_report } return report @@ -166,7 +177,7 @@ def get_ragas_score(self, predictions, references, test_set) -> dict: for idx, (question, ref) in enumerate(zip(test_set['question'], references))] answers = [{"id": idx, "question": question, "answer": ans} for idx, (question, ans) in enumerate(zip(test_set['question'], predictions))] - report = calculate_score(reference, answers) + report = calculate_score(reference, answers, self.ragas_config) return report class OpsEvalRagasEvaluator(BaseEvaluator): @@ -180,5 +191,9 @@ def score(self, predictions: List, references: List, test_set: List) -> dict: for idx, (question, ref) in enumerate(zip(test_set['question'], references))] answers = [{"id": idx, "question": question, "answer": ans} for idx, (question, ans) in enumerate(zip(test_set['question'], predictions))] - report = calculate_score(reference, answers) - return report \ No newline at end of file + try: + report = calculate_score(reference, answers) + except Exception as err: + print(f"[OpsEvalRagasEvaluator] {err}") + report = {} + return report diff --git a/opencompass/ragas/config.py b/opencompass/ragas/config.py index 41e8394..48a7406 100644 --- a/opencompass/ragas/config.py +++ b/opencompass/ragas/config.py @@ -26,13 +26,20 @@ def load_config() -> dict: config = load_config() -def load_llm() -> BaseLanguageModel: +def load_llm(ragas_config: dict) -> BaseLanguageModel: models_config = config.get('models') llm_type = models_config.get('llm_type', 'openai') if llm_type == 'openai': os.environ["OPENAI_API_BASE"] = models_config.get('openai_api_base', '') os.environ["OPENAI_API_KEY"] = models_config.get('openai_api_key', '') + if 'api_ip' in ragas_config and 'api_port' in ragas_config: + api_ip = ragas_config['api_ip'] + api_port = ragas_config['api_port'] + if 'ragas_port' in ragas_config: + api_port = ragas_config['ragas_port'] + os.environ["OPENAI_API_BASE"] = f"http://{api_ip}:{api_port}/v1" + from langchain_openai.chat_models import ChatOpenAI return ChatOpenAI(model=models_config.get('llm_model', 'gpt-3.5-turbo-16k')) @@ -49,7 +56,7 @@ def load_llm() -> BaseLanguageModel: sys.exit(1) -def load_embeddings() -> Embeddings: +def load_embeddings(ragas_config: dict) -> Embeddings: models_config = config.get('models') emb_type = models_config.get('emb_type', 'openai') if emb_type == 'openai': diff --git a/opencompass/ragas/judge.py b/opencompass/ragas/judge.py index c68e38c..e1a64a1 100644 --- a/opencompass/ragas/judge.py +++ b/opencompass/ragas/judge.py @@ -63,7 +63,7 @@ def preprocess_data(ground_truth: pd.DataFrame, predictions: pd.DataFrame) -> pd return ground_truth.merge(predictions, on='id', how='left').fillna('') -def calculate_score(reference: list[dict], answers: list[dict]) -> dict: +def calculate_score(reference: list[dict], answers: list[dict], ragas_config: dict) -> dict: """ Calculate the score of the team's answers based on the reference answers. 
@@ -79,7 +79,7 @@ def calculate_score(reference: list[dict], answers: list[dict]) -> dict: gt_df = pd.DataFrame(reference) preds_df = pd.DataFrame(validate_and_format_answers(answers)) data = preprocess_data(gt_df, preds_df) - res_df = compute_scores(data) + res_df = compute_scores(data, ragas_config) detail = res_df.to_dict(orient='records') overall_score = sum([item['score'] for item in detail]) / len(detail) @@ -131,7 +131,7 @@ def calculate_score(reference: list[dict], answers: list[dict]) -> dict: return report -def compute_scores(df: pd.DataFrame) -> list[dict]: +def compute_scores(df: pd.DataFrame, ragas_config: dict) -> list[dict]: langsmith_config = config.get('langsmith', {}) langsmith_enabled = langsmith_config.get('enabled', False) @@ -154,15 +154,18 @@ def compute_scores(df: pd.DataFrame) -> list[dict]: tracer = LangChainTracer(project_name=langsmith_config.get('project_name')) callbacks.append(tracer) + print("[DEBUG RAGAS] ragas_config", ragas_config) + result = evaluate( dataset, metrics=[ answer_correctness, ], - llm=load_llm(), - embeddings=load_embeddings(), + llm=load_llm(ragas_config), + embeddings=load_embeddings(ragas_config), run_config=RunConfig(max_workers=judge_config.get('max_workers', 16)), callbacks=callbacks, + raise_exceptions=False ) res_df = result.to_pandas() diff --git a/opencompass/runners/local.py b/opencompass/runners/local.py index 54d0dff..497cdcd 100644 --- a/opencompass/runners/local.py +++ b/opencompass/runners/local.py @@ -51,10 +51,12 @@ def __init__(self, max_num_workers: int = 16, debug: bool = False, max_workers_per_gpu: int = 1, + max_workers_per_ragas: int = 1, lark_bot_url: str = None): super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url) self.max_num_workers = max_num_workers self.max_workers_per_gpu = max_workers_per_gpu + self.max_workers_per_ragas = max_workers_per_ragas def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]: """Launch multiple tasks. @@ -78,7 +80,8 @@ def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]: all_gpu_ids = list(range(torch.cuda.device_count())) if self.debug: - for task in tasks: + # OpsEval: give ID to each task + for tid, task in enumerate(tasks): task = TASKS.build(dict(cfg=task, type=self.task_cfg['type'])) task_name = task.name num_gpus = task.num_gpus @@ -125,19 +128,52 @@ def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]: print('DEBUG: ', gpus) + # ragas ports !AD HOC! 
+ all_ragas_ids = [0,1] + ragases = np.zeros(max(all_ragas_ids)+1, dtype=np.uint) + ragases[all_ragas_ids] = self.max_workers_per_ragas + ragas_lock = np.zeros(1) + non_ragas_lock = np.zeros(1) + pbar = tqdm(total=len(tasks)) lock = Lock() def submit(task, index): - task = TASKS.build(dict(cfg=task, type=self.task_cfg['type'])) + # OpsEval: give id to each task + get_logger().info(f"[LocalRunner] running a new task with index: {index}") + task = TASKS.build(dict(cfg=task, type=self.task_cfg['type'], tid=index)) num_gpus = task.num_gpus + need_ragas = 1 if hasattr(task, 'need_ragas') and task.need_ragas else 0 + assert len(gpus) >= num_gpus while True: lock.acquire() - if sum(gpus > 0) >= num_gpus: + # If ragas_lock > 0 and not need_ragas: wait + if ragas_lock[0] > 0 and not need_ragas: + # get_logger().info(f"[LocalRunner] ragas_lock {ragas_lock} non_ragas_lock {non_ragas_lock} trying to run {need_ragas}") + lock.release() + time.sleep(1) + continue + # If non_ragas_lock > 0 and need_ragas: wait + if non_ragas_lock[0] > 0 and need_ragas: + # get_logger().info(f"[LocalRunner] ragas_lock {ragas_lock} non_ragas_lock {non_ragas_lock} trying to run {need_ragas}") + lock.release() + time.sleep(1) + continue + + if sum(gpus > 0) >= num_gpus and sum(ragases > 0) >= need_ragas: gpu_ids = np.where(gpus)[0][:num_gpus] gpus[gpu_ids] -= 1 + + ragas_ids = np.where(ragases)[0][:1] + ragases[ragas_ids] -= 1 + + if need_ragas: + ragas_lock[0] += 1 + else: + non_ragas_lock[0] += 1 + lock.release() break lock.release() @@ -148,12 +184,23 @@ def submit(task, index): ','.join(map(str, gpu_ids))) else: tqdm.write(f'launch {task.name} on CPU ') - - res = self._launch(task, gpu_ids, index) + + if need_ragas: + tqdm.write(f'launch {task.name} on RAGAS id {ragas_ids[0]}') + + if len(ragas_ids): + task.ragas_id = ragas_ids[0] + + res = self._launch(task, gpu_ids, ragas_ids, index) pbar.update() with lock: gpus[gpu_ids] += 1 + ragases[ragas_ids] += 1 + if need_ragas: + ragas_lock[0] -= 1 + else: + non_ragas_lock[0] -= 1 return res @@ -163,7 +210,7 @@ def submit(task, index): return status - def _launch(self, task, gpu_ids, index): + def _launch(self, task, gpu_ids, ragas_ids, index): """Launch a single task. 
Args: diff --git a/opencompass/tasks/openicl_eval.py b/opencompass/tasks/openicl_eval.py index 01cd424..b313f37 100644 --- a/opencompass/tasks/openicl_eval.py +++ b/opencompass/tasks/openicl_eval.py @@ -65,19 +65,28 @@ class OpenICLEvalTask(BaseTask): log_subdir = 'logs/eval' output_subdir = 'results' - def __init__(self, cfg: ConfigDict): + def __init__(self, cfg: ConfigDict, tid: int, ragas_id: int = None): super().__init__(cfg) + self.tid = tid + self.ragas_id = ragas_id self.logger = get_logger() self.num_gpus = max( c.get('eval_cfg', {}).get('num_gpus', 0) for c in sum(self.dataset_cfgs, [])) + self.need_ragas = max( + c.get('eval_cfg', {}).get('need_ragas', False) + for c in sum(self.dataset_cfgs, []) + ) self.dump_details = cfg.get('eval', {}).get('runner', {}).get( 'task', {}).get('dump_details', False) def get_command(self, cfg_path, template): script_path = __file__ python = 'python3' if which('python3') else 'python' - command = f'{python} {script_path} {cfg_path}' + if hasattr(self, 'ragas_id'): + command = f'{python} {script_path} {cfg_path} {self.tid} {self.ragas_id}' + else: + command = f'{python} {script_path} {cfg_path} {self.tid}' return template.format(task_cmd=command) def run(self): @@ -200,6 +209,16 @@ def postprocess(sample): ] icl_evaluator = ICL_EVALUATORS.build(self.eval_cfg['evaluator']) + + #TODO: assign the evaluator with different port + self.logger.info(f"[DEBUG OpenICLEvalTask] tid: {self.tid}") + if hasattr(icl_evaluator, 'ragas_config'): + icl_evaluator.ragas_config['ragas_port'] = 12310 + (self.tid % 4) + if hasattr(self, 'ragas_id') and self.ragas_id: + self.logger.info(f"[DEBUG OpenICLEvalTask] Allocated ragas_id: {self.ragas_id}") + icl_evaluator.ragas_config['ragas_port'] = 12310 + (int(self.ragas_id) % 4) + self.logger.info(f"[DEBUG OpenICLEvalTask] Setting ragas_port to: {icl_evaluator.ragas_config['ragas_port']}") + # need results dir to save other files out_path = get_infer_output_path( self.model_cfg, self.dataset_cfg, @@ -351,6 +370,8 @@ def filters(origins): def parse_args(): parser = argparse.ArgumentParser(description='Score Calculator') parser.add_argument('config', help='Config file path') + parser.add_argument('tid', help='Task ID', default=0, type=int) + parser.add_argument('ragas_id', help='RAGAS ID', default=None) args = parser.parse_args() return args @@ -358,8 +379,10 @@ def parse_args(): if __name__ == '__main__': args = parse_args() cfg = Config.fromfile(args.config) + tid = args.tid + ragas_id = args.ragas_id start_time = time.time() - inferencer = OpenICLEvalTask(cfg) + inferencer = OpenICLEvalTask(cfg, tid, ragas_id) inferencer.run() end_time = time.time() get_logger().info(f'time elapsed: {end_time - start_time:.2f}s') diff --git a/opencompass/tasks/openicl_infer.py b/opencompass/tasks/openicl_infer.py index c74c227..ba97876 100644 --- a/opencompass/tasks/openicl_infer.py +++ b/opencompass/tasks/openicl_infer.py @@ -27,7 +27,7 @@ class OpenICLInferTask(BaseTask): log_subdir = 'logs/infer' output_subdir = 'predictions' - def __init__(self, cfg: ConfigDict): + def __init__(self, cfg: ConfigDict, tid: int = 0): super().__init__(cfg) run_cfg = self.model_cfgs[0].get('run_cfg', {}) self.num_gpus = run_cfg.get('num_gpus', 0)
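Note (not part of the patch): a minimal sketch of how the RAGAS judge endpoint ends up being selected, pieced together from the default ragas_config in OpsEvalGenQAEvaluator, load_llm() in opencompass/ragas/config.py, the port assignment in opencompass/tasks/openicl_eval.py, and the four vLLM OpenAI-compatible servers started on ports 12310-12313 by configs/xz/run_qwen1_5_72b_chat.sh. The helper names below are illustrative only and do not exist in the patch.

import os
from typing import Optional

BASE_PORT = 12310      # first vLLM server port in run_qwen1_5_72b_chat.sh
NUM_SERVERS = 4        # GPUS=("0,1" "2,3" "4,5" "6,7") -> four tensor-parallel-2 servers

def pick_ragas_port(tid: int, ragas_id: Optional[int] = None) -> int:
    # Mirrors OpenICLEvalTask: prefer the runner-assigned ragas_id when present,
    # otherwise fall back to the task id, wrapping over the four servers.
    if ragas_id is not None:
        return BASE_PORT + (int(ragas_id) % NUM_SERVERS)
    return BASE_PORT + (tid % NUM_SERVERS)

def point_ragas_at_local_vllm(ragas_config: dict) -> None:
    # Mirrors load_llm(): route the RAGAS judge's OpenAI-style calls to the local
    # vLLM endpoint described by ragas_config instead of the default OpenAI API base.
    api_ip = ragas_config.get('api_ip', 'localhost')
    api_port = ragas_config.get('ragas_port', ragas_config.get('api_port', BASE_PORT))
    os.environ['OPENAI_API_BASE'] = f'http://{api_ip}:{api_port}/v1'

# Example: eval task 6 with no explicit ragas_id is judged via port 12312.
cfg = dict(api_ip='localhost', api_port=BASE_PORT)
cfg['ragas_port'] = pick_ragas_port(tid=6)
point_ragas_at_local_vllm(cfg)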