import gradio as gr
from uuid_extensions import uuid7str
from invokation import processing_stream
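# processing_stream(req) is expected to yield event dicts; the shape below is
# inferred from how the handlers in this file read them, not from the invokation
# module itself:
#   {"type": "partial", "answer": str}
#   {"type": "final", "answer": str, "session_id": str,
#    "score_delta": int, "score_after": int, "sentiment": ...}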

# Helpers for the different Gradio chat history formats
def is_chatmessage_obj(x) -> bool:
    # Duck-typed check for gr.ChatMessage-like objects
    return hasattr(x, "role") and hasattr(x, "content")

def content_to_text(content) -> str:
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    # Gradio message parts: [{"type": "text", "text": "..."}, ...]
    if isinstance(content, list):
        out = []
        for part in content:
            if isinstance(part, str):
                out.append(part)
            elif isinstance(part, dict):
                if part.get("type") == "text" and "text" in part:
                    out.append(str(part["text"]))
                elif "text" in part:
                    out.append(str(part["text"]))
                elif "content" in part:
                    out.append(str(part["content"]))
        return "".join(out).strip()
    return str(content).strip()
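
# Example inputs normalize_messages accepts (each yields [{"role": ..., "content": ...}, ...]):
#   [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]  # messages dicts
#   [gr.ChatMessage(role="user", content="hi")]                                     # ChatMessage objects
#   [("hi", "hello")]                                                               # (user, assistant) pairs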
def normalize_messages(history):
    if not history:
        return []
    out = []
    # messages dicts
    if isinstance(history, list) and history and isinstance(history[0], dict):
        for m in history:
            role = m.get("role")
            content = content_to_text(m.get("content"))
            if role in ("user", "assistant") and content is not None:
                out.append({"role": role, "content": str(content)})
        return out
    # ChatMessage objects
    if isinstance(history, list) and history and is_chatmessage_obj(history[0]):
        for m in history:
            out.append({"role": getattr(m, "role"), "content": str(getattr(m, "content"))})
        return out
    # tuple/list pairs
    if isinstance(history, list):
        for pair in history:
            if not isinstance(pair, (list, tuple)) or len(pair) != 2:
                continue
            u, a = pair
            if u is not None and str(u).strip():
                out.append({"role": "user", "content": str(u)})
            if a is not None and str(a).strip():
                out.append({"role": "assistant", "content": str(a)})
        return out
    return []

def set_last_assistant(msgs, text: str):
    if not msgs:
        return [{"role": "assistant", "content": text}]
    if isinstance(msgs[-1], dict) and msgs[-1].get("role") == "assistant":
        msgs[-1]["content"] = text
    else:
        msgs.append({"role": "assistant", "content": text})
    return msgs
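
# respond_stream is a generator event handler: each yield updates
# (msg textbox, chatbot history, session_state, score_box) in that order,
# matching the outputs list of msg.submit further down.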
def respond_stream(user_input, ui_history, session_id, score_value):
    user_input = (user_input or "").strip()
    msgs = normalize_messages(ui_history)
    if not user_input:
        yield gr.update(value="", interactive=True), msgs, session_id, score_value
        return
    # After a page refresh, session_state is empty -> create a new session
    if not session_id:
        session_id = uuid7str()
        score_value = 0
    msgs = msgs + [
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": ""},
    ]
    yield gr.update(value="", interactive=False), msgs, session_id, score_value
    # The backend only needs the question and the session_id
    req = {"session_id": session_id, "question": user_input}
    try:
        for ev in processing_stream(req):
            t = ev.get("type")
            if t == "partial":
                msgs = set_last_assistant(msgs, ev.get("answer", ""))
                yield gr.update(value="", interactive=False), msgs, session_id, score_value
            elif t == "final":
                msgs = set_last_assistant(msgs, ev.get("answer", ""))
                score_value = ev.get("score_after", score_value)
                session_id = ev.get("session_id", session_id)
                yield gr.update(value="", interactive=True), msgs, session_id, score_value
    except Exception as e:
        msgs = set_last_assistant(msgs, f"(Error) {type(e).__name__}: {e}")
        yield gr.update(value="", interactive=True), msgs, session_id, score_value

def chat_api(question: str, session_id: str):
    q = (question or "").strip()
    sid = (session_id or "").strip() or uuid7str()
    if not q:
        return {"error": "question is empty", "session_id": sid}
    req = {"session_id": sid, "question": q}
    final_ev = None
    for ev in processing_stream(req):
        if ev.get("type") == "final":
            final_ev = ev
    if not final_ev:
        return {"error": "no final response", "session_id": sid}
    return {
        "session_id": final_ev.get("session_id", sid),
        "answer": final_ev.get("answer", ""),
        "score_delta": final_ev.get("score_delta", 0),
        "score_after": final_ev.get("score_after", 0),
        "sentiment": final_ev.get("sentiment"),
    }

with gr.Blocks() as demo:
    session_state = gr.State("")
    gr.Markdown("## Kaguya-sama Chat")
    score_box = gr.Number(label="Current Score", value=0, interactive=False)
    # messages format matches the dicts that respond_stream yields
    chatbot = gr.Chatbot(value=[], type="messages")
    msg = gr.Textbox(placeholder="Type a message...")
    msg.submit(
        respond_stream,
        inputs=[msg, chatbot, session_state, score_box],
        outputs=[msg, chatbot, session_state, score_box],
        stream_every=0.0005,
    )

    # Hidden components that expose chat_api as a named endpoint (/chat_api)
    api_q = gr.Textbox(visible=False)
    api_sid = gr.Textbox(visible=False)
    api_out = gr.JSON(visible=False)
    api_btn = gr.Button(visible=False)
    api_btn.click(
        chat_api,
        inputs=[api_q, api_sid],
        outputs=api_out,
        api_name="chat_api",
    )

demo.queue().launch()
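
# --- Usage sketch (assumption, not part of the app): calling the named endpoint ---
# Because the hidden Button.click above is registered with api_name="chat_api",
# a separate process should be able to reach it through gradio_client as "/chat_api".
# The Space id below is a placeholder.
#
#   from gradio_client import Client
#   client = Client("username/space-name")                       # hypothetical Space id
#   result = client.predict("Hello", "", api_name="/chat_api")   # (question, session_id)
#   print(result)  # -> {"session_id": ..., "answer": ..., "score_delta": ..., "score_after": ..., "sentiment": ...}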