diff --git a/server/src/main/java/com/epam/aidial/core/server/controller/BaseDeploymentPostController.java b/server/src/main/java/com/epam/aidial/core/server/controller/BaseDeploymentPostController.java index bfe9cb0c0..0a2847e66 100644 --- a/server/src/main/java/com/epam/aidial/core/server/controller/BaseDeploymentPostController.java +++ b/server/src/main/java/com/epam/aidial/core/server/controller/BaseDeploymentPostController.java @@ -8,7 +8,6 @@ import com.epam.aidial.core.server.Proxy; import com.epam.aidial.core.server.ProxyContext; import com.epam.aidial.core.server.data.ApiKeyData; -import com.epam.aidial.core.server.function.CollectResponseAttachmentsFn; import com.epam.aidial.core.server.function.CollectResponseChatCompletionAttachmentsFn; import com.epam.aidial.core.server.token.TokenUsage; import com.epam.aidial.core.server.token.TokenUsageParser; @@ -35,6 +34,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import static com.epam.aidial.core.server.Proxy.HEADER_APPLICATION_ID; import static com.epam.aidial.core.server.Proxy.HEADER_APPLICATION_PROPERTIES; @@ -49,21 +49,28 @@ public class BaseDeploymentPostController { protected final Proxy proxy; protected final ProxyContext context; - protected BufferingReadStream createResponseStream(HttpClientResponse proxyResponse) { + protected BufferingReadStream createResponseStream(HttpClientResponse proxyResponse, + Supplier listenerSupplier) { + BufferingReadStream.BaseEventListener eventListener = null; + if (isEventStreamResponse(proxyResponse)) { + eventListener = listenerSupplier.get(); + } + return new BufferingReadStream(proxyResponse, ProxyUtil.contentLength(proxyResponse, 1024), eventListener); + } + + protected boolean isEventStreamResponse(HttpClientResponse proxyResponse) { String contentType = proxyResponse.getHeader(HttpHeaders.CONTENT_TYPE); - boolean isEventStreamResponse = Strings.CI.contains(contentType, "text/event-stream") && context.isStreamingRequest(); - CollectResponseAttachmentsFn handler = isEventStreamResponse ? new CollectResponseChatCompletionAttachmentsFn(proxy, context) : null; - return new BufferingReadStream(proxyResponse, ProxyUtil.contentLength(proxyResponse, 1024), handler); + return Strings.CI.contains(contentType, "text/event-stream") && context.isStreamingRequest(); } protected Future collectResponseAttachments(Buffer responseBody) { - if (context.isStreamingRequest()) { + if (isEventStreamResponse(context.getProxyResponse())) { return Future.succeededFuture(); } try (InputStream stream = new ByteBufInputStream(responseBody.getByteBuf())) { ObjectNode tree = (ObjectNode) ProxyUtil.MAPPER.readTree(stream); var fn = new CollectResponseChatCompletionAttachmentsFn(proxy, context); - return fn.apply(tree); + return fn.apply(tree).map(ignored -> null); } catch (Throwable e) { log.warn("Can't parse JSON response body. Error:", e); return Future.failedFuture(e); diff --git a/server/src/main/java/com/epam/aidial/core/server/controller/DeploymentPostController.java b/server/src/main/java/com/epam/aidial/core/server/controller/DeploymentPostController.java index a6a41bd79..db1cc2524 100644 --- a/server/src/main/java/com/epam/aidial/core/server/controller/DeploymentPostController.java +++ b/server/src/main/java/com/epam/aidial/core/server/controller/DeploymentPostController.java @@ -10,15 +10,18 @@ import com.epam.aidial.core.server.ProxyContext; import com.epam.aidial.core.server.data.ApiKeyData; import com.epam.aidial.core.server.function.BaseRequestFunction; +import com.epam.aidial.core.server.function.BaseResponseFunction; import com.epam.aidial.core.server.function.BuildUpstreamCacheFn; import com.epam.aidial.core.server.function.CollectDeploymentsFn; import com.epam.aidial.core.server.function.CollectRequestApplicationFilesFn; import com.epam.aidial.core.server.function.CollectRequestChatCompletionAttachmentsFn; import com.epam.aidial.core.server.function.CollectRequestDataFn; +import com.epam.aidial.core.server.function.CollectResponseChatCompletionAttachmentsFn; import com.epam.aidial.core.server.function.enhancement.ApplyDefaultDeploymentSettingsFn; import com.epam.aidial.core.server.function.enhancement.EnhanceModelRequestFn; import com.epam.aidial.core.server.limiter.RateLimitResult; import com.epam.aidial.core.server.service.PermissionDeniedException; +import com.epam.aidial.core.server.sse.SseEvent; import com.epam.aidial.core.server.token.TokenUsage; import com.epam.aidial.core.server.upstream.UpstreamRoute; import com.epam.aidial.core.server.util.ProxyUtil; @@ -26,6 +29,7 @@ import com.epam.aidial.core.storage.exception.ResourceNotFoundException; import com.epam.aidial.core.storage.http.HttpException; import com.epam.aidial.core.storage.http.HttpStatus; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBufInputStream; @@ -41,6 +45,7 @@ import java.io.InputStream; import java.util.List; +import java.util.function.Supplier; @Slf4j public class DeploymentPostController extends BaseDeploymentPostController { @@ -281,7 +286,9 @@ private void handleProxyResponse(HttpClientResponse proxyResponse) { upstreamRoute.fail(proxyResponse); } - BufferingReadStream responseStream = createResponseStream(proxyResponse); + Supplier eventListenerSupplier = () -> + new ChatCompletionSseListener(new CollectResponseChatCompletionAttachmentsFn(proxy, context)); + BufferingReadStream responseStream = createResponseStream(proxyResponse, eventListenerSupplier); context.setProxyResponse(proxyResponse); context.setProxyResponseTimestamp(System.currentTimeMillis()); @@ -387,4 +394,27 @@ private void handleResponseError(Throwable error, BufferingReadStream responseSt context.getProxyRequest().reset(); } } + + public static class ChatCompletionSseListener extends BufferingReadStream.BaseEventListener { + + public static final String CHAT_COMPLETION_FINAL_MESSAGE = "[DONE]"; + + public ChatCompletionSseListener(BaseResponseFunction function) { + super(function); + } + + @Override + protected boolean isLastEvent(SseEvent event, JsonNode data) { + return isFinalEvent(event); + } + + @Override + protected boolean skipEvent(SseEvent event) { + return isFinalEvent(event); + } + + private static boolean isFinalEvent(SseEvent event) { + return CHAT_COMPLETION_FINAL_MESSAGE.equals(event.getData()); + } + } } diff --git a/server/src/main/java/com/epam/aidial/core/server/controller/InterceptorController.java b/server/src/main/java/com/epam/aidial/core/server/controller/InterceptorController.java index 01641a268..14af8d3ae 100644 --- a/server/src/main/java/com/epam/aidial/core/server/controller/InterceptorController.java +++ b/server/src/main/java/com/epam/aidial/core/server/controller/InterceptorController.java @@ -8,6 +8,7 @@ import com.epam.aidial.core.server.function.BaseRequestFunction; import com.epam.aidial.core.server.function.CollectRequestChatCompletionAttachmentsFn; import com.epam.aidial.core.server.function.CollectRequestDataFn; +import com.epam.aidial.core.server.function.CollectResponseChatCompletionAttachmentsFn; import com.epam.aidial.core.server.function.enhancement.ApplyDefaultDeploymentSettingsFn; import com.epam.aidial.core.server.util.ProxyUtil; import com.epam.aidial.core.server.vertx.stream.BufferingReadStream; @@ -27,6 +28,7 @@ import java.io.InputStream; import java.util.List; +import java.util.function.Supplier; @Slf4j public class InterceptorController extends BaseDeploymentPostController { @@ -143,7 +145,9 @@ private void handleProxyResponse(HttpClientResponse proxyResponse) { context.getDeployment().getEndpoint(), proxyResponse.statusCode(), proxyResponse.headers().size()); - BufferingReadStream responseStream = createResponseStream(proxyResponse); + Supplier eventListenerSupplier = () -> + new DeploymentPostController.ChatCompletionSseListener(new CollectResponseChatCompletionAttachmentsFn(proxy, context)); + BufferingReadStream responseStream = createResponseStream(proxyResponse, eventListenerSupplier); context.setProxyResponse(proxyResponse); context.setProxyResponseTimestamp(System.currentTimeMillis()); diff --git a/server/src/main/java/com/epam/aidial/core/server/controller/ResponsesController.java b/server/src/main/java/com/epam/aidial/core/server/controller/ResponsesController.java index 07cd1c0c2..95df1107a 100644 --- a/server/src/main/java/com/epam/aidial/core/server/controller/ResponsesController.java +++ b/server/src/main/java/com/epam/aidial/core/server/controller/ResponsesController.java @@ -9,8 +9,11 @@ import com.epam.aidial.core.server.ProxyContext; import com.epam.aidial.core.server.data.ApiKeyData; import com.epam.aidial.core.server.function.BaseRequestFunction; +import com.epam.aidial.core.server.function.BaseResponseFunction; +import com.epam.aidial.core.server.function.CollectResponseChatCompletionAttachmentsFn; import com.epam.aidial.core.server.function.enhancement.EnhanceModelRequestFn; import com.epam.aidial.core.server.service.PermissionDeniedException; +import com.epam.aidial.core.server.sse.SseEvent; import com.epam.aidial.core.server.token.TokenUsage; import com.epam.aidial.core.server.upstream.UpstreamRoute; import com.epam.aidial.core.server.util.ProxyUtil; @@ -18,6 +21,7 @@ import com.epam.aidial.core.storage.exception.ResourceNotFoundException; import com.epam.aidial.core.storage.http.HttpException; import com.epam.aidial.core.storage.http.HttpStatus; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.buffer.ByteBufInputStream; import io.vertx.core.Future; @@ -33,6 +37,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.function.Supplier; @Slf4j public class ResponsesController extends BaseDeploymentPostController { @@ -186,7 +191,8 @@ private void handleProxyResponse(HttpClientResponse proxyResponse) { upstreamRoute.fail(proxyResponse); } - BufferingReadStream responseStream = createResponseStream(proxyResponse); + Supplier eventListenerSupplier = ResponsesSseListener::new; + BufferingReadStream responseStream = createResponseStream(proxyResponse, eventListenerSupplier); context.setProxyResponse(proxyResponse); context.setProxyResponseTimestamp(System.currentTimeMillis()); @@ -276,4 +282,16 @@ private void handleResponseError(Throwable error, BufferingReadStream responseSt context.getProxyRequest().reset(); } } + + private static class ResponsesSseListener extends BufferingReadStream.BaseEventListener { + + public ResponsesSseListener() { + super(); + } + + @Override + protected boolean isLastEvent(SseEvent event, JsonNode data) { + return "response.incomplete".equals(event.getEvent()) || "response.completed".equals(event.getEvent()); + } + } } diff --git a/server/src/main/java/com/epam/aidial/core/server/controller/ToolSetProxyController.java b/server/src/main/java/com/epam/aidial/core/server/controller/ToolSetProxyController.java index f207bdd83..68ad10708 100644 --- a/server/src/main/java/com/epam/aidial/core/server/controller/ToolSetProxyController.java +++ b/server/src/main/java/com/epam/aidial/core/server/controller/ToolSetProxyController.java @@ -14,6 +14,8 @@ import com.epam.aidial.core.server.data.ApiKeyData; import com.epam.aidial.core.server.data.ErrorData; import com.epam.aidial.core.server.function.BaseRequestFunction; +import com.epam.aidial.core.server.function.BaseResponseFunction; +import com.epam.aidial.core.server.function.FilterAllowedToolsFn; import com.epam.aidial.core.server.function.enhancement.InjectApplicationPropsToMcpRequest; import com.epam.aidial.core.server.limiter.RateLimitResult; import com.epam.aidial.core.server.limiter.RateLimiter; @@ -24,6 +26,7 @@ import com.epam.aidial.core.server.service.ConsentService; import com.epam.aidial.core.server.service.DeploymentService; import com.epam.aidial.core.server.service.PermissionDeniedException; +import com.epam.aidial.core.server.sse.SseEvent; import com.epam.aidial.core.server.token.TokenStatsTracker; import com.epam.aidial.core.server.upstream.UpstreamRoute; import com.epam.aidial.core.server.upstream.UpstreamRouteProvider; @@ -68,8 +71,6 @@ @Slf4j public class ToolSetProxyController implements Controller { - private static final ArrayNode EMPTY_JSON_ARRAY = ProxyUtil.MAPPER.createArrayNode(); - private final String toolSetId; private final CredentialsLocator credentialsLocator; @@ -100,6 +101,8 @@ public class ToolSetProxyController implements Controller { private final List> enhancementFunctions; + private final Proxy proxy; + private String mcpMethodName; private boolean useAllowedTools; @@ -125,6 +128,7 @@ public ToolSetProxyController(Proxy proxy, ProxyContext context, String toolSetI this.resourceCredentialsService = proxy.getResourceCredentialsService(); this.applicationSchemaService = proxy.getApplicationSchemaService(); this.enhancementFunctions = List.of(new InjectApplicationPropsToMcpRequest(proxy, context)); + this.proxy = proxy; } @Override @@ -346,8 +350,14 @@ private void handleProxyResponse(HttpClientResponse proxyResponse) { } private void handleSseProxyResponse(HttpClientResponse proxyResponse) { + BufferingReadStream.BaseEventListener eventListener = null; + if (requireToolFiltering()) { + FilterAllowedToolsFn fn = new FilterAllowedToolsFn(proxy, context); + eventListener = new BufferingReadStream.BaseEventListener(fn); + } + BufferingReadStream proxyResponseStream = new BufferingReadStream(proxyResponse, - ProxyUtil.contentLength(proxyResponse, 1024)); + ProxyUtil.contentLength(proxyResponse, 1024), eventListener); context.setProxyResponse(proxyResponse); context.setResponseStream(proxyResponseStream); @@ -383,12 +393,12 @@ private void handleResponse() { } private void handleResponse(int responseStatus, Buffer proxyResponseBody) { - if ("tools/list".equalsIgnoreCase(mcpMethodName) && useAllowedTools) { + Future future; + if (requireToolFiltering()) { try (InputStream stream = new ByteBufInputStream(proxyResponseBody.getByteBuf())) { - ObjectNode tree = (ObjectNode) ProxyUtil.MAPPER.readTree(stream); - if (filterToolList(tree)) { - proxyResponseBody = Buffer.buffer(ProxyUtil.MAPPER.writeValueAsBytes(tree)); - } + JsonNode tree = ProxyUtil.MAPPER.readTree(stream); + FilterAllowedToolsFn fn = new FilterAllowedToolsFn(proxy, context); + future = fn.apply(tree).map(result -> Buffer.buffer(result.toString())); } catch (Throwable e) { if (e instanceof HttpException httpException) { respond(httpException.getStatus(), httpException.getMessage()); @@ -398,38 +408,21 @@ private void handleResponse(int responseStatus, Buffer proxyResponseBody) { log.warn("Can't process JSON response body. Error:", e); return; } + } else { + future = Future.succeededFuture(proxyResponseBody); } - context.setResponseBody(proxyResponseBody); - respond(responseStatus, proxyResponseBody); - logStore.save(context); - } - - private boolean filterToolList(ObjectNode body) { - ArrayNode tools = (ArrayNode) Optional.ofNullable(body.get("result")).map(result -> result.get("tools")) - .filter(JsonNode::isArray).orElse(EMPTY_JSON_ARRAY); - List allowedTools = getAllowedTools(context.getDeployment()); - if (allowedTools.isEmpty()) { - return false; - } - boolean modified = false; - for (Iterator iter = tools.iterator(); iter.hasNext();) { - JsonNode tool = iter.next(); - String name = tool.get("name").asText(); - if (!allowedTools.contains(name)) { - iter.remove(); - modified = true; - } - } - return modified; + future.onSuccess(result -> { + context.setResponseBody(result); + respond(responseStatus, result); + logStore.save(context); + }).onFailure(error -> { + log.error("Failed to handle MCP response body", error); + respond(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to handle MCP response body"); + }); } - private List getAllowedTools(Deployment deployment) { - if (deployment instanceof ToolSet toolSet) { - return toolSet.getAllowedTools(); - } else if (deployment instanceof Application application) { - return application.getMcp().getAllowedTools(); - } - throw new IllegalArgumentException("Unsupported deployment type: " + deployment.getName()); + private boolean requireToolFiltering() { + return "tools/list".equalsIgnoreCase(mcpMethodName) && useAllowedTools; } private Future handleRateLimitSuccess(Deployment deployment) { @@ -562,4 +555,5 @@ protected void finalizeRequest() { }).onFailure(error -> log.error("error occurred on invalidating per-request key", error)); } } + } diff --git a/server/src/main/java/com/epam/aidial/core/server/controller/route/ApplicationRouteController.java b/server/src/main/java/com/epam/aidial/core/server/controller/route/ApplicationRouteController.java index 061eac8ef..66c230460 100644 --- a/server/src/main/java/com/epam/aidial/core/server/controller/route/ApplicationRouteController.java +++ b/server/src/main/java/com/epam/aidial/core/server/controller/route/ApplicationRouteController.java @@ -96,7 +96,7 @@ protected Future handleProxyResponseBody(Buffer responseBody) { try (InputStream stream = new ByteBufInputStream(responseBody.getByteBuf())) { ObjectNode tree = (ObjectNode) ProxyUtil.MAPPER.readTree(stream); var fn = new CollectResponseCustomAttachmentsFn(proxy, context); - return fn.apply(tree); + return fn.apply(tree).map(ignored -> null); } catch (Throwable e) { log.warn("Can't parse JSON response body. Error:", e); return Future.failedFuture(e); diff --git a/server/src/main/java/com/epam/aidial/core/server/function/BaseResponseFunction.java b/server/src/main/java/com/epam/aidial/core/server/function/BaseResponseFunction.java index 3e1ec5262..1ec41fb25 100644 --- a/server/src/main/java/com/epam/aidial/core/server/function/BaseResponseFunction.java +++ b/server/src/main/java/com/epam/aidial/core/server/function/BaseResponseFunction.java @@ -2,10 +2,10 @@ import com.epam.aidial.core.server.Proxy; import com.epam.aidial.core.server.ProxyContext; -import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.JsonNode; import io.vertx.core.Future; -public abstract class BaseResponseFunction extends BaseFunction> { +public abstract class BaseResponseFunction extends BaseFunction> { public BaseResponseFunction(Proxy proxy, ProxyContext context) { super(proxy, context); } diff --git a/server/src/main/java/com/epam/aidial/core/server/function/CollectResponseAttachmentsFn.java b/server/src/main/java/com/epam/aidial/core/server/function/CollectResponseAttachmentsFn.java index 5d4e9e202..307cd90ed 100644 --- a/server/src/main/java/com/epam/aidial/core/server/function/CollectResponseAttachmentsFn.java +++ b/server/src/main/java/com/epam/aidial/core/server/function/CollectResponseAttachmentsFn.java @@ -9,6 +9,7 @@ import com.epam.aidial.core.server.util.ProxyUtil; import com.epam.aidial.core.storage.resource.ResourceDescriptor; import com.epam.aidial.core.storage.resource.ResourceUtil; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.vertx.core.Future; import lombok.extern.slf4j.Slf4j; @@ -24,10 +25,14 @@ public CollectResponseAttachmentsFn(Proxy proxy, ProxyContext context) { } @Override - public Future apply(ObjectNode tree) { + public Future apply(JsonNode tree) { + if (!tree.isObject()) { + return Future.succeededFuture(tree); + } + ObjectNode objectNode = (ObjectNode) tree; try { Map> permittedAttachments = new HashMap<>(); - Set attachments = collectAttachments(tree); + Set attachments = collectAttachments(objectNode); for (String attachment : attachments) { processAttachedFile(attachment, permittedAttachments); } @@ -37,7 +42,7 @@ public Future apply(ObjectNode tree) { String perRequestKey = context.getApiKeyData().getPerRequestKey(); return proxy.getTaskExecutor().submit(() -> { proxy.getApiKeyStore().updatePerRequestApiKey(perRequestKey, json -> updateAutoSharedAttachments(json, permittedAttachments, perRequestKey)); - return null; + return tree; }); } catch (Throwable e) { return Future.failedFuture(e); diff --git a/server/src/main/java/com/epam/aidial/core/server/function/FilterAllowedToolsFn.java b/server/src/main/java/com/epam/aidial/core/server/function/FilterAllowedToolsFn.java new file mode 100644 index 000000000..8c9ceabe3 --- /dev/null +++ b/server/src/main/java/com/epam/aidial/core/server/function/FilterAllowedToolsFn.java @@ -0,0 +1,56 @@ +package com.epam.aidial.core.server.function; + +import com.epam.aidial.core.config.Application; +import com.epam.aidial.core.config.Deployment; +import com.epam.aidial.core.config.ToolSet; +import com.epam.aidial.core.server.Proxy; +import com.epam.aidial.core.server.ProxyContext; +import com.epam.aidial.core.server.util.ProxyUtil; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.vertx.core.Future; + +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +public class FilterAllowedToolsFn extends BaseResponseFunction { + + private static final ArrayNode EMPTY_JSON_ARRAY = ProxyUtil.MAPPER.createArrayNode(); + + public FilterAllowedToolsFn(Proxy proxy, ProxyContext context) { + super(proxy, context); + } + + @Override + public Future apply(JsonNode jsonNode) { + if (!jsonNode.isObject()) { + return Future.succeededFuture(jsonNode); + } + ObjectNode body = (ObjectNode) jsonNode; + ArrayNode tools = (ArrayNode) Optional.ofNullable(body.get("result")).map(result -> result.get("tools")) + .filter(JsonNode::isArray).orElse(EMPTY_JSON_ARRAY); + List allowedTools = getAllowedTools(context.getDeployment()); + if (allowedTools.isEmpty()) { + return Future.succeededFuture(jsonNode); + } + for (Iterator iter = tools.iterator(); iter.hasNext();) { + JsonNode tool = iter.next(); + JsonNode name = tool.get("name"); + if (name != null && !allowedTools.contains(name.asText())) { + iter.remove(); + } + } + return Future.succeededFuture(jsonNode); + } + + private List getAllowedTools(Deployment deployment) { + if (deployment instanceof ToolSet toolSet) { + return toolSet.getAllowedTools(); + } else if (deployment instanceof Application application) { + return application.getMcp().getAllowedTools(); + } + throw new IllegalArgumentException("Unsupported deployment type: " + deployment.getName()); + } +} diff --git a/server/src/main/java/com/epam/aidial/core/server/sse/SseEvent.java b/server/src/main/java/com/epam/aidial/core/server/sse/SseEvent.java new file mode 100644 index 000000000..42b6ad556 --- /dev/null +++ b/server/src/main/java/com/epam/aidial/core/server/sse/SseEvent.java @@ -0,0 +1,66 @@ +package com.epam.aidial.core.server.sse; + +import lombok.Data; +import lombok.RequiredArgsConstructor; + +@Data +@RequiredArgsConstructor +public class SseEvent { + private final String event; // type, defaults to "message" + private final String data; // payload + private final String id; // last event id + private final Integer retry; // reconnection delay, optional + + public SseEvent copyWith(String data) { + return new SseEvent(this.event, data, this.id, this.retry); + } + + /** + * Serialize this SSEEvent into text/event-stream format. + * Lines are separated by '\n'. The event is terminated by a blank line. + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + // id field + if (id != null && id.indexOf('\0') == -1) { + sb.append("id: ").append(id).append('\n'); + } + + // event type (omit if "message" to follow default semantics) + if (event != null && !event.isEmpty() && !"message".equals(event)) { + sb.append("event: ").append(event).append('\n'); + } + + // retry field + if (retry != null) { + sb.append("retry: ").append(retry).append('\n'); + } + + // data field(s): split by newline and send one data line per segment + if (data != null) { + int start = 0; + int len = data.length(); + + for (int i = 0; i < len; i++) { + char c = data.charAt(i); + if (c == '\n') { + String line = data.substring(start, i); + sb.append("data: ").append(line).append('\n'); + start = i + 1; + } + } + // remaining part after the last '\n' (or entire string if no '\n') + if (start <= len) { + String line = data.substring(start); + sb.append("data: ").append(line).append('\n'); + } + } + + // end of event: blank line + sb.append('\n'); + + return sb.toString(); + } +} diff --git a/server/src/main/java/com/epam/aidial/core/server/sse/SseEventListener.java b/server/src/main/java/com/epam/aidial/core/server/sse/SseEventListener.java new file mode 100644 index 000000000..bc5658f75 --- /dev/null +++ b/server/src/main/java/com/epam/aidial/core/server/sse/SseEventListener.java @@ -0,0 +1,9 @@ +package com.epam.aidial.core.server.sse; + +public interface SseEventListener { + void onEvent(SseEvent event); + + void onComment(String comment); + + void onComplete(); +} diff --git a/server/src/main/java/com/epam/aidial/core/server/sse/SseParser.java b/server/src/main/java/com/epam/aidial/core/server/sse/SseParser.java new file mode 100644 index 000000000..fc652aa37 --- /dev/null +++ b/server/src/main/java/com/epam/aidial/core/server/sse/SseParser.java @@ -0,0 +1,185 @@ +package com.epam.aidial.core.server.sse; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.vertx.core.buffer.Buffer; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; + +/** + * Server side event parser. + * + *

+ * Note. The class is not thread-safe since the parser processes all chunks sequentially. + *

+ */ +@Slf4j +public class SseParser { + + private static final byte[] BOM = new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}; + + private final SseEventListener listener; + private final ByteBuf chunkBuffer; + + // Current event state + private final StringBuilder dataBuffer = new StringBuilder(); + private String eventType = null; + private String lastEventId = null; + private Integer retry = null; + + private boolean firstChunk = true; + + public SseParser(int initialChunkBufferSize, SseEventListener listener) { + this.listener = listener; + chunkBuffer = ByteBufAllocator.DEFAULT.heapBuffer(initialChunkBufferSize, Integer.MAX_VALUE); + } + + public void parse(Buffer chunk) { + int len = chunk.length(); + try { + for (int i = skipBom(chunk); i < len; i++) { + byte b = chunk.getByte(i); + if (b == '\r') { + if (i + 1 < len && chunk.getByte(i + 1) == '\n') { + i++; + } + processLine(); + } else if (b == '\n') { + processLine(); + } else { + // accumulate buffer + chunkBuffer.writeByte(b); + } + } + } catch (Throwable error) { + log.error("Error occurred at parsing chunk", error); + } + } + + /** + * Call this when the stream is known to be finished/closed. + * It will flush any remaining partial line and event. + */ + public void finish() { + try { + processLine(); + // Emit pending event if any + emitEventIfNeeded(); + listener.onComplete(); + } catch (Throwable error) { + log.error("Error occurred at finishing SSE stream", error); + } + } + + private void processLine() { + String line = bufferToString(); + chunkBuffer.clear(); + + if (line.isEmpty()) { + emitEventIfNeeded(); + resetEvent(); + return; + } + // Comment line starting with ":" + if (line.charAt(0) == ':') { + listener.onComment(line.substring(1)); + // Ignore comment + return; + } + + // Parse "field: value" + String field; + String value; + + int colonIndex = line.indexOf(':'); + if (colonIndex == -1) { + field = line; + value = ""; + } else { + field = line.substring(0, colonIndex); + value = line.substring(colonIndex + 1); + if (value.startsWith(" ")) { + value = value.substring(1); + } + } + + switch (field) { + case "data": + if (!dataBuffer.isEmpty()) { + dataBuffer.append('\n'); + } + dataBuffer.append(value); + break; + case "event": + eventType = value; + break; + case "id": + // Ignore if contains NUL + if (value.indexOf('\0') == -1) { + lastEventId = value; + } + break; + case "retry": + try { + retry = Integer.parseInt(value); + } catch (NumberFormatException ignored) { + // ignore non-integer retry + } + break; + default: + // unknown field: ignore or extend Event if you need to store them + break; + } + } + + private void emitEventIfNeeded() { + // Skip if nothing meaningful + if (dataBuffer.isEmpty() && eventType == null && lastEventId == null && retry == null) { + return; + } + + String type = (eventType != null) ? eventType : "message"; + String data = dataBuffer.toString(); + SseEvent event = new SseEvent(type, data, lastEventId, retry); + try { + listener.onEvent(event); + } catch (Throwable e) { + // ignored + } + } + + /** + * Reset event-scoped fields. + * Note: lastEventId and retry are client state and are kept. + */ + private void resetEvent() { + dataBuffer.setLength(0); + eventType = null; + // lastEventId and retry intentionally persist + } + + + private String bufferToString() { + return StandardCharsets.UTF_8.decode(chunkBuffer.nioBuffer()).toString(); + } + + /** + * BOM + */ + private int skipBom(Buffer chunk) { + if (!firstChunk) { + return 0; + } + firstChunk = false; + if (chunk.length() < BOM.length) { + return 0; + } + for (int i = 0; i < BOM.length; i++) { + if (chunk.getByte(i) != BOM[i]) { + return 0; + } + } + return BOM.length; + } +} diff --git a/server/src/main/java/com/epam/aidial/core/server/util/EventStreamParser.java b/server/src/main/java/com/epam/aidial/core/server/util/EventStreamParser.java deleted file mode 100644 index 565a30444..000000000 --- a/server/src/main/java/com/epam/aidial/core/server/util/EventStreamParser.java +++ /dev/null @@ -1,180 +0,0 @@ -package com.epam.aidial.core.server.util; - -import com.epam.aidial.core.server.function.BaseResponseFunction; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.vertx.core.Future; -import io.vertx.core.buffer.Buffer; -import jodd.io.CharBufferReader; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; - -@Slf4j -public class EventStreamParser { - - private static final byte[] EVENT_TOKEN = "data: ".getBytes(StandardCharsets.UTF_8); - private static final byte[] DONE_TOKEN = "[DONE]".getBytes(StandardCharsets.UTF_8); - private static final byte[] BOM = new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}; - - private static final BaseResponseFunction DEFAULT_HANDLER = new BaseResponseFunction(null, null) { - @Override - public Future apply(ObjectNode jsonNodes) { - return Future.succeededFuture(); - } - }; - - private final ByteBuf buffer; - - private int eventIndex; - - private int chunkIndex; - - private boolean firstChunk = true; - - private boolean lastChunk = false; - - private final BaseResponseFunction handler; - - private Stages stage; - - private List> futures; - - private enum Stages { - EVENT, DATA, EOL - } - - - public EventStreamParser(int initialSizeHint, BaseResponseFunction handler) { - this.handler = handler == null ? DEFAULT_HANDLER : handler; - buffer = ByteBufAllocator.DEFAULT.heapBuffer(initialSizeHint, Integer.MAX_VALUE); - } - - public synchronized Future parse(Buffer chunk) { - if (lastChunk) { - return Future.succeededFuture(true); - } - chunkIndex = 0; - futures = null; - if (firstChunk) { - chunkIndex += skipBom(chunk); - firstChunk = false; - stage = Stages.EVENT; - } - - try { - while (chunkIndex < chunk.length()) { - switch (stage) { - case EVENT -> handleEventStage(chunk); - case DATA -> handleDataStage(chunk); - case EOL -> handleEndOfLineStage(chunk); - default -> throw new IllegalStateException("unknown stage + " + stage); - } - if (lastChunk) { - break; - } - } - } catch (Throwable e) { - log.error("Error occurred at parsing chunk", e); - return Future.failedFuture(e); - } - - if (futures == null) { - return Future.succeededFuture(lastChunk); - } - boolean localLastChunk = lastChunk; - return Future.join(futures).transform(ignore -> Future.succeededFuture(localLastChunk)); - } - - private void handleEventStage(Buffer chunk) { - for (; eventIndex < EVENT_TOKEN.length && chunkIndex < chunk.length(); chunkIndex++, eventIndex++) { - if (EVENT_TOKEN[eventIndex] != chunk.getByte(chunkIndex)) { - throw new IllegalArgumentException("Bad event"); - } - } - - if (eventIndex == EVENT_TOKEN.length) { - eventIndex = 0; - stage = Stages.DATA; - } - } - - private void handleDataStage(Buffer chunk) { - boolean eol = accumulateBuffer(chunk); - if (eol) { - boolean done = isLastMessage(); - if (done) { - lastChunk = true; - return; - } - if (futures == null) { - futures = new ArrayList<>(); - } - try (CharBufferReader reader = toCharBufferReader()) { - ObjectNode tree = (ObjectNode) ProxyUtil.MAPPER.readTree(reader); - Future future = handler.apply(tree) - .onFailure(error -> log.warn("Error occurred at handling json data from chunk", error)); - futures.add(future); - } catch (Throwable e) { - log.warn("Error occurred at parsing json data from chunk", e); - } finally { - buffer.clear(); - stage = Stages.EOL; - } - } - } - - private CharBufferReader toCharBufferReader() { - return new CharBufferReader(StandardCharsets.UTF_8.decode(buffer.nioBuffer())); - } - - private boolean accumulateBuffer(Buffer chunk) { - for (; chunkIndex < chunk.length(); chunkIndex++) { - byte b = chunk.getByte(chunkIndex); - if (b == '\n' || b == '\r') { - return true; - } - buffer.writeByte(b); - } - return false; - } - - private boolean isLastMessage() { - if (buffer.readableBytes() == DONE_TOKEN.length) { - int j = 0; - for (; j < DONE_TOKEN.length; j++) { - if (buffer.getByte(j) != DONE_TOKEN[j]) { - break; - } - } - return j == DONE_TOKEN.length; - } - return false; - } - - private void handleEndOfLineStage(Buffer chunk) { - for (; chunkIndex < chunk.length(); chunkIndex++) { - byte b = chunk.getByte(chunkIndex); - if (b == '\n' || b == '\r') { - continue; - } - stage = Stages.EVENT; - break; - } - } - - private int skipBom(Buffer chunk) { - if (chunk.length() < BOM.length) { - return 0; - } - for (int i = 0; i < BOM.length; i++) { - if (chunk.getByte(i) != BOM[i]) { - return 0; - } - } - return BOM.length; - } -} diff --git a/server/src/main/java/com/epam/aidial/core/server/vertx/stream/BufferingReadStream.java b/server/src/main/java/com/epam/aidial/core/server/vertx/stream/BufferingReadStream.java index fdb7bb4f7..8a75ec400 100644 --- a/server/src/main/java/com/epam/aidial/core/server/vertx/stream/BufferingReadStream.java +++ b/server/src/main/java/com/epam/aidial/core/server/vertx/stream/BufferingReadStream.java @@ -1,7 +1,11 @@ package com.epam.aidial.core.server.vertx.stream; import com.epam.aidial.core.server.function.BaseResponseFunction; -import com.epam.aidial.core.server.util.EventStreamParser; +import com.epam.aidial.core.server.sse.SseEvent; +import com.epam.aidial.core.server.sse.SseEventListener; +import com.epam.aidial.core.server.sse.SseParser; +import com.epam.aidial.core.server.util.ProxyUtil; +import com.fasterxml.jackson.databind.JsonNode; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; @@ -11,8 +15,12 @@ import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.impl.PipeImpl; import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import java.util.Objects; +import javax.annotation.Nullable; + @Slf4j @Getter public class BufferingReadStream implements ReadStream { @@ -27,30 +35,26 @@ public class BufferingReadStream implements ReadStream { private Throwable error; private boolean ended; private boolean reset; - // set the position to unset by default - private int lastChunkPos = -1; - private final EventStreamParser eventStreamParser; - // a chain of futures supplied by eventStreamParser - private Future streamHandlerFuture; + private final SseParser eventStreamParser; + private final BaseEventListener eventListener; // promise on input stream is completed private final Promise endStream; - public BufferingReadStream(ReadStream stream) { - this(stream, 512, null); - } - public BufferingReadStream(ReadStream stream, int initialSize) { this(stream, initialSize, null); } - public BufferingReadStream(ReadStream stream, int initialSize, BaseResponseFunction streamHandler) { + public BufferingReadStream(ReadStream stream, int initialSize, @Nullable BaseEventListener listener) { this.stream = stream; this.content = Buffer.buffer(initialSize); this.endStream = Promise.promise(); - if (streamHandler == null) { + this.eventListener = listener; + if (listener == null) { this.eventStreamParser = null; } else { - this.eventStreamParser = new EventStreamParser(512, streamHandler); + listener.chunkHandler(this::notifyOnChunk); + listener.chunkEndHandler(this::handleEndInternal); + this.eventStreamParser = new SseParser(512, listener); } stream.handler(this::handleChunk); @@ -131,9 +135,12 @@ public synchronized ReadStream endHandler(Handler handler) { return this; } - public synchronized void end(HttpServerResponse response) { - if (lastChunkPos != -1) { - Buffer lastChunk = content.slice(lastChunkPos, content.length()); + public void end(HttpServerResponse response) { + Buffer lastChunk = null; + if (eventListener != null) { + lastChunk = eventListener.lastChunk; + } + if (lastChunk != null) { response.end(lastChunk); } else { response.end(); @@ -145,53 +152,28 @@ public Future endStreamFuture() { } private synchronized void handleChunk(Buffer chunk) { - int pos = content.length(); content.appendBuffer(chunk); - if (lastChunkPos != -1) { - // stop streaming - return; - } if (eventStreamParser != null) { - // build chain of chunk futures: the chunks should be sent in the same order as they arrive - if (streamHandlerFuture == null) { - streamHandlerFuture = parseChunk(chunk, pos); - } else { - streamHandlerFuture = streamHandlerFuture.transform(ignore -> parseChunk(chunk, pos)); - } + eventStreamParser.parse(chunk); } else { notifyOnChunk(chunk); } } - private synchronized Future parseChunk(Buffer chunk, int pos) { - return eventStreamParser.parse(chunk) - .andThen(result -> handleStreamEvent(chunk, result.result() == Boolean.TRUE, pos)); - } - - private synchronized void handleStreamEvent(Buffer chunk, boolean isLastChunk, int pos) { - if (isLastChunk) { - if (lastChunkPos == -1) { - lastChunkPos = pos; - } - // don't send the last chunk - return; - } - notifyOnChunk(chunk); - } - private synchronized void handleEnd(Void ignored) { ended = true; - if (streamHandlerFuture == null) { - endStream.tryComplete(); - notifyOnEnd(ignored); + if (eventListener == null) { + handleEndInternal(ignored); } else { - streamHandlerFuture.onComplete(ignore -> { - endStream.tryComplete(); - notifyOnEnd(ignored); - }); + eventStreamParser.finish(); } } + private void handleEndInternal(Void ignored) { + endStream.tryComplete(); + notifyOnEnd(ignored); + } + private synchronized void handleException(Throwable exception) { error = exception; ended = true; @@ -228,4 +210,110 @@ private void notifyOnException(Throwable throwable) { } } } + + @Slf4j + public static class BaseEventListener implements SseEventListener { + @Nullable + private final BaseResponseFunction function; + // a chain of futures supplied by SSE parser + private Future streamHandlerChain = Future.succeededFuture(); + private Handler chunkHandler; + private Handler chunkEndHandler; + private volatile Buffer lastChunk; + + public BaseEventListener() { + function = null; + } + + public BaseEventListener(BaseResponseFunction function) { + this.function = function; + } + + @Override + public void onEvent(SseEvent event) { + try { + streamHandlerChain = streamHandlerChain.transform(ignore -> handle(event)); + } catch (Throwable e) { + log.error("Error occurred at handling SSE event", e); + } + } + + @Override + public void onComment(String comment) { + Buffer chunk = Buffer.buffer(":" + comment + "\n"); + // we don't enforce sending order for comments + chunkHandler.handle(chunk); + } + + public void chunkHandler(Handler handler) { + this.chunkHandler = Objects.requireNonNull(handler, "Chunk handler must not be null"); + } + + public void chunkEndHandler(Handler handler) { + chunkEndHandler = Objects.requireNonNull(handler, "Chunk end handler must not be null"); + } + + @Override + public void onComplete() { + streamHandlerChain.onComplete(ignored -> chunkEndHandler.handle(null)); + } + + @SneakyThrows + private Future handle(SseEvent event) { + Future result; + if (function == null || skipEvent(event)) { + result = Future.succeededFuture(); + } else { + String data = event.getData(); + try { + JsonNode tree = ProxyUtil.MAPPER.readTree(data); + result = function.apply(tree); + } catch (Throwable error) { + log.warn("Error occurred at JSON data parsing of SSE data and function calling", error); + result = Future.failedFuture(error); + } + } + return result.recover(error -> { + log.warn("Function call is failed with error. Try to recover SSE event", error); + return recover(error, event); + }).map(json -> send(event, json)); + } + + protected Future recover(Throwable error, SseEvent event) { + // default logic for recovering + return Future.succeededFuture(); + } + + private Void send(SseEvent event, @Nullable JsonNode tree) { + if (isLastEvent(event, tree)) { + // we send the last chunk later + lastChunk = to(event, tree); + } else if (lastChunk == null) { + Buffer chunk = to(event, tree); + chunkHandler.handle(chunk); + } + return null; + } + + private static Buffer to(SseEvent originalEvent, @Nullable JsonNode tree) { + String rawEvent; + if (tree == null) { + rawEvent = originalEvent.toString(); + } else { + String json = tree.toString(); + SseEvent event = originalEvent.copyWith(json); + rawEvent = event.toString(); + } + return Buffer.buffer(rawEvent); + } + + protected boolean isLastEvent(SseEvent event, @Nullable JsonNode tree) { + return false; + } + + protected boolean skipEvent(SseEvent event) { + return false; + } + + } } diff --git a/server/src/test/java/com/epam/aidial/core/server/sse/SseParserTest.java b/server/src/test/java/com/epam/aidial/core/server/sse/SseParserTest.java new file mode 100644 index 000000000..6a2f2c3e2 --- /dev/null +++ b/server/src/test/java/com/epam/aidial/core/server/sse/SseParserTest.java @@ -0,0 +1,135 @@ +package com.epam.aidial.core.server.sse; + +import io.vertx.core.buffer.Buffer; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class SseParserTest { + + private static class TestSseEventListener implements SseEventListener { + + StringBuilder response = new StringBuilder(); + + @Override + public void onEvent(SseEvent event) { + response.append(event.toString()); + } + + @Override + public void onComment(String comment) { + response.append(':').append(comment).append('\n'); + } + + @Override + public void onComplete() { + // do nothing + } + } + + @Test + public void testParse_00() { + TestSseEventListener listener = new TestSseEventListener(); + SseParser parser = new SseParser(128, listener); + + parser.parse(Buffer.buffer("data")); + parser.parse(Buffer.buffer(": {\"name\": \"assds")); + parser.parse(Buffer.buffer("dsdsa 你好。답답해 123 ")); + parser.parse(Buffer.buffer("\"}\r")); + parser.parse(Buffer.buffer("\n\rda")); + parser.parse(Buffer.buffer("ta: {\"value\": 56, \"text\": \"[DONE]\"}\n\r")); + parser.parse(Buffer.buffer("data: [DONE]\r\r")); + parser.finish(); + + String response = """ + data: {"name": "assdsdsdsa 你好。답답해 123 "} + + data: {"value": 56, "text": "[DONE]"} + + data: [DONE] + + """; + assertEquals(response, listener.response.toString()); + } + + @Test + public void testParse_01() { + TestSseEventListener listener = new TestSseEventListener(); + SseParser parser = new SseParser(128, listener); + + parser.parse(Buffer.buffer("data")); + parser.parse(Buffer.buffer(": {\"name\": \"assds")); + parser.parse(Buffer.buffer("dsdsa 你好。답답해 123 ")); + parser.parse(Buffer.buffer("\"}\r")); + parser.parse(Buffer.buffer("\n\nda")); + parser.parse(Buffer.buffer("ta: {\"value\": 56, \"text\": \"[DONE]\"}\n\r")); + parser.parse(Buffer.buffer("data: [D")); + parser.parse(Buffer.buffer("ONE]")); + parser.finish(); + + String response = """ + data: {"name": "assdsdsdsa 你好。답답해 123 "} + + data: {"value": 56, "text": "[DONE]"} + + data: [DONE] + + """; + assertEquals(response, listener.response.toString()); + } + + @Test + public void testParse_02() { + TestSseEventListener listener = new TestSseEventListener(); + SseParser parser = new SseParser(128, listener); + + //BOM + parser.parse(Buffer.buffer(new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF})); + + parser.parse(Buffer.buffer("data")); + parser.parse(Buffer.buffer(": {\"name\": \"assds")); + parser.parse(Buffer.buffer("dsdsa 你好。답답해 123 ")); + parser.parse(Buffer.buffer("\"}\r")); + parser.parse(Buffer.buffer("\n\nda")); + parser.parse(Buffer.buffer("ta: {\"value\": 56, \"text\": \"[DONE]\"}\n\r")); + parser.parse(Buffer.buffer("data: [D")); + parser.parse(Buffer.buffer("ONE]")); + parser.finish(); + + String response = """ + data: {"name": "assdsdsdsa 你好。답답해 123 "} + + data: {"value": 56, "text": "[DONE]"} + + data: [DONE] + + """; + assertEquals(response, listener.response.toString()); + } + + @Test + public void testParse_03() { + TestSseEventListener listener = new TestSseEventListener(); + SseParser parser = new SseParser(128, listener); + + parser.parse(Buffer.buffer("data")); + parser.parse(Buffer.buffer(": hi\r")); + parser.parse(Buffer.buffer("event: greeting")); + parser.parse(Buffer.buffer("\r")); + parser.parse(Buffer.buffer("\nda")); + parser.parse(Buffer.buffer("ta: 1 + 1 = ?\r")); + parser.parse(Buffer.buffer("event: ques")); + parser.parse(Buffer.buffer("tion\n\r")); + parser.finish(); + + String response = """ + event: greeting + data: hi + + event: question + data: 1 + 1 = ? + + """; + assertEquals(response, listener.response.toString()); + } +} diff --git a/server/src/test/java/com/epam/aidial/core/server/util/EventStreamParserTest.java b/server/src/test/java/com/epam/aidial/core/server/util/EventStreamParserTest.java deleted file mode 100644 index 7e591fe1b..000000000 --- a/server/src/test/java/com/epam/aidial/core/server/util/EventStreamParserTest.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.epam.aidial.core.server.util; - -import com.epam.aidial.core.server.function.BaseResponseFunction; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.vertx.core.Future; -import io.vertx.core.buffer.Buffer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class EventStreamParserTest { - - private final List responses = new ArrayList<>(); - - private final BaseResponseFunction fn = new BaseResponseFunction(null, null) { - @Override - public Future apply(ObjectNode json) { - responses.add(json.toString()); - return Future.succeededFuture(); - } - }; - - @BeforeEach - public void beforeEach() { - responses.clear(); - } - - @Test - public void testHandleChunk_00() { - EventStreamParser parser = new EventStreamParser(20, fn); - parser.parse(Buffer.buffer("data")); - parser.parse(Buffer.buffer(": {\"name\": \"assds")); - parser.parse(Buffer.buffer("dsdsa 你好。답답해 123 ")); - parser.parse(Buffer.buffer("\"}\r")); - parser.parse(Buffer.buffer("\nda")); - parser.parse(Buffer.buffer("ta: {\"value\": 56, \"text\": \"[DONE]\"}\n")); - parser.parse(Buffer.buffer("data: [DONE]\r")); - parser.parse(Buffer.buffer("\ndata: ops one more data\n")); - assertEquals(List.of("{\"name\":\"assdsdsdsa 你好。답답해 123 \"}", "{\"value\":56,\"text\":\"[DONE]\"}"), responses); - } - - @Test - public void testHandleChunk_01() { - EventStreamParser parser = new EventStreamParser(100, fn); - parser.parse(Buffer.buffer("data")); - parser.parse(Buffer.buffer(": {\"name\": \"assds")); - parser.parse(Buffer.buffer("dsdsa 你好。답답해 123 ")); - parser.parse(Buffer.buffer("\"}\r")); - parser.parse(Buffer.buffer("\nda")); - parser.parse(Buffer.buffer("ta: {\"value\": 56, \"text\": \"[DONE]\"}\n")); - parser.parse(Buffer.buffer("data: [D")); - parser.parse(Buffer.buffer("ONE]\r")); - parser.parse(Buffer.buffer("\ndata: ops one more data\n")); - assertEquals(List.of("{\"name\":\"assdsdsdsa 你好。답답해 123 \"}", "{\"value\":56,\"text\":\"[DONE]\"}"), responses); - } - - @Test - public void testHandleChunkWithBom() { - EventStreamParser parser = new EventStreamParser(100, fn); - parser.parse(Buffer.buffer(new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}).appendBuffer(Buffer.buffer("data"))); - parser.parse(Buffer.buffer(": {\"name\": \"assds")); - parser.parse(Buffer.buffer("dsdsa 你好。답답해 123 ")); - parser.parse(Buffer.buffer("\"}\r")); - parser.parse(Buffer.buffer("\nda")); - parser.parse(Buffer.buffer("ta: {\"value\": 56, \"text\": \"[DONE]\"}\n")); - parser.parse(Buffer.buffer("data: [DONE]\r")); - parser.parse(Buffer.buffer("\ndata: ops one more data\n")); - assertEquals(List.of("{\"name\":\"assdsdsdsa 你好。답답해 123 \"}", "{\"value\":56,\"text\":\"[DONE]\"}"), responses); - } -}