Conversation
| // TODO: is using `@rsocket/rxjs` as intermediary adapter a bad idea? | ||
| // - considerations: | ||
| // - do we lose support for backpressure that we wouldn't have otherwise? | ||
| // - what is bundle size consequences of relying on `@rsocket/rxjs`? | ||
| // - what is bundle size consequences of relying on `rxjs` and `rxjs-for-await` | ||
| const $responderObs = RxRequestersFactory.requestChannel( | ||
| $requesterObs, | ||
| inputCodec, | ||
| outputCodec, | ||
| prefetch | ||
| )(rsocket, metadata); |
There was a problem hiding this comment.
Here is where I leveraged the Rx adapter to support requestChannel more easily, but doing so raises some questions, which I've detailed in the comment, and below:
- do we lose support for backpressure that we wouldn't have otherwise?
- what is bundle size consequences of relying on
@rsocket/rxjs? - what is bundle size consequences of relying on
rxjsandrxjs-for-await
There was a problem hiding this comment.
@OlegDokuka can you help out with providing an alternative async adapter implementation for requestChannel here?
| const subscriberFactory = RxRespondersFactory.requestChannel( | ||
| ($in) => from(handler(eachValueFrom($in))), | ||
| codecs, | ||
| prefetch | ||
| ); | ||
|
|
||
| return subscriberFactory(payload, initialRequestN, isCompleted, s); |
There was a problem hiding this comment.
Same idea here as above, I leveraged the RX adapter to more easily provide this functionality, but not sure if it's the best idea.
| export function requestStream<T, R>( | ||
| handler: DefaultResponderHandlerSignature<T>, | ||
| codecs: { | ||
| inputCodec: Codec<T>; | ||
| outputCodec: Codec<R>; | ||
| } | ||
| ) { | ||
| return Object.assign< | ||
| DefaultResponderHandlerSignature<Payload>, | ||
| { requestType: FrameTypes.REQUEST_STREAM } | ||
| >( | ||
| (payload, initialRequestN, subscriber) => { | ||
| return handler( | ||
| codecs.inputCodec.decode(payload.data), | ||
| initialRequestN, | ||
| subscriber | ||
| ); | ||
| }, | ||
| { requestType: FrameTypes.REQUEST_STREAM } | ||
| ); | ||
| } |
There was a problem hiding this comment.
As I mentioned in the last PR, I started adding a set of "default" responders to aid with my testing, as well as an additional feature. I only got as far as this single responder though, so if we don't want to land this with only a single responder, we can pick it out and loop back around to adding the rest later.
There was a problem hiding this comment.
There is also the question of if these default request/responders should be in the messaging package or elsewhere.
There was a problem hiding this comment.
TODO: Reviewed where I am using this default responder and its only in an example currently so lets not land this API addition with this effort.
- requester fireAndForget - requester requestResponse - requester requestStream refactor: renamed to SubscribingAsyncIterator + added more tests feat: (wip) add async responders - fireAndForget - requestResponse feat: AsyncIterable requestStream responder refactor: use rxjs observer for async iterable requestStream example feat: add requesChannel responders and requesters refactor: remove unnecessary passing of scheduler test: (wip) requester tests test: async requestResponse requesters tests test: async adapter fireAndForget requester tests refactor: apply linting fix: resolve issues from rebasing test: add tests for requestStream requester refactor: rename async package to adapter-async Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
ee225e6 to
f30d5b6
Compare
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
04fca05 to
43cd833
Compare
Signed-off-by: Kevin Viglucci <kviglucci@gmail.com>
479727a to
303c6f3
Compare
Adds an adapter package that supports
async/awaitand async generators for the various interaction patterns.