Skip to content

Commit c357900

Browse files
committed
Removed skip param from MessageStore's GetMessages-method
1 parent a25a099 commit c357900

File tree

18 files changed

+105
-100
lines changed

18 files changed

+105
-100
lines changed

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs

Lines changed: 54 additions & 47 deletions
Large diffs are not rendered by default.

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,40 +36,40 @@ await functionStore.CreateFunction(
3636
var messageStore = functionStore.MessageStore;
3737

3838
await messageStore
39-
.GetMessages(functionId, skip: 0)
39+
.GetMessages(functionId)
4040
.SelectAsync(msgs => msgs.Any())
4141
.ShouldBeFalseAsync();
4242

43-
var events = await messageStore.GetMessages(functionId, skip: 0);
43+
var events = await messageStore.GetMessages(functionId);
4444
events.ShouldBeEmpty();
4545

4646
await messageStore.AppendMessage(
4747
functionId,
4848
new StoredMessage("hello world". ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Position: 0)
4949
);
5050

51-
events = await messageStore.GetMessages(functionId, skip: 0);
51+
events = await messageStore.GetMessages(functionId);
5252
events.Count.ShouldBe(1);
5353
DefaultSerializer
5454
.Instance
5555
.Deserialize(events[0].MessageContent, DefaultSerializer.Instance.ResolveType(events[0].MessageType)!)
5656
.ShouldBe("hello world");
5757

58-
var skipPosition = events[0].Position + 1;
59-
events = await messageStore.GetMessages(functionId, skip: skipPosition);
60-
events.ShouldBeEmpty();
58+
var skipPosition = events[0].Position;
59+
var filteredEvents = (await messageStore.GetMessages(functionId)).Where(e => e.Position > skipPosition).ToList();
60+
filteredEvents.ShouldBeEmpty();
6161

6262
await messageStore.AppendMessage(
6363
functionId,
6464
new StoredMessage("hello universe".ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Position: 0)
6565
);
6666

67-
events = await messageStore.GetMessages(functionId, skip: skipPosition);
68-
events.Count.ShouldBe(1);
67+
filteredEvents = (await messageStore.GetMessages(functionId)).Where(e => e.Position > skipPosition).ToList();
68+
filteredEvents.Count.ShouldBe(1);
6969

7070
DefaultSerializer
7171
.Instance
72-
.Deserialize(events[0].MessageContent, DefaultSerializer.Instance.ResolveType(events[0].MessageType)!)
72+
.Deserialize(filteredEvents[0].MessageContent, DefaultSerializer.Instance.ResolveType(filteredEvents[0].MessageType)!)
7373
.ShouldBe("hello universe");
7474
}
7575

@@ -340,15 +340,15 @@ protected async Task OnlyFirstMessageWithSameIdempotencyKeyIsDeliveredAndBothAre
340340
await scheduled.Completion();
341341

342342
await BusyWait.Until(() => storedId != null);
343-
await BusyWait.Until(async () => await functionStore.MessageStore.GetMessages(storedId!, skip: 0).SelectAsync(m => m.Count) == 0);
343+
await BusyWait.Until(async () => await functionStore.MessageStore.GetMessages(storedId!).SelectAsync(m => m.Count) == 0);
344344

345345
// Only the first message should be delivered
346346
var result = await scheduled.Completion(maxWait: TimeSpan.FromSeconds(5));
347347
result.Item1.ShouldBe("first message");
348348
result.Item2.ShouldBeNull();
349349

350350
// Verify both messages are removed from the store after completion
351-
var messagesAfterCompletion = await functionStore.MessageStore.GetMessages(storedId!, skip: 0);
351+
var messagesAfterCompletion = await functionStore.MessageStore.GetMessages(storedId!);
352352
messagesAfterCompletion.ShouldBeEmpty();
353353

354354
unhandledExceptionCatcher.ShouldNotHaveExceptions();
@@ -438,7 +438,7 @@ protected async Task MultipleIterationsWithDuplicateIdempotencyKeysProcessCorrec
438438
receivedMessages[i].ShouldBe(i);
439439

440440
await BusyWait.Until(
441-
async () => await functionStore.MessageStore.GetMessages(storedId!, skip: 0).SelectAsync(m => m.Count) == 0,
441+
async () => await functionStore.MessageStore.GetMessages(storedId!).SelectAsync(m => m.Count) == 0,
442442
maxWait: TimeSpan.FromSeconds(30)
443443
);
444444

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ protected async Task SenderIsPersistedAndCanBeFetched(Task<IFunctionStore> funct
575575
var messageWriter = new MessageWriter(storedId, messageStore, serializer);
576576
await messageWriter.AppendMessage("hello world", idempotencyKey: "key1", sender: "TestSender");
577577

578-
var messages = await messageStore.GetMessages(storedId, skip: 0);
578+
var messages = await messageStore.GetMessages(storedId);
579579
messages.Count.ShouldBe(1);
580580
messages[0].Sender.ShouldBe("TestSender");
581581
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ protected async Task ExistingActionCanBeDeletedFromControlPanel(Task<IFunctionSt
4747

4848
await store
4949
.MessageStore
50-
.GetMessages(storedId, skip: 0)
50+
.GetMessages(storedId)
5151
.SelectAsync(messages => messages.Count)
5252
.ShouldBeAsync(0);
5353

@@ -97,7 +97,7 @@ async Task<string>(string _, Workflow workflow) =>
9797

9898
await store
9999
.MessageStore
100-
.GetMessages(storedId, skip: 0)
100+
.GetMessages(storedId)
101101
.SelectAsync(messages => messages.Count)
102102
.ShouldBeAsync(0);
103103

@@ -1289,7 +1289,7 @@ protected async Task DeleteRemovesFunctionFromAllStores(Task<IFunctionStore> sto
12891289
var storedId = registration.MapToStoredId(functionId.Instance);
12901290
await store.GetFunction(storedId).ShouldBeNullAsync();
12911291

1292-
await store.MessageStore.GetMessages(storedId, skip: 0)
1292+
await store.MessageStore.GetMessages(storedId)
12931293
.SelectAsync(msgs => msgs.Count == 0)
12941294
.ShouldBeTrueAsync();
12951295

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ await store.EffectsStore.SetEffectResult(
197197
await store.GetFunction(storedId).ShouldBeNullAsync();
198198
await store.CorrelationStore.GetCorrelations(storedId).ShouldBeEmptyAsync();
199199
await store.EffectsStore.GetEffectResults(storedId).ShouldBeEmptyAsync();
200-
await store.MessageStore.GetMessages(storedId, skip: 0).ShouldBeEmptyAsync();
200+
await store.MessageStore.GetMessages(storedId).ShouldBeEmptyAsync();
201201
}
202202

203203
public abstract Task NonExistingFunctionCanBeDeleted();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ await store.SetFunctionState(
579579
sf.Status.ShouldBe(Status.Succeeded);
580580
sf.Exception.ShouldBeNull();
581581

582-
var storedMessages = await store.MessageStore.GetMessages(functionId, skip: 0);
582+
var storedMessages = await store.MessageStore.GetMessages(functionId);
583583
storedMessages.Count.ShouldBe(1);
584584
var deserializedMessage = (string) DefaultSerializer.Instance.Deserialize(storedMessages[0].MessageContent, DefaultSerializer.Instance.ResolveType(storedMessages[0].MessageType)!);
585585
deserializedMessage.ShouldBe("hello everyone");
@@ -617,7 +617,7 @@ await store.SuspendFunction(
617617
sf.Status.ShouldBe(Status.Suspended);
618618
sf.Parameter.ShouldBe(storedParameter.ToUtf8Bytes());
619619

620-
var messages = await store.MessageStore.GetMessages(functionId, skip: 0);
620+
var messages = await store.MessageStore.GetMessages(functionId);
621621
messages.ShouldBeEmpty();
622622

623623
await Task.Delay(500);
@@ -721,7 +721,7 @@ public async Task MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCre
721721
await store.MessageStore.AppendMessage(functionId, new StoredMessage("Hello".ToJson().ToUtf8Bytes(), MessageType: typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Position: 0));
722722
await store.MessageStore.AppendMessage(functionId, new StoredMessage("World".ToJson().ToUtf8Bytes(), MessageType: typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Position: 0));
723723

724-
var messages = await store.MessageStore.GetMessages(functionId, skip: 0);
724+
var messages = await store.MessageStore.GetMessages(functionId);
725725
messages.Count.ShouldBe(2);
726726
messages[0].DefaultDeserialize().ShouldBe("Hello");
727727
messages[1].DefaultDeserialize().ShouldBe("World");
@@ -1506,7 +1506,7 @@ protected async Task FunctionCanBeCreatedWithMessagesAndEffects(Task<IFunctionSt
15061506
effectResult2.EffectId.ShouldBe(new EffectId(["SomeEffect2".GetHashCode()]));
15071507
effectResult2.Result!.ToStringFromUtf8Bytes().ShouldBe("hello universe");
15081508

1509-
var messages = await store.MessageStore.GetMessages(storedId, skip: 0);
1509+
var messages = await store.MessageStore.GetMessages(storedId);
15101510
messages.Count.ShouldBe(2);
15111511
var fetchedMessage1 = messages[0];
15121512
fetchedMessage1.MessageType.ToStringFromUtf8Bytes().ShouldBe("some type");
@@ -1571,7 +1571,7 @@ protected async Task FunctionCanBeCreatedWithMessagesOnly(Task<IFunctionStore> s
15711571
var effectResults = await store.EffectsStore.GetEffectResults(storedId);
15721572
effectResults.Count.ShouldBe(0);
15731573

1574-
var messages = await store.MessageStore.GetMessages(storedId, skip: 0);
1574+
var messages = await store.MessageStore.GetMessages(storedId);
15751575
messages.Count.ShouldBe(2);
15761576
var fetchedMessage1 = messages[0];
15771577
fetchedMessage1.MessageType.ToStringFromUtf8Bytes().ShouldBe("some type");
@@ -1646,7 +1646,7 @@ protected async Task FunctionCanBeCreatedWithEffectsOnly(Task<IFunctionStore> st
16461646
effectResult2.EffectId.ShouldBe(new EffectId(["SomeEffect2".GetHashCode()]));
16471647
effectResult2.Result!.ToStringFromUtf8Bytes().ShouldBe("hello universe");
16481648

1649-
var messages = await store.MessageStore.GetMessages(storedId, skip: 0);
1649+
var messages = await store.MessageStore.GetMessages(storedId);
16501650
messages.Count.ShouldBe(0);
16511651

16521652
//idempotency check

Core/Cleipnir.ResilientFunctions/Domain/ExistingMessages.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private async Task<List<MessageAndIdempotencyKey>> GetReceivedMessages()
4040
)
4141
).ToList();
4242

43-
var storedMessages = await _messageStore.GetMessages(_storedId, skip: 0);
43+
var storedMessages = await _messageStore.GetMessages(_storedId);
4444
_receivedMessages = storedMessages.ToList();
4545
return await GetReceivedMessages();
4646
}

Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public interface IMessageStore
1717

1818
Task Truncate(StoredId storedId);
1919

20-
Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, long skip);
20+
Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId);
2121
Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, IReadOnlyList<long> skipPositions);
2222
Task<Dictionary<StoredId, List<StoredMessage>>> GetMessages(IEnumerable<StoredId> storedIds);
2323
Task<IDictionary<StoredId, long>> GetMaxPositions(IReadOnlyList<StoredId> storedIds);

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public Task<int> BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithPar
134134
}
135135
var sf = await GetFunction(storedId);
136136
var effects = await EffectsStore.GetEffectResults(storedId);
137-
var messages = await MessageStore.GetMessages(storedId, skip: 0);
137+
var messages = await MessageStore.GetMessages(storedId);
138138

139139
var session = new SnapshotStorageSession(owner);
140140
foreach (var effect in effects)
@@ -688,26 +688,26 @@ public virtual Task Truncate(StoredId storedId)
688688
return Task.CompletedTask;
689689
}
690690

691-
private IEnumerable<StoredMessage> GetMessages(StoredId storedId)
691+
private IEnumerable<StoredMessage> GetMessagesInternal(StoredId storedId)
692692
{
693693
lock (_sync)
694694
return !_messages.ContainsKey(storedId)
695695
? Enumerable.Empty<StoredMessage>()
696696
: _messages[storedId].OrderBy(kv => kv.Key).Select(kv => kv.Value with { Position = kv.Key }).ToList();
697697
}
698698

699-
public virtual Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, long skip)
700-
=> ((IReadOnlyList<StoredMessage>)GetMessages(storedId).Skip((int)skip).ToList()).ToTask();
699+
public virtual Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId)
700+
=> ((IReadOnlyList<StoredMessage>)GetMessagesInternal(storedId).ToList()).ToTask();
701701

702702
public virtual Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, IReadOnlyList<long> skipPositions)
703-
=> ((IReadOnlyList<StoredMessage>)GetMessages(storedId).Where(m => !skipPositions.Contains(m.Position)).ToList()).ToTask();
703+
=> ((IReadOnlyList<StoredMessage>)GetMessagesInternal(storedId).Where(m => !skipPositions.Contains(m.Position)).ToList()).ToTask();
704704

705705
public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessages(IEnumerable<StoredId> storedIds)
706706
{
707707
var dict = new Dictionary<StoredId, List<StoredMessage>>();
708708
foreach (var storedId in storedIds)
709709
{
710-
dict[storedId] = (await GetMessages(storedId, skip: 0)).ToList();
710+
dict[storedId] = (await GetMessages(storedId)).ToList();
711711
}
712712

713713
return dict;
@@ -717,7 +717,7 @@ public Task<IDictionary<StoredId, long>> GetMaxPositions(IReadOnlyList<StoredId>
717717
{
718718
IDictionary<StoredId, long> positions = new Dictionary<StoredId, long>();
719719
foreach (var storedId in storedIds)
720-
positions[storedId] = GetMessages(storedId).Count() - 1;
720+
positions[storedId] = GetMessagesInternal(storedId).Count() - 1;
721721

722722
return positions.ToTask();
723723
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ INSERT IGNORE INTO {_tablePrefix}
213213
public async Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, ReplicaId replicaId)
214214
{
215215
var restartCommand = _sqlGenerator.RestartExecution(storedId, replicaId);
216-
var messagesCommand = _sqlGenerator.GetMessages(storedId, skip: 0);
216+
var messagesCommand = _sqlGenerator.GetMessages(storedId);
217217

218218
await using var conn = await CreateOpenConnection(_connectionString);
219219
await using var command = StoreCommand

0 commit comments

Comments
 (0)