# Source code for questfoundry.compiler.compile

"""Compilation pipeline for QuestFoundry domain.

This module provides the main entry points for compiling MyST domain files
into generated Python code. It orchestrates the parser and generators.

Example Usage
-------------
Compile the full domain::

    from questfoundry.compiler import compile_domain

    # Parse and generate
    result = compile_domain(
        domain_dir="src/questfoundry/domain",
        output_dir="src/questfoundry/generated"
    )

    print(f"Generated {len(result)} files")

Command Line::

    # From project root
    uv run python -m questfoundry.compiler.compile

See Also
--------
:mod:`questfoundry.compiler.parser` : MyST parsing
:mod:`questfoundry.compiler.generators` : Code generation
"""

from __future__ import annotations

from pathlib import Path

from questfoundry.compiler.generators import generate_loops, generate_models, generate_roles
from questfoundry.compiler.models import (
    Agency,
    ArtifactFieldIR,
    ArtifactTypeIR,
    EnumTypeIR,
    EnumValueIR,
    GraphEdgeIR,
    GraphNodeIR,
    LoopIR,
    QualityGateIR,
    RoleIR,
    RoleToolIR,
    StoreType,
)
from questfoundry.compiler.parser import Directive, parse_myst_file
from questfoundry.compiler.parser.directives import DirectiveType


def _parse_ontology_files(ontology_path: Path) -> dict[str, list[Directive]]:
    """Parse all MyST files in the ontology directory.

    Parameters
    ----------
    ontology_path : Path
        Path to the ontology directory.

    Returns
    -------
    dict
        Dictionary with "directives" key containing all parsed directives.
    """
    all_directives: list[Directive] = []

    if ontology_path.exists():
        for md_file in ontology_path.glob("*.md"):
            # Skip non-domain files
            if md_file.name.startswith("_") or md_file.name.isupper():
                continue

            result = parse_myst_file(md_file)
            all_directives.extend(result.directives)

    return {"directives": all_directives}


def compile_ontology(
    domain_dir: str | Path,
    output_dir: str | Path,
) -> dict[str, Path]:
    """Compile ontology definitions to Pydantic models.

    Parses the domain/ontology/*.md files and generates enums.py and
    artifacts.py.

    Parameters
    ----------
    domain_dir : str | Path
        Path to the domain directory containing ontology/ subdirectory.
    output_dir : str | Path
        Path to the generated/models/ output directory.

    Returns
    -------
    dict[str, Path]
        Dictionary mapping filename to path of generated files.

    Examples
    --------
    >>> result = compile_ontology(
    ...     "src/questfoundry/domain",
    ...     "src/questfoundry/generated/models"
    ... )
    >>> "enums.py" in result
    True
    """
    domain_path = Path(domain_dir)
    ontology_path = domain_path / "ontology"

    # Parse all MyST files directly from the ontology directory
    # (we deliberately do not go through parse_domain_directory, which
    # expects a domain root and searches subdirectories itself).
    result = _parse_ontology_files(ontology_path)

    # Convert the parse result to IR structures.
    enums = _extract_enums(result)
    artifacts = _extract_artifacts(result, enums)

    # Generate Python code.
    return generate_models(enums, artifacts, output_dir)
def _extract_enums(result: dict[str, list[Directive]]) -> dict[str, EnumTypeIR]: """Extract enum definitions from parse result. Parameters ---------- result : dict Parse result from parse_domain_directory. Returns ------- dict[str, EnumTypeIR] Dictionary mapping enum ID to EnumTypeIR. """ from questfoundry.compiler.parser.directives import DirectiveType enums: dict[str, EnumTypeIR] = {} current_enum: str | None = None for directive in result.get("directives", []): if directive.type == DirectiveType.ENUM_TYPE: enum_id = directive.content.get("id", "") description = directive.content.get("description", "") current_enum = enum_id enums[enum_id] = EnumTypeIR( id=enum_id, description=description, values=[], ) # Check for inline values in the enum-type directive if "values" in directive.content: values_data = directive.content["values"] if isinstance(values_data, dict): for value_name, value_desc in values_data.items(): enums[enum_id].values.append( EnumValueIR(name=value_name, description=value_desc or "") ) elif isinstance(values_data, list): for value_item in values_data: if isinstance(value_item, str): enums[enum_id].values.append( EnumValueIR(name=value_item, description="") ) elif isinstance(value_item, dict): for k, v in value_item.items(): enums[enum_id].values.append( EnumValueIR(name=k, description=v or "") ) elif directive.type == DirectiveType.ENUM_VALUE: # Individual enum value directive enum_ref = directive.content.get("enum", current_enum) if enum_ref and enum_ref in enums: enums[enum_ref].values.append( EnumValueIR( name=directive.content.get("value", ""), description=directive.content.get("description", ""), ) ) return enums def _extract_artifacts( result: dict[str, list[Directive]], enums: dict[str, EnumTypeIR], # noqa: ARG001 ) -> dict[str, ArtifactTypeIR]: """Extract artifact definitions from parse result. Parameters ---------- result : dict Parse result from parse_domain_directory. 
enums : dict[str, EnumTypeIR] Available enum definitions for type resolution. Returns ------- dict[str, ArtifactTypeIR] Dictionary mapping artifact ID to ArtifactTypeIR. """ from questfoundry.compiler.parser.directives import DirectiveType artifacts: dict[str, ArtifactTypeIR] = {} for directive in result.get("directives", []): if directive.type == DirectiveType.ARTIFACT_TYPE: artifact_id = directive.content.get("id", "") store_str = directive.content.get("store", "hot") store = StoreType(store_str) if store_str in ["hot", "cold", "both"] else StoreType.HOT # Parse cold promotion config content_field = directive.content.get("content_field") requires_content_raw = directive.content.get("requires_content", True) requires_content = requires_content_raw not in (False, "false", "False", "no", "No") artifacts[artifact_id] = ArtifactTypeIR( id=artifact_id, name=directive.content.get("name", artifact_id), store=store, lifecycle=directive.content.get("lifecycle", []), fields=[], content_field=content_field, requires_content=requires_content, ) elif directive.type == DirectiveType.ARTIFACT_FIELD: artifact_ref = directive.content.get("artifact", "") if artifact_ref in artifacts: artifacts[artifact_ref].fields.append( ArtifactFieldIR( artifact=artifact_ref, name=directive.content.get("name", ""), type=directive.content.get("type", "str"), required=directive.content.get("required", False), description=directive.content.get("description", ""), ) ) return artifacts def _parse_role_files(roles_path: Path) -> dict[str, list[Directive]]: """Parse all MyST files in the roles directory. Parameters ---------- roles_path : Path Path to the roles directory. Returns ------- dict Dictionary mapping role_id to list of directives. 
""" roles_by_id: dict[str, list[Directive]] = {} if roles_path.exists(): for md_file in roles_path.glob("*.md"): # Skip non-role files if md_file.name.startswith("_") or md_file.name.isupper(): continue result = parse_myst_file(md_file) role_id = md_file.stem # e.g., "showrunner" from "showrunner.md" roles_by_id[role_id] = result.directives return roles_by_id def _extract_roles(roles_by_id: dict[str, list[Directive]]) -> dict[str, RoleIR]: """Extract role definitions from parsed directives. Parameters ---------- roles_by_id : dict Dictionary mapping role_id to list of directives. Returns ------- dict[str, RoleIR] Dictionary mapping role ID to RoleIR. """ roles: dict[str, RoleIR] = {} for _role_id, directives in roles_by_id.items(): # Find role-meta directive (role_id comes from meta["id"], not filename) meta: dict[str, str] = {} tools: list[RoleToolIR] = [] constraints: list[str] = [] prompt_template: str = "" for directive in directives: if directive.type == DirectiveType.ROLE_META: meta = directive.content elif directive.type == DirectiveType.ROLE_TOOLS: # Tools come as {"items": [...]} or direct list/dict tools_data = directive.content # Handle {"items": [...]} wrapper from YAML list parsing if isinstance(tools_data, dict) and "items" in tools_data: tools_data = tools_data["items"] if isinstance(tools_data, dict): for name, desc in tools_data.items(): if name != "items": # Skip wrapper key tools.append( RoleToolIR(name=name, description=str(desc) if desc else "") ) elif isinstance(tools_data, list): for item in tools_data: if isinstance(item, dict): for name, desc in item.items(): tools.append( RoleToolIR(name=name, description=str(desc) if desc else "") ) elif isinstance(item, str): # Parse "name: description" format if ": " in item: name, desc = item.split(": ", 1) tools.append( RoleToolIR(name=name.strip(), description=desc.strip()) ) else: tools.append(RoleToolIR(name=item, description="")) elif directive.type == DirectiveType.ROLE_CONSTRAINTS: # Constraints 
come as {"items": [...]} or direct list constraints_data = directive.content # Handle {"items": [...]} wrapper from YAML list parsing if isinstance(constraints_data, dict) and "items" in constraints_data: constraints_data = constraints_data["items"] if isinstance(constraints_data, list): constraints = [str(c) for c in constraints_data] elif directive.type == DirectiveType.ROLE_PROMPT: # Prompt template comes as {"template": "..."} or direct string prompt_data = directive.content if isinstance(prompt_data, dict) and "template" in prompt_data: prompt_template = str(prompt_data["template"]) elif isinstance(prompt_data, str): prompt_template = prompt_data else: prompt_template = str(prompt_data) if prompt_data else "" # Build the RoleIR if we have meta if meta and "id" in meta: agency_str = meta.get("agency", "medium") try: agency = Agency(agency_str.lower()) except ValueError: agency = Agency.MEDIUM # Extract version (default to 1) version_str = meta.get("version", "1") try: version = int(version_str) except (ValueError, TypeError): version = 1 roles[meta["id"]] = RoleIR( id=meta["id"], abbr=meta.get("abbr", ""), archetype=meta.get("archetype", ""), agency=agency, mandate=meta.get("mandate", ""), version=version, tools=tools, constraints=constraints, prompt_template=prompt_template, ) return roles
def compile_roles(
    domain_dir: str | Path,
    output_dir: str | Path,
) -> dict[str, Path]:
    """Compile role definitions to Python configurations.

    Parses the domain/roles/*.md files and generates role config files.

    Parameters
    ----------
    domain_dir : str | Path
        Path to the domain directory containing roles/ subdirectory.
    output_dir : str | Path
        Path to the generated/roles/ output directory.

    Returns
    -------
    dict[str, Path]
        Dictionary mapping filename to path of generated files.
    """
    domain_path = Path(domain_dir)
    roles_path = domain_path / "roles"

    # Parse all MyST files in the roles directory.
    roles_by_id = _parse_role_files(roles_path)

    # Convert to IR structures.
    roles = _extract_roles(roles_by_id)

    # Generate Python code.
    return generate_roles(roles, output_dir)
def _parse_loop_files(loops_path: Path) -> dict[str, list[Directive]]: """Parse all MyST files in the loops directory. Parameters ---------- loops_path : Path Path to the loops directory. Returns ------- dict Dictionary mapping loop_id to list of directives. """ loops_by_id: dict[str, list[Directive]] = {} if loops_path.exists(): for md_file in loops_path.glob("*.md"): # Skip non-loop files if md_file.name.startswith("_") or md_file.name.isupper(): continue result = parse_myst_file(md_file) loop_id = md_file.stem # e.g., "story_spark" from "story_spark.md" loops_by_id[loop_id] = result.directives return loops_by_id def _extract_loops(loops_by_id: dict[str, list[Directive]]) -> dict[str, LoopIR]: """Extract loop definitions from parsed directives. Parameters ---------- loops_by_id : dict Dictionary mapping loop_id to list of directives. Returns ------- dict[str, LoopIR] Dictionary mapping loop ID to LoopIR. """ loops: dict[str, LoopIR] = {} for _loop_id, directives in loops_by_id.items(): # Find loop-meta directive meta: dict[str, str] = {} nodes: list[GraphNodeIR] = [] edges: list[GraphEdgeIR] = [] quality_gates: list[QualityGateIR] = [] for directive in directives: if directive.type == DirectiveType.LOOP_META: meta = directive.content elif directive.type == DirectiveType.GRAPH_NODE: node_data = directive.content nodes.append( GraphNodeIR( id=node_data.get("id", ""), role=node_data.get("role", ""), timeout=int(node_data.get("timeout", 300)), max_iterations=int(node_data.get("max_iterations", 10)), ) ) elif directive.type == DirectiveType.GRAPH_EDGE: edge_data = directive.content edges.append( GraphEdgeIR( source=edge_data.get("source", ""), target=edge_data.get("target", ""), condition=edge_data.get("condition", "true"), ) ) elif directive.type == DirectiveType.QUALITY_GATE: gate_data = directive.content bars = gate_data.get("bars", []) # Handle {"items": [...]} wrapper if isinstance(bars, dict) and "items" in bars: bars = bars["items"] quality_gates.append( 
QualityGateIR( before=gate_data.get("before", ""), role=gate_data.get("role", ""), bars=bars if isinstance(bars, list) else [], blocking=gate_data.get("blocking", True), ) ) # Build the LoopIR if we have meta if meta and "id" in meta: # Extract version (default to 1) version_str = meta.get("version", "1") try: version = int(version_str) except (ValueError, TypeError): version = 1 loops[meta["id"]] = LoopIR( id=meta["id"], name=meta.get("name", meta["id"]), trigger=meta.get("trigger", "manual"), entry_point=meta.get("entry_point", ""), exit_point=meta.get("exit_point"), version=version, nodes=nodes, edges=edges, quality_gates=quality_gates, ) return loops
def validate_loops(
    domain_dir: str | Path,
    roles: dict[str, RoleIR],
) -> dict[str, LoopIR]:
    """Parse and validate loop definitions (no code generation).

    Loop definitions serve as documentation and guidance for SR
    orchestration. They are NOT compiled to executable graphs. This
    function parses them for validation purposes only.

    Parameters
    ----------
    domain_dir : str | Path
        Path to the domain directory containing loops/ subdirectory.
    roles : dict[str, RoleIR]
        Available role definitions (for validation).

    Returns
    -------
    dict[str, LoopIR]
        Dictionary mapping loop ID to LoopIR (for validation/reference).

    Raises
    ------
    ValueError
        If a loop references a role that doesn't exist.
    """
    domain_path = Path(domain_dir)
    loops_path = domain_path / "loops"

    # Parse all MyST files in the loops directory.
    loops_by_id = _parse_loop_files(loops_path)

    # Convert to IR structures.
    loops = _extract_loops(loops_by_id)

    # Validate that every node references a known role.
    for loop_id, loop in loops.items():
        for node in loop.nodes:
            if node.role not in roles:
                raise ValueError(
                    f"Loop '{loop_id}' references unknown role '{node.role}' in node '{node.id}'"
                )

    return loops
def compile_domain(
    domain_dir: str | Path = "src/questfoundry/domain",
    output_dir: str | Path = "src/questfoundry/generated",
    *,
    validate: bool = True,
) -> dict[str, Path]:
    """Compile full domain to generated code.

    This is the main entry point for compilation. It compiles:

    - ontology/ → generated/models/
    - roles/ → generated/roles/
    - loops/ → generated/loops/

    Parameters
    ----------
    domain_dir : str | Path
        Path to the domain directory.
    output_dir : str | Path
        Path to the generated output directory.
    validate : bool, optional
        If True, also validate loop definitions against roles
        (default: True).

    Returns
    -------
    dict[str, Path]
        Dictionary mapping filename to path of all generated files.
    """
    domain_path = Path(domain_dir)
    output_path = Path(output_dir)
    all_generated: dict[str, Path] = {}

    # Compile ontology.
    models_output = output_path / "models"
    ontology_result = compile_ontology(domain_path, models_output)
    all_generated.update(ontology_result)

    # Compile roles.
    roles_output = output_path / "roles"
    roles_result = compile_roles(domain_path, roles_output)
    all_generated.update(roles_result)

    # Re-extract roles for loop validation (compile_roles does not
    # expose its intermediate IR).
    roles_path = domain_path / "roles"
    roles_by_id = _parse_role_files(roles_path)
    roles = _extract_roles(roles_by_id)

    # Parse loops — with role validation when requested, plain parse
    # otherwise, so generation still has the loop IR either way.
    if validate:
        loops = validate_loops(domain_path, roles)
    else:
        loops_path = domain_path / "loops"
        loops_by_id = _parse_loop_files(loops_path)
        loops = _extract_loops(loops_by_id)

    # Generate loops code.
    if loops:
        loops_output = output_path / "loops"
        loops_result = generate_loops(loops, loops_output)
        all_generated.update(loops_result)

    return all_generated
# =============================================================================
# CLI Entry Point
# =============================================================================

if __name__ == "__main__":
    import sys

    # Default paths relative to project root
    domain = "src/questfoundry/domain"
    output = "src/questfoundry/generated"

    # Allow override via positional args: [domain_dir [output_dir]]
    if len(sys.argv) > 1:
        domain = sys.argv[1]
    if len(sys.argv) > 2:
        output = sys.argv[2]

    print(f"Compiling domain: {domain}")
    print(f"Output directory: {output}")

    result = compile_domain(domain, output)

    print("\nGenerated files:")
    for name, path in sorted(result.items()):
        print(f"  {name}: {path}")

    print(f"\nTotal: {len(result)} files generated")