초기 커밋 - EV AS 관리 시스템

This commit is contained in:
root
2026-04-18 06:18:58 +09:00
commit 7a5c397983
52 changed files with 6044 additions and 0 deletions

14
backend/Dockerfile Normal file
View File

@@ -0,0 +1,14 @@
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y \
libpq-dev gcc libzbar0 \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

70
backend/auth.py Normal file
View File

@@ -0,0 +1,70 @@
import os
import bcrypt
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from database import get_db
import models
SECRET_KEY = os.getenv("SECRET_KEY", "changeme")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
def hash_password(password: str) -> str:
"""비밀번호 bcrypt 해시 생성"""
salt = bcrypt.gensalt(rounds=12)
return bcrypt.hashpw(password.encode("utf-8"), salt).decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
"""비밀번호 검증"""
try:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
except Exception:
return False
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> models.User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="인증 정보가 유효하지 않습니다.",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(models.User).filter(
models.User.id == int(user_id),
models.User.is_active == True
).first()
if user is None:
raise credentials_exception
return user
def require_role(*roles):
def checker(current_user: models.User = Depends(get_current_user)):
if current_user.role not in roles:
raise HTTPException(status_code=403, detail="권한이 없습니다.")
return current_user
return checker
require_admin = require_role("admin")
require_mechanic = require_role("mechanic", "admin")
require_manufacturer = require_role("manufacturer", "admin")

18
backend/database.py Normal file
View File

@@ -0,0 +1,18 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://evuser:evpassword@db:5432/evcharger")
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
pass
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

169
backend/init_db.sql Normal file
View File

@@ -0,0 +1,169 @@
-- ============================================================
-- EV 충전기 AS 관리 시스템 DB 스키마
-- ============================================================
CREATE TABLE IF NOT EXISTS charger_types (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
role VARCHAR(20) NOT NULL CHECK (role IN ('admin','mechanic','manufacturer')),
company VARCHAR(100),
name VARCHAR(50) NOT NULL,
phone VARCHAR(20),
email VARCHAR(100),
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS chargers (
id VARCHAR(50) PRIMARY KEY,
charger_type_id INT REFERENCES charger_types(id),
name VARCHAR(100) NOT NULL,
station_name VARCHAR(100) NOT NULL,
location_detail TEXT,
cpo_name VARCHAR(100),
installed_at DATE,
gps_lat DOUBLE PRECISION,
gps_lng DOUBLE PRECISION,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS reports (
id SERIAL PRIMARY KEY,
charger_id VARCHAR(50) REFERENCES chargers(id),
issue_types TEXT[] NOT NULL,
issue_detail TEXT,
error_code VARCHAR(100),
occurred_at TIMESTAMP,
contact VARCHAR(20),
consent BOOLEAN DEFAULT FALSE,
gps_lat DOUBLE PRECISION,
gps_lng DOUBLE PRECISION,
status VARCHAR(30) DEFAULT 'pending'
CHECK (status IN ('pending_approval','pending','in_progress','done','waiting','revisit')),
reported_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS report_photos (
id SERIAL PRIMARY KEY,
report_id INT REFERENCES reports(id) ON DELETE CASCADE,
file_path VARCHAR(255) NOT NULL,
uploaded_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS repairs (
id SERIAL PRIMARY KEY,
mechanic_id INT REFERENCES users(id),
repair_types TEXT[] NOT NULL,
description TEXT NOT NULL,
started_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP,
result_status VARCHAR(20) DEFAULT 'done'
CHECK (result_status IN ('done','waiting','revisit'))
);
CREATE TABLE IF NOT EXISTS repair_reports (
repair_id INT REFERENCES repairs(id) ON DELETE CASCADE,
report_id INT REFERENCES reports(id) ON DELETE CASCADE,
PRIMARY KEY (repair_id, report_id)
);
CREATE TABLE IF NOT EXISTS repair_photos (
id SERIAL PRIMARY KEY,
repair_id INT REFERENCES repairs(id) ON DELETE CASCADE,
photo_type VARCHAR(10) DEFAULT 'after' CHECK (photo_type IN ('before','after')),
file_path VARCHAR(255) NOT NULL,
uploaded_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS repair_costs (
id SERIAL PRIMARY KEY,
repair_id INT UNIQUE REFERENCES repairs(id) ON DELETE CASCADE,
root_cause TEXT,
admin_note TEXT,
cost_party_type VARCHAR(20)
CHECK (cost_party_type IN ('cpo','manufacturer','self','user','other')),
cost_party_manufacturer_id INT REFERENCES users(id),
cost_party_custom VARCHAR(100),
cost_amount INT DEFAULT 0,
cost_status VARCHAR(20) DEFAULT 'pending'
CHECK (cost_status IN ('pending','billed','waived','settled')),
reviewed_by INT REFERENCES users(id),
reviewed_at TIMESTAMP
);
CREATE TABLE IF NOT EXISTS improvements (
id SERIAL PRIMARY KEY,
title VARCHAR(200) NOT NULL,
category VARCHAR(20) NOT NULL
CHECK (category IN ('sw','hw','ui','firmware','other')),
description TEXT NOT NULL,
priority VARCHAR(10) DEFAULT 'normal'
CHECK (priority IN ('urgent','high','normal','low')),
part_name VARCHAR(100),
status VARCHAR(20) DEFAULT 'registered'
CHECK (status IN ('registered','reviewing','developing','deployed','done')),
manufacturer_id INT REFERENCES users(id),
created_by INT REFERENCES users(id),
sw_deploy_target DATE,
sw_deployed_at DATE,
manufacturer_memo TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS improvement_reports (
improvement_id INT REFERENCES improvements(id) ON DELETE CASCADE,
report_id INT REFERENCES reports(id) ON DELETE CASCADE,
PRIMARY KEY (improvement_id, report_id)
);
CREATE TABLE IF NOT EXISTS improvement_attachments (
id SERIAL PRIMARY KEY,
improvement_id INT REFERENCES improvements(id) ON DELETE CASCADE,
file_path VARCHAR(255) NOT NULL,
file_name VARCHAR(255),
uploaded_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS improvement_logs (
id SERIAL PRIMARY KEY,
improvement_id INT REFERENCES improvements(id) ON DELETE CASCADE,
changed_by INT REFERENCES users(id),
old_status VARCHAR(20),
new_status VARCHAR(20),
memo TEXT,
changed_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS system_settings (
key VARCHAR(100) PRIMARY KEY,
value TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT NOW()
);
-- ============================================================
-- 기본 데이터
-- ============================================================
INSERT INTO charger_types (name, description) VALUES
('완속충전기', 'AC 7kW 이하'),
('급속충전기', 'DC 50kW'),
('초급속충전기', 'DC 100kW 이상')
ON CONFLICT DO NOTHING;
INSERT INTO system_settings (key, value) VALUES
('report_visibility_policy', 'immediate')
ON CONFLICT DO NOTHING;
-- 초기 관리자 계정: admin / admin1234
INSERT INTO users (username, password_hash, role, name, is_active) VALUES
('admin', '$2b$12$ocMnUviG6lYZ4BP4Ut00KumPg/L73b82eJCEfrXUmwfFcFy3zfWDO', 'admin', '관리자', true)
ON CONFLICT DO NOTHING;

53
backend/main.py Normal file
View File

@@ -0,0 +1,53 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
import os
from routers import auth_router, chargers, reports, repairs, costs, improvements, accounts, settings, export
app = FastAPI(title="EV 충전기 AS 관리 시스템", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 라우터 등록
app.include_router(auth_router.router)
app.include_router(chargers.router)
app.include_router(reports.router)
app.include_router(repairs.router)
app.include_router(costs.router)
app.include_router(improvements.router)
app.include_router(accounts.router)
app.include_router(settings.router)
app.include_router(export.router)
@app.get("/api/health")
def health():
return {"status": "ok"}
@app.get("/api/stats")
def stats(db=None):
from database import SessionLocal
from sqlalchemy import func
from models import Report, Repair, RepairCost, Improvement
db = SessionLocal()
try:
total = db.query(Report).count()
pending = db.query(Report).filter(Report.status.in_(["pending","pending_approval"])).count()
in_prog = db.query(Report).filter(Report.status == "in_progress").count()
done = db.query(Report).filter(Report.status == "done").count()
cost_pend = db.query(RepairCost).filter(RepairCost.cost_status == "pending").count()
imp_open = db.query(Improvement).filter(
Improvement.status.in_(["registered","reviewing","developing"])).count()
return {
"total": total, "pending": pending,
"in_progress": in_prog, "done": done,
"cost_pending": cost_pend, "improvement_open": imp_open,
}
finally:
db.close()

168
backend/models.py Normal file
View File

@@ -0,0 +1,168 @@
from sqlalchemy import Column, Integer, String, Text, Boolean, Float, Date, TIMESTAMP, ARRAY, ForeignKey, func
from sqlalchemy.orm import relationship
from database import Base
class ChargerType(Base):
__tablename__ = "charger_types"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
description = Column(Text)
created_at = Column(TIMESTAMP, server_default=func.now())
chargers = relationship("Charger", back_populates="charger_type")
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
role = Column(String(20), nullable=False)
company = Column(String(100))
name = Column(String(50), nullable=False)
phone = Column(String(20))
email = Column(String(100))
is_active = Column(Boolean, default=True)
created_at = Column(TIMESTAMP, server_default=func.now())
class Charger(Base):
__tablename__ = "chargers"
id = Column(String(50), primary_key=True)
charger_type_id = Column(Integer, ForeignKey("charger_types.id"))
name = Column(String(100), nullable=False)
station_name = Column(String(100), nullable=False)
location_detail = Column(Text)
cpo_name = Column(String(100))
installed_at = Column(Date)
gps_lat = Column(Float)
gps_lng = Column(Float)
is_active = Column(Boolean, default=True)
created_at = Column(TIMESTAMP, server_default=func.now())
charger_type = relationship("ChargerType", back_populates="chargers")
reports = relationship("Report", back_populates="charger")
class Report(Base):
__tablename__ = "reports"
id = Column(Integer, primary_key=True)
charger_id = Column(String(50), ForeignKey("chargers.id"))
issue_types = Column(ARRAY(Text), nullable=False)
issue_detail = Column(Text)
error_code = Column(String(100))
occurred_at = Column(TIMESTAMP)
contact = Column(String(20))
consent = Column(Boolean, default=False)
gps_lat = Column(Float)
gps_lng = Column(Float)
status = Column(String(30), default="pending")
reported_at = Column(TIMESTAMP, server_default=func.now())
charger = relationship("Charger", back_populates="reports")
photos = relationship("ReportPhoto", back_populates="report", cascade="all, delete-orphan")
repair_links = relationship("RepairReport", back_populates="report")
class ReportPhoto(Base):
__tablename__ = "report_photos"
id = Column(Integer, primary_key=True)
report_id = Column(Integer, ForeignKey("reports.id", ondelete="CASCADE"))
file_path = Column(String(255), nullable=False)
uploaded_at = Column(TIMESTAMP, server_default=func.now())
report = relationship("Report", back_populates="photos")
class Repair(Base):
__tablename__ = "repairs"
id = Column(Integer, primary_key=True)
mechanic_id = Column(Integer, ForeignKey("users.id"))
repair_types = Column(ARRAY(Text), nullable=False)
description = Column(Text, nullable=False)
started_at = Column(TIMESTAMP, nullable=False)
completed_at = Column(TIMESTAMP)
result_status = Column(String(20), default="done")
mechanic = relationship("User", foreign_keys=[mechanic_id])
report_links = relationship("RepairReport", back_populates="repair", cascade="all, delete-orphan")
photos = relationship("RepairPhoto", back_populates="repair", cascade="all, delete-orphan")
cost = relationship("RepairCost", back_populates="repair", uselist=False)
class RepairReport(Base):
__tablename__ = "repair_reports"
repair_id = Column(Integer, ForeignKey("repairs.id", ondelete="CASCADE"), primary_key=True)
report_id = Column(Integer, ForeignKey("reports.id", ondelete="CASCADE"), primary_key=True)
repair = relationship("Repair", back_populates="report_links")
report = relationship("Report", back_populates="repair_links")
class RepairPhoto(Base):
__tablename__ = "repair_photos"
id = Column(Integer, primary_key=True)
repair_id = Column(Integer, ForeignKey("repairs.id", ondelete="CASCADE"))
photo_type = Column(String(10), default="after")
file_path = Column(String(255), nullable=False)
uploaded_at = Column(TIMESTAMP, server_default=func.now())
repair = relationship("Repair", back_populates="photos")
class RepairCost(Base):
__tablename__ = "repair_costs"
id = Column(Integer, primary_key=True)
repair_id = Column(Integer, ForeignKey("repairs.id", ondelete="CASCADE"), unique=True)
root_cause = Column(Text)
admin_note = Column(Text)
cost_party_type = Column(String(20))
cost_party_manufacturer_id = Column(Integer, ForeignKey("users.id"))
cost_party_custom = Column(String(100))
cost_amount = Column(Integer, default=0)
cost_status = Column(String(20), default="pending")
reviewed_by = Column(Integer, ForeignKey("users.id"))
reviewed_at = Column(TIMESTAMP)
repair = relationship("Repair", back_populates="cost")
reviewer = relationship("User", foreign_keys=[reviewed_by])
manufacturer = relationship("User", foreign_keys=[cost_party_manufacturer_id])
class Improvement(Base):
__tablename__ = "improvements"
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
category = Column(String(20), nullable=False)
description = Column(Text, nullable=False)
priority = Column(String(10), default="normal")
part_name = Column(String(100))
status = Column(String(20), default="registered")
manufacturer_id = Column(Integer, ForeignKey("users.id"))
created_by = Column(Integer, ForeignKey("users.id"))
sw_deploy_target = Column(Date)
sw_deployed_at = Column(Date)
manufacturer_memo = Column(Text)
created_at = Column(TIMESTAMP, server_default=func.now())
manufacturer = relationship("User", foreign_keys=[manufacturer_id])
creator = relationship("User", foreign_keys=[created_by])
report_links = relationship("ImprovementReport", back_populates="improvement", cascade="all, delete-orphan")
attachments = relationship("ImprovementAttachment", back_populates="improvement", cascade="all, delete-orphan")
logs = relationship("ImprovementLog", back_populates="improvement", cascade="all, delete-orphan")
class ImprovementReport(Base):
__tablename__ = "improvement_reports"
improvement_id = Column(Integer, ForeignKey("improvements.id", ondelete="CASCADE"), primary_key=True)
report_id = Column(Integer, ForeignKey("reports.id", ondelete="CASCADE"), primary_key=True)
improvement = relationship("Improvement", back_populates="report_links")
report = relationship("Report")
class ImprovementAttachment(Base):
__tablename__ = "improvement_attachments"
id = Column(Integer, primary_key=True)
improvement_id = Column(Integer, ForeignKey("improvements.id", ondelete="CASCADE"))
file_path = Column(String(255), nullable=False)
file_name = Column(String(255))
uploaded_at = Column(TIMESTAMP, server_default=func.now())
improvement = relationship("Improvement", back_populates="attachments")
class ImprovementLog(Base):
__tablename__ = "improvement_logs"
id = Column(Integer, primary_key=True)
improvement_id = Column(Integer, ForeignKey("improvements.id", ondelete="CASCADE"))
changed_by = Column(Integer, ForeignKey("users.id"))
old_status = Column(String(20))
new_status = Column(String(20))
memo = Column(Text)
changed_at = Column(TIMESTAMP, server_default=func.now())
improvement = relationship("Improvement", back_populates="logs")
changer = relationship("User")
class SystemSetting(Base):
__tablename__ = "system_settings"
key = Column(String(100), primary_key=True)
value = Column(Text, nullable=False)
updated_at = Column(TIMESTAMP, server_default=func.now(), onupdate=func.now())

13
backend/requirements.txt Normal file
View File

@@ -0,0 +1,13 @@
fastapi==0.111.0
uvicorn[standard]==0.29.0
sqlalchemy==2.0.30
psycopg2-binary==2.9.9
python-jose[cryptography]==3.3.0
bcrypt==4.0.1
passlib[bcrypt]==1.7.4
python-multipart==0.0.9
qrcode[pil]==7.4.2
Pillow==10.3.0
openpyxl==3.1.2
python-dotenv==1.0.1
pydantic[email]==2.7.1

View File

View File

@@ -0,0 +1,77 @@
from fastapi import APIRouter, Depends, HTTPException, Form
from sqlalchemy.orm import Session
from typing import Optional
from database import get_db
import models
from auth import require_admin, hash_password, get_current_user
router = APIRouter(prefix="/api/accounts", tags=["accounts"])
@router.get("")
def list_users(role: Optional[str] = None, db: Session = Depends(get_db), _=Depends(require_admin)):
q = db.query(models.User)
if role: q = q.filter(models.User.role == role)
return [{
"id": u.id, "username": u.username, "role": u.role,
"company": u.company, "name": u.name, "phone": u.phone,
"email": u.email, "is_active": u.is_active,
"created_at": u.created_at.isoformat(),
} for u in q.order_by(models.User.id).all()]
@router.post("")
def create_user(
username: str = Form(...), password: str = Form(...),
role: str = Form(...), name: str = Form(...),
company: str = Form(""), phone: str = Form(""), email: str = Form(""),
db: Session = Depends(get_db), _=Depends(require_admin)
):
if db.query(models.User).filter_by(username=username).first():
raise HTTPException(400, "이미 존재하는 아이디입니다.")
u = models.User(
username=username, password_hash=hash_password(password),
role=role, name=name, company=company or None,
phone=phone or None, email=email or None
)
db.add(u); db.commit(); db.refresh(u)
return {"id": u.id, "username": u.username}
@router.put("/{user_id}")
def update_user(
user_id: int,
name: str = Form(...), company: str = Form(""),
phone: str = Form(""), email: str = Form(""),
is_active: bool = Form(True),
password: Optional[str] = Form(None),
db: Session = Depends(get_db), _=Depends(require_admin)
):
u = db.query(models.User).filter_by(id=user_id).first()
if not u: raise HTTPException(404)
u.name = name; u.company = company or None
u.phone = phone or None; u.email = email or None
u.is_active = is_active
if password: u.password_hash = hash_password(password)
db.commit()
return {"ok": True}
@router.delete("/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db),
current_user: models.User = Depends(require_admin)):
if user_id == current_user.id:
raise HTTPException(400, "자신의 계정은 삭제할 수 없습니다.")
u = db.query(models.User).filter_by(id=user_id).first()
if not u: raise HTTPException(404)
u.is_active = False; db.commit()
return {"ok": True}
@router.patch("/me/password")
def change_my_password(
current_password: str = Form(...), new_password: str = Form(...),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
from auth import verify_password
if not verify_password(current_password, current_user.password_hash):
raise HTTPException(400, "현재 비밀번호가 올바르지 않습니다.")
current_user.password_hash = hash_password(new_password)
db.commit()
return {"ok": True}

View File

@@ -0,0 +1,35 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from database import get_db
import models
from auth import verify_password, create_access_token, get_current_user
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/login")
def login(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(models.User).filter(
models.User.username == form.username,
models.User.is_active == True
).first()
if not user or not verify_password(form.password, user.password_hash):
raise HTTPException(status_code=401, detail="아이디 또는 비밀번호가 올바르지 않습니다.")
token = create_access_token({"sub": str(user.id)})
return {
"access_token": token,
"token_type": "bearer",
"role": user.role,
"name": user.name,
"user_id": user.id
}
@router.get("/me")
def me(current_user: models.User = Depends(get_current_user)):
return {
"id": current_user.id,
"username": current_user.username,
"role": current_user.role,
"name": current_user.name,
"company": current_user.company,
}

126
backend/routers/chargers.py Normal file
View File

@@ -0,0 +1,126 @@
import os
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from typing import Optional
from database import get_db
import models
from auth import require_admin, get_current_user
from utils import generate_qr
router = APIRouter(prefix="/api/chargers", tags=["chargers"])
# ── 충전기 종류 ──────────────────────────────────────
@router.get("/types")
def list_types(db: Session = Depends(get_db)):
types = db.query(models.ChargerType).order_by(models.ChargerType.id).all()
return [{"id": t.id, "name": t.name, "description": t.description,
"charger_count": len(t.chargers)} for t in types]
@router.post("/types")
def create_type(name: str = Form(...), description: str = Form(""),
db: Session = Depends(get_db), _=Depends(require_admin)):
t = models.ChargerType(name=name, description=description)
db.add(t); db.commit(); db.refresh(t)
return {"id": t.id, "name": t.name}
@router.put("/types/{type_id}")
def update_type(type_id: int, name: str = Form(...), description: str = Form(""),
db: Session = Depends(get_db), _=Depends(require_admin)):
t = db.query(models.ChargerType).filter_by(id=type_id).first()
if not t: raise HTTPException(404, "종류를 찾을 수 없습니다.")
t.name = name; t.description = description
db.commit()
return {"id": t.id, "name": t.name}
@router.delete("/types/{type_id}")
def delete_type(type_id: int, db: Session = Depends(get_db), _=Depends(require_admin)):
t = db.query(models.ChargerType).filter_by(id=type_id).first()
if not t: raise HTTPException(404)
if t.chargers: raise HTTPException(400, "해당 종류로 등록된 충전기가 있어 삭제할 수 없습니다.")
db.delete(t); db.commit()
return {"ok": True}
# ── 충전기 ──────────────────────────────────────────
@router.get("")
def list_chargers(db: Session = Depends(get_db)):
chargers = db.query(models.Charger).order_by(models.Charger.id).all()
result = []
for c in chargers:
pending = db.query(models.Report).filter(
models.Report.charger_id == c.id,
models.Report.status.in_(["pending", "in_progress"])
).count()
result.append({
"id": c.id, "name": c.name, "station_name": c.station_name,
"cpo_name": c.cpo_name, "location_detail": c.location_detail,
"installed_at": str(c.installed_at) if c.installed_at else None,
"gps_lat": c.gps_lat, "gps_lng": c.gps_lng, "is_active": c.is_active,
"charger_type": c.charger_type.name if c.charger_type else None,
"charger_type_id": c.charger_type_id,
"pending_reports": pending,
})
return result
@router.get("/{charger_id}")
def get_charger(charger_id: str, db: Session = Depends(get_db)):
c = db.query(models.Charger).filter_by(id=charger_id).first()
if not c: raise HTTPException(404, "충전기를 찾을 수 없습니다.")
return {
"id": c.id, "name": c.name, "station_name": c.station_name,
"cpo_name": c.cpo_name, "location_detail": c.location_detail,
"installed_at": str(c.installed_at) if c.installed_at else None,
"gps_lat": c.gps_lat, "gps_lng": c.gps_lng, "is_active": c.is_active,
"charger_type": c.charger_type.name if c.charger_type else None,
"charger_type_id": c.charger_type_id,
}
@router.post("")
def create_charger(
id: str = Form(...), charger_type_id: int = Form(...),
name: str = Form(...), station_name: str = Form(...),
location_detail: str = Form(""), cpo_name: str = Form(""),
installed_at: Optional[str] = Form(None),
gps_lat: Optional[float] = Form(None), gps_lng: Optional[float] = Form(None),
db: Session = Depends(get_db), _=Depends(require_admin)
):
if db.query(models.Charger).filter_by(id=id).first():
raise HTTPException(400, "이미 존재하는 충전기 ID입니다.")
c = models.Charger(
id=id, charger_type_id=charger_type_id, name=name,
station_name=station_name, location_detail=location_detail,
cpo_name=cpo_name, installed_at=installed_at or None,
gps_lat=gps_lat, gps_lng=gps_lng
)
db.add(c); db.commit()
domain = os.getenv("DOMAIN", "localhost")
qr_path = generate_qr(id, domain)
return {"id": c.id, "qr_path": qr_path}
@router.put("/{charger_id}")
def update_charger(
charger_id: str,
charger_type_id: int = Form(...), name: str = Form(...),
station_name: str = Form(...), location_detail: str = Form(""),
cpo_name: str = Form(""), installed_at: Optional[str] = Form(None),
gps_lat: Optional[float] = Form(None), gps_lng: Optional[float] = Form(None),
db: Session = Depends(get_db), _=Depends(require_admin)
):
c = db.query(models.Charger).filter_by(id=charger_id).first()
if not c: raise HTTPException(404)
c.charger_type_id = charger_type_id; c.name = name
c.station_name = station_name; c.location_detail = location_detail
c.cpo_name = cpo_name; c.installed_at = installed_at or None
c.gps_lat = gps_lat; c.gps_lng = gps_lng
db.commit()
domain = os.getenv("DOMAIN", "localhost")
qr_path = generate_qr(charger_id, domain)
return {"id": c.id, "qr_path": qr_path}
@router.post("/{charger_id}/qr")
def regenerate_qr(charger_id: str, db: Session = Depends(get_db), _=Depends(require_admin)):
c = db.query(models.Charger).filter_by(id=charger_id).first()
if not c: raise HTTPException(404)
domain = os.getenv("DOMAIN", "localhost")
qr_path = generate_qr(charger_id, domain)
return {"qr_path": qr_path}

97
backend/routers/costs.py Normal file
View File

@@ -0,0 +1,97 @@
from fastapi import APIRouter, Depends, HTTPException, Form
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import Optional
from datetime import datetime
from database import get_db
import models
from auth import require_admin
router = APIRouter(prefix="/api/costs", tags=["costs"])
@router.get("")
def list_costs(
cost_status: Optional[str] = None,
cost_party_type: Optional[str] = None,
db: Session = Depends(get_db), _=Depends(require_admin)
):
q = db.query(models.RepairCost).join(models.Repair)
if cost_status: q = q.filter(models.RepairCost.cost_status == cost_status)
if cost_party_type: q = q.filter(models.RepairCost.cost_party_type == cost_party_type)
q = q.order_by(desc(models.RepairCost.reviewed_at))
result = []
for cost in q.all():
repair = cost.repair
rids = [rr.report_id for rr in repair.report_links]
charger_id, station_name, charger_type = None, None, None
if rids:
r = db.query(models.Report).filter_by(id=rids[0]).first()
if r and r.charger:
charger_id = r.charger_id
station_name = r.charger.station_name
charger_type = r.charger.charger_type.name if r.charger.charger_type else None
result.append({
"id": cost.id, "repair_id": cost.repair_id,
"report_ids": rids, "charger_id": charger_id,
"station_name": station_name, "charger_type": charger_type,
"mechanic_name": repair.mechanic.name if repair.mechanic else None,
"mechanic_company": repair.mechanic.company if repair.mechanic else None,
"completed_at": repair.completed_at.isoformat() if repair.completed_at else None,
"root_cause": cost.root_cause, "admin_note": cost.admin_note,
"cost_party_type": cost.cost_party_type,
"cost_party_custom": cost.cost_party_custom,
"cost_amount": cost.cost_amount, "cost_status": cost.cost_status,
"manufacturer_name": cost.manufacturer.name if cost.manufacturer else None,
"reviewed_by_name": cost.reviewer.name if cost.reviewer else None,
"reviewed_at": cost.reviewed_at.isoformat() if cost.reviewed_at else None,
})
return result
@router.get("/stats")
def cost_stats(db: Session = Depends(get_db), _=Depends(require_admin)):
from sqlalchemy import func, extract
now = datetime.now()
monthly = db.query(func.sum(models.RepairCost.cost_amount)).filter(
extract('year', models.RepairCost.reviewed_at) == now.year,
extract('month', models.RepairCost.reviewed_at) == now.month,
).scalar() or 0
pending = db.query(models.RepairCost).filter_by(cost_status="pending").count()
return {"monthly_total": monthly, "pending_count": pending}
@router.post("/repair/{repair_id}")
def upsert_cost(
repair_id: int,
root_cause: str = Form(""),
admin_note: str = Form(""),
cost_party_type: str = Form(...),
cost_party_manufacturer_id: Optional[int] = Form(None),
cost_party_custom: str = Form(""),
cost_amount: int = Form(0),
cost_status: str = Form("pending"),
db: Session = Depends(get_db),
current_user: models.User = Depends(require_admin)
):
repair = db.query(models.Repair).filter_by(id=repair_id).first()
if not repair: raise HTTPException(404, "조치 내역을 찾을 수 없습니다.")
cost = db.query(models.RepairCost).filter_by(repair_id=repair_id).first()
if cost:
cost.root_cause = root_cause; cost.admin_note = admin_note
cost.cost_party_type = cost_party_type
cost.cost_party_manufacturer_id = cost_party_manufacturer_id or None
cost.cost_party_custom = cost_party_custom or None
cost.cost_amount = cost_amount; cost.cost_status = cost_status
cost.reviewed_by = current_user.id; cost.reviewed_at = datetime.now()
else:
cost = models.RepairCost(
repair_id=repair_id, root_cause=root_cause, admin_note=admin_note,
cost_party_type=cost_party_type,
cost_party_manufacturer_id=cost_party_manufacturer_id or None,
cost_party_custom=cost_party_custom or None,
cost_amount=cost_amount, cost_status=cost_status,
reviewed_by=current_user.id, reviewed_at=datetime.now()
)
db.add(cost)
db.commit()
return {"ok": True}

245
backend/routers/export.py Normal file
View File

@@ -0,0 +1,245 @@
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy import desc
from io import BytesIO
from datetime import datetime
from urllib.parse import quote
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from database import get_db
import models
from auth import require_admin
router = APIRouter(prefix="/api/export", tags=["export"])
NAVY = "0B1E3D"
LIGHT = "D6EAF8"
def style_header(ws, headers, row=1):
bd = Side(style="thin", color="AAAAAA")
for col, h in enumerate(headers, 1):
cell = ws.cell(row=row, column=col, value=h)
cell.font = Font(bold=True, color="FFFFFF", size=11)
cell.fill = PatternFill("solid", fgColor=NAVY)
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
cell.border = Border(left=bd, right=bd, top=bd, bottom=bd)
ws.row_dimensions[row].height = 20
def style_row(ws, row_num, num_cols, even=True):
bd = Side(style="thin", color="DDDDDD")
for col in range(1, num_cols + 1):
cell = ws.cell(row=row_num, column=col)
if even:
cell.fill = PatternFill("solid", fgColor="F4F7FB")
cell.border = Border(left=bd, right=bd, top=bd, bottom=bd)
cell.alignment = Alignment(vertical="center", wrap_text=True)
def fmt_dt(dt):
return dt.strftime("%Y-%m-%d %H:%M") if dt else ""
def fmt_d(d):
return str(d) if d else ""
def elapsed(start, end):
if not start or not end: return ""
diff = end - start
total = int(diff.total_seconds())
h, m = divmod(total // 60, 60)
return f"{h}시간 {m}"
def make_response(wb: openpyxl.Workbook, korean_name: str) -> StreamingResponse:
"""엑셀 파일을 StreamingResponse로 반환 — 한글 파일명 URL 인코딩 처리"""
buf = BytesIO()
wb.save(buf)
buf.seek(0)
date_str = datetime.now().strftime("%Y%m%d_%H%M")
filename = f"{korean_name}_{date_str}.xlsx"
encoded = quote(filename, safe="") # 한글 URL 인코딩
cd_header = f"attachment; filename*=UTF-8''{encoded}"
return StreamingResponse(
buf,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": cd_header},
)
# ─────────────────────────────────────────────
# 1. AS 신고 목록
# ─────────────────────────────────────────────
@router.get("/reports")
def export_reports(db: Session = Depends(get_db), _=Depends(require_admin)):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "AS신고목록"
ws.freeze_panes = "A2"
headers = [
"접수번호","충전기ID","충전기종류","충전기명","충전소명","CPO명","설치일",
"신고위치(위도)","신고위치(경도)","문제유형","에러코드","상세설명",
"신고자연락처","문제발생시각","신고일시","처리상태",
"담당정비사","정비사소속","조치유형","조치내용",
"조치시작","조치완료","작업소요시간","신고→완료소요시간",
"문제원인(관리자)","비고","출장비부담주체","출장비금액(원)","출장비상태",
"처리담당자","처리일시","연결개선항목번호"
]
style_header(ws, headers)
col_widths = [10,14,14,14,18,14,12,12,12,22,12,24,14,16,16,12,
12,14,16,24,16,16,12,18,24,24,16,12,12,12,16,18]
for i, w in enumerate(col_widths, 1):
ws.column_dimensions[ws.cell(1, i).column_letter].width = w
reports = db.query(models.Report).order_by(desc(models.Report.reported_at)).all()
for row_num, r in enumerate(reports, 2):
c = r.charger
repair = r.repair_links[0].repair if r.repair_links else None
cost = repair.cost if repair else None
imp_ids = [
ir.improvement_id
for ir in db.query(models.ImprovementReport).filter_by(report_id=r.id).all()
]
row_data = [
r.id,
r.charger_id,
c.charger_type.name if c and c.charger_type else "",
c.name if c else "",
c.station_name if c else "",
c.cpo_name if c else "",
fmt_d(c.installed_at) if c else "",
r.gps_lat or "",
r.gps_lng or "",
", ".join(r.issue_types) if r.issue_types else "",
r.error_code or "",
r.issue_detail or "",
r.contact or "",
fmt_dt(r.occurred_at),
fmt_dt(r.reported_at),
r.status,
repair.mechanic.name if repair and repair.mechanic else "",
repair.mechanic.company if repair and repair.mechanic else "",
", ".join(repair.repair_types) if repair and repair.repair_types else "",
repair.description if repair else "",
fmt_dt(repair.started_at) if repair else "",
fmt_dt(repair.completed_at) if repair else "",
elapsed(repair.started_at, repair.completed_at) if repair else "",
elapsed(r.occurred_at or r.reported_at, repair.completed_at if repair else None),
cost.root_cause if cost else "",
cost.admin_note if cost else "",
cost.cost_party_type if cost else "",
cost.cost_amount if cost else "",
cost.cost_status if cost else "",
cost.reviewer.name if cost and cost.reviewer else "",
fmt_dt(cost.reviewed_at) if cost else "",
", ".join(str(i) for i in imp_ids) if imp_ids else "",
]
for col, val in enumerate(row_data, 1):
ws.cell(row=row_num, column=col, value=val)
style_row(ws, row_num, len(headers), row_num % 2 == 0)
ws.row_dimensions[row_num].height = 16
return make_response(wb, "AS신고목록")
# ─────────────────────────────────────────────
# 2. 출장비 목록
# ─────────────────────────────────────────────
@router.get("/costs")
def export_costs(db: Session = Depends(get_db), _=Depends(require_admin)):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "출장비목록"
ws.freeze_panes = "A2"
headers = [
"신고번호","충전기ID","충전기종류","충전소명","조치완료일",
"정비사","소속","문제원인","비고",
"출장비부담주체","제조사명","금액(원)","처리상태",
"처리담당자","처리일시"
]
style_header(ws, headers)
for i, w in enumerate([10,14,14,18,16,12,14,24,24,16,16,12,12,12,16], 1):
ws.column_dimensions[ws.cell(1, i).column_letter].width = w
costs = db.query(models.RepairCost).join(models.Repair).order_by(
desc(models.RepairCost.reviewed_at)).all()
for row_num, cost in enumerate(costs, 2):
repair = cost.repair
rids = [rr.report_id for rr in repair.report_links]
charger_id = station_name = charger_type = ""
if rids:
r = db.query(models.Report).filter_by(id=rids[0]).first()
if r and r.charger:
charger_id = r.charger_id
station_name = r.charger.station_name
charger_type = r.charger.charger_type.name if r.charger.charger_type else ""
row_data = [
", ".join(str(i) for i in rids),
charger_id, charger_type, station_name,
fmt_dt(repair.completed_at),
repair.mechanic.name if repair.mechanic else "",
repair.mechanic.company if repair.mechanic else "",
cost.root_cause or "",
cost.admin_note or "",
cost.cost_party_type or "",
cost.manufacturer.company if cost.manufacturer else (cost.cost_party_custom or ""),
cost.cost_amount or 0,
cost.cost_status or "",
cost.reviewer.name if cost.reviewer else "",
fmt_dt(cost.reviewed_at),
]
for col, val in enumerate(row_data, 1):
ws.cell(row=row_num, column=col, value=val)
style_row(ws, row_num, len(headers), row_num % 2 == 0)
ws.row_dimensions[row_num].height = 16
return make_response(wb, "출장비목록")
# ─────────────────────────────────────────────
# 3. 개선항목 목록
# ─────────────────────────────────────────────
@router.get("/improvements")
def export_improvements(db: Session = Depends(get_db), _=Depends(require_admin)):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "개선항목목록"
ws.freeze_panes = "A2"
headers = [
"번호","제목","분류","우선순위","개선내용","관련부품",
"담당제조사","담당자","연락처","연결AS건수","연결AS번호",
"진행상태","SW배포목표일","SW실제배포일","제조사메모",
"등록관리자","등록일시"
]
style_header(ws, headers)
for i, w in enumerate([8,24,10,10,30,14,16,12,14,10,18,12,14,14,24,12,16], 1):
ws.column_dimensions[ws.cell(1, i).column_letter].width = w
imps = db.query(models.Improvement).order_by(desc(models.Improvement.created_at)).all()
for row_num, imp in enumerate(imps, 2):
rids = [ir.report_id for ir in imp.report_links]
row_data = [
imp.id, imp.title, imp.category, imp.priority,
imp.description, imp.part_name or "",
imp.manufacturer.company if imp.manufacturer else "",
imp.manufacturer.name if imp.manufacturer else "",
imp.manufacturer.phone if imp.manufacturer else "",
len(rids),
", ".join(str(i) for i in rids),
imp.status,
fmt_d(imp.sw_deploy_target),
fmt_d(imp.sw_deployed_at),
imp.manufacturer_memo or "",
imp.creator.name if imp.creator else "",
fmt_dt(imp.created_at),
]
for col, val in enumerate(row_data, 1):
ws.cell(row=row_num, column=col, value=val)
style_row(ws, row_num, len(headers), row_num % 2 == 0)
ws.row_dimensions[row_num].height = 16
return make_response(wb, "개선항목목록")

View File

@@ -0,0 +1,224 @@
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy import desc
from io import BytesIO
from datetime import datetime
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from database import get_db
import models
from auth import require_admin
router = APIRouter(prefix="/api/export", tags=["export"])
NAVY = "0B1E3D"
ACCENT = "00B4D8"
LIGHT = "D6EAF8"
def style_header(ws, headers, row=1):
for col, h in enumerate(headers, 1):
cell = ws.cell(row=row, column=col, value=h)
cell.font = Font(bold=True, color="FFFFFF", size=11)
cell.fill = PatternFill("solid", fgColor=NAVY)
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
bd = Side(style="thin", color="AAAAAA")
cell.border = Border(left=bd, right=bd, top=bd, bottom=bd)
ws.row_dimensions[row].height = 20
def style_row(ws, row_num, num_cols, even=True):
bd = Side(style="thin", color="DDDDDD")
for col in range(1, num_cols + 1):
cell = ws.cell(row=row_num, column=col)
if even:
cell.fill = PatternFill("solid", fgColor="F4F7FB")
cell.border = Border(left=bd, right=bd, top=bd, bottom=bd)
cell.alignment = Alignment(vertical="center", wrap_text=True)
def fmt_dt(dt):
return dt.strftime("%Y-%m-%d %H:%M") if dt else ""
def fmt_d(d):
return str(d) if d else ""
def elapsed(start, end):
if not start or not end: return ""
diff = end - start
total = int(diff.total_seconds())
h, m = divmod(total // 60, 60)
return f"{h}시간 {m}"
@router.get("/reports")
def export_reports(db: Session = Depends(get_db), _=Depends(require_admin)):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "AS신고목록"
ws.freeze_panes = "A2"
headers = [
"접수번호","충전기ID","충전기종류","충전기명","충전소명","CPO명","설치일",
"신고위치(위도)","신고위치(경도)","문제유형","에러코드","상세설명",
"신고자연락처","문제발생시각","신고일시","처리상태",
"담당정비사","정비사소속","조치유형","조치내용",
"조치시작","조치완료","작업시간","신고→완료소요시간",
"문제원인(관리자)","비고","출장비부담주체","출장비금액","출장비상태",
"처리담당자","처리일시","연결개선항목번호"
]
style_header(ws, headers)
col_widths = [10,14,14,14,18,14,12,12,12,22,12,24,14,16,16,12,
12,14,16,24,16,16,12,18,24,24,16,12,12,12,16,18]
for i, w in enumerate(col_widths, 1):
ws.column_dimensions[ws.cell(1, i).column_letter].width = w
reports = db.query(models.Report).order_by(desc(models.Report.reported_at)).all()
for row_num, r in enumerate(reports, 2):
c = r.charger
repair, cost = None, None
if r.repair_links:
repair = r.repair_links[0].repair
cost = repair.cost if repair else None
imp_ids = [ir.improvement_id for ir in
db.query(models.ImprovementReport).filter_by(report_id=r.id).all()]
row_data = [
r.id,
r.charger_id,
c.charger_type.name if c and c.charger_type else "",
c.name if c else "",
c.station_name if c else "",
c.cpo_name if c else "",
fmt_d(c.installed_at) if c else "",
r.gps_lat or "",
r.gps_lng or "",
", ".join(r.issue_types) if r.issue_types else "",
r.error_code or "",
r.issue_detail or "",
r.contact or "",
fmt_dt(r.occurred_at),
fmt_dt(r.reported_at),
r.status,
repair.mechanic.name if repair and repair.mechanic else "",
repair.mechanic.company if repair and repair.mechanic else "",
", ".join(repair.repair_types) if repair and repair.repair_types else "",
repair.description if repair else "",
fmt_dt(repair.started_at) if repair else "",
fmt_dt(repair.completed_at) if repair else "",
elapsed(repair.started_at, repair.completed_at) if repair else "",
elapsed(r.occurred_at or r.reported_at, repair.completed_at if repair else None),
cost.root_cause if cost else "",
cost.admin_note if cost else "",
cost.cost_party_type if cost else "",
cost.cost_amount if cost else "",
cost.cost_status if cost else "",
cost.reviewer.name if cost and cost.reviewer else "",
fmt_dt(cost.reviewed_at) if cost else "",
", ".join(str(i) for i in imp_ids) if imp_ids else "",
]
for col, val in enumerate(row_data, 1):
ws.cell(row=row_num, column=col, value=val)
style_row(ws, row_num, len(headers), row_num % 2 == 0)
ws.row_dimensions[row_num].height = 16
buf = BytesIO()
wb.save(buf)
buf.seek(0)
fname = f"AS신고목록_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx"
return StreamingResponse(buf, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{fname}"})
@router.get("/costs")
def export_costs(db: Session = Depends(get_db), _=Depends(require_admin)):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "출장비목록"
ws.freeze_panes = "A2"
headers = ["신고번호","충전기ID","충전기종류","충전소명","조치완료일",
"정비사","소속","문제원인","비고",
"출장비부담주체","제조사명","금액(원)","처리상태",
"처리담당자","처리일시"]
style_header(ws, headers)
for i, w in enumerate([10,14,14,18,16,12,14,24,24,16,16,12,12,12,16], 1):
ws.column_dimensions[ws.cell(1, i).column_letter].width = w
costs = db.query(models.RepairCost).join(models.Repair).order_by(
desc(models.RepairCost.reviewed_at)).all()
for row_num, cost in enumerate(costs, 2):
repair = cost.repair
rids = [rr.report_id for rr in repair.report_links]
charger_id, station_name, charger_type = "", "", ""
if rids:
r = db.query(models.Report).filter_by(id=rids[0]).first()
if r and r.charger:
charger_id = r.charger_id
station_name = r.charger.station_name
charger_type = r.charger.charger_type.name if r.charger.charger_type else ""
row_data = [
", ".join(str(i) for i in rids),
charger_id, charger_type, station_name,
fmt_dt(repair.completed_at),
repair.mechanic.name if repair.mechanic else "",
repair.mechanic.company if repair.mechanic else "",
cost.root_cause or "", cost.admin_note or "",
cost.cost_party_type or "",
cost.manufacturer.company if cost.manufacturer else (cost.cost_party_custom or ""),
cost.cost_amount or 0, cost.cost_status or "",
cost.reviewer.name if cost.reviewer else "",
fmt_dt(cost.reviewed_at),
]
for col, val in enumerate(row_data, 1):
ws.cell(row=row_num, column=col, value=val)
style_row(ws, row_num, len(headers), row_num % 2 == 0)
ws.row_dimensions[row_num].height = 16
buf = BytesIO()
wb.save(buf)
buf.seek(0)
fname = f"출장비목록_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx"
return StreamingResponse(buf, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{fname}"})
@router.get("/improvements")
def export_improvements(db: Session = Depends(get_db), _=Depends(require_admin)):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "개선항목목록"
ws.freeze_panes = "A2"
headers = ["번호","제목","분류","우선순위","개선내용","관련부품",
"담당제조사","담당자","연락처","연결AS건수","연결AS번호",
"진행상태","SW배포목표일","SW실제배포일","제조사메모",
"등록관리자","등록일시"]
style_header(ws, headers)
for i, w in enumerate([8,24,10,10,30,14,16,12,14,10,18,12,14,14,24,12,16], 1):
ws.column_dimensions[ws.cell(1, i).column_letter].width = w
imps = db.query(models.Improvement).order_by(desc(models.Improvement.created_at)).all()
for row_num, imp in enumerate(imps, 2):
rids = [ir.report_id for ir in imp.report_links]
row_data = [
imp.id, imp.title, imp.category, imp.priority, imp.description,
imp.part_name or "",
imp.manufacturer.company if imp.manufacturer else "",
imp.manufacturer.name if imp.manufacturer else "",
imp.manufacturer.phone if imp.manufacturer else "",
len(rids), ", ".join(str(i) for i in rids),
imp.status,
fmt_d(imp.sw_deploy_target), fmt_d(imp.sw_deployed_at),
imp.manufacturer_memo or "",
imp.creator.name if imp.creator else "",
fmt_dt(imp.created_at),
]
for col, val in enumerate(row_data, 1):
ws.cell(row=row_num, column=col, value=val)
style_row(ws, row_num, len(headers), row_num % 2 == 0)
ws.row_dimensions[row_num].height = 16
buf = BytesIO()
wb.save(buf)
buf.seek(0)
fname = f"개선항목목록_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx"
return StreamingResponse(buf, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{fname}"})

View File

@@ -0,0 +1,108 @@
import json
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import List, Optional
from datetime import datetime
from database import get_db
import models
from auth import require_admin, require_manufacturer, get_current_user
from utils import save_upload
router = APIRouter(prefix="/api/improvements", tags=["improvements"])
def _fmt(imp: models.Improvement):
return {
"id": imp.id, "title": imp.title, "category": imp.category,
"description": imp.description, "priority": imp.priority,
"part_name": imp.part_name, "status": imp.status,
"manufacturer_id": imp.manufacturer_id,
"manufacturer_name": imp.manufacturer.name if imp.manufacturer else None,
"manufacturer_company": imp.manufacturer.company if imp.manufacturer else None,
"created_by_name": imp.creator.name if imp.creator else None,
"sw_deploy_target": str(imp.sw_deploy_target) if imp.sw_deploy_target else None,
"sw_deployed_at": str(imp.sw_deployed_at) if imp.sw_deployed_at else None,
"manufacturer_memo": imp.manufacturer_memo,
"created_at": imp.created_at.isoformat(),
"report_ids": [ir.report_id for ir in imp.report_links],
"report_count": len(imp.report_links),
"attachments": [{"path": a.file_path, "name": a.file_name} for a in imp.attachments],
"logs": [{"old": l.old_status, "new": l.new_status, "memo": l.memo,
"changed_at": l.changed_at.isoformat(),
"by": l.changer.name if l.changer else None} for l in imp.logs],
}
@router.get("")
def list_improvements(
status: Optional[str] = None, manufacturer_id: Optional[int] = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
q = db.query(models.Improvement).order_by(desc(models.Improvement.created_at))
if current_user.role == "manufacturer":
q = q.filter(models.Improvement.manufacturer_id == current_user.id)
if status: q = q.filter(models.Improvement.status == status)
if manufacturer_id: q = q.filter(models.Improvement.manufacturer_id == manufacturer_id)
return [_fmt(imp) for imp in q.all()]
@router.get("/{imp_id}")
def get_improvement(imp_id: int, db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)):
imp = db.query(models.Improvement).filter_by(id=imp_id).first()
if not imp: raise HTTPException(404)
if current_user.role == "manufacturer" and imp.manufacturer_id != current_user.id:
raise HTTPException(403)
return _fmt(imp)
@router.post("")
async def create_improvement(
title: str = Form(...), category: str = Form(...),
description: str = Form(...), priority: str = Form("normal"),
part_name: str = Form(""), manufacturer_id: int = Form(...),
report_ids: str = Form("[]"),
sw_deploy_target: Optional[str] = Form(None),
attachments: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db),
current_user: models.User = Depends(require_admin)
):
imp = models.Improvement(
title=title, category=category, description=description,
priority=priority, part_name=part_name or None,
manufacturer_id=manufacturer_id, created_by=current_user.id,
sw_deploy_target=sw_deploy_target or None,
)
db.add(imp); db.commit(); db.refresh(imp)
for rid in json.loads(report_ids):
db.add(models.ImprovementReport(improvement_id=imp.id, report_id=int(rid)))
for f in attachments:
if f.filename:
path = save_upload(f, f"improvements/{imp.id}")
db.add(models.ImprovementAttachment(improvement_id=imp.id, file_path=path, file_name=f.filename))
db.add(models.ImprovementLog(improvement_id=imp.id, changed_by=current_user.id,
old_status=None, new_status="registered", memo="개선항목 등록"))
db.commit()
return {"id": imp.id}
@router.patch("/{imp_id}/status")
def update_status(
imp_id: int, status: str = Form(...), memo: str = Form(""),
sw_deployed_at: Optional[str] = Form(None),
manufacturer_memo: str = Form(""),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
imp = db.query(models.Improvement).filter_by(id=imp_id).first()
if not imp: raise HTTPException(404)
if current_user.role == "manufacturer" and imp.manufacturer_id != current_user.id:
raise HTTPException(403)
old_status = imp.status
imp.status = status
if sw_deployed_at: imp.sw_deployed_at = sw_deployed_at
if manufacturer_memo: imp.manufacturer_memo = manufacturer_memo
db.add(models.ImprovementLog(improvement_id=imp.id, changed_by=current_user.id,
old_status=old_status, new_status=status, memo=memo))
db.commit()
return {"ok": True}

118
backend/routers/repairs.py Normal file
View File

@@ -0,0 +1,118 @@
import json
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import List, Optional
from datetime import datetime
from database import get_db
import models
from auth import require_mechanic, get_current_user
from utils import save_upload
router = APIRouter(prefix="/api/repairs", tags=["repairs"])
@router.get("/pending")
def pending_reports(db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)):
"""정비사용: 처리 가능한 신고 목록 (pending / in_progress)"""
q = db.query(models.Report).filter(
models.Report.status.in_(["pending", "in_progress"])
).order_by(desc(models.Report.reported_at))
result = []
for r in q.all():
c = r.charger
result.append({
"id": r.id, "charger_id": r.charger_id,
"charger_name": c.name if c else None,
"station_name": c.station_name if c else None,
"charger_type": c.charger_type.name if c and c.charger_type else None,
"issue_types": r.issue_types, "status": r.status,
"reported_at": r.reported_at.isoformat(),
"occurred_at": r.occurred_at.isoformat() if r.occurred_at else None,
})
return result
@router.get("/charger/{charger_id}/open")
def open_reports_for_charger(charger_id: str, db: Session = Depends(get_db),
_=Depends(require_mechanic)):
"""특정 충전기의 미처리 신고 목록 (중복처리용)"""
reports = db.query(models.Report).filter(
models.Report.charger_id == charger_id,
models.Report.status.in_(["pending", "in_progress"])
).order_by(models.Report.reported_at).all()
return [{
"id": r.id, "issue_types": r.issue_types,
"issue_detail": r.issue_detail, "status": r.status,
"reported_at": r.reported_at.isoformat(),
"photos": [p.file_path for p in r.photos],
} for r in reports]
@router.post("")
async def create_repair(
report_ids: str = Form(...), # JSON 배열
repair_types: str = Form(...), # JSON 배열
description: str = Form(...),
result_status: str = Form("done"),
photos_before: List[UploadFile] = File(default=[]),
photos_after: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db),
current_user: models.User = Depends(require_mechanic)
):
rids = json.loads(report_ids)
rtypes = json.loads(repair_types)
repair = models.Repair(
mechanic_id=current_user.id,
repair_types=rtypes,
description=description,
started_at=datetime.now(),
completed_at=datetime.now(),
result_status=result_status,
)
db.add(repair); db.commit(); db.refresh(repair)
# 신고 연결 및 상태 업데이트
for rid in rids:
r = db.query(models.Report).filter_by(id=rid).first()
if r:
new_status = "done" if result_status == "done" else (
"waiting" if result_status == "waiting" else "revisit"
)
r.status = new_status
db.add(models.RepairReport(repair_id=repair.id, report_id=rid))
# 사진 저장
for photo in photos_before:
if photo.filename:
path = save_upload(photo, f"repairs/{repair.id}")
db.add(models.RepairPhoto(repair_id=repair.id, photo_type="before", file_path=path))
for photo in photos_after:
if photo.filename:
path = save_upload(photo, f"repairs/{repair.id}")
db.add(models.RepairPhoto(repair_id=repair.id, photo_type="after", file_path=path))
db.commit()
return {"id": repair.id}
@router.get("/my")
def my_repairs(db: Session = Depends(get_db),
current_user: models.User = Depends(require_mechanic)):
repairs = db.query(models.Repair).filter_by(
mechanic_id=current_user.id
).order_by(desc(models.Repair.completed_at)).limit(50).all()
result = []
for repair in repairs:
rids = [rr.report_id for rr in repair.report_links]
charger_id = None
if rids:
r = db.query(models.Report).filter_by(id=rids[0]).first()
if r: charger_id = r.charger_id
result.append({
"id": repair.id, "charger_id": charger_id,
"repair_types": repair.repair_types,
"result_status": repair.result_status,
"started_at": repair.started_at.isoformat(),
"completed_at": repair.completed_at.isoformat() if repair.completed_at else None,
"report_count": len(rids),
})
return result

183
backend/routers/reports.py Normal file
View File

@@ -0,0 +1,183 @@
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import List, Optional
from datetime import datetime
from database import get_db
import models
from auth import require_admin, get_current_user
from utils import save_upload
router = APIRouter(prefix="/api/reports", tags=["reports"])
def _fmt_report(r: models.Report, db: Session):
c = r.charger
repair_id = None
if r.repair_links:
repair_id = r.repair_links[0].repair_id
return {
"id": r.id, "charger_id": r.charger_id,
"charger_name": c.name if c else None,
"station_name": c.station_name if c else None,
"cpo_name": c.cpo_name if c else None,
"charger_type": c.charger_type.name if c and c.charger_type else None,
"installed_at": str(c.installed_at) if c and c.installed_at else None,
"issue_types": r.issue_types, "issue_detail": r.issue_detail,
"error_code": r.error_code, "contact": r.contact,
"occurred_at": r.occurred_at.isoformat() if r.occurred_at else None,
"reported_at": r.reported_at.isoformat() if r.reported_at else None,
"gps_lat": r.gps_lat, "gps_lng": r.gps_lng,
"status": r.status,
"photos": [p.file_path for p in r.photos],
"repair_id": repair_id,
}
@router.post("")
async def create_report(
charger_id: str = Form(...),
issue_types: str = Form(...), # JSON 배열 문자열
issue_detail: str = Form(""),
error_code: str = Form(""),
occurred_at: Optional[str] = Form(None),
contact: str = Form(""),
consent: bool = Form(False),
gps_lat: Optional[float] = Form(None),
gps_lng: Optional[float] = Form(None),
photos: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db)
):
import json
charger = db.query(models.Charger).filter_by(id=charger_id).first()
if not charger: raise HTTPException(404, "충전기를 찾을 수 없습니다.")
# 신고 공개 정책 확인
setting = db.query(models.SystemSetting).filter_by(key="report_visibility_policy").first()
policy = setting.value if setting else "immediate"
initial_status = "pending_approval" if policy == "admin_approval" else "pending"
issue_list = json.loads(issue_types) if isinstance(issue_types, str) else issue_types
r = models.Report(
charger_id=charger_id, issue_types=issue_list,
issue_detail=issue_detail or None, error_code=error_code or None,
occurred_at=datetime.fromisoformat(occurred_at) if occurred_at else None,
contact=contact or None, consent=consent,
gps_lat=gps_lat, gps_lng=gps_lng, status=initial_status
)
db.add(r); db.commit(); db.refresh(r)
for photo in photos:
if photo.filename:
path = save_upload(photo, f"reports/{r.id}")
db.add(models.ReportPhoto(report_id=r.id, file_path=path))
db.commit()
return {"id": r.id, "status": r.status}
@router.get("")
def list_reports(
status: Optional[str] = None,
charger_id: Optional[str] = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
q = db.query(models.Report).order_by(desc(models.Report.reported_at))
if status: q = q.filter(models.Report.status == status)
if charger_id: q = q.filter(models.Report.charger_id == charger_id)
# 정비사는 공개된 것만 (승인 대기 제외)
if current_user.role == "mechanic":
q = q.filter(models.Report.status != "pending_approval")
return [_fmt_report(r, db) for r in q.all()]
@router.get("/{report_id}")
def get_report(report_id: int, db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)):
r = db.query(models.Report).filter_by(id=report_id).first()
if not r: raise HTTPException(404)
result = _fmt_report(r, db)
# 수리 정보 포함
if r.repair_links:
repair = r.repair_links[0].repair
cost = repair.cost
result["repair"] = {
"id": repair.id,
"mechanic_name": repair.mechanic.name if repair.mechanic else None,
"mechanic_company": repair.mechanic.company if repair.mechanic else None,
"repair_types": repair.repair_types,
"description": repair.description,
"started_at": repair.started_at.isoformat(),
"completed_at": repair.completed_at.isoformat() if repair.completed_at else None,
"result_status": repair.result_status,
"photos_before": [p.file_path for p in repair.photos if p.photo_type == "before"],
"photos_after": [p.file_path for p in repair.photos if p.photo_type == "after"],
"cost": {
"root_cause": cost.root_cause,
"admin_note": cost.admin_note,
"cost_party_type": cost.cost_party_type,
"cost_party_custom": cost.cost_party_custom,
"cost_amount": cost.cost_amount,
"cost_status": cost.cost_status,
"manufacturer_name": cost.manufacturer.name if cost.manufacturer else None,
} if cost else None
}
return result
@router.patch("/{report_id}/approve")
def approve_report(report_id: int, db: Session = Depends(get_db), _=Depends(require_admin)):
r = db.query(models.Report).filter_by(id=report_id).first()
if not r: raise HTTPException(404)
r.status = "pending"; db.commit()
return {"ok": True}
@router.patch("/{report_id}/status")
def update_status(report_id: int, status: str = Form(...),
db: Session = Depends(get_db), _=Depends(require_admin)):
r = db.query(models.Report).filter_by(id=report_id).first()
if not r: raise HTTPException(404)
r.status = status; db.commit()
return {"ok": True}
# ── 공개 엔드포인트 — 인증 없이 특정 충전기의 진행 중 신고 조회 ──
# QR 신고 페이지에서 기존 접수 현황을 사용자에게 보여줄 때 사용
@router.get("/public/{charger_id}")
def public_charger_reports(charger_id: str, db: Session = Depends(get_db)):
"""
해당 충전기에서 아직 해결되지 않은 신고 목록을 반환.
완료(done) / 면제 · 정산 상태는 제외하고 진행 중인 것만 반환.
개인정보(연락처) 는 반환하지 않음.
"""
active_statuses = ["pending_approval", "pending", "in_progress", "waiting", "revisit"]
rows = (
db.query(models.Report)
.filter(
models.Report.charger_id == charger_id,
models.Report.status.in_(active_statuses),
)
.order_by(models.Report.reported_at.desc())
.limit(20)
.all()
)
STATUS_LABEL = {
"pending_approval": "검토 대기",
"pending": "접수 완료",
"in_progress": "처리 중",
"waiting": "부품 대기",
"revisit": "재방문 예정",
}
result = []
for r in rows:
repair = r.repair_links[0].repair if r.repair_links else None
result.append({
"id": r.id,
"issue_types": r.issue_types,
"issue_detail": r.issue_detail or "",
"status": r.status,
"status_label": STATUS_LABEL.get(r.status, r.status),
"reported_at": r.reported_at.isoformat() if r.reported_at else "",
"occurred_at": r.occurred_at.isoformat() if r.occurred_at else "",
"photo_count": len(r.photos),
"mechanic_name": repair.mechanic.name if repair and repair.mechanic else None,
"started_at": repair.started_at.isoformat() if repair and repair.started_at else None,
})
return result

View File

@@ -0,0 +1,136 @@
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import List, Optional
from datetime import datetime
from database import get_db
import models
from auth import require_admin, get_current_user
from utils import save_upload
router = APIRouter(prefix="/api/reports", tags=["reports"])
def _fmt_report(r: models.Report, db: Session):
c = r.charger
repair_id = None
if r.repair_links:
repair_id = r.repair_links[0].repair_id
return {
"id": r.id, "charger_id": r.charger_id,
"charger_name": c.name if c else None,
"station_name": c.station_name if c else None,
"cpo_name": c.cpo_name if c else None,
"charger_type": c.charger_type.name if c and c.charger_type else None,
"installed_at": str(c.installed_at) if c and c.installed_at else None,
"issue_types": r.issue_types, "issue_detail": r.issue_detail,
"error_code": r.error_code, "contact": r.contact,
"occurred_at": r.occurred_at.isoformat() if r.occurred_at else None,
"reported_at": r.reported_at.isoformat() if r.reported_at else None,
"gps_lat": r.gps_lat, "gps_lng": r.gps_lng,
"status": r.status,
"photos": [p.file_path for p in r.photos],
"repair_id": repair_id,
}
@router.post("")
async def create_report(
charger_id: str = Form(...),
issue_types: str = Form(...), # JSON 배열 문자열
issue_detail: str = Form(""),
error_code: str = Form(""),
occurred_at: Optional[str] = Form(None),
contact: str = Form(""),
consent: bool = Form(False),
gps_lat: Optional[float] = Form(None),
gps_lng: Optional[float] = Form(None),
photos: List[UploadFile] = File(default=[]),
db: Session = Depends(get_db)
):
import json
charger = db.query(models.Charger).filter_by(id=charger_id).first()
if not charger: raise HTTPException(404, "충전기를 찾을 수 없습니다.")
# 신고 공개 정책 확인
setting = db.query(models.SystemSetting).filter_by(key="report_visibility_policy").first()
policy = setting.value if setting else "immediate"
initial_status = "pending_approval" if policy == "admin_approval" else "pending"
issue_list = json.loads(issue_types) if isinstance(issue_types, str) else issue_types
r = models.Report(
charger_id=charger_id, issue_types=issue_list,
issue_detail=issue_detail or None, error_code=error_code or None,
occurred_at=datetime.fromisoformat(occurred_at) if occurred_at else None,
contact=contact or None, consent=consent,
gps_lat=gps_lat, gps_lng=gps_lng, status=initial_status
)
db.add(r); db.commit(); db.refresh(r)
for photo in photos:
if photo.filename:
path = save_upload(photo, f"reports/{r.id}")
db.add(models.ReportPhoto(report_id=r.id, file_path=path))
db.commit()
return {"id": r.id, "status": r.status}
@router.get("")
def list_reports(
status: Optional[str] = None,
charger_id: Optional[str] = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
q = db.query(models.Report).order_by(desc(models.Report.reported_at))
if status: q = q.filter(models.Report.status == status)
if charger_id: q = q.filter(models.Report.charger_id == charger_id)
# 정비사는 공개된 것만 (승인 대기 제외)
if current_user.role == "mechanic":
q = q.filter(models.Report.status != "pending_approval")
return [_fmt_report(r, db) for r in q.all()]
@router.get("/{report_id}")
def get_report(report_id: int, db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)):
r = db.query(models.Report).filter_by(id=report_id).first()
if not r: raise HTTPException(404)
result = _fmt_report(r, db)
# 수리 정보 포함
if r.repair_links:
repair = r.repair_links[0].repair
cost = repair.cost
result["repair"] = {
"id": repair.id,
"mechanic_name": repair.mechanic.name if repair.mechanic else None,
"mechanic_company": repair.mechanic.company if repair.mechanic else None,
"repair_types": repair.repair_types,
"description": repair.description,
"started_at": repair.started_at.isoformat(),
"completed_at": repair.completed_at.isoformat() if repair.completed_at else None,
"result_status": repair.result_status,
"photos_before": [p.file_path for p in repair.photos if p.photo_type == "before"],
"photos_after": [p.file_path for p in repair.photos if p.photo_type == "after"],
"cost": {
"root_cause": cost.root_cause,
"admin_note": cost.admin_note,
"cost_party_type": cost.cost_party_type,
"cost_party_custom": cost.cost_party_custom,
"cost_amount": cost.cost_amount,
"cost_status": cost.cost_status,
"manufacturer_name": cost.manufacturer.name if cost.manufacturer else None,
} if cost else None
}
return result
@router.patch("/{report_id}/approve")
def approve_report(report_id: int, db: Session = Depends(get_db), _=Depends(require_admin)):
r = db.query(models.Report).filter_by(id=report_id).first()
if not r: raise HTTPException(404)
r.status = "pending"; db.commit()
return {"ok": True}
@router.patch("/{report_id}/status")
def update_status(report_id: int, status: str = Form(...),
db: Session = Depends(get_db), _=Depends(require_admin)):
r = db.query(models.Report).filter_by(id=report_id).first()
if not r: raise HTTPException(404)
r.status = status; db.commit()
return {"ok": True}

View File

@@ -0,0 +1,69 @@
from fastapi import APIRouter, Depends, Form
from sqlalchemy.orm import Session
from datetime import datetime
from typing import Optional
from database import get_db
import models
from auth import require_admin
router = APIRouter(prefix="/api/settings", tags=["settings"])
# 이미지 설정 기본값
IMAGE_DEFAULTS = {
"image_compress_enabled": "true",
"image_max_px": "1024",
"image_quality": "85",
}
def upsert(db, key, value):
s = db.query(models.SystemSetting).filter_by(key=key).first()
if s:
s.value = value
s.updated_at = datetime.now()
else:
db.add(models.SystemSetting(key=key, value=value))
# ── 공개 엔드포인트: 이미지 설정만 반환 (인증 불필요 — 신고 페이지에서 사용)
@router.get("/public")
def get_public_settings(db: Session = Depends(get_db)):
rows = db.query(models.SystemSetting).filter(
models.SystemSetting.key.in_(IMAGE_DEFAULTS.keys())
).all()
result = dict(IMAGE_DEFAULTS) # 기본값으로 채운 뒤
for r in rows:
result[r.key] = r.value # DB 값으로 덮어쓰기
return {
"image_compress_enabled": result["image_compress_enabled"] == "true",
"image_max_px": int(result["image_max_px"]),
"image_quality": int(result["image_quality"]),
}
# ── 관리자 전체 설정 조회
@router.get("")
def get_settings(db: Session = Depends(get_db), _=Depends(require_admin)):
rows = db.query(models.SystemSetting).all()
result = dict(IMAGE_DEFAULTS)
for r in rows:
result[r.key] = r.value
return result
# ── 관리자 설정 저장 (신고공개정책 + 이미지설정 통합)
@router.put("")
def update_settings(
report_visibility_policy: str = Form(...),
image_compress_enabled: str = Form("true"),
image_max_px: str = Form("1024"),
image_quality: str = Form("85"),
db: Session = Depends(get_db),
_ = Depends(require_admin)
):
pairs = [
("report_visibility_policy", report_visibility_policy),
("image_compress_enabled", image_compress_enabled),
("image_max_px", image_max_px),
("image_quality", image_quality),
]
for key, val in pairs:
upsert(db, key, val)
db.commit()
return {"ok": True}

View File

@@ -0,0 +1,29 @@
from fastapi import APIRouter, Depends, Form
from sqlalchemy.orm import Session
from datetime import datetime
from database import get_db
import models
from auth import require_admin
router = APIRouter(prefix="/api/settings", tags=["settings"])
@router.get("")
def get_settings(db: Session = Depends(get_db), _=Depends(require_admin)):
settings = db.query(models.SystemSetting).all()
return {s.key: s.value for s in settings}
@router.put("")
def update_settings(
report_visibility_policy: str = Form(...),
db: Session = Depends(get_db),
_=Depends(require_admin)
):
for key, value in [("report_visibility_policy", report_visibility_policy)]:
s = db.query(models.SystemSetting).filter_by(key=key).first()
if s:
s.value = value
s.updated_at = datetime.now()
else:
db.add(models.SystemSetting(key=key, value=value))
db.commit()
return {"ok": True}

29
backend/utils.py Normal file
View File

@@ -0,0 +1,29 @@
import os, uuid, qrcode
from PIL import Image
from fastapi import UploadFile
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/uploads")
def save_upload(file: UploadFile, sub_dir: str = "general") -> str:
"""파일을 저장하고 /uploads 기준 상대 경로 반환"""
ext = os.path.splitext(file.filename or "file")[1].lower() or ".jpg"
folder = os.path.join(UPLOAD_DIR, sub_dir)
os.makedirs(folder, exist_ok=True)
filename = f"{uuid.uuid4().hex}{ext}"
filepath = os.path.join(folder, filename)
with open(filepath, "wb") as f:
f.write(file.file.read())
return f"/uploads/{sub_dir}/{filename}"
def generate_qr(charger_id: str, domain: str) -> str:
"""QR 이미지를 저장하고 /uploads 기준 경로 반환"""
url = f"https://{domain}/report/{charger_id}"
qr = qrcode.QRCode(version=1, box_size=10, border=4)
qr.add_data(url)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
folder = os.path.join(UPLOAD_DIR, "qr")
os.makedirs(folder, exist_ok=True)
filepath = os.path.join(folder, f"{charger_id}.png")
img.save(filepath)
return f"/uploads/qr/{charger_id}.png"