From 6a5eb4366f5b6b6abbf55511ddc09d289d66ca63 Mon Sep 17 00:00:00 2001 From: ewong5 <151794509+ewong5@users.noreply.github.com> Date: Fri, 15 Aug 2025 13:11:34 +1000 Subject: [PATCH 1/2] Improve logging with sending requests and fixed the hookOnSubscribe for the AggregateSubscriber to request unbounded items --- .../HttpClientStreamableHttpTransport.java | 33 ++++++++++++------- .../client/transport/ResponseSubscribers.java | 21 ++++++++++-- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 93c28422a..133d8edc6 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -422,17 +422,23 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody)); }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> { - // Create the async request with proper body subscriber selection - Mono.fromFuture(this.httpClient - .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink)) - .whenComplete((response, throwable) -> { - if (throwable != null) { - responseEventSink.error(throwable); - } - else { - logger.debug("SSE connection established successfully"); - } - })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); + // Create the async request with proper error handling and timeout + // The key insight: the response body is consumed by the BodySubscriber + // and flows through responseEventSink + // The CompletableFuture> completes when headers are + // received, not when body is consumed + Mono.fromFuture(() -> this.httpClient.sendAsync(requestBuilder.build(), + this.toSendMessageBodySubscriber(responseEventSink))) + .doOnSuccess(response -> { + logger.debug("Success: " + response.statusCode()); + }) + .doOnError(throwable -> { + logger.error("HTTP request failed with message {}", throwable.getMessage(), throwable); + // Ensure the sink gets the error if it hasn't been completed yet + responseEventSink.error(throwable); + }) + .onErrorMap(CompletionException.class, Throwable::getCause) + .subscribe(); })).flatMap(responseEvent -> { if (transportSession.markInitialized( @@ -500,6 +506,11 @@ else if (contentType.contains(APPLICATION_JSON)) { return Mono.empty(); } + if (!Utils.hasText(data)) { + deliveredSink.success(); + return Mono.empty(); + } + try { return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data)); } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java index 296d1a17d..5fc84f85f 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java @@ -187,7 +187,7 @@ else if (line.startsWith(":")) { @Override protected void hookOnComplete() { - if (this.eventBuilder.length() > 0) { + if (!this.eventBuilder.isEmpty()) { String eventData = this.eventBuilder.toString(); SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim()); this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); @@ -233,9 +233,24 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink si @Override protected void hookOnSubscribe(Subscription subscription) { - sink.onRequest(subscription::request); + var contentLength = responseInfo.headers().firstValue("Content-Length"); + var useUnbounded = false; + if (contentLength.isPresent()) { + useUnbounded = Long.parseLong(contentLength.get()) > 0; + } + if (useUnbounded) { + sink.onRequest(n -> { + // Don't forward downstream requests directly - we manage our own + // requests + }); + + // Request unbounded items to consume the entire response body + subscription.request(Long.MAX_VALUE); + } + else { + sink.onRequest(subscription::request); + } - // Register disposal callback to cancel subscription when Flux is disposed sink.onDispose(subscription::cancel); } From fb107f6228684ffa535ea43ab6a56e2bac6d4c21 Mon Sep 17 00:00:00 2001 From: ewong5 <151794509+ewong5@users.noreply.github.com> Date: Fri, 15 Aug 2025 13:24:13 +1000 Subject: [PATCH 2/2] Remove unnecessary comments --- .../client/transport/HttpClientStreamableHttpTransport.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 133d8edc6..6308eb6d7 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -423,10 +423,6 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> { // Create the async request with proper error handling and timeout - // The key insight: the response body is consumed by the BodySubscriber - // and flows through responseEventSink - // The CompletableFuture> completes when headers are - // received, not when body is consumed Mono.fromFuture(() -> this.httpClient.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink))) .doOnSuccess(response -> {