Files
ev-charger-as/backend/routers/reports.py
byun af7e47529c 기능 추가 — 신고 상황종료 처리
- DB: reports 테이블에 closure_type, closure_note, closed_at, closed_by 컬럼 추가
- 백엔드: PATCH /reports/{id}/close 엔드포인트 (사유 4종: natural/remote_reset/false_alarm/other)
- 신고상세: 승인대기 상태에서 [상황종료] 버튼 추가, 인라인 패널에서 사유 선택
- 상황종료 후 상세 화면에 사유·메모·처리자·일시 표시
- 엑셀 AS신고목록에 상황종료 4개 컬럼 추가
- 신고목록 필터·지도 상태 목록에 closed 추가, CSS 뱃지 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 09:39:50 +09:00

405 lines
16 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Body
from sqlalchemy.orm import Session
from sqlalchemy import desc, text
from typing import List, Optional
from datetime import datetime
import os, uuid
from database import get_db
import models
from auth import require_admin, get_current_user, get_optional_user
from utils import save_upload, UPLOAD_DIR
# Router for the report endpoints; every route below is served under /api/reports.
router = APIRouter(prefix="/api/reports", tags=["reports"])
def _fmt_report(r: models.Report, db: Session):
    """Serialize a report into the plain dict returned by the report endpoints.

    Charger-derived fields are None when the report has no charger. Repair and
    mechanic fields come from the first repair link only.

    Args:
        r: Report row to serialize.
        db: Database session; part of the signature but not used in the body.

    Returns:
        A dict of report, charger, photo, repair and closure fields, with
        datetimes rendered through ``isoformat()``.
    """
    charger = r.charger

    def charger_attr(name):
        # Every charger-derived field collapses to None without a charger.
        return getattr(charger, name) if charger else None

    # Only the first repair link contributes repair/mechanic details.
    first_link = r.repair_links[0] if r.repair_links else None
    repair = first_link.repair if first_link is not None else None
    mechanic = repair.mechanic if repair else None

    if charger and charger.charger_type:
        charger_type_name = charger.charger_type.name
    else:
        charger_type_name = None

    if charger and charger.installed_at:
        installed_at = str(charger.installed_at)
    else:
        installed_at = None

    photo_items = [{"id": photo.id, "path": photo.file_path} for photo in r.photos]

    return {
        "id": r.id,
        "charger_id": r.charger_id,
        "charger_name": charger_attr("name"),
        "station_name": charger_attr("station_name"),
        "cpo_name": charger_attr("cpo_name"),
        "charger_type": charger_type_name,
        "installed_at": installed_at,
        "issue_types": r.issue_types,
        "issue_detail": r.issue_detail,
        "error_code": r.error_code,
        "contact": r.contact,
        "occurred_at": r.occurred_at.isoformat() if r.occurred_at else None,
        "reported_at": r.reported_at.isoformat() if r.reported_at else None,
        "gps_lat": r.gps_lat,
        "gps_lng": r.gps_lng,
        "charger_lat": charger_attr("gps_lat"),
        "charger_lng": charger_attr("gps_lng"),
        "location_detail": charger_attr("location_detail"),
        "status": r.status,
        "ocpp_log": r.ocpp_log,
        "source": r.source or "qr",
        "reported_by_name": r.reporter.name if r.reporter else None,
        "photos": photo_items,
        "repair_id": first_link.repair_id if first_link is not None else None,
        "mechanic_name": mechanic.name if mechanic else None,
        "mechanic_company": mechanic.company if mechanic else None,
        "closure_type": r.closure_type,
        "closure_note": r.closure_note,
        "closed_at": r.closed_at.isoformat() if r.closed_at else None,
        "closed_by_name": r.closer.name if r.closer else None,
    }
@router.post("")
async def create_report(
    charger_id: str = Form(...),
    issue_types: str = Form(...),  # JSON array encoded as a string
    issue_detail: str = Form(""),
    error_code: str = Form(""),
    occurred_at: Optional[str] = Form(None),
    contact: str = Form(""),
    consent: bool = Form(False),
    gps_lat: Optional[float] = Form(None),
    gps_lng: Optional[float] = Form(None),
    ocpp_log: Optional[str] = Form(None),
    source: Optional[str] = Form(None),
    photos: List[UploadFile] = File(default=[]),
    db: Session = Depends(get_db),
    current_user: Optional[models.User] = Depends(get_optional_user)
):
    """Create one fault report for a charger and store its photos.

    The initial status is ``pending_approval`` when the
    ``report_visibility_policy`` system setting is ``admin_approval``, and
    ``pending`` otherwise. When ``current_user`` is falsy the source is
    recorded as ``qr``; otherwise it is the submitted ``source`` if that is
    ``admin`` or ``dashboard``, else ``admin``.

    Args:
        charger_id: Id of the charger being reported.
        issue_types: JSON-encoded value stored as the report's issue types.
        issue_detail, error_code, contact, ocpp_log: Optional text fields;
            empty strings are stored as None.
        occurred_at: Optional ISO-8601 timestamp of the incident.
        consent: Stored on the report as given.
        gps_lat, gps_lng: Optional coordinates stored on the report.
        source: Requested source label (only honoured for logged-in users).
        photos: Uploaded files; entries without a filename are skipped.

    Returns:
        ``{"id": ..., "status": ...}`` for the created report.

    Raises:
        HTTPException: 404 when the charger does not exist; 400 when
            ``issue_types`` is not valid JSON or ``occurred_at`` is not a
            valid ISO-8601 timestamp.
    """
    import json

    charger = db.query(models.Charger).filter_by(id=charger_id).first()
    if not charger:
        raise HTTPException(404, "충전기를 찾을 수 없습니다.")

    # Validate client-supplied encodings before touching the database so
    # malformed input produces a 400 instead of an unhandled 500.
    try:
        issue_list = json.loads(issue_types) if isinstance(issue_types, str) else issue_types
    except json.JSONDecodeError:
        raise HTTPException(400, "issue_types는 올바른 JSON 형식이어야 합니다.") from None
    try:
        occurred_dt = datetime.fromisoformat(occurred_at) if occurred_at else None
    except ValueError:
        raise HTTPException(400, "occurred_at 형식이 올바르지 않습니다.") from None

    # Report visibility policy decides whether an admin must approve first.
    setting = db.query(models.SystemSetting).filter_by(key="report_visibility_policy").first()
    policy = setting.value if setting else "immediate"
    initial_status = "pending_approval" if policy == "admin_approval" else "pending"

    if current_user:
        source_value = source if source in ("admin", "dashboard") else "admin"
    else:
        source_value = "qr"

    r = models.Report(
        charger_id=charger_id,
        issue_types=issue_list,
        issue_detail=issue_detail or None,
        error_code=error_code or None,
        occurred_at=occurred_dt,
        contact=contact or None,
        consent=consent,
        gps_lat=gps_lat,
        gps_lng=gps_lng,
        status=initial_status,
        ocpp_log=ocpp_log or None,
        source=source_value,
        reported_by=current_user.id if current_user else None,
    )
    db.add(r)
    db.commit()
    db.refresh(r)  # the generated id is needed for the photo folder name

    for photo in photos:
        if photo.filename:
            path = save_upload(photo, f"reports/{r.id}")
            db.add(models.ReportPhoto(report_id=r.id, file_path=path))
    db.commit()
    return {"id": r.id, "status": r.status}
@router.post("/batch")
async def create_batch_report(
    charger_id: str = Form(...),
    scope: str = Form("single"),  # single | station | type
    issue_types: str = Form(...),
    issue_detail: str = Form(""),
    error_code: str = Form(""),
    occurred_at: Optional[str] = Form(None),
    contact: str = Form(""),
    consent: bool = Form(False),
    gps_lat: Optional[float] = Form(None),
    gps_lng: Optional[float] = Form(None),
    ocpp_log: Optional[str] = Form(None),
    source: Optional[str] = Form(None),
    photos: List[UploadFile] = File(default=[]),
    db: Session = Depends(get_db),
    current_user: Optional[models.User] = Depends(get_optional_user)
):
    """Create the same report for one charger or for a group of chargers.

    Target selection by ``scope``:
      * ``station`` - every active charger sharing the charger's station_name.
      * ``type`` - every active charger with the same charger_type_id (only
        when the charger has one).
      * anything else - just the given charger.

    Each target gets its own report row and its own copy of every uploaded
    photo. Status and source are chosen exactly as in ``create_report``.

    Returns:
        ``{"ids": [...], "count": n, "primary_id": first id or None}``.

    Raises:
        HTTPException: 404 when the charger does not exist; 400 when
            ``issue_types`` is not valid JSON or ``occurred_at`` is not a
            valid ISO-8601 timestamp.
    """
    import json

    charger = db.query(models.Charger).filter_by(id=charger_id).first()
    if not charger:
        raise HTTPException(404, "충전기를 찾을 수 없습니다.")

    # Parse and validate once, before any row is written, so a malformed
    # request is rejected with 400 and never leaves a partial batch behind.
    try:
        issue_list = json.loads(issue_types) if isinstance(issue_types, str) else issue_types
    except json.JSONDecodeError:
        raise HTTPException(400, "issue_types는 올바른 JSON 형식이어야 합니다.") from None
    try:
        occurred_dt = datetime.fromisoformat(occurred_at) if occurred_at else None
    except ValueError:
        raise HTTPException(400, "occurred_at 형식이 올바르지 않습니다.") from None

    if scope == "station":
        targets = db.query(models.Charger).filter_by(
            station_name=charger.station_name, is_active=True).all()
    elif scope == "type" and charger.charger_type_id:
        targets = db.query(models.Charger).filter_by(
            charger_type_id=charger.charger_type_id, is_active=True).all()
    else:
        targets = [charger]

    setting = db.query(models.SystemSetting).filter_by(key="report_visibility_policy").first()
    policy = setting.value if setting else "immediate"
    initial_status = "pending_approval" if policy == "admin_approval" else "pending"

    if current_user:
        source_value = source if source in ("admin", "dashboard") else "admin"
    else:
        source_value = "qr"

    # Read all photo bytes upfront so they can be written for each target.
    photo_data = [
        (photo.filename, photo.file.read())
        for photo in photos
        if photo.filename
    ]

    reporter_id = current_user.id if current_user else None
    created_ids = []
    for target in targets:
        r = models.Report(
            charger_id=target.id,
            issue_types=issue_list,
            issue_detail=issue_detail or None,
            error_code=error_code or None,
            occurred_at=occurred_dt,
            contact=contact or None,
            consent=consent,
            gps_lat=gps_lat,
            gps_lng=gps_lng,
            status=initial_status,
            ocpp_log=ocpp_log or None,
            source=source_value,
            reported_by=reporter_id,
        )
        db.add(r)
        db.commit()
        db.refresh(r)  # the generated id names the per-report upload folder

        sub = f"reports/{r.id}"
        folder = os.path.join(UPLOAD_DIR, sub)
        for fname, content in photo_data:
            ext = os.path.splitext(fname)[1].lower() or ".jpg"
            os.makedirs(folder, exist_ok=True)
            stored_name = f"{uuid.uuid4().hex}{ext}"
            with open(os.path.join(folder, stored_name), "wb") as f:
                f.write(content)
            db.add(models.ReportPhoto(report_id=r.id, file_path=f"/uploads/{sub}/{stored_name}"))
        db.commit()
        created_ids.append(r.id)

    return {
        "ids": created_ids,
        "count": len(created_ids),
        "primary_id": created_ids[0] if created_ids else None,
    }
@router.get("")
def list_reports(
    status: Optional[str] = None,
    charger_id: Optional[str] = None,
    active_only: bool = False,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """List reports, newest ``reported_at`` first, with optional filters.

    Args:
        status: When given, only reports with exactly this status; takes
            precedence over ``active_only``.
        charger_id: When given, only reports for this charger.
        active_only: When true (and ``status`` is not given), only reports
            in pending, pending_approval, in_progress, waiting or revisit.

    Returns:
        A list of dicts produced by ``_fmt_report``. Users whose role is
        ``mechanic`` never receive ``pending_approval`` reports.
    """
    Report = models.Report
    conditions = []

    if status:
        conditions.append(Report.status == status)
    elif active_only:
        conditions.append(Report.status.in_(
            ["pending", "pending_approval", "in_progress", "waiting", "revisit"]))

    if charger_id:
        conditions.append(Report.charger_id == charger_id)

    # Mechanics only see published reports (awaiting-approval ones are hidden).
    if current_user.role == "mechanic":
        conditions.append(Report.status != "pending_approval")

    query = db.query(Report).order_by(desc(Report.reported_at))
    for condition in conditions:
        query = query.filter(condition)

    rows = query.all()
    return [_fmt_report(row, db) for row in rows]
@router.get("/{report_id}")
def get_report(report_id: int, db: Session = Depends(get_db),
               current_user: models.User = Depends(get_current_user)):
    """Return one report, plus details of its first linked repair if any.

    The base payload comes from ``_fmt_report``. When the report's first
    repair link resolves to a repair, a ``"repair"`` entry is added with
    mechanic, timing, location, approval, photo, cost and linked-improvement
    details. When there is no link, or the link has no repair, the
    ``"repair"`` key is omitted.

    Raises:
        HTTPException: 404 when no report has the given id.
    """
    # NOTE(review): list_reports hides pending_approval reports from
    # mechanics, but no role check is applied here - confirm this is intended.
    r = db.query(models.Report).filter_by(id=report_id).first()
    if not r:
        raise HTTPException(404)
    result = _fmt_report(r, db)

    # Guard against a link whose repair is missing, mirroring the checks in
    # _fmt_report and the public endpoint; previously this raised
    # AttributeError on ``repair.cost``.
    repair = r.repair_links[0].repair if r.repair_links else None
    if repair is None:
        return result

    cost = repair.cost
    mechanic = repair.mechanic
    charger = r.charger
    result["repair"] = {
        "id": repair.id,
        "mechanic_name": mechanic.name if mechanic else None,
        "mechanic_company": mechanic.company if mechanic else None,
        "repair_types": repair.repair_types,
        "description": repair.description,
        # started_at may be unset; every other timestamp here is guarded too.
        "started_at": repair.started_at.isoformat() if repair.started_at else None,
        "completed_at": repair.completed_at.isoformat() if repair.completed_at else None,
        "result_status": repair.result_status,
        "mechanic_lat": repair.mechanic_lat,
        "mechanic_lng": repair.mechanic_lng,
        "charger_lat": charger.gps_lat if charger else None,
        "charger_lng": charger.gps_lng if charger else None,
        "approved_at": repair.approved_at.isoformat() if repair.approved_at else None,
        "approved_by_name": repair.approver.name if repair.approved_by and repair.approver else None,
        "photos_before": [p.file_path for p in repair.photos if p.photo_type == "before"],
        "photos_after": [p.file_path for p in repair.photos if p.photo_type == "after"],
        "cost": {
            "root_cause": cost.root_cause,
            "admin_note": cost.admin_note,
            "cost_party_type": cost.cost_party_type,
            "cost_party_custom": cost.cost_party_custom,
            "cost_amount": cost.cost_amount,
            "cost_status": cost.cost_status,
            "manufacturer_name": cost.manufacturer.name if cost.manufacturer else None,
        } if cost else None,
        "linked_improvements": _get_linked_improvements(repair, db),
    }
    return result
def _get_linked_improvements(repair, db):
rids = [lk.report_id for lk in repair.report_links]
if not rids:
return []
imp_links = db.query(models.ImprovementReport).filter(
models.ImprovementReport.report_id.in_(rids)
).all()
seen, result = set(), []
for il in imp_links:
if il.improvement_id not in seen:
seen.add(il.improvement_id)
imp = db.query(models.Improvement).filter_by(id=il.improvement_id).first()
if imp:
result.append({"id": imp.id, "title": imp.title,
"category": imp.category, "status": imp.status})
return result
@router.delete("/bulk")
def bulk_delete_reports(
    ids: List[int] = Body(...),
    db: Session = Depends(get_db),
    _=Depends(require_admin)
):
    """Delete several reports, and their repair link rows, in one transaction.

    Args:
        ids: Report ids to delete, sent as the request body.

    Returns:
        ``{"deleted": n}`` where n is the row count of the reports DELETE.

    Raises:
        HTTPException: 400 when ``ids`` is empty.
    """
    if len(ids) == 0:
        raise HTTPException(400, "삭제할 항목을 선택하세요.")

    params = {"ids": ids}
    # Link rows are removed first, then the reports themselves.
    delete_links = text("DELETE FROM repair_reports WHERE report_id = ANY(:ids)")
    delete_reports = text("DELETE FROM reports WHERE id = ANY(:ids)")

    db.execute(delete_links, params)
    outcome = db.execute(delete_reports, params)
    db.commit()

    deleted_count = outcome.rowcount
    return {"deleted": deleted_count}
@router.patch("/{report_id}")
async def update_report(
    report_id: int,
    issue_types: Optional[str] = Form(None),
    issue_detail: Optional[str] = Form(None),
    error_code: Optional[str] = Form(None),
    contact: Optional[str] = Form(None),
    occurred_at: Optional[str] = Form(None),
    status: Optional[str] = Form(None),
    ocpp_log: Optional[str] = Form(None),
    photos: List[UploadFile] = File(default=[]),
    db: Session = Depends(get_db),
    _=Depends(require_admin)
):
    """Partially update a report and append any newly uploaded photos.

    Only fields that are actually submitted (not None) are changed. For the
    text fields and ``occurred_at`` an empty string clears the stored value.

    Args:
        report_id: Id of the report to update.
        issue_types: JSON-encoded replacement for the issue types.
        occurred_at: ISO-8601 timestamp, or an empty string to clear it.
        status: New status value, stored as given.
        photos: Additional uploads; entries without a filename are skipped.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 404 when the report does not exist; 400 when
            ``issue_types`` is not valid JSON or ``occurred_at`` is not a
            valid ISO-8601 timestamp.
    """
    import json

    r = db.query(models.Report).filter_by(id=report_id).first()
    if not r:
        raise HTTPException(404)

    # Parse everything that can fail before mutating the row, so a bad value
    # yields a 400 and leaves the report untouched (previously an unhandled
    # exception surfaced as a 500).
    new_issue_types = None
    if issue_types is not None:
        try:
            new_issue_types = json.loads(issue_types)
        except json.JSONDecodeError:
            raise HTTPException(400, "issue_types는 올바른 JSON 형식이어야 합니다.") from None
    new_occurred_at = None
    if occurred_at:
        try:
            new_occurred_at = datetime.fromisoformat(occurred_at)
        except ValueError:
            raise HTTPException(400, "occurred_at 형식이 올바르지 않습니다.") from None

    if issue_types is not None:
        r.issue_types = new_issue_types
    if issue_detail is not None:
        r.issue_detail = issue_detail or None
    if error_code is not None:
        r.error_code = error_code or None
    if contact is not None:
        r.contact = contact or None
    if occurred_at is not None:
        # An empty string leaves new_occurred_at as None, clearing the field.
        r.occurred_at = new_occurred_at
    if status is not None:
        r.status = status
    if ocpp_log is not None:
        r.ocpp_log = ocpp_log or None
    db.commit()

    for photo in photos:
        if photo.filename:
            path = save_upload(photo, f"reports/{report_id}")
            db.add(models.ReportPhoto(report_id=report_id, file_path=path))
    db.commit()
    return {"ok": True}
@router.delete("/{report_id}/photos/{photo_id}")
def delete_report_photo(
    report_id: int, photo_id: int,
    db: Session = Depends(get_db),
    _=Depends(require_admin)
):
    """Delete one photo row belonging to the given report.

    The photo must match both ids, so a photo of another report cannot be
    removed through this report's URL.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 404 when no photo matches both ids.
    """
    lookup = db.query(models.ReportPhoto).filter_by(
        id=photo_id,
        report_id=report_id,
    )
    target = lookup.first()
    if not target:
        raise HTTPException(404)

    db.delete(target)
    db.commit()
    return {"ok": True}
@router.patch("/{report_id}/approve")
def approve_report(report_id: int, db: Session = Depends(get_db), _=Depends(require_admin)):
    """Approve a report by setting its status to ``pending``.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 404 when no report has the given id.
    """
    report = db.query(models.Report).filter_by(id=report_id).first()
    if not report:
        raise HTTPException(404)

    report.status = "pending"
    db.commit()
    return {"ok": True}
# Closure reasons accepted by the close endpoint.
CLOSURE_TYPES = {"natural", "remote_reset", "false_alarm", "other"}


@router.patch("/{report_id}/close")
def close_report(
    report_id: int,
    closure_type: str = Form(...),
    closure_note: str = Form(""),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(require_admin)
):
    """Mark a report as closed and record why, when and by whom.

    Args:
        report_id: Id of the report to close.
        closure_type: One of ``CLOSURE_TYPES``.
        closure_note: Optional memo; stripped, and stored as None when blank.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 400 when ``closure_type`` is not allowed (checked
            before the lookup); 404 when no report has the given id.
    """
    is_valid_reason = closure_type in CLOSURE_TYPES
    if not is_valid_reason:
        raise HTTPException(400, "유효하지 않은 상황종료 사유입니다.")

    report = db.query(models.Report).filter_by(id=report_id).first()
    if not report:
        raise HTTPException(404)

    note = closure_note.strip()
    report.status = "closed"
    report.closure_type = closure_type
    report.closure_note = note if note else None
    report.closed_at = datetime.now()
    report.closed_by = current_user.id
    db.commit()
    return {"ok": True}
@router.patch("/{report_id}/status")
def update_status(report_id: int, status: str = Form(...),
                  db: Session = Depends(get_db), _=Depends(require_admin)):
    """Overwrite a report's status with the submitted value.

    The value is stored as given; no check against a list of known statuses
    is performed here.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 404 when no report has the given id.
    """
    report = db.query(models.Report).filter_by(id=report_id).first()
    if not report:
        raise HTTPException(404)

    report.status = status
    db.commit()
    return {"ok": True}
# ── Public endpoint — list a charger's in-progress reports without authentication ──
# Used by the QR report page to show users what has already been filed
@router.get("/public/{charger_id}")
def public_charger_reports(charger_id: str, db: Session = Depends(get_db)):
    """
    Return the still-unresolved reports for the given charger.

    Only reports whose status is one of the in-progress statuses listed
    below are returned, newest first and capped at 20.
    Personal information (contact) is not included in the response.
    """
    # Keys double as the set of statuses treated as "in progress".
    STATUS_LABEL = {
        "pending_approval": "검토 대기",
        "pending": "접수 완료",
        "in_progress": "처리 중",
        "waiting": "부품 대기",
        "revisit": "재방문 예정",
    }
    active_statuses = list(STATUS_LABEL)

    Report = models.Report
    query = db.query(Report).filter(
        Report.charger_id == charger_id,
        Report.status.in_(active_statuses),
    )
    rows = query.order_by(Report.reported_at.desc()).limit(20).all()

    def to_public(report):
        # Repair details come from the first repair link, when there is one.
        repair = report.repair_links[0].repair if report.repair_links else None
        if repair and repair.mechanic:
            mechanic_name = repair.mechanic.name
        else:
            mechanic_name = None
        if repair and repair.started_at:
            started_at = repair.started_at.isoformat()
        else:
            started_at = None
        return {
            "id": report.id,
            "issue_types": report.issue_types,
            "issue_detail": report.issue_detail or "",
            "status": report.status,
            "status_label": STATUS_LABEL.get(report.status, report.status),
            "reported_at": report.reported_at.isoformat() if report.reported_at else "",
            "occurred_at": report.occurred_at.isoformat() if report.occurred_at else "",
            "photo_count": len(report.photos),
            "mechanic_name": mechanic_name,
            "started_at": started_at,
        }

    return [to_public(row) for row in rows]