fix: 이력 처리중 버그 수정 + 관리자 모델 제한 기능

This commit is contained in:
root
2026-04-23 07:38:22 +09:00
parent 4af1279a08
commit f9075ae3f6
4 changed files with 412 additions and 181 deletions

View File

@@ -1,7 +1,16 @@
"""
인증 모듈 — 다중 사용자 JSON 파일 기반
/data/users.json 에 사용자 정보 저장
관리자(admin)는 환경변수 AUTH_USERNAME/AUTH_PASSWORD 기준으로 초기화
사용자 구조:
{
"password": "...",
"role": "admin" | "user",
"permissions": {
"stt": true | false,
"ocr": true | false,
"allowed_stt_models": ["medium", "large-v3", ...], # 빈 배열 = 모두 허용
"allowed_ocr_models": ["granite3.2-vision", ...] # 빈 배열 = 모두 허용
}
}
"""
import os, json, threading
from pathlib import Path
@@ -20,14 +29,13 @@ ADMIN_PASSWORD = os.getenv("AUTH_PASSWORD", "changeme1234")
DATA_DIR = Path(os.getenv("UPLOAD_DIR", "/data/uploads")).parent
USERS_FILE = DATA_DIR / "users.json"
_lock = threading.Lock()
_lock = threading.Lock()
bearer = HTTPBearer(auto_error=False)
# ── 파일 I/O ──────────────────────────────────────────────────
# ── 파일 I/O ──────────────────────────────────────────────────
def _load() -> dict:
if not USERS_FILE.exists():
return {}
if not USERS_FILE.exists(): return {}
with open(USERS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
@@ -37,72 +45,66 @@ def _save(users: dict):
json.dump(users, f, ensure_ascii=False, indent=2)
# ── 초기화 (앱 시작 시 1회) ────────────────────────────────────
# ── 초기화 ────────────────────────────────────────────────────
def init_users():
with _lock:
users = _load()
# 관리자 계정은 항상 env var 기준으로 동기화
users[ADMIN_USERNAME] = {
"password": ADMIN_PASSWORD,
"role": "admin",
"permissions": {"stt": True, "ocr": True},
"password": ADMIN_PASSWORD,
"role": "admin",
"permissions": {
"stt": True, "ocr": True,
"allowed_stt_models": [], # 빈 배열 = 제한 없음
"allowed_ocr_models": [],
},
}
_save(users)
# ── CRUD ──────────────────────────────────────────────────────
def authenticate(username: str, password: str):
"""성공 시 user dict, 실패 시 None"""
with _lock:
users = _load()
with _lock: users = _load()
u = users.get(username)
if not u or u["password"] != password:
return None
if not u or u["password"] != password: return None
return {"username": username, **u}
def get_user(username: str):
with _lock:
return _load().get(username)
with _lock: return _load().get(username)
def list_users() -> dict:
with _lock:
users = _load()
# 비밀번호 마스킹
return {k: {**{kk: vv for kk, vv in v.items() if kk != "password"}}
with _lock: users = _load()
return {k: {kk: vv for kk, vv in v.items() if kk != "password"}
for k, v in users.items()}
def create_user(username: str, password: str, permissions: dict) -> tuple:
with _lock:
users = _load()
if username in users:
return False, "이미 존재하는 사용자입니다"
users[username] = {"password": password, "role": "user",
"permissions": permissions}
if username in users: return False, "이미 존재하는 사용자입니다"
# 기본값 보완
permissions.setdefault("allowed_stt_models", [])
permissions.setdefault("allowed_ocr_models", [])
users[username] = {"password": password, "role": "user", "permissions": permissions}
_save(users)
return True, "사용자가 생성되었습니다"
def update_user(username: str, permissions: dict, password: str = None) -> tuple:
if username == ADMIN_USERNAME:
return False, "기본 관리자 계정은 수정할 수 없습니다"
if username == ADMIN_USERNAME: return False, "기본 관리자 계정은 수정할 수 없습니다"
with _lock:
users = _load()
if username not in users:
return False, "사용자를 찾을 수 없습니다"
if username not in users: return False, "사용자를 찾을 수 없습니다"
permissions.setdefault("allowed_stt_models", [])
permissions.setdefault("allowed_ocr_models", [])
users[username]["permissions"] = permissions
if password:
users[username]["password"] = password
if password: users[username]["password"] = password
_save(users)
return True, "업데이트되었습니다"
def delete_user(username: str) -> tuple:
if username == ADMIN_USERNAME:
return False, "기본 관리자 계정은 삭제할 수 없습니다"
if username == ADMIN_USERNAME: return False, "기본 관리자 계정은 삭제할 수 없습니다"
with _lock:
users = _load()
if username not in users:
return False, "사용자를 찾을 수 없습니다"
del users[username]
_save(users)
if username not in users: return False, "사용자를 찾을 수 없습니다"
del users[username]; _save(users)
return True, "삭제되었습니다"
@@ -112,35 +114,28 @@ def create_access_token(username: str) -> str:
return jwt.encode({"sub": username, "exp": exp}, SECRET_KEY, algorithm=ALGORITHM)
# ── FastAPI 의존성 ────────────────────────────────────────────
# ── FastAPI 의존성 ────────────────────────────────────────────
def require_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
if credentials is None:
raise HTTPException(401, "인증이 필요합니다",
headers={"WWW-Authenticate": "Bearer"})
raise HTTPException(401, "인증이 필요합니다", headers={"WWW-Authenticate": "Bearer"})
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if not username:
raise JWTError()
if not username: raise JWTError()
u = get_user(username)
if not u:
raise JWTError()
if not u: raise JWTError()
return {"username": username, **u}
except JWTError:
raise HTTPException(401, "토큰이 유효하지 않거나 만료되었습니다",
headers={"WWW-Authenticate": "Bearer"})
raise HTTPException(401, "토큰이 유효하지 않거나 만료되었습니다", headers={"WWW-Authenticate": "Bearer"})
def require_admin(user: dict = Depends(require_auth)) -> dict:
if user.get("role") != "admin":
raise HTTPException(403, "관리자 권한이 필요합니다")
if user.get("role") != "admin": raise HTTPException(403, "관리자 권한이 필요합니다")
return user
def require_stt(user: dict = Depends(require_auth)) -> dict:
if not user.get("permissions", {}).get("stt", False):
raise HTTPException(403, "STT 사용 권한이 없습니다")
if not user.get("permissions", {}).get("stt", False): raise HTTPException(403, "STT 사용 권한이 없습니다")
return user
def require_ocr(user: dict = Depends(require_auth)) -> dict:
if not user.get("permissions", {}).get("ocr", False):
raise HTTPException(403, "OCR 사용 권한이 없습니다")
if not user.get("permissions", {}).get("ocr", False): raise HTTPException(403, "OCR 사용 권한이 없습니다")
return user