3 changes: 3 additions & 0 deletions .gitignore
@@ -17,3 +17,6 @@ coverage.html

.claude
*.tar

# Generated schema files
/schemas
32 changes: 32 additions & 0 deletions Taskfile.yml
@@ -92,11 +92,37 @@ tasks:
- task: test

test:crawl:
desc: Crawl component definitions and validate results
cmds:
# Setup
- nats kv add TMP > /dev/null
- defer: nats kv del --force TMP

# Pre-crawl validation
- task: validate
- echo "Pre-crawl count:" && find .connect -name '*.yml' -not -path '.connect/runtime.yml' | wc -l

# Run crawler
- cd ../connect-node && go run ./cmd/connect-node crawl ../connect-runtime-wombat --bucket=TMP

# Post-crawl validation
- echo "Validating crawled data..."
- nats kv ls TMP --no-headers || true
- |
# Verify all components were crawled
CRAWLED_COUNT=$(nats kv ls TMP --no-headers 2>/dev/null | wc -l)
SOURCE_COUNT=$(find .connect -name '*.yml' -not -path '.connect/runtime.yml' | wc -l)
echo "Crawled: $CRAWLED_COUNT entries, Expected: $SOURCE_COUNT entries"
if [ "$CRAWLED_COUNT" -eq 0 ]; then
echo "ERROR: No entries found in KV bucket"
exit 1
fi
if [ "$CRAWLED_COUNT" -lt "$SOURCE_COUNT" ]; then
echo "WARNING: Some components may not have been crawled"
echo "This could be expected if some files are not component specs"
fi
      - echo "✓ Crawl validation complete"

validate:
cmds:
- curl --silent https://raw.githubusercontent.com/synadia-io/connect/refs/heads/main/model/schemas/component-spec-v1.schema.json -O
@@ -110,6 +136,12 @@ tasks:
- curl --silent https://raw.githubusercontent.com/synadia-io/connect/refs/heads/main/model/schemas/component-spec-v1.schema.json -O
- npx --yes ajv-cli validate -s component-spec-v1.schema.json -d "{{.DIR}}/*/*.yml" --verbose

test:spec:drift:
desc: Validate .connect specs against upstream Benthos schemas
cmds:
- echo "Running spec drift validation..."
- go run ./test/cmd/spec-drift/

docker:local:
desc: Build Docker image locally with version information
vars:
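As a usage sketch (not part of this diff), both new tasks are invoked through the Task runner from the repository root; test:crawl additionally assumes a reachable NATS server for the nats CLI and a sibling connect-node checkout, as the crawl command above implies:

    task test:crawl
    task test:spec:drift
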
201 changes: 201 additions & 0 deletions test/cmd/spec-drift/extract.go
@@ -0,0 +1,201 @@
package main

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/redpanda-data/benthos/v4/public/service"
_ "github.com/wombatwisdom/wombat/public/components/all"
)

func extract() (string, error) {
fmt.Println("Extracting schemas from Benthos components...")

env := service.GlobalEnvironment()

    schemasDir, err := os.MkdirTemp("", "wombat-schema-")
    if err != nil {
        return "", fmt.Errorf("failed to create temp dir for schemas: %w", err)
    }
    fmt.Printf("Using dir: %v\n", schemasDir)

var outputCount int
env.WalkOutputs(func(name string, config *service.ConfigView) {
outputCount++
if err := extractAndSaveComponentSchema(name, "output", config, schemasDir); err != nil {
fmt.Printf("Error extracting schema for output %s: %v\n", name, err)
return
}
        fmt.Printf("✓ Extracted schema for output: %s\n", name)
})
fmt.Printf("Found %d output components\n", outputCount)

var inputCount int
env.WalkInputs(func(name string, config *service.ConfigView) {
inputCount++
if err := extractAndSaveComponentSchema(name, "input", config, schemasDir); err != nil {
fmt.Printf("Error extracting schema for input %s: %v\n", name, err)
return
}
        fmt.Printf("✓ Extracted schema for input: %s\n", name)
})
fmt.Printf("Found %d input components\n", inputCount)

// Extract processor specs
var processorCount int
env.WalkProcessors(func(name string, config *service.ConfigView) {
processorCount++
if err := extractAndSaveComponentSchema(name, "processor", config, schemasDir); err != nil {
fmt.Printf("Error extracting schema for processor %s: %v\n", name, err)
return
}
        fmt.Printf("✓ Extracted schema for processor: %s\n", name)
})
fmt.Printf("Found %d processor components\n", processorCount)

fmt.Printf("\nSchema extraction completed. Files saved to %s/\n", schemasDir)
return schemasDir, nil
}

func extractAndSaveComponentSchema(name, componentType string, spec *service.ConfigView, schemasDir string) error {
// Get template data which contains the structured field information
templateData, err := spec.TemplateData()
if err != nil {
return fmt.Errorf("failed to get template data: %w", err)
}

// Convert template data to our schema format
schema := ComponentSchema{
Name: name,
Type: componentType,
Fields: convertTemplateFieldsToSchema(templateData.Fields),
}

// Save to JSON file
filename := filepath.Join(schemasDir, fmt.Sprintf("%s_%s.json", componentType, name))
file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("failed to create file %s: %w", filename, err)
}
defer func() {
if err := file.Close(); err != nil {
fmt.Printf("Warning: failed to close file %s: %v\n", filename, err)
}
}()

encoder := json.NewEncoder(file)
encoder.SetIndent("", " ")
if err := encoder.Encode(schema); err != nil {
return fmt.Errorf("failed to encode schema: %w", err)
}

return nil
}

func convertTemplateFieldsToSchema(templateFields []service.TemplateDataPluginField) []FieldSchema {
var fields []FieldSchema

for _, tf := range templateFields {
field := FieldSchema{
Name: extractFieldName(tf.FullName),
FullName: tf.FullName,
Type: tf.Type,
Description: tf.Description,
Required: tf.DefaultMarshalled == "", // Simple heuristic
}

// Parse default value if available
if tf.DefaultMarshalled != "" {
var defaultVal interface{}
if err := json.Unmarshal([]byte(tf.DefaultMarshalled), &defaultVal); err == nil {
field.Default = defaultVal
}
}

fields = append(fields, field)
}

// Build hierarchical structure from flat list
return buildFieldHierarchy(fields)
}

func extractFieldName(fullName string) string {
    // Handle array notation: "tls.client_certs[].cert" -> "cert"
    // Remove array notation first, then take the segment after the last dot.
    cleanPath := strings.ReplaceAll(fullName, "[]", "")

    if idx := strings.LastIndex(cleanPath, "."); idx != -1 {
        return cleanPath[idx+1:]
    }
    return cleanPath
}

func buildFieldHierarchy(flatFields []FieldSchema) []FieldSchema {
// Group fields by their parent path
fieldMap := make(map[string][]FieldSchema)
rootFields := []FieldSchema{}

for _, field := range flatFields {
// Determine parent path
parentPath := getParentPath(field.FullName)

if parentPath == "" {
// Root level field
rootFields = append(rootFields, field)
} else {
// Child field
fieldMap[parentPath] = append(fieldMap[parentPath], field)
}
}

// Recursively build hierarchy
return attachChildren(rootFields, fieldMap)
}

func getParentPath(fullName string) string {
    // Handle array notation: "tls.client_certs[].cert" -> "tls.client_certs"
    // Remove array notation first, then take everything before the last dot.
    cleanPath := strings.ReplaceAll(fullName, "[]", "")

    if idx := strings.LastIndex(cleanPath, "."); idx != -1 {
        return cleanPath[:idx]
    }
    return ""
}

func attachChildren(fields []FieldSchema, fieldMap map[string][]FieldSchema) []FieldSchema {
for i := range fields {
if children, exists := fieldMap[fields[i].FullName]; exists {
fields[i].Children = attachChildren(children, fieldMap)
}
}
return fields
}
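
For orientation, each file written by extractAndSaveComponentSchema follows the ComponentSchema/FieldSchema layout defined in types.go. A hypothetical output_example.json (the component name, field names, and values below are illustrative, not taken from this PR) would look roughly like:

    {
      "name": "example",
      "type": "output",
      "fields": [
        {
          "name": "urls",
          "type": "string",
          "description": "A list of URLs to connect to.",
          "required": true,
          "full_name": "urls"
        },
        {
          "name": "tls",
          "type": "object",
          "description": "TLS settings.",
          "default": {},
          "required": false,
          "children": [
            {
              "name": "enabled",
              "type": "bool",
              "description": "Whether TLS is enabled.",
              "default": false,
              "required": false,
              "full_name": "tls.enabled"
            }
          ],
          "full_name": "tls"
        }
      ]
    }
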
42 changes: 42 additions & 0 deletions test/cmd/spec-drift/main.go
@@ -0,0 +1,42 @@
package main

import (
"fmt"
"os"
)

func main() {
run(".connect")
}

func run(dir string) {
schemaDir, err := extract()
if err != nil {
fmt.Fprintf(os.Stderr, "Error during extract: %v\n", err)
os.Exit(1)
}

    validateErr := validate(dir, schemaDir)
    if validateErr != nil {
        fmt.Fprintf(os.Stderr, "Error during validate: %v\n", validateErr)
    }

    // Clean up the extracted schemas even when validation reported drift,
    // so repeated runs do not leave temp directories behind.
    if err := clean(schemaDir); err != nil {
        fmt.Fprintf(os.Stderr, "Error cleaning schemas: %v\n", err)
        os.Exit(1)
    }
    fmt.Fprintf(os.Stderr, "Removed extracted schemas from: %v\n", schemaDir)

    // Exit non-zero so the drift check fails when validation found issues.
    if validateErr != nil {
        os.Exit(1)
    }
}

func clean(schemasDir string) error {
if _, err := os.Stat(schemasDir); os.IsNotExist(err) {
return nil
}

if err := os.RemoveAll(schemasDir); err != nil {
return fmt.Errorf("failed to remove schemas directory: %w", err)
}

return nil
}
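
Outside of Task, the same check can be run directly with the invocation the test:spec:drift task uses, from the repository root so the hard-coded ".connect" directory resolves:

    go run ./test/cmd/spec-drift/

Because clean runs whether or not validate reports drift, repeated runs do not accumulate extracted-schema temp directories.
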
51 changes: 51 additions & 0 deletions test/cmd/spec-drift/types.go
@@ -0,0 +1,51 @@
package main

// ComponentSchema represents the extracted schema structure
type ComponentSchema struct {
Name string `json:"name"`
Type string `json:"type"`
Fields []FieldSchema `json:"fields"`
}

// FieldSchema represents a field in the component configuration
type FieldSchema struct {
Name string `json:"name"`
Type string `json:"type"`
Description string `json:"description"`
Default interface{} `json:"default,omitempty"`
Required bool `json:"required"`
Children []FieldSchema `json:"children,omitempty"`
FullName string `json:"full_name"`
}

// ConnectSpec represents our .connect YAML specification
type ConnectSpec struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Summary string `yaml:"summary,omitempty"`
Description string `yaml:"description,omitempty"`
Fields []ConnectField `yaml:"fields"`
}

// ConnectField represents a field in our .connect specification
type ConnectField struct {
Path string `yaml:"path"`
Kind string `yaml:"kind,omitempty"`
Type string `yaml:"type,omitempty"`
Description string `yaml:"description,omitempty"`
Default interface{} `yaml:"default,omitempty"`
Fields []ConnectField `yaml:"fields,omitempty"`
}

// ValidationResult tracks validation issues
type ValidationResult struct {
ComponentName string
ComponentType string
Issues []ValidationIssue
}

// ValidationIssue describes a single finding from comparing a spec to its extracted schema
type ValidationIssue struct {
Severity string // "error", "warning", "info"
Path string
Message string
}
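
For comparison, a .connect component spec that unmarshals into ConnectSpec/ConnectField via the yaml tags above would look roughly like this (the component name, kinds, and field values are purely illustrative):

    name: example
    type: output
    summary: Writes messages to an example service.
    fields:
      - path: urls
        kind: list
        type: string
        description: A list of URLs to connect to.
      - path: tls
        type: object
        description: TLS settings.
        fields:
          - path: tls.enabled
            type: bool
            description: Whether TLS is enabled.
            default: false
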