From 143dc522e148bc04488df4cfa4c7de42ab3a8bd7 Mon Sep 17 00:00:00 2001
From: Lin Liu <141371752+linliu-code@users.noreply.github.com>
Date: Thu, 19 Dec 2024 10:19:14 -0800
Subject: [PATCH] [HUDI-8776] Add schema evolution configs to SparkBroadcastManager (#12510)

---
 .../table/action/compact/HoodieCompactor.java |  5 +-
 .../MultipleSparkJobExecutionStrategy.java    |  2 +-
 .../hudi/table/SparkBroadcastManager.java     | 48 ++++++++++++--
 .../HoodieSparkMergeOnReadTableCompactor.java |  6 +-
 .../hudi/table/TestSparkBroadcastManager.java | 66 +++++++++++++++++++
 5 files changed, 118 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 88d90a850cf6..79aa2580de7b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -94,7 +94,8 @@ public abstract void preCompact(
    *
    * @return the {@link EngineBroadcastManager} if available.
    */
-  public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context) {
+  public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context,
+                                                                  HoodieTableMetaClient metaClient) {
     return Option.empty();
   }
 
@@ -152,7 +153,7 @@ public HoodieData<WriteStatus> compact(
         && config.populateMetaFields(); // Virtual key support by fg reader is not ready
     if (useFileGroupReaderBasedCompaction) {
-      Option<EngineBroadcastManager> broadcastManagerOpt = getEngineBroadcastManager(context);
+      Option<EngineBroadcastManager> broadcastManagerOpt = getEngineBroadcastManager(context, metaClient);
       // Broadcast required information.
       broadcastManagerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);
       return context.parallelize(operations).map(
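
Reviewer note: the hook above follows an optional-extension pattern. The engine-agnostic base class returns Option.empty(), and only the Spark compactor (further down in this patch) overrides the method, so compact() broadcasts only when the engine actually supplies a manager. A minimal, self-contained sketch of that pattern, using plain java.util.Optional and hypothetical stand-ins for the Hudi types:

import java.util.Optional;

class BroadcastHookSketch {

  interface BroadcastManager {
    void prepareAndBroadcast();
  }

  static class BaseCompactor {
    // Base behavior mirrors HoodieCompactor: no manager unless an engine opts in.
    Optional<BroadcastManager> getBroadcastManager(Object context, Object metaClient) {
      return Optional.empty();
    }

    void compact(Object context, Object metaClient) {
      // Broadcast only when the engine supplied a manager, as in compact() above.
      getBroadcastManager(context, metaClient).ifPresent(BroadcastManager::prepareAndBroadcast);
    }
  }

  static class SparkLikeCompactor extends BaseCompactor {
    @Override
    Optional<BroadcastManager> getBroadcastManager(Object context, Object metaClient) {
      // Engine override, analogous to HoodieSparkMergeOnReadTableCompactor.
      BroadcastManager manager = () -> System.out.println("prepare and broadcast driver-side state");
      return Optional.of(manager);
    }
  }

  public static void main(String[] args) {
    new BaseCompactor().compact(new Object(), new Object());      // no-op: empty Optional
    new SparkLikeCompactor().compact(new Object(), new Object()); // broadcasts
  }
}

The metaClient parameter threaded through here is what lets the Spark override read table metadata before broadcasting.
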
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 552041b8ecf1..b84c027b1ccc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -544,7 +544,7 @@ private Dataset<Row> readRecordsForGroupAsRowWithFileGroupReader(JavaSparkContex
     SerializableSchema serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields);
     // broadcast reader context.
-    SparkBroadcastManager broadcastManager = new SparkBroadcastManager(getEngineContext());
+    SparkBroadcastManager broadcastManager = new SparkBroadcastManager(getEngineContext(), getHoodieTable().getMetaClient());
     broadcastManager.prepareAndBroadcast();
     StructType sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields);
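
Context for the constructor change: the manager now needs the table's HoodieTableMetaClient up front because prepareAndBroadcast() derives schema evolution configs from the table timeline before broadcasting. The broadcast itself leans on a standard Spark idiom: Hadoop's Configuration is not java.io.Serializable, so it is wrapped in org.apache.spark.util.SerializableConfiguration and shipped to executors once per job instead of once per task. A standalone sketch of that idiom (hypothetical app and config key, not Hudi code):

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.SerializableConfiguration;

import java.util.Arrays;

public class BroadcastConfSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]").appName("broadcast-conf-sketch").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    Configuration conf = new Configuration(false);
    conf.set("example.key", "example-value"); // stand-in for the schema evolution configs

    // Configuration is not Serializable; wrap it before broadcasting.
    Broadcast<SerializableConfiguration> confBroadcast =
        jsc.broadcast(new SerializableConfiguration(conf));

    // Executors unwrap the broadcast copy via .value().value().
    long hits = jsc.parallelize(Arrays.asList(1, 2, 3))
        .filter(i -> "example-value".equals(confBroadcast.value().value().get("example.key")))
        .count();
    System.out.println("tasks that saw the config: " + hits);
    spark.stop();
  }
}
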
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
index ebe0b6da12dd..af7d18c14fd8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
@@ -23,11 +23,17 @@
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
@@ -41,7 +47,10 @@
 import org.apache.spark.util.SerializableConfiguration;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import scala.Tuple2;
 import scala.collection.JavaConverters;
@@ -52,14 +61,16 @@ public class SparkBroadcastManager extends EngineBroadcastManager {
 
   private final transient HoodieEngineContext context;
+  private final transient HoodieTableMetaClient metaClient;
 
   protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
  protected Broadcast<SQLConf> sqlConfBroadcast;
   protected Broadcast<SparkParquetReader> parquetReaderBroadcast;
   protected Broadcast<SerializableConfiguration> configurationBroadcast;
 
-  public SparkBroadcastManager(HoodieEngineContext context) {
+  public SparkBroadcastManager(HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     this.context = context;
+    this.metaClient = metaClient;
   }
 
   @Override
@@ -72,15 +83,24 @@ public void prepareAndBroadcast() {
     SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sessionState().conf();
     JavaSparkContext jsc = hoodieSparkEngineContext.jsc();
 
+    // Prepare
     boolean returningBatch = sqlConf.parquetVectorizedReaderEnabled();
     scala.collection.immutable.Map<String, String> options = scala.collection.immutable.Map$.MODULE$.<String, String>empty()
         .$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), Boolean.toString(returningBatch)));
+    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+    InstantFileNameGenerator fileNameGenerator = metaClient.getTimelineLayout().getInstantFileNameGenerator();
+    HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    Map<String, String> schemaEvolutionConfigs =
+        getSchemaEvolutionConfigs(resolver, timeline, fileNameGenerator, metaClient.getBasePath().toString());
 
-    // Do broadcast.
+    // Broadcast: SQLConf.
     sqlConfBroadcast = jsc.broadcast(sqlConf);
-    configurationBroadcast = jsc.broadcast(
-        new SerializableConfiguration(getHadoopConfiguration(jsc.hadoopConfiguration())));
+    // Broadcast: Configuration.
+    Configuration configs = getHadoopConfiguration(jsc.hadoopConfiguration());
+    addSchemaEvolutionConfigs(configs, schemaEvolutionConfigs);
+    configurationBroadcast = jsc.broadcast(new SerializableConfiguration(configs));
+    // Broadcast: ParquetReader.
     // Spark parquet reader has to be instantiated on the driver and broadcast to the executors
     parquetReaderOpt = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(
         false, sqlConfBroadcast.getValue(), options, configurationBroadcast.getValue().value()));
@@ -126,4 +146,24 @@ static Configuration getHadoopConfiguration(Configuration configuration) {
     }
     return (new HadoopStorageConfiguration(hadoopConf).getInline()).unwrap();
   }
+
+  static Map<String, String> getSchemaEvolutionConfigs(TableSchemaResolver schemaResolver,
+                                                       HoodieTimeline timeline,
+                                                       InstantFileNameGenerator fileNameGenerator,
+                                                       String basePath) {
+    Option<InternalSchema> internalSchemaOpt = schemaResolver.getTableInternalSchemaFromCommitMetadata();
+    Map<String, String> configs = new HashMap<>();
+    if (internalSchemaOpt.isPresent()) {
+      List<String> instantFiles = timeline.getInstants().stream().map(fileNameGenerator::getFileName).collect(Collectors.toList());
+      configs.put(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, String.join(",", instantFiles));
+      configs.put(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, basePath);
+    }
+    return configs;
+  }
+
+  static void addSchemaEvolutionConfigs(Configuration configs, Map<String, String> schemaEvolutionConfigs) {
+    for (Map.Entry<String, String> entry : schemaEvolutionConfigs.entrySet()) {
+      configs.set(entry.getKey(), entry.getValue());
+    }
+  }
 }
\ No newline at end of file
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index bac394a8da54..d6eff9cd0c12 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
@@ -45,8 +46,9 @@ public class HoodieSparkMergeOnReadTableCompactor<T> extends HoodieCompactor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
 
   @Override
-  public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context) {
-    return Option.of(new SparkBroadcastManager(context));
+  public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context,
+                                                                  HoodieTableMetaClient metaClient) {
+    return Option.of(new SparkBroadcastManager(context, metaClient));
   }
 
   @Override
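
The two static helpers added to SparkBroadcastManager carry the substance of this patch: when the table has a committed InternalSchema, the completed instants' file names are joined into a comma-separated valid-commits list and, together with the table base path, copied into the Hadoop Configuration that gets broadcast to executors. A self-contained sketch of the same flow with plain strings (the config keys below are placeholders standing in for the SparkInternalSchemaConverter constants):

import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SchemaEvolutionConfigSketch {

  // Same shape as getSchemaEvolutionConfigs: empty map unless an InternalSchema exists.
  static Map<String, String> schemaEvolutionConfigs(
      boolean hasInternalSchema, List<String> completedInstantFileNames, String basePath) {
    Map<String, String> configs = new HashMap<>();
    if (hasInternalSchema) {
      // Placeholder keys; the real code uses SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST
      // and SparkInternalSchemaConverter.HOODIE_TABLE_PATH.
      configs.put("valid.commits.list", String.join(",", completedInstantFileNames));
      configs.put("table.base.path", basePath);
    }
    return configs;
  }

  // Same shape as addSchemaEvolutionConfigs: copy every entry into the Hadoop Configuration.
  static void addConfigs(Configuration conf, Map<String, String> extra) {
    extra.forEach(conf::set);
  }

  public static void main(String[] args) {
    List<String> instantFiles = Arrays.asList("0001_0005.deltacommit", "0002_0006.deltacommit");
    Configuration conf = new Configuration(false);
    addConfigs(conf, schemaEvolutionConfigs(true, instantFiles, "/tmp/any_table_path"));
    // Prints: 0001_0005.deltacommit,0002_0006.deltacommit
    System.out.println(conf.get("valid.commits.list"));
  }
}
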
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java
index bd0c6730829c..cb33a887c195 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java
@@ -20,15 +20,34 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantFileNameGeneratorV2;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantGeneratorV2;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 class TestSparkBroadcastManager {
   @Test
@@ -47,4 +66,51 @@ void testGetStorageConfiguration() {
     String inlineClassName = createdConfig.get("fs." + InLineFileSystem.SCHEME + ".impl");
     assertEquals(InLineFileSystem.class.getName(), inlineClassName);
   }
+
+  @Test
+  void testExtraConfigsAdded() {
+    Map<String, String> extraConfigs = new HashMap<>();
+    extraConfigs.put("K1", "V1");
+    Configuration configs = new Configuration(false);
+    SparkBroadcastManager.addSchemaEvolutionConfigs(configs, extraConfigs);
+    assertEquals("V1", configs.get("K1"));
+  }
+
+  @Test
+  void testGetSchemaEvolutionConfigurations() {
+    TableSchemaResolver schemaResolver = mock(TableSchemaResolver.class);
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    InstantFileNameGenerator fileNameGenerator = new InstantFileNameGeneratorV2();
+    String basePath = "any_table_path";
+
+    // Test when InternalSchema is empty.
+    when(schemaResolver.getTableInternalSchemaFromCommitMetadata()).thenReturn(Option.empty());
+    Map<String, String> schemaEvolutionConfigs = SparkBroadcastManager.getSchemaEvolutionConfigs(
+        schemaResolver, timeline, fileNameGenerator, basePath);
+    assertTrue(schemaEvolutionConfigs.isEmpty());
+
+    // Test when InternalSchema is not empty.
+    InstantGeneratorV2 instantGen = new InstantGeneratorV2();
+    Types.RecordType record = Types.RecordType.get(Collections.singletonList(
+        Types.Field.get(0, "col1", Types.BooleanType.get())));
+    List<HoodieInstant> instants = Arrays.asList(
+        instantGen.createNewInstant(
+            HoodieInstant.State.COMPLETED, ActionType.deltacommit.name(), "0001", "0005"),
+        instantGen.createNewInstant(
+            HoodieInstant.State.COMPLETED, ActionType.deltacommit.name(), "0002", "0006"),
+        instantGen.createNewInstant(
+            HoodieInstant.State.COMPLETED, ActionType.compaction.name(), "0003", "0007"));
+    InternalSchema internalSchema = new InternalSchema(record);
+    when(schemaResolver.getTableInternalSchemaFromCommitMetadata()).thenReturn(Option.of(internalSchema));
+    when(timeline.getInstants()).thenReturn(instants);
+    schemaEvolutionConfigs = SparkBroadcastManager.getSchemaEvolutionConfigs(
+        schemaResolver, timeline, fileNameGenerator, basePath);
+    assertFalse(schemaEvolutionConfigs.isEmpty());
+    assertEquals(
+        "0001_0005.deltacommit,0002_0006.deltacommit,0003_0007.commit",
+        schemaEvolutionConfigs.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST));
+    assertEquals(
+        "any_table_path",
+        schemaEvolutionConfigs.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH));
+  }
 }
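
The second test above stubs TableSchemaResolver and HoodieTimeline with Mockito, re-stubbing the schema lookup so both the empty and the populated branch of getSchemaEvolutionConfigs are covered. For readers new to this style, a minimal runnable sketch of the stubbing pattern with hypothetical names (JUnit 5 and Mockito assumed on the classpath):

import org.junit.jupiter.api.Test;

import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class OptionalStubbingSketchTest {

  // Hypothetical collaborator standing in for TableSchemaResolver.
  interface SchemaResolver {
    Optional<String> latestSchema();
  }

  // Unit under test: behaves differently for empty vs. present, like getSchemaEvolutionConfigs.
  static String describe(SchemaResolver resolver) {
    return resolver.latestSchema().map(s -> "schema: " + s).orElse("no schema");
  }

  @Test
  void stubEmptyThenPresent() {
    SchemaResolver resolver = mock(SchemaResolver.class);

    // First stub: empty, expecting the fallback branch.
    when(resolver.latestSchema()).thenReturn(Optional.empty());
    assertEquals("no schema", describe(resolver));

    // Re-stub: present, expecting the populated branch.
    when(resolver.latestSchema()).thenReturn(Optional.of("col1:boolean"));
    assertEquals("schema: col1:boolean", describe(resolver));
  }
}
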