11import logging
22import threading
33import typing
4- from abc import ABC , abstractmethod
54from datetime import datetime
65from pathlib import Path
76from time import sleep
87
98import cv2
109import numpy as np
11- from pydantic import Field
12- from rcs .camera .interface import (
13- BaseCameraConfig ,
14- BaseCameraSetConfig ,
15- Frame ,
16- FrameSet ,
17- SimpleFrameRate ,
18- )
19-
20-
21- class HWCameraSetConfig (BaseCameraSetConfig ):
22- cameras : dict [str , BaseCameraConfig ] = Field (default = {})
23- warm_up_disposal_frames : int = 30 # frames
24- record_path : str = "camera_frames"
25- max_buffer_frames : int = 1000
26-
27-
28- # TODO(juelg): refactor camera thread into their own class, to avoid a base hardware camera set class
29- # TODO(juelg): add video recording
30- class BaseHardwareCameraSet (ABC ):
31- """This base class should have the ability to poll in a separate thread for all cameras and store them in a buffer.
32- Implements BaseCameraSet
10+ from rcs ._core .common import BaseCameraConfig
11+ from rcs .camera .interface import BaseCameraSet , Frame , FrameSet , SimpleFrameRate
12+
13+
14+ class HardwareCamera (typing .Protocol ):
15+ """Implementation of a hardware camera potentially a set of cameras of the same kind."""
16+
17+ def open (self ):
18+ """Should open the camera and prepare it for polling."""
19+
20+ def close (self ):
21+ """Should close the camera and release all resources."""
22+
23+ def config (self , camera_name : str ) -> BaseCameraConfig :
24+ """Should return the configuration object of the cameras."""
25+
26+ def poll_frame (self , camera_name : str ) -> Frame :
27+ """Should return the latest frame from the camera with the given name.
28+
29+ This method should be thread safe.
30+ """
31+
32+ @property
33+ def camera_names (self ) -> list [str ]:
34+ """Should return a list of the activated human readable names of the cameras."""
35+
36+
37+ class HardwareCameraSet (BaseCameraSet ):
38+ """This base class polls in a separate thread for all cameras and stores them in a buffer.
39+
40+ Cameras can consist of multiple cameras, e.g. RealSense cameras.
3341 """
3442
35- def __init__ (self ):
36- self ._buffer : list [FrameSet | None ] = [None for _ in range (self .config .max_buffer_frames )]
43+ def __init__ (self , cameras : list [HardwareCamera ], warm_up_disposal_frames : int = 30 , max_buffer_frames : int = 1000 ):
44+ self .cameras = cameras
45+ self .camera_dict , self .camera_names = self ._cameras_util ()
46+ self .name_to_identifier = self ._name_to_identifier ()
47+ self .frame_rate = self ._frames_rate ()
48+ self .rate_limiter = SimpleFrameRate (self .frame_rate )
49+
50+ self .warm_up_disposal_frames = warm_up_disposal_frames
51+ self .max_buffer_frames = max_buffer_frames
52+ self ._buffer : list [FrameSet | None ] = [None for _ in range (self .max_buffer_frames )]
3753 self ._buffer_lock = threading .Lock ()
3854 self .running = False
3955 self ._thread : threading .Thread | None = None
4056 self ._logger = logging .getLogger (__name__ )
4157 self ._next_ring_index = 0
4258 self ._buffer_len = 0
4359 self .writer : dict [str , cv2 .VideoWriter ] = {}
44- self .rate = SimpleFrameRate ()
60+
61+ def _name_to_identifier (self ) -> dict [str , str ]:
62+ """Returns a dictionary mapping the camera names to their identifiers."""
63+ name_to_id : dict [str , str ] = {}
64+ for camera in self .cameras :
65+ for name in camera .camera_names :
66+ name_to_id [name ] = camera .config (name ).identifier
67+ return name_to_id
68+
69+ def _frames_rate (self ) -> int :
70+ """Checks if all cameras have the same frame rate."""
71+ frame_rates = {camera .config (name ).frame_rate for camera in self .cameras for name in camera .camera_names }
72+ if len (frame_rates ) > 1 :
73+ msg = "All cameras must have the same frame rate. Different frame rates are not supported."
74+ raise ValueError (msg )
75+ if len (frame_rates ) == 0 :
76+ self ._logger .warning ("No camera found, empty polling with 1 fps." )
77+ return 1
78+ return frame_rates [0 ]
79+
80+ def _cameras_util (self ) -> tuple [dict [str , HardwareCamera ], list [str ]]:
81+ """Utility function to create a dictionary of cameras and a list of camera names."""
82+ camera_dict : dict [str , HardwareCamera ] = {}
83+ camera_names : list [str ] = []
84+ for camera in self .cameras :
85+ camera_names .extend (camera .camera_names )
86+ for name in camera .camera_names :
87+ assert name not in camera_dict , f"Camera name { name } not unique."
88+ camera_dict [name ] = camera
89+ return camera_dict , camera_names
4590
4691 def buffer_size (self ) -> int :
4792 return len (self ._buffer ) - self ._buffer .count (None )
@@ -64,7 +109,7 @@ def get_timestamp_frames(self, ts: datetime) -> FrameSet | None:
64109 # iterate through the buffer and find the closest timestamp
65110 with self ._buffer_lock :
66111 for i in range (self ._buffer_len ):
67- idx = (self ._next_ring_index - i - 1 ) % self .config . max_buffer_frames # iterate backwards
112+ idx = (self ._next_ring_index - i - 1 ) % self .max_buffer_frames # iterate backwards
68113 assert self ._buffer [idx ] is not None
69114 item : FrameSet = typing .cast (FrameSet , self ._buffer [idx ])
70115 assert item .avg_timestamp is not None
@@ -82,6 +127,8 @@ def stop(self):
82127 def close (self ):
83128 if self .running and self ._thread is not None :
84129 self .stop ()
130+ for camera in self .cameras :
131+ camera .close ()
85132 self .stop_video ()
86133
87134 def start (self , warm_up : bool = True ):
@@ -101,8 +148,8 @@ def record_video(self, path: Path, str_id: str):
101148 str (path / f"episode_{ str_id } _{ camera } .mp4" ),
102149 # migh require to install ffmpeg
103150 cv2 .VideoWriter_fourcc (* "mp4v" ), # type: ignore
104- self .config . frame_rate ,
105- (self .config .resolution_width , self .config .resolution_height ),
151+ self .frame_rate ,
152+ (self .config ( camera ) .resolution_width , self .config ( camera ) .resolution_height ),
106153 )
107154
108155 def recording_ongoing (self ) -> bool :
@@ -117,32 +164,35 @@ def stop_video(self):
117164 self .writer = {}
118165
119166 def warm_up (self ):
120- for _ in range (self .config . warm_up_disposal_frames ):
167+ for _ in range (self .warm_up_disposal_frames ):
121168 for camera_name in self .camera_names :
122- self ._poll_frame (camera_name )
123- self .rate ( self . config . frame_rate )
169+ self .poll_frame (camera_name )
170+ self .rate_limiter ( )
124171
125172 def polling_thread (self , warm_up : bool = True ):
173+ for camera in self .cameras :
174+ camera .open ()
126175 if warm_up :
127176 self .warm_up ()
128177 while self .running :
129178 frame_set = self .poll_frame_set ()
130179 # buffering
131180 with self ._buffer_lock :
132181 self ._buffer [self ._next_ring_index ] = frame_set
133- self ._next_ring_index = (self ._next_ring_index + 1 ) % self .config . max_buffer_frames
134- self ._buffer_len = max (self ._buffer_len + 1 , self .config . max_buffer_frames )
182+ self ._next_ring_index = (self ._next_ring_index + 1 ) % self .max_buffer_frames
183+ self ._buffer_len = max (self ._buffer_len + 1 , self .max_buffer_frames )
135184 # video recording
136185 for camera_key , writer in self .writer .items ():
137186 if frame_set is not None :
138187 writer .write (frame_set .frames [camera_key ].camera .color .data [:, :, ::- 1 ])
139- self .rate ( self . config . frame_rate )
188+ self .rate_limiter ( )
140189
141190 def poll_frame_set (self ) -> FrameSet :
142191 """Gather frames over all available cameras."""
143192 frames : dict [str , Frame ] = {}
144193 for camera_name in self .camera_names :
145- frame = self ._poll_frame (camera_name )
194+ # callback
195+ frame = self .poll_frame (camera_name )
146196 frames [camera_name ] = frame
147197 # filter none
148198 timestamps : list [float ] = [frame .avg_timestamp for frame in frames .values () if frame .avg_timestamp is not None ]
@@ -151,29 +201,14 @@ def poll_frame_set(self) -> FrameSet:
151201 def clear_buffer (self ):
152202 """Deletes all frames from the buffer."""
153203 with self ._buffer_lock :
154- self ._buffer = [None for _ in range (self .config . max_buffer_frames )]
204+ self ._buffer = [None for _ in range (self .max_buffer_frames )]
155205 self ._next_ring_index = 0
156206 self ._buffer_len = 0
157207 self .wait_for_frames ()
158208
159- @property
160- @abstractmethod
161- def config (self ) -> HWCameraSetConfig :
162- """Should return the configuration object of the cameras."""
163-
164- @abstractmethod
165- def _poll_frame (self , camera_name : str ) -> Frame :
166- """Should return the latest frame from the camera with the given name.
167-
168- This method should be thread safe.
169- """
170-
171- @property
172- def camera_names (self ) -> list [str ]:
173- """Should return a list of the activated human readable names of the cameras."""
174- return list (self .config .cameras )
209+ def config (self , camera_name : str ) -> BaseCameraConfig :
210+ """Returns the configuration object of the cameras."""
211+ return self .camera_dict [camera_name ].config (camera_name )
175212
176- @property
177- def name_to_identifier (self ) -> dict [str , str ]:
178- # return {key: camera.identifier for key, camera in self._cfg.cameras.items()}
179- return self .config .name_to_identifier
213+ def poll_frame (self , camera_name : str ) -> Frame :
214+ return self .camera_dict [camera_name ].poll_frame (camera_name )
0 commit comments