EvalPipeline Class

The orchestrator for running evaluations: it loads a JSON dataset, calls the model function on each example, applies every scorer to the output, records per-example scores and latency, and returns averaged aggregate metrics.
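
The constructor takes a path to a JSON dataset, a callable that maps an input to a model output, and a list of scorer objects. Each dataset entry is expected to carry at least an "input" key, and each scorer is expected to expose a score(output, example) method that returns a dict of numeric metric values. Below is a minimal sketch of a conforming scorer; the ExactMatchScorer name and the "expected" field are illustrative, not part of the pipeline:

class ExactMatchScorer:
    """Illustrative scorer: a scorer only needs a score(output, example) method."""
    def score(self, output, example):
        # Assumes the model output is a string; "expected" is a hypothetical dataset field.
        match = output.strip() == example.get("expected", "").strip()
        return {"exact_match": 1.0 if match else 0.0}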


Source Code

eval_pipeline.py
import json
import time
from typing import List, Callable, Dict

class EvalPipeline:
    """
    Orchestrates the evaluation of a model against a dataset using multiple scorers.
    """
    def __init__(self, data_path: str, model_fn: Callable, scorers: List):
        self.data = self._load_data(data_path)
        self.model = model_fn
        self.scorers = scorers
        self.results = []

    def run(self) -> Dict:
        print(f"Running eval on {len(self.data)} examples...")
        
        for example in self.data:
            input_text = example["input"]
            
            # 1. Get Model Output
            start_time = time.perf_counter()  # monotonic clock for latency measurement
            output = self.model(input_text)
            latency = time.perf_counter() - start_time
            
            # 2. Score Output
            scores = {}
            for scorer in self.scorers:
                scores.update(scorer.score(output, example))
            
            # 3. Log Result
            self.results.append({
                "input": input_text,
                "output": output,
                "scores": scores,
                "latency": latency
            })
            
        return self._aggregate_results()

    def _load_data(self, path):
        # The dataset file is expected to be a JSON array of example objects.
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _aggregate_results(self):
        # Average each score across all examples. Assumes every result carries
        # the same numeric score keys as the first one.
        agg = {}
        if not self.results:
            return {"aggregate": agg, "examples": self.results}

        first_scores = self.results[0]["scores"]
        for key in first_scores:
            values = [r["scores"][key] for r in self.results]
            agg[key] = sum(values) / len(values)

        return {"aggregate": agg, "examples": self.results}

    def export_report(self, path="eval_report.json"):
        with open(path, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"Report saved to {path}")