95 lines
3.1 KiB
Python
95 lines
3.1 KiB
Python
"""백그라운드 태스크 — Steve 폴링 + 세션 정리
|
|
|
|
Steve 서버에 webhook이 없는 경우,
|
|
APScheduler로 주기적으로 트랜잭션 데이터 동기화.
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
from sqlalchemy import select, and_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database import AsyncSessionLocal
|
|
from app.models import Charger, ChargingSession, SessionStatus
|
|
from app.services.steve_client import steve_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def poll_steve_heartbeats():
|
|
"""Steve에서 충전기 상태 폴링 (60초 간격 권장)"""
|
|
try:
|
|
charge_points = await steve_client.get_charge_points()
|
|
if not charge_points:
|
|
return
|
|
|
|
async with AsyncSessionLocal() as db:
|
|
for cp in charge_points:
|
|
cp_id = cp.get("chargeBoxId") or cp.get("chargePointId")
|
|
if not cp_id:
|
|
continue
|
|
|
|
result = await db.execute(
|
|
select(Charger).where(Charger.charge_box_id == cp_id)
|
|
)
|
|
charger = result.scalar_one_or_none()
|
|
if charger:
|
|
charger.last_heartbeat = datetime.now(timezone.utc)
|
|
|
|
await db.commit()
|
|
except Exception as e:
|
|
logger.error(f"Steve 폴링 실패: {e}")
|
|
|
|
|
|
async def cleanup_stale_sessions():
|
|
"""오래된 PENDING 세션 정리 (10분 초과 시 취소)"""
|
|
try:
|
|
cutoff = datetime.now(timezone.utc) - timedelta(minutes=10)
|
|
|
|
async with AsyncSessionLocal() as db:
|
|
result = await db.execute(
|
|
select(ChargingSession).where(
|
|
and_(
|
|
ChargingSession.status == SessionStatus.PENDING,
|
|
ChargingSession.created_at < cutoff,
|
|
)
|
|
)
|
|
)
|
|
stale = result.scalars().all()
|
|
|
|
for session in stale:
|
|
session.status = SessionStatus.CANCELLED
|
|
logger.info(f"만료 세션 취소: {session.session_uid}")
|
|
|
|
if stale:
|
|
await db.commit()
|
|
logger.info(f"만료 세션 {len(stale)}건 정리 완료")
|
|
except Exception as e:
|
|
logger.error(f"세션 정리 실패: {e}")
|
|
|
|
|
|
async def check_long_charging_sessions():
|
|
"""12시간 이상 충전 중인 세션 경고"""
|
|
try:
|
|
cutoff = datetime.now(timezone.utc) - timedelta(hours=12)
|
|
|
|
async with AsyncSessionLocal() as db:
|
|
result = await db.execute(
|
|
select(ChargingSession).where(
|
|
and_(
|
|
ChargingSession.status == SessionStatus.CHARGING,
|
|
ChargingSession.started_at < cutoff,
|
|
)
|
|
)
|
|
)
|
|
long_sessions = result.scalars().all()
|
|
|
|
for session in long_sessions:
|
|
logger.warning(
|
|
f"장시간 충전: {session.session_uid} "
|
|
f"시작: {session.started_at}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"장시간 충전 체크 실패: {e}")
|