From 1b2f372777f7f8ccdbb2809d982e8b064d26d6a8 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Tue, 7 Oct 2025 17:11:09 -0700 Subject: [PATCH 01/37] got it sort of working --- sample_scripts/video_to_rtsp.py | 115 +++++++++++------ sample_scripts/view_rtsp_stream.py | 77 +++++++++++ src/framegrab/rtsp_server.py | 197 +++++++++++++++++++++-------- 3 files changed, 294 insertions(+), 95 deletions(-) create mode 100644 sample_scripts/view_rtsp_stream.py diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 1de83cd..3271da5 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -1,55 +1,92 @@ import argparse +import logging +import time from framegrab import FrameGrabber -from framegrab.config import FileStreamFrameGrabberConfig, GenericUSBFrameGrabberConfig +from framegrab.config import FileStreamFrameGrabberConfig from framegrab.rtsp_server import RTSPServer import cv2 import numpy as np -import time -def main(): - parser = argparse.ArgumentParser(description='Stream a video file via RTSP') - parser.add_argument('video_path', help='Path to the video file to stream') +# Configure logging to show INFO level messages +logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s') + +logger = logging.getLogger(__name__) + +SMALL_FRAME_WIDTH_MAX = 300 +SMALL_FRAME_HEIGHT_MAX = 200 + +class VideoToRTSPSampleApp: + def __init__(self, video_paths: list[str], port: int): + + self.video_paths = video_paths + self.port = port + + self.server = RTSPServer(port=port) + + self.grabbers = [] + for n, video_path in enumerate(video_paths): + # Connect to the grabber + config = FileStreamFrameGrabberConfig(filename=video_path) + grabber = FrameGrabber.create_grabber(config) + self.grabbers.append(grabber) + + # Determine the resolution of the full-size stream + test_frame = grabber.grab() + height, width, _ = test_frame.shape + + # Determine the FPS of the video + fps = grabber.get_fps() + + # Reset to beginning after test frame + grabber.seek_to_beginning() + + def get_frame_callback(grabber: FrameGrabber = grabber, video_path: str = video_path) -> np.ndarray: + try: + return grabber.grab() + except RuntimeWarning: + last_frame_read_number = grabber.get_last_frame_read_number() + logger.info(f'Got to end of {video_path}. Read {last_frame_read_number + 1} frames. Restarting from the beginning of the video...') + grabber.seek_to_beginning() + return grabber.grab() + + self.server.create_stream(get_frame_callback, width=width, height=height, fps=fps, mount_point=f'/stream{n}') + + def list_rtsp_urls(self) -> list[str]: + return self.server.list_rtsp_urls() + + def run(self) -> None: + self.server.start() + + def stop(self) -> None: + self.server.stop() + + for g in self.grabbers: + g.release() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Stream multiple video files via RTSP') + parser.add_argument('video_paths', nargs='+', help='Paths to video files to stream (one or more)') parser.add_argument('--port', type=int, default=8554, help='RTSP server port (default: 8554)') args = parser.parse_args() - # Connect to the grabber - config = FileStreamFrameGrabberConfig(filename=args.video_path) - grabber = FrameGrabber.create_grabber(config) - - # Determine the resolution of the video - test_frame = grabber.grab() - height, width, _ = test_frame.shape - - # Determine the FPS of the video - fps = grabber.get_fps() - - # Reset to beginning after test frame - grabber.seek_to_beginning() - - def get_frame_callback() -> np.ndarray: - try: - return grabber.grab() - except RuntimeWarning: - last_frame_read_number = grabber.get_last_frame_read_number() - print(f'Got to end of file. Read {last_frame_read_number + 1} frames. Seeking back to the beginning of the video...') - grabber.seek_to_beginning() - print(f'Returned to the beginning of the file. Continuing to read the video...') - return grabber.grab() + app = VideoToRTSPSampleApp(args.video_paths, args.port) try: - with RTSPServer(get_frame_callback, width=width, height=height, fps=fps, port=args.port) as server: - print(server) - print("Press Ctrl+C to stop...") + app.run() + logger.info(f'RTSP Server startd on port {app.port}') + + rtsp_urls = app.list_rtsp_urls() + for url, path in zip(rtsp_urls, app.video_paths): + logger.info(f'{path} available at {url}') + logger.info("Press Ctrl+C to stop...") + + # Keep the program running + while True: + time.sleep(1) - while True: - time.sleep(1) # Keep alive, wake up periodically to check for KeyboardInterrupt - except KeyboardInterrupt: - print("\nShutting down gracefully...") + logger.info("Shutting down gracefully...") finally: - grabber.release() - -if __name__ == "__main__": - main() \ No newline at end of file + app.stop() \ No newline at end of file diff --git a/sample_scripts/view_rtsp_stream.py b/sample_scripts/view_rtsp_stream.py new file mode 100644 index 0000000..ba7aaf3 --- /dev/null +++ b/sample_scripts/view_rtsp_stream.py @@ -0,0 +1,77 @@ +import argparse +from framegrab import FrameGrabber +from framegrab.config import RTSPFrameGrabberConfig + +import cv2 +import numpy as np + +import time + +def resize_frame(frame: np.ndarray, max_width: int = None, max_height: int = None) -> np.ndarray: + """ + Resizes an image to fit within a given height and/or width, without changing the aspect ratio. + + Args: + frame: Input image as numpy array + max_width: Maximum width (optional) + max_height: Maximum height (optional) + + Returns: + Resized image that fits within the specified dimensions + """ + if max_width is None and max_height is None: + return frame + + height, width = frame.shape[:2] + + # Calculate scaling factors + scale_w = scale_h = 1.0 + + if max_width is not None and width > max_width: + scale_w = max_width / width + + if max_height is not None and height > max_height: + scale_h = max_height / height + + # Use the smaller scaling factor to ensure image fits within both dimensions + scale = min(scale_w, scale_h) + + # Only resize if scaling is needed + if scale < 1.0: + new_width = int(width * scale) + new_height = int(height * scale) + return cv2.resize(frame, (new_width, new_height)) + + return frame + +def main(): + parser = argparse.ArgumentParser(description='Stream RTSP video using framegrab') + parser.add_argument('rtsp_url', help='RTSP URL to stream (e.g., rtsp://localhost:8554/stream_fullsize)') + args = parser.parse_args() + + config = RTSPFrameGrabberConfig(rtsp_url=args.rtsp_url) + t1 = time.time() + grabber = FrameGrabber.create_grabber(config) + t2 = time.time() + elapsed_time = t2 - t1 + print(f'Created grabber in {elapsed_time:.2f} seconds.') + + print(f"Streaming from: {args.rtsp_url}") + print("Press 'q' to quit") + + try: + while True: + frame = grabber.grab() + resized_frame = resize_frame(frame, 640, 480) + cv2.imshow(f'Streaming {args.rtsp_url}', resized_frame) + key = cv2.waitKey(30) + if key == ord('q'): + break + except KeyboardInterrupt: + print("\nStopping...") + finally: + grabber.release() + cv2.destroyAllWindows() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index d9cfaef..3ad1e15 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -2,7 +2,7 @@ import platform import threading import time -from typing import Callable +from typing import Callable, Dict, List import cv2 import numpy as np @@ -25,29 +25,28 @@ logger = logging.getLogger(__name__) +class Stream: + """Represents a single RTSP stream.""" + + def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, + mount_point: str, fps: int = 30): + self.callback = callback + self.width = width + self.height = height + self.mount_point = mount_point + self.fps = int(fps) + self.frame_count = 0 + + class RTSPServer: - """Simple RTSP server that streams frames via callback.""" - - def __init__( - self, - callback: Callable[[], np.ndarray], - width: int, - height: int, - port: int = 8554, - mount_point: str = "/stream", - fps: int = 30, - ): + """RTSP server that supports multiple streams.""" + + def __init__(self, port: int = 8554): """Initialize RTSP server. Args: - callback: Function that returns a frame when called - width: Frame width (required) - height: Frame height (required) port: RTSP server port (default: 8554) - mount_point: RTSP mount point (default: /stream) - fps: Target FPS for RTSP stream (default: 30) """ - system = platform.system() if system == "Windows": raise RuntimeError( @@ -60,37 +59,62 @@ def __init__( "RTSPServer has limited support on macOS. " "You may need to install GStreamer via Homebrew: " ) - _ = gi, cv2, GLib, Gst, GstRtspServer - - self.callback = callback self.port = port - self.mount_point = mount_point - self.fps = int(fps) - self.width = width - self.height = height - - self.frame_count = 0 - + self.streams: Dict[str, Stream] = {} + self._client_streams = {} # Track which streams each client is accessing + # GStreamer objects self._server = None self._loop = None self._loop_thread = None self._running = False - self.rtsp_url = f"rtsp://localhost:{self.port}{self.mount_point}" - def __str__(self) -> str: status = "running" if self._running else "stopped" - return f"RTSPServer({status}) - {self.rtsp_url}" + stream_count = len(self.streams) + return f"RTSPServer({status}) - port:{self.port}, streams:{stream_count}" def __repr__(self) -> str: - status = "running" if self._running else "stopped" - return f"RTSPServer({status}) - {self.rtsp_url}" + return self.__str__() + + def create_stream(self, callback: Callable[[], np.ndarray], width: int, height: int, + mount_point: str, fps: int = 30) -> None: + """Create a new stream. + + Args: + callback: Function that returns a frame when called + width: Frame width + height: Frame height + mount_point: RTSP mount point (e.g., '/stream0') + fps: Target FPS for stream (default: 30) + """ + if mount_point in self.streams: + raise ValueError(f"Stream with mount point '{mount_point}' already exists") + + self.streams[mount_point] = Stream(callback, width, height, mount_point, fps) + + def list_streams(self) -> List[str]: + """List all stream mount points.""" + return list(self.streams.keys()) + + def list_rtsp_urls(self) -> List[str]: + """Get a list of RTSP URLs for all streams.""" + return [f"rtsp://localhost:{self.port}{mount_point}" for mount_point in self.streams.keys()] + + def remove_stream(self, mount_point: str) -> None: + """Remove a stream.""" + if mount_point not in self.streams: + raise ValueError(f"Stream with mount point '{mount_point}' does not exist") + + del self.streams[mount_point] def start(self) -> None: """Start the RTSP server in a background thread.""" if self._running: return + + if not self.streams: + raise RuntimeError("No streams created. Call create_stream() first.") self._running = True self._loop_thread = threading.Thread(target=self._run_server, daemon=True) @@ -99,8 +123,6 @@ def start(self) -> None: # Give server time to start time.sleep(0.5) - logger.info(f"RTSP server started: {self.rtsp_url}") - def stop(self) -> None: """Stop the RTSP server.""" if not self._running: @@ -111,6 +133,8 @@ def stop(self) -> None: self._loop.quit() if self._loop_thread: self._loop_thread.join(timeout=2.0) + + self.streams.clear() def _run_server(self) -> None: """Run the GStreamer RTSP server main loop.""" @@ -119,11 +143,16 @@ def _run_server(self) -> None: self._server = GstRtspServer.RTSPServer() self._server.set_service(str(self.port)) - factory = self._create_media_factory() - factory.set_shared(True) + # Set up client connection callback + self._server.connect("client-connected", self._on_client_connected) mount_points = self._server.get_mount_points() - mount_points.add_factory(self.mount_point, factory) + + # Create a factory for each stream + for stream in self.streams.values(): + factory = self._create_media_factory(stream) + factory.set_shared(True) + mount_points.add_factory(stream.mount_point, factory) self._server.attach(None) @@ -133,19 +162,21 @@ def _run_server(self) -> None: finally: self._running = False - def _create_media_factory(self): - """Create the GStreamer media factory.""" + def _create_media_factory(self, stream: Stream): + """Create the GStreamer media factory for a specific stream.""" class RTSPMediaFactory(GstRtspServer.RTSPMediaFactory): - def __init__(self, rtsp_server): + def __init__(self, stream, rtsp_server): super().__init__() + self.stream = stream self.rtsp_server = rtsp_server + self.set_shared(False) def do_create_element(self, url): pipeline = ( f"appsrc name=source is-live=true format=GST_FORMAT_TIME " - f"caps=video/x-raw,format=RGB,width={self.rtsp_server.width}," - f"height={self.rtsp_server.height},framerate={self.rtsp_server.fps}/1 " + f"caps=video/x-raw,format=RGB,width={self.stream.width}," + f"height={self.stream.height},framerate={self.stream.fps}/1 " f"! videoconvert ! video/x-raw,format=I420 " f"! x264enc speed-preset=ultrafast tune=zerolatency " f"! rtph264pay name=pay0 pt=96" @@ -155,21 +186,75 @@ def do_create_element(self, url): def do_configure(self, rtsp_media): appsrc = rtsp_media.get_element().get_child_by_name("source") appsrc.connect("need-data", self.on_need_data) + + # Connect to cleanup signal to prevent resource leaks + rtsp_media.connect("unprepared", self._on_media_unprepared) + + # Try to find which client is accessing this stream + # This is a bit of a hack since GStreamer doesn't directly provide this info + client_info = None + for _, info in self.rtsp_server._client_streams.items(): + if not info['streams']: # This client hasn't accessed any streams yet + client_info = info + info['streams'].add(self.stream.mount_point) + break + + if client_info: + logger.info(f"RTSP Server on port {self.rtsp_server.port}: RTSP client {client_info['ip']} connected to {self.stream.mount_point}") + + def _on_media_unprepared(self, rtsp_media): + """Clean up resources when client disconnects to prevent leaks.""" + element = rtsp_media.get_element() + if element: + element.set_state(Gst.State.NULL) + # Force state change to complete + element.get_state(Gst.CLOCK_TIME_NONE) + # Unref to ensure complete cleanup + element.unref() def on_need_data(self, src, length): - frame = self.rtsp_server.callback() - frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - # Convert to GStreamer buffer - buf = Gst.Buffer.new_allocate(None, frame.nbytes, None) - buf.fill(0, frame.tobytes()) - buf.duration = Gst.SECOND // self.rtsp_server.fps - buf.pts = self.rtsp_server.frame_count * buf.duration - - self.rtsp_server.frame_count += 1 - src.emit("push-buffer", buf) - - return RTSPMediaFactory(self) + try: + frame = self.stream.callback() + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + # Convert to GStreamer buffer + buf = Gst.Buffer.new_allocate(None, frame.nbytes, None) + buf.fill(0, frame.tobytes()) + buf.duration = Gst.SECOND // self.stream.fps + buf.pts = self.stream.frame_count * buf.duration + self.stream.frame_count += 1 + src.emit("push-buffer", buf) + except Exception as e: + logger.error(f"Error in RTSP callback for {self.stream.mount_point}: {e}") + # Push an empty buffer to keep the stream alive + buf = Gst.Buffer.new_allocate(None, 0, None) + src.emit("push-buffer", buf) + + return RTSPMediaFactory(stream, self) + + def _on_client_connected(self, server, client): + """Callback when a client connects to the RTSP server.""" + connection = client.get_connection() + client_ip = connection.get_ip() + + # Track this client and their streams + self._client_streams[client] = { + 'ip': client_ip, + 'streams': set() + } + + # Connect to the client's 'closed' signal to detect disconnection + client.connect("closed", self._on_client_disconnected) + + def _on_client_disconnected(self, client): + """Callback when a client disconnects from the RTSP server.""" + client_info = self._client_streams.pop(client, None) + if client_info is None: + logger.warning(f"RTSP Server on port {self.port}: Client disconnected but was not tracked") + return + + streams_str = ', '.join(sorted(client_info['streams'])) if client_info['streams'] else 'no streams' + logger.info(f"RTSP Server on port {self.port}: RTSP client {client_info['ip']} disconnected from {streams_str}") def __enter__(self): """Context manager entry.""" From 611fe507416d3a5bcc15f1dba6b1a359570ed9ac Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Tue, 7 Oct 2025 17:22:17 -0700 Subject: [PATCH 02/37] fixed buffer issue --- src/framegrab/rtsp_server.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index 3ad1e15..8ba29c3 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -170,7 +170,7 @@ def __init__(self, stream, rtsp_server): super().__init__() self.stream = stream self.rtsp_server = rtsp_server - self.set_shared(False) + # self.set_shared(False) def do_create_element(self, url): pipeline = ( @@ -187,8 +187,10 @@ def do_configure(self, rtsp_media): appsrc = rtsp_media.get_element().get_child_by_name("source") appsrc.connect("need-data", self.on_need_data) - # Connect to cleanup signal to prevent resource leaks - rtsp_media.connect("unprepared", self._on_media_unprepared) + # Reset frame count for each new client connection. + # Without this, new clients inherit accumulated frame count from previous clients, + # causing PTS overflow and progressive connection slowdown due to buffer accumulation. + self.stream.frame_count = 0 # Try to find which client is accessing this stream # This is a bit of a hack since GStreamer doesn't directly provide this info @@ -202,16 +204,6 @@ def do_configure(self, rtsp_media): if client_info: logger.info(f"RTSP Server on port {self.rtsp_server.port}: RTSP client {client_info['ip']} connected to {self.stream.mount_point}") - def _on_media_unprepared(self, rtsp_media): - """Clean up resources when client disconnects to prevent leaks.""" - element = rtsp_media.get_element() - if element: - element.set_state(Gst.State.NULL) - # Force state change to complete - element.get_state(Gst.CLOCK_TIME_NONE) - # Unref to ensure complete cleanup - element.unref() - def on_need_data(self, src, length): try: frame = self.stream.callback() From e77a30137e0c7c7432eb4a7b14122ceae9515235 Mon Sep 17 00:00:00 2001 From: Auto-format Bot Date: Wed, 8 Oct 2025 00:24:53 +0000 Subject: [PATCH 03/37] Automatically reformatting code with black and isort --- src/framegrab/rtsp_server.py | 53 ++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index 8ba29c3..bfb1ea3 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -27,9 +27,8 @@ class Stream: """Represents a single RTSP stream.""" - - def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, - mount_point: str, fps: int = 30): + + def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int = 30): self.callback = callback self.width = width self.height = height @@ -62,7 +61,7 @@ def __init__(self, port: int = 8554): self.port = port self.streams: Dict[str, Stream] = {} self._client_streams = {} # Track which streams each client is accessing - + # GStreamer objects self._server = None self._loop = None @@ -77,20 +76,21 @@ def __str__(self) -> str: def __repr__(self) -> str: return self.__str__() - def create_stream(self, callback: Callable[[], np.ndarray], width: int, height: int, - mount_point: str, fps: int = 30) -> None: + def create_stream( + self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int = 30 + ) -> None: """Create a new stream. - + Args: callback: Function that returns a frame when called width: Frame width - height: Frame height + height: Frame height mount_point: RTSP mount point (e.g., '/stream0') fps: Target FPS for stream (default: 30) """ if mount_point in self.streams: raise ValueError(f"Stream with mount point '{mount_point}' already exists") - + self.streams[mount_point] = Stream(callback, width, height, mount_point, fps) def list_streams(self) -> List[str]: @@ -105,14 +105,14 @@ def remove_stream(self, mount_point: str) -> None: """Remove a stream.""" if mount_point not in self.streams: raise ValueError(f"Stream with mount point '{mount_point}' does not exist") - + del self.streams[mount_point] def start(self) -> None: """Start the RTSP server in a background thread.""" if self._running: return - + if not self.streams: raise RuntimeError("No streams created. Call create_stream() first.") @@ -133,7 +133,7 @@ def stop(self) -> None: self._loop.quit() if self._loop_thread: self._loop_thread.join(timeout=2.0) - + self.streams.clear() def _run_server(self) -> None: @@ -147,7 +147,7 @@ def _run_server(self) -> None: self._server.connect("client-connected", self._on_client_connected) mount_points = self._server.get_mount_points() - + # Create a factory for each stream for stream in self.streams.values(): factory = self._create_media_factory(stream) @@ -186,23 +186,25 @@ def do_create_element(self, url): def do_configure(self, rtsp_media): appsrc = rtsp_media.get_element().get_child_by_name("source") appsrc.connect("need-data", self.on_need_data) - + # Reset frame count for each new client connection. # Without this, new clients inherit accumulated frame count from previous clients, # causing PTS overflow and progressive connection slowdown due to buffer accumulation. self.stream.frame_count = 0 - + # Try to find which client is accessing this stream # This is a bit of a hack since GStreamer doesn't directly provide this info client_info = None for _, info in self.rtsp_server._client_streams.items(): - if not info['streams']: # This client hasn't accessed any streams yet + if not info["streams"]: # This client hasn't accessed any streams yet client_info = info - info['streams'].add(self.stream.mount_point) + info["streams"].add(self.stream.mount_point) break - + if client_info: - logger.info(f"RTSP Server on port {self.rtsp_server.port}: RTSP client {client_info['ip']} connected to {self.stream.mount_point}") + logger.info( + f"RTSP Server on port {self.rtsp_server.port}: RTSP client {client_info['ip']} connected to {self.stream.mount_point}" + ) def on_need_data(self, src, length): try: @@ -228,13 +230,10 @@ def _on_client_connected(self, server, client): """Callback when a client connects to the RTSP server.""" connection = client.get_connection() client_ip = connection.get_ip() - + # Track this client and their streams - self._client_streams[client] = { - 'ip': client_ip, - 'streams': set() - } - + self._client_streams[client] = {"ip": client_ip, "streams": set()} + # Connect to the client's 'closed' signal to detect disconnection client.connect("closed", self._on_client_disconnected) @@ -244,8 +243,8 @@ def _on_client_disconnected(self, client): if client_info is None: logger.warning(f"RTSP Server on port {self.port}: Client disconnected but was not tracked") return - - streams_str = ', '.join(sorted(client_info['streams'])) if client_info['streams'] else 'no streams' + + streams_str = ", ".join(sorted(client_info["streams"])) if client_info["streams"] else "no streams" logger.info(f"RTSP Server on port {self.port}: RTSP client {client_info['ip']} disconnected from {streams_str}") def __enter__(self): From e81678a5f4388874248bdd908c94c4a57058f447 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Tue, 7 Oct 2025 17:34:43 -0700 Subject: [PATCH 04/37] fixing some typos --- sample_scripts/video_to_rtsp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 3271da5..654cc2c 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -32,7 +32,7 @@ def __init__(self, video_paths: list[str], port: int): grabber = FrameGrabber.create_grabber(config) self.grabbers.append(grabber) - # Determine the resolution of the full-size stream + # Determine the resolution of the video test_frame = grabber.grab() height, width, _ = test_frame.shape @@ -75,7 +75,7 @@ def stop(self) -> None: try: app.run() - logger.info(f'RTSP Server startd on port {app.port}') + logger.info(f'RTSP Server started on port {app.port}') rtsp_urls = app.list_rtsp_urls() for url, path in zip(rtsp_urls, app.video_paths): From a3dd9e0c58df40e5215fd79c65535245f7085b3f Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Tue, 7 Oct 2025 17:36:31 -0700 Subject: [PATCH 05/37] trivial change --- sample_scripts/video_to_rtsp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 654cc2c..45babe2 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -47,7 +47,7 @@ def get_frame_callback(grabber: FrameGrabber = grabber, video_path: str = video_ return grabber.grab() except RuntimeWarning: last_frame_read_number = grabber.get_last_frame_read_number() - logger.info(f'Got to end of {video_path}. Read {last_frame_read_number + 1} frames. Restarting from the beginning of the video...') + logger.info(f'Reached the end of {video_path}. Read {last_frame_read_number + 1} frames. Restarting from the beginning of the video...') grabber.seek_to_beginning() return grabber.grab() From 51acd36f6995cfc96ecf364cba838b3283e821ef Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Wed, 8 Oct 2025 17:41:36 -0700 Subject: [PATCH 06/37] refactoring to properly handle multiple clients --- sample_scripts/video_to_rtsp.py | 40 +++++++++++++++++++++--------- sample_scripts/view_rtsp_stream.py | 5 ++-- src/framegrab/grabber.py | 3 ++- src/framegrab/rtsp_server.py | 32 ++++++++---------------- 4 files changed, 43 insertions(+), 37 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 45babe2..7963360 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -1,35 +1,37 @@ import argparse import logging import time -from framegrab import FrameGrabber +from framegrab.grabber import FileStreamFrameGrabber from framegrab.config import FileStreamFrameGrabberConfig from framegrab.rtsp_server import RTSPServer -import cv2 import numpy as np -# Configure logging to show INFO level messages +import threading + +from cachetools import TTLCache, cached + + logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s') logger = logging.getLogger(__name__) -SMALL_FRAME_WIDTH_MAX = 300 -SMALL_FRAME_HEIGHT_MAX = 200 - class VideoToRTSPSampleApp: + frame_cache = {} def __init__(self, video_paths: list[str], port: int): self.video_paths = video_paths self.port = port self.server = RTSPServer(port=port) + self.grabbers = [] for n, video_path in enumerate(video_paths): # Connect to the grabber config = FileStreamFrameGrabberConfig(filename=video_path) - grabber = FrameGrabber.create_grabber(config) + grabber = FileStreamFrameGrabber(config) self.grabbers.append(grabber) # Determine the resolution of the video @@ -38,20 +40,34 @@ def __init__(self, video_paths: list[str], port: int): # Determine the FPS of the video fps = grabber.get_fps() + print(fps) - # Reset to beginning after test frame + # Reset to beginning after test frame so that streaming starts from the beginning of the video grabber.seek_to_beginning() - def get_frame_callback(grabber: FrameGrabber = grabber, video_path: str = video_path) -> np.ndarray: + # frame_cache = TTLCache(maxsize=1, ttl=1/fps * 0.9) + # print(1/fps) + start_time = None + # @cached(frame_cache) + def get_frame_callback( + grabber: FileStreamFrameGrabber = grabber, + video_path: str = video_path, + ) -> np.ndarray: + nonlocal start_time + if start_time is None: + start_time = time.time() try: return grabber.grab() except RuntimeWarning: + elapsed_time = time.time() - start_time last_frame_read_number = grabber.get_last_frame_read_number() - logger.info(f'Reached the end of {video_path}. Read {last_frame_read_number + 1} frames. Restarting from the beginning of the video...') + logger.info(f'Reached the end of {video_path}. Read {last_frame_read_number + 1} frames in {elapsed_time:.2f} seconds. Restarting from the beginning of the video...') + start_time = time.time() grabber.seek_to_beginning() return grabber.grab() - self.server.create_stream(get_frame_callback, width=width, height=height, fps=fps, mount_point=f'/stream{n}') + mount_point = f'/stream{n}' + self.server.create_stream(get_frame_callback, width=width, height=height, fps=fps, mount_point=mount_point) def list_rtsp_urls(self) -> list[str]: return self.server.list_rtsp_urls() @@ -68,7 +84,7 @@ def stop(self) -> None: if __name__ == "__main__": parser = argparse.ArgumentParser(description='Stream multiple video files via RTSP') parser.add_argument('video_paths', nargs='+', help='Paths to video files to stream (one or more)') - parser.add_argument('--port', type=int, default=8554, help='RTSP server port (default: 8554)') + parser.add_argument('--port', type=int, default=8554, help='RTSP server port') args = parser.parse_args() app = VideoToRTSPSampleApp(args.video_paths, args.port) diff --git a/sample_scripts/view_rtsp_stream.py b/sample_scripts/view_rtsp_stream.py index ba7aaf3..cfb0e6d 100644 --- a/sample_scripts/view_rtsp_stream.py +++ b/sample_scripts/view_rtsp_stream.py @@ -59,11 +59,12 @@ def main(): print(f"Streaming from: {args.rtsp_url}") print("Press 'q' to quit") + start_time = time.time() try: while True: frame = grabber.grab() - resized_frame = resize_frame(frame, 640, 480) - cv2.imshow(f'Streaming {args.rtsp_url}', resized_frame) + resized_frame = resize_frame(frame, 640, 480) # get a smaller frame so it's easier to view + cv2.imshow(f'Streaming {args.rtsp_url} {start_time}', resized_frame) key = cv2.waitKey(30) if key == ord('q'): break diff --git a/src/framegrab/grabber.py b/src/framegrab/grabber.py index 8ff5835..87eceeb 100644 --- a/src/framegrab/grabber.py +++ b/src/framegrab/grabber.py @@ -1382,7 +1382,7 @@ def _initialize_grabber_implementation(self): raise ValueError(f"Could not read first frame of file {self.config.filename}. Is it a valid video file?") # Reset frame position back to the first frame after validation - self.capture.set(cv2.CAP_PROP_POS_FRAMES, 0) + self.seek_to_beginning() self.fps_source = round(self.capture.get(cv2.CAP_PROP_FPS), 2) if self.fps_source <= 0.1: @@ -1444,6 +1444,7 @@ def seek_to_frame(self, frame_number: int) -> None: frame_number: Frame number to seek to (0-based) """ if frame_number < 0: + # OpenCV fails silently when you try this, so we raise an exception raise ValueError(f"Frame number must be non-negative, got {frame_number}") self.capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index bfb1ea3..130bdda 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -28,14 +28,12 @@ class Stream: """Represents a single RTSP stream.""" - def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int = 30): + def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int): self.callback = callback self.width = width self.height = height self.mount_point = mount_point self.fps = int(fps) - self.frame_count = 0 - class RTSPServer: """RTSP server that supports multiple streams.""" @@ -58,7 +56,7 @@ def __init__(self, port: int = 8554): "RTSPServer has limited support on macOS. " "You may need to install GStreamer via Homebrew: " ) - self.port = port + self.port = int(port) self.streams: Dict[str, Stream] = {} self._client_streams = {} # Track which streams each client is accessing @@ -77,7 +75,7 @@ def __repr__(self) -> str: return self.__str__() def create_stream( - self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int = 30 + self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int ) -> None: """Create a new stream. @@ -86,7 +84,7 @@ def create_stream( width: Frame width height: Frame height mount_point: RTSP mount point (e.g., '/stream0') - fps: Target FPS for stream (default: 30) + fps: Target FPS for stream """ if mount_point in self.streams: raise ValueError(f"Stream with mount point '{mount_point}' already exists") @@ -111,7 +109,7 @@ def remove_stream(self, mount_point: str) -> None: def start(self) -> None: """Start the RTSP server in a background thread.""" if self._running: - return + raise RuntimeError("RTSP server already started.") if not self.streams: raise RuntimeError("No streams created. Call create_stream() first.") @@ -120,9 +118,6 @@ def start(self) -> None: self._loop_thread = threading.Thread(target=self._run_server, daemon=True) self._loop_thread.start() - # Give server time to start - time.sleep(0.5) - def stop(self) -> None: """Stop the RTSP server.""" if not self._running: @@ -151,7 +146,7 @@ def _run_server(self) -> None: # Create a factory for each stream for stream in self.streams.values(): factory = self._create_media_factory(stream) - factory.set_shared(True) + factory.set_shared(False) mount_points.add_factory(stream.mount_point, factory) self._server.attach(None) @@ -170,7 +165,6 @@ def __init__(self, stream, rtsp_server): super().__init__() self.stream = stream self.rtsp_server = rtsp_server - # self.set_shared(False) def do_create_element(self, url): pipeline = ( @@ -185,13 +179,10 @@ def do_create_element(self, url): def do_configure(self, rtsp_media): appsrc = rtsp_media.get_element().get_child_by_name("source") + appsrc.set_property("format", Gst.Format.TIME) + appsrc.set_property("do-timestamp", True) appsrc.connect("need-data", self.on_need_data) - # Reset frame count for each new client connection. - # Without this, new clients inherit accumulated frame count from previous clients, - # causing PTS overflow and progressive connection slowdown due to buffer accumulation. - self.stream.frame_count = 0 - # Try to find which client is accessing this stream # This is a bit of a hack since GStreamer doesn't directly provide this info client_info = None @@ -200,7 +191,7 @@ def do_configure(self, rtsp_media): client_info = info info["streams"].add(self.stream.mount_point) break - + if client_info: logger.info( f"RTSP Server on port {self.rtsp_server.port}: RTSP client {client_info['ip']} connected to {self.stream.mount_point}" @@ -215,8 +206,6 @@ def on_need_data(self, src, length): buf = Gst.Buffer.new_allocate(None, frame.nbytes, None) buf.fill(0, frame.tobytes()) buf.duration = Gst.SECOND // self.stream.fps - buf.pts = self.stream.frame_count * buf.duration - self.stream.frame_count += 1 src.emit("push-buffer", buf) except Exception as e: logger.error(f"Error in RTSP callback for {self.stream.mount_point}: {e}") @@ -228,10 +217,9 @@ def on_need_data(self, src, length): def _on_client_connected(self, server, client): """Callback when a client connects to the RTSP server.""" + # Track this client and their streams connection = client.get_connection() client_ip = connection.get_ip() - - # Track this client and their streams self._client_streams[client] = {"ip": client_ip, "streams": set()} # Connect to the client's 'closed' signal to detect disconnection From a250a9db62c8684c220e6bc359a05baf9f51e4bc Mon Sep 17 00:00:00 2001 From: Auto-format Bot Date: Thu, 9 Oct 2025 00:42:04 +0000 Subject: [PATCH 07/37] Automatically reformatting code with black and isort --- src/framegrab/rtsp_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index 130bdda..bf4c0e0 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -35,6 +35,7 @@ def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, self.mount_point = mount_point self.fps = int(fps) + class RTSPServer: """RTSP server that supports multiple streams.""" @@ -191,7 +192,7 @@ def do_configure(self, rtsp_media): client_info = info info["streams"].add(self.stream.mount_point) break - + if client_info: logger.info( f"RTSP Server on port {self.rtsp_server.port}: RTSP client {client_info['ip']} connected to {self.stream.mount_point}" From bff5fe3034a4d9bcc121fb27da02278248cb78ed Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Thu, 9 Oct 2025 10:02:34 -0700 Subject: [PATCH 08/37] removing an unneeded import --- sample_scripts/video_to_rtsp.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 7963360..336ec16 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -10,8 +10,6 @@ import threading -from cachetools import TTLCache, cached - logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s') From 1e0f75b7598ec3e2bf7458208eed98b774850c66 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Thu, 9 Oct 2025 13:17:27 -0700 Subject: [PATCH 09/37] got a new approach working --- src/framegrab/rtsp_server.py | 269 +++++++++++++++++------------------ 1 file changed, 133 insertions(+), 136 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index bf4c0e0..bd78554 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -2,14 +2,13 @@ import platform import threading import time -from typing import Callable, Dict, List +from typing import Callable, Dict, List, Tu ple import cv2 import numpy as np from .unavailable_module import UnavailableModuleOrObject -# Only import GStreamer modules if available try: import gi @@ -26,8 +25,6 @@ class Stream: - """Represents a single RTSP stream.""" - def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int): self.callback = callback self.width = width @@ -37,121 +34,71 @@ def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, class RTSPServer: - """RTSP server that supports multiple streams.""" - def __init__(self, port: int = 8554): - """Initialize RTSP server. - - Args: - port: RTSP server port (default: 8554) - """ system = platform.system() if system == "Windows": - raise RuntimeError( - "RTSPServer is not supported on Windows. " - "GStreamer RTSP server libraries are difficult to install on Windows. " - "Please use a Linux system, WSL2, or Docker container." - ) - elif system == "Darwin": # macOS - logger.warning( - "RTSPServer has limited support on macOS. " "You may need to install GStreamer via Homebrew: " - ) - + raise RuntimeError("RTSPServer not supported on Windows") self.port = int(port) self.streams: Dict[str, Stream] = {} - self._client_streams = {} # Track which streams each client is accessing - - # GStreamer objects + # mount_point -> {"clients": [ { "appsrc": ..., "frame_count": int }, ...], + # "clients_lock": Lock(), "producer": (thread, stop_evt) or None} + self._mounts: Dict[str, Dict] = {} + self._client_streams = {} self._server = None self._loop = None self._loop_thread = None self._running = False - def __str__(self) -> str: - status = "running" if self._running else "stopped" - stream_count = len(self.streams) - return f"RTSPServer({status}) - port:{self.port}, streams:{stream_count}" - - def __repr__(self) -> str: - return self.__str__() - - def create_stream( - self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int - ) -> None: - """Create a new stream. - - Args: - callback: Function that returns a frame when called - width: Frame width - height: Frame height - mount_point: RTSP mount point (e.g., '/stream0') - fps: Target FPS for stream - """ + def create_stream(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int): if mount_point in self.streams: - raise ValueError(f"Stream with mount point '{mount_point}' already exists") - + raise ValueError(f"Stream '{mount_point}' exists") self.streams[mount_point] = Stream(callback, width, height, mount_point, fps) - - def list_streams(self) -> List[str]: - """List all stream mount points.""" - return list(self.streams.keys()) + self._mounts[mount_point] = {"clients": [], "clients_lock": threading.Lock(), "producer": None} def list_rtsp_urls(self) -> List[str]: - """Get a list of RTSP URLs for all streams.""" - return [f"rtsp://localhost:{self.port}{mount_point}" for mount_point in self.streams.keys()] - - def remove_stream(self, mount_point: str) -> None: - """Remove a stream.""" - if mount_point not in self.streams: - raise ValueError(f"Stream with mount point '{mount_point}' does not exist") - - del self.streams[mount_point] + return [f"rtsp://localhost:{self.port}{m}" for m in self.streams.keys()] - def start(self) -> None: - """Start the RTSP server in a background thread.""" + def start(self): if self._running: - raise RuntimeError("RTSP server already started.") - + return if not self.streams: - raise RuntimeError("No streams created. Call create_stream() first.") - + raise RuntimeError("No streams created") self._running = True self._loop_thread = threading.Thread(target=self._run_server, daemon=True) self._loop_thread.start() + time.sleep(0.2) - def stop(self) -> None: - """Stop the RTSP server.""" + def stop(self): if not self._running: return - self._running = False + # stop producers + for mount in list(self._mounts.values()): + prod = mount.get("producer") + if prod: + thr, stop_evt = prod + stop_evt.set() + thr.join(timeout=1.0) + mount["producer"] = None if self._loop: self._loop.quit() if self._loop_thread: self._loop_thread.join(timeout=2.0) - self.streams.clear() + self._mounts.clear() + self._client_streams.clear() - def _run_server(self) -> None: - """Run the GStreamer RTSP server main loop.""" + def _run_server(self): Gst.init(None) - self._server = GstRtspServer.RTSPServer() self._server.set_service(str(self.port)) - - # Set up client connection callback self._server.connect("client-connected", self._on_client_connected) - mount_points = self._server.get_mount_points() - - # Create a factory for each stream - for stream in self.streams.values(): - factory = self._create_media_factory(stream) - factory.set_shared(False) - mount_points.add_factory(stream.mount_point, factory) - + for s in self.streams.values(): + f = self._create_media_factory(s) + f.set_shared(False) # per-client pipelines + mount_points.add_factory(s.mount_point, f) self._server.attach(None) - self._loop = GLib.MainLoop() try: self._loop.run() @@ -159,13 +106,11 @@ def _run_server(self) -> None: self._running = False def _create_media_factory(self, stream: Stream): - """Create the GStreamer media factory for a specific stream.""" - - class RTSPMediaFactory(GstRtspServer.RTSPMediaFactory): - def __init__(self, stream, rtsp_server): + class Factory(GstRtspServer.RTSPMediaFactory): + def __init__(self, stream, server): super().__init__() self.stream = stream - self.rtsp_server = rtsp_server + self.server = server def do_create_element(self, url): pipeline = ( @@ -180,67 +125,119 @@ def do_create_element(self, url): def do_configure(self, rtsp_media): appsrc = rtsp_media.get_element().get_child_by_name("source") + appsrc.set_property("is-live", True) appsrc.set_property("format", Gst.Format.TIME) - appsrc.set_property("do-timestamp", True) - appsrc.connect("need-data", self.on_need_data) - - # Try to find which client is accessing this stream - # This is a bit of a hack since GStreamer doesn't directly provide this info - client_info = None - for _, info in self.rtsp_server._client_streams.items(): - if not info["streams"]: # This client hasn't accessed any streams yet - client_info = info - info["streams"].add(self.stream.mount_point) - break - - if client_info: - logger.info( - f"RTSP Server on port {self.rtsp_server.port}: RTSP client {client_info['ip']} connected to {self.stream.mount_point}" - ) - - def on_need_data(self, src, length): + appsrc.set_property("do-timestamp", False) # we manage per-client PTS + + mount = self.server._mounts[self.stream.mount_point] + duration = int(Gst.SECOND / float(self.stream.fps)) + + client = {"appsrc": appsrc, "frame_count": 0} + with mount["clients_lock"]: + mount["clients"].append(client) + + # start producer if absent + if mount["producer"] is None: + stop_evt = threading.Event() + + def producer(): + period = 1.0 / float(self.stream.fps) + next_t = time.monotonic() + while not stop_evt.is_set(): + now = time.monotonic() + s = next_t - now + if s > 0: + time.sleep(s) + next_t += period + + try: + frame = self.stream.callback() + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + fb = frame.tobytes() + except Exception: + logger.exception("grab failed for %s", self.stream.mount_point) + fb = b"" + + with mount["clients_lock"]: + clients = list(mount["clients"]) + + for c in clients: + GLib.idle_add(push_to_client, c, fb, duration) + + with mount["clients_lock"]: + if not mount["clients"]: + break + return + + thr = threading.Thread(target=producer, daemon=True, name=f"prod-{self.stream.mount_point}") + mount["producer"] = (thr, stop_evt) + thr.start() + + def remove_client(): + with mount["clients_lock"]: + mount["clients"] = [c for c in mount["clients"] if c is not client] + if not mount["clients"] and mount["producer"]: + thr, stop_evt = mount["producer"] + stop_evt.set() + thr.join(timeout=1.0) + mount["producer"] = None + + # cleanup when media is unprepared try: - frame = self.stream.callback() - frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - # Convert to GStreamer buffer - buf = Gst.Buffer.new_allocate(None, frame.nbytes, None) - buf.fill(0, frame.tobytes()) - buf.duration = Gst.SECOND // self.stream.fps - src.emit("push-buffer", buf) - except Exception as e: - logger.error(f"Error in RTSP callback for {self.stream.mount_point}: {e}") - # Push an empty buffer to keep the stream alive - buf = Gst.Buffer.new_allocate(None, 0, None) - src.emit("push-buffer", buf) - - return RTSPMediaFactory(stream, self) + rtsp_media.connect("unprepared", lambda *a: remove_client()) + except Exception: + try: + rtsp_media.connect("notify::state", lambda *a: remove_client()) + except Exception: + pass + + def push_to_client(client_entry, frame_bytes: bytes, duration_ns: int): + app = client_entry.get("appsrc") + if app is None: + return False + try: + buf = Gst.Buffer.new_allocate(None, len(frame_bytes), None) + buf.fill(0, frame_bytes) + fc = client_entry.get("frame_count", 0) + buf.pts = int(fc * duration_ns) + buf.duration = int(duration_ns) + client_entry["frame_count"] = fc + 1 + app.emit("push-buffer", buf) + except Exception: + logger.exception("push failed; removing client") + # remove failing client + for mount in self._mounts.values(): + with mount["clients_lock"]: + if client_entry in mount["clients"]: + mount["clients"].remove(client_entry) + if not mount["clients"] and mount["producer"]: + thr, stop_evt = mount["producer"] + stop_evt.set() + thr.join(timeout=1.0) + mount["producer"] = None + break + return False + + return Factory(stream, self) def _on_client_connected(self, server, client): - """Callback when a client connects to the RTSP server.""" - # Track this client and their streams - connection = client.get_connection() - client_ip = connection.get_ip() - self._client_streams[client] = {"ip": client_ip, "streams": set()} + conn = client.get_connection() + ip = conn.get_ip() - # Connect to the client's 'closed' signal to detect disconnection + # TODO log info about the client that connected + + self._client_streams[client] = {"ip": ip, "streams": set()} + client.connect("closed", self._on_client_disconnected) def _on_client_disconnected(self, client): - """Callback when a client disconnects from the RTSP server.""" - client_info = self._client_streams.pop(client, None) - if client_info is None: - logger.warning(f"RTSP Server on port {self.port}: Client disconnected but was not tracked") - return + info = self._client_streams.pop(client, None) - streams_str = ", ".join(sorted(client_info["streams"])) if client_info["streams"] else "no streams" - logger.info(f"RTSP Server on port {self.port}: RTSP client {client_info['ip']} disconnected from {streams_str}") + # TODO log info about the client that disconnected def __enter__(self): - """Context manager entry.""" self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" self.stop() From 849514f45249b38f051b5f7776c992400a27f003 Mon Sep 17 00:00:00 2001 From: Auto-format Bot Date: Thu, 9 Oct 2025 20:17:55 +0000 Subject: [PATCH 10/37] Automatically reformatting code with black and isort --- src/framegrab/rtsp_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index bd78554..d80ece0 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -2,7 +2,7 @@ import platform import threading import time -from typing import Callable, Dict, List, Tu ple +from typing import Callable, Dict, List, Tu, ple import cv2 import numpy as np @@ -227,7 +227,7 @@ def _on_client_connected(self, server, client): # TODO log info about the client that connected self._client_streams[client] = {"ip": ip, "streams": set()} - + client.connect("closed", self._on_client_disconnected) def _on_client_disconnected(self, client): From d0883c4b7adfa353f36384beecfc04401950e456 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Thu, 9 Oct 2025 14:47:33 -0700 Subject: [PATCH 11/37] got it working better --- sample_scripts/video_to_rtsp.py | 6 -- src/framegrab/rtsp_server.py | 142 +++++++++++++++++++------------- 2 files changed, 86 insertions(+), 62 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 336ec16..d0919df 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -8,8 +8,6 @@ import numpy as np -import threading - logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s') @@ -38,15 +36,11 @@ def __init__(self, video_paths: list[str], port: int): # Determine the FPS of the video fps = grabber.get_fps() - print(fps) # Reset to beginning after test frame so that streaming starts from the beginning of the video grabber.seek_to_beginning() - # frame_cache = TTLCache(maxsize=1, ttl=1/fps * 0.9) - # print(1/fps) start_time = None - # @cached(frame_cache) def get_frame_callback( grabber: FileStreamFrameGrabber = grabber, video_path: str = video_path, diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index bd78554..ec4a03a 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -2,7 +2,8 @@ import platform import threading import time -from typing import Callable, Dict, List, Tu ple +from typing import Callable, Dict, List, Optional, Tuple, Any +from dataclasses import dataclass, field import cv2 import numpy as np @@ -24,45 +25,84 @@ logger = logging.getLogger(__name__) + +@dataclass +class ClientEntry: + """Per-client state for a single RTSP consumer. + + Holds the per-connection `appsrc` element and a monotonically + increasing `frame_count` used to compute buffer PTS/duration. + """ + appsrc: Any + frame_count: int = 0 + + +@dataclass +class MountState: + """Mutable runtime state for a single RTSP mount point. + + - `clients`: active consumers attached to this mount + - `clients_lock`: guards access to `clients` and `producer` + - `producer`: (thread, stop_event) driving frames for this mount, or None + """ + clients: List[ClientEntry] = field(default_factory=list) + clients_lock: threading.Lock = field(default_factory=threading.Lock) + # (thread, stop_event) when a producer is running, else None + producer: Optional[Tuple[threading.Thread, threading.Event]] = None + + class Stream: - def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int): + def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: float): self.callback = callback self.width = width self.height = height self.mount_point = mount_point - self.fps = int(fps) + self.fps = fps class RTSPServer: def __init__(self, port: int = 8554): system = platform.system() if system == "Windows": - raise RuntimeError("RTSPServer not supported on Windows") + raise RuntimeError( + "RTSPServer is not supported on Windows. " + "GStreamer RTSP server libraries are difficult to install on Windows. " + "Please use a Linux system, WSL2, or Docker container." + ) + elif system == "Darwin": # macOS + logger.warning( + "RTSPServer has limited support on macOS. " "You may need to install GStreamer via Homebrew: " + ) + self.port = int(port) self.streams: Dict[str, Stream] = {} - # mount_point -> {"clients": [ { "appsrc": ..., "frame_count": int }, ...], - # "clients_lock": Lock(), "producer": (thread, stop_evt) or None} - self._mounts: Dict[str, Dict] = {} - self._client_streams = {} + + # mount_point -> MountState + self._mounts: Dict[str, MountState] = {} self._server = None self._loop = None self._loop_thread = None self._running = False - def create_stream(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: int): + def create_stream(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: float): + if self._running: + raise RuntimeError("RTSPServer has already started. Streams can only be created prior to starting the server.") + if mount_point in self.streams: raise ValueError(f"Stream '{mount_point}' exists") self.streams[mount_point] = Stream(callback, width, height, mount_point, fps) - self._mounts[mount_point] = {"clients": [], "clients_lock": threading.Lock(), "producer": None} + self._mounts[mount_point] = MountState() def list_rtsp_urls(self) -> List[str]: return [f"rtsp://localhost:{self.port}{m}" for m in self.streams.keys()] def start(self): if self._running: - return + raise RuntimeError("RTSPServer is already running.") + if not self.streams: - raise RuntimeError("No streams created") + raise RuntimeError("No streams created. Please call `create_stream` first.") + self._running = True self._loop_thread = threading.Thread(target=self._run_server, daemon=True) self._loop_thread.start() @@ -74,19 +114,18 @@ def stop(self): self._running = False # stop producers for mount in list(self._mounts.values()): - prod = mount.get("producer") + prod = mount.producer if prod: thr, stop_evt = prod stop_evt.set() thr.join(timeout=1.0) - mount["producer"] = None + mount.producer = None if self._loop: self._loop.quit() if self._loop_thread: self._loop_thread.join(timeout=2.0) self.streams.clear() self._mounts.clear() - self._client_streams.clear() def _run_server(self): Gst.init(None) @@ -113,10 +152,11 @@ def __init__(self, stream, server): self.server = server def do_create_element(self, url): + fps_int = int(round(self.stream.fps, 0)) # Gstreamer wants an int here pipeline = ( f"appsrc name=source is-live=true format=GST_FORMAT_TIME " f"caps=video/x-raw,format=RGB,width={self.stream.width}," - f"height={self.stream.height},framerate={self.stream.fps}/1 " + f"height={self.stream.height},framerate={fps_int}/1 " f"! videoconvert ! video/x-raw,format=I420 " f"! x264enc speed-preset=ultrafast tune=zerolatency " f"! rtph264pay name=pay0 pt=96" @@ -132,12 +172,12 @@ def do_configure(self, rtsp_media): mount = self.server._mounts[self.stream.mount_point] duration = int(Gst.SECOND / float(self.stream.fps)) - client = {"appsrc": appsrc, "frame_count": 0} - with mount["clients_lock"]: - mount["clients"].append(client) + client = ClientEntry(appsrc=appsrc) + with mount.clients_lock: + mount.clients.append(client) # start producer if absent - if mount["producer"] is None: + if mount.producer is None: stop_evt = threading.Event() def producer(): @@ -158,82 +198,72 @@ def producer(): logger.exception("grab failed for %s", self.stream.mount_point) fb = b"" - with mount["clients_lock"]: - clients = list(mount["clients"]) + with mount.clients_lock: + clients = list(mount.clients) for c in clients: GLib.idle_add(push_to_client, c, fb, duration) - with mount["clients_lock"]: - if not mount["clients"]: + with mount.clients_lock: + if not mount.clients: break - return thr = threading.Thread(target=producer, daemon=True, name=f"prod-{self.stream.mount_point}") - mount["producer"] = (thr, stop_evt) + mount.producer = (thr, stop_evt) thr.start() def remove_client(): - with mount["clients_lock"]: - mount["clients"] = [c for c in mount["clients"] if c is not client] - if not mount["clients"] and mount["producer"]: - thr, stop_evt = mount["producer"] + with mount.clients_lock: + mount.clients = [c for c in mount.clients if c is not client] + if not mount.clients and mount.producer: + thr, stop_evt = mount.producer stop_evt.set() thr.join(timeout=1.0) - mount["producer"] = None + mount.producer = None # cleanup when media is unprepared - try: - rtsp_media.connect("unprepared", lambda *a: remove_client()) - except Exception: - try: - rtsp_media.connect("notify::state", lambda *a: remove_client()) - except Exception: - pass + rtsp_media.connect("unprepared", lambda *a: remove_client()) def push_to_client(client_entry, frame_bytes: bytes, duration_ns: int): - app = client_entry.get("appsrc") + app = client_entry.appsrc if app is None: - return False + return + try: buf = Gst.Buffer.new_allocate(None, len(frame_bytes), None) buf.fill(0, frame_bytes) - fc = client_entry.get("frame_count", 0) + fc = client_entry.frame_count buf.pts = int(fc * duration_ns) buf.duration = int(duration_ns) - client_entry["frame_count"] = fc + 1 + client_entry.frame_count = fc + 1 app.emit("push-buffer", buf) except Exception: logger.exception("push failed; removing client") # remove failing client for mount in self._mounts.values(): - with mount["clients_lock"]: - if client_entry in mount["clients"]: - mount["clients"].remove(client_entry) - if not mount["clients"] and mount["producer"]: - thr, stop_evt = mount["producer"] + with mount.clients_lock: + if client_entry in mount.clients: + mount.clients.remove(client_entry) + if not mount.clients and mount.producer: + thr, stop_evt = mount.producer stop_evt.set() thr.join(timeout=1.0) - mount["producer"] = None + mount.producer = None break - return False return Factory(stream, self) def _on_client_connected(self, server, client): conn = client.get_connection() ip = conn.get_ip() - - # TODO log info about the client that connected - - self._client_streams[client] = {"ip": ip, "streams": set()} + logger.info(f"RTSP client connected: ip={ip}") client.connect("closed", self._on_client_disconnected) def _on_client_disconnected(self, client): - info = self._client_streams.pop(client, None) - - # TODO log info about the client that disconnected + conn = client.get_connection() + ip = conn.get_ip() + logger.info(f"RTSP client disconnected: ip={ip}") def __enter__(self): self.start() From c700b8adb3b91ed279e9b787f8f5e1d2ae2d6db6 Mon Sep 17 00:00:00 2001 From: Auto-format Bot Date: Thu, 9 Oct 2025 21:52:12 +0000 Subject: [PATCH 12/37] Automatically reformatting code with black and isort --- src/framegrab/rtsp_server.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index ec4a03a..abec88c 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -2,8 +2,8 @@ import platform import threading import time -from typing import Callable, Dict, List, Optional, Tuple, Any from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional, Tuple import cv2 import numpy as np @@ -25,7 +25,6 @@ logger = logging.getLogger(__name__) - @dataclass class ClientEntry: """Per-client state for a single RTSP consumer. @@ -33,6 +32,7 @@ class ClientEntry: Holds the per-connection `appsrc` element and a monotonically increasing `frame_count` used to compute buffer PTS/duration. """ + appsrc: Any frame_count: int = 0 @@ -45,6 +45,7 @@ class MountState: - `clients_lock`: guards access to `clients` and `producer` - `producer`: (thread, stop_event) driving frames for this mount, or None """ + clients: List[ClientEntry] = field(default_factory=list) clients_lock: threading.Lock = field(default_factory=threading.Lock) # (thread, stop_event) when a producer is running, else None @@ -76,7 +77,7 @@ def __init__(self, port: int = 8554): self.port = int(port) self.streams: Dict[str, Stream] = {} - + # mount_point -> MountState self._mounts: Dict[str, MountState] = {} self._server = None @@ -86,7 +87,9 @@ def __init__(self, port: int = 8554): def create_stream(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: float): if self._running: - raise RuntimeError("RTSPServer has already started. Streams can only be created prior to starting the server.") + raise RuntimeError( + "RTSPServer has already started. Streams can only be created prior to starting the server." + ) if mount_point in self.streams: raise ValueError(f"Stream '{mount_point}' exists") @@ -102,7 +105,7 @@ def start(self): if not self.streams: raise RuntimeError("No streams created. Please call `create_stream` first.") - + self._running = True self._loop_thread = threading.Thread(target=self._run_server, daemon=True) self._loop_thread.start() @@ -152,7 +155,7 @@ def __init__(self, stream, server): self.server = server def do_create_element(self, url): - fps_int = int(round(self.stream.fps, 0)) # Gstreamer wants an int here + fps_int = int(round(self.stream.fps, 0)) # Gstreamer wants an int here pipeline = ( f"appsrc name=source is-live=true format=GST_FORMAT_TIME " f"caps=video/x-raw,format=RGB,width={self.stream.width}," @@ -257,7 +260,7 @@ def _on_client_connected(self, server, client): conn = client.get_connection() ip = conn.get_ip() logger.info(f"RTSP client connected: ip={ip}") - + client.connect("closed", self._on_client_disconnected) def _on_client_disconnected(self, client): From 653ea8e684e78b21867226ac022affdbf28853ec Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Thu, 9 Oct 2025 16:48:11 -0700 Subject: [PATCH 13/37] cleaning up some scripts --- sample_scripts/video_to_rtsp.py | 9 +-------- sample_scripts/view_rtsp_stream.py | 9 +-------- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index d0919df..34d5eaf 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -8,7 +8,6 @@ import numpy as np - logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s') logger = logging.getLogger(__name__) @@ -40,21 +39,15 @@ def __init__(self, video_paths: list[str], port: int): # Reset to beginning after test frame so that streaming starts from the beginning of the video grabber.seek_to_beginning() - start_time = None def get_frame_callback( grabber: FileStreamFrameGrabber = grabber, video_path: str = video_path, ) -> np.ndarray: - nonlocal start_time - if start_time is None: - start_time = time.time() try: return grabber.grab() except RuntimeWarning: - elapsed_time = time.time() - start_time last_frame_read_number = grabber.get_last_frame_read_number() - logger.info(f'Reached the end of {video_path}. Read {last_frame_read_number + 1} frames in {elapsed_time:.2f} seconds. Restarting from the beginning of the video...') - start_time = time.time() + logger.info(f'Reached the end of {video_path}. Read {last_frame_read_number + 1} frames. Restarting from the beginning of the video...') grabber.seek_to_beginning() return grabber.grab() diff --git a/sample_scripts/view_rtsp_stream.py b/sample_scripts/view_rtsp_stream.py index cfb0e6d..1e17939 100644 --- a/sample_scripts/view_rtsp_stream.py +++ b/sample_scripts/view_rtsp_stream.py @@ -5,8 +5,6 @@ import cv2 import numpy as np -import time - def resize_frame(frame: np.ndarray, max_width: int = None, max_height: int = None) -> np.ndarray: """ Resizes an image to fit within a given height and/or width, without changing the aspect ratio. @@ -50,21 +48,16 @@ def main(): args = parser.parse_args() config = RTSPFrameGrabberConfig(rtsp_url=args.rtsp_url) - t1 = time.time() grabber = FrameGrabber.create_grabber(config) - t2 = time.time() - elapsed_time = t2 - t1 - print(f'Created grabber in {elapsed_time:.2f} seconds.') print(f"Streaming from: {args.rtsp_url}") print("Press 'q' to quit") - start_time = time.time() try: while True: frame = grabber.grab() resized_frame = resize_frame(frame, 640, 480) # get a smaller frame so it's easier to view - cv2.imshow(f'Streaming {args.rtsp_url} {start_time}', resized_frame) + cv2.imshow(f'Streaming {args.rtsp_url}', resized_frame) key = cv2.waitKey(30) if key == ord('q'): break From bf5283f0bdaac24ffcc6d9e3625edda2e96f3def Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Thu, 9 Oct 2025 17:44:27 -0700 Subject: [PATCH 14/37] refactoring --- sample_scripts/video_to_rtsp.py | 2 + src/framegrab/rtsp_server.py | 262 ++++++++++++++++++-------------- 2 files changed, 149 insertions(+), 115 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 34d5eaf..b8aae39 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -8,6 +8,8 @@ import numpy as np +import random + logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s') logger = logging.getLogger(__name__) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index abec88c..07b134b 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -52,13 +52,73 @@ class MountState: producer: Optional[Tuple[threading.Thread, threading.Event]] = None +@dataclass class Stream: - def __init__(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: float): - self.callback = callback - self.width = width - self.height = height - self.mount_point = mount_point - self.fps = fps + """Configuration for a single RTSP stream. + + Contains the callback function to generate frames and the stream parameters + like dimensions, mount point, and frame rate. + """ + callback: Callable[[], np.ndarray] + width: int + height: int + mount_point: str + fps: float + + +class RTSPStreamMediaFactory(GstRtspServer.RTSPMediaFactory): + """GStreamer RTSP Media Factory for handling individual stream mount points. + + This factory creates and configures the GStreamer pipeline for each RTSP stream, + manages client connections, and coordinates with the RTSPServer for frame production. + """ + + def __init__(self, stream: Stream, server: 'RTSPServer'): + super().__init__() + self.stream = stream + self.server = server + + def do_create_element(self, url): + """Create the GStreamer pipeline for this stream.""" + fps_int = int(round(self.stream.fps, 0)) # Gstreamer wants an int here + pipeline = ( + f"appsrc name=source is-live=true format=GST_FORMAT_TIME " + f"caps=video/x-raw,format=RGB,width={self.stream.width}," + f"height={self.stream.height},framerate={fps_int}/1 " + f"! videoconvert ! video/x-raw,format=I420 " + f"! x264enc speed-preset=ultrafast tune=zerolatency " + f"! rtph264pay name=pay0 pt=96" + ) + return Gst.parse_launch(pipeline) + + def do_configure(self, rtsp_media): + """Configure a new client connection for this stream.""" + appsrc = rtsp_media.get_element().get_child_by_name("source") + appsrc.set_property("format", Gst.Format.TIME) + appsrc.set_property("do-timestamp", False) # we manage per-client PTS + + mount = self.server._mounts[self.stream.mount_point] + duration = int(Gst.SECOND / float(self.stream.fps)) + + client = ClientEntry(appsrc=appsrc) + with mount.clients_lock: + mount.clients.append(client) + + # start producer if absent + if mount.producer is None: + stop_evt = threading.Event() + + thr = threading.Thread( + target=self.server._producer_worker, + args=(self.stream, mount, stop_evt, duration), + daemon=True, + name=f"prod-{self.stream.mount_point}" + ) + mount.producer = (thr, stop_evt) + thr.start() + + # cleanup when media is unprepared + rtsp_media.connect("unprepared", lambda *a: self.server._remove_client(mount, client)) class RTSPServer: @@ -137,7 +197,7 @@ def _run_server(self): self._server.connect("client-connected", self._on_client_connected) mount_points = self._server.get_mount_points() for s in self.streams.values(): - f = self._create_media_factory(s) + f = RTSPStreamMediaFactory(s, self) f.set_shared(False) # per-client pipelines mount_points.add_factory(s.mount_point, f) self._server.attach(None) @@ -147,114 +207,6 @@ def _run_server(self): finally: self._running = False - def _create_media_factory(self, stream: Stream): - class Factory(GstRtspServer.RTSPMediaFactory): - def __init__(self, stream, server): - super().__init__() - self.stream = stream - self.server = server - - def do_create_element(self, url): - fps_int = int(round(self.stream.fps, 0)) # Gstreamer wants an int here - pipeline = ( - f"appsrc name=source is-live=true format=GST_FORMAT_TIME " - f"caps=video/x-raw,format=RGB,width={self.stream.width}," - f"height={self.stream.height},framerate={fps_int}/1 " - f"! videoconvert ! video/x-raw,format=I420 " - f"! x264enc speed-preset=ultrafast tune=zerolatency " - f"! rtph264pay name=pay0 pt=96" - ) - return Gst.parse_launch(pipeline) - - def do_configure(self, rtsp_media): - appsrc = rtsp_media.get_element().get_child_by_name("source") - appsrc.set_property("is-live", True) - appsrc.set_property("format", Gst.Format.TIME) - appsrc.set_property("do-timestamp", False) # we manage per-client PTS - - mount = self.server._mounts[self.stream.mount_point] - duration = int(Gst.SECOND / float(self.stream.fps)) - - client = ClientEntry(appsrc=appsrc) - with mount.clients_lock: - mount.clients.append(client) - - # start producer if absent - if mount.producer is None: - stop_evt = threading.Event() - - def producer(): - period = 1.0 / float(self.stream.fps) - next_t = time.monotonic() - while not stop_evt.is_set(): - now = time.monotonic() - s = next_t - now - if s > 0: - time.sleep(s) - next_t += period - - try: - frame = self.stream.callback() - frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - fb = frame.tobytes() - except Exception: - logger.exception("grab failed for %s", self.stream.mount_point) - fb = b"" - - with mount.clients_lock: - clients = list(mount.clients) - - for c in clients: - GLib.idle_add(push_to_client, c, fb, duration) - - with mount.clients_lock: - if not mount.clients: - break - - thr = threading.Thread(target=producer, daemon=True, name=f"prod-{self.stream.mount_point}") - mount.producer = (thr, stop_evt) - thr.start() - - def remove_client(): - with mount.clients_lock: - mount.clients = [c for c in mount.clients if c is not client] - if not mount.clients and mount.producer: - thr, stop_evt = mount.producer - stop_evt.set() - thr.join(timeout=1.0) - mount.producer = None - - # cleanup when media is unprepared - rtsp_media.connect("unprepared", lambda *a: remove_client()) - - def push_to_client(client_entry, frame_bytes: bytes, duration_ns: int): - app = client_entry.appsrc - if app is None: - return - - try: - buf = Gst.Buffer.new_allocate(None, len(frame_bytes), None) - buf.fill(0, frame_bytes) - fc = client_entry.frame_count - buf.pts = int(fc * duration_ns) - buf.duration = int(duration_ns) - client_entry.frame_count = fc + 1 - app.emit("push-buffer", buf) - except Exception: - logger.exception("push failed; removing client") - # remove failing client - for mount in self._mounts.values(): - with mount.clients_lock: - if client_entry in mount.clients: - mount.clients.remove(client_entry) - if not mount.clients and mount.producer: - thr, stop_evt = mount.producer - stop_evt.set() - thr.join(timeout=1.0) - mount.producer = None - break - - return Factory(stream, self) def _on_client_connected(self, server, client): conn = client.get_connection() @@ -268,6 +220,86 @@ def _on_client_disconnected(self, client): ip = conn.get_ip() logger.info(f"RTSP client disconnected: ip={ip}") + def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threading.Event, + duration: int): + """Worker function that produces frames for a stream at the target FPS. + + Args: + stream: The stream configuration (callback, fps, etc.) + mount: The mount state containing client list and locks + stop_evt: Event to signal when to stop producing + duration: Frame duration in nanoseconds + """ + period = 1.0 / float(stream.fps) + next_t = time.monotonic() + + while not stop_evt.is_set(): + now = time.monotonic() + s = next_t - now + if s > 0: + time.sleep(s) + next_t += period + + try: + frame = stream.callback() + print(f'got a frame for {stream.mount_point}') + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + fb = frame.tobytes() + except Exception: + logger.exception("grab failed for %s", stream.mount_point) + fb = b"" + + with mount.clients_lock: + clients = list(mount.clients) + + for c in clients: + GLib.idle_add(self._push_to_client, c, fb, duration, mount) + + with mount.clients_lock: + if not mount.clients: + break + + def _push_to_client(self, client_entry: ClientEntry, frame_bytes: bytes, duration_ns: int, mount: MountState): + """Push a frame buffer to a specific client. + + Args: + client_entry: The client to push the frame to + frame_bytes: The frame data as bytes + duration_ns: Frame duration in nanoseconds + mount: The mount state (used for client removal on error) + """ + app = client_entry.appsrc + if app is None: + return + + try: + buf = Gst.Buffer.new_allocate(None, len(frame_bytes), None) + buf.fill(0, frame_bytes) + fc = client_entry.frame_count + buf.pts = int(fc * duration_ns) + buf.duration = int(duration_ns) + client_entry.frame_count = fc + 1 + app.emit("push-buffer", buf) + except Exception: + logger.exception("push failed; removing client") + # Use the existing _remove_client method + self._remove_client(mount, client_entry) + + def _remove_client(self, mount: MountState, client: ClientEntry): + """Remove a client from a mount and stop the producer if no clients remain. + + Args: + mount: The mount state containing the client list and producer + client: The client entry to remove + """ + with mount.clients_lock: + mount.clients = [c for c in mount.clients if c is not client] + if not mount.clients and mount.producer: + thr, stop_evt = mount.producer + stop_evt.set() + thr.join(timeout=1.0) + mount.producer = None + def __enter__(self): self.start() return self From 83580ab3b1b92850ab45cb039c70d428d708af6b Mon Sep 17 00:00:00 2001 From: Auto-format Bot Date: Fri, 10 Oct 2025 00:44:58 +0000 Subject: [PATCH 15/37] Automatically reformatting code with black and isort --- src/framegrab/rtsp_server.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index 07b134b..8f824b5 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -55,10 +55,11 @@ class MountState: @dataclass class Stream: """Configuration for a single RTSP stream. - + Contains the callback function to generate frames and the stream parameters like dimensions, mount point, and frame rate. """ + callback: Callable[[], np.ndarray] width: int height: int @@ -68,12 +69,12 @@ class Stream: class RTSPStreamMediaFactory(GstRtspServer.RTSPMediaFactory): """GStreamer RTSP Media Factory for handling individual stream mount points. - + This factory creates and configures the GStreamer pipeline for each RTSP stream, manages client connections, and coordinates with the RTSPServer for frame production. """ - - def __init__(self, stream: Stream, server: 'RTSPServer'): + + def __init__(self, stream: Stream, server: "RTSPServer"): super().__init__() self.stream = stream self.server = server @@ -109,10 +110,10 @@ def do_configure(self, rtsp_media): stop_evt = threading.Event() thr = threading.Thread( - target=self.server._producer_worker, + target=self.server._producer_worker, args=(self.stream, mount, stop_evt, duration), - daemon=True, - name=f"prod-{self.stream.mount_point}" + daemon=True, + name=f"prod-{self.stream.mount_point}", ) mount.producer = (thr, stop_evt) thr.start() @@ -207,7 +208,6 @@ def _run_server(self): finally: self._running = False - def _on_client_connected(self, server, client): conn = client.get_connection() ip = conn.get_ip() @@ -220,10 +220,9 @@ def _on_client_disconnected(self, client): ip = conn.get_ip() logger.info(f"RTSP client disconnected: ip={ip}") - def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threading.Event, - duration: int): + def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threading.Event, duration: int): """Worker function that produces frames for a stream at the target FPS. - + Args: stream: The stream configuration (callback, fps, etc.) mount: The mount state containing client list and locks @@ -232,7 +231,7 @@ def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threadin """ period = 1.0 / float(stream.fps) next_t = time.monotonic() - + while not stop_evt.is_set(): now = time.monotonic() s = next_t - now @@ -242,7 +241,7 @@ def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threadin try: frame = stream.callback() - print(f'got a frame for {stream.mount_point}') + print(f"got a frame for {stream.mount_point}") frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) fb = frame.tobytes() except Exception: @@ -261,7 +260,7 @@ def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threadin def _push_to_client(self, client_entry: ClientEntry, frame_bytes: bytes, duration_ns: int, mount: MountState): """Push a frame buffer to a specific client. - + Args: client_entry: The client to push the frame to frame_bytes: The frame data as bytes @@ -287,7 +286,7 @@ def _push_to_client(self, client_entry: ClientEntry, frame_bytes: bytes, duratio def _remove_client(self, mount: MountState, client: ClientEntry): """Remove a client from a mount and stop the producer if no clients remain. - + Args: mount: The mount state containing the client list and producer client: The client entry to remove From 789e36a211bbca11c380fa2b70e264130c199a3a Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Thu, 9 Oct 2025 17:45:01 -0700 Subject: [PATCH 16/37] removing an unneeded import --- sample_scripts/video_to_rtsp.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index b8aae39..34d5eaf 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -8,8 +8,6 @@ import numpy as np -import random - logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s') logger = logging.getLogger(__name__) From 9d12f0ac0b1e63c9044f8636d06262e6cf876aa9 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Thu, 9 Oct 2025 17:45:59 -0700 Subject: [PATCH 17/37] removing unneeded import --- src/framegrab/rtsp_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index 07b134b..4b7338f 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -242,7 +242,6 @@ def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threadin try: frame = stream.callback() - print(f'got a frame for {stream.mount_point}') frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) fb = frame.tobytes() except Exception: From ba0f1f3a3d3313b321131b63e23e950e7a66dbfb Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Thu, 9 Oct 2025 17:50:05 -0700 Subject: [PATCH 18/37] removing unnecessary code --- src/framegrab/rtsp_server.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index a031c24..b4b07b1 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -270,18 +270,13 @@ def _push_to_client(self, client_entry: ClientEntry, frame_bytes: bytes, duratio if app is None: return - try: - buf = Gst.Buffer.new_allocate(None, len(frame_bytes), None) - buf.fill(0, frame_bytes) - fc = client_entry.frame_count - buf.pts = int(fc * duration_ns) - buf.duration = int(duration_ns) - client_entry.frame_count = fc + 1 - app.emit("push-buffer", buf) - except Exception: - logger.exception("push failed; removing client") - # Use the existing _remove_client method - self._remove_client(mount, client_entry) + buf = Gst.Buffer.new_allocate(None, len(frame_bytes), None) + buf.fill(0, frame_bytes) + fc = client_entry.frame_count + buf.pts = int(fc * duration_ns) + buf.duration = int(duration_ns) + client_entry.frame_count = fc + 1 + app.emit("push-buffer", buf) def _remove_client(self, mount: MountState, client: ClientEntry): """Remove a client from a mount and stop the producer if no clients remain. From 9e942a3e8bd434ea7afe2519a8475788bd6bbf93 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 11:25:00 -0700 Subject: [PATCH 19/37] simplying the sample script --- sample_scripts/video_to_rtsp.py | 32 ++++++++++++++++---------------- src/framegrab/rtsp_server.py | 12 ++++++------ 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 34d5eaf..a23e12b 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -20,7 +20,6 @@ def __init__(self, video_paths: list[str], port: int): self.port = port self.server = RTSPServer(port=port) - self.grabbers = [] for n, video_path in enumerate(video_paths): @@ -39,20 +38,19 @@ def __init__(self, video_paths: list[str], port: int): # Reset to beginning after test frame so that streaming starts from the beginning of the video grabber.seek_to_beginning() - def get_frame_callback( - grabber: FileStreamFrameGrabber = grabber, - video_path: str = video_path, - ) -> np.ndarray: - try: - return grabber.grab() - except RuntimeWarning: - last_frame_read_number = grabber.get_last_frame_read_number() - logger.info(f'Reached the end of {video_path}. Read {last_frame_read_number + 1} frames. Restarting from the beginning of the video...') - grabber.seek_to_beginning() - return grabber.grab() - + callback = lambda g=grabber: self.get_frame_callback(g) mount_point = f'/stream{n}' - self.server.create_stream(get_frame_callback, width=width, height=height, fps=fps, mount_point=mount_point) + self.server.create_stream(callback, width=width, height=height, fps=fps, mount_point=mount_point) + + def get_frame_callback(self, grabber: FileStreamFrameGrabber) -> np.ndarray: + try: + return grabber.grab() + except RuntimeWarning: + last_frame_read_number = grabber.get_last_frame_read_number() + video_path = grabber.config.filename + logger.info(f'Reached the end of {video_path}. Read {last_frame_read_number + 1} frames. Restarting from the beginning of the video...') + grabber.seek_to_beginning() + return grabber.grab() def list_rtsp_urls(self) -> list[str]: return self.server.list_rtsp_urls() @@ -88,6 +86,8 @@ def stop(self) -> None: time.sleep(1) except KeyboardInterrupt: - logger.info("Shutting down gracefully...") + logger.info("Keyboard interrupt detected.") finally: - app.stop() \ No newline at end of file + logger.info("Stopping RTSP server...") + app.stop() + logger.info(f'RTSP server stopped.') \ No newline at end of file diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index b4b07b1..e46d113 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -26,14 +26,14 @@ @dataclass -class ClientEntry: +class ClientState: """Per-client state for a single RTSP consumer. Holds the per-connection `appsrc` element and a monotonically increasing `frame_count` used to compute buffer PTS/duration. """ - appsrc: Any + appsrc: Any # Gst.Element frame_count: int = 0 @@ -46,7 +46,7 @@ class MountState: - `producer`: (thread, stop_event) driving frames for this mount, or None """ - clients: List[ClientEntry] = field(default_factory=list) + clients: List[ClientState] = field(default_factory=list) clients_lock: threading.Lock = field(default_factory=threading.Lock) # (thread, stop_event) when a producer is running, else None producer: Optional[Tuple[threading.Thread, threading.Event]] = None @@ -101,7 +101,7 @@ def do_configure(self, rtsp_media): mount = self.server._mounts[self.stream.mount_point] duration = int(Gst.SECOND / float(self.stream.fps)) - client = ClientEntry(appsrc=appsrc) + client = ClientState(appsrc=appsrc) with mount.clients_lock: mount.clients.append(client) @@ -257,7 +257,7 @@ def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threadin if not mount.clients: break - def _push_to_client(self, client_entry: ClientEntry, frame_bytes: bytes, duration_ns: int, mount: MountState): + def _push_to_client(self, client_entry: ClientState, frame_bytes: bytes, duration_ns: int, mount: MountState): """Push a frame buffer to a specific client. Args: @@ -278,7 +278,7 @@ def _push_to_client(self, client_entry: ClientEntry, frame_bytes: bytes, duratio client_entry.frame_count = fc + 1 app.emit("push-buffer", buf) - def _remove_client(self, mount: MountState, client: ClientEntry): + def _remove_client(self, mount: MountState, client: ClientState): """Remove a client from a mount and stop the producer if no clients remain. Args: From 94723d4436137af01b5841b3940e2e3f62b25d44 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 12:07:20 -0700 Subject: [PATCH 20/37] renaming class and removing unnecessary class attribute --- sample_scripts/video_to_rtsp.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index a23e12b..ca0498b 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -12,8 +12,7 @@ logger = logging.getLogger(__name__) -class VideoToRTSPSampleApp: - frame_cache = {} +class VideoToRtspSampleApp: def __init__(self, video_paths: list[str], port: int): self.video_paths = video_paths From 58c82a9e1cdd182458de65ecb1fbe91531a36d68 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 12:09:07 -0700 Subject: [PATCH 21/37] fixing typo --- sample_scripts/video_to_rtsp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index ca0498b..33fdec6 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -69,7 +69,7 @@ def stop(self) -> None: parser.add_argument('--port', type=int, default=8554, help='RTSP server port') args = parser.parse_args() - app = VideoToRTSPSampleApp(args.video_paths, args.port) + app = VideoToRtspSampleApp(args.video_paths, args.port) try: app.run() From 2a9404698353aec79d521fd384aed762e63cb7d9 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 13:48:07 -0700 Subject: [PATCH 22/37] adding a test --- test/test_rtsp.py | 81 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 test/test_rtsp.py diff --git a/test/test_rtsp.py b/test/test_rtsp.py new file mode 100644 index 0000000..524a09a --- /dev/null +++ b/test/test_rtsp.py @@ -0,0 +1,81 @@ +import unittest +import numpy as np +import time + +from framegrab.rtsp_server import RTSPServer +from framegrab.grabber import RTSPFrameGrabber +from framegrab.config import RTSPFrameGrabberConfig + +import time + +def generate_static_frame(width: int, height: int) -> np.ndarray: + return np.random.randint(0, 255, (height, width, 3), dtype=np.uint8) + +class TestRTSP(unittest.TestCase): + def setUp(self): + """Set up test RTSP server and grabber.""" + self.port = 8554 + self.server = None + self.grabber = None + + def tearDown(self): + """Clean up resources.""" + if self.grabber: + self.grabber.release() + if self.server: + self.server.stop() + + def test_rtsp_server_to_grabber_integration(self): + """Test that RTSPFrameGrabber can successfully grab frames from RTSPServer.""" + # Create a static noise frame for testing + TEST_FRAME_WIDTH = 640 + TEST_FRAME_HEIGHT = 480 + FPS = 15.0 + + # Create RTSP server with static frame callback + def frame_callback(): + return generate_static_frame(TEST_FRAME_WIDTH, TEST_FRAME_HEIGHT) + + self.server = RTSPServer(port=self.port) + self.server.create_stream( + callback=frame_callback, + width=TEST_FRAME_WIDTH, + height=TEST_FRAME_HEIGHT, + fps=FPS, + mount_point="/test" + ) + self.server.start() + + # Create RTSP grabber to connect to the server + rtsp_url = f"rtsp://localhost:{self.port}/test" + config = RTSPFrameGrabberConfig( + rtsp_url=rtsp_url, + keep_connection_open=True, + name="test_rtsp_grabber" + ) + self.grabber = RTSPFrameGrabber(config) + + # Grab a frame and validate it + frame = self.grabber.grab() + + + # Assertions + self.assertIsInstance(frame, np.ndarray) + + test_frame_shape = (TEST_FRAME_HEIGHT, TEST_FRAME_WIDTH, 3) + self.assertEqual(frame.shape, test_frame_shape) + self.assertEqual(frame.dtype, np.uint8) + + # Verify we got a valid frame (not all zeros or corrupted) + self.assertGreater(frame.max(), 0) + self.assertLess(frame.min(), 255) + + # Verify that if we grab two separate frames, they are unique + plenty_of_time_for_next_frame_to_be_available = 1 / FPS * 2 + time.sleep(plenty_of_time_for_next_frame_to_be_available) + frame2 = self.grabber.grab() + assert not np.array_equal(frame, frame2), "The two captured frames were not unique. Is the server publishing correct frames at a correct FPS?" + + +if __name__ == "__main__": + unittest.main() From 29160448033d961904e5a726669e857ec2e4b6ae Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 14:00:24 -0700 Subject: [PATCH 23/37] trying to fix test --- .github/workflows/tests.yaml | 6 ++++-- docker/Dockerfile | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 docker/Dockerfile diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index acf3f12..1c4a75c 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -35,5 +35,7 @@ jobs: pip install -U pip pip install poetry poetry install --extras youtube - - name: run tests - run: poetry run pytest + - name: Build Docker image with dependencies + run: docker build -f docker/Dockerfile -t framegrab-test . + - name: Run tests in Docker + run: docker run --rm framegrab-test poetry run pytest diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..6a5c749 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,33 @@ +FROM python:3.11-slim + +# Install system dependencies for GStreamer and RTSP +RUN apt-get update && apt-get install -y \ + gstreamer1.0-tools \ + gstreamer1.0-rtsp \ + gstreamer1.0-plugins-base \ + gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-ugly \ + libgstreamer1.0-dev \ + libgirepository1.0-dev \ + gir1.2-gst-rtsp-server-1.0 \ + python3-gi \ + && rm -rf /var/lib/apt/lists/* + +# Install Poetry +RUN pip install poetry + +# Set working directory +WORKDIR /app + +# Copy dependency files +COPY pyproject.toml poetry.lock ./ + +# Install Python dependencies +RUN poetry config virtualenvs.create false && \ + poetry install --extras youtube + +# Copy source code +COPY . . + +# No CMD or ENTRYPOINT - let the caller decide what to run From 53cf0fdb7f75d211ae21afa693f9eacae591887b Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 14:04:49 -0700 Subject: [PATCH 24/37] fixing dockerfile --- docker/Dockerfile | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 6a5c749..e862c5b 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -25,9 +25,7 @@ COPY pyproject.toml poetry.lock ./ # Install Python dependencies RUN poetry config virtualenvs.create false && \ - poetry install --extras youtube + poetry install --extras youtube --no-root # Copy source code COPY . . - -# No CMD or ENTRYPOINT - let the caller decide what to run From f2ec3c1ab628dfd53ca2c72d0d088d3307d297f7 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 14:09:24 -0700 Subject: [PATCH 25/37] fixing dockerfile again --- docker/Dockerfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index e862c5b..6d939b0 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -20,12 +20,12 @@ RUN pip install poetry # Set working directory WORKDIR /app -# Copy dependency files -COPY pyproject.toml poetry.lock ./ +# Copy dependency files and README (needed for Poetry) +COPY pyproject.toml poetry.lock README.md ./ # Install Python dependencies RUN poetry config virtualenvs.create false && \ - poetry install --extras youtube --no-root + poetry install --extras youtube # Copy source code COPY . . From 00831d0e1f3939a0cba897c68fbf24202666c5ce Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 14:19:38 -0700 Subject: [PATCH 26/37] fixing dockerfile again --- docker/Dockerfile | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 6d939b0..87b2619 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -20,12 +20,9 @@ RUN pip install poetry # Set working directory WORKDIR /app -# Copy dependency files and README (needed for Poetry) -COPY pyproject.toml poetry.lock README.md ./ +# Copy everything +COPY . . -# Install Python dependencies +# Install Python dependencies and project RUN poetry config virtualenvs.create false && \ poetry install --extras youtube - -# Copy source code -COPY . . From 3386dad6ddb9f6f27723dd393296ed54404e8619 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 14:24:19 -0700 Subject: [PATCH 27/37] fixing dockerfile again --- docker/Dockerfile | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 87b2619..6df8353 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -23,6 +23,9 @@ WORKDIR /app # Copy everything COPY . . -# Install Python dependencies and project +# Install Python dependencies only RUN poetry config virtualenvs.create false && \ - poetry install --extras youtube + poetry install --extras youtube --no-root + +# Install the project +RUN pip install . From 6db057093d2ae3531aa95c37fdb55b3a03e021cb Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 14:34:55 -0700 Subject: [PATCH 28/37] fixing dockerfile again --- docker/Dockerfile | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 6df8353..65928ad 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,7 +1,10 @@ -FROM python:3.11-slim +FROM ubuntu:22.04 -# Install system dependencies for GStreamer and RTSP +# Install Python and system dependencies for GStreamer and RTSP RUN apt-get update && apt-get install -y \ + python3.11 \ + python3.11-pip \ + python3.11-venv \ gstreamer1.0-tools \ gstreamer1.0-rtsp \ gstreamer1.0-plugins-base \ @@ -12,8 +15,14 @@ RUN apt-get update && apt-get install -y \ libgirepository1.0-dev \ gir1.2-gst-rtsp-server-1.0 \ python3-gi \ + python3-gi-cairo \ + gir1.2-gstreamer-1.0 \ + gir1.2-gst-plugins-base-1.0 \ && rm -rf /var/lib/apt/lists/* +# Create symlink for python command +RUN ln -s /usr/bin/python3.11 /usr/bin/python + # Install Poetry RUN pip install poetry From 2484e5ee96774a4299966d26d0bd2caf8cc5beab Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 14:45:49 -0700 Subject: [PATCH 29/37] fixing dockerfile again --- docker/Dockerfile | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 65928ad..a3cb292 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,10 +1,9 @@ FROM ubuntu:22.04 -# Install Python and system dependencies for GStreamer and RTSP +# Install Python and minimal GStreamer/RTSP dependencies RUN apt-get update && apt-get install -y \ - python3.11 \ - python3.11-pip \ - python3.11-venv \ + python3 \ + python3-pip \ gstreamer1.0-tools \ gstreamer1.0-rtsp \ gstreamer1.0-plugins-base \ @@ -14,15 +13,10 @@ RUN apt-get update && apt-get install -y \ libgstreamer1.0-dev \ libgirepository1.0-dev \ gir1.2-gst-rtsp-server-1.0 \ - python3-gi \ - python3-gi-cairo \ gir1.2-gstreamer-1.0 \ - gir1.2-gst-plugins-base-1.0 \ + python3-gi \ && rm -rf /var/lib/apt/lists/* -# Create symlink for python command -RUN ln -s /usr/bin/python3.11 /usr/bin/python - # Install Poetry RUN pip install poetry From 7417fd70fdc8f16fbfd3df52ec64dbdb98e841af Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 14:58:55 -0700 Subject: [PATCH 30/37] fixing CICD --- test/test_rtsp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_rtsp.py b/test/test_rtsp.py index 524a09a..b91c5a8 100644 --- a/test/test_rtsp.py +++ b/test/test_rtsp.py @@ -47,7 +47,7 @@ def frame_callback(): self.server.start() # Create RTSP grabber to connect to the server - rtsp_url = f"rtsp://localhost:{self.port}/test" + rtsp_url = f"rtsp://127.0.0.1:{self.port}/test" config = RTSPFrameGrabberConfig( rtsp_url=rtsp_url, keep_connection_open=True, From ae9741bbb54ba4df5c57841fd4c76e01bfe23854 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 15:18:47 -0700 Subject: [PATCH 31/37] fixing CICD --- test/test_rtsp.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/test_rtsp.py b/test/test_rtsp.py index b91c5a8..39c2be9 100644 --- a/test/test_rtsp.py +++ b/test/test_rtsp.py @@ -31,6 +31,7 @@ def test_rtsp_server_to_grabber_integration(self): TEST_FRAME_WIDTH = 640 TEST_FRAME_HEIGHT = 480 FPS = 15.0 + MOUNT_POINT = '/test' # Create RTSP server with static frame callback def frame_callback(): @@ -42,16 +43,16 @@ def frame_callback(): width=TEST_FRAME_WIDTH, height=TEST_FRAME_HEIGHT, fps=FPS, - mount_point="/test" + mount_point=MOUNT_POINT ) self.server.start() + time.sleep(5) # Create RTSP grabber to connect to the server - rtsp_url = f"rtsp://127.0.0.1:{self.port}/test" + rtsp_url = f"rtsp://localhost:{self.port}{MOUNT_POINT}" config = RTSPFrameGrabberConfig( rtsp_url=rtsp_url, keep_connection_open=True, - name="test_rtsp_grabber" ) self.grabber = RTSPFrameGrabber(config) From 3383ae0a26a1524505ed3e5eaf6ba4c2430b3165 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 15:35:48 -0700 Subject: [PATCH 32/37] fixed the test --- src/framegrab/rtsp_server.py | 23 ++++++++++++++++++++++- test/test_rtsp.py | 2 -- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index e46d113..adef7ca 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -1,5 +1,6 @@ import logging import platform +import socket import threading import time from dataclasses import dataclass, field @@ -170,7 +171,27 @@ def start(self): self._running = True self._loop_thread = threading.Thread(target=self._run_server, daemon=True) self._loop_thread.start() - time.sleep(0.2) + + # Wait for server to be ready + self._wait_for_server_ready() + + def _wait_for_server_ready(self, timeout: float = 10.0) -> None: + """Wait for the RTSP server to be ready by checking if the port is bound.""" + start_time = time.time() + while time.time() - start_time < timeout: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(0.1) + result = sock.connect_ex(('127.0.0.1', self.port)) + sock.close() + + if result == 0: # Port is open + server_ready_time = time.time() - start_time + logger.debug(f'Server ready in {server_ready_time:.2f} seconds.') + return + else: + time.sleep(0.01) + + raise RuntimeError(f"RTSP server failed to start within {timeout} seconds") def stop(self): if not self._running: diff --git a/test/test_rtsp.py b/test/test_rtsp.py index 39c2be9..645cf6d 100644 --- a/test/test_rtsp.py +++ b/test/test_rtsp.py @@ -46,7 +46,6 @@ def frame_callback(): mount_point=MOUNT_POINT ) self.server.start() - time.sleep(5) # Create RTSP grabber to connect to the server rtsp_url = f"rtsp://localhost:{self.port}{MOUNT_POINT}" @@ -59,7 +58,6 @@ def frame_callback(): # Grab a frame and validate it frame = self.grabber.grab() - # Assertions self.assertIsInstance(frame, np.ndarray) From 94148b4676a867d7847c19b8d0f950e440a2f503 Mon Sep 17 00:00:00 2001 From: Auto-format Bot Date: Fri, 10 Oct 2025 22:36:14 +0000 Subject: [PATCH 33/37] Automatically reformatting code with black and isort --- src/framegrab/rtsp_server.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index adef7ca..e559889 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -171,7 +171,7 @@ def start(self): self._running = True self._loop_thread = threading.Thread(target=self._run_server, daemon=True) self._loop_thread.start() - + # Wait for server to be ready self._wait_for_server_ready() @@ -181,16 +181,16 @@ def _wait_for_server_ready(self, timeout: float = 10.0) -> None: while time.time() - start_time < timeout: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(0.1) - result = sock.connect_ex(('127.0.0.1', self.port)) + result = sock.connect_ex(("127.0.0.1", self.port)) sock.close() - + if result == 0: # Port is open server_ready_time = time.time() - start_time - logger.debug(f'Server ready in {server_ready_time:.2f} seconds.') + logger.debug(f"Server ready in {server_ready_time:.2f} seconds.") return else: time.sleep(0.01) - + raise RuntimeError(f"RTSP server failed to start within {timeout} seconds") def stop(self): From 8ebb873a6ed1eba3d79b07cf64a02a51b8f7ed64 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 15:46:42 -0700 Subject: [PATCH 34/37] bumping version --- pyproject.toml | 2 +- src/framegrab/rtsp_server.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7ed93cc..023c426 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "framegrab" -version = "0.13.2" +version = "0.14.0" description = "Easily grab frames from cameras or streams" authors = ["Groundlight "] license = "MIT" diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index e559889..a23c5e3 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -172,7 +172,6 @@ def start(self): self._loop_thread = threading.Thread(target=self._run_server, daemon=True) self._loop_thread.start() - # Wait for server to be ready self._wait_for_server_ready() def _wait_for_server_ready(self, timeout: float = 10.0) -> None: From 1140361b31917e98bbaae557c9572e78b49c357c Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Fri, 10 Oct 2025 16:02:53 -0700 Subject: [PATCH 35/37] cleaning up cicd --- .github/workflows/tests.yaml | 5 ----- test/test_rtsp.py | 4 ++-- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 1c4a75c..1bf5b7f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -30,11 +30,6 @@ jobs: python-version: ${{ matrix.python-version }} - name: Display Python version run: python -c "import sys; print(sys.version)" - - name: install poetry and build poetry environment - run: | - pip install -U pip - pip install poetry - poetry install --extras youtube - name: Build Docker image with dependencies run: docker build -f docker/Dockerfile -t framegrab-test . - name: Run tests in Docker diff --git a/test/test_rtsp.py b/test/test_rtsp.py index 645cf6d..005b07c 100644 --- a/test/test_rtsp.py +++ b/test/test_rtsp.py @@ -8,7 +8,7 @@ import time -def generate_static_frame(width: int, height: int) -> np.ndarray: +def generate_noise_frame(width: int, height: int) -> np.ndarray: return np.random.randint(0, 255, (height, width, 3), dtype=np.uint8) class TestRTSP(unittest.TestCase): @@ -35,7 +35,7 @@ def test_rtsp_server_to_grabber_integration(self): # Create RTSP server with static frame callback def frame_callback(): - return generate_static_frame(TEST_FRAME_WIDTH, TEST_FRAME_HEIGHT) + return generate_noise_frame(TEST_FRAME_WIDTH, TEST_FRAME_HEIGHT) self.server = RTSPServer(port=self.port) self.server.create_stream( From 8a91999e36da9514f92feb107368814b32fc1a3d Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Mon, 13 Oct 2025 10:47:01 -0700 Subject: [PATCH 36/37] update readme --- README.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/README.md b/README.md index bbe1015..53b506b 100644 --- a/README.md +++ b/README.md @@ -866,6 +866,40 @@ if m.motion_detected(frame): print("Motion detected!") ``` +### RTSP Server +Framegrab provides tools for RTSP stream generation, which can be useful for testing applications. + +Basic usage looks like this: +``` +server = RTSPServer(port=port) +server.create_stream(get_frame_callback1, width, height, fps, mount_point='/stream0') +server.create_stream(get_frame_callback2, width, height, fps, mount_point='/stream1') +server.start() +time.sleep(n) # keep the server up +server.stop() +``` + +Using these tools requires a number of system dependencies. which are listed below: + +``` +gstreamer1.0-tools +gstreamer1.0-rtsp +gstreamer1.0-plugins-base +gstreamer1.0-plugins-good +gstreamer1.0-plugins-bad +gstreamer1.0-plugins-ugly +libgstreamer1.0-dev +libgirepository1.0-dev +gir1.2-gst-rtsp-server-1.0 +gir1.2-gstreamer-1.0 +``` +We test RTSP server functionality on Ubuntu. It may also work on Mac. It will _not_ work on Windows natively, but you may be able to get it to work with Docker or WSL. + +We provide a [Dockerfile](docker/Dockerfile) that contains the necessary packages. + +For inspiration on how to implement an RTSP server, see [sample_scripts/video_to_rtsp.py](sample_scripts/video_to_rtsp.py), which shows can you can convert multiple videos into RTSP streams with a single RTSP server. + + ## Examples ### Generic USB From 296213f0cc49811d9c4e41ee29b3fe77337480bb Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Mon, 13 Oct 2025 10:49:37 -0700 Subject: [PATCH 37/37] fix typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 53b506b..09ed3fc 100644 --- a/README.md +++ b/README.md @@ -879,7 +879,7 @@ time.sleep(n) # keep the server up server.stop() ``` -Using these tools requires a number of system dependencies. which are listed below: +Using these tools requires a number of system dependencies, which are listed below: ``` gstreamer1.0-tools