From 22a209328d163f8a53f2fc36a5e92093094e6a54 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 5 May 2024 23:01:27 +0200 Subject: [PATCH] test(airflow): add tests for SparkOnK8SOperatorLink (#54) --- tests/airflow/test_operator_links.py | 60 ++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 tests/airflow/test_operator_links.py diff --git a/tests/airflow/test_operator_links.py b/tests/airflow/test_operator_links.py new file mode 100644 index 0000000..84e833a --- /dev/null +++ b/tests/airflow/test_operator_links.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from unittest import mock + +import pytest + +from conftest import PYTHON_312 + + +@pytest.mark.skipif(PYTHON_312, reason="Python 3.12 is not supported by Airflow") +class TestSparkOnK8SOperatorLink: + def test_persist_spark_ui_link(self): + from spark_on_k8s.airflow.operator_links import SparkOnK8SOperatorLink + + mock_spark_on_k8s_operator = mock.MagicMock() + context = {} + SparkOnK8SOperatorLink.persist_spark_ui_link( + context=context, + task_instance=mock_spark_on_k8s_operator, + spark_on_k8s_service_url="http://localhost:8000", + namespace="spark", + spark_app_id="spark-app-id", + ) + + mock_spark_on_k8s_operator.xcom_push.assert_called_once_with( + context, + key="spark_ui_link", + value="http://localhost:8000/webserver/ui/spark/spark-app-id", + ) + + def test_persist_spark_history_ui_link(self): + from spark_on_k8s.airflow.operator_links import SparkOnK8SOperatorLink + + mock_spark_on_k8s_operator = mock.MagicMock() + context = {} + SparkOnK8SOperatorLink.persist_spark_history_ui_link( + context=context, + task_instance=mock_spark_on_k8s_operator, + spark_on_k8s_service_url="http://localhost:8000", + spark_app_id="spark-app-id", + ) + + mock_spark_on_k8s_operator.xcom_push.assert_called_once_with( + context, + key="spark_ui_link", + value="http://localhost:8000/webserver/ui-history/history/spark-app-id", + ) + + @mock.patch("airflow.models.xcom.BaseXCom.get_value") + def test_get_link(self, mock_get_value): + from spark_on_k8s.airflow.operator_links import SparkOnK8SOperatorLink + + mock_link = "http://localhost:8000/webserver/ui/spark/spark-app-id" + mock_get_value.return_value = mock_link + mock_spark_on_k8s_operator = mock.MagicMock() + mock_ti_key = mock.MagicMock() + spark_on_k8s_operator_link = SparkOnK8SOperatorLink() + link = spark_on_k8s_operator_link.get_link(operator=mock_spark_on_k8s_operator, ti_key=mock_ti_key) + + assert link == mock_link