DataLoader.java
package org.finos.orr;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
/**
 * Spark application to load data from a directory into a Spark table.
 * The application takes three arguments:
 * - output-table: The name of the table to write to. This is also used as the name of the Spark application.
 * - input-path: The path to the directory containing the data files.
 * - catalog.schema: The name of the database/schema to use.
 *
 * For example:
 * ["reportable_event",
 *  "/Volumes/opensource_reg_reporting/orr/cdm-trades/",
 *  "opensource_reg_reporting.orr"]
 */
public class DataLoader {

    /**
     * Main method for the Spark application.
     * @param args Command line arguments.
     * @throws IOException If an error occurs while reading the input files.
     */
    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.out.println("Usage: <output-table> <input-path> <catalog.schema>");
            System.out.println("""
                    Example: ["reportable_event",
                    "/Volumes/opensource_reg_reporting/orr/cdm-trades/",
                    "opensource_reg_reporting.orr"]
                    """);
            return;
        }

        // Read command line arguments
        String outputTable = args[0];
        String inputPath = args[1];
        String databaseName = args[2];

        // Create a Spark session
        SparkSession spark = SparkSession.builder().appName(outputTable).getOrCreate();

        // Create a list of rows from the input files
        List<Row> rows = createRows(inputPath);

        // Define the schema for the table
        StructType schema = new StructType(new StructField[]{
                new StructField("identifier", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("name", DataTypes.StringType, false, Metadata.empty()),
                new StructField("data", DataTypes.StringType, false, Metadata.empty())
        });

        // Create a DataFrame from the list of rows and the schema
        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // Parse the JSON data in the "data" column
        df = df.withColumn("data", functions.parse_json(df.col("data")));

        // Write the DataFrame to the output table
        df.write().mode(SaveMode.Overwrite).saveAsTable("%s.%s".formatted(databaseName, outputTable));
    }
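
    // A minimal sketch of reading the loaded table back, assuming the runtime
    // exposes Spark's variant functions (the "$.eventIdentifier" path below is a
    // hypothetical field name, used only for illustration):
    //
    //   Dataset<Row> loaded = spark.table("opensource_reg_reporting.orr.reportable_event");
    //   loaded.selectExpr("identifier", "name",
    //           "variant_get(data, '$.eventIdentifier', 'string') AS event_id")
    //         .show();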
    /**
     * Creates a list of rows from the input files.
     * @param inputPath The path to the directory containing the input files.
     * @return A list of rows.
     * @throws IOException If an error occurs while reading the input files.
     */
    private static List<Row> createRows(String inputPath) throws IOException {
        List<Row> rows = new ArrayList<>();
        // Iterate over the files in the input directory
        try (Stream<Path> walk = Files.walk(Path.of(inputPath))) {
            List<Path> list = walk.filter(Files::isRegularFile).toList();
            for (int i = 0; i < list.size(); i++) {
                Path p = list.get(i);
                // Create a row for each file
                rows.add(RowFactory.create(i, p.getFileName().toString(), Files.readString(p)));
            }
        }
        return rows;
    }
}
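
// A minimal sketch of how this job might be submitted with spark-submit; the jar
// path and any cluster configuration are assumptions for illustration, not part of
// this repository. The three arguments mirror the example in the class Javadoc:
//
//   spark-submit \
//     --class org.finos.orr.DataLoader \
//     path/to/app.jar \
//     reportable_event \
//     /Volumes/opensource_reg_reporting/orr/cdm-trades/ \
//     opensource_reg_reporting.orr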