Skip to content

Commit 7bb5051

Browse files
stidsborgclaude
andcommitted
Refactored QueueManager.TryToDeliver to use async semaphore
Replaced the _delivering flag with an async SemaphoreSlim for controlling concurrent access to message delivery. This ensures callers wait for their turn instead of returning early when delivery is in progress. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent efbe998 commit 7bb5051

File tree

1 file changed

+55
-48
lines changed

1 file changed

+55
-48
lines changed

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class QueueManager(
4141

4242
private IdempotencyKeys? _idempotencyKeys;
4343
private int _nextToRemoveIndex = 0;
44-
private bool _delivering;
44+
private readonly SemaphoreSlim _deliverySemaphore = new(1, 1);
4545

4646
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1);
4747
private bool _initialized = false;
@@ -150,7 +150,7 @@ public async Task FetchMessagesOnce()
150150
}
151151

152152
if (messages.Any())
153-
TryToDeliver();
153+
_ = TryToDeliverAsync();
154154
}
155155
finally
156156
{
@@ -206,65 +206,72 @@ public async Task AfterFlush()
206206
}
207207
}
208208

209-
private void TryToDeliver()
209+
private async Task TryToDeliverAsync()
210210
{
211-
List<MessageWithPosition> messagesToDeliver;
212-
List<KeyValuePair<EffectId, Subscription>> subscribers;
213-
lock (_lock)
214-
if (_delivering)
215-
return;
216-
else
217-
_delivering = true;
218-
219-
StartAgain:
220-
lock (_lock)
221-
{
222-
messagesToDeliver = _toDeliver.ToList();
223-
subscribers = _subscribers.ToList();
224-
}
225-
211+
await _deliverySemaphore.WaitAsync();
226212
try
227213
{
228-
foreach (var messageWithPosition in messagesToDeliver)
229-
foreach (var idAndSubscription in subscribers)
214+
while (true)
230215
{
231-
var (effectId, subscription) = idAndSubscription;
232-
if (subscription.Predicate(messageWithPosition.Message))
216+
List<MessageWithPosition> messagesToDeliver;
217+
List<KeyValuePair<EffectId, Subscription>> subscribers;
218+
lock (_lock)
233219
{
234-
int toRemoveIndex;
235-
lock (_lock)
236-
{
237-
if (!_subscribers.ContainsKey(effectId)) //might have been removed by timeout
238-
continue;
239-
240-
_toDeliver.Remove(messageWithPosition);
241-
_deliveredPositions.Add(messageWithPosition.Position);
242-
_subscribers.Remove(effectId);
243-
toRemoveIndex = _nextToRemoveIndex++;
244-
}
245-
246-
effect.Upsert(_toRemoveNextIndex, toRemoveIndex, alias: null, flush: false);
220+
messagesToDeliver = _toDeliver.ToList();
221+
subscribers = _subscribers.ToList();
222+
}
247223

248-
var toRemoveId = new EffectId([-1, 0, toRemoveIndex]);
249-
var msg = new MessageAndEffectResults(
250-
messageWithPosition.Message,
251-
messageWithPosition.IdempotencyKeyResult == null
252-
? [new EffectResult(toRemoveId, messageWithPosition.Position, Alias: null)]
253-
: [new EffectResult(toRemoveId, messageWithPosition.Position, Alias: null), messageWithPosition.IdempotencyKeyResult]
254-
);
255-
subscription.Tcs.SetResult(msg);
224+
var delivered = false;
225+
foreach (var messageWithPosition in messagesToDeliver)
226+
{
227+
if (delivered) break;
256228

257-
goto StartAgain;
229+
foreach (var idAndSubscription in subscribers)
230+
{
231+
var (effectId, subscription) = idAndSubscription;
232+
if (subscription.Predicate(messageWithPosition.Message))
233+
{
234+
int toRemoveIndex;
235+
lock (_lock)
236+
{
237+
if (!_subscribers.ContainsKey(effectId)) //might have been removed by timeout
238+
continue;
239+
240+
_toDeliver.Remove(messageWithPosition);
241+
_deliveredPositions.Add(messageWithPosition.Position);
242+
_subscribers.Remove(effectId);
243+
toRemoveIndex = _nextToRemoveIndex++;
244+
}
245+
246+
await effect.Upsert(_toRemoveNextIndex, toRemoveIndex, alias: null, flush: false);
247+
248+
var toRemoveId = new EffectId([-1, 0, toRemoveIndex]);
249+
var msg = new MessageAndEffectResults(
250+
messageWithPosition.Message,
251+
messageWithPosition.IdempotencyKeyResult == null
252+
? [new EffectResult(toRemoveId, messageWithPosition.Position, Alias: null)]
253+
: [new EffectResult(toRemoveId, messageWithPosition.Position, Alias: null), messageWithPosition.IdempotencyKeyResult]
254+
);
255+
subscription.Tcs.SetResult(msg);
256+
257+
delivered = true;
258+
break;
259+
}
260+
}
258261
}
262+
263+
if (!delivered)
264+
break;
259265
}
260266
}
261267
catch (Exception e)
262268
{
263269
unhandledExceptionHandler.Invoke(flowId.Type, e);
264270
}
265-
266-
lock (_lock)
267-
_delivering = false;
271+
finally
272+
{
273+
_deliverySemaphore.Release();
274+
}
268275
}
269276

270277
public async Task CheckTimeouts()
@@ -300,7 +307,7 @@ public async Task CheckTimeouts()
300307
if (timeout != null)
301308
timeouts.AddTimeout(timeoutId!, timeout.Value);
302309

303-
TryToDeliver();
310+
_ = TryToDeliverAsync();
304311

305312
await Task.WhenAny(tcs.Task, Task.Delay(maxWait ?? settings.MessagesDefaultMaxWaitForCompletion));
306313

0 commit comments

Comments
 (0)