Single executor #982

Draft · wants to merge 2 commits into base: main
python/langsmith/client.py (2 changes: 1 addition & 1 deletion)
@@ -3939,7 +3939,7 @@ def _log_evaluation_feedback(
source_info: Optional[Dict[str, Any]] = None,
project_id: Optional[ID_TYPE] = None,
*,
_executor: Optional[cf.ThreadPoolExecutor] = None,
_executor: Optional[cf.Executor] = None,
) -> List[ls_evaluator.EvaluationResult]:
results = self._select_eval_results(evaluator_response)

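Context for the annotation change above (not part of the diff): `concurrent.futures.Executor` is the abstract base class behind `ThreadPoolExecutor`, `ProcessPoolExecutor`, and custom pools such as LangSmith's `ContextThreadPoolExecutor`, so widening `_executor` from `cf.ThreadPoolExecutor` to `cf.Executor` lets callers hand in any executor implementation. A minimal sketch of why the broader type matters (the `run_all` helper is hypothetical):

```python
# Illustrative only: any concurrent.futures.Executor satisfies the new annotation.
import concurrent.futures as cf


def run_all(executor: cf.Executor, values: list[int]) -> list[int]:
    """Submit one task per value on whatever executor the caller provides."""
    futures = [executor.submit(pow, v, 2) for v in values]
    return sorted(f.result() for f in cf.as_completed(futures))


with cf.ThreadPoolExecutor(max_workers=2) as pool:
    print(run_all(pool, [1, 2, 3]))  # [1, 4, 9]
```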
python/langsmith/evaluation/_runner.py (180 changes: 93 additions & 87 deletions)
@@ -851,7 +851,11 @@ def _evaluate(
runs,
client,
)

executor = ls_utils.ContextThreadPoolExecutor(
# If 0, we will run the system in the main thread but still
# submit the feedback in a background thread
max_workers=max_concurrency if max_concurrency != 0 else 1
)
manager = _ExperimentManager(
data,
client=client,
@@ -861,6 +865,7 @@
num_repetitions=num_repetitions,
# If provided, we don't need to create a new experiment.
runs=runs,
executor=executor,
# Create or resolve the experiment.
).start()
cache_dir = ls_utils.get_cache_dir(None)
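The comment in the hunk above captures the key behavior: a single `ContextThreadPoolExecutor` is now created once in `_evaluate` and threaded through the experiment manager, and `max_concurrency=0` still gets one worker so feedback uploads can happen off the main thread. A rough sketch of that sizing rule, assuming a plain `ThreadPoolExecutor` and a hypothetical `build_executor` helper (the real code uses the context-propagating `ls_utils.ContextThreadPoolExecutor`):

```python
import concurrent.futures as cf
from typing import Optional


def build_executor(max_concurrency: Optional[int]) -> cf.ThreadPoolExecutor:
    """One pool, created up front, shared by predictions, evaluators, and feedback."""
    # 0 means "run the target function in the caller's thread", but one worker
    # is still needed so feedback can be submitted in the background.
    # None falls through to ThreadPoolExecutor's default worker count.
    return cf.ThreadPoolExecutor(
        max_workers=max_concurrency if max_concurrency != 0 else 1
    )
```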
@@ -1100,6 +1105,8 @@ def __init__(
summary_results: Optional[Iterable[EvaluationResults]] = None,
description: Optional[str] = None,
num_repetitions: int = 1,
*,
executor: cf.Executor,
):
super().__init__(
experiment=experiment,
@@ -1113,6 +1120,7 @@ def __init__(
self._evaluation_results = evaluation_results
self._summary_results = summary_results
self._num_repetitions = num_repetitions
self._executor = executor

@property
def examples(self) -> Iterable[schemas.Example]:
@@ -1163,6 +1171,7 @@ def start(self) -> _ExperimentManager:
client=self.client,
runs=self._runs,
evaluation_results=self._evaluation_results,
executor=self._executor,
)

def with_predictions(
@@ -1184,6 +1193,7 @@ def with_predictions(
client=self.client,
runs=(pred["run"] for pred in r2),
# TODO: Can't do multiple prediction rounds rn.
executor=self._executor,
)

def with_evaluators(
@@ -1214,6 +1224,7 @@ def with_evaluators(
runs=(result["run"] for result in r2),
evaluation_results=(result["evaluation_results"] for result in r3),
summary_results=self._summary_results,
executor=self._executor,
)

def with_summary_evaluators(
@@ -1234,6 +1245,7 @@ def with_summary_evaluators(
runs=self.runs,
evaluation_results=self._evaluation_results,
summary_results=aggregate_feedback_gen,
executor=self._executor,
)

def get_results(self) -> Iterable[ExperimentResultRow]:
@@ -1274,28 +1286,26 @@ def _predict(
)

else:
with ls_utils.ContextThreadPoolExecutor(max_concurrency) as executor:
futures = [
executor.submit(
_forward,
fn,
example,
self.experiment_name,
self._metadata,
self.client,
)
for example in self.examples
]
for future in cf.as_completed(futures):
yield future.result()
futures = [
self._executor.submit(
_forward,
fn,
example,
self.experiment_name,
self._metadata,
self.client,
)
for example in self.examples
]
for future in cf.as_completed(futures):
yield future.result()
# Close out the project.
self._end()

def _run_evaluators(
self,
evaluators: Sequence[RunEvaluator],
current_results: ExperimentResultRow,
executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
current_context = rh.get_tracing_context()
metadata = {
@@ -1327,7 +1337,7 @@ def _run_evaluators(
eval_results["results"].extend(
# TODO: This is a hack
self.client._log_evaluation_feedback(
evaluator_response, run=run, _executor=executor
evaluator_response, run=run, _executor=self._executor
)
)
except Exception as e:
@@ -1352,40 +1362,36 @@ def _score(
Expects runs to be available in the manager.
(e.g. from a previous prediction step)
"""
with ls_utils.ContextThreadPoolExecutor(
max_workers=max_concurrency
) as executor:
if max_concurrency == 0:
context = copy_context()
for current_results in self.get_results():
yield context.run(
if max_concurrency == 0:
context = copy_context()
for current_results in self.get_results():
yield context.run(
self._run_evaluators,
evaluators,
current_results,
executor=self._executor,
)
else:
futures = set()
for current_results in self.get_results():
futures.add(
self._executor.submit(
self._run_evaluators,
evaluators,
current_results,
executor=executor,
)
else:
futures = set()
for current_results in self.get_results():
futures.add(
executor.submit(
self._run_evaluators,
evaluators,
current_results,
executor=executor,
)
)
try:
# Since prediction may be slow, yield (with a timeout) to
# allow for early results to be emitted.
for future in cf.as_completed(futures, timeout=0.001):
yield future.result()
futures.remove(future)
except (cf.TimeoutError, TimeoutError):
pass
for future in cf.as_completed(futures):
result = future.result()
yield result
)
try:
# Since prediction may be slow, yield (with a timeout) to
# allow for early results to be emitted.
for future in cf.as_completed(futures, timeout=0.001):
yield future.result()
futures.remove(future)
except (cf.TimeoutError, TimeoutError):
pass
for future in cf.as_completed(futures):
result = future.result()
yield result

def _apply_summary_evaluators(
self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
@@ -1395,48 +1401,47 @@
runs.append(run)
examples.append(example)
aggregate_feedback = []
with ls_utils.ContextThreadPoolExecutor() as executor:
project_id = self._get_experiment().id
current_context = rh.get_tracing_context()
metadata = {
**(current_context["metadata"] or {}),
**{
"experiment": self.experiment_name,
"experiment_id": project_id,
},
project_id = self._get_experiment().id
current_context = rh.get_tracing_context()
metadata = {
**(current_context["metadata"] or {}),
**{
"experiment": self.experiment_name,
"experiment_id": project_id,
},
}
with rh.tracing_context(
**{
**current_context,
"project_name": "evaluators",
"metadata": metadata,
"client": self.client,
"enabled": True,
}
with rh.tracing_context(
**{
**current_context,
"project_name": "evaluators",
"metadata": metadata,
"client": self.client,
"enabled": True,
}
):
for evaluator in summary_evaluators:
try:
summary_eval_result = evaluator(runs, examples)
# TODO: Expose public API for this.
flattened_results = self.client._select_eval_results(
summary_eval_result,
fn_name=evaluator.__name__,
)
aggregate_feedback.extend(flattened_results)
for result in flattened_results:
feedback = result.dict(exclude={"target_run_id"})
evaluator_info = feedback.pop("evaluator_info", None)
executor.submit(
self.client.create_feedback,
**feedback,
run_id=None,
project_id=project_id,
source_info=evaluator_info,
)
except Exception as e:
logger.error(
f"Error running summary evaluator {repr(evaluator)}: {e}"
):
for evaluator in summary_evaluators:
try:
summary_eval_result = evaluator(runs, examples)
# TODO: Expose public API for this.
flattened_results = self.client._select_eval_results(
summary_eval_result,
fn_name=evaluator.__name__,
)
aggregate_feedback.extend(flattened_results)
for result in flattened_results:
feedback = result.dict(exclude={"target_run_id"})
evaluator_info = feedback.pop("evaluator_info", None)
self._executor.submit(
self.client.create_feedback,
**feedback,
run_id=None,
project_id=project_id,
source_info=evaluator_info,
)
except Exception as e:
logger.error(
f"Error running summary evaluator {repr(evaluator)}: {e}"
)
yield {"results": aggregate_feedback}

def _get_dataset_version(self) -> Optional[str]:
@@ -1477,6 +1482,7 @@ def _end(self) -> None:
end_time=datetime.datetime.now(datetime.timezone.utc),
metadata=project_metadata,
)
self._executor.shutdown()


def _resolve_evaluators(
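Taken together, the `_runner.py` changes replace the per-stage `with ls_utils.ContextThreadPoolExecutor(...)` blocks with one pool owned by `_ExperimentManager`: it is created in `_evaluate`, reused by `_predict`, `_score`, `_apply_summary_evaluators`, and feedback logging, and closed explicitly via `self._executor.shutdown()` in `_end`. A self-contained sketch of that lifecycle, including the short `as_completed` timeout used to stream early results (the `slow_square` task and the numbers are made up):

```python
import concurrent.futures as cf
import time


def slow_square(x: int) -> int:
    time.sleep(0.05 * x)
    return x * x


# One pool for the whole "experiment", instead of a fresh pool per stage.
executor = cf.ThreadPoolExecutor(max_workers=4)
try:
    futures = {executor.submit(slow_square, i) for i in range(5)}
    results = []
    try:
        # Since tasks may be slow, poll with a tiny timeout so finished
        # results stream out before the slow ones complete.
        for future in cf.as_completed(futures, timeout=0.001):
            results.append(future.result())
            futures.remove(future)
    except (cf.TimeoutError, TimeoutError):
        pass
    # Drain whatever is left once all work has been submitted.
    for future in cf.as_completed(futures):
        results.append(future.result())
finally:
    # No `with` block owns the pool any more, so whoever created it must shut
    # it down, which is what the new self._executor.shutdown() in _end() does.
    executor.shutdown()

print(sorted(results))  # [0, 1, 4, 9, 16]
```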