Skip to content

Commit

Permalink
Fix actors
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Iaroslav <[email protected]>
  • Loading branch information
YarShev committed Apr 19, 2024
1 parent aa839bd commit c3dc911
Showing 1 changed file with 3 additions and 13 deletions.
16 changes: 3 additions & 13 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,7 @@ def func(df, **kw): # pragma: no cover
csv_kwargs["path_or_buf"].close()

# each process waits for its turn to write to a file
RayWrapper.materialize(
signals.wait.options(resources=RayCustomResources.get()).remote(
partition_idx
)
)
RayWrapper.materialize(signals.wait.remote(partition_idx))

# preparing to write data from the buffer to a file
with get_handle(
Expand All @@ -249,18 +245,12 @@ def func(df, **kw): # pragma: no cover
handles.handle.write(content)

# signal that the next process can start writing to the file
RayWrapper.materialize(
signals.send.options(resources=RayCustomResources.get()).remote(
partition_idx + 1
)
)
RayWrapper.materialize(signals.send.remote(partition_idx + 1))
# used for synchronization purposes
return pandas.DataFrame()

# signaling that the partition with id==0 can be written to the file
RayWrapper.materialize(
signals.send.options(resources=RayCustomResources.get()).remote(0)
)
RayWrapper.materialize(signals.send.remote(0))
# Ensure that the metadata is syncrhonized
qc._modin_frame._propagate_index_objs(axis=None)
result = qc._modin_frame._partition_mgr_cls.map_axis_partitions(
Expand Down

0 comments on commit c3dc911

Please sign in to comment.