diff --git a/.vscode/settings.json b/.vscode/settings.json
index 04d5d0570..3f74a2d4f 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,7 +1,12 @@
{
"[markdown]": {
"editor.defaultFormatter": "DavidAnson.vscode-markdownlint",
+ "editor.formatOnSave": true,
+ "editor.codeActionsOnSave": {
+ "source.fixAll.markdownlint": "explicit"
+ }
},
+ "chat.useAgentSkills": true,
"chat.useAgentsMdFile": true,
"chat.useNestedAgentsMdFiles": true,
"chat.customAgentInSubagent.enabled": true,
@@ -12,6 +17,7 @@
"files.associations": {
"*.md": "markdown"
},
+ "github.copilot.chat.customAgents.showOrganizationAndEnterpriseAgents": true,
"github.copilot.chat.githubMcpServer.enabled": true,
"github.copilot.chat.githubMcpServer.readonly": false,
"github.copilot.chat.githubMcpServer.toolsets": [
diff --git a/src/_common/Quotes/ITick.cs b/src/_common/Quotes/ITick.cs
new file mode 100644
index 000000000..ef6a10bb3
--- /dev/null
+++ b/src/_common/Quotes/ITick.cs
@@ -0,0 +1,24 @@
+namespace Skender.Stock.Indicators;
+
+/// <summary>
+/// Tick interface for raw market tick data.
+/// This represents a single trade or quote event with price and volume at a specific timestamp.
+/// </summary>
+public interface ITick : IReusable
+{
+ /// <summary>
+ /// Tick price
+ /// </summary>
+ decimal Price { get; }
+
+ /// <summary>
+ /// Tick volume (quantity traded)
+ /// </summary>
+ decimal Volume { get; }
+
+ /// <summary>
+ /// Optional unique execution ID for duplicate detection.
+ /// When null, duplicates are assessed by timestamp only.
+ /// </summary>
+ string? ExecutionId { get; }
+}
diff --git a/src/_common/Quotes/Quote.AggregatorHub.cs b/src/_common/Quotes/Quote.AggregatorHub.cs
new file mode 100644
index 000000000..b9d66e071
--- /dev/null
+++ b/src/_common/Quotes/Quote.AggregatorHub.cs
@@ -0,0 +1,301 @@
+namespace Skender.Stock.Indicators;
+
+/// <summary>
+/// Streaming hub for aggregating quotes into larger time periods.
+/// </summary>
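+/// <example>
+/// Illustrative usage only, mirroring the accompanying tests (the <c>quote</c>
+/// variable is a placeholder for your own streamed data):
+/// <code>
+/// QuoteHub provider = new();
+/// QuoteAggregatorHub fiveMinuteBars = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes);
+///
+/// provider.Add(quote);                // stream incoming quotes
+/// var bars = fiveMinuteBars.Results;  // aggregated 5-minute OHLCV bars
+/// </code>
+/// </example>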
+public class QuoteAggregatorHub
+ : QuoteProvider
+{
+ private readonly object _addLock = new();
+ private readonly Dictionary<DateTime, IQuote> _inputQuoteTracker = [];
+ private Quote? _currentBar;
+ private DateTime _currentBarTimestamp;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="QuoteAggregatorHub"/> class.
+ /// </summary>
+ /// <param name="provider">The quote provider.</param>
+ /// <param name="periodSize">The period size to aggregate to.</param>
+ /// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
+ public QuoteAggregatorHub(
+ IQuoteProvider provider,
+ PeriodSize periodSize,
+ bool fillGaps = false)
+ : base(provider)
+ {
+ if (periodSize == PeriodSize.Month)
+ {
+ throw new ArgumentException(
+ $"Month aggregation is not supported in streaming mode. periodSize={periodSize}. Use TimeSpan overload for custom periods.",
+ nameof(periodSize));
+ }
+
+ AggregationPeriod = periodSize.ToTimeSpan();
+ FillGaps = fillGaps;
+ Name = $"QUOTE-AGG({periodSize})";
+
+ Reinitialize();
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="QuoteAggregatorHub"/> class.
+ /// </summary>
+ /// <param name="provider">The quote provider.</param>
+ /// <param name="timeSpan">The time span to aggregate to.</param>
+ /// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
+ /// <exception cref="ArgumentOutOfRangeException">Thrown when the time span is less than or equal to zero.</exception>
+ public QuoteAggregatorHub(
+ IQuoteProvider provider,
+ TimeSpan timeSpan,
+ bool fillGaps = false)
+ : base(provider)
+ {
+ if (timeSpan <= TimeSpan.Zero)
+ {
+ throw new ArgumentOutOfRangeException(nameof(timeSpan), timeSpan,
+ "Aggregation period must be greater than zero.");
+ }
+
+ AggregationPeriod = timeSpan;
+ FillGaps = fillGaps;
+ Name = $"QUOTE-AGG({timeSpan})";
+
+ Reinitialize();
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether gap filling is enabled.
+ /// </summary>
+ public bool FillGaps { get; }
+
+ /// <summary>
+ /// Gets the aggregation period.
+ /// </summary>
+ public TimeSpan AggregationPeriod { get; }
+
+ /// <inheritdoc/>
+ public override void OnAdd(IQuote item, bool notify, int? indexHint)
+ {
+ ArgumentNullException.ThrowIfNull(item);
+
+ lock (_addLock)
+ {
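+ // Bucket the quote into its bar by flooring the timestamp to the aggregation
+ // period (illustrative: with a 5-minute period, 10:02:37 maps to 10:00:00).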
+ DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod);
+
+ // Check if this exact input quote was already processed (duplicate detection)
+ if (_inputQuoteTracker.TryGetValue(item.Timestamp, out IQuote? previousQuote))
+ {
+ // This is an update to a previously seen quote - need to subtract old values
+ // and add new values to avoid double-counting
+ if (previousQuote.Timestamp == item.Timestamp)
+ {
+ // Update tracker with new quote
+ _inputQuoteTracker[item.Timestamp] = item;
+
+ // Rebuild from this bar to recalculate correctly
+ if (_currentBar != null && barTimestamp == _currentBarTimestamp)
+ {
+ Rebuild(barTimestamp);
+ return;
+ }
+ }
+ }
+ else
+ {
+ // Track this input quote
+ _inputQuoteTracker[item.Timestamp] = item;
+
+ // Prune old tracker entries (keep last 1000 or within 10x aggregation period)
+ if (_inputQuoteTracker.Count > 1000)
+ {
+ DateTime pruneThreshold = item.Timestamp.Add(-10 * AggregationPeriod);
+ List<DateTime> toRemove = _inputQuoteTracker
+ .Where(kvp => kvp.Key < pruneThreshold)
+ .Select(kvp => kvp.Key)
+ .ToList();
+
+ foreach (DateTime key in toRemove)
+ {
+ _inputQuoteTracker.Remove(key);
+ }
+ }
+ }
+
+ // Determine if this is for current bar, future bar, or past bar
+ bool isFutureBar = _currentBar == null || barTimestamp > _currentBarTimestamp;
+ bool isPastBar = _currentBar != null && barTimestamp < _currentBarTimestamp;
+
+ // Handle late arrival for past bar
+ if (isPastBar)
+ {
+ Rebuild(barTimestamp);
+ return;
+ }
+
+ // Handle gap filling if enabled and moving to future bar
+ if (FillGaps && isFutureBar && _currentBar != null)
+ {
+ DateTime lastBarTimestamp = _currentBarTimestamp;
+ DateTime nextExpectedBarTimestamp = lastBarTimestamp.Add(AggregationPeriod);
+
+ // Fill gaps between last bar and current bar
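+ // Illustrative timeline: with a 1-minute period, if the last bar is 10:00 and
+ // the next quote arrives at 10:03, synthetic bars are emitted for 10:01 and
+ // 10:02 at the prior close with zero volume before the 10:03 bar is started.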
+ while (nextExpectedBarTimestamp < barTimestamp)
+ {
+ // Create a gap-fill bar with carried-forward prices
+ Quote gapBar = new(
+ Timestamp: nextExpectedBarTimestamp,
+ Open: _currentBar.Close,
+ High: _currentBar.Close,
+ Low: _currentBar.Close,
+ Close: _currentBar.Close,
+ Volume: 0m);
+
+ // Add gap bar using base class logic
+ (IQuote gapResult, _) = ToIndicator(gapBar, null);
+ AppendCache(gapResult, notify);
+
+ // Update current bar to the gap bar
+ _currentBar = gapBar;
+ _currentBarTimestamp = nextExpectedBarTimestamp;
+
+ nextExpectedBarTimestamp = nextExpectedBarTimestamp.Add(AggregationPeriod);
+ }
+ }
+
+ // Handle new bar or update to current bar
+ if (isFutureBar)
+ {
+ // Start a new bar
+ _currentBar = CreateOrUpdateBar(null, barTimestamp, item);
+ _currentBarTimestamp = barTimestamp;
+
+ // Use base class to add the new bar
+ (IQuote result, _) = ToIndicator(_currentBar, indexHint);
+ AppendCache(result, notify);
+ }
+ else // isCurrentBar
+ {
+ // Update existing bar - for quotes with same timestamp, replace
+ _currentBar = CreateOrUpdateBar(_currentBar, barTimestamp, item);
+
+ // Replace the last item in cache with updated bar
+ int index = Cache.Count - 1;
+ if (index >= 0)
+ {
+ Cache[index] = _currentBar;
+
+ // Notify observers of the update
+ if (notify)
+ {
+ NotifyObserversOnRebuild(_currentBar.Timestamp);
+ }
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Creates a new bar or updates an existing bar with quote data.
+ /// </summary>
+ /// <param name="existingBar">Existing bar to update, or null to create new.</param>
+ /// <param name="barTimestamp">Timestamp for the bar.</param>
+ /// <param name="quote">Quote data to incorporate.</param>
+ /// <returns>Updated or new Quote bar.</returns>
+ private static Quote CreateOrUpdateBar(Quote? existingBar, DateTime barTimestamp, IQuote quote)
+ {
+ if (existingBar == null)
+ {
+ // Create new bar from quote
+ return new Quote(
+ Timestamp: barTimestamp,
+ Open: quote.Open,
+ High: quote.High,
+ Low: quote.Low,
+ Close: quote.Close,
+ Volume: quote.Volume);
+ }
+ else
+ {
+ // Update existing bar
+ return new Quote(
+ Timestamp: barTimestamp,
+ Open: existingBar.Open, // Keep original open
+ High: Math.Max(existingBar.High, quote.High),
+ Low: Math.Min(existingBar.Low, quote.Low),
+ Close: quote.Close, // Always use latest close
+ Volume: existingBar.Volume + quote.Volume);
+ }
+ }
+
+ /// <inheritdoc/>
+ protected override (IQuote result, int index)
+ ToIndicator(IQuote item, int? indexHint)
+ {
+ ArgumentNullException.ThrowIfNull(item);
+
+ DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod);
+
+ int index = indexHint ?? Cache.IndexGte(barTimestamp);
+
+ if (index == -1)
+ {
+ index = Cache.Count;
+ }
+
+ return (item, index);
+ }
+
+ /// <inheritdoc/>
+ public override string ToString()
+ => $"QUOTE-AGG<{AggregationPeriod}>: {Cache.Count} items";
+
+ /// <inheritdoc/>
+ protected override void RollbackState(DateTime timestamp)
+ {
+ lock (_addLock)
+ {
+ _currentBar = null;
+ _currentBarTimestamp = default;
+
+ // Clear input tracker for rolled back period
+ List<DateTime> toRemove = _inputQuoteTracker
+ .Where(kvp => kvp.Key >= timestamp)
+ .Select(kvp => kvp.Key)
+ .ToList();
+
+ foreach (DateTime key in toRemove)
+ {
+ _inputQuoteTracker.Remove(key);
+ }
+ }
+ }
+}
+
+public static partial class Quotes
+{
+ /// <summary>
+ /// Creates a QuoteAggregatorHub that aggregates quotes from the provider into larger time periods.
+ /// </summary>
+ /// <param name="quoteProvider">The quote provider to aggregate.</param>
+ /// <param name="periodSize">The period size to aggregate to.</param>
+ /// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
+ /// <returns>A new instance of QuoteAggregatorHub.</returns>
+ public static QuoteAggregatorHub ToQuoteAggregatorHub(
+ this IQuoteProvider quoteProvider,
+ PeriodSize periodSize,
+ bool fillGaps = false)
+ => new(quoteProvider, periodSize, fillGaps);
+
+ /// <summary>
+ /// Creates a QuoteAggregatorHub that aggregates quotes from the provider into larger time periods.
+ /// </summary>
+ /// <param name="quoteProvider">The quote provider to aggregate.</param>
+ /// <param name="timeSpan">The time span to aggregate to.</param>
+ /// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
+ /// <returns>A new instance of QuoteAggregatorHub.</returns>
+ public static QuoteAggregatorHub ToQuoteAggregatorHub(
+ this IQuoteProvider quoteProvider,
+ TimeSpan timeSpan,
+ bool fillGaps = false)
+ => new(quoteProvider, timeSpan, fillGaps);
+}
diff --git a/src/_common/Quotes/Tick.AggregatorHub.cs b/src/_common/Quotes/Tick.AggregatorHub.cs
new file mode 100644
index 000000000..17f313bf3
--- /dev/null
+++ b/src/_common/Quotes/Tick.AggregatorHub.cs
@@ -0,0 +1,295 @@
+namespace Skender.Stock.Indicators;
+
+/// <summary>
+/// Streaming hub for aggregating raw tick data into OHLCV quote bars.
+/// </summary>
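+/// <example>
+/// Illustrative usage only, mirroring the accompanying tests (the <c>tick</c>
+/// variable is a placeholder for your own trade feed):
+/// <code>
+/// TickHub ticks = new();
+/// TickAggregatorHub oneMinuteBars = ticks.ToTickAggregatorHub(PeriodSize.OneMinute);
+///
+/// ticks.Add(tick);                   // stream raw trades
+/// var bars = oneMinuteBars.Results;  // aggregated 1-minute OHLCV bars
+/// </code>
+/// </example>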
+public class TickAggregatorHub
+ : QuoteProvider
+{
+ private readonly object _addLock = new();
+ private readonly Dictionary<string, DateTime> _processedExecutionIds = [];
+ private Quote? _currentBar;
+ private DateTime _currentBarTimestamp;
+ private const int MaxExecutionIdCacheSize = 10000;
+ private readonly TimeSpan _executionIdRetentionPeriod;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TickAggregatorHub"/> class.
+ /// </summary>
+ /// <param name="provider">The tick data provider.</param>
+ /// <param name="periodSize">The period size to aggregate to.</param>
+ /// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
+ public TickAggregatorHub(
+ IStreamObservable provider,
+ PeriodSize periodSize,
+ bool fillGaps = false)
+ : base(provider)
+ {
+ if (periodSize == PeriodSize.Month)
+ {
+ throw new ArgumentException(
+ $"Month aggregation is not supported in streaming mode. periodSize={periodSize}. Use TimeSpan overload for custom periods.",
+ nameof(periodSize));
+ }
+
+ AggregationPeriod = periodSize.ToTimeSpan();
+ FillGaps = fillGaps;
+ Name = $"TICK-AGG({periodSize})";
+
+ // Keep execution IDs for 100x the aggregation period or at least 1 hour
+ _executionIdRetentionPeriod = TimeSpan.FromTicks(Math.Max(
+ AggregationPeriod.Ticks * 100,
+ TimeSpan.FromHours(1).Ticks));
+
+ Reinitialize();
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TickAggregatorHub"/> class.
+ /// </summary>
+ /// <param name="provider">The tick data provider.</param>
+ /// <param name="timeSpan">The time span to aggregate to.</param>
+ /// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
+ /// <exception cref="ArgumentOutOfRangeException">Thrown when the time span is less than or equal to zero.</exception>
+ public TickAggregatorHub(
+ IStreamObservable provider,
+ TimeSpan timeSpan,
+ bool fillGaps = false)
+ : base(provider)
+ {
+ if (timeSpan <= TimeSpan.Zero)
+ {
+ throw new ArgumentOutOfRangeException(nameof(timeSpan), timeSpan,
+ "Aggregation period must be greater than zero.");
+ }
+
+ AggregationPeriod = timeSpan;
+ FillGaps = fillGaps;
+ Name = $"TICK-AGG({timeSpan})";
+
+ // Keep execution IDs for 100x the aggregation period or at least 1 hour
+ _executionIdRetentionPeriod = TimeSpan.FromTicks(Math.Max(
+ timeSpan.Ticks * 100,
+ TimeSpan.FromHours(1).Ticks));
+
+ Reinitialize();
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether gap filling is enabled.
+ /// </summary>
+ public bool FillGaps { get; }
+
+ /// <summary>
+ /// Gets the aggregation period.
+ /// </summary>
+ public TimeSpan AggregationPeriod { get; }
+
+ /// <inheritdoc/>
+ public override void OnAdd(ITick item, bool notify, int? indexHint)
+ {
+ ArgumentNullException.ThrowIfNull(item);
+
+ lock (_addLock)
+ {
+ // Check for duplicate execution IDs with time-based pruning
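+ // Illustrative: a re-sent tick carrying an already-seen ExecutionId (e.g. "EXEC-001")
+ // is ignored; ticks without an ExecutionId bypass this check entirely.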
+ if (!string.IsNullOrEmpty(item.ExecutionId))
+ {
+ if (_processedExecutionIds.TryGetValue(item.ExecutionId, out DateTime processedTime))
+ {
+ // Skip duplicate tick
+ return;
+ }
+
+ // Add execution ID with timestamp
+ _processedExecutionIds[item.ExecutionId] = item.Timestamp;
+
+ // Prune old execution IDs (by time or size)
+ if (_processedExecutionIds.Count > (MaxExecutionIdCacheSize / 2))
+ {
+ DateTime pruneThreshold = item.Timestamp.Add(-_executionIdRetentionPeriod);
+ List<string> toRemove = _processedExecutionIds
+ .Where(kvp => kvp.Value < pruneThreshold)
+ .Select(kvp => kvp.Key)
+ .ToList();
+
+ foreach (string key in toRemove)
+ {
+ _processedExecutionIds.Remove(key);
+ }
+ }
+
+ // Hard limit: if still too large after pruning, remove oldest entries
+ if (_processedExecutionIds.Count > MaxExecutionIdCacheSize)
+ {
+ List<string> toRemove = _processedExecutionIds
+ .OrderBy(kvp => kvp.Value)
+ .Take(_processedExecutionIds.Count - (MaxExecutionIdCacheSize / 2))
+ .Select(kvp => kvp.Key)
+ .ToList();
+
+ foreach (string key in toRemove)
+ {
+ _processedExecutionIds.Remove(key);
+ }
+ }
+ }
+
+ DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod);
+
+ // Determine if this is for current bar, future bar, or past bar
+ bool isFutureBar = _currentBar == null || barTimestamp > _currentBarTimestamp;
+ bool isPastBar = _currentBar != null && barTimestamp < _currentBarTimestamp;
+
+ // Handle late arrival for past bar
+ if (isPastBar)
+ {
+ Rebuild(barTimestamp);
+ return;
+ }
+
+ // Handle gap filling if enabled and moving to future bar
+ if (FillGaps && isFutureBar && _currentBar != null)
+ {
+ DateTime lastBarTimestamp = _currentBarTimestamp;
+ DateTime nextExpectedBarTimestamp = lastBarTimestamp.Add(AggregationPeriod);
+
+ // Fill gaps between last bar and current bar
+ while (nextExpectedBarTimestamp < barTimestamp)
+ {
+ // Create a gap-fill bar with carried-forward prices
+ Quote gapBar = new(
+ Timestamp: nextExpectedBarTimestamp,
+ Open: _currentBar.Close,
+ High: _currentBar.Close,
+ Low: _currentBar.Close,
+ Close: _currentBar.Close,
+ Volume: 0m);
+
+ // Add gap bar directly to cache
+ AppendCache(gapBar, notify);
+
+ // Update current bar to the gap bar
+ _currentBar = gapBar;
+ _currentBarTimestamp = nextExpectedBarTimestamp;
+
+ nextExpectedBarTimestamp = nextExpectedBarTimestamp.Add(AggregationPeriod);
+ }
+ }
+
+ // Handle new bar or update to current bar
+ if (isFutureBar)
+ {
+ // Start a new bar from the tick
+ _currentBar = CreateOrUpdateBar(null, barTimestamp, item);
+ _currentBarTimestamp = barTimestamp;
+
+ // Add the new bar directly to cache
+ AppendCache(_currentBar, notify);
+ }
+ else // isCurrentBar
+ {
+ // Update existing bar with new tick data
+ _currentBar = CreateOrUpdateBar(_currentBar, barTimestamp, item);
+
+ // Replace the last item in cache with updated bar
+ int index = Cache.Count - 1;
+ if (index >= 0)
+ {
+ Cache[index] = _currentBar;
+
+ // Notify observers of the update
+ if (notify)
+ {
+ NotifyObserversOnRebuild(_currentBar.Timestamp);
+ }
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Creates a new bar or updates an existing bar with tick data.
+ /// </summary>
+ /// <param name="existingBar">Existing bar to update, or null to create new.</param>
+ /// <param name="barTimestamp">Timestamp for the bar.</param>
+ /// <param name="tick">Tick data to incorporate.</param>
+ /// <returns>Updated or new Quote bar.</returns>
+ private static Quote CreateOrUpdateBar(Quote? existingBar, DateTime barTimestamp, ITick tick)
+ {
+ if (existingBar == null)
+ {
+ // Create new bar from tick
+ return new Quote(
+ Timestamp: barTimestamp,
+ Open: tick.Price,
+ High: tick.Price,
+ Low: tick.Price,
+ Close: tick.Price,
+ Volume: tick.Volume);
+ }
+ else
+ {
+ // Update existing bar
+ return new Quote(
+ Timestamp: barTimestamp,
+ Open: existingBar.Open, // Keep original open
+ High: Math.Max(existingBar.High, tick.Price),
+ Low: Math.Min(existingBar.Low, tick.Price),
+ Close: tick.Price, // Always use latest price
+ Volume: existingBar.Volume + tick.Volume);
+ }
+ }
+
+ /// <inheritdoc/>
+ protected override (IQuote result, int index)
+ ToIndicator(ITick item, int? indexHint)
+ {
+ ArgumentNullException.ThrowIfNull(item);
+
+ DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod);
+
+ int index = indexHint ?? Cache.IndexGte(barTimestamp);
+
+ if (index == -1)
+ {
+ index = Cache.Count;
+ }
+
+ // Convert tick to a single-price bar
+ Quote bar = new(
+ Timestamp: barTimestamp,
+ Open: item.Price,
+ High: item.Price,
+ Low: item.Price,
+ Close: item.Price,
+ Volume: item.Volume);
+
+ return (bar, index);
+ }
+
+ /// <inheritdoc/>
+ public override string ToString()
+ => $"TICK-AGG<{AggregationPeriod}>: {Cache.Count} items";
+
+ /// <inheritdoc/>
+ protected override void RollbackState(DateTime timestamp)
+ {
+ lock (_addLock)
+ {
+ _currentBar = null;
+ _currentBarTimestamp = default;
+
+ // Clear execution IDs for rolled back period
+ List<string> toRemove = _processedExecutionIds
+ .Where(kvp => kvp.Value >= timestamp)
+ .Select(kvp => kvp.Key)
+ .ToList();
+
+ foreach (string key in toRemove)
+ {
+ _processedExecutionIds.Remove(key);
+ }
+ }
+ }
+}
diff --git a/src/_common/Quotes/Tick.StreamHub.cs b/src/_common/Quotes/Tick.StreamHub.cs
new file mode 100644
index 000000000..d55b93f5f
--- /dev/null
+++ b/src/_common/Quotes/Tick.StreamHub.cs
@@ -0,0 +1,203 @@
+namespace Skender.Stock.Indicators;
+
+/// <summary>
+/// Streaming hub for managing raw tick data.
+/// </summary>
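+/// <example>
+/// Illustrative usage only (a standalone hub, as exercised in the accompanying tests):
+/// <code>
+/// TickHub hub = new();
+/// hub.Add(new Tick(DateTime.UtcNow, 100.25m, 10m));
+/// </code>
+/// </example>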
+public class TickHub
+ : StreamHub, IStreamObservable
+{
+ /// <summary>
+ /// Indicates whether this TickHub is standalone (no external provider).
+ /// </summary>
+ private readonly bool _isStandalone;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TickHub"/> class without its own provider.
+ /// </summary>
+ /// <param name="maxCacheSize">Maximum in-memory cache size.</param>
+ public TickHub(int? maxCacheSize = null)
+ : base(new BaseProvider())
+ {
+ _isStandalone = true;
+
+ const int maxCacheSizeDefault = (int)(0.9 * int.MaxValue);
+
+ if (maxCacheSize is not null and > maxCacheSizeDefault)
+ {
+ string message
+ = $"'{nameof(maxCacheSize)}' must be less than {maxCacheSizeDefault}.";
+
+ throw new ArgumentOutOfRangeException(
+ nameof(maxCacheSize), maxCacheSize, message);
+ }
+
+ MaxCacheSize = maxCacheSize ?? maxCacheSizeDefault;
+ Name = "TICK-HUB";
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TickHub"/> class with a specified provider.
+ /// </summary>
+ /// <param name="provider">The tick provider.</param>
+ public TickHub(
+ IStreamObservable provider)
+ : base(provider ?? throw new ArgumentNullException(nameof(provider)))
+ {
+ ArgumentNullException.ThrowIfNull(provider);
+
+ _isStandalone = false;
+ Name = "TICK-HUB";
+ Reinitialize();
+ }
+
+ /// <inheritdoc/>
+ protected override (ITick result, int index)
+ ToIndicator(ITick item, int? indexHint)
+ {
+ ArgumentNullException.ThrowIfNull(item);
+
+ int index = indexHint
+ ?? Cache.IndexGte(item.Timestamp);
+
+ return (item, index == -1 ? Cache.Count : index);
+ }
+
+ /// <inheritdoc/>
+ public override string ToString()
+ => $"TICKS: {Cache.Count} items";
+
+ /// <summary>
+ /// Handles adding a new tick with special handling for same-timestamp updates
+ /// when TickHub is standalone (no external provider).
+ /// </summary>
+ /// <inheritdoc/>
+ public override void OnAdd(ITick item, bool notify, int? indexHint)
+ {
+ // for non-standalone TickHub, use standard behavior
+ if (!_isStandalone)
+ {
+ base.OnAdd(item, notify, indexHint);
+ return;
+ }
+
+ // get result and position
+ (ITick result, int index) = ToIndicator(item, indexHint);
+
+ // check if this is a same-timestamp update (not a new item at the end)
+ if (Cache.Count > 0 && index < Cache.Count && Cache[index].Timestamp == result.Timestamp)
+ {
+ // check if this is an exact duplicate (same values)
+ // if so, defer to AppendCache for overflow tracking
+ if (Cache[index].Equals(result))
+ {
+ AppendCache(result, notify);
+ return;
+ }
+
+ // For ticks with different execution IDs but same timestamp,
+ // we need to store them properly (not drop them)
+ bool hasExecutionId = !string.IsNullOrEmpty(result.ExecutionId);
+ bool hasCachedExecutionId = !string.IsNullOrEmpty(Cache[index].ExecutionId);
+
+ if (hasExecutionId && hasCachedExecutionId && Cache[index].ExecutionId != result.ExecutionId)
+ {
+ // Different execution IDs at same timestamp - both are valid trades
+ // Persist to cache before notifying observers
+ Cache[index] = result;
+
+ // Notify observers with the new tick so aggregators can process it
+ if (notify)
+ {
+ NotifyObserversOnAdd(result, index);
+ }
+
+ return;
+ }
+
+ // For ticks without execution IDs or same execution ID, replace in cache
+ Cache[index] = result;
+
+ // Notify appropriately based on whether it's an update or new execution
+ if (notify)
+ {
+ if (hasExecutionId && hasCachedExecutionId && Cache[index].ExecutionId == result.ExecutionId)
+ {
+ // Same execution ID - this is an update/correction
+ NotifyObserversOnRebuild(result.Timestamp);
+ }
+ else
+ {
+ // No execution IDs - notify as addition so aggregators can process
+ NotifyObserversOnAdd(result, index);
+ }
+ }
+
+ return;
+ }
+
+ // standard add behavior for new items
+ AppendCache(result, notify);
+ }
+
+ /// <summary>
+ /// Rebuilds the cache from a specific timestamp.
+ /// For standalone TickHub, preserves cache and notifies observers.
+ /// </summary>
+ /// <inheritdoc/>
+ public override void Rebuild(DateTime fromTimestamp)
+ {
+ // for standalone TickHub (no external provider),
+ // we cannot rebuild from an empty provider cache
+ // instead, just notify observers to rebuild from this hub's cache
+ if (_isStandalone)
+ {
+ // rollback internal state
+ RollbackState(fromTimestamp);
+
+ // notify observers to rebuild from this hub
+ NotifyObserversOnRebuild(fromTimestamp);
+ return;
+ }
+
+ // standard rebuild for TickHub with external provider
+ base.Rebuild(fromTimestamp);
+ }
+}
+
+/// <summary>
+/// Provides extension methods for aggregating tick data streams into OHLCV quote bars using a <see cref="TickAggregatorHub"/>.
+/// </summary>
+/// <remarks>
+/// The Ticks class offers static methods to facilitate the transformation of tick data into aggregated
+/// quote bars, supporting both fixed period sizes and custom time spans. These methods enable seamless integration with
+/// tick data providers and allow optional gap filling to maintain continuity in price data. All members are static and
+/// intended for use as extension methods on tick provider instances.
+/// </remarks>
+public static class Ticks
+{
+ /// <summary>
+ /// Creates a TickAggregatorHub that aggregates ticks from the provider into OHLCV quote bars.
+ /// </summary>
+ /// <param name="tickProvider">The tick provider to aggregate.</param>
+ /// <param name="periodSize">The period size to aggregate to.</param>
+ /// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
+ /// <returns>A new instance of TickAggregatorHub.</returns>
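+ /// <example>
+ /// Illustrative chaining only; the SMA lookback of 20 is an arbitrary example value:
+ /// <code>
+ /// TickHub ticks = new();
+ /// var sma = ticks
+ /// .ToTickAggregatorHub(PeriodSize.OneMinute)
+ /// .ToSmaHub(20);
+ /// </code>
+ /// </example>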
+ public static TickAggregatorHub ToTickAggregatorHub(
+ this IStreamObservable tickProvider,
+ PeriodSize periodSize,
+ bool fillGaps = false)
+ => new(tickProvider, periodSize, fillGaps);
+
+ /// <summary>
+ /// Creates a TickAggregatorHub that aggregates ticks from the provider into OHLCV quote bars.
+ /// </summary>
+ /// <param name="tickProvider">The tick provider to aggregate.</param>
+ /// <param name="timeSpan">The time span to aggregate to.</param>
+ /// <param name="fillGaps">Whether to fill gaps by carrying forward the last known price.</param>
+ /// <returns>A new instance of TickAggregatorHub.</returns>
+ public static TickAggregatorHub ToTickAggregatorHub(
+ this IStreamObservable tickProvider,
+ TimeSpan timeSpan,
+ bool fillGaps = false)
+ => new(tickProvider, timeSpan, fillGaps);
+}
diff --git a/src/_common/Quotes/Tick.cs b/src/_common/Quotes/Tick.cs
new file mode 100644
index 000000000..83782b0b1
--- /dev/null
+++ b/src/_common/Quotes/Tick.cs
@@ -0,0 +1,36 @@
+namespace Skender.Stock.Indicators;
+
+/// <summary>
+/// Built-in Tick type, representing a raw market tick data point.
+/// </summary>
+/// <param name="Timestamp">
+/// Date/time of the tick
+/// </param>
+/// <param name="Price">
+/// Tick price
+/// </param>
+/// <param name="Volume">
+/// Tick volume (quantity traded)
+/// </param>
+/// <param name="ExecutionId">
+/// Optional unique execution ID for duplicate detection
+/// </param>
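+/// <example>
+/// Illustrative construction only:
+/// <code>
+/// Tick trade = new(DateTime.UtcNow, 100.25m, 10m, "EXEC-001");
+/// Tick untagged = new(DateTime.UtcNow, 100.30m, 5m);  // no ExecutionId
+/// </code>
+/// </example>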
+[Serializable]
+public record Tick
+(
+ DateTime Timestamp,
+ decimal Price,
+ decimal Volume,
+ string? ExecutionId = null
+) : ITick
+{
+ /// <inheritdoc/>
+ [JsonIgnore]
+ public double Value => (double)Price;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Tick"/> class.
+ /// </summary>
+ public Tick()
+ : this(default, default, default, null) { }
+}
diff --git a/src/_common/StreamHub/StreamHub.Observable.cs b/src/_common/StreamHub/StreamHub.Observable.cs
index 47a8bb135..0b5eb3157 100644
--- a/src/_common/StreamHub/StreamHub.Observable.cs
+++ b/src/_common/StreamHub/StreamHub.Observable.cs
@@ -81,7 +81,7 @@ private sealed class Unsubscriber(
/// </summary>
/// <param name="item">TSeries item to send.</param>
/// <param name="indexHint">Provider index hint.</param>
- private void NotifyObserversOnAdd(TOut item, int? indexHint)
+ protected void NotifyObserversOnAdd(TOut item, int? indexHint)
{
if (ObserverCount == 0)
{
diff --git a/tests/indicators/_base/StreamHubTestBase.cs b/tests/indicators/_base/StreamHubTestBase.cs
index b95f310b0..1698db093 100644
--- a/tests/indicators/_base/StreamHubTestBase.cs
+++ b/tests/indicators/_base/StreamHubTestBase.cs
@@ -24,6 +24,14 @@ public interface ITestQuoteObserver
void QuoteObserver_WithWarmupLateArrivalAndRemoval_MatchesSeriesExactly();
}
+public interface ITestTickObserver
+{
+ /// <summary>
+ /// Tests hub compatibility with tick provider
+ /// </summary>
+ void TickObserver_WithWarmupAndMultipleSameTimestamp_WorksCorrectly();
+}
+
/// <summary>
/// Add this to stream chainee indicator tests.
/// </summary>
diff --git a/tests/indicators/_common/Quotes/Quote.AggregatorHub.Tests.cs b/tests/indicators/_common/Quotes/Quote.AggregatorHub.Tests.cs
new file mode 100644
index 000000000..77c78a9ec
--- /dev/null
+++ b/tests/indicators/_common/Quotes/Quote.AggregatorHub.Tests.cs
@@ -0,0 +1,518 @@
+namespace StreamHubs;
+
+[TestClass]
+public class QuoteAggregatorHubTests : StreamHubTestBase, ITestQuoteObserver, ITestChainProvider
+{
+ [TestMethod]
+ public override void ToStringOverride_ReturnsExpectedName()
+ {
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes);
+
+ string result = aggregator.ToString();
+
+ result.Should().Contain("QUOTE-AGG");
+ result.Should().Contain("00:05:00");
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void BasicAggregation_OneMinuteToFiveMinute()
+ {
+ // Setup: Create minute-level quotes
+ List<Quote> minuteQuotes =
+ [
+ new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000),
+ new(DateTime.Parse("2023-11-09 10:01", invariantCulture), 102, 106, 101, 104, 1100),
+ new(DateTime.Parse("2023-11-09 10:02", invariantCulture), 104, 107, 103, 105, 1200),
+ new(DateTime.Parse("2023-11-09 10:03", invariantCulture), 105, 108, 104, 106, 1300),
+ new(DateTime.Parse("2023-11-09 10:04", invariantCulture), 106, 109, 105, 107, 1400),
+ new(DateTime.Parse("2023-11-09 10:05", invariantCulture), 107, 110, 106, 108, 1500),
+ new(DateTime.Parse("2023-11-09 10:06", invariantCulture), 108, 111, 107, 109, 1600),
+ ];
+
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes);
+
+ // Add quotes
+ foreach (Quote q in minuteQuotes)
+ {
+ provider.Add(q);
+ }
+
+ // Verify results
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ results.Should().HaveCount(2);
+
+ // First 5-minute bar (10:00-10:04)
+ IQuote bar1 = results[0];
+ bar1.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture));
+ bar1.Open.Should().Be(100);
+ bar1.High.Should().Be(109); // Max of all highs in period
+ bar1.Low.Should().Be(99); // Min of all lows in period
+ bar1.Close.Should().Be(107); // Last close in period
+ bar1.Volume.Should().Be(6000); // Sum of all volumes
+
+ // Second 5-minute bar (10:05-10:06, incomplete)
+ IQuote bar2 = results[1];
+ bar2.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:05", invariantCulture));
+ bar2.Open.Should().Be(107);
+ bar2.High.Should().Be(111);
+ bar2.Low.Should().Be(106);
+ bar2.Close.Should().Be(109);
+ bar2.Volume.Should().Be(3100);
+
+ // Cleanup
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void AggregationWithTimeSpan()
+ {
+ List<Quote> minuteQuotes =
+ [
+ new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000),
+ new(DateTime.Parse("2023-11-09 10:01", invariantCulture), 102, 106, 101, 104, 1100),
+ new(DateTime.Parse("2023-11-09 10:02", invariantCulture), 104, 107, 103, 105, 1200),
+ ];
+
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(TimeSpan.FromMinutes(3));
+
+ foreach (Quote q in minuteQuotes)
+ {
+ provider.Add(q);
+ }
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ results.Should().HaveCount(1);
+
+ IQuote bar = results[0];
+ bar.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture));
+ bar.Open.Should().Be(100);
+ bar.High.Should().Be(107);
+ bar.Low.Should().Be(99);
+ bar.Close.Should().Be(105);
+ bar.Volume.Should().Be(3300);
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void GapFilling_CarriesForwardPrices()
+ {
+ List<Quote> minuteQuotes =
+ [
+ new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000),
+ new(DateTime.Parse("2023-11-09 10:01", invariantCulture), 102, 106, 101, 104, 1100),
+ // Gap: 10:02 missing
+ new(DateTime.Parse("2023-11-09 10:03", invariantCulture), 105, 108, 104, 106, 1300),
+ ];
+
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(
+ PeriodSize.OneMinute,
+ fillGaps: true);
+
+ foreach (Quote q in minuteQuotes)
+ {
+ provider.Add(q);
+ }
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ // Should have 4 bars: 10:00, 10:01, 10:02 (gap-filled), 10:03
+ results.Should().HaveCount(4);
+
+ // Verify gap-filled bar at 10:02
+ IQuote gapBar = results[2];
+ gapBar.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:02", invariantCulture));
+ gapBar.Open.Should().Be(104); // Carried forward from 10:01 close
+ gapBar.High.Should().Be(104);
+ gapBar.Low.Should().Be(104);
+ gapBar.Close.Should().Be(104);
+ gapBar.Volume.Should().Be(0);
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void GapFilling_MultipleConsecutiveGaps()
+ {
+ List<Quote> minuteQuotes =
+ [
+ new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000),
+ // Gaps: 10:01, 10:02, 10:03 missing
+ new(DateTime.Parse("2023-11-09 10:04", invariantCulture), 105, 108, 104, 106, 1300),
+ ];
+
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(
+ PeriodSize.OneMinute,
+ fillGaps: true);
+
+ foreach (Quote q in minuteQuotes)
+ {
+ provider.Add(q);
+ }
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ // Should have 5 bars: 10:00, 10:01 (gap), 10:02 (gap), 10:03 (gap), 10:04
+ results.Should().HaveCount(5);
+
+ // Verify all gap-filled bars carry forward price of 102
+ for (int i = 1; i <= 3; i++)
+ {
+ IQuote gapBar = results[i];
+ gapBar.Open.Should().Be(102);
+ gapBar.High.Should().Be(102);
+ gapBar.Low.Should().Be(102);
+ gapBar.Close.Should().Be(102);
+ gapBar.Volume.Should().Be(0);
+ }
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void NoGapFilling_SkipsMissingPeriods()
+ {
+ List<Quote> minuteQuotes =
+ [
+ new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000),
+ // Gap: 10:01, 10:02 missing
+ new(DateTime.Parse("2023-11-09 10:03", invariantCulture), 105, 108, 104, 106, 1300),
+ ];
+
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(
+ PeriodSize.OneMinute,
+ fillGaps: false);
+
+ foreach (Quote q in minuteQuotes)
+ {
+ provider.Add(q);
+ }
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ // Should have only 2 bars (no gap filling)
+ results.Should().HaveCount(2);
+
+ results[0].Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture));
+ results[1].Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:03", invariantCulture));
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void SameTimestampUpdates_ReplaceWithinSameBar()
+ {
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes);
+
+ // Add initial quote
+ provider.Add(new Quote(
+ DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000));
+
+ // Add another quote in the same 5-minute period (should update the bar)
+ provider.Add(new Quote(
+ DateTime.Parse("2023-11-09 10:02", invariantCulture), 102, 110, 98, 108, 1500));
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ results.Should().HaveCount(1);
+
+ IQuote bar = results[0];
+ bar.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture));
+ bar.Open.Should().Be(100); // Original open
+ bar.High.Should().Be(110); // Updated high
+ bar.Low.Should().Be(98); // Updated low
+ bar.Close.Should().Be(108); // Latest close
+ bar.Volume.Should().Be(2500); // Sum of volumes
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void ChainWithDownstreamIndicator_WorksCorrectly()
+ {
+ List<Quote> minuteQuotes =
+ [
+ new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000),
+ new(DateTime.Parse("2023-11-09 10:05", invariantCulture), 102, 107, 101, 104, 1100),
+ new(DateTime.Parse("2023-11-09 10:10", invariantCulture), 104, 109, 103, 106, 1200),
+ new(DateTime.Parse("2023-11-09 10:15", invariantCulture), 106, 111, 105, 108, 1300),
+ new(DateTime.Parse("2023-11-09 10:20", invariantCulture), 108, 113, 107, 110, 1400),
+ ];
+
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes);
+
+ // Chain with SMA
+ SmaHub sma = aggregator.ToSmaHub(3);
+
+ foreach (Quote q in minuteQuotes)
+ {
+ provider.Add(q);
+ }
+
+ // Verify aggregated quotes
+ IReadOnlyList<IQuote> aggResults = aggregator.Results;
+ aggResults.Should().HaveCount(5);
+
+ // Verify SMA results
+ IReadOnlyList<SmaResult> smaResults = sma.Results;
+ smaResults.Should().HaveCount(5);
+
+ // First two should be null (not enough data)
+ smaResults[0].Sma.Should().BeNull();
+ smaResults[1].Sma.Should().BeNull();
+
+ // Third should be average of first three closes
+ smaResults[2].Sma.Should().NotBeNull();
+ const double expectedSma = (102 + 104 + 106) / 3.0;
+ smaResults[2].Sma.Should().BeApproximately(expectedSma, 0.0001);
+
+ sma.Unsubscribe();
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void InvalidPeriodSize_Month_ThrowsException()
+ {
+ QuoteHub provider = new();
+
+ FluentActions
+ .Invoking(() => provider.ToQuoteAggregatorHub(PeriodSize.Month))
+ .Should()
+ .ThrowExactly<ArgumentException>()
+ .WithMessage("*Month aggregation is not supported*");
+
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void InvalidTimeSpan_Zero_ThrowsException()
+ {
+ QuoteHub provider = new();
+
+ FluentActions
+ .Invoking(() => provider.ToQuoteAggregatorHub(TimeSpan.Zero))
+ .Should()
+ .ThrowExactly<ArgumentOutOfRangeException>()
+ .WithMessage("*must be greater than zero*");
+
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void InvalidTimeSpan_Negative_ThrowsException()
+ {
+ QuoteHub provider = new();
+
+ FluentActions
+ .Invoking(() => provider.ToQuoteAggregatorHub(TimeSpan.FromMinutes(-5)))
+ .Should()
+ .ThrowExactly<ArgumentOutOfRangeException>()
+ .WithMessage("*must be greater than zero*");
+
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void QuoteObserver_WithWarmupLateArrivalAndRemoval_MatchesSeriesExactly()
+ {
+ // Setup quote provider hub
+ QuoteHub provider = new();
+
+ // Prefill quotes at provider (warmup window)
+ provider.Add(Quotes.Take(20));
+
+ // Initialize aggregator (1-minute aggregation, no gaps to keep it simple)
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.OneMinute);
+
+ // Fetch initial results (early)
+ IReadOnlyList<IQuote> sut = aggregator.Results;
+
+ // Emulate adding quotes to provider hub
+ for (int i = 20; i < Quotes.Count; i++)
+ {
+ // Skip one (add later)
+ if (i == 80) { continue; }
+
+ // Skip the removal index for now
+ if (i == removeAtIndex) { continue; }
+
+ Quote q = Quotes[i];
+ provider.Add(q);
+
+ // Resend duplicate quotes
+ if (i is > 100 and < 105) { provider.Add(q); }
+ }
+
+ // Late arrival
+ provider.Insert(Quotes[80]);
+ IReadOnlyList<IQuote> afterLateArrival = aggregator.Results.ToList();
+
+ // Insert the previously skipped quote (simulates a removal/rollback rebuild)
+ provider.Insert(Quotes[removeAtIndex]);
+ IReadOnlyList<IQuote> afterRemoval = aggregator.Results.ToList();
+
+ // Verify structural invariants
+ sut.Should().NotBeEmpty();
+ afterLateArrival.Should().NotBeEmpty();
+ afterRemoval.Should().NotBeEmpty();
+
+ // Verify ordering and consistency
+ sut.Should().AllSatisfy(q => {
+ q.Timestamp.Should().NotBe(default);
+ q.Open.Should().BeGreaterThan(0);
+ q.High.Should().BeGreaterThanOrEqualTo(q.Low);
+ q.Close.Should().BeGreaterThan(0);
+ });
+
+ afterLateArrival.Should().AllSatisfy(q => {
+ q.Timestamp.Should().NotBe(default);
+ q.Open.Should().BeGreaterThan(0);
+ q.High.Should().BeGreaterThanOrEqualTo(q.Low);
+ q.Close.Should().BeGreaterThan(0);
+ });
+
+ afterRemoval.Should().AllSatisfy(q => {
+ q.Timestamp.Should().NotBe(default);
+ q.Open.Should().BeGreaterThan(0);
+ q.High.Should().BeGreaterThanOrEqualTo(q.Low);
+ q.Close.Should().BeGreaterThan(0);
+ });
+
+ // Verify ordering is preserved
+ for (int i = 1; i < sut.Count; i++)
+ {
+ sut[i].Timestamp.Should().BeGreaterThanOrEqualTo(sut[i - 1].Timestamp);
+ }
+
+ for (int i = 1; i < afterLateArrival.Count; i++)
+ {
+ afterLateArrival[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterLateArrival[i - 1].Timestamp);
+ }
+
+ for (int i = 1; i < afterRemoval.Count; i++)
+ {
+ afterRemoval[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterRemoval[i - 1].Timestamp);
+ }
+
+ // Cleanup
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void ChainProvider_MatchesSeriesExactly()
+ {
+ const int emaPeriods = 14;
+
+ // Setup quote provider hub
+ QuoteHub provider = new();
+
+ // Initialize aggregator and chain with EMA
+ EmaHub observer = provider
+ .ToQuoteAggregatorHub(PeriodSize.FiveMinutes)
+ .ToEmaHub(emaPeriods);
+
+ // Emulate quote stream - for aggregator, use simple sequential adding
+ // (late arrivals and removals don't make sense for time-based aggregation)
+ foreach (Quote q in Quotes)
+ {
+ provider.Add(q);
+ }
+
+ // Final results
+ IReadOnlyList<EmaResult> sut = observer.Results;
+
+ // Compare with aggregated series
+ IReadOnlyList<Quote> aggregatedQuotes = Quotes.Aggregate(PeriodSize.FiveMinutes);
+
+ IReadOnlyList<EmaResult> expected = aggregatedQuotes
+ .ToEma(emaPeriods);
+
+ // Assert: should have same count as batch aggregation
+ sut.Should().HaveCount(expected.Count);
+
+ // Verify EMA values match closely
+ // Note: There may be slight differences due to streaming vs batch processing
+ // but they should be very close for completed bars
+ for (int i = emaPeriods; i < sut.Count; i++)
+ {
+ if (sut[i].Ema.HasValue && expected[i].Ema.HasValue)
+ {
+ sut[i].Ema.Should().BeApproximately(expected[i].Ema.Value, 0.1,
+ $"at index {i}, timestamp {sut[i].Timestamp}");
+ }
+ }
+
+ // Cleanup
+ observer.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void Properties_AreSetCorrectly()
+ {
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(
+ PeriodSize.FifteenMinutes,
+ fillGaps: true);
+
+ aggregator.FillGaps.Should().BeTrue();
+ aggregator.AggregationPeriod.Should().Be(TimeSpan.FromMinutes(15));
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void SameTimestampQuotes_ReplacesPriorBar()
+ {
+ QuoteHub provider = new();
+ QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes);
+
+ // Add initial quote at 10:00
+ provider.Add(new Quote(
+ DateTime.Parse("2023-11-09 10:00", invariantCulture),
+ 100m, 110m, 99m, 105m, 1000m));
+
+ // Add another quote also at 10:00 (within same 5-min period)
+ provider.Add(new Quote(
+ DateTime.Parse("2023-11-09 10:01", invariantCulture),
+ 105m, 115m, 104m, 108m, 1100m));
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ results.Should().HaveCount(1);
+
+ // Bar should incorporate both quotes
+ IQuote bar = results[0];
+ bar.Open.Should().Be(100m); // First quote's open
+ bar.High.Should().Be(115m); // Max of both highs
+ bar.Low.Should().Be(99m); // Min of both lows
+ bar.Close.Should().Be(108m); // Last quote's close
+ bar.Volume.Should().Be(2100m); // Sum of volumes
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+}
+
diff --git a/tests/indicators/_common/Quotes/Tick.AggregatorHub.Tests.cs b/tests/indicators/_common/Quotes/Tick.AggregatorHub.Tests.cs
new file mode 100644
index 000000000..932e6fcf5
--- /dev/null
+++ b/tests/indicators/_common/Quotes/Tick.AggregatorHub.Tests.cs
@@ -0,0 +1,452 @@
+namespace StreamHubs;
+
+[TestClass]
+public class TickAggregatorHubTests : StreamHubTestBase, ITestQuoteObserver, ITestChainProvider
+{
+ [TestMethod]
+ public override void ToStringOverride_ReturnsExpectedName()
+ {
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.FiveMinutes);
+
+ string result = aggregator.ToString();
+
+ result.Should().Contain("TICK-AGG");
+ result.Should().Contain("00:05:00");
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void BasicAggregation_TicksToOneMinuteBars()
+ {
+ // Setup: Create tick-level data
+ List<Tick> ticks =
+ [
+ new(DateTime.Parse("2023-11-09 10:00:00", invariantCulture), 100.00m, 10m),
+ new(DateTime.Parse("2023-11-09 10:00:15", invariantCulture), 100.50m, 15m),
+ new(DateTime.Parse("2023-11-09 10:00:30", invariantCulture), 99.75m, 20m),
+ new(DateTime.Parse("2023-11-09 10:00:45", invariantCulture), 100.25m, 25m),
+ new(DateTime.Parse("2023-11-09 10:01:10", invariantCulture), 101.00m, 30m),
+ ];
+
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute);
+
+ // Add ticks
+ foreach (Tick tick in ticks)
+ {
+ provider.Add(tick);
+ }
+
+ // Verify results
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ results.Should().HaveCount(2);
+
+ // First 1-minute bar (10:00)
+ IQuote bar1 = results[0];
+ bar1.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture));
+ bar1.Open.Should().Be(100.00m); // First tick price
+ bar1.High.Should().Be(100.50m); // Max tick price in period
+ bar1.Low.Should().Be(99.75m); // Min tick price in period
+ bar1.Close.Should().Be(100.25m); // Last tick price in period
+ bar1.Volume.Should().Be(70m); // Sum of tick volumes (10+15+20+25)
+
+ // Second 1-minute bar (10:01)
+ IQuote bar2 = results[1];
+ bar2.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:01", invariantCulture));
+ bar2.Open.Should().Be(101.00m);
+ bar2.High.Should().Be(101.00m);
+ bar2.Low.Should().Be(101.00m);
+ bar2.Close.Should().Be(101.00m);
+ bar2.Volume.Should().Be(30m);
+
+ // Cleanup
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void GapFilling_CarriesForwardPrices()
+ {
+ List<Tick> ticks =
+ [
+ new(DateTime.Parse("2023-11-09 10:00:00", invariantCulture), 100.00m, 10m),
+ // Gap: 10:01 missing - will be filled
+ new(DateTime.Parse("2023-11-09 10:02:00", invariantCulture), 102.00m, 20m),
+ ];
+
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(
+ PeriodSize.OneMinute,
+ fillGaps: true);
+
+ foreach (Tick tick in ticks)
+ {
+ provider.Add(tick);
+ }
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ // Should have 3 bars: 10:00, 10:01 (gap-filled), 10:02
+ results.Should().HaveCount(3);
+
+ // Verify gap-filled bar at 10:01
+ IQuote gapBar = results[1];
+ gapBar.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:01", invariantCulture));
+ gapBar.Open.Should().Be(100.00m); // Carried forward from 10:00 close
+ gapBar.High.Should().Be(100.00m);
+ gapBar.Low.Should().Be(100.00m);
+ gapBar.Close.Should().Be(100.00m);
+ gapBar.Volume.Should().Be(0m);
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void NoGapFilling_SkipsMissingPeriods()
+ {
+ List<Tick> ticks =
+ [
+ new(DateTime.Parse("2023-11-09 10:00:00", invariantCulture), 100.00m, 10m),
+ // Gap: 10:01, 10:02 missing
+ new(DateTime.Parse("2023-11-09 10:03:00", invariantCulture), 103.00m, 30m),
+ ];
+
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(
+ PeriodSize.OneMinute,
+ fillGaps: false);
+
+ foreach (Tick tick in ticks)
+ {
+ provider.Add(tick);
+ }
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ // Should have only 2 bars (no gap filling)
+ results.Should().HaveCount(2);
+
+ results[0].Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture));
+ results[1].Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:03", invariantCulture));
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void ChainWithDownstreamIndicator_WorksCorrectly()
+ {
+ List<Tick> ticks =
+ [
+ new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100m, 10m),
+ new(DateTime.Parse("2023-11-09 10:01", invariantCulture), 101m, 11m),
+ new(DateTime.Parse("2023-11-09 10:02", invariantCulture), 102m, 12m),
+ new(DateTime.Parse("2023-11-09 10:03", invariantCulture), 103m, 13m),
+ new(DateTime.Parse("2023-11-09 10:04", invariantCulture), 104m, 14m),
+ ];
+
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute);
+
+ // Chain with SMA
+ SmaHub sma = aggregator.ToSmaHub(3);
+
+ foreach (Tick tick in ticks)
+ {
+ provider.Add(tick);
+ }
+
+ // Verify aggregated quotes
+ IReadOnlyList<IQuote> aggResults = aggregator.Results;
+ aggResults.Should().HaveCount(5);
+
+ // Verify SMA results
+ IReadOnlyList<SmaResult> smaResults = sma.Results;
+ smaResults.Should().HaveCount(5);
+
+ // First two should be null (not enough data)
+ smaResults[0].Sma.Should().BeNull();
+ smaResults[1].Sma.Should().BeNull();
+
+ // Third should be average of first three closes
+ smaResults[2].Sma.Should().NotBeNull();
+ const double expectedSma = (100 + 101 + 102) / 3.0;
+ smaResults[2].Sma.Should().BeApproximately(expectedSma, Money4);
+
+ sma.Unsubscribe();
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void QuoteObserver_WithWarmupLateArrivalAndRemoval_MatchesSeriesExactly()
+ {
+ // Setup tick provider hub
+ TickHub provider = new();
+
+ // Create some tick data
+ List<Tick> ticks = [];
+ for (int i = 0; i < 100; i++)
+ {
+ ticks.Add(new Tick(
+ DateTime.Parse("2023-11-09 10:00", invariantCulture).AddMinutes(i),
+ 100m + i,
+ 10m + i));
+ }
+
+ // Prefill warmup window
+ provider.Add(ticks.Take(20));
+
+ // Initialize aggregator
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute);
+
+ // Fetch initial results
+ IReadOnlyList<IQuote> sut = aggregator.Results.ToList();
+
+ // Stream remaining ticks in-order including duplicates
+ for (int i = 20; i < ticks.Count; i++)
+ {
+ // Skip one (add later as late arrival)
+ if (i == 80) { continue; }
+
+ // Skip removal index
+ if (i == 50) { continue; }
+
+ Tick tick = ticks[i];
+ provider.Add(tick);
+
+ // Stream duplicate at same timestamp with different execution ID
+ if (i is > 30 and < 35)
+ {
+ provider.Add(new Tick(
+ tick.Timestamp,
+ tick.Price + 0.5m,
+ tick.Volume + 1m,
+ $"dup-{i}"));
+ }
+ }
+
+ // Late historical insert (earlier timestamp than current tail)
+ provider.Insert(ticks[80]);
+ IReadOnlyList<IQuote> afterLateArrival = aggregator.Results.ToList();
+
+ // Insert the previously skipped tick (simulates a removal/rollback rebuild)
+ provider.Insert(ticks[50]);
+ IReadOnlyList<IQuote> afterRemoval = aggregator.Results.ToList();
+
+ // Verify structural invariants at all stages
+ sut.Should().AllSatisfy(q => {
+ q.Timestamp.Should().NotBe(default);
+ q.Open.Should().BeGreaterThan(0);
+ q.High.Should().BeGreaterThanOrEqualTo(q.Low);
+ q.Close.Should().BeGreaterThan(0);
+ });
+
+ afterLateArrival.Should().AllSatisfy(q => {
+ q.Timestamp.Should().NotBe(default);
+ q.Open.Should().BeGreaterThan(0);
+ q.High.Should().BeGreaterThanOrEqualTo(q.Low);
+ q.Close.Should().BeGreaterThan(0);
+ });
+
+ afterRemoval.Should().AllSatisfy(q => {
+ q.Timestamp.Should().NotBe(default);
+ q.Open.Should().BeGreaterThan(0);
+ q.High.Should().BeGreaterThanOrEqualTo(q.Low);
+ q.Close.Should().BeGreaterThan(0);
+ });
+
+ // Verify ordering is strictly preserved
+ for (int i = 1; i < sut.Count; i++)
+ {
+ sut[i].Timestamp.Should().BeGreaterThanOrEqualTo(sut[i - 1].Timestamp);
+ }
+
+ for (int i = 1; i < afterLateArrival.Count; i++)
+ {
+ afterLateArrival[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterLateArrival[i - 1].Timestamp);
+ }
+
+ for (int i = 1; i < afterRemoval.Count; i++)
+ {
+ afterRemoval[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterRemoval[i - 1].Timestamp);
+ }
+
+ // Cleanup
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void ChainProvider_MatchesSeriesExactly()
+ {
+ const int emaPeriods = 14;
+
+ // Setup tick provider hub
+ TickHub provider = new();
+
+ // Create tick data
+ List<Tick> ticks = [];
+ for (int i = 0; i < 100; i++)
+ {
+ Tick tick = new(
+ DateTime.Parse("2023-11-09 10:00", invariantCulture).AddMinutes(i),
+ 100m + i,
+ 10m + i);
+ ticks.Add(tick);
+ provider.Add(tick);
+ }
+
+ // Initialize aggregator and chain with EMA
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute);
+ EmaHub observer = aggregator.ToEmaHub(emaPeriods);
+
+ // Final results
+ IReadOnlyList<EmaResult> sut = observer.Results;
+
+ // Should have results
+ sut.Should().HaveCount(100);
+
+ // Verify some EMA values are calculated
+ sut.Skip(emaPeriods).Should().AllSatisfy(r => r.Ema.Should().NotBeNull());
+
+ // Build reference EMA Series from same tick sequence
+ // First aggregate ticks to quotes
+ List<Quote> quoteSequence = [];
+ foreach (Tick tick in ticks)
+ {
+ DateTime barTimestamp = tick.Timestamp.RoundDown(TimeSpan.FromMinutes(1));
+ Quote? q = quoteSequence.FirstOrDefault(x => x.Timestamp == barTimestamp);
+ if (q == null)
+ {
+ q = new Quote(barTimestamp, tick.Price, tick.Price, tick.Price, tick.Price, tick.Volume);
+ quoteSequence.Add(q);
+ }
+ else
+ {
+ // Update existing bar
+ q = new Quote(
+ q.Timestamp,
+ q.Open,
+ Math.Max(q.High, tick.Price),
+ Math.Min(q.Low, tick.Price),
+ tick.Price,
+ q.Volume + tick.Volume);
+ quoteSequence[quoteSequence.Count - 1] = q;
+ }
+ }
+
+ // Calculate reference EMA on aggregated quotes
+ IReadOnlyList<EmaResult> referenceEma = quoteSequence.ToEma(emaPeriods);
+
+ // Compare observer results to reference EMA with strict ordering and equality
+ sut.Should().HaveSameCount(referenceEma);
+ for (int i = 0; i < sut.Count; i++)
+ {
+ // Verify timestamp and ordering
+ sut[i].Timestamp.Should().Be(referenceEma[i].Timestamp);
+ if (i > 0)
+ {
+ sut[i].Timestamp.Should().BeGreaterThanOrEqualTo(sut[i - 1].Timestamp);
+ }
+
+ // Verify EMA values match
+ if (referenceEma[i].Ema.HasValue)
+ {
+ sut[i].Ema.Should().BeApproximately(referenceEma[i].Ema.Value, Money4,
+ $"EMA at index {i} should match reference series");
+ }
+ else
+ {
+ sut[i].Ema.Should().BeNull($"EMA at index {i} should be null like reference");
+ }
+ }
+
+ // Cleanup
+ observer.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void Properties_AreSetCorrectly()
+ {
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(
+ PeriodSize.FifteenMinutes,
+ fillGaps: true);
+
+ aggregator.FillGaps.Should().BeTrue();
+ aggregator.AggregationPeriod.Should().Be(TimeSpan.FromMinutes(15));
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void DuplicateExecutionId_CorrectionRebuildsBar()
+ {
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute);
+
+ // Add tick with execution ID
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 100m, 10m, "EXEC-001"));
+
+ // Add correction with same execution ID and timestamp
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 200m, 20m, "EXEC-001"));
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ results.Should().HaveCount(1);
+
+ // Should reflect corrected tick
+ IQuote bar = results[0];
+ bar.Open.Should().Be(200m);
+ bar.High.Should().Be(200m);
+ bar.Low.Should().Be(200m);
+ bar.Close.Should().Be(200m);
+ bar.Volume.Should().Be(20m);
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void TicksWithoutExecutionId_DuplicatesAllowed()
+ {
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute);
+
+ // Add ticks without execution IDs (same timestamp is allowed)
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 100m, 10m, null));
+
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 105m, 15m, null));
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ results.Should().HaveCount(1);
+
+ // Both ticks should be incorporated
+ IQuote bar = results[0];
+ bar.Open.Should().Be(100m); // First tick price
+ bar.High.Should().Be(105m);
+ bar.Low.Should().Be(100m);
+ bar.Close.Should().Be(105m); // Last tick price
+ bar.Volume.Should().Be(25m); // Sum of both volumes
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+}
diff --git a/tests/indicators/_common/Quotes/Tick.StreamHub.Tests.cs b/tests/indicators/_common/Quotes/Tick.StreamHub.Tests.cs
new file mode 100644
index 000000000..2757365ae
--- /dev/null
+++ b/tests/indicators/_common/Quotes/Tick.StreamHub.Tests.cs
@@ -0,0 +1,361 @@
+namespace StreamHubs;
+
+[TestClass]
+public class TickStreamHubTests : StreamHubTestBase, ITestTickObserver
+{
+ [TestMethod]
+ public override void ToStringOverride_ReturnsExpectedName()
+ {
+ TickHub hub = new();
+ string result = hub.ToString();
+
+ result.Should().Contain("TICKS");
+ result.Should().Contain("0 items");
+
+ hub.EndTransmission();
+ }
+
+ [TestMethod]
+ public void StandaloneInitialization_WorksCorrectly()
+ {
+ TickHub hub = new();
+
+ hub.Should().NotBeNull();
+ hub.MaxCacheSize.Should().BeGreaterThan(0);
+ hub.Cache.Should().BeEmpty();
+
+ hub.EndTransmission();
+ }
+
+ [TestMethod]
+ public void ProviderBackedInitialization_WorksCorrectly()
+ {
+ TickHub provider = new();
+ TickHub hub = new(provider);
+
+ hub.Should().NotBeNull();
+ hub.Cache.Should().BeEmpty();
+
+ hub.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void SameTimestamp_WithoutExecutionId_ReplacesAndNotifiesAsAddition()
+ {
+ TickHub hub = new();
+ int addCount = 0;
+
+ TestTickObserver observer = new();
+ observer.OnAddAction = (tick, notify, idx) => addCount++;
+
+ hub.Subscribe(observer);
+
+ // Add first tick without execution ID
+ Tick tick1 = new(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 100m, 10m, null);
+ hub.Add(tick1);
+
+ // Add second tick with same timestamp but different price (no execution ID)
+ Tick tick2 = new(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 105m, 15m, null);
+ hub.Add(tick2);
+
+ // Cache should have only one tick (the latest)
+ hub.Cache.Should().HaveCount(1);
+ hub.Cache[0].Price.Should().Be(105m);
+ hub.Cache[0].Volume.Should().Be(15m);
+
+ // Both ticks should be notified as additions for aggregators to process
+ addCount.Should().Be(2);
+
+ observer.Unsubscribe();
+ hub.EndTransmission();
+ }
+
+ [TestMethod]
+ public void SameTimestamp_WithSameExecutionId_ReplacesAndNotifiesRebuild()
+ {
+ TickHub hub = new();
+ int rebuildCount = 0;
+
+ TestTickObserver observer = new();
+ observer.OnRebuildAction = (ts) => rebuildCount++;
+
+ hub.Subscribe(observer);
+
+ // Add first tick with execution ID
+ Tick tick1 = new(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 100m, 10m, "EXEC-001");
+ hub.Add(tick1);
+
+ // Add updated tick with same execution ID (correction)
+ Tick tick2 = new(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 101m, 11m, "EXEC-001");
+ hub.Add(tick2);
+
+ // Cache should have only one tick (the corrected one)
+ hub.Cache.Should().HaveCount(1);
+ hub.Cache[0].Price.Should().Be(101m);
+ hub.Cache[0].ExecutionId.Should().Be("EXEC-001");
+
+ // Should trigger rebuild for correction
+ rebuildCount.Should().Be(1);
+
+ observer.Unsubscribe();
+ hub.EndTransmission();
+ }
+
+ [TestMethod]
+ public void SameTimestamp_WithDifferentExecutionId_NotifiesAddition()
+ {
+ TickHub hub = new();
+ int addCount = 0;
+
+ TestTickObserver observer = new();
+ observer.OnAddAction = (tick, notify, idx) => addCount++;
+
+ hub.Subscribe(observer);
+
+ // Add first tick with execution ID
+ Tick tick1 = new(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 100m, 10m, "EXEC-001");
+ hub.Add(tick1);
+
+ // Add second tick with same timestamp but different execution ID
+ Tick tick2 = new(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 105m, 15m, "EXEC-002");
+ hub.Add(tick2);
+
+ // Both ticks should be notified to observers (not dropped)
+ addCount.Should().Be(2);
+
+ observer.Unsubscribe();
+ hub.EndTransmission();
+ }
+
+ [TestMethod]
+ public void ExactDuplicate_DefersToAppendCache()
+ {
+ TickHub hub = new();
+ int addCount = 0;
+
+ TestTickObserver observer = new();
+ observer.OnAddAction = (tick, notify, idx) => addCount++;
+
+ hub.Subscribe(observer);
+
+ // Add same tick twice
+ Tick tick = new(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 100m, 10m, "EXEC-001");
+
+ hub.Add(tick);
+ hub.Add(tick);
+
+ // Should use overflow tracking from base class
+ hub.Cache.Should().HaveCount(1);
+ addCount.Should().Be(1); // Only first add notified
+
+ observer.Unsubscribe();
+ hub.EndTransmission();
+ }
+
+ [TestMethod]
+ public void StandaloneRebuild_PreservesCacheAndNotifiesObservers()
+ {
+ TickHub hub = new();
+ int rebuildCount = 0;
+ DateTime? rebuildTimestamp = null;
+
+ TestTickObserver observer = new();
+ observer.OnRebuildAction = (ts) =>
+ {
+ rebuildCount++;
+ rebuildTimestamp = ts;
+ };
+
+ hub.Subscribe(observer);
+
+ // Add ticks
+ hub.Add(new Tick(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100m, 10m));
+ hub.Add(new Tick(DateTime.Parse("2023-11-09 10:01", invariantCulture), 101m, 11m));
+ hub.Add(new Tick(DateTime.Parse("2023-11-09 10:02", invariantCulture), 102m, 12m));
+
+ int initialCount = hub.Cache.Count;
+
+ // Trigger rebuild
+ hub.Rebuild(DateTime.Parse("2023-11-09 10:01", invariantCulture));
+
+ // Cache should still have all ticks (standalone doesn't clear)
+ hub.Cache.Should().HaveCount(initialCount);
+
+ // Observers should be notified
+ rebuildCount.Should().Be(1);
+ rebuildTimestamp.Should().Be(DateTime.Parse("2023-11-09 10:01", invariantCulture));
+
+ observer.Unsubscribe();
+ hub.EndTransmission();
+ }
+
+ [TestMethod]
+ public void ProviderBackedRebuild_UsesStandardBehavior()
+ {
+ TickHub provider = new();
+ TickHub hub = new(provider);
+
+ // Add ticks through provider
+ provider.Add(new Tick(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100m, 10m));
+ provider.Add(new Tick(DateTime.Parse("2023-11-09 10:01", invariantCulture), 101m, 11m));
+ provider.Add(new Tick(DateTime.Parse("2023-11-09 10:02", invariantCulture), 102m, 12m));
+
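+ // provider-backed hub mirrors all ticks received from the provider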
+ hub.Cache.Should().HaveCount(3);
+
+ // Trigger rebuild on hub (not provider)
+ hub.Rebuild(DateTime.Parse("2023-11-09 10:01", invariantCulture));
+
+ // Provider-backed hub should rebuild normally
+ hub.Cache.Should().HaveCountGreaterOrEqualTo(2);
+
+ hub.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void MultipleTicksSameTimestamp_AllProcessedByAggregators()
+ {
+ // This test verifies the fix for issue #4 from code review
+ TickHub provider = new();
+ TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute);
+
+ // Add three ticks at same timestamp with different execution IDs
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 100m, 10m, "EXEC-001"));
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 105m, 15m, "EXEC-002"));
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09 10:00:00", invariantCulture),
+ 95m, 20m, "EXEC-003"));
+
+ IReadOnlyList<IQuote> results = aggregator.Results;
+
+ results.Should().HaveCount(1);
+
+ // All three ticks should be incorporated into the bar
+ IQuote bar = results[0];
+ bar.High.Should().Be(105m); // Max of all three
+ bar.Low.Should().Be(95m); // Min of all three
+ bar.Volume.Should().Be(45m); // Sum of all three (10 + 15 + 20)
+
+ aggregator.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ [TestMethod]
+ public void TickObserver_WithWarmupAndMultipleSameTimestamp_WorksCorrectly()
+ {
+ // setup tick provider hub
+ TickHub provider = new();
+
+ // prefill warmup window
+ for (int i = 0; i < 20; i++)
+ {
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(i),
+ 100m + i, 10m + i, $"EXEC-{i:000}"));
+ }
+
+ // initialize observer
+ TickHub observer = new(provider);
+
+ // fetch initial results
+ IReadOnlyList<ITick> results = observer.Results.ToList();
+
+ results.Should().HaveCount(20);
+
+ // stream in-order duplicates (same timestamp different exec IDs)
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(20),
+ 120m, 30m, "EXEC-020"));
+ provider.Add(new Tick(
+ DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(20),
+ 121m, 31m, "EXEC-021"));
+
+ IReadOnlyList<ITick> afterDuplicates = observer.Results.ToList();
+ afterDuplicates.Should().HaveCount(22); // 20 + 2 more ticks with same timestamp
+
+ // insert late historical tick (earlier DateTime than current tail)
+ provider.Insert(new Tick(
+ DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(15),
+ 115m, 25m, "EXEC-015-LATE"));
+
+ IReadOnlyList<ITick> afterLateArrival = observer.Results.ToList();
+
+ // results should be recalculated and maintain ordering
+ afterLateArrival.Should().HaveCount(23);
+ for (int i = 1; i < afterLateArrival.Count; i++)
+ {
+ afterLateArrival[i].Timestamp.Should().BeOnOrAfter(afterLateArrival[i - 1].Timestamp);
+ }
+
+ // insert another out-of-order historical tick (earlier than the current tail)
+ provider.Insert(new Tick(
+ DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(10),
+ 110m, 20m, "EXEC-010"));
+
+ IReadOnlyList<ITick> afterSecondInsert = observer.Results.ToList();
+
+ // results should update after the second out-of-order insertion
+ afterSecondInsert.Should().HaveCount(24);
+
+ // Verify strict ordering is maintained after each mutation
+ for (int i = 1; i < afterSecondInsert.Count; i++)
+ {
+ afterSecondInsert[i].Timestamp.Should().BeOnOrAfter(afterSecondInsert[i - 1].Timestamp);
+ }
+
+ // Verify every final result carries valid values
+ afterSecondInsert.Should().AllSatisfy(t =>
+ {
+ t.Timestamp.Should().NotBe(default);
+ t.Price.Should().BeGreaterThan(0);
+ t.Volume.Should().BeGreaterThan(0);
+ });
+
+ // cleanup
+ observer.Unsubscribe();
+ provider.EndTransmission();
+ }
+
+ /// <summary>
+ /// Test observer helper class for tracking tick notifications.
+ /// </summary>
+ private class TestTickObserver : IStreamObserver<ITick>
+ {
+ public Action<ITick, bool, int?>? OnAddAction { get; set; }
+ public Action<DateTime>? OnRebuildAction { get; set; }
+ public bool IsSubscribed { get; private set; } = true;
+
+ public void OnAdd(ITick item, bool notify, int? indexHint)
+ => OnAddAction?.Invoke(item, notify, indexHint);
+
+ public void OnCompleted() => IsSubscribed = false;
+
+ public void OnRebuild(DateTime timestamp)
+ => OnRebuildAction?.Invoke(timestamp);
+
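+ // remaining observer members are intentional no-ops for these tests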
+ public void OnError(Exception error) { }
+
+ public void OnPrune(DateTime timestamp) { }
+
+ public void Unsubscribe() => IsSubscribed = false;
+ }
+}
diff --git a/tests/indicators/_common/StreamHub/StreamHub.Interface.Tests.cs b/tests/indicators/_common/StreamHub/StreamHub.Interface.Tests.cs
index 1a3e68d1b..157852af3 100644
--- a/tests/indicators/_common/StreamHub/StreamHub.Interface.Tests.cs
+++ b/tests/indicators/_common/StreamHub/StreamHub.Interface.Tests.cs
@@ -32,7 +32,8 @@ public void AllStreamHubTests_ImplementCorrectInterfaces()
// Define observer and provider interface types
Type[] observerTypes = [
typeof(ITestChainObserver),
- typeof(ITestQuoteObserver)
+ typeof(ITestQuoteObserver),
+ typeof(ITestTickObserver)
];
// Add more provider interfaces here if needed