Files
ev-charging-backend/ocpp_sniffer.py
2026-04-18 05:59:31 +09:00

198 lines
6.5 KiB
Python

"""OCPP WebSocket 메시지 스니퍼
실제 충전기가 보내는 OCPP 메시지를 캡처하여 JSON으로 저장.
충전기 → 이 프록시 → Steve 서버로 중계하면서 모든 메시지를 로깅.
사용법:
pip3 install websockets
python3 ocpp_sniffer.py
충전기 WebSocket URL을 이 프록시로 변경:
기존: wss://s1.byunc.com/steve/websocket/CentralSystemService/{id}
변경: ws://192.168.0.114:9000/{id}
프록시가 메시지를 캡처한 후 Steve로 전달.
"""
import asyncio
import websockets
import json
import os
from datetime import datetime
# ── 설정 ──
PROXY_PORT = 9000
STEVE_WS_URL = "ws://localhost:8180/steve/websocket/CentralSystemService"
LOG_DIR = "ocpp_captures"
os.makedirs(LOG_DIR, exist_ok=True)
# OCPP 메시지 타입
MSG_TYPES = {2: "CALL", 3: "CALLRESULT", 4: "CALLERROR"}
# 색상
C = "\033[96m"
G = "\033[92m"
Y = "\033[93m"
R = "\033[91m"
M = "\033[95m"
E = "\033[0m"
DIM = "\033[2m"
def parse_ocpp_message(raw):
"""OCPP JSON 메시지 파싱"""
try:
msg = json.loads(raw)
if not isinstance(msg, list) or len(msg) < 3:
return {"type": "unknown", "raw": raw}
msg_type = MSG_TYPES.get(msg[0], f"TYPE_{msg[0]}")
msg_id = msg[1]
if msg[0] == 2: # CALL (요청)
return {
"type": msg_type,
"id": msg_id,
"action": msg[2],
"payload": msg[3] if len(msg) > 3 else {},
}
elif msg[0] == 3: # CALLRESULT (응답)
return {
"type": msg_type,
"id": msg_id,
"payload": msg[2] if len(msg) > 2 else {},
}
elif msg[0] == 4: # CALLERROR (에러)
return {
"type": msg_type,
"id": msg_id,
"error_code": msg[2] if len(msg) > 2 else "",
"error_desc": msg[3] if len(msg) > 3 else "",
"error_detail": msg[4] if len(msg) > 4 else {},
}
except json.JSONDecodeError:
return {"type": "parse_error", "raw": raw}
def log_message(direction, charger_id, parsed, raw):
"""메시지 콘솔 출력 + 파일 저장"""
now = datetime.now()
timestamp = now.strftime("%H:%M:%S.%f")[:-3]
# 콘솔 출력
arrow = f"{G}▶ CP→CS{E}" if direction == "charger" else f"{C}◀ CS→CP{E}"
action = parsed.get("action", parsed.get("type", "?"))
msg_type = parsed.get("type", "?")
print(f"\n{DIM}{timestamp}{E} {arrow} {Y}[{msg_type}]{E} {M}{action}{E}")
print(f" {json.dumps(parsed.get('payload', {}), indent=2, ensure_ascii=False)}")
# 파일 저장
log_entry = {
"timestamp": now.isoformat(),
"direction": direction,
"charger_id": charger_id,
"message_type": msg_type,
"action": parsed.get("action"),
"message_id": parsed.get("id"),
"payload": parsed.get("payload", {}),
"raw": raw,
}
# 날짜별 파일
filename = f"{LOG_DIR}/{charger_id}_{now.strftime('%Y%m%d')}.jsonl"
with open(filename, "a", encoding="utf-8") as f:
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
# 액션별 파일 (데이터 구조 분석용)
if parsed.get("action"):
action_file = f"{LOG_DIR}/_actions_{parsed['action']}.jsonl"
with open(action_file, "a", encoding="utf-8") as f:
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
async def proxy_handler(ws_client, path):
"""충전기 → 프록시 → Steve 중계"""
# path에서 charger ID 추출: /CHARGER_001 → CHARGER_001
charger_id = path.strip("/")
if not charger_id:
charger_id = "unknown"
steve_url = f"{STEVE_WS_URL}/{charger_id}"
print(f"\n{'='*60}")
print(f"{G}충전기 연결: {charger_id}{E}")
print(f"{DIM}Steve 연결: {steve_url}{E}")
print(f"{'='*60}")
try:
async with websockets.connect(
steve_url,
subprotocols=["ocpp1.6"],
ping_interval=30,
ping_timeout=10,
) as ws_steve:
async def charger_to_steve():
"""충전기 → Steve 방향"""
async for message in ws_client:
parsed = parse_ocpp_message(message)
log_message("charger", charger_id, parsed, message)
await ws_steve.send(message)
async def steve_to_charger():
"""Steve → 충전기 방향"""
async for message in ws_steve:
parsed = parse_ocpp_message(message)
log_message("steve", charger_id, parsed, message)
await ws_client.send(message)
# 양방향 동시 중계
await asyncio.gather(
charger_to_steve(),
steve_to_charger(),
)
except websockets.exceptions.ConnectionClosed as e:
print(f"\n{R}연결 종료: {charger_id}{e}{E}")
except Exception as e:
print(f"\n{R}에러: {charger_id}{e}{E}")
finally:
print(f"{Y}충전기 연결 해제: {charger_id}{E}")
async def main():
print(f"""
{C}╔══════════════════════════════════════════════╗
║ OCPP WebSocket 메시지 스니퍼 v1.0 ║
║ 충전기 ↔ Steve 양방향 캡처 ║
╠══════════════════════════════════════════════╣
║ 프록시 포트 : {PROXY_PORT}
║ Steve 서버 : {STEVE_WS_URL}
║ 로그 폴더 : {LOG_DIR}/ ║
╠══════════════════════════════════════════════╣
║ 충전기 URL을 아래로 변경: ║
║ ws://192.168.0.114:{PROXY_PORT}/CHARGER_ID ║
╚══════════════════════════════════════════════╝{E}
""")
server = await websockets.serve(
proxy_handler,
"0.0.0.0",
PROXY_PORT,
subprotocols=["ocpp1.6"],
ping_interval=30,
ping_timeout=10,
)
print(f"{G}스니퍼 대기 중... (Ctrl+C로 종료){E}\n")
await server.wait_closed()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print(f"\n{Y}스니퍼 종료{E}")