# -*- coding: utf-8 -*-
"""
Postprocessing Wrapper for DVC Pipeline
This script executes the postprocessing stage that aggregates and concatenates
results from all RDM futures into final CSV files.
This script calls 1_output_dataset_creator.py which consolidates parquet outputs
from the experiment stage into final CSV files in src/Results/.
Usage:
python scripts/run_postprocess.py
Manual execution:
This script can be run independently to regenerate aggregated results.
DVC integration:
Called automatically by DVC when executing the 'postprocess' stage.
Author: AFR_RDM Team
"""
import os
import sys
import json
import time
from pathlib import Path
def generate_metrics(results_dir, metrics_file):
"""
Generate metrics JSON file for DVC tracking.
Tracks number of output CSV files and their sizes.
"""
metrics = {
"stage": "postprocess",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"output_files": 0,
"total_size_mb": 0.0,
"files_list": []
}
# Count and measure output files
if results_dir.exists():
csv_files = list(results_dir.glob('*.csv'))
metrics["output_files"] = len(csv_files)
for csv_file in csv_files:
size_mb = csv_file.stat().st_size / (1024 * 1024)
metrics["total_size_mb"] += size_mb
metrics["files_list"].append({
"filename": csv_file.name,
"size_mb": round(size_mb, 2)
})
# Round total size
metrics["total_size_mb"] = round(metrics["total_size_mb"], 2)
# Write metrics
metrics_file.parent.mkdir(parents=True, exist_ok=True)
with open(metrics_file, 'w') as f:
json.dump(metrics, f, indent=2)
print(f"β Metrics written to {metrics_file}")
print(f" - Output CSV files: {metrics['output_files']}")
print(f" - Total size: {metrics['total_size_mb']} MB")
[docs]
def main():
"""Main execution function"""
print("=" * 70)
print("AFR_RDM - Postprocessing (Results Aggregation)")
print("=" * 70)
# Paths
project_root = Path(__file__).parent.parent
experiment_dir = project_root / 'src' / 'workflow' / '1_Experiment'
postprocess_script = experiment_dir / '1_output_dataset_creator.py'
results_dir = project_root / 'src' / 'Results'
metrics_file = project_root / 'src' / 'workflow' / '3_Postprocessing' / 'postprocess_metrics.json'
interface_path = project_root / 'src' / 'Interface_RDM.xlsx'
# Verify postprocessing script exists
if not postprocess_script.exists():
print(f"β Error: {postprocess_script} not found")
sys.exit(1)
# Read parameters from Interface_RDM.xlsx
import pandas as pd
setup_df = pd.read_excel(interface_path, sheet_name='Setup')
region = str(setup_df.at[0, 'Region'])
start_time = time.time()
try:
print(f"π Experiment directory: {experiment_dir}")
print(f"π Script: {postprocess_script.name}")
print(f"π§ Region: {region}")
print()
# Change to src directory (script expects to be run from there)
original_dir = os.getcwd()
os.chdir(project_root / 'src')
print("π Executing 1_output_dataset_creator.py...")
print("-" * 70)
# Execute the postprocessing script as subprocess with region parameter
import subprocess
result = subprocess.run(
[sys.executable, str(postprocess_script), region],
check=True,
capture_output=False
)
print("-" * 70)
# Change back to original directory
os.chdir(original_dir)
# Ensure Results directory exists (even if empty) for DVC
results_dir.mkdir(parents=True, exist_ok=True)
# Generate metrics
elapsed_time = time.time() - start_time
print(f"\nβ±οΈ Execution time: {elapsed_time:.2f} seconds")
generate_metrics(results_dir, metrics_file)
print("\nβ
Postprocessing completed successfully!")
except Exception as e:
print(f"\nβ Error during postprocessing: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()