Skip to content

Commit f30dd43

Browse files
committed
HDFS-17632. RBF: Support listOpenFiles for routers
1 parent 1f099fe commit f30dd43

File tree

3 files changed

+289
-4
lines changed

3 files changed

+289
-4
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
2323
import org.apache.hadoop.conf.Configuration;
2424
import org.apache.hadoop.crypto.CryptoProtocolVersion;
25+
import org.apache.hadoop.fs.BatchedRemoteIterator;
2526
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
2627
import org.apache.hadoop.fs.CacheFlag;
2728
import org.apache.hadoop.fs.ContentSummary;
@@ -1977,8 +1978,50 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
19771978
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
19781979
EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
19791980
throws IOException {
1980-
rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
1981-
return null;
1981+
rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
1982+
List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
1983+
RemoteMethod method =
1984+
new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
1985+
prevId, openFilesTypes, new RemoteParam());
1986+
Map<RemoteLocation, BatchedEntries> results =
1987+
rpcClient.invokeConcurrent(locations, method, true, false, -1, BatchedEntries.class);
1988+
1989+
// Get the largest inodeIds for each namespace, and the smallest inodeId of them
1990+
// then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
1991+
long minOfMax = Long.MAX_VALUE;
1992+
for (BatchedEntries nsEntries : results.values()) {
1993+
// Only need to care about namespaces that still have more files to report
1994+
if (!nsEntries.hasMore()) {
1995+
continue;
1996+
}
1997+
long max = 0;
1998+
for (int i = 0; i < nsEntries.size(); i++) {
1999+
max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
2000+
}
2001+
minOfMax = Math.min(minOfMax, max);
2002+
}
2003+
// Concatenate all entries into one result, sorted by inodeId
2004+
List<OpenFileEntry> routerEntries = new ArrayList<>();
2005+
boolean hasMore = false;
2006+
for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) {
2007+
BatchedEntries nsEntries = entry.getValue();
2008+
hasMore |= nsEntries.hasMore();
2009+
for (int i = 0; i < nsEntries.size(); i++) {
2010+
OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
2011+
if (ofe.getId() > minOfMax) {
2012+
hasMore = true;
2013+
break;
2014+
}
2015+
RemoteLocation loc = entry.getKey();
2016+
String routerPath = ofe.getFilePath().replaceFirst(loc.getDest(), loc.getSrc());;
2017+
OpenFileEntry newEntry =
2018+
new OpenFileEntry(ofe.getId(), routerPath, ofe.getClientName(),
2019+
ofe.getClientMachine());
2020+
routerEntries.add(newEntry);
2021+
}
2022+
}
2023+
routerEntries.sort(Comparator.comparingLong(OpenFileEntry::getId));
2024+
return new BatchedRemoteIterator.BatchedListEntries<>(routerEntries, hasMore);
19822025
}
19832026

19842027
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import java.io.IOException;
21+
import java.io.OutputStream;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.EnumSet;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
29+
import org.junit.jupiter.api.AfterAll;
30+
import org.junit.jupiter.api.AfterEach;
31+
import org.junit.jupiter.api.BeforeAll;
32+
import org.junit.jupiter.api.BeforeEach;
33+
import org.junit.jupiter.api.Test;
34+
35+
import org.apache.hadoop.conf.Configuration;
36+
import org.apache.hadoop.fs.BatchedRemoteIterator;
37+
import org.apache.hadoop.fs.RemoteIterator;
38+
import org.apache.hadoop.hdfs.DFSClient;
39+
import org.apache.hadoop.hdfs.DFSConfigKeys;
40+
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
41+
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
42+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
43+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
44+
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
45+
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
46+
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
47+
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
48+
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
49+
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
50+
51+
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
52+
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
53+
import static org.junit.jupiter.api.Assertions.assertEquals;
54+
import static org.junit.jupiter.api.Assertions.assertTrue;
55+
56+
public class TestRouterListOpenFiles {
57+
final private static String TEST_DESTINATION_PATH = "/TestRouterListOpenFilesDst";
58+
final private static int NUM_SUBCLUSTERS = 2;
59+
final private static int BATCH_SIZE = 3;
60+
private static StateStoreDFSCluster cluster;
61+
private static MiniRouterDFSCluster.RouterContext routerContext;
62+
private static RouterClientProtocol routerProtocol;
63+
private static DFSClient client0;
64+
private static DFSClient client1;
65+
private static DFSClient routerClient;
66+
67+
@BeforeAll
68+
public static void setup() throws Exception {
69+
cluster = new StateStoreDFSCluster(false, NUM_SUBCLUSTERS,
70+
MultipleDestinationMountTableResolver.class);
71+
Configuration conf = new RouterConfigBuilder().stateStore().heartbeat().admin().rpc().build();
72+
conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns0,ns1");
73+
conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, true);
74+
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
75+
cluster.addRouterOverrides(conf);
76+
cluster.startCluster(conf);
77+
cluster.startRouters();
78+
cluster.waitClusterUp();
79+
routerContext = cluster.getRandomRouter();
80+
routerProtocol = routerContext.getRouterRpcServer().getClientProtocolModule();
81+
routerClient = routerContext.getClient();
82+
client0 = cluster.getNamenode("ns0", null).getClient();
83+
client1 = cluster.getNamenode("ns1", null).getClient();
84+
}
85+
86+
@AfterAll
87+
public static void cleanup() {
88+
if (cluster != null) {
89+
cluster.shutdown();
90+
cluster = null;
91+
}
92+
}
93+
94+
@BeforeEach
95+
public void resetInodeId() throws IOException {
96+
cluster.getNamenode("ns0", null).getNamenode().getNamesystem().getFSDirectory()
97+
.resetLastInodeIdWithoutChecking(12345);
98+
cluster.getNamenode("ns1", null).getNamenode().getNamesystem().getFSDirectory()
99+
.resetLastInodeIdWithoutChecking(12345);
100+
// Create 2 dirs with the same name on 2 different nss
101+
client0.mkdirs(TEST_DESTINATION_PATH);
102+
client1.mkdirs(TEST_DESTINATION_PATH);
103+
}
104+
105+
@AfterEach
106+
public void cleanupNamespaces() throws IOException {
107+
client0.delete("/", true);
108+
client1.delete("/", true);
109+
}
110+
111+
@Test
112+
public void testSingleDestination() throws Exception {
113+
String testPath = "/" + getMethodName();
114+
createMountTableEntry(testPath, Collections.singletonList("ns0"));
115+
116+
// Open 2 files with different names
117+
OutputStream os0 = client0.create(TEST_DESTINATION_PATH + "/file0", true);
118+
OutputStream os1 = client1.create(TEST_DESTINATION_PATH + "/file1", true);
119+
120+
BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
121+
routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
122+
testPath);
123+
// Should list only the entry on ns0
124+
assertEquals(1, result.size());
125+
assertEquals(testPath + "/file0", result.get(0).getFilePath());
126+
os0.close();
127+
os1.close();
128+
}
129+
130+
@Test
131+
public void testMultipleDestinations() throws Exception {
132+
String testPath = "/" + getMethodName();
133+
createMountTableEntry(testPath, cluster.getNameservices());
134+
135+
// Open 2 files with different names
136+
OutputStream os0 = client0.create(TEST_DESTINATION_PATH + "/file0", true);
137+
OutputStream os1 = client1.create(TEST_DESTINATION_PATH + "/file1", true);
138+
BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
139+
routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
140+
testPath);
141+
// Should list both entries on ns0 and ns1
142+
assertEquals(2, result.size());
143+
assertEquals(testPath + "/file0", result.get(0).getFilePath());
144+
assertEquals(testPath + "/file1", result.get(1).getFilePath());
145+
RemoteIterator<OpenFileEntry> ite = routerClient.listOpenFiles(testPath);
146+
while (ite.hasNext()) {
147+
OpenFileEntry ofe = ite.next();
148+
assertTrue(ofe.getFilePath().equals(testPath + "/file0") || ofe.getFilePath()
149+
.equals(testPath + "/file1"));
150+
}
151+
os0.close();
152+
os1.close();
153+
154+
// Open 2 files with same name
155+
os0 = client0.create(TEST_DESTINATION_PATH + "/file2", true);
156+
os1 = client1.create(TEST_DESTINATION_PATH + "/file2", true);
157+
result =
158+
routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
159+
testPath);
160+
// Should list both entries
161+
assertEquals(2, result.size());
162+
assertEquals(testPath + "/file2", result.get(0).getFilePath());
163+
assertEquals(testPath + "/file2", result.get(1).getFilePath());
164+
ite = routerClient.listOpenFiles(testPath);
165+
while (ite.hasNext()) {
166+
OpenFileEntry ofe = ite.next();
167+
assertTrue(ofe.getFilePath().equals(testPath + "/file2"));
168+
}
169+
os0.close();
170+
os1.close();
171+
}
172+
173+
@Test
174+
public void testMultipleDestinationsMultipleBatches() throws Exception {
175+
String testPath = "/" + getMethodName();
176+
createMountTableEntry(testPath, cluster.getNameservices());
177+
178+
// Make ns1 have a much bigger inodeid than ns0
179+
cluster.getNamenode("ns0", null).getNamenode().getNamesystem().getFSDirectory()
180+
.resetLastInodeIdWithoutChecking((long) 1E6);
181+
cluster.getNamenode("ns1", null).getNamenode().getNamesystem().getFSDirectory()
182+
.resetLastInodeIdWithoutChecking((long) 2E6);
183+
runBatchListOpenFilesTest(testPath);
184+
185+
// Rerun the test with ns0 having a much bigger inodeid than ns1
186+
cluster.getNamenode("ns0", null).getNamenode().getNamesystem().getFSDirectory()
187+
.resetLastInodeIdWithoutChecking((long) 4E6);
188+
cluster.getNamenode("ns1", null).getNamenode().getNamesystem().getFSDirectory()
189+
.resetLastInodeIdWithoutChecking((long) 3E6);
190+
runBatchListOpenFilesTest(testPath);
191+
}
192+
193+
private static void runBatchListOpenFilesTest(String testPath) throws IOException {
194+
// Open 3 batches on both namespaces
195+
OutputStream[] oss0 = new OutputStream[3 * BATCH_SIZE];
196+
OutputStream[] oss1 = new OutputStream[3 * BATCH_SIZE];
197+
for (int i = 0; i < 3 * BATCH_SIZE; i++) {
198+
oss0[i] = client0.create(TEST_DESTINATION_PATH + "/file0a_" + i, true);
199+
oss1[i] = client1.create(TEST_DESTINATION_PATH + "/file1a_" + i, true);
200+
}
201+
RemoteIterator<OpenFileEntry> ite = routerClient.listOpenFiles(testPath);
202+
List<OpenFileEntry> allEntries = new ArrayList<>();
203+
while (ite.hasNext()) {
204+
allEntries.add(ite.next());
205+
}
206+
// All files should be reported once
207+
assertEquals(3 * 2 * BATCH_SIZE, allEntries.size());
208+
209+
// Clean up
210+
for (int i = 0; i < 3 * BATCH_SIZE; i++) {
211+
oss0[i].close();
212+
oss1[i].close();
213+
}
214+
client0.delete(TEST_DESTINATION_PATH, true);
215+
client1.delete(TEST_DESTINATION_PATH, true);
216+
}
217+
218+
/**
219+
* Creates a mount with custom source path and some fixed destination paths.
220+
*/
221+
private static void createMountTableEntry(String sourcePath, List<String> nsIds)
222+
throws Exception {
223+
Map<String, String> destMap = new HashMap<>();
224+
for (String nsId : nsIds) {
225+
destMap.put(nsId, TEST_DESTINATION_PATH);
226+
}
227+
MountTable newEntry = MountTable.newInstance(sourcePath, destMap);
228+
if (nsIds.size() > 1) {
229+
newEntry.setDestOrder(DestinationOrder.HASH_ALL);
230+
}
231+
AddMountTableEntryRequest addRequest = AddMountTableEntryRequest.newInstance(newEntry);
232+
AddMountTableEntryResponse addResponse =
233+
getAdminClient(routerContext.getRouter()).getMountTableManager()
234+
.addMountTableEntry(addRequest);
235+
assertTrue(addResponse.getStatus());
236+
routerContext.getRouter().getStateStore().refreshCaches(true);
237+
}
238+
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2060,8 +2060,12 @@ void resetLastInodeId(long newValue) throws IOException {
20602060
}
20612061
}
20622062

2063-
/** Should only be used for tests to reset to any value */
2064-
void resetLastInodeIdWithoutChecking(long newValue) {
2063+
/**
2064+
* Should only be used for tests to reset to any value.
* Writes {@code newValue} straight into the inode id counter through
* {@code inodeId.setCurrentValue}; this method itself performs no validation
* of the value. It is public so that tests in other packages can call it.
2065+
* @param newValue new value to set to
2066+
*/
2067+
@VisibleForTesting
2068+
public void resetLastInodeIdWithoutChecking(long newValue) {
20652069
inodeId.setCurrentValue(newValue);
20662070
}
20672071

0 commit comments

Comments
 (0)