"""OCPP 이벤트 콜백 API Steve OCPP 서버에서 충전기 이벤트 수신. Steve 설정에서 webhook URL을 이 엔드포인트로 지정하거나, 주기적으로 Steve API를 폴링하여 이벤트 동기화. ※ Steve 버전에 따라 webhook 지원 여부가 다름. 지원하지 않는 경우 /ocpp/sync 엔드포인트로 수동 동기화. """ import logging from datetime import datetime, timezone from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.models import ( Charger, ChargingSession, MeterValueLog, ChargerStatus, SessionStatus, ) from app.schemas import ( OcppStatusNotification, OcppStartTransaction, OcppStopTransaction, OcppMeterValues, ) from app.services.billing import calculate_bill logger = logging.getLogger(__name__) router = APIRouter(prefix="/ocpp", tags=["OCPP 콜백"]) # ── StatusNotification ── @router.post("/status") async def handle_status_notification( data: OcppStatusNotification, db: AsyncSession = Depends(get_db), ): """충전기 상태 변경 수신 Available / Charging / Faulted / Unavailable """ charger = await _get_charger(db, data.charge_box_id) if not charger: logger.warning(f"미등록 충전기 상태 수신: {data.charge_box_id}") return {"status": "ignored", "reason": "unregistered"} # 상태 매핑 status_map = { "Available": ChargerStatus.AVAILABLE, "Charging": ChargerStatus.CHARGING, "Faulted": ChargerStatus.FAULTED, "Unavailable": ChargerStatus.UNAVAILABLE, "Reserved": ChargerStatus.RESERVED, } new_status = status_map.get(data.status, ChargerStatus.UNAVAILABLE) charger.status = new_status charger.last_heartbeat = data.timestamp or datetime.now(timezone.utc) logger.info(f"충전기 상태: {data.charge_box_id} → {new_status}") return {"status": "ok", "charger_status": new_status} # ── StartTransaction ── @router.post("/start-transaction") async def handle_start_transaction( data: OcppStartTransaction, db: AsyncSession = Depends(get_db), ): """충전 시작 이벤트 수신 충전기가 StartTransaction 보내면 transactionId 기록. meterStart 값으로 시작 전력량 저장. """ # id_tag로 세션 찾기 result = await db.execute( select(ChargingSession).where( and_( ChargingSession.id_tag == data.id_tag, ChargingSession.status.in_([ SessionStatus.AUTHORIZED, SessionStatus.CHARGING, ]), ) ).order_by(ChargingSession.created_at.desc()) ) session = result.scalar_one_or_none() if not session: logger.warning( f"매칭 세션 없음: tag={data.id_tag} " f"charger={data.charge_box_id}" ) return {"status": "no_session"} session.ocpp_transaction_id = data.transaction_id session.meter_start = data.meter_start session.status = SessionStatus.CHARGING session.started_at = data.timestamp or datetime.now(timezone.utc) logger.info( f"충전 시작: session={session.session_uid} " f"txn={data.transaction_id} meter={data.meter_start}Wh" ) return { "status": "ok", "session_uid": session.session_uid, "transaction_id": data.transaction_id, } # ── StopTransaction ── @router.post("/stop-transaction") async def handle_stop_transaction( data: OcppStopTransaction, db: AsyncSession = Depends(get_db), ): """충전 종료 이벤트 수신 + 자동 정산 충전기가 StopTransaction 보내면 meterStop 기록 후 요금 정산. """ # transactionId로 충전 중인 세션 찾기 result = await db.execute( select(ChargingSession).where( and_( ChargingSession.ocpp_transaction_id == data.transaction_id, ChargingSession.status == SessionStatus.CHARGING, ) ).order_by(ChargingSession.created_at.desc()) ) session = result.scalars().first() if not session: logger.warning(f"매칭 세션 없음: txn={data.transaction_id}") return {"status": "no_session"} # 전력량 기록 및 정산 session.meter_stop = data.meter_stop session.charged_wh = max(0, data.meter_stop - (session.meter_start or 0)) session.stopped_at = data.timestamp or datetime.now(timezone.utc) session.status = SessionStatus.COMPLETED # 요금 계산 bill = calculate_bill(session.meter_start or 0, data.meter_stop) session.electricity_cost = bill["electricity_cost"] session.service_fee = bill["service_fee"] session.total_bill = bill["total_bill"] logger.info( f"충전 완료: session={session.session_uid} " f"charged={bill['charged_kwh']}kWh " f"bill={bill['total_bill']}원" ) return { "status": "ok", "session_uid": session.session_uid, "billing": bill, } # ── MeterValues ── @router.post("/meter-values") async def handle_meter_values( data: OcppMeterValues, db: AsyncSession = Depends(get_db), ): """실시간 전력량 수신 충전 중 주기적으로 수신되는 MeterValues를 기록. 실시간 충전량/요금 계산에 사용. """ charger = await _get_charger(db, data.charge_box_id) # 로그 저장 log = MeterValueLog( charger_id=charger.id if charger else 0, transaction_id=data.transaction_id, connector_id=data.connector_id, measurand=data.measurand, value=data.value, unit="Wh", timestamp=data.timestamp or datetime.now(timezone.utc), ) db.add(log) # 진행 중 세션에 마지막 meter 값 업데이트 if data.transaction_id: result = await db.execute( select(ChargingSession).where( and_( ChargingSession.ocpp_transaction_id == data.transaction_id, ChargingSession.status == SessionStatus.CHARGING, ) ) ) session = result.scalars().first() if session: session.last_meter_value = int(data.value) session.charged_wh = max( 0, int(data.value) - (session.meter_start or 0) ) return {"status": "ok", "value": data.value} # ── Steve 데이터 동기화 (폴링 방식) ── @router.post("/sync") async def sync_from_steve( db: AsyncSession = Depends(get_db), ): """Steve 서버에서 최신 트랜잭션 데이터 동기화 Steve에 webhook이 없는 경우 이 엔드포인트를 cron 또는 APScheduler로 주기적 호출. """ from app.services.steve_client import steve_client transactions = await steve_client.get_transactions() if not transactions: return {"status": "no_data"} synced = 0 for txn in transactions: txn_id = txn.get("transactionId") or txn.get("id") if not txn_id: continue # 이미 기록된 트랜잭션인지 확인 result = await db.execute( select(ChargingSession).where( ChargingSession.ocpp_transaction_id == txn_id ) ) session = result.scalar_one_or_none() if session and session.status == SessionStatus.COMPLETED: continue # TODO: 트랜잭션 데이터를 세션에 반영 synced += 1 return {"status": "ok", "synced": synced} # ── 유틸 ── async def _get_charger(db: AsyncSession, charge_box_id: str): result = await db.execute( select(Charger).where(Charger.charge_box_id == charge_box_id) ) return result.scalar_one_or_none()