From 6d0cc68eb74c770f2e38e7aa3ae06e7ab4decdc1 Mon Sep 17 00:00:00 2001
From: Danny Chen
Date: Wed, 5 Sep 2018 16:10:09 -0700
Subject: [PATCH] Add .gitignore and code which was missed in first commit
---
.gitignore | 90 +++
checkstyles/marmaray-0.0.1.xml | 136 ++++
checkstyles/suppressions.xml | 13 +
marmaray-tools/pom.xml | 22 +
.../src/main/cli/toggleHDFSMetadataFile.py | 62 ++
.../marmaray/tools/HDFSMetadataPrinter.java | 83 ++
.../marmaray/tools/HDFSMetadataPruner.java | 120 +++
marmaray/build.gradle | 18 +
marmaray/config/sample.yaml | 45 ++
marmaray/pom.xml | 55 ++
.../com/uber/marmaray/common/AvroPayload.java | 87 +++
.../uber/marmaray/common/DispersalType.java | 29 +
.../uber/marmaray/common/FileSinkType.java | 28 +
.../marmaray/common/HoodieErrorPayload.java | 62 ++
.../com/uber/marmaray/common/IPayload.java | 27 +
.../uber/marmaray/common/PartitionType.java | 29 +
.../com/uber/marmaray/common/WorkUnit.java | 27 +
.../common/actions/IJobDagAction.java | 54 ++
.../common/actions/JobDagActions.java | 141 ++++
.../common/actions/ReporterAction.java | 59 ++
.../configuration/AwsConfiguration.java | 80 ++
.../CassandraSinkConfiguration.java | 266 +++++++
.../configuration/ConfigScopeResolver.java | 289 +++++++
.../common/configuration/Configuration.java | 227 ++++++
.../ErrorTableConfiguration.java | 123 +++
.../configuration/FileSinkConfiguration.java | 240 ++++++
.../configuration/HadoopConfiguration.java | 51 ++
.../configuration/HiveConfiguration.java | 71 ++
.../HiveSourceConfiguration.java | 67 ++
.../configuration/HoodieConfiguration.java | 524 +++++++++++++
.../HoodieIndexConfiguration.java | 207 +++++
.../configuration/KafkaConfiguration.java | 77 ++
.../KafkaSourceConfiguration.java | 83 ++
.../LockManagerConfiguration.java | 96 +++
.../RetryStrategyConfiguration.java | 54 ++
.../SimpleRetryStrategyConfiguration.java | 49 ++
.../configuration/ZookeeperConfiguration.java | 55 ++
.../converterresult/ConverterResult.java | 77 ++
.../data/AbstractDataConverter.java | 160 ++++
.../data/CassandraSinkCQLDataConverter.java | 200 +++++
.../data/CassandraSinkDataConverter.java | 201 +++++
.../data/DummyHoodieSinkDataConverter.java | 44 ++
.../data/FileSinkDataConverter.java | 109 +++
.../data/HoodieSinkDataConverter.java | 86 +++
.../data/KafkaSourceDataConverter.java | 79 ++
.../converters/data/SinkDataConverter.java | 37 +
.../converters/data/SourceDataConverter.java | 36 +
.../data/SparkSourceDataConverter.java | 109 +++
.../data/TSBasedHoodieSinkDataConverter.java | 79 ++
.../schema/AbstractSchemaConverter.java | 30 +
.../schema/CassandraSchemaConverter.java | 114 +++
.../schema/DataFrameSchemaConverter.java | 105 +++
.../marmaray/common/data/BinaryRawData.java | 34 +
.../uber/marmaray/common/data/ErrorData.java | 37 +
.../uber/marmaray/common/data/ForkData.java | 40 +
.../com/uber/marmaray/common/data/IData.java | 22 +
.../uber/marmaray/common/data/RDDWrapper.java | 54 ++
.../uber/marmaray/common/data/RawData.java | 30 +
.../marmaray/common/data/RawDataHelper.java | 32 +
.../uber/marmaray/common/data/ValidData.java | 26 +
.../marmaray/common/dataset/ErrorRecord.java | 51 ++
.../common/dataset/ExceptionRecord.java | 48 ++
.../marmaray/common/dataset/MetricRecord.java | 45 ++
.../marmaray/common/dataset/UtilRecord.java | 41 +
.../marmaray/common/dataset/UtilTable.java | 96 +++
.../exceptions/ForkOperationException.java | 31 +
.../exceptions/InvalidDataException.java | 38 +
.../exceptions/JobRuntimeException.java | 38 +
.../common/exceptions/MetadataException.java | 30 +
.../exceptions/MissingPropertyException.java | 31 +
.../common/exceptions/RetryException.java | 30 +
.../common/forkoperator/FilterFunction.java | 47 ++
.../common/forkoperator/ForkFunction.java | 94 +++
.../common/forkoperator/ForkOperator.java | 113 +++
.../uber/marmaray/common/job/DagPayload.java | 35 +
.../ExecutionTimeJobExecutionStrategy.java | 88 +++
.../common/job/IJobExecutionStrategy.java | 33 +
.../com/uber/marmaray/common/job/Job.java | 46 ++
.../com/uber/marmaray/common/job/JobDag.java | 189 +++++
.../uber/marmaray/common/job/JobManager.java | 368 +++++++++
.../uber/marmaray/common/job/JobSubDag.java | 236 ++++++
.../marmaray/common/job/SingleSinkSubDag.java | 58 ++
.../common/job/ThreadPoolService.java | 424 ++++++++++
.../common/job/ThreadPoolServiceTier.java | 22 +
.../common/metadata/AbstractValue.java | 28 +
.../metadata/HDFSDatePartitionManager.java | 137 ++++
.../common/metadata/HDFSMetadataManager.java | 278 +++++++
.../common/metadata/HDFSPartitionManager.java | 154 ++++
.../metadata/HoodieBasedMetadataManager.java | 195 +++++
.../common/metadata/IMetadataManager.java | 41 +
.../metadata/JobManagerMetadataTracker.java | 125 +++
.../common/metadata/MetadataConstants.java | 36 +
.../common/metadata/NoOpMetadataManager.java | 52 ++
.../marmaray/common/metadata/StringValue.java | 36 +
.../CassandraPayloadRDDSizeEstimator.java | 42 +
.../common/metrics/ChargebackMetricType.java | 26 +
.../common/metrics/DataFeedMetricNames.java | 65 ++
.../common/metrics/DataFeedMetrics.java | 86 +++
.../marmaray/common/metrics/DoubleMetric.java | 44 ++
.../common/metrics/IChargebackCalculator.java | 42 +
.../marmaray/common/metrics/IMetricable.java | 32 +
.../common/metrics/JobMetricNames.java | 33 +
.../common/metrics/JobMetricType.java | 35 +
.../marmaray/common/metrics/JobMetrics.java | 75 ++
.../marmaray/common/metrics/LongMetric.java | 44 ++
.../uber/marmaray/common/metrics/Metric.java | 76 ++
.../marmaray/common/metrics/TimerMetric.java | 82 ++
.../common/reporters/ConsoleReporter.java | 39 +
.../marmaray/common/reporters/IReporter.java | 29 +
.../marmaray/common/reporters/Reportable.java | 26 +
.../marmaray/common/reporters/Reporters.java | 59 ++
.../retry/IFunctionThrowsException.java | 28 +
.../marmaray/common/retry/IRetryStrategy.java | 29 +
.../common/retry/RetryableFunction.java | 79 ++
.../common/retry/SimpleRetryStrategy.java | 75 ++
.../common/schema/ISchemaService.java | 81 ++
.../common/schema/ISinkSchemaManager.java | 20 +
.../schema/cassandra/CassandraDataField.java | 33 +
.../schema/cassandra/CassandraPayload.java | 59 ++
.../schema/cassandra/CassandraSchema.java | 50 ++
.../cassandra/CassandraSchemaField.java | 79 ++
.../cassandra/CassandraSinkSchemaManager.java | 254 ++++++
.../common/schema/cassandra/ClusterKey.java | 65 ++
.../com/uber/marmaray/common/sinks/ISink.java | 36 +
.../common/sinks/SinkStatManager.java | 133 ++++
.../sinks/cassandra/CassandraClientSink.java | 128 ++++
.../sinks/cassandra/CassandraSSTableSink.java | 166 ++++
.../common/sinks/cassandra/CassandraSink.java | 149 ++++
.../common/sinks/file/AwsFileSink.java | 224 ++++++
.../marmaray/common/sinks/file/FileSink.java | 203 +++++
.../common/sinks/file/HdfsFileSink.java | 101 +++
.../common/sinks/hoodie/HoodieErrorSink.java | 87 +++
.../common/sinks/hoodie/HoodieSink.java | 528 +++++++++++++
.../sinks/hoodie/HoodieSinkOperations.java | 37 +
.../sinks/hoodie/HoodieWriteStatus.java | 44 ++
.../marmaray/common/sources/IRunState.java | 24 +
.../uber/marmaray/common/sources/ISource.java | 34 +
.../common/sources/IWorkUnitCalculator.java | 87 +++
.../common/sources/hive/HiveRunState.java | 30 +
.../common/sources/hive/HiveSource.java | 100 +++
.../hive/ParquetWorkUnitCalculator.java | 141 ++++
.../hive/ParquetWorkUnitCalculatorResult.java | 47 ++
.../kafka/KafkaBootstrapOffsetSelector.java | 46 ++
.../common/sources/kafka/KafkaRunState.java | 50 ++
.../common/sources/kafka/KafkaSource.java | 214 ++++++
.../kafka/KafkaWorkUnitCalculator.java | 433 +++++++++++
.../uber/marmaray/common/spark/SparkArgs.java | 58 ++
.../marmaray/common/spark/SparkFactory.java | 203 +++++
.../marmaray/utilities/ByteBufferUtil.java | 48 ++
.../marmaray/utilities/CommandLineUtil.java | 92 +++
.../uber/marmaray/utilities/ConfigUtil.java | 54 ++
.../marmaray/utilities/ConverterUtil.java | 55 ++
.../com/uber/marmaray/utilities/DateUtil.java | 40 +
.../marmaray/utilities/ErrorExtractor.java | 46 ++
.../marmaray/utilities/ErrorTableUtil.java | 173 +++++
.../com/uber/marmaray/utilities/FSUtils.java | 161 ++++
.../marmaray/utilities/GenericRecordUtil.java | 278 +++++++
.../HoodieSinkConverterErrorExtractor.java | 59 ++
.../utilities/HoodieSinkErrorExtractor.java | 62 ++
.../uber/marmaray/utilities/HoodieUtil.java | 78 ++
.../com/uber/marmaray/utilities/JobUtil.java | 48 ++
.../KafkaSourceConverterErrorExtractor.java | 54 ++
.../uber/marmaray/utilities/KafkaUtil.java | 228 ++++++
.../uber/marmaray/utilities/LockManager.java | 214 ++++++
.../com/uber/marmaray/utilities/MapUtil.java | 96 +++
.../marmaray/utilities/NumberConstants.java | 30 +
.../uber/marmaray/utilities/ScalaUtil.java | 58 ++
.../uber/marmaray/utilities/SchemaUtil.java | 103 +++
.../uber/marmaray/utilities/SparkUtil.java | 234 ++++++
.../uber/marmaray/utilities/StringTypes.java | 41 +
.../uber/marmaray/utilities/StringUtil.java | 44 ++
.../marmaray/utilities/TimestampInfo.java | 45 ++
.../cluster/CassandraClusterInfo.java | 56 ++
.../listener/SparkEventListener.java | 68 ++
.../utilities/listener/SparkJobTracker.java | 224 ++++++
.../utilities/listener/TimeoutManager.java | 155 ++++
.../java/com/uber/marmaray/TestSparkUtil.java | 51 ++
.../common/actions/TestJobDagActions.java | 244 ++++++
.../configuration/TestAwsConfiguration.java | 69 ++
.../TestCassandraSinkConfiguration.java | 151 ++++
.../TestConfigScopeResolver.java | 92 +++
.../configuration/TestConfiguration.java | 97 +++
.../TestErrorTableConfiguration.java | 65 ++
.../TestFileSinkConfiguration.java | 124 +++
.../TestHadoopConfiguration.java | 44 ++
.../TestHoodieConfiguration.java | 90 +++
.../TestHoodieIndexConfiguration.java | 91 +++
.../configuration/TestKafkaConfiguration.java | 67 ++
.../converters/TestAbstractDataConverter.java | 94 +++
.../TestCassandraDataFrameConverter.java | 108 +++
.../TestCassandraSchemaConverter.java | 156 ++++
.../TestDataFrameDataConverter.java | 166 ++++
.../TestDataFrameSchemaConverter.java | 102 +++
.../data/TestFileSinkDataConverter.java | 109 +++
.../marmaray/common/data/TestRDDWrapper.java | 41 +
.../common/dataset/TestUtilTable.java | 151 ++++
.../common/forkoperator/TestForkOperator.java | 209 +++++
...TestExecutionTimeJobExecutionStrategy.java | 113 +++
.../marmaray/common/job/TestJobSubDag.java | 161 ++++
.../common/job/TestThreadPoolService.java | 262 +++++++
.../common/metadata/HDFSTestConstants.java | 25 +
.../metadata/MemoryMetadataManager.java | 64 ++
.../TestHDFSDatePartitionManager.java | 237 ++++++
.../TestHDFSJobLevelMetadataTracker.java | 90 +++
.../metadata/TestHDFSMetadataManager.java | 133 ++++
.../metadata/TestHDFSPartitionManager.java | 245 ++++++
.../TestHoodieBasedMetadataManager.java | 149 ++++
.../common/metrics/TestDataFeedMetrics.java | 82 ++
.../common/metrics/TestJobMetrics.java | 76 ++
.../common/metrics/TestTimerMetric.java | 58 ++
.../common/retry/TestRetryableFunction.java | 100 +++
.../TestCassandraSinkSchemaManager.java | 366 +++++++++
.../schema/cassandra/TestClusterKey.java | 47 ++
.../common/sinks/TestSinkStatManager.java | 83 ++
.../cassandra/TestCassandraClientSink.java | 230 ++++++
.../cassandra/TestCassandraSSTableSink.java | 227 ++++++
.../cassandra/TestCassandraSinkUtil.java | 112 +++
.../common/sinks/file/FileSinkTestUtil.java | 94 +++
.../common/sinks/file/TestAwsFileSink.java | 267 +++++++
.../common/sinks/file/TestFileSink.java | 116 +++
.../common/sinks/file/TestHdfsFileSink.java | 153 ++++
.../common/sinks/hoodie/TestHoodieSink.java | 464 +++++++++++
.../common/sources/hive/TestHiveSource.java | 93 +++
.../hive/TestHiveSourceConfiguration.java | 66 ++
.../hive/TestParquetWorkUnitCalculator.java | 174 +++++
.../common/spark/TestSparkFactory.java | 91 +++
.../common/util/AbstractSparkTest.java | 88 +++
.../marmaray/common/util/AvroPayloadUtil.java | 137 ++++
.../common/util/CassandraTestConstants.java | 27 +
.../common/util/CassandraTestUtil.java | 57 ++
.../marmaray/common/util/FileHelperUtil.java | 33 +
.../common/util/FileSinkConfigTestUtil.java | 107 +++
.../marmaray/common/util/FileTestUtil.java | 39 +
.../marmaray/common/util/HiveTestUtil.java | 37 +
.../marmaray/common/util/KafkaTestHelper.java | 242 ++++++
.../util/MultiThreadTestCoordinator.java | 39 +
.../common/util/ParquetWriterUtil.java | 49 ++
.../marmaray/common/util/SchemaTestUtil.java | 100 +++
.../marmaray/common/util/SparkTestUtil.java | 51 ++
.../common/util/TestConverterUtil.java | 77 ++
.../marmaray/common/util/TestDateUtil.java | 43 ++
.../marmaray/common/util/TestFsUtils.java | 83 ++
.../marmaray/common/util/TestJobUtil.java | 37 +
.../marmaray/common/util/TestLockManager.java | 298 +++++++
.../marmaray/common/util/TestMapUtil.java | 39 +
.../common/util/TestParquetWriterUtil.java | 80 ++
.../marmaray/common/util/TestSchemaUtil.java | 64 ++
.../marmaray/utilities/TestKafkaUtil.java | 102 +++
marmaray/src/test/resources/cassandra.yaml | 586 ++++++++++++++
marmaray/src/test/resources/config.yaml | 16 +
.../src/test/resources/configWithScopes.yaml | 59 ++
.../src/test/resources/datacenter/datacenter | 1 +
.../expectedConfigWithBootstrapScope.yaml | 34 +
.../expectedConfigWithIncrementalScope.yaml | 33 +
.../test/resources/log4j-surefire.properties | 10 +
.../test/resources/schemas/StringPair.avsc | 9 +
marmaray/src/test/resources/setupTable.cql | 5 +
marmaray/src/test/resources/teardownTable.cql | 4 +
.../testData/testPartition/data.parquet | Bin 0 -> 454 bytes
pom.xml | 724 ++++++++++++++++++
260 files changed, 26644 insertions(+)
create mode 100644 .gitignore
create mode 100644 checkstyles/marmaray-0.0.1.xml
create mode 100644 checkstyles/suppressions.xml
create mode 100644 marmaray-tools/pom.xml
create mode 100755 marmaray-tools/src/main/cli/toggleHDFSMetadataFile.py
create mode 100644 marmaray-tools/src/main/java/com/uber/marmaray/tools/HDFSMetadataPrinter.java
create mode 100644 marmaray-tools/src/main/java/com/uber/marmaray/tools/HDFSMetadataPruner.java
create mode 100644 marmaray/build.gradle
create mode 100644 marmaray/config/sample.yaml
create mode 100644 marmaray/pom.xml
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/AvroPayload.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/DispersalType.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/FileSinkType.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/HoodieErrorPayload.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/IPayload.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/PartitionType.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/WorkUnit.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/actions/IJobDagAction.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/actions/JobDagActions.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/actions/ReporterAction.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/AwsConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/CassandraSinkConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/ConfigScopeResolver.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/Configuration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/ErrorTableConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/FileSinkConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/HadoopConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/HiveConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/HiveSourceConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieIndexConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/KafkaConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/KafkaSourceConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/LockManagerConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/RetryStrategyConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/SimpleRetryStrategyConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/configuration/ZookeeperConfiguration.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/converterresult/ConverterResult.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/AbstractDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/CassandraSinkCQLDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/CassandraSinkDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/FileSinkDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/KafkaSourceDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/SinkDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/SourceDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/SparkSourceDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/schema/AbstractSchemaConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/schema/CassandraSchemaConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/converters/schema/DataFrameSchemaConverter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/data/BinaryRawData.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/data/ErrorData.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/data/ForkData.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/data/IData.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/data/RDDWrapper.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/data/RawData.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/data/RawDataHelper.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/data/ValidData.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/dataset/ErrorRecord.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/dataset/ExceptionRecord.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/dataset/MetricRecord.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/dataset/UtilRecord.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/dataset/UtilTable.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/exceptions/ForkOperationException.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/exceptions/InvalidDataException.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/exceptions/JobRuntimeException.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/exceptions/MetadataException.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/exceptions/MissingPropertyException.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/exceptions/RetryException.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/forkoperator/FilterFunction.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/forkoperator/ForkFunction.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/forkoperator/ForkOperator.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/DagPayload.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/ExecutionTimeJobExecutionStrategy.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/IJobExecutionStrategy.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/Job.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/JobDag.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/JobManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/JobSubDag.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/SingleSinkSubDag.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/ThreadPoolService.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/job/ThreadPoolServiceTier.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/AbstractValue.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/HDFSDatePartitionManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/HDFSMetadataManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/HDFSPartitionManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/HoodieBasedMetadataManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/IMetadataManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/JobManagerMetadataTracker.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/MetadataConstants.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/NoOpMetadataManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metadata/StringValue.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/CassandraPayloadRDDSizeEstimator.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/ChargebackMetricType.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/DataFeedMetricNames.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/DataFeedMetrics.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/DoubleMetric.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/IChargebackCalculator.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/IMetricable.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/JobMetricNames.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/JobMetricType.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/JobMetrics.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/LongMetric.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/Metric.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/metrics/TimerMetric.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/reporters/ConsoleReporter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/reporters/IReporter.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/reporters/Reportable.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/reporters/Reporters.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/retry/IFunctionThrowsException.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/retry/IRetryStrategy.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/retry/RetryableFunction.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/retry/SimpleRetryStrategy.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/ISchemaService.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/ISinkSchemaManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/cassandra/CassandraDataField.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/cassandra/CassandraPayload.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/cassandra/CassandraSchema.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/cassandra/CassandraSchemaField.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/cassandra/CassandraSinkSchemaManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/cassandra/ClusterKey.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/ISink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/SinkStatManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/cassandra/CassandraClientSink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/cassandra/CassandraSSTableSink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/cassandra/CassandraSink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/file/AwsFileSink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/file/FileSink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/file/HdfsFileSink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSinkOperations.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/IRunState.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/ISource.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/IWorkUnitCalculator.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/hive/HiveRunState.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/hive/HiveSource.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/hive/ParquetWorkUnitCalculator.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/hive/ParquetWorkUnitCalculatorResult.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaBootstrapOffsetSelector.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaRunState.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaSource.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/spark/SparkArgs.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/spark/SparkFactory.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/ByteBufferUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/CommandLineUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/ConfigUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/ConverterUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/DateUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/ErrorExtractor.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/FSUtils.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/GenericRecordUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/HoodieSinkConverterErrorExtractor.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/HoodieSinkErrorExtractor.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/HoodieUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/JobUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/KafkaSourceConverterErrorExtractor.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/KafkaUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/LockManager.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/MapUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/NumberConstants.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/ScalaUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/SchemaUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/StringTypes.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/StringUtil.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/TimestampInfo.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/cluster/CassandraClusterInfo.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/listener/SparkEventListener.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/listener/SparkJobTracker.java
create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/listener/TimeoutManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/TestSparkUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/actions/TestJobDagActions.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestAwsConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestCassandraSinkConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestConfigScopeResolver.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestErrorTableConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestFileSinkConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHadoopConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestHoodieIndexConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/configuration/TestKafkaConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/converters/TestAbstractDataConverter.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/converters/TestCassandraDataFrameConverter.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/converters/TestCassandraSchemaConverter.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/converters/TestDataFrameDataConverter.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/converters/TestDataFrameSchemaConverter.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/converters/data/TestFileSinkDataConverter.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/data/TestRDDWrapper.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/dataset/TestUtilTable.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/forkoperator/TestForkOperator.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/job/TestExecutionTimeJobExecutionStrategy.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/job/TestJobSubDag.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/job/TestThreadPoolService.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metadata/HDFSTestConstants.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metadata/MemoryMetadataManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHDFSDatePartitionManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHDFSJobLevelMetadataTracker.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHDFSMetadataManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHDFSPartitionManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metadata/TestHoodieBasedMetadataManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metrics/TestDataFeedMetrics.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metrics/TestJobMetrics.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/metrics/TestTimerMetric.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/retry/TestRetryableFunction.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/schema/cassandra/TestCassandraSinkSchemaManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/schema/cassandra/TestClusterKey.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/TestSinkStatManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/cassandra/TestCassandraClientSink.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/cassandra/TestCassandraSSTableSink.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/cassandra/TestCassandraSinkUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/file/FileSinkTestUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/file/TestAwsFileSink.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/file/TestFileSink.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/file/TestHdfsFileSink.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sources/hive/TestHiveSource.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sources/hive/TestHiveSourceConfiguration.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/sources/hive/TestParquetWorkUnitCalculator.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/spark/TestSparkFactory.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/AbstractSparkTest.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/AvroPayloadUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/CassandraTestConstants.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/CassandraTestUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/FileHelperUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/FileSinkConfigTestUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/FileTestUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/HiveTestUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/KafkaTestHelper.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/MultiThreadTestCoordinator.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/ParquetWriterUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/SchemaTestUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/SparkTestUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/TestConverterUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/TestDateUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/TestFsUtils.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/TestJobUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/TestLockManager.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/TestMapUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/TestParquetWriterUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/common/util/TestSchemaUtil.java
create mode 100644 marmaray/src/test/java/com/uber/marmaray/utilities/TestKafkaUtil.java
create mode 100644 marmaray/src/test/resources/cassandra.yaml
create mode 100644 marmaray/src/test/resources/config.yaml
create mode 100644 marmaray/src/test/resources/configWithScopes.yaml
create mode 100644 marmaray/src/test/resources/datacenter/datacenter
create mode 100644 marmaray/src/test/resources/expectedConfigWithBootstrapScope.yaml
create mode 100644 marmaray/src/test/resources/expectedConfigWithIncrementalScope.yaml
create mode 100644 marmaray/src/test/resources/log4j-surefire.properties
create mode 100644 marmaray/src/test/resources/schemas/StringPair.avsc
create mode 100644 marmaray/src/test/resources/setupTable.cql
create mode 100644 marmaray/src/test/resources/teardownTable.cql
create mode 100644 marmaray/src/test/resources/testData/testPartition/data.parquet
create mode 100644 pom.xml
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..82806b8
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,90 @@
+# Compiled source #
+###################
+*.com
+*.class
+*.dll
+*.exe
+*.a
+*.o
+*.so
+*.node
+
+# Node Waf Byproducts #
+#######################
+.lock-wscript
+build/
+autom4te.cache/
+
+# Node Modules #
+################
+# Better to let npm install these from the package.json defintion
+# rather than maintain this manually
+node_modules/
+
+# Packages #
+############
+# it's better to unpack these files and commit the raw source
+# git has its own built in compression methods
+*.7z
+*.dmg
+*.gz
+*.iso
+*.jar
+*.rar
+*.tar
+*.zip
+
+# Logs and databases #
+######################
+*.log
+dump.rdb
+*.tap
+
+
+# OS generated files #
+######################
+.DS_Store?
+.DS_Store
+ehthumbs.db
+Icon?
+Thumbs.db
+
+# thrift generated files #
+##########################
+generated/
+
+# NodeJS Core Dump
+core
+
+# Jenkins build scripts
+rt-jenkins/
+
+# Coverage Reports
+coverage/
+
+# local docs, scratchboards
+localdocs/
+
+# vi temp files
+.*.swp
+
+# intelliJ
+.idea/
+*.iml
+
+# Project specific items (local conf, build dir)
+config/local.json
+maps-evidence/
+*.lst
+classes/
+target/
+*.dat
+
+# shaded jar pom file
+dependency-reduced-pom.xml
+
+# output of build plugin org.codehaus.mojo build-helper-maven-plugin
+test_properties.props
+
+# gradle generated logs
+.gradle
diff --git a/checkstyles/marmaray-0.0.1.xml b/checkstyles/marmaray-0.0.1.xml
new file mode 100644
index 0000000..fa3cf98
--- /dev/null
+++ b/checkstyles/marmaray-0.0.1.xml
@@ -0,0 +1,136 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/checkstyles/suppressions.xml b/checkstyles/suppressions.xml
new file mode 100644
index 0000000..f516bb7
--- /dev/null
+++ b/checkstyles/suppressions.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/marmaray-tools/pom.xml b/marmaray-tools/pom.xml
new file mode 100644
index 0000000..1fc5334
--- /dev/null
+++ b/marmaray-tools/pom.xml
@@ -0,0 +1,22 @@
+
+ 4.0.0
+
+ 1.8
+ 1.8
+
+
+ com.uber.marmaray
+ marmaray-base
+ 1.0-SNAPSHOT
+
+ marmaray-tools
+ 1.0-SNAPSHOT
+
+
+
+ com.uber.marmaray
+ marmaray
+ 1.0-SNAPSHOT
+
+
+
diff --git a/marmaray-tools/src/main/cli/toggleHDFSMetadataFile.py b/marmaray-tools/src/main/cli/toggleHDFSMetadataFile.py
new file mode 100755
index 0000000..2e96826
--- /dev/null
+++ b/marmaray-tools/src/main/cli/toggleHDFSMetadataFile.py
@@ -0,0 +1,62 @@
+"""
+Copyright (c) 2018 Uber Technologies, Inc.
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+IN THE SOFTWARE.
+"""
+
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+
+"""
+A script to either enable or disable a HDFS metadata file. A metadata file is disabled if the extension ".tmp"
+is appended at the end. Enabling a metadata file will remove the same extension. This tool will allow users
+to disable metadata files and associated watermarks inside these files used to determine which partition to process.
+
+Usage: toggleHDFSMetadataFile -f sample_file -d
+ toggleHDFSMetadataFile -f sample_file.tmp -e
+"""
+import argparse, sys
+import shutil
+import logging
+from os.path import basename, dirname, splitext, isfile
+
+def main(args):
+ logging.basicConfig(level=logging.INFO)
+ disabled_ext = ".tmp"
+
+ if not isfile(args.file):
+ logging.error("the file does not exist")
+ quit()
+
+ if args.disable:
+ if not args.file.endswith(disabled_ext):
+ logging.info("Disabling %s", args.file)
+ shutil.move(args.file, args.file + disabled_ext)
+ else:
+ logging.warning("the specified filename already ends with a .tmp extension")
+ if args.enable:
+ if args.file.endswith(disabled_ext):
+ logging.info("Enabling %s ", args.file)
+ shutil.move(args.file, dirname(args.file) + splitext(basename(args.file))[0])
+ else:
+ logging.error("the file must end with a .tmp extension")
+
+if __name__ == '__main__':
+ p= argparse.ArgumentParser()
+ p.add_argument('-f', '--file', help='File to toggle on/off as metadata file')
+ p.add_argument('-e', '--enable', help='Enable .tmp file as metadata file', action='store_true')
+ p.add_argument('-d', '--disable', help='disable metadata file', action='store_true')
+ args = p.parse_args()
+ main(args)
diff --git a/marmaray-tools/src/main/java/com/uber/marmaray/tools/HDFSMetadataPrinter.java b/marmaray-tools/src/main/java/com/uber/marmaray/tools/HDFSMetadataPrinter.java
new file mode 100644
index 0000000..8b13c25
--- /dev/null
+++ b/marmaray-tools/src/main/java/com/uber/marmaray/tools/HDFSMetadataPrinter.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.uber.marmaray.common.configuration.Configuration;
+import com.uber.marmaray.common.metadata.HDFSMetadataManager;
+import com.uber.marmaray.common.metadata.StringValue;
+import com.uber.marmaray.utilities.CommandLineUtil;
+import com.uber.marmaray.utilities.FSUtils;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class HDFSMetadataPrinter {
+ private static final String METADATA_FILE_OPTION = "mfile";
+
+ public static void main(final String[] args) throws ParseException, IOException {
+ final CommandLineParser parser = new GnuParser();
+ final Options options = getCLIOptions();
+ final CommandLine cmd;
+
+ try {
+ cmd = parser.parse(options, args);
+ } catch (final ParseException e) {
+ final String cmdLineSyntax = "java -cp [jar_name] com.uber.marmaray.tools.HDFSMetadataPrinter "
+ + "-m [METADATA_FILE]";
+ final String header = "This tools prints out all the metadata contents of a HDFS metadata file.";
+ final String footer = "For help, please contact the Hadoop Data Platform team";
+ CommandLineUtil.printHelp(options, cmdLineSyntax, header, footer);
+ throw e;
+ }
+
+ final String metadataFilePath = cmd.getOptionValue(METADATA_FILE_OPTION);
+ Preconditions.checkState(!Strings.isNullOrEmpty(metadataFilePath));
+
+ log.info("Printing contents of metadata file: " + metadataFilePath);
+
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FSUtils.getFs(conf);
+ try (final InputStream is = new BufferedInputStream(fs.open(new Path(metadataFilePath)))) {
+ try (final ObjectInputStream input = new ObjectInputStream(is)) {
+ final Map metadataMap = HDFSMetadataManager.deserialize(input);
+ metadataMap.entrySet()
+ .stream()
+ .forEach(entry ->
+ log.info("Key: " + entry.getKey() + " Value: " + entry.getValue().getValue()));
+ }
+ }
+ }
+
+ private static Options getCLIOptions() {
+ final Options options = new Options();
+ options.addOption(CommandLineUtil.generateOption("m", METADATA_FILE_OPTION, true, "HDFS metadata file", true));
+ return options;
+ }
+}
diff --git a/marmaray-tools/src/main/java/com/uber/marmaray/tools/HDFSMetadataPruner.java b/marmaray-tools/src/main/java/com/uber/marmaray/tools/HDFSMetadataPruner.java
new file mode 100644
index 0000000..b2bbe90
--- /dev/null
+++ b/marmaray-tools/src/main/java/com/uber/marmaray/tools/HDFSMetadataPruner.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.tools;
+
+import com.uber.marmaray.common.configuration.Configuration;
+import com.uber.marmaray.common.metadata.HDFSMetadataManager;
+import com.uber.marmaray.utilities.CommandLineUtil;
+import com.uber.marmaray.utilities.FSUtils;
+import java.io.IOException;
+import java.util.Comparator;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.Preconditions;
+
+@Slf4j
+public class HDFSMetadataPruner {
+ private static final String HDFS_PATH_LONG_OPTION = "path";
+ private static final String NUM_METADATA_FILES_RETAINED_LONG_OPTION = "numFiles";
+ private static final String FAKE_DELETE_LONG_OPTION = "fake";
+ private static final String HDFS_PATH_SHORT_OPTION = "p";
+ private static final String NUM_METADATA_FILES_RETAINED_SHORT_OPTION = "n";
+ private static final String FAKE_DELETE_SHORT_OPTION = "f";
+
+ // Metadata file names in HDFS = nanoseconds since epoch so we can sort by name
+ private static final Comparator byTimestampedNameAsc =
+ Comparator.comparingLong(f1 -> Long.parseLong(f1.getPath().getName()));
+
+ // Todo - consider putting main functionality in a static utility method
+ public static void main(final String[] args) throws ParseException, IOException {
+ final CommandLineParser parser = new GnuParser();
+ final Options options = getCLIOptions();
+ final CommandLine cmd;
+
+ try {
+ cmd = parser.parse(options, args);
+ } catch (final ParseException e) {
+ final String cmdLineSyntax =
+ String.format("java -cp [jar_name] com.uber.marmaray.tools.HDFSMetadataCuller "
+ + "-%s [METADATA_PATH] -%s [NUM_METADATA_FILES_RETAINED] -%s [FAKE_DELETE_BOOLEAN]",
+ HDFS_PATH_SHORT_OPTION, NUM_METADATA_FILES_RETAINED_SHORT_OPTION, FAKE_DELETE_SHORT_OPTION);
+ final String header = "This tool prunes metadata files for an HDFS path by modification time";
+ final String footer = "For help, please contact the Hadoop Data Platform team";
+ CommandLineUtil.printHelp(options, cmdLineSyntax, header, footer);
+ throw e;
+ }
+
+ final Path metadataPath = new Path(cmd.getOptionValue(HDFS_PATH_LONG_OPTION));
+
+ final int numFilesToRetain = cmd.hasOption(NUM_METADATA_FILES_RETAINED_LONG_OPTION)
+ ? Integer.parseInt(cmd.getOptionValue(NUM_METADATA_FILES_RETAINED_LONG_OPTION))
+ : HDFSMetadataManager.DEFAULT_NUM_METADATA_FILES_TO_RETAIN;
+
+ Preconditions.checkState(numFilesToRetain > 0, "Number of files to retain cannot be <= 0");
+
+ final boolean fakeDelete = cmd.hasOption(FAKE_DELETE_LONG_OPTION)
+ ? Boolean.parseBoolean(cmd.getOptionValue(FAKE_DELETE_LONG_OPTION))
+ : false;
+
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FSUtils.getFs(conf);
+
+ if (fs.isDirectory(metadataPath)) {
+ final FileStatus[] fileStatuses = fs.listStatus(metadataPath);
+
+ if (fileStatuses.length < numFilesToRetain) {
+ log.info("No files were deleted. Number of files ({}) is less than number to retain ({})",
+ fileStatuses.length, numFilesToRetain);
+ return;
+ }
+
+ FSUtils.deleteHDFSMetadataFiles(fileStatuses, fs, numFilesToRetain, fakeDelete);
+ } else {
+ log.warn("Cannot prune any files, the path {} is not a directory", metadataPath);
+ }
+ }
+
+ private static Options getCLIOptions() {
+ final Options options = new Options();
+ options.addOption(CommandLineUtil.generateOption(HDFS_PATH_SHORT_OPTION,
+ HDFS_PATH_LONG_OPTION,
+ true,
+ "HDFS path",
+ true));
+
+ options.addOption(CommandLineUtil.generateOption(NUM_METADATA_FILES_RETAINED_SHORT_OPTION,
+ NUM_METADATA_FILES_RETAINED_LONG_OPTION,
+ true,
+ "number of metadata files to retain",
+ false));
+
+ options.addOption(CommandLineUtil.generateOption(FAKE_DELETE_SHORT_OPTION,
+ FAKE_DELETE_LONG_OPTION,
+ true,
+ "fake delete",
+ false));
+ return options;
+ }
+}
+
diff --git a/marmaray/build.gradle b/marmaray/build.gradle
new file mode 100644
index 0000000..015d697
--- /dev/null
+++ b/marmaray/build.gradle
@@ -0,0 +1,18 @@
+plugins {
+ id 'com.kageiit.jacobo' version '2.0.1'
+}
+
+description = 'translate jacoco to cobertura'
+
+dependencies {
+}
+
+task jacobo(type: com.kageiit.jacobo.JacoboTask) {
+ jacocoReport = file("./target/site/jacoco-ut/jacoco.xml")
+ coberturaReport = file("./target/site/cobertura/coverage.xml")
+ srcDirs = ["./src/main/java"]
+}
+
+task noop {
+ // noop task for when tests don't run
+}
diff --git a/marmaray/config/sample.yaml b/marmaray/config/sample.yaml
new file mode 100644
index 0000000..fc3e37c
--- /dev/null
+++ b/marmaray/config/sample.yaml
@@ -0,0 +1,45 @@
+---
+marmaray:
+ hadoop:
+ anything: "???"
+ hive:
+ dataPath: ""
+ jobName: ""
+ source:
+ saveCheckpoint: false
+ hoodie:
+ tables:
+ cell_table:
+ table_name: ""
+ base_path: ""
+ schema: ""
+ parallelism: 1024
+ row_table:
+ table_name: ""
+ base_bath: ""
+ schema: ""
+ default:
+ combine_before_insert: true
+ combine_before_upsert: true
+ parallelism: 512
+ kafka:
+ conn:
+ bootstrap:
+ servers: "???"
+ source:
+ topicName:
+ maxMessage:
+ readParallelism:
+ cassandra:
+ output:
+ native.port: ""
+ thrift.address: ""
+ keyspace: "keyspace"
+ tablename: "tableName"
+ cluster_name: "clusterName"
+ column_list: "columnList"
+ partition_keys: "partitionKeys"
+ clustering_keys: "clusteringKeys"
+ inputPath: "inputPath"
+ partitionType: "partitionType"
+ time_to_live: 0L
diff --git a/marmaray/pom.xml b/marmaray/pom.xml
new file mode 100644
index 0000000..1d5683d
--- /dev/null
+++ b/marmaray/pom.xml
@@ -0,0 +1,55 @@
+
+ 4.0.0
+
+ 2.7.4
+ 1.8
+ 1.8
+ 2.7.1
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+ org.fortasoft
+ gradle-maven-plugin
+
+
+ maven-assembly-plugin
+
+
+
+
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-yaml
+ ${jackson.dataformat.yaml}
+
+
+
+ org.apache.curator
+ curator-recipes
+ ${apache.curator}
+
+
+
+ org.apache.curator
+ curator-test
+ ${apache.curator}
+ test
+
+
+
+ com.uber.marmaray
+ marmaray-base
+ 1.0-SNAPSHOT
+
+ marmaray
+ 1.0-SNAPSHOT
+
+
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/AvroPayload.java b/marmaray/src/main/java/com/uber/marmaray/common/AvroPayload.java
new file mode 100644
index 0000000..d9214de
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/AvroPayload.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common;
+
+import com.google.common.base.Preconditions;
+import com.uber.marmaray.common.data.IData;
+import com.uber.marmaray.utilities.SparkUtil;
+import lombok.NonNull;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.hibernate.validator.constraints.NotEmpty;
+import scala.reflect.ClassManifestFactory;
+import scala.reflect.ClassTag;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.avro.Schema.Type.RECORD;
+
+/**
+ * This class contains the Avro data as payload with the schema
+ */
+// TODO (T962137)
+@ToString
+@Slf4j
+public class AvroPayload implements IPayload, IData, Serializable {
+
+ private static final ClassTag recordClassTag = ClassManifestFactory.fromClass(GenericRecord.class);
+ @NonNull
+ private final Map rootFields = new HashMap<>();
+ @NonNull
+ private final byte[] byteRecord;
+
+ public AvroPayload(@NonNull final GenericRecord record) {
+ this.byteRecord = SparkUtil.serialize(record, recordClassTag);
+ for (final Schema.Field f : record.getSchema().getFields()) {
+ if (!RECORD.equals(f.schema().getType())) {
+ this.rootFields.put(f.name(), record.get(f.name()));
+ }
+ }
+ }
+
+ public AvroPayload(@NonNull final GenericRecord record,
+ @NonNull final List fieldsToCache) {
+ this.byteRecord = SparkUtil.serialize(record, recordClassTag);
+ for (final String f : fieldsToCache) {
+ this.rootFields.put(f, record.get(f));
+ }
+ }
+
+ /**
+ * Avoid calling it to fetch top level record fields.
+ */
+ public GenericRecord getData() {
+ return SparkUtil.deserialize(this.byteRecord, recordClassTag);
+ }
+
+ /**
+ * It only supports fetching fields at the root level of the record which are of type other than
+ * {@link org.apache.avro.generic.GenericData.Record}.
+ *
+ * @param fieldName name of the field at the root level of the record.
+ */
+ public Object getField(@NotEmpty final String fieldName) {
+ Preconditions.checkState(this.rootFields.containsKey(fieldName),
+ "field is not cached at root level :" + fieldName);
+ return this.rootFields.get(fieldName);
+ }
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/DispersalType.java b/marmaray/src/main/java/com/uber/marmaray/common/DispersalType.java
new file mode 100644
index 0000000..e6094cb
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/DispersalType.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+package com.uber.marmaray.common;
+
+/**
+ * {@link DispersalType} defines two dispersal types:
+ * version: append new file to path with version id
+ * overwrite: delete old files and then add new file to path
+ */
+public enum DispersalType {
+ VERSION,
+ OVERWRITE
+}
+
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/FileSinkType.java b/marmaray/src/main/java/com/uber/marmaray/common/FileSinkType.java
new file mode 100644
index 0000000..f9aa958
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/FileSinkType.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+package com.uber.marmaray.common;
+
+/**
+ * {@link FileSinkType} defines two options of file sink destinations
+ * 1. HDFS
+ * 2. S3: aws s3
+ */
+public enum FileSinkType {
+ HDFS,
+ S3
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/HoodieErrorPayload.java b/marmaray/src/main/java/com/uber/marmaray/common/HoodieErrorPayload.java
new file mode 100644
index 0000000..11e9551
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/HoodieErrorPayload.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common;
+
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.marmaray.common.exceptions.JobRuntimeException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * {@link HoodieErrorPayload} is a class to represent error payloads written by Hoodie.
+ */
+public class HoodieErrorPayload implements HoodieRecordPayload {
+ private final GenericRecord record;
+
+ public HoodieErrorPayload(@NonNull final GenericRecord record) {
+ this.record = record;
+ }
+
+ @Override
+ public Optional getInsertValue(final Schema schema) throws IOException {
+ final Optional record = getRecord();
+ return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
+ }
+
+ protected Optional getRecord() {
+ return Optional.of(this.record);
+ }
+
+ @Override
+ public HoodieErrorPayload preCombine(final HoodieErrorPayload hoodieErrorPayload) {
+ throw new JobRuntimeException("Not implemented yet!!");
+ }
+
+ @Override
+ public Optional combineAndGetUpdateValue(final IndexedRecord indexedRecord, final Schema schema)
+ throws IOException {
+ throw new JobRuntimeException("Not implemented yet!!");
+ }
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/IPayload.java b/marmaray/src/main/java/com/uber/marmaray/common/IPayload.java
new file mode 100644
index 0000000..49b6470
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/IPayload.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common;
+
+/**
+ * We explicitly model a generic payload here so that it gives us the flexibility to
+ * wrap the data with additional metadata as needed
+ *
+ * @param data type
+ */
+public interface IPayload {
+ D getData();
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/PartitionType.java b/marmaray/src/main/java/com/uber/marmaray/common/PartitionType.java
new file mode 100644
index 0000000..6d5c9f0
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/PartitionType.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common;
+
+/**
+ * {@link PartitionType} defines partition type for data
+ * normal: partition by some defined key
+ * date: partition by date
+ * none: no partition
+ */
+public enum PartitionType {
+ NORMAL,
+ DATE,
+ NONE
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/WorkUnit.java b/marmaray/src/main/java/com/uber/marmaray/common/WorkUnit.java
new file mode 100644
index 0000000..a4793e7
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/WorkUnit.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@AllArgsConstructor
+public class WorkUnit {
+
+ @Getter
+ private final String workEntity;
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/actions/IJobDagAction.java b/marmaray/src/main/java/com/uber/marmaray/common/actions/IJobDagAction.java
new file mode 100644
index 0000000..8ec2426
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/actions/IJobDagAction.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common.actions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link IJobDagAction} is interface to determine a generic action to execute.
+ * {@link JobDagActions} are completely independent and will determine if they should run based on success status.
+ */
+public interface IJobDagAction {
+ int DEFAULT_TIMEOUT_SECONDS = 120;
+ String ACTION_TYPE = "action_type";
+
+ /**
+ * Execute the action
+ *
+ * @param successful whether the job dag succeeded
+ * @return true if action succeeded
+ */
+ boolean execute(boolean successful);
+
+ /**
+ * Timeout to wait for the action to complete
+ * @return number of seconds to wait for task completion
+ */
+ default int getTimeoutSeconds() {
+ return DEFAULT_TIMEOUT_SECONDS;
+ }
+
+ /**
+ * @return metric tags to be used for reporting metrics.
+ */
+ default Map getMetricTags() {
+ final Map metricsTags = new HashMap<>();
+ metricsTags.put(ACTION_TYPE, this.getClass().getSimpleName());
+ return metricsTags;
+ }
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/actions/JobDagActions.java b/marmaray/src/main/java/com/uber/marmaray/common/actions/JobDagActions.java
new file mode 100644
index 0000000..c9767bf
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/actions/JobDagActions.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common.actions;
+
+import com.uber.marmaray.common.job.ThreadPoolService;
+import com.uber.marmaray.common.job.ThreadPoolServiceTier;
+import com.uber.marmaray.common.metrics.LongMetric;
+import com.uber.marmaray.common.reporters.Reporters;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.hibernate.validator.constraints.NotEmpty;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.uber.marmaray.common.metrics.DataFeedMetricNames.RESULT_FAILURE;
+import static com.uber.marmaray.common.metrics.DataFeedMetricNames.RESULT_SUCCESS;
+
+@Slf4j
+/**
+ * {@link JobDagActions} are actions that are run based on success status.
+ * This class is completely independent and NOT an implementation of the {@link IJobDagAction} interface
+ */
+public final class JobDagActions {
+ public static final String RESULT_METRIC = "result";
+ public static final String TIME_METRIC = "execution_time";
+
+ /**
+ * Class to hold and execute actions.
+ *
+ * Actions are executed in parallel; execution status will not affect the others.
+ */
+
+ public static final String DEFAULT_NAME = "anonymous";
+
+ @Getter
+ private final Queue actions;
+ private final Reporters reporters;
+
+ @Getter
+ private final String name;
+
+ public JobDagActions(@NonNull final Reporters reporters) {
+ this(reporters, DEFAULT_NAME);
+ }
+
+ public JobDagActions(@NonNull final Reporters reporters, @NotEmpty final String name) {
+ this.actions = new ConcurrentLinkedDeque<>();
+ this.name = name;
+ this.reporters = reporters;
+ }
+
+ /**
+ * Add an action to the container
+ * @param action IAction to hold
+ */
+ public void addAction(@NonNull final IJobDagAction action) {
+ this.actions.add(action);
+ }
+
+ /**
+ * Add a collection of actions to the container
+ * @param actions Collection of IActions to hold
+ */
+ public void addActions(@NonNull final Collection extends IJobDagAction> actions) {
+ this.actions.addAll(actions);
+ }
+
+ /**
+ * Execute all of the actions in parallel.
+ *
+ * Parallelism is managed by the ThreadPoolService. Throws IllegalStateException if any IAction
+ * throws an Exception during execution.
+ * @param dagSuccess whether the actions are responding to a successful dag run or a failed one
+ */
+ public boolean execute(final boolean dagSuccess) {
+ final AtomicBoolean successful = new AtomicBoolean(true);
+ final ConcurrentMap, IJobDagAction> futures = new ConcurrentHashMap<>();
+ this.actions.forEach(a ->
+ futures.put(ThreadPoolService.submit(() -> {
+ final long startTime = System.currentTimeMillis();
+ final boolean success;
+ try {
+ success = a.execute(dagSuccess);
+ } finally {
+ final long endTime = System.currentTimeMillis();
+ reportExecuteTime(a, endTime - startTime);
+ }
+ return success;
+ }, ThreadPoolServiceTier.ACTIONS_TIER, a.getTimeoutSeconds()),
+ a));
+ futures.forEach((future, action) -> {
+ final AtomicBoolean actionSuccess = new AtomicBoolean(true);
+ try {
+ actionSuccess.set(future.get());
+ } catch (Exception e) {
+ log.error("Error running JobDagAction {} for {}:", action.getClass(), this.getName(), e);
+ actionSuccess.set(false);
+ successful.set(false);
+ }
+ reportActionStatus(action, actionSuccess.get());
+ });
+ if (!successful.get()) {
+ log.warn("Errors encountered during JobDagActions execution");
+ }
+ return successful.get();
+ }
+
+ private void reportExecuteTime(@NonNull final IJobDagAction action, final long timeInMillis) {
+ final LongMetric timeMetric = new LongMetric(TIME_METRIC, TimeUnit.MILLISECONDS.toSeconds(timeInMillis));
+ timeMetric.addTags(action.getMetricTags());
+ this.reporters.getReporters().stream().forEach(r -> r.gauge(timeMetric));
+ }
+
+ private void reportActionStatus(@NonNull final IJobDagAction action, final boolean isSuccess) {
+ final LongMetric resultMetric = new LongMetric(RESULT_METRIC, isSuccess ? RESULT_SUCCESS : RESULT_FAILURE);
+ resultMetric.addTags(action.getMetricTags());
+ this.reporters.getReporters().stream().forEach(r -> r.gauge(resultMetric));
+ }
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/actions/ReporterAction.java b/marmaray/src/main/java/com/uber/marmaray/common/actions/ReporterAction.java
new file mode 100644
index 0000000..d33e738
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/actions/ReporterAction.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common.actions;
+
+import com.uber.marmaray.common.metrics.DataFeedMetrics;
+import com.uber.marmaray.common.metrics.JobMetrics;
+import com.uber.marmaray.common.reporters.Reporters;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AllArgsConstructor
+/**
+ * {@link ReporterAction} is an implemetation of the {@link IJobDagAction} interface and is used to
+ * report metrics
+ */
+public class ReporterAction implements IJobDagAction {
+
+ public static int DEFAULT_TIMEOUT_SECONDS = 120;
+
+ @Getter
+ private final Reporters reporters;
+ @Getter
+ private final JobMetrics jobMetrics;
+ private final DataFeedMetrics dataFeedMetrics;
+
+ @Override
+ public boolean execute(final boolean success) {
+ if (success) {
+ this.reporters.getReporters().forEach(reporter -> {
+ this.jobMetrics.gaugeAll(reporter);
+ this.dataFeedMetrics.gaugeAll(reporter);
+ });
+ } else {
+ log.warn("No metrics produced or actions being executed on reporter because errors were encountered");
+ }
+ return success;
+ }
+
+ @Override
+ public int getTimeoutSeconds() {
+ return DEFAULT_TIMEOUT_SECONDS;
+ }
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/AwsConfiguration.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/AwsConfiguration.java
new file mode 100644
index 0000000..fa83c80
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/configuration/AwsConfiguration.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+package com.uber.marmaray.common.configuration;
+
+import com.uber.marmaray.utilities.ConfigUtil;
+import lombok.Getter;
+import lombok.NonNull;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class AwsConfiguration implements Serializable {
+ @Getter
+ private final String region;
+ @Getter
+ private final String awsAccessKeyId;
+ @Getter
+ private final String awsSecretAccessKey;
+ @Getter
+ private final String bucketName;
+ @Getter
+ private final String objectKey;
+ @Getter
+ private final String s3FilePrefix;
+ @Getter
+ private final String sourcePath;
+ @Getter
+ private final String fileSystemPrefix;
+ @Getter
+ private final String pathKey;
+
+ public AwsConfiguration(@NonNull final FileSinkConfiguration conf) {
+ ConfigUtil.checkMandatoryProperties(conf.getConf(), this.getMandatoryProperties());
+ this.region = conf.getAwsRegion().get();
+ this.awsAccessKeyId = conf.getAwsAccessKeyId().get();
+ this.awsSecretAccessKey = conf.getAwsSecretAccesskey().get();
+ this.bucketName = conf.getBucketName().get();
+ this.objectKey = conf.getObjectKey().get();
+ this.sourcePath = conf.getFullPath();
+ this.fileSystemPrefix = conf.getPathPrefix();
+
+ String s3FilePrefix;
+ if (conf.getSourcePartitionPath().isPresent()) {
+ s3FilePrefix = String.format("%s/%s/", this.objectKey, conf.getSourcePartitionPath().get());
+ } else {
+ s3FilePrefix = String.format("%s/", this.objectKey);
+ }
+ this.pathKey = s3FilePrefix;
+ s3FilePrefix += conf.getFileNamePrefix();
+ this.s3FilePrefix = s3FilePrefix;
+ }
+
+ private List getMandatoryProperties() {
+ return Collections.unmodifiableList(
+ Arrays.asList(
+ FileSinkConfiguration.AWS_REGION,
+ FileSinkConfiguration.BUCKET_NAME,
+ FileSinkConfiguration.OBJECT_KEY,
+ FileSinkConfiguration.AWS_ACCESS_KEY_ID,
+ FileSinkConfiguration.AWS_SECRET_ACCESS_KEY
+ ));
+ }
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/CassandraSinkConfiguration.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/CassandraSinkConfiguration.java
new file mode 100644
index 0000000..8c0e077
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/configuration/CassandraSinkConfiguration.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common.configuration;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.uber.marmaray.common.PartitionType;
+import com.uber.marmaray.common.exceptions.MissingPropertyException;
+import com.uber.marmaray.common.schema.cassandra.ClusterKey;
+import com.uber.marmaray.utilities.ConfigUtil;
+import com.uber.marmaray.utilities.SchemaUtil;
+import com.uber.marmaray.utilities.StringTypes;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.hadoop.ConfigHelper;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CassandraSinkConfiguration} contains all relevant configuration properties to disperse data to Cassandra.
+ * Properties include but are not limited to MapReduce job settings, optimization parameters, etc.
+ *
+ * We explicitly assume all whitespace characters are trimmed from values in the yaml reader so we don't have
+ * to sanitize the values again here.
+ */
+@Slf4j
+public class CassandraSinkConfiguration implements Serializable {
+
+ public static final String CASSANDRA_PREFIX_ONLY = "cassandra.";
+ public static final String CASS_COMMON_PREFIX = Configuration.MARMARAY_PREFIX + CASSANDRA_PREFIX_ONLY;
+ public static final String HADOOP_COMMON_PREFIX = Configuration.MARMARAY_PREFIX + "hadoop.";
+ // *** Cassandra Configuration Settings ***
+ public static final String OUTPUT_THRIFT_PORT = CASS_COMMON_PREFIX + "output.thrift.port";
+
+ public static final String NATIVE_TRANSPORT_PORT =
+ HADOOP_COMMON_PREFIX + CASSANDRA_PREFIX_ONLY + "output.native.port";
+ // Note: CassandraUnit uses port 9142 so it won't disturb any local default cassandra installation
+ public static final String DEFAULT_OUTPUT_NATIVE_PORT = "9042";
+
+ public static final String INITIAL_HOSTS = HADOOP_COMMON_PREFIX + CASSANDRA_PREFIX_ONLY + "output.thrift.address";
+
+ // this setting is currently not used yet but will eventually be needed for throttling
+ public static final String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+
+ public static final String STORAGE_PORT = CASS_COMMON_PREFIX + "storage.port";
+ public static final String SSL_STORAGE_PORT = CASS_COMMON_PREFIX + "ssl.storage.port";
+
+ public static final String DISABLE_QUERY_UNS = CASS_COMMON_PREFIX + "disable_uns";
+
+ // *** End of Cassandra Configuration Settings ***1
+ public static final String DEFAULT_OUTPUT_RPC_PORT = "9160";
+ // *** End of Cassandra Configuration Settings ***
+
+ // *** Dispersal Job Settings ***
+ public static final String KEYSPACE = CASS_COMMON_PREFIX + "keyspace";
+ public static final String TABLE_NAME = CASS_COMMON_PREFIX + "tablename";
+ public static final String CLUSTER_NAME = CASS_COMMON_PREFIX + "cluster_name";
+ public static final String COLUMN_LIST = CASS_COMMON_PREFIX + "column_list";
+ public static final String PARTITION_KEYS = CASS_COMMON_PREFIX + "partition_keys";
+ public static final String CLUSTERING_KEYS = CASS_COMMON_PREFIX + "clustering_keys";
+ public static final String PARTITION_TYPE = CASS_COMMON_PREFIX + "partition_type";
+ public static final String USERNAME = CASS_COMMON_PREFIX + "username";
+ public static final String PASSWORD = CASS_COMMON_PREFIX + "password";
+ public static final String USE_CLIENT_SINK = CASS_COMMON_PREFIX + "use_client_sink";
+
+ // optional field for customers to timestamp their data
+ public static final String TIMESTAMP = CASS_COMMON_PREFIX + SchemaUtil.DISPERSAL_TIMESTAMP;
+ public static final String TIMESTAMP_IS_LONG_TYPE = CASS_COMMON_PREFIX + "timestamp_is_long_type";
+
+ // File path containing datacenter info on each machine
+ public static final String DC_PATH = "data_center_path";
+
+ public static final String DATACENTER = CASS_COMMON_PREFIX + "datacenter";
+
+ public static final String TIME_TO_LIVE = CASS_COMMON_PREFIX + "time_to_live";
+ private static final long DEFAULT_TIME_TO_LIVE = 0L;
+
+ private static final Splitter splitter = Splitter.on(StringTypes.COMMA);
+
+ @Getter
+ protected final Configuration conf;
+
+ @Getter
+ protected final List initialHosts;
+
+ /**
+ * Optional field. This functionality enables a subset of columns to be dispersed from the data source and
+ * not forcing the user to either disperse all or none of the data.
+ * This set must include all partition and clustering keys if defined or the job will fail.
+ */
+ @Getter
+ private final Optional> filteredColumns;
+
+ @Getter
+ private final List partitionKeys;
+
+ @Getter
+ private final List clusteringKeys;
+
+ @Getter
+ private final PartitionType partitionType;
+
+ @Getter
+ private final Optional writeTimestamp;
+
+ @Getter
+ private final boolean timestampIsLongType;
+
+ public CassandraSinkConfiguration(@NonNull final Configuration conf) {
+ this.conf = conf;
+ ConfigUtil.checkMandatoryProperties(this.conf, this.getMandatoryProperties());
+
+ this.partitionKeys = this.splitString(this.conf.getProperty(PARTITION_KEYS).get());
+
+ if (this.conf.getProperty(INITIAL_HOSTS).isPresent()) {
+ this.initialHosts = this.splitString(this.conf.getProperty(INITIAL_HOSTS).get());
+ } else {
+ this.initialHosts = new ArrayList<>();
+ }
+
+ // Source fields can be case insensitive so we convert everything to lower case for comparing
+ this.filteredColumns = this.conf.getProperty(COLUMN_LIST).isPresent()
+ ? Optional.of(new HashSet<>(this.splitString(this.conf.getProperty(COLUMN_LIST).get().toLowerCase())))
+ : Optional.absent();
+
+ this.clusteringKeys = this.conf.getProperty(CLUSTERING_KEYS).isPresent()
+ ? initClusterKeys(this.splitString(this.conf.getProperty(CLUSTERING_KEYS).get()))
+ : Collections.EMPTY_LIST;
+
+ if (this.conf.getProperty(PARTITION_TYPE).isPresent()) {
+ this.partitionType = PartitionType.valueOf(this.conf.getProperty(PARTITION_TYPE)
+ .get().trim().toUpperCase());
+ } else {
+ this.partitionType = PartitionType.NONE;
+ }
+
+ this.writeTimestamp = this.conf.getProperty(TIMESTAMP);
+ this.timestampIsLongType = this.conf.getBooleanProperty(TIMESTAMP_IS_LONG_TYPE, false);
+ }
+
+ /**
+ * Returns hadoop configuration.
+ * Some example of things that could be set are the output rpc port (cassandra.output.thrift.port)
+ * and other cassandra specific hadoop configuration settings.
+ */
+ public org.apache.hadoop.conf.Configuration getHadoopConf() {
+ final org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+
+ if (this.getUserName().isPresent() && this.getPassword().isPresent()) {
+ ConfigHelper.setOutputKeyspaceUserNameAndPassword(
+ hadoopConf,
+ this.getUserName().get(),
+ this.getPassword().get()
+ );
+ }
+
+ if (!this.conf.getProperty(DATACENTER).isPresent()) {
+ // should never happen, but a sanity check
+ throw new MissingPropertyException("The datacenter information is missing");
+ }
+
+ // By default use Murmur3 Partitioner for now, we may want this to be changed in future
+ ConfigHelper.setOutputPartitioner(hadoopConf, Murmur3Partitioner.class.getName());
+
+ log.info("Setting output local DC only to true");
+ ConfigHelper.setOutputLocalDCOnly(hadoopConf, true);
+
+ this.conf.getPropertiesWithPrefix(CassandraSinkConfiguration.HADOOP_COMMON_PREFIX, true).forEach(
+ (key, value) -> {
+ log.info("hadoop-conf-update:key:[{}]:value:[{}]", key, value);
+ hadoopConf.set(key, value);
+ }
+ );
+ return hadoopConf;
+ }
+
+ public String getKeyspace() {
+ return this.getConf().getProperty(KEYSPACE).get().trim();
+ }
+
+ public String getTableName() {
+ return this.getConf().getProperty(TABLE_NAME).get().trim();
+ }
+
+ public String getClusterName() {
+ return this.getConf().getProperty(CLUSTER_NAME).get().trim();
+ }
+
+ public Boolean getUseClientSink() {
+ return this.getConf().getBooleanProperty(USE_CLIENT_SINK, false);
+ }
+
+ public Optional getUserName() {
+ return this.getConf().getProperty(USERNAME);
+ }
+
+ public Optional getPassword() {
+ return this.getConf().getProperty(PASSWORD);
+ }
+
+ public Optional getSSLStoragePort() {
+ return this.conf.getProperty(SSL_STORAGE_PORT).isPresent()
+ ? Optional.of(this.conf.getProperty(SSL_STORAGE_PORT).get())
+ : Optional.absent();
+ }
+
+ public Optional getStoragePort() {
+ return this.conf.getProperty(STORAGE_PORT).isPresent()
+ ? Optional.of(this.conf.getProperty(STORAGE_PORT).get())
+ : Optional.absent();
+ }
+
+ // TTL is optional. By default if you set it to 0 it means forever (confirm this)
+ public Optional getTimeToLive() {
+ return this.conf.getProperty(TIME_TO_LIVE).isPresent()
+ ? Optional.of(this.conf.getLongProperty(TIME_TO_LIVE, DEFAULT_TIME_TO_LIVE))
+ : Optional.absent();
+ }
+
+ // TODO - For these optional fields, consider adding a default value
+ public Optional getNativePort() {
+ return this.conf.getProperty(NATIVE_TRANSPORT_PORT);
+ }
+
+ private List getMandatoryProperties() {
+ return Collections.unmodifiableList(
+ Arrays.asList(
+ KEYSPACE,
+ TABLE_NAME,
+ CLUSTER_NAME,
+ PARTITION_KEYS
+ ));
+ }
+
+ protected List splitString(final String commaSeparatedValues) {
+ return Lists.newArrayList(splitter.split(commaSeparatedValues));
+ }
+
+ private List initClusterKeys(final List entries) {
+ return entries.stream().map(entry -> ClusterKey.parse(entry)).collect(Collectors.toList());
+ }
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/ConfigScopeResolver.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/ConfigScopeResolver.java
new file mode 100644
index 0000000..1369dd6
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/configuration/ConfigScopeResolver.java
@@ -0,0 +1,289 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+package com.uber.marmaray.common.configuration;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.uber.marmaray.common.exceptions.JobRuntimeException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ *
+ * What qualify as scopes?
+ * Top-level nodes which appear under a tag dedicated to represent
+ * scope override qualify as scopes.
+ *
+ *
+ * How is scope override defined?
+ * Scope override definition is defined under a config driven tag dedicated
+ * for scope override. In the example below in (fig 1), the tag happens to be
+ * scope_override and scopes are x, y & z. Scopes y and z are the overrider
+ * scopes and x is the default scope.
+ *
+ *
+ * How is scope overriding done?
+ * There are not more than 2 scopes participating in a scope override at any time.
+ * That is, either the user chooses to use the scope y, in which case y overrides x
+ * OR the user chooses to use scope z, in which case z overrides x
+ * OR user chooses to not use any scope, in which case no overriding is done at all.
+ * Also note that y and z are totally unrelated and never participate together in
+ * scope overriding. Before explaining the logic of scope overriding, please note
+ * the convention in the example below
+ *
+ *
+ * Any node that starts with p represents a primitive node. In other
+ * words, it represents a config-key and a node that starts with v
+ * represents the config-value.
+ *
+ *
+ * Nodes x, y, z represent scopes as explained in the previous question.
+ *
+ * ├── scope_override
+ * │  ├── {y = x}
+ * │  └── {z = x}
+ * │   Â
+ * ├── x
+ * │  └── a
+ * │  ├── b
+ * │  │  ├── c
+ * │  │  │  ├── d
+ * │  │  │  │  ├── {p3 = v3_x}
+ * │  │  │  │  └── {p4 = v4_x}
+ * │  │  │  └── {p2 = v2_x}
+ * │  │  └── {p1 = v1_x}
+ * │  └── e
+ * │  └── f
+ * │  └── {p5 = v5_x}
+ * ├── y
+ * │  └── a
+ * │  └── b
+ * │  ├── c
+ * │  │  ├── d
+ * │  │  │  ├── {p3 = v3_y}
+ * │  │  │  └── {p4 = v4_y}
+ * │  │  └── {p2 = v2_y}
+ * │  └── {p1 = v1_y}
+ * └── z
+ * └── a
+ * └── b
+ * ├── c
+ * │  ├── d
+ * │  │  ├── {p3 = v3_z}
+ * │  │  └── {p7 = v7_z}
+ * │  └── {p2 = v2_z}
+ * ├── g
+ * │  └── {p6 = v6_z}
+ * └── {p1 = v1_z}
+ *
+ *
+ *
+ *
+ * Overriding is done by replacing the value of a config-key that appears
+ * in both the scopes, along the exact same hierarchy.
+ *
+ * When scope y overrides x, the final config-key values resolved are below.
+ * It inherits p5 as is from x. Overrides p1, p2, p3, p4.
+ * (fig 2)
+ *
+ * a
+ * ├── b
+ * │  ├── c
+ * │  │  ├── d
+ * │  │  │  ├── {p3 = v3_y}
+ * │  │  │  └── {p4 = v4_y}
+ * │  │  └── {p2 = v2_y}
+ * │  └── {p1 = v1_y}
+ * └── e
+ * └── f
+ * └── {p5 = v5_x}
+ *
+ * When scope z overrides x, the final config-key values resolved are below.
+ * It inherits p5 as is from x. Overrides p1, p2, p3, p4. Retains its p6.
+ * (fig 3)
+ *
+ * a
+ * ├── b
+ * │  ├── c
+ * │  │  ├── d
+ * │  │  │  ├── {p3 = v3_z}
+ * │  │  │  └── {p4 = v4_z}
+ * │  │  │  └── {p7 = v7_z}
+ * │  │  └── {p2 = v2_z}
+ * │ ├── g
+ * │ │  └── {p6 = v6_z}
+ * │  └── {p1 = v1_y}
+ * └── e
+ * └── f
+ * └── {p5 = v5_x}
+ *
+ *
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class ConfigScopeResolver {
+
+ private final String scopeOverrideMappingKey;
+ private Map scopeOverrideMap;
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ public JsonNode projectOverrideScopeOverDefault(
+ @NonNull final Optional scope, @NonNull final JsonNode rootJsonNode) {
+ if (!scope.isPresent() || !rootJsonNode.isContainerNode()) {
+ log.info("No scope overriding in effect. "
+ + "Either scope: {} is absent, or {} is not a container node",
+ scope, rootJsonNode);
+ return rootJsonNode;
+ }
+ Preconditions.checkState(rootJsonNode.has(scopeOverrideMappingKey),
+ String.format(
+ "scopeOverrideMappingKey: %s is not present in config but scoping is expected with scope: %s",
+ scopeOverrideMappingKey, scope));
+
+ final ObjectNode root = copyToObjectNode(rootJsonNode);
+ final JsonNode scopeOverrideDefinitionNodeVal = root.get(scopeOverrideMappingKey);
+ Preconditions.checkArgument(
+ scopeOverrideDefinitionNodeVal != null
+ && scopeOverrideDefinitionNodeVal.isContainerNode(),
+ String.format("Value for scopePrecedence %s should be a map, got null or primitive: ",
+ scopeOverrideDefinitionNodeVal));
+ this.scopeOverrideMap = mapper.convertValue(scopeOverrideDefinitionNodeVal, Map.class);
+ log.info("scopeOverrideMap is {} scope is {}", this.scopeOverrideMap, scope.get());
+
+ Preconditions.checkArgument(scopeOverrideMap.containsKey(scope.get()),
+ "Un-recognized scope passed for config resolving");
+
+ root.remove(scopeOverrideMappingKey);
+
+ final String overrideScope = scope.get();
+ final String defaultScope = scopeOverrideMap.get(overrideScope);
+ if (root.has(overrideScope) && root.has(defaultScope)) {
+ final JsonNode resolvedNode = handleScopeOverride(root, overrideScope,
+ defaultScope);
+ root.remove(overrideScope);
+ root.remove(defaultScope);
+ final Iterator fieldNamesOfResolvedNode = resolvedNode.fieldNames();
+ while (fieldNamesOfResolvedNode.hasNext()) {
+ final String fieldNameOfResolvedNode = fieldNamesOfResolvedNode.next();
+ root.put(fieldNameOfResolvedNode, resolvedNode.get(fieldNameOfResolvedNode));
+ }
+ } else {
+ log.info("No overriding done for scope combinations as one of them is missing."
+ + " IsOverrideScopePresent: {}, IsDefaultScopePresent: {} ",
+ root.has(overrideScope), root.has(defaultScope));
+ }
+ for (final Entry entry : scopeOverrideMap.entrySet()) {
+ // remove all scope definitions, now that resolving is done
+ root.remove(entry.getKey());
+ root.remove(entry.getValue());
+ }
+ return root;
+ }
+
+ private JsonNode handleScopeOverride(
+ @NonNull final ObjectNode root,
+ @NotEmpty final String overrideScope,
+ @NotEmpty final String defaultScope) {
+ final JsonNode overridingNode = root.get(overrideScope);
+ final JsonNode defaultNode = root.get(defaultScope);
+ final ObjectNode defaultNodeCopy;
+ defaultNodeCopy = copyToObjectNode(defaultNode);
+ // defaultNodeCopy will be updated by projecting overridingNode over it
+ projectOverrideNodeOverDefaultForField(null, defaultNodeCopy, overridingNode);
+ return defaultNodeCopy;
+
+ }
+
+ private ObjectNode copyToObjectNode(@NonNull final JsonNode defaultNode) {
+ final ObjectNode defaultNodeCopy;
+ try {
+ defaultNodeCopy = (ObjectNode) mapper
+ .readTree(mapper.writeValueAsString(defaultNode));
+ } catch (IOException e) {
+ log.error("Got exception", e);
+ throw new JobRuntimeException(e);
+ }
+ return defaultNodeCopy;
+ }
+
+ private void projectOverrideNodeOverDefaultForField(
+ @NotEmpty final String fieldName,
+ @NonNull final JsonNode parentDefaultNode,
+ @NonNull final JsonNode parentOverridingNode) {
+
+ final JsonNode defaultNode =
+ (fieldName == null) ? parentDefaultNode : parentDefaultNode.get(fieldName);
+ final JsonNode overridingNode =
+ (fieldName == null) ? parentOverridingNode : parentOverridingNode.get(fieldName);
+
+ if (fieldName != null) {
+ // not first time call to recursion
+ if (defaultNode == null || overridingNode == null) {
+ final JsonNode nodeToPutAtFieldName = java.util.Optional.ofNullable(defaultNode)
+ .orElse(overridingNode);
+ log.info("Copying fieldName: {} value: {}", fieldName, nodeToPutAtFieldName);
+ ((ObjectNode) parentDefaultNode).put(fieldName, nodeToPutAtFieldName);
+ return;
+ }
+ Preconditions
+ .checkState(
+ (defaultNode.isContainerNode() && overridingNode.isContainerNode())
+ || (!defaultNode.isContainerNode() && !overridingNode.isContainerNode()),
+ "Mismatch in node type between default node: {} and overriding node: {}."
+ + " One of them is a primitive node", defaultNode, overridingNode);
+ if (!overridingNode.isContainerNode()) {
+ // primitive node or TextNode since that is the only primitive node that appears here
+ // so blindly accept the value of the overriding node
+ log.info("Using value: {} of override node for fieldName: {}", overridingNode,
+ fieldName);
+ ((ObjectNode) parentDefaultNode).put(fieldName, overridingNode);
+ } else {
+ // both are container nodes
+ projectOverAllFields(defaultNode, overridingNode);
+ }
+ } else {
+ // first call to recursion, represents root default node and override node.
+ projectOverAllFields(defaultNode, overridingNode);
+ }
+ }
+
+ private void projectOverAllFields(
+ @NonNull final JsonNode defaultNode, @NonNull final JsonNode overridingNode) {
+ final Iterator childFieldNames = overridingNode.fieldNames();
+ while (childFieldNames.hasNext()) {
+ final String childFieldName = childFieldNames.next();
+ projectOverrideNodeOverDefaultForField(childFieldName, defaultNode, overridingNode);
+ }
+ }
+}
diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/Configuration.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/Configuration.java
new file mode 100644
index 0000000..48dcfa0
--- /dev/null
+++ b/marmaray/src/main/java/com/uber/marmaray/common/configuration/Configuration.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions
+ * of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+package com.uber.marmaray.common.configuration;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import java.io.InputStream;
+import java.io.Serializable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.uber.marmaray.common.exceptions.JobRuntimeException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * {@link Configuration} will be instantiated from a YAML based file
+ * and contain all the pertinent metadata to initialize and execute
+ * a data transfer job. Supports scopes and scope based config
+ * overriding. Refer to documentation of {@link ConfigScopeResolver}
+ * for more info on scope overriding
+ */
+@Slf4j
+public class Configuration implements Serializable {
+
+ public static final String MARMARAY_PREFIX = "marmaray.";
+ public static final String SCOPE_OVERRIDE_MAPPING_KEY = "scope_override_map";
+
+ private final Properties props = new Properties();
+
+ /**
+ * @deprecated todo: remove this constructor in a separate diff
+ * since callers will need to inject scope, so will need change in callers
+ */
+ @Deprecated
+ public Configuration() {
+
+ }
+
+ public Configuration(@NonNull final File yamlFile,
+ @NonNull final Optional scope) {
+ loadYamlFile(yamlFile, scope);
+ }
+
+ public Configuration(@NonNull final InputStream inputStream,
+ @NonNull final Optional scope) {
+ loadYamlStream(inputStream, scope);
+ }
+
+ /**
+ * @deprecated todo: remove this constructor in a separate diff
+ * since callers will need to inject scope, so will need change in callers
+ */
+ @Deprecated
+ public Configuration(@NonNull final Configuration conf) {
+ this.props.putAll(conf.props);
+ }
+
+ public void loadYamlFile(@NonNull final File yamlFile,
+ final Optional scope) {
+ try {
+ final FileSystem localFs = FileSystem.getLocal(
+ new HadoopConfiguration(new Configuration()).getHadoopConf());
+ final InputStream yamlInputStream = localFs.open(new Path(yamlFile.getPath()));
+ loadYamlStream(yamlInputStream, scope);
+ } catch (IOException e) {
+ final String errorMsg = String
+ .format("Error loading yaml config file %s", yamlFile.getAbsolutePath());
+ log.error(errorMsg, e);
+ throw new JobRuntimeException(errorMsg, e);
+ }
+ }
+
+ public void loadYamlStream(@NonNull final InputStream yamlStream,
+ @NonNull final Optional scope) {
+ try {
+ final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
+ final JsonNode jsonNode = yamlReader.readTree(yamlStream);
+ final JsonNode scopeOverriddenJsonNode = handleScopeOverriding(scope, jsonNode);
+ parseConfigJson(scopeOverriddenJsonNode, "");
+ } catch (IOException e) {
+ final String errorMsg = "Error loading yaml file ";
+ log.error(errorMsg, e);
+ throw new JobRuntimeException(errorMsg, e);
+ }
+ }
+
+ private JsonNode handleScopeOverriding(
+ @NonNull final Optional scope, @NonNull final JsonNode jsonNode) {
+ return new ConfigScopeResolver(SCOPE_OVERRIDE_MAPPING_KEY)
+ .projectOverrideScopeOverDefault(scope, jsonNode);
+ }
+
+ public String getProperty(final String key, final String defaultValue) {
+ return this.props.getProperty(key, defaultValue);
+ }
+
+ public Optional getProperty(final String key) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(key));
+ final String val = this.props.getProperty(key);
+ return (val == null) ? Optional.absent() : Optional.of(val);
+ }
+
+ public void setProperty(final String key, final String value) {
+ this.props.setProperty(key, value);
+ }
+
+ /**
+ * Returns properties with or without prefix.
+ * @param prefix
+ * @param removePrefix if true it will remove prefix from properties.
+ * @return {@link Map} with properties.
+ */
+ public Map getPropertiesWithPrefix(final String prefix, final boolean removePrefix) {
+ Preconditions.checkState(!Strings.isNullOrEmpty(prefix));
+ final Map properties = new HashMap<>();
+ final int prefixLength = prefix.length();
+ this.props.entrySet().forEach(
+ entry -> {
+ final String key = (String) entry.getKey();
+ if (key.startsWith(prefix)) {
+ if (removePrefix) {
+ properties.put(key.substring(prefixLength), entry.getValue().toString());
+ } else {
+ properties.put(key, entry.getValue().toString());
+ }
+
+ }
+ });
+ return properties;
+ }
+
+ public static T getProperty(@NonNull final Configuration conf, @NotEmpty final String key,
+ @NonNull final T defaultValue) {
+ if (defaultValue instanceof Integer) {
+ return (T) new Integer(conf.getIntProperty(key, ((Integer) defaultValue).intValue()));
+ } else if (defaultValue instanceof Long) {
+ return (T) new Long(conf.getLongProperty(key, ((Long) defaultValue).longValue()));
+ } else if (defaultValue instanceof String) {
+ return (T) conf.getProperty(key, (String) defaultValue);
+ } else if (defaultValue instanceof Double) {
+ return (T) new Double(conf.getDoubleProperty(key, ((Double) defaultValue).doubleValue()));
+ } else if (defaultValue instanceof Boolean) {
+ return (T) new Boolean(conf.getBooleanProperty(key, ((Boolean) defaultValue).booleanValue()));
+ } else {
+ throw new IllegalArgumentException("Not supported :" + defaultValue.getClass());
+ }
+ }
+
+ public int getIntProperty(final String key, final int defaultValue) {
+ final Optional val = getProperty(key);
+ return val.isPresent() ? Integer.parseInt(val.get()) : defaultValue;
+ }
+
+ public long getLongProperty(final String key, final long defaultValue) {
+ final Optional val = getProperty(key);
+ return val.isPresent() ? Long.parseLong(val.get()) : defaultValue;
+ }
+
+ public double getDoubleProperty(final String key, final double defaultValue) {
+ final Optional val = getProperty(key);
+ return val.isPresent() ? Double.parseDouble(val.get()) : defaultValue;
+ }
+
+ public boolean getBooleanProperty(final String key, final boolean defaultValue) {
+ final Optional val = getProperty(key);
+ return val.isPresent() ? Boolean.parseBoolean(val.get()) : defaultValue;
+ }
+
+ private void parseConfigJson(final JsonNode jsonNode, final String prefix) {
+ final Iterator fieldNamesIt = jsonNode.fieldNames();
+ while (fieldNamesIt.hasNext()) {
+ final String fieldName = fieldNamesIt.next();
+ final String newPrefix = prefix.isEmpty() ? fieldName.trim() : prefix + "." + fieldName.trim();
+ final JsonNode newJsonNode = jsonNode.get(fieldName);
+ if (newJsonNode.isObject()) {
+ parseConfigJson(newJsonNode, newPrefix);
+ } else {
+ props.put(newPrefix, newJsonNode.asText().trim());
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ this.props.entrySet().forEach(
+ entry -> {
+ sb.append(entry.getKey() + "<=>" + entry.getValue() + "\n");
+ }
+ );
+ return sb.toString();
+ }
+
+ public Set