"""OCPP 캡처 데이터 분석기 ocpp_sniffer.py로 캡처한 메시지를 분석하여 충전기가 실제로 보내는 데이터 구조를 요약. 사용법: python3 ocpp_analyzer.py python3 ocpp_analyzer.py ocpp_captures/CHARGER_001_20260405.jsonl """ import json import sys import os from collections import defaultdict from datetime import datetime G = "\033[92m" Y = "\033[93m" C = "\033[96m" M = "\033[95m" R = "\033[91m" DIM = "\033[2m" E = "\033[0m" BOLD = "\033[1m" LOG_DIR = "ocpp_captures" def analyze_file(filepath): """JSONL 파일 분석""" messages = [] with open(filepath, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: messages.append(json.loads(line)) return messages def summarize(messages): """메시지 요약 분석""" # 액션별 분류 actions = defaultdict(list) for msg in messages: action = msg.get("action") or msg.get("message_type", "unknown") actions[action].append(msg) print(f"\n{C}{'═'*60}") print(f" OCPP 메시지 분석 결과") print(f" 총 {len(messages)}개 메시지 / {len(actions)}종 액션") print(f"{'═'*60}{E}\n") for action, msgs in sorted(actions.items()): charger_msgs = [m for m in msgs if m.get("direction") == "charger"] steve_msgs = [m for m in msgs if m.get("direction") == "steve"] print(f"{Y}{'─'*60}") print(f" {BOLD}{action}{E}{Y} ({len(msgs)}건 — CP→CS: {len(charger_msgs)}, CS→CP: {len(steve_msgs)})") print(f"{'─'*60}{E}") # 페이로드 구조 분석 all_keys = set() sample_payload = None for msg in msgs: payload = msg.get("payload", {}) if isinstance(payload, dict): all_keys.update(payload.keys()) if sample_payload is None and payload: sample_payload = payload if all_keys: print(f"\n {C}필드 목록:{E}") for key in sorted(all_keys): # 각 필드의 값 샘플 수집 values = set() for msg in msgs: p = msg.get("payload", {}) if isinstance(p, dict) and key in p: v = p[key] if isinstance(v, (str, int, float, bool)): values.add(str(v)) elif isinstance(v, list): values.add(f"[list, len={len(v)}]") elif isinstance(v, dict): values.add(f"{{dict, keys={list(v.keys())}}}") vals_str = ", ".join(list(values)[:5]) if len(values) > 5: vals_str += f" ... (+{len(values)-5}개)" print(f" {G}{key}{E}: {DIM}{vals_str}{E}") # MeterValues 상세 분석 if action == "MeterValues": print(f"\n {M}MeterValues 상세 — measurand 목록:{E}") measurands = defaultdict(list) for msg in msgs: payload = msg.get("payload", {}) meter_values = payload.get("meterValue", []) for mv in meter_values: sampled = mv.get("sampledValue", []) for sv in sampled: m_name = sv.get("measurand", "Energy.Active.Import.Register") measurands[m_name].append({ "value": sv.get("value"), "unit": sv.get("unit"), "phase": sv.get("phase"), "context": sv.get("context"), "format": sv.get("format"), "location": sv.get("location"), }) for m_name, samples in sorted(measurands.items()): units = set(s["unit"] for s in samples if s["unit"]) phases = set(s["phase"] for s in samples if s["phase"]) values = [s["value"] for s in samples if s["value"]] unit_str = ", ".join(units) if units else "없음" phase_str = ", ".join(phases) if phases else "전체" val_range = "" if values: try: nums = [float(v) for v in values] val_range = f"범위: {min(nums):.1f} ~ {max(nums):.1f}" except ValueError: val_range = f"샘플: {values[0]}" print(f" {G}{m_name}{E}") print(f" 단위: {unit_str} | 위상: {phase_str} | {val_range} | {len(samples)}건") # StatusNotification 상세 if action == "StatusNotification": print(f"\n {M}StatusNotification 상세:{E}") statuses = defaultdict(int) errors = defaultdict(int) vendor_errors = set() for msg in msgs: p = msg.get("payload", {}) statuses[p.get("status", "?")] += 1 errors[p.get("errorCode", "?")] += 1 ve = p.get("vendorErrorCode", "") if ve: vendor_errors.add(ve) print(f" 상태: {dict(statuses)}") print(f" 에러코드: {dict(errors)}") if vendor_errors: print(f" {R}벤더 에러코드: {vendor_errors}{E}") # BootNotification 상세 if action == "BootNotification": for msg in msgs: p = msg.get("payload", {}) if msg.get("direction") == "charger": print(f"\n {M}충전기 정보:{E}") for k, v in p.items(): print(f" {G}{k}{E}: {v}") # 샘플 출력 if sample_payload: print(f"\n {DIM}샘플 페이로드:{E}") print(f" {DIM}{json.dumps(sample_payload, indent=4, ensure_ascii=False)[:500]}{E}") print() # 데이터 구조 요약 출력 print(f"\n{C}{'═'*60}") print(f" 대시보드 적용 가능한 데이터 항목") print(f"{'═'*60}{E}\n") print(f" 위 분석 결과를 바탕으로 dashboard에 추가할 수 있는 항목:") print(f" - MeterValues의 measurand 목록 → 실시간 모니터링 차트") print(f" - StatusNotification의 에러코드 → 충전기 고장 알림") print(f" - BootNotification의 펌웨어 정보 → 충전기 상세 정보") print(f" - vendorErrorCode → 제조사 전용 에러 코드 매핑") print() def main(): if len(sys.argv) > 1: filepath = sys.argv[1] if not os.path.exists(filepath): print(f"{R}파일 없음: {filepath}{E}") return messages = analyze_file(filepath) summarize(messages) else: # 전체 캡처 파일 분석 if not os.path.exists(LOG_DIR): print(f"{Y}캡처 폴더 없음: {LOG_DIR}/") print(f"먼저 ocpp_sniffer.py를 실행하여 메시지를 캡처하세요.{E}") return all_messages = [] files = sorted(f for f in os.listdir(LOG_DIR) if f.endswith(".jsonl") and not f.startswith("_")) if not files: print(f"{Y}캡처된 메시지가 없습니다.") print(f"먼저 ocpp_sniffer.py를 실행하여 메시지를 캡처하세요.{E}") return print(f"\n{C}캡처 파일 목록:{E}") for f in files: size = os.path.getsize(os.path.join(LOG_DIR, f)) print(f" {f} ({size:,} bytes)") for f in files: all_messages.extend(analyze_file(os.path.join(LOG_DIR, f))) summarize(all_messages) if __name__ == "__main__": main()