-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRtpServer.py
More file actions
119 lines (93 loc) · 3.57 KB
/
RtpServer.py
File metadata and controls
119 lines (93 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from tornado.ioloop import PeriodicCallback
from RtpFrameGenerator import RtpPacket, RtpFrameGenerator
import socket
class RtpServer:
    """
    RTP Server
    Deals with publishing rtp datagrams to clients

    Two UDP sockets are bound on construction. A tornado PeriodicCallback
    pulls one packet per tick from the configured stream and sends it, via
    the primary socket, to every registered destination.
    """

    # First port the server binds to; the secondary socket uses the next one.
    DEFAULT_RTP_PORT = 8888
    # Period of the frame generator callback (PeriodicCallback takes ms).
    FRAME_INTERVAL_MS = 40

    def __init__(self, address="0.0.0.0"):
        """Create the server and try to bind its sockets on *address*.

        A failed bind is not fatal here: the sockets stay invalid and
        binding is retried on every tick of the frame generator.
        """
        self._sockets = None
        self._address = address
        # NOTE(review): init_sockets() binds to both .start and .stop of this
        # range, so the exclusive end is used as the secondary port, while
        # iterating the range yields only the primary port. Confirm callers
        # of get_server_ports() expect that.
        self._rtp_pub_ports = range(
            self.DEFAULT_RTP_PORT, self.DEFAULT_RTP_PORT + 1
        )
        # Frame provider
        self._stream = None
        # Frame generator
        self._frame_generator = PeriodicCallback(
            self._gen_rtp_frame, self.FRAME_INTERVAL_MS
        )
        # Maps from some key to (address,port) pairs
        self._destinations = {}
        self.init_sockets()

    def init_sockets(self):
        """Create and bind the primary and secondary UDP sockets.

        Returns True on success. On OSError every socket created so far is
        closed again (so nothing leaks), the server is left without sockets
        and False is returned.
        """
        ports = (self._rtp_pub_ports.start, self._rtp_pub_ports.stop)
        created = []
        try:
            for port in ports:
                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                # Track the socket before bind() so it is closed on failure.
                created.append(sock)
                sock.bind((self._address, port))
        except OSError:
            for sock in created:
                sock.close()
            return False
        self._sockets = tuple(created)
        return True

    def get_server_ports(self):
        """Return the range object describing the server's RTP ports."""
        return self._rtp_pub_ports

    def set_stream(self, stream):
        """Set the frame provider that packets are pulled from."""
        self._stream = stream

    # Start RTP streaming
    def start(self):
        """Start the periodic frame generator."""
        self._frame_generator.start()

    # Stop RTP streaming
    def stop(self):
        """Stop the periodic frame generator."""
        self._frame_generator.stop()

    def add_destination(self, key, dest):
        """Register (or replace) the (address, port) pair stored under *key*."""
        self._destinations[key] = dest

    def remove_destination(self, key, dest=None):
        """Forget the destination stored under *key*; unknown keys are ignored.

        *dest* is unused and kept only so existing callers keep working.
        """
        self._destinations.pop(key, None)

    # Returns a list of pairs (address, port)
    def _get_rtp_destinations(self):
        """Return a new list with every registered destination."""
        return list(self._destinations.values())

    def close_sockets(self):
        """Close all sockets and mark them invalid; no-op if already closed."""
        # Reset the attribute first so the server is marked invalid even if
        # one of the close() calls raises.
        sockets, self._sockets = self._sockets, None
        if not sockets:
            return
        for sock in sockets:
            sock.close()

    def sockets_invalid(self):
        """Return True when the server currently holds no sockets."""
        return self._sockets is None

    def _publish_rtp_frame(self, rtp_packet):
        """Send the raw bytes of *rtp_packet* to every destination.

        Empty packets are dropped, and nothing is sent while the sockets are
        invalid. On OSError the sockets are closed and the remaining
        destinations are skipped for this packet.
        """
        data_raw = rtp_packet.raw_packet
        data_len = len(data_raw)
        if data_len == 0:
            return
        if not self._sockets:
            return
        sender = self._sockets[0]
        for address in self._get_rtp_destinations():
            try:
                sent_len = sender.sendto(data_raw, address)
            except OSError as e:
                # TODO: Switch to NetInit state
                print("OS Exception: %s" % str(e))
                self.close_sockets()
                # The sockets are gone now; further sends cannot succeed.
                return
            if sent_len < 0:
                # address is a tuple, so wrap it to format it as one value.
                print("System error in sendto %s" % (address,))
            elif sent_len < data_len:
                print("Sent %d of %d to %s" % (sent_len, data_len, address))

    def _restart_stream(self):
        """Hook invoked when the stream yields no packet; does nothing yet."""
        pass

    # Publish RTP frame to all clients
    def _gen_rtp_frame(self):
        """Pull the next packet from the stream and publish it.

        Invalid sockets are re-initialised first.

        Raises:
            Exception: if no RtpFrameGenerator stream is set, or if the
                stream returns None instead of a packet.
        """
        if self.sockets_invalid():
            self.init_sockets()
        if not isinstance(self._stream, RtpFrameGenerator):
            # isinstance() is False for None too, so this covers "not set".
            raise Exception("RtpServer has invalid RTP Frame generator")
        rtp_packet = self._stream.next_packet()
        if rtp_packet is None:
            # Run the restart hook before raising; placed after the raise it
            # could never execute.
            self._restart_stream()
            raise Exception("RtpServer got invalid rtp packet")
        self._publish_rtp_frame(rtp_packet)