Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 57 additions & 94 deletions js/packages/binderhub-client/lib/client.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fetchEventSource } from "@microsoft/fetch-event-source";
import { EventSource } from "eventsource";
import { EventIterator } from "event-iterator";

function _getXSRFToken() {
Expand All @@ -21,8 +21,6 @@ function _getXSRFToken() {
return null;
}

/* throw this to close the event stream */
class EventStreamClose extends Error {}
/* throw this to close the event stream */
class EventStreamRetry extends Error {}

Expand Down Expand Up @@ -68,8 +66,7 @@ export class BinderRepository {
}
this.apiToken = apiToken;

this.eventIteratorQueue = null;
this.abortSignal = null;
this.stopQueue = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention here is that the queue is the application-level interface we care about. The underlying EventSource is just an implementation detail.

}

/**
Expand All @@ -96,7 +93,6 @@ export class BinderRepository {
*/
fetch() {
const headers = {};
this.abortController = new AbortController();

if (this.apiToken && this.apiToken.length > 0) {
headers["Authorization"] = `Bearer ${this.apiToken}`;
Expand All @@ -106,106 +102,73 @@ export class BinderRepository {
headers["X-Xsrftoken"] = xsrf;
}
}
// setTimeout(() => this.close(), 1000);

const es = new EventSource(this.buildUrl, {
fetch: async (input, init) => {
const response = await fetch(input, {
...init,
headers: { ...init.headers, ...headers },
});
Comment on lines +108 to +111
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure we pass through the headers

// Known failures are passed on and handled in onError
if (response.ok) {
return response;
} else if (
response.status >= 400 &&
response.status < 500 &&
response.status !== 429
) {
return response;
}
// Otherwise, throw, triggering a retry
throw new EventStreamRetry();
Comment on lines +112 to +123
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK responses, or known-failures, are passed through to EventSource, which dispatches the proper events.

We don't do that for retry-able responses, because EventSource doesn't feature a configurable retry mechanism — it only kicks in for failures during fetching.

},
});

return new EventIterator((queue) => {
this.eventIteratorQueue = queue;
fetchEventSource(this.buildUrl, {
headers,
// signal used for closing
signal: this.abortController.signal,
// openWhenHidden leaves connection open (matches default)
// otherwise fetch-event closes connections,
// which would be nice if our javascript handled restarting messages better
openWhenHidden: true,
onopen: (response) => {
if (response.ok) {
return; // everything's good
} else if (
response.status >= 400 &&
response.status < 500 &&
response.status !== 429
) {
queue.push({
phase: "failed",
message: `Failed to connect to event stream: ${response.status} - ${response.text}\n`,
});
throw new EventStreamClose();
} else {
queue.push({
phase: "unknown",
message: `Error connecting to event stream, retrying: ${response.status} - ${response.text}\n`,
});
throw new EventStreamRetry();
}
},
this.stopQueue = () => queue.stop();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leak the stop action.


onclose: () => {
if (!queue.isStopped) {
// close called before queue finished
queue.push({
phase: "failed",
message: `Event stream closed unexpectedly\n`,
});
queue.stop();
// throw new EventStreamClose();
}
},
onerror: (error) => {
console.log("Event stream error", error);
if (error.name === "EventStreamRetry") {
// if we don't re-raise, connection will be retried;
queue.push({
phase: "unknown",
message: `Error in event stream: ${error}\n`,
});
return;
}
if (
!(error.name === "EventStreamClose" || error.name === "AbortError")
) {
// errors _other_ than EventStreamClose get displayed
queue.push({
phase: "failed",
message: `Error in event stream: ${error}\n`,
});
}
const onMessage = (event) => {
if (!event.data || event.data === "") {
// onmessage is called for the empty lines
return;
}
const data = JSON.parse(event.data);
// FIXME: fix case of phase/state upstream
if (data.phase) {
data.phase = data.phase.toLowerCase();
}
queue.push(data);
if (data.phase === "failed") {
queue.stop();
// need to rethrow to prevent reconnection
throw error;
},
}
};

const onError = (error) => {
queue.push({
phase: "unknown",
message: `Error in event stream: ${error}\n`,
});
queue.stop();
};

onmessage: (event) => {
if (!event.data || event.data === "") {
// onmessage is called for the empty lines
return;
}
const data = JSON.parse(event.data);
// FIXME: fix case of phase/state upstream
if (data.phase) {
data.phase = data.phase.toLowerCase();
}
queue.push(data);
if (data.phase === "failed") {
throw new EventStreamClose();
}
},
});
es.addEventListener("message", onMessage);
es.addEventListener("error", onError);
return () => {
es.removeEventListener("message", onMessage);
es.removeEventListener("error", onError);
es.close();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This closes the event source, from stopQueue().

};
});
}

/**
* Close the EventSource connection to the BinderHub API if it is open
*/
close() {
if (this.eventIteratorQueue) {
// Stop any currently running fetch() iterations
this.eventIteratorQueue.stop();
this.eventIteratorQueue = null;
}
if (this.abortController) {
if (this.stopQueue) {
// close event source
this.abortController.abort();
this.abortController = null;
this.stopQueue();
this.stopQueue = null;
}
}
}
2 changes: 1 addition & 1 deletion js/packages/binderhub-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
},
"homepage": "https://github.com/jupyterhub/binderhub#readme",
"dependencies": {
"@microsoft/fetch-event-source": "^2.0.1",
"eventsource": "^4.1.0",
"event-iterator": "^2.0.0"
}
}
Loading