Skip to content

Commit c3d16b2

Browse files
authored
[GOBBLIN-2155] Fix bug where empty delete activity result was not jackson deserializ… (apache#4054)
* Fix bug where empty delete activity result was not jackson deserializable
1 parent 35a4d8e commit c3d16b2

File tree

3 files changed

+63
-1
lines changed

3 files changed

+63
-1
lines changed

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public DirDeletionResult delete(WUProcessingSpec workSpec, EventSubmitterContext
5050
// Ensure that non HDFS writers exit early as they rely on a different cleanup process, can consider consolidation in the future
5151
// through an abstracted cleanup method implemented at a writer level
5252
if (workDirPaths.isEmpty()) {
53-
return new DirDeletionResult();
53+
return DirDeletionResult.createEmpty();
5454
}
5555
//TODO: Emit timers to measure length of cleanup step
5656
Optional<String> optJobName = Optional.empty();

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java

+13
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.gobblin.temporal.ddm.work;
1919

20+
import java.util.HashMap;
2021
import java.util.Map;
2122

2223
import lombok.Data;
@@ -34,4 +35,16 @@
3435
public class DirDeletionResult {
3536

3637
@NonNull private Map<String, Boolean> successesByDirPath;
38+
39+
/**
40+
* Empty result that should be used instead of empty constructor and needed to support jackson (de)serialization, otherwise will face the following error
41+
* Caused by: io.temporal.common.converter.DataConverterException: com.fasterxml.jackson.databind.JsonMappingException: successesByDirPath is marked non-null but is null
42+
* at [Source: (byte[])"{"successesByDirPath":null}"; line: 1, column: 23] (through reference chain: org.apache.gobblin.temporal.ddm.work.DirDeletionResult["successesByDirPath"])
43+
* at io.temporal.common.converter.JacksonJsonPayloadConverter.fromData(JacksonJsonPayloadConverter.java:101)
44+
* at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:145)
45+
* @return
46+
*/
47+
public static DirDeletionResult createEmpty() {
48+
return new DirDeletionResult(new HashMap<>());
49+
}
3750
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.temporal.ddm.activity.impl;
19+
20+
import java.util.HashSet;
21+
import java.util.Optional;
22+
import java.util.Set;
23+
24+
import org.testng.Assert;
25+
import org.testng.annotations.Test;
26+
27+
import io.temporal.api.common.v1.Payload;
28+
import io.temporal.common.converter.JacksonJsonPayloadConverter;
29+
import io.temporal.common.converter.PayloadConverter;
30+
31+
import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
32+
import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
33+
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
34+
35+
36+
public class DeleteWorkDirsActivityImplTest {
37+
38+
@Test
39+
public void testEmptyDeleteSupportsSerde() {
40+
DeleteWorkDirsActivity deleteWorkDirsActivity = new DeleteWorkDirsActivityImpl();
41+
WUProcessingSpec workSpec = new WUProcessingSpec();
42+
Set<String> workDirPaths = new HashSet<>();
43+
DirDeletionResult result = deleteWorkDirsActivity.delete(workSpec, null, workDirPaths);
44+
PayloadConverter converter = new JacksonJsonPayloadConverter();
45+
Optional<Payload> payload = converter.toData(result);
46+
DirDeletionResult result2 = converter.fromData(payload.get(), DirDeletionResult.class, DirDeletionResult.class);
47+
Assert.assertEquals(result, result2);
48+
}
49+
}

0 commit comments

Comments
 (0)