"""
Benchmark orchestration module.
Coordinates the full benchmark execution pipeline:
1. Build benchmarks (if needed)
2. Execute benchmarks
3. Aggregate results
4. Combine methods
5. Calculate carbon
6. Calculate GreenScore
File and path logistics are delegated to ResultsCollector.
"""
import json
import pandas as pd
from pathlib import Path
from typing import List, Dict, Optional
from collections import defaultdict
from .results_collector import (
ResultsCollector,
ENERGY_AGGREGATED,
TIME_AGGREGATED,
ENERGY_COMBINED,
TIME_COMBINED,
CARBON_FOOTPRINT,
GREENSCORE,
)
from .provider import FileSystemProvider
from ..benchmarks.registry import (
list_methods as list_methods_builtin,
)
from ..benchmarks.discovery import (
build_registry,
get_benchmark_path_from_registry,
list_algorithms_from_registry,
list_methods_from_registry,
)
from .builder import build_benchmark, requires_build
from .executor import execute_benchmark, AuditLogger, AUDIT_REPORT_FILENAME
from ..models import (
aggregate_energy,
aggregate_time,
combine_energy_results,
combine_time_results,
calculate_carbon_footprint,
calculate_greenscore,
)
from ..utils import AnalysisError, ConfigurationError
from ..config import ToolPaths
[docs]
def get_benchmark_path(
    algorithm: str, method: str, registry: Optional[Dict[str, Dict[str, str]]] = None
) -> Path:
    """
    Backward-compatible path resolver used by tests and orchestrator internals.

    Args:
        algorithm: Algorithm name to look up
        method: Method name to look up
        registry: Optional pre-built registry; when None a registry is built
            with build_registry()

    Returns:
        Whatever get_benchmark_path_from_registry returns for the pair
    """
    # Only build a registry when the caller supplied none. A truthiness check
    # would also discard an explicitly passed *empty* registry and silently
    # resolve paths against the default one instead.
    active_registry = registry if registry is not None else build_registry()
    return get_benchmark_path_from_registry(active_registry, algorithm, method)
[docs]
def resolve_algorithms(
    algorithms: List[str],
    registry: Optional[Dict[str, Dict[str, str]]] = None,
) -> List[str]:
    """
    Resolve algorithm list, expanding 'all' to all available algorithms.

    Args:
        algorithms: List of algorithm names or ['all']
        registry: Optional pre-built registry keyed by algorithm name; when
            None a registry is built with build_registry()

    Returns:
        Sorted, de-duplicated list of algorithm names (for 'all', the list
        reported by list_algorithms_from_registry)

    Raises:
        ValueError: If any requested algorithm is not a key of the registry
    """
    # Only build a registry when the caller supplied none. A truthiness check
    # would replace an explicitly passed *empty* registry with the default one
    # and validate against algorithms the caller never offered.
    active_registry = registry if registry is not None else build_registry()

    if "all" in algorithms:
        return list_algorithms_from_registry(active_registry)

    available = set(active_registry)
    invalid = [name for name in algorithms if name not in available]
    if invalid:
        raise ValueError(
            f"Invalid algorithms: {invalid}. Available: {list_algorithms_from_registry(active_registry)}"
        )

    # Deduplicate and sort so the execution order is deterministic.
    return sorted(set(algorithms))
[docs]
def resolve_methods(
    methods: List[str],
    algorithm: Optional[str] = None,
    registry: Optional[Dict[str, Dict[str, str]]] = None,
) -> List[str]:
    """
    Resolve method list, expanding 'all' to all available methods.

    Args:
        methods: List of method names or ['all']
        algorithm: Optional algorithm name; when given, methods are validated
            against the methods the registry lists for that algorithm
        registry: Optional pre-built registry; when None a registry is built
            with build_registry()

    Returns:
        De-duplicated list of method names in deterministic order: methods
        known to list_methods_builtin() first (in that order), followed by
        any remaining registry methods in registry order

    Raises:
        ValueError: If any requested method is not listed by the registry
    """
    # Only build a registry when the caller supplied none; an explicitly
    # passed empty registry must not be swapped for the default one.
    active_registry = registry if registry is not None else build_registry()

    if "all" in methods:
        return list_methods_from_registry(active_registry, algorithm)

    # Validate methods
    if algorithm:
        available = list_methods_from_registry(active_registry, algorithm)
        scope = f" for {algorithm}"
    else:
        available = list_methods_from_registry(active_registry)
        scope = ""

    available_set = set(available)
    invalid = [name for name in methods if name not in available_set]
    if invalid:
        raise ValueError(
            f"Invalid methods{scope}: {invalid}. Available: {available}"
        )

    # Return in deterministic order. Built-in order comes first; methods that
    # exist only in the registry were validated above and must not be dropped,
    # so they are appended in registry order. dict.fromkeys de-duplicates
    # while preserving first-seen order.
    requested = set(methods)
    ordering = dict.fromkeys([*list_methods_builtin(), *available])
    return [name for name in ordering if name in requested]
[docs]
def _preview_names(names: List[str], limit: int = 3) -> str:
    """Join the first ``limit`` names, adding '...' only when some are omitted."""
    shown = ", ".join(names[:limit])
    return f"{shown}..." if len(names) > limit else shown


def _default_path_sources(tool_paths: Optional[ToolPaths]) -> Dict[str, Dict[str, Optional[str]]]:
    """Build the audit ``path_sources`` mapping from ``tool_paths`` when the caller gave none."""

    def resolved(attr: str) -> Optional[str]:
        # A missing ToolPaths or an unset tool is recorded as a null path.
        tool = getattr(tool_paths, attr, None) if tool_paths is not None else None
        return str(tool.resolve()) if tool else None

    return {
        name: {"path": resolved(name), "source": "system_default"}
        for name in ("python", "pypy", "c_compiler")
    }


def _numeric_bounds(frame: pd.DataFrame) -> Dict[str, Optional[float]]:
    """Return the global min/max over the numeric, non-'algorithm' columns of ``frame``."""
    numeric = frame.drop(columns=["algorithm"], errors="ignore").select_dtypes(include="number")
    if numeric.empty:
        return {"min": None, "max": None}
    low = float(numeric.min().min())
    high = float(numeric.max().max())
    # An all-NaN frame yields NaN, which json.dumps would emit as the
    # non-standard token NaN; report it as null instead.
    return {
        "min": None if pd.isna(low) else low,
        "max": None if pd.isna(high) else high,
    }


def run_benchmark_suite(
    algorithms: List[str],
    methods: List[str],
    runs: int = 50,
    algorithm_runs: Optional[Dict[str, int]] = None,
    output_dir: Optional[Path] = None,
    carbon_intensity: float = 0.000475,
    alpha: float = 0.4,
    beta: float = 0.4,
    gamma: float = 0.2,
    tool_paths: Optional[ToolPaths] = None,
    path_sources: Optional[dict] = None,
    provider: Optional[FileSystemProvider] = None,
    benchmarks_dir: Optional[Path] = None,
) -> Path:
    """
    Execute full benchmark suite and generate GreenScore CSV.

    This is the main orchestration function that:
    1. Resolves algorithm and method lists
    2. Builds benchmarks that require compilation
    3. Executes all benchmark x method combinations
    4. Aggregates raw CSVs
    5. Combines results across methods
    6. Calculates carbon footprint
    7. Calculates GreenScore
    8. Writes final GreenScore.csv

    Args:
        algorithms: List of algorithm names or ['all']
        methods: List of method names or ['all']
        runs: Number of runs per benchmark (default: 50)
        algorithm_runs: Optional per-algorithm run overrides (e.g. {"hanoi": 10})
        output_dir: Base directory for all outputs (defaults to ./results)
        carbon_intensity: Carbon intensity factor in gCO2e/J (default: 0.000475)
        alpha: Energy weight for GreenScore (default: 0.4)
        beta: Carbon weight for GreenScore (default: 0.4)
        gamma: Time weight for GreenScore (default: 0.2)
        tool_paths: Optional ToolPaths configuration for Python/PyPy/C compiler
        path_sources: Optional dict of path source metadata for audit report
        provider: Optional FileSystemProvider for workspace/path I/O (default: real provider)
        benchmarks_dir: Optional directory handed to build_registry()

    Returns:
        Path to final GreenScore.csv file

    Raises:
        ValueError: If algorithms or methods are invalid
        AnalysisError: If no aggregated energy or time data is available

    Note:
        Errors raised while building or executing an individual benchmark are
        printed and that benchmark is skipped; they do not abort the run.
        Failures while writing or augmenting the audit report are reported as
        warnings only.
    """
    registry = build_registry(benchmarks_dir)

    # Resolve algorithm and method lists
    algorithm_list = resolve_algorithms(algorithms, registry=registry)
    method_list = resolve_methods(methods, registry=registry)

    if output_dir is None:
        output_dir = Path.cwd() / "results"
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    print("Running benchmark suite:")
    print(f" Algorithms: {len(algorithm_list)} ({_preview_names(algorithm_list)})")
    print(f" Methods: {len(method_list)} ({', '.join(method_list)})")
    print(f" Runs per benchmark: {runs}")
    print(f" Output directory: {output_dir}")
    print()

    # Track execution results and audit (path identity verification)
    execution_results: Dict[str, Dict[str, Dict[str, Path]]] = defaultdict(dict)
    audit_logger = AuditLogger()
    if path_sources is None:
        path_sources = _default_path_sources(tool_paths)

    # Phase 1: Build benchmarks that require compilation
    print("Phase 1: Building benchmarks...")
    built_benchmarks: Dict[str, Dict[str, Path]] = {}
    for algorithm in algorithm_list:
        for method in method_list:
            if not requires_build(method):
                continue
            print(f" Building {algorithm}/{method}...", end=" ", flush=True)
            try:
                benchmark_path = get_benchmark_path(algorithm, method, registry=registry)
                built_path = build_benchmark(algorithm, method, benchmark_path, tool_paths=tool_paths)
                built_benchmarks.setdefault(algorithm, {})[method] = built_path
                print("OK")
            except Exception as e:
                # Best effort: report and continue with other benchmarks
                print(f"ERROR: {e}")
    print()

    # Phase 2: Execute benchmarks
    print("Phase 2: Executing benchmarks...")
    total_combinations = len(algorithm_list) * len(method_list)
    run_overrides = algorithm_runs or {}
    current = 0
    for algorithm in algorithm_list:
        for method in method_list:
            current += 1
            print(f" [{current}/{total_combinations}] {algorithm}/{method}...", end=" ", flush=True)
            try:
                effective_runs = int(run_overrides.get(algorithm, runs))
                # Get benchmark path (use built path if available)
                if algorithm in built_benchmarks and method in built_benchmarks[algorithm]:
                    benchmark_path = built_benchmarks[algorithm][method]
                else:
                    benchmark_path = get_benchmark_path(algorithm, method, registry=registry)
                # Execute benchmark
                # Note: The benchmark script itself handles runs via decorators
                # We just need to execute it once - decorators will run n times
                results = execute_benchmark(
                    algorithm=algorithm,
                    method=method,
                    benchmark_path=benchmark_path,
                    runs=effective_runs,
                    output_dir=output_dir,
                    tool_paths=tool_paths,
                    audit_logger=audit_logger,
                )
                execution_results[algorithm][method] = results
                print("OK")
            except Exception as e:
                # Best effort: report and continue with other benchmarks
                print(f"ERROR: {e}")
    print()

    # Write audit_report.json (path identity and requested/resolved/runtime-reported)
    try:
        report = audit_logger.to_report_dict(path_sources)
        report_path = output_dir / AUDIT_REPORT_FILENAME
        report_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
        if report.get("severity") == "HIGH":
            print(f" WARNING: Audit report path mismatch detected - see {report_path}")
    except Exception as e:
        # Do not fail the run if report write fails, but do not hide it either
        print(f" WARNING: Could not write audit report: {e}")

    fs_provider = provider if provider is not None else FileSystemProvider()
    collector = ResultsCollector(provider=fs_provider)

    # Phase 3: Collect and organize raw CSVs (delegate to ResultsCollector)
    print("Phase 3: Collecting raw measurement data...")
    collected = collector.collect_paths(execution_results)
    method_energy_dirs = collected["energy"]
    method_time_dirs = collected["time"]

    # Phase 4: Aggregate per method (workspace and paths via FileSystemProvider)
    print("Phase 4: Aggregating results per method...")
    aggregated_energy_files: Dict[str, Path] = {}
    aggregated_time_files: Dict[str, Path] = {}
    for method in method_list:
        if method in method_energy_dirs and method_energy_dirs[method]:
            workspace_energy = fs_provider.prepare_aggregation_workspace(
                output_dir, method, method_energy_dirs[method], "energy"
            )
            agg_energy_path = fs_provider.get_output_path(
                output_dir, method=method, file_type=ENERGY_AGGREGATED
            )
            aggregate_energy(workspace_energy, output_path=agg_energy_path)
            aggregated_energy_files[method] = agg_energy_path
        if method in method_time_dirs and method_time_dirs[method]:
            workspace_time = fs_provider.prepare_aggregation_workspace(
                output_dir, method, method_time_dirs[method], "time"
            )
            agg_time_path = fs_provider.get_output_path(
                output_dir, method=method, file_type=TIME_AGGREGATED
            )
            aggregate_time(workspace_time, output_path=agg_time_path)
            aggregated_time_files[method] = agg_time_path
    print()

    # Phase 5: Combine methods
    print("Phase 5: Combining results across methods...")
    if not aggregated_energy_files or not aggregated_time_files:
        raise AnalysisError(
            "No aggregated data available. Check benchmark execution results."
        )
    energy_combined_path = fs_provider.get_output_path(output_dir, file_type=ENERGY_COMBINED)
    combine_energy_results(
        list(aggregated_energy_files.values()),
        output_path=energy_combined_path,
    )
    time_combined_path = fs_provider.get_output_path(output_dir, file_type=TIME_COMBINED)
    combine_time_results(
        list(aggregated_time_files.values()),
        output_path=time_combined_path,
    )
    print(f" Energy combined: {energy_combined_path}")
    print(f" Time combined: {time_combined_path}")
    print()

    # Phase 6: Calculate carbon footprint
    print("Phase 6: Calculating carbon footprint...")
    carbon_path = fs_provider.get_output_path(output_dir, file_type=CARBON_FOOTPRINT)
    carbon_df = calculate_carbon_footprint(
        energy_combined_path,
        output_path=carbon_path,
        carbon_intensity=carbon_intensity,
    )
    print(f" Carbon footprint: {carbon_path}")
    print()

    # Phase 7: Calculate GreenScore
    print("Phase 7: Calculating GreenScore...")
    energy_df = pd.read_csv(energy_combined_path)
    time_df = pd.read_csv(time_combined_path)
    greenscore_path = fs_provider.get_output_path(output_dir, file_type=GREENSCORE)
    greenscore_df = calculate_greenscore(
        energy_df,
        time_df,
        carbon_df,
        alpha=alpha,
        beta=beta,
        gamma=gamma,
        output_path=greenscore_path,
        aggregated_energy_paths=aggregated_energy_files,
    )
    print(f" GreenScore: {greenscore_path}")
    print()

    # Augment audit_report.json with data_methodology_summary and normalization_bounds
    try:
        report_path = output_dir / AUDIT_REPORT_FILENAME
        if report_path.exists():
            report = json.loads(report_path.read_text(encoding="utf-8"))
            if "points_measured" in greenscore_df.columns and "points_estimated" in greenscore_df.columns:
                total_measured = int(greenscore_df["points_measured"].sum())
                total_estimated = int(greenscore_df["points_estimated"].sum())
                total_points = total_measured + total_estimated
                report["data_methodology_summary"] = {
                    "total_points": total_points,
                    "hardware_percentage": round(100.0 * total_measured / total_points, 2) if total_points else 0.0,
                    "estimation_percentage": round(100.0 * total_estimated / total_points, 2) if total_points else 0.0,
                }
            else:
                report["data_methodology_summary"] = {
                    "total_points": 0,
                    "hardware_percentage": 0.0,
                    "estimation_percentage": 0.0,
                }
            report["normalization_bounds"] = {
                "energy": _numeric_bounds(energy_df),
                "time": _numeric_bounds(time_df),
                "carbon": _numeric_bounds(carbon_df),
            }
            report_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
    except Exception as e:
        # Augmentation is best effort; the GreenScore output is already written
        print(f" WARNING: Could not augment audit report: {e}")

    # Final summary
    print("=" * 70)
    print("Benchmark Suite Execution Complete")
    print("=" * 70)
    print(f"Final GreenScore ranking saved to: {greenscore_path}")
    print()
    print("Top 3 methods by GreenScore (lower is better):")
    print(greenscore_df.head(3).to_string(index=False))
    print()
    return greenscore_path