"""Authentication service: JWT tokens and password hashing."""
from datetime import datetime, timezone, timedelta
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.database import get_db
from app.models import User, UserRole

settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# auto_error=False: a missing Authorization header yields None instead of an
# automatic error, so each dependency below decides how to react.
security = HTTPBearer(auto_error=False)


# ── Passwords ──
def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's CryptContext."""
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    """Return True if *plain* matches the stored hash *hashed*."""
    return pwd_context.verify(plain, hashed)


# ── JWT tokens ──
def create_token(user_id: int, username: str, role: str) -> str:
    """Create a signed JWT for the given user.

    The payload carries ``sub`` (the user id as a string), ``username``,
    ``role`` and ``exp``, where ``exp`` is the current UTC time plus
    ``settings.JWT_EXPIRE_MINUTES`` minutes.
    """
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.JWT_EXPIRE_MINUTES
    )
    payload = {
        "sub": str(user_id),
        "username": username,
        "role": role,
        "exp": expire,
    }
    return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)


def decode_token(token: str) -> Optional[dict]:
    """Decode *token* and return its payload, or None if decoding raises JWTError."""
    try:
        return jwt.decode(
            token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
        )
    except JWTError:
        return None


def _user_id_from_payload(payload: dict) -> Optional[int]:
    """Return the integer user id stored in the ``sub`` claim, or None.

    None is returned when the claim is missing or cannot be converted to an
    int, so callers can reject the token instead of crashing on ``int()``.
    """
    try:
        return int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        return None


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response carrying the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


# ── FastAPI auth dependencies ──
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Return the currently logged-in user; for endpoints that require auth.

    Raises:
        HTTPException: 401 when credentials are absent, the token cannot be
            decoded, its ``sub`` claim is missing or not an integer, or the
            user does not exist or is not active.
    """
    if not credentials:
        raise _unauthorized("인증이 필요합니다")

    payload = decode_token(credentials.credentials)
    if not payload:
        raise _unauthorized("유효하지 않은 토큰입니다")

    # A validly signed token with an unusable ``sub`` is still an invalid
    # token; reject it with 401 rather than letting int() raise.
    user_id = _user_id_from_payload(payload)
    if user_id is None:
        raise _unauthorized("유효하지 않은 토큰입니다")

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        raise _unauthorized("비활성화된 계정입니다")
    return user


async def require_admin(user: User = Depends(get_current_user)) -> User:
    """Require administrator privileges.

    Raises:
        HTTPException: 403 when the authenticated user's role is not
            ``UserRole.ADMIN``.
    """
    if user.role != UserRole.ADMIN:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="관리자 권한이 필요합니다",
        )
    return user


async def optional_auth(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> Optional[User]:
    """Optional authentication: return the user if a usable token is given.

    Returns None (never raises for auth reasons) when credentials are absent,
    the token cannot be decoded, its ``sub`` claim is unusable, or the user
    does not exist or is not active.
    """
    if not credentials:
        return None

    payload = decode_token(credentials.credentials)
    if not payload:
        return None

    user_id = _user_id_from_payload(payload)
    if user_id is None:
        return None

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    # Same rule as get_current_user: a deactivated account must not be
    # treated as authenticated.
    if user is None or not user.is_active:
        return None
    return user