| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import re |
| import json |
| from typing import List, Dict, Any, Tuple |
| import copy |
|
|
| import numpy as np |
| import streamlit as st |
| import torch |
| from transformers import pipeline |
| from sentence_transformers import SentenceTransformer, util |
|
|
# Page chrome: wide layout, browser-tab title and the on-page heading.
st.set_page_config(
    layout="wide",
    page_title="Invoice → JSON (Paste Text) · Accurate v2",
)
st.title("Invoice → JSON (Paste Text) — Accurate v2")
|
|
| |
# Target output schema. Every field starts as None (serialised as JSON null);
# "line_items" holds a single all-None template row.
SCHEMA_JSON: Dict[str, Any] = {
    "invoice_header": dict.fromkeys(
        (
            "car_number",
            "shipment_number",
            "shipping_point",
            "currency",
            "invoice_number",
            "invoice_date",
            "order_number",
            "customer_order_number",
            "our_order_number",
            "sales_order_number",
            "purchase_order_number",
            "order_date",
            "supplier_name",
            "supplier_address",
            "supplier_phone",
            "supplier_email",
            "supplier_tax_id",
            "customer_name",
            "customer_address",
            "customer_phone",
            "customer_email",
            "customer_tax_id",
            "ship_to_name",
            "ship_to_address",
            "bill_to_name",
            "bill_to_address",
            "remit_to_name",
            "remit_to_address",
            "tax_id",
            "tax_registration_number",
            "vat_number",
            "payment_terms",
            "payment_method",
            "payment_reference",
            "bank_account_number",
            "iban",
            "swift_code",
            "total_before_tax",
            "tax_amount",
            "tax_rate",
            "shipping_charges",
            "discount",
            "total_due",
            "amount_paid",
            "balance_due",
            "due_date",
            "invoice_status",
            "reference_number",
            "project_code",
            "department",
            "contact_person",
            "notes",
            "additional_info",
        )
    ),
    "line_items": [
        dict.fromkeys(
            (
                "quantity",
                "units",
                "description",
                "footage",
                "price",
                "amount",
                "notes",
            )
        )
    ],
}

# Header field names in schema order; used as the targets for semantic matching.
STATIC_HEADERS: List[str] = list(SCHEMA_JSON["invoice_header"])
|
|
| |
# Sidebar controls read by the rest of the script.
st.sidebar.header("Settings")
# Minimum cosine similarity for a free-text label to be mapped onto a schema header.
threshold = st.sidebar.slider(
    "Semantic match threshold (cosine)",
    min_value=0.0,
    max_value=1.0,
    value=0.60,
    step=0.01,
)
# Generation budget handed to the text2text pipeline.
max_new_tokens = st.sidebar.slider(
    "Max new tokens (MD2JSON)",
    min_value=128,
    max_value=2048,
    value=512,
    step=32,
)
# When checked, the intermediate extraction results are rendered before the final JSON.
show_intermediates = st.sidebar.checkbox("Show intermediates", value=True)
|
|
| |
@st.cache_resource(show_spinner=True)
def load_models():
    """Build the sentence-embedding model and the text-to-JSON pipeline.

    Wrapped in ``st.cache_resource`` so the result is cached by Streamlit.

    Returns:
        A ``(sentence_model, json_converter)`` tuple.
    """
    embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    md2json = pipeline("text2text-generation", model="yahyakhoder/MD2JSON-T5-small-V1")
    return embedder, md2json


sentence_model, json_converter = load_models()
|
|
| |
# Label fragment -> invoice_header field.
#
# semantic_map_candidates() lower-cases each label, replaces every run of
# non-alphanumeric characters with a space, and then takes the FIRST fragment
# (in insertion order) that is a substring of the result. Consequences:
#   * more specific fragments must be listed before generic ones;
#   * fragments containing punctuation ("invoice#", "inv#", "e-way bill no")
#     can never match; they are kept only so existing lookups keep working.
SYN2KEY: Dict[str, str] = {
    # Invoice number
    "invoice no": "invoice_number",
    "invoice number": "invoice_number",
    "invoice#": "invoice_number",
    "inv no": "invoice_number",
    "inv#": "invoice_number",

    # Invoice date
    "invoice date": "invoice_date",
    "date of invoice": "invoice_date",

    # Order references: qualified forms first so that e.g. "Sales Order No"
    # is not swallowed by the generic "order no" fragment.
    "po no": "purchase_order_number",
    "po number": "purchase_order_number",
    "purchase order": "purchase_order_number",
    "sales order": "sales_order_number",
    "customer order": "customer_order_number",
    "our order": "our_order_number",
    "order no": "order_number",
    "order number": "order_number",

    # Dates
    "due date": "due_date",
    "date of supply": "order_date",

    # Tax identifiers
    "gstin": "supplier_tax_id",
    "gstin no": "supplier_tax_id",
    "tax id": "tax_id",
    "vat number": "vat_number",
    "tax registration number": "tax_registration_number",

    # Supply location
    "place of supply": "shipping_point",
    "state code": "additional_info",

    # Totals
    "taxable value": "total_before_tax",
    "total value": "total_due",
    "total amount": "total_due",
    "amount due": "total_due",

    # Bank details
    # NOTE(review): a bare "Bank: <name>" label also lands in
    # bank_account_number through the "bank" fragment — confirm this is intended.
    "bank": "bank_account_number",
    "account no": "bank_account_number",
    "account number": "bank_account_number",
    # Both IFSC spellings go to payment_reference, matching extract_bank_block().
    "ifs code": "payment_reference",
    "ifsc": "payment_reference",
    "swift code": "swift_code",
    "iban": "iban",

    # E-way bill ("e way bill" is the form a label takes after normalisation)
    "e-way bill no": "reference_number",
    "e way bill": "reference_number",
    "eway bill": "reference_number",

    # Dispatch / contact
    "dispatched via": "additional_info",
    "documents dispatched through": "additional_info",
    "kind attn": "contact_person",

    # Parties
    "billed to": "bill_to_name",
    "receiver": "bill_to_name",
    "shipped to": "ship_to_name",
    "consignee": "ship_to_name",
}
|
|
| |
def norm(s: str) -> str:
    """Collapse every run of whitespace in *s* to one space and trim both ends."""
    return " ".join(s.split())
|
|
def to_lower(s: str) -> str:
    """Return *s* lower-cased with leading and trailing whitespace removed."""
    lowered = s.lower()
    return lowered.strip()
|
|
def deep_copy_schema() -> Dict[str, Any]:
    """Return an independent copy of ``SCHEMA_JSON`` produced by a JSON round-trip."""
    serialized = json.dumps(SCHEMA_JSON)
    return json.loads(serialized)
|
|
| |
# Standalone totals such as "Total Value 1,234.00" (label, optional colon, amount).
_TOTALS_LINE_RE = re.compile(
    r"\b(Taxable\s+Value|Total\s+Value|Total\s+Amount|Amount\s+Due)\b[:\s]*([0-9][0-9,]*(?:\.[0-9]{2})?)",
    re.I,
)


def extract_candidates(text: str) -> Dict[str, str]:
    """Collect raw ``label -> value`` pairs from pasted invoice text.

    Two passes are made over the lines of *text*:

    1. ``Label: Value`` pairs, either on a plain line or inside the cells of a
       pipe-delimited row (each cell is split on its first colon).
    2. Totals lines (Taxable Value, Total Value, Total Amount, Amount Due)
       followed by a number; these are applied last, so they override a
       pass-1 entry with the same label.

    Labels and values are whitespace-normalised with ``norm``. Pairs whose
    label or value is empty after normalisation are skipped, so a bare
    ``"Invoice No:"`` line cannot overwrite a value captured earlier.

    Args:
        text: Invoice text as pasted by the user.

    Returns:
        Mapping of normalised label to normalised value; for a repeated label
        the last non-empty occurrence wins.
    """
    cands: Dict[str, str] = {}
    lines = text.splitlines()

    # Pass 1: colon-separated pairs, per pipe cell when the line is a table row.
    for raw in lines:
        if ":" not in raw:
            continue
        cells = raw.split("|") if "|" in raw else [raw]
        for cell in cells:
            label, sep, value = cell.partition(":")
            if not sep:
                continue
            label, value = norm(label), norm(value)
            if label and value:
                cands[label] = value

    # Pass 2: totals written as "<label> <amount>", with or without a colon.
    for raw in lines:
        m = _TOTALS_LINE_RE.search(raw)
        if m:
            cands[norm(m.group(1))] = norm(m.group(2))

    return cands
|
|
| |
# Shared regex fragments: d/m/y style dates and comma-grouped amounts.
_DATE_RE = r"([0-9]{1,2}[-/][0-9]{1,2}[-/][0-9]{2,4})"
_MONEY_RE = r"([0-9][0-9,]*(?:\.[0-9]{2})?)"


def _first_group(pattern: str, text: str) -> str:
    """Return the stripped first capture group of a case-insensitive search, or ""."""
    m = re.search(pattern, text, re.I)
    return m.group(1).strip() if m else ""


def regex_extract_all(text: str) -> Dict[str, str]:
    """Extract well-known header fields from *text* with fixed regular expressions.

    Only fields that are actually found appear in the result. Possible keys:
    ``invoice_number``, ``invoice_date``, ``purchase_order_number``,
    ``order_date`` (PO date, else date of supply), ``shipping_point``,
    ``supplier_tax_id``, ``total_before_tax`` (thousands separators removed),
    ``tax_amount`` (CGST value + SGST value, two decimals), ``tax_rate``
    (CGST % + SGST %, only when both tax values were found) and
    ``reference_number`` (e-way bill number).

    Args:
        text: Invoice text as pasted by the user.

    Returns:
        Mapping of schema header name to the extracted string.
    """
    out: Dict[str, str] = {}

    # "No"/"Number" must be whole words so "Invoice Note" is not read as "No" + "te".
    value = _first_group(r"\bInvoice\s*(?:No\b\.?|Number\b|#)\s*[:\-]?\s*([A-Z0-9\-\/]+)", text)
    if value:
        out["invoice_number"] = value

    value = _first_group(r"\bInvoice\s*Date\s*[:\-]?\s*" + _DATE_RE, text)
    if value:
        out["invoice_date"] = value

    # "PO" must not be the start of a longer word (Pondicherry, Policy, ...) and
    # the captured token must contain a digit, which rules out "PO Date" /
    # "PO Box" being captured as the order number.
    value = _first_group(
        r"\bPO(?![A-Za-z])\s*(?:(?:No\b\.?|Number\b|#)\s*)?[:\-]?\s*"
        r"((?=[A-Z\-\/]*[0-9])[A-Z0-9\-\/]+)",
        text,
    )
    if value:
        out["purchase_order_number"] = value

    # Order date: prefer an explicit PO date, fall back to the date of supply.
    value = _first_group(r"\bPO\s*Date\s*[:\-]?\s*" + _DATE_RE, text)
    if not value:
        value = _first_group(r"\bDate\s*of\s*Supply\s*[:\-]?\s*" + _DATE_RE, text)
    if value:
        out["order_date"] = value

    value = _first_group(r"\bPlace\s*of\s*Supply\s*[:\-]?\s*([A-Za-z0-9 ,\-\(\)]+)", text)
    if value:
        out["shipping_point"] = value

    value = _first_group(r"\bGSTIN\s*(?:No\.?)?\s*[:\-]?\s*([A-Z0-9]{15})", text)
    if value:
        out["supplier_tax_id"] = value

    value = _first_group(r"\bTaxable\s*Value\s*[:\-]?\s*" + _MONEY_RE, text)
    if value:
        out["total_before_tax"] = value.replace(",", "")

    # Tax amount and rate are the sums of the CGST and SGST halves. The
    # patterns only capture digits, commas and one optional decimal part, so
    # float() cannot fail once the commas are removed.
    cgst_value = _first_group(r"\bCGST\s*Value\s*[:\-]?\s*" + _MONEY_RE, text)
    sgst_value = _first_group(r"\bSGST\s*Value\s*[:\-]?\s*" + _MONEY_RE, text)
    if cgst_value and sgst_value:
        tax_total = float(cgst_value.replace(",", "")) + float(sgst_value.replace(",", ""))
        out["tax_amount"] = f"{tax_total:.2f}"

        cgst_pct = _first_group(r"\bCGST\s*%?\s*[:\-]?\s*([0-9]+(?:\.[0-9]+)?)", text)
        sgst_pct = _first_group(r"\bSGST\s*%?\s*[:\-]?\s*([0-9]+(?:\.[0-9]+)?)", text)
        if cgst_pct and sgst_pct:
            out["tax_rate"] = f"{float(cgst_pct) + float(sgst_pct):g}"

    # The number must start with a digit so a whitespace-only capture is impossible.
    value = _first_group(r"\bE[-\s]?Way\s*bill\s*no\.?\s*[:\-]?\s*([0-9][0-9 ]*)", text)
    if value:
        out["reference_number"] = value

    return out
|
|
| |
def _bank_field(pattern: str, text: str) -> str:
    """Return the stripped first capture group of a case-insensitive search, or ""."""
    m = re.search(pattern, text, re.I)
    return m.group(1).strip() if m else ""


def extract_bank_block(text: str) -> Dict[str, str]:
    """Extract bank/remittance details from *text* into schema header fields.

    Mapping used:
        * ``Account Name``      -> ``supplier_name``
        * ``Account No/Number`` -> ``bank_account_number``
        * ``IFS(C) Code``       -> ``payment_reference``
        * ``SWIFT Code``        -> ``swift_code``
        * ``Bank``, ``Branch``, ``MICR Code`` -> joined into ``additional_info``
          as ``"Bank: … | Branch: … | MICR: …"`` (only the parts that were found).

    A value must sit on the same line as its label and ends at the next pipe
    (``|``), so table cells and empty labels do not bleed into one another.

    Args:
        text: Invoice text as pasted by the user.

    Returns:
        Mapping of schema header name to extracted string; only non-empty
        values are included.
    """
    bank: Dict[str, str] = {}

    # "[ \t]*" (not "\s*") around the colon keeps the match on the label's own line.
    simple_fields = (
        ("supplier_name", r"\bAccount\s*Name[ \t]*:[ \t]*([^|\r\n]+)"),
        ("bank_account_number", r"\bAccount\s*(?:No|Number)\.?[ \t]*:[ \t]*([A-Za-z0-9\- ]+)"),
        ("payment_reference", r"\bIFSC?\s*Code[ \t]*:[ \t]*([A-Za-z0-9]+)"),
        ("swift_code", r"\bSWIFT\s*Code[ \t]*:[ \t]*([A-Za-z0-9]+)"),
    )
    for field, pattern in simple_fields:
        value = _bank_field(pattern, text)
        if value:
            bank[field] = value

    # Bank name, branch and MICR have no dedicated schema field.
    info_fields = (
        ("Bank: ", r"\bBank[ \t]*:[ \t]*([A-Za-z0-9 ,\-\(\)&]+)"),
        ("Branch: ", r"\bBranch[ \t]*:[ \t]*([^|\r\n]+)"),
        ("MICR: ", r"\bMICR\s*Code[ \t]*:[ \t]*([0-9]+)"),
    )
    info_parts = []
    for prefix, pattern in info_fields:
        value = _bank_field(pattern, text)
        if value:
            info_parts.append(prefix + value)
    if info_parts:
        bank["additional_info"] = " | ".join(info_parts)

    return bank
|
|
| |
def _split_table_row(line: str) -> List[str]:
    """Split a pipe-delimited row into stripped cells, keeping empty interior cells.

    Only the empty strings produced by a leading/trailing ``|`` are dropped, so
    cell positions stay aligned with the header row.
    """
    cells = [cell.strip() for cell in line.strip().split("|")]
    if cells and not cells[0]:
        cells = cells[1:]
    if cells and not cells[-1]:
        cells = cells[:-1]
    return cells


def _is_blank_cell(cell: str) -> bool:
    """Return True for an empty cell or one made only of ``-`` / ``:`` (separator, N/A)."""
    return not cell or set(cell) <= {"-", ":"}


def _find_column(headers: List[str], name_parts: List[str]) -> int:
    """Return the index of the first header containing the highest-priority name part, or -1."""
    for part in name_parts:
        for idx, header in enumerate(headers):
            if part in header:
                return idx
    return -1


def parse_line_items(text: str) -> List[Dict[str, Any]]:
    """Parse the line-item table of a pasted invoice.

    The table header is the first line that contains a pipe and mentions a
    description column, a quantity column (qty/quantity), a rate/price column
    and a total/amount column (case-insensitive), e.g.::

        | SNO | Description | HSN/SAC | Qty | UOM | Rate | ... | Total Value |

    Rows that follow (lines starting with ``|``) are read by column position.
    Empty cells are kept in place, so a row with a blank cell does not shift
    the remaining values into the wrong columns. Separator rows and rows with
    fewer than three filled cells are skipped. Parsing stops at the first
    non-table line after the table has started.

    Args:
        text: Invoice text as pasted by the user.

    Returns:
        One dict per row with the keys ``quantity``, ``units``, ``description``,
        ``footage`` (``"<quantity> <units>"`` when both exist), ``price``,
        ``amount`` and ``notes``; missing values are ``None``. Empty list when
        no table header is found.
    """
    items: List[Dict[str, Any]] = []
    lines = [ln for ln in text.splitlines() if ln.strip()]

    header_idx = -1
    for i, ln in enumerate(lines):
        low = ln.lower()
        if (
            "|" in ln
            and "description" in low
            and ("qty" in low or "quantity" in low)
            and ("rate" in low or "price" in low)
            and ("total" in low or "amount" in low)
        ):
            header_idx = i
            break
    if header_idx == -1:
        return items

    # Column positions depend only on the header, so resolve them once.
    headers = [h.lower() for h in _split_table_row(lines[header_idx])]
    columns = {
        "description": _find_column(headers, ["description", "item"]),
        "quantity": _find_column(headers, ["qty", "quantity"]),
        "units": _find_column(headers, ["uom", "unit"]),
        "price": _find_column(headers, ["rate", "price"]),
        "amount": _find_column(headers, ["total value", "amount", "total"]),
    }

    for j in range(header_idx + 1, len(lines)):
        row = lines[j]
        if not (row.strip().startswith("|") and row.count("|") >= 2):
            # A non-table line directly under the header is tolerated; any
            # later one marks the end of the table.
            if j > header_idx + 1:
                break
            continue

        cells = _split_table_row(row)
        if sum(1 for cell in cells if not _is_blank_cell(cell)) < 3:
            continue  # separator row or a sparse footer line

        rowd: Dict[str, Any] = {
            "quantity": None,
            "units": None,
            "description": None,
            "footage": None,
            "price": None,
            "amount": None,
            "notes": None,
        }
        for field, col in columns.items():
            if 0 <= col < len(cells) and not _is_blank_cell(cells[col]):
                rowd[field] = cells[col]

        if rowd["units"] and rowd["quantity"]:
            rowd["footage"] = f'{rowd["quantity"]} {rowd["units"]}'
        items.append(rowd)

    return items
|
|
| |
def semantic_map_candidates(candidates: Dict[str, str], static_headers: List[str], thresh: float) -> Dict[str, str]:
    """Map free-text labels onto schema header names.

    Matching happens in two stages:

    1. Synonym lookup: the label is lower-cased, non-alphanumerics become
       spaces, and the first ``SYN2KEY`` fragment contained in it decides the
       field. A later empty value never replaces an earlier non-empty one.
    2. Embedding similarity for the labels left over: each label is compared
       with every entry of *static_headers* and assigned to its most similar
       header when the cosine similarity is at least *thresh*. A field already
       filled by stage 1 is never overwritten, and when several labels compete
       for one field the highest-scoring label wins.

    Args:
        candidates: Raw ``label -> value`` pairs.
        static_headers: Schema header names to match against.
        thresh: Minimum cosine similarity for a stage-2 match.

    Returns:
        Mapping of schema header name to value.
    """
    if not candidates:
        return {}

    mapped: Dict[str, str] = {}
    leftovers: Dict[str, str] = {}
    for label, value in candidates.items():
        label_norm = re.sub(r"[^a-z0-9]+", " ", label.lower()).strip()
        hit = next((field for syn, field in SYN2KEY.items() if syn in label_norm), None)
        if not hit:
            leftovers[label] = value
        elif value or hit not in mapped:
            mapped[hit] = value

    if not leftovers or not static_headers:
        return mapped

    left_labels = list(leftovers)
    cand_emb = sentence_model.encode(left_labels, normalize_embeddings=True)
    head_emb = sentence_model.encode(static_headers, normalize_embeddings=True)
    sims = util.cos_sim(torch.tensor(cand_emb), torch.tensor(head_emb)).cpu().numpy()

    # Fields that the deterministic synonym stage already filled with a value.
    rule_fields = {field for field, value in mapped.items() if value}
    best_score: Dict[str, float] = {}
    for row, label in enumerate(left_labels):
        col = int(np.argmax(sims[row]))
        score = float(sims[row][col])
        field = static_headers[col]
        if score < thresh or field in rule_fields:
            continue
        if score > best_score.get(field, float("-inf")):
            best_score[field] = score
            mapped[field] = leftovers[label]

    return mapped
|
|
| |
def build_prompt(invoice_text: str, mapped_hints: Dict[str, str], items_hints: List[Dict[str, Any]]) -> str:
    """Assemble the prompt sent to the text-to-JSON model.

    The prompt consists of the schema description (one ``"<field>": "string or
    null"`` line per field of ``SCHEMA_JSON``, so it cannot drift from the
    schema), the extraction rules, the stripped invoice text and, when
    available, the header and line-item hints found by the rule-based
    extractors.

    Args:
        invoice_text: Invoice text as pasted by the user.
        mapped_hints: Header fields already extracted (``field -> value``).
        items_hints: Line items already parsed.

    Returns:
        The complete prompt string.
    """
    placeholder = '"string or null"'
    header_lines = ",\n".join(
        f' "{name}": {placeholder}' for name in SCHEMA_JSON["invoice_header"]
    )
    item_lines = ",\n".join(
        f' "{name}": {placeholder}' for name in SCHEMA_JSON["line_items"][0]
    )
    instruction = (
        "Use this schema:\n"
        "{\n"
        ' "invoice_header": {\n'
        f"{header_lines}\n"
        " },\n"
        ' "line_items": [\n'
        " {\n"
        f"{item_lines}\n"
        " }\n"
        " ]\n"
        "}\n"
        "If a field is missing for a line item or header, use null. "
        "Do not invent fields. Do not add any header or shipment data to any line item. "
        "Return ONLY the JSON object, no explanation.\n"
    )

    hints = ""
    if mapped_hints:
        hints += "\nHints (header):\n" + " ".join(f"#{k}: {v}" for k, v in mapped_hints.items())
    if items_hints:
        try:
            hints += "\nHints (line_items):\n" + json.dumps(items_hints, ensure_ascii=False)
        except (TypeError, ValueError):
            # Hints are optional: leave them out if they cannot be serialised.
            pass

    return instruction + "\nInvoice Text:\n" + invoice_text.strip() + hints
|
|
def strict_json(text: str) -> Dict[str, Any]:
    """Parse model output into a JSON object.

    The whole string is tried first; if that fails (or yields something other
    than an object), the span from the first ``{`` to the last ``}`` is tried.

    Args:
        text: Raw text generated by the model.

    Returns:
        The parsed JSON object as a dict.

    Raises:
        ValueError: If no JSON object can be parsed from *text*. Valid JSON
            that is not an object (a list, number, string, ...) is rejected as
            well, because callers rely on dict access.
    """
    attempts = [text]
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        attempts.append(text[start:end + 1])

    for chunk in attempts:
        try:
            parsed = json.loads(chunk)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    raise ValueError("Model did not return valid JSON.")
|
|
| |
def merge_schema(rule_json: Dict[str, Any], model_json: Dict[str, Any]) -> Dict[str, Any]:
    """Merge model output into the rule-based result; rule-based values win.

    * Header: a field is taken from the model only when the rule-based value
      is missing (``None``, ``""`` or ``"null"``) and the model's value is not.
      Fields the schema does not have are ignored.
    * Line items: the model's rows are used only when the rule-based rows
      carry no value at all; non-dict entries in the model's list are dropped.

    Malformed model output (not a dict, or sections of the wrong type) is
    ignored instead of raising. Neither argument is modified.

    Args:
        rule_json: Result of the deterministic extractors, in schema shape.
        model_json: Parsed model output.

    Returns:
        A new dict in schema shape.
    """
    missing = (None, "", "null")
    final = copy.deepcopy(rule_json)
    if not isinstance(model_json, dict):
        return final

    # Header: fill gaps only.
    hdr = final["invoice_header"]
    mdl_hdr = model_json.get("invoice_header")
    if isinstance(mdl_hdr, dict):
        for key in list(hdr):
            if hdr[key] in missing:
                candidate = mdl_hdr.get(key)
                if candidate not in missing:
                    hdr[key] = candidate

    # Line items: keep the rule-based rows as soon as any of them has a value.
    rule_has_items = bool(final["line_items"]) and any(
        any(v for v in row.values() if v not in missing)
        for row in final["line_items"]
    )
    if not rule_has_items:
        mdl_items = model_json.get("line_items")
        if isinstance(mdl_items, list):
            rows = [copy.deepcopy(row) for row in mdl_items if isinstance(row, dict)]
            if rows:
                final["line_items"] = rows

    return final
|
|
| |
invoice_text = st.text_area(
    "Paste the invoice text here.",
    height=320,
    placeholder="Paste the invoice content (OCR/plain text) ...",
)

if st.button("Generate JSON", type="primary", use_container_width=True):
    if not invoice_text.strip():
        st.error("Please paste the invoice text first.")
        st.stop()

    txt = invoice_text

    # Deterministic extraction passes.
    candidates = extract_candidates(txt)
    hard = regex_extract_all(txt)
    bank = extract_bank_block(txt)
    items = parse_line_items(txt)
    sem_mapped = semantic_map_candidates(candidates, STATIC_HEADERS, threshold)

    # Later sources take precedence: semantic < regex < bank block. Empty
    # values are dropped first so they cannot displace a real value.
    header_found: Dict[str, Any] = {}
    for source in (sem_mapped, hard, bank):
        header_found.update({k: v for k, v in source.items() if v not in (None, "")})

    # Rule-based result in schema shape; unknown keys are ignored.
    rule_json = deep_copy_schema()
    for k, v in header_found.items():
        if k in rule_json["invoice_header"]:
            rule_json["invoice_header"][k] = v
    if items:
        rule_json["line_items"] = items

    if show_intermediates:
        st.subheader("Candidates (first 20)")
        st.json(dict(list(candidates.items())[:20]))
        st.subheader("Regex/Hard fields")
        st.json(hard)
        st.subheader("Bank block")
        st.json(bank)
        st.subheader("Semantic-mapped headers")
        st.json(sem_mapped)
        st.subheader("Line items (parsed)")
        st.json(items)

    with st.spinner("Generating structured JSON with MD2JSON-T5-small-V1..."):
        prompt = build_prompt(txt, header_found, items)
        gen = json_converter(prompt, max_new_tokens=max_new_tokens)[0]["generated_text"]
        try:
            model_json = strict_json(gen)
        except ValueError:
            model_json = None

    if not isinstance(model_json, dict):
        # The model is only used to fill gaps, so fall back to the empty
        # schema, but tell the user instead of failing silently.
        st.warning("Model output was not valid JSON; showing the rule-based extraction only.")
        model_json = deep_copy_schema()

    final_json = merge_schema(rule_json, model_json)

    st.subheader("Final JSON")
    st.json(final_json)
    st.download_button(
        "Download JSON",
        data=json.dumps(final_json, indent=2),
        file_name="invoice.json",
        mime="application/json",
        use_container_width=True,
    )
|
|