Service/match api (#431)
* temp

* match api call

* match api call

* pre-commit

* decouple API args & add type hints

* agentscope demo

* update demos

* update pre-commit

* add yaml

* update notebooks

* add py

* refine

---------

Co-authored-by: null <[email protected]>
Co-authored-by: gece.gc <[email protected]>
3 people authored Oct 9, 2024
1 parent 3315358 commit eca52dc
Showing 13 changed files with 1,647 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -38,7 +38,7 @@ exclude: |
(?x)^(
docs/.*|
tests/.*|
-demos/.*|
+demos/(?!api_service/).*|
tools/mm_eval/inception_metrics/.*|
thirdparty/easy_animate/.*|
.*\.md
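The updated exclude pattern uses a negative lookahead so the new demos/api_service/ files are checked by the hooks while the rest of demos/ stays excluded. A minimal sketch of the matching behaviour (the file paths are made up for illustration):

import re

# Mirrors the updated exclude entry: anything under demos/ still matches (and is
# skipped by pre-commit) unless it lives under demos/api_service/.
pattern = re.compile(r'demos/(?!api_service/).*')

print(bool(pattern.match('demos/some_other_demo/app.py')))   # True  -> still excluded
print(bool(pattern.match('demos/api_service/utils.py')))     # False -> now linted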
8 changes: 4 additions & 4 deletions data_juicer/config/config.py
@@ -357,7 +357,7 @@ def update_ds_cache_dir_and_related_vars(new_ds_cache_path):
config.DEFAULT_EXTRACTED_DATASETS_PATH)


-def init_setup_from_cfg(cfg):
+def init_setup_from_cfg(cfg: Namespace):
"""
Do some extra setup tasks after parsing config file or command line.
@@ -628,7 +628,7 @@ def namespace_to_arg_list(namespace, prefix='', includes=None, excludes=None):
return arg_list


-def config_backup(cfg):
+def config_backup(cfg: Namespace):
cfg_path = cfg.config[0].absolute
work_dir = cfg.work_dir
target_path = os.path.join(work_dir, os.path.basename(cfg_path))
@@ -638,7 +638,7 @@ def config_backup(cfg):
shutil.copyfile(cfg_path, target_path)


-def display_config(cfg):
+def display_config(cfg: Namespace):
import pprint

from tabulate import tabulate
@@ -790,7 +790,7 @@ def prepare_side_configs(ori_config: Union[str, Namespace, Dict]):
return config


-def get_init_configs(cfg: Namespace):
+def get_init_configs(cfg: Union[Namespace, Dict]):
"""
set init configs of datajucer for cfg
"""
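The get_init_configs signature is widened from Namespace to Union[Namespace, Dict], which matters for the API service: a config arriving as JSON is decoded into a plain dict. A hedged sketch, assuming get_init_configs is imported from data_juicer.config.config as defined in this diff (the payload values are illustrative):

from data_juicer.config.config import get_init_configs

# Config as it would arrive from an HTTP caller: a plain dict, not a Namespace.
raw_cfg = {
    'project_name': 'dj_agent',
    'dataset_path': './demo_data.jsonl',               # hypothetical input file
    'export_path': './outputs/processed_data.jsonl',   # hypothetical output path
    'process': [],
}
full_cfg = get_init_configs(raw_cfg)  # now accepted without wrapping in a Namespace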
7 changes: 5 additions & 2 deletions data_juicer/core/analyzer.py
@@ -70,12 +70,14 @@ def __init__(self, cfg: Optional[Namespace] = None):

def run(self,
load_data_np: Optional[PositiveInt] = None,
-            skip_export: bool = False):
+            skip_export: bool = False,
+            skip_return: bool = False):
"""
Running the dataset analysis pipeline.
:param load_data_np: number of workers when loading the dataset.
:param skip_export: whether export the results into disk
+:param skip_return: skip return for API called.
:return: analyzed dataset.
"""
# 1. format data
@@ -134,4 +136,5 @@ def run(self,
)
column_wise_analysis.analyze(skip_export=skip_export)

-        return dataset
+        if not skip_return:
+            return dataset
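The new skip_return flag lets an API caller run the analysis for its side effects only: the stats are exported to disk and the (possibly large) dataset object is not sent back. A minimal sketch, assuming Analyzer is exported from data_juicer.core and cfg has been prepared elsewhere (for example via get_init_configs):

from data_juicer.core import Analyzer

analyzer = Analyzer(cfg)        # cfg: a fully initialized config Namespace
analyzer.run(skip_return=True)  # analyze and export results, but return nothing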
8 changes: 6 additions & 2 deletions data_juicer/core/executor.py
@@ -138,11 +138,14 @@ def sample_data(self,
else:
raise ValueError(f'Unsupported sample_algo: {sample_algo}')

-    def run(self, load_data_np: Optional[PositiveInt] = None):
+    def run(self,
+            load_data_np: Optional[PositiveInt] = None,
+            skip_return=False):
"""
Running the dataset process pipeline.
:param load_data_np: number of workers when loading the dataset.
+:param skip_return: skip return for API called.
:return: processed dataset.
"""
# 1. format data
@@ -179,4 +182,5 @@ def run(self, load_data_np: Optional[PositiveInt] = None):
from data_juicer.utils.compress import compress
compress(dataset)

-        return dataset
+        if not skip_return:
+            return dataset
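Executor.run gains the same skip_return flag. The API service demo below (demos/api_service/utils.py) drives it over HTTP and reads results from the exported files, so only a small status payload travels back. A condensed sketch of that call, assuming the demo's default base URL http://localhost:8000:

import json
import requests

resp = requests.post(
    'http://localhost:8000/data_juicer/core/Executor/run',
    params={'skip_return': True},
    json={'cfg': json.dumps(dj_config)},  # dj_config: dict built from the YAML template below
)
print(resp.json()['status'])              # 'success' when the pipeline finished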
13 changes: 13 additions & 0 deletions demos/api_service/configs/dj_config_template.yaml
@@ -0,0 +1,13 @@
# data-juicer config template

# global parameters
project_name: 'dj_agent'
dataset_path: '' # path to your dataset directory or file, specified in the agent
np: 4 # number of subprocess to process your dataset

export_path: '' # path to the output path, specified in the agent
export_original_dataset: true

# process schedule
# a list of several process operators with their arguments, specified in the agent
process: []
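The placeholders in this template are filled programmatically before each run; the demo's init_config helper in demos/api_service/utils.py (shown further down) does exactly this. A condensed sketch, with illustrative paths and an example operator:

import yaml

with open('./configs/dj_config_template.yaml') as fin:
    dj_config = yaml.safe_load(fin)

dj_config['dataset_path'] = './demo_data.jsonl'               # hypothetical dataset
dj_config['export_path'] = './outputs/processed_data.jsonl'   # hypothetical output
dj_config['process'].append(
    {'language_id_score_filter': {'min_score': 0.8}})         # example operator entry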
21 changes: 21 additions & 0 deletions demos/api_service/configs/model_configs.json
@@ -0,0 +1,21 @@
[
{
"config_name": "gpt-4",
"model_type": "openai-chat",
"model_name": "gpt-4",
"api_key": "your API key",
"organization": "your organization name",
"generate_args": {
"temperature": 0.5
}
},
{
"config_name": "dashscope_chat-qwen-max",
"model_type": "dashscope_chat",
"model_name": "qwen-max",
"api_key": "your API key",
"generate_args": {
"temperature": 0.0
}
}
]
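These entries follow AgentScope's model-config schema (one OpenAI chat model and one DashScope chat model). A minimal sketch of how the demo notebooks presumably load them, assuming the standard agentscope.init entry point:

import agentscope

# Register both model configs; agents then refer to them by config_name,
# e.g. 'gpt-4' or 'dashscope_chat-qwen-max'.
agentscope.init(model_configs='./configs/model_configs.json')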
603 changes: 603 additions & 0 deletions demos/api_service/react_data_filter_process.ipynb

Large diffs are not rendered by default.

473 changes: 473 additions & 0 deletions demos/api_service/react_data_mapper_process.ipynb

Large diffs are not rendered by default.

150 changes: 150 additions & 0 deletions demos/api_service/utils.py
@@ -0,0 +1,150 @@
import datetime
import glob
import importlib
import os
from json import dumps as jdumps
from json import loads as jloads
from typing import Dict, Optional
from urllib.parse import urljoin

import requests
import yaml
from agentscope.message import Msg
from agentscope.service import ServiceToolkit
from loguru import logger
from PIL import Image

DJ_BASE_URL = 'http://localhost:8000'
DJ_CONFIG_TEMPLATE = './configs/dj_config_template.yaml'
DJ_OUTPUT = 'outputs'


def call_data_juicer_api(path: str,
params: Optional[Dict] = None,
json: Optional[Dict] = None):
url = urljoin(DJ_BASE_URL, path)

if json is not None:
response = requests.post(url, params=params, json=json)
else:
response = requests.get(url, params=params)

return jloads(response.text)


def init_config(dataset_path: str, op_name: str, **op_args):
"""
Initialize Data-Juicer config with operator `op_name`.
Args:
dataset_path (`str`):
The input dataset path.
op_name: name of the operator.
op_args: arguments of the operator.
"""
with open(DJ_CONFIG_TEMPLATE) as fin:
dj_config = yaml.safe_load(fin)
dj_config['dataset_path'] = dataset_path
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
dj_config['export_path'] = os.path.join(DJ_OUTPUT, timestamp,
'processed_data.jsonl')
dj_config['process'].append({op_name: op_args})
url_path = '/data_juicer/config/get_init_configs'
try:
res = call_data_juicer_api(url_path, params={'cfg': jdumps(dj_config)})
except Exception as e:
error_msg = f'An unexpected error occurred in calling {url_path}:\n{e}'
raise RuntimeError(error_msg)
return res['result']


def execute_analyzer(dj_config: dict):
"""
Execute data-juicer analyzer.
Args:
dj_config: configs of data-juicer
"""
logger.chat(Msg(name='system', content='Analyzing data...', role='system'))
url_path = '/data_juicer/core/Analyzer/run'
try:
res = call_data_juicer_api(url_path,
params={'skip_return': True},
json={'cfg': jdumps(dj_config)})
print(res)
assert res['status'] == 'success'
return dj_config['export_path']
except Exception as e:
error_msg = f'An unexpected error occurred in calling {url_path}:\n{e}'
raise RuntimeError(error_msg)


def show_analyzed_results(analyzed_result_path: str,
require_min=True,
require_max=True):
"""
Show the analyzed results to the users and get the specified thresholds.
Args:
analyzed_result_path (`str`):
The analyzed result path.
"""

if os.path.isfile(analyzed_result_path):
analyzed_result_path = os.path.join(
os.path.dirname(analyzed_result_path), 'analysis')

hist_file = max(glob.glob(os.path.join(analyzed_result_path, '*hist.png')),
key=os.path.getctime,
default=None)

if hist_file is not None:
img = Image.open(hist_file)
img.show()
min_threshold, max_threshold = 0, 0
if require_min:
min_threshold = float(
input('Based on above analyzed results, '
'enter the minimum threshold value for filter: '))
if require_max:
max_threshold = float(
input('Based on above analyzed results, '
'enter the maximum threshold value for filter: '))
return min_threshold, max_threshold
else:
error_msg = f'Error in showing analyzed result: {analyzed_result_path}'
raise RuntimeError(error_msg)


def execute_config(dj_config: Dict):
"""
Execute data-juicer data process.
Args:
dj_config: configs of data-juicer
"""
logger.chat(Msg(name='system', content='Processing data...',
role='system'))
url_path = '/data_juicer/core/Executor/run'
try:
res = call_data_juicer_api(url_path,
params={'skip_return': True},
json={'cfg': jdumps(dj_config)})
print(res)
assert res['status'] == 'success'
return dj_config['export_path']
except Exception as e:
error_msg = f'An unexpected error occurred in calling {url_path}:\n{e}'
raise RuntimeError(error_msg)


def setup_service_toolkit(module_name, service_toolkit=None):
if service_toolkit is None:
service_toolkit = ServiceToolkit()
module = importlib.import_module(module_name)
for name in dir(module):
if name.startswith('execute_'):
obj = getattr(module, name)
if callable(obj):
service_toolkit.add(obj)
return service_toolkit
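A hedged sketch of how the two notebooks likely tie everything together: register the execute_* helpers above as tools and hand them to a ReAct-style agent. ReActAgent and its constructor arguments are AgentScope APIs assumed here, not part of this diff:

import agentscope
from agentscope.agents import ReActAgent

from utils import setup_service_toolkit

agentscope.init(model_configs='./configs/model_configs.json')

# Collect execute_analyzer and execute_config from this module as callable tools.
service_toolkit = setup_service_toolkit('utils')

agent = ReActAgent(name='dj_agent',                            # hypothetical agent name
                   model_config_name='dashscope_chat-qwen-max',
                   service_toolkit=service_toolkit)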
(The four remaining changed files were not rendered in this view.)
