I unintentionally killed a number of useful logging lines in the metric computation during my refactor.
In particular, there are counts of the per-level rubric and function evaluations that would be useful to restore.
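For reference, a minimal sketch of what the restored per-level counts might look like, using names from the refactored code below (count_rubric_metrics appears below; count_function_metrics is an assumed counterpart and may not exist under that name):

for level, object_list in object_lists_by_level.items():
    n_rubrics = compute_metrics.count_rubric_metrics(object_list)
    # count_function_metrics is hypothetical, mirroring count_rubric_metrics
    n_functions = compute_metrics.count_function_metrics(object_list)
    runner.logger.info(
        f"{level} level: up to {n_rubrics} rubric evaluations "
        f"and {n_functions} function evaluations."
    )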
Here's the refactored code:

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import json

# Set up a ThreadPoolExecutor to manage threading
n_workers = runner.evalrun.config.max_workers
runner.logger.info(f"Generating metrics with {n_workers} workers.")
# collect turns to run metrics on
turns_to_evaluate = []
for turn in evalsetrun.turns:
# only do completions
if (
evalsetrun.do_completion and turn.is_completion
    ):  # NOTE: ANR: Turn no longer has an is_completion attribute
turns_to_evaluate.append(turn)
# or do all turns
elif not evalsetrun.do_completion:
turns_to_evaluate.append(turn)
# collect the metric evaluations (function calls) to make
# here, we'll use the metric ordering established in evalsetrun.metric_graph
rubric_count = 0
# Here, we need loops over threads, turns, messages, and tool calls, assigning
# the appropriate metrics to each. It seems we can still use the same
# metric_graph (which may have ≥ 4 connected components), and enforce in the
# validation step that dependencies only exist between metrics defined
# at the same granularity.
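# A possible shape for that validation step, sketched here only (it assumes
# each metric dict carries a "depends_on" list of metric ids, which is not
# confirmed by the schema used below):
def _validate_same_level_dependencies(metric_instances):
    level_by_id = {m["id"]: m["metric_level"] for m in metric_instances}
    for m in metric_instances:
        for dep_id in m.get("depends_on", []):
            if level_by_id.get(dep_id) != m["metric_level"]:
                raise ValueError(
                    f"Metric {m['id']} depends on metric {dep_id}, "
                    "which is defined at a different granularity."
                )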
# Group the metric instances by the level they are defined at
metrics_by_level = {}
for metric_instance in json.loads(evalsetrun.metrics_graph_ordered_list):
    metric_level = metric_instance["metric_level"]
    metrics_by_level.setdefault(metric_level, []).append(metric_instance)
# TODO: if we go back to supporting completions, this will likely need to change
threads_to_evaluate = list(evalsetrun.threads)
messages_to_evaluate = list(evalsetrun.messages)
toolcalls_to_evaluate = list(evalsetrun.toolcalls)
object_lists_by_level = {
"Thread": threads_to_evaluate,
"Turn": turns_to_evaluate,
"Message": messages_to_evaluate,
"ToolCall": toolcalls_to_evaluate,
}
# log details of the identified metrics and objects at the various levels
for level, object_list in object_lists_by_level.items():
metrics_at_level = metrics_by_level.get(level, [])
metric_names = ", ".join(
[
f"{metric['evaluation_name']} ({metric['id']})"
for metric in metrics_at_level
]
)
runner.logger.debug(
f"Will execute {len(metrics_at_level)} metrics for {len(object_list)} objects at the {level} level: {metric_names}"
)
for level, object_list in object_lists_by_level.items():
# Add the metrics to objects at this level
metrics_at_level = metrics_by_level.get(level, [])
self_metrics_at_level = []
relative_object_metric_map = defaultdict(list)
for metric in metrics_at_level:
if metric["relative_object_position"] == 0:
self_metrics_at_level.append(metric)
else:
# this metric refers to a different object
relative_object_position = metric["relative_object_position"]
relative_object_metric_map[relative_object_position].append(metric)
    compute_metrics.add_all_metrics_to_objects(
        object_list, self_metrics_at_level
    )
    for obj in object_list:
        # access the thread relation, presumably to force a lazy load
        # before the threaded computation below
        obj.thread
# Update the count of how many rubrics might be run based on rubric evals at this level
rubric_count += compute_metrics.count_rubric_metrics(object_list)
runner.logger.info(
f"Metrics will include up to {rubric_count} rubric evaluations."
)
metric_computer = runner.get_metric_computer()
if n_workers == 1:
metrics = []
for level, object_list in object_lists_by_level.items():
runner.logger.info(
f"Computing metrics for level {level} on {len(object_list)} objects."
)
        for obj in object_list:
            cur_metrics = metric_computer.compute_metrics(obj)
for m in cur_metrics:
if m.get("evaluation_type", None) is None:
runner.logger.exception(
f"Metric {m} does not have a value for the key `type`."
)
if eval_run.config.raise_on_metric_error:
raise ValueError(
f"Metric {m} does not have a value for the key `type`."
)
if m.get("metric_value", None) is None:
runner.logger.exception(
f"Metric {m} does not have a value for the key `metric_value`."
)
if eval_run.config.raise_on_metric_error:
raise ValueError(
f"Metric {m} does not have a value for the key `metric_value`."
)
metrics.extend(cur_metrics)
else:
with ThreadPoolExecutor(max_workers=n_workers) as executor:
futures = []
        # TODO: parallel execution will fail if metric dependencies cross levels,
        # or if a metric depends on metrics computed for other objects.
        # We could potentially bundle dependent objects together in the same thread,
        # or otherwise build a global dependency graph and submit whole subgraphs
        # (see the sketch after this listing).
for level, object_list in object_lists_by_level.items():
            for obj in object_list:
                futures.append(
                    executor.submit(metric_computer.compute_metrics, obj)
                )
        # Wait for all futures to complete, collecting results and handling exceptions
        metrics = []
        for fid, future in enumerate(futures):
            try:
                # result() re-raises any exception from the worker thread
                metrics.extend(future.result())
                if fid % 100 == 0:
                    runner.logger.info(
                        f"Metric futures completed: {fid + 1} / {len(futures)}"
                    )
            except Exception as ex:
                runner.logger.exception(
                    f"An error occurred during metric computation: {ex}"
                )
                if runner.evalrun.config.raise_on_metric_error:
                    runner.shutdown_logging()
                    raise
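On the TODO above: a rough sketch of the bundling idea, assuming some upstream step can group objects into dependency-closed bundles (group_objects_by_dependency is hypothetical; only the threading pattern is the point):

def run_bundles(bundles, metric_computer, n_workers):
    # Each bundle is a list of objects whose metrics depend only on metrics
    # of objects in the same bundle, so a bundle can run serially in one thread.
    def run_bundle(bundle):
        results = []
        for obj in bundle:
            results.extend(metric_computer.compute_metrics(obj))
        return results

    metrics = []
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(run_bundle, b) for b in bundles]
        for future in futures:
            metrics.extend(future.result())
    return metrics

# e.g. bundles = group_objects_by_dependency(object_lists_by_level)  # hypothetical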
Here's the old code: