From 45cb1bcf3b498211efcc1a47f031ca5aec1dc238 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 17 Jan 2025 19:03:08 +0000 Subject: [PATCH] mycdc: add license check --- internal/impl/mysql/input_mysql_stream.go | 6 ++++++ internal/impl/mysql/integration_test.go | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/internal/impl/mysql/input_mysql_stream.go b/internal/impl/mysql/input_mysql_stream.go index b2b1173dcf..9fc74dd6a3 100644 --- a/internal/impl/mysql/input_mysql_stream.go +++ b/internal/impl/mysql/input_mysql_stream.go @@ -27,6 +27,8 @@ import ( "github.com/go-sql-driver/mysql" "github.com/redpanda-data/benthos/v4/public/service" "golang.org/x/sync/errgroup" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -117,6 +119,10 @@ type mysqlStreamInput struct { } func newMySQLStreamInput(conf *service.ParsedConfig, res *service.Resources) (s service.BatchInput, err error) { + if err := license.CheckRunningEnterprise(res); err != nil { + return nil, err + } + i := mysqlStreamInput{ logger: res.Logger(), rawMessageEvents: make(chan MessageEvent), diff --git a/internal/impl/mysql/integration_test.go b/internal/impl/mysql/integration_test.go index 242ea60dca..443739d070 100644 --- a/internal/impl/mysql/integration_test.go +++ b/internal/impl/mysql/integration_test.go @@ -26,6 +26,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/service/integration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/redpanda-data/connect/v4/internal/license" ) type testDB struct { @@ -146,6 +148,7 @@ file: streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) go func() { err = streamOut.Run(context.Background()) @@ -183,6 +186,7 @@ file: streamOut, err = streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) time.Sleep(time.Second) for i := 1001; i < 2001; i++ { @@ -251,6 +255,7 @@ file: streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) go func() { err = streamOut.Run(context.Background()) @@ -333,6 +338,7 @@ file: streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) go func() { err = streamOut.Run(context.Background()) @@ -504,6 +510,7 @@ memory: {} streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) go func() { err = streamOut.Run(context.Background())