Skip to content

Commit 5e0916c

Browse files
authored
[Fix #934] Implementing function catalog (#1021)
[Fix #934] Implementing function catalog (alt) Signed-off-by: fjtirado <[email protected]>
1 parent 5e40aef commit 5e0916c

File tree

19 files changed

+398
-102
lines changed

19 files changed

+398
-102
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import io.serverlessworkflow.api.types.Task;
19+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
20+
import java.util.function.Function;
21+
22+
public interface FunctionReader extends Function<ExternalResourceHandler, Task> {}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
public interface NamedObject {
19+
String name();
20+
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class WorkflowApplication implements AutoCloseable {
7676
private final SecretManager secretManager;
7777
private final SchedulerListener schedulerListener;
7878
private final Optional<URITemplateResolver> templateResolver;
79+
private final Optional<FunctionReader> functionReader;
7980

8081
private WorkflowApplication(Builder builder) {
8182
this.taskFactory = builder.taskFactory;
@@ -98,6 +99,7 @@ private WorkflowApplication(Builder builder) {
9899
this.configManager = builder.configManager;
99100
this.secretManager = builder.secretManager;
100101
this.templateResolver = builder.templateResolver;
102+
this.functionReader = builder.functionReader;
101103
}
102104

103105
public TaskExecutorFactory taskFactory() {
@@ -178,6 +180,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
178180
private ConfigManager configManager;
179181
private SchedulerListener schedulerListener;
180182
private Optional<URITemplateResolver> templateResolver;
183+
private Optional<FunctionReader> functionReader;
181184

182185
private Builder() {
183186
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
@@ -329,6 +332,7 @@ public WorkflowApplication build() {
329332
.orElseGet(() -> new ConfigSecretManager(configManager));
330333
}
331334
templateResolver = ServiceLoader.load(URITemplateResolver.class).findFirst();
335+
functionReader = ServiceLoader.load(FunctionReader.class).findFirst();
332336
return new WorkflowApplication(this);
333337
}
334338
}
@@ -406,6 +410,10 @@ public Optional<URITemplateResolver> templateResolver() {
406410
return templateResolver;
407411
}
408412

413+
public Optional<FunctionReader> functionReader() {
414+
return functionReader;
415+
}
416+
409417
public <T> Optional<T> additionalObject(
410418
String name, WorkflowContext workflowContext, TaskContext taskContext) {
411419
return Optional.ofNullable(additionalObjects.get(name))

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,8 @@ public static Optional<SchemaValidator> getSchemaValidator(
5151
return Optional.of(validatorFactory.getValidator(schema.getSchemaInline()));
5252
} else if (schema.getSchemaExternal() != null) {
5353
return Optional.of(
54-
resourceLoader.load(
55-
schema.getSchemaExternal().getResource(),
56-
validatorFactory::getValidator,
57-
null,
58-
null,
59-
null));
54+
resourceLoader.loadStatic(
55+
schema.getSchemaExternal().getResource(), validatorFactory::getValidator));
6056
}
6157
}
6258
return Optional.empty();

impl/core/src/main/java/io/serverlessworkflow/impl/additional/NamedWorkflowAdditionalObject.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.additional;
1717

18-
public interface NamedWorkflowAdditionalObject<T> extends WorkflowAdditionalObject<T> {
19-
String name();
20-
}
18+
import io.serverlessworkflow.impl.NamedObject;
19+
20+
public interface NamedWorkflowAdditionalObject<T>
21+
extends WorkflowAdditionalObject<T>, NamedObject {}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallFunctionExecutor.java

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,20 @@
1616
package io.serverlessworkflow.impl.executors;
1717

1818
import io.serverlessworkflow.api.types.CallFunction;
19+
import io.serverlessworkflow.api.types.Catalog;
1920
import io.serverlessworkflow.api.types.FunctionArguments;
2021
import io.serverlessworkflow.api.types.Task;
2122
import io.serverlessworkflow.api.types.TaskBase;
23+
import io.serverlessworkflow.api.types.Use;
24+
import io.serverlessworkflow.api.types.UseCatalogs;
25+
import io.serverlessworkflow.api.types.UseFunctions;
2226
import io.serverlessworkflow.impl.WorkflowDefinition;
2327
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2428
import io.serverlessworkflow.impl.WorkflowUtils;
2529
import io.serverlessworkflow.impl.WorkflowValueResolver;
30+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
31+
import io.serverlessworkflow.impl.resources.ResourceLoader;
32+
import java.net.URI;
2633
import java.util.Map;
2734
import java.util.Optional;
2835

@@ -35,25 +42,71 @@ public class CallFunctionExecutor implements CallableTaskBuilder<CallFunction> {
3542
public void init(
3643
CallFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
3744
String functionName = task.getCall();
45+
Use use = definition.workflow().getUse();
46+
Task function = null;
47+
if (use != null) {
48+
UseFunctions functions = use.getFunctions();
49+
if (functions != null) {
50+
function = functions.getAdditionalProperties().get(functionName);
51+
}
52+
if (function == null) {
53+
int indexOf = functionName.indexOf('@');
54+
if (indexOf > 0) {
55+
String catalogName = functionName.substring(indexOf + 1);
56+
UseCatalogs catalogs = use.getCatalogs();
57+
if (catalogs != null) {
58+
Catalog catalog = catalogs.getAdditionalProperties().get(catalogName);
59+
ResourceLoader loader = definition.resourceLoader();
60+
function =
61+
definition
62+
.resourceLoader()
63+
.loadURI(
64+
WorkflowUtils.concatURI(
65+
loader.uri(catalog.getEndpoint()),
66+
pathFromFunctionName(functionName.substring(0, indexOf))),
67+
h -> from(definition, h));
68+
}
69+
}
70+
}
71+
}
72+
if (function == null) {
73+
function =
74+
definition.resourceLoader().loadURI(URI.create(functionName), h -> from(definition, h));
75+
}
76+
executorBuilder =
77+
definition.application().taskFactory().getTaskExecutor(position, function, definition);
3878
FunctionArguments functionArgs = task.getWith();
3979
args =
4080
functionArgs != null
4181
? WorkflowUtils.buildMapResolver(
4282
definition.application(), functionArgs.getAdditionalProperties())
4383
: (w, t, m) -> Map.of();
44-
Task function = null;
45-
if (definition.workflow().getUse() != null
46-
&& definition.workflow().getUse().getFunctions() != null
47-
&& definition.workflow().getUse().getFunctions().getAdditionalProperties() != null) {
48-
function =
49-
definition.workflow().getUse().getFunctions().getAdditionalProperties().get(functionName);
50-
}
51-
if (function == null) {
52-
// TODO search in catalog
53-
throw new UnsupportedOperationException("Function Catalog not supported yet");
84+
}
85+
86+
private String pathFromFunctionName(String functionName) {
87+
int sep = functionName.indexOf(":");
88+
if (sep < 0) {
89+
throw new IllegalArgumentException(
90+
"Invalid function name "
91+
+ functionName
92+
+ ". It has to be of the format <function name>:<function version>");
5493
}
55-
executorBuilder =
56-
definition.application().taskFactory().getTaskExecutor(position, function, definition);
94+
StringBuilder sb = new StringBuilder(functionName);
95+
sb.setCharAt(sep, '/');
96+
sb.insert(0, "main/functions/");
97+
sb.append("/function.yaml");
98+
return sb.toString();
99+
}
100+
101+
private Task from(WorkflowDefinition definition, ExternalResourceHandler handler) {
102+
return definition
103+
.application()
104+
.functionReader()
105+
.map(v -> v.apply(handler))
106+
.orElseThrow(
107+
() ->
108+
new IllegalStateException(
109+
"No converter from external resource to function found. Make sure a dependency that includes an implementation of FunctionReader is included"));
57110
}
58111

59112
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/resources/DefaultResourceLoader.java

Lines changed: 16 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.resources;
1717

18-
import static io.serverlessworkflow.impl.WorkflowUtils.getURISupplier;
19-
20-
import io.serverlessworkflow.api.types.Endpoint;
21-
import io.serverlessworkflow.api.types.EndpointUri;
22-
import io.serverlessworkflow.api.types.ExternalResource;
23-
import io.serverlessworkflow.impl.TaskContext;
2418
import io.serverlessworkflow.impl.WorkflowApplication;
25-
import io.serverlessworkflow.impl.WorkflowContext;
26-
import io.serverlessworkflow.impl.WorkflowModel;
27-
import io.serverlessworkflow.impl.WorkflowValueResolver;
28-
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
2919
import java.net.MalformedURLException;
3020
import java.net.URI;
3121
import java.nio.file.Path;
@@ -35,18 +25,31 @@
3525
import java.util.concurrent.ConcurrentHashMap;
3626
import java.util.function.Function;
3727

38-
public class DefaultResourceLoader implements ResourceLoader {
28+
public class DefaultResourceLoader extends ResourceLoader {
3929

4030
private final Optional<Path> workflowPath;
41-
private final WorkflowApplication application;
4231

4332
private Map<ExternalResourceHandler, CachedResource> resourceCache = new ConcurrentHashMap<>();
4433

4534
protected DefaultResourceLoader(WorkflowApplication application, Path workflowPath) {
46-
this.application = application;
35+
super(application);
4736
this.workflowPath = Optional.ofNullable(workflowPath);
4837
}
4938

39+
@Override
40+
public <T> T loadURI(URI uri, Function<ExternalResourceHandler, T> function) {
41+
ExternalResourceHandler resourceHandler = buildFromURI(uri);
42+
return (T)
43+
resourceCache
44+
.compute(
45+
resourceHandler,
46+
(k, v) ->
47+
v == null || k.shouldReload(v.lastReload())
48+
? new CachedResource(Instant.now(), function.apply(k))
49+
: v)
50+
.content();
51+
}
52+
5053
private ExternalResourceHandler fileResource(String pathStr) {
5154
Path path = Path.of(pathStr);
5255
if (path.isAbsolute()) {
@@ -74,66 +77,6 @@ private ExternalResourceHandler buildFromURI(URI uri) {
7477
}
7578

7679
@Override
77-
public <T> T load(
78-
ExternalResource resource,
79-
Function<ExternalResourceHandler, T> function,
80-
WorkflowContext workflowContext,
81-
TaskContext taskContext,
82-
WorkflowModel model) {
83-
ExternalResourceHandler resourceHandler =
84-
buildFromURI(
85-
uriSupplier(resource.getEndpoint())
86-
.apply(
87-
workflowContext,
88-
taskContext,
89-
model == null ? application.modelFactory().fromNull() : model));
90-
return (T)
91-
resourceCache
92-
.compute(
93-
resourceHandler,
94-
(k, v) ->
95-
v == null || k.shouldReload(v.lastReload())
96-
? new CachedResource(Instant.now(), function.apply(k))
97-
: v)
98-
.content();
99-
}
100-
101-
@Override
102-
public WorkflowValueResolver<URI> uriSupplier(Endpoint endpoint) {
103-
if (endpoint.getEndpointConfiguration() != null) {
104-
EndpointUri uri = endpoint.getEndpointConfiguration().getUri();
105-
if (uri.getLiteralEndpointURI() != null) {
106-
return getURISupplier(application, uri.getLiteralEndpointURI());
107-
} else if (uri.getExpressionEndpointURI() != null) {
108-
return new ExpressionURISupplier(
109-
application
110-
.expressionFactory()
111-
.resolveString(ExpressionDescriptor.from(uri.getExpressionEndpointURI())));
112-
}
113-
} else if (endpoint.getRuntimeExpression() != null) {
114-
return new ExpressionURISupplier(
115-
application
116-
.expressionFactory()
117-
.resolveString(ExpressionDescriptor.from(endpoint.getRuntimeExpression())));
118-
} else if (endpoint.getUriTemplate() != null) {
119-
return getURISupplier(application, endpoint.getUriTemplate());
120-
}
121-
throw new IllegalArgumentException("Invalid endpoint definition " + endpoint);
122-
}
123-
124-
private class ExpressionURISupplier implements WorkflowValueResolver<URI> {
125-
private WorkflowValueResolver<String> expr;
126-
127-
public ExpressionURISupplier(WorkflowValueResolver<String> expr) {
128-
this.expr = expr;
129-
}
130-
131-
@Override
132-
public URI apply(WorkflowContext workflow, TaskContext task, WorkflowModel node) {
133-
return URI.create(expr.apply(workflow, task, node));
134-
}
135-
}
136-
13780
public void close() {
13881
resourceCache.clear();
13982
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.resources;
17+
18+
import java.io.UncheckedIOException;
19+
import java.net.MalformedURLException;
20+
import java.net.URL;
21+
22+
public class GitHubHelper {
23+
24+
private GitHubHelper() {}
25+
26+
private static final String BLOB = "blob/";
27+
28+
public static URL handleURL(URL url) {
29+
if (url.getHost().equals("github.com")) {
30+
try {
31+
String path = url.getPath();
32+
if (path.startsWith(BLOB)) {
33+
path = path.substring(BLOB.length());
34+
}
35+
return new URL(url.getProtocol(), "raw.githubusercontent.com", url.getPort(), path);
36+
} catch (MalformedURLException e) {
37+
throw new UncheckedIOException(e);
38+
}
39+
} else {
40+
return url;
41+
}
42+
}
43+
}

impl/core/src/main/java/io/serverlessworkflow/impl/resources/HttpResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class HttpResource implements ExternalResourceHandler {
2828
private URL url;
2929

3030
public HttpResource(URL url) {
31-
this.url = url;
31+
this.url = GitHubHelper.handleURL(url);
3232
}
3333

3434
@Override

0 commit comments

Comments
 (0)