Skip to content

Commit

Permalink
E2E tests for SQS splitting (#2554)
Browse files Browse the repository at this point in the history
* idk

* changelog

* test resources for sqs

* idk

* patch operator to use localstack

* test can use localstack

* send messages, expect output

* verifications

* fixes

* provide credentials for sqs client

* words

* resources for manual testing

* fix broken configs

* prints

* implement echo queue verification

* don't verify queue deletion, remove done TODOs

* passing locally

* refactoring and docs

* waits and sleeps

* trim localstack host

* remove commented out verification

* fmt

* fix docs

* thanks clippy

* use image after CR - with release build

* used released test image

* increase test timeout

* fix image name
  • Loading branch information
t4lz authored Sep 19, 2024
1 parent 9cc8102 commit 7b8dae6
Show file tree
Hide file tree
Showing 20 changed files with 1,695 additions and 39 deletions.
382 changes: 382 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"tests/rust-bypassed-unix-socket",
"tests/issue1317",
"tests/rust-websockets",
"tests/rust-sqs-printer",
]
resolver = "2"

Expand Down
1 change: 1 addition & 0 deletions changelog.d/+sqs-tests.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
E2E tests for SQS splitting.
3 changes: 1 addition & 2 deletions mirrord/layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ use mirrord_protocol::{EnvVars, GetEnvVarsRequest};
use proxy_connection::ProxyConnection;
use setup::LayerSetup;
use socket::SOCKETS;
use tracing::Level;
use tracing_subscriber::{fmt::format::FmtSpan, prelude::*};

use crate::{
Expand Down Expand Up @@ -488,7 +487,7 @@ fn sip_only_layer_start(mut config: LayerConfig, patch_binaries: Vec<String>) {
/// - `enabled_remote_dns`: replaces [`libc::getaddrinfo`] and [`libc::freeaddrinfo`] when this is
/// `true`, see [`NetworkConfig`](mirrord_config::feature::network::NetworkConfig), and
/// [`hooks::enable_socket_hooks`](socket::hooks::enable_socket_hooks).
#[mirrord_layer_macro::instrument(level = Level::TRACE)]
#[mirrord_layer_macro::instrument(level = tracing::Level::TRACE)]
fn enable_hooks(state: &LayerSetup) {
let enabled_file_ops = state.fs_config().is_active();
let enabled_remote_dns = state.remote_dns_enabled();
Expand Down
2 changes: 1 addition & 1 deletion mirrord/operator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ impl OperatorApi<PreparedClientCert> {

if config.feature.copy_target.enabled.not() {
if is_empty_deployment.not() {
copy_subtask.info("Creating a copy-target for queue-splitting (even thought copy_target was not explicitly set).")
copy_subtask.info("Creating a copy-target for queue-splitting (even though copy_target was not explicitly set).")
} else {
copy_subtask.info("Creating a copy-target for deployment (even thought copy_target was not explicitly set).")
}
Expand Down
2 changes: 1 addition & 1 deletion mirrord/operator/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use thiserror::Error;

use crate::crd::{MirrordPolicy, MirrordSqsSession, MirrordWorkloadQueueRegistry, TargetCrd};

static OPERATOR_NAME: &str = "mirrord-operator";
pub static OPERATOR_NAME: &str = "mirrord-operator";
/// 443 is standard port for APIService, do not change this value
/// (will require users to add FW rules)
static OPERATOR_PORT: i32 = 443;
Expand Down
10 changes: 8 additions & 2 deletions tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@ workspace = true
doctest = false

[dependencies]
aws-config = { version = "1.5", features = ["behavior-version-latest"], optional = true }
aws-credential-types = "1.2"
aws-sdk-sqs = { version = "1.39", optional = true }
aws-types = "1.3"
json-patch = "2.0"
jsonptr = "0.4.7" # for compatiblity with json-patch
k8s-openapi.workspace = true
kube.workspace = true
reqwest.workspace = true
regex.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net", "macros", "process"] }
serde_json.workspace = true
mirrord = { artifact = "bin", path = "../mirrord/cli" }
mirrord-operator = { path = "../mirrord/operator" }
mirrord-operator = { path = "../mirrord/operator", features = ["setup", "crd"] }
serde = "1"
futures.workspace = true
rstest = "0.21"
Expand All @@ -38,7 +44,7 @@ rustls.workspace = true
default = ["ephemeral", "job", "cli", "targetless"]
ephemeral = []
job = []
operator = []
operator = ["dep:aws-config", "dep:aws-sdk-sqs"]
docker = []
cli = []
targetless = []
19 changes: 19 additions & 0 deletions tests/configs/sqs_queue_splitting_a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"operator": true,
"feature": {
"split_queues": {
"e2e-test-queue1": {
"queue_type": "SQS",
"message_filter": {
"client": "^a$"
}
},
"e2e-test-queue2": {
"queue_type": "SQS",
"message_filter": {
"client": "^a$"
}
}
}
}
}
19 changes: 19 additions & 0 deletions tests/configs/sqs_queue_splitting_b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"operator": true,
"feature": {
"split_queues": {
"e2e-test-queue1": {
"queue_type": "SQS",
"message_filter": {
"client": "^b$"
}
},
"e2e-test-queue2": {
"queue_type": "SQS",
"message_filter": {
"client": "^b$"
}
}
}
}
}
9 changes: 9 additions & 0 deletions tests/rust-sqs-printer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "rust-sqs-printer"
version = "0.1.0"
edition = "2021"

[dependencies]
aws-config = { version = "1.5", features = ["behavior-version-latest"] }
aws-sdk-sqs = { version = "1.39" }
tokio = { version = "1.39", features = ["rt", "rt-multi-thread", "macros"] }
28 changes: 28 additions & 0 deletions tests/rust-sqs-printer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Using the agent's build image, that has rustup targets, zigbuild etc.
FROM --platform=$BUILDPLATFORM ghcr.io/metalbear-co/ci-agent-build:latest AS build-env
ARG TARGETARCH

WORKDIR /build
COPY . .

# Translate docker's platform to cargo's target.
RUN /build/platform.sh


# Build binary.
ENV CARGO_NET_GIT_FETCH_WITH_CLI=true
RUN cargo zigbuild -Z bindeps --target $(cat /.platform)
RUN cp /build/target/$(cat /.platform)/debug/rust-sqs-printer /rust-sqs-printer

# Slim final layer with only the binary.
FROM debian:stable-slim
# TODO: optimize this?
RUN apt-get update
RUN apt-get install -y --no-install-recommends ca-certificates
RUN update-ca-certificates

WORKDIR /app
COPY --from=build-env /rust-sqs-printer .

CMD ["./rust-sqs-printer"]

5 changes: 5 additions & 0 deletions tests/rust-sqs-printer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# rust-sqs-printer

Test application used by the SQS E2E tests.

The `Dockerfile` and k8s manifest sqs-printer.yaml are for manual testing.
20 changes: 20 additions & 0 deletions tests/rust-sqs-printer/mirrord.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"operator": true,
"target": "deployment/sqs-printer",
"feature": {
"split_queues": {
"manual-queue": {
"queue_type": "SQS",
"message_filter": {
"local": "1"
}
},
"fifo-queue": {
"queue_type": "SQS",
"message_filter": {
"local": "1"
}
}
}
}
}
20 changes: 20 additions & 0 deletions tests/rust-sqs-printer/platform.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/sh

# Used in Docker build to set platform dependent variables

case $TARGETARCH in

"amd64")
echo "x86_64-unknown-linux-gnu" > /.platform
echo "" > /.compiler
;;
"arm64")
echo "aarch64-unknown-linux-gnu" > /.platform
echo "gcc-aarch64-linux-gnu" > /.compiler
;;
"arm")
echo "armv7-unknown-linux-gnueabihf" > /.platform
echo "gcc-arm-linux-gnueabihf" > /.compiler
;;
esac

28 changes: 28 additions & 0 deletions tests/rust-sqs-printer/sqs-printer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: sqs-printer
labels:
app: sqs-printer
spec:
replicas: 1
selector:
matchLabels:
app: sqs-printer
template:
metadata:
labels:
app: sqs-printer
spec:
serviceAccountName: sqs-reader-account
containers:
- name: sqs-printer
image: docker.io/t4lz/sqs-printer:latest
env:
- name: RUST_BACKTRACE
value: "1"
- name: SQS_TEST_Q_NAME1
value: ManualTesting1
- name: SQS_TEST_Q_NAME2
value: TestQueue.fifo

80 changes: 80 additions & 0 deletions tests/rust-sqs-printer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use aws_sdk_sqs::{operation::receive_message::ReceiveMessageOutput, types::Message, Client};
use tokio::time::{sleep, Duration};

/// Name of the environment variable that holds the name of the first SQS queue to read from.
const QUEUE_NAME_ENV_VAR1: &str = "SQS_TEST_Q_NAME1";

/// Name of the environment variable that holds the name of the second SQS queue to read from.
const QUEUE_NAME_ENV_VAR2: &str = "SQS_TEST_Q_NAME2";

/// Reads from queue and prints the contents of each message in a new line.
async fn read_from_queue(read_q_name: String, client: Client, queue_num: u8) {
let read_q_url = client
.get_queue_url()
.queue_name(read_q_name)
.send()
.await
.unwrap()
.queue_url
.unwrap();
let receive_message_request = client
.receive_message()
.message_attribute_names(".*")
// Without this the wait time would be 0 and responses would return immediately also when
// there are no messages (and potentially even sometimes return empty immediately when
// there are actually messages).
// By setting a time != 0 (20 is the maximum), we perform "long polling" which means we
// won't get "false empties" and also less empty responses, because SQS will wait for that
// time before returning an empty response.
.wait_time_seconds(5)
.queue_url(&read_q_url);
loop {
let res = match receive_message_request.clone().send().await {
Ok(res) => res,
Err(err) => {
println!("ERROR: {err:?}");
sleep(Duration::from_secs(3)).await;
continue;
}
};
if let ReceiveMessageOutput {
messages: Some(messages),
..
} = res
{
for Message {
body,
receipt_handle,
..
} in messages
{
println!(
"{queue_num}:{}",
body.expect("Got message without body. Expected content.")
);
if let Some(handle) = receipt_handle {
client
.delete_message()
.queue_url(&read_q_url)
.receipt_handle(handle)
.send()
.await
.unwrap();
}
}
}
}
}

#[tokio::main]
async fn main() {
let sdk_config = aws_config::load_from_env().await;
let client = Client::new(&sdk_config);
let read_q_name = std::env::var(QUEUE_NAME_ENV_VAR1).unwrap();
let q_task_handle = tokio::spawn(read_from_queue(read_q_name.clone(), client.clone(), 1));
let read_q_name = std::env::var(QUEUE_NAME_ENV_VAR2).unwrap();
let fifo_q_task_handle = tokio::spawn(read_from_queue(read_q_name.clone(), client.clone(), 2));
let (q_res, fifo_res) = tokio::join!(q_task_handle, fifo_q_task_handle);
q_res.unwrap();
fifo_res.unwrap();
}
1 change: 1 addition & 0 deletions tests/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

mod concurrent_steal;
mod policies;
mod queue_splitting;
mod sanity;
mod setup;
Loading

0 comments on commit 7b8dae6

Please sign in to comment.