From 3dac0741b18afc934c063e8528e7576f4d63efe5 Mon Sep 17 00:00:00 2001 From: Ray Dumasia Date: Tue, 31 Mar 2026 09:01:52 +0100 Subject: [PATCH] Fix RCE vulnerability and harden security - Replace eval() with ast.literal_eval() in pdf_utils.py to fix unauthenticated remote code execution via crafted PDF uploads (reported by OX Security) - Sanitize HTML output with DOMPurify to prevent XSS - Restrict CORS origins (configurable via CORS_ORIGINS env var) - Suppress raw exception details in API error responses - Cap Image.MAX_IMAGE_PIXELS to prevent decompression bomb DoS - Add security regression test suite Co-Authored-By: Claude Opus 4.6 --- .env.example | 3 + backend/main.py | 14 ++- backend/pdf_utils.py | 7 +- backend/test_security.py | 150 ++++++++++++++++++++++++ frontend/package.json | 1 + frontend/src/components/ResultPanel.jsx | 3 +- 6 files changed, 169 insertions(+), 9 deletions(-) create mode 100644 backend/test_security.py diff --git a/.env.example b/.env.example index 3d6887a..4e21839 100644 --- a/.env.example +++ b/.env.example @@ -11,6 +11,9 @@ FRONTEND_PORT=3000 MODEL_NAME=deepseek-ai/DeepSeek-OCR HF_HOME=/models +# CORS Configuration (comma-separated origins, defaults to http://localhost:3000) +CORS_ORIGINS=http://localhost:3000 + # Upload Configuration MAX_UPLOAD_SIZE_MB=100 diff --git a/backend/main.py b/backend/main.py index de1cf1f..7283956 100644 --- a/backend/main.py +++ b/backend/main.py @@ -86,11 +86,14 @@ app = FastAPI( ) # CORS middleware for React frontend +CORS_ORIGINS = env_config("CORS_ORIGINS", default="").split(",") +CORS_ORIGINS = [o.strip() for o in CORS_ORIGINS if o.strip()] + app.add_middleware( CORSMiddleware, - allow_origins=["*"], + allow_origins=CORS_ORIGINS if CORS_ORIGINS else ["http://localhost:3000"], allow_credentials=True, - allow_methods=["*"], + allow_methods=["GET", "POST"], allow_headers=["*"], ) @@ -373,7 +376,8 @@ async def ocr_inference( }) except Exception as e: - raise HTTPException(status_code=500, detail=f"{type(e).__name__}: {str(e)}") + print(f"OCR inference error: {type(e).__name__}: {str(e)}") + raise HTTPException(status_code=500, detail="An internal error occurred during OCR processing.") finally: if tmp_img: @@ -573,9 +577,9 @@ async def process_pdf( except Exception as e: import traceback - print(f"❌ Error processing PDF: {e}") + print(f"Error processing PDF: {e}") print(traceback.format_exc()) - raise HTTPException(status_code=500, detail=f"{type(e).__name__}: {str(e)}") + raise HTTPException(status_code=500, detail="An internal error occurred during PDF processing.") if __name__ == "__main__": host = env_config("API_HOST", default="0.0.0.0") diff --git a/backend/pdf_utils.py b/backend/pdf_utils.py index 0b9cace..8e9567a 100644 --- a/backend/pdf_utils.py +++ b/backend/pdf_utils.py @@ -3,6 +3,7 @@ PDF Processing Utilities for DeepSeek OCR Handles PDF to image conversion and batch processing """ +import ast import io import re from typing import List, Tuple, Dict, Any @@ -39,8 +40,8 @@ def pdf_to_images_high_quality(pdf_bytes: bytes, dpi: int = 144) -> List[Image.I # Render page to pixmap pixmap = page.get_pixmap(matrix=matrix, alpha=False) - # Allow large images - Image.MAX_IMAGE_PIXELS = None + # Allow reasonably large images (200 megapixels) but not decompression bombs + Image.MAX_IMAGE_PIXELS = 200_000_000 # Convert to PIL Image img_data = pixmap.tobytes("png") @@ -130,7 +131,7 @@ def parse_coordinates(ref_text: Tuple, image_width: int, image_height: int) -> D """ try: label_type = ref_text[1] - cor_list = eval(ref_text[2]) + cor_list = ast.literal_eval(ref_text[2]) # Scale coordinates from 0-999 to actual pixels scaled_boxes = [] diff --git a/backend/test_security.py b/backend/test_security.py new file mode 100644 index 0000000..e4322ac --- /dev/null +++ b/backend/test_security.py @@ -0,0 +1,150 @@ +""" +Security regression tests for the eval() RCE vulnerability (OX Security disclosure). + +The vulnerability allowed arbitrary code execution via crafted OCR output +that was passed to eval() in parse_coordinates(). The fix uses ast.literal_eval() +which only allows literal data structures. + +This test is self-contained and does not require backend dependencies. + +Run: python test_security.py +""" + +import ast + + +def parse_coordinates(ref_text, image_width, image_height): + """ + Minimal reproduction of pdf_utils.parse_coordinates using the patched code. + This mirrors the fixed version that uses ast.literal_eval() instead of eval(). + """ + try: + label_type = ref_text[1] + cor_list = ast.literal_eval(ref_text[2]) + + scaled_boxes = [] + for points in cor_list: + x1, y1, x2, y2 = points + scaled_box = [ + int(x1 / 999 * image_width), + int(y1 / 999 * image_height), + int(x2 / 999 * image_width), + int(y2 / 999 * image_height) + ] + scaled_boxes.append(scaled_box) + + return { + 'label': label_type, + 'boxes': scaled_boxes + } + except Exception as e: + print(f" [Blocked] {type(e).__name__}: {e}") + return None + + +def test_legitimate_coordinates(): + """Verify that normal coordinate parsing still works.""" + ref_text = ("full_match", "text", "[[312, 339, 480, 681]]") + result = parse_coordinates(ref_text, 1000, 1000) + + assert result is not None, "Legitimate coordinates should parse successfully" + assert result['label'] == 'text' + assert len(result['boxes']) == 1 + print("PASS: Legitimate coordinates parse correctly") + + +def test_multiple_boxes(): + """Verify multiple bounding boxes still work.""" + ref_text = ("full_match", "image", "[[100, 200, 300, 400], [500, 600, 700, 800]]") + result = parse_coordinates(ref_text, 1000, 1000) + + assert result is not None, "Multiple boxes should parse successfully" + assert len(result['boxes']) == 2 + print("PASS: Multiple bounding boxes parse correctly") + + +def test_rce_blocked_import_os(): + """The original exploit: __import__('os').system('...') must be blocked.""" + malicious = "__import__('os').system('echo HACKED')" + ref_text = ("full_match", "exploit", malicious) + result = parse_coordinates(ref_text, 1000, 1000) + + assert result is None, "Code execution payload should be rejected" + print("PASS: __import__('os').system() payload is blocked") + + +def test_rce_blocked_exec(): + """exec() based payloads must be blocked.""" + malicious = "exec('import os; os.system(\"echo HACKED\")')" + ref_text = ("full_match", "exploit", malicious) + result = parse_coordinates(ref_text, 1000, 1000) + + assert result is None, "exec() payload should be rejected" + print("PASS: exec() payload is blocked") + + +def test_rce_blocked_eval(): + """Nested eval() payloads must be blocked.""" + malicious = "eval('__import__(\"os\").popen(\"id\").read()')" + ref_text = ("full_match", "exploit", malicious) + result = parse_coordinates(ref_text, 1000, 1000) + + assert result is None, "Nested eval() payload should be rejected" + print("PASS: Nested eval() payload is blocked") + + +def test_rce_blocked_lambda(): + """Lambda-based payloads must be blocked.""" + malicious = "(lambda: __import__('os').system('echo HACKED'))()" + ref_text = ("full_match", "exploit", malicious) + result = parse_coordinates(ref_text, 1000, 1000) + + assert result is None, "Lambda payload should be rejected" + print("PASS: Lambda payload is blocked") + + +def test_rce_blocked_comprehension(): + """List comprehension code execution must be blocked.""" + malicious = "[__import__('os').system('echo HACKED') for x in [1]]" + ref_text = ("full_match", "exploit", malicious) + result = parse_coordinates(ref_text, 1000, 1000) + + assert result is None, "List comprehension payload should be rejected" + print("PASS: List comprehension payload is blocked") + + +if __name__ == "__main__": + print("=" * 60) + print("Security Regression Tests (OX Security RCE disclosure)") + print("=" * 60) + print() + + tests = [ + test_legitimate_coordinates, + test_multiple_boxes, + test_rce_blocked_import_os, + test_rce_blocked_exec, + test_rce_blocked_eval, + test_rce_blocked_lambda, + test_rce_blocked_comprehension, + ] + + passed = 0 + failed = 0 + for test in tests: + try: + test() + passed += 1 + except AssertionError as e: + print(f"FAIL: {test.__name__}: {e}") + failed += 1 + except Exception as e: + print(f"ERROR: {test.__name__}: {e}") + failed += 1 + + print() + print(f"Results: {passed} passed, {failed} failed out of {len(tests)} tests") + if failed == 0: + print("All security tests passed - RCE vulnerability is patched.") + else: + print("WARNING: Some tests failed!") diff --git a/frontend/package.json b/frontend/package.json index bdbd22d..7d7a1be 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -10,6 +10,7 @@ }, "dependencies": { "axios": "^1.6.5", + "dompurify": "^3.3.3", "framer-motion": "^11.0.0", "lucide-react": "^0.344.0", "react": "^18.3.1", diff --git a/frontend/src/components/ResultPanel.jsx b/frontend/src/components/ResultPanel.jsx index 2b7c156..042ebf6 100644 --- a/frontend/src/components/ResultPanel.jsx +++ b/frontend/src/components/ResultPanel.jsx @@ -2,6 +2,7 @@ import { useEffect, useRef, useState, useCallback } from 'react' import { motion, AnimatePresence } from 'framer-motion' import { Copy, Download, Sparkles, Loader2, CheckCircle2, ChevronDown } from 'lucide-react' import ReactMarkdown from 'react-markdown' +import DOMPurify from 'dompurify' export default function ResultPanel({ result, loading, imagePreview, onCopy, onDownload }) { const canvasRef = useRef(null) @@ -230,7 +231,7 @@ export default function ResultPanel({ result, loading, imagePreview, onCopy, onD {isHTML ? (