import os
import io
import uuid
import tempfile
import base64
import traceback
from typing import List, Dict, Any, Optional
from contextlib import asynccontextmanager
from datetime import datetime, timezone

from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
from pydantic import BaseModel
from PIL import Image
import uvicorn
from decouple import config as env_config

# Import PDF and document conversion utilities
from pdf_utils import (
    pdf_to_images_high_quality,
    images_to_pdf,
    extract_ref_patterns,
    crop_images_from_refs,
    clean_markdown_content
)
from format_converter import DocumentConverter
from database import init_db, get_db
from providers import (
    build_registry,
    parse_detections,
    clean_grounding_text,
    ProviderError,
    GROUNDING_MODES,
)

OCR_IMAGES_DIR = env_config("OCR_IMAGES_DIR", default="/data/ocr_images")

# -----------------------------
# Lifespan context
# -----------------------------
# The model registry holds all available OCR providers. Local models (e.g.
# DeepSeek-OCR) are loaded lazily on first use so an Ollama-only deployment
# starts instantly and never touches the GPU.
registry = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Build the model registry on startup.

    Also creates the image storage directory and initialises the database.
    A database initialisation failure is printed as a warning instead of
    aborting startup, so DB-backed routes may fail later while OCR routes
    keep working.
    """
    global registry

    # Image storage directory
    os.makedirs(OCR_IMAGES_DIR, exist_ok=True)

    # Database
    try:
        init_db()
    except Exception as exc:
        print(f"Warning: database initialization failed: {exc}")

    # OCR model registry (providers load their models lazily)
    registry = build_registry()

    yield

    # Cleanup
    print("🛑 Shutting down...")


# -----------------------------
# FastAPI app
# -----------------------------
app = FastAPI(
    title="DeepSeek-OCR API",
    description="Blazing fast OCR with DeepSeek-OCR model 🔥",
    version="2.0.0",
    lifespan=lifespan,
)

# CORS middleware for React frontend
CORS_ORIGINS = env_config("CORS_ORIGINS", default="").split(",")
CORS_ORIGINS = [o.strip() for o in CORS_ORIGINS if o.strip()]

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS if CORS_ORIGINS else ["http://localhost:3000"],
    allow_credentials=True,
    # PUT and DELETE are needed by the job review/status/delete routes below;
    # without them, cross-origin preflight requests for those routes fail.
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)


# -----------------------------
# Routes
# -----------------------------
@app.get("/")
async def root():
    return {"message": "DeepSeek-OCR API is running! 🚀", "docs": "/docs"}


# Liveness probe; also reports the registered models once startup has run.
@app.get("/health")
async def health():
    return {"status": "healthy", "models": registry.list_models() if registry else []}


@app.get("/api/models")
async def list_models():
    """List the OCR models available for selection in the UI."""
    if registry is None:
        raise HTTPException(status_code=503, detail="Model registry not ready.")
    return JSONResponse({"models": registry.list_models()})


def _resolve_provider(model_id: Optional[str], mode: str):
    """Look up the provider and reject capability mismatches (e.g. grounding).

    Raises:
        HTTPException(503): the registry has not been built yet.
        HTTPException(400): the model id is unknown to the registry, or *mode*
            is a grounding mode and the provider lacks the "grounding"
            capability.
    """
    if registry is None:
        raise HTTPException(status_code=503, detail="Model registry not ready.")
    try:
        provider = registry.get(model_id)
    except ProviderError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    if mode in GROUNDING_MODES and not provider.capabilities.get("grounding"):
        raise HTTPException(
            status_code=400,
            detail=f"Model '{provider.label}' does not support grounding modes (e.g. {mode}).",
        )
    return provider


def _grounding_boxes(text: str, width, height) -> list:
    """Parse grounding boxes from *text*, or return [] if it has no det/ref tokens."""
    if "<|det|>" in text or "<|ref|>" in text:
        return parse_detections(text, width, height)
    return []


def _display_text(text: str) -> str:
    """Strip grounding markup (keeping labels) when *text* contains ref/grounding tokens."""
    if "<|ref|>" in text or "<|grounding|>" in text:
        return clean_grounding_text(text)
    return text


def _encode_jpeg_base64(img: Image.Image) -> str:
    """Encode a PIL image as a base64 JPEG (quality 95) entirely in memory.

    Uses an in-memory buffer instead of a named temp file, so no file
    descriptor or on-disk file can leak if saving fails.
    """
    buffer = io.BytesIO()
    img.save(buffer, format="JPEG", quality=95)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


@app.post("/api/ocr")
async def ocr_inference(
    image: UploadFile = File(...),
    model: Optional[str] = Form(None),
    mode: str = Form("plain_ocr"),
    prompt: str = Form(""),
    grounding: bool = Form(False),
    include_caption: bool = Form(False),
    find_term: Optional[str] = Form(None),
    schema: Optional[str] = Form(None),
    base_size: int = Form(1024),
    image_size: int = Form(640),
    crop_mode: bool = Form(True),
    test_compress: bool = Form(False),
):
    """
    Perform OCR inference on uploaded image

    - **image**: Image file to process
    - **model**: OCR model id (see GET /api/models); defaults to the registry default
    - **mode**: OCR mode (plain_ocr, markdown, tables_csv, etc.)
    - **prompt**: Custom prompt for freeform mode
    - **grounding**: Enable grounding boxes (DeepSeek only)
    - **include_caption**: Add image description
    - **find_term**: Term to find (for find_ref mode)
    - **schema**: JSON schema (for kv_json mode)
    - **base_size/image_size/crop_mode/test_compress**: DeepSeek processing options
    """
    provider = _resolve_provider(model, mode)

    tmp_img = None
    try:
        # Save uploaded file. Record the path first so the finally-block can
        # remove it even if reading/writing the upload fails.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            tmp_img = tmp.name
            content = await image.read()
            tmp.write(content)

        # Get original dimensions
        try:
            with Image.open(tmp_img) as im:
                orig_w, orig_h = im.size
        except Exception:
            orig_w = orig_h = None

        # Run inference through the selected provider.
        # NOTE(review): provider.run is called synchronously inside this async
        # endpoint, so the event loop cannot serve other requests while it
        # runs. If inference is long-running, consider run_in_threadpool once
        # providers are confirmed safe to call from worker threads.
        text = provider.run(
            tmp_img,
            mode=mode,
            prompt=prompt,
            grounding=grounding,
            find_term=find_term,
            schema=schema,
            include_caption=include_caption,
            options={
                "base_size": base_size,
                "image_size": image_size,
                "crop_mode": crop_mode,
                "test_compress": test_compress,
            },
        )

        if not text:
            text = "No text returned by model."

        # Parse grounding boxes (no-op for providers/text without grounding tokens)
        boxes = _grounding_boxes(text, orig_w or 1, orig_h or 1)

        # Clean grounding tags from display text, but keep the labels
        display_text = _display_text(text)

        # If display text is empty after cleaning but we have boxes, show the labels
        if not display_text and boxes:
            display_text = ", ".join([b["label"] for b in boxes])

        return JSONResponse({
            "success": True,
            "text": display_text,
            "raw_text": text,  # Include raw model output for debugging
            "boxes": boxes,
            "image_dims": {"w": orig_w, "h": orig_h},
            "metadata": {
                "model": provider.id,
                "model_label": provider.label,
                "mode": mode,
                "grounding": grounding or (mode in GROUNDING_MODES),
                "base_size": base_size,
                "image_size": image_size,
                "crop_mode": crop_mode
            }
        })

    except ProviderError as e:
        print(f"OCR provider error: {e}")
        raise HTTPException(status_code=502, detail=str(e))
    except HTTPException:
        raise
    except Exception as e:
        print(f"OCR inference error: {type(e).__name__}: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred during OCR processing.")
    finally:
        if tmp_img:
            try:
                os.remove(tmp_img)
            except Exception:
                pass


@app.post("/api/process-pdf")
async def process_pdf(
    pdf_file: UploadFile = File(...),
    model: Optional[str] = Form(None),
    mode: str = Form("plain_ocr"),
    prompt: str = Form(""),
    output_format: str = Form("markdown"),  # markdown, html, docx, json
    grounding: bool = Form(False),
    include_caption: bool = Form(False),
    extract_images: bool = Form(True),
    dpi: int = Form(144),
    base_size: int = Form(1024),
    image_size: int = Form(640),
    crop_mode: bool = Form(True),
):
    """
    Process PDF document with OCR and convert to various formats

    - **pdf_file**: PDF file to process
    - **mode**: OCR mode (plain_ocr, markdown, tables_csv, etc.)
    - **prompt**: Custom prompt for freeform mode
    - **output_format**: Output format (markdown, html, docx, json)
    - **grounding**: Enable grounding boxes
    - **include_caption**: Add image descriptions
    - **extract_images**: Extract images from PDF
    - **dpi**: PDF rendering resolution (default: 144)
    - **base_size**: Base processing size
    - **image_size**: Image size parameter
    - **crop_mode**: Enable crop mode
    """
    provider = _resolve_provider(model, mode)

    # Validate output format
    if output_format not in ["markdown", "html", "docx", "json"]:
        raise HTTPException(status_code=400, detail="Invalid output format. Must be: markdown, html, docx, or json")

    try:
        # Read PDF file
        pdf_bytes = await pdf_file.read()

        # Convert PDF to images
        print(f"📄 Converting PDF to images (DPI: {dpi})...")
        images = pdf_to_images_high_quality(pdf_bytes, dpi=dpi)
        total_pages = len(images)
        print(f"✅ Converted {total_pages} pages")

        # Process each page
        pages_content = []
        converter = DocumentConverter()

        for page_idx, img in enumerate(images):
            print(f"🔍 Processing page {page_idx + 1}/{total_pages}...")

            # Save image temporarily; record the path before saving so a failed
            # save still gets cleaned up in the finally-block.
            tmp_img = None
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
                    tmp_img = tmp.name
                    img.save(tmp, format="PNG")

                orig_w, orig_h = img.size

                # Run inference through the selected provider
                # (synchronous call; see the NOTE in ocr_inference).
                text = provider.run(
                    tmp_img,
                    mode=mode,
                    prompt=prompt,
                    grounding=grounding,
                    find_term=None,
                    schema=None,
                    include_caption=include_caption,
                    options={
                        "base_size": base_size,
                        "image_size": image_size,
                        "crop_mode": crop_mode,
                        "test_compress": False,
                    },
                )

                if not text:
                    text = f"No text returned for page {page_idx + 1}."

                # Extract images if requested
                page_images = []
                if extract_images:
                    matches, matches_image, matches_other = extract_ref_patterns(text)

                    if matches_image:
                        cropped = crop_images_from_refs(img, matches)
                        page_images = [_encode_jpeg_base64(c) for c in cropped]

                    # Clean the text and add image placeholders, one per
                    # extracted image, in ascending order ([IMAGE_0] first).
                    text = clean_markdown_content(text, matches_image, matches_other)
                    placeholders = "".join(f"[IMAGE_{i}]\n" for i in range(len(page_images)))
                    text = placeholders + text

                # Parse grounding boxes
                boxes = _grounding_boxes(text, orig_w, orig_h)

                # Clean grounding tags from display text
                display_text = _display_text(text)

                pages_content.append({
                    'page_number': page_idx + 1,
                    'text': display_text,
                    'raw_text': text,
                    'boxes': boxes,
                    'images': page_images,
                    'image_dims': {'w': orig_w, 'h': orig_h}
                })

            finally:
                if tmp_img:
                    try:
                        os.remove(tmp_img)
                    except Exception:
                        pass

        print(f"✅ Processed all {total_pages} pages")

        # Convert to requested format
        if output_format == "json":
            return JSONResponse({
                "success": True,
                "total_pages": total_pages,
                "pages": pages_content,
                "metadata": {
                    "model": provider.id,
                    "model_label": provider.label,
                    "mode": mode,
                    "grounding": grounding,
                    "extract_images": extract_images,
                    "dpi": dpi
                }
            })

        elif output_format == "markdown":
            md_content = converter.to_markdown(pages_content, include_images=extract_images)
            return StreamingResponse(
                iter([md_content.encode('utf-8')]),
                media_type="text/markdown",
                headers={"Content-Disposition": "attachment; filename=ocr_result.md"}
            )

        elif output_format == "html":
            html_content = converter.to_html(pages_content, include_images=extract_images)
            return StreamingResponse(
                iter([html_content.encode('utf-8')]),
                media_type="text/html",
                headers={"Content-Disposition": "attachment; filename=ocr_result.html"}
            )

        elif output_format == "docx":
            docx_buffer = converter.to_docx(pages_content, include_images=extract_images)
            return StreamingResponse(
                docx_buffer,
                media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                headers={"Content-Disposition": "attachment; filename=ocr_result.docx"}
            )

    except ProviderError as e:
        print(f"PDF provider error: {e}")
        raise HTTPException(status_code=502, detail=str(e))
    except Exception as e:
        print(f"Error processing PDF: {e}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail="An internal error occurred during PDF processing.")


# -----------------------------
# Job management routes
# -----------------------------
class ReviewRequest(BaseModel):
    reviewed_text: str
    reviewer_name: str
    author: Optional[str] = None
    book: Optional[str] = None
    chapter: Optional[str] = None
    page: Optional[str] = None
    describe_text: Optional[str] = None
    freeform_text: Optional[str] = None


def _job_row_to_dict(row) -> Dict[str, Any]:
    """Convert a DB row (RealDictRow) to a plain dict with serialisable values.

    datetime values become ISO-8601 strings and uuid.UUID values become
    strings; every other value is passed through unchanged. Key order is
    preserved.
    """
    def _serialise(val: Any) -> Any:
        if isinstance(val, datetime):
            return val.isoformat()
        if isinstance(val, uuid.UUID):
            return str(val)
        return val

    return {key: _serialise(val) for key, val in dict(row).items()}


def _validate_job_id(job_id: str) -> None:
    """Raise HTTP 400 unless *job_id* parses as a UUID."""
    try:
        uuid.UUID(job_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid job ID.")


# Extensions allowed for stored job images. Any other client-supplied
# extension (e.g. ".html", ".svg") is rejected because get_job_image serves
# the file via FileResponse, which picks the content type from the extension;
# an uploader could otherwise plant active content served from this origin.
_ALLOWED_IMAGE_EXTS = frozenset({".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".tif", ".tiff"})

# Fallback extension keyed by upload content type.
_CONTENT_TYPE_EXTS = {
    "image/png": ".png",
    "image/jpeg": ".jpg",
    "image/jpg": ".jpg",
    "image/webp": ".webp",
    "image/gif": ".gif",
    "image/bmp": ".bmp",
}


def _image_extension(filename: str, content_type: Optional[str]) -> str:
    """Choose a safe storage extension for an uploaded job image.

    Uses the filename's (lower-cased) extension when it is a known image
    extension. Otherwise it falls back to the content-type mapping, and to
    ".png" as a last resort.
    """
    ext = os.path.splitext(filename)[1].lower()
    if ext in _ALLOWED_IMAGE_EXTS:
        return ext
    return _CONTENT_TYPE_EXTS.get((content_type or "").lower(), ".png")


@app.post("/api/jobs")
async def commit_job(
    image: UploadFile = File(...),
    author: str = Form(""),
    book: str = Form(""),
    chapter: str = Form(""),
    page: str = Form(""),
    ocr_text: str = Form(""),
    describe_text: str = Form(""),
    freeform_text: str = Form(""),
    mode: str = Form("plain_ocr"),
    ocr_model: str = Form(""),
):
    """Commit an OCR job: save the image and insert a DB record."""
    job_id = str(uuid.uuid4())

    # Determine file extension from original filename or content type
    original_filename = image.filename or "image"
    ext = _image_extension(original_filename, image.content_type)

    image_path = os.path.join(OCR_IMAGES_DIR, f"{job_id}{ext}")
    try:
        content = await image.read()
        with open(image_path, "wb") as f:
            f.write(content)
    except Exception as exc:
        print(f"Job image save error: {exc}")
        raise HTTPException(status_code=500, detail="Failed to save image file.")

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO ocr_jobs
                        (id, author, book, chapter, page, image_path, original_filename,
                         ocr_text, describe_text, freeform_text, mode, ocr_model, status)
                    VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'unreviewed')
                    RETURNING *
                    """,
                    (job_id, author or None, book or None, chapter or None,
                     page or None, image_path, original_filename,
                     ocr_text or None, describe_text or None, freeform_text or None,
                     mode, ocr_model or None),
                )
                row = cur.fetchone()
    except Exception as exc:
        # Clean up saved image if DB insert fails
        try:
            os.remove(image_path)
        except Exception:
            pass
        # Unique constraint violation (author + chapter + page already exists)
        if getattr(exc, 'pgcode', None) == '23505':
            raise HTTPException(
                status_code=409,
                detail="A job with this Author, Chapter, and Page already exists."
            )
        print(f"Job commit DB error: {exc}")
        raise HTTPException(status_code=500, detail="Failed to save job to database.")

    return JSONResponse(_job_row_to_dict(row), status_code=201)


@app.get("/api/jobs")
async def list_jobs(
    search: Optional[str] = Query(None, description="General text search across all fields"),
    author: Optional[str] = Query(None),
    book: Optional[str] = Query(None),
    chapter: Optional[str] = Query(None),
    status: Optional[str] = Query(None, description="unreviewed | reviewed"),
    limit: int = Query(20, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """Search and list jobs. All filters are optional and combinable."""
    # Only fixed SQL fragments are interpolated into the query text; all
    # user-supplied values travel as bound parameters.
    conditions = []
    params: List[Any] = []

    if search:
        conditions.append(
            "(author ILIKE %s OR book ILIKE %s OR chapter ILIKE %s "
            "OR page ILIKE %s OR ocr_text ILIKE %s OR reviewer_name ILIKE %s)"
        )
        like = f"%{search}%"
        params.extend([like, like, like, like, like, like])
    if author:
        conditions.append("author ILIKE %s")
        params.append(f"%{author}%")
    if book:
        conditions.append("book ILIKE %s")
        params.append(f"%{book}%")
    if chapter:
        conditions.append("chapter ILIKE %s")
        params.append(f"%{chapter}%")
    if status:
        conditions.append("status = %s")
        params.append(status)

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"SELECT COUNT(*) AS total FROM ocr_jobs {where}",
                    params,
                )
                total = cur.fetchone()["total"]

                cur.execute(
                    f"""
                    SELECT id, author, book, chapter, page, submitted_at, status,
                           reviewer_name, reviewed_at, mode, ocr_model, original_filename
                    FROM ocr_jobs
                    {where}
                    ORDER BY submitted_at DESC
                    LIMIT %s OFFSET %s
                    """,
                    params + [limit, offset],
                )
                rows = [_job_row_to_dict(r) for r in cur.fetchall()]
    except Exception as exc:
        print(f"list_jobs DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    return JSONResponse({"total": total, "limit": limit, "offset": offset, "jobs": rows})


@app.get("/api/jobs/suggestions")
async def job_suggestions():
    """Return distinct values for author, book, chapter, and reviewer_name to power autocomplete."""
    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT
                        array_remove(array_agg(DISTINCT author ORDER BY author), NULL) AS authors,
                        array_remove(array_agg(DISTINCT book ORDER BY book), NULL) AS books,
                        array_remove(array_agg(DISTINCT chapter ORDER BY chapter), NULL) AS chapters,
                        array_remove(array_agg(DISTINCT reviewer_name ORDER BY reviewer_name), NULL) AS reviewers
                    FROM ocr_jobs
                """)
                row = cur.fetchone()
    except Exception as exc:
        print(f"suggestions DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    # array_agg over an empty table yields NULL, hence the "or []" fallbacks.
    return JSONResponse({
        "authors": row["authors"] or [],
        "books": row["books"] or [],
        "chapters": row["chapters"] or [],
        "reviewers": row["reviewers"] or [],
    })


@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str):
    """Retrieve full job record including OCR text."""
    _validate_job_id(job_id)

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM ocr_jobs WHERE id = %s", (job_id,))
                row = cur.fetchone()
    except Exception as exc:
        print(f"get_job DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    return JSONResponse(_job_row_to_dict(row))


@app.get("/api/jobs/{job_id}/image")
async def get_job_image(job_id: str):
    """Serve the stored image for a job."""
    _validate_job_id(job_id)

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT image_path FROM ocr_jobs WHERE id = %s", (job_id,))
                row = cur.fetchone()
    except Exception as exc:
        print(f"get_job_image DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    path = row["image_path"]
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="Image file not found on disk.")

    return FileResponse(path)


@app.put("/api/jobs/{job_id}/review")
async def review_job(job_id: str, body: ReviewRequest):
    """Mark a job as reviewed with the corrected text and reviewer name."""
    _validate_job_id(job_id)

    # Every metadata column is overwritten: optional fields omitted from the
    # request body (or sent empty, for author/book/chapter/page) become NULL.
    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE ocr_jobs
                    SET status = 'reviewed',
                        reviewed_text = %s,
                        reviewer_name = %s,
                        reviewed_at = NOW(),
                        author = %s,
                        book = %s,
                        chapter = %s,
                        page = %s,
                        describe_text = %s,
                        freeform_text = %s
                    WHERE id = %s
                    RETURNING *
                    """,
                    (
                        body.reviewed_text,
                        body.reviewer_name,
                        body.author or None,
                        body.book or None,
                        body.chapter or None,
                        body.page or None,
                        body.describe_text,
                        body.freeform_text,
                        job_id,
                    ),
                )
                row = cur.fetchone()
    except Exception as exc:
        print(f"review_job DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    return JSONResponse(_job_row_to_dict(row))


class StatusRequest(BaseModel):
    status: str
    reviewer_name: Optional[str] = None


@app.put("/api/jobs/{job_id}/status")
async def set_job_status(job_id: str, body: StatusRequest):
    """Toggle a job's reviewed status without touching its text or metadata.

    Marking 'reviewed' requires a reviewer_name and stamps reviewed_at.
    Marking 'unreviewed' clears reviewed_at while preserving reviewed_text.
    """
    _validate_job_id(job_id)

    if body.status not in ("reviewed", "unreviewed"):
        raise HTTPException(status_code=400, detail="status must be 'reviewed' or 'unreviewed'.")
    if body.status == "reviewed" and not (body.reviewer_name or "").strip():
        raise HTTPException(status_code=400, detail="Reviewer name is required to mark reviewed.")

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                if body.status == "reviewed":
                    cur.execute(
                        """
                        UPDATE ocr_jobs
                        SET status = 'reviewed',
                            reviewer_name = %s,
                            reviewed_at = NOW()
                        WHERE id = %s
                        RETURNING *
                        """,
                        (body.reviewer_name.strip(), job_id),
                    )
                else:
                    cur.execute(
                        """
                        UPDATE ocr_jobs
                        SET status = 'unreviewed',
                            reviewed_at = NULL
                        WHERE id = %s
                        RETURNING *
                        """,
                        (job_id,),
                    )
                row = cur.fetchone()
    except Exception as exc:
        print(f"set_job_status DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    return JSONResponse(_job_row_to_dict(row))


class JobDescribeRequest(BaseModel):
    model: Optional[str] = None


@app.post("/api/jobs/{job_id}/describe")
async def describe_job(job_id: str, body: JobDescribeRequest):
    """Run Describe mode on a job's stored image and save the result to describe_text."""
    _validate_job_id(job_id)

    # Look up the stored image for this job
    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT image_path FROM ocr_jobs WHERE id = %s", (job_id,))
                row = cur.fetchone()
    except Exception as exc:
        print(f"describe_job lookup DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    image_path = row["image_path"]
    if not image_path or not os.path.isfile(image_path):
        raise HTTPException(status_code=404, detail="Image file not found on disk.")

    provider = _resolve_provider(body.model, "describe")

    try:
        text = provider.run(
            image_path,
            mode="describe",
            prompt="",
            grounding=False,
            find_term=None,
            schema=None,
            include_caption=False,
            options={"base_size": 1024, "image_size": 640, "crop_mode": True, "test_compress": False},
        )
    except ProviderError as e:
        print(f"describe_job provider error: {e}")
        raise HTTPException(status_code=502, detail=str(e))
    except Exception as e:
        print(f"describe_job inference error: {type(e).__name__}: {e}")
        raise HTTPException(status_code=500, detail="An internal error occurred during description.")

    # Guard against providers returning None; the token checks below would
    # otherwise raise TypeError outside any handler.
    display_text = _display_text(text or "")

    # Persist the generated description on the job
    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE ocr_jobs SET describe_text = %s WHERE id = %s RETURNING *",
                    (display_text, job_id),
                )
                updated = cur.fetchone()
    except Exception as exc:
        print(f"describe_job save DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    # The job may have been deleted between the lookup and this update.
    if not updated:
        raise HTTPException(status_code=404, detail="Job not found.")

    return JSONResponse(_job_row_to_dict(updated))


@app.delete("/api/jobs/{job_id}")
async def delete_job(job_id: str):
    """Delete a job record and its stored image."""
    _validate_job_id(job_id)

    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "DELETE FROM ocr_jobs WHERE id = %s RETURNING image_path",
                    (job_id,),
                )
                row = cur.fetchone()
    except Exception as exc:
        print(f"delete_job DB error: {exc}")
        raise HTTPException(status_code=500, detail="Database error.")

    if not row:
        raise HTTPException(status_code=404, detail="Job not found.")

    # Best-effort removal of the stored image file
    try:
        if row["image_path"] and os.path.isfile(row["image_path"]):
            os.remove(row["image_path"])
    except Exception:
        pass

    return JSONResponse({"deleted": job_id})


if __name__ == "__main__":
    host = env_config("API_HOST", default="0.0.0.0")
    port = env_config("API_PORT", default=8000, cast=int)
    uvicorn.run(app, host=host, port=port)