Add PDF processing and multi-format document conversion
Features added: - PDF to image conversion with configurable DPI - Multi-page PDF processing with OCR - Export to Markdown, HTML, DOCX, and JSON formats - Automatic image extraction from PDFs - Formula and formatting preservation - Real-time progress tracking for multi-page documents Backend changes: - New /api/process-pdf endpoint for PDF processing - pdf_utils.py: PDF conversion and image extraction utilities - format_converter.py: Document format conversion (MD, HTML, DOCX) - Updated dependencies: PyMuPDF, img2pdf, python-docx, markdown Frontend changes: - File type toggle (Image OCR / PDF Processing) - PDFProcessor component with format selection - Updated ImageUpload to support both images and PDFs - Progress bars for multi-page processing - Download options for converted documents Documentation: - Updated README with PDF processing features - Added API documentation for /api/process-pdf endpoint - Added format conversion examples
This commit is contained in:
326
backend/format_converter.py
Normal file
326
backend/format_converter.py
Normal file
@@ -0,0 +1,326 @@
|
||||
"""
|
||||
Document Format Conversion Utilities
|
||||
Handles conversion to Markdown, HTML, DOCX while preserving formatting
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import List, Dict, Any
|
||||
from io import BytesIO
|
||||
from docx import Document
|
||||
from docx.shared import Pt, Inches, RGBColor
|
||||
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
|
||||
import markdown
|
||||
import base64
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class DocumentConverter:
    """Convert per-page OCR results into Markdown, HTML or DOCX documents.

    Every public method takes ``pages_content``: a list with one dict per
    page.  The methods read ``page['text']`` (a string) and, when present,
    ``page['images']`` (a list of base64-encoded images).  The image at index
    ``i`` is referenced from the page text by the literal placeholder
    ``[IMAGE_i]``.
    """

    def __init__(self):
        self.page_separator = '<--- Page Split --->'

    def to_markdown(self, pages_content: List[Dict[str, Any]], include_images: bool = True) -> str:
        """
        Convert OCR results to Markdown format

        Args:
            pages_content: List of page dictionaries with text and metadata
            include_images: Whether to replace ``[IMAGE_i]`` placeholders
                with inline (data URI) Markdown images

        Returns:
            Markdown formatted string
        """
        md_content = []

        for idx, page in enumerate(pages_content):
            # Add page header
            md_content.append(f"# Page {idx + 1}\n")

            text = page.get('text', '')

            if include_images and 'images' in page:
                # Replace image placeholders with actual markdown image syntax.
                # The image is embedded as a data URI so the document is
                # self-contained (same encoding the HTML export uses).
                for img_idx, img_data in enumerate(page.get('images', [])):
                    placeholder = f"[IMAGE_{img_idx}]"
                    img_ref = f"![Image {img_idx + 1}](data:image/jpeg;base64,{img_data})"
                    text = text.replace(placeholder, img_ref)

            md_content.append(text)
            md_content.append("\n\n---\n\n")  # Page separator

        return "\n".join(md_content)

    def to_html(self, pages_content: List[Dict[str, Any]], include_images: bool = True) -> str:
        """
        Convert OCR results to HTML format

        Args:
            pages_content: List of page dictionaries with text and metadata
            include_images: Whether to replace ``[IMAGE_i]`` placeholders
                with inline ``<img>`` tags

        Returns:
            HTML formatted string (a complete, styled document)
        """
        html_parts = []

        # HTML header
        html_parts.append("""
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>OCR Results</title>
    <style>
        body {
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            max-width: 900px;
            margin: 40px auto;
            padding: 20px;
            line-height: 1.6;
            background-color: #f5f5f5;
        }
        .page {
            background: white;
            padding: 40px;
            margin-bottom: 30px;
            box-shadow: 0 2px 8px rgba(0,0,0,0.1);
            border-radius: 8px;
        }
        .page-header {
            color: #333;
            border-bottom: 2px solid #4CAF50;
            padding-bottom: 10px;
            margin-bottom: 20px;
        }
        table {
            border-collapse: collapse;
            width: 100%;
            margin: 20px 0;
        }
        th, td {
            border: 1px solid #ddd;
            padding: 12px;
            text-align: left;
        }
        th {
            background-color: #4CAF50;
            color: white;
        }
        tr:nth-child(even) {
            background-color: #f9f9f9;
        }
        img {
            max-width: 100%;
            height: auto;
            margin: 15px 0;
            border-radius: 4px;
        }
        code {
            background-color: #f4f4f4;
            padding: 2px 6px;
            border-radius: 3px;
            font-family: 'Courier New', monospace;
        }
        pre {
            background-color: #f4f4f4;
            padding: 15px;
            border-radius: 5px;
            overflow-x: auto;
        }
    </style>
</head>
<body>
    <h1>DeepSeek OCR Results</h1>
""")

        # Process each page
        for idx, page in enumerate(pages_content):
            html_parts.append('    <div class="page">')
            html_parts.append(f'        <h2 class="page-header">Page {idx + 1}</h2>')

            text = page.get('text', '')

            # Handle images if present
            if include_images and 'images' in page:
                for img_idx, img_data in enumerate(page.get('images', [])):
                    placeholder = f"[IMAGE_{img_idx}]"
                    img_tag = f'<img src="data:image/jpeg;base64,{img_data}" alt="Image {img_idx + 1}" />'
                    text = text.replace(placeholder, img_tag)

            # Convert markdown to HTML if the text appears to be markdown
            if self._is_markdown(text):
                html_content = markdown.markdown(text, extensions=['tables', 'fenced_code'])
            else:
                # Otherwise, preserve the HTML or wrap in paragraph.
                # NOTE(review): the text is inserted without HTML escaping;
                # confirm that is acceptable for OCR output of untrusted PDFs.
                html_content = text if '<' in text else f'<p>{text.replace(chr(10), "<br>")}</p>'

            html_parts.append(f'        {html_content}')
            html_parts.append('    </div>')

        # HTML footer
        html_parts.append("""
</body>
</html>
""")

        return "\n".join(html_parts)

    def to_docx(self, pages_content: List[Dict[str, Any]], include_images: bool = True) -> BytesIO:
        """
        Convert OCR results to DOCX format

        Images of a page are inserted before that page's text; the matching
        placeholder is removed from the text only when the image was added
        successfully.

        Args:
            pages_content: List of page dictionaries with text and metadata
            include_images: Whether to include images

        Returns:
            BytesIO object containing the DOCX file, positioned at offset 0
        """
        doc = Document()

        # Set default font
        style = doc.styles['Normal']
        font = style.font
        font.name = 'Calibri'
        font.size = Pt(11)

        # Add title
        title = doc.add_heading('DeepSeek OCR Results', 0)
        title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

        # Process each page
        for idx, page in enumerate(pages_content):
            # Add page heading
            page_heading = doc.add_heading(f'Page {idx + 1}', level=1)
            page_heading.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT

            text = page.get('text', '')

            # Handle images
            if include_images and 'images' in page:
                for img_idx, img_data in enumerate(page.get('images', [])):
                    placeholder = f"[IMAGE_{img_idx}]"

                    # Add image to document; a bad image is reported and
                    # skipped so one broken crop does not abort the export.
                    try:
                        img_bytes = base64.b64decode(img_data)
                        img_stream = BytesIO(img_bytes)
                        doc.add_picture(img_stream, width=Inches(5))
                        text = text.replace(placeholder, '')
                    except Exception as e:
                        print(f"Error adding image to DOCX: {e}")

            # Process text content
            self._add_formatted_text_to_doc(doc, text)

            # Add page break (except for last page)
            if idx < len(pages_content) - 1:
                doc.add_page_break()

        # Save to BytesIO and rewind so callers can stream it directly
        docx_buffer = BytesIO()
        doc.save(docx_buffer)
        docx_buffer.seek(0)

        return docx_buffer

    def _is_markdown(self, text: str) -> bool:
        """Check if text appears to be markdown formatted"""
        markdown_patterns = [
            r'^#+\s',          # Headers
            r'\*\*.*\*\*',     # Bold
            r'\*.*\*',         # Italic
            r'^\*\s',          # Lists
            r'^\d+\.\s',       # Numbered lists
            r'\[.*\]\(.*\)',   # Links
            r'```',            # Code blocks
        ]

        # MULTILINE so the ^-anchored patterns match at any line start
        return any(re.search(pattern, text, re.MULTILINE) for pattern in markdown_patterns)

    def _add_formatted_text_to_doc(self, doc: Document, text: str):
        """
        Add formatted text to document, preserving structure

        The text is split on blank lines; each chunk becomes a heading
        (``#``/``##``/``###`` prefix), a table (more than two ``|``
        characters), a monospace code paragraph (``` prefix) or a regular
        paragraph.

        Args:
            doc: Document object
            text: Text to add
        """
        for para in text.split('\n\n'):
            # Strip first so leading newlines/spaces do not hide a heading or
            # code-fence marker from the prefix checks below.
            block = para.strip()
            if not block:
                continue

            # Check for headers (levels 1-3)
            heading_level = next(
                (level for level in (1, 2, 3) if block.startswith('#' * level + ' ')),
                None,
            )

            if heading_level is not None:
                # Slice off only the leading marker; str.replace would also
                # delete '# ' sequences that occur inside the heading text.
                doc.add_heading(block[heading_level + 1:].strip(), level=heading_level)
            # Check for tables (simple detection)
            elif block.count('|') > 2:
                self._add_table_to_doc(doc, block)
            # Check for code blocks
            elif block.startswith('```'):
                code_text = block.strip('`').strip()
                p = doc.add_paragraph()
                run = p.add_run(code_text)
                run.font.name = 'Courier New'
                run.font.size = Pt(10)
            else:
                # Regular paragraph
                doc.add_paragraph(block)

    def _add_table_to_doc(self, doc: Document, table_text: str):
        """
        Add a table to the document from markdown-style table text

        Separator rows (only pipes, dashes, colons, whitespace) are ignored.
        Empty cells inside a row are kept so columns stay aligned.

        Args:
            doc: Document object
            table_text: Table in markdown format
        """
        rows = [row.strip() for row in table_text.split('\n') if row.strip()]

        # Filter out separator rows
        data_rows = [row for row in rows if not re.match(r'^[\|\s\-:]+$', row)]

        if not data_rows:
            return

        # Parse table data
        table_data = []
        for row in data_rows:
            # Drop only the single outer pipe on each side; interior empty
            # cells must survive, otherwise later columns shift left.
            if row.startswith('|'):
                row = row[1:]
            if row.endswith('|'):
                row = row[:-1]
            cells = [cell.strip() for cell in row.split('|')]
            if any(cells):
                table_data.append(cells)

        if not table_data:
            return

        # Create table
        max_cols = max(len(row) for row in table_data)
        table = doc.add_table(rows=len(table_data), cols=max_cols)
        table.style = 'Light Grid Accent 1'

        # Populate table
        for i, row_data in enumerate(table_data):
            row = table.rows[i]
            for j, cell_text in enumerate(row_data):
                if j >= len(row.cells):
                    continue
                row.cells[j].text = cell_text

                # Make header row bold
                if i == 0:
                    for paragraph in row.cells[j].paragraphs:
                        for run in paragraph.runs:
                            run.font.bold = True
|
||||
206
backend/main.py
206
backend/main.py
@@ -2,18 +2,29 @@ import os
|
||||
import re
|
||||
import tempfile
|
||||
import shutil
|
||||
import base64
|
||||
from typing import List, Dict, Any, Optional
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
import torch
|
||||
from transformers import AutoModel, AutoTokenizer
|
||||
from PIL import Image
|
||||
import uvicorn
|
||||
from decouple import config as env_config
|
||||
|
||||
# Import PDF and document conversion utilities
|
||||
from pdf_utils import (
|
||||
pdf_to_images_high_quality,
|
||||
images_to_pdf,
|
||||
extract_ref_patterns,
|
||||
crop_images_from_refs,
|
||||
clean_markdown_content
|
||||
)
|
||||
from format_converter import DocumentConverter
|
||||
|
||||
# -----------------------------
|
||||
# Lifespan context for model loading
|
||||
# -----------------------------
|
||||
@@ -373,6 +384,199 @@ async def ocr_inference(
|
||||
if out_dir:
|
||||
shutil.rmtree(out_dir, ignore_errors=True)
|
||||
|
||||
@app.post("/api/process-pdf")
async def process_pdf(
    pdf_file: UploadFile = File(...),
    mode: str = Form("plain_ocr"),
    prompt: str = Form(""),
    output_format: str = Form("markdown"),  # markdown, html, docx, json
    grounding: bool = Form(False),
    include_caption: bool = Form(False),
    extract_images: bool = Form(True),
    dpi: int = Form(144),
    base_size: int = Form(1024),
    image_size: int = Form(640),
    crop_mode: bool = Form(True),
):
    """
    Process PDF document with OCR and convert to various formats

    - **pdf_file**: PDF file to process
    - **mode**: OCR mode (plain_ocr, markdown, tables_csv, etc.)
    - **prompt**: Custom prompt for freeform mode
    - **output_format**: Output format (markdown, html, docx, json)
    - **grounding**: Enable grounding boxes
    - **include_caption**: Add image descriptions
    - **extract_images**: Extract images from PDF
    - **dpi**: PDF rendering resolution (default: 144)
    - **base_size**: Base processing size
    - **image_size**: Image size parameter
    - **crop_mode**: Enable crop mode

    Responds with JSON for `json`, otherwise with a file download.
    Errors: 503 if the model is not loaded, 400 for an invalid
    `output_format` or non-positive `dpi`, 500 for any processing failure.
    """
    # Local import: only this endpoint needs an in-memory byte buffer.
    from io import BytesIO

    if model is None or tokenizer is None:
        raise HTTPException(status_code=503, detail="Model not loaded yet")

    # Validate output format
    if output_format not in ["markdown", "html", "docx", "json"]:
        raise HTTPException(status_code=400, detail="Invalid output format. Must be: markdown, html, docx, or json")

    # A zero/negative DPI gives a degenerate render matrix; reject it up front
    # as a client error instead of surfacing it as a 500 later.
    if dpi <= 0:
        raise HTTPException(status_code=400, detail="Invalid dpi. Must be a positive integer")

    try:
        # Read PDF file
        pdf_bytes = await pdf_file.read()

        # Convert PDF to images
        print(f"📄 Converting PDF to images (DPI: {dpi})...")
        images = pdf_to_images_high_quality(pdf_bytes, dpi=dpi)
        total_pages = len(images)
        print(f"✅ Converted {total_pages} pages")

        # Process each page
        pages_content = []
        converter = DocumentConverter()

        for page_idx, img in enumerate(images):
            print(f"🔍 Processing page {page_idx + 1}/{total_pages}...")

            # Build prompt for this page
            prompt_text = build_prompt(
                mode=mode,
                user_prompt=prompt,
                grounding=grounding,
                find_term=None,
                schema=None,
                include_caption=include_caption,
            )

            # Save image temporarily (the model is given a file path)
            tmp_img = None
            out_dir = None
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
                    # Record the path before writing so the finally-block can
                    # remove the file even when saving fails.
                    tmp_img = tmp.name
                    img.save(tmp, format="PNG")

                orig_w, orig_h = img.size
                out_dir = tempfile.mkdtemp(prefix="dsocr_pdf_")

                # Run inference
                res = model.infer(
                    tokenizer,
                    prompt=prompt_text,
                    image_file=tmp_img,
                    output_path=out_dir,
                    base_size=base_size,
                    image_size=image_size,
                    crop_mode=crop_mode,
                    save_results=False,
                    test_compress=False,
                    eval_mode=True,
                )

                # Normalize response
                if isinstance(res, str):
                    text = res.strip()
                elif isinstance(res, dict) and "text" in res:
                    text = str(res["text"]).strip()
                elif isinstance(res, (list, tuple)):
                    text = "\n".join(map(str, res)).strip()
                else:
                    text = ""

                # Fall back to the result file in the output directory, if any
                if not text:
                    mmd = os.path.join(out_dir, "result.mmd")
                    if os.path.exists(mmd):
                        with open(mmd, "r", encoding="utf-8") as fh:
                            text = fh.read().strip()
                if not text:
                    text = f"No text returned for page {page_idx + 1}."

                # Parse grounding boxes from the untouched model output.
                # This must happen before image extraction, because
                # clean_markdown_content() removes the <|ref|>/<|det|> tags
                # the boxes are parsed from.
                boxes = parse_detections(text, orig_w, orig_h) if ("<|det|>" in text or "<|ref|>" in text) else []

                # Extract images if requested
                page_images = []
                if extract_images:
                    matches, matches_image, matches_other = extract_ref_patterns(text)
                    if matches_image:
                        for cropped_img in crop_images_from_refs(img, matches):
                            # Encode in memory: no temp file to leak or clean up
                            img_buffer = BytesIO()
                            cropped_img.save(img_buffer, format="JPEG", quality=95)
                            page_images.append(base64.b64encode(img_buffer.getvalue()).decode('utf-8'))

                    # Clean the text and add image placeholders, in image
                    # order (IMAGE_0 first) at the top of the page text
                    text = clean_markdown_content(text, matches_image, matches_other)
                    placeholders = "".join(f"[IMAGE_{img_idx}]\n" for img_idx in range(len(page_images)))
                    text = placeholders + text

                # Clean grounding tags from display text
                display_text = clean_grounding_text(text) if ("<|ref|>" in text or "<|grounding|>" in text) else text

                pages_content.append({
                    'page_number': page_idx + 1,
                    'text': display_text,
                    'raw_text': text,
                    'boxes': boxes,
                    'images': page_images,
                    'image_dims': {'w': orig_w, 'h': orig_h}
                })

            finally:
                if tmp_img:
                    try:
                        os.remove(tmp_img)
                    except OSError:
                        # Best-effort cleanup; a leftover temp file must not
                        # mask the real result or error.
                        pass
                if out_dir:
                    shutil.rmtree(out_dir, ignore_errors=True)

        print(f"✅ Processed all {total_pages} pages")

        # Convert to requested format
        if output_format == "json":
            return JSONResponse({
                "success": True,
                "total_pages": total_pages,
                "pages": pages_content,
                "metadata": {
                    "mode": mode,
                    "grounding": grounding,
                    "extract_images": extract_images,
                    "dpi": dpi
                }
            })
        elif output_format == "markdown":
            md_content = converter.to_markdown(pages_content, include_images=extract_images)
            return StreamingResponse(
                iter([md_content.encode('utf-8')]),
                media_type="text/markdown",
                headers={"Content-Disposition": "attachment; filename=ocr_result.md"}
            )
        elif output_format == "html":
            html_content = converter.to_html(pages_content, include_images=extract_images)
            return StreamingResponse(
                iter([html_content.encode('utf-8')]),
                media_type="text/html",
                headers={"Content-Disposition": "attachment; filename=ocr_result.html"}
            )
        elif output_format == "docx":
            docx_buffer = converter.to_docx(pages_content, include_images=extract_images)
            return StreamingResponse(
                docx_buffer,
                media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                headers={"Content-Disposition": "attachment; filename=ocr_result.docx"}
            )

    except HTTPException:
        # Deliberate HTTP errors keep their status code instead of
        # being rewrapped as a generic 500 below.
        raise
    except Exception as e:
        import traceback
        print(f"❌ Error processing PDF: {e}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"{type(e).__name__}: {str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
host = env_config("API_HOST", default="0.0.0.0")
|
||||
port = env_config("API_PORT", default=8000, cast=int)
|
||||
|
||||
214
backend/pdf_utils.py
Normal file
214
backend/pdf_utils.py
Normal file
@@ -0,0 +1,214 @@
|
||||
"""
|
||||
PDF Processing Utilities for DeepSeek OCR
|
||||
Handles PDF to image conversion and batch processing
|
||||
"""
|
||||
|
||||
import io
|
||||
import re
|
||||
from typing import List, Tuple, Dict, Any
|
||||
import fitz # PyMuPDF
|
||||
import img2pdf
|
||||
from PIL import Image
|
||||
import numpy as np
|
||||
|
||||
|
||||
def pdf_to_images_high_quality(pdf_bytes: bytes, dpi: int = 144) -> List[Image.Image]:
    """
    Convert PDF pages to high-quality PIL images

    Args:
        pdf_bytes: PDF file as bytes
        dpi: Resolution for rendering (default: 144); must be positive

    Returns:
        List of PIL Image objects in RGB mode, one per page

    Raises:
        ValueError: If dpi is not positive
    """
    if dpi <= 0:
        raise ValueError("dpi must be a positive integer")

    # Allow large images. This lifts Pillow's pixel-count limit for the whole
    # process, not just for this call.
    # NOTE(review): confirm that disabling the limit globally is intended.
    Image.MAX_IMAGE_PIXELS = None

    # Calculate zoom factor from DPI (72 is the PDF base resolution the
    # original zoom formula divides by)
    zoom = dpi / 72.0
    matrix = fitz.Matrix(zoom, zoom)

    images = []

    # Open PDF from bytes
    pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        # Process each page
        for page_num in range(pdf_document.page_count):
            page = pdf_document[page_num]

            # Render page to pixmap
            pixmap = page.get_pixmap(matrix=matrix, alpha=False)

            # Convert to PIL Image
            img = Image.open(io.BytesIO(pixmap.tobytes("png")))

            # Ensure RGB mode
            if img.mode in ('RGBA', 'LA'):
                # Flatten transparency onto white, using the alpha band as
                # the paste mask for both alpha-carrying modes.
                background = Image.new('RGB', img.size, (255, 255, 255))
                background.paste(img.convert('RGB'), mask=img.getchannel('A'))
                img = background
            elif img.mode != 'RGB':
                img = img.convert('RGB')

            images.append(img)
    finally:
        # Release the document even when rendering a page fails
        pdf_document.close()

    return images
|
||||
|
||||
|
||||
def images_to_pdf(pil_images: List[Image.Image]) -> bytes:
    """
    Build a PDF from a sequence of PIL images.

    Each image is converted to RGB if necessary, encoded as a JPEG
    (quality 95) in memory, and the encoded pages are handed to img2pdf.

    Args:
        pil_images: List of PIL Image objects

    Returns:
        PDF file as bytes; ``b''`` when the list is empty
    """
    if not pil_images:
        return b''

    def _as_jpeg(picture):
        # JPEG encoding here is always done from an RGB image
        rgb = picture if picture.mode == 'RGB' else picture.convert('RGB')
        sink = io.BytesIO()
        rgb.save(sink, format='JPEG', quality=95)
        return sink.getvalue()

    encoded_pages = [_as_jpeg(picture) for picture in pil_images]
    return img2pdf.convert(encoded_pages)
|
||||
|
||||
|
||||
def extract_ref_patterns(text: str) -> Tuple[List[Tuple], List[str], List[str]]:
    """
    Find every ``<|ref|>label<|/ref|><|det|>coords<|/det|>`` pair in OCR output.

    Args:
        text: OCR output text with reference tags

    Returns:
        Tuple of (all_matches, image_matches, other_matches).
        ``all_matches`` holds ``(full_tag, label, coords)`` tuples; the other
        two lists hold only the full tag text, split by whether the tag
        contains the ``image`` label.
    """
    image_marker = '<|ref|>image<|/ref|>'
    found = re.findall(
        r'(<\|ref\|>(.*?)<\|/ref\|><\|det\|>(.*?)<\|/det\|>)',
        text,
        re.DOTALL,
    )

    image_tags = [hit[0] for hit in found if image_marker in hit[0]]
    other_tags = [hit[0] for hit in found if image_marker not in hit[0]]

    return found, image_tags, other_tags
|
||||
|
||||
|
||||
def parse_coordinates(ref_text: Tuple, image_width: int, image_height: int) -> Dict[str, Any]:
    """
    Parse coordinates from reference text

    Args:
        ref_text: Tuple of (full_match, label, coordinates), where
            coordinates is the text of a list of ``[x1, y1, x2, y2]`` boxes
        image_width: Image width in pixels
        image_height: Image height in pixels

    Returns:
        Dictionary with ``'label'`` and ``'boxes'`` (pixel-scaled
        ``[x1, y1, x2, y2]`` lists), or ``None`` when the coordinates
        cannot be parsed
    """
    # Local import keeps this fix self-contained within the function.
    import ast

    try:
        label_type = ref_text[1]
        # SECURITY: the coordinate text comes from model output for
        # user-supplied documents, so it is parsed as a pure Python literal.
        # eval() would execute any expression embedded in it.
        cor_list = ast.literal_eval(ref_text[2])

        # Scale coordinates from 0-999 to actual pixels
        scaled_boxes = []
        for points in cor_list:
            x1, y1, x2, y2 = points
            scaled_boxes.append([
                int(x1 / 999 * image_width),
                int(y1 / 999 * image_height),
                int(x2 / 999 * image_width),
                int(y2 / 999 * image_height),
            ])

        return {
            'label': label_type,
            'boxes': scaled_boxes
        }
    except Exception as e:
        # Best-effort: a malformed tag is reported and skipped by callers
        print(f"Error parsing coordinates: {e}")
        return None
|
||||
|
||||
|
||||
def crop_images_from_refs(image: Image.Image, refs: List[Tuple]) -> List[Image.Image]:
    """
    Crop images based on reference bounding boxes

    Only references whose label is ``'image'`` are cropped. Boxes are
    clamped to the image bounds, and boxes with no area are skipped, since
    an empty crop cannot be encoded by callers.

    Args:
        image: Source PIL Image
        refs: List of reference tuples (full_match, label, coordinates)

    Returns:
        List of cropped PIL Images
    """
    cropped_images = []
    image_width, image_height = image.size

    for ref in refs:
        coord_data = parse_coordinates(ref, image_width, image_height)
        if not coord_data or coord_data['label'] != 'image':
            continue

        for box in coord_data['boxes']:
            # Clamp to the page so the crop never includes padding from
            # outside the source image
            x1 = max(0, min(box[0], image_width))
            y1 = max(0, min(box[1], image_height))
            x2 = max(0, min(box[2], image_width))
            y2 = max(0, min(box[3], image_height))

            if x2 <= x1 or y2 <= y1:
                print(f"Error cropping image: empty box {box}")
                continue

            try:
                cropped_images.append(image.crop((x1, y1, x2, y2)))
            except Exception as e:
                # Best-effort: report and move on to the next box
                print(f"Error cropping image: {e}")
                continue

    return cropped_images
|
||||
|
||||
|
||||
def clean_markdown_content(content: str, image_refs: List[str], other_refs: List[str]) -> str:
    """
    Strip reference tags from OCR output and tidy a few formatting artifacts.

    Args:
        content: Raw OCR output with tags
        image_refs: List of image reference tags
        other_refs: List of other reference tags

    Returns:
        Cleaned markdown content
    """
    result = content

    # Every reference tag is removed: image tags first, then the rest
    for tag in [*image_refs, *other_refs]:
        result = result.replace(tag, '')

    # LaTeX macro substitutions and blank-line trimming, applied in order
    substitutions = (
        ('\\coloneqq', ':='),
        ('\\eqqcolon', '=:'),
        ('\n\n\n\n', '\n\n'),
        ('\n\n\n', '\n\n'),
    )
    for needle, replacement in substitutions:
        result = result.replace(needle, replacement)

    return result
|
||||
@@ -11,3 +11,7 @@ pillow
|
||||
safetensors
|
||||
torch
|
||||
python-decouple>=3.8
|
||||
PyMuPDF>=1.23.0
|
||||
img2pdf>=0.5.0
|
||||
python-docx>=1.1.0
|
||||
markdown>=3.5.0
|
||||
|
||||
Reference in New Issue
Block a user