feat: OpenRouter 외부 AI 연동 (STT 교정 + OCR Vision)

This commit is contained in:
root
2026-04-28 15:38:06 +09:00
parent f9075ae3f6
commit f35fe1143a
5 changed files with 667 additions and 299 deletions

View File

@@ -32,11 +32,16 @@ AUDIO_EXT = {"mp3","mp4","wav","m4a","ogg","flac","aac","wma","webm","mkv","avi"
IMAGE_EXT = {"jpg","jpeg","png","bmp","tiff","tif","webp","gif"}
_DEFAULT_SETTINGS = {
"stt_ollama_model": "",
"ocr_ollama_model": "granite3.2-vision:latest",
"cpu_threads": 0,
"stt_timeout": 0,
"ollama_timeout": 600,
"stt_ollama_model": "",
"ocr_ollama_model": "granite3.2-vision:latest",
"cpu_threads": 0,
"stt_timeout": 0,
"ollama_timeout": 600,
# OpenRouter
"openrouter_url": "https://openrouter.ai/api/v1",
"openrouter_api_key": "",
"openrouter_stt_model": "",
"openrouter_ocr_model": "",
}
_hist_lock = threading.Lock()
@@ -50,7 +55,8 @@ def _load_settings() -> dict:
def _save_settings(data: dict):
SETTINGS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2)
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# ── 이력 I/O ─────────────────────────────────────────────────
@@ -77,19 +83,15 @@ def append_history(record: dict):
except: pass
def _update_history_by_task(task_id: str, result: dict, success: bool, error_msg: str = ""):
"""task_id로 이력을 찾아 결과 업데이트 — 핵심 버그 수정"""
with _hist_lock:
if not HISTORY_FILE.exists(): return
try:
with open(HISTORY_FILE, "r", encoding="utf-8") as f: history = json.load(f)
for h in history:
# task_id 필드로 매칭
if h.get("task_id") != task_id: continue
if h.get("status") != "processing": break
if not success:
h["status"] = "failed"
h["output"] = {"error": error_msg[:300]}
break
h["status"] = "failed"; h["output"] = {"error": error_msg[:300]}; break
h["status"] = "success"
if h["type"] == "stt":
h["output"] = {
@@ -97,12 +99,14 @@ def _update_history_by_task(task_id: str, result: dict, success: bool, error_msg
"language": result.get("language", ""),
"duration_s": result.get("duration", 0),
"segments": len(result.get("segments", [])),
"text_preview": result.get("text", "")[:200] + ("" if len(result.get("text",""))>200 else ""),
"text_preview": result.get("text","")[:200] + ("" if len(result.get("text",""))>200 else ""),
"ollama_used": result.get("ollama_used", False),
"ollama_model": result.get("ollama_model", ""),
"openrouter_used": result.get("openrouter_used", False),
"openrouter_model": result.get("openrouter_model", ""),
}
else:
full_text = result.get("full_text", "")
ft = result.get("full_text", "")
h["output"] = {
"txt_file": result.get("txt_file", ""),
"xlsx_file": result.get("xlsx_file", ""),
@@ -110,7 +114,8 @@ def _update_history_by_task(task_id: str, result: dict, success: bool, error_msg
"table_count": len(result.get("tables", [])),
"backend": result.get("backend", ""),
"ollama_model": result.get("ollama_model", ""),
"text_preview": full_text[:200] + ("" if len(full_text)>200 else ""),
"openrouter_model": result.get("openrouter_model", ""),
"text_preview": ft[:200] + ("" if len(ft)>200 else ""),
}
break
_write_history(history)
@@ -132,12 +137,11 @@ def clear_history():
# ════════════════════════════════════════════════════════════════
# 시작 이벤트
# 시작
# ════════════════════════════════════════════════════════════════
@app.on_event("startup")
async def on_startup():
init_users()
_cleanup_outputs()
init_users(); _cleanup_outputs()
# ════════════════════════════════════════════════════════════════
@@ -183,7 +187,10 @@ def system_info(user: dict = Depends(require_auth)):
@app.post("/api/transcribe")
async def transcribe(
request: Request, file: UploadFile = File(...),
use_ollama: str = Form("false"), ollama_model: str = Form(""),
use_ollama: str = Form("false"),
ollama_model: str = Form(""),
use_openrouter: str = Form("false"),
openrouter_model: str = Form(""),
user: dict = Depends(require_stt),
):
_check_size(request)
@@ -193,29 +200,39 @@ async def transcribe(
save_path = os.path.join(UPLOAD_DIR, f"{file_id}.{ext}")
await _save(file, save_path)
file_size = os.path.getsize(save_path)
_use_ollama = use_ollama.lower() == "true"
s = _load_settings()
if _use_ollama and not ollama_model.strip(): ollama_model = s.get("stt_ollama_model", "")
_use_ollama = use_ollama.lower() == "true"
_use_openrouter = use_openrouter.lower() == "true"
task = transcribe_task.delay(file_id, save_path, _use_ollama, ollama_model)
if _use_ollama and not ollama_model.strip():
ollama_model = s.get("stt_ollama_model", "")
if _use_openrouter and not openrouter_model.strip():
openrouter_model = s.get("openrouter_stt_model", "")
task = transcribe_task.delay(
file_id, save_path,
_use_ollama, ollama_model,
_use_openrouter, openrouter_model,
s.get("openrouter_url", ""), s.get("openrouter_api_key", ""),
)
# ★ task_id를 이력에 함께 저장
append_history({
"id": file_id,
"task_id": task.id, # ← 업데이트 매칭 키
"type": "stt",
"status": "processing",
"id": file_id, "task_id": task.id, "type": "stt",
"status": "processing",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"username": user["username"],
"input": {"filename": file.filename, "size_bytes": file_size, "format": ext.upper()},
"settings": {
"model": os.getenv("WHISPER_MODEL", "medium"),
"language": os.getenv("WHISPER_LANGUAGE", "ko"),
"compute_type": os.getenv("WHISPER_COMPUTE_TYPE", "int8"),
"cpu_threads": s.get("cpu_threads", 0),
"stt_timeout": s.get("stt_timeout", 0),
"use_ollama": _use_ollama,
"ollama_model": ollama_model if _use_ollama else "",
"username": user["username"],
"input": {"filename": file.filename, "size_bytes": file_size, "format": ext.upper()},
"settings": {
"model": os.getenv("WHISPER_MODEL","medium"),
"language": os.getenv("WHISPER_LANGUAGE","ko"),
"compute_type": os.getenv("WHISPER_COMPUTE_TYPE","int8"),
"cpu_threads": s.get("cpu_threads",0),
"stt_timeout": s.get("stt_timeout",0),
"use_ollama": _use_ollama,
"ollama_model": ollama_model if _use_ollama else "",
"use_openrouter": _use_openrouter,
"openrouter_model": openrouter_model if _use_openrouter else "",
},
"output": None,
})
@@ -228,211 +245,297 @@ async def transcribe(
@app.post("/api/ocr")
async def ocr(
request: Request, file: UploadFile = File(...),
mode: str = Form("text"), backend: str = Form("paddle"),
ollama_model: str = Form(""), custom_prompt: str = Form(""),
mode: str = Form("text"),
backend: str = Form("paddle"), # paddle | ollama | openrouter
ollama_model: str = Form(""),
openrouter_model: str = Form(""),
custom_prompt: str = Form(""),
user: dict = Depends(require_ocr),
):
_check_size(request)
ext = _ext(file.filename)
if ext not in IMAGE_EXT: raise HTTPException(400, f"지원하지 않는 형식: {', '.join(sorted(IMAGE_EXT))}")
if mode not in ("text","structure"): mode = "text"
if backend not in ("paddle","ollama"): backend = "paddle"
if backend not in ("paddle","ollama","openrouter"): backend = "paddle"
s = _load_settings()
if backend == "ollama" and not ollama_model.strip(): ollama_model = s.get("ocr_ollama_model","granite3.2-vision:latest")
if backend == "ollama" and not ollama_model.strip():
ollama_model = s.get("ocr_ollama_model","granite3.2-vision:latest")
if backend == "openrouter" and not openrouter_model.strip():
openrouter_model = s.get("openrouter_ocr_model","")
file_id = str(uuid.uuid4())
save_path = os.path.join(UPLOAD_DIR, f"{file_id}.{ext}")
await _save(file, save_path)
file_size = os.path.getsize(save_path)
task = ocr_task.delay(file_id, save_path, mode, backend, ollama_model, custom_prompt)
task = ocr_task.delay(
file_id, save_path, mode, backend,
ollama_model, openrouter_model,
s.get("openrouter_url",""), s.get("openrouter_api_key",""),
custom_prompt,
)
# ★ task_id를 이력에 함께 저장
append_history({
"id": file_id,
"task_id": task.id, # ← 업데이트 매칭 키
"type": "ocr",
"status": "processing",
"id": file_id, "task_id": task.id, "type": "ocr",
"status": "processing",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"username": user["username"],
"input": {"filename": file.filename, "size_bytes": file_size, "format": ext.upper()},
"settings": {
"backend": backend,
"mode": mode,
"ocr_lang": os.getenv("OCR_LANG", "korean"),
"ollama_model": ollama_model if backend == "ollama" else "",
"ollama_timeout":s.get("ollama_timeout", 600),
"custom_prompt": custom_prompt[:200] if custom_prompt else "",
"username": user["username"],
"input": {"filename": file.filename, "size_bytes": file_size, "format": ext.upper()},
"settings": {
"backend": backend,
"mode": mode,
"ocr_lang": os.getenv("OCR_LANG","korean"),
"ollama_model": ollama_model if backend=="ollama" else "",
"openrouter_model": openrouter_model if backend=="openrouter" else "",
"ollama_timeout": s.get("ollama_timeout",600),
"custom_prompt": custom_prompt[:200] if custom_prompt else "",
},
"output": None,
})
return {"task_id": task.id, "file_id": file_id, "filename": file.filename, "mode": mode, "backend": backend}
return {"task_id": task.id, "file_id": file_id,
"filename": file.filename, "mode": mode, "backend": backend}
# ════════════════════════════════════════════════════════════════
# 상태 — task_id 기준으로 이력 업데이트
# 상태
# ════════════════════════════════════════════════════════════════
@app.get("/api/status/{task_id}")
def get_status(task_id: str, user: dict = Depends(require_auth)):
r = celery_app.AsyncResult(task_id)
if r.state == "PENDING":
return {"state": "pending", "progress": 0, "message": "대기 중..."}
if r.state == "PROGRESS":
m = r.info or {}
return {"state": "progress", "progress": m.get("progress",0), "message": m.get("message","처리 중...")}
if r.state == "SUCCESS":
result = r.result or {}
# ★ task_id로 이력 업데이트 (file_id 아님)
_update_history_by_task(task_id, result, success=True)
return {"state": "success", "progress": 100, **result}
if r.state == "FAILURE":
_update_history_by_task(task_id, {}, success=False, error_msg=str(r.info))
return {"state": "failure", "progress": 0, "message": str(r.info)}
return {"state": r.state.lower(), "progress": 0}
if r.state == "PENDING": return {"state":"pending", "progress":0, "message":"대기 중..."}
if r.state == "PROGRESS": m=r.info or {}; return {"state":"progress","progress":m.get("progress",0),"message":m.get("message","처리 중...")}
if r.state == "SUCCESS": _update_history_by_task(task_id, r.result or {}, True); return {"state":"success","progress":100,**(r.result or {})}
if r.state == "FAILURE": _update_history_by_task(task_id, {}, False, str(r.info)); return {"state":"failure","progress":0,"message":str(r.info)}
return {"state":r.state.lower(),"progress":0}
# ════════════════════════════════════════════════════════════════
# 이력
# ════════════════════════════════════════════════════════════════
@app.get("/api/history")
def get_history(page: int=1, per_page: int=15, type_: str="", user: dict=Depends(require_auth)):
def get_history(page:int=1,per_page:int=15,type_:str="",user:dict=Depends(require_auth)):
history = _load_history()
if user.get("role") != "admin": history = [h for h in history if h.get("username")==user["username"]]
if type_ in ("stt","ocr"): history = [h for h in history if h.get("type")==type_]
total = len(history); start = (page-1)*per_page
return {"total": total, "page": page, "per_page": per_page, "items": history[start:start+per_page]}
return {"total":total,"page":page,"per_page":per_page,"items":history[start:start+per_page]}
@app.delete("/api/history/{history_id}")
def delete_history(history_id: str, user: dict=Depends(require_auth)):
if not delete_history_item(history_id): raise HTTPException(404, "이력을 찾을 수 없습니다")
return {"ok": True}
def delete_history(history_id:str,user:dict=Depends(require_auth)):
if not delete_history_item(history_id): raise HTTPException(404,"이력을 찾을 수 없습니다")
return {"ok":True}
@app.delete("/api/history")
def clear_all_history(user: dict=Depends(require_admin)):
clear_history(); return {"ok": True}
def clear_all_history(user:dict=Depends(require_admin)):
clear_history(); return {"ok":True}
# ════════════════════════════════════════════════════════════════
# 다운로드 / Ollama / 설정 / 관리자
# 다운로드
# ════════════════════════════════════════════════════════════════
@app.get("/api/download/{filename}")
def download(filename: str, user: dict=Depends(require_auth)):
if ".." in filename or "/" in filename: raise HTTPException(400, "잘못된 파일명")
def download(filename:str,user:dict=Depends(require_auth)):
if ".." in filename or "/" in filename: raise HTTPException(400,"잘못된 파일명")
path = os.path.join(OUTPUT_DIR, filename)
if not os.path.exists(path): raise HTTPException(404, "파일을 찾을 수 없습니다")
if not os.path.exists(path): raise HTTPException(404,"파일을 찾을 수 없습니다")
media = ("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
if filename.endswith(".xlsx") else "text/plain")
return FileResponse(path, media_type=media, filename=filename)
# ════════════════════════════════════════════════════════════════
# Ollama 모델 목록
# ════════════════════════════════════════════════════════════════
@app.get("/api/ollama/models")
def ollama_models(user: dict=Depends(require_auth)):
def ollama_models(user:dict=Depends(require_auth)):
try:
resp = httpx.get(f"{OLLAMA_URL}/api/tags", timeout=8.0); resp.raise_for_status()
return {"models": [m["name"] for m in resp.json().get("models",[])], "connected": True}
return {"models":[m["name"] for m in resp.json().get("models",[])], "connected":True}
except Exception as e:
return {"models": [], "connected": False, "error": str(e)}
return {"models":[], "connected":False, "error":str(e)}
# ════════════════════════════════════════════════════════════════
# OpenRouter 모델 목록 & 연결 테스트
# ════════════════════════════════════════════════════════════════
@app.get("/api/openrouter/models")
def openrouter_models(user: dict = Depends(require_auth)):
s = _load_settings()
api_key = s.get("openrouter_api_key", "")
base_url = s.get("openrouter_url", "https://openrouter.ai/api/v1").rstrip("/")
if not api_key:
return {"models": [], "connected": False, "error": "API 키가 설정되지 않았습니다"}
try:
resp = httpx.get(
f"{base_url}/models",
headers={"Authorization": f"Bearer {api_key}",
"HTTP-Referer": "https://voicescript.local"},
timeout=12.0,
)
resp.raise_for_status()
data = resp.json()
# Vision 모델 필터링 (multimodal 지원 모델)
all_models = data.get("data", [])
vision = [m["id"] for m in all_models
if any(k in str(m.get("architecture", {}).get("modality","")).lower()
for k in ["image","vision","multimodal"])
or any(k in m["id"].lower()
for k in ["vision","claude-3","gemini","gpt-4o","llava","pixtral","qwen-vl","intern","deepseek-vl"])]
text = [m["id"] for m in all_models if m["id"] not in vision]
return {
"models": [m["id"] for m in all_models],
"vision_models": vision,
"text_models": text,
"connected": True,
"total": len(all_models),
}
except httpx.HTTPStatusError as e:
return {"models":[], "connected":False, "error":f"HTTP {e.response.status_code}: API 키를 확인하세요"}
except Exception as e:
return {"models":[], "connected":False, "error":str(e)}
@app.post("/api/openrouter/test")
def openrouter_test(
api_key: str = Form(...),
base_url: str = Form("https://openrouter.ai/api/v1"),
user: dict = Depends(require_auth),
):
"""API 키 연결 테스트"""
try:
resp = httpx.get(
f"{base_url.rstrip('/')}/models",
headers={"Authorization": f"Bearer {api_key}",
"HTTP-Referer": "https://voicescript.local"},
timeout=10.0,
)
resp.raise_for_status()
count = len(resp.json().get("data", []))
return {"ok": True, "message": f"연결 성공 — {count}개 모델 사용 가능"}
except httpx.HTTPStatusError as e:
return {"ok": False, "message": f"인증 실패 (HTTP {e.response.status_code}) — API 키를 확인하세요"}
except Exception as e:
return {"ok": False, "message": f"연결 실패: {str(e)}"}
# ════════════════════════════════════════════════════════════════
# 설정
# ════════════════════════════════════════════════════════════════
@app.get("/api/settings")
def get_settings(user: dict=Depends(require_auth)): return _load_settings()
def get_settings(user: dict = Depends(require_auth)):
s = _load_settings()
# API 키는 마스킹해서 반환
result = dict(s)
if result.get("openrouter_api_key"):
key = result["openrouter_api_key"]
result["openrouter_api_key_masked"] = key[:8] + "..." + key[-4:] if len(key) > 12 else "****"
else:
result["openrouter_api_key_masked"] = ""
result["openrouter_api_key"] = "" # 평문은 반환 안 함
return result
@app.post("/api/settings")
def save_settings_endpoint(
stt_ollama_model: str = Form(""),
ocr_ollama_model: str = Form(""),
cpu_threads: str = Form("0"),
stt_timeout: str = Form("0"),
ollama_timeout: str = Form("600"),
stt_ollama_model: str = Form(""),
ocr_ollama_model: str = Form(""),
cpu_threads: str = Form("0"),
stt_timeout: str = Form("0"),
ollama_timeout: str = Form("600"),
openrouter_url: str = Form("https://openrouter.ai/api/v1"),
openrouter_api_key: str = Form(""),
openrouter_stt_model: str = Form(""),
openrouter_ocr_model: str = Form(""),
user: dict = Depends(require_auth),
):
def _int(v, d):
try: return max(0, int(v))
except: return d
current = _load_settings()
# API 키가 비어있으면 기존 값 유지
final_key = openrouter_api_key.strip() if openrouter_api_key.strip() else current.get("openrouter_api_key","")
data = {
"stt_ollama_model": stt_ollama_model,
"ocr_ollama_model": ocr_ollama_model,
"cpu_threads": _int(cpu_threads, 0),
"stt_timeout": _int(stt_timeout, 0),
"ollama_timeout": _int(ollama_timeout, 600),
"stt_ollama_model": stt_ollama_model,
"ocr_ollama_model": ocr_ollama_model,
"cpu_threads": _int(cpu_threads, 0),
"stt_timeout": _int(stt_timeout, 0),
"ollama_timeout": _int(ollama_timeout, 600),
"openrouter_url": openrouter_url.strip() or "https://openrouter.ai/api/v1",
"openrouter_api_key": final_key,
"openrouter_stt_model": openrouter_stt_model,
"openrouter_ocr_model": openrouter_ocr_model,
}
_save_settings(data)
return {"ok": True, "settings": data}
return {"ok": True, "settings": {k: v for k, v in data.items() if k != "openrouter_api_key"}}
# ════════════════════════════════════════════════════════════════
# 관리자
# ════════════════════════════════════════════════════════════════
@app.get("/api/admin/users")
def admin_list_users(user: dict=Depends(require_admin)): return {"users": list_users()}
def admin_list_users(user:dict=Depends(require_admin)): return {"users":list_users()}
@app.post("/api/admin/users")
def admin_create_user(
username: str = Form(...),
password: str = Form(...),
perm_stt: str = Form("false"),
perm_ocr: str = Form("false"),
allowed_stt_models: str = Form(""), # 콤마 구분 모델명
allowed_ocr_models: str = Form(""),
user: dict = Depends(require_admin),
username:str=Form(...),password:str=Form(...),
perm_stt:str=Form("false"),perm_ocr:str=Form("false"),
allowed_stt_models:str=Form(""),allowed_ocr_models:str=Form(""),
user:dict=Depends(require_admin),
):
def _parse_models(s): return [m.strip() for m in s.split(",") if m.strip()]
perms = {
"stt": perm_stt.lower() == "true",
"ocr": perm_ocr.lower() == "true",
"allowed_stt_models": _parse_models(allowed_stt_models),
"allowed_ocr_models": _parse_models(allowed_ocr_models),
}
ok, msg = create_user(username, password, perms)
if not ok: raise HTTPException(400, msg)
return {"ok": True, "message": msg}
def _p(s): return [m.strip() for m in s.split(",") if m.strip()]
perms={"stt":perm_stt.lower()=="true","ocr":perm_ocr.lower()=="true",
"allowed_stt_models":_p(allowed_stt_models),"allowed_ocr_models":_p(allowed_ocr_models)}
ok,msg=create_user(username,password,perms)
if not ok: raise HTTPException(400,msg)
return {"ok":True,"message":msg}
@app.put("/api/admin/users/{username}")
def admin_update_user(
username: str,
perm_stt: str = Form("false"),
perm_ocr: str = Form("false"),
password: str = Form(""),
allowed_stt_models: str = Form(""),
allowed_ocr_models: str = Form(""),
user: dict = Depends(require_admin),
username:str,perm_stt:str=Form("false"),perm_ocr:str=Form("false"),
password:str=Form(""),allowed_stt_models:str=Form(""),allowed_ocr_models:str=Form(""),
user:dict=Depends(require_admin),
):
def _parse_models(s): return [m.strip() for m in s.split(",") if m.strip()]
perms = {
"stt": perm_stt.lower() == "true",
"ocr": perm_ocr.lower() == "true",
"allowed_stt_models": _parse_models(allowed_stt_models),
"allowed_ocr_models": _parse_models(allowed_ocr_models),
}
ok, msg = update_user(username, perms, password or None)
if not ok: raise HTTPException(400, msg)
return {"ok": True, "message": msg}
def _p(s): return [m.strip() for m in s.split(",") if m.strip()]
perms={"stt":perm_stt.lower()=="true","ocr":perm_ocr.lower()=="true",
"allowed_stt_models":_p(allowed_stt_models),"allowed_ocr_models":_p(allowed_ocr_models)}
ok,msg=update_user(username,perms,password or None)
if not ok: raise HTTPException(400,msg)
return {"ok":True,"message":msg}
@app.delete("/api/admin/users/{username}")
def admin_delete_user(username: str, user: dict=Depends(require_admin)):
ok, msg = delete_user(username)
if not ok: raise HTTPException(400, msg)
return {"ok": True, "message": msg}
def admin_delete_user(username:str,user:dict=Depends(require_admin)):
ok,msg=delete_user(username)
if not ok: raise HTTPException(400,msg)
return {"ok":True,"message":msg}
@app.post("/api/cleanup")
def cleanup(user: dict=Depends(require_auth)): return {"removed": _cleanup_outputs()}
def cleanup(user:dict=Depends(require_auth)): return {"removed":_cleanup_outputs()}
# ════════════════════════════════════════════════════════════════
# 유틸
# ════════════════════════════════════════════════════════════════
def _check_size(request: Request):
def _check_size(request):
cl = request.headers.get("content-length")
if cl and int(cl) > MAX_UPLOAD_BYTES: raise HTTPException(413, f"파일이 너무 큽니다. 최대 {MAX_UPLOAD_BYTES//1024//1024}MB")
if cl and int(cl) > MAX_UPLOAD_BYTES:
raise HTTPException(413, f"파일이 너무 큽니다. 최대 {MAX_UPLOAD_BYTES//1024//1024}MB")
def _cleanup_outputs() -> int:
def _cleanup_outputs():
if OUTPUT_KEEP_SECS == 0: return 0
cutoff = time.time() - OUTPUT_KEEP_SECS; removed = 0
for f in glob.glob(os.path.join(OUTPUT_DIR, "*")):
for f in glob.glob(os.path.join(OUTPUT_DIR,"*")):
try:
if os.path.getmtime(f) < cutoff: os.remove(f); removed += 1
except: pass
return removed
def _ext(fn): return fn.rsplit(".", 1)[-1].lower() if "." in fn else ""
def _ext(fn): return fn.rsplit(".",1)[-1].lower() if "." in fn else ""
async def _save(file, path):
written = 0
async with aiofiles.open(path, "wb") as f:
while chunk := await file.read(1024 * 1024):
async with aiofiles.open(path,"wb") as f:
while chunk := await file.read(1024*1024):
written += len(chunk)
if written > MAX_UPLOAD_BYTES:
await f.close(); os.remove(path)