diff --git a/can_bin_to_csv.py b/can_bin_to_csv.py new file mode 100644 index 0000000..3358c5d --- /dev/null +++ b/can_bin_to_csv.py @@ -0,0 +1,489 @@ +#!/usr/bin/env python3 +""" +ESP32 CAN Logger bin 파일을 CSV 형식으로 변환 +- CAN Database (DBC 파일) 사용 +- Value Table (시그널 값-텍스트 매칭) 지원 +- 다양한 CSV 출력 형식 지원 +""" + +import struct +import csv +import numpy as np +from pathlib import Path +from typing import Dict, List, Optional +from cantools import database as candb +from datetime import datetime + + +class CANMessage: + """ESP32 CAN Logger의 CAN 메시지 구조체""" + STRUCT_FORMAT = ' 'CANMessage': + """바이너리 데이터에서 CANMessage 파싱""" + timestamp_us, can_id, dlc, msg_data = struct.unpack(cls.STRUCT_FORMAT, data) + return cls(timestamp_us, can_id, dlc, msg_data) + + def __repr__(self): + return f"CANMessage(id=0x{self.can_id:X}, dlc={self.dlc}, ts={self.timestamp_us})" + + +class CANBinToCSV: + """CAN bin 파일을 CSV로 변환하는 클래스""" + + def __init__(self, dbc_path: str): + """ + Args: + dbc_path: CAN Database (DBC) 파일 경로 + """ + self.db = candb.load_file(dbc_path) + print(f"✓ DBC 파일 로드: {dbc_path}") + print(f" - 메시지 수: {len(self.db.messages)}") + print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}") + + def read_bin_file(self, bin_path: str) -> List[CANMessage]: + """ + ESP32 CAN Logger의 bin 파일 읽기 + + Args: + bin_path: bin 파일 경로 + + Returns: + CANMessage 리스트 + """ + messages = [] + + with open(bin_path, 'rb') as f: + data = f.read() + + offset = 0 + while offset + CANMessage.STRUCT_SIZE <= len(data): + msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE] + try: + msg = CANMessage.from_bytes(msg_bytes) + messages.append(msg) + except Exception as e: + print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}") + + offset += CANMessage.STRUCT_SIZE + + print(f"✓ bin 파일 읽기 완료: {bin_path}") + print(f" - 총 메시지: {len(messages)}") + + return messages + + def decode_messages(self, messages: List[CANMessage], + apply_value_table: bool = True) -> Dict[str, Dict[str, List]]: + """ + CAN 메시지를 DBC를 사용하여 디코딩 + + Args: + messages: CANMessage 리스트 + apply_value_table: Value Table을 적용할지 여부 + + Returns: + {message_name: {signal_name: [values], 'timestamps': [us]}} 형태의 딕셔너리 + """ + decoded_data = {} + + # 각 메시지별로 초기화 + for msg_def in self.db.messages: + msg_name = msg_def.name + decoded_data[msg_name] = {'timestamps': []} + for signal in msg_def.signals: + decoded_data[msg_name][signal.name] = [] + + decode_count = 0 + for msg in messages: + try: + msg_def = self.db.get_message_by_frame_id(msg.can_id) + msg_name = msg_def.name + + decoded = msg_def.decode(msg.data) + + # 타임스탬프 저장 (마이크로초 -> 초) + decoded_data[msg_name]['timestamps'].append(msg.timestamp_us / 1e6) + + # 각 시그널 값 저장 + for signal in msg_def.signals: + signal_name = signal.name + raw_value = decoded[signal_name] + + # Value Table 적용 + if apply_value_table and signal.choices: + try: + int_value = int(raw_value) + if int_value in signal.choices: + value = signal.choices[int_value] + else: + value = raw_value + except (ValueError, TypeError): + value = raw_value + else: + value = raw_value + + decoded_data[msg_name][signal_name].append(value) + + decode_count += 1 + + except KeyError: + continue + except Exception as e: + continue + + print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)} 개") + + # 빈 메시지 제거 + decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0} + + return decoded_data + + def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]], + output_path: str, + datetime_format: bool = True): + """ + Wide 형식 CSV 생성 (각 시그널이 별도 컬럼) + + 모든 시그널이 컬럼으로 나열되는 형식 + Time, Message1.Signal1, Message1.Signal2, Message2.Signal1, ... + + Args: + decoded_data: 디코딩된 데이터 + output_path: 출력 CSV 파일 경로 + datetime_format: True면 타임스탬프를 날짜/시간으로 변환 + """ + # 모든 고유한 타임스탬프 수집 및 정렬 + all_timestamps = set() + for msg_data in decoded_data.values(): + all_timestamps.update(msg_data['timestamps']) + + all_timestamps = sorted(all_timestamps) + + # 각 타임스탬프에 대한 시그널 값 매핑 + signal_columns = [] + signal_data = {} + + for msg_name, msg_data in decoded_data.items(): + for signal_name, values in msg_data.items(): + if signal_name == 'timestamps': + continue + + col_name = f"{msg_name}.{signal_name}" + signal_columns.append(col_name) + + # 타임스탬프-값 매핑 생성 + timestamp_to_value = {} + for ts, val in zip(msg_data['timestamps'], values): + timestamp_to_value[ts] = val + + signal_data[col_name] = timestamp_to_value + + # CSV 작성 + with open(output_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + + # 헤더 작성 + if datetime_format: + header = ['Timestamp', 'DateTime'] + signal_columns + else: + header = ['Timestamp'] + signal_columns + writer.writerow(header) + + # 데이터 작성 + for ts in all_timestamps: + if datetime_format: + dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + row = [f"{ts:.6f}", dt] + else: + row = [f"{ts:.6f}"] + + for col_name in signal_columns: + value = signal_data[col_name].get(ts, '') + row.append(value) + + writer.writerow(row) + + print(f"✓ Wide CSV 생성 완료: {output_path}") + print(f" - 행 수: {len(all_timestamps)}") + print(f" - 시그널 수: {len(signal_columns)}") + + def to_long_csv(self, decoded_data: Dict[str, Dict[str, List]], + output_path: str, + datetime_format: bool = True): + """ + Long 형식 CSV 생성 (각 시그널 값이 별도 행) + + Time, Message, Signal, Value + 0.001, VehicleSpeed, Speed, 50.5 + 0.001, EngineData, RPM, 2000 + + Args: + decoded_data: 디코딩된 데이터 + output_path: 출력 CSV 파일 경로 + datetime_format: True면 타임스탬프를 날짜/시간으로 변환 + """ + with open(output_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + + # 헤더 작성 + if datetime_format: + header = ['Timestamp', 'DateTime', 'Message', 'Signal', 'Value'] + else: + header = ['Timestamp', 'Message', 'Signal', 'Value'] + writer.writerow(header) + + # 데이터 작성 + row_count = 0 + for msg_name, msg_data in decoded_data.items(): + timestamps = msg_data['timestamps'] + + for signal_name, values in msg_data.items(): + if signal_name == 'timestamps': + continue + + for ts, val in zip(timestamps, values): + if datetime_format: + dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + row = [f"{ts:.6f}", dt, msg_name, signal_name, val] + else: + row = [f"{ts:.6f}", msg_name, signal_name, val] + + writer.writerow(row) + row_count += 1 + + print(f"✓ Long CSV 생성 완료: {output_path}") + print(f" - 행 수: {row_count}") + + def to_message_csv(self, decoded_data: Dict[str, Dict[str, List]], + output_dir: str, + datetime_format: bool = True): + """ + 메시지별 CSV 생성 (각 CAN 메시지가 별도 CSV 파일) + + 각 메시지마다: + Time, Signal1, Signal2, Signal3, ... + + Args: + decoded_data: 디코딩된 데이터 + output_dir: 출력 디렉토리 경로 + datetime_format: True면 타임스탬프를 날짜/시간으로 변환 + """ + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + file_count = 0 + for msg_name, msg_data in decoded_data.items(): + # 파일명에 사용할 수 없는 문자 제거 + safe_name = msg_name.replace('/', '_').replace('\\', '_') + csv_path = output_path / f"{safe_name}.csv" + + with open(csv_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + + # 헤더 작성 + signal_names = [k for k in msg_data.keys() if k != 'timestamps'] + if datetime_format: + header = ['Timestamp', 'DateTime'] + signal_names + else: + header = ['Timestamp'] + signal_names + writer.writerow(header) + + # 데이터 작성 + timestamps = msg_data['timestamps'] + for i, ts in enumerate(timestamps): + if datetime_format: + dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + row = [f"{ts:.6f}", dt] + else: + row = [f"{ts:.6f}"] + + for signal_name in signal_names: + row.append(msg_data[signal_name][i]) + + writer.writerow(row) + + file_count += 1 + + print(f"✓ 메시지별 CSV 생성 완료: {output_dir}") + print(f" - 파일 수: {file_count}") + + def to_raw_csv(self, messages: List[CANMessage], output_path: str): + """ + 원본 CAN 메시지 CSV 생성 (디코딩 없이 raw 데이터) + + Timestamp, DateTime, CAN_ID, DLC, Data + + Args: + messages: CANMessage 리스트 + output_path: 출력 CSV 파일 경로 + """ + with open(output_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + + # 헤더 + writer.writerow(['Timestamp', 'DateTime', 'CAN_ID', 'DLC', 'Data']) + + # 데이터 + for msg in messages: + ts = msg.timestamp_us / 1e6 + dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + can_id = f"0x{msg.can_id:03X}" + data_hex = ' '.join([f"{b:02X}" for b in msg.data]) + + writer.writerow([f"{ts:.6f}", dt, can_id, msg.dlc, data_hex]) + + print(f"✓ Raw CSV 생성 완료: {output_path}") + print(f" - 행 수: {len(messages)}") + + def convert(self, bin_path: str, output_path: str, + format: str = 'wide', + apply_value_table: bool = True, + datetime_format: bool = True): + """ + bin 파일을 CSV로 변환 (전체 프로세스) + + Args: + bin_path: 입력 bin 파일 경로 + output_path: 출력 경로 (파일 또는 디렉토리) + format: CSV 형식 ('wide', 'long', 'message', 'raw') + apply_value_table: Value Table 적용 여부 + datetime_format: 날짜/시간 포맷 포함 여부 + """ + print(f"\n{'='*60}") + print(f"ESP32 CAN bin → CSV 변환") + print(f"{'='*60}") + + # 1. bin 파일 읽기 + messages = self.read_bin_file(bin_path) + + if not messages: + print("✗ 메시지가 없습니다.") + return + + # 시간 범위 출력 + start_time = messages[0].timestamp_us / 1e6 + end_time = messages[-1].timestamp_us / 1e6 + duration = end_time - start_time + + print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}") + print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}") + print(f" - 기간: {duration:.2f} 초") + + # Raw 형식은 디코딩 없이 바로 변환 + if format == 'raw': + self.to_raw_csv(messages, output_path) + else: + # 2. 메시지 디코딩 + decoded_data = self.decode_messages(messages, apply_value_table) + + if not decoded_data: + print("✗ 디코딩된 데이터가 없습니다.") + return + + # 3. CSV 생성 + if format == 'wide': + self.to_wide_csv(decoded_data, output_path, datetime_format) + elif format == 'long': + self.to_long_csv(decoded_data, output_path, datetime_format) + elif format == 'message': + self.to_message_csv(decoded_data, output_path, datetime_format) + else: + print(f"✗ 알 수 없는 형식: {format}") + return + + print(f"{'='*60}") + print(f"✓ 변환 완료!") + print(f"{'='*60}\n") + + +def main(): + """사용 예제""" + import argparse + + parser = argparse.ArgumentParser( + description='ESP32 CAN Logger bin 파일을 CSV로 변환', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +CSV 형식: + wide : 모든 시그널이 컬럼으로 나열 (기본값, 분석에 적합) + long : 각 시그널 값이 별도 행 (데이터베이스 저장에 적합) + message : 메시지별 개별 CSV 파일 생성 + raw : 원본 CAN 메시지 (디코딩 없음) + +사용 예제: + python can_bin_to_csv.py -d example.dbc -i data.bin -o output.csv + python can_bin_to_csv.py -d example.dbc -i data.bin -o output.csv -f long + python can_bin_to_csv.py -d example.dbc -i data.bin -o csv_files/ -f message + python can_bin_to_csv.py -d example.dbc -i data.bin -o raw.csv -f raw + """ + ) + + parser.add_argument('-d', '--dbc', required=True, + help='CAN Database (DBC) 파일 경로') + parser.add_argument('-i', '--input', required=True, nargs='+', + help='입력 bin 파일(들) 경로') + parser.add_argument('-o', '--output', required=True, + help='출력 CSV 파일 경로 또는 디렉토리') + parser.add_argument('-f', '--format', + choices=['wide', 'long', 'message', 'raw'], + default='wide', + help='CSV 형식 (기본값: wide)') + parser.add_argument('--no-value-table', action='store_true', + help='Value Table을 적용하지 않음 (숫자만 출력)') + parser.add_argument('--no-datetime', action='store_true', + help='날짜/시간 컬럼을 포함하지 않음') + + args = parser.parse_args() + + # 변환기 초기화 + converter = CANBinToCSV(args.dbc) + + # 출력 경로 처리 + output_path = Path(args.output) + + # 여러 파일 변환 + for input_file in args.input: + input_path = Path(input_file) + + if not input_path.exists(): + print(f"✗ 파일을 찾을 수 없음: {input_file}") + continue + + # 출력 파일 경로 결정 + if args.format == 'message': + # message 형식은 디렉토리 필요 + if output_path.is_file(): + output_file = output_path.parent / input_path.stem + else: + output_file = output_path / input_path.stem + else: + if output_path.is_dir() or str(output_path) == '/': + output_file = input_path.parent / f"{input_path.stem}.csv" + else: + output_file = output_path + + # 변환 실행 + try: + converter.convert( + str(input_path), + str(output_file), + format=args.format, + apply_value_table=not args.no_value_table, + datetime_format=not args.no_datetime + ) + except Exception as e: + print(f"✗ 변환 실패 ({input_file}): {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() diff --git a/can_bin_to_mdf.py b/can_bin_to_mdf.py new file mode 100644 index 0000000..9e8aa0d --- /dev/null +++ b/can_bin_to_mdf.py @@ -0,0 +1,451 @@ +#!/usr/bin/env python3 +""" +ESP32 CAN Logger bin 파일을 MDF 형식으로 변환 +- CAN Database (DBC 파일) 사용 +- Value Table (시그널 값-텍스트 매칭) 지원 (올바른 conversion으로 저장) +- asammdf 라이브러리 사용 +""" + +import struct +import numpy as np +from pathlib import Path +from typing import Dict, List, Tuple, Optional +from asammdf import MDF, Signal +from cantools import database as candb +from datetime import datetime + + +class CANMessage: + """ESP32 CAN Logger의 CAN 메시지 구조체""" + STRUCT_FORMAT = ' 'CANMessage': + """바이너리 데이터에서 CANMessage 파싱""" + timestamp_us, can_id, dlc, msg_data = struct.unpack(cls.STRUCT_FORMAT, data) + return cls(timestamp_us, can_id, dlc, msg_data) + + def __repr__(self): + return f"CANMessage(id=0x{self.can_id:X}, dlc={self.dlc}, ts={self.timestamp_us})" + + +class CANBinToMDF: + """CAN bin 파일을 MDF로 변환하는 클래스""" + + def __init__(self, dbc_path: str): + """ + Args: + dbc_path: CAN Database (DBC) 파일 경로 + """ + self.db = candb.load_file(dbc_path) + print(f"✓ DBC 파일 로드: {dbc_path}") + print(f" - 메시지 수: {len(self.db.messages)}") + print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}") + + def read_bin_file(self, bin_path: str) -> List[CANMessage]: + """ + ESP32 CAN Logger의 bin 파일 읽기 + + Args: + bin_path: bin 파일 경로 + + Returns: + CANMessage 리스트 + """ + messages = [] + + with open(bin_path, 'rb') as f: + data = f.read() + + # 전체 데이터를 CANMessage 단위로 파싱 + offset = 0 + while offset + CANMessage.STRUCT_SIZE <= len(data): + msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE] + try: + msg = CANMessage.from_bytes(msg_bytes) + messages.append(msg) + except Exception as e: + print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}") + + offset += CANMessage.STRUCT_SIZE + + print(f"✓ bin 파일 읽기 완료: {bin_path}") + print(f" - 총 메시지: {len(messages)}") + + return messages + + def decode_messages(self, messages: List[CANMessage]) -> Dict[str, Dict[str, List]]: + """ + CAN 메시지를 DBC를 사용하여 디코딩 + + Args: + messages: CANMessage 리스트 + + Returns: + {message_name: {signal_name: [values], 'timestamps': [us]}} 형태의 딕셔너리 + """ + decoded_data = {} + + # 각 메시지별로 초기화 + for msg_def in self.db.messages: + msg_name = msg_def.name + decoded_data[msg_name] = {'timestamps': []} + for signal in msg_def.signals: + decoded_data[msg_name][signal.name] = [] + + # 메시지 디코딩 + decode_count = 0 + + # 첫 번째 타임스탬프를 기준점으로 설정 (0초 시작) + start_timestamp = messages[0].timestamp_us / 1e6 if messages else 0 + + for msg in messages: + try: + # DBC에서 해당 CAN ID의 메시지 정의 찾기 + msg_def = self.db.get_message_by_frame_id(msg.can_id) + msg_name = msg_def.name + + # 메시지 디코딩 + decoded = msg_def.decode(msg.data) + + # 타임스탬프 저장 (마이크로초 -> 초, 0부터 시작) + relative_time = (msg.timestamp_us / 1e6) - start_timestamp + decoded_data[msg_name]['timestamps'].append(relative_time) + + # 각 시그널 값 저장 (원본 숫자 값 유지) + for signal in msg_def.signals: + signal_name = signal.name + raw_value = decoded[signal_name] + + # NamedSignalValue 객체인 경우 숫자 값으로 변환 + # (value table이 있는 시그널은 NamedSignalValue로 반환됨) + if hasattr(raw_value, 'value'): + # NamedSignalValue 객체 - 숫자 값 추출 + numeric_value = raw_value.value + else: + # 일반 숫자 값 + numeric_value = raw_value + + # 항상 원본 숫자 값을 저장 + decoded_data[msg_name][signal_name].append(numeric_value) + + decode_count += 1 + + except KeyError: + # DBC에 정의되지 않은 CAN ID는 무시 + continue + except Exception as e: + # print(f"⚠️ 디코딩 오류 (ID: 0x{msg.can_id:X}): {e}") + continue + + print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)} 개") + + # 빈 메시지 제거 + decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0} + + return decoded_data + + def create_mdf(self, decoded_data: Dict[str, Dict[str, List]], + output_path: str, + metadata: Optional[Dict] = None, + compression: int = 2): + """ + 디코딩된 데이터로부터 MDF 파일 생성 + + Args: + decoded_data: decode_messages()의 반환값 + output_path: 출력 MDF 파일 경로 + metadata: MDF 파일 메타데이터 (선택) + compression: 압축 레벨 (0=없음, 1=deflate, 2=transposed, 기본값=2) + """ + signals = [] + skipped_signals = [] + value_table_signals = [] # value table이 있는 시그널 추적 + + for msg_name, msg_data in decoded_data.items(): + timestamps = np.array(msg_data['timestamps'], dtype=np.float64) + + # 메시지 정의 가져오기 + try: + msg_def = self.db.get_message_by_name(msg_name) + except: + continue + + for signal_name, values in msg_data.items(): + if signal_name == 'timestamps': + continue + + # 시그널 정의 가져오기 + signal_def = None + for sig in msg_def.signals: + if sig.name == signal_name: + signal_def = sig + break + + if signal_def is None: + continue + + # 값의 데이터 타입 확인 + if len(values) == 0: + continue + + # numpy array로 변환 시도 + try: + # 먼저 리스트로 변환하여 타입 확인 + test_array = np.array(values) + + # object dtype이면 강제로 float로 변환 시도 + if test_array.dtype == np.object_: + # 각 값을 float로 변환 + float_values = [] + conversion_failed = False + for v in values: + try: + float_values.append(float(v)) + except (ValueError, TypeError): + conversion_failed = True + break + + if conversion_failed: + skipped_signals.append(f"{msg_name}.{signal_name}") + continue + + samples = np.array(float_values, dtype=np.float64) + else: + # 정상적인 숫자 타입이면 그대로 사용 + samples = np.array(values, dtype=np.float64) + + except Exception as e: + skipped_signals.append(f"{msg_name}.{signal_name}") + print(f"⚠️ 시그널 변환 실패 ({msg_name}.{signal_name}): {e}") + continue + + # Signal 객체 생성 시 conversion 적용 + comment = signal_def.comment or "" + conversion = None + + # Value Table이 있는 경우 conversion 적용 + # asammdf 공식 예제 형식을 정확히 따름 + if signal_def.choices: + value_table_signals.append(f"{msg_name}.{signal_name}") + + # value-to-text conversion 생성 + # 형식: {'val_0': 숫자, 'text_0': bytes, ..., 'default': bytes} + conversion = {} + + # choices를 정렬하여 순서대로 처리 + for idx, (raw_val, text_val) in enumerate(sorted(signal_def.choices.items())): + conversion[f'val_{idx}'] = int(raw_val) + # 텍스트는 반드시 bytes로 변환 + if isinstance(text_val, str): + conversion[f'text_{idx}'] = text_val.encode('utf-8') + elif isinstance(text_val, bytes): + conversion[f'text_{idx}'] = text_val + else: + conversion[f'text_{idx}'] = str(text_val).encode('utf-8') + + # 매칭되지 않는 값에 대한 기본값 (bytes) + conversion['default'] = b'' + + # Value table 정보를 주석에도 추가 + value_table_str = "; ".join([f"{k}={v}" for k, v in sorted(signal_def.choices.items())]) + if comment: + comment += f" [Values: {value_table_str}]" + else: + comment = f"Values: {value_table_str}" + + # Signal 객체 생성 + signal = Signal( + samples=samples, + timestamps=timestamps, + name=f"{msg_name}.{signal_name}", + unit=signal_def.unit or "", + comment=comment, + conversion=conversion # conversion 추가 + ) + + signals.append(signal) + + # MDF 파일 생성 + if metadata is None: + metadata = {} + + default_metadata = { + 'author': 'ESP32 CAN Logger', + 'department': '', + 'project': 'CAN Data Logging', + 'subject': 'CAN Bus Data', + } + default_metadata.update(metadata) + + mdf = MDF(version='4.10') + + # 메타데이터 설정 + for key, value in default_metadata.items(): + if hasattr(mdf.header, key): + setattr(mdf.header, key, value) + + # 시그널 추가 + if signals: + mdf.append(signals) + + # MDF 파일 저장 (압축 적용) + if compression > 0: + print(f" 압축 중... (레벨 {compression})") + + # 압축 옵션 + # 0 = 압축 없음 + # 1 = deflate (표준 압축) + # 2 = transposed deflate (최고 압축률, 기본값) + mdf.save( + output_path, + overwrite=True, + compression=compression, + ) + + # 파일 크기 확인 + import os + compressed_size = os.path.getsize(output_path) / (1024 * 1024) # MB + + compression_names = {0: "압축 없음", 1: "deflate", 2: "transposed"} + compression_name = compression_names.get(compression, f"레벨 {compression}") + + print(f"✓ MDF 파일 생성 완료: {output_path}") + print(f" - 시그널 수: {len(signals)}") + print(f" - Value Table 적용 시그널: {len(value_table_signals)}개") + print(f" - 파일 크기: {compressed_size:.2f} MB ({compression_name})") + if skipped_signals: + print(f" ⚠️ 건너뛴 시그널: {len(skipped_signals)}개") + if len(skipped_signals) <= 10: + for sig in skipped_signals: + print(f" - {sig}") + else: + for sig in skipped_signals[:5]: + print(f" - {sig}") + print(f" ... 외 {len(skipped_signals) - 5}개") + print(f" 💡 Value Table이 적용된 시그널은 MDF 뷰어에서 텍스트로 표시됩니다.") + + def convert(self, bin_path: str, output_path: str, + metadata: Optional[Dict] = None, + compression: int = 2): + """ + bin 파일을 MDF로 변환 (전체 프로세스) + + Args: + bin_path: 입력 bin 파일 경로 + output_path: 출력 MDF 파일 경로 + metadata: MDF 메타데이터 (선택) + compression: 압축 레벨 (0=없음, 1=deflate, 2=transposed, 기본값=2) + """ + print(f"\n{'='*60}") + print(f"ESP32 CAN bin → MDF 변환") + print(f"{'='*60}") + + # 1. bin 파일 읽기 + messages = self.read_bin_file(bin_path) + + if not messages: + print("✗ 메시지가 없습니다.") + return + + # 시간 범위 출력 + start_time = messages[0].timestamp_us / 1e6 + end_time = messages[-1].timestamp_us / 1e6 + duration = end_time - start_time + + print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}") + print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}") + print(f" - 기간: {duration:.2f} 초") + + # 2. 메시지 디코딩 + decoded_data = self.decode_messages(messages) + + if not decoded_data: + print("✗ 디코딩된 데이터가 없습니다.") + return + + # 3. MDF 생성 + if metadata is None: + metadata = { + 'comment': f"Converted from {Path(bin_path).name}", + } + + self.create_mdf(decoded_data, output_path, metadata, compression) + + print(f"{'='*60}") + print(f"✓ 변환 완료!") + print(f"{'='*60}\n") + + +def main(): + """사용 예제""" + import argparse + + parser = argparse.ArgumentParser( + description='ESP32 CAN Logger bin 파일을 MDF로 변환 (Value Table 지원)', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +사용 예제: + python can_bin_to_mdf_final.py -d example.dbc -i canlog_001.bin -o output.mf4 + python can_bin_to_mdf_final.py -d vehicle.dbc -i data/*.bin -o output/ + """ + ) + + parser.add_argument('-d', '--dbc', required=True, + help='CAN Database (DBC) 파일 경로') + parser.add_argument('-i', '--input', required=True, nargs='+', + help='입력 bin 파일(들) 경로') + parser.add_argument('-o', '--output', required=True, + help='출력 MDF 파일 경로 또는 디렉토리') + parser.add_argument('-m', '--metadata', + help='MDF 메타데이터 (JSON 형식)') + parser.add_argument('-c', '--compression', type=int, default=2, + choices=[0, 1, 2], + help='압축 레벨: 0=없음, 1=deflate, 2=transposed (기본값=2, 최고 압축)') + + args = parser.parse_args() + + # 변환기 초기화 + converter = CANBinToMDF(args.dbc) + + # 메타데이터 파싱 + metadata = None + if args.metadata: + import json + metadata = json.loads(args.metadata) + + # 출력 경로 처리 + output_path = Path(args.output) + + # 여러 파일 변환 + for input_file in args.input: + input_path = Path(input_file) + + if not input_path.exists(): + print(f"✗ 파일을 찾을 수 없음: {input_file}") + continue + + # 출력 파일 경로 결정 + if output_path.is_dir() or str(output_path) == '/': + output_file = input_path.parent / f"{input_path.stem}.mf4" + else: + output_file = output_path + + # 변환 실행 + try: + converter.convert(str(input_path), str(output_file), metadata, args.compression) + except Exception as e: + print(f"✗ 변환 실패 ({input_file}): {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/convert_all_bins_simple_advanced.bat b/convert_all_bins_simple_advanced.bat new file mode 100644 index 0000000..0caf148 --- /dev/null +++ b/convert_all_bins_simple_advanced.bat @@ -0,0 +1,134 @@ +@echo off +REM ====================================== +REM CAN BIN to MDF Batch Converter (Advanced) +REM ====================================== + +REM Find DBC file +set DBC_FILE= +for %%f in (*.dbc) do ( + set DBC_FILE=%%f + goto found_dbc +) + +:found_dbc +if "%DBC_FILE%"=="" ( + echo [ERROR] DBC file not found! + pause + exit /b 1 +) + +echo DBC File: %DBC_FILE% +echo. + +REM Count BIN files +set COUNT=0 +for %%f in (*.bin) do set /a COUNT+=1 + +if %COUNT%==0 ( + echo [ERROR] No BIN files to convert! + pause + exit /b 1 +) + +echo Total files: %COUNT% +echo. + +REM Select compression level +echo Select compression level: +echo ====================================== +echo [0] No compression (Fast read, Large file) +echo - Read speed: ***** (Very fast) +echo - File size: 171MB +echo - Use case: Immediate analysis +echo. +echo [1] deflate compression (Balanced) +echo - Read speed: *** (Normal) +echo - File size: 3.6MB (98%% compression) +echo - Use case: General use +echo. +echo [2] transposed compression (Maximum) +echo - Read speed: * (Slow) +echo - File size: 1.2MB (99%% compression) +echo - Use case: Long-term archiving +echo ====================================== +echo. + +set /p COMPRESSION="Select (0/1/2) [default=0]: " +if "%COMPRESSION%"=="" set COMPRESSION=0 + +if "%COMPRESSION%"=="0" ( + echo Selected: No compression - Fast read +) else if "%COMPRESSION%"=="1" ( + echo Selected: deflate - Balanced +) else if "%COMPRESSION%"=="2" ( + echo Selected: transposed - Maximum compression +) else ( + echo [ERROR] Invalid selection. Please select 0, 1, or 2. + pause + exit /b 1 +) + +echo. +echo Starting conversion... +echo ====================================== +echo. + +REM Record start time +set START_TIME=%TIME% + +REM Convert each BIN file +setlocal enabledelayedexpansion +set PROCESSED=0 +set SUCCESS=0 +set FAILED=0 + +for %%f in (*.bin) do ( + set /a PROCESSED+=1 + echo [!PROCESSED!/%COUNT%] Converting: %%f + + python can_bin_to_mdf.py -d "%DBC_FILE%" -i "%%f" -o "./" -c %COMPRESSION% + + if errorlevel 1 ( + echo [FAIL] %%f + set /a FAILED+=1 + ) else ( + echo [OK] %%f + set /a SUCCESS+=1 + ) + echo. +) + +REM Record end time +set END_TIME=%TIME% + +echo ====================================== +echo Conversion Complete! +echo ====================================== +echo Total: %COUNT% +echo Success: %SUCCESS% +echo Failed: %FAILED% +echo Start: %START_TIME% +echo End: %END_TIME% +echo ====================================== +echo. + +REM List generated MDF files with size +echo Generated MDF files: +for %%f in (*.mf4) do ( + set SIZE=%%~zf + set /a SIZE_MB=!SIZE! / 1048576 + echo - %%f (!SIZE_MB! MB^) +) +echo. + +REM Calculate total MDF file size +set TOTAL_SIZE=0 +for %%f in (*.mf4) do ( + set SIZE=%%~zf + set /a TOTAL_SIZE+=!SIZE! +) +set /a TOTAL_MB=!TOTAL_SIZE! / 1048576 +echo Total MDF size: !TOTAL_MB! MB +echo. + +pause diff --git a/convert_all_to_csv_advanced.bat b/convert_all_to_csv_advanced.bat new file mode 100644 index 0000000..b02eaa9 --- /dev/null +++ b/convert_all_to_csv_advanced.bat @@ -0,0 +1,156 @@ +@echo off +REM ====================================== +REM CAN BIN to CSV Batch Converter (Advanced) +REM Multiple format options +REM ====================================== + +REM Find DBC file +set DBC_FILE= +for %%f in (*.dbc) do ( + set DBC_FILE=%%f + goto found_dbc +) + +:found_dbc +if "%DBC_FILE%"=="" ( + echo [ERROR] DBC file not found! + pause + exit /b 1 +) + +echo ====================================== +echo CAN BIN to CSV Converter (Advanced) +echo ====================================== +echo. +echo DBC File: %DBC_FILE% +echo. + +REM Count BIN files +set COUNT=0 +for %%f in (*.bin) do set /a COUNT+=1 + +if %COUNT%==0 ( + echo [ERROR] No BIN files to convert! + pause + exit /b 1 +) + +echo Total files: %COUNT% +echo. + +REM Select CSV format +echo Select CSV format: +echo ====================================== +echo [1] Wide format (Excel-friendly) - RECOMMENDED +echo - One row per timestamp +echo - All signals as columns +echo - Easy to use in Excel +echo - Output: filename_wide.csv +echo. +echo [2] Long format (Database-friendly) +echo - One row per signal value +echo - Columns: Time, Signal, Value, Unit +echo - Good for databases +echo - Output: filename_long.csv +echo. +echo [3] Message format (Separate files per message) +echo - One CSV file per CAN message +echo - Organized by message +echo - Output: csv_files/MessageName_*.csv +echo. +echo [4] Raw format (No decoding) +echo - Raw CAN frames +echo - No signal decoding +echo - Output: filename_raw.csv +echo ====================================== +echo. + +set /p FORMAT="Select (1/2/3/4) [default=1]: " +if "%FORMAT%"=="" set FORMAT=1 + +if "%FORMAT%"=="1" ( + set FORMAT_NAME=wide + set OUTPUT_DIR=./ + echo Selected: Wide format - Excel-friendly +) else if "%FORMAT%"=="2" ( + set FORMAT_NAME=long + set OUTPUT_DIR=./ + echo Selected: Long format - Database-friendly +) else if "%FORMAT%"=="3" ( + set FORMAT_NAME=message + set OUTPUT_DIR=csv_files/ + echo Selected: Message format - Separate files + if not exist csv_files mkdir csv_files +) else if "%FORMAT%"=="4" ( + set FORMAT_NAME=raw + set OUTPUT_DIR=./ + echo Selected: Raw format - No decoding +) else ( + echo [ERROR] Invalid selection. Please select 1, 2, 3, or 4. + pause + exit /b 1 +) + +echo. +echo Starting conversion... +echo ====================================== +echo. + +REM Record start time +set START_TIME=%TIME% + +REM Convert each BIN file +setlocal enabledelayedexpansion +set PROCESSED=0 +set SUCCESS=0 +set FAILED=0 + +for %%f in (*.bin) do ( + set /a PROCESSED+=1 + echo [!PROCESSED!/%COUNT%] Converting: %%f + + python can_bin_to_csv.py -d "%DBC_FILE%" -i "%%f" -o "%OUTPUT_DIR%" -f %FORMAT_NAME% + + if errorlevel 1 ( + echo [FAIL] %%f + set /a FAILED+=1 + ) else ( + echo [OK] %%f + set /a SUCCESS+=1 + ) + echo. +) + +REM Record end time +set END_TIME=%TIME% + +echo ====================================== +echo Conversion Complete! +echo ====================================== +echo Total: %COUNT% +echo Success: %SUCCESS% +echo Failed: %FAILED% +echo Start: %START_TIME% +echo End: %END_TIME% +echo ====================================== +echo. + +REM List generated CSV files +if "%FORMAT%"=="3" ( + echo Generated CSV files in csv_files folder: + dir /b csv_files\*.csv 2>nul | find /c /v "" > temp_count.txt + set /p CSV_COUNT=