From ed2dc912003a7f5f13f09374aec99399ff8d614b Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 22 May 2024 15:27:48 -0700 Subject: [PATCH] [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11270) --- .../spark/HoodieSparkKryoRegistrar.scala | 6 +- .../spark/TestHoodieSparkKryoRegistrar.java | 86 +++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala index a8650e5668a6e..eba3999ea57d1 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala @@ -22,7 +22,7 @@ import org.apache.hudi.client.model.HoodieInternalRow import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord} import org.apache.hudi.common.util.HoodieCommonKryoRegistrar import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.storage.StorageConfiguration +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.serializers.JavaSerializer @@ -64,8 +64,8 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist // Hadoop's configuration is not a serializable object by itself, and hence // we're relying on [[SerializableConfiguration]] wrapper to work it around. // We cannot remove this entry; otherwise the ordering is changed. - // So we replace it with [[StorageConfiguration]]. - kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer()) + // So we replace it with [[HadoopStorageConfiguration]] for Spark. + kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer()) } /** diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java new file mode 100644 index 0000000000000..4dd297a02b66f --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark; + +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests {@link HoodieSparkKryoRegistrar} + */ +public class TestHoodieSparkKryoRegistrar { + @Test + public void testSerdeHoodieHadoopConfiguration() { + Kryo kryo = newKryo(); + + HadoopStorageConfiguration conf = new HadoopStorageConfiguration(new Configuration()); + + // Serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + kryo.writeObject(output, conf); + output.close(); + + // Deserialize + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + Input input = new Input(bais); + HadoopStorageConfiguration deserialized = kryo.readObject(input, HadoopStorageConfiguration.class); + input.close(); + + // Verify + assertEquals(getPropsInMap(conf), getPropsInMap(deserialized)); + } + + private Kryo newKryo() { + Kryo kryo = new Kryo(); + + // This instance of Kryo should not require prior registration of classes + kryo.setRegistrationRequired(false); + kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); + // Handle cases where we may have an odd classloader setup like with libjars + // for hadoop + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + + // Register Hudi's classes + new HoodieSparkKryoRegistrar().registerClasses(kryo); + + return kryo; + } + + private Map getPropsInMap(HadoopStorageConfiguration conf) { + Map configMap = new HashMap<>(); + conf.unwrap().iterator().forEachRemaining( + e -> configMap.put(e.getKey(), e.getValue())); + return configMap; + } +}