"""
Benchmark execution module.
Handles subprocess execution of benchmarks with proper isolation
and environment setup. Ensures compilation time is excluded from
measurements. Writes audit log (.audit.log) in output_dir for each run.
Supports AuditLogger for path identity verification and audit_report.json.
"""
import json
import subprocess
import sys
import os
import shutil
from pathlib import Path
from typing import Optional, Dict, List, Any
from datetime import datetime
from ..utils import MeasurementError, PlatformError
from ..platform.detection import detect_platform
from ..config import ToolPaths
AUDIT_LOG_FILENAME = ".audit.log"
AUDIT_REPORT_FILENAME = "audit_report.json"
[docs]
def get_runtime_executable(
interpreter_path: str,
env: Dict[str, str],
cwd: Optional[str] = None,
) -> Optional[str]:
"""
Run the interpreter with -c "import sys; print(sys.executable)" and return
the path the runtime reports. Used for path identity verification.
"""
try:
result = subprocess.run(
[interpreter_path, "-c", "import sys; print(sys.executable)"],
env=env,
cwd=cwd or os.getcwd(),
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and result.stdout:
return result.stdout.strip()
except (subprocess.TimeoutExpired, FileNotFoundError, Exception):
pass
return None
[docs]
class AuditLogger:
"""
Captures cmd list and env for every subprocess.run, and path identity
(resolved vs runtime-reported) per method for audit_report.json.
"""
def __init__(self) -> None:
self._executions: List[Dict[str, Any]] = []
self._path_identity: Dict[str, Dict[str, Any]] = {} # method -> resolved, runtime_reported
[docs]
def log_execution(
self,
method: str,
algorithm: str,
cmd: List[str],
env_slice: Dict[str, str],
resolved_interpreter: str,
runtime_reported_path: Optional[str] = None,
) -> None:
self._executions.append({
"method": method,
"algorithm": algorithm,
"cmd": cmd,
"env_slice": env_slice,
"resolved_interpreter": resolved_interpreter,
"runtime_reported_path": runtime_reported_path,
})
if method not in self._path_identity and runtime_reported_path is not None:
self._path_identity[method] = {
"resolved": resolved_interpreter,
"runtime_reported": runtime_reported_path,
}
[docs]
def set_path_identity(self, method: str, resolved: str, runtime_reported: Optional[str]) -> None:
"""Set or update path identity for a method (e.g. after path identity check)."""
self._path_identity[method] = {"resolved": resolved, "runtime_reported": runtime_reported or ""}
[docs]
def to_report_dict(
self,
path_sources: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
"""
Build the structure for audit_report.json: path_integrity per method,
requested/resolved/runtime_reported, source (env/cli/system_default),
and severity HIGH if mismatch.
"""
method_to_tool = {"cpython": "python", "pypy": "pypy", "cython": "python", "ctypes": "python", "py_compile": "python"}
methods = sorted(self._path_identity.keys())
path_entries = []
any_mismatch = False
for method in methods:
tool = method_to_tool.get(method, "python")
sources = path_sources.get(tool, {"path": None, "source": "system_default"})
requested = sources.get("path") or ""
identity = self._path_identity[method]
resolved = identity.get("resolved", "")
runtime_reported = identity.get("runtime_reported", "")
path_ok = bool(resolved and runtime_reported and Path(resolved).resolve() == Path(runtime_reported).resolve())
if not path_ok and resolved and runtime_reported:
any_mismatch = True
path_entries.append({
"method": method,
"requested_path": requested,
"resolved_path": resolved,
"runtime_reported_path": runtime_reported,
"path_source": sources.get("source", "system_default"),
"path_integrity": path_ok,
})
report = {
"timestamp": datetime.now().isoformat(),
"path_entries": path_entries,
"severity": "HIGH" if any_mismatch else "NONE",
}
if any_mismatch:
report["message"] = "Path mismatch detected: resolved interpreter path does not match runtime-reported path (e.g. symlink or PATH shadowing)."
return report
def _append_audit_log(
output_dir: Optional[Path],
algorithm: str,
method: str,
exec_args: List[str],
exec_env: Dict[str, str],
interpreter_absolute: str,
) -> None:
"""
Append one audit record to output_dir/.audit.log.
Logs exec_args, interpreter path (for cpython/pypy), and selected env vars.
"""
if output_dir is None:
return
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
log_path = output_dir / AUDIT_LOG_FILENAME
path_str = str(Path(exec_args[0]).resolve()) if exec_args else ""
env_slice = {
"PATH": exec_env.get("PATH", "")[:500] + ("..." if len(exec_env.get("PATH", "")) > 500 else ""),
"PYTHONPATH": exec_env.get("PYTHONPATH", ""),
"PGSI_RUNS": exec_env.get("PGSI_RUNS", ""),
}
with log_path.open("a", encoding="utf-8") as f:
f.write(f"\n--- {datetime.now().isoformat()} | {algorithm} | {method} ---\n")
f.write(f"interpreter_absolute: {interpreter_absolute or path_str}\n")
f.write(f"exec_args: {exec_args}\n")
f.write(f"env PATH: {env_slice['PATH']}\n")
f.write(f"env PYTHONPATH: {env_slice['PYTHONPATH']}\n")
f.write(f"env PGSI_RUNS: {env_slice['PGSI_RUNS']}\n")
[docs]
def find_python_executable(method: str, tool_paths: Optional[ToolPaths] = None) -> str:
"""
Find the correct Python executable for a method.
Args:
method: Execution method ('cpython', 'pypy', etc.)
tool_paths: Optional ToolPaths configuration. If None, uses defaults.
Returns:
Path to Python executable (as string for subprocess)
Raises:
PlatformError: If required runtime not found
"""
if tool_paths is None:
# Fallback to default behavior for backwards compatibility
tool_paths = ToolPaths(python=Path(sys.executable))
if method == "cpython":
return str(tool_paths.python)
elif method == "pypy":
if tool_paths.pypy:
return str(tool_paths.pypy)
raise PlatformError(
"PyPy method selected but no valid PyPy executable found. "
"Configure PGSI_PYPY_PATH, use --pypy-path, or ensure 'pypy' is on PATH."
)
elif method in ("cython", "ctypes", "py_compile"):
# These use standard Python after compilation/preparation
return str(tool_paths.python)
else:
raise ValueError(f"Unknown execution method: {method}")
[docs]
def prepare_py_compile(benchmark_path: Path, tool_paths: Optional[ToolPaths] = None) -> Path:
"""
Pre-compile Python file to .pyc for py_compile method.
Args:
benchmark_path: Path to main.py file or to the py_compile directory
tool_paths: Optional ToolPaths configuration for Python executable
Returns:
Path to compiled .pyc file (or main.py if compilation fails)
"""
if benchmark_path.is_file() and benchmark_path.name == "main.py":
bench_dir = benchmark_path.parent
main_py = benchmark_path
else:
bench_dir = benchmark_path
main_py = benchmark_path / "main.py"
if not main_py.exists():
raise FileNotFoundError(f"main.py not found in {bench_dir}")
# Use configured Python or default
python_exe = str(tool_paths.python) if tool_paths else sys.executable
# Compile to .pyc
result = subprocess.run(
[python_exe, "-m", "py_compile", str(main_py)],
cwd=str(bench_dir),
capture_output=True,
text=True,
)
if result.returncode != 0:
# Fallback: use .py file directly
return main_py
# Find the compiled .pyc file
# Python 3.8+ uses __pycache__ directory
pycache = bench_dir / "__pycache__"
if pycache.exists():
pyc_files = list(pycache.glob("main*.pyc"))
if pyc_files:
return pyc_files[0]
# Fallback to .py
return main_py
[docs]
def execute_benchmark(
algorithm: str,
method: str,
benchmark_path: Path,
runs: int = 50,
output_dir: Path = None,
env: Optional[Dict[str, str]] = None,
tool_paths: Optional[ToolPaths] = None,
audit_logger: Optional[AuditLogger] = None,
) -> Dict[str, Path]:
"""
Execute a benchmark and collect measurement CSVs.
This function runs the benchmark in a subprocess, allowing the
measurement decorators to capture energy and time data.
Args:
algorithm: Algorithm name (for CSV naming)
method: Execution method
benchmark_path: Path to benchmark directory or main.py
runs: Number of runs (passed to benchmark script)
output_dir: Directory where CSVs will be written (defaults to benchmark_path)
env: Optional environment variables for subprocess
tool_paths: Optional ToolPaths configuration for Python/PyPy executables
Returns:
Dictionary with keys:
- 'energy_csv': Path to energy CSV file
- 'time_csv': Path to time CSV file
- 'system_info': Path to system info JSON
Raises:
MeasurementError: If execution fails
"""
if output_dir is None:
output_dir = benchmark_path
# Determine what to execute
if method == "py_compile":
# Pre-compile and execute .pyc
exec_path = prepare_py_compile(benchmark_path, tool_paths)
python_exe = str(tool_paths.python) if tool_paths else sys.executable
if exec_path.suffix == ".pyc":
# Execute .pyc directly
exec_args = [python_exe, str(exec_path)]
else:
# Fallback to .py
exec_args = [python_exe, str(exec_path)]
elif benchmark_path.is_file() and benchmark_path.name == "main.py":
# Direct main.py execution
python_exe = find_python_executable(method, tool_paths)
exec_args = [python_exe, str(benchmark_path)]
elif (benchmark_path / "main.py").exists():
# Directory with main.py
python_exe = find_python_executable(method, tool_paths)
exec_args = [python_exe, str(benchmark_path / "main.py")]
else:
raise FileNotFoundError(
f"Could not find main.py in {benchmark_path} for {algorithm}/{method}"
)
# Prepare environment
exec_env = os.environ.copy()
if env:
exec_env.update(env)
# Pass run count to benchmark subprocess (source of truth for decorator n)
exec_env["PGSI_RUNS"] = str(runs)
# Add package to Python path if needed
package_root = Path(__file__).parent.parent.parent
pythonpath = exec_env.get("PYTHONPATH", "")
if pythonpath:
exec_env["PYTHONPATH"] = f"{package_root}:{pythonpath}"
else:
exec_env["PYTHONPATH"] = str(package_root)
# Audit log: record exec_args and env for this run (interpreter absolute path for cpython/pypy)
interpreter_absolute = str(Path(exec_args[0]).resolve()) if exec_args else ""
_append_audit_log(output_dir, algorithm, method, exec_args, exec_env, interpreter_absolute)
# Path identity check: run interpreter to get runtime-reported sys.executable (once per method)
cwd = str(benchmark_path.parent if benchmark_path.is_file() else benchmark_path)
runtime_reported = None
if audit_logger is not None:
if method not in audit_logger._path_identity:
runtime_reported = get_runtime_executable(exec_args[0], exec_env, cwd=cwd)
audit_logger.set_path_identity(method, interpreter_absolute, runtime_reported)
else:
runtime_reported = audit_logger._path_identity[method].get("runtime_reported")
env_slice = {
"PATH": exec_env.get("PATH", "")[:500] + ("..." if len(exec_env.get("PATH", "")) > 500 else ""),
"PYTHONPATH": exec_env.get("PYTHONPATH", ""),
"PGSI_RUNS": exec_env.get("PGSI_RUNS", ""),
}
audit_logger.log_execution(
method=method,
algorithm=algorithm,
cmd=exec_args,
env_slice=env_slice,
resolved_interpreter=interpreter_absolute,
runtime_reported_path=runtime_reported,
)
# Execute benchmark
# The benchmark script will use decorators to write CSVs
try:
result = subprocess.run(
exec_args,
cwd=str(benchmark_path.parent if benchmark_path.is_file() else benchmark_path),
env=exec_env,
capture_output=True,
text=True,
timeout=3600, # 1 hour timeout per benchmark
)
if result.returncode != 0:
err_msg = (
f"Benchmark execution failed for {algorithm}/{method}:\n"
f"Command: {' '.join(exec_args)}\n"
f"Return code: {result.returncode}\n"
f"stdout: {result.stdout}\n"
f"stderr: {result.stderr}"
)
if method == "pypy" and "ModuleNotFoundError" in result.stderr and "psutil" in result.stderr:
err_msg += (
"\n\nPyPy is missing dependencies used by benchmark scripts. Install only what's needed:\n"
" pypy3 -m ensurepip # if pip is not installed\n"
" pypy3 -m pip install psutil python-dotenv"
)
raise MeasurementError(err_msg)
except subprocess.TimeoutExpired:
raise MeasurementError(
f"Benchmark execution timed out for {algorithm}/{method}"
)
except Exception as e:
raise MeasurementError(
f"Benchmark execution error for {algorithm}/{method}: {e}"
)
# Locate output CSVs
# Decorators write CSVs to folders:
# - Energy: folder_name (default "energy_benchmark") / {csv_filename}.csv
# - Time: folder_name (default "time_benchmark") / {csv_filename}.csv
# Benchmarks use csv_filename like "hanoi_cpython", "hanoi_pypy", etc.
# Determine search directory
# Benchmarks write to current working directory when executed
# The executor runs from benchmark_path, so CSVs will be relative to that
if benchmark_path.is_file():
search_dir = benchmark_path.parent
else:
search_dir = benchmark_path
# Search in multiple locations
search_dirs = [search_dir, output_dir, search_dir.parent, Path.cwd()]
# Expected CSV filename pattern from benchmarks (audit: energy_*.csv and time_*.csv)
# Many py_compile scripts use "pycompile" (no underscore) in csv_filename
# Some benchmarks (e.g. n-body) use algorithm with hyphens removed (nbody not n_body)
csv_base = f"{algorithm.replace('-', '_')}_{method}"
alg_no_hyphen = algorithm.replace("-", "")
csv_bases_to_try = [csv_base]
if "-" in algorithm:
csv_bases_to_try.append(f"{alg_no_hyphen}_{method}")
if method == "py_compile":
csv_bases_to_try.append(f"{algorithm.replace('-', '_')}_pycompile")
if "-" in algorithm:
csv_bases_to_try.append(f"{alg_no_hyphen}_py_compile")
csv_bases_to_try.append(f"{alg_no_hyphen}_pycompile")
# Prefer prefixed names (energy_*, time_*) for audit compliance
energy_bases = [f"energy_{b}" for b in csv_bases_to_try] + csv_bases_to_try
time_bases = [f"time_{b}" for b in csv_bases_to_try] + csv_bases_to_try
# Find CSV files
energy_csv = None
time_csv = None
system_info = None
# Search for default folders: "energy_benchmark" and "time_benchmark"
for search_base in search_dirs:
if not search_base or not search_base.exists():
continue
# Check default folder names
energy_folder = search_base / "energy_benchmark"
time_folder = search_base / "time_benchmark"
if energy_folder.exists():
for base in energy_bases:
csv_file = energy_folder / f"{base}.csv"
if csv_file.exists() and not energy_csv:
energy_csv = csv_file
break
if not energy_csv:
for csv_file in energy_folder.glob("energy_*.csv"):
if any(b.lower() in csv_file.stem.lower() for b in csv_bases_to_try) and not energy_csv:
energy_csv = csv_file
break
if not energy_csv:
for csv_file in energy_folder.glob("*.csv"):
if any(base.lower() in csv_file.name.lower() for base in csv_bases_to_try) and not energy_csv:
energy_csv = csv_file
break
if time_folder.exists():
for base in time_bases:
csv_file = time_folder / f"{base}.csv"
if csv_file.exists() and not time_csv:
time_csv = csv_file
break
if not time_csv:
for csv_file in time_folder.glob("time_*.csv"):
if any(b.lower() in csv_file.stem.lower() for b in csv_bases_to_try) and not time_csv:
time_csv = csv_file
break
if not time_csv:
for csv_file in time_folder.glob("*.csv"):
if any(base.lower() in csv_file.name.lower() for base in csv_bases_to_try) and not time_csv:
time_csv = csv_file
break
# Check for system_info
for folder in [energy_folder, time_folder]:
if folder.exists():
info_file = folder / "system_info_pyrapl.json"
if not info_file.exists():
info_file = folder / "system_info.json"
if info_file.exists() and not system_info:
system_info = info_file
# Broader search if not found - look for any folder with algorithm/method name
if not energy_csv or not time_csv:
for search_base in search_dirs:
if not search_base or not search_base.exists():
continue
for item in search_base.iterdir():
if not item.is_dir():
continue
folder_name_lower = item.name.lower()
if csv_base.lower() in folder_name_lower or (
algorithm.replace("-", "_").lower() in folder_name_lower
and method.lower() in folder_name_lower
):
for csv_file in item.glob("*.csv"):
try:
import pandas as pd
df = pd.read_csv(csv_file, nrows=1)
if 'package (uJ)' in df.columns and not energy_csv:
energy_csv = csv_file
elif 'execution_time (s)' in df.columns and not time_csv:
time_csv = csv_file
except:
pass
return {
"energy_csv": energy_csv,
"time_csv": time_csv,
"system_info": system_info,
}