From aa422b547b9e9991f7a449da2a03fe7f7c714fb7 Mon Sep 17 00:00:00 2001 From: Carlos Date: Tue, 13 Sep 2022 15:19:53 +0100 Subject: [PATCH 1/4] Bump Conan (#269) * chore: bump code version * fix(conan): bump conan version to 1.52.0 --- .env | 4 ++-- .github/workflows/tests.yml | 10 +++++----- VERSION | 2 +- docker/faabric-base.dockerfile | 2 +- docker/faabric.dockerfile | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.env b/.env index c4aed19d3..e14bc3897 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ -FAABRIC_VERSION=0.3.3 -FAABRIC_CLI_IMAGE=faasm/faabric:0.3.3 +FAABRIC_VERSION=0.3.4 +FAABRIC_CLI_IMAGE=faasm/faabric:0.3.4 COMPOSE_PROJECT_NAME=faabric-dev CONAN_CACHE_MOUNT_SOURCE=./conan-cache/ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 24cd12a93..b2cc99531 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,7 +16,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm/faabric:0.3.3 + image: faasm/faabric:0.3.4 defaults: run: working-directory: /code/faabric @@ -31,7 +31,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm/faabric:0.3.3 + image: faasm/faabric:0.3.4 defaults: run: working-directory: /code/faabric @@ -47,7 +47,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm/faabric:0.3.3 + image: faasm/faabric:0.3.4 defaults: run: working-directory: /code/faabric @@ -81,7 +81,7 @@ jobs: TSAN_OPTIONS: "verbosity=1:halt_on_error=1:suppressions=/code/faabric/thread-sanitizer-ignorelist.txt:history_size=7" UBSAN_OPTIONS: "print_stacktrace=1:halt_on_error=1" container: - image: faasm/faabric:0.3.3 + image: faasm/faabric:0.3.4 options: --privileged defaults: run: @@ -131,7 +131,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm/faabric:0.3.3 + image: faasm/faabric:0.3.4 defaults: run: working-directory: /code/faabric diff --git a/VERSION b/VERSION index 1c09c74e2..42045acae 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.3.3 +0.3.4 diff --git a/docker/faabric-base.dockerfile b/docker/faabric-base.dockerfile index 2c1587b44..d757da399 100644 --- a/docker/faabric-base.dockerfile +++ b/docker/faabric-base.dockerfile @@ -49,7 +49,7 @@ RUN apt update -y && apt install -y \ sudo \ unzip -RUN curl -s -L -o /tmp/conan-latest.deb https://github.com/conan-io/conan/releases/download/1.48.1/conan-ubuntu-64.deb && sudo dpkg -i /tmp/conan-latest.deb && rm -f /tmp/conan-latest.deb +RUN curl -s -L -o /tmp/conan-latest.deb https://github.com/conan-io/conan/releases/download/1.52.0/conan-ubuntu-64.deb && sudo dpkg -i /tmp/conan-latest.deb && rm -f /tmp/conan-latest.deb # Update pip RUN pip install -U pip diff --git a/docker/faabric.dockerfile b/docker/faabric.dockerfile index 5c24b098f..f7784afcc 100644 --- a/docker/faabric.dockerfile +++ b/docker/faabric.dockerfile @@ -1,4 +1,4 @@ -FROM faasm/faabric-base:0.3.3 +FROM faasm/faabric-base:0.3.4 ARG FAABRIC_VERSION # faabic-base image is not re-built often, so tag may be behind From d7fce1da0a09b648e6afda672956c05f09c72652 Mon Sep 17 00:00:00 2001 From: Carlos Date: Mon, 3 Oct 2022 16:36:13 +0100 Subject: [PATCH 2/4] Homogenise `git` and `docker` interface (#272) * feat(tasks): add task to bump code version, modify docs accordingly, and bump code version * feat(docker): bump base image to 22.04 and re-factor to follow dockerfile style guidelines * feat(python): add create venv binary to be able to call it separately from docker build. in addition, make the distinction between bare metal venvs and container ones. also fix the issue with zsh paths * fix: refactor FAASM_DOCKER to FAABRIC_DOCKER after copy/paste * fix: correct venv path after another copy/paste error * fix(docker): update faabric dockerfile to inherit from latest base and enforce a reduced number of RUN layers * fix: self-review * fix: add inv wrapper for the tests * fix(gha): use the new venv structure in the tests * fix(python): also install setuptools and wheel with pip for faster pip installs * fix(docker): install llvm 13 from jammy's base repositories, only install llvm 10 from focal's ones * fix(git): add venv-bm to .gitignore * docker: re-factor tasks from container to docker and add iterable * dockerfile: bump back to 20.04 * fix: update comment in docker tasks * fix: nit in dockerfile * fix: update docs to use docker instead of container for tasks --- .env | 4 +-- .github/workflows/tests.yml | 28 +++++++++--------- .gitignore | 1 + VERSION | 2 +- bin/create_venv.sh | 33 +++++++++++++++++++++ bin/inv_wrapper.sh | 6 ++++ bin/workon.sh | 20 ++++--------- docker/faabric-base.dockerfile | 38 ++++++++++++++---------- docker/faabric.dockerfile | 26 ++++++++--------- docs/source/development.md | 30 ++++++++++++------- tasks/__init__.py | 4 +-- tasks/{container.py => docker.py} | 48 ++++++++++++++++--------------- tasks/git.py | 28 ++++++++++++++++++ 13 files changed, 174 insertions(+), 94 deletions(-) create mode 100755 bin/create_venv.sh create mode 100755 bin/inv_wrapper.sh rename tasks/{container.py => docker.py} (54%) diff --git a/.env b/.env index e14bc3897..adc90a62d 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ -FAABRIC_VERSION=0.3.4 -FAABRIC_CLI_IMAGE=faasm/faabric:0.3.4 +FAABRIC_VERSION=0.3.5 +FAABRIC_CLI_IMAGE=faasm/faabric:0.3.5 COMPOSE_PROJECT_NAME=faabric-dev CONAN_CACHE_MOUNT_SOURCE=./conan-cache/ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b2cc99531..78ab32090 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,7 +16,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm/faabric:0.3.4 + image: faasm/faabric:0.3.5 defaults: run: working-directory: /code/faabric @@ -25,13 +25,13 @@ jobs: with: directory: /code/faabric - name: "Build dependencies to be shared by all runs" - run: inv dev.cmake -b Debug + run: ./bin/inv_wrapper.sh dev.cmake -b Debug docs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm/faabric:0.3.4 + image: faasm/faabric:0.3.5 defaults: run: working-directory: /code/faabric @@ -41,13 +41,13 @@ jobs: - name: "Check out branch" run: git checkout --force ci-branch - name: "Build docs" - run: inv docs + run: ./bin/inv_wrapper.sh docs formatting: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm/faabric:0.3.4 + image: faasm/faabric:0.3.5 defaults: run: working-directory: /code/faabric @@ -81,7 +81,7 @@ jobs: TSAN_OPTIONS: "verbosity=1:halt_on_error=1:suppressions=/code/faabric/thread-sanitizer-ignorelist.txt:history_size=7" UBSAN_OPTIONS: "print_stacktrace=1:halt_on_error=1" container: - image: faasm/faabric:0.3.4 + image: faasm/faabric:0.3.5 options: --privileged defaults: run: @@ -96,9 +96,9 @@ jobs: - name: "Ping redis" run: redis-cli -h redis ping - name: "Run cmake for tests" - run: inv dev.cmake --build=Debug --sanitiser ${{ matrix.sanitiser }} + run: ./bin/inv_wrapper.sh dev.cmake --build=Debug --sanitiser ${{ matrix.sanitiser }} - name: "Build tests" - run: inv dev.cc faabric_tests + run: ./bin/inv_wrapper.sh dev.cc faabric_tests - name: "Run tests" run: ./bin/faabric_tests working-directory: /build/faabric/static @@ -131,7 +131,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm/faabric:0.3.4 + image: faasm/faabric:0.3.5 defaults: run: working-directory: /code/faabric @@ -144,12 +144,12 @@ jobs: build-type: "release" directory: /code/faabric - name: "Run cmake shared" - run: inv dev.cmake --shared --build=Release + run: ./bin/inv_wrapper.sh dev.cmake --shared --build=Release - name: "Build Faabric shared library" - run: inv dev.cc faabric --shared + run: ./bin/inv_wrapper.sh dev.cc faabric --shared - name: "Install Faabric shared library" - run: inv dev.install faabric --shared + run: ./bin/inv_wrapper.sh dev.install faabric --shared - name: "Build examples" - run: inv examples + run: ./bin/inv_wrapper.sh examples - name: "Run example to check" - run: inv examples.execute check + run: ./bin/inv_wrapper.sh examples.execute check diff --git a/.gitignore b/.gitignore index 4d1d564ba..9766f84bd 100644 --- a/.gitignore +++ b/.gitignore @@ -108,4 +108,5 @@ dkms.conf __pycache__/ *.pyc venv/ +venv-bm/ *.egg-info/ diff --git a/VERSION b/VERSION index 42045acae..c2c0004f0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.3.4 +0.3.5 diff --git a/bin/create_venv.sh b/bin/create_venv.sh new file mode 100755 index 000000000..a877bbeca --- /dev/null +++ b/bin/create_venv.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +set -e + +THIS_DIR=$(dirname $(readlink -f $0)) +PROJ_ROOT=${THIS_DIR}/.. + +# Set different virtual environment paths so that these don't clash when +# mounting the code in a development container +VENV_PATH="undetected" +if [[ -z "$FAABRIC_DOCKER" ]]; then + VENV_PATH="${PROJ_ROOT}/venv-bm" +else + VENV_PATH="/code/faabric/venv" +fi + +PIP=${VENV_PATH}/bin/pip3 + +function pip_cmd { + source ${VENV_PATH}/bin/activate && ${PIP} "$@" +} + +pushd ${PROJ_ROOT} >> /dev/null + +if [ ! -d ${VENV_PATH} ]; then + python3 -m venv ${VENV_PATH} +fi + +pip_cmd install -U pip +pip_cmd install -U setuptools wheel +pip_cmd install -r requirements.txt + +popd >> /dev/null diff --git a/bin/inv_wrapper.sh b/bin/inv_wrapper.sh new file mode 100755 index 000000000..497d6a2c6 --- /dev/null +++ b/bin/inv_wrapper.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -e + +# Wrapper script for running invoke in virtual env + +source bin/workon.sh && inv $@ diff --git a/bin/workon.sh b/bin/workon.sh index ec0d79d13..51eb596fc 100755 --- a/bin/workon.sh +++ b/bin/workon.sh @@ -7,11 +7,9 @@ MODE="undetected" if [[ -z "$FAABRIC_DOCKER" ]]; then - THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" - if [ "$(ps -o comm= -p $$)" = "zsh" ]; then - THIS_DIR="$( cd "$( dirname "${ZSH_ARGZERO}" )" >/dev/null 2>&1 && pwd )" - fi + THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]:-${(%):-%x}}" )" >/dev/null 2>&1 && pwd )" PROJ_ROOT="${THIS_DIR}/.." + VENV_PATH="${PROJ_ROOT}/venv-bm" export FAABRIC_BUILD_DIR="${PROJ_ROOT}/build" # Normal terminal @@ -19,6 +17,7 @@ if [[ -z "$FAABRIC_DOCKER" ]]; then else # Running inside the container, we know the project root PROJ_ROOT="/code/faabric" + VENV_PATH="${PROJ_ROOT}/venv" export FAABRIC_BUILD_DIR="/build/faabric" # Use containerised redis @@ -33,19 +32,12 @@ pushd ${PROJ_ROOT}>>/dev/null # Virtualenv # ---------------------------- -if [ ! -d "venv" ]; then - python3 -m venv venv - ( - source venv/bin/activate - pip install -U pip - pip install -U setuptools - pip install -U wheel - pip install -r requirements.txt - ) +if [ ! -d ${VENV_PATH} ]; then + ${PROJ_ROOT}/bin/create_venv.sh fi export VIRTUAL_ENV_DISABLE_PROMPT=1 -source venv/bin/activate +source ${VENV_PATH}/bin/activate # ---------------------------- # Invoke tab-completion diff --git a/docker/faabric-base.dockerfile b/docker/faabric-base.dockerfile index d757da399..afcbbef58 100644 --- a/docker/faabric-base.dockerfile +++ b/docker/faabric-base.dockerfile @@ -1,15 +1,22 @@ FROM ubuntu:20.04 -RUN apt-get update -RUN apt-get install -y software-properties-common gpg wget curl -RUN wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key|apt-key add - -RUN wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc | apt-key add - -RUN add-apt-repository -y -n "deb http://apt.llvm.org/focal/ llvm-toolchain-focal-10 main" -RUN add-apt-repository -y -n "deb http://apt.llvm.org/focal/ llvm-toolchain-focal-13 main" -RUN add-apt-repository -y -n "deb https://apt.kitware.com/ubuntu/ focal main" -RUN add-apt-repository -y -n ppa:ubuntu-toolchain-r/test +# Configure APT repositories +RUN apt update \ + && apt upgrade -y \ + && apt install -y \ + curl \ + gpg \ + software-properties-common \ + wget \ + && wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key|apt-key add - \ + && wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc | apt-key add - \ + && add-apt-repository -y -n "deb http://apt.llvm.org/focal/ llvm-toolchain-focal-10 main" \ + && add-apt-repository -y -n "deb http://apt.llvm.org/focal/ llvm-toolchain-focal-13 main" \ + && add-apt-repository -y -n "deb https://apt.kitware.com/ubuntu/ focal main" \ + && add-apt-repository -y -n ppa:ubuntu-toolchain-r/test -RUN apt update -y && apt install -y \ +# Install APT packages +RUN apt update && apt install -y \ autoconf \ automake \ build-essential \ @@ -49,11 +56,12 @@ RUN apt update -y && apt install -y \ sudo \ unzip -RUN curl -s -L -o /tmp/conan-latest.deb https://github.com/conan-io/conan/releases/download/1.52.0/conan-ubuntu-64.deb && sudo dpkg -i /tmp/conan-latest.deb && rm -f /tmp/conan-latest.deb - -# Update pip -RUN pip install -U pip +# Install Conan +RUN curl -s -L -o \ + /tmp/conan-latest.deb https://github.com/conan-io/conan/releases/download/1.52.0/conan-ubuntu-64.deb \ + && sudo dpkg -i /tmp/conan-latest.deb \ + && rm -f /tmp/conan-latest.deb # Tidy up -RUN apt-get clean autoclean -RUN apt-get autoremove +RUN apt clean autoclean -y \ + && apt autoremove -y diff --git a/docker/faabric.dockerfile b/docker/faabric.dockerfile index f7784afcc..8464ebf71 100644 --- a/docker/faabric.dockerfile +++ b/docker/faabric.dockerfile @@ -1,7 +1,8 @@ -FROM faasm/faabric-base:0.3.4 +FROM faasm/faabric-base:0.3.5 ARG FAABRIC_VERSION # faabic-base image is not re-built often, so tag may be behind +SHELL ["/bin/bash", "-c"] # Flag to say we're in a container ENV FAABRIC_DOCKER="on" @@ -11,25 +12,24 @@ WORKDIR /code RUN git clone -b v${FAABRIC_VERSION} https://github.com/faasm/faabric WORKDIR /code/faabric -RUN pip3 install -r requirements.txt - -# Static build -RUN inv dev.cmake --build=Release -RUN inv dev.cc faabric -RUN inv dev.cc faabric_tests -# Shared build -RUN inv dev.cmake --shared --build=Release -RUN inv dev.cc faabric --shared -RUN inv dev.install faabric --shared +# Python set-up and code builds +RUN ./bin/create_venv.sh \ + && source venv/bin/activate \ + # Static build + && inv dev.cmake --build=Release \ + && inv dev.cc faabric \ + && inv dev.cc faabric_tests \ + # Shared build + && inv dev.cmake --shared --build=Release \ + && inv dev.cc faabric --shared \ + && inv dev.install faabric --shared # GDB config, allow loading repo-specific config -RUN touch /root/.gdbinit RUN echo "set auto-load safe-path /" > /root/.gdbinit # CLI setup ENV TERM xterm-256color -SHELL ["/bin/bash", "-c"] RUN echo ". /code/faabric/bin/workon.sh" >> ~/.bashrc CMD ["/bin/bash", "-l"] diff --git a/docs/source/development.md b/docs/source/development.md index 52b4bf8e1..131bfe2e2 100644 --- a/docs/source/development.md +++ b/docs/source/development.md @@ -129,12 +129,18 @@ docker compose stop ## Creating a new tag -Create a new branch, then find and replace the current version with the relevant -bumped version. It should appear in: +Create a new branch, then bump the code version: -- `VERSION` -- `.env` -- `.github/workflows/tests.yml`. +```bash +inv git.bump +``` + +This will increment the minor version, to bump the code to an arbitrary version +you can run: + +```bash +inv git.bump --ver= +``` Once done, commit and push, then run: @@ -171,13 +177,17 @@ To build the main container, run: source bin/workon.sh # Build -inv container.build +inv docker.build -c faabric [--push] +``` + +The base container is not re-built often, and not re-built as part of Github +Actions. If you ever need to add an APT dependency, or update the Conan +version, run: -# Push -inv container.push +```bash +source bin/workon.sh -# Build and push -inv container.build --push +inv docker.build -c faabric-base [--push] ``` ## Publishing a release diff --git a/tasks/__init__.py b/tasks/__init__.py index f125572d2..49b5cc4ec 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -1,16 +1,16 @@ from invoke import Collection from . import call -from . import container from . import dev +from . import docker from . import docs from . import examples from . import git ns = Collection( call, - container, dev, + docker, docs, examples, git, diff --git a/tasks/container.py b/tasks/docker.py similarity index 54% rename from tasks/container.py rename to tasks/docker.py index 7b788c569..6f090593e 100644 --- a/tasks/container.py +++ b/tasks/docker.py @@ -49,33 +49,35 @@ def _do_push(name): run(cmd, shell=True, check=True) -@task(default=True) -def build(ctx, nocache=False, push=False): +@task(iterable=["c"]) +def build(ctx, c, nocache=False, push=False): """ - Build current version of faabric container + Build containers for faabric. Targets are: `faabric`, and `faabric-base` """ - _do_container_build(FAABRIC_IMAGE_NAME, nocache=nocache, push=push) + for ctr in c: + if ctr == "faabric": + img_name = FAABRIC_IMAGE_NAME + elif ctr == "faabric-base": + img_name = FAABRIC_BASE_IMAGE_NAME + else: + print("Unrecognised container name: {}".format(ctr)) + raise RuntimeError("Unrecognised container name") - -@task -def build_base(ctx, nocache=False, push=False): - """ - Build faabric's base container - """ - _do_container_build(FAABRIC_BASE_IMAGE_NAME, nocache=nocache, push=push) - - -@task -def push(ctx): - """ - Push current version of faabric container - """ - _do_push(FAABRIC_IMAGE_NAME) + _do_container_build(img_name, nocache=nocache, push=push) -@task -def push_base(ctx): +@task(iterable=["c"]) +def push(ctx, c): """ - Push faabric's base container + Push containers for faabric. Targets are: `faabric`, and `faabric-base` """ - _do_push(FAABRIC_BASE_IMAGE_NAME) + for ctr in c: + if ctr == "faabric": + img_name = FAABRIC_IMAGE_NAME + elif ctr == "faabric-base": + img_name = FAABRIC_BASE_IMAGE_NAME + else: + print("Unrecognised container name: {}".format(ctr)) + raise RuntimeError("Unrecognised container name") + + _do_push(img_name) diff --git a/tasks/git.py b/tasks/git.py index 3f2eed174..81746e0ee 100644 --- a/tasks/git.py +++ b/tasks/git.py @@ -3,11 +3,39 @@ from tasks.util.env import get_faabric_config, get_version, PROJ_ROOT from subprocess import run, PIPE, STDOUT +VERSIONED_FILES = [ + ".env", + ".github/workflows/tests.yml", + "VERSION", +] + def get_tag_name(version): return "v{}".format(version) +@task +def bump(ctx, ver=None): + """ + Bump code version + """ + old_ver = get_version() + + if ver: + new_ver = ver + else: + # Just bump the last minor version part + new_ver_parts = old_ver.split(".") + new_ver_minor = int(new_ver_parts[-1]) + 1 + new_ver_parts[-1] = str(new_ver_minor) + new_ver = ".".join(new_ver_parts) + + # Replace version in all files + for f in VERSIONED_FILES: + sed_cmd = "sed -i 's/{}/{}/g' {}".format(old_ver, new_ver, f) + run(sed_cmd, shell=True, check=True) + + @task def tag(ctx, force=False): """ From be18960275aa23bd94da8a23f8e0608e8eb5ad1f Mon Sep 17 00:00:00 2001 From: Carlos Date: Tue, 4 Oct 2022 11:02:49 +0100 Subject: [PATCH 3/4] Fix: run python formatting in gha inside venv (#273) * fix(gha): run python formatting in gha inside venv * fix(gha): improve the defaults configuration in gha * fix(dist-tests): use venv in dist-tests' build internal --- .github/workflows/tests.yml | 27 +++++++++++---------------- dist-test/build_internal.sh | 3 +++ 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 78ab32090..2fd011be7 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,6 +9,7 @@ on: defaults: run: + working-directory: /code/faabric shell: bash jobs: @@ -17,9 +18,6 @@ jobs: runs-on: ubuntu-latest container: image: faasm/faabric:0.3.5 - defaults: - run: - working-directory: /code/faabric steps: - uses: faasm/conan-cache-action@v1 with: @@ -32,9 +30,6 @@ jobs: runs-on: ubuntu-latest container: image: faasm/faabric:0.3.5 - defaults: - run: - working-directory: /code/faabric steps: - name: "Fetch ref" run: git fetch origin ${GITHUB_REF}:ci-branch @@ -48,18 +43,21 @@ jobs: runs-on: ubuntu-latest container: image: faasm/faabric:0.3.5 - defaults: - run: - working-directory: /code/faabric steps: # --- Code update --- - name: "Fetch ref" run: git fetch origin ${GITHUB_REF}:ci-branch - name: "Check out branch" run: git checkout --force ci-branch + - name: "Update python deps" + run: | + source venv/bin/activate + pip3 install -r requirements.txt # --- Formatting checks --- - name: "Python formatting check" - run: ./bin/check_python.sh + run: | + source venv/bin/activate + ./bin/check_python.sh - name: "Run C/C++ formatting" run: ./bin/run_clang_format.sh - name: "Check C/C++ formatting changes" @@ -83,9 +81,6 @@ jobs: container: image: faasm/faabric:0.3.5 options: --privileged - defaults: - run: - working-directory: /code/faabric services: redis: image: redis @@ -109,6 +104,9 @@ jobs: runs-on: ubuntu-latest env: CONAN_CACHE_MOUNT_SOURCE: ~/.conan/ + defaults: + run: + working-directory: ${{ github.workspace }} steps: # --- Code update --- - name: "Check out code" @@ -132,9 +130,6 @@ jobs: REDIS_STATE_HOST: redis container: image: faasm/faabric:0.3.5 - defaults: - run: - working-directory: /code/faabric services: redis: image: redis diff --git a/dist-test/build_internal.sh b/dist-test/build_internal.sh index 8b335c11c..87a206bc3 100755 --- a/dist-test/build_internal.sh +++ b/dist-test/build_internal.sh @@ -5,6 +5,9 @@ set -e export PROJ_ROOT=$(dirname $(dirname $(readlink -f $0))) pushd ${PROJ_ROOT} >> /dev/null +# Activate the Python venv +source ./bin/workon.sh + # Run the debug build inv dev.cmake --build=Debug inv dev.cc faabric_dist_tests From d686b8eb4c70ea3fa20a32d6f314f17efa941b13 Mon Sep 17 00:00:00 2001 From: Raven Szewczyk Date: Wed, 12 Oct 2022 15:56:42 +0100 Subject: [PATCH 4/4] Asio+Beast-based endpoint server (#274) * Update dependency versions * boost::beast and asio-based asynchronous endpoint implementation * Address review comments * Remove Pistache as it is no longer used * Fix TSan-detected data races in distributed tests * Turn off MPI all-to-all disttests, see * Run clang-format-13 --- cmake/ExternalProjects.cmake | 46 +-- include/faabric/endpoint/FaabricEndpoint.h | 53 ++- .../faabric/endpoint/FaabricEndpointHandler.h | 26 +- include/faabric/endpoint/macros.h | 11 - include/faabric/scheduler/Scheduler.h | 44 ++- include/faabric/util/asio.h | 14 + src/endpoint/FaabricEndpoint.cpp | 312 ++++++++++++++---- src/endpoint/FaabricEndpointHandler.cpp | 139 ++++---- src/runner/FaabricMain.cpp | 7 - src/scheduler/Executor.cpp | 1 + src/scheduler/Scheduler.cpp | 131 +++++++- src/transport/MessageEndpoint.cpp | 7 +- tests/dist/mpi/functions.cpp | 4 +- tests/dist/mpi/mpi_native.h | 3 +- tests/dist/mpi/test_mpi_functions.cpp | 10 +- tests/test/endpoint/test_endpoint_api.cpp | 9 +- tests/test/endpoint/test_handler.cpp | 59 +++- thread-sanitizer-ignorelist.txt | 6 - 18 files changed, 653 insertions(+), 229 deletions(-) delete mode 100644 include/faabric/endpoint/macros.h create mode 100644 include/faabric/util/asio.h diff --git a/cmake/ExternalProjects.cmake b/cmake/ExternalProjects.cmake index 2988b5c98..08ab4549d 100644 --- a/cmake/ExternalProjects.cmake +++ b/cmake/ExternalProjects.cmake @@ -10,15 +10,14 @@ list(PREPEND CMAKE_PREFIX_PATH ${CMAKE_CURRENT_BINARY_DIR}) if(NOT EXISTS "${CMAKE_CURRENT_BINARY_DIR}/conan.cmake") message(STATUS "Downloading conan.cmake from https://github.com/conan-io/cmake-conan") - file(DOWNLOAD "https://raw.githubusercontent.com/conan-io/cmake-conan/v0.16.1/conan.cmake" + file(DOWNLOAD "https://raw.githubusercontent.com/conan-io/cmake-conan/0.18.1/conan.cmake" "${CMAKE_CURRENT_BINARY_DIR}/conan.cmake" - EXPECTED_HASH SHA256=396e16d0f5eabdc6a14afddbcfff62a54a7ee75c6da23f32f7a31bc85db23484 TLS_VERIFY ON) endif() include(${CMAKE_CURRENT_BINARY_DIR}/conan.cmake) -conan_check(VERSION 1.43.0 REQUIRED) +conan_check(VERSION 1.52.0 REQUIRED) # Enable revisions in the conan config execute_process(COMMAND ${CONAN_CMD} config set general.revisions_enabled=1 @@ -29,18 +28,18 @@ endif() conan_cmake_configure( REQUIRES - "boost/1.77.0@#d0be0b4b04a551f5d49ac540e59f51bd" - "catch2/2.13.7@#31c8cd08e3c957a9eac8cb1377cf5863" + "boost/1.80.0@#db5db5bd811d23b95089d4a95259d147" + "catch2/2.13.9@#8793d3e6287d3684201418de556d98fe" "cppcodec/0.2@#f6385611ce2f7cff954ac8b16e25c4fa" - "cpprestsdk/2.10.18@#36e30936126a3da485ce05d619fb1249" - "cppzmq/4.8.1@#e0f26b0614b3d812815edc102ce0d881" - "flatbuffers/2.0.0@#82f5d13594b370c3668bb8abccffc706" - "hiredis/1.0.2@#297f55bf1e66f8b9c1dc0e7d35e705ab" + "cpprestsdk/2.10.18@#ed9788e9d202d6eadd92581368ddfc2f" + "cppzmq/4.8.1@#010df8fa1c5ebbc615704e8c16693bac" + "flatbuffers/2.0.5@#c6a9508bd476da080f7aecbe7a094b68" + "hiredis/1.0.2@#370dad964286cadb1f15dc90252e8ef3" "protobuf/3.20.0@#8e4de7081bea093469c9e6076149b2b4" - "rapidjson/cci.20200410@#abe3eeacf36801901f6f6d82d124781a" - "readerwriterqueue/1.0.5@#4232c2ff826eb41e33d8ad8efd3c4c4c" - "spdlog/1.9.2@#3724602b7b7e843c5e0a687c45e279c9" - "zeromq/4.3.4@#3b9b0de9c4509784dc92629f3aaf2fe4" + "rapidjson/cci.20211112@#65b4e5feb6f1edfc8cbac0f669acaf17" + "readerwriterqueue/1.0.6@#a95c8da3d68822dec4d4c13fff4b5c96" + "spdlog/1.10.0@#6406c337028e15e56cd6a070cbac54c4" + "zeromq/4.3.4@#d4fe4001f6c2e5960e58c251687c5b2f" "zlib/1.2.12@#3b9e037ae1c615d045a06c67d88491ae" GENERATORS cmake_find_package @@ -79,10 +78,10 @@ conan_cmake_install(PATH_OR_REFERENCE . include(${CMAKE_CURRENT_BINARY_DIR}/conan_paths.cmake) -find_package(Boost 1.77.0 REQUIRED) +find_package(Boost 1.79.0 REQUIRED) find_package(Catch2 REQUIRED) -find_package(Flatbuffers REQUIRED) -find_package(Protobuf REQUIRED) +find_package(FlatBuffers REQUIRED) +find_package(Protobuf 3.20.0 REQUIRED) find_package(RapidJSON REQUIRED) find_package(ZLIB REQUIRED) find_package(ZeroMQ REQUIRED) @@ -94,15 +93,6 @@ find_package(hiredis REQUIRED) find_package(spdlog REQUIRED) find_package(readerwriterqueue REQUIRED) -# Pistache - Conan version is out of date and doesn't support clang -FetchContent_Declare(pistache_ext - GIT_REPOSITORY "https://github.com/pistacheio/pistache.git" - GIT_TAG "ff9db0d9439a4411b24541d97a937968f384a4d3" -) - -FetchContent_MakeAvailable(pistache_ext) -add_library(pistache::pistache ALIAS pistache_static) - # zstd (Conan version not customizable enough) set(ZSTD_BUILD_CONTRIB OFF CACHE INTERNAL "") set(ZSTD_BUILD_CONTRIB OFF CACHE INTERNAL "") @@ -120,13 +110,13 @@ set(ZSTD_LZ4_SUPPORT OFF CACHE INTERNAL "") FetchContent_Declare(zstd_ext GIT_REPOSITORY "https://github.com/facebook/zstd" - GIT_TAG "v1.5.0" + GIT_TAG "v1.5.2" SOURCE_SUBDIR "build/cmake" ) FetchContent_MakeAvailable(zstd_ext) # Work around zstd not declaring its targets properly -target_include_directories(libzstd_static INTERFACE $) +target_include_directories(libzstd_static SYSTEM INTERFACE $) add_library(zstd::libzstd_static ALIAS libzstd_static) # Group all external dependencies into a convenient virtual CMake library @@ -143,7 +133,6 @@ target_link_libraries(faabric_common_dependencies INTERFACE cppzmq::cppzmq flatbuffers::flatbuffers hiredis::hiredis - pistache::pistache protobuf::libprotobuf RapidJSON::RapidJSON readerwriterqueue::readerwriterqueue @@ -153,5 +142,6 @@ target_link_libraries(faabric_common_dependencies INTERFACE ) target_compile_definitions(faabric_common_dependencies INTERFACE FMT_DEPRECATED= # Suppress warnings about use of deprecated api by spdlog + BOOST_NO_TYPEID=1 # Prevent odd crashes within asio implementation ) add_library(faabric::common_dependencies ALIAS faabric_common_dependencies) diff --git a/include/faabric/endpoint/FaabricEndpoint.h b/include/faabric/endpoint/FaabricEndpoint.h index 59ea5b976..79cfef6ab 100644 --- a/include/faabric/endpoint/FaabricEndpoint.h +++ b/include/faabric/endpoint/FaabricEndpoint.h @@ -1,37 +1,66 @@ #pragma once -#include -#include +#include +#include +#include +#include #include namespace faabric::endpoint { -enum EndpointMode +enum class EndpointMode { SIGNAL, BG_THREAD }; +namespace detail { +struct EndpointState; +} + +struct HttpRequestContext +{ + asio::io_context& ioc; + asio::any_io_executor executor; + std::function sendFunction; +}; + +class HttpRequestHandler +{ + public: + virtual void onRequest(HttpRequestContext&& ctx, + faabric::util::BeastHttpRequest&& request) = 0; +}; + class FaabricEndpoint { public: FaabricEndpoint(); - FaabricEndpoint(int portIn, int threadCountIn); + FaabricEndpoint( + int port, + int threadCount, + std::shared_ptr requestHandlerIn = nullptr); - void start(EndpointMode mode); + FaabricEndpoint(const FaabricEndpoint&) = delete; - void stop(); + FaabricEndpoint(FaabricEndpoint&&) = delete; - private: - int port = faabric::util::getSystemConfig().endpointPort; - int threadCount = faabric::util::getSystemConfig().endpointNumThreads; + FaabricEndpoint& operator=(const FaabricEndpoint&) = delete; + + FaabricEndpoint& operator=(FaabricEndpoint&&) = delete; - Pistache::Http::Endpoint httpEndpoint; + virtual ~FaabricEndpoint(); - std::mutex mx; + void start(EndpointMode mode = EndpointMode::SIGNAL); - void runEndpoint(); + void stop(); + + private: + int port; + int threadCount; + std::unique_ptr state; + std::shared_ptr requestHandler; }; } diff --git a/include/faabric/endpoint/FaabricEndpointHandler.h b/include/faabric/endpoint/FaabricEndpointHandler.h index b7c25a1ce..0748acd21 100644 --- a/include/faabric/endpoint/FaabricEndpointHandler.h +++ b/include/faabric/endpoint/FaabricEndpointHandler.h @@ -1,23 +1,25 @@ #pragma once +#include #include -#include namespace faabric::endpoint { -class FaabricEndpointHandler : public Pistache::Http::Handler +class FaabricEndpointHandler final + : public HttpRequestHandler + , public std::enable_shared_from_this { public: - HTTP_PROTOTYPE(FaabricEndpointHandler) - - void onTimeout(const Pistache::Http::Request& request, - Pistache::Http::ResponseWriter writer) override; - - void onRequest(const Pistache::Http::Request& request, - Pistache::Http::ResponseWriter response) override; - - std::pair handleFunction(const std::string& requestStr); + void onRequest(HttpRequestContext&& ctx, + faabric::util::BeastHttpRequest&& request) override; private: - std::pair executeFunction(faabric::Message& msg); + void executeFunction(HttpRequestContext&& ctx, + faabric::util::BeastHttpResponse&& partialResponse, + std::shared_ptr ber, + size_t messageIndex); + + void onFunctionResult(HttpRequestContext&& ctx, + faabric::util::BeastHttpResponse&& partialResponse, + faabric::Message& msg); }; } diff --git a/include/faabric/endpoint/macros.h b/include/faabric/endpoint/macros.h deleted file mode 100644 index 6645d8e36..000000000 --- a/include/faabric/endpoint/macros.h +++ /dev/null @@ -1,11 +0,0 @@ -#include - -using namespace faabric::endpoint; - -void _entrypoint(int argc, char* argv[]); - -#define FAABRIC_HTTP_MAIN(port) \ - void main(int argc, char* argv[]) {} \ - \ - void _entrypoint(int argc, char* argv[]) \ - { diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 530353f69..f69583147 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -152,6 +153,40 @@ class FunctionMigrationThread : public faabric::util::PeriodicBackgroundThread void doWork() override; }; +/** + * A promise for a future message result with an associated eventfd for use with + * asio. + */ +class MessageLocalResult final +{ + public: + std::promise> promise; + int eventFd = -1; + + MessageLocalResult(); + + MessageLocalResult(const MessageLocalResult&) = delete; + + inline MessageLocalResult(MessageLocalResult&& other) + { + this->operator=(std::move(other)); + } + + MessageLocalResult& operator=(const MessageLocalResult&) = delete; + + inline MessageLocalResult& operator=(MessageLocalResult&& other) + { + this->promise = std::move(other.promise); + this->eventFd = other.eventFd; + other.eventFd = -1; + return *this; + } + + ~MessageLocalResult(); + + void setValue(std::unique_ptr&& msg); +}; + /** * Background thread that periodically checks to see if any executors have * become stale (i.e. not handled any requests in a given timeout). If any are @@ -214,6 +249,12 @@ class Scheduler faabric::Message getFunctionResult(unsigned int messageId, int timeout); + void getFunctionResultAsync(unsigned int messageId, + int timeoutMs, + asio::io_context& ioc, + asio::any_io_executor& executor, + std::function handler); + void setThreadResult(const faabric::Message& msg, int32_t returnValue, const std::string& key, @@ -332,8 +373,7 @@ class Scheduler std::unordered_map threadResultMessages; - std::unordered_map>> + std::unordered_map> localResults; std::unordered_map> pushedSnapshotsMap; diff --git a/include/faabric/util/asio.h b/include/faabric/util/asio.h new file mode 100644 index 000000000..9a2e404ec --- /dev/null +++ b/include/faabric/util/asio.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include +#include +#include + +namespace asio = boost::asio; +namespace beast = boost::beast; + +namespace faabric::util { +using BeastHttpRequest = beast::http::request; +using BeastHttpResponse = beast::http::response; +} diff --git a/src/endpoint/FaabricEndpoint.cpp b/src/endpoint/FaabricEndpoint.cpp index 64f7e56dd..f040db29c 100644 --- a/src/endpoint/FaabricEndpoint.cpp +++ b/src/endpoint/FaabricEndpoint.cpp @@ -1,92 +1,284 @@ #include #include -#include -#include -#include -#include +#include +#include +#include #include +#include +#include +#include + +// Closely follows the example async Beast HTTP server from +// https://www.boost.org/doc/libs/1_80_0/libs/beast/example/http/server/async/http_server_async.cpp namespace faabric::endpoint { -FaabricEndpoint::FaabricEndpoint() - : FaabricEndpoint(faabric::util::getSystemConfig().endpointPort, - faabric::util::getSystemConfig().endpointNumThreads) -{} +namespace detail { +class EndpointState +{ + public: + EndpointState(int threadCountIn) + : ioc(threadCountIn) + {} -FaabricEndpoint::FaabricEndpoint(int portIn, int threadCountIn) - : port(portIn) - , threadCount(threadCountIn) - , httpEndpoint( - Pistache::Address(Pistache::Ipv4::any(), Pistache::Port(portIn))) -{} + asio::io_context ioc; + std::vector ioThreads; +}; +} -void FaabricEndpoint::start(EndpointMode mode) +namespace { +/** + * Wrapper around the Boost::Beast HTTP parser to keep the connection state + * alive in-between asynchronous calls. + */ +class HttpConnection : public std::enable_shared_from_this { - sigset_t signals; + private: + asio::io_context& ioc; + beast::tcp_stream stream; + beast::flat_buffer buffer; + beast::http::request_parser parser; + std::shared_ptr handler; - if (mode == EndpointMode::SIGNAL) { - SPDLOG_INFO( - "Starting blocking endpoint on {}, {} threads", port, threadCount); - - // Set up signal handler - if (sigemptyset(&signals) != 0 || sigaddset(&signals, SIGTERM) != 0 || - sigaddset(&signals, SIGKILL) != 0 || - sigaddset(&signals, SIGINT) != 0 || - sigaddset(&signals, SIGHUP) != 0 || - sigaddset(&signals, SIGQUIT) != 0 || - pthread_sigmask(SIG_BLOCK, &signals, nullptr) != 0) { - - throw std::runtime_error("Install signal handler failed"); + public: + HttpConnection(asio::io_context& iocIn, + asio::ip::tcp::socket&& socket, + std::shared_ptr handlerIn) + : ioc(iocIn) + , stream(std::move(socket)) + , buffer() + , parser() + , handler(handlerIn) + {} + + void run() + { + asio::dispatch( + stream.get_executor(), + std::bind_front(&HttpConnection::doRead, this->shared_from_this())); + } + + private: + void doRead() + { + parser.body_limit(boost::none); + faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); + stream.expires_after(std::chrono::seconds(conf.globalMessageTimeout)); + beast::http::async_read( + stream, + buffer, + parser, + std::bind_front(&HttpConnection::onRead, this->shared_from_this())); + } + + void handleRequest(faabric::util::BeastHttpRequest msg) + { + HttpRequestContext hrc{ ioc, + stream.get_executor(), + std::bind_front(&HttpConnection::sendResponse, + this->shared_from_this()) }; + handler->onRequest(std::move(hrc), std::move(msg)); + } + + void onRead(beast::error_code ec, size_t bytesTransferred) + { + UNUSED(bytesTransferred); + if (ec == beast::http::error::end_of_stream) { + doClose(); + return; + } + if (ec) { + SPDLOG_ERROR("Error reading an HTTP request: {}", ec.message()); + return; + } + SPDLOG_TRACE("Read HTTP request of {} bytes", bytesTransferred); + handleRequest(parser.release()); + } + + void sendResponse(faabric::util::BeastHttpResponse&& response) + { + // Response needs to be freed after the send completes + auto ownedResponse = std::make_shared( + std::move(response)); + ownedResponse->prepare_payload(); + beast::http::async_write(stream, + *ownedResponse, + std::bind_front(&HttpConnection::onWrite, + this->shared_from_this(), + ownedResponse)); + } + + void onWrite(std::shared_ptr response, + beast::error_code ec, + size_t bytesTransferred) + { + bool needsEof = response->need_eof(); + response.reset(); + UNUSED(bytesTransferred); + if (ec) { + SPDLOG_ERROR("Couldn't write HTTP response: {}", ec.message()); + return; + } + SPDLOG_TRACE("Write HTTP response of {} bytes", bytesTransferred); + if (needsEof) { + doClose(); + return; + } + // Reset parser to a fresh object, it has no copy/move assignment + parser.~parser(); + new (&parser) decltype(parser)(); + doRead(); + } + + void doClose() + { + beast::error_code ec; + stream.socket().shutdown(asio::socket_base::shutdown_send, ec); + // Ignore errors on connection closing + } +}; + +/** + * Keeps the TCP connection alive inside of the Asio asynchronous executor and + * creates HttpConnection objects for incoming connections. + */ +class EndpointListener : public std::enable_shared_from_this +{ + private: + asio::io_context& ioc; + asio::ip::tcp::acceptor acceptor; + std::shared_ptr handler; + + public: + EndpointListener(asio::io_context& iocIn, + asio::ip::tcp::endpoint endpoint, + std::shared_ptr handlerIn) + : ioc(iocIn) + , acceptor(asio::make_strand(iocIn)) + , handler(handlerIn) + { + try { + acceptor.open(endpoint.protocol()); + acceptor.set_option(asio::socket_base::reuse_address(true)); + acceptor.bind(endpoint); + acceptor.listen(asio::socket_base::max_listen_connections); + } catch (std::runtime_error& e) { + SPDLOG_CRITICAL( + "Couldn't listen on port {}: {}", endpoint.port(), e.what()); + throw; } + } + + void run() + { + asio::dispatch(acceptor.get_executor(), + std::bind_front(&EndpointListener::doAccept, + this->shared_from_this())); + } - // Start the endpoint - runEndpoint(); + private: + void doAccept() + { + // create a new strand (forces all related tasks to happen on one + // thread) + acceptor.async_accept(asio::make_strand(ioc), + std::bind_front(&EndpointListener::handleAccept, + this->shared_from_this())); + } - // Wait for a signal - SPDLOG_INFO("Awaiting signal"); - int signal = 0; - int status = sigwait(&signals, &signal); - if (status == 0) { - SPDLOG_INFO("Received signal: {}", signal); + void handleAccept(beast::error_code ec, asio::ip::tcp::socket socket) + { + SPDLOG_TRACE("handleAccept"); + if (ec) { + SPDLOG_ERROR("Failed accept(): {}", ec.message()); } else { - SPDLOG_INFO("Sigwait return value: {}", signal); + std::make_shared(ioc, std::move(socket), handler) + ->run(); } + doAccept(); + } +}; +} - faabric::util::UniqueLock lock(mx); - httpEndpoint.shutdown(); - } else if (mode == EndpointMode::BG_THREAD) { - SPDLOG_INFO( - "Starting background endpoint on {}, {} threads", port, threadCount); +FaabricEndpoint::FaabricEndpoint() + : FaabricEndpoint(faabric::util::getSystemConfig().endpointPort, + faabric::util::getSystemConfig().endpointNumThreads, + nullptr) +{} - runEndpoint(); - } else { - SPDLOG_ERROR("Unrecognised endpoint mode: {}", mode); - throw std::runtime_error("Unrecognised endpoint mode"); +FaabricEndpoint::FaabricEndpoint( + int portIn, + int threadCountIn, + std::shared_ptr requestHandlerIn) + : port(portIn) + , threadCount(threadCountIn) + , state(nullptr) + , requestHandler(requestHandlerIn) +{ + if (!requestHandler) { + requestHandler = + std::make_shared(); } } -void FaabricEndpoint::runEndpoint() +FaabricEndpoint::~FaabricEndpoint() { - faabric::util::UniqueLock lock(mx); + this->stop(); +} + +void FaabricEndpoint::start(EndpointMode mode) +{ + SPDLOG_INFO("Starting HTTP endpoint on {}, {} threads", port, threadCount); + + this->state = std::make_unique(this->threadCount); - auto opts = Pistache::Http::Endpoint::options() - .threads(threadCount) - .backlog(256) - .flags(Pistache::Tcp::Options::ReuseAddr); + const auto address = asio::ip::make_address_v4("0.0.0.0"); + const auto port = static_cast(this->port); - httpEndpoint.init(opts); + // Create a TCP listener and transfer its (shared) ownership to the Asio + // event queue + std::make_shared(state->ioc, + asio::ip::tcp::endpoint{ address, port }, + this->requestHandler) + ->run(); - // Configure and start endpoint in background - httpEndpoint.setHandler( - Pistache::Http::make_handler()); - httpEndpoint.serveThreaded(); + // Create a simple Asio task that awaits incoming signals and tells the + // event queue to shut down (bringing down all the previously created tasks + // with it) when they are received. + std::optional signals; + if (mode == EndpointMode::SIGNAL) { + signals.emplace(state->ioc, SIGINT, SIGTERM, SIGQUIT); + signals->async_wait([&](beast::error_code const& ec, int sig) { + if (!ec) { + SPDLOG_INFO("Received signal: {}", sig); + state->ioc.stop(); + } + }); + } + + // Make sure the total number of worker threads is this->threadCount, when + // in SIGNAL mode the main thread is also used as a worker thread. + int extraThreads = (mode == EndpointMode::SIGNAL) + ? std::max(0, this->threadCount - 1) + : std::max(1, this->threadCount); + state->ioThreads.reserve(extraThreads); + auto ioc_run = [&ioc{ state->ioc }]() { ioc.run(); }; + for (int i = 0; i < extraThreads; i++) { + state->ioThreads.emplace_back(ioc_run); + } + if (mode == EndpointMode::SIGNAL) { + ioc_run(); + } } void FaabricEndpoint::stop() { SPDLOG_INFO("Shutting down endpoint on {}", port); - faabric::util::UniqueLock lock(mx); - httpEndpoint.shutdown(); + state->ioc.stop(); + for (auto& thread : state->ioThreads) { + thread.join(); + } + state->ioThreads.clear(); } } diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp index ae064bf7e..62aa3983a 100644 --- a/src/endpoint/FaabricEndpointHandler.cpp +++ b/src/endpoint/FaabricEndpointHandler.cpp @@ -9,52 +9,42 @@ #include namespace faabric::endpoint { -void FaabricEndpointHandler::onTimeout(const Pistache::Http::Request& request, - Pistache::Http::ResponseWriter writer) -{ - writer.send(Pistache::Http::Code::No_Content); -} -void FaabricEndpointHandler::onRequest(const Pistache::Http::Request& request, - Pistache::Http::ResponseWriter response) +using header = beast::http::field; + +void FaabricEndpointHandler::onRequest( + HttpRequestContext&& ctx, + faabric::util::BeastHttpRequest&& request) { - SPDLOG_TRACE("Handler received request: {}", request.body()); + SPDLOG_TRACE("Faabric handler received request"); // Very permissive CORS - response.headers().add( - "*"); - response.headers().add( - "GET,POST,PUT,OPTIONS"); - response.headers().add( - "User-Agent,Content-Type"); + faabric::util::BeastHttpResponse response; + response.keep_alive(request.keep_alive()); + response.set(header::server, "Faabric endpoint"); + response.set(header::access_control_allow_origin, "*"); + response.set(header::access_control_allow_methods, "GET,POST,PUT,OPTIONS"); + response.set(header::access_control_allow_headers, + "User-Agent,Content-Type"); // Text response type - response.headers().add( - Pistache::Http::Mime::MediaType("text/plain")); + response.set(header::content_type, "text/plain"); - // Set response timeout - faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); - response.timeoutAfter(std::chrono::milliseconds(conf.globalMessageTimeout)); + PROF_START(endpointRoundTrip) // Parse message from JSON in request - std::pair result = handleFunction(request.body()); - - Pistache::Http::Code responseCode = Pistache::Http::Code::Ok; - if (result.first > 0) { - responseCode = Pistache::Http::Code::Internal_Server_Error; - } - response.send(responseCode, result.second); -} + const std::string& requestStr = request.body(); -std::pair FaabricEndpointHandler::handleFunction( - const std::string& requestStr) -{ - std::pair response; + // Handle JSON if (requestStr.empty()) { SPDLOG_ERROR("Faabric handler received empty request"); - response = std::make_pair(1, "Empty request"); + response.result(beast::http::status::bad_request); + response.body() = std::string("Empty request"); } else { - faabric::Message msg = faabric::util::jsonToMessage(requestStr); + auto req = faabric::util::batchExecFactory(); + req->set_type(req->FUNCTIONS); + faabric::Message& msg = *req->add_messages(); + msg = faabric::util::jsonToMessage(requestStr); faabric::scheduler::Scheduler& sched = faabric::scheduler::getScheduler(); @@ -64,43 +54,57 @@ std::pair FaabricEndpointHandler::handleFunction( sched.getFunctionResult(msg.id(), 0); if (result.type() == faabric::Message_MessageType_EMPTY) { - response = std::make_pair(0, "RUNNING"); + response.result(beast::http::status::ok); + response.body() = std::string("RUNNING"); } else if (result.returnvalue() == 0) { - response = - std::make_pair(0, faabric::util::messageToJson(result)); + response.result(beast::http::status::ok); + response.body() = faabric::util::messageToJson(result); } else { - response = std::make_pair(1, "FAILED: " + result.outputdata()); + response.result(beast::http::status::internal_server_error); + response.body() = "FAILED: " + result.outputdata(); } } else if (msg.isexecgraphrequest()) { SPDLOG_DEBUG("Processing execution graph request"); faabric::scheduler::ExecGraph execGraph = sched.getFunctionExecGraph(msg.id()); - response = - std::make_pair(0, faabric::scheduler::execGraphToJson(execGraph)); + response.result(beast::http::status::ok); + response.body() = faabric::scheduler::execGraphToJson(execGraph); } else if (msg.type() == faabric::Message_MessageType_FLUSH) { SPDLOG_DEBUG("Broadcasting flush request"); sched.broadcastFlush(); - response = std::make_pair(0, "Flush sent"); + response.result(beast::http::status::ok); + response.body() = std::string("Flush sent"); } else { - response = executeFunction(msg); + executeFunction( + std::move(ctx), std::move(response), std::move(req), 0); + return; } } - return response; + PROF_END(endpointRoundTrip) + ctx.sendFunction(std::move(response)); } -std::pair FaabricEndpointHandler::executeFunction( - faabric::Message& msg) +void FaabricEndpointHandler::executeFunction( + HttpRequestContext&& ctx, + faabric::util::BeastHttpResponse&& response, + std::shared_ptr ber, + size_t messageIndex) { faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); + faabric::Message& msg = *ber->mutable_messages(messageIndex); if (msg.user().empty()) { - return std::make_pair(1, "Empty user"); + response.result(beast::http::status::bad_request); + response.body() = std::string("Empty user"); + return ctx.sendFunction(std::move(response)); } if (msg.function().empty()) { - return std::make_pair(1, "Empty function"); + response.result(beast::http::status::bad_request); + response.body() = std::string("Empty function"); + return ctx.sendFunction(std::move(response)); } // Set message ID and master host @@ -113,29 +117,48 @@ std::pair FaabricEndpointHandler::executeFunction( msg.set_executeslocally(true); } - auto tid = (pid_t)syscall(SYS_gettid); + auto tid = gettid(); const std::string funcStr = faabric::util::funcToString(msg, true); SPDLOG_DEBUG("Worker HTTP thread {} scheduling {}", tid, funcStr); // Schedule it faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); - sch.callFunction(msg); + sch.callFunctions(ber); // Await result on global bus (may have been executed on a different worker) if (msg.isasync()) { - return std::make_pair(0, faabric::util::buildAsyncResponse(msg)); + response.result(beast::http::status::ok); + response.body() = faabric::util::buildAsyncResponse(msg); + return ctx.sendFunction(std::move(response)); } SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr); + sch.getFunctionResultAsync( + msg.id(), + conf.globalMessageTimeout, + ctx.ioc, + ctx.executor, + beast::bind_front_handler(&FaabricEndpointHandler::onFunctionResult, + this->shared_from_this(), + std::move(ctx), + std::move(response))); +} - try { - const faabric::Message result = - sch.getFunctionResult(msg.id(), conf.globalMessageTimeout); - SPDLOG_DEBUG("Worker thread {} result {}", tid, funcStr); - - return std::make_pair(result.returnvalue(), result.outputdata() + "\n"); - } catch (faabric::redis::RedisNoResponseException& ex) { - return std::make_pair(1, "No response from function\n"); - } +void FaabricEndpointHandler::onFunctionResult( + HttpRequestContext&& ctx, + faabric::util::BeastHttpResponse&& response, + faabric::Message& result) +{ + beast::http::status statusCode = + (result.returnvalue() == 0) ? beast::http::status::ok + : beast::http::status::internal_server_error; + response.result(statusCode); + SPDLOG_DEBUG("Worker thread {} result {}", + gettid(), + faabric::util::funcToString(result, true)); + + response.body() = result.outputdata(); + return ctx.sendFunction(std::move(response)); } + } diff --git a/src/runner/FaabricMain.cpp b/src/runner/FaabricMain.cpp index e66152460..553d45b10 100644 --- a/src/runner/FaabricMain.cpp +++ b/src/runner/FaabricMain.cpp @@ -5,13 +5,6 @@ #include #include -#include -#include -#include -#include -#include -#include - namespace faabric::runner { FaabricMain::FaabricMain( std::shared_ptr execFactory) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index e7be61ec0..fe027b3ff 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -93,6 +93,7 @@ void Executor::shutdown() // Wait for thread to terminate if (threadPoolThreads[i]->joinable()) { + threadPoolThreads[i]->request_stop(); threadPoolThreads[i]->join(); } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 4b8041f2a..9aedb4ae3 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -19,6 +19,10 @@ #include #include +#include +#include +#include + #include #define FLUSH_TIMEOUT_MS 10000 @@ -42,6 +46,24 @@ static thread_local std::unordered_map snapshotClients; +MessageLocalResult::MessageLocalResult() +{ + eventFd = eventfd(0, EFD_CLOEXEC); +} + +MessageLocalResult::~MessageLocalResult() +{ + if (eventFd >= 0) { + close(eventFd); + } +} + +void MessageLocalResult::setValue(std::unique_ptr&& msg) +{ + this->promise.set_value(std::move(msg)); + eventfd_write(this->eventFd, (eventfd_t)1); +} + Scheduler& getScheduler() { static Scheduler sch; @@ -833,8 +855,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions( localResultsMutex); localResults.insert( { localMsg.id(), - std::promise< - std::unique_ptr>() }); + std::make_shared() }); } std::shared_ptr e = claimExecutor(localMsg, lock); @@ -1093,7 +1114,7 @@ void Scheduler::setFunctionResult(faabric::Message& msg) auto it = localResults.find(msg.id()); if (it != localResults.end()) { - it->second.set_value(std::make_unique(msg)); + it->second->setValue(std::make_unique(msg)); } // Sync messages can't have their results read twice, so skip @@ -1260,7 +1281,7 @@ faabric::Message Scheduler::getFunctionResult(unsigned int messageId, if (it == localResults.end()) { break; // fallback to redis } - fut = it->second.get_future(); + fut = it->second->promise.get_future(); } if (!isBlocking) { auto status = fut.wait_for(std::chrono::milliseconds(timeoutMs)); @@ -1314,6 +1335,108 @@ faabric::Message Scheduler::getFunctionResult(unsigned int messageId, return msgResult; } +void Scheduler::getFunctionResultAsync( + unsigned int messageId, + int timeoutMs, + asio::io_context& ioc, + asio::any_io_executor& executor, + std::function handler) +{ + if (messageId == 0) { + throw std::runtime_error("Must provide non-zero message ID"); + } + + do { + std::shared_ptr mlr; + // Try to find matching local promise + { + faabric::util::UniqueLock resultsLock(localResultsMutex); + auto it = localResults.find(messageId); + if (it == localResults.end()) { + break; // Fallback to redis + } + mlr = it->second; + } + // Asio wrapper for the MLR eventfd + class MlrAwaiter : public std::enable_shared_from_this + { + public: + unsigned int messageId; + Scheduler* sched; + std::shared_ptr mlr; + asio::posix::stream_descriptor dsc; + std::function handler; + + MlrAwaiter(unsigned int messageId, + Scheduler* sched, + std::shared_ptr mlr, + asio::posix::stream_descriptor dsc, + std::function handler) + : messageId(messageId) + , sched(sched) + , mlr(std::move(mlr)) + , dsc(std::move(dsc)) + , handler(handler) + {} + + ~MlrAwaiter() + { + // Ensure that Asio doesn't close the eventfd, to prevent a + // double-close in the MLR destructor + dsc.release(); + } + + void await(const boost::system::error_code& ec) + { + if (!ec) { + auto msg = mlr->promise.get_future().get(); + handler(*msg); + { + faabric::util::UniqueLock resultsLock( + sched->localResultsMutex); + sched->localResults.erase(messageId); + } + } else { + // The waiting task can spuriously wake up, requeue if this + // happens + doAwait(); + } + } + + // Schedule this task waiting on the eventfd in the Asio queue + void doAwait() + { + dsc.async_wait(asio::posix::stream_descriptor::wait_read, + beast::bind_front_handler( + &MlrAwaiter::await, this->shared_from_this())); + } + }; + auto awaiter = std::make_shared( + messageId, + this, + mlr, + asio::posix::stream_descriptor(ioc, mlr->eventFd), + std::move(handler)); + awaiter->doAwait(); + return; + } while (0); + + // TODO: Use a non-blocking redis API here to avoid stalling the async + // worker thread + redis::Redis& redis = redis::Redis::getQueue(); + + std::string resultKey = faabric::util::resultKeyFromMessageId(messageId); + + faabric::Message msgResult; + + // Blocking version will throw an exception when timing out + // which is handled by the caller. + std::vector result = redis.dequeueBytes(resultKey, timeoutMs); + msgResult.ParseFromArray(result.data(), (int)result.size()); + + handler(msgResult); +} + faabric::HostResources Scheduler::getThisHostResources() { faabric::util::SharedLock lock(mx); diff --git a/src/transport/MessageEndpoint.cpp b/src/transport/MessageEndpoint.cpp index 77c980340..f3f82c155 100644 --- a/src/transport/MessageEndpoint.cpp +++ b/src/transport/MessageEndpoint.cpp @@ -122,8 +122,9 @@ zmq::socket_t socketFactory(zmq::socket_type socketType, break; } default: { - SPDLOG_ERROR( - "Invalid bind socket type {} ({})", socketType, address); + SPDLOG_ERROR("Invalid bind socket type {} ({})", + static_cast(socketType), + address); throw std::runtime_error( "Binding with invalid socket type"); } @@ -176,7 +177,7 @@ zmq::socket_t socketFactory(zmq::socket_type socketType, } default: { SPDLOG_ERROR("Invalid connect socket type {} ({})", - socketType, + static_cast(socketType), address); throw std::runtime_error( "Connecting with unrecognized socket type"); diff --git a/tests/dist/mpi/functions.cpp b/tests/dist/mpi/functions.cpp index 0572b3114..e856d0d9d 100644 --- a/tests/dist/mpi/functions.cpp +++ b/tests/dist/mpi/functions.cpp @@ -11,7 +11,7 @@ using namespace tests::mpi; namespace tests { -faabric::Message* tests::mpi::executingCall; +std::atomic tests::mpi::executingCall; int handleMpiAllGather(tests::DistTestExecutor* exec, int threadPoolIdx, @@ -140,7 +140,7 @@ int handleMpiMigration(tests::DistTestExecutor* exec, { executingCall = &req->mutable_messages()->at(msgIdx); - return migration(std::stoi(executingCall->inputdata())); + return migration(std::stoi(executingCall.load()->inputdata())); } int handleMpiOrder(tests::DistTestExecutor* exec, diff --git a/tests/dist/mpi/mpi_native.h b/tests/dist/mpi/mpi_native.h index 03d04f858..1d3c70b21 100644 --- a/tests/dist/mpi/mpi_native.h +++ b/tests/dist/mpi/mpi_native.h @@ -1,5 +1,6 @@ #pragma once +#include #include #define NUM_MIGRATION_LOOPS 10000 @@ -7,7 +8,7 @@ namespace tests::mpi { -extern faabric::Message* executingCall; +extern std::atomic executingCall; // --- List of MPI functions --- diff --git a/tests/dist/mpi/test_mpi_functions.cpp b/tests/dist/mpi/test_mpi_functions.cpp index df7d34190..eb109f08c 100644 --- a/tests/dist/mpi/test_mpi_functions.cpp +++ b/tests/dist/mpi/test_mpi_functions.cpp @@ -33,7 +33,7 @@ TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI all reduce", "[mpi]") checkAllocationAndResult(req); } -TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI all to all", "[mpi]") +TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI all to all", "[.][mpi]") { // Set up this host's resources setLocalSlots(nLocalSlots); @@ -45,7 +45,9 @@ TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI all to all", "[mpi]") checkAllocationAndResult(req); } -TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI all to all many times", "[mpi]") +TEST_CASE_METHOD(MpiDistTestsFixture, + "Test MPI all to all many times", + "[.][mpi]") { int numRuns = 50; int oldNumLocalSlots = nLocalSlots; @@ -66,7 +68,9 @@ TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI all to all many times", "[mpi]") nLocalSlots = oldNumLocalSlots; } -TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI all to all and sleep", "[mpi]") +TEST_CASE_METHOD(MpiDistTestsFixture, + "Test MPI all to all and sleep", + "[.][mpi]") { // Set up this host's resources setLocalSlots(nLocalSlots); diff --git a/tests/test/endpoint/test_endpoint_api.cpp b/tests/test/endpoint/test_endpoint_api.cpp index 57174fbef..cc1bb8aab 100644 --- a/tests/test/endpoint/test_endpoint_api.cpp +++ b/tests/test/endpoint/test_endpoint_api.cpp @@ -9,7 +9,6 @@ #include #include -using namespace Pistache; using namespace faabric::scheduler; namespace tests { @@ -103,7 +102,7 @@ TEST_CASE_METHOD(EndpointApiTestFixture, SECTION("Empty request") { - expectedReturnCode = 500; + expectedReturnCode = 400; expectedResponseBody = "Empty request"; } @@ -113,7 +112,7 @@ TEST_CASE_METHOD(EndpointApiTestFixture, body = faabric::util::messageToJson(msg); expectedReturnCode = 200; expectedResponseBody = - fmt::format("Endpoint API test executed {}\n", msg.id()); + fmt::format("Endpoint API test executed {}", msg.id()); } SECTION("Error request") @@ -122,7 +121,7 @@ TEST_CASE_METHOD(EndpointApiTestFixture, body = faabric::util::messageToJson(msg); expectedReturnCode = 500; expectedResponseBody = - fmt::format("Endpoint API returning 1 for {}\n", msg.id()); + fmt::format("Endpoint API returning 1 for {}", msg.id()); } SECTION("Invalid function") @@ -131,7 +130,7 @@ TEST_CASE_METHOD(EndpointApiTestFixture, body = faabric::util::messageToJson(msg); expectedReturnCode = 500; expectedResponseBody = fmt::format( - "Task {} threw exception. What: Endpoint API error\n", msg.id()); + "Task {} threw exception. What: Endpoint API error", msg.id()); } std::pair result = postToUrl(LOCALHOST, port, body); diff --git a/tests/test/endpoint/test_handler.cpp b/tests/test/endpoint/test_handler.cpp index 68dcc4b6a..659a0a4e3 100644 --- a/tests/test/endpoint/test_handler.cpp +++ b/tests/test/endpoint/test_handler.cpp @@ -9,8 +9,6 @@ #include #include -using namespace Pistache; - namespace tests { class EndpointHandlerTestFixture : public SchedulerTestFixture @@ -29,6 +27,29 @@ class EndpointHandlerTestFixture : public SchedulerTestFixture std::shared_ptr executorFactory; }; +// Taking in a shared_ptr by reference to ensure the handler was constructed +// with std::make_shared +std::pair synchronouslyHandleFunction( + std::shared_ptr& handler, + std::string requestStr) +{ + asio::io_context ioc(1); + asio::strand strand = asio::make_strand(ioc); + faabric::util::BeastHttpResponse response; + faabric::util::BeastHttpRequest req(beast::http::verb::get, "/", 10); + req.body() = requestStr; + faabric::endpoint::HttpRequestContext ctx{ + ioc, + strand, + [&](faabric::util::BeastHttpResponse&& resp) { + response = std::move(resp); + } + }; + handler->onRequest(std::move(ctx), std::move(req)); + ioc.run(); + return std::make_pair(response.result_int(), response.body()); +} + TEST_CASE_METHOD(EndpointHandlerTestFixture, "Test valid calls to endpoint", "[endpoint]") @@ -53,10 +74,12 @@ TEST_CASE_METHOD(EndpointHandlerTestFixture, const std::string& requestStr = faabric::util::messageToJson(call); // Handle the function - endpoint::FaabricEndpointHandler handler; - std::pair response = handler.handleFunction(requestStr); + std::shared_ptr handler = + std::make_shared(); + std::pair response = + synchronouslyHandleFunction(handler, requestStr); - REQUIRE(response.first == 0); + REQUIRE(response.first == 200); std::string responseStr = response.second; // Check actual call has right details including the ID returned to the @@ -75,10 +98,12 @@ TEST_CASE_METHOD(EndpointHandlerTestFixture, TEST_CASE("Test empty invocation", "[endpoint]") { - endpoint::FaabricEndpointHandler handler; - std::pair actual = handler.handleFunction(""); + std::shared_ptr handler = + std::make_shared(); + std::pair actual = + synchronouslyHandleFunction(handler, ""); - REQUIRE(actual.first == 1); + REQUIRE(actual.first == 400); REQUIRE(actual.second == "Empty request"); } @@ -101,11 +126,13 @@ TEST_CASE("Test empty JSON invocation", "[endpoint]") call.set_user("demo"); } - endpoint::FaabricEndpointHandler handler; + std::shared_ptr handler = + std::make_shared(); const std::string& requestStr = faabric::util::messageToJson(call); - std::pair actual = handler.handleFunction(requestStr); + std::pair actual = + synchronouslyHandleFunction(handler, requestStr); - REQUIRE(actual.first == 1); + REQUIRE(actual.first == 400); REQUIRE(actual.second == expected); } @@ -116,7 +143,7 @@ TEST_CASE_METHOD(EndpointHandlerTestFixture, // Create a message faabric::Message msg = faabric::util::messageFactory("demo", "echo"); - int expectedReturnCode = 0; + int expectedReturnCode = 200; std::string expectedOutput; SECTION("Running") { expectedOutput = "RUNNING"; } @@ -128,7 +155,7 @@ TEST_CASE_METHOD(EndpointHandlerTestFixture, msg.set_returnvalue(1); sch.setFunctionResult(msg); - expectedReturnCode = 1; + expectedReturnCode = 500; expectedOutput = "FAILED: " + errorMsg; } @@ -145,9 +172,11 @@ TEST_CASE_METHOD(EndpointHandlerTestFixture, msg.set_isstatusrequest(true); - endpoint::FaabricEndpointHandler handler; + std::shared_ptr handler = + std::make_shared(); const std::string& requestStr = faabric::util::messageToJson(msg); - std::pair actual = handler.handleFunction(requestStr); + std::pair actual = + synchronouslyHandleFunction(handler, requestStr); REQUIRE(actual.first == expectedReturnCode); REQUIRE(actual.second == expectedOutput); diff --git a/thread-sanitizer-ignorelist.txt b/thread-sanitizer-ignorelist.txt index c19ca166d..0f5df4934 100644 --- a/thread-sanitizer-ignorelist.txt +++ b/thread-sanitizer-ignorelist.txt @@ -1,15 +1,9 @@ -# TSan detects a race on pistache shutdown -race:*Pistache* -race:close -race:socket # Ignore ZeroMQ races race:zmq::* # Config only changes in tests, and in places where being slightly racy doesn't matter race:faabric::util::SystemConfig::* # Catch2 allocates in its signal handler, this prevents showing the wrong crash report signal:* -# TODO: moodycamel's queue version 1.0.6 fixes the warnings we silence here -race:moodycamel::* # TODO: Remove: There's something weird going on with MPI code I don't understand race:faabric::scheduler::MpiWorld::*