Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shared memory permission issue in a shared pod environment #813

Merged
merged 43 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c32f066
update
XiaohanZhangCMU Oct 25, 2024
7934dcc
update
XiaohanZhangCMU Oct 28, 2024
98ac253
update
XiaohanZhangCMU Oct 28, 2024
56c364e
update
XiaohanZhangCMU Oct 28, 2024
06a585d
lint
XiaohanZhangCMU Oct 28, 2024
beac17b
update
XiaohanZhangCMU Oct 28, 2024
e48c11b
add test
XiaohanZhangCMU Oct 28, 2024
c9f4ff3
update
XiaohanZhangCMU Oct 28, 2024
aa5a808
update
XiaohanZhangCMU Oct 29, 2024
60969b3
lint
XiaohanZhangCMU Oct 29, 2024
bd81ba9
update
XiaohanZhangCMU Oct 29, 2024
d4b4715
update
XiaohanZhangCMU Oct 29, 2024
ea388d4
update
XiaohanZhangCMU Oct 29, 2024
f87e1a9
add prints
XiaohanZhangCMU Oct 29, 2024
13884ff
update
XiaohanZhangCMU Oct 29, 2024
f588bd5
update
XiaohanZhangCMU Oct 29, 2024
aa3c798
update
XiaohanZhangCMU Oct 29, 2024
44144fd
refactoring
XiaohanZhangCMU Oct 30, 2024
cdd3472
update
XiaohanZhangCMU Oct 30, 2024
00fa94e
update
XiaohanZhangCMU Oct 30, 2024
5bb325a
update
XiaohanZhangCMU Oct 30, 2024
3bc6f5c
update
XiaohanZhangCMU Oct 30, 2024
1158e6d
update
XiaohanZhangCMU Oct 30, 2024
e00976c
update
XiaohanZhangCMU Oct 30, 2024
4d550fd
update
XiaohanZhangCMU Oct 30, 2024
14cb77c
update
XiaohanZhangCMU Oct 30, 2024
50edd61
update
XiaohanZhangCMU Oct 30, 2024
587532a
update
XiaohanZhangCMU Oct 30, 2024
161768f
update
XiaohanZhangCMU Oct 30, 2024
2c1915e
update
XiaohanZhangCMU Oct 30, 2024
fd56d62
update
XiaohanZhangCMU Oct 30, 2024
f401ba3
update
XiaohanZhangCMU Oct 30, 2024
8309cc4
update
XiaohanZhangCMU Oct 30, 2024
4bbe87b
update
XiaohanZhangCMU Oct 30, 2024
f6fc0d7
update
XiaohanZhangCMU Oct 30, 2024
4cf8ade
update
XiaohanZhangCMU Oct 30, 2024
0861452
update
XiaohanZhangCMU Oct 30, 2024
c8ced84
update
XiaohanZhangCMU Oct 30, 2024
1532d72
update
XiaohanZhangCMU Nov 1, 2024
4b98159
update
XiaohanZhangCMU Nov 1, 2024
0ec6b84
update
XiaohanZhangCMU Nov 1, 2024
5dc5d14
update
XiaohanZhangCMU Nov 1, 2024
9d90a23
update
XiaohanZhangCMU Nov 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion streaming/base/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ def __init__(self,
]
self._shm_prefix_int, self._locals_shm = get_shm_prefix(streams_local, streams_remote,
self._unique_rank_world)
self._filelock_root = os.path.join(gettempdir(), 'streaming')
XiaohanZhangCMU marked this conversation as resolved.
Show resolved Hide resolved
self._filelock_root = gettempdir()
os.makedirs(self._filelock_root, exist_ok=True)

# Create the shared memory-backed barrier, without its lock, which is unpickleable.
Expand Down
18 changes: 17 additions & 1 deletion streaming/base/shared/prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
prevent shared resources like shared memory from colliding.
"""

import os
from collections import Counter
from tempfile import gettempdir
from time import sleep
from typing import Iterator, Union

import numpy as np
from torch import distributed as dist

from streaming.base.constant import LOCALS, TICK
from streaming.base.constant import LOCALS, SHM_TO_CLEAN, TICK
from streaming.base.shared import SharedMemory
from streaming.base.world import World

Expand Down Expand Up @@ -108,8 +110,22 @@ def _check_and_find(streams_local: list[str], streams_remote: list[Union[str, No
prefix_int = 0
for prefix_int in _each_prefix_int():
name = _get_path(prefix_int, LOCALS)

# Check if any shared memory filelocks exist for the current prefix
try:
shared_memory_exists = any(
os.path.exists(os.path.join(gettempdir(), _get_path(prefix_int, shm_name)))
for shm_name in SHM_TO_CLEAN)
if shared_memory_exists:
continue
except PermissionError:
XiaohanZhangCMU marked this conversation as resolved.
Show resolved Hide resolved
continue

# Attempt to access shared memory by name. Use prefix_int if files do not exist
try:
shm = SharedMemory(name, False)
except PermissionError:
continue
except FileNotFoundError:
break
their_locals, _ = _unpack_locals(bytes(shm.buf))
Expand Down
9 changes: 6 additions & 3 deletions streaming/base/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,12 @@ def clean_stale_shared_memory() -> None:
try:
shm = BuiltinSharedMemory(name, True, 4)
except FileExistsError:
shm = BuiltinSharedMemory(name, False, 4)
leaked_shm = True
finally:
try:
shm = BuiltinSharedMemory(name, False, 4)
leaked_shm = True
except PermissionError:
continue
if shm:
shm.close() # pyright: ignore
shm.unlink()
# Come out of loop if no leaked shared memory
Expand Down
Loading