Add more unittest #304

Merged
merged 30 commits on Jun 26, 2024
Changes from 28 commits
65 changes: 65 additions & 0 deletions .github/workflows/docker/docker-compose.yml
@@ -0,0 +1,65 @@
version: '3'
services:
  ray-head:
    image: data-juicer-unittest:0.2.0
    pull_policy: never
    command: ray start --head --dashboard-host 0.0.0.0 --include-dashboard true --block
    environment:
      - HF_HOME=/data/huggingface
      - HF_ENDPOINT=https://hf-mirror.com
      - TORCH_HOME=/data/torch
      - NLTK_DATA=/data/nltk
      - DATA_JUICER_CACHE_HOME=/data/dj
      - RAY_ADDRESS=auto
    working_dir: /workspace
    networks:
      - ray-network
    volumes:
      - huggingface_cache:/data
      - ../../..:/workspace
    ports:
      - "6379:6379"
      - "8265:8265"
    shm_size: "64G"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ['0', '1']
              capabilities: [gpu]

  ray-worker:
    image: data-juicer-unittest:0.2.0
    pull_policy: never
    command: ray start --address=ray-head:6379 --block
    environment:
      - HF_HOME=/data/huggingface
      - HF_ENDPOINT=https://hf-mirror.com
      - TORCH_HOME=/data/torch
      - NLTK_DATA=/data/nltk
      - DATA_JUICER_CACHE_HOME=/data/dj
    working_dir: /workspace
    volumes:
      - huggingface_cache:/data
      - ../../..:/workspace
    depends_on:
      - ray-head
    networks:
      - ray-network
    shm_size: "64G"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ['2', '3']
              capabilities: [gpu]

networks:
  ray-network:
    driver: bridge

volumes:
  huggingface_cache:
    external: true
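
For reference, a minimal connectivity check against the cluster this compose file brings up — a sketch under the assumption that it runs inside the ray-head container (where `RAY_ADDRESS=auto` is set and the head node has already been started); it is not part of the PR:

```python
# Minimal sketch (assumption: executed inside the ray-head container started by
# this compose file, where RAY_ADDRESS=auto points at the running head node).
import ray

ray.init(address='auto')        # attach to the existing cluster, don't start a new one
print(ray.cluster_resources())  # with both services up, this should list the CPUs
                                # and the GPUs reserved for ray-head and ray-worker
ray.shutdown()
```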
57 changes: 0 additions & 57 deletions .github/workflows/unit-test.yml

This file was deleted.

60 changes: 60 additions & 0 deletions .github/workflows/unittest.yml
@@ -0,0 +1,60 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python

name: unittest

on:
  workflow_dispatch:
  pull_request:
  push:
    branches:
      - main

permissions:
  contents: read

jobs:
  unittest-single:
    runs-on: [self-hosted, linux]
    environment: Testing
    steps:
      - uses: actions/checkout@v3
        with:
          path: dj-${{ github.run_id }}

      - name: Setup docker compose
        working-directory: dj-${{ github.run_id }}/.github/workflows/docker
        run: |
          docker compose up -d

      - name: Install data-juicer
        working-directory: dj-${{ github.run_id }}/.github/workflows/docker
        run: |
          docker compose exec ray-head pip install -e .\[all\]
          docker compose exec ray-worker pip install -e .\[all\]

      - name: Clean dataset cache
        working-directory: dj-${{ github.run_id }}/.github/workflows/docker
        run: |
          docker compose exec ray-head rm -rf /data/huggingface/dataset

      - name: Run unittest standalone
        working-directory: dj-${{ github.run_id }}/.github/workflows/docker
        run: |
          docker compose exec ray-head python tests/run.py --tag standalone

      - name: Run unittest ray
        working-directory: dj-${{ github.run_id }}/.github/workflows/docker
        run: |
          docker compose exec ray-head python tests/run.py --tag ray

      - name: Remove docker compose
        working-directory: dj-${{ github.run_id }}/.github/workflows/docker
        if: always()
        run: |
          docker compose down --remove-orphans

      - name: Cleanup workspace
        if: always()
        run: |
          rm -rf dj-${{ github.run_id }}
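
Both test steps go through `tests/run.py` with a `--tag` flag to pick either the standalone or the ray suite. For context, a minimal sketch of how such tag-based selection could be done with plain `unittest` — the helper below is hypothetical and illustrative only, not the actual `tests/run.py`:

```python
# Hypothetical tag-based test selection (illustrative sketch, not the real tests/run.py).
# Assumption: each TestCase carries a `tag` attribute such as 'standalone' or 'ray'.
import argparse
import unittest


def iter_cases(suite):
    """Recursively flatten a TestSuite into individual test cases."""
    for item in suite:
        if isinstance(item, unittest.TestSuite):
            yield from iter_cases(item)
        else:
            yield item


def load_tagged_suite(tag, start_dir='tests'):
    """Discover tests under start_dir and keep only the cases whose tag matches."""
    discovered = unittest.defaultTestLoader.discover(start_dir)
    selected = unittest.TestSuite()
    for case in iter_cases(discovered):
        if getattr(case, 'tag', 'standalone') == tag:
            selected.addTest(case)
    return selected


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--tag', default='standalone', choices=['standalone', 'ray'])
    args = parser.parse_args()
    unittest.TextTestRunner(verbosity=2).run(load_tagged_suite(args.tag))
```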
122 changes: 60 additions & 62 deletions data_juicer/core/ray_executor.py
@@ -102,6 +102,65 @@ def get_num_gpus(self, op, op_proc):
        proc_per_gpu = op_proc / cuda_device_count()
        return 1.0 / proc_per_gpu

    def run_op(self, op, op_cfg, dataset):
        op_name, op_args = list(op_cfg.items())[0]
        op_cls = OPERATORS.modules[op_name]
        op_proc = calculate_np(self.cfg.np, op, op_name)
        num_gpus = self.get_num_gpus(op, op_proc)
        use_actor = op.use_actor() or num_gpus
        try:
            if isinstance(op, Mapper):
                if op.is_batched_op():
                    if use_actor:
                        dataset = dataset.map_batches(
                            op_cls,
                            compute=ActorPoolStrategy(),
                            concurrency=op_proc,
                            fn_constructor_kwargs=op_args,
                            batch_format='pyarrow',
                            num_gpus=num_gpus,
                            batch_size=1)
                        # The batch size here is same as in data.py
                    else:
                        dataset = dataset.map_batches(partial(
                            ray_batch_mapper_wrapper, fn=op.process),
                            batch_format='pyarrow',
                            num_gpus=num_gpus,
                            batch_size=1)
                        # The batch size here is same as in data.py
                else:
                    if use_actor:
                        dataset = dataset.map(op_cls,
                                              compute=ActorPoolStrategy(),
                                              concurrency=op_proc,
                                              fn_constructor_kwargs=op_args,
                                              num_gpus=num_gpus)
                    else:
                        dataset = dataset.map(op.process, num_gpus=num_gpus)

            elif isinstance(op, Filter):
                if use_actor:
                    dataset = dataset.map(op_cls,
                                          compute=ActorPoolStrategy(),
                                          concurrency=op_proc,
                                          fn_constructor_kwargs=op_args,
                                          num_gpus=num_gpus)
                else:
                    dataset = dataset.map(op.compute_stats, num_gpus=num_gpus)
                if op.stats_export_path is not None:
                    dataset.write_json(op.stats_export_path, force_ascii=False)
                dataset = dataset.filter(op.process)
            else:
                logger.error(
                    'Ray executor only support Filter and Mapper OPs for '
                    'now')
                raise NotImplementedError
        except:  # noqa: E722
            logger.error(f'An error occurred during Op [{op_name}].')
            import traceback
            traceback.print_exc()
            exit(1)
        # run() assigns the result of this call back to `dataset`, so the
        # transformed dataset must be returned here.
        return dataset

    def run(self, load_data_np=None):
        """
        Running the dataset process pipeline.
@@ -140,68 +199,7 @@ def process_batch_arrow(table: pa.Table) -> pa.Table:
        logger.info('Processing data...')
        tstart = time.time()
        for op_cfg, op in zip(self.process_list, self.ops):
            op_name, op_args = list(op_cfg.items())[0]
            op_cls = OPERATORS.modules[op_name]
            op_proc = calculate_np(self.cfg.np, op, op_name)
            num_gpus = self.get_num_gpus(op, op_proc)
            use_actor = op.use_actor() or num_gpus
            try:
                if isinstance(op, Mapper):
                    if op.is_batched_op():
                        if use_actor:
                            dataset = dataset.map_batches(
                                op_cls,
                                compute=ActorPoolStrategy(),
                                concurrency=op_proc,
                                fn_constructor_kwargs=op_args,
                                batch_format='pyarrow',
                                num_gpus=num_gpus,
                                batch_size=1)
                            # The batch size here is same as in data.py
                        else:
                            dataset = dataset.map_batches(
                                partial(ray_batch_mapper_wrapper,
                                        fn=op.process),
                                batch_format='pyarrow',
                                num_gpus=num_gpus,
                                batch_size=1)
                            # The batch size here is same as in data.py
                    else:
                        if use_actor:
                            dataset = dataset.map(
                                op_cls,
                                compute=ActorPoolStrategy(),
                                concurrency=op_proc,
                                fn_constructor_kwargs=op_args,
                                num_gpus=num_gpus)
                        else:
                            dataset = dataset.map(op.process,
                                                  num_gpus=num_gpus)

                elif isinstance(op, Filter):
                    if use_actor:
                        dataset = dataset.map(op_cls,
                                              compute=ActorPoolStrategy(),
                                              concurrency=op_proc,
                                              fn_constructor_kwargs=op_args,
                                              num_gpus=num_gpus)
                    else:
                        dataset = dataset.map(op.compute_stats,
                                              num_gpus=num_gpus)
                    if op.stats_export_path is not None:
                        dataset.write_json(op.stats_export_path,
                                           force_ascii=False)
                    dataset = dataset.filter(op.process)
                else:
                    logger.error(
                        'Ray executor only support Filter and Mapper OPs for '
                        'now')
                    raise NotImplementedError
            except:  # noqa: E722
                logger.error(f'An error occurred during Op [{op_name}].')
                import traceback
                traceback.print_exc()
                exit(1)
            dataset = self.run_op(op, op_cfg, dataset)

        # 4. data export
        logger.info('Exporting dataset to disk...')
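
A short worked example of the fractional-GPU arithmetic that `run_op` gets from `get_num_gpus` — the numbers below are assumptions for illustration, not values from the PR:

```python
# Illustrative arithmetic only; op_proc and the device count are assumed values.
op_proc = 4                                # hypothetical process count for this op
cuda_devices = 2                           # hypothetical number of visible GPUs

proc_per_gpu = op_proc / cuda_devices      # 2.0 processes share each GPU
num_gpus = 1.0 / proc_per_gpu              # each replica asks Ray for 0.5 GPU
print(num_gpus)                            # 0.5 is passed as num_gpus to map/map_batches,
                                           # letting Ray pack two replicas onto one GPU
```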