Skip to content

Commit

Permalink
Merge pull request #3130 from redpanda-data/mycdc
Browse files Browse the repository at this point in the history
mycdc: add license check
  • Loading branch information
rockwotj authored Jan 17, 2025
2 parents da85d74 + 45cb1bc commit 6b9e0fd
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
6 changes: 6 additions & 0 deletions internal/impl/mysql/input_mysql_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/redpanda-data/benthos/v4/public/service"
"golang.org/x/sync/errgroup"

"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -117,6 +119,10 @@ type mysqlStreamInput struct {
}

func newMySQLStreamInput(conf *service.ParsedConfig, res *service.Resources) (s service.BatchInput, err error) {
if err := license.CheckRunningEnterprise(res); err != nil {
return nil, err
}

i := mysqlStreamInput{
logger: res.Logger(),
rawMessageEvents: make(chan MessageEvent),
Expand Down
7 changes: 7 additions & 0 deletions internal/impl/mysql/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/redpanda-data/benthos/v4/public/service/integration"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/connect/v4/internal/license"
)

type testDB struct {
Expand Down Expand Up @@ -146,6 +148,7 @@ file:

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
Expand Down Expand Up @@ -183,6 +186,7 @@ file:

streamOut, err = streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

time.Sleep(time.Second)
for i := 1001; i < 2001; i++ {
Expand Down Expand Up @@ -251,6 +255,7 @@ file:

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
Expand Down Expand Up @@ -333,6 +338,7 @@ file:

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
Expand Down Expand Up @@ -504,6 +510,7 @@ memory: {}

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
Expand Down

0 comments on commit 6b9e0fd

Please sign in to comment.