diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index acf3f12..1bf5b7f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -30,10 +30,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Display Python version run: python -c "import sys; print(sys.version)" - - name: install poetry and build poetry environment - run: | - pip install -U pip - pip install poetry - poetry install --extras youtube - - name: run tests - run: poetry run pytest + - name: Build Docker image with dependencies + run: docker build -f docker/Dockerfile -t framegrab-test . + - name: Run tests in Docker + run: docker run --rm framegrab-test poetry run pytest diff --git a/README.md b/README.md index bbe1015..09ed3fc 100644 --- a/README.md +++ b/README.md @@ -866,6 +866,40 @@ if m.motion_detected(frame): print("Motion detected!") ``` +### RTSP Server +Framegrab provides tools for RTSP stream generation, which can be useful for testing applications. + +Basic usage looks like this: +``` +server = RTSPServer(port=port) +server.create_stream(get_frame_callback1, width, height, fps, mount_point='/stream0') +server.create_stream(get_frame_callback2, width, height, fps, mount_point='/stream1') +server.start() +time.sleep(n) # keep the server up +server.stop() +``` + +Using these tools requires a number of system dependencies, which are listed below: + +``` +gstreamer1.0-tools +gstreamer1.0-rtsp +gstreamer1.0-plugins-base +gstreamer1.0-plugins-good +gstreamer1.0-plugins-bad +gstreamer1.0-plugins-ugly +libgstreamer1.0-dev +libgirepository1.0-dev +gir1.2-gst-rtsp-server-1.0 +gir1.2-gstreamer-1.0 +``` +We test RTSP server functionality on Ubuntu. It may also work on Mac. It will _not_ work on Windows natively, but you may be able to get it to work with Docker or WSL. + +We provide a [Dockerfile](docker/Dockerfile) that contains the necessary packages. + +For inspiration on how to implement an RTSP server, see [sample_scripts/video_to_rtsp.py](sample_scripts/video_to_rtsp.py), which shows can you can convert multiple videos into RTSP streams with a single RTSP server. + + ## Examples ### Generic USB diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..a3cb292 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,34 @@ +FROM ubuntu:22.04 + +# Install Python and minimal GStreamer/RTSP dependencies +RUN apt-get update && apt-get install -y \ + python3 \ + python3-pip \ + gstreamer1.0-tools \ + gstreamer1.0-rtsp \ + gstreamer1.0-plugins-base \ + gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-ugly \ + libgstreamer1.0-dev \ + libgirepository1.0-dev \ + gir1.2-gst-rtsp-server-1.0 \ + gir1.2-gstreamer-1.0 \ + python3-gi \ + && rm -rf /var/lib/apt/lists/* + +# Install Poetry +RUN pip install poetry + +# Set working directory +WORKDIR /app + +# Copy everything +COPY . . + +# Install Python dependencies only +RUN poetry config virtualenvs.create false && \ + poetry install --extras youtube --no-root + +# Install the project +RUN pip install . diff --git a/pyproject.toml b/pyproject.toml index 7ed93cc..023c426 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "framegrab" -version = "0.13.2" +version = "0.14.0" description = "Easily grab frames from cameras or streams" authors = ["Groundlight "] license = "MIT" diff --git a/sample_scripts/video_to_rtsp.py b/sample_scripts/video_to_rtsp.py index 1de83cd..33fdec6 100644 --- a/sample_scripts/video_to_rtsp.py +++ b/sample_scripts/video_to_rtsp.py @@ -1,55 +1,92 @@ import argparse -from framegrab import FrameGrabber -from framegrab.config import FileStreamFrameGrabberConfig, GenericUSBFrameGrabberConfig +import logging +import time +from framegrab.grabber import FileStreamFrameGrabber +from framegrab.config import FileStreamFrameGrabberConfig from framegrab.rtsp_server import RTSPServer -import cv2 import numpy as np -import time -def main(): - parser = argparse.ArgumentParser(description='Stream a video file via RTSP') - parser.add_argument('video_path', help='Path to the video file to stream') - parser.add_argument('--port', type=int, default=8554, help='RTSP server port (default: 8554)') - args = parser.parse_args() +logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s') + +logger = logging.getLogger(__name__) + +class VideoToRtspSampleApp: + def __init__(self, video_paths: list[str], port: int): - # Connect to the grabber - config = FileStreamFrameGrabberConfig(filename=args.video_path) - grabber = FrameGrabber.create_grabber(config) + self.video_paths = video_paths + self.port = port - # Determine the resolution of the video - test_frame = grabber.grab() - height, width, _ = test_frame.shape + self.server = RTSPServer(port=port) + + self.grabbers = [] + for n, video_path in enumerate(video_paths): + # Connect to the grabber + config = FileStreamFrameGrabberConfig(filename=video_path) + grabber = FileStreamFrameGrabber(config) + self.grabbers.append(grabber) - # Determine the FPS of the video - fps = grabber.get_fps() - - # Reset to beginning after test frame - grabber.seek_to_beginning() + # Determine the resolution of the video + test_frame = grabber.grab() + height, width, _ = test_frame.shape - def get_frame_callback() -> np.ndarray: + # Determine the FPS of the video + fps = grabber.get_fps() + + # Reset to beginning after test frame so that streaming starts from the beginning of the video + grabber.seek_to_beginning() + + callback = lambda g=grabber: self.get_frame_callback(g) + mount_point = f'/stream{n}' + self.server.create_stream(callback, width=width, height=height, fps=fps, mount_point=mount_point) + + def get_frame_callback(self, grabber: FileStreamFrameGrabber) -> np.ndarray: try: return grabber.grab() except RuntimeWarning: last_frame_read_number = grabber.get_last_frame_read_number() - print(f'Got to end of file. Read {last_frame_read_number + 1} frames. Seeking back to the beginning of the video...') + video_path = grabber.config.filename + logger.info(f'Reached the end of {video_path}. Read {last_frame_read_number + 1} frames. Restarting from the beginning of the video...') grabber.seek_to_beginning() - print(f'Returned to the beginning of the file. Continuing to read the video...') return grabber.grab() + def list_rtsp_urls(self) -> list[str]: + return self.server.list_rtsp_urls() + + def run(self) -> None: + self.server.start() + + def stop(self) -> None: + self.server.stop() + + for g in self.grabbers: + g.release() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Stream multiple video files via RTSP') + parser.add_argument('video_paths', nargs='+', help='Paths to video files to stream (one or more)') + parser.add_argument('--port', type=int, default=8554, help='RTSP server port') + args = parser.parse_args() + + app = VideoToRtspSampleApp(args.video_paths, args.port) + try: - with RTSPServer(get_frame_callback, width=width, height=height, fps=fps, port=args.port) as server: - print(server) - print("Press Ctrl+C to stop...") + app.run() + logger.info(f'RTSP Server started on port {app.port}') + + rtsp_urls = app.list_rtsp_urls() + for url, path in zip(rtsp_urls, app.video_paths): + logger.info(f'{path} available at {url}') + logger.info("Press Ctrl+C to stop...") + + # Keep the program running + while True: + time.sleep(1) - while True: - time.sleep(1) # Keep alive, wake up periodically to check for KeyboardInterrupt - except KeyboardInterrupt: - print("\nShutting down gracefully...") + logger.info("Keyboard interrupt detected.") finally: - grabber.release() - -if __name__ == "__main__": - main() \ No newline at end of file + logger.info("Stopping RTSP server...") + app.stop() + logger.info(f'RTSP server stopped.') \ No newline at end of file diff --git a/sample_scripts/view_rtsp_stream.py b/sample_scripts/view_rtsp_stream.py new file mode 100644 index 0000000..1e17939 --- /dev/null +++ b/sample_scripts/view_rtsp_stream.py @@ -0,0 +1,71 @@ +import argparse +from framegrab import FrameGrabber +from framegrab.config import RTSPFrameGrabberConfig + +import cv2 +import numpy as np + +def resize_frame(frame: np.ndarray, max_width: int = None, max_height: int = None) -> np.ndarray: + """ + Resizes an image to fit within a given height and/or width, without changing the aspect ratio. + + Args: + frame: Input image as numpy array + max_width: Maximum width (optional) + max_height: Maximum height (optional) + + Returns: + Resized image that fits within the specified dimensions + """ + if max_width is None and max_height is None: + return frame + + height, width = frame.shape[:2] + + # Calculate scaling factors + scale_w = scale_h = 1.0 + + if max_width is not None and width > max_width: + scale_w = max_width / width + + if max_height is not None and height > max_height: + scale_h = max_height / height + + # Use the smaller scaling factor to ensure image fits within both dimensions + scale = min(scale_w, scale_h) + + # Only resize if scaling is needed + if scale < 1.0: + new_width = int(width * scale) + new_height = int(height * scale) + return cv2.resize(frame, (new_width, new_height)) + + return frame + +def main(): + parser = argparse.ArgumentParser(description='Stream RTSP video using framegrab') + parser.add_argument('rtsp_url', help='RTSP URL to stream (e.g., rtsp://localhost:8554/stream_fullsize)') + args = parser.parse_args() + + config = RTSPFrameGrabberConfig(rtsp_url=args.rtsp_url) + grabber = FrameGrabber.create_grabber(config) + + print(f"Streaming from: {args.rtsp_url}") + print("Press 'q' to quit") + + try: + while True: + frame = grabber.grab() + resized_frame = resize_frame(frame, 640, 480) # get a smaller frame so it's easier to view + cv2.imshow(f'Streaming {args.rtsp_url}', resized_frame) + key = cv2.waitKey(30) + if key == ord('q'): + break + except KeyboardInterrupt: + print("\nStopping...") + finally: + grabber.release() + cv2.destroyAllWindows() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/framegrab/grabber.py b/src/framegrab/grabber.py index 8ff5835..87eceeb 100644 --- a/src/framegrab/grabber.py +++ b/src/framegrab/grabber.py @@ -1382,7 +1382,7 @@ def _initialize_grabber_implementation(self): raise ValueError(f"Could not read first frame of file {self.config.filename}. Is it a valid video file?") # Reset frame position back to the first frame after validation - self.capture.set(cv2.CAP_PROP_POS_FRAMES, 0) + self.seek_to_beginning() self.fps_source = round(self.capture.get(cv2.CAP_PROP_FPS), 2) if self.fps_source <= 0.1: @@ -1444,6 +1444,7 @@ def seek_to_frame(self, frame_number: int) -> None: frame_number: Frame number to seek to (0-based) """ if frame_number < 0: + # OpenCV fails silently when you try this, so we raise an exception raise ValueError(f"Frame number must be non-negative, got {frame_number}") self.capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number) diff --git a/src/framegrab/rtsp_server.py b/src/framegrab/rtsp_server.py index d9cfaef..a23c5e3 100644 --- a/src/framegrab/rtsp_server.py +++ b/src/framegrab/rtsp_server.py @@ -1,15 +1,16 @@ import logging import platform +import socket import threading import time -from typing import Callable +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional, Tuple import cv2 import numpy as np from .unavailable_module import UnavailableModuleOrObject -# Only import GStreamer modules if available try: import gi @@ -25,29 +26,105 @@ logger = logging.getLogger(__name__) -class RTSPServer: - """Simple RTSP server that streams frames via callback.""" - - def __init__( - self, - callback: Callable[[], np.ndarray], - width: int, - height: int, - port: int = 8554, - mount_point: str = "/stream", - fps: int = 30, - ): - """Initialize RTSP server. +@dataclass +class ClientState: + """Per-client state for a single RTSP consumer. - Args: - callback: Function that returns a frame when called - width: Frame width (required) - height: Frame height (required) - port: RTSP server port (default: 8554) - mount_point: RTSP mount point (default: /stream) - fps: Target FPS for RTSP stream (default: 30) - """ + Holds the per-connection `appsrc` element and a monotonically + increasing `frame_count` used to compute buffer PTS/duration. + """ + + appsrc: Any # Gst.Element + frame_count: int = 0 + + +@dataclass +class MountState: + """Mutable runtime state for a single RTSP mount point. + + - `clients`: active consumers attached to this mount + - `clients_lock`: guards access to `clients` and `producer` + - `producer`: (thread, stop_event) driving frames for this mount, or None + """ + + clients: List[ClientState] = field(default_factory=list) + clients_lock: threading.Lock = field(default_factory=threading.Lock) + # (thread, stop_event) when a producer is running, else None + producer: Optional[Tuple[threading.Thread, threading.Event]] = None + + +@dataclass +class Stream: + """Configuration for a single RTSP stream. + + Contains the callback function to generate frames and the stream parameters + like dimensions, mount point, and frame rate. + """ + + callback: Callable[[], np.ndarray] + width: int + height: int + mount_point: str + fps: float + + +class RTSPStreamMediaFactory(GstRtspServer.RTSPMediaFactory): + """GStreamer RTSP Media Factory for handling individual stream mount points. + + This factory creates and configures the GStreamer pipeline for each RTSP stream, + manages client connections, and coordinates with the RTSPServer for frame production. + """ + def __init__(self, stream: Stream, server: "RTSPServer"): + super().__init__() + self.stream = stream + self.server = server + + def do_create_element(self, url): + """Create the GStreamer pipeline for this stream.""" + fps_int = int(round(self.stream.fps, 0)) # Gstreamer wants an int here + pipeline = ( + f"appsrc name=source is-live=true format=GST_FORMAT_TIME " + f"caps=video/x-raw,format=RGB,width={self.stream.width}," + f"height={self.stream.height},framerate={fps_int}/1 " + f"! videoconvert ! video/x-raw,format=I420 " + f"! x264enc speed-preset=ultrafast tune=zerolatency " + f"! rtph264pay name=pay0 pt=96" + ) + return Gst.parse_launch(pipeline) + + def do_configure(self, rtsp_media): + """Configure a new client connection for this stream.""" + appsrc = rtsp_media.get_element().get_child_by_name("source") + appsrc.set_property("format", Gst.Format.TIME) + appsrc.set_property("do-timestamp", False) # we manage per-client PTS + + mount = self.server._mounts[self.stream.mount_point] + duration = int(Gst.SECOND / float(self.stream.fps)) + + client = ClientState(appsrc=appsrc) + with mount.clients_lock: + mount.clients.append(client) + + # start producer if absent + if mount.producer is None: + stop_evt = threading.Event() + + thr = threading.Thread( + target=self.server._producer_worker, + args=(self.stream, mount, stop_evt, duration), + daemon=True, + name=f"prod-{self.stream.mount_point}", + ) + mount.producer = (thr, stop_evt) + thr.start() + + # cleanup when media is unprepared + rtsp_media.connect("unprepared", lambda *a: self.server._remove_client(mount, client)) + + +class RTSPServer: + def __init__(self, port: int = 8554): system = platform.system() if system == "Windows": raise RuntimeError( @@ -60,122 +137,185 @@ def __init__( "RTSPServer has limited support on macOS. " "You may need to install GStreamer via Homebrew: " ) - _ = gi, cv2, GLib, Gst, GstRtspServer + self.port = int(port) + self.streams: Dict[str, Stream] = {} - self.callback = callback - self.port = port - self.mount_point = mount_point - self.fps = int(fps) - self.width = width - self.height = height - - self.frame_count = 0 - - # GStreamer objects + # mount_point -> MountState + self._mounts: Dict[str, MountState] = {} self._server = None self._loop = None self._loop_thread = None self._running = False - self.rtsp_url = f"rtsp://localhost:{self.port}{self.mount_point}" + def create_stream(self, callback: Callable[[], np.ndarray], width: int, height: int, mount_point: str, fps: float): + if self._running: + raise RuntimeError( + "RTSPServer has already started. Streams can only be created prior to starting the server." + ) - def __str__(self) -> str: - status = "running" if self._running else "stopped" - return f"RTSPServer({status}) - {self.rtsp_url}" + if mount_point in self.streams: + raise ValueError(f"Stream '{mount_point}' exists") + self.streams[mount_point] = Stream(callback, width, height, mount_point, fps) + self._mounts[mount_point] = MountState() - def __repr__(self) -> str: - status = "running" if self._running else "stopped" - return f"RTSPServer({status}) - {self.rtsp_url}" + def list_rtsp_urls(self) -> List[str]: + return [f"rtsp://localhost:{self.port}{m}" for m in self.streams.keys()] - def start(self) -> None: - """Start the RTSP server in a background thread.""" + def start(self): if self._running: - return + raise RuntimeError("RTSPServer is already running.") + + if not self.streams: + raise RuntimeError("No streams created. Please call `create_stream` first.") self._running = True self._loop_thread = threading.Thread(target=self._run_server, daemon=True) self._loop_thread.start() - # Give server time to start - time.sleep(0.5) + self._wait_for_server_ready() - logger.info(f"RTSP server started: {self.rtsp_url}") + def _wait_for_server_ready(self, timeout: float = 10.0) -> None: + """Wait for the RTSP server to be ready by checking if the port is bound.""" + start_time = time.time() + while time.time() - start_time < timeout: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(0.1) + result = sock.connect_ex(("127.0.0.1", self.port)) + sock.close() - def stop(self) -> None: - """Stop the RTSP server.""" + if result == 0: # Port is open + server_ready_time = time.time() - start_time + logger.debug(f"Server ready in {server_ready_time:.2f} seconds.") + return + else: + time.sleep(0.01) + + raise RuntimeError(f"RTSP server failed to start within {timeout} seconds") + + def stop(self): if not self._running: return - self._running = False + # stop producers + for mount in list(self._mounts.values()): + prod = mount.producer + if prod: + thr, stop_evt = prod + stop_evt.set() + thr.join(timeout=1.0) + mount.producer = None if self._loop: self._loop.quit() if self._loop_thread: self._loop_thread.join(timeout=2.0) + self.streams.clear() + self._mounts.clear() - def _run_server(self) -> None: - """Run the GStreamer RTSP server main loop.""" + def _run_server(self): Gst.init(None) - self._server = GstRtspServer.RTSPServer() self._server.set_service(str(self.port)) - - factory = self._create_media_factory() - factory.set_shared(True) - + self._server.connect("client-connected", self._on_client_connected) mount_points = self._server.get_mount_points() - mount_points.add_factory(self.mount_point, factory) - + for s in self.streams.values(): + f = RTSPStreamMediaFactory(s, self) + f.set_shared(False) # per-client pipelines + mount_points.add_factory(s.mount_point, f) self._server.attach(None) - self._loop = GLib.MainLoop() try: self._loop.run() finally: self._running = False - def _create_media_factory(self): - """Create the GStreamer media factory.""" - - class RTSPMediaFactory(GstRtspServer.RTSPMediaFactory): - def __init__(self, rtsp_server): - super().__init__() - self.rtsp_server = rtsp_server - - def do_create_element(self, url): - pipeline = ( - f"appsrc name=source is-live=true format=GST_FORMAT_TIME " - f"caps=video/x-raw,format=RGB,width={self.rtsp_server.width}," - f"height={self.rtsp_server.height},framerate={self.rtsp_server.fps}/1 " - f"! videoconvert ! video/x-raw,format=I420 " - f"! x264enc speed-preset=ultrafast tune=zerolatency " - f"! rtph264pay name=pay0 pt=96" - ) - return Gst.parse_launch(pipeline) - - def do_configure(self, rtsp_media): - appsrc = rtsp_media.get_element().get_child_by_name("source") - appsrc.connect("need-data", self.on_need_data) - - def on_need_data(self, src, length): - frame = self.rtsp_server.callback() + def _on_client_connected(self, server, client): + conn = client.get_connection() + ip = conn.get_ip() + logger.info(f"RTSP client connected: ip={ip}") + + client.connect("closed", self._on_client_disconnected) + + def _on_client_disconnected(self, client): + conn = client.get_connection() + ip = conn.get_ip() + logger.info(f"RTSP client disconnected: ip={ip}") + + def _producer_worker(self, stream: Stream, mount: MountState, stop_evt: threading.Event, duration: int): + """Worker function that produces frames for a stream at the target FPS. + + Args: + stream: The stream configuration (callback, fps, etc.) + mount: The mount state containing client list and locks + stop_evt: Event to signal when to stop producing + duration: Frame duration in nanoseconds + """ + period = 1.0 / float(stream.fps) + next_t = time.monotonic() + + while not stop_evt.is_set(): + now = time.monotonic() + s = next_t - now + if s > 0: + time.sleep(s) + next_t += period + + try: + frame = stream.callback() frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + fb = frame.tobytes() + except Exception: + logger.exception("grab failed for %s", stream.mount_point) + fb = b"" + + with mount.clients_lock: + clients = list(mount.clients) + + for c in clients: + GLib.idle_add(self._push_to_client, c, fb, duration, mount) + + with mount.clients_lock: + if not mount.clients: + break - # Convert to GStreamer buffer - buf = Gst.Buffer.new_allocate(None, frame.nbytes, None) - buf.fill(0, frame.tobytes()) - buf.duration = Gst.SECOND // self.rtsp_server.fps - buf.pts = self.rtsp_server.frame_count * buf.duration + def _push_to_client(self, client_entry: ClientState, frame_bytes: bytes, duration_ns: int, mount: MountState): + """Push a frame buffer to a specific client. - self.rtsp_server.frame_count += 1 - src.emit("push-buffer", buf) + Args: + client_entry: The client to push the frame to + frame_bytes: The frame data as bytes + duration_ns: Frame duration in nanoseconds + mount: The mount state (used for client removal on error) + """ + app = client_entry.appsrc + if app is None: + return + + buf = Gst.Buffer.new_allocate(None, len(frame_bytes), None) + buf.fill(0, frame_bytes) + fc = client_entry.frame_count + buf.pts = int(fc * duration_ns) + buf.duration = int(duration_ns) + client_entry.frame_count = fc + 1 + app.emit("push-buffer", buf) - return RTSPMediaFactory(self) + def _remove_client(self, mount: MountState, client: ClientState): + """Remove a client from a mount and stop the producer if no clients remain. + + Args: + mount: The mount state containing the client list and producer + client: The client entry to remove + """ + with mount.clients_lock: + mount.clients = [c for c in mount.clients if c is not client] + if not mount.clients and mount.producer: + thr, stop_evt = mount.producer + stop_evt.set() + thr.join(timeout=1.0) + mount.producer = None def __enter__(self): - """Context manager entry.""" self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" self.stop() diff --git a/test/test_rtsp.py b/test/test_rtsp.py new file mode 100644 index 0000000..005b07c --- /dev/null +++ b/test/test_rtsp.py @@ -0,0 +1,80 @@ +import unittest +import numpy as np +import time + +from framegrab.rtsp_server import RTSPServer +from framegrab.grabber import RTSPFrameGrabber +from framegrab.config import RTSPFrameGrabberConfig + +import time + +def generate_noise_frame(width: int, height: int) -> np.ndarray: + return np.random.randint(0, 255, (height, width, 3), dtype=np.uint8) + +class TestRTSP(unittest.TestCase): + def setUp(self): + """Set up test RTSP server and grabber.""" + self.port = 8554 + self.server = None + self.grabber = None + + def tearDown(self): + """Clean up resources.""" + if self.grabber: + self.grabber.release() + if self.server: + self.server.stop() + + def test_rtsp_server_to_grabber_integration(self): + """Test that RTSPFrameGrabber can successfully grab frames from RTSPServer.""" + # Create a static noise frame for testing + TEST_FRAME_WIDTH = 640 + TEST_FRAME_HEIGHT = 480 + FPS = 15.0 + MOUNT_POINT = '/test' + + # Create RTSP server with static frame callback + def frame_callback(): + return generate_noise_frame(TEST_FRAME_WIDTH, TEST_FRAME_HEIGHT) + + self.server = RTSPServer(port=self.port) + self.server.create_stream( + callback=frame_callback, + width=TEST_FRAME_WIDTH, + height=TEST_FRAME_HEIGHT, + fps=FPS, + mount_point=MOUNT_POINT + ) + self.server.start() + + # Create RTSP grabber to connect to the server + rtsp_url = f"rtsp://localhost:{self.port}{MOUNT_POINT}" + config = RTSPFrameGrabberConfig( + rtsp_url=rtsp_url, + keep_connection_open=True, + ) + self.grabber = RTSPFrameGrabber(config) + + # Grab a frame and validate it + frame = self.grabber.grab() + + # Assertions + self.assertIsInstance(frame, np.ndarray) + + test_frame_shape = (TEST_FRAME_HEIGHT, TEST_FRAME_WIDTH, 3) + self.assertEqual(frame.shape, test_frame_shape) + self.assertEqual(frame.dtype, np.uint8) + + # Verify we got a valid frame (not all zeros or corrupted) + self.assertGreater(frame.max(), 0) + self.assertLess(frame.min(), 255) + + # Verify that if we grab two separate frames, they are unique + plenty_of_time_for_next_frame_to_be_available = 1 / FPS * 2 + time.sleep(plenty_of_time_for_next_frame_to_be_available) + frame2 = self.grabber.grab() + assert not np.array_equal(frame, frame2), "The two captured frames were not unique. Is the server publishing correct frames at a correct FPS?" + + +if __name__ == "__main__": + unittest.main()