Skip to content

Commit 0861c05

Browse files
authored
OAK-12054: Refactor creation of ThreadPoolExecutors (fix those that we in fact only running one thread) (#2679)
1 parent 5d77080 commit 0861c05

File tree

7 files changed

+120
-110
lines changed

7 files changed

+120
-110
lines changed

oak-commons/pom.xml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@
109109
<groupId>commons-codec</groupId>
110110
<artifactId>commons-codec</artifactId>
111111
</dependency>
112+
<dependency>
113+
<groupId>org.apache.commons</groupId>
114+
<artifactId>commons-lang3</artifactId>
115+
</dependency>
112116
<dependency>
113117
<groupId>org.apache.jackrabbit</groupId>
114118
<artifactId>jackrabbit-jcr-commons</artifactId>
@@ -142,11 +146,6 @@
142146
<artifactId>commons-math3</artifactId>
143147
<scope>test</scope>
144148
</dependency>
145-
<dependency>
146-
<groupId>org.apache.commons</groupId>
147-
<artifactId>commons-lang3</artifactId>
148-
<scope>test</scope>
149-
</dependency>
150149
<dependency>
151150
<groupId>org.mockito</groupId>
152151
<artifactId>mockito-core</artifactId>
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.jackrabbit.oak.commons.internal.concurrent;
21+
22+
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
23+
import org.jetbrains.annotations.NotNull;
24+
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.LinkedBlockingQueue;
27+
import java.util.concurrent.ThreadPoolExecutor;
28+
import java.util.concurrent.TimeUnit;
29+
30+
/**
31+
* Helper class complementing JDK's @see java.util.concurrent.Executors class with additional ExecutionService creation
32+
*/
33+
public class ExecutorHelper {
34+
35+
/**
36+
* Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as work queue.
37+
* <br>
38+
* NOTE: The maximum number of threads in each executor service, when using a LinkedBlockingQueue(), is corePoolSize.
39+
* all other tasks are kept in the LinkedBlockingQueue, which is unbounded.
40+
* <br>
41+
* @see <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html">ThreadPoolExecutor javadoc</a>
42+
* <br>
43+
* (Using a bounded queue, such as SynchronousQueue would result in RejectedExecutionHandler if the queue would reach its capacity).
44+
* We want to keep things simple and don't want to use back pressure or other mechanisms. So in summary, corePoolSize threads are used, per service.
45+
* @param poolSize The size of the thread pool
46+
* @param namePattern The name pattern for the threads containing %d replaced by the thread number (calculated by NamedThreadFactory).
47+
* @param handler An optional handler for uncaught exceptions
48+
* @return A ThreadPoolExecutor
49+
*/
50+
public static ThreadPoolExecutor linkedQueueExecutor(int poolSize, @NotNull String namePattern, Thread.UncaughtExceptionHandler handler) {
51+
return linkedQueueExecutor(poolSize, poolSize, namePattern, handler);
52+
}
53+
54+
/**
55+
* Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as work queue.
56+
* <br>
57+
* Helper method delegating to {@link #linkedQueueExecutor(int, String, Thread.UncaughtExceptionHandler)} with null handler.
58+
* @param poolSize The size of the thread pool
59+
* @param namePattern The name pattern for the threads containing %d replaced by the thread number (calculated by NamedThreadFactory).
60+
* @return A ThreadPoolExecutor
61+
*/
62+
public static ExecutorService linkedQueueExecutor(int poolSize, @NotNull String namePattern) {
63+
return linkedQueueExecutor(poolSize, namePattern, null);
64+
}
65+
66+
67+
/**
68+
* Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as work queue which will start a single thread
69+
* in presence of work and will shut down the thread after 60 seconds of inactivity.
70+
* <br>
71+
* NOTE: ThreadPoolExecutor with an (unbounded) LinkedBlockingQueue does not allow setting both corePoolSize and maximumPoolSize,
72+
* and thus does not allow starting more than a single thread in combination with reduction to 0.
73+
* @param namePattern The name pattern for the threads containing %d replaced by the thread number (calculated by NamedThreadFactory).
74+
* @param handler An optional handler for uncaught exceptions
75+
* @return A ThreadPoolExecutor which will activate a single thread on demand and shut it down after 60 seconds of inactivity.
76+
*/
77+
public static ThreadPoolExecutor onDemandSingleThreadLinkedQueueExecutor(@NotNull String namePattern, Thread.UncaughtExceptionHandler handler) {
78+
return linkedQueueExecutor(0, 1, namePattern, handler);
79+
}
80+
81+
private static ThreadPoolExecutor linkedQueueExecutor(int minSize, int maxSize, @NotNull String namePattern, Thread.UncaughtExceptionHandler handler) {
82+
// Either minSize=0 and maxSize=1 (maxSize > 1 behaves as if maxSize were 1)
83+
// or minSize=maxSize (maxSize <> minSize behaves as if minSize were maxSize)
84+
var factory = BasicThreadFactory.builder()
85+
.daemon(true)
86+
.namingPattern(namePattern);
87+
if(handler != null) {
88+
factory.uncaughtExceptionHandler(handler);
89+
}
90+
return new ThreadPoolExecutor(minSize, maxSize, 60L, TimeUnit.SECONDS,
91+
new LinkedBlockingQueue<>(), factory.build());
92+
}
93+
94+
}

oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,16 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.concurrent.ExecutorService;
29-
import java.util.concurrent.LinkedBlockingQueue;
30-
import java.util.concurrent.ThreadFactory;
3129
import java.util.concurrent.ThreadPoolExecutor;
3230
import java.util.concurrent.TimeUnit;
33-
import java.util.concurrent.atomic.AtomicInteger;
3431

3532
import org.apache.commons.lang3.StringUtils;
3633

3734
import org.apache.commons.io.FilenameUtils;
3835
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
3936
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
4037
import org.apache.jackrabbit.oak.cache.CacheStats;
38+
import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
4139
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
4240
import org.apache.jackrabbit.oak.plugins.document.spi.JournalPropertyService;
4341
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
@@ -79,7 +77,6 @@
7977
import org.apache.lucene.analysis.util.TokenizerFactory;
8078
import org.apache.lucene.search.BooleanQuery;
8179
import org.apache.lucene.util.InfoStream;
82-
import org.jetbrains.annotations.NotNull;
8380
import org.osgi.framework.BundleContext;
8481
import org.osgi.framework.ServiceRegistration;
8582
import org.osgi.service.component.annotations.Activate;
@@ -266,6 +263,7 @@ public class LuceneIndexProviderService {
266263
}
267264

268265
public static final String REPOSITORY_HOME = "repository.home";
266+
private static final int INDEX_COPIER_POOL_SIZE = 5;
269267

270268
private LuceneIndexProvider indexProvider;
271269

@@ -335,8 +333,6 @@ public class LuceneIndexProviderService {
335333

336334
private ExecutorService executorService;
337335

338-
private int threadPoolSize;
339-
340336
private ExtractedTextCache extractedTextCache;
341337

342338
private boolean hybridIndex;
@@ -373,7 +369,6 @@ private void activate(BundleContext bundleContext, Configuration config) throws
373369
}
374370

375371
whiteboard = new OsgiWhiteboard(bundleContext);
376-
threadPoolSize = config.threadPoolSize();
377372
initializeIndexDir(bundleContext, config);
378373
initializeExtractedTextCache(bundleContext, config, statisticsProvider);
379374
tracker = createTracker(bundleContext, config);
@@ -572,24 +567,9 @@ ExecutorService getExecutorService(){
572567
}
573568

574569
private ExecutorService createExecutor() {
575-
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60L, TimeUnit.SECONDS,
576-
new LinkedBlockingQueue<>(), new ThreadFactory() {
577-
private final AtomicInteger counter = new AtomicInteger();
578-
private final Thread.UncaughtExceptionHandler handler = (t, e) -> log.warn("Error occurred in asynchronous processing ", e);
579-
@Override
580-
public Thread newThread(@NotNull Runnable r) {
581-
Thread thread = new Thread(r, createName());
582-
thread.setDaemon(true);
583-
thread.setPriority(Thread.MIN_PRIORITY);
584-
thread.setUncaughtExceptionHandler(handler);
585-
return thread;
586-
}
587-
588-
private String createName() {
589-
return "oak-lucene-" + counter.getAndIncrement();
590-
}
591-
});
592-
executor.setKeepAliveTime(1, TimeUnit.MINUTES);
570+
ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(
571+
INDEX_COPIER_POOL_SIZE, "oak-lucene-%d",
572+
(t, e) -> log.warn("Error occurred in asynchronous processing ", e));
593573
executor.allowCoreThreadTimeOut(true);
594574
return executor;
595575
}

oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,10 @@
2525
import java.util.List;
2626
import java.util.concurrent.Executor;
2727
import java.util.concurrent.ExecutorService;
28-
import java.util.concurrent.LinkedBlockingQueue;
29-
import java.util.concurrent.ThreadFactory;
3028
import java.util.concurrent.ThreadPoolExecutor;
31-
import java.util.concurrent.TimeUnit;
32-
import java.util.concurrent.atomic.AtomicInteger;
3329

3430
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
31+
import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
3532
import org.apache.jackrabbit.oak.commons.pio.Closer;
3633
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
3734
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoServiceImpl;
@@ -174,26 +171,9 @@ protected void bindIndexInfoProviders(IndexInfoServiceImpl indexInfoService) {
174171
}
175172

176173
private ThreadPoolExecutor createExecutor() {
177-
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 60L, TimeUnit.SECONDS,
178-
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
179-
private final AtomicInteger counter = new AtomicInteger();
180-
private final Thread.UncaughtExceptionHandler handler =
181-
(t, e) -> log.warn("Error occurred in asynchronous processing ", e);
182-
183-
@Override
184-
public Thread newThread(@NotNull Runnable r) {
185-
Thread thread = new Thread(r, createName());
186-
thread.setDaemon(true);
187-
thread.setPriority(Thread.MIN_PRIORITY);
188-
thread.setUncaughtExceptionHandler(handler);
189-
return thread;
190-
}
191-
192-
private String createName() {
193-
return "oak-lucene-" + counter.getAndIncrement();
194-
}
195-
});
196-
executor.setKeepAliveTime(1, TimeUnit.MINUTES);
174+
ThreadPoolExecutor executor = ExecutorHelper.onDemandSingleThreadLinkedQueueExecutor(
175+
"oak-lucene-%d",
176+
(t, e) -> log.warn("Error occurred in asynchronous processing ", e));
197177
executor.allowCoreThreadTimeOut(true);
198178
return executor;
199179
}

oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
*/
1919
package org.apache.jackrabbit.oak.run;
2020

21-
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
2221
import org.apache.jackrabbit.oak.commons.IOUtils;
22+
import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

@@ -45,8 +45,6 @@
4545
import java.util.concurrent.ExecutionException;
4646
import java.util.concurrent.ExecutorService;
4747
import java.util.concurrent.Future;
48-
import java.util.concurrent.LinkedBlockingQueue;
49-
import java.util.concurrent.ThreadPoolExecutor;
5048
import java.util.concurrent.TimeUnit;
5149
import java.util.stream.Collectors;
5250

@@ -128,26 +126,8 @@ public Downloader(int concurrency, int connectTimeoutMs, int readTimeoutMs, int
128126
this.checksumAlgorithm = null;
129127
}
130128
this.bufferSize = bufferSize;
131-
132-
// The maximum number of threads in each executor service,
133-
// when using a LinkedBlockingQueue(), is corePoolSize.
134-
// all other tasks are kept in the LinkedBlockingQueue, which
135-
// is unbounded.
136-
// (Using a bounded queue, such as SynchronousQueue,
137-
// would result in RejectedExecutionHandler).
138-
// We want to keep things simple and don't want
139-
// to use back presssure or other mechanisms.
140-
// So in summary, corePoolSize threads are used, per service.
141-
this.executorService = new ThreadPoolExecutor(
142-
corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
143-
new LinkedBlockingQueue<>(),
144-
BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build()
145-
);
146-
this.executorServiceForParts = new ThreadPoolExecutor(
147-
corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
148-
new LinkedBlockingQueue<>(),
149-
BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build()
150-
);
129+
this.executorService = ExecutorHelper.linkedQueueExecutor(corePoolSize, "downloader-%d");
130+
this.executorServiceForParts = ExecutorHelper.linkedQueueExecutor(corePoolSize, "partDownloader-%d");
151131
this.responses = new ArrayList<>();
152132
}
153133

oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.CompletionException;
2525
import java.util.concurrent.ExecutorService;
26-
import java.util.concurrent.LinkedBlockingQueue;
27-
import java.util.concurrent.ThreadPoolExecutor;
2826
import java.util.concurrent.TimeUnit;
2927

3028
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
3129

32-
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
3330
import org.apache.jackrabbit.guava.common.base.Ticker;
31+
import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
3432
import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter;
3533
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
3634
import org.apache.jackrabbit.oak.plugins.index.search.IndexStatistics;
@@ -69,12 +67,10 @@ public class ElasticIndexStatistics implements IndexStatistics {
6967
private static final String REFRESH_SECONDS = "oak.elastic.statsRefreshSeconds";
7068
private static final Long REFRESH_SECONDS_DEFAULT = 60L;
7169

72-
private static final ExecutorService REFRESH_EXECUTOR = new ThreadPoolExecutor(
73-
0, 4, 60L, TimeUnit.SECONDS,
74-
new LinkedBlockingQueue<>(),
75-
BasicThreadFactory.builder().namingPattern("elastic-statistics-cache-refresh-thread-%d").daemon().build()
76-
);
70+
private static final int REFRESH_POOL_SIZE = 4;
7771

72+
private static final ExecutorService REFRESH_EXECUTOR = ExecutorHelper.linkedQueueExecutor(
73+
REFRESH_POOL_SIZE, "elastic-statistics-cache-refresh-%d");
7874
private final ElasticConnection elasticConnection;
7975
private final ElasticIndexDefinition indexDefinition;
8076
private final LoadingCache<StatsRequestDescriptor, Integer> countCache;

oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,17 @@
3232
import java.util.concurrent.ExecutionException;
3333
import java.util.concurrent.ExecutorService;
3434
import java.util.concurrent.Future;
35-
import java.util.concurrent.LinkedBlockingQueue;
36-
import java.util.concurrent.ThreadFactory;
3735
import java.util.concurrent.ThreadPoolExecutor;
3836
import java.util.concurrent.TimeUnit;
3937
import java.util.concurrent.TimeoutException;
40-
import java.util.concurrent.atomic.AtomicInteger;
4138

4239
import org.apache.jackrabbit.guava.common.cache.Cache;
4340
import org.apache.jackrabbit.guava.common.cache.CacheBuilder;
4441
import org.apache.jackrabbit.guava.common.cache.Weigher;
4542
import org.apache.jackrabbit.oak.api.Blob;
4643
import org.apache.jackrabbit.oak.cache.CacheStats;
4744
import org.apache.jackrabbit.oak.commons.IOUtils;
45+
import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
4846
import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText;
4947
import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText.ExtractionResult;
5048
import org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
@@ -64,8 +62,8 @@ public class ExtractedTextCache {
6462
Boolean.getBoolean("oak.extracted.cacheOnlySuccess");
6563
private static final int EXTRACTION_TIMEOUT_SECONDS =
6664
Integer.getInteger("oak.extraction.timeoutSeconds", 60);
67-
private static final int EXTRACTION_MAX_THREADS =
68-
Integer.getInteger("oak.extraction.maxThreads", 10);
65+
private static final int EXTRACTION_POOL_SIZE =
66+
Integer.getInteger("oak.extraction.poolSize", 1);
6967
private static final boolean EXTRACT_IN_CALLER_THREAD =
7068
Boolean.getBoolean("oak.extraction.inCallerThread");
7169
private static final boolean EXTRACT_FORGET_TIMEOUT =
@@ -346,26 +344,9 @@ private synchronized void createExecutor() {
346344
return;
347345
}
348346
log.debug("ExtractedTextCache createExecutor {}", this);
349-
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, EXTRACTION_MAX_THREADS,
350-
60L, TimeUnit.SECONDS,
351-
new LinkedBlockingQueue<>(), new ThreadFactory() {
352-
private final AtomicInteger counter = new AtomicInteger();
353-
private final Thread.UncaughtExceptionHandler handler = (t, e) -> log.warn("Error occurred in asynchronous processing ", e);
354-
@Override
355-
public Thread newThread(@NotNull Runnable r) {
356-
Thread thread = new Thread(r, createName());
357-
thread.setDaemon(true);
358-
thread.setPriority(Thread.MIN_PRIORITY);
359-
thread.setUncaughtExceptionHandler(handler);
360-
return thread;
361-
}
362-
363-
private String createName() {
364-
int index = counter.getAndIncrement();
365-
return "oak binary text extractor" + (index == 0 ? "" : " " + index);
366-
}
367-
});
368-
executor.setKeepAliveTime(1, TimeUnit.MINUTES);
347+
ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(
348+
EXTRACTION_POOL_SIZE, "oak-binary-text-extractor-%d",
349+
(t, e) -> log.warn("Error occurred in asynchronous processing ", e));
369350
executor.allowCoreThreadTimeOut(true);
370351
executorService = executor;
371352
}

0 commit comments

Comments
 (0)