Skip to content

Commit

Permalink
Update Schema Registry client (#3135)
Browse files Browse the repository at this point in the history
This is a follow-up to twmb/franz-go#867.

Since `sr.URLs()` from franz-go doesn't validate URLs during
client initialisation, I decided to leave our `url.Parse()` in
there because this way we can catch malformed URLs on startup,
before any HTTP requests are attempted.

Also update the changelog. We forgot to do it in #3134

Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor authored Jan 22, 2025
1 parent e0de935 commit e6a224d
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.

- New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad)
- Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj)
- Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)

## 4.45.1 - 2025-01-17

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ require (
github.com/twmb/franz-go v1.18.0
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kmsg v1.9.0
github.com/twmb/franz-go/pkg/sr v1.2.0
github.com/twmb/franz-go/pkg/sr v1.3.0
github.com/vmihailenco/msgpack/v5 v5.4.1
github.com/xdg-go/scram v1.1.2
github.com/xeipuuv/gojsonschema v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1889,8 +1889,8 @@ github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8X
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
github.com/twmb/franz-go/pkg/sr v1.2.0 h1:zYr0Ly7KLFfeCGaSr8teN6LvAVeYVrZoUsyyPHTYB+M=
github.com/twmb/franz-go/pkg/sr v1.2.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw=
github.com/twmb/franz-go/pkg/sr v1.3.0 h1:UlXpZ2suGgylzQBUb6Wn1jzqVShoPGzt7BbixznJ4qo=
github.com/twmb/franz-go/pkg/sr v1.3.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw=
github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ=
github.com/uptrace/bun v1.1.12/go.mod h1:NPG6JGULBeQ9IU6yHp7YGELRa5Agmd7ATZdz4tGZ6z0=
github.com/uptrace/bun/dialect/pgdialect v1.1.12 h1:m/CM1UfOkoBTglGO5CUTKnIKKOApOYxkcP2qn0F9tJk=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ basic_auth:
require.NoError(t, err)

e, err := newSchemaRegistryDecoderFromConfig(conf, service.MockResources())
if e != nil {
assert.Equal(t, test.expectedBaseURL, e.client.SchemaRegistryBaseURL.String())
}

if err == nil {
_ = e.Close(context.Background())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ subject: foo
require.NoError(t, err)

e, err := newSchemaRegistryEncoderFromConfig(conf, service.MockResources())
if e != nil {
assert.Equal(t, test.expectedBaseURL, e.client.SchemaRegistryBaseURL.String())
}

if err == nil {
_ = e.Close(context.Background())
}
Expand Down
84 changes: 13 additions & 71 deletions internal/impl/confluent/sr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"crypto/tls"
"fmt"
"io/fs"
"net"
"net/http"
"net/url"
"slices"
"time"

"github.com/twmb/franz-go/pkg/sr"

Expand All @@ -32,26 +30,7 @@ import (

// Client is used to make requests to a schema registry.
type Client struct {
SchemaRegistryBaseURL *url.URL
clientSR *sr.Client
requestSigner func(f fs.FS, req *http.Request) error
mgr *service.Resources
}

type roundTripper struct {
reqSigner func(req *http.Request) error
*http.Transport
}

func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// This is naughty, but it's probably fine...
// The `RoundTrip` docs state that "RoundTrip should not modify the request, except for consuming and closing the Request's Body."
// This is because the following code https://github.com/golang/go/blob/e25b913127ac8ba26c4ecc39288c7f8781f4ef5d/src/net/http/client.go#L246-L252
// already tries to set the `Authorization` header if `req.URL.User` is already set, but `reqSigner` replicates the same functionality anyway.
if err := rt.reqSigner(req); err != nil {
return nil, err
}
return rt.Transport.RoundTrip(req)
Client *sr.Client
}

// NewClient creates a new schema registry client.
Expand All @@ -61,55 +40,18 @@ func NewClient(
tlsConf *tls.Config,
mgr *service.Resources,
) (*Client, error) {
u, err := url.Parse(urlStr)
_, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("failed to parse url: %w", err)
}

reqSignerWrapped := func(req *http.Request) error { return reqSigner(mgr.FS(), req) }

// Timeout copied from https://github.com/twmb/franz-go/blob/cea7aa5d803781e5f0162187795482ba1990c729/pkg/sr/client.go#L73
hClient := &http.Client{Timeout: 5 * time.Second}
if c, ok := http.DefaultTransport.(*http.Transport); ok {
cloned := c.Clone()
cloned.TLSClientConfig = tlsConf
hClient.Transport = &roundTripper{
reqSigner: reqSignerWrapped,
Transport: cloned,
}
} else {
hClient.Transport = &roundTripper{
reqSigner: reqSignerWrapped,
// Copied from https://github.com/twmb/franz-go/blob/cea7aa5d803781e5f0162187795482ba1990c729/pkg/sr/clientopt.go#L48-L68
// TODO: Why are we setting `MaxIdleConnsPerHost: 100`? It's not set in `http.DefaultTransport`.
// Note: `http.DefaultMaxIdleConnsPerHost` is 2.
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSClientConfig: tlsConf,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
}

clientSR, err := sr.NewClient(sr.HTTPClient(hClient), sr.URLs(urlStr))
clientSR, err := sr.NewClient(sr.URLs(urlStr), sr.PreReq(func(req *http.Request) error { return reqSigner(mgr.FS(), req) }))
if err != nil {
return nil, fmt.Errorf("failed to init client: %w", err)
}

return &Client{
clientSR: clientSR,
SchemaRegistryBaseURL: u,
requestSigner: reqSigner,
mgr: mgr,
Client: clientSR,
}, nil
}

Expand All @@ -119,7 +61,7 @@ func (c *Client) GetSchemaByID(ctx context.Context, id int, includeDeleted bool)
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

schema, err := c.clientSR.SchemaByID(ctx, id)
schema, err := c.Client.SchemaByID(ctx, id)
if err != nil {
return sr.Schema{}, fmt.Errorf("schema %d not found by registry: %s", id, err)
}
Expand All @@ -132,12 +74,12 @@ func (c *Client) GetSubjectsBySchemaID(ctx context.Context, id int, includeDelet
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

return c.clientSR.SubjectsByID(ctx, id)
return c.Client.SubjectsByID(ctx, id)
}

// GetLatestSchemaVersionForSchemaIDAndSubject gets the latest version of a schema by its global identifier scoped to the provided subject.
func (c *Client) GetLatestSchemaVersionForSchemaIDAndSubject(ctx context.Context, id int, subject string) (versionID int, err error) {
svs, err := c.clientSR.SchemaVersionsByID(ctx, id)
svs, err := c.Client.SchemaVersionsByID(ctx, id)
if err != nil {
return -1, fmt.Errorf("failed to fetch schema versions for ID %d and subject %q", id, subject)
}
Expand Down Expand Up @@ -166,10 +108,10 @@ func (c *Client) GetSchemaBySubjectAndVersion(ctx context.Context, subject strin
var schema sr.SubjectSchema
var err error
if version != nil {
schema, err = c.clientSR.SchemaByVersion(ctx, subject, *version)
schema, err = c.Client.SchemaByVersion(ctx, subject, *version)
} else {
// Setting version to -1 will return the latest schema.
schema, err = c.clientSR.SchemaByVersion(ctx, subject, -1)
schema, err = c.Client.SchemaByVersion(ctx, subject, -1)
}
if err != nil {
return sr.SubjectSchema{}, err
Expand All @@ -180,7 +122,7 @@ func (c *Client) GetSchemaBySubjectAndVersion(ctx context.Context, subject strin

// GetMode returns the mode of the Schema Registry instance.
func (c *Client) GetMode(ctx context.Context) (string, error) {
res := c.clientSR.Mode(ctx)
res := c.Client.Mode(ctx)
// There will be one and only one element in the response.
if res[0].Err != nil {
return "", fmt.Errorf("request failed: %s", res[0].Err)
Expand All @@ -195,7 +137,7 @@ func (c *Client) GetSubjects(ctx context.Context, includeDeleted bool) ([]string
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

return c.clientSR.Subjects(ctx)
return c.Client.Subjects(ctx)
}

// GetVersionsForSubject returns the versions for a given subject.
Expand All @@ -204,12 +146,12 @@ func (c *Client) GetVersionsForSubject(ctx context.Context, subject string, incl
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

return c.clientSR.SubjectVersions(ctx, subject)
return c.Client.SubjectVersions(ctx, subject)
}

// CreateSchema creates a new schema for the given subject.
func (c *Client) CreateSchema(ctx context.Context, subject string, schema sr.Schema) (int, error) {
ss, err := c.clientSR.CreateSchema(ctx, subject, schema)
ss, err := c.Client.CreateSchema(ctx, subject, schema)
if err != nil {
return -1, fmt.Errorf("failed to create schema for subject %q: %s", subject, err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func deleteSubject(t *testing.T, url string, subject string, hardDelete bool) {
deleteMode = franz_sr.HardDelete
}

_, err = client.DeleteSubject(context.Background(), subject, franz_sr.DeleteHow(deleteMode))
_, err = client.DeleteSubject(context.Background(), subject, deleteMode)
require.NoError(t, err)
}

Expand Down

0 comments on commit e6a224d

Please sign in to comment.