Skip to content

Commit 2aa92a7

Browse files
committed
GH-422, GH-606 Fis structure mode attribute generation
1 parent 24ef274 commit 2aa92a7

File tree

5 files changed

+41
-39
lines changed

5 files changed

+41
-39
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.HashMap;
2020
import java.util.Map;
21+
import java.util.UUID;
2122

2223
import org.springframework.util.StringUtils;
2324

@@ -101,6 +102,10 @@ public <A> A getId() {
101102
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)) {
102103
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID);
103104
}
105+
Object id = this.get(CloudEventMessageUtils.ID);
106+
if (!(id instanceof UUID)) {
107+
return (A) id;
108+
}
104109
return null;
105110
}
106111

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,10 @@ public static Message<?> toBinary(Message<?> inputMessage, MessageConverter mess
237237
.parseMimeType(contentType.getType() + "/" + suffix);
238238
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
239239
.setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
240-
.setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType).build();
240+
.setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType)
241+
.build();
241242
Map<String, Object> structuredCloudEvent = (Map<String, Object>) messageConverter.fromMessage(cloudEventMessage, Map.class);
242-
Message<?> binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, determinePrefixToUse(inputMessage));
243+
Message<?> binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, inputMessage.getHeaders());
243244
return binaryCeMessage;
244245
}
245246
}
@@ -251,7 +252,8 @@ else if (StringUtils.hasText(attributes.getDataContentType())) {
251252
return inputMessage;
252253
}
253254

254-
private static Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent, String prefixToUse) {
255+
private static Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent, MessageHeaders originalHeaders) {
256+
String prefixToUse = determinePrefixToUse(originalHeaders);
255257
Object data = null;
256258
if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) {
257259
data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
@@ -272,11 +274,12 @@ else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) {
272274
builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource());
273275
builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType());
274276
builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion());
277+
builder.copyHeaders(originalHeaders);
275278
return builder.build();
276279
}
277280

278-
public static String determinePrefixToUse(Message<?> inputMessage) {
279-
Set<String> keys = inputMessage.getHeaders().keySet();
281+
public static String determinePrefixToUse(MessageHeaders messageHeaders) {
282+
Set<String> keys = messageHeaders.keySet();
280283
if (keys.contains("user-agent")) {
281284
return CloudEventMessageUtils.HTTP_ATTR_PREFIX;
282285
}
@@ -296,9 +299,9 @@ public static CloudEventAttributes generateAttributes(Message<?> message, CloudE
296299
return attributes;
297300
}
298301

299-
public static CloudEventAttributes generateAttributes(Message<?> inputMessage, Object result, String applicationName) {
300-
CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage));
301-
return generateDefaultAttributeValues(attributes, result.getClass().getName(), applicationName);
302+
public static CloudEventAttributes generateAttributes(Message<?> inputMessage, String typeName, String sourceName) {
303+
CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders()));
304+
return generateDefaultAttributeValues(attributes, typeName, sourceName);
302305
}
303306

304307
private static CloudEventAttributes generateDefaultAttributeValues(CloudEventAttributes attributes, String source, String type) {

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,12 @@ else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
163163

164164
if (function != null) {
165165
BiFunction<Message<?>, Object, Message<?>> invocationResultHeaderEnricher = new BiFunction<Message<?>, Object, Message<?>>() {
166-
167166
@Override
168167
public Message<?> apply(Message<?> inputMessage, Object invocationResult) {
169168
// TODO: Factor it out! Cloud Events specific code
170169
CloudEventAttributes generatedCeHeaders = CloudEventMessageUtils
171-
.generateAttributes(inputMessage, invocationResult, getApplicationName());
172-
CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders, CloudEventMessageUtils.determinePrefixToUse(inputMessage));
170+
.generateAttributes(inputMessage, invocationResult.getClass().getName(), getApplicationName());
171+
CloudEventAttributes attributes = new CloudEventAttributes(generatedCeHeaders, CloudEventMessageUtils.determinePrefixToUse(inputMessage.getHeaders()));
173172
if (cloudEventAtttributesProvider != null) {
174173
cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(attributes);
175174
}

spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,9 @@
2222
import java.util.function.Consumer;
2323
import java.util.function.Function;
2424

25-
import org.springframework.beans.factory.annotation.Value;
2625
import org.springframework.boot.SpringApplication;
2726
import org.springframework.boot.autoconfigure.SpringBootApplication;
28-
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
2927
import org.springframework.boot.web.client.RestTemplateBuilder;
30-
import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
3128
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
3229
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
3330
import org.springframework.cloud.function.web.util.HeaderUtils;
@@ -37,7 +34,6 @@
3734
import org.springframework.messaging.MessageHeaders;
3835
import org.springframework.messaging.support.MessageBuilder;
3936
import org.springframework.util.Assert;
40-
import org.springframework.web.client.RestTemplate;
4137

4238
/**
4339
* Sample application that demonstrates how user functions can be triggered by cloud event.
@@ -55,6 +51,8 @@
5551
@SpringBootApplication
5652
public class CloudeventDemoApplication {
5753

54+
boolean consumerSuccess;
55+
5856
public static void main(String[] args) throws Exception {
5957
SpringApplication.run(CloudeventDemoApplication.class, args);
6058
}
@@ -145,22 +143,8 @@ public Consumer<Message<SpringReleaseEvent>> pojoConsumer(CloudEventAttributesPr
145143
Assert.notEmpty(idHeader, "'id' must not be null");
146144
List<String> specversionHeader = entity.getHeaders().get("ce-specversion");
147145
Assert.notEmpty(specversionHeader, "'specversion' must not be null");
146+
this.consumerSuccess = true;
148147
};
149148
}
150149

151-
152-
@Bean
153-
@ConditionalOnExpression("'${K_SINK:}'!=''")
154-
public Consumer<Message<Map<String, Object>>> sink(CloudEventAttributesProvider provider,
155-
RestTemplateBuilder builder, @Value("${K_SINK}") String url) {
156-
RestTemplate client = builder.build();
157-
return eventMessage -> {
158-
RequestEntity<Map<String, Object>> entity = RequestEntity.post(URI.create("http://foo.com"))
159-
.headers(HeaderUtils.fromMessage(
160-
new MessageHeaders(CloudEventMessageUtils.generateAttributes(eventMessage, provider, "io.spring"))))
161-
.body(eventMessage.getPayload());
162-
client.exchange(entity, byte[].class);
163-
};
164-
}
165-
166150
}

spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,11 @@ public void testAsStracturalFormatToPOJO() throws Exception {
171171
response = testRestTemplate.exchange(re, String.class);
172172

173173
assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0");
174+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
175+
.isEqualTo(Collections.singletonList("https://interface21.com/"));
176+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
177+
.isEqualTo(Collections.singletonList("com.interface21"));
178+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull();
174179
}
175180

176181
@Test
@@ -202,6 +207,11 @@ public void testAsStracturalFormatToString() throws Exception {
202207
response = testRestTemplate.exchange(re, String.class);
203208

204209
assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}");
210+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
211+
.isEqualTo(Collections.singletonList("https://interface21.com/"));
212+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
213+
.isEqualTo(Collections.singletonList("com.interface21"));
214+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull();
205215
}
206216

207217
@Test
@@ -271,7 +281,7 @@ public void testAsBinaryPojoToPojoWrongHeaders() throws Exception {
271281

272282

273283
@Test
274-
public void testAsStructuralPojoToPojo() throws Exception {
284+
public void testAsStructuralPojoToPojoDefaultDataContentType() throws Exception {
275285
ApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class);
276286
JsonMapper mapper = context.getBean(JsonMapper.class);
277287

@@ -280,15 +290,14 @@ public void testAsStructuralPojoToPojo() throws Exception {
280290
" \"type\" : \"org.springframework\",\n" +
281291
" \"source\" : \"https://spring.io/\",\n" +
282292
" \"id\" : \"A234-1234-1234\",\n" +
283-
// " \"ce-datacontenttype\" : \"application/json\",\n" +
284293
" \"data\" : {\n" +
285294
" \"version\" : \"1.0\",\n" +
286295
" \"releaseName\" : \"Spring Framework\",\n" +
287296
" \"releaseDate\" : \"24-03-2004\"\n" +
288297
" }\n" +
289298
"}";
290299

291-
System.out.println(payload);
300+
292301
HttpHeaders headers = new HttpHeaders();
293302
headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));
294303

@@ -308,23 +317,25 @@ public void testAsStructuralPojoToPojo() throws Exception {
308317
assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework");
309318
assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0");
310319

311-
312-
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_SOURCE))
313-
// .isEqualTo(Collections.singletonList("http://spring.io/application-application"));
314-
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_TYPE))
315-
// .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
320+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
321+
.isEqualTo(Collections.singletonList("https://interface21.com/"));
322+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
323+
.isEqualTo(Collections.singletonList("com.interface21"));
324+
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull();
316325
}
317326

318327
@Test
319328
public void testPojoConsumer() throws Exception {
320-
SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
329+
ApplicationContext context = SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
321330

322331
HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON);
323332
String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
324333

325334
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/pojoConsumer"));
326335
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
327336
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
337+
CloudeventDemoApplication application = context.getBean(CloudeventDemoApplication.class);
338+
assertThat(application.consumerSuccess).isTrue();
328339
}
329340

330341
private URI constructURI(String path) throws Exception {

0 commit comments

Comments
 (0)