Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,9 @@ public SCMNodeManager(
this.numPipelinesPerMetadataVolume =
conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME,
ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
this.heavyNodeCriteria = conf.getInt(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing field name heavyNodeCriteria is quite creative but unclear what does it mean. Could you rename it to datanodePipelineLimit?

ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
this.scmContext = scmContext;
this.sendCommandNotifyMap = new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ public PipelinePlacementPolicy(final NodeManager nodeManager,
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.stateManager = stateManager;
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
this.heavyNodeCriteria = conf.getInt(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Russole , there is one more field to rename.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder.

ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
}

public static int currentRatisThreePipelineCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ public RatisPipelineProvider(NodeManager nodeManager,
this.pipelineNumberLimit = conf.getInt(
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.maxPipelinePerDatanode = dnLimit == null ? 0 :
Integer.parseInt(dnLimit);
this.maxPipelinePerDatanode = conf.getInt(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also rename it to datanodePipelineLimit. So it is easier to tell that they are the same thing.

ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
this.containerSizeBytes = (long) this.conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2055,6 +2055,29 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
}
}

/**
* Test that pipelineLimit() uses the default value when the config is not set.
*/
@Test
public void testPipelineLimitDefaultIsTwoWhenUnset()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename it as something more generic:testUsesDefaultPipelineLimitWhenUnset()

throws IOException, AuthenticationException {

// Creates node manager with config without limit set
OzoneConfiguration conf = getConf();
conf.unset(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);

try (SCMNodeManager nodeManager = createNodeManager(conf)) {

// Registers datanode with healthy volumes
DatanodeDetails dn = registerWithCapacity(nodeManager);

// Calls pipelineLimit() and verifies returns 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Calls pipelineLimit() and verifies returns 2
// Calls pipelineLimit() and verifies returns default value

int limit = nodeManager.pipelineLimit(dn);
assertEquals(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT, limit);
assertEquals(2, limit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion is redundant. The first assertion itself is enough to check that it picks the default value.

}
}

private static Stream<Arguments> nodeStateTransitions() {
return Stream.of(
// start decommissioning or entering maintenance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,60 @@ public void testCurrentRatisThreePipelineCount()
assertEquals(pipelineCount, 2);
}

@Test
public void testPipelinePlacementPolicyDefaultLimitFiltersNodeAtLimit()
throws IOException, TimeoutException {

// 1) Creates policy with config without limit set
OzoneConfiguration localConf = new OzoneConfiguration(conf);
localConf.unset(OZONE_DATANODE_PIPELINE_LIMIT);

MockNodeManager localNodeManager = new MockNodeManager(cluster,
getNodesWithRackAwareness(), false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);

// Ensure NodeManager uses default limit (=2) when limit is not set in conf
localNodeManager.setNumPipelinePerDatanode(
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);

PipelineStateManager localStateManager = PipelineStateManagerImpl.newBuilder()
.setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
.setRatisServer(scmhaManager.getRatisServer())
.setNodeManager(localNodeManager)
.setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
.build();

PipelinePlacementPolicy localPolicy = new PipelinePlacementPolicy(
localNodeManager, localStateManager, localConf);

List<DatanodeDetails> healthy =
localNodeManager.getNodes(NodeStatus.inServiceHealthy());
DatanodeDetails target = healthy.get(0);

// 2) Adds exactly 2 pipelines to test node (default limit)
List<DatanodeDetails> p1Dns = new ArrayList<>();
p1Dns.add(target);
p1Dns.add(healthy.get(1));
p1Dns.add(healthy.get(2));
createPipelineWithReplicationConfig(p1Dns, RATIS, THREE);

List<DatanodeDetails> p2Dns = new ArrayList<>();
p2Dns.add(target);
p2Dns.add(healthy.get(3));
p2Dns.add(healthy.get(4));
createPipelineWithReplicationConfig(p2Dns, RATIS, THREE);

assertEquals(2, PipelinePlacementPolicy.currentRatisThreePipelineCount(
localNodeManager, localStateManager, target));

// 3) Verifies node is filtered out when choosing nodes for new pipeline
int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
List<DatanodeDetails> chosen = localPolicy.chooseDatanodes(
new ArrayList<>(), new ArrayList<>(), nodesRequired, 0, 0);

assertEquals(nodesRequired, chosen.size());
assertThat(chosen).doesNotContain(target);
}

private void createPipelineWithReplicationConfig(List<DatanodeDetails> dnList,
HddsProtos.ReplicationType
replicationType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,56 @@ public void testCreatePipelinesWhenNotEnoughSpace(@TempDir File tempDir) throws
}
}

@Test
public void testCreatePipelineWithDefaultLimit() throws Exception {
// Create conf without setting OZONE_DATANODE_PIPELINE_LIMIT
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());

dbStore = DBStoreBuilder.createDBStore(conf, SCMDBDefinition.get());

// MockNodeManager(true, 10) typically gives 8 healthy nodes in this test suite.
nodeManager = new MockNodeManager(true, nodeCount);
// Give a large quota in MockNodeManager so we don't fail early due to mock quota.
nodeManager.setNumPipelinePerDatanode(100);

SCMHAManager scmhaManager = SCMHAManagerStub.getInstance(true);
stateManager = PipelineStateManagerImpl.newBuilder()
.setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
.setRatisServer(scmhaManager.getRatisServer())
.setNodeManager(nodeManager)
.setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
.build();

provider = new MockRatisPipelineProvider(nodeManager, stateManager, conf);

int healthyCount = nodeManager.getNodes(NodeStatus.inServiceHealthy()).size();
int defaultLimit = ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
assertEquals(2, defaultLimit);

// Max pipelines before exceeding per-DN default limit.
int maxPipelines = (healthyCount * defaultLimit)
/ ReplicationFactor.THREE.getNumber();

// Create pipelines up to maxPipelines.
for (int i = 0; i < maxPipelines; i++) {
Pipeline p = provider.create(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
new ArrayList<>(), new ArrayList<>());
stateManager.addPipeline(p.getProtobufMessage(ClientVersion.CURRENT_VERSION));
}

// Next pipeline creation should fail with default limit message.
SCMException ex = assertThrows(SCMException.class, () ->
provider.create(RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
new ArrayList<>(), new ArrayList<>())
);

assertThat(ex.getMessage())
.contains("limit per datanode: " + defaultLimit)
.contains("replicationConfig: RATIS/THREE");
}

@ParameterizedTest
@CsvSource({ "1, 3", "2, 6"})
public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelineCount) throws Exception {
Expand Down