Add more unittest #304

Merged (30 commits) on Jun 26, 2024
Changes from all commits
65 changes: 65 additions & 0 deletions .github/workflows/docker/docker-compose.yml
@@ -0,0 +1,65 @@
version: '3'
services:
ray-head:
image: data-juicer-unittest:0.2.0
pull_policy: never
command: ray start --head --dashboard-host 0.0.0.0 --include-dashboard true --block
environment:
- HF_HOME=/data/huggingface
- HF_ENDPOINT=https://hf-mirror.com
- TORCH_HOME=/data/torch
- NLTK_DATA=/data/nltk
- DATA_JUICER_CACHE_HOME=/data/dj
- RAY_ADDRESS=auto
working_dir: /workspace
networks:
- ray-network
volumes:
- huggingface_cache:/data
- ../../..:/workspace
ports:
- "6379:6379"
- "8265:8265"
shm_size: "64G"
deploy:
resources:
reservations:
devices:
- driver: nvidia
device_ids: ['0', '1']
capabilities: [gpu]

ray-worker:
image: data-juicer-unittest:0.2.0
pull_policy: never
command: ray start --address=ray-head:6379 --block
environment:
- HF_HOME=/data/huggingface
- HF_ENDPOINT=https://hf-mirror.com
- TORCH_HOME=/data/torch
- NLTK_DATA=/data/nltk
- DATA_JUICER_CACHE_HOME=/data/dj
working_dir: /workspace
volumes:
- huggingface_cache:/data
- ../../..:/workspace
depends_on:
- ray-head
networks:
- ray-network
shm_size: "64G"
deploy:
resources:
reservations:
devices:
- driver: nvidia
device_ids: ['2', '3']
capabilities: [gpu]

networks:
ray-network:
driver: bridge

volumes:
huggingface_cache:
external: true
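The compose file above starts a single-machine Ray cluster for the tests: `ray-head` exposes ports 6379/8265 and `ray-worker` joins it over the bridge network, with the repository mounted at /workspace, the shared `huggingface_cache` volume at /data, and two GPUs reserved per container. A quick way to confirm that both nodes and all four GPUs are visible is a small script run inside the head container; this is only a sketch based on the configuration above (the script name and expected numbers are assumptions, not part of the PR):

# Minimal sanity check, e.g. run as:
#   docker compose exec ray-head python check_cluster.py
# Assumes Ray is installed in the data-juicer-unittest image.
import ray

ray.init(address='auto')  # attach to the cluster started by `ray start --head`

alive_nodes = [n for n in ray.nodes() if n['Alive']]
print(f'alive nodes: {len(alive_nodes)}')                         # expect 2 (head + worker)
print(f"cluster GPUs: {ray.cluster_resources().get('GPU', 0)}")   # expect 4.0 with the device_ids above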
84 changes: 43 additions & 41 deletions .github/workflows/unit-test.yml
@@ -1,58 +1,60 @@
 # This workflow will install Python dependencies, run tests and lint with a single version of Python
 # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
 
-name: Unit Test
+name: unittest
 
-on: [push, pull_request, workflow_dispatch]
+on:
+  workflow_dispatch:
+  pull_request:
+  push:
+    branches:
+      - main
 
 permissions:
   contents: read
 
 jobs:
-  build:
-
-    runs-on: ubuntu-latest
-
+  unittest-single:
+    runs-on: [self-hosted, linux]
+    environment: Testing
     steps:
     - uses: actions/checkout@v3
-    - name: Check disk space
-      run: |
-        df -h
-    - name: Set up Python 3.8
-      uses: actions/setup-python@v3
-      with:
-        python-version: "3.8"
-    - name: Check disk space
-      run: |
-        df -h
-    - name: Install dependencies
-      run: |
-        sudo apt-get install ffmpeg
-        python -m pip install --upgrade pip
-        pip install -v -e .[all]
-        pip install -v -e .[sandbox]
-    - name: Increase swapfile
-      run: |
-        df -h
-        free -h
-        sudo swapoff -a
-        sudo fallocate -l 12G /mnt/swapfile
-        sudo chmod 600 /mnt/swapfile
-        sudo mkswap /mnt/swapfile
-        sudo swapon /mnt/swapfile
-        sudo swapon --show
-    - name: Clean data-juicer assets and models after cached
-      uses: webiny/[email protected]
-      with:
-        run: rm -rf ~/.cache/data_juicer
-    - name: Cache data-juicer assets and models
-      uses: actions/cache@v3
-      with:
-        path: ~/.cache/data_juicer
-        key: dj-assets-models
-    - name: Check disk space
-      run: |
-        df -h
-    - name: Run the test
-      run: |
-        python tests/run.py
+      with:
+        path: dj-${{ github.run_id }}
+
+    - name: Setup docker compose
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose up -d
+
+    - name: Install data-juicer
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head pip install -e .\[all\]
+        docker compose exec ray-worker pip install -e .\[all\]
+
+    - name: Clean dataset cache
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head rm -rf /data/huggingface/dataset
+
+    - name: Run unittest standalone
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head python tests/run.py --tag standalone
+
+    - name: Run unittest ray
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      run: |
+        docker compose exec ray-head python tests/run.py --tag ray
+
+    - name: Remove docker compose
+      working-directory: dj-${{ github.run_id }}/.github/workflows/docker
+      if: always()
+      run: |
+        docker compose down --remove-orphans
+
+    - name: Cleanup workspace
+      if: always()
+      run: |
+        rm -rf dj-${{ github.run_id }}
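The new workflow delegates test execution to `tests/run.py --tag standalone|ray` inside the head container. The tag-based selection presumably keys off the `__test_tags__` attribute that the `TEST_TAG` decorator (added in `data_juicer/utils/unittest_utils.py` below) attaches to test methods. The runner itself is not shown in this diff; a minimal sketch of how such filtering could be wired with the standard `unittest` loader might look like the following (an assumption for illustration, not the actual `tests/run.py`):

# Sketch of tag-based test selection: keep only test methods whose
# __test_tags__ include the requested tag (default: 'standalone').
import argparse
import unittest


def filter_by_tag(suite, tag):
    filtered = unittest.TestSuite()
    for test in suite:
        if isinstance(test, unittest.TestSuite):
            filtered.addTest(filter_by_tag(test, tag))
        else:
            method = getattr(test, test._testMethodName)
            tags = getattr(method, '__test_tags__', ('standalone',))
            if tag in tags:
                test.current_tag = tag  # assumed: consumed by DataJuicerTestCaseBase helpers
                filtered.addTest(test)
    return filtered


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--tag', default='standalone')
    args = parser.parse_args()
    suite = unittest.defaultTestLoader.discover('tests')
    unittest.TextTestRunner().run(filter_by_tag(suite, args.tag))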
122 changes: 60 additions & 62 deletions data_juicer/core/ray_executor.py
@@ -102,6 +102,65 @@ def get_num_gpus(self, op, op_proc):
proc_per_gpu = op_proc / cuda_device_count()
return 1.0 / proc_per_gpu

def run_op(self, op, op_cfg, dataset):
op_name, op_args = list(op_cfg.items())[0]
op_cls = OPERATORS.modules[op_name]
op_proc = calculate_np(self.cfg.np, op, op_name)
num_gpus = self.get_num_gpus(op, op_proc)
use_actor = op.use_actor() or num_gpus
try:
if isinstance(op, Mapper):
if op.is_batched_op():
if use_actor:
dataset = dataset.map_batches(
op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
batch_format='pyarrow',
num_gpus=num_gpus,
batch_size=1)
                        # The batch size here is the same as in data.py
else:
dataset = dataset.map_batches(partial(
ray_batch_mapper_wrapper, fn=op.process),
batch_format='pyarrow',
num_gpus=num_gpus,
batch_size=1)
                        # The batch size here is the same as in data.py
else:
if use_actor:
dataset = dataset.map(op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
num_gpus=num_gpus)
else:
dataset = dataset.map(op.process, num_gpus=num_gpus)

elif isinstance(op, Filter):
if use_actor:
dataset = dataset.map(op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
num_gpus=num_gpus)
else:
dataset = dataset.map(op.compute_stats, num_gpus=num_gpus)
if op.stats_export_path is not None:
dataset.write_json(op.stats_export_path, force_ascii=False)
dataset = dataset.filter(op.process)
else:
logger.error(
                    'Ray executor only supports Filter and Mapper OPs for '
'now')
raise NotImplementedError
        except:  # noqa: E722
            logger.error(f'An error occurred during Op [{op_name}].')
            import traceback
            traceback.print_exc()
            exit(1)
        return dataset

def run(self, load_data_np=None):
"""
Running the dataset process pipeline.
@@ -140,68 +199,7 @@ def process_batch_arrow(table: pa.Table) -> pa.Table:
logger.info('Processing data...')
tstart = time.time()
for op_cfg, op in zip(self.process_list, self.ops):
op_name, op_args = list(op_cfg.items())[0]
op_cls = OPERATORS.modules[op_name]
op_proc = calculate_np(self.cfg.np, op, op_name)
num_gpus = self.get_num_gpus(op, op_proc)
use_actor = op.use_actor() or num_gpus
try:
if isinstance(op, Mapper):
if op.is_batched_op():
if use_actor:
dataset = dataset.map_batches(
op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
batch_format='pyarrow',
num_gpus=num_gpus,
batch_size=1)
# The batch size here is same as in data.py
else:
dataset = dataset.map_batches(
partial(ray_batch_mapper_wrapper,
fn=op.process),
batch_format='pyarrow',
num_gpus=num_gpus,
batch_size=1)
# The batch size here is same as in data.py
else:
if use_actor:
dataset = dataset.map(
op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
num_gpus=num_gpus)
else:
dataset = dataset.map(op.process,
num_gpus=num_gpus)

elif isinstance(op, Filter):
if use_actor:
dataset = dataset.map(op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
num_gpus=num_gpus)
else:
dataset = dataset.map(op.compute_stats,
num_gpus=num_gpus)
if op.stats_export_path is not None:
dataset.write_json(op.stats_export_path,
force_ascii=False)
dataset = dataset.filter(op.process)
else:
logger.error(
'Ray executor only support Filter and Mapper OPs for '
'now')
raise NotImplementedError
except: # noqa: E722
logger.error(f'An error occurred during Op [{op_name}].')
import traceback
traceback.print_exc()
exit(1)
dataset = self.run_op(op, op_cfg, dataset)

# 4. data export
logger.info('Exporting dataset to disk...')
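This refactor moves the per-op dispatch out of `run()` into `run_op()`, which returns the transformed dataset so the processing loop stays a one-liner. The GPU budgeting shown at the top of the hunk gives each of the `op_proc` worker processes an equal share of the visible devices. A tiny worked example of just that arithmetic (a sketch; the real `get_num_gpus` also receives the op and the stubbed `cuda_device_count` stands in for the data_juicer utility):

# Fractional-GPU request per Ray task/actor, mirroring the excerpt above.
def get_num_gpus(op_proc, cuda_device_count):
    proc_per_gpu = op_proc / cuda_device_count
    return 1.0 / proc_per_gpu


# 8 worker processes sharing 4 visible GPUs -> each worker asks Ray for
# 0.5 GPU, so Ray packs two workers per device.
print(get_num_gpus(op_proc=8, cuda_device_count=4))  # 0.5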
80 changes: 80 additions & 0 deletions data_juicer/utils/unittest_utils.py
@@ -2,11 +2,30 @@
import shutil
import unittest

import numpy
import pyarrow as pa
import ray.data as rd
from datasets import Dataset

from data_juicer.ops import Filter
from data_juicer.utils.constant import Fields
from data_juicer.utils.registry import Registry

SKIPPED_TESTS = Registry('SkippedTests')


def TEST_TAG(*tags):
"""Tags for test case.
Currently, `standalone`, `ray` are supported.
"""

def decorator(func):
setattr(func, '__test_tags__', tags)
return func

return decorator
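For illustration, a test method would opt into one or both executors roughly like this (class and method names are made up, not from the PR):

# Hypothetical usage of the TEST_TAG decorator defined above.
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG


class MyFilterTest(DataJuicerTestCaseBase):

    @TEST_TAG('standalone', 'ray')   # selected by both `--tag standalone` and `--tag ray`
    def test_basic(self):
        ...

    @TEST_TAG('standalone')          # standalone executor only
    def test_standalone_only(self):
        ...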


class DataJuicerTestCaseBase(unittest.TestCase):

@classmethod
@@ -32,3 +51,64 @@ def tearDownClass(cls, hf_model_name=None) -> None:
if os.path.exists(transformers.TRANSFORMERS_CACHE):
print('CLEAN all TRANSFORMERS_CACHE')
shutil.rmtree(transformers.TRANSFORMERS_CACHE)

def generate_dataset(self, data):
"""Generate dataset for a specific executor.

Args:
type (str, optional): "standalone" or "ray".
Defaults to "standalone".
"""
if self.current_tag.startswith('standalone'):
return Dataset.from_list(data)
elif self.current_tag.startswith('ray'):
dataset = rd.from_items(data)
if Fields.stats not in dataset.columns(fetch_if_missing=False):

def process_batch_arrow(table: pa.Table) -> pa.Table:
new_column_data = [{} for _ in range(len(table))]
                    new_table = table.append_column(Fields.stats,
                                                    [new_column_data])
                    return new_table

dataset = dataset.map_batches(process_batch_arrow,
batch_format='pyarrow')
return dataset
else:
raise ValueError('Unsupported type')

def run_single_op(self, dataset, op, column_names):
"""Run operator in the specific executor."""
if self.current_tag.startswith('standalone'):
if isinstance(op, Filter) and Fields.stats not in dataset.features:
dataset = dataset.add_column(name=Fields.stats,
column=[{}] * dataset.num_rows)
dataset = dataset.map(op.compute_stats)
dataset = dataset.filter(op.process)
dataset = dataset.select_columns(column_names=column_names)
return dataset.to_list()
elif self.current_tag.startswith('ray'):
dataset = dataset.map(op.compute_stats)
dataset = dataset.filter(op.process)
dataset = dataset.to_pandas().get(column_names)
if dataset is None:
return []
return dataset.to_dict(orient='records')
else:
raise ValueError('Unsupported type')

def assertDatasetEqual(self, first, second):

def convert_record(rec):
for key in rec.keys():
# Convert incomparable `list` to comparable `tuple`
if isinstance(rec[key], numpy.ndarray) or isinstance(
rec[key], list):
rec[key] = tuple(rec[key])
return rec

first = [convert_record(d) for d in first]
second = [convert_record(d) for d in second]
first = sorted(first, key=lambda x: tuple(sorted(x.items())))
second = sorted(second, key=lambda x: tuple(sorted(x.items())))
return self.assertEqual(first, second)
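Putting the pieces together, a Filter test written against these helpers might look like the sketch below. The operator, its parameters and the sample data are illustrative, and the sketch assumes the test runner sets `self.current_tag` before invoking the test, as the helpers above expect:

# Hedged sketch of a Filter test built on the new helpers; TextLengthFilter,
# min_len and the sample texts are illustrative choices, not from the PR.
from data_juicer.ops.filter.text_length_filter import TextLengthFilter
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG


class TextLengthFilterTest(DataJuicerTestCaseBase):

    @TEST_TAG('standalone', 'ray')
    def test_short_texts_are_dropped(self):
        data = [{'text': 'hi'}, {'text': 'a long enough sample text'}]
        expected = [{'text': 'a long enough sample text'}]

        op = TextLengthFilter(min_len=10)
        # Builds a HF Dataset or a Ray Dataset depending on self.current_tag.
        dataset = self.generate_dataset(data)
        result = self.run_single_op(dataset, op, column_names=['text'])
        self.assertDatasetEqual(result, expected)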