From 0a225cd3b9142009d9a9a00cf10a90edc6e7e028 Mon Sep 17 00:00:00 2001 From: junhuanchen Date: Wed, 13 Jan 2021 20:45:12 +0800 Subject: [PATCH] [update] Don't want to explain. --- ext_modules/_maix/_maix.c | 59 +++++++ ext_modules/_maix/example/test__maix.py | 4 +- ext_modules/_maix/pyCamera.c | 60 ------- maix/Video.py | 74 +++++---- maix/__init__.py | 5 +- maix/display.py | 86 +++++----- maix/rpycs.py | 209 ++++++++++++++++++++++++ maix/utils/__init__.py | 0 maix/utils/rtp_packet.py | 90 ++++++++++ maix/utils/rtsp_packet.py | 165 +++++++++++++++++++ maix/utils/video_stream.py | 64 ++++++++ setup.py | 6 +- tests/test_maix.py | 4 +- tests/test_rpyc.py | 62 +++++++ tox.ini | 2 + 15 files changed, 745 insertions(+), 145 deletions(-) create mode 100644 maix/rpycs.py create mode 100644 maix/utils/__init__.py create mode 100644 maix/utils/rtp_packet.py create mode 100644 maix/utils/rtsp_packet.py create mode 100644 maix/utils/video_stream.py create mode 100644 tests/test_rpyc.py diff --git a/ext_modules/_maix/_maix.c b/ext_modules/_maix/_maix.c index 284b89d..4473f7e 100644 --- a/ext_modules/_maix/_maix.c +++ b/ext_modules/_maix/_maix.c @@ -4,14 +4,39 @@ #define _VERSION_ "0.1" #define _NAME_ "_maix" +#include "jpeglib.h" +#include "string.h" + PyDoc_STRVAR(_maix_doc, "MaixPy Python3 library.\n"); static PyObject *_maix_help() { return PyUnicode_FromString(_maix_doc); } +PyDoc_STRVAR(_maix_rgb2jpg_doc, "rgb2jpg()\n\nConvert image(rgb888) bytes data to jpeg image bytes.\n"); +static PyObject *PyJpegCompress(char *inData, int width, int height, int channels, int color_space, int quality); +static PyObject *_maix_rgb2jpg(PyObject *self, PyObject *args) +{ + PyObject *bytes = NULL; + PyObject *inRGB = NULL; + + int width = 1, height = 1; + int channels = 3, color_space = JCS_RGB, quality = 75; + + if (!PyArg_ParseTuple(args, "Oii|iii", &inRGB, + &width, &height, &channels, &color_space, &quality)) + return NULL; + + char *rgb_data = PyBytes_AS_STRING(inRGB); + + bytes = PyJpegCompress(rgb_data, width, height, channels, color_space, quality); + + return bytes; +} + static PyMethodDef _maix_methods[] = { {"help", (PyCFunction)_maix_help, METH_NOARGS, _maix_doc}, + {"rgb2jpg", (PyCFunction)_maix_rgb2jpg, METH_VARARGS, _maix_rgb2jpg_doc}, {NULL} }; @@ -54,3 +79,37 @@ PyMODINIT_FUNC PyInit__maix(void) return module; } +static PyObject *PyJpegCompress(char *inData, int width, int height, int channels, int color_space, int quality) +{ + struct jpeg_compress_struct cinfo; + struct jpeg_error_mgr jerr; + long unsigned int outSize = 0; + uint8_t *outbuffer = NULL; + JSAMPROW row_pointer[1]; + + cinfo.err = jpeg_std_error(&jerr); + jpeg_create_compress(&cinfo); + jpeg_mem_dest(&cinfo, &outbuffer, &outSize); + cinfo.image_width = width; + cinfo.image_height = height; + cinfo.input_components = channels; // 3 / 1 + cinfo.in_color_space = color_space; // JCS_RGB / JCS_GRAYSCALE + jpeg_set_defaults(&cinfo); + jpeg_set_quality(&cinfo, quality, TRUE); // default 75 + jpeg_start_compress(&cinfo, TRUE); + int row_stride = width * 3; + while (cinfo.next_scanline < cinfo.image_height) + { + row_pointer[0] = (uint8_t *)&inData[cinfo.next_scanline * row_stride]; + (void)jpeg_write_scanlines(&cinfo, row_pointer, 1); + } + jpeg_finish_compress(&cinfo); + + PyObject *bytes = PyBytes_FromStringAndSize((const char *)outbuffer, outSize); + + if (NULL != outbuffer) + free(outbuffer), outbuffer = NULL; + + jpeg_destroy_compress(&cinfo); + return bytes; +} diff --git a/ext_modules/_maix/example/test__maix.py b/ext_modules/_maix/example/test__maix.py index f62dd23..b8d49d1 100644 --- a/ext_modules/_maix/example/test__maix.py +++ b/ext_modules/_maix/example/test__maix.py @@ -5,5 +5,5 @@ def test_import(): import _maix print(_maix.help()) - tmp = _maix.Camera() - print(tmp.rgb2jpg(b"\xff"*640*480*3)) + tmp = _maix.rgb2jpg(b"\xff"*4*6*3, 4, 6) + print(len(tmp)) diff --git a/ext_modules/_maix/pyCamera.c b/ext_modules/_maix/pyCamera.c index dea2aac..bf670c9 100644 --- a/ext_modules/_maix/pyCamera.c +++ b/ext_modules/_maix/pyCamera.c @@ -1,45 +1,6 @@ #include "_maix.h" -#include "jpeglib.h" - -#include "string.h" - -static PyObject *PyJpegCompress(char *inData, int width, int height, int channels, int color_space, int quality) -{ - struct jpeg_compress_struct cinfo; - struct jpeg_error_mgr jerr; - long unsigned int outSize = 0; - uint8_t *outbuffer = NULL; - JSAMPROW row_pointer[1]; - - cinfo.err = jpeg_std_error(&jerr); - jpeg_create_compress(&cinfo); - jpeg_mem_dest(&cinfo, &outbuffer, &outSize); - cinfo.image_width = width; - cinfo.image_height = height; - cinfo.input_components = channels; // 3 / 1 - cinfo.in_color_space = color_space; // JCS_RGB / JCS_GRAYSCALE - jpeg_set_defaults(&cinfo); - jpeg_set_quality(&cinfo, quality, TRUE); // default 75 - jpeg_start_compress(&cinfo, TRUE); - int row_stride = width; - while (cinfo.next_scanline < cinfo.image_height) - { - row_pointer[0] = (uint8_t *)&inData[cinfo.next_scanline * row_stride]; - (void)jpeg_write_scanlines(&cinfo, row_pointer, 1); - } - jpeg_finish_compress(&cinfo); - - PyObject *bytes = PyBytes_FromStringAndSize((const char *)outbuffer, outSize); - - if (NULL != outbuffer) - free(outbuffer), outbuffer = NULL; - - jpeg_destroy_compress(&cinfo); - return bytes; -} - /* Macros needed for Python 3 */ #ifndef PyInt_Check #define PyInt_Check PyLong_Check @@ -139,29 +100,8 @@ static PyObject *Camera_str(PyObject *object) return dev_desc; } -PyDoc_STRVAR(Camera_rgb2jpg_doc, "rgb2jpg()\n\nConvert image(rgb888) bytes data to jpeg image bytes.\n"); -static PyObject *Camera_rgb2jpg(CameraObject *self, PyObject *args) -{ - PyObject *bytes = NULL; - PyObject *inRGB = NULL; - - int width = self->width, height = self->height; - int channels = 3, color_space = JCS_RGB, quality = 75; - - if (!PyArg_ParseTuple(args, "O|iiiii", &inRGB, - &width, &height, &channels, &color_space, &quality)) - return NULL; - - char *rgb_data = PyBytes_AS_STRING(inRGB); - - bytes = PyJpegCompress(rgb_data, width, height, channels, color_space, quality); - - return bytes; -} - static PyMethodDef Camera_methods[] = { - {"rgb2jpg", (PyCFunction)Camera_rgb2jpg, METH_VARARGS, Camera_rgb2jpg_doc}, {"close", (PyCFunction)Camera_close, METH_NOARGS, Camera_close_doc}, {"__enter__", (PyCFunction)Camera_enter, METH_NOARGS, NULL}, {"__exit__", (PyCFunction)Camera_exit, METH_NOARGS, NULL}, diff --git a/maix/Video.py b/maix/Video.py index 41fdd51..6d667db 100644 --- a/maix/Video.py +++ b/maix/Video.py @@ -1,64 +1,80 @@ -class VideoCapture(): +class MaixVideo(): - def __init__(self, source="/dev/video", size=(640, 480)): - self.source = source + def __init__(self, size=(640, 480)): self.width, self.height = size def write(self): pass # for file def read(self): - return b'\x00\xFF\x00' * (self.width * self.height) + return b'\xFF\x00\x00' * (self.width * self.height) def capture(self): from PIL import Image - return Image.frombytes("RGB", (self.width, self.height), self.read()) + return Image.frombytes( + "RGB", (self.width, self.height), self.read()) def close(self): pass # for file - -camera = VideoCapture() +camera = MaixVideo() try: # use libmaix on v831 from libmaix import Camera - class V831VideoCapture(VideoCapture): + class V831MaixVideo(MaixVideo): - def __init__(self, source="/sipeed/v831", size=(480, 360)): + def __init__(self, source="/v831", size=(480, 360)): + super(V831MaixVideo, self).__init__(size) self.source = source - self.width, self.height = size self.cam = Camera(self.width, self.height) def read(self): - return self.cam.read() # bytes + return self.cam.read() def __del__(self): self.cam.close() - camera = V831VideoCapture() + camera = V831MaixVideo() +except Exception as e: + pass + +try: + from cv2 import VideoCapture + + class CvMaixVideo(MaixVideo): + + def __init__(self, source=0, size=(640, 480)): + super(CvMaixVideo, self).__init__(size) + self.source = source + self.cam = VideoCapture(0) + + def read(self): + ret, frame = self.cam.read() + if ret: + bgr = frame[..., ::-1] # bgr2rgb + return bgr.tobytes() # bytes + return None + + def __del__(self): + self.cam.release() + + camera = CvMaixVideo() except Exception as e: pass if __name__ == '__main__': - ''' - def test_cv(): - import cv2 - cap = cv2.VideoCapture(0) - print(cap) - while True: - ret, frame = cap.read() - print(ret, type(frame)) - if frame is not None: - cv2.imshow("Video", frame) - if cv2.waitKey(1) & 0xFF == ord('q'): - break - cap.release() - cv2.destroyAllWindows() - # test_cv() - ''' import display - # display.clear((255, 0, 0)) + display.clear((255, 0, 0)) display.show(camera.capture()) + # tmp = camera.read() + # import _maix + # frame = _maix.rgb2jpg(camera.rgbbuf, camera.width, camera.height) + # print(len(frame) // 1024, camera.width, camera.height) + # from PIL import Image + # from io import BytesIO + # img = Image.open(BytesIO(frame)) + # img.show() + \ No newline at end of file diff --git a/maix/__init__.py b/maix/__init__.py index 238ad50..f1130c0 100644 --- a/maix/__init__.py +++ b/maix/__init__.py @@ -1,4 +1,5 @@ from .Video import camera -from .import display +from .import display, rpycs + +__all__ = ['display', 'Video', 'camera', 'rpycs'] -__all__ = ['display', 'Video', 'camera'] diff --git a/maix/display.py b/maix/display.py index b5d3dec..f7469b6 100644 --- a/maix/display.py +++ b/maix/display.py @@ -1,6 +1,7 @@ import shutil import io +import os from PIL import Image, ImageShow @@ -16,35 +17,40 @@ def get_command_ex(self, file, **options): except ModuleNotFoundError as e: pass +# try: +# from _maix import Display +# class dispViewer(ImageShow.UnixViewer): +# def get_command_ex(self, file, **options): +# print(file, **options) +# command = executable = "" +# return command, executable + +# def show_file(self, file, **options): +# print(file, **options) +# ImageShow.register(dispViewer, 0) +# except ModuleNotFoundError as e: +# pass +# except Exception as e: +# pass + +# options +clear_output = True # for jupyter +sync_show, local_show = True, True + try: - # use libmaix on v831 - from libmaix import disp - - class dispViewer(ImageShow.UnixViewer): - def get_command_ex(self, file, **options): - print(file, **options) - command = executable = "" - return command, executable - - def show_file(self, file, **options): - print(file, **options) - ImageShow.register(dispViewer, 0) -except ModuleNotFoundError as e: - pass + __width__, __height__ = (240, 240) + env = os.environ + __width__, __height__ = (env['_MAIX_WIDTH_'], env['_MAIX_HEIGHT_']) except Exception as e: - pass - -# TODO display size from linux env -clear_output = True -local_show = False -sync_show = True -# jpeg_quality = 95 + print('[display] tips: os.environ not _MAIX_WIDTH_ or _MAIX_HEIGHT_.') +finally: + __display__ = Image.new("RGB", (__width__, __height__), (255, 255, 255)) -__width__, __height__ = (240, 240) -__images__, __show__ = None, False # jupyter -__display__ = Image.new("RGB", (__width__, __height__)) +def tobytes(): + global __display__ + return __display__.tobytes() -def resize(size=(640, 480), update=False): +def set_window(size=(640, 480), update=False): global __display__, __width__, __height__ __width__, __height__ = size if update: @@ -56,9 +62,15 @@ def __thumbnail__(im): # print(w, __width__, h, __height__) im.thumbnail((__display__.width, __display__.height)) +def fill(box=(0, 0), color=(0, 0, 0)): + global __display__ + if len(box) == 2: + box = box + __display__.size + __display__.paste(color, box) + def show(im=None, box=(0, 0)): - global __display__, __show__, __images__, local_show, sync_show # , jpeg_quality + global __display__, local_show, sync_show if local_show: if isinstance(im, bytes): @@ -71,31 +83,11 @@ def show(im=None, box=(0, 0)): __display__.show() if sync_show: - try: - # __image__ = io.BytesIO() - # __display__.save(__image__, format='jpeg', quality=jpeg_quality) - # __images__.append(__image__) - if isinstance(im, bytes): - __images__.append(im) - elif isinstance(im, Image.Image): - __images__.append(im.tobytes()) - except Exception as e: - # print(e) - __images__ = None - __show__ = True - print(end='') # Give it time - - -def fill(box=(0, 0), color=(0, 0, 0)): - global __display__ - if len(box) == 2: - box = box + __display__.size - __display__.paste(color, box) + pass # send def clear(c=(0, 0, 0)): fill(color=c) - show() if __name__ == '__main__': diff --git a/maix/rpycs.py b/maix/rpycs.py new file mode 100644 index 0000000..f0430e3 --- /dev/null +++ b/maix/rpycs.py @@ -0,0 +1,209 @@ +from socketserver import BaseRequestHandler, TCPServer +import socket +import time + +from threading import Thread + +from rpyc.utils.server import ThreadedServer +from rpyc.core.service import SlaveService + +from .utils.video_stream import VideoStream +from .utils.rtsp_packet import RTSPPacket +from .utils.rtp_packet import RTPPacket + + +class Server: + FRAME_PERIOD = 1000 // VideoStream.DEFAULT_FPS # in milliseconds + SESSION_ID = '000001' + DEFAULT_CHUNK_SIZE = 4 * 1024 + + # for allowing simulated non-blocking operations + # (useful for keyboard break) + RTSP_SOFT_TIMEOUT = 100 # in milliseconds + + class STATE: + INIT = 0 + PAUSED = 1 + PLAYING = 2 + FINISHED = 3 + TEARDOWN = 4 + + def __init__(self: object): + self._video_stream: Union[None, VideoStream] = None + self._rtp_send_thread: Union[None, Thread] = None + self._rtsp_connection: Union[None, socket.socket] = None + self._rtp_socket: Union[None, socket.socket] = None + self._client_address: (str, int) = None + self.server_state: int = self.STATE.INIT + + def __del__(self): + if self._rtsp_connection: + self._rtsp_connection.close() + self._rtsp_connection = None + if self._video_stream: + self._video_stream.close() + self._video_stream = None + if self._rtp_socket: + self._rtp_socket.close() + self._rtp_socket = None + + def _rtsp_recv(self, size=DEFAULT_CHUNK_SIZE) -> bytes: + recv = None + while True: + try: + recv = self._rtsp_connection.recv(size) + break + except socket.timeout: + continue + print(f"Received from client: {repr(recv)}") + return recv + + def _rtsp_send(self, data: bytes) -> int: + print(f"Sending to client: {repr(data)}") + return self._rtsp_connection.send(data) + + def _get_rtsp_packet(self) -> RTSPPacket: + return RTSPPacket.from_request(self._rtsp_recv()) + + def _wait_connection(self, sock, addr): + self._rtsp_connection, self._client_address = sock, addr + self._rtsp_connection.settimeout(self.RTSP_SOFT_TIMEOUT/1000.) + print( + f"Accepted connection from {self._client_address[0]}:{self._client_address[1]}") + + def _wait_setup(self): + if self.server_state != self.STATE.INIT: + raise Exception('server is already setup') + while True: + packet = self._get_rtsp_packet() + if packet.request_type == RTSPPacket.SETUP: + self.server_state = self.STATE.PAUSED + print('State set to PAUSED') + self._client_address = self._client_address[0], packet.rtp_dst_port + self._setup_rtp(packet.video_file_path) + self._send_rtsp_response(packet.sequence_number) + break # will exit rtsp + + def _start_rtp_send_thread(self): + self._rtp_send_thread = Thread(target=self._handle_video_send) + self._rtp_send_thread.setDaemon(True) + self._rtp_send_thread.start() + + def _setup_rtp(self, video_file_path: str): + print(f"Opening up video stream for file {video_file_path}") + self._video_stream = VideoStream(video_file_path) + print('Setting up RTP socket...') + self._rtp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._start_rtp_send_thread() + + def handle_rtsp_requests(self): + print("Waiting for RTSP requests...") + # main thread will be running here most of the time + while True: + packet = self._get_rtsp_packet() + # assuming state will only ever be PAUSED or PLAYING at this point + if packet.request_type == RTSPPacket.PLAY: + if self.server_state == self.STATE.PLAYING: + print('Current state is already PLAYING.') + continue + self.server_state = self.STATE.PLAYING + print('State set to PLAYING.') + elif packet.request_type == RTSPPacket.PAUSE: + if self.server_state == self.STATE.PAUSED: + print('Current state is already PAUSED.') + continue + self.server_state = self.STATE.PAUSED + print('State set to PAUSED.') + elif packet.request_type == RTSPPacket.TEARDOWN: + print('Received TEARDOWN request, shutting down...') + self._send_rtsp_response(packet.sequence_number) + self.__del__() + self.server_state = self.STATE.TEARDOWN + # for simplicity's sake, caught on main_server + # raise ConnectionError('teardown requested') + return None + else: + # will never happen, since exception is raised inside `parse_rtsp_request()` + # raise InvalidRTSPRequest() + pass + self._send_rtsp_response(packet.sequence_number) + + def _send_rtp_packet(self, packet: bytes): + to_send = packet[:] + while to_send: + try: + if self._rtp_socket: + self._rtp_socket.sendto( + to_send[:self.DEFAULT_CHUNK_SIZE], self._client_address) + except socket.error as e: + print(f"failed to send rtp packet: {e}") + return + # trim bytes sent + to_send = to_send[self.DEFAULT_CHUNK_SIZE:] + + def _handle_video_send(self): + print( + f"Sending video to {self._client_address[0]}:{self._client_address[1]}") + while True: + if self.server_state == self.STATE.TEARDOWN: + return + if self.server_state != self.STATE.PLAYING: + time.sleep(0.5) # diminish cpu hogging + continue + if self._video_stream.current_frame_number >= VideoStream.VIDEO_LENGTH-1: # frames are 0-indexed + print('Reached end of file.') + self.server_state = self.STATE.FINISHED + return + frame = self._video_stream.get_next_frame() + frame_number = self._video_stream.current_frame_number + rtp_packet = RTPPacket( + payload_type=RTPPacket.TYPE.MJPEG, + sequence_number=frame_number, + timestamp=frame_number*self.FRAME_PERIOD, + payload=frame + ) + # print(f"Sending packet #{frame_number}") + # print('Packet header:') + # rtp_packet.print_header() + packet = rtp_packet.get_packet() + self._send_rtp_packet(packet) + time.sleep(self.FRAME_PERIOD/1000.) + + def _send_rtsp_response(self, sequence_number: int): + response = RTSPPacket.build_response(sequence_number, self.SESSION_ID) + self._rtsp_send(response.encode()) + print('Sent response to client.') + + +class _Rtsp_(BaseRequestHandler): + def handle(self): # from BaseRequestHandler + try: + s = Server() + s._wait_connection(self.request, self.client_address) + s._wait_setup() + s.handle_rtsp_requests() # keep rtp udp + except BrokenPipeError as e: + print(e) + + +def start(HostName='0.0.0.0', RtspPort=18811, RpycPort=18812): + print(__file__, 'start', HostName, RtspPort, RpycPort) + + class RtspServerThread(Thread): + server = TCPServer((HostName, RtspPort), _Rtsp_) + + def run(self): + __class__.server.serve_forever() + + rtsp_server = RtspServerThread() + rtsp_server.start() + + rpyc_server = ThreadedServer( + SlaveService, hostname=HostName, port=RpycPort, reuse_addr=True) + rpyc_server.start() + + rtsp_server.server.shutdown() # close rtsp_server + + +if __name__ == '__main__': + start() diff --git a/maix/utils/__init__.py b/maix/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/maix/utils/rtp_packet.py b/maix/utils/rtp_packet.py new file mode 100644 index 0000000..ec45936 --- /dev/null +++ b/maix/utils/rtp_packet.py @@ -0,0 +1,90 @@ +class InvalidPacketException(Exception): + pass + + +class RTPPacket: + # default header info + HEADER_SIZE = 12 # bytes + VERSION = 0b10 # 2 bits -> current version 2 + PADDING = 0b0 # 1 bit + EXTENSION = 0b0 # 1 bit + CC = 0x0 # 4 bits + MARKER = 0b0 # 1 bit + SSRC = 0x00000000 # 32 bits + + class TYPE: + MJPEG = 26 + + def __init__( + self, + payload_type: int = None, + sequence_number: int = None, + timestamp: int = None, + payload: bytes = None): + + self.payload = payload + self.payload_type = payload_type + self.sequence_number = sequence_number + self.timestamp = timestamp + + # b0 -> v0 v1 p x c0 c1 c2 c3 + zeroth_byte = (self.VERSION << 6) | (self.PADDING << 5) | (self.EXTENSION << 4) | self.CC + # b1 -> m pt0 pt1 pt2 pt3 pt4 pt5 pt6 + first_byte = (self.MARKER << 7) | self.payload_type + # b2 -> s0 s1 s2 s3 s4 s5 s6 s7 + second_byte = self.sequence_number >> 8 + # b3 -> s8 s9 s10 s11 s12 s13 s14 s15 + third_byte = self.sequence_number & 0xFF + # b4~b7 -> timestamp + fourth_to_seventh_bytes = [ + (self.timestamp >> shift) & 0xFF for shift in (24, 16, 8, 0) + ] + # b8~b11 -> ssrc + eigth_to_eleventh_bytes = [ + (self.SSRC >> shift) & 0xFF for shift in (24, 16, 8, 0) + ] + self.header = bytes(( + zeroth_byte, + first_byte, + second_byte, + third_byte, + *fourth_to_seventh_bytes, + *eigth_to_eleventh_bytes, + )) + + @classmethod + def from_packet(cls, packet: bytes): + if len(packet) < cls.HEADER_SIZE: + raise InvalidPacketException(f"The packet {repr(packet)} is invalid") + + header = packet[:cls.HEADER_SIZE] + payload = packet[cls.HEADER_SIZE:] + + # b1 -> m pt0 ... pt6 + # i.e. payload type is whole byte except first bit + payload_type = header[1] & 0x7F + # b2 -> s0 ~ s7 + # b3 -> s8 ~ s15 + # i.e. sequence number is b2<<8 | b3 + sequence_number = header[2] << 8 | header[3] + # b4 ~ b7 -> t0 ~ t31 + timestamp = 0 + for i, b in enumerate(header[4:8]): + timestamp = timestamp | b << (3 - i) * 8 + + return cls( + payload_type, + sequence_number, + timestamp, + payload + ) + + def get_packet(self) -> bytes: + return bytes((*self.header, *self.payload)) + + def print_header(self): + # print header without SSRC + for i, by in enumerate(self.header[:8]): + s = ' '.join(f"{by:08b}") + # break line after the third and seventh bytes + print(s, end=' ' if i not in (3, 7) else '\n') diff --git a/maix/utils/rtsp_packet.py b/maix/utils/rtsp_packet.py new file mode 100644 index 0000000..d2ede92 --- /dev/null +++ b/maix/utils/rtsp_packet.py @@ -0,0 +1,165 @@ +import re +from typing import Optional + + +class InvalidRTSPRequest(Exception): + pass + + +class RTSPPacket: + RTSP_VERSION = 'RTSP/1.0' + + INVALID = -1 + SETUP = 'SETUP' + PLAY = 'PLAY' + PAUSE = 'PAUSE' + TEARDOWN = 'TEARDOWN' + RESPONSE = 'RESPONSE' + + def __init__( + self, + request_type, + video_file_path: Optional[str] = None, + sequence_number: Optional[int] = None, + dst_port: Optional[int] = None, + session_id: Optional[str] = None): + self.request_type = request_type + self.video_file_path = video_file_path + self.sequence_number = sequence_number + self.session_id = session_id + + # if request_type SETUP + self.rtp_dst_port = dst_port + + def __str__(self): + return (f"RTSPPacket({self.request_type}, " + f"{self.video_file_path}, " + f"{self.sequence_number}, " + f"{self.rtp_dst_port}, " + f"{self.session_id})") + + @classmethod + def from_response(cls, response: bytes): + # only response format implemented, taken from server class: + # """ + # 200 OK\r\n + # CSeq: \r\n + # Session: \r\n + # """ + match = re.match( + r"(?PRTSP/\d+.\d+) 200 OK\r?\n" + r"CSeq: (?P\d+)\r?\n" + r"Session: (?P\d+)\r?\n", + response.decode() + ) + + if match is None: + raise Exception(f"failed to parse RTSP response: {response}") + + g = match.groupdict() + + # not used, defaults to 1.0 + # rtsp_version = g.get('rtsp_version') + sequence_number = g.get('sequence_number') + session_id = g.get('session_id') + + try: + sequence_number = int(sequence_number) + except (ValueError, TypeError): + raise Exception(f"failed to parse sequence number: {response}") + + if session_id is None: + raise Exception(f"failed to parse session id: {response}") + + return cls( + request_type=RTSPPacket.RESPONSE, + sequence_number=sequence_number, + session_id=session_id + ) + + @classmethod + def build_response(cls, sequence_number: int, session_id: str): + response = '\r\n'.join(( + f"{cls.RTSP_VERSION} 200 OK", + f"CSeq: {sequence_number}", + f"Session: {session_id}", + )) + '\r\n' + return response + + @classmethod + def from_request(cls, request: bytes): + # loosely follows actual rtsp protocol, considering only SETUP, PLAY, PAUSE, and TEARDOWN + # https://en.wikipedia.org/wiki/Real_Time_Streaming_Protocol + match = re.match( + r"(?P\w+) rtsp://(?P\S+) (?PRTSP/\d+.\d+)\r?\n" + r"CSeq: (?P\d+)\r?\n" + r"(Range: (?P\w+=\d+-\d+\r?\n))?" + r"(Transport: .*client_port=(?P\d+).*\r?\n)?" # in case of SETUP request + r"(Session: (?P\d+)\r?\n)?", + request.decode() + ) + + if match is None: + raise InvalidRTSPRequest(f"failed to parse request: {request}") + + g = match.groupdict() + request_type = g.get('request_type') + + if request_type not in (RTSPPacket.SETUP, + RTSPPacket.PLAY, + RTSPPacket.PAUSE, + RTSPPacket.TEARDOWN): + raise InvalidRTSPRequest(f"invalid request type: {request}") + + video_file_path = g.get('video_file_path') + # not used, defaults to `RTSPPacket.RTSP_VERSION` + # rtsp_version = g.get('rtsp_version') + sequence_number = g.get('sequence_number') + dst_port = g.get('dst_port') + session_id = g.get('session_id') + + if request_type == RTSPPacket.SETUP: + try: + dst_port = int(dst_port) + except (ValueError, TypeError): + raise InvalidRTSPRequest(f"failed to parse RTP port") + try: + sequence_number = int(sequence_number) + except (ValueError, TypeError): + raise InvalidRTSPRequest(f"failed to parse sequence number: {request}") + + return cls( + request_type, + video_file_path, + sequence_number, + dst_port, + session_id + ) + + def to_request(self) -> bytes: + # loosely follows actual rtsp protocol, considering only SETUP, PLAY, PAUSE, and TEARDOWN + # https://en.wikipedia.org/wiki/Real_Time_Streaming_Protocol + if any((attr is None for attr in (self.request_type, + self.sequence_number, + self.session_id))): + raise InvalidRTSPRequest('missing one attribute of: `request_type`, `sequence_number`, `session_id`') + + if self.request_type in (self.INVALID, self.RESPONSE): + raise InvalidRTSPRequest(f"invalid request type: {self}") + + request_lines = [ + f"{self.request_type} rtsp://{self.video_file_path} {self.RTSP_VERSION}", + f"CSeq: {self.sequence_number}", + ] + if self.request_type == self.SETUP: + if self.rtp_dst_port is None: + raise InvalidRTSPRequest(f"missing RTP destination port: {self}") + request_lines.append( + f"Transport: RTP/UDP;client_port={self.rtp_dst_port}" + ) + else: + request_lines.append( + f"Session: {self.session_id}" + ) + request = '\r\n'.join(request_lines) + '\r\n' + return request.encode() diff --git a/maix/utils/video_stream.py b/maix/utils/video_stream.py new file mode 100644 index 0000000..56c77db --- /dev/null +++ b/maix/utils/video_stream.py @@ -0,0 +1,64 @@ + +import sys +import _maix +from maix import camera, display + +class VideoStream: + FRAME_HEADER_LENGTH = 5 + DEFAULT_IMAGE_SHAPE = (380, 280) + VIDEO_LENGTH = 500 + DEFAULT_FPS = 30 # 24 fps + + # if it's present at the end of chunk, + # it's the last chunk for current jpeg (end of frame) + JPEG_EOF = b'\xff\xd9' + + def __init__(self, file_path: str): + # for simplicity, mjpeg is assumed to be on working directory + print(file_path) + if file_path == '/dev/display': + self.TYPE = 0 # display + self._stream = None + __class__.VIDEO_LENGTH = sys.maxsize + elif file_path == '/dev/camera': + self._stream = camera + self.TYPE = 1 # camera + __class__.VIDEO_LENGTH = sys.maxsize + else: + self._stream = open(file_path, 'rb') + self.TYPE = 2 # file + # frame number is zero-indexed + # after first frame is sent, this is set to zero + self.current_frame_number = -1 + + def close(self): + if self._stream: + self._stream.close() + + def get_next_frame(self) -> bytes: + # sample video file format is as follows: + # - 5 digit integer `frame_length` written as 5 bytes, one for each digit (ascii) + # - `frame_length` bytes follow, which represent the frame encoded as a JPEG + # - repeat until EOF + if self.TYPE == 0: + # import random + # display.clear((random.randint(10, 100), 0, 0)) + frame = _maix.rgb2jpg(display.tobytes(), display.__width__, display.__height__) + frame_length = len(frame) + elif self.TYPE == 1: + # import io + # img = self._stream.capture() + # tmp = io.BytesIO() + # img.save(tmp, format='jpeg', quality=75) + # frame = tmp.getvalue() + frame = _maix.rgb2jpg(self._stream.read(), self._stream.width, self._stream.height) + frame_length = len(frame) + elif self.TYPE == 2: + try: + frame_length = self._stream.read(self.FRAME_HEADER_LENGTH) + except ValueError: + raise EOFError + frame_length = int(frame_length.decode()) + frame = self._stream.read(frame_length) + self.current_frame_number += 1 + return bytes(frame) diff --git a/setup.py b/setup.py index 51312ca..924facb 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ def get_srcs(path): _maix_module = Extension('_maix', include_dirs=['ext_modules/_maix/include'], sources=get_srcs('ext_modules/_maix'), libraries=['jpeg']) -libi2c_module = Extension('pylibi2c', include_dirs=['ext_modules/libi2c/src'], sources=get_srcs('ext_modules/libi2c/src')) +# libi2c_module = Extension('pylibi2c', include_dirs=['ext_modules/libi2c/src'], sources=get_srcs('ext_modules/libi2c/src')) setup( name='MaixPy3', @@ -33,10 +33,10 @@ def get_srcs(path): url='https://github.com/sipeed/MaixPy3', description="MaixPy Python3 library", long_description=open('README.md').read(), - install_requires=["Pillow"], + install_requires=["Pillow", "pexpect", "rpyc"], ext_modules=[ _maix_module, - libi2c_module, + # libi2c_module, ], packages = find_packages(), # find __init__.py packages classifiers=[ diff --git a/tests/test_maix.py b/tests/test_maix.py index 429ac34..7055bd6 100644 --- a/tests/test_maix.py +++ b/tests/test_maix.py @@ -2,7 +2,7 @@ import pytest def test_import(): - from maix import display, Video + from maix import display, Video, rpycs def test_display(): from maix import display @@ -24,4 +24,4 @@ def test_image(): def test_camera(): from maix import camera, display - display.show(camera.capture()) \ No newline at end of file + display.show(camera.capture()) diff --git a/tests/test_rpyc.py b/tests/test_rpyc.py new file mode 100644 index 0000000..6ec9067 --- /dev/null +++ b/tests/test_rpyc.py @@ -0,0 +1,62 @@ +import os +import rpyc +import tempfile +from rpyc.utils.server import ThreadedServer, ThreadPoolServer +from rpyc import SlaveService +import unittest + + +class Test_ThreadedServer(unittest.TestCase): + + def setUp(self): + self.server = ThreadedServer(SlaveService, port=18878, auto_register=False) + self.server.logger.quiet = False + self.server._start_in_thread() + + def tearDown(self): + while self.server.clients: + pass + self.server.close() + + def test_connection(self): + conn = rpyc.classic.connect("localhost", port=18878) + print(conn.modules.sys) + print(conn.modules["xml.dom.minidom"].parseString("")) + conn.execute("x = 5") + self.assertEqual(conn.namespace["x"], 5) + self.assertEqual(conn.eval("1+x"), 6) + conn.close() + + +class Test_ThreadedServerOverUnixSocket(unittest.TestCase): + + def setUp(self): + self.socket_path = tempfile.mktemp() + self.server = ThreadedServer(SlaveService, socket_path=self.socket_path, auto_register=False) + self.server.logger.quiet = False + self.server._start_in_thread() + + def tearDown(self): + self.server.close() + os.remove(self.socket_path) + + def test_connection(self): + c = rpyc.classic.unix_connect(self.socket_path) + print(c.modules.sys) + print(c.modules["xml.dom.minidom"].parseString("")) + c.execute("x = 5") + self.assertEqual(c.namespace["x"], 5) + self.assertEqual(c.eval("1+x"), 6) + c.close() + + +class Test_ThreadPoolServer(Test_ThreadedServer): + + def setUp(self): + self.server = ThreadPoolServer(SlaveService, port=18878, auto_register=False) + self.server.logger.quiet = False + self.server._start_in_thread() + + +if __name__ == "__main__": + unittest.main() diff --git a/tox.ini b/tox.ini index 134ac90..e088d5b 100644 --- a/tox.ini +++ b/tox.ini @@ -13,3 +13,5 @@ deps = pytest scripttest Pillow + pexpect + rpyc \ No newline at end of file