csv에러 수정

This commit is contained in:
2026-01-16 08:59:56 +00:00
parent a0ba41cd6b
commit 9ee55fb2c5

View File

@@ -50,344 +50,10 @@ class CANMessage:
def __repr__(self):
return f"CANMessage(id=0x{self.can_id:X}, dlc={self.dlc}, ts={self.timestamp_us})"
# ============================================================================
# CSV 변환기
# ============================================================================
class CANBinToCSV:
"""CAN bin 파일을 CSV로 변환하는 클래스"""
def __init__(self, dbc_path: str):
"""
Args:
dbc_path: CAN Database (DBC) 파일 경로
"""
self.db = candb.load_file(dbc_path)
print(f"✓ DBC 파일 로드: {dbc_path}")
print(f" - 메시지 수: {len(self.db.messages)}")
print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}")
def read_bin_file(self, bin_path: str) -> List[CANMessage]:
"""ESP32 CAN Logger의 bin 파일 읽기"""
messages = []
with open(bin_path, 'rb') as f:
data = f.read()
offset = 0
while offset + CANMessage.STRUCT_SIZE <= len(data):
msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE]
try:
msg = CANMessage.from_bytes(msg_bytes)
messages.append(msg)
except Exception as e:
print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}")
offset += CANMessage.STRUCT_SIZE
print(f"✓ bin 파일 읽기 완료: {bin_path}")
print(f" - 총 메시지: {len(messages)}")
return messages
def decode_messages(self, messages: List[CANMessage]) -> tuple:
"""
CAN 메시지를 DBC를 사용하여 디코딩
Returns:
(decoded_data, start_time): 디코딩된 데이터와 시작 시간
"""
decoded_data = {}
# 시작 시간 계산
start_time = messages[0].timestamp_us / 1e6 if messages else 0
# 각 메시지별로 초기화 - 빈 리스트로 시작
for msg_def in self.db.messages:
msg_name = msg_def.name
decoded_data[msg_name] = {}
for signal in msg_def.signals:
# timestamps와 values를 함께 저장하는 구조로 변경
decoded_data[msg_name][signal.name] = {
'timestamps': [],
'values': []
}
decode_count = 0
for msg in messages:
try:
msg_def = self.db.get_message_by_frame_id(msg.can_id)
msg_name = msg_def.name
# 메시지 디코딩 (decode는 multiplexer를 자동으로 처리)
decoded = msg_def.decode(msg.data)
# 상대 시간으로 변환
relative_time = (msg.timestamp_us / 1e6) - start_time
# 각 시그널 값 저장 (디코딩된 시그널만 저장)
for signal in msg_def.signals:
signal_name = signal.name
# decoded에 존재하는 시그널만 처리
if signal_name in decoded:
raw_value = decoded[signal_name]
decoded_data[msg_name][signal_name]['timestamps'].append(relative_time)
decoded_data[msg_name][signal_name]['values'].append(raw_value)
decode_count += 1
except KeyError:
continue
except Exception as e:
continue
print(f"✓ 메시지 디코딩 완료: {decode_count}/{len(messages)}")
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 상대 시간으로 변환: 0.000초 부터 시작")
# 빈 시그널 제거 및 통계 출력
cleaned_data = {}
for msg_name, signals in decoded_data.items():
has_data = False
cleaned_signals = {}
for sig_name, sig_data in signals.items():
if len(sig_data['values']) > 0:
cleaned_signals[sig_name] = sig_data
has_data = True
if has_data:
cleaned_data[msg_name] = cleaned_signals
# Multiplexed 시그널 통계 출력
if msg_name == 'EMS2':
print(f"\n [EMS2 Multiplexed Signals]")
for sig_name, sig_data in cleaned_signals.items():
print(f" - {sig_name}: {len(sig_data['values'])} samples")
return cleaned_data, start_time
def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
datetime_format: bool = True):
"""Wide 형식 CSV 생성 (각 시그널이 별도 컬럼)"""
# 모든 고유한 타임스탬프 수집 및 정렬
all_timestamps = set()
for msg_data in decoded_data.values():
all_timestamps.update(msg_data['timestamps'])
all_timestamps = sorted(all_timestamps)
# 각 타임스탬프에 대한 시그널 값 매핑
signal_columns = []
signal_data = {}
for msg_name, msg_data in decoded_data.items():
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
col_name = f"{msg_name}.{signal_name}"
signal_columns.append(col_name)
# 타임스탬프-값 매핑 생성
timestamp_to_value = {}
for ts, val in zip(msg_data['timestamps'], values):
timestamp_to_value[ts] = val
signal_data[col_name] = timestamp_to_value
# CSV 작성
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime'] + signal_columns
else:
header = ['Timestamp'] + signal_columns
writer.writerow(header)
# 데이터 작성
for ts in all_timestamps:
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt]
else:
row = [f"{ts:.6f}"]
for col_name in signal_columns:
value = signal_data[col_name].get(ts, '')
row.append(value)
writer.writerow(row)
print(f"✓ Wide CSV 생성 완료: {output_path}")
print(f" - 행 수: {len(all_timestamps)}")
print(f" - 시그널 수: {len(signal_columns)}")
def to_long_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
datetime_format: bool = True):
"""Long 형식 CSV 생성 (각 시그널 값이 별도 행)"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime', 'Message', 'Signal', 'Value']
else:
header = ['Timestamp', 'Message', 'Signal', 'Value']
writer.writerow(header)
# 데이터 작성
row_count = 0
for msg_name, msg_data in decoded_data.items():
timestamps = msg_data['timestamps']
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
for ts, val in zip(timestamps, values):
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt, msg_name, signal_name, val]
else:
row = [f"{ts:.6f}", msg_name, signal_name, val]
writer.writerow(row)
row_count += 1
print(f"✓ Long CSV 생성 완료: {output_path}")
print(f" - 행 수: {row_count}")
def to_message_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_dir: str,
datetime_format: bool = True):
"""메시지별 CSV 생성 (각 CAN 메시지가 별도 CSV 파일)"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
file_count = 0
for msg_name, msg_data in decoded_data.items():
# 파일명에 사용할 수 없는 문자 제거
safe_name = msg_name.replace('/', '_').replace('\\', '_')
csv_path = output_path / f"{safe_name}.csv"
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
signal_names = [k for k in msg_data.keys() if k != 'timestamps']
if datetime_format:
header = ['Timestamp', 'DateTime'] + signal_names
else:
header = ['Timestamp'] + signal_names
writer.writerow(header)
# 데이터 작성
timestamps = msg_data['timestamps']
for i, ts in enumerate(timestamps):
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt]
else:
row = [f"{ts:.6f}"]
for signal_name in signal_names:
row.append(msg_data[signal_name][i])
writer.writerow(row)
file_count += 1
print(f"✓ 메시지별 CSV 생성 완료: {output_dir}")
print(f" - 파일 수: {file_count}")
def to_raw_csv(self, messages: List[CANMessage], output_path: str):
"""원본 CAN 메시지 CSV 생성 (디코딩 없이 raw 데이터)"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더
writer.writerow(['Timestamp', 'DateTime', 'CAN_ID', 'DLC', 'Data'])
# 데이터
for msg in messages:
ts = msg.timestamp_us / 1e6
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
can_id = f"0x{msg.can_id:03X}"
data_hex = ' '.join([f"{b:02X}" for b in msg.data])
writer.writerow([f"{ts:.6f}", dt, can_id, msg.dlc, data_hex])
print(f"✓ Raw CSV 생성 완료: {output_path}")
print(f" - 행 수: {len(messages)}")
def convert(self, bin_path: str, output_path: str,
format: str = 'wide',
apply_value_table: bool = True,
datetime_format: bool = True):
"""bin 파일을 CSV로 변환 (전체 프로세스)"""
print(f"\n{'='*60}")
print(f"ESP32 CAN bin → CSV 변환")
print(f"{'='*60}")
# 1. bin 파일 읽기
messages = self.read_bin_file(bin_path)
if not messages:
print("✗ 메시지가 없습니다.")
return
# 시간 범위 출력
start_time = messages[0].timestamp_us / 1e6
end_time = messages[-1].timestamp_us / 1e6
duration = end_time - start_time
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}")
print(f" - 기간: {duration:.2f}")
# Raw 형식은 디코딩 없이 바로 변환
if format == 'raw':
self.to_raw_csv(messages, output_path)
else:
# 2. 메시지 디코딩
decoded_data = self.decode_messages(messages, apply_value_table)
if not decoded_data:
print("✗ 디코딩된 데이터가 없습니다.")
return
# 3. CSV 생성
if format == 'wide':
self.to_wide_csv(decoded_data, output_path, datetime_format)
elif format == 'long':
self.to_long_csv(decoded_data, output_path, datetime_format)
elif format == 'message':
self.to_message_csv(decoded_data, output_path, datetime_format)
else:
print(f"✗ 알 수 없는 형식: {format}")
return
print(f"{'='*60}")
print(f"✓ 변환 완료!")
print(f"{'='*60}\n")
# ============================================================================
# MDF 변환기
# ============================================================================
# ============================================================================
# MDF 변환기 (수정된 버전)
# ============================================================================
class CANBinToMDF:
"""CAN bin 파일을 MDF4로 변환하는 클래스"""
@@ -601,6 +267,417 @@ class CANBinToMDF:
print(f"{'='*60}\n")
# ============================================================================
# CSV 변환기
# ============================================================================
class CANBinToCSV:
"""CAN bin 파일을 CSV로 변환하는 클래스"""
def __init__(self, dbc_path: str):
"""
Args:
dbc_path: CAN Database (DBC) 파일 경로
"""
self.db = candb.load_file(dbc_path)
print(f"✓ DBC 파일 로드: {dbc_path}")
print(f" - 메시지 수: {len(self.db.messages)}")
print(f" - 시그널 수: {sum(len(msg.signals) for msg in self.db.messages)}")
def read_bin_file(self, bin_path: str) -> List[CANMessage]:
"""ESP32 CAN Logger의 bin 파일 읽기"""
messages = []
try:
with open(bin_path, 'rb') as f:
data = f.read()
if not data:
print(f"⚠️ 파일이 비어있습니다: {bin_path}")
return messages
print(f" 파일 크기: {len(data)} bytes")
offset = 0
parse_errors = 0
while offset + CANMessage.STRUCT_SIZE <= len(data):
msg_bytes = data[offset:offset + CANMessage.STRUCT_SIZE]
try:
msg = CANMessage.from_bytes(msg_bytes)
messages.append(msg)
except Exception as e:
parse_errors += 1
if parse_errors <= 5: # 처음 5개 에러만 출력
print(f"⚠️ 메시지 파싱 오류 at offset {offset}: {e}")
offset += CANMessage.STRUCT_SIZE
if parse_errors > 5:
print(f"⚠️ 총 {parse_errors}개의 파싱 오류 발생")
print(f"✓ bin 파일 읽기 완료: {bin_path}")
print(f" - 총 메시지: {len(messages)}")
return messages
except FileNotFoundError:
print(f"❌ 파일을 찾을 수 없습니다: {bin_path}")
return messages
except Exception as e:
print(f"❌ 파일 읽기 오류: {e}")
import traceback
traceback.print_exc()
return messages
def decode_messages(self, messages: List[CANMessage],
apply_value_table: bool = True) -> Dict[str, Dict[str, List]]:
"""CAN 메시지를 DBC를 사용하여 디코딩 (multiplexed 시그널 지원)"""
if not messages:
print("⚠️ 디코딩할 메시지가 없습니다")
return {}
decoded_data = {}
decode_count = 0
unknown_id_count = 0
decode_error_count = 0
for msg in messages:
try:
msg_def = self.db.get_message_by_frame_id(msg.can_id)
msg_name = msg_def.name
# 메시지가 처음 등장하면 초기화
if msg_name not in decoded_data:
decoded_data[msg_name] = {'timestamps': []}
# 메시지 디코딩
decoded = msg_def.decode(msg.data)
# 타임스탬프 저장 (마이크로초 -> 초)
timestamp = msg.timestamp_us / 1e6
decoded_data[msg_name]['timestamps'].append(timestamp)
# 디코딩된 시그널만 저장 (multiplexed 시그널 처리)
for signal_name, raw_value in decoded.items():
# 시그널이 처음 등장하면 초기화
if signal_name not in decoded_data[msg_name]:
decoded_data[msg_name][signal_name] = []
# Value Table 적용
if apply_value_table:
try:
msg_signal = next((s for s in msg_def.signals if s.name == signal_name), None)
if msg_signal and msg_signal.choices:
try:
int_value = int(raw_value)
if int_value in msg_signal.choices:
value = msg_signal.choices[int_value]
else:
value = raw_value
except (ValueError, TypeError):
value = raw_value
else:
value = raw_value
except:
value = raw_value
else:
value = raw_value
decoded_data[msg_name][signal_name].append(value)
decode_count += 1
except KeyError:
unknown_id_count += 1
continue
except Exception as e:
decode_error_count += 1
if decode_error_count <= 5:
print(f"⚠️ 디코딩 오류: {e}")
continue
print(f"✓ 메시지 디코딩 완료")
print(f" - 성공: {decode_count}/{len(messages)}")
if unknown_id_count > 0:
print(f" - Unknown ID: {unknown_id_count}")
if decode_error_count > 0:
print(f" - 디코딩 오류: {decode_error_count}")
# 빈 메시지 제거
decoded_data = {k: v for k, v in decoded_data.items() if len(v['timestamps']) > 0}
return decoded_data
def to_wide_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
datetime_format: bool = True):
"""Wide 형식 CSV 생성 (각 시그널이 별도 컬럼)"""
# 모든 고유한 타임스탬프 수집 및 정렬
all_timestamps = set()
for msg_data in decoded_data.values():
all_timestamps.update(msg_data['timestamps'])
all_timestamps = sorted(all_timestamps)
# 각 타임스탬프에 대한 시그널 값 매핑
signal_columns = []
signal_data = {}
for msg_name, msg_data in decoded_data.items():
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
col_name = f"{msg_name}.{signal_name}"
signal_columns.append(col_name)
# 타임스탬프-값 매핑 생성
timestamp_to_value = {}
for ts, val in zip(msg_data['timestamps'], values):
timestamp_to_value[ts] = val
signal_data[col_name] = timestamp_to_value
# CSV 작성
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime'] + signal_columns
else:
header = ['Timestamp'] + signal_columns
writer.writerow(header)
# 데이터 작성
for ts in all_timestamps:
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt]
else:
row = [f"{ts:.6f}"]
for col_name in signal_columns:
value = signal_data[col_name].get(ts, '')
row.append(value)
writer.writerow(row)
print(f"✓ Wide CSV 생성 완료: {output_path}")
print(f" - 행 수: {len(all_timestamps)}")
print(f" - 시그널 수: {len(signal_columns)}")
def to_long_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_path: str,
datetime_format: bool = True):
"""Long 형식 CSV 생성 (각 시그널 값이 별도 행)"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime', 'Message', 'Signal', 'Value']
else:
header = ['Timestamp', 'Message', 'Signal', 'Value']
writer.writerow(header)
# 데이터 작성
row_count = 0
for msg_name, msg_data in decoded_data.items():
timestamps = msg_data['timestamps']
for signal_name, values in msg_data.items():
if signal_name == 'timestamps':
continue
for ts, val in zip(timestamps, values):
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt, msg_name, signal_name, val]
else:
row = [f"{ts:.6f}", msg_name, signal_name, val]
writer.writerow(row)
row_count += 1
print(f"✓ Long CSV 생성 완료: {output_path}")
print(f" - 행 수: {row_count}")
def to_message_csv(self, decoded_data: Dict[str, Dict[str, List]],
output_dir: str,
datetime_format: bool = True):
"""메시지별 CSV 생성 (각 CAN 메시지가 별도 CSV 파일)"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
file_count = 0
for msg_name, msg_data in decoded_data.items():
# 파일명에 사용할 수 없는 문자 제거
safe_name = msg_name.replace('/', '_').replace('\\', '_')
csv_path = output_path / f"{safe_name}.csv"
try:
timestamps = msg_data['timestamps']
signal_names = [k for k in msg_data.keys() if k != 'timestamps']
# 각 시그널의 길이가 다를 수 있으므로 (multiplexed signals)
# 타임스탬프별로 매핑을 생성
signal_mappings = {}
for signal_name in signal_names:
signal_values = msg_data[signal_name]
# 타임스탬프와 값의 개수가 다르면 경고
if len(signal_values) != len(timestamps):
print(f"⚠️ {msg_name}.{signal_name}: {len(signal_values)} values != {len(timestamps)} timestamps")
# 타임스탬프-값 매핑 생성 (있는 것만)
signal_mappings[signal_name] = {
timestamps[i]: signal_values[i]
for i in range(min(len(timestamps), len(signal_values)))
}
else:
# 정상적인 경우
signal_mappings[signal_name] = {
timestamps[i]: signal_values[i]
for i in range(len(timestamps))
}
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더 작성
if datetime_format:
header = ['Timestamp', 'DateTime'] + signal_names
else:
header = ['Timestamp'] + signal_names
writer.writerow(header)
# 데이터 작성 - 각 타임스탬프별로
for ts in timestamps:
if datetime_format:
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
row = [f"{ts:.6f}", dt]
else:
row = [f"{ts:.6f}"]
# 각 시그널의 값 가져오기 (없으면 빈 문자열)
for signal_name in signal_names:
value = signal_mappings[signal_name].get(ts, '')
row.append(value)
writer.writerow(row)
file_count += 1
except Exception as e:
print(f"⚠️ {msg_name} CSV 생성 실패: {e}")
import traceback
traceback.print_exc()
continue
print(f"✓ 메시지별 CSV 생성 완료: {output_dir}")
print(f" - 파일 수: {file_count}")
def to_raw_csv(self, messages: List[CANMessage], output_path: str):
"""원본 CAN 메시지 CSV 생성 (디코딩 없이 raw 데이터)"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 헤더
writer.writerow(['Timestamp', 'DateTime', 'CAN_ID', 'DLC', 'Data'])
# 데이터
for msg in messages:
ts = msg.timestamp_us / 1e6
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
can_id = f"0x{msg.can_id:03X}"
data_hex = ' '.join([f"{b:02X}" for b in msg.data])
writer.writerow([f"{ts:.6f}", dt, can_id, msg.dlc, data_hex])
print(f"✓ Raw CSV 생성 완료: {output_path}")
print(f" - 행 수: {len(messages)}")
def convert(self, bin_path: str, output_path: str,
format: str = 'wide',
apply_value_table: bool = True,
datetime_format: bool = True):
"""bin 파일을 CSV로 변환 (전체 프로세스)"""
try:
print(f"\n{'='*60}")
print(f"ESP32 CAN bin → CSV 변환")
print(f"{'='*60}")
# 1. bin 파일 읽기
print(f"1. bin 파일 읽기 시작...")
messages = self.read_bin_file(bin_path)
if not messages:
print("✗ 메시지가 없습니다.")
return
print(f"{len(messages)}개 메시지 읽음")
# 시간 범위 출력
print(f"2. 시간 정보 추출 중...")
start_time = messages[0].timestamp_us / 1e6
end_time = messages[-1].timestamp_us / 1e6
duration = end_time - start_time
print(f" - 시작 시간: {datetime.fromtimestamp(start_time)}")
print(f" - 종료 시간: {datetime.fromtimestamp(end_time)}")
print(f" - 기간: {duration:.2f}")
# Raw 형식은 디코딩 없이 바로 변환
if format == 'raw':
print(f"3. Raw CSV 생성 중...")
self.to_raw_csv(messages, output_path)
else:
# 2. 메시지 디코딩
print(f"3. 메시지 디코딩 시작...")
decoded_data = self.decode_messages(messages, apply_value_table)
if not decoded_data:
print("✗ 디코딩된 데이터가 없습니다.")
return
print(f" {len(decoded_data)}개 메시지 타입 디코딩됨")
# 3. CSV 생성
print(f"4. CSV 파일 생성 중...")
if format == 'wide':
self.to_wide_csv(decoded_data, output_path, datetime_format)
elif format == 'long':
self.to_long_csv(decoded_data, output_path, datetime_format)
elif format == 'message':
self.to_message_csv(decoded_data, output_path, datetime_format)
else:
print(f"✗ 알 수 없는 형식: {format}")
return
print(f"{'='*60}")
print(f"✓ 변환 완료!")
print(f"{'='*60}\n")
except Exception as e:
import traceback
print(f"\n{'='*60}")
print(f"❌ 변환 중 오류 발생")
print(f"{'='*60}")
print(f"오류 타입: {type(e).__name__}")
print(f"오류 메시지: {str(e)}")
print(f"\n상세 스택 트레이스:")
traceback.print_exc()
print(f"{'='*60}\n")
raise
# ============================================================================
# GUI Worker Thread
# ============================================================================