358 lines
14 KiB
Python
358 lines
14 KiB
Python
"""
PDF conversion Celery tasks — built on opendataloader-pdf (Java).

Supported output formats: html, docx, xlsx, pptx
"""
|
|
import os, tempfile
|
|
from pathlib import Path
|
|
from celery import Celery
|
|
|
|
# Redis location used for both the Celery broker and the result backend;
# overridable through the REDIS_URL environment variable.
REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379/0")

celery_app = Celery("whisper_tasks", backend=REDIS_URL, broker=REDIS_URL)
celery_app.conf.update(
    accept_content=["json"],
    result_expires=86400,
    result_serializer="json",
    task_serializer="json",
    task_track_started=True,
)

# Directory that receives the converted files; overridable through OUTPUT_DIR.
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/data/outputs")


# ── Output format constants ───────────────────────────────────────
SUPPORTED_FORMATS = ("html", "docx", "xlsx", "pptx")
|
|
|
|
|
|
# ── opendataloader-pdf 실행 ────────────────────────────────────────
|
|
def _run_odpdf(pdf_path: str, out_dir: str, formats: list) -> dict:
    """Convert a PDF with opendataloader-pdf and locate the files it produced.

    Args:
        pdf_path: Path of the PDF to convert.
        out_dir: Directory passed to the converter as its output directory.
        formats: Native opendataloader-pdf format names, e.g. ["html", "json"].

    Returns:
        Mapping of format name -> path of the generated file. A format whose
        output file cannot be found in ``out_dir`` is left out, so the mapping
        may be empty.

    Raises:
        RuntimeError: The opendataloader-pdf package cannot be imported, the
            converter raised FileNotFoundError (reported as a missing Java
            runtime), or the converter failed for any other reason. The
            original exception is attached as ``__cause__``.
    """
    try:
        from opendataloader_pdf import convert
    except ImportError as exc:
        raise RuntimeError("opendataloader-pdf 패키지가 설치되지 않았습니다.") from exc

    try:
        convert(
            input_path=pdf_path,
            output_dir=out_dir,
            format=formats,
            quiet=True,
        )
    except FileNotFoundError as exc:
        raise RuntimeError("Java가 설치되지 않았습니다. Docker 이미지를 재빌드하세요.") from exc
    except Exception as exc:
        raise RuntimeError(f"PDF 파싱 실패: {exc}") from exc

    # Formats whose output file extension differs from the format name.
    # NOTE(review): extensions assumed from the converter's usual output
    # naming — confirm against the installed opendataloader-pdf version.
    extension_for = {"text": "txt", "markdown": "md"}

    stem = Path(pdf_path).stem
    found = {}
    for fmt in formats:
        # Try the mapped extension first, then the literal format name, so a
        # converter that names files after the format keeps working.
        for ext in dict.fromkeys((extension_for.get(fmt, fmt), fmt)):
            candidate = os.path.join(out_dir, f"{stem}.{ext}")
            if os.path.exists(candidate):
                found[fmt] = candidate
                break
    return found
|
|
|
|
|
|
# ── HTML 파일 정리 ─────────────────────────────────────────────────
|
|
def _read_html(path: str) -> str:
    """Return the contents of the file at *path*, decoded as UTF-8.

    Bytes that are not valid UTF-8 are replaced rather than raising.
    """
    return Path(path).read_text(encoding="utf-8", errors="replace")
|
|
|
|
|
|
# ── HTML → DOCX ───────────────────────────────────────────────────
|
|
def _html_to_docx(html_content: str, output_path: str):
    """Render an HTML string as a Word document saved at *output_path*.

    Headings (h1-h6), paragraphs, tables and lists are converted in document
    order. Any other element is treated as a wrapper and searched for those
    tags, so content nested inside e.g. ``<div>`` is not lost. When nothing
    at all could be converted, the plain text of the whole document is
    written as a single paragraph.

    Args:
        html_content: HTML markup to convert.
        output_path: Destination path of the .docx file.
    """
    from docx import Document
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html_content, "lxml")
    doc = Document()

    HEADING_MAP = {"h1": 1, "h2": 2, "h3": 3, "h4": 4, "h5": 5, "h6": 6}
    # Ordered lists get the numbered style instead of bullets.
    LIST_STYLES = {"ul": "List Bullet", "ol": "List Number"}

    def _add_table(tag) -> bool:
        """Append *tag* as a grid table; return True if a table was added."""
        rows = tag.find_all("tr")
        if not rows:
            return False
        cols = max(len(r.find_all(["td", "th"])) for r in rows)
        if cols == 0:
            return False
        t = doc.add_table(rows=len(rows), cols=cols)
        t.style = "Table Grid"
        for ri, row in enumerate(rows):
            # cols is the maximum cell count, so ci never exceeds the grid.
            for ci, cell in enumerate(row.find_all(["td", "th"])):
                t.cell(ri, ci).text = cell.get_text(strip=True)
        return True

    def _walk(parent) -> bool:
        """Convert the children of *parent*; return True if anything was written."""
        wrote = False
        for el in parent.children:
            name = getattr(el, "name", None)
            if name is None:
                # Bare strings and comments carry no tag name; skip them.
                continue
            if name in HEADING_MAP:
                text = el.get_text(strip=True)
                if text:
                    doc.add_heading(text, level=HEADING_MAP[name])
                    wrote = True
            elif name == "p":
                text = el.get_text(strip=True)
                if text:
                    doc.add_paragraph(text)
                    wrote = True
            elif name == "table":
                wrote = _add_table(el) or wrote
            elif name in LIST_STYLES:
                for li in el.find_all("li"):
                    text = li.get_text(strip=True)
                    if text:
                        doc.add_paragraph(text, style=LIST_STYLES[name])
                        wrote = True
            else:
                # Wrapper element: look for convertible tags inside it.
                wrote = _walk(el) or wrote
        return wrote

    body = soup.find("body") or soup

    # Fall back to the raw text only when nothing was produced; a document
    # consisting solely of tables counts as converted content.
    if not _walk(body):
        doc.add_paragraph(soup.get_text(separator="\n", strip=True))

    doc.save(output_path)
|
|
|
|
|
|
# ── HTML → XLSX ────────────────────────────────────────────────────
|
|
def _html_to_xlsx(html_content: str, output_path: str, pdf_name: str):
    """Render an HTML string as an Excel workbook saved at *output_path*.

    When the HTML contains tables, the first sheet lists the text of every
    heading and paragraph (one per row) and each table gets its own sheet,
    with ``<th>`` cells shown bold on a tinted background. Without tables,
    the first sheet holds the document text, one non-empty line per row.

    Args:
        html_content: HTML markup to convert.
        output_path: Destination path of the .xlsx file.
        pdf_name: Accepted for signature parity with the other converters;
            not used here.
    """
    import openpyxl
    from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE
    from openpyxl.styles import Alignment, Font, PatternFill
    from bs4 import BeautifulSoup

    def _clean(text: str) -> str:
        # openpyxl rejects control characters in cell values, and text
        # extracted from PDFs can contain them.
        return ILLEGAL_CHARACTERS_RE.sub("", text)

    soup = BeautifulSoup(html_content, "lxml")
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "텍스트"

    tables = soup.find_all("table")
    if tables:
        # First sheet: all heading/paragraph text.
        body = soup.find("body") or soup
        row_idx = 1
        for el in body.find_all(["h1", "h2", "h3", "h4", "h5", "h6", "p"]):
            text = _clean(el.get_text(strip=True))
            if text:
                ws.cell(row=row_idx, column=1, value=text)
                row_idx += 1

        header_font = Font(bold=True)
        header_fill = PatternFill("solid", fgColor="D9E1F2")
        wrap = Alignment(wrap_text=True)

        # One additional sheet per table. A table without rows simply yields
        # an empty sheet instead of aborting the conversion.
        for ti, tbl in enumerate(tables, 1):
            ws2 = wb.create_sheet(title=f"표{ti}")
            for ri, row in enumerate(tbl.find_all("tr"), 1):
                for ci, cell in enumerate(row.find_all(["td", "th"]), 1):
                    c = ws2.cell(
                        row=ri, column=ci,
                        value=_clean(cell.get_text(strip=True)),
                    )
                    if cell.name == "th":
                        c.font = header_font
                        c.fill = header_fill
                    c.alignment = wrap
    else:
        # No tables: split the whole text into rows.
        row_idx = 1
        for raw_line in soup.get_text(separator="\n").split("\n"):
            line = _clean(raw_line.strip())
            if line:
                ws.cell(row=row_idx, column=1, value=line)
                row_idx += 1

    wb.save(output_path)
|
|
|
|
|
|
# ── HTML → PPTX ────────────────────────────────────────────────────
|
|
def _html_to_pptx(html_content: str, output_path: str, pdf_name: str):
    """Render an HTML string as a PowerPoint deck saved at *output_path*.

    The first slide shows the title, taken from ``<title>``, then ``<h1>``,
    then *pdf_name*. The following slides stack headings, paragraphs and
    tables top to bottom as text boxes, starting a new slide when the
    current one is full. Tables are rendered as text lines, one per row,
    limited to the first 20 rows. When no heading, paragraph or table
    produced output, the first 2000 characters of the document text are
    placed on the content slide.

    Args:
        html_content: HTML markup to convert.
        output_path: Destination path of the .pptx file.
        pdf_name: Fallback title when the HTML provides none.
    """
    from pptx import Presentation
    from pptx.util import Inches, Pt
    from pptx.dml.color import RGBColor
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html_content, "lxml")
    prs = Presentation()
    prs.slide_width = Inches(13.33)
    prs.slide_height = Inches(7.5)

    blank_layout = prs.slide_layouts[6]  # blank

    def _new_slide():
        return prs.slides.add_slide(blank_layout)

    def _add_textbox(slide, text, left, top, width, height, font_size=18, bold=False, color=None):
        txBox = slide.shapes.add_textbox(left, top, width, height)
        tf = txBox.text_frame
        tf.word_wrap = True
        run = tf.paragraphs[0].add_run()
        run.text = text
        run.font.size = Pt(font_size)
        run.font.bold = bold
        if color:
            run.font.color.rgb = RGBColor(*color)

    HEADING_SIZES = {1: 36, 2: 28, 3: 22, 4: 18, 5: 16, 6: 14}
    HEADING_MAP = {f"h{i}": i for i in range(1, 7)}

    body = soup.find("body") or soup
    top_y = Inches(0.3)
    margin_x = Inches(0.5)
    slide_w = prs.slide_width - Inches(1.0)
    slide_h = prs.slide_height

    # Title slide: first non-empty of <title>, <h1>, pdf_name.
    title_str = ""
    for tag_name in ("title", "h1"):
        tag = soup.find(tag_name)
        if tag is not None:
            title_str = tag.get_text(strip=True)
            if title_str:
                break
    title_str = title_str or pdf_name
    _add_textbox(_new_slide(), title_str, margin_x, Inches(3.0), slide_w, Inches(1.5),
                 font_size=40, bold=True, color=(0x1F, 0x39, 0x7D))

    slide = _new_slide()
    y = top_y
    has_content = False

    for el in body.find_all(["h1", "h2", "h3", "h4", "h5", "h6", "p", "table"]):
        name = el.name
        # Anything nested in a table is already rendered with that table's rows.
        if el.find_parent("table") is not None:
            continue

        if name == "table":
            rows = el.find_all("tr")
            if not rows:
                continue
        else:
            text = el.get_text(strip=True)
            if not text:
                continue

        # Start a new slide only for elements that will actually be drawn.
        if y > slide_h - Inches(0.8):
            slide = _new_slide()
            y = top_y

        if name in HEADING_MAP:
            level = HEADING_MAP[name]
            fs = HEADING_SIZES.get(level, 18)
            h = Inches(0.6) if level <= 2 else Inches(0.45)
            _add_textbox(slide, text, margin_x, y, slide_w, h,
                         font_size=fs, bold=True,
                         color=(0x1F, 0x39, 0x7D) if level == 1 else (0x2E, 0x74, 0xB5))
            y += h + Inches(0.1)
            has_content = True

        elif name == "p":
            # Rough height estimate: 80 characters per line, at most 8 lines.
            lines = (len(text) // 80) + 1
            h = Inches(0.28 * min(lines, 8))
            _add_textbox(slide, text, margin_x, y, slide_w, h, font_size=14)
            y += h + Inches(0.08)
            has_content = True

        else:  # table
            # Table caption box
            _add_textbox(slide, "[ 표 ]", margin_x, y, slide_w, Inches(0.3),
                         font_size=12, bold=True, color=(0x80, 0x80, 0x80))
            y += Inches(0.32)
            has_content = True
            row_h = Inches(0.28)
            for row in rows[:20]:
                cells = row.find_all(["td", "th"])
                line = " │ ".join(c.get_text(strip=True) for c in cells)
                if not line.strip():
                    continue
                if y > slide_h - Inches(0.5):
                    slide = _new_slide()
                    y = top_y
                is_header = any(c.name == "th" for c in cells)
                _add_textbox(slide, line, margin_x, y, slide_w, row_h,
                             font_size=11, bold=is_header)
                y += row_h
            y += Inches(0.1)

    # Nothing convertible found (e.g. text wrapped only in <pre>): put the
    # plain text on the still-empty content slide.
    if not has_content:
        fallback_text = soup.get_text(separator="\n", strip=True)[:2000]
        if fallback_text:
            _add_textbox(slide, fallback_text, margin_x, top_y, slide_w, Inches(6.5),
                         font_size=14)

    prs.save(output_path)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════
|
|
# Celery Task
|
|
# ══════════════════════════════════════════════════════════════════
|
|
def _text_file_to_html(text_path: str) -> str:
    """Wrap the contents of a plain-text file in a minimal HTML document."""
    from html import escape

    with open(text_path, "r", encoding="utf-8", errors="replace") as f:
        raw = f.read()
    # Escape markup characters so extracted text cannot break the HTML.
    return f"<html><body><pre>{escape(raw, quote=False)}</pre></body></html>"


@celery_app.task(bind=True, name="tasks.pdf_convert_task", queue="stt")
def pdf_convert_task(self, file_id: str, pdf_path: str, target_fmt: str):
    """Convert a PDF to *target_fmt* (html/docx/xlsx/pptx).

    The PDF is first converted to HTML by opendataloader-pdf; if no HTML is
    produced, a plain-text extraction is tried instead. The HTML is then
    either copied as-is or rendered to the requested office format. Progress
    is reported through ``update_state`` with state ``PROGRESS``.

    Args:
        file_id: Identifier used for the output file name
            (``<file_id>.<target_fmt>`` inside OUTPUT_DIR).
        pdf_path: Path of the source PDF. The file is deleted when the task
            finishes, whether it succeeded or not.
        target_fmt: One of SUPPORTED_FORMATS.

    Returns:
        Dict with ``output_file``, ``target_fmt``, ``file_size`` and
        ``pdf_name``.

    Raises:
        Exception: Any failure, wrapped with a "PDF 변환 실패" prefix; the
            original exception is attached as ``__cause__``.
    """
    import shutil

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    tmp_dir = tempfile.mkdtemp(prefix=f"odpdf_{file_id}_")
    pdf_name = Path(pdf_path).stem

    try:
        # Reject unknown formats up front rather than failing later on a
        # missing output file.
        if target_fmt not in SUPPORTED_FORMATS:
            raise ValueError(f"지원하지 않는 출력 포맷입니다: {target_fmt}")

        self.update_state(state="PROGRESS", meta={"progress": 10, "message": "PDF 파싱 중 (Java)..."})

        # Run opendataloader-pdf — HTML, plus JSON for the office formats.
        # NOTE(review): the JSON output is requested but not read below.
        odpdf_formats = ["html"]
        if target_fmt in ("docx", "xlsx", "pptx"):
            odpdf_formats.append("json")

        converted = _run_odpdf(pdf_path, tmp_dir, odpdf_formats)

        if "html" not in converted:
            # Fallback: retry with the text format.
            converted = _run_odpdf(pdf_path, tmp_dir, ["text"])
            if not converted:
                raise Exception("PDF에서 내용을 추출할 수 없습니다.")

        self.update_state(state="PROGRESS", meta={"progress": 50, "message": f"{target_fmt.upper()} 변환 중..."})

        output_filename = f"{file_id}.{target_fmt}"
        output_path = os.path.join(OUTPUT_DIR, output_filename)

        html_src = converted.get("html")
        if target_fmt == "html" and html_src:
            # HTML target: use the converter's output directly.
            shutil.copy2(html_src, output_path)
        else:
            if html_src:
                html_content = _read_html(html_src)
            else:
                # No HTML: build a minimal document from the text file's
                # contents (not from its path).
                text_src = converted.get("text")
                if not text_src:
                    raise Exception("PDF 파싱 결과가 없습니다.")
                html_content = _text_file_to_html(text_src)

            if target_fmt == "html":
                with open(output_path, "w", encoding="utf-8") as f:
                    f.write(html_content)
            elif target_fmt == "docx":
                _html_to_docx(html_content, output_path)
            elif target_fmt == "xlsx":
                _html_to_xlsx(html_content, output_path, pdf_name)
            elif target_fmt == "pptx":
                _html_to_pptx(html_content, output_path, pdf_name)

        self.update_state(state="PROGRESS", meta={"progress": 95, "message": "파일 저장 중..."})

        file_size = os.path.getsize(output_path)
        return {
            "output_file": output_filename,
            "target_fmt": target_fmt,
            "file_size": file_size,
            "pdf_name": pdf_name,
        }

    except Exception as e:
        raise Exception(f"PDF 변환 실패: {e}") from e

    finally:
        # Best-effort cleanup: a failure here must not mask the task result.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        try:
            if os.path.exists(pdf_path):
                os.remove(pdf_path)
        except OSError:
            pass
|