Skip to content

Commit

Permalink
adding clock measurement, making baseline
Browse files Browse the repository at this point in the history
  • Loading branch information
1majom committed Oct 3, 2024
1 parent 2e2559e commit 225fe7e
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 91 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
This project tries to put the very well written Media over QUIC implementation, moq-rs into a custom topology inside mininet.
The project is closely connected to [Zotyamester/cdn-optimizer](https://github.com/Zotyamester/cdn-optimization) (later referenced as cdn-opti) which can be used as an alternative api instead of the moq-api provided by the original project. The main difference between these that with moq-api the subscriber's relay will always get the publisher's relay without any inbetween relays, only using one link from our relay mesh network. The cdn-opti instead will use the provided costs to calculate a more optimal route while using more then 1 links.

For the whole project to work on a fresh **Ubuntu 22.04** machine the vm_start.sh file should be downloaded alone and ran in the folder where we want to see the moq-rs and other projects folders. - this is wip
For the whole project to work on a fresh **Ubuntu 22.04** machine the vm_start.sh file should be downloaded alone and ran in the folder where we want to see the moq-rs and other projects folders. Also a the_path.py file is needed to be created inside that we only need a variable called PATH_GO which tells the exact path to the go binary. And also a venv variable which is all is correctly set up should be the command activeting the venv under the venv folder. - this is wip

Right now (this is also wip) to start the good_try.py mininet script we need a topology file. Now these topology files are gotten from the cdn-opti repo which should be cloned next to this one so it can be reached.

Expand Down
102 changes: 63 additions & 39 deletions base_try.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@


#!/usr/bin/env python
tls_verify = os.getenv("TLS_VERIFY", True)
clocked = os.getenv("CLOCKED", False)
tls_verify = not os.getenv("NO_CERT", False)


def calculate_statistics(latencies):
average = np.mean(latencies)
Expand All @@ -36,27 +38,26 @@ def extract_latency(line):
return int(match.group(1))
return None

def debug(msg):
if my_debug:
log.info(msg + '\n')


def main():
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument('--filename', type=str, required=True, help='Filename for the output without .txt')
parser.add_argument('--clock', action='store_true', help='Use clocked')
parser.add_argument('--tls-verify', action='store_true', help='Use tls_verify')
parser.add_argument('--track', type=str, required=True, help='Track name')
args = parser.parse_args()

setLogLevel('info')
clocked = args.clock
tls_verify = args.tls_verify

setLogLevel('critical')
template_for_relays = (
'RUST_LOG=debug RUST_BACKTRACE=0 '
'./target/debug/moq-relay --bind \'{bind}\' --api {api} --node \'{node}\' '
'--tls-cert ./dev/localhost.crt --tls-key ./dev/localhost.key '
' {tls_verify} --dev {origi} &'
)

with open("../cdn-optimization/datasource/baselining_topo.yaml", 'r') as file:
config = yaml.safe_load(file)

print("** Baking fresh cert")
ip_string = ' '.join(['12.0.1.2 12.0.1.1 12.0.2.2 12.0.2.1'])
with open('./dev/cert', 'r') as file:
cert_content = file.readlines()
Expand All @@ -65,11 +66,13 @@ def main():
file.writelines(cert_content)
env = os.environ.copy()
env['PATH'] = env['PATH'] +the_path.PATH_GO
subprocess.call(['./dev/cert2'], env=env)
subprocess.call(['./dev/cert2'], env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

tls_verify_str = ""
tls_verify_gst_str=""
if not tls_verify:
tls_verify_str = "--tls-disable-verify"
tls_verify_gst_str= "tls-disable-verify=true"

filename = args.filename
assumed_baseline = 0
Expand Down Expand Up @@ -114,35 +117,56 @@ def main():
))
sleep(1)
# CLI(net)
# Start the publisher on one host
track = config['first_hop_relay'][0]['track']
print(f"track: {track}")
vidi_filenammm = track.split("_")[0]
baseline_pub.cmd(f'xterm -hold -T "baseline-pub" -e bash -c "export GST_PLUGIN_PATH="${{PWD}}/../moq-gst/target/debug${{GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}}:${{PWD}}/../6gxr-latency-clock"; gst-launch-1.0 -q -v -e filesrc location="./dev/{vidi_filenammm}.mp4" ! qtdemux name=before01 \
before01.video_0 ! h264parse name=before02 ! avdec_h264 name=before03 ! videoconvert name=before2 ! timestampoverlay name=middle ! videoconvert name=after1 ! x264enc tune=zerolatency name=after2 ! h264parse name=after3 ! isofmp4mux chunk-duration=1 fragment-duration=1 name=after4 ! moqsink tls-disable-verify=true url="https://12.0.1.2:4443" namespace="{track}";sleep 0.1 "&')
sleep(0.5)
baseline_sub.cmd(f'xterm -hold -T "baseline-sub" -e bash -c "export GST_PLUGIN_PATH="${{PWD}}/../moq-gst/target/debug${{GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}}:${{PWD}}/../6gxr-latency-clock"; export RST_LOG=debug; ./target/debug/moq-sub --name {track} https://12.0.1.2:4443 | GST_DEBUG=timeoverlayparse:4 gst-launch-1.0 --no-position filesrc location=/dev/stdin ! decodebin ! videoconvert ! timeoverlayparse ! videoconvert ! fakesink 2> measurements/assumed_baseline_pre_{filename}.txt" &')
sleep(10)
print([file for file in glob.glob(os.path.join('measurements', '*')) if file.startswith('measurements/assumed_baseline_pre_')])
lat = sorted([file for file in glob.glob(os.path.join('measurements', '*')) if file.startswith('measurements/assumed_baseline_pre_')], key=os.path.getctime, reverse=True)[0]
print(lat)

with open(lat, 'r') as file:
file_latencies = []
for line in file:
latency = extract_latency(line)
if latency is not None:
file_latencies.append(latency)
if file_latencies:
average, median, percentile_99 = calculate_statistics(file_latencies)
print(f"assumed baseline:{average}")
assumed_baseline = average
baseline_file = f"measurements/assumed_baseline_{filename}.txt"
with open(baseline_file, 'w') as file:
file.write(str(assumed_baseline))

track = args.track
if not clocked:
vidi_filenammm = track.split("_")[1]
baseline_pub.cmd(f'xterm -hold -T "baseline-pub" -e bash -c "export GST_PLUGIN_PATH="${{PWD}}/../moq-gst/target/debug${{GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}}:${{PWD}}/../6gxr-latency-clock"; gst-launch-1.0 -q -v -e filesrc location="./dev/{vidi_filenammm}.mp4" ! qtdemux name=before01 \
before01.video_0 ! h264parse name=before02 ! avdec_h264 name=before03 ! videoconvert name=before2 ! timestampoverlay name=middle ! videoconvert name=after1 ! x264enc tune=zerolatency name=after2 ! h264parse name=after3 ! isofmp4mux chunk-duration=1 fragment-duration=1 name=after4 ! moqsink {tls_verify_gst_str} url="https://12.0.1.2:4443" namespace="{track}";sleep 0.1 "&')
sleep(0.5)
baseline_sub.cmd(f'xterm -hold -T "baseline-sub" -e bash -c "export GST_PLUGIN_PATH="${{PWD}}/../moq-gst/target/debug${{GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}}:${{PWD}}/../6gxr-latency-clock"; export RST_LOG=debug; ./target/debug/moq-sub --name {track} https://12.0.1.2:4443 | GST_DEBUG=timeoverlayparse:4 gst-launch-1.0 --no-position filesrc location=/dev/stdin ! decodebin ! videoconvert ! timeoverlayparse ! videoconvert ! fakesink 2> measurements/assumed_baseline_pre_{filename}.txt" &')
sleep(30) # should match with the normal measure
subprocess.call(['sudo', 'pkill', '-f', 'xterm'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

lat = f"measurements/assumed_baseline_pre_{filename}.txt"

with open(lat, 'r') as file:
file_latencies = []
for line in file:
latency = extract_latency(line)
if latency is not None:
file_latencies.append(latency)
if file_latencies:
average, median, percentile_99 = calculate_statistics(file_latencies)
assumed_baseline = average
baseline_file = f"measurements/assumed_baseline_{filename}.txt"
with open(baseline_file, 'w') as file:
file.write(str(assumed_baseline))
print(f"*** assumed baseline: {assumed_baseline}")


else:
baseline_pub.cmd(f'xterm -hold -T "baseline-pub" -e bash -c "RUST_LOG=info ./target/debug/moq-clock --publish --namespace {track} https://12.0.1.2:4443" &')
sleep(0.5)
baseline_sub.cmd(f'xterm -hold -T "baselin-sub" -e bash -c "RUST_LOG=info ./target/debug/moq-clock --namespace {track} https://12.0.1.2:4443 {tls_verify_str} >> measurements/assumed_baseline_clock_pre_{filename}.txt" &')
sleep(30)
subprocess.call(['sudo', 'pkill', '-f', 'xterm'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
file_path1 = f"measurements/assumed_baseline_clock_pre_{filename}.txt"
with open(file_path1, 'r') as file:
file_latencies = []
for line in file:
try:
latency = int(line.strip()) * 1000000
file_latencies.append(latency)
except ValueError:
continue
if file_latencies:
assumed_baseline, median, percentile_99 = calculate_statistics(file_latencies)
print(f"*** assumed baseline (clocked): {assumed_baseline}")
baseline_file = f"measurements/assumed_baseline_{filename}.txt"
with open(baseline_file, 'w') as file:
file.write(str(assumed_baseline))
net.stop()
print(f"assumed baseline: {assumed_baseline}")

subprocess.call(['sudo', 'pkill', '-f', 'xterm'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


Expand Down
88 changes: 38 additions & 50 deletions good-try.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
topofile= os.getenv("TOPO", "tiniest_topo.yaml")
folding= os.getenv("BUILD", False)
# # gst mostly, clock, ffmpeg
# mode= os.getenv("MODE", "clock")
# # origi, opti
# api= os.getenv("API", "origi")
mode = os.getenv("MODE", "clock")


def info(msg):
Expand Down Expand Up @@ -67,37 +65,48 @@ def extract_latency(line):
exit("** This script must be run as root")
else:
print("** Mopping up remaining mininet")
topo_idx=0
test_set=[
# ("small_topo_p2s.yaml","origi"),
# ("small_topo_p2s.yaml","opti"),
# ("small_topo_ps.yaml","origi"),
# ("small_topo_ps.yaml","opti"),
("small_topo_ps.yaml","origi"),
]
for i in range(num_of_tries):
for i in range(len(test_set)):

if not os.path.exists('the_path.py'):
exit("** the_path module is not available")

test_set=the_path.test_set

for topo_idx in range(len(test_set)):
for try_idx in range(num_of_tries):

subprocess.call(['sudo', 'mn', '-c'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.call(['sudo', 'pkill', '-f','gst-launch'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

print("** Folding them needed binaries")
if my_debug or folding:
print("** Folding them needed binaries")
subprocess.run(['rm', 'target/debug/*'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.run(['sudo', '-u', 'szebala', '/home/szebala/.cargo/bin/cargo', 'build'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.run(['sudo', '-u', the_path.user, the_path.cargopath, 'build'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

sleep(5)



if __name__ == '__main__':

setLogLevel( 'info' )
if my_debug:
setLogLevel( 'info' )
else:
setLogLevel( 'critical' )
current_time1 = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
baseline_path = os.path.join('measurements', f"assumed_baseline_{current_time1}.txt")
based_line=0.0
baseline_clk_str=""
baseline_tls_str=""

topofile = test_set[topo_idx][0]
with open(f"../cdn-optimization/datasource/{topofile}", 'r') as file:
config = yaml.safe_load(file)
config['mode'] = test_set[topo_idx][2] if len(test_set[topo_idx]) > 2 else mode
print(f"** Sorting out the config {topofile} with {config['mode']} and {config['api']}")

subprocess.call(['sudo', 'python', 'base_try.py','--filename',f"{current_time1}"])
if config['mode'] == 'clock':
baseline_clk_str="--clock"
if forklift_certified:
baseline_tls_str="--tls-verify"
subprocess.call(['sudo', 'python', 'base_try.py', '--filename', f"{current_time1}",'--track',f"{config['first_hop_relay'][0]['track']}"] + ([baseline_clk_str] if baseline_clk_str else []) + ([baseline_tls_str] if baseline_tls_str else []))

with open(baseline_path, 'r') as file:
baseline_content = file.read().strip()
Expand All @@ -106,24 +115,15 @@ def extract_latency(line):

net = Mininet( topo=None, waitConnected=True, link=partial(TCLink) )
net.staticArp()
#https://github.com/bigswitch/mininet/blob/master/util/sysctl_addon


switch = net.addSwitch('s1',failMode='standalone')
print(f"!!!!!!!!!!!!!!!!!!!!!!! Reading the topo file {topo_idx}")

topofile = test_set[topo_idx][0]
print(f"!!!!!!!!!!!!!!!!!!!!!!! Reading the topo file {topofile}")
with open(f"../cdn-optimization/datasource/{topofile}", 'r') as file:
config = yaml.safe_load(file)

config['api'] = test_set[topo_idx][1]
config['mode'] = 'clock'

relay_number = len(config['nodes'])


print("** Sorting out the config")
node_names = [item['name'] for item in config['nodes']]
edges = config['edges']
connections = []
Expand All @@ -139,7 +139,7 @@ def extract_latency(line):
edges = connections


print("** Baking fresh cert")
# print("** Baking fresh cert")
ip_string = ' '.join([f'10.3.0.{i}' for i in range(1, relay_number+1)])
with open('./dev/cert', 'r') as file:
cert_content = file.readlines()
Expand Down Expand Up @@ -193,7 +193,7 @@ def extract_latency(line):
k += 1


# ** Setting up full mesh network
# *** Setting up full mesh network
network_counter = 0
delay=None
# *** connecting pubs and subs
Expand Down Expand Up @@ -233,7 +233,6 @@ def extract_latency(line):
params1={'ip': ip1},
params2={'ip': ip2})
else:
# info(f"\n** this delay is put between {host1} {host2}")
net.addLink(host1, host2, cls=TCLink, delay=f'{delay}ms',
params1={'ip': ip1},
params2={'ip': ip2})
Expand Down Expand Up @@ -289,28 +288,18 @@ def extract_latency(line):
api.cmd('REDIS=10.2.0.99 ./dev/api --bind [::]:4442 &')
else:
if config['api']=="opti":
api.cmd(f'cd ../cdn-optimization; source venv/bin/activate; TOPOFILE={topofile} python -m fastapi dev app/api.py --host 10.1.1.1 --port 4442 &')
sleep(2)

# else:
# api.cmd('REDIS=10.2.0.99 ./dev/api --topo-path topo.yaml --bind [::]:4442 &')
# template_for_relays = (
# 'RUST_LOG=debug RUST_BACKTRACE=0 '
# './target/debug/moq-relay --bind \'{bind}\' --api {api} --node \'{node}\' '
# '--tls-cert ./dev/localhost.crt --tls-key ./dev/localhost.key '
# '--tls-disable-verify --dev &'
# )
api.cmd(f'cd ../cdn-optimization;{the_path.venv} TOPOFILE={topofile} python -m fastapi dev app/api.py --host 10.1.1.1 --port 4442 &')

# for some reason this is needed or the first relay wont reach the api
# (ffmpeg needs the 1s, gst can work with less)
sleep(2)
sleep(4)

host_counter = 1

tls_verify_str = ""
if not forklift_certified:
tls_verify_str = "--tls-disable-verify"

host_counter = 1
for h in relays:
ip_address = f'10.3.0.{host_counter}'
debug(f'Starting relay on {h} - {ip_address}')
Expand All @@ -330,8 +319,6 @@ def extract_latency(line):
node=f'https://{ip_address}:4443',
tls_verify=tls_verify_str,
origi=origi_api_str


))

host_counter += 1
Expand Down Expand Up @@ -491,6 +478,9 @@ def get_video_duration(file_path):
if '10.1.1.' not in ip_address:
interface_names.append(interface_name)
net_dev_output = host.cmd('cat /proc/net/dev').strip().split('\n')
with open(f"measurements/{current_time}_{host.name}_network.txt", 'w') as file:
file.write('\n'.join(net_dev_output))
file.write('\n'.join(interfaces))
for line in net_dev_output:
if any(interface_name in line for interface_name in interface_names):
stats = line.split(':')[1].split()
Expand Down Expand Up @@ -539,7 +529,6 @@ def get_video_duration(file_path):
response = api.cmd(f'curl -s http://10.1.1.1:4442/tracks/{first_hop_track}/topology')
response_lines = response.strip().split('\n')
sanitized_response_lines = [line for line in response_lines if line.startswith('{')][0].strip('(venv)')
print(sanitized_response_lines)
if len(response_lines) > 1:
response_json = json.loads(sanitized_response_lines)
sum_cost[first_hop_relay['track']] += float(response_json.get('cost', 0))
Expand Down Expand Up @@ -638,7 +627,7 @@ def get_video_duration(file_path):


with open(f"measurements/{summing_current_time}_enddelays.txt", 'a') as enddelays_file:
header = f"filename(id if multiple tracks_video(bbb-vid resolution-length)_cost budget);average of timestampoverlay;deviation of timestampoverlay;baseline;avarage-baseline;number of frames;didwarn;ending time;sum cost for all subscribers on this track;following are same for all hosts rx bytes;tx bytes;rx pckts;tx pckts"
header = f"filename(id if multiple tracks_video(bbb-vid resolution-length)_cost budget);average of timestampoverlay;deviation of timestampoverlay;baseline;avarage-baseline;number of frames;didwarn;ending time;sum cost for all subscribers on this track;following are same for all hosts;rx bytes;tx bytes;rx pckts;tx pckts"
if not file_exists:
enddelays_file.write(f"{header}")
print(f"{header}")
Expand Down Expand Up @@ -681,7 +670,7 @@ def get_video_duration(file_path):
else:
ending_time=0
did_it_warn=0
actual_line=f"\n{file_path.replace('measurements/','')};{average};{distribution};{based_line};{average-based_line};{count};{did_it_warn};{ending_time};{sum_cost[track]};{all_network_receive_bytes};{all_network_transmit_bytes};{all_network_receive_packets};{all_network_transmit_packets}"
actual_line=f"\n{file_path.replace('measurements/','')};{average};{distribution};{based_line};{average-based_line};{count};{did_it_warn};{ending_time};{sum_cost[track]};;{all_network_receive_bytes};{all_network_transmit_bytes};{all_network_receive_packets};{all_network_transmit_packets}"
enddelays_file.write(f"{actual_line}")
print(f"{actual_line}")
if gst_shark == 2:
Expand All @@ -693,6 +682,5 @@ def get_video_duration(file_path):
if gst_shark == 1:
print(f">> subtracting average proctimes: {average-baseline}")

topo_idx+=1


Loading

0 comments on commit 225fe7e

Please sign in to comment.