refactor: restructure repo into core/app modules with per-study folders

Reorganize Palladium codebase into a modular architecture with `core/`
shared logic and `app/` Streamlit UI, separating per-study assets into
`studies/YYYYMM_<Vendor>/` folders containing notebooks, seed data, and
configuration. Update README to reflect new structure, add `.gitignore`
entries for `.env` and study exports, and refresh component documentation.
This commit is contained in:
2026-05-20 22:28:12 -04:00
parent a6f3ee3676
commit a2420ed692
52 changed files with 35300 additions and 105 deletions

0
app/__init__.py Normal file
View File

View File

32
app/components/charts.py Normal file
View File

@@ -0,0 +1,32 @@
"""Streamlit-friendly chart wrappers (delegate to core.notebook_helpers.charts)."""
from __future__ import annotations
import streamlit as st
from core.notebook_helpers import charts as core_charts
def cashflow(yearly_breakdown, *, initial_cost: float = 0.0) -> None:
fig = core_charts.cashflow_chart(yearly_breakdown, initial_cost=initial_cost)
st.plotly_chart(fig, use_container_width=True)
def benefits_bar(items) -> None:
fig = core_charts.benefits_bar(items)
st.plotly_chart(fig, use_container_width=True)
def cost_pie(items) -> None:
fig = core_charts.cost_breakdown_pie(items)
st.plotly_chart(fig, use_container_width=True)
def scenario_bars(scenarios) -> None:
fig = core_charts.scenario_comparison(scenarios)
st.plotly_chart(fig, use_container_width=True)
def waterfall(values) -> None:
fig = core_charts.waterfall(values)
st.plotly_chart(fig, use_container_width=True)

106
app/components/tables.py Normal file
View File

@@ -0,0 +1,106 @@
"""Streamlit data-editor wrappers for benefit/cost rows."""
from __future__ import annotations
import pandas as pd
import streamlit as st
def _years_for_table(fields: list[dict], analysis_years: int) -> list[int]:
"""Years 1..N — taken from analysis_period_years on the report."""
return list(range(1, max(int(analysis_years or 3), 1) + 1))
def value_editor(
table: str,
fields: list[dict],
values: list[dict],
*,
analysis_years: int,
key: str,
) -> pd.DataFrame:
"""
Render an ``st.data_editor`` for benefit or cost values.
The editor shows one row per field (filtered to ``table``), with year
columns, an ``initial`` column for costs, a risk_adjustment column, and
a notes column. Returns the edited DataFrame; the caller is responsible
for converting it back to value-row dicts and PUTting to Athena.
"""
fields = [f for f in fields if f.get("table") == table]
fields.sort(key=lambda f: int(f.get("sort_order") or 0))
by_key = {v.get("field_key"): v for v in values}
years = _years_for_table(fields, analysis_years)
rows: list[dict] = []
for f in fields:
v = by_key.get(f["field_key"], {}) or {}
yv = v.get("year_values") or {}
row = {
"field_key": f["field_key"],
"label": f.get("label", f["field_key"]),
"category": f.get("category", "") or "",
}
if table == "costs":
row["Initial"] = float(v.get("initial") or 0.0)
for y in years:
row[f"Year {y}"] = float(yv.get(str(y)) or 0.0)
row["risk_adj"] = float(v.get("risk_adjustment") or 0.0)
row["notes"] = v.get("notes", "") or ""
rows.append(row)
df = pd.DataFrame(rows)
column_config: dict = {
"field_key": st.column_config.TextColumn("Key", disabled=True, width="small"),
"label": st.column_config.TextColumn("Field", disabled=True),
"category": st.column_config.TextColumn("Category", disabled=True, width="small"),
"risk_adj": st.column_config.NumberColumn(
"Risk Adj.", min_value=0.0, max_value=1.0, step=0.05, format="%.2f"
),
"notes": st.column_config.TextColumn("Notes", width="medium"),
}
if table == "costs":
column_config["Initial"] = st.column_config.NumberColumn(
"Initial", format="$%.0f"
)
for y in years:
column_config[f"Year {y}"] = st.column_config.NumberColumn(
f"Year {y}", format="$%.0f"
)
edited = st.data_editor(
df,
column_config=column_config,
use_container_width=True,
num_rows="fixed",
hide_index=True,
key=key,
)
return edited
def df_to_values(df: pd.DataFrame, table: str, analysis_years: int) -> list[dict]:
"""Convert an edited DataFrame back to wire-format value rows."""
out: list[dict] = []
years = list(range(1, max(int(analysis_years or 3), 1) + 1))
for _, row in df.iterrows():
item: dict = {"field_key": row["field_key"], "table": table}
yv = {}
for y in years:
col = f"Year {y}"
if col in df.columns:
yv[str(y)] = float(row[col] or 0)
if yv:
item["year_values"] = yv
if table == "costs" and "Initial" in df.columns:
item["initial"] = float(row["Initial"] or 0)
ra = row.get("risk_adj")
if ra is not None and not pd.isna(ra):
item["risk_adjustment"] = float(ra)
notes = row.get("notes")
if isinstance(notes, str) and notes.strip():
item["notes"] = notes.strip()
out.append(item)
return out

138
app/main.py Normal file
View File

@@ -0,0 +1,138 @@
"""
Palladium Streamlit app — TEI data entry, calculation, versioning, export.
Run from the project root::
streamlit run app/main.py
The app picks a TEI tool by ``public_id`` (or creates one from a Report
template) and exposes Benefits, Costs, Summary, and Versions pages. It is
study-agnostic — the field set is loaded dynamically from Athena based on
the linked Report template.
"""
from __future__ import annotations
import sys
from pathlib import Path
# Allow `streamlit run app/main.py` from project root without `pip install -e .`
_ROOT = Path(__file__).resolve().parent.parent
if str(_ROOT) not in sys.path:
sys.path.insert(0, str(_ROOT))
import streamlit as st
from core.tei_client import AthenaAPIError, TEIClient
st.set_page_config(
page_title="Palladium — TEI Calculator",
page_icon="🛡️",
layout="wide",
)
@st.cache_resource(show_spinner=False)
def get_client() -> TEIClient:
return TEIClient()
def _safe_call(fn, *args, **kwargs):
"""Run an API call, surfacing errors as Streamlit messages."""
try:
return fn(*args, **kwargs)
except AthenaAPIError as e:
st.error(f"Athena API error {e.status_code}: {e.detail}")
except ValueError as e:
st.error(str(e))
return None
def sidebar_tool_picker(client: TEIClient) -> dict | None:
"""Sidebar: pick an existing TEI tool or create one from a report template."""
st.sidebar.title("🛡️ Palladium")
st.sidebar.caption("TEI Calculator")
tools = _safe_call(client.list_tools) or []
if tools:
labels = {
f"{t.get('name', '(unnamed)')}{t.get('id', '')[:8]}": t for t in tools
}
choice = st.sidebar.selectbox("TEI Tool", list(labels.keys()))
tool = labels[choice]
else:
st.sidebar.info("No TEI tools yet. Create one below.")
tool = None
with st.sidebar.expander("Create new tool"):
reports = _safe_call(client.list_reports) or []
if not reports:
st.write("No report templates available.")
else:
report_labels = {f"{r['name']} ({r['vendor']} {r['version']})": r for r in reports}
r_choice = st.selectbox("Report template", list(report_labels.keys()))
new_name = st.text_input("Tool name (optional)", "")
proposal_id = st.number_input(
"Proposal ID (optional)", min_value=0, value=0, step=1
)
if st.button("Create"):
report = report_labels[r_choice]
created = _safe_call(
client.create_tool,
report_public_id=report["id"],
proposal=int(proposal_id) or None,
name=new_name or None,
)
if created:
st.success(f"Created tool {created.get('id')}")
st.rerun()
if tool:
st.sidebar.divider()
st.sidebar.markdown(f"**Public ID**: `{tool.get('id')}`")
st.sidebar.markdown(f"**Status**: {tool.get('status', '?')}")
st.sidebar.markdown(f"**Version**: {tool.get('current_version', 0)}")
if st.sidebar.button("🔄 Recalculate"):
_safe_call(client.calculate, tool["id"])
st.toast("Recalculated.", icon="")
st.cache_data.clear()
return tool
def main() -> None:
st.title("Palladium — TEI Calculator")
try:
client = get_client()
except ValueError as e:
st.error(str(e))
st.info("Set ATHENA_BASE_URL and ATHENA_API_KEY in your `.env` file.")
st.stop()
return
tool = sidebar_tool_picker(client)
if tool is None:
st.info("Pick or create a TEI tool from the sidebar to begin.")
return
# Tab navigation — matches `app/pages/*` modules but kept as tabs so all
# views share the chosen tool/state without re-querying.
tabs = st.tabs(["📊 Summary", "💰 Benefits", "💸 Costs", "🕒 Versions"])
from app.pages import benefits as benefits_page
from app.pages import costs as costs_page
from app.pages import summary as summary_page
from app.pages import versions as versions_page
with tabs[0]:
summary_page.render(client, tool)
with tabs[1]:
benefits_page.render(client, tool)
with tabs[2]:
costs_page.render(client, tool)
with tabs[3]:
versions_page.render(client, tool)
if __name__ == "__main__":
main()

0
app/pages/__init__.py Normal file
View File

28
app/pages/_helpers.py Normal file
View File

@@ -0,0 +1,28 @@
"""Common helpers shared by the page modules."""
from __future__ import annotations
import streamlit as st
from core.tei_client import AthenaAPIError, TEIClient
def report_meta(client: TEIClient, tool: dict) -> dict:
"""Fetch the linked report (handles both nested-object and id-only forms)."""
report_obj = tool.get("report")
if isinstance(report_obj, dict):
return report_obj
if isinstance(report_obj, str):
try:
return client.get_report(report_obj)
except AthenaAPIError as e:
st.error(f"Failed to load report template: {e}")
return {}
def safe(fn, *args, **kwargs):
try:
return fn(*args, **kwargs)
except AthenaAPIError as e:
st.error(f"Athena API error {e.status_code}: {e.detail}")
return None

46
app/pages/benefits.py Normal file
View File

@@ -0,0 +1,46 @@
"""Benefits data-entry tab."""
from __future__ import annotations
import streamlit as st
from app.components.tables import df_to_values, value_editor
from app.pages._helpers import report_meta, safe
from core.tei_client import TEIClient
def render(client: TEIClient, tool: dict) -> None:
st.header("💰 Benefits")
public_id = tool["id"]
report = report_meta(client, tool)
analysis_years = int(report.get("analysis_period_years") or 3)
fields = safe(client.list_fields, report.get("id"), "benefits") or []
values = [v for v in safe(client.get_values, public_id) or [] if v.get("table") == "benefits"]
if not fields:
st.info("This report template has no benefit fields defined.")
return
edited = value_editor(
"benefits",
fields,
values,
analysis_years=analysis_years,
key=f"benefits_editor_{public_id}",
)
col1, col2 = st.columns([1, 4])
with col1:
if st.button("💾 Save benefits", use_container_width=True):
payload = df_to_values(edited, "benefits", analysis_years)
result = safe(client.update_values, public_id, payload)
if result is not None:
st.success(f"Saved {len(payload)} benefit values.")
st.cache_data.clear()
with col2:
st.caption(
"Values are saved as nominal annual amounts. Risk adjustments are "
"applied at calculate time. Use the Recalculate button in the "
"sidebar after saving to refresh the summary."
)

46
app/pages/costs.py Normal file
View File

@@ -0,0 +1,46 @@
"""Costs data-entry tab."""
from __future__ import annotations
import streamlit as st
from app.components.tables import df_to_values, value_editor
from app.pages._helpers import report_meta, safe
from core.tei_client import TEIClient
def render(client: TEIClient, tool: dict) -> None:
st.header("💸 Costs")
public_id = tool["id"]
report = report_meta(client, tool)
analysis_years = int(report.get("analysis_period_years") or 3)
fields = safe(client.list_fields, report.get("id"), "costs") or []
values = [v for v in safe(client.get_values, public_id) or [] if v.get("table") == "costs"]
if not fields:
st.info("This report template has no cost fields defined.")
return
edited = value_editor(
"costs",
fields,
values,
analysis_years=analysis_years,
key=f"costs_editor_{public_id}",
)
col1, col2 = st.columns([1, 4])
with col1:
if st.button("💾 Save costs", use_container_width=True):
payload = df_to_values(edited, "costs", analysis_years)
result = safe(client.update_values, public_id, payload)
if result is not None:
st.success(f"Saved {len(payload)} cost values.")
st.cache_data.clear()
with col2:
st.caption(
"The Initial column is undiscounted year-0 spend. Year columns "
"are end-of-year cashflows. Costs are risk-adjusted upward "
"(higher risk → higher cost)."
)

104
app/pages/summary.py Normal file
View File

@@ -0,0 +1,104 @@
"""Financial summary dashboard tab."""
from __future__ import annotations
import streamlit as st
from app.components import charts
from app.pages._helpers import report_meta, safe
from core.export import build_report_data
from core.tei_client import AthenaAPIError, TEIClient
def render(client: TEIClient, tool: dict) -> None:
st.header("📊 Financial Summary")
public_id = tool["id"]
report = report_meta(client, tool)
try:
summary = client.get_summary(public_id)
except AthenaAPIError as e:
if e.status_code == 404:
st.info(
"No summary yet — click **Recalculate** in the sidebar after "
"filling in benefits and costs."
)
return
st.error(f"Athena API error: {e.detail}")
return
npv = float(summary.get("npv") or 0)
roi = float(summary.get("roi") or summary.get("roi_pct") or 0)
payback = summary.get("payback_months")
bpv = float(summary.get("total_benefits_pv") or 0)
cpv = float(summary.get("total_costs_pv") or 0)
cols = st.columns(5)
cols[0].metric("NPV", f"${npv/1_000_000:,.1f}M")
cols[1].metric("ROI", f"{roi:,.0f}%")
cols[2].metric(
"Payback",
f"{float(payback):.1f} months" if payback is not None else "N/A",
)
cols[3].metric("Benefits PV", f"${bpv/1_000_000:,.1f}M")
cols[4].metric("Costs PV", f"${cpv/1_000_000:,.1f}M")
st.divider()
yb = summary.get("yearly_breakdown") or []
initial = float(summary.get("initial_costs") or 0)
if yb:
charts.cashflow(yb, initial_cost=initial)
with st.expander("Cash flow table"):
st.dataframe(yb, use_container_width=True, hide_index=True)
else:
st.caption("No yearly breakdown in this summary.")
# Scenario comparison — computed locally from current values
with st.expander("Scenario analysis (conservative / moderate / aggressive)"):
envelope = safe(
build_report_data,
client,
public_id,
include_scenarios=True,
study_slug=report.get("name", ""),
)
if envelope and envelope.get("scenarios"):
charts.scenario_bars(envelope["scenarios"])
rows = [
{
"Scenario": k,
"Benefits PV": float(v.get("total_benefits_pv") or 0),
"Costs PV": float(v.get("total_costs_pv") or 0),
"NPV": float(v.get("npv") or 0),
"ROI %": float(v.get("roi_pct") or 0),
"Payback (months)": (
round(float(v.get("payback_months") or 0), 1)
if v.get("payback_months") is not None
else None
),
}
for k, v in envelope["scenarios"].items()
]
st.dataframe(rows, use_container_width=True, hide_index=True)
# Export button
st.divider()
if st.button("📦 Build export envelope (JSON)"):
envelope = safe(
build_report_data,
client,
public_id,
include_scenarios=True,
study_slug=report.get("name", ""),
)
if envelope:
import json
data = json.dumps(envelope, indent=2, default=str)
st.download_button(
"Download export.json",
data=data,
file_name=f"{public_id}_export.json",
mime="application/json",
)

117
app/pages/versions.py Normal file
View File

@@ -0,0 +1,117 @@
"""Version history tab — list, diff, save, restore."""
from __future__ import annotations
import streamlit as st
from app.pages._helpers import safe
from core.tei_client import TEIClient
def _flatten_values(values: list[dict]) -> dict[str, dict]:
"""Index a values list by field_key for easy diffing."""
return {v.get("field_key", ""): v for v in values}
def _diff_rows(a: dict[str, dict], b: dict[str, dict]) -> list[dict]:
"""Return one row per field with side-by-side year values."""
keys = sorted(set(a.keys()) | set(b.keys()))
rows: list[dict] = []
for k in keys:
av = a.get(k, {}) or {}
bv = b.get(k, {}) or {}
ay = av.get("year_values") or {}
by = bv.get("year_values") or {}
years = sorted(set(ay.keys()) | set(by.keys()), key=lambda x: int(x))
for y in years:
a_val = float(ay.get(y) or 0)
b_val = float(by.get(y) or 0)
if abs(a_val - b_val) < 1e-9:
continue
rows.append(
{
"field_key": k,
"year": y,
"left": a_val,
"right": b_val,
"delta": b_val - a_val,
}
)
return rows
def render(client: TEIClient, tool: dict) -> None:
st.header("🕒 Versions")
public_id = tool["id"]
versions = safe(client.list_versions, public_id) or []
versions = sorted(
versions, key=lambda v: int(v.get("version_number") or 0), reverse=True
)
# Save new version
with st.expander(" Save current state as a new version", expanded=not versions):
note = st.text_area(
"Version note",
placeholder=(
"What changed? E.g. 'CFO confirmed 1.8M contacts/month; "
"raised legacy license cost from $160 to $180/agent.'"
),
)
if st.button("💾 Save version", disabled=not note.strip()):
result = safe(client.save_version, public_id, note.strip())
if result:
st.success(
f"Saved version {result.get('version_number', '?')}."
)
st.rerun()
if not versions:
st.info("No versions saved yet.")
return
# Listing
st.subheader("History")
rows = []
for v in versions:
snap = v.get("summary_snapshot") or v.get("summary") or {}
rows.append(
{
"Version": v.get("version_number"),
"Date": v.get("created_at") or v.get("date"),
"NPV": float(snap.get("npv") or 0),
"ROI %": float(snap.get("roi") or snap.get("roi_pct") or 0),
"Note": v.get("note", ""),
}
)
st.dataframe(rows, use_container_width=True, hide_index=True)
# Compare two versions
st.subheader("Compare")
if len(versions) < 2:
st.caption("Save two or more versions to compare.")
return
labels = {f"v{v['version_number']}{v.get('note', '')[:40]}": v for v in versions}
keys = list(labels.keys())
c1, c2 = st.columns(2)
with c1:
left_label = st.selectbox("Left (older)", keys, index=min(1, len(keys) - 1))
with c2:
right_label = st.selectbox("Right (newer)", keys, index=0)
if left_label == right_label:
st.caption("Pick two different versions to see a diff.")
return
left = safe(client.get_version, public_id, labels[left_label]["version_number"])
right = safe(client.get_version, public_id, labels[right_label]["version_number"])
if not (left and right):
return
a_values = left.get("values_snapshot") or left.get("values") or []
b_values = right.get("values_snapshot") or right.get("values") or []
diff = _diff_rows(_flatten_values(a_values), _flatten_values(b_values))
if not diff:
st.success("No value differences between these versions.")
else:
st.dataframe(diff, use_container_width=True, hide_index=True)