Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ceb08b3
Initial plan
Copilot Jan 5, 2026
db2e161
feat: Add QuoteAggregatorHub for streaming quote aggregation
Copilot Jan 5, 2026
922c77a
test: Add interface compliance and chain provider tests
Copilot Jan 5, 2026
a62c3ad
refactor: Simplify QuoteAggregatorHub properties and calculations
DaveSkender Jan 7, 2026
5f1186b
feat: Add TickAggregatorHub for tick-to-bar aggregation
Copilot Jan 7, 2026
9ca279e
feat: Add duplicate and out-of-sequence handling to aggregators
Copilot Jan 7, 2026
d9127b0
Merge branch 'v3' into copilot/add-quote-stream-aggregator
DaveSkender Jan 11, 2026
b796af0
Merge branch 'v3' into copilot/add-quote-stream-aggregator
DaveSkender Jan 12, 2026
b58c8ec
Merge branch 'v3' into copilot/add-quote-stream-aggregator
DaveSkender Jan 12, 2026
4aaef2d
Merge branch 'v3' into copilot/add-quote-stream-aggregator
DaveSkender Jan 12, 2026
e4eb40d
Merge branch 'v3' into copilot/add-quote-stream-aggregator
DaveSkender Jan 20, 2026
633312c
Merge branch 'v3' into copilot/add-quote-stream-aggregator
DaveSkender Jan 20, 2026
369f934
code cleanup
DaveSkender Jan 20, 2026
41e5db8
fix failing test
DaveSkender Jan 20, 2026
a4dd550
fix: Rebuild streaming aggregates for late ticks (#1943)
DaveSkender Jan 28, 2026
8e7ab8c
Merge branch 'v3' into copilot/add-quote-stream-aggregator
DaveSkender Jan 29, 2026
56eee6e
fix: Address all code review feedback for aggregator hubs
Copilot Jan 29, 2026
7785c41
test: Add ITestTickObserver interface and required test method
Copilot Jan 29, 2026
49d163c
Update src/_common/Quotes/Tick.StreamHub.cs
DaveSkender Feb 1, 2026
c4d5f8c
Merge branch 'v3' into copilot/add-quote-stream-aggregator
DaveSkender Feb 2, 2026
45edbe6
feat: enhance quote and tick aggregators with better error handling
DaveSkender Feb 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
{
"[markdown]": {
"editor.defaultFormatter": "DavidAnson.vscode-markdownlint",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.fixAll.markdownlint": "explicit"
}
},
"chat.useAgentSkills": true,
"chat.useAgentsMdFile": true,
"chat.useNestedAgentsMdFiles": true,
"chat.customAgentInSubagent.enabled": true,
Expand All @@ -12,6 +17,7 @@
"files.associations": {
"*.md": "markdown"
},
"github.copilot.chat.customAgents.showOrganizationAndEnterpriseAgents": true,
"github.copilot.chat.githubMcpServer.enabled": true,
"github.copilot.chat.githubMcpServer.readonly": false,
"github.copilot.chat.githubMcpServer.toolsets": [
Expand Down
24 changes: 24 additions & 0 deletions src/_common/Quotes/ITick.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Skender.Stock.Indicators;

/// <summary>
/// Tick interface for raw market tick data.
/// This represents a single trade or quote event with price and volume at a specific timestamp.
/// </summary>
public interface ITick : IReusable
{
/// <summary>
/// Tick price
/// </summary>
decimal Price { get; }

/// <summary>
/// Tick volume (quantity traded)
/// </summary>
decimal Volume { get; }

/// <summary>
/// Optional unique execution ID for duplicate detection.
/// When null, duplicates are assessed by timestamp only.
/// </summary>
string? ExecutionId { get; }
}
301 changes: 301 additions & 0 deletions src/_common/Quotes/Quote.AggregatorHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
namespace Skender.Stock.Indicators;

/// <summary>
/// Streaming hub for aggregating quotes into larger time periods.
/// </summary>
public class QuoteAggregatorHub
: QuoteProvider<IQuote, IQuote>
{
private readonly object _addLock = new();
private readonly Dictionary<DateTime, IQuote> _inputQuoteTracker = [];
private Quote? _currentBar;
private DateTime _currentBarTimestamp;

/// <summary>
/// Initializes a new instance of the <see cref="QuoteAggregatorHub"/> class.
/// </summary>
/// <param name="provider">The quote provider.</param>
/// <param name="periodSize">The period size to aggregate to.</param>
/// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
public QuoteAggregatorHub(
IQuoteProvider<IQuote> provider,
PeriodSize periodSize,
bool fillGaps = false)
: base(provider)
{
if (periodSize == PeriodSize.Month)
{
throw new ArgumentException(
$"Month aggregation is not supported in streaming mode. periodSize={periodSize}. Use TimeSpan overload for custom periods.",
nameof(periodSize));
}

AggregationPeriod = periodSize.ToTimeSpan();
FillGaps = fillGaps;
Name = $"QUOTE-AGG({periodSize})";

Reinitialize();
}

/// <summary>
/// Initializes a new instance of the <see cref="QuoteAggregatorHub"/> class.
/// </summary>
/// <param name="provider">The quote provider.</param>
/// <param name="timeSpan">The time span to aggregate to.</param>
/// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the time span is less than or equal to zero.</exception>
public QuoteAggregatorHub(
IQuoteProvider<IQuote> provider,
TimeSpan timeSpan,
bool fillGaps = false)
: base(provider)
{
if (timeSpan <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(timeSpan), timeSpan,
"Aggregation period must be greater than zero.");
}

AggregationPeriod = timeSpan;
FillGaps = fillGaps;
Name = $"QUOTE-AGG({timeSpan})";

Reinitialize();
}

/// <summary>
/// Gets a value indicating whether gap filling is enabled.
/// </summary>
public bool FillGaps { get; }

/// <summary>
/// Gets the aggregation period.
/// </summary>
public TimeSpan AggregationPeriod { get; }

/// <inheritdoc/>
public override void OnAdd(IQuote item, bool notify, int? indexHint)
{
ArgumentNullException.ThrowIfNull(item);

lock (_addLock)
{
DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod);

// Check if this exact input quote was already processed (duplicate detection)
if (_inputQuoteTracker.TryGetValue(item.Timestamp, out IQuote? previousQuote))
{
// This is an update to a previously seen quote - need to subtract old values
// and add new values to avoid double-counting
if (previousQuote.Timestamp == item.Timestamp)
{
// Update tracker with new quote
_inputQuoteTracker[item.Timestamp] = item;

// Rebuild from this bar to recalculate correctly
if (_currentBar != null && barTimestamp == _currentBarTimestamp)
{
Rebuild(barTimestamp);
return;
}
}
}
else
{
// Track this input quote
_inputQuoteTracker[item.Timestamp] = item;

// Prune old tracker entries (keep last 1000 or within 10x aggregation period)
if (_inputQuoteTracker.Count > 1000)
{
DateTime pruneThreshold = item.Timestamp.Add(-10 * AggregationPeriod);
List<DateTime> toRemove = _inputQuoteTracker
.Where(kvp => kvp.Key < pruneThreshold)
.Select(kvp => kvp.Key)
.ToList();

foreach (DateTime key in toRemove)
{
_inputQuoteTracker.Remove(key);
}
}
}

// Determine if this is for current bar, future bar, or past bar
bool isFutureBar = _currentBar == null || barTimestamp > _currentBarTimestamp;
bool isPastBar = _currentBar != null && barTimestamp < _currentBarTimestamp;

// Handle late arrival for past bar
if (isPastBar)
{
Rebuild(barTimestamp);
return;
}

// Handle gap filling if enabled and moving to future bar
if (FillGaps && isFutureBar && _currentBar != null)
{
DateTime lastBarTimestamp = _currentBarTimestamp;
DateTime nextExpectedBarTimestamp = lastBarTimestamp.Add(AggregationPeriod);

// Fill gaps between last bar and current bar
while (nextExpectedBarTimestamp < barTimestamp)
{
// Create a gap-fill bar with carried-forward prices
Quote gapBar = new(
Timestamp: nextExpectedBarTimestamp,
Open: _currentBar.Close,
High: _currentBar.Close,
Low: _currentBar.Close,
Close: _currentBar.Close,
Volume: 0m);

// Add gap bar using base class logic
(IQuote gapResult, _) = ToIndicator(gapBar, null);
AppendCache(gapResult, notify);

// Update current bar to the gap bar
_currentBar = gapBar;
_currentBarTimestamp = nextExpectedBarTimestamp;

nextExpectedBarTimestamp = nextExpectedBarTimestamp.Add(AggregationPeriod);
}
}

// Handle new bar or update to current bar
if (isFutureBar)
{
// Start a new bar
_currentBar = CreateOrUpdateBar(null, barTimestamp, item);
_currentBarTimestamp = barTimestamp;

// Use base class to add the new bar
(IQuote result, _) = ToIndicator(_currentBar, indexHint);
AppendCache(result, notify);
}
else // isCurrentBar
{
// Update existing bar - for quotes with same timestamp, replace
_currentBar = CreateOrUpdateBar(_currentBar, barTimestamp, item);

// Replace the last item in cache with updated bar
int index = Cache.Count - 1;
if (index >= 0)
{
Cache[index] = _currentBar;

// Notify observers of the update
if (notify)
{
NotifyObserversOnRebuild(_currentBar.Timestamp);
}
}
}
}
}

/// <summary>
/// Creates a new bar or updates an existing bar with quote data.
/// </summary>
/// <param name="existingBar">Existing bar to update, or null to create new.</param>
/// <param name="barTimestamp">Timestamp for the bar.</param>
/// <param name="quote">Quote data to incorporate.</param>
/// <returns>Updated or new Quote bar.</returns>
private static Quote CreateOrUpdateBar(Quote? existingBar, DateTime barTimestamp, IQuote quote)
{
if (existingBar == null)
{
// Create new bar from quote
return new Quote(
Timestamp: barTimestamp,
Open: quote.Open,
High: quote.High,
Low: quote.Low,
Close: quote.Close,
Volume: quote.Volume);
}
else
{
// Update existing bar
return new Quote(
Timestamp: barTimestamp,
Open: existingBar.Open, // Keep original open
High: Math.Max(existingBar.High, quote.High),
Low: Math.Min(existingBar.Low, quote.Low),
Close: quote.Close, // Always use latest close
Volume: existingBar.Volume + quote.Volume);
}
}

/// <inheritdoc/>
protected override (IQuote result, int index)
ToIndicator(IQuote item, int? indexHint)
{
ArgumentNullException.ThrowIfNull(item);

DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod);

int index = indexHint ?? Cache.IndexGte(barTimestamp);

if (index == -1)
{
index = Cache.Count;
}

return (item, index);
}

/// <inheritdoc/>
public override string ToString()
=> $"QUOTE-AGG<{AggregationPeriod}>: {Cache.Count} items";

/// <inheritdoc/>
protected override void RollbackState(DateTime timestamp)
{
lock (_addLock)
{
_currentBar = null;
_currentBarTimestamp = default;

// Clear input tracker for rolled back period
List<DateTime> toRemove = _inputQuoteTracker
.Where(kvp => kvp.Key >= timestamp)
.Select(kvp => kvp.Key)
.ToList();

foreach (DateTime key in toRemove)
{
_inputQuoteTracker.Remove(key);
}
}
}
}

public static partial class Quotes
{
/// <summary>
/// Creates a QuoteAggregatorHub that aggregates quotes from the provider into larger time periods.
/// </summary>
/// <param name="quoteProvider">The quote provider to aggregate.</param>
/// <param name="periodSize">The period size to aggregate to.</param>
/// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
/// <returns>A new instance of QuoteAggregatorHub.</returns>
public static QuoteAggregatorHub ToQuoteAggregatorHub(
this IQuoteProvider<IQuote> quoteProvider,
PeriodSize periodSize,
bool fillGaps = false)
=> new(quoteProvider, periodSize, fillGaps);

/// <summary>
/// Creates a QuoteAggregatorHub that aggregates quotes from the provider into larger time periods.
/// </summary>
/// <param name="quoteProvider">The quote provider to aggregate.</param>
/// <param name="timeSpan">The time span to aggregate to.</param>
/// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
/// <returns>A new instance of QuoteAggregatorHub.</returns>
public static QuoteAggregatorHub ToQuoteAggregatorHub(
this IQuoteProvider<IQuote> quoteProvider,
TimeSpan timeSpan,
bool fillGaps = false)
=> new(quoteProvider, timeSpan, fillGaps);
}
Loading
Loading