변환 tools

This commit is contained in:
2025-11-02 19:21:00 +00:00
parent 23bed9688b
commit fd3065e8c9
5 changed files with 1241 additions and 0 deletions

489
can_bin_to_csv.py Normal file
View File

@@ -0,0 +1,489 @@
#!/usr/bin/env python3
"""
ESP32 CAN Logger bin 파일을 CSV 형식으로 변환
- CAN Database (DBC 파일) 사용
- Value Table (시그널 값-텍스트 매칭) 지원
- 다양한 CSV 출력 형식 지원
"""
import struct
import csv
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional
from cantools import database as candb
from datetime import datetime
class CANMessage:
"""ESP32 CAN Logger의 CAN 메시지 구조체"""
STRUCT_FORMAT = '<QIB8s' # little-endian: uint64, uint32, uint8, 8*uint8
STRUCT_SIZE = struct.calcsize(STRUCT_FORMAT)
def __init__(self, timestamp_us: int, can_id: int, dlc: int, data: bytes):
self.timestamp_us = timestamp_us
self.can_id = can_id
self.dlc = dlc
self.data = data[:dlc]
@classmethod
def from_bytes(cls, data: bytes) -> 'CANMessage':
"""바이너리 데이터에서 CANMessage 파싱"""
timestamp_us, can_id, dlc, msg_data = struct.unpack(cls.STRUCT_FORMAT, data)
return cls(timestamp_us, can_id, dlc, msg_data)
def __repr__(self):
return f"CANMessage(id=0x{self.can_id:X}, dlc={self.dlc}, ts={self.timestamp_us})"
class CANBinToCSV:
"""CAN bin 파일을 CSV로 변환하는 클래스"""
def __init__(self, dbc_path: str):
"""
Args:
dbc_path: CAN Database (DBC) 파일 경로
"""
self.db = candb.load_file(dbc_path)
print(f"✓ DBC 파일 로드: {dbc_path}")
print(f" - 메시지 수: {len(self.db.messages)}")
print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}")
def read_bin_file(self, bin_path: str) -> List[CANMessage]:
"""
ESP32 CAN Logger의 bin 파일 읽기
Args:
bin_path: bin 파일 경로
Returns:
CANMessage 리스트
"""
messages = []
with open(bin_path, 'rb') as f:
data = f.read()
offset = 0
while offset + CANMessage.STRUCT_SIZE <= len(data):
msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE]
try:
msg = CANMessage.from_bytes(msg_bytes)
messages.append(msg)
except Exception as e:
print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}")
offset += CANMessage.STRUCT_SIZE
print(f"✓ bin 파일 읽기 완료: {bin_path}")
print(f" - 총 메시지: {len(messages)}")
return messages
def decode_messages(self, messages: List[CANMessage],
apply_value_table: bool = True) -> Dict[str, Dict[str, List]]:
"""
CAN 메시지를 DBC를 사용하여 디코딩
Args:
messages: CANMessage 리스트
apply_value_table: Value Table을 적용할지 여부
Returns:
{message_name: {signal_name: [values], 'timestamps': [us]}} 형태의 딕셔너리
"""
decoded_data = {}
# 각 메시지별로 초기화
for msg_def in self.db.messages:
msg_name = msg_def.name
decoded_data[msg_name] = {'timestamps': []}
for signal in msg_def.signals:
decoded_data[msg_name][signal.name] = []
decode_count = 0
for msg in messages:
try:
msg_def = self.db.get_message_by_frame_id(msg.can_id)
msg_name = msg_def.name
decoded = msg_def.decode(msg.data)
# 타임스탬프 저장 (마이크로초 -> 초)
decoded_data[msg_name]['timestamps'].append(msg.timestamp_us / 1e6)
# 각 시그널 값 저장
for signal in msg_def.signals:
signal_name = signal.name
raw_value = decoded[signal_name]
# Value Table 적용
if apply_value_table and signal.choices:
try:
int_value = int(raw_value)
if int_value in signal.choices:
value = signal.choices[int_value]
else:
value = raw_value
except (ValueError, TypeError):
value = raw_value
else:
value = raw_value
decoded_data[msg_name][signal_name].append(value)
decode_count += 1
except KeyError:
continue
except Exception as e:
continue
print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)}")
# 빈 메시지 제거
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
return decoded_data
def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
datetime_format: bool = True):
"""
Wide 형식 CSV 생성 (각 시그널이 별도 컬럼)
모든 시그널이 컬럼으로 나열되는 형식
Time, Message1.Signal1, Message1.Signal2, Message2.Signal1, ...
Args:
decoded_data: 디코딩된 데이터
output_path: 출력 CSV 파일 경로
datetime_format: True면 타임스탬프를 날짜/시간으로 변환
"""
# 모든 고유한 타임스탬프 수집 및 정렬
all_timestamps = set()
for msg_data in decoded_data.values():
all_timestamps.update(msg_data['timestamps'])
all_timestamps = sorted(all_timestamps)
# 각 타임스탬프에 대한 시그널 값 매핑
signal_columns = []
signal_data = {}
for msg_name, msg_data in decoded_data.items():
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
col_name = f"{msg_name}.{signal_name}"
signal_columns.append(col_name)
# 타임스탬프-값 매핑 생성
timestamp_to_value = {}
for ts, val in zip(msg_data['timestamps'], values):
timestamp_to_value[ts] = val
signal_data[col_name] = timestamp_to_value
# CSV 작성
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime'] + signal_columns
else:
header = ['Timestamp'] + signal_columns
writer.writerow(header)
# 데이터 작성
for ts in all_timestamps:
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt]
else:
row = [f"{ts:.6f}"]
for col_name in signal_columns:
value = signal_data[col_name].get(ts, '')
row.append(value)
writer.writerow(row)
print(f"✓ Wide CSV 생성 완료: {output_path}")
print(f" - 행 수: {len(all_timestamps)}")
print(f" - 시그널 수: {len(signal_columns)}")
def to_long_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
datetime_format: bool = True):
"""
Long 형식 CSV 생성 (각 시그널 값이 별도 행)
Time, Message, Signal, Value
0.001, VehicleSpeed, Speed, 50.5
0.001, EngineData, RPM, 2000
Args:
decoded_data: 디코딩된 데이터
output_path: 출력 CSV 파일 경로
datetime_format: True면 타임스탬프를 날짜/시간으로 변환
"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime', 'Message', 'Signal', 'Value']
else:
header = ['Timestamp', 'Message', 'Signal', 'Value']
writer.writerow(header)
# 데이터 작성
row_count = 0
for msg_name, msg_data in decoded_data.items():
timestamps = msg_data['timestamps']
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
for ts, val in zip(timestamps, values):
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt, msg_name, signal_name, val]
else:
row = [f"{ts:.6f}", msg_name, signal_name, val]
writer.writerow(row)
row_count += 1
print(f"✓ Long CSV 생성 완료: {output_path}")
print(f" - 행 수: {row_count}")
def to_message_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_dir: str,
datetime_format: bool = True):
"""
메시지별 CSV 생성 (각 CAN 메시지가 별도 CSV 파일)
각 메시지마다:
Time, Signal1, Signal2, Signal3, ...
Args:
decoded_data: 디코딩된 데이터
output_dir: 출력 디렉토리 경로
datetime_format: True면 타임스탬프를 날짜/시간으로 변환
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
file_count = 0
for msg_name, msg_data in decoded_data.items():
# 파일명에 사용할 수 없는 문자 제거
safe_name = msg_name.replace('/', '_').replace('\\', '_')
csv_path = output_path / f"{safe_name}.csv"
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
signal_names = [k for k in msg_data.keys() if k != 'timestamps']
if datetime_format:
header = ['Timestamp', 'DateTime'] + signal_names
else:
header = ['Timestamp'] + signal_names
writer.writerow(header)
# 데이터 작성
timestamps = msg_data['timestamps']
for i, ts in enumerate(timestamps):
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt]
else:
row = [f"{ts:.6f}"]
for signal_name in signal_names:
row.append(msg_data[signal_name][i])
writer.writerow(row)
file_count += 1
print(f"✓ 메시지별 CSV 생성 완료: {output_dir}")
print(f" - 파일 수: {file_count}")
def to_raw_csv(self, messages: List[CANMessage], output_path: str):
"""
원본 CAN 메시지 CSV 생성 (디코딩 없이 raw 데이터)
Timestamp, DateTime, CAN_ID, DLC, Data
Args:
messages: CANMessage 리스트
output_path: 출력 CSV 파일 경로
"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더
writer.writerow(['Timestamp', 'DateTime', 'CAN_ID', 'DLC', 'Data'])
# 데이터
for msg in messages:
ts = msg.timestamp_us / 1e6
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
can_id = f"0x{msg.can_id:03X}"
data_hex = ' '.join([f"{b:02X}" for b in msg.data])
writer.writerow([f"{ts:.6f}", dt, can_id, msg.dlc, data_hex])
print(f"✓ Raw CSV 생성 완료: {output_path}")
print(f" - 행 수: {len(messages)}")
def convert(self, bin_path: str, output_path: str,
format: str = 'wide',
apply_value_table: bool = True,
datetime_format: bool = True):
"""
bin 파일을 CSV로 변환 (전체 프로세스)
Args:
bin_path: 입력 bin 파일 경로
output_path: 출력 경로 (파일 또는 디렉토리)
format: CSV 형식 ('wide', 'long', 'message', 'raw')
apply_value_table: Value Table 적용 여부
datetime_format: 날짜/시간 포맷 포함 여부
"""
print(f"\n{'='*60}")
print(f"ESP32 CAN bin → CSV 변환")
print(f"{'='*60}")
# 1. bin 파일 읽기
messages = self.read_bin_file(bin_path)
if not messages:
print("✗ 메시지가 없습니다.")
return
# 시간 범위 출력
start_time = messages[0].timestamp_us / 1e6
end_time = messages[-1].timestamp_us / 1e6
duration = end_time - start_time
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}")
print(f" - 기간: {duration:.2f}")
# Raw 형식은 디코딩 없이 바로 변환
if format == 'raw':
self.to_raw_csv(messages, output_path)
else:
# 2. 메시지 디코딩
decoded_data = self.decode_messages(messages, apply_value_table)
if not decoded_data:
print("✗ 디코딩된 데이터가 없습니다.")
return
# 3. CSV 생성
if format == 'wide':
self.to_wide_csv(decoded_data, output_path, datetime_format)
elif format == 'long':
self.to_long_csv(decoded_data, output_path, datetime_format)
elif format == 'message':
self.to_message_csv(decoded_data, output_path, datetime_format)
else:
print(f"✗ 알 수 없는 형식: {format}")
return
print(f"{'='*60}")
print(f"✓ 변환 완료!")
print(f"{'='*60}\n")
def main():
"""사용 예제"""
import argparse
parser = argparse.ArgumentParser(
description='ESP32 CAN Logger bin 파일을 CSV로 변환',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
CSV 형식:
wide : 모든 시그널이 컬럼으로 나열 (기본값, 분석에 적합)
long : 각 시그널 값이 별도 행 (데이터베이스 저장에 적합)
message : 메시지별 개별 CSV 파일 생성
raw : 원본 CAN 메시지 (디코딩 없음)
사용 예제:
python can_bin_to_csv.py -d example.dbc -i data.bin -o output.csv
python can_bin_to_csv.py -d example.dbc -i data.bin -o output.csv -f long
python can_bin_to_csv.py -d example.dbc -i data.bin -o csv_files/ -f message
python can_bin_to_csv.py -d example.dbc -i data.bin -o raw.csv -f raw
"""
)
parser.add_argument('-d', '--dbc', required=True,
help='CAN Database (DBC) 파일 경로')
parser.add_argument('-i', '--input', required=True, nargs='+',
help='입력 bin 파일(들) 경로')
parser.add_argument('-o', '--output', required=True,
help='출력 CSV 파일 경로 또는 디렉토리')
parser.add_argument('-f', '--format',
choices=['wide', 'long', 'message', 'raw'],
default='wide',
help='CSV 형식 (기본값: wide)')
parser.add_argument('--no-value-table', action='store_true',
help='Value Table을 적용하지 않음 (숫자만 출력)')
parser.add_argument('--no-datetime', action='store_true',
help='날짜/시간 컬럼을 포함하지 않음')
args = parser.parse_args()
# 변환기 초기화
converter = CANBinToCSV(args.dbc)
# 출력 경로 처리
output_path = Path(args.output)
# 여러 파일 변환
for input_file in args.input:
input_path = Path(input_file)
if not input_path.exists():
print(f"✗ 파일을 찾을 수 없음: {input_file}")
continue
# 출력 파일 경로 결정
if args.format == 'message':
# message 형식은 디렉토리 필요
if output_path.is_file():
output_file = output_path.parent / input_path.stem
else:
output_file = output_path / input_path.stem
else:
if output_path.is_dir() or str(output_path) == '/':
output_file = input_path.parent / f"{input_path.stem}.csv"
else:
output_file = output_path
# 변환 실행
try:
converter.convert(
str(input_path),
str(output_file),
format=args.format,
apply_value_table=not args.no_value_table,
datetime_format=not args.no_datetime
)
except Exception as e:
print(f"✗ 변환 실패 ({input_file}): {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

451
can_bin_to_mdf.py Normal file
View File

@@ -0,0 +1,451 @@
#!/usr/bin/env python3
"""
ESP32 CAN Logger bin 파일을 MDF 형식으로 변환
- CAN Database (DBC 파일) 사용
- Value Table (시그널 값-텍스트 매칭) 지원 (올바른 conversion으로 저장)
- asammdf 라이브러리 사용
"""
import struct
import numpy as np
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from asammdf import MDF, Signal
from cantools import database as candb
from datetime import datetime
class CANMessage:
"""ESP32 CAN Logger의 CAN 메시지 구조체"""
STRUCT_FORMAT = '<QIB8s' # little-endian: uint64, uint32, uint8, 8*uint8
STRUCT_SIZE = struct.calcsize(STRUCT_FORMAT)
def __init__(self, timestamp_us: int, can_id: int, dlc: int, data: bytes):
self.timestamp_us = timestamp_us
self.can_id = can_id
self.dlc = dlc
self.data = data[:dlc] # DLC만큼만 유효
@classmethod
def from_bytes(cls, data: bytes) -> 'CANMessage':
"""바이너리 데이터에서 CANMessage 파싱"""
timestamp_us, can_id, dlc, msg_data = struct.unpack(cls.STRUCT_FORMAT, data)
return cls(timestamp_us, can_id, dlc, msg_data)
def __repr__(self):
return f"CANMessage(id=0x{self.can_id:X}, dlc={self.dlc}, ts={self.timestamp_us})"
class CANBinToMDF:
"""CAN bin 파일을 MDF로 변환하는 클래스"""
def __init__(self, dbc_path: str):
"""
Args:
dbc_path: CAN Database (DBC) 파일 경로
"""
self.db = candb.load_file(dbc_path)
print(f"✓ DBC 파일 로드: {dbc_path}")
print(f" - 메시지 수: {len(self.db.messages)}")
print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}")
def read_bin_file(self, bin_path: str) -> List[CANMessage]:
"""
ESP32 CAN Logger의 bin 파일 읽기
Args:
bin_path: bin 파일 경로
Returns:
CANMessage 리스트
"""
messages = []
with open(bin_path, 'rb') as f:
data = f.read()
# 전체 데이터를 CANMessage 단위로 파싱
offset = 0
while offset + CANMessage.STRUCT_SIZE <= len(data):
msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE]
try:
msg = CANMessage.from_bytes(msg_bytes)
messages.append(msg)
except Exception as e:
print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}")
offset += CANMessage.STRUCT_SIZE
print(f"✓ bin 파일 읽기 완료: {bin_path}")
print(f" - 총 메시지: {len(messages)}")
return messages
def decode_messages(self, messages: List[CANMessage]) -> Dict[str, Dict[str, List]]:
"""
CAN 메시지를 DBC를 사용하여 디코딩
Args:
messages: CANMessage 리스트
Returns:
{message_name: {signal_name: [values], 'timestamps': [us]}} 형태의 딕셔너리
"""
decoded_data = {}
# 각 메시지별로 초기화
for msg_def in self.db.messages:
msg_name = msg_def.name
decoded_data[msg_name] = {'timestamps': []}
for signal in msg_def.signals:
decoded_data[msg_name][signal.name] = []
# 메시지 디코딩
decode_count = 0
# 첫 번째 타임스탬프를 기준점으로 설정 (0초 시작)
start_timestamp = messages[0].timestamp_us / 1e6 if messages else 0
for msg in messages:
try:
# DBC에서 해당 CAN ID의 메시지 정의 찾기
msg_def = self.db.get_message_by_frame_id(msg.can_id)
msg_name = msg_def.name
# 메시지 디코딩
decoded = msg_def.decode(msg.data)
# 타임스탬프 저장 (마이크로초 -> 초, 0부터 시작)
relative_time = (msg.timestamp_us / 1e6) - start_timestamp
decoded_data[msg_name]['timestamps'].append(relative_time)
# 각 시그널 값 저장 (원본 숫자 값 유지)
for signal in msg_def.signals:
signal_name = signal.name
raw_value = decoded[signal_name]
# NamedSignalValue 객체인 경우 숫자 값으로 변환
# (value table이 있는 시그널은 NamedSignalValue로 반환됨)
if hasattr(raw_value, 'value'):
# NamedSignalValue 객체 - 숫자 값 추출
numeric_value = raw_value.value
else:
# 일반 숫자 값
numeric_value = raw_value
# 항상 원본 숫자 값을 저장
decoded_data[msg_name][signal_name].append(numeric_value)
decode_count += 1
except KeyError:
# DBC에 정의되지 않은 CAN ID는 무시
continue
except Exception as e:
# print(f"⚠️ 디코딩 오류 (ID: 0x{msg.can_id:X}): {e}")
continue
print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)}")
# 빈 메시지 제거
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
return decoded_data
def create_mdf(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
metadata: Optional[Dict] = None,
compression: int = 2):
"""
디코딩된 데이터로부터 MDF 파일 생성
Args:
decoded_data: decode_messages()의 반환값
output_path: 출력 MDF 파일 경로
metadata: MDF 파일 메타데이터 (선택)
compression: 압축 레벨 (0=없음, 1=deflate, 2=transposed, 기본값=2)
"""
signals = []
skipped_signals = []
value_table_signals = [] # value table이 있는 시그널 추적
for msg_name, msg_data in decoded_data.items():
timestamps = np.array(msg_data['timestamps'], dtype=np.float64)
# 메시지 정의 가져오기
try:
msg_def = self.db.get_message_by_name(msg_name)
except:
continue
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
# 시그널 정의 가져오기
signal_def = None
for sig in msg_def.signals:
if sig.name == signal_name:
signal_def = sig
break
if signal_def is None:
continue
# 값의 데이터 타입 확인
if len(values) == 0:
continue
# numpy array로 변환 시도
try:
# 먼저 리스트로 변환하여 타입 확인
test_array = np.array(values)
# object dtype이면 강제로 float로 변환 시도
if test_array.dtype == np.object_:
# 각 값을 float로 변환
float_values = []
conversion_failed = False
for v in values:
try:
float_values.append(float(v))
except (ValueError, TypeError):
conversion_failed = True
break
if conversion_failed:
skipped_signals.append(f"{msg_name}.{signal_name}")
continue
samples = np.array(float_values, dtype=np.float64)
else:
# 정상적인 숫자 타입이면 그대로 사용
samples = np.array(values, dtype=np.float64)
except Exception as e:
skipped_signals.append(f"{msg_name}.{signal_name}")
print(f"⚠️ 시그널 변환 실패 ({msg_name}.{signal_name}): {e}")
continue
# Signal 객체 생성 시 conversion 적용
comment = signal_def.comment or ""
conversion = None
# Value Table이 있는 경우 conversion 적용
# asammdf 공식 예제 형식을 정확히 따름
if signal_def.choices:
value_table_signals.append(f"{msg_name}.{signal_name}")
# value-to-text conversion 생성
# 형식: {'val_0': 숫자, 'text_0': bytes, ..., 'default': bytes}
conversion = {}
# choices를 정렬하여 순서대로 처리
for idx, (raw_val, text_val) in enumerate(sorted(signal_def.choices.items())):
conversion[f'val_{idx}'] = int(raw_val)
# 텍스트는 반드시 bytes로 변환
if isinstance(text_val, str):
conversion[f'text_{idx}'] = text_val.encode('utf-8')
elif isinstance(text_val, bytes):
conversion[f'text_{idx}'] = text_val
else:
conversion[f'text_{idx}'] = str(text_val).encode('utf-8')
# 매칭되지 않는 값에 대한 기본값 (bytes)
conversion['default'] = b''
# Value table 정보를 주석에도 추가
value_table_str = "; ".join([f"{k}={v}" for k, v in sorted(signal_def.choices.items())])
if comment:
comment += f" [Values: {value_table_str}]"
else:
comment = f"Values: {value_table_str}"
# Signal 객체 생성
signal = Signal(
samples=samples,
timestamps=timestamps,
name=f"{msg_name}.{signal_name}",
unit=signal_def.unit or "",
comment=comment,
conversion=conversion # conversion 추가
)
signals.append(signal)
# MDF 파일 생성
if metadata is None:
metadata = {}
default_metadata = {
'author': 'ESP32 CAN Logger',
'department': '',
'project': 'CAN Data Logging',
'subject': 'CAN Bus Data',
}
default_metadata.update(metadata)
mdf = MDF(version='4.10')
# 메타데이터 설정
for key, value in default_metadata.items():
if hasattr(mdf.header, key):
setattr(mdf.header, key, value)
# 시그널 추가
if signals:
mdf.append(signals)
# MDF 파일 저장 (압축 적용)
if compression > 0:
print(f" 압축 중... (레벨 {compression})")
# 압축 옵션
# 0 = 압축 없음
# 1 = deflate (표준 압축)
# 2 = transposed deflate (최고 압축률, 기본값)
mdf.save(
output_path,
overwrite=True,
compression=compression,
)
# 파일 크기 확인
import os
compressed_size = os.path.getsize(output_path) / (1024 * 1024) # MB
compression_names = {0: "압축 없음", 1: "deflate", 2: "transposed"}
compression_name = compression_names.get(compression, f"레벨 {compression}")
print(f"✓ MDF 파일 생성 완료: {output_path}")
print(f" - 시그널 수: {len(signals)}")
print(f" - Value Table 적용 시그널: {len(value_table_signals)}")
print(f" - 파일 크기: {compressed_size:.2f} MB ({compression_name})")
if skipped_signals:
print(f" ⚠️ 건너뛴 시그널: {len(skipped_signals)}")
if len(skipped_signals) <= 10:
for sig in skipped_signals:
print(f" - {sig}")
else:
for sig in skipped_signals[:5]:
print(f" - {sig}")
print(f" ... 외 {len(skipped_signals) - 5}")
print(f" 💡 Value Table이 적용된 시그널은 MDF 뷰어에서 텍스트로 표시됩니다.")
def convert(self, bin_path: str, output_path: str,
metadata: Optional[Dict] = None,
compression: int = 2):
"""
bin 파일을 MDF로 변환 (전체 프로세스)
Args:
bin_path: 입력 bin 파일 경로
output_path: 출력 MDF 파일 경로
metadata: MDF 메타데이터 (선택)
compression: 압축 레벨 (0=없음, 1=deflate, 2=transposed, 기본값=2)
"""
print(f"\n{'='*60}")
print(f"ESP32 CAN bin → MDF 변환")
print(f"{'='*60}")
# 1. bin 파일 읽기
messages = self.read_bin_file(bin_path)
if not messages:
print("✗ 메시지가 없습니다.")
return
# 시간 범위 출력
start_time = messages[0].timestamp_us / 1e6
end_time = messages[-1].timestamp_us / 1e6
duration = end_time - start_time
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}")
print(f" - 기간: {duration:.2f}")
# 2. 메시지 디코딩
decoded_data = self.decode_messages(messages)
if not decoded_data:
print("✗ 디코딩된 데이터가 없습니다.")
return
# 3. MDF 생성
if metadata is None:
metadata = {
'comment': f"Converted from {Path(bin_path).name}",
}
self.create_mdf(decoded_data, output_path, metadata, compression)
print(f"{'='*60}")
print(f"✓ 변환 완료!")
print(f"{'='*60}\n")
def main():
"""사용 예제"""
import argparse
parser = argparse.ArgumentParser(
description='ESP32 CAN Logger bin 파일을 MDF로 변환 (Value Table 지원)',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
사용 예제:
python can_bin_to_mdf_final.py -d example.dbc -i canlog_001.bin -o output.mf4
python can_bin_to_mdf_final.py -d vehicle.dbc -i data/*.bin -o output/
"""
)
parser.add_argument('-d', '--dbc', required=True,
help='CAN Database (DBC) 파일 경로')
parser.add_argument('-i', '--input', required=True, nargs='+',
help='입력 bin 파일(들) 경로')
parser.add_argument('-o', '--output', required=True,
help='출력 MDF 파일 경로 또는 디렉토리')
parser.add_argument('-m', '--metadata',
help='MDF 메타데이터 (JSON 형식)')
parser.add_argument('-c', '--compression', type=int, default=2,
choices=[0, 1, 2],
help='압축 레벨: 0=없음, 1=deflate, 2=transposed (기본값=2, 최고 압축)')
args = parser.parse_args()
# 변환기 초기화
converter = CANBinToMDF(args.dbc)
# 메타데이터 파싱
metadata = None
if args.metadata:
import json
metadata = json.loads(args.metadata)
# 출력 경로 처리
output_path = Path(args.output)
# 여러 파일 변환
for input_file in args.input:
input_path = Path(input_file)
if not input_path.exists():
print(f"✗ 파일을 찾을 수 없음: {input_file}")
continue
# 출력 파일 경로 결정
if output_path.is_dir() or str(output_path) == '/':
output_file = input_path.parent / f"{input_path.stem}.mf4"
else:
output_file = output_path
# 변환 실행
try:
converter.convert(str(input_path), str(output_file), metadata, args.compression)
except Exception as e:
print(f"✗ 변환 실패 ({input_file}): {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,134 @@
@echo off
REM ======================================
REM CAN BIN to MDF Batch Converter (Advanced)
REM ======================================
REM Find DBC file
set DBC_FILE=
for %%f in (*.dbc) do (
set DBC_FILE=%%f
goto found_dbc
)
:found_dbc
if "%DBC_FILE%"=="" (
echo [ERROR] DBC file not found!
pause
exit /b 1
)
echo DBC File: %DBC_FILE%
echo.
REM Count BIN files
set COUNT=0
for %%f in (*.bin) do set /a COUNT+=1
if %COUNT%==0 (
echo [ERROR] No BIN files to convert!
pause
exit /b 1
)
echo Total files: %COUNT%
echo.
REM Select compression level
echo Select compression level:
echo ======================================
echo [0] No compression (Fast read, Large file)
echo - Read speed: ***** (Very fast)
echo - File size: 171MB
echo - Use case: Immediate analysis
echo.
echo [1] deflate compression (Balanced)
echo - Read speed: *** (Normal)
echo - File size: 3.6MB (98%% compression)
echo - Use case: General use
echo.
echo [2] transposed compression (Maximum)
echo - Read speed: * (Slow)
echo - File size: 1.2MB (99%% compression)
echo - Use case: Long-term archiving
echo ======================================
echo.
set /p COMPRESSION="Select (0/1/2) [default=0]: "
if "%COMPRESSION%"=="" set COMPRESSION=0
if "%COMPRESSION%"=="0" (
echo Selected: No compression - Fast read
) else if "%COMPRESSION%"=="1" (
echo Selected: deflate - Balanced
) else if "%COMPRESSION%"=="2" (
echo Selected: transposed - Maximum compression
) else (
echo [ERROR] Invalid selection. Please select 0, 1, or 2.
pause
exit /b 1
)
echo.
echo Starting conversion...
echo ======================================
echo.
REM Record start time
set START_TIME=%TIME%
REM Convert each BIN file
setlocal enabledelayedexpansion
set PROCESSED=0
set SUCCESS=0
set FAILED=0
for %%f in (*.bin) do (
set /a PROCESSED+=1
echo [!PROCESSED!/%COUNT%] Converting: %%f
python can_bin_to_mdf.py -d "%DBC_FILE%" -i "%%f" -o "./" -c %COMPRESSION%
if errorlevel 1 (
echo [FAIL] %%f
set /a FAILED+=1
) else (
echo [OK] %%f
set /a SUCCESS+=1
)
echo.
)
REM Record end time
set END_TIME=%TIME%
echo ======================================
echo Conversion Complete!
echo ======================================
echo Total: %COUNT%
echo Success: %SUCCESS%
echo Failed: %FAILED%
echo Start: %START_TIME%
echo End: %END_TIME%
echo ======================================
echo.
REM List generated MDF files with size
echo Generated MDF files:
for %%f in (*.mf4) do (
set SIZE=%%~zf
set /a SIZE_MB=!SIZE! / 1048576
echo - %%f (!SIZE_MB! MB^)
)
echo.
REM Calculate total MDF file size
set TOTAL_SIZE=0
for %%f in (*.mf4) do (
set SIZE=%%~zf
set /a TOTAL_SIZE+=!SIZE!
)
set /a TOTAL_MB=!TOTAL_SIZE! / 1048576
echo Total MDF size: !TOTAL_MB! MB
echo.
pause

View File

@@ -0,0 +1,156 @@
@echo off
REM ======================================
REM CAN BIN to CSV Batch Converter (Advanced)
REM Multiple format options
REM ======================================
REM Find DBC file
set DBC_FILE=
for %%f in (*.dbc) do (
set DBC_FILE=%%f
goto found_dbc
)
:found_dbc
if "%DBC_FILE%"=="" (
echo [ERROR] DBC file not found!
pause
exit /b 1
)
echo ======================================
echo CAN BIN to CSV Converter (Advanced)
echo ======================================
echo.
echo DBC File: %DBC_FILE%
echo.
REM Count BIN files
set COUNT=0
for %%f in (*.bin) do set /a COUNT+=1
if %COUNT%==0 (
echo [ERROR] No BIN files to convert!
pause
exit /b 1
)
echo Total files: %COUNT%
echo.
REM Select CSV format
echo Select CSV format:
echo ======================================
echo [1] Wide format (Excel-friendly) - RECOMMENDED
echo - One row per timestamp
echo - All signals as columns
echo - Easy to use in Excel
echo - Output: filename_wide.csv
echo.
echo [2] Long format (Database-friendly)
echo - One row per signal value
echo - Columns: Time, Signal, Value, Unit
echo - Good for databases
echo - Output: filename_long.csv
echo.
echo [3] Message format (Separate files per message)
echo - One CSV file per CAN message
echo - Organized by message
echo - Output: csv_files/MessageName_*.csv
echo.
echo [4] Raw format (No decoding)
echo - Raw CAN frames
echo - No signal decoding
echo - Output: filename_raw.csv
echo ======================================
echo.
set /p FORMAT="Select (1/2/3/4) [default=1]: "
if "%FORMAT%"=="" set FORMAT=1
if "%FORMAT%"=="1" (
set FORMAT_NAME=wide
set OUTPUT_DIR=./
echo Selected: Wide format - Excel-friendly
) else if "%FORMAT%"=="2" (
set FORMAT_NAME=long
set OUTPUT_DIR=./
echo Selected: Long format - Database-friendly
) else if "%FORMAT%"=="3" (
set FORMAT_NAME=message
set OUTPUT_DIR=csv_files/
echo Selected: Message format - Separate files
if not exist csv_files mkdir csv_files
) else if "%FORMAT%"=="4" (
set FORMAT_NAME=raw
set OUTPUT_DIR=./
echo Selected: Raw format - No decoding
) else (
echo [ERROR] Invalid selection. Please select 1, 2, 3, or 4.
pause
exit /b 1
)
echo.
echo Starting conversion...
echo ======================================
echo.
REM Record start time
set START_TIME=%TIME%
REM Convert each BIN file
setlocal enabledelayedexpansion
set PROCESSED=0
set SUCCESS=0
set FAILED=0
for %%f in (*.bin) do (
set /a PROCESSED+=1
echo [!PROCESSED!/%COUNT%] Converting: %%f
python can_bin_to_csv.py -d "%DBC_FILE%" -i "%%f" -o "%OUTPUT_DIR%" -f %FORMAT_NAME%
if errorlevel 1 (
echo [FAIL] %%f
set /a FAILED+=1
) else (
echo [OK] %%f
set /a SUCCESS+=1
)
echo.
)
REM Record end time
set END_TIME=%TIME%
echo ======================================
echo Conversion Complete!
echo ======================================
echo Total: %COUNT%
echo Success: %SUCCESS%
echo Failed: %FAILED%
echo Start: %START_TIME%
echo End: %END_TIME%
echo ======================================
echo.
REM List generated CSV files
if "%FORMAT%"=="3" (
echo Generated CSV files in csv_files folder:
dir /b csv_files\*.csv 2>nul | find /c /v "" > temp_count.txt
set /p CSV_COUNT=<temp_count.txt
del temp_count.txt
echo Total CSV files: !CSV_COUNT!
echo Location: csv_files\
) else (
echo Generated CSV files:
for %%f in (*_%FORMAT_NAME%.csv) do (
set SIZE=%%~zf
set /a SIZE_KB=!SIZE! / 1024
echo - %%f (!SIZE_KB! KB^)
)
)
echo.
pause

11
csv변환법.txt Normal file
View File

@@ -0,0 +1,11 @@
# Wide 형식 (Excel용, 권장)
python can_bin_to_csv.py -d CAN_DB_BluePlug_231207a.dbc -i *.bin -o ./ -f wide
# Long 형식 (데이터베이스용)
python can_bin_to_csv.py -d CAN_DB_BluePlug_231207a.dbc -i *.bin -o ./ -f long
# Message 형식 (메시지별 파일)
python can_bin_to_csv.py -d CAN_DB_BluePlug_231207a.dbc -i *.bin -o csv_files/ -f message
# Raw 형식 (디코딩 없음)
python can_bin_to_csv.py -d CAN_DB_BluePlug_231207a.dbc -i *.bin -o ./ -f raw