Commits (30)
29900cb
Adding default application tag 'datapullemr' when application tags ar…
Sep 11, 2025
98f4ed4
Merge branch 'homeaway:master' into master
BalamuruganDE Sep 22, 2025
42ee65c
Merge branch 'homeaway:master' into master
BalamuruganDE Oct 7, 2025
f3173ed
Merge branch 'homeaway:master' into master
BalamuruganDE Feb 9, 2026
3d97a00
Addressing "key not found: url" Exception
Feb 16, 2026
11c6376
Merge branch 'homeaway:master' into master
BalamuruganDE Feb 16, 2026
6b15292
Revoking URL changes
Feb 16, 2026
f1e9e0e
Merge branch 'homeaway:master' into master
BalamuruganDE Mar 5, 2026
9faa47f
Upgrade DataPull core to Spark 3.5.0, Scala 2.12, Java 11
Mar 5, 2026
42fce93
Test commit
Mar 5, 2026
a17eb09
Updating pom.xml to overcome compilation errors
Mar 6, 2026
95dfd11
Adding code changes to address runtime issue with EMR 7.12.0
Mar 6, 2026
5decf8d
Retaining Single jsonfield column with full document JSON
Mar 9, 2026
57c8329
Addressing uuidToBinary, Sequencefile issue
Mar 9, 2026
f7887b9
Addressing tmpFileLocation & documentfromjsonfield json response issue
Mar 9, 2026
2b79dcd
Updating elasticsearch deps version
Mar 9, 2026
01e6262
Adding more Iceberg features and addressing null subnet edge cases
Mar 10, 2026
cc212c1
Addressing IcebergUtils compilation errors
Mar 10, 2026
3fa7a79
Fix double SparkContext creation causing YARN Promise crash
Mar 11, 2026
431b153
Addressing bson encoder strict validation issue
Mar 11, 2026
5325d4e
Updating elasticsearch-spark-30_2.1 to 8.17.10
Mar 11, 2026
fb642f0
Updating elasticsearch-spark-30_2.1 to 8.17.10
Mar 11, 2026
acc6c64
changing greedy (.*) to non-greedy/lazy (.*?)
Mar 16, 2026
8895b0e
Addressing IMDSv2 empty post
Mar 16, 2026
0b8a98d
Addressing spark 3.5 strict static config validation
Mar 18, 2026
249c60f
Updating default instance type to m7g as part of the Spark 3.5 upgrade
Mar 18, 2026
1e44c2b
Addressing "key not found: url" Exception
Apr 1, 2026
abaf140
Dummy commit
Apr 1, 2026
495c514
Dummy commit 2
Apr 1, 2026
a6b4210
Addressing below issues/bugs
BalamuruganDE Apr 29, 2026
56 changes: 56 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,62 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html)


## [0.1.91] - 2026-03-05
Upgrade DataPull core to Spark 3.5.0, Scala 2.12, Java 11

Dependency upgrades (pom.xml):
- Spark 2.4.6 -> 3.5.0, Scala 2.11 -> 2.12, Hadoop 2.10.1 -> 3.2.1
- MongoDB Spark Connector 10.4.0 (new format API)
- Cassandra Spark Connector 3.5.0 (driver 4.x)
- Iceberg 1.5.0 (new platform support)
- Elasticsearch 7.17.0 with REST clients
- mssql-jdbc 11.2.3.jre11, ojdbc8 21.9.0.0, terajdbc4 17.20.00.12
- ABRiS 6.4.0, Snowflake 2.10.0, PostgreSQL 42.6.0
- Guava shade plugin for Spark 3.5 classloader compatibility
- Log4j 2.17.1 (CVE-2021-44228 fix retained)
- Added expediahotelloader with Spark/Scala exclusions

MongoDB modernization (DataFrameFromTo.scala; see the sketch after this list):
- Replaced MongoSpark/ReadConfig/WriteConfig with format("mongodb") API
- Replaced MongoClient/MongoClientURI with MongoClients.create()
- Updated mongodbToDataFrame, dataFrameToMongodb, mongoRunCommand
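
In miniature, the new format API replaces the old MongoSpark helpers (a hedged sketch with placeholder URI/database/collection values, following the connector 10.x option names, not the actual DataFrameFromTo code):

```scala
// Sketch only: connection values are placeholders, and `spark` is an existing SparkSession.
val df = spark.read
  .format("mongodb")                                  // replaces MongoSpark.load(ReadConfig)
  .option("connection.uri", "mongodb://host:27017")
  .option("database", "mydb")
  .option("collection", "mycoll")
  .load()

df.write
  .format("mongodb")                                  // replaces MongoSpark.save(WriteConfig)
  .option("connection.uri", "mongodb://host:27017")
  .option("database", "mydb")
  .option("collection", "mycoll")
  .mode("append")
  .save()
```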

New Iceberg support (DataFrameFromTo.scala, Migration.scala; see the sketch after this list):
- Added dataFrameToIceberg with MERGE INTO SQL support
- Added icebergToDataFrame for SQL-based reads
- Added iceberg as source/destination platform in Migration
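
The MERGE INTO path reduces to registering the incoming DataFrame as a temp view and issuing Iceberg SQL (a sketch; the catalog, table, and `id` join key are hypothetical, and the real logic lives in dataFrameToIceberg):

```scala
// Hypothetical names: "glue_catalog.db.target" and the join key "id" are illustrative.
incomingDf.createOrReplaceTempView("incoming")
spark.sql(
  """MERGE INTO glue_catalog.db.target AS t
    |USING incoming AS s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET *
    |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
```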

Spark 3.5 compatibility (DataPull.scala; see the sketch after this list):
- Cassandra UUIDs -> Uuids (driver 4.x)
- Binary type from org.bson.types.Binary
- DataPull object extends Serializable
- Hive caseSensitiveInferenceMode: INFER_ONLY -> NEVER_INFER
- Hive metastore.version config commented out
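
Two of these changes in miniature (a hedged sketch, not the DataPull.scala code itself; the import path is the standard driver 4.x one, and the Hive setting is shown at session-build time, the safe spot under Spark 3.5's stricter config validation):

```scala
// Cassandra Java driver 4.x renamed UUIDs to Uuids and moved the package.
import com.datastax.oss.driver.api.core.uuid.Uuids
import org.apache.spark.sql.SparkSession

val timeUuid = Uuids.timeBased() // was com.datastax.driver.core.utils.UUIDs.timeBased()

// Supply the Hive inference mode before the session exists; Spark 3.5 is strict
// about configs that may no longer be mutated at runtime.
val spark = SparkSession.builder()
  .config("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER") // was INFER_ONLY
  .getOrCreate()
```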

IMDSv2 and MSSQL fixes (Helper.scala; see the sketch after this list):
- Added imdsv2Token() for EC2 metadata token-based access
- Updated GetEC2pkcs7() and GetEC2Role() with IMDSv2 headers
- MSSQL JDBC URL: added encrypt and trustServerCertificate params
- Commented out URI logging to prevent credential exposure
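
The token handshake is small enough to sketch (hedged: this is the generic IMDSv2 flow, not the exact Helper.scala implementation; `metadata` is an illustrative helper). The MSSQL change amounts to appending `encrypt` and `trustServerCertificate` parameters to the JDBC URL; the exact values depend on the deployment.

```scala
// Generic IMDSv2 handshake: PUT for a session token, then send it on metadata GETs.
import java.net.{HttpURLConnection, URL}
import scala.io.Source

def imdsv2Token(): String = {
  val conn = new URL("http://169.254.169.254/latest/api/token")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("PUT")
  conn.setRequestProperty("X-aws-ec2-metadata-token-ttl-seconds", "21600")
  conn.setDoOutput(true)        // IMDSv2 requires a PUT; an empty body is acceptable
  conn.getOutputStream.close()
  Source.fromInputStream(conn.getInputStream).mkString
}

def metadata(path: String): String = {
  val conn = new URL(s"http://169.254.169.254/latest/meta-data/$path")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestProperty("X-aws-ec2-metadata-token", imdsv2Token())
  Source.fromInputStream(conn.getInputStream).mkString
}
```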

All homeaway bug fixes preserved (v0.1.83-0.1.90):
- ConcurrentHashMap for thread-safe stepPipelineMap
- Subnet NULL/invalid validation with default pool fallback
- Credentials display fix, duplicate tags fix
- Default 'datapullemr' application tag
- url parameter in RDBMS methods, KMS encryption support
- setExternalSparkConf, ReplaceInlineExpressions

### Changed
core/pom.xml
core/src/main/scala/core/DataFrameFromTo.scala
core/src/main/scala/core/DataPull.scala
core/src/main/scala/core/Migration.scala
core/src/main/scala/core/Controller.scala
core/src/main/scala/helper/Helper.scala
core/src/main/resources/Samples/Input_Json_Specification.json


## [0.1.90] - 2026-02-16
Addressing "key not found: url" Exception
### Changed
2 changes: 1 addition & 1 deletion api/pom.xml
@@ -51,7 +51,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
- <version>1.16.20</version>
+ <version>1.18.42</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -60,7 +60,7 @@ public class EMRProperties {
@Value("${job_flow_role:emr_ec2_datapull_role}")
private String jobFlowRole;

@Value("${emr_release:emr-5.36.2}")
@Value("${emr_release:emr-7.12.0}")
private String emrRelease;

@Value( "${bootstrap_folder_path:}" )
@@ -258,7 +258,7 @@ private void runDataPull(String json, boolean isStart, boolean validateJson) thr
log.debug("runDataPull <- return");
}

- List<String> rotateSubnets(){
+ synchronized List<String> rotateSubnets(){

if(subnets.isEmpty()){
subnets= getSubnet();
@@ -268,7 +268,8 @@ List<String> rotateSubnets(){
subnets.clear();
subnets.addAll(subnetIds_shuffled);
}
- return subnets;
+ subnets.removeIf(s -> StringUtils.isBlank(s));
+ return new ArrayList<>(subnets);
}

Map<String,List<DescribeStepRequest>> getStepForPipeline(){
@@ -279,7 +280,9 @@ public List<String> getSubnet(){

List<String> subnetIds = new ArrayList<>();

- subnetIds.add(dataPullProperties.getApplicationSubnet1());
+ if (StringUtils.isNotBlank(dataPullProperties.getApplicationSubnet1())) {
+ subnetIds.add(dataPullProperties.getApplicationSubnet1());
+ }

if (StringUtils.isNotBlank(dataPullProperties.getApplicationSubnet2())) {
subnetIds.add(dataPullProperties.getApplicationSubnet2());
@@ -224,7 +224,7 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
sparkSubmitParamsList = (ArrayList<String>) this.prepareSparkSubmitParams(sparkSubmitParams);
} else {
List<String> sparkBaseParams = new ArrayList<>();
- sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4", "--class", DataPullTask.MAIN_CLASS, jarPath}));
+ sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0", "--class", DataPullTask.MAIN_CLASS, jarPath}));
sparkSubmitParamsList.addAll(sparkBaseParams);
}

@@ -409,9 +409,9 @@ private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProper
.withInstanceTypeConfigs(workerInstanceTypeConfig)
.withTargetOnDemandCapacity(count);

System.out.println("Printing random subnet : " + subnets);
log.info("Printing random subnet : {}", subnets);

System.out.println("Printing selected subnet-ID for EMR cluster creation : " + subnets.get(0));
log.info("Printing selected subnet-ID for EMR cluster creation : {}", subnets.isEmpty() ? "N/A" : subnets.get(0));

final String masterSG = emrProperties.getEmrSecurityGroupMaster();
final String slaveSG = emrProperties.getEmrSecurityGroupSlave();
@@ -423,31 +423,27 @@ private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProper
final String serviceAccessSecurityGroup = Objects.toString(
this.clusterProperties.getServiceAccessSecurityGroup(), serviceAccesss != null ? serviceAccesss : "");

- if(StringUtils.isNotBlank(clusterProperties.getSubnetId())){
- subnets.add(0,clusterProperties.getSubnetId());
- }
-
// Introducing below logic to address null and invalid subnet issue
String getSubnetId = clusterProperties.getSubnetId();
String finalSubnetId;


if (StringUtils.isNotBlank(getSubnetId) && getSubnetId.startsWith("subnet-")) {
finalSubnetId = getSubnetId;
System.out.println("Subnet '" + finalSubnetId + "' provided by the user will be used for EMR cluster creation.");
log.info("Subnet '{}' provided by the user will be used for EMR cluster creation.", finalSubnetId);
} else {
if (StringUtils.isNotBlank(getSubnetId)) {
System.out.println("The user provided an invalid value '" + getSubnetId + "' for subnet. Hence, default subnet pool will be used for EMR creation.");
log.warn("The user provided an invalid value '{}' for subnet. Hence, default subnet pool will be used for EMR creation.", getSubnetId);
} else {
System.out.println("The user either provided a NULL value for the subnet or did not specify subnet in the payload. Hence, the default subnet pool will be used for EMR creation.");
log.info("The user either provided a NULL value for the subnet or did not specify subnet in the payload. Hence, the default subnet pool will be used for EMR creation.");
}

- Set<String> subnetsDeduped = new LinkedHashSet<>(subnets);
- subnets.clear();
- subnets.addAll(subnetsDeduped);
+ if (subnets.isEmpty()) {
+ throw new IllegalStateException("No valid subnets available in the default subnet pool. Please configure at least one application_subnet property.");
+ }

finalSubnetId = subnets.get(0);
System.out.println("EMR cluster will be created using a subnet from the default subnet pool: " + finalSubnetId);
log.info("EMR cluster will be created using a subnet from the default subnet pool: {}", finalSubnetId);
}

final JobFlowInstancesConfig jobConfig = new JobFlowInstancesConfig()
@@ -496,7 +492,7 @@ private void runTaskOnExistingCluster(final String id, final String jarPath, fin
sparkSubmitParamsListOnExistingCluster = this.prepareSparkSubmitParams(sparkSubmitParams);
} else {
List<String> sparkBaseParams = new ArrayList<>();
- sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4", "--class", DataPullTask.MAIN_CLASS, jarPath}));
+ sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0", "--class", DataPullTask.MAIN_CLASS, jarPath}));
sparkSubmitParamsListOnExistingCluster.addAll(sparkBaseParams);
}
