Skip to content

Commit d9211f8

Browse files
Merge pull request #19 from ledjon-behluli/stream-loading-space
Startup loading behavior batch vs al
2 parents 121764c + 1a8b140 commit d9211f8

File tree

5 files changed

+74
-14
lines changed

5 files changed

+74
-14
lines changed

src/OrleanSpaces/Agents/BaseAgent.cs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,11 @@ public BaseAgent(
4141

4242
async ValueTask ISpaceRouter<TTuple, TTemplate>.RouteDirector(IStoreDirector<TTuple> director)
4343
{
44+
this.director = director;
4445
if (options.LoadSpaceContentsUponStartup)
4546
{
46-
var tuples = await director.GetAll();
47-
foreach (var tuple in tuples)
48-
{
49-
this.tuples.Add(tuple);
50-
}
47+
await ReloadAsync();
5148
}
52-
53-
this.director = director;
5449
}
5550

5651
async ValueTask ISpaceRouter<TTuple, TTemplate>.RouteAction(TupleAction<TTuple> action)
@@ -194,7 +189,21 @@ public async IAsyncEnumerable<TTuple> EnumerateAsync(TTemplate template)
194189
}
195190
}
196191

197-
public async Task ReloadAsync() => tuples = await director.GetAll();
192+
public async Task ReloadAsync()
193+
{
194+
if (options.LoadingStrategy == SpaceLoadingStrategy.Sequential)
195+
{
196+
tuples = tuples.Clear();
197+
await foreach (var items in director.GetBatch())
198+
{
199+
tuples = tuples.AddRange(items);
200+
}
201+
}
202+
else
203+
{
204+
tuples = await director.GetAllBatches();
205+
}
206+
}
198207

199208
public async Task ClearAsync()
200209
{

src/OrleanSpaces/Agents/SpaceAgent.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,11 @@ public SpaceAgent(
3737

3838
async ValueTask ISpaceRouter<SpaceTuple, SpaceTemplate>.RouteDirector(IStoreDirector<SpaceTuple> director)
3939
{
40+
this.director = director;
4041
if (options.LoadSpaceContentsUponStartup)
4142
{
42-
tuples = await director.GetAll();
43+
await ReloadAsync();
4344
}
44-
45-
this.director = director;
4645
}
4746

4847
async ValueTask ISpaceRouter<SpaceTuple, SpaceTemplate>.RouteAction(TupleAction<SpaceTuple> action)
@@ -187,7 +186,21 @@ public async IAsyncEnumerable<SpaceTuple> EnumerateAsync(SpaceTemplate template)
187186
}
188187
}
189188

190-
public async Task ReloadAsync() => tuples = await director.GetAll();
189+
public async Task ReloadAsync()
190+
{
191+
if (options.LoadingStrategy == SpaceLoadingStrategy.Sequential)
192+
{
193+
tuples = tuples.Clear();
194+
await foreach (var items in director.GetBatch())
195+
{
196+
tuples = tuples.AddRange(items);
197+
}
198+
}
199+
else
200+
{
201+
tuples = await director.GetAllBatches();
202+
}
203+
}
191204

192205
public async Task ClearAsync()
193206
{

src/OrleanSpaces/Grains/Directors/BaseDirector.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,19 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken)
4444
}
4545
}
4646

47-
public async Task<ImmutableArray<StoreTuple<TTuple>>> GetAll()
47+
public async IAsyncEnumerable<ImmutableArray<StoreTuple<TTuple>>> GetBatch()
48+
{
49+
foreach (string storeKey in StoreKeys)
50+
{
51+
Guid storeId = ParseStoreKey(storeKey);
52+
var content = await GrainFactory.GetGrain<TStore>(storeKey).GetAll();
53+
var result = content.Tuples.Select(tuple => new StoreTuple<TTuple>(storeId, tuple)).ToImmutableArray();
54+
55+
yield return result;
56+
}
57+
}
58+
59+
public async Task<ImmutableArray<StoreTuple<TTuple>>> GetAllBatches()
4860
{
4961
List<Task<StoreContent<TTuple>>> tasks = new();
5062
foreach (string storeKey in StoreKeys)

src/OrleanSpaces/IStoreDirector.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ namespace OrleanSpaces;
77
internal interface IStoreDirector<T>
88
where T : ISpaceTuple
99
{
10-
[ReadOnly] Task<ImmutableArray<StoreTuple<T>>> GetAll();
10+
[ReadOnly] IAsyncEnumerable<ImmutableArray<StoreTuple<T>>> GetBatch();
11+
[ReadOnly] Task<ImmutableArray<StoreTuple<T>>> GetAllBatches();
12+
1113
Task<Guid> Insert(TupleAction<T> action);
1214
Task Remove(TupleAction<T> action);
1315
Task RemoveAll(Guid agentId);

src/OrleanSpaces/SpaceOptions.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public sealed class SpaceClientOptions
6666
/// <see cref="ISpaceAgent{T, TTuple, TTemplate}.ReloadAsync"/>, depending on the agent type.
6767
/// </i></remarks>
6868
public bool LoadSpaceContentsUponStartup { get; set; } = true;
69+
70+
/// <summary>
71+
/// Defines the way how the space contents (<i>i.e. the tuples</i>) are loaded by the agent.
72+
/// </summary>
73+
public SpaceLoadingStrategy LoadingStrategy { get; set; } = SpaceLoadingStrategy.Parallel;
6974
}
7075

7176
/// <summary>
@@ -96,3 +101,22 @@ public enum SpaceKind
96101
ULong = 262144,
97102
UShort = 524288
98103
}
104+
105+
/// <summary>
106+
/// Strategies for space loading.
107+
/// </summary>
108+
public enum SpaceLoadingStrategy
109+
{
110+
/// <summary>
111+
/// <para>Content from all partitions are loaded sequentially.</para>
112+
/// <para>Results in a longer time to load the space. But ultimately results in less resource contention, and avoids potential <see cref="ThreadPool"/> starvation.</para>
113+
/// </summary>
114+
/// <remarks><i>Use if fast loading time is not important, and the space is heavily partitioned.</i></remarks>
115+
Sequential,
116+
/// <summary>
117+
/// <para>Content from all partitions are loaded in parallel.</para>
118+
/// <para>Results in less resource contention, and avoids potential <see cref="ThreadPool"/> starvation. But ultimately results in a longer time to load the space.</para>
119+
/// </summary>
120+
/// <remarks><i>Use if fast loading time is important, and the space is not heavily partitioned.</i></remarks>
121+
Parallel
122+
}

0 commit comments

Comments
 (0)