Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ private boolean updateContainerState(final DatanodeDetails datanode,
deleteReplica(containerId, datanode, publisher, "DELETED", false, detailsForLogging);
return false;
}
if (replica.getState() == State.CLOSED && replica.getBlockCommitSequenceId() <= container.getSequenceId()
&& container.getReplicationType().equals(HddsProtos.ReplicationType.RATIS)) {
deleteReplica(containerId, datanode, publisher, "DELETED", true, detailsForLogging);
return false;
}
// HDDS-12421: fall-through to case DELETING
case DELETING:
// HDDS-11136: If a DELETING container has a non-empty CLOSED replica, transition the container to CLOSED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Test the behaviour of the ContainerReportHandler.
Expand Down Expand Up @@ -204,6 +203,11 @@ static Stream<Arguments> containerAndReplicaStates() {
replicaState.equals(ContainerReplicaProto.State.QUASI_CLOSED)) {
continue;
}
if (replicationType == HddsProtos.ReplicationType.RATIS &&
replicaState.equals(ContainerReplicaProto.State.CLOSED) &&
containerState.equals(HddsProtos.LifeCycleState.DELETED)) {
continue;
}
for (ContainerReplicaProto.State invalidState : invalidReplicaStates) {
combinations.add(Arguments.of(replicationType, containerState, replicaState, invalidState));
}
Expand Down Expand Up @@ -1142,9 +1146,8 @@ public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
.getNumberOfKeys());
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testStaleReplicaOfDeletedContainer(boolean isEmpty) throws NodeNotFoundException, IOException {
@Test
public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException, IOException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(nodeManager, containerManager);

final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
Expand All @@ -1161,18 +1164,13 @@ public void testStaleReplicaOfDeletedContainer(boolean isEmpty) throws NodeNotFo

final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString(), 0, isEmpty);
datanodeOne.getUuidString(), 0, true);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);

if (isEmpty) {
// Expect the replica to be deleted when it is empty
verify(publisher, times(1)).fireEvent(any(), any(CommandForDatanode.class));
} else {
// Expect the replica to stay when it is NOT empty
verify(publisher, times(0)).fireEvent(any(), any(CommandForDatanode.class));
}
// Expect the replica to be deleted when it is empty
verify(publisher, times(1)).fireEvent(any(), any(CommandForDatanode.class));
assertEquals(1, containerManager.getContainerReplicas(containerOne.containerID()).size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@

package org.apache.hadoop.hdds.scm.container;

import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
import static org.apache.ratis.util.Preconditions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
Expand All @@ -32,30 +40,41 @@
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;

/**
* Tests for ContainerStateManager.
Expand All @@ -67,9 +86,13 @@ public class TestContainerStateManager {
private File testDir;
private DBStore dbStore;
private Pipeline pipeline;
private MockNodeManager nodeManager;
private ContainerManager containerManager;
private SCMContext scmContext;
private EventPublisher publisher;

@BeforeEach
public void init() throws IOException, TimeoutException {
public void init() throws IOException, TimeoutException, InvalidStateTransitionException {
OzoneConfiguration conf = new OzoneConfiguration();
SCMHAManager scmhaManager = SCMHAManagerStub.getInstance(true);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
Expand All @@ -95,6 +118,48 @@ public void init() throws IOException, TimeoutException {
Clock.system(ZoneId.systemDefault()), null))
.build();

nodeManager = new MockNodeManager(true, 10);
containerManager = mock(ContainerManager.class);
scmContext = SCMContext.emptyContext();
scmContext.updateLeaderAndTerm(true, 1L);
publisher = mock(EventPublisher.class);

when(containerManager.getContainer(any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainer((ContainerID) invocation.getArguments()[0]));

when(containerManager.getContainerReplicas(any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainerReplicas((ContainerID) invocation.getArguments()[0]));

doAnswer(invocation -> {
containerStateManager.updateContainerStateWithSequenceId(
((ContainerID) invocation.getArguments()[0]).getProtobuf(),
(HddsProtos.LifeCycleEvent) invocation.getArguments()[1], 0L);
return null;
}).when(containerManager).updateContainerState(
any(ContainerID.class), any(HddsProtos.LifeCycleEvent.class));

doAnswer(invocation -> {
containerStateManager.updateContainerReplica(
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerReplica(
any(ContainerID.class), any(ContainerReplica.class));

doAnswer(invocation -> {
containerStateManager.removeContainerReplica(
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).removeContainerReplica(
any(ContainerID.class), any(ContainerReplica.class));

doAnswer(invocation -> {
containerStateManager.transitionDeletingOrDeletedToClosedState(
((ContainerID) invocation.getArgument(0)).getProtobuf());
return null;
}).when(containerManager).transitionDeletingOrDeletedToClosedState(any(ContainerID.class));

}

@AfterEach
Expand Down Expand Up @@ -197,6 +262,143 @@ public void testTransitionContainerToClosedStateAllowOnlyDeletingOrDeletedContai
}
}

/**
 * DELETED container + CLOSED replica with BCSID == container seqId + RATIS replication.
 * Expected: exactly one force delete command (force=true) is sent to the datanode and the
 * container remains DELETED.
 */
@Test
public void testDeletedContainerWithStaleClosedReplicaRatis()
    throws IOException {
  final ContainerReportHandler reportHandler =
      new ContainerReportHandler(nodeManager, containerManager, scmContext, null);

  final DatanodeDetails datanode = nodeManager.getNodes(
      NodeStatus.inServiceHealthy()).iterator().next();

  // Create a DELETED container and register it with the state manager
  final ContainerInfo container = getContainer(HddsProtos.LifeCycleState.DELETED);
  containerStateManager.addContainer(container.getProtobuf());

  // The scenario under test only applies to RATIS containers
  assertEquals(HddsProtos.ReplicationType.RATIS, container.getReplicationType());

  // Take the BCSID from the container itself instead of hard-coding the value the test
  // utility happens to use, so "BCSID == sequenceId" holds regardless of that default.
  final long matchingBcsId = container.getSequenceId();

  // Report a non-empty CLOSED replica whose BCSID equals the container's sequenceId
  final StorageContainerDatanodeProtocolProtos.ContainerReportsProto containerReport = getContainerReportsProto(
      container.containerID(),
      ContainerReplicaProto.State.CLOSED,
      datanode.getUuidString(),
      matchingBcsId,
      false); // not empty

  final SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode reportFromDatanode =
      new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(datanode, containerReport);
  reportHandler.onMessage(reportFromDatanode, publisher);

  ArgumentCaptor<CommandForDatanode<?>> commandCaptor =
      ArgumentCaptor.forClass(CommandForDatanode.class);
  verify(publisher, times(1))
      .fireEvent(eq(SCMEvents.DATANODE_COMMAND), commandCaptor.capture());

  // Verify it's a DeleteContainerCommand with force=true
  CommandForDatanode<?> capturedCommand = commandCaptor.getValue();
  assertEquals(DeleteContainerCommand.class, capturedCommand.getCommand().getClass());
  DeleteContainerCommand deleteCommand = (DeleteContainerCommand) capturedCommand.getCommand();
  assertEquals(true, deleteCommand.isForce(),
      "Delete command should have force=true for stale RATIS replica");
  assertEquals(container.getContainerID(), deleteCommand.getContainerID());

  // Container should remain in DELETED state
  assertEquals(HddsProtos.LifeCycleState.DELETED,
      containerManager.getContainer(container.containerID()).getState());
}

/**
 * DELETED container + CLOSED replica with BCSID &lt; container seqId + RATIS replication.
 * Expected: exactly one force delete command (force=true) is sent, because the replica's
 * BCSID is lower than the container's sequenceId.
 */
@Test
public void testDeletedContainerWithLowerBcsidStaleReplicaRatis()
    throws IOException {
  final ContainerReportHandler reportHandler =
      new ContainerReportHandler(nodeManager, containerManager, scmContext, null);

  final DatanodeDetails datanode = nodeManager.getNodes(
      NodeStatus.inServiceHealthy()).iterator().next();

  final ContainerInfo container = getContainer(HddsProtos.LifeCycleState.DELETED);
  containerStateManager.addContainer(container.getProtobuf());

  // Derive a BCSID strictly below the container's sequenceId rather than hard-coding one,
  // so the "lower BCSID" precondition does not depend on the test utility's default value.
  final long lowerBcsId = container.getSequenceId() - 1L;

  // Report a non-empty CLOSED replica with the lower BCSID
  final StorageContainerDatanodeProtocolProtos.ContainerReportsProto containerReport = getContainerReportsProto(
      container.containerID(),
      ContainerReplicaProto.State.CLOSED,
      datanode.getUuidString(),
      lowerBcsId,
      false); // not empty

  final SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode reportFromDatanode =
      new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(datanode, containerReport);
  reportHandler.onMessage(reportFromDatanode, publisher);

  // Should send force delete command
  ArgumentCaptor<CommandForDatanode<?>> commandCaptor =
      ArgumentCaptor.forClass(CommandForDatanode.class);
  verify(publisher, times(1))
      .fireEvent(eq(SCMEvents.DATANODE_COMMAND), commandCaptor.capture());

  // Check the command type before casting so a wrong type is reported as a test failure
  // instead of a ClassCastException.
  CommandForDatanode<?> capturedCommand = commandCaptor.getValue();
  assertEquals(DeleteContainerCommand.class, capturedCommand.getCommand().getClass());
  DeleteContainerCommand deleteCommand = (DeleteContainerCommand) capturedCommand.getCommand();

  // Use the JUnit assertion (with a message); the statically imported assertTrue in this
  // file is the Ratis Preconditions helper, which is not a test assertion.
  assertEquals(true, deleteCommand.isForce(),
      "Delete command should have force=true for RATIS replica with lower BCSID");
}

/**
 * DELETED EC container + CLOSED replica whose BCSID equals the container seqId.
 * Expected: no datanode command is published for the replica, and the container
 * ends up in the CLOSED state.
 */
@Test
public void testDeletedECContainerWithStaleClosedReplicaShouldNotForceDelete()
    throws IOException {
  final ContainerReportHandler handler =
      new ContainerReportHandler(nodeManager, containerManager, scmContext, null);

  // A DELETED EC(3, 2) container known to the state manager
  final ContainerInfo deletedEc = getECContainer(
      HddsProtos.LifeCycleState.DELETED,
      PipelineID.randomId(),
      new ECReplicationConfig(3, 2));
  containerStateManager.addContainer(deletedEc.getProtobuf());
  assertEquals(HddsProtos.ReplicationType.EC, deletedEc.getReplicationType());

  // A freshly registered datanode that will report the replica
  final DatanodeDetails reportingNode = randomDatanodeDetails();
  nodeManager.register(reportingNode, null, null);

  // Non-empty CLOSED replica, index 1, with BCSID taken from the container
  final long sameBcsId = deletedEc.getSequenceId();
  final StorageContainerDatanodeProtocolProtos.ContainerReportsProto report =
      getContainerReportsProto(
          deletedEc.containerID(),
          ContainerReplicaProto.State.CLOSED,
          reportingNode.getUuidString(),
          sameBcsId,
          false,
          1);

  handler.onMessage(
      new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(reportingNode, report),
      publisher);

  // No datanode command may have been fired for this report
  verify(publisher, times(0))
      .fireEvent(eq(SCMEvents.DATANODE_COMMAND), any(CommandForDatanode.class));

  // The container is expected to have moved to CLOSED
  final HddsProtos.LifeCycleState stateAfterReport =
      containerManager.getContainer(deletedEc.containerID()).getState();
  assertEquals(HddsProtos.LifeCycleState.CLOSED, stateAfterReport);
}

@Test
public void testSequenceIdOnStateUpdate() throws Exception {
ContainerID containerID = ContainerID.valueOf(3L);
Expand Down Expand Up @@ -264,4 +466,42 @@ private ContainerInfo allocateContainer()
return containerInfo;
}

/**
 * Builds a container report holding a single replica with replica index 0.
 * Convenience overload that forwards to the six-argument variant.
 */
private static StorageContainerDatanodeProtocolProtos.ContainerReportsProto getContainerReportsProto(
    final ContainerID containerId,
    final ContainerReplicaProto.State state,
    final String originNodeId,
    final long bcsId,
    final boolean isEmpty) {
  final int defaultReplicaIndex = 0;
  return getContainerReportsProto(
      containerId, state, originNodeId, bcsId, isEmpty, defaultReplicaIndex);
}

/**
 * Builds a container report holding exactly one replica entry.
 *
 * <p>The replica's used bytes and key count are zero when {@code isEmpty} is true and fixed
 * sample values otherwise; size and read/write statistics are always fixed sample values.
 *
 * @param containerId id of the container the replica belongs to
 * @param state reported replica state
 * @param originNodeId origin node id set on the replica
 * @param bcsId block commit sequence id set on the replica
 * @param isEmpty whether the replica is reported as empty
 * @param replicaIndex replica index set on the replica
 * @return a report containing the single replica
 */
private static StorageContainerDatanodeProtocolProtos.ContainerReportsProto getContainerReportsProto(
    final ContainerID containerId,
    final ContainerReplicaProto.State state,
    final String originNodeId,
    final long bcsId,
    final boolean isEmpty,
    final int replicaIndex) {
  // Usage figures depend on whether the replica is reported as empty
  final long usedBytes;
  final long keyCount;
  if (isEmpty) {
    usedBytes = 0L;
    keyCount = 0L;
  } else {
    usedBytes = 2000000000L;
    keyCount = 100000000L;
  }

  final ContainerReplicaProto.Builder replica = ContainerReplicaProto.newBuilder();
  replica.setContainerID(containerId.getProtobuf().getId());
  replica.setState(state);
  replica.setOriginNodeId(originNodeId);
  replica.setSize(5368709120L);
  replica.setUsed(usedBytes);
  replica.setKeyCount(keyCount);
  replica.setReadCount(100000000L);
  replica.setWriteCount(100000000L);
  replica.setReadBytes(2000000000L);
  replica.setWriteBytes(2000000000L);
  replica.setBlockCommitSequenceId(bcsId);
  replica.setDeleteTransactionId(0);
  replica.setReplicaIndex(replicaIndex);
  replica.setIsEmpty(isEmpty);

  return StorageContainerDatanodeProtocolProtos.ContainerReportsProto.newBuilder()
      .addReports(replica.build())
      .build();
}

}
Loading