""" 인증 모듈 — 다중 사용자 JSON 파일 기반 사용자 구조: { "password": "...", "role": "admin" | "user", "permissions": { "stt": true | false, "ocr": true | false, "allowed_stt_models": ["medium", "large-v3", ...], # 빈 배열 = 모두 허용 "allowed_ocr_models": ["granite3.2-vision", ...] # 빈 배열 = 모두 허용 } } """ import os, json, threading from pathlib import Path from datetime import datetime, timedelta from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import JWTError, jwt SECRET_KEY = os.getenv("JWT_SECRET", "fallback-secret-change-this") ALGORITHM = "HS256" EXPIRE_HOURS = int(os.getenv("JWT_EXPIRE_HOURS", "12")) ADMIN_USERNAME = os.getenv("AUTH_USERNAME", "admin") ADMIN_PASSWORD = os.getenv("AUTH_PASSWORD", "changeme1234") DATA_DIR = Path(os.getenv("UPLOAD_DIR", "/data/uploads")).parent USERS_FILE = DATA_DIR / "users.json" _lock = threading.Lock() bearer = HTTPBearer(auto_error=False) # ── 파일 I/O ────────────────────────────────────────────────── def _load() -> dict: if not USERS_FILE.exists(): return {} with open(USERS_FILE, "r", encoding="utf-8") as f: return json.load(f) def _save(users: dict): USERS_FILE.parent.mkdir(parents=True, exist_ok=True) with open(USERS_FILE, "w", encoding="utf-8") as f: json.dump(users, f, ensure_ascii=False, indent=2) # ── 초기화 ──────────────────────────────────────────────────── def init_users(): with _lock: users = _load() users[ADMIN_USERNAME] = { "password": ADMIN_PASSWORD, "role": "admin", "permissions": { "stt": True, "ocr": True, "allowed_stt_models": [], # 빈 배열 = 제한 없음 "allowed_ocr_models": [], }, } _save(users) # ── CRUD ────────────────────────────────────────────────────── def authenticate(username: str, password: str): with _lock: users = _load() u = users.get(username) if not u or u["password"] != password: return None return {"username": username, **u} def get_user(username: str): with _lock: return _load().get(username) def list_users() -> dict: with _lock: users = _load() return {k: {kk: vv for kk, vv in v.items() if kk != "password"} for k, v in users.items()} def create_user(username: str, password: str, permissions: dict) -> tuple: with _lock: users = _load() if username in users: return False, "이미 존재하는 사용자입니다" # 기본값 보완 permissions.setdefault("allowed_stt_models", []) permissions.setdefault("allowed_ocr_models", []) users[username] = {"password": password, "role": "user", "permissions": permissions} _save(users) return True, "사용자가 생성되었습니다" def update_user(username: str, permissions: dict, password: str = None) -> tuple: if username == ADMIN_USERNAME: return False, "기본 관리자 계정은 수정할 수 없습니다" with _lock: users = _load() if username not in users: return False, "사용자를 찾을 수 없습니다" permissions.setdefault("allowed_stt_models", []) permissions.setdefault("allowed_ocr_models", []) users[username]["permissions"] = permissions if password: users[username]["password"] = password _save(users) return True, "업데이트되었습니다" def delete_user(username: str) -> tuple: if username == ADMIN_USERNAME: return False, "기본 관리자 계정은 삭제할 수 없습니다" with _lock: users = _load() if username not in users: return False, "사용자를 찾을 수 없습니다" del users[username]; _save(users) return True, "삭제되었습니다" # ── JWT ─────────────────────────────────────────────────────── def create_access_token(username: str) -> str: exp = datetime.utcnow() + timedelta(hours=EXPIRE_HOURS) return jwt.encode({"sub": username, "exp": exp}, SECRET_KEY, algorithm=ALGORITHM) # ── FastAPI 의존성 ───────────────────────────────────────────── def require_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer)) -> dict: if credentials is None: raise HTTPException(401, "인증이 필요합니다", headers={"WWW-Authenticate": "Bearer"}) try: payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") if not username: raise JWTError() u = get_user(username) if not u: raise JWTError() return {"username": username, **u} except JWTError: raise HTTPException(401, "토큰이 유효하지 않거나 만료되었습니다", headers={"WWW-Authenticate": "Bearer"}) def require_admin(user: dict = Depends(require_auth)) -> dict: if user.get("role") != "admin": raise HTTPException(403, "관리자 권한이 필요합니다") return user def require_stt(user: dict = Depends(require_auth)) -> dict: if not user.get("permissions", {}).get("stt", False): raise HTTPException(403, "STT 사용 권한이 없습니다") return user def require_ocr(user: dict = Depends(require_auth)) -> dict: if not user.get("permissions", {}).get("ocr", False): raise HTTPException(403, "OCR 사용 권한이 없습니다") return user