from datetime import datetime
import json
import os
import gradio as gr
from gradio_client import Client
import shutil
import time
import random
import zipfile
import base64

# SPFMan - State, Prompt and file manager.
# Global state shared by every SPFMan* function in this module.
SPFManstate = {
    "last_file": 0,                 # index of the next prompt-list item to process
    "output_dir": "saved_media",    # Same as SAVE_DIR in file_explorer_and_upload.py
    "errors": [],                   # accumulated error messages
    "skipped_items": [],            # indices the user chose to skip
    "is_paid_api": False,
    "cost_per_item": 0.1,           # Default cost per item for paid API
    "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
    "api_provider": "default",      # Default API provider
    "retry_delay": 300,             # 5 minutes delay for retry (GPU-quota errors)
    "max_retries": 3,               # Maximum number of retries
}
# List of (prompt_type, prompt) tuples queued for generation.
SPFManprompt_list = []


def SPFManload_state(config_file=None):
    """Load SPFManstate/SPFManprompt_list from config_file or, failing that, state.json.

    Errors are printed and swallowed so a corrupt file never prevents startup.
    """
    global SPFManstate, SPFManprompt_list
    try:
        # Prefer an explicit config file; otherwise fall back to state.json if present.
        path = config_file if config_file else ("state.json" if os.path.exists("state.json") else None)
        if path:
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                loaded_data = json.load(f)
            SPFManstate.update(loaded_data.get("state", {}))
            SPFManprompt_list = loaded_data.get("prompt_list", [])
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


def SPFMansave_state():
    """Write state + prompt list to a fresh timestamped JSON snapshot and prune older ones."""
    SPFManstate["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S")
    data_to_save = {
        "state": SPFManstate,
        "prompt_list": SPFManprompt_list,
    }
    current = f"state_{SPFManstate['timestamp']}.json"
    with open(current, "w") as f:
        json.dump(data_to_save, f)
    # Delete old state files so only the newest snapshot remains on disk.
    for file in os.listdir():
        if file.startswith("state_") and file.endswith(".json") and file != current:
            os.remove(file)


def SPFManensure_output_directory():
    """Create the media output directory if it does not already exist."""
    os.makedirs(SPFManstate['output_dir'], exist_ok=True)


def SPFMangenerate_image(prompt, retries=0):
    """Generate an image for *prompt* via the FLUX.1-dev HF Space.

    Returns a "saved as <path>" message on success (downstream queue code
    parses the path out of that message) or an error string on failure.
    GPU-quota errors are retried up to SPFManstate['max_retries'] times.
    """
    SPFManensure_output_directory()
    try:
        client = Client("black-forest-labs/FLUX.1-dev")
        result = client.predict(
            prompt=prompt,
            seed=0,
            randomize_seed=True,
            width=1024,
            height=1024,
            guidance_scale=3.5,
            num_inference_steps=28,
            api_name="/infer",
        )
        image_path = result[0]
        filename = f"{SPFManstate['output_dir']}/{prompt[:50].replace(' ', '_')}_image_{SPFManstate['timestamp']}.webp"
        shutil.move(image_path, filename)
        # BUGFIX: report the real path — process_next_queue_item extracts it
        # from this message; the previous placeholder broke that parsing.
        return f"Image saved as {filename}"
    except Exception as e:
        error_msg = f"Error generating image: {str(e)}"
        if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
            time.sleep(SPFManstate['retry_delay'])
            return SPFMangenerate_image(prompt, retries + 1)
        SPFManstate["errors"].append(error_msg)
        return error_msg


def SPFMangenerate_audio(prompt, seconds_total=30, steps=100, cfg_scale=7, retries=0):
    """Generate audio using Stable Audio API.

    Args:
        prompt: Text description of the sound
        seconds_total: Duration in seconds (default 30)
        steps: Number of generation steps (default 100)
        cfg_scale: Classifier-free guidance scale (default 7)
        retries: Number of retry attempts
    """
    SPFManensure_output_directory()
    try:
        client = Client("artificialguybr/Stable-Audio-Open-Zero")
        result = client.predict(
            prompt=prompt,
            seconds_total=int(seconds_total),
            steps=int(steps),
            cfg_scale=float(cfg_scale),
            api_name="/predict",
        )
        filename = f"{SPFManstate['output_dir']}/{prompt[:50].replace(' ', '_')}_audio_{SPFManstate['timestamp']}.wav"
        if isinstance(result, str) and os.path.exists(result):
            shutil.move(result, filename)
        else:
            # Some deployments return base64-encoded audio instead of a file path.
            audio_data = base64.b64decode(result)
            with open(filename, "wb") as audio_file:
                audio_file.write(audio_data)
        # BUGFIX: report the real path (see SPFMangenerate_image).
        return f"Audio saved as {filename}"
    except Exception as e:
        error_msg = f"Error generating audio: {str(e)}"
        if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
            time.sleep(SPFManstate['retry_delay'])
            # BUGFIX: forward the audio parameters. The original passed
            # retries + 1 as seconds_total, silently resetting duration,
            # steps and cfg_scale on every retry.
            return SPFMangenerate_audio(prompt, seconds_total, steps, cfg_scale, retries + 1)
        SPFManstate["errors"].append(error_msg)
        return error_msg
def SPFMangenerate_3d_model(prompt, guidance_scale=15.0, num_steps=64, retries=0, use_local=True):
    """
    Generate 3D model using Shap-E.
    Tries local ZeroGPU first, falls back to API if local fails.

    Args:
        prompt: Text description of the 3D object
        guidance_scale: Classifier-free guidance scale (default 15.0)
        num_steps: Number of inference steps (default 64)
        retries: Number of retry attempts
        use_local: Whether to try local generation first
    """
    SPFManensure_output_directory()
    # Try local generation first (faster, no rate limits)
    if use_local:
        try:
            from leveraging_machine_learning import generate_3d_local
            status, filepath = generate_3d_local(prompt, guidance_scale=guidance_scale, num_steps=num_steps)
            if filepath and os.path.exists(filepath):
                return status
            print(f"Local 3D generation failed: {status}, falling back to API...")
        except Exception as local_error:
            print(f"Local 3D error: {local_error}, falling back to API...")
    # Fall back to API
    try:
        client = Client("hysts/Shap-E")
        result = client.predict(
            prompt=prompt,
            seed=0,
            guidance_scale=float(guidance_scale),
            num_inference_steps=int(num_steps),
            api_name="/text-to-3d",
        )
        if isinstance(result, str) and os.path.exists(result):
            safe_prompt = prompt[:40].replace(' ', '_').replace('/', '_').replace('\\', '_')
            filename = f"{SPFManstate['output_dir']}/{safe_prompt}_3d_{SPFManstate['timestamp']}.glb"
            shutil.move(result, filename)
            # BUGFIX: report the real path — queue code parses it from "saved as ...".
            return f"3D model saved as {filename}"
        else:
            return f"Error: Unexpected result format from Shap-E"
    except Exception as e:
        error_msg = f"Error generating 3D model: {str(e)}"
        if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
            time.sleep(SPFManstate['retry_delay'])
            # BUGFIX: keep guidance/steps on retry. The original passed
            # retries + 1 positionally as guidance_scale.
            return SPFMangenerate_3d_model(prompt, guidance_scale, num_steps, retries + 1, use_local=False)
        SPFManstate["errors"].append(error_msg)
        return error_msg


def SPFMangenerate_tts(text, model="kokoro", voice="af_heart", retries=0, use_local=True):
    """
    Generate TTS audio using multiple TTS backends.
    Tries local ZeroGPU first, falls back to API if local fails.

    Supported models:
    - kokoro: Kokoro-82M (fast, natural voices)
    - supertonic: Supertonic-2 (high-quality, expressive)
    - glm-tts: GLM-TTS (multilingual, voice cloning) - API only
    """
    SPFManensure_output_directory()
    # Try local generation first (faster, no rate limits).
    # GLM-TTS doesn't support local generation.
    if use_local and model != "glm-tts":
        try:
            from leveraging_machine_learning import generate_tts_local
            status, filepath = generate_tts_local(text, model=model, voice=voice)
            if filepath and os.path.exists(filepath):
                return status
            print(f"Local TTS failed: {status}, falling back to API...")
        except Exception as local_error:
            print(f"Local TTS error: {local_error}, falling back to API...")
    # Fall back to API
    try:
        from leveraging_machine_learning import generate_tts_api
        status, filepath = generate_tts_api(text, model=model, voice=voice)
        if filepath and os.path.exists(filepath):
            return status
        return f"Error: Could not process TTS result - {status}"
    except Exception as e:
        error_msg = f"Error generating TTS: {str(e)}"
        if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
            time.sleep(SPFManstate['retry_delay'])
            return SPFMangenerate_tts(text, model, voice, retries + 1, use_local=False)
        SPFManstate["errors"].append(error_msg)
        return error_msg


def SPFMangenerate_talking_head(image_path, audio_path=None, text=None, retries=0, use_local=False):
    """
    Generate talking head video from portrait image + audio/text.
    Tries local ZeroGPU first, then falls back to HuggingFace Spaces.

    Args:
        image_path: Path to portrait image
        audio_path: Path to audio file (optional if text provided)
        text: Text to convert to speech first (optional if audio provided)
        retries: Retry counter for rate limiting
        use_local: Try local ZeroGPU generation first

    Returns:
        Status message with path to generated video

    TODO (Future local ZeroGPU implementation):
    - Wav2Lip is simpler than SadTalker (just lip sync, ~300MB checkpoint)
    - Could implement using: https://huggingface.co/spaces/pragnakalp/Wav2lip-ZeroGPU
    - Requires: face_detection, wav2lip model checkpoint, ffmpeg
    - See: https://github.com/Rudrabha/Wav2Lip for model details
    """
    from gradio_client import Client, handle_file
    SPFManensure_output_directory()
    errors = []
    try:
        # If text provided but no audio, generate TTS first
        if text and not audio_path:
            tts_result = SPFMangenerate_tts(text)
            if "saved as" in tts_result:
                audio_path = tts_result.split("saved as ")[-1].strip()
            else:
                return f"Error: Failed to generate TTS audio: {tts_result}"
        if not audio_path or not os.path.exists(audio_path):
            return "Error: No audio file provided or generated"
        if not image_path or not os.path.exists(image_path):
            return "Error: No portrait image provided"
        # Try local generation first (ZeroGPU)
        if use_local:
            try:
                print("[TalkingHead] Trying local ZeroGPU generation...")
                from leveraging_machine_learning import generate_talking_head_local
                status, filepath = generate_talking_head_local(image_path, audio_path)
                if filepath and os.path.exists(filepath):
                    return status
                print(f"[TalkingHead] Local generation failed: {status}")
                errors.append(f"Local (ZeroGPU): {status}")
            except Exception as local_error:
                print(f"[TalkingHead] Local error: {local_error}")
                errors.append(f"Local (ZeroGPU): {str(local_error)}")
        result = None
        # Fallback to API spaces
        print("[TalkingHead] Falling back to API spaces...")
        # List of spaces to try with their configurations
        # Updated 2026-01-25: Prioritize MoDA (runs on ZeroGPU, MIT license)
        # SadTalker/Hallo often have RUNTIME_ERROR state
        spaces_to_try = [
            {
                "name": "MoDA-FastTalkingHead",
                "space": "multimodalart/MoDA-fast-talking-head",
                "api_name": "/generate_motion",
                "params": lambda img, aud: {
                    "source_image_path": handle_file(img),
                    "driving_audio_path": handle_file(aud),
                    "emotion_name": "None",  # Options: None, Happy, Angry, Surprise, Sad, Disgust, Fear
                    "cfg_scale": 1.2,
                }
            },
            {
                "name": "SadTalker",
                "space": "vinthony/SadTalker",
                "api_name": "/inference",
                "params": lambda img, aud: {
                    "source_image": handle_file(img),
                    "driven_audio": handle_file(aud),
                    "preprocess": "crop",
                    "still_mode": False,
                    "use_enhancer": False,
                    "batch_size": 2,
                    "size": 256,
                    "pose_style": 0,
                    "facerender": "facevid2vid",
                    "exp_weight": 1.0,
                    "use_ref": False,
                    "ref_video": None,
                    "ref_info": "pose",
                    "use_idle": False,
                    "length": 0,
                }
            },
            {
                "name": "Hallo",
                "space": "fudan-generative-ai/hallo",
                "api_name": "/predict",
                "params": lambda img, aud: {
                    "source_image": handle_file(img),
                    "driving_audio": handle_file(aud),
                }
            },
            {
                "name": "Wav2Lip-ZeroGPU",
                "space": "pragnakalp/Wav2lip-ZeroGPU",
                "api_name": "/run_infrence",  # Note: typo is in original space
                "params": lambda img, aud: {
                    "input_image": handle_file(img),
                    "input_audio": handle_file(aud),
                }
            },
        ]
        for space_config in spaces_to_try:
            try:
                print(f"[TalkingHead] Trying {space_config['name']} ({space_config['space']})...")
                client = Client(space_config["space"])
                # Get API info for debugging
                try:
                    api_info = client.view_api(return_format="dict")
                    named_endpoints = list(api_info.get('named_endpoints', {}).keys()) if api_info else []
                    unnamed_endpoints = list(api_info.get('unnamed_endpoints', {}).keys()) if api_info else []
                    print(f"[TalkingHead] Named endpoints: {named_endpoints}")
                    print(f"[TalkingHead] Unnamed endpoints: {unnamed_endpoints}")
                    print(f"[TalkingHead] Attempting to use: {space_config['api_name']}")
                except Exception as api_err:
                    print(f"[TalkingHead] Could not get API info: {api_err}")
                params = space_config["params"](image_path, audio_path)
                print(f"[TalkingHead] Sending request with image={image_path}, audio={audio_path}")
                result = client.predict(**params, api_name=space_config["api_name"])
                print(f"[TalkingHead] SUCCESS with {space_config['name']}!")
                print(f"[TalkingHead] Result type: {type(result)}, value: {str(result)[:200]}")
                break  # Success, exit loop
            except Exception as e:
                error_msg = f"{space_config['name']}: {str(e)}"
                errors.append(error_msg)
                print(f"[TalkingHead] FAILED: {error_msg}")
                import traceback
                print(f"[TalkingHead] Traceback: {traceback.format_exc()}")
                continue  # Try next space
        if result is None:
            return f"Error: All talking head spaces failed.\n" + "\n".join(errors)
        # Handle result - could be file path or tuple
        if isinstance(result, str) and os.path.exists(result):
            video_path = result
        elif isinstance(result, tuple) and len(result) > 0:
            # Find the first valid file path in the tuple
            video_path = None
            for item in result:
                if isinstance(item, str) and os.path.exists(item):
                    video_path = item
                    break
        elif isinstance(result, dict) and 'video' in result:
            video_path = result['video']
        else:
            video_path = None
        if video_path and os.path.exists(video_path):
            # Move to output directory
            safe_name = os.path.splitext(os.path.basename(image_path))[0][:30]
            filename = f"{SPFManstate['output_dir']}/talking_head_{safe_name}_{SPFManstate['timestamp']}.mp4"
            shutil.move(video_path, filename)
            # BUGFIX: report the real path — queue code parses it from "saved as ...".
            return f"Talking head video saved as {filename}"
        else:
            return f"Error: Could not process result from talking head generation"
    except Exception as e:
        error_msg = f"Error generating talking head video: {str(e)}"
        if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
            time.sleep(SPFManstate['retry_delay'])
            return SPFMangenerate_talking_head(image_path, audio_path, text, retries + 1)
        SPFManstate["errors"].append(error_msg)
        return error_msg


def SPFManprocess_prompts(prompt_list):
    """Dispatch each (type, prompt) pair to its generator; stop on first error."""
    router = {
        'image': SPFMangenerate_image,
        'audio': SPFMangenerate_audio,
        '3d': SPFMangenerate_3d_model,
        'tts': SPFMangenerate_tts,
        'talking_head': SPFMangenerate_talking_head,
    }
    results = []
    for prompt_type, prompt in prompt_list:
        if prompt_type in router:
            result = router[prompt_type](prompt)
            results.append(result)
            if "Error" in result:
                break  # Stop processing if there's an error
        else:
            error_msg = f"Unknown prompt type: {prompt_type}"
            SPFManstate["errors"].append(error_msg)
            results.append(error_msg)
            break  # Stop processing if there's an error
    return results
def SPFMancreate_files_with_generation(resume=True):
    """Generator that processes the prompt list, yielding a progress report.

    Free-tier mode stops after a single item per call; paid mode processes
    the whole list.  BUGFIX: the paid-API branch now also checkpoints
    SPFManstate['last_file'] and stops on errors — previously paid runs
    never advanced the resume pointer and plowed through failures.
    """
    global SPFManstate, SPFManprompt_list
    results = []
    if resume and SPFManstate["last_file"] < len(SPFManprompt_list):
        start = SPFManstate["last_file"]
        results.append(f"Resuming from item {start + 1}")
    else:
        start = 0
    end = len(SPFManprompt_list)
    for i in range(start, end):
        if i in SPFManstate["skipped_items"]:
            results.append(f"Skipped item {i + 1}")
            continue
        prompt_type, prompt = SPFManprompt_list[i]
        try:
            if not SPFManstate["is_paid_api"]:
                results.append(f"Processing: {prompt_type} - {prompt}")
            generation_results = SPFManprocess_prompts([(prompt_type, prompt)])
            results.extend(generation_results)
            if any("Error" in result for result in generation_results):
                break  # Stop processing if there's an error
            SPFManstate["last_file"] = i + 1
        except Exception as e:
            error_msg = f"Error processing item {i + 1}: {str(e)}"
            SPFManstate["errors"].append(error_msg)
            results.append(error_msg)
            break  # Stop processing if there's an error
        SPFMansave_state()
        yield "\n".join(results)
        if not SPFManstate["is_paid_api"]:
            break  # Stop after processing one item for non-paid API


def SPFManadd_prompt(prompt_type, prompt):
    """Append a (type, prompt) pair to the list and persist state."""
    global SPFManprompt_list
    SPFManprompt_list.append((prompt_type, prompt))
    SPFMansave_state()
    return f"Added {prompt_type}: {prompt}", gr.update(value=len(SPFManprompt_list))


def SPFManclear_prompts():
    """Empty the prompt list and persist state."""
    global SPFManprompt_list
    SPFManprompt_list = []
    SPFMansave_state()
    return "Prompt list cleared", gr.update(value=0)


def SPFManview_all_prompts():
    """Return the prompt list as a numbered, human-readable string."""
    return "\n".join([f"{i+1}. {t}: {p}" for i, (t, p) in enumerate(SPFManprompt_list)])


def SPFManzip_files():
    """Zip all generated media plus the state snapshot and prompt list."""
    SPFManensure_output_directory()
    zip_filename = f"output_{SPFManstate['timestamp']}.zip"
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for root, _, files in os.walk(SPFManstate['output_dir']):
            for file in files:
                zipf.write(os.path.join(root, file))
        # ROBUSTNESS: the state snapshot only exists after a save with the
        # current timestamp — guard so zipping never crashes on a fresh run.
        state_file = f"state_{SPFManstate['timestamp']}.json"
        if os.path.exists(state_file):
            zipf.write(state_file)
        # Add prompt list to zip
        prompt_list_filename = f"prompt_list_{SPFManstate['timestamp']}.txt"
        with open(prompt_list_filename, 'w') as f:
            for t, p in SPFManprompt_list:
                f.write(f"{t}: {p}\n")
        zipf.write(prompt_list_filename)
        os.remove(prompt_list_filename)  # Remove the temporary file
    return f"Files zipped as {zip_filename}"


def SPFMantoggle_paid_api(value):
    """Enable/disable paid-API mode and persist the choice."""
    SPFManstate["is_paid_api"] = value
    SPFMansave_state()
    return f"Paid API: {'Enabled' if value else 'Disabled'}"


def SPFManestimate_cost():
    """Estimate the total paid-API cost for the current prompt list."""
    return f"Estimated cost: ${len(SPFManprompt_list) * SPFManstate['cost_per_item']:.2f}"


def SPFManauto_generate_prompt(prompt_type):
    """Return a random prompt suited to *prompt_type*.

    BUGFIX: the original returned None for any type other than image/audio;
    unknown types (3d, tts, ...) now fall back to a generic subject prompt.
    """
    subjects = ["cat", "dog", "tree", "mountain", "ocean", "city", "person", "flower", "car", "building"]
    styles = ["realistic", "cartoon", "abstract", "vintage", "futuristic", "minimalist", "surreal", "impressionist"]
    actions = ["running", "sleeping", "flying", "dancing", "singing", "jumping", "sitting", "laughing"]
    if prompt_type == "image":
        return f"A {random.choice(styles)} {random.choice(subjects)} {random.choice(actions)}"
    if prompt_type == "audio":
        return f"Sound of a {random.choice(subjects)} {random.choice(actions)}"
    # Fallback for 3d / tts / anything else.
    return f"A {random.choice(styles)} {random.choice(subjects)}"


def SPFManview_config():
    """Return the current state plus prompt-list length as pretty-printed JSON."""
    config = {
        "state": SPFManstate,
        "prompt_list_length": len(SPFManprompt_list),
    }
    return json.dumps(config, indent=2)


def SPFManskip_item():
    """Mark the current item as skipped, advance the pointer, and persist."""
    if SPFManstate["last_file"] < len(SPFManprompt_list):
        SPFManstate["skipped_items"].append(SPFManstate["last_file"])
        SPFManstate["last_file"] += 1
        SPFMansave_state()
        return f"Skipped item {SPFManstate['last_file']}"
    return "No more items to skip"


def SPFManupdate_api_details(provider, cost):
    """Update API provider name and per-item cost, then persist."""
    SPFManstate["api_provider"] = provider
    SPFManstate["cost_per_item"] = float(cost)
    SPFMansave_state()
    return f"API details updated: Provider - {provider}, Cost per item - ${cost}"


def SPFManload_config_file(file):
    """Load configuration from an uploaded .json or .zip file, resetting state first.

    A ZIP is expected to contain a state_*.json snapshot and optionally a
    prompt_list_*.txt file; anything else is treated as a single JSON config.
    """
    global SPFManstate, SPFManprompt_list
    try:
        # Clear existing state and prompt list back to defaults.
        SPFManstate = {
            "last_file": 0,
            "output_dir": "saved_media",  # Same as SAVE_DIR in file_explorer_and_upload.py
            "errors": [],
            "skipped_items": [],
            "is_paid_api": False,
            "cost_per_item": 0.1,
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "api_provider": "default",
            "retry_delay": 300,
            "max_retries": 3,
        }
        SPFManprompt_list = []
        # Check if the file is a ZIP archive
        if file.name.endswith('.zip'):
            # Extract the ZIP file
            extracted_folder_path = 'extracted_files'
            os.makedirs(extracted_folder_path, exist_ok=True)
            with zipfile.ZipFile(file.name, 'r') as zip_ref:
                zip_ref.extractall(extracted_folder_path)
            # Find and load the state JSON file
            json_files = [f for f in os.listdir(extracted_folder_path) if f.startswith('state_') and f.endswith('.json')]
            if json_files:
                json_file_path = os.path.join(extracted_folder_path, json_files[0])
                with open(json_file_path, 'r') as json_file:
                    loaded_data = json.load(json_file)
                SPFManstate.update(loaded_data.get("state", {}))
                SPFManprompt_list = loaded_data.get("prompt_list", [])
            # Find and load the prompt list text file
            # NOTE(review): entries here are appended after the JSON prompt
            # list, which can duplicate prompts if the zip contains both.
            txt_files = [f for f in os.listdir(extracted_folder_path) if f.startswith('prompt_list_') and f.endswith('.txt')]
            if txt_files:
                txt_file_path = os.path.join(extracted_folder_path, txt_files[0])
                with open(txt_file_path, 'r') as txt_file:
                    for line in txt_file:
                        prompt_type, prompt = line.strip().split(': ', 1)
                        SPFManprompt_list.append((prompt_type, prompt))
            # Clean up extracted files
            shutil.rmtree(extracted_folder_path)
        else:
            # Load new configuration from a single file
            SPFManload_state(file.name)
        SPFMansave_state()  # Save the loaded state
        return f"Configuration loaded from {file.name}", gr.update(value=len(SPFManprompt_list))
    except Exception as e:
        return f"Error loading configuration: {str(e)}", gr.update(value=len(SPFManprompt_list))
# Handle JSON input and loading
def SPFManload_json_configuration(json_text):
    """Load state and prompt list from a pasted JSON string.

    BUGFIX/consistency: merge the loaded "state" into the defaults with
    .update() (exactly as SPFManload_state does) instead of replacing the
    dict wholesale — a partial config no longer drops required keys such
    as 'max_retries' or 'retry_delay'.
    """
    global SPFManstate, SPFManprompt_list
    try:
        loaded_data = json.loads(json_text)
        SPFManstate.update(loaded_data.get("state", {}))
        SPFManprompt_list = loaded_data.get("prompt_list", SPFManprompt_list)
        SPFMansave_state()
        return "Configuration loaded from JSON input", gr.update(value=len(SPFManprompt_list))
    except json.JSONDecodeError as e:
        return f"Error parsing JSON: {str(e)}", gr.update(value=len(SPFManprompt_list))
    except Exception as e:
        return f"Unexpected error: {str(e)}", gr.update(value=len(SPFManprompt_list))


# ============================================================
# UNIFIED GENERATION QUEUE SYSTEM
# ============================================================

# Queue state for unified generation
generation_queue = []
generation_log = []


def add_to_generation_queue(prompt_type, prompt, audio_duration=30, audio_steps=100, audio_cfg=7,
                            tts_model="kokoro", tts_voice="af_heart", threeds_guidance=15, threeds_steps=64):
    """Add a single prompt to the generation queue with type-specific parameters.

    Args:
        prompt_type: Type of generation (text, image, audio, tts, 3d)
        prompt: The prompt/text to generate from
        audio_duration: Duration for Stable Audio (seconds)
        audio_steps: Steps for Stable Audio
        audio_cfg: CFG scale for Stable Audio
        tts_model: TTS model (kokoro, supertonic, glm-tts)
        tts_voice: Voice for TTS
        threeds_guidance: Guidance scale for 3D (Shap-E)
        threeds_steps: Steps for 3D generation
    """
    global generation_queue
    if not prompt or not prompt.strip():
        return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", "Please enter a prompt"
    item = {
        "id": len(generation_queue) + 1,
        "type": prompt_type,
        "prompt": prompt.strip(),
        "status": "pending",
        "params": {},
    }
    # Store type-specific parameters
    if prompt_type == "audio":
        item["params"] = {
            "duration": int(audio_duration),
            "steps": int(audio_steps),
            "cfg_scale": float(audio_cfg),
        }
    elif prompt_type == "tts":
        item["params"] = {
            "model": tts_model,
            "voice": tts_voice,
        }
    elif prompt_type == "3d":
        item["params"] = {
            "guidance_scale": float(threeds_guidance),
            "steps": int(threeds_steps),
        }
    generation_queue.append(item)
    return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", f"Added: {prompt_type} prompt"


def get_queue_dataframe():
    """Return queue as dataframe rows for display (prompt truncated to 60 chars)."""
    if not generation_queue:
        return []
    return [[q["id"], q["type"], q["prompt"][:60] + ("..." if len(q["prompt"]) > 60 else ""), q["status"]]
            for q in generation_queue]


def clear_generation_queue():
    """Clear the entire queue and the generation log."""
    global generation_queue, generation_log
    generation_queue = []
    generation_log = []
    return get_queue_dataframe(), "**Queue: 0 items**", get_log_dataframe(), "Queue cleared"


def remove_completed_from_queue():
    """Remove completed and errored items from queue, re-indexing the rest."""
    global generation_queue
    generation_queue = [q for q in generation_queue if q["status"] == "pending"]
    # Re-index
    for i, q in enumerate(generation_queue):
        q["id"] = i + 1
    return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**"


def get_log_dataframe():
    """Return generation log as dataframe rows."""
    if not generation_log:
        return []
    return [[l["id"], l["prompt"][:40], l["status"], l["result"][:60] if l["result"] else ""]
            for l in generation_log]


def extract_prompts_from_config(config_file):
    """Extract media prompts from a game config JSON file.

    Walks two levels of nesting (location -> sub-location) and queues any
    'media' entry that does not look like an existing filename, plus long
    'description' fields as scene prompts.
    """
    global generation_queue
    if config_file is None:
        return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", "No file uploaded"
    try:
        with open(config_file.name, 'r') as f:
            config = json.load(f)
        extracted_count = 0
        # Extensions that indicate an already-generated media file, not a prompt.
        media_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp',
                            '.mp3', '.wav', '.ogg', '.mp4', '.avi', '.mov', '.webm',
                            '.glb', '.gltf', '.obj']
        # Walk through config looking for media fields
        for location_key, location_data in config.items():
            if not isinstance(location_data, dict):
                continue
            for subloc_key, subloc_data in location_data.items():
                if not isinstance(subloc_data, dict):
                    continue
                # Check for media field
                if 'media' in subloc_data:
                    media_list = subloc_data['media']
                    if isinstance(media_list, list):
                        for media_item in media_list:
                            # If it looks like a prompt (not a filename), add to queue
                            if isinstance(media_item, str) and not any(ext in media_item.lower() for ext in media_extensions):
                                generation_queue.append({
                                    "id": len(generation_queue) + 1,
                                    "type": "image",  # Default to image
                                    "prompt": media_item,
                                    "status": "pending",
                                })
                                extracted_count += 1
                # Check for description as potential prompt
                if 'description' in subloc_data and isinstance(subloc_data['description'], str):
                    desc = subloc_data['description']
                    if len(desc) > 20:  # Only meaningful descriptions
                        generation_queue.append({
                            "id": len(generation_queue) + 1,
                            "type": "image",
                            "prompt": f"Scene: {desc[:200]}",
                            "status": "pending",
                        })
                        extracted_count += 1
        return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", f"Extracted {extracted_count} prompts from config"
    except json.JSONDecodeError as e:
        return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", f"Invalid JSON: {str(e)}"
    except Exception as e:
        return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", f"Error: {str(e)}"


def process_queue_item_local(item, text_model, image_model, use_rag, use_streaming, max_tokens=512):
    """Process a single queue item using local ZeroGPU generation.

    Returns (status_text, image_or_None).
    """
    from leveraging_machine_learning import generate_response, generate_image
    if item["type"] == "text":
        # generate_response handles model loading/switching internally;
        # pass model_name so it can switch if needed.
        result = None
        try:
            for r in generate_response(item["prompt"], use_rag, use_streaming,
                                       max_tokens=int(max_tokens), model_name=text_model):
                result = r
            if result:
                return result[0], None  # Return the generated text, no image
            return "No response generated", None
        except Exception as e:
            return f"Error: {str(e)}", None
    elif item["type"] == "image":
        try:
            status, ram, image = generate_image(item["prompt"], image_model)
            if image:
                return status, image  # Return status and actual image
            return f"Error: {status}", None
        except Exception as e:
            return f"Error: {str(e)}", None
    elif item["type"] == "audio":
        # Use the existing SPFMan audio generation with parameters
        params = item.get("params", {})
        result = SPFMangenerate_audio(
            item["prompt"],
            seconds_total=params.get("duration", 30),
            steps=params.get("steps", 100),
            cfg_scale=params.get("cfg_scale", 7),
        )
        return result, None
    elif item["type"] == "3d":
        # Use the SPFMan 3D model generation with parameters
        params = item.get("params", {})
        result = SPFMangenerate_3d_model(
            item["prompt"],
            guidance_scale=params.get("guidance_scale", 15),
            num_steps=params.get("steps", 64),
        )
        return result, None
    elif item["type"] == "tts":
        # Use the SPFMan TTS generation with parameters
        params = item.get("params", {})
        result = SPFMangenerate_tts(
            item["prompt"],
            model=params.get("model", "kokoro"),
            voice=params.get("voice", "af_heart"),
        )
        return result, None
    return f"Unknown type: {item['type']}", None
def process_queue_item_api(item, api_source, hf_model_id, replicate_model):
    """Process a single queue item using API.

    Returns (status_text, None) — API paths save to disk, so no in-memory
    image object is returned.  hf_model_id / replicate_model are currently
    unused placeholders for future per-provider model selection.
    """
    if api_source == "HF Inference":
        # Use existing SPFMan functions which call HF APIs
        if item["type"] == "image":
            result = SPFMangenerate_image(item["prompt"])
            return result, None  # No image object for API (saved to file)
        elif item["type"] == "audio":
            result = SPFMangenerate_audio(item["prompt"])
            return result, None
        elif item["type"] == "3d":
            result = SPFMangenerate_3d_model(item["prompt"])
            return result, None
        elif item["type"] == "tts":
            result = SPFMangenerate_tts(item["prompt"])
            return result, None
        else:
            return "Text generation via HF Inference not implemented yet", None
    elif api_source == "External (Replicate)":
        # Placeholder for Replicate API
        return f"Replicate API not implemented yet for {item['type']}", None
    return f"Unknown API source: {api_source}", None


def retry_failed_items():
    """Reset all error items back to pending so they can be retried."""
    global generation_queue
    retry_count = 0
    for item in generation_queue:
        if item["status"] == "error":
            item["status"] = "pending"
            retry_count += 1
    return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", f"Reset {retry_count} failed items to pending"


def process_next_queue_item(mode, text_model, image_model, use_rag, use_streaming, max_tokens,
                            api_source, hf_model_id, replicate_model):
    """Process the next pending item in the queue.

    Dispatches to local or API processing based on *mode*, records the outcome
    in generation_log, and attempts to extract the generated file path from the
    result text (several message formats are tried, in priority order).

    Returns:
        (text_out, image_out, audio_out, progress, current_item, queue_df, log_df)
    """
    global generation_queue, generation_log
    pending = [q for q in generation_queue if q["status"] == "pending"]
    if not pending:
        completed = len([q for q in generation_queue if q["status"] in ["completed", "error"]])
        total = len(generation_queue)
        return (
            "No pending items in queue",
            None,
            None,
            f"**Progress: {completed}/{total}** - No pending items",
            "Queue empty or all items processed",
            get_queue_dataframe(),
            get_log_dataframe()
        )
    item = pending[0]
    item["status"] = "processing"
    result_text = None
    result_image = None
    result_audio = None
    try:
        if mode == "Local (ZeroGPU)":
            result_text, result_image = process_queue_item_local(item, text_model, image_model, use_rag, use_streaming, max_tokens)
        else:
            result_text, result_image = process_queue_item_api(item, api_source, hf_model_id, replicate_model)
        # Error detection is by substring convention: all generator functions
        # return messages containing "Error" on failure.
        if result_text and "Error" in str(result_text):
            item["status"] = "error"
            generation_log.append({
                "id": item["id"],
                "prompt": item["prompt"][:40],
                "status": "Error",
                "result": str(result_text)
            })
        else:
            item["status"] = "completed"
            # Extract generated filename from result text
            # Multiple formats possible depending on generation method
            # IMPORTANT: Store full path (e.g., "saved_media/file.webp") for media to load correctly
            import re
            result_str = str(result_text) if result_text else ""
            # Try "saved as" format first (used by SPFMan functions)
            # Format: "Image saved as saved_media/filename.webp"
            if "saved as" in result_str.lower():
                # NOTE: (.+?) stops at the first whitespace, so paths are
                # assumed not to contain spaces (prompts are underscored).
                match = re.search(r'saved as\s+(.+?)(?:\s|$)', result_str, re.IGNORECASE)
                if match:
                    filepath = match.group(1).strip()
                    # Store full path for media component to load correctly
                    # If it already has saved_media/, use as-is; otherwise prepend it
                    if filepath.startswith("saved_media/") or filepath.startswith("saved_media\\"):
                        item["generated_file"] = filepath
                    else:
                        item["generated_file"] = f"saved_media/{os.path.basename(filepath)}"
            # Try ": filename" format (used by some local generation)
            # Format: "Image generated with model: filename.png"
            elif ": " in result_str:
                parts = result_str.split(": ")
                if len(parts) >= 2:
                    filename = parts[-1].strip()
                    # Check for valid image/media extensions
                    valid_extensions = ('.png', '.jpg', '.jpeg', '.webp', '.wav', '.mp3', '.ogg', '.glb', '.gltf', '.obj')
                    if filename.endswith(valid_extensions):
                        # Store with saved_media/ prefix for consistency
                        if filename.startswith("saved_media/") or filename.startswith("saved_media\\"):
                            item["generated_file"] = filename
                        else:
                            item["generated_file"] = f"saved_media/{os.path.basename(filename)}"
            # Try to find any file path in the result
            elif not item.get("generated_file"):
                # Look for saved_media/ path pattern
                match = re.search(r'(saved_media/[^\s]+)', result_str)
                if match:
                    item["generated_file"] = match.group(1)  # Keep full path
            generation_log.append({
                "id": item["id"],
                "prompt": item["prompt"][:40],
                "status": "Success",
                "result": str(result_text)[:100] if result_text else "Done"
            })
    except Exception as e:
        item["status"] = "error"
        generation_log.append({
            "id": item["id"],
            "prompt": item["prompt"][:40],
            "status": "Error",
            "result": str(e)
        })
        result_text = f"Error: {str(e)}"
        result_image = None
    completed = len([q for q in generation_queue if q["status"] in ["completed", "error"]])
    total = len(generation_queue)
    progress = f"**Progress: {completed}/{total}**"
    # Return appropriate outputs based on type
    text_out = result_text if item["type"] == "text" else (result_text if result_text else "")
    image_out = result_image  # Actual image object from generation
    # For audio/tts types, extract filepath for audio player
    audio_out = None
    if item["type"] in ["audio", "tts"] and item.get("generated_file"):
        audio_out = item["generated_file"]
    return (
        text_out,
        image_out,
        audio_out,
        progress,
        f"Processed: {item['prompt'][:50]}...",
        get_queue_dataframe(),
        get_log_dataframe()
    )


# ============================================================
# CONFIG-BASED PROMPT BUILDER
# ============================================================

# Store loaded config and parsed sections
loaded_config_for_prompts = None
config_sections = []
section_prompts = {}  # {section_key: [{"type": "image", "prompt": "..."}]}


def load_config_for_prompts(config_json):
    """Parse config and extract all sections.

    Expects two levels of nesting (location -> state); each state dict
    becomes one section keyed as "location → state".
    Returns (gr.update for the section dropdown, status message).
    """
    global loaded_config_for_prompts, config_sections, section_prompts
    if not config_json or not config_json.strip():
        return gr.update(choices=[]), "Please paste a config JSON"
    config_sections = []
    section_prompts = {}
    try:
        config = json.loads(config_json) if isinstance(config_json, str) else config_json
        loaded_config_for_prompts = config
        for location, location_data in config.items():
            if isinstance(location_data, dict):
                for state, state_data in location_data.items():
                    if isinstance(state_data, dict):
                        section = {
                            "location": location,
                            "state": state,
                            "key": f"{location} → {state}",
                            "description": state_data.get("description", ""),
                            "media": state_data.get("media", []),
                            "choices": state_data.get("choices", [])
                        }
                        config_sections.append(section)
        choices = [s["key"] for s in config_sections]
        return gr.update(choices=choices, value=choices[0] if choices else None), f"Loaded {len(config_sections)} sections"
    except json.JSONDecodeError as e:
        return gr.update(choices=[]), f"Invalid JSON: {str(e)}"
    except Exception as e:
        return gr.update(choices=[]), f"Error: {str(e)}"


def get_section_details(section_key):
    """Get (description, media, choices) display strings for a section by key.

    Returns three empty strings when the key is falsy or unknown.
    """
    if not section_key:
        return "", "", ""
    for section in config_sections:
        if section["key"] == section_key:
            desc = section["description"] or "(No description)"
            media = "\n".join(section["media"]) if section["media"] else "(No media)"
            choices = ", ".join(section["choices"]) if section["choices"] else "(No choices)"
            return desc, media, choices
    return "", "", ""


def add_prompt_to_section(section_key, prompt_type, prompt_text):
    """Add a prompt for a specific section; returns (dataframe, status message)."""
    global section_prompts
    if not section_key:
        return get_section_prompts_dataframe(), "Please select a section"
    if not prompt_text or not prompt_text.strip():
        return get_section_prompts_dataframe(), "Please enter a prompt"
    if section_key not in section_prompts:
        section_prompts[section_key] = []
    section_prompts[section_key].append({
        "type": prompt_type,
        "prompt": prompt_text.strip()
    })
    return get_section_prompts_dataframe(), f"Added {prompt_type} prompt to {section_key}"
get_section_prompts_dataframe(), "Please select a section" if not prompt_text or not prompt_text.strip(): return get_section_prompts_dataframe(), "Please enter a prompt" if section_key not in section_prompts: section_prompts[section_key] = [] section_prompts[section_key].append({ "type": prompt_type, "prompt": prompt_text.strip() }) return get_section_prompts_dataframe(), f"Added {prompt_type} prompt to {section_key}" def get_section_prompts_dataframe(): """Return all section prompts as dataframe""" rows = [] for section_key, prompts in section_prompts.items(): for p in prompts: rows.append([section_key, p["type"], p["prompt"][:80] + ("..." if len(p["prompt"]) > 80 else "")]) return rows if rows else [] def auto_generate_section_prompts(): """Auto-generate prompts from descriptions for all sections""" global section_prompts section_prompts = {} count = 0 for section in config_sections: key = section["key"] desc = section["description"] if desc and len(desc) > 20: section_prompts[key] = [{ "type": "image", "prompt": f"Scene illustration: {desc[:200]}" }] count += 1 return get_section_prompts_dataframe(), f"Generated {count} prompts from descriptions" def clear_section_prompts(): """Clear all section prompts""" global section_prompts section_prompts = {} return get_section_prompts_dataframe(), "Cleared all section prompts" def add_all_sections_to_queue(): """Add all section prompts to the generation queue""" global generation_queue, section_prompts if not section_prompts: return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", "No prompts to add" added_count = 0 for section_key, prompts in section_prompts.items(): # Parse location and state from key (format: "location → state") parts = section_key.split(" → ") location = parts[0] if len(parts) > 0 else "unknown" state = parts[1] if len(parts) > 1 else "unknown" for p in prompts: item = { "id": len(generation_queue) + 1, "type": p["type"], "prompt": p["prompt"], "status": "pending", "section": { 
"location": location, "state": state, "source": "config_builder" }, "generated_file": None } generation_queue.append(item) added_count += 1 return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", f"Added {added_count} prompts to queue" def update_config_with_generated_media(): """Update loaded config's media arrays with generated files""" global loaded_config_for_prompts, generation_queue if not loaded_config_for_prompts: return "", "No config loaded. Load a config first using 'Load Config' button above." updated_count = 0 for item in generation_queue: if item["status"] == "completed" and item.get("generated_file"): section = item.get("section") if section: loc = section["location"] state = section["state"] if loc in loaded_config_for_prompts and state in loaded_config_for_prompts[loc]: if "media" not in loaded_config_for_prompts[loc][state]: loaded_config_for_prompts[loc][state]["media"] = [] loaded_config_for_prompts[loc][state]["media"].append(item["generated_file"]) updated_count += 1 return json.dumps(loaded_config_for_prompts, indent=2), f"Updated {updated_count} media entries" # ============================================================ # ONE-CLICK TO GAMEPLAY WORKFLOW FUNCTIONS # ============================================================ # State for one-click workflow otg_workflow_state = { "active": False, "config_json": "", "total_items": 0, "completed_items": 0 } def otg_clear_and_prepare_queue(): """Clear the queue and reset workflow state for a fresh one-click run.""" global generation_queue, generation_log, otg_workflow_state generation_queue = [] generation_log = [] otg_workflow_state = { "active": True, "config_json": "", "total_items": 0, "completed_items": 0 } return get_queue_dataframe(), "Queue cleared, ready for new workflow" def otg_add_prompts_with_section_tracking(prompts_text, config_json, media_type="image"): """Add prompts to queue with section tracking for later config update. 
Args: prompts_text: Text containing prompts (one per line starting with 'Cinematic shot:') config_json: The generated config JSON string to track sections media_type: Type of media to generate ('image', 'audio', '3d', 'tts') """ global generation_queue, loaded_config_for_prompts, otg_workflow_state if not prompts_text or not prompts_text.strip(): return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", "No prompts to add" # Parse config to get state keys for section tracking try: config = json.loads(config_json) if config_json else {} loaded_config_for_prompts = config # Store for later media update otg_workflow_state["config_json"] = config_json # Get list of state IDs in order (excluding 'ending' which has no media prompt) state_keys = [] if "story_location" in config: state_keys = [k for k in config["story_location"].keys() if k != "ending"] except: config = {} state_keys = [] lines = prompts_text.strip().split("\n") added = 0 state_index = 0 for line in lines: line = line.strip() # Skip header lines and empty lines if not line or line.startswith("##") or line.startswith("#"): continue # Accept lines starting with "Cinematic shot:" or any meaningful line if line.startswith("Cinematic shot:") or (len(line) > 10 and "Video Prompts" not in line): # Map to corresponding config state if available section = None if state_index < len(state_keys): state_key = state_keys[state_index] section = { "location": "story_location", "state": state_key, "source": "one_click_gameplay" } item = { "id": len(generation_queue) + 1, "type": media_type, "prompt": line.strip(), "status": "pending", "section": section, "generated_file": None } generation_queue.append(item) added += 1 state_index += 1 otg_workflow_state["total_items"] = len(generation_queue) return get_queue_dataframe(), f"**Queue: {len(generation_queue)} items**", f"Added {added} {media_type} prompts to queue" def otg_process_queue_generator(mode, text_model, image_model, use_rag=False, 
use_streaming=False, max_tokens=512, api_source="HF Inference", hf_model_id="", replicate_model=""): """Generator function that processes queue items one at a time, yielding progress. This is used with Gradio to show live progress during background processing. Yields: (progress_text, current_item_text, queue_dataframe, is_complete, final_config_json, error_details) """ global generation_queue, otg_workflow_state if not otg_workflow_state.get("active", False): yield ("**Workflow not active**", "", get_queue_dataframe(), True, "", "Workflow was not started") return total = len(generation_queue) if total == 0: yield ("**No items to process**", "", get_queue_dataframe(), True, "", "Queue is empty") return error_messages = [] while otg_workflow_state.get("active", False): pending = [q for q in generation_queue if q["status"] == "pending"] if not pending: break # Process one item with error handling try: result = process_next_queue_item( mode, text_model, image_model, use_rag, use_streaming, max_tokens, api_source, hf_model_id, replicate_model ) # Check if this item errored current_item_text = result[3] if len(result) > 3 else "" if "Error" in str(result[0]): error_messages.append(f"Item {len(generation_queue) - len(pending) + 1}: {result[0]}") except Exception as e: error_messages.append(f"Processing error: {str(e)}") current_item_text = f"Error: {str(e)}" # Count completed items completed = len([q for q in generation_queue if q["status"] in ["completed", "error"]]) errors = len([q for q in generation_queue if q["status"] == "error"]) otg_workflow_state["completed_items"] = completed progress_text = f"**Processing: {completed}/{total}**" + (f" ({errors} errors)" if errors > 0 else "") yield (progress_text, current_item_text, get_queue_dataframe(), False, "", "") # All done - update config with generated media paths final_config, update_status = update_config_with_generated_media() completed = len([q for q in generation_queue if q["status"] == "completed"]) errors = 
len([q for q in generation_queue if q["status"] == "error"]) # Build final status message if completed == 0 and errors > 0: final_progress = f"**Failed: All {errors} items had errors**" elif errors > 0: final_progress = f"**Complete with errors: {completed}/{total} succeeded, {errors} failed**" else: final_progress = f"**Complete! {completed}/{total} succeeded**" # Add update status to progress if update_status and "Updated" in update_status: final_progress += f" - {update_status}" otg_workflow_state["active"] = False # Build error details string error_details = "\n".join(error_messages) if error_messages else "" yield (final_progress, update_status, get_queue_dataframe(), True, final_config, error_details) def otg_stop_workflow(): """Stop the one-click workflow processing and update config with partial results.""" global otg_workflow_state otg_workflow_state["active"] = False completed = len([q for q in generation_queue if q["status"] == "completed"]) errors = len([q for q in generation_queue if q["status"] == "error"]) total = len(generation_queue) # Update config with any completed items partial_config, update_status = update_config_with_generated_media() status_msg = f"**Stopped** ({completed} succeeded, {errors} errors out of {total})" if update_status and "Updated" in update_status: status_msg += f" - {update_status}" return status_msg, get_queue_dataframe(), partial_config def otg_get_workflow_status(): """Get current workflow status.""" global otg_workflow_state, generation_queue if not generation_queue: return "Ready", 0, 0 pending = len([q for q in generation_queue if q["status"] == "pending"]) completed = len([q for q in generation_queue if q["status"] in ["completed", "error"]]) total = len(generation_queue) if otg_workflow_state.get("active"): return "Processing", completed, total elif pending == 0 and total > 0: return "Complete", completed, total else: return "Ready", completed, total