""" STT + Subtitle Pipeline Celery Tasks subtitle_pipeline_task: Step 1: ffmpeg → 16kHz WAV 추출 Step 2: Whisper → 원어 SRT / VTT 생성 Step 3: LLM → 번역 SRT / VTT 생성 (선택) """ import os, json, subprocess, tempfile import httpx from celery import Celery from ocr_tasks import ocr_task # noqa: F401 REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") MODEL_SIZE = os.getenv("WHISPER_MODEL", "medium") DEVICE = os.getenv("WHISPER_DEVICE", "cpu") COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "int8") LANGUAGE = os.getenv("WHISPER_LANGUAGE", "ko") or None BEAM_SIZE = int(os.getenv("WHISPER_BEAM_SIZE", "5")) INITIAL_PROMPT = os.getenv("WHISPER_INITIAL_PROMPT", "") or None OUTPUT_DIR = os.getenv("OUTPUT_DIR", "/data/outputs") OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.0.126:11434") OLLAMA_TIMEOUT = int(os.getenv("OLLAMA_TIMEOUT", "600")) _cpu_threads_env = int(os.getenv("CPU_THREADS", "0")) CPU_THREADS = _cpu_threads_env if _cpu_threads_env > 0 else None celery_app = Celery("whisper_tasks", broker=REDIS_URL, backend=REDIS_URL) celery_app.conf.update( task_serializer="json", result_serializer="json", accept_content=["json"], task_track_started=True, result_expires=3600, ) _whisper_model = None def get_model(): global _whisper_model if _whisper_model is None: from faster_whisper import WhisperModel kwargs = dict(device=DEVICE, compute_type=COMPUTE_TYPE) if CPU_THREADS is not None: kwargs["cpu_threads"] = CPU_THREADS print(f"[Whisper] 로딩: {MODEL_SIZE}/{DEVICE}/{COMPUTE_TYPE}/threads={CPU_THREADS or 'auto'}") _whisper_model = WhisperModel(MODEL_SIZE, **kwargs) print("[Whisper] 로드 완료") return _whisper_model # ══════════════════════════════════════════════════════════════ # 언어 코드 → 표시명 # ══════════════════════════════════════════════════════════════ LANG_NAMES = { "ko":"한국어","en":"English","ja":"日本語","zh":"中文(简体)", "zh-tw":"中文(繁體)","fr":"Français","de":"Deutsch","es":"Español", "it":"Italiano","pt":"Português","ru":"Русский","ar":"العربية", "vi":"Tiếng Việt","th":"ไทย","id":"Bahasa Indonesia", "nl":"Nederlands","pl":"Polski","tr":"Türkçe","sv":"Svenska", "uk":"Українська","hi":"हिन्दी","bn":"বাংলা", } def _lang_name(code): return LANG_NAMES.get(code, code) # ══════════════════════════════════════════════════════════════ # 자막 포맷 생성 # ══════════════════════════════════════════════════════════════ def _srt_time(s: float) -> str: ms = int(round(s * 1000)) h, r = divmod(ms, 3600000); m, r = divmod(r, 60000); sec, ms = divmod(r, 1000) return f"{h:02d}:{m:02d}:{sec:02d},{ms:03d}" def _vtt_time(s: float) -> str: return _srt_time(s).replace(",", ".") def make_srt(segments: list) -> str: out = [] for i, seg in enumerate(segments, 1): out += [str(i), f"{_srt_time(seg['start'])} --> {_srt_time(seg['end'])}", seg["text"].strip(), ""] return "\n".join(out) def make_vtt(segments: list) -> str: out = ["WEBVTT", ""] for i, seg in enumerate(segments, 1): out += [str(i), f"{_vtt_time(seg['start'])} --> {_vtt_time(seg['end'])}", seg["text"].strip(), ""] return "\n".join(out) # ══════════════════════════════════════════════════════════════ # LLM 번역 (세그먼트 배치) # ══════════════════════════════════════════════════════════════ def _translate_batch(texts: list, target_lang: str, use_openrouter: bool, model: str, openrouter_url: str, openrouter_key: str) -> list: """texts 리스트 → 번역된 texts 리스트""" if not texts or not model: return texts lang_name = _lang_name(target_lang) prompt = ( f"아래 자막 문장 배열을 {lang_name}로 번역해줘.\n" f"반드시 JSON 문자열 배열로만 답해. 설명·마크다운 없이 배열만 출력.\n" f"입력과 동일한 개수와 순서를 유지해.\n\n" f"{json.dumps(texts, ensure_ascii=False)}" ) try: if use_openrouter and openrouter_key: resp = httpx.post( f"{openrouter_url.rstrip('/')}/chat/completions", headers={"Authorization": f"Bearer {openrouter_key}", "HTTP-Referer": "https://voicescript.local", "Content-Type": "application/json"}, json={"model": model, "messages": [{"role":"user","content":prompt}], "temperature": 0.2}, timeout=float(OLLAMA_TIMEOUT), ) resp.raise_for_status() raw = resp.json()["choices"][0]["message"]["content"].strip() else: resp = httpx.post(f"{OLLAMA_URL}/api/chat", json={"model": model, "messages": [{"role":"user","content":prompt}], "stream": False, "options": {"temperature": 0.2}}, timeout=float(OLLAMA_TIMEOUT)) resp.raise_for_status() raw = resp.json().get("message",{}).get("content","").strip() # 코드블록 제거 후 JSON 파싱 if "```" in raw: raw = raw.split("```")[1].lstrip("json\n").rstrip() result = json.loads(raw) if isinstance(result, list) and len(result) == len(texts): return [str(r) for r in result] return texts except Exception as e: print(f"[번역 실패] {e}") return texts # 실패 시 원본 유지 # ══════════════════════════════════════════════════════════════ # STT + Ollama/OpenRouter 후처리 (기존 음성변환용) # ══════════════════════════════════════════════════════════════ def _ollama_postprocess(text: str, model: str) -> str: if not model or not text.strip(): return text prompt = ("다음은 음성 인식으로 추출된 텍스트입니다. " "내용은 절대 변경하지 말고, 문장 부호를 추가하고 자연스럽게 다듬어줘. " "결과 텍스트만 출력하고 설명은 하지 마.\n\n" + text) try: resp = httpx.post(f"{OLLAMA_URL}/api/chat", json={"model":model,"messages":[{"role":"user","content":prompt}], "stream":False,"options":{"temperature":0.1}}, timeout=float(OLLAMA_TIMEOUT)) resp.raise_for_status() return resp.json().get("message",{}).get("content","").strip() or text except: return text def _openrouter_postprocess(text: str, model: str, base_url: str, api_key: str) -> str: if not model or not api_key or not text.strip(): return text prompt = ("다음은 음성 인식으로 추출된 텍스트입니다. " "내용은 절대 변경하지 말고, 문장 부호를 추가하고 자연스럽게 다듬어줘. " "결과 텍스트만 출력하고 설명은 하지 마.\n\n" + text) try: resp = httpx.post(f"{base_url.rstrip('/')}/chat/completions", headers={"Authorization":f"Bearer {api_key}","HTTP-Referer":"https://voicescript.local","Content-Type":"application/json"}, json={"model":model,"messages":[{"role":"user","content":prompt}],"temperature":0.1}, timeout=float(OLLAMA_TIMEOUT)) resp.raise_for_status() return resp.json()["choices"][0]["message"]["content"].strip() or text except: return text # ══════════════════════════════════════════════════════════════ # 기존 STT 태스크 (음성변환 탭용) # ══════════════════════════════════════════════════════════════ @celery_app.task(bind=True, name="tasks.transcribe_task", queue="stt") def transcribe_task( self, file_id: str, audio_path: str, use_ollama: bool = False, ollama_model: str = "", use_openrouter: bool = False, openrouter_model: str = "", openrouter_url: str = "", openrouter_key: str = "", ): self.update_state(state="PROGRESS", meta={"progress":5,"message":"모델 준비 중..."}) try: model = get_model() self.update_state(state="PROGRESS", meta={"progress":15,"message":"오디오 분석 중..."}) segments_gen, info = model.transcribe( audio_path, language=LANGUAGE, beam_size=BEAM_SIZE, initial_prompt=INITIAL_PROMPT, vad_filter=True, vad_parameters=dict(min_silence_duration_ms=500), word_timestamps=False, ) self.update_state(state="PROGRESS", meta={"progress":30,"message":"텍스트 변환 중..."}) segments, parts = [], [] duration = info.duration for seg in segments_gen: segments.append({"start":round(seg.start,3),"end":round(seg.end,3),"text":seg.text.strip()}) parts.append(seg.text.strip()) if duration > 0: pct = 30 + int((seg.end/duration)*50) self.update_state(state="PROGRESS", meta={"progress":min(pct,80),"message":f"변환 중... {seg.end:.0f}s / {duration:.0f}s"}) raw_text = "\n".join(parts) full_text = raw_text if use_ollama and ollama_model: self.update_state(state="PROGRESS",meta={"progress":85,"message":f"Ollama({ollama_model}) 교정 중..."}) full_text = _ollama_postprocess(raw_text, ollama_model) elif use_openrouter and openrouter_model and openrouter_key: self.update_state(state="PROGRESS",meta={"progress":85,"message":f"OpenRouter({openrouter_model}) 교정 중..."}) full_text = _openrouter_postprocess(raw_text, openrouter_model, openrouter_url, openrouter_key) self.update_state(state="PROGRESS",meta={"progress":95,"message":"파일 저장 중..."}) os.makedirs(OUTPUT_DIR, exist_ok=True) output_filename = f"{file_id}.txt" with open(os.path.join(OUTPUT_DIR, output_filename),"w",encoding="utf-8") as f: f.write(f"# 변환 결과\n# 언어: {info.language} | 재생 시간: {duration:.1f}초\n\n## 전체 텍스트\n\n{full_text}\n\n## 타임스탬프별 세그먼트\n\n") for seg in segments: m,s=divmod(int(seg['start']),60) f.write(f"[{m:02d}:{s:02d}] {seg['text']}\n") try: os.remove(audio_path) except: pass return { "text":full_text,"raw_text":raw_text,"segments":segments, "language":info.language,"duration":round(duration,1), "output_file":output_filename, "ollama_used":use_ollama and bool(ollama_model), "ollama_model":ollama_model if (use_ollama and ollama_model) else "", "openrouter_used":use_openrouter and bool(openrouter_model) and bool(openrouter_key), "openrouter_model":openrouter_model if (use_openrouter and openrouter_model) else "", } except Exception as e: raise Exception(f"변환 실패: {str(e)}") # ══════════════════════════════════════════════════════════════ # 자막 파이프라인 태스크 # Step 1: ffmpeg → WAV # Step 2: Whisper → 원어 SRT/VTT # Step 3: LLM → 번역 SRT/VTT (선택) # ══════════════════════════════════════════════════════════════ @celery_app.task(bind=True, name="tasks.subtitle_pipeline_task", queue="stt") def subtitle_pipeline_task( self, file_id: str, video_path: str, src_language: str = "", # 원어 코드 (빈칸=자동) subtitle_fmt: str = "srt", # srt | vtt | both translate_to: str = "", # 번역 대상 (빈칸=번역 안 함) trans_model: str = "", # 번역 모델 trans_via: str = "ollama",# ollama | openrouter openrouter_url: str = "", openrouter_key: str = "", ): os.makedirs(OUTPUT_DIR, exist_ok=True) wav_path = os.path.join(os.path.dirname(video_path), f"{file_id}_audio.wav") result_files = {} try: # ── Step 1: ffmpeg 오디오 추출 ──────────────────────── self.update_state(state="PROGRESS", meta={ "progress": 5, "step": 1, "step_msg": "오디오 추출 중...", "message": "Step 1/3 — ffmpeg 오디오 추출 중..." }) cmd = [ "ffmpeg", "-y", "-i", video_path, "-vn", # 비디오 스트림 제거 "-ar", "16000", # 16kHz — Whisper 최적 "-ac", "1", # 모노 "-c:a", "pcm_s16le",# WAV 무손실 wav_path ] proc = subprocess.run(cmd, capture_output=True, timeout=600) if proc.returncode != 0: err = proc.stderr.decode(errors="replace")[-500:] raise Exception(f"ffmpeg 오디오 추출 실패: {err}") if not os.path.exists(wav_path) or os.path.getsize(wav_path) < 1000: raise Exception("ffmpeg가 오디오를 추출하지 못했습니다. 영상에 오디오 트랙이 있는지 확인하세요.") try: os.remove(video_path) except: pass # ── Step 2: Whisper STT → 원어 자막 ─────────────────── self.update_state(state="PROGRESS", meta={ "progress": 15, "step": 2, "step_msg": "음성 인식 중...", "message": "Step 2/3 — Whisper 음성 인식 시작..." }) whisper = get_model() lang = src_language.strip() or None segments_gen, info = whisper.transcribe( wav_path, language=lang, beam_size=BEAM_SIZE, initial_prompt=INITIAL_PROMPT, vad_filter=True, vad_parameters=dict(min_silence_duration_ms=500), word_timestamps=False, ) segments = [] duration = info.duration detected_lang = info.language for seg in segments_gen: segments.append({ "start": round(seg.start, 3), "end": round(seg.end, 3), "text": seg.text.strip(), }) if duration > 0: pct = 15 + int((seg.end / duration) * 55) self.update_state(state="PROGRESS", meta={ "progress": min(pct, 70), "step": 2, "step_msg": f"{seg.end:.0f}s / {duration:.0f}s 인식 완료", "message": f"Step 2/3 — {seg.end:.0f}s / {duration:.0f}s", }) try: os.remove(wav_path) except: pass if not segments: raise Exception("음성이 감지되지 않았습니다. 영상에 음성이 있는지 확인하세요.") # 원어 자막 저장 lang_suffix = detected_lang if subtitle_fmt in ("srt", "both"): fn = f"{file_id}.{lang_suffix}.srt" with open(os.path.join(OUTPUT_DIR, fn), "w", encoding="utf-8") as f: f.write(make_srt(segments)) result_files["srt_orig"] = fn if subtitle_fmt in ("vtt", "both"): fn = f"{file_id}.{lang_suffix}.vtt" with open(os.path.join(OUTPUT_DIR, fn), "w", encoding="utf-8") as f: f.write(make_vtt(segments)) result_files["vtt_orig"] = fn # ── Step 3: LLM 번역 (선택) ─────────────────────────── translated_segments = None if translate_to and translate_to != detected_lang and trans_model: target_name = _lang_name(translate_to) use_or = (trans_via == "openrouter" and bool(openrouter_key)) total = len(segments) CHUNK = 25 # 한 번에 25개씩 번역 translated_texts = [] for ci, start in enumerate(range(0, total, CHUNK)): chunk = segments[start:start+CHUNK] pct = 72 + int((ci * CHUNK / total) * 22) self.update_state(state="PROGRESS", meta={ "progress": min(pct, 94), "step": 3, "step_msg": f"{min(start+CHUNK, total)}/{total}개 번역 완료", "message": f"Step 3/3 — {target_name}로 번역 중... ({min(start+CHUNK,total)}/{total})", }) batch_texts = [s["text"] for s in chunk] translated = _translate_batch( batch_texts, translate_to, use_openrouter=use_or, model=trans_model, openrouter_url=openrouter_url, openrouter_key=openrouter_key, ) translated_texts.extend(translated) # 번역된 텍스트 → 세그먼트 조합 (타임스탬프 유지) translated_segments = [ {**seg, "text": translated_texts[i] if i < len(translated_texts) else seg["text"]} for i, seg in enumerate(segments) ] # 번역 자막 저장 trans_suffix = translate_to if subtitle_fmt in ("srt", "both"): fn = f"{file_id}.{trans_suffix}.srt" with open(os.path.join(OUTPUT_DIR, fn), "w", encoding="utf-8") as f: f.write(make_srt(translated_segments)) result_files["srt_trans"] = fn if subtitle_fmt in ("vtt", "both"): fn = f"{file_id}.{trans_suffix}.vtt" with open(os.path.join(OUTPUT_DIR, fn), "w", encoding="utf-8") as f: f.write(make_vtt(translated_segments)) result_files["vtt_trans"] = fn self.update_state(state="PROGRESS", meta={ "progress": 98, "step": 3, "step_msg": "완료", "message": "자막 파일 저장 완료" }) return { "detected_language": detected_lang, "duration": round(duration, 1), "segment_count": len(segments), "translated": bool(translated_segments), "translate_to": translate_to if translated_segments else "", "subtitle_fmt": subtitle_fmt, # 파일 "srt_orig": result_files.get("srt_orig", ""), "vtt_orig": result_files.get("vtt_orig", ""), "srt_trans": result_files.get("srt_trans", ""), "vtt_trans": result_files.get("vtt_trans", ""), } except Exception as e: # 임시 파일 정리 for p in [video_path, wav_path]: try: os.remove(p) except: pass raise Exception(f"자막 생성 실패: {str(e)}")