From d0a5593cd179c67f8b28a24debb3114631d9c196 Mon Sep 17 00:00:00 2001 From: esmioley Date: Fri, 12 Apr 2019 23:09:15 -0700 Subject: [PATCH] Remove old jsonToHoodie example, replace with ParquetToCassandra (#20) --- README.md | 197 ++++----- examples/jsonToHoodie/README.md | 41 -- examples/jsonToHoodie/errorSchema.1.avsc | 12 - examples/jsonToHoodie/exampleConf.yaml | 24 -- examples/jsonToHoodie/exampleData.json | 4 - examples/jsonToHoodie/exampleSchema.1.avsc | 15 - .../examples/job/ParquetToCassandraJob.java | 407 ++++++++++++++++++ .../marmaray/utilities/CassandraSinkUtil.java | 29 ++ .../uber/marmaray/utilities/SparkUtil.java | 44 +- 9 files changed, 551 insertions(+), 222 deletions(-) delete mode 100644 examples/jsonToHoodie/README.md delete mode 100644 examples/jsonToHoodie/errorSchema.1.avsc delete mode 100644 examples/jsonToHoodie/exampleConf.yaml delete mode 100644 examples/jsonToHoodie/exampleData.json delete mode 100644 examples/jsonToHoodie/exampleSchema.1.avsc create mode 100644 marmaray/src/main/java/com/uber/marmaray/examples/job/ParquetToCassandraJob.java create mode 100644 marmaray/src/main/java/com/uber/marmaray/utilities/CassandraSinkUtil.java diff --git a/README.md b/README.md index 91025a1..53019d6 100644 --- a/README.md +++ b/README.md @@ -1,124 +1,73 @@ -# Marmaray - -


- -_Note: For an End to End example of how all our components tie together, please see com.uber.marmaray.common.job.JsonHoodieIngestionJob_ - -Marmaray is a generic Hadoop data ingestion and dispersal framework and library. It is a plug-in based framework built on top of the Hadoop ecosystem where support can be added to ingest data from any source and disperse to any sink leveraging the power of Apache Spark. - -Marmaray describes a number of abstractions to support the ingestion of any source to any sink. They are described at a high-level below to help developers understand the architecture and design of the overall system. - -This system has been canonically used to ingest data into a Hadoop data lake and disperse data from a data lake to online data stores usually with lower latency semantics. The framework was intentionally designed, however, to not be tightly coupled to just this particular use case and can move data from any source to any sink. - -**End-to-End Job Flow** - -The figure below illustrates a high level flow of how Marmaray jobs are orchestrated, independent of the specific source or sink. - -


- -During this process, a configuration defining specific attributes for each source and sink orchestrates every step of the next job. This includes figuring out the amount of data we need to process (i.e., its Work Unit), applying forking functions to split the raw data, for example, into ‘valid’ and ‘error’ records and converting the data to an appropriate sink format. At the end of the job the metadata will be saved/updated in the metadata manager, and metrics can be reported to track progress. - -The following sections give an overview of each of the major components that enable the job flow previously illustrated. - -**High-Level Architecture** - -The architecture diagram below illustrates the fundamental building blocks and abstractions in Marmaray that enable its overall job flow. These generic components facilitate the ability to add extensions to Marmaray, letting it support new sources and sinks. - -


- -**Avro Payload** - -The central component of Marmaray’s architecture is what we call the AvroPayload, a wrapper around Avro’s GenericRecord binary encoding format which includes relevant metadata for our data processing needs. - -One of the major benefits of Avro data (GenericRecord) is that it is efficient both in its memory and network usage, as the binary encoded data can be sent over the wire with minimal schema overhead compared to JSON. Using Avro data running on top of Spark’s architecture means we can also take advantage of Spark’s data compression and encryption features. These benefits help our Spark jobs more efficiently handle data at a large scale. - -To support our any-source to any-sink architecture, we require that all ingestion sources define converters from their schema format to Avro and that all dispersal sinks define converters from the Avro Schema to the native sink data model (i.e., ByteBuffers for Cassandra). - -Requiring that all converters either convert data to or from an AvroPayload format allows a loose and intentional coupling in our data model. Once a source and its associated transformation have been defined, the source theoretically can be dispersed to any supported sink, since all sinks are source-agnostic and only care that the data is in the intermediate AvroPayload format. - -This is illustrated in the figure below: - -


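To make this converter contract concrete, here is a minimal sketch in plain Java. The interfaces, class names, and the Kafka/Cassandra stand-ins below are illustrative only and are not Marmaray's actual converter API; they just show how any source composes with any sink once both sides meet at the intermediate payload format.

```
/**
 * Sketch of the any-source to any-sink contract described above. The class and
 * method names here are illustrative, not Marmaray's actual DataConverter API.
 */
public class ConverterContractSketch {

    /** Stand-in for the intermediate format (in Marmaray: an Avro GenericRecord plus metadata). */
    static class AvroPayloadStub {
        final String json; // simplified representation of the encoded record
        AvroPayloadStub(final String json) {
            this.json = json;
        }
    }

    /** Every ingestion source defines: native source record -> intermediate payload. */
    interface SourceConverter<S> {
        AvroPayloadStub convert(S sourceRecord);
    }

    /** Every dispersal sink defines: intermediate payload -> native sink record. */
    interface SinkConverter<T> {
        T convert(AvroPayloadStub payload);
    }

    public static void main(final String[] args) {
        // A "Kafka-like" source producing raw bytes, and a "Cassandra-like" sink expecting strings.
        final SourceConverter<byte[]> fromKafka = bytes -> new AvroPayloadStub(new String(bytes));
        final SinkConverter<String> toCassandra = payload -> "INSERT: " + payload.json;

        // Because both converters meet at the intermediate format, this pairing is arbitrary:
        // any source converter can feed any sink converter.
        System.out.println(toCassandra.convert(fromKafka.convert("{\"id\": 10}".getBytes())));
    }
}
```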
**Data Model**

The central component of our architecture is the AvroPayload, a wrapper around Avro's GenericRecord binary encoding format along with relevant metadata for our data processing needs. One of the major benefits of Avro data (GenericRecord) is that once an Avro schema is registered with Spark, only the encoded data is sent during internode network transfers and disk writes, both of which are highly optimized. Running Avro data on top of Spark's architecture also lets us take advantage of Spark's data compression and encryption features. These benefits factor heavily in helping our Spark jobs handle data at large scale more efficiently. Avro includes a schema to specify the structure of the data being encoded while also supporting schema evolution. For large data files, we take advantage of the fact that every record is encoded with the same schema, so the schema only needs to be defined once per file, which reduces overhead. To support our any-source to any-sink architecture, we require that all ingestion sources define converters from their schema format to Avro and that all dispersal sinks define converters from the Avro schema to the native sink data model (i.e., ByteBuffers for Cassandra).

This allows a loose and intentional coupling in our data model: once a source and its associated transformation have been defined, the data theoretically can be dispersed to any supported sink, since all sinks are source-agnostic and only care that the data is in the intermediate AvroPayload format.

**Data Converters**

The primary function of ingestion and dispersal jobs is to transform input records from the source into the desired format before writing the data to the destination sink. Marmaray allows jobs to chain converters together to perform multiple transformations as needed, with the potential to also write to multiple sinks.

A secondary but critical function of DataConverters is to produce error records with every transformation. Before data is ingested into our Hadoop data lake, it is critical that all data conforms to a schema for analytical purposes; any data that is malformed, missing required fields, or otherwise deemed to have issues is filtered out and written to error tables. This ensures a high level of data quality in our Hadoop data lake. This functionality is abstracted out by exposing only a convert() method to the user. The convert() method acts on a single datum in the input schema format and does one of the following:
- Returns an output record in the desired output schema format
- Writes the input record to the error table with an error message and other useful metadata, or discards the record

Using the Kafka -> Hudi (Hive) ingestion case, we use two converters:

- KafkaSourceDataConverter: converts Kafka messages (byte[]) to GenericRecord (wrapped in an AvroPayload as described earlier). This record is then sent to our data lake for ingestion.
- HoodieSinkDataConverter: converts GenericRecord (wrapped in an AvroPayload) received from the data lake into a HoodieRecord, which is needed for insertion into our Hoodie storage system.

**Error Tables**

Error tables are written to by DataConverters, as described in the previous section.
The main purpose of error tables is to enable easy debugging of jobs and to reject records that do not conform to the schema, for example because of a backward-incompatible schema change. Since some of this error data can contain potentially sensitive user information, we control access to error tables at an "owner" + "table" level. In addition, once the owners have fixed the data and ensured it conforms to the schema, they can push the data back into the pipeline, where it can now be successfully ingested.

**WorkUnit Calculator**

Marmaray moves data in mini-batches of configurable size. To calculate the amount of data to process, we introduced the concept of a WorkUnitCalculator. At a high level, a work unit calculator looks at the type of input source and the previously stored checkpoint, and calculates the next work unit, or batch of work. Examples of work units are offset ranges for Kafka or a collection of HDFS files for a Hive/HDFS source.

When calculating the next batch of data to process, a work unit calculator can also take throttling information into account, such as the maximum amount of data or number of messages to read from Kafka. This is configurable per use case and gives maximum flexibility to ensure that work units are appropriately sized and don't overwhelm source or sink systems, especially as the amount of data increases in scale.

Each WorkUnitCalculator returns an IWorkCalculatorResult, which includes the list of work units to process in the current batch as well as the new checkpoint state to persist if the job succeeds in processing the input batch. We have also added functionality to calculate the execution cost of each work unit for chargeback purposes. This is very useful because users can define various methods to compute cost, using the number of records, the total record size, the Spark executors' effective execution time, etc. Since we allow multiple ingestions in a single run (i.e., multiple Kafka topics can be ingested in a single Spark job run using separate topic-specific DAGs), having per-topic execution time helps differentiate execution cost between topics.

**Metadata Manager**

All Marmaray jobs need a persistent store, known as the metadata manager, to store job-level metadata. A job can update its state during execution, but the old saved state is replaced only if the current execution succeeds; otherwise, any modifications to the state are rejected. We use this to store checkpoint information (partition offsets in the case of Kafka), average record size, average number of messages, etc. The metadata store is designed to be generic, however, and can store any relevant metadata that is useful to track, describe, or collect status on jobs, depending on the use case and user needs.

When a job begins execution, an in-memory copy of the current metadata is created and shared with the job components that need to update it during job execution. If the job fails, this in-memory copy is discarded, ensuring that the next run starts from the previously saved state of the last successful run. If the job succeeds, the in-memory copy is saved to the persistent store. Because the metadata manager keeps an in-memory copy, there is currently a limit on the amount of metadata a job can store.
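A minimal sketch of this commit-on-success pattern follows. It uses a simplified, hypothetical metadata manager rather than Marmaray's actual IMetadataManager API, and the checkpoint key and offset values are made up for illustration.

```
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Sketch of the "in-memory copy, commit only on success" pattern described above.
 * The interface and key names are hypothetical; they only illustrate the idea.
 */
public class CheckpointSketch {

    /** Simplified stand-in for a persistent metadata store. */
    static class InMemoryMetadataManager {
        private final Map<String, String> persisted = new HashMap<>(); // last successfully saved state
        private final Map<String, String> inMemory = new HashMap<>();  // working copy for the current run

        InMemoryMetadataManager() {
            this.inMemory.putAll(this.persisted);
        }

        void set(final String key, final String value) {
            this.inMemory.put(key, value);                 // updates only touch the working copy
        }

        Optional<String> get(final String key) {
            return Optional.ofNullable(this.inMemory.get(key));
        }

        void saveChanges() {
            this.persisted.clear();                        // called only when the job run succeeds
            this.persisted.putAll(this.inMemory);
        }
    }

    public static void main(final String[] args) {
        final InMemoryMetadataManager metadataManager = new InMemoryMetadataManager();

        // Read the previous checkpoint (e.g., a Kafka offset) to size the next work unit.
        final long lastOffset = Long.parseLong(metadataManager.get("topic1:partition0").orElse("0"));

        try {
            // ... run the JobDag over records starting at lastOffset ...
            metadataManager.set("topic1:partition0", String.valueOf(lastOffset + 1000));
            metadataManager.saveChanges();                 // success: persist the new checkpoint
        } catch (final RuntimeException e) {
            // failure: skip saveChanges() so the next run restarts from lastOffset
        }
    }
}
```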


**Fork Operator**

The main purpose of the ForkOperator is to split the input stream of records into multiple output streams. The canonical use case is to produce one output stream for valid records and one for error records, which can then be handled in a separate and independent manner.

Spark's internal execution engine evaluates all operations lazily: unless an action is performed (count, forEach, etc.), no data is actually read. The ForkOperator was introduced to avoid re-executing the input transformations and re-reading the data from the source, which would have been very expensive.

A provided ForkFunction is used by the ForkOperator to tag each datum with a valid or error annotation. These ForkOperators are called by our data converters during job execution. Users can then filter to get the desired collection of tagged records. The tagged records are persisted in Spark to avoid re-reading the raw input and re-applying the transformation when filtering; by default we use DISK_ONLY persistence to avoid memory overhead and pressure. These components are used in DataConverters to split the input stream into two streams (output + error), but they can also split it into more than two streams, with overlapping records if desired. For example, we could split an input stream of integers (1 to 6) into an even-number stream (2, 4, 6), an odd-number stream (1, 3, 5), and a multiple-of-3 stream (3, 6), as sketched below.
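As a rough illustration of this tag-then-filter idea (using plain Java collections rather than the actual ForkOperator/ForkFunction classes and Spark RDDs), the sketch below tags each integer from 1 to 6 once and then derives the even, odd, and multiple-of-3 streams from the already-tagged collection:

```
import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Sketch of splitting one input stream into several (possibly overlapping) tagged
 * streams in a single pass. Marmaray does this on persisted Spark RDDs; plain Java
 * collections are used here for brevity.
 */
public class ForkSketch {

    enum Tag { EVEN, ODD, MULTIPLE_OF_3 }

    public static void main(final String[] args) {
        final List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);

        // "Fork function": tag each datum exactly once (a datum may carry several tags).
        final Map<Integer, Set<Tag>> tagged = new LinkedHashMap<>();
        for (final int value : input) {
            final Set<Tag> tags = EnumSet.noneOf(Tag.class);
            tags.add(value % 2 == 0 ? Tag.EVEN : Tag.ODD);
            if (value % 3 == 0) {
                tags.add(Tag.MULTIPLE_OF_3);
            }
            tagged.put(value, tags);
        }

        // Consumers filter the already-tagged (in Marmaray: persisted) collection,
        // so the raw input is never re-read or re-transformed.
        final List<Integer> evens = filterByTag(tagged, Tag.EVEN);                  // [2, 4, 6]
        final List<Integer> odds = filterByTag(tagged, Tag.ODD);                    // [1, 3, 5]
        final List<Integer> multiplesOf3 = filterByTag(tagged, Tag.MULTIPLE_OF_3);  // [3, 6]

        System.out.println(evens + " " + odds + " " + multiplesOf3);
    }

    private static List<Integer> filterByTag(final Map<Integer, Set<Tag>> tagged, final Tag tag) {
        return tagged.entrySet().stream()
                .filter(e -> e.getValue().contains(tag))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```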


**JobDag**

The JobDag component orchestrates and performs the actual execution of the job. It does the following:
- Initializes the MetadataManager so checkpoint information can be retrieved
- Reads input records from the ISource to create the RDD
- Hands the RDD over to the ISink to write the data to the destination storage system
- Persists updated metadata and checkpoint information to the MetadataManager if the JobDag execution succeeded
- Reports the status of the job and other metrics

**JobManager**

The JobManager is responsible for running multiple JobDags. For example, a JobDag can correspond to each Kafka topic being ingested, and N JobDags can be run by the JobManager; these JobDags are grouped together as a single logical job, all sharing the same SparkContext (and resources). The JobManager is responsible for managing each of these JobDags and can be configured to run a certain number of them in parallel, which results in much better resource utilization since we don't currently take advantage of Spark's ability to dynamically allocate resources. The ordering of jobs can be defined to ensure that longer-running and higher-priority jobs get resources first. The JobManager also reports job success metrics and maintains registered reporters for reporting various metrics.

**ISource & ISink**

The ISource contains all the information necessary to read the source data for the requested work units, and the ISink contains all the information necessary to write to the sink. For example, a Cassandra sink would contain information about the cluster, table, partitioning keys, and clustering keys for where the data should reside. A Kafka source would contain information about the topic name, maximum number of messages to read, cluster information, offset initialization strategy, etc.

## ParquetToCassandraJob

This job demonstrates loading Parquet data from HDFS (either the files underlying a Hive table or raw Parquet files sharing the same schema) into a Cassandra cluster.

Requirements:
1. Hadoop
2. Spark 2.1
3. Cassandra

How to run:

1. Create Parquet files on HDFS. This can be done in the Spark shell:
```
val testDF = Seq((10, "foo"), (8, "bar"), (19, "baz")).toDF("id", "name")
testDF.coalesce(1).write.format("parquet").parquet("/path/to/testParquet")
```

2. Replace Guava in Spark with Guava 19.0. This can be done directly in the Spark jars, or by using spark.yarn.archive to update the libraries used.

3. Create the following config file and put it in HDFS:
```
marmaray:
  cassandra:
    cluster_name: testcluster
    datacenter: solo
    keyspace: marmaray
    partition_keys: id
    tablename: test_parquet_cassandra
  error_table:
    enabled: false
  hadoop:
    yarn_queue: default
    cassandra:
      output.thrift.address: localhost
  hive:
    data_path: /path/to/testParquet
    job_name: testParquetToCassandra
  lock_manager:
    is_enabled: false
    zk_base_path: /hoodie/no-op
  metadata_manager:
    cassandra:
      cluster: testcluster
      keyspace: marmaray
      table_name: marmaray_metadata_table
      username:
      password:
      output.thrift.address: localhost
    type: CASSANDRA
    job_name: testParquetToCassandra
  zookeeper:
    port: 2181
    quorum: unused
```

4. Run the Spark job:
```
./bin/spark-submit --class com.uber.marmaray.examples.job.ParquetToCassandraJob path/to/marmaray-1.0-SNAPSHOT-jar-with-dependencies.jar -c path/to/test.yaml
```

5. On success, the data will be dispersed to Cassandra.
You can use CQL to verify +``` +cqlsh> select * from marmaray.test_parquet_cassandra; + + id | name +----+------ + 10 | foo + 19 | baz + 8 | bar + +(3 rows) +``` diff --git a/examples/jsonToHoodie/README.md b/examples/jsonToHoodie/README.md deleted file mode 100644 index 8841493..0000000 --- a/examples/jsonToHoodie/README.md +++ /dev/null @@ -1,41 +0,0 @@ -# JSON to Hudi example - -This demonstrates the ability to load JSON files to a Hudi-based table. - -# Instructions -1. `mvn clean package` to obtain `marmaray-VERSION-jar-with-dependencies.jar` -2. Copy the file to spark -3. Create a directory to hold the files, and replace BASE_PATH in sampleConfig.yaml with that path. For example, I used oss_test as my base path. -4. Create the schema service path (BASE_PATH/schema) and put the exampleSchema.1.avsc and errorSchema.1.avsc files in it -5. Create the data path (BASE_PATH/data) and put the exampleData.json in it -6. Put the yaml file somewhere in BASE_PATH -Call spark-submit with `spark-submit --class com.uber.marmaray.common.job.JsonHoodie marmaray-VERSION-jar-with-dependencies.jar -c BASE_PATH/exampleConf.yaml` - -It will create two hoodie output data sets, one with two successfully ingested records (BASE_PATH/table), one with two error records (BASE_PATH/errorTable). - -If you look into the error records, you can see that one record is failing due to missing a required field, timestamp, and the other is failing due to having a schema mismatch. - -``` -scala> spark.read.parquet("oss_test/table/2018/09/09/61ee3dbd-7c97-42c1-bb1f-2d9a50c03d5c_0_20180910020542.parquet").show(false) -+-------------------+--------------------+------------------+----------------------+--------------------------------------------------------------------------------+-------------+--------+------------------------------+----------+ -|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |firstName |lastName|address |timestamp | -+-------------------+--------------------+------------------+----------------------+--------------------------------------------------------------------------------+-------------+--------+------------------------------+----------+ -|20180910020542 |20180910020542_0_3 |Foo |2018/09/09 |2018-09-09_61ee3dbd-7c97-42c1-bb1f-2d9a50c03d5c_0_20180910020542_27_3888.parquet|Foo |Bar |[12345 Main St,Anytown,123456]|1536518565| -|20180910020542 |20180910020542_0_4 |OnlyFirstname |2018/09/09 |2018-09-09_61ee3dbd-7c97-42c1-bb1f-2d9a50c03d5c_0_20180910020542_27_3888.parquet|OnlyFirstname|null |null |1536518566| -+-------------------+--------------------+------------------+----------------------+--------------------------------------------------------------------------------+-------------+--------+------------------------------+----------+ - -scala> spark.read.parquet("oss_test/errorTable/2018/09/10/06af2c95-0d3e-43e8-9f5e-717dd9b20209_0_20180910020527.parquet").show(false) -+-------------------+--------------------+--------------------------+----------------------+----------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------------------------------------+----------------------------------------------------------+---------------------------+----------------------------------+ -|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key |_hoodie_partition_path|_hoodie_file_name |hadoop_row_key |hadoop_error_source_data |hadoop_error_exception 
|hadoop_changelog_columns |hadoop_application_id | -+-------------------+--------------------+--------------------------+----------------------+----------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------------------------------------+----------------------------------------------------------+---------------------------+----------------------------------+ -|20180910020527 |20180910020527_0_1 |Hoodie_record_key_constant|2018/09/10 |2018-09-10_06af2c95-0d3e-43e8-9f5e-717dd9b20209_0_20180910020527_2_2.parquet|ROW_KEY_NOT_FOUND|RawData(data={"firstName": "badData", "address": {"zip": "NotANumber"}, "timestamp": 1536518567})|Type conversion error for field zip, NotANumber for "long"|CHANGELOG_COLUMNS_NOT_FOUND|application_1525479763952_10243196| -+-------------------+--------------------+--------------------------+----------------------+----------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------------------------------------+----------------------------------------------------------+---------------------------+----------------------------------+ - - -scala> spark.read.parquet("oss_test/errorTable/2018/09/10/de94d50f-2c07-405d-9448-437cbf686343_0_20180910020538.parquet").show(false) -+-------------------+--------------------+--------------------------+----------------------+--------------------------------------------------------------------------------+-----------------+---------------------------------------------------------------------------------------+-----------------------------------+---------------------------+----------------------------------+ -|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key |_hoodie_partition_path|_hoodie_file_name |hadoop_row_key |hadoop_error_source_data |hadoop_error_exception |hadoop_changelog_columns |hadoop_application_id | -+-------------------+--------------------+--------------------------+----------------------+--------------------------------------------------------------------------------+-----------------+---------------------------------------------------------------------------------------+-----------------------------------+---------------------------+----------------------------------+ -|20180910020538 |20180910020538_0_2 |Hoodie_record_key_constant|2018/09/10 |2018-09-10_de94d50f-2c07-405d-9448-437cbf686343_0_20180910020538_14_1433.parquet|ROW_KEY_NOT_FOUND|{"firstName": "missingTimestamp", "lastName": null, "address": null, "timestamp": null}|required field is missing:timestamp|CHANGELOG_COLUMNS_NOT_FOUND|application_1525479763952_10243196| -+-------------------+--------------------+--------------------------+----------------------+--------------------------------------------------------------------------------+-----------------+---------------------------------------------------------------------------------------+-----------------------------------+---------------------------+----------------------------------+``` -``` diff --git a/examples/jsonToHoodie/errorSchema.1.avsc b/examples/jsonToHoodie/errorSchema.1.avsc deleted file mode 100644 index a0ccc95..0000000 --- a/examples/jsonToHoodie/errorSchema.1.avsc +++ /dev/null @@ -1,12 +0,0 @@ -{ - "type": "record", - "name": "errorSchema", - "schemaVersion": 1, - "fields": [ - {"name": "hadoop_row_key", "type": ["null", "string"], "default": null}, - {"name": "hadoop_error_source_data", 
"type": ["null", "string"], "default": null}, - {"name": "hadoop_error_exception", "type": ["null", "string"], "default": null}, - {"name": "hadoop_changelog_columns", "type": ["null", "string"], "default": null}, - {"name": "hadoop_application_id", "type": ["null", "string"], "default": null} - ] -} diff --git a/examples/jsonToHoodie/exampleConf.yaml b/examples/jsonToHoodie/exampleConf.yaml deleted file mode 100644 index 521c03b..0000000 --- a/examples/jsonToHoodie/exampleConf.yaml +++ /dev/null @@ -1,24 +0,0 @@ -marmaray: - source: - file: - directory: BASE_PATH/data - type: json - schema: exampleSchema - hdfs_schema_service: - path: BASE_PATH/schema - hoodie: - tables: - only_table: - table_name: exampleTable - base_path: BASE_PATH/table - metrics_prefix: "" - lock_manager: - zk_base_path: "irrelevant" - is_enabled: false - zookeeper: - quorum: "irrelevant" - port: "irrelevant" - error_table: - enabled: true - dest_path: BASE_PATH/errorTable - is_date_partitioned: true diff --git a/examples/jsonToHoodie/exampleData.json b/examples/jsonToHoodie/exampleData.json deleted file mode 100644 index 515e894..0000000 --- a/examples/jsonToHoodie/exampleData.json +++ /dev/null @@ -1,4 +0,0 @@ -{"firstName": "Foo", "lastName": "Bar", "address": {"line1": "12345 Main St", "city": "Anytown", "zip": 123456}, "timestamp": 1536518565} -{"firstName": "OnlyFirstname", "timestamp": 1536518566} -{"firstName": "badData", "address": {"zip": "NotANumber"}, "timestamp": 1536518567} -{"firstName": "missingTimestamp"} diff --git a/examples/jsonToHoodie/exampleSchema.1.avsc b/examples/jsonToHoodie/exampleSchema.1.avsc deleted file mode 100644 index 570b561..0000000 --- a/examples/jsonToHoodie/exampleSchema.1.avsc +++ /dev/null @@ -1,15 +0,0 @@ -{ - "type": "record", - "name": "exampleSchema", - "schemaVersion": 1, - "fields": [ - { "name": "firstName", "type": ["null", "string"], "default": null }, - { "name": "lastName", "type": ["null", "string"], "default": null }, - { "name": "address", "type": ["null", {"type": "record", "name": "address_items", "fields": [ - { "name": "line1", "type": ["null", "string"], "default": null }, - { "name": "city", "type": ["null", "string"], "default": null }, - { "name": "zip", "type": ["null", "long"], "default": null} - ] } ], "default": null}, - { "name": "timestamp", "type": ["null", "long"], "default": null} - ] -} diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/ParquetToCassandraJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/ParquetToCassandraJob.java new file mode 100644 index 0000000..900f062 --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/ParquetToCassandraJob.java @@ -0,0 +1,407 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions + * of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.uber.marmaray.examples.job; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.uber.marmaray.common.configuration.CassandraMetadataManagerConfiguration; +import com.uber.marmaray.common.configuration.CassandraSinkConfiguration; +import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.configuration.HiveConfiguration; +import com.uber.marmaray.common.configuration.HiveSourceConfiguration; +import com.uber.marmaray.common.converters.data.CassandraSinkCQLDataConverter; +import com.uber.marmaray.common.converters.data.CassandraSinkDataConverter; +import com.uber.marmaray.common.converters.data.SparkSourceDataConverter; +import com.uber.marmaray.common.converters.schema.CassandraSchemaConverter; +import com.uber.marmaray.common.converters.schema.DataFrameSchemaConverter; +import com.uber.marmaray.common.exceptions.JobRuntimeException; +import com.uber.marmaray.common.job.JobDag; +import com.uber.marmaray.common.job.JobManager; +import com.uber.marmaray.common.metadata.CassandraBasedMetadataManager; +import com.uber.marmaray.common.metadata.IMetadataManager; +import com.uber.marmaray.common.metrics.DataFeedMetricNames; +import com.uber.marmaray.common.metrics.DataFeedMetrics; +import com.uber.marmaray.common.metrics.ErrorCauseTagNames; +import com.uber.marmaray.common.metrics.JobMetricNames; +import com.uber.marmaray.common.metrics.JobMetrics; +import com.uber.marmaray.common.metrics.LongMetric; +import com.uber.marmaray.common.metrics.ModuleTagNames; +import com.uber.marmaray.common.metrics.TimerMetric; +import com.uber.marmaray.common.reporters.ConsoleReporter; +import com.uber.marmaray.common.reporters.Reporters; +import com.uber.marmaray.common.schema.cassandra.CassandraSchema; +import com.uber.marmaray.common.schema.cassandra.CassandraSinkSchemaManager; +import com.uber.marmaray.common.sinks.ISink; +import com.uber.marmaray.common.sinks.cassandra.CassandraClientSink; +import com.uber.marmaray.common.sinks.cassandra.CassandraSSTableSink; +import com.uber.marmaray.common.sources.ISource; +import com.uber.marmaray.common.sources.IWorkUnitCalculator; +import com.uber.marmaray.common.sources.hive.HiveSource; +import com.uber.marmaray.common.sources.hive.ParquetWorkUnitCalculator; +import com.uber.marmaray.common.spark.SparkArgs; +import com.uber.marmaray.common.spark.SparkFactory; +import com.uber.marmaray.utilities.SparkUtil; +import com.uber.marmaray.utilities.CassandraSinkUtil; +import com.uber.marmaray.utilities.ErrorExtractor; +import com.uber.marmaray.utilities.FSUtils; +import com.uber.marmaray.utilities.JobUtil; +import com.uber.marmaray.utilities.SchemaUtil; +import com.uber.marmaray.utilities.TimestampInfo; +import com.uber.marmaray.utilities.listener.TimeoutManager; +import lombok.Getter; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; 
+import org.apache.avro.Schema; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.types.StructType; +import org.hibernate.validator.constraints.NotEmpty; +import parquet.Preconditions; + +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** + * Job to load data from parquet files on HDFS to a Cassandra instance + */ +@Slf4j +public class ParquetToCassandraJob { + + /** + * Generic entry point + * @param args arguments for the job, from the command line + * @throws IOException + */ + public static void main(final String[] args) throws IOException { + new ParquetToCassandraJob().run(args); + } + + /** + * Main execution method for the job. + * @param args command line arguments + * @throws IOException + */ + private void run(final String[] args) throws IOException { + final Instant jobStartTime = Instant.now(); + + final Configuration conf = getConfiguration(args); + + final Reporters reporters = new Reporters(); + reporters.addReporter(new ConsoleReporter()); + + final Map metricTags = Collections.emptyMap(); + final DataFeedMetrics dataFeedMetrics = new DataFeedMetrics("parquet to cassandra dispersal", metricTags); + + final FileSystem fs = FSUtils.getFs(conf, Optional.absent()); + + log.info("Initializing configurations for job"); + final TimerMetric confInitMetric = new TimerMetric(DataFeedMetricNames.INIT_CONFIG_LATENCY_MS, + metricTags); + final HiveSourceConfiguration hiveSourceConf; + final CassandraSinkConfiguration cassandraConf; + try { + hiveSourceConf = new HiveSourceConfiguration(conf); + cassandraConf = new CassandraSinkConfiguration(conf, dataFeedMetrics); + } catch (final Exception e) { + final LongMetric configError = new LongMetric(DataFeedMetricNames.DISPERSAL_CONFIGURATION_INIT_ERRORS, 1); + configError.addTags(metricTags); + configError.addTags(DataFeedMetricNames + .getErrorModuleCauseTags(ModuleTagNames.CONFIGURATION, ErrorCauseTagNames.CONFIG_ERROR)); + reporters.report(configError); + reporters.getReporters().forEach(reporter -> dataFeedMetrics.gauageFailureMetric(reporter)); + throw e; + } + confInitMetric.stop(); + reporters.report(confInitMetric); + + log.info("Initializing metadata manager for job"); + final TimerMetric metadataManagerInitMetric = + new TimerMetric(DataFeedMetricNames.INIT_METADATAMANAGER_LATENCY_MS, metricTags); + final IMetadataManager metadataManager; + try { + metadataManager = initMetadataManager(conf, dataFeedMetrics); + } catch (final JobRuntimeException e) { + final LongMetric configError = new LongMetric(DataFeedMetricNames.DISPERSAL_CONFIGURATION_INIT_ERRORS, 1); + configError.addTags(metricTags); + configError.addTags(DataFeedMetricNames + .getErrorModuleCauseTags(ModuleTagNames.METADATA_MANAGER, ErrorCauseTagNames.CONFIG_ERROR)); + reporters.report(configError); + reporters.getReporters().forEach(reporter -> dataFeedMetrics.gauageFailureMetric(reporter)); + throw e; + } + metadataManagerInitMetric.stop(); + reporters.report(metadataManagerInitMetric); + + // Todo - T1021227: Consider using Schema Service instead + log.info("Reading schema from path: {}", hiveSourceConf.getDataPath()); + final TimerMetric convertSchemaLatencyMs = + new 
TimerMetric(DataFeedMetricNames.CONVERT_SCHEMA_LATENCY_MS, metricTags); + final StructType inputSchema; + try { + inputSchema = SchemaUtil.generateSchemaFromParquet(fs, + hiveSourceConf.getDataPath(), Optional.of(dataFeedMetrics)); + } catch (final JobRuntimeException e) { + final LongMetric configError = new LongMetric(DataFeedMetricNames.DISPERSAL_CONFIGURATION_INIT_ERRORS, 1); + configError.addTags(metricTags); + configError.addTags(DataFeedMetricNames + .getErrorModuleCauseTags(ModuleTagNames.SCHEMA_MANAGER, ErrorCauseTagNames.CONFIG_ERROR)); + reporters.report(configError); + reporters.getReporters().forEach(reporter -> dataFeedMetrics.gauageFailureMetric(reporter)); + throw e; + } + + final DataFrameSchemaConverter schemaConverter = new DataFrameSchemaConverter(); + final Schema outputSchema = schemaConverter.convertToCommonSchema(inputSchema); + convertSchemaLatencyMs.stop(); + reporters.report(convertSchemaLatencyMs); + + final SparkArgs sparkArgs = new SparkArgs( + Arrays.asList(outputSchema), + SparkUtil.getSerializationClasses(), + conf); + final SparkFactory sparkFactory = new SparkFactory(sparkArgs); + final JobManager jobManager = JobManager.createJobManager(conf, "marmaray", + "frequency", sparkFactory, reporters); + + final JavaSparkContext jsc = sparkFactory.getSparkContext(); + try { + log.info("Initializing converters & schemas for job"); + final SQLContext sqlContext = SQLContext.getOrCreate(jsc.sc()); + + log.info("Common schema is: {}", outputSchema.toString()); + + final TimestampInfo tsInfo = new TimestampInfo(cassandraConf.getWriteTimestamp(), + cassandraConf.isTimestampIsLongType(), cassandraConf.getTimestampFieldName()); + log.info("Using optional Cassandra timestamp: {}", tsInfo); + + final List requiredKeys = Lists.newArrayList( + Iterables.concat( + cassandraConf.getClusteringKeys() + .stream() + .map(ck -> ck.getName()).collect(Collectors.toList()), + cassandraConf.getPartitionKeys())); + if (tsInfo.hasTimestamp()) { + requiredKeys.remove(tsInfo.getTimestampFieldName()); + } + log.info("Required keys for source and sink are: {}", requiredKeys); + + log.info("Initializing schema converter"); + final CassandraSchemaConverter cassandraSchemaConverter = new CassandraSchemaConverter( + cassandraConf.getKeyspace(), cassandraConf.getTableName(), + tsInfo, cassandraConf.getFilteredColumns()); + final CassandraSchema cassandraSchema = cassandraSchemaConverter.convertToExternalSchema(outputSchema); + + log.info("Initializing schema manager"); + final CassandraSinkSchemaManager schemaManager; + try { + schemaManager = new CassandraSinkSchemaManager(cassandraSchema, + cassandraConf.getPartitionKeys(), + cassandraConf.getClusteringKeys(), + cassandraConf.getTimeToLive(), + Optional.of(dataFeedMetrics), + CassandraSinkUtil.computeTimestamp(conf.getProperty(HiveConfiguration.PARTITION)), + cassandraConf.getWrittenTime().isPresent()); + } catch (JobRuntimeException e) { + reporters.getReporters().forEach(reporter -> dataFeedMetrics.gauageFailureMetric(reporter)); + throw e; + } + + log.info("Initializing source data converter"); + // Source + final SparkSourceDataConverter dataConverter = new SparkSourceDataConverter( + inputSchema, + outputSchema, + conf, + Sets.newHashSet(requiredKeys), + new ErrorExtractor()); + + log.info("Initializing source & sink for job"); + final ISource hiveSource = new HiveSource(hiveSourceConf, sqlContext, dataConverter); + final ISink cassandraSink; + + // Sink + if (cassandraConf.getUseClientSink()) { + final CassandraSinkCQLDataConverter 
sinkCQLDataConverter = new CassandraSinkCQLDataConverter( + outputSchema, + conf, + cassandraConf.getFilteredColumns(), + requiredKeys, + tsInfo, + new ErrorExtractor()); + cassandraSink = new CassandraClientSink(sinkCQLDataConverter, schemaManager, cassandraConf); + } else { + final CassandraSinkDataConverter sinkDataConverter = new CassandraSinkDataConverter( + outputSchema, + conf, + cassandraConf.getFilteredColumns(), + cassandraConf.getWrittenTime(), + requiredKeys, + tsInfo, + new ErrorExtractor()); + cassandraSink = new CassandraSSTableSink(sinkDataConverter, schemaManager, cassandraConf); + } + + log.info("Initializing work unit calculator for job"); + final IWorkUnitCalculator workUnitCalculator = new ParquetWorkUnitCalculator(hiveSourceConf, fs); + + log.info("Initializing job dag"); + final JobDag jobDag = new JobDag(hiveSource, cassandraSink, metadataManager, workUnitCalculator, + hiveSourceConf.getJobName(), hiveSourceConf.getJobName(), new JobMetrics("marmaray"), + dataFeedMetrics, reporters); + + /* + * We need to have separate m3 reporters since the JobManager automatically calls close() on + * the reporter upon completion of the executed job and it can't be used to report latency + * and other config related metrics in this class. + */ + jobManager.addJobDag(jobDag); + + log.info("Running dispersal job"); + try { + jobManager.run(); + JobUtil.raiseExceptionIfStatusFailed(jobManager.getJobManagerStatus()); + } catch (final Throwable t) { + /* + * TODO - Technically more than 1 error can occur in a job run, but this currently acts + * as more like a flag that we can alert on. Also, Metrics API as currently constructed doesn't + * have an elegant way to increment this count which should be improved. + * + * TODO: T131675 - Also, we will modify JobManager to increment the error count with the datafeed name + * which currently isn't possible. This is the most appropriate place to increment the metric. + * Once that is done we can remove this code block to report the metric. 
+ */ + if (TimeoutManager.getInstance().getTimedOut()) { + final LongMetric runTimeError = new LongMetric(DataFeedMetricNames.MARMARAY_JOB_ERROR, 1); + runTimeError.addTags(metricTags); + runTimeError.addTags(DataFeedMetricNames.getErrorModuleCauseTags( + ModuleTagNames.JOB_MANAGER, ErrorCauseTagNames.TIME_OUT)); + reporters.report(runTimeError); + } + final LongMetric configError = new LongMetric(JobMetricNames.RUN_JOB_ERROR_COUNT, 1); + configError.addTags(metricTags); + reporters.report(configError); + throw t; + } + log.info("Dispersal job has been completed"); + + final TimerMetric jobLatencyMetric = + new TimerMetric(JobMetricNames.RUN_JOB_DAG_LATENCY_MS, metricTags, jobStartTime); + jobLatencyMetric.stop(); + reporters.report(jobLatencyMetric); + reporters.finish(); + } finally { + jsc.stop(); + } + } + + /** + * Get configuration from command line + * @param args command line arguments passed in + * @return configuration populated from them + */ + private Configuration getConfiguration(@NotEmpty final String[] args) { + final ParquetToCassandraCommandLineOptions options = new ParquetToCassandraCommandLineOptions(args); + if (options.getConfFile() != null) { + return getFileConfiguration(options.getConfFile()); + } else if (options.getJsonConf() != null) { + return getJsonConfiguration(options.getJsonConf()); + } else { + throw new JobRuntimeException("Unable to find conf; this shouldn't be possible"); + } + } + + /** + * Get configuration from JSON-based configuration + * @param jsonConf JSON string of configuration + * @return configuration populated from it + */ + private Configuration getJsonConfiguration(@NotEmpty final String jsonConf) { + final Configuration conf = new Configuration(); + conf.loadYamlStream(IOUtils.toInputStream(jsonConf), Optional.absent()); + return conf; + } + + /** + * Load configuration from a file on HDFS + * @param filePath path to the HDFS file to load + * @return configuration populated from it + */ + private Configuration getFileConfiguration(@NotEmpty final String filePath) { + final Configuration conf = new Configuration(); + try { + final FileSystem fs = FSUtils.getFs(conf, Optional.absent()); + final Path dataFeedConfFile = new Path(filePath); + log.info("Loading configuration from {}", dataFeedConfFile.toString()); + conf.loadYamlStream(fs.open(dataFeedConfFile), Optional.absent()); + } catch (IOException e) { + final String errorMessage = String.format("Unable to find configuration for %s", filePath); + log.error(errorMessage); + throw new JobRuntimeException(errorMessage, e); + } + return conf; + + } + + /** + * Initialize the metadata store system + * @param conf configuration to use + * @param dataFeedMetric metric repository for reporting metrics + * @return metadata mangaer + */ + private static IMetadataManager initMetadataManager(@NonNull final Configuration conf, + @NonNull final DataFeedMetrics dataFeedMetric) { + log.info("Create metadata manager"); + try { + return new CassandraBasedMetadataManager(new CassandraMetadataManagerConfiguration(conf), + new AtomicBoolean(true)); + } catch (IOException e) { + throw new JobRuntimeException("Unable to create metadata manager", e); + } + } + + private static final class ParquetToCassandraCommandLineOptions { + @Getter + @Parameter(names = {"--configurationFile", "-c"}, description = "path to configuration file") + private String confFile; + + @Getter + @Parameter(names = {"--jsonConfiguration", "-j"}, description = "json configuration") + private String jsonConf; + + private 
ParquetToCassandraCommandLineOptions(@NonNull final String[] args) { + final JCommander commander = new JCommander(this); + commander.parse(args); + Preconditions.checkState(this.confFile != null || this.jsonConf != null, + "One of jsonConfiguration or configurationFile must be specified"); + } + } + +} diff --git a/marmaray/src/main/java/com/uber/marmaray/utilities/CassandraSinkUtil.java b/marmaray/src/main/java/com/uber/marmaray/utilities/CassandraSinkUtil.java new file mode 100644 index 0000000..08e24f3 --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/utilities/CassandraSinkUtil.java @@ -0,0 +1,29 @@ +package com.uber.marmaray.utilities; + +import com.google.common.base.Optional; +import lombok.extern.slf4j.Slf4j; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.TimeZone; + +@Slf4j +public class CassandraSinkUtil { + + public static final TimeZone TIME_ZONE_UTC = TimeZone.getTimeZone("UTC"); + + public static Optional computeTimestamp(final Optional partition) { + if (partition.isPresent()) { + try { + final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd"); + formatter.setTimeZone(TIME_ZONE_UTC); + final Long epochTime = formatter.parse(partition.get()).getTime() * 1000; + return Optional.of(epochTime); + } catch (ParseException e) { + log.error("Got exception in parse the date to microseconds. {}", e); + } + } + + return Optional.absent(); + } +} diff --git a/marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java b/marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java index 57ecf85..9a60b61 100644 --- a/marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java +++ b/marmaray/src/main/java/com/uber/marmaray/utilities/SparkUtil.java @@ -18,10 +18,19 @@ import com.google.common.base.Optional; import com.google.common.collect.Sets; +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.HoodieRollbackStat; +import com.uber.hoodie.common.model.HoodieRecordLocation; +import com.uber.hoodie.common.model.HoodieWriteStat; +import com.uber.marmaray.common.HoodieErrorPayload; import com.uber.marmaray.common.exceptions.JobRuntimeException; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hdfs.protocol.FsPermissionExtension; import org.apache.spark.SparkContext; import org.apache.spark.SparkEnv; import org.apache.spark.serializer.SerializerInstance; @@ -29,13 +38,20 @@ import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.RDDInfo; +import org.apache.spark.util.AccumulatorMetadata; import scala.reflect.ClassManifestFactory; import scala.reflect.ClassTag; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * {@link SparkUtil} defines utility methods for working with Apache Spark @@ -53,7 +69,7 @@ private SparkUtil() { } public static void addClassesIfFound(@NonNull final List serializableClasses, - @NonNull final List classList) { + @NonNull final List classList) { for (final String className : classList) { try { 
serializableClasses.add(Class.forName(className)); @@ -71,6 +87,7 @@ public static Set getSupportedDataTypes() { /** * KryoSerializer is the the default serializaer + * * @return SerializerInstance */ public static SerializerInstance getSerializerInstance() { @@ -81,7 +98,7 @@ public static SerializerInstance getSerializerInstance() { } public static > T deserialize(final byte[] serializedRecord, - @NonNull final K classTag) { + @NonNull final K classTag) { if (serializedRecord == null) { return null; } @@ -124,4 +141,27 @@ public static SparkSession getOrCreateSparkSession() { return SparkSession.builder().getOrCreate(); } + /** + * Get list of classes we need for serialization + * + * @return list of classes used for serialization + */ + public static List getSerializationClasses() { + return new ArrayList<>(Arrays.asList( + HoodieErrorPayload.class, + AccumulatorMetadata.class, + TimeUnit.class, + HoodieRollbackStat.class, + FileStatus.class, + Path.class, + FsPermissionExtension.class, + FsAction.class, + HoodieWriteStat.class, + AtomicLong.class, + HashSet.class, + ConcurrentHashMap.class, + WriteStatus.class, + HoodieRecordLocation.class + )); + } }