diff --git a/build.gradle.kts b/build.gradle.kts index 2bbe220..4c5f75f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -69,7 +69,7 @@ dependencies { api("com.launchdarkly:launchdarkly-logging:${Versions.launchdarklyLogging}") api("com.squareup.okhttp3:okhttp:${Versions.okhttp}") testImplementation("org.mockito:mockito-core:1.10.19") - testImplementation("com.launchdarkly:test-helpers:2.0.1") + testImplementation("com.launchdarkly:test-helpers:2.1.0") testImplementation("com.google.guava:guava:32.0.1-jre") testImplementation("junit:junit:4.12") testImplementation("org.hamcrest:hamcrest-all:1.3") diff --git a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java index 31b3239..fe7f30c 100644 --- a/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java +++ b/src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java @@ -65,18 +65,34 @@ public static class Result { private final InputStream inputStream; private final URI origin; private final Closeable closer; - + private final ResponseHeaders headers; + /** * Creates an instance. - * + * * @param inputStream see {@link #getInputStream()} * @param origin see {@link #getOrigin()} * @param closer see {@link #getCloser()} */ public Result(InputStream inputStream, URI origin, Closeable closer) { + this(inputStream, origin, closer, null); + } + + /** + * Creates an instance with response headers. + * + * @param inputStream see {@link #getInputStream()} + * @param origin see {@link #getOrigin()} + * @param closer see {@link #getCloser()} + * @param headers see {@link #getHeaders()} + * + * @since 4.2.0 + */ + public Result(InputStream inputStream, URI origin, Closeable closer, ResponseHeaders headers) { this.inputStream = inputStream; this.origin = origin; this.closer = closer; + this.headers = headers; } /** @@ -118,6 +134,19 @@ public URI getOrigin() { public Closeable getCloser() { return closer; } + + /** + * Returns the response headers from the connection, or null if not available. + *

+ * For HTTP connections, this contains the HTTP response headers. For other + * connection types, this may be null or may contain connection-specific metadata. + * + * @return the response headers, or null if not available + * @since 4.2.0 + */ + public ResponseHeaders getHeaders() { + return headers; + } } /** diff --git a/src/main/java/com/launchdarkly/eventsource/EventParser.java b/src/main/java/com/launchdarkly/eventsource/EventParser.java index 02bdffe..85d9473 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventParser.java +++ b/src/main/java/com/launchdarkly/eventsource/EventParser.java @@ -50,6 +50,7 @@ final class EventParser { private Set expectFields; private final LDLogger logger; private final URI origin; + private final ResponseHeaders headers; private BufferedLineParser lineParser; private byte[] chunkData; @@ -77,14 +78,16 @@ final class EventParser { int readBufferSize, boolean streamEventData, Set expectFields, - LDLogger logger + LDLogger logger, + ResponseHeaders headers ) { this.lineParser = new BufferedLineParser(inputStream, readBufferSize); this.origin = origin; this.streamEventData = streamEventData; this.expectFields = expectFields; this.logger = logger; - + this.headers = headers; + dataBuffer = new ByteArrayOutputStream(VALUE_BUFFER_INITIAL_CAPACITY); } @@ -159,7 +162,7 @@ private StreamEvent tryNextEvent() throws StreamException { } String dataString = utf8ByteArrayOutputStreamToString(dataBuffer); - MessageEvent message = new MessageEvent(eventName, dataString, lastEventId, origin); + MessageEvent message = new MessageEvent(eventName, dataString, lastEventId, origin, headers); resetState(); logger.debug("Received message: {}", message); return message; @@ -193,7 +196,8 @@ private StreamEvent tryNextEvent() throws StreamException { eventName, new InputStreamReader(messageDataStream), lastEventId, - origin + origin, + headers ); logger.debug("Received message: {}", message); return message; diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index cf276d9..f5c6f94 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -301,14 +301,19 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException if (applyErrorStrategy(exception) == ErrorStrategy.Action.CONTINUE) { // The ErrorStrategy told us to CONTINUE rather than throwing an exception. if (canReturnFaultEvent) { - return new FaultEvent(exception); + // Extract headers from exception if available + ResponseHeaders headers = null; + if (exception instanceof StreamHttpErrorException) { + headers = ((StreamHttpErrorException) exception).getHeaders(); + } + return new FaultEvent(exception, headers); } // If canReturnFaultEvent is false, it means the caller explicitly called start(), // in which case there's no way to return a FaultEvent so we just keep retrying // transparently. continue; } - // The ErrorStrategy told us to THROW rather than CONTINUE. + // The ErrorStrategy told us to THROW rather than CONTINUE. throw exception; } @@ -317,14 +322,16 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin(); connectedTime = System.currentTimeMillis(); logger.debug("Connected to SSE stream"); - + + ResponseHeaders headers = clientResult.getHeaders(); eventParser = new EventParser( clientResult.getInputStream(), clientResult.getOrigin(), readBufferSize, streamEventData, expectFields, - logger + logger, + headers ); readyState.set(ReadyState.OPEN); @@ -624,7 +631,9 @@ private StreamEvent requireEvent() throws StreamException { eventParser = null; computeReconnectDelay(); if (applyErrorStrategy(e) == ErrorStrategy.Action.CONTINUE) { - return new FaultEvent(e); + // At this point we're handling errors from reading the stream (not initial connection), + // so we never have HTTP response headers available (headers is always null) + return new FaultEvent(e, null); } throw e; } diff --git a/src/main/java/com/launchdarkly/eventsource/FaultEvent.java b/src/main/java/com/launchdarkly/eventsource/FaultEvent.java index e48c690..c7e098e 100644 --- a/src/main/java/com/launchdarkly/eventsource/FaultEvent.java +++ b/src/main/java/com/launchdarkly/eventsource/FaultEvent.java @@ -20,16 +20,30 @@ */ public final class FaultEvent implements StreamEvent { private final StreamException cause; - + private final ResponseHeaders headers; + /** * Creates an instance. * * @param cause the cause of the failure */ public FaultEvent(StreamException cause) { + this(cause, null); + } + + /** + * Creates an instance with response headers. + * + * @param cause the cause of the failure + * @param headers the response headers if available (typically for HTTP errors), or null + * + * @since 4.2.0 + */ + public FaultEvent(StreamException cause, ResponseHeaders headers) { this.cause = cause; + this.headers = headers; } - + /** * Returns a {@link StreamException} describing the cause of the failure. * @@ -38,6 +52,20 @@ public FaultEvent(StreamException cause) { public StreamException getCause() { return cause; } + + /** + * Returns the response headers if available, or null otherwise. + *

+ * For HTTP error responses (when the cause is a {@link StreamHttpErrorException}), + * this typically contains the HTTP response headers from the failed request. + * For other types of failures (I/O errors, timeouts, etc.), this may be null. + * + * @return the response headers, or null if not available + * @since 4.2.0 + */ + public ResponseHeaders getHeaders() { + return headers; + } @Override public boolean equals(Object o) { diff --git a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java index 0d535a4..fcd9bfe 100644 --- a/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java +++ b/src/main/java/com/launchdarkly/eventsource/HttpConnectStrategy.java @@ -443,10 +443,10 @@ class Client extends ConnectStrategy.Client { // package-private visibility for @Override public Result connect(String lastEventId) throws StreamException { logger.debug("Attempting to connect to SSE stream at {}", uri); - + Request request = createRequest(lastEventId); Call call = httpClient.newCall(request); - + Response response; try { response = call.execute(); @@ -454,18 +454,21 @@ public Result connect(String lastEventId) throws StreamException { logger.info("Connection failed: {}", LogValues.exceptionSummary(e)); throw new StreamIOException(e); } - + if (!response.isSuccessful()) { + ResponseHeaders headers = ResponseHeadersImpl.fromOkHttpHeaders(response.headers()); response.close(); logger.info("Server returned HTTP error {}", response.code()); - throw new StreamHttpErrorException(response.code()); + throw new StreamHttpErrorException(response.code(), headers); } - + + ResponseHeaders headers = ResponseHeadersImpl.fromOkHttpHeaders(response.headers()); ResponseBody responseBody = response.body(); return new Result( responseBody.byteStream(), uri, - new RequestCloser(call) + new RequestCloser(call), + headers ); } diff --git a/src/main/java/com/launchdarkly/eventsource/MessageEvent.java b/src/main/java/com/launchdarkly/eventsource/MessageEvent.java index 819aed0..0f9edb8 100644 --- a/src/main/java/com/launchdarkly/eventsource/MessageEvent.java +++ b/src/main/java/com/launchdarkly/eventsource/MessageEvent.java @@ -32,50 +32,69 @@ public class MessageEvent implements StreamEvent { private final String eventName; private final String lastEventId; private final URI origin; + private final ResponseHeaders headers; /** * Simple constructor with event data only, using the default event name. *

* This constructor assumes that the event data has been fully read into memory as a String. - * + * * @param data the event data; if null, will be changed to an empty string */ public MessageEvent(String data) { - this(null, data, null, null); + this(null, data, null, null, null); } /** * Constructs a new instance with the default event name. *

* This constructor assumes that the event data has been fully read into memory as a String. - * + * * @param data the event data; if null, will be changed to an empty string * @param lastEventId the event ID, or null if none * @param origin the stream endpoint */ public MessageEvent(String data, String lastEventId, URI origin) { - this(null, data, lastEventId, origin); + this(null, data, lastEventId, origin, null); } /** * Constructs a new instance. *

* This constructor assumes that the event data has been fully read into memory as a String. - * + * * @param eventName the event name; if null, {@link #DEFAULT_EVENT_NAME} is used * @param data the event data; if null, will be changed to an empty string * @param lastEventId the event ID, or null if none * @param origin the stream endpoint - * + * * @since 2.6.0 */ public MessageEvent(String eventName, String data, String lastEventId, URI origin) { + this(eventName, data, lastEventId, origin, null); + } + + /** + * Constructs a new instance with response headers. + *

+ * This constructor assumes that the event data has been fully read into memory as a String. + * + * @param eventName the event name; if null, {@link #DEFAULT_EVENT_NAME} is used + * @param data the event data; if null, will be changed to an empty string + * @param lastEventId the event ID, or null if none + * @param origin the stream endpoint + * @param headers the response headers from the current connection, or null if not available + * + * @since 4.2.0 + */ + public MessageEvent(String eventName, String data, String lastEventId, URI origin, ResponseHeaders headers) { this.eventName = eventName == null ? DEFAULT_EVENT_NAME : eventName; this.data = data == null ? "" : data; this.dataReader = null; this.dataReaderLock = new Object(); this.lastEventId = lastEventId; this.origin = origin; + this.headers = headers; } /** @@ -85,12 +104,12 @@ public MessageEvent(String eventName, String data, String lastEventId, URI origi * optimization that sometimes allows events to be processed without large buffers. The caller * must be careful about using this model because the behavior of the reader is not idempotent; * see {@link #getDataReader()}. - * + * * @param eventName an object that will provide the event name if requested * @param dataReader a {@link Reader} for consuming the event data * @param lastEventId an object that will provide the last event ID if requested * @param origin the stream endpoint - * + * * @see #getDataReader() * @since 2.6.0 */ @@ -100,22 +119,50 @@ public MessageEvent( String lastEventId, URI origin ) { + this(eventName, dataReader, lastEventId, origin, null); + } + + /** + * Constructs a new instance with lazy-loading behavior and response headers. + *

+ * This constructor takes a {@link Reader} instead of a String for the event data. This is an + * optimization that sometimes allows events to be processed without large buffers. The caller + * must be careful about using this model because the behavior of the reader is not idempotent; + * see {@link #getDataReader()}. + * + * @param eventName an object that will provide the event name if requested + * @param dataReader a {@link Reader} for consuming the event data + * @param lastEventId an object that will provide the last event ID if requested + * @param origin the stream endpoint + * @param headers the response headers from the current connection, or null if not available + * + * @see #getDataReader() + * @since 4.2.0 + */ + public MessageEvent( + String eventName, + Reader dataReader, + String lastEventId, + URI origin, + ResponseHeaders headers + ) { this.data = null; this.dataReader = dataReader; this.dataReaderLock = new Object(); this.eventName = eventName == null ? DEFAULT_EVENT_NAME : eventName; this.lastEventId = lastEventId; this.origin = origin; + this.headers = headers; } /** * Constructs a new instance. - * + * * @param eventName the event name * @param data the event data, if any */ public MessageEvent(String eventName, String data) { - this(eventName, data, null, null); + this(eventName, data, null, null, null); } /** @@ -230,6 +277,20 @@ public URI getOrigin() { return origin; } + /** + * Returns the response headers from the current connection, or null if not available. + *

+ * For HTTP connections, this contains the HTTP response headers that were received when + * the connection was established. These headers remain the same for all events received + * on this connection. + * + * @return the response headers, or null if not available + * @since 4.2.0 + */ + public ResponseHeaders getHeaders() { + return headers; + } + /** * Returns {@code true} if this event was dispatched with streaming data behavior rather than pre-read data. *

diff --git a/src/main/java/com/launchdarkly/eventsource/ResponseHeaders.java b/src/main/java/com/launchdarkly/eventsource/ResponseHeaders.java new file mode 100644 index 0000000..c310f71 --- /dev/null +++ b/src/main/java/com/launchdarkly/eventsource/ResponseHeaders.java @@ -0,0 +1,115 @@ +package com.launchdarkly.eventsource; + +/** + * This interface provides access to HTTP response headers in a way that is independent + * of any specific HTTP client implementation. Headers are represented as an ordered list + * of name-value pairs, preserving the original structure including duplicate header names. + *

+ * Implementations of this interface should be immutable and thread-safe. + * + * @since 4.2.0 + */ +public interface ResponseHeaders { + /** + * Represents a single HTTP header as a name-value pair. + *

+ * Header names are case-insensitive according to the HTTP specification, but the + * original case is preserved in this representation. + * + * @since 4.2.0 + */ + public static final class Header { + private final String name; + private final String value; + + /** + * Creates a header with the given name and value. + * + * @param name the header name + * @param value the header value + */ + public Header(String name, String value) { + this.name = name; + this.value = value; + } + + /** + * Returns the header name. + * + * @return the header name + */ + public String getName() { + return name; + } + + /** + * Returns the header value. + * + * @return the header value + */ + public String getValue() { + return value; + } + + @Override + public String toString() { + return name + ": " + value; + } + } + + /** + * Returns the number of headers in this collection. + *

+ * Note that if a header name appears multiple times (e.g., multiple {@code Set-Cookie} + * headers), each appearance is counted separately. + * + * @return the number of headers + */ + int size(); + + /** + * Returns the header at the specified index. + *

+ * Headers are indexed in the order they appeared in the HTTP response. + * + * @param index the index of the header to retrieve (0-based) + * @return the header at the specified index + * @throws IndexOutOfBoundsException if the index is out of range + */ + Header get(int index); + + /** + * Returns the value of the first header with the specified name (case-insensitive), + * or null if no such header exists. + *

+ * This is a convenience method for headers where you expect a single value. If the + * header appears multiple times in the response, only the first occurrence is returned. + *

+ * Note that the returned value may contain commas if: + *

+ *

+ * For headers that can legitimately appear multiple times (like {@code Set-Cookie}), + * use {@link #size()} and {@link #get(int)} to iterate through all occurrences instead. + *

+ * Example usage: + *

+   * String contentType = headers.value("Content-Type");
+   * String retryAfter = headers.value("Retry-After");
+   * 
+ * + * @param name the header name (case-insensitive) + * @return the value of the first matching header, or null if not found + */ + String value(String name); + + /** + * Returns true if this collection contains no headers. + * + * @return true if there are no headers + */ + boolean isEmpty(); +} diff --git a/src/main/java/com/launchdarkly/eventsource/ResponseHeadersImpl.java b/src/main/java/com/launchdarkly/eventsource/ResponseHeadersImpl.java new file mode 100644 index 0000000..f89fcb7 --- /dev/null +++ b/src/main/java/com/launchdarkly/eventsource/ResponseHeadersImpl.java @@ -0,0 +1,85 @@ +package com.launchdarkly.eventsource; + +import okhttp3.Headers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Package-private implementation of ResponseHeaders. + *

+ * This class is immutable and thread-safe. + */ +final class ResponseHeadersImpl implements ResponseHeaders { + private static final ResponseHeaders EMPTY = new ResponseHeadersImpl(Collections.emptyList()); + + private final List

headers; + + private ResponseHeadersImpl(List
headers) { + this.headers = headers; + } + + /** + * Creates a ResponseHeaders instance from OkHttp Headers. + *

+ * This preserves all headers in the order they appear, including duplicate header names. + * + * @param headers the OkHttp headers + * @return a ResponseHeaders instance + */ + static ResponseHeaders fromOkHttpHeaders(Headers headers) { + if (headers == null || headers.size() == 0) { + return EMPTY; + } + + List

headerList = new ArrayList<>(headers.size()); + for (int i = 0; i < headers.size(); i++) { + headerList.add(new Header(headers.name(i), headers.value(i))); + } + + return new ResponseHeadersImpl(Collections.unmodifiableList(headerList)); + } + + /** + * Returns an empty ResponseHeaders instance. + * + * @return an empty ResponseHeaders + */ + static ResponseHeaders empty() { + return EMPTY; + } + + @Override + public int size() { + return headers.size(); + } + + @Override + public Header get(int index) { + return headers.get(index); + } + + @Override + public String value(String name) { + if (name == null) { + return null; + } + for (Header header : headers) { + if (header.getName().equalsIgnoreCase(name)) { + return header.getValue(); + } + } + return null; + } + + @Override + public boolean isEmpty() { + return headers.isEmpty(); + } + + @Override + public String toString() { + return "ResponseHeaders(" + headers.size() + " headers)"; + } +} diff --git a/src/main/java/com/launchdarkly/eventsource/StreamHttpErrorException.java b/src/main/java/com/launchdarkly/eventsource/StreamHttpErrorException.java index c4bc6ad..997d298 100644 --- a/src/main/java/com/launchdarkly/eventsource/StreamHttpErrorException.java +++ b/src/main/java/com/launchdarkly/eventsource/StreamHttpErrorException.java @@ -13,14 +13,27 @@ public class StreamHttpErrorException extends StreamException { private final int code; + private final ResponseHeaders headers; /** * Constructs an instance. * @param code the HTTP status */ public StreamHttpErrorException(int code) { + this(code, null); + } + + /** + * Constructs an instance with response headers. + * @param code the HTTP status + * @param headers the response headers, or null if not available + * + * @since 4.2.0 + */ + public StreamHttpErrorException(int code, ResponseHeaders headers) { super("Server returned HTTP error " + code); this.code = code; + this.headers = headers; } /** @@ -30,4 +43,14 @@ public StreamHttpErrorException(int code) { public int getCode() { return code; } + + /** + * Returns the response headers from the failed HTTP request, or null if not available. + * + * @return the response headers, or null + * @since 4.2.0 + */ + public ResponseHeaders getHeaders() { + return headers; + } } diff --git a/src/test/java/com/launchdarkly/eventsource/EventParserBaseTest.java b/src/test/java/com/launchdarkly/eventsource/EventParserBaseTest.java index 1db9bf1..ddd73aa 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventParserBaseTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventParserBaseTest.java @@ -46,7 +46,8 @@ protected void initParser(int bufferSize, boolean streamEventData, String... exp bufferSize, streamEventData, expectedFields, - testLoggerRule.getLogger() + testLoggerRule.getLogger(), + null ); } diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java index c2a8e9e..06415d5 100644 --- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java +++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyTest.java @@ -268,6 +268,222 @@ public void requestFailsWithHttpError() throws Exception { } } + @Test + public void responseHeadersAreAvailableOnSuccess() throws Exception { + Handler response = Handlers.all( + Handlers.header("X-Custom-Header", "custom-value"), + Handlers.header("X-Request-Id", "12345"), + Handlers.startChunks("text/plain", Charset.forName("UTF-8")), + Handlers.writeChunkString("hello"), + Handlers.hang() + ); + try (HttpServer server = HttpServer.start(response)) { + try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri()) + .createClient(testLogger.getLogger())) { + ConnectStrategy.Client.Result result = client.connect(null); + + ResponseHeaders headers = result.getHeaders(); + assertThat(headers, Matchers.notNullValue()); + assertThat(headers.size(), Matchers.greaterThan(0)); + + // Find and verify our custom headers + boolean foundCustomHeader = false; + boolean foundRequestId = false; + boolean foundContentType = false; + + for (int i = 0; i < headers.size(); i++) { + ResponseHeaders.Header header = headers.get(i); + if (header.getName().equalsIgnoreCase("X-Custom-Header")) { + assertThat(header.getValue(), equalTo("custom-value")); + foundCustomHeader = true; + } else if (header.getName().equalsIgnoreCase("X-Request-Id")) { + assertThat(header.getValue(), equalTo("12345")); + foundRequestId = true; + } else if (header.getName().equalsIgnoreCase("Content-Type")) { + assertThat(header.getValue(), equalTo("text/plain;charset=utf-8")); + foundContentType = true; + } + } + + assertThat("Should find X-Custom-Header", foundCustomHeader, equalTo(true)); + assertThat("Should find X-Request-Id", foundRequestId, equalTo(true)); + assertThat("Should find Content-Type", foundContentType, equalTo(true)); + + result.getCloser().close(); + } + } + } + + @Test + public void responseHeadersAreAvailableOnHttpError() throws Exception { + Handler errorResponse = Handlers.all( + Handlers.header("X-Error-Code", "RATE_LIMIT_EXCEEDED"), + Handlers.header("Retry-After", "60"), + Handlers.header("X-RateLimit-Remaining", "0"), + Handlers.status(429) + ); + try (HttpServer server = HttpServer.start(errorResponse)) { + try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri()) + .createClient(testLogger.getLogger())) { + try { + client.connect(null); + fail("expected exception"); + } catch (StreamHttpErrorException e) { + assertThat(e.getCode(), equalTo(429)); + + ResponseHeaders headers = e.getHeaders(); + assertThat(headers, Matchers.notNullValue()); + + // Find and verify our custom error headers + boolean foundErrorCode = false; + boolean foundRetryAfter = false; + boolean foundRateLimitRemaining = false; + + for (int i = 0; i < headers.size(); i++) { + ResponseHeaders.Header header = headers.get(i); + if (header.getName().equalsIgnoreCase("X-Error-Code")) { + assertThat(header.getValue(), equalTo("RATE_LIMIT_EXCEEDED")); + foundErrorCode = true; + } else if (header.getName().equalsIgnoreCase("Retry-After")) { + assertThat(header.getValue(), equalTo("60")); + foundRetryAfter = true; + } else if (header.getName().equalsIgnoreCase("X-RateLimit-Remaining")) { + assertThat(header.getValue(), equalTo("0")); + foundRateLimitRemaining = true; + } + } + + assertThat("Should find X-Error-Code", foundErrorCode, equalTo(true)); + assertThat("Should find Retry-After", foundRetryAfter, equalTo(true)); + assertThat("Should find X-RateLimit-Remaining", foundRateLimitRemaining, equalTo(true)); + } + } + } + } + + @Test + public void responseHeadersValueHelperMethod() throws Exception { + Handler response = Handlers.all( + Handlers.header("X-Test-Header", "test-value"), + Handlers.startChunks("text/plain", Charset.forName("UTF-8")), + Handlers.writeChunkString("hello"), + Handlers.hang() + ); + try (HttpServer server = HttpServer.start(response)) { + try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri()) + .createClient(testLogger.getLogger())) { + ConnectStrategy.Client.Result result = client.connect(null); + + ResponseHeaders headers = result.getHeaders(); + assertThat(headers, Matchers.notNullValue()); + + // Test value() returns the header value + assertThat(headers.value("X-Test-Header"), equalTo("test-value")); + + // Test case-insensitivity + assertThat(headers.value("x-test-header"), equalTo("test-value")); + assertThat(headers.value("X-TEST-HEADER"), equalTo("test-value")); + + // Test that missing headers return null + assertThat(headers.value("X-Nonexistent"), nullValue()); + + result.getCloser().close(); + } + } + } + + @Test + public void responseHeadersWithMultipleValuesForSameHeader() throws Exception { + Handler response = Handlers.all( + // The test-helper will combine these headers, versus them being different header lines, this is different + // from what an HTTP server *may* choose to do. So we cannot validate the version with multiple header lines, + // and instead these headers are combined into a single header with multiple values (comma separated). + Handlers.addHeader("Set-Cookie", "cookie1=value1"), + Handlers.addHeader("Set-Cookie", "cookie2=value2"), + Handlers.startChunks("text/plain", Charset.forName("UTF-8")), + Handlers.writeChunkString("hello"), + Handlers.hang() + ); + try (HttpServer server = HttpServer.start(response)) { + try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri()) + .createClient(testLogger.getLogger())) { + ConnectStrategy.Client.Result result = client.connect(null); + + ResponseHeaders headers = result.getHeaders(); + assertThat(headers, Matchers.notNullValue()); + + // Count how many Set-Cookie headers we have + int setCookieCount = 0; + for (int i = 0; i < headers.size(); i++) { + if (headers.get(i).getName().equalsIgnoreCase("Set-Cookie")) { + setCookieCount++; + } + } + + // We should have at least 1 Set-Cookie header + // (the test server may combine them or keep them separate) + assertThat("Should have at least one Set-Cookie header", setCookieCount, Matchers.greaterThanOrEqualTo(1)); + + result.getCloser().close(); + } + } + } + + @Test + public void responseHeadersEdgeCases() throws Exception { + Handler response = Handlers.all( + Handlers.header("X-Test", "value"), + Handlers.startChunks("text/plain", Charset.forName("UTF-8")), + Handlers.writeChunkString("hello"), + Handlers.hang() + ); + try (HttpServer server = HttpServer.start(response)) { + try (ConnectStrategy.Client client = ConnectStrategy.http(server.getUri()) + .createClient(testLogger.getLogger())) { + ConnectStrategy.Client.Result result = client.connect(null); + + ResponseHeaders headers = result.getHeaders(); + assertThat(headers, Matchers.notNullValue()); + + // Test value() with null parameter + assertThat(headers.value(null), nullValue()); + + // Test isEmpty() - headers should not be empty + assertThat(headers.isEmpty(), equalTo(false)); + + // Test toString() - just verify it doesn't throw + assertThat(headers.toString(), Matchers.notNullValue()); + + // Test Header.toString() + ResponseHeaders.Header header = headers.get(0); + assertThat(header.toString(), Matchers.notNullValue()); + assertThat(header.toString(), Matchers.containsString(":")); + + result.getCloser().close(); + } + } + } + + @Test + public void responseHeadersEmpty() throws Exception { + // Test the empty headers case by using ResponseHeadersImpl directly + ResponseHeaders empty = ResponseHeadersImpl.empty(); + assertThat(empty, Matchers.notNullValue()); + assertThat(empty.isEmpty(), equalTo(true)); + assertThat(empty.size(), equalTo(0)); + assertThat(empty.value("Any-Header"), nullValue()); + assertThat(empty.toString(), Matchers.notNullValue()); + + // Test fromOkHttpHeaders with null + ResponseHeaders fromNull = ResponseHeadersImpl.fromOkHttpHeaders(null); + assertThat(fromNull.isEmpty(), equalTo(true)); + + // Test fromOkHttpHeaders with empty Headers + okhttp3.Headers emptyHeaders = new okhttp3.Headers.Builder().build(); + ResponseHeaders fromEmpty = ResponseHeadersImpl.fromOkHttpHeaders(emptyHeaders); + assertThat(fromEmpty.isEmpty(), equalTo(true)); + } + @Test public void requestFailsWithIOException() throws Exception { try (TcpServer server = TcpServer.start(TcpHandlers.noResponse())) { diff --git a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java index 3162f0c..2ce7493 100644 --- a/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java +++ b/src/test/java/com/launchdarkly/eventsource/HttpConnectStrategyWithEventSourceTest.java @@ -7,9 +7,13 @@ import org.junit.Rule; import org.junit.Test; +import org.hamcrest.Matchers; + import static com.launchdarkly.eventsource.TestUtils.interruptOnAnotherThread; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; /** * Tests of basic EventSource behavior using real HTTP requests. @@ -40,6 +44,50 @@ public void eventSourceReadsEvents() throws Exception { } } + @Test + public void eventSourceTracksLastEventId() throws Exception { + Handler response = Handlers.all( + Handlers.SSE.start(), + Handlers.writeChunkString("id: 123\nevent: message\ndata: first\n\n"), + Handlers.writeChunkString("id: 456\nevent: message\ndata: second\n\n"), + Handlers.hang() + ); + + try (HttpServer server = HttpServer.start(response)) { + try (EventSource es = new EventSource.Builder(server.getUri()).build()) { + es.start(); + + assertThat(es.readAnyEvent(), equalTo( + new MessageEvent("message", "first", "123", es.getOrigin()))); + + assertThat(es.readAnyEvent(), equalTo( + new MessageEvent("message", "second", "456", es.getOrigin()))); + + // Verify the EventSource tracks the last event ID + assertThat(es.getLastEventId(), equalTo("456")); + } + } + } + + @Test + public void eventSourceHandlesRetryField() throws Exception { + Handler response = Handlers.all( + Handlers.SSE.start(), + Handlers.writeChunkString("retry: 5000\nevent: message\ndata: test\n\n"), + Handlers.hang() + ); + + try (HttpServer server = HttpServer.start(response)) { + try (EventSource es = new EventSource.Builder(server.getUri()).build()) { + es.start(); + + // The retry field should be processed internally, and we should still get the message + assertThat(es.readAnyEvent(), equalTo( + new MessageEvent("message", "test", null, es.getOrigin()))); + } + } + } + @Test public void eventSourceReconnectsAfterSocketClosed() throws Exception { Handler response1 = Handlers.all( @@ -104,4 +152,191 @@ public void eventSourceReconnectsAfterExternallyInterrupted() throws Exception { } } } + + @Test + public void messageEventsIncludeResponseHeaders() throws Exception { + Handler response = Handlers.all( + Handlers.header("X-Server-Id", "server-123"), + Handlers.header("X-Request-Id", "request-456"), + Handlers.SSE.start(), + Handlers.SSE.event("message", "data1"), + Handlers.SSE.event("message", "data2"), + Handlers.hang() + ); + + try (HttpServer server = HttpServer.start(response)) { + try (EventSource es = new EventSource.Builder(server.getUri()).build()) { + es.start(); + + // First message should have headers + StreamEvent event1 = es.readAnyEvent(); + assertThat(event1 instanceof MessageEvent, equalTo(true)); + MessageEvent msg1 = (MessageEvent) event1; + assertThat(msg1.getData(), equalTo("data1")); + + ResponseHeaders headers1 = msg1.getHeaders(); + assertThat(headers1, notNullValue()); + assertThat(headers1.size(), Matchers.greaterThan(0)); + + // Find our custom headers using iteration + String serverId1 = findHeaderValue(headers1, "X-Server-Id"); + String requestId1 = findHeaderValue(headers1, "X-Request-Id"); + assertThat(serverId1, equalTo("server-123")); + assertThat(requestId1, equalTo("request-456")); + + // Also validate the value() helper method + assertThat(headers1.value("X-Server-Id"), equalTo("server-123")); + assertThat(headers1.value("X-Request-Id"), equalTo("request-456")); + + // Second message should have the same headers (same connection) + StreamEvent event2 = es.readAnyEvent(); + assertThat(event2 instanceof MessageEvent, equalTo(true)); + MessageEvent msg2 = (MessageEvent) event2; + assertThat(msg2.getData(), equalTo("data2")); + + ResponseHeaders headers2 = msg2.getHeaders(); + assertThat(headers2, notNullValue()); + + String serverId2 = findHeaderValue(headers2, "X-Server-Id"); + String requestId2 = findHeaderValue(headers2, "X-Request-Id"); + assertThat(serverId2, equalTo("server-123")); + assertThat(requestId2, equalTo("request-456")); + + // Also validate the value() helper method + assertThat(headers2.value("X-Server-Id"), equalTo("server-123")); + assertThat(headers2.value("X-Request-Id"), equalTo("request-456")); + } + } + } + + private String findHeaderValue(ResponseHeaders headers, String name) { + for (int i = 0; i < headers.size(); i++) { + ResponseHeaders.Header header = headers.get(i); + if (header.getName().equalsIgnoreCase(name)) { + return header.getValue(); + } + } + return null; + } + + @Test + public void messageEventsIncludeHeadersFromReconnectedConnection() throws Exception { + Handler response1 = Handlers.all( + Handlers.header("X-Connection-Id", "connection-1"), + Handlers.SSE.start(), + Handlers.SSE.event("message", "first") + ); + Handler response2 = Handlers.all( + Handlers.header("X-Connection-Id", "connection-2"), + Handlers.SSE.start(), + Handlers.SSE.event("message", "second"), + Handlers.SSE.leaveOpen() + ); + Handler allResponses = Handlers.sequential(response1, response2); + + try (HttpServer server = HttpServer.start(allResponses)) { + try (EventSource es = new EventSource.Builder(server.getUri()) + .errorStrategy(ErrorStrategy.alwaysContinue()) + .retryDelay(1, null) + .build()) { + es.start(); + + // First message from first connection + StreamEvent event1 = es.readAnyEvent(); + assertThat(event1 instanceof MessageEvent, equalTo(true)); + MessageEvent msg1 = (MessageEvent) event1; + assertThat(msg1.getData(), equalTo("first")); + assertThat(msg1.getHeaders(), notNullValue()); + assertThat(findHeaderValue(msg1.getHeaders(), "X-Connection-Id"), equalTo("connection-1")); + // Also validate the value() helper method + assertThat(msg1.getHeaders().value("X-Connection-Id"), equalTo("connection-1")); + + // Fault event when connection closes + assertThat(es.readAnyEvent(), equalTo(new FaultEvent(new StreamClosedByServerException()))); + + // Started event for reconnection + assertThat(es.readAnyEvent(), equalTo(new StartedEvent())); + + // Second message from second connection (should have new headers) + StreamEvent event2 = es.readAnyEvent(); + assertThat(event2 instanceof MessageEvent, equalTo(true)); + MessageEvent msg2 = (MessageEvent) event2; + assertThat(msg2.getData(), equalTo("second")); + assertThat(msg2.getHeaders(), notNullValue()); + assertThat(findHeaderValue(msg2.getHeaders(), "X-Connection-Id"), equalTo("connection-2")); + // Also validate the value() helper method + assertThat(msg2.getHeaders().value("X-Connection-Id"), equalTo("connection-2")); + } + } + } + + @Test + public void faultEventsIncludeHeadersForHttpErrors() throws Exception { + Handler errorResponse = Handlers.all( + Handlers.header("X-Error-Code", "RATE_LIMIT_EXCEEDED"), + Handlers.header("Retry-After", "60"), + Handlers.header("X-RateLimit-Remaining", "0"), + Handlers.status(429) + ); + + try (HttpServer server = HttpServer.start(errorResponse)) { + try (EventSource es = new EventSource.Builder(server.getUri()) + .errorStrategy(ErrorStrategy.alwaysContinue()) + .build()) { + + StreamEvent event = es.readAnyEvent(); + assertThat(event instanceof FaultEvent, equalTo(true)); + FaultEvent fault = (FaultEvent) event; + + // Verify it's an HTTP error + assertThat(fault.getCause() instanceof StreamHttpErrorException, equalTo(true)); + StreamHttpErrorException httpError = (StreamHttpErrorException) fault.getCause(); + assertThat(httpError.getCode(), equalTo(429)); + + // Verify headers are available using iteration + ResponseHeaders headers = fault.getHeaders(); + assertThat(headers, notNullValue()); + assertThat(findHeaderValue(headers, "X-Error-Code"), equalTo("RATE_LIMIT_EXCEEDED")); + assertThat(findHeaderValue(headers, "Retry-After"), equalTo("60")); + assertThat(findHeaderValue(headers, "X-RateLimit-Remaining"), equalTo("0")); + + // Also validate the value() helper method + assertThat(headers.value("X-Error-Code"), equalTo("RATE_LIMIT_EXCEEDED")); + assertThat(headers.value("Retry-After"), equalTo("60")); + assertThat(headers.value("X-RateLimit-Remaining"), equalTo("0")); + } + } + } + + @Test + public void faultEventsDoNotIncludeHeadersForNonHttpErrors() throws Exception { + Handler response = Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event("message", "data1") + // Connection closes, causing StreamClosedByServerException + ); + + try (HttpServer server = HttpServer.start(response)) { + try (EventSource es = new EventSource.Builder(server.getUri()) + .errorStrategy(ErrorStrategy.alwaysContinue()) + .build()) { + es.start(); + + // Read the message + assertThat(es.readAnyEvent(), equalTo( + new MessageEvent("message", "data1", null, es.getOrigin()))); + + // Get the fault event + StreamEvent event = es.readAnyEvent(); + assertThat(event instanceof FaultEvent, equalTo(true)); + FaultEvent fault = (FaultEvent) event; + + // Verify it's not an HTTP error + assertThat(fault.getCause() instanceof StreamClosedByServerException, equalTo(true)); + + // Headers should be null for non-HTTP errors + assertThat(fault.getHeaders(), nullValue()); + } + } + } }