Files
whisper-stt/app/tasks.py

455 lines
23 KiB
Python

"""
STT + Subtitle Pipeline Celery Tasks
"""
import os, json, subprocess, tempfile
import httpx
from celery import Celery
from ocr_tasks import ocr_task # noqa: F401
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
MODEL_SIZE = os.getenv("WHISPER_MODEL", "medium")
DEVICE = os.getenv("WHISPER_DEVICE", "cpu")
COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "int8")
LANGUAGE = os.getenv("WHISPER_LANGUAGE", "ko") or None
BEAM_SIZE = int(os.getenv("WHISPER_BEAM_SIZE", "5"))
INITIAL_PROMPT = os.getenv("WHISPER_INITIAL_PROMPT", "") or None
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "/data/outputs")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.0.126:11434")
OLLAMA_TIMEOUT = int(os.getenv("OLLAMA_TIMEOUT", "600"))
GROQ_BASE = "https://api.groq.com/openai/v1"
OPENAI_BASE = "https://api.openai.com/v1"
_cpu_threads_env = int(os.getenv("CPU_THREADS", "0"))
CPU_THREADS = _cpu_threads_env if _cpu_threads_env > 0 else None
celery_app = Celery("whisper_tasks", broker=REDIS_URL, backend=REDIS_URL)
celery_app.conf.update(
task_serializer="json", result_serializer="json",
accept_content=["json"], task_track_started=True, result_expires=86400,
)
_whisper_model = None
def get_model():
global _whisper_model
if _whisper_model is None:
from faster_whisper import WhisperModel
kwargs = dict(device=DEVICE, compute_type=COMPUTE_TYPE)
if CPU_THREADS is not None: kwargs["cpu_threads"] = CPU_THREADS
print(f"[Whisper] 로딩: {MODEL_SIZE}/{DEVICE}/{COMPUTE_TYPE}")
_whisper_model = WhisperModel(MODEL_SIZE, **kwargs)
print("[Whisper] 완료")
return _whisper_model
# ══════════════════════════════════════════════════════════════
# 공통 유틸
# ══════════════════════════════════════════════════════════════
LANG_NAMES = {
"ko":"한국어","en":"English","ja":"日本語","zh":"中文(简体)",
"zh-tw":"中文(繁體)","fr":"Français","de":"Deutsch","es":"Español",
"it":"Italiano","pt":"Português","ru":"Русский","ar":"العربية",
"vi":"Tiếng Việt","th":"ไทย","id":"Bahasa Indonesia",
"nl":"Nederlands","pl":"Polski","tr":"Türkçe","sv":"Svenska",
"uk":"Українська","hi":"हिन्दी",
}
def _lang_name(code): return LANG_NAMES.get(code, code)
def _srt_time(s):
ms=int(round(s*1000)); h,r=divmod(ms,3600000); m,r=divmod(r,60000); sec,ms=divmod(r,1000)
return f"{h:02d}:{m:02d}:{sec:02d},{ms:03d}"
def _vtt_time(s): return _srt_time(s).replace(",",".")
def make_srt(segments):
out=[]
for i,seg in enumerate(segments,1):
out+=[str(i),f"{_srt_time(seg['start'])} --> {_srt_time(seg['end'])}",seg["text"].strip(),""]
return "\n".join(out)
def make_vtt(segments):
out=["WEBVTT",""]
for i,seg in enumerate(segments,1):
out+=[str(i),f"{_vtt_time(seg['start'])} --> {_vtt_time(seg['end'])}",seg["text"].strip(),""]
return "\n".join(out)
def _llm_call(prompt, model, use_openrouter, openrouter_url, openrouter_key, timeout):
"""LLM 호출 — 명확한 에러 메시지 포함"""
if use_openrouter:
if not openrouter_key:
raise Exception("OpenRouter API 키가 설정되지 않았습니다. 설정 → OpenRouter에서 입력하세요.")
try:
resp = httpx.post(
f"{openrouter_url.rstrip('/')}/chat/completions",
headers={"Authorization":f"Bearer {openrouter_key}",
"HTTP-Referer":"https://voicescript.local","Content-Type":"application/json"},
json={"model":model,"messages":[{"role":"user","content":prompt}],"temperature":0.2},
timeout=float(timeout),
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"].strip()
except httpx.TimeoutException:
raise Exception(f"OpenRouter 응답 시간 초과 ({timeout}초). 설정에서 타임아웃을 늘리거나 모델을 변경하세요.")
except httpx.HTTPStatusError as e:
raise Exception(f"OpenRouter 오류 ({e.response.status_code}): API 키 또는 모델을 확인하세요.")
else:
try:
resp = httpx.post(f"{OLLAMA_URL}/api/chat",
json={"model":model,"messages":[{"role":"user","content":prompt}],
"stream":False,"options":{"temperature":0.2}},
timeout=float(timeout))
resp.raise_for_status()
result = resp.json().get("message",{}).get("content","").strip()
if not result:
raise Exception(f"Ollama({model}) 빈 응답. 모델이 실행 중인지 확인: ollama list")
return result
except httpx.ConnectError:
raise Exception(f"Ollama 서버 연결 실패 ({OLLAMA_URL}). 서버가 실행 중인지 확인하세요.")
except httpx.TimeoutException:
raise Exception(
f"Ollama({model}) 응답 시간 초과 ({timeout}초).\n"
f"원인: 모델 로딩 중이거나, 시스템 리소스 부족, 또는 모델이 응답하지 않음.\n"
f"해결: 설정에서 Ollama 타임아웃을 늘리거나, 더 작은 모델을 사용하세요."
)
def _translate_batch(texts, target_lang, use_or, model, or_url, or_key, timeout):
if not texts or not model: return texts
prompt = (
f"아래 자막 문장 배열을 {_lang_name(target_lang)}로 번역해줘.\n"
f"반드시 JSON 문자열 배열로만 답해. 설명·마크다운 없이 배열만 출력.\n"
f"입력과 동일한 개수와 순서를 유지해.\n\n"
f"{json.dumps(texts, ensure_ascii=False)}"
)
try:
raw = _llm_call(prompt, model, use_or, or_url, or_key, timeout)
if "```" in raw: raw=raw.split("```")[1].lstrip("json\n").rstrip()
result = json.loads(raw)
if isinstance(result,list) and len(result)==len(texts):
return [str(r) for r in result]
return texts
except Exception as e:
print(f"[번역 실패] {e}")
return texts
def _refine_batch(texts, model, use_or, or_url, or_key, timeout):
if not texts or not model: return texts
prompt = (
"아래는 음성 인식 자막 문장 배열입니다.\n"
"내용은 절대 변경하지 말고, 문장 부호만 자연스럽게 교정해줘.\n"
"반드시 JSON 문자열 배열로만 답해. 설명·마크다운 없이 배열만.\n"
"입력과 동일한 개수와 순서를 유지해.\n\n"
f"{json.dumps(texts, ensure_ascii=False)}"
)
try:
raw = _llm_call(prompt, model, use_or, or_url, or_key, timeout)
if "```" in raw: raw=raw.split("```")[1].lstrip("json\n").rstrip()
result = json.loads(raw)
if isinstance(result,list) and len(result)==len(texts):
return [str(r) for r in result]
return texts
except Exception as e:
print(f"[교정 실패] {e}")
return texts
def _ollama_postprocess(text, model):
if not model or not text.strip(): return text
prompt=("다음은 음성 인식으로 추출된 텍스트입니다. 내용은 절대 변경하지 말고 문장 부호만 추가해줘. "
"결과 텍스트만 출력하고 설명은 하지 마.\n\n"+text)
try:
raw=_llm_call(prompt,model,False,"","",OLLAMA_TIMEOUT)
return raw if raw else text
except Exception as e:
print(f"[Ollama 후처리 실패] {e}"); return text
def _openrouter_postprocess(text, model, base_url, api_key):
if not model or not api_key or not text.strip(): return text
prompt=("다음은 음성 인식으로 추출된 텍스트입니다. 내용은 절대 변경하지 말고 문장 부호만 추가해줘. "
"결과 텍스트만 출력하고 설명은 하지 마.\n\n"+text)
try:
raw=_llm_call(prompt,model,True,base_url,api_key,OLLAMA_TIMEOUT)
return raw if raw else text
except Exception as e:
print(f"[OpenRouter 후처리 실패] {e}"); return text
def _api_transcribe(audio_path, api_key, base_url, language, model="whisper-large-v3"):
"""Groq / OpenAI Whisper API 호출"""
with open(audio_path,"rb") as f:
data = f.read()
params = {"model":model}
if language: params["language"] = language
try:
resp = httpx.post(
f"{base_url}/audio/transcriptions",
headers={"Authorization":f"Bearer {api_key}"},
files={"file":("audio.mp3", data, "audio/mpeg")},
data=params,
timeout=600.0,
)
resp.raise_for_status()
d = resp.json()
text = d.get("text","")
# segments 구조 없으면 전체 텍스트로 단일 세그먼트
segs = d.get("segments",[])
if not segs and text:
segs = [{"start":0,"end":0,"text":text}]
return {"text":text, "segments":segs,
"language":d.get("language", language or ""), "duration":0}
except httpx.TimeoutException:
raise Exception(f"API 응답 시간 초과. 파일이 너무 크거나 서버 문제일 수 있습니다.")
except httpx.HTTPStatusError as e:
raise Exception(f"API 오류 ({e.response.status_code}): API 키를 확인하세요.")
# ══════════════════════════════════════════════════════════════
# STT Task (음성변환 탭)
# ══════════════════════════════════════════════════════════════
@celery_app.task(bind=True, name="tasks.transcribe_task", queue="stt")
def transcribe_task(
self,
file_id:str, audio_path:str,
use_ollama:bool=False, ollama_model:str="",
use_openrouter:bool=False, openrouter_model:str="",
openrouter_url:str="", openrouter_key:str="",
stt_engine:str="local",
groq_api_key:str="", openai_api_key:str="",
stt_language:str="",
):
self.update_state(state="PROGRESS", meta={"progress":5,"message":"모델 준비 중..."})
tmp_mp3=None
try:
segments=[]; duration=0.0; detected_lang=""
if stt_engine in ("groq","openai"):
api_key = groq_api_key if stt_engine=="groq" else openai_api_key
base_url= GROQ_BASE if stt_engine=="groq" else OPENAI_BASE
if not api_key:
raise Exception(f"{stt_engine.upper()} API 키가 설정되지 않았습니다. 설정 → STT 엔진 API 키에서 입력하세요.")
self.update_state(state="PROGRESS",meta={"progress":20,"message":f"{stt_engine.upper()} API 변환 중..."})
import tempfile
suffix=".mp3"
with tempfile.NamedTemporaryFile(suffix=suffix,delete=False) as tf: tmp_mp3=tf.name
cmd=["ffmpeg","-y","-i",audio_path,"-ar","16000","-ac","1","-b:a","128k",tmp_mp3]
r=subprocess.run(cmd,capture_output=True,timeout=300)
if r.returncode!=0: raise Exception(f"ffmpeg 변환 실패: {r.stderr.decode(errors='replace')[-200:]}")
result=_api_transcribe(tmp_mp3,api_key,base_url,stt_language)
segments=[{"start":round(s.get("start",0),3),"end":round(s.get("end",0),3),"text":s.get("text","").strip()}
for s in result.get("segments",[])]
detected_lang=result.get("language","")
duration=result.get("duration",0) or (segments[-1]["end"] if segments else 0)
else:
model=get_model()
self.update_state(state="PROGRESS",meta={"progress":15,"message":"오디오 분석 중..."})
lang=stt_language.strip() or LANGUAGE
segments_gen,info=model.transcribe(audio_path,language=lang,beam_size=BEAM_SIZE,
initial_prompt=INITIAL_PROMPT,vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=500),word_timestamps=False)
self.update_state(state="PROGRESS",meta={"progress":30,"message":"텍스트 변환 중..."})
duration=info.duration; detected_lang=info.language
for seg in segments_gen:
segments.append({"start":round(seg.start,3),"end":round(seg.end,3),"text":seg.text.strip()})
if duration>0:
pct=30+int((seg.end/duration)*50)
self.update_state(state="PROGRESS",meta={"progress":min(pct,80),"message":f"변환 중... {seg.end:.0f}s/{duration:.0f}s"})
raw_text="\n".join(s["text"] for s in segments)
full_text=raw_text
if use_ollama and ollama_model:
self.update_state(state="PROGRESS",meta={"progress":85,"message":f"Ollama({ollama_model}) 교정 중..."})
full_text=_ollama_postprocess(raw_text,ollama_model)
elif use_openrouter and openrouter_model and openrouter_key:
self.update_state(state="PROGRESS",meta={"progress":85,"message":f"OpenRouter({openrouter_model}) 교정 중..."})
full_text=_openrouter_postprocess(raw_text,openrouter_model,openrouter_url,openrouter_key)
self.update_state(state="PROGRESS",meta={"progress":95,"message":"파일 저장 중..."})
os.makedirs(OUTPUT_DIR,exist_ok=True)
output_filename=f"{file_id}.txt"
with open(os.path.join(OUTPUT_DIR,output_filename),"w",encoding="utf-8") as f:
f.write(f"# 변환 결과\n# 언어: {detected_lang} | 재생시간: {duration:.1f}\n\n{full_text}\n\n## 타임스탬프\n\n")
for seg in segments:
m,s=divmod(int(seg['start']),60)
f.write(f"[{m:02d}:{s:02d}] {seg['text']}\n")
for p in [audio_path, tmp_mp3]:
try:
if p: os.remove(p)
except: pass
return {
"text":full_text,"raw_text":raw_text,"segments":segments,
"language":detected_lang,"duration":round(duration,1),
"output_file":output_filename,
"ollama_used":use_ollama and bool(ollama_model),
"ollama_model":ollama_model if (use_ollama and ollama_model) else "",
"openrouter_used":use_openrouter and bool(openrouter_model) and bool(openrouter_key),
"openrouter_model":openrouter_model if (use_openrouter and openrouter_model) else "",
"stt_engine":stt_engine,
}
except Exception as e:
for p in [audio_path, tmp_mp3]:
try:
if p: os.remove(p)
except: pass
raise Exception(f"변환 실패: {str(e)}")
# ══════════════════════════════════════════════════════════════
# 자막 파이프라인 Task
# Step 1: ffmpeg → WAV
# Step 2: Whisper / API → 원어 자막
# Step 2b: LLM 교정 (선택)
# Step 3: LLM 번역 (선택)
# ══════════════════════════════════════════════════════════════
@celery_app.task(bind=True, name="tasks.subtitle_pipeline_task", queue="stt")
def subtitle_pipeline_task(
self,
file_id:str, video_path:str,
src_language:str="",
subtitle_fmt:str="srt",
# STT 엔진
stt_engine:str="local",
groq_api_key:str="", openai_api_key:str="",
# 교정
refine_model:str="", refine_via:str="ollama",
# 번역
translate_to:str="", trans_model:str="", trans_via:str="ollama",
# 공통 API 설정
openrouter_url:str="", openrouter_key:str="",
# 타임아웃 (설정에서 받아옴)
subtitle_timeout:int=0, # 0=OLLAMA_TIMEOUT 기본값
):
os.makedirs(OUTPUT_DIR,exist_ok=True)
wav_path=os.path.join(os.path.dirname(video_path),f"{file_id}_audio.wav")
tmp_mp3=None
result_files={}
timeout=subtitle_timeout if subtitle_timeout>0 else OLLAMA_TIMEOUT
def _prog(pct, step, step_msg, msg):
self.update_state(state="PROGRESS",meta={"progress":pct,"step":step,"step_msg":step_msg,"message":msg})
try:
# ── Step 1: ffmpeg ────────────────────────────────────
_prog(5,1,"오디오 추출 중...","Step 1/3 — ffmpeg 오디오 추출 중...")
cmd=["ffmpeg","-y","-i",video_path,"-vn","-ar","16000","-ac","1","-c:a","pcm_s16le",wav_path]
proc=subprocess.run(cmd,capture_output=True,timeout=600)
if proc.returncode!=0:
raise Exception(f"ffmpeg 오디오 추출 실패: {proc.stderr.decode(errors='replace')[-300:]}")
if not os.path.exists(wav_path) or os.path.getsize(wav_path)<1000:
raise Exception("ffmpeg가 오디오를 추출하지 못했습니다. 영상에 오디오 트랙이 있는지 확인하세요.")
try: os.remove(video_path)
except: pass
# ── Step 2: STT ──────────────────────────────────────
_prog(15,2,"음성 인식 중...","Step 2/3 — 음성 인식 시작...")
segments=[]; duration=0.0; detected_lang=""
if stt_engine in ("groq","openai"):
api_key=groq_api_key if stt_engine=="groq" else openai_api_key
base_url=GROQ_BASE if stt_engine=="groq" else OPENAI_BASE
if not api_key:
raise Exception(f"{stt_engine.upper()} API 키가 없습니다. 설정에서 입력하세요.")
import tempfile
with tempfile.NamedTemporaryFile(suffix=".mp3",delete=False) as tf: tmp_mp3=tf.name
r=subprocess.run(["ffmpeg","-y","-i",wav_path,"-ar","16000","-ac","1","-b:a","128k",tmp_mp3],
capture_output=True,timeout=300)
if r.returncode!=0: raise Exception("MP3 변환 실패")
_prog(25,2,"API 음성 인식 중...",f"Step 2/3 — {stt_engine.upper()} API 인식 중...")
result=_api_transcribe(tmp_mp3,api_key,base_url,src_language)
segments=[{"start":round(s.get("start",0),3),"end":round(s.get("end",0),3),"text":s.get("text","").strip()}
for s in result.get("segments",[])]
detected_lang=result.get("language","")
duration=result.get("duration",0) or (segments[-1]["end"] if segments else 0)
try: os.remove(tmp_mp3); tmp_mp3=None
except: pass
else:
whisper=get_model()
lang=src_language.strip() or None
segments_gen,info=whisper.transcribe(wav_path,language=lang,beam_size=BEAM_SIZE,
initial_prompt=INITIAL_PROMPT,vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=500),word_timestamps=False)
duration=info.duration; detected_lang=info.language
for seg in segments_gen:
segments.append({"start":round(seg.start,3),"end":round(seg.end,3),"text":seg.text.strip()})
if duration>0:
pct=15+int((seg.end/duration)*50)
_prog(min(pct,65),2,f"{seg.end:.0f}s/{duration:.0f}s 인식",f"Step 2/3 — {seg.end:.0f}s / {duration:.0f}s")
try: os.remove(wav_path); wav_path=None
except: pass
if not segments:
raise Exception("음성이 감지되지 않았습니다. 영상에 음성이 있는지 확인하세요.")
# ── Step 2b: LLM 교정 ────────────────────────────────
if refine_model.strip():
use_or_refine=(refine_via=="openrouter" and bool(openrouter_key))
total=len(segments); CHUNK=25; refined=[]
for ci,start in enumerate(range(0,total,CHUNK)):
chunk=segments[start:start+CHUNK]
pct=67+int((ci*CHUNK/total)*6)
_prog(min(pct,73),2,f"교정 {min(start+CHUNK,total)}/{total}",
f"Step 2/3 — LLM 교정 중... ({min(start+CHUNK,total)}/{total})")
batch=[s["text"] for s in chunk]
refined.extend(_refine_batch(batch,refine_model,use_or_refine,openrouter_url,openrouter_key,timeout))
segments=[{**seg,"text":refined[i] if i<len(refined) else seg["text"]}
for i,seg in enumerate(segments)]
# 원어 자막 저장
lang_suffix=detected_lang
if subtitle_fmt in ("srt","both"):
fn=f"{file_id}.{lang_suffix}.srt"
with open(os.path.join(OUTPUT_DIR,fn),"w",encoding="utf-8") as f: f.write(make_srt(segments))
result_files["srt_orig"]=fn
if subtitle_fmt in ("vtt","both"):
fn=f"{file_id}.{lang_suffix}.vtt"
with open(os.path.join(OUTPUT_DIR,fn),"w",encoding="utf-8") as f: f.write(make_vtt(segments))
result_files["vtt_orig"]=fn
# ── Step 3: LLM 번역 ─────────────────────────────────
translated_segments=None
if translate_to and translate_to!=detected_lang and trans_model:
use_or=(trans_via=="openrouter" and bool(openrouter_key))
total=len(segments); CHUNK=25; trans_texts=[]
for ci,start in enumerate(range(0,total,CHUNK)):
chunk=segments[start:start+CHUNK]
pct=75+int((ci*CHUNK/total)*20)
_prog(min(pct,95),3,f"{min(start+CHUNK,total)}/{total} 번역",
f"Step 3/3 — {_lang_name(translate_to)}로 번역 중... ({min(start+CHUNK,total)}/{total})")
batch=[s["text"] for s in chunk]
trans_texts.extend(_translate_batch(batch,translate_to,use_or,trans_model,openrouter_url,openrouter_key,timeout))
translated_segments=[{**seg,"text":trans_texts[i] if i<len(trans_texts) else seg["text"]}
for i,seg in enumerate(segments)]
trans_suffix=translate_to
if subtitle_fmt in ("srt","both"):
fn=f"{file_id}.{trans_suffix}.srt"
with open(os.path.join(OUTPUT_DIR,fn),"w",encoding="utf-8") as f: f.write(make_srt(translated_segments))
result_files["srt_trans"]=fn
if subtitle_fmt in ("vtt","both"):
fn=f"{file_id}.{trans_suffix}.vtt"
with open(os.path.join(OUTPUT_DIR,fn),"w",encoding="utf-8") as f: f.write(make_vtt(translated_segments))
result_files["vtt_trans"]=fn
_prog(99,3,"완료","자막 파일 저장 완료")
return {
"detected_language":detected_lang,
"duration":round(duration,1),
"segment_count":len(segments),
"stt_engine":stt_engine,
"translated":bool(translated_segments),
"translate_to":translate_to if translated_segments else "",
"subtitle_fmt":subtitle_fmt,
"refine_model":refine_model if refine_model.strip() else "",
"srt_orig":result_files.get("srt_orig",""),
"vtt_orig":result_files.get("vtt_orig",""),
"srt_trans":result_files.get("srt_trans",""),
"vtt_trans":result_files.get("vtt_trans",""),
}
except Exception as e:
for p in [video_path, wav_path, tmp_mp3]:
try:
if p and os.path.exists(p): os.remove(p)
except: pass
raise Exception(f"자막 생성 실패: {str(e)}")