Skip to content

Commit

Permalink
workflow: Allow passing API into testing utility functions (#92)
Browse files Browse the repository at this point in the history
* workflow: Allow passing API into testing utility functions

* remove unused function

* clean up
  • Loading branch information
andrewwormald authored Feb 21, 2025
1 parent d38f95b commit 8294b24
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 23 deletions.
4 changes: 2 additions & 2 deletions _examples/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func Workflow(d Deps) *workflow.Workflow[GettingStarted, Status] {
builder.AddConnector(
"my-example-connector",
d.Connector,
func(ctx context.Context, w workflow.API[GettingStarted, Status], e *workflow.ConnectorEvent) error {
_, err := w.Trigger(
func(ctx context.Context, api workflow.API[GettingStarted, Status], e *workflow.ConnectorEvent) error {
_, err := api.Trigger(
ctx,
e.ForeignID,
StatusStarted,
Expand Down
13 changes: 9 additions & 4 deletions adapters/adaptertest/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ func RunConnectorTest(t *testing.T, maker func(seedEvents []workflow.ConnectorEv
builder.AddConnector(
"tester",
constructor,
func(ctx context.Context, w workflow.API[User, SyncStatus], e *workflow.ConnectorEvent) error {
_, err := w.Trigger(ctx, e.ForeignID, SyncStatusStarted, workflow.WithInitialValue[User, SyncStatus](&User{
UID: e.ForeignID,
}))
func(ctx context.Context, api workflow.API[User, SyncStatus], e *workflow.ConnectorEvent) error {
_, err := api.Trigger(
ctx,
e.ForeignID,
SyncStatusStarted,
workflow.WithInitialValue[User, SyncStatus](&User{
UID: e.ForeignID,
}),
)
if err != nil {
return err
}
Expand Down
12 changes: 5 additions & 7 deletions adapters/jlog/jlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jlog_test
import (
"bytes"
"context"
"strings"
"testing"

"github.com/luno/jettison/errors"
Expand Down Expand Up @@ -34,11 +35,8 @@ func TestError(t *testing.T) {
testErr := errors.New("test error")
logger.Error(ctx, testErr)

expected := `E 00:00:00.000 g/l/w/a/jlog/jlog.go:23: error(s)
test error
- github.com/luno/workflow/adapters/jlog/jlog_test.go:34 TestError
- testing/testing.go:1689 tRunner
- runtime/asm_arm64.s:1222 goexit
`
require.Equal(t, expected, buf.String())
s := buf.String()
require.True(t, strings.Contains(s, "E 00:00:00.000 g/l/w/a/jlog/jlog.go:23: error(s)"))
require.True(t, strings.Contains(s, "test error"))
require.True(t, strings.Contains(s, "github.com/luno/workflow/adapters/jlog/jlog_test.go"))
}
2 changes: 1 addition & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type ConnectorConsumer interface {
Close() error
}

type ConnectorFunc[Type any, Status StatusType] func(ctx context.Context, w API[Type, Status], e *ConnectorEvent) error
type ConnectorFunc[Type any, Status StatusType] func(ctx context.Context, api API[Type, Status], e *ConnectorEvent) error

type connectorConfig[Type any, Status StatusType] struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestInternalState(t *testing.T) {
b.AddConnector(
"consume-other-stream",
memstreamer.NewConnector(nil),
func(ctx context.Context, w workflow.API[string, status], e *workflow.ConnectorEvent) error {
func(ctx context.Context, api workflow.API[string, status], e *workflow.ConnectorEvent) error {
return nil
},
).WithOptions(
Expand Down
28 changes: 24 additions & 4 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TriggerCallbackOn[Type any, Status StatusType, Payload any](
t testing.TB,
w *Workflow[Type, Status],
api API[Type, Status],
foreignID, runID string,
waitForStatus Status,
p Payload,
Expand All @@ -22,6 +22,11 @@ func TriggerCallbackOn[Type any, Status StatusType, Payload any](
panic("TriggerCallbackOn can only be used for testing")
}

w, ok := api.(*Workflow[Type, Status])
if !ok {
panic("*workflow.Workflow required for testing utility functions")
}

_ = waitFor(t, w, foreignID, func(r *Record) (bool, error) {
return r.Status == int(waitForStatus), nil
})
Expand All @@ -35,14 +40,19 @@ func TriggerCallbackOn[Type any, Status StatusType, Payload any](

func AwaitTimeoutInsert[Type any, Status StatusType](
t testing.TB,
w *Workflow[Type, Status],
api API[Type, Status],
foreignID, runID string,
waitFor Status,
) {
if t == nil {
panic("AwaitTimeoutInsert can only be used for testing")
}

w, ok := api.(*Workflow[Type, Status])
if !ok {
panic("*workflow.Workflow required for testing utility functions")
}

var found bool
for !found {
if w.ctx.Err() != nil {
Expand Down Expand Up @@ -73,7 +83,7 @@ func AwaitTimeoutInsert[Type any, Status StatusType](

func Require[Type any, Status StatusType](
t testing.TB,
w *Workflow[Type, Status],
api API[Type, Status],
foreignID string,
waitForStatus Status,
expected Type,
Expand All @@ -82,6 +92,11 @@ func Require[Type any, Status StatusType](
panic("Require can only be used for testing")
}

w, ok := api.(*Workflow[Type, Status])
if !ok {
panic("*workflow.Workflow required for testing utility functions")
}

if !w.statusGraph.IsValid(int(waitForStatus)) {
t.Error(
fmt.Sprintf(`Status provided is not configured for workflow: "%v" (Workflow: %v)`, waitForStatus, w.Name()),
Expand Down Expand Up @@ -114,14 +129,19 @@ func Require[Type any, Status StatusType](

func WaitFor[Type any, Status StatusType](
t testing.TB,
w *Workflow[Type, Status],
api API[Type, Status],
foreignID string,
fn func(r *Run[Type, Status]) (bool, error),
) {
if t == nil {
panic("WaitFor can only be used for testing")
}

w, ok := api.(*Workflow[Type, Status])
if !ok {
panic("*workflow.Workflow required for testing utility functions")
}

waitFor(t, w, foreignID, func(r *Record) (bool, error) {
run, err := buildRun[Type, Status](w.recordStore.Store, r)
require.Nil(t, err)
Expand Down
60 changes: 58 additions & 2 deletions testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workflow_test
import (
"context"
"encoding/json"
"io"
"strings"
"testing"
"time"
Expand All @@ -24,6 +25,15 @@ func TestTriggerCallbackOn_validation(t *testing.T) {
}, "Not providing a testing.T or testing.B should panic")
})

t.Run("TriggerCallbackOn must be provided with a workflow.Workflow", func(t *testing.T) {
require.PanicsWithValue(t,
"*workflow.Workflow required for testing utility functions",
func() {
api := &apiImpl[string, status]{}
workflow.Require(t, api, "", StatusEnd, "")
}, "Not providing a workflow using a TestingRecordStore implemented record store should panic")
})

t.Run("TriggerCallbackOn must be provided with a workflow that is using a record store that implements TestingRecordStore", func(t *testing.T) {
require.PanicsWithValue(t,
"TestingRecordStore implementation for record store dependency required",
Expand All @@ -33,13 +43,13 @@ func TestTriggerCallbackOn_validation(t *testing.T) {
return r.Cancel(ctx)
}, StatusEnd)

wf := b.Build(
w := b.Build(
memstreamer.New(),
nil,
memrolescheduler.New(),
)

workflow.Require(t, wf, "", StatusEnd, "")
workflow.TriggerCallbackOn(t, w, "", "", StatusEnd, "")
}, "Not providing a workflow using a TestingRecordStore implemented record store should panic")
})
}
Expand Down Expand Up @@ -71,6 +81,15 @@ func TestAwaitTimeoutInsert_validation(t *testing.T) {
workflow.Require(t, wf, "", StatusEnd, "")
}, "Not providing a workflow using a TestingRecordStore implemented record store should panic")
})

t.Run("AwaitTimeoutInsert must be provided with a *workflow.Workflow", func(t *testing.T) {
require.PanicsWithValue(t,
"*workflow.Workflow required for testing utility functions",
func() {
api := &apiImpl[string, status]{}
workflow.AwaitTimeoutInsert(t, api, "", "", StatusEnd)
}, "Not providing a workflow using a TestingRecordStore implemented record store should panic")
})
}

func TestRequire(t *testing.T) {
Expand Down Expand Up @@ -124,6 +143,15 @@ func TestRequire_validation(t *testing.T) {
workflow.Require(t, wf, "", StatusEnd, "")
}, "Not providing a workflow using a TestingRecordStore implemented record store should panic")
})

t.Run("Require must be provided with a *workflow.Workflow", func(t *testing.T) {
require.PanicsWithValue(t,
"*workflow.Workflow required for testing utility functions",
func() {
api := &apiImpl[string, status]{}
workflow.Require(t, api, "", StatusEnd, "")
}, "Not providing a workflow using a TestingRecordStore implemented record store should panic")
})
}

// testCustomMarshaler is for testing and implements custom and weird behaviour via the
Expand Down Expand Up @@ -160,3 +188,31 @@ func TestWaitFor(t *testing.T) {
return r.RunState == workflow.RunStateCompleted, nil
})
}

var _ workflow.API[string, status] = (*apiImpl[string, status])(nil)

type apiImpl[Type any, Status workflow.StatusType] struct{}

func (a apiImpl[Type, Status]) Name() string {
return "test"
}

func (a apiImpl[Type, Status]) Trigger(ctx context.Context, foreignID string, startingStatus Status, opts ...workflow.TriggerOption[Type, Status]) (runID string, err error) {
return "", nil
}

func (a apiImpl[Type, Status]) Schedule(foreignID string, startingStatus Status, spec string, opts ...workflow.ScheduleOption[Type, Status]) error {
return nil
}

func (a apiImpl[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, opts ...workflow.AwaitOption) (*workflow.Run[Type, Status], error) {
return &workflow.Run[Type, Status]{}, nil
}

func (a apiImpl[Type, Status]) Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error {
return nil
}

func (a apiImpl[Type, Status]) Run(ctx context.Context) {}

func (a apiImpl[Type, Status]) Stop() {}
4 changes: 2 additions & 2 deletions workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ func TestConnector(t *testing.T) {
buidler.AddConnector(
"my-test-connector",
connector,
func(ctx context.Context, w workflow.API[typeX, status], e *workflow.ConnectorEvent) error {
_, err := w.Trigger(ctx, e.ForeignID, StatusStart, workflow.WithInitialValue[typeX, status](&typeX{
func(ctx context.Context, api workflow.API[typeX, status], e *workflow.ConnectorEvent) error {
_, err := api.Trigger(ctx, e.ForeignID, StatusStart, workflow.WithInitialValue[typeX, status](&typeX{
Val: "trigger set value",
}))
if err != nil {
Expand Down

0 comments on commit 8294b24

Please sign in to comment.