mdf변환시 에러 제거(multiplexed 시그널)
This commit is contained in:
@@ -92,11 +92,17 @@ class CANBinToCSV:
|
|||||||
|
|
||||||
return messages
|
return messages
|
||||||
|
|
||||||
def decode_messages(self, messages: List[CANMessage],
|
def decode_messages(self, messages: List[CANMessage]) -> tuple:
|
||||||
apply_value_table: bool = True) -> Dict[str, Dict[str, List]]:
|
"""
|
||||||
"""CAN 메시지를 DBC를 사용하여 디코딩"""
|
CAN 메시지를 DBC를 사용하여 디코딩
|
||||||
|
Returns:
|
||||||
|
(decoded_data, start_time): 디코딩된 데이터와 시작 시간
|
||||||
|
"""
|
||||||
decoded_data = {}
|
decoded_data = {}
|
||||||
|
|
||||||
|
# 시작 시간 계산
|
||||||
|
start_time = messages[0].timestamp_us / 1e6 if messages else 0
|
||||||
|
|
||||||
# 각 메시지별로 초기화
|
# 각 메시지별로 초기화
|
||||||
for msg_def in self.db.messages:
|
for msg_def in self.db.messages:
|
||||||
msg_name = msg_def.name
|
msg_name = msg_def.name
|
||||||
@@ -110,30 +116,25 @@ class CANBinToCSV:
|
|||||||
msg_def = self.db.get_message_by_frame_id(msg.can_id)
|
msg_def = self.db.get_message_by_frame_id(msg.can_id)
|
||||||
msg_name = msg_def.name
|
msg_name = msg_def.name
|
||||||
|
|
||||||
|
# 메시지 디코딩 (decode는 multiplexer를 자동으로 처리)
|
||||||
decoded = msg_def.decode(msg.data)
|
decoded = msg_def.decode(msg.data)
|
||||||
|
|
||||||
# 타임스탬프 저장 (마이크로초 -> 초)
|
# 상대 시간으로 변환
|
||||||
decoded_data[msg_name]['timestamps'].append(msg.timestamp_us / 1e6)
|
relative_time = (msg.timestamp_us / 1e6) - start_time
|
||||||
|
decoded_data[msg_name]['timestamps'].append(relative_time)
|
||||||
|
|
||||||
# 각 시그널 값 저장
|
# 각 시그널 값 저장 (디코딩된 시그널만 저장)
|
||||||
for signal in msg_def.signals:
|
for signal in msg_def.signals:
|
||||||
signal_name = signal.name
|
signal_name = signal.name
|
||||||
|
|
||||||
|
# decoded에 존재하는 시그널만 처리
|
||||||
|
# (multiplexed 시그널은 조건에 맞을 때만 decoded에 포함됨)
|
||||||
|
if signal_name in decoded:
|
||||||
raw_value = decoded[signal_name]
|
raw_value = decoded[signal_name]
|
||||||
|
decoded_data[msg_name][signal_name].append(raw_value)
|
||||||
# Value Table 적용
|
|
||||||
if apply_value_table and signal.choices:
|
|
||||||
try:
|
|
||||||
int_value = int(raw_value)
|
|
||||||
if int_value in signal.choices:
|
|
||||||
value = signal.choices[int_value]
|
|
||||||
else:
|
else:
|
||||||
value = raw_value
|
# 디코딩되지 않은 시그널은 None 추가
|
||||||
except (ValueError, TypeError):
|
decoded_data[msg_name][signal_name].append(None)
|
||||||
value = raw_value
|
|
||||||
else:
|
|
||||||
value = raw_value
|
|
||||||
|
|
||||||
decoded_data[msg_name][signal_name].append(value)
|
|
||||||
|
|
||||||
decode_count += 1
|
decode_count += 1
|
||||||
|
|
||||||
@@ -143,11 +144,14 @@ class CANBinToCSV:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)} 개")
|
print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)} 개")
|
||||||
|
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
|
||||||
|
print(f" - 상대 시간으로 변환: 0.000초 부터 시작")
|
||||||
|
|
||||||
# 빈 메시지 제거
|
# 빈 메시지 제거
|
||||||
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
|
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
|
||||||
|
|
||||||
return decoded_data
|
return decoded_data, start_time
|
||||||
|
|
||||||
|
|
||||||
def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]],
|
def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]],
|
||||||
output_path: str,
|
output_path: str,
|
||||||
@@ -459,27 +463,21 @@ class CANBinToMDF:
|
|||||||
output_path: str,
|
output_path: str,
|
||||||
compression: int = 0,
|
compression: int = 0,
|
||||||
start_time: float = 0):
|
start_time: float = 0):
|
||||||
"""
|
"""MDF4 파일 생성"""
|
||||||
MDF4 파일 생성
|
|
||||||
Args:
|
|
||||||
start_time: 원본 시작 시간 (메타데이터 저장용)
|
|
||||||
"""
|
|
||||||
mdf = MDF(version='4.10')
|
mdf = MDF(version='4.10')
|
||||||
|
|
||||||
# 메타데이터에 원본 시작 시간 기록
|
|
||||||
if start_time > 0:
|
if start_time > 0:
|
||||||
mdf.header.start_time = datetime.fromtimestamp(start_time)
|
mdf.header.start_time = datetime.fromtimestamp(start_time)
|
||||||
print(f" - 원본 시작 시간을 메타데이터에 저장: {mdf.header.start_time}")
|
print(f" - 원본 시작 시간을 메타데이터에 저장: {mdf.header.start_time}")
|
||||||
|
|
||||||
signal_count = 0
|
signal_count = 0
|
||||||
for msg_name, msg_data in decoded_data.items():
|
for msg_name, msg_data in decoded_data.items():
|
||||||
timestamps = np.array(msg_data['timestamps'])
|
all_timestamps = np.array(msg_data['timestamps'])
|
||||||
|
|
||||||
for signal_name, values in msg_data.items():
|
for signal_name, values in msg_data.items():
|
||||||
if signal_name == 'timestamps':
|
if signal_name == 'timestamps':
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# DBC에서 시그널 정보 가져오기
|
|
||||||
try:
|
try:
|
||||||
msg_def = self.db.get_message_by_name(msg_name)
|
msg_def = self.db.get_message_by_name(msg_name)
|
||||||
signal_def = None
|
signal_def = None
|
||||||
@@ -491,13 +489,20 @@ class CANBinToMDF:
|
|||||||
if signal_def is None:
|
if signal_def is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# numpy 배열로 변환
|
# None 값 필터링
|
||||||
samples = np.array(values)
|
valid_indices = [i for i, v in enumerate(values) if v is not None]
|
||||||
|
|
||||||
|
if len(valid_indices) == 0:
|
||||||
|
continue # 모든 값이 None이면 스킵
|
||||||
|
|
||||||
|
# 유효한 값과 타임스탬프만 추출
|
||||||
|
valid_samples = np.array([values[i] for i in valid_indices])
|
||||||
|
valid_timestamps = all_timestamps[valid_indices]
|
||||||
|
|
||||||
# Signal 객체 생성
|
# Signal 객체 생성
|
||||||
signal = Signal(
|
signal = Signal(
|
||||||
samples=samples,
|
samples=valid_samples,
|
||||||
timestamps=timestamps,
|
timestamps=valid_timestamps,
|
||||||
name=f"{msg_name}.{signal_name}",
|
name=f"{msg_name}.{signal_name}",
|
||||||
unit=signal_def.unit or '',
|
unit=signal_def.unit or '',
|
||||||
comment=signal_def.comment or ''
|
comment=signal_def.comment or ''
|
||||||
@@ -511,11 +516,7 @@ class CANBinToMDF:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
# 파일 저장
|
# 파일 저장
|
||||||
compression_map = {
|
compression_map = {0: 0, 1: 1, 2: 2}
|
||||||
0: 0, # no compression
|
|
||||||
1: 1, # deflate
|
|
||||||
2: 2 # transposed + deflate
|
|
||||||
}
|
|
||||||
|
|
||||||
mdf.save(
|
mdf.save(
|
||||||
output_path,
|
output_path,
|
||||||
@@ -525,13 +526,14 @@ class CANBinToMDF:
|
|||||||
|
|
||||||
print(f"✓ MDF4 파일 생성 완료: {output_path}")
|
print(f"✓ MDF4 파일 생성 완료: {output_path}")
|
||||||
print(f" - 시그널 수: {signal_count}")
|
print(f" - 시그널 수: {signal_count}")
|
||||||
print(f" - 시간 범위: 0.000초 ~ {timestamps[-1]:.3f}초")
|
if len(all_timestamps) > 0:
|
||||||
|
print(f" - 시간 범위: 0.000초 ~ {all_timestamps[-1]:.3f}초")
|
||||||
|
|
||||||
# 파일 크기 출력
|
|
||||||
file_size = Path(output_path).stat().st_size
|
file_size = Path(output_path).stat().st_size
|
||||||
file_size_mb = file_size / (1024 * 1024)
|
file_size_mb = file_size / (1024 * 1024)
|
||||||
print(f" - 파일 크기: {file_size_mb:.2f} MB")
|
print(f" - 파일 크기: {file_size_mb:.2f} MB")
|
||||||
|
|
||||||
|
|
||||||
def convert(self, bin_path: str, output_path: str, compression: int = 0):
|
def convert(self, bin_path: str, output_path: str, compression: int = 0):
|
||||||
"""bin 파일을 MDF4로 변환 (전체 프로세스)"""
|
"""bin 파일을 MDF4로 변환 (전체 프로세스)"""
|
||||||
print(f"\n{'='*60}")
|
print(f"\n{'='*60}")
|
||||||
|
|||||||
Reference in New Issue
Block a user