114 lines
3.8 KiB
Python
114 lines
3.8 KiB
Python
"""Streamlit data-editor wrappers for benefit/cost rows."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pandas as pd
|
|
import streamlit as st
|
|
|
|
|
|
def _years_for_table(fields: list[dict], analysis_years: int) -> list[int]:
|
|
"""Years 1..N — taken from analysis_period_years on the report."""
|
|
return list(range(1, max(int(analysis_years or 3), 1) + 1))
|
|
|
|
|
|
def value_editor(
|
|
table: str,
|
|
fields: list[dict],
|
|
values: list[dict],
|
|
*,
|
|
analysis_years: int,
|
|
key: str,
|
|
) -> pd.DataFrame:
|
|
"""
|
|
Render an ``st.data_editor`` for benefit or cost values.
|
|
|
|
The editor shows one row per field (filtered to ``table``), with year
|
|
columns, an ``initial`` column for costs, a risk_adjustment column, and
|
|
a notes column. Returns the edited DataFrame; the caller is responsible
|
|
for converting it back to value-row dicts and PUTting to Athena.
|
|
"""
|
|
fields = [
|
|
f
|
|
for f in fields
|
|
if f.get("table") == table
|
|
# Companion "<key>_initial" fields are edited via the Initial column
|
|
# on their parent cost row, not as standalone rows.
|
|
and not str(f.get("field_key", "")).endswith("_initial")
|
|
]
|
|
fields.sort(key=lambda f: int(f.get("sort_order") or 0))
|
|
|
|
by_key = {v.get("field_key"): v for v in values}
|
|
years = _years_for_table(fields, analysis_years)
|
|
|
|
rows: list[dict] = []
|
|
for f in fields:
|
|
v = by_key.get(f["field_key"], {}) or {}
|
|
yv = v.get("year_values") or {}
|
|
row = {
|
|
"field_key": f["field_key"],
|
|
"label": f.get("label", f["field_key"]),
|
|
"category": f.get("category", "") or "",
|
|
}
|
|
if table == "costs":
|
|
row["Initial"] = float(v.get("initial") or 0.0)
|
|
for y in years:
|
|
row[f"Year {y}"] = float(yv.get(str(y)) or 0.0)
|
|
row["risk_adj"] = float(v.get("risk_adjustment") or 0.0)
|
|
row["notes"] = v.get("notes", "") or ""
|
|
rows.append(row)
|
|
|
|
df = pd.DataFrame(rows)
|
|
|
|
column_config: dict = {
|
|
"field_key": st.column_config.TextColumn("Key", disabled=True, width="small"),
|
|
"label": st.column_config.TextColumn("Field", disabled=True),
|
|
"category": st.column_config.TextColumn("Category", disabled=True, width="small"),
|
|
"risk_adj": st.column_config.NumberColumn(
|
|
"Risk Adj.", min_value=0.0, max_value=1.0, step=0.05, format="%.2f"
|
|
),
|
|
"notes": st.column_config.TextColumn("Notes", width="medium"),
|
|
}
|
|
if table == "costs":
|
|
column_config["Initial"] = st.column_config.NumberColumn(
|
|
"Initial", format="$%.0f"
|
|
)
|
|
for y in years:
|
|
column_config[f"Year {y}"] = st.column_config.NumberColumn(
|
|
f"Year {y}", format="$%.0f"
|
|
)
|
|
|
|
edited = st.data_editor(
|
|
df,
|
|
column_config=column_config,
|
|
use_container_width=True,
|
|
num_rows="fixed",
|
|
hide_index=True,
|
|
key=key,
|
|
)
|
|
return edited
|
|
|
|
|
|
def df_to_values(df: pd.DataFrame, table: str, analysis_years: int) -> list[dict]:
|
|
"""Convert an edited DataFrame back to wire-format value rows."""
|
|
out: list[dict] = []
|
|
years = list(range(1, max(int(analysis_years or 3), 1) + 1))
|
|
for _, row in df.iterrows():
|
|
item: dict = {"field_key": row["field_key"], "table": table}
|
|
yv = {}
|
|
for y in years:
|
|
col = f"Year {y}"
|
|
if col in df.columns:
|
|
yv[str(y)] = float(row[col] or 0)
|
|
if yv:
|
|
item["year_values"] = yv
|
|
if table == "costs" and "Initial" in df.columns:
|
|
item["initial"] = float(row["Initial"] or 0)
|
|
ra = row.get("risk_adj")
|
|
if ra is not None and not pd.isna(ra):
|
|
item["risk_adjustment"] = float(ra)
|
|
notes = row.get("notes")
|
|
if isinstance(notes, str) and notes.strip():
|
|
item["notes"] = notes.strip()
|
|
out.append(item)
|
|
return out
|