mdf변환시 에러 제거(multiplexed 시그널)2

This commit is contained in:
2026-01-16 08:05:13 +00:00
parent 576836af63
commit a0ba41cd6b

View File

@@ -103,12 +103,16 @@ class CANBinToCSV:
# 시작 시간 계산
start_time = messages[0].timestamp_us / 1e6 if messages else 0
# 각 메시지별로 초기화
# 각 메시지별로 초기화 - 빈 리스트로 시작
for msg_def in self.db.messages:
msg_name = msg_def.name
decoded_data[msg_name] = {'timestamps': []}
decoded_data[msg_name] = {}
for signal in msg_def.signals:
decoded_data[msg_name][signal.name] = []
# timestamps와 values를 함께 저장하는 구조로 변경
decoded_data[msg_name][signal.name] = {
'timestamps': [],
'values': []
}
decode_count = 0
for msg in messages:
@@ -121,20 +125,16 @@ class CANBinToCSV:
# 상대 시간으로 변환
relative_time = (msg.timestamp_us / 1e6) - start_time
decoded_data[msg_name]['timestamps'].append(relative_time)
# 각 시그널 값 저장 (디코딩된 시그널만 저장)
for signal in msg_def.signals:
signal_name = signal.name
# decoded에 존재하는 시그널만 처리
# (multiplexed 시그널은 조건에 맞을 때만 decoded에 포함됨)
if signal_name in decoded:
raw_value = decoded[signal_name]
decoded_data[msg_name][signal_name].append(raw_value)
else:
# 디코딩되지 않은 시그널은 None 추가
decoded_data[msg_name][signal_name].append(None)
decoded_data[msg_name][signal_name]['timestamps'].append(relative_time)
decoded_data[msg_name][signal_name]['values'].append(raw_value)
decode_count += 1
@@ -147,10 +147,28 @@ class CANBinToCSV:
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 상대 시간으로 변환: 0.000초 부터 시작")
# 빈 메시지 제거
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
# 빈 시그널 제거 및 통계 출력
cleaned_data = {}
for msg_name, signals in decoded_data.items():
has_data = False
cleaned_signals = {}
for sig_name, sig_data in signals.items():
if len(sig_data['values']) > 0:
cleaned_signals[sig_name] = sig_data
has_data = True
if has_data:
cleaned_data[msg_name] = cleaned_signals
# Multiplexed 시그널 통계 출력
if msg_name == 'EMS2':
print(f"\n [EMS2 Multiplexed Signals]")
for sig_name, sig_data in cleaned_signals.items():
print(f" - {sig_name}: {len(sig_data['values'])} samples")
return decoded_data, start_time
return cleaned_data, start_time
def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]],
@@ -413,17 +431,12 @@ class CANBinToMDF:
Returns:
(decoded_data, start_time): 디코딩된 데이터와 시작 시간
"""
decoded_data = {}
# 시작 시간 계산 (첫 번째 메시지의 타임스탬프)
# 시작 시간 계산
start_time = messages[0].timestamp_us / 1e6 if messages else 0
# 각 메시지별로 초기화
for msg_def in self.db.messages:
msg_name = msg_def.name
decoded_data[msg_name] = {'timestamps': []}
for signal in msg_def.signals:
decoded_data[msg_name][signal.name] = []
# 각 시그널별로 timestamps와 values를 별도로 저장
# 구조: {msg_name: {sig_name: {'timestamps': [], 'values': []}}}
decoded_data = {}
decode_count = 0
for msg in messages:
@@ -431,17 +444,26 @@ class CANBinToMDF:
msg_def = self.db.get_message_by_frame_id(msg.can_id)
msg_name = msg_def.name
# 메시지 디코딩
decoded = msg_def.decode(msg.data)
# 타임스탬프를 상대 시간으로 변환 (시작 시간을 0으로)
# 상대 시간으로 변환
relative_time = (msg.timestamp_us / 1e6) - start_time
decoded_data[msg_name]['timestamps'].append(relative_time)
# 각 시그널 값 저장
for signal in msg_def.signals:
signal_name = signal.name
raw_value = decoded[signal_name]
decoded_data[msg_name][signal_name].append(raw_value)
# 메시지가 처음 등장하면 초기화
if msg_name not in decoded_data:
decoded_data[msg_name] = {}
# 디코딩된 시그널만 저장
for signal_name, value in decoded.items():
if signal_name not in decoded_data[msg_name]:
decoded_data[msg_name][signal_name] = {
'timestamps': [],
'values': []
}
decoded_data[msg_name][signal_name]['timestamps'].append(relative_time)
decoded_data[msg_name][signal_name]['values'].append(value)
decode_count += 1
@@ -454,12 +476,17 @@ class CANBinToMDF:
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 상대 시간으로 변환: 0.000초 부터 시작")
# 빈 메시지 제거
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
# 통계 출력 (EMS2 multiplexed 시그널 확인)
if 'EMS2' in decoded_data:
print(f"\n [EMS2 Multiplexed Signals]")
for sig_name in ['CAN_VERS', 'CONF_TCU', 'OBD_FRF_ACK', 'TQ_STND']:
if sig_name in decoded_data['EMS2']:
count = len(decoded_data['EMS2'][sig_name]['values'])
print(f" - {sig_name}: {count} samples")
return decoded_data, start_time
def to_mdf(self, decoded_data: Dict[str, Dict[str, List]],
def to_mdf(self, decoded_data: Dict[str, Dict[str, dict]],
output_path: str,
compression: int = 0,
start_time: float = 0):
@@ -471,14 +498,12 @@ class CANBinToMDF:
print(f" - 원본 시작 시간을 메타데이터에 저장: {mdf.header.start_time}")
signal_count = 0
for msg_name, msg_data in decoded_data.items():
all_timestamps = np.array(msg_data['timestamps'])
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
total_time_max = 0
for msg_name, signals_dict in decoded_data.items():
for signal_name, sig_data in signals_dict.items():
try:
# DBC에서 시그널 정보 가져오기
msg_def = self.db.get_message_by_name(msg_name)
signal_def = None
for sig in msg_def.signals:
@@ -489,20 +514,26 @@ class CANBinToMDF:
if signal_def is None:
continue
# None 값 필터링
valid_indices = [i for i, v in enumerate(values) if v is not None]
# sig_data는 dict: {'timestamps': [...], 'values': [...]}
timestamps_list = sig_data['timestamps']
values_list = sig_data['values']
if len(valid_indices) == 0:
continue # 모든 값이 None이면 스킵
# 데이터가 있는지 확인
if len(values_list) == 0:
continue
# 유효한 값과 타임스탬프만 추출
valid_samples = np.array([values[i] for i in valid_indices])
valid_timestamps = all_timestamps[valid_indices]
# numpy 배열로 변환
samples = np.array(values_list)
timestamps = np.array(timestamps_list)
# 최대 시간 추적
if len(timestamps) > 0:
total_time_max = max(total_time_max, timestamps[-1])
# Signal 객체 생성
signal = Signal(
samples=valid_samples,
timestamps=valid_timestamps,
samples=samples,
timestamps=timestamps,
name=f"{msg_name}.{signal_name}",
unit=signal_def.unit or '',
comment=signal_def.comment or ''
@@ -526,13 +557,12 @@ class CANBinToMDF:
print(f"✓ MDF4 파일 생성 완료: {output_path}")
print(f" - 시그널 수: {signal_count}")
if len(all_timestamps) > 0:
print(f" - 시간 범위: 0.000초 ~ {all_timestamps[-1]:.3f}")
if total_time_max > 0:
print(f" - 시간 범위: 0.000초 ~ {total_time_max:.3f}")
file_size = Path(output_path).stat().st_size
file_size_mb = file_size / (1024 * 1024)
print(f" - 파일 크기: {file_size_mb:.2f} MB")
def convert(self, bin_path: str, output_path: str, compression: int = 0):
"""bin 파일을 MDF4로 변환 (전체 프로세스)"""
@@ -563,7 +593,7 @@ class CANBinToMDF:
print("✗ 디코딩된 데이터가 없습니다.")
return
# 3. MDF 파일 생성 (상대 시간 사용)
# 3. MDF 파일 생성
self.to_mdf(decoded_data, output_path, compression, abs_start_time)
print(f"{'='*60}")