""" 인증 모듈 — 다중 사용자 JSON 파일 기반 권한: stt | ocr | subtitle """ import os, json, threading from pathlib import Path from datetime import datetime, timedelta from fastapi import Depends, HTTPException from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import JWTError, jwt SECRET_KEY = os.getenv("JWT_SECRET", "fallback-secret-change-this") ALGORITHM = "HS256" EXPIRE_HOURS = int(os.getenv("JWT_EXPIRE_HOURS", "12")) ADMIN_USERNAME = os.getenv("AUTH_USERNAME", "admin") ADMIN_PASSWORD = os.getenv("AUTH_PASSWORD", "changeme1234") DATA_DIR = Path(os.getenv("UPLOAD_DIR", "/data/uploads")).parent USERS_FILE = DATA_DIR / "users.json" _lock = threading.Lock() bearer = HTTPBearer(auto_error=False) def _load() -> dict: if not USERS_FILE.exists(): return {} with open(USERS_FILE, "r", encoding="utf-8") as f: return json.load(f) def _save(users: dict): USERS_FILE.parent.mkdir(parents=True, exist_ok=True) with open(USERS_FILE, "w", encoding="utf-8") as f: json.dump(users, f, ensure_ascii=False, indent=2) def init_users(): with _lock: users = _load() users[ADMIN_USERNAME] = { "password": ADMIN_PASSWORD, "role": "admin", "permissions": { "stt": True, "ocr": True, "subtitle": True, "allowed_stt_models": [], "allowed_ocr_models": [], }, } _save(users) def authenticate(username: str, password: str): with _lock: users = _load() u = users.get(username) if not u or u["password"] != password: return None return {"username": username, **u} def get_user(username: str): with _lock: return _load().get(username) def list_users() -> dict: with _lock: users = _load() return {k: {kk: vv for kk, vv in v.items() if kk != "password"} for k, v in users.items()} def create_user(username: str, password: str, permissions: dict) -> tuple: with _lock: users = _load() if username in users: return False, "이미 존재하는 사용자입니다" permissions.setdefault("allowed_stt_models", []) permissions.setdefault("allowed_ocr_models", []) permissions.setdefault("subtitle", False) users[username] = {"password": password, "role": "user", "permissions": permissions} _save(users) return True, "사용자가 생성되었습니다" def update_user(username: str, permissions: dict, password: str = None) -> tuple: if username == ADMIN_USERNAME: return False, "기본 관리자 계정은 수정할 수 없습니다" with _lock: users = _load() if username not in users: return False, "사용자를 찾을 수 없습니다" permissions.setdefault("allowed_stt_models", []) permissions.setdefault("allowed_ocr_models", []) permissions.setdefault("subtitle", False) users[username]["permissions"] = permissions if password: users[username]["password"] = password _save(users) return True, "업데이트되었습니다" def delete_user(username: str) -> tuple: if username == ADMIN_USERNAME: return False, "기본 관리자 계정은 삭제할 수 없습니다" with _lock: users = _load() if username not in users: return False, "사용자를 찾을 수 없습니다" del users[username]; _save(users) return True, "삭제되었습니다" def create_access_token(username: str) -> str: exp = datetime.utcnow() + timedelta(hours=EXPIRE_HOURS) return jwt.encode({"sub": username, "exp": exp}, SECRET_KEY, algorithm=ALGORITHM) def require_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer)) -> dict: if credentials is None: raise HTTPException(401, "인증이 필요합니다", headers={"WWW-Authenticate": "Bearer"}) try: payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") if not username: raise JWTError() u = get_user(username) if not u: raise JWTError() return {"username": username, **u} except JWTError: raise HTTPException(401, "토큰이 유효하지 않거나 만료되었습니다", headers={"WWW-Authenticate": "Bearer"}) def require_admin(user: dict = Depends(require_auth)) -> dict: if user.get("role") != "admin": raise HTTPException(403, "관리자 권한이 필요합니다") return user def require_stt(user: dict = Depends(require_auth)) -> dict: if not user.get("permissions", {}).get("stt", False): raise HTTPException(403, "STT 사용 권한이 없습니다") return user def require_ocr(user: dict = Depends(require_auth)) -> dict: if not user.get("permissions", {}).get("ocr", False): raise HTTPException(403, "OCR 사용 권한이 없습니다") return user def require_subtitle(user: dict = Depends(require_auth)) -> dict: if not user.get("permissions", {}).get("subtitle", False): raise HTTPException(403, "자막 사용 권한이 없습니다") return user