Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Schema Registry client #3135

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.

- New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad)
- Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj)
- Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)

## 4.45.1 - 2025-01-17

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ require (
github.com/twmb/franz-go v1.18.0
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kmsg v1.9.0
github.com/twmb/franz-go/pkg/sr v1.2.0
github.com/twmb/franz-go/pkg/sr v1.3.0
github.com/vmihailenco/msgpack/v5 v5.4.1
github.com/xdg-go/scram v1.1.2
github.com/xeipuuv/gojsonschema v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1889,8 +1889,8 @@ github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8X
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
github.com/twmb/franz-go/pkg/sr v1.2.0 h1:zYr0Ly7KLFfeCGaSr8teN6LvAVeYVrZoUsyyPHTYB+M=
github.com/twmb/franz-go/pkg/sr v1.2.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw=
github.com/twmb/franz-go/pkg/sr v1.3.0 h1:UlXpZ2suGgylzQBUb6Wn1jzqVShoPGzt7BbixznJ4qo=
github.com/twmb/franz-go/pkg/sr v1.3.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw=
github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ=
github.com/uptrace/bun v1.1.12/go.mod h1:NPG6JGULBeQ9IU6yHp7YGELRa5Agmd7ATZdz4tGZ6z0=
github.com/uptrace/bun/dialect/pgdialect v1.1.12 h1:m/CM1UfOkoBTglGO5CUTKnIKKOApOYxkcP2qn0F9tJk=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ basic_auth:
require.NoError(t, err)

e, err := newSchemaRegistryDecoderFromConfig(conf, service.MockResources())
if e != nil {
assert.Equal(t, test.expectedBaseURL, e.client.SchemaRegistryBaseURL.String())
}

if err == nil {
_ = e.Close(context.Background())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ subject: foo
require.NoError(t, err)

e, err := newSchemaRegistryEncoderFromConfig(conf, service.MockResources())
if e != nil {
assert.Equal(t, test.expectedBaseURL, e.client.SchemaRegistryBaseURL.String())
}

if err == nil {
_ = e.Close(context.Background())
}
Expand Down
84 changes: 13 additions & 71 deletions internal/impl/confluent/sr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"crypto/tls"
"fmt"
"io/fs"
"net"
"net/http"
"net/url"
"slices"
"time"

"github.com/twmb/franz-go/pkg/sr"

Expand All @@ -32,26 +30,7 @@ import (

// Client is used to make requests to a schema registry.
type Client struct {
SchemaRegistryBaseURL *url.URL
clientSR *sr.Client
requestSigner func(f fs.FS, req *http.Request) error
mgr *service.Resources
}

type roundTripper struct {
reqSigner func(req *http.Request) error
*http.Transport
}

func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// This is naughty, but it's probably fine...
// The `RoundTrip` docs state that "RoundTrip should not modify the request, except for consuming and closing the Request's Body."
// This is because the following code https://github.com/golang/go/blob/e25b913127ac8ba26c4ecc39288c7f8781f4ef5d/src/net/http/client.go#L246-L252
// already tries to set the `Authorization` header if `req.URL.User` is already set, but `reqSigner` replicates the same functionality anyway.
if err := rt.reqSigner(req); err != nil {
return nil, err
}
return rt.Transport.RoundTrip(req)
Client *sr.Client
}

// NewClient creates a new schema registry client.
Expand All @@ -61,55 +40,18 @@ func NewClient(
tlsConf *tls.Config,
mgr *service.Resources,
) (*Client, error) {
u, err := url.Parse(urlStr)
_, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("failed to parse url: %w", err)
}

reqSignerWrapped := func(req *http.Request) error { return reqSigner(mgr.FS(), req) }

// Timeout copied from https://github.com/twmb/franz-go/blob/cea7aa5d803781e5f0162187795482ba1990c729/pkg/sr/client.go#L73
hClient := &http.Client{Timeout: 5 * time.Second}
if c, ok := http.DefaultTransport.(*http.Transport); ok {
cloned := c.Clone()
cloned.TLSClientConfig = tlsConf
hClient.Transport = &roundTripper{
reqSigner: reqSignerWrapped,
Transport: cloned,
}
} else {
hClient.Transport = &roundTripper{
reqSigner: reqSignerWrapped,
// Copied from https://github.com/twmb/franz-go/blob/cea7aa5d803781e5f0162187795482ba1990c729/pkg/sr/clientopt.go#L48-L68
// TODO: Why are we setting `MaxIdleConnsPerHost: 100`? It's not set in `http.DefaultTransport`.
// Note: `http.DefaultMaxIdleConnsPerHost` is 2.
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSClientConfig: tlsConf,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
}

clientSR, err := sr.NewClient(sr.HTTPClient(hClient), sr.URLs(urlStr))
clientSR, err := sr.NewClient(sr.URLs(urlStr), sr.PreReq(func(req *http.Request) error { return reqSigner(mgr.FS(), req) }))
if err != nil {
return nil, fmt.Errorf("failed to init client: %w", err)
}

return &Client{
clientSR: clientSR,
SchemaRegistryBaseURL: u,
requestSigner: reqSigner,
mgr: mgr,
Client: clientSR,
}, nil
}

Expand All @@ -119,7 +61,7 @@ func (c *Client) GetSchemaByID(ctx context.Context, id int, includeDeleted bool)
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

schema, err := c.clientSR.SchemaByID(ctx, id)
schema, err := c.Client.SchemaByID(ctx, id)
if err != nil {
return sr.Schema{}, fmt.Errorf("schema %d not found by registry: %s", id, err)
}
Expand All @@ -132,12 +74,12 @@ func (c *Client) GetSubjectsBySchemaID(ctx context.Context, id int, includeDelet
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

return c.clientSR.SubjectsByID(ctx, id)
return c.Client.SubjectsByID(ctx, id)
}

// GetLatestSchemaVersionForSchemaIDAndSubject gets the latest version of a schema by its global identifier scoped to the provided subject.
func (c *Client) GetLatestSchemaVersionForSchemaIDAndSubject(ctx context.Context, id int, subject string) (versionID int, err error) {
svs, err := c.clientSR.SchemaVersionsByID(ctx, id)
svs, err := c.Client.SchemaVersionsByID(ctx, id)
if err != nil {
return -1, fmt.Errorf("failed to fetch schema versions for ID %d and subject %q", id, subject)
}
Expand Down Expand Up @@ -166,10 +108,10 @@ func (c *Client) GetSchemaBySubjectAndVersion(ctx context.Context, subject strin
var schema sr.SubjectSchema
var err error
if version != nil {
schema, err = c.clientSR.SchemaByVersion(ctx, subject, *version)
schema, err = c.Client.SchemaByVersion(ctx, subject, *version)
} else {
// Setting version to -1 will return the latest schema.
schema, err = c.clientSR.SchemaByVersion(ctx, subject, -1)
schema, err = c.Client.SchemaByVersion(ctx, subject, -1)
}
if err != nil {
return sr.SubjectSchema{}, err
Expand All @@ -180,7 +122,7 @@ func (c *Client) GetSchemaBySubjectAndVersion(ctx context.Context, subject strin

// GetMode returns the mode of the Schema Registry instance.
func (c *Client) GetMode(ctx context.Context) (string, error) {
res := c.clientSR.Mode(ctx)
res := c.Client.Mode(ctx)
// There will be one and only one element in the response.
if res[0].Err != nil {
return "", fmt.Errorf("request failed: %s", res[0].Err)
Expand All @@ -195,7 +137,7 @@ func (c *Client) GetSubjects(ctx context.Context, includeDeleted bool) ([]string
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

return c.clientSR.Subjects(ctx)
return c.Client.Subjects(ctx)
}

// GetVersionsForSubject returns the versions for a given subject.
Expand All @@ -204,12 +146,12 @@ func (c *Client) GetVersionsForSubject(ctx context.Context, subject string, incl
ctx = sr.WithParams(ctx, sr.ShowDeleted)
}

return c.clientSR.SubjectVersions(ctx, subject)
return c.Client.SubjectVersions(ctx, subject)
}

// CreateSchema creates a new schema for the given subject.
func (c *Client) CreateSchema(ctx context.Context, subject string, schema sr.Schema) (int, error) {
ss, err := c.clientSR.CreateSchema(ctx, subject, schema)
ss, err := c.Client.CreateSchema(ctx, subject, schema)
if err != nil {
return -1, fmt.Errorf("failed to create schema for subject %q: %s", subject, err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func deleteSubject(t *testing.T, url string, subject string, hardDelete bool) {
deleteMode = franz_sr.HardDelete
}

_, err = client.DeleteSubject(context.Background(), subject, franz_sr.DeleteHow(deleteMode))
_, err = client.DeleteSubject(context.Background(), subject, deleteMode)
require.NoError(t, err)
}

Expand Down
Loading