Skip to content

Commit 147844b

Browse files
authored
Allow pulsar metadata headers to be mapped outbound (#1038)
This allows pulsar metadata headers, which are excluded on the outbound messages by default, to be included in the outbound message headers. Resolves #1037
1 parent e2e55a2 commit 147844b

File tree

4 files changed

+36
-27
lines changed

4 files changed

+36
-27
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ static class PulsarHeadersCustomObjectMapperTestConfig {
6363

6464
=== Inbound/Outbound Patterns
6565
On the inbound side, by default, all Pulsar headers (message metadata plus user properties) are mapped to `MessageHeaders`.
66-
On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata.
66+
On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata (i.e. the headers that are prefixed with `pulsar_message_`).
6767
You can specify which headers are mapped for inbound and outbound messages by configuring the `inboundPatterns` and `outboundPatterns` on a mapper bean you provide.
68-
68+
You can include Pulsar message metadata headers on the outbound messages by adding the exact header name to the `outboundPatterns` as patterns are not supported for metadata headers.
6969
Patterns are rather simple and can contain a leading wildcard (`\*`), a trailing wildcard, or both (for example, `*.cat.*`).
7070
You can negate patterns with a leading `!`.
7171
The first pattern that matches a header name (whether positive or negative) wins.

spring-pulsar/src/main/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapper.java

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ public abstract class AbstractPulsarHeaderMapper<ToPulsarHeadersContextType, ToS
5555

5656
private static final PatternMatch EXCLUDE_PATTERN_TIMESTAMP = PatternMatch.fromPatternString("!timestamp");
5757

58+
private static final List<String> NEVER_MATCH_OUTBOUND_INTERNAL_HEADERS = List.of(PulsarHeaders.KEY,
59+
PulsarHeaders.KEY_BYTES, PulsarHeaders.ORDERING_KEY, PulsarHeaders.INDEX, PulsarHeaders.MESSAGE_ID,
60+
PulsarHeaders.BROKER_PUBLISH_TIME, PulsarHeaders.EVENT_TIME, PulsarHeaders.MESSAGE_SIZE,
61+
PulsarHeaders.PRODUCER_NAME, PulsarHeaders.RAW_DATA, PulsarHeaders.PUBLISH_TIME,
62+
PulsarHeaders.REDELIVERY_COUNT, PulsarHeaders.REPLICATED_FROM, PulsarHeaders.SCHEMA_VERSION,
63+
PulsarHeaders.SEQUENCE_ID, PulsarHeaders.TOPIC_NAME);
64+
5865
protected final LogAccessor logger = new LogAccessor(this.getClass());
5966

6067
private final List<PulsarHeaderMatcher> inboundMatchers = new ArrayList<>();
@@ -64,10 +71,9 @@ public abstract class AbstractPulsarHeaderMapper<ToPulsarHeadersContextType, ToS
6471
/**
6572
* Construct a mapper that will match the supplied inbound and outbound patterns.
6673
* <p>
67-
* <strong>NOTE:</strong> Internal framework headers are <em>never</em> mapped
68-
* outbound. By default, the {@code "id"} and {@code "timestamp"} headers are also
69-
* excluded from outbound mapping but can be included by adding them to
70-
* {@code outboundPatterns}.
74+
* <strong>NOTE:</strong> By default, internal framework headers and the {@code "id"}
75+
* and {@code "timestamp"} headers are <em>not</em> mapped outbound but can be
76+
* included by adding them to {@code outboundPatterns}.
7177
* <p>
7278
* <strong>NOTE:</strong> The patterns are applied in order, stopping on the first
7379
* match (positive or negative). When no pattern is specified, the {@code "*"} pattern
@@ -84,23 +90,7 @@ public AbstractPulsarHeaderMapper(List<String> inboundPatterns, List<String> out
8490
Objects.requireNonNull(outboundPatterns, "outboundPatterns must be specified");
8591
inboundPatterns.forEach((p) -> this.inboundMatchers.add(PatternMatch.fromPatternString(p)));
8692
// @formatter:off
87-
this.outboundMatchers.add(new NeverMatch(
88-
PulsarHeaders.KEY,
89-
PulsarHeaders.KEY_BYTES,
90-
PulsarHeaders.ORDERING_KEY,
91-
PulsarHeaders.INDEX,
92-
PulsarHeaders.MESSAGE_ID,
93-
PulsarHeaders.BROKER_PUBLISH_TIME,
94-
PulsarHeaders.EVENT_TIME,
95-
PulsarHeaders.MESSAGE_SIZE,
96-
PulsarHeaders.PRODUCER_NAME,
97-
PulsarHeaders.RAW_DATA,
98-
PulsarHeaders.PUBLISH_TIME,
99-
PulsarHeaders.REDELIVERY_COUNT,
100-
PulsarHeaders.REPLICATED_FROM,
101-
PulsarHeaders.SCHEMA_VERSION,
102-
PulsarHeaders.SEQUENCE_ID,
103-
PulsarHeaders.TOPIC_NAME));
93+
this.outboundMatchers.add(getNeverMatch(outboundPatterns));
10494
// @formatter:on
10595
if (outboundPatterns.isEmpty()) {
10696
this.outboundMatchers.add(EXCLUDE_PATTERN_ID);
@@ -114,6 +104,12 @@ public AbstractPulsarHeaderMapper(List<String> inboundPatterns, List<String> out
114104
}
115105
}
116106

107+
private NeverMatch getNeverMatch(List<String> outboundPatterns) {
108+
List<String> neverMatches = new ArrayList<>(NEVER_MATCH_OUTBOUND_INTERNAL_HEADERS);
109+
neverMatches.removeAll(outboundPatterns);
110+
return new NeverMatch(neverMatches.toArray(new String[0]));
111+
}
112+
117113
@Override
118114
public Map<String, String> toPulsarHeaders(MessageHeaders springHeaders) {
119115
Objects.requireNonNull(springHeaders, "springHeaders must not be null");

spring-pulsar/src/test/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapperTests.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.Collections;
3030
import java.util.HashMap;
31+
import java.util.List;
3132
import java.util.Objects;
3233

3334
import org.apache.pulsar.client.api.Message;
@@ -206,6 +207,19 @@ void ensureCallbacksInvoked() {
206207
verify(spyTestMapper, times(springHeaders.size())).matchesForOutbound(anyString());
207208
}
208209

210+
@Test
211+
void neverMatchFiltersCanBeConfigured() {
212+
var mapper = mapperWithOutboundPatterns(PulsarHeaders.KEY, PulsarHeaders.MESSAGE_ID,
213+
PulsarHeaders.PRODUCER_NAME, "noSuchInternalHeader");
214+
var springHeaders = new HashMap<String, Object>();
215+
springHeaders.put(PulsarHeaders.KEY, "testKey");
216+
springHeaders.put(PulsarHeaders.KEY_BYTES, "testKeyBytes");
217+
springHeaders.put(PulsarHeaders.MESSAGE_ID, "testMsg");
218+
springHeaders.put(PulsarHeaders.PRODUCER_NAME, "testProducer");
219+
assertThat(mapper.toPulsarHeaders(new MessageHeaders(springHeaders))).containsOnlyKeys(PulsarHeaders.KEY,
220+
PulsarHeaders.MESSAGE_ID, PulsarHeaders.PRODUCER_NAME);
221+
}
222+
209223
}
210224

211225
@Nested
@@ -249,7 +263,7 @@ static class TestPulsarHeaderMapper extends AbstractPulsarHeaderMapper<String, S
249263
private String toPulsarHeadersContext;
250264

251265
TestPulsarHeaderMapper(String toSpringHeadersContext, String toPulsarHeadersContext) {
252-
super(Collections.emptyList(), Collections.emptyList());
266+
super(List.of(), List.of());
253267
this.toSpringHeadersContext = toSpringHeadersContext;
254268
this.toPulsarHeadersContext = toPulsarHeadersContext;
255269
}

spring-pulsar/src/test/java/org/springframework/pulsar/support/header/ToStringPulsarHeaderMapperTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.springframework.pulsar.support.header.PulsarHeaderMapperTestUtil.mockPulsarMessage;
2121

22-
import java.util.Collections;
2322
import java.util.HashMap;
2423
import java.util.List;
2524
import java.util.Map;
@@ -43,12 +42,12 @@ class ToStringPulsarHeaderMapperTests extends AbstractPulsarHeaderMapperTests {
4342

4443
@Override
4544
AbstractPulsarHeaderMapper<?, ?> mapperWithInboundPatterns(String... patterns) {
46-
return new ToStringPulsarHeaderMapper(List.of(patterns), Collections.emptyList());
45+
return new ToStringPulsarHeaderMapper(List.of(patterns), List.of());
4746
}
4847

4948
@Override
5049
AbstractPulsarHeaderMapper<?, ?> mapperWithOutboundPatterns(String... patterns) {
51-
return new ToStringPulsarHeaderMapper(Collections.emptyList(), List.of(patterns));
50+
return new ToStringPulsarHeaderMapper(List.of(), List.of(patterns));
5251
}
5352

5453
@Test

0 commit comments

Comments
 (0)