Skip to content

Commit

Permalink
test for hang
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkho committed Nov 5, 2024
1 parent 336859e commit 1ed42eb
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 54 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/nodes_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ jobs:
matrix:
os:
- macos-latest
- ubuntu-latest
- windows-latest
# - ubuntu-latest
# - windows-latest
python-version:
- 3.9
- "3.10"
- "3.11"
- "3.12"
# - "3.12"
steps:
- name: Get PyTorch Channel
shell: bash
Expand Down
102 changes: 51 additions & 51 deletions test/nodes/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,54 +108,54 @@ def test_in_order_process(self):
def test_out_of_order_process(self):
self._test_map(False, "process")

@parameterized.expand(
itertools.product(
[0, 7, 14],
[True], # TODO: define and fix in_order = False
[0, 1, 9], # TODO: define and fix in_order = False
)
)
def test_save_load_state_thread(self, midpoint: int, in_order: bool, snapshot_frequency: int):
method = "thread"
batch_size = 6
n = 80
multiprocessing_context = None if IS_WINDOWS else "forkserver"
src = MockSource(num_samples=n)
node = Batcher(src, batch_size=batch_size, drop_last=False)
node = ParallelMapper(
node,
RandomSleepUdf(),
num_workers=4,
in_order=in_order,
method=method,
multiprocessing_context=multiprocessing_context,
snapshot_frequency=snapshot_frequency,
)
node = Prefetcher(node, prefetch_factor=2)
run_test_save_load_state(self, node, midpoint)

@parameterized.expand(
itertools.product(
[0, 7, 14],
[True], # TODO: define and fix in_order = False
[0, 1, 9], # TODO: define and fix in_order = False
)
)
def test_save_load_state_process(self, midpoint: int, in_order: bool, snapshot_frequency: int):
method = "process"
batch_size = 6
n = 80
multiprocessing_context = None if IS_WINDOWS else "forkserver"
src = MockSource(num_samples=n)
node = Batcher(src, batch_size=batch_size, drop_last=False)
node = ParallelMapper(
node,
RandomSleepUdf(),
num_workers=4,
in_order=in_order,
method=method,
multiprocessing_context=multiprocessing_context,
snapshot_frequency=snapshot_frequency,
)
node = Prefetcher(node, prefetch_factor=2)
run_test_save_load_state(self, node, midpoint)
# @parameterized.expand(
# itertools.product(
# [0, 7, 14],
# [True], # TODO: define and fix in_order = False
# [0, 1, 9], # TODO: define and fix in_order = False
# )
# )
# def test_save_load_state_thread(self, midpoint: int, in_order: bool, snapshot_frequency: int):
# method = "thread"
# batch_size = 6
# n = 80
# multiprocessing_context = None if IS_WINDOWS else "forkserver"
# src = MockSource(num_samples=n)
# node = Batcher(src, batch_size=batch_size, drop_last=False)
# node = ParallelMapper(
# node,
# RandomSleepUdf(),
# num_workers=4,
# in_order=in_order,
# method=method,
# multiprocessing_context=multiprocessing_context,
# snapshot_frequency=snapshot_frequency,
# )
# node = Prefetcher(node, prefetch_factor=2)
# run_test_save_load_state(self, node, midpoint)

# @parameterized.expand(
# itertools.product(
# [0, 7, 14],
# [True], # TODO: define and fix in_order = False
# [0, 1, 9], # TODO: define and fix in_order = False
# )
# )
# def test_save_load_state_process(self, midpoint: int, in_order: bool, snapshot_frequency: int):
# method = "process"
# batch_size = 6
# n = 80
# multiprocessing_context = None if IS_WINDOWS else "forkserver"
# src = MockSource(num_samples=n)
# node = Batcher(src, batch_size=batch_size, drop_last=False)
# node = ParallelMapper(
# node,
# RandomSleepUdf(),
# num_workers=4,
# in_order=in_order,
# method=method,
# multiprocessing_context=multiprocessing_context,
# snapshot_frequency=snapshot_frequency,
# )
# node = Prefetcher(node, prefetch_factor=2)
# run_test_save_load_state(self, node, midpoint)

0 comments on commit 1ed42eb

Please sign in to comment.