기능 추가 및 버그 수정 — 처리시간 지표, 대시보드 차트, UI 개선

## 처리시간 지표
- 업무시간 기준(09-18 평일) / 공휴일 제외 24h / 달력 기준 3가지 모드 선택
- 공휴일 DB 관리 (holidays 테이블, 수동 등록·삭제·일괄 추가)
- 2026년 공휴일 등록 지원
- 설정 페이지에서 라디오 버튼으로 모드 선택

## 대시보드 차트
- 월별 평균 처리시간 막대 차트 추가
- 월별 신고 접수 건수 누적 막대 차트 추가
- 월별 → 일별 드릴다운 (막대 클릭 시 해당 월의 일별 차트로 전환)
- 일별 막대 클릭 시 처리 완료/신고 접수 상세 내역 모달
- 충전기별 누적 고장 건수 Top 10 수평 막대 차트 추가

## 신고 목록
- # 컬럼을 DB PK 대신 현재 목록 순서(1, 2, 3…)로 표시
- 엑셀 export 접수번호도 순차번호로 변경

## 모바일 네비게이션 버그 수정
- 모바일에서 가로 오버플로우 시 nav가 body 넓이로 늘어나
  햄버거 버튼이 화면 밖으로 밀리는 문제 수정
- nav를 position:fixed + body padding-top:54px 로 변경 (전체 페이지 적용)
- 충전기 관리·신고 목록 페이지 지도 컨테이너에 isolation:isolate 적용

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
byun
2026-05-31 06:52:56 +09:00
parent 05b478372a
commit 2e8751ea6c
35 changed files with 5541 additions and 353 deletions

View File

@@ -13,7 +13,8 @@ SECRET_KEY = os.getenv("SECRET_KEY", "changeme")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
oauth2_scheme_optional = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)
def hash_password(password: str) -> str:
"""비밀번호 bcrypt 해시 생성"""
@@ -58,6 +59,24 @@ def get_current_user(
raise credentials_exception
return user
def get_optional_user(
token: Optional[str] = Depends(oauth2_scheme_optional),
db: Session = Depends(get_db)
) -> Optional[models.User]:
if not token:
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if not user_id:
return None
return db.query(models.User).filter(
models.User.id == int(user_id),
models.User.is_active == True
).first()
except Exception:
return None
def require_role(*roles):
def checker(current_user: models.User = Depends(get_current_user)):
if current_user.role not in roles:

View File

@@ -1,9 +1,13 @@
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from datetime import datetime, timedelta
from collections import defaultdict
import calendar as cal_module
import os
from routers import auth_router, chargers, reports, repairs, costs, improvements, accounts, settings, export
from routers import auth_router, chargers, reports, repairs, costs, improvements, accounts, settings, export, manufacturers
from routers import holidays
app = FastAPI(title="EV 충전기 AS 관리 시스템", version="1.0.0")
@@ -25,6 +29,49 @@ app.include_router(improvements.router)
app.include_router(accounts.router)
app.include_router(settings.router)
app.include_router(export.router)
app.include_router(manufacturers.router)
app.include_router(holidays.router)
def _calc_business_hours(start: datetime, end: datetime, holiday_dates: set,
work_start: int = 9, work_end: int = 18) -> float:
"""업무시간(평일 09:00-18:00, 공휴일 제외) 기준 경과 시간 계산."""
if not start or not end or end <= start:
return 0.0
s = start.replace(tzinfo=None)
e = end.replace(tzinfo=None)
total = 0.0
cur_date = s.date()
end_date = e.date()
while cur_date <= end_date:
if cur_date.weekday() < 5 and cur_date not in holiday_dates:
day_ws = datetime(cur_date.year, cur_date.month, cur_date.day, work_start)
day_we = datetime(cur_date.year, cur_date.month, cur_date.day, work_end)
seg_start = max(s, day_ws)
seg_end = min(e, day_we)
if seg_end > seg_start:
total += (seg_end - seg_start).total_seconds() / 3600
cur_date += timedelta(days=1)
return round(total, 1)
def _calc_holiday_excluded_hours(start: datetime, end: datetime, holiday_dates: set) -> float:
"""공휴일을 제외하고 나머지 날(주말 포함)은 24시간 전체 카운트."""
if not start or not end or end <= start:
return 0.0
s = start.replace(tzinfo=None)
e = end.replace(tzinfo=None)
total = 0.0
cur_date = s.date()
end_date = e.date()
while cur_date <= end_date:
if cur_date not in holiday_dates:
day_s = datetime(cur_date.year, cur_date.month, cur_date.day)
day_e = day_s + timedelta(days=1)
seg_start = max(s, day_s)
seg_end = min(e, day_e)
if seg_end > seg_start:
total += (seg_end - seg_start).total_seconds() / 3600
cur_date += timedelta(days=1)
return round(total, 1)
@app.get("/api/health")
def health():
@@ -33,8 +80,8 @@ def health():
@app.get("/api/stats")
def stats(db=None):
from database import SessionLocal
from sqlalchemy import func
from models import Report, Repair, RepairCost, Improvement
from sqlalchemy import func, text
from models import Report, Repair, RepairCost, Improvement, SystemSetting, Holiday
db = SessionLocal()
try:
total = db.query(Report).count()
@@ -44,10 +91,452 @@ def stats(db=None):
cost_pend = db.query(RepairCost).filter(RepairCost.cost_status == "pending").count()
imp_open = db.query(Improvement).filter(
Improvement.status.in_(["registered","reviewing","developing"])).count()
# ── 설정 읽기 ──
def _setting(key, default):
r = db.query(SystemSetting).filter_by(key=key).first()
return r.value if r else default
_base = _setting("time_metric_base", "occurred")
# 구버전 "true"/"false" → 신버전 mode 문자열로 정규화
_raw_mode = _setting("time_metric_worktime", "off")
if _raw_mode == "true": _raw_mode = "worktime"
elif _raw_mode == "false": _raw_mode = "off"
_mode = _raw_mode # "off" | "holiday_24h" | "worktime"
if _base == "reported":
t_join = "rep.reported_at"
t_plain = "reported_at"
else:
t_join = "COALESCE(rep.occurred_at, rep.reported_at)"
t_plain = "COALESCE(occurred_at, reported_at)"
if _mode in ("worktime", "holiday_24h"):
# ── Python 기반 계산 (공휴일 테이블 활용) ──
holiday_dates = {r.holiday_date for r in db.query(Holiday).all()}
calc_fn = _calc_business_hours if _mode == "worktime" else _calc_holiday_excluded_hours
def _avg_py(interval_days):
rows = db.execute(text(f"""
SELECT {t_join} AS start_t, r.completed_at
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '{interval_days} days'
""")).fetchall()
h_list = [calc_fn(row[0], row[1], holiday_dates)
for row in rows if row[0] and row[1]]
return round(sum(h_list) / len(h_list), 1) if h_list else None
avg_30d = _avg_py(30)
avg_7d = _avg_py(7)
pending_rows = db.execute(text(f"""
SELECT {t_plain} AS start_t
FROM reports
WHERE status IN ('pending','pending_approval','in_progress','waiting','revisit')
AND {t_plain} IS NOT NULL
""")).fetchall()
now = datetime.now()
h_pending = [calc_fn(row[0], now, holiday_dates) for row in pending_rows if row[0]]
over_24h = sum(1 for h in h_pending if h > 24)
over_72h = sum(1 for h in h_pending if h > 72)
longest_h = max(h_pending, default=0.0)
else:
# ── 단순 경과시간 기준 계산 (SQL) ──
avg_30d = db.execute(text(f"""
SELECT ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1)
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '30 days'
""")).scalar()
avg_7d = db.execute(text(f"""
SELECT ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1)
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '7 days'
""")).scalar()
avg_30d = float(avg_30d) if avg_30d else None
avg_7d = float(avg_7d) if avg_7d else None
row = db.execute(text(f"""
SELECT
COUNT(*) FILTER (WHERE EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600 > 24) AS over_24h,
COUNT(*) FILTER (WHERE EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600 > 72) AS over_72h,
COALESCE(MAX(ROUND(EXTRACT(EPOCH FROM (NOW()-{t_plain}))/3600, 1)), 0) AS longest_h
FROM reports
WHERE status IN ('pending','pending_approval','in_progress','waiting','revisit')
""")).fetchone()
over_24h = int(row.over_24h)
over_72h = int(row.over_72h)
longest_h = float(row.longest_h)
return {
"total": total, "pending": pending,
"in_progress": in_prog, "done": done,
"cost_pending": cost_pend, "improvement_open": imp_open,
"time_metric_base": _base,
"time_metric_worktime": _mode,
"avg_resolution_hours_30d": avg_30d,
"avg_resolution_hours_7d": avg_7d,
"pending_over_24h": over_24h,
"pending_over_72h": over_72h,
"longest_pending_hours": longest_h,
}
finally:
db.close()
def _month_range(n: int = 13):
"""최근 n개월 목록 생성 (YYYY-MM 형식)."""
now = datetime.now()
result = []
for i in range(n - 1, -1, -1):
m = now.month - i
y = now.year
while m <= 0:
m += 12
y -= 1
result.append(f"{y:04d}-{m:02d}")
return result
@app.get("/api/stats/monthly")
def stats_monthly(months: int = 13):
from database import SessionLocal
from sqlalchemy import text
from models import SystemSetting, Holiday
db = SessionLocal()
try:
def _setting(key, default):
r = db.query(SystemSetting).filter_by(key=key).first()
return r.value if r else default
_base = _setting("time_metric_base", "occurred")
_raw = _setting("time_metric_worktime", "off")
if _raw == "true": _raw = "worktime"
elif _raw == "false": _raw = "off"
_mode = _raw
if _base == "reported":
t_join = "rep.reported_at"
else:
t_join = "COALESCE(rep.occurred_at, rep.reported_at)"
all_months = _month_range(months)
if _mode in ("worktime", "holiday_24h"):
holiday_dates = {r.holiday_date for r in db.query(Holiday).all()}
calc_fn = _calc_business_hours if _mode == "worktime" else _calc_holiday_excluded_hours
rows = db.execute(text(f"""
SELECT TO_CHAR(r.completed_at, 'YYYY-MM') AS month,
{t_join} AS start_t,
r.completed_at
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '{months} months'
""")).fetchall()
by_month = defaultdict(list)
for row in rows:
if row[1] and row[2]:
by_month[row[0]].append(calc_fn(row[1], row[2], holiday_dates))
data = {}
for m, vals in by_month.items():
data[m] = {"avg": round(sum(vals) / len(vals), 1), "count": len(vals)}
else:
rows = db.execute(text(f"""
SELECT TO_CHAR(r.completed_at, 'YYYY-MM') AS month,
ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1) AS avg_h,
COUNT(*) AS cnt
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND r.completed_at >= NOW() - INTERVAL '{months} months'
GROUP BY month
ORDER BY month
""")).fetchall()
data = {row[0]: {"avg": float(row[1]) if row[1] else None, "count": int(row[2])} for row in rows}
# ── 월별 신고 접수 건수 ──
rpt_rows = db.execute(text(f"""
SELECT TO_CHAR(reported_at, 'YYYY-MM') AS month,
COUNT(*) AS total,
COUNT(*) FILTER (WHERE status = 'done') AS done_cnt
FROM reports
WHERE reported_at >= NOW() - INTERVAL '{months} months'
GROUP BY month
ORDER BY month
""")).fetchall()
rpt_map = {row[0]: {"total": int(row[1]), "done": int(row[2])} for row in rpt_rows}
result = []
for m in all_months:
d = data.get(m)
r = rpt_map.get(m)
result.append({
"month": m,
"avg_hours": d["avg"] if d else None,
"count": d["count"] if d else 0,
"report_total": r["total"] if r else 0,
"report_done": r["done"] if r else 0,
})
return {"data": result, "time_metric_worktime": _mode, "time_metric_base": _base}
finally:
db.close()
@app.get("/api/stats/daily")
def stats_daily(month: str):
"""month: YYYY-MM. Returns day-by-day processing time and report counts."""
from database import SessionLocal
from sqlalchemy import text
from models import SystemSetting, Holiday
try:
year, mon = int(month[:4]), int(month[5:7])
except (ValueError, IndexError):
raise HTTPException(400, "month must be YYYY-MM format")
db = SessionLocal()
try:
def _setting(key, default):
r = db.query(SystemSetting).filter_by(key=key).first()
return r.value if r else default
_base = _setting("time_metric_base", "occurred")
_raw = _setting("time_metric_worktime", "off")
if _raw == "true": _raw = "worktime"
elif _raw == "false": _raw = "off"
_mode = _raw
if _base == "reported":
t_join = "rep.reported_at"
t_plain = "reported_at"
else:
t_join = "COALESCE(rep.occurred_at, rep.reported_at)"
t_plain = "COALESCE(occurred_at, reported_at)"
_, days_in_month = cal_module.monthrange(year, mon)
all_days = [f"{year:04d}-{mon:02d}-{d:02d}" for d in range(1, days_in_month + 1)]
if _mode in ("worktime", "holiday_24h"):
holiday_dates = {r.holiday_date for r in db.query(Holiday).all()}
calc_fn = _calc_business_hours if _mode == "worktime" else _calc_holiday_excluded_hours
rows = db.execute(text(f"""
SELECT TO_CHAR(r.completed_at, 'YYYY-MM-DD') AS day,
{t_join} AS start_t,
r.completed_at
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND TO_CHAR(r.completed_at, 'YYYY-MM') = :month
"""), {"month": month}).fetchall()
by_day = defaultdict(list)
for row in rows:
if row[1] and row[2]:
by_day[row[0]].append(calc_fn(row[1], row[2], holiday_dates))
data = {}
for d, vals in by_day.items():
data[d] = {"avg": round(sum(vals) / len(vals), 1), "count": len(vals)}
else:
rows = db.execute(text(f"""
SELECT TO_CHAR(r.completed_at, 'YYYY-MM-DD') AS day,
ROUND(AVG(EXTRACT(EPOCH FROM (r.completed_at - {t_join}))/3600)::numeric, 1) AS avg_h,
COUNT(*) AS cnt
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
WHERE r.completed_at IS NOT NULL
AND TO_CHAR(r.completed_at, 'YYYY-MM') = :month
GROUP BY day
ORDER BY day
"""), {"month": month}).fetchall()
data = {row[0]: {"avg": float(row[1]) if row[1] else None, "count": int(row[2])} for row in rows}
rpt_rows = db.execute(text(f"""
SELECT TO_CHAR(reported_at, 'YYYY-MM-DD') AS day,
COUNT(*) AS total,
COUNT(*) FILTER (WHERE status = 'done') AS done_cnt
FROM reports
WHERE TO_CHAR(reported_at, 'YYYY-MM') = :month
GROUP BY day
ORDER BY day
"""), {"month": month}).fetchall()
rpt_map = {row[0]: {"total": int(row[1]), "done": int(row[2])} for row in rpt_rows}
result = []
for day in all_days:
d = data.get(day)
r = rpt_map.get(day)
result.append({
"day": day,
"avg_hours": d["avg"] if d else None,
"count": d["count"] if d else 0,
"report_total": r["total"] if r else 0,
"report_done": r["done"] if r else 0,
})
return {"data": result, "time_metric_worktime": _mode, "time_metric_base": _base, "month": month}
finally:
db.close()
@app.get("/api/stats/daily/detail")
def stats_daily_detail(day: str):
"""day: YYYY-MM-DD. Returns per-repair and per-report detail for that day."""
import json as _json
from database import SessionLocal
from sqlalchemy import text
from models import SystemSetting, Holiday
try:
datetime.strptime(day, "%Y-%m-%d")
except ValueError:
raise HTTPException(400, "day must be YYYY-MM-DD format")
db = SessionLocal()
try:
def _setting(key, default):
r = db.query(SystemSetting).filter_by(key=key).first()
return r.value if r else default
_base = _setting("time_metric_base", "occurred")
_raw = _setting("time_metric_worktime", "off")
if _raw == "true": _raw = "worktime"
elif _raw == "false": _raw = "off"
_mode = _raw
if _base == "reported":
t_join = "rep.reported_at"
else:
t_join = "COALESCE(rep.occurred_at, rep.reported_at)"
if _mode in ("worktime", "holiday_24h"):
holiday_dates = {r.holiday_date for r in db.query(Holiday).all()}
calc_fn = _calc_business_hours if _mode == "worktime" else _calc_holiday_excluded_hours
# ── 처리 완료 내역 ──
repair_rows = db.execute(text(f"""
SELECT r.id AS repair_id,
rep.id AS report_id,
rep.charger_id,
c.station_name,
rep.issue_types,
{t_join} AS start_t,
r.completed_at,
u.name AS mechanic_name
FROM repairs r
JOIN repair_reports rr ON rr.repair_id = r.id
JOIN reports rep ON rep.id = rr.report_id
LEFT JOIN users u ON u.id = r.mechanic_id
LEFT JOIN chargers c ON c.id = rep.charger_id
WHERE r.completed_at IS NOT NULL
AND TO_CHAR(r.completed_at, 'YYYY-MM-DD') = :day
ORDER BY r.completed_at
"""), {"day": day}).fetchall()
repairs = []
for row in repair_rows:
start_t, completed = row[5], row[6]
if _mode in ("worktime", "holiday_24h") and start_t and completed:
h = calc_fn(start_t, completed, holiday_dates)
elif start_t and completed:
h = round((completed - start_t).total_seconds() / 3600, 1)
else:
h = None
try:
issues = _json.loads(row[4]) if row[4] else []
except Exception:
issues = []
repairs.append({
"repair_id": row[0],
"report_id": row[1],
"charger_id": row[2] or "",
"station_name": row[3] or "",
"issue_types": issues,
"start_t": start_t.isoformat() if start_t else None,
"completed_at": completed.isoformat() if completed else None,
"processing_hours": h,
"mechanic_name": row[7] or "",
})
# ── 신고 접수 내역 ──
rpt_rows = db.execute(text("""
SELECT rep.id, rep.charger_id, c.station_name,
rep.issue_types, rep.status, rep.reported_at
FROM reports rep
LEFT JOIN chargers c ON c.id = rep.charger_id
WHERE TO_CHAR(rep.reported_at, 'YYYY-MM-DD') = :day
ORDER BY rep.reported_at
"""), {"day": day}).fetchall()
reports = []
for row in rpt_rows:
try:
issues = _json.loads(row[3]) if row[3] else []
except Exception:
issues = []
reports.append({
"id": row[0],
"charger_id": row[1] or "",
"station_name": row[2] or "",
"issue_types": issues,
"status": row[4],
"reported_at": row[5].isoformat() if row[5] else None,
})
return {"day": day, "repairs": repairs, "reports": reports}
finally:
db.close()
@app.get("/api/stats/top-chargers")
def stats_top_chargers(limit: int = 10):
"""충전기별 누적 고장 신고 건수 Top N."""
from database import SessionLocal
from sqlalchemy import text
db = SessionLocal()
try:
rows = db.execute(text("""
SELECT rep.charger_id,
COALESCE(c.station_name, rep.charger_id) AS station_name,
COALESCE(c.name, '') AS charger_name,
COUNT(*) AS total,
COUNT(*) FILTER (WHERE rep.status = 'done') AS done_cnt,
COUNT(*) FILTER (WHERE rep.status != 'done') AS active_cnt
FROM reports rep
LEFT JOIN chargers c ON c.id = rep.charger_id
GROUP BY rep.charger_id, c.station_name, c.name
ORDER BY total DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
return [
{
"charger_id": row[0],
"station_name": row[1],
"charger_name": row[2],
"total": int(row[3]),
"done": int(row[4]),
"active": int(row[5]),
}
for row in rows
]
finally:
db.close()

View File

@@ -9,6 +9,8 @@ class ChargerType(Base):
description = Column(Text)
created_at = Column(TIMESTAMP, server_default=func.now())
chargers = relationship("Charger", back_populates="charger_type")
errors = relationship("ChargerTypeError", back_populates="charger_type",
cascade="all, delete-orphan", order_by="ChargerTypeError.display_order")
class User(Base):
__tablename__ = "users"
@@ -21,6 +23,7 @@ class User(Base):
phone = Column(String(20))
email = Column(String(100))
is_active = Column(Boolean, default=True)
is_pending = Column(Boolean, default=False)
created_at = Column(TIMESTAMP, server_default=func.now())
class Charger(Base):
@@ -52,10 +55,14 @@ class Report(Base):
gps_lat = Column(Float)
gps_lng = Column(Float)
status = Column(String(30), default="pending")
ocpp_log = Column(Text)
source = Column(String(20), default="qr") # qr | admin
reported_by = Column(Integer, ForeignKey("users.id"), nullable=True)
reported_at = Column(TIMESTAMP, server_default=func.now())
charger = relationship("Charger", back_populates="reports")
photos = relationship("ReportPhoto", back_populates="report", cascade="all, delete-orphan")
repair_links = relationship("RepairReport", back_populates="report")
reporter = relationship("User", foreign_keys=[reported_by])
class ReportPhoto(Base):
__tablename__ = "report_photos"
@@ -74,7 +81,12 @@ class Repair(Base):
started_at = Column(TIMESTAMP, nullable=False)
completed_at = Column(TIMESTAMP)
result_status = Column(String(20), default="done")
mechanic_lat = Column(Float)
mechanic_lng = Column(Float)
approved_at = Column(TIMESTAMP)
approved_by = Column(Integer, ForeignKey("users.id"))
mechanic = relationship("User", foreign_keys=[mechanic_id])
approver = relationship("User", foreign_keys=[approved_by])
report_links = relationship("RepairReport", back_populates="repair", cascade="all, delete-orphan")
photos = relationship("RepairPhoto", back_populates="repair", cascade="all, delete-orphan")
cost = relationship("RepairCost", back_populates="repair", uselist=False)
@@ -161,8 +173,36 @@ class ImprovementLog(Base):
improvement = relationship("Improvement", back_populates="logs")
changer = relationship("User")
class ChargerTypeError(Base):
__tablename__ = "charger_type_errors"
id = Column(Integer, primary_key=True)
charger_type_id = Column(Integer, ForeignKey("charger_types.id", ondelete="CASCADE"), nullable=False)
error_code = Column(String(20), nullable=False)
error_name = Column(String(100), nullable=False)
range_condition = Column(String(200))
description = Column(Text)
auto_recovery = Column(Boolean, default=True)
display_order = Column(Integer, default=0)
charger_type = relationship("ChargerType", back_populates="errors")
class Manufacturer(Base):
__tablename__ = "manufacturers"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
representative_name = Column(String(100))
business_number = Column(String(50))
phone = Column(String(30))
address = Column(Text)
is_active = Column(Boolean, default=True)
created_at = Column(TIMESTAMP, server_default=func.now())
class SystemSetting(Base):
__tablename__ = "system_settings"
key = Column(String(100), primary_key=True)
value = Column(Text, nullable=False)
updated_at = Column(TIMESTAMP, server_default=func.now(), onupdate=func.now())
class Holiday(Base):
__tablename__ = "holidays"
holiday_date = Column(Date, primary_key=True)
name = Column(String(100), nullable=False)

View File

@@ -1,6 +1,6 @@
from fastapi import APIRouter, Depends, HTTPException, Form
from fastapi import APIRouter, Depends, HTTPException, Form, Body
from sqlalchemy.orm import Session
from typing import Optional
from typing import List, Optional
from database import get_db
import models
from auth import require_admin, hash_password, get_current_user
@@ -15,6 +15,7 @@ def list_users(role: Optional[str] = None, db: Session = Depends(get_db), _=Depe
"id": u.id, "username": u.username, "role": u.role,
"company": u.company, "name": u.name, "phone": u.phone,
"email": u.email, "is_active": u.is_active,
"is_pending": getattr(u, 'is_pending', False),
"created_at": u.created_at.isoformat(),
} for u in q.order_by(models.User.id).all()]
@@ -53,6 +54,29 @@ def update_user(
db.commit()
return {"ok": True}
@router.patch("/{user_id}/approve")
def approve_user(user_id: int, db: Session = Depends(get_db), _=Depends(require_admin)):
u = db.query(models.User).filter_by(id=user_id).first()
if not u: raise HTTPException(404)
u.is_active = True
u.is_pending = False
db.commit()
return {"ok": True}
@router.delete("/bulk")
def bulk_delete_accounts(
ids: List[int] = Body(...),
db: Session = Depends(get_db),
current_user: models.User = Depends(require_admin)
):
if not ids:
raise HTTPException(400, "삭제할 항목을 선택하세요.")
safe_ids = [i for i in ids if i != current_user.id]
count = db.query(models.User).filter(models.User.id.in_(safe_ids)).update(
{"is_active": False}, synchronize_session=False)
db.commit()
return {"deactivated": count}
@router.delete("/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db),
current_user: models.User = Depends(require_admin)):

View File

@@ -1,20 +1,24 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Form
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import Optional
from database import get_db
import models
from auth import verify_password, create_access_token, get_current_user
from auth import verify_password, create_access_token, get_current_user, hash_password
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/login")
def login(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(models.User).filter(
models.User.username == form.username,
models.User.is_active == True
models.User.username == form.username
).first()
if not user or not verify_password(form.password, user.password_hash):
raise HTTPException(status_code=401, detail="아이디 또는 비밀번호가 올바르지 않습니다.")
if getattr(user, 'is_pending', False):
raise HTTPException(status_code=403, detail="가입 승인 대기 중입니다. 관리자 승인 후 이용 가능합니다.")
if not user.is_active:
raise HTTPException(status_code=403, detail="비활성화된 계정입니다. 관리자에게 문의하세요.")
token = create_access_token({"sub": str(user.id)})
return {
"access_token": token,
@@ -24,6 +28,30 @@ def login(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get
"user_id": user.id
}
@router.post("/register")
def register(
username: str = Form(...),
password: str = Form(...),
name: str = Form(...),
phone: str = Form(""),
company: str = Form(""),
db: Session = Depends(get_db)
):
if db.query(models.User).filter_by(username=username).first():
raise HTTPException(400, "이미 사용 중인 아이디입니다.")
user = models.User(
username=username,
password_hash=hash_password(password),
role="mechanic",
name=name,
phone=phone or None,
company=company or None,
is_active=False,
is_pending=True,
)
db.add(user); db.commit()
return {"ok": True}
@router.get("/me")
def me(current_user: models.User = Depends(get_current_user)):
return {

View File

@@ -1,8 +1,8 @@
import os
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Body
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from typing import Optional
from typing import List, Optional
from database import get_db
import models
from auth import require_admin, get_current_user
@@ -41,7 +41,80 @@ def delete_type(type_id: int, db: Session = Depends(get_db), _=Depends(require_a
db.delete(t); db.commit()
return {"ok": True}
# ── 충전기 종류별 에러 코드 ──────────────────────────
@router.get("/types/{type_id}/errors")
def list_type_errors(type_id: int, db: Session = Depends(get_db)):
errors = (db.query(models.ChargerTypeError)
.filter_by(charger_type_id=type_id)
.order_by(models.ChargerTypeError.display_order)
.all())
return [{"id": e.id, "error_code": e.error_code, "error_name": e.error_name,
"range_condition": e.range_condition, "description": e.description,
"auto_recovery": e.auto_recovery, "display_order": e.display_order}
for e in errors]
@router.post("/types/{type_id}/errors")
def create_type_error(
type_id: int,
error_code: str = Form(...), error_name: str = Form(...),
range_condition: str = Form(""), description: str = Form(""),
auto_recovery: bool = Form(True), display_order: int = Form(0),
db: Session = Depends(get_db), _=Depends(require_admin)
):
if not db.query(models.ChargerType).filter_by(id=type_id).first():
raise HTTPException(404)
e = models.ChargerTypeError(
charger_type_id=type_id, error_code=error_code, error_name=error_name,
range_condition=range_condition or None, description=description or None,
auto_recovery=auto_recovery, display_order=display_order
)
db.add(e); db.commit(); db.refresh(e)
return {"id": e.id}
@router.put("/types/{type_id}/errors/{error_id}")
def update_type_error(
type_id: int, error_id: int,
error_code: str = Form(...), error_name: str = Form(...),
range_condition: str = Form(""), description: str = Form(""),
auto_recovery: bool = Form(True), display_order: int = Form(0),
db: Session = Depends(get_db), _=Depends(require_admin)
):
e = db.query(models.ChargerTypeError).filter_by(id=error_id, charger_type_id=type_id).first()
if not e: raise HTTPException(404)
e.error_code = error_code; e.error_name = error_name
e.range_condition = range_condition or None; e.description = description or None
e.auto_recovery = auto_recovery; e.display_order = display_order
db.commit()
return {"ok": True}
@router.delete("/types/{type_id}/errors/{error_id}")
def delete_type_error(
type_id: int, error_id: int,
db: Session = Depends(get_db), _=Depends(require_admin)
):
e = db.query(models.ChargerTypeError).filter_by(id=error_id, charger_type_id=type_id).first()
if not e: raise HTTPException(404)
db.delete(e); db.commit()
return {"ok": True}
# ── 충전기 ──────────────────────────────────────────
@router.delete("/bulk")
def bulk_delete_chargers(
ids: List[str] = Body(...),
db: Session = Depends(get_db),
_=Depends(require_admin)
):
if not ids:
raise HTTPException(400, "삭제할 항목을 선택하세요.")
has_reports = db.query(models.Report.charger_id).filter(
models.Report.charger_id.in_(ids)).distinct().all()
if has_reports:
blocked = [r[0] for r in has_reports]
raise HTTPException(400, f"신고 내역이 있는 충전기는 삭제할 수 없습니다: {', '.join(blocked)}")
result = db.query(models.Charger).filter(models.Charger.id.in_(ids)).delete(synchronize_session=False)
db.commit()
return {"deleted": result}
@router.get("")
def list_chargers(db: Session = Depends(get_db)):
chargers = db.query(models.Charger).order_by(models.Charger.id).all()
@@ -62,6 +135,19 @@ def list_chargers(db: Session = Depends(get_db)):
})
return result
@router.get("/{charger_id}/errors")
def get_charger_errors(charger_id: str, db: Session = Depends(get_db)):
c = db.query(models.Charger).filter_by(id=charger_id).first()
if not c: raise HTTPException(404)
if not c.charger_type_id: return []
errors = (db.query(models.ChargerTypeError)
.filter_by(charger_type_id=c.charger_type_id)
.order_by(models.ChargerTypeError.display_order)
.all())
return [{"id": e.id, "error_code": e.error_code, "error_name": e.error_name,
"range_condition": e.range_condition, "auto_recovery": e.auto_recovery}
for e in errors]
@router.get("/{charger_id}")
def get_charger(charger_id: str, db: Session = Depends(get_db)):
c = db.query(models.Charger).filter_by(id=charger_id).first()

View File

@@ -1,7 +1,7 @@
from fastapi import APIRouter, Depends, HTTPException, Form
from fastapi import APIRouter, Depends, HTTPException, Form, Body
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import Optional
from sqlalchemy import desc, text
from typing import List, Optional
from datetime import datetime
from database import get_db
import models
@@ -48,6 +48,18 @@ def list_costs(
})
return result
@router.delete("/bulk")
def bulk_delete_costs(
ids: List[int] = Body(...),
db: Session = Depends(get_db),
_=Depends(require_admin)
):
if not ids:
raise HTTPException(400, "삭제할 항목을 선택하세요.")
result = db.execute(text("DELETE FROM repair_costs WHERE id = ANY(:ids)"), {"ids": ids})
db.commit()
return {"deleted": result.rowcount}
@router.get("/stats")
def cost_stats(db: Session = Depends(get_db), _=Depends(require_admin)):
from sqlalchemy import func, extract

View File

@@ -77,7 +77,7 @@ def export_reports(db: Session = Depends(get_db), _=Depends(require_admin)):
headers = [
"접수번호","충전기ID","충전기종류","충전기명","충전소명","CPO명","설치일",
"신고위치(위도)","신고위치(경도)","문제유형","에러코드","상세설명",
"신고자연락처","문제발생시각","신고일시","처리상태",
"신고자연락처","문제발생시각","신고일시","신고출처","신고자","처리상태",
"담당정비사","정비사소속","조치유형","조치내용",
"조치시작","조치완료","작업소요시간","신고→완료소요시간",
"문제원인(관리자)","비고","출장비부담주체","출장비금액(원)","출장비상태",
@@ -85,7 +85,7 @@ def export_reports(db: Session = Depends(get_db), _=Depends(require_admin)):
]
style_header(ws, headers)
col_widths = [10,14,14,14,18,14,12,12,12,22,12,24,14,16,16,12,
col_widths = [10,14,14,14,18,14,12,12,12,22,12,24,14,16,16,10,16,12,
12,14,16,24,16,16,12,18,24,24,16,12,12,12,16,18]
for i, w in enumerate(col_widths, 1):
ws.column_dimensions[ws.cell(1, i).column_letter].width = w
@@ -99,9 +99,10 @@ def export_reports(db: Session = Depends(get_db), _=Depends(require_admin)):
ir.improvement_id
for ir in db.query(models.ImprovementReport).filter_by(report_id=r.id).all()
]
seq_no = row_num - 1 # 순차번호 (1부터 시작)
row_data = [
r.id,
seq_no,
r.charger_id,
c.charger_type.name if c and c.charger_type else "",
c.name if c else "",
@@ -116,6 +117,8 @@ def export_reports(db: Session = Depends(get_db), _=Depends(require_admin)):
r.contact or "",
fmt_dt(r.occurred_at),
fmt_dt(r.reported_at),
{"qr": "QR스캔", "admin": "관리자접수", "dashboard": "대시보드접수"}.get(r.source or "qr", r.source or "qr"),
r.reporter.name if r.reporter else "",
r.status,
repair.mechanic.name if repair and repair.mechanic else "",
repair.mechanic.company if repair and repair.mechanic else "",
@@ -211,17 +214,30 @@ def export_improvements(db: Session = Depends(get_db), _=Depends(require_admin))
headers = [
"번호","제목","분류","우선순위","개선내용","관련부품",
"담당제조사","담당자","연락처","연결AS건수","연결AS번호",
"담당제조사","담당자","연락처","연결AS건수","연결AS번호","연결AS신고자",
"진행상태","SW배포목표일","SW실제배포일","제조사메모",
"등록관리자","등록일시"
]
style_header(ws, headers)
for i, w in enumerate([8,24,10,10,30,14,16,12,14,10,18,12,14,14,24,12,16], 1):
for i, w in enumerate([8,24,10,10,30,14,16,12,14,10,18,24,12,14,14,24,12,16], 1):
ws.column_dimensions[ws.cell(1, i).column_letter].width = w
imps = db.query(models.Improvement).order_by(desc(models.Improvement.created_at)).all()
for row_num, imp in enumerate(imps, 2):
rids = [ir.report_id for ir in imp.report_links]
reporters = []
for ir in imp.report_links:
r = ir.report
if not r:
continue
if r.source == "admin" and r.reporter:
reporters.append(f"#{r.id} {r.reporter.name}(관리자)")
elif r.contact:
reporters.append(f"#{r.id} {r.contact}(QR)")
else:
reporters.append(f"#{r.id} 익명(QR)")
row_data = [
imp.id, imp.title, imp.category, imp.priority,
imp.description, imp.part_name or "",
@@ -230,6 +246,7 @@ def export_improvements(db: Session = Depends(get_db), _=Depends(require_admin))
imp.manufacturer.phone if imp.manufacturer else "",
len(rids),
", ".join(str(i) for i in rids),
"\n".join(reporters),
imp.status,
fmt_d(imp.sw_deploy_target),
fmt_d(imp.sw_deployed_at),

View File

@@ -0,0 +1,68 @@
from fastapi import APIRouter, Depends, HTTPException, Form, Request
from sqlalchemy.orm import Session
from sqlalchemy import extract
from database import get_db
from auth import require_admin
import models
from datetime import date as date_type
router = APIRouter(prefix="/api/holidays", tags=["holidays"])
@router.get("")
def list_holidays(year: int = None, db: Session = Depends(get_db), _=Depends(require_admin)):
q = db.query(models.Holiday)
if year:
q = q.filter(extract('year', models.Holiday.holiday_date) == year)
rows = q.order_by(models.Holiday.holiday_date).all()
return [{"date": str(r.holiday_date), "name": r.name} for r in rows]
@router.post("")
def add_holiday(
holiday_date: str = Form(...),
name: str = Form(...),
db: Session = Depends(get_db),
_=Depends(require_admin),
):
try:
d = date_type.fromisoformat(holiday_date)
except ValueError:
raise HTTPException(400, "날짜 형식이 올바르지 않습니다 (YYYY-MM-DD).")
if db.query(models.Holiday).filter_by(holiday_date=d).first():
raise HTTPException(400, "이미 등록된 날짜입니다.")
db.add(models.Holiday(holiday_date=d, name=name))
db.commit()
return {"ok": True}
@router.post("/bulk")
async def bulk_add_holidays(request: Request, db: Session = Depends(get_db), _=Depends(require_admin)):
"""JSON body: [{"date": "YYYY-MM-DD", "name": "공휴일명"}, ...]"""
items = await request.json()
if not isinstance(items, list):
raise HTTPException(400, "배열 형식이어야 합니다.")
added = 0
for item in items:
try:
d = date_type.fromisoformat(item["date"])
except (KeyError, ValueError):
continue
if not db.query(models.Holiday).filter_by(holiday_date=d).first():
db.add(models.Holiday(holiday_date=d, name=item.get("name", "공휴일")))
added += 1
db.commit()
return {"ok": True, "added": added}
@router.delete("/{holiday_date}")
def delete_holiday(
holiday_date: str,
db: Session = Depends(get_db),
_=Depends(require_admin),
):
try:
d = date_type.fromisoformat(holiday_date)
except ValueError:
raise HTTPException(400, "날짜 형식이 올바르지 않습니다.")
h = db.query(models.Holiday).filter_by(holiday_date=d).first()
if not h:
raise HTTPException(404, "등록된 공휴일이 아닙니다.")
db.delete(h); db.commit()
return {"ok": True}

View File

@@ -1,7 +1,7 @@
import json
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Body
from sqlalchemy.orm import Session
from sqlalchemy import desc
from sqlalchemy import desc, text
from typing import List, Optional
from datetime import datetime
from database import get_db
@@ -86,6 +86,18 @@ async def create_improvement(
db.commit()
return {"id": imp.id}
@router.delete("/bulk")
def bulk_delete_improvements(
ids: List[int] = Body(...),
db: Session = Depends(get_db),
_=Depends(require_admin)
):
if not ids:
raise HTTPException(400, "삭제할 항목을 선택하세요.")
result = db.execute(text("DELETE FROM improvements WHERE id = ANY(:ids)"), {"ids": ids})
db.commit()
return {"deleted": result.rowcount}
@router.patch("/{imp_id}/status")
def update_status(
imp_id: int, status: str = Form(...), memo: str = Form(""),

View File

@@ -0,0 +1,85 @@
from fastapi import APIRouter, Depends, HTTPException, Form
from sqlalchemy.orm import Session
from typing import Optional
from database import get_db
import models
from auth import require_admin
router = APIRouter(prefix="/api/manufacturers", tags=["manufacturers"])
def _fmt(m: models.Manufacturer) -> dict:
return {
"id": m.id, "name": m.name,
"representative_name": m.representative_name,
"business_number": m.business_number,
"phone": m.phone, "address": m.address,
"is_active": m.is_active,
"created_at": m.created_at.isoformat() if m.created_at else None,
}
@router.get("/public")
def list_public(db: Session = Depends(get_db)):
"""인증 없이 활성 제조사 목록 반환 (회원가입 화면용)"""
rows = db.query(models.Manufacturer).filter_by(is_active=True).order_by(models.Manufacturer.name).all()
return [{"id": m.id, "name": m.name} for m in rows]
@router.get("")
def list_manufacturers(db: Session = Depends(get_db), _=Depends(require_admin)):
rows = db.query(models.Manufacturer).order_by(models.Manufacturer.name).all()
return [_fmt(m) for m in rows]
@router.post("")
def create_manufacturer(
name: str = Form(...),
representative_name: str = Form(""),
business_number: str = Form(""),
phone: str = Form(""),
address: str = Form(""),
db: Session = Depends(get_db),
_=Depends(require_admin)
):
if db.query(models.Manufacturer).filter_by(name=name).first():
raise HTTPException(400, "이미 등록된 회사명입니다.")
m = models.Manufacturer(
name=name,
representative_name=representative_name or None,
business_number=business_number or None,
phone=phone or None,
address=address or None,
)
db.add(m); db.commit(); db.refresh(m)
return _fmt(m)
@router.put("/{mfr_id}")
def update_manufacturer(
mfr_id: int,
name: str = Form(...),
representative_name: str = Form(""),
business_number: str = Form(""),
phone: str = Form(""),
address: str = Form(""),
is_active: str = Form("true"),
db: Session = Depends(get_db),
_=Depends(require_admin)
):
m = db.query(models.Manufacturer).filter_by(id=mfr_id).first()
if not m: raise HTTPException(404)
dup = db.query(models.Manufacturer).filter(
models.Manufacturer.name == name, models.Manufacturer.id != mfr_id
).first()
if dup: raise HTTPException(400, "이미 사용 중인 회사명입니다.")
m.name = name
m.representative_name = representative_name or None
m.business_number = business_number or None
m.phone = phone or None
m.address = address or None
m.is_active = is_active == "true"
db.commit()
return _fmt(m)
@router.delete("/{mfr_id}")
def delete_manufacturer(mfr_id: int, db: Session = Depends(get_db), _=Depends(require_admin)):
m = db.query(models.Manufacturer).filter_by(id=mfr_id).first()
if not m: raise HTTPException(404)
db.delete(m); db.commit()
return {"ok": True}

View File

@@ -6,11 +6,57 @@ from typing import List, Optional
from datetime import datetime
from database import get_db
import models
from auth import require_mechanic, get_current_user
from auth import require_mechanic, require_admin, get_current_user
from utils import save_upload
router = APIRouter(prefix="/api/repairs", tags=["repairs"])
STATUS_MAP = {
"done": "done",
"waiting": "waiting",
"revisit": "revisit",
"in_progress": "in_progress",
}
def _fmt_repair(repair: models.Repair) -> dict:
reports = []
charger_id = None
station_name = None
charger_name = None
for link in repair.report_links:
r = link.report
if r:
if not charger_id and r.charger:
charger_id = r.charger_id
station_name = r.charger.station_name
charger_name = r.charger.name
reports.append({
"id": r.id,
"charger_id": r.charger_id,
"issue_types": r.issue_types,
"status": r.status,
})
return {
"id": repair.id,
"charger_id": charger_id,
"charger_name": charger_name,
"station_name": station_name,
"repair_types": repair.repair_types,
"description": repair.description,
"result_status": repair.result_status,
"mechanic_lat": repair.mechanic_lat,
"mechanic_lng": repair.mechanic_lng,
"started_at": repair.started_at.isoformat(),
"completed_at": repair.completed_at.isoformat() if repair.completed_at else None,
"approved_at": repair.approved_at.isoformat() if repair.approved_at else None,
"approved_by_name": repair.approver.name if repair.approved_by and repair.approver else None,
"photos_before": [{"id": p.id, "path": p.file_path} for p in repair.photos if p.photo_type == "before"],
"photos_after": [{"id": p.id, "path": p.file_path} for p in repair.photos if p.photo_type == "after"],
"reports": reports,
"report_count": len(reports),
}
@router.get("/pending")
def pending_reports(db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)):
@@ -21,6 +67,10 @@ def pending_reports(db: Session = Depends(get_db),
result = []
for r in q.all():
c = r.charger
# in_progress 신고는 연결된 repair_id 포함 → 편집 모드로 연결
repair_id = None
if r.repair_links:
repair_id = r.repair_links[0].repair_id
result.append({
"id": r.id, "charger_id": r.charger_id,
"charger_name": c.name if c else None,
@@ -29,9 +79,23 @@ def pending_reports(db: Session = Depends(get_db),
"issue_types": r.issue_types, "status": r.status,
"reported_at": r.reported_at.isoformat(),
"occurred_at": r.occurred_at.isoformat() if r.occurred_at else None,
"repair_id": repair_id,
"gps_lat": c.gps_lat if c else None,
"gps_lng": c.gps_lng if c else None,
"location_detail": c.location_detail if c else None,
})
return result
@router.get("/my")
def my_repairs(db: Session = Depends(get_db),
current_user: models.User = Depends(require_mechanic)):
repairs = db.query(models.Repair).filter_by(
mechanic_id=current_user.id
).order_by(desc(models.Repair.completed_at)).limit(100).all()
return [_fmt_repair(r) for r in repairs]
@router.get("/charger/{charger_id}/open")
def open_reports_for_charger(charger_id: str, db: Session = Depends(get_db),
_=Depends(require_mechanic)):
@@ -47,72 +111,185 @@ def open_reports_for_charger(charger_id: str, db: Session = Depends(get_db),
"photos": [p.file_path for p in r.photos],
} for r in reports]
@router.get("/{repair_id}")
def get_repair(repair_id: int, db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)):
repair = db.query(models.Repair).filter_by(id=repair_id).first()
if not repair: raise HTTPException(404)
if repair.mechanic_id != current_user.id and current_user.role != "admin":
raise HTTPException(403, "접근 권한이 없습니다.")
return _fmt_repair(repair)
@router.post("")
async def create_repair(
report_ids: str = Form(...), # JSON 배열
repair_types: str = Form(...), # JSON 배열
report_ids: str = Form(...),
repair_types: str = Form(...),
description: str = Form(...),
result_status: str = Form("done"),
mechanic_lat: Optional[float] = Form(None),
mechanic_lng: Optional[float] = Form(None),
photos_before: List[UploadFile] = File(default=[]),
photos_after: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db),
current_user: models.User = Depends(require_mechanic)
):
rids = json.loads(report_ids)
rtypes = json.loads(repair_types)
repair = models.Repair(
mechanic_id=current_user.id,
repair_types=rtypes,
repair_types=json.loads(repair_types),
description=description,
started_at=datetime.now(),
completed_at=datetime.now(),
result_status=result_status,
mechanic_lat=mechanic_lat,
mechanic_lng=mechanic_lng,
)
db.add(repair); db.commit(); db.refresh(repair)
# 신고 연결 및 상태 업데이트
for rid in rids:
r = db.query(models.Report).filter_by(id=rid).first()
if r:
new_status = "done" if result_status == "done" else (
"waiting" if result_status == "waiting" else "revisit"
)
r.status = new_status
r.status = STATUS_MAP.get(result_status, "in_progress")
db.add(models.RepairReport(repair_id=repair.id, report_id=rid))
# 사진 저장
for photo in photos_before:
if photo.filename:
path = save_upload(photo, f"repairs/{repair.id}")
db.add(models.RepairPhoto(repair_id=repair.id, photo_type="before", file_path=path))
db.add(models.RepairPhoto(repair_id=repair.id, photo_type="before",
file_path=save_upload(photo, f"repairs/{repair.id}")))
for photo in photos_after:
if photo.filename:
path = save_upload(photo, f"repairs/{repair.id}")
db.add(models.RepairPhoto(repair_id=repair.id, photo_type="after", file_path=path))
db.add(models.RepairPhoto(repair_id=repair.id, photo_type="after",
file_path=save_upload(photo, f"repairs/{repair.id}")))
db.commit()
return {"id": repair.id}
@router.get("/my")
def my_repairs(db: Session = Depends(get_db),
current_user: models.User = Depends(require_mechanic)):
repairs = db.query(models.Repair).filter_by(
mechanic_id=current_user.id
).order_by(desc(models.Repair.completed_at)).limit(50).all()
result = []
for repair in repairs:
rids = [rr.report_id for rr in repair.report_links]
charger_id = None
if rids:
r = db.query(models.Report).filter_by(id=rids[0]).first()
if r: charger_id = r.charger_id
result.append({
"id": repair.id, "charger_id": charger_id,
"repair_types": repair.repair_types,
"result_status": repair.result_status,
"started_at": repair.started_at.isoformat(),
"completed_at": repair.completed_at.isoformat() if repair.completed_at else None,
"report_count": len(rids),
})
return result
@router.put("/{repair_id}")
async def update_repair(
repair_id: int,
repair_types: str = Form(...),
description: str = Form(...),
result_status: str = Form("done"),
mechanic_lat: Optional[float] = Form(None),
mechanic_lng: Optional[float] = Form(None),
photos_before: List[UploadFile] = File(default=[]),
photos_after: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db),
current_user: models.User = Depends(require_mechanic)
):
repair = db.query(models.Repair).filter_by(id=repair_id).first()
if not repair: raise HTTPException(404)
if repair.mechanic_id != current_user.id:
raise HTTPException(403, "본인 조치 이력만 수정할 수 있습니다.")
if repair.approved_at:
raise HTTPException(403, "관리자가 승인한 조치는 수정할 수 없습니다.")
repair.repair_types = json.loads(repair_types)
repair.description = description
repair.result_status = result_status
repair.completed_at = datetime.now()
if mechanic_lat is not None: repair.mechanic_lat = mechanic_lat
if mechanic_lng is not None: repair.mechanic_lng = mechanic_lng
for link in repair.report_links:
r = link.report
if r: r.status = STATUS_MAP.get(result_status, "in_progress")
for photo in photos_before:
if photo.filename:
db.add(models.RepairPhoto(repair_id=repair_id, photo_type="before",
file_path=save_upload(photo, f"repairs/{repair_id}")))
for photo in photos_after:
if photo.filename:
db.add(models.RepairPhoto(repair_id=repair_id, photo_type="after",
file_path=save_upload(photo, f"repairs/{repair_id}")))
db.commit()
return {"ok": True}
@router.post("/{repair_id}/approve")
def approve_repair(
repair_id: int,
improvement_action: str = Form("none"), # none | link | create
improvement_id: Optional[int] = Form(None),
imp_title: str = Form(""),
imp_category: str = Form(""),
imp_description: str = Form(""),
imp_priority: str = Form("normal"),
imp_manufacturer_id: Optional[int] = Form(None),
db: Session = Depends(get_db),
current_user: models.User = Depends(require_admin)
):
repair = db.query(models.Repair).filter_by(id=repair_id).first()
if not repair: raise HTTPException(404)
repair.approved_at = datetime.now()
repair.approved_by = current_user.id
target_imp_id = None
if improvement_action == "link" and improvement_id:
target_imp_id = improvement_id
elif improvement_action == "create":
if not imp_title or not imp_category or not imp_description:
raise HTTPException(400, "개선항목 제목, 분류, 내용을 모두 입력해 주세요.")
imp = models.Improvement(
title=imp_title, category=imp_category,
description=imp_description, priority=imp_priority,
manufacturer_id=imp_manufacturer_id or None,
created_by=current_user.id,
)
db.add(imp); db.flush()
db.add(models.ImprovementLog(
improvement_id=imp.id, changed_by=current_user.id,
old_status=None, new_status="registered",
memo=f"조치 승인 시 생성 (수리 #{repair_id})"
))
target_imp_id = imp.id
if target_imp_id:
for link in repair.report_links:
exists = db.query(models.ImprovementReport).filter_by(
improvement_id=target_imp_id, report_id=link.report_id
).first()
if not exists:
db.add(models.ImprovementReport(
improvement_id=target_imp_id, report_id=link.report_id
))
db.commit()
return {"ok": True, "improvement_id": target_imp_id}
@router.delete("/{repair_id}")
def cancel_repair(
repair_id: int,
db: Session = Depends(get_db),
_=Depends(require_admin)
):
repair = db.query(models.Repair).filter_by(id=repair_id).first()
if not repair: raise HTTPException(404)
# 연결된 신고를 접수(pending) 상태로 되돌림
for link in repair.report_links:
if link.report:
link.report.status = "pending"
db.delete(repair) # cascade: RepairReport, RepairPhoto 자동 삭제
db.commit()
return {"ok": True}
@router.delete("/{repair_id}/photos/{photo_id}")
def delete_repair_photo(repair_id: int, photo_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(require_mechanic)):
repair = db.query(models.Repair).filter_by(id=repair_id).first()
if not repair: raise HTTPException(404)
if repair.mechanic_id != current_user.id and current_user.role != "admin":
raise HTTPException(403)
if repair.approved_at and current_user.role != "admin":
raise HTTPException(403, "승인된 조치는 수정할 수 없습니다.")
photo = db.query(models.RepairPhoto).filter_by(id=photo_id, repair_id=repair_id).first()
if not photo: raise HTTPException(404)
db.delete(photo); db.commit()
return {"ok": True}

View File

@@ -1,20 +1,27 @@
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Body
from sqlalchemy.orm import Session
from sqlalchemy import desc
from sqlalchemy import desc, text
from typing import List, Optional
from datetime import datetime
import os, uuid
from database import get_db
import models
from auth import require_admin, get_current_user
from utils import save_upload
from auth import require_admin, get_current_user, get_optional_user
from utils import save_upload, UPLOAD_DIR
router = APIRouter(prefix="/api/reports", tags=["reports"])
def _fmt_report(r: models.Report, db: Session):
c = r.charger
repair_id = None
mechanic_name = None
mechanic_company = None
if r.repair_links:
repair_id = r.repair_links[0].repair_id
rep = r.repair_links[0].repair
if rep and rep.mechanic:
mechanic_name = rep.mechanic.name
mechanic_company = rep.mechanic.company
return {
"id": r.id, "charger_id": r.charger_id,
"charger_name": c.name if c else None,
@@ -27,9 +34,17 @@ def _fmt_report(r: models.Report, db: Session):
"occurred_at": r.occurred_at.isoformat() if r.occurred_at else None,
"reported_at": r.reported_at.isoformat() if r.reported_at else None,
"gps_lat": r.gps_lat, "gps_lng": r.gps_lng,
"charger_lat": c.gps_lat if c else None,
"charger_lng": c.gps_lng if c else None,
"location_detail": c.location_detail if c else None,
"status": r.status,
"photos": [p.file_path for p in r.photos],
"ocpp_log": r.ocpp_log,
"source": r.source or "qr",
"reported_by_name": r.reporter.name if r.reporter else None,
"photos": [{"id": p.id, "path": p.file_path} for p in r.photos],
"repair_id": repair_id,
"mechanic_name": mechanic_name,
"mechanic_company": mechanic_company,
}
@router.post("")
@@ -43,8 +58,11 @@ async def create_report(
consent: bool = Form(False),
gps_lat: Optional[float] = Form(None),
gps_lng: Optional[float] = Form(None),
ocpp_log: Optional[str] = Form(None),
source: Optional[str] = Form(None),
photos: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_user: Optional[models.User] = Depends(get_optional_user)
):
import json
charger = db.query(models.Charger).filter_by(id=charger_id).first()
@@ -55,13 +73,21 @@ async def create_report(
policy = setting.value if setting else "immediate"
initial_status = "pending_approval" if policy == "admin_approval" else "pending"
if current_user:
source_value = source if source in ("admin", "dashboard") else "admin"
else:
source_value = "qr"
issue_list = json.loads(issue_types) if isinstance(issue_types, str) else issue_types
r = models.Report(
charger_id=charger_id, issue_types=issue_list,
issue_detail=issue_detail or None, error_code=error_code or None,
occurred_at=datetime.fromisoformat(occurred_at) if occurred_at else None,
contact=contact or None, consent=consent,
gps_lat=gps_lat, gps_lng=gps_lng, status=initial_status
gps_lat=gps_lat, gps_lng=gps_lng, status=initial_status,
ocpp_log=ocpp_log or None,
source=source_value,
reported_by=current_user.id if current_user else None,
)
db.add(r); db.commit(); db.refresh(r)
@@ -72,15 +98,95 @@ async def create_report(
db.commit()
return {"id": r.id, "status": r.status}
@router.post("/batch")
async def create_batch_report(
charger_id: str = Form(...),
scope: str = Form("single"), # single | station | type
issue_types: str = Form(...),
issue_detail: str = Form(""),
error_code: str = Form(""),
occurred_at: Optional[str] = Form(None),
contact: str = Form(""),
consent: bool = Form(False),
gps_lat: Optional[float] = Form(None),
gps_lng: Optional[float] = Form(None),
ocpp_log: Optional[str] = Form(None),
source: Optional[str] = Form(None),
photos: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db),
current_user: Optional[models.User] = Depends(get_optional_user)
):
import json
charger = db.query(models.Charger).filter_by(id=charger_id).first()
if not charger: raise HTTPException(404, "충전기를 찾을 수 없습니다.")
if scope == "station":
targets = db.query(models.Charger).filter_by(
station_name=charger.station_name, is_active=True).all()
elif scope == "type" and charger.charger_type_id:
targets = db.query(models.Charger).filter_by(
charger_type_id=charger.charger_type_id, is_active=True).all()
else:
targets = [charger]
setting = db.query(models.SystemSetting).filter_by(key="report_visibility_policy").first()
policy = setting.value if setting else "immediate"
initial_status = "pending_approval" if policy == "admin_approval" else "pending"
issue_list = json.loads(issue_types) if isinstance(issue_types, str) else issue_types
if current_user:
source_value = source if source in ("admin", "dashboard") else "admin"
else:
source_value = "qr"
# Read all photo bytes upfront so they can be written for each target
photo_data = []
for photo in photos:
if photo.filename:
photo_data.append((photo.filename, photo.file.read()))
created_ids = []
for target in targets:
r = models.Report(
charger_id=target.id, issue_types=issue_list,
issue_detail=issue_detail or None, error_code=error_code or None,
occurred_at=datetime.fromisoformat(occurred_at) if occurred_at else None,
contact=contact or None, consent=consent,
gps_lat=gps_lat, gps_lng=gps_lng, status=initial_status,
ocpp_log=ocpp_log or None,
source=source_value,
reported_by=current_user.id if current_user else None,
)
db.add(r); db.commit(); db.refresh(r)
for fname, content in photo_data:
ext = os.path.splitext(fname)[1].lower() or ".jpg"
sub = f"reports/{r.id}"
folder = os.path.join(UPLOAD_DIR, sub)
os.makedirs(folder, exist_ok=True)
dest = os.path.join(folder, f"{uuid.uuid4().hex}{ext}")
with open(dest, "wb") as f:
f.write(content)
db.add(models.ReportPhoto(report_id=r.id, file_path=f"/uploads/{sub}/{os.path.basename(dest)}"))
db.commit()
created_ids.append(r.id)
return {"ids": created_ids, "count": len(created_ids), "primary_id": created_ids[0] if created_ids else None}
@router.get("")
def list_reports(
status: Optional[str] = None,
charger_id: Optional[str] = None,
active_only: bool = False,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
q = db.query(models.Report).order_by(desc(models.Report.reported_at))
if status: q = q.filter(models.Report.status == status)
if status:
q = q.filter(models.Report.status == status)
elif active_only:
q = q.filter(models.Report.status.in_(
["pending", "pending_approval", "in_progress", "waiting", "revisit"]))
if charger_id: q = q.filter(models.Report.charger_id == charger_id)
# 정비사는 공개된 것만 (승인 대기 제외)
if current_user.role == "mechanic":
@@ -106,6 +212,12 @@ def get_report(report_id: int, db: Session = Depends(get_db),
"started_at": repair.started_at.isoformat(),
"completed_at": repair.completed_at.isoformat() if repair.completed_at else None,
"result_status": repair.result_status,
"mechanic_lat": repair.mechanic_lat,
"mechanic_lng": repair.mechanic_lng,
"charger_lat": r.charger.gps_lat if r.charger else None,
"charger_lng": r.charger.gps_lng if r.charger else None,
"approved_at": repair.approved_at.isoformat() if repair.approved_at else None,
"approved_by_name": repair.approver.name if repair.approved_by and repair.approver else None,
"photos_before": [p.file_path for p in repair.photos if p.photo_type == "before"],
"photos_after": [p.file_path for p in repair.photos if p.photo_type == "after"],
"cost": {
@@ -116,10 +228,92 @@ def get_report(report_id: int, db: Session = Depends(get_db),
"cost_amount": cost.cost_amount,
"cost_status": cost.cost_status,
"manufacturer_name": cost.manufacturer.name if cost.manufacturer else None,
} if cost else None
} if cost else None,
"linked_improvements": _get_linked_improvements(repair, db),
}
return result
def _get_linked_improvements(repair, db):
rids = [lk.report_id for lk in repair.report_links]
if not rids:
return []
imp_links = db.query(models.ImprovementReport).filter(
models.ImprovementReport.report_id.in_(rids)
).all()
seen, result = set(), []
for il in imp_links:
if il.improvement_id not in seen:
seen.add(il.improvement_id)
imp = db.query(models.Improvement).filter_by(id=il.improvement_id).first()
if imp:
result.append({"id": imp.id, "title": imp.title,
"category": imp.category, "status": imp.status})
return result
@router.delete("/bulk")
def bulk_delete_reports(
ids: List[int] = Body(...),
db: Session = Depends(get_db),
_=Depends(require_admin)
):
if not ids:
raise HTTPException(400, "삭제할 항목을 선택하세요.")
db.execute(text("DELETE FROM repair_reports WHERE report_id = ANY(:ids)"), {"ids": ids})
result = db.execute(text("DELETE FROM reports WHERE id = ANY(:ids)"), {"ids": ids})
db.commit()
return {"deleted": result.rowcount}
@router.patch("/{report_id}")
async def update_report(
report_id: int,
issue_types: Optional[str] = Form(None),
issue_detail: Optional[str] = Form(None),
error_code: Optional[str] = Form(None),
contact: Optional[str] = Form(None),
occurred_at: Optional[str] = Form(None),
status: Optional[str] = Form(None),
ocpp_log: Optional[str] = Form(None),
photos: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db),
_=Depends(require_admin)
):
import json
r = db.query(models.Report).filter_by(id=report_id).first()
if not r: raise HTTPException(404)
if issue_types is not None:
r.issue_types = json.loads(issue_types)
if issue_detail is not None:
r.issue_detail = issue_detail or None
if error_code is not None:
r.error_code = error_code or None
if contact is not None:
r.contact = contact or None
if occurred_at is not None:
r.occurred_at = datetime.fromisoformat(occurred_at) if occurred_at else None
if status is not None:
r.status = status
if ocpp_log is not None:
r.ocpp_log = ocpp_log or None
db.commit()
for photo in photos:
if photo.filename:
path = save_upload(photo, f"reports/{report_id}")
db.add(models.ReportPhoto(report_id=report_id, file_path=path))
db.commit()
return {"ok": True}
@router.delete("/{report_id}/photos/{photo_id}")
def delete_report_photo(
report_id: int, photo_id: int,
db: Session = Depends(get_db),
_=Depends(require_admin)
):
photo = db.query(models.ReportPhoto).filter_by(id=photo_id, report_id=report_id).first()
if not photo: raise HTTPException(404)
db.delete(photo)
db.commit()
return {"ok": True}
@router.patch("/{report_id}/approve")
def approve_report(report_id: int, db: Session = Depends(get_db), _=Depends(require_admin)):
r = db.query(models.Report).filter_by(id=report_id).first()

View File

@@ -1,9 +1,9 @@
from fastapi import APIRouter, Depends, Form
from fastapi import APIRouter, Depends, Form, Request
from sqlalchemy.orm import Session
from datetime import datetime
from typing import Optional
from database import get_db
import models
import models, json
from auth import require_admin
router = APIRouter(prefix="/api/settings", tags=["settings"])
@@ -47,13 +47,69 @@ def get_settings(db: Session = Depends(get_db), _=Depends(require_admin)):
result[r.key] = r.value
return result
# ── 관리자 설정 저장 (신고공개정책 + 이미지설정 통합)
DEFAULT_ISSUE_TYPES = [
{"key": "충전불가", "label": "⚡ 충전 불가"},
{"key": "화면오류", "label": "🖥 화면 오류"},
{"key": "케이블불량", "label": "🔌 케이블 불량"},
{"key": "결제오류", "label": "💳 결제 오류"},
{"key": "외관손상", "label": "🔨 외관 손상"},
{"key": "에러발생", "label": "⚠️ 에러 발생"},
{"key": "기타", "label": "📋 기타"},
]
@router.get("/issue-types")
def get_issue_types(db: Session = Depends(get_db)):
row = db.query(models.SystemSetting).filter_by(key="issue_types").first()
if row:
return json.loads(row.value)
return DEFAULT_ISSUE_TYPES
@router.put("/issue-types")
async def update_issue_types(request: Request, db: Session = Depends(get_db), _=Depends(require_admin)):
items = await request.json()
if not isinstance(items, list):
from fastapi import HTTPException
raise HTTPException(400, "배열 형식이어야 합니다.")
upsert(db, "issue_types", json.dumps(items, ensure_ascii=False))
db.commit()
return {"ok": True}
DEFAULT_REPAIR_TYPES = [
{"key": "부품교체", "label": "🔩 부품 교체"},
{"key": "재시작", "label": "🔄 재시작"},
{"key": "설정변경", "label": "⚙️ 설정 변경"},
{"key": "청소", "label": "🧹 청소"},
{"key": "배선정리", "label": "🔌 배선 정리"},
{"key": "펌웨어", "label": "💾 펌웨어 업데이트"},
{"key": "기타", "label": "📋 기타"},
]
@router.get("/repair-types")
def get_repair_types(db: Session = Depends(get_db)):
row = db.query(models.SystemSetting).filter_by(key="repair_types").first()
if row:
return json.loads(row.value)
return DEFAULT_REPAIR_TYPES
@router.put("/repair-types")
async def update_repair_types(request: Request, db: Session = Depends(get_db), _=Depends(require_admin)):
items = await request.json()
if not isinstance(items, list):
from fastapi import HTTPException
raise HTTPException(400, "배열 형식이어야 합니다.")
upsert(db, "repair_types", json.dumps(items, ensure_ascii=False))
db.commit()
return {"ok": True}
# ── 관리자 설정 저장 (신고공개정책 + 이미지설정 + 처리시간기준 통합)
@router.put("")
def update_settings(
report_visibility_policy: str = Form(...),
image_compress_enabled: str = Form("true"),
image_max_px: str = Form("1024"),
image_quality: str = Form("85"),
time_metric_base: str = Form("occurred"),
time_metric_worktime: str = Form("false"),
db: Session = Depends(get_db),
_ = Depends(require_admin)
):
@@ -62,6 +118,8 @@ def update_settings(
("image_compress_enabled", image_compress_enabled),
("image_max_px", image_max_px),
("image_quality", image_quality),
("time_metric_base", time_metric_base),
("time_metric_worktime", time_metric_worktime),
]
for key, val in pairs:
upsert(db, key, val)