Skip to content

Commit 6026059

Browse files
committed
merge raw history removal
Signed-off-by: Shijie Sheng <[email protected]>
1 parent fa8456a commit 6026059

File tree

6 files changed

+20
-237
lines changed

6 files changed

+20
-237
lines changed

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 0 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818
package com.uber.cadence.internal.common;
1919

2020
import com.google.common.base.Defaults;
21-
import com.google.common.collect.Lists;
22-
import com.uber.cadence.DataBlob;
23-
import com.uber.cadence.History;
24-
import com.uber.cadence.HistoryEvent;
25-
import com.uber.cadence.HistoryEventFilterType;
2621
import com.uber.cadence.Memo;
2722
import com.uber.cadence.SearchAttributes;
2823
import com.uber.cadence.TaskList;
@@ -33,15 +28,10 @@
3328
import com.uber.cadence.workflow.WorkflowMethod;
3429
import java.lang.reflect.Method;
3530
import java.nio.ByteBuffer;
36-
import java.util.Arrays;
3731
import java.util.HashMap;
38-
import java.util.List;
3932
import java.util.Map;
4033
import java.util.concurrent.ExecutorService;
4134
import java.util.concurrent.TimeUnit;
42-
import org.apache.thrift.TDeserializer;
43-
import org.apache.thrift.TException;
44-
import org.apache.thrift.TSerializer;
4535

4636
/** Utility functions shared by the implementation code. */
4737
public final class InternalUtils {
@@ -164,93 +154,6 @@ public static SearchAttributes convertMapToSearchAttributes(
164154
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
165155
}
166156

167-
// This method serializes history to blob data
168-
public static DataBlob SerializeFromHistoryToBlobData(History history) {
169-
170-
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
171-
TSerializer serializer = new TSerializer();
172-
DataBlob blob = new DataBlob();
173-
try {
174-
blob.setData(serializer.serialize(history));
175-
} catch (org.apache.thrift.TException err) {
176-
throw new RuntimeException("Serialize history to blob data failed", err);
177-
}
178-
179-
return blob;
180-
}
181-
182-
// This method deserialize the DataBlob data to the History data
183-
public static History DeserializeFromBlobDataToHistory(
184-
List<DataBlob> blobData, HistoryEventFilterType historyEventFilterType) throws TException {
185-
186-
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
187-
TDeserializer deSerializer = new TDeserializer();
188-
List<HistoryEvent> events = Lists.newArrayList();
189-
for (DataBlob data : blobData) {
190-
History history = new History();
191-
try {
192-
byte[] dataByte = data.getData();
193-
// TODO: verify the beginning index
194-
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
195-
deSerializer.deserialize(history, dataByte);
196-
197-
if (history == null || history.getEvents() == null || history.getEvents().size() == 0) {
198-
return null;
199-
}
200-
} catch (org.apache.thrift.TException err) {
201-
throw new TException("Deserialize blob data to history failed with unknown error");
202-
}
203-
204-
events.addAll(history.getEvents());
205-
}
206-
207-
if (events.size() > 0 && historyEventFilterType == HistoryEventFilterType.CLOSE_EVENT) {
208-
events = events.subList(events.size() - 1, events.size());
209-
}
210-
211-
return new History().setEvents(events);
212-
}
213-
214-
// This method serializes history event to blob data
215-
public static List<DataBlob> SerializeFromHistoryEventToBlobData(List<HistoryEvent> events) {
216-
217-
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
218-
TSerializer serializer = new TSerializer();
219-
List<DataBlob> blobs = Lists.newArrayListWithCapacity(events.size());
220-
for (HistoryEvent event : events) {
221-
DataBlob blob = new DataBlob();
222-
try {
223-
blob.setData(serializer.serialize(event));
224-
} catch (org.apache.thrift.TException err) {
225-
throw new RuntimeException("Serialize history event to blob data failed", err);
226-
}
227-
blobs.add(blob);
228-
}
229-
return blobs;
230-
}
231-
232-
// This method serializes blob data to history event
233-
public static List<HistoryEvent> DeserializeFromBlobDataToHistoryEvents(List<DataBlob> blobData)
234-
throws TException {
235-
236-
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
237-
TDeserializer deSerializer = new TDeserializer();
238-
List<HistoryEvent> events = Lists.newArrayList();
239-
for (DataBlob data : blobData) {
240-
try {
241-
HistoryEvent event = new HistoryEvent();
242-
byte[] dataByte = data.getData();
243-
// TODO: verify the beginning index
244-
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
245-
deSerializer.deserialize(event, dataByte);
246-
events.add(event);
247-
} catch (org.apache.thrift.TException err) {
248-
throw new TException("Deserialize blob data to history event failed with unknown error");
249-
}
250-
}
251-
return events;
252-
}
253-
254157
/** Prohibit instantiation */
255158
private InternalUtils() {}
256159
}

src/main/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityImpl.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@
1919

2020
import com.google.common.collect.Lists;
2121
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
22-
import com.uber.cadence.History;
2322
import com.uber.cadence.HistoryEvent;
24-
import com.uber.cadence.HistoryEventFilterType;
2523
import com.uber.cadence.activity.Activity;
2624
import com.uber.cadence.common.WorkflowExecutionHistory;
27-
import com.uber.cadence.internal.common.InternalUtils;
2825
import com.uber.cadence.internal.common.RpcRetryer;
2926
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3027
import com.uber.cadence.internal.metrics.MetricsType;
@@ -185,14 +182,10 @@ protected WorkflowExecutionHistory getFullHistory(String domain, WorkflowExecuti
185182
nextPageToken, this.serviceClient, domain, execution.toThrift()));
186183
pageToken = resp.getNextPageToken();
187184

188-
// handle raw history
185+
// TODO support raw history feature once server removes default Thrift encoding
189186
if (resp.getRawHistory() != null && resp.getRawHistory().size() > 0) {
190-
History history =
191-
InternalUtils.DeserializeFromBlobDataToHistory(
192-
resp.getRawHistory(), HistoryEventFilterType.ALL_EVENT);
193-
if (history != null && history.getEvents() != null) {
194-
histories.addAll(history.getEvents());
195-
}
187+
throw new UnsupportedOperationException(
188+
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
196189
} else {
197190
histories.addAll(resp.getHistory().getEvents());
198191
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.uber.cadence.internal.testservice;
1919

2020
import com.uber.cadence.BadRequestError;
21-
import com.uber.cadence.DataBlob;
2221
import com.uber.cadence.EntityNotExistsError;
2322
import com.uber.cadence.EventType;
2423
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
@@ -34,7 +33,6 @@
3433
import com.uber.cadence.StickyExecutionAttributes;
3534
import com.uber.cadence.WorkflowExecution;
3635
import com.uber.cadence.WorkflowExecutionInfo;
37-
import com.uber.cadence.internal.common.InternalUtils;
3836
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3937
import com.uber.cadence.internal.testservice.RequestContext.Timer;
4038
import java.time.Duration;
@@ -348,24 +346,20 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
348346
if (!getRequest.isWaitForNewEvent()
349347
&& getRequest.getHistoryEventFilterType() != HistoryEventFilterType.CLOSE_EVENT) {
350348
List<HistoryEvent> events = history.getEventsLocked();
351-
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
352349
// Copy the list as it is mutable. Individual events assumed immutable.
353350
ArrayList<HistoryEvent> eventsCopy = new ArrayList<>(events);
354351
return new GetWorkflowExecutionHistoryResponse()
355-
.setHistory(new History().setEvents(eventsCopy))
356-
.setRawHistory(blobs);
352+
.setHistory(new History().setEvents(eventsCopy));
357353
}
358354
expectedNextEventId = history.getNextEventIdLocked();
359355
} finally {
360356
lock.unlock();
361357
}
362358
List<HistoryEvent> events =
363359
history.waitForNewEvents(expectedNextEventId, getRequest.getHistoryEventFilterType());
364-
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
365360
GetWorkflowExecutionHistoryResponse result = new GetWorkflowExecutionHistoryResponse();
366361
if (events != null) {
367362
result.setHistory(new History().setEvents(events));
368-
result.setRawHistory(blobs);
369363
}
370364
return result;
371365
}

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.uber.cadence.WorkflowService.GetWorkflowExecutionHistory_result;
2929
import com.uber.cadence.internal.Version;
3030
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
31-
import com.uber.cadence.internal.common.InternalUtils;
3231
import com.uber.cadence.internal.metrics.MetricsTag;
3332
import com.uber.cadence.internal.metrics.MetricsType;
3433
import com.uber.cadence.internal.metrics.ServiceMethod;
@@ -774,10 +773,8 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
774773
if (response.getResponseCode() == ResponseCode.OK) {
775774
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
776775
if (res.getRawHistory() != null) {
777-
History history =
778-
InternalUtils.DeserializeFromBlobDataToHistory(
779-
res.getRawHistory(), getRequest.getHistoryEventFilterType());
780-
res.setHistory(history);
776+
throw new TException(
777+
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
781778
}
782779
return res;
783780
}
@@ -2601,10 +2598,8 @@ private void getWorkflowExecutionHistory(
26012598
if (r.getResponseCode() == ResponseCode.OK) {
26022599
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
26032600
if (res.getRawHistory() != null) {
2604-
History history =
2605-
InternalUtils.DeserializeFromBlobDataToHistory(
2606-
res.getRawHistory(), getRequest.getHistoryEventFilterType());
2607-
res.setHistory(history);
2601+
throw new TException(
2602+
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
26082603
}
26092604
resultHandler.onComplete(res);
26102605
return;

src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java

Lines changed: 0 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,14 @@
1717

1818
package com.uber.cadence.internal.common;
1919

20-
import static com.uber.cadence.EventType.WorkflowExecutionStarted;
2120
import static junit.framework.TestCase.assertEquals;
22-
import static org.junit.Assert.assertNotNull;
2321

24-
import com.google.common.collect.Lists;
25-
import com.googlecode.junittoolbox.MultithreadingTester;
26-
import com.googlecode.junittoolbox.RunnableAssert;
2722
import com.uber.cadence.*;
2823
import com.uber.cadence.converter.DataConverterException;
2924
import com.uber.cadence.workflow.WorkflowUtils;
3025
import java.io.FileOutputStream;
31-
import java.time.LocalDateTime;
32-
import java.time.ZoneOffset;
3326
import java.util.HashMap;
34-
import java.util.List;
3527
import java.util.Map;
36-
import junit.framework.TestCase;
3728
import org.junit.Test;
3829

3930
public class InternalUtilsTest {
@@ -56,101 +47,4 @@ public void testConvertMapToSearchAttributesException() throws Throwable {
5647
attr.put("InvalidValue", new FileOutputStream("dummy"));
5748
InternalUtils.convertMapToSearchAttributes(attr);
5849
}
59-
60-
@Test
61-
public void testSerialization_History() {
62-
63-
RunnableAssert r =
64-
new RunnableAssert("history_serialization") {
65-
@Override
66-
public void run() {
67-
HistoryEvent event =
68-
new HistoryEvent()
69-
.setEventId(1)
70-
.setVersion(1)
71-
.setEventType(WorkflowExecutionStarted)
72-
.setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
73-
.setWorkflowExecutionStartedEventAttributes(
74-
new WorkflowExecutionStartedEventAttributes()
75-
.setAttempt(1)
76-
.setFirstExecutionRunId("test"));
77-
78-
List<HistoryEvent> historyEvents = Lists.newArrayList(event);
79-
History history = new History().setEvents(historyEvents);
80-
DataBlob blob = InternalUtils.SerializeFromHistoryToBlobData(history);
81-
assertNotNull(blob);
82-
83-
try {
84-
History result =
85-
InternalUtils.DeserializeFromBlobDataToHistory(
86-
Lists.newArrayList(blob), HistoryEventFilterType.ALL_EVENT);
87-
assertNotNull(result);
88-
assertEquals(1, result.events.size());
89-
assertEquals(event.getEventId(), result.events.get(0).getEventId());
90-
assertEquals(event.getVersion(), result.events.get(0).getVersion());
91-
assertEquals(event.getEventType(), result.events.get(0).getEventType());
92-
assertEquals(event.getTimestamp(), result.events.get(0).getTimestamp());
93-
assertEquals(
94-
event.getWorkflowExecutionStartedEventAttributes(),
95-
result.events.get(0).getWorkflowExecutionStartedEventAttributes());
96-
} catch (Exception e) {
97-
TestCase.fail("Received unexpected error during deserialization");
98-
}
99-
}
100-
};
101-
102-
try {
103-
new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
104-
} catch (Exception e) {
105-
TestCase.fail("Received unexpected error during concurrent deserialization");
106-
}
107-
}
108-
109-
@Test
110-
public void testSerialization_HistoryEvent() {
111-
112-
RunnableAssert r =
113-
new RunnableAssert("history_event_serialization") {
114-
@Override
115-
public void run() {
116-
HistoryEvent event =
117-
new HistoryEvent()
118-
.setEventId(1)
119-
.setVersion(1)
120-
.setEventType(WorkflowExecutionStarted)
121-
.setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
122-
.setWorkflowExecutionStartedEventAttributes(
123-
new WorkflowExecutionStartedEventAttributes()
124-
.setAttempt(1)
125-
.setFirstExecutionRunId("test"));
126-
127-
List<HistoryEvent> historyEvents = Lists.newArrayList(event);
128-
List<DataBlob> blobList =
129-
InternalUtils.SerializeFromHistoryEventToBlobData(historyEvents);
130-
assertEquals(1, blobList.size());
131-
132-
try {
133-
List<HistoryEvent> result =
134-
InternalUtils.DeserializeFromBlobDataToHistoryEvents(blobList);
135-
assertNotNull(result);
136-
assertEquals(1, result.size());
137-
assertEquals(event.getEventId(), result.get(0).getEventId());
138-
assertEquals(event.getVersion(), result.get(0).getVersion());
139-
assertEquals(event.getEventType(), result.get(0).getEventType());
140-
assertEquals(event.getTimestamp(), result.get(0).getTimestamp());
141-
assertEquals(
142-
event.getWorkflowExecutionStartedEventAttributes(),
143-
result.get(0).getWorkflowExecutionStartedEventAttributes());
144-
} catch (Exception e) {
145-
TestCase.fail("Received unexpected error during deserialization");
146-
}
147-
}
148-
};
149-
150-
try {
151-
new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
152-
} catch (Exception e) {
153-
TestCase.fail("Received unexpected error during concurrent deserialization");
154-
}
155-
}
15650
}

0 commit comments

Comments
 (0)