Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 15 additions & 30 deletions modules/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ func (c *Container) resolveURL(ctx context.Context, port nat.Port) (string, erro
// and add a waiting strategy for the functions worker
func WithFunctionsWorker() testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) error {
req.Cmd = []string{"/bin/bash", "-c", defaultPulsarCmd}
if err := testcontainers.WithCmd("/bin/bash", "-c", defaultPulsarCmd)(req); err != nil {
return err
}

ss := []wait.Strategy{
wait.ForLog("Function worker service started"),
}

ss = append(ss, defaultWaitStrategies.Strategies...)

req.WaitingFor = wait.ForAll(ss...)

return nil
return testcontainers.WithWaitStrategy(wait.ForAll(ss...))(req)
}
}

Expand All @@ -103,9 +103,7 @@ func (c *Container) WithLogConsumers(ctx context.Context, _ ...testcontainers.Lo
// WithPulsarEnv allows to use the native APIs and set each variable with PULSAR_PREFIX_ as prefix.
func WithPulsarEnv(configVar string, configValue string) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) error {
req.Env["PULSAR_PREFIX_"+configVar] = configValue

return nil
return testcontainers.WithEnv(map[string]string{"PULSAR_PREFIX_" + configVar: configValue})(req)
}
}

Expand All @@ -124,9 +122,7 @@ func WithTransactions() testcontainers.CustomizeRequestOption {

ss = append(ss, defaultWaitStrategies.Strategies...)

req.WaitingFor = wait.ForAll(ss...)

return nil
return testcontainers.WithWaitStrategy(wait.ForAll(ss...))(req)
}
}

Expand All @@ -146,33 +142,22 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
//
// - command: "/bin/bash -c /pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss"
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*Container, error) {
req := testcontainers.ContainerRequest{
Image: img,
Env: map[string]string{},
ExposedPorts: []string{defaultPulsarPort, defaultPulsarAdminPort},
WaitingFor: defaultWaitStrategies,
Cmd: []string{"/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, defaultPulsarCmdWithoutFunctionsWorker}, " ")},
moduleOpts := []testcontainers.ContainerCustomizer{
testcontainers.WithExposedPorts(defaultPulsarPort, defaultPulsarAdminPort),
testcontainers.WithWaitStrategy(defaultWaitStrategies),
testcontainers.WithCmd("/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, defaultPulsarCmdWithoutFunctionsWorker}, " ")),
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

for _, opt := range opts {
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}
moduleOpts = append(moduleOpts, opts...)

container, err := testcontainers.GenericContainer(ctx, genericContainerReq)
ctr, err := testcontainers.Run(ctx, img, moduleOpts...)
var c *Container
if container != nil {
c = &Container{Container: container}
if ctr != nil {
c = &Container{Container: ctr}
}

if err != nil {
return c, fmt.Errorf("generic container: %w", err)
return c, fmt.Errorf("run pulsar: %w", err)
}

return c, nil
Expand Down
Loading