Skip to content

Commit f999cdd

Browse files
committed
GH-422 Formalize Cloud Event conversion strategy to consistently handle binary-mode and structured-mode cloud events
Moved CloudEvent related artifacts to ‘cloud events’ package with hopes to eventually donating it to CNCF SDK Created CloudEventUtils identifying necessary constants and utility methods
1 parent 3291863 commit f999cdd

File tree

14 files changed

+703
-95
lines changed

14 files changed

+703
-95
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.function.cloudevent;
18+
19+
import java.util.Map;
20+
import java.util.function.Function;
21+
22+
import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter;
23+
import org.springframework.messaging.Message;
24+
import org.springframework.messaging.MessageHeaders;
25+
import org.springframework.messaging.converter.CompositeMessageConverter;
26+
import org.springframework.messaging.converter.ContentTypeResolver;
27+
import org.springframework.messaging.converter.DefaultContentTypeResolver;
28+
import org.springframework.messaging.support.MessageBuilder;
29+
import org.springframework.util.Assert;
30+
import org.springframework.util.MimeType;
31+
import org.springframework.util.MimeTypeUtils;
32+
33+
/**
34+
* A Cloud Events specific pre-processor that is added to {@link SmartCompositeMessageConverter}
35+
* to potentially modify incoming message.
36+
* <br><br>
37+
* For Cloud Event coming in binary-mode such modification implies determining
38+
* content type of the 'data' attribute (see {@link #getDataContentType(MessageHeaders)}
39+
* of Cloud Event and creating a new {@link Message} with its `contentType` set to such
40+
* content type while copying the rest of the headers.
41+
* <br><br>
42+
* Similar to Cloud Event coming in binary-mode, the Cloud Event coming in structured-mode
43+
* such modification also implies determining content type of the 'data' attribute
44+
* (see {@link #getDataContentType(MessageHeaders)}...
45+
*
46+
* @author Oleg Zhurakousky
47+
* @since 3.1
48+
*/
49+
public class CloudEventDataContentTypeMessagePreProcessor implements Function<Message<?>, Message<?>> {
50+
51+
private final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
52+
53+
private final MimeType cloudEventContentType = MimeTypeUtils.parseMimeType("application/cloudevents");
54+
55+
private final CompositeMessageConverter messageConverter;
56+
57+
public CloudEventDataContentTypeMessagePreProcessor(CompositeMessageConverter messageConverter) {
58+
Assert.notNull(messageConverter, "'messageConverter' must not be null");
59+
this.messageConverter = messageConverter;
60+
}
61+
62+
@SuppressWarnings("unchecked")
63+
@Override
64+
public Message<?> apply(Message<?> inputMessage) {
65+
if (CloudEventUtils.isBinary(inputMessage)) {
66+
String dataContentType = this.getDataContentType(inputMessage.getHeaders());
67+
Message<?> message = MessageBuilder.fromMessage(inputMessage)
68+
.setHeader(MessageHeaders.CONTENT_TYPE, dataContentType)
69+
// .setHeader("originalContentType", inputMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)) not sure about it
70+
.build();
71+
return message;
72+
}
73+
else if (this.isStructured(inputMessage)) {
74+
MimeType contentType = this.contentTypeResolver.resolve(inputMessage.getHeaders());
75+
String dataContentType = this.getDataContentType(inputMessage.getHeaders());
76+
String suffix = contentType.getSubtypeSuffix();
77+
MimeType cloudEventDeserializationContentType = MimeTypeUtils
78+
.parseMimeType(contentType.getType() + "/" + suffix);
79+
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
80+
.setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
81+
.setHeader(CloudEventUtils.CE_DATACONTENTTYPE, dataContentType).build();
82+
Map<String, Object> structuredCloudEvent = (Map<String, Object>) this.messageConverter
83+
.fromMessage(cloudEventMessage, Map.class);
84+
Message<?> binaryCeMessage = this.buildCeMessageFromStructured(structuredCloudEvent);
85+
return binaryCeMessage;
86+
}
87+
else {
88+
return inputMessage;
89+
}
90+
}
91+
92+
private Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent) {
93+
MessageBuilder<?> builder = MessageBuilder.withPayload(structuredCloudEvent.get(CloudEventUtils.DATA));
94+
structuredCloudEvent.remove(CloudEventUtils.DATA);
95+
builder.copyHeaders(structuredCloudEvent);
96+
return builder.build();
97+
}
98+
99+
private String getDataContentType(MessageHeaders headers) {
100+
if (headers.containsKey(CloudEventUtils.DATACONTENTTYPE)) {
101+
return (String) headers.get(CloudEventUtils.DATACONTENTTYPE);
102+
}
103+
else if (headers.containsKey(CloudEventUtils.CE_DATACONTENTTYPE)) {
104+
return (String) headers.get(CloudEventUtils.CE_DATACONTENTTYPE);
105+
}
106+
else if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
107+
return headers.get(MessageHeaders.CONTENT_TYPE).toString();
108+
}
109+
return "application/json";
110+
}
111+
112+
private boolean isStructured(Message<?> message) {
113+
if (!CloudEventUtils.isBinary(message)) {
114+
Map<String, Object> headers = message.getHeaders();
115+
116+
if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
117+
MimeType contentType = this.contentTypeResolver.resolve(message.getHeaders());
118+
if (contentType.getType().equals(this.cloudEventContentType.getType())
119+
&& contentType.getSubtype().startsWith(this.cloudEventContentType.getSubtype())) {
120+
return true;
121+
}
122+
}
123+
}
124+
return false;
125+
}
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.function.cloudevent;
18+
19+
import org.springframework.cloud.function.context.config.JsonMessageConverter;
20+
import org.springframework.cloud.function.json.JsonMapper;
21+
import org.springframework.messaging.converter.MessageConverter;
22+
import org.springframework.util.MimeType;
23+
24+
/**
25+
* Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the
26+
* actual conversion via {@link JsonMapper} instance.
27+
*
28+
* @author Oleg Zhurakousky
29+
*
30+
* @since 3.1
31+
*/
32+
public class CloudEventJsonMessageConverter extends JsonMessageConverter {
33+
34+
public CloudEventJsonMessageConverter(JsonMapper jsonMapper) {
35+
super(jsonMapper, new MimeType("application", "cloudevents+json"));
36+
this.setStrictContentTypeMatch(true);
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.function.cloudevent;
18+
19+
import java.util.Map;
20+
21+
import org.springframework.messaging.Message;
22+
23+
/**
24+
* Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/.
25+
* <br>
26+
* Mainly for internal use within the framework;
27+
*
28+
* @author Oleg Zhurakousky
29+
* @since 3.1
30+
*/
31+
public final class CloudEventUtils {
32+
33+
private CloudEventUtils() {
34+
35+
}
36+
37+
/**
38+
* Prefix for attributes.
39+
*/
40+
public static String ATTR_PREFIX = "ce_";
41+
42+
/**
43+
* Value for 'data' attribute.
44+
*/
45+
public static String DATA = "data";
46+
47+
/**
48+
* Value for 'data' attribute with prefix.
49+
*/
50+
public static String CE_DATA = ATTR_PREFIX + DATA;
51+
52+
/**
53+
* Value for 'id' attribute.
54+
*/
55+
public static String ID = "id";
56+
57+
/**
58+
* Value for 'id' attribute with prefix.
59+
*/
60+
public static String CE_ID = ATTR_PREFIX + ID;
61+
62+
/**
63+
* Value for 'source' attribute.
64+
*/
65+
public static String SOURCE = "source";
66+
67+
/**
68+
* Value for 'source' attribute with prefix.
69+
*/
70+
public static String CE_SOURCE = ATTR_PREFIX + SOURCE;
71+
72+
/**
73+
* Value for 'specversion' attribute.
74+
*/
75+
public static String SPECVERSION = "specversion";
76+
77+
/**
78+
* Value for 'specversion' attribute with prefix.
79+
*/
80+
public static String CE_SPECVERSION = ATTR_PREFIX + SPECVERSION;
81+
82+
/**
83+
* Value for 'type' attribute.
84+
*/
85+
public static String TYPE = "type";
86+
87+
/**
88+
* Value for 'type' attribute with prefix.
89+
*/
90+
public static String CE_TYPE = ATTR_PREFIX + TYPE;
91+
92+
/**
93+
* Value for 'datacontenttype' attribute.
94+
*/
95+
public static String DATACONTENTTYPE = "datacontenttype";
96+
97+
/**
98+
* Value for 'datacontenttype' attribute with prefix.
99+
*/
100+
public static String CE_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE;
101+
102+
/**
103+
* Value for 'dataschema' attribute.
104+
*/
105+
public static String DATASCHEMA = "dataschema";
106+
107+
/**
108+
* Value for 'dataschema' attribute with prefix.
109+
*/
110+
public static String CE_DATASCHEMA = ATTR_PREFIX + DATASCHEMA;
111+
112+
/**
113+
* Value for 'subject' attribute.
114+
*/
115+
public static String SUBJECT = "subject";
116+
117+
/**
118+
* Value for 'subject' attribute with prefix.
119+
*/
120+
public static String CE_SUBJECT = ATTR_PREFIX + SUBJECT;
121+
122+
/**
123+
* Value for 'time' attribute.
124+
*/
125+
public static String TIME = "time";
126+
127+
/**
128+
* Value for 'time' attribute with prefix.
129+
*/
130+
public static String CE_TIME = ATTR_PREFIX + TIME;
131+
132+
/**
133+
* Checks if {@link Message} represents cloud event in binary-mode.
134+
*/
135+
public static boolean isBinary(Message<?> message) {
136+
Map<String, Object> headers = message.getHeaders();
137+
return (headers.containsKey("id")
138+
&& headers.containsKey("source")
139+
&& headers.containsKey("specversion")
140+
&& headers.containsKey("type"))
141+
||
142+
(headers.containsKey("ce_id")
143+
&& headers.containsKey("ce_source")
144+
&& headers.containsKey("ce_specversion")
145+
&& headers.containsKey("ce_type"));
146+
}
147+
}

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.cloud.function.context.catalog;
1818

19-
import java.lang.reflect.Array;
2019
import java.lang.reflect.Field;
2120
import java.lang.reflect.ParameterizedType;
2221
import java.lang.reflect.Type;

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)