Automated trading system with support for multiple strategies, broker integrations, and real-time market data processing.
Fortified is a production-grade automated trading platform built with FastAPI that enables systematic trading across multiple brokers with real-time market data integration. The system supports both equity and futures trading with advanced execution features including tick-level precision, multi-timeframe support, and manual tick aggregation.
- Paper Trading Mode: Full simulation environment for risk-free strategy testing with simulated fills
- Multi-Broker Support: TastyTrade and Charles Schwab integrations with OAuth2 authentication
- Real-Time Market Data:
- TastyTrade DXFeed for futures (Trade/Quote events)
- Schwab WebSocket for equities (1-second bars)
- Historical replay data source for backtesting
- Manual Tick Aggregation: Custom tick-to-bar aggregation with full control over timeframes
- Multi-Timeframe Support: Single symbol can have multiple timeframes simultaneously
- Advanced Strategies:
- EMA Crossover strategy with configurable periods
- Triple EMA ZigZag strategy with ThinkScript-compatible defaults
- Extensible strategy framework for custom implementations
- Advanced Indicators: EMA, Triple EMA, ZigZag High/Low with configurable parameters
- Redis-Powered: Streams for real-time data distribution, consumer groups for strategy isolation
- Position State Management: Unified position tracking across multiple brokers in Redis
- Tick-Level Execution: Precise order timing with stop/reversal handling
- Order Tracking: Comprehensive order tracking service for monitoring execution
- Historical Backfill: Automatic data warmup for strategies with configurable warmup periods
- RESTful API: FastAPI-based API with automatic OpenAPI documentation
- WebSocket Updates: Real-time strategy status and trade updates
- Advanced Trade History: Pagination, filtering, date ranges, and P&L statistics
- Modular Architecture: Clean separation of concerns for easy extension
┌──────────────────────────────────────────────────────────────────────┐
│ FastAPI Server │
│ API Routes | Authentication | WebSocket Manager │
└────────────────────────────┬─────────────────────────────────────────┘
│
┌───────┴────────┐
│ Redis Streams │ ← bars_stream, ticks_stream
│ Consumer Grps │ ← Strategy isolation
│ Trade States │ ← Position tracking
└───────┬────────┘
│
┌───────────────────┴─────────────────────────┐
│ │
┌──────▼──────────┐ ┌────────▼────────┐
│ Aggregation │ │ Trading │
│ Manager │ │ Manager │
│ (Data Flow) │ │ (Orchestrator) │
└──┬──────────┬───┘ └────────┬────────┘
│ │ │
│ │ ┌───────▼────────┐
│ │ │ Strategy Runner│ × N
│ │ │ (Per Symbol) │
│ │ └───┬────────┬───┘
│ │ │ │
┌────▼────┐ ┌─▼──────────┐ │ │
│ Data │ │ Aggregators│ │ │
│ Sources │ ├────────────┤ ┌───────▼────┐ │
├─────────┤ │ TimeBar │ │ Strategy │ │
│TastyTrade│─>│ (1m,5m) │──bars──> │ Instance │ │
│ DXFeed │ ├────────────┤ Redis │(EMA/ZigZag)│ │
│(Futures)│ │ TickBar │ └────┬───────┘ │
├─────────┤ │ (144t,512t)│ │ │
│ Schwab │─>├────────────┤ ┌─────────▼───────┐ │
│ (Eq.) │ │ Passthrough│ │ Indicators │ │
├─────────┤ │ (1-sec) │ │ (EMA, ZigZag) │ │
│ Replay │ └────────────┘ └─────────┬───────┘ │
│(Backtest) │ │
└─────────┘ ┌─────▼───────────▼─────┐
│ Order Generation │
└──────────┬────────────┘
│
┌───────────────┼────────────────┐
│ │ │
┌───────▼────┐ ┌───────▼────┐ ┌───────▼────┐
│ Schwab │ │ TastyTrade │ │ Paper │
│ Broker │ │ Broker │ │ Broker │
└─────┬──────┘ └─────┬──────┘ └─────┬──────┘
│ │ │
┌─────▼───────────────▼───────────────▼─────┐
│ Trade State Manager │
│ (Unified Position Tracking in Redis) │
└────────────────────────────────────────────┘
- auth.py: JWT-based authentication
- brokers.py: Broker authorization and credential management
- tickers.py: Ticker configuration and management
- trading.py: Trade execution, strategy control, manual triggers
- websocket_manager.py: Real-time WebSocket connections for strategy updates
- charts.py: Chart data WebSocket endpoints for multi-timeframe support
Orchestration (/core/orchestration)
trading_manager.py: Top-level orchestrator for trading system- Manages multiple strategy runners simultaneously
- Initializes brokers (paper or live mode)
- Coordinates aggregation manager and data sources
- Handles strategy start/stop lifecycle
- Provides status and monitoring endpoints
strategy_runner.py: Individual strategy execution manager- Runs one strategy instance for a specific symbol
- Subscribes to Redis bar and tick streams
- Feeds data to strategy and executes generated orders
- Routes orders to appropriate brokers (Schwab/TastyTrade/Paper)
- Manages broker-specific quantities
- Handles warmup period tracking
aggregation_manager.py: Orchestrates data flow from sources to aggregators to Redis- Creates and manages aggregators for each symbol/timeframe
- Routes ticks to appropriate aggregators (TimeBar, TickBar, or Passthrough)
- Handles multi-timeframe subscriptions
- Publishes bars and partial tick updates to Redis streams
trade_state_manager.py: Unified position state management- Loads and saves position state to Redis
- Tracks positions across multiple brokers
- Provides aggregate position views
- Validates position state before trades
config_loader.py: Strategy and symbol configuration- Loads symbol configurations for strategies
- Validates configuration parameters
- Provides type-safe config objects
backfill_service.py: Historical data warmup- Fetches historical bars for strategy initialization
- Populates Redis with historical data
- Supports configurable lookback periods
- Works with any data source (TastyTrade, Schwab, Replay)
Brokers (/core/brokers)
tastytrade_broker.py: TastyTrade API integration for futures trading- OAuth2 authentication with automatic token refresh
- Place orders with TastyTrade-specific actions (Buy/Sell to Open/Close)
- Position and account management
schwab_broker.py: Charles Schwab API integration for equity trading- OAuth2 authentication with automatic token refresh
- Market and limit order execution
- Real-time position tracking
paper_broker.py: Paper trading simulation broker- Simulates order execution without real API calls
- Logs what WOULD be sent to real broker
- Tracks positions in Redis with Trade State Manager
- Calculates simulated P&L
- Stores paper trades for analysis
- Supports both Schwab and TastyTrade mode simulation
base_broker.py: Abstract base class for all brokers- Defines common broker interface
- Order, Position, and Account data structures
Data Sources (/core/data_sources)
tastytrade_dxfeed_data_source.py:- DXLink WebSocket protocol for TastyTrade DXFeed
- Trade/Quote event subscriptions (not pre-aggregated Candles)
- Historical data fetching from DXFeed
- Automatic token refresh
- Futures contract resolution
schwab_data_source.py:- Schwab WebSocket for equity 1-second bars
- Level 1 quote data streaming
- OAuth2 token management
replay_data_source.py:- Historical data replay for backtesting
- Loads bars from Redis historical storage
- Simulates real-time tick flow from past data
- Configurable replay speed
- Supports strategy testing with historical data
databento_data_source.py: DEPRECATED - Legacy data source (use TastyTrade/Schwab)base_data_source.py: Abstract base classes (Tick, Bar, BaseDataSource)futures_contracts.py: Futures contract symbol resolution helpersschwab_auth_helper.py: OAuth2 authentication utilities for Schwab
Aggregation (/core/aggregation)
time_bar_aggregator.py: Time-based bar aggregation (1min, 5min, 15min, etc.)- Bucket-based time windowing
- OHLCV bar creation from ticks
- Publishes completed bars to Redis streams
- Publishes partial updates to tick streams
tick_bar_aggregator.py: Tick-count bar aggregation (144t, 512t, etc.)- Buffer-based tick accumulation
- Bar creation after N ticks
- Real-time buffer progress tracking
- Publishes partial bar updates with fill percentage
bar_passthrough_aggregator.py: Passthrough for pre-aggregated bars- Used when data source provides pre-aggregated bars (e.g., Schwab 1-second bars)
- Minimal processing, direct Redis publication
base_aggregator.py: Abstract base class for all aggregators
Indicators (/core/indicators)
ema.py: Exponential Moving Average (single period)- Configurable period
- Efficient calculation using pandas
triple_ema.py: Triple EMA indicator- Fast, medium, and slow EMA periods
- Configurable displacement
- Used in Triple EMA ZigZag strategy
zigzag_highlow.py: ZigZag high/low indicator- Multiple calculation methods (average, ATR, percent, absolute)
- Configurable reversal thresholds
- ThinkScript-compatible implementation
base_indicator.py: Abstract base class for custom indicators
Strategies (/core/strategies)
ema_crossover_strategy.py: EMA crossover trading strategy- Configurable fast and slow MA periods
- Supports EMA, SMA, WMA types
- Entry on MA crossovers
- Exit on opposite crossovers
- One-trade-per-bar limit to prevent duplicates
triple_ema_zigzag_strategy.py: Triple EMA + ZigZag strategy- Three EMAs (fast, medium, slow) for trend confirmation
- ZigZag indicator for swing high/low detection
- Entry when EMAs align with ZigZag signals
- Configurable parameters matching ThinkScript defaults
- Support for stop loss and take profit
base_strategy.py: Abstract strategy framework- Defines strategy interface (
on_new_bar,on_tick) - Warmup period management
- Signal calculation framework
- Extensible design for adding custom strategies
- Defines strategy interface (
Storage (/core/storage)
redis_streams.py: Redis Streams and ZSET storage for bars- Publishes bars to
bars_stream:{symbol}:{timeframe} - Publishes partial updates to
ticks_stream:{symbol}:{timeframe} - Stores historical bars in ZSETs with timeframe namespacing
- Consumer group management for strategy isolation
- Stream trimming to prevent unbounded growth
- Publishes bars to
redis_trade_state.py: Position state persistence- Stores position state per symbol/strategy/broker
- Tracks position side (LONG/SHORT/FLAT), quantity, and average price
- Provides state validation before trades
- Used by Trade State Manager
redis_paper_trades.py: Paper trading history storage- Stores simulated trades in Redis ZSETs
- Provides pagination and filtering
- Calculates P&L statistics
- Supports bar snapshot storage for reproducibility
redis_real_trades.py: Real trading history storage- Stores actual broker trades in Redis ZSETs
- Pagination and filtering support
- Trade count and statistics
redis_singletons.py: Singleton pattern for Redis storage instances- Provides dependency injection for FastAPI routes
- Ensures single instance per storage type
Execution (/core/execution)
tick_level_executor.py: Tick-level trade execution- Monitors tick stream for precise entry/exit timing
- Executes trades at specific price levels
- Sub-bar execution for improved fills
stop_reversal_handler.py: Stop loss and position reversal handling- Monitors positions for stop loss triggers
- Handles position reversals (long → short, short → long)
- Configurable stop loss parameters
Tracking (/core/tracking)
order_tracking_service.py: Order lifecycle tracking- Tracks order status from submission to fill
- Monitors broker order updates
- Provides order history and analytics
Utilities (/core/utils)
timezone.py: Timezone conversion utilities- EST/UTC conversion helpers
- Market hours calculations
- Python 3.12+
- Redis server
- API credentials for:
- TastyTrade (futures market data via DXFeed + broker integration)
- Charles Schwab (equity market data + broker integration)
- Clone the repository:
cd fortified-trader-backend- Create and activate virtual environment:
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate- Install dependencies:
pip install -r requirements.txt- Configure environment variables:
cp .env.example .env
# Edit .env with your API keys and configuration- Start Redis:
# Linux/systemd
sudo systemctl start redis
# macOS/Homebrew
brew services start redis
# Docker
docker run -d -p 6379:6379 redis:latestEdit .env with your credentials:
# Server
PORT=8000
ENVIRONMENT=development
# JWT Authentication
JWT_SECRET=your-secure-secret-key
JWT_EXPIRATION_MINUTES=43200 # 30 days
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
# TastyTrade (Futures Market Data + Broker)
TASTY_CLIENT_ID=your-tastytrade-client-id
TASTY_CLIENT_SECRET=your-tastytrade-client-secret
TASTY_ACCOUNT_ID=your-tastytrade-account-id
# Schwab (Equity Market Data + Broker)
SCHWAB_CLIENT_ID=your-schwab-client-id
SCHWAB_CLIENT_SECRET=your-schwab-client-secret
SCHWAB_REDIRECT_URI=https://localhost:8000/api/brokers/schwab/callback
# Paper Trading (Optional)
PAPER_TRADES_BACKUP=false # Set to 'true' to enable file backups of paper trades
# Logging
LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR, CRITICALStrategies are configured via ticker configurations stored in Redis. Each symbol/strategy combination has its own configuration:
{
"symbol": "/NQ",
"strategy": "ema",
"timeframe": "144t",
"trade_enabled": true,
"schwab_quantity": 10,
"tastytrade_quantity": 1,
"period_1": 9,
"period_2": 21,
"trend_line_1": "EMA",
"trend_line_2": "EMA"
}Configuration Fields:
symbol: Trading symbol (e.g., "/NQ" for futures, "SPY" for equities)strategy: Strategy name (ema,supertrend, etc.)timeframe: Bar timeframe (1,5,144t,512t, etc.)trade_enabled: Enable/disable trading for this symbolschwab_quantity: Number of shares/contracts to trade on Schwabtastytrade_quantity: Number of contracts to trade on TastyTradeperiod_1,period_2: Strategy-specific parameters (e.g., EMA periods)trend_line_1,trend_line_2: MA type (EMA, SMA, WMA)
Paper vs Live Mode:
- Paper mode (
live_mode=false): Uses PaperBroker, simulates all trades - Live mode (
live_mode=true): Uses real brokers (Schwab/TastyTrade), places actual orders
IMPORTANT: Always test strategies in paper mode first before enabling live trading!
# Using the startup script
./start_api.sh
# Or manually with uvicorn
uvicorn main:app --reload --port 8000# Start backend with PM2
pm2 start main.py --name backend --interpreter python3 -- -m uvicorn main:app --host 0.0.0.0 --port 8000
# Start frontend (if applicable)
cd ../fortified-trader-frontend
pm2 start npm --name frontend -- run dev
# View logs
pm2 logs backend
pm2 logs frontend
# Monitor processes
pm2 monit
# Save PM2 process list
pm2 save
# Setup PM2 to start on boot
pm2 startupOnce running, access the interactive API docs:
- Swagger UI: http://localhost:8000/api/docs
- ReDoc: http://localhost:8000/api/redoc
- Health Check: http://localhost:8000/api/health
The system supports running multiple timeframes for the same symbol simultaneously. For example, /NQ can have both:
- Time-based:
1(1-minute bars),5(5-minute bars) - Tick-based:
144t(144-tick bars),512t(512-tick bars)
Each timeframe gets its own aggregator and Redis stream, allowing strategies to consume data at different granularities.
- Timeframes:
1,5,15,30,1h,4h,1d - Method: Bucket-based time windowing
- Bar Completion: At the end of each time period
- Use Case: Traditional time-based trading strategies
Example: 1-minute aggregator
08:02:00 - 08:02:59 → Accumulate ticks → 08:03:00 → Publish bar
- Timeframes:
144t,512t,1000t(any number + 't') - Method: Buffer-based tick counting
- Bar Completion: After N ticks received
- Use Case: Volume-based or tick-based trading strategies
Example: 144-tick aggregator
Tick 1-143 → Buffer accumulates → Tick 144 → Publish bar → Clear buffer
Real-time Progress:
- Publishes partial bar updates to
ticks_streamafter each tick - Includes buffer fill percentage (e.g., "90/144 = 62.5% full")
- Allows strategies to see intra-bar price action
Stream Keys:
bars_stream:/NQ:1 # Completed 1-minute bars
bars_stream:/NQ:144t # Completed 144-tick bars
ticks_stream:/NQ:1 # Partial 1-minute bar updates
ticks_stream:/NQ:144t # Partial 144-tick bar updates
Consumer Groups: Each strategy creates its own consumer group for each stream:
Consumer Group: "ema"
├─ bars_stream:/NQ:1 → ema//NQ_timestamp
└─ bars_stream:/NQ:144t → ema//NQ_timestamp
Benefits:
- No duplicate processing across strategies
- Each strategy can consume at its own pace
- Automatic message acknowledgment
- Can reset to any point in stream
# Login to get JWT token
curl -X POST http://localhost:8000/api/login \
-H "Content-Type: application/json" \
-d '{"email": "admin@example.com", "password": "your-password"}'# Get Schwab authorization URL
curl -X GET http://localhost:8000/api/brokers/schwab/authorize \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Exchange authorization code for tokens
curl -X POST http://localhost:8000/api/brokers/schwab/token \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{"code": "AUTH_CODE"}'# Get tickers for a strategy
curl -X GET http://localhost:8000/api/tickers/triple_ema \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Update ticker configuration
curl -X POST http://localhost:8000/api/tickers \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"strategy": "triple_ema",
"ticker": "SPY",
"config": {
"timeframe": "5Min",
"quantity": 100,
"enabled": true
}
}'# Start trading in paper mode (default - safe for testing)
curl -X GET "http://localhost:8000/api/start-trading?strategy=ema&live_mode=false" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Start trading in LIVE mode (real money!)
curl -X GET "http://localhost:8000/api/start-trading?strategy=ema&live_mode=true" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Stop trading
curl -X GET "http://localhost:8000/api/stop-trading?strategy=ema" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Get trading status
curl -X GET "http://localhost:8000/api/trading-status?strategy=ema" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Get positions
curl -X GET "http://localhost:8000/api/positions?strategy=ema" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"# Get all trades (paper + real) with pagination
curl -X GET "http://localhost:8000/api/trades?strategy=ema&page=1&page_size=50" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Filter by trade type (paper or real)
curl -X GET "http://localhost:8000/api/trades?strategy=ema&trade_type=paper" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Filter by symbol and broker
curl -X GET "http://localhost:8000/api/trades?strategy=ema&symbol=/NQ&broker=tastytrade" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Filter by date range
curl -X GET "http://localhost:8000/api/trades?strategy=ema&start_date=2025-01-01&end_date=2025-01-31" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Get paper trades with pagination
curl -X GET "http://localhost:8000/api/paper-trades?strategy=ema&page=1&page_size=100" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Get paper trade statistics
curl -X GET "http://localhost:8000/api/paper-trades/stats?strategy=ema" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
# Get real trades
curl -X GET "http://localhost:8000/api/real-trades?strategy=ema" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"# Manual trigger for ZeroDay strategy
curl -X POST http://localhost:8000/api/manual-trigger \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"ticker": "SPX",
"action": "long"
}'
# Manual trigger for EMA strategy
curl -X POST http://localhost:8000/api/ema-manual-trigger \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"ticker": "SPY",
"action": "short"
}'fortified-trader-backend/
├── main.py # FastAPI application entry point
├── start_api.sh # Startup script
├── requirements.txt # Python dependencies
├── .env.example # Environment variables template
├── utils.py # General utilities
├── clear_trade_states.py # Trade state cleanup script
│
├── api/ # API routes and endpoints
│ ├── auth.py # Authentication routes
│ ├── brokers.py # Broker integration routes
│ ├── tickers.py # Ticker management routes
│ ├── trading.py # Trading control routes (start/stop, trades history)
│ ├── charts.py # Chart data WebSocket endpoints
│ ├── websocket.py # Strategy WebSocket endpoints
│ ├── websocket_manager.py # WebSocket connection manager
│ └── dependencies.py # Shared dependencies (auth, etc.)
│
├── core/ # Core trading engine
│ │
│ ├── orchestration/ # Orchestration layer
│ │ ├── trading_manager.py # Top-level strategy orchestrator
│ │ ├── strategy_runner.py # Individual strategy executor
│ │ ├── aggregation_manager.py # Data flow orchestration
│ │ ├── trade_state_manager.py # Position state management
│ │ ├── config_loader.py # Strategy/symbol configuration
│ │ └── backfill_service.py # Historical data warmup
│ │
│ ├── aggregation/ # Tick-to-bar aggregation
│ │ ├── base_aggregator.py # Abstract base class
│ │ ├── time_bar_aggregator.py # Time-based (1min, 5min)
│ │ ├── tick_bar_aggregator.py # Tick-based (144t, 512t)
│ │ └── bar_passthrough_aggregator.py # Passthrough for pre-aggregated
│ │
│ ├── brokers/ # Broker API integrations
│ │ ├── base_broker.py # Abstract base class
│ │ ├── tastytrade_broker.py # TastyTrade futures broker
│ │ ├── schwab_broker.py # Schwab equity broker
│ │ └── paper_broker.py # Paper trading simulator
│ │
│ ├── data_sources/ # Market data providers
│ │ ├── base_data_source.py # Base classes (Tick, Bar)
│ │ ├── tastytrade_dxfeed_data_source.py # DXFeed WebSocket
│ │ ├── schwab_data_source.py # Schwab WebSocket
│ │ ├── replay_data_source.py # Historical replay for testing
│ │ ├── databento_data_source.py # DEPRECATED
│ │ ├── futures_contracts.py # Futures symbol resolution
│ │ └── schwab_auth_helper.py # OAuth2 utilities
│ │
│ ├── indicators/ # Technical indicators
│ │ ├── base_indicator.py # Abstract base class
│ │ ├── ema.py # Exponential Moving Average
│ │ ├── triple_ema.py # Triple EMA
│ │ └── zigzag_highlow.py # ZigZag high/low
│ │
│ ├── strategies/ # Trading strategies
│ │ ├── base_strategy.py # Abstract strategy framework
│ │ ├── ema_crossover_strategy.py # EMA crossover strategy
│ │ └── triple_ema_zigzag_strategy.py # Triple EMA + ZigZag strategy
│ │
│ ├── execution/ # Trade execution
│ │ ├── tick_level_executor.py # Tick-level execution
│ │ └── stop_reversal_handler.py # Stop loss and reversals
│ │
│ ├── tracking/ # Order tracking
│ │ └── order_tracking_service.py # Order lifecycle tracking
│ │
│ ├── storage/ # Persistent storage
│ │ ├── redis_streams.py # Redis Streams/ZSET for bars
│ │ ├── redis_trade_state.py # Position state persistence
│ │ ├── redis_paper_trades.py # Paper trade history
│ │ ├── redis_real_trades.py # Real trade history
│ │ └── redis_singletons.py # Singleton storage instances
│ │
│ └── utils/ # Core utilities
│ └── timezone.py # Timezone conversions
│
├── scripts/ # Utility scripts
│ ├── get_schwab_tokens.py # OAuth token helper
│ ├── refresh_tastytrade_token.py # TastyTrade token refresh
│ ├── tastytrade_client_credentials.py # TastyTrade auth setup
│ └── test_*.py # Various test scripts
│
├── testing/ # Integration tests
│ ├── test_1_aggregator_creation.py # Aggregator tests
│ ├── test_2_real_dxfeed_connection.py # DXFeed tests
│ ├── test_4_real_redis_persistence.py # Redis tests
│ ├── test_5_mixed_symbols_real.py # Multi-symbol tests
│ ├── test_6_schwab_equity_real.py # Schwab tests
│ └── test_7_historical_warmup_real.py # Backfill tests
│
├── tokens/ # Broker tokens (gitignored)
│ ├── schwab_tokens.json
│ └── tastytrade_tokens.json
│
├── credentials/ # API credentials (gitignored)
├── paper_trades/ # Paper trade backups (gitignored)
├── trade_states/ # Trade state backups (gitignored)
└── venv/ # Python virtual environment
- Create strategy class in
core/strategies/:
from core.strategies.base_strategy import BaseStrategy, StrategyConfig
from core.brokers.base_broker import OrderSide
from typing import Optional, List, Dict, Any
class MyStrategy(BaseStrategy):
def __init__(self, config: StrategyConfig, logger):
super().__init__(config, logger)
# Initialize indicators
# self.indicator = MyIndicator(period=config.parameters.get('period', 14))
def warmup_periods(self) -> int:
"""Return number of bars needed for warmup"""
return 50 # Adjust based on your indicators
def on_new_bar(self, bar: Dict[str, Any], current_position: Optional[Any]) -> List[Dict]:
"""Called on each new bar - main strategy logic"""
# Update internal data
# Calculate indicators
# Generate signals
# Return list of orders (or empty list)
orders = []
# Example: Entry logic
if self.should_enter_long(bar) and not current_position:
orders.append({
'symbol': bar['symbol'],
'side': OrderSide.BUY,
'reason': 'entry_long'
})
# Example: Exit logic
if self.should_exit_long(bar) and current_position:
orders.append({
'symbol': bar['symbol'],
'side': OrderSide.SELL,
'reason': 'exit_long'
})
return orders
def on_tick(self, tick: Dict[str, Any], current_position: Optional[Any]) -> List[Dict]:
"""Optional: Called on each tick for intra-bar execution"""
# Implement tick-level logic if needed
return []- Register strategy in
core/orchestration/strategy_runner.py_create_strategy()method - Create ticker configuration with strategy-specific parameters
- Test in paper mode first!
- Create broker class in
core/brokers/:
from core.brokers.base_broker import BaseBroker
class MyBroker(BaseBroker):
async def authorize(self):
"""OAuth flow"""
pass
async def place_order(self, symbol, quantity, side):
"""Place order"""
pass
async def get_positions(self):
"""Fetch positions"""
pass- Add broker routes in
api/brokers.py - Update environment variables in
.env.example
Paper Trading Workflow:
- Configure strategy in Redis via
/api/tickersendpoint - Start strategy in paper mode:
live_mode=false - Monitor paper trades via
/api/paper-trades - Review P&L and statistics via
/api/paper-trades/stats - Analyze trade history with filters
- Adjust strategy parameters as needed
- Only enable live mode after thorough testing
Historical Replay Testing:
# Use ReplayDataSource to test strategy with historical data
from core.data_sources.replay_data_source import ReplayDataSource
# Load historical bars from Redis
replay_source = ReplayDataSource(
symbol="/NQ",
timeframe="144t",
start_time="2025-01-01T09:30:00",
end_time="2025-01-01T16:00:00"
)
# Run strategy with replay data
# (See testing/test_7_historical_warmup_real.py for examples)Integration Tests:
# Run integration tests
cd testing/
python test_1_aggregator_creation.py # Test aggregators
python test_4_real_redis_persistence.py # Test Redis storage
python test_7_historical_warmup_real.py # Test backfill and replayFutures (TastyTrade DXFeed):
DXFeed WebSocket → Trade/Quote Events → Tick Objects → Aggregators
Equities (Schwab):
Schwab WebSocket → 1-second Bars → Tick Objects → Aggregators
AggregationManager orchestrates:
- Creates aggregators based on timeframe type:
TimeBarAggregatorfor time-based (1min, 5min, 15min)TickBarAggregatorfor tick-based (144t, 512t)
- Routes incoming ticks to appropriate aggregators
- Supports multi-timeframe: single symbol can have multiple timeframes
TimeBarAggregator:
Ticks → Time bucket (1min window) → OHLCV bar → Redis Stream
TickBarAggregator:
Ticks → Buffer (144 ticks) → OHLCV bar → Redis Stream
↓
Partial updates → Tick Stream (real-time progress)
Published Streams:
bars_stream:{symbol}:{timeframe}- Completed barsticks_stream:{symbol}:{timeframe}- Partial bar updates
Consumer Groups:
- Each strategy has its own consumer group (e.g., "ema")
- Strategies consume from multiple streams simultaneously
- Consumer group ensures no duplicate processing
Trading Manager → Strategy Runner Flow:
- Trading Manager creates Strategy Runner instances (one per symbol)
- Strategy Runner subscribes to Redis bar and tick streams
- Receives new bars from stream via consumer group
- Feeds bar to Strategy instance
- Strategy calculates indicators and generates signals
- Strategy returns list of orders
- Strategy Runner routes orders to appropriate brokers (paper or live)
- Brokers execute orders and update Trade State Manager
- Trade history saved to Redis (paper or real)
- WebSocket manager broadcasts updates to connected clients
Position State Management:
- Unified position tracking via Trade State Manager
- Position state stored in Redis per symbol/strategy/broker
- Supports multiple brokers per strategy
- Validates position exists before exit orders
- Tracks: side (LONG/SHORT/FLAT), quantity, average price
Paper vs Live Execution:
- Paper mode: PaperBroker simulates fills, logs API requests that would be sent
- Live mode: Real brokers (Schwab/TastyTrade) execute actual orders
- Both modes use same Trade State Manager for consistency
WebSocket Manager:
- Publishes strategy updates to connected clients
- Broadcasts trade execution events
- Streams chart data for multi-timeframe visualization
- JWT Tokens: Change
JWT_SECRETin production - Broker Tokens: Stored in
tokens/directory (gitignored) - API Keys: Never commit
.envfile - CORS: Configure
allow_originsin production - HTTPS: Use reverse proxy (nginx) with SSL in production
- Health Check:
/api/healthendpoint checks Redis connectivity - Logs: Configured via
LOG_LEVELenvironment variable - API Metrics: Available at
/api/docs(Swagger metrics)
# Check if Redis is running
redis-cli ping
# Check Redis logs
sudo journalctl -u redis -f# Refresh tokens manually
curl -X POST http://localhost:8000/api/brokers/schwab/refresh \
-H "Authorization: Bearer YOUR_JWT_TOKEN"TastyTrade DXFeed:
- Verify
TASTY_CLIENT_IDandTASTY_CLIENT_SECRETare valid - Check token refresh in logs:
tail -f ~/.pm2/logs/backend-error.log | grep "token refresh" - Verify DXFeed connection: Look for "Connected to DXLink" in logs
- Check subscription: Look for "Using Trade/Quote subscription" in logs
Schwab WebSocket:
- Verify Schwab tokens are valid and not expired
- Check WebSocket connection in logs
- Refresh Schwab tokens if needed
Check Redis Streams:
# List all streams
redis-cli KEYS "*stream*"
# Check stream length
redis-cli XLEN bars_stream:/NQ:1
# View recent bars
redis-cli XREVRANGE bars_stream:/NQ:1 + - COUNT 5
# Check consumer groups
redis-cli XINFO GROUPS bars_stream:/NQ:1Check Aggregator Status:
# View aggregation logs
pm2 logs backend | grep -E "(Aggregator|Buffer|BAR PUBLISHED)"
# Check for routing issues
pm2 logs backend | grep "Routing Tick"Decision: Implement full paper trading broker that simulates order execution and tracks positions in Redis.
Rationale:
- Safe environment for strategy testing without risk
- Logs what API requests WOULD be sent to real broker (educational)
- Uses same Trade State Manager as live brokers (consistency)
- Calculates realistic P&L for performance analysis
- Enables rapid iteration and debugging
Implementation:
- PaperBroker simulates fills at current market price
- Stores paper trades in Redis with full bar snapshots
- Supports both Schwab and TastyTrade mode simulation
- No actual broker API calls made
Decision: Centralize all position tracking in Redis via Trade State Manager, eliminating in-memory position tracking.
Rationale:
- Single source of truth for positions across all brokers
- Survives application restarts
- Enables position querying from API
- Validates state before trades (prevents invalid exits)
- Supports multi-broker strategies (e.g., Schwab + TastyTrade)
Benefits:
- No position drift between broker and internal state
- Easy debugging via Redis CLI
- Historical position tracking
- Consistent state across paper and live modes
Decision: Split orchestration into Trading Manager (multi-strategy) and Strategy Runner (single symbol).
Rationale:
- Trading Manager handles lifecycle (start/stop all symbols)
- Strategy Runner focuses on single symbol execution
- Enables parallel execution of multiple symbols
- Clean separation of concerns
- Easy to scale (one Strategy Runner per symbol)
Flow:
Trading Manager
├─ Strategy Runner (/NQ, 144t)
├─ Strategy Runner (/ES, 512t)
└─ Strategy Runner (SPY, 1)
Decision: Create ReplayDataSource that loads bars from Redis and simulates real-time flow.
Rationale:
- Test strategies with historical data
- Validates strategy logic before live trading
- Reproducible testing (same data, same results)
- Fast testing (no waiting for market hours)
- Supports backfill service integration
Use Cases:
- Strategy validation
- Parameter optimization
- Historical P&L analysis
- Regression testing
Decision: Use Trade/Quote events and manually aggregate ticks instead of subscribing to pre-aggregated Candle events from DXFeed.
Rationale:
- Full control over aggregation logic and timeframes
- Single subscription per symbol regardless of number of timeframes
- Simpler codebase - no need for special handling of pre-aggregated data
- Easier to debug and verify bar calculations
- Supports custom timeframes not available from data provider
Trade-offs:
- Slightly higher computational cost for aggregation
- Need to handle tick buffering and bar completion logic
- More complex aggregator implementations
Decision: Use Redis Streams with consumer groups instead of traditional Pub/Sub.
Rationale:
- Message persistence - can replay messages if needed
- Consumer groups prevent duplicate processing across strategies
- Each strategy can consume at its own pace
- Can reset to any point in stream for debugging
- Message acknowledgment ensures delivery
Trade-offs:
- Slightly more complex than simple Pub/Sub
- Need to manage consumer groups and stream trimming
Decision: Allow single symbol to have multiple timeframes simultaneously, each with its own aggregator and stream.
Rationale:
- Strategies can use multiple timeframes for better signals (e.g., 1-min for entries, 144t for exits)
- Single data source feeds all timeframes efficiently
- Each timeframe isolated in its own stream
- Easy to add/remove timeframes without affecting others
Decision: Configure separate quantities for each broker (Schwab, TastyTrade) at strategy start, never change during execution.
Rationale:
- Different brokers may have different position sizes (e.g., 10 shares vs 1 contract)
- Strategy logic focuses on signals, not position sizing
- Strategy Runner handles broker-specific quantity routing
- Both entry and exit orders use same configured quantities
Example:
Strategy generates: BUY signal
Strategy Runner executes:
- Schwab: BUY 10 shares
- TastyTrade: BUY 1 contract
- Tick Rate: System handles 10-100+ ticks/second per symbol
- Latency: Sub-100ms from tick receipt to bar publication
- Memory: Buffer sizes limited to prevent memory leaks
- Redis: Stream trimming (maxlen=1000) prevents unbounded growth
- Concurrency: Async/await throughout for non-blocking I/O
Proprietary - All rights reserved
For issues and questions, contact the development team or consult the API documentation at /api/docs.