diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 82b26dc8..adbd9fbd 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -44,6 +44,9 @@ jobs:
           ARTIFACT=$(make help | grep 'Artifact: ')
           echo "name=${ARTIFACT#'Artifact: '}" >> $GITHUB_OUTPUT
 
+      - name: Change artifact permissions
+        run: sudo chmod a+r ${{ steps.artifact.outputs.name }}
+
       - name: Upload locally built artifact
         uses: actions/upload-artifact@v3
         with:
diff --git a/.gitignore b/.gitignore
index 723ef36f..0b26130a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,4 @@
-.idea
\ No newline at end of file
+.idea
+*.rock
+*.tar
+.make_cache
\ No newline at end of file
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index eba0a602..c27e3b68 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -13,6 +13,7 @@ cd charmed-spark-rock
 sudo snap install rockcraft --edge
 sudo snap install docker
 sudo snap install lxd
+sudo snap install yq
 sudo snap install skopeo --edge --devmode
 ```
 
diff --git a/Makefile b/Makefile
index 14a7fa9c..4474e3d1 100644
--- a/Makefile
+++ b/Makefile
@@ -24,6 +24,7 @@ _MAKE_DIR := .make_cache
 $(shell mkdir -p $(_MAKE_DIR))
 
 K8S_TAG := $(_MAKE_DIR)/.k8s_tag
+AWS_TAG := $(_MAKE_DIR)/.aws_tag
 
 IMAGE_NAME := $(shell yq .name rockcraft.yaml)
 VERSION := $(shell yq .version rockcraft.yaml)
@@ -70,7 +71,7 @@ $(_TMP_OCI_TAG): $(_ROCK_OCI)
 	touch $(_TMP_OCI_TAG)
 
 $(CHARMED_OCI_TAG): $(_TMP_OCI_TAG)
-	docker build - -t "$(CHARMED_OCI_FULL_NAME):$(TAG)" --build-arg BASE_IMAGE="$(_TMP_OCI_NAME):$(TAG)" < Dockerfile
+	docker build -t "$(CHARMED_OCI_FULL_NAME):$(TAG)" --build-arg BASE_IMAGE="$(_TMP_OCI_NAME):$(TAG)" -f Dockerfile .
 	if [ ! -d "$(_MAKE_DIR)/$(CHARMED_OCI_FULL_NAME)" ]; then mkdir -p "$(_MAKE_DIR)/$(CHARMED_OCI_FULL_NAME)"; fi
 	touch $(CHARMED_OCI_TAG)
 
@@ -80,10 +81,15 @@ $(K8S_TAG):
 	sg microk8s ./tests/integration/config-microk8s.sh
 	@touch $(K8S_TAG)
 
+$(AWS_TAG): $(K8S_TAG)
+	@echo "=== Setting up and configuring AWS CLI ==="
+	/bin/bash ./tests/integration/setup-aws-cli.sh
+	touch $(AWS_TAG)
+
 microk8s: $(K8S_TAG)
 
 $(_MAKE_DIR)/%/$(TAG).tar: $(_MAKE_DIR)/%/$(TAG).tag
-	docker save $*:$(TAG) > $(_MAKE_DIR)/$*/$(TAG).tar
+	docker save $*:$(TAG) -o $(_MAKE_DIR)/$*/$(TAG).tar
 
 $(BASE_NAME): $(_MAKE_DIR)/$(CHARMED_OCI_FULL_NAME)/$(TAG).tar
 	@echo "=== Creating $(BASE_NAME) OCI archive ==="
@@ -106,7 +112,7 @@ import: $(K8S_TAG) build
 	microk8s ctr images import --base-name $(CHARMED_OCI_FULL_NAME):$(TAG) $(BASE_NAME)
 endif
 
-tests:
+tests: $(K8S_TAG) $(AWS_TAG)
 	@echo "=== Running Integration Tests ==="
 	/bin/bash ./tests/integration/integration-tests.sh
diff --git a/tests/integration/config-microk8s.sh b/tests/integration/config-microk8s.sh
index 82c6ebf7..9461fb82 100755
--- a/tests/integration/config-microk8s.sh
+++ b/tests/integration/config-microk8s.sh
@@ -2,3 +2,4 @@ microk8s status --wait-ready
 microk8s config | tee ~/.kube/config
 microk8s.enable dns
 microk8s.enable rbac
+microk8s.enable minio
\ No newline at end of file
diff --git a/tests/integration/integration-tests.sh b/tests/integration/integration-tests.sh
index a9267613..a73961bc 100755
--- a/tests/integration/integration-tests.sh
+++ b/tests/integration/integration-tests.sh
@@ -135,7 +135,7 @@ teardown_test_pod() {
 run_example_job_in_pod() {
   SPARK_EXAMPLES_JAR_NAME="spark-examples_2.12-$(get_spark_version).jar"
 
-  PREVIOUS_JOB=$(kubectl get pods | grep driver | tail -n 1 | cut -d' ' -f1)
+  PREVIOUS_JOB=$(kubectl get pods -n ${NAMESPACE} | grep driver | tail -n 1 | cut -d' ' -f1)
 
   NAMESPACE=$1
   USERNAME=$2
@@ -166,6 +166,136 @@ run_example_job_in_pod() {
   validate_pi_value $pi
 }
 
+get_s3_access_key(){
+  # Prints out the S3 access key by reading it from the K8s secret
+  kubectl get secret -n minio-operator microk8s-user-1 -o jsonpath='{.data.CONSOLE_ACCESS_KEY}' | base64 -d
+}
+
+get_s3_secret_key(){
+  # Prints out the S3 secret key by reading it from the K8s secret
+  kubectl get secret -n minio-operator microk8s-user-1 -o jsonpath='{.data.CONSOLE_SECRET_KEY}' | base64 -d
+}
+
+get_s3_endpoint(){
+  # Prints out the endpoint the S3 bucket is exposed on.
+  kubectl get service minio -n minio-operator -o jsonpath='{.spec.clusterIP}'
+}
+
+create_s3_bucket(){
+  # Creates an S3 bucket with the given name.
+  S3_ENDPOINT=$(get_s3_endpoint)
+  BUCKET_NAME=$1
+  aws --endpoint-url "http://$S3_ENDPOINT" s3api create-bucket --bucket "$BUCKET_NAME"
+  echo "Created S3 bucket ${BUCKET_NAME}"
+}
+
+delete_s3_bucket(){
+  # Deletes the S3 bucket with the given name.
+  S3_ENDPOINT=$(get_s3_endpoint)
+  BUCKET_NAME=$1
+  aws --endpoint-url "http://$S3_ENDPOINT" s3 rb "s3://$BUCKET_NAME" --force
+  echo "Deleted S3 bucket ${BUCKET_NAME}"
+}
+
+copy_file_to_s3_bucket(){
+  # Copies a local file to an S3 bucket.
+  # The bucket name and the path to the file to be uploaded are provided as arguments.
+  BUCKET_NAME=$1
+  FILE_PATH=$2
+
+  # If the file path is '/foo/bar/file.ext', the basename is 'file.ext'
+  BASE_NAME=$(basename "$FILE_PATH")
+  S3_ENDPOINT=$(get_s3_endpoint)
+
+  # Copy the file to the S3 bucket
+  aws --endpoint-url "http://$S3_ENDPOINT" s3 cp "$FILE_PATH" s3://"$BUCKET_NAME"/"$BASE_NAME"
+  echo "Copied file ${FILE_PATH} to S3 bucket ${BUCKET_NAME}"
+}
+
+test_iceberg_example_in_pod(){
+  # Test Iceberg integration in Charmed Spark Rock
+
+  # First create an S3 bucket named 'spark'
+  create_s3_bucket spark
+
+  # Copy the 'test-iceberg.py' script to the 'spark' bucket
+  copy_file_to_s3_bucket spark ./tests/integration/resources/test-iceberg.py
+
+  NAMESPACE="tests"
+  USERNAME="spark"
+
+  # Number of rows to be inserted during the test.
+  NUM_ROWS_TO_INSERT="4"
+
+  # Number of driver pods that already exist in the namespace.
+  PREVIOUS_DRIVER_PODS_COUNT=$(kubectl get pods -n ${NAMESPACE} | grep driver | wc -l)
+
+  # Submit the job from inside 'testpod'
+  kubectl exec testpod -- \
+      env \
+        UU="$USERNAME" \
+        NN="$NAMESPACE" \
+        IM="$(spark_image)" \
+        NUM_ROWS="$NUM_ROWS_TO_INSERT" \
+        ACCESS_KEY="$(get_s3_access_key)" \
+        SECRET_KEY="$(get_s3_secret_key)" \
+        S3_ENDPOINT="$(get_s3_endpoint)" \
+      /bin/bash -c '\
+        spark-client.spark-submit \
+        --username $UU --namespace $NN \
+        --conf spark.kubernetes.driver.request.cores=100m \
+        --conf spark.kubernetes.executor.request.cores=100m \
+        --conf spark.kubernetes.container.image=$IM \
+        --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
+        --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
+        --conf spark.hadoop.fs.s3a.path.style.access=true \
+        --conf spark.hadoop.fs.s3a.endpoint=$S3_ENDPOINT \
+        --conf spark.hadoop.fs.s3a.access.key=$ACCESS_KEY \
+        --conf spark.hadoop.fs.s3a.secret.key=$SECRET_KEY \
+        --conf spark.jars.ivy=/tmp \
+        --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
+        --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
+        --conf spark.sql.catalog.spark_catalog.type=hive \
+        --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+        --conf spark.sql.catalog.local.type=hadoop \
+        --conf spark.sql.catalog.local.warehouse=s3a://spark/warehouse \
+        --conf spark.sql.defaultCatalog=local \
+        s3a://spark/test-iceberg.py -n $NUM_ROWS'
+
+  # Delete the 'spark' bucket
+  delete_s3_bucket spark
+
+  # Number of driver pods after the job has completed.
+  DRIVER_PODS_COUNT=$(kubectl get pods -n ${NAMESPACE} | grep driver | wc -l)
+
+  # If the number of driver pods is the same as before, the job has not run at all!
+  if [[ "${PREVIOUS_DRIVER_PODS_COUNT}" == "${DRIVER_PODS_COUNT}" ]]
+  then
+    echo "ERROR: Sample job has not run!"
+    exit 1
+  fi
+
+  # Find the ID of the driver pod that ran the job.
+  # tail -n 1      => take the last line
+  # cut -d' ' -f1  => split by spaces and pick the first field
+  DRIVER_POD_ID=$(kubectl get pods -n ${NAMESPACE} | grep test-iceberg-.*-driver | tail -n 1 | cut -d' ' -f1)
+
+  # Grab the log line that reports the row count
+  OUTPUT_LOG_LINE=$(kubectl logs ${DRIVER_POD_ID} -n ${NAMESPACE} | grep 'Number of rows inserted:')
+
+  # Extract the number of rows inserted
+  # rev            => reverse the string
+  # cut -d' ' -f1  => split by spaces and pick the first field
+  # rev            => reverse the string back
+  NUM_ROWS_INSERTED=$(echo $OUTPUT_LOG_LINE | rev | cut -d' ' -f1 | rev)
+
+  if [ "${NUM_ROWS_INSERTED}" != "${NUM_ROWS_TO_INSERT}" ]; then
+    echo "ERROR: ${NUM_ROWS_TO_INSERT} rows were supposed to be inserted. Found ${NUM_ROWS_INSERTED} rows. Aborting with exit code 1."
+    exit 1
+  fi
+
+}
+
 run_example_job_in_pod_with_pod_templates() {
   SPARK_EXAMPLES_JAR_NAME="spark-examples_2.12-$(get_spark_version).jar"
 
@@ -425,6 +555,13 @@ echo -e "RUN EXAMPLE JOB WITH ERRORS"
 echo -e "########################################"
 
 (setup_user_admin_context && test_example_job_in_pod_with_errors && cleanup_user_success) || cleanup_user_failure_in_pod
 
+
+echo -e "##################################"
+echo -e "RUN EXAMPLE THAT USES ICEBERG LIBRARIES"
+echo -e "##################################"
+
+(setup_user_admin_context && test_iceberg_example_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod
+
 echo -e "##################################"
 echo -e "TEARDOWN TEST POD"
 echo -e "##################################"
diff --git a/tests/integration/resources/test-iceberg.py b/tests/integration/resources/test-iceberg.py
new file mode 100644
index 00000000..929914e4
--- /dev/null
+++ b/tests/integration/resources/test-iceberg.py
@@ -0,0 +1,35 @@
+import argparse
+import random
+
+from pyspark.sql import SparkSession
+from pyspark.sql.types import LongType, StructType, StructField
+
+parser = argparse.ArgumentParser("TestIceberg")
+parser.add_argument("--num_rows", "-n", type=int)
+args = parser.parse_args()
+num_rows = args.num_rows
+
+# Create Spark session
+spark = SparkSession.builder.appName("IcebergExample").getOrCreate()
+
+# Create schema
+schema = StructType(
+    [StructField("row_id", LongType(), True), StructField("row_val", LongType(), True)]
+)
+
+# Generate 'num_rows' number of random rows to be inserted.
+data = []
+for idx in range(num_rows):
+    row = (idx + 1, random.randint(1, 100))
+    data.append(row)
+
+# Create a data frame and write it
+df = spark.createDataFrame(data, schema)
+df.writeTo("demo.foo.bar").create()
+
+# Read back the inserted data and count the number of rows
+df = spark.table("demo.foo.bar")
+count = df.count()
+
+# Print the number of rows
+print(f"Number of rows inserted: {count}")
diff --git a/tests/integration/setup-aws-cli.sh b/tests/integration/setup-aws-cli.sh
new file mode 100755
index 00000000..b30293cb
--- /dev/null
+++ b/tests/integration/setup-aws-cli.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+# Install AWS CLI
+sudo snap install aws-cli --classic
+
+# Get access key and secret key from MinIO
+ACCESS_KEY=$(kubectl get secret -n minio-operator microk8s-user-1 -o jsonpath='{.data.CONSOLE_ACCESS_KEY}' | base64 -d)
+SECRET_KEY=$(kubectl get secret -n minio-operator microk8s-user-1 -o jsonpath='{.data.CONSOLE_SECRET_KEY}' | base64 -d)
+
+S3_BUCKET="spark"
+DEFAULT_REGION="us-east-2"
+
+# Configure AWS CLI credentials
+aws configure set aws_access_key_id $ACCESS_KEY
+aws configure set aws_secret_access_key $SECRET_KEY
+aws configure set default.region $DEFAULT_REGION
+echo "AWS CLI credentials set successfully"