Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service/match api #431

Merged
merged 16 commits into from
Oct 9, 2024
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ exclude: |
(?x)^(
docs/.*|
tests/.*|
demos/.*|
demos/(?!api_service/).*|
tools/mm_eval/inception_metrics/.*|
thirdparty/easy_animate/.*|
.*\.md
Expand Down
8 changes: 4 additions & 4 deletions data_juicer/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def update_ds_cache_dir_and_related_vars(new_ds_cache_path):
config.DEFAULT_EXTRACTED_DATASETS_PATH)


def init_setup_from_cfg(cfg):
def init_setup_from_cfg(cfg: Namespace):
"""
Do some extra setup tasks after parsing config file or command line.

Expand Down Expand Up @@ -628,7 +628,7 @@ def namespace_to_arg_list(namespace, prefix='', includes=None, excludes=None):
return arg_list


def config_backup(cfg):
def config_backup(cfg: Namespace):
cfg_path = cfg.config[0].absolute
work_dir = cfg.work_dir
target_path = os.path.join(work_dir, os.path.basename(cfg_path))
Expand All @@ -638,7 +638,7 @@ def config_backup(cfg):
shutil.copyfile(cfg_path, target_path)


def display_config(cfg):
def display_config(cfg: Namespace):
import pprint

from tabulate import tabulate
Expand Down Expand Up @@ -790,7 +790,7 @@ def prepare_side_configs(ori_config: Union[str, Namespace, Dict]):
return config


def get_init_configs(cfg: Namespace):
def get_init_configs(cfg: Union[Namespace, Dict]):
"""
set init configs of datajucer for cfg
"""
Expand Down
7 changes: 5 additions & 2 deletions data_juicer/core/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ def __init__(self, cfg: Optional[Namespace] = None):

def run(self,
load_data_np: Optional[PositiveInt] = None,
skip_export: bool = False):
skip_export: bool = False,
skip_return: bool = False):
"""
Running the dataset analysis pipeline.

:param load_data_np: number of workers when loading the dataset.
:param skip_export: whether export the results into disk
:param skip_return: skip return for API called.
:return: analyzed dataset.
"""
# 1. format data
Expand Down Expand Up @@ -134,4 +136,5 @@ def run(self,
)
column_wise_analysis.analyze(skip_export=skip_export)

return dataset
if not skip_return:
return dataset
8 changes: 6 additions & 2 deletions data_juicer/core/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,14 @@ def sample_data(self,
else:
raise ValueError(f'Unsupported sample_algo: {sample_algo}')

def run(self, load_data_np: Optional[PositiveInt] = None):
def run(self,
load_data_np: Optional[PositiveInt] = None,
skip_return=False):
"""
Running the dataset process pipeline.

:param load_data_np: number of workers when loading the dataset.
:param skip_return: skip return for API called.
:return: processed dataset.
"""
# 1. format data
Expand Down Expand Up @@ -179,4 +182,5 @@ def run(self, load_data_np: Optional[PositiveInt] = None):
from data_juicer.utils.compress import compress
compress(dataset)

return dataset
if not skip_return:
return dataset
13 changes: 13 additions & 0 deletions demos/api_service/configs/dj_config_template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# data-juicer config template

# global parameters
project_name: 'dj_agent'
dataset_path: '' # path to your dataset directory or file, specified in the agent
np: 4 # number of subprocesses used to process your dataset

export_path: '' # path to the output file, specified in the agent
export_original_dataset: true

# process schedule
# a list of several process operators with their arguments, specified in the agent
process: []
21 changes: 21 additions & 0 deletions demos/api_service/configs/model_configs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[
{
"config_name": "gpt-4",
"model_type": "openai-chat",
"model_name": "gpt-4",
"api_key": "your API key",
"organization": "your organization name",
"generate_args": {
"temperature": 0.5
}
},
{
"config_name": "dashscope_chat-qwen-max",
"model_type": "dashscope_chat",
"model_name": "qwen-max",
"api_key": "your API key",
"generate_args": {
"temperature": 0.0
}
}
]
603 changes: 603 additions & 0 deletions demos/api_service/react_data_filter_process.ipynb

Large diffs are not rendered by default.

473 changes: 473 additions & 0 deletions demos/api_service/react_data_mapper_process.ipynb

Large diffs are not rendered by default.

150 changes: 150 additions & 0 deletions demos/api_service/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import datetime
import glob
import importlib
import os
from json import dumps as jdumps
from json import loads as jloads
from typing import Dict, Optional
from urllib.parse import urljoin

import requests
import yaml
from agentscope.message import Msg
from agentscope.service import ServiceToolkit
from loguru import logger
from PIL import Image

# Base URL that `call_data_juicer_api` joins request paths onto.
DJ_BASE_URL = 'http://localhost:8000'
# YAML template that `init_config` loads as the starting config.
DJ_CONFIG_TEMPLATE = './configs/dj_config_template.yaml'
# Root directory used when building timestamped export paths.
DJ_OUTPUT = 'outputs'


def call_data_juicer_api(path: str,
                         params: Optional[Dict] = None,
                         json: Optional[Dict] = None):
    """
    Send a request to the Data-Juicer API service and decode the reply.

    A POST request is issued when a `json` body is supplied; otherwise a GET
    request is issued.

    Args:
        path (`str`):
            URL path joined onto `DJ_BASE_URL`.
        params (`Optional[Dict]`):
            Query parameters of the request.
        json (`Optional[Dict]`):
            JSON body of the request; its presence selects POST.

    Returns:
        The response text parsed as JSON.
    """
    endpoint = urljoin(DJ_BASE_URL, path)

    if json is None:
        reply = requests.get(endpoint, params=params)
    else:
        reply = requests.post(endpoint, params=params, json=json)

    return jloads(reply.text)


def init_config(dataset_path: str, op_name: str, **op_args):
    """
    Build a Data-Juicer config that runs the single operator `op_name`.

    The config template is loaded, filled with the dataset path, a
    timestamped export path and the operator, and then sent to the
    `get_init_configs` endpoint of the API service.

    Args:
        dataset_path (`str`):
            The input dataset path.
        op_name: name of the operator.
        op_args: arguments of the operator.
    """
    with open(DJ_CONFIG_TEMPLATE) as template_file:
        dj_config = yaml.safe_load(template_file)

    # Each call exports under its own timestamped sub-directory.
    run_stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    export_path = os.path.join(DJ_OUTPUT, run_stamp, 'processed_data.jsonl')

    dj_config['dataset_path'] = dataset_path
    dj_config['export_path'] = export_path
    dj_config['process'].append({op_name: op_args})

    url_path = '/data_juicer/config/get_init_configs'
    try:
        res = call_data_juicer_api(url_path, params={'cfg': jdumps(dj_config)})
    except Exception as e:
        raise RuntimeError(
            f'An unexpected error occurred in calling {url_path}:\n{e}')
    return res['result']


def execute_analyzer(dj_config: dict):
    """
    Execute data-juicer analyzer.

    Args:
        dj_config: configs of data-juicer

    Returns:
        The `export_path` entry of `dj_config`, returned once the service
        reports a 'success' status.

    Raises:
        RuntimeError: if the request fails or the response status is not
            'success'.
    """
    logger.chat(Msg(name='system', content='Analyzing data...', role='system'))
    url_path = '/data_juicer/core/Analyzer/run'
    try:
        res = call_data_juicer_api(url_path,
                                   params={'skip_return': True},
                                   json={'cfg': jdumps(dj_config)})
        print(res)
        # Explicit check rather than `assert`: assertions are removed under
        # `python -O`, which would let a failed run look successful.
        if res['status'] != 'success':
            raise ValueError(f'unsuccessful response: {res}')
        return dj_config['export_path']
    except Exception as e:
        error_msg = f'An unexpected error occurred in calling {url_path}:\n{e}'
        raise RuntimeError(error_msg) from e


def show_analyzed_results(analyzed_result_path: str,
                          require_min=True,
                          require_max=True):
    """
    Display an analysis histogram and ask the user for filter thresholds.

    Args:
        analyzed_result_path (`str`):
            The analyzed result path.
        require_min: whether to prompt for the minimum threshold.
        require_max: whether to prompt for the maximum threshold.

    Returns:
        A `(min_threshold, max_threshold)` tuple; a threshold that was not
        requested is left at 0.

    Raises:
        RuntimeError: if no `*hist.png` file is found in the result
            directory.
    """
    # A file path is mapped to the `analysis` directory located next to it.
    if os.path.isfile(analyzed_result_path):
        parent_dir = os.path.dirname(analyzed_result_path)
        analyzed_result_path = os.path.join(parent_dir, 'analysis')

    # Pick the histogram with the largest `os.path.getctime` value.
    candidates = glob.glob(os.path.join(analyzed_result_path, '*hist.png'))
    hist_file = max(candidates, key=os.path.getctime, default=None)

    if hist_file is None:
        error_msg = f'Error in showing analyzed result: {analyzed_result_path}'
        raise RuntimeError(error_msg)

    img = Image.open(hist_file)
    img.show()

    min_threshold = 0
    max_threshold = 0
    if require_min:
        answer = input('Based on above analyzed results, '
                       'enter the minimum threshold value for filter: ')
        min_threshold = float(answer)
    if require_max:
        answer = input('Based on above analyzed results, '
                       'enter the maximum threshold value for filter: ')
        max_threshold = float(answer)
    return min_threshold, max_threshold


def execute_config(dj_config: Dict):
    """
    Execute data-juicer data process.

    Args:
        dj_config: configs of data-juicer

    Returns:
        The `export_path` entry of `dj_config`, returned once the service
        reports a 'success' status.

    Raises:
        RuntimeError: if the request fails or the response status is not
            'success'.
    """
    logger.chat(Msg(name='system', content='Processing data...',
                    role='system'))
    url_path = '/data_juicer/core/Executor/run'
    try:
        res = call_data_juicer_api(url_path,
                                   params={'skip_return': True},
                                   json={'cfg': jdumps(dj_config)})
        print(res)
        # Explicit check rather than `assert`: assertions are removed under
        # `python -O`, which would let a failed run look successful.
        if res['status'] != 'success':
            raise ValueError(f'unsuccessful response: {res}')
        return dj_config['export_path']
    except Exception as e:
        error_msg = f'An unexpected error occurred in calling {url_path}:\n{e}'
        raise RuntimeError(error_msg) from e


def setup_service_toolkit(module_name, service_toolkit=None):
    """
    Register a module's `execute_*` callables with a service toolkit.

    Args:
        module_name: name of the module to import and scan.
        service_toolkit: toolkit to add the callables to; a new
            `ServiceToolkit` is created when it is None.

    Returns:
        The toolkit that the callables were added to.
    """
    toolkit = ServiceToolkit() if service_toolkit is None else service_toolkit
    module = importlib.import_module(module_name)

    prefixed = (attr for attr in dir(module) if attr.startswith('execute_'))
    for attr in prefixed:
        candidate = getattr(module, attr)
        if not callable(candidate):
            continue
        toolkit.add(candidate)
    return toolkit
Loading
Loading