Skip to content

Commit 9c3a78f

Browse files
committed
Simplified Workflow.Delay method
1 parent e409c6b commit 9c3a78f

File tree

7 files changed

+142
-21
lines changed

7 files changed

+142
-21
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/SuspensionTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,8 @@ public override Task DelayedFlowIsRestartedOnce()
9999
[TestMethod]
100100
public override Task InterruptedExecutingFlowIsRestartedOnce()
101101
=> InterruptedExecutingFlowIsRestartedOnce(FunctionStoreFactory.Create());
102+
103+
[TestMethod]
104+
public override Task TwoDelaysFlowCompletesSuccessfully()
105+
=> TwoDelaysFlowCompletesSuccessfully(FunctionStoreFactory.Create());
102106
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/SuspensionTests.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,46 @@ protected async Task InterruptedExecutingFlowIsRestartedOnce(Task<IFunctionStore
826826
await cp.WaitForCompletion(allowPostponeAndSuspended: true);
827827

828828
syncedCounter.Current.ShouldBe(2);
829+
830+
unhandledExceptionHandler.ShouldNotHaveExceptions();
831+
}
832+
833+
public abstract Task TwoDelaysFlowCompletesSuccessfully();
834+
protected async Task TwoDelaysFlowCompletesSuccessfully(Task<IFunctionStore> storeTask)
835+
{
836+
var store = await storeTask;
837+
var id = TestFlowId.Create();
838+
var (flowType, flowInstance) = id;
839+
840+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
841+
using var functionsRegistry = new FunctionsRegistry
842+
(
843+
store,
844+
new Settings(unhandledExceptionHandler.Catch)
845+
);
846+
847+
var counter = new SyncedCounter();
848+
var registration = functionsRegistry.RegisterParamless(
849+
flowType,
850+
inner: async Task (workflow) =>
851+
{
852+
counter.Increment();
853+
await workflow.Delay(TimeSpan.FromMilliseconds(100), alias: "first");
854+
await workflow.Delay(TimeSpan.FromMilliseconds(100), alias: "second");
855+
}
856+
);
857+
858+
await registration.Schedule(flowInstance);
859+
860+
var cp = await registration.ControlPanel(flowInstance);
861+
cp.ShouldNotBeNull();
862+
863+
await cp.WaitForCompletion(allowPostponeAndSuspended: true);
864+
await cp.Refresh();
865+
866+
cp.Status.ShouldBe(Status.Succeeded);
867+
868+
counter.Current.ShouldBe(3);
829869

830870
unhandledExceptionHandler.ShouldNotHaveExceptions();
831871
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Cleipnir.ResilientFunctions.Domain;
6+
using Cleipnir.ResilientFunctions.Queuing;
7+
using Cleipnir.ResilientFunctions.Storage;
8+
9+
namespace Cleipnir.ResilientFunctions.CoreRuntime;
10+
11+
public record FlowStatus(StoredId Id, Action Suspend, QueueManager QueueManager, int Threads, int AwaitingThreads);
12+
13+
public class FlowsManager
14+
{
15+
private readonly Dictionary<StoredId, FlowStatus> _dict = new();
16+
private readonly Lock _lock = new();
17+
18+
public void AddFlow(StoredId id, Action suspend, QueueManager queueManager)
19+
{
20+
lock (_lock)
21+
{
22+
if (!_dict.ContainsKey(id))
23+
return;
24+
25+
_dict[id] = new FlowStatus(id, suspend, queueManager, Threads: 1, AwaitingThreads: 0);
26+
}
27+
}
28+
29+
public async Task Interrupt(IEnumerable<StoredId> ids)
30+
{
31+
lock (_lock)
32+
{
33+
foreach (var id in ids)
34+
{
35+
if (!_dict.ContainsKey(id))
36+
continue;
37+
38+
var queueManager = _dict[id].QueueManager;
39+
Task.Run(() => queueManager.FetchMessagesOnce());
40+
}
41+
}
42+
}
43+
44+
public void AddThread(StoredId id)
45+
{
46+
throw new NotImplementedException();
47+
}
48+
49+
public void CompletedThread(StoredId id)
50+
{
51+
throw new NotImplementedException();
52+
}
53+
54+
public void StartedThread(StoredId id)
55+
{
56+
throw new NotImplementedException();
57+
}
58+
59+
public void SuspendedThread(StoredId id)
60+
{
61+
throw new NotImplementedException();
62+
}
63+
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,34 +40,36 @@ public Workflow(FlowId flowId, StoredId storedId, Effect effect, Utilities utili
4040
public async Task RegisterCorrelation(string correlation) => await Correlations.Register(correlation);
4141

4242
public Task Delay(TimeSpan @for, bool suspend = true, string? alias = null) => Delay(until: _utcNow() + @for, suspend, alias);
43-
public async Task Delay(DateTime until, bool suspend = true, string? alias = null)
43+
public Task Delay(DateTime until, bool suspend = true, string? alias = null)
4444
{
4545
var effectId = Effect.TakeNextImplicitId();
4646
var timeoutId = EffectId.CreateWithCurrentContext(effectId);
4747

48-
var (status, expiry) = await RegisteredTimeouts.RegisterTimeout(
49-
timeoutId,
50-
until,
51-
publishMessage: false,
52-
alias
53-
);
54-
55-
if (status is TimeoutStatus.Completed or TimeoutStatus.Cancelled)
56-
{
57-
return;
58-
}
59-
60-
var delay = (expiry - _utcNow()).RoundUpToZero();
61-
if (!suspend)
48+
async Task Inner()
6249
{
63-
await Task.Delay(delay);
64-
delay = TimeSpan.Zero;
50+
var expiry = await Effect.CreateOrGet(timeoutId, until.ToUniversalTime().Ticks, alias, flush: false);
51+
52+
if (expiry == -1)
53+
{
54+
return;
55+
}
56+
57+
Effect.FlowMinimumTimeout.AddTimeout(timeoutId, expiry.ToDateTime());
58+
var delay = (expiry.ToDateTime() - _utcNow()).RoundUpToZero();
59+
if (!suspend)
60+
{
61+
await Task.Delay(delay);
62+
delay = TimeSpan.Zero;
63+
}
64+
65+
if (delay > TimeSpan.Zero)
66+
throw new SuspendInvocationException();
67+
68+
await Effect.Upsert(timeoutId, -1L, alias, flush: false);
69+
Effect.FlowMinimumTimeout.RemoveTimeout(timeoutId);
6570
}
6671

67-
if (delay == TimeSpan.Zero)
68-
await RegisteredTimeouts.CompleteTimeout(timeoutId, alias);
69-
else
70-
throw new SuspendInvocationException();
72+
return Inner();
7173
}
7274

7375
public Task<T> Message<T>(TimeSpan? maxWait = null) where T : class

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/SuspensionTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,8 @@ public override Task DelayedFlowIsRestartedOnce()
8787
[TestMethod]
8888
public override Task InterruptedExecutingFlowIsRestartedOnce()
8989
=> InterruptedExecutingFlowIsRestartedOnce(FunctionStoreFactory.Create());
90+
91+
[TestMethod]
92+
public override Task TwoDelaysFlowCompletesSuccessfully()
93+
=> TwoDelaysFlowCompletesSuccessfully(FunctionStoreFactory.Create());
9094
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/SuspensionTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,8 @@ public override Task DelayedFlowIsRestartedOnce()
8888
[TestMethod]
8989
public override Task InterruptedExecutingFlowIsRestartedOnce()
9090
=> InterruptedExecutingFlowIsRestartedOnce(FunctionStoreFactory.Create());
91+
92+
[TestMethod]
93+
public override Task TwoDelaysFlowCompletesSuccessfully()
94+
=> TwoDelaysFlowCompletesSuccessfully(FunctionStoreFactory.Create());
9195
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/SuspensionTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,8 @@ public override Task DelayedFlowIsRestartedOnce()
8888
[TestMethod]
8989
public override Task InterruptedExecutingFlowIsRestartedOnce()
9090
=> InterruptedExecutingFlowIsRestartedOnce(FunctionStoreFactory.Create());
91+
92+
[TestMethod]
93+
public override Task TwoDelaysFlowCompletesSuccessfully()
94+
=> TwoDelaysFlowCompletesSuccessfully(FunctionStoreFactory.Create());
9195
}

0 commit comments

Comments
 (0)