Files
2026-04-18 05:59:31 +09:00

114 lines
3.4 KiB
Python

"""인증 서비스 — JWT 토큰 + 비밀번호 해싱"""
from datetime import datetime, timezone, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.database import get_db
from app.models import User, UserRole
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer(auto_error=False)
# ── 비밀번호 ──
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# ── JWT 토큰 ──
def create_token(user_id: int, username: str, role: str) -> str:
expire = datetime.now(timezone.utc) + timedelta(
minutes=settings.JWT_EXPIRE_MINUTES
)
payload = {
"sub": str(user_id),
"username": username,
"role": role,
"exp": expire,
}
return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def decode_token(token: str) -> Optional[dict]:
try:
payload = jwt.decode(
token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
)
return payload
except JWTError:
return None
# ── FastAPI 인증 의존성 ──
async def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
db: AsyncSession = Depends(get_db),
) -> User:
"""현재 로그인된 사용자 반환 — 인증 필수 엔드포인트에 사용"""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="인증이 필요합니다",
headers={"WWW-Authenticate": "Bearer"},
)
payload = decode_token(credentials.credentials)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="유효하지 않은 토큰입니다",
)
user_id = int(payload.get("sub", 0))
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="비활성화된 계정입니다",
)
return user
async def require_admin(user: User = Depends(get_current_user)) -> User:
"""관리자 권한 필수"""
if user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="관리자 권한이 필요합니다",
)
return user
async def optional_auth(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
db: AsyncSession = Depends(get_db),
) -> Optional[User]:
"""인증 선택 — 토큰 있으면 사용자 반환, 없으면 None"""
if not credentials:
return None
payload = decode_token(credentials.credentials)
if not payload:
return None
user_id = int(payload.get("sub", 0))
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()