# Source code for pgsi_analyzer.benchmark.results_collector

"""
Results collection and filesystem layout for the benchmark pipeline.

Groups raw CSV paths by method (collect_paths). Filesystem I/O (workspace
preparation, output path resolution) is delegated to FileSystemProvider;
ResultsCollector retains backward compatibility by delegating to a default
provider when prepare_aggregation_workspace or get_output_path are called.
"""

import re
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from ..utils.errors import AuditError
from ..benchmarks.registry import VALID_METHODS

if TYPE_CHECKING:
    from .provider import FileSystemProvider

# Allowed filename patterns (audit): only these are copied into aggregation workspace.
# Each pattern accepts a bare filename that starts with the given prefix
# ("energy_" / "time_") and ends with ".csv".
# NOTE(review): neither pattern is used in this part of the file; presumably
# the FileSystemProvider applies them when copying files -- confirm.
ENERGY_CSV_PATTERN = re.compile(r"^energy_.*\.csv$")
TIME_CSV_PATTERN = re.compile(r"^time_.*\.csv$")
# Ignore directory-level garbage: entry names (OS metadata, caches, VCS and
# environment files) that are not benchmark data.
# NOTE(review): the code that skips these entries is not visible here -- confirm
# where the set is consumed.
GARBAGE_ENTRIES = {".DS_Store", "__pycache__", ".git", ".env"}


# File type constants for get_output_path.
# Presumably these are the values accepted by its ``file_type`` argument;
# the mapping from identifier to actual path lives in the provider -- confirm.
ENERGY_AGGREGATED = "energy_aggregated"
TIME_AGGREGATED = "time_aggregated"
ENERGY_COMBINED = "energy_combined"
TIME_COMBINED = "time_combined"
CARBON_FOOTPRINT = "carbon_footprint"
# NOTE(review): unlike the snake_case values above, this one is mixed-case;
# verify that the casing is intentional before normalizing it.
GREENSCORE = "GreenScore"


class ResultsCollector:
    """
    Handles grouping of raw CSV paths by method.

    Workspace preparation and output path resolution are forwarded to a
    FileSystemProvider instance (a default one is created when none is
    supplied), which keeps the historical ResultsCollector API working.
    """

    # Output group name -> key looked up in each per-method result mapping.
    _CSV_RESULT_KEYS = (("energy", "energy_csv"), ("time", "time_csv"))

    def __init__(self, provider: Optional["FileSystemProvider"] = None) -> None:
        """
        Store the provider used for filesystem operations.

        Args:
            provider: Object exposing ``prepare_aggregation_workspace`` and
                ``get_output_path``. When ``None``, a default
                ``FileSystemProvider()`` is instantiated.
        """
        if provider is None:
            # Imported lazily and only when the default is actually needed, so
            # an injected provider never triggers the import.
            # NOTE(review): the function-scope import presumably avoids a
            # circular import with .provider -- confirm.
            from .provider import FileSystemProvider as _FS

            provider = _FS()
        self._provider = provider

    def collect_paths(
        self,
        execution_results: Dict[str, Dict[str, Dict[str, Any]]],
    ) -> Dict[str, Dict[str, List[Path]]]:
        """
        Group energy and time CSV paths by execution method.

        ``execution_results`` is expected to be a nested mapping in this shape:
        ``{algorithm: {method: {"energy_csv": Path, "time_csv": Path, ...}}}``.
        A missing or falsy ``energy_csv`` / ``time_csv`` entry is skipped, so a
        method only appears under a kind for which it has at least one CSV.

        Returns:
            Dict[str, Dict[str, List[Path]]]: Grouped CSV parent directories:
            ``{"energy": {method: [Path, ...]}, "time": {method: [Path, ...]}}``.
            Each directory is listed once per method, in first-seen order.

        Raises:
            AuditError: If a method is not in the registry whitelist
                (``VALID_METHODS``).
        """
        # Per kind: {method: {parent_dir: None}}. The innermost dict acts as an
        # insertion-ordered set, giving O(1) de-duplication while preserving
        # the order in which directories were first encountered.
        seen_dirs: Dict[str, Dict[str, Dict[Path, None]]] = {
            kind: {} for kind, _ in self._CSV_RESULT_KEYS
        }

        for methods_dict in execution_results.values():
            for method, results in methods_dict.items():
                # Verification: method must be in registry whitelist
                if method not in VALID_METHODS:
                    raise AuditError(
                        f"Data file found for method '{method}' which is not registered in "
                        "benchmarks/registry.py (VALID_METHODS). Audit requires all methods to be whitelisted."
                    )
                for kind, result_key in self._CSV_RESULT_KEYS:
                    csv_path = results.get(result_key)
                    if csv_path:
                        parent_dir = Path(csv_path).parent
                        seen_dirs[kind].setdefault(method, {})[parent_dir] = None

        return {
            kind: {method: list(dirs) for method, dirs in by_method.items()}
            for kind, by_method in seen_dirs.items()
        }

    def prepare_aggregation_workspace(
        self,
        output_dir: Path,
        method: str,
        raw_dirs: List[Path],
        kind: str,
    ) -> Path:
        """Delegate to FileSystemProvider. See provider.prepare_aggregation_workspace."""
        return self._provider.prepare_aggregation_workspace(output_dir, method, raw_dirs, kind)

    def get_output_path(
        self,
        output_dir: Path,
        method: Optional[str] = None,
        file_type: Optional[str] = None,
    ) -> Path:
        """Delegate to FileSystemProvider. See provider.get_output_path."""
        return self._provider.get_output_path(output_dir, method=method, file_type=file_type)