import os
import random
import uuid
import json
import time
import asyncio
from threading import Thread
from typing import Iterable
import gradio as gr
import spaces
import torch
from PIL import Image, ImageOps
import requests
from transformers import (
Qwen2VLForConditionalGeneration,
Qwen2_5_VLForConditionalGeneration,
AutoModelForCausalLM,
AutoModelForVision2Seq,
AutoProcessor,
TextIteratorStreamer,
)
from transformers.image_utils import load_image
from gradio.themes import Soft
from gradio.themes.utils import colors, fonts, sizes
from docling_core.types.doc import DoclingDocument, DocTagsDocument
import re
import ast
import html
# Register a custom "steel_blue" palette on gradio's ``colors`` namespace so it
# can be used as a hue (SteelBlueTheme uses it as its default secondary hue).
# Shades run from lightest (c50) to darkest (c950).
_STEEL_BLUE_SHADES = {
    "c50": "#EBF3F8",
    "c100": "#D3E5F0",
    "c200": "#A8CCE1",
    "c300": "#7DB3D2",
    "c400": "#529AC3",
    "c500": "#4682B4",
    "c600": "#3E72A0",
    "c700": "#36638C",
    "c800": "#2E5378",
    "c900": "#264364",
    "c950": "#1E3450",
}
colors.steel_blue = colors.Color(name="steel_blue", **_STEEL_BLUE_SHADES)
class SteelBlueTheme(Soft):
    """Variant of gradio's ``Soft`` theme using a gray / steel-blue palette.

    All constructor arguments are keyword-only and are forwarded unchanged to
    ``Soft.__init__``; afterwards a fixed set of theme variables (backgrounds,
    primary-button gradients, slider colour, block borders and shadows) is
    overridden through ``set``.
    """

    def __init__(
        self,
        *,
        primary_hue: colors.Color | str = colors.gray,
        secondary_hue: colors.Color | str = colors.steel_blue,
        neutral_hue: colors.Color | str = colors.slate,
        text_size: sizes.Size | str = sizes.text_lg,
        font: fonts.Font | str | Iterable[fonts.Font | str] = (
            fonts.GoogleFont("Outfit"), "Arial", "sans-serif",
        ),
        font_mono: fonts.Font | str | Iterable[fonts.Font | str] = (
            fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace",
        ),
    ):
        # Base palette / typography handled by the parent theme.
        base_options = {
            "primary_hue": primary_hue,
            "secondary_hue": secondary_hue,
            "neutral_hue": neutral_hue,
            "text_size": text_size,
            "font": font,
            "font_mono": font_mono,
        }
        super().__init__(**base_options)

        # Theme-variable overrides; "*name" values refer to other theme variables.
        overrides = {
            "background_fill_primary": "*primary_50",
            "background_fill_primary_dark": "*primary_900",
            "body_background_fill": "linear-gradient(135deg, *primary_200, *primary_100)",
            "body_background_fill_dark": "linear-gradient(135deg, *primary_900, *primary_800)",
            "button_primary_text_color": "white",
            "button_primary_text_color_hover": "white",
            "button_primary_background_fill": "linear-gradient(90deg, *secondary_500, *secondary_600)",
            "button_primary_background_fill_hover": "linear-gradient(90deg, *secondary_600, *secondary_700)",
            "button_primary_background_fill_dark": "linear-gradient(90deg, *secondary_600, *secondary_700)",
            "button_primary_background_fill_hover_dark": "linear-gradient(90deg, *secondary_500, *secondary_600)",
            "slider_color": "*secondary_500",
            "slider_color_dark": "*secondary_600",
            "block_title_text_weight": "600",
            "block_border_width": "3px",
            "block_shadow": "*shadow_drop_lg",
            "button_primary_shadow": "*shadow_drop_lg",
            "button_large_padding": "11px",
            "color_accent_soft": "*primary_100",
            "block_label_background_fill": "*primary_200",
        }
        super().set(**overrides)
# Theme instance handed to ``demo.launch(theme=...)`` at the bottom of the file.
steel_blue_theme = SteelBlueTheme()
# Custom stylesheet handed to ``demo.launch(css=...)``. It covers:
#   * the "#main-title" / "#output-title" heading sizes,
#   * the ".ra-*" classes used by the RadioAnimated component (light and dark),
#   * the "#gpu-duration-container" box around the GPU duration selector.
css = """
#main-title h1 {
font-size: 2.3em !important;
}
#output-title h2 {
font-size: 2.1em !important;
}
/* RadioAnimated Styles */
.ra-wrap{ width: fit-content; }
.ra-inner{
position: relative; display: inline-flex; align-items: center; gap: 0; padding: 6px;
background: var(--neutral-200); border-radius: 9999px; overflow: hidden;
}
.ra-input{ display: none; }
.ra-label{
position: relative; z-index: 2; padding: 8px 16px;
font-family: inherit; font-size: 14px; font-weight: 600;
color: var(--neutral-500); cursor: pointer; transition: color 0.2s; white-space: nowrap;
}
.ra-highlight{
position: absolute; z-index: 1; top: 6px; left: 6px;
height: calc(100% - 12px); border-radius: 9999px;
background: white; box-shadow: 0 2px 4px rgba(0,0,0,0.1);
transition: transform 0.2s, width 0.2s;
}
.ra-input:checked + .ra-label{ color: black; }
/* Dark mode adjustments for Radio */
.dark .ra-inner { background: var(--neutral-800); }
.dark .ra-label { color: var(--neutral-400); }
.dark .ra-highlight { background: var(--neutral-600); }
.dark .ra-input:checked + .ra-label { color: white; }
#gpu-duration-container {
padding: 10px;
border-radius: 8px;
background: var(--background-fill-secondary);
border: 1px solid var(--border-color-primary);
margin-top: 10px;
}
"""
# Upper bound and initial value of the "Max new tokens" slider.
MAX_MAX_NEW_TOKENS = 4096
DEFAULT_MAX_NEW_TOKENS = 2048

# Input-length limit, overridable through the environment (defaults to 4096).
# NOTE(review): not referenced anywhere else in the visible part of this file.
MAX_INPUT_TOKEN_LENGTH = int(os.environ.get("MAX_INPUT_TOKEN_LENGTH", "4096"))

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
class RadioAnimated(gr.HTML):
    """Pill-shaped radio selector built on ``gr.HTML``.

    The component renders one hidden ``<input type="radio">`` plus a
    ``<label>`` per choice on top of a sliding highlight element; the on-load
    script keeps the highlight, the checked input and ``props.value`` in sync
    and fires a ``change`` event when the user picks another option.

    Args:
        choices: Sequence of at least two option labels. Each label is used
            both as the displayed text and as the component value.
        value: Initially selected choice; defaults to ``choices[0]``.
        **kwargs: Forwarded unchanged to ``gr.HTML``.

    Raises:
        ValueError: If fewer than two choices are given.

    NOTE(review): the markup below was reconstructed from the class names that
    the stylesheet and the on-load script of this file select on
    (``.ra-wrap``, ``.ra-inner``, ``.ra-highlight``, ``.ra-input`` followed by
    a sibling ``.ra-label``) because the template strings were empty in the
    reviewed copy. Confirm against the intended design.
    """

    def __init__(self, choices, value=None, **kwargs):
        if not choices or len(choices) < 2:
            raise ValueError("RadioAnimated requires at least 2 choices.")
        if value is None:
            value = choices[0]

        # A random per-instance group name keeps several selectors on one page
        # from sharing the same radio group.
        uid = uuid.uuid4().hex[:8]
        group_name = f"ra-{uid}"

        option_rows = []
        for i, c in enumerate(choices):
            option_id = f"{group_name}-{i}"
            # Escape the label so it cannot break out of the attribute / markup.
            safe_label = html.escape(str(c), quote=True)
            option_rows.append(
                f'<input class="ra-input" type="radio" name="{group_name}" '
                f'id="{option_id}" value="{safe_label}">\n'
                f'<label class="ra-label" for="{option_id}">{safe_label}</label>'
            )
        inputs_html = "\n".join(option_rows)

        html_template = f"""
<div class="ra-wrap">
  <div class="ra-inner">
    <div class="ra-highlight"></div>
    {inputs_html}
  </div>
</div>
"""
        # The script runs once the element is mounted; `element`, `props` and
        # `trigger` are provided by gradio's HTML component runtime.
        js_on_load = r"""
(() => {
const wrap = element.querySelector('.ra-wrap');
const inner = element.querySelector('.ra-inner');
const highlight = element.querySelector('.ra-highlight');
const inputs = Array.from(element.querySelectorAll('.ra-input'));
if (!inputs.length) return;
const choices = inputs.map(i => i.value);
function setHighlightByIndex(idx) {
const n = choices.length;
const pct = 100 / n;
highlight.style.width = `calc(${pct}% - 6px)`;
highlight.style.transform = `translateX(${idx * 100}%)`;
}
function setCheckedByValue(val, shouldTrigger=false) {
const idx = Math.max(0, choices.indexOf(val));
inputs.forEach((inp, i) => { inp.checked = (i === idx); });
setHighlightByIndex(idx);
props.value = choices[idx];
if (shouldTrigger) trigger('change', props.value);
}
setCheckedByValue(props.value ?? choices[0], false);
inputs.forEach((inp) => {
inp.addEventListener('change', () => {
setCheckedByValue(inp.value, true);
});
});
})();
"""
        super().__init__(
            value=value,
            html_template=html_template,
            js_on_load=js_on_load,
            **kwargs
        )
def apply_gpu_duration(val: str):
    """Convert the duration selected in the UI (e.g. ``"120"``) to an integer.

    Raises whatever ``int()`` raises when *val* is not a valid integer literal.
    """
    seconds = int(val)
    return seconds
def _load_qwen2_5_vl(model_id, **hub_kwargs):
    """Load the processor, then the fp16 Qwen2.5-VL model, for *model_id*.

    Extra keyword arguments (e.g. ``subfolder``) are forwarded to both
    ``from_pretrained`` calls. The model is moved to ``device`` and switched
    to eval mode before being returned as ``(processor, model)``.
    """
    processor = AutoProcessor.from_pretrained(
        model_id, trust_remote_code=True, **hub_kwargs
    )
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_id,
        attn_implementation="kernels-community/flash-attn2",
        trust_remote_code=True,
        torch_dtype=torch.float16,
        **hub_kwargs,
    )
    return processor, model.to(device).eval()


# All checkpoints are loaded eagerly at import time, in the order below.

# Nanonets-OCR-s
MODEL_ID_M = "nanonets/Nanonets-OCR-s"
processor_m, model_m = _load_qwen2_5_vl(MODEL_ID_M)

# MonkeyOCR (weights live in the "Recognition" subfolder of the repo)
MODEL_ID_G = "echo840/MonkeyOCR"
SUBFOLDER = "Recognition"
processor_g, model_g = _load_qwen2_5_vl(MODEL_ID_G, subfolder=SUBFOLDER)

# Typhoon-OCR-7B
MODEL_ID_L = "scb10x/typhoon-ocr-7b"
processor_l, model_l = _load_qwen2_5_vl(MODEL_ID_L)

# SmolDocling uses the generic vision-to-sequence class and no explicit
# attention implementation, so it is loaded inline rather than via the helper.
MODEL_ID_X = "ds4sd/SmolDocling-256M-preview"
processor_x = AutoProcessor.from_pretrained(MODEL_ID_X, trust_remote_code=True)
_smoldocling = AutoModelForVision2Seq.from_pretrained(
    MODEL_ID_X,
    trust_remote_code=True,
    torch_dtype=torch.float16,
)
model_x = _smoldocling.to(device).eval()

# Thyme-RL
MODEL_ID_N = "Kwai-Keye/Thyme-RL"
processor_n, model_n = _load_qwen2_5_vl(MODEL_ID_N)
def add_random_padding(image, min_percent=0.1, max_percent=0.10):
    """Return an RGB copy of *image* with a border added on all four sides.

    The horizontal and vertical border sizes are each a fraction of the image
    width / height drawn uniformly from ``[min_percent, max_percent]`` (with
    the default arguments both bounds are 0.1). The border is filled with the
    colour of the top-left pixel.
    """
    rgb = image.convert("RGB")
    width, height = rgb.size

    # Draw the horizontal fraction first, then the vertical one.
    pad_x, pad_y = (
        int(extent * random.uniform(min_percent, max_percent))
        for extent in (width, height)
    )

    fill_colour = rgb.getpixel((0, 0))
    # Border tuple is (left, top, right, bottom).
    return ImageOps.expand(rgb, border=(pad_x, pad_y, pad_x, pad_y), fill=fill_colour)
def normalize_values(text, target_max=500):
    """Replace bracketed number lists in *text* with scaled location tokens.

    Every span such as ``[100, 200, 300, 400]`` is parsed, each number is
    scaled so that the largest value of the list maps to *target_max*, and the
    span is replaced by the concatenation of ``<loc_N>`` tokens (one per
    number, ``N`` being the rounded scaled value).

    Spans that look like a list but are not a plain list of numbers (for
    example ``[...]`` or ``[1,,2]``) are left untouched instead of raising.

    Args:
        text: Prompt text that may contain bracketed coordinate lists.
        target_max: Value the largest number of each list is scaled to.

    Returns:
        The text with every parseable list replaced by location tokens.

    NOTE(review): the ``<loc_N>`` token format follows the SmolDocling prompt
    convention; the format string was empty in the reviewed copy of the file,
    which made every list disappear from the prompt.
    """
    def normalize_list(values):
        max_value = max(values) if values else 1
        if not max_value:
            # All-zero list: avoid dividing by zero, every entry stays 0.
            max_value = 1
        return [round((v / max_value) * target_max) for v in values]

    def process_match(match):
        original = match.group(0)
        try:
            num_list = ast.literal_eval(original)
        except (ValueError, SyntaxError):
            # Matched the character class but is not a valid literal.
            return original
        is_numeric = isinstance(num_list, list) and all(
            isinstance(v, (int, float)) for v in num_list
        )
        if not is_numeric:
            # e.g. "[...]" evaluates to a list holding Ellipsis.
            return original
        return "".join(f"<loc_{num}>" for num in normalize_list(num_list))

    pattern = r"\[([\d\.\s,]+)\]"
    return re.sub(pattern, process_match, text)
def calc_timeout_image(model_name: str, text: str, image: Image.Image,
                       max_new_tokens: int = 1024, temperature: float = 0.6,
                       top_p: float = 0.9, top_k: int = 50,
                       repetition_penalty: float = 1.2, gpu_timeout: int = 60):
    """Return the GPU duration, in seconds, to request for one inference call.

    The signature (including defaults) mirrors ``generate_image`` because this
    function is passed as ``duration=`` to the ``spaces.GPU`` decorator of that
    function. Only *gpu_timeout* is used; the other parameters are ignored.

    Args:
        gpu_timeout: Requested duration; anything ``int()`` accepts.

    Returns:
        ``int(gpu_timeout)`` when that is a positive integer, otherwise 60.
    """
    fallback = 60
    try:
        seconds = int(gpu_timeout)
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric strings, NaN and infinities end up here.
        return fallback
    # A zero or negative duration is not a usable reservation.
    return seconds if seconds > 0 else fallback
@spaces.GPU(duration=calc_timeout_image)
def generate_image(model_name: str, text: str, image: Image.Image,
                   max_new_tokens: int = 1024,
                   temperature: float = 0.6,
                   top_p: float = 0.9,
                   top_k: int = 50,
                   repetition_penalty: float = 1.2,
                   gpu_timeout: int = 60):
    """Stream a model response for one image and one text query.

    This is a generator. While the model is producing text it yields
    ``(raw_text, raw_text)`` with the text accumulated so far. For
    SmolDocling one extra final pair ``(raw_text, formatted)`` is yielded,
    where ``formatted`` is the Markdown export of the DocTags output when the
    output contains DocTags markers, and the cleaned raw text otherwise.

    Args:
        model_name: One of the names offered by the model selector.
        text: User query / instruction.
        image: Image to run the model on.
        max_new_tokens: Forwarded to ``model.generate``.
        temperature: Forwarded to ``model.generate``.
        top_p: Forwarded to ``model.generate``.
        top_k: Forwarded to ``model.generate``.
        repetition_penalty: Forwarded to ``model.generate``.
        gpu_timeout: Only read by ``calc_timeout_image`` (GPU reservation
            length); unused in the body.

    Yields:
        ``(raw_output, formatted_output)`` string pairs. An unknown model name
        or a missing image yields a single error pair and stops.
    """
    backends = {
        "Nanonets-OCR-s": (processor_m, model_m),
        "MonkeyOCR-Recognition": (processor_g, model_g),
        "SmolDocling-256M-preview": (processor_x, model_x),
        "Typhoon-OCR-7B": (processor_l, model_l),
        "Thyme-RL": (processor_n, model_n),
    }
    if model_name not in backends:
        yield "Invalid model selected.", "Invalid model selected."
        return
    processor, model = backends[model_name]

    if image is None:
        yield "Please upload an image.", "Please upload an image."
        return

    # Treat a missing query as an empty one so the substring checks are safe.
    text = text or ""
    is_smoldocling = model_name == "SmolDocling-256M-preview"

    images = [image]
    if is_smoldocling:
        # SmolDocling-specific input preparation, keyed on the instruction text.
        if "OTSL" in text or "code" in text:
            images = [add_random_padding(img) for img in images]
        if "OCR at text at" in text or "Identify element" in text or "formula" in text:
            text = normalize_values(text, target_max=500)

    messages = [
        {
            "role": "user",
            "content": [{"type": "image"} for _ in images] + [
                {"type": "text", "text": text}
            ]
        }
    ]
    prompt = processor.apply_chat_template(messages, add_generation_prompt=True)
    inputs = processor(text=prompt, images=images, return_tensors="pt").to(device)

    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    # NOTE(review): no `do_sample` is passed, so whether temperature / top_p /
    # top_k take effect depends on each model's own generation config.
    generation_kwargs = {
        **inputs,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
    }

    # generate() blocks, so it runs in a worker thread while this generator
    # drains the streamer and forwards partial text to the UI.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text.replace("<|im_end|>", "")
        yield buffer, buffer
    # The streamer is exhausted once generation ends; wait for the worker so
    # it is not left running after this function returns.
    thread.join()

    if is_smoldocling:
        # NOTE(review): the tag literals in this branch were empty strings in
        # the reviewed copy (which made the check below always true and the
        # regex unmatchable); they are restored here following the SmolDocling
        # DocTags convention. Confirm against the upstream demo.
        cleaned_output = buffer.replace("<end_of_utterance>", "").strip()
        doctag_markers = ("<doctag>", "<otsl>", "<code>", "<chart>", "<formula>")
        if any(tag in cleaned_output for tag in doctag_markers):
            if "<chart>" in cleaned_output:
                # Charts are re-labelled as OTSL tables, then the tag that
                # directly follows the last <loc_500> token is dropped.
                cleaned_output = cleaned_output.replace("<chart>", "<otsl>").replace("</chart>", "</otsl>")
                cleaned_output = re.sub(r'(<loc_500>)(?!.*<loc_500>)<[^>]+>', r'\1', cleaned_output)
            doctags_doc = DocTagsDocument.from_doctags_and_image_pairs([cleaned_output], images)
            doc = DoclingDocument.load_from_doctags(doctags_doc, document_name="Document")
            markdown_output = doc.export_to_markdown()
            yield buffer, markdown_output
        else:
            yield buffer, cleaned_output
# (query, image path) pairs offered as clickable examples under the inputs.
# gr.Examples receives them as a list of [query, path] rows.
image_examples = [
    list(row)
    for row in (
        ("Perform OCR on the image precisely.", "examples/5.jpg"),
        ("Run OCR on the image and ensure high accuracy.", "examples/4.jpg"),
        ("Conduct OCR on the image with exact text recognition.", "examples/2.jpg"),
        ("Perform precise OCR extraction on the image.", "examples/1.jpg"),
        ("Convert this page to docling", "examples/3.jpg"),
    )
]
# UI definition: a two-column layout with the inputs on the left and the
# outputs, model selector and GPU-duration selector on the right.
# NOTE(review): the nesting of the `with` contexts below was reconstructed
# (the reviewed copy had lost its indentation); confirm the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# **Multimodal OCR2**", elem_id="main-title")
    with gr.Row():
        # Left column: query, image, submit button, examples, sampling options.
        with gr.Column(scale=2):
            image_query = gr.Textbox(label="Query Input", placeholder="Enter your query here...")
            image_upload = gr.Image(type="pil", label="Upload Image", height=290)
            image_submit = gr.Button("Submit", variant="primary")
            gr.Examples(examples=image_examples, inputs=[image_query, image_upload])
            with gr.Accordion("Advanced options", open=False):
                max_new_tokens = gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS)
                temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6)
                top_p = gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9)
                top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50)
                repetition_penalty = gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2)
        # Right column: streamed raw text, rendered Markdown, model and GPU settings.
        with gr.Column(scale=3):
            gr.Markdown("## Output", elem_id="output-title")
            raw_output = gr.Textbox(label="Raw Output Stream", interactive=True, lines=11)
            with gr.Accordion("(Result.md)", open=False):
                formatted_output = gr.Markdown(label="(Result.md)")
            model_choice = gr.Radio(
                choices=["Nanonets-OCR-s", "MonkeyOCR-Recognition", "Thyme-RL", "Typhoon-OCR-7B", "SmolDocling-256M-preview"],
                label="Select Model",
                value="Nanonets-OCR-s"
            )
            with gr.Row(elem_id="gpu-duration-container"):
                with gr.Column():
                    gr.Markdown("**GPU Duration (seconds)**")
                    radioanimated_gpu_duration = RadioAnimated(
                        choices=["60", "90", "120", "180", "240", "300"],
                        value="60",
                        elem_id="radioanimated_gpu_duration"
                    )
                    # Hidden numeric mirror of the selector; this is what gets
                    # passed to generate_image as `gpu_timeout`.
                    gpu_duration_state = gr.Number(value=60, visible=False)
            gr.Markdown("*Note: Higher GPU duration allows for longer processing but consumes more GPU quota.*")
    # Keep the hidden number in sync with the animated selector.
    radioanimated_gpu_duration.change(
        fn=apply_gpu_duration,
        inputs=radioanimated_gpu_duration,
        outputs=[gpu_duration_state],
        api_visibility="private"
    )
    # Input order must match generate_image's positional parameters.
    image_submit.click(
        fn=generate_image,
        inputs=[model_choice, image_query, image_upload, max_new_tokens, temperature, top_p, top_k, repetition_penalty, gpu_duration_state],
        outputs=[raw_output, formatted_output]
    )
if __name__ == "__main__":
    # Enable request queuing (at most 50 waiting requests), then start the app
    # with the custom stylesheet and theme, the MCP server turned on and
    # server-side rendering turned off.
    queued_demo = demo.queue(max_size=50)
    queued_demo.launch(
        css=css,
        theme=steel_blue_theme,
        mcp_server=True,
        ssr_mode=False,
    )