Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ dependencies {
api("com.launchdarkly:launchdarkly-logging:${Versions.launchdarklyLogging}")
api("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
testImplementation("org.mockito:mockito-core:1.10.19")
testImplementation("com.launchdarkly:test-helpers:2.0.1")
testImplementation("com.launchdarkly:test-helpers:2.1.0")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolves the dependency on nanohttpd.

testImplementation("com.google.guava:guava:32.0.1-jre")
testImplementation("junit:junit:4.12")
testImplementation("org.hamcrest:hamcrest-all:1.3")
Expand Down
33 changes: 31 additions & 2 deletions src/main/java/com/launchdarkly/eventsource/ConnectStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,34 @@ public static class Result {
private final InputStream inputStream;
private final URI origin;
private final Closeable closer;

private final ResponseHeaders headers;

/**
* Creates an instance.
*
*
* @param inputStream see {@link #getInputStream()}
* @param origin see {@link #getOrigin()}
* @param closer see {@link #getCloser()}
*/
public Result(InputStream inputStream, URI origin, Closeable closer) {
this(inputStream, origin, closer, null);
}

/**
* Creates an instance with response headers.
*
* @param inputStream see {@link #getInputStream()}
* @param origin see {@link #getOrigin()}
* @param closer see {@link #getCloser()}
* @param headers see {@link #getHeaders()}
*
* @since 4.2.0
*/
public Result(InputStream inputStream, URI origin, Closeable closer, ResponseHeaders headers) {
this.inputStream = inputStream;
this.origin = origin;
this.closer = closer;
this.headers = headers;
}

/**
Expand Down Expand Up @@ -118,6 +134,19 @@ public URI getOrigin() {
public Closeable getCloser() {
return closer;
}

/**
* Returns the response headers from the connection, or null if not available.
* <p>
* For HTTP connections, this contains the HTTP response headers. For other
* connection types, this may be null or may contain connection-specific metadata.
*
* @return the response headers, or null if not available
* @since 4.2.0
*/
public ResponseHeaders getHeaders() {
return headers;
}
}

/**
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/com/launchdarkly/eventsource/EventParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ final class EventParser {
private Set<String> expectFields;
private final LDLogger logger;
private final URI origin;
private final ResponseHeaders headers;

private BufferedLineParser lineParser;
private byte[] chunkData;
Expand Down Expand Up @@ -77,14 +78,16 @@ final class EventParser {
int readBufferSize,
boolean streamEventData,
Set<String> expectFields,
LDLogger logger
LDLogger logger,
ResponseHeaders headers
) {
this.lineParser = new BufferedLineParser(inputStream, readBufferSize);
this.origin = origin;
this.streamEventData = streamEventData;
this.expectFields = expectFields;
this.logger = logger;

this.headers = headers;

dataBuffer = new ByteArrayOutputStream(VALUE_BUFFER_INITIAL_CAPACITY);
}

Expand Down Expand Up @@ -159,7 +162,7 @@ private StreamEvent tryNextEvent() throws StreamException {
}

String dataString = utf8ByteArrayOutputStreamToString(dataBuffer);
MessageEvent message = new MessageEvent(eventName, dataString, lastEventId, origin);
MessageEvent message = new MessageEvent(eventName, dataString, lastEventId, origin, headers);
resetState();
logger.debug("Received message: {}", message);
return message;
Expand Down Expand Up @@ -193,7 +196,8 @@ private StreamEvent tryNextEvent() throws StreamException {
eventName,
new InputStreamReader(messageDataStream),
lastEventId,
origin
origin,
headers
);
logger.debug("Received message: {}", message);
return message;
Expand Down
19 changes: 14 additions & 5 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,19 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException
if (applyErrorStrategy(exception) == ErrorStrategy.Action.CONTINUE) {
// The ErrorStrategy told us to CONTINUE rather than throwing an exception.
if (canReturnFaultEvent) {
return new FaultEvent(exception);
// Extract headers from exception if available
ResponseHeaders headers = null;
if (exception instanceof StreamHttpErrorException) {
headers = ((StreamHttpErrorException) exception).getHeaders();
}
return new FaultEvent(exception, headers);
}
// If canReturnFaultEvent is false, it means the caller explicitly called start(),
// in which case there's no way to return a FaultEvent so we just keep retrying
// transparently.
continue;
}
// The ErrorStrategy told us to THROW rather than CONTINUE.
// The ErrorStrategy told us to THROW rather than CONTINUE.
throw exception;
}

Expand All @@ -317,14 +322,16 @@ private FaultEvent tryStart(boolean canReturnFaultEvent) throws StreamException
origin = clientResult.getOrigin() == null ? client.getOrigin() : clientResult.getOrigin();
connectedTime = System.currentTimeMillis();
logger.debug("Connected to SSE stream");


ResponseHeaders headers = clientResult.getHeaders();
eventParser = new EventParser(
clientResult.getInputStream(),
clientResult.getOrigin(),
readBufferSize,
streamEventData,
expectFields,
logger
logger,
headers
);

readyState.set(ReadyState.OPEN);
Expand Down Expand Up @@ -624,7 +631,9 @@ private StreamEvent requireEvent() throws StreamException {
eventParser = null;
computeReconnectDelay();
if (applyErrorStrategy(e) == ErrorStrategy.Action.CONTINUE) {
return new FaultEvent(e);
// At this point we're handling errors from reading the stream (not initial connection),
// so we never have HTTP response headers available (headers is always null)
return new FaultEvent(e, null);
}
throw e;
}
Expand Down
32 changes: 30 additions & 2 deletions src/main/java/com/launchdarkly/eventsource/FaultEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,30 @@
*/
public final class FaultEvent implements StreamEvent {
private final StreamException cause;

private final ResponseHeaders headers;

/**
* Creates an instance.
*
* @param cause the cause of the failure
*/
public FaultEvent(StreamException cause) {
this(cause, null);
}

/**
* Creates an instance with response headers.
*
* @param cause the cause of the failure
* @param headers the response headers if available (typically for HTTP errors), or null
*
* @since 4.2.0
*/
public FaultEvent(StreamException cause, ResponseHeaders headers) {
this.cause = cause;
this.headers = headers;
}

/**
* Returns a {@link StreamException} describing the cause of the failure.
*
Expand All @@ -38,6 +52,20 @@ public FaultEvent(StreamException cause) {
public StreamException getCause() {
return cause;
}

/**
* Returns the response headers if available, or null otherwise.
* <p>
* For HTTP error responses (when the cause is a {@link StreamHttpErrorException}),
* this typically contains the HTTP response headers from the failed request.
* For other types of failures (I/O errors, timeouts, etc.), this may be null.
*
* @return the response headers, or null if not available
* @since 4.2.0
*/
public ResponseHeaders getHeaders() {
return headers;
}

@Override
public boolean equals(Object o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,29 +443,32 @@ class Client extends ConnectStrategy.Client { // package-private visibility for
@Override
public Result connect(String lastEventId) throws StreamException {
logger.debug("Attempting to connect to SSE stream at {}", uri);

Request request = createRequest(lastEventId);
Call call = httpClient.newCall(request);

Response response;
try {
response = call.execute();
} catch (IOException e) {
logger.info("Connection failed: {}", LogValues.exceptionSummary(e));
throw new StreamIOException(e);
}

if (!response.isSuccessful()) {
ResponseHeaders headers = ResponseHeadersImpl.fromOkHttpHeaders(response.headers());
response.close();
logger.info("Server returned HTTP error {}", response.code());
throw new StreamHttpErrorException(response.code());
throw new StreamHttpErrorException(response.code(), headers);
}


ResponseHeaders headers = ResponseHeadersImpl.fromOkHttpHeaders(response.headers());
ResponseBody responseBody = response.body();
return new Result(
responseBody.byteStream(),
uri,
new RequestCloser(call)
new RequestCloser(call),
headers
);
}

Expand Down
Loading