Skip to content

Commit 71f9e52

Browse files
Restored original ShellServerIT. Renamed IT class. Modified checks in WriteExportFiles
1 parent aa66552 commit 71f9e52

File tree

3 files changed

+285
-238
lines changed

3 files changed

+285
-238
lines changed

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525
import java.io.DataOutputStream;
2626
import java.io.IOException;
2727
import java.io.OutputStreamWriter;
28+
import java.util.Collection;
2829
import java.util.HashMap;
30+
import java.util.HashSet;
2931
import java.util.Map;
3032
import java.util.Map.Entry;
3133
import java.util.Set;
32-
import java.util.TreeSet;
3334
import java.util.zip.ZipEntry;
3435
import java.util.zip.ZipOutputStream;
3536

@@ -60,6 +61,7 @@
6061
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
6162
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
6263
import org.apache.accumulo.core.security.Authorizations;
64+
import org.apache.accumulo.core.volume.Volume;
6365
import org.apache.accumulo.manager.Manager;
6466
import org.apache.accumulo.manager.tableOps.ManagerRepo;
6567
import org.apache.accumulo.manager.tableOps.Utils;
@@ -68,6 +70,7 @@
6870
import org.apache.accumulo.server.conf.TableConfiguration;
6971
import org.apache.accumulo.server.fs.VolumeManager;
7072
import org.apache.hadoop.fs.FSDataOutputStream;
73+
import org.apache.hadoop.fs.FileSystem;
7174
import org.apache.hadoop.fs.Path;
7275

7376
class WriteExportFiles extends ManagerRepo {
@@ -156,8 +159,6 @@ public void undo(long tid, Manager env) {
156159
public static void exportTable(VolumeManager fs, ServerContext context, String tableName,
157160
TableId tableID, String exportDir) throws Exception {
158161

159-
Set<String> volumeSet = new TreeSet<>();
160-
161162
fs.mkdirs(new Path(exportDir));
162163
Path exportMetaFilePath = fs.getFileSystemByPath(new Path(exportDir))
163164
.makeQualified(new Path(exportDir, Constants.EXPORT_FILE));
@@ -190,23 +191,35 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
190191
dataOut.close();
191192
dataOut = null;
192193

193-
// make a set of unique volumes from the map
194-
for (String fileString : uniqueFiles.values()) {
195-
String uniqueVolume = getVolumeFromString(fileString);
196-
volumeSet.add(uniqueVolume);
197-
}
198-
199-
// for each unique volume: get every matching entry in the map and send them to
200-
// createDistcpFile method
201-
for (String volumeString : volumeSet) {
202-
Set<String> sortedVolumeSet = new TreeSet<>();
203-
for (String rFileString : uniqueFiles.values()) {
204-
String currentVolume = getVolumeFromString(rFileString);
205-
if (currentVolume.equals(volumeString)) {
206-
sortedVolumeSet.add(rFileString);
194+
// make map containing a volume and corresponding files
195+
final Map<Volume,Set<String>> volumeFileMap = new HashMap<>();
196+
final Collection<Volume> configuredVolumes = fs.getVolumes();
197+
configuredVolumes.forEach(vol -> {
198+
final FileSystem dfs = vol.getFileSystem();
199+
uniqueFiles.values().forEach(file -> {
200+
Path p = null;
201+
try {
202+
p = dfs.resolvePath(new Path(file));
203+
} catch (IOException e) {
204+
throw new RuntimeException(e);
207205
}
208-
}
209-
createDistcpFile(fs, exportDir, exportMetaFilePath, sortedVolumeSet, volumeString);
206+
if (vol.containsPath(p)) {
207+
if (volumeFileMap.get(vol) == null) {
208+
volumeFileMap.put(vol, new HashSet<String>());
209+
}
210+
volumeFileMap.get(vol).add(file);
211+
}
212+
});
213+
});
214+
215+
// for each entry in volumeFileMap, get 'name' of volume to name distcp.txt file
216+
// and call createDistcpFile
217+
for (Map.Entry<Volume,Set<String>> entry : volumeFileMap.entrySet()) {
218+
String keyValueString = entry.getKey().toString();
219+
String[] keyValueArray = keyValueString.split("/");
220+
String volumeName = keyValueArray[2];
221+
createDistcpFile(fs, exportDir, exportMetaFilePath, volumeFileMap.get(entry.getKey()),
222+
volumeName);
210223
}
211224
} finally {
212225
if (dataOut != null) {
@@ -215,11 +228,6 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
215228
}
216229
}
217230

218-
private static String getVolumeFromString(String searchString) {
219-
String[] segmentArray = searchString.split("/");
220-
return segmentArray[2];
221-
}
222-
223231
private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath,
224232
Set<String> uniqueFiles, String volumeName) throws IOException {
225233
if (volumeName.contains(":")) {
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.test;
20+
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
22+
23+
import java.io.File;
24+
import java.time.Duration;
25+
import java.util.Map;
26+
import java.util.SortedSet;
27+
import java.util.TreeSet;
28+
29+
import org.apache.accumulo.core.client.Accumulo;
30+
import org.apache.accumulo.core.client.AccumuloClient;
31+
import org.apache.accumulo.core.client.BatchWriter;
32+
import org.apache.accumulo.core.client.Scanner;
33+
import org.apache.accumulo.core.conf.Property;
34+
import org.apache.accumulo.core.data.Key;
35+
import org.apache.accumulo.core.data.Mutation;
36+
import org.apache.accumulo.core.data.Range;
37+
import org.apache.accumulo.core.data.Value;
38+
import org.apache.accumulo.core.metadata.MetadataTable;
39+
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
40+
import org.apache.accumulo.core.security.Authorizations;
41+
import org.apache.accumulo.harness.AccumuloClusterHarness;
42+
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
43+
import org.apache.hadoop.conf.Configuration;
44+
import org.apache.hadoop.fs.FileSystem;
45+
import org.apache.hadoop.fs.Path;
46+
import org.apache.hadoop.fs.RawLocalFileSystem;
47+
import org.apache.hadoop.io.Text;
48+
import org.junit.jupiter.api.Test;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
51+
52+
public class ExportTableCommandWithMultipleVolumesIT extends AccumuloClusterHarness {
53+
private static final Logger log =
54+
LoggerFactory.getLogger(ExportTableCommandWithMultipleVolumesIT.class);
55+
56+
Path v1, v2;
57+
58+
public static String[] row_numbers = "1,2,3,4,5,6,7,8,9,10".split(",");
59+
60+
String baseDirStr = "";
61+
String baseDir2Str = "";
62+
String originalVolume = "";
63+
String secondVolume = "";
64+
65+
@Override
66+
protected Duration defaultTimeout() {
67+
return Duration.ofMinutes(1);
68+
}
69+
70+
@Override
71+
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
72+
File baseDir = cfg.getDir();
73+
74+
// get first volume name
75+
baseDirStr = baseDir.toString();
76+
String[] baseDirArray = baseDirStr.split("/");
77+
originalVolume = baseDirArray[2];
78+
79+
// get second volume name
80+
String[] baseDir2Array = baseDirArray;
81+
baseDir2Array[2] = baseDir2Array[2] + "2";
82+
secondVolume = baseDir2Array[2];
83+
84+
// make second volume base directory
85+
for (String element : baseDir2Array) {
86+
baseDir2Str = baseDir2Str + "/" + element;
87+
}
88+
File baseDir2 = new File(baseDir2Str);
89+
90+
File v1f = new File(baseDir, "volumes/v1");
91+
File v2f = new File(baseDir2, "volumes/v2");
92+
93+
v1 = new Path("file://" + v1f.getAbsolutePath());
94+
v2 = new Path("file://" + v2f.getAbsolutePath());
95+
96+
// Run MAC on two locations in the local file system
97+
cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2);
98+
99+
// use raw local file system so walogs sync and flush will work
100+
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
101+
}
102+
103+
@Test
104+
public void testExportCommand() throws Exception {
105+
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
106+
FileSystem fs = cluster.getFileSystem();
107+
108+
final String tableName = getUniqueNames(1)[0];
109+
client.tableOperations().create(tableName);
110+
111+
// add splits to table
112+
SortedSet<Text> partitions = new TreeSet<>();
113+
for (String s : row_numbers) {
114+
partitions.add(new Text(s));
115+
}
116+
client.tableOperations().addSplits(tableName, partitions);
117+
118+
try (BatchWriter bw = client.createBatchWriter(tableName)) {
119+
for (int i = 1; i <= 50000; i++) {
120+
Mutation m = new Mutation(Integer.toString(i));
121+
m.put(Integer.toString(i), "", String.format("Entry number %d.", i));
122+
bw.addMutation(m);
123+
}
124+
}
125+
126+
client.tableOperations().compact(tableName, null, null, true, true);
127+
client.tableOperations().flush(tableName, null, null, true);
128+
129+
Path outputDir = new Path(cluster.getTemporaryPath(), getClass().getName());
130+
Path exportDir = new Path(outputDir, "export");
131+
client.tableOperations().offline(tableName, true);
132+
client.tableOperations().exportTable(tableName, exportDir.toString());
133+
134+
// Make sure the distcp.txt files that exporttable creates exists
135+
Path distcpOne = new Path(exportDir, "distcp-" + originalVolume + ".txt");
136+
Path distcpTwo = new Path(exportDir, "distcp-" + secondVolume + ".txt");
137+
assertTrue(fs.exists(distcpOne), "Distcp file doesn't exist for original volume");
138+
assertTrue(fs.exists(distcpTwo), "Distcp file doesn't exist for second volume");
139+
140+
try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
141+
scanner.setRange(new Range("1", "1<"));
142+
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
143+
144+
for (Map.Entry<Key,Value> entry : scanner) {
145+
boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString());
146+
boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString());
147+
assertTrue(inV1 || inV2);
148+
}
149+
}
150+
151+
fs.deleteOnExit(v1);
152+
fs.deleteOnExit(v2);
153+
fs.deleteOnExit(outputDir);
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)