"""OCPP WebSocket 메시지 스니퍼 실제 충전기가 보내는 OCPP 메시지를 캡처하여 JSON으로 저장. 충전기 → 이 프록시 → Steve 서버로 중계하면서 모든 메시지를 로깅. 사용법: pip3 install websockets python3 ocpp_sniffer.py 충전기 WebSocket URL을 이 프록시로 변경: 기존: wss://s1.byunc.com/steve/websocket/CentralSystemService/{id} 변경: ws://192.168.0.114:9000/{id} 프록시가 메시지를 캡처한 후 Steve로 전달. """ import asyncio import websockets import json import os from datetime import datetime # ── 설정 ── PROXY_PORT = 9000 STEVE_WS_URL = "ws://localhost:8180/steve/websocket/CentralSystemService" LOG_DIR = "ocpp_captures" os.makedirs(LOG_DIR, exist_ok=True) # OCPP 메시지 타입 MSG_TYPES = {2: "CALL", 3: "CALLRESULT", 4: "CALLERROR"} # 색상 C = "\033[96m" G = "\033[92m" Y = "\033[93m" R = "\033[91m" M = "\033[95m" E = "\033[0m" DIM = "\033[2m" def parse_ocpp_message(raw): """OCPP JSON 메시지 파싱""" try: msg = json.loads(raw) if not isinstance(msg, list) or len(msg) < 3: return {"type": "unknown", "raw": raw} msg_type = MSG_TYPES.get(msg[0], f"TYPE_{msg[0]}") msg_id = msg[1] if msg[0] == 2: # CALL (요청) return { "type": msg_type, "id": msg_id, "action": msg[2], "payload": msg[3] if len(msg) > 3 else {}, } elif msg[0] == 3: # CALLRESULT (응답) return { "type": msg_type, "id": msg_id, "payload": msg[2] if len(msg) > 2 else {}, } elif msg[0] == 4: # CALLERROR (에러) return { "type": msg_type, "id": msg_id, "error_code": msg[2] if len(msg) > 2 else "", "error_desc": msg[3] if len(msg) > 3 else "", "error_detail": msg[4] if len(msg) > 4 else {}, } except json.JSONDecodeError: return {"type": "parse_error", "raw": raw} def log_message(direction, charger_id, parsed, raw): """메시지 콘솔 출력 + 파일 저장""" now = datetime.now() timestamp = now.strftime("%H:%M:%S.%f")[:-3] # 콘솔 출력 arrow = f"{G}▶ CP→CS{E}" if direction == "charger" else f"{C}◀ CS→CP{E}" action = parsed.get("action", parsed.get("type", "?")) msg_type = parsed.get("type", "?") print(f"\n{DIM}{timestamp}{E} {arrow} {Y}[{msg_type}]{E} {M}{action}{E}") print(f" {json.dumps(parsed.get('payload', {}), indent=2, ensure_ascii=False)}") # 파일 저장 log_entry = { "timestamp": now.isoformat(), "direction": direction, "charger_id": charger_id, "message_type": msg_type, "action": parsed.get("action"), "message_id": parsed.get("id"), "payload": parsed.get("payload", {}), "raw": raw, } # 날짜별 파일 filename = f"{LOG_DIR}/{charger_id}_{now.strftime('%Y%m%d')}.jsonl" with open(filename, "a", encoding="utf-8") as f: f.write(json.dumps(log_entry, ensure_ascii=False) + "\n") # 액션별 파일 (데이터 구조 분석용) if parsed.get("action"): action_file = f"{LOG_DIR}/_actions_{parsed['action']}.jsonl" with open(action_file, "a", encoding="utf-8") as f: f.write(json.dumps(log_entry, ensure_ascii=False) + "\n") async def proxy_handler(ws_client, path): """충전기 → 프록시 → Steve 중계""" # path에서 charger ID 추출: /CHARGER_001 → CHARGER_001 charger_id = path.strip("/") if not charger_id: charger_id = "unknown" steve_url = f"{STEVE_WS_URL}/{charger_id}" print(f"\n{'='*60}") print(f"{G}충전기 연결: {charger_id}{E}") print(f"{DIM}Steve 연결: {steve_url}{E}") print(f"{'='*60}") try: async with websockets.connect( steve_url, subprotocols=["ocpp1.6"], ping_interval=30, ping_timeout=10, ) as ws_steve: async def charger_to_steve(): """충전기 → Steve 방향""" async for message in ws_client: parsed = parse_ocpp_message(message) log_message("charger", charger_id, parsed, message) await ws_steve.send(message) async def steve_to_charger(): """Steve → 충전기 방향""" async for message in ws_steve: parsed = parse_ocpp_message(message) log_message("steve", charger_id, parsed, message) await ws_client.send(message) # 양방향 동시 중계 await asyncio.gather( charger_to_steve(), steve_to_charger(), ) except websockets.exceptions.ConnectionClosed as e: print(f"\n{R}연결 종료: {charger_id} — {e}{E}") except Exception as e: print(f"\n{R}에러: {charger_id} — {e}{E}") finally: print(f"{Y}충전기 연결 해제: {charger_id}{E}") async def main(): print(f""" {C}╔══════════════════════════════════════════════╗ ║ OCPP WebSocket 메시지 스니퍼 v1.0 ║ ║ 충전기 ↔ Steve 양방향 캡처 ║ ╠══════════════════════════════════════════════╣ ║ 프록시 포트 : {PROXY_PORT} ║ ║ Steve 서버 : {STEVE_WS_URL} ║ ║ 로그 폴더 : {LOG_DIR}/ ║ ╠══════════════════════════════════════════════╣ ║ 충전기 URL을 아래로 변경: ║ ║ ws://192.168.0.114:{PROXY_PORT}/CHARGER_ID ║ ╚══════════════════════════════════════════════╝{E} """) server = await websockets.serve( proxy_handler, "0.0.0.0", PROXY_PORT, subprotocols=["ocpp1.6"], ping_interval=30, ping_timeout=10, ) print(f"{G}스니퍼 대기 중... (Ctrl+C로 종료){E}\n") await server.wait_closed() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print(f"\n{Y}스니퍼 종료{E}")