diff --git a/config/config.go b/config/config.go index 5ac34ad9d27..88b100d749a 100644 --- a/config/config.go +++ b/config/config.go @@ -71,6 +71,12 @@ type Config struct { // Revision contains the current revision of the config. Revision string + // Initial represents whether this is an "initial" config passed in by web + // server entrypoint code. If true, the robot will continue to report a state + // of initializing after applying this config. If false, the robot will + // report a state of running after applying this config. + Initial bool + // toCache stores the JSON marshalled version of the config to be cached. It should be a copy of // the config pulled from cloud with minor changes. // This version is kept because the config is changed as it moves through the system. @@ -101,6 +107,7 @@ type configData struct { LogConfig []logging.LoggerPatternConfig `json:"log,omitempty"` Revision string `json:"revision,omitempty"` MaintenanceConfig *MaintenanceConfig `json:"maintenance,omitempty"` + PackagePath string `json:"package_path,omitempty"` } // AppValidationStatus refers to the. @@ -308,6 +315,7 @@ func (c *Config) UnmarshalJSON(data []byte) error { c.LogConfig = conf.LogConfig c.Revision = conf.Revision c.MaintenanceConfig = conf.MaintenanceConfig + c.PackagePath = conf.PackagePath return nil } @@ -340,6 +348,7 @@ func (c Config) MarshalJSON() ([]byte, error) { LogConfig: c.LogConfig, Revision: c.Revision, MaintenanceConfig: c.MaintenanceConfig, + PackagePath: c.PackagePath, }) } diff --git a/robot/client/client.go b/robot/client/client.go index 32a641052a2..a3967f954bc 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -60,6 +60,12 @@ var ( // defaultResourcesTimeout is the default timeout for getting resources. defaultResourcesTimeout = 5 * time.Second + + // DoNotWaitForRunning should be set only in tests to allow connecting to + // still-initializing machines. Note that robot clients in production (not in + // a testing environment) will already allow connecting to still-initializing + // machines. + DoNotWaitForRunning = atomic.Bool{} ) // RobotClient satisfies the robot.Robot interface through a gRPC based @@ -288,6 +294,41 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible return nil, err } + // If running in a testing environment, wait for machine to report a state of + // running. We often establish connections in tests and expect resources to + // be immediately available once the web service has started; resources will + // not be available when the machine is still initializing. + // + // It is expected that golang SDK users will handle lack of resource + // availability due to the machine being in an initializing state themselves. + // + // Allow this behavior to be turned off in some tests that specifically want + // to examine the behavior of a machine in an initializing state through the + // use of a global variable. + if testing.Testing() && !DoNotWaitForRunning.Load() { + for { + if ctx.Err() != nil { + return nil, multierr.Combine(ctx.Err(), rc.conn.Close()) + } + + mStatus, err := rc.MachineStatus(ctx) + if err != nil { + // Allow for MachineStatus to not be injected/implemented in some tests. + if status.Code(err) == codes.Unimplemented { + break + } + // Ignore error from Close and just return original machine status error. + utils.UncheckedError(rc.conn.Close()) + return nil, err + } + + if mStatus.State == robot.StateRunning { + break + } + time.Sleep(50 * time.Millisecond) + } + } + // refresh once to hydrate the robot. if err := rc.Refresh(ctx); err != nil { return nil, multierr.Combine(err, rc.conn.Close()) @@ -1115,6 +1156,16 @@ func (rc *RobotClient) MachineStatus(ctx context.Context) (robot.MachineStatus, mStatus.Resources = append(mStatus.Resources, resStatus) } + switch resp.State { + case pb.GetMachineStatusResponse_STATE_UNSPECIFIED: + rc.logger.CError(ctx, "received unspecified machine state") + mStatus.State = robot.StateUnknown + case pb.GetMachineStatusResponse_STATE_INITIALIZING: + mStatus.State = robot.StateInitializing + case pb.GetMachineStatusResponse_STATE_RUNNING: + mStatus.State = robot.StateRunning + } + return mStatus, nil } diff --git a/robot/client/client_session_test.go b/robot/client/client_session_test.go index 8475f2201da..abae64269df 100644 --- a/robot/client/client_session_test.go +++ b/robot/client/client_session_test.go @@ -107,8 +107,11 @@ func TestClientSessionOptions(t *testing.T) { return &dummyEcho{Named: arbName.AsNamed()}, nil }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, - LoggerFunc: func() logging.Logger { return logger }, - SessMgr: sessMgr, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, + LoggerFunc: func() logging.Logger { return logger }, + SessMgr: sessMgr, } svc := web.New(injectRobot, logger) @@ -129,13 +132,11 @@ func TestClientSessionOptions(t *testing.T) { Disable: true, }))) } - roboClient, err := client.New(ctx, addr, logger, opts...) - test.That(t, err, test.ShouldBeNil) injectRobot.Mu.Lock() injectRobot.MachineStatusFunc = func(ctx context.Context) (robot.MachineStatus, error) { session.SafetyMonitorResourceName(ctx, someTargetName1) - return robot.MachineStatus{}, nil + return robot.MachineStatus{State: robot.StateRunning}, nil } injectRobot.Mu.Unlock() @@ -179,6 +180,8 @@ func TestClientSessionOptions(t *testing.T) { } sessMgr.mu.Unlock() + roboClient, err := client.New(ctx, addr, logger, opts...) + test.That(t, err, test.ShouldBeNil) resp, err := roboClient.MachineStatus(nextCtx) test.That(t, err, test.ShouldBeNil) test.That(t, resp, test.ShouldNotBeNil) @@ -289,6 +292,9 @@ func TestClientSessionExpiration(t *testing.T) { ResourceByNameFunc: func(name resource.Name) (resource.Resource, error) { return &dummyEcho1, nil }, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, LoggerFunc: func() logging.Logger { return logger }, SessMgr: sessMgr, @@ -306,8 +312,6 @@ func TestClientSessionExpiration(t *testing.T) { Disable: true, }))) } - roboClient, err := client.New(ctx, addr, logger, opts...) - test.That(t, err, test.ShouldBeNil) injectRobot.Mu.Lock() var capSessID uuid.UUID @@ -317,7 +321,7 @@ func TestClientSessionExpiration(t *testing.T) { panic("expected session") } capSessID = sess.ID() - return robot.MachineStatus{}, nil + return robot.MachineStatus{State: robot.StateRunning}, nil } injectRobot.Mu.Unlock() @@ -372,6 +376,8 @@ func TestClientSessionExpiration(t *testing.T) { } sessMgr.mu.Unlock() + roboClient, err := client.New(ctx, addr, logger, opts...) + test.That(t, err, test.ShouldBeNil) resp, err := roboClient.MachineStatus(nextCtx) test.That(t, err, test.ShouldBeNil) test.That(t, resp, test.ShouldNotBeNil) @@ -480,8 +486,11 @@ func TestClientSessionResume(t *testing.T) { injectRobot := &inject.Robot{ ResourceNamesFunc: func() []resource.Name { return []resource.Name{} }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, - LoggerFunc: func() logging.Logger { return logger }, - SessMgr: sessMgr, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, + LoggerFunc: func() logging.Logger { return logger }, + SessMgr: sessMgr, } svc := web.New(injectRobot, logger) @@ -496,8 +505,6 @@ func TestClientSessionResume(t *testing.T) { Disable: true, }))) } - roboClient, err := client.New(ctx, addr, logger, opts...) - test.That(t, err, test.ShouldBeNil) var capMu sync.Mutex var startCalled int @@ -536,10 +543,12 @@ func TestClientSessionResume(t *testing.T) { panic("expected session") } capSessID = sess.ID() - return robot.MachineStatus{}, nil + return robot.MachineStatus{State: robot.StateRunning}, nil } injectRobot.Mu.Unlock() + roboClient, err := client.New(ctx, addr, logger, opts...) + test.That(t, err, test.ShouldBeNil) resp, err := roboClient.MachineStatus(nextCtx) test.That(t, err, test.ShouldBeNil) test.That(t, resp, test.ShouldNotBeNil) diff --git a/robot/client/client_test.go b/robot/client/client_test.go index e9a850e7549..5258dcb2e7c 100644 --- a/robot/client/client_test.go +++ b/robot/client/client_test.go @@ -108,6 +108,12 @@ func (ms *mockRPCSubtypesUnimplemented) ResourceNames( return ms.ResourceNamesFunc(req) } +func (ms *mockRPCSubtypesUnimplemented) GetMachineStatus( + ctx context.Context, req *pb.GetMachineStatusRequest, +) (*pb.GetMachineStatusResponse, error) { + return &pb.GetMachineStatusResponse{State: pb.GetMachineStatusResponse_STATE_RUNNING}, nil +} + type mockRPCSubtypesImplemented struct { mockRPCSubtypesUnimplemented ResourceNamesFunc func(*pb.ResourceNamesRequest) (*pb.ResourceNamesResponse, error) @@ -125,6 +131,12 @@ func (ms *mockRPCSubtypesImplemented) ResourceNames( return ms.ResourceNamesFunc(req) } +func (ms *mockRPCSubtypesImplemented) GetMachineStatus( + ctx context.Context, req *pb.GetMachineStatusRequest, +) (*pb.GetMachineStatusResponse, error) { + return &pb.GetMachineStatusResponse{State: pb.GetMachineStatusResponse_STATE_RUNNING}, nil +} + var resourceFunc1 = func(*pb.ResourceNamesRequest) (*pb.ResourceNamesResponse, error) { board1 := board.Named("board1") rNames := []*commonpb.ResourceName{ @@ -309,11 +321,17 @@ func TestStatusClient(t *testing.T) { FrameSystemConfigFunc: frameSystemConfigFunc, ResourceNamesFunc: resourcesFunc, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } injectRobot2 := &inject.Robot{ FrameSystemConfigFunc: frameSystemConfigFunc, ResourceNamesFunc: resourcesFunc, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } pb.RegisterRobotServiceServer(gServer1, server.New(injectRobot1)) pb.RegisterRobotServiceServer(gServer2, server.New(injectRobot2)) @@ -669,6 +687,9 @@ func TestClientRefresh(t *testing.T) { callCountNames++ return emptyResources } + injectRobot.MachineStatusFunc = func(context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + } mu.Unlock() start := time.Now() @@ -806,6 +827,9 @@ func TestClientDisconnect(t *testing.T) { injectRobot.ResourceNamesFunc = func() []resource.Name { return []resource.Name{arm.Named("arm1")} } + injectRobot.MachineStatusFunc = func(context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + } // TODO(RSDK-882): will update this so that this is not necessary injectRobot.FrameSystemConfigFunc = func(ctx context.Context) (*framesystem.Config, error) { @@ -859,21 +883,24 @@ func TestClientUnaryDisconnectHandler(t *testing.T) { info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { + // Allow a single GetMachineStatus through; return `io.ErrClosedPipe` + // after that. if strings.HasSuffix(info.FullMethod, "RobotService/GetMachineStatus") { if unaryStatusCallReceived { return nil, status.Error(codes.Unknown, io.ErrClosedPipe.Error()) } unaryStatusCallReceived = true } - var resp interface{} - return resp, nil + return handler(ctx, req) }, ) gServer := grpc.NewServer(justOneUnaryStatusCall) injectRobot := &inject.Robot{} + injectRobot.ResourceRPCAPIsFunc = func() []resource.RPCAPI { return nil } + injectRobot.ResourceNamesFunc = func() []resource.Name { return nil } injectRobot.MachineStatusFunc = func(ctx context.Context) (robot.MachineStatus, error) { - return robot.MachineStatus{}, nil + return robot.MachineStatus{State: robot.StateRunning}, nil } pb.RegisterRobotServiceServer(gServer, server.New(injectRobot)) @@ -888,10 +915,11 @@ func TestClientUnaryDisconnectHandler(t *testing.T) { WithReconnectEvery(never), ) test.That(t, err, test.ShouldBeNil) + // Reset unaryStatusCallReceived to false, as `New` call above set it to + // true. + unaryStatusCallReceived = false t.Run("unary call to connected remote", func(t *testing.T) { - t.Helper() - client.connected.Store(false) _, err = client.MachineStatus(context.Background()) test.That(t, status.Code(err), test.ShouldEqual, codes.Unavailable) @@ -901,8 +929,6 @@ func TestClientUnaryDisconnectHandler(t *testing.T) { }) t.Run("unary call to disconnected remote", func(t *testing.T) { - t.Helper() - _, err = client.MachineStatus(context.Background()) test.That(t, err, test.ShouldBeNil) test.That(t, unaryStatusCallReceived, test.ShouldBeTrue) @@ -947,7 +973,7 @@ func TestClientStreamDisconnectHandler(t *testing.T) { injectRobot.ResourceRPCAPIsFunc = func() []resource.RPCAPI { return nil } injectRobot.ResourceNamesFunc = func() []resource.Name { return nil } injectRobot.MachineStatusFunc = func(ctx context.Context) (robot.MachineStatus, error) { - return robot.MachineStatus{}, nil + return robot.MachineStatus{State: robot.StateRunning}, nil } pb.RegisterRobotServiceServer(gServer, server.New(injectRobot)) @@ -1041,7 +1067,9 @@ func TestClientReconnect(t *testing.T) { injectRobot.ResourceNamesFunc = func() []resource.Name { return []resource.Name{arm.Named("arm1"), thing1Name} } - + injectRobot.MachineStatusFunc = func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + } // TODO(RSDK-882): will update this so that this is not necessary injectRobot.FrameSystemConfigFunc = func(ctx context.Context) (*framesystem.Config, error) { return &framesystem.Config{}, nil @@ -1137,6 +1165,9 @@ func TestClientRefreshNoReconfigure(t *testing.T) { pb.RegisterRobotServiceServer(gServer, server.New(injectRobot)) injectRobot.ResourceRPCAPIsFunc = func() []resource.RPCAPI { return nil } thing1Name := resource.NewName(someAPI, "thing1") + injectRobot.MachineStatusFunc = func(context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + } var callCount int calledEnough := make(chan struct{}) @@ -1227,6 +1258,9 @@ func TestClientResources(t *testing.T) { injectRobot.ResourceRPCAPIsFunc = func() []resource.RPCAPI { return respWith } injectRobot.ResourceNamesFunc = func() []resource.Name { return finalResources } + injectRobot.MachineStatusFunc = func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + } gServer := grpc.NewServer() pb.RegisterRobotServiceServer(gServer, server.New(injectRobot)) @@ -1283,6 +1317,9 @@ func TestClientDiscovery(t *testing.T) { injectRobot.ResourceNamesFunc = func() []resource.Name { return finalResources } + injectRobot.MachineStatusFunc = func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + } q := resource.DiscoveryQuery{ API: movementsensor.Named("foo").API, Model: resource.DefaultModelFamily.WithModel("bar"), @@ -1359,10 +1396,17 @@ func TestClientConfig(t *testing.T) { workingRobot := &inject.Robot{ ResourceNamesFunc: resourcesFunc, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } + failingRobot := &inject.Robot{ ResourceNamesFunc: resourcesFunc, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } o1 := &spatialmath.R4AA{Theta: math.Pi / 2, RZ: 1} @@ -1506,6 +1550,9 @@ func TestForeignResource(t *testing.T) { injectRobot.ResourceRPCAPIsFunc = func() []resource.RPCAPI { return respWith } injectRobot.ResourceNamesFunc = func() []resource.Name { return respWithResources } + injectRobot.MachineStatusFunc = func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + } // TODO(RSDK-882): will update this so that this is not necessary injectRobot.FrameSystemConfigFunc = func(ctx context.Context) (*framesystem.Config, error) { return &framesystem.Config{}, nil @@ -1555,6 +1602,9 @@ func TestNewRobotClientRefresh(t *testing.T) { callCount++ return emptyResources } + injectRobot.MachineStatusFunc = func(context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + } pb.RegisterRobotServiceServer(gServer, server.New(injectRobot)) @@ -1608,6 +1658,9 @@ func TestClientStopAll(t *testing.T) { injectRobot1 := &inject.Robot{ ResourceNamesFunc: resourcesFunc, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, StopAllFunc: func(ctx context.Context, extra map[resource.Name]map[string]interface{}) error { stopAllCalled = true return nil @@ -1638,6 +1691,9 @@ func TestRemoteClientMatch(t *testing.T) { injectRobot1 := &inject.Robot{ ResourceNamesFunc: func() []resource.Name { return validResources }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } // TODO(RSDK-882): will update this so that this is not necessary @@ -1688,6 +1744,9 @@ func TestRemoteClientDuplicate(t *testing.T) { injectRobot1 := &inject.Robot{ ResourceNamesFunc: func() []resource.Name { return validResources }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } pb.RegisterRobotServiceServer(gServer1, server.New(injectRobot1)) @@ -1731,6 +1790,9 @@ func TestClientOperationIntercept(t *testing.T) { injectRobot := &inject.Robot{ ResourceNamesFunc: func() []resource.Name { return []resource.Name{} }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(_ context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } gServer := grpc.NewServer() @@ -1756,7 +1818,7 @@ func TestClientOperationIntercept(t *testing.T) { receivedOpID, err := operation.GetOrCreateFromMetadata(meta) test.That(t, err, test.ShouldBeNil) test.That(t, receivedOpID.String(), test.ShouldEqual, fakeOp.ID.String()) - return robot.MachineStatus{}, nil + return robot.MachineStatus{State: robot.StateRunning}, nil } resp, err := client.MachineStatus(ctx) @@ -1775,6 +1837,9 @@ func TestGetUnknownResource(t *testing.T) { injectRobot := &inject.Robot{ ResourceNamesFunc: func() []resource.Name { return []resource.Name{arm.Named("myArm")} }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } // TODO(RSDK-882): will update this so that this is not necessary @@ -1821,15 +1886,15 @@ func TestLoggingInterceptor(t *testing.T) { MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { // If there is no debug information with the context, return no revision if !logging.IsDebugMode(ctx) && logging.GetName(ctx) == "" { - return robot.MachineStatus{}, nil + return robot.MachineStatus{State: robot.StateRunning}, nil } // If there is debug information with `oliver` with the context, return a revision of `oliver` if logging.IsDebugMode(ctx) && logging.GetName(ctx) == "oliver" { - return robot.MachineStatus{Config: config.Revision{Revision: "oliver"}}, nil + return robot.MachineStatus{Config: config.Revision{Revision: "oliver"}, State: robot.StateRunning}, nil } - return robot.MachineStatus{}, errors.New("shouldn't happen") + return robot.MachineStatus{State: robot.StateRunning}, errors.New("shouldn't happen") }, } pb.RegisterRobotServiceServer(gServer, server.New(injectRobot)) @@ -1872,6 +1937,9 @@ func TestCloudMetadata(t *testing.T) { CloudMetadataFunc: func(ctx context.Context) (cloud.Metadata, error) { return injectCloudMD, nil }, + MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } // TODO(RSDK-882): will update this so that this is not necessary injectRobot.FrameSystemConfigFunc = func(ctx context.Context) (*framesystem.Config, error) { @@ -1906,6 +1974,9 @@ func TestShutDown(t *testing.T) { shutdownCalled = true return nil }, + MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } gServer := grpc.NewServer() @@ -1942,6 +2013,9 @@ func TestUnregisteredResourceByName(t *testing.T) { injectRobot := &inject.Robot{ ResourceNamesFunc: func() []resource.Name { return resourceList }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } gServer := grpc.NewServer() @@ -1975,6 +2049,7 @@ func TestMachineStatus(t *testing.T) { robot.MachineStatus{ Config: config.Revision{Revision: "rev1"}, Resources: []resource.Status{}, + State: robot.StateRunning, }, 0, }, @@ -1990,8 +2065,9 @@ func TestMachineStatus(t *testing.T) { }, }, }, + State: robot.StateRunning, }, - 1, + 2, // once for client.New call and once for MachineStatus call }, { "resource with valid status", @@ -2006,6 +2082,7 @@ func TestMachineStatus(t *testing.T) { }, }, }, + State: robot.StateRunning, }, 0, }, @@ -2034,8 +2111,9 @@ func TestMachineStatus(t *testing.T) { }, }, }, + State: robot.StateRunning, }, - 2, + 4, // twice for client.New call and twice for MachineStatus call }, { "unhealthy status", @@ -2051,6 +2129,7 @@ func TestMachineStatus(t *testing.T) { }, }, }, + State: robot.StateRunning, }, 0, }, @@ -2073,6 +2152,7 @@ func TestMachineStatus(t *testing.T) { }, }, }, + State: robot.StateRunning, }, 0, }, @@ -2126,6 +2206,9 @@ func TestVersion(t *testing.T) { injectRobot := &inject.Robot{ ResourceNamesFunc: func() []resource.Name { return nil }, ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, } pb.RegisterRobotServiceServer(gServer, server.New(injectRobot)) diff --git a/robot/impl/local_robot.go b/robot/impl/local_robot.go index 5189618a5fe..2703a7ae871 100644 --- a/robot/impl/local_robot.go +++ b/robot/impl/local_robot.go @@ -94,6 +94,11 @@ type localRobot struct { // whether the robot is actively reconfiguring reconfiguring atomic.Bool + + // whether the robot is still initializing. this value controls what state will be + // returned by the MachineStatus endpoint (initializing if true, running if false.) + // configured based on the `Initial` value of applied `config.Config`s. + initializing atomic.Bool } // ExportResourcesAsDot exports the resource graph as a DOT representation for @@ -395,6 +400,7 @@ func newWithResources( localModuleVersions: make(map[string]semver.Version), ftdc: ftdcWorker, } + r.mostRecentCfg.Store(config.Config{}) var heartbeatWindow time.Duration if cfg.Network.Sessions.HeartbeatWindow == 0 { @@ -551,20 +557,6 @@ func (r *localRobot) removeOrphanedResources(ctx context.Context, r.updateWeakDependents(ctx) } -// resourceHasWeakDependencies will return whether a given resource has weak dependencies. -// Internal services that depend on other resources are also included in the check. -func (r *localRobot) resourceHasWeakDependencies(rName resource.Name, node *resource.GraphNode) bool { - if len(r.getWeakDependencyMatchers(node.Config().API, node.Config().Model)) > 0 { - return true - } - - // also return true for internal services that depends on other resources (web, framesystem). - if rName == web.InternalServiceName || rName == framesystem.InternalServiceName { - return true - } - return false -} - // getDependencies derives a collection of dependencies from a robot for a given // component's name. We don't use the resource manager for this information since // it is not be constructed at this point. @@ -1113,6 +1105,9 @@ func (r *localRobot) Reconfigure(ctx context.Context, newConfig *config.Config) r.reconfigurationLock.Lock() defer r.reconfigurationLock.Unlock() r.reconfigure(ctx, newConfig, false) + + // Set initializing value based on `newConfig.Initial`. + r.initializing.Store(newConfig.Initial) } // set Module.LocalVersion on Type=local modules. Call this before localPackages.Sync and in RestartModule. @@ -1305,12 +1300,20 @@ func (r *localRobot) reconfigure(ctx context.Context, newConfig *config.Config, allErrs = multierr.Combine(allErrs, err) } - // Cleanup unused packages after all old resources have been closed above. This ensures - // processes are shutdown before any files are deleted they are using. - allErrs = multierr.Combine(allErrs, r.packageManager.Cleanup(ctx)) - allErrs = multierr.Combine(allErrs, r.localPackages.Cleanup(ctx)) - // Cleanup extra dirs from previous modules or rogue scripts. - allErrs = multierr.Combine(allErrs, r.manager.moduleManager.CleanModuleDataDirectory()) + // If new config is not marked as initial, cleanup unused packages after all + // old resources have been closed above. This ensures processes are shutdown + // before any files are deleted they are using. + // + // If new config IS marked as initial, machine will be starting with no + // modules but may immediately reconfigure to start modules that have + // already been downloaded. Do not cleanup packages/module dirs in that case. + if !newConfig.Initial { + allErrs = multierr.Combine(allErrs, r.packageManager.Cleanup(ctx)) + allErrs = multierr.Combine(allErrs, r.localPackages.Cleanup(ctx)) + + // Cleanup extra dirs from previous modules or rogue scripts. + allErrs = multierr.Combine(allErrs, r.manager.moduleManager.CleanModuleDataDirectory()) + } if allErrs != nil { r.logger.CErrorw(ctx, "The following errors were gathered during reconfiguration", "errors", allErrs) @@ -1449,6 +1452,10 @@ func (r *localRobot) MachineStatus(ctx context.Context) (robot.MachineStatus, er result.Config = r.configRevision r.configRevisionMu.RUnlock() + result.State = robot.StateRunning + if r.initializing.Load() { + result.State = robot.StateInitializing + } return result, nil } diff --git a/robot/impl/resource_manager.go b/robot/impl/resource_manager.go index 0c832ae405f..8b711d2309d 100644 --- a/robot/impl/resource_manager.go +++ b/robot/impl/resource_manager.go @@ -622,11 +622,8 @@ func (manager *resourceManager) completeConfig( levels := manager.resources.ReverseTopologicalSortInLevels() timeout := rutils.GetResourceConfigurationTimeout(manager.logger) for _, resourceNames := range levels { - // At the start of every reconfiguration level, check if updateWeakDependents should be run. - // Both conditions below should be met for `updateWeakDependents` to be called: - // - At least one resource that needs to reconfigure in this level - // depends on at least one resource with weak dependencies (weak dependents) - // - The logical clock is higher than the `lastWeakDependentsRound` value + // At the start of every reconfiguration level, check if updateWeakDependents should be run + // by checking if the logical clock is higher than the `lastWeakDependentsRound` value. // // This will make sure that weak dependents are updated before they are passed into constructors // or reconfigure methods. @@ -634,7 +631,6 @@ func (manager *resourceManager) completeConfig( // Resources that depend on weak dependents should expect that the weak dependents pass into the // constructor or reconfigure method will only have been reconfigured with all resources constructed // before their level. - var weakDependentsUpdated bool for _, resName := range resourceNames { select { case <-ctx.Done(): @@ -649,17 +645,8 @@ func (manager *resourceManager) completeConfig( continue } - for _, dep := range manager.resources.GetAllParentsOf(resName) { - if node, ok := manager.resources.Node(dep); ok { - if lr.resourceHasWeakDependencies(dep, node) && lr.lastWeakDependentsRound.Load() < manager.resources.CurrLogicalClockValue() { - lr.updateWeakDependents(ctx) - weakDependentsUpdated = true - break - } - } - } - if weakDependentsUpdated { - break + if lr.lastWeakDependentsRound.Load() < manager.resources.CurrLogicalClockValue() { + lr.updateWeakDependents(ctx) } } // we use an errgroup here instead of a normal waitgroup to conveniently bubble diff --git a/robot/impl/robot_options.go b/robot/impl/robot_options.go index c59aded407b..50ce478b30a 100644 --- a/robot/impl/robot_options.go +++ b/robot/impl/robot_options.go @@ -19,6 +19,7 @@ type options struct { // shutdownCallback provides a callback for the robot to be able to shut itself down. shutdownCallback func() + // whether or not to run FTDC enableFTDC bool // disableCompleteConfigWorker starts the robot without the complete config worker - should only be used for tests. diff --git a/robot/robot.go b/robot/robot.go index f57330d0852..e3a8c7538df 100644 --- a/robot/robot.go +++ b/robot/robot.go @@ -310,10 +310,25 @@ func (rmr *RestartModuleRequest) MatchesModule(mod config.Module) bool { return mod.Name == rmr.ModuleName } +// MachineState captures the state of a machine. +type MachineState uint8 + +const ( + // StateUnknown represents an unknown state. + StateUnknown MachineState = iota + // StateInitializing denotes a currently initializing machine. The first + // reconfigure after initial creation has not completed. + StateInitializing + // StateRunning denotes a running machine. The first reconfigure after + // initial creation has completed. + StateRunning +) + // MachineStatus encapsulates the current status of the robot. type MachineStatus struct { Resources []resource.Status Config config.Revision + State MachineState } // VersionResponse encapsulates the version info of the robot. diff --git a/robot/server/server.go b/robot/server/server.go index 86c3df4a8ee..cdcd8a7988f 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -476,6 +476,16 @@ func (s *Server) GetMachineStatus(ctx context.Context, _ *pb.GetMachineStatusReq result.Resources = append(result.Resources, pbResStatus) } + switch mStatus.State { + case robot.StateUnknown: + s.robot.Logger().CError(ctx, "machine in an unknown state") + result.State = pb.GetMachineStatusResponse_STATE_UNSPECIFIED + case robot.StateInitializing: + result.State = pb.GetMachineStatusResponse_STATE_INITIALIZING + case robot.StateRunning: + result.State = pb.GetMachineStatusResponse_STATE_RUNNING + } + return &result, nil } diff --git a/robot/server/server_test.go b/robot/server/server_test.go index 79437d8f2fe..40425a8976b 100644 --- a/robot/server/server_test.go +++ b/robot/server/server_test.go @@ -72,21 +72,26 @@ func TestServer(t *testing.T) { }) t.Run("GetMachineStatus", func(t *testing.T) { - for _, tc := range []struct { - name string - injectMachineStatus robot.MachineStatus - expConfig *pb.ConfigStatus - expResources []*pb.ResourceStatus - expBadStateCount int + testCases := []struct { + name string + injectMachineStatus robot.MachineStatus + expConfig *pb.ConfigStatus + expResources []*pb.ResourceStatus + expState pb.GetMachineStatusResponse_State + expBadResourceStateCount int + expBadMachineStateCount int }{ { "no resources", robot.MachineStatus{ Config: config.Revision{Revision: "rev1"}, Resources: []resource.Status{}, + State: robot.StateRunning, }, &pb.ConfigStatus{Revision: "rev1"}, []*pb.ResourceStatus{}, + pb.GetMachineStatusResponse_STATE_RUNNING, + 0, 0, }, { @@ -101,6 +106,7 @@ func TestServer(t *testing.T) { }, }, }, + State: robot.StateRunning, }, &pb.ConfigStatus{Revision: "rev1"}, []*pb.ResourceStatus{ @@ -110,7 +116,9 @@ func TestServer(t *testing.T) { Revision: "rev0", }, }, + pb.GetMachineStatusResponse_STATE_RUNNING, 1, + 0, }, { "resource with valid status", @@ -125,6 +133,7 @@ func TestServer(t *testing.T) { }, }, }, + State: robot.StateRunning, }, &pb.ConfigStatus{Revision: "rev1"}, []*pb.ResourceStatus{ @@ -134,6 +143,8 @@ func TestServer(t *testing.T) { Revision: "rev1", }, }, + pb.GetMachineStatusResponse_STATE_RUNNING, + 0, 0, }, { @@ -150,6 +161,7 @@ func TestServer(t *testing.T) { CloudMetadata: cloud.Metadata{}, }, }, + State: robot.StateRunning, }, &pb.ConfigStatus{Revision: "rev1"}, []*pb.ResourceStatus{ @@ -160,6 +172,8 @@ func TestServer(t *testing.T) { CloudMetadata: &pb.GetCloudMetadataResponse{}, }, }, + pb.GetMachineStatusResponse_STATE_RUNNING, + 0, 0, }, { @@ -193,6 +207,7 @@ func TestServer(t *testing.T) { }, }, }, + State: robot.StateRunning, }, &pb.ConfigStatus{Revision: "rev1"}, []*pb.ResourceStatus{ @@ -222,6 +237,8 @@ func TestServer(t *testing.T) { ), }, }, + pb.GetMachineStatusResponse_STATE_RUNNING, + 0, 0, }, { @@ -249,6 +266,7 @@ func TestServer(t *testing.T) { }, }, }, + State: robot.StateRunning, }, &pb.ConfigStatus{Revision: "rev1"}, []*pb.ResourceStatus{ @@ -268,7 +286,9 @@ func TestServer(t *testing.T) { Revision: "rev-1", }, }, + pb.GetMachineStatusResponse_STATE_RUNNING, 2, + 0, }, { "unhealthy status", @@ -284,6 +304,7 @@ func TestServer(t *testing.T) { }, }, }, + State: robot.StateRunning, }, &pb.ConfigStatus{Revision: "rev1"}, []*pb.ResourceStatus{ @@ -294,30 +315,69 @@ func TestServer(t *testing.T) { Error: "bad configuration", }, }, + pb.GetMachineStatusResponse_STATE_RUNNING, + 0, 0, }, - } { - logger, logs := logging.NewObservedTestLogger(t) - injectRobot := &inject.Robot{} - server := server.New(injectRobot) - req := pb.GetMachineStatusRequest{} - injectRobot.LoggerFunc = func() logging.Logger { - return logger - } - injectRobot.MachineStatusFunc = func(ctx context.Context) (robot.MachineStatus, error) { - return tc.injectMachineStatus, nil - } - resp, err := server.GetMachineStatus(context.Background(), &req) - test.That(t, err, test.ShouldBeNil) - test.That(t, resp.GetConfig().GetRevision(), test.ShouldEqual, tc.expConfig.Revision) - for i, res := range resp.GetResources() { - test.That(t, res.GetName(), test.ShouldResemble, tc.expResources[i].Name) - test.That(t, res.GetState(), test.ShouldResemble, tc.expResources[i].State) - test.That(t, res.GetRevision(), test.ShouldEqual, tc.expResources[i].Revision) - } - const badStateMsg = "resource in an unknown state" - badStateCount := logs.FilterLevelExact(zapcore.ErrorLevel).FilterMessageSnippet(badStateMsg).Len() - test.That(t, badStateCount, test.ShouldEqual, tc.expBadStateCount) + { + "initializing machine state", + robot.MachineStatus{ + Config: config.Revision{Revision: "rev1"}, + Resources: []resource.Status{}, + State: robot.StateInitializing, + }, + &pb.ConfigStatus{Revision: "rev1"}, + []*pb.ResourceStatus{}, + pb.GetMachineStatusResponse_STATE_INITIALIZING, + 0, + 0, + }, + { + "unknown machine state", + robot.MachineStatus{ + Config: config.Revision{Revision: "rev1"}, + Resources: []resource.Status{}, + State: robot.StateUnknown, + }, + &pb.ConfigStatus{Revision: "rev1"}, + []*pb.ResourceStatus{}, + pb.GetMachineStatusResponse_STATE_UNSPECIFIED, + 0, + 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + logger, logs := logging.NewObservedTestLogger(t) + injectRobot := &inject.Robot{} + server := server.New(injectRobot) + req := pb.GetMachineStatusRequest{} + injectRobot.LoggerFunc = func() logging.Logger { + return logger + } + injectRobot.MachineStatusFunc = func(ctx context.Context) (robot.MachineStatus, error) { + return tc.injectMachineStatus, nil + } + resp, err := server.GetMachineStatus(context.Background(), &req) + test.That(t, err, test.ShouldBeNil) + test.That(t, resp.GetConfig().GetRevision(), test.ShouldEqual, tc.expConfig.Revision) + for i, res := range resp.GetResources() { + test.That(t, res.GetName(), test.ShouldResemble, tc.expResources[i].Name) + test.That(t, res.GetState(), test.ShouldResemble, tc.expResources[i].State) + test.That(t, res.GetRevision(), test.ShouldEqual, tc.expResources[i].Revision) + } + + test.That(t, resp.GetState(), test.ShouldEqual, tc.expState) + + const badResourceStateMsg = "resource in an unknown state" + badResourceStateCount := logs.FilterLevelExact(zapcore.ErrorLevel).FilterMessageSnippet(badResourceStateMsg).Len() + test.That(t, badResourceStateCount, test.ShouldEqual, tc.expBadResourceStateCount) + + const badMachineStateMsg = "machine in an unknown state" + badMachineStateCount := logs.FilterLevelExact(zapcore.ErrorLevel).FilterMessageSnippet(badMachineStateMsg).Len() + test.That(t, badMachineStateCount, test.ShouldEqual, tc.expBadMachineStateCount) + }) } }) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 759e7f5f4fb..6c2a2bed01e 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -25,6 +25,7 @@ import ( "go.viam.com/rdk/config" "go.viam.com/rdk/logging" "go.viam.com/rdk/resource" + "go.viam.com/rdk/robot" robotimpl "go.viam.com/rdk/robot/impl" "go.viam.com/rdk/robot/web" weboptions "go.viam.com/rdk/robot/web/options" @@ -249,6 +250,92 @@ func (s *robotServer) createWebOptions(cfg *config.Config) (weboptions.Options, return options, nil } +// A wrapper around actual config processing that also applies options from the +// robot server. +func (s *robotServer) processConfig(in *config.Config) (*config.Config, error) { + out, err := config.ProcessConfig(in) + if err != nil { + return nil, err + } + out.Debug = s.args.Debug || in.Debug + out.EnableWebProfile = s.args.WebProfile || in.EnableWebProfile + out.FromCommand = true + out.AllowInsecureCreds = s.args.AllowInsecureCreds + out.UntrustedEnv = s.args.UntrustedEnv + + // Use ~/.viam/packages for package path if one was not specified. + if in.PackagePath == "" { + out.PackagePath = path.Join(viamDotDir, "packages") + } + + return out, nil +} + +// A function to be started as a goroutine that watches for changes, either +// from disk or from cloud, to the robot's config. Starts comparisons based on +// `currCfg`. Reconfigures the robot when config changes are received from the +// watcher. +func (s *robotServer) configWatcher(ctx context.Context, currCfg *config.Config, r robot.LocalRobot, + watcher config.Watcher, +) { + // Reconfigure robot to have passed-in config before listening for any config + // changes. + r.Reconfigure(ctx, currCfg) + + for { + select { + case <-ctx.Done(): + return + default: + } + select { + case <-ctx.Done(): + return + case cfg := <-watcher.Config(): + processedConfig, err := s.processConfig(cfg) + if err != nil { + s.logger.Errorw("reconfiguration aborted: error processing config", "error", err) + continue + } + + // flag to restart web service if necessary + diff, err := config.DiffConfigs(*currCfg, *processedConfig, s.args.RevealSensitiveConfigDiffs) + if err != nil { + s.logger.Errorw("reconfiguration aborted: error diffing config", "error", err) + continue + } + var options weboptions.Options + + if !diff.NetworkEqual { + // TODO(RSDK-2694): use internal web service reconfiguration instead + r.StopWeb() + options, err = s.createWebOptions(processedConfig) + if err != nil { + s.logger.Errorw("reconfiguration aborted: error creating weboptions", "error", err) + continue + } + } + + // Update logger registry if log patterns may have changed. + // + // This functionality is tested in `TestLogPropagation` in `local_robot_test.go`. + if !diff.LogEqual { + s.logger.Debug("Detected potential changes to log patterns; updating logger levels") + config.UpdateLoggerRegistryFromConfig(s.registry, processedConfig, s.logger) + } + + r.Reconfigure(ctx, processedConfig) + + if !diff.NetworkEqual { + if err := r.StartWeb(ctx, options); err != nil { + s.logger.Errorw("reconfiguration failed: error starting web service while reconfiguring", "error", err) + } + } + currCfg = processedConfig + } + } +} + func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err error) { ctx, cancel := context.WithCancel(ctx) @@ -261,6 +348,17 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err forceShutdown := make(chan struct{}) defer func() { <-forceShutdown }() + var cloudRestartCheckerActive chan struct{} + rpcDialer := rpc.NewCachedDialer() + defer func() { + if cloudRestartCheckerActive != nil { + <-cloudRestartCheckerActive + } + err = multierr.Combine(err, rpcDialer.Close()) + }() + defer cancel() + ctx = rpc.ContextWithDialer(ctx, rpcDialer) + utils.PanicCapturingGo(func() { defer close(forceShutdown) @@ -313,32 +411,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err <-slowWatcher }() - var cloudRestartCheckerActive chan struct{} - rpcDialer := rpc.NewCachedDialer() - defer func() { - if cloudRestartCheckerActive != nil { - <-cloudRestartCheckerActive - } - err = multierr.Combine(err, rpcDialer.Close()) - }() - defer cancel() - ctx = rpc.ContextWithDialer(ctx, rpcDialer) - - processConfig := func(in *config.Config) (*config.Config, error) { - out, err := config.ProcessConfig(in) - if err != nil { - return nil, err - } - out.Debug = s.args.Debug || in.Debug - out.EnableWebProfile = s.args.WebProfile || in.EnableWebProfile - out.FromCommand = true - out.AllowInsecureCreds = s.args.AllowInsecureCreds - out.UntrustedEnv = s.args.UntrustedEnv - out.PackagePath = path.Join(viamDotDir, "packages") - return out, nil - } - - processedConfig, err := processConfig(cfg) + fullProcessedConfig, err := s.processConfig(cfg) if err != nil { return err } @@ -347,9 +420,9 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err // updates to the registry will be handled by the config watcher goroutine. // // This functionality is tested in `TestLogPropagation` in `local_robot_test.go`. - config.UpdateLoggerRegistryFromConfig(s.registry, processedConfig, s.logger) + config.UpdateLoggerRegistryFromConfig(s.registry, fullProcessedConfig, s.logger) - if processedConfig.Cloud != nil { + if fullProcessedConfig.Cloud != nil { cloudRestartCheckerActive = make(chan struct{}) utils.PanicCapturingGo(func() { defer close(cloudRestartCheckerActive) @@ -398,7 +471,24 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err robotOptions = append(robotOptions, robotimpl.WithFTDC()) } - myRobot, err := robotimpl.New(ctx, processedConfig, s.logger, robotOptions...) + // Create `minimalProcessedConfig`, a copy of `fullProcessedConfig`. Remove + // all components, services, remotes, modules, and processes from + // `minimalProcessedConfig`. Create new robot with `minimalProcessedConfig` + // and immediately start web service. We need the machine to be reachable + // through the web service ASAP, even if some resources take a long time to + // initially configure. + minimalProcessedConfig := *fullProcessedConfig + minimalProcessedConfig.Components = nil + minimalProcessedConfig.Services = nil + minimalProcessedConfig.Remotes = nil + minimalProcessedConfig.Modules = nil + minimalProcessedConfig.Processes = nil + + // Mark minimalProcessedConfig as an initial config, so robot reports a + // state of initializing until reconfigured with full config. + minimalProcessedConfig.Initial = true + + myRobot, err := robotimpl.New(ctx, &minimalProcessedConfig, s.logger, robotOptions...) if err != nil { cancel() return err @@ -416,70 +506,25 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err defer func() { err = multierr.Combine(err, watcher.Close()) }() - onWatchDone := make(chan struct{}) - oldCfg := processedConfig - utils.ManagedGo(func() { - for { - select { - case <-ctx.Done(): - return - default: - } - select { - case <-ctx.Done(): - return - case cfg := <-watcher.Config(): - processedConfig, err := processConfig(cfg) - if err != nil { - s.logger.Errorw("reconfiguration aborted: error processing config", "error", err) - continue - } - - // flag to restart web service if necessary - diff, err := config.DiffConfigs(*oldCfg, *processedConfig, s.args.RevealSensitiveConfigDiffs) - if err != nil { - s.logger.Errorw("reconfiguration aborted: error diffing config", "error", err) - continue - } - var options weboptions.Options - - if !diff.NetworkEqual { - // TODO(RSDK-2694): use internal web service reconfiguration instead - myRobot.StopWeb() - options, err = s.createWebOptions(processedConfig) - if err != nil { - s.logger.Errorw("reconfiguration aborted: error creating weboptions", "error", err) - continue - } - } - // Update logger registry if log patterns may have changed. - // - // This functionality is tested in `TestLogPropagation` in `local_robot_test.go`. - if !diff.LogEqual { - s.logger.Debug("Detected potential changes to log patterns; updating logger levels") - config.UpdateLoggerRegistryFromConfig(s.registry, processedConfig, s.logger) - } - - myRobot.Reconfigure(ctx, processedConfig) + onWatchDone := make(chan struct{}) + go func() { + defer close(onWatchDone) - if !diff.NetworkEqual { - if err := myRobot.StartWeb(ctx, options); err != nil { - s.logger.Errorw("reconfiguration failed: error starting web service while reconfiguring", "error", err) - } - } - oldCfg = processedConfig - } - } - }, func() { - close(onWatchDone) - }) + // Use `fullProcessedConfig` as the initial config for the config watcher + // goroutine, as we want incoming config changes to be compared to the full + // config. + s.configWatcher(ctx, fullProcessedConfig, myRobot, watcher) + }() + // At end of this function, cancel context and wait for watcher goroutine + // to complete. defer func() { + cancel() <-onWatchDone }() - defer cancel() - options, err := s.createWebOptions(processedConfig) + // Create initial web options with `minimalProcessedConfig`. + options, err := s.createWebOptions(&minimalProcessedConfig) if err != nil { return err } diff --git a/web/server/entrypoint_test.go b/web/server/entrypoint_test.go index 7b8cf98771c..a4865a111eb 100644 --- a/web/server/entrypoint_test.go +++ b/web/server/entrypoint_test.go @@ -10,6 +10,8 @@ import ( "path/filepath" "runtime" "strconv" + "strings" + "sync" "testing" "time" @@ -23,14 +25,18 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "go.viam.com/rdk/components/generic" _ "go.viam.com/rdk/components/register" "go.viam.com/rdk/config" "go.viam.com/rdk/logging" "go.viam.com/rdk/resource" + "go.viam.com/rdk/robot" + "go.viam.com/rdk/robot/client" _ "go.viam.com/rdk/services/register" "go.viam.com/rdk/testutils" "go.viam.com/rdk/testutils/robottestutils" "go.viam.com/rdk/utils" + "go.viam.com/rdk/web/server" ) // numResources is the # of resources in /etc/configs/fake.json + the 2 @@ -192,3 +198,125 @@ func isExpectedShutdownError(err error, testLogger logging.Logger) bool { testLogger.Errorw("Unexpected shutdown error", "err", err) return false } + +// Tests that machine state properly reports initializing or running. +func TestMachineState(t *testing.T) { + logger := logging.NewTestLogger(t) + ctx, cancel := context.WithCancel(context.Background()) + + machineAddress := "localhost:23654" + + // Create a fake package directory using `t.TempDir`. Set it up to be identical to the + // expected file tree of the local package manager. Place a single file `foo` in a + // `fake-module` directory. + tempDir := t.TempDir() + fakePackagePath := filepath.Join(tempDir, fmt.Sprint("packages", config.LocalPackagesSuffix)) + fakeModuleDataPath := filepath.Join(fakePackagePath, "data", "fake-module") + err := os.MkdirAll(fakeModuleDataPath, 0o777) // should create all dirs along path + test.That(t, err, test.ShouldBeNil) + fakeModuleDataFile, err := os.Create(filepath.Join(fakeModuleDataPath, "foo")) + test.That(t, err, test.ShouldBeNil) + + // Register a slow-constructing generic resource and defer its deregistration. + type slow struct { + resource.Named + resource.AlwaysRebuild + resource.TriviallyCloseable + } + completeConstruction := make(chan struct{}, 1) + slowModel := resource.NewModel("slow", "to", "build") + resource.RegisterComponent(generic.API, slowModel, resource.Registration[resource.Resource, resource.NoNativeConfig]{ + Constructor: func( + ctx context.Context, + deps resource.Dependencies, + conf resource.Config, + logger logging.Logger, + ) (resource.Resource, error) { + // Wait for `completeConstruction` to close before returning from constructor. + <-completeConstruction + + return &slow{ + Named: conf.ResourceName().AsNamed(), + }, nil + }, + }) + defer func() { + resource.Deregister(generic.API, slowModel) + }() + + // Run entrypoint code (RunServer) in a goroutine, as it is blocking. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + // Create a temporary config file with a single + tempConfigFile, err := os.CreateTemp(t.TempDir(), "temp_config.json") + test.That(t, err, test.ShouldBeNil) + + cfg := &config.Config{ + // Set PackagePath to temp dir created at top of test with the "-local" piece trimmed. Local + // package manager will automatically add that suffix. + PackagePath: strings.TrimSuffix(fakePackagePath, config.LocalPackagesSuffix), + Components: []resource.Config{ + { + Name: "slowpoke", + API: generic.API, + Model: slowModel, + }, + }, + Network: config.NetworkConfig{ + NetworkConfigData: config.NetworkConfigData{ + BindAddress: machineAddress, + }, + }, + } + + cfgBytes, err := json.Marshal(&cfg) + test.That(t, err, test.ShouldBeNil) + test.That(t, os.WriteFile(tempConfigFile.Name(), cfgBytes, 0o755), test.ShouldBeNil) + + args := []string{"viam-server", "-config", tempConfigFile.Name()} + test.That(t, server.RunServer(ctx, args, logger), test.ShouldBeNil) + }() + + // Set `DoNotWaitForRunning` to true to allow connecting to a still-initializing + // machine. + client.DoNotWaitForRunning.Store(true) + defer func() { + client.DoNotWaitForRunning.Store(false) + }() + + rc := robottestutils.NewRobotClient(t, logger, machineAddress, time.Second) + + // Assert that, from client's perspective, robot is in an initializing state until + // `slowpoke` completes construction. + machineStatus, err := rc.MachineStatus(ctx) + test.That(t, err, test.ShouldBeNil) + test.That(t, machineStatus, test.ShouldNotBeNil) + test.That(t, machineStatus.State, test.ShouldEqual, robot.StateInitializing) + + // Assert that the `foo` package file exists during initialization, machine assumes + // package files may still be in use.) + _, err = os.Stat(fakeModuleDataFile.Name()) + test.That(t, err, test.ShouldBeNil) + + // Allow `slowpoke` to complete construction. + close(completeConstruction) + + gtestutils.WaitForAssertion(t, func(tb testing.TB) { + machineStatus, err := rc.MachineStatus(ctx) + test.That(tb, err, test.ShouldBeNil) + test.That(tb, machineStatus, test.ShouldNotBeNil) + test.That(tb, machineStatus.State, test.ShouldEqual, robot.StateRunning) + }) + + // Assert that the `foo` file was removed, as the non-initializing `Reconfigure` + // determined it was unnecessary (no associated package/module.) + _, err = os.Stat(fakeModuleDataFile.Name()) + test.That(t, os.IsNotExist(err), test.ShouldBeTrue) + + // Cancel context and wait for server goroutine to stop running. + cancel() + wg.Wait() +}