Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf: load data systems on rank 0 #4478

Merged
merged 11 commits into from
Dec 26, 2024
Merged

Perf: load data systems on rank 0 #4478

merged 11 commits into from
Dec 26, 2024

Conversation

caic99
Copy link
Member

@caic99 caic99 commented Dec 19, 2024

The current implementation loads data on each rank. This will stress the file system.
In this PR, only rank 0 will load data systems, and it will be broadcasted to each rank.
The data sampler initialized later will still use the exclusive seed of each rank.

Summary by CodeRabbit

  • New Features

    • Enhanced handling of distributed data loading for improved synchronization across processes.
    • Added broadcasting of the constructed dataset to ensure consistency in all processes.
  • Bug Fixes

    • Implemented safeguards to prevent incomplete data distribution by asserting the integrity of the dataset.

Copy link
Contributor

coderabbitai bot commented Dec 19, 2024

📝 Walkthrough
📝 Walkthrough

Walkthrough

The pull request modifies the DpLoaderSet class in the deepmd/pt/utils/dataloader.py file to improve distributed data loading. The changes focus on enhancing the initialization of the self.systems attribute by introducing a process rank-based conditional check. When the global rank is 0, the dataset is constructed using a multiprocessing pool, and the self.systems list is broadcast to all processes using dist.broadcast_object_list(). An assertion is added to ensure complete data distribution.

Changes

File Change Summary
deepmd/pt/utils/dataloader.py - Modified DpLoaderSet class initialization to handle distributed data loading
- Added conditional check for global process rank
- Implemented dist.broadcast_object_list() for synchronizing self.systems
- Added assertion to verify complete data distribution

Possibly related PRs

Suggested reviewers

  • njzjz
  • CaRoLZhangxy
  • wanghan-iapcm

Tip

CodeRabbit's docstrings feature is now available as part of our Early Access Program! Simply use the command @coderabbitai generate docstrings to have CodeRabbit automatically generate docstrings for your pull request. We would love to hear your feedback on Discord.


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
deepmd/pt/utils/dataloader.py (3)

96-97: Consider documenting the self.systems initialization more explicitly.

Here, you add a new typed attribute, but it would be helpful to have a docstring or an inline comment indicating that this list will either be populated with real datasets on rank 0 or with dummy placeholders on other ranks. This clarifies the rank-dependent data flow for future maintainers.


103-104: Explore building partial placeholders instead of a full list of None.

Currently, you allocate a “None” list for all systems on non-zero ranks. This is fine, but consider if there's an even lighter or more descriptive placeholder structure (e.g., a small object with essential metadata) to prevent confusion about what these positions represent during debugging. This can help future readers who might assume that "None" indicates an error rather than a deferred load.


105-107: Strengthen your broadcast verification.

The assertion only checks if the last dataset is non-None. Consider verifying that none of the items in self.systems remain None to ensure a complete broadcast. For example:
assert all(s is not None for s in self.systems)
This reduces the risk of partial broadcast issues going unnoticed.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0914e1 and 555c3f9.

📒 Files selected for processing (1)
  • deepmd/pt/utils/dataloader.py (1 hunks)
🔇 Additional comments (1)
deepmd/pt/utils/dataloader.py (1)

98-102: Evaluate large-scale broadcasting overhead.

When rank 0 constructs numerous (or very large) datasets and then broadcasts them, it might be memory-intensive or time-consuming in large-scale distributed settings. Consider verifying that each dataset remains lightweight enough for object broadcasting. If necessary, explore an alternative approach (e.g., distributing only minimal metadata) to avoid potential OOM or performance bottlenecks.

deepmd/pt/utils/dataloader.py Outdated Show resolved Hide resolved
@wanghan-iapcm wanghan-iapcm requested a review from iProzd December 19, 2024 03:20
@njzjz
Copy link
Member

njzjz commented Dec 19, 2024

In this PR, only rank 0 will load data systems, and it will be broadcasted to each rank

Why do you expect each rank loads the same data?

Copy link

codecov bot commented Dec 19, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 84.58%. Comparing base (c0914e1) to head (4c137c3).
Report is 16 commits behind head on devel.

Additional details and impacted files
@@            Coverage Diff             @@
##            devel    #4478      +/-   ##
==========================================
+ Coverage   84.41%   84.58%   +0.17%     
==========================================
  Files         670      675       +5     
  Lines       62147    63580    +1433     
  Branches     3487     3488       +1     
==========================================
+ Hits        52464    53782    +1318     
- Misses       8556     8673     +117     
+ Partials     1127     1125       -2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@caic99
Copy link
Member Author

caic99 commented Dec 19, 2024

In this PR, only rank 0 will load data systems, and it will be broadcasted to each rank

Why do you expect each rank loads the same data?

Good question. Currently every rank reads the config from the same input.json file, and initializes datasets there.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
deepmd/pt/utils/dataloader.py (1)

96-106: Consider memory management and error handling improvements

While centralizing data loading to rank 0 reduces file system stress, there are several areas for improvement:

  1. Memory Management: Loading all systems on rank 0 before broadcasting could cause memory pressure on that node.
  2. Error Handling: Add explicit error handling for broadcast failures.
  3. Verification: The assertion only checks the last system, consider verifying all systems.

Consider these improvements:

 self.systems: list[DeepmdDataSetForLoader] = []
 global_rank = dist.get_rank() if dist.is_initialized() else 0
 if global_rank == 0:
     log.info(f"Constructing DataLoaders from {len(systems)} systems")
-    with Pool(os.cpu_count()) as pool:
+    # Use a configurable number of workers to avoid overwhelming the system
+    n_workers = min(os.cpu_count(), 8)  # or make this configurable
+    with Pool(n_workers) as pool:
         self.systems = pool.map(construct_dataset, systems)
 else:
     self.systems = [None] * len(systems)  # type: ignore
 if dist.is_initialized():
-    dist.broadcast_object_list(self.systems)
-    assert self.systems[-1] is not None
+    try:
+        dist.broadcast_object_list(self.systems)
+        # Verify all systems were received correctly
+        assert all(system is not None for system in self.systems), "Some systems failed to broadcast"
+    except Exception as e:
+        log.error(f"Failed to broadcast systems: {str(e)}")
+        raise
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 555c3f9 and 1dcc5d5.

📒 Files selected for processing (1)
  • deepmd/pt/utils/dataloader.py (1 hunks)
🔇 Additional comments (1)
deepmd/pt/utils/dataloader.py (1)

96-106: Distributed design explanation: Same data, different sampling

The implementation correctly handles distributed training by:

  1. Loading data once on rank 0 and broadcasting to all ranks (reducing I/O)
  2. Using separate DistributedSampler instances per rank (ensuring different samples per rank)

This is a standard pattern in distributed deep learning where the dataset is shared but each rank samples differently.

@njzjz: Regarding your question about why each rank loads the same data - while ranks receive identical data, they use different samplers to access different portions during training. This approach reduces file system load while maintaining training correctness.

Let's verify the distributed sampling behavior:

✅ Verification successful

Distributed implementation is correct and follows best practices

The code correctly implements distributed data loading and sampling:

  1. Data is efficiently loaded once on rank 0 and broadcast to other ranks
  2. Each rank gets its own DistributedSampler instance when distributed mode is active (dist.is_available() and dist.is_initialized())
  3. Samplers are properly initialized with the dataset, ensuring each rank gets different portions of data during training
  4. The implementation follows PyTorch's recommended pattern for distributed training
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that the code maintains distributed sampling after loading data

# Check for DistributedSampler usage
rg "DistributedSampler" --type py

# Verify sampler initialization is rank-aware
ast-grep --pattern 'DistributedSampler($system)'

Length of output: 315


Script:

#!/bin/bash
# Let's verify the complete context around DistributedSampler usage

# Check the full context of sampler initialization
rg "DistributedSampler" -B 5 -A 5 deepmd/pt/utils/dataloader.py

# Check if there's any rank-specific logic in sampler setup
ast-grep --pattern 'def _setup_samplers(self) {
  $$$
}'

Length of output: 1241

@njzjz
Copy link
Member

njzjz commented Dec 19, 2024

In this PR, only rank 0 will load data systems, and it will be broadcasted to each rank

Why do you expect each rank loads the same data?

Good question. Currently every rank reads the config from the same input.json file, and initializes datasets there.

I don't get your point. I believe the data is loaded asynchronously.

@caic99
Copy link
Member Author

caic99 commented Dec 19, 2024

Why do you expect each rank loads the same data?

I don't get your point. I believe the data is loaded asynchronously.

I see. What you mean is not all ranks are accessing the same data files, and this is correct. The case is when the data is stored in a shared file system with a bandwidth limit.

@njzjz
Copy link
Member

njzjz commented Dec 19, 2024

So it just affects the initialization of the data class? Well, I am not sure how much data is loaded in the initialization step.

@njzjz njzjz added this to the v3.0.1 milestone Dec 21, 2024
@wanghan-iapcm wanghan-iapcm requested a review from njzjz December 25, 2024 01:25
@njzjz njzjz removed this from the v3.0.1 milestone Dec 25, 2024
@caic99 caic99 requested a review from njzjz December 25, 2024 02:25
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
deepmd/pt/utils/dataloader.py (1)

96-106: Implementation looks good with room for robustness improvements.

The changes effectively implement the PR objective of loading data only on rank 0 and broadcasting to other ranks. The implementation is clean and handles both distributed and non-distributed scenarios correctly.

Consider these improvements for handling large datasets:

  1. Memory efficiency: Consider implementing chunked broadcasts for large datasets to manage memory usage better.
  2. Error handling: Add timeout and error handling for broadcast operations:
try:
    if dist.is_initialized():
        # Consider chunking for large datasets
        chunk_size = 10  # or calculate based on system size
        for i in range(0, len(systems), chunk_size):
            chunk = self.systems[i:i + chunk_size]
            with torch.cuda.device('cpu'):
                dist.broadcast_object_list(chunk, timeout=timedelta(minutes=5))
            self.systems[i:i + chunk_size] = chunk
except Exception as e:
    log.error(f"Failed to broadcast systems: {str(e)}")
    raise
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1dcc5d5 and feb5902.

📒 Files selected for processing (1)
  • deepmd/pt/utils/dataloader.py (1 hunks)
🔇 Additional comments (1)
deepmd/pt/utils/dataloader.py (1)

96-106: Verify sampler independence across ranks.

The implementation preserves sampler independence across ranks as required. Let's verify this behavior:

✅ Verification successful

Sampler independence across ranks is properly implemented

The verification confirms that sampler independence is correctly maintained:

  • Each rank gets its own DistributedSampler instance for each system
  • The seed initialization is handled properly through setup_seed() which sets seeds for:
    • PyTorch's main RNG (torch.manual_seed)
    • CUDA RNG (torch.cuda.manual_seed_all)
    • DeepMD's custom RNG (dp_random.seed)
  • The sampler initialization is independent of the data loading changes, as it occurs after the data broadcast
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that samplers maintain independence across ranks
# Look for sampler seed initialization or rank-specific sampling logic

# Check for rank-specific sampling logic
rg -A 5 "DistributedSampler|WeightedRandomSampler" deepmd/pt/utils/dataloader.py

# Check for seed-related configurations
rg "seed|random" deepmd/pt/utils/dataloader.py

Length of output: 1197

Co-authored-by: Jinzhe Zeng <[email protected]>
Signed-off-by: Chun Cai <[email protected]>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
deepmd/pt/utils/dataloader.py (2)

98-103: Add error handling for parallel data loading

While the rank-based loading implementation is efficient, consider adding error handling for the pool.map operation to gracefully handle potential failures during data loading.

 if global_rank == 0:
     log.info(f"Constructing DataLoaders from {len(systems)} systems")
     with Pool(max(1, env.NUM_WORKERS)) as pool:
-        self.systems = pool.map(construct_dataset, systems)
+        try:
+            self.systems = pool.map(construct_dataset, systems)
+        except Exception as e:
+            log.error(f"Failed to load data systems: {str(e)}")
+            raise

96-106: Consider chunked broadcasting for very large datasets

The current implementation successfully achieves the PR's objective of reducing file system stress by loading data only on rank 0. However, for very large datasets, consider implementing a chunked broadcasting mechanism to:

  1. Reduce memory pressure during broadcast
  2. Allow for progress tracking
  3. Enable partial retry on failure

Example approach:

def broadcast_in_chunks(obj_list, chunk_size=1000):
    """Broadcast large lists in chunks to manage memory better"""
    for i in range(0, len(obj_list), chunk_size):
        chunk = obj_list[i:i + chunk_size]
        dist.broadcast_object_list(chunk)
        yield chunk
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between feb5902 and 4c137c3.

📒 Files selected for processing (1)
  • deepmd/pt/utils/dataloader.py (1 hunks)
🔇 Additional comments (2)
deepmd/pt/utils/dataloader.py (2)

96-97: LGTM: Proper initialization with distributed awareness

The initialization of the systems list and rank determination is well-implemented with appropriate fallback for non-distributed scenarios.


104-106: Consider adding size verification and timeout mechanism

The broadcast implementation could benefit from additional safeguards:

  1. Verify the total size of systems before broadcast to prevent OOM
  2. Add a timeout mechanism for the broadcast operation

Let's verify the potential impact of large datasets:

@njzjz njzjz added this pull request to the merge queue Dec 26, 2024
Merged via the queue into deepmodeling:devel with commit cc27a60 Dec 26, 2024
60 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants