From 4fe7b0d7cabdab37bbafbef38d3b32f000e05733 Mon Sep 17 00:00:00 2001 From: 041 <381151237@qq.com> Date: Fri, 3 Apr 2026 10:46:35 +0800 Subject: [PATCH 1/7] feat(rules): add three-tier rules system with /rules slash command Implement a comprehensive rules system to manage development guidelines and coding standards across builtin, user, and project levels. Features: - Three-tier rule hierarchy: builtin (~/.config/agents/rules/) -> user -> project (.agents/rules/) - YAML frontmatter support for rule metadata (name, description, paths, priority, extends) - /rules slash commands for listing, viewing, and toggling rules (list, show, on, off, reset) - Automatic rule detection based on file path patterns - State persistence in .agents/rules.state.toml - System prompt injection with size (32 KiB) and count (10) limits Includes: - Bug fixes for async/await issues and KaosPath conversion - UI polish for /rules list display format New modules: src/kimi_cli/rules/ (models, discovery, parser, registry, injector, state) --- src/kimi_cli/agents/default/system.md | 10 + src/kimi_cli/config.py | 31 +++ src/kimi_cli/rules/__init__.py | 32 +++ src/kimi_cli/rules/common/coding-style.md | 32 +++ src/kimi_cli/rules/common/testing.md | 38 +++ src/kimi_cli/rules/discovery.py | 164 ++++++++++++ src/kimi_cli/rules/injector.py | 195 ++++++++++++++ src/kimi_cli/rules/models.py | 75 ++++++ src/kimi_cli/rules/parser.py | 141 ++++++++++ src/kimi_cli/rules/python/coding-style.md | 71 +++++ src/kimi_cli/rules/registry.py | 312 ++++++++++++++++++++++ src/kimi_cli/rules/state.py | 212 +++++++++++++++ src/kimi_cli/soul/agent.py | 41 +++ src/kimi_cli/soul/slash.py | 221 +++++++++++++++ 14 files changed, 1575 insertions(+) create mode 100644 src/kimi_cli/rules/__init__.py create mode 100644 src/kimi_cli/rules/common/coding-style.md create mode 100644 src/kimi_cli/rules/common/testing.md create mode 100644 src/kimi_cli/rules/discovery.py create mode 100644 src/kimi_cli/rules/injector.py create mode 100644 src/kimi_cli/rules/models.py create mode 100644 src/kimi_cli/rules/parser.py create mode 100644 src/kimi_cli/rules/python/coding-style.md create mode 100644 src/kimi_cli/rules/registry.py create mode 100644 src/kimi_cli/rules/state.py diff --git a/src/kimi_cli/agents/default/system.md b/src/kimi_cli/agents/default/system.md index 2d9623b0c..185f306f2 100644 --- a/src/kimi_cli/agents/default/system.md +++ b/src/kimi_cli/agents/default/system.md @@ -126,6 +126,16 @@ When working on files in subdirectories, always check whether those directories If you modified any files/styles/structures/configurations/workflows/... mentioned in `AGENTS.md` files, you MUST update the corresponding `AGENTS.md` files to keep them up-to-date. +# Active Rules + +The following development rules are currently enabled. You MUST follow these standards when writing or modifying code. Use `/rules list` to see all available rules and `/rules on|off ` to toggle them. + +{% if KIMI_ACTIVE_RULES %} +${KIMI_ACTIVE_RULES} +{% else %} +_No rules are currently active. Add rules to `~/.config/agents/rules/` or `.agents/rules/` to enable development guidelines._ +{% endif %} + # Skills Skills are reusable, composable capabilities that enhance your abilities. Each skill is a self-contained directory with a `SKILL.md` file that contains instructions, examples, and/or reference material. diff --git a/src/kimi_cli/config.py b/src/kimi_cli/config.py index 78019555a..1b716a886 100644 --- a/src/kimi_cli/config.py +++ b/src/kimi_cli/config.py @@ -172,6 +172,34 @@ class MCPConfig(BaseModel): ) +class RulesConfig(BaseModel): + """Rules system configuration for development guidelines.""" + + enabled: bool = Field( + default=True, + description="Enable the rules system for development guidelines", + ) + auto_enable_by_path: bool = Field( + default=True, + description="Automatically enable rules matching current project file paths", + ) + max_active_rules: int = Field( + default=10, + ge=1, + le=50, + description="Maximum number of active rules to inject into system prompt", + ) + max_total_size: int = Field( + default=32 * 1024, + ge=0, + description="Maximum total size of rules content in bytes (0 = unlimited)", + ) + include_source: bool = Field( + default=False, + description="Include rule source (level/id) in system prompt output", + ) + + class Config(BaseModel): """Main configuration structure.""" @@ -211,6 +239,9 @@ class Config(BaseModel): services: Services = Field(default_factory=Services, description="Services configuration") mcp: MCPConfig = Field(default_factory=MCPConfig, description="MCP configuration") hooks: list[HookDef] = Field(default_factory=list, description="Hook definitions") # pyright: ignore[reportUnknownVariableType] + rules: RulesConfig = Field( + default_factory=RulesConfig, description="Rules system configuration" + ) merge_all_available_skills: bool = Field( default=False, description=( diff --git a/src/kimi_cli/rules/__init__.py b/src/kimi_cli/rules/__init__.py new file mode 100644 index 000000000..a93f2480f --- /dev/null +++ b/src/kimi_cli/rules/__init__.py @@ -0,0 +1,32 @@ +"""Rules system for development guidelines and coding standards.""" + +from __future__ import annotations + +from kimi_cli.rules.discovery import ( + get_builtin_rules_dir, + get_user_rules_dir_candidates, + get_project_rules_dir_candidates, + resolve_rules_roots, + find_first_existing_dir, +) +from kimi_cli.rules.models import Rule, RuleMetadata, RuleState +from kimi_cli.rules.parser import parse_rule_file +from kimi_cli.rules.registry import RulesRegistry +from kimi_cli.rules.injector import RulesInjector, load_active_rules +from kimi_cli.rules.state import RulesStateManager + +__all__ = [ + "get_builtin_rules_dir", + "get_user_rules_dir_candidates", + "get_project_rules_dir_candidates", + "resolve_rules_roots", + "find_first_existing_dir", + "Rule", + "RuleMetadata", + "RuleState", + "parse_rule_file", + "RulesRegistry", + "RulesInjector", + "load_active_rules", + "RulesStateManager", +] diff --git a/src/kimi_cli/rules/common/coding-style.md b/src/kimi_cli/rules/common/coding-style.md new file mode 100644 index 000000000..1fe92a41c --- /dev/null +++ b/src/kimi_cli/rules/common/coding-style.md @@ -0,0 +1,32 @@ +--- +name: "Coding Style" +description: "General coding style guidelines for all languages" +priority: 100 +--- + +# Coding Style Guidelines + +## Code Organization + +- **Small, focused files**: Aim for 200-400 lines per file, maximum 800 lines +- **Single responsibility**: Each file/module should have one clear purpose +- **Meaningful names**: Use descriptive variable, function, and class names + +## Code Quality + +- **Functions should be small**: Ideally under 50 lines +- **Avoid deep nesting**: Maximum 4 levels of indentation +- **Fail fast**: Validate inputs and preconditions early +- **No silent failures**: Always handle errors explicitly + +## Comments and Documentation + +- **Self-documenting code**: Prefer clear names over comments +- **Why, not what**: Comments should explain intent, not mechanics +- **Keep comments current**: Update or remove outdated comments + +## General Principles + +- **DRY (Don't Repeat Yourself)**: Extract common logic into reusable functions +- **YAGNI (You Aren't Gonna Need It)**: Don't add functionality until necessary +- **KISS (Keep It Simple, Stupid)**: Simple solutions are better than clever ones diff --git a/src/kimi_cli/rules/common/testing.md b/src/kimi_cli/rules/common/testing.md new file mode 100644 index 000000000..82ba4706c --- /dev/null +++ b/src/kimi_cli/rules/common/testing.md @@ -0,0 +1,38 @@ +--- +name: "Testing Standards" +description: "Guidelines for writing effective tests" +priority: 110 +--- + +# Testing Standards + +## Test Coverage + +- **Test critical paths**: Focus on business logic and edge cases +- **Aim for meaningful coverage**: Quality over quantity (80% is a good target) +- **Test behavior, not implementation**: Tests should verify what code does, not how + +## Test Structure + +- **Arrange-Act-Assert**: Structure tests clearly + - Arrange: Set up test data and conditions + - Act: Execute the code being tested + - Assert: Verify the expected outcome + +## Test Naming + +- **Descriptive names**: Test names should explain the scenario being tested +- **Pattern**: `test___` +- **Example**: `test_calculate_discount_negative_price_raises_error` + +## Test Independence + +- **Isolated tests**: Each test should be independent and not rely on others +- **Clean state**: Tests should clean up after themselves or use fresh fixtures +- **Deterministic**: Tests should produce the same result every time + +## Test Maintenance + +- **Keep tests simple**: Test code should be simpler than production code +- **Refactor tests**: Don't be afraid to improve test code structure +- **Review test failures**: Never ignore failing tests diff --git a/src/kimi_cli/rules/discovery.py b/src/kimi_cli/rules/discovery.py new file mode 100644 index 000000000..2f9449711 --- /dev/null +++ b/src/kimi_cli/rules/discovery.py @@ -0,0 +1,164 @@ +"""Directory discovery for Rules system. + +Follows the same pattern as skills discovery for consistency. +""" + +from __future__ import annotations + +from collections.abc import Iterable, Sequence +from pathlib import Path + +from kaos.path import KaosPath + + +def get_builtin_rules_dir() -> Path: + """Get the built-in rules directory path (distributed with CLI).""" + return Path(__file__).parent.parent / "rules" + + +def get_user_rules_dir_candidates() -> tuple[KaosPath, ...]: + """ + Get user-level rules directory candidates in priority order. + + Mirrors the skill discovery priority: + - ~/.config/agents/rules/ (recommended, consistent with skills) + - ~/.agents/rules/ + - ~/.kimi/rules/ (backward compatibility) + - ~/.claude/rules/ + - ~/.codex/rules/ + """ + return ( + KaosPath.home() / ".config" / "agents" / "rules", + KaosPath.home() / ".agents" / "rules", + KaosPath.home() / ".kimi" / "rules", + KaosPath.home() / ".claude" / "rules", + KaosPath.home() / ".codex" / "rules", + ) + + +def get_project_rules_dir_candidates(work_dir: KaosPath) -> tuple[KaosPath, ...]: + """ + Get project-level rules directory candidates in priority order. + + Mirrors the skill discovery priority: + - .agents/rules/ + - .kimi/rules/ + - .claude/rules/ + - .codex/rules/ + """ + return ( + work_dir / ".agents" / "rules", + work_dir / ".kimi" / "rules", + work_dir / ".claude" / "rules", + work_dir / ".codex" / "rules", + ) + + +async def find_first_existing_dir(candidates: Iterable[KaosPath]) -> KaosPath | None: + """ + Return the first existing directory from candidates. + """ + for candidate in candidates: + if await candidate.is_dir(): + return candidate + return None + + +async def resolve_rules_roots( + work_dir: KaosPath, + *, + rules_dirs: Sequence[KaosPath] | None = None, + include_builtin: bool = True, +) -> list[KaosPath]: + """ + Resolve layered rule roots in priority order. + + Priority (high to low): + 1. Project-level rules (.agents/rules/) + 2. User-level rules (~/.config/agents/rules/) + 3. Built-in rules (distributed with CLI) + + When ``rules_dirs`` is provided, it overrides user/project discovery. + + Args: + work_dir: Current working directory for project-level discovery + rules_dirs: Optional custom directories to override discovery + include_builtin: Whether to include built-in rules + + Returns: + List of rule directory roots in priority order + """ + roots: list[KaosPath] = [] + + if rules_dirs: + # Custom directories override discovery + roots.extend(rules_dirs) + else: + # Project-level rules have highest priority + if project_dir := await find_first_existing_dir(get_project_rules_dir_candidates(work_dir)): + roots.append(project_dir) + + # User-level rules + if user_dir := await find_first_existing_dir(get_user_rules_dir_candidates()): + roots.append(user_dir) + + # Built-in rules have lowest priority (serve as defaults) + if include_builtin: + builtin_dir = get_builtin_rules_dir() + if builtin_dir.is_dir(): + roots.append(KaosPath.unsafe_from_local_path(builtin_dir)) + + return roots + + +async def discover_rule_files(rules_dir: KaosPath) -> list[KaosPath]: + """ + Discover all rule files in a directory. + + Rule files are .md files in subdirectories: + - common/coding-style.md + - python/testing.md + - etc. + + Files directly in the root are ignored; they must be in a category directory. + + Args: + rules_dir: Root directory containing rule categories + + Returns: + List of paths to rule files + """ + from kimi_cli.utils.logging import logger + + is_dir = await rules_dir.is_dir() + logger.debug("discover_rule_files: rules_dir={path} is_dir={is_dir}", path=rules_dir, is_dir=is_dir) + + if not is_dir: + return [] + + rule_files: list[KaosPath] = [] + + async for category_dir in rules_dir.iterdir(): + if not await category_dir.is_dir(): + continue + + # Skip hidden directories + if category_dir.name.startswith("."): + continue + + async for item in category_dir.iterdir(): + if not await item.is_file(): + continue + + # Only .md files + if not item.name.endswith(".md"): + continue + + # Skip hidden files and common non-rule files + if item.name.startswith(".") or item.name.lower() in ("readme.md", "index.md"): + continue + + rule_files.append(item) + + # Sort for deterministic ordering + return sorted(rule_files, key=lambda p: str(p)) diff --git a/src/kimi_cli/rules/injector.py b/src/kimi_cli/rules/injector.py new file mode 100644 index 000000000..ad659cefe --- /dev/null +++ b/src/kimi_cli/rules/injector.py @@ -0,0 +1,195 @@ +"""Rules injector for system prompt integration.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from kimi_cli.rules.discovery import resolve_rules_roots +from kimi_cli.rules.parser import parse_rule_file +from kimi_cli.utils.logging import logger + +if TYPE_CHECKING: + from kaos.path import KaosPath + from kimi_cli.rules.models import Rule + + +# Maximum total size of rules content to inject +MAX_RULES_CONTENT_SIZE = 32 * 1024 # 32KB + + +class RulesInjector: + """ + Handles injection of active rules into system prompt. + + This is a lightweight alternative to full RulesRegistry for cases + where we just need to load and format rules without state management. + """ + + def __init__( + self, + work_dir: KaosPath, + max_size: int = MAX_RULES_CONTENT_SIZE, + ): + self.work_dir = work_dir + self.max_size = max_size + + async def load_active_rules( + self, + file_path: Path | None = None, + ) -> list[Rule]: + """ + Load rules that should be active for the given context. + + This is a simplified version that loads all discovered rules + without state management. For full control, use RulesRegistry. + + Args: + file_path: Optional file path to filter applicable rules + + Returns: + List of active rules + """ + from kimi_cli.rules.parser import should_apply_rule + + roots = await resolve_rules_roots(self.work_dir) + + rules: list[Rule] = [] + seen_ids: set[str] = set() + + # Load from each root (project overrides user overrides builtin) + for root in reversed(roots): # Reverse to apply priority correctly + level = self._determine_level(root) + + from kimi_cli.rules.discovery import discover_rule_files + rule_files = await discover_rule_files(root) + + for rule_file in rule_files: + try: + rule = parse_rule_file( + rule_file.to_path(), + level=level, + rules_root=root.to_path(), + ) + + # Skip duplicates (higher priority wins) + if rule.id in seen_ids: + continue + + # Check file path match if specified + if file_path and not should_apply_rule(rule, file_path): + continue + + rules.append(rule) + seen_ids.add(rule.id) + + except Exception as e: + logger.warning( + "Failed to parse rule {path}: {error}", + path=rule_file, + error=e, + ) + + # Sort by priority + return sorted(rules, key=lambda r: r.metadata.priority) + + def _determine_level(self, root: KaosPath) -> str: + """Determine rule level from root path.""" + from pathlib import Path + + root_str = str(root).replace("\\", "/") + + builtin_dir = str(Path(__file__).parent.parent / "rules").replace("\\", "/") + if root_str == builtin_dir: + return "builtin" + + work_dir_str = str(self.work_dir).replace("\\", "/") + if root_str.startswith(work_dir_str): + return "project" + + return "user" + + def format_rules_content( + self, + rules: list[Rule], + include_source: bool = False, + ) -> str: + """ + Format rules for injection into system prompt. + + Args: + rules: List of rules to format + include_source: Whether to include source annotations + + Returns: + Formatted rules content + """ + if not rules: + return "" + + parts: list[str] = [] + total_size = 0 + + for rule in rules: + content = rule.content + + # Build header + if include_source: + header = f"## {rule.name} ({rule.level}/{rule.id})\n\n" + else: + header = f"## {rule.name}\n\n" + + full_section = header + content + "\n\n" + section_size = len(full_section.encode("utf-8")) + + # Check size limit + if total_size + section_size > self.max_size: + remaining = self.max_size - total_size + if remaining > 100: # Add truncated notice if space permits + parts.append( + "\n> _Additional rules truncated due to size limit._\n" + ) + break + + parts.append(full_section) + total_size += section_size + + return "".join(parts).strip() + + async def get_injection_content( + self, + file_path: Path | None = None, + include_source: bool = False, + ) -> str: + """ + Get formatted rules content ready for system prompt injection. + + Args: + file_path: Optional file path to filter applicable rules + include_source: Whether to include source annotations + + Returns: + Formatted rules content (may be empty if no rules match) + """ + rules = await self.load_active_rules(file_path) + return self.format_rules_content(rules, include_source) + + +async def load_active_rules( + work_dir: KaosPath, + file_path: Path | None = None, + max_size: int = MAX_RULES_CONTENT_SIZE, +) -> str: + """ + Convenience function to load and format active rules. + + Args: + work_dir: Current working directory + file_path: Optional file path to filter applicable rules + max_size: Maximum content size in bytes + + Returns: + Formatted rules content for system prompt + """ + injector = RulesInjector(work_dir, max_size=max_size) + return await injector.get_injection_content(file_path) diff --git a/src/kimi_cli/rules/models.py b/src/kimi_cli/rules/models.py new file mode 100644 index 000000000..d2d547de4 --- /dev/null +++ b/src/kimi_cli/rules/models.py @@ -0,0 +1,75 @@ +"""Data models for Rules system.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal + + +@dataclass(frozen=True, slots=True) +class RuleMetadata: + """Rule 文件的 YAML frontmatter 元数据""" + + name: str | None = None # 规则显示名称 + description: str | None = None # 规则描述 + paths: list[str] = field(default_factory=list) # 匹配的文件路径 glob 模式 + priority: int = 100 # 优先级(数值越小越优先) + extends: list[str] = field(default_factory=list) # 继承的其他规则 + + +@dataclass(frozen=True, slots=True) +class Rule: + """单个 Rule 定义""" + + id: str # 唯一标识符(如 "common/coding-style") + name: str # 显示名称 + description: str # 描述 + source: Path # 文件路径 + level: Literal["builtin", "user", "project"] # 层级 + category: str # 分类(如 "common", "python") + metadata: RuleMetadata # 元数据 + content: str # 规则内容(不含 frontmatter) + + @property + def full_id(self) -> str: + """返回包含层级的完整 ID""" + return f"{self.level}/{self.id}" + + +@dataclass +class RuleState: + """Rule 开关状态(持久化到 rules.state.toml)""" + + enabled: bool = True # 是否启用 + pinned: bool = False # 是否固定(不受自动检测影响) + last_modified: str | None = None # 最后修改时间 ISO 格式 + + def to_dict(self) -> dict: + """转换为字典用于序列化""" + result: dict = {"enabled": self.enabled} + if self.pinned: + result["pinned"] = True + if self.last_modified: + result["last_modified"] = self.last_modified + return result + + @classmethod + def from_dict(cls, data: dict) -> RuleState: + """从字典反序列化""" + return cls( + enabled=data.get("enabled", True), + pinned=data.get("pinned", False), + last_modified=data.get("last_modified"), + ) + + +@dataclass +class RulesStats: + """Rules 统计信息""" + + total: int = 0 + enabled: int = 0 + builtin: int = 0 + user: int = 0 + project: int = 0 diff --git a/src/kimi_cli/rules/parser.py b/src/kimi_cli/rules/parser.py new file mode 100644 index 000000000..3dd45777b --- /dev/null +++ b/src/kimi_cli/rules/parser.py @@ -0,0 +1,141 @@ +"""Rule file parser with YAML frontmatter support.""" + +from __future__ import annotations + +import re +from pathlib import Path +from typing import TYPE_CHECKING + +from kimi_cli.utils.logging import logger + +if TYPE_CHECKING: + from kimi_cli.rules.models import Rule + + +def parse_frontmatter(content: str) -> tuple[dict, str]: + """ + Parse YAML frontmatter from content. + + Frontmatter format: + --- + name: "Rule Name" + description: "Rule description" + paths: ["**/*.py"] + priority: 100 + --- + + Returns: + Tuple of (metadata_dict, body_content) + """ + frontmatter_pattern = r'^---\s*\n(.*?)\n---\s*\n' + match = re.match(frontmatter_pattern, content, re.DOTALL) + + if match: + import yaml + try: + metadata_yaml = match.group(1) + body = content[match.end():] + metadata = yaml.safe_load(metadata_yaml) or {} + if not isinstance(metadata, dict): + logger.warning("Invalid frontmatter format, expected dict") + metadata = {} + return metadata, body + except yaml.YAMLError as e: + logger.warning(f"Failed to parse frontmatter: {e}") + return {}, content + + # No frontmatter found + return {}, content + + +def parse_rule_file( + path: Path, + level: str, + rules_root: Path | None = None, +) -> "Rule": + """ + Parse a single rule file, extracting frontmatter and content. + + Args: + path: Path to the .md rule file + level: Rule level ("builtin", "user", or "project") + rules_root: Root directory for this level (for generating rule ID) + + Returns: + Rule object + """ + from kimi_cli.rules.models import Rule, RuleMetadata + + content = path.read_text(encoding="utf-8") + metadata_dict, body = parse_frontmatter(content) + + # Determine category and name from path + # Path structure: //.md + if rules_root: + rel_path = path.relative_to(rules_root) + category = rel_path.parent.name + name_from_path = rel_path.stem + else: + # Fallback: use parent directory name + category = path.parent.name + name_from_path = path.stem + + # Build rule ID: category/name + rule_id = f"{category}/{name_from_path}" + + # Build metadata + metadata = RuleMetadata( + name=metadata_dict.get("name"), + description=metadata_dict.get("description"), + paths=metadata_dict.get("paths", []), + priority=metadata_dict.get("priority", 100), + extends=metadata_dict.get("extends", []), + ) + + # Use filename as fallback for name + display_name = metadata.name or name_from_path.replace("-", " ").title() + description = metadata.description or f"{display_name} guidelines" + + return Rule( + id=rule_id, + name=display_name, + description=description, + source=path, + level=level, # type: ignore + category=category, + metadata=metadata, + content=body.strip(), + ) + + +def should_apply_rule(rule: "Rule", file_path: Path | None) -> bool: + """ + Check if a rule should apply to a given file path. + + Rules without paths metadata apply to all files. + Rules with paths metadata only apply if the file matches one of the patterns. + + Args: + rule: The rule to check + file_path: Path to check against (None means check all rules) + + Returns: + True if the rule should apply + """ + from fnmatch import fnmatch + + # No paths specified = applies to all + if not rule.metadata.paths: + return True + + # No file path provided = check if rule has paths (could apply) + if file_path is None: + return True + + path_str = str(file_path).replace("\\", "/") + + for pattern in rule.metadata.paths: + if fnmatch(path_str, pattern): + return True + + return False diff --git a/src/kimi_cli/rules/python/coding-style.md b/src/kimi_cli/rules/python/coding-style.md new file mode 100644 index 000000000..db45a32ef --- /dev/null +++ b/src/kimi_cli/rules/python/coding-style.md @@ -0,0 +1,71 @@ +--- +name: "Python Coding Style" +description: "Python-specific coding conventions" +priority: 50 +paths: + - "**/*.py" + - "**/*.pyi" +--- + +# Python Coding Style + +## Standards + +- **Follow PEP 8**: Use standard Python naming and formatting conventions +- **Type annotations**: Add type hints to all function signatures +- **Docstrings**: Use Google-style or NumPy-style docstrings for public APIs + +## Code Patterns + +### Immutability + +Prefer immutable data structures where possible: + +```python +from dataclasses import dataclass +from typing import NamedTuple, Final + +# Use frozen dataclasses +@dataclass(frozen=True) +class Config: + name: str + value: int + +# Or NamedTuple for simple cases +class Point(NamedTuple): + x: float + y: float + +# Constants should be UPPER_CASE +MAX_RETRIES: Final[int] = 3 +``` + +### Error Handling + +- **Use specific exceptions**: Catch specific exceptions, not bare `except:` +- **Provide context**: Include relevant information in exception messages +- **Don't swallow exceptions**: Log or re-raise, don't silently ignore + +```python +# Good +try: + data = load_config(path) +except FileNotFoundError as e: + logger.error("Config file not found: %s", path) + raise ConfigError(f"Cannot load config from {path}") from e + +# Bad +try: + data = load_config(path) +except: # Too broad! + pass # Silently ignoring! +``` + +## Tooling + +When available, prefer these tools for code quality: + +- **ruff**: Fast Python linter (replaces flake8, pylint) +- **black**: Code formatter +- **pyright** or **mypy**: Type checking +- **pytest**: Testing framework diff --git a/src/kimi_cli/rules/registry.py b/src/kimi_cli/rules/registry.py new file mode 100644 index 000000000..2d431d367 --- /dev/null +++ b/src/kimi_cli/rules/registry.py @@ -0,0 +1,312 @@ +"""Rules Registry - manages rule discovery, registration, and state.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from kaos.path import KaosPath + +from kimi_cli.rules.discovery import discover_rule_files, resolve_rules_roots +from kimi_cli.rules.models import Rule, RuleState, RulesStats +from kimi_cli.rules.parser import parse_rule_file, should_apply_rule +from kimi_cli.rules.state import RulesStateManager +from kimi_cli.utils.logging import logger + +if TYPE_CHECKING: + from kimi_cli.config import RulesConfig + + +class RulesRegistry: + """ + Registry for managing rules across all levels. + + Handles: + - Discovering rules from builtin/user/project directories + - Loading and parsing rule files + - Managing rule enable/disable state + - Matching rules to file paths + """ + + def __init__( + self, + work_dir: KaosPath, + config: "RulesConfig | None" = None, + state_manager: RulesStateManager | None = None, + ): + self.work_dir = work_dir + self.config = config + self.state_manager = state_manager or RulesStateManager(work_dir) + + self._rules: dict[str, Rule] = {} # rule_id -> Rule + self._state: dict[str, RuleState] = {} # rule_id -> RuleState + self._loaded = False + + async def load(self) -> None: + """Load all rules from all levels.""" + if self._loaded: + return + + # Resolve rule directories + include_builtin = self.config.enabled if self.config else True + roots = await resolve_rules_roots( + self.work_dir, + include_builtin=include_builtin, + ) + + # Load rules from each root (later roots override earlier ones for same ID) + for root in roots: + await self._load_from_root(root) + + # Load persisted states + await self._load_states() + + self._loaded = True + logger.info( + "Loaded {count} rules from {levels}", + count=len(self._rules), + levels=[r.name for r in roots], + ) + + async def _load_from_root(self, root: KaosPath) -> None: + """Load rules from a single root directory.""" + # Determine level from root path + level = self._determine_level(root) + + logger.debug("Loading rules from {root} (level={level})", root=root, level=level) + + rule_files = await discover_rule_files(root) + logger.debug("Discovered {count} rule files in {root}", count=len(rule_files), root=root) + + for rule_file in rule_files: + try: + rule = parse_rule_file( + rule_file.to_path(), + level=level, + rules_root=root.to_path(), + ) + + # Store rule (overwrites if same ID from lower priority level) + self._rules[rule.id] = rule + + except Exception as e: + logger.warning( + "Failed to parse rule file {path}: {error}", + path=rule_file, + error=e, + ) + + def _determine_level(self, root: KaosPath) -> str: + """Determine rule level from root path.""" + root_str = str(root).replace("\\", "/") + + # Check for builtin + builtin_dir = str(Path(__file__).parent.parent / "rules").replace("\\", "/") + if root_str == builtin_dir: + return "builtin" + + # Check for project-level (within work_dir) + work_dir_str = str(self.work_dir).replace("\\", "/") + if root_str.startswith(work_dir_str): + return "project" + + # Otherwise user-level + return "user" + + async def _load_states(self) -> None: + """Load persisted rule states.""" + if not self.state_manager: + return + + await self.state_manager.load() + + # Apply auto-enable logic if configured + if self.config and self.config.auto_enable_by_path: + await self._auto_enable_rules() + + # Merge persisted states + for rule_id, state in self.state_manager.get_all_states().items(): + if rule_id in self._rules: + self._state[rule_id] = state + + async def _auto_enable_rules(self) -> None: + """Auto-enable rules based on current project context.""" + # Detect project type from files + project_types = await self._detect_project_types() + + for rule in self._rules.values(): + # Skip if already has state + if rule.id in self._state: + continue + + # Auto-enable based on category matching project type + if rule.category in project_types: + self._state[rule.id] = RuleState(enabled=True, pinned=False) + logger.debug( + "Auto-enabled rule {rule_id} for {category}", + rule_id=rule.id, + category=rule.category, + ) + + async def _detect_project_types(self) -> set[str]: + """Detect project types from file patterns.""" + types: set[str] = set() + + # Common project type detection + detection_patterns = { + "python": ["*.py", "pyproject.toml", "requirements.txt", "setup.py"], + "typescript": ["*.ts", "*.tsx", "package.json", "tsconfig.json"], + "javascript": ["*.js", "*.jsx", "package.json"], + "go": ["*.go", "go.mod"], + "rust": ["*.rs", "Cargo.toml"], + "java": ["*.java", "pom.xml", "build.gradle"], + "csharp": ["*.cs", "*.csproj", "*.sln"], + "cpp": ["*.cpp", "*.hpp", "*.c", "*.h", "CMakeLists.txt"], + "php": ["*.php", "composer.json"], + "swift": ["*.swift", "Package.swift"], + "kotlin": ["*.kt", "*.kts", "build.gradle.kts"], + } + + for lang, patterns in detection_patterns.items(): + for pattern in patterns: + if await self._any_file_exists(pattern): + types.add(lang) + break + + # Always include common + types.add("common") + + return types + + async def _any_file_exists(self, pattern: str) -> bool: + """Check if any file matching pattern exists in work_dir.""" + import fnmatch + + try: + async for item in self.work_dir.rglob("*"): + if fnmatch.fnmatch(item.name, pattern): + return True + except Exception: + pass + return False + + def get_rule(self, rule_id: str) -> Rule | None: + """Get a rule by ID.""" + return self._rules.get(rule_id) + + def get_all_rules(self) -> list[Rule]: + """Get all rules sorted by priority.""" + return sorted(self._rules.values(), key=lambda r: r.metadata.priority) + + def get_rules_by_level(self, level: str) -> list[Rule]: + """Get rules filtered by level.""" + return [r for r in self._rules.values() if r.level == level] + + def get_rules_by_category(self, category: str) -> list[Rule]: + """Get rules filtered by category.""" + return [r for r in self._rules.values() if r.category == category] + + def get_active_rules(self, file_path: Path | None = None) -> list[Rule]: + """ + Get currently enabled rules, optionally filtered by file path. + + Args: + file_path: If provided, only return rules applicable to this file + + Returns: + List of active rules sorted by priority + """ + active: list[Rule] = [] + + for rule_id, rule in self._rules.items(): + # Check if enabled + state = self._state.get(rule_id, RuleState()) + if not state.enabled: + continue + + # Check file path match + if file_path and not should_apply_rule(rule, file_path): + continue + + active.append(rule) + + return sorted(active, key=lambda r: r.metadata.priority) + + def is_enabled(self, rule_id: str) -> bool: + """Check if a rule is enabled.""" + state = self._state.get(rule_id, RuleState()) + return state.enabled + + def toggle(self, rule_id: str, enabled: bool) -> bool: + """ + Enable or disable a rule. + + Returns: + True if successful, False if rule not found + """ + if rule_id not in self._rules: + return False + + from datetime import datetime + + state = self._state.get(rule_id, RuleState()) + state.enabled = enabled + state.pinned = True # User action pins the state + state.last_modified = datetime.now().isoformat() + self._state[rule_id] = state + + # Persist state + if self.state_manager: + self.state_manager.set_state(rule_id, state) + + logger.info( + "Rule {rule_id} {action}", + rule_id=rule_id, + action="enabled" if enabled else "disabled", + ) + return True + + def reset_to_defaults(self, level: str | None = None) -> None: + """ + Reset rules to default state. + + Args: + level: If specified, only reset rules from this level + """ + ids_to_reset = [ + rid for rid, r in self._rules.items() + if level is None or r.level == level + ] + + for rule_id in ids_to_reset: + if rule_id in self._state: + del self._state[rule_id] + + # Persist + if self.state_manager: + self.state_manager.clear_states(level) + + logger.info("Reset {count} rules to defaults", count=len(ids_to_reset)) + + def get_stats(self) -> RulesStats: + """Get statistics about loaded rules.""" + stats = RulesStats(total=len(self._rules)) + + for rule_id, rule in self._rules.items(): + if self.is_enabled(rule_id): + stats.enabled += 1 + + match rule.level: + case "builtin": + stats.builtin += 1 + case "user": + stats.user += 1 + case "project": + stats.project += 1 + + return stats + + async def save_states(self) -> None: + """Persist current rule states.""" + if self.state_manager: + await self.state_manager.save() diff --git a/src/kimi_cli/rules/state.py b/src/kimi_cli/rules/state.py new file mode 100644 index 000000000..01c09e91a --- /dev/null +++ b/src/kimi_cli/rules/state.py @@ -0,0 +1,212 @@ +"""Rules state persistence management.""" + +from __future__ import annotations + +from pathlib import Path + +import tomlkit +from kaos.path import KaosPath +from tomlkit.exceptions import TOMLKitError + +from kimi_cli.utils.logging import logger + +from kimi_cli.rules.models import RuleState + + +STATE_FILENAME = "rules.state.toml" +USER_STATE_DIR = ".config/agents" # ~/.config/agents/ + + +class RulesStateManager: + """ + Manages persistence of rule enable/disable states. + + States are stored at two levels: + - User level: ~/.config/agents/rules.state.toml (global preferences) + - Project level: .agents/rules.state.toml (project-specific overrides) + + Project-level states take precedence over user-level states. + """ + + def __init__( + self, + work_dir: KaosPath, + user_state_path: Path | None = None, + ): + self.work_dir = work_dir + self._states: dict[str, RuleState] = {} + self._loaded = False + + # Determine state file paths + if user_state_path: + self.user_state_path = user_state_path + else: + self.user_state_path = ( + Path.home() / USER_STATE_DIR / STATE_FILENAME + ) + + # Project state path - will be determined async in load() + self._project_state_path: Path | None = None + + async def _get_project_state_path(self) -> Path | None: + """Async version to find project state path.""" + candidates = [ + (self.work_dir / ".agents" / STATE_FILENAME, self.work_dir / ".agents"), + (self.work_dir / ".kimi" / STATE_FILENAME, self.work_dir / ".kimi"), + ] + + for state_path, parent_dir in candidates: + if await parent_dir.is_dir(): + return state_path.to_path() if hasattr(state_path, 'to_path') else state_path + + # Default to .agents if neither exists (will create on save) + return (self.work_dir / ".agents" / STATE_FILENAME).to_path() + + async def load(self) -> None: + """Load states from both user and project levels.""" + if self._loaded: + return + + self._states = {} + + # Load user-level states first (lowest priority) + user_states = await self._load_state_file(self.user_state_path) + self._states.update(user_states) + + # Load project-level states (highest priority) + project_path = await self._get_project_state_path() + if project_path: + project_states = await self._load_state_file(project_path) + self._states.update(project_states) + + self._loaded = True + logger.debug("Loaded {count} rule states", count=len(self._states)) + + async def _load_state_file(self, path: Path) -> dict[str, RuleState]: + """Load states from a single TOML file.""" + states: dict[str, RuleState] = {} + + # Convert to KaosPath for async operations if needed + kaos_path = KaosPath(str(path)) + if not await kaos_path.is_file(): + return states + + try: + content = await kaos_path.read_text(encoding="utf-8") + data = tomlkit.loads(content) + + # Handle versioned format + rules_data = data.get("rules", {}) + + for rule_id, rule_data in rules_data.items(): + if isinstance(rule_data, dict): + states[rule_id] = RuleState.from_dict(rule_data) + else: + # Simple boolean format for backward compatibility + states[rule_id] = RuleState(enabled=bool(rule_data)) + + except (TOMLKitError, Exception) as e: + logger.warning("Failed to load rule states from {path}: {error}", path=path, error=e) + + return states + + async def save(self) -> None: + """Save states to appropriate level files.""" + # Separate states by level preference + user_states: dict[str, RuleState] = {} + project_states: dict[str, RuleState] = {} + + for rule_id, state in self._states.items(): + # Pinned states go to user level unless they came from project + # For now, save all to user level for simplicity + # TODO: Track origin level and save accordingly + user_states[rule_id] = state + + # Save user-level states + if user_states: + await self._save_state_file(self.user_state_path, user_states) + + # Save project-level states if any + if project_states: + project_path = await self._get_project_state_path() + if project_path: + await self._save_state_file(project_path, project_states) + + async def _save_state_file(self, path: Path, states: dict[str, RuleState]) -> None: + """Save states to a single TOML file.""" + try: + # Ensure directory exists + path.parent.mkdir(parents=True, exist_ok=True) + + # Build TOML document + doc = tomlkit.document() + doc["version"] = "1" + doc["updated_at"] = __import__("datetime").datetime.now().isoformat() + + rules_table = tomlkit.table() + for rule_id, state in sorted(states.items()): + state_dict = state.to_dict() + if len(state_dict) == 1 and "enabled" in state_dict: + # Simple format for just enabled flag + rules_table[rule_id] = state_dict["enabled"] + else: + rules_table[rule_id] = state_dict + + doc["rules"] = rules_table + + # Write atomically + path.write_text(tomlkit.dumps(doc), encoding="utf-8") + logger.debug("Saved {count} rule states to {path}", count=len(states), path=path) + + except Exception as e: + logger.error("Failed to save rule states to {path}: {error}", path=path, error=e) + + def get_state(self, rule_id: str) -> RuleState | None: + """Get state for a specific rule.""" + return self._states.get(rule_id) + + def set_state(self, rule_id: str, state: RuleState) -> None: + """Set state for a specific rule.""" + self._states[rule_id] = state + + def get_all_states(self) -> dict[str, RuleState]: + """Get all loaded states.""" + return dict(self._states) + + def clear_states(self, level: str | None = None) -> None: + """ + Clear states. + + Args: + level: If specified, only clear states from this level + ("user" or "project"). Currently clears all. + """ + self._states.clear() + + async def migrate_from_legacy(self) -> None: + """ + Migrate states from legacy locations if present. + + Legacy locations: + - ~/.kimi/rules.state.toml + """ + legacy_paths = [ + Path.home() / ".kimi" / STATE_FILENAME, + ] + + for legacy_path in legacy_paths: + if legacy_path.exists() and not self.user_state_path.exists(): + try: + states = await self._load_state_file(legacy_path) + if states: + self._states.update(states) + await self.save() + logger.info( + "Migrated rule states from {legacy} to {new}", + legacy=legacy_path, + new=self.user_state_path, + ) + # Rename legacy file + legacy_path.rename(legacy_path.with_suffix(".toml.bak")) + except Exception as e: + logger.warning("Failed to migrate legacy states: {error}", error=e) diff --git a/src/kimi_cli/soul/agent.py b/src/kimi_cli/soul/agent.py index 8d47db568..823d52776 100644 --- a/src/kimi_cli/soul/agent.py +++ b/src/kimi_cli/soul/agent.py @@ -62,9 +62,12 @@ class BuiltinSystemPromptArgs: """The operating system kind, e.g. 'Windows', 'macOS', 'Linux'.""" KIMI_SHELL: str """The shell executable used by the Shell tool, e.g. 'bash (`/bin/bash`)'.""" + KIMI_ACTIVE_RULES: str + """The formatted content of active development rules for injection into system prompt.""" _AGENTS_MD_MAX_BYTES = 32 * 1024 # 32 KiB +_RULES_MAX_BYTES = 32 * 1024 # 32 KiB async def _find_project_root(work_dir: KaosPath) -> KaosPath: @@ -208,6 +211,8 @@ class Runtime: role: Literal["root", "subagent"] = "root" hook_engine: Any = None """HookEngine instance, set by KimiCLI after soul creation.""" + rules_registry: Any = None + """RulesRegistry instance for managing development rules. Set during Runtime.create.""" def __post_init__(self) -> None: if self.subagent_store is None: @@ -308,6 +313,39 @@ def _on_approval_change() -> None: config.notifications, ) + # Load rules if enabled + rules_registry = None + active_rules_content = "" + if config.rules.enabled: + from kimi_cli.rules import RulesRegistry, RulesStateManager + + rules_registry = RulesRegistry( + session.work_dir, + config=config.rules, + state_manager=RulesStateManager(session.work_dir), + ) + await rules_registry.load() + + # Format active rules for system prompt + active_rules = rules_registry.get_active_rules() + if active_rules: + max_size = config.rules.max_total_size or _RULES_MAX_BYTES + lines = [] + total_size = 0 + for rule in sorted(active_rules, key=lambda r: r.metadata.priority): + section = f"## {rule.name}\n\n{rule.content}\n\n" + section_size = len(section.encode("utf-8")) + if total_size + section_size > max_size: + break + lines.append(section) + total_size += section_size + active_rules_content = "".join(lines).strip() + logger.info( + "Loaded {count} active rules ({size} bytes)", + count=len(active_rules), + size=total_size, + ) + return Runtime( config=config, oauth=oauth, @@ -322,6 +360,7 @@ def _on_approval_change() -> None: KIMI_ADDITIONAL_DIRS_INFO=additional_dirs_info, KIMI_OS=environment.os_kind, KIMI_SHELL=f"{environment.shell_name} (`{environment.shell_path}`)", + KIMI_ACTIVE_RULES=active_rules_content, ), denwa_renji=DenwaRenji(), approval=Approval(state=approval_state), @@ -344,6 +383,7 @@ def _on_approval_change() -> None: approval_runtime=ApprovalRuntime(), root_wire_hub=RootWireHub(), role="root", + rules_registry=rules_registry, ) def copy_for_subagent( @@ -376,6 +416,7 @@ def copy_for_subagent( subagent_id=agent_id, subagent_type=subagent_type, role="subagent", + rules_registry=self.rules_registry, ) diff --git a/src/kimi_cli/soul/slash.py b/src/kimi_cli/soul/slash.py index fbe5a4541..a2b469762 100644 --- a/src/kimi_cli/soul/slash.py +++ b/src/kimi_cli/soul/slash.py @@ -283,3 +283,224 @@ async def import_context(soul: KimiSoul, args: str): "The content is now part of your session context." ) ) + + +# ============================================================================= +# Rules System Commands +# ============================================================================= + + +@registry.command +async def rules(soul: KimiSoul, args: str): + """ + Manage development rules. Usage: /rules [list|show|on|off|reset] [args] + + Commands: + list [level] [--all] List all rules with status + show Show rule content + on Enable a rule + off Disable a rule + reset [--hard] Reset rules to defaults + """ + args_str = args.strip() + parts = args_str.split(maxsplit=1) if args_str else [] + subcmd = parts[0].lower() if parts else "list" + subargs = parts[1] if len(parts) > 1 else "" + + # Ensure rules registry is loaded + if soul.runtime.rules_registry is None: + wire_send(TextPart(text="Rules system is not initialized.")) + return + + registry = soul.runtime.rules_registry + await registry.load() + + match subcmd: + case "list": + await _rules_list(registry, subargs) + case "show": + await _rules_show(registry, subargs) + case "on" | "enable": + await _rules_toggle(registry, subargs, enabled=True) + case "off" | "disable": + await _rules_toggle(registry, subargs, enabled=False) + case "reset": + await _rules_reset(registry, subargs) + case _: + wire_send(TextPart(text=f"Unknown /rules subcommand: {subcmd}\nUsage: /rules [list|show|on|off|reset]")) + + +async def _rules_list(registry, args: str) -> None: + """List all rules with their status.""" + show_disabled = "--all" in args or "--show-disabled" in args + level_filter = args.replace("--all", "").replace("--show-disabled", "").strip() or None + + rules = registry.get_all_rules() + if not rules: + wire_send(TextPart(text="No rules found. Rules are loaded from:\n - ~/.config/agents/rules/\n - .agents/rules/ (project)\n - Built-in defaults")) + return + + # Group by level + by_level: dict[str, list] = {"builtin": [], "user": [], "project": []} + for rule in rules: + if level_filter and rule.level != level_filter: + continue + by_level[rule.level].append(rule) + + lines = [] + stats = registry.get_stats() + lines.append(f"Rules: {stats.enabled}/{stats.total} enabled\n") + + for level_name, level_rules in by_level.items(): + if not level_rules: + continue + + # Determine display path + match level_name: + case "builtin": + header = "Built-in Rules" + case "user": + header = "User Rules (~/.config/agents/rules/)" + case "project": + header = "Project Rules (.agents/rules/)" + case _: + header = f"{level_name.title()} Rules" + + lines.append("") + lines.append(f"{header}:") + + first_rule = True + for rule in level_rules: + enabled = registry.is_enabled(rule.id) + # Always show enabled rules, only show disabled with --all + if not enabled and not show_disabled: + continue + + # Add blank line between rules for visual separation + if not first_rule: + lines.append("") + first_rule = False + + status = "✓" if enabled else "✗" + state = registry._state.get(rule.id) + is_pinned = state is not None and state.pinned + + # Vertical format: each rule on its own line + status_icon = "✅" if enabled else "❌" + lines.append(f" {status_icon} {rule.id}") + if rule.description: + desc = rule.description[:60] + if len(rule.description) > 60: + desc += "..." + # Add pinned emoji at start of description if pinned + prefix = "📌 " if is_pinned else "" + lines.append(f" {prefix}{desc}") + + lines.append("") + lines.append("Tip: Use `/rules show ` to view rule content, `/rules on|off ` to toggle.") + if not show_disabled: + lines.append(" Use `/rules list --all` to show disabled rules.") + wire_send(TextPart(text="\n".join(lines))) + + +async def _rules_show(registry, rule_id: str) -> None: + """Show rule content.""" + if not rule_id: + wire_send(TextPart(text="Usage: /rules show \nExample: /rules show common/coding-style")) + return + + # Try to find rule with various ID formats + rule = registry.get_rule(rule_id) + + # Try without level prefix + if not rule and "/" in rule_id: + for r in registry.get_all_rules(): + if r.id == rule_id or r.id.endswith(rule_id): + rule = r + break + + if not rule: + wire_send(TextPart(text=f"Rule not found: {rule_id}\nUse `/rules list` to see available rules.")) + return + + status = "enabled" if registry.is_enabled(rule.id) else "disabled" + + lines = [ + f"Rule: {rule.name}", + f"ID: {rule.id}", + f"Level: {rule.level}", + f"Category: {rule.category}", + f"Status: {status}", + f"Priority: {rule.metadata.priority}", + ] + + if rule.metadata.paths: + lines.append(f"Applies to: {', '.join(rule.metadata.paths)}") + + lines.extend([ + "", + "-" * 40, + rule.content, + ]) + + wire_send(TextPart(text="\n".join(lines))) + + +async def _rules_toggle(registry, rule_id: str, enabled: bool) -> None: + """Enable or disable a rule.""" + if not rule_id: + action = "enable" if enabled else "disable" + wire_send(TextPart(text=f"Usage: /rules {'on' if enabled else 'off'} \nExample: /rules {'on' if enabled else 'off'} common/coding-style")) + return + + # Support wildcards + if "*" in rule_id: + matched = 0 + for rule in registry.get_all_rules(): + import fnmatch + if fnmatch.fnmatch(rule.id, rule_id) or fnmatch.fnmatch(f"{rule.level}/{rule.id}", rule_id): + registry.toggle(rule.id, enabled) + matched += 1 + + if matched > 0: + await registry.save_states() + action = "Enabled" if enabled else "Disabled" + wire_send(TextPart(text=f"{action} {matched} rules matching '{rule_id}'")) + else: + wire_send(TextPart(text=f"No rules match pattern: {rule_id}")) + return + + # Single rule toggle + success = registry.toggle(rule_id, enabled) + + if not success: + # Try to find with partial match + for rule in registry.get_all_rules(): + if rule.id.endswith(rule_id) or rule.name.lower() == rule_id.lower(): + success = registry.toggle(rule.id, enabled) + rule_id = rule.id + break + + if success: + await registry.save_states() + action = "enabled" if enabled else "disabled" + wire_send(TextPart(text=f"Rule '{rule_id}' is now {action}.")) + + # Refresh system prompt to reflect changes + wire_send(TextPart(text="Rules will take effect on the next turn.")) + else: + wire_send(TextPart(text=f"Rule not found: {rule_id}\nUse `/rules list` to see available rules.")) + + +async def _rules_reset(registry, args: str) -> None: + """Reset rules to defaults.""" + hard = "--hard" in args + + registry.reset_to_defaults() + await registry.save_states() + + msg = "Rules reset to defaults." + if hard: + msg += " All custom states cleared." + + wire_send(TextPart(text=msg)) From c5837b7f2944b369ee3758e6d64d87b6f6e30eaa Mon Sep 17 00:00:00 2001 From: 041 <381151237@qq.com> Date: Tue, 7 Apr 2026 10:34:25 +0800 Subject: [PATCH 2/7] docs: update changelog and slash commands for /rules feature --- CHANGELOG.md | 2 ++ docs/en/guides/interaction.md | 2 +- docs/en/reference/slash-commands.md | 18 +++++++++++++++++- docs/en/release-notes/changelog.md | 2 ++ docs/zh/guides/interaction.md | 2 +- docs/zh/reference/slash-commands.md | 16 ++++++++++++++++ docs/zh/release-notes/changelog.md | 2 ++ 7 files changed, 41 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bd59759e..5539d48e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ Only write entries that are worth mentioning to users. ## Unreleased +- Core: Add three-tier rules system for development guidelines — introduces builtin, user (`~/.config/agents/rules/`), and project (`.agents/rules/`) level rules with YAML frontmatter support (name, description, paths, priority, extends); active rules are automatically injected into the system prompt with size (32 KiB default) and count (10 default) limits; rules can be enabled/disabled per path pattern +- Shell: Add `/rules` command to manage development rules — supports `list [level] [--all]`, `show `, `on/off `, and `reset [--hard]` subcommands; rules status is persisted per project - Todo: Refactor SetTodoList to persist state and prevent tool call storms — todos are now persisted to session state (root agent) and independent state files (sub-agents); adds query mode (omit `todos` to read current state) and clear mode (pass `[]`); includes anti-storm guidance in tool description to prevent repeated calls without progress (fixes #1710) - ReadFile: Add total line count to every read response and support negative `line_offset` for tail mode — the tool now reports `Total lines in file: N.` in its message so the model can plan subsequent reads; negative `line_offset` (e.g. `-100`) reads the last N lines using a sliding window, useful for viewing recent log output without shell commands; the absolute value is capped at 1000 (MAX_LINES) - Shell: Fix black background on inline code and code blocks in Markdown rendering — `NEUTRAL_MARKDOWN_THEME` now overrides all Rich default `markdown.*` styles to `"none"`, preventing Rich's built-in `"cyan on black"` from leaking through on non-black terminals diff --git a/docs/en/guides/interaction.md b/docs/en/guides/interaction.md index 071687245..fb18f2caf 100644 --- a/docs/en/guides/interaction.md +++ b/docs/en/guides/interaction.md @@ -19,7 +19,7 @@ $ git status $ npm run build ``` -Shell mode also supports some slash commands, including `/help`, `/exit`, `/version`, `/editor`, `/changelog`, `/feedback`, `/export`, `/import`, and `/task`. +Shell mode also supports some slash commands, including `/help`, `/exit`, `/version`, `/editor`, `/changelog`, `/feedback`, `/export`, `/import`, `/rules`, and `/task`. ::: warning Note In shell mode, each command executes independently. Commands that change the environment like `cd` or `export` won't affect subsequent commands. diff --git a/docs/en/reference/slash-commands.md b/docs/en/reference/slash-commands.md index 67d35d694..d6b920db3 100644 --- a/docs/en/reference/slash-commands.md +++ b/docs/en/reference/slash-commands.md @@ -3,7 +3,7 @@ Slash commands are built-in commands for Kimi Code CLI, used to control sessions, configuration, and debugging. Enter a command starting with `/` in the input box to trigger. ::: tip Shell mode -Some slash commands are also available in shell mode, including `/help`, `/exit`, `/version`, `/editor`, `/theme`, `/changelog`, `/feedback`, `/export`, `/import`, and `/task`. +Some slash commands are also available in shell mode, including `/help`, `/exit`, `/version`, `/editor`, `/theme`, `/changelog`, `/feedback`, `/export`, `/import`, `/rules`, and `/task`. ::: ## Help and info @@ -262,6 +262,22 @@ Usage: When plan mode is enabled, the prompt changes to `📋` and a blue `plan` badge appears in the status bar. +### `/rules` + +Manage development rules. Rules are development guidelines and coding standards defined for projects, supported at three levels: builtin, user (`~/.config/agents/rules/`), and project (`.agents/rules/`). Active rules are automatically injected into the system prompt. + +Usage: + +- `/rules list [level] [--all]`: List all rules and their status + - `level`: Optional filter, specify `builtin`, `user`, or `project` + - `--all`: Also show disabled rules +- `/rules show `: View the full content of a specified rule +- `/rules on `: Enable a specified rule +- `/rules off `: Disable a specified rule +- `/rules reset [--hard]`: Reset rule status to defaults, `--hard` also deletes the state file + +Rule status is persisted per project in `.agents/rules.state.toml`. + ### `/task` Open the interactive task browser to view, monitor, and manage background tasks. diff --git a/docs/en/release-notes/changelog.md b/docs/en/release-notes/changelog.md index 76c3d921a..42f2eb1be 100644 --- a/docs/en/release-notes/changelog.md +++ b/docs/en/release-notes/changelog.md @@ -4,6 +4,8 @@ This page documents the changes in each Kimi Code CLI release. ## Unreleased +- Core: Add three-tier rules system for development guidelines — introduces builtin, user (`~/.config/agents/rules/`), and project (`.agents/rules/`) level rules with YAML frontmatter support (name, description, paths, priority, extends); active rules are automatically injected into the system prompt with size (32 KiB default) and count (10 default) limits; rules can be enabled/disabled per path pattern +- Shell: Add `/rules` command to manage development rules — supports `list [level] [--all]`, `show `, `on/off `, and `reset [--hard]` subcommands; rules status is persisted per project - Todo: Refactor SetTodoList to persist state and prevent tool call storms — todos are now persisted to session state (root agent) and independent state files (sub-agents); adds query mode (omit `todos` to read current state) and clear mode (pass `[]`); includes anti-storm guidance in tool description to prevent repeated calls without progress (fixes #1710) - ReadFile: Add total line count to every read response and support negative `line_offset` for tail mode — the tool now reports `Total lines in file: N.` in its message so the model can plan subsequent reads; negative `line_offset` (e.g. `-100`) reads the last N lines using a sliding window, useful for viewing recent log output without shell commands; the absolute value is capped at 1000 (MAX_LINES) - Shell: Fix black background on inline code and code blocks in Markdown rendering — `NEUTRAL_MARKDOWN_THEME` now overrides all Rich default `markdown.*` styles to `"none"`, preventing Rich's built-in `"cyan on black"` from leaking through on non-black terminals diff --git a/docs/zh/guides/interaction.md b/docs/zh/guides/interaction.md index cedff60ef..83bea082a 100644 --- a/docs/zh/guides/interaction.md +++ b/docs/zh/guides/interaction.md @@ -19,7 +19,7 @@ $ git status $ npm run build ``` -Shell 模式也支持部分斜杠命令,包括 `/help`、`/exit`、`/version`、`/editor`、`/changelog`、`/feedback`、`/export`、`/import` 和 `/task`。 +Shell 模式也支持部分斜杠命令,包括 `/help`、`/exit`、`/version`、`/editor`、`/changelog`、`/feedback`、`/export`、`/import`、`/rules` 和 `/task`。 ::: warning 注意 Shell 模式中每个命令独立执行,`cd`、`export` 等改变环境的命令不会影响后续命令。 diff --git a/docs/zh/reference/slash-commands.md b/docs/zh/reference/slash-commands.md index ef36e7f2d..ad5fe4099 100644 --- a/docs/zh/reference/slash-commands.md +++ b/docs/zh/reference/slash-commands.md @@ -262,6 +262,22 @@ Flow Skill 也可以通过 `/skill:` 调用,此时作为普通 Skill 加 开启 Plan 模式后,提示符变为 `📋`,底部状态栏显示蓝色的 `plan` 标识。 +### `/rules` + +管理开发规则(Rules)。Rules 是为项目定义的开发指南和编码标准,支持三层:内置(builtin)、用户级(`~/.config/agents/rules/`)和项目级(`.agents/rules/`)。激活的规则会自动注入到系统提示词中。 + +用法: + +- `/rules list [level] [--all]`:列出所有规则及其状态 + - `level`:可选过滤器,指定 `builtin`、`user` 或 `project` + - `--all`:同时显示已禁用的规则 +- `/rules show `:查看指定规则的完整内容 +- `/rules on `:启用指定规则 +- `/rules off `:禁用指定规则 +- `/rules reset [--hard]`:重置规则状态为默认值,`--hard` 同时删除状态文件 + +规则状态按项目持久化存储在 `.agents/rules.state.toml` 中。 + ### `/task` 打开交互式任务浏览器,查看、监控和管理后台任务。 diff --git a/docs/zh/release-notes/changelog.md b/docs/zh/release-notes/changelog.md index e59a29819..f94724019 100644 --- a/docs/zh/release-notes/changelog.md +++ b/docs/zh/release-notes/changelog.md @@ -4,6 +4,8 @@ ## 未发布 +- Core:新增三层规则系统(Rules)用于开发指南和编码标准——支持内置(builtin)、用户级(`~/.config/agents/rules/`)和项目级(`.agents/rules/`)三层规则,规则文件使用带 YAML frontmatter(name、description、paths、priority、extends)的 Markdown 格式;激活的规则会自动注入到系统提示词中,受大小限制(默认 32 KiB)和数量限制(默认 10 条)约束;支持按文件路径模式自动启用规则 +- Shell:新增 `/rules` 命令用于管理开发规则——支持 `list [level] [--all]`、`show `、`on/off ` 和 `reset [--hard]` 子命令;规则状态按项目持久化存储 - Todo:重构 `SetTodoList` 工具,支持状态持久化并防止工具调用风暴——待办事项现在会持久化到会话状态(主 Agent)和独立状态文件(子 Agent);新增查询模式(省略 `todos` 参数可读取当前状态)和清空模式(传 `[]` 清空);工具描述中增加了防风暴指导,防止在没有实际进展的情况下反复调用(修复 #1710) - ReadFile:每次读取返回文件总行数,并支持负数 `line_offset` 实现 tail 模式——工具现在会在消息中报告 `Total lines in file: N.`,方便模型规划后续读取;负数 `line_offset`(如 `-100`)通过滑动窗口读取文件末尾 N 行,适用于无需 Shell 命令即可查看最新日志输出的场景;绝对值上限为 1000(MAX_LINES) - Shell:修复 Markdown 渲染中行内代码和代码块出现黑色背景的问题——`NEUTRAL_MARKDOWN_THEME` 现在将所有 Rich 默认的 `markdown.*` 样式覆盖为 `"none"`,防止 Rich 内置的 `"cyan on black"` 在非黑色背景终端上泄露 From f2ffc0ccf806107d9b3673b39d64b83adedd24f6 Mon Sep 17 00:00:00 2001 From: 041 <381151237@qq.com> Date: Tue, 7 Apr 2026 10:58:34 +0800 Subject: [PATCH 3/7] test(rules): add comprehensive tests for rules system --- src/kimi_cli/rules/state.py | 4 +- tests/core/test_rules.py | 534 ++++++++++++++++++++++++++++++++++++ 2 files changed, 536 insertions(+), 2 deletions(-) create mode 100644 tests/core/test_rules.py diff --git a/src/kimi_cli/rules/state.py b/src/kimi_cli/rules/state.py index 01c09e91a..101d67a79 100644 --- a/src/kimi_cli/rules/state.py +++ b/src/kimi_cli/rules/state.py @@ -57,10 +57,10 @@ async def _get_project_state_path(self) -> Path | None: for state_path, parent_dir in candidates: if await parent_dir.is_dir(): - return state_path.to_path() if hasattr(state_path, 'to_path') else state_path + return state_path.unsafe_to_local_path() # Default to .agents if neither exists (will create on save) - return (self.work_dir / ".agents" / STATE_FILENAME).to_path() + return (self.work_dir / ".agents" / STATE_FILENAME).unsafe_to_local_path() async def load(self) -> None: """Load states from both user and project levels.""" diff --git a/tests/core/test_rules.py b/tests/core/test_rules.py new file mode 100644 index 000000000..4b12bca38 --- /dev/null +++ b/tests/core/test_rules.py @@ -0,0 +1,534 @@ +"""Tests for Rules system.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import yaml + +from kimi_cli.rules.discovery import discover_rule_files, resolve_rules_roots +from kimi_cli.rules.injector import RulesInjector +from kimi_cli.rules.models import Rule, RuleMetadata, RuleState, RulesStats +from kimi_cli.rules.parser import parse_frontmatter, parse_rule_file, should_apply_rule +from kimi_cli.rules.registry import RulesRegistry +from kimi_cli.rules.state import RulesStateManager + + +class TestParseFrontmatter: + """Test YAML frontmatter parsing.""" + + def test_parse_valid_frontmatter(self): + """Parse frontmatter with all fields.""" + content = """--- +name: "Python Style" +description: "Python coding guidelines" +paths: ["**/*.py", "**/pyproject.toml"] +priority: 10 +extends: ["common/coding-style"] +--- +Use type hints for function parameters. +""" + metadata, body = parse_frontmatter(content) + + assert metadata["name"] == "Python Style" + assert metadata["description"] == "Python coding guidelines" + assert metadata["paths"] == ["**/*.py", "**/pyproject.toml"] + assert metadata["priority"] == 10 + assert metadata["extends"] == ["common/coding-style"] + assert "Use type hints" in body + + def test_parse_no_frontmatter(self): + """Content without frontmatter returns empty metadata.""" + content = "Just plain content without frontmatter." + metadata, body = parse_frontmatter(content) + + assert metadata == {} + assert body == content + + def test_parse_empty_frontmatter(self): + """Handle empty frontmatter block - returns content after frontmatter.""" + content = "---\n---\nContent here." + metadata, body = parse_frontmatter(content) + + assert metadata == {} + # Empty frontmatter returns body including the trailing newlines + assert "Content here" in body + + def test_parse_invalid_yaml(self): + """Invalid YAML in frontmatter returns empty metadata.""" + content = "---\nname: [invalid: yaml: syntax\n---\nBody content." + metadata, body = parse_frontmatter(content) + + assert metadata == {} + assert body == content + + +class TestParseRuleFile: + """Test rule file parsing.""" + + def test_parse_complete_rule(self, tmp_path: Path): + """Parse a complete rule file.""" + rule_file = tmp_path / "python" / "coding-style.md" + rule_file.parent.mkdir(parents=True) + rule_file.write_text( + """--- +name: "Python Coding Style" +description: "Guidelines for Python code" +paths: ["**/*.py"] +priority: 50 +--- +Always use type hints. +Follow PEP 8. +""", + encoding="utf-8", + ) + + rule = parse_rule_file(rule_file, level="project", rules_root=tmp_path) + + assert rule.id == "python/coding-style" + assert rule.name == "Python Coding Style" + assert rule.description == "Guidelines for Python code" + assert rule.level == "project" + assert rule.category == "python" + assert rule.metadata.paths == ["**/*.py"] + assert rule.metadata.priority == 50 + assert "Always use type hints" in rule.content + + def test_parse_rule_without_frontmatter(self, tmp_path: Path): + """Parse rule file without frontmatter uses defaults.""" + rule_file = tmp_path / "common" / "testing.md" + rule_file.parent.mkdir(parents=True) + rule_file.write_text("Write unit tests for all functions.", encoding="utf-8") + + rule = parse_rule_file(rule_file, level="builtin", rules_root=tmp_path) + + assert rule.id == "common/testing" + assert rule.name == "Testing" # Derived from filename + assert rule.metadata.priority == 100 # Default + assert rule.metadata.paths == [] + + def test_parse_rule_fallback_name(self, tmp_path: Path): + """Rule without name in frontmatter uses filename as fallback.""" + rule_file = tmp_path / "my-rules" / "custom-rule.md" + rule_file.parent.mkdir(parents=True) + rule_file.write_text("---\ndescription: Custom\n---\nContent.", encoding="utf-8") + + rule = parse_rule_file(rule_file, level="user", rules_root=tmp_path) + + assert rule.name == "Custom Rule" # Filename converted to title case + + +class TestShouldApplyRule: + """Test rule path matching.""" + + def test_rule_without_paths_applies_to_all(self): + """Rules without paths metadata apply to all files.""" + rule = Rule( + id="common/general", + name="General", + description="General rules", + source=Path("/fake"), + level="builtin", + category="common", + metadata=RuleMetadata(paths=[]), + content="content", + ) + + assert should_apply_rule(rule, Path("/any/file.py")) is True + assert should_apply_rule(rule, None) is True + + def test_rule_with_matching_path(self): + """Rule applies when file matches path pattern.""" + rule = Rule( + id="python/style", + name="Python Style", + description="Python rules", + source=Path("/fake"), + level="builtin", + category="python", + metadata=RuleMetadata(paths=["**/*.py", "**/pyproject.toml"]), + content="content", + ) + + assert should_apply_rule(rule, Path("/src/main.py")) is True + assert should_apply_rule(rule, Path("/pyproject.toml")) is True + assert should_apply_rule(rule, Path("/src/nested/file.py")) is True + + def test_rule_with_non_matching_path(self): + """Rule doesn't apply when file doesn't match.""" + rule = Rule( + id="python/style", + name="Python Style", + description="Python rules", + source=Path("/fake"), + level="builtin", + category="python", + metadata=RuleMetadata(paths=["**/*.py"]), + content="content", + ) + + assert should_apply_rule(rule, Path("/src/main.js")) is False + assert should_apply_rule(rule, Path("/README.md")) is False + + def test_windows_path_separators(self): + """Handle Windows backslash path separators.""" + rule = Rule( + id="python/style", + name="Python Style", + description="Python rules", + source=Path("/fake"), + level="builtin", + category="python", + metadata=RuleMetadata(paths=["**/*.py"]), + content="content", + ) + + assert should_apply_rule(rule, Path("\\src\\main.py")) is True + + +class TestRulesStateManager: + """Test rule state persistence.""" + + @pytest.fixture + def state_manager(self, tmp_path: Path) -> RulesStateManager: + """Create a state manager with temp directory and isolated user state.""" + from kaos.path import KaosPath + work_dir = KaosPath.unsafe_from_local_path(tmp_path) + # Use temp path for user state to avoid loading real user states + user_state_path = tmp_path / "user_rules.state.toml" + return RulesStateManager(work_dir=work_dir, user_state_path=user_state_path) + + async def test_load_nonexistent_state(self, state_manager: RulesStateManager): + """Loading non-existent state returns empty dict.""" + await state_manager.load() + + assert state_manager.get_all_states() == {} + assert state_manager.get_state("any-rule") is None + + async def test_save_and_load_state(self, state_manager: RulesStateManager, tmp_path: Path): + """Save and load rule states.""" + # Create .agents directory + from pathlib import Path + agents_dir = Path(tmp_path / ".agents") + agents_dir.mkdir(exist_ok=True) + + state = RuleState(enabled=True, pinned=True, last_modified="2024-01-01T00:00:00") + state_manager.set_state("python/style", state) + await state_manager.save() + + # Create new manager instance to test loading (reuse same user_state_path) + from kaos.path import KaosPath + new_manager = RulesStateManager( + work_dir=KaosPath.unsafe_from_local_path(tmp_path), + user_state_path=state_manager.user_state_path + ) + await new_manager.load() + + loaded = new_manager.get_state("python/style") + assert loaded is not None + assert loaded.enabled is True + assert loaded.pinned is True + assert loaded.last_modified == "2024-01-01T00:00:00" + + async def test_clear_states(self, state_manager: RulesStateManager): + """Clear all states or filter by level.""" + state_manager.set_state("common/style", RuleState()) + state_manager.set_state("python/style", RuleState()) + await state_manager.save() + + # Clear all + state_manager.clear_states() + + assert state_manager.get_state("common/style") is None + assert state_manager.get_state("python/style") is None +class TestRulesRegistry: + """Test rules registry.""" + + @pytest.fixture + async def registry(self, tmp_path: Path) -> RulesRegistry: + """Create a registry with mocked dependencies.""" + from kaos.path import KaosPath + + work_dir = KaosPath.unsafe_from_local_path(tmp_path) + registry = RulesRegistry(work_dir) + return registry + + def test_get_rule_not_found(self, registry: RulesRegistry): + """Get non-existent rule returns None.""" + assert registry.get_rule("nonexistent") is None + + def test_toggle_nonexistent_rule(self, registry: RulesRegistry): + """Toggle non-existent rule returns False.""" + assert registry.toggle("nonexistent", enabled=False) is False + + def test_is_enabled_default(self, registry: RulesRegistry): + """Rules are enabled by default when no state exists.""" + # Manually add a rule + rule = Rule( + id="test/rule", + name="Test Rule", + description="Test", + source=Path("/fake"), + level="builtin", + category="test", + metadata=RuleMetadata(), + content="content", + ) + registry._rules["test/rule"] = rule + + assert registry.is_enabled("test/rule") is True + + def test_get_active_rules(self, registry: RulesRegistry): + """Get active rules with filtering.""" + # Add rules + rule1 = Rule( + id="common/style", + name="Common Style", + description="Common", + source=Path("/fake"), + level="builtin", + category="common", + metadata=RuleMetadata(paths=[], priority=10), + content="content", + ) + rule2 = Rule( + id="python/style", + name="Python Style", + description="Python", + source=Path("/fake"), + level="builtin", + category="python", + metadata=RuleMetadata(paths=["**/*.py"], priority=20), + content="content", + ) + registry._rules["common/style"] = rule1 + registry._rules["python/style"] = rule2 + + # Disable one rule + registry._state["common/style"] = RuleState(enabled=False) + + # Get all active + active = registry.get_active_rules() + assert len(active) == 1 + assert active[0].id == "python/style" + + # Get active for specific file + active_py = registry.get_active_rules(file_path=Path("/src/main.py")) + assert len(active_py) == 1 + assert active_py[0].id == "python/style" + + # Get active for non-matching file + active_js = registry.get_active_rules(file_path=Path("/src/main.js")) + assert len(active_js) == 0 + + def test_toggle_persists_state(self, registry: RulesRegistry): + """Toggle updates state and marks as pinned.""" + rule = Rule( + id="test/rule", + name="Test Rule", + description="Test", + source=Path("/fake"), + level="builtin", + category="test", + metadata=RuleMetadata(), + content="content", + ) + registry._rules["test/rule"] = rule + + # Mock state manager + registry.state_manager = MagicMock() + registry.state_manager.set_state = MagicMock() + + # Disable rule + result = registry.toggle("test/rule", enabled=False) + + assert result is True + assert registry.is_enabled("test/rule") is False + + # Check state was pinned + state = registry._state["test/rule"] + assert state.pinned is True + assert state.last_modified is not None + + # Check persistence was called + registry.state_manager.set_state.assert_called_once_with("test/rule", state) + + def test_get_stats(self, registry: RulesRegistry): + """Get statistics about rules.""" + # Add rules of different levels + for level, count in [("builtin", 3), ("user", 2), ("project", 1)]: + for i in range(count): + rule = Rule( + id=f"{level}/rule{i}", + name=f"Rule {i}", + description="Test", + source=Path("/fake"), + level=level, + category="test", + metadata=RuleMetadata(), + content="content", + ) + registry._rules[f"{level}/rule{i}"] = rule + + # Disable some rules + registry._state["builtin/rule0"] = RuleState(enabled=False) + registry._state["user/rule0"] = RuleState(enabled=False) + + stats = registry.get_stats() + + assert stats.total == 6 + assert stats.enabled == 4 + assert stats.builtin == 3 + assert stats.user == 2 + assert stats.project == 1 + + def test_get_rules_by_level_and_category(self, registry: RulesRegistry): + """Filter rules by level and category.""" + for id_, level, category in [ + ("common/style", "builtin", "common"), + ("common/testing", "builtin", "common"), + ("python/style", "builtin", "python"), + ("custom/rule", "project", "custom"), + ]: + rule = Rule( + id=id_, + name=id_, + description="Test", + source=Path("/fake"), + level=level, + category=category, + metadata=RuleMetadata(), + content="content", + ) + registry._rules[id_] = rule + + builtin = registry.get_rules_by_level("builtin") + assert len(builtin) == 3 + + common = registry.get_rules_by_category("common") + assert len(common) == 2 + + +class TestRulesInjector: + """Test rules prompt injection.""" + + def test_format_rules_for_prompt(self): + """Format rules for system prompt.""" + rules = [ + Rule( + id="common/style", + name="Common Style", + description="Common coding style", + source=Path("/fake"), + level="builtin", + category="common", + metadata=RuleMetadata(priority=10), + content="Use clear names.", + ), + Rule( + id="python/style", + name="Python Style", + description="Python specific", + source=Path("/fake"), + level="builtin", + category="python", + metadata=RuleMetadata(priority=20), + content="Use type hints.", + ), + ] + + injector = RulesInjector(work_dir=None) # type: ignore + formatted = injector.format_rules_content(rules) + + # Verify rule content is formatted correctly + assert "## Common Style" in formatted + assert "Use clear names." in formatted + assert "## Python Style" in formatted + assert "Use type hints." in formatted + + def test_format_rules_respects_size_limit(self): + """Respect max size limit when formatting rules.""" + # Create a rule with very long content + long_content = "x" * 50000 + rules = [ + Rule( + id="common/style", + name="Common Style", + description="Test", + source=Path("/fake"), + level="builtin", + category="common", + metadata=RuleMetadata(), + content=long_content, + ), + ] + + injector = RulesInjector(work_dir=None, max_size=1000) # type: ignore + formatted = injector.format_rules_content(rules) + + assert len(formatted) <= 1100 # Allow some margin for header/trailer + assert "truncated" in formatted or len(formatted) < len(long_content) + + def test_format_empty_rules(self): + """Format empty rules list.""" + injector = RulesInjector(work_dir=None) # type: ignore + formatted = injector.format_rules_content([]) + assert formatted == "" + + +class TestDiscovery: + """Test rule discovery.""" + + async def test_discover_rule_files(self, tmp_path: Path): + """Discover rule files in directory.""" + from kaos.path import KaosPath + + # Create rule files + (tmp_path / "common").mkdir() + (tmp_path / "common" / "style.md").write_text("content") + (tmp_path / "python").mkdir() + (tmp_path / "python" / "coding.md").write_text("content") + (tmp_path / "python" / "not-a-rule.txt").write_text("content") + + root = KaosPath.unsafe_from_local_path(tmp_path) + files = await discover_rule_files(root) + + assert len(files) == 2 + names = {f.name for f in files} + assert names == {"style.md", "coding.md"} + + async def test_discover_empty_directory(self, tmp_path: Path): + """Discover in empty directory returns empty list.""" + from kaos.path import KaosPath + + root = KaosPath.unsafe_from_local_path(tmp_path) + files = await discover_rule_files(root) + + assert files == [] + + async def test_resolve_rules_roots(self, tmp_path: Path): + """Resolve all rules directories with project rules present.""" + from kaos.path import KaosPath + + # Create project rules directory + (tmp_path / ".agents" / "rules").mkdir(parents=True) + + work_dir = KaosPath.unsafe_from_local_path(tmp_path) + roots = await resolve_rules_roots(work_dir, include_builtin=True) + + # Should include project and builtin (user may or may not exist) + assert len(roots) >= 2 # At least project and builtin + + async def test_resolve_rules_roots_without_builtin(self, tmp_path: Path): + """Resolve rules roots excluding builtin.""" + from kaos.path import KaosPath + + work_dir = KaosPath.unsafe_from_local_path(tmp_path) + roots = await resolve_rules_roots(work_dir, include_builtin=False) + + # Should not include builtin rules + builtin_dir = str(Path(__file__).parent.parent.parent / "src" / "kimi_cli" / "rules") + root_strs = [str(r) for r in roots] + assert builtin_dir not in root_strs From 3e82acce08bfe2ce86152f79608ed7811d74d60d Mon Sep 17 00:00:00 2001 From: 041 <381151237@qq.com> Date: Tue, 7 Apr 2026 12:00:33 +0800 Subject: [PATCH 4/7] fix(rules): address PR review feedback on state management and priority - Add level field to RuleState for state separation by user/project - Fix rule loading priority: project rules now correctly override builtin - Fix injector priority to respect hierarchy (project > user > builtin) - Implement --hard reset to delete state files instead of saving empty - Delete empty state files on save to prevent stale data after reset - Combine nested if statements and use ternary operators for clarity --- src/kimi_cli/rules/__init__.py | 6 +- src/kimi_cli/rules/discovery.py | 6 +- src/kimi_cli/rules/injector.py | 5 +- src/kimi_cli/rules/models.py | 4 +- src/kimi_cli/rules/parser.py | 10 +- src/kimi_cli/rules/registry.py | 28 ++++- src/kimi_cli/rules/state.py | 175 +++++++++++++++++++++++++------- src/kimi_cli/soul/slash.py | 50 ++++++--- 8 files changed, 215 insertions(+), 69 deletions(-) diff --git a/src/kimi_cli/rules/__init__.py b/src/kimi_cli/rules/__init__.py index a93f2480f..b6e3a70b8 100644 --- a/src/kimi_cli/rules/__init__.py +++ b/src/kimi_cli/rules/__init__.py @@ -3,16 +3,16 @@ from __future__ import annotations from kimi_cli.rules.discovery import ( + find_first_existing_dir, get_builtin_rules_dir, - get_user_rules_dir_candidates, get_project_rules_dir_candidates, + get_user_rules_dir_candidates, resolve_rules_roots, - find_first_existing_dir, ) +from kimi_cli.rules.injector import RulesInjector, load_active_rules from kimi_cli.rules.models import Rule, RuleMetadata, RuleState from kimi_cli.rules.parser import parse_rule_file from kimi_cli.rules.registry import RulesRegistry -from kimi_cli.rules.injector import RulesInjector, load_active_rules from kimi_cli.rules.state import RulesStateManager __all__ = [ diff --git a/src/kimi_cli/rules/discovery.py b/src/kimi_cli/rules/discovery.py index 2f9449711..fbef524c2 100644 --- a/src/kimi_cli/rules/discovery.py +++ b/src/kimi_cli/rules/discovery.py @@ -131,7 +131,11 @@ async def discover_rule_files(rules_dir: KaosPath) -> list[KaosPath]: from kimi_cli.utils.logging import logger is_dir = await rules_dir.is_dir() - logger.debug("discover_rule_files: rules_dir={path} is_dir={is_dir}", path=rules_dir, is_dir=is_dir) + logger.debug( + "discover_rule_files: rules_dir={path} is_dir={is_dir}", + path=rules_dir, + is_dir=is_dir, + ) if not is_dir: return [] diff --git a/src/kimi_cli/rules/injector.py b/src/kimi_cli/rules/injector.py index ad659cefe..ce3044028 100644 --- a/src/kimi_cli/rules/injector.py +++ b/src/kimi_cli/rules/injector.py @@ -11,6 +11,7 @@ if TYPE_CHECKING: from kaos.path import KaosPath + from kimi_cli.rules.models import Rule @@ -58,7 +59,9 @@ async def load_active_rules( seen_ids: set[str] = set() # Load from each root (project overrides user overrides builtin) - for root in reversed(roots): # Reverse to apply priority correctly + # resolve_rules_roots returns [project, user, builtin] (high to low priority) + # First-seen wins, so project rules take precedence over duplicates + for root in roots: level = self._determine_level(root) from kimi_cli.rules.discovery import discover_rule_files diff --git a/src/kimi_cli/rules/models.py b/src/kimi_cli/rules/models.py index d2d547de4..2754454c1 100644 --- a/src/kimi_cli/rules/models.py +++ b/src/kimi_cli/rules/models.py @@ -44,9 +44,10 @@ class RuleState: enabled: bool = True # 是否启用 pinned: bool = False # 是否固定(不受自动检测影响) last_modified: str | None = None # 最后修改时间 ISO 格式 + level: Literal["builtin", "user", "project"] | None = None # 规则来源层级 def to_dict(self) -> dict: - """转换为字典用于序列化""" + """转换为字典用于序列化(不包含 level,因为 level 决定保存位置而非内容)""" result: dict = {"enabled": self.enabled} if self.pinned: result["pinned"] = True @@ -61,6 +62,7 @@ def from_dict(cls, data: dict) -> RuleState: enabled=data.get("enabled", True), pinned=data.get("pinned", False), last_modified=data.get("last_modified"), + level=data.get("level"), # May be None for legacy states ) diff --git a/src/kimi_cli/rules/parser.py b/src/kimi_cli/rules/parser.py index 3dd45777b..5a96ccac5 100644 --- a/src/kimi_cli/rules/parser.py +++ b/src/kimi_cli/rules/parser.py @@ -52,7 +52,7 @@ def parse_rule_file( path: Path, level: str, rules_root: Path | None = None, -) -> "Rule": +) -> Rule: """ Parse a single rule file, extracting frontmatter and content. @@ -108,7 +108,7 @@ def parse_rule_file( ) -def should_apply_rule(rule: "Rule", file_path: Path | None) -> bool: +def should_apply_rule(rule: Rule, file_path: Path | None) -> bool: """ Check if a rule should apply to a given file path. @@ -134,8 +134,4 @@ def should_apply_rule(rule: "Rule", file_path: Path | None) -> bool: path_str = str(file_path).replace("\\", "/") - for pattern in rule.metadata.paths: - if fnmatch(path_str, pattern): - return True - - return False + return any(fnmatch(path_str, pattern) for pattern in rule.metadata.paths) diff --git a/src/kimi_cli/rules/registry.py b/src/kimi_cli/rules/registry.py index 2d431d367..d2b7bb37e 100644 --- a/src/kimi_cli/rules/registry.py +++ b/src/kimi_cli/rules/registry.py @@ -8,7 +8,7 @@ from kaos.path import KaosPath from kimi_cli.rules.discovery import discover_rule_files, resolve_rules_roots -from kimi_cli.rules.models import Rule, RuleState, RulesStats +from kimi_cli.rules.models import Rule, RulesStats, RuleState from kimi_cli.rules.parser import parse_rule_file, should_apply_rule from kimi_cli.rules.state import RulesStateManager from kimi_cli.utils.logging import logger @@ -31,7 +31,7 @@ class RulesRegistry: def __init__( self, work_dir: KaosPath, - config: "RulesConfig | None" = None, + config: RulesConfig | None = None, state_manager: RulesStateManager | None = None, ): self.work_dir = work_dir @@ -55,7 +55,9 @@ async def load(self) -> None: ) # Load rules from each root (later roots override earlier ones for same ID) - for root in roots: + # resolve_rules_roots returns [project, user, builtin] (high to low priority) + # We iterate in reverse so project rules are loaded last and override others + for root in reversed(roots): await self._load_from_root(root) # Load persisted states @@ -249,15 +251,16 @@ def toggle(self, rule_id: str, enabled: bool) -> bool: from datetime import datetime + rule = self._rules[rule_id] state = self._state.get(rule_id, RuleState()) state.enabled = enabled state.pinned = True # User action pins the state state.last_modified = datetime.now().isoformat() self._state[rule_id] = state - # Persist state + # Persist state with level info for proper file separation if self.state_manager: - self.state_manager.set_state(rule_id, state) + self.state_manager.set_state(rule_id, state, level=rule.level) logger.info( "Rule {rule_id} {action}", @@ -310,3 +313,18 @@ async def save_states(self) -> None: """Persist current rule states.""" if self.state_manager: await self.state_manager.save() + + async def delete_state_files(self, level: str | None = None) -> None: + """ + Delete state files from disk. + + Args: + level: If specified, only delete the state file for this level + ("user" or "project"). If None, deletes both. + """ + if self.state_manager: + from typing import Literal + level_typed: Literal["user", "project"] | None = ( + level if level in ("user", "project") else None # type: ignore + ) + await self.state_manager.delete_state_files(level_typed) diff --git a/src/kimi_cli/rules/state.py b/src/kimi_cli/rules/state.py index 101d67a79..4bcf024b8 100644 --- a/src/kimi_cli/rules/state.py +++ b/src/kimi_cli/rules/state.py @@ -3,15 +3,14 @@ from __future__ import annotations from pathlib import Path +from typing import Literal import tomlkit from kaos.path import KaosPath from tomlkit.exceptions import TOMLKitError -from kimi_cli.utils.logging import logger - from kimi_cli.rules.models import RuleState - +from kimi_cli.utils.logging import logger STATE_FILENAME = "rules.state.toml" USER_STATE_DIR = ".config/agents" # ~/.config/agents/ @@ -34,7 +33,9 @@ def __init__( user_state_path: Path | None = None, ): self.work_dir = work_dir - self._states: dict[str, RuleState] = {} + # States grouped by level: {"user": {rule_id: RuleState}, "project": {...}} + self._user_states: dict[str, RuleState] = {} + self._project_states: dict[str, RuleState] = {} self._loaded = False # Determine state file paths @@ -67,20 +68,31 @@ async def load(self) -> None: if self._loaded: return - self._states = {} + self._user_states = {} + self._project_states = {} # Load user-level states first (lowest priority) user_states = await self._load_state_file(self.user_state_path) - self._states.update(user_states) + for rule_id, state in user_states.items(): + state.level = "user" + self._user_states[rule_id] = state # Load project-level states (highest priority) project_path = await self._get_project_state_path() if project_path: project_states = await self._load_state_file(project_path) - self._states.update(project_states) + for rule_id, state in project_states.items(): + state.level = "project" + self._project_states[rule_id] = state self._loaded = True - logger.debug("Loaded {count} rule states", count=len(self._states)) + total = len(self._user_states) + len(self._project_states) + logger.debug( + "Loaded {total} rule states ({user} user, {project} project)", + total=total, + user=len(self._user_states), + project=len(self._project_states), + ) async def _load_state_file(self, path: Path) -> dict[str, RuleState]: """Load states from a single TOML file.""" @@ -112,25 +124,34 @@ async def _load_state_file(self, path: Path) -> dict[str, RuleState]: async def save(self) -> None: """Save states to appropriate level files.""" - # Separate states by level preference - user_states: dict[str, RuleState] = {} - project_states: dict[str, RuleState] = {} - - for rule_id, state in self._states.items(): - # Pinned states go to user level unless they came from project - # For now, save all to user level for simplicity - # TODO: Track origin level and save accordingly - user_states[rule_id] = state - - # Save user-level states - if user_states: - await self._save_state_file(self.user_state_path, user_states) - - # Save project-level states if any - if project_states: - project_path = await self._get_project_state_path() - if project_path: - await self._save_state_file(project_path, project_states) + # Save user-level states (builtin + user rules) + if self._user_states: + await self._save_state_file(self.user_state_path, self._user_states) + else: + # Empty states: delete user state file if it exists + if self.user_state_path.exists(): + try: + self.user_state_path.unlink() + logger.debug("Deleted empty user state file: {path}", path=self.user_state_path) + except Exception as e: + logger.warning("Failed to delete empty user state file: {error}", error=e) + + # Save project-level states (project rules) + project_path = await self._get_project_state_path() + if project_path: + if self._project_states: + await self._save_state_file(project_path, self._project_states) + else: + # Empty states: delete project state file if it exists + if project_path.exists(): + try: + project_path.unlink() + logger.debug("Deleted empty project state file: {path}", path=project_path) + except Exception as e: + logger.warning( + "Failed to delete empty project state file: {error}", + error=e, + ) async def _save_state_file(self, path: Path, states: dict[str, RuleState]) -> None: """Save states to a single TOML file.""" @@ -162,26 +183,71 @@ async def _save_state_file(self, path: Path, states: dict[str, RuleState]) -> No logger.error("Failed to save rule states to {path}: {error}", path=path, error=e) def get_state(self, rule_id: str) -> RuleState | None: - """Get state for a specific rule.""" - return self._states.get(rule_id) + """Get state for a specific rule. + + Project-level states take precedence over user-level states. + """ + # Check project-level first (higher priority) + if rule_id in self._project_states: + return self._project_states[rule_id] + # Fall back to user-level + return self._user_states.get(rule_id) - def set_state(self, rule_id: str, state: RuleState) -> None: - """Set state for a specific rule.""" - self._states[rule_id] = state + def set_state( + self, + rule_id: str, + state: RuleState, + level: Literal["builtin", "user", "project"] | None = None, + ) -> None: + """Set state for a specific rule. + + Args: + rule_id: The rule identifier + state: The rule state to save + level: The rule level determining where to save: + - "builtin" or "user" → user-level state file + - "project" → project-level state file + - None → infer from existing state or default to user + """ + # Determine target level + if level is None: + # Try to infer from existing state + target_level = "project" if rule_id in self._project_states else "user" + elif level == "builtin": + # Builtin rules save to user-level + target_level = "user" + else: + target_level = level + + # Store in appropriate bucket + state.level = target_level # type: ignore + if target_level == "project": + self._project_states[rule_id] = state + else: + self._user_states[rule_id] = state def get_all_states(self) -> dict[str, RuleState]: - """Get all loaded states.""" - return dict(self._states) + """Get all loaded states (project-level takes precedence).""" + # Start with user states, then override with project states + all_states = dict(self._user_states) + all_states.update(self._project_states) + return all_states - def clear_states(self, level: str | None = None) -> None: + def clear_states(self, level: Literal["user", "project"] | None = None) -> None: """ Clear states. Args: level: If specified, only clear states from this level - ("user" or "project"). Currently clears all. + ("user" or "project"). If None, clears all. """ - self._states.clear() + if level is None: + self._user_states.clear() + self._project_states.clear() + elif level == "user": + self._user_states.clear() + elif level == "project": + self._project_states.clear() async def migrate_from_legacy(self) -> None: """ @@ -199,7 +265,9 @@ async def migrate_from_legacy(self) -> None: try: states = await self._load_state_file(legacy_path) if states: - self._states.update(states) + for state in states.values(): + state.level = "user" + self._user_states.update(states) await self.save() logger.info( "Migrated rule states from {legacy} to {new}", @@ -210,3 +278,34 @@ async def migrate_from_legacy(self) -> None: legacy_path.rename(legacy_path.with_suffix(".toml.bak")) except Exception as e: logger.warning("Failed to migrate legacy states: {error}", error=e) + + async def delete_state_files(self, level: Literal["user", "project"] | None = None) -> None: + """ + Delete state files from disk. + + Args: + level: If specified, only delete the state file for this level. + If None, deletes both user and project state files. + """ + deleted = [] + + # Delete user-level state file + if (level is None or level == "user") and self.user_state_path.exists(): + try: + self.user_state_path.unlink() + deleted.append(str(self.user_state_path)) + except Exception as e: + logger.warning("Failed to delete user state file: {error}", error=e) + + # Delete project-level state file + if level is None or level == "project": + project_path = await self._get_project_state_path() + if project_path and project_path.exists(): + try: + project_path.unlink() + deleted.append(str(project_path)) + except Exception as e: + logger.warning("Failed to delete project state file: {error}", error=e) + + if deleted: + logger.info("Deleted state files: {files}", files=deleted) diff --git a/src/kimi_cli/soul/slash.py b/src/kimi_cli/soul/slash.py index a2b469762..245d2520f 100644 --- a/src/kimi_cli/soul/slash.py +++ b/src/kimi_cli/soul/slash.py @@ -327,7 +327,10 @@ async def rules(soul: KimiSoul, args: str): case "reset": await _rules_reset(registry, subargs) case _: - wire_send(TextPart(text=f"Unknown /rules subcommand: {subcmd}\nUsage: /rules [list|show|on|off|reset]")) + wire_send(TextPart( + text=f"Unknown /rules subcommand: {subcmd}\n" + f"Usage: /rules [list|show|on|off|reset]" + )) async def _rules_list(registry, args: str) -> None: @@ -337,7 +340,12 @@ async def _rules_list(registry, args: str) -> None: rules = registry.get_all_rules() if not rules: - wire_send(TextPart(text="No rules found. Rules are loaded from:\n - ~/.config/agents/rules/\n - .agents/rules/ (project)\n - Built-in defaults")) + wire_send(TextPart( + text="No rules found. Rules are loaded from:\n" + " - ~/.config/agents/rules/\n" + " - .agents/rules/ (project)\n" + " - Built-in defaults" + )) return # Group by level @@ -381,7 +389,6 @@ async def _rules_list(registry, args: str) -> None: lines.append("") first_rule = False - status = "✓" if enabled else "✗" state = registry._state.get(rule.id) is_pinned = state is not None and state.pinned @@ -397,7 +404,9 @@ async def _rules_list(registry, args: str) -> None: lines.append(f" {prefix}{desc}") lines.append("") - lines.append("Tip: Use `/rules show ` to view rule content, `/rules on|off ` to toggle.") + lines.append( + "Tip: Use `/rules show ` to view rule content, `/rules on|off ` to toggle." + ) if not show_disabled: lines.append(" Use `/rules list --all` to show disabled rules.") wire_send(TextPart(text="\n".join(lines))) @@ -406,7 +415,9 @@ async def _rules_list(registry, args: str) -> None: async def _rules_show(registry, rule_id: str) -> None: """Show rule content.""" if not rule_id: - wire_send(TextPart(text="Usage: /rules show \nExample: /rules show common/coding-style")) + wire_send(TextPart( + text="Usage: /rules show \nExample: /rules show common/coding-style" + )) return # Try to find rule with various ID formats @@ -420,7 +431,9 @@ async def _rules_show(registry, rule_id: str) -> None: break if not rule: - wire_send(TextPart(text=f"Rule not found: {rule_id}\nUse `/rules list` to see available rules.")) + wire_send(TextPart( + text=f"Rule not found: {rule_id}\nUse `/rules list` to see available rules." + )) return status = "enabled" if registry.is_enabled(rule.id) else "disabled" @@ -450,7 +463,10 @@ async def _rules_toggle(registry, rule_id: str, enabled: bool) -> None: """Enable or disable a rule.""" if not rule_id: action = "enable" if enabled else "disable" - wire_send(TextPart(text=f"Usage: /rules {'on' if enabled else 'off'} \nExample: /rules {'on' if enabled else 'off'} common/coding-style")) + cmd = "on" if enabled else "off" + wire_send(TextPart( + text=f"Usage: /rules {cmd} \nExample: /rules {cmd} common/coding-style" + )) return # Support wildcards @@ -458,7 +474,9 @@ async def _rules_toggle(registry, rule_id: str, enabled: bool) -> None: matched = 0 for rule in registry.get_all_rules(): import fnmatch - if fnmatch.fnmatch(rule.id, rule_id) or fnmatch.fnmatch(f"{rule.level}/{rule.id}", rule_id): + if fnmatch.fnmatch(rule.id, rule_id) or fnmatch.fnmatch( + f"{rule.level}/{rule.id}", rule_id + ): registry.toggle(rule.id, enabled) matched += 1 @@ -489,7 +507,9 @@ async def _rules_toggle(registry, rule_id: str, enabled: bool) -> None: # Refresh system prompt to reflect changes wire_send(TextPart(text="Rules will take effect on the next turn.")) else: - wire_send(TextPart(text=f"Rule not found: {rule_id}\nUse `/rules list` to see available rules.")) + wire_send(TextPart( + text=f"Rule not found: {rule_id}\nUse `/rules list` to see available rules." + )) async def _rules_reset(registry, args: str) -> None: @@ -497,10 +517,14 @@ async def _rules_reset(registry, args: str) -> None: hard = "--hard" in args registry.reset_to_defaults() - await registry.save_states() - - msg = "Rules reset to defaults." + if hard: - msg += " All custom states cleared." + # Hard reset: delete state files entirely + await registry.delete_state_files() + msg = "Rules reset to defaults. All custom states cleared and state files deleted." + else: + # Soft reset: save empty states (rules revert to defaults on next load) + await registry.save_states() + msg = "Rules reset to defaults." wire_send(TextPart(text=msg)) From 58369aa6a693587c7a0ee2ce0d8daa2aeafc5eb6 Mon Sep 17 00:00:00 2001 From: 041 <381151237@qq.com> Date: Tue, 7 Apr 2026 12:00:53 +0800 Subject: [PATCH 5/7] test(rules): add tests for state management and priority - Test level separation of user/project states - Test project state priority over user state - Test empty states delete files - Test delete_state_files functionality --- tests/core/test_rules.py | 118 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 113 insertions(+), 5 deletions(-) diff --git a/tests/core/test_rules.py b/tests/core/test_rules.py index 4b12bca38..f39a99bd3 100644 --- a/tests/core/test_rules.py +++ b/tests/core/test_rules.py @@ -3,14 +3,13 @@ from __future__ import annotations from pathlib import Path -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import MagicMock import pytest -import yaml from kimi_cli.rules.discovery import discover_rule_files, resolve_rules_roots from kimi_cli.rules.injector import RulesInjector -from kimi_cli.rules.models import Rule, RuleMetadata, RuleState, RulesStats +from kimi_cli.rules.models import Rule, RuleMetadata, RuleState from kimi_cli.rules.parser import parse_frontmatter, parse_rule_file, should_apply_rule from kimi_cli.rules.registry import RulesRegistry from kimi_cli.rules.state import RulesStateManager @@ -243,6 +242,115 @@ async def test_clear_states(self, state_manager: RulesStateManager): assert state_manager.get_state("common/style") is None assert state_manager.get_state("python/style") is None + + async def test_level_separation(self, state_manager: RulesStateManager, tmp_path: Path): + """States are saved to separate files based on rule level.""" + from pathlib import Path + + # Create .agents directory for project-level state + agents_dir = Path(tmp_path / ".agents") + agents_dir.mkdir(exist_ok=True) + + # Set states with different levels + builtin_state = RuleState(enabled=True, pinned=True) + user_state = RuleState(enabled=True, pinned=False) + project_state = RuleState(enabled=False, pinned=True) + + state_manager.set_state("builtin/rule", builtin_state, level="builtin") + state_manager.set_state("user/custom", user_state, level="user") + state_manager.set_state("project/local", project_state, level="project") + + await state_manager.save() + + # Verify user-level file contains builtin and user rules + assert state_manager.user_state_path.exists() + user_content = state_manager.user_state_path.read_text() + assert "builtin/rule" in user_content + assert "user/custom" in user_content + assert "project/local" not in user_content # Project rule not in user file + + # Verify project-level file contains only project rules + project_path = tmp_path / ".agents" / "rules.state.toml" + assert project_path.exists() + project_content = project_path.read_text() + assert "project/local" in project_content + assert "builtin/rule" not in project_content # Builtin rule not in project file + assert "user/custom" not in project_content # User rule not in project file + + async def test_project_state_priority(self, state_manager: RulesStateManager, tmp_path: Path): + """Project-level states take precedence over user-level states.""" + from pathlib import Path + + # Create .agents directory + agents_dir = Path(tmp_path / ".agents") + agents_dir.mkdir(exist_ok=True) + + # Set same rule in both levels (user disabled, project enabled) + state_manager.set_state("shared/rule", RuleState(enabled=False), level="user") + state_manager.set_state("shared/rule", RuleState(enabled=True), level="project") + + # Project-level should take precedence + loaded_state = state_manager.get_state("shared/rule") + assert loaded_state is not None + assert loaded_state.enabled is True # Project level wins + + # After clearing project states, should fall back to user state + state_manager.clear_states(level="project") + loaded_state = state_manager.get_state("shared/rule") + assert loaded_state is not None + assert loaded_state.enabled is False # Falls back to user level (disabled) + + async def test_empty_states_delete_files(self, state_manager: RulesStateManager, tmp_path: Path): + """Empty states should delete state files, not leave stale data.""" + from pathlib import Path + + # Create .agents directory for project-level state + agents_dir = Path(tmp_path / ".agents") + agents_dir.mkdir(exist_ok=True) + + # Set some states and save + state_manager.set_state("user/rule", RuleState(enabled=False), level="user") + state_manager.set_state("project/rule", RuleState(enabled=True), level="project") + await state_manager.save() + + # Verify files exist + assert state_manager.user_state_path.exists() + assert (tmp_path / ".agents" / "rules.state.toml").exists() + + # Clear all states and save again + state_manager.clear_states() + await state_manager.save() + + # Files should be deleted (no empty states to persist) + assert not state_manager.user_state_path.exists() + assert not (tmp_path / ".agents" / "rules.state.toml").exists() + + async def test_delete_state_files(self, state_manager: RulesStateManager, tmp_path: Path): + """delete_state_files should remove state files from disk.""" + from pathlib import Path + + # Create .agents directory + agents_dir = Path(tmp_path / ".agents") + agents_dir.mkdir(exist_ok=True) + + # Set states and save + state_manager.set_state("user/rule", RuleState(), level="user") + state_manager.set_state("project/rule", RuleState(), level="project") + await state_manager.save() + + # Verify files exist + assert state_manager.user_state_path.exists() + assert (tmp_path / ".agents" / "rules.state.toml").exists() + + # Delete only user state file + await state_manager.delete_state_files(level="user") + assert not state_manager.user_state_path.exists() + assert (tmp_path / ".agents" / "rules.state.toml").exists() + + # Delete all state files + await state_manager.delete_state_files() + assert not (tmp_path / ".agents" / "rules.state.toml").exists() + class TestRulesRegistry: """Test rules registry.""" @@ -352,8 +460,8 @@ def test_toggle_persists_state(self, registry: RulesRegistry): assert state.pinned is True assert state.last_modified is not None - # Check persistence was called - registry.state_manager.set_state.assert_called_once_with("test/rule", state) + # Check persistence was called with level info + registry.state_manager.set_state.assert_called_once_with("test/rule", state, level="builtin") def test_get_stats(self, registry: RulesRegistry): """Get statistics about rules.""" From 172415ce386f87b3f4abf689169557b5c9b1c376 Mon Sep 17 00:00:00 2001 From: 041 <381151237@qq.com> Date: Tue, 7 Apr 2026 14:14:45 +0800 Subject: [PATCH 6/7] fix(rules): fix critical bugs from PR review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix KaosPath.to_path() → unsafe_to_local_path() (was causing all rule loading to fail) - Fix max_total_size=0 to mean unlimited (was incorrectly falling back to default) - Add max_active_rules enforcement (was not being applied) - Remove /rules from shell mode documentation (command only available in agent mode) --- docs/en/guides/interaction.md | 2 +- docs/en/reference/slash-commands.md | 2 +- docs/zh/guides/interaction.md | 2 +- src/kimi_cli/rules/injector.py | 4 ++-- src/kimi_cli/rules/registry.py | 4 ++-- src/kimi_cli/soul/agent.py | 18 +++++++++++++++--- 6 files changed, 22 insertions(+), 10 deletions(-) diff --git a/docs/en/guides/interaction.md b/docs/en/guides/interaction.md index fb18f2caf..071687245 100644 --- a/docs/en/guides/interaction.md +++ b/docs/en/guides/interaction.md @@ -19,7 +19,7 @@ $ git status $ npm run build ``` -Shell mode also supports some slash commands, including `/help`, `/exit`, `/version`, `/editor`, `/changelog`, `/feedback`, `/export`, `/import`, `/rules`, and `/task`. +Shell mode also supports some slash commands, including `/help`, `/exit`, `/version`, `/editor`, `/changelog`, `/feedback`, `/export`, `/import`, and `/task`. ::: warning Note In shell mode, each command executes independently. Commands that change the environment like `cd` or `export` won't affect subsequent commands. diff --git a/docs/en/reference/slash-commands.md b/docs/en/reference/slash-commands.md index d6b920db3..c96eb9505 100644 --- a/docs/en/reference/slash-commands.md +++ b/docs/en/reference/slash-commands.md @@ -3,7 +3,7 @@ Slash commands are built-in commands for Kimi Code CLI, used to control sessions, configuration, and debugging. Enter a command starting with `/` in the input box to trigger. ::: tip Shell mode -Some slash commands are also available in shell mode, including `/help`, `/exit`, `/version`, `/editor`, `/theme`, `/changelog`, `/feedback`, `/export`, `/import`, `/rules`, and `/task`. +Some slash commands are also available in shell mode, including `/help`, `/exit`, `/version`, `/editor`, `/theme`, `/changelog`, `/feedback`, `/export`, `/import`, and `/task`. ::: ## Help and info diff --git a/docs/zh/guides/interaction.md b/docs/zh/guides/interaction.md index 83bea082a..cedff60ef 100644 --- a/docs/zh/guides/interaction.md +++ b/docs/zh/guides/interaction.md @@ -19,7 +19,7 @@ $ git status $ npm run build ``` -Shell 模式也支持部分斜杠命令,包括 `/help`、`/exit`、`/version`、`/editor`、`/changelog`、`/feedback`、`/export`、`/import`、`/rules` 和 `/task`。 +Shell 模式也支持部分斜杠命令,包括 `/help`、`/exit`、`/version`、`/editor`、`/changelog`、`/feedback`、`/export`、`/import` 和 `/task`。 ::: warning 注意 Shell 模式中每个命令独立执行,`cd`、`export` 等改变环境的命令不会影响后续命令。 diff --git a/src/kimi_cli/rules/injector.py b/src/kimi_cli/rules/injector.py index ce3044028..73be0942c 100644 --- a/src/kimi_cli/rules/injector.py +++ b/src/kimi_cli/rules/injector.py @@ -70,9 +70,9 @@ async def load_active_rules( for rule_file in rule_files: try: rule = parse_rule_file( - rule_file.to_path(), + rule_file.unsafe_to_local_path(), level=level, - rules_root=root.to_path(), + rules_root=root.unsafe_to_local_path(), ) # Skip duplicates (higher priority wins) diff --git a/src/kimi_cli/rules/registry.py b/src/kimi_cli/rules/registry.py index d2b7bb37e..b5f14148a 100644 --- a/src/kimi_cli/rules/registry.py +++ b/src/kimi_cli/rules/registry.py @@ -83,9 +83,9 @@ async def _load_from_root(self, root: KaosPath) -> None: for rule_file in rule_files: try: rule = parse_rule_file( - rule_file.to_path(), + rule_file.unsafe_to_local_path(), level=level, - rules_root=root.to_path(), + rules_root=root.unsafe_to_local_path(), ) # Store rule (overwrites if same ID from lower priority level) diff --git a/src/kimi_cli/soul/agent.py b/src/kimi_cli/soul/agent.py index 823d52776..0f7ff8643 100644 --- a/src/kimi_cli/soul/agent.py +++ b/src/kimi_cli/soul/agent.py @@ -329,13 +329,25 @@ def _on_approval_change() -> None: # Format active rules for system prompt active_rules = rules_registry.get_active_rules() if active_rules: - max_size = config.rules.max_total_size or _RULES_MAX_BYTES + # max_total_size = 0 means unlimited + max_size = ( + _RULES_MAX_BYTES + if config.rules.max_total_size is None + else config.rules.max_total_size + ) + max_rules = config.rules.max_active_rules lines = [] total_size = 0 - for rule in sorted(active_rules, key=lambda r: r.metadata.priority): + for rule_count, rule in enumerate( + sorted(active_rules, key=lambda r: r.metadata.priority) + ): + # Enforce max_active_rules limit + if rule_count >= max_rules: + break section = f"## {rule.name}\n\n{rule.content}\n\n" section_size = len(section.encode("utf-8")) - if total_size + section_size > max_size: + # Enforce max_total_size limit (0 = unlimited) + if max_size > 0 and total_size + section_size > max_size: break lines.append(section) total_size += section_size From 595cdebcf18075a2e649d3766afaa45c75a74639 Mon Sep 17 00:00:00 2001 From: 041 <381151237@qq.com> Date: Tue, 7 Apr 2026 14:43:26 +0800 Subject: [PATCH 7/7] fix(rules): use glob instead of rglob for KaosPath compatibility --- src/kimi_cli/rules/registry.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/kimi_cli/rules/registry.py b/src/kimi_cli/rules/registry.py index b5f14148a..c83daa1ba 100644 --- a/src/kimi_cli/rules/registry.py +++ b/src/kimi_cli/rules/registry.py @@ -185,7 +185,8 @@ async def _any_file_exists(self, pattern: str) -> bool: import fnmatch try: - async for item in self.work_dir.rglob("*"): + # KaosPath has glob() but not rglob(), use **/* for recursive + async for item in self.work_dir.glob("**/*"): if fnmatch.fnmatch(item.name, pattern): return True except Exception: