Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ private boolean updateContainerState(final DatanodeDetails datanode,
deleteReplica(containerId, datanode, publisher, "DELETED", false, detailsForLogging);
return false;
}
if (replica.getState() == State.CLOSED && replica.getBlockCommitSequenceId() <= container.getSequenceId()
&& container.getReplicationType().equals(HddsProtos.ReplicationType.RATIS)) {
deleteReplica(containerId, datanode, publisher, "DELETED", true, detailsForLogging);
return false;
}
// HDDS-12421: fall-through to case DELETING
case DELETING:
// HDDS-11136: If a DELETING container has a non-empty CLOSED replica, transition the container to CLOSED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Test the behaviour of the ContainerReportHandler.
Expand Down Expand Up @@ -204,6 +203,11 @@ static Stream<Arguments> containerAndReplicaStates() {
replicaState.equals(ContainerReplicaProto.State.QUASI_CLOSED)) {
continue;
}
if (replicationType == HddsProtos.ReplicationType.RATIS &&
replicaState.equals(ContainerReplicaProto.State.CLOSED) &&
containerState.equals(HddsProtos.LifeCycleState.DELETED)) {
continue;
}
for (ContainerReplicaProto.State invalidState : invalidReplicaStates) {
combinations.add(Arguments.of(replicationType, containerState, replicaState, invalidState));
}
Expand Down Expand Up @@ -1142,9 +1146,8 @@ public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
.getNumberOfKeys());
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testStaleReplicaOfDeletedContainer(boolean isEmpty) throws NodeNotFoundException, IOException {
@Test
public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException, IOException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(nodeManager, containerManager);

final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
Expand All @@ -1161,18 +1164,13 @@ public void testStaleReplicaOfDeletedContainer(boolean isEmpty) throws NodeNotFo

final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString(), 0, isEmpty);
datanodeOne.getUuidString(), 0, true);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);

if (isEmpty) {
// Expect the replica to be deleted when it is empty
verify(publisher, times(1)).fireEvent(any(), any(CommandForDatanode.class));
} else {
// Expect the replica to stay when it is NOT empty
verify(publisher, times(0)).fireEvent(any(), any(CommandForDatanode.class));
}
// Expect the replica to be deleted when it is empty
verify(publisher, times(1)).fireEvent(any(), any(CommandForDatanode.class));
assertEquals(1, containerManager.getContainerReplicas(containerOne.containerID()).size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@

package org.apache.hadoop.hdds.scm.container;

import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
import static org.apache.ratis.util.Preconditions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
Expand All @@ -32,30 +40,41 @@
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;

/**
* Testing ContainerStatemanager.
Expand All @@ -67,9 +86,13 @@ public class TestContainerStateManager {
private File testDir;
private DBStore dbStore;
private Pipeline pipeline;
private MockNodeManager nodeManager;
private ContainerManager containerManager;
private SCMContext scmContext;
private EventPublisher publisher;

@BeforeEach
public void init() throws IOException, TimeoutException {
public void init() throws IOException, TimeoutException, InvalidStateTransitionException {
OzoneConfiguration conf = new OzoneConfiguration();
SCMHAManager scmhaManager = SCMHAManagerStub.getInstance(true);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
Expand All @@ -95,6 +118,48 @@ public void init() throws IOException, TimeoutException {
Clock.system(ZoneId.systemDefault()), null))
.build();

nodeManager = new MockNodeManager(true, 10);
containerManager = mock(ContainerManager.class);
scmContext = SCMContext.emptyContext();
scmContext.updateLeaderAndTerm(true, 1L);
publisher = mock(EventPublisher.class);

when(containerManager.getContainer(any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainer((ContainerID) invocation.getArguments()[0]));

when(containerManager.getContainerReplicas(any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainerReplicas((ContainerID) invocation.getArguments()[0]));

doAnswer(invocation -> {
containerStateManager.updateContainerStateWithSequenceId(
((ContainerID) invocation.getArguments()[0]).getProtobuf(),
(HddsProtos.LifeCycleEvent) invocation.getArguments()[1], 0L);
return null;
}).when(containerManager).updateContainerState(
any(ContainerID.class), any(HddsProtos.LifeCycleEvent.class));

doAnswer(invocation -> {
containerStateManager.updateContainerReplica(
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerReplica(
any(ContainerID.class), any(ContainerReplica.class));

doAnswer(invocation -> {
containerStateManager.removeContainerReplica(
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).removeContainerReplica(
any(ContainerID.class), any(ContainerReplica.class));

doAnswer(invocation -> {
containerStateManager.transitionDeletingOrDeletedToClosedState(
((ContainerID) invocation.getArgument(0)).getProtobuf());
return null;
}).when(containerManager).transitionDeletingOrDeletedToClosedState(any(ContainerID.class));

}

@AfterEach
Expand Down Expand Up @@ -197,6 +262,143 @@ public void testTransitionContainerToClosedStateAllowOnlyDeletingOrDeletedContai
}
}

/**
* DELETED container + CLOSED replica with BCSID <= container seqId + RATIS replication.
* Expected: Force delete command should be sent (force=true)
*/
@Test
public void testDeletedContainerWithStaleClosedReplicaRatis()
throws IOException {
final ContainerReportHandler reportHandler =
new ContainerReportHandler(nodeManager, containerManager, scmContext, null);

final DatanodeDetails datanode = nodeManager.getNodes(
NodeStatus.inServiceHealthy()).iterator().next();

// Create a DELETED RATIS container with sequenceId = 10000L
final ContainerInfo container = getContainer(HddsProtos.LifeCycleState.DELETED);
containerStateManager.addContainer(container.getProtobuf());

// Verify it's RATIS type
assertEquals(HddsProtos.ReplicationType.RATIS, container.getReplicationType());

// Report a CLOSED replica with BCSID = 10000L (equal to container's seqId)
final StorageContainerDatanodeProtocolProtos.ContainerReportsProto containerReport = getContainerReportsProto(
container.containerID(),
ContainerReplicaProto.State.CLOSED,
datanode.getUuidString(),
10000L, // BCSID matches container sequenceId
false); // not empty

final SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode reportFromDatanode =
new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(datanode, containerReport);
reportHandler.onMessage(reportFromDatanode, publisher);

ArgumentCaptor<CommandForDatanode<?>> commandCaptor =
ArgumentCaptor.forClass(CommandForDatanode.class);
verify(publisher, times(1))
.fireEvent(eq(SCMEvents.DATANODE_COMMAND), commandCaptor.capture());

// Verify it's a DeleteContainerCommand with force=true
CommandForDatanode<?> capturedCommand = commandCaptor.getValue();
assertEquals(DeleteContainerCommand.class, capturedCommand.getCommand().getClass());
DeleteContainerCommand deleteCommand = (DeleteContainerCommand) capturedCommand.getCommand();
assertEquals(true, deleteCommand.isForce(),
"Delete command should have force=true for stale RATIS replica");
assertEquals(container.getContainerID(), deleteCommand.getContainerID());

// Container should remain in DELETED state
assertEquals(HddsProtos.LifeCycleState.DELETED,
containerManager.getContainer(container.containerID()).getState());
}

/**
* Test: DELETED container + CLOSED replica with BCSID < container seqId + RATIS.
* Expected: Force delete command should be sent (BCSID is lower)
*/
@Test
public void testDeletedContainerWithLowerBcsidStaleReplicaRatis()
throws IOException {
final ContainerReportHandler reportHandler =
new ContainerReportHandler(nodeManager, containerManager, scmContext, null);

final DatanodeDetails datanode = nodeManager.getNodes(
NodeStatus.inServiceHealthy()).iterator().next();

final ContainerInfo container = getContainer(HddsProtos.LifeCycleState.DELETED);
containerStateManager.addContainer(container.getProtobuf());

// Report a CLOSED replica with BCSID = 9000L (lower than container's 10000L)
final StorageContainerDatanodeProtocolProtos.ContainerReportsProto containerReport = getContainerReportsProto(
container.containerID(),
ContainerReplicaProto.State.CLOSED,
datanode.getUuidString(),
9000L, // BCSID < container sequenceId
false);

final SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode reportFromDatanode =
new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(datanode, containerReport);
reportHandler.onMessage(reportFromDatanode, publisher);

// Should send force delete command
ArgumentCaptor<CommandForDatanode<?>> commandCaptor =
ArgumentCaptor.forClass(CommandForDatanode.class);
verify(publisher, times(1))
.fireEvent(eq(SCMEvents.DATANODE_COMMAND), commandCaptor.capture());

DeleteContainerCommand deleteCommand =
(DeleteContainerCommand) commandCaptor.getValue().getCommand();
assertTrue(deleteCommand.isForce());
}

/**
* DELETED EC container + CLOSED replica with BCSID <= container seqId.
* Expected: Should NOT send force delete
* Should transition to CLOSED instead
*/
@Test
public void testDeletedECContainerWithStaleClosedReplicaShouldNotForceDelete()
throws IOException {
final ContainerReportHandler reportHandler =
new ContainerReportHandler(nodeManager, containerManager, scmContext, null);

final DatanodeDetails datanode = randomDatanodeDetails();
nodeManager.register(datanode, null, null);

// Create a DELETED EC container
ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
final ContainerInfo ecContainer = getECContainer(
HddsProtos.LifeCycleState.DELETED,
PipelineID.randomId(),
repConfig);
containerStateManager.addContainer(ecContainer.getProtobuf());

// Verify it's EC type
assertEquals(HddsProtos.ReplicationType.EC, ecContainer.getReplicationType());

// Report a CLOSED replica with BCSID = container's seqId
final StorageContainerDatanodeProtocolProtos.ContainerReportsProto containerReport = getContainerReportsProto(
ecContainer.containerID(),
ContainerReplicaProto.State.CLOSED,
datanode.getUuidString(),
ecContainer.getSequenceId(), // BCSID matches
false, // not empty
1); // replica index 1

final SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode reportFromDatanode =
new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(datanode, containerReport);
reportHandler.onMessage(reportFromDatanode, publisher);

// Should NOT send any delete command
// Instead should transition to CLOSED
verify(publisher, times(0))
.fireEvent(eq(SCMEvents.DATANODE_COMMAND), any(CommandForDatanode.class));

// Container should transition to CLOSED
assertEquals(HddsProtos.LifeCycleState.CLOSED,
containerManager.getContainer(ecContainer.containerID()).getState());
}

@Test
public void testSequenceIdOnStateUpdate() throws Exception {
ContainerID containerID = ContainerID.valueOf(3L);
Expand Down Expand Up @@ -264,4 +466,42 @@ private ContainerInfo allocateContainer()
return containerInfo;
}

private static StorageContainerDatanodeProtocolProtos.ContainerReportsProto getContainerReportsProto(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final String originNodeId,
final long bcsId,
final boolean isEmpty) {
return getContainerReportsProto(containerId, state, originNodeId, bcsId, isEmpty, 0);
}

private static StorageContainerDatanodeProtocolProtos.ContainerReportsProto getContainerReportsProto(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final String originNodeId,
final long bcsId,
final boolean isEmpty,
final int replicaIndex) {
final StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder crBuilder =
StorageContainerDatanodeProtocolProtos.ContainerReportsProto.newBuilder();
final ContainerReplicaProto replicaProto =
ContainerReplicaProto.newBuilder()
.setContainerID(containerId.getProtobuf().getId())
.setState(state)
.setOriginNodeId(originNodeId)
.setSize(5368709120L)
.setUsed(isEmpty ? 0L : 2000000000L)
.setKeyCount(isEmpty ? 0L : 100000000L)
.setReadCount(100000000L)
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
.setBlockCommitSequenceId(bcsId)
.setDeleteTransactionId(0)
.setReplicaIndex(replicaIndex)
.setIsEmpty(isEmpty)
.build();
return crBuilder.addReports(replicaProto).build();
}

}
Loading