#!/usr/bin/env python3
"""RalphOrchestrator - Core execution logic for Ralph autonomous development agent.
NOTE: This module is the documented exception to the ~500 line target (TASK-015).
As the core orchestrator containing the main RalphOrchestrator class with all CLI
flag handling and execution logic, it is expected to be the largest module in the
codebase.
"""
import datetime
import json
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Import configuration
from .config import CONF
# Import logging
from .logger import Logger
# Import shell
from .shell import Shell
# Import PRD utilities
from .prd import PRDManager, JsonUtils
# Import templates
from .templates import TemplateManager
# Import agents
from .agents import get_agent
from .agents.base import AgentError
# Import hooks
from .hooks import HookManager, Event, EventType
[docs]
class RalphOrchestrator:
def __init__(self, agent_name: str = "claude", enable_hooks: bool = True, enabled_hook_names: Optional[List[str]] = None,
             intent: Optional[str] = None, intent_file: Optional[str] = None, prompt_file: Optional[str] = None,
             enhance_intent: bool = False, enhance_intent_strict: bool = False,
             tree_depth: int = 2, tree_ignore: Optional[List[str]] = None,
             test_cmd: Optional[str] = None, skip_verify: bool = False, retries: Optional[int] = None,
             timeout: Optional[int] = None, only: Optional[List[str]] = None, except_tasks: Optional[List[str]] = None,
             resume: Optional[str] = None, include: Optional[List[str]] = None, exclude: Optional[List[str]] = None,
             context_limit: Optional[int] = None,
             model: Optional[str] = None, temperature: Optional[float] = None,
             max_tokens: Optional[int] = None, seed: Optional[int] = None,
             log_file: Optional[str] = None, log_level: Optional[str] = None,
             json_output: bool = False, ndjson_output: bool = False,
             print_prd: bool = False, prd_out: Optional[str] = None, archive: bool = True,
             non_interactive: bool = False, ci: bool = False, status_check: bool = False,
             pre: Optional[List[str]] = None, post: Optional[List[str]] = None,
             plugin: Optional[List[str]] = None,
             schema: Optional[str] = None, min_criteria: Optional[int] = None,
             label: Optional[List[str]] = None, revise_prd: bool = False) -> None:
    """Build the orchestrator: agent, hook system, plugins, flags and PRD manager.

    Side effects: exits the process with status 1 when the selected agent's
    ``check_dependencies()`` returns a falsy value, calls
    ``CONF.ensure_directories()``, and imports/registers any plugins listed
    in ``plugin``.

    Args:
        agent_name: Agent identifier passed to ``get_agent``.
        enable_hooks: When False, the ``HookManager`` is disabled.
        enabled_hook_names: When given (and hooks are enabled), only these
            hooks are enabled.
        timeout: Agent timeout passed to ``get_agent`` as
            ``timeout_seconds``; ``CONF.TIMEOUT_SECONDS`` is used when None.
        model, temperature, max_tokens, seed: Forwarded to ``get_agent``.
        pre, post: Shell commands run before / after each phase.
        plugin: Plugin ``.py`` files or directories to load.
        schema, min_criteria, label, revise_prd: PRD validation and
            post-processing options used by the planner phase.

        All remaining arguments mirror CLI flags and are stored as private
        attributes for use by the later phases.
    """
    # Use --timeout override if provided, otherwise use config default
    agent_timeout = timeout if timeout is not None else CONF.TIMEOUT_SECONDS
    self.agent = get_agent(agent_name, timeout_seconds=agent_timeout,
                           model=model, temperature=temperature,
                           max_tokens=max_tokens, seed=seed)
    # Optional agent capabilities: only wire them up when the agent supports them.
    if hasattr(self.agent, 'set_logger'):
        self.agent.set_logger(Logger)
    if hasattr(self.agent, 'set_config'):
        self.agent.set_config(CONF)
    if not self.agent.check_dependencies():
        Logger.info(f"❌ Agent '{self.agent.get_name()}' dependencies not satisfied.", "RED")
        sys.exit(1)
    CONF.ensure_directories()
    # Initialize hook system
    self.hooks = HookManager(CONF.HOOKS_DIR, Logger)
    if not enable_hooks:
        self.hooks.disable()
    elif enabled_hook_names is not None:
        self.hooks.set_enabled_hooks(enabled_hook_names)
    # Store intent flags for non-interactive runs
    self._intent = intent
    self._intent_file = intent_file
    # Backwards compatibility for legacy flag name used in tests
    self._intent_file_override = intent_file
    self._prompt_file_override = prompt_file
    self._enhance_intent = enhance_intent
    self._enhance_intent_strict = enhance_intent_strict
    # Store architect control flags
    self._tree_depth = tree_depth
    self._tree_ignore = tree_ignore
    # memory feature removed; maintain compatibility without memory exports
    # Store execution and verification flags
    self._test_cmd_override = test_cmd
    self._skip_verify = skip_verify
    self._retries_override = retries
    self._timeout_override = timeout
    self._only_tasks = only
    self._except_tasks = except_tasks
    self._resume_from = resume
    # Store context control flags
    self._include_patterns = include
    self._exclude_patterns = exclude
    self._context_limit = context_limit
    # Store I/O, logging and output flags
    self._log_file = log_file
    self._log_level = log_level
    self._json_output = json_output
    self._ndjson_output = ndjson_output
    self._print_prd_flag = print_prd
    self._prd_out = prd_out
    self._archive = archive
    # Store headless operation flags
    self._non_interactive = non_interactive
    self._ci = ci
    self._status_check = status_check
    # Store extensibility and hook flags
    self._pre_commands = pre or []
    self._post_commands = post or []
    self._plugin_paths = plugin or []
    # Load plugins if specified. Must run after self.hooks and
    # self._plugin_paths are set: plugin loading reads both.
    self._load_plugins()
    # Store PRD and story control flags
    self._schema_path = schema
    self._min_criteria = min_criteria
    self._labels = label or []
    self._revise_prd = revise_prd
    # Initialize PRD manager for consolidated file operations
    self._prd = PRDManager(CONF.PRD_FILE)
def _load_plugins(self) -> None:
"""
Load plugins from specified paths.
Plugins are Python files or directories containing hook definitions.
Each plugin can register hooks programmatically via the HookManager API.
"""
for plugin_path_str in self._plugin_paths:
plugin_path = Path(plugin_path_str)
if not plugin_path.exists():
Logger.warning(f"Plugin path not found: {plugin_path}")
continue
if plugin_path.is_file() and plugin_path.suffix == '.py':
self._load_plugin_file(plugin_path)
elif plugin_path.is_dir():
# Load all .py files in the directory
for py_file in plugin_path.glob('*.py'):
if not py_file.name.startswith('_'):
self._load_plugin_file(py_file)
else:
Logger.warning(f"Invalid plugin path (must be .py file or directory): {plugin_path}")
def _load_plugin_file(self, path: Path) -> None:
    """
    Import one plugin module and register its ``on_event`` handler as a hook.

    The plugin module must define:
    - EVENTS: list of event names to subscribe to
    - on_event(event): the handler function
    and may define PRIORITY (default 100), TIMEOUT (default 5.0) and
    MODIFIES_DATA (default False). The hook is registered under the name
    ``plugin_<file stem>``. Any exception raised while importing or
    registering is logged as a warning rather than propagated.

    Args:
        path: Location of the plugin ``.py`` file.
    """
    try:
        import importlib.util

        loader_spec = importlib.util.spec_from_file_location(path.stem, path)
        if loader_spec is None or loader_spec.loader is None:
            Logger.warning(f"Could not load plugin: {path}")
            return
        plugin_module = importlib.util.module_from_spec(loader_spec)
        # Executes the plugin's top-level code.
        loader_spec.loader.exec_module(plugin_module)

        if not all(hasattr(plugin_module, attr) for attr in ('EVENTS', 'on_event')):
            Logger.warning(f"Plugin '{path.name}' missing EVENTS or on_event")
            return

        registered = self.hooks.register_hook(
            name=f"plugin_{path.stem}",
            handler=getattr(plugin_module, 'on_event'),
            events=getattr(plugin_module, 'EVENTS', []),
            priority=getattr(plugin_module, 'PRIORITY', 100),
            timeout=getattr(plugin_module, 'TIMEOUT', 5.0),
            modifies_data=getattr(plugin_module, 'MODIFIES_DATA', False),
        )
        if not registered:
            Logger.warning(f"Failed to register plugin: {path.name}")
        else:
            Logger.debug(f"Loaded plugin: {path.name}")
    except Exception as exc:
        Logger.warning(f"Error loading plugin '{path.name}': {exc}")
def _run_pre_commands(self, phase: str) -> bool:
"""
Run pre-execution commands before a phase.
Pre-commands are shell commands executed before each phase.
If any command fails (non-zero exit code), the phase is aborted.
Args:
phase: The phase about to run (architect, planner, execute)
Returns:
True if all commands succeeded, False if any failed
"""
if not self._pre_commands:
return True
Logger.debug(f"Running {len(self._pre_commands)} pre-command(s) for {phase} phase")
for cmd in self._pre_commands:
Logger.debug(f" Pre-command: {cmd}")
stdout, stderr, code = Shell.run(cmd, timeout=60)
if code != 0:
Logger.error(f"Pre-command failed: {cmd}")
Logger.error(f" Exit code: {code}")
if stderr:
Logger.error(f" Stderr: {stderr[:500]}")
self.hooks.emit(Event(
EventType.ERROR,
phase=phase,
metadata={"reason": "pre_command_failed", "command": cmd, "exit_code": code}
))
return False
if stdout and Logger.verbosity >= 2:
Logger.trace(f" Output: {stdout[:200]}")
return True
def _run_post_commands(self, phase: str, success: bool) -> None:
"""
Run post-execution commands after a phase.
Post-commands are shell commands executed after each phase completes.
They receive the phase result via environment variables.
Args:
phase: The phase that just completed (architect, planner, execute)
success: Whether the phase completed successfully
Security Note:
Uses shell=True for command execution. Commands are sourced from
user-controlled configuration (--post-command flag), so command
injection risk is accepted as the user controls their own config.
Environment variables RALPH_PHASE and RALPH_SUCCESS are set with
sanitized values (fixed strings and booleans only).
"""
if not self._post_commands:
return
import os
# Set environment variables for post-commands
env = os.environ.copy()
env['RALPH_PHASE'] = phase
env['RALPH_SUCCESS'] = '1' if success else '0'
Logger.debug(f"Running {len(self._post_commands)} post-command(s) for {phase} phase")
for cmd in self._post_commands:
Logger.debug(f" Post-command: {cmd}")
try:
result = subprocess.run(
cmd, shell=True, capture_output=True,
text=True, encoding='utf-8', timeout=60,
env=env
)
if result.returncode != 0:
Logger.warning(f"Post-command failed: {cmd} (exit code: {result.returncode})")
elif result.stdout and Logger.verbosity >= 2:
Logger.trace(f" Output: {result.stdout[:200]}")
except subprocess.TimeoutExpired:
Logger.warning(f"Post-command timed out: {cmd}")
except Exception as e:
Logger.warning(f"Post-command error: {cmd} ({e})")
[docs]
def run_architect(self, user_intent: str) -> None:
    """
    Run the architect phase to generate architecture documentation.

    Renders ``architect.txt`` with the user intent and a file tree of the
    project, and runs the agent under the ``ARCHITECT`` label. The phase
    succeeds only if the agent reports success and ``CONF.BASE_DIR/ARCH.md``
    exists afterwards. On any failure (pre-command, agent, missing ARCH.md)
    the failure hooks and post-commands run and the process exits with
    status 1.

    Args:
        user_intent: Description of what the user wants to build.
    """
    phase = "architect"

    def abort(reason: str) -> None:
        # Shared failure sequence: report, emit failure + phase-end events,
        # run post-commands with success=False, then exit with status 1.
        Logger.info(reason, "RED")
        self.hooks.emit(Event(EventType.ARCHITECT_FAILURE, phase=phase))
        self.hooks.emit(Event(EventType.PHASE_END, phase=phase))
        self._run_post_commands(phase, success=False)
        sys.exit(1)

    Logger.info("\n🕵️ Architect: Generating Architecture...", "CYAN")
    for start_event in (EventType.PHASE_START, EventType.ARCHITECT_START):
        self.hooks.emit(Event(start_event, phase=phase))

    if not self._run_pre_commands(phase):
        abort("⚠️ Architect aborted: pre-command failed.")

    tree = Shell.get_file_tree(depth=self._tree_depth, ignore=self._tree_ignore)
    rendered_prompt = TemplateManager.render("architect.txt", user_intent=user_intent, file_tree=tree)
    agent_ok = self.agent.run(rendered_prompt, "ARCHITECT")[0]
    if not agent_ok:
        abort("⚠️ Architect failed.")

    if not (CONF.BASE_DIR / "ARCH.md").exists():
        abort("⚠️ Architect failed: ARCH.md was not created.")

    Logger.info("✅ Architect completed.", "GREEN")
    for end_event in (EventType.ARCHITECT_SUCCESS, EventType.PHASE_END):
        self.hooks.emit(Event(end_event, phase=phase))
    self._run_post_commands(phase, success=True)
def _validate_prd_schema(self, data: Dict[str, Any]) -> Tuple[bool, str]:
"""
Validate PRD data against a JSON schema file.
Args:
data: The PRD data to validate
Returns:
Tuple of (is_valid, error_message). error_message is empty if valid.
"""
if not self._schema_path:
return True, ""
schema_path = Path(self._schema_path)
if not schema_path.exists():
return False, f"Schema file not found: {self._schema_path}"
try:
schema = json.loads(schema_path.read_text(encoding='utf-8'))
except json.JSONDecodeError as e:
return False, f"Invalid JSON schema: {e}"
# Basic JSON schema validation (supports type, required, properties)
errors = self._validate_against_schema(data, schema, "")
if errors:
return False, "; ".join(errors)
return True, ""
def _validate_against_schema(self, data: Any, schema: Dict[str, Any], path: str) -> List[str]:
"""
Recursively validate data against a JSON schema.
Supports a subset of JSON Schema: type, required, properties, items, minItems.
Args:
data: The data to validate
schema: The schema to validate against
path: Current path in the data for error messages
Returns:
List of validation error messages
"""
errors: List[str] = []
path_prefix = f"{path}." if path else ""
# Check type
if "type" in schema:
expected_type = schema["type"]
type_map = {"string": str, "number": (int, float), "integer": int,
"boolean": bool, "array": list, "object": dict, "null": type(None)}
if expected_type in type_map:
expected = type_map[expected_type]
if not isinstance(data, expected):
errors.append(f"{path or 'root'}: expected {expected_type}, got {type(data).__name__}")
return errors # Don't check further if type is wrong
# Check required properties (for objects)
if "required" in schema and isinstance(data, dict):
for req in schema["required"]:
if req not in data:
errors.append(f"{path_prefix}{req}: required property missing")
# Check properties (for objects)
if "properties" in schema and isinstance(data, dict):
for prop, prop_schema in schema["properties"].items():
if prop in data:
errors.extend(self._validate_against_schema(data[prop], prop_schema, f"{path_prefix}{prop}"))
# Check items (for arrays)
if "items" in schema and isinstance(data, list):
for i, item in enumerate(data):
errors.extend(self._validate_against_schema(item, schema["items"], f"{path}[{i}]"))
# Check minItems (for arrays)
if "minItems" in schema and isinstance(data, list):
if len(data) < schema["minItems"]:
errors.append(f"{path or 'root'}: array has {len(data)} items, minimum is {schema['minItems']}")
return errors
def _validate_min_criteria(self, data: Dict[str, Any]) -> Tuple[bool, str]:
"""
Validate that each user story has at least the minimum number of acceptance criteria.
Args:
data: The PRD data to validate
Returns:
Tuple of (is_valid, error_message). error_message is empty if valid.
"""
if self._min_criteria is None:
return True, ""
stories = data.get("userStories", [])
violations = []
for story in stories:
story_id = story.get("id", "unknown")
criteria = story.get("acceptanceCriteria", [])
if len(criteria) < self._min_criteria:
violations.append(f"{story_id} has {len(criteria)} criteria (minimum: {self._min_criteria})")
if violations:
return False, "; ".join(violations)
return True, ""
def _apply_labels(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Apply custom labels to the PRD data.
Labels are key=value pairs that get added to a 'labels' dict in the PRD.
Args:
data: The PRD data to annotate
Returns:
The PRD data with labels applied
"""
if not self._labels:
return data
labels_dict: Dict[str, str] = {}
for label in self._labels:
if "=" in label:
key, value = label.split("=", 1)
labels_dict[key.strip()] = value.strip()
else:
# Labels without = are treated as tags with empty value
labels_dict[label.strip()] = ""
if labels_dict:
data["labels"] = labels_dict
return data
def _revise_prd_impl(self, original_prd: Dict[str, Any]) -> Dict[str, Any]:
    """Revise the PRD through the revision agent for quality improvements.

    Renders ``revise_prd.txt`` with the PRD as indented JSON, runs the agent
    under the ``REVISE_PRD`` label and parses its reply. The revision is
    accepted only if it passes the same checks the planner applied to the
    original: ``--schema`` and ``--min-criteria``.

    Args:
        original_prd: The original PRD data to revise.

    Returns:
        Revised PRD data, or ``original_prd`` (unchanged) on failure.

    Notes:
        - If revision fails, falls back to the original PRD with a warning.
        - If the reply cannot be parsed, falls back to the original PRD.
        - If the revised PRD fails schema or min-criteria validation, falls
          back to the original PRD.
        - Logs when no revision was needed (PRD already optimal).
        - Emits PRD_REVISE_START, then PRD_REVISE_FAILURE (with a ``reason``
          in its metadata) or PRD_REVISE_SUCCESS.
    """
    Logger.info("\n🔧 Revising PRD...", "CYAN")
    self.hooks.emit(Event(EventType.PRD_REVISE_START, phase="planner"))
    # Convert PRD to JSON string for the prompt
    original_prd_json = json.dumps(original_prd, indent=2)
    prompt = TemplateManager.render(
        "revise_prd.txt",
        original_prd=original_prd_json
    )
    success, stdout, error = self.agent.run(prompt, "REVISE_PRD")
    if not success:
        error_msg = error.message if error else "Unknown error"
        Logger.warning(f"PRD revision failed: {error_msg}")
        Logger.warning("Falling back to original PRD.")
        self.hooks.emit(Event(EventType.PRD_REVISE_FAILURE, phase="planner",
                              metadata={"reason": "agent_failure", "error": error_msg}))
        return original_prd
    # Parse the revised PRD from response
    revised_prd, revision_summary = self._parse_revised_prd(stdout, original_prd)
    if revised_prd is None:
        Logger.warning("Could not parse revised PRD from response.")
        Logger.warning("Falling back to original PRD.")
        self.hooks.emit(Event(EventType.PRD_REVISE_FAILURE, phase="planner",
                              metadata={"reason": "parse_failure"}))
        return original_prd
    # Validate revised PRD against schema if specified
    if self._schema_path:
        schema_valid, schema_error = self._validate_prd_schema(revised_prd)
        if not schema_valid:
            Logger.warning(f"Revised PRD failed schema validation: {schema_error}")
            Logger.warning("Falling back to original PRD.")
            self.hooks.emit(Event(EventType.PRD_REVISE_FAILURE, phase="planner",
                                  metadata={"reason": "schema_validation_failed", "error": schema_error}))
            return original_prd
    # The revision agent may merge or trim stories and drop acceptance
    # criteria; enforce --min-criteria just as the planner did.
    criteria_valid, criteria_error = self._validate_min_criteria(revised_prd)
    if not criteria_valid:
        Logger.warning(f"Revised PRD failed criteria validation: {criteria_error}")
        Logger.warning("Falling back to original PRD.")
        self.hooks.emit(Event(EventType.PRD_REVISE_FAILURE, phase="planner",
                              metadata={"reason": "criteria_validation_failed", "error": criteria_error}))
        return original_prd
    # Check if no revision was needed
    if revision_summary and "no revision" in revision_summary.lower():
        Logger.info("✅ PRD already optimal, no revision needed.", "GREEN")
    else:
        Logger.debug(f"Revision summary: {revision_summary}", "CYAN")
        Logger.info("✅ PRD revised.", "GREEN")
    self.hooks.emit(Event(EventType.PRD_REVISE_SUCCESS, phase="planner",
                          metadata={"summary": revision_summary or ""}))
    return revised_prd
def _parse_revised_prd(self, response: str, fallback: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """Parse the revised PRD from the agent response.

    Extracts the first ``<REVISED_PRD>...</REVISED_PRD>`` and
    ``<REVISION_SUMMARY>...</REVISION_SUMMARY>`` sections (either may span
    multiple lines) and parses the PRD body with ``JsonUtils.parse``.

    Args:
        response: The raw response from the revision agent.
        fallback: Unused; kept for signature compatibility. Callers fall
            back to their own copy when this returns ``None``.

    Returns:
        Tuple of (revised_prd_data, revision_summary). revised_prd_data is
        None if the tags are missing, the body is not valid JSON, or the
        result is not a JSON object with a ``userStories`` key.
        revision_summary is None when no summary tag is present.
    """
    import re

    # Extract revision summary
    summary_match = re.search(r'<REVISION_SUMMARY>\s*(.*?)\s*</REVISION_SUMMARY>', response, re.DOTALL)
    revision_summary = summary_match.group(1).strip() if summary_match else None

    # Extract content between <REVISED_PRD> tags
    prd_match = re.search(r'<REVISED_PRD>\s*(.*?)\s*</REVISED_PRD>', response, re.DOTALL)
    if not prd_match:
        Logger.warning("Could not find <REVISED_PRD> tags in response.")
        return None, revision_summary

    try:
        revised_prd = JsonUtils.parse(prd_match.group(1).strip())
    except ValueError as e:
        # json.JSONDecodeError subclasses ValueError. Catching the base class
        # also covers other "bad JSON" errors a parser helper may raise, so a
        # malformed reply leads to a fallback instead of escaping to the caller.
        Logger.warning(f"Invalid JSON in revised PRD: {e}")
        return None, revision_summary

    # A non-object result cannot be a PRD. The isinstance check also stops a
    # parsed *string* from passing via a substring match on "userStories".
    if not isinstance(revised_prd, dict) or "userStories" not in revised_prd:
        Logger.warning("Revised PRD missing 'userStories' key.")
        return None, revision_summary
    return revised_prd, revision_summary
[docs]
def run_planner(self, user_intent: str) -> None:
    """
    Run the planner phase to create a Product Requirements Document.

    Generates a PRD with user stories and acceptance criteria,
    saved to .ralph/prd.json. The agent is given up to 3 attempts. An
    attempt is discarded if the agent fails, its reply lacks
    ``userStories``, or validation fails.

    Respects the following flags:
    - --schema: Validate PRD against a JSON schema file
    - --min-criteria: Ensure each story has at least N acceptance criteria
    - --label: Add custom labels to the PRD
    - --revise-prd: Pass PRD through revision agent before saving

    When revising, labels are re-applied to the revised PRD (the revision
    agent may drop them) and the revised PRD is re-validated. If it no
    longer passes, the already-validated pre-revision PRD is saved instead.
    On total failure, the failure hooks and post-commands run and the
    process exits with status 1.

    Args:
        user_intent: Description of what the user wants to build
    """
    Logger.info("\n🧠 Planner: Creating PRD...", "CYAN")
    self.hooks.emit(Event(EventType.PHASE_START, phase="planner"))
    self.hooks.emit(Event(EventType.PLANNER_START, phase="planner"))
    # Run pre-commands before phase execution
    if not self._run_pre_commands("planner"):
        Logger.info("⚠️ Planner aborted: pre-command failed.", "RED")
        self.hooks.emit(Event(EventType.PLANNER_FAILURE, phase="planner"))
        self.hooks.emit(Event(EventType.PHASE_END, phase="planner"))
        self._run_post_commands("planner", success=False)
        sys.exit(1)
    prompt = TemplateManager.render(
        "planner.txt",
        user_intent=user_intent
    )
    for attempt in range(3):
        success, raw, _ = self.agent.run(prompt, "PLANNER")
        if not success:
            Logger.info(f"⚠️ Planner agent failed (Attempt {attempt+1}).", "YELLOW")
            continue
        try:
            data = JsonUtils.parse(raw)
            if "userStories" not in data:
                raise ValueError("Missing userStories")
            # Validate against JSON schema if --schema is specified
            schema_valid, schema_error = self._validate_prd_schema(data)
            if not schema_valid:
                Logger.info(f"⚠️ Schema validation failed (Attempt {attempt+1}): {schema_error}", "YELLOW")
                continue
            # Validate minimum acceptance criteria if --min-criteria is specified
            criteria_valid, criteria_error = self._validate_min_criteria(data)
            if not criteria_valid:
                Logger.info(f"⚠️ Criteria validation failed (Attempt {attempt+1}): {criteria_error}", "YELLOW")
                continue
            # Apply labels if --label is specified
            data = self._apply_labels(data)
            # Revise PRD if --revise-prd is specified
            if self._revise_prd:
                validated = data
                revised = self._revise_prd_impl(data)
                # The revision agent rewrites the whole document and may drop
                # the 'labels' key; re-applying is safe because
                # _apply_labels simply overwrites that key.
                revised = self._apply_labels(revised)
                # Re-check every planner constraint on the revised PRD. If it
                # regressed, keep the PRD that already passed validation
                # rather than discarding it and spending another attempt.
                revised_ok, revised_error = self._validate_prd_schema(revised)
                if revised_ok:
                    revised_ok, revised_error = self._validate_min_criteria(revised)
                if revised_ok:
                    data = revised
                else:
                    Logger.warning(f"Revised PRD failed validation: {revised_error}")
                    Logger.warning("Falling back to original PRD.")
                    data = validated
            self._prd.save(data)
            Logger.info(f"✅ PRD Created ({len(data['userStories'])} stories).", "GREEN")
            self.hooks.emit(Event(EventType.PRD_CREATED, phase="planner", prd_path=str(CONF.PRD_FILE)))
            self.hooks.emit(Event(EventType.PLANNER_SUCCESS, phase="planner"))
            self.hooks.emit(Event(EventType.PHASE_END, phase="planner"))
            self._run_post_commands("planner", success=True)
            return
        except Exception as e:
            Logger.info(f"⚠️ JSON Error (Attempt {attempt+1}): {e}", "YELLOW")
    Logger.info("❌ Planning Failed.", "RED")
    self.hooks.emit(Event(EventType.PLANNER_FAILURE, phase="planner"))
    self.hooks.emit(Event(EventType.PHASE_END, phase="planner"))
    self._run_post_commands("planner", success=False)
    sys.exit(1)
[docs]
def execute_loop(self) -> None:
    """
    Execute all pending tasks from the PRD.

    Iterates through user stories, executing each pending task
    with verification. Continues to next task on failure instead
    of terminating. The PRD is archived only once every story in it is
    completed. A run that finishes cleanly but leaves stories pending
    (because of --only, --except, or --resume) keeps the PRD in place so
    those stories can still be run.

    Respects the following flags:
    - --test-cmd: Override the test command
    - --skip-verify: Skip verification step after task execution
    - --retries: Override max retry count
    - --only: Execute only specified task IDs
    - --except: Skip specified task IDs
    - --resume: Resume execution from a specific task ID
    - --pre: Run pre-commands before phase execution
    - --post: Run post-commands after phase completion
    """
    prd = self._prd.load()
    test_cmd = self._test_cmd_override if self._test_cmd_override else "pytest"
    Logger.info(f"\n🚀 Starting Loop. Verify Command: '{test_cmd}'", "YELLOW")
    if self._skip_verify:
        Logger.info(" ⏭️ Verification will be skipped (--skip-verify)", "YELLOW")
    self.hooks.emit(Event(EventType.PHASE_START, phase="execute"))
    self.hooks.emit(Event(EventType.EXECUTE_START, phase="execute", verification_command=test_cmd))
    # Run pre-commands before phase execution
    if not self._run_pre_commands("execute"):
        Logger.info("⚠️ Execute aborted: pre-command failed.", "RED")
        self.hooks.emit(Event(EventType.EXECUTE_END, phase="execute"))
        self.hooks.emit(Event(EventType.PHASE_END, phase="execute"))
        self._run_post_commands("execute", success=False)
        return
    stories = prd.get('userStories', [])
    failed_tasks: List[str] = []
    resume_found = self._resume_from is None  # If no --resume, start immediately
    for task in stories:
        task_id = task['id']
        # Handle --resume: skip tasks until we find the resume target
        if not resume_found:
            if task_id == self._resume_from:
                resume_found = True
                Logger.info(f" ➡️ Resuming from task {task_id}", "CYAN")
            else:
                Logger.debug(f" ⏭️ Skipping {task_id} (before resume point)")
                continue
        # Handle --only: execute only specified tasks
        if self._only_tasks and task_id not in self._only_tasks:
            Logger.debug(f" ⏭️ Skipping {task_id} (not in --only list)")
            continue
        # Handle --except: skip specified tasks
        if self._except_tasks and task_id in self._except_tasks:
            Logger.info(f" ⏭️ Skipping {task_id} (in --except list)", "YELLOW")
            continue
        if task.get('status') == 'completed':
            continue
        # Reset failed tasks to pending so they can be retried
        if task.get('status') == 'failed':
            task['status'] = 'pending'
        Logger.info(f"\n▶️ Task {task_id}: {task['description']}", "CYAN")
        success = self._execute_task(prd, task, test_cmd)
        # Save state after each task attempt
        self._prd.save(prd)
        if not success:
            failed_tasks.append(task_id)
    # Check if --resume target was not found
    if not resume_found:
        Logger.warning(f"Resume task '{self._resume_from}' not found in PRD. No tasks executed.")
    # Stories still not completed (skipped by filters, or never reached).
    pending = [str(story['id']) for story in stories if story.get('status') != 'completed']
    # Report summary
    phase_success = len(failed_tasks) == 0
    if failed_tasks:
        Logger.info(f"\n⚠️ {len(failed_tasks)} task(s) failed: {', '.join(failed_tasks)}", "YELLOW")
        Logger.info("Run 'ralph execute' again to retry failed tasks.", "YELLOW")
    elif pending:
        # Archiving here would move away a PRD that still has work in it.
        Logger.info(f"\n✅ Selected tasks complete; {len(pending)} task(s) still pending: {', '.join(pending)}", "YELLOW")
        Logger.info("PRD not archived until all tasks are completed.", "YELLOW")
    else:
        Logger.info("\n🎉 All Tasks Complete.", "GREEN")
        self._archive_prd()
    self.hooks.emit(Event(EventType.EXECUTE_END, phase="execute"))
    self.hooks.emit(Event(EventType.PHASE_END, phase="execute"))
    self._run_post_commands("execute", success=phase_success)
def _sanitize_id(self, text: str) -> str:
"""Sanitize an ID string to contain only alphanumeric chars and hyphens/underscores."""
return "".join(c for c in text if c.isalnum() or c in '-_')
def _load_user_context(self, prd: Dict[str, Any], task: Dict[str, Any], test_cmd: str) -> str:
"""Load and prepare user context from prompt.md with variable substitution."""
import re
# Use --prompt-file override if provided, otherwise default to prompt.md
if self._prompt_file_override:
prompt_md_path = Path(self._prompt_file_override)
if not prompt_md_path.exists():
Logger.error(f"Prompt file not found: {self._prompt_file_override}")
sys.exit(1)
else:
prompt_md_path = CONF.BASE_DIR / "prompt.md"
if not prompt_md_path.exists():
return "No specific user preferences provided."
raw_text = prompt_md_path.read_text(encoding='utf-8')
# Only use prompt file if it has non-empty content
if not raw_text.strip():
if self._prompt_file_override:
Logger.warning(f"Prompt file is empty: {self._prompt_file_override}, using default user context.")
else:
Logger.warning("prompt.md exists but is empty, using default user context.")
return "No specific user preferences provided."
replacements = {
"{{PRD_ID}}": self._sanitize_id(prd['id']),
"{{PRD_DESCRIPTION}}": prd['description'],
"{{TASK_ID}}": self._sanitize_id(task['id']),
"{{TASK_DESCRIPTION}}": task['description'],
"{{TEST_CMD}}": test_cmd,
}
pattern = re.compile('|'.join(re.escape(k) for k in replacements.keys()))
return pattern.sub(lambda m: replacements[m.group()], raw_text)
def _format_acceptance_criteria(self, task: Dict[str, Any]) -> str:
"""Format acceptance criteria as a bulleted list for the developer prompt."""
criteria = task.get('acceptanceCriteria', [])
if not criteria:
return "(No acceptance criteria specified)"
return "\n".join(f"- {criterion}" for criterion in criteria)
def _verify_task(self, task: Dict[str, Any], test_cmd: str) -> Tuple[bool, Optional[AgentError]]:
    """Run the verification command to check an agent's claimed success.

    Runs ``test_cmd`` via ``Shell.run`` and writes the command, exit code and
    both output streams to the file log. Exit code 0 counts as verified.
    VERIFICATION_START is emitted first, followed by VERIFICATION_SUCCESS or
    VERIFICATION_FAILURE.

    Returns:
        ``(True, None)`` when verified; otherwise ``(False, error)`` where
        ``error`` is an ``AgentError`` of type "VerificationError" carrying
        the command output.
    """
    task_id = task['id']
    Logger.info(" 🔒 Verifying Agent's Claim...", "YELLOW")
    self.hooks.emit(Event(
        EventType.VERIFICATION_START, phase="execute",
        task_id=task_id, verification_command=test_cmd
    ))
    out, err, exit_code = Shell.run(test_cmd)
    Logger.file_log(
        f"CMD: {test_cmd}\nEXIT CODE: {exit_code}\nSTDOUT:\n{out}\nSTDERR:\n{err}",
        "VERIFICATION",
        f"WORKER-{task_id}",
    )
    if exit_code != 0:
        Logger.info(" 🛑 Agent Hallucinated Success.", "RED")
        failure = AgentError(
            exception_type="VerificationError",
            message=f"Test command '{test_cmd}' failed with exit code {exit_code}",
            stack_trace=f"STDOUT:\n{out}\nSTDERR:\n{err}",
            timestamp=datetime.datetime.now().isoformat(),
            agent_name=self.agent.get_name(),
            task_id=task_id,
        )
        self.hooks.emit(Event(
            EventType.VERIFICATION_FAILURE, phase="execute",
            task_id=task_id, verification_command=test_cmd,
            verification_exit_code=exit_code, error=failure
        ))
        return False, failure
    Logger.info(" ✅ Verified.", "GREEN")
    self.hooks.emit(Event(
        EventType.VERIFICATION_SUCCESS, phase="execute",
        task_id=task_id, verification_command=test_cmd, verification_exit_code=exit_code
    ))
    return True, None
def _execute_task(self, prd: Dict[str, Any], task: Dict[str, Any], test_cmd: str) -> bool:
    """
    Execute a single task with retries.

    Each attempt renders ``developer.txt`` with the task, its acceptance
    criteria, the user context, and the previous attempt's failure record
    read from ``CONF.PROGRESS_FILE``. It then runs the agent. An attempt
    succeeds when the agent output contains ``STATUS: SUCCESS`` and, unless
    --skip-verify is set, ``test_cmd`` then exits with status 0.

    ``task['status']`` is set in place to 'completed' on success or 'failed'
    once all attempts are used. In both cases the progress file is removed,
    so one task's failures are never shown to the next task as previous
    errors.

    Respects the following flags:
    - --skip-verify: Skip verification step after task execution
    - --retries: Override max retry count
    - --timeout: Override agent timeout (applied when the agent is created)

    Args:
        prd: The loaded PRD (used for prompt-file variables).
        task: The user story to execute; mutated in place.
        test_cmd: The verification command.

    Returns:
        True if task completed successfully, False if max retries exhausted.
    """
    # Use --retries override if provided, otherwise use config default
    max_retries = self._retries_override if self._retries_override is not None else CONF.MAX_RETRIES
    task_id = task['id']
    self.hooks.emit(Event(
        EventType.TASK_START, phase="execute",
        task_id=task_id, task_description=task['description'], max_retries=max_retries
    ))

    def clear_progress() -> None:
        # The progress file holds only the latest failure record.
        if CONF.PROGRESS_FILE.exists():
            CONF.PROGRESS_FILE.unlink()

    def complete() -> bool:
        # Shared success path for the --skip-verify and verified branches.
        task['status'] = 'completed'
        clear_progress()
        self.hooks.emit(Event(
            EventType.TASK_SUCCESS, phase="execute",
            task_id=task_id, task_description=task['description']
        ))
        return True

    for retry in range(max_retries):
        prev_errors = CONF.PROGRESS_FILE.read_text(encoding='utf-8') if CONF.PROGRESS_FILE.exists() else ""
        prompt = TemplateManager.render(
            "developer.txt",
            task_id=task_id, task_description=task['description'],
            acceptance_criteria=self._format_acceptance_criteria(task),
            user_context=self._load_user_context(prd, task, test_cmd),
            test_cmd=test_cmd, prev_errors=prev_errors if prev_errors else "(No previous errors)"
        )
        success, output, agent_error = self.agent.run(prompt, f"WORKER-{task_id}")
        if not success:
            # Record only the output tail, like the other failure paths, so
            # the "last 1000 chars" section of the record is accurate.
            self._record_failure(retry, "CLI Crash", (output or "")[-1000:], agent_error=agent_error, task_id=task_id)
            self._emit_retry_event(task, retry, max_retries)
            continue
        if "STATUS: SUCCESS" in output:
            # Handle --skip-verify: skip verification step if flag is set
            if self._skip_verify:
                Logger.info(" ⏭️ Skipping verification (--skip-verify)", "YELLOW")
                return complete()
            verified, verify_error = self._verify_task(task, test_cmd)
            if verified:
                return complete()
            self._record_failure(retry, "Verification Failed", output[-1000:], agent_error=verify_error, task_id=task_id)
        else:
            error = AgentError(
                exception_type="AgentReportedFailure",
                message="Agent did not report STATUS: SUCCESS",
                stack_trace=f"Agent output (last 2000 chars):\n{output[-2000:]}",
                timestamp=datetime.datetime.now().isoformat(),
                agent_name=self.agent.get_name(),
                task_id=task_id,
            )
            self._record_failure(retry, "Agent Reported Failure", output[-1000:], agent_error=error, task_id=task_id)
        self._emit_retry_event(task, retry, max_retries)
    Logger.info(f"🛑 Max retries for {task_id}. Marking as failed and continuing.", "RED")
    task['status'] = 'failed'
    # Don't let this task's last failure reach the next task's prompt as
    # "previous errors"; the record remains in the file log written by
    # _record_failure.
    clear_progress()
    self.hooks.emit(Event(
        EventType.TASK_FAILURE, phase="execute",
        task_id=task_id, task_description=task['description'],
        retry_count=max_retries, max_retries=max_retries
    ))
    return False
def _emit_retry_event(self, task: Dict[str, Any], retry: int, max_retries: Optional[int] = None) -> None:
    """Emit a TASK_RETRY hook event for a failed attempt.

    Args:
        task: The task whose attempt failed (``id``/``description`` are read).
        retry: Zero-based attempt index; reported as ``retry + 1``.
        max_retries: Attempt limit to report. When None, the ``--retries``
            override is used, falling back to ``CONF.MAX_RETRIES``.
    """
    limit = max_retries
    if limit is None:
        limit = CONF.MAX_RETRIES if self._retries_override is None else self._retries_override
    retry_event = Event(
        EventType.TASK_RETRY,
        phase="execute",
        task_id=task['id'],
        task_description=task['description'],
        retry_count=retry + 1,
        max_retries=limit,
    )
    self.hooks.emit(retry_event)
def _record_failure(self, retry: int, reason: str, detail: str, agent_error: Optional[AgentError] = None, task_id: Optional[str] = None) -> None:
    """Persist, log and broadcast one failed task attempt.

    Overwrites ``CONF.PROGRESS_FILE`` with a description of the failure,
    which ``_execute_task`` reads back as "previous errors" for the next
    attempt. Also writes the same text to the file log, prints a retry
    counter, and emits an ``ERROR`` hook event.

    Args:
        retry: Zero-based attempt index; reported as ``retry + 1``.
        reason: Short failure category (e.g. "CLI Crash").
        detail: Agent output excerpt to include in the record.
        agent_error: Structured error, if available. Its log entry is
            included in the record and it is attached to the event.
        task_id: Task the failure belongs to; falls back to
            ``agent_error.task_id`` when omitted.
    """
    if agent_error:
        msg = (
            f"Attempt {retry+1} Failed: {reason}\n"
            f"--- Structured Error Context ---\n"
            f"{agent_error.format_log_entry()}\n"
            f"--- Agent Output (last 1000 chars) ---\n"
            f"{detail}"
        )
    else:
        msg = f"Attempt {retry+1} Failed: {reason}\n{detail}"
    CONF.PROGRESS_FILE.write_text(msg, encoding='utf-8')
    Logger.file_log(msg, "FAILURE_RECORD", f"RETRY-{retry+1}")
    # Report against the effective limit (--retries override, else config)
    # so the counter matches the number of attempts _execute_task makes.
    max_retries = self._retries_override if self._retries_override is not None else CONF.MAX_RETRIES
    Logger.info(f" ⚠️ Retry {retry+1}/{max_retries}: {reason}", "RED")
    self.hooks.emit(Event(
        EventType.ERROR,
        phase="execute",
        task_id=task_id or (agent_error.task_id if agent_error else None),
        error=agent_error,
        metadata={"reason": reason, "retry": retry + 1}
    ))
def _get_code_changes(self) -> str:
    """Get recent code changes using git diff.

    Prefers the diff against the previous commit (``HEAD~1``). If that command
    fails (for example in a repository with a single commit), falls back to the
    staged changes (``--cached``). The detailed diff is always taken from the same
    source the ``--stat`` summary came from, and is truncated to 50,000 characters.

    Returns:
        String containing diff output, or message if no changes or git unavailable.
    """
    max_chars = 50000
    try:
        diff_source = "HEAD~1"
        stat_out, _, code = Shell.run(f"git diff {diff_source} --stat", timeout=30)
        if code != 0:
            # HEAD~1 is unavailable; use the staged changes instead, and keep using
            # them for the detailed diff below (re-running HEAD~1 would fail again).
            diff_source = "--cached"
            stat_out, _, code = Shell.run(f"git diff {diff_source} --stat", timeout=30)
        if code != 0 or not stat_out.strip():
            return "(No code changes detected)"
        diff_out, _, diff_code = Shell.run(f"git diff {diff_source}", timeout=60)
        if diff_code != 0 or not diff_out.strip():
            return "(No detailed diff available)"
        if len(diff_out) > max_chars:
            return diff_out[:max_chars] + "\n... (truncated, diff too large)"
        return diff_out
    except Exception as e:
        # Best-effort: change detection must never abort the run.
        Logger.debug(f"Failed to get code changes: {e}")
        return "(Unable to detect code changes)"
def _archive_prd(self) -> None:
    """Move the current PRD into the archive directory under a timestamped name.

    Does nothing when no PRD exists or when archiving is disabled (--no-archive).
    On success, invalidates the PRD cache and emits a PRD_ARCHIVED hook event
    carrying the destination path.
    """
    if not self._prd.exists():
        return
    # Respect --no-archive flag (archive is default behavior)
    if not self._archive:
        Logger.debug("Skipping PRD archival (--no-archive)")
        return
    ts = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    archive_dir = CONF.ARCHIVE_DIR
    # shutil.move does not create missing parent directories.
    archive_dir.mkdir(parents=True, exist_ok=True)
    dest = archive_dir / f"prd_{ts}.json"
    # Timestamps have one-second resolution; never overwrite an earlier archive.
    suffix = 1
    while dest.exists():
        dest = archive_dir / f"prd_{ts}_{suffix}.json"
        suffix += 1
    shutil.move(str(CONF.PRD_FILE), str(dest))
    self._prd.invalidate_cache()
    Logger.info(f"📦 PRD Archived to {dest}", "MAGENTA")
    self.hooks.emit(Event(EventType.PRD_ARCHIVED, prd_path=str(dest)))
def _print_prd(self) -> None:
"""
Print the PRD contents to stdout.
Outputs the PRD as formatted JSON for inspection without execution.
"""
if not self._prd.exists():
Logger.error("No PRD file found. Run planner first.")
sys.exit(1)
if self._json_output or self._ndjson_output:
# For JSON/NDJSON mode, output as-is (already JSON)
print(self._prd.read_raw())
else:
# Pretty print with indentation
prd_data = self._prd.load()
print(json.dumps(prd_data, indent=2))
def _export_prd(self, output_path: str) -> None:
    """
    Copy the current PRD's raw contents to *output_path*.

    Missing parent directories are created. Exits with status 1 when no PRD
    file exists.

    Args:
        output_path: Destination file path for the PRD contents
    """
    if not self._prd.exists():
        Logger.error("No PRD file found. Run planner first.")
        sys.exit(1)
    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    raw_prd = self._prd.read_raw()
    target.write_text(raw_prd, encoding='utf-8')
    Logger.info(f"📋 PRD exported to {target}", "MAGENTA")
def _save_generated_prd(self, prd_data: Dict[str, Any]) -> Optional[Path]:
    """
    Save generated PRD to a file with overwrite protection.

    Saves the PRD to the location specified by --prd-out flag, or to the default
    .ralph/prd.json location. Handles:
    - Overwrite confirmation in interactive mode
    - Error in non-interactive mode when file exists
    - Auto-creation of output directory
    - Fallback to stdout on write errors

    Args:
        prd_data: The PRD dictionary to save
    Returns:
        Path to the saved file if successful, None if save was aborted
    """
    # Determine output path: use --prd-out if specified, otherwise default
    out_path = Path(self._prd_out) if self._prd_out else CONF.PRD_FILE
    prd_json = json.dumps(prd_data, indent=2)

    def fallback_to_stdout(notice: str = "PRD content printed to stdout as fallback:") -> None:
        """Print the PRD JSON so the generated content is not lost."""
        Logger.info(notice, "YELLOW")
        print(prd_json)

    def is_default_prd(path: Path) -> bool:
        """True if *path* is the default PRD file, however it was spelled."""
        # samefile resolves relative paths and symlinks; a plain == comparison
        # would miss e.g. "./.ralph/prd.json" and leave the PRD cache stale.
        try:
            return path.samefile(CONF.PRD_FILE)
        except OSError:
            return path == CONF.PRD_FILE

    # Check if file exists and handle overwrite
    if out_path.exists():
        if self._non_interactive:
            Logger.error(f"Output file already exists: {out_path}")
            Logger.error("Use a different path or remove the existing file.")
            fallback_to_stdout()
            return None
        # Interactive mode: prompt for overwrite confirmation
        if not self._prompt_overwrite_confirmation(out_path):
            fallback_to_stdout("PRD save cancelled. PRD content printed to stdout:")
            return None

    # Ensure output directory exists
    try:
        out_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        Logger.error(f"Failed to create output directory {out_path.parent}: {e}")
        fallback_to_stdout()
        return None

    # Write the file; PermissionError is reported separately from other OS errors
    try:
        out_path.write_text(prd_json, encoding='utf-8')
    except PermissionError as e:
        Logger.error(f"Permission denied writing to {out_path}: {e}")
        fallback_to_stdout()
        return None
    except OSError as e:
        Logger.error(f"Failed to write PRD to {out_path}: {e}")
        fallback_to_stdout()
        return None

    Logger.info(f"📋 PRD saved to {out_path}", "MAGENTA")
    # Update PRD manager cache if saved to default location
    if is_default_prd(out_path):
        self._prd.invalidate_cache()
    return out_path
def _prompt_overwrite_confirmation(self, file_path: Path) -> bool:
    """
    Ask interactively whether an existing file may be overwritten.

    Re-prompts until the answer is one of y/yes/n/no (case-insensitive,
    surrounding whitespace ignored).

    Args:
        file_path: The existing file that would be replaced
    Returns:
        True when the user answers yes, False when the user answers no
    """
    question = f"{Logger.COLORS['YELLOW']}File {file_path} already exists. Overwrite? (y/n): {Logger.COLORS['RESET']}"
    decisions = {'y': True, 'yes': True, 'n': False, 'no': False}
    while True:
        answer = input(question).strip().lower()
        if answer in decisions:
            return decisions[answer]
        Logger.info(" ⚠️ Invalid input. Please enter 'y' or 'n'.", "YELLOW")
def _check_prd_status(self) -> int:
    """
    Report PRD task progress and map it to a process exit code.

    Output goes through the JSON message formatter when JSON/NDJSON output is
    enabled, otherwise through the human-readable logger.

    Returns:
        0 if every task is completed; 1 if the PRD has no tasks or any task is
        not completed; 2 if no PRD file exists.
    """
    machine_output = Logger.json_output or Logger.ndjson_output
    if not self._prd.exists():
        if machine_output:
            print(Logger._format_json_message("No PRD file found", "error", status="no_prd", exit_code=2))
        else:
            Logger.error("No PRD file found. Run planner first.")
        return 2

    tasks = self._prd.load().get('userStories', [])
    if not tasks:
        if machine_output:
            print(Logger._format_json_message("PRD has no tasks", "warn", status="empty", exit_code=1))
        else:
            Logger.warning("PRD has no tasks.")
        return 1

    statuses = [task.get('status') for task in tasks]
    total = len(statuses)
    completed = statuses.count('completed')
    failed = statuses.count('failed')
    # A task with no status at all is treated as pending.
    pending = sum(status in ('pending', None) for status in statuses)
    summary = {
        "total": total,
        "completed": completed,
        "failed": failed,
        "pending": pending,
    }

    if completed == total:
        if machine_output:
            print(Logger._format_json_message("All tasks completed", "info", status="success", exit_code=0, **summary))
        else:
            Logger.info(f"✅ All {total} task(s) completed.", "GREEN")
        return 0

    if machine_output:
        print(Logger._format_json_message("Tasks incomplete", "warn", status="incomplete", exit_code=1, **summary))
    else:
        Logger.warning(f"Tasks incomplete: {completed}/{total} completed, {failed} failed, {pending} pending.")
    return 1
def _prompt_user_for_phase(self, phase_name: str) -> bool:
    """Ask whether *phase_name* should run; only the answer 'y' counts as yes.

    In non-interactive mode no prompt is shown: an error is logged and the
    process exits with status 1.
    """
    if not self._non_interactive:
        reply = input(f"{Logger.COLORS['YELLOW']}Run {phase_name} phase? (y/n): {Logger.COLORS['RESET']}")
        return reply.strip().lower() == 'y'
    Logger.error(f"Cannot prompt for {phase_name} phase in non-interactive mode. Use --accept-all (-y) to auto-accept.")
    sys.exit(1)
def _get_intent(self, user_intent=None):
"""Get intent from flags or interactive prompt."""
if user_intent:
return user_intent
# Check --intent flag
if self._intent:
return self._intent
# Check --intent-file flag
if self._intent_file:
intent_path = Path(self._intent_file)
if not intent_path.exists():
Logger.error(f"Intent file not found: {self._intent_file}")
sys.exit(1)
content = intent_path.read_text(encoding='utf-8').strip()
if not content:
Logger.error(f"Intent file is empty: {self._intent_file}")
sys.exit(1)
return content
# Non-interactive mode requires --intent or --intent-file
if self._non_interactive:
Logger.error("Intent required in non-interactive mode. Use --intent or --intent-file.")
sys.exit(1)
# Interactive prompt
intent = input(f"{Logger.COLORS['YELLOW']}>> What are we building? {Logger.COLORS['RESET']}").strip()
if not intent:
sys.exit(0)
return intent
def _enhance_intent_impl(self, original_intent: str) -> str:
    """Enhance user intent through the enhancement agent.

    Emits INTENT_ENHANCE_START, then either INTENT_ENHANCE_SUCCESS or
    INTENT_ENHANCE_FAILURE. A failure is any of: the agent run fails, the response
    has no <ENHANCED_INTENT> section, or that section is empty.

    Args:
        original_intent: The original user intent to enhance
    Returns:
        Enhanced intent string, or original intent on failure (unless strict mode)
    Raises:
        SystemExit: If the intent is empty/whitespace-only, or if strict mode is
            enabled and enhancement fails
    """
    # Validate input - empty or whitespace-only intent
    if not original_intent or not original_intent.strip():
        Logger.error("Cannot enhance empty or whitespace-only intent.")
        sys.exit(1)

    def fall_back(strict_error: str) -> str:
        """Report failure, then exit (strict mode) or return the original intent."""
        self.hooks.emit(Event(EventType.INTENT_ENHANCE_FAILURE, phase="enhance_intent"))
        if self._enhance_intent_strict:
            Logger.error(strict_error)
            sys.exit(1)
        Logger.warning("Falling back to original intent.")
        return original_intent

    Logger.info("\n🔧 Enhancing intent...", "CYAN")
    self.hooks.emit(Event(EventType.INTENT_ENHANCE_START, phase="enhance_intent"))
    prompt = TemplateManager.render(
        "enhance_intent.txt",
        original_intent=original_intent
    )
    success, stdout, error = self.agent.run(prompt, "ENHANCE_INTENT")
    if not success:
        error_msg = error.message if error else "Unknown error"
        Logger.warning(f"Intent enhancement failed: {error_msg}")
        return fall_back("Intent enhancement failed in strict mode. Exiting.")
    # Parse with an empty fallback: if the response lacks the expected tags, the
    # result is empty and handled as a failure below. Passing original_intent here
    # would make an unparseable response look like a successful enhancement
    # (success event emitted, strict mode bypassed).
    enhanced_intent = self._parse_enhanced_intent(stdout, "")
    if not enhanced_intent or not enhanced_intent.strip():
        Logger.warning("Enhancement agent returned empty or unparseable response.")
        return fall_back("Intent enhancement returned invalid response in strict mode. Exiting.")
    # Log both original and enhanced intent when verbose
    Logger.debug(f"Original intent: {original_intent}", "CYAN")
    Logger.debug(f"Enhanced intent: {enhanced_intent}", "GREEN")
    Logger.info("✅ Intent enhanced.", "GREEN")
    self.hooks.emit(Event(EventType.INTENT_ENHANCE_SUCCESS, phase="enhance_intent"))
    return enhanced_intent
def _parse_enhanced_intent(self, response: str, fallback: str) -> str:
"""Parse the enhanced intent from the agent response.
Args:
response: The raw response from the enhancement agent
fallback: The fallback value if parsing fails
Returns:
The parsed enhanced intent or fallback value
"""
import re
# Try to extract content between <ENHANCED_INTENT> tags
pattern = r'<ENHANCED_INTENT>\s*(.*?)\s*</ENHANCED_INTENT>'
match = re.search(pattern, response, re.DOTALL)
if match:
return match.group(1).strip()
# If no tags found, log warning and return fallback
Logger.warning("Could not parse enhanced intent from response. Falling back to original.")
return fallback
def _get_and_enhance_intent(self, user_intent=None) -> str:
"""Get intent and optionally enhance it based on --enhance-intent flag.
Args:
user_intent: Optional pre-provided intent
Returns:
The (possibly enhanced) intent string
"""
intent = self._get_intent(user_intent)
if self._enhance_intent:
intent = self._enhance_intent_impl(intent)
return intent
def _run_single_phase(self, phase: str) -> None:
    """Run exactly one phase after checking that its prerequisite exists.

    'planner' requires ARCH.md in CONF.BASE_DIR and 'execute' requires a PRD;
    a missing prerequisite is logged and the process exits with status 1.
    'execute' runs the task loop; 'architect' runs the architect; any other
    value runs the planner. Architect/planner first gather (and optionally
    enhance) the intent.
    """
    Logger.info(f"📋 Phase: {phase} only", "YELLOW")
    if phase == "execute":
        if not self._prd.exists():
            Logger.info("❌ PRD missing. Run planner first.", "RED")
            sys.exit(1)
        self.execute_loop()
    else:
        if phase == "planner" and not (CONF.BASE_DIR / "ARCH.md").exists():
            Logger.info("❌ Architecture doc missing. Run architect first.", "RED")
            sys.exit(1)
        intent = self._get_and_enhance_intent()
        run_phase = self.run_architect if phase == "architect" else self.run_planner
        run_phase(intent)
    Logger.info(f"✅ {phase.title()} complete.", "GREEN")
def _run_all_phases(self, accept_all: bool) -> None:
    """Run architect, planner and execute in order, with optional confirmation.

    A phase whose output already exists (ARCH.md for architect, the PRD for
    planner) is skipped. Otherwise, unless ``accept_all`` is set, the user is
    asked before each phase. The intent is gathered (and enhanced, if enabled)
    at most once and shared between the architect and planner phases.

    Args:
        accept_all: If True, run every phase without prompting.
    """
    Logger.info("📋 Running all phases...", "YELLOW")
    user_intent: Optional[str] = None
    # Architect phase
    if (CONF.BASE_DIR / "ARCH.md").exists():
        Logger.info("📋 Architecture doc exists, skipping architect.", "YELLOW")
    elif accept_all or self._prompt_user_for_phase("Architect"):
        user_intent = self._get_and_enhance_intent()
        self.run_architect(user_intent)
    else:
        Logger.info("⏭️ Skipping architect.", "YELLOW")
    # Planner phase
    if self._prd.exists():
        Logger.info("📋 PRD exists, skipping planner.", "YELLOW")
    elif accept_all or self._prompt_user_for_phase("Planner"):
        # Reuse the architect's intent as-is. Feeding it back through
        # _get_and_enhance_intent would enhance the already-enhanced text again.
        if user_intent is None:
            user_intent = self._get_and_enhance_intent()
        self.run_planner(user_intent)
    else:
        Logger.info("⏭️ Skipping planner.", "YELLOW")
    # Execute phase
    if accept_all or self._prompt_user_for_phase("Execute"):
        self.execute_loop()
    else:
        Logger.info("⏭️ Skipping execute.", "YELLOW")
    Logger.info("✅ All phases complete.", "GREEN")
[docs]
def start(self, phase: str = "all", accept_all: bool = False) -> None:
    """
    Entry point: handle the inspection flags, then run the requested phase(s).

    Args:
        phase: "architect", "planner" or "execute" runs that single phase; any
            other value (default "all") runs every phase in order
        accept_all: If True, skip user confirmation prompts

    Flags are handled in this order:
    - --status-check: report PRD status and exit with its code (0/1/2)
    - --print-prd: print the PRD and return without running any phase
    - --prd-out: export the existing PRD to the given file, then continue
    """
    if self._status_check:
        sys.exit(self._check_prd_status())
    if self._print_prd_flag:
        self._print_prd()
        return
    if self._prd_out:
        # NOTE(review): _export_prd exits with status 1 when no PRD exists yet,
        # so --prd-out currently requires an existing PRD; confirm that is intended.
        self._export_prd(self._prd_out)
    Logger.info(f"🤖 Ralph {self.agent.get_name()} Agent active in: {CONF.BASE_DIR}", "GREEN")
    single_phases = ("architect", "planner", "execute")
    if phase not in single_phases:
        self._run_all_phases(accept_all)
        return
    self._run_single_phase(phase)