Files
ev-charger-as/backend/main.py
byun 585cacfa13 대시보드 — 충전기별 에러코드 누적 순위 차트 추가
- /api/stats/charger-error-codes 엔드포인트 추가
  (Top 10 충전기 × Top 6 에러코드 stacked bar, 나머지 기타로 합산)
- dashboard.html: 에러코드 누적 순위 가로 스택 바 차트 카드 추가
  (클릭 시 해당 충전기 신고 목록으로 이동)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 08:23:59 +09:00

606 lines
24 KiB
Python

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from datetime import datetime, timedelta
from collections import defaultdict
import calendar as cal_module
import os
from routers import auth_router, chargers, reports, repairs, costs, improvements, accounts, settings, export, manufacturers
from routers import holidays
app = FastAPI(title="EV 충전기 AS 관리 시스템", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 라우터 등록
app.include_router(auth_router.router)
app.include_router(chargers.router)
app.include_router(reports.router)
app.include_router(repairs.router)
app.include_router(costs.router)
app.include_router(improvements.router)
app.include_router(accounts.router)
app.include_router(settings.router)
app.include_router(export.router)
app.include_router(manufacturers.router)
app.include_router(holidays.router)
def _calc_business_hours(start: datetime, end: datetime, holiday_dates: set,
work_start: int = 9, work_end: int = 18) -> float:
"""업무시간(평일 09:00-18:00, 공휴일 제외) 기준 경과 시간 계산."""
if not start or not end or end <= start:
return 0.0
s = start.replace(tzinfo=None)
e = end.replace(tzinfo=None)
total = 0.0
cur_date = s.date()
end_date = e.date()
while cur_date <= end_date:
if cur_date.weekday() < 5 and cur_date not in holiday_dates:
day_ws = datetime(cur_date.year, cur_date.month, cur_date.day, work_start)
day_we = datetime(cur_date.year, cur_date.month, cur_date.day, work_end)
seg_start = max(s, day_ws)
seg_end = min(e, day_we)
if seg_end > seg_start:
total += (seg_end - seg_start).total_seconds() / 3600
cur_date += timedelta(days=1)
return round(total, 1)
def _calc_holiday_excluded_hours(start: datetime, end: datetime, holiday_dates: set) -> float:
"""공휴일을 제외하고 나머지 날(주말 포함)은 24시간 전체 카운트."""
if not start or not end or end <= start:
return 0.0
s = start.replace(tzinfo=None)
e = end.replace(tzinfo=None)
total = 0.0
cur_date = s.date()
end_date = e.date()
while cur_date <= end_date:
if cur_date not in holiday_dates:
day_s = datetime(cur_date.year, cur_date.month, cur_date.day)
day_e = day_s + timedelta(days=1)
seg_start = max(s, day_s)
seg_end = min(e, day_e)
if seg_end > seg_start:
total += (seg_end - seg_start).total_seconds() / 3600
cur_date += timedelta(days=1)
return round(total, 1)
@app.get("/api/health")
def health():
return {"status": "ok"}
@app.get("/api/stats")
def stats(db=None):
from database import SessionLocal
from sqlalchemy import func, text
from models import Report, Repair, RepairCost, Improvement, SystemSetting, Holiday
db = SessionLocal()
try:
total = db.query(Report).count()
pending = db.query(Report).filter(Report.status.in_(["pending","pending_approval"])).count()
in_prog = db.query(Report).filter(Report.status == "in_progress").count()
done = db.query(Report).filter(Report.status == "done").count()
cost_pend = db.query(RepairCost).filter(RepairCost.cost_status == "pending").count()
imp_open = db.query(Improvement).filter(
Improvement.status.in_(["registered","reviewing","developing"])).count()
# ── 설정 읽기 ──
def _setting(key, default):
r = db.query(SystemSetting).filter_by(key=key).first()
return r.value if r else default
_base = _setting("time_metric_base", "occurred")
# 구버전 "true"/"false" → 신버전 mode 문자열로 정규화
_raw_mode = _setting("time_metric_worktime", "off")
if _raw_mode == "true": _raw_mode = "worktime"
elif _raw_mode == "false": _raw_mode = "off"
_mode = _raw_mode # "off" | "holiday_24h" | "worktime"
if _base == "reported":
t_join = "rep.reported_at"
t_plain = "reported_at"
else:
t_join = "COALESCE(rep.occurred_at, rep.reported_at)"
t_plain = "COALESCE(occurred_at, reported_at)"
if _mode in ("worktime", "holiday_24h"):
# ── Python 기반 계산 (공휴일 테이블 활용) ──
holiday_dates = {r.holiday_date for r in db.query(Holiday).all()}
calc_fn = _calc_business_hours if _mode == "worktime" else _calc_holiday_excluded_hours
def _avg_py(interval_days):
rows = db.execute(text(f"""
SELECT {t_join} AS start_t, r.completed_at
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '{interval_days} days'
""")).fetchall()
h_list = [calc_fn(row[0], row[1], holiday_dates)
for row in rows if row[0] and row[1]]
return round(sum(h_list) / len(h_list), 1) if h_list else None
avg_30d = _avg_py(30)
avg_7d = _avg_py(7)
pending_rows = db.execute(text(f"""
SELECT {t_plain} AS start_t
FROM reports
WHERE status IN ('pending','pending_approval','in_progress','waiting','revisit')
AND {t_plain} IS NOT NULL
""")).fetchall()
now = datetime.now()
h_pending = [calc_fn(row[0], now, holiday_dates) for row in pending_rows if row[0]]
over_24h = sum(1 for h in h_pending if h > 24)
over_72h = sum(1 for h in h_pending if h > 72)
longest_h = max(h_pending, default=0.0)
else:
# ── 단순 경과시간 기준 계산 (SQL) ──
avg_30d = db.execute(text(f"""
SELECT ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1)
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '30 days'
""")).scalar()
avg_7d = db.execute(text(f"""
SELECT ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1)
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '7 days'
""")).scalar()
avg_30d = float(avg_30d) if avg_30d else None
avg_7d = float(avg_7d) if avg_7d else None
row = db.execute(text(f"""
SELECT
COUNT(*) FILTER (WHERE EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600 > 24) AS over_24h,
COUNT(*) FILTER (WHERE EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600 > 72) AS over_72h,
COALESCE(MAX(ROUND(EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600, 1)), 0) AS longest_h
FROM reports
WHERE status IN ('pending','pending_approval','in_progress','waiting','revisit')
""")).fetchone()
over_24h = int(row.over_24h)
over_72h = int(row.over_72h)
longest_h = float(row.longest_h)
return {
"total": total, "pending": pending,
"in_progress": in_prog, "done": done,
"cost_pending": cost_pend, "improvement_open": imp_open,
"time_metric_base": _base,
"time_metric_worktime": _mode,
"avg_resolution_hours_30d": avg_30d,
"avg_resolution_hours_7d": avg_7d,
"pending_over_24h": over_24h,
"pending_over_72h": over_72h,
"longest_pending_hours": longest_h,
}
finally:
db.close()
def _month_range(n: int = 13):
"""최근 n개월 목록 생성 (YYYY-MM 형식)."""
now = datetime.now()
result = []
for i in range(n - 1, -1, -1):
m = now.month - i
y = now.year
while m <= 0:
m += 12
y -= 1
result.append(f"{y:04d}-{m:02d}")
return result
@app.get("/api/stats/monthly")
def stats_monthly(months: int = 13):
from database import SessionLocal
from sqlalchemy import text
from models import SystemSetting, Holiday
db = SessionLocal()
try:
def _setting(key, default):
r = db.query(SystemSetting).filter_by(key=key).first()
return r.value if r else default
_base = _setting("time_metric_base", "occurred")
_raw = _setting("time_metric_worktime", "off")
if _raw == "true": _raw = "worktime"
elif _raw == "false": _raw = "off"
_mode = _raw
if _base == "reported":
t_join = "rep.reported_at"
else:
t_join = "COALESCE(rep.occurred_at, rep.reported_at)"
all_months = _month_range(months)
if _mode in ("worktime", "holiday_24h"):
holiday_dates = {r.holiday_date for r in db.query(Holiday).all()}
calc_fn = _calc_business_hours if _mode == "worktime" else _calc_holiday_excluded_hours
rows = db.execute(text(f"""
SELECT TO_CHAR(r.completed_at, 'YYYY-MM') AS month,
{t_join} AS start_t,
r.completed_at
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '{months} months'
""")).fetchall()
by_month = defaultdict(list)
for row in rows:
if row[1] and row[2]:
by_month[row[0]].append(calc_fn(row[1], row[2], holiday_dates))
data = {}
for m, vals in by_month.items():
data[m] = {"avg": round(sum(vals) / len(vals), 1), "count": len(vals)}
else:
rows = db.execute(text(f"""
SELECT TO_CHAR(r.completed_at, 'YYYY-MM') AS month,
ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1) AS avg_h,
COUNT(*) AS cnt
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '{months} months'
GROUP BY month
ORDER BY month
""")).fetchall()
data = {row[0]: {"avg": float(row[1]) if row[1] else None, "count": int(row[2])} for row in rows}
# ── 월별 신고 접수 건수 ──
rpt_rows = db.execute(text(f"""
SELECT TO_CHAR(reported_at, 'YYYY-MM') AS month,
COUNT(*) AS total,
COUNT(*) FILTER (WHERE status = 'done') AS done_cnt
FROM reports
WHERE reported_at >= NOW() - INTERVAL '{months} months'
GROUP BY month
ORDER BY month
""")).fetchall()
rpt_map = {row[0]: {"total": int(row[1]), "done": int(row[2])} for row in rpt_rows}
result = []
for m in all_months:
d = data.get(m)
r = rpt_map.get(m)
result.append({
"month": m,
"avg_hours": d["avg"] if d else None,
"count": d["count"] if d else 0,
"report_total": r["total"] if r else 0,
"report_done": r["done"] if r else 0,
})
return {"data": result, "time_metric_worktime": _mode, "time_metric_base": _base}
finally:
db.close()
@app.get("/api/stats/daily")
def stats_daily(month: str):
"""month: YYYY-MM. Returns day-by-day processing time and report counts."""
from database import SessionLocal
from sqlalchemy import text
from models import SystemSetting, Holiday
try:
year, mon = int(month[:4]), int(month[5:7])
except (ValueError, IndexError):
raise HTTPException(400, "month must be YYYY-MM format")
db = SessionLocal()
try:
def _setting(key, default):
r = db.query(SystemSetting).filter_by(key=key).first()
return r.value if r else default
_base = _setting("time_metric_base", "occurred")
_raw = _setting("time_metric_worktime", "off")
if _raw == "true": _raw = "worktime"
elif _raw == "false": _raw = "off"
_mode = _raw
if _base == "reported":
t_join = "rep.reported_at"
t_plain = "reported_at"
else:
t_join = "COALESCE(rep.occurred_at, rep.reported_at)"
t_plain = "COALESCE(occurred_at, reported_at)"
_, days_in_month = cal_module.monthrange(year, mon)
all_days = [f"{year:04d}-{mon:02d}-{d:02d}" for d in range(1, days_in_month + 1)]
if _mode in ("worktime", "holiday_24h"):
holiday_dates = {r.holiday_date for r in db.query(Holiday).all()}
calc_fn = _calc_business_hours if _mode == "worktime" else _calc_holiday_excluded_hours
rows = db.execute(text(f"""
SELECT TO_CHAR(r.completed_at, 'YYYY-MM-DD') AS day,
{t_join} AS start_t,
r.completed_at
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND TO_CHAR(r.completed_at, 'YYYY-MM') = :month
"""), {"month": month}).fetchall()
by_day = defaultdict(list)
for row in rows:
if row[1] and row[2]:
by_day[row[0]].append(calc_fn(row[1], row[2], holiday_dates))
data = {}
for d, vals in by_day.items():
data[d] = {"avg": round(sum(vals) / len(vals), 1), "count": len(vals)}
else:
rows = db.execute(text(f"""
SELECT TO_CHAR(r.completed_at, 'YYYY-MM-DD') AS day,
ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1) AS avg_h,
COUNT(*) AS cnt
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND TO_CHAR(r.completed_at, 'YYYY-MM') = :month
GROUP BY day
ORDER BY day
"""), {"month": month}).fetchall()
data = {row[0]: {"avg": float(row[1]) if row[1] else None, "count": int(row[2])} for row in rows}
rpt_rows = db.execute(text(f"""
SELECT TO_CHAR(reported_at, 'YYYY-MM-DD') AS day,
COUNT(*) AS total,
COUNT(*) FILTER (WHERE status = 'done') AS done_cnt
FROM reports
WHERE TO_CHAR(reported_at, 'YYYY-MM') = :month
GROUP BY day
ORDER BY day
"""), {"month": month}).fetchall()
rpt_map = {row[0]: {"total": int(row[1]), "done": int(row[2])} for row in rpt_rows}
result = []
for day in all_days:
d = data.get(day)
r = rpt_map.get(day)
result.append({
"day": day,
"avg_hours": d["avg"] if d else None,
"count": d["count"] if d else 0,
"report_total": r["total"] if r else 0,
"report_done": r["done"] if r else 0,
})
return {"data": result, "time_metric_worktime": _mode, "time_metric_base": _base, "month": month}
finally:
db.close()
@app.get("/api/stats/daily/detail")
def stats_daily_detail(day: str):
"""day: YYYY-MM-DD. Returns per-repair and per-report detail for that day."""
import json as _json
from database import SessionLocal
from sqlalchemy import text
from models import SystemSetting, Holiday
try:
datetime.strptime(day, "%Y-%m-%d")
except ValueError:
raise HTTPException(400, "day must be YYYY-MM-DD format")
db = SessionLocal()
try:
def _setting(key, default):
r = db.query(SystemSetting).filter_by(key=key).first()
return r.value if r else default
_base = _setting("time_metric_base", "occurred")
_raw = _setting("time_metric_worktime", "off")
if _raw == "true": _raw = "worktime"
elif _raw == "false": _raw = "off"
_mode = _raw
if _base == "reported":
t_join = "rep.reported_at"
else:
t_join = "COALESCE(rep.occurred_at, rep.reported_at)"
if _mode in ("worktime", "holiday_24h"):
holiday_dates = {r.holiday_date for r in db.query(Holiday).all()}
calc_fn = _calc_business_hours if _mode == "worktime" else _calc_holiday_excluded_hours
# ── 처리 완료 내역 ──
repair_rows = db.execute(text(f"""
SELECT r.id AS repair_id,
rep.id AS report_id,
rep.charger_id,
c.station_name,
rep.issue_types,
{t_join} AS start_t,
r.completed_at,
u.name AS mechanic_name
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
LEFT JOIN users u ON u.id = r.mechanic_id
LEFT JOIN chargers c ON c.id = rep.charger_id
WHERE r.completed_at IS NOT NULL
AND TO_CHAR(r.completed_at, 'YYYY-MM-DD') = :day
ORDER BY r.completed_at
"""), {"day": day}).fetchall()
repairs = []
for row in repair_rows:
start_t, completed = row[5], row[6]
if _mode in ("worktime", "holiday_24h") and start_t and completed:
h = calc_fn(start_t, completed, holiday_dates)
elif start_t and completed:
h = round((completed - start_t).total_seconds() / 3600, 1)
else:
h = None
try:
issues = _json.loads(row[4]) if row[4] else []
except Exception:
issues = []
repairs.append({
"repair_id": row[0],
"report_id": row[1],
"charger_id": row[2] or "",
"station_name": row[3] or "",
"issue_types": issues,
"start_t": start_t.isoformat() if start_t else None,
"completed_at": completed.isoformat() if completed else None,
"processing_hours": h,
"mechanic_name": row[7] or "",
})
# ── 신고 접수 내역 ──
rpt_rows = db.execute(text("""
SELECT rep.id, rep.charger_id, c.station_name,
rep.issue_types, rep.status, rep.reported_at
FROM reports rep
LEFT JOIN chargers c ON c.id = rep.charger_id
WHERE TO_CHAR(rep.reported_at, 'YYYY-MM-DD') = :day
ORDER BY rep.reported_at
"""), {"day": day}).fetchall()
reports = []
for row in rpt_rows:
try:
issues = _json.loads(row[3]) if row[3] else []
except Exception:
issues = []
reports.append({
"id": row[0],
"charger_id": row[1] or "",
"station_name": row[2] or "",
"issue_types": issues,
"status": row[4],
"reported_at": row[5].isoformat() if row[5] else None,
})
return {"day": day, "repairs": repairs, "reports": reports}
finally:
db.close()
@app.get("/api/stats/top-chargers")
def stats_top_chargers(limit: int = 10):
"""충전기별 누적 고장 신고 건수 Top N."""
from database import SessionLocal
from sqlalchemy import text
db = SessionLocal()
try:
rows = db.execute(text("""
SELECT rep.charger_id,
COALESCE(c.station_name, rep.charger_id) AS station_name,
COALESCE(c.name, '') AS charger_name,
COUNT(*) AS total,
COUNT(*) FILTER (WHERE rep.status = 'done') AS done_cnt,
COUNT(*) FILTER (WHERE rep.status != 'done') AS active_cnt
FROM reports rep
LEFT JOIN chargers c ON c.id = rep.charger_id
GROUP BY rep.charger_id, c.station_name, c.name
ORDER BY total DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
return [
{
"charger_id": row[0],
"station_name": row[1],
"charger_name": row[2],
"total": int(row[3]),
"done": int(row[4]),
"active": int(row[5]),
}
for row in rows
]
finally:
db.close()
@app.get("/api/stats/charger-error-codes")
def stats_charger_error_codes(charger_limit: int = 10, code_limit: int = 6):
"""충전기별 에러코드 누적 건수 Top N (에러코드 입력된 신고 기준)."""
from database import SessionLocal
from sqlalchemy import text
from collections import defaultdict
db = SessionLocal()
try:
rows = db.execute(text("""
SELECT rep.charger_id,
COALESCE(c.station_name, rep.charger_id) AS station_name,
COALESCE(c.name, '') AS charger_name,
TRIM(rep.error_code) AS error_code,
COUNT(*) AS cnt
FROM reports rep
LEFT JOIN chargers c ON c.id = rep.charger_id
WHERE rep.error_code IS NOT NULL
AND TRIM(rep.error_code) != ''
GROUP BY rep.charger_id, c.station_name, c.name, TRIM(rep.error_code)
""")).fetchall()
if not rows:
return {"chargers": [], "error_codes": []}
charger_info = {}
code_totals = defaultdict(int)
for row in rows:
cid, sname, cname, ecode, cnt = row
cnt = int(cnt)
if cid not in charger_info:
charger_info[cid] = {"station_name": sname, "charger_name": cname,
"total": 0, "errors": {}}
charger_info[cid]["total"] += cnt
charger_info[cid]["errors"][ecode] = cnt
code_totals[ecode] += cnt
top_chargers = sorted(charger_info.items(), key=lambda x: -x[1]["total"])[:charger_limit]
top_codes = [c for c, _ in sorted(code_totals.items(), key=lambda x: -x[1])[:code_limit]]
result = []
for cid, info in reversed(top_chargers): # 역순: 차트에서 1위가 위에
label = info["station_name"]
if info["charger_name"]:
label += f" ({info['charger_name']})"
if len(label) > 22:
label = label[:20] + ""
errors = info["errors"]
other = sum(cnt for code, cnt in errors.items() if code not in top_codes)
entry = {"charger_id": cid, "label": label, "total": info["total"]}
for code in top_codes:
entry[code] = errors.get(code, 0)
if other:
entry["기타"] = other
result.append(entry)
has_other = any(r.get("기타", 0) > 0 for r in result)
all_codes = top_codes + (["기타"] if has_other else [])
return {"chargers": result, "error_codes": all_codes}
finally:
db.close()