Catherine-aka added the bug (Something that is supposed to be working; but isn't) and triage (Needs triage (eg: priority, bug/not-bug, and owning component)) labels on Jan 9, 2025.
What happened + What you expected to happen
Hi, I ran an experiment and found that Ray Data's `write_parquet` writes the output file twice when it is used with the fsspec local filesystem.
Step 1: I created a file named `local.py` containing the code from https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/local.py, and added print statements to `LocalFileSystem.__init__()` and `_open()` and to the file object's `write()` and `close()` methods.
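Copying `local.py` works, but the same trace can also be collected by wrapping whatever file object the filesystem returns and recording each call. The sketch below is a minimal, stdlib-only illustration; `TracingFile` and its `(event, path, ...)` log format are hypothetical, and wiring it into fsspec (for example by returning it from an overridden `_open` in a subclass) is an assumption not exercised here.

```python
import io


class TracingFile:
    """Hypothetical wrapper that records write() and close() calls on a file object."""

    def __init__(self, raw, path, log):
        self._raw = raw      # the real file object being written to
        self._path = path    # path reported in each log entry
        self._log = log      # shared list collecting (event, path, ...) tuples
        self._log.append(("open", path))

    def write(self, data):
        # Record a copy of the bytes, then forward them unchanged.
        self._log.append(("write", self._path, bytes(data)))
        return self._raw.write(data)

    def close(self):
        self._log.append(("close", self._path))
        return self._raw.close()

    def __getattr__(self, name):
        # Anything not defined above (tell, flush, closed, ...) goes to the real file.
        return getattr(self._raw, name)


log = []
f = TracingFile(io.BytesIO(), "demo.parquet", log)
f.write(b"PAR1")
f.close()
print(log)
```

Two `open` entries for the same path, each followed by its own run of `write` entries, is the pattern that signals a duplicate write.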
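Step 2: I wrote the test code:
import ray
from pyarrow.fs import FSSpecHandler, PyFileSystem

from local import LocalFileSystem  # instrumented copy of fsspec's local.py (step 1)

fsspec_fs = LocalFileSystem()
filesystem = PyFileSystem(FSSpecHandler(fsspec_fs))
ds = ray.data.range(1)
ds.write_parquet(
    path="./test",
    filesystem=filesystem,  # the pyarrow wrapper around the fsspec filesystem
)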
Step 3: Running it produced the following output:
LocalFileSystem it is init method
2025-01-09 23:26:14,389 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-09_21-01-50_741900_2400895/logs/ray-data
2025-01-09 23:26:14,389 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[Write]
(Write pid=3061491) LocalFileSystem it is init method
(Write pid=3061491) LocalFileSystem: it is _open method and the mode is wb, path is /home/ray/code/afs_demo/file/45_000000_000000.parquet
(Write pid=3061491) it is file close
(Write pid=3061491) it is file close
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'PAR1',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\x15\x04\x15\x10\x15\x14L\x15\x02\x15\x00\x12\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\x08\x1c\x00\x00\x00\x00\x00\x00\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\x15\x00\x15\x12\x15\x16,\x15\x02\x15\x10\x15\x06\x15\x06\x1c\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x16\x00(\x08\x00\x00\x00\x00\x00\x00\x00\x00\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\t \x02\x00\x00\x00\x02\x01\x01\x02\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\x15\x04\x19,5\x00\x18\x06schema\x15\x02\x00\x15\x04%\x02\x18\x02id\x00\x16\x02\x19\x1c\x19\x1c&\x00\x1c\x15\x04\x195\x00\x06\x10\x19\x18\x02id\x15\x02\x16\x02\x16\xb8\x01\x16\xc0\x01&8&\x08\x1c\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x16\x00(\x08\x00\x00\x00\x00\x00\x00\x00\x00\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19,\x15\x04\x15\x00\x15\x02\x00\x15\x00\x15\x10\x15\x02\x00\x00\x00\x16\xb8\x01\x16\x02&\x08\x16\xc0\x01\x14\x00\x00\x19\x1c\x18\x0cARROW:schema\x18\xac\x01/////3gAAAAQAAAAAAAKAAwABgAFAAgACgAAAAABBAAMAAAACAAIAAAABAAIAAAABAAAAAEAAAAUAAAAEAAUAAgABgAHAAwAAAAQABAAAAAAAAECEAAAABwAAAAEAAAAAAAAAAIAAABpZAAACAAMAAgABwAIAAAAAAAAAUAAAAA=\x00\x18 parquet-cpp-arrow version 18.1.0\x19\x1c\x1c\x00\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'p\x01\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'PAR1',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is file close
(Write pid=3061491) it is file close
(Write pid=3061491) LocalFileSystem it is init method
(Write pid=3061491) LocalFileSystem: it is _open method and the mode is wb, path is /home/ray/code/afs_demo/file/45_000000_000000.parquet
(Write pid=3061491) it is file close
(Write pid=3061491) it is file close
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'PAR1',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\x15\x04\x15\x10\x15\x14L\x15\x02\x15\x00\x12\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\x08\x1c\x00\x00\x00\x00\x00\x00\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\x15\x00\x15\x12\x15\x16,\x15\x02\x15\x10\x15\x06\x15\x06\x1c\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x16\x00(\x08\x00\x00\x00\x00\x00\x00\x00\x00\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\t \x02\x00\x00\x00\x02\x01\x01\x02\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'\x15\x04\x19,5\x00\x18\x06schema\x15\x02\x00\x15\x04%\x02\x18\x02id\x00\x16\x02\x19\x1c\x19\x1c&\x00\x1c\x15\x04\x195\x00\x06\x10\x19\x18\x02id\x15\x02\x16\x02\x16\xb8\x01\x16\xc0\x01&8&\x08\x1c\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x16\x00(\x08\x00\x00\x00\x00\x00\x00\x00\x00\x18\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19,\x15\x04\x15\x00\x15\x02\x00\x15\x00\x15\x10\x15\x02\x00\x00\x00\x16\xb8\x01\x16\x02&\x08\x16\xc0\x01\x14\x00\x00\x19\x1c\x18\x0cARROW:schema\x18\xac\x01/////3gAAAAQAAAAAAAKAAwABgAFAAgACgAAAAABBAAMAAAACAAIAAAABAAIAAAABAAAAAEAAAAUAAAAEAAUAAgABgAHAAwAAAAQABAAAAAAAAECEAAAABwAAAAEAAAAAAAAAAIAAABpZAAACAAMAAgABwAIAAAAAAAAAUAAAAA=\x00\x18 parquet-cpp-arrow version 18.1.0\x19\x1c\x1c\x00\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'p\x01\x00\x00',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is write method
(Write pid=3061491) args: (b'PAR1',)
(Write pid=3061491) kwargs: {}
(Write pid=3061491) it is file close
(Write pid=3061491) it is file close
(Write pid=2403171) LocalFileSystem it is init method [repeated 4x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
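Each burst of `write` calls in the log above has the outer layout of a complete Parquet file: it starts with the `PAR1` magic and ends with a 4-byte little-endian footer length (`b'p\x01\x00\x00'`, i.e. 368) followed by `PAR1` again. A small checker for that framing, a hypothetical stdlib-only helper that does not decode the Thrift metadata, can confirm that each burst is a whole file rather than part of one.

```python
import struct

MAGIC = b"PAR1"


def parquet_footer_length(data: bytes) -> int:
    """Return the footer (FileMetaData) length if `data` has valid Parquet framing.

    Checks only the outer layout: leading magic, trailing magic, and a
    4-byte little-endian footer length that fits inside the file.
    Raises ValueError otherwise.
    """
    if len(data) < 12 or not data.startswith(MAGIC) or not data.endswith(MAGIC):
        raise ValueError("missing PAR1 magic at start or end")
    (footer_len,) = struct.unpack("<I", data[-8:-4])
    if footer_len > len(data) - 12:
        raise ValueError("footer length exceeds file size")
    return footer_len


# The second-to-last write in each logged burst is the footer length.
print(struct.unpack("<I", b"p\x01\x00\x00")[0])
```

Concatenating the `args` bytes of one burst and passing them to `parquet_footer_length` should succeed for each of the two bursts independently.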
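This shows that `write_parquet` performs the write twice: in the same worker process (pid=3061491), `LocalFileSystem` is initialized again, `_open` is called twice in mode `wb` on the same path (`45_000000_000000.parquet`), and the same sequence of Parquet bytes is written both times, so the second `wb` open truncates and overwrites the first file. I expected each output file to be opened and written only once.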
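To check for the duplicate open mechanically rather than by eye, the captured worker output can be scanned for the `_open` trace lines printed by the instrumented `local.py`. This is a hypothetical helper: the regular expression assumes exactly the message format shown in the log above, and the sample input is copied from that log.

```python
import re
from collections import Counter

# Matches the trace line printed by the instrumented _open() in the log above.
OPEN_RE = re.compile(
    r"\(Write pid=(?P<pid>\d+)\) LocalFileSystem: it is _open method "
    r"and the mode is (?P<mode>\S+), path is (?P<path>\S+)"
)


def count_write_opens(log_text: str) -> Counter:
    """Count how many times each (pid, path) pair was opened in a write mode."""
    counts = Counter()
    for line in log_text.splitlines():
        m = OPEN_RE.search(line)
        if m and "w" in m.group("mode"):
            counts[(m.group("pid"), m.group("path"))] += 1
    return counts


sample = """\
(Write pid=3061491) LocalFileSystem it is init method
(Write pid=3061491) LocalFileSystem: it is _open method and the mode is wb, path is /home/ray/code/afs_demo/file/45_000000_000000.parquet
(Write pid=3061491) LocalFileSystem: it is _open method and the mode is wb, path is /home/ray/code/afs_demo/file/45_000000_000000.parquet
"""

duplicates = {key: n for key, n in count_write_opens(sample).items() if n > 1}
print(duplicates)
```

Running the full log through the helper with `RAY_DEDUP_LOGS=0` set avoids Ray collapsing repeated lines before they are counted.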
Versions / Dependencies
ray, version 2.23.0
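The versions of the other libraries in the write path also matter here: pyarrow supplies `PyFileSystem` and `FSSpecHandler`, and the written footer above records `parquet-cpp-arrow version 18.1.0`. A stdlib-only sketch for collecting them; the package list is an assumption about what is relevant to this bug.

```python
from importlib import metadata


def package_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


print(package_versions(["ray", "pyarrow", "fsspec"]))
```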
Reproduction script
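import ray
from pyarrow.fs import FSSpecHandler, PyFileSystem

from local import LocalFileSystem  # instrumented copy of fsspec's local.py (step 1)

fsspec_fs = LocalFileSystem()
filesystem = PyFileSystem(FSSpecHandler(fsspec_fs))

ds = ray.data.range(1)
ds.write_parquet(
    path="./test",
    filesystem=filesystem,  # the pyarrow wrapper around the fsspec filesystem
)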
Issue Severity
Medium: It is a significant difficulty but I can work around it.