From b1751e1349c897e5b8d21271b3fee73dd567260c Mon Sep 17 00:00:00 2001 From: Sachin Singh <152496895+SachinSingh008@users.noreply.github.com> Date: Mon, 21 Jul 2025 19:46:18 +0530 Subject: [PATCH] Create app.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Yes—it's absolutely possible to integrate a Basler industrial camera via pypylon into a Streamlit‑webrtc app, but it’ll require a bit of custom setup. The key thing to understand is: Streamlit‑webrtc expects frames to arrive through its video_frame_callback or VideoProcessor.recv(), which takes care of RTP/WebRTC streaming. --- streamlit_webrtc/app.py | 65 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 streamlit_webrtc/app.py diff --git a/streamlit_webrtc/app.py b/streamlit_webrtc/app.py new file mode 100644 index 00000000..ce908de5 --- /dev/null +++ b/streamlit_webrtc/app.py @@ -0,0 +1,65 @@ +import streamlit as st +from streamlit_webrtc import webrtc_streamer, VideoProcessorBase +import av +import cv2 +from threading import Thread +import queue +import numpy as np +from pypylon import pylon + +# Create a queue to hold frames +frame_queue = queue.Queue(maxsize=1) + +# Background thread to continuously grab frames from the Basler camera +def camera_loop(): + try: + camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice()) + camera.Open() + camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly) + + while camera.IsGrabbing(): + grab_result = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException) + + if grab_result.GrabSucceeded(): + frame = grab_result.Array # NumPy array + if not frame_queue.full(): + frame_queue.queue.clear() # Drop old frame if any + frame_queue.put(frame) + grab_result.Release() + except Exception as e: + print(f"[Camera Loop Error] {e}") + +# Start the camera loop in the background +Thread(target=camera_loop, daemon=True).start() + +# Define the WebRTC video processor +class BaslerVideoProcessor(VideoProcessorBase): + def recv(self, frame: av.VideoFrame) -> av.VideoFrame: + try: + image = frame_queue.get_nowait() + except queue.Empty: + return frame # return original frame if no camera frame available + + # Optional: you can apply OpenCV operations here + image = cv2.putText( + image.copy(), + "From Basler", + (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, + 1, + (0, 255, 0), + 2, + cv2.LINE_AA, + ) + + return av.VideoFrame.from_ndarray(image, format="bgr24") + +# Streamlit UI +st.title("Basler Camera Stream via streamlit-webrtc") + +webrtc_streamer( + key="basler", + video_processor_factory=BaslerVideoProcessor, + media_stream_constraints={"video": True, "audio": False}, + async_processing=True, +)