diff --git a/CHANGELOG.md b/CHANGELOG.md
index 78bd9296..f2dd035d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,62 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html)
+## [0.1.91] - 2026-03-05
+Upgrade DataPull core to Spark 3.5.0, Scala 2.12, Java 11
+
+Dependency upgrades (pom.xml):
+- Spark 2.4.6 -> 3.5.0, Scala 2.11 -> 2.12, Hadoop 2.10.1 -> 3.2.1
+- MongoDB Spark Connector 10.4.0 (new format API)
+- Cassandra Spark Connector 3.5.0 (driver 4.x)
+- Iceberg 1.5.0 (new platform support)
+- Elasticsearch 8.17.10 with REST clients
+- mssql-jdbc 11.2.3.jre11, ojdbc8 21.9.0.0, terajdbc4 17.20.00.12
+- ABRiS 6.4.0, Snowflake 2.10.0, PostgreSQL 42.6.0
+- Guava shade plugin for Spark 3.5 classloader compatibility
+- Log4j 2.17.1 (CVE-2021-44228 fix retained)
+- Added expediahotelloader with Spark/Scala exclusions
+
+MongoDB modernization (DataFrameFromTo.scala; see sketch below):
+- Replaced MongoSpark/ReadConfig/WriteConfig with format("mongodb") API
+- Replaced MongoClient/MongoClientURI with MongoClients.create()
+- Updated mongodbToDataFrame, dataFrameToMongodb, mongoRunCommand
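+
+A minimal sketch of the connector 10.x read/write path, using the same option keys as the updated code (URI, database, and collection names are illustrative):
+
+```scala
+// Read: connector 10.x replaces MongoSpark/ReadConfig with a plain DataSource format
+val df = sparkSession.read.format("mongodb")
+  .option("spark.mongodb.read.connection.uri", uri)
+  .option("database", "mydb")
+  .option("collection", "mycoll")
+  .load()
+
+// Write: WriteConfig is gone; options go straight on the DataFrameWriter
+df.write.format("mongodb")
+  .option("spark.mongodb.write.connection.uri", uri)
+  .mode("append")
+  .save()
+```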
+
+New Iceberg support (DataFrameFromTo.scala, Migration.scala; see MERGE INTO sketch below):
+- Added dataFrameToIceberg with MERGE INTO SQL support
+- Added icebergToDataFrame for SQL-based reads
+- Added iceberg as source/destination platform in Migration
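+
+A sketch of the MERGE INTO path in dataFrameToIceberg: the incoming DataFrame is registered as the source view named in the user-supplied SQL, then the SQL runs as an upsert (table and column names are illustrative):
+
+```scala
+df.createOrReplaceTempView("source_view")
+sparkSession.sql(
+  """MERGE INTO catalog.db.target target
+    |USING source_view source
+    |ON target.id = source.id
+    |WHEN MATCHED THEN UPDATE SET *
+    |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+```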
+
+Spark 3.5 compatibility (DataPull.scala; see Uuids sketch below):
+- Cassandra UUIDs -> Uuids (driver 4.x)
+- Binary type from org.bson.types.Binary
+- DataPull object extends Serializable
+- Hive caseSensitiveInferenceMode: INFER_ONLY -> NEVER_INFER
+- Hive metastore.version config commented out
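+
+The Cassandra rename is mechanical; a minimal sketch of the driver 4.x call behind the uuid() UDF:
+
+```scala
+import com.datastax.oss.driver.api.core.uuid.Uuids
+
+// driver 3.x equivalent: com.datastax.driver.core.utils.UUIDs.timeBased()
+val timeUuid: String = Uuids.timeBased().toString
+```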
+
+IMDSv2 and MSSQL fixes (Helper.scala; see IMDSv2 sketch below):
+- Added imdsv2Token() for EC2 metadata token-based access
+- Updated GetEC2pkcs7() and GetEC2Role() with IMDSv2 headers
+- MSSQL JDBC URL: added encrypt and trustServerCertificate params
+- Commented out URI logging to prevent credential exposure
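+
+A self-contained sketch of the IMDSv2 handshake (PUT for a session token, then the token as a header on metadata GETs); imdsCall here is a hypothetical stand-in for Helper.getHttpResponse:
+
+```scala
+import java.net.{HttpURLConnection, URL}
+import scala.io.Source
+
+def imdsCall(url: String, method: String, headers: Map[String, String]): String = {
+  val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
+  conn.setRequestMethod(method)
+  headers.foreach { case (k, v) => conn.setRequestProperty(k, v) }
+  if (method.equalsIgnoreCase("PUT")) conn.setDoOutput(true) // token endpoint requires PUT
+  Source.fromInputStream(conn.getInputStream).mkString
+}
+
+val token = imdsCall("http://169.254.169.254/latest/api/token", "PUT",
+  Map("X-aws-ec2-metadata-token-ttl-seconds" -> "21600"))
+val role = imdsCall("http://169.254.169.254/latest/meta-data/iam/security-credentials/",
+  "GET", Map("X-aws-ec2-metadata-token" -> token))
+```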
+
+All homeaway bug fixes preserved (v0.1.83-0.1.90):
+- ConcurrentHashMap for thread-safe stepPipelineMap
+- Subnet NULL/invalid validation with default pool fallback
+- Credentials display fix, duplicate tags fix
+- Default 'datapullemr' application tag
+- url parameter in RDBMS methods, KMS encryption support
+- setExternalSparkConf, ReplaceInlineExpressions
+
+### Changed
+core/pom.xml
+core/src/main/scala/core/DataFrameFromTo.scala
+core/src/main/scala/core/DataPull.scala
+core/src/main/scala/core/Migration.scala
+core/src/main/scala/core/Controller.scala
+core/src/main/scala/helper/Helper.scala
+core/src/main/resources/Samples/Input_Json_Specification.json
+
+
## [0.1.90] - 2026-02-16
Addressing "key not found: url" Exception
### Changed
diff --git a/api/pom.xml b/api/pom.xml
index ed614a3a..9188379f 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -51,7 +51,7 @@
org.projectlombok
lombok
- 1.16.20
+ 1.18.42
com.fasterxml.jackson.core
diff --git a/api/src/main/java/com/homeaway/datapullclient/config/EMRProperties.java b/api/src/main/java/com/homeaway/datapullclient/config/EMRProperties.java
index 81a27ce2..ebf29c2e 100644
--- a/api/src/main/java/com/homeaway/datapullclient/config/EMRProperties.java
+++ b/api/src/main/java/com/homeaway/datapullclient/config/EMRProperties.java
@@ -48,10 +48,10 @@ public class EMRProperties {
@Value("${instance_count:6}")
private int instanceCount;
- @Value("${master_type:m6g.xlarge}")
+ @Value("${master_type:m7g.xlarge}")
private String masterType;
- @Value("${slave_type:m6g.xlarge}")
+ @Value("${slave_type:m7g.xlarge}")
private String slaveType;
@Value("${service_role:emr_datapull_role}")
@@ -60,7 +60,7 @@ public class EMRProperties {
@Value("${job_flow_role:emr_ec2_datapull_role}")
private String jobFlowRole;
- @Value("${emr_release:emr-5.36.2}")
+ @Value("${emr_release:emr-7.12.0}")
private String emrRelease;
@Value( "${bootstrap_folder_path:}" )
diff --git a/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java b/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
index 5af17ffb..7d0a0c49 100644
--- a/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
+++ b/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
@@ -258,7 +258,7 @@ private void runDataPull(String json, boolean isStart, boolean validateJson) thr
log.debug("runDataPull <- return");
}
- List<String> rotateSubnets(){
+ synchronized List<String> rotateSubnets(){
if(subnets.isEmpty()){
subnets= getSubnet();
@@ -268,7 +268,8 @@ List rotateSubnets(){
subnets.clear();
subnets.addAll(subnetIds_shuffled);
}
- return subnets;
+ subnets.removeIf(s -> StringUtils.isBlank(s));
+ return new ArrayList<>(subnets);
}
Map> getStepForPipeline(){
@@ -279,7 +280,9 @@ public List<String> getSubnet(){
List<String> subnetIds = new ArrayList<>();
- subnetIds.add(dataPullProperties.getApplicationSubnet1());
+ if (StringUtils.isNotBlank(dataPullProperties.getApplicationSubnet1())) {
+ subnetIds.add(dataPullProperties.getApplicationSubnet1());
+ }
if (StringUtils.isNotBlank(dataPullProperties.getApplicationSubnet2())) {
subnetIds.add(dataPullProperties.getApplicationSubnet2());
diff --git a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
index e45ddcce..33607dd0 100644
--- a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
+++ b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
@@ -31,6 +31,8 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+
+
@Slf4j
@Data
public class DataPullTask implements Runnable {
@@ -205,7 +207,27 @@ private List<String> prepareSparkSubmitParams(final String SparkSubmitParams) {
final List<String> sparkSubmitParamsList = new ArrayList<>();
String[] sparkSubmitParamsArray = null;
if (SparkSubmitParams != "") {
- sparkSubmitParamsArray = SparkSubmitParams.split("\\s+");
+ // Fix 1: Upgrade Scala 2.11 packages to 2.12 for Spark 3.5 compatibility
+ String normalizedParams = SparkSubmitParams.replaceAll("_2\\.11:[^,\\s]+", "_2.12:3.5.0");
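+ // Illustrative input/output of the normalization above:
+ //   in : "--packages org.apache.spark:spark-avro_2.11:2.4.4 --class Main app.jar"
+ //   out: "--packages org.apache.spark:spark-avro_2.12:3.5.0 --class Main app.jar"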
+
+ // Fix 2: If writeLegacyFormat=true is present (INT96 timestamps), inject rebase modes
+ // to prevent SparkUpgradeException on Spark 3.2+. Only inject if not already set.
+ // Must insert before --class so the flags are not passed as app args to main().
+ if (normalizedParams.contains("spark.sql.parquet.writeLegacyFormat=true")
+ && !normalizedParams.contains("spark.sql.parquet.int96RebaseModeInWrite")) {
+ String rebaseConfs = " --conf spark.sql.parquet.int96RebaseModeInWrite=LEGACY"
+ + " --conf spark.sql.parquet.int96RebaseModeInRead=LEGACY";
+ int classIdx = normalizedParams.indexOf(" --class ");
+ if (classIdx >= 0) {
+ normalizedParams = normalizedParams.substring(0, classIdx)
+ + rebaseConfs
+ + normalizedParams.substring(classIdx);
+ } else {
+ normalizedParams = normalizedParams + rebaseConfs;
+ }
+ }
+
+ sparkSubmitParamsArray = normalizedParams.split("\\s+");
sparkSubmitParamsList.add("spark-submit");
@@ -224,7 +246,7 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
sparkSubmitParamsList = (ArrayList) this.prepareSparkSubmitParams(sparkSubmitParams);
} else {
List<String> sparkBaseParams = new ArrayList<>();
- sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4", "--class", DataPullTask.MAIN_CLASS, jarPath}));
+ sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0", "--class", DataPullTask.MAIN_CLASS, jarPath}));
sparkSubmitParamsList.addAll(sparkBaseParams);
}
@@ -279,7 +301,19 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
Map<String, String> hdfsProperties = this.clusterProperties.getHdfsProperties();
- Map<String, String> sparkDefaultsProperties = this.clusterProperties.getSparkDefaultsProperties();
+ // Safety defaults for Spark 3.x compatibility.
+ // Pipeline-level spark_defaults_properties override these if explicitly set.
+ Map<String, String> sparkDefaultsProperties = new HashMap<>();
+ sparkDefaultsProperties.put("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY");
+ sparkDefaultsProperties.put("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY");
+ // Spark 3.x changed timeParserPolicy default to CORRECTED: to_date(col) without a format
+ // string silently returns NULL for non-standard date formats. LEGACY restores Spark 2.x
+ // lenient parsing so existing sql.query expressions continue to produce correct results.
+ sparkDefaultsProperties.put("spark.sql.legacy.timeParserPolicy", "LEGACY");
+ // Spark 3.x changed storeAssignmentPolicy default from LEGACY to ANSI: implicit type casts
+ // on Hive writes (e.g. INT -> BIGINT) that worked in Spark 2.x now throw AnalysisException.
+ sparkDefaultsProperties.put("spark.sql.storeAssignmentPolicy", "LEGACY");
+ sparkDefaultsProperties.putAll(this.clusterProperties.getSparkDefaultsProperties());
Map<String, String> sparkEnvProperties = this.clusterProperties.getSparkEnvProperties();
Map<String, String> sparkMetricsProperties = this.clusterProperties.getSparkMetricsProperties();
@@ -409,9 +443,9 @@ private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProper
.withInstanceTypeConfigs(workerInstanceTypeConfig)
.withTargetOnDemandCapacity(count);
- System.out.println("Printing random subnet : " + subnets);
+ log.info("Printing random subnet : {}", subnets);
- System.out.println("Printing selected subnet-ID for EMR cluster creation : " + subnets.get(0));
+ log.info("Printing selected subnet-ID for EMR cluster creation : {}", subnets.isEmpty() ? "N/A" : subnets.get(0));
final String masterSG = emrProperties.getEmrSecurityGroupMaster();
final String slaveSG = emrProperties.getEmrSecurityGroupSlave();
@@ -423,10 +457,6 @@ private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProper
final String serviceAccessSecurityGroup = Objects.toString(
this.clusterProperties.getServiceAccessSecurityGroup(), serviceAccesss != null ? serviceAccesss : "");
- if(StringUtils.isNotBlank(clusterProperties.getSubnetId())){
- subnets.add(0,clusterProperties.getSubnetId());
- }
-
// Introducing below logic to address null and invalid subnet issue
String getSubnetId = clusterProperties.getSubnetId();
String finalSubnetId;
@@ -434,20 +464,20 @@ private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProper
if (StringUtils.isNotBlank(getSubnetId) && getSubnetId.startsWith("subnet-")) {
finalSubnetId = getSubnetId;
- System.out.println("Subnet '" + finalSubnetId + "' provided by the user will be used for EMR cluster creation.");
+ log.info("Subnet '{}' provided by the user will be used for EMR cluster creation.", finalSubnetId);
} else {
if (StringUtils.isNotBlank(getSubnetId)) {
- System.out.println("The user provided an invalid value '" + getSubnetId + "' for subnet. Hence, default subnet pool will be used for EMR creation.");
+ log.warn("The user provided an invalid value '{}' for subnet. Hence, default subnet pool will be used for EMR creation.", getSubnetId);
} else {
- System.out.println("The user either provided a NULL value for the subnet or did not specify subnet in the payload. Hence, the default subnet pool will be used for EMR creation.");
+ log.info("The user either provided a NULL value for the subnet or did not specify subnet in the payload. Hence, the default subnet pool will be used for EMR creation.");
}
- Set<String> subnetsDeduped = new LinkedHashSet<>(subnets);
- subnets.clear();
- subnets.addAll(subnetsDeduped);
+ if (subnets.isEmpty()) {
+ throw new IllegalStateException("No valid subnets available in the default subnet pool. Please configure at least one application_subnet property.");
+ }
finalSubnetId = subnets.get(0);
- System.out.println("EMR cluster will be created using a subnet from the default subnet pool: " + finalSubnetId);
+ log.info("EMR cluster will be created using a subnet from the default subnet pool: {}", finalSubnetId);
}
final JobFlowInstancesConfig jobConfig = new JobFlowInstancesConfig()
@@ -496,7 +526,7 @@ private void runTaskOnExistingCluster(final String id, final String jarPath, fin
sparkSubmitParamsListOnExistingCluster = this.prepareSparkSubmitParams(sparkSubmitParams);
} else {
List<String> sparkBaseParams = new ArrayList<>();
- sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4", "--class", DataPullTask.MAIN_CLASS, jarPath}));
+ sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0", "--class", DataPullTask.MAIN_CLASS, jarPath}));
sparkSubmitParamsListOnExistingCluster.addAll(sparkBaseParams);
}
diff --git a/core/pom.xml b/core/pom.xml
index 73244d99..0f1102d9 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -9,11 +9,14 @@
1.0-SNAPSHOT
- 2.11
- 2.4.6
- 2.4
- 3.2.0
- 2.10.1
+ 2.12
+ 2.12.18
+ 3.5.0
+ 4.0.0
+ 3.4.2
+ 5.5.1
+ 6.4.0
+ 3.2
1.11.1019
@@ -42,10 +45,41 @@
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.3
+
+
+ package
+
+ shade
+
+
+
+
+ com.google.guava:*
+
+
+ false
+
+
+ com.google.common
+ repackaged.com.google.common
+
+
+ true
+
+
+
+
net.alchim31.maven
scala-maven-plugin
- 3.2.2
+ 4.8.1
+
+ ${scala.full.version}
+
@@ -59,7 +93,7 @@
org.apache.maven.plugins
maven-jar-plugin
- ${maven.plugin.version}
+ 3.0.2
@@ -73,7 +107,7 @@
org.apache.maven.plugins
maven-assembly-plugin
- ${maven.plugin.version}
+ 3.0.0
@@ -102,27 +136,33 @@
za.co.absa
- abris_${scala.binary.version}
- 4.2.0
+ abris_${scala.version}
+ ${abris.version}
-
+
com.datastax.spark
- spark-cassandra-connector_${scala.binary.version}
- 2.4.3
+ spark-cassandra-connector_2.12
+ 3.5.0
-
+
io.netty
netty-all
+
+ com.github.jnr
+ jnr-posix
+ 3.1.20
+
+
com.microsoft.sqlserver
mssql-jdbc
- 6.1.0.jre8
+ 11.2.3.jre11
@@ -131,24 +171,30 @@
1.3.1
+
+ com.google.guava
+ guava
+ 28.1-jre
+
+
org.postgresql
postgresql
- 42.2.1
+ 42.6.0
org.scalatest
- scalatest_${scala.binary.version}
+ scalatest_${scala.version}
3.2.0-SNAP10
test
org.scalacheck
- scalacheck_${scala.binary.version}
+ scalacheck_${scala.version}
1.14.0
test
@@ -170,13 +216,14 @@
mysql
mysql-connector-java
- [8.0.16,)
+ 8.0.26
org.influxdb
influxdb-java
2.14
+
com.amazonaws
aws-java-sdk-s3
@@ -191,11 +238,11 @@
org.apache.spark
- spark-core_${scala.binary.version}
+ spark-core_${scala.version}
${spark.version}
provided
-
+
io.netty
netty-all
@@ -204,53 +251,56 @@
org.apache.spark
- spark-sql_${scala.binary.version}
+ spark-sql_${scala.version}
${spark.version}
-
+ provided
org.apache.spark
- spark-sql-kafka-0-10_${scala.binary.version}
+ spark-sql-kafka-0-10_${scala.version}
${spark.version}
provided
-
-
+
+
+ org.apache.spark
+ spark-catalyst_${scala.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-common-utils_${scala.version}
+ ${spark.version}
+ provided
+
+
org.mongodb.spark
- mongo-spark-connector_${scala.binary.version}
- 2.4.2
+ mongo-spark-connector_2.12
+ 10.4.0
org.apache.hadoop
hadoop-client
- ${hadoop.version}
+ 3.2.1
provided
org.apache.hadoop
hadoop-aws
- ${hadoop.version}
-
-
- org.apache.hadoop
- hadoop-common
- ${hadoop.version}
+ 3.2.1
+
org.apache.httpcomponents
httpclient
4.5.12
-
- com.databricks
- spark-csv_${scala.binary.version}
- 1.3.0
-
io.netty
netty-all
@@ -259,15 +309,15 @@
org.apache.spark
- spark-avro_${scala.binary.version}
+ spark-avro_${scala.version}
${spark.version}
provided
org.json4s
- json4s-jackson_${scala.binary.version}
- 3.7.0-M16
+ json4s-jackson_${scala.version}
+ 3.7.0-M4
@@ -276,62 +326,98 @@
1.6.2
+
+ com.sun.mail
+ javax.mail
+ 1.6.2
+
+
org.apache.kafka
kafka-clients
- 0.11.0.1
+ 2.8.1
+
com.googlecode.json-simple
json-simple
1.1.1
+
+
+ neo4j-contrib
+ neo4j-spark-connector
+ 2.1.0-M4
+
+
com.typesafe
config
- 1.2.1
+ 1.4.2
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-client
+ 8.17.10
-
org.elasticsearch
- elasticsearch-spark-20_${scala.binary.version}
- 8.11.1
+ elasticsearch
+ 8.17.10
+
+
+ org.elasticsearch
+ elasticsearch-spark-30_2.12
+ 8.17.10
-
commons-httpclient
commons-httpclient
3.1
+
org.mongodb
- mongo-java-driver
- 3.8.2
+ mongodb-driver-sync
+ 4.10.1
+
org.mongodb.scala
- mongo-scala-bson_${scala.binary.version}
- 2.4.2
+ mongo-scala-bson_${scala.version}
+ 4.10.1
-
org.mongodb
bson
- 3.8.2
-
-
-
- org.mongodb.scala
- mongo-scala-driver_${scala.binary.version}
- 2.4.2
+ 4.10.1
com.springml
- spark-sftp_${scala.binary.version}
+ spark-sftp_2.11
1.1.3
+
+
+ com.databricks
+ spark-avro_2.11
+
+
+ com.databricks
+ spark-xml_2.11
+
+
+
+ org.apache.spark
+ *
+
+
+ org.scala-lang
+ *
+
+
@@ -342,19 +428,15 @@
org.scala-lang.modules
- scala-xml_${scala.binary.version}
+ scala-xml_${scala.version}
1.3.0
-
- com.google.guava
- guava
- 28.1-jre
-
+
net.snowflake
- spark-snowflake_${scala.binary.version}
- 2.8.2-spark_${spark.minor.version}
+ spark-snowflake_${scala.version}
+ 2.10.0-spark_${spark.minor.version}
net.snowflake
@@ -362,17 +444,62 @@
3.12.13
+
+
+ org.apache.iceberg
+ iceberg-spark-runtime-3.5_2.12
+ 1.5.0
+
+
com.fasterxml.jackson.dataformat
jackson-dataformat-yaml
2.12.3
-
+
org.codehaus.jettison
jettison
1.4.1
+
+
+ com.homeaway.datatools
+ expediahotelloader
+ 1.0-SNAPSHOT
+
+
+ net.java.dev.jets3t
+ jets3t
+
+
+
+ org.apache.spark
+ *
+
+
+ org.scala-lang
+ *
+
+
+ com.oracle.jdbc
+ ojdbc7
+
+
+
+
+
+ com.oracle.database.jdbc
+ ojdbc8
+ 21.9.0.0
+
+
+
+ com.teradata.jdbc
+ terajdbc4
+ 17.20.00.12
+
+
diff --git a/core/src/main/resources/Samples/Input_Json_Specification.json b/core/src/main/resources/Samples/Input_Json_Specification.json
index 4901b092..e645757c 100644
--- a/core/src/main/resources/Samples/Input_Json_Specification.json
+++ b/core/src/main/resources/Samples/Input_Json_Specification.json
@@ -291,6 +291,26 @@
"fs.s3.connection.ssl.enabled": "true"
}
},
+ "source_iceberg": {
+ "platform": "iceberg",
+ "comment_query": "Mandatory, the provided query will be executed as sql in Spark Session to read from Iceberg table",
+ "query": "select * from catalog.db.table"
+ },
+ "destination_iceberg": {
+ "platform": "iceberg",
+ "comment_table": "Mandatory, the fully qualified Iceberg table name",
+ "table": "catalog.db.table",
+ "comment_database": "Mandatory, the database/namespace of the Iceberg table",
+ "database": "catalog.db",
+ "comment_savemode": "Optional, defaults to append. Options: append, overwrite",
+ "savemode": "append",
+ "comment_ismergeinto": "Optional, defaults to false. Set to true to use MERGE INTO SQL",
+ "ismergeinto": "false",
+ "comment_mergeintosql": "Optional, required when ismergeinto is true. Must start with 'MERGE INTO'",
+ "mergeintosql": "MERGE INTO catalog.db.target target USING source_view source ON target.id = source.id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *",
+ "comment_sparkoptions": "Optional, additional spark options as key:value pairs. e.g. overwrite-mode:dynamic",
+ "sparkoptions": "overwrite-mode:dynamic"
+ },
"source_kafka": {
"platform": "kafka",
"comment_bootstrapservers": "comma-separated list of bootstrap servers for the kafka cluster",
diff --git a/core/src/main/scala/core/Controller.scala b/core/src/main/scala/core/Controller.scala
index ecf1c831..1ed0fbca 100644
--- a/core/src/main/scala/core/Controller.scala
+++ b/core/src/main/scala/core/Controller.scala
@@ -22,9 +22,11 @@ import java.util.concurrent.Executors
import com.amazonaws.services.simpleemail.model.{Body, Content, Destination}
import config.AppConfig
-import javax.mail._
import helper._
+
+import javax.mail._
import javax.mail.internet.{InternetAddress, MimeMessage}
+
import logging._
import org.codehaus.jettison.json.JSONArray
import security._
@@ -94,7 +96,7 @@ class Controller(appConfig: AppConfig, pipeline: String) {
val migration = new Migration()
def runner(migrationJsonString: String) = Future {
- migration.migrate(migrationJsonString, reportEmailAddress, poolId, verifymigration, reportCounts, no_of_retries, custom_retries, jobId, sparkSession.sparkContext.isLocal, preciseCounts, appConfig, pipelineName)
+ migration.migrate(migrationJsonString, reportEmailAddress, poolId, verifymigration, reportCounts, no_of_retries, custom_retries, jobId, sparkSession, preciseCounts, appConfig, pipelineName)
}
try {
@@ -123,7 +125,7 @@ class Controller(appConfig: AppConfig, pipeline: String) {
breakableLoop.breakable {
for (i <- 0 to migrations.length() - 1) {
var migration = new Migration()
- val migrationResult: Map[String, String] = migration.migrate(migrations.getJSONObject(i).toString(), reportEmailAddress, i.toString, verifymigration, reportCounts, no_of_retries, custom_retries, jobId, sparkSession.sparkContext.isLocal, preciseCounts, appConfig, pipelineName)
+ val migrationResult: Map[String, String] = migration.migrate(migrations.getJSONObject(i).toString(), reportEmailAddress, i.toString, verifymigration, reportCounts, no_of_retries, custom_retries, jobId, sparkSession, preciseCounts, appConfig, pipelineName)
reportbodyHtml.append(migrationResult("reportRowHtml"))
if (migrationResult("migrationError") != null)
migrationErrors += migrationResult("migrationError")
diff --git a/core/src/main/scala/core/DataFrameFromTo.scala b/core/src/main/scala/core/DataFrameFromTo.scala
index fad45286..6c59c879 100644
--- a/core/src/main/scala/core/DataFrameFromTo.scala
+++ b/core/src/main/scala/core/DataFrameFromTo.scala
@@ -30,11 +30,11 @@ import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.datastax.spark.connector.cql.CassandraConnector
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.mongodb.client.{MongoCollection, MongoCursor, MongoDatabase}
-import com.mongodb.spark.MongoSpark
-import com.mongodb.spark.config.{ReadConfig, WriteConfig}
-import com.mongodb.spark.sql.toSparkSessionFunctions
-import com.mongodb.{MongoClient, MongoClientURI}
+import org.apache.spark.sql.functions.{col, monotonically_increasing_id}
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.Encoders
+
+import scala.collection.mutable
import config.AppConfig
import core.DataPull.jsonObjectPropertiesToMap
import helper._
@@ -101,14 +101,14 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
"fileType" -> fileFormat
)
}
- if (fileFormat == "csv") {
+ if (fileFormat.equalsIgnoreCase("csv")) {
sparkOptions = sparkOptions ++ Map(
"delimiter" -> delimiter,
"mode" -> "DROPMALFORMED",
"header" -> "true",
"charset" -> charset
)
- } else if (fileFormat == "parquet") {
+ } else if (fileFormat.equalsIgnoreCase("parquet")) {
sparkOptions = sparkOptions ++ Map(
"mergeSchema" -> mergeSchema.toString.toLowerCase
)
@@ -232,7 +232,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
var dft = sparkSession.emptyDataFrame
if (coalescefilecount == null) {
- if (fileFormat == "csv") {
+ if (fileFormat.equalsIgnoreCase("csv")) {
dft = df.coalesce(1)
}
else {
@@ -270,7 +270,8 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
df.write.
format("com.springml.spark.sftp").
options(sparkOptions).
- save(filePath)
+ option("path", filePath).
+ save()
} else if (rowFromJsonString) {
@@ -320,23 +321,18 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
if (groupByFields != "") {
dfWriter = dfWriter.partitionBy(groupByFieldsArray: _*)
}
- if (fileFormat == "json") {
- dfWriter.json(s"$filePrefixString$filePath")
- } else if (fileFormat == "csv") {
- dfWriter.csv(s"$filePrefixString$filePath")
- } else if (fileFormat == "avro") {
- dfWriter.save(s"$filePrefixString$filePath")
- } else if (fileFormat == "orc") {
- dfWriter.orc(s"$filePrefixString$filePath")
- } else if (fileFormat == "sequencefile") {
+ val normalizedFormat = fileFormat.toLowerCase
+ if (normalizedFormat == "json" || normalizedFormat == "csv" || normalizedFormat == "avro"
+ || normalizedFormat == "orc" || normalizedFormat == "parquet") {
+ dfWriter.format(normalizedFormat).option("path",s"$filePrefixString$filePath").save()
+ } else if (normalizedFormat == "sequencefile") {
dft.toJSON.rdd.zipWithIndex.map { case (v, i) => (i, v) }.saveAsSequenceFile(s"$filePrefixString$filePath", Some(classOf[org.apache.hadoop.io.compress.DefaultCodec]))
- } else if (fileFormat == "sequencefilesnappy") {
+ } else if (normalizedFormat == "sequencefilesnappy") {
dft.toJSON.rdd.zipWithIndex.map { case (v, i) => (i, v) }.saveAsSequenceFile(s"$filePrefixString$filePath", Some(classOf[org.apache.hadoop.io.compress.SnappyCodec]))
- } else if (fileFormat == "sequencefiledeflate") {
+ } else if (normalizedFormat == "sequencefiledeflate") {
dft.toJSON.rdd.zipWithIndex.map { case (v, i) => (i, v) }.saveAsSequenceFile(s"$filePrefixString$filePath", Some(classOf[org.apache.hadoop.io.compress.DeflateCodec]))
} else {
- //parquet
- dfWriter.parquet(s"$filePrefixString$filePath")
+ throw new UnsupportedOperationException(s"Unsupported file format: $fileFormat")
}
}
}
@@ -768,7 +764,13 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
} catch {
case e: Throwable => e.printStackTrace
throw (e)
- }
+ } finally {
+ if (connection != null) {
+ if (!connection.isClosed) {
+ connection.close()
+ }
+ }
+ }
}
def mongodbToDataFrame(awsEnv: String, cluster: String, overrideconnector: String, database: String, authenticationDatabase: String, collection: String, login: String, password: String, sparkSession: org.apache.spark.sql.SparkSession, vaultEnv: String, addlSparkOptions: JSONObject, secretStore: String, authenticationEnabled: String, tmpFileLocation: String, sampleSize: String, sslEnabled: String): org.apache.spark.sql.DataFrame = {
@@ -796,45 +798,54 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
}
uri = helper.buildMongoURI(vaultLogin, vaultPassword, cluster, null, authenticationDatabase, database, collection, authenticationEnabled.toBoolean, sslEnabled)
if (overrideconnector.toBoolean) {
- var mongoClient: MongoClient = new MongoClient(new MongoClientURI(uri))
- var mdatabase: MongoDatabase = mongoClient.getDatabase("" + database);
- var col: MongoCollection[Document] = mdatabase.getCollection(collection);
- var cur: MongoCursor[Document] = col.find().iterator()
- var doc: org.bson.Document = null
- val list = new ListBuffer[String]()
- val tmp_location = tmpFileLocation
- var df_temp = sparkSession.emptyDataFrame
- var df_big = sparkSession.emptyDataFrame
- var part = 1
- var tmp_location_local = tmp_location + "/partition=" + part + "/"
-
- import sparkSession.implicits._
- while (cur.hasNext()) {
- doc = cur.next();
- list += (doc.toJson)
- if (list.length >= 20000) {
- df_temp = list.toList.toDF("jsonfield")
- df_temp.write.mode(SaveMode.Append).json(tmp_location_local)
- list.clear()
- part += 1
- tmp_location_local = tmp_location + "/partition=" + part + "/"
- }
- }
- df_temp = list.toList.toDF("jsonfield")
- df_temp.write.mode(SaveMode.Append).json(tmp_location_local)
- list.clear()
- df_big = sparkSession.read.json(tmp_location).withColumnRenamed("value", "jsonfield")
- return df_big
- }
- else {
- var sparkOptions = Map("uri" -> uri)
+ // MongoDB Read using MongoDB Spark Connector
+ val readConfig = Map(
+ "spark.mongodb.read.connection.uri" -> uri,
+ "database" -> database,
+ "collection" -> collection
+ )
+ val dfMongo = sparkSession.read.format("mongodb").options(readConfig).load()
+
+ // Convert each row to a whole-document JSON string (matching old behavior)
+ import org.apache.spark.sql.functions.{struct, to_json}
+ val dfAsJson = dfMongo.withColumn("jsonfield", to_json(struct(dfMongo.columns.map(col): _*)))
+ .select("jsonfield")
+
+ // Write to temporary JSON files with 20K-row partitioning
+ val tmpLocation = if (tmpFileLocation.startsWith("s3://") || tmpFileLocation.startsWith("s3a://")) tmpFileLocation else "s3a://" + tmpFileLocation
+ val partitionedDF = dfAsJson.withColumn("partition", (monotonically_increasing_id() / 20000).cast(IntegerType))
+
+ partitionedDF.write
+ .mode(SaveMode.Overwrite)
+ .partitionBy("partition")
+ .json(tmpLocation)
+
+ // Read back the data
+ val dfBig = sparkSession.read
+ .json(tmpLocation)
+ .select("jsonfield")
+
+ dfBig
+
+ } else {
+ // Additional Spark MongoDB options
+ var sparkOptions = Map(
+ "spark.mongodb.read.connection.uri" -> uri,
+ "database" -> database,
+ "collection" -> collection
+ )
if (addlSparkOptions != null) {
sparkOptions = sparkOptions ++ jsonObjectPropertiesToMap(addlSparkOptions)
}
if (sampleSize != null) {
- sparkOptions = sparkOptions ++ Map("spark.mongodb.input.sample.sampleSize" -> sampleSize, "sampleSize" -> sampleSize)
+ sparkOptions = sparkOptions ++ Map(
+ "sampleSize" -> sampleSize,
+ "spark.mongodb.input.sample.sampleSize" -> sampleSize
+ )
}
- val df = sparkSession.loadFromMongoDB(ReadConfig(sparkOptions))
+
+ // Load data directly from MongoDB
+ val df = sparkSession.read.format("mongodb").options(sparkOptions).load()
return df
}
}
@@ -863,7 +874,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
}
uri = helper.buildMongoURI(vaultLogin, vaultPassword, cluster, replicaset, authenticationDatabase, database, collection, authenticationEnabled, sslEnabled)
- var sparkOptions = Map("uri" -> uri, "replaceDocument" -> replaceDocuments.toString, "ordered" -> ordered.toString)
+ var sparkOptions = Map("spark.mongodb.write.connection.uri" -> uri, "replaceDocument" -> replaceDocuments.toString, "ordered" -> ordered.toString)
if (maxBatchSize != null)
sparkOptions = sparkOptions ++ Map("maxBatchSize" -> maxBatchSize)
@@ -871,18 +882,28 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
sparkOptions = sparkOptions ++ jsonObjectPropertiesToMap(addlSparkOptions)
}
- val writeConfig = WriteConfig(sparkOptions)
- if (documentfromjsonfield.toBoolean) {
+ val writeConfig = sparkOptions
- import com.mongodb.spark._
- import org.bson.Document
- import sparkSession.implicits._
- val rdd = df.select(jsonfield).map(r => r.getString(0)).rdd
- rdd.map(Document.parse).saveToMongoDB(writeConfig)
- }
- else {
- MongoSpark.save(df, writeConfig)
+ if (documentfromjsonfield.toBoolean) {
+ // Parse JSON strings from the specified field into a proper DataFrame with schema
+ // The MongoDB Spark Connector v10.x expects a DataFrame with typed columns, not Kryo-encoded BSON
+ val jsonRDD = df.select(jsonfield).rdd.map(row => row.getString(0))
+ val parsedDF = sparkSession.read.json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
+
+ // Write the parsed DataFrame to MongoDB
+ parsedDF.write
+ .format("mongodb")
+ .options(writeConfig)
+ .mode("append")
+ .save()
+ } else {
+ // Save the DataFrame directly to MongoDB
+ df.write
+ .format("mongodb")
+ .options(writeConfig)
+ .mode("append")
+ .save()
}
}
@@ -911,8 +932,11 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
}
- val uri = new MongoClientURI(helper.buildMongoURI(vaultLogin, vaultPassword, cluster, null, authenticationDatabase, database, collection, authenticationEnabled, sslEnabled))
- val mongoClient = new MongoClient(uri)
+ import com.mongodb.client.{MongoClient, MongoClients, MongoDatabase}
+ import org.bson.Document
+
+ val uri = helper.buildMongoURI(vaultLogin, vaultPassword, cluster, null, authenticationDatabase, database, collection, authenticationEnabled, sslEnabled)
+ val mongoClient: MongoClient = MongoClients.create(uri)
val data = mongoClient.getDatabase(database)
val response = data.runCommand(org.bson.Document.parse(runCommand))
@@ -1218,8 +1242,8 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
println("Properties Passed:" + properties)
val defaultConfigs = Map(
- "spark.sql.hive.caseSensitiveInferenceMode" -> "INFER_ONLY",
- "spark.sql.hive.metastore.version" -> "1.2.1",
+ "spark.sql.hive.caseSensitiveInferenceMode" -> "NEVER_INFER",
+ //"spark.sql.hive.metastore.version" -> "1.2.1",
"spark.sql.hive.metastore.jars" -> "builtin"
)
val parsedProperties = properties.map { jsonObj =>
@@ -1455,5 +1479,101 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab
.save()
}
+
+ import scala.jdk.CollectionConverters._
+
+ def jsonObjectToMap(jsonObject: JSONObject): Map[String, String] = {
+ jsonObject.keys.asScala.map(key => key.toString -> jsonObject.getString(key.toString)).toMap
+ }
+
+ def dataFrameToIceberg(sparkSession: SparkSession, df: org.apache.spark.sql.DataFrame,
+ table: String, database: String, saveMode: String, isMergeInto: Boolean,
+ mergeIntoSQL: String, sparkoptions: Option[JSONObject]): Unit = {
+
+ IcebergUtils.configureIcebergCatalog(sparkSession)
+
+ sparkSession.sql("use " + database)
+
+ /*
+ allow to insert the data to target table only if these conditions met
+ case-1: source dataframe columns <= target dataframe columns
+ case-2: source dataframe columns and datatypes == target dataframe columns and datatypes
+ */
+ val targetDf = sparkSession.sql(s"SELECT * FROM $table limit 100")
+ val srcDf = df
+ validateIcebergSchema(srcDf, targetDf)
+
+ if (isMergeInto) {
+ //validating MergeIntoQuery
+ if (mergeIntoSQL.trim.toLowerCase.startsWith("merge into")) {
+ //get the source table from User given SQL string
+ val srcTable = mergeIntoSQL.split(" ").take(6).last
+
+ df.createOrReplaceTempView(srcTable)
+ sparkSession.sql(mergeIntoSQL)
+ } else {
+ throw new IllegalArgumentException("Validation Error: mergeinto sql query is not valid, should start with 'MERGE INTO'. Expected format: " +
+ "MERGE INTO target USING source ON target. = source. WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *")
+ }
+ } else {
+ var extraConfigOptions= Map.empty[String, String]
+ sparkoptions match {
+ case Some(jsonObj) if jsonObj != null =>
+ extraConfigOptions = jsonObjectPropertiesToMap(sparkoptions.get)
+ case _ =>
+ extraConfigOptions = extraConfigOptions ++ Map("overwrite-mode" -> "dynamic")
+ }
+
+ if (saveMode.toLowerCase == "overwrite") {
+ df.write
+ .format("iceberg")
+ .options(extraConfigOptions)
+ .mode(saveMode)
+ .insertInto(table)
+ } else {
+ df.write
+ .format("iceberg")
+ .mode(saveMode)
+ .insertInto(table)
+ }
+ }
+ }
+
+ import scala.collection.mutable.Map
+
+ def icebergToDataFrame(sparkSession: org.apache.spark.sql.SparkSession,
+ query: String): org.apache.spark.sql.DataFrame = {
+ IcebergUtils.configureIcebergCatalog(sparkSession)
+
+ sparkSession.sql(query)
+ }
+
+ import org.apache.spark.sql.types._
+
+ def validateIcebergSchema(srcDf: org.apache.spark.sql.DataFrame, tgtDf: org.apache.spark.sql.DataFrame): Unit = {
+
+ val srcSchema = srcDf.schema
+ val tgtSchema = tgtDf.schema
+
+ val srcFields = srcSchema.fields.map(f => f.name.toLowerCase -> f.dataType).toMap
+ val tgtFields = tgtSchema.fields.map(f => f.name.toLowerCase -> f.dataType).toMap
+
+ // Check if src has more columns than tgt (which is not allowed)
+ if (srcFields.size > tgtFields.size) {
+ throw new IllegalArgumentException("Source DataFrame has more columns than target DataFrame")
+ }
+
+ // Check if all src columns exist in tgt and types match
+ srcFields.foreach { case (srcCol, srcType) =>
+ tgtFields.get(srcCol) match {
+ case Some(tgtType) if tgtType == srcType => // All good
+ case Some(tgtType) =>
+ throw new IllegalArgumentException(s"Data type mismatch for column '$srcCol'. Src: $srcType, Tgt: $tgtType")
+ case None =>
+ throw new IllegalArgumentException(s"Column '$srcCol' is missing in target schema")
+ }
+ }
+ }
+
}
diff --git a/core/src/main/scala/core/DataPull.scala b/core/src/main/scala/core/DataPull.scala
index ba969c90..8dd7dc3e 100644
--- a/core/src/main/scala/core/DataPull.scala
+++ b/core/src/main/scala/core/DataPull.scala
@@ -21,10 +21,11 @@ import java.nio.ByteBuffer
import java.time._
import java.util.{Scanner, UUID}
-import com.datastax.driver.core.utils.UUIDs
+import com.datastax.oss.driver.api.core.uuid.Uuids
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.mongodb.spark.sql.fieldTypes.Binary
+import org.bson.types.Binary
+import org.apache.spark.sql.types._
import config.AppConfig
import helper._
import javax.net.ssl._
@@ -39,8 +40,10 @@ import security._
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.breakable
+//import com.expedia.hdw.common.parallax.ParallaxHash
+
// Main class
-object DataPull {
+object DataPull extends Serializable {
def main(args: Array[String]): Unit = {
@@ -123,15 +126,21 @@ object DataPull {
.config("" + config.executor, config.interval)
.config("" + config.failures, no_of_retries)
.config("fs." + s3Prefix + ".multiobjectdelete.enable", true)
- .config("spark.sql.hive.metastore.version", "1.2.1")
+ //.config("spark.sql.hive.metastore.version", "1.2.1")
.config("spark.sql.hive.metastore.jars", "builtin")
- .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY")
+ .config("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
+ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.enableHiveSupport()
.getOrCreate()
val helper = new Helper(config)
- ec2Role = helper.GetEC2Role()
+ try {
+ ec2Role = helper.GetEC2Role()
+ } catch {
+ case e: Exception =>
+ println("Warning: Unable to retrieve EC2 role via IMDS, defaulting to 'local'. Error: " + e.getMessage)
+ }
}
applicationId = sparkSession.sparkContext.applicationId
@@ -140,6 +149,7 @@ object DataPull {
sparkSession.udf.register("binaryToUUID", binaryToUUID _)
sparkSession.udf.register("uuid", uuid _)
sparkSession.udf.register("binaryToJUUID", binaryToJUUID _)
+// sparkSession.udf.register("stringToParallaxHash", stringToParallaxHash _)
stepSubmissionTime = Instant.now().toString
@@ -296,10 +306,27 @@ object DataPull {
val keys = properties.keys()
while (keys.hasNext()) {
val key = keys.next().toString()
- sparkSession.conf.set(key, properties.getString(key))
+ try {
+ sparkSession.conf.set(key, properties.getString(key))
+ } catch {
+ case e: org.apache.spark.sql.AnalysisException =>
+ println(s"Warning: Cannot set static Spark config '$key' at runtime, skipping. ${e.getMessage}")
+ }
+ }
+ // Spark 3.x: if writeLegacyFormat=true (INT96 timestamps), automatically inject rebase
+ // modes to LEGACY to prevent SparkUpgradeException on write. Only set if the pipeline
+ // has not explicitly provided its own value.
+ if (properties.optString("spark.sql.parquet.writeLegacyFormat", "").equalsIgnoreCase("true")) {
+ if (properties.optString("spark.sql.parquet.int96RebaseModeInWrite", "").isEmpty) {
+ sparkSession.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")
+ }
+ if (properties.optString("spark.sql.parquet.int96RebaseModeInRead", "").isEmpty) {
+ sparkSession.conf.set("spark.sql.parquet.int96RebaseModeInRead", "LEGACY")
+ }
}
}
+
def setAWSCredentialsByPrefix(sparkSession: org.apache.spark.sql.SparkSession, sourceDestinationMap: Map[String, String], s3Prefix: String): Unit = {
if (sourceDestinationMap.contains("awssecretaccesskey") && sourceDestinationMap("awssecretaccesskey") != "") {
sparkSession.sparkContext.hadoopConfiguration.set("fs." + s3Prefix + ".access.key", sourceDestinationMap("awsaccesskeyid"))
@@ -356,7 +383,7 @@ object DataPull {
def uuid(): String = {
- UUIDs.timeBased().toString
+ Uuids.timeBased().toString
}
@@ -375,6 +402,7 @@ object DataPull {
}
}
+ /*
def uuidToBinary(uuid_key: String): Binary = {
if (uuid_key == null) null
@@ -391,6 +419,19 @@ object DataPull {
return binaryUuid
}
}
+ */
+
+def uuidToBinary(uuid_key: String): Array[Byte] = {
+ if (uuid_key == null) null
+ else {
+ val uuid = UUID.fromString(uuid_key)
+ val bb = ByteBuffer.allocate(16)
+ bb.putLong(uuid.getMostSignificantBits)
+ bb.putLong(uuid.getLeastSignificantBits)
+ bb.array()
+ }
+}
+
def binaryToUUID(byte: Array[Byte]): String = {
@@ -413,6 +454,13 @@ object DataPull {
returnMap
}
+// def stringToParallaxHash(stringData: String): String = {
+// if (stringData == null) null
+// else {
+// ParallaxHash.parallaxHash().hash(stringData.toLowerCase)
+// }
+// }
+
/**
* Binary data to JUUID String representation
* Based on: https://github.com/mongodb/mongo-csharp-driver/blob/master/uuidhelpers.js
diff --git a/core/src/main/scala/core/Migration.scala b/core/src/main/scala/core/Migration.scala
index a4ff35e6..21beb8b0 100644
--- a/core/src/main/scala/core/Migration.scala
+++ b/core/src/main/scala/core/Migration.scala
@@ -42,39 +42,13 @@ class Migration extends SparkListener {
val secretStoreDefaultValue: String = "vault"
var appConfig: AppConfig = null;
- def migrate(migrationJSONString: String, reportEmailAddress: String, migrationId: String, verifymigration: Boolean, reportCounts: Boolean, no_of_retries: Int, custom_retries: Boolean, migrationLogId: String, isLocal: Boolean, preciseCounts: Boolean, appConfig: AppConfig, pipeline: String): Map[String, String] = {
+ def migrate(migrationJSONString: String, reportEmailAddress: String, migrationId: String, verifymigration: Boolean, reportCounts: Boolean, no_of_retries: Int, custom_retries: Boolean, migrationLogId: String, sparkSession: SparkSession, preciseCounts: Boolean, appConfig: AppConfig, pipeline: String): Map[String, String] = {
val s3TempFolderDeletionError = StringBuilder.newBuilder
val reportRowHtml = StringBuilder.newBuilder
val migration: JSONObject = new JSONObject(migrationJSONString)
val dataPullLogs = new DataPullLog(appConfig, pipeline)
this.appConfig = appConfig;
val helper = new Helper(appConfig)
- var sparkSession: SparkSession = null
-
- if (isLocal) {
- sparkSession = SparkSession.builder.master("local[*]")
- .config("spark.scheduler.mode", "FAIR")
- .appName("DataPull - local")
- .config("spark.network.timeout", "10000s")
- .config("spark.executor.heartbeatInterval", "1000s")
- .config("spark.sql.broadcastTimeout", 36000)
- // .config("spark.driver.bindAddress","127.0.0.1")
- .getOrCreate()
- } else {
- sparkSession = SparkSession.builder //.master("local[*]")
- .config("spark.scheduler.mode", "FAIR")
- .appName("DataPull")
- .config("spark.network.timeout", "10000s")
- .config("spark.executor.heartbeatInterval", "1000s")
- .config("spark.sql.broadcastTimeout", 36000)
- .config("spark.task.maxFailures", no_of_retries)
- .config("fs.s3a.multiobjectdelete.enable", true)
- .config("spark.sql.hive.metastore.version", "1.2.1")
- .config("spark.sql.hive.metastore.jars", "builtin")
- .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY")
- .enableHiveSupport()
- .getOrCreate()
- }
var sources = new JSONArray()
var destination = new JSONObject()
@@ -164,7 +138,16 @@ class Migration extends SparkListener {
}
aliases += (if (selectedSource.has("alias")) selectedSource.getString("alias") else "")
- df.createOrReplaceTempView(store)
+ // When caseSensitive=true, JDBC drivers (Teradata, MSSQL) return UPPERCASE column names.
+ // Spark 3.x strictly enforces case, so sql.query references in lowercase would fail with
+ // AnalysisException. Normalize all column names to lowercase before registering the temp
+ // view so that the sql.query works regardless of what the JDBC driver returns.
+ val viewDf = if (sparkSession.conf.getOption("spark.sql.caseSensitive").exists(_.equalsIgnoreCase("true"))) {
+ df.toDF(df.columns.map(_.toLowerCase): _*)
+ } else {
+ df
+ }
+ viewDf.createOrReplaceTempView(store)
platforms += platform
}
@@ -210,7 +193,7 @@ class Migration extends SparkListener {
else if (destinationMap("platform") == "mssql" || destinationMap("platform") == "mysql" || destinationMap("platform") == "postgres" || destinationMap("platform") == "oracle" || destinationMap("platform") == "teradata") {
dataframeFromTo.dataFrameToRdbms(
platform = destinationMap("platform"),
- url = destinationMap("url"),
+ url = destinationMap.getOrElse("url", ""),
awsEnv = destinationMap("awsenv"),
server = destinationMap("server"),
database = destinationMap("database"),
@@ -385,6 +368,25 @@ class Migration extends SparkListener {
dft,
sparkSession
)
+ } else if (destinationMap("platform") == "iceberg") {
+ val extraSparkOptions = (if (destination.optJSONObject("sparkoptions") == null) None else Some(destination.optJSONObject("sparkoptions")))
+
+ // Support dynamic partition overwrite via partitioncolumns
+ val partitionColumns = destinationMap.get("partitioncolumns").map(_.split(",").map(_.trim).toSeq).getOrElse(Seq.empty)
+ if (partitionColumns.nonEmpty && destinationMap.getOrElse("savemode", "append").equalsIgnoreCase("overwrite")) {
+ IcebergUtils.processTablePartitionsDynamic(sparkSession, dft, destinationMap("database"), destinationMap("table"), partitionColumns)
+ } else {
+ dataframeFromTo.dataFrameToIceberg(sparkSession, dft, destinationMap("table"), destinationMap("database"), destinationMap.getOrElse("savemode", "append"),
+ destinationMap.getOrElse("ismergeinto", "false").toBoolean, destinationMap.getOrElse("mergeintosql", " ")
+ , extraSparkOptions)
+ }
+
+ // Post-write deduplication if configured
+ val dedupKeyColumns = destinationMap.get("dedupkeycolumns").map(_.split(",").map(_.trim).toSeq).getOrElse(Seq.empty)
+ if (dedupKeyColumns.nonEmpty) {
+ val orderByCol = destinationMap.get("deduporderbycolumn")
+ IcebergUtils.icebergTableDeduplication(sparkSession, destinationMap("database"), destinationMap("table"), dedupKeyColumns, orderByCol)
+ }
}
if (dft.isStreaming) {
@@ -685,6 +687,8 @@ class Migration extends SparkListener {
propertiesMap("vaultenv"),
propertiesMap.getOrElse("secretstore", secretStoreDefaultValue),
sparkSession)
+ } else if (platform == "iceberg") {
+ dataframeFromTo.icebergToDataFrame(sparkSession, propertiesMap("query"))
}
else {
sparkSession.emptyDataFrame
@@ -789,12 +793,11 @@ class Migration extends SparkListener {
lengthOfArray = command.length()
for (i <- 0 to lengthOfArray - 1) {
-
val dataframeFromTo = new DataFrameFromTo(appConfig, pipeline)
if (platform == "mssql" || platform == "mysql" || platform == "oracle" || platform == "postgres" || platform == "teradata") {
dataframeFromTo.rdbmsRunCommand(
platform = platform,
- url = propertiesMap("url"),
+ url = propertiesMap.getOrElse("url", ""),
awsEnv = propertiesMap("awsenv"),
server = propertiesMap("server"),
port = propertiesMap.getOrElse("port", null),
@@ -895,6 +898,8 @@ class Migration extends SparkListener {
propertiesMap = propertiesMap ++ jsonObjectPropertiesToMap(List("cluster", "database", "authenticationdatabase", "collection", "login", "password"), platformObject)
} else if (platform == "kafka") {
propertiesMap = propertiesMap ++ jsonObjectPropertiesToMap(List("bootstrapServers", "schemaRegistries", "topic", "keyField", "keyFormat"), platformObject)
+ } else if (platform == "iceberg") {
+ propertiesMap = propertiesMap ++ jsonObjectPropertiesToMap(List("cluster", "clustertype", "database", "table"), platformObject)
}
htmlString.append("<ul>")
propertiesMap.filter((t) => t._2 != "" && t._1 != "login" && t._1 != "password" && t._1 != "awsaccesskeyid" && t._1 != "awssecretaccesskey").foreach(i => htmlString.append("<li>" + i._1 + "</li><li>" + i._2 + "</li>"))
diff --git a/core/src/main/scala/helper/Helper.scala b/core/src/main/scala/helper/Helper.scala
index bcb3b941..4ba4322c 100644
--- a/core/src/main/scala/helper/Helper.scala
+++ b/core/src/main/scala/helper/Helper.scala
@@ -73,11 +73,21 @@ class Helper(appConfig: AppConfig) {
}
def GetEC2pkcs7(): String = {
- var pkcs7 = getHttpResponse("http://169.254.169.254/latest/dynamic/instance-identity/pkcs7", 100000, 10000, "GET").ResponseBody
+ var pkcs7Properties = Map[String, String]()
+ pkcs7Properties += ("X-aws-ec2-metadata-token" -> imdsv2Token())
+ var pkcs7 = getHttpResponse("http://169.254.169.254/latest/dynamic/instance-identity/pkcs7", 100000, 10000, "GET", httpHeaders = pkcs7Properties).ResponseBody
pkcs7 = pkcs7.split('\n').mkString
pkcs7
}
+ def imdsv2Token(): String = {
+ var tokenProperties = Map[String, String]()
+ tokenProperties += ("X-aws-ec2-metadata-token-ttl-seconds" -> "21600")
+
+ var token = getHttpResponse("http://169.254.169.254/latest/api/token", 100000, 10000, "PUT", httpHeaders = tokenProperties).ResponseBody
+ token
+ }
+
/**
* Returns the text (content) and response code from a REST URL as a String and int.
*
@@ -127,6 +137,10 @@ class Helper(appConfig: AppConfig) {
connection.setRequestMethod(requestMethod)
httpHeaders.foreach(h => connection.setRequestProperty(h._1, h._2))
+ if (requestMethod.equalsIgnoreCase("PUT") || requestMethod.equalsIgnoreCase("POST")) {
+ connection.setDoOutput(true)
+ }
+
if (jsonBody != "") {
connection.setRequestMethod("POST")
@@ -157,7 +171,7 @@ class Helper(appConfig: AppConfig) {
vault_exception.append(tmp_string)
- if (retry_count == no_of_retries) {
+ if (retry_count >= no_of_retries) {
exceptions = true
throw ex
}
@@ -177,7 +191,9 @@ class Helper(appConfig: AppConfig) {
}
def GetEC2Role(): String = {
- var role = getHttpResponse("http://169.254.169.254/latest/meta-data/iam/security-credentials/", 100000, 10000, "GET").ResponseBody
+ var ec2RoleProperties = Map[String, String]()
+ ec2RoleProperties += ("X-aws-ec2-metadata-token" -> imdsv2Token())
+ var role = getHttpResponse("http://169.254.169.254/latest/meta-data/iam/security-credentials/", 100000, 10000, "GET", httpHeaders = ec2RoleProperties).ResponseBody
role
}
@@ -284,7 +300,8 @@ class Helper(appConfig: AppConfig) {
}
else {
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
- url = "jdbc:sqlserver://" + server + ":" + (if (port == null) "1433" else port) + ";database=" + database
+ url = "jdbc:sqlserver://" + server + ":" + (if (port == null) "1433" else port) + ";database=" + database +
+ ";encrypt=" + sslEnabled + ";trustServerCertificate=true"
}
}
else if (platform == "oracle") {
@@ -304,7 +321,7 @@ class Helper(appConfig: AppConfig) {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://" + server + ":" + (if (port == null) "5432" else port) + "/" + database + (if (sslEnabled == true) "?sslmode=require" else "")
}
- println("logging the URI: " + url)
+ //println("logging the URI: " + url)
Map("driver" -> driver, "url" -> url)
}
@@ -349,7 +366,7 @@ class Helper(appConfig: AppConfig) {
}
def ReplaceInlineExpressions(inputString: String, sparkSession: org.apache.spark.sql.SparkSession): String = {
- val RegexForInlineExpr = """inlineexpr\{\{(.*)}}""".r
+ val RegexForInlineExpr = """inlineexpr\{\{(.*?)}}""".r
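+ // Non-greedy (.*?) matters when one string carries several expressions, e.g.
+ // "inlineexpr{{select 1}} x inlineexpr{{select 2}}": greedy (.*) captured
+ // "select 1}} x inlineexpr{{select 2" as a single group; (.*?) yields
+ // two matches, "select 1" and "select 2".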
val returnVal = RegexForInlineExpr.replaceAllIn(inputString, _ match { case RegexForInlineExpr(inlineExprr) => println(inlineExprr);
val df = sparkSession.sql(inlineExprr);
val rows = df.take(1);
@@ -358,7 +375,7 @@ class Helper(appConfig: AppConfig) {
} else ""
})
- val RegexForInlineSecret = """inlinesecret\{\{(.*)}}""".r
+ val RegexForInlineSecret = """inlinesecret\{\{(.*?)}}""".r
RegexForInlineSecret.replaceAllIn(returnVal, _ match { case RegexForInlineSecret(inlineExprr) => println(inlineExprr);
val inlineExprrAsJson = new JSONObject(new JSONObject("{\"data\": \"{" + inlineExprr + "}\"}").getString("data"))
if (inlineExprrAsJson.has("secretstore") && inlineExprrAsJson.has("secretname")) {
@@ -375,6 +392,7 @@ class Helper(appConfig: AppConfig) {
}
})
+
}
def ReplaceInlineExpressions(platformObject: JSONObject, optionalJsonPropertiesList:List[String]): JSONObject ={
@@ -386,7 +404,7 @@ class Helper(appConfig: AppConfig) {
val colType= jsonObjectPropertiesToMap(inlineexprforjdbcasJson).get("coltype")
val rs= dataframeFromTo.rdbmsRunCommand(
platform = platformObject.getString("platform"),
- url = propertiesMap("url"),
+ url = propertiesMap.getOrElse("url", ""),
awsEnv = propertiesMap("awsenv"),
server = propertiesMap("server"),
port = propertiesMap.getOrElse("port", null),
diff --git a/core/src/main/scala/helper/IcebergUtils.scala b/core/src/main/scala/helper/IcebergUtils.scala
new file mode 100644
index 00000000..304f07a9
--- /dev/null
+++ b/core/src/main/scala/helper/IcebergUtils.scala
@@ -0,0 +1,95 @@
+/* Copyright (c) 2019 Expedia Group.
+ * All rights reserved. http://www.homeaway.com
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package helper
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+object IcebergUtils {
+
+ def configureIcebergCatalog(sparkSession: SparkSession): Unit = {
+ sparkSession.conf.set("spark.sql.catalog.iceberg.type", "hive")
+ sparkSession.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
+ sparkSession.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
+ }
+
+ /**
+ * Loads the target Iceberg table as a DataFrame for inspection (schema, row count, partitioning).
+ * Returns a tuple of (target DataFrame, row count).
+ */
+ def loadTargetTable(sparkSession: SparkSession, database: String, table: String): (DataFrame, Long) = {
+ configureIcebergCatalog(sparkSession)
+ sparkSession.sql(s"use $database")
+ val targetDf = sparkSession.sql(s"SELECT * FROM $table")
+ val rowCount = targetDf.count()
+ (targetDf, rowCount)
+ }
+
+ /**
+ * Deduplicates an Iceberg table based on the specified key columns.
+ * Keeps the latest row per key using the orderByColumn (descending) to determine recency.
+ * If orderByColumn is not provided, an arbitrary row per key group is kept.
+ */
+ def icebergTableDeduplication(sparkSession: SparkSession, database: String, table: String,
+ keyColumns: Seq[String], orderByColumn: Option[String] = None): Unit = {
+ configureIcebergCatalog(sparkSession)
+ sparkSession.sql(s"use $database")
+
+ val keyColsStr = keyColumns.mkString(", ")
+ val orderClause = orderByColumn match {
+ case Some(col) => s"ORDER BY $col DESC"
+ case None => ""
+ }
+
+ val dedupViewName = s"__iceberg_dedup_${table.replaceAll("[^a-zA-Z0-9]", "_")}"
+
+ val dedupQuery =
+ s"""SELECT * FROM (
+ | SELECT *, ROW_NUMBER() OVER (PARTITION BY $keyColsStr $orderClause) as __dedup_rn
+ | FROM $table
+ |) WHERE __dedup_rn = 1""".stripMargin
+
+ val dedupDf = sparkSession.sql(dedupQuery).drop("__dedup_rn")
+ dedupDf.createOrReplaceTempView(dedupViewName)
+
+ // Overwrite the table with deduplicated data
+ dedupDf.write
+ .format("iceberg")
+ .mode("overwrite")
+ .insertInto(table)
+ }
+
+ /**
+ * Processes dynamic partition overwrite for Iceberg tables.
+ * Only overwrites partitions that exist in the source DataFrame, leaving other partitions untouched.
+ * partitionColumns specifies which columns define the partitioning scheme.
+ */
+ def processTablePartitionsDynamic(sparkSession: SparkSession, df: DataFrame,
+ database: String, table: String,
+ partitionColumns: Seq[String]): Unit = {
+ configureIcebergCatalog(sparkSession)
+ sparkSession.sql(s"use $database")
+
+ sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
+
+ df.write
+ .format("iceberg")
+ .option("overwrite-mode", "dynamic")
+ .mode("overwrite")
+ .insertInto(table)
+ }
+}