[Ray Data]: ray data map_batches leaks memory/doesn't update objectRef count #49757

Status: Open
subodhchhabra opened this issue Jan 10, 2025 · 8 comments
Labels: bug (Something that is supposed to be working; but isn't), data (Ray Data-related issues), triage (Needs triage (eg: priority, bug/not-bug, and owning component))

Comments

@subodhchhabra commented Jan 10, 2025

What happened + What you expected to happen

When using Ray Data map_batches, memory is not released once the reference to the dataset goes out of scope. Memory therefore builds up over time and workers ultimately get killed due to OOM.

Versions / Dependencies

Ray: 2.34
Python: 3.9

Reproduction script

import ray
import numpy as np
from ray.util.state import summarize_actors, summarize_objects, summarize_tasks
import time
import gc

class dummyActor:
    def __init__(self, num) -> None:
        self.num = num

    def __call__(self, recs):
        for x in recs:
            pass
        return recs

def dummy_transform(arg):
    arg['item'] = arg['id'] * 2
    return arg

if __name__ == "__main__":
    data = ray.data.range(1000000)
    data = data.map_batches(dummyActor, fn_constructor_kwargs={'num': 10}, concurrency=2)
    data = data.map(dummy_transform, concurrency=10)
    data.write_parquet('local://tmp/datab/')
    del data
    gc.collect()
    time.sleep(1)
    print(summarize_objects(raise_on_missing_output=False))

Output:

{'cluster': {'summary': {'disabled': {'total_objects': 24, 'total_size_mb': 7.7156524658203125, 'total_num_workers': 10, 'total_num_nodes': 1, 'task_state_counts': {'NIL': 24}, 'task_attempt_number_counts': {'1': 24}, 'ref_type_counts': {'PINNED_IN_MEMORY': 24}}}, 'total_objects': 24, 'total_size_mb': 7.7156524658203125, 'callsite_enabled': False, 'summary_by': 'callsite'}}

Issue Severity

High: It blocks me from completing my task.

@subodhchhabra added the bug and triage labels on Jan 10, 2025
@jcotant1 added the data (Ray Data-related issues) label on Jan 10, 2025
@richardliaw (Contributor) commented:

I tried avoiding the nesting of Ray Data inside Ray tasks; running this on the latest nightly I got:

import ray
import numpy as np
from ray.util.state import summarize_actors, summarize_objects, summarize_tasks
import time
import gc

def dummy_fnc(arg):
    for i in arg.iter_rows():
        pass
    return arg

def dummy_transform(arg):
    arg['item'] = arg['id'] * 2
    return arg

if __name__ == "__main__":
    data = ray.data.range(1000000)
    data = data.map_batches(dummy_transform)
    objref = dummy_fnc(data)
    del data
    del objref

    gc.collect()
    time.sleep(5)
    print(summarize_objects(raise_on_missing_output=False))
Usage stats collection is enabled by default for nightly wheels. To disable this, run the following command: `ray disable-usage-stats` before starting Ray. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.
2025-01-14 00:20:48,024	INFO worker.py:1833 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
2025-01-14 00:20:49,132	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-14_00-20-46_972340_29396/logs/ray-data
2025-01-14 00:20:49,132	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->MapBatches(dummy_transform)]
✔️  Dataset execution finished in 10.75 seconds: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1.00M/1.00M [00:10<00:00, 93.0k row/s]
- ReadRange->MapBatches(dummy_transform): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 11.4MB object store: 100%|██████████████████████████████████████████████████████████| 1.00M/1.00M [00:10<00:00, 93.0k row/s]
{'cluster': {'summary': {}, 'total_objects': 0, 'total_size_mb': 0.0, 'callsite_enabled': True, 'summary_by': 'callsite'}}

@subodhchhabra (Author) commented Jan 14, 2025

Here is a revised script that I used so I could remove the ray.remote call after the Ray Data pipeline.

import ray
import numpy as np
from ray.util.state import summarize_actors, summarize_objects, summarize_tasks
import time
import gc

class dummyActor:
    def __init__(self, num) -> None:
        self.num = num

    def __call__(self, recs):
        for x in recs:
            pass
        return recs

@ray.remote
def dummy_fnc(arg):
    for i in arg.iter_rows():
        pass
    return arg

def dummy_transform(arg):
    arg['item'] = arg['id'] * 2
    return arg

if __name__ == "__main__":
    data = ray.data.range(1000000)
    data = data.map_batches(dummyActor, fn_constructor_kwargs={'num': 10}, concurrency=2)
    data = data.map(dummy_transform, concurrency=10)
    data.write_parquet('local://tmp/datab/')
    del data
    gc.collect()
    time.sleep(1)
    print(summarize_objects(raise_on_missing_output=False))

I still see memory not being released.

2025-01-14 10:07:23,405	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-14_10-07-21_527086_889374/logs/ray-data
2025-01-14 10:07:23,405	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[ReadRange->MapBatches(dummyActor)] -> TaskPoolMapOperator[Map(dummy_transform)] -> TaskPoolMapOperator[Write]
✔️  Dataset execution finished in 4.02 seconds: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 32.0/32.0 [00:04<00:00, 7.96 row/s]
- ReadRange->MapBatches(dummyActor): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store; [locality off]: 100%|██████████████████████████████████████████████████████████████████████████████████████████| 1.00M/1.00M [00:03<00:00, 269k row/s]
- Map(dummy_transform): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1.00M/1.00M [00:03<00:00, 269k row/s]
- Write: Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 184.0B object store: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 32.0/32.0 [00:03<00:00, 8.61 row/s]
2025-01-14 10:07:27,470	INFO datasink.py:103 -- Write operation succeeded. Aggregated write results:
	- num_rows: 1000000
	- size_bytes: 16000000

{'cluster': {'summary': {'disabled': {'total_objects': 28, 'total_size_mb': 6.75433349609375, 'total_num_workers': 13, 'total_num_nodes': 1, 'task_state_counts': {'NIL': 28}, 'task_attempt_number_counts': {'1': 28}, 'ref_type_counts': {'PINNED_IN_MEMORY': 28}}}, 'total_objects': 28, 'total_size_mb': 6.75433349609375, 'callsite_enabled': False, 'summary_by': 'callsite'}}

@richardliaw (Contributor) commented:

OK, thanks. This is reproducible. Does this cause issues / OOM in your pipeline?

@raulchen (Contributor) commented:

I've seen and debugged this before. The problem is that some objects are still being referenced by idle workers.
I don't remember the exact reason, but IIRC it's not a big issue:
there are only a few small objects per idle worker, and when the idle workers are GC'ed, the objects will be freed as well.
cc @jjyao in case you remember this.
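A sketch (not from this thread) to check that hypothesis: group the leftover PINNED_IN_MEMORY objects by the worker PID that holds them, then cross-reference the worker list to see whether the holders are idle workers. The ray.util.state fields used below (pid, reference_type, object_size, worker_type) are assumed from recent Ray releases and may differ slightly across versions.

from collections import defaultdict

from ray.util.state import list_objects, list_workers

# Fetch the objects that the summary reports as pinned in memory.
pinned = list_objects(
    filters=[("reference_type", "=", "PINNED_IN_MEMORY")],
    limit=1000,
    raise_on_missing_output=False,
)

# Aggregate object count and size per holding worker PID.
by_pid = defaultdict(lambda: [0, 0])
for obj in pinned:
    by_pid[obj.pid][0] += 1
    by_pid[obj.pid][1] += obj.object_size or 0

# Cross-reference against the worker list to see which holders are idle workers.
workers = {w.pid: w for w in list_workers(raise_on_missing_output=False)}
for pid, (count, size) in sorted(by_pid.items()):
    w = workers.get(pid)
    print(f"pid={pid} objects={count} bytes={size} worker_type={getattr(w, 'worker_type', '?')}")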

@subodhchhabra (Author) commented:

This does cause OOM for me in the case of long-running processes: these processes continue to build up over time and are eventually killed due to OOM. Also, in that scenario Ray tries to recreate these worker processes back to their last known state (i.e. including refs that should have been released), which makes the whole cluster unusable.

@subodhchhabra (Author) commented:

@richardliaw, hey, any pointers/insights I can use to free up memory (forced GC?) so that a long-running task can keep going?
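One heavy-handed workaround for long-running drivers is to run each Ray Data pipeline in its own short-lived driver process, so that when that job ends its workers (and whatever small objects they still pin) can be reclaimed instead of accumulating in a single process. This is only a sketch, not an official fix: it assumes a cluster is already running and reachable via ray.init(address="auto"), and the pipeline and output paths below are placeholders.

import multiprocessing as mp


def run_pipeline(output_path: str) -> None:
    # Import inside the child so each child process owns its own Ray connection.
    import ray

    ray.init(address="auto", ignore_reinit_error=True)
    ds = ray.data.range(1_000_000)
    ds = ds.map(lambda row: {"id": row["id"], "item": row["id"] * 2})
    ds.write_parquet(output_path)
    ray.shutdown()  # ends this driver/job; its idle workers can now be reaped


if __name__ == "__main__":
    ctx = mp.get_context("spawn")  # spawn avoids forking an initialized Ray driver
    for i in range(3):
        p = ctx.Process(target=run_pipeline, args=(f"/tmp/datab_{i}/",))
        p.start()
        p.join()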

@h094071 commented Jan 25, 2025

Not sure if it's the same issue. I'm using the code below to process 20 TB of data with uneven file sizes, up to 38 GB per file. After running for more than 10 hours, a large number of ray::IDLE processes appear, and each process takes up a lot of memory.

import os
import ray
import s3fs
import pandas as pd

os.environ["RAY_DATA_PUSH_BASED_SHUFFLE"] = "1"

# NOTE: the original snippet uses `filesystem` without showing how it is built;
# an s3fs filesystem is assumed here.
filesystem = s3fs.S3FileSystem()

cols = ['text', 'resource', 'source_id', 'synthetic_score', 'source', 'data_type', 'field', 'resource_id']
ds = ray.data.read_json(
    "flagchat/ldwang/CCI4-Pool/", filesystem=filesystem,
    ray_remote_args={"memory": 75 * 2**30, "resources": {"aaa": 0.6}},
    file_extensions=["jsonl"], include_paths=True,
)

def aa(batch):
    # Make sure every expected column exists, then normalize the source path.
    for column in cols:
        if column not in batch.columns:
            batch[column] = None
    batch['path'] = batch['path'].apply(lambda x: "/".join(x.split("/")[3:-1]))
    batch = batch[cols]
    return batch

ds = ds.map_batches(aa, batch_format="pandas", memory=20 * 2**30)
ds.write_parquet("flagchat/ldwang/CCI4-Pool_new2/", num_rows_per_file=400000, filesystem=filesystem)

[Image: per-process memory usage, showing ray::IDLE processes occupying almost all of the memory]
As shown in the figure, almost all memory is occupied by IDLE processes.

@h094071 commented Jan 25, 2025

Ray Data groupby also has similar problems. The Ray version I use is 2.40.
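For reference, a hypothetical minimal repro of that groupby pattern (not the commenter's actual code), using the same leftover-object check as the scripts above:

import gc
import time

import ray
from ray.util.state import summarize_objects

if __name__ == "__main__":
    ds = ray.data.range(1_000_000)
    # Add a low-cardinality key so the groupby actually shuffles data.
    ds = ds.map(lambda row: {"key": row["id"] % 100, "value": row["id"]})
    counts = ds.groupby("key").count()
    counts.materialize()
    del ds, counts
    gc.collect()
    time.sleep(1)
    print(summarize_objects(raise_on_missing_output=False))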
