From 5f15993ac5686f73d9a789c3a5637dd5955cf3ef Mon Sep 17 00:00:00 2001 From: Matthias De Vriendt Date: Wed, 24 Sep 2025 13:09:20 +0200 Subject: [PATCH 1/4] attempt at spec validation --- Taskfile.yml | 34 +++ tools/extract-schemas.go | 216 +++++++++++++++ tools/validate-schemas.go | 541 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 791 insertions(+) create mode 100644 tools/extract-schemas.go create mode 100644 tools/validate-schemas.go diff --git a/Taskfile.yml b/Taskfile.yml index 2d99945..ec6521c 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -92,11 +92,37 @@ tasks: - task: test test:crawl: + desc: Crawl component definitions and validate results cmds: + # Setup - nats kv add TMP > /dev/null - defer: nats kv del --force TMP + + # Pre-crawl validation + - task: validate + - echo "Pre-crawl count:" && find .connect -name '*.yml' -not -path '.connect/runtime.yml' | wc -l + + # Run crawler - cd ../connect-node && go run ./cmd/connect-node crawl ../connect-runtime-wombat --bucket=TMP + # Post-crawl validation + - echo "Validating crawled data..." + - nats kv ls TMP --no-headers || true + - | + # Verify all components were crawled + CRAWLED_COUNT=$(nats kv ls TMP --no-headers 2>/dev/null | wc -l) + SOURCE_COUNT=$(find .connect -name '*.yml' -not -path '.connect/runtime.yml' | wc -l) + echo "Crawled: $CRAWLED_COUNT entries, Expected: $SOURCE_COUNT entries" + if [ "$CRAWLED_COUNT" -eq 0 ]; then + echo "ERROR: No entries found in KV bucket" + exit 1 + fi + if [ "$CRAWLED_COUNT" -lt "$SOURCE_COUNT" ]; then + echo "WARNING: Some components may not have been crawled" + echo "This could be expected if some files are not component specs" + fi + - echo "✓ Crawl validation complete" + validate: cmds: - curl --silent https://raw.githubusercontent.com/synadia-io/connect/refs/heads/main/model/schemas/component-spec-v1.schema.json -O @@ -110,6 +136,14 @@ tasks: - curl --silent https://raw.githubusercontent.com/synadia-io/connect/refs/heads/main/model/schemas/component-spec-v1.schema.json -O - npx --yes ajv-cli validate -s component-spec-v1.schema.json -d "{{.DIR}}/*/*.yml" --verbose + test:spec: + desc: Validate .connect specs against upstream Benthos schemas + cmds: + - echo "Extracting schemas from Benthos dependencies..." + - go run ./tools/extract-schemas.go + - echo "Validating .connect specs against extracted schemas..." + - go run ./tools/validate-schemas.go + docker:local: desc: Build Docker image locally with version information vars: diff --git a/tools/extract-schemas.go b/tools/extract-schemas.go new file mode 100644 index 0000000..6ef34d6 --- /dev/null +++ b/tools/extract-schemas.go @@ -0,0 +1,216 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + // Import components to register them + _ "github.com/wombatwisdom/wombat/public/components/all" + "github.com/redpanda-data/benthos/v4/public/service" +) + +// ComponentSchema represents the extracted schema structure +type ComponentSchema struct { + Name string `json:"name"` + Type string `json:"type"` + Fields []FieldSchema `json:"fields"` +} + +// FieldSchema represents a field in the component configuration +type FieldSchema struct { + Name string `json:"name"` + Type string `json:"type"` + Description string `json:"description"` + Default interface{} `json:"default,omitempty"` + Required bool `json:"required"` + Children []FieldSchema `json:"children,omitempty"` + FullName string `json:"full_name"` +} + +func main() { + fmt.Println("Extracting schemas from Benthos components...") + + // Get the global environment with all registered components + env := service.GlobalEnvironment() + + // Create schemas directory + schemasDir := "schemas" + if err := os.MkdirAll(schemasDir, 0755); err != nil { + fmt.Printf("Error creating schemas directory: %v\n", err) + os.Exit(1) + } + + // Extract output specs (sinks) + var outputCount int + env.WalkOutputs(func(name string, config *service.ConfigView) { + outputCount++ + if err := extractAndSaveComponentSchema(name, "output", config, schemasDir); err != nil { + fmt.Printf("Error extracting schema for output %s: %v\n", name, err) + return + } + fmt.Printf("✓ Extracted schema for output: %s\n", name) + }) + fmt.Printf("Found %d output components\n", outputCount) + + // Extract input specs (sources) + var inputCount int + env.WalkInputs(func(name string, config *service.ConfigView) { + inputCount++ + if err := extractAndSaveComponentSchema(name, "input", config, schemasDir); err != nil { + fmt.Printf("Error extracting schema for input %s: %v\n", name, err) + return + } + fmt.Printf("✓ Extracted schema for input: %s\n", name) + }) + fmt.Printf("Found %d input components\n", inputCount) + + // Extract processor specs + var processorCount int + env.WalkProcessors(func(name string, config *service.ConfigView) { + processorCount++ + if err := extractAndSaveComponentSchema(name, "processor", config, schemasDir); err != nil { + fmt.Printf("Error extracting schema for processor %s: %v\n", name, err) + return + } + fmt.Printf("✓ Extracted schema for processor: %s\n", name) + }) + fmt.Printf("Found %d processor components\n", processorCount) + + fmt.Printf("\nSchema extraction completed. Files saved to %s/\n", schemasDir) +} + +func extractAndSaveComponentSchema(name, componentType string, spec *service.ConfigView, schemasDir string) error { + // Get template data which contains the structured field information + templateData, err := spec.TemplateData() + if err != nil { + return fmt.Errorf("failed to get template data: %w", err) + } + + // Convert template data to our schema format + schema := ComponentSchema{ + Name: name, + Type: componentType, + Fields: convertTemplateFieldsToSchema(templateData.Fields), + } + + // Save to JSON file + filename := filepath.Join(schemasDir, fmt.Sprintf("%s_%s.json", componentType, name)) + file, err := os.Create(filename) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", filename, err) + } + defer file.Close() + + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") + if err := encoder.Encode(schema); err != nil { + return fmt.Errorf("failed to encode schema: %w", err) + } + + return nil +} + +func convertTemplateFieldsToSchema(templateFields []service.TemplateDataPluginField) []FieldSchema { + var fields []FieldSchema + + for _, tf := range templateFields { + field := FieldSchema{ + Name: extractFieldName(tf.FullName), + FullName: tf.FullName, + Type: tf.Type, + Description: tf.Description, + Required: tf.DefaultMarshalled == "", // Simple heuristic + } + + // Parse default value if available + if tf.DefaultMarshalled != "" { + var defaultVal interface{} + if err := json.Unmarshal([]byte(tf.DefaultMarshalled), &defaultVal); err == nil { + field.Default = defaultVal + } + } + + fields = append(fields, field) + } + + // Build hierarchical structure from flat list + return buildFieldHierarchy(fields) +} + +func extractFieldName(fullName string) string { + // Handle array notation: "tls.client_certs[].cert" -> "cert" + // Remove array notation first, then extract field name + cleanPath := strings.Replace(fullName, "[]", "", -1) + + parts := []rune(cleanPath) + lastDot := -1 + + for i := len(parts) - 1; i >= 0; i-- { + if parts[i] == '.' { + lastDot = i + break + } + } + + if lastDot == -1 { + return cleanPath + } + + return string(parts[lastDot+1:]) +} + +func buildFieldHierarchy(flatFields []FieldSchema) []FieldSchema { + // Group fields by their parent path + fieldMap := make(map[string][]FieldSchema) + rootFields := []FieldSchema{} + + for _, field := range flatFields { + // Determine parent path + parentPath := getParentPath(field.FullName) + + if parentPath == "" { + // Root level field + rootFields = append(rootFields, field) + } else { + // Child field + fieldMap[parentPath] = append(fieldMap[parentPath], field) + } + } + + // Recursively build hierarchy + return attachChildren(rootFields, fieldMap) +} + +func getParentPath(fullName string) string { + // Handle array notation: "tls.client_certs[].cert" -> "tls.client_certs" + // Remove array notation first, then find parent + cleanPath := strings.Replace(fullName, "[]", "", -1) + + parts := []rune(cleanPath) + lastDot := -1 + + for i := len(parts) - 1; i >= 0; i-- { + if parts[i] == '.' { + lastDot = i + break + } + } + + if lastDot == -1 { + return "" + } + + return string(parts[:lastDot]) +} + +func attachChildren(fields []FieldSchema, fieldMap map[string][]FieldSchema) []FieldSchema { + for i := range fields { + if children, exists := fieldMap[fields[i].FullName]; exists { + fields[i].Children = attachChildren(children, fieldMap) + } + } + return fields +} \ No newline at end of file diff --git a/tools/validate-schemas.go b/tools/validate-schemas.go new file mode 100644 index 0000000..4e820e5 --- /dev/null +++ b/tools/validate-schemas.go @@ -0,0 +1,541 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + "gopkg.in/yaml.v3" +) + +// Field patterns to ignore during validation - vendor-specific and advanced features +var ignoredFieldPatterns = []string{ + // Synadia NATS-specific authentication extensions + //"auth.nkey_file", + //"auth.nkey", + //"auth.user_credentials_file", + // + //// Advanced/experimental features outside Connect Runtime scope + //"inject_tracing_map", + //"extract_tracing_map", + // + //// Enterprise features + //"batching.processors", // Complex batching processors + // + //// TLS configurations - Connect Runtime uses simplified TLS configuration + //// Many components have their own TLS implementation that differs from Benthos + //"tls", // General TLS config for NATS, MQTT, Pulsar, etc. + //"tls.enabled", + //"tls.skip_cert_verify", + //"tls.enable_renegotiation", + //"tls.root_cas", + //"tls.client_certs", + //"tls.client_certs[].cert", + //"tls.client_certs[].key", + //"tls.client_certs[].password", + // + //// Complex batching and database-specific configurations + //"write_concern", // MongoDB write concern - advanced configuration + //"write_concern.w", + //"write_concern.j", + //"write_concern.w_timeout", + //"batching", // Complex batching configurations not exposed in Connect + //"batching.count", + //"batching.byte_size", + //"batching.period", + //"batching.check", + // + //// Vendor-specific extensions not in base Benthos schema + //// AWS-specific SASL extensions + //"sasl[].aws.region", + //"sasl[].aws.endpoint", + //"sasl[].aws.credentials", + //"region", // AWS region fields + //"endpoint", // AWS endpoint fields + //"credentials", // AWS credentials fields + // + //// Azure-specific configurations + //"targets_input", // Azure blob storage specific + //"endpoint", // Azure endpoint configurations + //"account_key", // Azure account key + // + //// BigQuery-specific fields + //"prefix", // GCP BigQuery prefix + //"suffix", // GCP BigQuery suffix + // + //// GCP PubSub specific + //"create_subscription", // GCP PubSub subscription creation + // + //// Enterprise and advanced features not exposed in Connect Runtime + //"jwt", // JWT authentication - advanced feature + //"jwt.enabled", + //"jwt.private_key_file", + //"jwt.signing_method", + //"jwt.claims", + //"rate_limit", // Rate limiting - advanced feature + //"rate_limit.count", + //"rate_limit.interval", + // + //// Component-specific edge cases + //// AMQP queue declaration differences between Connect and Benthos + //"queue.queue_declare.enabled", + //"queue.queue_declare.durable", + //"queue.queue_declare.auto_delete", + // + //// HTTP client OAuth endpoint parameters not in base schema + //"oauth.oauth2.endpoint_params", + //"oauth2.endpoint_params", + // + //// Additional AWS and GCP specific advanced configurations + //"backoff", // AWS backoff configuration - advanced feature + //"backoff.initial_interval", + //"backoff.max_interval", + //"backoff.max_elapsed_time", + //"csv", // GCP BigQuery CSV format - advanced feature + //"csv.header", + //"csv.delimiter", + //"flow_control", // GCP PubSub flow control - advanced feature + //"flow_control.max_messages", + //"flow_control.max_bytes", +} + +// ConnectSpec represents our .connect YAML specification +type ConnectSpec struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + Summary string `yaml:"summary,omitempty"` + Description string `yaml:"description,omitempty"` + Fields []ConnectField `yaml:"fields"` +} + +// ConnectField represents a field in our .connect specification +type ConnectField struct { + Path string `yaml:"path"` + Kind string `yaml:"kind,omitempty"` + Type string `yaml:"type,omitempty"` + Description string `yaml:"description,omitempty"` + Default interface{} `yaml:"default,omitempty"` + Fields []ConnectField `yaml:"fields,omitempty"` +} + +// ComponentSchema matches the structure from extract-schemas.go +type ComponentSchema struct { + Name string `json:"name"` + Type string `json:"type"` + Fields []FieldSchema `json:"fields"` +} + +type FieldSchema struct { + Name string `json:"name"` + Type string `json:"type"` + Description string `json:"description"` + Default interface{} `json:"default,omitempty"` + Required bool `json:"required"` + Children []FieldSchema `json:"children,omitempty"` + FullName string `json:"full_name"` +} + +// ValidationResult tracks validation issues +type ValidationResult struct { + ComponentName string + ComponentType string + Issues []ValidationIssue +} + +type ValidationIssue struct { + Severity string // "error", "warning", "info" + Path string + Message string +} + +func main() { + fmt.Println("Validating .connect specs against extracted Benthos schemas...") + + var allResults []ValidationResult + hasErrors := false + + // Validate sinks (outputs) + sinkResults, err := validateComponentType("sinks", "output") + if err != nil { + fmt.Printf("Error validating sinks: %v\n", err) + os.Exit(1) + } + allResults = append(allResults, sinkResults...) + + // Validate sources (inputs) + sourceResults, err := validateComponentType("sources", "input") + if err != nil { + fmt.Printf("Error validating sources: %v\n", err) + os.Exit(1) + } + allResults = append(allResults, sourceResults...) + + // Validate processors + processorResults, err := validateComponentType("processors", "processor") + if err != nil { + fmt.Printf("Error validating processors: %v\n", err) + os.Exit(1) + } + allResults = append(allResults, processorResults...) + + // Report results + fmt.Printf("\n=== VALIDATION RESULTS ===\n\n") + + for _, result := range allResults { + if len(result.Issues) == 0 { + fmt.Printf("✅ %s/%s: OK\n", result.ComponentType, result.ComponentName) + continue + } + + fmt.Printf("❌ %s/%s: %d issues found\n", result.ComponentType, result.ComponentName, len(result.Issues)) + + for _, issue := range result.Issues { + icon := getIssueIcon(issue.Severity) + if issue.Severity == "error" { + hasErrors = true + } + fmt.Printf(" %s [%s] %s: %s\n", icon, strings.ToUpper(issue.Severity), issue.Path, issue.Message) + } + fmt.Println() + } + + // Summary + totalComponents := len(allResults) + componentsWithIssues := 0 + totalIssues := 0 + + for _, result := range allResults { + if len(result.Issues) > 0 { + componentsWithIssues++ + } + totalIssues += len(result.Issues) + } + + fmt.Printf("=== SUMMARY ===\n") + fmt.Printf("Total components validated: %d\n", totalComponents) + fmt.Printf("Components with issues: %d\n", componentsWithIssues) + fmt.Printf("Total issues found: %d\n", totalIssues) + + if hasErrors { + fmt.Printf("\n❌ Validation failed with errors\n") + os.Exit(1) + } else if totalIssues > 0 { + fmt.Printf("\n⚠️ Validation completed with warnings\n") + } else { + fmt.Printf("\n✅ All validations passed\n") + } +} + +func validateComponentType(connectDir, schemaType string) ([]ValidationResult, error) { + var results []ValidationResult + + connectPath := filepath.Join(".connect", connectDir) + + // Check if directory exists + if _, err := os.Stat(connectPath); os.IsNotExist(err) { + fmt.Printf("Directory %s does not exist, skipping...\n", connectPath) + return results, nil + } + + // Read all YAML files in the directory + files, err := filepath.Glob(filepath.Join(connectPath, "*.yml")) + if err != nil { + return nil, fmt.Errorf("failed to glob YAML files in %s: %w", connectPath, err) + } + + for _, file := range files { + result, err := validateSingleComponent(file, schemaType) + if err != nil { + fmt.Printf("Warning: failed to validate %s: %v\n", file, err) + continue + } + results = append(results, result) + } + + return results, nil +} + +func validateSingleComponent(connectFile, schemaType string) (ValidationResult, error) { + // Load .connect spec + connectSpec, err := loadConnectSpec(connectFile) + if err != nil { + return ValidationResult{}, fmt.Errorf("failed to load connect spec: %w", err) + } + + result := ValidationResult{ + ComponentName: connectSpec.Name, + ComponentType: schemaType, + Issues: []ValidationIssue{}, + } + + // Load corresponding Benthos schema + schemaFile := filepath.Join("schemas", fmt.Sprintf("%s_%s.json", schemaType, connectSpec.Name)) + benthos, err := loadBenthosSchema(schemaFile) + if err != nil { + result.Issues = append(result.Issues, ValidationIssue{ + Severity: "warning", + Path: "", + Message: fmt.Sprintf("No Benthos schema found at %s", schemaFile), + }) + return result, nil + } + + // Validate structure + result.Issues = append(result.Issues, validateFields(connectSpec.Fields, benthos.Fields, "")...) + + return result, nil +} + +func loadConnectSpec(filename string) (*ConnectSpec, error) { + data, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + + var spec ConnectSpec + if err := yaml.Unmarshal(data, &spec); err != nil { + return nil, err + } + + return &spec, nil +} + +func loadBenthosSchema(filename string) (*ComponentSchema, error) { + data, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + + var schema ComponentSchema + if err := json.Unmarshal(data, &schema); err != nil { + return nil, err + } + + return &schema, nil +} + +func validateFields(connectFields []ConnectField, benthosFields []FieldSchema, parentPath string) []ValidationIssue { + var issues []ValidationIssue + + // Build maps for easier lookup + connectMap := make(map[string]ConnectField) + benthosMap := make(map[string]FieldSchema) + + for _, cf := range connectFields { + connectMap[cf.Path] = cf + } + + for _, bf := range benthosFields { + benthosMap[bf.Name] = bf + } + + // Check for OAuth-specific structural issues + issues = append(issues, checkOAuthStructure(connectFields, parentPath)...) + + // Check each field in our connect spec + for _, connectField := range connectFields { + fieldPath := buildFieldPath(parentPath, connectField.Path) + + // Try multiple matching strategies for better field resolution + benthosField, exists := findMatchingBenthosField(connectField, benthosFields, parentPath) + if !exists { + // Check if this field should be ignored (vendor-specific, advanced features, etc.) + if shouldIgnoreField(connectField.Path) || shouldIgnoreField(fieldPath) { + continue + } + + fieldName := extractLastPathComponent(connectField.Path) + issues = append(issues, ValidationIssue{ + Severity: "warning", + Path: fieldPath, + Message: fmt.Sprintf("Field not found in Benthos schema: %s", fieldName), + }) + continue + } + + // Validate field type if specified + if connectField.Type != "" { + expectedType := mapConnectTypeToBenthos(connectField) + if expectedType != benthosField.Type { + issues = append(issues, ValidationIssue{ + Severity: "error", + Path: fieldPath, + Message: fmt.Sprintf("Type mismatch: connect=%s, benthos=%s", expectedType, benthosField.Type), + }) + } + } + + // Recursively validate children + if len(connectField.Fields) > 0 { + issues = append(issues, validateFields(connectField.Fields, benthosField.Children, fieldPath)...) + } + } + + // Check for missing required fields in our spec + for _, benthosField := range benthosFields { + if benthosField.Required { + // Try to find matching Connect field by checking both field name and full path + fieldPath := buildFieldPath(parentPath, benthosField.Name) + found := false + + // Check if field exists in connectMap by name (for root level fields) + if _, exists := connectMap[benthosField.Name]; exists { + found = true + } + + // Check if field exists by full path (for nested fields) + if !found { + for connectPath := range connectMap { + if extractLastPathComponent(connectPath) == benthosField.Name { + found = true + break + } + } + } + + if !found { + // Skip ignored vendor-specific and advanced fields + if shouldIgnoreField(benthosField.FullName) { + continue + } + + issues = append(issues, ValidationIssue{ + Severity: "warning", + Path: fieldPath, + Message: "Required field missing from connect spec", + }) + } + } + } + + return issues +} + +func checkOAuthStructure(connectFields []ConnectField, parentPath string) []ValidationIssue { + var issues []ValidationIssue + + // Look for OAuth structure issues + for _, field := range connectFields { + if field.Path == "oauth" { + // Check if OAuth2 fields are incorrectly nested under OAuth1 + oauth2Fields := []string{"client_key", "client_secret", "scopes", "token_url"} + + for _, childField := range field.Fields { + // Check if OAuth2 fields appear as direct children of oauth + for _, oauth2Field := range oauth2Fields { + if strings.HasPrefix(childField.Path, "oauth2."+oauth2Field) { + fieldPath := buildFieldPath(parentPath, field.Path+"."+childField.Path) + issues = append(issues, ValidationIssue{ + Severity: "error", + Path: fieldPath, + Message: fmt.Sprintf("OAuth2 field '%s' incorrectly nested under OAuth1 - should be at top level under separate oauth2 section", oauth2Field), + }) + } + } + } + } + } + + return issues +} + +func buildFieldPath(parent, child string) string { + if parent == "" { + return child + } + return parent + "." + child +} + +func extractLastPathComponent(path string) string { + // For paths like "tls.enabled", return "enabled" + // For simple paths like "oauth", return "oauth" + parts := strings.Split(path, ".") + return parts[len(parts)-1] +} + +// findMatchingBenthosField uses multiple strategies to find the best matching Benthos field +// for a given Connect field, handling nested paths and different naming conventions +func findMatchingBenthosField(connectField ConnectField, benthosFields []FieldSchema, parentPath string) (FieldSchema, bool) { + connectFieldName := extractLastPathComponent(connectField.Path) + fullConnectPath := buildFieldPath(parentPath, connectField.Path) + + // Strategy 1: Direct name match (most common case) + for _, bf := range benthosFields { + if bf.Name == connectFieldName { + return bf, true + } + } + + // Strategy 2: Match against full path for nested fields + for _, bf := range benthosFields { + if bf.FullName == fullConnectPath || bf.FullName == connectField.Path { + return bf, true + } + } + + // Strategy 3: Handle array notation differences - Connect uses "field" while Benthos may use "field[]" + cleanConnectPath := strings.Replace(connectField.Path, "[]", "", -1) + for _, bf := range benthosFields { + cleanBenthosPath := strings.Replace(bf.FullName, "[]", "", -1) + if cleanBenthosPath == cleanConnectPath { + return bf, true + } + } + + // Strategy 4: Partial path matching for complex nested structures + for _, bf := range benthosFields { + if strings.HasSuffix(bf.FullName, connectField.Path) || strings.HasSuffix(connectField.Path, bf.Name) { + return bf, true + } + } + + return FieldSchema{}, false +} + +func mapConnectTypeToBenthos(field ConnectField) string { + // Handle .connect kind + type combinations and map to Benthos types + switch { + case field.Kind == "list": + // kind: list means array type in Benthos + return "array" + case field.Kind == "map": + // kind: map means object type in Benthos + return "object" + case field.Kind == "scalar": + // kind: scalar uses the type directly + return field.Type + case field.Type == "expression": + // Connect "expression" type maps to Benthos "string" - expressions are stored as strings + // This is a semantic enhancement: Connect explicitly labels expression-capable fields + return "string" + default: + // No kind specified, use type directly + return field.Type + } +} + +// shouldIgnoreField checks if a field should be ignored during validation +// Returns true for vendor-specific extensions and advanced features outside Connect Runtime scope +func shouldIgnoreField(fullName string) bool { + for _, pattern := range ignoredFieldPatterns { + if fullName == pattern { + return true + } + } + return false +} + +func getIssueIcon(severity string) string { + switch severity { + case "error": + return "🔴" + case "warning": + return "🟡" + case "info": + return "🔵" + default: + return "⚪" + } +} From ccb2e25791a02f514c1b53356de3f97726f770cf Mon Sep 17 00:00:00 2001 From: Matthias De Vriendt Date: Wed, 24 Sep 2025 22:09:29 +0200 Subject: [PATCH 2/4] lint --- Taskfile.yml | 4 +- tools/{extract-schemas.go => extract/main.go} | 44 +++++++------------ tools/shared/types.go | 19 ++++++++ .../{validate-schemas.go => validate/main.go} | 34 ++++---------- 4 files changed, 46 insertions(+), 55 deletions(-) rename tools/{extract-schemas.go => extract/main.go} (80%) create mode 100644 tools/shared/types.go rename tools/{validate-schemas.go => validate/main.go} (93%) diff --git a/Taskfile.yml b/Taskfile.yml index ec6521c..f7b0f01 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -140,9 +140,9 @@ tasks: desc: Validate .connect specs against upstream Benthos schemas cmds: - echo "Extracting schemas from Benthos dependencies..." - - go run ./tools/extract-schemas.go + - go run ./tools/extract/ - echo "Validating .connect specs against extracted schemas..." - - go run ./tools/validate-schemas.go + - go run ./tools/validate/ docker:local: desc: Build Docker image locally with version information diff --git a/tools/extract-schemas.go b/tools/extract/main.go similarity index 80% rename from tools/extract-schemas.go rename to tools/extract/main.go index 6ef34d6..b10e6e5 100644 --- a/tools/extract-schemas.go +++ b/tools/extract/main.go @@ -10,25 +10,9 @@ import ( // Import components to register them _ "github.com/wombatwisdom/wombat/public/components/all" "github.com/redpanda-data/benthos/v4/public/service" + "github.com/synadia-io/connect-runtime-wombat/tools/shared" ) -// ComponentSchema represents the extracted schema structure -type ComponentSchema struct { - Name string `json:"name"` - Type string `json:"type"` - Fields []FieldSchema `json:"fields"` -} - -// FieldSchema represents a field in the component configuration -type FieldSchema struct { - Name string `json:"name"` - Type string `json:"type"` - Description string `json:"description"` - Default interface{} `json:"default,omitempty"` - Required bool `json:"required"` - Children []FieldSchema `json:"children,omitempty"` - FullName string `json:"full_name"` -} func main() { fmt.Println("Extracting schemas from Benthos components...") @@ -90,7 +74,7 @@ func extractAndSaveComponentSchema(name, componentType string, spec *service.Con } // Convert template data to our schema format - schema := ComponentSchema{ + schema := shared.ComponentSchema{ Name: name, Type: componentType, Fields: convertTemplateFieldsToSchema(templateData.Fields), @@ -102,7 +86,11 @@ func extractAndSaveComponentSchema(name, componentType string, spec *service.Con if err != nil { return fmt.Errorf("failed to create file %s: %w", filename, err) } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + fmt.Printf("Warning: failed to close file %s: %v\n", filename, err) + } + }() encoder := json.NewEncoder(file) encoder.SetIndent("", " ") @@ -113,11 +101,11 @@ func extractAndSaveComponentSchema(name, componentType string, spec *service.Con return nil } -func convertTemplateFieldsToSchema(templateFields []service.TemplateDataPluginField) []FieldSchema { - var fields []FieldSchema +func convertTemplateFieldsToSchema(templateFields []service.TemplateDataPluginField) []shared.FieldSchema { + var fields []shared.FieldSchema for _, tf := range templateFields { - field := FieldSchema{ + field := shared.FieldSchema{ Name: extractFieldName(tf.FullName), FullName: tf.FullName, Type: tf.Type, @@ -143,7 +131,7 @@ func convertTemplateFieldsToSchema(templateFields []service.TemplateDataPluginFi func extractFieldName(fullName string) string { // Handle array notation: "tls.client_certs[].cert" -> "cert" // Remove array notation first, then extract field name - cleanPath := strings.Replace(fullName, "[]", "", -1) + cleanPath := strings.ReplaceAll(fullName, "[]", "") parts := []rune(cleanPath) lastDot := -1 @@ -162,10 +150,10 @@ func extractFieldName(fullName string) string { return string(parts[lastDot+1:]) } -func buildFieldHierarchy(flatFields []FieldSchema) []FieldSchema { +func buildFieldHierarchy(flatFields []shared.FieldSchema) []shared.FieldSchema { // Group fields by their parent path - fieldMap := make(map[string][]FieldSchema) - rootFields := []FieldSchema{} + fieldMap := make(map[string][]shared.FieldSchema) + rootFields := []shared.FieldSchema{} for _, field := range flatFields { // Determine parent path @@ -187,7 +175,7 @@ func buildFieldHierarchy(flatFields []FieldSchema) []FieldSchema { func getParentPath(fullName string) string { // Handle array notation: "tls.client_certs[].cert" -> "tls.client_certs" // Remove array notation first, then find parent - cleanPath := strings.Replace(fullName, "[]", "", -1) + cleanPath := strings.ReplaceAll(fullName, "[]", "") parts := []rune(cleanPath) lastDot := -1 @@ -206,7 +194,7 @@ func getParentPath(fullName string) string { return string(parts[:lastDot]) } -func attachChildren(fields []FieldSchema, fieldMap map[string][]FieldSchema) []FieldSchema { +func attachChildren(fields []shared.FieldSchema, fieldMap map[string][]shared.FieldSchema) []shared.FieldSchema { for i := range fields { if children, exists := fieldMap[fields[i].FullName]; exists { fields[i].Children = attachChildren(children, fieldMap) diff --git a/tools/shared/types.go b/tools/shared/types.go new file mode 100644 index 0000000..6c7527b --- /dev/null +++ b/tools/shared/types.go @@ -0,0 +1,19 @@ +package shared + +// ComponentSchema represents the extracted schema structure +type ComponentSchema struct { + Name string `json:"name"` + Type string `json:"type"` + Fields []FieldSchema `json:"fields"` +} + +// FieldSchema represents a field in the component configuration +type FieldSchema struct { + Name string `json:"name"` + Type string `json:"type"` + Description string `json:"description"` + Default interface{} `json:"default,omitempty"` + Required bool `json:"required"` + Children []FieldSchema `json:"children,omitempty"` + FullName string `json:"full_name"` +} \ No newline at end of file diff --git a/tools/validate-schemas.go b/tools/validate/main.go similarity index 93% rename from tools/validate-schemas.go rename to tools/validate/main.go index 4e820e5..1e7cac1 100644 --- a/tools/validate-schemas.go +++ b/tools/validate/main.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strings" + "github.com/synadia-io/connect-runtime-wombat/tools/shared" "gopkg.in/yaml.v3" ) @@ -120,23 +121,6 @@ type ConnectField struct { Fields []ConnectField `yaml:"fields,omitempty"` } -// ComponentSchema matches the structure from extract-schemas.go -type ComponentSchema struct { - Name string `json:"name"` - Type string `json:"type"` - Fields []FieldSchema `json:"fields"` -} - -type FieldSchema struct { - Name string `json:"name"` - Type string `json:"type"` - Description string `json:"description"` - Default interface{} `json:"default,omitempty"` - Required bool `json:"required"` - Children []FieldSchema `json:"children,omitempty"` - FullName string `json:"full_name"` -} - // ValidationResult tracks validation issues type ValidationResult struct { ComponentName string @@ -302,13 +286,13 @@ func loadConnectSpec(filename string) (*ConnectSpec, error) { return &spec, nil } -func loadBenthosSchema(filename string) (*ComponentSchema, error) { +func loadBenthosSchema(filename string) (*shared.ComponentSchema, error) { data, err := os.ReadFile(filename) if err != nil { return nil, err } - var schema ComponentSchema + var schema shared.ComponentSchema if err := json.Unmarshal(data, &schema); err != nil { return nil, err } @@ -316,12 +300,12 @@ func loadBenthosSchema(filename string) (*ComponentSchema, error) { return &schema, nil } -func validateFields(connectFields []ConnectField, benthosFields []FieldSchema, parentPath string) []ValidationIssue { +func validateFields(connectFields []ConnectField, benthosFields []shared.FieldSchema, parentPath string) []ValidationIssue { var issues []ValidationIssue // Build maps for easier lookup connectMap := make(map[string]ConnectField) - benthosMap := make(map[string]FieldSchema) + benthosMap := make(map[string]shared.FieldSchema) for _, cf := range connectFields { connectMap[cf.Path] = cf @@ -457,7 +441,7 @@ func extractLastPathComponent(path string) string { // findMatchingBenthosField uses multiple strategies to find the best matching Benthos field // for a given Connect field, handling nested paths and different naming conventions -func findMatchingBenthosField(connectField ConnectField, benthosFields []FieldSchema, parentPath string) (FieldSchema, bool) { +func findMatchingBenthosField(connectField ConnectField, benthosFields []shared.FieldSchema, parentPath string) (shared.FieldSchema, bool) { connectFieldName := extractLastPathComponent(connectField.Path) fullConnectPath := buildFieldPath(parentPath, connectField.Path) @@ -476,9 +460,9 @@ func findMatchingBenthosField(connectField ConnectField, benthosFields []FieldSc } // Strategy 3: Handle array notation differences - Connect uses "field" while Benthos may use "field[]" - cleanConnectPath := strings.Replace(connectField.Path, "[]", "", -1) + cleanConnectPath := strings.ReplaceAll(connectField.Path, "[]", "") for _, bf := range benthosFields { - cleanBenthosPath := strings.Replace(bf.FullName, "[]", "", -1) + cleanBenthosPath := strings.ReplaceAll(bf.FullName, "[]", "") if cleanBenthosPath == cleanConnectPath { return bf, true } @@ -491,7 +475,7 @@ func findMatchingBenthosField(connectField ConnectField, benthosFields []FieldSc } } - return FieldSchema{}, false + return shared.FieldSchema{}, false } func mapConnectTypeToBenthos(field ConnectField) string { From 157275814531d0774268b90d69de594498dad582 Mon Sep 17 00:00:00 2001 From: Matthias De Vriendt Date: Wed, 24 Sep 2025 22:50:59 +0200 Subject: [PATCH 3/4] fix formatting --- tools/extract/main.go | 25 ++++++++++++------------- tools/shared/types.go | 2 +- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/tools/extract/main.go b/tools/extract/main.go index b10e6e5..31ebb94 100644 --- a/tools/extract/main.go +++ b/tools/extract/main.go @@ -8,12 +8,11 @@ import ( "strings" // Import components to register them - _ "github.com/wombatwisdom/wombat/public/components/all" "github.com/redpanda-data/benthos/v4/public/service" "github.com/synadia-io/connect-runtime-wombat/tools/shared" + _ "github.com/wombatwisdom/wombat/public/components/all" ) - func main() { fmt.Println("Extracting schemas from Benthos components...") @@ -39,7 +38,7 @@ func main() { }) fmt.Printf("Found %d output components\n", outputCount) - // Extract input specs (sources) + // Extract input specs (sources) var inputCount int env.WalkInputs(func(name string, config *service.ConfigView) { inputCount++ @@ -132,21 +131,21 @@ func extractFieldName(fullName string) string { // Handle array notation: "tls.client_certs[].cert" -> "cert" // Remove array notation first, then extract field name cleanPath := strings.ReplaceAll(fullName, "[]", "") - + parts := []rune(cleanPath) lastDot := -1 - + for i := len(parts) - 1; i >= 0; i-- { if parts[i] == '.' { lastDot = i break } } - + if lastDot == -1 { return cleanPath } - + return string(parts[lastDot+1:]) } @@ -158,7 +157,7 @@ func buildFieldHierarchy(flatFields []shared.FieldSchema) []shared.FieldSchema { for _, field := range flatFields { // Determine parent path parentPath := getParentPath(field.FullName) - + if parentPath == "" { // Root level field rootFields = append(rootFields, field) @@ -176,21 +175,21 @@ func getParentPath(fullName string) string { // Handle array notation: "tls.client_certs[].cert" -> "tls.client_certs" // Remove array notation first, then find parent cleanPath := strings.ReplaceAll(fullName, "[]", "") - + parts := []rune(cleanPath) lastDot := -1 - + for i := len(parts) - 1; i >= 0; i-- { if parts[i] == '.' { lastDot = i break } } - + if lastDot == -1 { return "" } - + return string(parts[:lastDot]) } @@ -201,4 +200,4 @@ func attachChildren(fields []shared.FieldSchema, fieldMap map[string][]shared.Fi } } return fields -} \ No newline at end of file +} diff --git a/tools/shared/types.go b/tools/shared/types.go index 6c7527b..1c03b8c 100644 --- a/tools/shared/types.go +++ b/tools/shared/types.go @@ -16,4 +16,4 @@ type FieldSchema struct { Required bool `json:"required"` Children []FieldSchema `json:"children,omitempty"` FullName string `json:"full_name"` -} \ No newline at end of file +} From 832fe9290594ba2542c3b94e99c1dcaccf8a4060 Mon Sep 17 00:00:00 2001 From: Matthias De Vriendt Date: Thu, 25 Sep 2025 14:01:53 +0200 Subject: [PATCH 4/4] cleanup --- .gitignore | 3 + Taskfile.yml | 8 +- .../main.go => test/cmd/spec-drift/extract.go | 34 +- test/cmd/spec-drift/main.go | 42 +++ test/cmd/spec-drift/types.go | 51 +++ .../cmd/spec-drift/validate.go | 291 +++++------------- tools/shared/types.go | 19 -- 7 files changed, 192 insertions(+), 256 deletions(-) rename tools/extract/main.go => test/cmd/spec-drift/extract.go (85%) create mode 100644 test/cmd/spec-drift/main.go create mode 100644 test/cmd/spec-drift/types.go rename tools/validate/main.go => test/cmd/spec-drift/validate.go (60%) delete mode 100644 tools/shared/types.go diff --git a/.gitignore b/.gitignore index a6d692b..252e2ee 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ coverage.html .claude *.tar + +# Generated schema files +/schemas diff --git a/Taskfile.yml b/Taskfile.yml index f7b0f01..29bfb75 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -136,13 +136,11 @@ tasks: - curl --silent https://raw.githubusercontent.com/synadia-io/connect/refs/heads/main/model/schemas/component-spec-v1.schema.json -O - npx --yes ajv-cli validate -s component-spec-v1.schema.json -d "{{.DIR}}/*/*.yml" --verbose - test:spec: + test:spec:drift: desc: Validate .connect specs against upstream Benthos schemas cmds: - - echo "Extracting schemas from Benthos dependencies..." - - go run ./tools/extract/ - - echo "Validating .connect specs against extracted schemas..." - - go run ./tools/validate/ + - echo "Running spec drift validation..." + - go run ./test/cmd/spec-drift/ docker:local: desc: Build Docker image locally with version information diff --git a/tools/extract/main.go b/test/cmd/spec-drift/extract.go similarity index 85% rename from tools/extract/main.go rename to test/cmd/spec-drift/extract.go index 31ebb94..677a6ca 100644 --- a/tools/extract/main.go +++ b/test/cmd/spec-drift/extract.go @@ -7,26 +7,24 @@ import ( "path/filepath" "strings" - // Import components to register them "github.com/redpanda-data/benthos/v4/public/service" - "github.com/synadia-io/connect-runtime-wombat/tools/shared" _ "github.com/wombatwisdom/wombat/public/components/all" ) -func main() { +func extract() (string, error) { fmt.Println("Extracting schemas from Benthos components...") - // Get the global environment with all registered components env := service.GlobalEnvironment() - // Create schemas directory - schemasDir := "schemas" - if err := os.MkdirAll(schemasDir, 0755); err != nil { - fmt.Printf("Error creating schemas directory: %v\n", err) + schemasDir, err := os.MkdirTemp("", "wombat-schema-") + if err != nil { + fmt.Printf("❌Error creating temp dir for schemas: %v\n", err) os.Exit(1) + } else { + fmt.Printf("Using dir: %v\n", schemasDir) + } - // Extract output specs (sinks) var outputCount int env.WalkOutputs(func(name string, config *service.ConfigView) { outputCount++ @@ -38,7 +36,6 @@ func main() { }) fmt.Printf("Found %d output components\n", outputCount) - // Extract input specs (sources) var inputCount int env.WalkInputs(func(name string, config *service.ConfigView) { inputCount++ @@ -63,6 +60,7 @@ func main() { fmt.Printf("Found %d processor components\n", processorCount) fmt.Printf("\nSchema extraction completed. Files saved to %s/\n", schemasDir) + return schemasDir, nil } func extractAndSaveComponentSchema(name, componentType string, spec *service.ConfigView, schemasDir string) error { @@ -73,7 +71,7 @@ func extractAndSaveComponentSchema(name, componentType string, spec *service.Con } // Convert template data to our schema format - schema := shared.ComponentSchema{ + schema := ComponentSchema{ Name: name, Type: componentType, Fields: convertTemplateFieldsToSchema(templateData.Fields), @@ -100,11 +98,11 @@ func extractAndSaveComponentSchema(name, componentType string, spec *service.Con return nil } -func convertTemplateFieldsToSchema(templateFields []service.TemplateDataPluginField) []shared.FieldSchema { - var fields []shared.FieldSchema +func convertTemplateFieldsToSchema(templateFields []service.TemplateDataPluginField) []FieldSchema { + var fields []FieldSchema for _, tf := range templateFields { - field := shared.FieldSchema{ + field := FieldSchema{ Name: extractFieldName(tf.FullName), FullName: tf.FullName, Type: tf.Type, @@ -149,10 +147,10 @@ func extractFieldName(fullName string) string { return string(parts[lastDot+1:]) } -func buildFieldHierarchy(flatFields []shared.FieldSchema) []shared.FieldSchema { +func buildFieldHierarchy(flatFields []FieldSchema) []FieldSchema { // Group fields by their parent path - fieldMap := make(map[string][]shared.FieldSchema) - rootFields := []shared.FieldSchema{} + fieldMap := make(map[string][]FieldSchema) + rootFields := []FieldSchema{} for _, field := range flatFields { // Determine parent path @@ -193,7 +191,7 @@ func getParentPath(fullName string) string { return string(parts[:lastDot]) } -func attachChildren(fields []shared.FieldSchema, fieldMap map[string][]shared.FieldSchema) []shared.FieldSchema { +func attachChildren(fields []FieldSchema, fieldMap map[string][]FieldSchema) []FieldSchema { for i := range fields { if children, exists := fieldMap[fields[i].FullName]; exists { fields[i].Children = attachChildren(children, fieldMap) diff --git a/test/cmd/spec-drift/main.go b/test/cmd/spec-drift/main.go new file mode 100644 index 0000000..ff77460 --- /dev/null +++ b/test/cmd/spec-drift/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "os" +) + +func main() { + run(".connect") +} + +func run(dir string) { + schemaDir, err := extract() + if err != nil { + fmt.Fprintf(os.Stderr, "Error during extract: %v\n", err) + os.Exit(1) + } + + if err := validate(dir, schemaDir); err != nil { + fmt.Fprintf(os.Stderr, "Error during validate: %v\n", err) + } + + if err := clean(schemaDir); err != nil { + fmt.Fprintf(os.Stderr, "Error cleaning schemas: %v\n", err) + os.Exit(1) + } else { + fmt.Fprintf(os.Stderr, "Removed extracted schemas from: %v\n", schemaDir) + } + +} + +func clean(schemasDir string) error { + if _, err := os.Stat(schemasDir); os.IsNotExist(err) { + return nil + } + + if err := os.RemoveAll(schemasDir); err != nil { + return fmt.Errorf("failed to remove schemas directory: %w", err) + } + + return nil +} diff --git a/test/cmd/spec-drift/types.go b/test/cmd/spec-drift/types.go new file mode 100644 index 0000000..e37926b --- /dev/null +++ b/test/cmd/spec-drift/types.go @@ -0,0 +1,51 @@ +package main + +// ComponentSchema represents the extracted schema structure +type ComponentSchema struct { + Name string `json:"name"` + Type string `json:"type"` + Fields []FieldSchema `json:"fields"` +} + +// FieldSchema represents a field in the component configuration +type FieldSchema struct { + Name string `json:"name"` + Type string `json:"type"` + Description string `json:"description"` + Default interface{} `json:"default,omitempty"` + Required bool `json:"required"` + Children []FieldSchema `json:"children,omitempty"` + FullName string `json:"full_name"` +} + +// ConnectSpec represents our .connect YAML specification +type ConnectSpec struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + Summary string `yaml:"summary,omitempty"` + Description string `yaml:"description,omitempty"` + Fields []ConnectField `yaml:"fields"` +} + +// ConnectField represents a field in our .connect specification +type ConnectField struct { + Path string `yaml:"path"` + Kind string `yaml:"kind,omitempty"` + Type string `yaml:"type,omitempty"` + Description string `yaml:"description,omitempty"` + Default interface{} `yaml:"default,omitempty"` + Fields []ConnectField `yaml:"fields,omitempty"` +} + +// ValidationResult tracks validation issues +type ValidationResult struct { + ComponentName string + ComponentType string + Issues []ValidationIssue +} + +type ValidationIssue struct { + Severity string // "error", "warning", "info" + Path string + Message string +} diff --git a/tools/validate/main.go b/test/cmd/spec-drift/validate.go similarity index 60% rename from tools/validate/main.go rename to test/cmd/spec-drift/validate.go index 1e7cac1..f01d8e4 100644 --- a/tools/validate/main.go +++ b/test/cmd/spec-drift/validate.go @@ -7,230 +7,62 @@ import ( "path/filepath" "strings" - "github.com/synadia-io/connect-runtime-wombat/tools/shared" "gopkg.in/yaml.v3" ) -// Field patterns to ignore during validation - vendor-specific and advanced features -var ignoredFieldPatterns = []string{ - // Synadia NATS-specific authentication extensions - //"auth.nkey_file", - //"auth.nkey", - //"auth.user_credentials_file", - // - //// Advanced/experimental features outside Connect Runtime scope - //"inject_tracing_map", - //"extract_tracing_map", - // - //// Enterprise features - //"batching.processors", // Complex batching processors - // - //// TLS configurations - Connect Runtime uses simplified TLS configuration - //// Many components have their own TLS implementation that differs from Benthos - //"tls", // General TLS config for NATS, MQTT, Pulsar, etc. - //"tls.enabled", - //"tls.skip_cert_verify", - //"tls.enable_renegotiation", - //"tls.root_cas", - //"tls.client_certs", - //"tls.client_certs[].cert", - //"tls.client_certs[].key", - //"tls.client_certs[].password", - // - //// Complex batching and database-specific configurations - //"write_concern", // MongoDB write concern - advanced configuration - //"write_concern.w", - //"write_concern.j", - //"write_concern.w_timeout", - //"batching", // Complex batching configurations not exposed in Connect - //"batching.count", - //"batching.byte_size", - //"batching.period", - //"batching.check", - // - //// Vendor-specific extensions not in base Benthos schema - //// AWS-specific SASL extensions - //"sasl[].aws.region", - //"sasl[].aws.endpoint", - //"sasl[].aws.credentials", - //"region", // AWS region fields - //"endpoint", // AWS endpoint fields - //"credentials", // AWS credentials fields - // - //// Azure-specific configurations - //"targets_input", // Azure blob storage specific - //"endpoint", // Azure endpoint configurations - //"account_key", // Azure account key - // - //// BigQuery-specific fields - //"prefix", // GCP BigQuery prefix - //"suffix", // GCP BigQuery suffix - // - //// GCP PubSub specific - //"create_subscription", // GCP PubSub subscription creation - // - //// Enterprise and advanced features not exposed in Connect Runtime - //"jwt", // JWT authentication - advanced feature - //"jwt.enabled", - //"jwt.private_key_file", - //"jwt.signing_method", - //"jwt.claims", - //"rate_limit", // Rate limiting - advanced feature - //"rate_limit.count", - //"rate_limit.interval", - // - //// Component-specific edge cases - //// AMQP queue declaration differences between Connect and Benthos - //"queue.queue_declare.enabled", - //"queue.queue_declare.durable", - //"queue.queue_declare.auto_delete", - // - //// HTTP client OAuth endpoint parameters not in base schema - //"oauth.oauth2.endpoint_params", - //"oauth2.endpoint_params", - // - //// Additional AWS and GCP specific advanced configurations - //"backoff", // AWS backoff configuration - advanced feature - //"backoff.initial_interval", - //"backoff.max_interval", - //"backoff.max_elapsed_time", - //"csv", // GCP BigQuery CSV format - advanced feature - //"csv.header", - //"csv.delimiter", - //"flow_control", // GCP PubSub flow control - advanced feature - //"flow_control.max_messages", - //"flow_control.max_bytes", -} - -// ConnectSpec represents our .connect YAML specification -type ConnectSpec struct { - Name string `yaml:"name"` - Type string `yaml:"type"` - Summary string `yaml:"summary,omitempty"` - Description string `yaml:"description,omitempty"` - Fields []ConnectField `yaml:"fields"` -} - -// ConnectField represents a field in our .connect specification -type ConnectField struct { - Path string `yaml:"path"` - Kind string `yaml:"kind,omitempty"` - Type string `yaml:"type,omitempty"` - Description string `yaml:"description,omitempty"` - Default interface{} `yaml:"default,omitempty"` - Fields []ConnectField `yaml:"fields,omitempty"` -} - -// ValidationResult tracks validation issues -type ValidationResult struct { - ComponentName string - ComponentType string - Issues []ValidationIssue -} - -type ValidationIssue struct { - Severity string // "error", "warning", "info" - Path string - Message string -} - -func main() { +func validate(connectPath, schemasDir string) error { fmt.Println("Validating .connect specs against extracted Benthos schemas...") var allResults []ValidationResult hasErrors := false // Validate sinks (outputs) - sinkResults, err := validateComponentType("sinks", "output") + sinkResults, err := validateComponentType(connectPath, "sinks", schemasDir, "output") if err != nil { - fmt.Printf("Error validating sinks: %v\n", err) - os.Exit(1) + return fmt.Errorf("error validating sinks: %w", err) } allResults = append(allResults, sinkResults...) // Validate sources (inputs) - sourceResults, err := validateComponentType("sources", "input") + sourceResults, err := validateComponentType(connectPath, "sources", schemasDir, "input") if err != nil { - fmt.Printf("Error validating sources: %v\n", err) - os.Exit(1) + return fmt.Errorf("error validating sources: %w", err) } allResults = append(allResults, sourceResults...) // Validate processors - processorResults, err := validateComponentType("processors", "processor") + processorResults, err := validateComponentType(connectPath, "processors", schemasDir, "processor") if err != nil { - fmt.Printf("Error validating processors: %v\n", err) - os.Exit(1) + return fmt.Errorf("error validating processors: %w", err) } allResults = append(allResults, processorResults...) - // Report results - fmt.Printf("\n=== VALIDATION RESULTS ===\n\n") - - for _, result := range allResults { - if len(result.Issues) == 0 { - fmt.Printf("✅ %s/%s: OK\n", result.ComponentType, result.ComponentName) - continue - } - - fmt.Printf("❌ %s/%s: %d issues found\n", result.ComponentType, result.ComponentName, len(result.Issues)) - - for _, issue := range result.Issues { - icon := getIssueIcon(issue.Severity) - if issue.Severity == "error" { - hasErrors = true - } - fmt.Printf(" %s [%s] %s: %s\n", icon, strings.ToUpper(issue.Severity), issue.Path, issue.Message) - } - fmt.Println() - } - - // Summary - totalComponents := len(allResults) - componentsWithIssues := 0 - totalIssues := 0 - - for _, result := range allResults { - if len(result.Issues) > 0 { - componentsWithIssues++ - } - totalIssues += len(result.Issues) - } - - fmt.Printf("=== SUMMARY ===\n") - fmt.Printf("Total components validated: %d\n", totalComponents) - fmt.Printf("Components with issues: %d\n", componentsWithIssues) - fmt.Printf("Total issues found: %d\n", totalIssues) - + hasErrors = outputText(allResults) if hasErrors { - fmt.Printf("\n❌ Validation failed with errors\n") - os.Exit(1) - } else if totalIssues > 0 { - fmt.Printf("\n⚠️ Validation completed with warnings\n") - } else { - fmt.Printf("\n✅ All validations passed\n") + return fmt.Errorf("validation failed with errors") } + return nil } -func validateComponentType(connectDir, schemaType string) ([]ValidationResult, error) { +func validateComponentType(connectBasePath, connectDir, schemasDir, schemaType string) ([]ValidationResult, error) { var results []ValidationResult - connectPath := filepath.Join(".connect", connectDir) + fullConnectPath := filepath.Join(connectBasePath, connectDir) // Check if directory exists - if _, err := os.Stat(connectPath); os.IsNotExist(err) { - fmt.Printf("Directory %s does not exist, skipping...\n", connectPath) + if _, err := os.Stat(fullConnectPath); os.IsNotExist(err) { + fmt.Printf("Directory %s does not exist, skipping...\n", fullConnectPath) return results, nil } // Read all YAML files in the directory - files, err := filepath.Glob(filepath.Join(connectPath, "*.yml")) + files, err := filepath.Glob(filepath.Join(fullConnectPath, "*.yml")) if err != nil { - return nil, fmt.Errorf("failed to glob YAML files in %s: %w", connectPath, err) + return nil, fmt.Errorf("failed to glob YAML files in %s: %w", fullConnectPath, err) } for _, file := range files { - result, err := validateSingleComponent(file, schemaType) + result, err := validateSingleComponent(file, schemasDir, schemaType) if err != nil { fmt.Printf("Warning: failed to validate %s: %v\n", file, err) continue @@ -241,7 +73,7 @@ func validateComponentType(connectDir, schemaType string) ([]ValidationResult, e return results, nil } -func validateSingleComponent(connectFile, schemaType string) (ValidationResult, error) { +func validateSingleComponent(connectFile, schemasDir, schemaType string) (ValidationResult, error) { // Load .connect spec connectSpec, err := loadConnectSpec(connectFile) if err != nil { @@ -255,7 +87,7 @@ func validateSingleComponent(connectFile, schemaType string) (ValidationResult, } // Load corresponding Benthos schema - schemaFile := filepath.Join("schemas", fmt.Sprintf("%s_%s.json", schemaType, connectSpec.Name)) + schemaFile := filepath.Join(schemasDir, fmt.Sprintf("%s_%s.json", schemaType, connectSpec.Name)) benthos, err := loadBenthosSchema(schemaFile) if err != nil { result.Issues = append(result.Issues, ValidationIssue{ @@ -286,13 +118,13 @@ func loadConnectSpec(filename string) (*ConnectSpec, error) { return &spec, nil } -func loadBenthosSchema(filename string) (*shared.ComponentSchema, error) { +func loadBenthosSchema(filename string) (*ComponentSchema, error) { data, err := os.ReadFile(filename) if err != nil { return nil, err } - var schema shared.ComponentSchema + var schema ComponentSchema if err := json.Unmarshal(data, &schema); err != nil { return nil, err } @@ -300,12 +132,12 @@ func loadBenthosSchema(filename string) (*shared.ComponentSchema, error) { return &schema, nil } -func validateFields(connectFields []ConnectField, benthosFields []shared.FieldSchema, parentPath string) []ValidationIssue { +func validateFields(connectFields []ConnectField, benthosFields []FieldSchema, parentPath string) []ValidationIssue { var issues []ValidationIssue // Build maps for easier lookup connectMap := make(map[string]ConnectField) - benthosMap := make(map[string]shared.FieldSchema) + benthosMap := make(map[string]FieldSchema) for _, cf := range connectFields { connectMap[cf.Path] = cf @@ -325,11 +157,6 @@ func validateFields(connectFields []ConnectField, benthosFields []shared.FieldSc // Try multiple matching strategies for better field resolution benthosField, exists := findMatchingBenthosField(connectField, benthosFields, parentPath) if !exists { - // Check if this field should be ignored (vendor-specific, advanced features, etc.) - if shouldIgnoreField(connectField.Path) || shouldIgnoreField(fieldPath) { - continue - } - fieldName := extractLastPathComponent(connectField.Path) issues = append(issues, ValidationIssue{ Severity: "warning", @@ -380,11 +207,6 @@ func validateFields(connectFields []ConnectField, benthosFields []shared.FieldSc } if !found { - // Skip ignored vendor-specific and advanced fields - if shouldIgnoreField(benthosField.FullName) { - continue - } - issues = append(issues, ValidationIssue{ Severity: "warning", Path: fieldPath, @@ -441,7 +263,7 @@ func extractLastPathComponent(path string) string { // findMatchingBenthosField uses multiple strategies to find the best matching Benthos field // for a given Connect field, handling nested paths and different naming conventions -func findMatchingBenthosField(connectField ConnectField, benthosFields []shared.FieldSchema, parentPath string) (shared.FieldSchema, bool) { +func findMatchingBenthosField(connectField ConnectField, benthosFields []FieldSchema, parentPath string) (FieldSchema, bool) { connectFieldName := extractLastPathComponent(connectField.Path) fullConnectPath := buildFieldPath(parentPath, connectField.Path) @@ -475,7 +297,7 @@ func findMatchingBenthosField(connectField ConnectField, benthosFields []shared. } } - return shared.FieldSchema{}, false + return FieldSchema{}, false } func mapConnectTypeToBenthos(field ConnectField) string { @@ -500,17 +322,6 @@ func mapConnectTypeToBenthos(field ConnectField) string { } } -// shouldIgnoreField checks if a field should be ignored during validation -// Returns true for vendor-specific extensions and advanced features outside Connect Runtime scope -func shouldIgnoreField(fullName string) bool { - for _, pattern := range ignoredFieldPatterns { - if fullName == pattern { - return true - } - } - return false -} - func getIssueIcon(severity string) string { switch severity { case "error": @@ -523,3 +334,55 @@ func getIssueIcon(severity string) string { return "⚪" } } + +func outputText(allResults []ValidationResult) bool { + hasErrors := false + + // Report results + fmt.Printf("\n=== VALIDATION RESULTS ===\n\n") + + for _, result := range allResults { + if len(result.Issues) == 0 { + fmt.Printf("✅ %s/%s: OK\n", result.ComponentType, result.ComponentName) + continue + } + + fmt.Printf("❌ %s/%s: %d issues found\n", result.ComponentType, result.ComponentName, len(result.Issues)) + + for _, issue := range result.Issues { + icon := getIssueIcon(issue.Severity) + if issue.Severity == "error" { + hasErrors = true + } + fmt.Printf(" %s [%s] %s: %s\n", icon, strings.ToUpper(issue.Severity), issue.Path, issue.Message) + } + fmt.Println() + } + + // Summary + totalComponents := len(allResults) + componentsWithIssues := 0 + totalIssues := 0 + + for _, result := range allResults { + if len(result.Issues) > 0 { + componentsWithIssues++ + } + totalIssues += len(result.Issues) + } + + fmt.Printf("=== SUMMARY ===\n") + fmt.Printf("Total components validated: %d\n", totalComponents) + fmt.Printf("Components with issues: %d\n", componentsWithIssues) + fmt.Printf("Total issues found: %d\n", totalIssues) + + if hasErrors { + fmt.Printf("\n❌ Validation failed with errors\n") + } else if totalIssues > 0 { + fmt.Printf("\n⚠️ Validation completed with warnings\n") + } else { + fmt.Printf("\n✅ All validations passed\n") + } + + return hasErrors +} diff --git a/tools/shared/types.go b/tools/shared/types.go deleted file mode 100644 index 1c03b8c..0000000 --- a/tools/shared/types.go +++ /dev/null @@ -1,19 +0,0 @@ -package shared - -// ComponentSchema represents the extracted schema structure -type ComponentSchema struct { - Name string `json:"name"` - Type string `json:"type"` - Fields []FieldSchema `json:"fields"` -} - -// FieldSchema represents a field in the component configuration -type FieldSchema struct { - Name string `json:"name"` - Type string `json:"type"` - Description string `json:"description"` - Default interface{} `json:"default,omitempty"` - Required bool `json:"required"` - Children []FieldSchema `json:"children,omitempty"` - FullName string `json:"full_name"` -}