diff --git a/ocs_ci/helpers/helpers.py b/ocs_ci/helpers/helpers.py
index 71dd6b1ae73..468ec68c2a7 100644
--- a/ocs_ci/helpers/helpers.py
+++ b/ocs_ci/helpers/helpers.py
@@ -38,6 +38,7 @@
     query_nb_db_psql_version,
 )
 
+from ocs_ci.ocs.node import get_worker_nodes
 from ocs_ci.ocs import constants, defaults, node, ocp, exceptions
 from ocs_ci.ocs.exceptions import (
     CommandFailed,
@@ -5870,3 +5871,118 @@ def create_rbd_deviceclass_storageclass(
     sc_obj = create_resource(**sc_data)
     sc_obj.reload()
     return sc_obj
+
+
+def get_rbd_daemonset_csi_addons_node_object(node):
+    """
+    Gets the RBD daemonset CSI addons node data
+
+    Args:
+        node (str): Name of the node
+
+    Returns:
+        dict: CSIAddonsNode object info
+
+    """
+    namespace = config.ENV_DATA["cluster_namespace"]
+    csi_addons_node = OCP(kind=constants.CSI_ADDONS_NODE_KIND, namespace=namespace)
+    csi_addons_node_data = csi_addons_node.get(
+        resource_name=f"{node}-{namespace}-daemonset-csi-rbdplugin"
+    )
+    return csi_addons_node_data
+
+
+def create_network_fence_class():
+    """
+    Create the NetworkFenceClass CR and verify that IPs are populated
+    in the respective CSIAddonsNode objects
+
+    """
+
+    logger.info("Creating NetworkFenceClass")
+    network_fence_class_dict = templating.load_yaml(constants.NETWORK_FENCE_CLASS_CRD)
+    network_fence_class_obj = create_resource(**network_fence_class_dict)
+    if network_fence_class_obj.ocp.get(
+        resource_name=network_fence_class_obj.name, dont_raise=True
+    ):
+        logger.info(
+            f"NetworkFenceClass {network_fence_class_obj.name} created successfully"
+        )
+
+    logger.info("Verifying CSIAddonsNode objects for the CSI RBD daemonset")
+    all_nodes = get_worker_nodes()
+
+    for node_name in all_nodes:
+        cidrs = get_rbd_daemonset_csi_addons_node_object(node_name)["status"][
+            "networkFenceClientStatus"
+        ][0]["ClientDetails"][0]["cidrs"]
+        assert len(cidrs) == 1, "No CIDRs are populated in the CSIAddonsNode object"
+        logger.info(f"CIDR {cidrs[0]} populated in {node_name} CSIAddonsNode object")
+
+
+def create_network_fence(node_name, cidr):
+    """
+    Create a NetworkFence for the node
+
+    Args:
+        node_name (str): Name of the node
+        cidr (str): CIDR to fence
+
+    Returns:
+        OCS: NetworkFence object
+
+    """
+    network_fence_obj = OCP(
+        kind=constants.NETWORK_FENCE,
+        namespace=config.ENV_DATA["cluster_namespace"],
+        resource_name=node_name,
+    )
+
+    if not network_fence_obj.get(resource_name=node_name, dont_raise=True):
+        logger.info("Creating NetworkFence")
+        network_fence_dict = templating.load_yaml(constants.NETWORK_FENCE_CRD)
+        network_fence_dict["metadata"]["name"] = node_name
+        network_fence_dict["spec"]["cidrs"][0] = cidr
+        network_fence_obj = create_resource(**network_fence_dict)
+        if network_fence_obj.ocp.get(
+            resource_name=network_fence_obj.name, dont_raise=True
+        ):
+            logger.info(
+                f"NetworkFence {network_fence_obj.name} for node {node_name} created successfully"
+            )
+    else:
+        logger.info(f"NetworkFence object for {node_name} already exists!")
+    return network_fence_obj
+
+
+def unfence_node(node_name, delete=False):
+    """
+    Un-fence the node
+
+    Args:
+        node_name (str): Name of the node
+        delete (bool): If True, delete the NetworkFence object
+
+    """
+
+    network_fence_obj = OCP(
+        kind=constants.NETWORK_FENCE, namespace=config.ENV_DATA["cluster_namespace"]
+    )
+
+    if network_fence_obj.get(resource_name=node_name, dont_raise=True):
+        network_fence_obj.patch(
+            resource_name=node_name,
+            params='{"spec":{"fenceState":"Unfenced"}}',
+            format_type="merge",
+        )
+        assert (
+            network_fence_obj.get(resource_name=node_name)["spec"]["fenceState"]
+            != "Fenced"
+        ), f"{node_name} doesn't seem to be unfenced"
+        logger.info(f"Unfenced node {node_name} successfully!")
+
+        if delete:
+            network_fence_obj.delete(resource_name=node_name)
+            logger.info(f"Deleted the NetworkFence object for node {node_name}")
+    else:
+        logger.info(f"No NetworkFence found for node {node_name}")
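Note for reviewers: the deep indexing shared by `get_rbd_daemonset_csi_addons_node_object` callers (`["status"]["networkFenceClientStatus"][0]["ClientDetails"][0]["cidrs"]`) is easier to follow with the shape of the CSIAddonsNode status written out. A minimal sketch of that structure: the keys are the ones the helpers above actually index, the values are invented examples, and any other fields are omitted.

```python
# Illustrative only -- approximate shape of the CSIAddonsNode data the helpers parse.
csi_addons_node_data = {
    "status": {
        "networkFenceClientStatus": [
            {
                "ClientDetails": [
                    {"cidrs": ["10.1.2.3/32"]},  # example address, one per client
                ],
            },
        ],
    },
}

cidrs = csi_addons_node_data["status"]["networkFenceClientStatus"][0]["ClientDetails"][
    0
]["cidrs"]
assert len(cidrs) == 1, "expected exactly one CIDR per client entry"
```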
"Fenced" + ), f"{node_name} doesnt seem to be unfenced" + logger.info(f"Unfenced node {node_name} successfully!") + + if delete: + network_fence_obj.delete(resource_name=node_name) + logger.info(f"Deleted network fence object for node {node_name}") + else: + logger.info(f"No networkfence found for node {node_name}") diff --git a/ocs_ci/ocs/constants.py b/ocs_ci/ocs/constants.py index 535b4218c92..3a1df0bf90b 100644 --- a/ocs_ci/ocs/constants.py +++ b/ocs_ci/ocs/constants.py @@ -250,6 +250,8 @@ ENCRYPTIONKEYROTATIONJOB = "encryptionkeyrotationjobs.csiaddons.openshift.io" DEFAULT_CEPH_DEVICECLASS = "defaultCephDeviceClass" CRD_KIND = "CustomResourceDefinition" +NETWORK_FENCE_CLASS = "NetworkFenceClass" +NETWORK_FENCE = "NetworkFence" # Provisioners AWS_EFS_PROVISIONER = "openshift.org/aws-efs" @@ -311,6 +313,9 @@ RAM = "rss" VIRT = "vms" +ODF_NETWORK_FENCE_CLASS = "odf-networkfenceclass" +CSI_ADDONS_NODE_KIND = "CSIAddonsNode" + # cluster types MS_CONSUMER_TYPE = "consumer" MS_PROVIDER_TYPE = "provider" @@ -2485,6 +2490,12 @@ LOGWRITER_CEPHFS_WRITER = os.path.join(LOGWRITER_DIR, "cephfs.logwriter.yaml") LOGWRITER_STS_PATH = os.path.join(LOGWRITER_DIR, "logwriter.rbd.yaml") +# Network Fence CRDs +NETWORK_FENCE_CLASS_CRD = os.path.join( + TEMPLATE_DIR, "network", "network-fence-class.yaml" +) +NETWORK_FENCE_CRD = os.path.join(TEMPLATE_DIR, "network", "network-fence.yaml") + # MCG namespace constants MCG_NS_AWS_ENDPOINT = "https://s3.amazonaws.com" MCG_NS_AZURE_ENDPOINT = "https://blob.core.windows.net" @@ -3088,6 +3099,8 @@ f"{ARBITER_ZONE}{DATA_ZONE_1}-{DATA_ZONE_1}{DATA_ZONE_2}" ) +NODE_OUT_OF_SERVICE_TAINT = "node.kubernetes.io/out-of-service=nodeshutdown:NoExecute" + # Logwriter workload labels LOGWRITER_CEPHFS_LABEL = "app=logwriter-cephfs" diff --git a/ocs_ci/ocs/resources/stretchcluster.py b/ocs_ci/ocs/resources/stretchcluster.py index b6fa4ebaa98..534fa45a7ea 100644 --- a/ocs_ci/ocs/resources/stretchcluster.py +++ b/ocs_ci/ocs/resources/stretchcluster.py @@ -319,7 +319,7 @@ def get_logfile_map(self, label): self.logfile_map[label][0] = list(set(self.logfile_map[label][0])) logger.info(self.logfile_map[label][0]) - @retry(UnexpectedBehaviour, tries=6, delay=5) + @retry(UnexpectedBehaviour, tries=8, delay=5) def get_logwriter_reader_pods( self, label, diff --git a/ocs_ci/templates/network/network-fence-class.yaml b/ocs_ci/templates/network/network-fence-class.yaml new file mode 100644 index 00000000000..eb0d44d145b --- /dev/null +++ b/ocs_ci/templates/network/network-fence-class.yaml @@ -0,0 +1,10 @@ +apiVersion: csiaddons.openshift.io/v1alpha1 +kind: NetworkFenceClass +metadata: + name: odf-networkfenceclass +spec: + provisioner: openshift-storage.rbd.csi.ceph.com + parameters: + clusterID: openshift-storage + csiaddons.openshift.io/networkfence-secret-name: rook-csi-rbd-node + csiaddons.openshift.io/networkfence-secret-namespace: openshift-storage diff --git a/ocs_ci/templates/network/network-fence.yaml b/ocs_ci/templates/network/network-fence.yaml new file mode 100644 index 00000000000..dfbbcb7e45b --- /dev/null +++ b/ocs_ci/templates/network/network-fence.yaml @@ -0,0 +1,14 @@ +apiVersion: csiaddons.openshift.io/v1alpha1 +kind: NetworkFence +metadata: + name: +spec: + cidrs: + - + driver: openshift-storage.rbd.csi.ceph.com + fenceState: Fenced + parameters: + clusterID: openshift-storage + secret: + name: rook-csi-rbd-provisioner + namespace: openshift-storage diff --git a/ocs_ci/templates/workloads/logwriter/cephfs.logreader.yaml 
diff --git a/ocs_ci/templates/workloads/logwriter/cephfs.logreader.yaml b/ocs_ci/templates/workloads/logwriter/cephfs.logreader.yaml
index 2dacb6e5a30..537bb91b4a7 100644
--- a/ocs_ci/templates/workloads/logwriter/cephfs.logreader.yaml
+++ b/ocs_ci/templates/workloads/logwriter/cephfs.logreader.yaml
@@ -7,6 +7,7 @@ metadata:
 spec:
   completions: 6
   parallelism: 6
+  backoffLimit: 10
   completionMode: Indexed
   template:
     metadata:
diff --git a/tests/conftest.py b/tests/conftest.py
index 9ceb9af1ebe..61a34277029 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -184,6 +184,7 @@
     get_current_test_name,
     modify_deployment_replica_count,
     modify_statefulset_replica_count,
+    create_network_fence_class,
 )
 from ocs_ci.ocs.ceph_debug import CephObjectStoreTool, MonStoreTool, RookCephPlugin
 from ocs_ci.ocs.bucket_utils import get_rgw_restart_counts
@@ -7972,7 +7973,7 @@ def setup_logwriter_workload(request, teardown_factory):
 
     """
 
-    def factory(pvc, logwriter_path):
+    def factory(pvc, logwriter_path, zone_aware=True):
         """
         Args:
             pvc (PVC): PVC object
@@ -7991,6 +7992,10 @@ def factory(pvc, logwriter_path):
         dc_data["spec"]["template"]["spec"]["volumes"][0]["persistentVolumeClaim"][
             "claimName"
         ] = pvc.name
+
+        if not zone_aware:
+            dc_data["spec"]["template"]["spec"].pop("topologySpreadConstraints")
+
         logwriter_dc = helpers.create_resource(**dc_data)
         teardown_factory(logwriter_dc)
 
@@ -8025,7 +8030,7 @@ def logreader_workload_factory(request, teardown_factory):
 
 
 def setup_logreader_workload(request, teardown_factory):
-    def factory(pvc, logreader_path, duration=30):
+    def factory(pvc, logreader_path, duration=30, zone_aware=True):
         """
         Args:
             pvc (PVC): PVC object
@@ -8050,6 +8055,10 @@ def factory(pvc, logreader_path, duration=30):
         job_data["spec"]["template"]["spec"]["containers"][0]["command"][
             2
         ] = f"/opt/logreader.py -t {duration} *.log -d"
+
+        if not zone_aware:
+            job_data["spec"]["template"]["spec"].pop("topologySpreadConstraints")
+
         logreader_job = helpers.create_resource(**job_data)
         teardown_factory(logreader_job)
 
@@ -8125,7 +8134,7 @@ def setup_logwriter_cephfs_workload(
 
     """
 
-    def factory(read_duration=30):
+    def factory(read_duration=30, **kwargs):
         """
         Args:
             read_duration (int): Time duration in minutes
@@ -8140,10 +8149,10 @@ def factory(read_duration=30):
             project_name=setup_stretch_cluster_project
         )
         logwriter_workload = logwriter_workload_factory(
-            pvc=pvc, logwriter_path=logwriter_path
+            pvc=pvc, logwriter_path=logwriter_path, **kwargs
        )
         logreader_workload = logreader_workload_factory(
-            pvc=pvc, logreader_path=logreader_path, duration=read_duration
+            pvc=pvc, logreader_path=logreader_path, duration=read_duration, **kwargs
         )
         return logwriter_workload, logreader_workload
 
@@ -8179,22 +8188,31 @@ def setup_logwriter_rbd_workload(
 
     """
 
-    logwriter_sts_path = constants.LOGWRITER_STS_PATH
-    sts_data = templating.load_yaml(logwriter_sts_path)
-    sts_data["metadata"]["namespace"] = setup_stretch_cluster_project.namespace
-    logwriter_sts = helpers.create_resource(**sts_data)
-    teardown_factory(logwriter_sts)
-    logwriter_sts_pods = [
-        pod["metadata"]["name"]
-        for pod in get_pods_having_label(
-            label="app=logwriter-rbd", namespace=setup_stretch_cluster_project.namespace
+    def factory(zone_aware=True):
+
+        logwriter_sts_path = constants.LOGWRITER_STS_PATH
+        sts_data = templating.load_yaml(logwriter_sts_path)
+        sts_data["metadata"]["namespace"] = setup_stretch_cluster_project.namespace
+        if not zone_aware:
+            sts_data["spec"]["template"]["spec"].pop("topologySpreadConstraints")
+
+        logwriter_sts = helpers.create_resource(**sts_data)
+        teardown_factory(logwriter_sts)
+
+        logwriter_sts_pods = [
+            pod["metadata"]["name"]
+            for pod in get_pods_having_label(
+                label="app=logwriter-rbd",
+                namespace=setup_stretch_cluster_project.namespace,
+            )
+        ]
+        wait_for_pods_to_be_running(
+            namespace=setup_stretch_cluster_project.namespace,
+            pod_names=logwriter_sts_pods,
         )
-    ]
-    wait_for_pods_to_be_running(
-        namespace=setup_stretch_cluster_project.namespace, pod_names=logwriter_sts_pods
-    )
-    return logwriter_sts
+        return logwriter_sts
+
+    return factory
 
 
 @pytest.fixture()
@@ -9161,3 +9179,38 @@ def pytest_sessionfinish(session, exitstatus):
             cluster_load.finish_cluster_load()
         except Exception:
             log.exception("During finishing the Cluster load an exception was hit!")
+
+
+@pytest.fixture(scope="session")
+def setup_network_fence_class(request):
+    """
+    Set up the NetworkFenceClass CR for ODF if not already present
+
+    """
+    try:
+        network_fence_class = OCP(
+            kind=constants.NETWORK_FENCE_CLASS,
+            namespace=ocsci_config.ENV_DATA["cluster_namespace"],
+            resource_name=constants.ODF_NETWORK_FENCE_CLASS,
+        )
+        created_by_fixture = False
+        if not network_fence_class.get(dont_raise=True):
+            create_network_fence_class()
+            created_by_fixture = True
+        else:
+            log.info(
+                f"NetworkFenceClass {network_fence_class.resource_name} already exists!"
+            )
+    finally:
+
+        def finalizer():
+            """
+            Delete the NetworkFenceClass CR if it was created by the fixture
+
+            """
+            if created_by_fixture:
+                network_fence_class.delete(
+                    resource_name=constants.ODF_NETWORK_FENCE_CLASS
+                )
+
+        request.addfinalizer(finalizer)
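Note for reviewers: the `zone_aware=False` switch added to the workload factories above does exactly one thing — it drops `topologySpreadConstraints` from the pod template before the resource is created, so the scheduler is free to place every replica in a single data zone. A toy illustration of the effect (the constraint shown is an assumed example, not copied from the logwriter templates):

```python
# Toy example of what zone_aware=False removes from the loaded manifest dict.
pod_spec = {
    "containers": [{"name": "logwriter", "image": "example/logwriter:latest"}],
    "topologySpreadConstraints": [
        {
            "maxSkew": 1,
            "topologyKey": "topology.kubernetes.io/zone",
            "whenUnsatisfiable": "DoNotSchedule",
        }
    ],
}

zone_aware = False
if not zone_aware:
    # Without the constraint, all replicas may land in one data zone -- exactly
    # the "zone-unaware application" behaviour the new test needs to provoke.
    pod_spec.pop("topologySpreadConstraints")
```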
diff --git a/tests/functional/disaster-recovery/sc_arbiter/test_zone_unaware_app.py b/tests/functional/disaster-recovery/sc_arbiter/test_zone_unaware_app.py
new file mode 100644
index 00000000000..a256df273ae
--- /dev/null
+++ b/tests/functional/disaster-recovery/sc_arbiter/test_zone_unaware_app.py
@@ -0,0 +1,354 @@
+import logging
+import pytest
+import time
+
+from ocs_ci.framework.pytest_customization.marks import (
+    stretchcluster_required,
+    tier1,
+    turquoise_squad,
+)
+from ocs_ci.helpers.sanity_helpers import Sanity
+from ocs_ci.ocs.node import taint_nodes, get_nodes, get_worker_nodes
+from ocs_ci.helpers.helpers import (
+    create_network_fence,
+    get_rbd_daemonset_csi_addons_node_object,
+    unfence_node,
+)
+from ocs_ci.helpers.stretchcluster_helper import check_for_logwriter_workload_pods
+from ocs_ci.ocs import constants
+from ocs_ci.ocs.exceptions import (
+    UnexpectedBehaviour,
+    CommandFailed,
+    ResourceWrongStatusException,
+    CephHealthException,
+)
+from ocs_ci.ocs.node import wait_for_nodes_status
+from ocs_ci.ocs.resources.pod import (
+    get_pods_having_label,
+    Pod,
+    get_ceph_tools_pod,
+    wait_for_pods_to_be_in_statuses,
+    logger,
+)
+from ocs_ci.ocs.resources.pvc import get_pvc_objs
+from ocs_ci.ocs.resources.stretchcluster import StretchCluster
+from ocs_ci.utility.retry import retry
+
+log = logging.getLogger(__name__)
+
+
+@tier1
+@stretchcluster_required
+@turquoise_squad
+class TestZoneUnawareApps:
+
+    @pytest.fixture()
+    def init_sanity(self, request, nodes):
+        """
+        Initial cluster sanity
+        """
+        self.sanity_helpers = Sanity()
+
+        def finalizer():
+            """
+            Make sure all the nodes are READY and
+            the ceph health is OK at the end of the test
+            """
+
+            # check if all the nodes are READY
+            log.info("Checking if all the nodes are READY")
+            master_nodes = get_nodes(node_type=constants.MASTER_MACHINE)
+            worker_nodes = get_nodes(node_type=constants.WORKER_MACHINE)
+            nodes_not_ready = list()
+            nodes_not_ready.extend(
+                [node for node in worker_nodes if node.status() != "Ready"]
+            )
+            nodes_not_ready.extend(
+                [node for node in master_nodes if node.status() != "Ready"]
+            )
+
+            if len(nodes_not_ready) != 0:
+                try:
+                    nodes.start_nodes(nodes=nodes_not_ready)
+                except Exception:
+                    log.error(
+                        f"Something went wrong while starting the nodes {nodes_not_ready}!"
+                    )
+                    raise
+
+                retry(
+                    (
+                        CommandFailed,
+                        TimeoutError,
+                        AssertionError,
+                        ResourceWrongStatusException,
+                    ),
+                    tries=10,
+                    delay=15,
+                )(wait_for_nodes_status)(timeout=1800)
+                log.info(
+                    f"Following nodes {nodes_not_ready} were NOT READY and are now in READY state"
+                )
+            else:
+                log.info("All nodes are READY")
+
+            # check cluster health
+            try:
+                log.info("Making sure ceph health is OK")
+                self.sanity_helpers.health_check(tries=50, cluster_check=False)
+            except CephHealthException as e:
+                assert (
+                    "HEALTH_WARN" in e.args[0]
+                ), f"Ceph health is not just in a warning state: {e.args[0]}"
+                get_ceph_tools_pod().exec_ceph_cmd(ceph_cmd="ceph crash archive-all")
+                log.info("Archived Ceph crash!")
+
+        request.addfinalizer(finalizer)
+
+    @pytest.fixture(autouse=True)
+    def unfence_teardown(self, request):
+        """
+        In case of a failure in the middle of the test run, unfence the nodes
+        and delete the NetworkFence objects
+
+        """
+
+        def teardown():
+            all_worker_nodes = get_worker_nodes()
+            for node_name in all_worker_nodes:
+                unfence_node(node_name, delete=True)
+            logger.info("Cleaned up all NetworkFence objects, if any")
+
+        request.addfinalizer(teardown)
+
+    @pytest.mark.parametrize(
+        argnames="fencing",
+        argvalues=[
+            pytest.param(
+                True,
+            ),
+            # pytest.param(
+            #     False,
+            # ),
+        ],
+        ids=[
+            "With-Fencing",
+            # "Without-Fencing",
+        ],
+    )
+    def test_zone_shutdowns(
+        self,
+        init_sanity,
+        setup_logwriter_cephfs_workload_factory,
+        setup_logwriter_rbd_workload_factory,
+        logreader_workload_factory,
+        setup_network_fence_class,
+        nodes,
+        fencing,
+    ):
+
+        sc_obj = StretchCluster()
+
+        # Deploy the zone-unaware logwriter workloads
+        (
+            sc_obj.cephfs_logwriter_dep,
+            sc_obj.cephfs_logreader_job,
+        ) = setup_logwriter_cephfs_workload_factory(read_duration=0, zone_aware=False)
+
+        sc_obj.rbd_logwriter_sts = setup_logwriter_rbd_workload_factory(
+            zone_aware=False
+        )
+
+        # Fetch all the worker node names
+        worker_nodes = get_worker_nodes()
+
+        for zone in constants.DATA_ZONE_LABELS:
+
+            # Make sure the logwriter workload pods are running
+            check_for_logwriter_workload_pods(sc_obj, nodes=nodes)
+            log.info("Both logwriter CephFS and RBD workloads are in a healthy state")
+
+            # Fetch logfile details to verify data integrity post recovery
+            sc_obj.get_logfile_map(label=constants.LOGWRITER_CEPHFS_LABEL)
+            sc_obj.get_logfile_map(label=constants.LOGWRITER_RBD_LABEL)
+            log.info(
+                "Fetched the logfile details for data integrity verification post recovery"
+            )
+
+            # Shutdown the nodes
+            nodes_to_shutdown = sc_obj.get_nodes_in_zone(zone)
+            nodes.stop_nodes(nodes=nodes_to_shutdown)
+            wait_for_nodes_status(
+                node_names=[node.name for node in nodes_to_shutdown],
+                status=constants.NODE_NOT_READY,
+                timeout=300,
+            )
+            log.info(f"Nodes of zone {zone} are shut down successfully")
+
+            if fencing:
+
+                # If fencing is True, then we need to fence the nodes after shutdown
+                log.info(
+                    "Since fencing is enabled, we need to fence the nodes after the zone shutdown"
+                )
+                for node in nodes_to_shutdown:
+
+                    # Ignore the master nodes
+                    if node.name not in worker_nodes:
+                        continue
+
+                    # Fetch the CIDRs for creating the network fence
+                    cidrs = retry(CommandFailed, tries=5)(
+                        get_rbd_daemonset_csi_addons_node_object
+                    )(node.name)["status"]["networkFenceClientStatus"][0][
+                        "ClientDetails"
+                    ][
+                        0
+                    ][
+                        "cidrs"
+                    ]
+
+                    # Create the network fence
+                    retry(CommandFailed, tries=5)(create_network_fence)(
+                        node.name, cidr=cidrs[0]
+                    )
+
+                # Taint the nodes that are shut down
+                taint_nodes(
+                    nodes=[node.name for node in nodes_to_shutdown],
+                    taint_label=constants.NODE_OUT_OF_SERVICE_TAINT,
+                )
+
+            # Wait for the pod relocation buffer time
+            log.info("Waiting for the pod relocation buffer time of 10 minutes")
+            time.sleep(600)
+
+            # Check if all the pods are running
+            log.info(
+                "Checking if all the logwriter/logreader pods are relocated and successfully running"
+            )
+            sc_obj.get_logwriter_reader_pods(label=constants.LOGWRITER_CEPHFS_LABEL)
+            sc_obj.get_logwriter_reader_pods(
+                label=constants.LOGREADER_CEPHFS_LABEL,
+                statuses=[constants.STATUS_RUNNING, constants.STATUS_COMPLETED],
+            )
+            try:
+                retry(UnexpectedBehaviour, tries=1)(sc_obj.get_logwriter_reader_pods)(
+                    label=constants.LOGWRITER_RBD_LABEL, exp_num_replicas=2
+                )
+            except UnexpectedBehaviour:
+                if not fencing:
+                    log.info(
+                        "It is expected for the RBD workload with RWO PVCs to be stuck in Terminating state"
+                    )
+                    log.info("Trying the workaround now...")
+                    pods_terminating = [
+                        Pod(**pod_info)
+                        for pod_info in get_pods_having_label(
+                            label=constants.LOGWRITER_RBD_LABEL,
+                            namespace=constants.STRETCH_CLUSTER_NAMESPACE,
+                        )
+                    ]
+                    log.info(pods_terminating)
+                    for pod in pods_terminating:
+                        log.info(f"Force deleting the pod {pod.name}")
+                        pod.delete(force=True)
+                    sc_obj.get_logwriter_reader_pods(
+                        label=constants.LOGWRITER_RBD_LABEL, exp_num_replicas=2
+                    )
+                else:
+                    log.error(
+                        "Looks like pods are not running or not relocated even after fencing. Please check."
+                    )
+                    raise
+
"ClientDetails" + ][ + 0 + ][ + "cidrs" + ] + + # Create the network fence + retry(CommandFailed, tries=5)(create_network_fence)( + node.name, cidr=cidrs[0] + ) + + # Taint the nodes that are shutdown + taint_nodes( + nodes=[node.name for node in nodes_to_shutdown], + taint_label=constants.NODE_OUT_OF_SERVICE_TAINT, + ) + + # Wait for the buffer time of pod relocation + log.info("Wait until the pod relocation buffer time of 10 minutes") + time.sleep(600) + + # Check if all the pods are running + log.info( + "Checking if all the logwriter/logreader pods are relocated and successfully running" + ) + sc_obj.get_logwriter_reader_pods(label=constants.LOGWRITER_CEPHFS_LABEL) + sc_obj.get_logwriter_reader_pods( + label=constants.LOGREADER_CEPHFS_LABEL, + statuses=[constants.STATUS_RUNNING, constants.STATUS_COMPLETED], + ) + try: + retry(UnexpectedBehaviour, tries=1)(sc_obj.get_logwriter_reader_pods)( + label=constants.LOGWRITER_RBD_LABEL, exp_num_replicas=2 + ) + except UnexpectedBehaviour: + if not fencing: + log.info( + "It is expected for RBD workload with RWO to stuck in terminating state" + ) + log.info("Trying the workaround now...") + pods_terminating = [ + Pod(**pod_info) + for pod_info in get_pods_having_label( + label=constants.LOGWRITER_RBD_LABEL, + namespace=constants.STRETCH_CLUSTER_NAMESPACE, + ) + ] + log.info(pods_terminating) + for pod in pods_terminating: + log.info(f"Force deleting the pod {pod.name}") + pod.delete(force=True) + sc_obj.get_logwriter_reader_pods( + label=constants.LOGWRITER_RBD_LABEL, exp_num_replicas=2 + ) + else: + log.error( + "Looks like pods are not running or not relocated even after fencing.. please check" + ) + raise + + if fencing: + + # If fencing is True, then unfence the nodes once the pods are relocated + log.info( + "If fencing was done, then we need to unfence the nodes once the pods are relocated and running" + ) + for node in nodes_to_shutdown: + if node.name not in worker_nodes: + continue + unfence_node(node.name, delete=True) + + # Remove the taints from the nodes that were shutdown + taint_nodes( + nodes=[node.name for node in nodes_to_shutdown], + taint_label=f"{constants.NODE_OUT_OF_SERVICE_TAINT}-", + ) + log.info( + "Successfully removed taints from the nodes that were shutdown" + ) + + # Start the nodes that were shutdown + log.info(f"Starting the {zone} nodes") + try: + nodes.start_nodes(nodes=nodes_to_shutdown) + except Exception: + log.error("Something went wrong while starting the nodes!") + raise + + # Validate all nodes are in READY state and up + retry( + ( + CommandFailed, + TimeoutError, + AssertionError, + ResourceWrongStatusException, + ), + tries=10, + delay=15, + )(wait_for_nodes_status(timeout=1800)) + log.info(f"Nodes of zone {zone} are started successfully") + + # Make sure all the logwriter pods are running + check_for_logwriter_workload_pods(sc_obj, nodes=nodes) + log.info("All logwriter workload pods are running!") + + # check for any data loss through logwriter logs + assert sc_obj.check_for_data_loss( + constants.LOGWRITER_CEPHFS_LABEL + ), "[CephFS] Data is lost" + log.info("[CephFS] No data loss is seen") + assert sc_obj.check_for_data_loss( + constants.LOGWRITER_RBD_LABEL + ), "[RBD] Data is lost" + log.info("[RBD] No data loss is seen") + + # check for data corruption through logreader logs + sc_obj.cephfs_logreader_job.delete() + for pod in sc_obj.cephfs_logreader_pods: + pod.wait_for_pod_delete(timeout=120) + log.info("All old CephFS logreader pods are deleted") + pvc = get_pvc_objs( + pvc_names=[ + 
+        pvc = get_pvc_objs(
+            pvc_names=[
+                sc_obj.cephfs_logwriter_dep.get()["spec"]["template"]["spec"][
+                    "volumes"
+                ][0]["persistentVolumeClaim"]["claimName"]
+            ],
+            namespace=constants.STRETCH_CLUSTER_NAMESPACE,
+        )[0]
+        logreader_workload_factory(
+            pvc=pvc, logreader_path=constants.LOGWRITER_CEPHFS_READER, duration=5
+        )
+        sc_obj.get_logwriter_reader_pods(constants.LOGREADER_CEPHFS_LABEL)
+
+        wait_for_pods_to_be_in_statuses(
+            expected_statuses=constants.STATUS_COMPLETED,
+            pod_names=[pod.name for pod in sc_obj.cephfs_logreader_pods],
+            timeout=900,
+            namespace=constants.STRETCH_CLUSTER_NAMESPACE,
+        )
+        log.info("[CephFS] Logreader job pods have reached 'Completed' state!")
+
+        assert sc_obj.check_for_data_corruption(
+            label=constants.LOGREADER_CEPHFS_LABEL
+        ), "Data is corrupted for CephFS workloads"
+        log.info("No data corruption is seen in CephFS workloads")
+
+        assert sc_obj.check_for_data_corruption(
+            label=constants.LOGWRITER_RBD_LABEL
+        ), "Data is corrupted for RBD workloads"
+        log.info("No data corruption is seen in RBD workloads")
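Note for reviewers, not part of this change: RBD network fencing is implemented through the Ceph OSD blocklist, so one way to spot-check that a NetworkFence actually took effect is to look for the fenced node's address there. A hedged sketch using the tools pod (the command is standard Ceph; the parsing is deliberately loose since the exact output format depends on how `exec_ceph_cmd` renders it):

```python
from ocs_ci.ocs.resources.pod import get_ceph_tools_pod


def cidr_is_blocklisted(cidr):
    """
    Rough spot-check that a fenced CIDR shows up in the Ceph OSD blocklist.
    Illustrative only -- substring matching on the command output, not a strict parse.
    """
    blocklist = get_ceph_tools_pod().exec_ceph_cmd(ceph_cmd="ceph osd blocklist ls")
    return cidr.split("/")[0] in str(blocklist)
```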