diff --git a/.gitignore b/.gitignore index 17dbd963..f95cc178 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ id_rsa* *.tf .tmp logs/ +jupyter_notebooks/* *package-lock.json *node_modules scripts/*.csv diff --git a/application/text_translation/launch_benchmark_kubernetes.yml b/application/text_translation/launch_benchmark_kubernetes.yml new file mode 100644 index 00000000..6a00e607 --- /dev/null +++ b/application/text_translation/launch_benchmark_kubernetes.yml @@ -0,0 +1,57 @@ +--- +- hosts: cloudcontroller + become: true + tasks: + - name: Create job file + shell: | + cat > "/home/{{ username }}/job-template.yaml" < "/home/{{ username }}/jobs/job-$i.yaml" + done + + - name: Launch jobs + command: > + kubectl apply -f "/home/{{ username }}/jobs" diff --git a/application/text_translation/src/publisher/Dockerfile b/application/text_translation/src/publisher/Dockerfile new file mode 100644 index 00000000..2d5c63a5 --- /dev/null +++ b/application/text_translation/src/publisher/Dockerfile @@ -0,0 +1,23 @@ +# Define custom function directory +ARG FUNCTION_DIR="/function" + +#------------------------------------------------------------------------ +FROM python:3.8.7-slim-buster + +# Include global arg in this stage of the build +ARG FUNCTION_DIR + +# Copy function source code +RUN mkdir -p ${FUNCTION_DIR} +COPY src/ ${FUNCTION_DIR}/ + +# Install app specific packages +RUN pip3 install -r ${FUNCTION_DIR}/requirements.txt + +# Set working directory to function root directory +WORKDIR ${FUNCTION_DIR} + +# Open port to the MQTT broker +EXPOSE 1883 + +CMD [ "python3", "-u", "publisher.py"] diff --git a/application/text_translation/src/publisher/docker.sh b/application/text_translation/src/publisher/docker.sh new file mode 100755 index 00000000..cbe77391 --- /dev/null +++ b/application/text_translation/src/publisher/docker.sh @@ -0,0 +1 @@ +docker buildx build --platform linux/amd64,linux/arm64 -t fzovpec2/text_translation:text_translation_publisher --push . \ No newline at end of file diff --git a/application/text_translation/src/publisher/src/crime_and_punishment.txt b/application/text_translation/src/publisher/src/crime_and_punishment.txt new file mode 100644 index 00000000..abbd5d71 --- /dev/null +++ b/application/text_translation/src/publisher/src/crime_and_punishment.txt @@ -0,0 +1,53 @@ +On an exceptionally hot evening early in July a young man came out of the garret in which he lodged in S. +Place and walked slowly, as though in hesitation, towards K. bridge. +He had successfully avoided meeting his landlady on the staircase. His garret was under the roof of a high, +five-storied house and was more like a cupboard than a room. The landlady who provided him with garret, dinners, +and attendance, lived on the floor below, and every time he went out he was obliged to pass her kitchen, the +door of which invariably stood open. And each time he passed, the young man had a sick, frightened feeling, +which made him scowl and feel ashamed. He was hopelessly in debt to his landlady, and was afraid of meeting her. +This was not because he was cowardly and abject, quite the contrary; but for some time past he had been in an +overstrained irritable condition, verging on hypochondria. He had become so completely absorbed in himself, +and isolated from his fellows that he dreaded meeting, not only his landlady, but anyone at all. He was +crushed by poverty, but the anxieties of his position had of late ceased to weigh upon him. 
He had given +up attending to matters of practical importance; he had lost all desire to do so. Nothing that any landlady +could do had a real terror for him. But to be stopped on the stairs, to be forced to listen to her trivial, +irrelevant gossip, to pestering demands for payment, threats and complaints, and to rack his brains for excuses, +to prevaricate, to lie—no, rather than that, he would creep down the stairs like a cat and slip out unseen. +This evening, however, on coming out into the street, he became acutely aware of his fears. +“I want to attempt a thing like that and am frightened by these trifles,” he thought, with an odd +smile. “Hm... yes, all is in a man’s hands and he lets it all slip from cowardice, that’s an axiom. It would +be interesting to know what it is men are most afraid of. Taking a new step, uttering a new word is what they +fear most.... But I am talking too much. It’s because I chatter that I do nothing. Or perhaps it is that I +chatter because I do nothing. I’ve learned to chatter this last month, lying for days together in my den +thinking... of Jack the Giant-killer. Why am I going there now? Am I capable of that? Is that serious? It is +not serious at all. It’s simply a fantasy to amuse myself; a plaything! Yes, maybe it is a plaything.” +The heat in the street was terrible: and the airlessness, the bustle and the plaster, scaffolding, bricks, +and dust all about him, and that special Petersburg stench, so familiar to all who are unable to get out of +town in summer—all worked painfully upon the young man’s already overwrought nerves. The insufferable stench +from the pot-houses, which are particularly numerous in that part of the town, and the drunken men whom he met +continually, although it was a working day, completed the revolting misery of the picture. An expression of the +profoundest disgust gleamed for a moment in the young man’s refined face. He was, by the way, exceptionally handsome, +above the average in height, slim, well-built, with beautiful dark eyes and dark brown hair. Soon he sank into +deep thought, or more accurately speaking into a complete blankness of mind; he walked along not observing what +was about him and not caring to observe it. From time to time, he would mutter something, from the habit of talking +to himself, to which he had just confessed. At these moments he would become conscious that his ideas were sometimes +in a tangle and that he was very weak; for two days he had scarcely tasted food. +He was so badly dressed that even a man accustomed to shabbiness would have been ashamed to be seen in the street in +such rags. In that quarter of the town, however, scarcely any shortcoming in dress would have created surprise. Owing +to the proximity of the Hay Market, the number of establishments of bad character, the preponderance of the trading +and working class population crowded in these streets and alleys in the heart of Petersburg, types so various were +to be seen in the streets that no figure, however queer, would have caused surprise. But there was such accumulated +bitterness and contempt in the young man’s heart, that, in spite of all the fastidiousness of youth, he minded +his rags least of all in the street. It was a different matter when he met with acquaintances or with former +fellow students, whom, indeed, he disliked meeting at any time. 
And yet when a drunken man who, for some
unknown reason, was being taken somewhere in a huge waggon dragged by a heavy dray horse, suddenly shouted at him
as he drove past: “Hey there, German hatter” bawling at the top of his voice and pointing at him—the young man
stopped suddenly and clutched tremulously at his hat. It was a tall round hat from Zimmerman’s, but completely worn
out, rusty with age, all torn and bespattered, brimless and bent on one side in a most unseemly fashion. Not shame,
however, but quite another feeling akin to terror had overtaken him.
“I knew it,” he muttered in confusion, “I thought so! That’s the worst of all! Why, a stupid thing like this, the
most trivial detail might spoil the whole plan. Yes, my hat is too noticeable.... It looks absurd and that makes
it noticeable.... With my rags I ought to wear a cap, any sort of old pancake, but not this grotesque thing. Nobody
wears such a hat, it would be noticed a mile off, it would be remembered.... What matters is that people would
remember it, and that would give them a clue. For this business one should be as little conspicuous as possible....
Trifles, trifles are what matter! Why, it’s just such trifles that always ruin everything....”
diff --git a/application/text_translation/src/publisher/src/publisher.py b/application/text_translation/src/publisher/src/publisher.py
new file mode 100644
index 00000000..8d5a02d7
--- /dev/null
+++ b/application/text_translation/src/publisher/src/publisher.py
@@ -0,0 +1,187 @@
+"""\
+This is a publisher, sending lines of text over MQTT to a subscriber for further processing.
+"""
+
+import time
+import os
+import paho.mqtt.client as mqtt
+
+MQTT_LOCAL_IP = os.environ["MQTT_LOCAL_IP"]
+MQTT_REMOTE_IP = os.environ["MQTT_REMOTE_IP"]
+MQTT_LOGS = os.environ["MQTT_LOGS"]
+FREQUENCY = float(os.environ["FREQUENCY"]) / 10
+MQTT_TOPIC_PUB = "text-translation-pub"
+MQTT_TOPIC_SUB = "text-translation-sub"
+
+if "DURATION" in os.environ:
+    DURATION = int(os.environ["DURATION"])
+else:
+    DURATION = 300
+
+# Set how many texts to send, and how often
+SEC_PER_FRAME = float(1 / FREQUENCY)
+MAX_TXTS = int(FREQUENCY * DURATION)
+
+RECEIVED = 0
+
+
+def on_connect(local_client, _userdata, _flags, rc):
+    """Execute when connecting to MQTT broker
+
+    Args:
+        local_client (object): Client object
+        _userdata (_type_): _description_
+        _flags (_type_): _description_
+        rc (str): Result code
+    """
+    print("Connected with result code " + str(rc) + "\n", end="")
+    local_client.subscribe(MQTT_TOPIC_PUB)
+
+
+def on_subscribe(_mqttc, _obj, _mid, _granted_qos):
+    """Execute when subscribing to a topic on a MQTT broker
+
+    Args:
+        _mqttc (_type_): _description_
+        _obj (_type_): _description_
+        _mid (_type_): _description_
+        _granted_qos (_type_): _description_
+    """
+    print("Subscribed to topic\n", end="")
+
+
+def on_log(_client, _userdata, level, buff):
+    """Execute MQTT log on every MQTT event
+
+    Args:
+        _client (_type_): _description_
+        _userdata (_type_): _description_
+        level (str): Log level (error, warning, info, etc)
+        buff (str): Log message
+    """
+    print("[ %s ] %s\n" % (str(level), buff), end="")
+
+
+def on_message(_client, _userdata, msg):
+    """Execute when receiving a message on a topic you are subscribed to
+
+    Args:
+        _client (_type_): _description_
+        _userdata (_type_): _description_
+        msg (str): Received message
+    """
+    t_now = time.time_ns()
+
+    t_old_bytes = msg.payload[-20:]
+    t_old = int(t_old_bytes.decode("utf-8"))
+
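+    # The last 20 bytes are the zero-padded nanosecond timestamp that send()
+    # attached to the outgoing message; the subscriber echoes it back, so the
+    # difference with the local clock is the end-to-end round-trip latency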
print("Latency (ns): %i" % (t_now - t_old)) + global RECEIVED + RECEIVED += 1 + + +def on_publish(_mqttc, _obj, _mid): + """Execute when publishing / sending data + + Args: + _mqttc (_type_): _description_ + _obj (_type_): _description_ + _mid (_type_): _description_ + """ + print("Published data") + + +def connect(): + """Execute when connecting to a MQTT broker""" + print("Start connecting to the local MQTT broker") + print("Broker ip: " + str(MQTT_LOCAL_IP)) + print("Topic: " + str(MQTT_TOPIC_PUB)) + + local_client = mqtt.Client() + local_client.on_connect = on_connect + local_client.on_message = on_message + local_client.on_subscribe = on_subscribe + + if MQTT_LOGS == "True": + local_client.on_log = on_log + + local_client.connect(MQTT_LOCAL_IP, port=1883, keepalive=300) + local_client.loop_start() + + +def send(): + """Loop over local images, and send them one by one to a remote MQTT broker""" + # Loop over the dataset of 60 images + files = [] + print("Start connecting to the remote MQTT broker") + print("Broker ip: " + str(MQTT_REMOTE_IP)) + print("Topic: " + str(MQTT_TOPIC_SUB)) + + remote_client = mqtt.Client() + remote_client.on_publish = on_publish + + remote_client.connect(MQTT_REMOTE_IP, port=1883, keepalive=120) + print("Connected with the broker") + + with open('crime_and_punishment.txt', 'r') as file: + lines = file.readlines() + print("Read %i lines from the file" % len(lines)) + + # Send all frames over MQTT, one by one + for i in range(MAX_TXTS): + start_time = time.time_ns() + + print("The start time (ns): %i" % (start_time)) + + text = lines[i % len(lines)].strip() + print("Sending text: %s" % text) + byte_arr = bytearray(text.encode("utf-8")) + + # Prepend 0's to the time to get a fixed length string + t = time.time_ns() + t = (20 - len(str(t))) * "0" + str(t) + byte_arr.extend(t.encode("utf-8")) + + # Append local IP address so edge or cloud knows who to send a reply to + ip_bytes = (15 - len(MQTT_LOCAL_IP)) * "-" + MQTT_LOCAL_IP + byte_arr.extend(ip_bytes.encode("utf-8")) + + print("Sending data (bytes): %i" % (len(byte_arr))) + _ = remote_client.publish(MQTT_TOPIC_SUB, byte_arr, qos=0) + + # Try to keep a frame rate of X + sec_frame = time.time_ns() - start_time + print("Preparation and preprocessing (ns): %i" % (sec_frame)) + sec_frame = float(sec_frame) / 10**9 + + if sec_frame < SEC_PER_FRAME: + # Wait until next frame should happen + frame = 0.1 * (SEC_PER_FRAME - sec_frame) + while sec_frame < SEC_PER_FRAME: + time.sleep(frame) + sec_frame = float(time.time_ns() - start_time) / 10**9 + else: + print("Can't keep up with %f seconds per frame: Took %f" % (SEC_PER_FRAME, sec_frame)) + + # Make sure the finish message arrives + remote_client.loop_start() + remote_client.publish(MQTT_TOPIC_SUB, "1", qos=2) + remote_client.loop_stop() + + remote_client.disconnect() + print("Finished, sent %i texts" % (MAX_TXTS)) + + +if __name__ == "__main__": + connect() + send() + + print("Wait for all texts to be received back") + while RECEIVED != MAX_TXTS: + print("Waiting progress: %i / %i" % (RECEIVED, MAX_TXTS)) + time.sleep(10) + + print("All %i texts have been received back" % (MAX_TXTS)) \ No newline at end of file diff --git a/application/text_translation/src/publisher/src/requirements.txt b/application/text_translation/src/publisher/src/requirements.txt new file mode 100644 index 00000000..390a959a --- /dev/null +++ b/application/text_translation/src/publisher/src/requirements.txt @@ -0,0 +1 @@ +paho-mqtt==1.5.1 \ No newline at end of file diff --git 
new file mode 100644
index 00000000..7d90685a
--- /dev/null
+++ b/application/text_translation/src/subscriber/Dockerfile
@@ -0,0 +1,56 @@
+# Define custom function directory
+ARG FUNCTION_DIR="/function"
+
+#------------------------------------------------------------------------
+FROM python:3.8.7-slim-buster as build_image
+
+# Install packages needed for TFlite
+RUN apt update
+RUN apt install -y \
+    swig \
+    libjpeg-dev \
+    zlib1g-dev \
+    python3-dev \
+    python3-numpy \
+    python3-pip \
+    git \
+    curl \
+    unzip \
+    wget \
+    tar
+
+# Install python specific packages
+RUN pip3 install numpy pybind11
+
+# Install TF lite from github
+RUN wget https://github.com/tensorflow/tensorflow/archive/refs/tags/v2.3.4.tar.gz
+RUN tar xvf v2.3.4.tar.gz
+RUN bash ./tensorflow-2.3.4/tensorflow/lite/tools/make/download_dependencies.sh
+RUN bash ./tensorflow-2.3.4/tensorflow/lite/tools/pip_package/build_pip_package.sh
+
+
+#------------------------------------------------------------------------
+FROM python:3.8.7-slim-buster as runtime_image
+
+# Include global arg in this stage of the build
+ARG FUNCTION_DIR
+
+# Copy function source code
+RUN mkdir -p ${FUNCTION_DIR}
+COPY src/* ${FUNCTION_DIR}/
+
+# Copy TFlite wheel
+# NOTE: IF TENSORFLOW UPDATES, THIS DOCKERFILE MAY CRASH HERE. UPDATE THE NAMES IN THAT CASE
+# COPY --from=build_image tensorflow-2.3.4/tensorflow/lite/tools/pip_package/gen/tflite_pip/python3/dist/*.whl ./
+# RUN pip3 install *.whl
+
+# Install app specific packages
+RUN pip3 install -r ${FUNCTION_DIR}/requirements.txt
+
+# Set working directory to function root directory
+WORKDIR ${FUNCTION_DIR}
+
+# Open port to the MQTT broker
+EXPOSE 1883
+
+CMD [ "python3", "-u", "subscriber.py"]
diff --git a/application/text_translation/src/subscriber/docker.sh b/application/text_translation/src/subscriber/docker.sh
new file mode 100755
index 00000000..3e2eeaee
--- /dev/null
+++ b/application/text_translation/src/subscriber/docker.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+cp -r ../translator ./src/translator
+docker buildx build --platform linux/amd64,linux/arm64 -t fzovpec2/text_translation:text_translation_subscriber --push .
+rm -rf src/translator
\ No newline at end of file
diff --git a/application/text_translation/src/subscriber/src/requirements.txt b/application/text_translation/src/subscriber/src/requirements.txt
new file mode 100644
index 00000000..5b7f28d2
--- /dev/null
+++ b/application/text_translation/src/subscriber/src/requirements.txt
@@ -0,0 +1,6 @@
+numpy
+Pillow==8.1.0
+paho-mqtt==1.5.1
+transformers==4.46.3
+torch
+sentencepiece
\ No newline at end of file
diff --git a/application/text_translation/src/subscriber/src/subscriber.py b/application/text_translation/src/subscriber/src/subscriber.py
new file mode 100644
index 00000000..b05328e4
--- /dev/null
+++ b/application/text_translation/src/subscriber/src/subscriber.py
@@ -0,0 +1,234 @@
+"""\
+This is a subscriber, receiving text through MQTT and
+translating it with a MarianMT model from Hugging Face transformers.
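+Incoming messages are framed as [text][20-byte timestamp][15-byte sender IP];
+a payload of "1" signals that an endpoint has finished sending.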
+""" + +import os +import time +import multiprocessing +import paho.mqtt.client as mqtt +import torch + +# pylint: disable-next=import-error +from transformers import MarianMTModel, MarianTokenizer + +MQTT_LOGS = os.environ["MQTT_LOGS"] +CPU_THREADS = int(os.environ["CPU_THREADS"]) +ENDPOINT_CONNECTED = int(os.environ["ENDPOINT_CONNECTED"]) +MQTT_TOPIC_PUB = "text-translation-pub" +MQTT_LOCAL_IP = os.environ["MQTT_LOCAL_IP"] +MQTT_TOPIC_SUB = "text-translation-sub" + +work_queue = multiprocessing.Queue() +endpoints_connected = multiprocessing.Value("i", ENDPOINT_CONNECTED) +texts_processed = multiprocessing.Value("i", 0) + + +def on_connect(client, _userdata, _flags, rc): + """Execute when connecting to MQTT broker + + Args: + client (object): Client object + _userdata (_type_): _description_ + _flags (_type_): _description_ + rc (str): Result code + """ + print("Connected with result code " + str(rc) + "\n", end="") + client.subscribe(MQTT_TOPIC_SUB) + + +def on_subscribe(_mqttc, _obj, _mid, _granted_qos): + """Execute when subscribing to a topic on a MQTT broker + + Args: + _mqttc (_type_): _description_ + _obj (_type_): _description_ + _mid (_type_): _description_ + _granted_qos (_type_): _description_ + """ + print("Subscribed to topic\n", end="") + + +def on_log(_client, _userdata, level, buff): + """Execute MQTT log on every MQTT event + + Args: + _client (_type_): _description_ + _userdata (_type_): _description_ + level (str): Log level (error, warning, info, etc) + buff (str): Log message + """ + print("[ %s ] %s\n" % (str(level), buff), end="") + + +def on_message(_client, _userdata, msg): + """Execute when receiving a message on a topic you are subscribed to + + Args: + _client (_type_): _description_ + _userdata (_type_): _description_ + msg (str): Received message + """ + work_queue.put([time.time_ns(), msg.payload]) + + +def on_publish(_mqttc, _obj, _mid): + """Execute when publishing / sending data + + Args: + _mqttc (_type_): _description_ + _obj (_type_): _description_ + _mid (_type_): _description_ + """ + print("Published data") + + +def connect_remote_client(current, ip): + """Connect to a remote MQTT broker + + Args: + current (obj): Multiprocessing current process object + ip (str): IP address to connect to + + Returns: + obj: MQTT client object, broker you connected to + """ + # Save IPs from connected endpoints + print("[%s] Connect to remote broker on endpoint %s" % (current.name, ip)) + remote_client = mqtt.Client() + remote_client.on_publish = on_publish + + remote_client.connect(ip, port=1883, keepalive=120) + print("[%s] Connected with the remote broker" % (current.name)) + + return remote_client + + +def do_tflite(queue): + """A Multiprocessing thread + Receive text from a queue, and perform text translation on it + + Args: + queue (obj): Multiprocessing queue with work + """ + current = multiprocessing.current_process() + print("[%s] Start thread\n" % (current.name), end="") + + # Load the model + interpreter = MarianMTModel.from_pretrained("./model") + tokenizer = MarianTokenizer.from_pretrained("./tokenizer") + print("[%s] Model loaded\n" % (current.name), end="") + # Set the model to evaluation mode + interpreter.eval() + print("[%s] Model set to evaluation mode\n" % (current.name), end="") + # Set the model to use CPU + if torch.cuda.is_available(): + interpreter.to("cpu") + print("[%s] Model set to use CPU\n" % (current.name), end="") + else: + print("[%s] No GPU available, using CPU\n" % (current.name), end="") + + print("[%s] Preparations finished\n" % 
(current.name), end="") + + remote_clients = {} + + while True: + print("[%s] Get item\n" % (current.name), end="") + item = queue.get(block=True) + + start_time = time.time_ns() + t_now = item[0] + data = item[1] + + # Stop if a specific message is sent + try: + if data.decode() == "1": + with endpoints_connected.get_lock(): + endpoints_connected.value -= 1 + counter = endpoints_connected.value + + print( + "[%s] A client disconnected, %i clients left\n" % (current.name, counter), + end="", + ) + continue + except (AttributeError, UnicodeDecodeError): + print("[%s] Read text and apply ML\n" % (current.name), end="") + + print("[%s] Read text and apply ML\n" % (current.name), end="") + + # Read the text, do ML on it + with texts_processed.get_lock(): + texts_processed.value += 1 + + # Get sender IP, needed to reply back + ip_bytes = data[-15:] + ip = ip_bytes.decode("utf-8") + while ip[0] == "-": + ip = ip[1:] + + # Get timestamp to calculate latency. We prepended 0's to the time to make it a fixed length + t_bytes = data[-35:-15] + t_old = int(t_bytes.decode("utf-8")) + + print("[%s] Received time is" % (current.name, t_old), end="") + print("[%s] Latency (ns): %s\n" % (current.name, str(t_now - t_old)), end="") + + # Get data to process + text = data[:-35].decode("utf-8") + translated = interpreter.generate(**tokenizer([text], return_tensors="pt", padding=True)) + result = [tokenizer.decode(t, skip_special_tokens=True) for t in translated][0] + + print("[%s] Translated text: %s\n" % (current.name, result), end="") + # Prepare the result to send back + result_bytes = result.encode("utf-8") + + sec_frame = time.time_ns() - start_time + print("[%s] Processing (ns): %i\n" % (current.name, sec_frame), end="") + + # Send result back (currently only timestamp, + # but adding real feedback is trivial and has no impact) + print("[%s] Send result to source: %s" % (current.name, ip)) + if ip not in remote_clients: + remote_clients[ip] = connect_remote_client(current, ip) + + _ = remote_clients[ip].publish(MQTT_TOPIC_PUB, result_bytes + t_bytes, qos=0) + + +def main(): + """Create multiprocessing elements and start generator / processor functions.""" + print("Start connecting to the local MQTT broker") + print("Broker ip: " + str(MQTT_LOCAL_IP)) + print("Topic: " + str(MQTT_TOPIC_SUB)) + + with multiprocessing.Pool(CPU_THREADS, do_tflite, (work_queue,)): + local_client = mqtt.Client() + local_client.on_connect = on_connect + local_client.on_message = on_message + local_client.on_subscribe = on_subscribe + + if MQTT_LOGS == "True": + local_client.on_log = on_log + + local_client.connect(MQTT_LOCAL_IP, port=1883, keepalive=300) + local_client.loop_start() + + while True: + time.sleep(1) + with endpoints_connected.get_lock(): + if endpoints_connected.value == 0 and work_queue.empty(): + # Wait for any processing still happening to finish + time.sleep(10) + break + + local_client.loop_stop() + + work_queue.close() + work_queue.join_thread() + + with texts_processed.get_lock(): + print("Finished, processed texts: %i" % texts_processed.value) + + +if __name__ == "__main__": + main() diff --git a/application/text_translation/text_translation.py b/application/text_translation/text_translation.py new file mode 100644 index 00000000..07397cce --- /dev/null +++ b/application/text_translation/text_translation.py @@ -0,0 +1,440 @@ +"""Manage the image_classification application""" + +import logging +import copy +import sys +import numpy as np +import pandas as pd + +from application import application + + +def 
+def set_container_location(config):
+    """Set registry location/path of containerized applications
+
+    Args:
+        config (dict): Parsed configuration
+    """
+    source = "fzovpec2/text_translation"
+
+    config["images"] = {
+        "worker": "%s:text_translation_subscriber" % (source),
+        "endpoint": "%s:text_translation_publisher" % (source)
+    }
+
+
+def add_options(_config):
+    """Add config options for a particular module
+
+    Args:
+        _config (ConfigParser): ConfigParser object (unused)
+
+    Returns:
+        list(list()): Options to add
+    """
+    settings = [
+        ["frequency", float, lambda x: x >= 0, True, None],
+        ["duration", int, lambda x: x >= 1, False, 300],
+    ]
+
+    return settings
+
+
+def verify_options(parser, config):
+    """Verify the config from the module's requirements
+
+    Args:
+        parser (ArgumentParser): Argparse object
+        config (ConfigParser): ConfigParser object
+    """
+    if config["benchmark"]["application"] != "text_translation":
+        parser.error("ERROR: Application should be text_translation")
+    elif "cache_worker" in config["benchmark"] and config["benchmark"]["cache_worker"] == "True":
+        parser.error("ERROR: text_translation app does not support application caching")
+    elif config["benchmark"]["resource_manager"] == "kubecontrol":
+        parser.error("ERROR: Application text_translation does not support kubecontrol")
+    elif config["infrastructure"]["endpoint_nodes"] <= 0:
+        parser.error("ERROR: Application text_translation requires at least 1 endpoint")
+
+
+def start_worker(config, machines):
+    """Set variables needed when launching the app on workers
+
+    Args:
+        config (dict): Parsed configuration
+        machines (list(Machine object)): List of machine objects representing physical machines
+
+    Returns:
+        (dict): Application variables
+        OR
+        (list): Application variables
+    """
+    if config["benchmark"]["resource_manager"] == "mist":
+        return start_worker_mist(config, machines)
+    if config["benchmark"]["resource_manager"] == "baremetal":
+        return start_worker_baremetal(config, machines)
+
+    return start_worker_kube(config, machines)
+
+
+def start_worker_kube(config, _machines):
+    """Set variables needed when launching the app on workers
+
+    Args:
+        config (dict): Parsed configuration
+        machines (list(Machine object)): List of machine objects representing physical machines
+
+    Returns:
+        (dict): Application variables
+    """
+    if config["mode"] == "cloud":
+        worker_apps = (config["infrastructure"]["cloud_nodes"] - 1) * config["benchmark"][
+            "applications_per_worker"
+        ]
+    elif config["mode"] == "edge":
+        worker_apps = (
+            config["infrastructure"]["edge_nodes"] * config["benchmark"]["applications_per_worker"]
+        )
+
+    app_vars = {
+        "container_port": 1883,
+        "mqtt_logs": True,
+        "endpoint_connected": int(config["infrastructure"]["endpoint_nodes"] / worker_apps),
+        "cpu_threads": max(1, int(config["benchmark"]["application_worker_cpu"])),
+    }
+    return app_vars
+
+
+def start_worker_mist(config, _machines):
+    """Set variables needed when launching the app on workers with Mist
+
+    Args:
+        config (dict): Parsed configuration
+        machines (list(Machine object)): List of machine objects representing physical machines
+
+    Returns:
+        (list): Application variables
+    """
+    app_vars = [
+        "MQTT_LOGS=True",
+        "CPU_THREADS=%i" % (config["infrastructure"]["edge_cores"]),
+        "ENDPOINT_CONNECTED=%i"
+        % (
+            int(config["infrastructure"]["endpoint_nodes"] / config["infrastructure"]["edge_nodes"])
+        ),
+    ]
+    return app_vars
+
+
+def start_worker_baremetal(config, _machines):
+    """Set variables needed when launching the app on baremetal workers
+
+    Args:
+        config (dict): Parsed configuration
+        machines (list(Machine object)): List of machine objects representing physical machines
+
+    Returns:
+        (list): Application variables
+    """
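+    # MQTT_LOCAL_IP reuses the registry address with the port stripped;
+    # the local broker is assumed to run on that host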
+    app_vars = [
+        "MQTT_LOCAL_IP=%s" % (config["registry"].split(":")[0]),
+        "MQTT_LOGS=True",
+        "CPU_THREADS=%i" % (config["infrastructure"]["cloud_cores"]),
+        "ENDPOINT_CONNECTED=%i"
+        % (
+            int(
+                config["infrastructure"]["endpoint_nodes"] / config["infrastructure"]["cloud_nodes"]
+            )
+        ),
+    ]
+    return app_vars
+
+
+def gather_worker_metrics(_machines, _config, worker_output, _starttime):
+    """Gather metrics from cloud or edge workers for the text_translation app
+
+    Args:
+        machines (list(Machine object)): List of machine objects representing physical machines
+        config (dict): Parsed configuration
+        worker_output (list(list(str))): Output of each container ran on the edge
+        starttime (datetime): Time that 'kubectl apply' is called to launch the benchmark
+
+    Returns:
+        list(dict): List of parsed output for each cloud or edge worker
+    """
+    worker_metrics = []
+    if worker_output == []:
+        return worker_metrics
+
+    worker_set = {
+        "worker_id": None,  # ID of this worker
+        "total_time": None,  # Total runtime for the worker
+        "comm_delay_avg": None,  # Average endpoint -> worker delay
+        "comm_delay_stdev": None,  # Stdev of delay
+        "proc_avg": None,  # Average time to process 1 data element on worker
+    }
+
+    # Use 5th-90th percentile for average
+    lower_percentile = 0.05
+    upper_percentile = 0.90
+
+    # Worker_output = [[pod_name, [output_line, ...]], ...]
+    for i, out in enumerate(worker_output):
+        logging.info("Parse output from worker node %i", i)
+        worker_metrics.append(copy.deepcopy(worker_set))
+        worker_metrics[-1]["worker_id"] = i
+
+        # Get network delay in ms (10**-3) and execution times
+        # Sometimes, the program gets an incorrect line, then skip
+        delays = []
+        processing = []
+        start_time = 0
+        end_time = 0
+        negatives = []
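+        # Timing lines have the form "Latency (ns): <int>" or
+        # "Processing (ns): <int>"; parse the unit between the parentheses
+        # and the integer after the last colon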
Exception: %s", line, e) + continue + + units = ["ns"] + if time < 0: + negatives.append(time) + continue + + if unit not in units: + logging.warning("Unit should be [%s], got %s", ",".join(units), unit) + continue + + if unit == "ns": + if "Latency" in line: + delays.append(round(time / 10**6, 4)) + elif "Processing" in line: + processing.append(round(time / 10**6, 4)) + + worker_metrics[-1]["total_time"] = round((end_time - start_time).total_seconds(), 2) + + if len(negatives) > 0: + logging.warning("Got %i negative time values", len(negatives)) + + delays.sort() + processing.sort() + + logging.info( + "Get percentile values between %i - %i", lower_percentile * 100, upper_percentile * 100 + ) + + delays_perc = delays[ + int(len(delays) * lower_percentile) : int(len(delays) * upper_percentile) + ] + processing_perc = processing[ + int(len(processing) * lower_percentile) : int(len(processing) * upper_percentile) + ] + + worker_metrics[-1]["comm_delay_avg"] = round(np.mean(delays_perc), 2) + worker_metrics[-1]["comm_delay_stdev"] = round(np.std(delays_perc), 2) + worker_metrics[-1]["proc_avg"] = round(np.mean(processing_perc), 2) + + return sorted(worker_metrics, key=lambda x: x["worker_id"]) + + +def gather_endpoint_metrics(config, endpoint_output, container_names): + """Gather metrics from endpoints + + Args: + config (dict): Parsed configuration + endpoint_output (list(list(str))): Output of each endpoint container + container_names (list(str)): Names of docker containers launched + + Returns: + list(dict): List of parsed output for each endpoint + """ + endpoint_metrics = [] + endpoint_set = { + "worker_id": None, # To which worker is this endpoint connected + "total_time": None, # Total runtime of the endpoint + "proc_avg": None, # Average procesing time per data element + "data_avg": None, # Average generated data size + "latency_avg": None, # Average end-to-end latency + "latency_stdev": None, # Stdev latency + } + + # Use 5th-90th percentile for average + lower_percentile = 0.05 + upper_percentile = 0.90 + + for out, container_name in zip(endpoint_output, container_names): + logging.info("Parse output from endpoint %s", container_name) + endpoint_metrics.append(copy.deepcopy(endpoint_set)) + + # Get timestamp from first and last line + start_time = application.to_datetime(out[0]) + end_time = application.to_datetime(out[-1]) + + endpoint_metrics[-1]["total_time"] = round((end_time - start_time).total_seconds(), 2) + endpoint_metrics[-1]["data_avg"] = 0.0 + + if config["mode"] == "cloud": + name = container_name.split("_")[0] + endpoint_metrics[-1]["worker_id"] = int(name[5:]) + elif config["mode"] == "edge": + name = container_name.split("_")[0] + endpoint_metrics[-1]["worker_id"] = int(name[4:]) + elif config["mode"] == "endpoint": + endpoint_metrics[-1]["worker_id"] = int(container_name[8:]) + + # Parse line by line to get preparation, preprocessing and processing times + processing = [] + latency = [] + data_size = [] + for line in out: + if any( + word in line + for word in [ + "Preparation and preprocessing", + "Preparation, preprocessing and processing", + "Sending data", + "Latency", + ] + ): + try: + unit = line[line.find("(") + 1 : line.find(")")] + number = int(line.rstrip().split(":")[-1]) + except ValueError as e: + # Sometimes a sending and receiving line get appended in the docker log + # We just ignore this for now - it happens very infrequently + if not ("Sending data" in line and "Received PUBLISH" in line): + logging.warning( + "Got an error while parsing line: %s. 
Exception: %s", line, e + ) + + continue + + units = ["ns", "bytes"] + if number < 0: + logging.warning("Time/Size < 0 should not be possible: %i", number) + continue + + if unit not in units: + logging.warning("Unit should be one of [%s], got %s", ",".join(units), unit) + continue + + if "Preparation, preprocessing and processing" in line: + processing.append(round(number / 10**6, 4)) + elif "Preparation and preprocessing" in line: + processing.append(round(number / 10**6, 4)) + elif "Latency" in line: + latency.append(round(number / 10**6, 4)) + elif "Sending data" in line: + data_size.append(round(number / 10**3, 4)) + + processing.sort() + latency.sort() + + logging.info( + "Get percentile values between %i - %i", lower_percentile * 100, upper_percentile * 100 + ) + + processing_perc = processing[ + int(len(processing) * lower_percentile) : int(len(processing) * upper_percentile) + ] + latency_perc = latency[ + int(len(latency) * lower_percentile) : int(len(latency) * upper_percentile) + ] + + endpoint_metrics[-1]["proc_avg"] = round(np.mean(processing_perc), 2) + endpoint_metrics[-1]["latency_avg"] = round(np.mean(latency_perc), 2) + endpoint_metrics[-1]["latency_stdev"] = round(np.std(latency_perc), 2) + + if data_size: + endpoint_metrics[-1]["data_avg"] = round(np.mean(data_size), 2) + + endpoint_metrics = sorted(endpoint_metrics, key=lambda x: x["worker_id"]) + + return endpoint_metrics + + +def format_output(config, worker_metrics, endpoint_metrics, status=None): + """Format processed output to provide useful insights (image_classification) + + Args: + config (dict): Parsed configuration + sub_metrics (list(dict)): Metrics per worker node + endpoint_metrics (list(dict)): Metrics per endpoint + """ + if status is not None: + logging.error("This application does not support status reporting") + sys.exit() + + df1 = None + if config["mode"] == "cloud" or config["mode"] == "edge" and worker_metrics: + logging.info("------------------------------------") + logging.info("%s OUTPUT", config["mode"].upper()) + logging.info("------------------------------------") + df1 = pd.DataFrame(worker_metrics) + df1.rename( + columns={ + "total_time": "total_time (s)", + "comm_delay_avg": "delay_avg (ms)", + "comm_delay_stdev": "delay_stdev (ms)", + "proc_avg": "proc_time/data (ms)", + }, + inplace=True, + ) + df1_no_indices = df1.to_string(index=False) + logging.info("\n%s", df1_no_indices) + + if config["infrastructure"]["endpoint_nodes"]: + logging.info("------------------------------------") + logging.info("ENDPOINT OUTPUT") + logging.info("------------------------------------") + if config["mode"] == "cloud" or config["mode"] == "edge": + df2 = pd.DataFrame(endpoint_metrics) + df2.rename( + columns={ + "worker_id": "connected_to", + "total_time": "total_time (s)", + "proc_avg": "preproc_time/data (ms)", + "data_avg": "data_size_avg (kb)", + "latency_avg": "latency_avg (ms)", + "latency_stdev": "latency_stdev (ms)", + }, + inplace=True, + ) + else: + df2 = pd.DataFrame( + endpoint_metrics, + columns=[ + "worker_id", + "total_time", + "proc_avg", + "latency_avg", + "latency_stdev", + ], + ) + df2.rename( + columns={ + "worker_id": "endpoint_id", + "total_time": "total_time (s)", + "proc_avg": "proc_time/data (ms)", + "latency_avg": "latency_avg (ms)", + "latency_stdev": "latency_stdev (ms)", + }, + inplace=True, + ) + + df2_no_indices = df2.to_string(index=False) + logging.info("\n%s", df2_no_indices) + + # Print ouput in csv format + if config["mode"] == "cloud" or config["mode"] == "edge" and 
+    # Print output in csv format
+    if config["mode"] in ["cloud", "edge"] and worker_metrics:
+        logging.debug("Output in csv format\n%s\n%s", repr(df1.to_csv()), repr(df2.to_csv()))
+    else:
+        logging.debug("Output in csv format\n%s", repr(df2.to_csv()))
diff --git a/configuration/bench_cloud_4g.cfg b/configuration/bench_cloud_4g.cfg
new file mode 100644
index 00000000..87f17526
--- /dev/null
+++ b/configuration/bench_cloud_4g.cfg
@@ -0,0 +1,26 @@
+# Benchmark a cloud deployment
+# Using 2 cloud (manager + worker) and 1 endpoint node
+[infrastructure]
+provider = qemu
+
+cloud_nodes = 2
+cloud_cores = 3
+cloud_memory = 24
+cloud_quota = 1.0
+
+endpoint_nodes = 1
+endpoint_cores = 4
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = 4g_us_verizon_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubernetes
+
+application = text_translation
+frequency = 1
diff --git a/configuration/bench_cloud.cfg b/configuration/bench_cloud_4g_10.cfg
similarity index 74%
rename from configuration/bench_cloud.cfg
rename to configuration/bench_cloud_4g_10.cfg
index f859b3ea..28821173 100644
--- a/configuration/bench_cloud.cfg
+++ b/configuration/bench_cloud_4g_10.cfg
@@ -9,12 +9,15 @@ cloud_memory = 4
 cloud_quota = 1.0
 
 endpoint_nodes = 1
-endpoint_cores = 1
+endpoint_cores = 4
 endpoint_memory = 1
 endpoint_quota = 0.5
 
 network_emulation = True
-wireless_network_preset = 4g
+wireless_network_preset = 4g_us_verizon_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
 
 [benchmark]
 resource_manager = kubernetes
diff --git a/configuration/bench_cloud_4g_20.cfg b/configuration/bench_cloud_4g_20.cfg
new file mode 100644
index 00000000..4e48e4c6
--- /dev/null
+++ b/configuration/bench_cloud_4g_20.cfg
@@ -0,0 +1,26 @@
+# Benchmark a cloud deployment
+# Using 2 cloud (manager + worker) and 1 endpoint node
+[infrastructure]
+provider = qemu
+
+cloud_nodes = 2
+cloud_cores = 4
+cloud_memory = 4
+cloud_quota = 1.0
+
+endpoint_nodes = 1
+endpoint_cores = 4
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = 4g_us_verizon_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubernetes
+
+application = image_classification
+frequency = 10
diff --git a/configuration/bench_cloud_4g_5.cfg b/configuration/bench_cloud_4g_5.cfg
new file mode 100644
index 00000000..ba149fea
--- /dev/null
+++ b/configuration/bench_cloud_4g_5.cfg
@@ -0,0 +1,26 @@
+# Benchmark a cloud deployment
+# Using 2 cloud (manager + worker) and 1 endpoint node
+[infrastructure]
+provider = qemu
+
+cloud_nodes = 2
+cloud_cores = 4
+cloud_memory = 4
+cloud_quota = 1.0
+
+endpoint_nodes = 1
+endpoint_cores = 4
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = 4g_us_verizon_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubernetes
+
+application = image_classification
+frequency = 2
diff --git a/configuration/bench_cloud_5g_10.cfg b/configuration/bench_cloud_5g_10.cfg
new file mode 100644
index 00000000..a67abb39
--- /dev/null
+++ b/configuration/bench_cloud_5g_10.cfg
@@ -0,0 +1,26 @@
+# Benchmark a cloud deployment
+# Using 2 cloud (manager + worker) and 1 endpoint node
+[infrastructure]
+provider = qemu
+
+cloud_nodes = 2
+cloud_cores = 64
+cloud_memory = 64
+cloud_quota = 1.0
+
+endpoint_nodes = 1
+endpoint_cores = 4
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = 5g_nl_kpn_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubernetes
+
+application = image_classification
+frequency = 10
diff --git a/configuration/bench_cloud_5g_100.cfg b/configuration/bench_cloud_5g_100.cfg
new file mode 100644
index 00000000..ee0e74fd
--- /dev/null
+++ b/configuration/bench_cloud_5g_100.cfg
@@ -0,0 +1,26 @@
+# Benchmark a cloud deployment
+# Using 2 cloud (manager + worker) and 1 endpoint node
+[infrastructure]
+provider = qemu
+
+cloud_nodes = 2
+cloud_cores = 20
+cloud_memory = 32
+cloud_quota = 1.0
+
+endpoint_nodes = 1
+endpoint_cores = 4
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = 5g_nl_kpn_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubernetes
+
+application = image_classification
+frequency = 70
diff --git a/configuration/bench_cloud_5g_20.cfg b/configuration/bench_cloud_5g_20.cfg
new file mode 100644
index 00000000..c25aab4a
--- /dev/null
+++ b/configuration/bench_cloud_5g_20.cfg
@@ -0,0 +1,26 @@
+# Benchmark a cloud deployment
+# Using 2 cloud (manager + worker) and 1 endpoint node
+[infrastructure]
+provider = qemu
+
+cloud_nodes = 2
+cloud_cores = 64
+cloud_memory = 64
+cloud_quota = 1.0
+
+endpoint_nodes = 1
+endpoint_cores = 4
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = 5g_nl_kpn_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubernetes
+
+application = image_classification
+frequency = 20
diff --git a/configuration/bench_cloud_5g_50.cfg b/configuration/bench_cloud_5g_50.cfg
new file mode 100644
index 00000000..a0305a36
--- /dev/null
+++ b/configuration/bench_cloud_5g_50.cfg
@@ -0,0 +1,26 @@
+# Benchmark a cloud deployment
+# Using 2 cloud (manager + worker) and 1 endpoint node
+[infrastructure]
+provider = qemu
+
+cloud_nodes = 2
+cloud_cores = 64
+cloud_memory = 64
+cloud_quota = 1.0
+
+endpoint_nodes = 1
+endpoint_cores = 4
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = 5g_nl_kpn_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubernetes
+
+application = image_classification
+frequency = 50
diff --git a/configuration/bench_cloud_edvo.cfg b/configuration/bench_cloud_edvo.cfg
new file mode 100644
index 00000000..0ea2d252
--- /dev/null
+++ b/configuration/bench_cloud_edvo.cfg
@@ -0,0 +1,26 @@
+# Benchmark a cloud deployment
+# Using 2 cloud (manager + worker) and 1 endpoint node
+[infrastructure]
+provider = qemu
+
+cloud_nodes = 2
+cloud_cores = 4
+cloud_memory = 4
+cloud_quota = 1.0
+
+endpoint_nodes = 1
+endpoint_cores = 4
+endpoint_memory = 1
+endpoint_quota = 1.0
+
+network_emulation = True
+wireless_network_preset = evdo_us_verizon_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubernetes
+
+application = image_classification
+frequency = 1
diff --git a/configuration/bench_edge_4g.cfg b/configuration/bench_edge_4g.cfg
new file mode 100644
index 00000000..22d6c705
--- /dev/null
+++ b/configuration/bench_edge_4g.cfg
@@ -0,0 +1,32 @@
+
+# Benchmark an edge deployment
+# Using 1 cloud (manager), 1 edge, and 1 endpoint node
+[infrastructure]
+provider = qemu
+cloud_nodes = 1
+cloud_cores = 4
+cloud_memory = 4
+cloud_quota = 1.0
+
+edge_nodes = 1
+edge_cores = 2
+edge_memory = 2
+edge_quota = 0.75
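+
+# One endpoint node generates the benchmark workload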
+endpoint_nodes = 1
+endpoint_cores = 1
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = 4g_us_verizon_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubeedge
+kube_version = v1.27.0
+
+application = image_classification
+frequency = 10
diff --git a/configuration/bench_edge.cfg b/configuration/bench_edge_5g.cfg
similarity index 75%
rename from configuration/bench_edge.cfg
rename to configuration/bench_edge_5g.cfg
index cd29fd88..5cb504f5 100644
--- a/configuration/bench_edge.cfg
+++ b/configuration/bench_edge_5g.cfg
@@ -3,7 +3,6 @@
 # Using 1 cloud (manager), 1 edge, and 1 endpoint node
 [infrastructure]
 provider = qemu
-
 cloud_nodes = 1
 cloud_cores = 4
 cloud_memory = 4
@@ -20,10 +19,14 @@ endpoint_memory = 1
 endpoint_quota = 0.5
 
 network_emulation = True
-wireless_network_preset = 4g
+wireless_network_preset = 5g_nl_kpn_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
 
 [benchmark]
 resource_manager = kubeedge
+kube_version = v1.27.0
 
 application = image_classification
-frequency = 5
+frequency = 10
diff --git a/configuration/bench_edge_edvo.cfg b/configuration/bench_edge_edvo.cfg
new file mode 100644
index 00000000..973fa9c7
--- /dev/null
+++ b/configuration/bench_edge_edvo.cfg
@@ -0,0 +1,32 @@
+
+# Benchmark an edge deployment
+# Using 1 cloud (manager), 1 edge, and 1 endpoint node
+[infrastructure]
+provider = qemu
+cloud_nodes = 1
+cloud_cores = 4
+cloud_memory = 4
+cloud_quota = 1.0
+
+edge_nodes = 1
+edge_cores = 2
+edge_memory = 2
+edge_quota = 0.75
+
+endpoint_nodes = 1
+endpoint_cores = 1
+endpoint_memory = 1
+endpoint_quota = 0.5
+
+network_emulation = True
+wireless_network_preset = evdo_us_verizon_mahimahi
+
+cloud_location = eu_central_1
+edge_location = aws_vodafone_edge
+
+[benchmark]
+resource_manager = kubeedge
+kube_version = v1.27.0
+
+application = image_classification
+frequency = 10
diff --git a/configuration/infra_only.cfg b/configuration/infra_only.cfg
index 9896effc..17697d1b 100644
--- a/configuration/infra_only.cfg
+++ b/configuration/infra_only.cfg
@@ -5,16 +5,31 @@ provider = qemu
 infra_only = True
 
 cloud_nodes = 1
-cloud_cores = 4
-cloud_memory = 4
+cloud_cores = 16
+cloud_memory = 32
 cloud_quota = 1.0
 
 edge_nodes = 1
-edge_cores = 2
-edge_memory = 2
-edge_quota = 0.75
+edge_cores = 8
+edge_memory = 16
+edge_quota = 1.0
 
 endpoint_nodes = 1
-endpoint_cores = 1
-endpoint_memory = 1
-endpoint_quota = 0.5
\ No newline at end of file
+endpoint_cores = 4
+endpoint_memory = 4
+endpoint_quota = 1.0
+
+# Found in https://github.com/atlarge-research/continuum/blob/main/infrastructure/network.py
+# Currently, only uses static Linux TC
+# You eventually want your solution to work here as well, e.g., by setting a *_mahimahi
+# preset such as 4g_us_verizon_mahimahi you enable MahiMahi network emulation
+# network_emulation = True
+wireless_network_preset = 4g_us_verizon_mahimahi
+
+# Continuum has a built-in netperf benchmark
+# - Installation code: https://github.com/atlarge-research/continuum/blob/main/infrastructure/qemu/infrastructure/netperf.yml
+# - Execution code: https://github.com/atlarge-research/continuum/blob/main/infrastructure/network.py#L342
+# - More info on netperf: https://linux.die.net/man/1/netperf
+# netperf = False
+
+middleIP = 192
+middleIP_base = 193
diff --git a/infrastructure/ansible.py b/infrastructure/ansible.py
index e807a989..ceb36456 100644
--- a/infrastructure/ansible.py
+++ b/infrastructure/ansible.py
@@ -420,4 +420,4 @@ def copy(config, machines):
         sys.exit()
     elif output:
logging.error("".join(output)) - sys.exit() + sys.exit() \ No newline at end of file diff --git a/infrastructure/infrastructure.py b/infrastructure/infrastructure.py index 1e1c9c1f..c1087089 100644 --- a/infrastructure/infrastructure.py +++ b/infrastructure/infrastructure.py @@ -336,6 +336,15 @@ def create_continuum_dir(config, machines): commands.append(command) + if machine.is_local: + print("We are copying") + command = ( + "cp -r mahimahi %s/.continuum/mahimahi" + % ((config["infrastructure"]["base_path"],)) + ) + + commands.append(command) + results = machines[0].process(config, commands, shell=True) for (output, error), command in zip(results, commands): diff --git a/infrastructure/machine.py b/infrastructure/machine.py index c7de59c4..bd56d3f5 100644 --- a/infrastructure/machine.py +++ b/infrastructure/machine.py @@ -479,4 +479,4 @@ def print_schedule(machines): machine.endpoints, ) - logging.info("-" * 78) + logging.info("-" * 78) \ No newline at end of file diff --git a/infrastructure/network.py b/infrastructure/network.py index 7bfef48d..cd73fe79 100644 --- a/infrastructure/network.py +++ b/infrastructure/network.py @@ -45,24 +45,6 @@ def generate_tc_commands(config, values, ips, disk): ] ) - # Set throughput - commands.append( - [ - "sudo", - "tc", - "class", - "add", - "dev", - network, - "parent", - "1:", - "classid", - "1:%i" % (disk), - "htb", - "rate", - "%smbit" % (throughput), - ] - ) # Filter for specific IPs for ip in ips: @@ -115,10 +97,100 @@ def generate_tc_commands(config, values, ips, disk): return commands +def generate_mahimati_command(endpoint_ip, targets, uplink, downlink): + """Generate Mahimati command + Executing this command puts application into containerized Mahimati shell. + Every command executed with the shell will have throughput and latecies + corresponding to the provided trace and progation delay + + Args: + config (dict): Parsed configuration + propagation_delay (int): Propagation delay on the link measured in ms + trace (str): Saturate-formatted trace file + + Returns: + str: mahimati command + """ + # the path for verizon let's say is /home/mahimahi/traces/Verizon-LTE-driving.up + if not uplink or not downlink: + return [[]] + + commands = [] + + commands.append([ + "export", + "SRC_TO_IGNORE=10.0.0.1" + ]) + + commands.append([ + "export", + "DEST_TO_IGNORE=10.0.0.1" + ]) + + commands.append([ + "(", + "mm-link", + f"--uplink-log=uplink.log", + f"--downlink-log=downlink.log", + uplink, + downlink, + "sudo", + f"/home/mahimahi/setup_container.sh {endpoint_ip} {' '.join([target for target in targets])}", + ">output_mahi.txt", + "2>&1", + "&", + ")" + ]) + + + commands.append(["sleep", "10"]) + + commands.append([ + "(", + "sudo", + f"/home/mahimahi/setup_traffic.sh {endpoint_ip} {" ".join([target for target in targets])}", + ">output_reroute.txt", + "2>&1", + "&", + ")" + ]) + + + return commands + +def mahimahi_values(config): + """ + Set values used for for MahiMahi + In case non-mahimahi preset is used, function returns None + + Args: + config (dict): Parsed configuration + + Returns: + 2x list(str): Path to the MahiMahi traces + """ + if config["infrastructure"]["wireless_network_preset"] == '4g_us_verizon_mahimahi': + return ["/home/mahimahi/traces/Verizon-LTE-driving.up", "/home/mahimahi/traces/Verizon-LTE-driving.down",] + + elif config["infrastructure"]["wireless_network_preset"] == '5g_nl_kpn_mahimahi': + return ["/home/mahimahi/traces/KPN_5G.up", "/home/mahimahi/traces/KPN_5G.down",] + + elif 
config["infrastructure"]["wireless_network_preset"] == 'lte_nl_kpn_mahimahi': + return ["/home/mahimahi/traces/KPN_4G.up", "/home/mahimahi/traces/KPN_4G.down",] + + elif config["infrastructure"]["wireless_network_preset"] == '5g_obstacled_nl_kpn_mahimahi': + return ["/home/mahimahi/traces/KPN_5G_low_band.up", "/home/mahimahi/traces/KPN_5G_low_band.down",] + + elif config["infrastructure"]["wireless_network_preset"] == 'evdo_us_verizon_mahimahi': + return ["/home/mahimahi/traces/Verizon-EVDO-driving.up", "/home/mahimahi/traces/Verizon-EVDO-driving.down",] + + return [None, None] def tc_values(config): """Set latency/throughput values to be used for tc + The MahiMahi keys have the following format: standard_location_provider + Args: config (dict): Parsed configuration @@ -130,6 +202,22 @@ def tc_values(config): edge = [7.5, 2.5, 1000] # Between edge nodes (wired) cloud_edge = [7.5, 2.5, 1000] # Between cloud and edge (wired) + if config["infrastructure"]["wireless_network_preset"] == '4g_us_verizon_mahimahi' or config["infrastructure"]["wireless_network_preset"] == 'evdo_us_verizon_mahimahi' or config["infrastructure"]["wireless_network_preset"] == '5g_nl_kpn_mahimahi' or config["infrastructure"]["wireless_network_preset"] == '6g_nl_kpn_mahimahi': + cloud_endpoint = [0, 0, 1000] + edge_endpoint = [0, 0, 1000] + + if config["infrastructure"]["edge_location"] == "aws_vodafone_edge": + edge_endpoint = [0.07, 0.01, 10000] + elif config["infrastructure"]["edge_location"] == "base_edge": + edge_endpoint = [0, 0, 1000] + + if config["infrastructure"]["cloud_location"] == "eu_central_1": + cloud_endpoint = [3.125, 0.01, 10000] + elif config["infrastructure"]["cloud_location"] == "us_east_1": + cloud_endpoint = [45, 0.01, 10000] + elif config["infrastructure"]["cloud_location"] == "eu_west_3": + cloud_endpoint = [7.5, 0.01, 10000] + # Set values based on 4g/5g preset (if the user didn't set anything, 4g is default) if config["infrastructure"]["wireless_network_preset"] == "4g": cloud_endpoint = [45, 5, 7.21] @@ -176,11 +264,18 @@ def tc_values(config): def start(config, machines): """Set network latency/throughput between VMs to emulate edge continuum networking + Whenever the network emulation is set to MahiMahi (name should end with _mahimahi), + mobile network emulation is a responsibility of MahiMahi and the core network emulation + is the responsibility of tc. 
+
+    Otherwise, tc performs end-to-end network emulation
+
     Args:
         config (dict): Parsed configuration
         machines (list(Machine object)): List of machine objects representing physical machines
     """
     logging.info("Add network latency between VMs")
+    uplink, downlink = mahimahi_values(config)
     cloud, edge, cloud_edge, cloud_endpoint, edge_endpoint = tc_values(config)
 
     commands = []
@@ -236,7 +331,7 @@ def start(config, machines):
             commands.append(command)
 
     # For endpoint nodes (no endpoint->endpoint connection possible)
-    for _ in config["endpoint_ips_internal"]:
+    for endpoint_ip in config["endpoint_ips_internal"]:
         command = []
         disk = 1
 
@@ -251,11 +346,15 @@ def start(config, machines):
         if targets:
             command += generate_tc_commands(config, edge_endpoint, targets, disk)
 
-        commands.append(command)
+        targets = config["control_ips_internal"] + config["cloud_ips_internal"] + config["edge_ips_internal"]
+        if targets:
+            command += generate_mahimahi_command(endpoint_ip, targets, uplink, downlink)
+        commands.append(command)
 
     # Generate all TC commands and the ssh addresses where they need to be executed
     commands_final = []
     sshs = []
+
     for ssh, command in zip(
         config["cloud_ssh"] + config["edge_ssh"] + config["endpoint_ssh"], commands
     ):
@@ -273,6 +372,7 @@ def start(config, machines):
 
     # Execute TC command in parallel
     if commands_final:
         results = machines[0].process(config, commands_final, shell=True, ssh=sshs)
 
     # Check output of TC commands
diff --git a/infrastructure/qemu/infrastructure/base_cloud_start.yml b/infrastructure/qemu/infrastructure/base_cloud_start.yml
index 5cf4ccea..0b86c6c4 100644
--- a/infrastructure/qemu/infrastructure/base_cloud_start.yml
+++ b/infrastructure/qemu/infrastructure/base_cloud_start.yml
@@ -4,8 +4,6 @@
   - name: Remove old base image from {{ base_path }}/.continuum/images
     shell: |
       rm -f "{{ base_path }}/.continuum/images/{{ base_cloud }}.qcow2"
-    args:
-      warn: false
 
   - name: Create base image
     command: >
diff --git a/infrastructure/qemu/infrastructure/base_edge_start.yml b/infrastructure/qemu/infrastructure/base_edge_start.yml
index d6a89896..45bd90d7 100644
--- a/infrastructure/qemu/infrastructure/base_edge_start.yml
+++ b/infrastructure/qemu/infrastructure/base_edge_start.yml
@@ -4,8 +4,6 @@
   - name: Remove old base image from {{ base_path }}/.continuum/images
     shell: |
       rm -f "{{ base_path }}/.continuum/images/{{ base_edge }}.qcow2"
-    args:
-      warn: false
 
   - name: Create base image
     command: >
diff --git a/infrastructure/qemu/infrastructure/base_endpoint_start.yml b/infrastructure/qemu/infrastructure/base_endpoint_start.yml
index 197c76b7..8e5edde9 100644
--- a/infrastructure/qemu/infrastructure/base_endpoint_start.yml
+++ b/infrastructure/qemu/infrastructure/base_endpoint_start.yml
@@ -4,8 +4,6 @@
   - name: Remove old base image from {{ base_path }}/.continuum/images
     shell: |
       rm -f "{{ base_path }}/.continuum/images/{{ base_endpoint }}.qcow2"
-    args:
-      warn: false
 
   - name: Create base image
     command: >
diff --git a/infrastructure/qemu/infrastructure/base_start.yml b/infrastructure/qemu/infrastructure/base_start.yml
index eb97293c..06048a2f 100644
--- a/infrastructure/qemu/infrastructure/base_start.yml
+++ b/infrastructure/qemu/infrastructure/base_start.yml
@@ -4,8 +4,6 @@
   - name: Remove old base image from {{ base_path }}/.continuum/images
     shell: |
      rm -f "{{ base_path }}/.continuum/images/{{ base }}.qcow2"
-    args:
-      warn: false
 
   - name: Create base image
     command: >
@@ -17,7 +15,7 @@
         name: cloud-image-utils
         state: present
       when: ansible_os_family == "Debian"
-
+
     - 
   - name: Install cloudinit requirements
     package:
       name: cloud-utils
diff --git a/infrastructure/qemu/infrastructure/cloud_start.yml b/infrastructure/qemu/infrastructure/cloud_start.yml
index eb3f9c98..36bd8993 100644
--- a/infrastructure/qemu/infrastructure/cloud_start.yml
+++ b/infrastructure/qemu/infrastructure/cloud_start.yml
@@ -14,7 +14,7 @@
       for i in $(seq "{{ cloud_start }}" "{{ cloud_end }}"); do
         qemu-img create -f qcow2 -F qcow2 \
           -b "{{ base_path }}/.continuum/images/{{ base_cloud }}.qcow2" \
-          "{{ base_path }}/.continuum/images/cloud${i}_{{ username }}.qcow2"
+          "{{ base_path }}/.continuum/images/cloud${i}_{{ username }}.qcow2" 50G
       done
 
   - name: Add cloudinit disk for cloud controller
diff --git a/infrastructure/qemu/infrastructure/mahimahi.yml b/infrastructure/qemu/infrastructure/mahimahi.yml
new file mode 100644
index 00000000..71585e27
--- /dev/null
+++ b/infrastructure/qemu/infrastructure/mahimahi.yml
@@ -0,0 +1,84 @@
+---
+- hosts: base
+  become: true
+  tasks:
+    - name: Copy modded MahiMahi folder
+      copy:
+        src: "{{ continuum_home }}/mahimahi/"
+        dest: /home/mahimahi
+
+    - name: Install MahiMahi build dependencies
+      apt:
+        state: present
+        name:
+          - protobuf-compiler
+          - libprotobuf-dev
+          - autotools-dev
+          - dh-autoreconf
+          - iptables
+          - pkg-config
+          - dnsmasq-base
+          - apache2-bin
+          - debhelper
+          - libssl-dev
+          - ssl-cert
+          - libxcb-present-dev
+          - libcairo2-dev
+          - libpango1.0-dev
+          - apache2-dev
+          - gcc-7
+          - g++-7
+
+    - name: Set up GCC 7 as an alternative
+      alternatives:
+        name: gcc
+        path: /usr/bin/gcc-7
+        link: /usr/bin/gcc
+        priority: 60
+        slaves:
+          - name: g++
+            path: /usr/bin/g++-7
+            link: /usr/bin/g++
+
+    - name: Make autogen.sh executable
+      file:
+        path: /home/mahimahi/autogen.sh
+        mode: '0755'
+        state: file
+
+    - name: Execute autogen.sh script
+      command: ./autogen.sh
+      args:
+        chdir: /home/mahimahi
+
+    - name: Execute configure script
+      command: ./configure
+      args:
+        chdir: /home/mahimahi
+
+    - name: Make
+      command: make
+      args:
+        chdir: /home/mahimahi
+
+    - name: Make install
+      command: make install
+      args:
+        chdir: /home/mahimahi
+
+    - name: Ensure setup_container.sh is executable
+      ansible.builtin.file:
+        path: /home/mahimahi/setup_container.sh
+        mode: '0755'
+
+    - name: Ensure setup_traffic.sh is executable
+      ansible.builtin.file:
+        path: /home/mahimahi/setup_traffic.sh
+        mode: '0755'
+
+    - name: Enable IPv4 forwarding
+      sysctl:
+        name: net.ipv4.ip_forward
+        value: '1'
+        state: present
+        reload: yes
diff --git a/infrastructure/qemu/infrastructure/os.yml b/infrastructure/qemu/infrastructure/os.yml
index 24b38905..f40fd4e9 100644
--- a/infrastructure/qemu/infrastructure/os.yml
+++ b/infrastructure/qemu/infrastructure/os.yml
@@ -18,7 +18,7 @@
     ansible_python_interpreter: "/usr/bin/python"
     when: ansible_os_family == "RedHat"
 
-  - name: Get ubuntu source image
+  - name: Get Ubuntu 20.04 source image
     get_url:
       url: https://cloud-images.ubuntu.com/releases/focal/release/ubuntu-20.04-server-cloudimg-amd64.img
       dest: "{{ base_path }}/.continuum/images/ubuntu2004.img"
@@ -37,3 +37,4 @@
   - name: Resize image to full format
     command: qemu-img resize "{{ base_path }}/.continuum/images/ubuntu2004.qcow2" +8G
+
diff --git a/infrastructure/qemu/infrastructure/remove.yml b/infrastructure/qemu/infrastructure/remove.yml
index 7a462501..82ce1552 100644
--- a/infrastructure/qemu/infrastructure/remove.yml
+++ b/infrastructure/qemu/infrastructure/remove.yml
@@ -4,33 +4,23 @@
   - name: Remove existing base image user data (if any)
     shell: |
       rm -f "{{ base_path }}/.continuum/images/user_data_base*.img"
-    args:
-      warn: false
 
   - name: Remove existing cloud controller images (if any)
     shell: |
       rm -f "{{ base_path }}/.continuum/images/cloud_controller*.qcow2"
       rm -f "{{ base_path }}/.continuum/images/user_data_cloud_controller*.img"
-    args:
-      warn: false
 
   - name: Remove existing cloud images (if any)
     shell: |
       rm -f "{{ base_path }}/.continuum/images/cloud*.qcow2"
       rm -f "{{ base_path }}/.continuum/images/user_data_cloud*.img"
-    args:
-      warn: false
 
   - name: Remove existing edge images (if any)
     shell: |
       rm -f "{{ base_path }}/.continuum/images/edge*.qcow2"
       rm -f "{{ base_path }}/.continuum/images/user_data_edge*.img"
-    args:
-      warn: false
 
   - name: Remove existing endpoint images (if any)
     shell: |
       rm -f "{{ base_path }}/.continuum/images/endpoint*.qcow2"
       rm -f "{{ base_path }}/.continuum/images/user_data_endpoint*.img"
-    args:
-      warn: false
diff --git a/infrastructure/qemu/qemu.py b/infrastructure/qemu/qemu.py
index 971bea17..44f2ba05 100644
--- a/infrastructure/qemu/qemu.py
+++ b/infrastructure/qemu/qemu.py
@@ -533,6 +533,19 @@ def base_image(config, machines):
         ]
         ansible.check_output(machines[0].process(config, command)[0])
 
+    # Install MahiMahi on the endpoint
+    command = [
+        "ansible-playbook",
+        "-i",
+        os.path.join(config["infrastructure"]["base_path"], ".continuum/inventory_vms"),
+        os.path.join(
+            config["infrastructure"]["base_path"],
+            ".continuum/infrastructure/mahimahi.yml",
+        ),
+    ]
+
+    ansible.check_output(machines[0].process(config, command)[0])
+
     # Install docker containers if required
     if not (config["infrastructure"]["infra_only"] or config["benchmark"]["resource_manager_only"]):
         # Kubernetes/KubeEdge don't need docker images on the cloud/edge nodes
diff --git a/input/configuration/configuration_parser.py b/input/configuration/configuration_parser.py
index 1070eb47..8013bc55 100644
--- a/input/configuration/configuration_parser.py
+++ b/input/configuration/configuration_parser.py
@@ -379,7 +379,9 @@ def parse_infrastructure_network(parser, input_config, config):
 
     settings = [
         # Option | Type | Condition | Mandatory | Default
-        ["wireless_network_preset", str, lambda x: x in ["4g", "5g"], False, "4g"],
+        ["wireless_network_preset", str, lambda x: x in ["4g", "5g", "4g_us_verizon_mahimahi", "evdo_us_verizon_mahimahi", "5g_nl_kpn_mahimahi", "5g_obstacled_nl_kpn_mahimahi", "lte_nl_kpn_mahimahi"], False, "4g"],
+        ["edge_location", str, lambda x: x in ["aws_vodafone_edge", "base_edge"], False, ""],
+        ["cloud_location", str, lambda x: x in ["eu_central_1", "us_east_1", "eu_west_3"], False, ""],
         ["cloud_latency_avg", float, lambda x: x >= 0.0, False, -1],
["cloud_latency_var", float, lambda x: x >= 0.0, False, -1], ["cloud_throughput", float, lambda x: x >= 1.0, False, -1], @@ -435,6 +437,7 @@ def parse_benchmark(parser, input_config, config): # Option | Type | Condition | Mandatory | Default ["resource_manager", str, lambda x: x in rms, True, None], ["resource_manager_only", bool, lambda x: x in [True, False], False, None], + ["kube_version", str, lambda x: x in ["v1.27.0"], False, None ], ["docker_pull", bool, lambda x: x in [True, False], False, None], ["application", str, lambda x: x in apps, False, None], ]