From 5fa552a7ec7980429f243aac6c1793f3245bee2e Mon Sep 17 00:00:00 2001 From: Noureddine Seddik Date: Mon, 26 Jul 2021 11:50:08 +0100 Subject: [PATCH] DataSource with optional version support, latest by default --- README.md | 30 +++++++++++- examples/main.tf | 6 +++ schemaregistry/data_source_schema.go | 14 ++++-- schemaregistry/data_source_schema_test.go | 57 +++++++++++++++++++++++ 4 files changed, 103 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e7cb216..6155a43 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,9 @@ resource "schemaregistry_schema" "main" { Schema registry references can be used to allow [putting Several Event Types in the Same Topic](https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/). Please refer to Confluent [Schema Registry API Reference](https://docs.confluent.io/platform/current/schema-registry/develop/api.html) for details. -Referenced versions will always be upgraded with the referenced event schema version. +### Upgrade reference version to the latest event schema version + +Reference the event schema `resource` from a schema with reference wil upgrade a reference alongside with its referenced schema. ``` resource "schemaregistry_schema" "referenced_event" { @@ -63,7 +65,33 @@ resource "schemaregistry_schema" "with_reference" { version = schemaregistry_schema.referenced_event.version } } +``` + +### Stick reference version to a given version + +Use a `dataSource` to stick a reference to a **given version**, while upgrading the referenced event schema. + +``` +resource "schemaregistry_schema" "referenced_event_latest" { + subject = "referenced_event_subject" + schema = file("") +} + +data "schemaregistry_schema" "referenced_event_v1" { + subject = "other_referenced_event_subject" + schema = "{\"type\":\"record\",\"name\":\"other_event\",\"namespace\":\"akc.test\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}" +} +resource "schemaregistry_schema" "with_reference_to_v1" { + subject = "with_reference_subject" + schema = "[\"akc.test.event\", \"akc.test.other_event\"]" + + references { + name = "akc.test.event" + subject = data.schemaregistry_schema.referenced_event_v1.subject + version = data.schemaregistry_schema.referenced_event_v1.version + } +} ``` ## The schema data source diff --git a/examples/main.tf b/examples/main.tf index 3d10347..9b5e424 100644 --- a/examples/main.tf +++ b/examples/main.tf @@ -31,6 +31,12 @@ data "schemaregistry_schema" "main" { subject = schemaregistry_schema.with_reference.subject } +data "schemaregistry_schema" "user_added_v1" { + subject = "MyTopic-akc.test.userAdded-value" + schema = 1 +} + + output "schema_id" { value = data.schemaregistry_schema.main.id } diff --git a/schemaregistry/data_source_schema.go b/schemaregistry/data_source_schema.go index cc4e76d..22a1233 100644 --- a/schemaregistry/data_source_schema.go +++ b/schemaregistry/data_source_schema.go @@ -19,7 +19,7 @@ func dataSourceSchema() *schema.Resource { }, "version": { Type: schema.TypeInt, - Computed: true, + Optional: true, Description: "The version of the schema", }, "schema_id": { @@ -64,13 +64,21 @@ func dataSourceSubjectRead(ctx context.Context, d *schema.ResourceData, m interf var diags diag.Diagnostics subject := d.Get("subject").(string) + version := d.Get("version").(int) client := m.(*srclient.SchemaRegistryClient) + var schema *srclient.Schema + var err error + + if version > 0 { + schema, err = client.GetSchemaByVersionWithArbitrarySubject(subject, version) + + } else { + schema, err = client.GetLatestSchemaWithArbitrarySubject(subject) + } - schema, err := client.GetLatestSchemaWithArbitrarySubject(subject) if err != nil { return diag.FromErr(err) - // return diag.FromErr(fmt.Errorf("unknown schema for subject '%s'", subject)) } d.Set("schema", schema.Schema()) diff --git a/schemaregistry/data_source_schema_test.go b/schemaregistry/data_source_schema_test.go index 2de7874..9d7df9b 100644 --- a/schemaregistry/data_source_schema_test.go +++ b/schemaregistry/data_source_schema_test.go @@ -106,4 +106,61 @@ func TestAccDataSourceSchemaReferences_basic(t *testing.T) { }, }, }) +} + +func TestAccDataSourceSchema_atVersion(t *testing.T) { + // GIVEN + url, found := os.LookupEnv("SCHEMA_REGISTRY_URL") + if !found { + t.Fatalf("SCHEMA_REGISTRY_URL must be set for acceptance tests") + } + username := os.Getenv("SCHEMA_REGISTRY_USERNAME") + password := os.Getenv("SCHEMA_REGISTRY_PASSWORD") + + client := srclient.CreateSchemaRegistryClient(url) + if (username != "") && (password != "") { + client.SetCredentials(username, password) + } + + // AND + u, err := uuid.GenerateUUID() + if err != nil { + t.Fatal(err) + } + + referencedSchemaSubject := fmt.Sprintf("referencedSub-%s", u) + referencedSchema := strings.Replace(fixtureAvro1, "\\", "", -1) + referencedSchemaLatest := strings.Replace(fixtureAvro2, "\\", "", -1) + + // AND + if _, err = client.CreateSchemaWithArbitrarySubject(referencedSchemaSubject, referencedSchema, srclient.Avro); err != nil { + t.Fatalf("could not create schema for subject: %s, err: %s", referencedSchema, err) + } + + if _, err = client.CreateSchemaWithArbitrarySubject(referencedSchemaSubject, referencedSchemaLatest, srclient.Avro); err != nil { + t.Fatalf("could not create schema for subject: %s, err: %s", referencedSchema, err) + } + + // WHEN / THEN + resource.Test(t, resource.TestCase{ + ProviderFactories: testAccProviders, + PreCheck: func() { testAccPreCheck(t) }, + Steps: []resource.TestStep{ + { + Config: fmt.Sprintf(` + data "schemaregistry_schema" "schemaAtVersion" { + subject = "%s" + version = 1 + } + `, referencedSchemaSubject), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("data.schemaregistry_schema.schemaAtVersion", "id", referencedSchemaSubject), + resource.TestCheckResourceAttr("data.schemaregistry_schema.schemaAtVersion", "subject", referencedSchemaSubject), + resource.TestCheckResourceAttrSet("data.schemaregistry_schema.schemaAtVersion", "schema_id"), + resource.TestCheckResourceAttr("data.schemaregistry_schema.schemaAtVersion", "version", "1"), + resource.TestCheckResourceAttr("data.schemaregistry_schema.schemaAtVersion", "schema", referencedSchema), + ), + }, + }, + }) } \ No newline at end of file