diff --git a/can_converter_gui.py b/can_converter_gui.py index c864c78..dd7fc10 100644 --- a/can_converter_gui.py +++ b/can_converter_gui.py @@ -103,12 +103,16 @@ class CANBinToCSV: # 시작 시간 계산 start_time = messages[0].timestamp_us / 1e6 if messages else 0 - # 각 메시지별로 초기화 + # 각 메시지별로 초기화 - 빈 리스트로 시작 for msg_def in self.db.messages: msg_name = msg_def.name - decoded_data[msg_name] = {'timestamps': []} + decoded_data[msg_name] = {} for signal in msg_def.signals: - decoded_data[msg_name][signal.name] = [] + # timestamps와 values를 함께 저장하는 구조로 변경 + decoded_data[msg_name][signal.name] = { + 'timestamps': [], + 'values': [] + } decode_count = 0 for msg in messages: @@ -121,20 +125,16 @@ class CANBinToCSV: # 상대 시간으로 변환 relative_time = (msg.timestamp_us / 1e6) - start_time - decoded_data[msg_name]['timestamps'].append(relative_time) # 각 시그널 값 저장 (디코딩된 시그널만 저장) for signal in msg_def.signals: signal_name = signal.name # decoded에 존재하는 시그널만 처리 - # (multiplexed 시그널은 조건에 맞을 때만 decoded에 포함됨) if signal_name in decoded: raw_value = decoded[signal_name] - decoded_data[msg_name][signal_name].append(raw_value) - else: - # 디코딩되지 않은 시그널은 None 추가 - decoded_data[msg_name][signal_name].append(None) + decoded_data[msg_name][signal_name]['timestamps'].append(relative_time) + decoded_data[msg_name][signal_name]['values'].append(raw_value) decode_count += 1 @@ -147,10 +147,28 @@ class CANBinToCSV: print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}") print(f" - 상대 시간으로 변환: 0.000초 부터 시작") - # 빈 메시지 제거 - decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0} + # 빈 시그널 제거 및 통계 출력 + cleaned_data = {} + for msg_name, signals in decoded_data.items(): + has_data = False + cleaned_signals = {} + + for sig_name, sig_data in signals.items(): + if len(sig_data['values']) > 0: + cleaned_signals[sig_name] = sig_data + has_data = True + + if has_data: + cleaned_data[msg_name] = cleaned_signals + + # Multiplexed 시그널 통계 출력 + if msg_name == 'EMS2': + print(f"\n [EMS2 Multiplexed Signals]") + for sig_name, sig_data in cleaned_signals.items(): + print(f" - {sig_name}: {len(sig_data['values'])} samples") - return decoded_data, start_time + return cleaned_data, start_time + def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]], @@ -413,17 +431,12 @@ class CANBinToMDF: Returns: (decoded_data, start_time): 디코딩된 데이터와 시작 시간 """ - decoded_data = {} - - # 시작 시간 계산 (첫 번째 메시지의 타임스탬프) + # 시작 시간 계산 start_time = messages[0].timestamp_us / 1e6 if messages else 0 - # 각 메시지별로 초기화 - for msg_def in self.db.messages: - msg_name = msg_def.name - decoded_data[msg_name] = {'timestamps': []} - for signal in msg_def.signals: - decoded_data[msg_name][signal.name] = [] + # 각 시그널별로 timestamps와 values를 별도로 저장 + # 구조: {msg_name: {sig_name: {'timestamps': [], 'values': []}}} + decoded_data = {} decode_count = 0 for msg in messages: @@ -431,17 +444,26 @@ class CANBinToMDF: msg_def = self.db.get_message_by_frame_id(msg.can_id) msg_name = msg_def.name + # 메시지 디코딩 decoded = msg_def.decode(msg.data) - # 타임스탬프를 상대 시간으로 변환 (시작 시간을 0으로) + # 상대 시간으로 변환 relative_time = (msg.timestamp_us / 1e6) - start_time - decoded_data[msg_name]['timestamps'].append(relative_time) - # 각 시그널 값 저장 - for signal in msg_def.signals: - signal_name = signal.name - raw_value = decoded[signal_name] - decoded_data[msg_name][signal_name].append(raw_value) + # 메시지가 처음 등장하면 초기화 + if msg_name not in decoded_data: + decoded_data[msg_name] = {} + + # 디코딩된 시그널만 저장 + for signal_name, value in decoded.items(): + if signal_name not in decoded_data[msg_name]: + decoded_data[msg_name][signal_name] = { + 'timestamps': [], + 'values': [] + } + + decoded_data[msg_name][signal_name]['timestamps'].append(relative_time) + decoded_data[msg_name][signal_name]['values'].append(value) decode_count += 1 @@ -454,12 +476,17 @@ class CANBinToMDF: print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}") print(f" - 상대 시간으로 변환: 0.000초 부터 시작") - # 빈 메시지 제거 - decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0} + # 통계 출력 (EMS2 multiplexed 시그널 확인) + if 'EMS2' in decoded_data: + print(f"\n [EMS2 Multiplexed Signals]") + for sig_name in ['CAN_VERS', 'CONF_TCU', 'OBD_FRF_ACK', 'TQ_STND']: + if sig_name in decoded_data['EMS2']: + count = len(decoded_data['EMS2'][sig_name]['values']) + print(f" - {sig_name}: {count} samples") return decoded_data, start_time - def to_mdf(self, decoded_data: Dict[str, Dict[str, List]], + def to_mdf(self, decoded_data: Dict[str, Dict[str, dict]], output_path: str, compression: int = 0, start_time: float = 0): @@ -471,14 +498,12 @@ class CANBinToMDF: print(f" - 원본 시작 시간을 메타데이터에 저장: {mdf.header.start_time}") signal_count = 0 - for msg_name, msg_data in decoded_data.items(): - all_timestamps = np.array(msg_data['timestamps']) - - for signal_name, values in msg_data.items(): - if signal_name == 'timestamps': - continue - + total_time_max = 0 + + for msg_name, signals_dict in decoded_data.items(): + for signal_name, sig_data in signals_dict.items(): try: + # DBC에서 시그널 정보 가져오기 msg_def = self.db.get_message_by_name(msg_name) signal_def = None for sig in msg_def.signals: @@ -489,20 +514,26 @@ class CANBinToMDF: if signal_def is None: continue - # None 값 필터링 - valid_indices = [i for i, v in enumerate(values) if v is not None] + # sig_data는 dict: {'timestamps': [...], 'values': [...]} + timestamps_list = sig_data['timestamps'] + values_list = sig_data['values'] - if len(valid_indices) == 0: - continue # 모든 값이 None이면 스킵 + # 데이터가 있는지 확인 + if len(values_list) == 0: + continue - # 유효한 값과 타임스탬프만 추출 - valid_samples = np.array([values[i] for i in valid_indices]) - valid_timestamps = all_timestamps[valid_indices] + # numpy 배열로 변환 + samples = np.array(values_list) + timestamps = np.array(timestamps_list) + + # 최대 시간 추적 + if len(timestamps) > 0: + total_time_max = max(total_time_max, timestamps[-1]) # Signal 객체 생성 signal = Signal( - samples=valid_samples, - timestamps=valid_timestamps, + samples=samples, + timestamps=timestamps, name=f"{msg_name}.{signal_name}", unit=signal_def.unit or '', comment=signal_def.comment or '' @@ -526,13 +557,12 @@ class CANBinToMDF: print(f"✓ MDF4 파일 생성 완료: {output_path}") print(f" - 시그널 수: {signal_count}") - if len(all_timestamps) > 0: - print(f" - 시간 범위: 0.000초 ~ {all_timestamps[-1]:.3f}초") + if total_time_max > 0: + print(f" - 시간 범위: 0.000초 ~ {total_time_max:.3f}초") file_size = Path(output_path).stat().st_size file_size_mb = file_size / (1024 * 1024) print(f" - 파일 크기: {file_size_mb:.2f} MB") - def convert(self, bin_path: str, output_path: str, compression: int = 0): """bin 파일을 MDF4로 변환 (전체 프로세스)""" @@ -563,7 +593,7 @@ class CANBinToMDF: print("✗ 디코딩된 데이터가 없습니다.") return - # 3. MDF 파일 생성 (상대 시간 사용) + # 3. MDF 파일 생성 self.to_mdf(decoded_data, output_path, compression, abs_start_time) print(f"{'='*60}")