import gradio as gr import json import os import tempfile import shutil import zipfile import random from relatively_constant_variables import finished_product_demo, all_states from game_state import GameState, Player from condition_evaluator import ConditionEvaluator, TransitionResolver, EffectApplicator class GameSession: def __init__(self, starting_location='village', starting_state='start'): # NEW: Use GameState for rich state tracking self.game_state = GameState() self.game_state.current_location = starting_location self.game_state.current_state = starting_state self.game_state.money = 20 # Default starting money # Backwards compatibility wrapper self.player = Player(self.game_state) # Aliases for compatibility self.current_location = starting_location self.current_state = starting_state self.game_log = [] # NEW: Evaluation components (initialized lazily for Gradio serialization) self._evaluator = None self._resolver = None self._applicator = None # Apply on_enter effects for starting state self._apply_on_enter_effects() # Reset components so Gradio can pickle the session # (they recreate automatically via lazy properties) self._evaluator = None self._resolver = None self._applicator = None def _ensure_components(self): """Lazily initialize/recreate evaluation components after deserialization.""" if self._evaluator is None or self._resolver is None or self._applicator is None: self._evaluator = ConditionEvaluator(self.game_state) self._resolver = TransitionResolver(self.game_state) self._applicator = EffectApplicator(self.game_state) @property def evaluator(self): self._ensure_components() return self._evaluator @property def resolver(self): self._ensure_components() return self._resolver @property def applicator(self): self._ensure_components() return self._applicator def get_visible_choices(self): """ Get list of choices visible to player based on conditions. Returns list of (index, choice_text, is_locked, lock_reason). """ try: state = all_states[self.current_location][self.current_state] except KeyError: return [] choices = state.get('choices', []) choice_config = state.get('choice_config', {}) visible = [] for idx, choice in enumerate(choices): config = choice_config.get(choice, {}) condition = config.get('condition') if condition: is_visible = self.evaluator.evaluate(condition) if is_visible: # Show the choice (possibly with override text) display_text = config.get('visible_text', choice) visible.append((idx, display_text, False, None)) else: # Optionally show locked state hidden_text = config.get('hidden_text') if hidden_text: visible.append((idx, hidden_text, True, "Conditions not met")) # If no hidden_text, choice is completely hidden else: # No condition = always visible (backwards compatible) visible.append((idx, choice, False, None)) return visible def get_available_choice_indices(self): """Get indices of choices that are currently available (not locked).""" return [idx for idx, _, is_locked, _ in self.get_visible_choices() if not is_locked] def _apply_on_enter_effects(self): """Apply effects triggered on entering a state.""" try: state = all_states[self.current_location][self.current_state] on_enter = state.get('on_enter') if on_enter: self.applicator.apply(on_enter) # Auto-track location visit self.game_state.visit_location(self.current_location) except KeyError: pass def _check_encounter_injection(self, state: dict) -> bool: """ Check if a random encounter should be injected. Returns True if encounter was triggered (navigation handled). """ encounter = state.get('encounter_chance') if not encounter: return False probability = encounter.get('probability', 0) # Check bypass conditions bypass = encounter.get('bypass_conditions') if bypass and self.evaluator.evaluate(bypass): return False # Roll for encounter if random.random() < probability: pool = encounter.get('pool', []) if pool: encounter_state = random.choice(pool) # Store return state for after encounter self.game_state.set_flag('_in_encounter', True) self.game_state.update_knowledge('_return_state', f"{self.current_location}_{self.current_state}") self._navigate_to_state(encounter_state) self._apply_on_enter_effects() return True return False def _navigate_to_state(self, next_state: str): """ Navigate to a state, handling location/state (slash) and location_state (underscore) formats. """ # First, check for slash notation: "location/state" if '/' in next_state: parts = next_state.split('/', 1) potential_location = parts[0] potential_state = parts[1] if potential_location in all_states and potential_state in all_states[potential_location]: self.current_location = potential_location self.current_state = potential_state else: self.game_log.append(f"ERROR: Invalid transition target '{next_state}'") return elif '_' in next_state: parts = next_state.split('_', 1) potential_location = parts[0] potential_state = parts[1] if potential_location in all_states and potential_state in all_states[potential_location]: self.current_location = potential_location self.current_state = potential_state elif next_state in all_states.get(self.current_location, {}): self.current_state = next_state else: # Try to find valid split point found = False for i in range(len(next_state)): if next_state[i] == '_': loc = next_state[:i] st = next_state[i+1:] if loc in all_states and st in all_states[loc]: self.current_location = loc self.current_state = st found = True break if not found: self.game_log.append(f"ERROR: Invalid transition target '{next_state}'") return else: if next_state in all_states.get(self.current_location, {}): self.current_state = next_state else: self.game_log.append(f"ERROR: State '{next_state}' not found") return # Sync with game_state self.game_state.current_location = self.current_location self.game_state.current_state = self.current_state def save_game(self) -> str: """Serialize game state for saving.""" return self.game_state.to_json() def load_game_state(self, save_data: str) -> None: """Restore game state from save.""" self.game_state = GameState.from_json(save_data) self.player = Player(self.game_state) self.current_location = self.game_state.current_location self.current_state = self.game_state.current_state # Reset components so they get recreated with new game_state self._evaluator = None self._resolver = None self._applicator = None def make_choice(self, choice_index): """ Process a player choice with full condition/transition/effect handling. Supports: conditional choices, dynamic transitions, declarative effects, encounters. """ try: state = all_states[self.current_location][self.current_state] except KeyError as e: error_msg = f"ERROR: Cannot find state '{self.current_location}_{self.current_state}'" self.game_log.append(error_msg) return error_msg, [], "\n".join(self.game_log) choices = state.get('choices', []) if not (0 <= choice_index < len(choices)): return "Invalid choice. Please try again.", choices, "\n".join(self.game_log) choice = choices[choice_index] # NEW: Verify choice is currently visible/allowed choice_config = state.get('choice_config', {}) if choice in choice_config: condition = choice_config[choice].get('condition') if condition and not self.evaluator.evaluate(condition): error_msg = f"Choice '{choice}' is not currently available." self.game_log.append(error_msg) return state['description'], choices, "\n".join(self.game_log) # Validate transition exists if choice not in state.get('transitions', {}): error_msg = f"ERROR: No transition defined for choice '{choice}'" self.game_log.append(error_msg) return state['description'], choices, "\n".join(self.game_log) # Log the choice self.game_log.append(f"You chose: {choice}") self.game_log.append(state['description']) # Record choice in history self.game_state.record_choice( f"{self.current_location}_{self.current_state}", choice ) # NEW: Apply declarative effects first effects = state.get('effects', {}) if choice in effects: self.applicator.apply(effects[choice]) # EXISTING: Execute lambda consequences (backwards compatibility) if 'consequences' in state and choice in state['consequences']: consequence = state['consequences'][choice] if consequence: if callable(consequence): consequence(self.player) # Skip if it's not callable (string, etc.) # NEW: Check for random encounter injection if self._check_encounter_injection(state): return self.get_current_state_info() # NEW: Resolve dynamic transition (supports random, conditional) transition_spec = state['transitions'][choice] try: if isinstance(transition_spec, str): # Simple string transition (backwards compatible) next_state = transition_spec else: # Dynamic transition (dict with random/conditional logic) next_state = self.resolver.resolve(transition_spec) except ValueError as e: error_msg = f"ERROR: Transition resolution failed - {str(e)}" self.game_log.append(error_msg) return state['description'], choices, "\n".join(self.game_log) # Navigate to next state self._navigate_to_state(next_state) # NEW: Apply on_enter effects for new state self._apply_on_enter_effects() return self.get_current_state_info() def make_choice_old(self, choice_index): """Legacy make_choice for reference - can be removed later.""" try: state = all_states[self.current_location][self.current_state] except KeyError as e: error_msg = f"ERROR: Cannot find state '{self.current_location}_{self.current_state}'" self.game_log.append(error_msg) return error_msg, [], "\n".join(self.game_log) if 0 <= choice_index < len(state['choices']): choice = state['choices'][choice_index] if choice not in state['transitions']: error_msg = f"ERROR: No transition defined for choice '{choice}'" self.game_log.append(error_msg) return state['description'], state['choices'], "\n".join(self.game_log) next_state = state['transitions'][choice] self.game_log.append(f"You chose: {choice}") self.game_log.append(state['description']) if 'consequences' in state and choice in state['consequences']: if state['consequences'][choice]: state['consequences'][choice](self.player) self._navigate_to_state(next_state) return self.get_current_state_info() else: return "Invalid choice. Please try again." def _format_description(self, description): """Replace placeholders in description with actual GameState values.""" gs = self.game_state replacements = { '{inventory}': ', '.join(gs.inventory) if gs.inventory else '(empty)', '{money}': str(gs.money), '{flags}': ', '.join(f"{k}" for k, v in gs.flags.items() if v) if gs.flags else '(none)', '{counters}': ', '.join(f"{k}={v}" for k, v in gs.counters.items()) if gs.counters else '(none)', '{people_met}': ', '.join(gs.people_met) if gs.people_met else '(none)', '{locations_visited}': ', '.join(gs.locations_visited) if gs.locations_visited else '(none)', '{current_location}': gs.current_location, '{current_state}': gs.current_state, '{missions_active}': ', '.join(gs.missions_active.keys()) if gs.missions_active else '(none)', '{missions_completed}': ', '.join(gs.missions_completed) if gs.missions_completed else '(none)', } for placeholder, value in replacements.items(): description = description.replace(placeholder, value) return description def get_current_state_info(self): try: state = all_states[self.current_location][self.current_state] description = self._format_description(state['description']) choices = [f"{idx + 1}. {choice}" for idx, choice in enumerate(state['choices'])] return description, choices, "\n".join(self.game_log) except KeyError: error_msg = f"ERROR: State '{self.current_location}_{self.current_state}' not found in config" self.game_log.append(error_msg) return error_msg, [], "\n".join(self.game_log) def get_current_state_media(self): try: media = all_states[self.current_location][self.current_state]['media'] return media except KeyError: return [] def start_game(starting_location='village', starting_state='start', new_states=all_states): global all_states game_session = GameSession(starting_location, starting_state) description, choices, game_log = game_session.get_current_state_info() all_states = new_states return description, choices, game_log, game_session def make_choice(choice, game_session, with_media=False): #Calls the nested make choice function in the game session class if not choice: description, choices, game_log = game_session.get_current_state_info() return description, choices, "Please select a choice before proceeding.", game_session choice_index = int(choice.split('.')[0]) - 1 result = game_session.make_choice(choice_index) if with_media: media = game_session.get_current_state_media() return result[0], gr.update(choices=result[1]), result[2], game_session, media else: return result[0], gr.update(choices=result[1]), result[2], game_session def validate_transitions(all_states): errors = [] for location, states in all_states.items(): for state_key, state in states.items(): for transition_key, transition_state in state.get('transitions', {}).items(): # Skip complex transitions (random, conditional) - they're validated at runtime if not isinstance(transition_state, str): continue # Check if the transition is to another location # Support both slash notation (location/state) and underscore notation (location_state) if transition_state in all_states: # Direct location name - assume first state or 'start' trans_location = transition_state # Find first state in that location first_state = next(iter(all_states[trans_location].keys()), 'start') trans_state = first_state elif '/' in transition_state: # Slash notation: "location/state" parts = transition_state.split('/', 1) trans_location, trans_state = parts[0], parts[1] elif '_' in transition_state and transition_state.rsplit('_', 1)[0] in all_states: # Underscore notation only if the prefix is a valid location trans_location, trans_state = transition_state.rsplit('_', 1) else: # Same location, just state name trans_location, trans_state = location, transition_state # Validate the transition state if trans_location not in all_states or trans_state not in all_states[trans_location]: errors.append(f"Invalid transition from {location}.{state_key} to {trans_location}.{trans_state}") return errors # Lazy initialization - don't validate at import time _path_errors_cache = None def get_path_errors(): """Get path errors lazily - only validates when first called.""" global _path_errors_cache if _path_errors_cache is None: _path_errors_cache = validate_transitions(all_states) return _path_errors_cache # For backwards compatibility path_errors = [] # Empty by default, call get_path_errors() to validate def sanitize_config_for_serialization(config): """ Remove non-serializable elements (lambdas) from config. Gradio's gr.State uses pickle for serialization, and lambda functions cannot be pickled. This function replaces callable consequences with None so the config can be safely serialized. The new 'effects' system handles state changes declaratively anyway. """ for location, states in config.items(): if not isinstance(states, dict): continue for state_key, state in states.items(): if not isinstance(state, dict): continue if 'consequences' in state and isinstance(state['consequences'], dict): for choice, consequence in list(state['consequences'].items()): if callable(consequence): state['consequences'][choice] = None return config def load_game(custom_config=None, with_media=False): global all_states if not custom_config: return gr.update(value="No custom configuration provided."), None, None, None, None, None, None try: new_config = json.loads(custom_config) # Sanitize to remove lambdas that can't be pickled by Gradio new_config = sanitize_config_for_serialization(new_config) all_states = new_config # Determine the starting location and state starting_location = next(iter(all_states.keys())) starting_state = next(iter(all_states[starting_location].keys())) print(f"Starting location: {starting_location}, Starting state: {starting_state}") game_session = GameSession(starting_location, starting_state) description, choices, game_log = game_session.get_current_state_info() new_path_errors = validate_transitions(all_states) output_media = [] if with_media: media_list = all_states[starting_location][starting_state].get('media', []) print(f"Media list: {media_list}") if media_list: for media_path in media_list: #media_component = create_media_component(media_path) output_media.append(media_path) print(f"Created {len(output_media)} media components") success_message = f"Custom configuration loaded successfully!\n{new_path_errors}" return ( gr.update(value=success_message), game_log, description, gr.update(choices=choices), gr.update(value=custom_config), game_session, output_media if with_media else None ) except json.JSONDecodeError as e: error_message = format_json_error(custom_config, e) return gr.update(value=error_message), None, None, None, gr.update(value=custom_config), None, None except Exception as e: error_message = f"Error loading custom configuration: {str(e)}" return gr.update(value=error_message), None, None, None, gr.update(value=custom_config), None, None def load_game_edit_version(custom_config=None, with_media=False, custom_starting_location=None, custom_starting_state=None): global all_states if not custom_config: return gr.update(value="No custom configuration provided."), None, None, None, None, None, None try: new_config = json.loads(custom_config) # Sanitize to remove lambdas that can't be pickled by Gradio new_config = sanitize_config_for_serialization(new_config) all_states = new_config # Determine the starting location and state if custom_starting_location and custom_starting_state: if custom_starting_location not in all_states or custom_starting_state not in all_states[custom_starting_location]: raise ValueError(f"Invalid custom starting point: {custom_starting_location}, {custom_starting_state}") starting_location = custom_starting_location starting_state = custom_starting_state else: starting_location = next(iter(all_states.keys())) starting_state = next(iter(all_states[starting_location].keys())) print(f"Starting location: {starting_location}, Starting state: {starting_state}") game_session = GameSession(starting_location, starting_state) description, choices, game_log = game_session.get_current_state_info() new_path_errors = validate_transitions(all_states) output_media = [] if with_media: media_list = all_states[starting_location][starting_state].get('media', []) print(f"Media list: {media_list}") if media_list: for media_path in media_list: output_media.append(media_path) print(f"Created {len(output_media)} media components") success_message = f"Custom configuration loaded successfully!\n{new_path_errors}" return ( gr.update(value=success_message), game_log, description, gr.update(choices=choices), gr.update(value=custom_config), game_session, output_media if with_media else None ) except json.JSONDecodeError as e: error_message = format_json_error(custom_config, e) return gr.update(value=error_message), None, None, None, gr.update(value=custom_config), None, None except Exception as e: error_message = f"Error loading custom configuration: {str(e)}" return gr.update(value=error_message), None, None, None, gr.update(value=custom_config), None, None media_folder = os.path.abspath("saved_media") #make sure same as SAVE_DIR below def export_config_with_media(config_json): global media_folder """ Export the config JSON and zip it along with any files referenced in the media fields. :param config_json: JSON string containing the config :param media_folder: Path to the folder containing media files :return: Path to the created zip file """ # Parse the JSON config = json.loads(config_json) # Create a temporary directory to store files for zipping with tempfile.TemporaryDirectory() as temp_dir: # Save the config JSON to the temp directory config_path = os.path.join(temp_dir, 'config.json') with open(config_path, 'w') as f: json.dump(config, f, indent=2) # Collect all media files media_files = set() for location in config.values(): if isinstance(location, dict): for sublocation in location.values(): if isinstance(sublocation, dict) and 'media' in sublocation: media_files.update(sublocation['media']) # Copy media files to the temp directory for media_file in media_files: src_path = os.path.join(media_folder, media_file) if os.path.exists(src_path): dst_path = os.path.join(temp_dir, media_file) shutil.copy2(src_path, dst_path) else: print(f"Warning: Media file not found: {media_file}") # Create a zip file zip_path = os.path.join(os.path.dirname(media_folder), 'config_with_media.zip') with zipfile.ZipFile(zip_path, 'w') as zipf: for root, _, files in os.walk(temp_dir): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, temp_dir) zipf.write(file_path, arcname) return zip_path def format_json_error(config, error): lineno, colno = error.lineno, error.colno lines = config.split('\n') error_line = lines[lineno - 1] if lineno <= len(lines) else "" pointer = ' ' * (colno - 1) + '^' return f"""Invalid JSON format in custom configuration: Error at line {lineno}, column {colno}: {error_line} {pointer} Error details: {str(error)}""" def display_website(link): html = f"" gr.Info("If 404 then the space/page has probably been disabled - normally due to a better alternative") return html # ==================== ENHANCED PLAYTEST FUNCTIONS ==================== def get_all_states_from_config(config_json): """ Extract all state IDs from a config for the 'Jump to State' dropdown. Returns list of "location_state" strings and a dict mapping them to display names. """ if not config_json: return [], {} try: data = json.loads(config_json) except json.JSONDecodeError: return [], {} states = [] state_display = {} for location, location_data in data.items(): if not isinstance(location_data, dict): continue for state_name, state_data in location_data.items(): if not isinstance(state_data, dict): continue state_id = f"{location}_{state_name}" # Create a display name with description preview desc = state_data.get('description', '')[:40] desc_preview = f"{desc}..." if len(state_data.get('description', '')) > 40 else desc display_name = f"{state_id}: {desc_preview}" states.append(state_id) state_display[state_id] = display_name return states, state_display def jump_to_state(config_json, state_id, with_media=False): """ Jump directly to a specific state without playing through. Useful for testing specific parts of the game. """ global all_states if not config_json or not state_id: return ( gr.update(value="Please load a config and select a state"), None, None, None, None, None, None, None ) try: new_config = json.loads(config_json) all_states = new_config # Parse state_id into location and state if '_' in state_id: parts = state_id.split('_', 1) location = parts[0] state = parts[1] # Handle nested underscores - find the valid split if location not in all_states or state not in all_states.get(location, {}): # Try other split points found = False for i in range(len(state_id)): if state_id[i] == '_': loc = state_id[:i] st = state_id[i+1:] if loc in all_states and st in all_states.get(loc, {}): location = loc state = st found = True break if not found: return ( gr.update(value=f"State '{state_id}' not found in config"), None, None, None, None, None, None, None ) else: # No underscore - assume first location location = next(iter(all_states.keys())) state = state_id game_session = GameSession(location, state) game_session.game_log.append(f"[JUMPED TO: {location}_{state}]") description, choices, game_log = game_session.get_current_state_info() output_media = [] if with_media: media_list = all_states[location][state].get('media', []) output_media = media_list if media_list else [] current_state_display = f"{location}_{state}" return ( gr.update(value=f"Jumped to state: {location}_{state}"), game_log, description, gr.update(choices=choices), gr.update(value=config_json), game_session, output_media if with_media else None, current_state_display ) except Exception as e: return ( gr.update(value=f"Error jumping to state: {str(e)}"), None, None, None, None, None, None, None ) def hot_reload_config(config_json, game_session, with_media=False): """ Reload config while preserving current game state. Updates the config but keeps player at their current position. """ global all_states if not config_json: return ( gr.update(value="No config to reload"), None, None, None, None, None, None, None ) if game_session is None: # No active session, do a normal load return load_game(config_json, with_media) + (None,) try: new_config = json.loads(config_json) # Remember current position current_location = game_session.current_location current_state = game_session.current_state # Check if current position still exists in new config if current_location in new_config and current_state in new_config.get(current_location, {}): # Update global state all_states = new_config # Get updated state info description, choices, game_log = game_session.get_current_state_info() game_session.game_log.append(f"[CONFIG RELOADED - staying at {current_location}_{current_state}]") output_media = [] if with_media: media_list = all_states[current_location][current_state].get('media', []) output_media = media_list if media_list else [] current_state_display = f"{current_location}_{current_state}" return ( gr.update(value=f"Config reloaded! Stayed at: {current_location}_{current_state}"), "\n".join(game_session.game_log), description, gr.update(choices=choices), gr.update(value=config_json), game_session, output_media if with_media else None, current_state_display ) else: # Current position doesn't exist, restart from beginning all_states = new_config starting_location = next(iter(all_states.keys())) starting_state = next(iter(all_states[starting_location].keys())) new_session = GameSession(starting_location, starting_state) new_session.game_log.append(f"[CONFIG RELOADED - previous state not found, restarted]") description, choices, game_log = new_session.get_current_state_info() output_media = [] if with_media: media_list = all_states[starting_location][starting_state].get('media', []) output_media = media_list if media_list else [] current_state_display = f"{starting_location}_{starting_state}" return ( gr.update(value=f"Config reloaded! Previous state not found, restarted at: {starting_location}_{starting_state}"), "\n".join(new_session.game_log), description, gr.update(choices=choices), gr.update(value=config_json), new_session, output_media if with_media else None, current_state_display ) except json.JSONDecodeError as e: error_message = format_json_error(config_json, e) return (gr.update(value=error_message), None, None, None, gr.update(value=config_json), None, None, None) except Exception as e: return (gr.update(value=f"Error reloading config: {str(e)}"), None, None, None, None, None, None, None) def get_current_state_id(game_session): """Get the current state ID from a game session.""" if game_session is None: return "No active game" return f"{game_session.current_location}_{game_session.current_state}" def make_choice_with_state_display(choice, game_session, with_media=False): """ Enhanced make_choice that also returns the current state ID. """ # Handle case where game session doesn't exist if game_session is None: error_msg = "No game loaded. Please load a config first." if with_media: return error_msg, gr.update(choices=[]), error_msg, None, [], "No game loaded" return error_msg, gr.update(choices=[]), error_msg, None, "No game loaded" if not choice: description, choices, game_log = game_session.get_current_state_info() current_state = get_current_state_id(game_session) if with_media: return description, choices, "Please select a choice before proceeding.", game_session, [], current_state return description, choices, "Please select a choice before proceeding.", game_session, current_state choice_index = int(choice.split('.')[0]) - 1 result = game_session.make_choice(choice_index) current_state = get_current_state_id(game_session) if with_media: media = game_session.get_current_state_media() return result[0], gr.update(choices=result[1]), result[2], game_session, media, current_state else: return result[0], gr.update(choices=result[1]), result[2], game_session, current_state def export_playthrough_log(game_session): """ Export the current playthrough as a test case / documentation. """ if game_session is None: return "No active game session to export." log_lines = [ "# Playthrough Log", f"## Final State: {game_session.current_location}_{game_session.current_state}", "", "## Player Status", f"- Inventory: {game_session.player.inventory}", f"- Money: {game_session.player.money}", f"- Knowledge: {list(game_session.player.knowledge.keys())}", "", "## Game Log", "```" ] log_lines.extend(game_session.game_log) log_lines.append("```") return "\n".join(log_lines) # Lazy initialization - don't start games at import time _initgameinfo_cache = None _fpeinitgameinfo_cache = None def get_initgameinfo(): """Get initial game info lazily.""" global _initgameinfo_cache if _initgameinfo_cache is None: _initgameinfo_cache = start_game() return _initgameinfo_cache def get_fpeinitgameinfo(): """Get finished product example game info lazily.""" global _fpeinitgameinfo_cache if _fpeinitgameinfo_cache is None: _fpeinitgameinfo_cache = start_game(new_states=finished_product_demo) return _fpeinitgameinfo_cache # For backwards compatibility - these will be populated on first access via the functions above initgameinfo = None fpeinitgameinfo = None