Skip to content

Commit

Permalink
Add unit test to assert error if worker count changes on resumption
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
gokulavasan committed Apr 13, 2024
1 parent 2c3b338 commit 82a7872
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 9 deletions.
32 changes: 32 additions & 0 deletions test/stateful_dataloader/test_state_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,5 +740,37 @@ def test_map_shuffle(self):
self.assertEqual(batches, exp)


class TestNumWorkersMismatch(unittest.TestCase):
    def test_num_workers_mismatch(self):
        """Resuming a StatefulDataLoader with a different ``num_workers`` must fail fast.

        A state dict captured under one worker count cannot be mapped onto a
        loader with a different worker count, so iterator construction is
        expected to raise ``AssertionError``.
        """
        for initial_num_workers, num_workers in itertools.product([0, 5], [0, 3, 7]):
            if initial_num_workers == num_workers:
                continue
            # subTest reports each (initial, resumed) combination independently
            # instead of stopping at the first failing pair.
            with self.subTest(initial_num_workers=initial_num_workers, num_workers=num_workers):
                dataset = DummyMapDataset(100, shuffle=False)
                dl = StatefulDataLoader(
                    dataset=dataset,
                    num_workers=initial_num_workers,
                    collate_fn=identity,
                )
                # Before an iterator exists there is nothing to snapshot.
                state = dl.state_dict()
                self.assertEqual(len(state), 0)

                # Creating the iterator populates the snapshot state.
                iter(dl)
                state = dl.state_dict()
                self.assertGreater(len(state), 0)

                dl = StatefulDataLoader(
                    dataset=dataset,
                    num_workers=num_workers,
                    collate_fn=identity,
                )
                dl.load_state_dict(state)
                # The mismatch must be detected when the new iterator is built.
                # (Replaces the original try/raise/except pattern, whose final
                # assertTrue(False) line was unreachable dead code.)
                with self.assertRaises(AssertionError):
                    iter(dl)


# Allow running this test module directly: `python test_state_dict.py`.
if __name__ == "__main__":
    unittest.main()
26 changes: 17 additions & 9 deletions torchdata/stateful_dataloader/stateful_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@
_INDEX_SAMPLER_STATE = "_index_sampler_state"
_SAMPLER_ITER_STATE = "_sampler_iter_state"
_SAMPLER_ITER_YIELDED = "_sampler_iter_yielded"
_NUM_YIELDED = "_num_yielded"
_ITERABLEDATASET_LEN_CALLED = "_IterableDataset_len_called"
_SHARED_SEED = "_shared_seed"
_BASE_SEED = "_base_seed"
_ITERATOR_FINISHED = "_iterator_finished"


Expand Down Expand Up @@ -289,6 +287,8 @@ class _StatefulSingleProcessDataLoaderIter(_StatefulBaseDataLoaderIter):
_StatefulBaseDataLoader inherits from _BaseDataLoaderIter).
"""

_NUM_YIELDED = "_num_yielded"

def __init__(self, loader, next_iter_state=None):
super().__init__(loader)
assert self._timeout == 0
Expand Down Expand Up @@ -321,23 +321,27 @@ def state_dict(self):
}
else:
fetcher_state = None

state_dict = {
_INDEX_SAMPLER_STATE: try_to_serialize(self._index_sampler),
_SAMPLER_ITER_STATE: try_to_serialize(self._sampler_iter),
_SAMPLER_ITER_YIELDED: self._sampler_iter_yielded,
_NUM_YIELDED: self._num_yielded,
self._NUM_YIELDED: self._num_yielded,
_ITERABLEDATASET_LEN_CALLED: self._IterableDataset_len_called,
_SHARED_SEED: self._shared_seed,
_FETCHER_STATE: fetcher_state,
_DATASET_STATE: try_to_serialize(self._dataset_fetcher.dataset),
_ITERATOR_FINISHED: self._finished,
}

return state_dict

def load_state_dict(self, state_dict):
# Try to restore from either _index_sampler state_dict or _sampler_iter state_dict
assert (
self._NUM_YIELDED in state_dict
), f"State doesn't contain key '{self._NUM_YIELDED}' expected for single process dataloader"
self._sampler_iter_yielded = state_dict[_SAMPLER_ITER_YIELDED]

# Try to restore from either _index_sampler state_dict or _sampler_iter state_dict
if isinstance(self._index_sampler, Stateful) or isinstance(self._sampler_iter, Stateful):
self._index_sampler = try_to_deserialize(self._index_sampler, state_dict[_INDEX_SAMPLER_STATE])
self._sampler_iter = iter(self._index_sampler)
Expand All @@ -347,7 +351,7 @@ def load_state_dict(self, state_dict):
if not isinstance(self._index_sampler, torch.utils.data.dataloader._InfiniteConstantSampler):
# Fallback to fastforward
self._sampler_iter = itertools.islice(self._index_sampler, self._sampler_iter_yielded, None)
self._num_yielded = state_dict[_NUM_YIELDED]
self._num_yielded = state_dict[self._NUM_YIELDED]
self._IterableDataset_len_called = state_dict[_ITERABLEDATASET_LEN_CALLED]
self._shared_seed = state_dict[_SHARED_SEED]

Expand Down Expand Up @@ -709,6 +713,7 @@ class _StatefulMultiProcessingDataLoaderIter(_StatefulBaseDataLoaderIter):
_SNAPSHOT_STEP = "_snapshot_step"
_STEPS_SINCE_SNAPSHOT = "_steps_since_snapshot"
_LAST_YIELDED_WORKER_ID = "_last_yielded_worker_id"
_BASE_SEED = "_base_seed"

def __init__(self, loader, next_iter_state):
super().__init__(loader)
Expand Down Expand Up @@ -743,11 +748,14 @@ def __init__(self, loader, next_iter_state):

worker_states = [None] * self._num_workers
if next_iter_state is not None:
assert (
self._SNAPSHOT in next_iter_state
), f"State doesn't contain key '{self._SNAPSHOT}' expected for multiprocess dataloader"
wstates = next_iter_state[self._SNAPSHOT].get(self._WORKER_SNAPSHOTS, {})
assert set(range(len(wstates))) == set(wstates.keys()), (len(wstates), wstates.keys())
for wid, sd in wstates.items():
worker_states[wid] = sd
self._base_seed = next_iter_state[self._SNAPSHOT][self._MAIN_SNAPSHOT].get(_BASE_SEED, self._base_seed)
self._base_seed = next_iter_state[self._SNAPSHOT][self._MAIN_SNAPSHOT].get(self._BASE_SEED, self._base_seed)
self._shared_seed = next_iter_state[self._SNAPSHOT][self._MAIN_SNAPSHOT].get(
_SHARED_SEED, self._shared_seed
)
Expand Down Expand Up @@ -1190,7 +1198,7 @@ def _get_main_state(self):
_SAMPLER_ITER_YIELDED: self._sampler_iter_yielded,
_ITERABLEDATASET_LEN_CALLED: self._IterableDataset_len_called,
_SHARED_SEED: self._shared_seed,
_BASE_SEED: self._base_seed,
self._BASE_SEED: self._base_seed,
}

def _restore_main_state(self, state_dict):
Expand All @@ -1208,7 +1216,7 @@ def _restore_main_state(self, state_dict):
self._sampler_iter = itertools.islice(self._index_sampler, self._sampler_iter_yielded, None)
self._IterableDataset_len_called = state_dict[_ITERABLEDATASET_LEN_CALLED]
self._shared_seed = state_dict[_SHARED_SEED]
self._base_seed = state_dict[_BASE_SEED]
self._base_seed = state_dict[self._BASE_SEED]

def _try_put_index(self):
assert self._tasks_outstanding < self._prefetch_factor * self._num_workers
Expand Down

0 comments on commit 82a7872

Please sign in to comment.