Files
palladium/app/views/summary.py
Robert Helewka 64fb83257d feat: add GenesysCX study and fix Streamlit chart key collisions
- Add 202512_GenesysCX TEI study (config, seed data, notebooks, README)
  with NPV $10.8M / ROI 266% including AI-token cost line
- Add explicit `key` parameter to all chart wrappers in app/components
  to prevent StreamlitDuplicateElementId errors when the same figure
  type renders across Summary/Benefits/Costs tabs
- Render benefits bar and cost pie charts on their respective tabs
- Add benefits_vs_costs_by_year chart wrapper
2026-06-10 14:26:49 -04:00

203 lines
7.6 KiB
Python

"""Financial summary dashboard tab."""
from __future__ import annotations
import streamlit as st
from app.components import charts
from app.locale import CURRENCY_SYMBOL, currency_fmt, fmt_currency
from app.utils import icon
from app.views._helpers import report_meta, safe
from core.export import build_report_data
from core.tei_client import AthenaAPIError, TEIClient
def _fmt_millions(amount: float) -> str:
    """Format *amount* in millions with one decimal, e.g. ``$10.8M``.

    The minus sign is emitted before the currency symbol (``-$1.2M``), and
    values that round to zero never show a sign.
    """
    millions = round(amount / 1_000_000, 1)
    sign = "-" if millions < 0 else ""
    return f"{sign}{CURRENCY_SYMBOL}{abs(millions):,.1f}M"


def _first_present(mapping: dict, *keys: str):
    """Return the first non-``None`` value found under *keys*, else ``None``.

    Unlike ``mapping.get(a, mapping.get(b))`` this also falls through when
    the first key exists but holds ``None``; a legitimate ``0`` is kept.
    """
    for key in keys:
        value = mapping.get(key)
        if value is not None:
            return value
    return None


def _year_amounts(row: dict) -> dict[int, float]:
    """Return ``row["year_values"]`` as a ``{year: amount}`` map with int keys.

    Keys are normalised with ``int()`` so ``"1"``, ``1`` and ``"01"`` all land
    on the same year. Keys that cannot be parsed as an integer are skipped
    instead of raising. Missing/``None`` amounts count as 0.
    """
    amounts: dict[int, float] = {}
    for raw_year, raw_amount in (row.get("year_values") or {}).items():
        try:
            year = int(raw_year)
        except (TypeError, ValueError):
            continue
        amounts[year] = amounts.get(year, 0.0) + float(raw_amount or 0)
    return amounts


def _yearly_breakdown_from_values(
    benefit_rows: list[dict], cost_rows: list[dict]
) -> tuple[list[dict], float]:
    """Build the per-year cash-flow rows from the live value rows.

    Benefits are reduced by each row's ``risk_adjustment`` (read as a
    fraction); costs are taken as-is. The running ``cumulative_net`` starts
    at minus the summed ``initial`` cost amounts.

    Returns:
        ``(rows, initial)`` where each row has the keys ``year``,
        ``benefits``, ``costs``, ``net`` and ``cumulative_net``, ordered by
        year. ``rows`` is empty when no row carries any year values.
    """
    initial = sum(float(row.get("initial") or 0) for row in cost_rows)
    # Normalise every row once instead of re-parsing it for each year.
    benefits = [
        (_year_amounts(row), 1 - float(row.get("risk_adjustment") or 0))
        for row in benefit_rows
    ]
    costs = [_year_amounts(row) for row in cost_rows]

    years: set[int] = set()
    for amounts, _ in benefits:
        years.update(amounts)
    for amounts in costs:
        years.update(amounts)

    rows: list[dict] = []
    cumulative = -initial
    for year in sorted(years):
        year_benefits = sum(
            amounts.get(year, 0.0) * kept for amounts, kept in benefits
        )
        year_costs = sum(amounts.get(year, 0.0) for amounts in costs)
        net = year_benefits - year_costs
        cumulative += net
        rows.append(
            {
                "year": year,
                "benefits": year_benefits,
                "costs": year_costs,
                "net": net,
                "cumulative_net": cumulative,
            }
        )
    return rows, initial


def _yearly_breakdown_from_summary(summary: dict) -> tuple[list[dict], float]:
    """Build per-year rows from the ``benefits_year_N`` / ``costs_year_N`` keys.

    Years are read consecutively from 1 until neither key is present.
    Returns ``(rows, initial)`` with ``initial`` taken from
    ``summary["initial_costs"]`` (0 when missing).
    """
    rows: list[dict] = []
    n = 1
    while f"benefits_year_{n}" in summary or f"costs_year_{n}" in summary:
        year_benefits = float(summary.get(f"benefits_year_{n}") or 0)
        year_costs = float(summary.get(f"costs_year_{n}") or 0)
        rows.append(
            {
                "year": n,
                "benefits": year_benefits,
                "costs": year_costs,
                "net": year_benefits - year_costs,
            }
        )
        n += 1
    return rows, float(summary.get("initial_costs") or 0)


def _build_envelope(client: TEIClient, public_id, report: dict):
    """Build the report envelope (scenarios included) through ``safe``.

    Shared by the scenario table and the JSON export so both use the exact
    same arguments. Callers treat a falsy result as "nothing to show".
    """
    return safe(
        build_report_data,
        client,
        public_id,
        include_scenarios=True,
        study_slug=report.get("name", ""),
    )


def _scenario_row(name: str, metrics: dict) -> dict:
    """Flatten one scenario's metrics into a row for the comparison table."""
    payback = metrics.get("payback_months")
    return {
        "Scenario": name,
        "Benefits PV": float(metrics.get("total_benefits_pv") or 0),
        "Costs PV": float(metrics.get("total_costs_pv") or 0),
        "NPV": float(metrics.get("npv") or 0),
        "ROI %": float(metrics.get("roi_pct") or 0),
        # Keep a missing payback as None so the cell renders empty.
        "Payback (months)": (
            round(float(payback or 0), 1) if payback is not None else None
        ),
    }


def _render_cashflow(yearly: list[dict], initial: float, public_id) -> None:
    """Render the cash-flow chart plus its expandable table, or a caption."""
    if not yearly:
        st.caption("No yearly breakdown in this summary.")
        return
    charts.cashflow(
        yearly, initial_cost=initial, key=f"summary_cashflow_{public_id}"
    )
    with st.expander("Cash flow table"):
        money = currency_fmt()
        st.dataframe(
            yearly,
            column_config={
                "year": st.column_config.NumberColumn("Year", format="%d"),
                "benefits": st.column_config.NumberColumn("Benefits", format=money),
                "costs": st.column_config.NumberColumn("Costs", format=money),
                "net": st.column_config.NumberColumn("Net", format=money),
            },
            width="stretch",
            hide_index=True,
        )


def _render_scenarios(client: TEIClient, public_id, report: dict) -> None:
    """Render the scenario comparison chart and table inside an expander."""
    with st.expander("Scenario analysis (conservative / moderate / aggressive)"):
        envelope = _build_envelope(client, public_id, report)
        if not (envelope and envelope.get("scenarios")):
            return
        scenarios = envelope["scenarios"]
        charts.scenario_bars(scenarios, key=f"summary_scenarios_{public_id}")
        money = currency_fmt()
        st.dataframe(
            [_scenario_row(name, metrics) for name, metrics in scenarios.items()],
            column_config={
                "Scenario": st.column_config.TextColumn("Scenario"),
                "Benefits PV": st.column_config.NumberColumn("Benefits PV", format=money),
                "Costs PV": st.column_config.NumberColumn("Costs PV", format=money),
                "NPV": st.column_config.NumberColumn("NPV", format=money),
                "ROI %": st.column_config.NumberColumn("ROI %", format="%.1f%%"),
                "Payback (months)": st.column_config.NumberColumn(
                    "Payback (months)", format="%.1f"
                ),
            },
            width="stretch",
            hide_index=True,
        )


def _render_export(client: TEIClient, public_id, report: dict) -> None:
    """Render the button that builds the export envelope and offers it as JSON."""
    st.divider()
    # NOTE(review): the download button only exists on the run triggered by
    # the build click, so it disappears on the next rerun — confirm this UX
    # is intended before moving the envelope into session state.
    if not st.button("Build export envelope (JSON)"):
        return
    envelope = _build_envelope(client, public_id, report)
    if not envelope:
        return
    import json

    st.download_button(
        "Download export.json",
        # default=str stringifies any value json cannot encode natively.
        data=json.dumps(envelope, indent=2, default=str),
        file_name=f"{public_id}_export.json",
        mime="application/json",
    )


def render(client: TEIClient, tool: dict) -> None:
    """Render the Financial Summary tab for one tool.

    Shows, top to bottom: headline KPIs (NPV, ROI, payback, benefits/costs
    PV), the cost pie / benefits bar / benefits-vs-costs charts, the yearly
    cash flow, the PV waterfall, the scenario comparison and the JSON export.

    Args:
        client: API client used to fetch the summary and the value rows.
        tool: Tool record; ``tool["id"]`` is the public id used for every
            API call and as the suffix of every Streamlit element key.

    Returns early (after an info or error message) when the summary cannot
    be fetched.
    """
    st.markdown(
        f"<h2>{icon('bar-chart-line')} Financial Summary</h2>",
        unsafe_allow_html=True,
    )
    public_id = tool["id"]
    report = report_meta(client, tool)

    try:
        summary = client.get_summary(public_id)
    except AthenaAPIError as e:
        if e.status_code == 404:
            st.info(
                "No summary yet — click **Recalculate** in the sidebar after "
                "filling in benefits and costs."
            )
        else:
            st.error(f"Athena API error: {e.detail}")
        return

    # The summary payload is read under several alternative key names.
    npv = float(summary.get("net_present_value") or summary.get("npv") or 0)
    roi = float(
        summary.get("roi_percentage")
        or summary.get("roi")
        or summary.get("roi_pct")
        or 0
    )
    # Explicit None check: a payback of 0 months is a valid value.
    payback = _first_present(summary, "payback_period_months", "payback_months")
    bpv = float(summary.get("total_benefits_pv") or 0)
    cpv = float(summary.get("total_costs_pv") or 0)

    cols = st.columns(5)
    cols[0].metric("NPV", _fmt_millions(npv))
    cols[1].metric("ROI", f"{roi:,.0f}%")
    cols[2].metric(
        "Payback",
        f"{float(payback):.1f} months" if payback is not None else "N/A",
    )
    cols[3].metric("Benefits PV", _fmt_millions(bpv))
    cols[4].metric("Costs PV", _fmt_millions(cpv))
    st.divider()

    # ── Financial visualizations ────────────────────────────────────
    # Built from the live value rows so Year-0 "Initial" amounts stay
    # separate (Athena's per-year summary folds them into Year 1).
    values = safe(client.get_values, public_id) or []
    benefit_rows = [v for v in values if v.get("table") == "benefits"]
    cost_rows = [v for v in values if v.get("table") == "costs"]
    if benefit_rows or cost_rows:
        col_pie, col_bar = st.columns(2)
        with col_pie:
            charts.cost_pie(cost_rows, key=f"summary_pie_{public_id}")
        with col_bar:
            charts.benefits_bar(benefit_rows, key=f"summary_bar_{public_id}")
        charts.benefits_vs_costs_by_year(
            benefit_rows, cost_rows, key=f"summary_by_year_{public_id}"
        )

    # Cash flow + cumulative net — the Forrester-style exhibit.
    yearly, initial = _yearly_breakdown_from_values(benefit_rows, cost_rows)
    if not yearly:
        # Fallback: documented per-year summary keys (initial folded in Y1).
        yearly, initial = _yearly_breakdown_from_summary(summary)
    _render_cashflow(yearly, initial, public_id)

    # Waterfall — Benefits PV down to NPV.
    if bpv or cpv:
        charts.waterfall(
            [
                ("Benefits PV", bpv),
                ("Costs PV", -cpv),
                ("NPV", npv),
            ],
            key=f"summary_waterfall_{public_id}",
        )

    # Scenario comparison — computed locally from current values.
    _render_scenarios(client, public_id, report)

    # Export button
    _render_export(client, public_id, report)