Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,63 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.parameterTypeChanged",
"old": "parameter boolean io.smallrye.mutiny.Context::contains(===java.lang.String===)",
"new": "parameter boolean io.smallrye.mutiny.Context::contains(===java.lang.Object===)",
"parameterIndex": "0",
"justification": "ADD YOUR EXPLANATION FOR THE NECESSITY OF THIS CHANGE"
},
{
"ignore": true,
"code": "java.method.parameterTypeChanged",
"old": "parameter io.smallrye.mutiny.Context io.smallrye.mutiny.Context::delete(===java.lang.String===)",
"new": "parameter io.smallrye.mutiny.Context io.smallrye.mutiny.Context::delete(===java.lang.Object===)",
"parameterIndex": "0",
"justification": "ADD YOUR EXPLANATION FOR THE NECESSITY OF THIS CHANGE"
},
{
"ignore": true,
"code": "java.method.parameterTypeParameterChanged",
"old": "parameter io.smallrye.mutiny.Context io.smallrye.mutiny.Context::from(===java.util.Map<java.lang.String, ?>===)",
"new": "parameter io.smallrye.mutiny.Context io.smallrye.mutiny.Context::from(===java.util.Map<java.lang.Object, ?>===)",
"parameterIndex": "0",
"justification": "ADD YOUR EXPLANATION FOR THE NECESSITY OF THIS CHANGE"
},
{
"ignore": true,
"code": "java.method.parameterTypeChanged",
"old": "parameter <T> T io.smallrye.mutiny.Context::get(===java.lang.String===) throws java.util.NoSuchElementException",
"new": "parameter <T> T io.smallrye.mutiny.Context::get(===java.lang.Object===) throws java.util.NoSuchElementException",
"parameterIndex": "0",
"justification": "ADD YOUR EXPLANATION FOR THE NECESSITY OF THIS CHANGE"
},
{
"ignore": true,
"code": "java.method.parameterTypeChanged",
"old": "parameter <T> T io.smallrye.mutiny.Context::getOrElse(===java.lang.String===, java.util.function.Supplier<? extends T>)",
"new": "parameter <T> T io.smallrye.mutiny.Context::getOrElse(===java.lang.Object===, java.util.function.Supplier<? extends T>)",
"parameterIndex": "0",
"justification": "ADD YOUR EXPLANATION FOR THE NECESSITY OF THIS CHANGE"
},
{
"ignore": true,
"code": "java.method.returnTypeTypeParametersChanged",
"old": "method java.util.Set<java.lang.String> io.smallrye.mutiny.Context::keys()",
"new": "method java.util.Set<java.lang.Object> io.smallrye.mutiny.Context::keys()",
"justification": "ADD YOUR EXPLANATION FOR THE NECESSITY OF THIS CHANGE"
},
{
"ignore": true,
"code": "java.method.parameterTypeChanged",
"old": "parameter io.smallrye.mutiny.Context io.smallrye.mutiny.Context::put(===java.lang.String===, java.lang.Object)",
"new": "parameter io.smallrye.mutiny.Context io.smallrye.mutiny.Context::put(===java.lang.Object===, java.lang.Object)",
"parameterIndex": "0",
"justification": "ADD YOUR EXPLANATION FOR THE NECESSITY OF THIS CHANGE"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
43 changes: 28 additions & 15 deletions implementation/src/main/java/io/smallrye/mutiny/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
* A context is provided by a {@link io.smallrye.mutiny.subscription.UniSubscriber} or {@link Flow.Subscriber}
* that implements {@link io.smallrye.mutiny.subscription.ContextSupport}.
* <p>
* Context keys are represented as {@link String} while values can be from heterogeneous types.
* <p>
* {@link Context} instances are thread-safe.
* Internal storage is not allocated until the first entry is being added.
* <p>
Expand Down Expand Up @@ -57,9 +55,9 @@ public static Context of(Object... entries) {
if (entries.length % 2 != 0) {
throw new IllegalArgumentException("Arguments must be balanced to form (key, value) pairs");
}
HashMap<String, Object> map = new HashMap<>();
HashMap<Object, Object> map = new HashMap<>();
for (int i = 0; i < entries.length; i = i + 2) {
String key = nonNull(entries[i], "key").toString();
Object key = nonNull(entries[i], "key");
Object value = nonNull(entries[i + 1], "value");
map.put(key, value);
}
Expand All @@ -73,17 +71,17 @@ public static Context of(Object... entries) {
* @return the new context
* @throws NullPointerException when {@code entries} is null
*/
public static Context from(Map<String, ?> entries) {
public static Context from(Map<Object, ?> entries) {
return new Context(requireNonNull(entries, "The entries map cannot be null"));
}

private volatile ConcurrentHashMap<String, Object> entries;
private volatile ConcurrentHashMap<Object, Object> entries;

private Context() {
this.entries = null;
}

private Context(Map<String, ?> initialEntries) {
private Context(Map<Object, ?> initialEntries) {
this.entries = new ConcurrentHashMap<>(initialEntries);
}

Expand All @@ -93,7 +91,7 @@ private Context(Map<String, ?> initialEntries) {
* @param key the key
* @return {@code true} when there is an entry for {@code key}, {@code false} otherwise
*/
public boolean contains(String key) {
public boolean contains(Object key) {
if (entries == null) {
return false;
} else {
Expand All @@ -110,7 +108,7 @@ public boolean contains(String key) {
* @throws NoSuchElementException when there is no entry for {@code key}
*/
@SuppressWarnings("unchecked")
public <T> T get(String key) throws NoSuchElementException {
public <T> T get(Object key) throws NoSuchElementException {
if (entries == null) {
throw new NoSuchElementException("The context is empty");
}
Expand All @@ -130,7 +128,7 @@ public <T> T get(String key) throws NoSuchElementException {
* @return the value
*/
@SuppressWarnings("unchecked")
public <T> T getOrElse(String key, Supplier<? extends T> alternativeSupplier) {
public <T> T getOrElse(Object key, Supplier<? extends T> alternativeSupplier) {
if (entries != null) {
T value = (T) entries.get(key);
if (value != null) {
Expand All @@ -147,7 +145,7 @@ public <T> T getOrElse(String key, Supplier<? extends T> alternativeSupplier) {
* @param value the value, cannot be {@code null}
* @return this context
*/
public Context put(String key, Object value) {
public Context put(Object key, Object value) {
if (entries == null) {
synchronized (this) {
if (entries == null) {
Expand All @@ -165,7 +163,7 @@ public Context put(String key, Object value) {
* @param key the key
* @return this context
*/
public Context delete(String key) {
public Context delete(Object key) {
if (entries != null) {
entries.remove(key);
}
Expand All @@ -188,18 +186,33 @@ public boolean isEmpty() {
*
* @return the set of keys
*/
public Set<String> keys() {
public Set<Object> keys() {
if (this.entries == null) {
return Collections.emptySet();
}
HashSet<String> set = new HashSet<>();
Enumeration<String> enumeration = entries.keys();
HashSet<Object> set = new HashSet<>();
Enumeration<Object> enumeration = entries.keys();
while (enumeration.hasMoreElements()) {
set.add(enumeration.nextElement());
}
return set;
}

/**
* Fork a context from the current context.
*
* @return a new context with the current context data as initial values
*/
public Context fork() {
synchronized (this) {
if (this.isEmpty()) {
return Context.empty();
} else {
return Context.from(this.entries);
}
}
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down
10 changes: 10 additions & 0 deletions implementation/src/main/java/io/smallrye/mutiny/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -739,4 +739,14 @@ default Multi<T> capDemandsTo(long max) {
default <K extends Enum<K>> MultiSplitter<T, K> split(Class<K> keyType, Function<T, K> splitter) {
return new MultiSplitter<>(this, keyType, splitter);
}

@CheckReturnValue
default Multi<T> forkContext() {
return forkContext(newContext -> {});
}

@CheckReturnValue
default Multi<T> forkContext(Consumer<Context> additionalSteps) {
throw new UnsupportedOperationException("Default method added to limit binary incompatibility");
}
}
10 changes: 10 additions & 0 deletions implementation/src/main/java/io/smallrye/mutiny/Uni.java
Original file line number Diff line number Diff line change
Expand Up @@ -855,4 +855,14 @@ default <R> Uni<R> withContext(BiFunction<Uni<T>, Context, Uni<R>> builder) {
default Uni<ItemWithContext<T>> attachContext() {
return this.withContext((uni, ctx) -> uni.onItem().transform(item -> new ItemWithContext<>(ctx, item)));
}

@CheckReturnValue
default Uni<T> forkContext() {
return forkContext(newContext -> {});
}

@CheckReturnValue
default Uni<T> forkContext(Consumer<Context> additionalSteps) {
throw new UnsupportedOperationException("Default method added to limit binary incompatibility");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Predicate;

Expand All @@ -32,12 +33,7 @@
import io.smallrye.mutiny.groups.MultiSubscribe;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.MultiCacheOp;
import io.smallrye.mutiny.operators.multi.MultiDemandCapping;
import io.smallrye.mutiny.operators.multi.MultiEmitOnOp;
import io.smallrye.mutiny.operators.multi.MultiLogger;
import io.smallrye.mutiny.operators.multi.MultiSubscribeOnOp;
import io.smallrye.mutiny.operators.multi.MultiWithContext;
import io.smallrye.mutiny.operators.multi.*;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.MultiSubscriberAdapter;
Expand Down Expand Up @@ -216,4 +212,9 @@ public MultiDemandPausing<T> pauseDemand() {
public Multi<T> capDemandsUsing(LongFunction<Long> function) {
return Infrastructure.onMultiCreation(new MultiDemandCapping<>(this, nonNull(function, "function")));
}

@Override
public Multi<T> forkContext(Consumer<Context> additionalSteps) {
return Infrastructure.onMultiCreation(new MultiContextForkingOperator<>(this, nonNull(additionalSteps, "additionalSteps")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;

import io.smallrye.mutiny.Context;
Expand Down Expand Up @@ -148,4 +149,9 @@ public Uni<T> log() {
public <R> Uni<R> withContext(BiFunction<Uni<T>, Context, Uni<R>> builder) {
return Infrastructure.onUniCreation(new UniWithContext<>(this, nonNull(builder, "builder")));
}

@Override
public Uni<T> forkContext(Consumer<Context> additionalSteps) {
return Infrastructure.onUniCreation(new UniContextForkingOperator<>(this, nonNull(additionalSteps, "additionalSteps")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.smallrye.mutiny.operators.multi;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.ContextSupport;
import io.smallrye.mutiny.subscription.MultiSubscriber;

import java.util.function.Consumer;

public final class MultiContextForkingOperator<T> extends AbstractMultiOperator<T, T> {

private final Consumer<Context> additionalSteps;

public MultiContextForkingOperator(Multi<? extends T> upstream, Consumer<Context> additionalSteps) {
super(upstream);
this.additionalSteps = additionalSteps;
}

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
upstream.subscribe().withSubscriber(new MultiContextForkingOperatorProcessor<T>(subscriber, additionalSteps));
}

private static class MultiContextForkingOperatorProcessor<T> extends MultiOperatorProcessor<T, T> {

private final Context forkedContext;

public MultiContextForkingOperatorProcessor(MultiSubscriber<? super T> downstream, Consumer<Context> additionalSteps) {
super(downstream);
if (downstream instanceof ContextSupport provider) {
this.forkedContext = provider.context().fork();
} else {
this.forkedContext = Context.empty();
}
additionalSteps.accept(this.forkedContext);
}

@Override
public Context context() {
return forkedContext;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ public MultiRetryWhenOp(Multi<? extends T> upstream, Predicate<? super Throwable
private static <T> void subscribe(MultiSubscriber<? super T> downstream, Predicate<? super Throwable> onFailurePredicate,
Function<? super Multi<Throwable>, ? extends Publisher<?>> triggerStreamFactory,
Multi<? extends T> upstream) {
TriggerSubscriber other = new TriggerSubscriber();
Context context;
if (downstream instanceof ContextSupport provider) {
context = provider.context();
} else {
context = Context.empty();
}

TriggerSubscriber other = new TriggerSubscriber(context);
Subscriber<Throwable> signaller = new SerializedSubscriber<>(other.processor);
signaller.onSubscribe(Subscriptions.empty());
MultiSubscriber<T> serialized = new SerializedSubscriber<>(downstream);
Expand Down Expand Up @@ -176,7 +183,11 @@ static final class TriggerSubscriber extends AbstractMulti<Throwable>
implements Multi<Throwable>, Subscriber<Object>, ContextSupport {
RetryWhenOperator<?> operator;
private final Flow.Processor<Throwable, Throwable> processor = UnicastProcessor.<Throwable> create().serialized();
private Context context;
private final Context context;

TriggerSubscriber(Context context) {
this.context = context;
}

@Override
public void onSubscribe(Flow.Subscription s) {
Expand All @@ -200,11 +211,6 @@ public void onComplete() {

@Override
public void subscribe(Subscriber<? super Throwable> actual) {
if (actual instanceof ContextSupport) {
this.context = ((ContextSupport) actual).context();
} else {
this.context = Context.empty();
}
processor.subscribe(actual);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private class AndSupervisor implements UniSubscription {

Context context = subscriber.context();
for (Uni<?> uni : unis) {
UniHandler result = new UniHandler(this, uni, context);
UniHandler result = new UniHandler(this, uni, context.fork());
handlers.add(result);
}

Expand Down
Loading
Loading