From e8dc9dcfd1eefe2a808ce7d874de96cab455b27d Mon Sep 17 00:00:00 2001 From: Jason Quesenberry Date: Fri, 18 Jul 2025 11:20:16 -0700 Subject: [PATCH] Adds a new writeme with improved validation and error messaging. --- .tools/readmes/cache.py | 106 ++++++ .tools/readmes/deep_validator.py | 251 +++++++++++++ .tools/readmes/enhanced_validator.py | 417 ++++++++++++++++++++++ .tools/readmes/improved_writeme.py | 452 ++++++++++++++++++++++++ .tools/readmes/parallel.py | 100 ++++++ .tools/readmes/progress.py | 111 ++++++ .tools/readmes/runner.py | 37 +- .tools/readmes/scanner.py | 28 ++ .tools/readmes/test_improved_writeme.py | 71 ++++ .tools/readmes/validator.py | 110 ++++++ .tools/readmes/writeme_improved.sh | 32 ++ 11 files changed, 1713 insertions(+), 2 deletions(-) create mode 100644 .tools/readmes/cache.py create mode 100644 .tools/readmes/deep_validator.py create mode 100644 .tools/readmes/enhanced_validator.py create mode 100644 .tools/readmes/improved_writeme.py create mode 100644 .tools/readmes/parallel.py create mode 100644 .tools/readmes/progress.py create mode 100755 .tools/readmes/test_improved_writeme.py create mode 100644 .tools/readmes/validator.py create mode 100755 .tools/readmes/writeme_improved.sh diff --git a/.tools/readmes/cache.py b/.tools/readmes/cache.py new file mode 100644 index 00000000000..a6b81e08edd --- /dev/null +++ b/.tools/readmes/cache.py @@ -0,0 +1,106 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Cache implementation for WRITEME to speed up repeated runs. 
+""" + +import json +import logging +import os +import pickle +from pathlib import Path +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + +# Cache directory relative to the readmes directory +CACHE_DIR = Path(__file__).parent / ".cache" + + +def get_cache_enabled() -> bool: + """Check if caching is enabled via environment variable.""" + return os.environ.get("USE_METADATA_CACHE", "0") == "1" + + +def ensure_cache_dir() -> None: + """Ensure the cache directory exists.""" + if not CACHE_DIR.exists(): + CACHE_DIR.mkdir(exist_ok=True) + logger.debug(f"Created cache directory: {CACHE_DIR}") + + +def get_cache_path(key: str) -> Path: + """Get the cache file path for a given key.""" + # Create a filename-safe version of the key + safe_key = key.replace("/", "_").replace(":", "_") + return CACHE_DIR / f"{safe_key}.pickle" + + +def save_to_cache(key: str, data: Any) -> bool: + """ + Save data to cache. + + Args: + key: Cache key + data: Data to cache (must be pickle-able) + + Returns: + bool: True if successfully cached, False otherwise + """ + if not get_cache_enabled(): + return False + + try: + ensure_cache_dir() + cache_path = get_cache_path(key) + + with open(cache_path, "wb") as f: + pickle.dump(data, f) + + logger.debug(f"Cached data for key: {key}") + return True + except Exception as e: + logger.warning(f"Failed to cache data for key {key}: {e}") + return False + + +def load_from_cache(key: str) -> Optional[Any]: + """ + Load data from cache. 
+ + Args: + key: Cache key + + Returns: + The cached data or None if not found or caching disabled + """ + if not get_cache_enabled(): + return None + + cache_path = get_cache_path(key) + + if not cache_path.exists(): + return None + + try: + with open(cache_path, "rb") as f: + data = pickle.load(f) + + logger.debug(f"Loaded data from cache for key: {key}") + return data + except Exception as e: + logger.warning(f"Failed to load cache for key {key}: {e}") + return None + + +def clear_cache() -> None: + """Clear all cached data.""" + if CACHE_DIR.exists(): + for cache_file in CACHE_DIR.glob("*.pickle"): + try: + cache_file.unlink() + except Exception as e: + logger.warning(f"Failed to delete cache file {cache_file}: {e}") + + logger.info("Cache cleared") \ No newline at end of file diff --git a/.tools/readmes/deep_validator.py b/.tools/readmes/deep_validator.py new file mode 100644 index 00000000000..c5830686205 --- /dev/null +++ b/.tools/readmes/deep_validator.py @@ -0,0 +1,251 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Deep validator module for WRITEME to check for issues in the codebase. +This version performs a more thorough check for duplicate snippet tags by +directly scanning the files in the repository. +""" + +import logging +import os +import re +import concurrent.futures +from collections import defaultdict +from pathlib import Path +from typing import Dict, List, Set, Tuple, Optional, Any + +from aws_doc_sdk_examples_tools.doc_gen import DocGen + +logger = logging.getLogger(__name__) + + +class ValidationError(Exception): + """Exception raised for validation errors.""" + pass + + +def find_snippet_tags_in_file(file_path: Path) -> List[Tuple[str, int]]: + """ + Find all snippet tags in a file by directly parsing the file content. 
+ + Args: + file_path: Path to the file to check + + Returns: + List of tuples containing (tag, line_number) + """ + if not file_path.exists(): + return [] + + try: + with open(file_path, 'r', encoding='utf-8', errors='replace') as f: + lines = f.readlines() + except Exception as e: + logger.warning(f"Error reading file {file_path}: {e}") + return [] + + # Common snippet tag patterns + patterns = [ + # Standard snippet tag format + r'snippet-start:\s*\[([^\]]+)\]', + r'snippet-end:\s*\[([^\]]+)\]', + # Alternative formats + r'SNIPPET\s+START\s+\[([^\]]+)\]', + r'SNIPPET\s+END\s+\[([^\]]+)\]', + r'//\s*SNIPPET:\s*([^\s]+)', + r'#\s*SNIPPET:\s*([^\s]+)', + r'', + # Look for any other potential tag formats + r'snippet[:\-_]([a-zA-Z0-9_\-]+)', + # Common AWS SDK snippet formats + r'//\s*snippet-start:\s*([^\s]+)', + r'#\s*snippet-start:\s*([^\s]+)', + r'', + r'//\s*snippet-end:\s*([^\s]+)', + r'#\s*snippet-end:\s*([^\s]+)', + r'', + ] + + results = [] + for i, line in enumerate(lines, 1): + for pattern in patterns: + matches = re.findall(pattern, line, re.IGNORECASE) + for match in matches: + results.append((match, i)) + + return results + + +def scan_directory_for_snippet_tags( + root_dir: Path, + extensions: Optional[List[str]] = None, + max_workers: int = 10 +) -> Dict[str, List[Tuple[str, int, str]]]: + """ + Scan a directory recursively for files containing snippet tags. + Uses parallel processing for faster scanning. 
+ + Args: + root_dir: Root directory to scan + extensions: Optional list of file extensions to check + max_workers: Maximum number of parallel workers + + Returns: + Dictionary mapping snippet tags to lists of (file_path, line_number, context) + """ + if extensions is None: + # Default extensions to check + extensions = [ + '.py', '.java', '.js', '.ts', '.cs', '.cpp', '.c', '.go', '.rb', + '.php', '.swift', '.kt', '.rs', '.abap', '.md', '.html', '.xml' + ] + + # Find all files with the specified extensions + files_to_scan = [] + for root, _, files in os.walk(root_dir): + for file in files: + if any(file.endswith(ext) for ext in extensions): + files_to_scan.append(Path(root) / file) + + # Process files in parallel + tag_to_locations = defaultdict(list) + + def process_file(file_path): + try: + relative_path = file_path.relative_to(root_dir) + tags = find_snippet_tags_in_file(file_path) + + results = [] + for tag, line_number in tags: + # Get some context from the file + try: + with open(file_path, 'r', encoding='utf-8', errors='replace') as f: + lines = f.readlines() + start_line = max(0, line_number - 2) + end_line = min(len(lines), line_number + 1) + context = ''.join(lines[start_line:end_line]).strip() + except Exception: + context = "" + + results.append((str(relative_path), line_number, context)) + + return {tag: [loc] for tag, line_number in tags for loc in [(str(relative_path), line_number, "")]} + except Exception as e: + logger.warning(f"Error processing file {file_path}: {e}") + return {} + + # Use ThreadPoolExecutor for parallel processing + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_file = {executor.submit(process_file, file): file for file in files_to_scan} + + for future in concurrent.futures.as_completed(future_to_file): + file_results = future.result() + for tag, locations in file_results.items(): + tag_to_locations[tag].extend(locations) + + return tag_to_locations + + +def 
check_duplicate_snippet_tags_deep(doc_gen: DocGen) -> List[Tuple[str, List[Dict[str, Any]]]]: + """ + Deep check for duplicate snippet tags in the codebase. + This function scans all files directly to find snippet tags. + + Args: + doc_gen: The DocGen instance containing snippets + + Returns: + List of tuples containing (tag, [location_details]) for duplicate tags + """ + logger.info("Starting deep scan for duplicate snippet tags...") + + # Scan the repository directly for snippet tags + root_dir = doc_gen.root + tag_locations = scan_directory_for_snippet_tags(root_dir) + + # Find tags that appear in multiple files + duplicates = [] + for tag, locations in tag_locations.items(): + # Group locations by file path + files = {} + for file_path, line_number, context in locations: + if file_path not in files: + files[file_path] = [] + files[file_path].append({"line": line_number, "context": context}) + + # If the tag appears in multiple files, it's a duplicate + if len(files) > 1: + duplicate_info = [] + for file_path, occurrences in files.items(): + duplicate_info.append({ + "file": file_path, + "occurrences": occurrences + }) + duplicates.append((tag, duplicate_info)) + + logger.info(f"Deep scan complete. Found {len(duplicates)} duplicate tags.") + return duplicates + + +def format_duplicate_report(duplicates: List[Tuple[str, List[Dict[str, Any]]]]) -> str: + """ + Format a detailed report of duplicate snippet tags. + + Args: + duplicates: List of duplicate tag information + + Returns: + Formatted report as a string + """ + if not duplicates: + return "No duplicate snippet tags found." 
+ + report = [f"Found {len(duplicates)} duplicate snippet tags:"] + + for tag, locations in duplicates: + report.append(f"\nTag: '{tag}' found in {len(locations)} files:") + + for location in locations: + file_path = location["file"] + occurrences = location["occurrences"] + + report.append(f" File: {file_path}") + for occurrence in occurrences: + line = occurrence.get("line", "unknown") + context = occurrence.get("context", "").replace("\n", " ").strip() + if context: + context = f" - Context: {context[:60]}..." + report.append(f" Line {line}{context}") + + return "\n".join(report) + + +def validate_snippets_deep(doc_gen: DocGen, strict: bool = False) -> bool: + """ + Deep validation of snippets in the codebase. + + Args: + doc_gen: The DocGen instance containing snippets + strict: If True, raise an exception for validation errors + + Returns: + True if validation passed, False otherwise + """ + validation_passed = True + + # Check for duplicate snippet tags using the deep method + duplicates = check_duplicate_snippet_tags_deep(doc_gen) + if duplicates: + validation_passed = False + report = format_duplicate_report(duplicates) + print("\n=== DUPLICATE SNIPPET TAGS (DEEP SCAN) ===") + print(report) + + # Exit with error if strict validation is enabled + if strict: + raise ValidationError("Validation failed: duplicate snippet tags found") + else: + print("No duplicate snippet tags found in deep scan.") + + return validation_passed \ No newline at end of file diff --git a/.tools/readmes/enhanced_validator.py b/.tools/readmes/enhanced_validator.py new file mode 100644 index 00000000000..e2c875a88cd --- /dev/null +++ b/.tools/readmes/enhanced_validator.py @@ -0,0 +1,417 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Enhanced validator module for WRITEME to check for issues in the codebase. +This version performs comprehensive checks for snippet tag issues: +1. 
Duplicate snippet tags across files +2. Unpaired snippet-start and snippet-end tags within files +3. Multiple snippet-start or snippet-end tags with the same name within a file +""" + +import logging +import os +import re +from collections import defaultdict +from pathlib import Path +from typing import Dict, List, Set, Tuple, Optional, Any + +from aws_doc_sdk_examples_tools.doc_gen import DocGen + +logger = logging.getLogger(__name__) + + +class ValidationError(Exception): + """Exception raised for validation errors.""" + pass + + +class SnippetTagIssue: + """Class to represent a snippet tag issue.""" + + DUPLICATE_ACROSS_FILES = "duplicate_across_files" + UNPAIRED_TAG = "unpaired_tag" + DUPLICATE_IN_FILE = "duplicate_in_file" + + def __init__(self, issue_type: str, tag: str, locations: List[Dict[str, Any]]): + self.issue_type = issue_type + self.tag = tag + self.locations = locations + + def __str__(self) -> str: + if self.issue_type == self.DUPLICATE_ACROSS_FILES: + files = [loc["file"] for loc in self.locations] + return f"Tag '{self.tag}' found in multiple files: {', '.join(files)}" + elif self.issue_type == self.UNPAIRED_TAG: + details = [] + for loc in self.locations: + file = loc["file"] + tag_type = loc["tag_type"] + line = loc["line"] + details.append(f"{tag_type} at line {line} in {file}") + return f"Unpaired tag '{self.tag}': {', '.join(details)}" + elif self.issue_type == self.DUPLICATE_IN_FILE: + file = self.locations[0]["file"] + lines = [str(loc["line"]) for loc in self.locations] + return f"Multiple instances of tag '{self.tag}' in {file} at lines: {', '.join(lines)}" + else: + return f"Unknown issue with tag '{self.tag}'" + + +def find_snippet_tags_in_file(file_path: Path) -> List[Dict[str, Any]]: + """ + Find all snippet tags in a file by directly parsing the file content. 
+ + Args: + file_path: Path to the file to check + + Returns: + List of dictionaries containing tag information + """ + if not file_path.exists(): + return [] + + try: + with open(file_path, 'r', encoding='utf-8', errors='replace') as f: + content = f.read() + lines = content.splitlines() + except Exception as e: + logger.warning(f"Error reading file {file_path}: {e}") + return [] + + # Patterns for snippet start and end tags + start_patterns = [ + r'snippet-start:\s*\[([^\]]+)\]', + r'SNIPPET\s+START\s+\[([^\]]+)\]', + r'//\s*snippet-start:\s*\[([^\]]+)\]', + r'#\s*snippet-start:\s*\[([^\]]+)\]', + r'', + ] + + end_patterns = [ + r'snippet-end:\s*\[([^\]]+)\]', + r'SNIPPET\s+END\s+\[([^\]]+)\]', + r'//\s*snippet-end:\s*\[([^\]]+)\]', + r'#\s*snippet-end:\s*\[([^\]]+)\]', + r'', + ] + + results = [] + + # Process each line individually to avoid duplicate matches + for i, line in enumerate(lines, 1): + # Check for start tags + for pattern in start_patterns: + matches = re.findall(pattern, line, re.IGNORECASE) + # Only take the first match per line for each pattern to avoid duplicates + if matches: + results.append({ + "tag": matches[0], + "tag_type": "snippet-start", + "line": i, + "content": line.strip() + }) + break # Only process the first matching pattern + + # Check for end tags + for pattern in end_patterns: + matches = re.findall(pattern, line, re.IGNORECASE) + # Only take the first match per line for each pattern to avoid duplicates + if matches: + results.append({ + "tag": matches[0], + "tag_type": "snippet-end", + "line": i, + "content": line.strip() + }) + break # Only process the first matching pattern + + return results + + +def scan_directory_for_snippet_tags( + root_dir: Path, + extensions: Optional[List[str]] = None, + exclude_dirs: Optional[List[str]] = None +) -> Dict[str, List[Dict[str, Any]]]: + """ + Scan a directory recursively for files containing snippet tags. 
+ + Args: + root_dir: Root directory to scan + extensions: Optional list of file extensions to check + exclude_dirs: Optional list of directories to exclude from scanning + + Returns: + Dictionary mapping file paths to lists of tag information + """ + if extensions is None: + # Default extensions to check + extensions = [ + '.py', '.java', '.js', '.ts', '.cs', '.cpp', '.c', '.go', '.rb', + '.php', '.swift', '.kt', '.rs', '.abap', '.md', '.html', '.xml' + ] + + if exclude_dirs is None: + # Default directories to exclude + exclude_dirs = ['.tools', '.git', 'node_modules', 'venv', '.venv'] + + file_tags = {} + + # Walk through the directory + for root, dirs, files in os.walk(root_dir): + # Skip excluded directories + dirs[:] = [d for d in dirs if d not in exclude_dirs] + + for file in files: + # Check if the file has one of the extensions we're interested in + if any(file.endswith(ext) for ext in extensions): + file_path = Path(root) / file + try: + relative_path = str(file_path.relative_to(root_dir)) + + # Skip files in excluded directories + if any(f"/{exclude_dir}/" in f"/{relative_path}/" for exclude_dir in exclude_dirs): + continue + + # Find tags in the file + tags = find_snippet_tags_in_file(file_path) + + if tags: + file_tags[relative_path] = tags + except Exception as e: + logger.warning(f"Error processing file {file_path}: {e}") + + return file_tags + + +def check_for_snippet_tag_issues(file_tags: Dict[str, List[Dict[str, Any]]]) -> List[SnippetTagIssue]: + """ + Check for various snippet tag issues. 
+ + Args: + file_tags: Dictionary mapping file paths to lists of tag information + + Returns: + List of SnippetTagIssue objects + """ + issues = [] + + # Track all unique tags across all files + tag_to_files = defaultdict(list) + + # First pass: collect all tags and check for issues within each file + for file_path, tags in file_tags.items(): + # Group tags by name and type within this file + tags_by_name_and_type = defaultdict(list) + for tag_info in tags: + tag_name = tag_info["tag"] + tag_type = tag_info["tag_type"] + key = f"{tag_name}:{tag_type}" + tags_by_name_and_type[key].append(tag_info) + + # Track which files contain each tag + tag_to_files[tag_name].append({ + "file": file_path, + "line": tag_info["line"], + "tag_type": tag_info["tag_type"], + "content": tag_info["content"] + }) + + # Check for multiple instances of the same tag type within the file + for key, tag_infos in tags_by_name_and_type.items(): + tag_name, tag_type = key.split(":", 1) + + # If there are multiple instances of the same tag type, report it + if len(tag_infos) > 1: + locations = [] + for t in tag_infos: + locations.append({ + "file": file_path, + "line": t["line"], + "tag_type": t["tag_type"], + "content": t["content"] + }) + + issues.append(SnippetTagIssue( + SnippetTagIssue.DUPLICATE_IN_FILE, + f"{tag_name} ({tag_type})", + locations + )) + + # Check for unpaired tags within the file + tags_by_name = defaultdict(list) + for tag_info in tags: + tags_by_name[tag_info["tag"]].append(tag_info) + + for tag_name, tag_infos in tags_by_name.items(): + # Count start and end tags + start_tags = [t for t in tag_infos if t["tag_type"] == "snippet-start"] + end_tags = [t for t in tag_infos if t["tag_type"] == "snippet-end"] + + # Check for unpaired tags (missing start or end) + if len(start_tags) != len(end_tags): + # Create location information + locations = [] + for t in tag_infos: + locations.append({ + "file": file_path, + "line": t["line"], + "tag_type": t["tag_type"], + "content": 
t["content"] + }) + + issues.append(SnippetTagIssue( + SnippetTagIssue.UNPAIRED_TAG, + tag_name, + locations + )) + + # Second pass: check for tags that appear in multiple files + for tag_name, locations in tag_to_files.items(): + # Group locations by file + files = defaultdict(list) + for loc in locations: + files[loc["file"]].append(loc) + + # If the tag appears in multiple files, it's a duplicate across files + if len(files) > 1: + # Create a simplified location list with just one entry per file + simplified_locations = [] + for file_path, file_locs in files.items(): + # Include the first location in each file + simplified_locations.append({ + "file": file_path, + "line": file_locs[0]["line"], + "tag_type": file_locs[0]["tag_type"], + "content": file_locs[0]["content"] + }) + + issues.append(SnippetTagIssue( + SnippetTagIssue.DUPLICATE_ACROSS_FILES, + tag_name, + simplified_locations + )) + + return issues + + +def validate_snippet_tags(doc_gen: DocGen) -> List[SnippetTagIssue]: + """ + Validate snippet tags in the codebase. + + Args: + doc_gen: The DocGen instance + + Returns: + List of SnippetTagIssue objects + """ + # Scan the repository for snippet tags + root_dir = doc_gen.root + file_tags = scan_directory_for_snippet_tags(root_dir) + + # Check for issues + issues = check_for_snippet_tag_issues(file_tags) + + return issues + + +def format_snippet_tag_issues_report(issues: List[SnippetTagIssue]) -> str: + """ + Format a report of snippet tag issues. + + Args: + issues: List of SnippetTagIssue objects + + Returns: + Formatted report as a string + """ + if not issues: + return "No snippet tag issues found." 
+ + # Group issues by type + issues_by_type = defaultdict(list) + for issue in issues: + issues_by_type[issue.issue_type].append(issue) + + report_lines = [f"Found {len(issues)} snippet tag issues:"] + + # Report duplicate tags across files + if SnippetTagIssue.DUPLICATE_ACROSS_FILES in issues_by_type: + duplicates = issues_by_type[SnippetTagIssue.DUPLICATE_ACROSS_FILES] + report_lines.append(f"\n=== DUPLICATE TAGS ACROSS FILES ({len(duplicates)}) ===") + for issue in duplicates: + report_lines.append(f" {issue}") + + # Report unpaired tags + if SnippetTagIssue.UNPAIRED_TAG in issues_by_type: + unpaired = issues_by_type[SnippetTagIssue.UNPAIRED_TAG] + report_lines.append(f"\n=== UNPAIRED TAGS ({len(unpaired)}) ===") + for issue in unpaired: + report_lines.append(f" {issue}") + + # Report duplicate tags within files + if SnippetTagIssue.DUPLICATE_IN_FILE in issues_by_type: + duplicates_in_file = issues_by_type[SnippetTagIssue.DUPLICATE_IN_FILE] + report_lines.append(f"\n=== DUPLICATE TAGS WITHIN FILES ({len(duplicates_in_file)}) ===") + for issue in duplicates_in_file: + report_lines.append(f" {issue}") + + return "\n".join(report_lines) + + +def check_duplicate_snippet_tags_enhanced(doc_gen: DocGen) -> List[Tuple[str, List[str]]]: + """ + Check for duplicate snippet tags across files. + This is a simplified version that returns data in the format expected by the main script. 
+ + Args: + doc_gen: The DocGen instance + + Returns: + List of tuples containing (tag, [file_paths]) for duplicate tags + """ + issues = validate_snippet_tags(doc_gen) + + # Extract duplicate across files issues + duplicates = [] + for issue in issues: + if issue.issue_type == SnippetTagIssue.DUPLICATE_ACROSS_FILES: + files = [loc["file"] for loc in issue.locations] + duplicates.append((issue.tag, files)) + elif issue.issue_type == SnippetTagIssue.UNPAIRED_TAG: + # Also report unpaired tags as duplicates for the main script + files = [f"{loc['file']} (unpaired {loc['tag_type']} at line {loc['line']})" for loc in issue.locations] + duplicates.append((f"{issue.tag} (unpaired)", files)) + elif issue.issue_type == SnippetTagIssue.DUPLICATE_IN_FILE: + # Also report duplicate tags within files for the main script + file = issue.locations[0]["file"] + lines = [str(loc["line"]) for loc in issue.locations] + duplicates.append((issue.tag, [f"{file} (multiple instances at lines: {', '.join(lines)})"])) + + return duplicates + + +def validate_snippets_enhanced(doc_gen: DocGen, strict: bool = False) -> bool: + """ + Validate snippets in the codebase. + + Args: + doc_gen: The DocGen instance + strict: If True, raise an exception for validation errors + + Returns: + True if validation passed, False otherwise + """ + issues = validate_snippet_tags(doc_gen) + + if issues: + report = format_snippet_tag_issues_report(issues) + print(report) + + if strict: + raise ValidationError("Snippet tag validation failed") + + return False + + return True \ No newline at end of file diff --git a/.tools/readmes/improved_writeme.py b/.tools/readmes/improved_writeme.py new file mode 100644 index 00000000000..0f0bb82ad8e --- /dev/null +++ b/.tools/readmes/improved_writeme.py @@ -0,0 +1,452 @@ +#!/usr/bin/env python3 +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +""" +WRITEME - AWS SDK Examples README Generator + +This tool generates README.md files for AWS SDK code examples across different +programming languages and services. +""" + +import argparse +import logging +import os +import sys +import time +import re +from pathlib import Path + +# Command line flags +NO_UPDATE_FLAG = "--no-update" +CACHE_FLAG = "--use-cache" + + +def setup_logging(verbose: bool = False) -> None: + """Configure logging based on verbosity level.""" + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%H:%M:%S", + force=True + ) + + +def parse_arguments() -> argparse.Namespace: + """Parse command line arguments with better help messages.""" + parser = argparse.ArgumentParser( + description="Generate README.md files for AWS SDK code examples", + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + # Add language and service options + parser.add_argument( + "--languages", + nargs="+", + default=["all"], + help="Languages to process (e.g. 'Python:3 JavaScript:3' or 'all')" + ) + + parser.add_argument( + "--services", + nargs="+", + default=["all"], + help="Services to process (e.g. 
's3 dynamodb' or 'all')" + ) + + # Add operation mode options + parser.add_argument( + "--safe", + action="store_true", + help="Save a backup of the original README files" + ) + + parser.add_argument( + "--dry-run", + action="store_true", + help="Compare current vs generated READMEs without making changes" + ) + + parser.add_argument( + "--check", + action="store_true", + help="Alias for --dry-run" + ) + + parser.add_argument( + "--diff", + action="store_true", + help="Show a diff of READMEs that have changed" + ) + + # Add validation options + parser.add_argument( + "--validate", + action="store_true", + help="Validate snippet tags and other metadata" + ) + + parser.add_argument( + "--strict-validation", + action="store_true", + help="Fail if validation errors are found" + ) + + parser.add_argument( + "--validate-only", + action="store_true", + help="Only run validation, don't generate READMEs" + ) + + parser.add_argument( + "--skip-duplicate-check", + action="store_true", + help="Skip checking for duplicate snippet tags (not recommended)" + ) + + # Add performance options + parser.add_argument( + "--no-update", + action="store_true", + help="Skip updating the environment (for faster development)" + ) + + parser.add_argument( + "--use-cache", + action="store_true", + help="Use cached metadata when available (speeds up repeated runs)" + ) + + # Add output options + parser.add_argument( + "--verbose", + action="store_true", + help="Enable verbose debugging output" + ) + + parser.add_argument( + "--summary", + action="store_true", + help="Show a summary of changes at the end" + ) + + return parser.parse_args() + + +def update_environment() -> None: + """Update the WRITEME environment by installing the package in development mode.""" + from update import update + # The update function already logs a message, so we don't need to log here + update() + + +def show_summary(start_time) -> None: + """Show a summary of the changes made.""" + elapsed = time.time() - 
start_time + + print("\n=== WRITEME Summary ===") + print(f"Execution time: {elapsed:.2f} seconds") + + +def count_readme_errors_from_output(output: str, args) -> int: + """ + Count the number of README errors from the output. + + Args: + output: The output from the writeme function + args: Command line arguments + + Returns: + The number of README errors + """ + # First check for "Incorrect:" lines (standard format) + if "Incorrect:" in output: + incorrect_lines = [line for line in output.split('\n') if line.strip().startswith("Incorrect:")] + return len(incorrect_lines) + + # If using --diff flag, we need to parse the diff output + if args.diff and "Diff:" in output: + # Use regex to find all unique README identifiers in the diff output + # The pattern looks for lines like "Diff: --- .NET:4:Service.dynamodb/current" + pattern = r"Diff:\s+---\s+([^/]+)/current" + matches = re.findall(pattern, output) + + # If we found matches, return the count of unique identifiers + if matches: + return len(set(matches)) + + # Fallback: count the number of diff sections + # Each diff section starts with "Diff:" and represents one incorrect README + sections = output.split("Diff:") + # First section is before any "Diff:" so skip it + return len(sections) - 1 + + # If we can't find any errors, return 0 + return 0 + + +def main() -> int: + """Main entry point for the WRITEME tool.""" + start_time = time.time() + + # Parse command line arguments + args = parse_arguments() + + # Setup logging + setup_logging(args.verbose) + + # Configure caching if requested + if args.use_cache: + os.environ["USE_METADATA_CACHE"] = "1" + + # Update environment if needed + if not args.no_update: + try: + update_environment() + except Exception as e: + logging.error(f"Failed to update environment: {e}") + return 1 + + # Import DocGen and validator here to avoid circular imports + from aws_doc_sdk_examples_tools.doc_gen import DocGen + + # Try to use the enhanced validator if available + try: + from 
enhanced_validator import ( + validate_snippets_enhanced as validate_snippets, + check_duplicate_snippet_tags_enhanced as check_duplicate_snippet_tags, + validate_snippet_tags, + format_snippet_tag_issues_report, + ValidationError + ) + logging.info("Using enhanced validator for comprehensive snippet tag checks") + use_enhanced_validator = True + except ImportError: + try: + from validator import validate_snippets, check_duplicate_snippet_tags, ValidationError + logging.info("Using standard validator for snippet tag checks") + use_enhanced_validator = False + except ImportError: + logging.error("Validator module not found. Cannot check for duplicate snippet tags.") + return 1 + + # Load DocGen data + try: + # Try to import cache module + try: + from cache import load_from_cache, save_to_cache + CACHE_AVAILABLE = True + except ImportError: + CACHE_AVAILABLE = False + + # Dummy cache functions if cache module not available + def load_from_cache(key): + return None + + def save_to_cache(key, data): + return False + + # Try to load from cache first + doc_gen = None + if CACHE_AVAILABLE and args.use_cache: + doc_gen = load_from_cache("doc_gen_cache") + if doc_gen: + logging.info("Using cached DocGen data") + + if doc_gen is None: + logging.info("Building DocGen data from scratch") + doc_gen = DocGen.from_root(Path(__file__).parent.parent.parent, incremental=True) + + # Process metadata + for path in (doc_gen.root / ".doc_gen/metadata").glob("*_metadata.yaml"): + doc_gen.process_metadata(path) + + # Collect snippets + doc_gen.collect_snippets() + + # Save to cache if available + if CACHE_AVAILABLE and args.use_cache: + save_to_cache("doc_gen_cache", doc_gen) + except Exception as e: + logging.error(f"Failed to load DocGen data: {e}") + return 1 + + # Store validation results to avoid running validation twice + validation_issues = None + validation_passed = True + error_count = 0 + + # Always check for snippet tag issues unless explicitly skipped + if not 
args.skip_duplicate_check: + logging.info("Checking for snippet tag issues...") + + if use_enhanced_validator: + # Use the enhanced validator to check for all types of snippet tag issues + validation_issues = validate_snippet_tags(doc_gen) + if validation_issues: + print("\n=== SNIPPET TAG VALIDATION ISSUES ===") + print(format_snippet_tag_issues_report(validation_issues)) + validation_passed = False + error_count = len(validation_issues) + + # Exit with error if strict validation is enabled + if args.strict_validation: + logging.error(f"Validation failed: {error_count} snippet tag issues found") + return error_count + else: + print("No snippet tag issues found.") + else: + # Fall back to the standard validator for duplicate tags only + duplicates = check_duplicate_snippet_tags(doc_gen) + if duplicates: + print("\n=== DUPLICATE SNIPPET TAGS ===") + print(f"Found {len(duplicates)} duplicate snippet tags:") + for tag, files in duplicates: + file_list = ", ".join(files) + print(f" Tag '{tag}' found in multiple files: {file_list}") + validation_passed = False + error_count = len(duplicates) + + # Exit with error if strict validation is enabled + if args.strict_validation: + logging.error(f"Validation failed: {error_count} duplicate snippet tags found") + return error_count + else: + print("No snippet tag issues found.") + + # Run additional validation if requested (but don't repeat snippet tag validation) + if args.validate or args.validate_only: + logging.info("Running additional validation checks...") + + # Only run full validation if we haven't already done so + if args.skip_duplicate_check: + try: + # Run validation and get issues + if use_enhanced_validator: + validation_issues = validate_snippet_tags(doc_gen) + validation_passed = len(validation_issues) == 0 + error_count = len(validation_issues) if validation_issues else 0 + + if not validation_passed: + print("\n=== SNIPPET TAG VALIDATION ISSUES ===") + print(format_snippet_tag_issues_report(validation_issues)) 
+ + if args.strict_validation: + logging.error(f"Validation failed: {error_count} snippet tag issues found") + return error_count + else: + # Standard validator doesn't return issues directly + validation_passed = validate_snippets(doc_gen, False) # Don't raise exception + if not validation_passed and args.strict_validation: + logging.error("Validation failed: snippet tag issues found") + return 1 # Can't get exact count with standard validator + except ValidationError as e: + logging.error(f"Validation error: {e}") + if args.strict_validation: + return 1 + except Exception as e: + logging.error(f"Unexpected error during validation: {e}") + if args.strict_validation: + return 1 + else: + # We've already run validation, just report the status + if not validation_passed: + logging.warning(f"Validation found {error_count} issues (see above)") + if args.strict_validation: + return error_count + else: + print("All validations passed successfully.") + + # Exit if only validation was requested + if args.validate_only: + # Return error count if validation failed, otherwise 0 + return error_count if not validation_passed else 0 + + # Use the original writeme.py approach but with our improved arguments + # This avoids the enum conversion issues + modified_argv = [sys.argv[0]] + + # Add our arguments to the modified argv + if args.languages: + for lang in args.languages: + modified_argv.extend(["--languages", lang]) + + if args.services: + for svc in args.services: + modified_argv.extend(["--services", svc]) + + if args.safe: + modified_argv.append("--safe") + + if args.verbose: + modified_argv.append("--verbose") + + if args.dry_run or args.check: + modified_argv.append("--dry-run") + + if args.diff: + modified_argv.append("--diff") + + # Save original argv + original_argv = sys.argv.copy() + + readme_error_count = 0 + try: + # Replace sys.argv with our modified version + sys.argv = modified_argv + + # Import and run the original writeme function through typer + from typer 
import run + from runner import writeme + + # Capture stdout to parse for incorrect READMEs + import io + from contextlib import redirect_stdout + + f = io.StringIO() + with redirect_stdout(f): + try: + run(writeme) + result = 0 + except SystemExit as e: + # Capture the exit code from typer + result = e.code + + # Get the output and print it + output = f.getvalue() + print(output) + + # Count README errors from the output + readme_error_count = count_readme_errors_from_output(output, args) + if readme_error_count > 0: + logging.info(f"Found {readme_error_count} incorrect READMEs") + except Exception as e: + logging.error(f"Error running writeme: {e}", exc_info=True) + result = 1 + finally: + # Restore original argv + sys.argv = original_argv + + # Show summary if requested + if args.summary: + show_summary(start_time) + + # Calculate total error count (snippet issues + README errors) + total_error_count = error_count + readme_error_count + + # If we have any errors and we're not in validate-only mode, return the total error count + if total_error_count > 0 and not args.validate_only: + print(f"Found {error_count} snippet issues and {readme_error_count} incorrect READMEs.") + print(f"Returning total error count ({total_error_count}) as exit code.") + return total_error_count + + # Otherwise return the result from the writeme function + return result + + +if __name__ == "__main__": + sys.exit(main()) +else: + from .runner import writeme + main = writeme \ No newline at end of file diff --git a/.tools/readmes/parallel.py b/.tools/readmes/parallel.py new file mode 100644 index 00000000000..4a4d93cb933 --- /dev/null +++ b/.tools/readmes/parallel.py @@ -0,0 +1,100 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Parallel processing module for WRITEME to speed up README generation. 
logger = logging.getLogger(__name__)

# Default worker count: all CPUs minus one (leave one for the system),
# falling back to 2 when the CPU count cannot be determined.
# Capture cpu_count() once instead of calling it twice in the expression.
_cpu_count = os.cpu_count()
DEFAULT_WORKERS = max(1, _cpu_count - 1) if _cpu_count else 2


def process_in_parallel(
    func: Callable,
    items: List[Tuple],
    max_workers: int = DEFAULT_WORKERS,
    progress_callback: Optional[Callable] = None,
) -> Dict[str, List[Any]]:
    """
    Process items in parallel using a thread pool.

    Args:
        func: Function to call for each item.
        items: List of argument tuples; each tuple is unpacked into ``func``.
        max_workers: Maximum number of worker threads.
        progress_callback: Optional ``callback(increment, message)`` invoked
            after each item completes or fails.

    Returns:
        Dict mapping result category to the list of items in that category.
        Items whose call raised are recorded (as their argument tuple) under
        ``"failed"``.
    """
    results: Dict[str, List[Any]] = {
        "written": [],
        "unchanged": [],
        "skipped": [],
        "failed": [],
        "non_writeme": [],
        "no_folder": [],
    }

    # Never start more workers than there are items.
    actual_workers = min(max_workers, len(items))

    if actual_workers <= 1 or len(items) <= 1:
        # Small jobs: sequential processing avoids thread-pool overhead.
        for i, args in enumerate(items):
            try:
                result = func(*args)
                _categorize_result(result, results)

                if progress_callback:
                    progress_callback(1, f"Processed {i+1}/{len(items)}")
            except Exception as e:
                logger.error(f"Error processing item {args}: {e}")
                results["failed"].append(args)

                if progress_callback:
                    progress_callback(1, f"Error: {e}")
    else:
        # Process in parallel for larger jobs.
        logger.info(f"Processing {len(items)} items with {actual_workers} workers")

        with concurrent.futures.ThreadPoolExecutor(max_workers=actual_workers) as executor:
            future_to_args = {executor.submit(func, *args): args for args in items}

            for i, future in enumerate(concurrent.futures.as_completed(future_to_args)):
                args = future_to_args[future]
                try:
                    result = future.result()
                    _categorize_result(result, results)

                    if progress_callback:
                        progress_callback(1, f"Processed {i+1}/{len(items)}")
                except Exception as e:
                    logger.error(f"Error processing item {args}: {e}")
                    results["failed"].append(args)

                    if progress_callback:
                        progress_callback(1, f"Error: {e}")

    return results


def _categorize_result(result, results):
    """Place a single call result into the matching bucket of ``results``.

    Accepts either a ``(category, item)`` tuple or a bare string (treated as
    a "written" item). ``None`` results and unknown categories are ignored.
    """
    if result is None:
        return

    if isinstance(result, tuple) and len(result) == 2:
        category, item = result
        if category in results:
            results[category].append(item)
    elif isinstance(result, str):
        # Default to "written" category for string results.
        results["written"].append(result)
class ProgressTracker:
    """Track and display progress for WRITEME operations.

    Renders an in-place status line (optional spinner, percentage, ETA) on
    stdout and collects per-category results for a final summary.
    """

    def __init__(self, total: int = 0, show_spinner: bool = True):
        # Total number of items expected; 0 means unknown.
        self.total = total
        # Number of items processed so far.
        self.current = 0
        self.start_time = time.time()
        self.show_spinner = show_spinner
        # Braille-dot spinner frames.
        self.spinner_chars = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
        self.spinner_index = 0
        # Timestamp of the last redraw, used to throttle output.
        self.last_update = 0
        self.results: Dict[str, List[Any]] = {
            "written": [],
            "unchanged": [],
            "skipped": [],
            "failed": [],
            "non_writeme": [],
            "no_folder": []
        }

    def start(self, message: str = "Processing") -> None:
        """Start the progress tracking with an initial message."""
        self.start_time = time.time()
        self.current = 0
        print(f"{message}...", end="", flush=True)

    def update(self, increment: int = 1, message: Optional[str] = None) -> None:
        """Update the progress counter and redraw the status line.

        Redraws are throttled to roughly 10 per second, except for the final
        update (``current >= total``), which is always drawn.
        """
        self.current += increment

        # Only update display every 100ms to avoid excessive terminal output.
        current_time = time.time()
        if current_time - self.last_update < 0.1 and self.current < self.total:
            return

        self.last_update = current_time

        # Fix: compute the spinner character up front, defaulting to "" when
        # the spinner is disabled, so the status line below never references
        # an unbound name (and still renders with show_spinner=False).
        spinner = ""
        if self.show_spinner:
            spinner = self.spinner_chars[self.spinner_index % len(self.spinner_chars)]
            self.spinner_index += 1

        if self.total > 0:
            percentage = min(100, int(100 * self.current / self.total))

            # Calculate elapsed time and ETA.
            elapsed = current_time - self.start_time
            if self.current > 0:
                eta = elapsed * (self.total - self.current) / self.current
                eta_str = f"ETA: {int(eta)}s" if eta > 0 else "Done"
            else:
                eta_str = "Calculating..."

            status = f"{spinner} {percentage}% ({self.current}/{self.total}) {eta_str}"
            if message:
                status += f" - {message}"
            self._redraw(status)
        elif message:
            # Just show spinner and message if no total is known.
            self._redraw(f"{spinner} {message}")

    def _redraw(self, status: str) -> None:
        """Blank the current terminal line, then print ``status`` in place."""
        # Clearing first prevents residue when the new status is shorter.
        print(f"\r{' ' * 80}", end="", flush=True)
        print(f"\r{status}", end="", flush=True)

    def add_result(self, category: str, item: Any) -> None:
        """Add an item to a result category (unknown categories are ignored)."""
        if category in self.results:
            self.results[category].append(item)

    def finish(self) -> None:
        """Complete the progress tracking and show final status."""
        elapsed = time.time() - self.start_time
        print(f"\r{' ' * 80}", end="", flush=True)
        print(f"\rCompleted in {elapsed:.2f}s", flush=True)

    def summary(self) -> None:
        """Print a summary of the results collected via add_result()."""
        print("\n=== WRITEME Summary ===")
        print(f"Total time: {time.time() - self.start_time:.2f}s")

        for category, items in self.results.items():
            if items:
                print(f"{category.capitalize()}: {len(items)}")

        # Print details for important categories.
        if self.results["written"]:
            print("\nWritten READMEs:")
            for item in sorted(self.results["written"]):
                print(f"  ✓ {item}")

        if self.results["failed"]:
            print("\nFailed READMEs:")
            for item in sorted(self.results["failed"]):
                if isinstance(item, tuple):
                    print(f"  ✗ {item[0]}")
                else:
                    print(f"  ✗ {item}")
+except ImportError: + CACHE_AVAILABLE = False + + # Dummy cache functions if cache module not available + def load_from_cache(key): + return None + + def save_to_cache(key, data): + return False + # Load all examples immediately for cross references. Trades correctness for speed. -doc_gen = DocGen.from_root(Path(__file__).parent.parent.parent, incremental=True) +# Try to load from cache first +doc_gen = None +if CACHE_AVAILABLE: + doc_gen = load_from_cache("doc_gen_cache") + if doc_gen: + logging.info("Using cached DocGen data") + +if doc_gen is None: + logging.info("Building DocGen data from scratch") + doc_gen = DocGen.from_root(Path(__file__).parent.parent.parent, incremental=True) + if CACHE_AVAILABLE: + save_to_cache("doc_gen_cache", doc_gen) Language = Enum( @@ -116,6 +141,14 @@ def writeme( non_writeme = [] unchanged = [] no_folder = [] + + # Try to use progress tracking if available + try: + from progress import ProgressTracker + progress = ProgressTracker() + use_progress = True + except ImportError: + use_progress = False scanner = prepare_scanner(doc_gen) if scanner is None: @@ -200,4 +233,4 @@ def make_diff(renderer, id): current = renderer.read_current().split("\n") expected = renderer.readme_text.split("\n") diff = unified_diff(current, expected, f"{id}/current", f"{id}/expected") - return "\n".join(diff) + return "\n".join(diff) \ No newline at end of file diff --git a/.tools/readmes/scanner.py b/.tools/readmes/scanner.py index 84001469378..5d26841aeb3 100644 --- a/.tools/readmes/scanner.py +++ b/.tools/readmes/scanner.py @@ -13,6 +13,20 @@ from aws_doc_sdk_examples_tools.sdks import Sdk from aws_doc_sdk_examples_tools.services import Service +# Import cache module if available +try: + from cache import load_from_cache, save_to_cache + CACHE_AVAILABLE = True +except ImportError: + CACHE_AVAILABLE = False + + # Dummy cache functions if cache module not available + def load_from_cache(key): + return None + + def save_to_cache(key, data): + return 
False + logger = logging.getLogger(__name__) T = TypeVar("T") @@ -45,6 +59,17 @@ def load_crosses(self): ) def _build_examples(self): + # Try to load examples from cache first + cache_key = "examples_cache" + cached_examples = load_from_cache(cache_key) + + if cached_examples: + logger.info("Using cached examples data") + self.examples = cached_examples + return + + # Build examples from scratch if not in cache + logger.info("Building examples from scratch") self.examples = defaultdict(list) for example in self.doc_gen.examples.values(): for lang_name, language in example.languages.items(): @@ -53,6 +78,9 @@ def _build_examples(self): self.examples[ f"{lang_name}:{sdk_version.sdk_version}:{svc_name}" ].append(example) + + # Save to cache for future runs + save_to_cache(cache_key, self.examples) def _example_key(self): return f"{self.lang_name}:{self.sdk_ver}:{self.svc_name}" diff --git a/.tools/readmes/test_improved_writeme.py b/.tools/readmes/test_improved_writeme.py new file mode 100755 index 00000000000..923a25c9308 --- /dev/null +++ b/.tools/readmes/test_improved_writeme.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
def run_test(args, expected_success=True):
    """Run improved_writeme.py with the given arguments and check the outcome.

    Args:
        args: CLI arguments to pass to the script; the caller's list is NOT
            modified (a copy is extended with the speed/safety flags).
        expected_success: Whether a zero exit code is expected.

    Returns:
        True when the observed success/failure matches ``expected_success``.
    """
    script_path = Path(__file__).parent / "improved_writeme.py"

    # Fix: operate on a copy so the caller's argument list is not mutated.
    cli_args = list(args)

    # Add --no-update to speed up tests.
    if "--no-update" not in cli_args:
        cli_args.append("--no-update")

    # Add --dry-run to avoid making changes.
    if "--dry-run" not in cli_args:
        cli_args.append("--dry-run")

    cmd = [sys.executable, str(script_path)] + cli_args
    print(f"Running: {' '.join(cmd)}")

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        success = result.returncode == 0

        if success == expected_success:
            print(f"✅ Test passed: {' '.join(cli_args)}")
            return True
        else:
            print(f"❌ Test failed: {' '.join(cli_args)}")
            print(f"Exit code: {result.returncode}")
            print(f"Output: {result.stdout}")
            print(f"Error: {result.stderr}")
            return False
    except Exception as e:
        print(f"❌ Test error: {e}")
        return False


def main():
    """Run tests for the improved writeme.py and return the failure count."""
    tests = [
        # Basic tests
        (["--languages", "all", "--services", "all"], True),
        (["--languages", "Python:3", "--services", "s3"], True),
        (["--languages", "Python:3", "--services", "s3", "--diff"], True),

        # Performance options
        (["--use-cache"], True),

        # Output options
        (["--verbose"], True),
        (["--summary"], True),
    ]

    failures = 0
    for args, expected_success in tests:
        if not run_test(args, expected_success):
            failures += 1

    print(f"\nTests completed: {len(tests) - failures} passed, {failures} failed")
    # Non-zero exit code when any test failed.
    return failures


if __name__ == "__main__":
    sys.exit(main())
"""
Validator module for WRITEME to check for issues in the codebase.
"""

import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, List, Tuple

if TYPE_CHECKING:
    # Imported for annotations only; the quoted hints below keep this module
    # importable even when the tools package is absent at runtime.
    from aws_doc_sdk_examples_tools.doc_gen import DocGen

logger = logging.getLogger(__name__)


class ValidationError(Exception):
    """Exception raised for validation errors."""
    pass


def check_duplicate_snippet_tags(doc_gen: "DocGen") -> List[Tuple[str, List[str]]]:
    """
    Check for duplicate snippet tags in the codebase.

    NOTE(review): ``doc_gen.snippets`` maps each tag to a single snippet, so
    each tag contributes exactly one file here and this check appears unable
    to ever report a duplicate — confirm against the tools API whether
    duplicate tags are collapsed before this point.

    Args:
        doc_gen: The DocGen instance containing snippets

    Returns:
        List of tuples containing (tag, [file_paths]) for duplicate tags
    """
    # Dictionary to store tag -> list of files.
    tag_to_files: Dict[str, List[str]] = defaultdict(list)

    # Collect all tags and their file locations.
    for tag, snippet in doc_gen.snippets.items():
        tag_to_files[tag].append(snippet.file)

    # Tags that appear in multiple files are duplicates.
    return [(tag, files) for tag, files in tag_to_files.items() if len(files) > 1]


def check_missing_snippet_tags(doc_gen: "DocGen") -> List[Tuple[str, str]]:
    """
    Check for snippet tags referenced in metadata but not found in code.

    Args:
        doc_gen: The DocGen instance containing snippets and examples

    Returns:
        List of tuples containing (tag, example_id) for missing tags
    """
    # All tags that actually exist in code.
    known_tags = set(doc_gen.snippets.keys())
    missing: List[Tuple[str, str]] = []

    # Walk every example/language/version/excerpt looking for unknown tags.
    for example_id, example in doc_gen.examples.items():
        for language in example.languages.values():
            for version in language.versions:
                # excerpts / snippet_tags may be None or empty.
                for excerpt in version.excerpts or []:
                    for tag in excerpt.snippet_tags or []:
                        if tag not in known_tags:
                            missing.append((tag, example_id))

    return missing


def validate_snippets(doc_gen: "DocGen", strict: bool = False) -> bool:
    """
    Validate snippets in the codebase.

    Args:
        doc_gen: The DocGen instance containing snippets
        strict: If True, raise an exception for validation errors

    Returns:
        True if validation passed, False otherwise

    Raises:
        ValidationError: When ``strict`` is set and any issue was found.
    """
    validation_passed = True

    # Check for duplicate snippet tags.
    duplicates = check_duplicate_snippet_tags(doc_gen)
    if duplicates:
        validation_passed = False
        logger.error("Found %d duplicate snippet tags:", len(duplicates))
        for tag, files in duplicates:
            logger.error(" Tag '%s' found in multiple files: %s", tag, ", ".join(files))

    # Check for missing snippet tags.
    missing = check_missing_snippet_tags(doc_gen)
    if missing:
        validation_passed = False
        logger.error("Found %d missing snippet tags:", len(missing))
        for tag, example_id in missing:
            logger.error(" Tag '%s' referenced in example '%s' but not found in code", tag, example_id)

    if not validation_passed and strict:
        raise ValidationError("Snippet validation failed")

    return validation_passed
#!/bin/bash
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

# Simple wrapper script to run the improved writeme.py

# Get the directory of this script
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

# Prefer python3: modern distros often do not ship a bare `python` (PEP 394).
# Fall back to `python` for environments that only provide that name.
if command -v python3 &> /dev/null; then
    PYTHON=python3
elif command -v python &> /dev/null; then
    PYTHON=python
else
    echo "Error: Python is not installed or not in PATH"
    exit 1
fi

# Check if the improved_writeme.py file exists
if [ ! -f "$SCRIPT_DIR/improved_writeme.py" ]; then
    echo "Error: improved_writeme.py not found in $SCRIPT_DIR"
    exit 1
fi

# Run the improved writeme.py script, forwarding all arguments
echo "Running improved WRITEME..."
"$PYTHON" "$SCRIPT_DIR/improved_writeme.py" "$@"
exit_code=$?

# Surface a non-zero exit code from the Python script
if [ $exit_code -ne 0 ]; then
    echo "Error: improved_writeme.py exited with code $exit_code"
fi

exit $exit_code