feat: VoiceScript STT+OCR 자막기능 수정

This commit is contained in:
root
2026-05-04 08:12:59 +09:00
parent b3805c2b0b
commit c3cb7a6e8f
4 changed files with 1371 additions and 886 deletions

View File

@@ -1,22 +1,11 @@
""" """
인증 모듈 — 다중 사용자 JSON 파일 기반 인증 모듈 — 다중 사용자 JSON 파일 기반
사용자 구조: 권한: stt | ocr | subtitle
{
"password": "...",
"role": "admin" | "user",
"permissions": {
"stt": true | false,
"ocr": true | false,
"allowed_stt_models": ["medium", "large-v3", ...], # 빈 배열 = 모두 허용
"allowed_ocr_models": ["granite3.2-vision", ...] # 빈 배열 = 모두 허용
}
}
""" """
import os, json, threading import os, json, threading
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta from datetime import datetime, timedelta
from fastapi import Depends, HTTPException
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt from jose import JWTError, jwt
@@ -32,20 +21,15 @@ USERS_FILE = DATA_DIR / "users.json"
_lock = threading.Lock() _lock = threading.Lock()
bearer = HTTPBearer(auto_error=False) bearer = HTTPBearer(auto_error=False)
# ── 파일 I/O ──────────────────────────────────────────────────
def _load() -> dict:
    """Read the user database from USERS_FILE.

    Returns:
        The parsed JSON object, or an empty dict when the file does not
        exist yet.
    """
    if USERS_FILE.exists():
        with open(USERS_FILE, "r", encoding="utf-8") as fh:
            return json.load(fh)
    return {}
def _save(users: dict):
    """Write the user database to USERS_FILE as indented UTF-8 JSON.

    The parent directory is created first if it is missing.
    """
    target_dir = USERS_FILE.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with open(USERS_FILE, "w", encoding="utf-8") as fh:
        json.dump(users, fh, ensure_ascii=False, indent=2)
# ── 초기화 ────────────────────────────────────────────────────
def init_users(): def init_users():
with _lock: with _lock:
users = _load() users = _load()
@@ -53,15 +37,12 @@ def init_users():
"password": ADMIN_PASSWORD, "password": ADMIN_PASSWORD,
"role": "admin", "role": "admin",
"permissions": { "permissions": {
"stt": True, "ocr": True, "stt": True, "ocr": True, "subtitle": True,
"allowed_stt_models": [], # 빈 배열 = 제한 없음 "allowed_stt_models": [], "allowed_ocr_models": [],
"allowed_ocr_models": [],
}, },
} }
_save(users) _save(users)
# ── CRUD ──────────────────────────────────────────────────────
def authenticate(username: str, password: str): def authenticate(username: str, password: str):
with _lock: users = _load() with _lock: users = _load()
u = users.get(username) u = users.get(username)
@@ -80,9 +61,9 @@ def create_user(username: str, password: str, permissions: dict) -> tuple:
with _lock: with _lock:
users = _load() users = _load()
if username in users: return False, "이미 존재하는 사용자입니다" if username in users: return False, "이미 존재하는 사용자입니다"
# 기본값 보완
permissions.setdefault("allowed_stt_models", []) permissions.setdefault("allowed_stt_models", [])
permissions.setdefault("allowed_ocr_models", []) permissions.setdefault("allowed_ocr_models", [])
permissions.setdefault("subtitle", False)
users[username] = {"password": password, "role": "user", "permissions": permissions} users[username] = {"password": password, "role": "user", "permissions": permissions}
_save(users) _save(users)
return True, "사용자가 생성되었습니다" return True, "사용자가 생성되었습니다"
@@ -94,6 +75,7 @@ def update_user(username: str, permissions: dict, password: str = None) -> tuple
if username not in users: return False, "사용자를 찾을 수 없습니다" if username not in users: return False, "사용자를 찾을 수 없습니다"
permissions.setdefault("allowed_stt_models", []) permissions.setdefault("allowed_stt_models", [])
permissions.setdefault("allowed_ocr_models", []) permissions.setdefault("allowed_ocr_models", [])
permissions.setdefault("subtitle", False)
users[username]["permissions"] = permissions users[username]["permissions"] = permissions
if password: users[username]["password"] = password if password: users[username]["password"] = password
_save(users) _save(users)
@@ -107,14 +89,10 @@ def delete_user(username: str) -> tuple:
del users[username]; _save(users) del users[username]; _save(users)
return True, "삭제되었습니다" return True, "삭제되었습니다"
# ── JWT ───────────────────────────────────────────────────────
def create_access_token(username: str) -> str:
    """Build a signed JWT whose subject is *username*.

    The token expires EXPIRE_HOURS after the current UTC time and is signed
    with SECRET_KEY using ALGORITHM.
    """
    expires_at = datetime.utcnow() + timedelta(hours=EXPIRE_HOURS)
    claims = {"sub": username, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
# ── FastAPI 의존성 ─────────────────────────────────────────────
def require_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer)) -> dict: def require_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
if credentials is None: if credentials is None:
raise HTTPException(401, "인증이 필요합니다", headers={"WWW-Authenticate": "Bearer"}) raise HTTPException(401, "인증이 필요합니다", headers={"WWW-Authenticate": "Bearer"})
@@ -126,16 +104,24 @@ def require_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer)) ->
if not u: raise JWTError() if not u: raise JWTError()
return {"username": username, **u} return {"username": username, **u}
except JWTError: except JWTError:
raise HTTPException(401, "토큰이 유효하지 않거나 만료되었습니다", headers={"WWW-Authenticate": "Bearer"}) raise HTTPException(401, "토큰이 유효하지 않거나 만료되었습니다",
headers={"WWW-Authenticate": "Bearer"})
def require_admin(user: dict = Depends(require_auth)) -> dict:
    """FastAPI dependency that lets only users with role "admin" through.

    Raises:
        HTTPException: 403 when the authenticated user's role is not "admin".
    """
    if user.get("role") == "admin":
        return user
    raise HTTPException(403, "관리자 권한이 필요합니다")
def require_stt(user: dict = Depends(require_auth)) -> dict:
    """FastAPI dependency that requires a truthy "stt" permission flag.

    Raises:
        HTTPException: 403 when the flag is missing or falsy.
    """
    granted = user.get("permissions", {})
    if granted.get("stt", False):
        return user
    raise HTTPException(403, "STT 사용 권한이 없습니다")
def require_ocr(user: dict = Depends(require_auth)) -> dict:
    """FastAPI dependency that requires a truthy "ocr" permission flag.

    Raises:
        HTTPException: 403 when the flag is missing or falsy.
    """
    granted = user.get("permissions", {})
    if granted.get("ocr", False):
        return user
    raise HTTPException(403, "OCR 사용 권한이 없습니다")
def require_subtitle(user: dict = Depends(require_auth)) -> dict:
    """FastAPI dependency that requires a truthy "subtitle" permission flag.

    Raises:
        HTTPException: 403 when the flag is missing or falsy.
    """
    granted = user.get("permissions", {})
    if granted.get("subtitle", False):
        return user
    raise HTTPException(403, "자막 사용 권한이 없습니다")

View File

@@ -8,7 +8,7 @@ from fastapi.responses import FileResponse
from typing import List from typing import List
from auth import (authenticate, create_access_token, init_users, from auth import (authenticate, create_access_token, init_users,
require_auth, require_admin, require_stt, require_ocr, require_auth, require_admin, require_stt, require_ocr, require_subtitle,
list_users, create_user, update_user, delete_user) list_users, create_user, update_user, delete_user)
from tasks import celery_app, transcribe_task, subtitle_pipeline_task from tasks import celery_app, transcribe_task, subtitle_pipeline_task
from ocr_tasks import ocr_task from ocr_tasks import ocr_task
@@ -24,26 +24,26 @@ OUTPUT_KEEP_SECS = int(os.getenv("OUTPUT_KEEP_HOURS", "48")) * 3600
DATA_DIR = Path(UPLOAD_DIR).parent DATA_DIR = Path(UPLOAD_DIR).parent
SETTINGS_FILE = DATA_DIR / "settings.json" SETTINGS_FILE = DATA_DIR / "settings.json"
HISTORY_FILE = DATA_DIR / "history.json" HISTORY_FILE = DATA_DIR / "history.json"
HISTORY_MAX = 300 HISTORY_MAX = 500
os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True)
AUDIO_EXT = {"mp3","mp4","wav","m4a","ogg","flac","aac","wma","webm", AUDIO_EXT = {"mp3","mp4","wav","m4a","ogg","flac","aac","wma","webm",
"mkv","avi","mov","ts","mts","m2ts","wmv","flv","h264","h265","hevc","264","265"} "mkv","avi","mov","ts","mts","m2ts","wmv","flv","h264","h265","hevc","264","265","m4v"}
IMAGE_EXT = {"jpg","jpeg","png","bmp","tiff","tif","webp","gif"} IMAGE_EXT = {"jpg","jpeg","png","bmp","tiff","tif","webp","gif"}
VIDEO_EXT = {"mp4","mkv","avi","mov","webm","ts","mts","m2ts","wmv","flv","h264","h265","hevc","264","265","m4v","3gp","rm","rmvb"}
_DEFAULT_SETTINGS = { _DEFAULT_SETTINGS = {
"stt_ollama_model":"","ocr_ollama_model":"granite3.2-vision:latest", "stt_ollama_model":"","ocr_ollama_model":"granite3.2-vision:latest",
"cpu_threads":0,"stt_timeout":0,"ollama_timeout":600, "cpu_threads":0,"stt_timeout":0,"ollama_timeout":600,"subtitle_timeout":600,
"openrouter_url":"https://openrouter.ai/api/v1", "openrouter_url":"https://openrouter.ai/api/v1",
"openrouter_api_key":"","openrouter_stt_model":"","openrouter_ocr_model":"", "openrouter_api_key":"","openrouter_stt_model":"","openrouter_ocr_model":"",
"groq_api_key":"","openai_api_key":"","default_stt_engine":"local",
} }
_hist_lock = threading.Lock() _hist_lock = threading.Lock()
# ── 설정 I/O ───────────────────────────────────────────────── # ── 설정 I/O ─────────────────────────────────────────────────
def _load_settings() -> dict: def _load_settings() -> dict:
if not SETTINGS_FILE.exists(): return dict(_DEFAULT_SETTINGS) if not SETTINGS_FILE.exists(): return dict(_DEFAULT_SETTINGS)
with open(SETTINGS_FILE,"r",encoding="utf-8") as f: data=json.load(f) with open(SETTINGS_FILE,"r",encoding="utf-8") as f: data=json.load(f)
@@ -54,8 +54,15 @@ def _save_settings(data:dict):
SETTINGS_FILE.parent.mkdir(parents=True,exist_ok=True) SETTINGS_FILE.parent.mkdir(parents=True,exist_ok=True)
with open(SETTINGS_FILE,"w",encoding="utf-8") as f: json.dump(data,f,ensure_ascii=False,indent=2) with open(SETTINGS_FILE,"w",encoding="utf-8") as f: json.dump(data,f,ensure_ascii=False,indent=2)
def _mask(key:str)->str:
if not key: return ""
return key[:6]+"..."+(key[-4:] if len(key)>10 else "")
# ── 이력 I/O ───────────────────────────────────────────────── def _keep(new_val:str, field:str, current:dict)->str:
return new_val.strip() if new_val.strip() else current.get(field,"")
# ── 이력 I/O ──────────────────────────────────────────────────
def _load_history()->list: def _load_history()->list:
with _hist_lock: with _hist_lock:
if not HISTORY_FILE.exists(): return [] if not HISTORY_FILE.exists(): return []
@@ -83,10 +90,10 @@ def _update_history_by_task(task_id:str, result:dict, success:bool, error_msg:st
with open(HISTORY_FILE,"r",encoding="utf-8") as f: history=json.load(f) with open(HISTORY_FILE,"r",encoding="utf-8") as f: history=json.load(f)
for h in history: for h in history:
if h.get("task_id")!=task_id: continue if h.get("task_id")!=task_id: continue
if h.get("status")!="processing": break if h.get("status") not in ("processing","cancelled"): break
h["status"]="failed" if not success else "success" h["status"]="failed" if not success else "success"
if not success: if not success:
h["output"]={"error":error_msg[:300]} h["output"]={"error":error_msg[:500]}
elif h["type"]=="stt": elif h["type"]=="stt":
text=result.get("text","") text=result.get("text","")
h["output"]={ h["output"]={
@@ -99,14 +106,17 @@ def _update_history_by_task(task_id:str, result:dict, success:bool, error_msg:st
"ollama_model":result.get("ollama_model",""), "ollama_model":result.get("ollama_model",""),
"openrouter_used":result.get("openrouter_used",False), "openrouter_used":result.get("openrouter_used",False),
"openrouter_model":result.get("openrouter_model",""), "openrouter_model":result.get("openrouter_model",""),
"stt_engine":result.get("stt_engine","local"),
} }
elif h["type"]=="subtitle": elif h["type"]=="subtitle":
h["output"]={ h["output"]={
"detected_language":result.get("detected_language",""), "detected_language":result.get("detected_language",""),
"duration_s":result.get("duration",0), "duration_s":result.get("duration",0),
"segment_count":result.get("segment_count",0), "segment_count":result.get("segment_count",0),
"stt_engine":result.get("stt_engine","local"),
"translated":result.get("translated",False), "translated":result.get("translated",False),
"translate_to":result.get("translate_to",""), "translate_to":result.get("translate_to",""),
"refine_model":result.get("refine_model",""),
"srt_orig":result.get("srt_orig",""), "srt_orig":result.get("srt_orig",""),
"vtt_orig":result.get("vtt_orig",""), "vtt_orig":result.get("vtt_orig",""),
"srt_trans":result.get("srt_trans",""), "srt_trans":result.get("srt_trans",""),
@@ -163,7 +173,7 @@ def login(username:str=Form(...),password:str=Form(...)):
@app.get("/api/me")
def me(user: dict = Depends(require_auth)):
    """Return the caller's username, role and permission flags.

    The role defaults to "user" and the permissions default to all-False
    flags for stt, ocr and subtitle when the user record lacks them.
    """
    no_permissions = {"stt": False, "ocr": False, "subtitle": False}
    return {
        "username": user["username"],
        "role": user.get("role", "user"),
        "permissions": user.get("permissions", no_permissions),
    }
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
@@ -180,17 +190,72 @@ def system_info(user:dict=Depends(require_auth)):
"cpu_percent":psutil.cpu_percent(interval=0.3), "cpu_percent":psutil.cpu_percent(interval=0.3),
"cpu_threads_setting":s.get("cpu_threads",0), "cpu_threads_setting":s.get("cpu_threads",0),
"stt_timeout":s.get("stt_timeout",0),"ollama_timeout":s.get("ollama_timeout",600), "stt_timeout":s.get("stt_timeout",0),"ollama_timeout":s.get("ollama_timeout",600),
"subtitle_timeout":s.get("subtitle_timeout",600),
}
@app.get("/api/stt-engines")
def stt_engines(user: dict = Depends(require_auth)):
    """List the STT engines and whether their API keys are configured.

    "key_set" reflects only whether a non-empty key is stored in settings;
    the key itself is never returned. "default" is the configured default
    engine, falling back to "local".
    """
    settings = _load_settings()
    groq_key_set = bool(settings.get("groq_api_key", ""))
    openai_key_set = bool(settings.get("openai_api_key", ""))
    return {
        "local": {"available": True},
        "groq": {"available": True, "key_set": groq_key_set},
        "openai": {"available": True, "key_set": openai_key_set},
        "default": settings.get("default_stt_engine", "local"),
    }
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
# STT 단일 / 배치 # 작업 상태 / 취소
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
@app.get("/api/status/{task_id}")
def get_status(task_id: str, user: dict = Depends(require_auth)):
    """Report the current state of a Celery task.

    Args:
        task_id: Celery task id returned when the job was submitted.
        user: Authenticated caller (injected by FastAPI).

    Returns:
        A dict with a lowercase "state" and "progress", plus state-specific
        fields: progress metadata while running, the task result on success,
        and a message on failure or cancellation. Terminal success/failure
        is also recorded in the history file via _update_history_by_task.
    """
    r = celery_app.AsyncResult(task_id)
    # Read the state once. Re-reading r.state for every comparison may hit
    # the result backend each time, and the task could change state between
    # two checks, making the request fall through to the generic branch.
    state = r.state
    if state == "PENDING":
        return {"state": "pending", "progress": 0, "message": "대기 중..."}
    if state == "PROGRESS":
        info = r.info
        # If the task finished/failed after the state snapshot, info may no
        # longer be the progress dict; fall back to defaults in that case.
        m = info if isinstance(info, dict) else {}
        return {"state": "progress", "progress": m.get("progress", 0),
                "step": m.get("step", 0), "step_msg": m.get("step_msg", ""),
                "message": m.get("message", "처리 중...")}
    if state == "SUCCESS":
        result = r.result or {}
        _update_history_by_task(task_id, result, True)
        return {"state": "success", "progress": 100, **result}
    if state == "FAILURE":
        error_msg = str(r.info)
        _update_history_by_task(task_id, {}, False, error_msg)
        return {"state": "failure", "progress": 0, "message": error_msg}
    if state == "REVOKED":
        return {"state": "cancelled", "progress": 0, "message": "작업이 취소되었습니다"}
    return {"state": state.lower(), "progress": 0}
@app.post("/api/cancel/{task_id}")
def cancel_task(task_id: str, user: dict = Depends(require_auth)):
    """Cancel a task (Celery revoke) and mark its history entry as cancelled.

    Non-admin callers may only cancel tasks whose history entry records them
    as the owner. Tasks with no history entry, or entries without a
    username, are still cancellable so existing clients keep working.

    Args:
        task_id: Celery task id to revoke.
        user: Authenticated caller (injected by FastAPI).

    Returns:
        {"ok": True, ...} once the revoke request has been sent, or
        {"ok": False, "message": ...} when revoking or updating the history
        failed.

    Raises:
        HTTPException: 403 when a non-admin tries to cancel a task recorded
            under another username.
    """
    def _read_history() -> list:
        # Caller must hold _hist_lock.
        if not HISTORY_FILE.exists():
            return []
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            return json.load(f)

    try:
        if user.get("role") != "admin":
            with _hist_lock:
                history = _read_history()
            for h in history:
                owner = h.get("username")
                if h.get("task_id") == task_id and owner and owner != user["username"]:
                    raise HTTPException(403, "이 작업을 취소할 권한이 없습니다")

        # The lock is not held here so a slow broker cannot block other
        # history readers/writers.
        celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM")

        # Mark the entry as cancelled in the history.
        with _hist_lock:
            history = _read_history()
            for h in history:
                if h.get("task_id") == task_id and h.get("status") == "processing":
                    h["status"] = "cancelled"
                    h["output"] = {"error": "사용자가 취소했습니다"}
                    # Rewrite the file only when an entry actually changed.
                    _write_history(history)
                    break
        return {"ok": True, "message": "취소 요청 전송됨"}
    except HTTPException:
        # Authorization failures must reach the client as real HTTP errors.
        raise
    except Exception as e:
        return {"ok": False, "message": str(e)}
# ════════════════════════════════════════════════════════════════
# STT
# ════════════════════════════════════════════════════════════════
async def _dispatch_stt(request,files,use_ollama,ollama_model,use_openrouter,openrouter_model,
stt_engine,stt_language,user):
s=_load_settings() s=_load_settings()
_uo=use_ollama.lower()=="true"; _uor=use_openrouter.lower()=="true" _uo=use_ollama.lower()=="true"; _uor=use_openrouter.lower()=="true"
if _uo and not ollama_model.strip(): ollama_model=s.get("stt_ollama_model","") if _uo and not ollama_model.strip(): ollama_model=s.get("stt_ollama_model","")
if _uor and not openrouter_model.strip():openrouter_model=s.get("openrouter_stt_model","") if _uor and not openrouter_model.strip():openrouter_model=s.get("openrouter_stt_model","")
if not stt_engine: stt_engine=s.get("default_stt_engine","local")
results=[] results=[]
for file in files: for file in files:
_check_size(request) _check_size(request)
@@ -200,13 +265,18 @@ async def _dispatch_stt(request,files,use_ollama,ollama_model,use_openrouter,ope
file_id=str(uuid.uuid4()) file_id=str(uuid.uuid4())
save_path=os.path.join(UPLOAD_DIR,f"{file_id}.{ext}") save_path=os.path.join(UPLOAD_DIR,f"{file_id}.{ext}")
await _save_upload(file,save_path); file_size=os.path.getsize(save_path) await _save_upload(file,save_path); file_size=os.path.getsize(save_path)
task=transcribe_task.delay(file_id,save_path,_uo,ollama_model,_uor,openrouter_model, task=transcribe_task.delay(
s.get("openrouter_url",""),s.get("openrouter_api_key","")) file_id,save_path,_uo,ollama_model,_uor,openrouter_model,
s.get("openrouter_url",""),s.get("openrouter_api_key",""),
stt_engine,s.get("groq_api_key",""),s.get("openai_api_key",""),stt_language or "",
)
append_history({"id":file_id,"task_id":task.id,"type":"stt","status":"processing", append_history({"id":file_id,"task_id":task.id,"type":"stt","status":"processing",
"timestamp":datetime.now().strftime("%Y-%m-%d %H:%M:%S"),"username":user["username"], "timestamp":datetime.now().strftime("%Y-%m-%d %H:%M:%S"),"username":user["username"],
"input":{"filename":file.filename,"size_bytes":file_size,"format":ext.upper()}, "input":{"filename":file.filename,"size_bytes":file_size,"format":ext.upper()},
"settings":{"model":os.getenv("WHISPER_MODEL","medium"),"language":os.getenv("WHISPER_LANGUAGE","ko"), "settings":{"model":os.getenv("WHISPER_MODEL","medium"),
"compute_type":os.getenv("WHISPER_COMPUTE_TYPE","int8"),"cpu_threads":s.get("cpu_threads",0), "language":stt_language or os.getenv("WHISPER_LANGUAGE","ko"),
"compute_type":os.getenv("WHISPER_COMPUTE_TYPE","int8"),
"cpu_threads":s.get("cpu_threads",0),"stt_engine":stt_engine,
"use_ollama":_uo,"ollama_model":ollama_model if _uo else "", "use_ollama":_uo,"ollama_model":ollama_model if _uo else "",
"use_openrouter":_uor,"openrouter_model":openrouter_model if _uor else ""}, "use_openrouter":_uor,"openrouter_model":openrouter_model if _uor else ""},
"output":None}) "output":None})
@@ -217,80 +287,72 @@ async def _dispatch_stt(request,files,use_ollama,ollama_model,use_openrouter,ope
async def transcribe(
    request: Request,
    file: UploadFile = File(...),
    use_ollama: str = Form("false"),
    ollama_model: str = Form(""),
    use_openrouter: str = Form("false"),
    openrouter_model: str = Form(""),
    stt_engine: str = Form(""),
    stt_language: str = Form(""),
    user: dict = Depends(require_stt),
):
    """Queue a single file for transcription and return its dispatch record.

    The upload is handed to _dispatch_stt as a one-element list; the first
    (only) entry of the returned list is the response.
    """
    dispatched = await _dispatch_stt(
        request=request,
        files=[file],
        use_ollama=use_ollama,
        ollama_model=ollama_model,
        use_openrouter=use_openrouter,
        openrouter_model=openrouter_model,
        stt_engine=stt_engine,
        stt_language=stt_language,
        user=user,
    )
    return dispatched[0]
@app.post("/api/transcribe/batch")
async def transcribe_batch(
    request: Request,
    files: List[UploadFile] = File(...),
    use_ollama: str = Form("false"),
    ollama_model: str = Form(""),
    use_openrouter: str = Form("false"),
    openrouter_model: str = Form(""),
    stt_engine: str = Form(""),
    stt_language: str = Form(""),
    user: dict = Depends(require_stt),
):
    """Queue up to 20 files for transcription in one request.

    Returns:
        {"items": [...], "total": n} with one dispatch record per file.

    Raises:
        HTTPException: 400 when no files are supplied or more than 20 are.
    """
    if not files:
        raise HTTPException(400, "파일이 없습니다")
    if len(files) > 20:
        raise HTTPException(400, "최대 20개까지")
    dispatched = await _dispatch_stt(
        request=request,
        files=files,
        use_ollama=use_ollama,
        ollama_model=ollama_model,
        use_openrouter=use_openrouter,
        openrouter_model=openrouter_model,
        stt_engine=stt_engine,
        stt_language=stt_language,
        user=user,
    )
    return {"items": dispatched, "total": len(dispatched)}
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
# 자막 파이프라인 (영상 → SRT/VTT) # 자막
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
@app.post("/api/subtitle")
async def create_subtitle(
    request: Request,
    file: UploadFile = File(...),
    src_language: str = Form(""),
    subtitle_fmt: str = Form("srt"),
    stt_engine: str = Form("local"),
    refine_model: str = Form(""),
    refine_via: str = Form("ollama"),
    translate_to: str = Form(""),
    trans_model: str = Form(""),
    trans_via: str = Form("ollama"),
    user: dict = Depends(require_subtitle),
):
    """Accept a media upload and queue the subtitle pipeline for it.

    An unknown subtitle_fmt falls back to "srt". A blank stt_engine,
    refine_model or trans_model is filled in from the saved settings; the
    model fallback depends on whether the matching *_via is "openrouter".

    Returns:
        {"task_id", "file_id", "filename"} for the queued job.

    Raises:
        HTTPException: 400 when the file extension is not in AUDIO_EXT.
    """
    _check_size(request)
    ext = _ext(file.filename)
    if ext not in AUDIO_EXT:
        raise HTTPException(400, "지원하지 않는 형식입니다")
    if subtitle_fmt not in ("srt", "vtt", "both"):
        subtitle_fmt = "srt"

    settings = _load_settings()

    def fallback_model(via: str) -> str:
        # Both refinement and translation reuse the STT post-processing
        # model configured for the chosen backend.
        key = "openrouter_stt_model" if via == "openrouter" else "stt_ollama_model"
        return settings.get(key, "")

    stt_engine = stt_engine or settings.get("default_stt_engine", "local")
    if not refine_model.strip():
        refine_model = fallback_model(refine_via)
    if not trans_model.strip():
        trans_model = fallback_model(trans_via)

    file_id = str(uuid.uuid4())
    save_path = os.path.join(UPLOAD_DIR, f"{file_id}.{ext}")
    await _save_upload(file, save_path)
    file_size = os.path.getsize(save_path)
    subtitle_timeout = int(settings.get("subtitle_timeout", 600))

    task = subtitle_pipeline_task.delay(
        file_id, save_path, src_language, subtitle_fmt,
        stt_engine, settings.get("groq_api_key", ""), settings.get("openai_api_key", ""),
        refine_model, refine_via, translate_to, trans_model, trans_via,
        settings.get("openrouter_url", ""), settings.get("openrouter_api_key", ""),
        subtitle_timeout,
    )

    history_entry = {
        "id": file_id,
        "task_id": task.id,
        "type": "subtitle",
        "status": "processing",
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "username": user["username"],
        "input": {
            "filename": file.filename,
            "size_bytes": file_size,
            "format": ext.upper(),
        },
        "settings": {
            "src_language": src_language or "auto",
            "subtitle_fmt": subtitle_fmt,
            "stt_engine": stt_engine,
            "refine_model": refine_model,
            "refine_via": refine_via,
            "translate_to": translate_to,
            "trans_model": trans_model,
            "trans_via": trans_via,
            "subtitle_timeout": subtitle_timeout,
        },
        "output": None,
    }
    append_history(history_entry)
    return {"task_id": task.id, "file_id": file_id, "filename": file.filename}
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
# OCR 단일 / 배치 # OCR
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
async def _dispatch_ocr(request,files,mode,backend,ollama_model,openrouter_model,custom_prompt,user): async def _dispatch_ocr(request,files,mode,backend,ollama_model,openrouter_model,custom_prompt,user):
if mode not in ("text","structure"): mode="text" if mode not in ("text","structure"): mode="text"
@@ -315,7 +377,7 @@ async def _dispatch_ocr(request,files,mode,backend,ollama_model,openrouter_model
"settings":{"backend":backend,"mode":mode,"ocr_lang":os.getenv("OCR_LANG","korean"), "settings":{"backend":backend,"mode":mode,"ocr_lang":os.getenv("OCR_LANG","korean"),
"ollama_model":ollama_model if backend=="ollama" else "", "ollama_model":ollama_model if backend=="ollama" else "",
"openrouter_model":openrouter_model if backend=="openrouter" else "", "openrouter_model":openrouter_model if backend=="openrouter" else "",
"ollama_timeout":s.get("ollama_timeout",600),"custom_prompt":custom_prompt[:200] if custom_prompt else ""}, "custom_prompt":custom_prompt[:200] if custom_prompt else ""},
"output":None}) "output":None})
results.append({"task_id":task.id,"file_id":file_id,"filename":file.filename}) results.append({"task_id":task.id,"file_id":file_id,"filename":file.filename})
return results return results
@@ -340,17 +402,8 @@ async def ocr_batch(request:Request,files:List[UploadFile]=File(...),
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
# 상태 / 이력 / 다운로드 / Ollama / OpenRouter / 설정 / 관리자 # 이력
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
@app.get("/api/status/{task_id}")
def get_status(task_id:str,user:dict=Depends(require_auth)):
r=celery_app.AsyncResult(task_id)
if r.state=="PENDING": return {"state":"pending","progress":0,"message":"대기 중..."}
if r.state=="PROGRESS": m=r.info or {};return {"state":"progress","progress":m.get("progress",0),"step":m.get("step",0),"step_msg":m.get("step_msg",""),"message":m.get("message","처리 중...")}
if r.state=="SUCCESS": _update_history_by_task(task_id,r.result or {},True);return {"state":"success","progress":100,**(r.result or {})}
if r.state=="FAILURE": _update_history_by_task(task_id,{},False,str(r.info));return {"state":"failure","progress":0,"message":str(r.info)}
return {"state":r.state.lower(),"progress":0}
@app.get("/api/history") @app.get("/api/history")
def get_history(page:int=1,per_page:int=15,type_:str="",user:dict=Depends(require_auth)): def get_history(page:int=1,per_page:int=15,type_:str="",user:dict=Depends(require_auth)):
history=_load_history() history=_load_history()
@@ -366,8 +419,12 @@ def delete_history(history_id:str,user:dict=Depends(require_auth)):
@app.delete("/api/history")
def clear_all_history(user: dict = Depends(require_admin)):
    """Admin-only endpoint: clear the history via clear_history()."""
    clear_history()
    response = {"ok": True}
    return response
# ════════════════════════════════════════════════════════════════
# 다운로드 / Ollama / OpenRouter / 설정 / 관리자
# ════════════════════════════════════════════════════════════════
@app.get("/api/download/{filename}") @app.get("/api/download/{filename}")
def download(filename:str,user:dict=Depends(require_auth)): def download(filename:str,user:dict=Depends(require_auth)):
if ".." in filename or "/" in filename: raise HTTPException(400,"잘못된 파일명") if ".." in filename or "/" in filename: raise HTTPException(400,"잘못된 파일명")
@@ -382,79 +439,102 @@ def download(filename:str,user:dict=Depends(require_auth)):
@app.get("/api/ollama/models")
def ollama_models(user: dict = Depends(require_auth)):
    """List the model names reported by the Ollama server's /api/tags.

    Any failure (connection, HTTP status, unexpected payload) is reported as
    connected=False with the error text instead of raising.
    """
    try:
        response = httpx.get(f"{OLLAMA_URL}/api/tags", timeout=8.0)
        response.raise_for_status()
        names = [entry["name"] for entry in response.json().get("models", [])]
    except Exception as exc:
        return {"models": [], "connected": False, "error": str(exc)}
    return {"models": names, "connected": True}
@app.get("/api/openrouter/models")
def openrouter_models(user: dict = Depends(require_auth)):
    """List OpenRouter models, split into vision-capable and text-only ids.

    A model counts as "vision" when its id contains one of a fixed set of
    keyword hints; every other id is reported as a text model.

    Returns:
        On success: models, vision_models, text_models, connected=True and
        total. On any failure: the same three lists empty, connected=False
        and an "error" message (nothing is raised).
    """
    def _failure(message: str) -> dict:
        # Single definition of the error payload so all failure paths agree.
        return {"models": [], "vision_models": [], "text_models": [],
                "connected": False, "error": message}

    s = _load_settings()
    api_key = s.get("openrouter_api_key", "")
    base_url = s.get("openrouter_url", "https://openrouter.ai/api/v1").rstrip("/")
    if not api_key:
        return _failure("API 키가 설정되지 않았습니다")

    vision_hints = ("vision", "claude-3", "gemini", "gpt-4o", "llava",
                    "pixtral", "qwen-vl", "deepseek-vl")
    try:
        resp = httpx.get(
            f"{base_url}/models",
            headers={"Authorization": f"Bearer {api_key}",
                     "HTTP-Referer": "https://voicescript.local"},
            timeout=12.0,
        )
        resp.raise_for_status()
        all_models = resp.json().get("data", [])
        ids = [m["id"] for m in all_models]
        vision = []
        for model_id in ids:
            lowered = model_id.lower()
            if any(hint in lowered for hint in vision_hints):
                vision.append(model_id)
        # Set lookup keeps the text/vision split linear in the model count.
        vision_ids = set(vision)
        text = [model_id for model_id in ids if model_id not in vision_ids]
        return {"models": ids, "vision_models": vision, "text_models": text,
                "connected": True, "total": len(all_models)}
    except httpx.HTTPStatusError as e:
        return _failure(f"HTTP {e.response.status_code}")
    except Exception as e:
        return _failure(str(e))
@app.post("/api/openrouter/test")
def openrouter_test(
    api_key: str = Form(...),
    base_url: str = Form("https://openrouter.ai/api/v1"),
    user: dict = Depends(require_auth),
):
    """Check an OpenRouter key by requesting the model list from *base_url*.

    Returns:
        {"ok": True, ...} with the number of models on success, otherwise
        {"ok": False, ...} with an HTTP-status or connection error message.
    """
    endpoint = f"{base_url.rstrip('/')}/models"
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://voicescript.local",
    }
    try:
        response = httpx.get(endpoint, headers=request_headers, timeout=10.0)
        response.raise_for_status()
        count = len(response.json().get("data", []))
        return {"ok": True, "message": f"연결 성공 — {count}개 모델 사용 가능"}
    except httpx.HTTPStatusError as exc:
        return {"ok": False, "message": f"인증 실패 (HTTP {exc.response.status_code})"}
    except Exception as exc:
        return {"ok": False, "message": f"연결 실패: {str(exc)}"}
@app.get("/api/settings")
def get_settings(user:dict=Depends(require_auth)):
    """Return the stored settings with every API key blanked out.

    For each of the three key fields a ``<field>_masked`` entry is added
    (produced by ``_mask``) and the raw value is replaced with an empty
    string, so the secret itself is never sent to the client.
    """
    result=dict(_load_settings())
    for field in ("openrouter_api_key","groq_api_key","openai_api_key"):
        result[field+"_masked"]=_mask(result.get(field,""))
        result[field]=""
    return result
@app.post("/api/settings")
def save_settings_endpoint(
    stt_ollama_model:str=Form(""),ocr_ollama_model:str=Form(""),
    cpu_threads:str=Form("0"),stt_timeout:str=Form("0"),
    ollama_timeout:str=Form("600"),subtitle_timeout:str=Form("600"),
    openrouter_url:str=Form("https://openrouter.ai/api/v1"),
    openrouter_api_key:str=Form(""),openrouter_stt_model:str=Form(""),openrouter_ocr_model:str=Form(""),
    groq_api_key:str=Form(""),openai_api_key:str=Form(""),
    default_stt_engine:str=Form("local"),
    user:dict=Depends(require_auth),
):
    """Persist the settings form and echo it back without raw API keys.

    Numeric fields arrive as strings and are coerced to non-negative ints,
    falling back to a per-field default when they cannot be parsed. API-key
    fields go through ``_keep`` together with the currently stored settings
    (presumably so a blank submission keeps the stored key — confirm against
    ``_keep``). The response omits every ``*_api_key`` value and carries a
    ``*_masked`` variant instead.
    """
    def _int(v,d):
        # Only parse failures fall back to the default; anything else is a bug.
        try:
            return max(0,int(v))
        except (TypeError,ValueError):
            return d

    current=_load_settings()
    data={
        "stt_ollama_model":stt_ollama_model,"ocr_ollama_model":ocr_ollama_model,
        "cpu_threads":_int(cpu_threads,0),"stt_timeout":_int(stt_timeout,0),
        "ollama_timeout":_int(ollama_timeout,600),"subtitle_timeout":_int(subtitle_timeout,600),
        "openrouter_url":openrouter_url.strip() or "https://openrouter.ai/api/v1",
        "openrouter_api_key":_keep(openrouter_api_key,"openrouter_api_key",current),
        "openrouter_stt_model":openrouter_stt_model,"openrouter_ocr_model":openrouter_ocr_model,
        "groq_api_key":_keep(groq_api_key,"groq_api_key",current),
        "openai_api_key":_keep(openai_api_key,"openai_api_key",current),
        "default_stt_engine":default_stt_engine or "local",
    }
    _save_settings(data)

    # Never echo secrets: drop the raw keys, return masked variants instead.
    result={k:v for k,v in data.items() if not k.endswith("_api_key")}
    for f in ("openrouter_api_key","groq_api_key","openai_api_key"):
        result[f+"_masked"]=_mask(data.get(f,""))
    return {"ok":True,"settings":result}
@app.get("/api/admin/users")
def admin_list_users(user:dict=Depends(require_admin)):
    """Return the result of ``list_users()`` under the ``users`` key (admin only)."""
    return {"users":list_users()}
@app.post("/api/admin/users")
def admin_create_user(username:str=Form(...),password:str=Form(...),
    perm_stt:str=Form("false"),perm_ocr:str=Form("false"),perm_subtitle:str=Form("false"),
    allowed_stt_models:str=Form(""),allowed_ocr_models:str=Form(""),
    user:dict=Depends(require_admin)):
    """Create a user from the admin form.

    Permission flags are the strings ``"true"``/``"false"`` (case-insensitive;
    anything other than ``"true"`` is treated as false). The model allow-lists
    are comma-separated strings. Raises ``HTTPException(400)`` carrying the
    message from ``create_user`` when creation is rejected.
    """
    def _p(s):
        # "a, b,,c" -> ["a", "b", "c"]
        return [m.strip() for m in s.split(",") if m.strip()]

    perms={
        "stt":perm_stt.lower()=="true",
        "ocr":perm_ocr.lower()=="true",
        "subtitle":perm_subtitle.lower()=="true",
        "allowed_stt_models":_p(allowed_stt_models),
        "allowed_ocr_models":_p(allowed_ocr_models),
    }
    ok,msg=create_user(username,password,perms)
    if not ok:
        raise HTTPException(400,msg)
    return {"ok":True,"message":msg}
@app.put("/api/admin/users/{username}") @app.put("/api/admin/users/{username}")
def admin_update_user(username:str,perm_stt:str=Form("false"),perm_ocr:str=Form("false"), def admin_update_user(username:str,
password:str=Form(""),allowed_stt_models:str=Form(""),allowed_ocr_models:str=Form(""),user:dict=Depends(require_admin)): perm_stt:str=Form("false"),perm_ocr:str=Form("false"),perm_subtitle:str=Form("false"),
password:str=Form(""),allowed_stt_models:str=Form(""),allowed_ocr_models:str=Form(""),
user:dict=Depends(require_admin)):
def _p(s): return [m.strip() for m in s.split(",") if m.strip()] def _p(s): return [m.strip() for m in s.split(",") if m.strip()]
perms={"stt":perm_stt.lower()=="true","ocr":perm_ocr.lower()=="true", perms={"stt":perm_stt.lower()=="true","ocr":perm_ocr.lower()=="true",
"subtitle":perm_subtitle.lower()=="true",
"allowed_stt_models":_p(allowed_stt_models),"allowed_ocr_models":_p(allowed_ocr_models)} "allowed_stt_models":_p(allowed_stt_models),"allowed_ocr_models":_p(allowed_ocr_models)}
ok,msg=update_user(username,perms,password or None) ok,msg=update_user(username,perms,password or None)
if not ok: raise HTTPException(400,msg) if not ok: raise HTTPException(400,msg)
@@ -469,6 +549,17 @@ def admin_delete_user(username:str,user:dict=Depends(require_admin)):
@app.post("/api/cleanup")
def cleanup(user:dict=Depends(require_auth)):
    """Run ``_cleanup_outputs()`` and report its return value as ``removed``."""
    return {"removed":_cleanup_outputs()}
@app.get("/")
async def index():
    """Serve ``static/index.html`` with response headers that disable caching."""
    import pathlib
    # Relative path: resolved against the process working directory.
    path=pathlib.Path("static/index.html")
    resp=FileResponse(path,media_type="text/html")
    resp.headers["Cache-Control"]="no-cache, no-store, must-revalidate"
    resp.headers["Pragma"]="no-cache"
    resp.headers["Expires"]="0"
    return resp
app.mount("/",StaticFiles(directory="static",html=True),name="static")
# ════════════════════════════════════════════════════════════════ # ════════════════════════════════════════════════════════════════
# 유틸 # 유틸
@@ -479,10 +570,10 @@ def _check_size(request:Request):
def _cleanup_outputs():
    """Delete entries in ``OUTPUT_DIR`` older than ``OUTPUT_KEEP_SECS``.

    Returns the number of entries removed. ``OUTPUT_KEEP_SECS == 0`` disables
    cleanup entirely. Entries that cannot be inspected or removed (vanished,
    permission denied, a directory) are skipped.
    """
    if OUTPUT_KEEP_SECS==0:
        return 0
    cutoff=time.time()-OUTPUT_KEEP_SECS
    removed=0
    for f in glob.glob(os.path.join(OUTPUT_DIR,"*")):
        try:
            if os.path.getmtime(f)<cutoff:
                os.remove(f)
                removed+=1
        except OSError:
            # Best effort: one bad entry must not abort the whole sweep.
            continue
    return removed
@@ -494,8 +585,6 @@ async def _save_upload(file:UploadFile,path:str):
while chunk:=await file.read(1024*1024): while chunk:=await file.read(1024*1024):
written+=len(chunk) written+=len(chunk)
if written>MAX_UPLOAD_BYTES: if written>MAX_UPLOAD_BYTES:
await f.close();os.remove(path) await f.close(); os.remove(path)
raise HTTPException(413,f"파일이 너무 큽니다. 최대 {MAX_UPLOAD_BYTES//1024//1024}MB") raise HTTPException(413,f"파일이 너무 큽니다. 최대 {MAX_UPLOAD_BYTES//1024//1024}MB")
await f.write(chunk) await f.write(chunk)
app.mount("/", StaticFiles(directory="static", html=True), name="static")

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +1,5 @@
""" """
STT + Subtitle Pipeline Celery Tasks STT + Subtitle Pipeline Celery Tasks
subtitle_pipeline_task:
Step 1: ffmpeg → 16kHz WAV 추출
Step 2: Whisper → 원어 SRT / VTT 생성
Step 3: LLM → 번역 SRT / VTT 생성 (선택)
""" """
import os, json, subprocess, tempfile import os, json, subprocess, tempfile
import httpx import httpx
@@ -21,6 +16,8 @@ INITIAL_PROMPT = os.getenv("WHISPER_INITIAL_PROMPT", "") or None
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "/data/outputs") OUTPUT_DIR = os.getenv("OUTPUT_DIR", "/data/outputs")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.0.126:11434") OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.0.126:11434")
OLLAMA_TIMEOUT = int(os.getenv("OLLAMA_TIMEOUT", "600")) OLLAMA_TIMEOUT = int(os.getenv("OLLAMA_TIMEOUT", "600"))
GROQ_BASE = "https://api.groq.com/openai/v1"
OPENAI_BASE = "https://api.openai.com/v1"
_cpu_threads_env = int(os.getenv("CPU_THREADS", "0")) _cpu_threads_env = int(os.getenv("CPU_THREADS", "0"))
CPU_THREADS = _cpu_threads_env if _cpu_threads_env > 0 else None CPU_THREADS = _cpu_threads_env if _cpu_threads_env > 0 else None
@@ -28,7 +25,7 @@ CPU_THREADS = _cpu_threads_env if _cpu_threads_env > 0 else None
celery_app = Celery("whisper_tasks", broker=REDIS_URL, backend=REDIS_URL) celery_app = Celery("whisper_tasks", broker=REDIS_URL, backend=REDIS_URL)
celery_app.conf.update( celery_app.conf.update(
task_serializer="json", result_serializer="json", task_serializer="json", result_serializer="json",
accept_content=["json"], task_track_started=True, result_expires=3600, accept_content=["json"], task_track_started=True, result_expires=86400,
) )
_whisper_model = None _whisper_model = None
@@ -39,14 +36,14 @@ def get_model():
from faster_whisper import WhisperModel from faster_whisper import WhisperModel
kwargs = dict(device=DEVICE, compute_type=COMPUTE_TYPE) kwargs = dict(device=DEVICE, compute_type=COMPUTE_TYPE)
if CPU_THREADS is not None: kwargs["cpu_threads"] = CPU_THREADS if CPU_THREADS is not None: kwargs["cpu_threads"] = CPU_THREADS
print(f"[Whisper] 로딩: {MODEL_SIZE}/{DEVICE}/{COMPUTE_TYPE}/threads={CPU_THREADS or 'auto'}") print(f"[Whisper] 로딩: {MODEL_SIZE}/{DEVICE}/{COMPUTE_TYPE}")
_whisper_model = WhisperModel(MODEL_SIZE, **kwargs) _whisper_model = WhisperModel(MODEL_SIZE, **kwargs)
print("[Whisper] 로드 완료") print("[Whisper] 완료")
return _whisper_model return _whisper_model
# ══════════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════════
# 언어 코드 → 표시명 # 공통 유틸
# ══════════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════════
LANG_NAMES = { LANG_NAMES = {
"ko":"한국어","en":"English","ja":"日本語","zh":"中文(简体)", "ko":"한국어","en":"English","ja":"日本語","zh":"中文(简体)",
@@ -54,361 +51,404 @@ LANG_NAMES = {
"it":"Italiano","pt":"Português","ru":"Русский","ar":"العربية", "it":"Italiano","pt":"Português","ru":"Русский","ar":"العربية",
"vi":"Tiếng Việt","th":"ไทย","id":"Bahasa Indonesia", "vi":"Tiếng Việt","th":"ไทย","id":"Bahasa Indonesia",
"nl":"Nederlands","pl":"Polski","tr":"Türkçe","sv":"Svenska", "nl":"Nederlands","pl":"Polski","tr":"Türkçe","sv":"Svenska",
"uk":"Українська","hi":"हिन्दी","bn":"বাংলা", "uk":"Українська","hi":"हिन्दी",
} }
def _lang_name(code):
    """Return the display name for a language code, or the code itself if unknown."""
    return LANG_NAMES.get(code, code)
def _srt_time(s):
# ══════════════════════════════════════════════════════════════ ms=int(round(s*1000)); h,r=divmod(ms,3600000); m,r=divmod(r,60000); sec,ms=divmod(r,1000)
# 자막 포맷 생성
# ══════════════════════════════════════════════════════════════
def _srt_time(s: float) -> str:
ms = int(round(s * 1000))
h, r = divmod(ms, 3600000); m, r = divmod(r, 60000); sec, ms = divmod(r, 1000)
return f"{h:02d}:{m:02d}:{sec:02d},{ms:03d}" return f"{h:02d}:{m:02d}:{sec:02d},{ms:03d}"
def _vtt_time(s):
    """Format seconds as a WebVTT timestamp: the SRT form with '.' instead of ','."""
    return _srt_time(s).replace(",", ".")
def make_srt(segments):
    """Render segments as SRT text.

    Each segment is a dict with ``start``/``end`` (seconds) and ``text``.
    Cues are numbered from 1 and each is followed by a blank line.
    """
    out = []
    for i, seg in enumerate(segments, 1):
        out += [
            str(i),
            f"{_srt_time(seg['start'])} --> {_srt_time(seg['end'])}",
            seg["text"].strip(),
            "",
        ]
    return "\n".join(out)
def make_vtt(segments):
    """Render segments as WebVTT text.

    Output starts with the ``WEBVTT`` header line and a blank line; each
    segment (dict with ``start``/``end`` seconds and ``text``) becomes a
    numbered cue followed by a blank line.
    """
    out = ["WEBVTT", ""]
    for i, seg in enumerate(segments, 1):
        out += [
            str(i),
            f"{_vtt_time(seg['start'])} --> {_vtt_time(seg['end'])}",
            seg["text"].strip(),
            "",
        ]
    return "\n".join(out)
def _llm_call(prompt, model, use_openrouter, openrouter_url, openrouter_key, timeout):
"""LLM 호출 — 명확한 에러 메시지 포함"""
if use_openrouter:
if not openrouter_key:
raise Exception("OpenRouter API 키가 설정되지 않았습니다. 설정 → OpenRouter에서 입력하세요.")
try:
resp = httpx.post(
f"{openrouter_url.rstrip('/')}/chat/completions",
headers={"Authorization":f"Bearer {openrouter_key}",
"HTTP-Referer":"https://voicescript.local","Content-Type":"application/json"},
json={"model":model,"messages":[{"role":"user","content":prompt}],"temperature":0.2},
timeout=float(timeout),
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"].strip()
except httpx.TimeoutException:
raise Exception(f"OpenRouter 응답 시간 초과 ({timeout}초). 설정에서 타임아웃을 늘리거나 모델을 변경하세요.")
except httpx.HTTPStatusError as e:
raise Exception(f"OpenRouter 오류 ({e.response.status_code}): API 키 또는 모델을 확인하세요.")
else:
try:
resp = httpx.post(f"{OLLAMA_URL}/api/chat",
json={"model":model,"messages":[{"role":"user","content":prompt}],
"stream":False,"options":{"temperature":0.2}},
timeout=float(timeout))
resp.raise_for_status()
result = resp.json().get("message",{}).get("content","").strip()
if not result:
raise Exception(f"Ollama({model}) 빈 응답. 모델이 실행 중인지 확인: ollama list")
return result
except httpx.ConnectError:
raise Exception(f"Ollama 서버 연결 실패 ({OLLAMA_URL}). 서버가 실행 중인지 확인하세요.")
except httpx.TimeoutException:
raise Exception(
f"Ollama({model}) 응답 시간 초과 ({timeout}초).\n"
f"원인: 모델 로딩 중이거나, 시스템 리소스 부족, 또는 모델이 응답하지 않음.\n"
f"해결: 설정에서 Ollama 타임아웃을 늘리거나, 더 작은 모델을 사용하세요."
)
# ══════════════════════════════════════════════════════════════ def _translate_batch(texts, target_lang, use_or, model, or_url, or_key, timeout):
# LLM 번역 (세그먼트 배치)
# ══════════════════════════════════════════════════════════════
def _translate_batch(texts: list, target_lang: str,
use_openrouter: bool, model: str,
openrouter_url: str, openrouter_key: str) -> list:
"""texts 리스트 → 번역된 texts 리스트"""
if not texts or not model: return texts if not texts or not model: return texts
lang_name = _lang_name(target_lang)
prompt = ( prompt = (
f"아래 자막 문장 배열을 {lang_name}로 번역해줘.\n" f"아래 자막 문장 배열을 {_lang_name(target_lang)}로 번역해줘.\n"
f"반드시 JSON 문자열 배열로만 답해. 설명·마크다운 없이 배열만 출력.\n" f"반드시 JSON 문자열 배열로만 답해. 설명·마크다운 없이 배열만 출력.\n"
f"입력과 동일한 개수와 순서를 유지해.\n\n" f"입력과 동일한 개수와 순서를 유지해.\n\n"
f"{json.dumps(texts, ensure_ascii=False)}" f"{json.dumps(texts, ensure_ascii=False)}"
) )
try: try:
if use_openrouter and openrouter_key: raw = _llm_call(prompt, model, use_or, or_url, or_key, timeout)
resp = httpx.post( if "```" in raw: raw=raw.split("```")[1].lstrip("json\n").rstrip()
f"{openrouter_url.rstrip('/')}/chat/completions",
headers={"Authorization": f"Bearer {openrouter_key}",
"HTTP-Referer": "https://voicescript.local",
"Content-Type": "application/json"},
json={"model": model,
"messages": [{"role":"user","content":prompt}],
"temperature": 0.2},
timeout=float(OLLAMA_TIMEOUT),
)
resp.raise_for_status()
raw = resp.json()["choices"][0]["message"]["content"].strip()
else:
resp = httpx.post(f"{OLLAMA_URL}/api/chat",
json={"model": model,
"messages": [{"role":"user","content":prompt}],
"stream": False, "options": {"temperature": 0.2}},
timeout=float(OLLAMA_TIMEOUT))
resp.raise_for_status()
raw = resp.json().get("message",{}).get("content","").strip()
# 코드블록 제거 후 JSON 파싱
if "```" in raw:
raw = raw.split("```")[1].lstrip("json\n").rstrip()
result = json.loads(raw) result = json.loads(raw)
if isinstance(result, list) and len(result) == len(texts): if isinstance(result,list) and len(result)==len(texts):
return [str(r) for r in result] return [str(r) for r in result]
return texts return texts
except Exception as e: except Exception as e:
print(f"[번역 실패] {e}") print(f"[번역 실패] {e}")
return texts # 실패 시 원본 유지 return texts
def _refine_batch(texts, model, use_or, or_url, or_key, timeout):
if not texts or not model: return texts
prompt = (
"아래는 음성 인식 자막 문장 배열입니다.\n"
"내용은 절대 변경하지 말고, 문장 부호만 자연스럽게 교정해줘.\n"
"반드시 JSON 문자열 배열로만 답해. 설명·마크다운 없이 배열만.\n"
"입력과 동일한 개수와 순서를 유지해.\n\n"
f"{json.dumps(texts, ensure_ascii=False)}"
)
try:
raw = _llm_call(prompt, model, use_or, or_url, or_key, timeout)
if "```" in raw: raw=raw.split("```")[1].lstrip("json\n").rstrip()
result = json.loads(raw)
if isinstance(result,list) and len(result)==len(texts):
return [str(r) for r in result]
return texts
except Exception as e:
print(f"[교정 실패] {e}")
return texts
# ══════════════════════════════════════════════════════════════ def _ollama_postprocess(text, model):
# STT + Ollama/OpenRouter 후처리 (기존 음성변환용)
# ══════════════════════════════════════════════════════════════
def _ollama_postprocess(text: str, model: str) -> str:
if not model or not text.strip(): return text if not model or not text.strip(): return text
prompt = ("다음은 음성 인식으로 추출된 텍스트입니다. " prompt=("다음은 음성 인식으로 추출된 텍스트입니다. 내용은 절대 변경하지 말고 문장 부호만 추가해줘. "
"내용은 절대 변경하지 말고, 문장 부호를 추가하고 자연스럽게 다듬어줘. " "결과 텍스트만 출력하고 설명은 하지 마.\n\n"+text)
"결과 텍스트만 출력하고 설명은 하지 마.\n\n" + text)
try: try:
resp = httpx.post(f"{OLLAMA_URL}/api/chat", raw=_llm_call(prompt,model,False,"","",OLLAMA_TIMEOUT)
json={"model":model,"messages":[{"role":"user","content":prompt}], return raw if raw else text
"stream":False,"options":{"temperature":0.1}}, except Exception as e:
timeout=float(OLLAMA_TIMEOUT)) print(f"[Ollama 후처리 실패] {e}"); return text
resp.raise_for_status()
return resp.json().get("message",{}).get("content","").strip() or text
except: return text
def _openrouter_postprocess(text: str, model: str, base_url: str, api_key: str) -> str: def _openrouter_postprocess(text, model, base_url, api_key):
if not model or not api_key or not text.strip(): return text if not model or not api_key or not text.strip(): return text
prompt = ("다음은 음성 인식으로 추출된 텍스트입니다. " prompt=("다음은 음성 인식으로 추출된 텍스트입니다. 내용은 절대 변경하지 말고 문장 부호만 추가해줘. "
"내용은 절대 변경하지 말고, 문장 부호를 추가하고 자연스럽게 다듬어줘. " "결과 텍스트만 출력하고 설명은 하지 마.\n\n"+text)
"결과 텍스트만 출력하고 설명은 하지 마.\n\n" + text)
try: try:
resp = httpx.post(f"{base_url.rstrip('/')}/chat/completions", raw=_llm_call(prompt,model,True,base_url,api_key,OLLAMA_TIMEOUT)
headers={"Authorization":f"Bearer {api_key}","HTTP-Referer":"https://voicescript.local","Content-Type":"application/json"}, return raw if raw else text
json={"model":model,"messages":[{"role":"user","content":prompt}],"temperature":0.1}, except Exception as e:
timeout=float(OLLAMA_TIMEOUT)) print(f"[OpenRouter 후처리 실패] {e}"); return text
def _api_transcribe(audio_path, api_key, base_url, language, model="whisper-large-v3"):
"""Groq / OpenAI Whisper API 호출"""
with open(audio_path,"rb") as f:
data = f.read()
params = {"model":model}
if language: params["language"] = language
try:
resp = httpx.post(
f"{base_url}/audio/transcriptions",
headers={"Authorization":f"Bearer {api_key}"},
files={"file":("audio.mp3", data, "audio/mpeg")},
data=params,
timeout=600.0,
)
resp.raise_for_status() resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"].strip() or text d = resp.json()
except: return text text = d.get("text","")
# segments 구조 없으면 전체 텍스트로 단일 세그먼트
segs = d.get("segments",[])
if not segs and text:
segs = [{"start":0,"end":0,"text":text}]
return {"text":text, "segments":segs,
"language":d.get("language", language or ""), "duration":0}
except httpx.TimeoutException:
raise Exception(f"API 응답 시간 초과. 파일이 너무 크거나 서버 문제일 수 있습니다.")
except httpx.HTTPStatusError as e:
raise Exception(f"API 오류 ({e.response.status_code}): API 키를 확인하세요.")
# ══════════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════════
# 기존 STT 태스크 (음성변환 탭) # STT Task (음성변환 탭)
# ══════════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════════
@celery_app.task(bind=True, name="tasks.transcribe_task", queue="stt")
def transcribe_task(
    self,
    file_id:str, audio_path:str,
    use_ollama:bool=False, ollama_model:str="",
    use_openrouter:bool=False, openrouter_model:str="",
    openrouter_url:str="", openrouter_key:str="",
    stt_engine:str="local",
    groq_api_key:str="", openai_api_key:str="",
    stt_language:str="",
):
    """Transcribe an uploaded audio file and write ``{file_id}.txt`` to OUTPUT_DIR.

    With ``stt_engine`` set to ``"groq"`` or ``"openai"`` the audio is
    re-encoded to a temporary MP3 with ffmpeg and sent to that API; any other
    value uses the local Whisper model. The raw transcript may then be
    punctuated by Ollama or OpenRouter, Ollama taking precedence when both
    are requested. Progress is reported through ``update_state``.

    The uploaded file and the temporary MP3 are removed whether the task
    succeeds or fails.

    Returns:
        dict with the final and raw text, the segments, detected language,
        duration, output file name and which post-processor/engine was used.

    Raises:
        Exception: any failure, re-raised with a "변환 실패: " prefix and the
            original exception chained.
    """
    self.update_state(state="PROGRESS", meta={"progress":5,"message":"모델 준비 중..."})
    tmp_mp3=None
    try:
        segments=[]; duration=0.0; detected_lang=""

        if stt_engine in ("groq","openai"):
            api_key=groq_api_key if stt_engine=="groq" else openai_api_key
            base_url=GROQ_BASE if stt_engine=="groq" else OPENAI_BASE
            if not api_key:
                raise Exception(f"{stt_engine.upper()} API 키가 설정되지 않았습니다. 설정 → STT 엔진 API 키에서 입력하세요.")
            self.update_state(state="PROGRESS",meta={"progress":20,"message":f"{stt_engine.upper()} API 변환 중..."})
            # Reserve a temp path; ffmpeg (-y) overwrites the empty file.
            with tempfile.NamedTemporaryFile(suffix=".mp3",delete=False) as tf:
                tmp_mp3=tf.name
            cmd=["ffmpeg","-y","-i",audio_path,"-ar","16000","-ac","1","-b:a","128k",tmp_mp3]
            r=subprocess.run(cmd,capture_output=True,timeout=300)
            if r.returncode!=0:
                raise Exception(f"ffmpeg 변환 실패: {r.stderr.decode(errors='replace')[-200:]}")
            result=_api_transcribe(tmp_mp3,api_key,base_url,stt_language)
            segments=[
                {"start":round(s.get("start",0),3),"end":round(s.get("end",0),3),"text":s.get("text","").strip()}
                for s in result.get("segments",[])
            ]
            detected_lang=result.get("language","")
            # Fall back to the last segment's end when no duration is reported.
            duration=result.get("duration",0) or (segments[-1]["end"] if segments else 0)
        else:
            model=get_model()
            self.update_state(state="PROGRESS",meta={"progress":15,"message":"오디오 분석 중..."})
            lang=stt_language.strip() or LANGUAGE
            segments_gen,info=model.transcribe(
                audio_path,language=lang,beam_size=BEAM_SIZE,
                initial_prompt=INITIAL_PROMPT,vad_filter=True,
                vad_parameters=dict(min_silence_duration_ms=500),word_timestamps=False,
            )
            self.update_state(state="PROGRESS",meta={"progress":30,"message":"텍스트 변환 중..."})
            duration=info.duration; detected_lang=info.language
            for seg in segments_gen:
                segments.append({"start":round(seg.start,3),"end":round(seg.end,3),"text":seg.text.strip()})
                if duration>0:
                    # Map decoding position onto the 30-80% progress band.
                    pct=30+int((seg.end/duration)*50)
                    self.update_state(state="PROGRESS",meta={"progress":min(pct,80),"message":f"변환 중... {seg.end:.0f}s/{duration:.0f}s"})

        raw_text="\n".join(s["text"] for s in segments)
        full_text=raw_text

        if use_ollama and ollama_model:
            self.update_state(state="PROGRESS",meta={"progress":85,"message":f"Ollama({ollama_model}) 교정 중..."})
            full_text=_ollama_postprocess(raw_text,ollama_model)
        elif use_openrouter and openrouter_model and openrouter_key:
            self.update_state(state="PROGRESS",meta={"progress":85,"message":f"OpenRouter({openrouter_model}) 교정 중..."})
            full_text=_openrouter_postprocess(raw_text,openrouter_model,openrouter_url,openrouter_key)

        self.update_state(state="PROGRESS",meta={"progress":95,"message":"파일 저장 중..."})
        os.makedirs(OUTPUT_DIR,exist_ok=True)
        output_filename=f"{file_id}.txt"
        with open(os.path.join(OUTPUT_DIR,output_filename),"w",encoding="utf-8") as f:
            f.write(f"# 변환 결과\n# 언어: {detected_lang} | 재생시간: {duration:.1f}\n\n{full_text}\n\n## 타임스탬프\n\n")
            for seg in segments:
                minutes,seconds=divmod(int(seg['start']),60)
                f.write(f"[{minutes:02d}:{seconds:02d}] {seg['text']}\n")

        return {
            "text":full_text,"raw_text":raw_text,"segments":segments,
            "language":detected_lang,"duration":round(duration,1),
            "output_file":output_filename,
            "ollama_used":use_ollama and bool(ollama_model),
            "ollama_model":ollama_model if (use_ollama and ollama_model) else "",
            "openrouter_used":use_openrouter and bool(openrouter_model) and bool(openrouter_key),
            "openrouter_model":openrouter_model if (use_openrouter and openrouter_model) else "",
            "stt_engine":stt_engine,
        }
    except Exception as e:
        raise Exception(f"변환 실패: {str(e)}") from e
    finally:
        # Remove the upload and the temp MP3 on success and on failure alike.
        for p in (audio_path,tmp_mp3):
            if p:
                try:
                    os.remove(p)
                except OSError:
                    pass
# ══════════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════════
# 자막 파이프라인 태스크 # 자막 파이프라인 Task
# Step 1: ffmpeg → WAV # Step 1: ffmpeg → WAV
# Step 2: Whisper → 원어 SRT/VTT # Step 2: Whisper / API → 원어 자막
# Step 3: LLM → 번역 SRT/VTT (선택) # Step 2b: LLM 교정 (선택)
# Step 3: LLM 번역 (선택)
# ══════════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════════
@celery_app.task(bind=True, name="tasks.subtitle_pipeline_task", queue="stt") @celery_app.task(bind=True, name="tasks.subtitle_pipeline_task", queue="stt")
def subtitle_pipeline_task( def subtitle_pipeline_task(
self, self,
file_id: str, file_id:str, video_path:str,
video_path: str, src_language:str="",
src_language: str = "", # 원어 코드 (빈칸=자동) subtitle_fmt:str="srt",
subtitle_fmt: str = "srt", # srt | vtt | both # STT 엔진
translate_to: str = "", # 번역 대상 (빈칸=번역 안 함) stt_engine:str="local",
trans_model: str = "", # 번역 모델 groq_api_key:str="", openai_api_key:str="",
trans_via: str = "ollama",# ollama | openrouter # 교정
openrouter_url: str = "", refine_model:str="", refine_via:str="ollama",
openrouter_key: str = "", # 번역
translate_to:str="", trans_model:str="", trans_via:str="ollama",
# 공통 API 설정
openrouter_url:str="", openrouter_key:str="",
# 타임아웃 (설정에서 받아옴)
subtitle_timeout:int=0, # 0=OLLAMA_TIMEOUT 기본값
): ):
os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR,exist_ok=True)
wav_path = os.path.join(os.path.dirname(video_path), f"{file_id}_audio.wav") wav_path=os.path.join(os.path.dirname(video_path),f"{file_id}_audio.wav")
result_files = {} tmp_mp3=None
result_files={}
timeout=subtitle_timeout if subtitle_timeout>0 else OLLAMA_TIMEOUT
def _prog(pct, step, step_msg, msg):
self.update_state(state="PROGRESS",meta={"progress":pct,"step":step,"step_msg":step_msg,"message":msg})
try: try:
# ── Step 1: ffmpeg 오디오 추출 ──────────────────────── # ── Step 1: ffmpeg ────────────────────────────────────
self.update_state(state="PROGRESS", meta={ _prog(5,1,"오디오 추출 중...","Step 1/3 — ffmpeg 오디오 추출 중...")
"progress": 5, cmd=["ffmpeg","-y","-i",video_path,"-vn","-ar","16000","-ac","1","-c:a","pcm_s16le",wav_path]
"step": 1, proc=subprocess.run(cmd,capture_output=True,timeout=600)
"step_msg": "오디오 추출 중...", if proc.returncode!=0:
"message": "Step 1/3 — ffmpeg 오디오 추출 중..." raise Exception(f"ffmpeg 오디오 추출 실패: {proc.stderr.decode(errors='replace')[-300:]}")
}) if not os.path.exists(wav_path) or os.path.getsize(wav_path)<1000:
cmd = [
"ffmpeg", "-y",
"-i", video_path,
"-vn", # 비디오 스트림 제거
"-ar", "16000", # 16kHz — Whisper 최적
"-ac", "1", # 모노
"-c:a", "pcm_s16le",# WAV 무손실
wav_path
]
proc = subprocess.run(cmd, capture_output=True, timeout=600)
if proc.returncode != 0:
err = proc.stderr.decode(errors="replace")[-500:]
raise Exception(f"ffmpeg 오디오 추출 실패: {err}")
if not os.path.exists(wav_path) or os.path.getsize(wav_path) < 1000:
raise Exception("ffmpeg가 오디오를 추출하지 못했습니다. 영상에 오디오 트랙이 있는지 확인하세요.") raise Exception("ffmpeg가 오디오를 추출하지 못했습니다. 영상에 오디오 트랙이 있는지 확인하세요.")
try: os.remove(video_path) try: os.remove(video_path)
except: pass except: pass
# ── Step 2: Whisper STT → 원어 자막 ─────────────────── # ── Step 2: STT ──────────────────────────────────────
self.update_state(state="PROGRESS", meta={ _prog(15,2,"음성 인식 중...","Step 2/3 — 음성 인식 시작...")
"progress": 15, segments=[]; duration=0.0; detected_lang=""
"step": 2,
"step_msg": "음성 인식 중...",
"message": "Step 2/3 — Whisper 음성 인식 시작..."
})
whisper = get_model()
lang = src_language.strip() or None
segments_gen, info = whisper.transcribe(
wav_path,
language=lang,
beam_size=BEAM_SIZE,
initial_prompt=INITIAL_PROMPT,
vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=500),
word_timestamps=False,
)
segments = []
duration = info.duration
detected_lang = info.language
if stt_engine in ("groq","openai"):
api_key=groq_api_key if stt_engine=="groq" else openai_api_key
base_url=GROQ_BASE if stt_engine=="groq" else OPENAI_BASE
if not api_key:
raise Exception(f"{stt_engine.upper()} API 키가 없습니다. 설정에서 입력하세요.")
import tempfile
with tempfile.NamedTemporaryFile(suffix=".mp3",delete=False) as tf: tmp_mp3=tf.name
r=subprocess.run(["ffmpeg","-y","-i",wav_path,"-ar","16000","-ac","1","-b:a","128k",tmp_mp3],
capture_output=True,timeout=300)
if r.returncode!=0: raise Exception("MP3 변환 실패")
_prog(25,2,"API 음성 인식 중...",f"Step 2/3 — {stt_engine.upper()} API 인식 중...")
result=_api_transcribe(tmp_mp3,api_key,base_url,src_language)
segments=[{"start":round(s.get("start",0),3),"end":round(s.get("end",0),3),"text":s.get("text","").strip()}
for s in result.get("segments",[])]
detected_lang=result.get("language","")
duration=result.get("duration",0) or (segments[-1]["end"] if segments else 0)
try: os.remove(tmp_mp3); tmp_mp3=None
except: pass
else:
whisper=get_model()
lang=src_language.strip() or None
segments_gen,info=whisper.transcribe(wav_path,language=lang,beam_size=BEAM_SIZE,
initial_prompt=INITIAL_PROMPT,vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=500),word_timestamps=False)
duration=info.duration; detected_lang=info.language
for seg in segments_gen: for seg in segments_gen:
segments.append({ segments.append({"start":round(seg.start,3),"end":round(seg.end,3),"text":seg.text.strip()})
"start": round(seg.start, 3), if duration>0:
"end": round(seg.end, 3), pct=15+int((seg.end/duration)*50)
"text": seg.text.strip(), _prog(min(pct,65),2,f"{seg.end:.0f}s/{duration:.0f}s 인식",f"Step 2/3 — {seg.end:.0f}s / {duration:.0f}s")
})
if duration > 0:
pct = 15 + int((seg.end / duration) * 55)
self.update_state(state="PROGRESS", meta={
"progress": min(pct, 70),
"step": 2,
"step_msg": f"{seg.end:.0f}s / {duration:.0f}s 인식 완료",
"message": f"Step 2/3 — {seg.end:.0f}s / {duration:.0f}s",
})
try: os.remove(wav_path) try: os.remove(wav_path); wav_path=None
except: pass except: pass
if not segments: if not segments:
raise Exception("음성이 감지되지 않았습니다. 영상에 음성이 있는지 확인하세요.") raise Exception("음성이 감지되지 않았습니다. 영상에 음성이 있는지 확인하세요.")
# ── Step 2b: LLM 교정 ────────────────────────────────
if refine_model.strip():
use_or_refine=(refine_via=="openrouter" and bool(openrouter_key))
total=len(segments); CHUNK=25; refined=[]
for ci,start in enumerate(range(0,total,CHUNK)):
chunk=segments[start:start+CHUNK]
pct=67+int((ci*CHUNK/total)*6)
_prog(min(pct,73),2,f"교정 {min(start+CHUNK,total)}/{total}",
f"Step 2/3 — LLM 교정 중... ({min(start+CHUNK,total)}/{total})")
batch=[s["text"] for s in chunk]
refined.extend(_refine_batch(batch,refine_model,use_or_refine,openrouter_url,openrouter_key,timeout))
segments=[{**seg,"text":refined[i] if i<len(refined) else seg["text"]}
for i,seg in enumerate(segments)]
# 원어 자막 저장 # 원어 자막 저장
lang_suffix = detected_lang lang_suffix=detected_lang
if subtitle_fmt in ("srt", "both"): if subtitle_fmt in ("srt","both"):
fn = f"{file_id}.{lang_suffix}.srt" fn=f"{file_id}.{lang_suffix}.srt"
with open(os.path.join(OUTPUT_DIR, fn), "w", encoding="utf-8") as f: with open(os.path.join(OUTPUT_DIR,fn),"w",encoding="utf-8") as f: f.write(make_srt(segments))
f.write(make_srt(segments)) result_files["srt_orig"]=fn
result_files["srt_orig"] = fn if subtitle_fmt in ("vtt","both"):
if subtitle_fmt in ("vtt", "both"): fn=f"{file_id}.{lang_suffix}.vtt"
fn = f"{file_id}.{lang_suffix}.vtt" with open(os.path.join(OUTPUT_DIR,fn),"w",encoding="utf-8") as f: f.write(make_vtt(segments))
with open(os.path.join(OUTPUT_DIR, fn), "w", encoding="utf-8") as f: result_files["vtt_orig"]=fn
f.write(make_vtt(segments))
result_files["vtt_orig"] = fn
# ── Step 3: LLM 번역 (선택) ─────────────────────────── # ── Step 3: LLM 번역 ─────────────────────────────────
translated_segments = None translated_segments=None
if translate_to and translate_to!=detected_lang and trans_model:
use_or=(trans_via=="openrouter" and bool(openrouter_key))
total=len(segments); CHUNK=25; trans_texts=[]
for ci,start in enumerate(range(0,total,CHUNK)):
chunk=segments[start:start+CHUNK]
pct=75+int((ci*CHUNK/total)*20)
_prog(min(pct,95),3,f"{min(start+CHUNK,total)}/{total} 번역",
f"Step 3/3 — {_lang_name(translate_to)}로 번역 중... ({min(start+CHUNK,total)}/{total})")
batch=[s["text"] for s in chunk]
trans_texts.extend(_translate_batch(batch,translate_to,use_or,trans_model,openrouter_url,openrouter_key,timeout))
translated_segments=[{**seg,"text":trans_texts[i] if i<len(trans_texts) else seg["text"]}
for i,seg in enumerate(segments)]
trans_suffix=translate_to
if subtitle_fmt in ("srt","both"):
fn=f"{file_id}.{trans_suffix}.srt"
with open(os.path.join(OUTPUT_DIR,fn),"w",encoding="utf-8") as f: f.write(make_srt(translated_segments))
result_files["srt_trans"]=fn
if subtitle_fmt in ("vtt","both"):
fn=f"{file_id}.{trans_suffix}.vtt"
with open(os.path.join(OUTPUT_DIR,fn),"w",encoding="utf-8") as f: f.write(make_vtt(translated_segments))
result_files["vtt_trans"]=fn
if translate_to and translate_to != detected_lang and trans_model: _prog(99,3,"완료","자막 파일 저장 완료")
target_name = _lang_name(translate_to)
use_or = (trans_via == "openrouter" and bool(openrouter_key))
total = len(segments)
CHUNK = 25 # 한 번에 25개씩 번역
translated_texts = []
for ci, start in enumerate(range(0, total, CHUNK)):
chunk = segments[start:start+CHUNK]
pct = 72 + int((ci * CHUNK / total) * 22)
self.update_state(state="PROGRESS", meta={
"progress": min(pct, 94),
"step": 3,
"step_msg": f"{min(start+CHUNK, total)}/{total}개 번역 완료",
"message": f"Step 3/3 — {target_name}로 번역 중... ({min(start+CHUNK,total)}/{total})",
})
batch_texts = [s["text"] for s in chunk]
translated = _translate_batch(
batch_texts, translate_to,
use_openrouter=use_or,
model=trans_model,
openrouter_url=openrouter_url,
openrouter_key=openrouter_key,
)
translated_texts.extend(translated)
# 번역된 텍스트 → 세그먼트 조합 (타임스탬프 유지)
translated_segments = [
{**seg, "text": translated_texts[i] if i < len(translated_texts) else seg["text"]}
for i, seg in enumerate(segments)
]
# 번역 자막 저장
trans_suffix = translate_to
if subtitle_fmt in ("srt", "both"):
fn = f"{file_id}.{trans_suffix}.srt"
with open(os.path.join(OUTPUT_DIR, fn), "w", encoding="utf-8") as f:
f.write(make_srt(translated_segments))
result_files["srt_trans"] = fn
if subtitle_fmt in ("vtt", "both"):
fn = f"{file_id}.{trans_suffix}.vtt"
with open(os.path.join(OUTPUT_DIR, fn), "w", encoding="utf-8") as f:
f.write(make_vtt(translated_segments))
result_files["vtt_trans"] = fn
self.update_state(state="PROGRESS", meta={
"progress": 98, "step": 3,
"step_msg": "완료", "message": "자막 파일 저장 완료"
})
return { return {
"detected_language": detected_lang, "detected_language":detected_lang,
"duration": round(duration, 1), "duration":round(duration,1),
"segment_count": len(segments), "segment_count":len(segments),
"translated": bool(translated_segments), "stt_engine":stt_engine,
"translate_to": translate_to if translated_segments else "", "translated":bool(translated_segments),
"subtitle_fmt": subtitle_fmt, "translate_to":translate_to if translated_segments else "",
# 파일 "subtitle_fmt":subtitle_fmt,
"srt_orig": result_files.get("srt_orig", ""), "refine_model":refine_model if refine_model.strip() else "",
"vtt_orig": result_files.get("vtt_orig", ""), "srt_orig":result_files.get("srt_orig",""),
"srt_trans": result_files.get("srt_trans", ""), "vtt_orig":result_files.get("vtt_orig",""),
"vtt_trans": result_files.get("vtt_trans", ""), "srt_trans":result_files.get("srt_trans",""),
"vtt_trans":result_files.get("vtt_trans",""),
} }
except Exception as e: except Exception as e:
# 임시 파일 정리 for p in [video_path, wav_path, tmp_mp3]:
for p in [video_path, wav_path]: try:
try: os.remove(p) if p and os.path.exists(p): os.remove(p)
except: pass except: pass
raise Exception(f"자막 생성 실패: {str(e)}") raise Exception(f"자막 생성 실패: {str(e)}")