From 9dbb89982816706c5710782ff08ffc7ec4b05caa Mon Sep 17 00:00:00 2001
From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com>
Date: Wed, 21 Jan 2026 09:17:43 -0800
Subject: [PATCH 1/2] feat: Add support for accessing response headers.
---
build.gradle.kts | 2 +-
.../eventsource/ConnectStrategy.java | 33 ++-
.../launchdarkly/eventsource/EventParser.java | 12 +-
.../launchdarkly/eventsource/EventSource.java | 22 +-
.../launchdarkly/eventsource/FaultEvent.java | 32 ++-
.../eventsource/HttpConnectStrategy.java | 15 +-
.../eventsource/MessageEvent.java | 81 +++++++-
.../eventsource/ResponseHeaders.java | 115 +++++++++++
.../eventsource/ResponseHeadersImpl.java | 85 ++++++++
.../eventsource/StreamHttpErrorException.java | 23 +++
.../eventsource/EventParserBaseTest.java | 3 +-
.../eventsource/HttpConnectStrategyTest.java | 161 +++++++++++++++
...ttpConnectStrategyWithEventSourceTest.java | 191 ++++++++++++++++++
13 files changed, 744 insertions(+), 31 deletions(-)
create mode 100644 src/main/java/com/launchdarkly/eventsource/ResponseHeaders.java
create mode 100644 src/main/java/com/launchdarkly/eventsource/ResponseHeadersImpl.java
diff --git a/build.gradle.kts b/build.gradle.kts
index 2bbe220..4c5f75f 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -69,7 +69,7 @@ dependencies {
api("com.launchdarkly:launchdarkly-logging:${Versions.launchdarklyLogging}")
api("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
testImplementation("org.mockito:mockito-core:1.10.19")
- testImplementation("com.launchdarkly:test-helpers:2.0.1")
+ testImplementation("com.launchdarkly:test-helpers:2.1.0")
testImplementation("com.google.guava:guava:32.0.1-jre")
testImplementation("junit:junit:4.12")
testImplementation("org.hamcrest:hamcrest-all:1.3")
diff --git a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java
index 31b3239..fe7f30c 100644
--- a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java
+++ b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java
@@ -65,18 +65,34 @@ public static class Result {
private final InputStream inputStream;
private final URI origin;
private final Closeable closer;
-
+ private final ResponseHeaders headers;
+
/**
* Creates an instance.
- *
+ *
* @param inputStream see {@link #getInputStream()}
* @param origin see {@link #getOrigin()}
* @param closer see {@link #getCloser()}
*/
public Result(InputStream inputStream, URI origin, Closeable closer) {
+ this(inputStream, origin, closer, null);
+ }
+
+ /**
+ * Creates an instance with response headers.
+ *
+ * @param inputStream see {@link #getInputStream()}
+ * @param origin see {@link #getOrigin()}
+ * @param closer see {@link #getCloser()}
+ * @param headers see {@link #getHeaders()}
+ *
+ * @since 4.2.0
+ */
+ public Result(InputStream inputStream, URI origin, Closeable closer, ResponseHeaders headers) {
this.inputStream = inputStream;
this.origin = origin;
this.closer = closer;
+ this.headers = headers;
}
/**
@@ -118,6 +134,19 @@ public URI getOrigin() {
public Closeable getCloser() {
return closer;
}
+
+ /**
+ * Returns the response headers from the connection, or null if not available.
+ *
+ * For HTTP connections, this contains the HTTP response headers. For other
+ * connection types, this may be null or may contain connection-specific metadata.
+ *
+ * @return the response headers, or null if not available
+ * @since 4.2.0
+ */
+ public ResponseHeaders getHeaders() {
+ return headers;
+ }
}
/**
diff --git a/src/main/java/com/launchdarkly/eventsource/EventParser.java b/src/main/java/com/launchdarkly/eventsource/EventParser.java
index 02bdffe..85d9473 100644
--- a/src/main/java/com/launchdarkly/eventsource/EventParser.java
+++ b/src/main/java/com/launchdarkly/eventsource/EventParser.java
@@ -50,6 +50,7 @@ final class EventParser {
private Set expectFields;
private final LDLogger logger;
private final URI origin;
+ private final ResponseHeaders headers;
private BufferedLineParser lineParser;
private byte[] chunkData;
@@ -77,14 +78,16 @@ final class EventParser {
int readBufferSize,
boolean streamEventData,
Set expectFields,
- LDLogger logger
+ LDLogger logger,
+ ResponseHeaders headers
) {
this.lineParser = new BufferedLineParser(inputStream, readBufferSize);
this.origin = origin;
this.streamEventData = streamEventData;
this.expectFields = expectFields;
this.logger = logger;
-
+ this.headers = headers;
+
dataBuffer = new ByteArrayOutputStream(VALUE_BUFFER_INITIAL_CAPACITY);
}
@@ -159,7 +162,7 @@ private StreamEvent tryNextEvent() throws StreamException {
}
String dataString = utf8ByteArrayOutputStreamToString(dataBuffer);
- MessageEvent message = new MessageEvent(eventName, dataString, lastEventId, origin);
+ MessageEvent message = new MessageEvent(eventName, dataString, lastEventId, origin, headers);
resetState();
logger.debug("Received message: {}", message);
return message;
@@ -193,7 +196,8 @@ private StreamEvent tryNextEvent() throws StreamException {
eventName,
new InputStreamReader(messageDataStream),
lastEventId,
- origin
+ origin,
+ headers
);
logger.debug("Received message: {}", message);
return message;
diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java
index cf276d9..df69986 100644
--- a/src/main/java/com/launchdarkly/eventsource/EventSource.java
+++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java
@@ -301,14 +301,19 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException
if (applyErrorStrategy(exception) == ErrorStrategy.Action.CONTINUE) {
// The ErrorStrategy told us to CONTINUE rather than throwing an exception.
if (canReturnFaultEvent) {
- return new FaultEvent(exception);
+ // Extract headers from exception if available
+ ResponseHeaders headers = null;
+ if (exception instanceof StreamHttpErrorException) {
+ headers = ((StreamHttpErrorException) exception).getHeaders();
+ }
+ return new FaultEvent(exception, headers);
}
// If canReturnFaultEvent is false, it means the caller explicitly called start(),
// in which case there's no way to return a FaultEvent so we just keep retrying
// transparently.
continue;
}
- // The ErrorStrategy told us to THROW rather than CONTINUE.
+ // The ErrorStrategy told us to THROW rather than CONTINUE.
throw exception;
}
@@ -317,14 +322,16 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException
origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin();
connectedTime = System.currentTimeMillis();
logger.debug("Connected to SSE stream");
-
+
+ ResponseHeaders headers = clientResult.getHeaders();
eventParser = new EventParser(
clientResult.getInputStream(),
clientResult.getOrigin(),
readBufferSize,
streamEventData,
expectFields,
- logger
+ logger,
+ headers
);
readyState.set(ReadyState.OPEN);
@@ -624,7 +631,12 @@ private StreamEvent requireEvent() throws StreamException {
eventParser = null;
computeReconnectDelay();
if (applyErrorStrategy(e) == ErrorStrategy.Action.CONTINUE) {
- return new FaultEvent(e);
+ // Extract headers from exception if available
+ ResponseHeaders headers = null;
+ if (e instanceof StreamHttpErrorException) {
+ headers = ((StreamHttpErrorException) e).getHeaders();
+ }
+ return new FaultEvent(e, headers);
}
throw e;
}
diff --git a/src/main/java/com/launchdarkly/eventsource/FaultEvent.java b/src/main/java/com/launchdarkly/eventsource/FaultEvent.java
index e48c690..c7e098e 100644
--- a/src/main/java/com/launchdarkly/eventsource/FaultEvent.java
+++ b/src/main/java/com/launchdarkly/eventsource/FaultEvent.java
@@ -20,16 +20,30 @@
*/
public final class FaultEvent implements StreamEvent {
private final StreamException cause;
-
+ private final ResponseHeaders headers;
+
/**
* Creates an instance.
*
* @param cause the cause of the failure
*/
public FaultEvent(StreamException cause) {
+ this(cause, null);
+ }
+
+ /**
+ * Creates an instance with response headers.
+ *
+ * @param cause the cause of the failure
+ * @param headers the response headers if available (typically for HTTP errors), or null
+ *
+ * @since 4.2.0
+ */
+ public FaultEvent(StreamException cause, ResponseHeaders headers) {
this.cause = cause;
+ this.headers = headers;
}
-
+
/**
* Returns a {@link StreamException} describing the cause of the failure.
*
@@ -38,6 +52,20 @@ public FaultEvent(StreamException cause) {
public StreamException getCause() {
return cause;
}
+
+ /**
+ * Returns the response headers if available, or null otherwise.
+ *
+ * For HTTP error responses (when the cause is a {@link StreamHttpErrorException}),
+ * this typically contains the HTTP response headers from the failed request.
+ * For other types of failures (I/O errors, timeouts, etc.), this may be null.
+ *
+ * @return the response headers, or null if not available
+ * @since 4.2.0
+ */
+ public ResponseHeaders getHeaders() {
+ return headers;
+ }
@Override
public boolean equals(Object o) {
diff --git a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java
index 0d535a4..fcd9bfe 100644
--- a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java
+++ b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java
@@ -443,10 +443,10 @@ class Client extends ConnectStrategy.Client { // package-private visibility for
@Override
public Result connect(String lastEventId) throws StreamException {
logger.debug("Attempting to connect to SSE stream at {}", uri);
-
+
Request request = createRequest(lastEventId);
Call call = httpClient.newCall(request);
-
+
Response response;
try {
response = call.execute();
@@ -454,18 +454,21 @@ public Result connect(String lastEventId) throws StreamException {
logger.info("Connection failed: {}", LogValues.exceptionSummary(e));
throw new StreamIOException(e);
}
-
+
if (!response.isSuccessful()) {
+ ResponseHeaders headers = ResponseHeadersImpl.fromOkHttpHeaders(response.headers());
response.close();
logger.info("Server returned HTTP error {}", response.code());
- throw new StreamHttpErrorException(response.code());
+ throw new StreamHttpErrorException(response.code(), headers);
}
-
+
+ ResponseHeaders headers = ResponseHeadersImpl.fromOkHttpHeaders(response.headers());
ResponseBody responseBody = response.body();
return new Result(
responseBody.byteStream(),
uri,
- new RequestCloser(call)
+ new RequestCloser(call),
+ headers
);
}
diff --git a/src/main/java/com/launchdarkly/eventsource/MessageEvent.java b/src/main/java/com/launchdarkly/eventsource/MessageEvent.java
index 819aed0..0f9edb8 100644
--- a/src/main/java/com/launchdarkly/eventsource/MessageEvent.java
+++ b/src/main/java/com/launchdarkly/eventsource/MessageEvent.java
@@ -32,50 +32,69 @@ public class MessageEvent implements StreamEvent {
private final String eventName;
private final String lastEventId;
private final URI origin;
+ private final ResponseHeaders headers;
/**
* Simple constructor with event data only, using the default event name.
*
* This constructor assumes that the event data has been fully read into memory as a String.
- *
+ *
* @param data the event data; if null, will be changed to an empty string
*/
public MessageEvent(String data) {
- this(null, data, null, null);
+ this(null, data, null, null, null);
}
/**
* Constructs a new instance with the default event name.
*
* This constructor assumes that the event data has been fully read into memory as a String.
- *
+ *
* @param data the event data; if null, will be changed to an empty string
* @param lastEventId the event ID, or null if none
* @param origin the stream endpoint
*/
public MessageEvent(String data, String lastEventId, URI origin) {
- this(null, data, lastEventId, origin);
+ this(null, data, lastEventId, origin, null);
}
/**
* Constructs a new instance.
*
* This constructor assumes that the event data has been fully read into memory as a String.
- *
+ *
* @param eventName the event name; if null, {@link #DEFAULT_EVENT_NAME} is used
* @param data the event data; if null, will be changed to an empty string
* @param lastEventId the event ID, or null if none
* @param origin the stream endpoint
- *
+ *
* @since 2.6.0
*/
public MessageEvent(String eventName, String data, String lastEventId, URI origin) {
+ this(eventName, data, lastEventId, origin, null);
+ }
+
+ /**
+ * Constructs a new instance with response headers.
+ *
+ * This constructor assumes that the event data has been fully read into memory as a String.
+ *
+ * @param eventName the event name; if null, {@link #DEFAULT_EVENT_NAME} is used
+ * @param data the event data; if null, will be changed to an empty string
+ * @param lastEventId the event ID, or null if none
+ * @param origin the stream endpoint
+ * @param headers the response headers from the current connection, or null if not available
+ *
+ * @since 4.2.0
+ */
+ public MessageEvent(String eventName, String data, String lastEventId, URI origin, ResponseHeaders headers) {
this.eventName = eventName == null ? DEFAULT_EVENT_NAME : eventName;
this.data = data == null ? "" : data;
this.dataReader = null;
this.dataReaderLock = new Object();
this.lastEventId = lastEventId;
this.origin = origin;
+ this.headers = headers;
}
/**
@@ -85,12 +104,12 @@ public MessageEvent(String eventName, String data, String lastEventId, URI origi
* optimization that sometimes allows events to be processed without large buffers. The caller
* must be careful about using this model because the behavior of the reader is not idempotent;
* see {@link #getDataReader()}.
- *
+ *
* @param eventName an object that will provide the event name if requested
* @param dataReader a {@link Reader} for consuming the event data
* @param lastEventId an object that will provide the last event ID if requested
* @param origin the stream endpoint
- *
+ *
* @see #getDataReader()
* @since 2.6.0
*/
@@ -100,22 +119,50 @@ public MessageEvent(
String lastEventId,
URI origin
) {
+ this(eventName, dataReader, lastEventId, origin, null);
+ }
+
+ /**
+ * Constructs a new instance with lazy-loading behavior and response headers.
+ *
+ * This constructor takes a {@link Reader} instead of a String for the event data. This is an
+ * optimization that sometimes allows events to be processed without large buffers. The caller
+ * must be careful about using this model because the behavior of the reader is not idempotent;
+ * see {@link #getDataReader()}.
+ *
+ * @param eventName an object that will provide the event name if requested
+ * @param dataReader a {@link Reader} for consuming the event data
+ * @param lastEventId an object that will provide the last event ID if requested
+ * @param origin the stream endpoint
+ * @param headers the response headers from the current connection, or null if not available
+ *
+ * @see #getDataReader()
+ * @since 4.2.0
+ */
+ public MessageEvent(
+ String eventName,
+ Reader dataReader,
+ String lastEventId,
+ URI origin,
+ ResponseHeaders headers
+ ) {
this.data = null;
this.dataReader = dataReader;
this.dataReaderLock = new Object();
this.eventName = eventName == null ? DEFAULT_EVENT_NAME : eventName;
this.lastEventId = lastEventId;
this.origin = origin;
+ this.headers = headers;
}
/**
* Constructs a new instance.
- *
+ *
* @param eventName the event name
* @param data the event data, if any
*/
public MessageEvent(String eventName, String data) {
- this(eventName, data, null, null);
+ this(eventName, data, null, null, null);
}
/**
@@ -230,6 +277,20 @@ public URI getOrigin() {
return origin;
}
+ /**
+ * Returns the response headers from the current connection, or null if not available.
+ *
+ * For HTTP connections, this contains the HTTP response headers that were received when
+ * the connection was established. These headers remain the same for all events received
+ * on this connection.
+ *
+ * @return the response headers, or null if not available
+ * @since 4.2.0
+ */
+ public ResponseHeaders getHeaders() {
+ return headers;
+ }
+
/**
* Returns {@code true} if this event was dispatched with streaming data behavior rather than pre-read data.
*
diff --git a/src/main/java/com/launchdarkly/eventsource/ResponseHeaders.java b/src/main/java/com/launchdarkly/eventsource/ResponseHeaders.java
new file mode 100644
index 0000000..c310f71
--- /dev/null
+++ b/src/main/java/com/launchdarkly/eventsource/ResponseHeaders.java
@@ -0,0 +1,115 @@
+package com.launchdarkly.eventsource;
+
+/**
+ * This interface provides access to HTTP response headers in a way that is independent
+ * of any specific HTTP client implementation. Headers are represented as an ordered list
+ * of name-value pairs, preserving the original structure including duplicate header names.
+ *
+ * Implementations of this interface should be immutable and thread-safe.
+ *
+ * @since 4.2.0
+ */
+public interface ResponseHeaders {
+ /**
+ * Represents a single HTTP header as a name-value pair.
+ *
+ * Header names are case-insensitive according to the HTTP specification, but the
+ * original case is preserved in this representation.
+ *
+ * @since 4.2.0
+ */
+ public static final class Header {
+ private final String name;
+ private final String value;
+
+ /**
+ * Creates a header with the given name and value.
+ *
+ * @param name the header name
+ * @param value the header value
+ */
+ public Header(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ /**
+ * Returns the header name.
+ *
+ * @return the header name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the header value.
+ *
+ * @return the header value
+ */
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return name + ": " + value;
+ }
+ }
+
+ /**
+ * Returns the number of headers in this collection.
+ *
+ * Note that if a header name appears multiple times (e.g., multiple {@code Set-Cookie}
+ * headers), each appearance is counted separately.
+ *
+ * @return the number of headers
+ */
+ int size();
+
+ /**
+ * Returns the header at the specified index.
+ *
+ * Headers are indexed in the order they appeared in the HTTP response.
+ *
+ * @param index the index of the header to retrieve (0-based)
+ * @return the header at the specified index
+ * @throws IndexOutOfBoundsException if the index is out of range
+ */
+ Header get(int index);
+
+ /**
+ * Returns the value of the first header with the specified name (case-insensitive),
+ * or null if no such header exists.
+ *
+ * This is a convenience method for headers where you expect a single value. If the
+ * header appears multiple times in the response, only the first occurrence is returned.
+ *
+ * Note that the returned value may contain commas if:
+ *
+ * - The original HTTP response had a comma in the header value, or
+ * - The HTTP client combined multiple header lines into one (though this is
+ * uncommon with the underlying OkHttp implementation)
+ *
+ *
+ * For headers that can legitimately appear multiple times (like {@code Set-Cookie}),
+ * use {@link #size()} and {@link #get(int)} to iterate through all occurrences instead.
+ *
+ * Example usage:
+ *
+ * String contentType = headers.value("Content-Type");
+ * String retryAfter = headers.value("Retry-After");
+ *
+ *
+ * @param name the header name (case-insensitive)
+ * @return the value of the first matching header, or null if not found
+ */
+ String value(String name);
+
+ /**
+ * Returns true if this collection contains no headers.
+ *
+ * @return true if there are no headers
+ */
+ boolean isEmpty();
+}
diff --git a/src/main/java/com/launchdarkly/eventsource/ResponseHeadersImpl.java b/src/main/java/com/launchdarkly/eventsource/ResponseHeadersImpl.java
new file mode 100644
index 0000000..f89fcb7
--- /dev/null
+++ b/src/main/java/com/launchdarkly/eventsource/ResponseHeadersImpl.java
@@ -0,0 +1,85 @@
+package com.launchdarkly.eventsource;
+
+import okhttp3.Headers;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Package-private implementation of ResponseHeaders.
+ *
+ * This class is immutable and thread-safe.
+ */
+final class ResponseHeadersImpl implements ResponseHeaders {
+ private static final ResponseHeaders EMPTY = new ResponseHeadersImpl(Collections.emptyList());
+
+ private final List headers;
+
+ private ResponseHeadersImpl(List headers) {
+ this.headers = headers;
+ }
+
+ /**
+ * Creates a ResponseHeaders instance from OkHttp Headers.
+ *
+ * This preserves all headers in the order they appear, including duplicate header names.
+ *
+ * @param headers the OkHttp headers
+ * @return a ResponseHeaders instance
+ */
+ static ResponseHeaders fromOkHttpHeaders(Headers headers) {
+ if (headers == null || headers.size() == 0) {
+ return EMPTY;
+ }
+
+ List headerList = new ArrayList<>(headers.size());
+ for (int i = 0; i < headers.size(); i++) {
+ headerList.add(new Header(headers.name(i), headers.value(i)));
+ }
+
+ return new ResponseHeadersImpl(Collections.unmodifiableList(headerList));
+ }
+
+ /**
+ * Returns an empty ResponseHeaders instance.
+ *
+ * @return an empty ResponseHeaders
+ */
+ static ResponseHeaders empty() {
+ return EMPTY;
+ }
+
+ @Override
+ public int size() {
+ return headers.size();
+ }
+
+ @Override
+ public Header get(int index) {
+ return headers.get(index);
+ }
+
+ @Override
+ public String value(String name) {
+ if (name == null) {
+ return null;
+ }
+ for (Header header : headers) {
+ if (header.getName().equalsIgnoreCase(name)) {
+ return header.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return headers.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return "ResponseHeaders(" + headers.size() + " headers)";
+ }
+}
diff --git a/src/main/java/com/launchdarkly/eventsource/StreamHttpErrorException.java b/src/main/java/com/launchdarkly/eventsource/StreamHttpErrorException.java
index c4bc6ad..997d298 100644
--- a/src/main/java/com/launchdarkly/eventsource/StreamHttpErrorException.java
+++ b/src/main/java/com/launchdarkly/eventsource/StreamHttpErrorException.java
@@ -13,14 +13,27 @@
public class StreamHttpErrorException extends StreamException {
private final int code;
+ private final ResponseHeaders headers;
/**
* Constructs an instance.
* @param code the HTTP status
*/
public StreamHttpErrorException(int code) {
+ this(code, null);
+ }
+
+ /**
+ * Constructs an instance with response headers.
+ * @param code the HTTP status
+ * @param headers the response headers, or null if not available
+ *
+ * @since 4.2.0
+ */
+ public StreamHttpErrorException(int code, ResponseHeaders headers) {
super("Server returned HTTP error " + code);
this.code = code;
+ this.headers = headers;
}
/**
@@ -30,4 +43,14 @@ public StreamHttpErrorException(int code) {
public int getCode() {
return code;
}
+
+ /**
+ * Returns the response headers from the failed HTTP request, or null if not available.
+ *
+ * @return the response headers, or null
+ * @since 4.2.0
+ */
+ public ResponseHeaders getHeaders() {
+ return headers;
+ }
}
diff --git a/src/test/java/com/launchdarkly/eventsource/EventParserBaseTest.java b/src/test/java/com/launchdarkly/eventsource/EventParserBaseTest.java
index 1db9bf1..ddd73aa 100644
--- a/src/test/java/com/launchdarkly/eventsource/EventParserBaseTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/EventParserBaseTest.java
@@ -46,7 +46,8 @@ protected void initParser(int bufferSize, boolean streamEventData, String... exp
bufferSize,
streamEventData,
expectedFields,
- testLoggerRule.getLogger()
+ testLoggerRule.getLogger(),
+ null
);
}
diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
index c2a8e9e..ae87ac9 100644
--- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
@@ -268,6 +268,167 @@ public void requestFailsWithHttpError() throws Exception {
}
}
+ @Test
+ public void responseHeadersAreAvailableOnSuccess() throws Exception {
+ Handler response = Handlers.all(
+ Handlers.header("X-Custom-Header", "custom-value"),
+ Handlers.header("X-Request-Id", "12345"),
+ Handlers.startChunks("text/plain", Charset.forName("UTF-8")),
+ Handlers.writeChunkString("hello"),
+ Handlers.hang()
+ );
+ try (HttpServer server = HttpServer.start(response)) {
+ try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri())
+ .createClient(testLogger.getLogger())) {
+ ConnectStrategy.Client.Result result = client.connect(null);
+
+ ResponseHeaders headers = result.getHeaders();
+ assertThat(headers, Matchers.notNullValue());
+ assertThat(headers.size(), Matchers.greaterThan(0));
+
+ // Find and verify our custom headers
+ boolean foundCustomHeader = false;
+ boolean foundRequestId = false;
+ boolean foundContentType = false;
+
+ for (int i = 0; i < headers.size(); i++) {
+ ResponseHeaders.Header header = headers.get(i);
+ if (header.getName().equalsIgnoreCase("X-Custom-Header")) {
+ assertThat(header.getValue(), equalTo("custom-value"));
+ foundCustomHeader = true;
+ } else if (header.getName().equalsIgnoreCase("X-Request-Id")) {
+ assertThat(header.getValue(), equalTo("12345"));
+ foundRequestId = true;
+ } else if (header.getName().equalsIgnoreCase("Content-Type")) {
+ assertThat(header.getValue(), equalTo("text/plain;charset=utf-8"));
+ foundContentType = true;
+ }
+ }
+
+ assertThat("Should find X-Custom-Header", foundCustomHeader, equalTo(true));
+ assertThat("Should find X-Request-Id", foundRequestId, equalTo(true));
+ assertThat("Should find Content-Type", foundContentType, equalTo(true));
+
+ result.getCloser().close();
+ }
+ }
+ }
+
+ @Test
+ public void responseHeadersAreAvailableOnHttpError() throws Exception {
+ Handler errorResponse = Handlers.all(
+ Handlers.header("X-Error-Code", "RATE_LIMIT_EXCEEDED"),
+ Handlers.header("Retry-After", "60"),
+ Handlers.header("X-RateLimit-Remaining", "0"),
+ Handlers.status(429)
+ );
+ try (HttpServer server = HttpServer.start(errorResponse)) {
+ try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri())
+ .createClient(testLogger.getLogger())) {
+ try {
+ client.connect(null);
+ fail("expected exception");
+ } catch (StreamHttpErrorException e) {
+ assertThat(e.getCode(), equalTo(429));
+
+ ResponseHeaders headers = e.getHeaders();
+ assertThat(headers, Matchers.notNullValue());
+
+ // Find and verify our custom error headers
+ boolean foundErrorCode = false;
+ boolean foundRetryAfter = false;
+ boolean foundRateLimitRemaining = false;
+
+ for (int i = 0; i < headers.size(); i++) {
+ ResponseHeaders.Header header = headers.get(i);
+ if (header.getName().equalsIgnoreCase("X-Error-Code")) {
+ assertThat(header.getValue(), equalTo("RATE_LIMIT_EXCEEDED"));
+ foundErrorCode = true;
+ } else if (header.getName().equalsIgnoreCase("Retry-After")) {
+ assertThat(header.getValue(), equalTo("60"));
+ foundRetryAfter = true;
+ } else if (header.getName().equalsIgnoreCase("X-RateLimit-Remaining")) {
+ assertThat(header.getValue(), equalTo("0"));
+ foundRateLimitRemaining = true;
+ }
+ }
+
+ assertThat("Should find X-Error-Code", foundErrorCode, equalTo(true));
+ assertThat("Should find Retry-After", foundRetryAfter, equalTo(true));
+ assertThat("Should find X-RateLimit-Remaining", foundRateLimitRemaining, equalTo(true));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void responseHeadersValueHelperMethod() throws Exception {
+ Handler response = Handlers.all(
+ Handlers.header("X-Test-Header", "test-value"),
+ Handlers.startChunks("text/plain", Charset.forName("UTF-8")),
+ Handlers.writeChunkString("hello"),
+ Handlers.hang()
+ );
+ try (HttpServer server = HttpServer.start(response)) {
+ try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri())
+ .createClient(testLogger.getLogger())) {
+ ConnectStrategy.Client.Result result = client.connect(null);
+
+ ResponseHeaders headers = result.getHeaders();
+ assertThat(headers, Matchers.notNullValue());
+
+ // Test value() returns the header value
+ assertThat(headers.value("X-Test-Header"), equalTo("test-value"));
+
+ // Test case-insensitivity
+ assertThat(headers.value("x-test-header"), equalTo("test-value"));
+ assertThat(headers.value("X-TEST-HEADER"), equalTo("test-value"));
+
+ // Test that missing headers return null
+ assertThat(headers.value("X-Nonexistent"), nullValue());
+
+ result.getCloser().close();
+ }
+ }
+ }
+
+ @Test
+ public void responseHeadersWithMultipleValuesForSameHeader() throws Exception {
+ Handler response = Handlers.all(
+ // The test-helper will combine these headers, versus them being different header lines, this is different
+ // from what an HTTP server *may* choose to do. So we cannot validate the version with multiple header lines,
+ // and instead these headers are combined into a single header with multiple values (comma separated).
+ Handlers.addHeader("Set-Cookie", "cookie1=value1"),
+ Handlers.addHeader("Set-Cookie", "cookie2=value2"),
+ Handlers.startChunks("text/plain", Charset.forName("UTF-8")),
+ Handlers.writeChunkString("hello"),
+ Handlers.hang()
+ );
+ try (HttpServer server = HttpServer.start(response)) {
+ try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri())
+ .createClient(testLogger.getLogger())) {
+ ConnectStrategy.Client.Result result = client.connect(null);
+
+ ResponseHeaders headers = result.getHeaders();
+ assertThat(headers, Matchers.notNullValue());
+
+ // Count how many Set-Cookie headers we have
+ int setCookieCount = 0;
+ for (int i = 0; i < headers.size(); i++) {
+ if (headers.get(i).getName().equalsIgnoreCase("Set-Cookie")) {
+ setCookieCount++;
+ }
+ }
+
+ // We should have at least 1 Set-Cookie header
+ // (the test server may combine them or keep them separate)
+ assertThat("Should have at least one Set-Cookie header", setCookieCount, Matchers.greaterThanOrEqualTo(1));
+
+ result.getCloser().close();
+ }
+ }
+ }
+
@Test
public void requestFailsWithIOException() throws Exception {
try (TcpServer server = TcpServer.start(TcpHandlers.noResponse())) {
diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
index 3162f0c..fc2c5ba 100644
--- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
@@ -7,9 +7,13 @@
import org.junit.Rule;
import org.junit.Test;
+import org.hamcrest.Matchers;
+
import static com.launchdarkly.eventsource.TestUtils.interruptOnAnotherThread;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
/**
* Tests of basic EventSource behavior using real HTTP requests.
@@ -104,4 +108,191 @@ public void eventSourceReconnectsAfterExternallyInterrupted() throws Exception {
}
}
}
+
+ @Test
+ public void messageEventsIncludeResponseHeaders() throws Exception {
+ Handler response = Handlers.all(
+ Handlers.header("X-Server-Id", "server-123"),
+ Handlers.header("X-Request-Id", "request-456"),
+ Handlers.SSE.start(),
+ Handlers.SSE.event("message", "data1"),
+ Handlers.SSE.event("message", "data2"),
+ Handlers.hang()
+ );
+
+ try (HttpServer server = HttpServer.start(response)) {
+ try (EventSource es = new EventSource.Builder(server.getUri()).build()) {
+ es.start();
+
+ // First message should have headers
+ StreamEvent event1 = es.readAnyEvent();
+ assertThat(event1 instanceof MessageEvent, equalTo(true));
+ MessageEvent msg1 = (MessageEvent) event1;
+ assertThat(msg1.getData(), equalTo("data1"));
+
+ ResponseHeaders headers1 = msg1.getHeaders();
+ assertThat(headers1, notNullValue());
+ assertThat(headers1.size(), Matchers.greaterThan(0));
+
+ // Find our custom headers using iteration
+ String serverId1 = findHeaderValue(headers1, "X-Server-Id");
+ String requestId1 = findHeaderValue(headers1, "X-Request-Id");
+ assertThat(serverId1, equalTo("server-123"));
+ assertThat(requestId1, equalTo("request-456"));
+
+ // Also validate the value() helper method
+ assertThat(headers1.value("X-Server-Id"), equalTo("server-123"));
+ assertThat(headers1.value("X-Request-Id"), equalTo("request-456"));
+
+ // Second message should have the same headers (same connection)
+ StreamEvent event2 = es.readAnyEvent();
+ assertThat(event2 instanceof MessageEvent, equalTo(true));
+ MessageEvent msg2 = (MessageEvent) event2;
+ assertThat(msg2.getData(), equalTo("data2"));
+
+ ResponseHeaders headers2 = msg2.getHeaders();
+ assertThat(headers2, notNullValue());
+
+ String serverId2 = findHeaderValue(headers2, "X-Server-Id");
+ String requestId2 = findHeaderValue(headers2, "X-Request-Id");
+ assertThat(serverId2, equalTo("server-123"));
+ assertThat(requestId2, equalTo("request-456"));
+
+ // Also validate the value() helper method
+ assertThat(headers2.value("X-Server-Id"), equalTo("server-123"));
+ assertThat(headers2.value("X-Request-Id"), equalTo("request-456"));
+ }
+ }
+ }
+
+ private String findHeaderValue(ResponseHeaders headers, String name) {
+ for (int i = 0; i < headers.size(); i++) {
+ ResponseHeaders.Header header = headers.get(i);
+ if (header.getName().equalsIgnoreCase(name)) {
+ return header.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Test
+ public void messageEventsIncludeHeadersFromReconnectedConnection() throws Exception {
+ Handler response1 = Handlers.all(
+ Handlers.header("X-Connection-Id", "connection-1"),
+ Handlers.SSE.start(),
+ Handlers.SSE.event("message", "first")
+ );
+ Handler response2 = Handlers.all(
+ Handlers.header("X-Connection-Id", "connection-2"),
+ Handlers.SSE.start(),
+ Handlers.SSE.event("message", "second"),
+ Handlers.SSE.leaveOpen()
+ );
+ Handler allResponses = Handlers.sequential(response1, response2);
+
+ try (HttpServer server = HttpServer.start(allResponses)) {
+ try (EventSource es = new EventSource.Builder(server.getUri())
+ .errorStrategy(ErrorStrategy.alwaysContinue())
+ .retryDelay(1, null)
+ .build()) {
+ es.start();
+
+ // First message from first connection
+ StreamEvent event1 = es.readAnyEvent();
+ assertThat(event1 instanceof MessageEvent, equalTo(true));
+ MessageEvent msg1 = (MessageEvent) event1;
+ assertThat(msg1.getData(), equalTo("first"));
+ assertThat(msg1.getHeaders(), notNullValue());
+ assertThat(findHeaderValue(msg1.getHeaders(), "X-Connection-Id"), equalTo("connection-1"));
+ // Also validate the value() helper method
+ assertThat(msg1.getHeaders().value("X-Connection-Id"), equalTo("connection-1"));
+
+ // Fault event when connection closes
+ assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByServerException())));
+
+ // Started event for reconnection
+ assertThat(es.readAnyEvent(), equalTo(new StartedEvent()));
+
+ // Second message from second connection (should have new headers)
+ StreamEvent event2 = es.readAnyEvent();
+ assertThat(event2 instanceof MessageEvent, equalTo(true));
+ MessageEvent msg2 = (MessageEvent) event2;
+ assertThat(msg2.getData(), equalTo("second"));
+ assertThat(msg2.getHeaders(), notNullValue());
+ assertThat(findHeaderValue(msg2.getHeaders(), "X-Connection-Id"), equalTo("connection-2"));
+ // Also validate the value() helper method
+ assertThat(msg2.getHeaders().value("X-Connection-Id"), equalTo("connection-2"));
+ }
+ }
+ }
+
+ @Test
+ public void faultEventsIncludeHeadersForHttpErrors() throws Exception {
+ Handler errorResponse = Handlers.all(
+ Handlers.header("X-Error-Code", "RATE_LIMIT_EXCEEDED"),
+ Handlers.header("Retry-After", "60"),
+ Handlers.header("X-RateLimit-Remaining", "0"),
+ Handlers.status(429)
+ );
+
+ try (HttpServer server = HttpServer.start(errorResponse)) {
+ try (EventSource es = new EventSource.Builder(server.getUri())
+ .errorStrategy(ErrorStrategy.alwaysContinue())
+ .build()) {
+
+ StreamEvent event = es.readAnyEvent();
+ assertThat(event instanceof FaultEvent, equalTo(true));
+ FaultEvent fault = (FaultEvent) event;
+
+ // Verify it's an HTTP error
+ assertThat(fault.getCause() instanceof StreamHttpErrorException, equalTo(true));
+ StreamHttpErrorException httpError = (StreamHttpErrorException) fault.getCause();
+ assertThat(httpError.getCode(), equalTo(429));
+
+ // Verify headers are available using iteration
+ ResponseHeaders headers = fault.getHeaders();
+ assertThat(headers, notNullValue());
+ assertThat(findHeaderValue(headers, "X-Error-Code"), equalTo("RATE_LIMIT_EXCEEDED"));
+ assertThat(findHeaderValue(headers, "Retry-After"), equalTo("60"));
+ assertThat(findHeaderValue(headers, "X-RateLimit-Remaining"), equalTo("0"));
+
+ // Also validate the value() helper method
+ assertThat(headers.value("X-Error-Code"), equalTo("RATE_LIMIT_EXCEEDED"));
+ assertThat(headers.value("Retry-After"), equalTo("60"));
+ assertThat(headers.value("X-RateLimit-Remaining"), equalTo("0"));
+ }
+ }
+ }
+
+ @Test
+ public void faultEventsDoNotIncludeHeadersForNonHttpErrors() throws Exception {
+ Handler response = Handlers.all(
+ Handlers.SSE.start(),
+ Handlers.SSE.event("message", "data1")
+ // Connection closes, causing StreamClosedByServerException
+ );
+
+ try (HttpServer server = HttpServer.start(response)) {
+ try (EventSource es = new EventSource.Builder(server.getUri())
+ .errorStrategy(ErrorStrategy.alwaysContinue())
+ .build()) {
+ es.start();
+
+ // Read the message
+ assertThat(es.readAnyEvent(), equalTo(
+ new MessageEvent("message", "data1", null, es.getOrigin())));
+
+ // Get the fault event
+ StreamEvent event = es.readAnyEvent();
+ assertThat(event instanceof FaultEvent, equalTo(true));
+ FaultEvent fault = (FaultEvent) event;
+
+ // Verify it's not an HTTP error
+ assertThat(fault.getCause() instanceof StreamClosedByServerException, equalTo(true));
+
+ // Headers should be null for non-HTTP errors
+ assertThat(fault.getHeaders(), nullValue());
+ }
+ }
+ }
}
From e8e81b33db4b3806dcea271947b39c1d5c785f33 Mon Sep 17 00:00:00 2001
From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com>
Date: Wed, 21 Jan 2026 11:15:02 -0800
Subject: [PATCH 2/2] Exceptions after initial connection will not be
exceptions with headers.
---
.../launchdarkly/eventsource/EventSource.java | 9 +--
.../eventsource/HttpConnectStrategyTest.java | 55 +++++++++++++++++++
...ttpConnectStrategyWithEventSourceTest.java | 44 +++++++++++++++
3 files changed, 102 insertions(+), 6 deletions(-)
diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java
index df69986..f5c6f94 100644
--- a/src/main/java/com/launchdarkly/eventsource/EventSource.java
+++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java
@@ -631,12 +631,9 @@ private StreamEvent requireEvent() throws StreamException {
eventParser = null;
computeReconnectDelay();
if (applyErrorStrategy(e) == ErrorStrategy.Action.CONTINUE) {
- // Extract headers from exception if available
- ResponseHeaders headers = null;
- if (e instanceof StreamHttpErrorException) {
- headers = ((StreamHttpErrorException) e).getHeaders();
- }
- return new FaultEvent(e, headers);
+ // At this point we're handling errors from reading the stream (not initial connection),
+ // so we never have HTTP response headers available (headers is always null)
+ return new FaultEvent(e, null);
}
throw e;
}
diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
index ae87ac9..06415d5 100644
--- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java
@@ -429,6 +429,61 @@ public void responseHeadersWithMultipleValuesForSameHeader() throws Exception {
}
}
+ @Test
+ public void responseHeadersEdgeCases() throws Exception {
+ Handler response = Handlers.all(
+ Handlers.header("X-Test", "value"),
+ Handlers.startChunks("text/plain", Charset.forName("UTF-8")),
+ Handlers.writeChunkString("hello"),
+ Handlers.hang()
+ );
+ try (HttpServer server = HttpServer.start(response)) {
+ try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri())
+ .createClient(testLogger.getLogger())) {
+ ConnectStrategy.Client.Result result = client.connect(null);
+
+ ResponseHeaders headers = result.getHeaders();
+ assertThat(headers, Matchers.notNullValue());
+
+ // Test value() with null parameter
+ assertThat(headers.value(null), nullValue());
+
+ // Test isEmpty() - headers should not be empty
+ assertThat(headers.isEmpty(), equalTo(false));
+
+ // Test toString() - just verify it doesn't throw
+ assertThat(headers.toString(), Matchers.notNullValue());
+
+ // Test Header.toString()
+ ResponseHeaders.Header header = headers.get(0);
+ assertThat(header.toString(), Matchers.notNullValue());
+ assertThat(header.toString(), Matchers.containsString(":"));
+
+ result.getCloser().close();
+ }
+ }
+ }
+
+ @Test
+ public void responseHeadersEmpty() throws Exception {
+ // Test the empty headers case by using ResponseHeadersImpl directly
+ ResponseHeaders empty = ResponseHeadersImpl.empty();
+ assertThat(empty, Matchers.notNullValue());
+ assertThat(empty.isEmpty(), equalTo(true));
+ assertThat(empty.size(), equalTo(0));
+ assertThat(empty.value("Any-Header"), nullValue());
+ assertThat(empty.toString(), Matchers.notNullValue());
+
+ // Test fromOkHttpHeaders with null
+ ResponseHeaders fromNull = ResponseHeadersImpl.fromOkHttpHeaders(null);
+ assertThat(fromNull.isEmpty(), equalTo(true));
+
+ // Test fromOkHttpHeaders with empty Headers
+ okhttp3.Headers emptyHeaders = new okhttp3.Headers.Builder().build();
+ ResponseHeaders fromEmpty = ResponseHeadersImpl.fromOkHttpHeaders(emptyHeaders);
+ assertThat(fromEmpty.isEmpty(), equalTo(true));
+ }
+
@Test
public void requestFailsWithIOException() throws Exception {
try (TcpServer server = TcpServer.start(TcpHandlers.noResponse())) {
diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
index fc2c5ba..2ce7493 100644
--- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java
@@ -44,6 +44,50 @@ public void eventSourceReadsEvents() throws Exception {
}
}
+ @Test
+ public void eventSourceTracksLastEventId() throws Exception {
+ Handler response = Handlers.all(
+ Handlers.SSE.start(),
+ Handlers.writeChunkString("id: 123\nevent: message\ndata: first\n\n"),
+ Handlers.writeChunkString("id: 456\nevent: message\ndata: second\n\n"),
+ Handlers.hang()
+ );
+
+ try (HttpServer server = HttpServer.start(response)) {
+ try (EventSource es = new EventSource.Builder(server.getUri()).build()) {
+ es.start();
+
+ assertThat(es.readAnyEvent(), equalTo(
+ new MessageEvent("message", "first", "123", es.getOrigin())));
+
+ assertThat(es.readAnyEvent(), equalTo(
+ new MessageEvent("message", "second", "456", es.getOrigin())));
+
+ // Verify the EventSource tracks the last event ID
+ assertThat(es.getLastEventId(), equalTo("456"));
+ }
+ }
+ }
+
+ @Test
+ public void eventSourceHandlesRetryField() throws Exception {
+ Handler response = Handlers.all(
+ Handlers.SSE.start(),
+ Handlers.writeChunkString("retry: 5000\nevent: message\ndata: test\n\n"),
+ Handlers.hang()
+ );
+
+ try (HttpServer server = HttpServer.start(response)) {
+ try (EventSource es = new EventSource.Builder(server.getUri()).build()) {
+ es.start();
+
+ // The retry field should be processed internally, and we should still get the message
+ assertThat(es.readAnyEvent(), equalTo(
+ new MessageEvent("message", "test", null, es.getOrigin())));
+ }
+ }
+ }
+
@Test
public void eventSourceReconnectsAfterSocketClosed() throws Exception {
Handler response1 = Handlers.all(