Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.restrictions.*;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicLong;
import io.github.cdimascio.dotenv.Dotenv;
import org.apache.commons.csv.CSVParser;
Expand All @@ -36,8 +37,9 @@
import java.util.Map;
import java.util.Set;

@Command(name = "convert-csv", description = "Converts CSV file exported from Neo4j via 'apoc.export.csv.all' to Neptune Gremlin load data formatted CSV files, " +
"and optionally automates the bulk loading of the converted data into Amazon Neptune.")
@Command(name = "convert-csv", description = """
Converts CSV file exported from Neo4j via 'apoc.export.csv.all' to Neptune Gremlin load data formatted CSV files,
and optionally automates the bulk loading of the converted data into Amazon Neptune.""")
public class ConvertCsv implements Runnable {

// Neo4j CSV file conversion options
Expand Down Expand Up @@ -72,22 +74,29 @@ public class ConvertCsv implements Runnable {
@Once
private File inputFile;

@Option(name = {"--conversion-config"}, description = "Path to YAML file containing configuration for label mappings and record filtering")
@Option(name = {"--conversion-config"}, description =
"Path to YAML file containing configuration for label mappings and record filtering")
@Path(mustExist = true, kind = PathKind.FILE)
@Once
private File conversionConfigFile;

@Option(name = {"--node-property-policy"}, description = "Conversion policy for multi-valued node properties (default, 'PutInSetIgnoringDuplicates')")
@Option(name = {"--node-property-policy"}, description =
"Conversion policy for multi-valued node properties (default, 'PutInSetIgnoringDuplicates')")
@Once
@AllowedValues(allowedValues = {"LeaveAsString", "Halt", "PutInSetIgnoringDuplicates", "PutInSetButHaltIfDuplicates"})
private MultiValuedNodePropertyPolicy multiValuedNodePropertyPolicy = MultiValuedNodePropertyPolicy.PutInSetIgnoringDuplicates;
@AllowedValues(allowedValues =
{"LeaveAsString", "Halt", "PutInSetIgnoringDuplicates", "PutInSetButHaltIfDuplicates"})
private MultiValuedNodePropertyPolicy multiValuedNodePropertyPolicy =
MultiValuedNodePropertyPolicy.PutInSetIgnoringDuplicates;

@Option(name = {"--relationship-property-policy"}, description = "Conversion policy for multi-valued relationship properties (default, 'LeaveAsString')")
@Option(name = {"--relationship-property-policy"}, description =
"Conversion policy for multi-valued relationship properties (default, 'LeaveAsString')")
@Once
@AllowedValues(allowedValues = {"LeaveAsString", "Halt"})
private MultiValuedRelationshipPropertyPolicy multiValuedRelationshipPropertyPolicy = MultiValuedRelationshipPropertyPolicy.LeaveAsString;
private MultiValuedRelationshipPropertyPolicy multiValuedRelationshipPropertyPolicy =
MultiValuedRelationshipPropertyPolicy.LeaveAsString;

@Option(name = {"--semi-colon-replacement"}, description = "Replacement for semi-colon character in multi-value string properties (default, ' ')")
@Option(name = {"--semi-colon-replacement"}, description =
"Replacement for semi-colon character in multi-value string properties (default, ' ')")
@Once
@Pattern(pattern = "^[^;]*$", description = "Replacement string cannot contain a semi-colon.")
private String semiColonReplacement = " ";
Expand All @@ -97,45 +106,58 @@ public class ConvertCsv implements Runnable {
private boolean inferTypes = false;

// Neptune bulk load options
@Option(name = {"--bulk-load-config"}, description = "Path to YAML file containing configuration for enabling bulk load to Neptune. " +
"If provided, configuration values are loaded from this file first, then overridden by any CLI parameters specified.")
@Option(name = {"--bulk-load-config"}, description = """
Path to YAML file containing configuration for enabling bulk load to Neptune.
If provided, configuration values are loaded from this file first,
then overridden by any CLI parameters specified.""")
@Path(mustExist = true, kind = PathKind.FILE)
@Once
private File bulkLoadConfigFile;

@Option(name = {"--bucket-name"}, description = "S3 bucket name for CSV files to be stored. " +
"Overrides bucket-name from bulk-load-config file if both are provided.")
@Option(name = {"--bucket-name"}, description = """
S3 bucket name for CSV files to be stored.
Overrides bucket-name from bulk-load-config file if both are provided.""")
@Once
private String bucketName;

@Option(name = {"--s3-prefix"}, description = "S3 prefix for uploaded file. " +
"Overrides s3-prefix from bulk-load-config file if both are provided.")
@Option(name = {"--s3-prefix"}, description =
"S3 prefix for uploaded file. Overrides s3-prefix from bulk-load-config file if both are provided.")
@Once
private String s3Prefix;

@Option(name = {"--neptune-endpoint"}, description =
"Neptune cluster endpoint. Example: my-neptune-cluster.cluster-abc123.<region>.neptune.amazonaws.com. " +
"Overrides neptune-endpoint from bulk-load-config file if both are provided. " +
"Either this parameter or --bulk-load-config must be provided to enable bulk loading.")
@Option(name = {"--neptune-endpoint"}, description = """
Neptune cluster endpoint
Example: my-neptune-cluster.cluster-abc123.<region>.neptune.amazonaws.com.
Overrides neptune-endpoint from bulk-load-config file if both are provided.
Either this parameter or --bulk-load-config must be provided to enable bulk loading.""")
@Once
private String neptuneEndpoint;

@Option(name = {"--iam-role-arn"}, description = "IAM role ARN for Neptune bulk loading. It will need S3 and Neptune access permissions. " +
"Overrides iam-role-arn from bulk-load-config file if both are provided. \n" +
"Refer to the following documentation for the specific policies/permissions required:\n" + //
"https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM-CreateRole.html\n" + //
"https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM-add-role-cluster.html")
@Option(name = {"--neptune-port"}, description = """
Port number to the Neptune cluster endpoint (default: 8182).
Overrides neptune-port from bulk-load-config file if both are provided.""")
@Once
private String neptunePort;

@Option(name = {"--iam-role-arn"}, description = """
IAM role ARN for Neptune bulk loading. It will need S3 and Neptune access permissions.
Overrides iam-role-arn from bulk-load-config file if both are provided.
Refer to the following documentation for the specific policies/permissions required:
https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM-CreateRole.html
https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM-add-role-cluster.html""")
@Once
private String iamRoleArn;

@Option(name = {"--parallelism"}, description = "Parallelism level for Neptune bulk loading (default: OVERSUBSCRIBE). " +
"Overrides parallelism from bulk-load-config file if both are provided.")
@Option(name = {"--parallelism"}, description = """
Parallelism level for Neptune bulk loading (default: OVERSUBSCRIBE).
Overrides parallelism from bulk-load-config file if both are provided.""")
@Once
@AllowedValues(allowedValues = {"LOW", "MEDIUM", "HIGH", "OVERSUBSCRIBE"})
private String parallelism;

@Option(name = {"--monitor"}, description = "Monitor Neptune bulk load progress until completion (default: false). " +
"Overrides monitor from bulk-load-config file if both are provided.")
@Option(name = {"--monitor"}, description = """
Monitor Neptune bulk load progress until completion (default: false).
Overrides monitor from bulk-load-config file if both are provided.""")
@Once
private boolean monitor;

Expand All @@ -155,7 +177,9 @@ public void run() {

// if no input file provided, it is via streaming
if (input == null) {
String uriInput, usernameInput, passwordInput;
String uriInput;
String usernameInput;
String passwordInput;
if (envFile != null) {
Dotenv dotenv = Dotenv.configure()
.directory(envFile.getParent())
Expand All @@ -169,7 +193,8 @@ public void run() {
usernameInput = username;
passwordInput = password;
}
try (Neo4jStreamWriter writer = new Neo4jStreamWriter(uriInput, usernameInput, passwordInput, directories)) {
try (Neo4jStreamWriter writer =
new Neo4jStreamWriter(uriInput, usernameInput, passwordInput, directories)) {
tempDataFile = writer.streamToFile();
}

Expand Down Expand Up @@ -199,8 +224,8 @@ public void run() {
if (bulkLoadConfig != null) {
try (NeptuneBulkLoader neptuneBulkLoader = new NeptuneBulkLoader(bulkLoadConfig)) {

String uri = directories.outputDirectory().toFile().getAbsolutePath();
String s3SourceUri = neptuneBulkLoader.uploadCsvFilesToS3(uri);
String convertedOutputDirectory = directories.outputDirectory().toFile().getAbsolutePath();
String s3SourceUri = neptuneBulkLoader.uploadCsvFilesToS3(convertedOutputDirectory);
String loadId = neptuneBulkLoader.startNeptuneBulkLoad(s3SourceUri);

if (bulkLoadConfig.isMonitor()) {
Expand All @@ -220,7 +245,7 @@ public void run() {
* @throws IllegalArgumentException if bulk loading is requested but configuration is invalid
* @throws IOException if there's an error reading the bulk load config file
*/
private BulkLoadConfig readBulkLoadConfig() throws Exception {
private BulkLoadConfig readBulkLoadConfig() throws IllegalArgumentException, IOException {
if (bulkLoadConfigFile == null && neptuneEndpoint == null) {
return null; // No bulk loading requested
}
Expand Down Expand Up @@ -275,10 +300,10 @@ private void processCsvInTwoPasses(File input, OutputFile vertexFile, OutputFile
conversionConfig);

while (iterator.hasNext()) {
CSVRecord record = iterator.next();
if (vertexMetadata.isVertex(record)) {
CSVRecord csvRecord = iterator.next();
if (vertexMetadata.isVertex(csvRecord)) {
processVertex(vertexFile, vertexIdMap, skippedVertexIds, vertexCount, skippedVertexCount,
vertexMetadata, record);
vertexMetadata, csvRecord);
}
}

Expand Down Expand Up @@ -306,9 +331,9 @@ private void processCsvInTwoPasses(File input, OutputFile vertexFile, OutputFile
vertexIdMap);

while (iterator.hasNext()) {
CSVRecord record = iterator.next();
if (edgeMetadata.isEdge(record)) {
processEdge(edgeFile, edgeCount, skippedEdgeCount, edgeMetadata, record);
CSVRecord csvRecord = iterator.next();
if (edgeMetadata.isEdge(csvRecord)) {
processEdge(edgeFile, edgeCount, skippedEdgeCount, edgeMetadata, csvRecord);
}
}

Expand All @@ -320,40 +345,43 @@ private void processCsvInTwoPasses(File input, OutputFile vertexFile, OutputFile
printStatistics(conversionConfig, vertexCount, skippedVertexCount, edgeCount, skippedEdgeCount);
}

private void processEdge(OutputFile edgeFile, AtomicLong edgeCount,
AtomicLong skippedEdgeCount, EdgeMetadata edgeMetadata, CSVRecord record) {
private void processEdge(OutputFile edgeFile, AtomicLong edgeCount, AtomicLong skippedEdgeCount,
EdgeMetadata edgeMetadata, CSVRecord csvRecord) throws UncheckedIOException {

edgeMetadata.toIterable(record).ifPresentOrElse(it -> {
edgeMetadata.toIterable(csvRecord).ifPresentOrElse(currEdgeRecord -> {
try {
edgeFile.printRecord(it);
edgeFile.printRecord(currEdgeRecord);
edgeCount.incrementAndGet();
} catch (IOException e) {
throw new RuntimeException(e);
e.printStackTrace();
throw new UncheckedIOException(e);
}
}, skippedEdgeCount::getAndIncrement);
}

private void processVertex(OutputFile vertexFile, Map<String, String> vertexIdMap, Set<String> skippedVertexIds,
AtomicLong vertexCount, AtomicLong skippedVertexCount, VertexMetadata vertexMetadata, CSVRecord record) {
vertexMetadata.toIterable(record).ifPresentOrElse(it -> {
private void processVertex(OutputFile vertexFile, Map<String, String> vertexIdMap,
Set<String> skippedVertexIds, AtomicLong vertexCount, AtomicLong skippedVertexCount,
VertexMetadata vertexMetadata, CSVRecord csvRecord) throws UncheckedIOException {
vertexMetadata.toIterable(csvRecord).ifPresentOrElse(currVertexRecord -> {
try {
vertexFile.printRecord(it);
vertexCount.incrementAndGet();

// Store mapping between original and transformed IDs
String originalId = record.get(0);
// Get the transformed ID from the vertex metadata
String transformedId = vertexMetadata.getVertexIdMap().get(originalId);
if (transformedId != null) {
vertexIdMap.put(originalId, transformedId);
}
vertexFile.printRecord(currVertexRecord);
} catch (IOException e) {
throw new RuntimeException("Error processing edge record: " + record, e);
e.printStackTrace();
throw new UncheckedIOException(e);
}
vertexCount.incrementAndGet();

// Store mapping between original and transformed IDs
String originalId = csvRecord.get(0);
// Get the transformed ID from the vertex metadata
String transformedId = vertexMetadata.getVertexIdMap().get(originalId);
if (transformedId != null) {
vertexIdMap.put(originalId, transformedId);
}
}, () -> {
// Record was skipped
skippedVertexCount.incrementAndGet();
skippedVertexIds.add(record.get(0));
skippedVertexIds.add(csvRecord.get(0));
});
}

Expand Down
Loading