diff --git a/streaming/base/shared/prefix.py b/streaming/base/shared/prefix.py index bff7dc067..a942b26ea 100644 --- a/streaming/base/shared/prefix.py +++ b/streaming/base/shared/prefix.py @@ -150,9 +150,10 @@ def _check_and_find(streams_local: list[str], streams_remote: list[Union[str, No raise ValueError( f'Reused local directory: {streams_local} vs ' + f'{their_locals}. Provide a different one. If using ' + - f'a unique local directory, try deleting the local directory and ' + - f'call `streaming.base.util.clean_stale_shared_memory()` only once ' + - f'in your script to clean up the stale shared memory before ' + + f'a unique local directory, try deleting the local directory and ' + + + f'call `streaming.base.util.clean_stale_shared_memory()` only once ' + + f'in your script to clean up the stale shared memory before ' + f'instantiation of `StreamingDataset`.') return prefix_int @@ -208,10 +209,8 @@ def get_shm_prefix(streams_local: list[str], _check_self(streams_local) prefix_int = max([ - _check_and_find_retrying(streams_local, - streams_remote, - shm_name=shm_name, - retry=retry) for shm_name in SHM_TO_CLEAN + _check_and_find_retrying(streams_local, streams_remote, shm_name=shm_name, retry=retry) + for shm_name in SHM_TO_CLEAN ]) # First, the local leader registers the first available shm prefix, recording its locals. @@ -235,6 +234,7 @@ def get_shm_prefix(streams_local: list[str], f'local leader. This may be because you specified ' + f'different ``local`` parameters from different ranks.') + their_locals, their_prefix_int = _unpack_locals(bytes(shm.buf)) if streams_local != their_locals or prefix_int != their_prefix_int: raise RuntimeError(f'Internal error: shared memory registered does not match ' + f'local leader as streams_local or prefix_int not match.')