-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQTT topics with path parameters required to match guarded identity #1387
base: develop
Are you sure you want to change the base?
Conversation
...tt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java
Outdated
Show resolved
Hide resolved
...tt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java
Outdated
Show resolved
Hide resolved
...tt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttPublishConfigBuilder.java
Show resolved
Hide resolved
.../src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttSubscribeConfigBuilder.java
Outdated
Show resolved
Hide resolved
...a/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapterTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We often test via spec scripts to verify implementation behavior instead of tightly coupled unit tests.
The intent is to be able to refactor the code, but still have stable tests to verify new implementation. Tightly coupled tests are typically forced to change when the code is refactored, preventing the stable basis to verify consistency.
Let's discuss further over Slack community as needed on this feedback.
.../java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java
Outdated
Show resolved
Hide resolved
.../java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionConfigAdapter.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See HttpOptionsConfigAdapter for an example of a fairly complex object adapter so you can see the coding patterns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I updated the MqttConditionConfigAdapter
class to align with the conding patterns found in other Adapter classes.
Let me know if you like it, thanks!
...tt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java
Outdated
Show resolved
Hide resolved
8d2dfae
to
2f9095f
Compare
2f9095f
to
d15893e
Compare
if (subscribeJson.containsKey(PARAMS_NAME)) | ||
{ | ||
subscribeJson.getJsonObject(PARAMS_NAME).forEach((n, v) -> subscribe.param() | ||
.name(n) | ||
.value(JsonString.class.cast(v).getString()) | ||
.build()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (subscribeJson.containsKey(PARAMS_NAME)) | |
{ | |
subscribeJson.getJsonObject(PARAMS_NAME).forEach((n, v) -> subscribe.param() | |
.name(n) | |
.value(JsonString.class.cast(v).getString()) | |
.build()); | |
} | |
if (subscribeJson.containsKey(PARAMS_NAME)) | |
{ | |
JsonObject paramsJson = subscribeJson.getJsonObject(PARAMS_NAME); | |
paramsJson.keys().forEach(n -> | |
subscribe.param() | |
.name(n) | |
.value(paramsJson.getString(n)) | |
.build()); | |
} |
if (publishJson.containsKey(PARAMS_NAME)) | ||
{ | ||
publishJson.getJsonObject(PARAMS_NAME).forEach((n, v) -> publish.param() | ||
.name(n) | ||
.value(JsonString.class.cast(v).getString()) | ||
.build()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same feedback as above for subscribe
params.
private final List<LongObjPredicate<String>> subscribeMatchPredicates; | ||
private final List<LongObjPredicate<String>> publishMatchPredicates; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final List<LongObjPredicate<String>> subscribeMatchPredicates; | |
private final List<LongObjPredicate<String>> publishMatchPredicates; | |
private final List<TopicMatcher> subscribeMatchers; | |
private final List<TopicMatcher> publishMatchers; |
and create a private non-static inner class TopicMatcher
.
private final class TopicMatcher
{
private final Matcher matchTopic;
private final Map<String, LongObjPredicate> matchParams;
private TopicMatcher(
MqttRouteConfig route,
String wildcard,
List<MqttParamConfig> params)
{
this.matchTopic = Pattern.compile(wildcard
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "[^/]*")
.replace("#", ".*")
.replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>[^/]+)")).matcher("");
this.matchParams = params != null
? params.stream()
.collect(Collectors.toMap(p -> p.name, p -> asTopicParamMatcher(route.identities::get, p.value))
: null;
}
private boolean matches(
long authorization,
String topic)
{
return matchTopic.reset(topic).matches() &&
matchParams(matchTopic::group, authorization);
}
private boolean matchParams(
UnaryFunction<String> valuesByName
long authorization)
{
return matchParams == null ||
matchParams.entrySet().stream()
.allMatch(e -> e.getValue().test(authorization, valuesByName.apply(e.getKey())));
}
private LongObjPredicate asTopicParamMatcher(
Function<String, LongFunction> identities,
String value)
{
return (identityMatcher.reset(param.value).matches())
? asTopicParamIdentityMatcher(identities.apply(identityMatcher.group(1)))
: asTopicParamValueMatcher(value);
}
private LongObjPredicate asTopicParamIdentityMatcher(
LongFunction<String> identity)
{
return (a, v) -> v != null && identity != null && v.equals(identity.apply(a));
}
private LongObjPredicate asTopicParamMatcher(
String expected)
{
return (a, v) -> v != null && v.equals(expected);
}
}
Something more like this perhaps.
MqttConditionConfig condition) | ||
{ | ||
this.sessionMatchers = | ||
condition.sessions != null && !condition.sessions.isEmpty() ? | ||
asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) : null; | ||
this.subscribeMatchers = | ||
|
||
Matcher identityMatcher = IDENTITY_PATTERN.matcher(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to a field on the class instead of passing to asTopicMatchPredicate(...)
.
private final Matcher identityMatcher = IDENTITY_PATTERN.matcher("");
this.subscribeMatchPredicates = | ||
condition.subscribes != null && !condition.subscribes.isEmpty() ? | ||
asTopicMatcher(condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList())) : null; | ||
this.publishMatchers = | ||
condition.subscribes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher)) | ||
.collect(Collectors.toList()) : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.subscribeMatchPredicates = | |
condition.subscribes != null && !condition.subscribes.isEmpty() ? | |
asTopicMatcher(condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList())) : null; | |
this.publishMatchers = | |
condition.subscribes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher)) | |
.collect(Collectors.toList()) : null; | |
this.subscribeMatchers = condition.subscribes != null && !condition.subscribes.isEmpty() | |
? condition.subscribes.stream().map(s -> new TopicMatcher(route::identity, s.topic, s.params)).toList() | |
: null; |
this.publishMatchPredicates = | ||
condition.publishes != null && !condition.publishes.isEmpty() ? | ||
asTopicMatcher(condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList())) : null; | ||
condition.publishes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher)) | ||
.collect(Collectors.toList()) : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.publishMatchPredicates = | |
condition.publishes != null && !condition.publishes.isEmpty() ? | |
asTopicMatcher(condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList())) : null; | |
condition.publishes.stream().map(s -> asTopicMatchPredicate(s.topic, s.params, route, identityMatcher)) | |
.collect(Collectors.toList()) : null; | |
this.publishMatchers = condition.publishes != null && ! condition.publishes.isEmpty() | |
? condition.publishes.stream().map(p -> new TopicMatcher(route::identity, p.topic, p.params)).toList() | |
: null; |
String identity( | ||
String guard, | ||
long authorization) | ||
{ | ||
return identities.getOrDefault(guard, a -> null).apply(authorization); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lookup by guard name can be moved to parse time as we know the guard name when parsing the value expression.
So this needs to return the LongFunction
instead.
String identity( | |
String guard, | |
long authorization) | |
{ | |
return identities.getOrDefault(guard, a -> null).apply(authorization); | |
} | |
String identity( | |
String guard) | |
{ | |
return identities.get(guard); | |
} |
Alternatively, we could move identities
to public scope and just use route.identities::get
instead.
7a7b15c
to
a189179
Compare
The goal of this PR is to allow secure access to some MQTT client specific topics for publish and/or subscribe.
For
publish
orsubscribe
routes:jwt
guard)Example configuration:
Fixes #1382