"""FastAPI entry point for the EV charger after-sales (AS) management system.

Creates the application, registers the feature routers and exposes the
dashboard statistics endpoints under ``/api/stats``.
"""

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from datetime import datetime, timedelta
from collections import defaultdict
import calendar as cal_module
import json
import os

from routers import auth_router, chargers, reports, repairs, costs, improvements, accounts, settings, export, manufacturers
from routers import holidays

app = FastAPI(title="EV 충전기 AS 관리 시스템", version="1.0.0")

# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive CORS policy -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register routers
app.include_router(auth_router.router)
app.include_router(chargers.router)
app.include_router(reports.router)
app.include_router(repairs.router)
app.include_router(costs.router)
app.include_router(improvements.router)
app.include_router(accounts.router)
app.include_router(settings.router)
app.include_router(export.router)
app.include_router(manufacturers.router)
app.include_router(holidays.router)


# Older installations stored the work-time switch as "true"/"false"; newer ones
# store a mode string ("off" | "holiday_24h" | "worktime").
_LEGACY_WORKTIME_MODES = {"true": "worktime", "false": "off"}

# Report statuses counted as "still open" by the pending metrics in /api/stats.
_OPEN_REPORT_STATUSES_SQL = (
    "('pending','pending_approval','in_progress','waiting','revisit')"
)


def _overlap_hours(s: datetime, e: datetime, window_start: datetime, window_end: datetime) -> float:
    """Return the hours of the interval [s, e] that fall inside the window."""
    seg_start = max(s, window_start)
    seg_end = min(e, window_end)
    if seg_end > seg_start:
        return (seg_end - seg_start).total_seconds() / 3600
    return 0.0


def _calc_business_hours(start: datetime, end: datetime, holiday_dates: set,
                         work_start: int = 9, work_end: int = 18) -> float:
    """Return elapsed hours counted only during working hours.

    Only weekdays (Mon-Fri) whose date is not in ``holiday_dates`` contribute,
    and on those days only the ``work_start``-``work_end`` window (09:00-18:00
    by default) is counted.

    Args:
        start: Beginning of the interval.
        end: End of the interval.
        holiday_dates: Dates to skip entirely; compared against ``date`` objects.
        work_start: Hour of day at which the working window opens.
        work_end: Hour of day at which the working window closes (24 allowed).

    Returns:
        Hours rounded to one decimal; 0.0 when either bound is missing or the
        interval is empty or reversed.
    """
    if not start or not end:
        return 0.0
    # Drop tzinfo *before* comparing: mixing an aware DB timestamp with a naive
    # datetime.now() would otherwise raise TypeError on the comparison.
    s = start.replace(tzinfo=None)
    e = end.replace(tzinfo=None)
    if e <= s:
        return 0.0

    total = 0.0
    cur_date = s.date()
    end_date = e.date()
    while cur_date <= end_date:
        if cur_date.weekday() < 5 and cur_date not in holiday_dates:
            midnight = datetime(cur_date.year, cur_date.month, cur_date.day)
            # timedelta arithmetic (rather than datetime(..., hour)) lets
            # work_end be 24 without raising ValueError.
            day_ws = midnight + timedelta(hours=work_start)
            day_we = midnight + timedelta(hours=work_end)
            total += _overlap_hours(s, e, day_ws, day_we)
        cur_date += timedelta(days=1)
    return round(total, 1)


def _calc_holiday_excluded_hours(start: datetime, end: datetime, holiday_dates: set) -> float:
    """Return elapsed hours, skipping holidays but counting all other days fully.

    Every day that is not in ``holiday_dates`` (weekends included) contributes
    its full 24 hours; days in ``holiday_dates`` contribute nothing.

    Args:
        start: Beginning of the interval.
        end: End of the interval.
        holiday_dates: Dates to skip entirely; compared against ``date`` objects.

    Returns:
        Hours rounded to one decimal; 0.0 when either bound is missing or the
        interval is empty or reversed.
    """
    if not start or not end:
        return 0.0
    # Normalise first so aware/naive inputs can be compared safely.
    s = start.replace(tzinfo=None)
    e = end.replace(tzinfo=None)
    if e <= s:
        return 0.0

    total = 0.0
    cur_date = s.date()
    end_date = e.date()
    while cur_date <= end_date:
        if cur_date not in holiday_dates:
            day_s = datetime(cur_date.year, cur_date.month, cur_date.day)
            day_e = day_s + timedelta(days=1)
            total += _overlap_hours(s, e, day_s, day_e)
        cur_date += timedelta(days=1)
    return round(total, 1)


def _load_time_metric_settings(db):
    """Read the time-metric settings and return ``(base, mode)``.

    ``base`` is the raw ``time_metric_base`` value (default ``"occurred"``).
    ``mode`` is ``time_metric_worktime`` (default ``"off"``) with the legacy
    ``"true"``/``"false"`` values mapped to ``"worktime"``/``"off"``.
    """
    from models import SystemSetting

    def _setting(key, default):
        row = db.query(SystemSetting).filter_by(key=key).first()
        return row.value if row else default

    base = _setting("time_metric_base", "occurred")
    raw_mode = _setting("time_metric_worktime", "off")
    return base, _LEGACY_WORKTIME_MODES.get(raw_mode, raw_mode)


def _start_time_exprs(base):
    """Return the SQL start-time expressions ``(joined, plain)`` for ``base``.

    ``joined`` is for queries that alias ``reports`` as ``rep``; ``plain`` is
    for queries directly on ``reports``. Both are fixed strings chosen here,
    never user input, which is why callers may interpolate them into SQL.
    """
    if base == "reported":
        return "rep.reported_at", "reported_at"
    return (
        "COALESCE(rep.occurred_at, rep.reported_at)",
        "COALESCE(occurred_at, reported_at)",
    )


def _select_calc_fn(mode):
    """Return the Python elapsed-time function for ``mode``, or None for plain SQL."""
    if mode == "worktime":
        return _calc_business_hours
    if mode == "holiday_24h":
        return _calc_holiday_excluded_hours
    return None


def _load_holiday_dates(db) -> set:
    """Return the set of ``holiday_date`` values from every Holiday row."""
    from models import Holiday

    return {row.holiday_date for row in db.query(Holiday).all()}


def _float_or_none(value):
    """Convert a DB numeric to float, keeping None (and not dropping 0)."""
    return float(value) if value is not None else None


def _average_by_key(rows, calc_fn, holiday_dates) -> dict:
    """Group ``(key, start, completed)`` rows by key and average their hours.

    Rows with a missing start or completion time are ignored.
    """
    grouped = defaultdict(list)
    for key, start_t, completed in rows:
        if start_t and completed:
            grouped[key].append(calc_fn(start_t, completed, holiday_dates))
    return {
        key: {"avg": round(sum(vals) / len(vals), 1), "count": len(vals)}
        for key, vals in grouped.items()
    }


def _merge_series(keys, key_name, repair_stats, report_stats) -> list:
    """Build one output row per key, filling gaps with None / 0."""
    merged = []
    for key in keys:
        d = repair_stats.get(key)
        r = report_stats.get(key)
        merged.append({
            key_name: key,
            "avg_hours": d["avg"] if d else None,
            "count": d["count"] if d else 0,
            "report_total": r["total"] if r else 0,
            "report_done": r["done"] if r else 0,
        })
    return merged


def _parse_issue_types(raw):
    """Decode the ``issue_types`` column into a Python value.

    A JSON string is decoded; an already-decoded list is returned as-is
    (presumably what a JSON-typed column would yield -- verify against the
    model); empty or undecodable values become an empty list.
    """
    if not raw:
        return []
    if isinstance(raw, list):
        return raw
    try:
        return json.loads(raw)
    except (TypeError, ValueError, RecursionError):
        # Best effort: a malformed value must not break the whole response.
        return []


@app.get("/api/health")
def health():
    """Liveness probe."""
    return {"status": "ok"}


@app.get("/api/stats")
def stats(db=None):
    """Return the dashboard summary counters and resolution-time metrics.

    The ``db`` parameter is ignored (a fresh session is always opened); it is
    kept only so the existing signature stays unchanged.
    """
    from database import SessionLocal
    from sqlalchemy import text
    from models import Report, RepairCost, Improvement

    db = SessionLocal()
    try:
        total = db.query(Report).count()
        pending = db.query(Report).filter(
            Report.status.in_(["pending", "pending_approval"])).count()
        in_prog = db.query(Report).filter(Report.status == "in_progress").count()
        done = db.query(Report).filter(Report.status == "done").count()
        cost_pend = db.query(RepairCost).filter(
            RepairCost.cost_status == "pending").count()
        imp_open = db.query(Improvement).filter(
            Improvement.status.in_(["registered", "reviewing", "developing"])).count()

        # -- Settings --
        _base, _mode = _load_time_metric_settings(db)
        t_join, t_plain = _start_time_exprs(_base)
        calc_fn = _select_calc_fn(_mode)

        if calc_fn is not None:
            # -- Python-side calculation (needs the holiday table) --
            holiday_dates = _load_holiday_dates(db)

            def _avg_py(interval_days):
                # interval_days is only ever a literal int from this function.
                rows = db.execute(text(f"""
                    SELECT {t_join} AS start_t, r.completed_at
                    FROM repairs r
                    JOIN repair_reports rr ON rr.repair_id = r.id
                    JOIN reports rep ON rep.id = rr.report_id
                    WHERE r.completed_at IS NOT NULL
                      AND r.completed_at >= NOW() - INTERVAL '{interval_days} days'
                """)).fetchall()
                h_list = [
                    calc_fn(row[0], row[1], holiday_dates)
                    for row in rows
                    if row[0] and row[1]
                ]
                return round(sum(h_list) / len(h_list), 1) if h_list else None

            avg_30d = _avg_py(30)
            avg_7d = _avg_py(7)

            pending_rows = db.execute(text(f"""
                SELECT {t_plain} AS start_t
                FROM reports
                WHERE status IN {_OPEN_REPORT_STATUSES_SQL}
                  AND {t_plain} IS NOT NULL
            """)).fetchall()
            # NOTE(review): naive local time; assumes DB timestamps are in the
            # same zone as the app server -- confirm.
            now = datetime.now()
            h_pending = [
                calc_fn(row[0], now, holiday_dates)
                for row in pending_rows
                if row[0]
            ]
            over_24h = sum(1 for h in h_pending if h > 24)
            over_72h = sum(1 for h in h_pending if h > 72)
            longest_h = max(h_pending, default=0.0)
        else:
            # -- Plain wall-clock elapsed time, computed in SQL --
            def _avg_sql(interval_days):
                value = db.execute(text(f"""
                    SELECT ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1)
                    FROM repairs r
                    JOIN repair_reports rr ON rr.repair_id = r.id
                    JOIN reports rep ON rep.id = rr.report_id
                    WHERE r.completed_at IS NOT NULL
                      AND r.completed_at >= NOW() - INTERVAL '{interval_days} days'
                """)).scalar()
                return _float_or_none(value)

            avg_30d = _avg_sql(30)
            avg_7d = _avg_sql(7)

            # The ::numeric cast matches the average queries above; ROUND with
            # a precision argument is only defined for numeric.
            row = db.execute(text(f"""
                SELECT
                  COUNT(*) FILTER (WHERE EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600 > 24) AS over_24h,
                  COUNT(*) FILTER (WHERE EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600 > 72) AS over_72h,
                  COALESCE(MAX(ROUND((EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600)::numeric, 1)), 0) AS longest_h
                FROM reports
                WHERE status IN {_OPEN_REPORT_STATUSES_SQL}
            """)).fetchone()
            over_24h = int(row.over_24h)
            over_72h = int(row.over_72h)
            longest_h = float(row.longest_h)

        return {
            "total": total,
            "pending": pending,
            "in_progress": in_prog,
            "done": done,
            "cost_pending": cost_pend,
            "improvement_open": imp_open,
            "time_metric_base": _base,
            "time_metric_worktime": _mode,
            "avg_resolution_hours_30d": avg_30d,
            "avg_resolution_hours_7d": avg_7d,
            "pending_over_24h": over_24h,
            "pending_over_72h": over_72h,
            "longest_pending_hours": longest_h,
        }
    finally:
        db.close()


def _month_range(n: int = 13):
    """Return the last ``n`` months (current month last) as ``YYYY-MM`` strings."""
    now = datetime.now()
    # Work on a zero-based absolute month index so year rollover is a divmod.
    last_index = now.year * 12 + (now.month - 1)
    result = []
    for index in range(last_index - n + 1, last_index + 1):
        year, month0 = divmod(index, 12)
        result.append(f"{year:04d}-{month0 + 1:02d}")
    return result


@app.get("/api/stats/monthly")
def stats_monthly(months: int = 13):
    """Return per-month average resolution time and report counts.

    Args:
        months: Number of most recent months to include.
    """
    from database import SessionLocal
    from sqlalchemy import text

    # Interpolated into an INTERVAL literal below, so force it to be an int.
    months = int(months)

    db = SessionLocal()
    try:
        _base, _mode = _load_time_metric_settings(db)
        t_join, _ = _start_time_exprs(_base)
        calc_fn = _select_calc_fn(_mode)

        all_months = _month_range(months)

        if calc_fn is not None:
            holiday_dates = _load_holiday_dates(db)
            rows = db.execute(text(f"""
                SELECT TO_CHAR(r.completed_at, 'YYYY-MM') AS month,
                       {t_join} AS start_t,
                       r.completed_at
                FROM repairs r
                JOIN repair_reports rr ON rr.repair_id = r.id
                JOIN reports rep ON rep.id = rr.report_id
                WHERE r.completed_at IS NOT NULL
                  AND r.completed_at >= NOW() - INTERVAL '{months} months'
            """)).fetchall()
            data = _average_by_key(rows, calc_fn, holiday_dates)
        else:
            rows = db.execute(text(f"""
                SELECT TO_CHAR(r.completed_at, 'YYYY-MM') AS month,
                       ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1) AS avg_h,
                       COUNT(*) AS cnt
                FROM repairs r
                JOIN repair_reports rr ON rr.repair_id = r.id
                JOIN reports rep ON rep.id = rr.report_id
                WHERE r.completed_at IS NOT NULL
                  AND r.completed_at >= NOW() - INTERVAL '{months} months'
                GROUP BY month
                ORDER BY month
            """)).fetchall()
            data = {
                row[0]: {"avg": _float_or_none(row[1]), "count": int(row[2])}
                for row in rows
            }

        # -- Reports received per month --
        rpt_rows = db.execute(text(f"""
            SELECT TO_CHAR(reported_at, 'YYYY-MM') AS month,
                   COUNT(*) AS total,
                   COUNT(*) FILTER (WHERE status = 'done') AS done_cnt
            FROM reports
            WHERE reported_at >= NOW() - INTERVAL '{months} months'
            GROUP BY month
            ORDER BY month
        """)).fetchall()
        rpt_map = {
            row[0]: {"total": int(row[1]), "done": int(row[2])}
            for row in rpt_rows
        }

        result = _merge_series(all_months, "month", data, rpt_map)
        return {"data": result, "time_metric_worktime": _mode, "time_metric_base": _base}
    finally:
        db.close()


@app.get("/api/stats/daily")
def stats_daily(month: str):
    """Return day-by-day processing time and report counts for one month.

    Args:
        month: Target month in ``YYYY-MM`` format.

    Raises:
        HTTPException: 400 when ``month`` is not a valid ``YYYY-MM`` value.
    """
    from database import SessionLocal
    from sqlalchemy import text

    try:
        parsed = datetime.strptime(month, "%Y-%m")
    except (TypeError, ValueError):
        raise HTTPException(400, "month must be YYYY-MM format")
    year, mon = parsed.year, parsed.month
    # Normalise (e.g. "2024-5" -> "2024-05") so it matches TO_CHAR output.
    month = f"{year:04d}-{mon:02d}"

    db = SessionLocal()
    try:
        _base, _mode = _load_time_metric_settings(db)
        t_join, _ = _start_time_exprs(_base)
        calc_fn = _select_calc_fn(_mode)

        _, days_in_month = cal_module.monthrange(year, mon)
        all_days = [f"{month}-{d:02d}" for d in range(1, days_in_month + 1)]

        if calc_fn is not None:
            holiday_dates = _load_holiday_dates(db)
            rows = db.execute(text(f"""
                SELECT TO_CHAR(r.completed_at, 'YYYY-MM-DD') AS day,
                       {t_join} AS start_t,
                       r.completed_at
                FROM repairs r
                JOIN repair_reports rr ON rr.repair_id = r.id
                JOIN reports rep ON rep.id = rr.report_id
                WHERE r.completed_at IS NOT NULL
                  AND TO_CHAR(r.completed_at, 'YYYY-MM') = :month
            """), {"month": month}).fetchall()
            data = _average_by_key(rows, calc_fn, holiday_dates)
        else:
            rows = db.execute(text(f"""
                SELECT TO_CHAR(r.completed_at, 'YYYY-MM-DD') AS day,
                       ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1) AS avg_h,
                       COUNT(*) AS cnt
                FROM repairs r
                JOIN repair_reports rr ON rr.repair_id = r.id
                JOIN reports rep ON rep.id = rr.report_id
                WHERE r.completed_at IS NOT NULL
                  AND TO_CHAR(r.completed_at, 'YYYY-MM') = :month
                GROUP BY day
                ORDER BY day
            """), {"month": month}).fetchall()
            data = {
                row[0]: {"avg": _float_or_none(row[1]), "count": int(row[2])}
                for row in rows
            }

        rpt_rows = db.execute(text("""
            SELECT TO_CHAR(reported_at, 'YYYY-MM-DD') AS day,
                   COUNT(*) AS total,
                   COUNT(*) FILTER (WHERE status = 'done') AS done_cnt
            FROM reports
            WHERE TO_CHAR(reported_at, 'YYYY-MM') = :month
            GROUP BY day
            ORDER BY day
        """), {"month": month}).fetchall()
        rpt_map = {
            row[0]: {"total": int(row[1]), "done": int(row[2])}
            for row in rpt_rows
        }

        result = _merge_series(all_days, "day", data, rpt_map)
        return {"data": result, "time_metric_worktime": _mode,
                "time_metric_base": _base, "month": month}
    finally:
        db.close()


@app.get("/api/stats/daily/detail")
def stats_daily_detail(day: str):
    """Return per-repair and per-report detail for a single day.

    Args:
        day: Target day in ``YYYY-MM-DD`` format.

    Raises:
        HTTPException: 400 when ``day`` is not a valid ``YYYY-MM-DD`` value.
    """
    from database import SessionLocal
    from sqlalchemy import text

    try:
        parsed_day = datetime.strptime(day, "%Y-%m-%d")
    except (TypeError, ValueError):
        raise HTTPException(400, "day must be YYYY-MM-DD format")
    # Normalise (e.g. "2024-5-3" -> "2024-05-03") so it matches TO_CHAR output.
    day = parsed_day.date().isoformat()

    db = SessionLocal()
    try:
        _base, _mode = _load_time_metric_settings(db)
        t_join, _ = _start_time_exprs(_base)
        calc_fn = _select_calc_fn(_mode)
        holiday_dates = _load_holiday_dates(db) if calc_fn is not None else set()

        # -- Completed repairs --
        repair_rows = db.execute(text(f"""
            SELECT r.id AS repair_id,
                   rep.id AS report_id,
                   rep.charger_id,
                   c.station_name,
                   rep.issue_types,
                   {t_join} AS start_t,
                   r.completed_at,
                   u.name AS mechanic_name
            FROM repairs r
            JOIN repair_reports rr ON rr.repair_id = r.id
            JOIN reports rep ON rep.id = rr.report_id
            LEFT JOIN users u ON u.id = r.mechanic_id
            LEFT JOIN chargers c ON c.id = rep.charger_id
            WHERE r.completed_at IS NOT NULL
              AND TO_CHAR(r.completed_at, 'YYYY-MM-DD') = :day
            ORDER BY r.completed_at
        """), {"day": day}).fetchall()

        repairs = []
        for row in repair_rows:
            start_t, completed = row[5], row[6]
            if not (start_t and completed):
                h = None
            elif calc_fn is not None:
                h = calc_fn(start_t, completed, holiday_dates)
            else:
                h = round((completed - start_t).total_seconds() / 3600, 1)
            repairs.append({
                "repair_id": row[0],
                "report_id": row[1],
                "charger_id": row[2] or "",
                "station_name": row[3] or "",
                "issue_types": _parse_issue_types(row[4]),
                "start_t": start_t.isoformat() if start_t else None,
                "completed_at": completed.isoformat() if completed else None,
                "processing_hours": h,
                "mechanic_name": row[7] or "",
            })

        # -- Reports received --
        rpt_rows = db.execute(text("""
            SELECT rep.id,
                   rep.charger_id,
                   c.station_name,
                   rep.issue_types,
                   rep.status,
                   rep.reported_at
            FROM reports rep
            LEFT JOIN chargers c ON c.id = rep.charger_id
            WHERE TO_CHAR(rep.reported_at, 'YYYY-MM-DD') = :day
            ORDER BY rep.reported_at
        """), {"day": day}).fetchall()

        reports = [
            {
                "id": row[0],
                "charger_id": row[1] or "",
                "station_name": row[2] or "",
                "issue_types": _parse_issue_types(row[3]),
                "status": row[4],
                "reported_at": row[5].isoformat() if row[5] else None,
            }
            for row in rpt_rows
        ]

        return {"day": day, "repairs": repairs, "reports": reports}
    finally:
        db.close()


@app.get("/api/stats/top-chargers")
def stats_top_chargers(limit: int = 10):
    """Return the top N chargers by cumulative fault-report count.

    Args:
        limit: Maximum number of chargers to return; negatives are treated as 0.
    """
    from database import SessionLocal
    from sqlalchemy import text

    db = SessionLocal()
    try:
        rows = db.execute(text("""
            SELECT rep.charger_id,
                   COALESCE(c.station_name, rep.charger_id) AS station_name,
                   COALESCE(c.name, '') AS charger_name,
                   COUNT(*) AS total,
                   COUNT(*) FILTER (WHERE rep.status = 'done') AS done_cnt,
                   COUNT(*) FILTER (WHERE rep.status != 'done') AS active_cnt
            FROM reports rep
            LEFT JOIN chargers c ON c.id = rep.charger_id
            GROUP BY rep.charger_id, c.station_name, c.name
            ORDER BY total DESC
            LIMIT :lim
        """), {"lim": max(limit, 0)}).fetchall()  # a negative LIMIT is a DB error
        return [
            {
                "charger_id": row[0],
                "station_name": row[1],
                "charger_name": row[2],
                "total": int(row[3]),
                "done": int(row[4]),
                "active": int(row[5]),
            }
            for row in rows
        ]
    finally:
        db.close()


@app.get("/api/stats/top-stations")
def stats_top_stations(limit: int = 10):
    """Return the top N stations by cumulative fault-report count.

    Reports whose charger has no station name are grouped by charger id.

    Args:
        limit: Maximum number of stations to return; negatives are treated as 0.
    """
    from database import SessionLocal
    from sqlalchemy import text

    db = SessionLocal()
    try:
        rows = db.execute(text("""
            SELECT COALESCE(c.station_name, rep.charger_id) AS station_name,
                   COUNT(*) AS total,
                   COUNT(*) FILTER (WHERE rep.status = 'done') AS done_cnt,
                   COUNT(*) FILTER (WHERE rep.status != 'done') AS active_cnt
            FROM reports rep
            LEFT JOIN chargers c ON c.id = rep.charger_id
            GROUP BY COALESCE(c.station_name, rep.charger_id)
            ORDER BY total DESC
            LIMIT :lim
        """), {"lim": max(limit, 0)}).fetchall()  # a negative LIMIT is a DB error
        return [
            {
                "station_name": row[0],
                "total": int(row[1]),
                "done": int(row[2]),
                "active": int(row[3]),
            }
            for row in rows
        ]
    finally:
        db.close()


@app.get("/api/stats/charger-error-codes")
def stats_charger_error_codes(code_limit: int = 10):
    """Return the top N error codes by cumulative count.

    Reports with a blank or missing error code are grouped under a single
    "no error code" bucket.

    Args:
        code_limit: Maximum number of codes to return; negatives are treated as 0.
    """
    from database import SessionLocal
    from sqlalchemy import text

    db = SessionLocal()
    try:
        rows = db.execute(text("""
            SELECT COALESCE(NULLIF(TRIM(COALESCE(error_code, '')), ''), '에러코드 없음') AS error_code,
                   COUNT(*) AS cnt
            FROM reports
            GROUP BY COALESCE(NULLIF(TRIM(COALESCE(error_code, '')), ''), '에러코드 없음')
            ORDER BY cnt DESC
            LIMIT :limit
        """), {"limit": max(code_limit, 0)}).fetchall()
        # The query picks the N largest; the response lists them smallest
        # first (presumably for a bottom-up bar chart -- confirm with the UI).
        result = [{"error_code": r[0], "total": int(r[1])} for r in reversed(rows)]
        return {"error_codes": result}
    finally:
        db.close()