Files
esp32s3_canlogger_mcp2515/can_converter_gui.py
2025-12-18 06:47:46 +00:00

1192 lines
43 KiB
Python

#!/usr/bin/env python3
"""
ESP32 CAN Logger GUI Converter (All-in-One)
bin 파일을 CSV 또는 MDF 형식으로 변환하는 GUI 프로그램
"""
import sys
import os
import struct
import csv
from pathlib import Path
from datetime import datetime
import threading
from typing import Dict, List, Optional
import numpy as np
from cantools import database as candb
from asammdf import MDF, Signal
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QPushButton, QLabel, QFileDialog, QComboBox, QTextEdit,
QProgressBar, QGroupBox, QRadioButton, QButtonGroup,
QCheckBox, QMessageBox, QTabWidget, QListWidget, QSplitter
)
from PyQt5.QtCore import Qt, pyqtSignal, QObject, QThread
from PyQt5.QtGui import QFont, QTextCursor, QIcon
# ============================================================================
# CAN Message 구조체
# ============================================================================
class CANMessage:
"""ESP32 CAN Logger의 CAN 메시지 구조체"""
STRUCT_FORMAT = '<QIB8s' # little-endian: uint64, uint32, uint8, 8*uint8
STRUCT_SIZE = struct.calcsize(STRUCT_FORMAT)
def __init__(self, timestamp_us: int, can_id: int, dlc: int, data: bytes):
self.timestamp_us = timestamp_us
self.can_id = can_id
self.dlc = dlc
self.data = data[:dlc]
@classmethod
def from_bytes(cls, data: bytes) -> 'CANMessage':
"""바이너리 데이터에서 CANMessage 파싱"""
timestamp_us, can_id, dlc, msg_data = struct.unpack(cls.STRUCT_FORMAT, data)
return cls(timestamp_us, can_id, dlc, msg_data)
def __repr__(self):
return f"CANMessage(id=0x{self.can_id:X}, dlc={self.dlc}, ts={self.timestamp_us})"
# ============================================================================
# CSV 변환기
# ============================================================================
class CANBinToCSV:
"""CAN bin 파일을 CSV로 변환하는 클래스"""
def __init__(self, dbc_path: str):
"""
Args:
dbc_path: CAN Database (DBC) 파일 경로
"""
self.db = candb.load_file(dbc_path)
print(f"✓ DBC 파일 로드: {dbc_path}")
print(f" - 메시지 수: {len(self.db.messages)}")
print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}")
def read_bin_file(self, bin_path: str) -> List[CANMessage]:
"""ESP32 CAN Logger의 bin 파일 읽기"""
messages = []
with open(bin_path, 'rb') as f:
data = f.read()
offset = 0
while offset + CANMessage.STRUCT_SIZE <= len(data):
msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE]
try:
msg = CANMessage.from_bytes(msg_bytes)
messages.append(msg)
except Exception as e:
print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}")
offset += CANMessage.STRUCT_SIZE
print(f"✓ bin 파일 읽기 완료: {bin_path}")
print(f" - 총 메시지: {len(messages)}")
return messages
def decode_messages(self, messages: List[CANMessage],
apply_value_table: bool = True) -> Dict[str, Dict[str, List]]:
"""CAN 메시지를 DBC를 사용하여 디코딩"""
decoded_data = {}
# 각 메시지별로 초기화
for msg_def in self.db.messages:
msg_name = msg_def.name
decoded_data[msg_name] = {'timestamps': []}
for signal in msg_def.signals:
decoded_data[msg_name][signal.name] = []
decode_count = 0
for msg in messages:
try:
msg_def = self.db.get_message_by_frame_id(msg.can_id)
msg_name = msg_def.name
decoded = msg_def.decode(msg.data)
# 타임스탬프 저장 (마이크로초 -> 초)
decoded_data[msg_name]['timestamps'].append(msg.timestamp_us / 1e6)
# 각 시그널 값 저장
for signal in msg_def.signals:
signal_name = signal.name
raw_value = decoded[signal_name]
# Value Table 적용
if apply_value_table and signal.choices:
try:
int_value = int(raw_value)
if int_value in signal.choices:
value = signal.choices[int_value]
else:
value = raw_value
except (ValueError, TypeError):
value = raw_value
else:
value = raw_value
decoded_data[msg_name][signal_name].append(value)
decode_count += 1
except KeyError:
continue
except Exception as e:
continue
print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)}")
# 빈 메시지 제거
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
return decoded_data
def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
datetime_format: bool = True):
"""Wide 형식 CSV 생성 (각 시그널이 별도 컬럼)"""
# 모든 고유한 타임스탬프 수집 및 정렬
all_timestamps = set()
for msg_data in decoded_data.values():
all_timestamps.update(msg_data['timestamps'])
all_timestamps = sorted(all_timestamps)
# 각 타임스탬프에 대한 시그널 값 매핑
signal_columns = []
signal_data = {}
for msg_name, msg_data in decoded_data.items():
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
col_name = f"{msg_name}.{signal_name}"
signal_columns.append(col_name)
# 타임스탬프-값 매핑 생성
timestamp_to_value = {}
for ts, val in zip(msg_data['timestamps'], values):
timestamp_to_value[ts] = val
signal_data[col_name] = timestamp_to_value
# CSV 작성
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime'] + signal_columns
else:
header = ['Timestamp'] + signal_columns
writer.writerow(header)
# 데이터 작성
for ts in all_timestamps:
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt]
else:
row = [f"{ts:.6f}"]
for col_name in signal_columns:
value = signal_data[col_name].get(ts, '')
row.append(value)
writer.writerow(row)
print(f"✓ Wide CSV 생성 완료: {output_path}")
print(f" - 행 수: {len(all_timestamps)}")
print(f" - 시그널 수: {len(signal_columns)}")
def to_long_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
datetime_format: bool = True):
"""Long 형식 CSV 생성 (각 시그널 값이 별도 행)"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime', 'Message', 'Signal', 'Value']
else:
header = ['Timestamp', 'Message', 'Signal', 'Value']
writer.writerow(header)
# 데이터 작성
row_count = 0
for msg_name, msg_data in decoded_data.items():
timestamps = msg_data['timestamps']
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
for ts, val in zip(timestamps, values):
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt, msg_name, signal_name, val]
else:
row = [f"{ts:.6f}", msg_name, signal_name, val]
writer.writerow(row)
row_count += 1
print(f"✓ Long CSV 생성 완료: {output_path}")
print(f" - 행 수: {row_count}")
def to_message_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_dir: str,
datetime_format: bool = True):
"""메시지별 CSV 생성 (각 CAN 메시지가 별도 CSV 파일)"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
file_count = 0
for msg_name, msg_data in decoded_data.items():
# 파일명에 사용할 수 없는 문자 제거
safe_name = msg_name.replace('/', '_').replace('\\', '_')
csv_path = output_path / f"{safe_name}.csv"
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
signal_names = [k for k in msg_data.keys() if k != 'timestamps']
if datetime_format:
header = ['Timestamp', 'DateTime'] + signal_names
else:
header = ['Timestamp'] + signal_names
writer.writerow(header)
# 데이터 작성
timestamps = msg_data['timestamps']
for i, ts in enumerate(timestamps):
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt]
else:
row = [f"{ts:.6f}"]
for signal_name in signal_names:
row.append(msg_data[signal_name][i])
writer.writerow(row)
file_count += 1
print(f"✓ 메시지별 CSV 생성 완료: {output_dir}")
print(f" - 파일 수: {file_count}")
def to_raw_csv(self, messages: List[CANMessage], output_path: str):
"""원본 CAN 메시지 CSV 생성 (디코딩 없이 raw 데이터)"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더
writer.writerow(['Timestamp', 'DateTime', 'CAN_ID', 'DLC', 'Data'])
# 데이터
for msg in messages:
ts = msg.timestamp_us / 1e6
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
can_id = f"0x{msg.can_id:03X}"
data_hex = ' '.join([f"{b:02X}" for b in msg.data])
writer.writerow([f"{ts:.6f}", dt, can_id, msg.dlc, data_hex])
print(f"✓ Raw CSV 생성 완료: {output_path}")
print(f" - 행 수: {len(messages)}")
def convert(self, bin_path: str, output_path: str,
format: str = 'wide',
apply_value_table: bool = True,
datetime_format: bool = True):
"""bin 파일을 CSV로 변환 (전체 프로세스)"""
print(f"\n{'='*60}")
print(f"ESP32 CAN bin → CSV 변환")
print(f"{'='*60}")
# 1. bin 파일 읽기
messages = self.read_bin_file(bin_path)
if not messages:
print("✗ 메시지가 없습니다.")
return
# 시간 범위 출력
start_time = messages[0].timestamp_us / 1e6
end_time = messages[-1].timestamp_us / 1e6
duration = end_time - start_time
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}")
print(f" - 기간: {duration:.2f}")
# Raw 형식은 디코딩 없이 바로 변환
if format == 'raw':
self.to_raw_csv(messages, output_path)
else:
# 2. 메시지 디코딩
decoded_data = self.decode_messages(messages, apply_value_table)
if not decoded_data:
print("✗ 디코딩된 데이터가 없습니다.")
return
# 3. CSV 생성
if format == 'wide':
self.to_wide_csv(decoded_data, output_path, datetime_format)
elif format == 'long':
self.to_long_csv(decoded_data, output_path, datetime_format)
elif format == 'message':
self.to_message_csv(decoded_data, output_path, datetime_format)
else:
print(f"✗ 알 수 없는 형식: {format}")
return
print(f"{'='*60}")
print(f"✓ 변환 완료!")
print(f"{'='*60}\n")
# ============================================================================
# MDF 변환기
# ============================================================================
class CANBinToMDF:
"""CAN bin 파일을 MDF4로 변환하는 클래스"""
def __init__(self, dbc_path: str):
"""
Args:
dbc_path: CAN Database (DBC) 파일 경로
"""
self.db = candb.load_file(dbc_path)
print(f"✓ DBC 파일 로드: {dbc_path}")
print(f" - 메시지 수: {len(self.db.messages)}")
print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}")
def read_bin_file(self, bin_path: str) -> List[CANMessage]:
"""ESP32 CAN Logger의 bin 파일 읽기"""
messages = []
with open(bin_path, 'rb') as f:
data = f.read()
offset = 0
while offset + CANMessage.STRUCT_SIZE <= len(data):
msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE]
try:
msg = CANMessage.from_bytes(msg_bytes)
messages.append(msg)
except Exception as e:
print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}")
offset += CANMessage.STRUCT_SIZE
print(f"✓ bin 파일 읽기 완료: {bin_path}")
print(f" - 총 메시지: {len(messages)}")
return messages
def decode_messages(self, messages: List[CANMessage]) -> Dict[str, Dict[str, List]]:
"""CAN 메시지를 DBC를 사용하여 디코딩"""
decoded_data = {}
# 각 메시지별로 초기화
for msg_def in self.db.messages:
msg_name = msg_def.name
decoded_data[msg_name] = {'timestamps': []}
for signal in msg_def.signals:
decoded_data[msg_name][signal.name] = []
decode_count = 0
for msg in messages:
try:
msg_def = self.db.get_message_by_frame_id(msg.can_id)
msg_name = msg_def.name
decoded = msg_def.decode(msg.data)
# 타임스탬프 저장 (마이크로초 -> 초)
decoded_data[msg_name]['timestamps'].append(msg.timestamp_us / 1e6)
# 각 시그널 값 저장
for signal in msg_def.signals:
signal_name = signal.name
raw_value = decoded[signal_name]
decoded_data[msg_name][signal_name].append(raw_value)
decode_count += 1
except KeyError:
continue
except Exception as e:
continue
print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)}")
# 빈 메시지 제거
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
return decoded_data
def to_mdf(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
compression: int = 0):
"""MDF4 파일 생성"""
mdf = MDF(version='4.10')
signal_count = 0
for msg_name, msg_data in decoded_data.items():
timestamps = np.array(msg_data['timestamps'])
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
# DBC에서 시그널 정보 가져오기
try:
msg_def = self.db.get_message_by_name(msg_name)
signal_def = None
for sig in msg_def.signals:
if sig.name == signal_name:
signal_def = sig
break
if signal_def is None:
continue
# numpy 배열로 변환
samples = np.array(values)
# Signal 객체 생성
signal = Signal(
samples=samples,
timestamps=timestamps,
name=f"{msg_name}.{signal_name}",
unit=signal_def.unit or '',
comment=signal_def.comment or ''
)
mdf.append(signal)
signal_count += 1
except Exception as e:
print(f"⚠️ 시그널 추가 실패 ({msg_name}.{signal_name}): {e}")
continue
# 파일 저장
compression_map = {
0: 0, # no compression
1: 1, # deflate
2: 2 # transposed + deflate
}
mdf.save(
output_path,
compression=compression_map.get(compression, 0),
overwrite=True
)
print(f"✓ MDF4 파일 생성 완료: {output_path}")
print(f" - 시그널 수: {signal_count}")
# 파일 크기 출력
file_size = Path(output_path).stat().st_size
file_size_mb = file_size / (1024 * 1024)
print(f" - 파일 크기: {file_size_mb:.2f} MB")
def convert(self, bin_path: str, output_path: str, compression: int = 0):
"""bin 파일을 MDF4로 변환 (전체 프로세스)"""
print(f"\n{'='*60}")
print(f"ESP32 CAN bin → MDF4 변환")
print(f"{'='*60}")
# 1. bin 파일 읽기
messages = self.read_bin_file(bin_path)
if not messages:
print("✗ 메시지가 없습니다.")
return
# 시간 범위 출력
start_time = messages[0].timestamp_us / 1e6
end_time = messages[-1].timestamp_us / 1e6
duration = end_time - start_time
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}")
print(f" - 기간: {duration:.2f}")
# 2. 메시지 디코딩
decoded_data = self.decode_messages(messages)
if not decoded_data:
print("✗ 디코딩된 데이터가 없습니다.")
return
# 3. MDF 파일 생성
self.to_mdf(decoded_data, output_path, compression)
print(f"{'='*60}")
print(f"✓ 변환 완료!")
print(f"{'='*60}\n")
# ============================================================================
# GUI Worker Thread
# ============================================================================
class WorkerSignals(QObject):
"""워커 스레드 시그널"""
progress = pyqtSignal(int, int, str) # current, total, message
log = pyqtSignal(str) # log message
finished = pyqtSignal(int, int) # success, failed
error = pyqtSignal(str) # error message
class ConversionWorker(QThread):
"""변환 작업을 수행하는 워커 스레드"""
def __init__(self, dbc_path: str, bin_files: List[str],
output_dir: str, format_type: str,
conversion_type: str, options: dict):
super().__init__()
self.dbc_path = dbc_path
self.bin_files = bin_files
self.output_dir = output_dir
self.format_type = format_type
self.conversion_type = conversion_type
self.options = options
self.signals = WorkerSignals()
self._is_running = True
def stop(self):
"""작업 중단"""
self._is_running = False
def run(self):
"""변환 작업 실행"""
success_count = 0
failed_count = 0
total = len(self.bin_files)
try:
# 변환기 초기화
if self.conversion_type == 'csv':
converter = CANBinToCSV(self.dbc_path)
else: # mdf
converter = CANBinToMDF(self.dbc_path)
self.signals.log.emit(f"✓ DBC 파일 로드: {self.dbc_path}")
# 각 파일 변환
for idx, bin_file in enumerate(self.bin_files):
if not self._is_running:
self.signals.log.emit("❌ 사용자에 의해 중단됨")
break
current = idx + 1
self.signals.progress.emit(current, total, f"Converting {Path(bin_file).name}")
self.signals.log.emit(f"\n[{current}/{total}] {Path(bin_file).name}")
try:
# 출력 파일 경로 결정
bin_path = Path(bin_file)
if self.conversion_type == 'csv':
if self.format_type == 'message':
output_path = Path(self.output_dir) / bin_path.stem
else:
output_path = Path(self.output_dir) / f"{bin_path.stem}.csv"
converter.convert(
str(bin_path),
str(output_path),
format=self.format_type,
apply_value_table=self.options.get('apply_value_table', True),
datetime_format=self.options.get('datetime_format', True)
)
else: # mdf
output_path = Path(self.output_dir) / f"{bin_path.stem}.mf4"
converter.convert(
str(bin_path),
str(output_path),
compression=self.options.get('compression', 0)
)
success_count += 1
self.signals.log.emit(f" ✓ 완료: {output_path.name}")
except Exception as e:
failed_count += 1
self.signals.log.emit(f" ✗ 실패: {str(e)}")
self.signals.finished.emit(success_count, failed_count)
except Exception as e:
self.signals.error.emit(str(e))
# ============================================================================
# GUI Main Window
# ============================================================================
class CANConverterGUI(QMainWindow):
"""CAN Bin to CSV/MDF 변환 GUI"""
def __init__(self):
super().__init__()
self.dbc_path = None
self.bin_files = []
self.output_dir = None
self.worker = None
self.init_ui()
def init_ui(self):
"""UI 초기화"""
self.setWindowTitle("ESP32 CAN Logger Converter v1.0")
self.setGeometry(100, 100, 1000, 700)
# 메인 위젯
main_widget = QWidget()
self.setCentralWidget(main_widget)
main_layout = QVBoxLayout(main_widget)
# 탭 위젯
tab_widget = QTabWidget()
main_layout.addWidget(tab_widget)
# CSV 변환 탭
csv_tab = self.create_csv_tab()
tab_widget.addTab(csv_tab, "CSV 변환")
# MDF 변환 탭
mdf_tab = self.create_mdf_tab()
tab_widget.addTab(mdf_tab, "MDF 변환")
# 하단 로그/진행 영역
bottom_widget = self.create_bottom_widget()
main_layout.addWidget(bottom_widget)
# 스타일 적용
self.apply_style()
def create_csv_tab(self) -> QWidget:
"""CSV 변환 탭 생성"""
tab = QWidget()
layout = QVBoxLayout(tab)
# 파일 선택 영역
file_group = self.create_file_selection_group()
layout.addWidget(file_group)
# CSV 형식 선택
format_group = QGroupBox("CSV 형식 선택")
format_layout = QVBoxLayout()
self.csv_format_group = QButtonGroup()
formats = [
("wide", "Wide Format (Excel-friendly)",
"모든 시그널이 컬럼으로 나열 - Excel 분석에 최적"),
("long", "Long Format (Database-friendly)",
"각 시그널 값이 별도 행 - 데이터베이스 저장에 적합"),
("message", "Message Format (Separate files)",
"메시지별 개별 CSV 파일 생성 - 메시지별 분석"),
("raw", "Raw Format (No decoding)",
"원본 CAN 프레임 - 디코딩 없음")
]
self.csv_format_radios = {}
for idx, (value, title, desc) in enumerate(formats):
radio = QRadioButton(title)
if idx == 0:
radio.setChecked(True)
self.csv_format_group.addButton(radio, idx)
self.csv_format_radios[value] = radio
format_layout.addWidget(radio)
desc_label = QLabel(f"{desc}")
desc_label.setStyleSheet("color: gray; font-size: 10px;")
format_layout.addWidget(desc_label)
format_group.setLayout(format_layout)
layout.addWidget(format_group)
# CSV 옵션
option_group = QGroupBox("옵션")
option_layout = QVBoxLayout()
self.csv_value_table_check = QCheckBox("Value Table 적용 (숫자 → 텍스트 변환)")
self.csv_value_table_check.setChecked(True)
option_layout.addWidget(self.csv_value_table_check)
self.csv_datetime_check = QCheckBox("날짜/시간 컬럼 포함")
self.csv_datetime_check.setChecked(True)
option_layout.addWidget(self.csv_datetime_check)
option_group.setLayout(option_layout)
layout.addWidget(option_group)
# 변환 버튼
self.csv_convert_btn = QPushButton("CSV 변환 시작")
self.csv_convert_btn.setMinimumHeight(50)
self.csv_convert_btn.clicked.connect(lambda: self.start_conversion('csv'))
layout.addWidget(self.csv_convert_btn)
layout.addStretch()
return tab
def create_mdf_tab(self) -> QWidget:
"""MDF 변환 탭 생성"""
tab = QWidget()
layout = QVBoxLayout(tab)
# 파일 선택 영역 (공유)
info_label = QLabel("※ 파일 선택은 상단 '파일 선택' 영역을 사용하세요")
info_label.setStyleSheet("color: blue; font-weight: bold;")
layout.addWidget(info_label)
# 압축 방식 선택
compression_group = QGroupBox("압축 방식 선택")
compression_layout = QVBoxLayout()
self.mdf_compression_group = QButtonGroup()
compressions = [
(0, "압축 없음 (No compression)",
"읽기 속도: ★★★★★ | 파일 크기: 171MB | 즉시 분석용"),
(1, "Deflate 압축 (Balanced)",
"읽기 속도: ★★★☆☆ | 파일 크기: 3.6MB (98% 압축) | 일반 사용"),
(2, "Transposed 압축 (Maximum)",
"읽기 속도: ★☆☆☆☆ | 파일 크기: 1.2MB (99% 압축) | 장기 보관용")
]
self.mdf_compression_radios = {}
for idx, (value, title, desc) in enumerate(compressions):
radio = QRadioButton(title)
if idx == 0:
radio.setChecked(True)
self.mdf_compression_group.addButton(radio, value)
self.mdf_compression_radios[value] = radio
compression_layout.addWidget(radio)
desc_label = QLabel(f"{desc}")
desc_label.setStyleSheet("color: gray; font-size: 10px;")
compression_layout.addWidget(desc_label)
compression_group.setLayout(compression_layout)
layout.addWidget(compression_group)
# MDF 정보
info_group = QGroupBox("MDF4 형식 정보")
info_layout = QVBoxLayout()
info_text = QLabel(
"• MDF4는 자동차 업계 표준 측정 데이터 형식입니다\n"
"• CANoe, CANalyzer, Vector Tools에서 직접 열 수 있습니다\n"
"• 압축률이 매우 높아 저장 공간을 절약할 수 있습니다\n"
"• 모든 시그널 정보, 단위, 설명이 포함됩니다"
)
info_layout.addWidget(info_text)
info_group.setLayout(info_layout)
layout.addWidget(info_group)
# 변환 버튼
self.mdf_convert_btn = QPushButton("MDF 변환 시작")
self.mdf_convert_btn.setMinimumHeight(50)
self.mdf_convert_btn.clicked.connect(lambda: self.start_conversion('mdf'))
layout.addWidget(self.mdf_convert_btn)
layout.addStretch()
return tab
def create_file_selection_group(self) -> QGroupBox:
"""파일 선택 영역 생성"""
group = QGroupBox("파일 선택")
layout = QVBoxLayout()
# DBC 파일
dbc_layout = QHBoxLayout()
dbc_layout.addWidget(QLabel("DBC 파일:"))
self.dbc_label = QLabel("선택되지 않음")
self.dbc_label.setStyleSheet("color: red;")
dbc_layout.addWidget(self.dbc_label, 1)
dbc_btn = QPushButton("찾아보기")
dbc_btn.clicked.connect(self.select_dbc)
dbc_layout.addWidget(dbc_btn)
auto_dbc_btn = QPushButton("자동 검색")
auto_dbc_btn.clicked.connect(self.auto_find_dbc)
dbc_layout.addWidget(auto_dbc_btn)
layout.addLayout(dbc_layout)
# BIN 파일 목록
bin_layout = QVBoxLayout()
bin_label_layout = QHBoxLayout()
bin_label_layout.addWidget(QLabel("BIN 파일:"))
self.bin_count_label = QLabel("0개 선택됨")
bin_label_layout.addWidget(self.bin_count_label)
bin_label_layout.addStretch()
bin_layout.addLayout(bin_label_layout)
self.bin_list = QListWidget()
self.bin_list.setMaximumHeight(100)
bin_layout.addWidget(self.bin_list)
bin_btn_layout = QHBoxLayout()
add_bin_btn = QPushButton("BIN 파일 추가")
add_bin_btn.clicked.connect(self.select_bin_files)
bin_btn_layout.addWidget(add_bin_btn)
auto_bin_btn = QPushButton("자동 검색")
auto_bin_btn.clicked.connect(self.auto_find_bin_files)
bin_btn_layout.addWidget(auto_bin_btn)
clear_bin_btn = QPushButton("목록 지우기")
clear_bin_btn.clicked.connect(self.clear_bin_files)
bin_btn_layout.addWidget(clear_bin_btn)
bin_layout.addLayout(bin_btn_layout)
layout.addLayout(bin_layout)
# 출력 디렉토리
output_layout = QHBoxLayout()
output_layout.addWidget(QLabel("출력 폴더:"))
self.output_label = QLabel("BIN 파일과 같은 폴더")
output_layout.addWidget(self.output_label, 1)
output_btn = QPushButton("변경")
output_btn.clicked.connect(self.select_output_dir)
output_layout.addWidget(output_btn)
layout.addLayout(output_layout)
group.setLayout(layout)
return group
def create_bottom_widget(self) -> QWidget:
"""하단 로그/진행 영역 생성"""
widget = QWidget()
layout = QVBoxLayout(widget)
# 진행률
progress_layout = QHBoxLayout()
self.progress_label = QLabel("대기 중...")
progress_layout.addWidget(self.progress_label)
self.progress_bar = QProgressBar()
progress_layout.addWidget(self.progress_bar, 1)
self.stop_btn = QPushButton("중단")
self.stop_btn.setEnabled(False)
self.stop_btn.clicked.connect(self.stop_conversion)
progress_layout.addWidget(self.stop_btn)
layout.addLayout(progress_layout)
# 로그
log_label = QLabel("변환 로그:")
layout.addWidget(log_label)
self.log_text = QTextEdit()
self.log_text.setReadOnly(True)
self.log_text.setMaximumHeight(200)
layout.addWidget(self.log_text)
return widget
def apply_style(self):
"""스타일 적용"""
self.setStyleSheet("""
QMainWindow {
background-color: #f5f5f5;
}
QGroupBox {
font-weight: bold;
border: 2px solid #cccccc;
border-radius: 5px;
margin-top: 10px;
padding-top: 10px;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 10px;
padding: 0 5px;
}
QPushButton {
background-color: #4CAF50;
color: white;
border: none;
padding: 8px;
border-radius: 4px;
font-weight: bold;
}
QPushButton:hover {
background-color: #45a049;
}
QPushButton:pressed {
background-color: #3d8b40;
}
QPushButton:disabled {
background-color: #cccccc;
color: #666666;
}
QRadioButton {
font-weight: normal;
}
QRadioButton::indicator {
width: 15px;
height: 15px;
}
""")
def select_dbc(self):
"""DBC 파일 선택"""
file_path, _ = QFileDialog.getOpenFileName(
self, "DBC 파일 선택", "", "DBC Files (*.dbc);;All Files (*)"
)
if file_path:
self.dbc_path = file_path
self.dbc_label.setText(Path(file_path).name)
self.dbc_label.setStyleSheet("color: green;")
self.log(f"✓ DBC 파일 선택: {Path(file_path).name}")
def auto_find_dbc(self):
"""현재 디렉토리에서 DBC 파일 자동 검색"""
dbc_files = list(Path.cwd().glob("*.dbc"))
if dbc_files:
self.dbc_path = str(dbc_files[0])
self.dbc_label.setText(dbc_files[0].name)
self.dbc_label.setStyleSheet("color: green;")
self.log(f"✓ DBC 파일 자동 검색: {dbc_files[0].name}")
else:
QMessageBox.warning(self, "경고", "현재 폴더에서 DBC 파일을 찾을 수 없습니다.")
def select_bin_files(self):
"""BIN 파일 선택"""
file_paths, _ = QFileDialog.getOpenFileNames(
self, "BIN 파일 선택", "", "BIN Files (*.bin);;All Files (*)"
)
if file_paths:
for file_path in file_paths:
if file_path not in self.bin_files:
self.bin_files.append(file_path)
self.bin_list.addItem(Path(file_path).name)
self.update_bin_count()
self.log(f"✓ BIN 파일 {len(file_paths)}개 추가")
def auto_find_bin_files(self):
"""현재 디렉토리에서 BIN 파일 자동 검색"""
bin_files = list(Path.cwd().glob("*.bin"))
if bin_files:
added = 0
for bin_file in bin_files:
file_path = str(bin_file)
if file_path not in self.bin_files:
self.bin_files.append(file_path)
self.bin_list.addItem(bin_file.name)
added += 1
self.update_bin_count()
self.log(f"✓ BIN 파일 {added}개 자동 검색")
else:
QMessageBox.warning(self, "경고", "현재 폴더에서 BIN 파일을 찾을 수 없습니다.")
def clear_bin_files(self):
"""BIN 파일 목록 지우기"""
self.bin_files.clear()
self.bin_list.clear()
self.update_bin_count()
self.log("목록 초기화")
def update_bin_count(self):
"""BIN 파일 개수 업데이트"""
count = len(self.bin_files)
self.bin_count_label.setText(f"{count}개 선택됨")
if count > 0:
self.bin_count_label.setStyleSheet("color: green; font-weight: bold;")
else:
self.bin_count_label.setStyleSheet("color: red;")
def select_output_dir(self):
"""출력 디렉토리 선택"""
dir_path = QFileDialog.getExistingDirectory(self, "출력 폴더 선택")
if dir_path:
self.output_dir = dir_path
self.output_label.setText(dir_path)
self.log(f"✓ 출력 폴더: {dir_path}")
def start_conversion(self, conversion_type: str):
"""변환 시작"""
# 검증
if not self.dbc_path:
QMessageBox.warning(self, "경고", "DBC 파일을 선택하세요.")
return
if not self.bin_files:
QMessageBox.warning(self, "경고", "BIN 파일을 선택하세요.")
return
# 출력 디렉토리 결정
output_dir = self.output_dir if self.output_dir else str(Path(self.bin_files[0]).parent)
# 옵션 수집
if conversion_type == 'csv':
# CSV 형식 결정
format_type = 'wide'
for fmt, radio in self.csv_format_radios.items():
if radio.isChecked():
format_type = fmt
break
options = {
'apply_value_table': self.csv_value_table_check.isChecked(),
'datetime_format': self.csv_datetime_check.isChecked()
}
else: # mdf
format_type = 'mdf'
# 압축 방식 결정
compression = 0
for comp, radio in self.mdf_compression_radios.items():
if radio.isChecked():
compression = comp
break
options = {
'compression': compression
}
# 로그 초기화
self.log_text.clear()
self.log("="*60)
self.log(f"{conversion_type.upper()} 변환 시작")
self.log("="*60)
self.log(f"DBC: {Path(self.dbc_path).name}")
self.log(f"BIN 파일: {len(self.bin_files)}")
self.log(f"출력: {output_dir}")
if conversion_type == 'csv':
self.log(f"형식: {format_type}")
else:
self.log(f"압축: {compression}")
self.log("")
# UI 상태 변경
self.csv_convert_btn.setEnabled(False)
self.mdf_convert_btn.setEnabled(False)
self.stop_btn.setEnabled(True)
self.progress_bar.setValue(0)
# 워커 스레드 시작
self.worker = ConversionWorker(
self.dbc_path,
self.bin_files,
output_dir,
format_type,
conversion_type,
options
)
self.worker.signals.progress.connect(self.on_progress)
self.worker.signals.log.connect(self.log)
self.worker.signals.finished.connect(self.on_finished)
self.worker.signals.error.connect(self.on_error)
self.worker.start()
def stop_conversion(self):
"""변환 중단"""
if self.worker:
self.worker.stop()
self.log("중단 요청...")
def on_progress(self, current: int, total: int, message: str):
"""진행 상황 업데이트"""
progress = int((current / total) * 100)
self.progress_bar.setValue(progress)
self.progress_label.setText(f"[{current}/{total}] {message}")
def on_finished(self, success: int, failed: int):
"""변환 완료"""
total = success + failed
self.log("")
self.log("="*60)
self.log("변환 완료!")
self.log("="*60)
self.log(f"전체: {total}")
self.log(f"성공: {success}")
self.log(f"실패: {failed}")
self.log("="*60)
# UI 상태 복원
self.csv_convert_btn.setEnabled(True)
self.mdf_convert_btn.setEnabled(True)
self.stop_btn.setEnabled(False)
self.progress_bar.setValue(100)
self.progress_label.setText("완료")
# 완료 메시지
QMessageBox.information(
self, "완료",
f"변환이 완료되었습니다.\n\n성공: {success}\n실패: {failed}"
)
def on_error(self, error_msg: str):
"""에러 발생"""
self.log(f"❌ 오류: {error_msg}")
# UI 상태 복원
self.csv_convert_btn.setEnabled(True)
self.mdf_convert_btn.setEnabled(True)
self.stop_btn.setEnabled(False)
QMessageBox.critical(self, "오류", f"변환 중 오류가 발생했습니다:\n{error_msg}")
def log(self, message: str):
"""로그 메시지 추가"""
self.log_text.append(message)
# 스크롤을 맨 아래로
cursor = self.log_text.textCursor()
cursor.movePosition(QTextCursor.End)
self.log_text.setTextCursor(cursor)
# ============================================================================
# Main Entry Point
# ============================================================================
def main():
app = QApplication(sys.argv)
# 어플리케이션 정보
app.setApplicationName("ESP32 CAN Logger Converter")
app.setOrganizationName("CAN Tools")
# 메인 윈도우
window = CANConverterGUI()
window.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()