diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java new file mode 100644 index 0000000000..44889f9ffa --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.mcp; + +import lombok.Data; + +@Data +public class McpRetryConfig { + // maximum number of retries, default 2, minimum 0 + private int maxRetries = 2; + + // retry interval, default 1000ms + private int interval = 1000; + + // Default value is false, indicating that only requests with network-level errors will be retried. + // If set to true, all failed requests will be retried, including network-level errors and non-2xx responses. + private boolean retryOnNonSuccess = false; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java new file mode 100644 index 0000000000..ce645513c5 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.mcp; + +import org.apache.eventmesh.common.config.connector.SinkConfig; + +import lombok.Data; +import lombok.EqualsAndHashCode; + + +@Data +@EqualsAndHashCode(callSuper = true) +public class McpSinkConfig extends SinkConfig { + + public SinkConnectorConfig connectorConfig; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java new file mode 100644 index 0000000000..320cc3761f --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.mcp; + +import org.apache.eventmesh.common.config.connector.SourceConfig; + +import lombok.Data; +import lombok.EqualsAndHashCode; + + +@Data +@EqualsAndHashCode(callSuper = true) +public class McpSourceConfig extends SourceConfig { + + public SourceConnectorConfig connectorConfig; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java new file mode 100644 index 0000000000..54a02fb6b5 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.mcp; + + +import lombok.Data; + +@Data +public class SinkConnectorConfig { + + private String connectorName; + + private String[] urls; + + // keepAlive, default true + private boolean keepAlive = true; + + // timeunit: ms, default 60000ms + private int keepAliveTimeout = 60 * 1000; // Keep units consistent + + // timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms + private int connectionTimeout = 5000; + + // timeunit: ms, default 5000ms + private int idleTimeout = 5000; + + // maximum number of HTTP/1 connections a client will pool, default 50 + private int maxConnectionPoolSize = 50; + + // retry config + private McpRetryConfig retryConfig = new McpRetryConfig(); + + private String deliveryStrategy = "ROUND_ROBIN"; + + private boolean skipDeliverException = false; + + // managed pipelining param, default true + private boolean isParallelized = true; + + private int parallelism = 2; + + + /** + * Fill default values if absent (When there are multiple default values for a field) + * + * @param config SinkConnectorConfig + */ + public static void populateFieldsWithDefaults(SinkConnectorConfig config) { + /* + * set default values for idleTimeout + * recommended scope: common(5s - 10s), webhook(15s - 30s) + */ + final int commonHttpIdleTimeout = 5000; + + // Set default values for idleTimeout + if (config.getIdleTimeout() == 0) { + config.setIdleTimeout(commonHttpIdleTimeout); + } + + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java new file mode 100644 index 0000000000..18808942e0 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.mcp; + +import java.util.HashMap; +import java.util.Map; + +import lombok.Data; + +@Data +public class SourceConnectorConfig { + + private String connectorName; + + private String path = "/"; + + private int port; + + // timeunit: ms, default 5000ms + private int idleTimeout = 5000; + + /** + * + */ + private int maxFormAttributeSize = 1024 * 1024; + + // max size of the queue, default 1000 + private int maxStorageSize = 1000; + + // batch size, default 10 + private int batchSize = 10; + + // protocol, default CloudEvent + private String protocol = "Mcp"; + + // extra config, e.g. GitHub secret + private Map extraConfig = new HashMap<>(); + + // data consistency enabled, default true + private boolean dataConsistencyEnabled = false; + + private String forwardPath; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index f2328541c4..9b1bfe6f4e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -162,6 +162,17 @@ public static T parseObject(byte[] bytes, Class clazz) { } } + public static T parseObject(String text, TypeReference typeReference) { + if (StringUtils.isEmpty(text)) { + return null; + } + try { + return OBJECT_MAPPER.readValue(text, typeReference); + } catch (JsonProcessingException e) { + throw new JsonException("deserialize json string to object error", e); + } + } + /** * parse json string to object. * diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml index 0cd7b5b5ab..5f66dd0f68 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml @@ -16,4 +16,4 @@ # sourceEnable: true -sinkEnable: false +sinkEnable: true diff --git a/eventmesh-connectors/eventmesh-connector-mcp/build.gradle b/eventmesh-connectors/eventmesh-connector-mcp/build.gradle new file mode 100644 index 0000000000..82072c2876 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/build.gradle @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + api project(":eventmesh-openconnect:eventmesh-openconnect-java") + implementation project(":eventmesh-common") + implementation project(":eventmesh-connectors:eventmesh-connector-http") + implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") + implementation "io.cloudevents:cloudevents-core" + implementation "com.google.guava:guava" + implementation "io.cloudevents:cloudevents-json-jackson" + implementation ("io.grpc:grpc-protobuf:1.68.0") { + exclude group: "com.google.protobuf", module: "protobuf-java" + } + implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0' + implementation 'io.vertx:vertx-web:4.5.8' + implementation 'io.vertx:vertx-web-client:4.5.9' + implementation 'dev.failsafe:failsafe:3.3.2' + + + testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.4' + testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.4' + testImplementation 'org.mock-server:mockserver-netty:5.15.0' + implementation 'io.netty:netty-codec-http:4.1.114.Final' + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties b/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties new file mode 100644 index 0000000000..5e98eb968e --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +pluginType=connector +pluginName=mcp \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java new file mode 100644 index 0000000000..33357b6a29 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.eventmesh.connector.mcp.config; + +import org.apache.eventmesh.common.config.connector.Config; + +import lombok.Data; +import lombok.EqualsAndHashCode; + + +@Data +@EqualsAndHashCode(callSuper = true) +public class McpServerConfig extends Config { + + private boolean sourceEnable; + + private boolean sinkEnable; +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java new file mode 100644 index 0000000000..71ea9ae752 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.server; + +import org.apache.eventmesh.connector.mcp.config.McpServerConfig; +import org.apache.eventmesh.connector.mcp.sink.McpSinkConnector; +import org.apache.eventmesh.connector.mcp.source.McpSourceConnector; +import org.apache.eventmesh.openconnect.Application; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +public class McpConnectServer { + public static void main(String[] args) throws Exception { + McpServerConfig serverConfig = ConfigUtil.parse(McpServerConfig.class, "server-config.yml"); + + if (serverConfig.isSourceEnable()) { + Application mcpSourceApp = new Application(); + mcpSourceApp.run(McpSourceConnector.class); + } + + if (serverConfig.isSinkEnable()) { + Application mcpSinkApp = new Application(); + mcpSinkApp.run(McpSinkConnector.class); + } + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java new file mode 100644 index 0000000000..3d65fb9b5d --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink; + +import org.apache.eventmesh.common.EventMeshThreadFactory; +import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.config.connector.mcp.McpSinkConfig; +import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig; +import org.apache.eventmesh.connector.mcp.sink.handler.McpSinkHandler; +import org.apache.eventmesh.connector.mcp.sink.handler.impl.CommonMcpSinkHandler; +import org.apache.eventmesh.connector.mcp.sink.handler.impl.McpSinkHandlerRetryWrapper; +import org.apache.eventmesh.openconnect.api.ConnectorCreateService; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class McpSinkConnector implements Sink, ConnectorCreateService { + + private McpSinkConfig mcpSinkConfig; + + @Getter + private McpSinkHandler sinkHandler; + + private ThreadPoolExecutor executor; + + private final AtomicBoolean isStart = new AtomicBoolean(false); + + @Override + public Class configClass() { + return McpSinkConfig.class; + } + + @Override + public Sink create() { + return new McpSinkConnector(); + } + + @Override + public void init(Config config) throws Exception { + this.mcpSinkConfig = (McpSinkConfig) config; + doInit(); + } + + @Override + public void init(ConnectorContext connectorContext) throws Exception { + SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext; + this.mcpSinkConfig = (McpSinkConfig) sinkConnectorContext.getSinkConfig(); + doInit(); + } + + @SneakyThrows + private void doInit() { + // Fill default values if absent + SinkConnectorConfig.populateFieldsWithDefaults(this.mcpSinkConfig.connectorConfig); + // Create different handlers for different configurations + McpSinkHandler nonRetryHandler; + + nonRetryHandler = new CommonMcpSinkHandler(this.mcpSinkConfig.connectorConfig); + + int maxRetries = this.mcpSinkConfig.connectorConfig.getRetryConfig().getMaxRetries(); + if (maxRetries == 0) { + // Use the original sink handler + this.sinkHandler = nonRetryHandler; + } else if (maxRetries > 0) { + // Wrap the sink handler with a retry handler + this.sinkHandler = new McpSinkHandlerRetryWrapper(this.mcpSinkConfig.connectorConfig, nonRetryHandler); + } else { + throw new IllegalArgumentException("Max retries must be greater than or equal to 0."); + } + + boolean isParallelized = this.mcpSinkConfig.connectorConfig.isParallelized(); + int parallelism = isParallelized ? this.mcpSinkConfig.connectorConfig.getParallelism() : 1; + + // Use the executor's built-in queue with a reasonable capacity + executor = new ThreadPoolExecutor( + parallelism, + parallelism, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), // Built-in queue with capacity + new EventMeshThreadFactory("mcp-sink-handler") + ); + } + + @Override + public void start() throws Exception { + this.sinkHandler.start(); + isStart.set(true); + } + + @Override + public void commit(ConnectRecord record) { + + } + + @Override + public String name() { + return this.mcpSinkConfig.connectorConfig.getConnectorName(); + } + + @Override + public void onException(ConnectRecord record) { + + } + + @Override + public void stop() throws Exception { + isStart.set(false); + + log.info("Stopping mcp sink connector, shutting down executor..."); + executor.shutdown(); + + try { + // Wait for existing tasks to complete + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + log.warn("Executor did not terminate gracefully, forcing shutdown"); + executor.shutdownNow(); + // Wait a bit more for tasks to respond to being cancelled + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + log.error("Executor did not terminate after forced shutdown"); + } + } + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for executor termination"); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + + this.sinkHandler.stop(); + } + + @Override + public void put(List sinkRecords) { + if (!isStart.get()) { + log.warn("Connector is not started, ignoring sink records"); + return; + } + + for (ConnectRecord sinkRecord : sinkRecords) { + if (Objects.isNull(sinkRecord)) { + log.warn("ConnectRecord data is null, ignore."); + continue; + } + log.info("McpSinkConnector put record: {}", sinkRecord); + + try { + // Use executor.submit() instead of custom queue + executor.submit(() -> { + try { + sinkHandler.handle(sinkRecord); + } catch (Exception e) { + log.error("Failed to handle sink record via mcp", e); + } + }); + } catch (Exception e) { + log.error("Failed to submit sink record to executor", e); + } + } + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java new file mode 100644 index 0000000000..451fc3523d --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.data; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Single MCP attempt event + */ +public class McpAttemptEvent { + + public static final String PREFIX = "mcp-attempt-event-"; + + private final int maxAttempts; + + private final AtomicInteger attempts; + + private Throwable lastException; + + + public McpAttemptEvent(int maxAttempts) { + this.maxAttempts = maxAttempts; + this.attempts = new AtomicInteger(0); + } + + /** + * Increment the attempts + */ + public void incrementAttempts() { + attempts.incrementAndGet(); + } + + /** + * Update the event, incrementing the attempts and setting the last exception + * + * @param exception the exception to update, can be null + */ + public void updateEvent(Throwable exception) { + // increment the attempts + incrementAttempts(); + + // update the last exception + lastException = exception; + } + + /** + * Check if the attempts are less than the maximum attempts + * + * @return true if the attempts are less than the maximum attempts, false otherwise + */ + public boolean canAttempt() { + return attempts.get() < maxAttempts; + } + + public boolean isComplete() { + if (attempts.get() == 0) { + // No start yet + return false; + } + + // If no attempt can be made or the last exception is null, the event completed + return !canAttempt() || lastException == null; + } + + + public int getMaxAttempts() { + return maxAttempts; + } + + public int getAttempts() { + return attempts.get(); + } + + public Throwable getLastException() { + return lastException; + } + + /** + * Get the limited exception message with the default limit of 256 + * + * @return the limited exception message + */ + public String getLimitedExceptionMessage() { + return getLimitedExceptionMessage(256); + } + + /** + * Get the limited exception message with the specified limit + * + * @param maxLimit the maximum limit of the exception message + * @return the limited exception message + */ + public String getLimitedExceptionMessage(int maxLimit) { + if (lastException == null) { + return ""; + } + String message = lastException.getMessage(); + if (message == null) { + return ""; + } + if (message.length() > maxLimit) { + return message.substring(0, maxLimit); + } + return message; + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java new file mode 100644 index 0000000000..26f9c1e6fb --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.data; + +import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.KeyValue; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import lombok.Builder; +import lombok.Getter; + +/** + * a special ConnectRecord for McpSinkConnector + */ +@Getter +@Builder +public class McpConnectRecord implements Serializable { + + private static final long serialVersionUID = 5271462532332251473L; + + /** + * The unique identifier for the McpConnectRecord + */ + private final String mcpRecordId = UUID.randomUUID().toString(); + + /** + * The time when the McpConnectRecord was created + */ + private LocalDateTime createTime; + + /** + * The type of the McpConnectRecord + */ + private String type; + + /** + * The event id of the McpConnectRecord + */ + private String eventId; + + private Object data; + + private KeyValue extensions; + + @Override + public String toString() { + return "McpConnectRecord{" + + "createTime=" + createTime + + ", mcpRecordId='" + mcpRecordId + + ", type='" + type + + ", eventId='" + eventId + + ", data=" + data + + ", extensions=" + extensions + + '}'; + } + + /** + * Convert ConnectRecord to McpConnectRecord + * + * @param record the ConnectRecord to convert + * @return the converted McpConnectRecord + */ + public static McpConnectRecord convertConnectRecord(ConnectRecord record, String type) { + Map offsetMap = new HashMap<>(); + if (record != null && record.getPosition() != null && record.getPosition().getRecordOffset() != null) { + if (HttpRecordOffset.class.equals(record.getPosition().getRecordOffsetClazz())) { + offsetMap = ((HttpRecordOffset) record.getPosition().getRecordOffset()).getOffsetMap(); + } + } + String offset = "0"; + if (!offsetMap.isEmpty()) { + offset = offsetMap.values().iterator().next().toString(); + } + if (record.getData() instanceof byte[]) { + String data = Base64.getEncoder().encodeToString((byte[]) record.getData()); + record.addExtension("isBase64", true); + return McpConnectRecord.builder() + .type(type) + .createTime(LocalDateTime.now()) + .eventId(type + "-" + offset) + .data(data) + .extensions(record.getExtensions()) + .build(); + } else { + record.addExtension("isBase64", false); + return McpConnectRecord.builder() + .type(type) + .createTime(LocalDateTime.now()) + .eventId(type + "-" + offset) + .data(record.getData()) + .extensions(record.getExtensions()) + .build(); + } + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java new file mode 100644 index 0000000000..72595b2637 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.data; + +import java.io.Serializable; +import java.time.LocalDateTime; + +import lombok.Builder; +import lombok.Data; + +/** + * Metadata for an MCP export operation. + */ +@Data +@Builder +public class McpExportMetadata implements Serializable { + + private static final long serialVersionUID = 1121010466793041920L; + + private String url; + + private int code; + + private String message; + + private LocalDateTime receivedTime; + + private String recordId; + + private String retriedBy; + + private int retryNum; +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java new file mode 100644 index 0000000000..c9a35c193b --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.data; + +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Represents an MCP export record containing metadata and data to be exported. + */ +@Data +@AllArgsConstructor +public class McpExportRecord implements Serializable { + + private static final long serialVersionUID = 6010283911452947157L; + + private McpExportMetadata metadata; + + private Object data; +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java new file mode 100644 index 0000000000..3e0e615523 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.data; + +import java.io.Serializable; +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Represents a page of MCP export records. + */ +@Data +@AllArgsConstructor +public class McpExportRecordPage implements Serializable { + + private static final long serialVersionUID = 1143791658357035990L; + + private int pageNum; + + private int pageSize; + + private List pageItems; + +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java new file mode 100644 index 0000000000..f24d0e3fd1 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.data; + +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Multi Mcp request context + */ +public class MultiMcpRequestContext { + + public static final String NAME = "multi-http-request-context"; + + /** + * The remaining requests to be processed. + */ + private final AtomicInteger remainingRequests; + + /** + * The last failed event. + * If retries occur but still fail, it will be logged, and only the last one will be retained. + */ + private McpAttemptEvent lastFailedEvent; + + public MultiMcpRequestContext(int remainingEvents) { + this.remainingRequests = new AtomicInteger(remainingEvents); + } + + /** + * Decrement the remaining requests by 1. + */ + public void decrementRemainingRequests() { + remainingRequests.decrementAndGet(); + } + + /** + * Check if all requests have been processed. + * + * @return true if all requests have been processed, false otherwise. + */ + public boolean isAllRequestsProcessed() { + return remainingRequests.get() == 0; + } + + public int getRemainingRequests() { + return remainingRequests.get(); + } + + public McpAttemptEvent getLastFailedEvent() { + return lastFailedEvent; + } + + public void setLastFailedEvent(McpAttemptEvent lastFailedEvent) { + this.lastFailedEvent = lastFailedEvent; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java new file mode 100644 index 0000000000..5c7435d037 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.handler; + +import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig; +import org.apache.eventmesh.connector.mcp.sink.data.McpAttemptEvent; +import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord; +import org.apache.eventmesh.connector.mcp.sink.data.MultiMcpRequestContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import lombok.Getter; + +public abstract class AbstractMcpSinkHandler implements McpSinkHandler { + @Getter + private final SinkConnectorConfig sinkConnectorConfig; + + @Getter + private final List urls; + + private final McpDeliveryStrategy deliveryStrategy; + + private int roundRobinIndex = 0; + + protected AbstractMcpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + this.sinkConnectorConfig = sinkConnectorConfig; + this.deliveryStrategy = McpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy()); + // Initialize URLs + String[] urlStrings = sinkConnectorConfig.getUrls(); + this.urls = Arrays.stream(urlStrings) + .map(URI::create) + .collect(Collectors.toList()); + } + + /** + * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. + * + * @param record the ConnectRecord to process + */ + @Override + public void handle(ConnectRecord record) { + // build attributes + Map attributes = new ConcurrentHashMap<>(); + + switch (deliveryStrategy) { + case ROUND_ROBIN: + attributes.put(MultiMcpRequestContext.NAME, new MultiMcpRequestContext(1)); + URI url = urls.get(roundRobinIndex); + roundRobinIndex = (roundRobinIndex + 1) % urls.size(); + sendRecordToUrl(record, attributes, url); + break; + case BROADCAST: + attributes.put(MultiMcpRequestContext.NAME, new MultiMcpRequestContext(urls.size())); + // send the record to all URLs + urls.forEach(url0 -> sendRecordToUrl(record, attributes, url0)); + break; + default: + throw new IllegalArgumentException("Unknown delivery strategy: " + deliveryStrategy); + } + } + + private void sendRecordToUrl(ConnectRecord record, Map attributes, URI url) { + // convert ConnectRecord to HttpConnectRecord + String type = String.format("%s.%s.%s", + this.sinkConnectorConfig.getConnectorName(), url.getScheme(), + "common"); + McpConnectRecord mcpConnectRecord = McpConnectRecord.convertConnectRecord(record, type); + + // add AttemptEvent to the attributes + McpAttemptEvent attemptEvent = new McpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1); + attributes.put(McpAttemptEvent.PREFIX + mcpConnectRecord.getMcpRecordId(), attemptEvent); + + // deliver the record + deliver(url, mcpConnectRecord, attributes, record); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java new file mode 100644 index 0000000000..07cbbe3d46 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.handler; + +public enum McpDeliveryStrategy { + ROUND_ROBIN, + BROADCAST +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java new file mode 100644 index 0000000000..c10d96337b --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.handler; + +import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.net.URI; +import java.util.Map; + +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; + +/** + * Interface for handling ConnectRecords via HTTP or HTTPS. Classes implementing this interface are responsible for processing ConnectRecords by + * sending them over HTTP or HTTPS, with additional support for handling multiple requests and asynchronous processing. + * + *

Any class that needs to process ConnectRecords via HTTP or HTTPS should implement this interface. + * Implementing classes must provide implementations for the {@link #start()}, {@link #handle(ConnectRecord)}, + * {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)}, and {@link #stop()} methods.

+ * + *

Implementing classes should ensure thread safety and handle MCP communication efficiently. + * The {@link #start()} method initializes any necessary resources for MCP communication. The {@link #handle(ConnectRecord)} method processes a + * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)} method processes HttpConnectRecord + * on specified URL while returning its own processing logic {@link #stop()} method releases any resources used for MCP communication.

+ * + *

It's recommended to handle exceptions gracefully within the {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)} method + * to prevent message loss or processing interruptions.

+ */ +public interface McpSinkHandler { + + /** + * Initializes the MCP handler. This method should be called before using the handler. + */ + void start(); + + /** + * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. + * + * @param record the ConnectRecord to process + */ + void handle(ConnectRecord record); + + + /** + * Processes HttpConnectRecord on specified URL while returning its own processing logic + * + * @param url URI to which the HttpConnectRecord should be sent + * @param mcpConnectRecord HttpConnectRecord to process + * @param attributes additional attributes to be used in processing + * @return processing chain + */ + Future> deliver(URI url, McpConnectRecord mcpConnectRecord, Map attributes, ConnectRecord connectRecord); + + /** + * Cleans up and releases resources used by the MCP handler. This method should be called when the handler is no longer needed. + */ + void stop(); +} + diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java new file mode 100644 index 0000000000..1d884f4a19 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.handler.impl; + +import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.http.util.HttpUtils; +import org.apache.eventmesh.connector.mcp.sink.data.McpAttemptEvent; +import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord; +import org.apache.eventmesh.connector.mcp.sink.data.MultiMcpRequestContext; +import org.apache.eventmesh.connector.mcp.sink.handler.AbstractMcpSinkHandler; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.net.URI; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * Common MCP Sink Handler implementation to handle ConnectRecords by sending them over MCP to configured URLs. + * + *

This handler initializes a WebClient for making HTTP requests based on the provided SinkConnectorConfig. + * It handles processing ConnectRecords by converting them to HttpConnectRecord and sending them asynchronously to each configured URL using the + * WebClient.

+ * + *

The handler uses Vert.x's WebClient to perform HTTP/HTTPS requests. It initializes the WebClient in the {@link #start()} + * method and closes it in the {@link #stop()} method to manage resources efficiently.

+ * + *

Each ConnectRecord is processed and sent to all configured URLs concurrently using asynchronous HTTP requests.

+ */ +@Slf4j +@Getter +public class CommonMcpSinkHandler extends AbstractMcpSinkHandler { + + private WebClient webClient; + + + public CommonMcpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + super(sinkConnectorConfig); + } + + /** + * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig. + */ + @Override + public void start() { + // Create WebClient + doInitWebClient(); + } + + /** + * Initializes the WebClient with the provided configuration options. + */ + private void doInitWebClient() { + SinkConnectorConfig sinkConnectorConfig = getSinkConnectorConfig(); + final Vertx vertx = Vertx.vertx(); + WebClientOptions options = new WebClientOptions() + .setKeepAlive(sinkConnectorConfig.isKeepAlive()) + .setKeepAliveTimeout(sinkConnectorConfig.getKeepAliveTimeout() / 1000) + .setIdleTimeout(sinkConnectorConfig.getIdleTimeout()) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) + .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout()) + .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize()) + .setPipelining(sinkConnectorConfig.isParallelized()); + this.webClient = WebClient.create(vertx, options); + } + + /** + * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified + * URL using the WebClient. + * + * @param url URI to which the HttpConnectRecord should be sent + * @param mcpConnectRecord HttpConnectRecord to process + * @param attributes additional attributes to be used in processing + * @return processing chain + */ + @Override + public Future> deliver(URI url, McpConnectRecord mcpConnectRecord, Map attributes, + ConnectRecord connectRecord) { + // create headers + Map extensionMap = new HashMap<>(); + Set extensionKeySet = mcpConnectRecord.getExtensions().keySet(); + for (String extensionKey : extensionKeySet) { + Object v = mcpConnectRecord.getExtensions().getObject(extensionKey); + extensionMap.put(extensionKey, v); + } + + MultiMap headers = HttpHeaders.headers() + .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8") + .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8") + .set("extension", JsonUtils.toJSONString(extensionMap)); + // get timestamp and offset + Long timestamp = mcpConnectRecord.getCreateTime() + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); + + // send the request + return this.webClient.post(url.getPath()) + .host(url.getHost()) + .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort()) + .putHeaders(headers) + .ssl(Objects.equals(url.getScheme(), "https")) + .sendJson(mcpConnectRecord.getData()) + .onSuccess(res -> { + log.info("Request sent successfully. Record: timestamp={}", timestamp); + + Exception e = null; + + // log the response + if (HttpUtils.is2xxSuccessful(res.statusCode())) { + if (log.isDebugEnabled()) { + log.debug("Received successful response: statusCode={}. Record: timestamp={}, responseBody={}", + res.statusCode(), timestamp, res.bodyAsString()); + } else { + log.info("Received successful response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp); + } + } else { + if (log.isDebugEnabled()) { + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, responseBody={}", + res.statusCode(), timestamp, res.bodyAsString()); + } else { + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp); + } + + e = new RuntimeException("Unexpected HTTP response code: " + res.statusCode()); + } + + // try callback + tryCallback(mcpConnectRecord, e, attributes, connectRecord); + }).onFailure(err -> { + log.error("Request failed to send. Record: timestamp={}", timestamp, err); + + // try callback + tryCallback(mcpConnectRecord, err, attributes, connectRecord); + }); + } + + /** + * Tries to call the callback based on the result of the request. + * + * @param mcpConnectRecord the McpConnectRecord to use + * @param e the exception thrown during the request, may be null + * @param attributes additional attributes to be used in processing + */ + private void tryCallback(McpConnectRecord mcpConnectRecord, Throwable e, Map attributes, ConnectRecord record) { + // get and update the attempt event + McpAttemptEvent attemptEvent = (McpAttemptEvent) attributes.get(McpAttemptEvent.PREFIX + mcpConnectRecord.getMcpRecordId()); + attemptEvent.updateEvent(e); + + // get and update the multiHttpRequestContext + MultiMcpRequestContext multiMcpRequestContext = getAndUpdateMultiMcpRequestContext(attributes, attemptEvent); + + if (multiMcpRequestContext.isAllRequestsProcessed()) { + // do callback + if (record.getCallback() == null) { + if (log.isDebugEnabled()) { + log.warn("ConnectRecord callback is null. Ignoring callback. {}", record); + } else { + log.warn("ConnectRecord callback is null. Ignoring callback."); + } + return; + } + + // get the last failed event + McpAttemptEvent lastFailedEvent = multiMcpRequestContext.getLastFailedEvent(); + if (lastFailedEvent == null) { + // success + record.getCallback().onSuccess(convertToSendResult(record)); + } else { + // failure + record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException())); + } + } else { + log.warn("still have requests to process, size {}|attempt num {}", + multiMcpRequestContext.getRemainingRequests(), attemptEvent.getAttempts()); + } + } + + + /** + * Gets and updates the multi mcp request context based on the provided attributes and HttpConnectRecord. + * + * @param attributes the attributes to use + * @param attemptEvent the McpAttemptEvent to use + * @return the updated multi mcp request context + */ + private MultiMcpRequestContext getAndUpdateMultiMcpRequestContext(Map attributes, McpAttemptEvent attemptEvent) { + // get the multi http request context + MultiMcpRequestContext multiMcpRequestContext = (MultiMcpRequestContext) attributes.get(MultiMcpRequestContext.NAME); + + // Check if the current attempted event has completed + if (attemptEvent.isComplete()) { + // decrement the counter + multiMcpRequestContext.decrementRemainingRequests(); + + if (attemptEvent.getLastException() != null) { + // if all attempts are exhausted, set the last failed event + multiMcpRequestContext.setLastFailedEvent(attemptEvent); + } + } + + return multiMcpRequestContext; + } + + private SendResult convertToSendResult(ConnectRecord record) { + SendResult result = new SendResult(); + result.setMessageId(record.getRecordId()); + if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { + result.setTopic(record.getExtension("topic")); + } + return result; + } + + private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) { + SendExceptionContext sendExceptionContext = new SendExceptionContext(); + sendExceptionContext.setMessageId(record.getRecordId()); + sendExceptionContext.setCause(e); + if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { + sendExceptionContext.setTopic(record.getExtension("topic")); + } + return sendExceptionContext; + } + + + /** + * Cleans up and releases resources used by the MCP handler. + */ + @Override + public void stop() { + if (this.webClient != null) { + this.webClient.close(); + } else { + log.warn("WebClient is null, ignore."); + } + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java new file mode 100644 index 0000000000..c2e990857e --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.sink.handler.impl; + +import org.apache.eventmesh.common.config.connector.mcp.McpRetryConfig; +import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.util.HttpUtils; +import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord; +import org.apache.eventmesh.connector.mcp.sink.handler.AbstractMcpSinkHandler; +import org.apache.eventmesh.connector.mcp.sink.handler.McpSinkHandler; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.net.ConnectException; +import java.net.URI; +import java.time.Duration; +import java.util.Map; + +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; + +import lombok.extern.slf4j.Slf4j; + +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; + +/** + * McpSinkHandlerRetryWrapper is a wrapper class for the McpSinkHandler that provides retry functionality for failed Mcp requests. + */ +@Slf4j +public class McpSinkHandlerRetryWrapper extends AbstractMcpSinkHandler { + + private final McpRetryConfig mcpRetryConfig; + + private final McpSinkHandler sinkHandler; + + private final RetryPolicy> retryPolicy; + + public McpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, McpSinkHandler sinkHandler) { + super(sinkConnectorConfig); + this.sinkHandler = sinkHandler; + this.mcpRetryConfig = getSinkConnectorConfig().getRetryConfig(); + this.retryPolicy = buildRetryPolicy(); + } + + private RetryPolicy> buildRetryPolicy() { + return RetryPolicy.>builder() + .handleIf(e -> e instanceof ConnectException) + .handleResultIf(response -> mcpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode())) + .withMaxRetries(mcpRetryConfig.getMaxRetries()) + .withDelay(Duration.ofMillis(mcpRetryConfig.getInterval())) + .onRetry(event -> { + if (log.isDebugEnabled()) { + log.warn("Failed to deliver message after {} attempts. Retrying in {} ms. Error: {}", + event.getAttemptCount(), mcpRetryConfig.getInterval(), event.getLastException()); + } else { + log.warn("Failed to deliver message after {} attempts. Retrying in {} ms.", + event.getAttemptCount(), mcpRetryConfig.getInterval()); + } + }).onFailure(event -> { + if (log.isDebugEnabled()) { + log.error("Failed to deliver message after {} attempts. Error: {}", + event.getAttemptCount(), event.getException()); + } else { + log.error("Failed to deliver message after {} attempts.", + event.getAttemptCount()); + } + }).build(); + } + + /** + * Initializes the WebClient for making Mcp requests based on the provided SinkConnectorConfig. + */ + @Override + public void start() { + sinkHandler.start(); + } + + + /** + * Processes McpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the + * McpConnectRecord + * + * @param url URI to which the McpConnectRecord should be sent + * @param mcpConnectRecord McpConnectRecord to process + * @param attributes additional attributes to pass to the processing chain + * @return processing chain + */ + @Override + public Future> deliver(URI url, McpConnectRecord mcpConnectRecord, Map attributes, + ConnectRecord connectRecord) { + Failsafe.with(retryPolicy) + .getStageAsync(() -> sinkHandler.deliver(url, mcpConnectRecord, attributes, connectRecord).toCompletionStage()); + return null; + } + + + /** + * Cleans up and releases resources used by the Mcp handler. + */ + @Override + public void stop() { + sinkHandler.stop(); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java new file mode 100644 index 0000000000..e5fe258124 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java @@ -0,0 +1,659 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.source; + +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CACHE_CONTROL_NO_CACHE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONNECTION_KEEP_ALIVE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_JSON; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_JSON_PLAIN; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_SSE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOWED_HEADERS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOWED_METHODS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOW_ALL; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_EXPOSED_HEADERS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_CONNECTOR_NAME; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_IDLE_TIMEOUT_MS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_NO_MESSAGE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_PROTOCOL_VERSION; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_SERVER_NAME; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_SERVER_VERSION; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ENDPOINT_HEALTH; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_INTERNAL; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_INVALID_PARAMS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_METHOD_NOT_FOUND; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_ACCEPT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CACHE_CONTROL; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CONNECTION; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CONTENT_TYPE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_HEADERS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_METHODS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_ORIGIN; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_EXPOSE_HEADERS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_X_ACCEL_BUFFERING; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_METHOD_OPTIONS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_STATUS_INTERNAL_ERROR; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_STATUS_NO_CONTENT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CAPABILITIES; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CONNECTOR; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CONTENT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_DESCRIPTION; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR_CODE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR_MESSAGE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ID; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_JSONRPC; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_METHOD; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_NAME; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PARAMS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PROPERTIES; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PROTOCOL_VERSION; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_REQUIRED; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_RESULT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_SERVER_INFO; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_STATUS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TEXT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TOOLS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TYPE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_VERSION; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.MAX_POLL_WAIT_TIME_MS; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_INITIALIZE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_NOTIFICATIONS_INITIALIZED; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_PING; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_TOOLS_CALL; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_TOOLS_LIST; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_MESSAGE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_MESSAGE_CONTENT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_TOPIC; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_MESSAGE; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_TOPIC; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_DATA_OPEN; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_EVENT_OPEN; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_HEARTBEAT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_JSONRPC_VERSION; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_STATUS_UP; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_OBJECT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_STRING; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_TEXT; +import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.X_ACCEL_BUFFERING_NO; + +import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.config.connector.mcp.McpSourceConfig; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.connector.mcp.source.protocol.Protocol; +import org.apache.eventmesh.connector.mcp.source.protocol.ProtocolFactory; +import org.apache.eventmesh.openconnect.api.ConnectorCreateService; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; +import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.handler.BodyHandler; +import io.vertx.ext.web.handler.LoggerHandler; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * MCP Source Connector for EventMesh Implements MCP protocol server allowing AI clients to interact with EventMesh via MCP protocol + */ +@Slf4j +public class McpSourceConnector implements Source, ConnectorCreateService { + + private McpSourceConfig sourceConfig; + + private BlockingQueue queue; + + private int batchSize; + + private String forwardPath; + + private Route route; + + private Protocol protocol; + + private HttpServer server; + + private Vertx vertx; + + private WebClient webClient; + + private McpToolRegistry toolRegistry; + + @Getter + private volatile boolean started = false; + + @Getter + private volatile boolean destroyed = false; + + @Override + public Class configClass() { + return McpSourceConfig.class; + } + + @Override + public Source create() { + return new McpSourceConnector(); + } + + @Override + public void init(Config config) { + this.sourceConfig = (McpSourceConfig) config; + doInit(); + } + + @Override + public void init(ConnectorContext connectorContext) { + SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; + this.sourceConfig = (McpSourceConfig) sourceConnectorContext.getSourceConfig(); + doInit(); + } + + /** + * Initialize the connector + */ + private void doInit() { + log.info("Initializing MCP Source Connector..."); + + // Initialize queue + int maxQueueSize = this.sourceConfig.getConnectorConfig().getMaxStorageSize(); + this.queue = new LinkedBlockingQueue<>(maxQueueSize); + + // Initialize batch size + this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize(); + + String protocolName = this.sourceConfig.getConnectorConfig().getProtocol(); + this.protocol = ProtocolFactory.getInstance(this.sourceConfig.connectorConfig, protocolName); + + // Initialize tool registry + this.toolRegistry = new McpToolRegistry(); + registerDefaultTools(); + + // Initialize Vertx and router + this.vertx = Vertx.vertx(); + final Router router = Router.router(vertx); + this.webClient = WebClient.create(vertx); + + final String basePath = this.sourceConfig.connectorConfig.getPath(); + this.forwardPath = this.sourceConfig.connectorConfig.getForwardPath(); + + // Configure CORS (must be before all routes) + router.route().handler(ctx -> { + ctx.response() + .putHeader(HEADER_CORS_ALLOW_ORIGIN, CORS_ALLOW_ALL) + .putHeader(HEADER_CORS_ALLOW_METHODS, CORS_ALLOWED_METHODS) + .putHeader(HEADER_CORS_ALLOW_HEADERS, CORS_ALLOWED_HEADERS) + .putHeader(HEADER_CORS_EXPOSE_HEADERS, CORS_EXPOSED_HEADERS); + + if (HTTP_METHOD_OPTIONS.equals(ctx.request().method().name())) { + ctx.response().setStatusCode(HTTP_STATUS_NO_CONTENT).end(); + } else { + ctx.next(); + } + }); + + // Body handler + router.route().handler(BodyHandler.create()); + + // Main endpoint - handles both JSON-RPC and SSE requests + router.post(basePath) + .handler(LoggerHandler.create()) + .handler(ctx -> { + String contentType = ctx.request().getHeader(HEADER_CONTENT_TYPE); + String accept = ctx.request().getHeader(HEADER_ACCEPT); + + // Determine if it's an SSE request or JSON-RPC request + if (CONTENT_TYPE_SSE.startsWith(accept != null ? accept : "")) { + handleSseRequest(ctx); + } else { + handleJsonRpcRequest(ctx); + } + }); + + // GET request for SSE support + router.get(basePath) + .handler(this::handleSseRequest); + + // Health check endpoint + router.get(basePath + ENDPOINT_HEALTH).handler(ctx -> { + JsonObject health = new JsonObject() + .put(KEY_STATUS, VALUE_STATUS_UP) + .put(KEY_CONNECTOR, DEFAULT_CONNECTOR_NAME) + .put(KEY_TOOLS, toolRegistry.getToolCount()); + ctx.response() + .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON) + .end(health.encode()); + }); + + Route forwardRoute = router.route().path(forwardPath).handler(LoggerHandler.create()); + + this.route = router.route() + .path(this.sourceConfig.connectorConfig.getPath()) + .handler(LoggerHandler.create()); + + // set protocol handler + this.protocol.setHandler(route, queue); + this.protocol.setHandler(forwardRoute, queue); + + // Create server + this.server = vertx.createHttpServer(new HttpServerOptions() + .setPort(this.sourceConfig.connectorConfig.getPort()) + .setHandle100ContinueAutomatically(true) + .setIdleTimeout(DEFAULT_IDLE_TIMEOUT_MS) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)) + .requestHandler(router); + + log.info("MCP Source Connector initialized on http://127.0.0.1:{}{}", + this.sourceConfig.connectorConfig.getPort(), basePath); + } + + /** + * Register default MCP tools + */ + private void registerDefaultTools() { + // Echo tool + toolRegistry.registerTool( + "echo", + "Echo back the input message", + createEchoSchema(), + args -> { + String message = args.getString(PARAM_MESSAGE, DEFAULT_NO_MESSAGE); + return createTextContent("Echo: " + message); + } + ); + + // EventMesh message sending tool + toolRegistry.registerTool( + "sendEventMeshMessage", + "Send a message to EventMesh", + createSendMessageSchema(), + args -> { + String topic = args.getString(PARAM_TOPIC); + Object message = args.getString(PARAM_MESSAGE); + + webClient.post(this.sourceConfig.connectorConfig.getPort(), "127.0.0.1", this.forwardPath) + .putHeader(CORS_EXPOSED_HEADERS, CONTENT_TYPE_JSON_PLAIN) + .sendBuffer(Buffer.buffer( + new JsonObject() + .put("type", "mcp.tools.call") + .put("tool", "sendEventMeshMessage") + .put("arguments", new JsonObject().put("message", message).put("topic", topic)) + .encode() + ), ar -> { + if (ar.succeeded()) { + log.info("forwarded tools/call to {} OK, status={}", forwardPath, ar.result().statusCode()); + } else { + log.warn("forward tools/call failed: {}", ar.cause().toString()); + } + }); + + return createTextContent( + String.format("Message sent to topic '%s': %s", topic, message) + ); + } + ); + + log.info("Registered {} MCP tools", toolRegistry.getToolCount()); + } + + /** + * Handle JSON-RPC request (HTTP mode) + * + * @param ctx Routing context + */ + private void handleJsonRpcRequest(RoutingContext ctx) { + String body = ctx.body().asString(); + + try { + JsonObject request = new JsonObject(body); + JsonObject response = handleMcpRequest(request); + + if (response != null) { + ctx.response() + .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON) + .end(response.encode()); + } else { + // Notification messages don't need response + ctx.response().setStatusCode(HTTP_STATUS_NO_CONTENT).end(); + } + + } catch (Exception e) { + JsonObject error = createErrorResponse(null, ERROR_INTERNAL, + "Internal error: " + e.getMessage()); + ctx.response() + .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON) + .setStatusCode(HTTP_STATUS_INTERNAL_ERROR) + .end(error.encode()); + } + } + + /** + * Handle SSE request (Server-Sent Events mode) + * + * @param ctx Routing context + */ + private void handleSseRequest(RoutingContext ctx) { + ctx.response() + .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_SSE) + .putHeader(HEADER_CACHE_CONTROL, CACHE_CONTROL_NO_CACHE) + .putHeader(HEADER_CONNECTION, CONNECTION_KEEP_ALIVE) + .putHeader(HEADER_X_ACCEL_BUFFERING, X_ACCEL_BUFFERING_NO) + .setChunked(true); + + // Send connection established event + ctx.response().write(SSE_EVENT_OPEN); + ctx.response().write(SSE_DATA_OPEN); + + // Heartbeat (optional) + long timerId = vertx.setPeriodic(DEFAULT_HEARTBEAT_INTERVAL_MS, id -> { + if (!ctx.response().closed()) { + ctx.response().write(SSE_HEARTBEAT); + } else { + vertx.cancelTimer(id); + } + }); + + ctx.request().connection().closeHandler(v -> { + vertx.cancelTimer(timerId); + }); + } + + /** + * Handle MCP JSON-RPC request + * + * @param request JSON-RPC request object + * @return JSON-RPC response object, or null for notifications + */ + private JsonObject handleMcpRequest(JsonObject request) { + String method = request.getString(KEY_METHOD, ""); + Object id = request.getValue(KEY_ID); + JsonObject params = request.getJsonObject(KEY_PARAMS); + + switch (method) { + case METHOD_INITIALIZE: + return handleInitialize(id, params); + case METHOD_NOTIFICATIONS_INITIALIZED: + return null; // Notifications don't need response + case METHOD_TOOLS_LIST: + return handleToolsList(id); + case METHOD_TOOLS_CALL: + return handleToolsCall(id, params); + case METHOD_PING: + return createSuccessResponse(id, new JsonObject()); + default: + return createErrorResponse(id, ERROR_METHOD_NOT_FOUND, + "Method not found: " + method); + } + } + + /** + * Handle initialize method + * + * @param id Request ID + * @param params Request parameters + * @return Initialize response + */ + private JsonObject handleInitialize(Object id, JsonObject params) { + String clientVersion = params != null + ? params.getString(KEY_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION) + : DEFAULT_PROTOCOL_VERSION; + + JsonObject result = new JsonObject() + .put(KEY_PROTOCOL_VERSION, clientVersion) + .put(KEY_SERVER_INFO, new JsonObject() + .put(KEY_NAME, DEFAULT_SERVER_NAME) + .put(KEY_VERSION, DEFAULT_SERVER_VERSION)) + .put(KEY_CAPABILITIES, new JsonObject() + .put(KEY_TOOLS, new JsonObject())); + + return createSuccessResponse(id, result); + } + + /** + * Handle tools/list method + * + * @param id Request ID + * @return Tools list response + */ + private JsonObject handleToolsList(Object id) { + JsonArray tools = toolRegistry.getToolsArray(); + JsonObject result = new JsonObject().put(KEY_TOOLS, tools); + return createSuccessResponse(id, result); + } + + /** + * Handle tools/call method + * + * @param id Request ID + * @param params Tool call parameters + * @return Tool execution result + */ + private JsonObject handleToolsCall(Object id, JsonObject params) { + if (params == null) { + return createErrorResponse(id, ERROR_INVALID_PARAMS, "Invalid params"); + } + + String toolName = params.getString(KEY_NAME); + JsonObject arguments = params.getJsonObject("arguments", new JsonObject()); + + log.info("Calling tool: {} with arguments: {}", toolName, arguments); + + try { + JsonObject content = toolRegistry.executeTool(toolName, arguments); + JsonObject result = new JsonObject() + .put(KEY_CONTENT, new JsonArray().add(content)); + + return createSuccessResponse(id, result); + + } catch (IllegalArgumentException e) { + return createErrorResponse(id, ERROR_INVALID_PARAMS, e.getMessage()); + } catch (Exception e) { + log.error("Tool execution error", e); + return createErrorResponse(id, ERROR_INTERNAL, + "Tool execution failed: " + e.getMessage()); + } + } + + // ========== JSON-RPC Response Builders ========== + + /** + * Create a success response + * + * @param id Request ID + * @param result Result object + * @return JSON-RPC success response + */ + private JsonObject createSuccessResponse(Object id, JsonObject result) { + return new JsonObject() + .put(KEY_JSONRPC, VALUE_JSONRPC_VERSION) + .put(KEY_ID, id) + .put(KEY_RESULT, result); + } + + /** + * Create an error response + * + * @param id Request ID + * @param code Error code + * @param message Error message + * @return JSON-RPC error response + */ + private JsonObject createErrorResponse(Object id, int code, String message) { + return new JsonObject() + .put(KEY_JSONRPC, VALUE_JSONRPC_VERSION) + .put(KEY_ID, id) + .put(KEY_ERROR, new JsonObject() + .put(KEY_ERROR_CODE, code) + .put(KEY_ERROR_MESSAGE, message)); + } + + // ========== Schema Creation Helpers ========== + + /** + * Create JSON schema for echo tool + * + * @return Echo tool input schema + */ + private JsonObject createEchoSchema() { + return new JsonObject() + .put(KEY_TYPE, VALUE_TYPE_OBJECT) + .put(KEY_PROPERTIES, new JsonObject() + .put(PARAM_MESSAGE, new JsonObject() + .put(KEY_TYPE, VALUE_TYPE_STRING) + .put(KEY_DESCRIPTION, PARAM_DESC_MESSAGE))) + .put(KEY_REQUIRED, new JsonArray().add(PARAM_MESSAGE)); + } + + /** + * Create JSON schema for send message tool + * + * @return Send message tool input schema + */ + private JsonObject createSendMessageSchema() { + return new JsonObject() + .put(KEY_TYPE, VALUE_TYPE_OBJECT) + .put(KEY_PROPERTIES, new JsonObject() + .put(PARAM_TOPIC, new JsonObject() + .put(KEY_TYPE, VALUE_TYPE_STRING) + .put(KEY_DESCRIPTION, PARAM_DESC_TOPIC)) + .put(PARAM_MESSAGE, new JsonObject() + .put(KEY_TYPE, VALUE_TYPE_STRING) + .put(KEY_DESCRIPTION, PARAM_DESC_MESSAGE_CONTENT))) + .put(KEY_REQUIRED, new JsonArray().add(PARAM_TOPIC).add(PARAM_MESSAGE)); + } + + /** + * Create text content object + * + * @param text Text content + * @return MCP text content object + */ + private JsonObject createTextContent(String text) { + return new JsonObject() + .put(KEY_TYPE, VALUE_TYPE_TEXT) + .put(KEY_TEXT, text); + } + + // ========== Source Interface Implementation ========== + + @Override + public void start() { + this.server.listen(res -> { + if (res.succeeded()) { + this.started = true; + log.info("McpSourceConnector started on port: {}", + this.sourceConfig.getConnectorConfig().getPort()); + log.info("MCP endpoints available at:"); + log.info(" - POST {} (JSON-RPC)", this.sourceConfig.connectorConfig.getPath()); + log.info(" - GET {} (SSE)", this.sourceConfig.connectorConfig.getPath()); + log.info(" - GET {}{} (Health check)", + this.sourceConfig.connectorConfig.getPath(), ENDPOINT_HEALTH); + } else { + log.error("McpSourceConnector failed to start on port: {}", + this.sourceConfig.getConnectorConfig().getPort()); + throw new EventMeshException("failed to start Vertx server", res.cause()); + } + }); + } + + @Override + public void commit(ConnectRecord record) { + if (sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) { + log.debug("McpSourceConnector commit record: {}", record.getRecordId()); + // MCP protocol processing doesn't require additional commit logic + } + } + + @Override + public String name() { + return this.sourceConfig.getConnectorConfig().getConnectorName(); + } + + @Override + public void onException(ConnectRecord record) { + log.error("Exception occurred for record: {}", record.getRecordId()); + // MCP errors are already handled via JSON-RPC error responses + } + + @Override + public void stop() { + if (this.server != null) { + this.server.close(res -> { + if (res.succeeded()) { + this.destroyed = true; + log.info("McpSourceConnector stopped on port: {}", + this.sourceConfig.getConnectorConfig().getPort()); + } else { + log.error("McpSourceConnector failed to stop on port: {}", + this.sourceConfig.getConnectorConfig().getPort()); + throw new EventMeshException("failed to stop Vertx server", res.cause()); + } + }); + } else { + log.warn("McpSourceConnector server is null, ignore."); + } + + if (this.vertx != null) { + this.vertx.close(); + } + } + + @Override + public List poll() { + long startTime = System.currentTimeMillis(); + long remainingTime = MAX_POLL_WAIT_TIME_MS; + + List connectRecords = new ArrayList<>(batchSize); + for (int i = 0; i < batchSize; i++) { + try { + Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS); + if (obj == null) { + break; + } + + // Convert MCP tool calls to ConnectRecord + ConnectRecord connectRecord = protocol.convertToConnectRecord(obj); + connectRecords.add(connectRecord); + + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = MAX_POLL_WAIT_TIME_MS > elapsedTime + ? MAX_POLL_WAIT_TIME_MS - elapsedTime : 0; + } catch (Exception e) { + log.error("Failed to poll from queue.", e); + throw new RuntimeException(e); + } + } + return connectRecords; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java new file mode 100644 index 0000000000..423cbbc2c7 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.source; + +/** + * Constants for MCP Source Connector + */ +public final class McpSourceConstants { + + private McpSourceConstants() { + // Utility class, no instantiation + } + + // ========== Server Configuration ========== + + /** + * Default connector name + */ + public static final String DEFAULT_CONNECTOR_NAME = "mcp-source"; + + /** + * Default server name for MCP protocol + */ + public static final String DEFAULT_SERVER_NAME = "eventmesh-mcp-connector"; + + /** + * Default server version + */ + public static final String DEFAULT_SERVER_VERSION = "1.0.0"; + + /** + * Default MCP protocol version + */ + public static final String DEFAULT_PROTOCOL_VERSION = "2024-11-05"; + + /** + * Default idle timeout in milliseconds (60 seconds) + */ + public static final int DEFAULT_IDLE_TIMEOUT_MS = 60000; + + /** + * Default heartbeat interval in milliseconds (30 seconds) + */ + public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000; + + /** + * Maximum poll wait time in milliseconds (5 seconds) + */ + public static final long MAX_POLL_WAIT_TIME_MS = 5000; + + // ========== HTTP Headers ========== + + /** + * Content-Type header name + */ + public static final String HEADER_CONTENT_TYPE = "Content-Type"; + + /** + * Accept header name + */ + public static final String HEADER_ACCEPT = "Accept"; + + /** + * Access-Control-Allow-Origin header + */ + public static final String HEADER_CORS_ALLOW_ORIGIN = "Access-Control-Allow-Origin"; + + /** + * Access-Control-Allow-Methods header + */ + public static final String HEADER_CORS_ALLOW_METHODS = "Access-Control-Allow-Methods"; + + /** + * Access-Control-Allow-Headers header + */ + public static final String HEADER_CORS_ALLOW_HEADERS = "Access-Control-Allow-Headers"; + + /** + * Access-Control-Expose-Headers header + */ + public static final String HEADER_CORS_EXPOSE_HEADERS = "Access-Control-Expose-Headers"; + + /** + * Cache-Control header + */ + public static final String HEADER_CACHE_CONTROL = "Cache-Control"; + + /** + * Connection header + */ + public static final String HEADER_CONNECTION = "Connection"; + + /** + * X-Accel-Buffering header + */ + public static final String HEADER_X_ACCEL_BUFFERING = "X-Accel-Buffering"; + + // ========== CORS Values ========== + + /** + * CORS allow all origins + */ + public static final String CORS_ALLOW_ALL = "*"; + + /** + * CORS allowed methods + */ + public static final String CORS_ALLOWED_METHODS = "GET, POST, OPTIONS"; + + /** + * CORS allowed headers + */ + public static final String CORS_ALLOWED_HEADERS = "Content-Type, Authorization, Accept"; + + /** + * CORS exposed headers + */ + public static final String CORS_EXPOSED_HEADERS = "Content-Type"; + + // ========== Content Types ========== + + /** + * JSON content type with UTF-8 charset + */ + public static final String CONTENT_TYPE_JSON = "application/json; charset=utf-8"; + + /** + * Server-Sent Events content type + */ + public static final String CONTENT_TYPE_SSE = "text/event-stream; charset=utf-8"; + + /** + * Plain JSON content type (for matching) + */ + public static final String CONTENT_TYPE_JSON_PLAIN = "application/json"; + + // ========== HTTP Status Codes ========== + + /** + * HTTP 204 No Content + */ + public static final int HTTP_STATUS_NO_CONTENT = 204; + + /** + * HTTP 500 Internal Server Error + */ + public static final int HTTP_STATUS_INTERNAL_ERROR = 500; + + // ========== JSON-RPC Methods ========== + + /** + * Initialize method + */ + public static final String METHOD_INITIALIZE = "initialize"; + + /** + * Notifications initialized method + */ + public static final String METHOD_NOTIFICATIONS_INITIALIZED = "notifications/initialized"; + + /** + * Tools list method + */ + public static final String METHOD_TOOLS_LIST = "tools/list"; + + /** + * Tools call method + */ + public static final String METHOD_TOOLS_CALL = "tools/call"; + + /** + * Ping method + */ + public static final String METHOD_PING = "ping"; + + // ========== JSON-RPC Error Codes ========== + + /** + * Invalid params error code + */ + public static final int ERROR_INVALID_PARAMS = -32602; + + /** + * Method not found error code + */ + public static final int ERROR_METHOD_NOT_FOUND = -32601; + + /** + * Internal error code + */ + public static final int ERROR_INTERNAL = -32603; + + // ========== JSON Keys ========== + + /** + * JSON-RPC version key + */ + public static final String KEY_JSONRPC = "jsonrpc"; + + /** + * JSON-RPC version value + */ + public static final String VALUE_JSONRPC_VERSION = "2.0"; + + /** + * Method key + */ + public static final String KEY_METHOD = "method"; + + /** + * ID key + */ + public static final String KEY_ID = "id"; + + /** + * Params key + */ + public static final String KEY_PARAMS = "params"; + + /** + * Result key + */ + public static final String KEY_RESULT = "result"; + + /** + * Error key + */ + public static final String KEY_ERROR = "error"; + + /** + * Error code key + */ + public static final String KEY_ERROR_CODE = "code"; + + /** + * Error message key + */ + public static final String KEY_ERROR_MESSAGE = "message"; + + /** + * Protocol version key + */ + public static final String KEY_PROTOCOL_VERSION = "protocolVersion"; + + /** + * Server info key + */ + public static final String KEY_SERVER_INFO = "serverInfo"; + + /** + * Capabilities key + */ + public static final String KEY_CAPABILITIES = "capabilities"; + + /** + * Tools key + */ + public static final String KEY_TOOLS = "tools"; + + /** + * Name key + */ + public static final String KEY_NAME = "name"; + + /** + * Version key + */ + public static final String KEY_VERSION = "version"; + + /** + * Content key + */ + public static final String KEY_CONTENT = "content"; + + /** + * Type key + */ + public static final String KEY_TYPE = "type"; + + /** + * Text key + */ + public static final String KEY_TEXT = "text"; + + /** + * Description key + */ + public static final String KEY_DESCRIPTION = "description"; + + /** + * Input schema key + */ + public static final String KEY_INPUT_SCHEMA = "inputSchema"; + + /** + * Properties key + */ + public static final String KEY_PROPERTIES = "properties"; + + /** + * Required key + */ + public static final String KEY_REQUIRED = "required"; + + /** + * Status key + */ + public static final String KEY_STATUS = "status"; + + /** + * Connector key + */ + public static final String KEY_CONNECTOR = "connector"; + + // ========== JSON Values ========== + + /** + * Object type value + */ + public static final String VALUE_TYPE_OBJECT = "object"; + + /** + * String type value + */ + public static final String VALUE_TYPE_STRING = "string"; + + /** + * Text type value + */ + public static final String VALUE_TYPE_TEXT = "text"; + + /** + * Status UP value + */ + public static final String VALUE_STATUS_UP = "UP"; + + // ========== HTTP Methods ========== + + /** + * OPTIONS HTTP method + */ + public static final String HTTP_METHOD_OPTIONS = "OPTIONS"; + + // ========== SSE Events ========== + + /** + * SSE event: open + */ + public static final String SSE_EVENT_OPEN = "event: open\n"; + + /** + * SSE data for open event + */ + public static final String SSE_DATA_OPEN = "data: {\"type\":\"open\"}\n\n"; + + /** + * SSE heartbeat comment + */ + public static final String SSE_HEARTBEAT = ": heartbeat\n\n"; + + /** + * Cache control: no-cache + */ + public static final String CACHE_CONTROL_NO_CACHE = "no-cache"; + + /** + * Connection: keep-alive + */ + public static final String CONNECTION_KEEP_ALIVE = "keep-alive"; + + /** + * X-Accel-Buffering: no + */ + public static final String X_ACCEL_BUFFERING_NO = "no"; + + // ========== Tool Parameter Names ========== + + /** + * Message parameter name + */ + public static final String PARAM_MESSAGE = "message"; + + /** + * Topic parameter name + */ + public static final String PARAM_TOPIC = "topic"; + + // ========== Tool Parameter Descriptions ========== + + /** + * Message parameter description + */ + public static final String PARAM_DESC_MESSAGE = "Message to echo"; + + /** + * Topic parameter description + */ + public static final String PARAM_DESC_TOPIC = "EventMesh topic"; + + /** + * Message content parameter description + */ + public static final String PARAM_DESC_MESSAGE_CONTENT = "Message content"; + + // ========== Default Values ========== + + /** + * Default message when no message provided + */ + public static final String DEFAULT_NO_MESSAGE = "No message"; + + // ========== Endpoint Paths ========== + + /** + * Health check endpoint suffix + */ + public static final String ENDPOINT_HEALTH = "/health"; +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java new file mode 100644 index 0000000000..a59d93f0b6 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.source; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; + +import lombok.extern.slf4j.Slf4j; + +/** + * MCP Tool Registry + * Manages all available MCP tools + */ +@Slf4j +public class McpToolRegistry { + + private final Map tools = new ConcurrentHashMap<>(); + + /** + * Register an MCP tool + * @param name Tool name + * @param description Tool description + * @param inputSchema JSON schema for tool input parameters + * @param executor Tool execution logic + */ + public void registerTool(String name, String description, + JsonObject inputSchema, ToolExecutor executor) { + McpTool tool = new McpTool(name, description, inputSchema, executor); + tools.put(name, tool); + log.info("Registered MCP tool: {}", name); + } + + /** + * Execute a specified tool + * @param name Tool name + * @param arguments Tool arguments as JSON object + * @return Tool execution result as MCP content object + * @throws IllegalArgumentException if tool not found + * @throws RuntimeException if tool execution fails + */ + public JsonObject executeTool(String name, JsonObject arguments) { + McpTool tool = tools.get(name); + if (tool == null) { + throw new IllegalArgumentException("Unknown tool: " + name); + } + + try { + return tool.executor.execute(arguments); + } catch (Exception e) { + log.error("Tool execution failed: {}", name, e); + throw new RuntimeException("Tool execution failed: " + e.getMessage(), e); + } + } + + /** + * Get all tools as a JSON array + * @return JSON array containing all registered tools with their metadata + */ + public JsonArray getToolsArray() { + JsonArray array = new JsonArray(); + for (McpTool tool : tools.values()) { + JsonObject toolObj = new JsonObject() + .put("name", tool.name) + .put("description", tool.description) + .put("inputSchema", tool.inputSchema); + array.add(toolObj); + } + return array; + } + + /** + * Get the number of registered tools + * @return Total count of registered tools + */ + public int getToolCount() { + return tools.size(); + } + + /** + * Check if a tool exists + * @param name Tool name to check + * @return true if tool is registered, false otherwise + */ + public boolean hasTool(String name) { + return tools.containsKey(name); + } + + /** + * Tool Executor Interface + * Functional interface for defining tool execution logic + */ + @FunctionalInterface + public interface ToolExecutor { + /** + * Execute tool logic + * @param arguments Tool input arguments as JSON object + * @return MCP content object (must contain 'type' and 'text' fields) + * @throws Exception if execution fails + */ + JsonObject execute(JsonObject arguments) throws Exception; + } + + /** + * MCP Tool Definition + * Internal class representing a registered MCP tool + */ + private static class McpTool { + final String name; + final String description; + final JsonObject inputSchema; + final ToolExecutor executor; + + McpTool(String name, String description, JsonObject inputSchema, ToolExecutor executor) { + this.name = name; + this.description = description; + this.inputSchema = inputSchema; + this.executor = executor; + } + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java new file mode 100644 index 0000000000..38b9be8b53 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.source.data; + +import java.io.Serializable; + +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * MCP Protocol Request + * Represents a request in the MCP (Model Context Protocol) format + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class McpRequest implements Serializable { + + private static final long serialVersionUID = -483500600756490500L; + + /** + * Protocol name + */ + private String protocolName; + + /** + * Session ID for tracking the request + */ + private String sessionId; + + /** + * MCP method name + */ + private String method; + + /** + * Tool name + */ + private String toolName; + + /** + * Tool arguments + */ + private JsonObject arguments; + + /** + * Tool execution result + */ + private JsonObject result; + + /** + * Request timestamp + */ + private long timestamp; + + /** + * Whether the tool execution succeeded + */ + private boolean success; + + /** + * Error message if execution failed + */ + private String errorMessage; + + /** + * Vert.x routing context for HTTP response handling + */ + private transient RoutingContext routingContext; +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java new file mode 100644 index 0000000000..93e1cbcfab --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.source.data; + +import java.io.Serializable; +import java.time.LocalDateTime; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter.Feature; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * MCP Response + * Represents a response message for MCP protocol operations + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class McpResponse implements Serializable { + + private static final long serialVersionUID = 8616938575207104455L; + + /** + * Response status: "success", "error", etc. + */ + private String status; + + /** + * Response message + */ + private String msg; + + /** + * Response timestamp + */ + private LocalDateTime handleTime; + + /** + * Additional error code for error responses + */ + private Integer errorCode; + + /** + * Additional data payload + */ + private Object data; + + /** + * Convert to JSON string + * + * @return JSON string representation + */ + public String toJsonStr() { + return JSON.toJSONString(this, Feature.WriteMapNullValue); + } + + /** + * Create a success response + * + * @return Success response + */ + public static McpResponse success() { + return new McpResponse("success", "Operation completed successfully", + LocalDateTime.now(), null, null); + } + + /** + * Create a success response with message + * + * @param msg Success message + * @return Success response + */ + public static McpResponse success(String msg) { + return new McpResponse("success", msg, LocalDateTime.now(), null, null); + } + + /** + * Create a success response with data + * + * @param msg Success message + * @param data Response data + * @return Success response with data + */ + public static McpResponse success(String msg, Object data) { + return new McpResponse("success", msg, LocalDateTime.now(), null, data); + } + + /** + * Create an error response + * + * @param msg Error message + * @return Error response + */ + public static McpResponse error(String msg) { + return new McpResponse("error", msg, LocalDateTime.now(), null, null); + } + + /** + * Create an error response with error code + * + * @param msg Error message + * @param errorCode Error code + * @return Error response with code + */ + public static McpResponse error(String msg, Integer errorCode) { + return new McpResponse("error", msg, LocalDateTime.now(), errorCode, null); + } + + /** + * Create a base response + * + * @param msg Message + * @return Base response + */ + public static McpResponse base(String msg) { + return new McpResponse("info", msg, LocalDateTime.now(), null, null); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java new file mode 100644 index 0000000000..f18397a6c3 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.source.protocol; + +import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.concurrent.BlockingQueue; + +import io.vertx.ext.web.Route; + +/** + * Protocol Interface. + * All protocols should implement this interface. + */ +public interface Protocol { + + /** + * Initialize the protocol. + * + * @param sourceConnectorConfig source connector config + */ + void initialize(SourceConnectorConfig sourceConnectorConfig); + + + /** + * Handle the protocol message. + * + * @param route route + * @param queue queue info + */ + void setHandler(Route route, BlockingQueue queue); + + + /** + * Convert the message to ConnectRecord. + * + * @param message message + * @return ConnectRecord + */ + ConnectRecord convertToConnectRecord(Object message); +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java new file mode 100644 index 0000000000..1c30d4e43e --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.source.protocol; + +import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig; +import org.apache.eventmesh.connector.mcp.source.protocol.impl.McpStandardProtocol; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Protocol factory. This class is responsible for storing and creating instances of {@link Protocol} classes. + */ +public class ProtocolFactory { + // protocol name -> protocol class + private static final ConcurrentHashMap> protocols = new ConcurrentHashMap<>(); + + static { + // register all protocols + registerProtocol(McpStandardProtocol.PROTOCOL_NAME, McpStandardProtocol.class); + } + + + /** + * Register a protocol + * + * @param name name of the protocol + * @param clazz class of the protocol + */ + public static void registerProtocol(String name, Class clazz) { + if (Protocol.class.isAssignableFrom(clazz)) { + // put the class into the map(case insensitive) + protocols.put(name.toLowerCase(), clazz); + } else { + throw new IllegalArgumentException("Class " + clazz.getName() + " does not implement Protocol interface"); + } + } + + /** + * Get an instance of a protocol, if it is not already created, create a new instance + * + * @param name name of the protocol + * @return instance of the protocol + */ + public static Protocol getInstance(SourceConnectorConfig sourceConnectorConfig, String name) { + // get the class by name(case insensitive) + Class clazz = Optional.ofNullable(protocols.get(name.toLowerCase())) + .orElseThrow(() -> new IllegalArgumentException("Protocol " + name + " is not registered")); + try { + // create a new instance + Protocol protocol = (Protocol) clazz.newInstance(); + // initialize the protocol + protocol.initialize(sourceConnectorConfig); + return protocol; + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("Failed to instantiate protocol " + name, e); + } + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java new file mode 100644 index 0000000000..737dae82de --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.mcp.source.protocol.impl; + +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig; +import org.apache.eventmesh.connector.mcp.source.data.McpRequest; +import org.apache.eventmesh.connector.mcp.source.data.McpResponse; +import org.apache.eventmesh.connector.mcp.source.protocol.Protocol; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.Base64; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.handler.BodyHandler; + +import lombok.extern.slf4j.Slf4j; + +/** + * MCP Standard Protocol Implementation + * Handles MCP (Model Context Protocol) requests and converts them to EventMesh ConnectRecords + */ +@Slf4j +public class McpStandardProtocol implements Protocol { + + /** + * Protocol name constant + */ + public static final String PROTOCOL_NAME = "MCP"; + + // Extension keys + private static final String EXTENSION_PROTOCOL = "protocol"; + private static final String EXTENSION_SESSION_ID = "sessionid"; + private static final String EXTENSION_TOOL_NAME = "toolname"; + private static final String EXTENSION_METHOD = "method"; // ok + private static final String EXTENSION_REQUEST_ID = "requestid"; + private static final String EXTENSION_SUCCESS = "success"; // ok + private static final String EXTENSION_ERROR_MESSAGE = "errormessage"; + private static final String EXTENSION_ROUTING_CONTEXT = "routingcontext"; + private static final String EXTENSION_IS_BASE64 = "isbase64"; + private static final String METADATA_EXTENSION_KEY = "extension"; + + private SourceConnectorConfig sourceConnectorConfig; + + /** + * Initialize the protocol + * + * @param sourceConnectorConfig Source connector configuration + */ + @Override + public void initialize(SourceConnectorConfig sourceConnectorConfig) { + this.sourceConnectorConfig = sourceConnectorConfig; + log.info("Initialized MCP Standard Protocol"); + } + + /** + * Set the handler for the route + * This method is called when using the protocol in a generic HTTP connector context + * + * @param route Vert.x route to configure + * @param queue Queue for storing requests + */ + @Override + public void setHandler(Route route, BlockingQueue queue) { + route.method(HttpMethod.POST) + .handler(BodyHandler.create()) + .handler(ctx -> { + try { + // Parse the request body + String bodyString = ctx.body().asString(Constants.DEFAULT_CHARSET.toString()); + + // Try to parse as JSON + JsonObject requestJson; + try { + requestJson = new JsonObject(bodyString); + } catch (Exception e) { + log.error("Failed to parse request as JSON: {}", bodyString, e); + ctx.response() + .setStatusCode(HttpResponseStatus.BAD_REQUEST.code()) + .putHeader("Content-Type", "application/json") + .end(McpResponse.error("Invalid JSON format").toJsonStr()); + return; + } + + // Extract JSON-RPC fields + String method = requestJson.getString("type", ""); + String toolName = requestJson.getString("tool", ""); + JsonObject params = requestJson.getJsonObject("arguments"); + + // Generate session ID if not present + String sessionId = ctx.request().getHeader("Mcp-Session-Id"); + if (sessionId == null || sessionId.isEmpty()) { + sessionId = generateSessionId(); + } + + // Create MCP request based on method type + McpRequest mcpRequest = createMcpRequest( + method, + params, + sessionId, + toolName, + ctx + ); + + // Queue the request + if (!queue.offer(mcpRequest)) { + log.error("Failed to queue MCP request: queue is full"); + ctx.response() + .setStatusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.code()) + .putHeader("Content-Type", "application/json") + .end(McpResponse.error("Service temporarily unavailable").toJsonStr()); + return; + } + + // If data consistency is not enabled, return immediate response + if (!sourceConnectorConfig.isDataConsistencyEnabled()) { + ctx.response() + .setStatusCode(HttpResponseStatus.OK.code()) + .putHeader("Content-Type", "application/json") + .end(McpResponse.success().toJsonStr()); + } + // Otherwise, response will be sent after processing (via commit) + + } catch (Exception e) { + log.error("Error handling MCP request", e); + ctx.response() + .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()) + .putHeader("Content-Type", "application/json") + .end(McpResponse.error("Internal server error: " + e.getMessage()).toJsonStr()); + } + }) + .failureHandler(ctx -> { + log.error("Failed to handle MCP request", ctx.failure()); + + // Return error response + ctx.response() + .setStatusCode(ctx.statusCode() > 0 ? ctx.statusCode() : 500) + .putHeader("Content-Type", "application/json") + .end(McpResponse.error(ctx.failure().getMessage()).toJsonStr()); + }); + } + + /** + * Create MCP request from parsed JSON-RPC data + * + * @param method JSON-RPC method name + * @param params JSON-RPC params + * @param sessionId Session identifier + * @param tool Tool name + * @param ctx Routing context + * @return Constructed McpRequest + */ + private McpRequest createMcpRequest( + String method, + JsonObject params, + String sessionId, + String tool, + io.vertx.ext.web.RoutingContext ctx) { + + McpRequest.McpRequestBuilder builder = McpRequest.builder() + .protocolName(PROTOCOL_NAME) + .sessionId(sessionId) + .method(method) + .toolName(tool) + .timestamp(System.currentTimeMillis()) + .routingContext(ctx); + + + // Handle different method types + if ("mcp.tools.call".equals(method) && params != null) { + // Tool call request + String toolName = params.getString("name"); + JsonObject arguments = params.getJsonObject("arguments", new JsonObject()); + + builder.toolName(toolName) + .arguments(arguments) + .success(false); // Will be set to true after execution + + + } else if ("initialize".equals(method)) { + // Initialize request + builder.success(true); + + } else { + // Other methods + builder.success(true); + } + + return builder.build(); + } + + /** + * Convert MCP request to ConnectRecord + * Simple and direct conversion following the existing pattern + * + * @param message MCP request message + * @return ConnectRecord representation + */ + @Override + public ConnectRecord convertToConnectRecord(Object message) { + // Validate input + if (message == null) { + throw new IllegalArgumentException("Message cannot be null"); + } + + if (!(message instanceof McpRequest)) { + throw new IllegalArgumentException( + String.format("Expected McpRequest but got %s", message.getClass().getName()) + ); + } + + McpRequest request = (McpRequest) message; + + // Get timestamp + long timestamp = request.getTimestamp() > 0 + ? request.getTimestamp() + : System.currentTimeMillis(); + + // Get data (priority: result > arguments > inputs) + Object data = extractData(request); + + // Create ConnectRecord + ConnectRecord connectRecord = new ConnectRecord(null, null, timestamp, data); + + // Add protocol extension + connectRecord.addExtension(EXTENSION_PROTOCOL, PROTOCOL_NAME); + + // Add session ID + if (request.getSessionId() != null) { + connectRecord.addExtension(EXTENSION_SESSION_ID, request.getSessionId()); + } + + // Add method + if (request.getMethod() != null) { + connectRecord.addExtension(EXTENSION_METHOD, request.getMethod()); + } + + // Add tool name (for tool calls) + if (request.getToolName() != null) { + connectRecord.addExtension(EXTENSION_TOOL_NAME, request.getToolName()); + } + + // Add success status + connectRecord.addExtension(EXTENSION_SUCCESS, String.valueOf(request.isSuccess())); + + // Add error message if failed + if (!request.isSuccess() && request.getErrorMessage() != null) { + connectRecord.addExtension(EXTENSION_ERROR_MESSAGE, request.getErrorMessage()); + } + + // Handle Base64 decoding if needed + handleBase64Decoding(connectRecord); + + // Add routing context for response handling + if (request.getRoutingContext() != null) { + connectRecord.addExtension(EXTENSION_ROUTING_CONTEXT, request.getRoutingContext()); + } + + return connectRecord; + } + + /** + * Extract data from MCP request + * Priority: result > arguments > inputs + */ + private Object extractData(McpRequest request) { + if (request.isSuccess() && request.getResult() != null) { + return request.getResult().encode(); + } + + if (request.getArguments() != null) { + return request.getArguments().encode(); + } + + return String.format("{\"tool\":\"%s\",\"timestamp\":%d}", + request.getToolName(), request.getTimestamp()); + } + + /** + * Handle Base64 decoding if isBase64 flag is set + */ + private void handleBase64Decoding(ConnectRecord connectRecord) { + Object isBase64Obj = connectRecord.getExtensionObj(EXTENSION_IS_BASE64); + + if (isBase64Obj == null) { + return; + } + + // Parse boolean value + boolean isBase64; + if (isBase64Obj instanceof Boolean) { + isBase64 = (Boolean) isBase64Obj; + } else { + isBase64 = Boolean.parseBoolean(String.valueOf(isBase64Obj)); + } + + // Decode if needed + if (isBase64 && connectRecord.getData() != null) { + try { + String dataStr = connectRecord.getData().toString(); + byte[] decodedData = Base64.getDecoder().decode(dataStr); + connectRecord.setData(decodedData); + log.debug("Decoded Base64 data: {} bytes", decodedData.length); + } catch (IllegalArgumentException e) { + log.error("Failed to decode Base64 data: {}", e.getMessage()); + // Keep original data if decoding fails + } + } + } + + /** + * Generate a unique session ID + * + * @return Generated session ID + */ + private String generateSessionId() { + return "mcp-session-" + UUID.randomUUID(); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService new file mode 100644 index 0000000000..01a46e9de0 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +MCP-Source=org.apache.eventmesh.connector.mcp.source.McpSourceConnector +MCP-Sink=org.apache.eventmesh.connector.mcp.sink.McpSinkConnector \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml new file mode 100644 index 0000000000..8009f5c76a --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +sourceEnable: true +sinkEnable: false \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml new file mode 100644 index 0000000000..c04f886699 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: mcpSink + appId: 5032 + userName: mcpSourceUser + passWord: mcpPassWord +connectorConfig: + connectorName: mcpSink + urls: + - http://127.0.0.1:7092/test + keepAlive: true + keepAliveTimeout: 60000 + idleTimeout: 5000 # timeunit: ms, recommended scope: common(5s - 10s), webhook(15s - 60s) + connectionTimeout: 5000 # timeunit: ms, recommended scope: 5 - 10s + maxConnectionPoolSize: 5 + retryConfig: + maxRetries: 2 + interval: 1000 + retryOnNonSuccess: false diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml new file mode 100644 index 0000000000..66ad75d37b --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: mcpSource + appId: 5032 + userName: mcpSourceUser + passWord: mcpPassWord +connectorConfig: + connectorName: mcpSource + path: /test + port: 7091 + idleTimeout: 5000 # timeunit: ms + maxFormAttributeSize: 1048576 # timeunit: byte, default: 1048576(1MB). This applies only when handling form data submissions. + protocol: MCP # Case insensitive, default: CloudEvent, options: CloudEvent, GitHub, Common + extraConfig: # extra config for different protocol, e.g. GitHub secret + streamType: chunked + contentType: application/json + reconnection: true + forwardPath: /forward \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml new file mode 100644 index 0000000000..0cd7b5b5ab --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +sourceEnable: true +sinkEnable: false diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java index 99837d27c4..f045bf077b 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java @@ -32,23 +32,66 @@ import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import io.netty.handler.codec.http.HttpMethod; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class RemoteSubscribeInstance { static final CloseableHttpClient httpClient = HttpClients.createDefault(); - public static void main(String[] args) { - subscribeRemote(); + public static void main(String[] args) throws IOException { + subscribeLocal(); + //subscribeRemote(); // unsubscribeRemote(); } + private static void subscribeLocal() throws IOException { + SubscriptionItem item = new SubscriptionItem(); + item.setTopic(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC); + item.setMode(SubscriptionMode.CLUSTERING); + item.setType(SubscriptionType.ASYNC); + + Map body = new HashMap<>(); + body.put("url", "http://127.0.0.1:8088/sub/test"); + body.put("consumerGroup", "EventMeshTest-consumerGroup"); + body.put("topic", Collections.singletonList(item)); + + String json = JsonUtils.toJSONString(body); + // 2) use HttpPost + HttpPost post = new HttpPost("http://127.0.0.1:10105/eventmesh/subscribe/local"); + post.setHeader("Content-Type", "application/json"); + post.setHeader("env", "prod"); + post.setHeader("idc", "default"); + post.setHeader("sys", "http-client-demo"); + post.setHeader("username", "eventmesh"); + post.setHeader("passwd", "eventmesh"); + post.setHeader("ip", IPUtils.getLocalAddress()); + post.setHeader("language", "JAVA"); + post.setEntity(new StringEntity(json, StandardCharsets.UTF_8)); + + try (CloseableHttpResponse resp = httpClient.execute(post)) { + String respBody = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8); + log.info("respStatusLine:{}", resp.getStatusLine()); + log.info("respBody:{}", respBody); + } + } + private static void subscribeRemote() { SubscriptionItem subscriptionItem = new SubscriptionItem(); subscriptionItem.setTopic(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC); diff --git a/settings.gradle b/settings.gradle index b013a57929..327ca7e1a2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -76,6 +76,7 @@ include 'eventmesh-connectors:eventmesh-connector-wechat' include 'eventmesh-connectors:eventmesh-connector-http' include 'eventmesh-connectors:eventmesh-connector-chatgpt' include 'eventmesh-connectors:eventmesh-connector-canal' +include 'eventmesh-connectors:eventmesh-connector-mcp' include 'eventmesh-storage-plugin:eventmesh-storage-api' include 'eventmesh-storage-plugin:eventmesh-storage-standalone'