@@ -41,7 +41,14 @@ public MultiRetryWhenOp(Multi<? extends T> upstream, Predicate<? super Throwable
4141 private static <T > void subscribe (MultiSubscriber <? super T > downstream , Predicate <? super Throwable > onFailurePredicate ,
4242 Function <? super Multi <Throwable >, ? extends Publisher <?>> triggerStreamFactory ,
4343 Multi <? extends T > upstream ) {
44- TriggerSubscriber other = new TriggerSubscriber ();
44+ Context context ;
45+ if (downstream instanceof ContextSupport provider ) {
46+ context = provider .context ();
47+ } else {
48+ context = Context .empty ();
49+ }
50+
51+ TriggerSubscriber other = new TriggerSubscriber (context );
4552 Subscriber <Throwable > signaller = new SerializedSubscriber <>(other .processor );
4653 signaller .onSubscribe (Subscriptions .empty ());
4754 MultiSubscriber <T > serialized = new SerializedSubscriber <>(downstream );
@@ -176,7 +183,11 @@ static final class TriggerSubscriber extends AbstractMulti<Throwable>
176183 implements Multi <Throwable >, Subscriber <Object >, ContextSupport {
177184 RetryWhenOperator <?> operator ;
178185 private final Flow .Processor <Throwable , Throwable > processor = UnicastProcessor .<Throwable > create ().serialized ();
179- private Context context ;
186+ private final Context context ;
187+
188+ TriggerSubscriber (Context context ) {
189+ this .context = context ;
190+ }
180191
181192 @ Override
182193 public void onSubscribe (Flow .Subscription s ) {
@@ -200,11 +211,6 @@ public void onComplete() {
200211
201212 @ Override
202213 public void subscribe (Subscriber <? super Throwable > actual ) {
203- if (actual instanceof ContextSupport ) {
204- this .context = ((ContextSupport ) actual ).context ();
205- } else {
206- this .context = Context .empty ();
207- }
208214 processor .subscribe (actual );
209215 }
210216
0 commit comments