diff --git a/.vscode/settings.json b/.vscode/settings.json index 04d5d0570..3f74a2d4f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,12 @@ { "[markdown]": { "editor.defaultFormatter": "DavidAnson.vscode-markdownlint", + "editor.formatOnSave": true, + "editor.codeActionsOnSave": { + "source.fixAll.markdownlint": "explicit" + } }, + "chat.useAgentSkills": true, "chat.useAgentsMdFile": true, "chat.useNestedAgentsMdFiles": true, "chat.customAgentInSubagent.enabled": true, @@ -12,6 +17,7 @@ "files.associations": { "*.md": "markdown" }, + "github.copilot.chat.customAgents.showOrganizationAndEnterpriseAgents": true, "github.copilot.chat.githubMcpServer.enabled": true, "github.copilot.chat.githubMcpServer.readonly": false, "github.copilot.chat.githubMcpServer.toolsets": [ diff --git a/src/_common/Quotes/ITick.cs b/src/_common/Quotes/ITick.cs new file mode 100644 index 000000000..ef6a10bb3 --- /dev/null +++ b/src/_common/Quotes/ITick.cs @@ -0,0 +1,24 @@ +namespace Skender.Stock.Indicators; + +/// +/// Tick interface for raw market tick data. +/// This represents a single trade or quote event with price and volume at a specific timestamp. +/// +public interface ITick : IReusable +{ + /// + /// Tick price + /// + decimal Price { get; } + + /// + /// Tick volume (quantity traded) + /// + decimal Volume { get; } + + /// + /// Optional unique execution ID for duplicate detection. + /// When null, duplicates are assessed by timestamp only. + /// + string? ExecutionId { get; } +} diff --git a/src/_common/Quotes/Quote.AggregatorHub.cs b/src/_common/Quotes/Quote.AggregatorHub.cs new file mode 100644 index 000000000..b9d66e071 --- /dev/null +++ b/src/_common/Quotes/Quote.AggregatorHub.cs @@ -0,0 +1,301 @@ +namespace Skender.Stock.Indicators; + +/// +/// Streaming hub for aggregating quotes into larger time periods. +/// +public class QuoteAggregatorHub + : QuoteProvider +{ + private readonly object _addLock = new(); + private readonly Dictionary _inputQuoteTracker = []; + private Quote? _currentBar; + private DateTime _currentBarTimestamp; + + /// + /// Initializes a new instance of the class. + /// + /// The quote provider. + /// The period size to aggregate to. + /// Whether to fill gaps by carrying forward the last known price. + public QuoteAggregatorHub( + IQuoteProvider provider, + PeriodSize periodSize, + bool fillGaps = false) + : base(provider) + { + if (periodSize == PeriodSize.Month) + { + throw new ArgumentException( + $"Month aggregation is not supported in streaming mode. periodSize={periodSize}. Use TimeSpan overload for custom periods.", + nameof(periodSize)); + } + + AggregationPeriod = periodSize.ToTimeSpan(); + FillGaps = fillGaps; + Name = $"QUOTE-AGG({periodSize})"; + + Reinitialize(); + } + + /// + /// Initializes a new instance of the class. + /// + /// The quote provider. + /// The time span to aggregate to. + /// Whether to fill gaps by carrying forward the last known price. + /// Thrown when the time span is less than or equal to zero. + public QuoteAggregatorHub( + IQuoteProvider provider, + TimeSpan timeSpan, + bool fillGaps = false) + : base(provider) + { + if (timeSpan <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(timeSpan), timeSpan, + "Aggregation period must be greater than zero."); + } + + AggregationPeriod = timeSpan; + FillGaps = fillGaps; + Name = $"QUOTE-AGG({timeSpan})"; + + Reinitialize(); + } + + /// + /// Gets a value indicating whether gap filling is enabled. + /// + public bool FillGaps { get; } + + /// + /// Gets the aggregation period. + /// + public TimeSpan AggregationPeriod { get; } + + /// + public override void OnAdd(IQuote item, bool notify, int? indexHint) + { + ArgumentNullException.ThrowIfNull(item); + + lock (_addLock) + { + DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod); + + // Check if this exact input quote was already processed (duplicate detection) + if (_inputQuoteTracker.TryGetValue(item.Timestamp, out IQuote? previousQuote)) + { + // This is an update to a previously seen quote - need to subtract old values + // and add new values to avoid double-counting + if (previousQuote.Timestamp == item.Timestamp) + { + // Update tracker with new quote + _inputQuoteTracker[item.Timestamp] = item; + + // Rebuild from this bar to recalculate correctly + if (_currentBar != null && barTimestamp == _currentBarTimestamp) + { + Rebuild(barTimestamp); + return; + } + } + } + else + { + // Track this input quote + _inputQuoteTracker[item.Timestamp] = item; + + // Prune old tracker entries (keep last 1000 or within 10x aggregation period) + if (_inputQuoteTracker.Count > 1000) + { + DateTime pruneThreshold = item.Timestamp.Add(-10 * AggregationPeriod); + List toRemove = _inputQuoteTracker + .Where(kvp => kvp.Key < pruneThreshold) + .Select(kvp => kvp.Key) + .ToList(); + + foreach (DateTime key in toRemove) + { + _inputQuoteTracker.Remove(key); + } + } + } + + // Determine if this is for current bar, future bar, or past bar + bool isFutureBar = _currentBar == null || barTimestamp > _currentBarTimestamp; + bool isPastBar = _currentBar != null && barTimestamp < _currentBarTimestamp; + + // Handle late arrival for past bar + if (isPastBar) + { + Rebuild(barTimestamp); + return; + } + + // Handle gap filling if enabled and moving to future bar + if (FillGaps && isFutureBar && _currentBar != null) + { + DateTime lastBarTimestamp = _currentBarTimestamp; + DateTime nextExpectedBarTimestamp = lastBarTimestamp.Add(AggregationPeriod); + + // Fill gaps between last bar and current bar + while (nextExpectedBarTimestamp < barTimestamp) + { + // Create a gap-fill bar with carried-forward prices + Quote gapBar = new( + Timestamp: nextExpectedBarTimestamp, + Open: _currentBar.Close, + High: _currentBar.Close, + Low: _currentBar.Close, + Close: _currentBar.Close, + Volume: 0m); + + // Add gap bar using base class logic + (IQuote gapResult, _) = ToIndicator(gapBar, null); + AppendCache(gapResult, notify); + + // Update current bar to the gap bar + _currentBar = gapBar; + _currentBarTimestamp = nextExpectedBarTimestamp; + + nextExpectedBarTimestamp = nextExpectedBarTimestamp.Add(AggregationPeriod); + } + } + + // Handle new bar or update to current bar + if (isFutureBar) + { + // Start a new bar + _currentBar = CreateOrUpdateBar(null, barTimestamp, item); + _currentBarTimestamp = barTimestamp; + + // Use base class to add the new bar + (IQuote result, _) = ToIndicator(_currentBar, indexHint); + AppendCache(result, notify); + } + else // isCurrentBar + { + // Update existing bar - for quotes with same timestamp, replace + _currentBar = CreateOrUpdateBar(_currentBar, barTimestamp, item); + + // Replace the last item in cache with updated bar + int index = Cache.Count - 1; + if (index >= 0) + { + Cache[index] = _currentBar; + + // Notify observers of the update + if (notify) + { + NotifyObserversOnRebuild(_currentBar.Timestamp); + } + } + } + } + } + + /// + /// Creates a new bar or updates an existing bar with quote data. + /// + /// Existing bar to update, or null to create new. + /// Timestamp for the bar. + /// Quote data to incorporate. + /// Updated or new Quote bar. + private static Quote CreateOrUpdateBar(Quote? existingBar, DateTime barTimestamp, IQuote quote) + { + if (existingBar == null) + { + // Create new bar from quote + return new Quote( + Timestamp: barTimestamp, + Open: quote.Open, + High: quote.High, + Low: quote.Low, + Close: quote.Close, + Volume: quote.Volume); + } + else + { + // Update existing bar + return new Quote( + Timestamp: barTimestamp, + Open: existingBar.Open, // Keep original open + High: Math.Max(existingBar.High, quote.High), + Low: Math.Min(existingBar.Low, quote.Low), + Close: quote.Close, // Always use latest close + Volume: existingBar.Volume + quote.Volume); + } + } + + /// + protected override (IQuote result, int index) + ToIndicator(IQuote item, int? indexHint) + { + ArgumentNullException.ThrowIfNull(item); + + DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod); + + int index = indexHint ?? Cache.IndexGte(barTimestamp); + + if (index == -1) + { + index = Cache.Count; + } + + return (item, index); + } + + /// + public override string ToString() + => $"QUOTE-AGG<{AggregationPeriod}>: {Cache.Count} items"; + + /// + protected override void RollbackState(DateTime timestamp) + { + lock (_addLock) + { + _currentBar = null; + _currentBarTimestamp = default; + + // Clear input tracker for rolled back period + List toRemove = _inputQuoteTracker + .Where(kvp => kvp.Key >= timestamp) + .Select(kvp => kvp.Key) + .ToList(); + + foreach (DateTime key in toRemove) + { + _inputQuoteTracker.Remove(key); + } + } + } +} + +public static partial class Quotes +{ + /// + /// Creates a QuoteAggregatorHub that aggregates quotes from the provider into larger time periods. + /// + /// The quote provider to aggregate. + /// The period size to aggregate to. + /// Whether to fill gaps by carrying forward the last known price. + /// A new instance of QuoteAggregatorHub. + public static QuoteAggregatorHub ToQuoteAggregatorHub( + this IQuoteProvider quoteProvider, + PeriodSize periodSize, + bool fillGaps = false) + => new(quoteProvider, periodSize, fillGaps); + + /// + /// Creates a QuoteAggregatorHub that aggregates quotes from the provider into larger time periods. + /// + /// The quote provider to aggregate. + /// The time span to aggregate to. + /// Whether to fill gaps by carrying forward the last known price. + /// A new instance of QuoteAggregatorHub. + public static QuoteAggregatorHub ToQuoteAggregatorHub( + this IQuoteProvider quoteProvider, + TimeSpan timeSpan, + bool fillGaps = false) + => new(quoteProvider, timeSpan, fillGaps); +} diff --git a/src/_common/Quotes/Tick.AggregatorHub.cs b/src/_common/Quotes/Tick.AggregatorHub.cs new file mode 100644 index 000000000..17f313bf3 --- /dev/null +++ b/src/_common/Quotes/Tick.AggregatorHub.cs @@ -0,0 +1,295 @@ +namespace Skender.Stock.Indicators; + +/// +/// Streaming hub for aggregating raw tick data into OHLCV quote bars. +/// +public class TickAggregatorHub + : QuoteProvider +{ + private readonly object _addLock = new(); + private readonly Dictionary _processedExecutionIds = []; + private Quote? _currentBar; + private DateTime _currentBarTimestamp; + private const int MaxExecutionIdCacheSize = 10000; + private readonly TimeSpan _executionIdRetentionPeriod; + + /// + /// Initializes a new instance of the class. + /// + /// The tick data provider. + /// The period size to aggregate to. + /// Whether to fill gaps by carrying forward the last known price. + public TickAggregatorHub( + IStreamObservable provider, + PeriodSize periodSize, + bool fillGaps = false) + : base(provider) + { + if (periodSize == PeriodSize.Month) + { + throw new ArgumentException( + $"Month aggregation is not supported in streaming mode. periodSize={periodSize}. Use TimeSpan overload for custom periods.", + nameof(periodSize)); + } + + AggregationPeriod = periodSize.ToTimeSpan(); + FillGaps = fillGaps; + Name = $"TICK-AGG({periodSize})"; + + // Keep execution IDs for 100x the aggregation period or at least 1 hour + _executionIdRetentionPeriod = TimeSpan.FromTicks(Math.Max( + AggregationPeriod.Ticks * 100, + TimeSpan.FromHours(1).Ticks)); + + Reinitialize(); + } + + /// + /// Initializes a new instance of the class. + /// + /// The tick data provider. + /// The time span to aggregate to. + /// Whether to fill gaps by carrying forward the last known price. + /// Thrown when the time span is less than or equal to zero. + public TickAggregatorHub( + IStreamObservable provider, + TimeSpan timeSpan, + bool fillGaps = false) + : base(provider) + { + if (timeSpan <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(timeSpan), timeSpan, + "Aggregation period must be greater than zero."); + } + + AggregationPeriod = timeSpan; + FillGaps = fillGaps; + Name = $"TICK-AGG({timeSpan})"; + + // Keep execution IDs for 100x the aggregation period or at least 1 hour + _executionIdRetentionPeriod = TimeSpan.FromTicks(Math.Max( + timeSpan.Ticks * 100, + TimeSpan.FromHours(1).Ticks)); + + Reinitialize(); + } + + /// + /// Gets a value indicating whether gap filling is enabled. + /// + public bool FillGaps { get; } + + /// + /// Gets the aggregation period. + /// + public TimeSpan AggregationPeriod { get; } + + /// + public override void OnAdd(ITick item, bool notify, int? indexHint) + { + ArgumentNullException.ThrowIfNull(item); + + lock (_addLock) + { + // Check for duplicate execution IDs with time-based pruning + if (!string.IsNullOrEmpty(item.ExecutionId)) + { + if (_processedExecutionIds.TryGetValue(item.ExecutionId, out DateTime processedTime)) + { + // Skip duplicate tick + return; + } + + // Add execution ID with timestamp + _processedExecutionIds[item.ExecutionId] = item.Timestamp; + + // Prune old execution IDs (by time or size) + if (_processedExecutionIds.Count > (MaxExecutionIdCacheSize / 2)) + { + DateTime pruneThreshold = item.Timestamp.Add(-_executionIdRetentionPeriod); + List toRemove = _processedExecutionIds + .Where(kvp => kvp.Value < pruneThreshold) + .Select(kvp => kvp.Key) + .ToList(); + + foreach (string key in toRemove) + { + _processedExecutionIds.Remove(key); + } + } + + // Hard limit: if still too large after pruning, remove oldest entries + if (_processedExecutionIds.Count > MaxExecutionIdCacheSize) + { + List toRemove = _processedExecutionIds + .OrderBy(kvp => kvp.Value) + .Take(_processedExecutionIds.Count - (MaxExecutionIdCacheSize / 2)) + .Select(kvp => kvp.Key) + .ToList(); + + foreach (string key in toRemove) + { + _processedExecutionIds.Remove(key); + } + } + } + + DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod); + + // Determine if this is for current bar, future bar, or past bar + bool isFutureBar = _currentBar == null || barTimestamp > _currentBarTimestamp; + bool isPastBar = _currentBar != null && barTimestamp < _currentBarTimestamp; + + // Handle late arrival for past bar + if (isPastBar) + { + Rebuild(barTimestamp); + return; + } + + // Handle gap filling if enabled and moving to future bar + if (FillGaps && isFutureBar && _currentBar != null) + { + DateTime lastBarTimestamp = _currentBarTimestamp; + DateTime nextExpectedBarTimestamp = lastBarTimestamp.Add(AggregationPeriod); + + // Fill gaps between last bar and current bar + while (nextExpectedBarTimestamp < barTimestamp) + { + // Create a gap-fill bar with carried-forward prices + Quote gapBar = new( + Timestamp: nextExpectedBarTimestamp, + Open: _currentBar.Close, + High: _currentBar.Close, + Low: _currentBar.Close, + Close: _currentBar.Close, + Volume: 0m); + + // Add gap bar directly to cache + AppendCache(gapBar, notify); + + // Update current bar to the gap bar + _currentBar = gapBar; + _currentBarTimestamp = nextExpectedBarTimestamp; + + nextExpectedBarTimestamp = nextExpectedBarTimestamp.Add(AggregationPeriod); + } + } + + // Handle new bar or update to current bar + if (isFutureBar) + { + // Start a new bar from the tick + _currentBar = CreateOrUpdateBar(null, barTimestamp, item); + _currentBarTimestamp = barTimestamp; + + // Add the new bar directly to cache + AppendCache(_currentBar, notify); + } + else // isCurrentBar + { + // Update existing bar with new tick data + _currentBar = CreateOrUpdateBar(_currentBar, barTimestamp, item); + + // Replace the last item in cache with updated bar + int index = Cache.Count - 1; + if (index >= 0) + { + Cache[index] = _currentBar; + + // Notify observers of the update + if (notify) + { + NotifyObserversOnRebuild(_currentBar.Timestamp); + } + } + } + } + } + + /// + /// Creates a new bar or updates an existing bar with tick data. + /// + /// Existing bar to update, or null to create new. + /// Timestamp for the bar. + /// Tick data to incorporate. + /// Updated or new Quote bar. + private static Quote CreateOrUpdateBar(Quote? existingBar, DateTime barTimestamp, ITick tick) + { + if (existingBar == null) + { + // Create new bar from tick + return new Quote( + Timestamp: barTimestamp, + Open: tick.Price, + High: tick.Price, + Low: tick.Price, + Close: tick.Price, + Volume: tick.Volume); + } + else + { + // Update existing bar + return new Quote( + Timestamp: barTimestamp, + Open: existingBar.Open, // Keep original open + High: Math.Max(existingBar.High, tick.Price), + Low: Math.Min(existingBar.Low, tick.Price), + Close: tick.Price, // Always use latest price + Volume: existingBar.Volume + tick.Volume); + } + } + + /// + protected override (IQuote result, int index) + ToIndicator(ITick item, int? indexHint) + { + ArgumentNullException.ThrowIfNull(item); + + DateTime barTimestamp = item.Timestamp.RoundDown(AggregationPeriod); + + int index = indexHint ?? Cache.IndexGte(barTimestamp); + + if (index == -1) + { + index = Cache.Count; + } + + // Convert tick to a single-price bar + Quote bar = new( + Timestamp: barTimestamp, + Open: item.Price, + High: item.Price, + Low: item.Price, + Close: item.Price, + Volume: item.Volume); + + return (bar, index); + } + + /// + public override string ToString() + => $"TICK-AGG<{AggregationPeriod}>: {Cache.Count} items"; + + /// + protected override void RollbackState(DateTime timestamp) + { + lock (_addLock) + { + _currentBar = null; + _currentBarTimestamp = default; + + // Clear execution IDs for rolled back period + List toRemove = _processedExecutionIds + .Where(kvp => kvp.Value >= timestamp) + .Select(kvp => kvp.Key) + .ToList(); + + foreach (string key in toRemove) + { + _processedExecutionIds.Remove(key); + } + } + } +} diff --git a/src/_common/Quotes/Tick.StreamHub.cs b/src/_common/Quotes/Tick.StreamHub.cs new file mode 100644 index 000000000..d55b93f5f --- /dev/null +++ b/src/_common/Quotes/Tick.StreamHub.cs @@ -0,0 +1,203 @@ +namespace Skender.Stock.Indicators; + +/// +/// Streaming hub for managing raw tick data. +/// +public class TickHub + : StreamHub, IStreamObservable +{ + /// + /// Indicates whether this TickHub is standalone (no external provider). + /// + private readonly bool _isStandalone; + + /// + /// Initializes a new instance of the class without its own provider. + /// + /// Maximum in-memory cache size. + public TickHub(int? maxCacheSize = null) + : base(new BaseProvider()) + { + _isStandalone = true; + + const int maxCacheSizeDefault = (int)(0.9 * int.MaxValue); + + if (maxCacheSize is not null and > maxCacheSizeDefault) + { + string message + = $"'{nameof(maxCacheSize)}' must be less than {maxCacheSizeDefault}."; + + throw new ArgumentOutOfRangeException( + nameof(maxCacheSize), maxCacheSize, message); + } + + MaxCacheSize = maxCacheSize ?? maxCacheSizeDefault; + Name = "TICK-HUB"; + } + + /// + /// Initializes a new instance of the class with a specified provider. + /// + /// The tick provider. + public TickHub( + IStreamObservable provider) + : base(provider ?? throw new ArgumentNullException(nameof(provider))) + { + ArgumentNullException.ThrowIfNull(provider); + + _isStandalone = false; + Name = "TICK-HUB"; + Reinitialize(); + } + + /// + protected override (ITick result, int index) + ToIndicator(ITick item, int? indexHint) + { + ArgumentNullException.ThrowIfNull(item); + + int index = indexHint + ?? Cache.IndexGte(item.Timestamp); + + return (item, index == -1 ? Cache.Count : index); + } + + /// + public override string ToString() + => $"TICKS: {Cache.Count} items"; + + /// + /// Handles adding a new tick with special handling for same-timestamp updates + /// when TickHub is standalone (no external provider). + /// + /// + public override void OnAdd(ITick item, bool notify, int? indexHint) + { + // for non-standalone TickHub, use standard behavior + if (!_isStandalone) + { + base.OnAdd(item, notify, indexHint); + return; + } + + // get result and position + (ITick result, int index) = ToIndicator(item, indexHint); + + // check if this is a same-timestamp update (not a new item at the end) + if (Cache.Count > 0 && index < Cache.Count && Cache[index].Timestamp == result.Timestamp) + { + // check if this is an exact duplicate (same values) + // if so, defer to AppendCache for overflow tracking + if (Cache[index].Equals(result)) + { + AppendCache(result, notify); + return; + } + + // For ticks with different execution IDs but same timestamp, + // we need to store them properly (not drop them) + bool hasExecutionId = !string.IsNullOrEmpty(result.ExecutionId); + bool hasCachedExecutionId = !string.IsNullOrEmpty(Cache[index].ExecutionId); + + if (hasExecutionId && hasCachedExecutionId && Cache[index].ExecutionId != result.ExecutionId) + { + // Different execution IDs at same timestamp - both are valid trades + // Persist to cache before notifying observers + Cache[index] = result; + + // Notify observers with the new tick so aggregators can process it + if (notify) + { + NotifyObserversOnAdd(result, index); + } + + return; + } + + // For ticks without execution IDs or same execution ID, replace in cache + Cache[index] = result; + + // Notify appropriately based on whether it's an update or new execution + if (notify) + { + if (hasExecutionId && hasCachedExecutionId && Cache[index].ExecutionId == result.ExecutionId) + { + // Same execution ID - this is an update/correction + NotifyObserversOnRebuild(result.Timestamp); + } + else + { + // No execution IDs - notify as addition so aggregators can process + NotifyObserversOnAdd(result, index); + } + } + + return; + } + + // standard add behavior for new items + AppendCache(result, notify); + } + + /// + /// Rebuilds the cache from a specific timestamp. + /// For standalone TickHub, preserves cache and notifies observers. + /// + /// + public override void Rebuild(DateTime fromTimestamp) + { + // for standalone TickHub (no external provider), + // we cannot rebuild from an empty provider cache + // instead, just notify observers to rebuild from this hub's cache + if (_isStandalone) + { + // rollback internal state + RollbackState(fromTimestamp); + + // notify observers to rebuild from this hub + NotifyObserversOnRebuild(fromTimestamp); + return; + } + + // standard rebuild for TickHub with external provider + base.Rebuild(fromTimestamp); + } +} + +/// +/// Provides extension methods for aggregating tick data streams into OHLCV quote bars using a . +/// +/// +/// The Ticks class offers static methods to facilitate the transformation of tick data into aggregated +/// quote bars, supporting both fixed period sizes and custom time spans. These methods enable seamless integration with +/// tick data providers and allow optional gap filling to maintain continuity in price data. All members are static and +/// intended for use as extension methods on instances. +/// +public static class Ticks +{ + /// + /// Creates a TickAggregatorHub that aggregates ticks from the provider into OHLCV quote bars. + /// + /// The tick provider to aggregate. + /// The period size to aggregate to. + /// Whether to fill gaps by carrying forward the last known price. + /// A new instance of TickAggregatorHub. + public static TickAggregatorHub ToTickAggregatorHub( + this IStreamObservable tickProvider, + PeriodSize periodSize, + bool fillGaps = false) + => new(tickProvider, periodSize, fillGaps); + + /// + /// Creates a TickAggregatorHub that aggregates ticks from the provider into OHLCV quote bars. + /// + /// The tick provider to aggregate. + /// The time span to aggregate to. + /// Whether to fill gaps by carrying forward the last known price. + /// A new instance of TickAggregatorHub. + public static TickAggregatorHub ToTickAggregatorHub( + this IStreamObservable tickProvider, + TimeSpan timeSpan, + bool fillGaps = false) + => new(tickProvider, timeSpan, fillGaps); +} diff --git a/src/_common/Quotes/Tick.cs b/src/_common/Quotes/Tick.cs new file mode 100644 index 000000000..83782b0b1 --- /dev/null +++ b/src/_common/Quotes/Tick.cs @@ -0,0 +1,36 @@ +namespace Skender.Stock.Indicators; + +/// +/// Built-in Tick type, representing a raw market tick data point. +/// +/// +/// Date/time of the tick +/// +/// +/// Tick price +/// +/// +/// Tick volume (quantity traded) +/// +/// +/// Optional unique execution ID for duplicate detection +/// +[Serializable] +public record Tick +( + DateTime Timestamp, + decimal Price, + decimal Volume, + string? ExecutionId = null +) : ITick +{ + /// + [JsonIgnore] + public double Value => (double)Price; + + /// + /// Initializes a new instance of the class. + /// + public Tick() + : this(default, default, default, null) { } +} diff --git a/src/_common/StreamHub/StreamHub.Observable.cs b/src/_common/StreamHub/StreamHub.Observable.cs index 47a8bb135..0b5eb3157 100644 --- a/src/_common/StreamHub/StreamHub.Observable.cs +++ b/src/_common/StreamHub/StreamHub.Observable.cs @@ -81,7 +81,7 @@ private sealed class Unsubscriber( /// /// TSeries item to send. /// Provider index hint. - private void NotifyObserversOnAdd(TOut item, int? indexHint) + protected void NotifyObserversOnAdd(TOut item, int? indexHint) { if (ObserverCount == 0) { diff --git a/tests/indicators/_base/StreamHubTestBase.cs b/tests/indicators/_base/StreamHubTestBase.cs index b95f310b0..1698db093 100644 --- a/tests/indicators/_base/StreamHubTestBase.cs +++ b/tests/indicators/_base/StreamHubTestBase.cs @@ -24,6 +24,14 @@ public interface ITestQuoteObserver void QuoteObserver_WithWarmupLateArrivalAndRemoval_MatchesSeriesExactly(); } +public interface ITestTickObserver +{ + /// + /// Tests hub compatibility with tick provider + /// + void TickObserver_WithWarmupAndMultipleSameTimestamp_WorksCorrectly(); +} + /// /// Add this to stream chainee indicator tests. /// diff --git a/tests/indicators/_common/Quotes/Quote.AggregatorHub.Tests.cs b/tests/indicators/_common/Quotes/Quote.AggregatorHub.Tests.cs new file mode 100644 index 000000000..77c78a9ec --- /dev/null +++ b/tests/indicators/_common/Quotes/Quote.AggregatorHub.Tests.cs @@ -0,0 +1,518 @@ +namespace StreamHubs; + +[TestClass] +public class QuoteAggregatorHubTests : StreamHubTestBase, ITestQuoteObserver, ITestChainProvider +{ + [TestMethod] + public override void ToStringOverride_ReturnsExpectedName() + { + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes); + + string result = aggregator.ToString(); + + result.Should().Contain("QUOTE-AGG"); + result.Should().Contain("00:05:00"); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void BasicAggregation_OneMinuteToFiveMinute() + { + // Setup: Create minute-level quotes + List minuteQuotes = + [ + new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000), + new(DateTime.Parse("2023-11-09 10:01", invariantCulture), 102, 106, 101, 104, 1100), + new(DateTime.Parse("2023-11-09 10:02", invariantCulture), 104, 107, 103, 105, 1200), + new(DateTime.Parse("2023-11-09 10:03", invariantCulture), 105, 108, 104, 106, 1300), + new(DateTime.Parse("2023-11-09 10:04", invariantCulture), 106, 109, 105, 107, 1400), + new(DateTime.Parse("2023-11-09 10:05", invariantCulture), 107, 110, 106, 108, 1500), + new(DateTime.Parse("2023-11-09 10:06", invariantCulture), 108, 111, 107, 109, 1600), + ]; + + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes); + + // Add quotes + foreach (Quote q in minuteQuotes) + { + provider.Add(q); + } + + // Verify results + IReadOnlyList results = aggregator.Results; + + results.Should().HaveCount(2); + + // First 5-minute bar (10:00-10:04) + IQuote bar1 = results[0]; + bar1.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture)); + bar1.Open.Should().Be(100); + bar1.High.Should().Be(109); // Max of all highs in period + bar1.Low.Should().Be(99); // Min of all lows in period + bar1.Close.Should().Be(107); // Last close in period + bar1.Volume.Should().Be(6000); // Sum of all volumes + + // Second 5-minute bar (10:05-10:06, incomplete) + IQuote bar2 = results[1]; + bar2.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:05", invariantCulture)); + bar2.Open.Should().Be(107); + bar2.High.Should().Be(111); + bar2.Low.Should().Be(106); + bar2.Close.Should().Be(109); + bar2.Volume.Should().Be(3100); + + // Cleanup + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void AggregationWithTimeSpan() + { + List minuteQuotes = + [ + new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000), + new(DateTime.Parse("2023-11-09 10:01", invariantCulture), 102, 106, 101, 104, 1100), + new(DateTime.Parse("2023-11-09 10:02", invariantCulture), 104, 107, 103, 105, 1200), + ]; + + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(TimeSpan.FromMinutes(3)); + + foreach (Quote q in minuteQuotes) + { + provider.Add(q); + } + + IReadOnlyList results = aggregator.Results; + + results.Should().HaveCount(1); + + IQuote bar = results[0]; + bar.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture)); + bar.Open.Should().Be(100); + bar.High.Should().Be(107); + bar.Low.Should().Be(99); + bar.Close.Should().Be(105); + bar.Volume.Should().Be(3300); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void GapFilling_CarriesForwardPrices() + { + List minuteQuotes = + [ + new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000), + new(DateTime.Parse("2023-11-09 10:01", invariantCulture), 102, 106, 101, 104, 1100), + // Gap: 10:02 missing + new(DateTime.Parse("2023-11-09 10:03", invariantCulture), 105, 108, 104, 106, 1300), + ]; + + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub( + PeriodSize.OneMinute, + fillGaps: true); + + foreach (Quote q in minuteQuotes) + { + provider.Add(q); + } + + IReadOnlyList results = aggregator.Results; + + // Should have 4 bars: 10:00, 10:01, 10:02 (gap-filled), 10:03 + results.Should().HaveCount(4); + + // Verify gap-filled bar at 10:02 + IQuote gapBar = results[2]; + gapBar.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:02", invariantCulture)); + gapBar.Open.Should().Be(104); // Carried forward from 10:01 close + gapBar.High.Should().Be(104); + gapBar.Low.Should().Be(104); + gapBar.Close.Should().Be(104); + gapBar.Volume.Should().Be(0); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void GapFilling_MultipleConsecutiveGaps() + { + List minuteQuotes = + [ + new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000), + // Gaps: 10:01, 10:02, 10:03 missing + new(DateTime.Parse("2023-11-09 10:04", invariantCulture), 105, 108, 104, 106, 1300), + ]; + + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub( + PeriodSize.OneMinute, + fillGaps: true); + + foreach (Quote q in minuteQuotes) + { + provider.Add(q); + } + + IReadOnlyList results = aggregator.Results; + + // Should have 5 bars: 10:00, 10:01 (gap), 10:02 (gap), 10:03 (gap), 10:04 + results.Should().HaveCount(5); + + // Verify all gap-filled bars carry forward price of 102 + for (int i = 1; i <= 3; i++) + { + IQuote gapBar = results[i]; + gapBar.Open.Should().Be(102); + gapBar.High.Should().Be(102); + gapBar.Low.Should().Be(102); + gapBar.Close.Should().Be(102); + gapBar.Volume.Should().Be(0); + } + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void NoGapFilling_SkipsMissingPeriods() + { + List minuteQuotes = + [ + new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000), + // Gap: 10:01, 10:02 missing + new(DateTime.Parse("2023-11-09 10:03", invariantCulture), 105, 108, 104, 106, 1300), + ]; + + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub( + PeriodSize.OneMinute, + fillGaps: false); + + foreach (Quote q in minuteQuotes) + { + provider.Add(q); + } + + IReadOnlyList results = aggregator.Results; + + // Should have only 2 bars (no gap filling) + results.Should().HaveCount(2); + + results[0].Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture)); + results[1].Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:03", invariantCulture)); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void SameTimestampUpdates_ReplaceWithinSameBar() + { + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes); + + // Add initial quote + provider.Add(new Quote( + DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000)); + + // Add another quote in the same 5-minute period (should update the bar) + provider.Add(new Quote( + DateTime.Parse("2023-11-09 10:02", invariantCulture), 102, 110, 98, 108, 1500)); + + IReadOnlyList results = aggregator.Results; + + results.Should().HaveCount(1); + + IQuote bar = results[0]; + bar.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture)); + bar.Open.Should().Be(100); // Original open + bar.High.Should().Be(110); // Updated high + bar.Low.Should().Be(98); // Updated low + bar.Close.Should().Be(108); // Latest close + bar.Volume.Should().Be(2500); // Sum of volumes + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void ChainWithDownstreamIndicator_WorksCorrectly() + { + List minuteQuotes = + [ + new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100, 105, 99, 102, 1000), + new(DateTime.Parse("2023-11-09 10:05", invariantCulture), 102, 107, 101, 104, 1100), + new(DateTime.Parse("2023-11-09 10:10", invariantCulture), 104, 109, 103, 106, 1200), + new(DateTime.Parse("2023-11-09 10:15", invariantCulture), 106, 111, 105, 108, 1300), + new(DateTime.Parse("2023-11-09 10:20", invariantCulture), 108, 113, 107, 110, 1400), + ]; + + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes); + + // Chain with SMA + SmaHub sma = aggregator.ToSmaHub(3); + + foreach (Quote q in minuteQuotes) + { + provider.Add(q); + } + + // Verify aggregated quotes + IReadOnlyList aggResults = aggregator.Results; + aggResults.Should().HaveCount(5); + + // Verify SMA results + IReadOnlyList smaResults = sma.Results; + smaResults.Should().HaveCount(5); + + // First two should be null (not enough data) + smaResults[0].Sma.Should().BeNull(); + smaResults[1].Sma.Should().BeNull(); + + // Third should be average of first three closes + smaResults[2].Sma.Should().NotBeNull(); + const double expectedSma = (102 + 104 + 106) / 3.0; + smaResults[2].Sma.Should().BeApproximately(expectedSma, 0.0001); + + sma.Unsubscribe(); + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void InvalidPeriodSize_Month_ThrowsException() + { + QuoteHub provider = new(); + + FluentActions + .Invoking(() => provider.ToQuoteAggregatorHub(PeriodSize.Month)) + .Should() + .ThrowExactly() + .WithMessage("*Month aggregation is not supported*"); + + provider.EndTransmission(); + } + + [TestMethod] + public void InvalidTimeSpan_Zero_ThrowsException() + { + QuoteHub provider = new(); + + FluentActions + .Invoking(() => provider.ToQuoteAggregatorHub(TimeSpan.Zero)) + .Should() + .ThrowExactly() + .WithMessage("*must be greater than zero*"); + + provider.EndTransmission(); + } + + [TestMethod] + public void InvalidTimeSpan_Negative_ThrowsException() + { + QuoteHub provider = new(); + + FluentActions + .Invoking(() => provider.ToQuoteAggregatorHub(TimeSpan.FromMinutes(-5))) + .Should() + .ThrowExactly() + .WithMessage("*must be greater than zero*"); + + provider.EndTransmission(); + } + + [TestMethod] + public void QuoteObserver_WithWarmupLateArrivalAndRemoval_MatchesSeriesExactly() + { + // Setup quote provider hub + QuoteHub provider = new(); + + // Prefill quotes at provider (warmup window) + provider.Add(Quotes.Take(20)); + + // Initialize aggregator (1-minute aggregation, no gaps to keep it simple) + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.OneMinute); + + // Fetch initial results (early) + IReadOnlyList sut = aggregator.Results; + + // Emulate adding quotes to provider hub + for (int i = 20; i < Quotes.Count; i++) + { + // Skip one (add later) + if (i == 80) { continue; } + + // Skip the removal index for now + if (i == removeAtIndex) { continue; } + + Quote q = Quotes[i]; + provider.Add(q); + + // Resend duplicate quotes + if (i is > 100 and < 105) { provider.Add(q); } + } + + // Late arrival + provider.Insert(Quotes[80]); + IReadOnlyList afterLateArrival = aggregator.Results.ToList(); + + // Removal (simulate rollback) + provider.Insert(Quotes[removeAtIndex]); + IReadOnlyList afterRemoval = aggregator.Results.ToList(); + + // Verify structural invariants + sut.Should().NotBeEmpty(); + afterLateArrival.Should().NotBeEmpty(); + afterRemoval.Should().NotBeEmpty(); + + // Verify ordering and consistency + sut.Should().AllSatisfy(q => { + q.Timestamp.Should().NotBe(default); + q.Open.Should().BeGreaterThan(0); + q.High.Should().BeGreaterThanOrEqualTo(q.Low); + q.Close.Should().BeGreaterThan(0); + }); + + afterLateArrival.Should().AllSatisfy(q => { + q.Timestamp.Should().NotBe(default); + q.Open.Should().BeGreaterThan(0); + q.High.Should().BeGreaterThanOrEqualTo(q.Low); + q.Close.Should().BeGreaterThan(0); + }); + + afterRemoval.Should().AllSatisfy(q => { + q.Timestamp.Should().NotBe(default); + q.Open.Should().BeGreaterThan(0); + q.High.Should().BeGreaterThanOrEqualTo(q.Low); + q.Close.Should().BeGreaterThan(0); + }); + + // Verify ordering is preserved + for (int i = 1; i < sut.Count; i++) + { + sut[i].Timestamp.Should().BeGreaterThanOrEqualTo(sut[i - 1].Timestamp); + } + + for (int i = 1; i < afterLateArrival.Count; i++) + { + afterLateArrival[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterLateArrival[i - 1].Timestamp); + } + + for (int i = 1; i < afterRemoval.Count; i++) + { + afterRemoval[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterRemoval[i - 1].Timestamp); + } + + // Cleanup + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void ChainProvider_MatchesSeriesExactly() + { + const int emaPeriods = 14; + + // Setup quote provider hub + QuoteHub provider = new(); + + // Initialize aggregator and chain with EMA + EmaHub observer = provider + .ToQuoteAggregatorHub(PeriodSize.FiveMinutes) + .ToEmaHub(emaPeriods); + + // Emulate quote stream - for aggregator, use simple sequential adding + // (late arrivals and removals don't make sense for time-based aggregation) + foreach (Quote q in Quotes) + { + provider.Add(q); + } + + // Final results + IReadOnlyList sut = observer.Results; + + // Compare with aggregated series + IReadOnlyList aggregatedQuotes = Quotes.Aggregate(PeriodSize.FiveMinutes); + + IReadOnlyList expected = aggregatedQuotes + .ToEma(emaPeriods); + + // Assert: should have same count as batch aggregation + sut.Should().HaveCount(expected.Count); + + // Verify EMA values match closely + // Note: There may be slight differences due to streaming vs batch processing + // but they should be very close for completed bars + for (int i = emaPeriods; i < sut.Count; i++) + { + if (sut[i].Ema.HasValue && expected[i].Ema.HasValue) + { + sut[i].Ema.Should().BeApproximately(expected[i].Ema.Value, 0.1, + $"at index {i}, timestamp {sut[i].Timestamp}"); + } + } + + // Cleanup + observer.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void Properties_AreSetCorrectly() + { + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub( + PeriodSize.FifteenMinutes, + fillGaps: true); + + aggregator.FillGaps.Should().BeTrue(); + aggregator.AggregationPeriod.Should().Be(TimeSpan.FromMinutes(15)); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void SameTimestampQuotes_ReplacesPriorBar() + { + QuoteHub provider = new(); + QuoteAggregatorHub aggregator = provider.ToQuoteAggregatorHub(PeriodSize.FiveMinutes); + + // Add initial quote at 10:00 + provider.Add(new Quote( + DateTime.Parse("2023-11-09 10:00", invariantCulture), + 100m, 110m, 99m, 105m, 1000m)); + + // Add another quote also at 10:00 (within same 5-min period) + provider.Add(new Quote( + DateTime.Parse("2023-11-09 10:01", invariantCulture), + 105m, 115m, 104m, 108m, 1100m)); + + IReadOnlyList results = aggregator.Results; + + results.Should().HaveCount(1); + + // Bar should incorporate both quotes + IQuote bar = results[0]; + bar.Open.Should().Be(100m); // First quote's open + bar.High.Should().Be(115m); // Max of both highs + bar.Low.Should().Be(99m); // Min of both lows + bar.Close.Should().Be(108m); // Last quote's close + bar.Volume.Should().Be(2100m); // Sum of volumes + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } +} + diff --git a/tests/indicators/_common/Quotes/Tick.AggregatorHub.Tests.cs b/tests/indicators/_common/Quotes/Tick.AggregatorHub.Tests.cs new file mode 100644 index 000000000..932e6fcf5 --- /dev/null +++ b/tests/indicators/_common/Quotes/Tick.AggregatorHub.Tests.cs @@ -0,0 +1,452 @@ +namespace StreamHubs; + +[TestClass] +public class TickAggregatorHubTests : StreamHubTestBase, ITestQuoteObserver, ITestChainProvider +{ + [TestMethod] + public override void ToStringOverride_ReturnsExpectedName() + { + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.FiveMinutes); + + string result = aggregator.ToString(); + + result.Should().Contain("TICK-AGG"); + result.Should().Contain("00:05:00"); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void BasicAggregation_TicksToOneMinuteBars() + { + // Setup: Create tick-level data + List ticks = + [ + new(DateTime.Parse("2023-11-09 10:00:00", invariantCulture), 100.00m, 10m), + new(DateTime.Parse("2023-11-09 10:00:15", invariantCulture), 100.50m, 15m), + new(DateTime.Parse("2023-11-09 10:00:30", invariantCulture), 99.75m, 20m), + new(DateTime.Parse("2023-11-09 10:00:45", invariantCulture), 100.25m, 25m), + new(DateTime.Parse("2023-11-09 10:01:10", invariantCulture), 101.00m, 30m), + ]; + + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute); + + // Add ticks + foreach (Tick tick in ticks) + { + provider.Add(tick); + } + + // Verify results + IReadOnlyList results = aggregator.Results; + + results.Should().HaveCount(2); + + // First 1-minute bar (10:00) + IQuote bar1 = results[0]; + bar1.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture)); + bar1.Open.Should().Be(100.00m); // First tick price + bar1.High.Should().Be(100.50m); // Max tick price in period + bar1.Low.Should().Be(99.75m); // Min tick price in period + bar1.Close.Should().Be(100.25m); // Last tick price in period + bar1.Volume.Should().Be(70m); // Sum of tick volumes (10+15+20+25) + + // Second 1-minute bar (10:01) + IQuote bar2 = results[1]; + bar2.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:01", invariantCulture)); + bar2.Open.Should().Be(101.00m); + bar2.High.Should().Be(101.00m); + bar2.Low.Should().Be(101.00m); + bar2.Close.Should().Be(101.00m); + bar2.Volume.Should().Be(30m); + + // Cleanup + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void GapFilling_CarriesForwardPrices() + { + List ticks = + [ + new(DateTime.Parse("2023-11-09 10:00:00", invariantCulture), 100.00m, 10m), + // Gap: 10:01 missing - will be filled + new(DateTime.Parse("2023-11-09 10:02:00", invariantCulture), 102.00m, 20m), + ]; + + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub( + PeriodSize.OneMinute, + fillGaps: true); + + foreach (Tick tick in ticks) + { + provider.Add(tick); + } + + IReadOnlyList results = aggregator.Results; + + // Should have 3 bars: 10:00, 10:01 (gap-filled), 10:02 + results.Should().HaveCount(3); + + // Verify gap-filled bar at 10:01 + IQuote gapBar = results[1]; + gapBar.Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:01", invariantCulture)); + gapBar.Open.Should().Be(100.00m); // Carried forward from 10:00 close + gapBar.High.Should().Be(100.00m); + gapBar.Low.Should().Be(100.00m); + gapBar.Close.Should().Be(100.00m); + gapBar.Volume.Should().Be(0m); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void NoGapFilling_SkipsMissingPeriods() + { + List ticks = + [ + new(DateTime.Parse("2023-11-09 10:00:00", invariantCulture), 100.00m, 10m), + // Gap: 10:01, 10:02 missing + new(DateTime.Parse("2023-11-09 10:03:00", invariantCulture), 103.00m, 30m), + ]; + + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub( + PeriodSize.OneMinute, + fillGaps: false); + + foreach (Tick tick in ticks) + { + provider.Add(tick); + } + + IReadOnlyList results = aggregator.Results; + + // Should have only 2 bars (no gap filling) + results.Should().HaveCount(2); + + results[0].Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:00", invariantCulture)); + results[1].Timestamp.Should().Be(DateTime.Parse("2023-11-09 10:03", invariantCulture)); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void ChainWithDownstreamIndicator_WorksCorrectly() + { + List ticks = + [ + new(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100m, 10m), + new(DateTime.Parse("2023-11-09 10:01", invariantCulture), 101m, 11m), + new(DateTime.Parse("2023-11-09 10:02", invariantCulture), 102m, 12m), + new(DateTime.Parse("2023-11-09 10:03", invariantCulture), 103m, 13m), + new(DateTime.Parse("2023-11-09 10:04", invariantCulture), 104m, 14m), + ]; + + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute); + + // Chain with SMA + SmaHub sma = aggregator.ToSmaHub(3); + + foreach (Tick tick in ticks) + { + provider.Add(tick); + } + + // Verify aggregated quotes + IReadOnlyList aggResults = aggregator.Results; + aggResults.Should().HaveCount(5); + + // Verify SMA results + IReadOnlyList smaResults = sma.Results; + smaResults.Should().HaveCount(5); + + // First two should be null (not enough data) + smaResults[0].Sma.Should().BeNull(); + smaResults[1].Sma.Should().BeNull(); + + // Third should be average of first three closes + smaResults[2].Sma.Should().NotBeNull(); + const double expectedSma = (100 + 101 + 102) / 3.0; + smaResults[2].Sma.Should().BeApproximately(expectedSma, Money4); + + sma.Unsubscribe(); + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void QuoteObserver_WithWarmupLateArrivalAndRemoval_MatchesSeriesExactly() + { + // Setup tick provider hub + TickHub provider = new(); + + // Create some tick data + List ticks = []; + for (int i = 0; i < 100; i++) + { + ticks.Add(new Tick( + DateTime.Parse("2023-11-09 10:00", invariantCulture).AddMinutes(i), + 100m + i, + 10m + i)); + } + + // Prefill warmup window + provider.Add(ticks.Take(20)); + + // Initialize aggregator + TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute); + + // Fetch initial results + IReadOnlyList sut = aggregator.Results.ToList(); + + // Stream remaining ticks in-order including duplicates + for (int i = 20; i < ticks.Count; i++) + { + // Skip one (add later as late arrival) + if (i == 80) { continue; } + + // Skip removal index + if (i == 50) { continue; } + + Tick tick = ticks[i]; + provider.Add(tick); + + // Stream duplicate at same timestamp with different execution ID + if (i is > 30 and < 35) + { + provider.Add(new Tick( + tick.Timestamp, + tick.Price + 0.5m, + tick.Volume + 1m, + $"dup-{i}")); + } + } + + // Late historical insert (earlier timestamp than current tail) + provider.Insert(ticks[80]); + IReadOnlyList afterLateArrival = aggregator.Results.ToList(); + + // Remove a historical tick (simulate rollback) + provider.Insert(ticks[50]); + IReadOnlyList afterRemoval = aggregator.Results.ToList(); + + // Verify structural invariants at all stages + sut.Should().AllSatisfy(q => { + q.Timestamp.Should().NotBe(default); + q.Open.Should().BeGreaterThan(0); + q.High.Should().BeGreaterThanOrEqualTo(q.Low); + q.Close.Should().BeGreaterThan(0); + }); + + afterLateArrival.Should().AllSatisfy(q => { + q.Timestamp.Should().NotBe(default); + q.Open.Should().BeGreaterThan(0); + q.High.Should().BeGreaterThanOrEqualTo(q.Low); + q.Close.Should().BeGreaterThan(0); + }); + + afterRemoval.Should().AllSatisfy(q => { + q.Timestamp.Should().NotBe(default); + q.Open.Should().BeGreaterThan(0); + q.High.Should().BeGreaterThanOrEqualTo(q.Low); + q.Close.Should().BeGreaterThan(0); + }); + + // Verify ordering is strictly preserved + for (int i = 1; i < sut.Count; i++) + { + sut[i].Timestamp.Should().BeGreaterThanOrEqualTo(sut[i - 1].Timestamp); + } + + for (int i = 1; i < afterLateArrival.Count; i++) + { + afterLateArrival[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterLateArrival[i - 1].Timestamp); + } + + for (int i = 1; i < afterRemoval.Count; i++) + { + afterRemoval[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterRemoval[i - 1].Timestamp); + } + + // Cleanup + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void ChainProvider_MatchesSeriesExactly() + { + const int emaPeriods = 14; + + // Setup tick provider hub + TickHub provider = new(); + + // Create tick data + List ticks = []; + for (int i = 0; i < 100; i++) + { + Tick tick = new( + DateTime.Parse("2023-11-09 10:00", invariantCulture).AddMinutes(i), + 100m + i, + 10m + i); + ticks.Add(tick); + provider.Add(tick); + } + + // Initialize aggregator and chain with EMA + TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute); + EmaHub observer = aggregator.ToEmaHub(emaPeriods); + + // Final results + IReadOnlyList sut = observer.Results; + + // Should have results + sut.Should().HaveCount(100); + + // Verify some EMA values are calculated + sut.Skip(emaPeriods).Should().AllSatisfy(r => r.Ema.Should().NotBeNull()); + + // Build reference EMA Series from same tick sequence + // First aggregate ticks to quotes + List quoteSequence = []; + foreach (Tick tick in ticks) + { + DateTime barTimestamp = tick.Timestamp.RoundDown(TimeSpan.FromMinutes(1)); + Quote q = quoteSequence.FirstOrDefault(x => x.Timestamp == barTimestamp); + if (q == null) + { + q = new Quote(barTimestamp, tick.Price, tick.Price, tick.Price, tick.Price, tick.Volume); + quoteSequence.Add(q); + } + else + { + // Update existing bar + q = new Quote( + q.Timestamp, + q.Open, + Math.Max(q.High, tick.Price), + Math.Min(q.Low, tick.Price), + tick.Price, + q.Volume + tick.Volume); + quoteSequence[quoteSequence.Count - 1] = q; + } + } + + // Calculate reference EMA on aggregated quotes + IReadOnlyList referenceEma = quoteSequence.Ema(emaPeriods); + + // Compare observer results to reference EMA with strict ordering and equality + sut.Should().HaveSameCount(referenceEma); + for (int i = 0; i < sut.Count; i++) + { + // Verify timestamp and ordering + sut[i].Timestamp.Should().Be(referenceEma[i].Timestamp); + if (i > 0) + { + sut[i].Timestamp.Should().BeGreaterThanOrEqualTo(sut[i - 1].Timestamp); + } + + // Verify EMA values match + if (referenceEma[i].Ema.HasValue) + { + sut[i].Ema.Should().BeApproximately(referenceEma[i].Ema.Value, Money4, + $"EMA at index {i} should match reference series"); + } + else + { + sut[i].Ema.Should().BeNull($"EMA at index {i} should be null like reference"); + } + } + + // Cleanup + observer.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void Properties_AreSetCorrectly() + { + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub( + PeriodSize.FifteenMinutes, + fillGaps: true); + + aggregator.FillGaps.Should().BeTrue(); + aggregator.AggregationPeriod.Should().Be(TimeSpan.FromMinutes(15)); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void DuplicateExecutionId_CorrectionRebuildsBar() + { + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute); + + // Add tick with execution ID + provider.Add(new Tick( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 100m, 10m, "EXEC-001")); + + // Add correction with same execution ID and timestamp + provider.Add(new Tick( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 200m, 20m, "EXEC-001")); + + IReadOnlyList results = aggregator.Results; + + results.Should().HaveCount(1); + + // Should reflect corrected tick + IQuote bar = results[0]; + bar.Open.Should().Be(200m); + bar.High.Should().Be(200m); + bar.Low.Should().Be(200m); + bar.Close.Should().Be(200m); + bar.Volume.Should().Be(20m); + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void TicksWithoutExecutionId_DuplicatesAllowed() + { + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute); + + // Add ticks without execution IDs (same timestamp is allowed) + provider.Add(new Tick( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 100m, 10m, null)); + + provider.Add(new Tick( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 105m, 15m, null)); + + IReadOnlyList results = aggregator.Results; + + results.Should().HaveCount(1); + + // Both ticks should be incorporated + IQuote bar = results[0]; + bar.Open.Should().Be(100m); // First tick price + bar.High.Should().Be(105m); + bar.Low.Should().Be(100m); + bar.Close.Should().Be(105m); // Last tick price + bar.Volume.Should().Be(25m); // Sum of both volumes + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } +} diff --git a/tests/indicators/_common/Quotes/Tick.StreamHub.Tests.cs b/tests/indicators/_common/Quotes/Tick.StreamHub.Tests.cs new file mode 100644 index 000000000..2757365ae --- /dev/null +++ b/tests/indicators/_common/Quotes/Tick.StreamHub.Tests.cs @@ -0,0 +1,361 @@ +namespace StreamHubs; + +[TestClass] +public class TickStreamHubTests : StreamHubTestBase, ITestTickObserver +{ + [TestMethod] + public override void ToStringOverride_ReturnsExpectedName() + { + TickHub hub = new(); + string result = hub.ToString(); + + result.Should().Contain("TICKS"); + result.Should().Contain("0 items"); + + hub.EndTransmission(); + } + + [TestMethod] + public void StandaloneInitialization_WorksCorrectly() + { + TickHub hub = new(); + + hub.Should().NotBeNull(); + hub.MaxCacheSize.Should().BeGreaterThan(0); + hub.Cache.Should().BeEmpty(); + + hub.EndTransmission(); + } + + [TestMethod] + public void ProviderBackedInitialization_WorksCorrectly() + { + TickHub provider = new(); + TickHub hub = new(provider); + + hub.Should().NotBeNull(); + hub.Cache.Should().BeEmpty(); + + hub.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void SameTimestamp_WithoutExecutionId_ReplacesAndNotifiesAsAddition() + { + TickHub hub = new(); + int addCount = 0; + + TestTickObserver observer = new(); + observer.OnAddAction = (tick, notify, idx) => addCount++; + + hub.Subscribe(observer); + + // Add first tick without execution ID + Tick tick1 = new( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 100m, 10m, null); + hub.Add(tick1); + + // Add second tick with same timestamp but different price (no execution ID) + Tick tick2 = new( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 105m, 15m, null); + hub.Add(tick2); + + // Cache should have only one tick (the latest) + hub.Cache.Should().HaveCount(1); + hub.Cache[0].Price.Should().Be(105m); + hub.Cache[0].Volume.Should().Be(15m); + + // Both ticks should be notified as additions for aggregators to process + addCount.Should().Be(2); + + observer.Unsubscribe(); + hub.EndTransmission(); + } + + [TestMethod] + public void SameTimestamp_WithSameExecutionId_ReplacesAndNotifiesRebuild() + { + TickHub hub = new(); + int rebuildCount = 0; + + TestTickObserver observer = new(); + observer.OnRebuildAction = (ts) => rebuildCount++; + + hub.Subscribe(observer); + + // Add first tick with execution ID + Tick tick1 = new( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 100m, 10m, "EXEC-001"); + hub.Add(tick1); + + // Add updated tick with same execution ID (correction) + Tick tick2 = new( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 101m, 11m, "EXEC-001"); + hub.Add(tick2); + + // Cache should have only one tick (the corrected one) + hub.Cache.Should().HaveCount(1); + hub.Cache[0].Price.Should().Be(101m); + hub.Cache[0].ExecutionId.Should().Be("EXEC-001"); + + // Should trigger rebuild for correction + rebuildCount.Should().Be(1); + + observer.Unsubscribe(); + hub.EndTransmission(); + } + + [TestMethod] + public void SameTimestamp_WithDifferentExecutionId_NotifiesAddition() + { + TickHub hub = new(); + int addCount = 0; + + TestTickObserver observer = new(); + observer.OnAddAction = (tick, notify, idx) => addCount++; + + hub.Subscribe(observer); + + // Add first tick with execution ID + Tick tick1 = new( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 100m, 10m, "EXEC-001"); + hub.Add(tick1); + + // Add second tick with same timestamp but different execution ID + Tick tick2 = new( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 105m, 15m, "EXEC-002"); + hub.Add(tick2); + + // Both ticks should be notified to observers (not dropped) + addCount.Should().Be(2); + + observer.Unsubscribe(); + hub.EndTransmission(); + } + + [TestMethod] + public void ExactDuplicate_DefersToAppendCache() + { + TickHub hub = new(); + int addCount = 0; + + TestTickObserver observer = new(); + observer.OnAddAction = (tick, notify, idx) => addCount++; + + hub.Subscribe(observer); + + // Add same tick twice + Tick tick = new( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 100m, 10m, "EXEC-001"); + + hub.Add(tick); + hub.Add(tick); + + // Should use overflow tracking from base class + hub.Cache.Should().HaveCount(1); + addCount.Should().Be(1); // Only first add notified + + observer.Unsubscribe(); + hub.EndTransmission(); + } + + [TestMethod] + public void StandaloneRebuild_PreservesCacheAndNotifiesObservers() + { + TickHub hub = new(); + int rebuildCount = 0; + DateTime? rebuildTimestamp = null; + + TestTickObserver observer = new(); + observer.OnRebuildAction = (ts) => + { + rebuildCount++; + rebuildTimestamp = ts; + }; + + hub.Subscribe(observer); + + // Add ticks + hub.Add(new Tick(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100m, 10m)); + hub.Add(new Tick(DateTime.Parse("2023-11-09 10:01", invariantCulture), 101m, 11m)); + hub.Add(new Tick(DateTime.Parse("2023-11-09 10:02", invariantCulture), 102m, 12m)); + + int initialCount = hub.Cache.Count; + + // Trigger rebuild + hub.Rebuild(DateTime.Parse("2023-11-09 10:01", invariantCulture)); + + // Cache should still have all ticks (standalone doesn't clear) + hub.Cache.Should().HaveCount(initialCount); + + // Observers should be notified + rebuildCount.Should().Be(1); + rebuildTimestamp.Should().Be(DateTime.Parse("2023-11-09 10:01", invariantCulture)); + + observer.Unsubscribe(); + hub.EndTransmission(); + } + + [TestMethod] + public void ProviderBackedRebuild_UsesStandardBehavior() + { + TickHub provider = new(); + TickHub hub = new(provider); + + // Add ticks through provider + provider.Add(new Tick(DateTime.Parse("2023-11-09 10:00", invariantCulture), 100m, 10m)); + provider.Add(new Tick(DateTime.Parse("2023-11-09 10:01", invariantCulture), 101m, 11m)); + provider.Add(new Tick(DateTime.Parse("2023-11-09 10:02", invariantCulture), 102m, 12m)); + + hub.Cache.Should().HaveCount(3); + + // Trigger rebuild on hub (not provider) + hub.Rebuild(DateTime.Parse("2023-11-09 10:01", invariantCulture)); + + // Provider-backed hub should rebuild normally + hub.Cache.Should().HaveCountGreaterOrEqualTo(2); + + hub.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void MultipleTicksSameTimestamp_AllProcessedByAggregators() + { + // This test verifies the fix for issue #4 from code review + TickHub provider = new(); + TickAggregatorHub aggregator = provider.ToTickAggregatorHub(PeriodSize.OneMinute); + + // Add three ticks at same timestamp with different execution IDs + provider.Add(new Tick( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 100m, 10m, "EXEC-001")); + provider.Add(new Tick( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 105m, 15m, "EXEC-002")); + provider.Add(new Tick( + DateTime.Parse("2023-11-09 10:00:00", invariantCulture), + 95m, 20m, "EXEC-003")); + + IReadOnlyList results = aggregator.Results; + + results.Should().HaveCount(1); + + // All three ticks should be incorporated into the bar + IQuote bar = results[0]; + bar.High.Should().Be(105m); // Max of all three + bar.Low.Should().Be(95m); // Min of all three + bar.Volume.Should().Be(45m); // Sum of all three (10 + 15 + 20) + + aggregator.Unsubscribe(); + provider.EndTransmission(); + } + + [TestMethod] + public void TickObserver_WithWarmupAndMultipleSameTimestamp_WorksCorrectly() + { + // setup tick provider hub + TickHub provider = new(); + + // prefill warmup window + for (int i = 0; i < 20; i++) + { + provider.Add(new Tick( + DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(i), + 100m + i, 10m + i, $"EXEC-{i:000}")); + } + + // initialize observer + TickHub observer = new(provider); + + // fetch initial results + IReadOnlyList results = observer.Results.ToList(); + + results.Should().HaveCount(20); + + // stream in-order duplicates (same timestamp different exec IDs) + provider.Add(new Tick( + DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(20), + 120m, 30m, "EXEC-020")); + provider.Add(new Tick( + DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(20), + 121m, 31m, "EXEC-021")); + + IReadOnlyList afterDuplicates = observer.Results.ToList(); + afterDuplicates.Should().HaveCount(22); // 20 + 2 more ticks with same timestamp + + // insert late historical tick (earlier DateTime than current tail) + provider.Insert(new Tick( + DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(15), + 115m, 25m, "EXEC-015-LATE")); + + IReadOnlyList afterLateArrival = observer.Results.ToList(); + + // results should be recalculated and maintain ordering + afterLateArrival.Should().HaveCount(23); + for (int i = 1; i < afterLateArrival.Count; i++) + { + afterLateArrival[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterLateArrival[i - 1].Timestamp); + } + + // remove a historical tick (simulate rollback) + provider.Insert(new Tick( + DateTime.Parse("2023-11-09", invariantCulture).AddMinutes(10), + 110m, 20m, "EXEC-010")); + + IReadOnlyList afterRemoval = observer.Results.ToList(); + + // results should update after removal + afterRemoval.Should().HaveCount(24); + + // Verify strict ordering is maintained after each mutation + for (int i = 1; i < afterRemoval.Count; i++) + { + afterRemoval[i].Timestamp.Should().BeGreaterThanOrEqualTo(afterRemoval[i - 1].Timestamp); + } + + // Verify content equality for final results + // (timestamps and tick counts should match expected sequence) + afterRemoval.Should().AllSatisfy(t => { + t.Timestamp.Should().NotBe(default); + t.Price.Should().BeGreaterThan(0); + t.Volume.Should().BeGreaterThan(0); + }); + + // cleanup + observer.Unsubscribe(); + provider.EndTransmission(); + } + + /// + /// Test observer helper class for tracking tick notifications. + /// + private class TestTickObserver : IStreamObserver + { + public Action? OnAddAction { get; set; } + public Action? OnRebuildAction { get; set; } + public bool IsSubscribed { get; private set; } = true; + + public void OnAdd(ITick item, bool notify, int? indexHint) + => OnAddAction?.Invoke(item, notify, indexHint); + + public void OnCompleted() => IsSubscribed = false; + + public void OnRebuild(DateTime timestamp) + => OnRebuildAction?.Invoke(timestamp); + + public void OnError(Exception error) { } + + public void OnPrune(DateTime timestamp) { } + + public void Unsubscribe() => IsSubscribed = false; + } +} diff --git a/tests/indicators/_common/StreamHub/StreamHub.Interface.Tests.cs b/tests/indicators/_common/StreamHub/StreamHub.Interface.Tests.cs index 1a3e68d1b..157852af3 100644 --- a/tests/indicators/_common/StreamHub/StreamHub.Interface.Tests.cs +++ b/tests/indicators/_common/StreamHub/StreamHub.Interface.Tests.cs @@ -32,7 +32,8 @@ public void AllStreamHubTests_ImplementCorrectInterfaces() // Define observer and provider interface types Type[] observerTypes = [ typeof(ITestChainObserver), - typeof(ITestQuoteObserver) + typeof(ITestQuoteObserver), + typeof(ITestTickObserver) ]; // Add more provider interfaces here if needed