create(T value, Synchronizer synchronizer, AutoCloseable releaseAction, Object comment) {
return new RefImpl<>(null, value, synchronizer, releaseAction, comment);
}
diff --git a/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/Synchronizer.java b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/Synchronizer.java
new file mode 100644
index 00000000000..f67b06e9cfb
--- /dev/null
+++ b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/Synchronizer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.jena.sparql.service.enhancer.claimingcache;
+
+import java.util.function.Consumer;
+
+/**
+ * Abstracts synchronization for running actions atomically.
+ * Examples for typical implementations:
+ *
+ * // Example 1
+ * synchronized (object) {
+ * action.run();
+ * }
+ *
+ * // Example 2
+ * concurrentHashMap.compute(key, (k, v) -> {
+ * action.run();
+ * retun null;
+ * });
+ *
+ * // Example 3
+ * lock.lock();
+ * try {
+ * action.run();
+ * } finally {
+ * lock.unlock();
+ * }
+ *
+ *
+ */
+public interface Synchronizer
+ extends Consumer
+{
+}
diff --git a/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/SynchronizerMap.java b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/SynchronizerMap.java
new file mode 100644
index 00000000000..01c995e5de8
--- /dev/null
+++ b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/SynchronizerMap.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.jena.sparql.service.enhancer.claimingcache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+/**
+ * A helper to synchronize actions on keys.
+ * This class maps keys to proxy objects for synchronization and
+ * provides methods to remove the proxy objects when they are no longer needed.
+ *
+ * @param The key type.
+ */
+public class SynchronizerMap {
+ private ConcurrentHashMap map = new ConcurrentHashMap<>();
+
+ public static class SynchronizerImpl
+ implements Synchronizer
+ {
+ private SynchronizerMap map;
+
+ /** The id. This is the value of the counter when the key was acquired. */
+ private final int id;
+ private final K key;
+ private final VolatileCounter counter;
+
+ public SynchronizerImpl(SynchronizerMap map, K key, VolatileCounter counter, int id) {
+ super();
+ this.map = map;
+ this.key = key;
+ this.counter = counter;
+ this.id = id;
+ }
+
+ @Override
+ public void accept(Runnable action) {
+ synchronized (counter) {
+ action.run();
+ }
+ }
+
+ public void clearEntryIfZero() {
+ map.clearEntryIfZero(key);
+ }
+
+ private void dec() {
+ counter.dec();
+ }
+
+ @Override
+ public String toString() {
+ return "Synchronizer on " + System.identityHashCode(map) + ", "
+ + String.join(", ", "id: " + id, "key: " + key, "current count: " + counter.get());
+ }
+ }
+
+ public T compute(K key, Function, T> handler) {
+ SynchronizerImpl synchronizer = acquire(key);
+ Holder result = Holder.of(null);
+ synchronizer.accept(() -> {
+ // Decrement the refcount of the synchronizer. Does not clear the key's proxy object.
+ synchronizer.dec();
+
+ T r = handler.apply(synchronizer);
+ result.set(r);
+ });
+ return result.get();
+ }
+
+ private SynchronizerImpl acquire(K key) {
+ Holder id = Holder.of(null);
+ VolatileCounter counter = map.compute(key, (k, before) -> {
+ VolatileCounter r = before == null ? new VolatileCounter(1) : before.inc();
+ id.set(r.get()); // Atomically expose the current value of the counter
+ return r;
+ });
+ SynchronizerImpl result = new SynchronizerImpl<>(this, key, counter, id.get());
+ // System.out.println("Acquired: " + result);
+ return result;
+ }
+
+ private void clearEntryIfZero(K key) {
+ map.compute(key, this::clearEntryIfZero);
+ }
+
+ /** This method is run atomically */
+ private VolatileCounter clearEntryIfZero(K key, VolatileCounter counter) {
+ if (counter == null) {
+ throw new IllegalStateException("No counter for key " + key);
+ }
+ int count = counter.get();
+ if (counter.get() < 0) {
+ throw new IllegalStateException("Negative count for key " + key + ": " + count);
+ }
+
+ VolatileCounter result = count == 0
+ ? null
+ : counter;
+
+// if (count == 0) {
+// System.out.println("Cleared entry for key " + key);
+// }
+
+ return result;
+ }
+}
diff --git a/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/VolatileCounter.java b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/VolatileCounter.java
new file mode 100644
index 00000000000..55d68b7d3be
--- /dev/null
+++ b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/claimingcache/VolatileCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.jena.sparql.service.enhancer.claimingcache;
+
+/**
+ * A counter that can be accessed by multiple threads.
+ * Synchronization must be ensured extrinsically, such as using synchronized blocks or within
+ * ConcurrentHashMap.compute.
+ */
+class VolatileCounter {
+ private volatile int value ;
+
+ public VolatileCounter(int value) {
+ this.value = value;
+ }
+
+ public VolatileCounter inc() { ++value; return this; }
+ public VolatileCounter dec() { --value; return this; }
+ public int get() { return value; }
+
+ @Override
+ public String toString() {
+ return "Volatile counter " + System.identityHashCode(this) + " has value " + value;
+ }
+}
diff --git a/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/concurrent/AutoLock.java b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/concurrent/AutoLock.java
new file mode 100644
index 00000000000..495a80484ba
--- /dev/null
+++ b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/concurrent/AutoLock.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.jena.sparql.service.enhancer.concurrent;
+
+import java.util.concurrent.locks.Lock;
+
+public class AutoLock implements AutoCloseable {
+ private final Lock lock;
+
+ private AutoLock(Lock lock) {
+ this.lock = lock;
+ }
+
+ /**
+ * Immediately attempts to acquire the lock and returns
+ * an auto-closeable AutoLock instance for use with try-with-resources.
+ */
+ public static AutoLock lock(Lock lock) {
+ lock.lock();
+ return new AutoLock(lock);
+ }
+
+ @Override
+ public void close() {
+ lock.unlock();
+ }
+}
diff --git a/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/concurrent/CloseShieldExecutorService.java b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/concurrent/CloseShieldExecutorService.java
new file mode 100644
index 00000000000..65abcb3ed41
--- /dev/null
+++ b/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/concurrent/CloseShieldExecutorService.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.jena.sparql.service.enhancer.concurrent;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+
+/** Wrapper for an executor service. Overrides the {@link #shutdown()} and {@link #shutdownNow()} with no-ops. */
+public class CloseShieldExecutorService
+ extends ForwardingExecutorService {
+
+ protected X delegate;
+ protected AtomicBoolean isShutDown = new AtomicBoolean();
+
+ public CloseShieldExecutorService(X delegate) {
+ super();
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected X delegate() {
+ return delegate;
+ }
+
+ protected void checkOpen() {
+ if (isShutdown()) {
+ throw new RejectedExecutionException("Executor service is already shut down");
+ }
+ }
+
+ @Override
+ public Future submit(Callable task) {
+ checkOpen();
+ return super.submit(task);
+ }
+
+ @Override
+ public Future> submit(Runnable task) {
+ checkOpen();
+ return super.submit(task);
+ }
+
+ @Override
+ public