minhash dedup causes local machine to hang. #222

Open
staticpunch opened this issue Jun 17, 2024 · 4 comments
@staticpunch

Currently my goal is to deduplicate ~750GB of text (around 750 jsonl files, each ~1GB). My machine has 1TB of RAM and 256 CPU cores. I used the following config to run MinHash deduplication, but my machine hung for more than 24 hours. I couldn't even Ctrl+C the process and had to reboot the server.

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import (
    MinhashConfig,
    MinhashDedupBuckets,
    MinhashDedupCluster,
    MinhashDedupFilter,
)
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.tokens import TokensCounter
from datatrove.pipeline.writers.jsonl import JsonlWriter

# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(use_64bit_hashes=True)  # better precision -> fewer false positives (collisions)
MINHASH_BASE_PATH = "minhash/output"
LOGS_FOLDER = "minhash/log"
LOCAL_LOGS_FOLDER = "my_local_folder_for_slurm_logs/"

TOTAL_TASKS = 256
NUM_WORKERS = 128

# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(
    data_folder="data",
    recursive=True,  # I have some subfolders
)

# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = LocalPipelineExecutor(
    pipeline=[
        INPUT_READER,
        MinhashDedupSignature(
            output_folder=f"{MINHASH_BASE_PATH}/signatures", 
            config=minhash_config
        ),
    ],
    tasks=TOTAL_TASKS,
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/signatures",
) 

# stage 2 finds matches between signatures in each bucket
stage2 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{MINHASH_BASE_PATH}/signatures",
            output_folder=f"{MINHASH_BASE_PATH}/buckets",
            config=minhash_config,
        ),
    ],
    tasks=minhash_config.num_buckets,
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/buckets",
    depends=stage1,
)

# stage 3 creates clusters of duplicates using the results from all buckets
stage3 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupCluster(
            input_folder=f"{MINHASH_BASE_PATH}/buckets",
            output_folder=f"{MINHASH_BASE_PATH}/remove_ids",
            config=minhash_config,
        ),
    ],
    tasks=1,
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/clusters",
    depends=stage2,
)

stage4 = LocalPipelineExecutor(
    # job_name="mh4",
    pipeline=[
        INPUT_READER,
        TokensCounter("Llama-3-8B/tokenizer.json"),  # nice way to see how many tokens we had before and after deduplication
        MinhashDedupFilter(
            input_folder=f"{MINHASH_BASE_PATH}/remove_ids",
            exclusion_writer=JsonlWriter(f"{MINHASH_BASE_PATH}/removed"),
        ),
        JsonlWriter(output_folder=f"{MINHASH_BASE_PATH}/deduplicated_output"),
    ],
    tasks=TOTAL_TASKS,
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/filter",
    depends=stage3,
)

stage4.run()

I did limit the workers and tasks to fewer than my number of CPU cores, so I'm pretty clueless about what could be causing my server to hang. Please suggest a better config to run minhash smoothly.

@guipenedo
Collaborator

What stage is hanging?

@staticpunch
Author

It hung at stage 2. FYI, it took 6 hours to complete stage 1.

@guipenedo
Collaborator

I am not entirely sure about the version we currently have on PyPI, but you should be able to change tasks=minhash_config.num_buckets to a multiple of it, such as tasks=minhash_config.num_buckets * 50, to further parallelize this step.
If that doesn't work or you get an error, I recommend using the version currently on our main branch (probably a good idea to rerun from the first step in that case).
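A minimal sketch of that change, reusing the variables from your config above (only the tasks value differs):

# stage 2 with more tasks than buckets, so each bucket is split across several workers
stage2 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{MINHASH_BASE_PATH}/signatures",
            output_folder=f"{MINHASH_BASE_PATH}/buckets",
            config=minhash_config,
        ),
    ],
    tasks=minhash_config.num_buckets * 50,  # was: minhash_config.num_buckets
    workers=NUM_WORKERS,
    logging_dir=f"{LOGS_FOLDER}/buckets",
    depends=stage1,
)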

@staticpunch
Author

Thank you, I'll try that.
