Skip to content

Commit

Permalink
maybe? maybe not :c
Browse files Browse the repository at this point in the history
  • Loading branch information
1majom committed Sep 3, 2024
1 parent 6c3dfdb commit 9e5ff00
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 603 deletions.
197 changes: 112 additions & 85 deletions good-try.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
import os
import subprocess
import yaml
import json
from functools import partial
from time import sleep
from sys import exit # pylint: disable=redefined-builtin

from mininet.topo import Topo
from mininet.net import Mininet
from mininet.util import dumpNodeConnections, quietRun
from mininet.log import setLogLevel, info, error
from mininet.util import dumpNodeConnections
from mininet.log import setLogLevel, info
from mininet.cli import CLI
from mininet.node import Node, OVSSwitch
from mininet.node import Node
from mininet.link import TCLink
from mininet import log
import os
Expand All @@ -23,10 +21,10 @@
import numpy as np

my_debug = os.getenv("MY_DEBUG", False)
all_gas_no_brakes = os.getenv("NO_BRAKES", True)
printit= os.getenv("PRINTIT", True)
video_on= os.getenv("VIDEON_ON", False)
okos_komp_is_here = os.getenv("OKOS_KOMP", True)
all_gas_no_brakes= os.getenv("NO_BRAKES", False)
video_on= os.getenv("VIDEO_ON", False)
tls_verify= os.getenv("TLS_VERIFY", True)

def info(msg):
log.info(msg + '\n')
Expand All @@ -43,6 +41,7 @@ def relayid_to_ip(relayid):
else:
print("** Mopping up remaining mininet")
subprocess.call(['sudo', 'mn', '-c'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.call(['sudo', 'pkill', '-f','gst-launch'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

print("** Folding them needed binaries")
if my_debug:
Expand All @@ -56,34 +55,30 @@ def relayid_to_ip(relayid):

setLogLevel( 'info' )


net = Mininet( topo=None, waitConnected=True, link=partial(TCLink) )
net.staticArp()

switch = net.addSwitch('s1',failMode='standalone')
if not okos_komp_is_here:
with open("topo.yaml", 'r') as file:
config = yaml.safe_load(file)
else:
with open("topo.json", 'r') as file:
config = json.load(file)
with open("topo.yaml", 'r') as file:
config = yaml.safe_load(file)

original_api = config['origi_api']
relay_number = len(config['nodes'])
if okos_komp_is_here:
node_names = [item['name'] for item in config['nodes']]
edges = config['edges']
connections = []
for edge in edges:
src = edge['src']
dst = edge['dst']
src_index = node_names.index(src) + 1
dst_index = node_names.index(dst) + 1
latency = edge['attributes']['latency']
connection = {'node1': src_index, 'node2': dst_index, 'delay': latency}
connections.append(connection)
debug(f"I see {src} to {dst} at index {connection['node1']} and {connection['node2']} with latency {connection['delay']}ms")
config['delays'] = connections


print("** Sorting out the config")
node_names = [item['name'] for item in config['nodes']]
edges = config['edges']
connections = []
for edge in edges:
src = edge['node1']
dst = edge['node2']
src_index = node_names.index(src) + 1
dst_index = node_names.index(dst) + 1
latency = edge['attributes']['latency']
connection = {'node1': src_index, 'node2': dst_index, 'delay': latency}
connections.append(connection)
debug(f"I see {src} to {dst} at index {connection['node1']} and {connection['node2']} with latency {connection['delay']}ms")
edges = connections


print("** Baking fresh cert")
Expand All @@ -96,25 +91,27 @@ def relayid_to_ip(relayid):
env = os.environ.copy()
env['PATH'] = '/usr/local/go:/usr/local/go/bin:' + env['PATH']
subprocess.call(['./dev/cert2'], env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
edges = config['delays']
# the different networks are:
# 10.0.x/24 - relay to relay connections there x is a counter
# 10.1.1/24 - api network
# 10.2.0/24 - api to host os connection (for docker)
# 10.3.0/24 - relay identifing ips, on the lo interface of relays
# 10.4.x/24 - pub and sub to relay connections, there x is a counter
# the first_hop_relay is the relay which the pub will use
# the last_hop_relay is the relay which the sub(s) will use (with 3 subs the third will fail, if sleep is higher than 0.2)

""" the different networks are:
- 10.0.x.0/24 - relay to relay connections where x is a counter
- 10.1.1.0/24 - api network
- 10.2.0.0/24 - api to host os connection (for docker)
- 10.3.0.0/24 - relay identifying ips, on the lo interface of the relays
- 10.4.x.0/24 - pub and sub to relay connections, where x is a counter
the first_hop_relay is the relay which the pub will use
the last_hop_relay is the relay which the sub(s) will use (with 3 subs the third will fail, if sleep is higher than 0.2)
"""

first_hop_relay = [(relayid_to_ip(item['relayid']), item['track']) for item in config['first_hop_relay']]
last_hop_relay = [(relayid_to_ip(item['relayid']), item['track']) for item in config['last_hop_relay']]

number_of_clients = len(last_hop_relay)+len(first_hop_relay)
relays = []
pubs = []
subs= []
k=1
# Create hosts


# ** Creating hosts
for i in range(relay_number):
host = net.addHost(f'h{k}', ip="")
host.cmd('ip addr add 10.3.0.%s/32 dev lo' % str((k)))
Expand All @@ -137,13 +134,11 @@ def relayid_to_ip(relayid):
k += 1




# Setting up full mesh network
# ** Setting up full mesh network
network_counter = 0
delay=None
# *** connecting pubs and subs
for i in range(relay_number):
# connecting pubs and subs
matching_pubs = [g for g, (ip, _) in enumerate(first_hop_relay) if ip.split('.')[-1] == str(i+1)]
for index in matching_pubs:
net.addLink( pubs[index][0],relays[i],
Expand All @@ -161,8 +156,9 @@ def relayid_to_ip(relayid):
subs[index][0].cmd(f'ip route add 10.3.0.{i+1}/32 via 10.5.{network_counter}.{2*index+2}')
debug(f'ip route add 10.3.0.{i+1}/32 via 10.5.{network_counter}.{2*index+2}')
network_counter += 1

# *** connecting relays to each other adding delays
for i in range(relay_number):
# connecting relays to each other adding delays
for j in range(i + 1, relay_number):
for edge in edges:
if i+1 == edge['node1'] and j+1 == edge['node2']:
Expand Down Expand Up @@ -200,99 +196,132 @@ def relayid_to_ip(relayid):
root.setIP( '10.2.0.99', intf=intf )


# Setting up "api network"
# *** Setting up "api network"
ip_counter = 1
net.addLink(
api,switch,params1={'ip': f"10.1.1.{ip_counter}/24"},
)
ip_counter += 1


for host in relays:
net.addLink(
host, switch, params1={'ip': f"10.1.1.{ip_counter}/24"},
)
ip_counter += 1


net.start()

if my_debug:
dumpNodeConnections(net.hosts)
info("pubs: " + str(pubs))
info("subs: " + str(subs))

template_for_relays=""
if original_api:
api.cmd('REDIS=10.2.0.99 ./dev/api &')
template_for_relays = (
template_for_relays = (
'RUST_LOG=debug RUST_BACKTRACE=0 '
'./target/debug/moq-relay --bind \'{bind}\' --api {api} --node \'{node}\' '
'--tls-cert ./dev/localhost.crt --tls-key ./dev/localhost.key '
'--tls-disable-verify --dev --original &'
' {tls_verify} --dev {origi} &'
)

origi_api_str=""
if config['api']=="origi":
origi_api_str="--original"
api.cmd('REDIS=10.2.0.99 ./dev/api --bind [::]:4442&')
else:
api.cmd('REDIS=10.2.0.99 ./dev/api --topo-path topo.yaml &')
template_for_relays = (
'RUST_LOG=debug RUST_BACKTRACE=0 '
'./target/debug/moq-relay --bind \'{bind}\' --api {api} --node \'{node}\' '
'--tls-cert ./dev/localhost.crt --tls-key ./dev/localhost.key '
'--tls-disable-verify --dev &'
)
if config['api']=="opti":
api.cmd('cd ../cdn-optimization; source env/bin/activate; python -m fastapi dev app/api.py --host 10.1.1.1 --port 4442 &')

# else:
# api.cmd('REDIS=10.2.0.99 ./dev/api --topo-path topo.yaml --bind [::]:4442 &')
# template_for_relays = (
# 'RUST_LOG=debug RUST_BACKTRACE=0 '
# './target/debug/moq-relay --bind \'{bind}\' --api {api} --node \'{node}\' '
# '--tls-cert ./dev/localhost.crt --tls-key ./dev/localhost.key '
# '--tls-disable-verify --dev &'
# )





# for some reason this is needed or the first relay wont reach the api
# (ffmpeg needs the 1s, gst can work with less)
sleep(1)

host_counter = 1

tls_verify_str = ""
if not tls_verify:
tls_verify_str = "--tls-disable-verify"

for h in relays:
ip_address = f'10.3.0.{host_counter}'
debug(f'Starting relay on {h} - {ip_address}')

h.cmd(template_for_relays.format(
host=h.name,
bind=f'{ip_address}:4443',
api=f'http://10.1.1.1',
node=f'https://{ip_address}:4443'
api=f'http://10.1.1.1:4442',
node=f'https://{ip_address}:4443',
tls_verify=tls_verify_str,
origi=origi_api_str
))
debug(template_for_relays.format(
host=h.name,
bind=f'{ip_address}:4443',
api=f'http://10.1.1.1',
node=f'https://{ip_address}:4443'
api=f'http://10.1.1.1:4442',
node=f'https://{ip_address}:4443',
tls_verify=tls_verify_str,
origi=origi_api_str


))

host_counter += 1


# the two sleeps are needed at that specific line, bc other way they would start and the exact same time,
# and the pub wouldnt connect to the relay, and the sub couldnt connect to the pub
# and the pub wouldn't connect to the relay, and the sub couldn't connect to the pub
sleep(0.5)
k=0
def get_video_duration(file_path):
command = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path]
output = subprocess.check_output(command).decode().strip()
duration = float(output)
return duration

max_video_duration = 0
max_resolution=300

sleep(1)
k=0
for (h,track) in pubs:
vidi_filenammm=track.split("_")[0]
track_duration = get_video_duration(f"./dev/{vidi_filenammm}.mp4")
if track_duration > max_video_duration:
max_video_duration = track_duration

resolution=track.split("_")[0].split("-")[1]
if int(resolution)>max_resolution:
max_resolution=int(resolution)

if config['mode'] == 'clock':
le_cmd=(f'xterm -hold -T "h{k}-pub" -e bash -c "RUST_LOG=info ./target/debug/moq-clock --publish --namespace {track} https://{first_hop_relay[k][0]}:4443 --tls-disable-verify" &')
else:
if config['mode'] == 'ffmpeg':
le_cmd=(f'xterm -hold -T "h{k}-pub" -e bash -c "ffmpeg -hide_banner -stream_loop -1 -re -i ./dev/{track}.mp4 -c copy -an -f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer+frag_every_frame - '
le_cmd=(f'xterm -hold -T "h{k}-pub" -e bash -c "ffmpeg -hide_banner -stream_loop -1 -re -i ./dev/{vidi_filenammm}.mp4 -c copy -an -f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer+frag_every_frame - '
f' | RUST_LOG=info ./target/debug/moq-pub --name {track} https://{first_hop_relay[k][0]}:4443 --tls-disable-verify" &')
else:
if config['mode'] == 'gst':
holder=" "
if my_debug:
holder=" -hold "
le_cmd=f'xterm {holder} -T "h{k}-pub" -e bash -c "export GST_PLUGIN_PATH="${{PWD}}/../moq-gst/target/debug${{GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}}:${{PWD}}/../6gxr-latency-clock"; gst-launch-1.0 -q -v -e filesrc location="./dev/{track}.mp4" ! decodebin ! videoconvert ! timestampoverlay ! videoconvert ! x264enc tune=zerolatency ! h264parse ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink tls-disable-verify=true url="https://{first_hop_relay[k][0]}:4443" namespace="{track}"; sleep 7 "&'
le_cmd=f'xterm {holder} -T "h{k}-pub" -e bash -c "export GST_PLUGIN_PATH="${{PWD}}/../moq-gst/target/debug${{GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}}:${{PWD}}/../6gxr-latency-clock"; gst-launch-1.0 -q -v -e filesrc location="./dev/{vidi_filenammm}.mp4" ! decodebin ! videoconvert ! timestampoverlay ! videoconvert ! x264enc tune=zerolatency ! h264parse ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink tls-disable-verify=true url="https://{first_hop_relay[k][0]}:4443" namespace="{track}"; sleep 5 "&'

debug(f'{h} - {le_cmd}')
debug(f'{net.hosts[k]} - {first_hop_relay[k][0]}')
debug(f'{h} - {first_hop_relay[k][0]}')
h.cmd(le_cmd)
sleep(0.2)
k+=1

# net.hosts[-2].cmd(f'xterm -e bash -c "ffmpeg -hide_banner -stream_loop -1 -re -i ./dev/bbb.mp4 -c copy -an -f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer+frag_every_frame - | RUST_LOG=info ./target/debug/moq-pub --name bbb https://{first_hop_relay}:4443 --tls-disable-verify" &')


# if this is 1.5 or more it will cause problems
# around 0.7 needed
sleep(0.7)
Expand All @@ -308,7 +337,7 @@ def relayid_to_ip(relayid):
f' --tls-disable-verify | ffplay -window_title \'h{k}sub\' -x 360 -y 200 - "&')
else:
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
filename = f"measurements/{current_time}_{h.name}_{track}"
filename = f"measurements/{track}_{current_time}_{h.name}"
le_sink="autovideosink"
if not video_on:
le_sink="fakesink"
Expand All @@ -324,27 +353,25 @@ def relayid_to_ip(relayid):
sleep(1)

if video_on:

if config['mode'] == 'gst':
sleep(2)

process_ids = subprocess.check_output(['xdotool', 'search', '--name', 'gst-launch']).decode().split()
for i, process_id in enumerate(process_ids):
sleep(0.2)

subprocess.call(['xdotool', 'windowmove', process_id, f'{i*360+50}', '0'])
subprocess.call(['xdotool', 'windowmove', process_id, f'{i*max_resolution+50}', '0'])
else:
for i in range(len(subs)):
sleep(0.2)
subprocess.call(['xdotool', 'search', '--name', f'h{i}sub', 'windowmove', f'{i*360+50}', '0'])
subprocess.call(['xdotool', 'search', '--name', f'h{i}sub', 'windowmove', f'{i*max_resolution+50}', '0'])

def get_video_duration(file_path):
command = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path]
output = subprocess.check_output(command).decode().strip()
duration = float(output)
return duration

track_duration = get_video_duration(f"./dev/{track}.mp4")

sleep(track_duration+5)
if not all_gas_no_brakes:
if all_gas_no_brakes:
sleep(max_video_duration+5)
else:
CLI( net )


Expand Down
Loading

0 comments on commit 9e5ff00

Please sign in to comment.