#!/usr/bin/env python3 """ ESP32 CAN Logger GUI Converter (All-in-One) bin 파일을 CSV 또는 MDF 형식으로 변환하는 GUI 프로그램 """ import sys import os import struct import csv from pathlib import Path from datetime import datetime import threading from typing import Dict, List, Optional import numpy as np from cantools import database as candb from asammdf import MDF, Signal from PyQt5.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel, QFileDialog, QComboBox, QTextEdit, QProgressBar, QGroupBox, QRadioButton, QButtonGroup, QCheckBox, QMessageBox, QTabWidget, QListWidget, QSplitter ) from PyQt5.QtCore import Qt, pyqtSignal, QObject, QThread from PyQt5.QtGui import QFont, QTextCursor, QIcon # ============================================================================ # CAN Message 구조체 # ============================================================================ class CANMessage: """ESP32 CAN Logger의 CAN 메시지 구조체""" STRUCT_FORMAT = ' 'CANMessage': """바이너리 데이터에서 CANMessage 파싱""" timestamp_us, can_id, dlc, msg_data = struct.unpack(cls.STRUCT_FORMAT, data) return cls(timestamp_us, can_id, dlc, msg_data) def __repr__(self): return f"CANMessage(id=0x{self.can_id:X}, dlc={self.dlc}, ts={self.timestamp_us})" # ============================================================================ # CSV 변환기 # ============================================================================ class CANBinToCSV: """CAN bin 파일을 CSV로 변환하는 클래스""" def __init__(self, dbc_path: str): """ Args: dbc_path: CAN Database (DBC) 파일 경로 """ self.db = candb.load_file(dbc_path) print(f"✓ DBC 파일 로드: {dbc_path}") print(f" - 메시지 수: {len(self.db.messages)}") print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}") def read_bin_file(self, bin_path: str) -> List[CANMessage]: """ESP32 CAN Logger의 bin 파일 읽기""" messages = [] with open(bin_path, 'rb') as f: data = f.read() offset = 0 while offset + CANMessage.STRUCT_SIZE <= len(data): msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE] try: msg = CANMessage.from_bytes(msg_bytes) messages.append(msg) except Exception as e: print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}") offset += CANMessage.STRUCT_SIZE print(f"✓ bin 파일 읽기 완료: {bin_path}") print(f" - 총 메시지: {len(messages)}") return messages def decode_messages(self, messages: List[CANMessage], apply_value_table: bool = True) -> Dict[str, Dict[str, List]]: """CAN 메시지를 DBC를 사용하여 디코딩""" decoded_data = {} # 각 메시지별로 초기화 for msg_def in self.db.messages: msg_name = msg_def.name decoded_data[msg_name] = {'timestamps': []} for signal in msg_def.signals: decoded_data[msg_name][signal.name] = [] decode_count = 0 for msg in messages: try: msg_def = self.db.get_message_by_frame_id(msg.can_id) msg_name = msg_def.name decoded = msg_def.decode(msg.data) # 타임스탬프 저장 (마이크로초 -> 초) decoded_data[msg_name]['timestamps'].append(msg.timestamp_us / 1e6) # 각 시그널 값 저장 for signal in msg_def.signals: signal_name = signal.name raw_value = decoded[signal_name] # Value Table 적용 if apply_value_table and signal.choices: try: int_value = int(raw_value) if int_value in signal.choices: value = signal.choices[int_value] else: value = raw_value except (ValueError, TypeError): value = raw_value else: value = raw_value decoded_data[msg_name][signal_name].append(value) decode_count += 1 except KeyError: continue except Exception as e: continue print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)} 개") # 빈 메시지 제거 decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0} return decoded_data def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]], output_path: str, datetime_format: bool = True): """Wide 형식 CSV 생성 (각 시그널이 별도 컬럼)""" # 모든 고유한 타임스탬프 수집 및 정렬 all_timestamps = set() for msg_data in decoded_data.values(): all_timestamps.update(msg_data['timestamps']) all_timestamps = sorted(all_timestamps) # 각 타임스탬프에 대한 시그널 값 매핑 signal_columns = [] signal_data = {} for msg_name, msg_data in decoded_data.items(): for signal_name, values in msg_data.items(): if signal_name == 'timestamps': continue col_name = f"{msg_name}.{signal_name}" signal_columns.append(col_name) # 타임스탬프-값 매핑 생성 timestamp_to_value = {} for ts, val in zip(msg_data['timestamps'], values): timestamp_to_value[ts] = val signal_data[col_name] = timestamp_to_value # CSV 작성 with open(output_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # 헤더 작성 if datetime_format: header = ['Timestamp', 'DateTime'] + signal_columns else: header = ['Timestamp'] + signal_columns writer.writerow(header) # 데이터 작성 for ts in all_timestamps: if datetime_format: dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] row = [f"{ts:.6f}", dt] else: row = [f"{ts:.6f}"] for col_name in signal_columns: value = signal_data[col_name].get(ts, '') row.append(value) writer.writerow(row) print(f"✓ Wide CSV 생성 완료: {output_path}") print(f" - 행 수: {len(all_timestamps)}") print(f" - 시그널 수: {len(signal_columns)}") def to_long_csv(self, decoded_data: Dict[str, Dict[str, List]], output_path: str, datetime_format: bool = True): """Long 형식 CSV 생성 (각 시그널 값이 별도 행)""" with open(output_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # 헤더 작성 if datetime_format: header = ['Timestamp', 'DateTime', 'Message', 'Signal', 'Value'] else: header = ['Timestamp', 'Message', 'Signal', 'Value'] writer.writerow(header) # 데이터 작성 row_count = 0 for msg_name, msg_data in decoded_data.items(): timestamps = msg_data['timestamps'] for signal_name, values in msg_data.items(): if signal_name == 'timestamps': continue for ts, val in zip(timestamps, values): if datetime_format: dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] row = [f"{ts:.6f}", dt, msg_name, signal_name, val] else: row = [f"{ts:.6f}", msg_name, signal_name, val] writer.writerow(row) row_count += 1 print(f"✓ Long CSV 생성 완료: {output_path}") print(f" - 행 수: {row_count}") def to_message_csv(self, decoded_data: Dict[str, Dict[str, List]], output_dir: str, datetime_format: bool = True): """메시지별 CSV 생성 (각 CAN 메시지가 별도 CSV 파일)""" output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) file_count = 0 for msg_name, msg_data in decoded_data.items(): # 파일명에 사용할 수 없는 문자 제거 safe_name = msg_name.replace('/', '_').replace('\\', '_') csv_path = output_path / f"{safe_name}.csv" with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # 헤더 작성 signal_names = [k for k in msg_data.keys() if k != 'timestamps'] if datetime_format: header = ['Timestamp', 'DateTime'] + signal_names else: header = ['Timestamp'] + signal_names writer.writerow(header) # 데이터 작성 timestamps = msg_data['timestamps'] for i, ts in enumerate(timestamps): if datetime_format: dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] row = [f"{ts:.6f}", dt] else: row = [f"{ts:.6f}"] for signal_name in signal_names: row.append(msg_data[signal_name][i]) writer.writerow(row) file_count += 1 print(f"✓ 메시지별 CSV 생성 완료: {output_dir}") print(f" - 파일 수: {file_count}") def to_raw_csv(self, messages: List[CANMessage], output_path: str): """원본 CAN 메시지 CSV 생성 (디코딩 없이 raw 데이터)""" with open(output_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # 헤더 writer.writerow(['Timestamp', 'DateTime', 'CAN_ID', 'DLC', 'Data']) # 데이터 for msg in messages: ts = msg.timestamp_us / 1e6 dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] can_id = f"0x{msg.can_id:03X}" data_hex = ' '.join([f"{b:02X}" for b in msg.data]) writer.writerow([f"{ts:.6f}", dt, can_id, msg.dlc, data_hex]) print(f"✓ Raw CSV 생성 완료: {output_path}") print(f" - 행 수: {len(messages)}") def convert(self, bin_path: str, output_path: str, format: str = 'wide', apply_value_table: bool = True, datetime_format: bool = True): """bin 파일을 CSV로 변환 (전체 프로세스)""" print(f"\n{'='*60}") print(f"ESP32 CAN bin → CSV 변환") print(f"{'='*60}") # 1. bin 파일 읽기 messages = self.read_bin_file(bin_path) if not messages: print("✗ 메시지가 없습니다.") return # 시간 범위 출력 start_time = messages[0].timestamp_us / 1e6 end_time = messages[-1].timestamp_us / 1e6 duration = end_time - start_time print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}") print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}") print(f" - 기간: {duration:.2f} 초") # Raw 형식은 디코딩 없이 바로 변환 if format == 'raw': self.to_raw_csv(messages, output_path) else: # 2. 메시지 디코딩 decoded_data = self.decode_messages(messages, apply_value_table) if not decoded_data: print("✗ 디코딩된 데이터가 없습니다.") return # 3. CSV 생성 if format == 'wide': self.to_wide_csv(decoded_data, output_path, datetime_format) elif format == 'long': self.to_long_csv(decoded_data, output_path, datetime_format) elif format == 'message': self.to_message_csv(decoded_data, output_path, datetime_format) else: print(f"✗ 알 수 없는 형식: {format}") return print(f"{'='*60}") print(f"✓ 변환 완료!") print(f"{'='*60}\n") # ============================================================================ # MDF 변환기 # ============================================================================ # ============================================================================ # MDF 변환기 (수정된 버전) # ============================================================================ class CANBinToMDF: """CAN bin 파일을 MDF4로 변환하는 클래스""" def __init__(self, dbc_path: str): """ Args: dbc_path: CAN Database (DBC) 파일 경로 """ self.db = candb.load_file(dbc_path) print(f"✓ DBC 파일 로드: {dbc_path}") print(f" - 메시지 수: {len(self.db.messages)}") print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}") def read_bin_file(self, bin_path: str) -> List[CANMessage]: """ESP32 CAN Logger의 bin 파일 읽기""" messages = [] with open(bin_path, 'rb') as f: data = f.read() offset = 0 while offset + CANMessage.STRUCT_SIZE <= len(data): msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE] try: msg = CANMessage.from_bytes(msg_bytes) messages.append(msg) except Exception as e: print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}") offset += CANMessage.STRUCT_SIZE print(f"✓ bin 파일 읽기 완료: {bin_path}") print(f" - 총 메시지: {len(messages)}") return messages def decode_messages(self, messages: List[CANMessage]) -> tuple: """ CAN 메시지를 DBC를 사용하여 디코딩 Returns: (decoded_data, start_time): 디코딩된 데이터와 시작 시간 """ decoded_data = {} # 시작 시간 계산 (첫 번째 메시지의 타임스탬프) start_time = messages[0].timestamp_us / 1e6 if messages else 0 # 각 메시지별로 초기화 for msg_def in self.db.messages: msg_name = msg_def.name decoded_data[msg_name] = {'timestamps': []} for signal in msg_def.signals: decoded_data[msg_name][signal.name] = [] decode_count = 0 for msg in messages: try: msg_def = self.db.get_message_by_frame_id(msg.can_id) msg_name = msg_def.name decoded = msg_def.decode(msg.data) # 타임스탬프를 상대 시간으로 변환 (시작 시간을 0으로) relative_time = (msg.timestamp_us / 1e6) - start_time decoded_data[msg_name]['timestamps'].append(relative_time) # 각 시그널 값 저장 for signal in msg_def.signals: signal_name = signal.name raw_value = decoded[signal_name] decoded_data[msg_name][signal_name].append(raw_value) decode_count += 1 except KeyError: continue except Exception as e: continue print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)} 개") print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}") print(f" - 상대 시간으로 변환: 0.000초 부터 시작") # 빈 메시지 제거 decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0} return decoded_data, start_time def to_mdf(self, decoded_data: Dict[str, Dict[str, List]], output_path: str, compression: int = 0, start_time: float = 0): """ MDF4 파일 생성 Args: start_time: 원본 시작 시간 (메타데이터 저장용) """ mdf = MDF(version='4.10') # 메타데이터에 원본 시작 시간 기록 if start_time > 0: mdf.header.start_time = datetime.fromtimestamp(start_time) print(f" - 원본 시작 시간을 메타데이터에 저장: {mdf.header.start_time}") signal_count = 0 for msg_name, msg_data in decoded_data.items(): timestamps = np.array(msg_data['timestamps']) for signal_name, values in msg_data.items(): if signal_name == 'timestamps': continue # DBC에서 시그널 정보 가져오기 try: msg_def = self.db.get_message_by_name(msg_name) signal_def = None for sig in msg_def.signals: if sig.name == signal_name: signal_def = sig break if signal_def is None: continue # numpy 배열로 변환 samples = np.array(values) # Signal 객체 생성 signal = Signal( samples=samples, timestamps=timestamps, name=f"{msg_name}.{signal_name}", unit=signal_def.unit or '', comment=signal_def.comment or '' ) mdf.append(signal) signal_count += 1 except Exception as e: print(f"⚠️ 시그널 추가 실패 ({msg_name}.{signal_name}): {e}") continue # 파일 저장 compression_map = { 0: 0, # no compression 1: 1, # deflate 2: 2 # transposed + deflate } mdf.save( output_path, compression=compression_map.get(compression, 0), overwrite=True ) print(f"✓ MDF4 파일 생성 완료: {output_path}") print(f" - 시그널 수: {signal_count}") print(f" - 시간 범위: 0.000초 ~ {timestamps[-1]:.3f}초") # 파일 크기 출력 file_size = Path(output_path).stat().st_size file_size_mb = file_size / (1024 * 1024) print(f" - 파일 크기: {file_size_mb:.2f} MB") def convert(self, bin_path: str, output_path: str, compression: int = 0): """bin 파일을 MDF4로 변환 (전체 프로세스)""" print(f"\n{'='*60}") print(f"ESP32 CAN bin → MDF4 변환") print(f"{'='*60}") # 1. bin 파일 읽기 messages = self.read_bin_file(bin_path) if not messages: print("✗ 메시지가 없습니다.") return # 시간 범위 출력 (절대 시간) start_time = messages[0].timestamp_us / 1e6 end_time = messages[-1].timestamp_us / 1e6 duration = end_time - start_time print(f" - 절대 시작 시간: {datetime.fromtimestamp(start_time)}") print(f" - 절대 종료 시간: {datetime.fromtimestamp(end_time)}") print(f" - 기간: {duration:.2f} 초") # 2. 메시지 디코딩 (상대 시간으로 변환) decoded_data, abs_start_time = self.decode_messages(messages) if not decoded_data: print("✗ 디코딩된 데이터가 없습니다.") return # 3. MDF 파일 생성 (상대 시간 사용) self.to_mdf(decoded_data, output_path, compression, abs_start_time) print(f"{'='*60}") print(f"✓ 변환 완료!") print(f"{'='*60}\n") # ============================================================================ # GUI Worker Thread # ============================================================================ class WorkerSignals(QObject): """워커 스레드 시그널""" progress = pyqtSignal(int, int, str) # current, total, message log = pyqtSignal(str) # log message finished = pyqtSignal(int, int) # success, failed error = pyqtSignal(str) # error message class ConversionWorker(QThread): """변환 작업을 수행하는 워커 스레드""" def __init__(self, dbc_path: str, bin_files: List[str], output_dir: str, format_type: str, conversion_type: str, options: dict): super().__init__() self.dbc_path = dbc_path self.bin_files = bin_files self.output_dir = output_dir self.format_type = format_type self.conversion_type = conversion_type self.options = options self.signals = WorkerSignals() self._is_running = True def stop(self): """작업 중단""" self._is_running = False def run(self): """변환 작업 실행""" success_count = 0 failed_count = 0 total = len(self.bin_files) try: # 변환기 초기화 if self.conversion_type == 'csv': converter = CANBinToCSV(self.dbc_path) else: # mdf converter = CANBinToMDF(self.dbc_path) self.signals.log.emit(f"✓ DBC 파일 로드: {self.dbc_path}") # 각 파일 변환 for idx, bin_file in enumerate(self.bin_files): if not self._is_running: self.signals.log.emit("❌ 사용자에 의해 중단됨") break current = idx + 1 self.signals.progress.emit(current, total, f"Converting {Path(bin_file).name}") self.signals.log.emit(f"\n[{current}/{total}] {Path(bin_file).name}") try: # 출력 파일 경로 결정 bin_path = Path(bin_file) if self.conversion_type == 'csv': if self.format_type == 'message': output_path = Path(self.output_dir) / bin_path.stem else: output_path = Path(self.output_dir) / f"{bin_path.stem}.csv" converter.convert( str(bin_path), str(output_path), format=self.format_type, apply_value_table=self.options.get('apply_value_table', True), datetime_format=self.options.get('datetime_format', True) ) else: # mdf output_path = Path(self.output_dir) / f"{bin_path.stem}.mf4" converter.convert( str(bin_path), str(output_path), compression=self.options.get('compression', 0) ) success_count += 1 self.signals.log.emit(f" ✓ 완료: {output_path.name}") except Exception as e: failed_count += 1 self.signals.log.emit(f" ✗ 실패: {str(e)}") self.signals.finished.emit(success_count, failed_count) except Exception as e: self.signals.error.emit(str(e)) # ============================================================================ # GUI Main Window # ============================================================================ class CANConverterGUI(QMainWindow): """CAN Bin to CSV/MDF 변환 GUI""" def __init__(self): super().__init__() self.dbc_path = None self.bin_files = [] self.output_dir = None self.worker = None self.init_ui() def init_ui(self): """UI 초기화""" self.setWindowTitle("ESP32 CAN Logger Converter v1.0") self.setGeometry(100, 100, 1000, 700) # 메인 위젯 main_widget = QWidget() self.setCentralWidget(main_widget) main_layout = QVBoxLayout(main_widget) # 탭 위젯 tab_widget = QTabWidget() main_layout.addWidget(tab_widget) # CSV 변환 탭 csv_tab = self.create_csv_tab() tab_widget.addTab(csv_tab, "CSV 변환") # MDF 변환 탭 mdf_tab = self.create_mdf_tab() tab_widget.addTab(mdf_tab, "MDF 변환") # 하단 로그/진행 영역 bottom_widget = self.create_bottom_widget() main_layout.addWidget(bottom_widget) # 스타일 적용 self.apply_style() def create_csv_tab(self) -> QWidget: """CSV 변환 탭 생성""" tab = QWidget() layout = QVBoxLayout(tab) # 파일 선택 영역 file_group = self.create_file_selection_group() layout.addWidget(file_group) # CSV 형식 선택 format_group = QGroupBox("CSV 형식 선택") format_layout = QVBoxLayout() self.csv_format_group = QButtonGroup() formats = [ ("wide", "Wide Format (Excel-friendly)", "모든 시그널이 컬럼으로 나열 - Excel 분석에 최적"), ("long", "Long Format (Database-friendly)", "각 시그널 값이 별도 행 - 데이터베이스 저장에 적합"), ("message", "Message Format (Separate files)", "메시지별 개별 CSV 파일 생성 - 메시지별 분석"), ("raw", "Raw Format (No decoding)", "원본 CAN 프레임 - 디코딩 없음") ] self.csv_format_radios = {} for idx, (value, title, desc) in enumerate(formats): radio = QRadioButton(title) if idx == 0: radio.setChecked(True) self.csv_format_group.addButton(radio, idx) self.csv_format_radios[value] = radio format_layout.addWidget(radio) desc_label = QLabel(f" → {desc}") desc_label.setStyleSheet("color: gray; font-size: 10px;") format_layout.addWidget(desc_label) format_group.setLayout(format_layout) layout.addWidget(format_group) # CSV 옵션 option_group = QGroupBox("옵션") option_layout = QVBoxLayout() self.csv_value_table_check = QCheckBox("Value Table 적용 (숫자 → 텍스트 변환)") self.csv_value_table_check.setChecked(True) option_layout.addWidget(self.csv_value_table_check) self.csv_datetime_check = QCheckBox("날짜/시간 컬럼 포함") self.csv_datetime_check.setChecked(True) option_layout.addWidget(self.csv_datetime_check) option_group.setLayout(option_layout) layout.addWidget(option_group) # 변환 버튼 self.csv_convert_btn = QPushButton("CSV 변환 시작") self.csv_convert_btn.setMinimumHeight(50) self.csv_convert_btn.clicked.connect(lambda: self.start_conversion('csv')) layout.addWidget(self.csv_convert_btn) layout.addStretch() return tab def create_mdf_tab(self) -> QWidget: """MDF 변환 탭 생성""" tab = QWidget() layout = QVBoxLayout(tab) # 파일 선택 영역 (공유) info_label = QLabel("※ 파일 선택은 상단 '파일 선택' 영역을 사용하세요") info_label.setStyleSheet("color: blue; font-weight: bold;") layout.addWidget(info_label) # 압축 방식 선택 compression_group = QGroupBox("압축 방식 선택") compression_layout = QVBoxLayout() self.mdf_compression_group = QButtonGroup() compressions = [ (0, "압축 없음 (No compression)", "읽기 속도: ★★★★★ | 파일 크기: 171MB | 즉시 분석용"), (1, "Deflate 압축 (Balanced)", "읽기 속도: ★★★☆☆ | 파일 크기: 3.6MB (98% 압축) | 일반 사용"), (2, "Transposed 압축 (Maximum)", "읽기 속도: ★☆☆☆☆ | 파일 크기: 1.2MB (99% 압축) | 장기 보관용") ] self.mdf_compression_radios = {} for idx, (value, title, desc) in enumerate(compressions): radio = QRadioButton(title) if idx == 0: radio.setChecked(True) self.mdf_compression_group.addButton(radio, value) self.mdf_compression_radios[value] = radio compression_layout.addWidget(radio) desc_label = QLabel(f" → {desc}") desc_label.setStyleSheet("color: gray; font-size: 10px;") compression_layout.addWidget(desc_label) compression_group.setLayout(compression_layout) layout.addWidget(compression_group) # MDF 정보 info_group = QGroupBox("MDF4 형식 정보") info_layout = QVBoxLayout() info_text = QLabel( "• MDF4는 자동차 업계 표준 측정 데이터 형식입니다\n" "• CANoe, CANalyzer, Vector Tools에서 직접 열 수 있습니다\n" "• 압축률이 매우 높아 저장 공간을 절약할 수 있습니다\n" "• 모든 시그널 정보, 단위, 설명이 포함됩니다" ) info_layout.addWidget(info_text) info_group.setLayout(info_layout) layout.addWidget(info_group) # 변환 버튼 self.mdf_convert_btn = QPushButton("MDF 변환 시작") self.mdf_convert_btn.setMinimumHeight(50) self.mdf_convert_btn.clicked.connect(lambda: self.start_conversion('mdf')) layout.addWidget(self.mdf_convert_btn) layout.addStretch() return tab def create_file_selection_group(self) -> QGroupBox: """파일 선택 영역 생성""" group = QGroupBox("파일 선택") layout = QVBoxLayout() # DBC 파일 dbc_layout = QHBoxLayout() dbc_layout.addWidget(QLabel("DBC 파일:")) self.dbc_label = QLabel("선택되지 않음") self.dbc_label.setStyleSheet("color: red;") dbc_layout.addWidget(self.dbc_label, 1) dbc_btn = QPushButton("찾아보기") dbc_btn.clicked.connect(self.select_dbc) dbc_layout.addWidget(dbc_btn) auto_dbc_btn = QPushButton("자동 검색") auto_dbc_btn.clicked.connect(self.auto_find_dbc) dbc_layout.addWidget(auto_dbc_btn) layout.addLayout(dbc_layout) # BIN 파일 목록 bin_layout = QVBoxLayout() bin_label_layout = QHBoxLayout() bin_label_layout.addWidget(QLabel("BIN 파일:")) self.bin_count_label = QLabel("0개 선택됨") bin_label_layout.addWidget(self.bin_count_label) bin_label_layout.addStretch() bin_layout.addLayout(bin_label_layout) self.bin_list = QListWidget() self.bin_list.setMaximumHeight(100) bin_layout.addWidget(self.bin_list) bin_btn_layout = QHBoxLayout() add_bin_btn = QPushButton("BIN 파일 추가") add_bin_btn.clicked.connect(self.select_bin_files) bin_btn_layout.addWidget(add_bin_btn) auto_bin_btn = QPushButton("자동 검색") auto_bin_btn.clicked.connect(self.auto_find_bin_files) bin_btn_layout.addWidget(auto_bin_btn) clear_bin_btn = QPushButton("목록 지우기") clear_bin_btn.clicked.connect(self.clear_bin_files) bin_btn_layout.addWidget(clear_bin_btn) bin_layout.addLayout(bin_btn_layout) layout.addLayout(bin_layout) # 출력 디렉토리 output_layout = QHBoxLayout() output_layout.addWidget(QLabel("출력 폴더:")) self.output_label = QLabel("BIN 파일과 같은 폴더") output_layout.addWidget(self.output_label, 1) output_btn = QPushButton("변경") output_btn.clicked.connect(self.select_output_dir) output_layout.addWidget(output_btn) layout.addLayout(output_layout) group.setLayout(layout) return group def create_bottom_widget(self) -> QWidget: """하단 로그/진행 영역 생성""" widget = QWidget() layout = QVBoxLayout(widget) # 진행률 progress_layout = QHBoxLayout() self.progress_label = QLabel("대기 중...") progress_layout.addWidget(self.progress_label) self.progress_bar = QProgressBar() progress_layout.addWidget(self.progress_bar, 1) self.stop_btn = QPushButton("중단") self.stop_btn.setEnabled(False) self.stop_btn.clicked.connect(self.stop_conversion) progress_layout.addWidget(self.stop_btn) layout.addLayout(progress_layout) # 로그 log_label = QLabel("변환 로그:") layout.addWidget(log_label) self.log_text = QTextEdit() self.log_text.setReadOnly(True) self.log_text.setMaximumHeight(200) layout.addWidget(self.log_text) return widget def apply_style(self): """스타일 적용""" self.setStyleSheet(""" QMainWindow { background-color: #f5f5f5; } QGroupBox { font-weight: bold; border: 2px solid #cccccc; border-radius: 5px; margin-top: 10px; padding-top: 10px; } QGroupBox::title { subcontrol-origin: margin; left: 10px; padding: 0 5px; } QPushButton { background-color: #4CAF50; color: white; border: none; padding: 8px; border-radius: 4px; font-weight: bold; } QPushButton:hover { background-color: #45a049; } QPushButton:pressed { background-color: #3d8b40; } QPushButton:disabled { background-color: #cccccc; color: #666666; } QRadioButton { font-weight: normal; } QRadioButton::indicator { width: 15px; height: 15px; } """) def select_dbc(self): """DBC 파일 선택""" file_path, _ = QFileDialog.getOpenFileName( self, "DBC 파일 선택", "", "DBC Files (*.dbc);;All Files (*)" ) if file_path: self.dbc_path = file_path self.dbc_label.setText(Path(file_path).name) self.dbc_label.setStyleSheet("color: green;") self.log(f"✓ DBC 파일 선택: {Path(file_path).name}") def auto_find_dbc(self): """현재 디렉토리에서 DBC 파일 자동 검색""" dbc_files = list(Path.cwd().glob("*.dbc")) if dbc_files: self.dbc_path = str(dbc_files[0]) self.dbc_label.setText(dbc_files[0].name) self.dbc_label.setStyleSheet("color: green;") self.log(f"✓ DBC 파일 자동 검색: {dbc_files[0].name}") else: QMessageBox.warning(self, "경고", "현재 폴더에서 DBC 파일을 찾을 수 없습니다.") def select_bin_files(self): """BIN 파일 선택""" file_paths, _ = QFileDialog.getOpenFileNames( self, "BIN 파일 선택", "", "BIN Files (*.bin);;All Files (*)" ) if file_paths: for file_path in file_paths: if file_path not in self.bin_files: self.bin_files.append(file_path) self.bin_list.addItem(Path(file_path).name) self.update_bin_count() self.log(f"✓ BIN 파일 {len(file_paths)}개 추가") def auto_find_bin_files(self): """현재 디렉토리에서 BIN 파일 자동 검색""" bin_files = list(Path.cwd().glob("*.bin")) if bin_files: added = 0 for bin_file in bin_files: file_path = str(bin_file) if file_path not in self.bin_files: self.bin_files.append(file_path) self.bin_list.addItem(bin_file.name) added += 1 self.update_bin_count() self.log(f"✓ BIN 파일 {added}개 자동 검색") else: QMessageBox.warning(self, "경고", "현재 폴더에서 BIN 파일을 찾을 수 없습니다.") def clear_bin_files(self): """BIN 파일 목록 지우기""" self.bin_files.clear() self.bin_list.clear() self.update_bin_count() self.log("목록 초기화") def update_bin_count(self): """BIN 파일 개수 업데이트""" count = len(self.bin_files) self.bin_count_label.setText(f"{count}개 선택됨") if count > 0: self.bin_count_label.setStyleSheet("color: green; font-weight: bold;") else: self.bin_count_label.setStyleSheet("color: red;") def select_output_dir(self): """출력 디렉토리 선택""" dir_path = QFileDialog.getExistingDirectory(self, "출력 폴더 선택") if dir_path: self.output_dir = dir_path self.output_label.setText(dir_path) self.log(f"✓ 출력 폴더: {dir_path}") def start_conversion(self, conversion_type: str): """변환 시작""" # 검증 if not self.dbc_path: QMessageBox.warning(self, "경고", "DBC 파일을 선택하세요.") return if not self.bin_files: QMessageBox.warning(self, "경고", "BIN 파일을 선택하세요.") return # 출력 디렉토리 결정 output_dir = self.output_dir if self.output_dir else str(Path(self.bin_files[0]).parent) # 옵션 수집 if conversion_type == 'csv': # CSV 형식 결정 format_type = 'wide' for fmt, radio in self.csv_format_radios.items(): if radio.isChecked(): format_type = fmt break options = { 'apply_value_table': self.csv_value_table_check.isChecked(), 'datetime_format': self.csv_datetime_check.isChecked() } else: # mdf format_type = 'mdf' # 압축 방식 결정 compression = 0 for comp, radio in self.mdf_compression_radios.items(): if radio.isChecked(): compression = comp break options = { 'compression': compression } # 로그 초기화 self.log_text.clear() self.log("="*60) self.log(f"{conversion_type.upper()} 변환 시작") self.log("="*60) self.log(f"DBC: {Path(self.dbc_path).name}") self.log(f"BIN 파일: {len(self.bin_files)}개") self.log(f"출력: {output_dir}") if conversion_type == 'csv': self.log(f"형식: {format_type}") else: self.log(f"압축: {compression}") self.log("") # UI 상태 변경 self.csv_convert_btn.setEnabled(False) self.mdf_convert_btn.setEnabled(False) self.stop_btn.setEnabled(True) self.progress_bar.setValue(0) # 워커 스레드 시작 self.worker = ConversionWorker( self.dbc_path, self.bin_files, output_dir, format_type, conversion_type, options ) self.worker.signals.progress.connect(self.on_progress) self.worker.signals.log.connect(self.log) self.worker.signals.finished.connect(self.on_finished) self.worker.signals.error.connect(self.on_error) self.worker.start() def stop_conversion(self): """변환 중단""" if self.worker: self.worker.stop() self.log("중단 요청...") def on_progress(self, current: int, total: int, message: str): """진행 상황 업데이트""" progress = int((current / total) * 100) self.progress_bar.setValue(progress) self.progress_label.setText(f"[{current}/{total}] {message}") def on_finished(self, success: int, failed: int): """변환 완료""" total = success + failed self.log("") self.log("="*60) self.log("변환 완료!") self.log("="*60) self.log(f"전체: {total}") self.log(f"성공: {success}") self.log(f"실패: {failed}") self.log("="*60) # UI 상태 복원 self.csv_convert_btn.setEnabled(True) self.mdf_convert_btn.setEnabled(True) self.stop_btn.setEnabled(False) self.progress_bar.setValue(100) self.progress_label.setText("완료") # 완료 메시지 QMessageBox.information( self, "완료", f"변환이 완료되었습니다.\n\n성공: {success}\n실패: {failed}" ) def on_error(self, error_msg: str): """에러 발생""" self.log(f"❌ 오류: {error_msg}") # UI 상태 복원 self.csv_convert_btn.setEnabled(True) self.mdf_convert_btn.setEnabled(True) self.stop_btn.setEnabled(False) QMessageBox.critical(self, "오류", f"변환 중 오류가 발생했습니다:\n{error_msg}") def log(self, message: str): """로그 메시지 추가""" self.log_text.append(message) # 스크롤을 맨 아래로 cursor = self.log_text.textCursor() cursor.movePosition(QTextCursor.End) self.log_text.setTextCursor(cursor) # ============================================================================ # Main Entry Point # ============================================================================ def main(): app = QApplication(sys.argv) # 어플리케이션 정보 app.setApplicationName("ESP32 CAN Logger Converter") app.setOrganizationName("CAN Tools") # 메인 윈도우 window = CANConverterGUI() window.show() sys.exit(app.exec_()) if __name__ == "__main__": main()