From e07675343c6c1fa3e6bf3a16a78ca35f11387c1c Mon Sep 17 00:00:00 2001 From: wangchenguang Date: Wed, 17 Jun 2026 10:41:11 +0800 Subject: [PATCH 1/4] refactor: move extension command handlers to extensions/_commands.py (PR-7/8) Convert the flat extensions.py module into an extensions/ package and extract all extension_app and catalog_app command handlers plus their private helpers (_resolve_installed_extension, _resolve_catalog_extension, _print_extension_info) out of __init__.py into the new extensions/_commands.py, mirroring the domain-dir layout used for presets/_commands.py (PR-6) and integrations/_commands.py (PR-5). - extensions.py -> extensions/__init__.py (pure rename, 99%); intra-module relative imports bumped from `.x` to `..x` since they reference root siblings. - Root helpers (_require_specify_project, _locate_bundled_extension, load_init_options, _display_project_path) are reached through thin shims that re-fetch from the parent package at call time, so test monkeypatching of specify_cli. keeps working unchanged. - __init__.py drops ~1444 lines (3511 -> 2067); CLI surface preserved via register(app). No behavior change. Full suite failure set is identical before/after (82 pre-existing env failures, 0 new). --- src/specify_cli/__init__.py | 1455 +--------------- .../{extensions.py => extensions/__init__.py} | 40 +- src/specify_cli/extensions/_commands.py | 1490 +++++++++++++++++ 3 files changed, 1515 insertions(+), 1470 deletions(-) rename src/specify_cli/{extensions.py => extensions/__init__.py} (99%) create mode 100644 src/specify_cli/extensions/_commands.py diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 3e4c15122f..e934a166f3 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -29,12 +29,11 @@ import contextlib import os import sys -import zipfile import json import yaml from pathlib import Path -from typing import Any, Optional +from typing import Any import typer from rich.panel import Panel @@ -56,7 +55,7 @@ show_banner, ) from ._assets import ( - _locate_bundled_extension, + _locate_bundled_extension as _locate_bundled_extension, _locate_bundled_preset as _locate_bundled_preset, _locate_bundled_workflow as _locate_bundled_workflow, _locate_core_pack, @@ -563,19 +562,9 @@ def version( # ===== Extension Commands ===== -extension_app = typer.Typer( - name="extension", - help="Manage spec-kit extensions", - add_completion=False, -) -app.add_typer(extension_app, name="extension") - -catalog_app = typer.Typer( - name="catalog", - help="Manage extension catalogs", - add_completion=False, -) -extension_app.add_typer(catalog_app, name="catalog") +# Moved to extensions/_commands.py — registered here to preserve CLI surface. +from .extensions._commands import register as _register_extension_cmds # noqa: E402 +_register_extension_cmds(app) # ===== Integration Commands ===== @@ -609,1440 +598,6 @@ def _require_specify_project() -> Path: _register_preset_cmds(app) -# ===== Extension Commands ===== - - -def _resolve_installed_extension( - argument: str, - installed_extensions: list, - command_name: str = "command", - allow_not_found: bool = False, -) -> tuple[Optional[str], Optional[str]]: - """Resolve an extension argument (ID or display name) to an installed extension. - - Args: - argument: Extension ID or display name provided by user - installed_extensions: List of installed extension dicts from manager.list_installed() - command_name: Name of the command for error messages (e.g., "enable", "disable") - allow_not_found: If True, return (None, None) when not found instead of raising - - Returns: - Tuple of (extension_id, display_name), or (None, None) if allow_not_found=True and not found - - Raises: - typer.Exit: If extension not found (and allow_not_found=False) or name is ambiguous - """ - from rich.table import Table - - # First, try exact ID match - for ext in installed_extensions: - if ext["id"] == argument: - return (ext["id"], ext["name"]) - - # If not found by ID, try display name match - name_matches = [ext for ext in installed_extensions if ext["name"].lower() == argument.lower()] - - if len(name_matches) == 1: - # Unique display-name match - return (name_matches[0]["id"], name_matches[0]["name"]) - elif len(name_matches) > 1: - # Ambiguous display-name match - console.print( - f"[red]Error:[/red] Extension name '{argument}' is ambiguous. " - "Multiple installed extensions share this name:" - ) - table = Table(title="Matching extensions") - table.add_column("ID", style="cyan", no_wrap=True) - table.add_column("Name", style="white") - table.add_column("Version", style="green") - for ext in name_matches: - table.add_row(ext.get("id", ""), ext.get("name", ""), str(ext.get("version", ""))) - console.print(table) - console.print("\nPlease rerun using the extension ID:") - console.print(f" [bold]specify extension {command_name} [/bold]") - raise typer.Exit(1) - else: - # No match by ID or display name - if allow_not_found: - return (None, None) - console.print(f"[red]Error:[/red] Extension '{argument}' is not installed") - raise typer.Exit(1) - - -def _resolve_catalog_extension( - argument: str, - catalog, - command_name: str = "info", -) -> tuple[Optional[dict], Optional[Exception]]: - """Resolve an extension argument (ID or display name) from the catalog. - - Args: - argument: Extension ID or display name provided by user - catalog: ExtensionCatalog instance - command_name: Name of the command for error messages - - Returns: - Tuple of (extension_info, catalog_error) - - If found: (ext_info_dict, None) - - If catalog error: (None, error) - - If not found: (None, None) - """ - from rich.table import Table - from .extensions import ExtensionError - - try: - # First try by ID - ext_info = catalog.get_extension_info(argument) - if ext_info: - return (ext_info, None) - - # Try by display name - search using argument as query, then filter for exact match - search_results = catalog.search(query=argument) - name_matches = [ext for ext in search_results if ext["name"].lower() == argument.lower()] - - if len(name_matches) == 1: - return (name_matches[0], None) - elif len(name_matches) > 1: - # Ambiguous display-name match in catalog - console.print( - f"[red]Error:[/red] Extension name '{argument}' is ambiguous. " - "Multiple catalog extensions share this name:" - ) - table = Table(title="Matching extensions") - table.add_column("ID", style="cyan", no_wrap=True) - table.add_column("Name", style="white") - table.add_column("Version", style="green") - table.add_column("Catalog", style="dim") - for ext in name_matches: - table.add_row( - ext.get("id", ""), - ext.get("name", ""), - str(ext.get("version", "")), - ext.get("_catalog_name", ""), - ) - console.print(table) - console.print("\nPlease rerun using the extension ID:") - console.print(f" [bold]specify extension {command_name} [/bold]") - raise typer.Exit(1) - - # Not found - return (None, None) - - except ExtensionError as e: - return (None, e) - - -@extension_app.command("list") -def extension_list( - available: bool = typer.Option(False, "--available", help="Show available extensions from catalog"), - all_extensions: bool = typer.Option(False, "--all", help="Show both installed and available"), -): - """List installed extensions.""" - from .extensions import ExtensionManager - - project_root = _require_specify_project() - manager = ExtensionManager(project_root) - installed = manager.list_installed() - - if not installed and not (available or all_extensions): - console.print("[yellow]No extensions installed.[/yellow]") - console.print("\nInstall an extension with:") - console.print(" specify extension add ") - return - - if installed: - console.print("\n[bold cyan]Installed Extensions:[/bold cyan]\n") - - for ext in installed: - status_icon = "✓" if ext["enabled"] else "✗" - status_color = "green" if ext["enabled"] else "red" - - console.print(f" [{status_color}]{status_icon}[/{status_color}] [bold]{ext['name']}[/bold] (v{ext['version']})") - console.print(f" [dim]{ext['id']}[/dim]") - console.print(f" {ext['description']}") - console.print(f" Commands: {ext['command_count']} | Hooks: {ext['hook_count']} | Priority: {ext['priority']} | Status: {'Enabled' if ext['enabled'] else 'Disabled'}") - console.print() - - if available or all_extensions: - console.print("\nInstall an extension:") - console.print(" [cyan]specify extension add [/cyan]") - - -@catalog_app.command("list") -def catalog_list(): - """List all active extension catalogs.""" - from .extensions import ExtensionCatalog, ValidationError - - project_root = _require_specify_project() - catalog = ExtensionCatalog(project_root) - - try: - active_catalogs = catalog.get_active_catalogs() - except ValidationError as e: - console.print(f"[red]Error:[/red] {e}") - raise typer.Exit(1) - - console.print("\n[bold cyan]Active Extension Catalogs:[/bold cyan]\n") - for entry in active_catalogs: - install_str = ( - "[green]install allowed[/green]" - if entry.install_allowed - else "[yellow]discovery only[/yellow]" - ) - console.print(f" [bold]{entry.name}[/bold] (priority {entry.priority})") - if entry.description: - console.print(f" {entry.description}") - console.print(f" URL: {entry.url}") - console.print(f" Install: {install_str}") - console.print() - - config_path = project_root / ".specify" / "extension-catalogs.yml" - user_config_path = Path.home() / ".specify" / "extension-catalogs.yml" - if os.environ.get("SPECKIT_CATALOG_URL"): - console.print("[dim]Catalog configured via SPECKIT_CATALOG_URL environment variable.[/dim]") - else: - try: - proj_loaded = config_path.exists() and catalog._load_catalog_config(config_path) is not None - except ValidationError: - proj_loaded = False - if proj_loaded: - console.print(f"[dim]Config: {_display_project_path(project_root, config_path)}[/dim]") - else: - try: - user_loaded = user_config_path.exists() and catalog._load_catalog_config(user_config_path) is not None - except ValidationError: - user_loaded = False - if user_loaded: - console.print("[dim]Config: ~/.specify/extension-catalogs.yml[/dim]") - else: - console.print("[dim]Using built-in default catalog stack.[/dim]") - console.print( - "[dim]Add .specify/extension-catalogs.yml to customize.[/dim]" - ) - - -@catalog_app.command("add") -def catalog_add( - url: str = typer.Argument(help="Catalog URL (must use HTTPS)"), - name: str = typer.Option(..., "--name", help="Catalog name"), - priority: int = typer.Option(10, "--priority", help="Priority (lower = higher priority)"), - install_allowed: bool = typer.Option( - False, "--install-allowed/--no-install-allowed", - help="Allow extensions from this catalog to be installed", - ), - description: str = typer.Option("", "--description", help="Description of the catalog"), -): - """Add a catalog to .specify/extension-catalogs.yml.""" - from .extensions import ExtensionCatalog, ValidationError - - project_root = _require_specify_project() - specify_dir = project_root / ".specify" - - # Validate URL - tmp_catalog = ExtensionCatalog(project_root) - try: - tmp_catalog._validate_catalog_url(url) - except ValidationError as e: - console.print(f"[red]Error:[/red] {e}") - raise typer.Exit(1) - - config_path = specify_dir / "extension-catalogs.yml" - - # Load existing config - if config_path.exists(): - try: - config = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} - except Exception as e: - config_label = _display_project_path(project_root, config_path) - console.print(f"[red]Error:[/red] Failed to read {config_label}: {e}") - raise typer.Exit(1) - else: - config = {} - - catalogs = config.get("catalogs", []) - if not isinstance(catalogs, list): - console.print("[red]Error:[/red] Invalid catalog config: 'catalogs' must be a list.") - raise typer.Exit(1) - - # Check for duplicate name - for existing in catalogs: - if isinstance(existing, dict) and existing.get("name") == name: - console.print(f"[yellow]Warning:[/yellow] A catalog named '{name}' already exists.") - console.print("Use 'specify extension catalog remove' first, or choose a different name.") - raise typer.Exit(1) - - catalogs.append({ - "name": name, - "url": url, - "priority": priority, - "install_allowed": install_allowed, - "description": description, - }) - - config["catalogs"] = catalogs - config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") - - install_label = "install allowed" if install_allowed else "discovery only" - console.print(f"\n[green]✓[/green] Added catalog '[bold]{name}[/bold]' ({install_label})") - console.print(f" URL: {url}") - console.print(f" Priority: {priority}") - console.print(f"\nConfig saved to {_display_project_path(project_root, config_path)}") - - -@catalog_app.command("remove") -def catalog_remove( - name: str = typer.Argument(help="Catalog name to remove"), -): - """Remove a catalog from .specify/extension-catalogs.yml.""" - project_root = _require_specify_project() - specify_dir = project_root / ".specify" - - config_path = specify_dir / "extension-catalogs.yml" - if not config_path.exists(): - console.print("[red]Error:[/red] No catalog config found. Nothing to remove.") - raise typer.Exit(1) - - try: - config = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} - except Exception: - console.print("[red]Error:[/red] Failed to read catalog config.") - raise typer.Exit(1) - - catalogs = config.get("catalogs", []) - if not isinstance(catalogs, list): - console.print("[red]Error:[/red] Invalid catalog config: 'catalogs' must be a list.") - raise typer.Exit(1) - original_count = len(catalogs) - catalogs = [c for c in catalogs if isinstance(c, dict) and c.get("name") != name] - - if len(catalogs) == original_count: - console.print(f"[red]Error:[/red] Catalog '{name}' not found.") - raise typer.Exit(1) - - config["catalogs"] = catalogs - config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") - - console.print(f"[green]✓[/green] Removed catalog '{name}'") - if not catalogs: - console.print("\n[dim]No catalogs remain in config. Built-in defaults will be used.[/dim]") - - -@extension_app.command("add") -def extension_add( - extension: str = typer.Argument(help="Extension name or path"), - dev: bool = typer.Option(False, "--dev", help="Install from local directory"), - from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"), - force: bool = typer.Option(False, "--force", help="Overwrite if already installed"), - priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"), -): - """Install an extension.""" - from .extensions import ExtensionManager, ExtensionCatalog, ExtensionError, ValidationError, CompatibilityError, REINSTALL_COMMAND - - project_root = _require_specify_project() - # Validate priority - if priority < 1: - console.print("[red]Error:[/red] Priority must be a positive integer (1 or higher)") - raise typer.Exit(1) - - manager = ExtensionManager(project_root) - speckit_version = get_speckit_version() - - if force: - console.print("[yellow]--force:[/yellow] Will overwrite if already installed") - - # Prompt for URL-based installs BEFORE the spinner so the user can - # actually see and respond to the confirmation (the Rich status - # spinner overwrites the typer.confirm prompt line, making it appear - # as though the command is hung). - # Guard with ``not dev`` so that --dev + --from does not show a - # confusing confirmation for a URL that will be ignored. - if from_url and not dev: - from urllib.parse import urlparse - from rich.markup import escape as _escape_markup - - parsed = urlparse(from_url) - is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") - - if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost): - console.print("[red]Error:[/red] URL must use HTTPS for security.") - console.print("HTTP is only allowed for localhost URLs.") - raise typer.Exit(1) - - safe_url = _escape_markup(from_url) - - # Warn about untrusted sources — default-deny confirmation - console.print() - console.print(Panel( - f"[bold]You are installing an extension from an external URL that is not\n" - f"listed in any of your configured extension catalogs.[/bold]\n\n" - f"URL: {safe_url}\n\n" - f"Only install extensions from sources you trust.", - title="[bold yellow]⚠ Untrusted Source[/bold yellow]", - border_style="yellow", - padding=(1, 2), - )) - console.print() - confirm = typer.confirm("Continue with installation?", default=False) - if not confirm: - console.print("Cancelled") - raise typer.Exit(0) - - try: - with console.status(f"[cyan]Installing extension: {extension}[/cyan]"): - if dev: - # Install from local directory - source_path = Path(extension).expanduser().resolve() - if not source_path.exists(): - console.print(f"[red]Error:[/red] Directory not found: {source_path}") - raise typer.Exit(1) - - if not (source_path / "extension.yml").exists(): - console.print(f"[red]Error:[/red] No extension.yml found in {source_path}") - raise typer.Exit(1) - - if force: - console.print(f"[yellow]--force:[/yellow] Installing from [cyan]{source_path}[/cyan] (will overwrite if already installed)...") - - manifest = manager.install_from_directory( - source_path, - speckit_version, - priority=priority, - link_commands=True, - force=force - ) - - elif from_url: - # Install from URL (ZIP file) - import urllib.error - - console.print(f"Downloading from {safe_url}...") - - # Download ZIP to temp location - download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" - download_dir.mkdir(parents=True, exist_ok=True) - zip_path = download_dir / f"{extension}-url-download.zip" - - try: - from specify_cli.authentication.http import open_url as _open_url - - with _open_url(from_url, timeout=60) as response: - zip_data = response.read() - zip_path.write_bytes(zip_data) - - # Install from downloaded ZIP - manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority, force=force) - except urllib.error.URLError as e: - console.print(f"[red]Error:[/red] Failed to download from {safe_url}: {e}") - raise typer.Exit(1) - finally: - # Clean up downloaded ZIP - if zip_path.exists(): - zip_path.unlink() - - else: - # Try bundled extensions first (shipped with spec-kit) - bundled_path = _locate_bundled_extension(extension) - if bundled_path is not None: - manifest = manager.install_from_directory( - bundled_path, speckit_version, priority=priority, force=force - ) - else: - # Install from catalog (also resolves display names to IDs) - catalog = ExtensionCatalog(project_root) - - # Check if extension exists in catalog (supports both ID and display name) - ext_info, catalog_error = _resolve_catalog_extension(extension, catalog, "add") - if catalog_error: - console.print(f"[red]Error:[/red] Could not query extension catalog: {catalog_error}") - raise typer.Exit(1) - if not ext_info: - console.print(f"[red]Error:[/red] Extension '{extension}' not found in catalog") - console.print("\nSearch available extensions:") - console.print(" specify extension search") - raise typer.Exit(1) - - # If catalog resolved a display name to an ID, check bundled again - resolved_id = ext_info['id'] - if resolved_id != extension: - bundled_path = _locate_bundled_extension(resolved_id) - if bundled_path is not None: - manifest = manager.install_from_directory( - bundled_path, speckit_version, priority=priority, force=force - ) - - if bundled_path is None: - # Bundled extensions without a download URL must come from the local package - if ext_info.get("bundled") and not ext_info.get("download_url"): - console.print( - f"[red]Error:[/red] Extension '{ext_info['id']}' is bundled with spec-kit " - f"but could not be found in the installed package." - ) - console.print( - "\nThis usually means the spec-kit installation is incomplete or corrupted." - ) - console.print("Try reinstalling spec-kit:") - console.print(f" {REINSTALL_COMMAND}") - raise typer.Exit(1) - - # Enforce install_allowed policy - if not ext_info.get("_install_allowed", True): - catalog_name = ext_info.get("_catalog_name", "community") - console.print( - f"[red]Error:[/red] '{extension}' is available in the " - f"'{catalog_name}' catalog but installation is not allowed from that catalog." - ) - console.print( - f"\nTo enable installation, add '{extension}' to an approved catalog " - f"(install_allowed: true) in .specify/extension-catalogs.yml." - ) - raise typer.Exit(1) - - # Download extension ZIP (use resolved ID, not original argument which may be display name) - extension_id = ext_info['id'] - console.print(f"Downloading {ext_info['name']} v{ext_info.get('version', 'unknown')}...") - zip_path = catalog.download_extension(extension_id) - - try: - # Install from downloaded ZIP - manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority, force=force) - finally: - # Clean up downloaded ZIP - if zip_path.exists(): - zip_path.unlink() - - console.print("\n[green]✓[/green] Extension installed successfully!") - console.print(f"\n[bold]{manifest.name}[/bold] (v{manifest.version})") - console.print(f" {manifest.description}") - - for warning in manifest.warnings: - console.print(f"\n[yellow]⚠ Compatibility warning:[/yellow] {warning}") - - is_cline = load_init_options(project_root).get("ai") == "cline" - - if is_cline: - from specify_cli.integrations.cline import format_cline_command_name - - console.print("\n[bold cyan]Provided commands:[/bold cyan]") - for cmd in manifest.commands: - cmd_name = cmd['name'] - if is_cline: - cmd_name = format_cline_command_name(cmd_name) - console.print(f" • {cmd_name} - {cmd.get('description', '')}") - - # Report agent skills registration - reg_meta = manager.registry.get(manifest.id) - reg_skills = reg_meta.get("registered_skills", []) if reg_meta else [] - # Normalize to guard against corrupted registry entries - if not isinstance(reg_skills, list): - reg_skills = [] - if reg_skills: - console.print(f"\n[green]✓[/green] {len(reg_skills)} agent skill(s) auto-registered") - - console.print("\n[yellow]⚠[/yellow] Configuration may be required") - console.print(f" Check: .specify/extensions/{manifest.id}/") - - except ValidationError as e: - console.print(f"\n[red]Validation Error:[/red] {e}") - raise typer.Exit(1) - except CompatibilityError as e: - console.print(f"\n[red]Compatibility Error:[/red] {e}") - raise typer.Exit(1) - except ExtensionError as e: - console.print(f"\n[red]Error:[/red] {e}") - raise typer.Exit(1) - - -@extension_app.command("remove") -def extension_remove( - extension: str = typer.Argument(help="Extension ID or name to remove"), - keep_config: bool = typer.Option(False, "--keep-config", help="Don't remove config files"), - force: bool = typer.Option(False, "--force", help="Skip confirmation"), -): - """Uninstall an extension.""" - from .extensions import ExtensionManager - - project_root = _require_specify_project() - manager = ExtensionManager(project_root) - - # Resolve extension ID from argument (handles ambiguous names) - installed = manager.list_installed() - extension_id, display_name = _resolve_installed_extension(extension, installed, "remove") - - # Get extension info for command and skill counts - ext_manifest = manager.get_extension(extension_id) - reg_meta = manager.registry.get(extension_id) - # Derive cmd_count from the registry's registered_commands (includes aliases) - # rather than from the manifest (primary commands only). Use max() across - # agents to get the per-agent count; sum() would double-count since users - # think in logical commands, not per-agent file counts. - # Use get() without a default so we can distinguish "key missing" (fall back - # to manifest) from "key present but empty dict" (zero commands registered). - registered_commands = reg_meta.get("registered_commands") if isinstance(reg_meta, dict) else None - if isinstance(registered_commands, dict): - cmd_count = max( - (len(v) for v in registered_commands.values() if isinstance(v, list)), - default=0, - ) - else: - cmd_count = len(ext_manifest.commands) if ext_manifest else 0 - raw_skills = reg_meta.get("registered_skills") if reg_meta else None - skill_count = len(raw_skills) if isinstance(raw_skills, list) else 0 - - # Confirm removal - if not force: - console.print("\n[yellow]⚠ This will remove:[/yellow]") - console.print(f" • {cmd_count} command{'s' if cmd_count != 1 else ''} per agent") - if skill_count: - console.print(f" • {skill_count} agent skill(s)") - console.print(f" • Extension directory: .specify/extensions/{extension_id}/") - if not keep_config: - console.print(" • Config files (will be backed up)") - console.print() - - confirm = typer.confirm("Continue?") - if not confirm: - console.print("Cancelled") - raise typer.Exit(0) - - # Remove extension - success = manager.remove(extension_id, keep_config=keep_config) - - if success: - console.print(f"\n[green]✓[/green] Extension '{display_name}' removed successfully") - if keep_config: - console.print(f"\nConfig files preserved in .specify/extensions/{extension_id}/") - else: - console.print(f"\nConfig files backed up to .specify/extensions/.backup/{extension_id}/") - console.print(f"\nTo reinstall: specify extension add {extension_id}") - else: - console.print("[red]Error:[/red] Failed to remove extension") - raise typer.Exit(1) - - -@extension_app.command("search") -def extension_search( - query: str = typer.Argument(None, help="Search query (optional)"), - tag: Optional[str] = typer.Option(None, "--tag", help="Filter by tag"), - author: Optional[str] = typer.Option(None, "--author", help="Filter by author"), - verified: bool = typer.Option(False, "--verified", help="Show only verified extensions"), -): - """Search for available extensions in catalog.""" - from .extensions import ExtensionCatalog, ExtensionError - - project_root = _require_specify_project() - catalog = ExtensionCatalog(project_root) - - try: - console.print("🔍 Searching extension catalog...") - results = catalog.search(query=query, tag=tag, author=author, verified_only=verified) - - if not results: - console.print("\n[yellow]No extensions found matching criteria[/yellow]") - if query or tag or author or verified: - console.print("\nTry:") - console.print(" • Broader search terms") - console.print(" • Remove filters") - console.print(" • specify extension search (show all)") - raise typer.Exit(0) - - console.print(f"\n[green]Found {len(results)} extension(s):[/green]\n") - - for ext in results: - # Extension header - verified_badge = " [green]✓ Verified[/green]" if ext.get("verified") else "" - console.print(f"[bold]{ext['name']}[/bold] (v{ext['version']}){verified_badge}") - console.print(f" {ext['description']}") - - # Metadata - console.print(f"\n [dim]Author:[/dim] {ext.get('author', 'Unknown')}") - if ext.get('tags'): - tags_str = ", ".join(ext['tags']) - console.print(f" [dim]Tags:[/dim] {tags_str}") - - # Source catalog - catalog_name = ext.get("_catalog_name", "") - install_allowed = ext.get("_install_allowed", True) - if catalog_name: - if install_allowed: - console.print(f" [dim]Catalog:[/dim] {catalog_name}") - else: - console.print(f" [dim]Catalog:[/dim] {catalog_name} [yellow](discovery only — not installable)[/yellow]") - - # Stats - stats = [] - if ext.get('downloads') is not None: - stats.append(f"Downloads: {ext['downloads']:,}") - if ext.get('stars') is not None: - stats.append(f"Stars: {ext['stars']}") - if stats: - console.print(f" [dim]{' | '.join(stats)}[/dim]") - - # Links - if ext.get('repository'): - console.print(f" [dim]Repository:[/dim] {ext['repository']}") - - # Install command (show warning if not installable) - if install_allowed: - console.print(f"\n [cyan]Install:[/cyan] specify extension add {ext['id']}") - else: - console.print(f"\n [yellow]⚠[/yellow] Not directly installable from '{catalog_name}'.") - console.print( - f" Add to an approved catalog with install_allowed: true, " - f"or install from a ZIP URL: specify extension add {ext['id']} --from " - ) - console.print() - - except ExtensionError as e: - console.print(f"\n[red]Error:[/red] {e}") - console.print("\nTip: The catalog may be temporarily unavailable. Try again later.") - raise typer.Exit(1) - - -@extension_app.command("info") -def extension_info( - extension: str = typer.Argument(help="Extension ID or name"), -): - """Show detailed information about an extension.""" - from .extensions import ExtensionCatalog, ExtensionManager, normalize_priority - - project_root = _require_specify_project() - catalog = ExtensionCatalog(project_root) - manager = ExtensionManager(project_root) - installed = manager.list_installed() - - # Try to resolve from installed extensions first (by ID or name) - # Use allow_not_found=True since the extension may be catalog-only - resolved_installed_id, resolved_installed_name = _resolve_installed_extension( - extension, installed, "info", allow_not_found=True - ) - - # Try catalog lookup (with error handling) - # If we resolved an installed extension by display name, use its ID for catalog lookup - # to ensure we get the correct catalog entry (not a different extension with same name) - lookup_key = resolved_installed_id if resolved_installed_id else extension - ext_info, catalog_error = _resolve_catalog_extension(lookup_key, catalog, "info") - - # Case 1: Found in catalog - show full catalog info - if ext_info: - _print_extension_info(ext_info, manager) - return - - # Case 2: Installed locally but catalog lookup failed or not in catalog - if resolved_installed_id: - # Get local manifest info - ext_manifest = manager.get_extension(resolved_installed_id) - metadata = manager.registry.get(resolved_installed_id) - metadata_is_dict = isinstance(metadata, dict) - if not metadata_is_dict: - console.print( - "[yellow]Warning:[/yellow] Extension metadata appears to be corrupted; " - "some information may be unavailable." - ) - version = metadata.get("version", "unknown") if metadata_is_dict else "unknown" - - console.print(f"\n[bold]{resolved_installed_name}[/bold] (v{version})") - console.print(f"ID: {resolved_installed_id}") - console.print() - - if ext_manifest: - console.print(f"{ext_manifest.description}") - console.print() - # Author is optional in extension.yml, safely retrieve it - author = ext_manifest.data.get("extension", {}).get("author") - if author: - console.print(f"[dim]Author:[/dim] {author}") - if ext_manifest.category: - console.print(f"[dim]Category:[/dim] {ext_manifest.category}") - if ext_manifest.effect: - console.print(f"[dim]Effect:[/dim] {ext_manifest.effect}") - console.print() - - if ext_manifest.commands: - console.print("[bold]Commands:[/bold]") - for cmd in ext_manifest.commands: - console.print(f" • {cmd['name']}: {cmd.get('description', '')}") - console.print() - - # Show catalog status - if catalog_error: - console.print(f"[yellow]Catalog unavailable:[/yellow] {catalog_error}") - console.print("[dim]Note: Using locally installed extension; catalog info could not be verified.[/dim]") - else: - console.print("[yellow]Note:[/yellow] Not found in catalog (custom/local extension)") - - console.print() - console.print("[green]✓ Installed[/green]") - priority = normalize_priority(metadata.get("priority") if metadata_is_dict else None) - console.print(f"[dim]Priority:[/dim] {priority}") - console.print(f"\nTo remove: specify extension remove {resolved_installed_id}") - return - - # Case 3: Not found anywhere - if catalog_error: - console.print(f"[red]Error:[/red] Could not query extension catalog: {catalog_error}") - console.print("\nTry again when online, or use the extension ID directly.") - else: - console.print(f"[red]Error:[/red] Extension '{extension}' not found") - console.print("\nTry: specify extension search") - raise typer.Exit(1) - - -def _print_extension_info(ext_info: dict, manager): - """Print formatted extension info from catalog data.""" - from .extensions import normalize_priority - - # Header - verified_badge = " [green]✓ Verified[/green]" if ext_info.get("verified") else "" - console.print(f"\n[bold]{ext_info['name']}[/bold] (v{ext_info['version']}){verified_badge}") - console.print(f"ID: {ext_info['id']}") - console.print() - - # Description - console.print(f"{ext_info['description']}") - console.print() - - # Author and License - console.print(f"[dim]Author:[/dim] {ext_info.get('author', 'Unknown')}") - console.print(f"[dim]License:[/dim] {ext_info.get('license', 'Unknown')}") - - # Category and Effect - if ext_info.get('category'): - console.print(f"[dim]Category:[/dim] {ext_info['category']}") - if ext_info.get('effect'): - console.print(f"[dim]Effect:[/dim] {ext_info['effect']}") - - # Source catalog - if ext_info.get("_catalog_name"): - install_allowed = ext_info.get("_install_allowed", True) - install_note = "" if install_allowed else " [yellow](discovery only)[/yellow]" - console.print(f"[dim]Source catalog:[/dim] {ext_info['_catalog_name']}{install_note}") - console.print() - - # Requirements - if ext_info.get('requires'): - console.print("[bold]Requirements:[/bold]") - reqs = ext_info['requires'] - if reqs.get('speckit_version'): - console.print(f" • Spec Kit: {reqs['speckit_version']}") - if reqs.get('tools'): - for tool in reqs['tools']: - tool_name = tool['name'] - tool_version = tool.get('version', 'any') - required = " (required)" if tool.get('required') else " (optional)" - console.print(f" • {tool_name}: {tool_version}{required}") - console.print() - - # Provides - if ext_info.get('provides'): - console.print("[bold]Provides:[/bold]") - provides = ext_info['provides'] - if provides.get('commands'): - console.print(f" • Commands: {provides['commands']}") - if provides.get('hooks'): - console.print(f" • Hooks: {provides['hooks']}") - console.print() - - # Tags - if ext_info.get('tags'): - tags_str = ", ".join(ext_info['tags']) - console.print(f"[bold]Tags:[/bold] {tags_str}") - console.print() - - # Statistics - stats = [] - if ext_info.get('downloads') is not None: - stats.append(f"Downloads: {ext_info['downloads']:,}") - if ext_info.get('stars') is not None: - stats.append(f"Stars: {ext_info['stars']}") - if stats: - console.print(f"[bold]Statistics:[/bold] {' | '.join(stats)}") - console.print() - - # Links - console.print("[bold]Links:[/bold]") - if ext_info.get('repository'): - console.print(f" • Repository: {ext_info['repository']}") - if ext_info.get('homepage'): - console.print(f" • Homepage: {ext_info['homepage']}") - if ext_info.get('documentation'): - console.print(f" • Documentation: {ext_info['documentation']}") - if ext_info.get('changelog'): - console.print(f" • Changelog: {ext_info['changelog']}") - console.print() - - # Installation status and command - is_installed = manager.registry.is_installed(ext_info['id']) - install_allowed = ext_info.get("_install_allowed", True) - if is_installed: - console.print("[green]✓ Installed[/green]") - metadata = manager.registry.get(ext_info['id']) - priority = normalize_priority(metadata.get("priority") if isinstance(metadata, dict) else None) - console.print(f"[dim]Priority:[/dim] {priority}") - console.print(f"\nTo remove: specify extension remove {ext_info['id']}") - elif install_allowed: - console.print("[yellow]Not installed[/yellow]") - console.print(f"\n[cyan]Install:[/cyan] specify extension add {ext_info['id']}") - else: - catalog_name = ext_info.get("_catalog_name", "community") - console.print("[yellow]Not installed[/yellow]") - console.print( - f"\n[yellow]⚠[/yellow] '{ext_info['id']}' is available in the '{catalog_name}' catalog " - f"but not in your approved catalog. Add it to .specify/extension-catalogs.yml " - f"with install_allowed: true to enable installation." - ) - - -@extension_app.command("update") -def extension_update( - extension: str = typer.Argument(None, help="Extension ID or name to update (or all)"), -): - """Update extension(s) to latest version.""" - from .extensions import ( - ExtensionManager, - ExtensionCatalog, - ExtensionError, - ValidationError, - CommandRegistrar, - HookExecutor, - normalize_priority, - ) - from packaging import version as pkg_version - import shutil - - project_root = _require_specify_project() - manager = ExtensionManager(project_root) - catalog = ExtensionCatalog(project_root) - speckit_version = get_speckit_version() - - try: - # Get list of extensions to update - installed = manager.list_installed() - if extension: - # Update specific extension - resolve ID from argument (handles ambiguous names) - extension_id, _ = _resolve_installed_extension(extension, installed, "update") - extensions_to_update = [extension_id] - else: - # Update all extensions - extensions_to_update = [ext["id"] for ext in installed] - - if not extensions_to_update: - console.print("[yellow]No extensions installed[/yellow]") - raise typer.Exit(0) - - console.print("🔄 Checking for updates...\n") - - updates_available = [] - - for ext_id in extensions_to_update: - # Get installed version - metadata = manager.registry.get(ext_id) - if metadata is None or not isinstance(metadata, dict) or "version" not in metadata: - console.print(f"⚠ {ext_id}: Registry entry corrupted or missing (skipping)") - continue - try: - installed_version = pkg_version.Version(metadata["version"]) - except pkg_version.InvalidVersion: - console.print( - f"⚠ {ext_id}: Invalid installed version '{metadata.get('version')}' in registry (skipping)" - ) - continue - - # Get catalog info - ext_info = catalog.get_extension_info(ext_id) - if not ext_info: - console.print(f"⚠ {ext_id}: Not found in catalog (skipping)") - continue - - # Check if installation is allowed from this catalog - if not ext_info.get("_install_allowed", True): - console.print(f"⚠ {ext_id}: Updates not allowed from '{ext_info.get('_catalog_name', 'catalog')}' (skipping)") - continue - - try: - catalog_version = pkg_version.Version(ext_info["version"]) - except pkg_version.InvalidVersion: - console.print( - f"⚠ {ext_id}: Invalid catalog version '{ext_info.get('version')}' (skipping)" - ) - continue - - if catalog_version > installed_version: - updates_available.append( - { - "id": ext_id, - "name": ext_info.get("name", ext_id), # Display name for status messages - "installed": str(installed_version), - "available": str(catalog_version), - "download_url": ext_info.get("download_url"), - } - ) - else: - console.print(f"✓ {ext_id}: Up to date (v{installed_version})") - - if not updates_available: - console.print("\n[green]All extensions are up to date![/green]") - raise typer.Exit(0) - - # Show available updates - console.print("\n[bold]Updates available:[/bold]\n") - for update in updates_available: - console.print( - f" • {update['id']}: {update['installed']} → {update['available']}" - ) - - console.print() - confirm = typer.confirm("Update these extensions?") - if not confirm: - console.print("Cancelled") - raise typer.Exit(0) - - # Perform updates with atomic backup/restore - console.print() - updated_extensions = [] - failed_updates = [] - registrar = CommandRegistrar() - hook_executor = HookExecutor(project_root) - from .agents import CommandRegistrar as _AgentReg # used in backup and rollback paths - - # UNSET sentinel: backup not yet captured (exception before backup step) - UNSET = object() - - for update in updates_available: - extension_id = update["id"] - ext_name = update["name"] # Use display name for user-facing messages - console.print(f"📦 Updating {ext_name}...") - - # Backup paths - backup_base = manager.extensions_dir / ".backup" / f"{extension_id}-update" - backup_ext_dir = backup_base / "extension" - backup_commands_dir = backup_base / "commands" - backup_config_dir = backup_base / "config" - - # Store backup state - backup_registry_entry = None # None means registry entry not yet captured - backup_installed = UNSET # Original installed list from extensions.yml - backup_hooks = None # None means backup step 4 not yet reached; {} or {...} means backup was captured - backed_up_command_files = {} - - try: - # 1. Backup registry entry (always, even if extension dir doesn't exist) - backup_registry_entry = manager.registry.get(extension_id) - - # 2. Backup extension directory - extension_dir = manager.extensions_dir / extension_id - if extension_dir.exists(): - backup_base.mkdir(parents=True, exist_ok=True) - if backup_ext_dir.exists(): - shutil.rmtree(backup_ext_dir) - shutil.copytree(extension_dir, backup_ext_dir) - - # Backup config files separately so they can be restored - # after a successful install (install_from_directory clears dest dir). - config_files = list(extension_dir.glob("*-config.yml")) + list( - extension_dir.glob("*-config.local.yml") - ) - for cfg_file in config_files: - backup_config_dir.mkdir(parents=True, exist_ok=True) - shutil.copy2(cfg_file, backup_config_dir / cfg_file.name) - - # 3. Backup command files for all agents - registered_commands = backup_registry_entry.get("registered_commands", {}) if isinstance(backup_registry_entry, dict) else {} - for agent_name, cmd_names in registered_commands.items(): - if agent_name not in registrar.AGENT_CONFIGS: - continue - agent_config = registrar.AGENT_CONFIGS[agent_name] - commands_dir = _AgentReg._resolve_agent_dir( - agent_name, agent_config, project_root - ) - - for cmd_name in cmd_names: - output_name = _AgentReg._compute_output_name(agent_name, cmd_name, agent_config) - cmd_file = commands_dir / f"{output_name}{agent_config['extension']}" - if cmd_file.exists(): - backup_cmd_path = backup_commands_dir / agent_name / cmd_file.name - backup_cmd_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(cmd_file, backup_cmd_path) - backed_up_command_files[str(cmd_file)] = str(backup_cmd_path) - - # Also backup copilot prompt files - if agent_name == "copilot": - prompt_file = project_root / ".github" / "prompts" / f"{cmd_name}.prompt.md" - if prompt_file.exists(): - backup_prompt_path = backup_commands_dir / "copilot-prompts" / prompt_file.name - backup_prompt_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(prompt_file, backup_prompt_path) - backed_up_command_files[str(prompt_file)] = str(backup_prompt_path) - - # 4. Backup hooks and installed list from extensions.yml - # get_project_config() always normalizes installed->[] and hooks->{}, - # so no sentinel is needed to distinguish key-absent from key-empty. - config = hook_executor.get_project_config() - if isinstance(config, dict): - import copy - # Deep-copy so nested mapping entries (e.g. version-pin dicts) - # are not affected by in-place mutations during the update. - backup_installed = copy.deepcopy(config.get("installed", [])) - backup_hooks = {} - for hook_name, hook_list in config.get("hooks", {}).items(): - if not isinstance(hook_list, list): - continue - ext_hooks = [h for h in hook_list if isinstance(h, dict) and h.get("extension") == extension_id] - if ext_hooks: - backup_hooks[hook_name] = ext_hooks - - # 5. Download new version - zip_path = catalog.download_extension(extension_id) - try: - # 6. Validate extension ID from ZIP BEFORE modifying installation - # Handle both root-level and nested extension.yml (GitHub auto-generated ZIPs) - with zipfile.ZipFile(zip_path, "r") as zf: - import yaml - manifest_data = None - namelist = zf.namelist() - - # First try root-level extension.yml - if "extension.yml" in namelist: - with zf.open("extension.yml") as f: - manifest_data = yaml.safe_load(f) or {} - else: - # Look for extension.yml in a single top-level subdirectory - # (e.g., "repo-name-branch/extension.yml") - manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1] - if len(manifest_paths) == 1: - with zf.open(manifest_paths[0]) as f: - manifest_data = yaml.safe_load(f) or {} - - if manifest_data is None: - raise ValueError("Downloaded extension archive is missing 'extension.yml'") - - zip_extension_id = manifest_data.get("extension", {}).get("id") - if zip_extension_id != extension_id: - raise ValueError( - f"Extension ID mismatch: expected '{extension_id}', got '{zip_extension_id}'" - ) - - # 7. Remove old extension (handles command file cleanup and registry removal) - manager.remove(extension_id, keep_config=True) - - # 8. Install new version - _ = manager.install_from_zip(zip_path, speckit_version) - - # Restore user config files from backup after successful install. - new_extension_dir = manager.extensions_dir / extension_id - if backup_config_dir.exists() and new_extension_dir.exists(): - for cfg_file in backup_config_dir.iterdir(): - if cfg_file.is_file(): - shutil.copy2(cfg_file, new_extension_dir / cfg_file.name) - - # 9. Restore metadata from backup (installed_at, enabled state) - if backup_registry_entry and isinstance(backup_registry_entry, dict): - # Copy current registry entry to avoid mutating internal - # registry state before explicit restore(). - current_metadata = manager.registry.get(extension_id) - if current_metadata is None or not isinstance(current_metadata, dict): - raise RuntimeError( - f"Registry entry for '{extension_id}' missing or corrupted after install — update incomplete" - ) - new_metadata = dict(current_metadata) - - # Preserve the original installation timestamp - if "installed_at" in backup_registry_entry: - new_metadata["installed_at"] = backup_registry_entry["installed_at"] - - # Preserve the original priority (normalized to handle corruption) - if "priority" in backup_registry_entry: - new_metadata["priority"] = normalize_priority(backup_registry_entry["priority"]) - - # If extension was disabled before update, disable it again - if not backup_registry_entry.get("enabled", True): - new_metadata["enabled"] = False - - # Use restore() instead of update() because update() always - # preserves the existing installed_at, ignoring our override - manager.registry.restore(extension_id, new_metadata) - - # Also disable hooks in extensions.yml if extension was disabled - if not backup_registry_entry.get("enabled", True): - config = hook_executor.get_project_config() - if "hooks" in config: - for hook_name in config["hooks"]: - for hook in config["hooks"][hook_name]: - if hook.get("extension") == extension_id: - hook["enabled"] = False - hook_executor.save_project_config(config) - finally: - # Clean up downloaded ZIP - if zip_path.exists(): - zip_path.unlink() - - # 10. Clean up backup on success - if backup_base.exists(): - shutil.rmtree(backup_base) - - console.print(f" [green]✓[/green] Updated to v{update['available']}") - updated_extensions.append(ext_name) - - except KeyboardInterrupt: - raise - except Exception as e: - console.print(f" [red]✗[/red] Failed: {e}") - failed_updates.append((ext_name, str(e))) - - # Rollback on failure - console.print(f" [yellow]↩[/yellow] Rolling back {ext_name}...") - - try: - # Restore extension directory - # Only perform destructive rollback if backup exists (meaning we - # actually modified the extension). This avoids deleting a valid - # installation when failure happened before changes were made. - extension_dir = manager.extensions_dir / extension_id - if backup_ext_dir.exists(): - if extension_dir.exists(): - shutil.rmtree(extension_dir) - shutil.copytree(backup_ext_dir, extension_dir) - - # Remove any NEW command files created by failed install - # (files that weren't in the original backup) - try: - new_registry_entry = manager.registry.get(extension_id) - if new_registry_entry is None or not isinstance(new_registry_entry, dict): - new_registered_commands = {} - else: - new_registered_commands = new_registry_entry.get("registered_commands", {}) - for agent_name, cmd_names in new_registered_commands.items(): - if agent_name not in registrar.AGENT_CONFIGS: - continue - agent_config = registrar.AGENT_CONFIGS[agent_name] - commands_dir = _AgentReg._resolve_agent_dir( - agent_name, agent_config, project_root - ) - - for cmd_name in cmd_names: - output_name = _AgentReg._compute_output_name(agent_name, cmd_name, agent_config) - cmd_file = commands_dir / f"{output_name}{agent_config['extension']}" - # Delete if it exists and wasn't in our backup - if cmd_file.exists() and str(cmd_file) not in backed_up_command_files: - cmd_file.unlink() - - # Also handle copilot prompt files - if agent_name == "copilot": - prompt_file = project_root / ".github" / "prompts" / f"{cmd_name}.prompt.md" - if prompt_file.exists() and str(prompt_file) not in backed_up_command_files: - prompt_file.unlink() - except KeyError: - pass # No new registry entry exists, nothing to clean up - - # Restore backed up command files - for original_path, backup_path in backed_up_command_files.items(): - backup_file = Path(backup_path) - if backup_file.exists(): - original_file = Path(original_path) - original_file.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(backup_file, original_file) - - # Restore metadata in extensions.yml (hooks and installed list). - # Only run if backup step 4 was reached (backup_hooks is not None); - # otherwise we have no safe baseline to restore from and could corrupt - # the config by removing pre-existing hooks. - if backup_hooks is not None: - config = hook_executor.get_project_config() - if not isinstance(config, dict): - config = {} - - modified = False - - # 1. Restore hooks in extensions.yml - if not isinstance(config.get("hooks"), dict): - config["hooks"] = {} - modified = True - - # Remove any hooks for this extension added by the failed install - for hook_name in list(config["hooks"].keys()): - hooks_list = config["hooks"][hook_name] - if not isinstance(hooks_list, list): - config["hooks"][hook_name] = [] - modified = True - continue - - original_len = len(hooks_list) - config["hooks"][hook_name] = [ - h for h in hooks_list - if isinstance(h, dict) and h.get("extension") != extension_id - ] - if len(config["hooks"][hook_name]) != original_len: - modified = True - - # Add back the backed-up hooks - if backup_hooks: - for hook_name, hooks in backup_hooks.items(): - if not isinstance(config["hooks"].get(hook_name), list): - config["hooks"][hook_name] = [] - config["hooks"][hook_name].extend(hooks) - modified = True - - # 2. Restore installed list in extensions.yml - if backup_installed is not UNSET: - if config.get("installed") != backup_installed: - config["installed"] = backup_installed - modified = True - - if modified: - hook_executor.save_project_config(config) - - # Restore registry entry (use restore() since entry was removed) - if backup_registry_entry: - manager.registry.restore(extension_id, backup_registry_entry) - - console.print(" [green]✓[/green] Rollback successful") - # Clean up backup directory only on successful rollback - if backup_base.exists(): - shutil.rmtree(backup_base) - except Exception as rollback_error: - console.print(f" [red]✗[/red] Rollback failed: {rollback_error}") - console.print(f" [dim]Backup preserved at: {backup_base}[/dim]") - - # Summary - console.print() - if updated_extensions: - console.print(f"[green]✓[/green] Successfully updated {len(updated_extensions)} extension(s)") - if failed_updates: - console.print(f"[red]✗[/red] Failed to update {len(failed_updates)} extension(s):") - for ext_name, error in failed_updates: - console.print(f" • {ext_name}: {error}") - raise typer.Exit(1) - - except ValidationError as e: - console.print(f"\n[red]Validation Error:[/red] {e}") - raise typer.Exit(1) - except ExtensionError as e: - console.print(f"\n[red]Error:[/red] {e}") - raise typer.Exit(1) - - -@extension_app.command("enable") -def extension_enable( - extension: str = typer.Argument(help="Extension ID or name to enable"), -): - """Enable a disabled extension.""" - from .extensions import ExtensionManager, HookExecutor - - project_root = _require_specify_project() - manager = ExtensionManager(project_root) - hook_executor = HookExecutor(project_root) - - # Resolve extension ID from argument (handles ambiguous names) - installed = manager.list_installed() - extension_id, display_name = _resolve_installed_extension(extension, installed, "enable") - - # Update registry - metadata = manager.registry.get(extension_id) - if metadata is None or not isinstance(metadata, dict): - console.print(f"[red]Error:[/red] Extension '{extension_id}' not found in registry (corrupted state)") - raise typer.Exit(1) - - if metadata.get("enabled", True): - console.print(f"[yellow]Extension '{display_name}' is already enabled[/yellow]") - raise typer.Exit(0) - - manager.registry.update(extension_id, {"enabled": True}) - - # Enable hooks in extensions.yml - config = hook_executor.get_project_config() - if "hooks" in config: - for hook_name in config["hooks"]: - for hook in config["hooks"][hook_name]: - if hook.get("extension") == extension_id: - hook["enabled"] = True - hook_executor.save_project_config(config) - - console.print(f"[green]✓[/green] Extension '{display_name}' enabled") - - -@extension_app.command("disable") -def extension_disable( - extension: str = typer.Argument(help="Extension ID or name to disable"), -): - """Disable an extension without removing it.""" - from .extensions import ExtensionManager, HookExecutor - - project_root = _require_specify_project() - manager = ExtensionManager(project_root) - hook_executor = HookExecutor(project_root) - - # Resolve extension ID from argument (handles ambiguous names) - installed = manager.list_installed() - extension_id, display_name = _resolve_installed_extension(extension, installed, "disable") - - # Update registry - metadata = manager.registry.get(extension_id) - if metadata is None or not isinstance(metadata, dict): - console.print(f"[red]Error:[/red] Extension '{extension_id}' not found in registry (corrupted state)") - raise typer.Exit(1) - - if not metadata.get("enabled", True): - console.print(f"[yellow]Extension '{display_name}' is already disabled[/yellow]") - raise typer.Exit(0) - - manager.registry.update(extension_id, {"enabled": False}) - - # Disable hooks in extensions.yml - config = hook_executor.get_project_config() - if "hooks" in config: - for hook_name in config["hooks"]: - for hook in config["hooks"][hook_name]: - if hook.get("extension") == extension_id: - hook["enabled"] = False - hook_executor.save_project_config(config) - - console.print(f"[green]✓[/green] Extension '{display_name}' disabled") - console.print("\nCommands will no longer be available. Hooks will not execute.") - console.print(f"To re-enable: specify extension enable {extension_id}") - - -@extension_app.command("set-priority") -def extension_set_priority( - extension: str = typer.Argument(help="Extension ID or name"), - priority: int = typer.Argument(help="New priority (lower = higher precedence)"), -): - """Set the resolution priority of an installed extension.""" - from .extensions import ExtensionManager - - project_root = _require_specify_project() - # Validate priority - if priority < 1: - console.print("[red]Error:[/red] Priority must be a positive integer (1 or higher)") - raise typer.Exit(1) - - manager = ExtensionManager(project_root) - - # Resolve extension ID from argument (handles ambiguous names) - installed = manager.list_installed() - extension_id, display_name = _resolve_installed_extension(extension, installed, "set-priority") - - # Get current metadata - metadata = manager.registry.get(extension_id) - if metadata is None or not isinstance(metadata, dict): - console.print(f"[red]Error:[/red] Extension '{extension_id}' not found in registry (corrupted state)") - raise typer.Exit(1) - - from .extensions import normalize_priority - raw_priority = metadata.get("priority") - # Only skip if the stored value is already a valid int equal to requested priority - # This ensures corrupted values (e.g., "high") get repaired even when setting to default (10) - if isinstance(raw_priority, int) and raw_priority == priority: - console.print(f"[yellow]Extension '{display_name}' already has priority {priority}[/yellow]") - raise typer.Exit(0) - - old_priority = normalize_priority(raw_priority) - - # Update priority - manager.registry.update(extension_id, {"priority": priority}) - - console.print(f"[green]✓[/green] Extension '{display_name}' priority changed: {old_priority} → {priority}") - console.print("\n[dim]Lower priority = higher precedence in template resolution[/dim]") - - # ===== Workflow Commands ===== workflow_app = typer.Typer( diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions/__init__.py similarity index 99% rename from src/specify_cli/extensions.py rename to src/specify_cli/extensions/__init__.py index ecc26fa877..d52e7bc05d 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions/__init__.py @@ -26,11 +26,11 @@ from packaging import version as pkg_version from packaging.specifiers import InvalidSpecifier, SpecifierSet -from ._init_options import is_ai_skills_enabled -from ._invocation_style import is_slash_skills_agent -from ._utils import dump_frontmatter -from .catalogs import CatalogEntry as BaseCatalogEntry -from .catalogs import CatalogStackBase +from .._init_options import is_ai_skills_enabled +from .._invocation_style import is_slash_skills_agent +from .._utils import dump_frontmatter +from ..catalogs import CatalogEntry as BaseCatalogEntry +from ..catalogs import CatalogStackBase _FALLBACK_CORE_COMMAND_NAMES = frozenset( { @@ -893,7 +893,7 @@ def _get_skills_dir(self) -> Optional[Path]: be created due to symlink, containment, or permission issues so that callers can fall back gracefully. """ - from . import ( + from .. import ( _print_cli_warning, load_init_options, resolve_active_skills_dir, @@ -936,7 +936,7 @@ def _ensure_usable(skills_dir: Path) -> Optional[Path]: if not isinstance(selected_ai, str) or not selected_ai: return _ensure_usable(skills_dir) - from .agents import CommandRegistrar + from ..agents import CommandRegistrar registrar = CommandRegistrar() agent_config = registrar.AGENT_CONFIGS.get(selected_ai) @@ -973,9 +973,9 @@ def _register_extension_skills( if not skills_dir: return [] - from . import load_init_options - from .agents import CommandRegistrar - from .integrations import get_integration + from .. import load_init_options + from ..agents import CommandRegistrar + from ..integrations import get_integration written: List[str] = [] opts = load_init_options(self.project_root) @@ -1199,7 +1199,7 @@ def _unregister_extension_skills( shutil.rmtree(skill_subdir) else: # Fallback: scan all possible agent skills directories - from . import AGENT_CONFIG, DEFAULT_SKILLS_DIR + from .. import AGENT_CONFIG, DEFAULT_SKILLS_DIR candidate_dirs: set[Path] = set() for cfg in AGENT_CONFIG.values(): @@ -1614,7 +1614,7 @@ def unregister_agent_artifacts(self, agent_name: str) -> None: # Resolve the skills directory for the specific agent so cleanup is # agent-scoped and does not depend on the currently-active agent in # init-options. Use the same helper that extension install uses. - from . import _get_skills_dir as resolve_skills_dir + from .. import _get_skills_dir as resolve_skills_dir agent_skills_dir = resolve_skills_dir(self.project_root, agent_name) @@ -1690,7 +1690,7 @@ def register_enabled_extensions_for_agent(self, agent_name: str) -> None: if not agent_name: return - from . import load_init_options + from .. import load_init_options registrar = CommandRegistrar() agent_config = registrar.AGENT_CONFIGS.get(agent_name) @@ -1844,31 +1844,31 @@ class CommandRegistrar: """ # Re-export AGENT_CONFIGS at class level for direct attribute access - from .agents import CommandRegistrar as _AgentRegistrar + from ..agents import CommandRegistrar as _AgentRegistrar AGENT_CONFIGS = _AgentRegistrar.AGENT_CONFIGS def __init__(self): - from .agents import CommandRegistrar as _Registrar + from ..agents import CommandRegistrar as _Registrar self._registrar = _Registrar() # Delegate static/utility methods @staticmethod def parse_frontmatter(content: str) -> tuple[dict, str]: - from .agents import CommandRegistrar as _Registrar + from ..agents import CommandRegistrar as _Registrar return _Registrar.parse_frontmatter(content) @staticmethod def render_frontmatter(fm: dict) -> str: - from .agents import CommandRegistrar as _Registrar + from ..agents import CommandRegistrar as _Registrar return _Registrar.render_frontmatter(fm) @staticmethod def _write_copilot_prompt(project_root, cmd_name: str) -> None: - from .agents import CommandRegistrar as _Registrar + from ..agents import CommandRegistrar as _Registrar _Registrar.write_copilot_prompt(project_root, cmd_name) @@ -2819,7 +2819,7 @@ def _load_init_options(self) -> Dict[str, Any]: instance to avoid repeated filesystem reads during hook rendering. """ if self._init_options_cache is None: - from . import load_init_options + from .. import load_init_options payload = load_init_options(self.project_root) self._init_options_cache = payload if isinstance(payload, dict) else {} @@ -2858,7 +2858,7 @@ def _render_hook_invocation(self, command: Any) -> str: if kimi_skill_mode and skill_name: return f"/skill:{skill_name}" if cline_mode: - from .integrations.cline import format_cline_command_name + from ..integrations.cline import format_cline_command_name return f"/{format_cline_command_name(command_id)}" diff --git a/src/specify_cli/extensions/_commands.py b/src/specify_cli/extensions/_commands.py new file mode 100644 index 0000000000..4df76f0596 --- /dev/null +++ b/src/specify_cli/extensions/_commands.py @@ -0,0 +1,1490 @@ +"""specify extension * and catalog * command handlers — app objects and register(). + +Moved out of __init__.py (PR-7/8). Handlers reference helpers that remain in +the package root (`_require_specify_project`, `_locate_bundled_extension`, +`load_init_options`, `_display_project_path`) through the thin shims below, +which re-fetch from the parent package at call time so test monkeypatching of +`specify_cli.` keeps working. +""" +from __future__ import annotations + +import os +import shutil +import zipfile +from pathlib import Path +from typing import Optional + +import typer +import yaml +from rich.panel import Panel +from rich.table import Table + +from .._console import console +from .._assets import get_speckit_version + +extension_app = typer.Typer( + name="extension", + help="Manage spec-kit extensions", + add_completion=False, +) + +catalog_app = typer.Typer( + name="catalog", + help="Manage extension catalogs", + add_completion=False, +) +extension_app.add_typer(catalog_app, name="catalog") + + +# Root helpers re-fetched at call time so test monkeypatching of +# `specify_cli.` keeps working after the move. +def _require_specify_project(*args, **kwargs): + from .. import _require_specify_project as _f + return _f(*args, **kwargs) + + +def _locate_bundled_extension(*args, **kwargs): + from .. import _locate_bundled_extension as _f + return _f(*args, **kwargs) + + +def load_init_options(*args, **kwargs): + from .. import load_init_options as _f + return _f(*args, **kwargs) + + +def _display_project_path(*args, **kwargs): + from .. import _display_project_path as _f + return _f(*args, **kwargs) + + +def _resolve_installed_extension( + argument: str, + installed_extensions: list, + command_name: str = "command", + allow_not_found: bool = False, +) -> tuple[Optional[str], Optional[str]]: + """Resolve an extension argument (ID or display name) to an installed extension. + + Args: + argument: Extension ID or display name provided by user + installed_extensions: List of installed extension dicts from manager.list_installed() + command_name: Name of the command for error messages (e.g., "enable", "disable") + allow_not_found: If True, return (None, None) when not found instead of raising + + Returns: + Tuple of (extension_id, display_name), or (None, None) if allow_not_found=True and not found + + Raises: + typer.Exit: If extension not found (and allow_not_found=False) or name is ambiguous + """ + # First, try exact ID match + for ext in installed_extensions: + if ext["id"] == argument: + return (ext["id"], ext["name"]) + + # If not found by ID, try display name match + name_matches = [ext for ext in installed_extensions if ext["name"].lower() == argument.lower()] + + if len(name_matches) == 1: + # Unique display-name match + return (name_matches[0]["id"], name_matches[0]["name"]) + elif len(name_matches) > 1: + # Ambiguous display-name match + console.print( + f"[red]Error:[/red] Extension name '{argument}' is ambiguous. " + "Multiple installed extensions share this name:" + ) + table = Table(title="Matching extensions") + table.add_column("ID", style="cyan", no_wrap=True) + table.add_column("Name", style="white") + table.add_column("Version", style="green") + for ext in name_matches: + table.add_row(ext.get("id", ""), ext.get("name", ""), str(ext.get("version", ""))) + console.print(table) + console.print("\nPlease rerun using the extension ID:") + console.print(f" [bold]specify extension {command_name} [/bold]") + raise typer.Exit(1) + else: + # No match by ID or display name + if allow_not_found: + return (None, None) + console.print(f"[red]Error:[/red] Extension '{argument}' is not installed") + raise typer.Exit(1) + + +def _resolve_catalog_extension( + argument: str, + catalog, + command_name: str = "info", +) -> tuple[Optional[dict], Optional[Exception]]: + """Resolve an extension argument (ID or display name) from the catalog. + + Args: + argument: Extension ID or display name provided by user + catalog: ExtensionCatalog instance + command_name: Name of the command for error messages + + Returns: + Tuple of (extension_info, catalog_error) + - If found: (ext_info_dict, None) + - If catalog error: (None, error) + - If not found: (None, None) + """ + from . import ExtensionError + + try: + # First try by ID + ext_info = catalog.get_extension_info(argument) + if ext_info: + return (ext_info, None) + + # Try by display name - search using argument as query, then filter for exact match + search_results = catalog.search(query=argument) + name_matches = [ext for ext in search_results if ext["name"].lower() == argument.lower()] + + if len(name_matches) == 1: + return (name_matches[0], None) + elif len(name_matches) > 1: + # Ambiguous display-name match in catalog + console.print( + f"[red]Error:[/red] Extension name '{argument}' is ambiguous. " + "Multiple catalog extensions share this name:" + ) + table = Table(title="Matching extensions") + table.add_column("ID", style="cyan", no_wrap=True) + table.add_column("Name", style="white") + table.add_column("Version", style="green") + table.add_column("Catalog", style="dim") + for ext in name_matches: + table.add_row( + ext.get("id", ""), + ext.get("name", ""), + str(ext.get("version", "")), + ext.get("_catalog_name", ""), + ) + console.print(table) + console.print("\nPlease rerun using the extension ID:") + console.print(f" [bold]specify extension {command_name} [/bold]") + raise typer.Exit(1) + + # Not found + return (None, None) + + except ExtensionError as e: + return (None, e) + + +@extension_app.command("list") +def extension_list( + available: bool = typer.Option(False, "--available", help="Show available extensions from catalog"), + all_extensions: bool = typer.Option(False, "--all", help="Show both installed and available"), +): + """List installed extensions.""" + from . import ExtensionManager + + project_root = _require_specify_project() + manager = ExtensionManager(project_root) + installed = manager.list_installed() + + if not installed and not (available or all_extensions): + console.print("[yellow]No extensions installed.[/yellow]") + console.print("\nInstall an extension with:") + console.print(" specify extension add ") + return + + if installed: + console.print("\n[bold cyan]Installed Extensions:[/bold cyan]\n") + + for ext in installed: + status_icon = "✓" if ext["enabled"] else "✗" + status_color = "green" if ext["enabled"] else "red" + + console.print(f" [{status_color}]{status_icon}[/{status_color}] [bold]{ext['name']}[/bold] (v{ext['version']})") + console.print(f" [dim]{ext['id']}[/dim]") + console.print(f" {ext['description']}") + console.print(f" Commands: {ext['command_count']} | Hooks: {ext['hook_count']} | Priority: {ext['priority']} | Status: {'Enabled' if ext['enabled'] else 'Disabled'}") + console.print() + + if available or all_extensions: + console.print("\nInstall an extension:") + console.print(" [cyan]specify extension add [/cyan]") + + +@catalog_app.command("list") +def catalog_list(): + """List all active extension catalogs.""" + from . import ExtensionCatalog, ValidationError + + project_root = _require_specify_project() + catalog = ExtensionCatalog(project_root) + + try: + active_catalogs = catalog.get_active_catalogs() + except ValidationError as e: + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) + + console.print("\n[bold cyan]Active Extension Catalogs:[/bold cyan]\n") + for entry in active_catalogs: + install_str = ( + "[green]install allowed[/green]" + if entry.install_allowed + else "[yellow]discovery only[/yellow]" + ) + console.print(f" [bold]{entry.name}[/bold] (priority {entry.priority})") + if entry.description: + console.print(f" {entry.description}") + console.print(f" URL: {entry.url}") + console.print(f" Install: {install_str}") + console.print() + + config_path = project_root / ".specify" / "extension-catalogs.yml" + user_config_path = Path.home() / ".specify" / "extension-catalogs.yml" + if os.environ.get("SPECKIT_CATALOG_URL"): + console.print("[dim]Catalog configured via SPECKIT_CATALOG_URL environment variable.[/dim]") + else: + try: + proj_loaded = config_path.exists() and catalog._load_catalog_config(config_path) is not None + except ValidationError: + proj_loaded = False + if proj_loaded: + console.print(f"[dim]Config: {_display_project_path(project_root, config_path)}[/dim]") + else: + try: + user_loaded = user_config_path.exists() and catalog._load_catalog_config(user_config_path) is not None + except ValidationError: + user_loaded = False + if user_loaded: + console.print("[dim]Config: ~/.specify/extension-catalogs.yml[/dim]") + else: + console.print("[dim]Using built-in default catalog stack.[/dim]") + console.print( + "[dim]Add .specify/extension-catalogs.yml to customize.[/dim]" + ) + + +@catalog_app.command("add") +def catalog_add( + url: str = typer.Argument(help="Catalog URL (must use HTTPS)"), + name: str = typer.Option(..., "--name", help="Catalog name"), + priority: int = typer.Option(10, "--priority", help="Priority (lower = higher priority)"), + install_allowed: bool = typer.Option( + False, "--install-allowed/--no-install-allowed", + help="Allow extensions from this catalog to be installed", + ), + description: str = typer.Option("", "--description", help="Description of the catalog"), +): + """Add a catalog to .specify/extension-catalogs.yml.""" + from . import ExtensionCatalog, ValidationError + + project_root = _require_specify_project() + specify_dir = project_root / ".specify" + + # Validate URL + tmp_catalog = ExtensionCatalog(project_root) + try: + tmp_catalog._validate_catalog_url(url) + except ValidationError as e: + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) + + config_path = specify_dir / "extension-catalogs.yml" + + # Load existing config + if config_path.exists(): + try: + config = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + except Exception as e: + config_label = _display_project_path(project_root, config_path) + console.print(f"[red]Error:[/red] Failed to read {config_label}: {e}") + raise typer.Exit(1) + else: + config = {} + + catalogs = config.get("catalogs", []) + if not isinstance(catalogs, list): + console.print("[red]Error:[/red] Invalid catalog config: 'catalogs' must be a list.") + raise typer.Exit(1) + + # Check for duplicate name + for existing in catalogs: + if isinstance(existing, dict) and existing.get("name") == name: + console.print(f"[yellow]Warning:[/yellow] A catalog named '{name}' already exists.") + console.print("Use 'specify extension catalog remove' first, or choose a different name.") + raise typer.Exit(1) + + catalogs.append({ + "name": name, + "url": url, + "priority": priority, + "install_allowed": install_allowed, + "description": description, + }) + + config["catalogs"] = catalogs + config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") + + install_label = "install allowed" if install_allowed else "discovery only" + console.print(f"\n[green]✓[/green] Added catalog '[bold]{name}[/bold]' ({install_label})") + console.print(f" URL: {url}") + console.print(f" Priority: {priority}") + console.print(f"\nConfig saved to {_display_project_path(project_root, config_path)}") + + +@catalog_app.command("remove") +def catalog_remove( + name: str = typer.Argument(help="Catalog name to remove"), +): + """Remove a catalog from .specify/extension-catalogs.yml.""" + project_root = _require_specify_project() + specify_dir = project_root / ".specify" + + config_path = specify_dir / "extension-catalogs.yml" + if not config_path.exists(): + console.print("[red]Error:[/red] No catalog config found. Nothing to remove.") + raise typer.Exit(1) + + try: + config = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + except Exception: + console.print("[red]Error:[/red] Failed to read catalog config.") + raise typer.Exit(1) + + catalogs = config.get("catalogs", []) + if not isinstance(catalogs, list): + console.print("[red]Error:[/red] Invalid catalog config: 'catalogs' must be a list.") + raise typer.Exit(1) + original_count = len(catalogs) + catalogs = [c for c in catalogs if isinstance(c, dict) and c.get("name") != name] + + if len(catalogs) == original_count: + console.print(f"[red]Error:[/red] Catalog '{name}' not found.") + raise typer.Exit(1) + + config["catalogs"] = catalogs + config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") + + console.print(f"[green]✓[/green] Removed catalog '{name}'") + if not catalogs: + console.print("\n[dim]No catalogs remain in config. Built-in defaults will be used.[/dim]") + + +@extension_app.command("add") +def extension_add( + extension: str = typer.Argument(help="Extension name or path"), + dev: bool = typer.Option(False, "--dev", help="Install from local directory"), + from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"), + force: bool = typer.Option(False, "--force", help="Overwrite if already installed"), + priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"), +): + """Install an extension.""" + from . import ExtensionManager, ExtensionCatalog, ExtensionError, ValidationError, CompatibilityError, REINSTALL_COMMAND + + project_root = _require_specify_project() + # Validate priority + if priority < 1: + console.print("[red]Error:[/red] Priority must be a positive integer (1 or higher)") + raise typer.Exit(1) + + manager = ExtensionManager(project_root) + speckit_version = get_speckit_version() + + if force: + console.print("[yellow]--force:[/yellow] Will overwrite if already installed") + + # Prompt for URL-based installs BEFORE the spinner so the user can + # actually see and respond to the confirmation (the Rich status + # spinner overwrites the typer.confirm prompt line, making it appear + # as though the command is hung). + # Guard with ``not dev`` so that --dev + --from does not show a + # confusing confirmation for a URL that will be ignored. + if from_url and not dev: + from urllib.parse import urlparse + from rich.markup import escape as _escape_markup + + parsed = urlparse(from_url) + is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") + + if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost): + console.print("[red]Error:[/red] URL must use HTTPS for security.") + console.print("HTTP is only allowed for localhost URLs.") + raise typer.Exit(1) + + safe_url = _escape_markup(from_url) + + # Warn about untrusted sources — default-deny confirmation + console.print() + console.print(Panel( + f"[bold]You are installing an extension from an external URL that is not\n" + f"listed in any of your configured extension catalogs.[/bold]\n\n" + f"URL: {safe_url}\n\n" + f"Only install extensions from sources you trust.", + title="[bold yellow]⚠ Untrusted Source[/bold yellow]", + border_style="yellow", + padding=(1, 2), + )) + console.print() + confirm = typer.confirm("Continue with installation?", default=False) + if not confirm: + console.print("Cancelled") + raise typer.Exit(0) + + try: + with console.status(f"[cyan]Installing extension: {extension}[/cyan]"): + if dev: + # Install from local directory + source_path = Path(extension).expanduser().resolve() + if not source_path.exists(): + console.print(f"[red]Error:[/red] Directory not found: {source_path}") + raise typer.Exit(1) + + if not (source_path / "extension.yml").exists(): + console.print(f"[red]Error:[/red] No extension.yml found in {source_path}") + raise typer.Exit(1) + + if force: + console.print(f"[yellow]--force:[/yellow] Installing from [cyan]{source_path}[/cyan] (will overwrite if already installed)...") + + manifest = manager.install_from_directory( + source_path, + speckit_version, + priority=priority, + link_commands=True, + force=force + ) + + elif from_url: + # Install from URL (ZIP file) + import urllib.error + + console.print(f"Downloading from {safe_url}...") + + # Download ZIP to temp location + download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" + download_dir.mkdir(parents=True, exist_ok=True) + zip_path = download_dir / f"{extension}-url-download.zip" + + try: + from specify_cli.authentication.http import open_url as _open_url + + with _open_url(from_url, timeout=60) as response: + zip_data = response.read() + zip_path.write_bytes(zip_data) + + # Install from downloaded ZIP + manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority, force=force) + except urllib.error.URLError as e: + console.print(f"[red]Error:[/red] Failed to download from {safe_url}: {e}") + raise typer.Exit(1) + finally: + # Clean up downloaded ZIP + if zip_path.exists(): + zip_path.unlink() + + else: + # Try bundled extensions first (shipped with spec-kit) + bundled_path = _locate_bundled_extension(extension) + if bundled_path is not None: + manifest = manager.install_from_directory( + bundled_path, speckit_version, priority=priority, force=force + ) + else: + # Install from catalog (also resolves display names to IDs) + catalog = ExtensionCatalog(project_root) + + # Check if extension exists in catalog (supports both ID and display name) + ext_info, catalog_error = _resolve_catalog_extension(extension, catalog, "add") + if catalog_error: + console.print(f"[red]Error:[/red] Could not query extension catalog: {catalog_error}") + raise typer.Exit(1) + if not ext_info: + console.print(f"[red]Error:[/red] Extension '{extension}' not found in catalog") + console.print("\nSearch available extensions:") + console.print(" specify extension search") + raise typer.Exit(1) + + # If catalog resolved a display name to an ID, check bundled again + resolved_id = ext_info['id'] + if resolved_id != extension: + bundled_path = _locate_bundled_extension(resolved_id) + if bundled_path is not None: + manifest = manager.install_from_directory( + bundled_path, speckit_version, priority=priority, force=force + ) + + if bundled_path is None: + # Bundled extensions without a download URL must come from the local package + if ext_info.get("bundled") and not ext_info.get("download_url"): + console.print( + f"[red]Error:[/red] Extension '{ext_info['id']}' is bundled with spec-kit " + f"but could not be found in the installed package." + ) + console.print( + "\nThis usually means the spec-kit installation is incomplete or corrupted." + ) + console.print("Try reinstalling spec-kit:") + console.print(f" {REINSTALL_COMMAND}") + raise typer.Exit(1) + + # Enforce install_allowed policy + if not ext_info.get("_install_allowed", True): + catalog_name = ext_info.get("_catalog_name", "community") + console.print( + f"[red]Error:[/red] '{extension}' is available in the " + f"'{catalog_name}' catalog but installation is not allowed from that catalog." + ) + console.print( + f"\nTo enable installation, add '{extension}' to an approved catalog " + f"(install_allowed: true) in .specify/extension-catalogs.yml." + ) + raise typer.Exit(1) + + # Download extension ZIP (use resolved ID, not original argument which may be display name) + extension_id = ext_info['id'] + console.print(f"Downloading {ext_info['name']} v{ext_info.get('version', 'unknown')}...") + zip_path = catalog.download_extension(extension_id) + + try: + # Install from downloaded ZIP + manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority, force=force) + finally: + # Clean up downloaded ZIP + if zip_path.exists(): + zip_path.unlink() + + console.print("\n[green]✓[/green] Extension installed successfully!") + console.print(f"\n[bold]{manifest.name}[/bold] (v{manifest.version})") + console.print(f" {manifest.description}") + + for warning in manifest.warnings: + console.print(f"\n[yellow]⚠ Compatibility warning:[/yellow] {warning}") + + is_cline = load_init_options(project_root).get("ai") == "cline" + + if is_cline: + from specify_cli.integrations.cline import format_cline_command_name + + console.print("\n[bold cyan]Provided commands:[/bold cyan]") + for cmd in manifest.commands: + cmd_name = cmd['name'] + if is_cline: + cmd_name = format_cline_command_name(cmd_name) + console.print(f" • {cmd_name} - {cmd.get('description', '')}") + + # Report agent skills registration + reg_meta = manager.registry.get(manifest.id) + reg_skills = reg_meta.get("registered_skills", []) if reg_meta else [] + # Normalize to guard against corrupted registry entries + if not isinstance(reg_skills, list): + reg_skills = [] + if reg_skills: + console.print(f"\n[green]✓[/green] {len(reg_skills)} agent skill(s) auto-registered") + + console.print("\n[yellow]⚠[/yellow] Configuration may be required") + console.print(f" Check: .specify/extensions/{manifest.id}/") + + except ValidationError as e: + console.print(f"\n[red]Validation Error:[/red] {e}") + raise typer.Exit(1) + except CompatibilityError as e: + console.print(f"\n[red]Compatibility Error:[/red] {e}") + raise typer.Exit(1) + except ExtensionError as e: + console.print(f"\n[red]Error:[/red] {e}") + raise typer.Exit(1) + + +@extension_app.command("remove") +def extension_remove( + extension: str = typer.Argument(help="Extension ID or name to remove"), + keep_config: bool = typer.Option(False, "--keep-config", help="Don't remove config files"), + force: bool = typer.Option(False, "--force", help="Skip confirmation"), +): + """Uninstall an extension.""" + from . import ExtensionManager + + project_root = _require_specify_project() + manager = ExtensionManager(project_root) + + # Resolve extension ID from argument (handles ambiguous names) + installed = manager.list_installed() + extension_id, display_name = _resolve_installed_extension(extension, installed, "remove") + + # Get extension info for command and skill counts + ext_manifest = manager.get_extension(extension_id) + reg_meta = manager.registry.get(extension_id) + # Derive cmd_count from the registry's registered_commands (includes aliases) + # rather than from the manifest (primary commands only). Use max() across + # agents to get the per-agent count; sum() would double-count since users + # think in logical commands, not per-agent file counts. + # Use get() without a default so we can distinguish "key missing" (fall back + # to manifest) from "key present but empty dict" (zero commands registered). + registered_commands = reg_meta.get("registered_commands") if isinstance(reg_meta, dict) else None + if isinstance(registered_commands, dict): + cmd_count = max( + (len(v) for v in registered_commands.values() if isinstance(v, list)), + default=0, + ) + else: + cmd_count = len(ext_manifest.commands) if ext_manifest else 0 + raw_skills = reg_meta.get("registered_skills") if reg_meta else None + skill_count = len(raw_skills) if isinstance(raw_skills, list) else 0 + + # Confirm removal + if not force: + console.print("\n[yellow]⚠ This will remove:[/yellow]") + console.print(f" • {cmd_count} command{'s' if cmd_count != 1 else ''} per agent") + if skill_count: + console.print(f" • {skill_count} agent skill(s)") + console.print(f" • Extension directory: .specify/extensions/{extension_id}/") + if not keep_config: + console.print(" • Config files (will be backed up)") + console.print() + + confirm = typer.confirm("Continue?") + if not confirm: + console.print("Cancelled") + raise typer.Exit(0) + + # Remove extension + success = manager.remove(extension_id, keep_config=keep_config) + + if success: + console.print(f"\n[green]✓[/green] Extension '{display_name}' removed successfully") + if keep_config: + console.print(f"\nConfig files preserved in .specify/extensions/{extension_id}/") + else: + console.print(f"\nConfig files backed up to .specify/extensions/.backup/{extension_id}/") + console.print(f"\nTo reinstall: specify extension add {extension_id}") + else: + console.print("[red]Error:[/red] Failed to remove extension") + raise typer.Exit(1) + + +@extension_app.command("search") +def extension_search( + query: str = typer.Argument(None, help="Search query (optional)"), + tag: Optional[str] = typer.Option(None, "--tag", help="Filter by tag"), + author: Optional[str] = typer.Option(None, "--author", help="Filter by author"), + verified: bool = typer.Option(False, "--verified", help="Show only verified extensions"), +): + """Search for available extensions in catalog.""" + from . import ExtensionCatalog, ExtensionError + + project_root = _require_specify_project() + catalog = ExtensionCatalog(project_root) + + try: + console.print("🔍 Searching extension catalog...") + results = catalog.search(query=query, tag=tag, author=author, verified_only=verified) + + if not results: + console.print("\n[yellow]No extensions found matching criteria[/yellow]") + if query or tag or author or verified: + console.print("\nTry:") + console.print(" • Broader search terms") + console.print(" • Remove filters") + console.print(" • specify extension search (show all)") + raise typer.Exit(0) + + console.print(f"\n[green]Found {len(results)} extension(s):[/green]\n") + + for ext in results: + # Extension header + verified_badge = " [green]✓ Verified[/green]" if ext.get("verified") else "" + console.print(f"[bold]{ext['name']}[/bold] (v{ext['version']}){verified_badge}") + console.print(f" {ext['description']}") + + # Metadata + console.print(f"\n [dim]Author:[/dim] {ext.get('author', 'Unknown')}") + if ext.get('tags'): + tags_str = ", ".join(ext['tags']) + console.print(f" [dim]Tags:[/dim] {tags_str}") + + # Source catalog + catalog_name = ext.get("_catalog_name", "") + install_allowed = ext.get("_install_allowed", True) + if catalog_name: + if install_allowed: + console.print(f" [dim]Catalog:[/dim] {catalog_name}") + else: + console.print(f" [dim]Catalog:[/dim] {catalog_name} [yellow](discovery only — not installable)[/yellow]") + + # Stats + stats = [] + if ext.get('downloads') is not None: + stats.append(f"Downloads: {ext['downloads']:,}") + if ext.get('stars') is not None: + stats.append(f"Stars: {ext['stars']}") + if stats: + console.print(f" [dim]{' | '.join(stats)}[/dim]") + + # Links + if ext.get('repository'): + console.print(f" [dim]Repository:[/dim] {ext['repository']}") + + # Install command (show warning if not installable) + if install_allowed: + console.print(f"\n [cyan]Install:[/cyan] specify extension add {ext['id']}") + else: + console.print(f"\n [yellow]⚠[/yellow] Not directly installable from '{catalog_name}'.") + console.print( + f" Add to an approved catalog with install_allowed: true, " + f"or install from a ZIP URL: specify extension add {ext['id']} --from " + ) + console.print() + + except ExtensionError as e: + console.print(f"\n[red]Error:[/red] {e}") + console.print("\nTip: The catalog may be temporarily unavailable. Try again later.") + raise typer.Exit(1) + + +@extension_app.command("info") +def extension_info( + extension: str = typer.Argument(help="Extension ID or name"), +): + """Show detailed information about an extension.""" + from . import ExtensionCatalog, ExtensionManager, normalize_priority + + project_root = _require_specify_project() + catalog = ExtensionCatalog(project_root) + manager = ExtensionManager(project_root) + installed = manager.list_installed() + + # Try to resolve from installed extensions first (by ID or name) + # Use allow_not_found=True since the extension may be catalog-only + resolved_installed_id, resolved_installed_name = _resolve_installed_extension( + extension, installed, "info", allow_not_found=True + ) + + # Try catalog lookup (with error handling) + # If we resolved an installed extension by display name, use its ID for catalog lookup + # to ensure we get the correct catalog entry (not a different extension with same name) + lookup_key = resolved_installed_id if resolved_installed_id else extension + ext_info, catalog_error = _resolve_catalog_extension(lookup_key, catalog, "info") + + # Case 1: Found in catalog - show full catalog info + if ext_info: + _print_extension_info(ext_info, manager) + return + + # Case 2: Installed locally but catalog lookup failed or not in catalog + if resolved_installed_id: + # Get local manifest info + ext_manifest = manager.get_extension(resolved_installed_id) + metadata = manager.registry.get(resolved_installed_id) + metadata_is_dict = isinstance(metadata, dict) + if not metadata_is_dict: + console.print( + "[yellow]Warning:[/yellow] Extension metadata appears to be corrupted; " + "some information may be unavailable." + ) + version = metadata.get("version", "unknown") if metadata_is_dict else "unknown" + + console.print(f"\n[bold]{resolved_installed_name}[/bold] (v{version})") + console.print(f"ID: {resolved_installed_id}") + console.print() + + if ext_manifest: + console.print(f"{ext_manifest.description}") + console.print() + # Author is optional in extension.yml, safely retrieve it + author = ext_manifest.data.get("extension", {}).get("author") + if author: + console.print(f"[dim]Author:[/dim] {author}") + if ext_manifest.category: + console.print(f"[dim]Category:[/dim] {ext_manifest.category}") + if ext_manifest.effect: + console.print(f"[dim]Effect:[/dim] {ext_manifest.effect}") + console.print() + + if ext_manifest.commands: + console.print("[bold]Commands:[/bold]") + for cmd in ext_manifest.commands: + console.print(f" • {cmd['name']}: {cmd.get('description', '')}") + console.print() + + # Show catalog status + if catalog_error: + console.print(f"[yellow]Catalog unavailable:[/yellow] {catalog_error}") + console.print("[dim]Note: Using locally installed extension; catalog info could not be verified.[/dim]") + else: + console.print("[yellow]Note:[/yellow] Not found in catalog (custom/local extension)") + + console.print() + console.print("[green]✓ Installed[/green]") + priority = normalize_priority(metadata.get("priority") if metadata_is_dict else None) + console.print(f"[dim]Priority:[/dim] {priority}") + console.print(f"\nTo remove: specify extension remove {resolved_installed_id}") + return + + # Case 3: Not found anywhere + if catalog_error: + console.print(f"[red]Error:[/red] Could not query extension catalog: {catalog_error}") + console.print("\nTry again when online, or use the extension ID directly.") + else: + console.print(f"[red]Error:[/red] Extension '{extension}' not found") + console.print("\nTry: specify extension search") + raise typer.Exit(1) + + +def _print_extension_info(ext_info: dict, manager): + """Print formatted extension info from catalog data.""" + from . import normalize_priority + + # Header + verified_badge = " [green]✓ Verified[/green]" if ext_info.get("verified") else "" + console.print(f"\n[bold]{ext_info['name']}[/bold] (v{ext_info['version']}){verified_badge}") + console.print(f"ID: {ext_info['id']}") + console.print() + + # Description + console.print(f"{ext_info['description']}") + console.print() + + # Author and License + console.print(f"[dim]Author:[/dim] {ext_info.get('author', 'Unknown')}") + console.print(f"[dim]License:[/dim] {ext_info.get('license', 'Unknown')}") + + # Category and Effect + if ext_info.get('category'): + console.print(f"[dim]Category:[/dim] {ext_info['category']}") + if ext_info.get('effect'): + console.print(f"[dim]Effect:[/dim] {ext_info['effect']}") + + # Source catalog + if ext_info.get("_catalog_name"): + install_allowed = ext_info.get("_install_allowed", True) + install_note = "" if install_allowed else " [yellow](discovery only)[/yellow]" + console.print(f"[dim]Source catalog:[/dim] {ext_info['_catalog_name']}{install_note}") + console.print() + + # Requirements + if ext_info.get('requires'): + console.print("[bold]Requirements:[/bold]") + reqs = ext_info['requires'] + if reqs.get('speckit_version'): + console.print(f" • Spec Kit: {reqs['speckit_version']}") + if reqs.get('tools'): + for tool in reqs['tools']: + tool_name = tool['name'] + tool_version = tool.get('version', 'any') + required = " (required)" if tool.get('required') else " (optional)" + console.print(f" • {tool_name}: {tool_version}{required}") + console.print() + + # Provides + if ext_info.get('provides'): + console.print("[bold]Provides:[/bold]") + provides = ext_info['provides'] + if provides.get('commands'): + console.print(f" • Commands: {provides['commands']}") + if provides.get('hooks'): + console.print(f" • Hooks: {provides['hooks']}") + console.print() + + # Tags + if ext_info.get('tags'): + tags_str = ", ".join(ext_info['tags']) + console.print(f"[bold]Tags:[/bold] {tags_str}") + console.print() + + # Statistics + stats = [] + if ext_info.get('downloads') is not None: + stats.append(f"Downloads: {ext_info['downloads']:,}") + if ext_info.get('stars') is not None: + stats.append(f"Stars: {ext_info['stars']}") + if stats: + console.print(f"[bold]Statistics:[/bold] {' | '.join(stats)}") + console.print() + + # Links + console.print("[bold]Links:[/bold]") + if ext_info.get('repository'): + console.print(f" • Repository: {ext_info['repository']}") + if ext_info.get('homepage'): + console.print(f" • Homepage: {ext_info['homepage']}") + if ext_info.get('documentation'): + console.print(f" • Documentation: {ext_info['documentation']}") + if ext_info.get('changelog'): + console.print(f" • Changelog: {ext_info['changelog']}") + console.print() + + # Installation status and command + is_installed = manager.registry.is_installed(ext_info['id']) + install_allowed = ext_info.get("_install_allowed", True) + if is_installed: + console.print("[green]✓ Installed[/green]") + metadata = manager.registry.get(ext_info['id']) + priority = normalize_priority(metadata.get("priority") if isinstance(metadata, dict) else None) + console.print(f"[dim]Priority:[/dim] {priority}") + console.print(f"\nTo remove: specify extension remove {ext_info['id']}") + elif install_allowed: + console.print("[yellow]Not installed[/yellow]") + console.print(f"\n[cyan]Install:[/cyan] specify extension add {ext_info['id']}") + else: + catalog_name = ext_info.get("_catalog_name", "community") + console.print("[yellow]Not installed[/yellow]") + console.print( + f"\n[yellow]⚠[/yellow] '{ext_info['id']}' is available in the '{catalog_name}' catalog " + f"but not in your approved catalog. Add it to .specify/extension-catalogs.yml " + f"with install_allowed: true to enable installation." + ) + + +@extension_app.command("update") +def extension_update( + extension: str = typer.Argument(None, help="Extension ID or name to update (or all)"), +): + """Update extension(s) to latest version.""" + from . import ( + ExtensionManager, + ExtensionCatalog, + ExtensionError, + ValidationError, + CommandRegistrar, + HookExecutor, + normalize_priority, + ) + from packaging import version as pkg_version + + project_root = _require_specify_project() + manager = ExtensionManager(project_root) + catalog = ExtensionCatalog(project_root) + speckit_version = get_speckit_version() + + try: + # Get list of extensions to update + installed = manager.list_installed() + if extension: + # Update specific extension - resolve ID from argument (handles ambiguous names) + extension_id, _ = _resolve_installed_extension(extension, installed, "update") + extensions_to_update = [extension_id] + else: + # Update all extensions + extensions_to_update = [ext["id"] for ext in installed] + + if not extensions_to_update: + console.print("[yellow]No extensions installed[/yellow]") + raise typer.Exit(0) + + console.print("🔄 Checking for updates...\n") + + updates_available = [] + + for ext_id in extensions_to_update: + # Get installed version + metadata = manager.registry.get(ext_id) + if metadata is None or not isinstance(metadata, dict) or "version" not in metadata: + console.print(f"⚠ {ext_id}: Registry entry corrupted or missing (skipping)") + continue + try: + installed_version = pkg_version.Version(metadata["version"]) + except pkg_version.InvalidVersion: + console.print( + f"⚠ {ext_id}: Invalid installed version '{metadata.get('version')}' in registry (skipping)" + ) + continue + + # Get catalog info + ext_info = catalog.get_extension_info(ext_id) + if not ext_info: + console.print(f"⚠ {ext_id}: Not found in catalog (skipping)") + continue + + # Check if installation is allowed from this catalog + if not ext_info.get("_install_allowed", True): + console.print(f"⚠ {ext_id}: Updates not allowed from '{ext_info.get('_catalog_name', 'catalog')}' (skipping)") + continue + + try: + catalog_version = pkg_version.Version(ext_info["version"]) + except pkg_version.InvalidVersion: + console.print( + f"⚠ {ext_id}: Invalid catalog version '{ext_info.get('version')}' (skipping)" + ) + continue + + if catalog_version > installed_version: + updates_available.append( + { + "id": ext_id, + "name": ext_info.get("name", ext_id), # Display name for status messages + "installed": str(installed_version), + "available": str(catalog_version), + "download_url": ext_info.get("download_url"), + } + ) + else: + console.print(f"✓ {ext_id}: Up to date (v{installed_version})") + + if not updates_available: + console.print("\n[green]All extensions are up to date![/green]") + raise typer.Exit(0) + + # Show available updates + console.print("\n[bold]Updates available:[/bold]\n") + for update in updates_available: + console.print( + f" • {update['id']}: {update['installed']} → {update['available']}" + ) + + console.print() + confirm = typer.confirm("Update these extensions?") + if not confirm: + console.print("Cancelled") + raise typer.Exit(0) + + # Perform updates with atomic backup/restore + console.print() + updated_extensions = [] + failed_updates = [] + registrar = CommandRegistrar() + hook_executor = HookExecutor(project_root) + from ..agents import CommandRegistrar as _AgentReg # used in backup and rollback paths + + # UNSET sentinel: backup not yet captured (exception before backup step) + UNSET = object() + + for update in updates_available: + extension_id = update["id"] + ext_name = update["name"] # Use display name for user-facing messages + console.print(f"📦 Updating {ext_name}...") + + # Backup paths + backup_base = manager.extensions_dir / ".backup" / f"{extension_id}-update" + backup_ext_dir = backup_base / "extension" + backup_commands_dir = backup_base / "commands" + backup_config_dir = backup_base / "config" + + # Store backup state + backup_registry_entry = None # None means registry entry not yet captured + backup_installed = UNSET # Original installed list from extensions.yml + backup_hooks = None # None means backup step 4 not yet reached; {} or {...} means backup was captured + backed_up_command_files = {} + + try: + # 1. Backup registry entry (always, even if extension dir doesn't exist) + backup_registry_entry = manager.registry.get(extension_id) + + # 2. Backup extension directory + extension_dir = manager.extensions_dir / extension_id + if extension_dir.exists(): + backup_base.mkdir(parents=True, exist_ok=True) + if backup_ext_dir.exists(): + shutil.rmtree(backup_ext_dir) + shutil.copytree(extension_dir, backup_ext_dir) + + # Backup config files separately so they can be restored + # after a successful install (install_from_directory clears dest dir). + config_files = list(extension_dir.glob("*-config.yml")) + list( + extension_dir.glob("*-config.local.yml") + ) + for cfg_file in config_files: + backup_config_dir.mkdir(parents=True, exist_ok=True) + shutil.copy2(cfg_file, backup_config_dir / cfg_file.name) + + # 3. Backup command files for all agents + registered_commands = backup_registry_entry.get("registered_commands", {}) if isinstance(backup_registry_entry, dict) else {} + for agent_name, cmd_names in registered_commands.items(): + if agent_name not in registrar.AGENT_CONFIGS: + continue + agent_config = registrar.AGENT_CONFIGS[agent_name] + commands_dir = _AgentReg._resolve_agent_dir( + agent_name, agent_config, project_root + ) + + for cmd_name in cmd_names: + output_name = _AgentReg._compute_output_name(agent_name, cmd_name, agent_config) + cmd_file = commands_dir / f"{output_name}{agent_config['extension']}" + if cmd_file.exists(): + backup_cmd_path = backup_commands_dir / agent_name / cmd_file.name + backup_cmd_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(cmd_file, backup_cmd_path) + backed_up_command_files[str(cmd_file)] = str(backup_cmd_path) + + # Also backup copilot prompt files + if agent_name == "copilot": + prompt_file = project_root / ".github" / "prompts" / f"{cmd_name}.prompt.md" + if prompt_file.exists(): + backup_prompt_path = backup_commands_dir / "copilot-prompts" / prompt_file.name + backup_prompt_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(prompt_file, backup_prompt_path) + backed_up_command_files[str(prompt_file)] = str(backup_prompt_path) + + # 4. Backup hooks and installed list from extensions.yml + # get_project_config() always normalizes installed->[] and hooks->{}, + # so no sentinel is needed to distinguish key-absent from key-empty. + config = hook_executor.get_project_config() + if isinstance(config, dict): + import copy + # Deep-copy so nested mapping entries (e.g. version-pin dicts) + # are not affected by in-place mutations during the update. + backup_installed = copy.deepcopy(config.get("installed", [])) + backup_hooks = {} + for hook_name, hook_list in config.get("hooks", {}).items(): + if not isinstance(hook_list, list): + continue + ext_hooks = [h for h in hook_list if isinstance(h, dict) and h.get("extension") == extension_id] + if ext_hooks: + backup_hooks[hook_name] = ext_hooks + + # 5. Download new version + zip_path = catalog.download_extension(extension_id) + try: + # 6. Validate extension ID from ZIP BEFORE modifying installation + # Handle both root-level and nested extension.yml (GitHub auto-generated ZIPs) + with zipfile.ZipFile(zip_path, "r") as zf: + import yaml + manifest_data = None + namelist = zf.namelist() + + # First try root-level extension.yml + if "extension.yml" in namelist: + with zf.open("extension.yml") as f: + manifest_data = yaml.safe_load(f) or {} + else: + # Look for extension.yml in a single top-level subdirectory + # (e.g., "repo-name-branch/extension.yml") + manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1] + if len(manifest_paths) == 1: + with zf.open(manifest_paths[0]) as f: + manifest_data = yaml.safe_load(f) or {} + + if manifest_data is None: + raise ValueError("Downloaded extension archive is missing 'extension.yml'") + + zip_extension_id = manifest_data.get("extension", {}).get("id") + if zip_extension_id != extension_id: + raise ValueError( + f"Extension ID mismatch: expected '{extension_id}', got '{zip_extension_id}'" + ) + + # 7. Remove old extension (handles command file cleanup and registry removal) + manager.remove(extension_id, keep_config=True) + + # 8. Install new version + _ = manager.install_from_zip(zip_path, speckit_version) + + # Restore user config files from backup after successful install. + new_extension_dir = manager.extensions_dir / extension_id + if backup_config_dir.exists() and new_extension_dir.exists(): + for cfg_file in backup_config_dir.iterdir(): + if cfg_file.is_file(): + shutil.copy2(cfg_file, new_extension_dir / cfg_file.name) + + # 9. Restore metadata from backup (installed_at, enabled state) + if backup_registry_entry and isinstance(backup_registry_entry, dict): + # Copy current registry entry to avoid mutating internal + # registry state before explicit restore(). + current_metadata = manager.registry.get(extension_id) + if current_metadata is None or not isinstance(current_metadata, dict): + raise RuntimeError( + f"Registry entry for '{extension_id}' missing or corrupted after install — update incomplete" + ) + new_metadata = dict(current_metadata) + + # Preserve the original installation timestamp + if "installed_at" in backup_registry_entry: + new_metadata["installed_at"] = backup_registry_entry["installed_at"] + + # Preserve the original priority (normalized to handle corruption) + if "priority" in backup_registry_entry: + new_metadata["priority"] = normalize_priority(backup_registry_entry["priority"]) + + # If extension was disabled before update, disable it again + if not backup_registry_entry.get("enabled", True): + new_metadata["enabled"] = False + + # Use restore() instead of update() because update() always + # preserves the existing installed_at, ignoring our override + manager.registry.restore(extension_id, new_metadata) + + # Also disable hooks in extensions.yml if extension was disabled + if not backup_registry_entry.get("enabled", True): + config = hook_executor.get_project_config() + if "hooks" in config: + for hook_name in config["hooks"]: + for hook in config["hooks"][hook_name]: + if hook.get("extension") == extension_id: + hook["enabled"] = False + hook_executor.save_project_config(config) + finally: + # Clean up downloaded ZIP + if zip_path.exists(): + zip_path.unlink() + + # 10. Clean up backup on success + if backup_base.exists(): + shutil.rmtree(backup_base) + + console.print(f" [green]✓[/green] Updated to v{update['available']}") + updated_extensions.append(ext_name) + + except KeyboardInterrupt: + raise + except Exception as e: + console.print(f" [red]✗[/red] Failed: {e}") + failed_updates.append((ext_name, str(e))) + + # Rollback on failure + console.print(f" [yellow]↩[/yellow] Rolling back {ext_name}...") + + try: + # Restore extension directory + # Only perform destructive rollback if backup exists (meaning we + # actually modified the extension). This avoids deleting a valid + # installation when failure happened before changes were made. + extension_dir = manager.extensions_dir / extension_id + if backup_ext_dir.exists(): + if extension_dir.exists(): + shutil.rmtree(extension_dir) + shutil.copytree(backup_ext_dir, extension_dir) + + # Remove any NEW command files created by failed install + # (files that weren't in the original backup) + try: + new_registry_entry = manager.registry.get(extension_id) + if new_registry_entry is None or not isinstance(new_registry_entry, dict): + new_registered_commands = {} + else: + new_registered_commands = new_registry_entry.get("registered_commands", {}) + for agent_name, cmd_names in new_registered_commands.items(): + if agent_name not in registrar.AGENT_CONFIGS: + continue + agent_config = registrar.AGENT_CONFIGS[agent_name] + commands_dir = _AgentReg._resolve_agent_dir( + agent_name, agent_config, project_root + ) + + for cmd_name in cmd_names: + output_name = _AgentReg._compute_output_name(agent_name, cmd_name, agent_config) + cmd_file = commands_dir / f"{output_name}{agent_config['extension']}" + # Delete if it exists and wasn't in our backup + if cmd_file.exists() and str(cmd_file) not in backed_up_command_files: + cmd_file.unlink() + + # Also handle copilot prompt files + if agent_name == "copilot": + prompt_file = project_root / ".github" / "prompts" / f"{cmd_name}.prompt.md" + if prompt_file.exists() and str(prompt_file) not in backed_up_command_files: + prompt_file.unlink() + except KeyError: + pass # No new registry entry exists, nothing to clean up + + # Restore backed up command files + for original_path, backup_path in backed_up_command_files.items(): + backup_file = Path(backup_path) + if backup_file.exists(): + original_file = Path(original_path) + original_file.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(backup_file, original_file) + + # Restore metadata in extensions.yml (hooks and installed list). + # Only run if backup step 4 was reached (backup_hooks is not None); + # otherwise we have no safe baseline to restore from and could corrupt + # the config by removing pre-existing hooks. + if backup_hooks is not None: + config = hook_executor.get_project_config() + if not isinstance(config, dict): + config = {} + + modified = False + + # 1. Restore hooks in extensions.yml + if not isinstance(config.get("hooks"), dict): + config["hooks"] = {} + modified = True + + # Remove any hooks for this extension added by the failed install + for hook_name in list(config["hooks"].keys()): + hooks_list = config["hooks"][hook_name] + if not isinstance(hooks_list, list): + config["hooks"][hook_name] = [] + modified = True + continue + + original_len = len(hooks_list) + config["hooks"][hook_name] = [ + h for h in hooks_list + if isinstance(h, dict) and h.get("extension") != extension_id + ] + if len(config["hooks"][hook_name]) != original_len: + modified = True + + # Add back the backed-up hooks + if backup_hooks: + for hook_name, hooks in backup_hooks.items(): + if not isinstance(config["hooks"].get(hook_name), list): + config["hooks"][hook_name] = [] + config["hooks"][hook_name].extend(hooks) + modified = True + + # 2. Restore installed list in extensions.yml + if backup_installed is not UNSET: + if config.get("installed") != backup_installed: + config["installed"] = backup_installed + modified = True + + if modified: + hook_executor.save_project_config(config) + + # Restore registry entry (use restore() since entry was removed) + if backup_registry_entry: + manager.registry.restore(extension_id, backup_registry_entry) + + console.print(" [green]✓[/green] Rollback successful") + # Clean up backup directory only on successful rollback + if backup_base.exists(): + shutil.rmtree(backup_base) + except Exception as rollback_error: + console.print(f" [red]✗[/red] Rollback failed: {rollback_error}") + console.print(f" [dim]Backup preserved at: {backup_base}[/dim]") + + # Summary + console.print() + if updated_extensions: + console.print(f"[green]✓[/green] Successfully updated {len(updated_extensions)} extension(s)") + if failed_updates: + console.print(f"[red]✗[/red] Failed to update {len(failed_updates)} extension(s):") + for ext_name, error in failed_updates: + console.print(f" • {ext_name}: {error}") + raise typer.Exit(1) + + except ValidationError as e: + console.print(f"\n[red]Validation Error:[/red] {e}") + raise typer.Exit(1) + except ExtensionError as e: + console.print(f"\n[red]Error:[/red] {e}") + raise typer.Exit(1) + + +@extension_app.command("enable") +def extension_enable( + extension: str = typer.Argument(help="Extension ID or name to enable"), +): + """Enable a disabled extension.""" + from . import ExtensionManager, HookExecutor + + project_root = _require_specify_project() + manager = ExtensionManager(project_root) + hook_executor = HookExecutor(project_root) + + # Resolve extension ID from argument (handles ambiguous names) + installed = manager.list_installed() + extension_id, display_name = _resolve_installed_extension(extension, installed, "enable") + + # Update registry + metadata = manager.registry.get(extension_id) + if metadata is None or not isinstance(metadata, dict): + console.print(f"[red]Error:[/red] Extension '{extension_id}' not found in registry (corrupted state)") + raise typer.Exit(1) + + if metadata.get("enabled", True): + console.print(f"[yellow]Extension '{display_name}' is already enabled[/yellow]") + raise typer.Exit(0) + + manager.registry.update(extension_id, {"enabled": True}) + + # Enable hooks in extensions.yml + config = hook_executor.get_project_config() + if "hooks" in config: + for hook_name in config["hooks"]: + for hook in config["hooks"][hook_name]: + if hook.get("extension") == extension_id: + hook["enabled"] = True + hook_executor.save_project_config(config) + + console.print(f"[green]✓[/green] Extension '{display_name}' enabled") + + +@extension_app.command("disable") +def extension_disable( + extension: str = typer.Argument(help="Extension ID or name to disable"), +): + """Disable an extension without removing it.""" + from . import ExtensionManager, HookExecutor + + project_root = _require_specify_project() + manager = ExtensionManager(project_root) + hook_executor = HookExecutor(project_root) + + # Resolve extension ID from argument (handles ambiguous names) + installed = manager.list_installed() + extension_id, display_name = _resolve_installed_extension(extension, installed, "disable") + + # Update registry + metadata = manager.registry.get(extension_id) + if metadata is None or not isinstance(metadata, dict): + console.print(f"[red]Error:[/red] Extension '{extension_id}' not found in registry (corrupted state)") + raise typer.Exit(1) + + if not metadata.get("enabled", True): + console.print(f"[yellow]Extension '{display_name}' is already disabled[/yellow]") + raise typer.Exit(0) + + manager.registry.update(extension_id, {"enabled": False}) + + # Disable hooks in extensions.yml + config = hook_executor.get_project_config() + if "hooks" in config: + for hook_name in config["hooks"]: + for hook in config["hooks"][hook_name]: + if hook.get("extension") == extension_id: + hook["enabled"] = False + hook_executor.save_project_config(config) + + console.print(f"[green]✓[/green] Extension '{display_name}' disabled") + console.print("\nCommands will no longer be available. Hooks will not execute.") + console.print(f"To re-enable: specify extension enable {extension_id}") + + +@extension_app.command("set-priority") +def extension_set_priority( + extension: str = typer.Argument(help="Extension ID or name"), + priority: int = typer.Argument(help="New priority (lower = higher precedence)"), +): + """Set the resolution priority of an installed extension.""" + from . import ExtensionManager + + project_root = _require_specify_project() + # Validate priority + if priority < 1: + console.print("[red]Error:[/red] Priority must be a positive integer (1 or higher)") + raise typer.Exit(1) + + manager = ExtensionManager(project_root) + + # Resolve extension ID from argument (handles ambiguous names) + installed = manager.list_installed() + extension_id, display_name = _resolve_installed_extension(extension, installed, "set-priority") + + # Get current metadata + metadata = manager.registry.get(extension_id) + if metadata is None or not isinstance(metadata, dict): + console.print(f"[red]Error:[/red] Extension '{extension_id}' not found in registry (corrupted state)") + raise typer.Exit(1) + + from . import normalize_priority + raw_priority = metadata.get("priority") + # Only skip if the stored value is already a valid int equal to requested priority + # This ensures corrupted values (e.g., "high") get repaired even when setting to default (10) + if isinstance(raw_priority, int) and raw_priority == priority: + console.print(f"[yellow]Extension '{display_name}' already has priority {priority}[/yellow]") + raise typer.Exit(0) + + old_priority = normalize_priority(raw_priority) + + # Update priority + manager.registry.update(extension_id, {"priority": priority}) + + console.print(f"[green]✓[/green] Extension '{display_name}' priority changed: {old_priority} → {priority}") + console.print("\n[dim]Lower priority = higher precedence in template resolution[/dim]") + + +def register(app: typer.Typer) -> None: + """Attach the extension command group to the root Typer app.""" + app.add_typer(extension_app, name="extension") From f040f76b46b75964388162d8b1b35d8288f3fb51 Mon Sep 17 00:00:00 2001 From: wangchenguang Date: Thu, 18 Jun 2026 11:37:15 +0800 Subject: [PATCH 2/4] fix(extensions): preserve per-command path in update backup for skills agents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Skills agents (extension == "/SKILL.md") name every command file SKILL.md, each in its own per-command subdir (e.g. speckit-plan/SKILL.md). The update backup keyed the backup path on cmd_file.name alone, so all of an agent's skill files collided onto a single backup path — each shutil.copy2 overwrote the previous one, and rollback restored one skill's content over all the others, corrupting or losing the rest. Mirror the real on-disk layout by using cmd_file.relative_to(commands_dir), keeping each backup path unique. This also makes backed_up_command_files values unique so restore copies the correct content back to each command. Add a regression test asserting two distinct skill files survive a backup -> failed-update -> rollback cycle with their own content. --- src/specify_cli/extensions/_commands.py | 8 +++- tests/test_extension_update_hardening.py | 48 ++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/specify_cli/extensions/_commands.py b/src/specify_cli/extensions/_commands.py index 4df76f0596..8441c42ad8 100644 --- a/src/specify_cli/extensions/_commands.py +++ b/src/specify_cli/extensions/_commands.py @@ -1101,7 +1101,13 @@ def extension_update( output_name = _AgentReg._compute_output_name(agent_name, cmd_name, agent_config) cmd_file = commands_dir / f"{output_name}{agent_config['extension']}" if cmd_file.exists(): - backup_cmd_path = backup_commands_dir / agent_name / cmd_file.name + # Mirror the real on-disk layout under the backup dir. + # Skills agents (extension == "/SKILL.md") name every + # command file "SKILL.md", living in a per-command + # subdir (e.g. speckit-plan/SKILL.md). Using cmd_file.name + # alone would collide all of them onto one backup path and + # break rollback; keep the relative path to stay unique. + backup_cmd_path = backup_commands_dir / agent_name / cmd_file.relative_to(commands_dir) backup_cmd_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(cmd_file, backup_cmd_path) backed_up_command_files[str(cmd_file)] = str(backup_cmd_path) diff --git a/tests/test_extension_update_hardening.py b/tests/test_extension_update_hardening.py index 426e5ec7e9..08877680d2 100644 --- a/tests/test_extension_update_hardening.py +++ b/tests/test_extension_update_hardening.py @@ -107,3 +107,51 @@ def mock_download_fail(*args, **kwargs): assert isinstance(restored_config, dict) assert "hooks" in restored_config assert restored_config["hooks"] == {} + + +def test_extension_update_skills_backup_no_collision(project_dir, monkeypatch): + """Regression: skills agents name every command file SKILL.md (one per + per-command subdir). Backup must keep the per-command path so rollback + restores each skill's own content instead of overwriting them onto a + single backup path.""" + monkeypatch.chdir(project_dir) + + config_path = project_dir / ".specify" / "extensions.yml" + config_path.write_text(yaml.dump({"installed": ["test-ext"], "hooks": {}})) + + # Two skill command files with DISTINCT content, mirroring the claude + # skills layout (.claude/skills//SKILL.md). + skills_root = project_dir / ".claude" / "skills" + plan_file = skills_root / "speckit-plan" / "SKILL.md" + tasks_file = skills_root / "speckit-tasks" / "SKILL.md" + plan_file.parent.mkdir(parents=True) + tasks_file.parent.mkdir(parents=True) + plan_file.write_text("PLAN CONTENT") + tasks_file.write_text("TASKS CONTENT") + + monkeypatch.setattr(ExtensionManager, "list_installed", lambda self: [{"id": "test-ext", "name": "Test Ext", "version": "1.0.0"}]) + monkeypatch.setattr(ExtensionRegistry, "get", lambda self, ext_id: { + "version": "1.0.0", + "enabled": True, + "registered_commands": {"claude": ["speckit.plan", "speckit.tasks"]}, + }) + monkeypatch.setattr(ExtensionCatalog, "get_extension_info", lambda self, ext_id: {"id": "test-ext", "name": "Test Ext", "version": "1.1.0", "download_url": "https://example.com/ext.zip"}) + + # Fail at download (step 5, after the command backup in step 3). Delete the + # originals first to simulate an install clobbering them, forcing rollback + # to rely entirely on the backups. + def mock_download_fail(self, ext_id): + plan_file.unlink() + tasks_file.unlink() + raise Exception("Download failed") + + monkeypatch.setattr(ExtensionCatalog, "download_extension", mock_download_fail) + monkeypatch.setattr("typer.confirm", lambda _: True) + + result = runner.invoke(app, ["extension", "update", "test-ext"], obj={"project_root": project_dir}) + + assert result.exit_code == 1 + # Rollback must restore EACH skill's own content, not a single collided copy. + assert plan_file.exists() and tasks_file.exists() + assert plan_file.read_text() == "PLAN CONTENT" + assert tasks_file.read_text() == "TASKS CONTENT" From 2d137633c046089117133ac8f4655ebaed8d2799 Mon Sep 17 00:00:00 2001 From: wangchenguang Date: Thu, 18 Jun 2026 11:42:33 +0800 Subject: [PATCH 3/4] style(extensions): use yaml.safe_dump when writing catalog config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The catalog add/remove handlers wrote the integration catalog config with yaml.dump. Switch to yaml.safe_dump to align with the SafeDumper used by the presets commands and to refuse emitting !!python/object tags if a non-basic value ever reaches the config dict. Output is unchanged for the current basic-type payload (str/int/bool/dict/ list) — this is a defensive/consistency change, not a behavioral fix. --- src/specify_cli/extensions/_commands.py | 4 ++-- tests/test_extension_update_hardening.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/specify_cli/extensions/_commands.py b/src/specify_cli/extensions/_commands.py index 8441c42ad8..a294c2c5f4 100644 --- a/src/specify_cli/extensions/_commands.py +++ b/src/specify_cli/extensions/_commands.py @@ -323,7 +323,7 @@ def catalog_add( }) config["catalogs"] = catalogs - config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") + config_path.write_text(yaml.safe_dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") install_label = "install allowed" if install_allowed else "discovery only" console.print(f"\n[green]✓[/green] Added catalog '[bold]{name}[/bold]' ({install_label})") @@ -363,7 +363,7 @@ def catalog_remove( raise typer.Exit(1) config["catalogs"] = catalogs - config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") + config_path.write_text(yaml.safe_dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") console.print(f"[green]✓[/green] Removed catalog '{name}'") if not catalogs: diff --git a/tests/test_extension_update_hardening.py b/tests/test_extension_update_hardening.py index 08877680d2..8e8ed18543 100644 --- a/tests/test_extension_update_hardening.py +++ b/tests/test_extension_update_hardening.py @@ -111,7 +111,7 @@ def mock_download_fail(*args, **kwargs): def test_extension_update_skills_backup_no_collision(project_dir, monkeypatch): """Regression: skills agents name every command file SKILL.md (one per - per-command subdir). Backup must keep the per-command path so rollback + command subdirectory). Backup must keep the per-command path so rollback restores each skill's own content instead of overwriting them onto a single backup path.""" monkeypatch.chdir(project_dir) From 19dfad7e414654bf1baf3adbb97610fbe34d0c2f Mon Sep 17 00:00:00 2001 From: wangchenguang Date: Sat, 20 Jun 2026 00:11:19 +0800 Subject: [PATCH 4/4] Prevent extension config and download path escapes Extension catalog commands accepted parsed YAML roots without first checking that they were mappings, so corrupted configs could surface raw AttributeError tracebacks. URL-based extension downloads also derived their cache filename from user input, which let path separators escape the intended cache directory. The command layer now validates catalog config roots before calling get(), and URL downloads use an unnamed temporary filename created inside the downloads cache directory. Regression tests cover both corrupted catalog roots and traversal-looking extension names. --- src/specify_cli/extensions/_commands.py | 43 ++++++++++++++------ tests/integrations/test_cli.py | 30 ++++++++++++++ tests/test_extensions.py | 52 +++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 12 deletions(-) diff --git a/src/specify_cli/extensions/_commands.py b/src/specify_cli/extensions/_commands.py index a294c2c5f4..fbefcc3d9c 100644 --- a/src/specify_cli/extensions/_commands.py +++ b/src/specify_cli/extensions/_commands.py @@ -10,6 +10,7 @@ import os import shutil +import tempfile import zipfile from pathlib import Path from typing import Optional @@ -58,6 +59,27 @@ def _display_project_path(*args, **kwargs): return _f(*args, **kwargs) +def _load_catalog_command_config(project_root: Path, config_path: Path) -> dict: + """Load extension catalog CLI config with user-facing shape errors.""" + try: + config = yaml.safe_load(config_path.read_text(encoding="utf-8")) + except Exception as e: + config_label = _display_project_path(project_root, config_path) + console.print(f"[red]Error:[/red] Failed to read {config_label}: {e}") + raise typer.Exit(1) + + if config is None: + return {} + if not isinstance(config, dict): + config_label = _display_project_path(project_root, config_path) + console.print( + f"[red]Error:[/red] Invalid catalog config {config_label}: " + "expected a YAML mapping at the root." + ) + raise typer.Exit(1) + return config + + def _resolve_installed_extension( argument: str, installed_extensions: list, @@ -293,12 +315,7 @@ def catalog_add( # Load existing config if config_path.exists(): - try: - config = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} - except Exception as e: - config_label = _display_project_path(project_root, config_path) - console.print(f"[red]Error:[/red] Failed to read {config_label}: {e}") - raise typer.Exit(1) + config = _load_catalog_command_config(project_root, config_path) else: config = {} @@ -345,11 +362,7 @@ def catalog_remove( console.print("[red]Error:[/red] No catalog config found. Nothing to remove.") raise typer.Exit(1) - try: - config = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} - except Exception: - console.print("[red]Error:[/red] Failed to read catalog config.") - raise typer.Exit(1) + config = _load_catalog_command_config(project_root, config_path) catalogs = config.get("catalogs", []) if not isinstance(catalogs, list): @@ -463,7 +476,13 @@ def extension_add( # Download ZIP to temp location download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" download_dir.mkdir(parents=True, exist_ok=True) - zip_path = download_dir / f"{extension}-url-download.zip" + with tempfile.NamedTemporaryFile( + prefix="extension-url-download-", + suffix=".zip", + dir=download_dir, + delete=False, + ) as download_file: + zip_path = Path(download_file.name) try: from specify_cli.authentication.http import open_url as _open_url diff --git a/tests/integrations/test_cli.py b/tests/integrations/test_cli.py index 30bcb015d1..d142225872 100644 --- a/tests/integrations/test_cli.py +++ b/tests/integrations/test_cli.py @@ -1315,6 +1315,36 @@ def test_catalog_config_output_uses_posix_paths(self, tmp_path): assert extension_list.exit_code == 0, extension_list.output assert "Config: .specify/extension-catalogs.yml" in extension_list.output + def test_extension_catalog_add_rejects_non_mapping_config_root(self, tmp_path): + project = self._make_project(tmp_path) + cfg_path = project / ".specify" / "extension-catalogs.yml" + cfg_path.write_text("- not\n- a\n- mapping\n", encoding="utf-8") + + result = self._invoke([ + "extension", "catalog", "add", + "https://example.com/extension-catalog.yml", + "--name", "demo-extensions", + ], project) + + assert result.exit_code == 1, result.output + output = _normalize_cli_output(result.output) + assert "Invalid catalog config .specify/extension-catalogs.yml" in output + assert "expected a YAML mapping at the root" in output + assert "AttributeError" not in output + + def test_extension_catalog_remove_rejects_non_mapping_config_root(self, tmp_path): + project = self._make_project(tmp_path) + cfg_path = project / ".specify" / "extension-catalogs.yml" + cfg_path.write_text("- not\n- a\n- mapping\n", encoding="utf-8") + + result = self._invoke(["extension", "catalog", "remove", "demo"], project) + + assert result.exit_code == 1, result.output + output = _normalize_cli_output(result.output) + assert "Invalid catalog config .specify/extension-catalogs.yml" in output + assert "expected a YAML mapping at the root" in output + assert "AttributeError" not in output + # -- search ------------------------------------------------------------ def test_search_lists_all(self, tmp_path, monkeypatch): diff --git a/tests/test_extensions.py b/tests/test_extensions.py index e063571b14..d7aeab11d4 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -4871,6 +4871,58 @@ def test_add_from_url_cancel_exits_cleanly(self, tmp_path): assert result.exit_code == 0 assert "Cancelled" in result.output + def test_add_from_url_uses_cache_tempfile_for_untrusted_extension_name(self, tmp_path): + """The extension argument must not control the downloaded ZIP path.""" + import io + from types import SimpleNamespace + from typer.testing import CliRunner + from unittest.mock import patch + from specify_cli import app + + class FakeResponse(io.BytesIO): + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + project_dir = tmp_path / "test-project" + project_dir.mkdir() + (project_dir / ".specify").mkdir() + downloads_dir = project_dir / ".specify" / "extensions" / ".cache" / "downloads" + installed = {} + + def fake_install_from_zip(self_obj, zip_path, speckit_version, priority=10, force=False): + captured_path = Path(zip_path) + installed["zip_path"] = captured_path + installed["zip_bytes"] = captured_path.read_bytes() + return SimpleNamespace( + id="escape", + name="Escape Test", + version="1.0.0", + description="Test extension", + warnings=[], + commands=[], + hooks=[], + ) + + runner = CliRunner() + with patch.object(Path, "cwd", return_value=project_dir), \ + patch("typer.confirm", return_value=True), \ + patch("specify_cli.authentication.http.open_url", return_value=FakeResponse(b"zip-bytes")), \ + patch.object(ExtensionManager, "install_from_zip", fake_install_from_zip): + result = runner.invoke( + app, + ["extension", "add", "../outside", "--from", "https://example.com/ext.zip"], + catch_exceptions=True, + ) + + assert result.exit_code == 0 + assert installed["zip_bytes"] == b"zip-bytes" + assert installed["zip_path"].resolve().is_relative_to(downloads_dir.resolve()) + assert installed["zip_path"].name.startswith("extension-url-download-") + assert not installed["zip_path"].exists() + class TestDownloadExtensionBundled: """Tests for download_extension handling of bundled extensions."""