Skip to content

Commit 1fb0a3b

Browse files
rishabhdaimreschke
authored andcommitted
OAK-12077 : added joinUninterruptibly() in oak-commons (#2711)
* OAK-12077 : added joinUninterruptibly() in oak-commons * OAK-12077 : added review comments * OAK-12077 : updated sleep durations to sleep within joining period * OAK-12077 : fixed assertion errors if occured in joiningThread
1 parent 1e7b885 commit 1fb0a3b

File tree

2 files changed

+158
-0
lines changed

2 files changed

+158
-0
lines changed

oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/UninterruptibleUtils.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,39 @@ public static void sleepUninterruptibly(final long sleep, final TimeUnit unit) {
110110
}
111111
}
112112
}
113+
114+
/**
115+
* Invokes {@link TimeUnit#timedJoin(Thread, long)} uninterruptibly.
116+
* <p>
117+
* This method repeatedly calls {@link TimeUnit#timedJoin(Thread, long)} until the
118+
* specified timeout has elapsed or the target thread terminates, ignoring
119+
* {@link InterruptedException} but remembering that an interruption
120+
* occurred. When the method finally returns, it restores the current
121+
* thread's interrupted status if any interruptions were detected.
122+
*
123+
* @param toJoin the thread to wait for; must not be {@code null}
124+
* @throws NullPointerException if {@code toJoin} or {@code unit} is {@code null}
125+
* @throws IllegalArgumentException if {@code timeout} is negative
126+
*/
127+
public static void joinUninterruptibly(final Thread toJoin) {
128+
129+
Objects.requireNonNull(toJoin, "thread to join is null");
130+
131+
boolean interrupted = false;
132+
133+
try {
134+
for(;;) {
135+
try {
136+
toJoin.join();
137+
return;
138+
} catch (InterruptedException var6) {
139+
interrupted = true;
140+
}
141+
}
142+
} finally {
143+
if (interrupted) {
144+
Thread.currentThread().interrupt();
145+
}
146+
}
147+
}
113148
}

oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/UninterruptibleUtilsTest.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicReference;
2627

2728
/**
2829
* Unit cases for {@link UninterruptibleUtils}
@@ -162,4 +163,126 @@ public void testZeroSleepReturnsQuickly() {
162163
Assert.assertTrue("Zero sleep should return quickly", elapsedMillis < 50L);
163164
}
164165

166+
@Test
167+
public void testNullThread() {
168+
Assert.assertThrows(NullPointerException.class,
169+
() -> UninterruptibleUtils.joinUninterruptibly(null));
170+
}
171+
172+
@Test
173+
public void testReturnsWhenThreadFinishesBeforeTimeout() throws Exception {
174+
final long workMillis = 10L;
175+
final Thread worker = new Thread(() -> {
176+
try {
177+
Thread.sleep(workMillis);
178+
} catch (InterruptedException ignored) {}
179+
});
180+
181+
worker.start();
182+
183+
long start = System.nanoTime();
184+
UninterruptibleUtils.joinUninterruptibly(worker);
185+
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
186+
187+
Assert.assertFalse("Worker should be finished", worker.isAlive());
188+
Assert.assertTrue("Join should not take excessively long",
189+
elapsedMillis >= workMillis && elapsedMillis < 100L);
190+
}
191+
192+
@Test
193+
public void testJoinShouldWaitUntilThreadFinishes() {
194+
final Thread worker = new Thread(() -> {
195+
try {
196+
Thread.sleep(20L);
197+
} catch (InterruptedException ignored) {
198+
}
199+
});
200+
201+
worker.start();
202+
203+
long start = System.nanoTime();
204+
UninterruptibleUtils.joinUninterruptibly(worker);
205+
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
206+
207+
Assert.assertTrue("Join should respect timeout", elapsedMillis >= 20L );
208+
}
209+
210+
@Test
211+
public void testJoinUninterruptiblyIgnoresInterruptsButRestoresFlag() throws Exception {
212+
final Thread worker = new Thread(() -> {
213+
try {
214+
Thread.sleep(200L);
215+
} catch (InterruptedException ignored) {
216+
}
217+
});
218+
219+
worker.start();
220+
221+
final AtomicReference<Throwable> t = new AtomicReference<>();
222+
223+
Thread joiningThread = new Thread(() -> {
224+
try {
225+
UninterruptibleUtils.joinUninterruptibly(worker);
226+
// After returning, interrupted flag should be set if we interrupted during join
227+
Assert.assertTrue("Interrupt flag should be restored", Thread.currentThread().isInterrupted());
228+
} catch (Throwable e) {
229+
t.set(e);
230+
}
231+
});
232+
233+
joiningThread.start();
234+
235+
// Let the joining thread enter join
236+
Thread.sleep(5L);
237+
238+
// Interrupt while it is joining
239+
joiningThread.interrupt();
240+
241+
joiningThread.join();
242+
243+
// fail if any exception occurred in the thread
244+
if (t.get() != null) {
245+
Assert.fail("Got exception: " + t.get());
246+
}
247+
}
248+
249+
@Test
250+
public void testJoinUninterruptiblyMultipleInterruptsStillCompleteAndRestoreFlag() throws Exception {
251+
final Thread worker = new Thread(() -> {
252+
try {
253+
Thread.sleep(300L);
254+
} catch (InterruptedException ignored) {
255+
}
256+
});
257+
258+
worker.start();
259+
260+
final AtomicReference<Throwable> t = new AtomicReference<>();
261+
262+
Thread joiningThread = new Thread(() -> {
263+
try {
264+
UninterruptibleUtils.joinUninterruptibly(worker);
265+
Assert.assertTrue("Interrupt flag should be restored after multiple interrupts",
266+
Thread.currentThread().isInterrupted());
267+
} catch (Throwable e) {
268+
t.set(e);
269+
}
270+
});
271+
272+
joiningThread.start();
273+
274+
// Interrupt the joining thread multiple times while it is waiting
275+
for (int i = 0; i < 3; i++) {
276+
Thread.sleep(5L);
277+
joiningThread.interrupt();
278+
}
279+
280+
joiningThread.join();
281+
282+
// fail if any exception occurred in the thread
283+
if (t.get() != null) {
284+
Assert.fail("Got exception: " + t.get());
285+
}
286+
}
287+
165288
}

0 commit comments

Comments
 (0)