Skip to content

Commit daf6da6

Browse files
authored
chore(next): Unify rabbitmq starter with starter eda (#117)
* unify rabbitmq starter, update docs
1 parent 2f6e0b0 commit daf6da6

File tree

97 files changed

+1282
-3603
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

97 files changed

+1282
-3603
lines changed

async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/json/CloudEventBuilderExt.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import io.cloudevents.CloudEvent;
5+
import io.cloudevents.CloudEventData;
56
import lombok.SneakyThrows;
67
import lombok.experimental.UtilityClass;
78

@@ -16,6 +17,10 @@ public static byte[] asBytes(Object object) {
1617
return mapper.writeValueAsBytes(object);
1718
}
1819

20+
public static CloudEventData asCloudEventData(Object object) {
21+
return () -> asBytes(object);
22+
}
23+
1924
@SneakyThrows
2025
public static <T> T fromCloudEventData(CloudEvent cloudEvent, Class<T> classValue) {
2126
return mapper.readValue(Objects.requireNonNull(cloudEvent.getData()).toBytes(), classValue);

async/async-kafka/src/test/java/org/reactivecommons/async/kafka/listeners/GenericMessageListenerTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ void shouldStartListener() {
7272
Headers header = new RecordHeaders().add("contentType", "application/json".getBytes(StandardCharsets.UTF_8));
7373
when(record.headers()).thenReturn(header);
7474
when(record.key()).thenReturn("key");
75-
ReceiverOffset receiverOffset = mock(ReceiverOffset.class);
76-
when(record.receiverOffset()).thenReturn(receiverOffset);
7775

7876
Flux<ReceiverRecord<String, byte[]>> flux = Flux.just(record);
7977
when(receiver.listen(anyString(), any(List.class))).thenReturn(flux);
@@ -91,7 +89,6 @@ void shouldStartListener() {
9189
StepVerifier.create(flow).expectNext("").verifyComplete();
9290
// Assert
9391
verify(topologyCreator, times(1)).createTopics(any(List.class));
94-
verify(receiverOffset, atLeastOnce()).acknowledge();
9592
}
9693

9794

0 commit comments

Comments
 (0)