diff --git a/can_converter_gui.py b/can_converter_gui.py new file mode 100644 index 0000000..4167b9d --- /dev/null +++ b/can_converter_gui.py @@ -0,0 +1,1191 @@ +#!/usr/bin/env python3 +""" +ESP32 CAN Logger GUI Converter (All-in-One) +bin 파일을 CSV 또는 MDF 형식으로 변환하는 GUI 프로그램 +""" + +import sys +import os +import struct +import csv +from pathlib import Path +from datetime import datetime +import threading +from typing import Dict, List, Optional + +import numpy as np +from cantools import database as candb +from asammdf import MDF, Signal + +from PyQt5.QtWidgets import ( + QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, + QPushButton, QLabel, QFileDialog, QComboBox, QTextEdit, + QProgressBar, QGroupBox, QRadioButton, QButtonGroup, + QCheckBox, QMessageBox, QTabWidget, QListWidget, QSplitter +) +from PyQt5.QtCore import Qt, pyqtSignal, QObject, QThread +from PyQt5.QtGui import QFont, QTextCursor, QIcon + + +# ============================================================================ +# CAN Message 구조체 +# ============================================================================ + +class CANMessage: + """ESP32 CAN Logger의 CAN 메시지 구조체""" + STRUCT_FORMAT = ' 'CANMessage': + """바이너리 데이터에서 CANMessage 파싱""" + timestamp_us, can_id, dlc, msg_data = struct.unpack(cls.STRUCT_FORMAT, data) + return cls(timestamp_us, can_id, dlc, msg_data) + + def __repr__(self): + return f"CANMessage(id=0x{self.can_id:X}, dlc={self.dlc}, ts={self.timestamp_us})" + + +# ============================================================================ +# CSV 변환기 +# ============================================================================ + +class CANBinToCSV: + """CAN bin 파일을 CSV로 변환하는 클래스""" + + def __init__(self, dbc_path: str): + """ + Args: + dbc_path: CAN Database (DBC) 파일 경로 + """ + self.db = candb.load_file(dbc_path) + print(f"✓ DBC 파일 로드: {dbc_path}") + print(f" - 메시지 수: {len(self.db.messages)}") + print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}") + + def read_bin_file(self, bin_path: str) -> List[CANMessage]: + """ESP32 CAN Logger의 bin 파일 읽기""" + messages = [] + + with open(bin_path, 'rb') as f: + data = f.read() + + offset = 0 + while offset + CANMessage.STRUCT_SIZE <= len(data): + msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE] + try: + msg = CANMessage.from_bytes(msg_bytes) + messages.append(msg) + except Exception as e: + print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}") + + offset += CANMessage.STRUCT_SIZE + + print(f"✓ bin 파일 읽기 완료: {bin_path}") + print(f" - 총 메시지: {len(messages)}") + + return messages + + def decode_messages(self, messages: List[CANMessage], + apply_value_table: bool = True) -> Dict[str, Dict[str, List]]: + """CAN 메시지를 DBC를 사용하여 디코딩""" + decoded_data = {} + + # 각 메시지별로 초기화 + for msg_def in self.db.messages: + msg_name = msg_def.name + decoded_data[msg_name] = {'timestamps': []} + for signal in msg_def.signals: + decoded_data[msg_name][signal.name] = [] + + decode_count = 0 + for msg in messages: + try: + msg_def = self.db.get_message_by_frame_id(msg.can_id) + msg_name = msg_def.name + + decoded = msg_def.decode(msg.data) + + # 타임스탬프 저장 (마이크로초 -> 초) + decoded_data[msg_name]['timestamps'].append(msg.timestamp_us / 1e6) + + # 각 시그널 값 저장 + for signal in msg_def.signals: + signal_name = signal.name + raw_value = decoded[signal_name] + + # Value Table 적용 + if apply_value_table and signal.choices: + try: + int_value = int(raw_value) + if int_value in signal.choices: + value = signal.choices[int_value] + else: + value = raw_value + except (ValueError, TypeError): + value = raw_value + else: + value = raw_value + + decoded_data[msg_name][signal_name].append(value) + + decode_count += 1 + + except KeyError: + continue + except Exception as e: + continue + + print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)} 개") + + # 빈 메시지 제거 + decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0} + + return decoded_data + + def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]], + output_path: str, + datetime_format: bool = True): + """Wide 형식 CSV 생성 (각 시그널이 별도 컬럼)""" + # 모든 고유한 타임스탬프 수집 및 정렬 + all_timestamps = set() + for msg_data in decoded_data.values(): + all_timestamps.update(msg_data['timestamps']) + + all_timestamps = sorted(all_timestamps) + + # 각 타임스탬프에 대한 시그널 값 매핑 + signal_columns = [] + signal_data = {} + + for msg_name, msg_data in decoded_data.items(): + for signal_name, values in msg_data.items(): + if signal_name == 'timestamps': + continue + + col_name = f"{msg_name}.{signal_name}" + signal_columns.append(col_name) + + # 타임스탬프-값 매핑 생성 + timestamp_to_value = {} + for ts, val in zip(msg_data['timestamps'], values): + timestamp_to_value[ts] = val + + signal_data[col_name] = timestamp_to_value + + # CSV 작성 + with open(output_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + + # 헤더 작성 + if datetime_format: + header = ['Timestamp', 'DateTime'] + signal_columns + else: + header = ['Timestamp'] + signal_columns + writer.writerow(header) + + # 데이터 작성 + for ts in all_timestamps: + if datetime_format: + dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + row = [f"{ts:.6f}", dt] + else: + row = [f"{ts:.6f}"] + + for col_name in signal_columns: + value = signal_data[col_name].get(ts, '') + row.append(value) + + writer.writerow(row) + + print(f"✓ Wide CSV 생성 완료: {output_path}") + print(f" - 행 수: {len(all_timestamps)}") + print(f" - 시그널 수: {len(signal_columns)}") + + def to_long_csv(self, decoded_data: Dict[str, Dict[str, List]], + output_path: str, + datetime_format: bool = True): + """Long 형식 CSV 생성 (각 시그널 값이 별도 행)""" + with open(output_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + + # 헤더 작성 + if datetime_format: + header = ['Timestamp', 'DateTime', 'Message', 'Signal', 'Value'] + else: + header = ['Timestamp', 'Message', 'Signal', 'Value'] + writer.writerow(header) + + # 데이터 작성 + row_count = 0 + for msg_name, msg_data in decoded_data.items(): + timestamps = msg_data['timestamps'] + + for signal_name, values in msg_data.items(): + if signal_name == 'timestamps': + continue + + for ts, val in zip(timestamps, values): + if datetime_format: + dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + row = [f"{ts:.6f}", dt, msg_name, signal_name, val] + else: + row = [f"{ts:.6f}", msg_name, signal_name, val] + + writer.writerow(row) + row_count += 1 + + print(f"✓ Long CSV 생성 완료: {output_path}") + print(f" - 행 수: {row_count}") + + def to_message_csv(self, decoded_data: Dict[str, Dict[str, List]], + output_dir: str, + datetime_format: bool = True): + """메시지별 CSV 생성 (각 CAN 메시지가 별도 CSV 파일)""" + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + file_count = 0 + for msg_name, msg_data in decoded_data.items(): + # 파일명에 사용할 수 없는 문자 제거 + safe_name = msg_name.replace('/', '_').replace('\\', '_') + csv_path = output_path / f"{safe_name}.csv" + + with open(csv_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + + # 헤더 작성 + signal_names = [k for k in msg_data.keys() if k != 'timestamps'] + if datetime_format: + header = ['Timestamp', 'DateTime'] + signal_names + else: + header = ['Timestamp'] + signal_names + writer.writerow(header) + + # 데이터 작성 + timestamps = msg_data['timestamps'] + for i, ts in enumerate(timestamps): + if datetime_format: + dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + row = [f"{ts:.6f}", dt] + else: + row = [f"{ts:.6f}"] + + for signal_name in signal_names: + row.append(msg_data[signal_name][i]) + + writer.writerow(row) + + file_count += 1 + + print(f"✓ 메시지별 CSV 생성 완료: {output_dir}") + print(f" - 파일 수: {file_count}") + + def to_raw_csv(self, messages: List[CANMessage], output_path: str): + """원본 CAN 메시지 CSV 생성 (디코딩 없이 raw 데이터)""" + with open(output_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + + # 헤더 + writer.writerow(['Timestamp', 'DateTime', 'CAN_ID', 'DLC', 'Data']) + + # 데이터 + for msg in messages: + ts = msg.timestamp_us / 1e6 + dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + can_id = f"0x{msg.can_id:03X}" + data_hex = ' '.join([f"{b:02X}" for b in msg.data]) + + writer.writerow([f"{ts:.6f}", dt, can_id, msg.dlc, data_hex]) + + print(f"✓ Raw CSV 생성 완료: {output_path}") + print(f" - 행 수: {len(messages)}") + + def convert(self, bin_path: str, output_path: str, + format: str = 'wide', + apply_value_table: bool = True, + datetime_format: bool = True): + """bin 파일을 CSV로 변환 (전체 프로세스)""" + print(f"\n{'='*60}") + print(f"ESP32 CAN bin → CSV 변환") + print(f"{'='*60}") + + # 1. bin 파일 읽기 + messages = self.read_bin_file(bin_path) + + if not messages: + print("✗ 메시지가 없습니다.") + return + + # 시간 범위 출력 + start_time = messages[0].timestamp_us / 1e6 + end_time = messages[-1].timestamp_us / 1e6 + duration = end_time - start_time + + print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}") + print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}") + print(f" - 기간: {duration:.2f} 초") + + # Raw 형식은 디코딩 없이 바로 변환 + if format == 'raw': + self.to_raw_csv(messages, output_path) + else: + # 2. 메시지 디코딩 + decoded_data = self.decode_messages(messages, apply_value_table) + + if not decoded_data: + print("✗ 디코딩된 데이터가 없습니다.") + return + + # 3. CSV 생성 + if format == 'wide': + self.to_wide_csv(decoded_data, output_path, datetime_format) + elif format == 'long': + self.to_long_csv(decoded_data, output_path, datetime_format) + elif format == 'message': + self.to_message_csv(decoded_data, output_path, datetime_format) + else: + print(f"✗ 알 수 없는 형식: {format}") + return + + print(f"{'='*60}") + print(f"✓ 변환 완료!") + print(f"{'='*60}\n") + + +# ============================================================================ +# MDF 변환기 +# ============================================================================ + +class CANBinToMDF: + """CAN bin 파일을 MDF4로 변환하는 클래스""" + + def __init__(self, dbc_path: str): + """ + Args: + dbc_path: CAN Database (DBC) 파일 경로 + """ + self.db = candb.load_file(dbc_path) + print(f"✓ DBC 파일 로드: {dbc_path}") + print(f" - 메시지 수: {len(self.db.messages)}") + print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}") + + def read_bin_file(self, bin_path: str) -> List[CANMessage]: + """ESP32 CAN Logger의 bin 파일 읽기""" + messages = [] + + with open(bin_path, 'rb') as f: + data = f.read() + + offset = 0 + while offset + CANMessage.STRUCT_SIZE <= len(data): + msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE] + try: + msg = CANMessage.from_bytes(msg_bytes) + messages.append(msg) + except Exception as e: + print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}") + + offset += CANMessage.STRUCT_SIZE + + print(f"✓ bin 파일 읽기 완료: {bin_path}") + print(f" - 총 메시지: {len(messages)}") + + return messages + + def decode_messages(self, messages: List[CANMessage]) -> Dict[str, Dict[str, List]]: + """CAN 메시지를 DBC를 사용하여 디코딩""" + decoded_data = {} + + # 각 메시지별로 초기화 + for msg_def in self.db.messages: + msg_name = msg_def.name + decoded_data[msg_name] = {'timestamps': []} + for signal in msg_def.signals: + decoded_data[msg_name][signal.name] = [] + + decode_count = 0 + for msg in messages: + try: + msg_def = self.db.get_message_by_frame_id(msg.can_id) + msg_name = msg_def.name + + decoded = msg_def.decode(msg.data) + + # 타임스탬프 저장 (마이크로초 -> 초) + decoded_data[msg_name]['timestamps'].append(msg.timestamp_us / 1e6) + + # 각 시그널 값 저장 + for signal in msg_def.signals: + signal_name = signal.name + raw_value = decoded[signal_name] + decoded_data[msg_name][signal_name].append(raw_value) + + decode_count += 1 + + except KeyError: + continue + except Exception as e: + continue + + print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)} 개") + + # 빈 메시지 제거 + decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0} + + return decoded_data + + def to_mdf(self, decoded_data: Dict[str, Dict[str, List]], + output_path: str, + compression: int = 0): + """MDF4 파일 생성""" + mdf = MDF(version='4.10') + + signal_count = 0 + for msg_name, msg_data in decoded_data.items(): + timestamps = np.array(msg_data['timestamps']) + + for signal_name, values in msg_data.items(): + if signal_name == 'timestamps': + continue + + # DBC에서 시그널 정보 가져오기 + try: + msg_def = self.db.get_message_by_name(msg_name) + signal_def = None + for sig in msg_def.signals: + if sig.name == signal_name: + signal_def = sig + break + + if signal_def is None: + continue + + # numpy 배열로 변환 + samples = np.array(values) + + # Signal 객체 생성 + signal = Signal( + samples=samples, + timestamps=timestamps, + name=f"{msg_name}.{signal_name}", + unit=signal_def.unit or '', + comment=signal_def.comment or '' + ) + + mdf.append(signal) + signal_count += 1 + + except Exception as e: + print(f"⚠️ 시그널 추가 실패 ({msg_name}.{signal_name}): {e}") + continue + + # 파일 저장 + compression_map = { + 0: 0, # no compression + 1: 1, # deflate + 2: 2 # transposed + deflate + } + + mdf.save( + output_path, + compression=compression_map.get(compression, 0), + overwrite=True + ) + + print(f"✓ MDF4 파일 생성 완료: {output_path}") + print(f" - 시그널 수: {signal_count}") + + # 파일 크기 출력 + file_size = Path(output_path).stat().st_size + file_size_mb = file_size / (1024 * 1024) + print(f" - 파일 크기: {file_size_mb:.2f} MB") + + def convert(self, bin_path: str, output_path: str, compression: int = 0): + """bin 파일을 MDF4로 변환 (전체 프로세스)""" + print(f"\n{'='*60}") + print(f"ESP32 CAN bin → MDF4 변환") + print(f"{'='*60}") + + # 1. bin 파일 읽기 + messages = self.read_bin_file(bin_path) + + if not messages: + print("✗ 메시지가 없습니다.") + return + + # 시간 범위 출력 + start_time = messages[0].timestamp_us / 1e6 + end_time = messages[-1].timestamp_us / 1e6 + duration = end_time - start_time + + print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}") + print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}") + print(f" - 기간: {duration:.2f} 초") + + # 2. 메시지 디코딩 + decoded_data = self.decode_messages(messages) + + if not decoded_data: + print("✗ 디코딩된 데이터가 없습니다.") + return + + # 3. MDF 파일 생성 + self.to_mdf(decoded_data, output_path, compression) + + print(f"{'='*60}") + print(f"✓ 변환 완료!") + print(f"{'='*60}\n") + + +# ============================================================================ +# GUI Worker Thread +# ============================================================================ + +class WorkerSignals(QObject): + """워커 스레드 시그널""" + progress = pyqtSignal(int, int, str) # current, total, message + log = pyqtSignal(str) # log message + finished = pyqtSignal(int, int) # success, failed + error = pyqtSignal(str) # error message + + +class ConversionWorker(QThread): + """변환 작업을 수행하는 워커 스레드""" + + def __init__(self, dbc_path: str, bin_files: List[str], + output_dir: str, format_type: str, + conversion_type: str, options: dict): + super().__init__() + self.dbc_path = dbc_path + self.bin_files = bin_files + self.output_dir = output_dir + self.format_type = format_type + self.conversion_type = conversion_type + self.options = options + self.signals = WorkerSignals() + self._is_running = True + + def stop(self): + """작업 중단""" + self._is_running = False + + def run(self): + """변환 작업 실행""" + success_count = 0 + failed_count = 0 + total = len(self.bin_files) + + try: + # 변환기 초기화 + if self.conversion_type == 'csv': + converter = CANBinToCSV(self.dbc_path) + else: # mdf + converter = CANBinToMDF(self.dbc_path) + + self.signals.log.emit(f"✓ DBC 파일 로드: {self.dbc_path}") + + # 각 파일 변환 + for idx, bin_file in enumerate(self.bin_files): + if not self._is_running: + self.signals.log.emit("❌ 사용자에 의해 중단됨") + break + + current = idx + 1 + self.signals.progress.emit(current, total, f"Converting {Path(bin_file).name}") + self.signals.log.emit(f"\n[{current}/{total}] {Path(bin_file).name}") + + try: + # 출력 파일 경로 결정 + bin_path = Path(bin_file) + + if self.conversion_type == 'csv': + if self.format_type == 'message': + output_path = Path(self.output_dir) / bin_path.stem + else: + output_path = Path(self.output_dir) / f"{bin_path.stem}.csv" + + converter.convert( + str(bin_path), + str(output_path), + format=self.format_type, + apply_value_table=self.options.get('apply_value_table', True), + datetime_format=self.options.get('datetime_format', True) + ) + else: # mdf + output_path = Path(self.output_dir) / f"{bin_path.stem}.mf4" + + converter.convert( + str(bin_path), + str(output_path), + compression=self.options.get('compression', 0) + ) + + success_count += 1 + self.signals.log.emit(f" ✓ 완료: {output_path.name}") + + except Exception as e: + failed_count += 1 + self.signals.log.emit(f" ✗ 실패: {str(e)}") + + self.signals.finished.emit(success_count, failed_count) + + except Exception as e: + self.signals.error.emit(str(e)) + + +# ============================================================================ +# GUI Main Window +# ============================================================================ + +class CANConverterGUI(QMainWindow): + """CAN Bin to CSV/MDF 변환 GUI""" + + def __init__(self): + super().__init__() + self.dbc_path = None + self.bin_files = [] + self.output_dir = None + self.worker = None + + self.init_ui() + + def init_ui(self): + """UI 초기화""" + self.setWindowTitle("ESP32 CAN Logger Converter v1.0") + self.setGeometry(100, 100, 1000, 700) + + # 메인 위젯 + main_widget = QWidget() + self.setCentralWidget(main_widget) + main_layout = QVBoxLayout(main_widget) + + # 탭 위젯 + tab_widget = QTabWidget() + main_layout.addWidget(tab_widget) + + # CSV 변환 탭 + csv_tab = self.create_csv_tab() + tab_widget.addTab(csv_tab, "CSV 변환") + + # MDF 변환 탭 + mdf_tab = self.create_mdf_tab() + tab_widget.addTab(mdf_tab, "MDF 변환") + + # 하단 로그/진행 영역 + bottom_widget = self.create_bottom_widget() + main_layout.addWidget(bottom_widget) + + # 스타일 적용 + self.apply_style() + + def create_csv_tab(self) -> QWidget: + """CSV 변환 탭 생성""" + tab = QWidget() + layout = QVBoxLayout(tab) + + # 파일 선택 영역 + file_group = self.create_file_selection_group() + layout.addWidget(file_group) + + # CSV 형식 선택 + format_group = QGroupBox("CSV 형식 선택") + format_layout = QVBoxLayout() + + self.csv_format_group = QButtonGroup() + + formats = [ + ("wide", "Wide Format (Excel-friendly)", + "모든 시그널이 컬럼으로 나열 - Excel 분석에 최적"), + ("long", "Long Format (Database-friendly)", + "각 시그널 값이 별도 행 - 데이터베이스 저장에 적합"), + ("message", "Message Format (Separate files)", + "메시지별 개별 CSV 파일 생성 - 메시지별 분석"), + ("raw", "Raw Format (No decoding)", + "원본 CAN 프레임 - 디코딩 없음") + ] + + self.csv_format_radios = {} + for idx, (value, title, desc) in enumerate(formats): + radio = QRadioButton(title) + if idx == 0: + radio.setChecked(True) + self.csv_format_group.addButton(radio, idx) + self.csv_format_radios[value] = radio + + format_layout.addWidget(radio) + + desc_label = QLabel(f" → {desc}") + desc_label.setStyleSheet("color: gray; font-size: 10px;") + format_layout.addWidget(desc_label) + + format_group.setLayout(format_layout) + layout.addWidget(format_group) + + # CSV 옵션 + option_group = QGroupBox("옵션") + option_layout = QVBoxLayout() + + self.csv_value_table_check = QCheckBox("Value Table 적용 (숫자 → 텍스트 변환)") + self.csv_value_table_check.setChecked(True) + option_layout.addWidget(self.csv_value_table_check) + + self.csv_datetime_check = QCheckBox("날짜/시간 컬럼 포함") + self.csv_datetime_check.setChecked(True) + option_layout.addWidget(self.csv_datetime_check) + + option_group.setLayout(option_layout) + layout.addWidget(option_group) + + # 변환 버튼 + self.csv_convert_btn = QPushButton("CSV 변환 시작") + self.csv_convert_btn.setMinimumHeight(50) + self.csv_convert_btn.clicked.connect(lambda: self.start_conversion('csv')) + layout.addWidget(self.csv_convert_btn) + + layout.addStretch() + + return tab + + def create_mdf_tab(self) -> QWidget: + """MDF 변환 탭 생성""" + tab = QWidget() + layout = QVBoxLayout(tab) + + # 파일 선택 영역 (공유) + info_label = QLabel("※ 파일 선택은 상단 '파일 선택' 영역을 사용하세요") + info_label.setStyleSheet("color: blue; font-weight: bold;") + layout.addWidget(info_label) + + # 압축 방식 선택 + compression_group = QGroupBox("압축 방식 선택") + compression_layout = QVBoxLayout() + + self.mdf_compression_group = QButtonGroup() + + compressions = [ + (0, "압축 없음 (No compression)", + "읽기 속도: ★★★★★ | 파일 크기: 171MB | 즉시 분석용"), + (1, "Deflate 압축 (Balanced)", + "읽기 속도: ★★★☆☆ | 파일 크기: 3.6MB (98% 압축) | 일반 사용"), + (2, "Transposed 압축 (Maximum)", + "읽기 속도: ★☆☆☆☆ | 파일 크기: 1.2MB (99% 압축) | 장기 보관용") + ] + + self.mdf_compression_radios = {} + for idx, (value, title, desc) in enumerate(compressions): + radio = QRadioButton(title) + if idx == 0: + radio.setChecked(True) + self.mdf_compression_group.addButton(radio, value) + self.mdf_compression_radios[value] = radio + + compression_layout.addWidget(radio) + + desc_label = QLabel(f" → {desc}") + desc_label.setStyleSheet("color: gray; font-size: 10px;") + compression_layout.addWidget(desc_label) + + compression_group.setLayout(compression_layout) + layout.addWidget(compression_group) + + # MDF 정보 + info_group = QGroupBox("MDF4 형식 정보") + info_layout = QVBoxLayout() + + info_text = QLabel( + "• MDF4는 자동차 업계 표준 측정 데이터 형식입니다\n" + "• CANoe, CANalyzer, Vector Tools에서 직접 열 수 있습니다\n" + "• 압축률이 매우 높아 저장 공간을 절약할 수 있습니다\n" + "• 모든 시그널 정보, 단위, 설명이 포함됩니다" + ) + info_layout.addWidget(info_text) + + info_group.setLayout(info_layout) + layout.addWidget(info_group) + + # 변환 버튼 + self.mdf_convert_btn = QPushButton("MDF 변환 시작") + self.mdf_convert_btn.setMinimumHeight(50) + self.mdf_convert_btn.clicked.connect(lambda: self.start_conversion('mdf')) + layout.addWidget(self.mdf_convert_btn) + + layout.addStretch() + + return tab + + def create_file_selection_group(self) -> QGroupBox: + """파일 선택 영역 생성""" + group = QGroupBox("파일 선택") + layout = QVBoxLayout() + + # DBC 파일 + dbc_layout = QHBoxLayout() + dbc_layout.addWidget(QLabel("DBC 파일:")) + self.dbc_label = QLabel("선택되지 않음") + self.dbc_label.setStyleSheet("color: red;") + dbc_layout.addWidget(self.dbc_label, 1) + dbc_btn = QPushButton("찾아보기") + dbc_btn.clicked.connect(self.select_dbc) + dbc_layout.addWidget(dbc_btn) + auto_dbc_btn = QPushButton("자동 검색") + auto_dbc_btn.clicked.connect(self.auto_find_dbc) + dbc_layout.addWidget(auto_dbc_btn) + layout.addLayout(dbc_layout) + + # BIN 파일 목록 + bin_layout = QVBoxLayout() + bin_label_layout = QHBoxLayout() + bin_label_layout.addWidget(QLabel("BIN 파일:")) + self.bin_count_label = QLabel("0개 선택됨") + bin_label_layout.addWidget(self.bin_count_label) + bin_label_layout.addStretch() + bin_layout.addLayout(bin_label_layout) + + self.bin_list = QListWidget() + self.bin_list.setMaximumHeight(100) + bin_layout.addWidget(self.bin_list) + + bin_btn_layout = QHBoxLayout() + add_bin_btn = QPushButton("BIN 파일 추가") + add_bin_btn.clicked.connect(self.select_bin_files) + bin_btn_layout.addWidget(add_bin_btn) + + auto_bin_btn = QPushButton("자동 검색") + auto_bin_btn.clicked.connect(self.auto_find_bin_files) + bin_btn_layout.addWidget(auto_bin_btn) + + clear_bin_btn = QPushButton("목록 지우기") + clear_bin_btn.clicked.connect(self.clear_bin_files) + bin_btn_layout.addWidget(clear_bin_btn) + bin_layout.addLayout(bin_btn_layout) + + layout.addLayout(bin_layout) + + # 출력 디렉토리 + output_layout = QHBoxLayout() + output_layout.addWidget(QLabel("출력 폴더:")) + self.output_label = QLabel("BIN 파일과 같은 폴더") + output_layout.addWidget(self.output_label, 1) + output_btn = QPushButton("변경") + output_btn.clicked.connect(self.select_output_dir) + output_layout.addWidget(output_btn) + layout.addLayout(output_layout) + + group.setLayout(layout) + return group + + def create_bottom_widget(self) -> QWidget: + """하단 로그/진행 영역 생성""" + widget = QWidget() + layout = QVBoxLayout(widget) + + # 진행률 + progress_layout = QHBoxLayout() + self.progress_label = QLabel("대기 중...") + progress_layout.addWidget(self.progress_label) + + self.progress_bar = QProgressBar() + progress_layout.addWidget(self.progress_bar, 1) + + self.stop_btn = QPushButton("중단") + self.stop_btn.setEnabled(False) + self.stop_btn.clicked.connect(self.stop_conversion) + progress_layout.addWidget(self.stop_btn) + + layout.addLayout(progress_layout) + + # 로그 + log_label = QLabel("변환 로그:") + layout.addWidget(log_label) + + self.log_text = QTextEdit() + self.log_text.setReadOnly(True) + self.log_text.setMaximumHeight(200) + layout.addWidget(self.log_text) + + return widget + + def apply_style(self): + """스타일 적용""" + self.setStyleSheet(""" + QMainWindow { + background-color: #f5f5f5; + } + QGroupBox { + font-weight: bold; + border: 2px solid #cccccc; + border-radius: 5px; + margin-top: 10px; + padding-top: 10px; + } + QGroupBox::title { + subcontrol-origin: margin; + left: 10px; + padding: 0 5px; + } + QPushButton { + background-color: #4CAF50; + color: white; + border: none; + padding: 8px; + border-radius: 4px; + font-weight: bold; + } + QPushButton:hover { + background-color: #45a049; + } + QPushButton:pressed { + background-color: #3d8b40; + } + QPushButton:disabled { + background-color: #cccccc; + color: #666666; + } + QRadioButton { + font-weight: normal; + } + QRadioButton::indicator { + width: 15px; + height: 15px; + } + """) + + def select_dbc(self): + """DBC 파일 선택""" + file_path, _ = QFileDialog.getOpenFileName( + self, "DBC 파일 선택", "", "DBC Files (*.dbc);;All Files (*)" + ) + if file_path: + self.dbc_path = file_path + self.dbc_label.setText(Path(file_path).name) + self.dbc_label.setStyleSheet("color: green;") + self.log(f"✓ DBC 파일 선택: {Path(file_path).name}") + + def auto_find_dbc(self): + """현재 디렉토리에서 DBC 파일 자동 검색""" + dbc_files = list(Path.cwd().glob("*.dbc")) + if dbc_files: + self.dbc_path = str(dbc_files[0]) + self.dbc_label.setText(dbc_files[0].name) + self.dbc_label.setStyleSheet("color: green;") + self.log(f"✓ DBC 파일 자동 검색: {dbc_files[0].name}") + else: + QMessageBox.warning(self, "경고", "현재 폴더에서 DBC 파일을 찾을 수 없습니다.") + + def select_bin_files(self): + """BIN 파일 선택""" + file_paths, _ = QFileDialog.getOpenFileNames( + self, "BIN 파일 선택", "", "BIN Files (*.bin);;All Files (*)" + ) + if file_paths: + for file_path in file_paths: + if file_path not in self.bin_files: + self.bin_files.append(file_path) + self.bin_list.addItem(Path(file_path).name) + + self.update_bin_count() + self.log(f"✓ BIN 파일 {len(file_paths)}개 추가") + + def auto_find_bin_files(self): + """현재 디렉토리에서 BIN 파일 자동 검색""" + bin_files = list(Path.cwd().glob("*.bin")) + if bin_files: + added = 0 + for bin_file in bin_files: + file_path = str(bin_file) + if file_path not in self.bin_files: + self.bin_files.append(file_path) + self.bin_list.addItem(bin_file.name) + added += 1 + + self.update_bin_count() + self.log(f"✓ BIN 파일 {added}개 자동 검색") + else: + QMessageBox.warning(self, "경고", "현재 폴더에서 BIN 파일을 찾을 수 없습니다.") + + def clear_bin_files(self): + """BIN 파일 목록 지우기""" + self.bin_files.clear() + self.bin_list.clear() + self.update_bin_count() + self.log("목록 초기화") + + def update_bin_count(self): + """BIN 파일 개수 업데이트""" + count = len(self.bin_files) + self.bin_count_label.setText(f"{count}개 선택됨") + if count > 0: + self.bin_count_label.setStyleSheet("color: green; font-weight: bold;") + else: + self.bin_count_label.setStyleSheet("color: red;") + + def select_output_dir(self): + """출력 디렉토리 선택""" + dir_path = QFileDialog.getExistingDirectory(self, "출력 폴더 선택") + if dir_path: + self.output_dir = dir_path + self.output_label.setText(dir_path) + self.log(f"✓ 출력 폴더: {dir_path}") + + def start_conversion(self, conversion_type: str): + """변환 시작""" + # 검증 + if not self.dbc_path: + QMessageBox.warning(self, "경고", "DBC 파일을 선택하세요.") + return + + if not self.bin_files: + QMessageBox.warning(self, "경고", "BIN 파일을 선택하세요.") + return + + # 출력 디렉토리 결정 + output_dir = self.output_dir if self.output_dir else str(Path(self.bin_files[0]).parent) + + # 옵션 수집 + if conversion_type == 'csv': + # CSV 형식 결정 + format_type = 'wide' + for fmt, radio in self.csv_format_radios.items(): + if radio.isChecked(): + format_type = fmt + break + + options = { + 'apply_value_table': self.csv_value_table_check.isChecked(), + 'datetime_format': self.csv_datetime_check.isChecked() + } + else: # mdf + format_type = 'mdf' + + # 압축 방식 결정 + compression = 0 + for comp, radio in self.mdf_compression_radios.items(): + if radio.isChecked(): + compression = comp + break + + options = { + 'compression': compression + } + + # 로그 초기화 + self.log_text.clear() + self.log("="*60) + self.log(f"{conversion_type.upper()} 변환 시작") + self.log("="*60) + self.log(f"DBC: {Path(self.dbc_path).name}") + self.log(f"BIN 파일: {len(self.bin_files)}개") + self.log(f"출력: {output_dir}") + if conversion_type == 'csv': + self.log(f"형식: {format_type}") + else: + self.log(f"압축: {compression}") + self.log("") + + # UI 상태 변경 + self.csv_convert_btn.setEnabled(False) + self.mdf_convert_btn.setEnabled(False) + self.stop_btn.setEnabled(True) + self.progress_bar.setValue(0) + + # 워커 스레드 시작 + self.worker = ConversionWorker( + self.dbc_path, + self.bin_files, + output_dir, + format_type, + conversion_type, + options + ) + + self.worker.signals.progress.connect(self.on_progress) + self.worker.signals.log.connect(self.log) + self.worker.signals.finished.connect(self.on_finished) + self.worker.signals.error.connect(self.on_error) + + self.worker.start() + + def stop_conversion(self): + """변환 중단""" + if self.worker: + self.worker.stop() + self.log("중단 요청...") + + def on_progress(self, current: int, total: int, message: str): + """진행 상황 업데이트""" + progress = int((current / total) * 100) + self.progress_bar.setValue(progress) + self.progress_label.setText(f"[{current}/{total}] {message}") + + def on_finished(self, success: int, failed: int): + """변환 완료""" + total = success + failed + self.log("") + self.log("="*60) + self.log("변환 완료!") + self.log("="*60) + self.log(f"전체: {total}") + self.log(f"성공: {success}") + self.log(f"실패: {failed}") + self.log("="*60) + + # UI 상태 복원 + self.csv_convert_btn.setEnabled(True) + self.mdf_convert_btn.setEnabled(True) + self.stop_btn.setEnabled(False) + self.progress_bar.setValue(100) + self.progress_label.setText("완료") + + # 완료 메시지 + QMessageBox.information( + self, "완료", + f"변환이 완료되었습니다.\n\n성공: {success}\n실패: {failed}" + ) + + def on_error(self, error_msg: str): + """에러 발생""" + self.log(f"❌ 오류: {error_msg}") + + # UI 상태 복원 + self.csv_convert_btn.setEnabled(True) + self.mdf_convert_btn.setEnabled(True) + self.stop_btn.setEnabled(False) + + QMessageBox.critical(self, "오류", f"변환 중 오류가 발생했습니다:\n{error_msg}") + + def log(self, message: str): + """로그 메시지 추가""" + self.log_text.append(message) + # 스크롤을 맨 아래로 + cursor = self.log_text.textCursor() + cursor.movePosition(QTextCursor.End) + self.log_text.setTextCursor(cursor) + + +# ============================================================================ +# Main Entry Point +# ============================================================================ + +def main(): + app = QApplication(sys.argv) + + # 어플리케이션 정보 + app.setApplicationName("ESP32 CAN Logger Converter") + app.setOrganizationName("CAN Tools") + + # 메인 윈도우 + window = CANConverterGUI() + window.show() + + sys.exit(app.exec_()) + + +if __name__ == "__main__": + main()