-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplify Parallel Executor #2031
base: fix-streaming
Are you sure you want to change the base?
Conversation
def test_multi_thread_evaluate_call_cancelled(monkeypatch): | ||
# slow LM that sleeps for 1 second before returning the answer | ||
class SlowLM(DummyLM): | ||
def __call__(self, *args, **kwargs): | ||
import time | ||
|
||
time.sleep(1) | ||
return super().__call__(*args, **kwargs) | ||
|
||
dspy.settings.configure(lm=SlowLM({"What is 1+1?": {"answer": "2"}, "What is 2+2?": {"answer": "4"}})) | ||
|
||
devset = [new_example("What is 1+1?", "2"), new_example("What is 2+2?", "4")] | ||
program = Predict("question -> answer") | ||
assert program(question="What is 1+1?").answer == "2" | ||
|
||
# spawn a thread that will sleep for .1 seconds then send a KeyboardInterrupt | ||
def sleep_then_interrupt(): | ||
import time | ||
|
||
time.sleep(0.1) | ||
import os | ||
|
||
os.kill(os.getpid(), signal.SIGINT) | ||
|
||
input_thread = threading.Thread(target=sleep_then_interrupt) | ||
input_thread.start() | ||
|
||
with pytest.raises(KeyboardInterrupt): | ||
ev = Evaluate( | ||
devset=devset, | ||
metric=answer_exact_match, | ||
display_progress=False, | ||
num_threads=2, | ||
) | ||
score = ev(program) | ||
assert score == 100.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This effectively got moved to tests/utils/test_parallelizer.py
Thanks a lot @CyrusNuevoDia ! Is this ready to merge |
It works... can you think of any tricky testing scenarios? |
else: | ||
logger.error( | ||
f"Error processing item {item}: {e}. Set `provide_traceback=True` to see the stack trace." | ||
) | ||
with self._lock: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would probably move this above the logging
return self._execute_isolated_single_thread(wrapped_function, data) | ||
else: | ||
return self._execute_multi_thread(wrapped_function, data) | ||
exec_type = "multi" if self.num_threads != 1 else "single" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would could self._execute_single_thread if self.num_threads == 1 else self._execute_multi_thread
?
# If not in the main thread, skip setting signal handlers | ||
yield | ||
|
||
def cancellable_function(parent_overrides, index_item): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I recall correctly, this was really important. It seems to have gotten lost in the (otherwise extremely neat) refactor.
When launching multiple threads, we want each thread to inherit the parent thread's local overrides.
def _execute_multi_thread(self, function, data): | ||
pbar = self._create_pbar(data) | ||
total_score = 0 | ||
total_processed = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Somewhere here, we'd need to have something like:
from dspy.dsp.utils.settings import thread_local_overrides
parent_overrides = thread_local_overrides.overrides.copy()
and then we should pass parent_overrides
in data
, so that wrapped(item)
can handle using the parent thread's overrides, not the new child's overrides.
Thanks a ton @CyrusNuevoDia ! I reviewed this carefully. Two issues:
|
No description provided.