Files
whisper-stt/app/auth.py

147 lines
5.7 KiB
Python

"""
인증 모듈 — 다중 사용자 JSON 파일 기반
/data/users.json 에 사용자 정보 저장
관리자(admin)는 환경변수 AUTH_USERNAME/AUTH_PASSWORD 기준으로 초기화
"""
import os, json, threading
from pathlib import Path
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
SECRET_KEY = os.getenv("JWT_SECRET", "fallback-secret-change-this")
ALGORITHM = "HS256"
EXPIRE_HOURS = int(os.getenv("JWT_EXPIRE_HOURS", "12"))
ADMIN_USERNAME = os.getenv("AUTH_USERNAME", "admin")
ADMIN_PASSWORD = os.getenv("AUTH_PASSWORD", "changeme1234")
DATA_DIR = Path(os.getenv("UPLOAD_DIR", "/data/uploads")).parent
USERS_FILE = DATA_DIR / "users.json"
_lock = threading.Lock()
bearer = HTTPBearer(auto_error=False)
# ── 파일 I/O ───────────────────────────────────────────────────
def _load() -> dict:
if not USERS_FILE.exists():
return {}
with open(USERS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
def _save(users: dict):
USERS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(USERS_FILE, "w", encoding="utf-8") as f:
json.dump(users, f, ensure_ascii=False, indent=2)
# ── 초기화 (앱 시작 시 1회) ────────────────────────────────────
def init_users():
with _lock:
users = _load()
# 관리자 계정은 항상 env var 기준으로 동기화
users[ADMIN_USERNAME] = {
"password": ADMIN_PASSWORD,
"role": "admin",
"permissions": {"stt": True, "ocr": True},
}
_save(users)
# ── CRUD ──────────────────────────────────────────────────────
def authenticate(username: str, password: str):
"""성공 시 user dict, 실패 시 None"""
with _lock:
users = _load()
u = users.get(username)
if not u or u["password"] != password:
return None
return {"username": username, **u}
def get_user(username: str):
with _lock:
return _load().get(username)
def list_users() -> dict:
with _lock:
users = _load()
# 비밀번호 마스킹
return {k: {**{kk: vv for kk, vv in v.items() if kk != "password"}}
for k, v in users.items()}
def create_user(username: str, password: str, permissions: dict) -> tuple:
with _lock:
users = _load()
if username in users:
return False, "이미 존재하는 사용자입니다"
users[username] = {"password": password, "role": "user",
"permissions": permissions}
_save(users)
return True, "사용자가 생성되었습니다"
def update_user(username: str, permissions: dict, password: str = None) -> tuple:
if username == ADMIN_USERNAME:
return False, "기본 관리자 계정은 수정할 수 없습니다"
with _lock:
users = _load()
if username not in users:
return False, "사용자를 찾을 수 없습니다"
users[username]["permissions"] = permissions
if password:
users[username]["password"] = password
_save(users)
return True, "업데이트되었습니다"
def delete_user(username: str) -> tuple:
if username == ADMIN_USERNAME:
return False, "기본 관리자 계정은 삭제할 수 없습니다"
with _lock:
users = _load()
if username not in users:
return False, "사용자를 찾을 수 없습니다"
del users[username]
_save(users)
return True, "삭제되었습니다"
# ── JWT ───────────────────────────────────────────────────────
def create_access_token(username: str) -> str:
exp = datetime.utcnow() + timedelta(hours=EXPIRE_HOURS)
return jwt.encode({"sub": username, "exp": exp}, SECRET_KEY, algorithm=ALGORITHM)
# ── FastAPI 의존성 ────────────────────────────────────────────
def require_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
if credentials is None:
raise HTTPException(401, "인증이 필요합니다",
headers={"WWW-Authenticate": "Bearer"})
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if not username:
raise JWTError()
u = get_user(username)
if not u:
raise JWTError()
return {"username": username, **u}
except JWTError:
raise HTTPException(401, "토큰이 유효하지 않거나 만료되었습니다",
headers={"WWW-Authenticate": "Bearer"})
def require_admin(user: dict = Depends(require_auth)) -> dict:
if user.get("role") != "admin":
raise HTTPException(403, "관리자 권한이 필요합니다")
return user
def require_stt(user: dict = Depends(require_auth)) -> dict:
if not user.get("permissions", {}).get("stt", False):
raise HTTPException(403, "STT 사용 권한이 없습니다")
return user
def require_ocr(user: dict = Depends(require_auth)) -> dict:
if not user.get("permissions", {}).get("ocr", False):
raise HTTPException(403, "OCR 사용 권한이 없습니다")
return user