diff --git a/go.mod b/go.mod index 863f9b97a9c..34ea3d76bdc 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,10 @@ go 1.25.1 require ( github.com/AlekSi/gocov-xml v1.0.0 + github.com/Auburn/FastNoiseLite v1.1.1 github.com/Masterminds/semver/v3 v3.3.0 github.com/a8m/envsubst v1.4.2 + github.com/aquilax/go-perlin v1.1.0 github.com/axw/gocov v1.1.0 github.com/aybabtme/uniplot v0.0.0-20151203143629-039c559e5e7e github.com/benbjohnson/clock v1.3.5 @@ -460,3 +462,5 @@ require ( github.com/ziutek/mymysql v1.5.4 // indirect golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c ) + +replace go.viam.com/api => ../api diff --git a/go.sum b/go.sum index 06326b07d06..5249db702d7 100644 --- a/go.sum +++ b/go.sum @@ -83,6 +83,8 @@ github.com/Antonboom/nilnil v0.1.9 h1:eKFMejSxPSA9eLSensFmjW2XTgTwJMjZ8hUHtV4s/S github.com/Antonboom/nilnil v0.1.9/go.mod h1:iGe2rYwCq5/Me1khrysB4nwI7swQvjclR8/YRPl5ihQ= github.com/Antonboom/testifylint v1.4.3 h1:ohMt6AHuHgttaQ1xb6SSnxCeK4/rnK7KKzbvs7DmEck= github.com/Antonboom/testifylint v1.4.3/go.mod h1:+8Q9+AOLsz5ZiQiiYujJKs9mNz398+M6UgslP4qgJLA= +github.com/Auburn/FastNoiseLite v1.1.1 h1:ezkEt1+MJUcZa4+7uXONiztapr+IJ9NMY0EMhkn+7VE= +github.com/Auburn/FastNoiseLite v1.1.1/go.mod h1:L4QQUME0Wkn7Rq5S0oif42SLLpSCk224uw4AbF2H9sc= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -143,6 +145,8 @@ github.com/apache/arrow/go/arrow v0.0.0-20201229220542-30ce2eb5d4dc h1:zvQ6w7Kwt github.com/apache/arrow/go/arrow v0.0.0-20201229220542-30ce2eb5d4dc/go.mod h1:c9sxoIT3YgLxH4UhLOCKaBlEojuMhVYpk4Ntv3opUTQ= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/aquilax/go-perlin v1.1.0 h1:Gg+3jQ24wT4Y5GI7TCRLmYarzUG0k+n/JATFqOimb7s= +github.com/aquilax/go-perlin v1.1.0/go.mod h1:z9Rl7EM4BZY0Ikp2fEN1I5mKSOJ26HQpk0O2TBdN2HE= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= @@ -1532,8 +1536,6 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.477 h1:huAOmn3iejrRapzlYSyB3R0S47itXTUkQ3+kt0Yx02I= -go.viam.com/api v0.1.477/go.mod h1:p/am76zx8SZ74V/F4rEAYQIpHaaLUwJgY2q3Uw3FIWk= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.167 h1:OnuC5u2YcLTMuwbvyky5yjNDEbQjf0kpoUHoLXhJWz8= diff --git a/services/worldstatestore/fake/fake.go b/services/worldstatestore/fake/fake.go index 99266b08aca..1ee671c53b3 100644 --- a/services/worldstatestore/fake/fake.go +++ b/services/worldstatestore/fake/fake.go @@ -5,16 +5,21 @@ import ( "context" "errors" "fmt" + "image/color" "math" "strings" "sync" "time" + "unsafe" + "github.com/aquilax/go-perlin" + "github.com/golang/geo/r3" commonpb "go.viam.com/api/common/v1" pb "go.viam.com/api/service/worldstatestore/v1" "google.golang.org/protobuf/types/known/structpb" "go.viam.com/rdk/logging" + "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" "go.viam.com/rdk/services/worldstatestore" ) @@ -37,12 +42,64 @@ type WorldStateStore struct { cancel context.CancelFunc logger logging.Logger + + noise *perlin.Perlin + noiseTime float64 + minZ float64 + maxZ float64 + width int + height int + spacing float64 } +const ( + // GPUBufferAlignment ensures optimal GPU memory transfer (aligned to 256 bytes) + GPUBufferAlignment = 256 + + // MinFPS is the minimum non-zero frame rate + MinFPS = 0 + + // MaxFPS is the maximum supported frame rate (matches high-end monitors) + MaxFPS = 144.0 + + // MaxPointCloudSize is the maximum point cloud size + MaxPointCloudSize = 1000000 + + // MinPointCloudSpacing is the minimum point cloud spacing + MinPointCloudSpacing = 1 + + // TargetBandwidthMBps is the target bandwidth in MB/sec for point cloud updates + TargetBandwidthMBps = 2.0 + + // MinChunkSize is the minimum chunk size to avoid excessive overhead + MinChunkSize = 1000 + + // MaxChunkSize is the maximum chunk size - capped at 512KB worth of points + MaxChunkSize = 32000 +) + var ( - boxUUID = "box-001" - sphereUUID = "sphere-001" - capsuleUUID = "capsule-001" + XYZRGB_FIELDS = []string{"x", "y", "z", "rgb"} + XYZRGB_SIZES = []uint32{4, 4, 4, 4} + XYZRGB_TYPES = []commonpb.PointCloudDataType{ + commonpb.PointCloudDataType_POINT_CLOUD_DATA_TYPE_FLOAT, + commonpb.PointCloudDataType_POINT_CLOUD_DATA_TYPE_FLOAT, + commonpb.PointCloudDataType_POINT_CLOUD_DATA_TYPE_FLOAT, + commonpb.PointCloudDataType_POINT_CLOUD_DATA_TYPE_FLOAT, + } + XYZRGB_COUNTS = []uint32{1, 1, 1, 1} +) + +var ( + ErrTransformNotFound = errors.New("transform not found") + ErrInvalidFPS = errors.New("fps must be greater than 0") +) + +var ( + boxUUID = "box-001" + sphereUUID = "sphere-001" + capsuleUUID = "capsule-001" + pointcloudUUID = "pointcloud-001" boxMetadata = &structpb.Struct{ Fields: map[string]*structpb.Value{ @@ -64,6 +121,7 @@ var ( }, }, } + sphereMetadata = &structpb.Struct{ Fields: map[string]*structpb.Value{ "color": { @@ -84,6 +142,7 @@ var ( }, }, } + capsuleMetadata = &structpb.Struct{ Fields: map[string]*structpb.Value{ "color": { @@ -104,6 +163,7 @@ var ( }, }, } + dynamicBoxMetadata = &structpb.Struct{ Fields: map[string]*structpb.Value{ "color": { @@ -141,12 +201,12 @@ func init() { } // ListUUIDs returns all transform UUIDs currently in the store. -func (f *WorldStateStore) ListUUIDs(ctx context.Context, extra map[string]any) ([][]byte, error) { - f.mu.RLock() - defer f.mu.RUnlock() +func (worldState *WorldStateStore) ListUUIDs(ctx context.Context, extra map[string]any) ([][]byte, error) { + worldState.mu.RLock() + defer worldState.mu.RUnlock() - uuids := make([][]byte, 0, len(f.transforms)) - for _, transform := range f.transforms { + uuids := make([][]byte, 0, len(worldState.transforms)) + for _, transform := range worldState.transforms { uuids = append(uuids, transform.Uuid) } @@ -154,52 +214,157 @@ func (f *WorldStateStore) ListUUIDs(ctx context.Context, extra map[string]any) ( } // GetTransform returns the transform for the given UUID. -func (f *WorldStateStore) GetTransform(ctx context.Context, uuid []byte, extra map[string]any) (*commonpb.Transform, error) { - f.mu.RLock() - defer f.mu.RUnlock() +func (worldState *WorldStateStore) GetTransform(ctx context.Context, uuid []byte, extra map[string]any) (*commonpb.Transform, error) { + worldState.mu.RLock() + defer worldState.mu.RUnlock() - transform, exists := f.transforms[string(uuid)] + uuidStr := string(uuid) + transform, exists := worldState.transforms[uuidStr] if !exists { - return nil, errors.New("transform not found") + return nil, ErrTransformNotFound } return transform, nil } // StreamTransformChanges returns a channel of transform changes. -func (f *WorldStateStore) StreamTransformChanges( +func (worldState *WorldStateStore) StreamTransformChanges( ctx context.Context, extra map[string]any, ) (*worldstatestore.TransformChangeStream, error) { - return worldstatestore.NewTransformChangeStreamFromChannel(ctx, f.changeChan), nil + return worldstatestore.NewTransformChangeStreamFromChannel(ctx, worldState.changeChan), nil } -// DoCommand handles arbitrary commands. Currently accepts "fps": float64 to set the animation rate. -func (f *WorldStateStore) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { +// DoCommand handles arbitrary commands. Accepts: +// +// - "fps": float64 to set the animation rate (0 to pause) +// +// - "point_cloud_width": int to set the width of the point cloud +// +// - "point_cloud_height": int to set the height of the point cloud +// +// - "point_cloud_spacing": float64 to set the spacing of the point cloud +func (worldState *WorldStateStore) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { + var statusMsgs []string if fps, ok := cmd["fps"].(float64); ok { - if fps <= 0 { - return nil, errors.New("fps must be greater than 0") + originalFPS := fps + + if fps < 0 { + worldState.logger.Warnf("FPS value %.2f is below 0, clamping to 0 (paused)", fps) + fps = 0 + } else if fps > 0 && fps < MinFPS { + worldState.logger.Warnf("FPS value %.2f is below minimum %.2f, clamping to %.2f", fps, MinFPS, MinFPS) + fps = MinFPS + } else if fps > MaxFPS { + worldState.logger.Warnf("FPS value %.2f exceeds maximum %.2f, clamping to %.2f", fps, MaxFPS, MaxFPS) + fps = MaxFPS } - f.mu.Lock() - f.fps = float64(fps) - f.mu.Unlock() + + worldState.mu.Lock() + worldState.fps = fps + worldState.mu.Unlock() + + msg := fmt.Sprintf("fps set to %.2f", fps) + if fps != originalFPS { + msg += fmt.Sprintf(" (clamped from %.2f)", originalFPS) + } + statusMsgs = append(statusMsgs, msg) + } + + worldState.mu.Lock() + currentWidth := worldState.width + currentHeight := worldState.height + worldState.mu.Unlock() + + if pointCloudWidth, ok := cmd["point_cloud_width"].(int); ok { + originalWidth := pointCloudWidth + if pointCloudWidth < 1 { + worldState.logger.Warnf("Point cloud width %d is below 1, clamping to 1", pointCloudWidth) + pointCloudWidth = 1 + } + + height := currentHeight + if h, ok := cmd["point_cloud_height"].(int); ok { + height = h + } + if pointCloudWidth*height > MaxPointCloudSize { + pointCloudWidth = MaxPointCloudSize / height + worldState.logger.Warnf("Point cloud dimensions exceed maximum points (%d), clamping width to %d", MaxPointCloudSize, pointCloudWidth) + } + + worldState.mu.Lock() + worldState.width = pointCloudWidth + worldState.mu.Unlock() + + msg := fmt.Sprintf("point cloud width set to %d", pointCloudWidth) + if pointCloudWidth != originalWidth { + msg += fmt.Sprintf(" (clamped from %d)", originalWidth) + } + statusMsgs = append(statusMsgs, msg) + } + + if pointCloudHeight, ok := cmd["point_cloud_height"].(int); ok { + originalHeight := pointCloudHeight + if pointCloudHeight < 1 { + worldState.logger.Warnf("Point cloud height %d is below 1, clamping to 1", pointCloudHeight) + pointCloudHeight = 1 + } + + width := currentWidth + if w, ok := cmd["point_cloud_width"].(int); ok { + width = w + } + if width*pointCloudHeight > MaxPointCloudSize { + pointCloudHeight = MaxPointCloudSize / width + worldState.logger.Warnf("Point cloud dimensions exceed maximum points (%d), clamping height to %d", MaxPointCloudSize, pointCloudHeight) + } + + worldState.mu.Lock() + worldState.height = pointCloudHeight + worldState.mu.Unlock() + + msg := fmt.Sprintf("point cloud height set to %d", pointCloudHeight) + if pointCloudHeight != originalHeight { + msg += fmt.Sprintf(" (clamped from %d)", originalHeight) + } + statusMsgs = append(statusMsgs, msg) + } + + if pointCloudSpacing, ok := cmd["point_cloud_spacing"].(float64); ok { + originalSpacing := pointCloudSpacing + if pointCloudSpacing < 1 { + worldState.logger.Warnf("Point cloud spacing %.2f is below 1, clamping to 1", pointCloudSpacing) + pointCloudSpacing = 1 + } + + worldState.mu.Lock() + worldState.spacing = pointCloudSpacing + worldState.mu.Unlock() + + msg := fmt.Sprintf("point cloud spacing set to %.2f", pointCloudSpacing) + if pointCloudSpacing != originalSpacing { + msg += fmt.Sprintf(" (clamped from %.2f)", originalSpacing) + } + statusMsgs = append(statusMsgs, msg) + } + + if len(statusMsgs) == 0 { return map[string]any{ - "status": "fps set to " + fmt.Sprintf("%.2f", fps), + "status": "no commands processed", }, nil } return map[string]any{ - "status": "command not implemented", + "status": strings.Join(statusMsgs, "; "), }, nil } -// Close stops the fake service and cleans up resources. -func (f *WorldStateStore) Close(ctx context.Context) error { - f.cancel() +func (worldState *WorldStateStore) Close(ctx context.Context) error { + worldState.cancel() done := make(chan struct{}) go func() { - f.activeBackgroundWorkers.Wait() + worldState.activeBackgroundWorkers.Wait() close(done) }() @@ -209,12 +374,13 @@ func (f *WorldStateStore) Close(ctx context.Context) error { // proceed even if workers did not exit in time } - close(f.changeChan) + close(worldState.changeChan) return nil } func newFakeWorldStateStore(name resource.Name, logger logging.Logger) worldstatestore.Service { ctx, cancel := context.WithCancel(context.Background()) + noise := perlin.NewPerlin(2, 2, 5, 1337) fake := &WorldStateStore{ Named: name.AsNamed(), @@ -227,30 +393,124 @@ func newFakeWorldStateStore(name resource.Name, logger logging.Logger) worldstat streamCtx: ctx, cancel: cancel, logger: logger, + noise: noise, + width: 750, + height: 750, + spacing: 100, } - fake.initializeStaticTransforms() + fake.initializeTransforms() fake.activeBackgroundWorkers.Add(1) go func() { defer fake.activeBackgroundWorkers.Done() - fake.animationLoop() + fake.animate() }() fake.activeBackgroundWorkers.Add(1) go func() { defer fake.activeBackgroundWorkers.Done() - fake.dynamicBoxSequence() + fake.boxSequence() }() return fake } -// initializeStaticTransforms creates the initial three transforms in the world. -func (f *WorldStateStore) initializeStaticTransforms() { - f.mu.Lock() - defer f.mu.Unlock() +func getStride() int { + stride := 0 + for i := range XYZRGB_SIZES { + stride += int(XYZRGB_SIZES[i] * XYZRGB_COUNTS[i]) + } + return stride +} + +func pointCloudToRawBytes(pc pointcloud.PointCloud) []byte { + stride := getStride() + size := pc.Size() + data := make([]byte, size*stride) + + idx := 0 + pc.Iterate(0, 0, func(p r3.Vector, d pointcloud.Data) bool { + offset := idx * stride + xMeters := float32(p.X / 1000.0) + yMeters := float32(p.Y / 1000.0) + zMeters := float32(p.Z / 1000.0) + + var rgb float32 + if d != nil && d.HasValue() { + rgb = math.Float32frombits(uint32(d.Value())) + } + + *(*float32)(unsafe.Pointer(&data[offset])) = xMeters + *(*float32)(unsafe.Pointer(&data[offset+4])) = yMeters + *(*float32)(unsafe.Pointer(&data[offset+8])) = zMeters + *(*float32)(unsafe.Pointer(&data[offset+12])) = rgb + + idx++ + return true + }) + + return data +} + +func packColor(colorValue color.NRGBA) float32 { + rgb := (uint32(colorValue.R) << 16) | (uint32(colorValue.G) << 8) | uint32(colorValue.B) + return math.Float32frombits(rgb) +} - // Create initial transforms - f.transforms[boxUUID] = &commonpb.Transform{ +func buildUpdateHeader(start, count uint32) *commonpb.PointCloudHeader { + startPtr := &start + header := &commonpb.PointCloudHeader{ + Fields: append([]string{}, XYZRGB_FIELDS...), + Size: append([]uint32{}, XYZRGB_SIZES...), + Type: append([]commonpb.PointCloudDataType{}, XYZRGB_TYPES...), + Count: append([]uint32{}, XYZRGB_COUNTS...), + Width: count, // Number of points in this update + Height: 1, + Start: startPtr, // Buffer offset for partial update + } + + return header +} + +func validateStride(header *commonpb.PointCloudHeader) int { + if len(header.Size) != len(header.Count) { + return 0 + } + + stride := 0 + for i := range header.Size { + stride += int(header.Size[i] * header.Count[i]) + } + return stride +} + +func validatePointCloud(data []byte, header *commonpb.PointCloudHeader) error { + if header == nil { + return errors.New("header cannot be nil") + } + + stride := validateStride(header) + if stride <= 0 { + return errors.New("invalid header: stride must be positive") + } + + expectedSize := stride * int(header.Width) + if len(data) != expectedSize { + return fmt.Errorf("binary data size mismatch: expected %d bytes, got %d bytes", expectedSize, len(data)) + } + + return nil +} + +func createBuffer(sizeBytes int) []byte { + alignedSize := ((sizeBytes + GPUBufferAlignment - 1) / GPUBufferAlignment) * GPUBufferAlignment + return make([]byte, sizeBytes, alignedSize) +} + +func (worldState *WorldStateStore) initializeTransforms() { + worldState.mu.Lock() + defer worldState.mu.Unlock() + + worldState.transforms[boxUUID] = &commonpb.Transform{ ReferenceFrame: "static-box", PoseInObserverFrame: &commonpb.PoseInFrame{ ReferenceFrame: "world", @@ -273,7 +533,7 @@ func (f *WorldStateStore) initializeStaticTransforms() { Metadata: boxMetadata, } - f.transforms[sphereUUID] = &commonpb.Transform{ + worldState.transforms[sphereUUID] = &commonpb.Transform{ ReferenceFrame: "static-sphere", PoseInObserverFrame: &commonpb.PoseInFrame{ ReferenceFrame: "world", @@ -292,7 +552,7 @@ func (f *WorldStateStore) initializeStaticTransforms() { Metadata: sphereMetadata, } - f.transforms[capsuleUUID] = &commonpb.Transform{ + worldState.transforms[capsuleUUID] = &commonpb.Transform{ ReferenceFrame: "static-capsule", PoseInObserverFrame: &commonpb.PoseInFrame{ ReferenceFrame: "world", @@ -311,142 +571,317 @@ func (f *WorldStateStore) initializeStaticTransforms() { Uuid: []byte(capsuleUUID), Metadata: capsuleMetadata, } + + pointCloud := worldState.generatePointCloud(worldState.width, worldState.height) + pointcloudBytes := pointCloudToRawBytes(pointCloud) + pointcloudHeader := buildUpdateHeader(0, uint32(worldState.width*worldState.height)) + + worldState.transforms[pointcloudUUID] = &commonpb.Transform{ + ReferenceFrame: "static-pointcloud", + PoseInObserverFrame: &commonpb.PoseInFrame{ + ReferenceFrame: "world", + Pose: &commonpb.Pose{ + X: 0, Y: 0, Z: 0, Theta: 0, OX: 0, OY: 0, OZ: 1, + }, + }, + PhysicalObject: &commonpb.Geometry{ + GeometryType: &commonpb.Geometry_Pointcloud{ + Pointcloud: &commonpb.PointCloud{ + PointCloud: pointcloudBytes, + Header: pointcloudHeader, + }, + }, + }, + Uuid: []byte(pointcloudUUID), + Metadata: &structpb.Struct{}, + } } -func (f *WorldStateStore) updateBoxTransform(elapsed time.Duration) { - rotationSpeed := 2 * math.Pi / 5.0 // radians per second +func (worldState *WorldStateStore) updateBox(elapsed time.Duration) { + rotationSpeed := 2 * math.Pi / 5.0 angle := rotationSpeed * elapsed.Seconds() - f.mu.Lock() - if transform, exists := f.transforms["box-001"]; exists { + worldState.mu.Lock() + if transform, exists := worldState.transforms["box-001"]; exists { theta := angle * 180 / math.Pi transform.PoseInObserverFrame.Pose.Theta = theta - f.mu.Unlock() - f.emitTransformUpdate(&commonpb.Transform{ - Uuid: transform.Uuid, - PoseInObserverFrame: &commonpb.PoseInFrame{ - Pose: &commonpb.Pose{ - Theta: theta, + worldState.mu.Unlock() + worldState.emitTransformChange( + &commonpb.Transform{ + Uuid: transform.Uuid, + PoseInObserverFrame: &commonpb.PoseInFrame{ + Pose: &commonpb.Pose{ + Theta: theta, + }, }, }, - }, []string{"poseInObserverFrame.pose.theta"}) + pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_UPDATED, + []string{"poseInObserverFrame.pose.theta"}, + ) return } - f.mu.Unlock() + worldState.mu.Unlock() } -func (f *WorldStateStore) updateSphereTransform(elapsed time.Duration) { - frequency := 2 * math.Pi / 5.0 // radians per second +func (worldState *WorldStateStore) updateSphere(elapsed time.Duration) { + frequency := 2 * math.Pi / 5.0 height := math.Sin(frequency*elapsed.Seconds()) * 2000.0 // ±2 units - f.mu.Lock() - if transform, exists := f.transforms["sphere-001"]; exists { - transform.PoseInObserverFrame.Pose.Y = height - f.mu.Unlock() - f.emitTransformUpdate(&commonpb.Transform{ - Uuid: transform.Uuid, - PoseInObserverFrame: &commonpb.PoseInFrame{ - Pose: &commonpb.Pose{ - Y: height, + worldState.mu.Lock() + if transform, exists := worldState.transforms["sphere-001"]; exists { + transform.PoseInObserverFrame.Pose.Z = height + worldState.mu.Unlock() + worldState.emitTransformChange( + &commonpb.Transform{ + Uuid: transform.Uuid, + PoseInObserverFrame: &commonpb.PoseInFrame{ + Pose: &commonpb.Pose{ + Z: height, + }, }, }, - }, []string{"poseInObserverFrame.pose.y"}) + pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_UPDATED, + []string{"poseInObserverFrame.pose.z"}, + ) return } - f.mu.Unlock() + worldState.mu.Unlock() } -func (f *WorldStateStore) updateCapsuleTransform(elapsed time.Duration) { - frequency := 2 * math.Pi / 5.0 // radians per second +func (worldState *WorldStateStore) updateCapsule(elapsed time.Duration) { + frequency := 2 * math.Pi / 5.0 scale := 1.0 + 0.5*math.Sin(frequency*elapsed.Seconds()) // 0.5x to 1.5x - r := 125 * scale - l := 1000 * scale - - f.mu.Lock() - if transform, exists := f.transforms["capsule-001"]; exists { - transform.PhysicalObject.GetCapsule().RadiusMm = r - transform.PhysicalObject.GetCapsule().LengthMm = l - f.mu.Unlock() - f.emitTransformUpdate(&commonpb.Transform{ - Uuid: transform.Uuid, - PhysicalObject: &commonpb.Geometry{ - GeometryType: &commonpb.Geometry_Capsule{ - Capsule: &commonpb.Capsule{ - RadiusMm: r, - LengthMm: l, + radius := 125 * scale + length := 1000 * scale + + worldState.mu.Lock() + if transform, exists := worldState.transforms["capsule-001"]; exists { + transform.PhysicalObject.GetCapsule().RadiusMm = radius + transform.PhysicalObject.GetCapsule().LengthMm = length + worldState.mu.Unlock() + worldState.emitTransformChange( + &commonpb.Transform{ + Uuid: transform.Uuid, + PhysicalObject: &commonpb.Geometry{ + GeometryType: &commonpb.Geometry_Capsule{ + Capsule: &commonpb.Capsule{ + RadiusMm: radius, + LengthMm: length, + }, }, }, }, - }, []string{"physicalObject.geometryType.value.radiusMm", "physicalObject.geometryType.value.lengthMm"}) + pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_UPDATED, + []string{"physicalObject.geometryType.value.radiusMm", "physicalObject.geometryType.value.lengthMm"}, + ) return } - f.mu.Unlock() + worldState.mu.Unlock() } -func (f *WorldStateStore) emitTransformChange(transform *commonpb.Transform, changeType pb.TransformChangeType, updatedFields []string) { - change := worldstatestore.TransformChange{ - ChangeType: changeType, - Transform: transform, - UpdatedFields: updatedFields, +func calculateChunkSize(fps float64) int { + if fps <= 0 { + return MaxChunkSize } - select { - case f.changeChan <- change: - case <-f.streamCtx.Done(): - default: - // Channel is full, skip this update + stride := getStride() + targetBytesPerSecond := TargetBandwidthMBps * 1024 * 1024 + pointsPerSecond := targetBytesPerSecond / float64(stride) + chunkSize := int(pointsPerSecond / fps) + if chunkSize < MinChunkSize { + return MinChunkSize + } + if chunkSize > MaxChunkSize { + return MaxChunkSize + } + + return chunkSize +} + +func (worldState *WorldStateStore) updatePointCloud(elapsed time.Duration) { + worldState.mu.Lock() + transform, exists := worldState.transforms[pointcloudUUID] + if !exists { + worldState.mu.Unlock() + return + } + + originalPC := transform.PhysicalObject.GetPointcloud() + if originalPC == nil || originalPC.Header == nil { + worldState.mu.Unlock() + return + } + + totalPoints := int(originalPC.Header.Width) + if totalPoints == 0 { + worldState.mu.Unlock() + return + } + + originalData := originalPC.PointCloud + worldState.mu.Unlock() + worldState.mu.RLock() + fps := worldState.fps + worldState.mu.RUnlock() + + chunkSize := calculateChunkSize(fps) + worldState.noiseTime = elapsed.Seconds() + frameNumber := int(elapsed.Seconds() * fps) + totalChunks := (totalPoints + chunkSize - 1) / chunkSize + chunkIdx := frameNumber % totalChunks + startIdx := chunkIdx * chunkSize + count := chunkSize + if startIdx+count > totalPoints { + count = totalPoints - startIdx + } + + stride := getStride() + chunkData := createBuffer(count * stride) + for i := 0; i < count; i++ { + pointIdx := startIdx + i + offset := i * stride + + x := pointIdx / worldState.width + y := pointIdx % worldState.height + + originalOffset := pointIdx * stride + currentHeightMeters := float32(0) + if originalOffset+8 < len(originalData) { + currentHeightMeters = math.Float32frombits( + uint32(originalData[originalOffset+8]) | + uint32(originalData[originalOffset+9])<<8 | + uint32(originalData[originalOffset+10])<<16 | + uint32(originalData[originalOffset+11])<<24, + ) + } + + currentNoiseValue := float64(currentHeightMeters*1000.0) / (worldState.spacing * 10.0) + newNoiseValue := worldState.updatePointHeight(x, y, currentNoiseValue, worldState.noiseTime) + + heightColor := heightToColor(newNoiseValue) + animatedRGB := packColor(heightColor) + + xMeters := float32(x) * float32(worldState.spacing) / 1000.0 + yMeters := float32(y) * float32(worldState.spacing) / 1000.0 + zMeters := float32((newNoiseValue * worldState.spacing * 10.0) / 1000.0) + + *(*float32)(unsafe.Pointer(&chunkData[offset])) = xMeters + *(*float32)(unsafe.Pointer(&chunkData[offset+4])) = yMeters + *(*float32)(unsafe.Pointer(&chunkData[offset+8])) = zMeters + *(*float32)(unsafe.Pointer(&chunkData[offset+12])) = animatedRGB + } + + header := buildUpdateHeader(uint32(startIdx), uint32(count)) + if err := validatePointCloud(chunkData, header); err != nil { + worldState.logger.Errorw("chunk data validation failed", "error", err) + return + } + + worldState.mu.Lock() + if transform, exists := worldState.transforms[pointcloudUUID]; exists { + pc := transform.PhysicalObject.GetPointcloud() + if pc != nil && pc.PointCloud != nil { + for i := 0; i < count; i++ { + pointIdx := startIdx + i + srcOffset := i * stride + dstOffset := pointIdx * stride + if dstOffset+stride <= len(pc.PointCloud) && srcOffset+stride <= len(chunkData) { + copy(pc.PointCloud[dstOffset:dstOffset+stride], chunkData[srcOffset:srcOffset+stride]) + } + } + } + } + worldState.mu.Unlock() + + updatedFields := []string{ + "physicalObject.geometryType.value.pointCloud.pointCloud", + "physicalObject.geometryType.value.pointCloud.header", } + + deltaTransform := &commonpb.Transform{ + Uuid: transform.Uuid, + PhysicalObject: &commonpb.Geometry{ + GeometryType: &commonpb.Geometry_Pointcloud{ + Pointcloud: &commonpb.PointCloud{ + PointCloud: chunkData, + Header: header, + }, + }, + }, + } + + worldState.emitTransformChange(deltaTransform, pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_UPDATED, updatedFields) } -func (f *WorldStateStore) emitTransformUpdate(partial *commonpb.Transform, updatedFields []string) { - if partial == nil || len(partial.GetUuid()) == 0 { +func (worldState *WorldStateStore) emitTransformChange(transform *commonpb.Transform, changeType pb.TransformChangeType, updatedFields []string) { + if transform == nil || len(transform.GetUuid()) == 0 { return } + change := worldstatestore.TransformChange{ - ChangeType: pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_UPDATED, - Transform: partial, + ChangeType: changeType, + Transform: transform, UpdatedFields: updatedFields, } + select { - case f.changeChan <- change: - case <-f.streamCtx.Done(): + case worldState.changeChan <- change: + // Successfully sent + case <-worldState.streamCtx.Done(): + return default: - // Channel is full, skip this update + // Channel is full - implement backpressure strategy with brief timeout + select { + case worldState.changeChan <- change: + case <-time.After(time.Millisecond): + // Drop update after timeout to prevent blocking + case <-worldState.streamCtx.Done(): + return + } } } -func (f *WorldStateStore) animationLoop() { - f.mu.RLock() - curFPS := f.fps - f.mu.RUnlock() - if curFPS <= 0 { - curFPS = 1 - } - interval := time.Duration(float64(time.Second) / curFPS) - ticker := time.NewTicker(interval) +func (worldState *WorldStateStore) animate() { + var ( + curFPS = 10.0 + interval = time.Duration(float64(time.Second) / curFPS) + ticker = time.NewTicker(interval) + lastElapsed time.Duration + fpsCheckCounter int64 + ) defer ticker.Stop() for { select { - case <-f.streamCtx.Done(): + case <-worldState.streamCtx.Done(): return case <-ticker.C: - f.updateTransforms() - // Reconfigure ticker if FPS changed - f.mu.RLock() - newFPS := f.fps - f.mu.RUnlock() - if newFPS != curFPS && newFPS > 0 { - ticker.Stop() - curFPS = newFPS - interval = time.Duration(float64(time.Second) / curFPS) - ticker = time.NewTicker(interval) + elapsed := time.Since(worldState.startTime) + if elapsed-lastElapsed < time.Millisecond { + continue + } + + lastElapsed = elapsed + worldState.update() + + fpsCheckCounter++ + if fpsCheckCounter%100 == 0 { + worldState.mu.RLock() + newFPS := worldState.fps + worldState.mu.RUnlock() + + if newFPS != curFPS && newFPS > 0 { + ticker.Stop() + curFPS = newFPS + interval = time.Duration(float64(time.Second) / curFPS) + ticker = time.NewTicker(interval) + } } } } } -func (f *WorldStateStore) dynamicBoxSequence() { +func (worldState *WorldStateStore) boxSequence() { delay := 3 * time.Second sequence := []struct { action string @@ -463,20 +898,20 @@ func (f *WorldStateStore) dynamicBoxSequence() { for { for _, step := range sequence { select { - case <-f.streamCtx.Done(): + case <-worldState.streamCtx.Done(): return default: } switch step.action { case "add": - f.addDynamicBox(step.name) + worldState.addBox(step.name) case "remove": - f.removeDynamicBox(step.name) + worldState.removeBox(step.name) } select { - case <-f.streamCtx.Done(): + case <-worldState.streamCtx.Done(): return case <-time.After(delay): } @@ -484,7 +919,7 @@ func (f *WorldStateStore) dynamicBoxSequence() { } } -func (f *WorldStateStore) addDynamicBox(name string) { +func (worldState *WorldStateStore) addBox(name string) { var xOffset float64 switch name { @@ -520,18 +955,18 @@ func (f *WorldStateStore) addDynamicBox(name string) { Metadata: dynamicBoxMetadata, } - f.mu.Lock() - f.transforms[uuid] = transform - f.mu.Unlock() + worldState.mu.Lock() + worldState.transforms[uuid] = transform + worldState.mu.Unlock() - f.emitTransformChange(transform, pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_ADDED, nil) + worldState.emitTransformChange(transform, pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_ADDED, nil) } -func (f *WorldStateStore) removeDynamicBox(name string) { - f.mu.Lock() +func (worldState *WorldStateStore) removeBox(name string) { + worldState.mu.Lock() var uuidToRemove string - for uuid := range f.transforms { + for uuid := range worldState.transforms { if strings.HasPrefix(uuid, name) { uuidToRemove = uuid break @@ -539,13 +974,13 @@ func (f *WorldStateStore) removeDynamicBox(name string) { } if uuidToRemove == "" { - f.mu.Unlock() + worldState.mu.Unlock() return } - transform := f.transforms[uuidToRemove] - delete(f.transforms, uuidToRemove) - f.mu.Unlock() + transform := worldState.transforms[uuidToRemove] + delete(worldState.transforms, uuidToRemove) + worldState.mu.Unlock() change := worldstatestore.TransformChange{ ChangeType: pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_REMOVED, @@ -555,17 +990,18 @@ func (f *WorldStateStore) removeDynamicBox(name string) { } select { - case f.changeChan <- change: - case <-f.streamCtx.Done(): + case worldState.changeChan <- change: + case <-worldState.streamCtx.Done(): default: // Channel is full, skip this update } } -func (f *WorldStateStore) updateTransforms() { - elapsed := time.Since(f.startTime) +func (worldState *WorldStateStore) update() { + elapsed := time.Since(worldState.startTime) - f.updateBoxTransform(elapsed) - f.updateSphereTransform(elapsed) - f.updateCapsuleTransform(elapsed) + worldState.updateBox(elapsed) + worldState.updateSphere(elapsed) + worldState.updateCapsule(elapsed) + worldState.updatePointCloud(elapsed) } diff --git a/services/worldstatestore/fake/fake_test.go b/services/worldstatestore/fake/fake_test.go index 7af6e9f8e90..9895d3cccd1 100644 --- a/services/worldstatestore/fake/fake_test.go +++ b/services/worldstatestore/fake/fake_test.go @@ -14,14 +14,15 @@ import ( func TestFakeWorldStateStore(t *testing.T) { // Create a new fake service - fake := newFakeWorldStateStore(resource.Name{Name: "test"}, nil) + logger := logging.NewTestLogger(t) + fake := newFakeWorldStateStore(resource.Name{Name: "test"}, logger) defer fake.Close(context.Background()) // Test ListUUIDs uuids, err := fake.ListUUIDs(context.Background(), nil) test.That(t, err, test.ShouldBeNil) - test.That(t, len(uuids), test.ShouldBeGreaterThanOrEqualTo, 3) // Static transforms are available - test.That(t, len(uuids), test.ShouldBeLessThanOrEqualTo, 4) // Dynamic transform may be available + test.That(t, len(uuids), test.ShouldBeGreaterThanOrEqualTo, 4) // Static transforms are available + test.That(t, len(uuids), test.ShouldBeLessThanOrEqualTo, 5) // Dynamic transform may be available // Test GetTransform for each static transform boxTransform, err := fake.GetTransform(context.Background(), []byte("box-001"), nil) @@ -41,6 +42,12 @@ func TestFakeWorldStateStore(t *testing.T) { test.That(t, capsuleTransform.Uuid, test.ShouldResemble, []byte("capsule-001")) test.That(t, capsuleTransform.Metadata, test.ShouldNotBeNil) + pointcloudTransform, err := fake.GetTransform(context.Background(), []byte("pointcloud-001"), nil) + test.That(t, err, test.ShouldBeNil) + test.That(t, pointcloudTransform, test.ShouldNotBeNil) + test.That(t, pointcloudTransform.Uuid, test.ShouldResemble, []byte("pointcloud-001")) + test.That(t, pointcloudTransform.Metadata, test.ShouldNotBeNil) + // Test StreamTransformChanges ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/services/worldstatestore/fake/point_cloud_generator.go b/services/worldstatestore/fake/point_cloud_generator.go new file mode 100644 index 00000000000..fc7765c9067 --- /dev/null +++ b/services/worldstatestore/fake/point_cloud_generator.go @@ -0,0 +1,122 @@ +package fake + +import ( + "image/color" + "math" + + "github.com/golang/geo/r3" + "go.viam.com/rdk/pointcloud" +) + +func (worldState *WorldStateStore) generateTerrain(x, y int, timeOffset float64) float64 { + var total float64 + var maxAmplitude float64 + + frequency := 0.005 + amplitude := 1.0 + persistence := 0.5 + octaves := 4 + + for i := 0; i < octaves; i++ { + noiseValue := worldState.noise.Noise3D( + (float64(x)+0.5)*frequency, + (float64(y)+0.5)*frequency, + timeOffset*frequency, + ) + + total += noiseValue * amplitude + maxAmplitude += amplitude + + frequency *= 2.0 + amplitude *= persistence + } + + return total +} + +func (worldState *WorldStateStore) generatePointCloud(width, height int) pointcloud.PointCloud { + pointCloud := pointcloud.NewBasicPointCloud(width * height) + for x := 0; x < width; x++ { + for y := 0; y < height; y++ { + xPos := float64(x) + yPos := float64(y) + heightMM := worldState.generateTerrain(x, y, 0) + point := r3.Vector{ + X: xPos * worldState.spacing, + Y: yPos * worldState.spacing, + Z: heightMM * worldState.spacing * 10, // make the height more dramatic + } + + heightColor := heightToColor(heightMM) + packedColor := packColor(heightColor) + colorData := pointcloud.NewValueData(int(math.Float32bits(packedColor))) + + if err := pointCloud.Set(point, colorData); err != nil { + worldState.logger.Warnf("failed to set point", "error", err) + continue + } + } + } + + return pointCloud +} + +func (worldState *WorldStateStore) updatePointHeight(x, y int, currentHeight float64, timeOffset float64) float64 { + animHeightMM := worldState.generateTerrain(x, y, timeOffset) + targetHeightMM := currentHeight*0.8 + animHeightMM*0.2 + newHeightMM := math.Max(-1, math.Min(1, targetHeightMM)) + return newHeightMM +} + +func heightToColor(heightMM float64) color.NRGBA { + var r, g, b uint8 + + if heightMM < -0.5 { + // Deep Blue + r = 29 + g = 130 + b = 201 + } else if heightMM < 0 { + // Blue + r = 33 + g = 136 + b = 217 + } else if heightMM < 0.05 { + // Beige + r = 221 + g = 186 + b = 152 + } else if heightMM < 0.15 { + // Green + r = 76 + g = 186 + b = 80 + } else if heightMM < 0.4 { + // Dark Green + r = 70 + g = 178 + b = 74 + } else if heightMM < 0.65 { + // Brown + r = 111 + g = 105 + b = 96 + } else if heightMM < 0.75 { + // Dark Brown + r = 100 + g = 90 + b = 84 + } else if heightMM < 9 { + // Grey + r = 220 + g = 220 + b = 220 + } else { + // White + r = 255 + g = 255 + b = 255 + } + + return color.NRGBA{R: r, G: g, B: b, A: 255} +}