-
Notifications
You must be signed in to change notification settings - Fork 401
feat: support Node.js for @jupyterhub/binderhub-client
#2044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| import { fetchEventSource } from "@microsoft/fetch-event-source"; | ||
| import { EventSource } from "eventsource"; | ||
| import { EventIterator } from "event-iterator"; | ||
|
|
||
| function _getXSRFToken() { | ||
|
|
@@ -21,8 +21,6 @@ function _getXSRFToken() { | |
| return null; | ||
| } | ||
|
|
||
| /* throw this to close the event stream */ | ||
| class EventStreamClose extends Error {} | ||
| /* throw this to close the event stream */ | ||
| class EventStreamRetry extends Error {} | ||
|
|
||
|
|
@@ -68,8 +66,7 @@ export class BinderRepository { | |
| } | ||
| this.apiToken = apiToken; | ||
|
|
||
| this.eventIteratorQueue = null; | ||
| this.abortSignal = null; | ||
| this.stopQueue = null; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -96,7 +93,6 @@ export class BinderRepository { | |
| */ | ||
| fetch() { | ||
| const headers = {}; | ||
| this.abortController = new AbortController(); | ||
|
|
||
| if (this.apiToken && this.apiToken.length > 0) { | ||
| headers["Authorization"] = `Bearer ${this.apiToken}`; | ||
|
|
@@ -106,106 +102,73 @@ export class BinderRepository { | |
| headers["X-Xsrftoken"] = xsrf; | ||
| } | ||
| } | ||
| // setTimeout(() => this.close(), 1000); | ||
|
|
||
| const es = new EventSource(this.buildUrl, { | ||
| fetch: async (input, init) => { | ||
| const response = await fetch(input, { | ||
| ...init, | ||
| headers: { ...init.headers, ...headers }, | ||
| }); | ||
|
Comment on lines
+108
to
+111
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure we pass through the headers |
||
| // Known failures are passed on and handled in onError | ||
| if (response.ok) { | ||
| return response; | ||
| } else if ( | ||
| response.status >= 400 && | ||
| response.status < 500 && | ||
| response.status !== 429 | ||
| ) { | ||
| return response; | ||
| } | ||
| // Otherwise, throw, triggering a retry | ||
| throw new EventStreamRetry(); | ||
|
Comment on lines
+112
to
+123
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK responses, or known-failures, are passed through to We don't do that for retry-able responses, because |
||
| }, | ||
| }); | ||
|
|
||
| return new EventIterator((queue) => { | ||
| this.eventIteratorQueue = queue; | ||
| fetchEventSource(this.buildUrl, { | ||
| headers, | ||
| // signal used for closing | ||
| signal: this.abortController.signal, | ||
| // openWhenHidden leaves connection open (matches default) | ||
| // otherwise fetch-event closes connections, | ||
| // which would be nice if our javascript handled restarting messages better | ||
| openWhenHidden: true, | ||
| onopen: (response) => { | ||
| if (response.ok) { | ||
| return; // everything's good | ||
| } else if ( | ||
| response.status >= 400 && | ||
| response.status < 500 && | ||
| response.status !== 429 | ||
| ) { | ||
| queue.push({ | ||
| phase: "failed", | ||
| message: `Failed to connect to event stream: ${response.status} - ${response.text}\n`, | ||
| }); | ||
| throw new EventStreamClose(); | ||
| } else { | ||
| queue.push({ | ||
| phase: "unknown", | ||
| message: `Error connecting to event stream, retrying: ${response.status} - ${response.text}\n`, | ||
| }); | ||
| throw new EventStreamRetry(); | ||
| } | ||
| }, | ||
| this.stopQueue = () => queue.stop(); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Leak the stop action. |
||
|
|
||
| onclose: () => { | ||
| if (!queue.isStopped) { | ||
| // close called before queue finished | ||
| queue.push({ | ||
| phase: "failed", | ||
| message: `Event stream closed unexpectedly\n`, | ||
| }); | ||
| queue.stop(); | ||
| // throw new EventStreamClose(); | ||
| } | ||
| }, | ||
| onerror: (error) => { | ||
| console.log("Event stream error", error); | ||
| if (error.name === "EventStreamRetry") { | ||
| // if we don't re-raise, connection will be retried; | ||
| queue.push({ | ||
| phase: "unknown", | ||
| message: `Error in event stream: ${error}\n`, | ||
| }); | ||
| return; | ||
| } | ||
| if ( | ||
| !(error.name === "EventStreamClose" || error.name === "AbortError") | ||
| ) { | ||
| // errors _other_ than EventStreamClose get displayed | ||
| queue.push({ | ||
| phase: "failed", | ||
| message: `Error in event stream: ${error}\n`, | ||
| }); | ||
| } | ||
| const onMessage = (event) => { | ||
| if (!event.data || event.data === "") { | ||
| // onmessage is called for the empty lines | ||
| return; | ||
| } | ||
| const data = JSON.parse(event.data); | ||
| // FIXME: fix case of phase/state upstream | ||
| if (data.phase) { | ||
| data.phase = data.phase.toLowerCase(); | ||
| } | ||
| queue.push(data); | ||
| if (data.phase === "failed") { | ||
| queue.stop(); | ||
| // need to rethrow to prevent reconnection | ||
| throw error; | ||
| }, | ||
| } | ||
| }; | ||
|
|
||
| const onError = (error) => { | ||
| queue.push({ | ||
| phase: "unknown", | ||
| message: `Error in event stream: ${error}\n`, | ||
| }); | ||
| queue.stop(); | ||
| }; | ||
|
|
||
| onmessage: (event) => { | ||
| if (!event.data || event.data === "") { | ||
| // onmessage is called for the empty lines | ||
| return; | ||
| } | ||
| const data = JSON.parse(event.data); | ||
| // FIXME: fix case of phase/state upstream | ||
| if (data.phase) { | ||
| data.phase = data.phase.toLowerCase(); | ||
| } | ||
| queue.push(data); | ||
| if (data.phase === "failed") { | ||
| throw new EventStreamClose(); | ||
| } | ||
| }, | ||
| }); | ||
| es.addEventListener("message", onMessage); | ||
| es.addEventListener("error", onError); | ||
| return () => { | ||
| es.removeEventListener("message", onMessage); | ||
| es.removeEventListener("error", onError); | ||
| es.close(); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This closes the event source, from |
||
| }; | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Close the EventSource connection to the BinderHub API if it is open | ||
| */ | ||
| close() { | ||
| if (this.eventIteratorQueue) { | ||
| // Stop any currently running fetch() iterations | ||
| this.eventIteratorQueue.stop(); | ||
| this.eventIteratorQueue = null; | ||
| } | ||
| if (this.abortController) { | ||
| if (this.stopQueue) { | ||
| // close event source | ||
| this.abortController.abort(); | ||
| this.abortController = null; | ||
| this.stopQueue(); | ||
| this.stopQueue = null; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intention here is that the queue is the application-level interface we care about. The underlying
EventSourceis just an implementation detail.