diff --git a/.gitignore b/.gitignore
index a6d692b..252e2ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,6 @@
 coverage.html
 .claude
 *.tar
+
+# Generated schema files
+/schemas
diff --git a/Taskfile.yml b/Taskfile.yml
index 2d99945..29bfb75 100644
--- a/Taskfile.yml
+++ b/Taskfile.yml
@@ -92,11 +92,37 @@ tasks:
     - task: test
 
   test:crawl:
+    desc: Crawl component definitions and validate results
     cmds:
+      # Setup
       - nats kv add TMP > /dev/null
       - defer: nats kv del --force TMP
+
+      # Pre-crawl validation
+      - task: validate
+      - echo "Pre-crawl count:" && find .connect -name '*.yml' -not -path '.connect/runtime.yml' | wc -l
+
+      # Run crawler
       - cd ../connect-node && go run ./cmd/connect-node crawl ../connect-runtime-wombat --bucket=TMP
+      # Post-crawl validation
+      - echo "Validating crawled data..."
+      - nats kv ls TMP --no-headers || true
+      - |
+        # Verify all components were crawled
+        CRAWLED_COUNT=$(nats kv ls TMP --no-headers 2>/dev/null | wc -l)
+        SOURCE_COUNT=$(find .connect -name '*.yml' -not -path '.connect/runtime.yml' | wc -l)
+        echo "Crawled: $CRAWLED_COUNT entries, Expected: $SOURCE_COUNT entries"
+        if [ "$CRAWLED_COUNT" -eq 0 ]; then
+          echo "ERROR: No entries found in KV bucket"
+          exit 1
+        fi
+        if [ "$CRAWLED_COUNT" -lt "$SOURCE_COUNT" ]; then
+          echo "WARNING: Some components may not have been crawled"
+          echo "This could be expected if some files are not component specs"
+        fi
+      - echo "✓ Crawl validation complete"
+
 
   validate:
     cmds:
       - curl --silent https://raw.githubusercontent.com/synadia-io/connect/refs/heads/main/model/schemas/component-spec-v1.schema.json -O
@@ -110,6 +136,12 @@
       - curl --silent https://raw.githubusercontent.com/synadia-io/connect/refs/heads/main/model/schemas/component-spec-v1.schema.json -O
       - npx --yes ajv-cli validate -s component-spec-v1.schema.json -d "{{.DIR}}/*/*.yml" --verbose
 
+  test:spec:drift:
+    desc: Validate .connect specs against upstream Benthos schemas
+    cmds:
+      - echo "Running spec drift validation..."
+      - go run ./test/cmd/spec-drift/
+
  docker:local:
    desc: Build Docker image locally with version information
    vars:
diff --git a/test/cmd/spec-drift/extract.go b/test/cmd/spec-drift/extract.go
new file mode 100644
index 0000000..677a6ca
--- /dev/null
+++ b/test/cmd/spec-drift/extract.go
@@ -0,0 +1,201 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/redpanda-data/benthos/v4/public/service"
+	_ "github.com/wombatwisdom/wombat/public/components/all"
+)
+
+func extract() (string, error) {
+	fmt.Println("Extracting schemas from Benthos components...")
+
+	env := service.GlobalEnvironment()
+
+	schemasDir, err := os.MkdirTemp("", "wombat-schema-")
+	if err != nil {
+		// Return the error instead of calling os.Exit(1): exiting here
+		// would bypass the cleanup performed by the caller in main.go.
+		return "", fmt.Errorf("creating temp dir for schemas: %w", err)
+	}
+
+	fmt.Printf("Using dir: %v\n", schemasDir)
+
+	var outputCount int
+	env.WalkOutputs(func(name string, config *service.ConfigView) {
+		outputCount++
+		if err := extractAndSaveComponentSchema(name, "output", config, schemasDir); err != nil {
+			fmt.Printf("Error extracting schema for output %s: %v\n", name, err)
+			return
+		}
+		fmt.Printf("✓ Extracted schema for output: %s\n", name)
+	})
+	fmt.Printf("Found %d output components\n", outputCount)
+
+	var inputCount int
+	env.WalkInputs(func(name string, config *service.ConfigView) {
+		inputCount++
+		if err := extractAndSaveComponentSchema(name, "input", config, schemasDir); err != nil {
+			fmt.Printf("Error extracting schema for input %s: %v\n", name, err)
+			return
+		}
+		fmt.Printf("✓ Extracted schema for input: %s\n", name)
+	})
+	fmt.Printf("Found %d input components\n", inputCount)
+
+	// Extract processor specs
+	var processorCount int
+	env.WalkProcessors(func(name string, config *service.ConfigView) {
+		processorCount++
+		if err := extractAndSaveComponentSchema(name, "processor", config, schemasDir); err != nil {
+			fmt.Printf("Error extracting schema for processor %s: %v\n", name, err)
+			return
+		}
+
+		fmt.Printf("✓ Extracted schema for processor: %s\n", name)
+	})
+	fmt.Printf("Found %d processor components\n", processorCount)
+
+	fmt.Printf("\nSchema extraction completed. Files saved to %s/\n", schemasDir)
+	return schemasDir, nil
+}
+
+func extractAndSaveComponentSchema(name, componentType string, spec *service.ConfigView, schemasDir string) error {
+	// Get template data which contains the structured field information
+	templateData, err := spec.TemplateData()
+	if err != nil {
+		return fmt.Errorf("failed to get template data: %w", err)
+	}
+
+	// Convert template data to our schema format
+	schema := ComponentSchema{
+		Name:   name,
+		Type:   componentType,
+		Fields: convertTemplateFieldsToSchema(templateData.Fields),
+	}
+
+	// Save to JSON file
+	filename := filepath.Join(schemasDir, fmt.Sprintf("%s_%s.json", componentType, name))
+	file, err := os.Create(filename)
+	if err != nil {
+		return fmt.Errorf("failed to create file %s: %w", filename, err)
+	}
+	defer func() {
+		if err := file.Close(); err != nil {
+			fmt.Printf("Warning: failed to close file %s: %v\n", filename, err)
+		}
+	}()
+
+	encoder := json.NewEncoder(file)
+	encoder.SetIndent("", "  ")
+	if err := encoder.Encode(schema); err != nil {
+		return fmt.Errorf("failed to encode schema: %w", err)
+	}
+
+	return nil
+}
+
+func convertTemplateFieldsToSchema(templateFields []service.TemplateDataPluginField) []FieldSchema {
+	var fields []FieldSchema
+
+	for _, tf := range templateFields {
+		field := FieldSchema{
+			Name:        extractFieldName(tf.FullName),
+			FullName:    tf.FullName,
+			Type:        tf.Type,
+			Description: tf.Description,
+			Required:    tf.DefaultMarshalled == "", // Simple heuristic: no default => required — TODO confirm
+		}
+
+		// Parse default value if available
+		if tf.DefaultMarshalled != "" {
+			var defaultVal interface{}
+			if err := json.Unmarshal([]byte(tf.DefaultMarshalled), &defaultVal); err == nil {
+				field.Default = defaultVal
+			}
+		}
+
+		fields = append(fields, field)
+	}
+
+	// Build hierarchical structure from flat list
+	return buildFieldHierarchy(fields)
+}
+
+func extractFieldName(fullName string) string {
+	// Handle array notation: "tls.client_certs[].cert" -> "cert"
+	// Remove array notation first, then extract field name
+	cleanPath := strings.ReplaceAll(fullName, "[]", "")
+
+	parts := []rune(cleanPath)
+	lastDot := -1
+
+	for i := len(parts) - 1; i >= 0; i-- {
+		if parts[i] == '.' {
+			lastDot = i
+			break
+		}
+	}
+
+	if lastDot == -1 {
+		return cleanPath
+	}
+
+	return string(parts[lastDot+1:])
+}
+
+func buildFieldHierarchy(flatFields []FieldSchema) []FieldSchema {
+	// Group fields by their parent path
+	fieldMap := make(map[string][]FieldSchema)
+	rootFields := []FieldSchema{}
+
+	for _, field := range flatFields {
+		// Determine parent path
+		parentPath := getParentPath(field.FullName)
+
+		if parentPath == "" {
+			// Root level field
+			rootFields = append(rootFields, field)
+		} else {
+			// Child field
+			fieldMap[parentPath] = append(fieldMap[parentPath], field)
+		}
+	}
+
+	// Recursively build hierarchy
+	return attachChildren(rootFields, fieldMap)
+}
+
+func getParentPath(fullName string) string {
+	// Handle array notation: "tls.client_certs[].cert" -> "tls.client_certs"
+	// Remove array notation first, then find parent
+	cleanPath := strings.ReplaceAll(fullName, "[]", "")
+
+	parts := []rune(cleanPath)
+	lastDot := -1
+
+	for i := len(parts) - 1; i >= 0; i-- {
+		if parts[i] == '.' {
+			lastDot = i
+			break
+		}
+	}
+
+	if lastDot == -1 {
+		return ""
+	}
+
+	return string(parts[:lastDot])
+}
+
+func attachChildren(fields []FieldSchema, fieldMap map[string][]FieldSchema) []FieldSchema {
+	for i := range fields {
+		if children, exists := fieldMap[fields[i].FullName]; exists {
+			fields[i].Children = attachChildren(children, fieldMap)
+		}
+	}
+	return fields
+}
diff --git a/test/cmd/spec-drift/main.go b/test/cmd/spec-drift/main.go
new file mode 100644
index 0000000..ff77460
--- /dev/null
+++ b/test/cmd/spec-drift/main.go
@@ -0,0 +1,42 @@
+package main
+
+import (
+	"fmt"
+	"os"
+)
+
+func main() {
+	run(".connect")
+}
+
+func run(dir string) {
+	schemaDir, err := extract()
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Error during extract: %v\n", err)
+		os.Exit(1)
+	}
+	validateErr := validate(dir, schemaDir) // remember outcome; cleanup must run before exiting
+	if validateErr != nil {
+		fmt.Fprintf(os.Stderr, "Error during validate: %v\n", validateErr)
+	}
+	if err := clean(schemaDir); err != nil {
+		fmt.Fprintf(os.Stderr, "Error cleaning schemas: %v\n", err)
+		os.Exit(1)
+	}
+	fmt.Fprintf(os.Stderr, "Removed extracted schemas from: %v\n", schemaDir)
+	if validateErr != nil {
+		os.Exit(1) // spec drift detected: fail the task with a non-zero exit code
+	}
+}
+
+func clean(schemasDir string) error {
+	if _, err := os.Stat(schemasDir); os.IsNotExist(err) {
+		return nil
+	}
+
+	if err := os.RemoveAll(schemasDir); err != nil {
+		return fmt.Errorf("failed to remove schemas directory: %w", err)
+	}
+
+	return nil
+}
diff --git a/test/cmd/spec-drift/types.go b/test/cmd/spec-drift/types.go
new file mode 100644
index 0000000..e37926b
--- /dev/null
+++ b/test/cmd/spec-drift/types.go
@@ -0,0 +1,51 @@
+package main
+
+// ComponentSchema represents the extracted schema structure
+type ComponentSchema struct {
+	Name   string        `json:"name"`
+	Type   string        `json:"type"`
+	Fields []FieldSchema `json:"fields"`
+}
+
+// FieldSchema represents a field in the component configuration
+type FieldSchema struct {
+	Name        string        `json:"name"`
+	Type        string        `json:"type"`
+	Description string        `json:"description"`
+	Default     interface{}   `json:"default,omitempty"`
+	Required    bool          `json:"required"`
+	Children    []FieldSchema `json:"children,omitempty"`
+	FullName    string        `json:"full_name"`
+}
+
+// ConnectSpec represents our .connect YAML specification
+type ConnectSpec struct {
+	Name        string         `yaml:"name"`
+	Type        string         `yaml:"type"`
+	Summary     string         `yaml:"summary,omitempty"`
+	Description string         `yaml:"description,omitempty"`
+	Fields      []ConnectField `yaml:"fields"`
+}
+
+// ConnectField represents a field in our .connect specification
+type ConnectField struct {
+	Path        string         `yaml:"path"`
+	Kind        string         `yaml:"kind,omitempty"`
+	Type        string         `yaml:"type,omitempty"`
+	Description string         `yaml:"description,omitempty"`
+	Default     interface{}    `yaml:"default,omitempty"`
+	Fields      []ConnectField `yaml:"fields,omitempty"`
+}
+
+// ValidationResult tracks validation issues
+type ValidationResult struct {
+	ComponentName string
+	ComponentType string
+	Issues        []ValidationIssue
+}
+
+type ValidationIssue struct {
+	Severity string // "error", "warning", "info"
+	Path     string
+	Message  string
+}
diff --git a/test/cmd/spec-drift/validate.go b/test/cmd/spec-drift/validate.go
new file mode 100644
index 0000000..f01d8e4
--- /dev/null
+++ b/test/cmd/spec-drift/validate.go
@@ -0,0 +1,388 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"gopkg.in/yaml.v3"
+)
+
+func validate(connectPath, schemasDir string) error {
+	fmt.Println("Validating .connect specs against extracted Benthos schemas...")
+
+	var allResults []ValidationResult
+	hasErrors := false
+
+	// Validate sinks (outputs)
+	sinkResults, err := validateComponentType(connectPath, "sinks", schemasDir, "output")
+	if err != nil {
+		return fmt.Errorf("error validating sinks: %w", err)
+	}
+	allResults = append(allResults, sinkResults...)
+
+	// Validate sources (inputs)
+	sourceResults, err := validateComponentType(connectPath, "sources", schemasDir, "input")
+	if err != nil {
+		return fmt.Errorf("error validating sources: %w", err)
+	}
+	allResults = append(allResults, sourceResults...)
+
+	// Validate processors
+	processorResults, err := validateComponentType(connectPath, "processors", schemasDir, "processor")
+	if err != nil {
+		return fmt.Errorf("error validating processors: %w", err)
+	}
+	allResults = append(allResults, processorResults...)
+
+	hasErrors = outputText(allResults)
+	if hasErrors {
+		return fmt.Errorf("validation failed with errors")
+	}
+	return nil
+}
+
+func validateComponentType(connectBasePath, connectDir, schemasDir, schemaType string) ([]ValidationResult, error) {
+	var results []ValidationResult
+
+	fullConnectPath := filepath.Join(connectBasePath, connectDir)
+
+	// Check if directory exists
+	if _, err := os.Stat(fullConnectPath); os.IsNotExist(err) {
+		fmt.Printf("Directory %s does not exist, skipping...\n", fullConnectPath)
+		return results, nil
+	}
+
+	// Read all YAML files in the directory
+	files, err := filepath.Glob(filepath.Join(fullConnectPath, "*.yml"))
+	if err != nil {
+		return nil, fmt.Errorf("failed to glob YAML files in %s: %w", fullConnectPath, err)
+	}
+
+	for _, file := range files {
+		result, err := validateSingleComponent(file, schemasDir, schemaType)
+		if err != nil {
+			fmt.Printf("Warning: failed to validate %s: %v\n", file, err)
+			continue
+		}
+		results = append(results, result)
+	}
+
+	return results, nil
+}
+
+func validateSingleComponent(connectFile, schemasDir, schemaType string) (ValidationResult, error) {
+	// Load .connect spec
+	connectSpec, err := loadConnectSpec(connectFile)
+	if err != nil {
+		return ValidationResult{}, fmt.Errorf("failed to load connect spec: %w", err)
+	}
+
+	result := ValidationResult{
+		ComponentName: connectSpec.Name,
+		ComponentType: schemaType,
+		Issues:        []ValidationIssue{},
+	}
+
+	// Load corresponding Benthos schema
+	schemaFile := filepath.Join(schemasDir, fmt.Sprintf("%s_%s.json", schemaType, connectSpec.Name))
+	benthos, err := loadBenthosSchema(schemaFile)
+	if err != nil {
+		result.Issues = append(result.Issues, ValidationIssue{
+			Severity: "warning",
+			Path:     "",
+			Message:  fmt.Sprintf("No Benthos schema found at %s", schemaFile),
+		})
+		return result, nil
+	}
+
+	// Validate structure
+	result.Issues = append(result.Issues, validateFields(connectSpec.Fields, benthos.Fields, "")...)
+
+	return result, nil
+}
+
+func loadConnectSpec(filename string) (*ConnectSpec, error) {
+	data, err := os.ReadFile(filename)
+	if err != nil {
+		return nil, err
+	}
+
+	var spec ConnectSpec
+	if err := yaml.Unmarshal(data, &spec); err != nil {
+		return nil, err
+	}
+
+	return &spec, nil
+}
+
+func loadBenthosSchema(filename string) (*ComponentSchema, error) {
+	data, err := os.ReadFile(filename)
+	if err != nil {
+		return nil, err
+	}
+
+	var schema ComponentSchema
+	if err := json.Unmarshal(data, &schema); err != nil {
+		return nil, err
+	}
+
+	return &schema, nil
+}
+
+func validateFields(connectFields []ConnectField, benthosFields []FieldSchema, parentPath string) []ValidationIssue {
+	var issues []ValidationIssue
+
+	// Index connect fields by path for the required-field check near the
+	// end of this function.
+	//
+	// NOTE(review): a name-keyed index of benthosFields was also built
+	// here originally, but it was never read anywhere — field matching
+	// goes through findMatchingBenthosField — so it has been removed.
+	connectMap := make(map[string]ConnectField)
+
+	for _, cf := range connectFields {
+		connectMap[cf.Path] = cf
+	}
+
+	// Check for OAuth-specific structural issues
+	issues = append(issues, checkOAuthStructure(connectFields, parentPath)...)
+
+	// Check each field in our connect spec against the extracted Benthos schema
+	for _, connectField := range connectFields {
+		fieldPath := buildFieldPath(parentPath, connectField.Path)
+
+		// Try multiple matching strategies for better field resolution
+		benthosField, exists := findMatchingBenthosField(connectField, benthosFields, parentPath)
+		if !exists {
+			// Unknown field: warn (not error), since matching is heuristic
+			fieldName := extractLastPathComponent(connectField.Path)
+			issues = append(issues, ValidationIssue{
+				Severity: "warning",
+				Path:     fieldPath,
+				Message:  fmt.Sprintf("Field not found in Benthos schema: %s", fieldName),
+			})
+			continue
+		}
+
+		// Validate field type if specified in the connect spec
+		if connectField.Type != "" {
+			expectedType := mapConnectTypeToBenthos(connectField)
+			if expectedType != benthosField.Type {
+				issues = append(issues, ValidationIssue{
+					Severity: "error",
+					Path:     fieldPath,
+					Message:  fmt.Sprintf("Type mismatch: connect=%s, benthos=%s", expectedType, benthosField.Type),
+				})
+			}
+		}
+
+		// Recursively validate children against the matched field's children
+		if len(connectField.Fields) > 0 {
+			issues = append(issues, validateFields(connectField.Fields, benthosField.Children, fieldPath)...)
+		}
+	}
+
+	// Check for required Benthos fields missing from our connect spec
+	for _, benthosField := range benthosFields {
+		if benthosField.Required {
+			// Try to find matching Connect field by checking both field name and full path
+			fieldPath := buildFieldPath(parentPath, benthosField.Name)
+			found := false
+
+			// Check if field exists in connectMap by name (for root level fields)
+			if _, exists := connectMap[benthosField.Name]; exists {
+				found = true
+			}
+
+			// Check if field exists by last path component (for nested fields)
+			if !found {
+				for connectPath := range connectMap {
+					if extractLastPathComponent(connectPath) == benthosField.Name {
+						found = true
+						break
+					}
+				}
+			}
+
+			if !found {
+				issues = append(issues, ValidationIssue{
+					Severity: "warning",
+					Path:     fieldPath,
+					Message:  "Required field missing from connect spec",
+				})
+			}
+		}
+	}
+
+	return issues
+}
+
+func checkOAuthStructure(connectFields []ConnectField, parentPath string) []ValidationIssue {
+	var issues []ValidationIssue
+
+	// Look for OAuth structure issues in the connect spec fields
+	for _, field := range connectFields {
+		if field.Path == "oauth" {
+			// Check if OAuth2 fields are incorrectly nested under OAuth1
+			oauth2Fields := []string{"client_key", "client_secret", "scopes", "token_url"}
+
+			for _, childField := range field.Fields {
+				// Check if OAuth2 fields appear as direct children of oauth
+				for _, oauth2Field := range oauth2Fields {
+					if strings.HasPrefix(childField.Path, "oauth2."+oauth2Field) {
+						fieldPath := buildFieldPath(parentPath, field.Path+"."+childField.Path)
+						issues = append(issues, ValidationIssue{
+							Severity: "error",
+							Path:     fieldPath,
+							Message:  fmt.Sprintf("OAuth2 field '%s' incorrectly nested under OAuth1 - should be at top level under separate oauth2 section", oauth2Field),
+						})
+					}
+				}
+			}
+		}
+	}
+
+	return issues
+}
+
+// buildFieldPath joins parent and child with a dot, tolerating an empty parent.
+func buildFieldPath(parent, child string) string {
+	if parent == "" {
+		return child
+	}
+	return parent + "." + child
+}
+
+func extractLastPathComponent(path string) string {
+	// For paths like "tls.enabled", return "enabled"
+	// For simple paths like "oauth", return "oauth"
+	parts := strings.Split(path, ".")
+	return parts[len(parts)-1]
+}
+
+// findMatchingBenthosField uses multiple strategies to find the best matching Benthos field
+// for a given Connect field, handling nested paths and different naming conventions
+func findMatchingBenthosField(connectField ConnectField, benthosFields []FieldSchema, parentPath string) (FieldSchema, bool) {
+	connectFieldName := extractLastPathComponent(connectField.Path)
+	fullConnectPath := buildFieldPath(parentPath, connectField.Path)
+
+	// Strategy 1: Direct name match (most common case)
+	for _, bf := range benthosFields {
+		if bf.Name == connectFieldName {
+			return bf, true
+		}
+	}
+
+	// Strategy 2: Match against full path for nested fields
+	for _, bf := range benthosFields {
+		if bf.FullName == fullConnectPath || bf.FullName == connectField.Path {
+			return bf, true
+		}
+	}
+
+	// Strategy 3: Handle array notation differences - Connect uses "field" while Benthos may use "field[]"
+	cleanConnectPath := strings.ReplaceAll(connectField.Path, "[]", "")
+	for _, bf := range benthosFields {
+		cleanBenthosPath := strings.ReplaceAll(bf.FullName, "[]", "")
+		if cleanBenthosPath == cleanConnectPath {
+			return bf, true
+		}
+	}
+
+	// Strategy 4: Partial path matching (heuristic; may over-match short names)
+	for _, bf := range benthosFields {
+		if strings.HasSuffix(bf.FullName, connectField.Path) || strings.HasSuffix(connectField.Path, bf.Name) {
+			return bf, true
+		}
+	}
+
+	return FieldSchema{}, false
+}
+
+func mapConnectTypeToBenthos(field ConnectField) string {
+	// Handle .connect kind + type combinations and map to Benthos types
+	switch {
+	case field.Kind == "list":
+		// kind: list means array type in Benthos
+		return "array"
+	case field.Kind == "map":
+		// kind: map means object type in Benthos
+		return "object"
+	case field.Kind == "scalar":
+		// kind: scalar uses the type directly
+		return field.Type
+	case field.Type == "expression":
+		// Connect "expression" type maps to Benthos "string" - expressions are stored as strings
+		// This is a semantic enhancement: Connect explicitly labels expression-capable fields
+		return "string"
+	default:
+		// No kind specified, use type directly
+		return field.Type
+	}
+}
+
+// getIssueIcon maps a severity string to the emoji used in the text report.
+func getIssueIcon(severity string) string {
+	switch severity {
+	case "error":
+		return "🔴"
+	case "warning":
+		return "🟡"
+	case "info":
+		return "🔵"
+	default:
+		return "⚪"
+	}
+}
+
+// outputText prints per-component results and a summary; returns true if any
+// error-severity issue was seen.
+func outputText(allResults []ValidationResult) bool {
+	hasErrors := false
+
+	// Report results
+	fmt.Printf("\n=== VALIDATION RESULTS ===\n\n")
+
+	for _, result := range allResults {
+		if len(result.Issues) == 0 {
+			fmt.Printf("✅ %s/%s: OK\n", result.ComponentType, result.ComponentName)
+			continue
+		}
+
+		fmt.Printf("❌ %s/%s: %d issues found\n", result.ComponentType, result.ComponentName, len(result.Issues))
+
+		for _, issue := range result.Issues {
+			icon := getIssueIcon(issue.Severity)
+			if issue.Severity == "error" {
+				hasErrors = true
+			}
+			fmt.Printf("  %s [%s] %s: %s\n", icon, strings.ToUpper(issue.Severity), issue.Path, issue.Message)
+		}
+		fmt.Println()
+	}
+
+	// Summary
+	totalComponents := len(allResults)
+	componentsWithIssues := 0
+	totalIssues := 0
+
+	for _, result := range allResults {
+		if len(result.Issues) > 0 {
+			componentsWithIssues++
+		}
+		totalIssues += len(result.Issues)
+	}
+
+	fmt.Printf("=== SUMMARY ===\n")
+	fmt.Printf("Total components validated: %d\n", totalComponents)
+	fmt.Printf("Components with issues: %d\n", componentsWithIssues)
+	fmt.Printf("Total issues found: %d\n", totalIssues)
+
+	if hasErrors {
+		fmt.Printf("\n❌ Validation failed with errors\n")
+	} else if totalIssues > 0 {
+		fmt.Printf("\n⚠️ Validation completed with warnings\n")
+	} else {
+		fmt.Printf("\n✅ All validations passed\n")
+	}
+
+	return hasErrors
+}