Clicking "Stop Pipeline" should terminate the code_map operation gracefully #222

Open
plpycoin opened this issue Dec 2, 2024 · 1 comment
Labels
bug Something isn't working

Comments

plpycoin (Contributor) commented Dec 2, 2024

  • After starting a code_map operation, clicking "Stop Pipeline" while the operation is still processing does not stop the code_map thread.
shreyashankar added the bug label Dec 2, 2024

reproduce-bot commented

The following script was generated by an AI agent to help reproduce the issue:

# docetl/reproduce.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class MockPipeline:
    def __init__(self):
        self.stop_event = threading.Event()
        self.executor = ThreadPoolExecutor(max_workers=2)

    def code_map_operation(self):
        def mock_task():
            while not self.stop_event.is_set():
                time.sleep(0.1)  # Simulate work
            return "Task Stopped"

        futures = [self.executor.submit(mock_task) for _ in range(2)]
        return futures

    def check_if_tasks_running(self, futures):
        for future in futures:
            if not future.done():
                return True
        return False

    def stop_pipeline(self):
        self.stop_event.set()
        self.executor.shutdown(wait=False)

def test_stop_pipeline():
    pipeline = MockPipeline()
    futures = pipeline.code_map_operation()

    time.sleep(1)  # Let the tasks run for a while
    pipeline.stop_pipeline()  # Attempt to stop the pipeline

    tasks_running = pipeline.check_if_tasks_running(futures)
    if tasks_running:
        raise AssertionError("Tasks are still running after attempting to stop the pipeline")

    try:
        for future in futures:
            result = future.result(timeout=1)  # This should raise an exception if the tasks don't stop
            assert result == "Task Stopped"
        print("Test passed successfully with no errors!")
    except Exception as e:
        raise AssertionError(e)

if __name__ == "__main__":
    test_stop_pipeline()

How to run:

python3 docetl/reproduce.py

Expected Result:

Traceback (most recent call last):
  File "docetl/reproduce.py", line 49, in <module>
    test_stop_pipeline()
  File "docetl/reproduce.py", line 38, in test_stop_pipeline
    raise AssertionError("Tasks are still running after attempting to stop the pipeline")
AssertionError: Tasks are still running after attempting to stop the pipeline

Thank you for your valuable contribution to this project and we appreciate your feedback! Please respond with an emoji if you find this script helpful. Feel free to comment below if any improvements are needed.

Best regards from an AI Agent!
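
For comparison, here is a minimal sketch of what a graceful stop could look like. It is not DocETL's implementation: `StoppablePipeline`, `_process_item`, and `run_demo` are hypothetical names, and the per-item work is simulated with a short sleep. Note that the mock tasks in the script above do poll `stop_event`, so the assertion there most likely fires because `future.done()` is checked immediately after a non-blocking `shutdown(wait=False)`, while both tasks are still inside `time.sleep(0.1)`. The sketch instead cancels queued items and blocks until the running ones return.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class StoppablePipeline:
    """Hypothetical stand-in for illustration; not DocETL's actual pipeline class."""

    def __init__(self, max_workers=2):
        self.stop_event = threading.Event()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def _process_item(self, item):
        # Poll the stop flag before doing any work so late starters exit early.
        if self.stop_event.is_set():
            return None
        time.sleep(0.05)  # Simulate running the code_map function on one item
        return item * 2

    def code_map_operation(self, items):
        # One future per item, so a stop request can drop the items still queued.
        return [self.executor.submit(self._process_item, item) for item in items]

    def stop_pipeline(self):
        self.stop_event.set()
        # cancel_futures=True (Python 3.9+) cancels tasks that have not started;
        # wait=True blocks until the tasks already running have returned.
        self.executor.shutdown(wait=True, cancel_futures=True)


def run_demo(total=50):
    pipeline = StoppablePipeline()
    futures = pipeline.code_map_operation(range(total))
    time.sleep(0.1)  # Let a few items finish
    pipeline.stop_pipeline()
    all_done = all(f.done() for f in futures)
    completed = sum(
        1 for f in futures if not f.cancelled() and f.result() is not None
    )
    return all_done, completed, total
```

Calling `run_demo()` returns whether every future is finished or cancelled after the stop, plus how many items actually completed. One trade-off of this design: a blocking stop waits for the items already in flight, so a single long-running item would delay the stop. A real "Stop Pipeline" handler might prefer a timeout or a background join.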