diff --git a/CHANGELOG.md b/CHANGELOG.md index 74f593f027..74deeaf4b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file. - New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad) - Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj) +- Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj) ## 4.45.1 - 2025-01-17 diff --git a/go.mod b/go.mod index 73bed77273..e327845f46 100644 --- a/go.mod +++ b/go.mod @@ -127,7 +127,7 @@ require ( github.com/twmb/franz-go v1.18.0 github.com/twmb/franz-go/pkg/kadm v1.13.0 github.com/twmb/franz-go/pkg/kmsg v1.9.0 - github.com/twmb/franz-go/pkg/sr v1.2.0 + github.com/twmb/franz-go/pkg/sr v1.3.0 github.com/vmihailenco/msgpack/v5 v5.4.1 github.com/xdg-go/scram v1.1.2 github.com/xeipuuv/gojsonschema v1.2.0 diff --git a/go.sum b/go.sum index f34214bc14..bd0f0ba552 100644 --- a/go.sum +++ b/go.sum @@ -1889,8 +1889,8 @@ github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8X github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= -github.com/twmb/franz-go/pkg/sr v1.2.0 h1:zYr0Ly7KLFfeCGaSr8teN6LvAVeYVrZoUsyyPHTYB+M= -github.com/twmb/franz-go/pkg/sr v1.2.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw= +github.com/twmb/franz-go/pkg/sr v1.3.0 h1:UlXpZ2suGgylzQBUb6Wn1jzqVShoPGzt7BbixznJ4qo= +github.com/twmb/franz-go/pkg/sr v1.3.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw= github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ= github.com/uptrace/bun v1.1.12/go.mod h1:NPG6JGULBeQ9IU6yHp7YGELRa5Agmd7ATZdz4tGZ6z0= github.com/uptrace/bun/dialect/pgdialect v1.1.12 h1:m/CM1UfOkoBTglGO5CUTKnIKKOApOYxkcP2qn0F9tJk= diff --git a/internal/impl/confluent/processor_schema_registry_decode_test.go b/internal/impl/confluent/processor_schema_registry_decode_test.go index dc4304afcf..03ea165128 100644 --- a/internal/impl/confluent/processor_schema_registry_decode_test.go +++ b/internal/impl/confluent/processor_schema_registry_decode_test.go @@ -73,10 +73,6 @@ basic_auth: require.NoError(t, err) e, err := newSchemaRegistryDecoderFromConfig(conf, service.MockResources()) - if e != nil { - assert.Equal(t, test.expectedBaseURL, e.client.SchemaRegistryBaseURL.String()) - } - if err == nil { _ = e.Close(context.Background()) } diff --git a/internal/impl/confluent/processor_schema_registry_encode_test.go b/internal/impl/confluent/processor_schema_registry_encode_test.go index 38402fbf26..11d2f20234 100644 --- a/internal/impl/confluent/processor_schema_registry_encode_test.go +++ b/internal/impl/confluent/processor_schema_registry_encode_test.go @@ -106,10 +106,6 @@ subject: foo require.NoError(t, err) e, err := newSchemaRegistryEncoderFromConfig(conf, service.MockResources()) - if e != nil { - assert.Equal(t, test.expectedBaseURL, e.client.SchemaRegistryBaseURL.String()) - } - if err == nil { _ = e.Close(context.Background()) } diff --git a/internal/impl/confluent/sr/client.go b/internal/impl/confluent/sr/client.go index b16d9ad393..b306e023a3 100644 --- a/internal/impl/confluent/sr/client.go +++ b/internal/impl/confluent/sr/client.go @@ -19,11 +19,9 @@ import ( "crypto/tls" "fmt" "io/fs" - "net" "net/http" "net/url" "slices" - "time" "github.com/twmb/franz-go/pkg/sr" @@ -32,26 +30,7 @@ import ( // Client is used to make requests to a schema registry. type Client struct { - SchemaRegistryBaseURL *url.URL - clientSR *sr.Client - requestSigner func(f fs.FS, req *http.Request) error - mgr *service.Resources -} - -type roundTripper struct { - reqSigner func(req *http.Request) error - *http.Transport -} - -func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { - // This is naughty, but it's probably fine... - // The `RoundTrip` docs state that "RoundTrip should not modify the request, except for consuming and closing the Request's Body." - // This is because the following code https://github.com/golang/go/blob/e25b913127ac8ba26c4ecc39288c7f8781f4ef5d/src/net/http/client.go#L246-L252 - // already tries to set the `Authorization` header if `req.URL.User` is already set, but `reqSigner` replicates the same functionality anyway. - if err := rt.reqSigner(req); err != nil { - return nil, err - } - return rt.Transport.RoundTrip(req) + Client *sr.Client } // NewClient creates a new schema registry client. @@ -61,55 +40,18 @@ func NewClient( tlsConf *tls.Config, mgr *service.Resources, ) (*Client, error) { - u, err := url.Parse(urlStr) + _, err := url.Parse(urlStr) if err != nil { return nil, fmt.Errorf("failed to parse url: %w", err) } - reqSignerWrapped := func(req *http.Request) error { return reqSigner(mgr.FS(), req) } - - // Timeout copied from https://github.com/twmb/franz-go/blob/cea7aa5d803781e5f0162187795482ba1990c729/pkg/sr/client.go#L73 - hClient := &http.Client{Timeout: 5 * time.Second} - if c, ok := http.DefaultTransport.(*http.Transport); ok { - cloned := c.Clone() - cloned.TLSClientConfig = tlsConf - hClient.Transport = &roundTripper{ - reqSigner: reqSignerWrapped, - Transport: cloned, - } - } else { - hClient.Transport = &roundTripper{ - reqSigner: reqSignerWrapped, - // Copied from https://github.com/twmb/franz-go/blob/cea7aa5d803781e5f0162187795482ba1990c729/pkg/sr/clientopt.go#L48-L68 - // TODO: Why are we setting `MaxIdleConnsPerHost: 100`? It's not set in `http.DefaultTransport`. - // Note: `http.DefaultMaxIdleConnsPerHost` is 2. - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - TLSClientConfig: tlsConf, - ForceAttemptHTTP2: true, - MaxIdleConns: 100, - MaxIdleConnsPerHost: 100, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - }, - } - } - - clientSR, err := sr.NewClient(sr.HTTPClient(hClient), sr.URLs(urlStr)) + clientSR, err := sr.NewClient(sr.URLs(urlStr), sr.PreReq(func(req *http.Request) error { return reqSigner(mgr.FS(), req) })) if err != nil { return nil, fmt.Errorf("failed to init client: %w", err) } return &Client{ - clientSR: clientSR, - SchemaRegistryBaseURL: u, - requestSigner: reqSigner, - mgr: mgr, + Client: clientSR, }, nil } @@ -119,7 +61,7 @@ func (c *Client) GetSchemaByID(ctx context.Context, id int, includeDeleted bool) ctx = sr.WithParams(ctx, sr.ShowDeleted) } - schema, err := c.clientSR.SchemaByID(ctx, id) + schema, err := c.Client.SchemaByID(ctx, id) if err != nil { return sr.Schema{}, fmt.Errorf("schema %d not found by registry: %s", id, err) } @@ -132,12 +74,12 @@ func (c *Client) GetSubjectsBySchemaID(ctx context.Context, id int, includeDelet ctx = sr.WithParams(ctx, sr.ShowDeleted) } - return c.clientSR.SubjectsByID(ctx, id) + return c.Client.SubjectsByID(ctx, id) } // GetLatestSchemaVersionForSchemaIDAndSubject gets the latest version of a schema by its global identifier scoped to the provided subject. func (c *Client) GetLatestSchemaVersionForSchemaIDAndSubject(ctx context.Context, id int, subject string) (versionID int, err error) { - svs, err := c.clientSR.SchemaVersionsByID(ctx, id) + svs, err := c.Client.SchemaVersionsByID(ctx, id) if err != nil { return -1, fmt.Errorf("failed to fetch schema versions for ID %d and subject %q", id, subject) } @@ -166,10 +108,10 @@ func (c *Client) GetSchemaBySubjectAndVersion(ctx context.Context, subject strin var schema sr.SubjectSchema var err error if version != nil { - schema, err = c.clientSR.SchemaByVersion(ctx, subject, *version) + schema, err = c.Client.SchemaByVersion(ctx, subject, *version) } else { // Setting version to -1 will return the latest schema. - schema, err = c.clientSR.SchemaByVersion(ctx, subject, -1) + schema, err = c.Client.SchemaByVersion(ctx, subject, -1) } if err != nil { return sr.SubjectSchema{}, err @@ -180,7 +122,7 @@ func (c *Client) GetSchemaBySubjectAndVersion(ctx context.Context, subject strin // GetMode returns the mode of the Schema Registry instance. func (c *Client) GetMode(ctx context.Context) (string, error) { - res := c.clientSR.Mode(ctx) + res := c.Client.Mode(ctx) // There will be one and only one element in the response. if res[0].Err != nil { return "", fmt.Errorf("request failed: %s", res[0].Err) @@ -195,7 +137,7 @@ func (c *Client) GetSubjects(ctx context.Context, includeDeleted bool) ([]string ctx = sr.WithParams(ctx, sr.ShowDeleted) } - return c.clientSR.Subjects(ctx) + return c.Client.Subjects(ctx) } // GetVersionsForSubject returns the versions for a given subject. @@ -204,12 +146,12 @@ func (c *Client) GetVersionsForSubject(ctx context.Context, subject string, incl ctx = sr.WithParams(ctx, sr.ShowDeleted) } - return c.clientSR.SubjectVersions(ctx, subject) + return c.Client.SubjectVersions(ctx, subject) } // CreateSchema creates a new schema for the given subject. func (c *Client) CreateSchema(ctx context.Context, subject string, schema sr.Schema) (int, error) { - ss, err := c.clientSR.CreateSchema(ctx, subject, schema) + ss, err := c.Client.CreateSchema(ctx, subject, schema) if err != nil { return -1, fmt.Errorf("failed to create schema for subject %q: %s", subject, err) } diff --git a/internal/impl/kafka/enterprise/integration_test.go b/internal/impl/kafka/enterprise/integration_test.go index 96976fa28d..e56cc4dd90 100644 --- a/internal/impl/kafka/enterprise/integration_test.go +++ b/internal/impl/kafka/enterprise/integration_test.go @@ -282,7 +282,7 @@ func deleteSubject(t *testing.T, url string, subject string, hardDelete bool) { deleteMode = franz_sr.HardDelete } - _, err = client.DeleteSubject(context.Background(), subject, franz_sr.DeleteHow(deleteMode)) + _, err = client.DeleteSubject(context.Background(), subject, deleteMode) require.NoError(t, err) }