Skip to content

Commit e35fcf1

Browse files
committed
Combined semaphore and lock in QueueManager
1 parent 7f86936 commit e35fcf1

File tree

1 file changed

+38
-44
lines changed

1 file changed

+38
-44
lines changed

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ public class QueueManager(
2929
: IDisposable
3030
{
3131
private readonly Dictionary<EffectId, Subscription> _subscribers = new();
32-
private readonly Lock _lock = new();
33-
32+
3433
private readonly EffectId _parentId = new([-1]);
3534
private readonly EffectId _toRemoveNextIndex = new([-1, 0]);
3635
private readonly EffectId _idempotencyKeysId = new([-1, -1]);
@@ -105,15 +104,13 @@ public async Task FetchMessagesOnce()
105104
if (_thrownException != null)
106105
{
107106
foreach (var (_, subscription) in _subscribers)
108-
subscription.Tcs.TrySetException(_thrownException);
107+
_ = Task.Run(() => subscription.Tcs.TrySetException(_thrownException));
108+
109109
_subscribers.Clear();
110-
111110
return;
112111
}
113112

114-
List<long> skipPositions;
115-
lock (_lock)
116-
skipPositions = _fetchedPositions.ToList();
113+
var skipPositions = _fetchedPositions.ToList();
117114

118115
var messages = await messageStore.GetMessages(storedId, skipPositions);
119116
foreach (var (messageContent, messageType, position, idempotencyKey, sender, receiver) in messages)
@@ -142,23 +139,17 @@ public async Task FetchMessagesOnce()
142139
receiver,
143140
sender
144141
);
145-
lock (_lock)
146-
{
147-
_toDeliver.Add(messageData);
148-
_fetchedPositions.Add(position);
149-
}
142+
_toDeliver.Add(messageData);
143+
_fetchedPositions.Add(position);
150144
}
151145
catch (Exception e)
152146
{
153147
unhandledExceptionHandler.Invoke(flowId.Type, e);
154148
_thrownException = e;
155149

156-
lock (_lock)
157-
{
158-
foreach (var (_, subscription) in _subscribers)
159-
subscription.Tcs.TrySetException(_thrownException);
160-
_subscribers.Clear();
161-
}
150+
foreach (var (_, subscription) in _subscribers)
151+
_ = Task.Run(() => subscription.Tcs.TrySetException(_thrownException));
152+
_subscribers.Clear();
162153

163154
return;
164155
}
@@ -206,9 +197,8 @@ public async Task AfterFlush()
206197
foreach (var nonDirtyChild in nonDirtyChildren)
207198
await effect.Clear(nonDirtyChild, flush: false);
208199

209-
lock (_lock)
210-
foreach (var position in positions)
211-
_fetchedPositions.Remove(position);
200+
foreach (var position in positions)
201+
_fetchedPositions.Remove(position);
212202
}
213203
}
214204
catch (Exception exception)
@@ -228,13 +218,8 @@ private async Task TryToDeliverAsync()
228218
{
229219
while (true)
230220
{
231-
List<MessageData> messages;
232-
List<KeyValuePair<EffectId, Subscription>> subscribers;
233-
lock (_lock)
234-
{
235-
messages = _toDeliver.ToList();
236-
subscribers = _subscribers.ToList();
237-
}
221+
var messages = _toDeliver.ToList();
222+
var subscribers = _subscribers.ToList();
238223

239224
var delivered = false;
240225
foreach (var envelopeWithPosition in messages)
@@ -246,17 +231,13 @@ private async Task TryToDeliverAsync()
246231
var (effectId, subscription) = idAndSubscription;
247232
if (subscription.Predicate(envelopeWithPosition.Envelope))
248233
{
249-
int positionToRemoveIndex;
250-
lock (_lock)
251-
{
252-
if (!_subscribers.ContainsKey(effectId)) //might have been removed by timeout
253-
continue;
254-
255-
_toDeliver.Remove(envelopeWithPosition);
256-
_fetchedPositions.Add(envelopeWithPosition.Position);
257-
_subscribers.Remove(effectId);
258-
positionToRemoveIndex = _nextToRemoveIndex++;
259-
}
234+
if (!_subscribers.ContainsKey(effectId)) //might have been removed by timeout
235+
continue;
236+
237+
_toDeliver.Remove(envelopeWithPosition);
238+
_fetchedPositions.Add(envelopeWithPosition.Position);
239+
_subscribers.Remove(effectId);
240+
var positionToRemoveIndex = _nextToRemoveIndex++;
260241

261242
var toRemoveId = new EffectId([-1, 0, positionToRemoveIndex]);
262243
await effect.Upserts(
@@ -300,18 +281,23 @@ public async Task CheckTimeouts()
300281
{
301282
while (!_disposed && _thrownException == null)
302283
{
303-
var now = utcNow();
304284
List<KeyValuePair<EffectId, Subscription>> expiredSubscriptions;
305285

306-
lock (_lock)
286+
await _semaphore.WaitAsync();
287+
try
307288
{
289+
var now = utcNow();
308290
expiredSubscriptions = _subscribers
309291
.Where(s => s.Value.Timeout.HasValue && s.Value.Timeout.Value <= now)
310292
.ToList();
311293

312294
foreach (var expired in expiredSubscriptions)
313295
_subscribers.Remove(expired.Key);
314296
}
297+
finally
298+
{
299+
_semaphore.Release();
300+
}
315301

316302
foreach (var (_, subscription) in expiredSubscriptions)
317303
subscription.Tcs.SetResult(null);
@@ -322,8 +308,8 @@ public async Task CheckTimeouts()
322308

323309
public async Task<Envelope?> Subscribe(
324310
EffectId effectId,
325-
MessagePredicate predicate,
326-
DateTime? timeout,
311+
MessagePredicate predicate,
312+
DateTime? timeout,
327313
EffectId timeoutId,
328314
EffectId messageId,
329315
EffectId messageTypeId,
@@ -335,8 +321,16 @@ public async Task CheckTimeouts()
335321
throw _thrownException;
336322

337323
var tcs = new TaskCompletionSource<Envelope?>();
338-
lock (_lock)
324+
325+
await _semaphore.WaitAsync();
326+
try
327+
{
339328
_subscribers[effectId] = new Subscription(predicate, tcs, timeout, messageId, messageTypeId, receiverId, senderId);
329+
}
330+
finally
331+
{
332+
_semaphore.Release();
333+
}
340334

341335
if (timeout != null)
342336
timeouts.AddTimeout(timeoutId, timeout.Value);

0 commit comments

Comments
 (0)