
Memory leak in task serialization #16

Open
gench opened this issue May 1, 2020 · 3 comments

gench commented May 1, 2020

When multiple tasks are parallelized, the memory footprint of the Spark driver grows by the memory requirement of each task. I think the problem is in the apply_async function, which keeps references to the serialized pickle objects, so the driver easily runs out of memory when number_of_tasks x memory_take_of_a_task > driver_memory.
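
As a rough illustration of that condition (the numbers below are hypothetical, not measurements from this report):

```python
# Hypothetical figures only, to illustrate the failure condition described above.
number_of_tasks = 8                 # e.g. n_jobs=8 cross-validation folds
memory_take_of_a_task_gb = 2.0      # each retained pickled task embeds its own copy of the data
driver_memory_gb = 10.0

if number_of_tasks * memory_take_of_a_task_gb > driver_memory_gb:
    print("driver OOM: 16 GB of retained pickles exceeds the 10 GB driver heap")
```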

WeichenXu123 (Collaborator) commented Jun 25, 2020

@gench This is likely an issue in your code.

in the apply_async function, which keeps references to the serialized pickle objects

The serialized func object only includes the function's code bytes and the parameter data used to run the task. When a task is launched on a Spark executor, its memory is allocated on the executor side.
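
One way to check what ends up in that payload is to measure the pickled size of a task directly. The sketch below is illustrative, not from this thread, and assumes cloudpickle as the task serializer:

```python
# Rough sketch: measure how large a serialized task payload becomes when a big
# pandas DataFrame is passed as a parameter. Assumes cloudpickle is used to
# serialize the task function together with its arguments.
import cloudpickle
import numpy as np
import pandas as pd

def my_function(X, tr_ind, val_ind):
    # Placeholder task body for illustration.
    return float(X.iloc[val_ind].mean().mean())

X = pd.DataFrame(np.random.rand(1_000_000, 10))   # ~80 MB of float64 data

# Serialize the function plus its arguments, the way a task payload would be built.
payload = cloudpickle.dumps((my_function, (), {"X": X, "tr_ind": [0], "val_ind": [1]}))
print(f"serialized task payload: {len(payload) / 1e6:.0f} MB")  # roughly the size of X
```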

gench (Author) commented Jul 9, 2020

The serialized func object only includes the function's code bytes and the parameter data used to run the task.

That is the problem.

My function takes a big dataframe generated in the driver, as you can see below. Each time the function is serialized for an executor, that memory is not released afterwards. When I run the following code, the driver uses 8 times the memory of the X pandas dataframe.

```python
Parallel(backend="spark", n_jobs=8)(
    delayed(my_function)(X=X, ...)
    for fold, (tr_ind, val_ind) in enumerate(cv_iterator)
)
```
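
For context, a fuller, self-contained sketch of that pattern (the function body, data sizes, and CV iterator are placeholders filled in for illustration, not the reporter's actual code; it assumes joblib-spark's register_spark and an active Spark session):

```python
# With n_jobs=8 the driver serializes eight tasks, each embedding its own pickled
# copy of X, which is where the ~8x driver memory growth comes from.
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from joblibspark import register_spark
from sklearn.model_selection import KFold

register_spark()  # register the "spark" joblib backend

X = pd.DataFrame(np.random.rand(1_000_000, 20))   # large driver-side dataframe
y = np.random.rand(len(X))

def my_function(X, y, tr_ind, val_ind):
    # Placeholder fold worker: "train" on tr_ind, "score" on val_ind.
    return float(X.iloc[val_ind].mean().mean())

cv_iterator = KFold(n_splits=8).split(X)

scores = Parallel(backend="spark", n_jobs=8)(
    delayed(my_function)(X=X, y=y, tr_ind=tr_ind, val_ind=val_ind)
    for fold, (tr_ind, val_ind) in enumerate(cv_iterator)
)
```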

WeichenXu123 (Collaborator) commented:

@gench

My function takes a big dataframe

Could you try converting the dataframe into a Spark broadcast variable?

For example: `bc_pandas_df = sparkContext.broadcast(pandas_df)`; then, in the remotely executed function, get the broadcast variable's value with `bc_pandas_df.value`.
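
A minimal sketch of that suggestion (the names mirror the snippets above and are illustrative; it assumes joblib-spark's register_spark and an active Spark session):

```python
# Ship the pandas dataframe to the executors once as a Spark broadcast variable,
# and pass only the lightweight Broadcast handle to each task instead of the
# dataframe itself.
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from joblibspark import register_spark
from pyspark.sql import SparkSession
from sklearn.model_selection import KFold

register_spark()
spark = SparkSession.builder.getOrCreate()

X = pd.DataFrame(np.random.rand(1_000_000, 20))
bc_pandas_df = spark.sparkContext.broadcast(X)   # broadcast the big dataframe once

def my_function(bc_X, tr_ind, val_ind):
    X = bc_X.value                   # look up the broadcast dataframe on the executor
    return float(X.iloc[val_ind].mean().mean())

scores = Parallel(backend="spark", n_jobs=8)(
    delayed(my_function)(bc_pandas_df, tr_ind, val_ind)
    for fold, (tr_ind, val_ind) in enumerate(KFold(n_splits=8).split(X))
)
```

Because the Broadcast handle pickles to a small reference rather than the full dataframe, each serialized task should stay small regardless of the size of X.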
