Skip to content

Commit

Permalink
mycdc: add license check
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj committed Jan 17, 2025
1 parent da85d74 commit 45cb1bc
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
6 changes: 6 additions & 0 deletions internal/impl/mysql/input_mysql_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/redpanda-data/benthos/v4/public/service"
"golang.org/x/sync/errgroup"

"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -117,6 +119,10 @@ type mysqlStreamInput struct {
}

func newMySQLStreamInput(conf *service.ParsedConfig, res *service.Resources) (s service.BatchInput, err error) {
if err := license.CheckRunningEnterprise(res); err != nil {
return nil, err
}

i := mysqlStreamInput{
logger: res.Logger(),
rawMessageEvents: make(chan MessageEvent),
Expand Down
7 changes: 7 additions & 0 deletions internal/impl/mysql/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/redpanda-data/benthos/v4/public/service/integration"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/connect/v4/internal/license"
)

type testDB struct {
Expand Down Expand Up @@ -146,6 +148,7 @@ file:

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
Expand Down Expand Up @@ -183,6 +186,7 @@ file:

streamOut, err = streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

time.Sleep(time.Second)
for i := 1001; i < 2001; i++ {
Expand Down Expand Up @@ -251,6 +255,7 @@ file:

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
Expand Down Expand Up @@ -333,6 +338,7 @@ file:

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
Expand Down Expand Up @@ -504,6 +510,7 @@ memory: {}

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
Expand Down

0 comments on commit 45cb1bc

Please sign in to comment.