"""충전 세션 API — 핵심 충전 흐름 관리 흐름: QR 스캔 → POST /sessions (세션 생성) → 결제 완료 → POST /sessions/{uid}/start (RemoteStart) → 충전 중 MeterValues 수신 → POST /sessions/{uid}/stop (RemoteStop) → 자동 정산 """ import uuid from datetime import datetime, timezone from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.models import Charger, ChargingSession, SessionStatus, ChargerStatus from app.schemas import SessionCreate, SessionOut, SessionBilling from app.services.steve_client import steve_client from app.services.billing import calculate_bill router = APIRouter(prefix="/sessions", tags=["충전 세션"]) # ── 세션 생성 (QR 스캔 시) ── @router.post("/", response_model=SessionOut, status_code=201) async def create_session( data: SessionCreate, db: AsyncSession = Depends(get_db), ): """충전 세션 생성 사용자가 QR 스캔 시 호출. 충전기 상태 확인 후 세션 생성. 이후 결제 → 충전 시작 순서로 진행. """ # 충전기 확인 result = await db.execute( select(Charger).where(Charger.charge_box_id == data.charge_box_id) ) charger = result.scalar_one_or_none() if not charger: raise HTTPException(404, f"충전기 미등록: {data.charge_box_id}") if not charger.is_active: raise HTTPException(400, "비활성화된 충전기입니다") # 해당 충전기에 진행 중인 세션 확인 active = await db.execute( select(ChargingSession).where( and_( ChargingSession.charger_id == charger.id, ChargingSession.status.in_([ SessionStatus.PENDING, SessionStatus.AUTHORIZED, SessionStatus.CHARGING, ]), ) ) ) if active.scalar_one_or_none(): raise HTTPException(409, "이미 사용 중인 충전기입니다") # 세션 생성 session_uid = f"SES-{uuid.uuid4().hex[:12].upper()}" id_tag = f"APP-{uuid.uuid4().hex[:8].upper()}" session = ChargingSession( session_uid=session_uid, charger_id=charger.id, connector_id=data.connector_id, id_tag=id_tag, status=SessionStatus.PENDING, ) db.add(session) await db.flush() await db.refresh(session) return session # ── 충전 시작 (결제 승인 후) ── @router.post("/{session_uid}/start") async def start_charging( session_uid: str, db: AsyncSession = Depends(get_db), ): """충전 시작 — 결제 승인 후 호출 Steve 서버에 RemoteStartTransaction 전송. 충전기가 수락하면 StartTransaction 응답이 옴. """ session = await _get_session(db, session_uid) if session.status != SessionStatus.AUTHORIZED: raise HTTPException(400, f"시작 불가 상태: {session.status}") charger = await db.get(Charger, session.charger_id) # Steve에 원격 시작 명령 result = await steve_client.remote_start_transaction( charge_box_id=charger.charge_box_id, id_tag=session.id_tag, connector_id=session.connector_id, ) if not result: raise HTTPException(502, "Steve 서버 통신 실패") session.status = SessionStatus.CHARGING session.started_at = datetime.now(timezone.utc) return { "message": "충전 시작 명령 전송 완료", "session_uid": session_uid, "steve_response": result, } # ── 충전 종료 ── @router.post("/{session_uid}/stop") async def stop_charging( session_uid: str, db: AsyncSession = Depends(get_db), ): """충전 종료 — 사용자 요청 또는 자동 종료""" session = await _get_session(db, session_uid) if session.status != SessionStatus.CHARGING: raise HTTPException(400, f"종료 불가 상태: {session.status}") if not session.ocpp_transaction_id: raise HTTPException(400, "OCPP 트랜잭션 ID 없음 — 아직 시작되지 않음") charger = await db.get(Charger, session.charger_id) # Steve에 원격 종료 명령 result = await steve_client.remote_stop_transaction( charge_box_id=charger.charge_box_id, transaction_id=session.ocpp_transaction_id, ) if not result: raise HTTPException(502, "Steve 서버 통신 실패") return { "message": "충전 종료 명령 전송 완료", "session_uid": session_uid, } # ── 세션 상태 조회 ── @router.get("/{session_uid}", response_model=SessionOut) async def get_session( session_uid: str, db: AsyncSession = Depends(get_db), ): """세션 상태 조회 (충전 중 폴링용)""" return await _get_session(db, session_uid) @router.get("/{session_uid}/billing", response_model=SessionBilling) async def get_session_billing( session_uid: str, db: AsyncSession = Depends(get_db), ): """세션 정산 내역""" session = await _get_session(db, session_uid) if session.status not in (SessionStatus.COMPLETED, SessionStatus.CHARGING): raise HTTPException(400, "정산 정보 없음") start_wh = session.meter_start or 0 end_wh = session.meter_stop or session.last_meter_value or start_wh bill = calculate_bill(start_wh, end_wh) return SessionBilling( session_uid=session_uid, charged_kwh=bill["charged_kwh"], electricity_cost=bill["electricity_cost"], service_fee=bill["service_fee"], total_bill=bill["total_bill"], saved_vs_cpo=bill["saved_vs_cpo"], ) # ── 세션 목록 ── @router.get("/", response_model=List[SessionOut]) async def list_sessions( status: Optional[str] = None, charge_box_id: Optional[str] = None, limit: int = Query(50, le=200), offset: int = 0, db: AsyncSession = Depends(get_db), ): """충전 세션 목록 (관리자 대시보드용)""" stmt = select(ChargingSession).order_by( ChargingSession.created_at.desc() ) if status: stmt = stmt.where(ChargingSession.status == status) if charge_box_id: stmt = stmt.join(Charger).where( Charger.charge_box_id == charge_box_id ) stmt = stmt.offset(offset).limit(limit) result = await db.execute(stmt) return result.scalars().all() # ── 테스트용 (프로덕션에서 제거) ── @router.post("/reset/{charge_box_id}") async def reset_charger_sessions( charge_box_id: str, db: AsyncSession = Depends(get_db), ): """[테스트 전용] 충전기의 미완료 세션 전부 취소""" result = await db.execute( select(Charger).where(Charger.charge_box_id == charge_box_id) ) charger = result.scalar_one_or_none() if not charger: return {"message": "충전기 없음", "cancelled": 0} result = await db.execute( select(ChargingSession).where( and_( ChargingSession.charger_id == charger.id, ChargingSession.status.in_([ SessionStatus.PENDING, SessionStatus.AUTHORIZED, SessionStatus.CHARGING, ]), ) ) ) sessions = result.scalars().all() for s in sessions: s.status = SessionStatus.CANCELLED return {"message": f"{len(sessions)}건 세션 취소", "cancelled": len(sessions)} @router.post("/{session_uid}/force-authorize") async def force_authorize( session_uid: str, db: AsyncSession = Depends(get_db), ): """[테스트 전용] 결제 없이 세션을 AUTHORIZED로 강제 변경""" session = await _get_session(db, session_uid) session.status = SessionStatus.AUTHORIZED return {"message": "세션 AUTHORIZED로 변경", "session_uid": session_uid} # ── 유틸 ── async def _get_session( db: AsyncSession, session_uid: str ) -> ChargingSession: result = await db.execute( select(ChargingSession).where( ChargingSession.session_uid == session_uid ) ) session = result.scalar_one_or_none() if not session: raise HTTPException(404, f"세션 없음: {session_uid}") return session