diff --git a/client/amqp/amqp.go b/client/amqp/amqp.go index c77fd32..017986b 100644 --- a/client/amqp/amqp.go +++ b/client/amqp/amqp.go @@ -74,8 +74,8 @@ type ( ) var ( - _ orbital.Initiator = &Client{} - _ orbital.Responder = &Client{} + _ orbital.Initiator = &Client{} + _ orbital.AsyncResponder = &Client{} ) // WithBasicAuth tells the client to use SASL PLAIN with user/password. diff --git a/client/grpc/grpc.go b/client/rpc/rpc.go similarity index 99% rename from client/grpc/grpc.go rename to client/rpc/rpc.go index 46449e4..ecef75d 100644 --- a/client/grpc/grpc.go +++ b/client/rpc/rpc.go @@ -1,4 +1,4 @@ -package grpc +package rpc import ( "context" diff --git a/client/grpc/grpc_test.go b/client/rpc/rpc_test.go similarity index 85% rename from client/grpc/grpc_test.go rename to client/rpc/rpc_test.go index 8a33630..10b7a0b 100644 --- a/client/grpc/grpc_test.go +++ b/client/rpc/rpc_test.go @@ -1,4 +1,4 @@ -package grpc_test +package rpc_test import ( "context" @@ -17,7 +17,7 @@ import ( "google.golang.org/grpc/test/bufconn" "github.com/openkcm/orbital" - grpcclient "github.com/openkcm/orbital/client/grpc" + "github.com/openkcm/orbital/client/rpc" "github.com/openkcm/orbital/codec" orbitalv1 "github.com/openkcm/orbital/proto/orbital/v1" ) @@ -71,42 +71,42 @@ func TestNewClient(t *testing.T) { tests := []struct { name string conn *grpc.ClientConn - opts []grpcclient.ClientOption + opts []rpc.ClientOption wantErr error }{ { name: "NilConn", conn: nil, - wantErr: grpcclient.ErrNilConn, + wantErr: rpc.ErrNilConn, }, { name: "InvalidBufferSize", conn: conn, - opts: []grpcclient.ClientOption{grpcclient.WithBufferSize(-1)}, - wantErr: grpcclient.ErrInvalidBufferSize, + opts: []rpc.ClientOption{rpc.WithBufferSize(-1)}, + wantErr: rpc.ErrInvalidBufferSize, }, { name: "InvalidCallTimeout/zero", conn: conn, - opts: []grpcclient.ClientOption{grpcclient.WithCallTimeout(0)}, - wantErr: grpcclient.ErrInvalidCallTimeout, + opts: []rpc.ClientOption{rpc.WithCallTimeout(0)}, + wantErr: rpc.ErrInvalidCallTimeout, }, { name: "InvalidCallTimeout/negative", conn: conn, - opts: []grpcclient.ClientOption{grpcclient.WithCallTimeout(-1 * time.Second)}, - wantErr: grpcclient.ErrInvalidCallTimeout, + opts: []rpc.ClientOption{rpc.WithCallTimeout(-1 * time.Second)}, + wantErr: rpc.ErrInvalidCallTimeout, }, { name: "ValidWithZeroBuffer", conn: conn, - opts: []grpcclient.ClientOption{grpcclient.WithBufferSize(0)}, + opts: []rpc.ClientOption{rpc.WithBufferSize(0)}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c, err := grpcclient.NewClient(tt.conn, tt.opts...) + c, err := rpc.NewClient(tt.conn, tt.opts...) if tt.wantErr != nil { assert.Nil(t, c) assert.ErrorIs(t, err, tt.wantErr) @@ -137,7 +137,7 @@ func TestSendTaskRequest(t *testing.T) { }, nil }) - client, err := grpcclient.NewClient(conn, grpcclient.WithCallTimeout(5*time.Second)) + client, err := rpc.NewClient(conn, rpc.WithCallTimeout(5*time.Second)) require.NoError(t, err) defer client.Close(t.Context()) @@ -174,7 +174,7 @@ func TestSendTaskRequest(t *testing.T) { return nil, status.Error(codes.Internal, "something broke") }) - client, err := grpcclient.NewClient(conn, grpcclient.WithCallTimeout(5*time.Second)) + client, err := rpc.NewClient(conn, rpc.WithCallTimeout(5*time.Second)) require.NoError(t, err) defer client.Close(t.Context()) @@ -200,7 +200,7 @@ func TestSendTaskRequest(t *testing.T) { }, nil }) - client, err := grpcclient.NewClient(conn, grpcclient.WithCallTimeout(5*time.Second)) + client, err := rpc.NewClient(conn, rpc.WithCallTimeout(5*time.Second)) require.NoError(t, err) defer client.Close(t.Context()) @@ -220,7 +220,7 @@ func TestSendTaskRequest(t *testing.T) { t.Run("ContextCancelled", func(t *testing.T) { conn := startServer(t, noopHandler) - client, err := grpcclient.NewClient(conn) + client, err := rpc.NewClient(conn) require.NoError(t, err) defer client.Close(t.Context()) @@ -241,7 +241,7 @@ func TestSendTaskRequest(t *testing.T) { }, nil }) - client, err := grpcclient.NewClient(conn, grpcclient.WithCallTimeout(5*time.Second)) + client, err := rpc.NewClient(conn, rpc.WithCallTimeout(5*time.Second)) require.NoError(t, err) defer client.Close(t.Context()) @@ -274,7 +274,7 @@ func TestReceiveTaskResponse(t *testing.T) { t.Run("ContextCancelled", func(t *testing.T) { conn := startServer(t, noopHandler) - client, err := grpcclient.NewClient(conn) + client, err := rpc.NewClient(conn) require.NoError(t, err) defer client.Close(t.Context()) @@ -291,24 +291,24 @@ func TestClose(t *testing.T) { t.Run("ThenSendAndReceive", func(t *testing.T) { conn := startServer(t, noopHandler) - client, err := grpcclient.NewClient(conn) + client, err := rpc.NewClient(conn) require.NoError(t, err) err = client.Close(t.Context()) require.NoError(t, err) err = client.SendTaskRequest(t.Context(), orbital.TaskRequest{}) - assert.ErrorIs(t, err, grpcclient.ErrClientClosed) + assert.ErrorIs(t, err, rpc.ErrClientClosed) resp, err := client.ReceiveTaskResponse(t.Context()) - assert.ErrorIs(t, err, grpcclient.ErrClientClosed) + assert.ErrorIs(t, err, rpc.ErrClientClosed) assert.Equal(t, orbital.TaskResponse{}, resp) }) t.Run("Idempotent", func(t *testing.T) { conn := startServer(t, noopHandler) - client, err := grpcclient.NewClient(conn) + client, err := rpc.NewClient(conn) require.NoError(t, err) assert.NotPanics(t, func() { diff --git a/client/rpc/server.go b/client/rpc/server.go new file mode 100644 index 0000000..9bf8832 --- /dev/null +++ b/client/rpc/server.go @@ -0,0 +1,155 @@ +package rpc + +import ( + "context" + "errors" + "net" + "sync" + "sync/atomic" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + slogctx "github.com/veqryn/slog-context" + + "github.com/openkcm/orbital" + "github.com/openkcm/orbital/codec" + orbitalv1 "github.com/openkcm/orbital/proto/orbital/v1" +) + +var _ orbital.SyncResponder = (*Server)(nil) + +var ( + // ErrNilListener is returned by NewServer when given a nil net.Listener. + ErrNilListener = errors.New("grpc server: listener cannot be nil") + // ErrServerAlreadyRan is returned by Run if the server has already been started. + ErrServerAlreadyRan = errors.New("grpc server: Run already called") +) + +type ( + // ServerOption configures a Server. + ServerOption func(*serverConfig) error + + serverConfig struct { + grpcServerOpts []grpc.ServerOption + } + + // Server implements the orbital TaskService gRPC server and the + // SyncResponder interface. It receives task requests over gRPC, + // processes them via the TaskRequestHandler provided to Run, + // and returns responses synchronously. + Server struct { + orbitalv1.UnimplementedTaskServiceServer + + config serverConfig + lis net.Listener + grpcServer *grpc.Server + handler orbital.TaskRequestHandler + ran atomic.Bool + stopOnce sync.Once + } +) + +// NewServer creates a gRPC Server bound to the given listener. +func NewServer(lis net.Listener, opts ...ServerOption) (*Server, error) { + if lis == nil { + return nil, ErrNilListener + } + + cfg := serverConfig{} + for _, opt := range opts { + if err := opt(&cfg); err != nil { + return nil, err + } + } + + return &Server{ + config: cfg, + lis: lis, + }, nil +} + +// WithServerOptions appends gRPC server options (interceptors, TLS +// credentials, etc.) used when the underlying grpc.Server is created. +func WithServerOptions(opts ...grpc.ServerOption) ServerOption { + return func(cfg *serverConfig) error { + cfg.grpcServerOpts = append(cfg.grpcServerOpts, opts...) + return nil + } +} + +// Run registers the TaskService, starts serving, and blocks until +// Close is called or ctx is cancelled. Run may only be called once. +func (s *Server) Run(ctx context.Context, handler orbital.TaskRequestHandler) error { + if !s.ran.CompareAndSwap(false, true) { + return ErrServerAlreadyRan + } + + s.handler = handler + s.grpcServer = grpc.NewServer(s.config.grpcServerOpts...) + orbitalv1.RegisterTaskServiceServer(s.grpcServer, s) + + go func() { + <-ctx.Done() + s.stop() + }() + + slogctx.Info(ctx, "grpc server starting", "address", s.lis.Addr().String()) + return s.grpcServer.Serve(s.lis) +} + +// Close gracefully stops the gRPC server. If ctx expires before graceful +// shutdown completes, it force-stops. +func (s *Server) Close(ctx context.Context) error { + if s.grpcServer == nil { + return nil + } + + stopped := make(chan struct{}) + go func() { + s.stop() + close(stopped) + }() + + select { + case <-stopped: + case <-ctx.Done(): + s.grpcServer.Stop() + } + return nil +} + +// SendTaskRequest implements orbitalv1.TaskServiceServer. +func (s *Server) SendTaskRequest(ctx context.Context, pReq *orbitalv1.TaskRequest) (*orbitalv1.TaskResponse, error) { + req, err := codec.FromProtoToTaskRequest(pReq) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid task request: %v", err) + } + + resp, err := s.handler(ctx, req) + if err != nil { + return nil, mapProcessError(err) + } + + return codec.FromTaskResponseToProto(resp), nil +} + +func mapProcessError(err error) error { + switch { + case errors.Is(err, orbital.ErrSignatureInvalid): + return status.Error(codes.Unauthenticated, err.Error()) + case errors.Is(err, orbital.ErrResponseSigning): + return status.Error(codes.Internal, err.Error()) + default: + return status.Error(codes.Internal, err.Error()) + } +} + +func (s *Server) stop() { + s.stopOnce.Do(func() { + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + } + }) +} diff --git a/client/rpc/server_test.go b/client/rpc/server_test.go new file mode 100644 index 0000000..1225d71 --- /dev/null +++ b/client/rpc/server_test.go @@ -0,0 +1,268 @@ +package rpc_test + +import ( + "context" + "net" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + + "github.com/openkcm/orbital" + "github.com/openkcm/orbital/client/rpc" + "github.com/openkcm/orbital/codec" + orbitalv1 "github.com/openkcm/orbital/proto/orbital/v1" +) + +const serverBufSize = 1024 * 1024 + +func startTestServer(t *testing.T, handler orbital.TaskRequestHandler) (orbitalv1.TaskServiceClient, *rpc.Server) { + t.Helper() + + lis := bufconn.Listen(serverBufSize) + + srv, err := rpc.NewServer(lis) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + errCh := make(chan error, 1) + go func() { + errCh <- srv.Run(ctx, handler) + }() + + conn, err := grpc.NewClient( + "passthrough:///bufconn", + grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return lis.DialContext(ctx) + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + t.Cleanup(func() { conn.Close() }) + + return orbitalv1.NewTaskServiceClient(conn), srv +} + +func TestNewServer(t *testing.T) { + lis := bufconn.Listen(serverBufSize) + defer lis.Close() + + t.Run("NilListener", func(t *testing.T) { + srv, err := rpc.NewServer(nil) + assert.Nil(t, srv) + assert.ErrorIs(t, err, rpc.ErrNilListener) + }) + + t.Run("Defaults", func(t *testing.T) { + srv, err := rpc.NewServer(lis) + require.NoError(t, err) + assert.NotNil(t, srv) + }) + + t.Run("WithServerOptions", func(t *testing.T) { + srv, err := rpc.NewServer(lis, rpc.WithServerOptions(grpc.MaxRecvMsgSize(1024))) + require.NoError(t, err) + assert.NotNil(t, srv) + }) +} + +func TestServer_SendTaskRequest(t *testing.T) { + t.Run("RoundTrip", func(t *testing.T) { + taskID := uuid.New() + etag := uuid.New().String() + + handler := func(_ context.Context, req orbital.TaskRequest) (orbital.TaskResponse, error) { + return orbital.TaskResponse{ + TaskID: req.TaskID, + Type: req.Type, + ExternalID: req.ExternalID, + WorkingState: []byte(`{"progress":50}`), + ETag: req.ETag, + Status: string(orbital.TaskStatusDone), + ReconcileAfterSec: 120, + MetaData: orbital.MetaData{"key": "value"}, + }, nil + } + + stub, _ := startTestServer(t, handler) + + protoReq := codec.FromTaskRequestToProto(orbital.TaskRequest{ + TaskID: taskID, + Type: "test-type", + ExternalID: "ext-123", + Data: []byte(`{"input":"data"}`), + WorkingState: []byte(`{"state":"initial"}`), + ETag: etag, + MetaData: orbital.MetaData{"reqKey": "reqVal"}, + TaskCreatedAt: time.Now().Unix(), + TaskLastReconciledAt: time.Now().Unix(), + }) + + resp, err := stub.SendTaskRequest(t.Context(), protoReq) + require.NoError(t, err) + + assert.Equal(t, taskID.String(), resp.GetTaskId()) + assert.Equal(t, "test-type", resp.GetType()) + assert.Equal(t, "ext-123", resp.GetExternalId()) + assert.Equal(t, etag, resp.GetEtag()) + assert.Equal(t, orbitalv1.TaskStatus_DONE, resp.GetStatus()) + assert.Equal(t, []byte(`{"progress":50}`), resp.GetWorkingState()) + assert.Equal(t, uint64(120), resp.GetReconcileAfterSec()) + assert.Equal(t, map[string]string{"key": "value"}, resp.GetMetaData()) + }) + + t.Run("InvalidTaskID", func(t *testing.T) { + handler := func(_ context.Context, _ orbital.TaskRequest) (orbital.TaskResponse, error) { + return orbital.TaskResponse{}, nil + } + + stub, _ := startTestServer(t, handler) + + resp, err := stub.SendTaskRequest(t.Context(), &orbitalv1.TaskRequest{ + TaskId: "not-a-uuid", + Type: "test", + }) + require.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, codes.InvalidArgument, status.Code(err)) + }) + + t.Run("ErrSignatureInvalid", func(t *testing.T) { + handler := func(_ context.Context, _ orbital.TaskRequest) (orbital.TaskResponse, error) { + return orbital.TaskResponse{}, orbital.ErrSignatureInvalid + } + + stub, _ := startTestServer(t, handler) + + protoReq := codec.FromTaskRequestToProto(orbital.TaskRequest{ + TaskID: uuid.New(), + Type: "test", + ETag: "etag-1", + }) + + resp, err := stub.SendTaskRequest(t.Context(), protoReq) + require.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, codes.Unauthenticated, status.Code(err)) + }) + + t.Run("ErrResponseSigning", func(t *testing.T) { + handler := func(_ context.Context, _ orbital.TaskRequest) (orbital.TaskResponse, error) { + return orbital.TaskResponse{}, orbital.ErrResponseSigning + } + + stub, _ := startTestServer(t, handler) + + protoReq := codec.FromTaskRequestToProto(orbital.TaskRequest{ + TaskID: uuid.New(), + Type: "test", + ETag: "etag-1", + }) + + resp, err := stub.SendTaskRequest(t.Context(), protoReq) + require.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, codes.Internal, status.Code(err)) + }) + + t.Run("FailedStatusIsNormalResponse", func(t *testing.T) { + handler := func(_ context.Context, req orbital.TaskRequest) (orbital.TaskResponse, error) { + return orbital.TaskResponse{ + TaskID: req.TaskID, + Status: string(orbital.TaskStatusFailed), + ErrorMessage: "something went wrong", + ETag: req.ETag, + }, nil + } + + stub, _ := startTestServer(t, handler) + + protoReq := codec.FromTaskRequestToProto(orbital.TaskRequest{ + TaskID: uuid.New(), + Type: "test", + ETag: "etag-1", + }) + + resp, err := stub.SendTaskRequest(t.Context(), protoReq) + require.NoError(t, err) + assert.Equal(t, orbitalv1.TaskStatus_FAILED, resp.GetStatus()) + assert.Equal(t, "something went wrong", resp.GetErrorMessage()) + }) +} + +func TestServer_Close(t *testing.T) { + handler := func(_ context.Context, req orbital.TaskRequest) (orbital.TaskResponse, error) { + return orbital.TaskResponse{TaskID: req.TaskID}, nil + } + + t.Run("GracefulClose", func(t *testing.T) { + stub, srv := startTestServer(t, handler) + + protoReq := codec.FromTaskRequestToProto(orbital.TaskRequest{ + TaskID: uuid.New(), + Type: "test", + ETag: "etag-1", + }) + _, err := stub.SendTaskRequest(t.Context(), protoReq) + require.NoError(t, err) + + require.NoError(t, srv.Close(t.Context())) + + _, err = stub.SendTaskRequest(t.Context(), protoReq) + assert.Error(t, err) + }) + + t.Run("CloseBeforeRun", func(t *testing.T) { + lis := bufconn.Listen(serverBufSize) + defer lis.Close() + + srv, err := rpc.NewServer(lis) + require.NoError(t, err) + + assert.NotPanics(t, func() { + require.NoError(t, srv.Close(t.Context())) + }) + }) + + t.Run("ForceCloseOnContextExpiry", func(t *testing.T) { + _, srv := startTestServer(t, handler) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + assert.NotPanics(t, func() { + require.NoError(t, srv.Close(ctx)) + }) + }) + + t.Run("ContextCancellationStopsServer", func(t *testing.T) { + lis := bufconn.Listen(serverBufSize) + + srv, err := rpc.NewServer(lis) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + + errCh := make(chan error, 1) + go func() { + errCh <- srv.Run(ctx, handler) + }() + + cancel() + + select { + case <-errCh: + case <-time.After(5 * time.Second): + t.Fatal("server did not stop after context cancellation") + } + }) +} diff --git a/examples/operator/main.go b/examples/operator/amqp/main.go similarity index 91% rename from examples/operator/main.go rename to examples/operator/amqp/main.go index 3319c85..28829b4 100644 --- a/examples/operator/main.go +++ b/examples/operator/amqp/main.go @@ -10,7 +10,6 @@ import ( "github.com/openkcm/orbital" "github.com/openkcm/orbital/client/amqp" "github.com/openkcm/orbital/codec" - "github.com/openkcm/orbital/runner/async" ) // This example uses RabbitMQ as an AMQP message broker. @@ -31,20 +30,19 @@ func main() { handleErr("initializing responder", err) defer client.Close(ctx) - // Initialize runner - runner, err := async.New(client) - handleErr("initializing runner", err) - // Initialize an orbital operator that uses the responder - operator, err := orbital.NewOperator(orbital.TargetOperator{Runner: runner}) + operator, err := orbital.NewOperator(orbital.TargetOperator{Client: client}) handleErr("initializing operator", err) // Register a handler for the "example" task type err = operator.RegisterHandler("example", handlerExample) handleErr("registering handler", err) - // Start the operator to listen for task requests and respond - operator.ListenAndRespond(ctx) + // ListenAndRespond blocks (like http.ListenAndServe), so run it + // in a goroutine since this example also sends a request below. + go func() { + log.Fatal(operator.ListenAndRespond(ctx)) + }() // Send a task request to the operator via the AMQP message broker and wait for the response sendAndReceive(ctx) diff --git a/examples/operator/rpc/main.go b/examples/operator/rpc/main.go new file mode 100644 index 0000000..061cae2 --- /dev/null +++ b/examples/operator/rpc/main.go @@ -0,0 +1,52 @@ +package main + +import ( + "context" + "log" + "net" + "time" + + "github.com/openkcm/orbital" + "github.com/openkcm/orbital/client/rpc" +) + +func main() { + ctx := context.Background() + + lis, err := (&net.ListenConfig{}).Listen(ctx, "tcp", "localhost:50051") + handleErr("creating listener", err) + + srv, err := rpc.NewServer(lis) + handleErr("creating grpc server", err) + + operator, err := orbital.NewOperator(orbital.TargetOperator{Client: srv}) + handleErr("initializing operator", err) + + err = operator.RegisterHandler("example", handlerExample) + handleErr("registering handler", err) + + // ListenAndRespond blocks (like http.ListenAndServe). + log.Fatal(operator.ListenAndRespond(ctx)) +} + +func handlerExample(_ context.Context, req orbital.HandlerRequest, resp *orbital.HandlerResponse) { + log.Printf("Received task: ID=%s Type=%s Data=%s", + req.TaskID, req.TaskType, string(req.TaskData)) + + workingState, err := resp.WorkingState() + if err != nil { + resp.Fail("invalid working state") + return + } + + workingState.Set("processed", true) + workingState.Set("processedAt", time.Now().Format(time.RFC3339)) + + resp.Complete() +} + +func handleErr(msg string, err error) { + if err != nil { + log.Fatalf("%s: %s\n", msg, err) + } +} diff --git a/integration/helper_test.go b/integration/helper_test.go index bd34a60..ecae09d 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -21,7 +21,6 @@ import ( "github.com/openkcm/orbital" "github.com/openkcm/orbital/client/amqp" "github.com/openkcm/orbital/codec" - "github.com/openkcm/orbital/runner/async" "github.com/openkcm/orbital/store/sql" ) @@ -238,15 +237,10 @@ func createAndStartManager(ctx context.Context, t *testing.T, store *sql.SQL, co } // createAndStartOperator creates and starts an operator instance. -func createAndStartOperator(ctx context.Context, t *testing.T, client orbital.Responder, config operatorConfig) error { +func createAndStartOperator(ctx context.Context, t *testing.T, client orbital.AsyncResponder, config operatorConfig) error { t.Helper() - runner, err := async.New(client) - if err != nil { - return err - } - - operator, err := orbital.NewOperator(orbital.TargetOperator{Runner: runner}) + operator, err := orbital.NewOperator(orbital.TargetOperator{Client: client}) if err != nil { return fmt.Errorf("failed to create operator: %w", err) } @@ -275,7 +269,9 @@ func addHandlerAndListen(ctx context.Context, t *testing.T, config operatorConfi } } - go operator.ListenAndRespond(ctx) + go func() { + assert.ErrorIs(t, operator.ListenAndRespond(ctx), context.Canceled) + }() return nil } diff --git a/integration/signing_test.go b/integration/signing_test.go index f4f6691..ccb6a1e 100644 --- a/integration/signing_test.go +++ b/integration/signing_test.go @@ -20,7 +20,6 @@ import ( "github.com/stretchr/testify/require" "github.com/openkcm/orbital" - "github.com/openkcm/orbital/runner/async" ) const ( @@ -421,11 +420,12 @@ func execSigningReconciliation(t *testing.T, env *testEnvironment, initiatorHand }, } - runner, err := async.New(operatorClient) - require.NoError(t, err) - - targetOperator := orbital.TargetOperator{Runner: runner, Verifier: responderHandler, Signer: responderHandler, MustCheckSignature: true} - err = createAndStartOperatorWithTarget(ctxCancel, t, targetOperator, operatorConfig) + err = createAndStartOperatorWithTarget(ctxCancel, t, orbital.TargetOperator{ + Client: operatorClient, + Verifier: responderHandler, + Signer: responderHandler, + MustCheckSignature: true, + }, operatorConfig) require.NoError(t, err) job, err := createTestJob(ctx, t, manager, "test-job", []byte("job-data")) diff --git a/operator.go b/operator.go index 60e859f..410159e 100644 --- a/operator.go +++ b/operator.go @@ -10,67 +10,92 @@ import ( ) type ( - // Runner drives a transport: it decides where task requests come from - // and where task responses go. Run is fire-and-forget: implementations - // spawn their own goroutines and return; ctx cancellation signals - // shutdown. - Runner interface { - Run(ctx context.Context, process ProcessFunc) + // TaskRequestHandler processes a single inbound TaskRequest and returns a TaskResponse. + // It performs signature verification, handler dispatch, and response signing. + TaskRequestHandler func(ctx context.Context, req TaskRequest) (TaskResponse, error) + + // Option is a function that modifies the config parameter of the Operator. + Option func(*config) error + config struct { + bufferSize int + numberOfWorkers int } - // ProcessFunc is the shared pipeline handed to a Runner. It performs - // signature verification, handler dispatch, and response signing. - // Sentinel errors (ErrSignatureInvalid, ErrResponseSigning) let sync - // transports map to transport-level failures; unknown task types stay - // in-band as a FAILED TaskResponse with a nil error. - ProcessFunc func(ctx context.Context, req TaskRequest) (TaskResponse, error) + handlerRegistry struct { + mu sync.RWMutex + r map[string]HandlerFunc + } // Operator handles task requests and responses. Operator struct { target TargetOperator handlerRegistry handlerRegistry - } - - handlerRegistry struct { - mu sync.RWMutex - r map[string]HandlerFunc + requests chan TaskRequest + numberOfWorkers int } ) var ( - ErrOperatorInvalidConfig = errors.New("invalid operator configuration") - ErrHandlerNil = errors.New("handler cannot be nil") - ErrRunnerNil = errors.New("runner cannot be nil") - ErrUnknownTaskType = errors.New("unknown task type") - - // ErrSignatureInvalid is returned by Process when signature verification - // fails. Sync transports should translate this to a transport-level - // auth failure (e.g. gRPC codes.Unauthenticated). - ErrSignatureInvalid = errors.New("task request signature invalid") - - // ErrResponseSigning is returned by Process when the response signer - // fails. Sync transports should translate this to a transport-level - // internal failure (e.g. gRPC codes.Internal). - ErrResponseSigning = errors.New("failed to sign task response") + ErrOperatorInvalidConfig = errors.New("invalid operator configuration") + ErrHandlerNil = errors.New("handler cannot be nil") + ErrBufferSizeNegative = errors.New("buffer size cannot be negative") + ErrNumberOfWorkersNotPositive = errors.New("number of workers must be greater than 0") + ErrUnknownTaskType = errors.New("unknown task type") + ErrUnsupportedResponder = errors.New("unsupported responder type") + ErrSignatureInvalid = errors.New("task request signature invalid") + ErrResponseSigning = errors.New("failed to sign task response") ) -// NewOperator creates a new Operator from a TargetOperator configuration. -// The Runner is required; a nil Runner yields ErrOperatorInvalidConfig. -// If MustCheckSignature is set, Verifier must be non-nil. -func NewOperator(target TargetOperator) (*Operator, error) { - if target.Runner == nil { - return nil, fmt.Errorf("%w: %w", ErrOperatorInvalidConfig, ErrRunnerNil) - } +// NewOperator creates a new Operator instance with the given TargetOperator and options. +func NewOperator(target TargetOperator, opts ...Option) (*Operator, error) { if target.MustCheckSignature && target.Verifier == nil { return nil, ErrOperatorInvalidConfig } + c := config{ + bufferSize: 100, + numberOfWorkers: 10, + } + + for _, opt := range opts { + err := opt(&c) + if err != nil { + return nil, err + } + } + return &Operator{ target: target, handlerRegistry: handlerRegistry{r: make(map[string]HandlerFunc)}, + requests: make(chan TaskRequest, c.bufferSize), + numberOfWorkers: c.numberOfWorkers, }, nil } +// WithBufferSize sets the buffer size for the requests channel. +// It returns an error if the size is negative. +func WithBufferSize(size int) Option { + return func(c *config) error { + if size < 0 { + return ErrBufferSizeNegative + } + c.bufferSize = size + return nil + } +} + +// WithNumberOfWorkers sets the number of workers for processing requests. +// It returns an error if the number is not positive. +func WithNumberOfWorkers(num int) Option { + return func(c *config) error { + if num <= 0 { + return ErrNumberOfWorkersNotPositive + } + c.numberOfWorkers = num + return nil + } +} + // RegisterHandler registers a handler for a specific task type. // It returns an error if the handler is nil. func (o *Operator) RegisterHandler(taskType string, h HandlerFunc) error { @@ -83,25 +108,23 @@ func (o *Operator) RegisterHandler(taskType string, h HandlerFunc) error { return nil } -// ListenAndRespond starts the configured Runner. It is fire-and-forget: the -// Runner spawns its own goroutines and this call returns. Shutdown is -// signalled via ctx cancellation. -func (o *Operator) ListenAndRespond(ctx context.Context) { - o.target.Runner.Run(ctx, o.Process) +// ListenAndRespond starts listening for task requests and responding to them. +// It blocks until ctx is cancelled or the transport exits. +func (o *Operator) ListenAndRespond(ctx context.Context) error { + switch r := o.target.Client.(type) { + case SyncResponder: + return r.Run(ctx, o.process) + case AsyncResponder: + o.startWorkers(ctx, r) + o.startListening(ctx, r) + + return ctx.Err() + default: + return ErrUnsupportedResponder + } } -// Process is the shared pipeline: signature verify → handler dispatch → -// response signing. It is exposed so that custom Runner implementations -// (sync or async) can drive the Operator directly. -// -// Return semantics: -// - A TaskResponse with Status=FAILED (unknown task type or handler-reported -// failure) is a *successful* pipeline run and is returned with nil error. -// Transports should deliver it normally. -// - A non-nil error (ErrSignatureInvalid, ErrResponseSigning) is a pipeline -// failure. Async transports typically drop; sync transports map to a -// transport-level error. -func (o *Operator) Process(ctx context.Context, req TaskRequest) (TaskResponse, error) { +func (o *Operator) process(ctx context.Context, req TaskRequest) (TaskResponse, error) { logCtx := slogctx.With(ctx, "externalId", req.ExternalID, "taskId", req.TaskID, @@ -165,3 +188,55 @@ func (o *Operator) createSignature(ctx context.Context, resp TaskResponse) (Sign } return Signature{}, nil } + +func (o *Operator) startListening(ctx context.Context, r AsyncResponder) { + for { + select { + case <-ctx.Done(): + return + default: + } + + req, err := r.ReceiveTaskRequest(ctx) + if err != nil { + slogctx.Error(ctx, "failed to receive task request", "error", err) + continue + } + + select { + case <-ctx.Done(): + return + case o.requests <- req: + } + } +} + +func (o *Operator) startWorkers(ctx context.Context, r AsyncResponder) { + for range o.numberOfWorkers { + go o.worker(ctx, r) + } +} + +func (o *Operator) worker(ctx context.Context, r AsyncResponder) { + for { + select { + case <-ctx.Done(): + return + case req := <-o.requests: + resp, err := o.process(ctx, req) + if err != nil { + slogctx.Error(ctx, "failed to process task request", "error", err, "taskId", req.TaskID, "etag", req.ETag) + continue + } + + err = r.SendTaskResponse(ctx, resp) + if err != nil { + slogctx.Error(ctx, "failed to send task response", + "error", err, "taskId", resp.TaskID, "etag", resp.ETag) + continue + } + slogctx.Debug(ctx, "sent task response", + "taskId", resp.TaskID, "etag", resp.ETag) + } + } +} diff --git a/operator_test.go b/operator_test.go index 2a7de97..a83a8da 100644 --- a/operator_test.go +++ b/operator_test.go @@ -7,53 +7,63 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/openkcm/orbital" "github.com/openkcm/orbital/respondertest" - "github.com/openkcm/orbital/runner/async" ) -func newAsyncTarget(t *testing.T, client orbital.Responder) orbital.TargetOperator { - t.Helper() - runner, err := async.New(client) - assert.NoError(t, err) - return orbital.TargetOperator{Runner: runner} -} - func TestOperator_NewOperator(t *testing.T) { - t.Run("should return error if runner is nil", func(t *testing.T) { - actResult, actErr := orbital.NewOperator(orbital.TargetOperator{}) + t.Run("should return error if MustCheckSignature is true but Verifier is nil", func(t *testing.T) { + client := respondertest.NewResponder() + actResult, actErr := orbital.NewOperator(orbital.TargetOperator{ + Client: client, + MustCheckSignature: true, + }) assert.Nil(t, actResult) assert.ErrorIs(t, actErr, orbital.ErrOperatorInvalidConfig) - assert.ErrorIs(t, actErr, orbital.ErrRunnerNil) }) - t.Run("should return error if signature checking is enabled and the verifier is nil set for the target", func(t *testing.T) { - runner, err := async.New(respondertest.NewResponder()) + t.Run("valid", func(t *testing.T) { + client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}) assert.NoError(t, err) + assert.NotNil(t, o) + }) - invalidTarget := orbital.TargetOperator{ - Runner: runner, - Verifier: nil, - MustCheckSignature: true, - } + t.Run("WithBufferSize", func(t *testing.T) { + client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}, orbital.WithBufferSize(50)) + assert.NoError(t, err) + assert.NotNil(t, o) + }) - actResult, actErr := orbital.NewOperator(invalidTarget) - assert.Nil(t, actResult) - assert.ErrorIs(t, actErr, orbital.ErrOperatorInvalidConfig) + t.Run("WithBufferSize negative", func(t *testing.T) { + client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}, orbital.WithBufferSize(-1)) + assert.Nil(t, o) + assert.ErrorIs(t, err, orbital.ErrBufferSizeNegative) }) - t.Run("valid", func(t *testing.T) { - o, err := orbital.NewOperator(newAsyncTarget(t, respondertest.NewResponder())) + t.Run("WithNumberOfWorkers", func(t *testing.T) { + client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}, orbital.WithNumberOfWorkers(5)) assert.NoError(t, err) assert.NotNil(t, o) }) + + t.Run("WithNumberOfWorkers zero", func(t *testing.T) { + client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}, orbital.WithNumberOfWorkers(0)) + assert.Nil(t, o) + assert.ErrorIs(t, err, orbital.ErrNumberOfWorkersNotPositive) + }) } func TestRegisterHandler(t *testing.T) { - o, err := orbital.NewOperator(newAsyncTarget(t, respondertest.NewResponder())) - assert.NoError(t, err) - assert.NotNil(t, o) + client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}) + require.NoError(t, err) h := func(_ context.Context, _ orbital.HandlerRequest, _ *orbital.HandlerResponse) {} @@ -93,12 +103,12 @@ func TestRegisterHandler(t *testing.T) { func TestListenAndRespond_UnknownTaskType(t *testing.T) { client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}) + require.NoError(t, err) - o, err := orbital.NewOperator(newAsyncTarget(t, client)) - assert.NoError(t, err) - assert.NotNil(t, o) - - o.ListenAndRespond(t.Context()) + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + go func() { assert.ErrorIs(t, o.ListenAndRespond(ctx), context.Canceled) }() taskType := "unknown-task-type" @@ -149,12 +159,12 @@ func TestListenAndRespond(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}) + require.NoError(t, err) - o, err := orbital.NewOperator(newAsyncTarget(t, client)) - assert.NoError(t, err) - assert.NotNil(t, o) - - o.ListenAndRespond(t.Context()) + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + go func() { assert.ErrorIs(t, o.ListenAndRespond(ctx), context.Canceled) }() h := func(_ context.Context, req orbital.HandlerRequest, resp *orbital.HandlerResponse) { assert.Equal(t, taskReq.TaskID, req.TaskID) @@ -195,12 +205,12 @@ func TestListenAndRespond(t *testing.T) { func TestListenAndRespond_WorkingState(t *testing.T) { client := respondertest.NewResponder() + o, err := orbital.NewOperator(orbital.TargetOperator{Client: client}) + require.NoError(t, err) - o, err := orbital.NewOperator(newAsyncTarget(t, client)) - assert.NoError(t, err) - assert.NotNil(t, o) - - o.ListenAndRespond(t.Context()) + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + go func() { assert.ErrorIs(t, o.ListenAndRespond(ctx), context.Canceled) }() tests := []struct { name string @@ -317,20 +327,30 @@ func TestListenAndRespond_WorkingState(t *testing.T) { } } +func TestListenAndRespond_UnsupportedResponder(t *testing.T) { + o, err := orbital.NewOperator(orbital.TargetOperator{Client: &unsupportedResponder{}}) + require.NoError(t, err) + + err = o.ListenAndRespond(t.Context()) + assert.ErrorIs(t, err, orbital.ErrUnsupportedResponder) +} + +type unsupportedResponder struct{} + type mockResponder struct { FnReceiveTaskRequest func(ctx context.Context) (orbital.TaskRequest, error) FnSendTaskResponse func(ctx context.Context, response orbital.TaskResponse) error FnClose func(ctx context.Context) error } -var _ orbital.Responder = &mockResponder{} +var _ orbital.AsyncResponder = &mockResponder{} -// ReceiveTaskRequest implements orbital.Responder. +// ReceiveTaskRequest implements orbital.AsyncResponder. func (m *mockResponder) ReceiveTaskRequest(ctx context.Context) (orbital.TaskRequest, error) { return m.FnReceiveTaskRequest(ctx) } -// SendTaskResponse implements orbital.Responder. +// SendTaskResponse implements orbital.AsyncResponder. func (m *mockResponder) SendTaskResponse(ctx context.Context, response orbital.TaskResponse) error { return m.FnSendTaskResponse(ctx, response) } diff --git a/regression/regression_test.go b/regression/regression_test.go index 6ee0e3e..6f1aa06 100644 --- a/regression/regression_test.go +++ b/regression/regression_test.go @@ -48,7 +48,7 @@ func TestNewRegression(t *testing.T) { op, err := NewOperatorTracker(ctxCancel, env, operatorName) require.NoError(t, err) optTrackers = append(optTrackers, op) - go op.operator.ListenAndRespond(ctxCancel) + go func() { assert.ErrorIs(t, op.operator.ListenAndRespond(ctxCancel), context.Canceled) }() } defer func() { diff --git a/regression/tracker_test.go b/regression/tracker_test.go index 33b080c..2b22632 100644 --- a/regression/tracker_test.go +++ b/regression/tracker_test.go @@ -13,7 +13,6 @@ import ( "github.com/openkcm/orbital" "github.com/openkcm/orbital/client/amqp" - "github.com/openkcm/orbital/runner/async" ) // ManagerTracker tracks the state and statistics of a Manager instance @@ -175,12 +174,8 @@ func NewOperatorTracker(ctx context.Context, env *testEnvironment, name string) } tracker.client = client - runner, err := async.New(client) - if err != nil { - return tracker, fmt.Errorf("failed to create async runner: %w", err) - } - operator, err := orbital.NewOperator(orbital.TargetOperator{Runner: runner}) + operator, err := orbital.NewOperator(orbital.TargetOperator{Client: client}) if err != nil { return tracker, fmt.Errorf("failed to create operator: %w", err) } diff --git a/respondertest/respondertest.go b/respondertest/respondertest.go index 488dccb..45ea1f8 100644 --- a/respondertest/respondertest.go +++ b/respondertest/respondertest.go @@ -21,7 +21,7 @@ type ( } ) -var _ orbital.Responder = &Responder{} +var _ orbital.AsyncResponder = &Responder{} // NewResponder creates a new Responder instance. // It allows options to configure the buffer size of the input and output channel. diff --git a/runner/async/runner.go b/runner/async/runner.go deleted file mode 100644 index 735f6aa..0000000 --- a/runner/async/runner.go +++ /dev/null @@ -1,136 +0,0 @@ -package async - -import ( - "context" - "errors" - - slogctx "github.com/veqryn/slog-context" - - "github.com/openkcm/orbital" -) - -// Runner implements orbital.Runner on top of a half-duplex Responder -// (e.g. AMQP). It pulls requests from the Responder, fans out to a bounded -// worker pool, and pushes responses back on the same Responder. -type Runner struct { - client orbital.Responder - requests chan orbital.TaskRequest - numberOfWorkers int -} - -type ( - Option func(*config) error - config struct { - bufferSize int - numberOfWorkers int - } -) - -var ( - ErrBufferSizeNegative = errors.New("buffer size cannot be negative") - ErrNumberOfWorkersNotPositive = errors.New("number of workers must be greater than 0") - ErrResponderNil = errors.New("responder cannot be nil") -) - -// New creates a Runner backed by the given Responder. -// Defaults: buffer size 100, 10 workers. -func New(client orbital.Responder, opts ...Option) (*Runner, error) { - if client == nil { - return nil, ErrResponderNil - } - - c := config{ - bufferSize: 100, - numberOfWorkers: 10, - } - for _, opt := range opts { - if err := opt(&c); err != nil { - return nil, err - } - } - - return &Runner{ - client: client, - requests: make(chan orbital.TaskRequest, c.bufferSize), - numberOfWorkers: c.numberOfWorkers, - }, nil -} - -// WithBufferSize sets the buffer size for the requests channel. -// It returns an error if the size is negative. -func WithBufferSize(size int) Option { - return func(c *config) error { - if size < 0 { - return ErrBufferSizeNegative - } - c.bufferSize = size - return nil - } -} - -// WithNumberOfWorkers sets the number of workers for processing requests. -// It returns an error if the number is not positive. -func WithNumberOfWorkers(num int) Option { - return func(c *config) error { - if num <= 0 { - return ErrNumberOfWorkersNotPositive - } - c.numberOfWorkers = num - return nil - } -} - -// Run spawns one listener goroutine and numberOfWorkers worker goroutines, -// then returns. Cancellation of ctx stops all of them. -func (r *Runner) Run(ctx context.Context, process orbital.ProcessFunc) { - go r.startListening(ctx) - for range r.numberOfWorkers { - go r.worker(ctx, process) - } -} - -func (r *Runner) startListening(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - } - - req, err := r.client.ReceiveTaskRequest(ctx) - if err != nil { - slogctx.Error(ctx, "failed to receive task request", "error", err) - continue - } - - select { - case <-ctx.Done(): - return - case r.requests <- req: - } - } -} - -func (r *Runner) worker(ctx context.Context, process orbital.ProcessFunc) { - for { - select { - case <-ctx.Done(): - return - case req := <-r.requests: - resp, err := process(ctx, req) - if err != nil { - slogctx.Error(ctx, "failed to process task request", "error", err, "taskId", req.TaskID, "etag", req.ETag) - continue - } - - err = r.client.SendTaskResponse(ctx, resp) - if err != nil { - slogctx.Error(ctx, "failed to send task response", - "error", err, "taskId", resp.TaskID, "etag", resp.ETag) - continue - } - slogctx.Debug(ctx, "sent task response", - "taskId", resp.TaskID, "etag", resp.ETag) - } - } -} diff --git a/runner/async/runner_test.go b/runner/async/runner_test.go deleted file mode 100644 index 8d0ad9b..0000000 --- a/runner/async/runner_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package async_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/openkcm/orbital/respondertest" - "github.com/openkcm/orbital/runner/async" -) - -func TestNew(t *testing.T) { - client := respondertest.NewResponder() - - tests := []struct { - name string - opts []async.Option - expErr error - }{ - { - name: "negative buffer size", - opts: []async.Option{async.WithBufferSize(-1)}, - expErr: async.ErrBufferSizeNegative, - }, - { - name: "zero number of workers", - opts: []async.Option{async.WithNumberOfWorkers(0)}, - expErr: async.ErrNumberOfWorkersNotPositive, - }, - { - name: "without options", - opts: []async.Option{}, - }, - { - name: "with options", - opts: []async.Option{ - async.WithBufferSize(0), - async.WithNumberOfWorkers(1), - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r, err := async.New(client, tt.opts...) - if tt.expErr != nil { - assert.ErrorIs(t, err, tt.expErr) - return - } - assert.NoError(t, err) - assert.NotNil(t, r) - }) - } -} - -func TestNew_NilResponder(t *testing.T) { - r, err := async.New(nil) - assert.Nil(t, r) - assert.ErrorIs(t, err, async.ErrResponderNil) -} diff --git a/signing_test.go b/signing_test.go index 2c6dd47..f32ccb0 100644 --- a/signing_test.go +++ b/signing_test.go @@ -13,7 +13,6 @@ import ( "github.com/openkcm/orbital" "github.com/openkcm/orbital/respondertest" - "github.com/openkcm/orbital/runner/async" ) func TestManager_Signing(t *testing.T) { @@ -366,15 +365,16 @@ func TestOperator_Signing(t *testing.T) { return nil } - runner, err := async.New(client) - require.NoError(t, err) - - o, err := orbital.NewOperator(orbital.TargetOperator{Runner: runner, Verifier: mockVerifier, Signer: tt.respSigner}) + o, err := orbital.NewOperator(orbital.TargetOperator{ + Client: client, + Verifier: mockVerifier, + Signer: tt.respSigner, + }) assert.NoError(t, err) assert.NotNil(t, o) ctx := t.Context() - o.ListenAndRespond(ctx) + go func() { assert.ErrorIs(t, o.ListenAndRespond(ctx), context.Canceled) }() h := func(_ context.Context, req orbital.HandlerRequest, resp *orbital.HandlerResponse) { assert.Equal(t, taskReq.TaskID, req.TaskID) @@ -488,11 +488,9 @@ func TestOperator_Verification(t *testing.T) { actVerifyTaskRequestCalls.Store(0) client := respondertest.NewResponder() - runner, err := async.New(client) - require.NoError(t, err) o, err := orbital.NewOperator(orbital.TargetOperator{ - Runner: runner, + Client: client, Signer: respSigner, Verifier: tt.reqVerifier, MustCheckSignature: tt.mustCheckSignature, @@ -503,11 +501,10 @@ func TestOperator_Verification(t *testing.T) { assert.Nil(t, o) return } - assert.NoError(t, err) - assert.NotNil(t, o) + require.NoError(t, err) ctx := t.Context() - o.ListenAndRespond(ctx) + go func() { assert.ErrorIs(t, o.ListenAndRespond(ctx), context.Canceled) }() var actHandlerCalls atomic.Int32 actHandlerCallChan := make(chan struct{}) diff --git a/target.go b/target.go index 3799c46..4891229 100644 --- a/target.go +++ b/target.go @@ -55,23 +55,35 @@ type Initiator interface { Close(ctx context.Context) error } -// TargetOperator holds the runner and cryptographic implementation for responding +// Responder is a marker interface satisfied by any operator transport. +// Use a type switch to AsyncResponder or SyncResponder to access transport methods. +type Responder = any + +// AsyncResponder is a half-duplex messaging transport (e.g. AMQP, Solace). +// The operator pulls requests and pushes responses via a worker pool. +type AsyncResponder interface { + ReceiveTaskRequest(ctx context.Context) (TaskRequest, error) + SendTaskResponse(ctx context.Context, response TaskResponse) error + Close(ctx context.Context) error +} + +// SyncResponder is a request/response transport (e.g. gRPC). +// Run blocks, calling handler for each inbound request. +type SyncResponder interface { + Run(ctx context.Context, handler TaskRequestHandler) error + Close(ctx context.Context) error +} + +// TargetOperator holds the client and cryptographic implementation for responding // to tasks. It provides access to the Responder for communication, // Signer and Verifier for signing and verification operations. type TargetOperator struct { - Runner Runner + Client Responder Verifier TaskRequestVerifier Signer TaskResponseSigner MustCheckSignature bool } -// Responder defines the methods for receiving task requests and sending task responses. -type Responder interface { - ReceiveTaskRequest(ctx context.Context) (TaskRequest, error) - SendTaskResponse(ctx context.Context, response TaskResponse) error - Close(ctx context.Context) error -} - // Signature represents the metadata used for signing and verifying requests. // It typically contains cryptographic information such as signatures or tokens. type Signature MetaData