"""
File-system provider for the benchmark pipeline.
Separates pipeline coordination from filesystem I/O: workspace preparation,
path resolution, and aggregated path collection. The orchestrator uses this
provider so that I/O can be mocked in tests without touching disk.
"""
import re
import shutil
from pathlib import Path
from typing import Dict, List, Optional
from .results_collector import (
ENERGY_AGGREGATED,
TIME_AGGREGATED,
ENERGY_COMBINED,
TIME_COMBINED,
CARBON_FOOTPRINT,
GREENSCORE,
ENERGY_CSV_PATTERN,
TIME_CSV_PATTERN,
GARBAGE_ENTRIES,
)
[docs]
class FileSystemProvider:
"""
Handles filesystem operations for the benchmark pipeline: creating workspaces,
copying raw CSVs by pattern, resolving output paths, and collecting aggregated paths.
"""
[docs]
def prepare_aggregation_workspace(
self,
output_dir: Path,
method: str,
raw_dirs: List[Path],
kind: str,
) -> Path:
"""
Create a workspace directory and copy raw CSVs from raw_dirs into it.
Args:
output_dir: Base output directory (e.g. results/).
method: Execution method name (e.g. cpython, pypy).
raw_dirs: List of directories that contain raw CSV files.
kind: "energy" or "time" (used for temp dir naming and pattern).
Returns:
Path to the created directory containing the copied CSV files.
"""
workspace = Path(output_dir) / f"temp_{kind}_{method}"
workspace.mkdir(parents=True, exist_ok=True)
pattern = ENERGY_CSV_PATTERN if kind == "energy" else TIME_CSV_PATTERN
for dir_path in raw_dirs:
dir_path = Path(dir_path)
if not dir_path.is_dir():
continue
for entry in dir_path.iterdir():
if entry.name in GARBAGE_ENTRIES:
continue
if not entry.is_file():
continue
if pattern.match(entry.name):
shutil.copy2(entry, workspace / entry.name)
return workspace
[docs]
def get_output_path(
self,
output_dir: Path,
method: Optional[str] = None,
file_type: Optional[str] = None,
) -> Path:
"""
Return the canonical path for an output file (aggregated, combined, or final).
Creates method subdirectories when file_type is energy_aggregated or time_aggregated.
Args:
output_dir: Base output directory.
method: Execution method (required for energy_aggregated, time_aggregated).
file_type: One of energy_aggregated, time_aggregated, energy_combined,
time_combined, carbon_footprint, GreenScore.
Returns:
Path where the file should be written.
"""
output_dir = Path(output_dir)
if file_type == ENERGY_AGGREGATED or file_type == TIME_AGGREGATED:
if not method:
raise ValueError("method required for energy_aggregated / time_aggregated")
method_dir = output_dir / method
method_dir.mkdir(parents=True, exist_ok=True)
if file_type == ENERGY_AGGREGATED:
return method_dir / "energy_aggregated.csv"
return method_dir / "time_aggregated.csv"
if file_type == ENERGY_COMBINED:
return output_dir / "energy_combined.csv"
if file_type == TIME_COMBINED:
return output_dir / "time_combined.csv"
if file_type == CARBON_FOOTPRINT:
return output_dir / "carbon_footprint.csv"
if file_type == GREENSCORE:
return output_dir / "GreenScore.csv"
raise ValueError(f"Unknown file_type: {file_type}")
[docs]
def collect_aggregated_paths(
self,
output_dir: Path,
methods: List[str],
) -> Dict[str, Dict[str, Path]]:
"""
Return paths to energy_aggregated.csv and time_aggregated.csv for each method.
Args:
output_dir: Base output directory.
methods: List of method names (e.g. cpython, pypy).
Returns:
{"energy": {method: path}, "time": {method: path}}
"""
output_dir = Path(output_dir)
energy_paths: Dict[str, Path] = {}
time_paths: Dict[str, Path] = {}
for method in methods:
energy_paths[method] = self.get_output_path(output_dir, method=method, file_type=ENERGY_AGGREGATED)
time_paths[method] = self.get_output_path(output_dir, method=method, file_type=TIME_AGGREGATED)
return {"energy": energy_paths, "time": time_paths}