Skip to content

Commit

Permalink
[HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11270)
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua committed May 23, 2024
1 parent f5a7c0f commit ed2dc91
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.serializers.JavaSerializer
Expand Down Expand Up @@ -64,8 +64,8 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist
// Hadoop's configuration is not a serializable object by itself, and hence
// we're relying on [[SerializableConfiguration]] wrapper to work it around.
// We cannot remove this entry; otherwise the ordering is changed.
// So we replace it with [[StorageConfiguration]].
kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer())
// So we replace it with [[HadoopStorageConfiguration]] for Spark.
kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer())
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark;

import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Tests {@link HoodieSparkKryoRegistrar}
*/
public class TestHoodieSparkKryoRegistrar {
@Test
public void testSerdeHoodieHadoopConfiguration() {
Kryo kryo = newKryo();

HadoopStorageConfiguration conf = new HadoopStorageConfiguration(new Configuration());

// Serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
kryo.writeObject(output, conf);
output.close();

// Deserialize
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
Input input = new Input(bais);
HadoopStorageConfiguration deserialized = kryo.readObject(input, HadoopStorageConfiguration.class);
input.close();

// Verify
assertEquals(getPropsInMap(conf), getPropsInMap(deserialized));
}

private Kryo newKryo() {
Kryo kryo = new Kryo();

// This instance of Kryo should not require prior registration of classes
kryo.setRegistrationRequired(false);
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());

// Register Hudi's classes
new HoodieSparkKryoRegistrar().registerClasses(kryo);

return kryo;
}

private Map<String, String> getPropsInMap(HadoopStorageConfiguration conf) {
Map<String, String> configMap = new HashMap<>();
conf.unwrap().iterator().forEachRemaining(
e -> configMap.put(e.getKey(), e.getValue()));
return configMap;
}
}

0 comments on commit ed2dc91

Please sign in to comment.