Skip to content
Merged

V4.0 #28

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 11 additions & 108 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# ![RingBufferPlus Logo](https://raw.githubusercontent.com/FRACerqueira/RingBufferPlus/refs/heads/main/docs/images/icon.png) Welcome to RingBufferPlus
# ![RingBufferPlus Logo](https://raw.githubusercontent.com/FRACerqueira/RingBufferPlus/refs/heads/main/icon.png) Welcome to RingBufferPlus

## **The generic ring buffer with auto-scaler (elastic buffer).**

Expand Down Expand Up @@ -60,7 +60,7 @@ A ring buffer makes a bounded queue when separate indices are used for inserting
- Bug fixed when used with Rabbitmq.
- Removed need to set Automatic Recovery to false for use with Rabbitmq
- Removed Master/Slave, ReportScale, BufferHealth, ScaleWhen..., RollbackWhen... and TriggerByAccqWhen... concept (Break changes)
- Added command LockAcquireWhenAutoScale
- Added command LockWhenScaling
- Added command AutoScaleAcquireFault
- Added command HeartBeat
- Added command BackgroundLogger
Expand Down Expand Up @@ -116,9 +116,9 @@ var rb = await RingBuffer<int>.New("MyBuffer")

Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity is : {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity({rb.Capacity}) = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity({rb.MaxCapacity}) = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity({rb.MinCapacity}) = {rb.IsMinCapacity}.");

using (var buffer = await rb.AcquireAsync(token))
{
Expand Down Expand Up @@ -152,29 +152,11 @@ var rb = await RingBuffer<int>.New("MyBuffer")
.MaxCapacity(9)
.BuildWarmupAsync(token);

Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity is : {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");

if (!await rb.SwitchToAsync(ScaleSwitch.MaxCapacity))
{
//manual scale was not scheduled
//do something
}

using (var buffer = await rb.AcquireAsync(token))
{
if (buffer.Successful)
{
Console.WriteLine($"Buffer is ok({buffer.Successful}:{buffer.ElapsedTime}) value: {buffer.Current}");
}
else
{
//do something
}
}
}
```

### Trigger Scale Usage
Expand All @@ -198,27 +180,9 @@ var rb = await RingBuffer<int>.New("MyBuffer")
.MinCapacity(3)
.MaxCapacity(9)
.BuildWarmupAsync(token);

Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity is : {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");

using (var buffer = await rb.AcquireAsync(token))
{
if (buffer.Successful)
{
Console.WriteLine($"Buffer is ok({buffer.Successful}:{buffer.ElapsedTime}) value: {buffer.Current}");
}
else
{
//do something
}
}
```

### Lock/Unlock de Acquire/Switch Usage
### Lock Acquire/Switch Usage
[**Top**](#table-of-contents)

When the scaling up or down process is executed, acquisition or scale switching is not blocked.
Expand All @@ -233,29 +197,10 @@ var rb = await RingBuffer<int>.New("MyBuffer")
.Logger(HostApp.Services.GetService<ILogger<Program>>())
.Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
.ScaleTimer(50, TimeSpan.FromSeconds(5))
.LockAcquireWhenAutoScale()
.AutoScaleAcquireFault()
.LockWhenScaling()
.MinCapacity(3)
.MaxCapacity(9)
.BuildWarmupAsync(token);

Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity is : {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");

using (var buffer = await rb.AcquireAsync(token))
{
if (buffer.Successful)
{
Console.WriteLine($"Buffer is ok({buffer.Successful}:{buffer.ElapsedTime}) value: {buffer.Current}");
}
else
{
//do something
}
}
```

### HeartBeat Usage
Expand All @@ -275,24 +220,6 @@ var rb = await RingBuffer<int>.New("MyBuffer")
.Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
.BuildWarmupAsync(token);

Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity is : {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");

using (var buffer = await rb.AcquireAsync(token))
{
if (buffer.Successful)
{
Console.WriteLine($"Buffer is ok({buffer.Successful}:{buffer.ElapsedTime}) value: {buffer.Current}");
}
else
{
//do something
}
}

private void MyHeartBeat(RingBufferValue<int> item)
{
//do anything ex: health check
Expand All @@ -302,7 +229,7 @@ private void MyHeartBeat(RingBufferValue<int> item)
### Background Logger Usage
[**Top**](#table-of-contents)

Log execution is done automatically by the component (Level debug and Error) in the same execution thread. This process can burden execution if the log recording process takes a long time.
Log execution is done automatically by the component (Level Debug, Warning and Error) in the same execution thread. This process can burden execution if the log recording process takes a long time.

For this scenario, you can use the log execution in the background in an asynchronous process done by the component.

Expand All @@ -315,24 +242,6 @@ var rb = await RingBuffer<int>.New("MyBuffer")
.BackgroundLogger()
.Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
.BuildWarmupAsync(token);

Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity is : {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");

using (var buffer = await rb.AcquireAsync(token))
{
if (buffer.Successful)
{
Console.WriteLine($"Buffer is ok({buffer.Successful}:{buffer.ElapsedTime}) value: {buffer.Current}");
}
else
{
//do something
}
}
```

## RabbitMQ Usage
Expand Down Expand Up @@ -363,19 +272,13 @@ var rb = await RingBuffer<IChannel>.New("RabbitChanels")
.Capacity(10)
.Logger(applogger!)
.BackgroundLogger()
.Factory((cts) => ModelFactory(cts)!)
.Factory((cts) => ChannelFactory(cts))
.ScaleTimer(100, TimeSpan.FromSeconds(10))
.MaxCapacity(20)
.MinCapacity(5)
.AutoScaleAcquireFault()
.BuildWarmupAsync(token);

Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity is : {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");

using (var buffer = await rb.AcquireAsync(token))
{
if (buffer.Successful)
Expand All @@ -389,7 +292,7 @@ using (var buffer = await rb.AcquireAsync(token))
}
}

private async Task<IChannel> ModelFactory(CancellationToken cancellation)
private async Task<IChannel> ChannelFactory(CancellationToken cancellation)
{
return await connectionRabbit!.CreateChannelAsync(cancellationToken: cancellation);
}
Expand Down
84 changes: 83 additions & 1 deletion samples/RingBufferPlusRabbitSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static async Task<IChannel> ChannelFactory(CancellationToken cancellation)
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");

Console.WriteLine($"Wait... 20 sec. to start {threadCount} thread");
Console.WriteLine($"Wait... 20 sec. to start {threadCount} thread using Non lock Acquire");
Thread.Sleep(TimeSpan.FromSeconds(20));

Console.WriteLine($"Running 60 seconds..");
Expand Down Expand Up @@ -148,6 +148,88 @@ static async Task<IChannel> ChannelFactory(CancellationToken cancellation)
}
sw.Reset();

threads.Clear();

cts.Dispose();
cts = CancellationTokenSource.CreateLinkedTokenSource(tokenapplifetime);

Console.WriteLine($"Wait... 20 sec. to start {threadCount} thread using lock Acquire");

rb = await RingBuffer<IChannel>.New("RabbitChanels")
.Capacity(10)
.Logger(hostApp.Services.GetService<ILogger<Program>>())
.BackgroundLogger()
.Factory((cts) => ChannelFactory(cts)!)
.ScaleTimer(50, TimeSpan.FromSeconds(5))
.MaxCapacity(20)
.MinCapacity(5)
.LockWhenScaling()
.AutoScaleAcquireFault()
.BuildWarmupAsync(cts.Token);

Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity = {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity = {rb.IsMinCapacity}.");

Thread.Sleep(TimeSpan.FromSeconds(20));

Console.WriteLine($"Running 60 seconds..");
Thread.Sleep(TimeSpan.FromSeconds(1));

dtref = DateTime.Now.AddSeconds(60);
qtdstart = 0;
for (int i = 0; i < threadCount; i++)
{
Thread thread = new(async () =>
{
var id = Interlocked.Increment(ref qtdstart);
Console.WriteLine($"Thread {qtdstart} started ");
while (true)
{
if (DateTime.Now >= dtref)
{
Console.WriteLine($"wait({id}) 60 seconds (idle)");
Thread.Sleep(TimeSpan.FromSeconds(60));
break;
}
using var bufferedItem = await rb!.AcquireAsync();
if (bufferedItem.Successful)
{
var body = new ReadOnlyMemory<byte>(messageBodyBytes);
await bufferedItem.Current!.BasicPublishAsync("", "log", body);
}
else
{
if (!cts.IsCancellationRequested)
{
Console.WriteLine($"RingBuffer-{id}({bufferedItem.Successful}:{bufferedItem.ElapsedTime}) Channel Capacity({rb!.CurrentCapacity})");
}
}
}
Console.WriteLine($"Thread {id} ended");
Interlocked.Decrement(ref qtdstart);
});
thread.Start();
threads.Add(thread);
}

Console.WriteLine($"Waiting for {threadCount} threads to finish...");
while (qtdstart > 0)
{
Thread.Sleep(10);
}

Console.WriteLine("Dispose ring buffer");
cts.Cancel();
sw = Stopwatch.StartNew();
while (sw.ElapsedMilliseconds < 10000)
{
Thread.Sleep(1000);
Console.WriteLine($"Ring Buffer {rb!.Name} current capacity : {rb!.CurrentCapacity}");
}
sw.Reset();
}

public static string RandomString(int length)
Expand Down
6 changes: 3 additions & 3 deletions src/RingBufferPlus.Tests/RingBufferManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public async Task SwitchToAsync_ShouldScaleToMinCapacity()
// Arrange
var builder = new RingBufferBuilder<int>("TestBuffer",null);
builder.Capacity(5);
builder.LockAcquireWhenAutoScale();
builder.LockWhenScaling();
builder.MaxCapacity(10);
builder.MinCapacity(2);
builder.Factory((_) => Task.FromResult(0));
Expand All @@ -104,7 +104,7 @@ public async Task SwitchToAsync_ShouldScaleToMaxCapacity()
// Arrange
var builder = new RingBufferBuilder<int>("TestBuffer", null);
builder.Capacity(5);
builder.LockAcquireWhenAutoScale();
builder.LockWhenScaling();
builder.MaxCapacity(10);
builder.MinCapacity(2);
builder.Factory((_) => Task.FromResult(0));
Expand All @@ -124,7 +124,7 @@ public async Task SwitchToAsync_ShouldScaleToInitCapacity()
// Arrange
var builder = new RingBufferBuilder<int>("TestBuffer", null);
builder.Capacity(5);
builder.LockAcquireWhenAutoScale();
builder.LockWhenScaling();
builder.MaxCapacity(10);
builder.MinCapacity(2);
builder.Factory((_) => Task.FromResult(0));
Expand Down
4 changes: 2 additions & 2 deletions src/RingBufferPlus/Commands/IRingBufferScaleCapacity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public interface IRingBufferScaleCapacity<T> : IRingBufferBuild<T>
IRingBufferScaleCapacity<T> MaxCapacity(int value);

/// <summary>
/// Sets acquisition lock when running auto scale.
/// Sets acquisition/Switch lock when running scaleUp/ScaleDown.
/// </summary>
/// <param name="value">True to acquisition lock.Default true</param>
/// <returns></returns>
IRingBufferScaleCapacity<T> LockAcquireWhenAutoScale(bool value = true);
IRingBufferScaleCapacity<T> LockWhenScaling(bool value = true);

/// <summary>
/// Sets the condition to autoscale (scale up) capacity when an acquire fault occurs. The Manually change scale will always return false if autoscale is enabled.
Expand Down
2 changes: 1 addition & 1 deletion src/RingBufferPlus/Core/RingBufferBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public IRingBuffer<T> BackgroundLogger(bool value = true)
return this;
}

public IRingBufferScaleCapacity<T> LockAcquireWhenAutoScale(bool value = true)
public IRingBufferScaleCapacity<T> LockWhenScaling(bool value = true)
{
_lockAcquire = value;
return this;
Expand Down
Loading