Skip to content

Commit 0c8e69d

Browse files
authored
Merge pull request #3 from watercrawl/feature/fix-async
fix(stream): implement improved async event streaming mechanism
2 parents d22871e + 0a5b424 commit 0c8e69d

File tree

4 files changed

+79
-137
lines changed

4 files changed

+79
-137
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [1.1.1] - 2025-05-03
9+
10+
### Fixed
11+
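- Improved async streaming: events are now read with `fetch` and yielded from an async generator instead of being pushed from axios stream callbacks, for more reliable event handling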
12+
- Fixed search event monitoring to read the status from `event.data.status` instead of `event.status`
13+
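- Refactored stream processing (a shared `fetchStream` generator replaces the per-method event queues in `monitorCrawlRequest` and `monitorSearchRequest`) for better error handling and resource management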
14+
- Removed a debug `console.log` statement from the tests
15+
816
## [1.1.0] - 2025-04-30
917

1018
### Added

src/base.ts

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ import urlJoin from 'url-join';
44
export class BaseAPIClient {
55
protected client: AxiosInstance;
66
protected baseUrl: string;
7+
protected apiKey: string;
78

89
constructor(apiKey: string, baseUrl: string = 'https://app.watercrawl.dev') {
10+
this.apiKey = apiKey;
911
this.baseUrl = baseUrl;
12+
1013
this.client = axios.create({
1114
baseURL: this.baseUrl,
1215
headers: {
@@ -16,21 +19,19 @@ export class BaseAPIClient {
1619
}
1720
});
1821

19-
// Add response interceptor for error handling
2022
this.client.interceptors.response.use(
21-
response => response,
22-
error => {
23-
if (error.response) {
24-
// Log API errors
25-
console.error('API Error:', {
26-
url: error.config.url,
27-
status: error.response.status,
28-
data: error.response.data,
29-
headers: error.response.headers
30-
});
31-
}
32-
throw error;
33-
}
23+
response => response,
24+
error => {
25+
if (error.response) {
26+
console.error('API Error:', {
27+
url: error.config.url,
28+
status: error.response.status,
29+
data: error.response.data,
30+
headers: error.response.headers
31+
});
32+
}
33+
throw error;
34+
}
3435
);
3536
}
3637

@@ -63,30 +64,54 @@ export class BaseAPIClient {
6364
return urlJoin(this.baseUrl, ...parts);
6465
}
6566

66-
protected async streamEvents(endpoint: string, onEvent: (event: any) => void, config: AxiosRequestConfig = {}): Promise<void> {
67-
const response = await this.client.get(endpoint, {
68-
responseType: 'stream',
69-
...config
67+
68+
/**
69+
* Async generator that streams SSE data using fetch and yields the parsed JSON payload of each `data:` line; it stops when a `[DONE]` payload is received
70+
*/
71+
protected async *fetchStream<T>(endpoint: string, config: AxiosRequestConfig = {}): AsyncGenerator<T> {
72+
const url = new URL(this.buildUrl(endpoint));
73+
if(config.params) {
74+
Object.keys(config.params).forEach(key => {
75+
url.searchParams.append(key, config.params[key]);
76+
})
77+
}
78+
const response = await fetch(url.toString(), {
79+
method: 'GET',
80+
headers: {
81+
// 'Accept': 'text/event-stream',
82+
'X-API-KEY': this.apiKey
83+
}
7084
});
85+
86+
if (!response.body) {
87+
throw new Error("No response body");
88+
}
89+
90+
const reader = response.body.getReader();
91+
const decoder = new TextDecoder("utf-8");
7192
let buffer = '';
7293

73-
const stream = response.data;
74-
stream.on('data', (chunk: string) => {
75-
buffer += chunk.toString();
94+
while (true) {
95+
const { done, value } = await reader.read();
96+
if (done) break;
97+
98+
buffer += decoder.decode(value, { stream: true });
99+
76100
const lines = buffer.split('\n');
77101
buffer = lines.pop() || '';
102+
78103
for (const line of lines) {
79-
if (line.startsWith('data: ')) {
104+
const trimmed = line.trim();
105+
if (trimmed.startsWith('data:')) {
106+
const dataStr = trimmed.slice(5).trim();
107+
if (dataStr === '[DONE]') return;
80108
try {
81-
const data = JSON.parse(line.slice(6));
82-
onEvent(data);
83-
} catch (error) {
84-
console.error('Error parsing event data:', line, error);
109+
yield JSON.parse(dataStr) as T;
110+
} catch (err) {
111+
console.error('Failed to parse JSON from stream:', dataStr, err);
85112
}
86113
}
87114
}
88-
});
89-
90-
return response.data;
115+
}
91116
}
92117
}

src/index.ts

Lines changed: 17 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -53,56 +53,11 @@ export class WaterCrawlAPIClient extends BaseAPIClient {
5353
}
5454

5555
async *monitorCrawlRequest(itemId: string, download: boolean = true): AsyncGenerator<CrawlEvent, void, unknown> {
56-
const events: CrawlEvent[] = [];
57-
let resolveNext: ((value: IteratorResult<CrawlEvent, void>) => void) | null = null;
58-
let isDone = false;
59-
let streamError: Error | null = null;
6056

61-
const processEvent = (event: CrawlEvent) => {
62-
if (resolveNext) {
63-
resolveNext({ value: event, done: false });
64-
resolveNext = null;
65-
} else {
66-
events.push(event);
67-
}
68-
};
69-
70-
const streamPromise = this.streamEvents(
57+
yield* this.fetchStream<CrawlEvent>(
7158
`/api/v1/core/crawl-requests/${itemId}/status/`,
72-
processEvent,
7359
{ params: { prefetched: download } }
74-
).catch((error) => {
75-
streamError = error;
76-
isDone = true;
77-
}).finally(() => {
78-
isDone = true;
79-
});
80-
81-
82-
try {
83-
while (!isDone || events.length > 0) {
84-
if (events.length > 0) {
85-
yield events.shift()!;
86-
} else if (!isDone) {
87-
await new Promise<IteratorResult<CrawlEvent, void>>((resolve) => {
88-
resolveNext = resolve;
89-
});
90-
}
91-
}
92-
93-
// Check for any remaining events after the stream ends
94-
while (events.length > 0) {
95-
yield events.shift()!;
96-
}
97-
98-
// If the stream failed, propagate the error
99-
if (streamError) {
100-
throw streamError;
101-
}
102-
} finally {
103-
// Ensure the stream is awaited and cleaned up properly
104-
await streamPromise;
105-
}
60+
);
10661
}
10762

10863
async getCrawlRequestResults(itemId: string): Promise<{ results: CrawlResult[] }> {
@@ -148,14 +103,14 @@ export class WaterCrawlAPIClient extends BaseAPIClient {
148103
* @throws Error if the sitemap is not available
149104
*/
150105
private async getCrawlRequestForSitemap(crawlRequest: string | CrawlRequest): Promise<CrawlRequest> {
151-
const request = typeof crawlRequest === 'string'
152-
? await this.getCrawlRequest(crawlRequest)
106+
const request = typeof crawlRequest === 'string'
107+
? await this.getCrawlRequest(crawlRequest)
153108
: crawlRequest;
154-
109+
155110
if (!request.sitemap) {
156111
throw new Error('Sitemap not found in crawl request');
157112
}
158-
113+
159114
return request;
160115
}
161116

@@ -166,11 +121,11 @@ export class WaterCrawlAPIClient extends BaseAPIClient {
166121
*/
167122
async downloadSitemap(crawlRequest: string | CrawlRequest): Promise<SitemapNode[]> {
168123
const request = await this.getCrawlRequestForSitemap(crawlRequest);
169-
124+
170125
if (!request.sitemap) {
171126
throw new Error('Sitemap URL is missing or undefined');
172127
}
173-
128+
174129
const response = await axios.get(request.sitemap);
175130
return response.data;
176131
}
@@ -237,23 +192,23 @@ export class WaterCrawlAPIClient extends BaseAPIClient {
237192
search_options: searchOptions,
238193
result_limit: resultLimit
239194
};
240-
195+
241196
const response = await this.post<SearchRequest>('/api/v1/core/search/', request);
242-
197+
243198
if (!sync) {
244199
return response;
245200
}
246-
201+
247202
// Monitor the search request until completion
248203
for await (const event of this.monitorSearchRequest(response.uuid, download)) {
249-
if (event.type === 'state' && ['finished', 'failed'].includes(event.status)) {
204+
if (event.type === 'state' && ['finished', 'failed'].includes(event.data.status)) {
250205
if (download && Array.isArray(event.data.result)) {
251206
return event.data.result as SearchResult[];
252207
}
253208
return event.data;
254209
}
255210
}
256-
211+
257212
throw new Error('Search request failed or timed out');
258213
}
259214

@@ -264,55 +219,10 @@ export class WaterCrawlAPIClient extends BaseAPIClient {
264219
* @returns AsyncGenerator yielding search events
265220
*/
266221
async *monitorSearchRequest(itemId: string, download: boolean = true): AsyncGenerator<SearchEvent, void, unknown> {
267-
const events: SearchEvent[] = [];
268-
let resolveNext: ((value: IteratorResult<SearchEvent, void>) => void) | null = null;
269-
let isDone = false;
270-
let streamError: Error | null = null;
271-
272-
const processEvent = (event: SearchEvent) => {
273-
if (resolveNext) {
274-
resolveNext({ value: event, done: false });
275-
resolveNext = null;
276-
} else {
277-
events.push(event);
278-
}
279-
};
280-
281-
const streamPromise = this.streamEvents(
282-
`/api/v1/core/search/${itemId}/status/`,
283-
processEvent,
284-
{ params: { prefetched: download } }
285-
).catch((error) => {
286-
streamError = error;
287-
isDone = true;
288-
}).finally(() => {
289-
isDone = true;
290-
});
291-
292-
try {
293-
while (!isDone || events.length > 0) {
294-
if (events.length > 0) {
295-
yield events.shift()!;
296-
} else if (!isDone) {
297-
await new Promise<IteratorResult<SearchEvent, void>>((resolve) => {
298-
resolveNext = resolve;
299-
});
300-
}
301-
}
302-
303-
// Check for any remaining events after the stream ends
304-
while (events.length > 0) {
305-
yield events.shift()!;
306-
}
307-
308-
// If the stream failed, propagate the error
309-
if (streamError) {
310-
throw streamError;
311-
}
312-
} finally {
313-
// Ensure the stream is awaited and cleaned up properly
314-
await streamPromise;
315-
}
222+
yield* this.fetchStream<SearchEvent>(
223+
`/api/v1/core/search/${itemId}/status/`,
224+
{ params: { prefetched: download } }
225+
);
316226
}
317227

318228
/**

test/api.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ describe('WaterCrawlAPI', () => {
4949

5050
let count = 0;
5151
for await (const item of generator) {
52-
console.log("TEST", item);
5352
expect(typeof item).toBe('object');
5453
count++;
5554
if (count >= 2) break; // Only test first few events to keep test duration reasonable

0 commit comments

Comments
 (0)