Source code for pyralph.hooks

"""
Hook/Event system for Ralph lifecycle events.

This module provides a mechanism for external programs to subscribe to
Ralph's lifecycle events (phase start/end, task execution, verification, etc.).

TECHNICAL DEBT (TASK-015): This module exceeds the ~500 line target at ~2100 lines.
Consider future decomposition into separate modules for:
- Hook registration and discovery
- Event execution and dispatch
- Built-in hook implementations

Usage:
    1. Create Python hooks in .ralph/hooks/*.py with:
       - EVENTS = ["TASK_SUCCESS", "TASK_FAILURE"]  # Required
       - PRIORITY = 100  # Optional, lower = earlier
       - def on_event(event): ...  # Required handler

    2. Or create executable hooks that receive JSON via stdin

Example hook (.ralph/hooks/notify.py):
    EVENTS = ["TASK_SUCCESS", "TASK_FAILURE"]

    def on_event(event):
        print(f"Task {event.task_id}: {event.event_type.name}")
"""

from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
import importlib.util
import json
import subprocess
import threading


# ==============================================================================
# EVENT TYPES
# ==============================================================================

[docs] class EventType(Enum): """All lifecycle events that can be hooked.""" # Phase lifecycle (generic) PHASE_START = auto() PHASE_END = auto() # Architect phase ARCHITECT_START = auto() ARCHITECT_SUCCESS = auto() ARCHITECT_FAILURE = auto() # Planner phase PLANNER_START = auto() PLANNER_SUCCESS = auto() PLANNER_FAILURE = auto() # Execute phase EXECUTE_START = auto() EXECUTE_END = auto() # Task lifecycle TASK_START = auto() TASK_SUCCESS = auto() TASK_FAILURE = auto() TASK_RETRY = auto() # Verification VERIFICATION_START = auto() VERIFICATION_SUCCESS = auto() VERIFICATION_FAILURE = auto() # Intent enhancement INTENT_ENHANCE_START = auto() INTENT_ENHANCE_SUCCESS = auto() INTENT_ENHANCE_FAILURE = auto() # PRD events PRD_CREATED = auto() PRD_ARCHIVED = auto() PRD_REVISE_START = auto() PRD_REVISE_SUCCESS = auto() PRD_REVISE_FAILURE = auto() PRD_COMPLETE = auto() PRD_INCOMPLETE = auto() # Error events ERROR = auto() # IssueWatcher events WATCHER_START = auto() WATCHER_STOP = auto() ISSUE_DETECTED = auto() ISSUE_STORED = auto() ISSUE_QUEUED = auto() ISSUE_PROCESSING_START = auto() ISSUE_PROCESSING_SUCCESS = auto() ISSUE_PROCESSING_FAILURE = auto() POLL_START = auto() POLL_SUCCESS = auto() POLL_ERROR = auto()
# ============================================================================== # EVENT PAYLOAD # ==============================================================================
[docs] @dataclass class Event: """Immutable event payload passed to hooks.""" event_type: EventType timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) phase: Optional[str] = None task_id: Optional[str] = None task_description: Optional[str] = None retry_count: Optional[int] = None max_retries: Optional[int] = None error: Optional[Any] = None # AgentError when available verification_command: Optional[str] = None verification_exit_code: Optional[int] = None prd_path: Optional[str] = None # IssueWatcher-related fields issue_number: Optional[int] = None issue_title: Optional[str] = None issue_url: Optional[str] = None issues_count: Optional[int] = None metadata: Dict[str, Any] = field(default_factory=dict) def _serialize_error(self) -> Optional[str]: """Serialize error field for JSON export.""" if self.error is None: return None if hasattr(self.error, 'format_log_entry'): return self.error.format_log_entry() return str(self.error)
[docs] def to_dict(self) -> Dict[str, Any]: """Serialize event to dictionary for JSON export.""" return { "event_type": self.event_type.name, "timestamp": self.timestamp, "phase": self.phase, "task_id": self.task_id, "task_description": self.task_description, "retry_count": self.retry_count, "max_retries": self.max_retries, "error": self._serialize_error(), "verification_command": self.verification_command, "verification_exit_code": self.verification_exit_code, "prd_path": self.prd_path, "issue_number": self.issue_number, "issue_title": self.issue_title, "issue_url": self.issue_url, "issues_count": self.issues_count, "metadata": self.metadata, }
[docs] def to_json(self) -> str: """Serialize event to JSON string.""" return json.dumps(self.to_dict())
# ============================================================================== # HOOK BASE CLASS # ==============================================================================
[docs] class Hook(ABC): """Abstract base class for all hook types.""" def __init__( self, name: str, events: Set[EventType], priority: int = 100, timeout: float = 5.0, modifies_data: bool = False ): self.name = name self.events = events self.priority = priority self.timeout = timeout self.modifies_data = modifies_data
[docs] @abstractmethod def execute(self, event: Event) -> Optional[Event]: """ Execute the hook with the given event. Args: event: The event to process. Returns: If modifies_data is True, returns a modified Event or None to keep original. If modifies_data is False, return value is ignored. """ pass
# ============================================================================== # PYTHON MODULE HOOK # ==============================================================================
[docs] class PythonHook(Hook): """Hook loaded from a Python module.""" def __init__(self, path: Path, module: Any): events = self._parse_events(getattr(module, 'EVENTS', [])) priority = getattr(module, 'PRIORITY', 100) timeout = getattr(module, 'TIMEOUT', 5.0) modifies_data = getattr(module, 'MODIFIES_DATA', False) super().__init__(path.stem, events, priority, timeout, modifies_data) self._module = module self._handler: Optional[Callable[[Event], Optional[Event]]] = getattr(module, 'on_event', None) @staticmethod def _parse_events(event_names: List[str]) -> Set[EventType]: """Parse event names to EventType enum values.""" result = set() for name in event_names: if hasattr(EventType, name.upper()): result.add(EventType[name.upper()]) return result
[docs] def execute(self, event: Event) -> Optional[Event]: """ Execute the Python hook handler. Returns: Modified Event if modifies_data is True and handler returns an Event, None otherwise. """ if self._handler: result = self._handler(event) if self.modifies_data and isinstance(result, Event): return result return None
# ============================================================================== # EXECUTABLE HOOK # ==============================================================================
[docs] class ExecutableHook(Hook): """Hook that runs an external executable.""" def __init__( self, path: Path, events: Set[EventType], priority: int = 100, timeout: float = 5.0, modifies_data: bool = False ): super().__init__(path.name, events, priority, timeout, modifies_data) self._path = path
[docs] def execute(self, event: Event) -> Optional[Event]: """ Execute the external hook, passing event JSON via stdin. If modifies_data is True, the hook's stdout is parsed as JSON to create a modified Event. Returns: Modified Event if modifies_data is True and valid JSON is returned, None otherwise. """ try: result = subprocess.run( [str(self._path)], input=event.to_json(), timeout=self.timeout, capture_output=True, text=True, check=False ) if self.modifies_data and result.returncode == 0 and result.stdout: return self._parse_modified_event(result.stdout, event) except subprocess.TimeoutExpired: pass # Timeout is handled by caller except (OSError, json.JSONDecodeError) as e: # Log hook execution errors for debugging, caller handles the None return from logger import Logger Logger.debug(f"Hook execution failed for {self._path}: {type(e).__name__}: {e}") return None
@staticmethod def _parse_event_type(data: Dict[str, Any], original: Event) -> EventType: """Parse event_type from data, falling back to original.""" if 'event_type' not in data: return original.event_type name = data['event_type'] if hasattr(EventType, str(name).upper()): return EventType[name.upper()] return original.event_type @staticmethod def _parse_modified_event(json_str: str, original: Event) -> Optional[Event]: """Parse JSON output from executable hook into an Event.""" try: data = json.loads(json_str) if not isinstance(data, dict): return None return Event( event_type=ExecutableHook._parse_event_type(data, original), timestamp=data.get('timestamp', original.timestamp), phase=data.get('phase', original.phase), task_id=data.get('task_id', original.task_id), task_description=data.get('task_description', original.task_description), retry_count=data.get('retry_count', original.retry_count), max_retries=data.get('max_retries', original.max_retries), error=data.get('error', original.error), verification_command=data.get('verification_command', original.verification_command), verification_exit_code=data.get('verification_exit_code', original.verification_exit_code), prd_path=data.get('prd_path', original.prd_path), issue_number=data.get('issue_number', original.issue_number), issue_title=data.get('issue_title', original.issue_title), issue_url=data.get('issue_url', original.issue_url), issues_count=data.get('issues_count', original.issues_count), metadata=data.get('metadata', original.metadata), ) except (json.JSONDecodeError, TypeError): return None
# ============================================================================== # FUNCTION HOOK (PROGRAMMATIC) # ==============================================================================
[docs] class FunctionHook(Hook): """Hook backed by a Python callable for programmatic registration.""" def __init__( self, name: str, handler: Callable[[Event], Optional[Event]], events: Set[EventType], priority: int = 100, timeout: float = 5.0, modifies_data: bool = False ): """ Create a hook from a Python callable. Args: name: Unique identifier for this hook. handler: Callable that receives an Event and optionally returns a modified Event. events: Set of EventType values this hook subscribes to. priority: Execution order (lower = earlier). Default: 100. timeout: Maximum execution time in seconds. Default: 5.0. modifies_data: If True, handler's return value is used as modified event. """ super().__init__(name, events, priority, timeout, modifies_data) self._handler = handler
[docs] def execute(self, event: Event) -> Optional[Event]: """ Execute the callable handler. Returns: Modified Event if modifies_data is True and handler returns an Event, None otherwise. """ result = self._handler(event) if self.modifies_data and isinstance(result, Event): return result return None
# ============================================================================== # HOOK MANAGER # ==============================================================================
[docs] class HookManager: """Discovers, loads, and executes hooks.""" CONFIG_FILENAME = "hooks.yaml" def __init__(self, hooks_dir: Path, logger: Any = None): """ Initialize the hook manager. Args: hooks_dir: Path to the hooks directory (.ralph/hooks/) logger: Optional logger instance with warning() method """ self.hooks_dir = hooks_dir self._logger = logger self._hooks: Dict[EventType, List[Hook]] = defaultdict(list) self._enabled = True self._loaded = False self._enabled_hooks: Optional[Set[str]] = None # None means all hooks enabled
[docs] def enable(self) -> None: """Enable hook execution.""" self._enabled = True
[docs] def disable(self) -> None: """Disable hook execution.""" self._enabled = False
[docs] def set_enabled_hooks(self, hook_names: Optional[List[str]]) -> None: """ Set which hooks are enabled by name. Args: hook_names: List of hook names to enable, or None to enable all hooks. """ if hook_names is None: self._enabled_hooks = None else: self._enabled_hooks = set(hook_names)
@property def is_enabled(self) -> bool: """Check if hooks are enabled.""" return self._enabled
[docs] def is_hook_enabled(self, hook_name: str) -> bool: """ Check if a specific hook is enabled. Args: hook_name: The name of the hook to check. Returns: True if the hook is enabled, False otherwise. """ if not self._enabled: return False if self._enabled_hooks is None: return True return hook_name in self._enabled_hooks
[docs] def register_hook( self, name: str, handler: Callable[[Event], Optional[Event]], events: List[str], priority: int = 100, timeout: float = 5.0, modifies_data: bool = False ) -> bool: """ Register a hook programmatically. This allows plugins and external code to register hooks without creating files in the .ralph/hooks/ directory. Args: name: Unique identifier for this hook. handler: Callable that receives an Event and optionally returns a modified Event. events: List of event names to subscribe to (e.g., ["TASK_START", "TASK_SUCCESS"]). priority: Execution order (lower = earlier). Default: 100. timeout: Maximum execution time in seconds. Default: 5.0. modifies_data: If True, handler's return value is used as modified event. Returns: True if hook was registered successfully, False otherwise. Example: >>> def my_handler(event): ... print(f"Task {event.task_id} started") >>> manager.register_hook( ... name="my_plugin", ... handler=my_handler, ... events=["TASK_START", "TASK_SUCCESS"] ... ) """ # Parse event names to EventType event_types = PythonHook._parse_events(events) if not event_types: self._log_warning(f"Hook '{name}' has no valid events") return False # Check for duplicate hook name existing_hooks = self.get_all_hooks() for hook in existing_hooks: if hook.name == name: self._log_warning(f"Hook '{name}' is already registered") return False # Create and register the hook hook = FunctionHook( name=name, handler=handler, events=event_types, priority=priority, timeout=timeout, modifies_data=modifies_data ) self._register_hook(hook) return True
[docs] def unregister_hook(self, name: str) -> bool: """ Unregister a hook by name. Removes a previously registered hook from all event subscriptions. Args: name: The name of the hook to unregister. Returns: True if a hook was unregistered, False if no hook with that name was found. Example: >>> manager.unregister_hook("my_plugin") """ found = False for event_type in list(self._hooks.keys()): original_count = len(self._hooks[event_type]) self._hooks[event_type] = [h for h in self._hooks[event_type] if h.name != name] if len(self._hooks[event_type]) < original_count: found = True # Clean up empty lists if not self._hooks[event_type]: del self._hooks[event_type] return found
[docs] def clear_hooks(self) -> int: """ Clear all registered hooks. This removes all hooks (both programmatically registered and discovered). Useful for resetting state in tests or when reloading plugins. Returns: Number of unique hooks that were cleared. Example: >>> count = manager.clear_hooks() >>> print(f"Cleared {count} hooks") """ all_hooks = self.get_all_hooks() count = len(all_hooks) self._hooks.clear() self._loaded = False return count
[docs] def discover(self) -> int: """ Scan hooks directory and load all valid hooks. Hooks are loaded from three sources: 1. Python module hooks (.py files in hooks directory) 2. Executable hooks (executable files with optional .yaml config) 3. Configuration file (hooks.yaml) for registering external scripts Returns: Number of hooks loaded. """ if not self.hooks_dir.exists(): self._loaded = True return 0 count = 0 # Load hooks from configuration file first count += self._load_hooks_from_config() # Load Python module hooks for py_file in self.hooks_dir.glob("*.py"): if py_file.name.startswith("_"): continue if self._load_python_hook(py_file): count += 1 # Load executable hooks (skip config file and companion yaml files) for exe_file in self.hooks_dir.iterdir(): if exe_file.suffix in (".py", ".yaml", ".yml") or exe_file.name.startswith("_"): continue if exe_file.is_file() and self._is_executable(exe_file): if self._load_executable_hook(exe_file): count += 1 self._loaded = True return count
def _load_hooks_from_config(self) -> int: """ Load hooks from the configuration file (hooks.yaml). The configuration file allows registering external scripts as hooks without placing them in the hooks directory. Expected format: hooks: - name: my_hook path: /path/to/script.sh events: - TASK_START - TASK_SUCCESS priority: 100 # optional, default 100 timeout: 5.0 # optional, default 5.0 modifies_data: false # optional, default false Returns: Number of hooks loaded from config. """ config_path = self.hooks_dir / self.CONFIG_FILENAME if not config_path.exists(): return 0 try: import yaml with open(config_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) if not config or not isinstance(config, dict): return 0 hooks_config = config.get('hooks', []) if not isinstance(hooks_config, list): self._log_warning(f"Invalid hooks config: 'hooks' must be a list") return 0 count = 0 for hook_entry in hooks_config: if self._load_hook_from_config_entry(hook_entry): count += 1 return count except Exception as e: self._log_warning(f"Failed to load hooks config: {e}") return 0 def _validate_config_entry(self, entry: Any) -> Optional[Tuple[str, Path, Set[EventType]]]: """ Validate a hook config entry and extract required fields. Returns: Tuple of (name, path, events) if valid, None otherwise. """ if not isinstance(entry, dict): self._log_warning("Invalid hook entry: must be a dictionary") return None name = entry.get('name') path_str = entry.get('path') if not name: self._log_warning("Hook entry missing 'name' field") return None if not path_str: self._log_warning(f"Hook '{name}' missing 'path' field") return None # Resolve path (support relative paths from hooks directory) path = Path(path_str) if Path(path_str).is_absolute() else self.hooks_dir / path_str if not path.exists() or not path.is_file(): self._log_warning(f"Hook '{name}' path does not exist or is not a file: {path}") return None event_names = entry.get('events', []) if not isinstance(event_names, list) or not event_names: self._log_warning(f"Hook '{name}' has no valid events") return None events = PythonHook._parse_events(event_names) if not events: self._log_warning(f"Hook '{name}' has no valid events") return None return name, path, events def _load_hook_from_config_entry(self, entry: Dict[str, Any]) -> bool: """ Load a single hook from a configuration entry. Returns: True if hook was loaded successfully, False otherwise. """ validated = self._validate_config_entry(entry) if not validated: return False name, path, events = validated # Extract optional fields with type coercion priority = entry.get('priority', 100) timeout = entry.get('timeout', 5.0) modifies_data = entry.get('modifies_data', False) hook = ExecutableHook( path=path, events=events, priority=int(priority) if isinstance(priority, (int, float)) else 100, timeout=float(timeout) if isinstance(timeout, (int, float)) else 5.0, modifies_data=bool(modifies_data) if isinstance(modifies_data, bool) else False ) hook.name = name self._register_hook(hook) return True def _load_python_hook(self, path: Path) -> bool: """Load a Python module hook.""" try: spec = importlib.util.spec_from_file_location(path.stem, path) if spec is None or spec.loader is None: return False module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Validate hook has required attributes if not hasattr(module, 'EVENTS'): self._log_warning(f"Hook '{path.name}' missing EVENTS attribute") return False if not hasattr(module, 'on_event'): self._log_warning(f"Hook '{path.name}' missing on_event function") return False hook = PythonHook(path, module) if not hook.events: self._log_warning(f"Hook '{path.name}' has no valid events") return False self._register_hook(hook) return True except Exception as e: self._log_warning(f"Failed to load hook '{path.name}': {e}") return False def _load_executable_hook(self, path: Path) -> bool: """Load an executable hook.""" try: # Try to read hook config from companion .yaml file config_path = path.with_suffix('.yaml') events: Set[EventType] = set() priority = 100 timeout = 5.0 modifies_data = False if config_path.exists(): import yaml with open(config_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) or {} event_names = config.get('events', []) events = PythonHook._parse_events(event_names) priority = config.get('priority', 100) timeout = config.get('timeout', 5.0) modifies_data = config.get('modifies_data', False) # If no config, subscribe to all events if not events: events = set(EventType) hook = ExecutableHook(path, events, priority, timeout, modifies_data) self._register_hook(hook) return True except Exception as e: self._log_warning(f"Failed to load executable hook '{path.name}': {e}") return False def _register_hook(self, hook: Hook) -> None: """Register a hook for its subscribed events.""" for event_type in hook.events: self._hooks[event_type].append(hook) @staticmethod def _is_executable(path: Path) -> bool: """Check if a file is executable.""" import os import stat # On Windows, check for common executable extensions if os.name == 'nt': return path.suffix.lower() in {'.exe', '.bat', '.cmd', '.ps1', '.sh'} # On Unix, check executable bit try: return bool(path.stat().st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)) except OSError: return False
[docs] def emit(self, event: Event) -> Event: """ Emit an event to all subscribed hooks. Hooks are executed in priority order (lower = earlier). Hook errors are caught and logged but never halt execution. Hooks with modifies_data=True can transform the event data, with modifications chained through subsequent hooks. Args: event: The event to emit. Returns: The (potentially modified) event after all hooks have executed. """ if not self._enabled: return event if not self._loaded: self.discover() hooks = self._hooks.get(event.event_type, []) if not hooks: return event # Sort by priority (lower = earlier) hooks_sorted = sorted(hooks, key=lambda h: h.priority) # Filter by enabled hooks if selective enabling is active if self._enabled_hooks is not None: hooks_sorted = [h for h in hooks_sorted if h.name in self._enabled_hooks] current_event = event for hook in hooks_sorted: modified = self._safe_execute(hook, current_event) if modified is not None: current_event = modified return current_event
def _safe_execute(self, hook: Hook, event: Event) -> Optional[Event]: """ Execute a hook with timeout and error isolation. Returns: Modified Event if hook has modifies_data=True and returns a valid Event, None otherwise. """ result: Dict[str, Any] = {'completed': False, 'error': None, 'modified_event': None} def run_hook(): try: modified = hook.execute(event) result['completed'] = True if hook.modifies_data and isinstance(modified, Event): result['modified_event'] = modified except Exception as e: result['error'] = e thread = threading.Thread(target=run_hook, daemon=True) thread.start() thread.join(timeout=hook.timeout) if thread.is_alive(): self._log_warning(f"Hook '{hook.name}' timed out after {hook.timeout}s") return None elif result['error']: self._log_warning(f"Hook '{hook.name}' failed: {result['error']}") return None return result['modified_event'] def _log_warning(self, message: str) -> None: """Log a warning message if logger is available.""" if self._logger and hasattr(self._logger, 'warning'): self._logger.warning(message)
[docs] def get_hooks_for_event(self, event_type: EventType) -> List[Hook]: """Get all hooks registered for an event type.""" return list(self._hooks.get(event_type, []))
[docs] def get_all_hooks(self) -> List[Hook]: """Get all registered hooks (deduplicated).""" seen: Dict[str, Hook] = {} for hook_list in self._hooks.values(): for hook in hook_list: if hook.name not in seen: seen[hook.name] = hook return list(seen.values())