Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

Expand Down Expand Up @@ -58,6 +61,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.Utils;
Expand All @@ -66,6 +70,7 @@
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class WriteExportFiles extends ManagerRepo {
Expand Down Expand Up @@ -186,8 +191,33 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
dataOut.close();
dataOut = null;

createDistcpFile(fs, exportDir, exportMetaFilePath, uniqueFiles);

// make map containing a volume and corresponding files
final Map<Volume,Set<String>> volumeFileMap = new HashMap<>();
final Collection<Volume> configuredVolumes = fs.getVolumes();
configuredVolumes.forEach(vol -> {
final FileSystem dfs = vol.getFileSystem();
uniqueFiles.values().forEach(file -> {
Path p = null;
try {
p = dfs.resolvePath(new Path(file));
} catch (IOException e) {
throw new RuntimeException(e);
}
if (vol.containsPath(p)) {
volumeFileMap.computeIfAbsent(vol, k -> new HashSet<>()).add(file);
}
});
});

// for each entry in volumeFileMap, get 'name' of volume to name distcp.txt file
// and call createDistcpFile
for (Map.Entry<Volume,Set<String>> entry : volumeFileMap.entrySet()) {
String keyValueString = entry.getKey().toString();
String[] keyValueArray = keyValueString.split("/");
String volumeName = keyValueArray[2];
createDistcpFile(fs, exportDir, exportMetaFilePath, volumeFileMap.get(entry.getKey()),
volumeName);
}
} finally {
if (dataOut != null) {
dataOut.close();
Expand All @@ -196,12 +226,16 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
}

private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath,
Map<String,String> uniqueFiles) throws IOException {
BufferedWriter distcpOut = new BufferedWriter(
new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt")), UTF_8));
Set<String> uniqueFiles, String volumeName) throws IOException {
if (volumeName.contains(":")) {
volumeName = volumeName.replace(":", "-");
}

BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(
fs.create(new Path(exportDir, "distcp-" + volumeName + ".txt")), UTF_8));

try {
for (String file : uniqueFiles.values()) {
for (String file : uniqueFiles) {
distcpOut.append(file);
distcpOut.newLine();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.time.Duration;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExportTableCommandWithMultipleVolumesIT extends AccumuloClusterHarness {
private static final Logger log =
LoggerFactory.getLogger(ExportTableCommandWithMultipleVolumesIT.class);

Path v1, v2;

public static String[] row_numbers = "1,2,3,4,5,6,7,8,9,10".split(",");

String baseDirStr = "";
String baseDir2Str = "";
String originalVolume = "";
String secondVolume = "";

@Override
protected Duration defaultTimeout() {
  // This IT should finish quickly; cap the per-test runtime at sixty seconds.
  return Duration.ofSeconds(60);
}

@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
File baseDir = cfg.getDir();

// get first volume name
baseDirStr = baseDir.toString();
String[] baseDirArray = baseDirStr.split("/");
originalVolume = baseDirArray[2];

// get second volume name
String[] baseDir2Array = baseDirArray;
baseDir2Array[2] = baseDir2Array[2] + "2";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test fails for me since it is trying to use a directory that does not exist: dgarguilo2 is not a user and is not a directory. I suggest looking into using a @TempDir like we do with other tests so a directory is sure to be created and cleaned up for this test.

Details
2023-07-25T16:42:33,408 [init.Initialize] DEBUG: creating instance directories for base: file:/home/dgarguilo/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v1
2023-07-25T16:42:33,427 [init.Initialize] INFO : Directory file:/home/dgarguilo/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v1/version/11 created - call returned true
2023-07-25T16:42:33,430 [init.Initialize] INFO : Directory file:/home/dgarguilo/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v1/instance_id created - call returned true
2023-07-25T16:42:33,457 [init.Initialize] INFO : Created instanceId file file:/home/dgarguilo/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v1/instance_id/7f6b05aa-4141-4781-ab1b-6a7f7667fae4 in hdfs
2023-07-25T16:42:33,457 [init.Initialize] DEBUG: creating instance directories for base: file:/home/dgarguilo2/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v2
2023-07-25T16:42:33,462 [init.Initialize] INFO : Directory file:/home/dgarguilo2/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v2/version/11 created - call returned false
2023-07-25T16:42:33,463 [init.Initialize] INFO : Directory file:/home/dgarguilo2/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v2/instance_id created - call returned false
2023-07-25T16:42:33,465 [init.Initialize] ERROR: Problem creating new directories
java.io.IOException: Mkdirs failed to create file:/home/dgarguilo2/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v2/instance_id
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:561) ~[hadoop-client-api-3.3.5.jar:?]
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:549) ~[hadoop-client-api-3.3.5.jar:?]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1210) ~[hadoop-client-api-3.3.5.jar:?]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1165) ~[hadoop-client-api-3.3.5.jar:?]
	at org.apache.hadoop.fs.FileSystem.createNewFile(FileSystem.java:1499) ~[hadoop-client-api-3.3.5.jar:?]
	at org.apache.accumulo.server.fs.VolumeManagerImpl.createNewFile(VolumeManagerImpl.java:177) ~[classes/:?]
	at org.apache.accumulo.server.init.Initialize.createDirs(Initialize.java:291) ~[classes/:?]
	at org.apache.accumulo.server.init.Initialize.doInit(Initialize.java:175) ~[classes/:?]
	at org.apache.accumulo.server.init.Initialize.execute(Initialize.java:543) ~[classes/:?]
	at org.apache.accumulo.server.init.Initialize.main(Initialize.java:583) ~[classes/:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.accumulo.start.Main.lambda$execMainClass$1(Main.java:118) ~[classes/:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
2023-07-25T16:42:33,473 [init.Initialize] ERROR: FATAL: Problem during initialize
java.io.IOException: Problem creating directories on [file:/home/dgarguilo2/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v2, file:/home/dgarguilo/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v1]
	at org.apache.accumulo.server.init.Initialize.doInit(Initialize.java:176) ~[classes/:?]
	at org.apache.accumulo.server.init.Initialize.execute(Initialize.java:543) ~[classes/:?]
	at org.apache.accumulo.server.init.Initialize.main(Initialize.java:583) ~[classes/:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.accumulo.start.Main.lambda$execMainClass$1(Main.java:118) ~[classes/:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intent of the new test was to create and use a new volume. It is not possible to create a new volume in a test however, so I think the best that can be done is to use different directories which can be done with junits TempDir.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about this a bit more and looking at the code, I don't think what I suggested will work. In fact I am not sure of a way to write a test for this code. It seems that, to properly test this code, two volumes need to be present. However, an additional volume can not be created or expected to already exist on the machine for a test.

I am able to get the new test to pass only after manually creating a new directory for the test to create resources inside of. For example, with my username dgarguilo, the test expects to be able to create the following directories:

  • /home/dgarguilo/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v1
  • /home/dgarguilo2/github/accumulo/test/target/mini-tests/org.apache.accumulo.test.ExportTableCommandWithMultipleVolumesIT_testExportCommand/volumes/v2

The second of which I had to create manually. Obviously this can't be expected to happen for a test to pass.

In conclusion, after manually creating the needed directory and having the test pass, it seems like these changes are working but I don't think we can keep the test as is. I am not sure how to move forward with these changes. Should the new IT be removed and just rely on manual verification of these changes or should something else happen?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, an additional volume can not be created or expected to already exist on the machine for a test.

I think this statement is incorrect. You should be able to create more than one volume for the test. A volume is simply a URI. You can certainly create two volumes (with file:///path/to/tempdir/vol1 and file:///path/to/tempdir/vol2) by creating these two temporary directories at the beginning of the test, and configuring those in MiniCluster before it starts up. What is preventing that?

However, you don't even need to do that much. The table doesn't need to be online or any of its files be read, or actually exist. You can just create an offline table, and add metadata entries that look like the table has files in separate volumes. We don't need to actually read the files to do the export and verify that the separate distcp files are created. You can just check that the files were created and contain the expected entries, based on the mock entries you inserted into the metadata of the offline table.

Copy link
Contributor

@ivakegg ivakegg Jul 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you say creating the directory manually, do you simply mean 'mkdir dir' in your shell? If that is all that is needed then we can certainly create the directory in the test case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that new directories can be created by a test. I think the difference in this case is that the distcp files that are created use the root of the path (or rather the second index) to name those files:

String keyValueString = entry.getKey().toString();
String[] keyValueArray = keyValueString.split("/");
String volumeName = keyValueArray[2];
createDistcpFile(fs, exportDir, exportMetaFilePath, volumeFileMap.get(entry.getKey()), volumeName);

https://github.com/AlbertWhitlock/accumulo/blob/fced61f2479dea155610386a9ec6ae11a8a0cfb4/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java#L215

So in the case of creating a temporary directory from the test, the distcp files would be created with the same name unless we have a way of creating directories with different subdirectories at the level at which the code above uses. That is what I was trying to convey in my previous comment, not that its impossible to create a directory from a test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if you're saying that the code in this PR is creating separate distcp files whose names collide (which shouldn't happen... this PR should create uniquely named files if it's creating more than one as the proposed solution), or if you're saying that all the local files look like they're part of the same volume and end up in the same file because all files in file:/// looks like a single filesystem. I think I would need specific file names and resulting file contents, as an example, to understand what you're saying.

However, I've noticed another problem with the current implementation. The current implementation seems to be looping over the currently configured volumes and writing out separate files for each of those volumes configured, if it sees files that belong in that volume. However... what about files in the metadata that are not part of any currently configured volume? The number, name, and organization of the files should be based on the volumes represented in the absolute paths of the files seen in the metadata table... not just grouped by whether they match one of the currently configured volumes. The current implementation looks like it would just ignore those files.

Without diving in to implement this myself, I'm not exactly sure what the expected behavior is for file:/// paths, but the basic functionality I would expect the following set of mock files seen to have 3 separate groups, each with 2 files:

group1:
hdfs://nn1:8020/accumulo/path/to/table1/tablet1/file1.rf
hdfs://nn1:8020/accumulo/path/to/table1/tablet2/file1.rf

group2:
hdfs://nn1:8021/accumulo/path/to/table1/tablet3/file1.rf
hdfs://nn1:8021/accumulo/path/to/table1/tablet4/file1.rf

group3:
hdfs://nn3:8020/accumulo/path/to/table1/tablet5/file1.rf
hdfs://nn3:8020/accumulo/path/to/table1/tablet6/file1.rf

This should be true, even if only hdfs://nn1:8020 were currently configured in instance.volumes, and nn2 and nn3 were previous volumes that are being decommissioned, but data still exists on them. This should also account for the volume replacements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Christopher — I am only making those volumes as a place for the rfiles to be held for the test. I cannot just place non-existent file entries into the metadata and then run the exportTable command: the WriteExportFiles class checks that the files exist. When I create local directories to hold the rfiles, the test passes — it creates the multiple distcp files and then deletes the directories it used for the test. Your concern that the output should be based on the entries in the metadata table is correct; this test setup is just a way to get some entries into the metadata table and then run the exportTable command.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's frustrating that the WriteExportFiles class requires the files to actually exist. That definitely limits the ways this can be tested as an integration test. But, we can still have good unit test coverage, if WriteExportFiles can be written with a package-private method that does the grouping of the input files to the designated output file, without the files existing. You might need to rewrite some of the implementation code to be more testable.

But, regardless of the test code, the implementation is still incorrect, in that it loops over instance.volumes, and looks for entries in the metadata table that matches them. What it should do is loop over the entire set of files found in the metadata table, and group them based on filesystem, regardless of what is set in instance.volumes. You can write a unit test for the grouping function that doesn't require a running system or real files to exist. You just pass a list of file names that could be seen in the metadata table, and verify the grouping function has properly grouped them by destination filename.

secondVolume = baseDir2Array[2];

// make second volume base directory
for (String element : baseDir2Array) {
baseDir2Str = baseDir2Str + "/" + element;
}
File baseDir2 = new File(baseDir2Str);

File v1f = new File(baseDir, "volumes/v1");
File v2f = new File(baseDir2, "volumes/v2");

v1 = new Path("file://" + v1f.getAbsolutePath());
v2 = new Path("file://" + v2f.getAbsolutePath());

// Run MAC on two locations in the local file system
cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2);

// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}

/**
 * Verifies that exporting a table whose files span two configured volumes produces a separate
 * distcp file per volume (named {@code distcp-<volumeName>.txt}) in the export directory.
 *
 * Relies on fields populated in {@code configureMiniCluster}: {@code originalVolume} /
 * {@code secondVolume} (the volume-name components used in the distcp file names) and
 * {@code v1} / {@code v2} (the two instance volume paths).
 */
@Test
public void testExportCommand() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
FileSystem fs = cluster.getFileSystem();

// Create a fresh, uniquely named table for this test run.
final String tableName = getUniqueNames(1)[0];
client.tableOperations().create(tableName);

// add splits to table so its data is spread across multiple tablets
SortedSet<Text> partitions = new TreeSet<>();
for (String s : row_numbers) {
partitions.add(new Text(s));
}
client.tableOperations().addSplits(tableName, partitions);

// Write 50k entries so the table has real data to persist.
try (BatchWriter bw = client.createBatchWriter(tableName)) {
for (int i = 1; i <= 50000; i++) {
Mutation m = new Mutation(Integer.toString(i));
m.put(Integer.toString(i), "", String.format("Entry number %d.", i));
bw.addMutation(m);
}
}

// Compact and flush so the written data is persisted to files before the export.
client.tableOperations().compact(tableName, null, null, true, true);
client.tableOperations().flush(tableName, null, null, true);

// Take the table offline, then export it into a temp directory under the cluster path.
Path outputDir = new Path(cluster.getTemporaryPath(), getClass().getName());
Path exportDir = new Path(outputDir, "export");
client.tableOperations().offline(tableName, true);
client.tableOperations().exportTable(tableName, exportDir.toString());

// Make sure the per-volume distcp files that exportTable creates exist, one per volume.
Path distcpOne = new Path(exportDir, "distcp-" + originalVolume + ".txt");
Path distcpTwo = new Path(exportDir, "distcp-" + secondVolume + ".txt");
assertTrue(fs.exists(distcpOne), "Distcp file doesn't exist for original volume");
assertTrue(fs.exists(distcpTwo), "Distcp file doesn't exist for second volume");

// Sanity-check the metadata table: every data-file entry for this table should reside on
// one of the two configured volumes. NOTE(review): the range "1".."1<" assumes the table
// id is "1" — TODO confirm, or derive the id via tableOperations instead of hard-coding.
try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
scanner.setRange(new Range("1", "1<"));
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);

for (Map.Entry<Key,Value> entry : scanner) {
boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString());
boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString());
assertTrue(inV1 || inV2);
}
}

// Register the test directories for deletion when the filesystem shuts down.
fs.deleteOnExit(v1);
fs.deleteOnExit(v2);
fs.deleteOnExit(outputDir);
}
}
}