lazybigcat0624/fortified_capital_ventures_BE

Fortified Trading System - Backend

Automated trading system with support for multiple strategies, broker integrations, and real-time market data processing.

Overview

Fortified is a production-grade automated trading platform built with FastAPI that enables systematic trading across multiple brokers with real-time market data integration. The system supports both equity and futures trading with advanced execution features including tick-level precision, multi-timeframe support, and manual tick aggregation.

Key Features

  • Paper Trading Mode: Full simulation environment for risk-free strategy testing with simulated fills
  • Multi-Broker Support: TastyTrade and Charles Schwab integrations with OAuth2 authentication
  • Real-Time Market Data:
    • TastyTrade DXFeed for futures (Trade/Quote events)
    • Schwab WebSocket for equities (1-second bars)
    • Historical replay data source for backtesting
  • Manual Tick Aggregation: Custom tick-to-bar aggregation with full control over timeframes
  • Multi-Timeframe Support: Single symbol can have multiple timeframes simultaneously
  • Advanced Strategies:
    • EMA Crossover strategy with configurable periods
    • Triple EMA ZigZag strategy with ThinkScript-compatible defaults
    • Extensible strategy framework for custom implementations
  • Advanced Indicators: EMA, Triple EMA, ZigZag High/Low with configurable parameters
  • Redis-Powered: Streams for real-time data distribution, consumer groups for strategy isolation
  • Position State Management: Unified position tracking across multiple brokers in Redis
  • Tick-Level Execution: Precise order timing with stop/reversal handling
  • Order Tracking: Comprehensive order tracking service for monitoring execution
  • Historical Backfill: Automatic data warmup for strategies with configurable warmup periods
  • RESTful API: FastAPI-based API with automatic OpenAPI documentation
  • WebSocket Updates: Real-time strategy status and trade updates
  • Advanced Trade History: Pagination, filtering, date ranges, and P&L statistics
  • Modular Architecture: Clean separation of concerns for easy extension

Architecture

┌──────────────────────────────────────────────────────────────────────┐
│                         FastAPI Server                                │
│    API Routes | Authentication | WebSocket Manager                    │
└────────────────────────────┬─────────────────────────────────────────┘
                             │
                     ┌───────┴────────┐
                     │  Redis Streams │  ← bars_stream, ticks_stream
                     │  Consumer Grps │  ← Strategy isolation
                     │  Trade States  │  ← Position tracking
                     └───────┬────────┘
                             │
         ┌───────────────────┴─────────────────────────┐
         │                                              │
  ┌──────▼──────────┐                         ┌────────▼────────┐
  │   Aggregation   │                         │     Trading     │
  │    Manager      │                         │     Manager     │
  │ (Data Flow)     │                         │  (Orchestrator) │
  └──┬──────────┬───┘                         └────────┬────────┘
     │          │                                      │
     │          │                              ┌───────▼────────┐
     │          │                              │ Strategy Runner│ × N
     │          │                              │ (Per Symbol)   │
     │          │                              └───┬────────┬───┘
     │          │                                  │        │
┌────▼────┐  ┌─▼──────────┐                       │        │
│  Data   │  │ Aggregators│                       │        │
│ Sources │  ├────────────┤               ┌───────▼────┐   │
├─────────┤  │  TimeBar   │               │  Strategy  │   │
│TastyTrade│─>│  (1m,5m)   │──bars──>      │  Instance  │   │
│ DXFeed  │  ├────────────┤   Redis       │(EMA/ZigZag)│   │
│(Futures)│  │  TickBar   │               └────┬───────┘   │
├─────────┤  │ (144t,512t)│                    │           │
│ Schwab  │─>├────────────┤          ┌─────────▼───────┐   │
│  (Eq.)  │  │ Passthrough│          │   Indicators    │   │
├─────────┤  │  (1-sec)   │          │ (EMA, ZigZag)   │   │
│ Replay  │  └────────────┘          └─────────┬───────┘   │
│(Backtest)                                    │           │
└─────────┘                              ┌─────▼───────────▼─────┐
                                         │   Order Generation    │
                                         └──────────┬────────────┘
                                                    │
                                    ┌───────────────┼────────────────┐
                                    │               │                │
                            ┌───────▼────┐  ┌───────▼────┐  ┌───────▼────┐
                            │   Schwab   │  │ TastyTrade │  │   Paper    │
                            │   Broker   │  │   Broker   │  │   Broker   │
                            └─────┬──────┘  └─────┬──────┘  └─────┬──────┘
                                  │               │               │
                            ┌─────▼───────────────▼───────────────▼─────┐
                            │        Trade State Manager                 │
                            │    (Unified Position Tracking in Redis)    │
                            └────────────────────────────────────────────┘

Core Components

1. API Layer (/api)

  • auth.py: JWT-based authentication
  • brokers.py: Broker authorization and credential management
  • tickers.py: Ticker configuration and management
  • trading.py: Trade execution, strategy control, manual triggers
  • websocket_manager.py: Real-time WebSocket connections for strategy updates
  • charts.py: Chart data WebSocket endpoints for multi-timeframe support

2. Core Trading Engine (/core)

Orchestration (/core/orchestration)

  • trading_manager.py: Top-level orchestrator for trading system
    • Manages multiple strategy runners simultaneously
    • Initializes brokers (paper or live mode)
    • Coordinates aggregation manager and data sources
    • Handles strategy start/stop lifecycle
    • Provides status and monitoring endpoints
  • strategy_runner.py: Individual strategy execution manager
    • Runs one strategy instance for a specific symbol
    • Subscribes to Redis bar and tick streams
    • Feeds data to strategy and executes generated orders
    • Routes orders to appropriate brokers (Schwab/TastyTrade/Paper)
    • Manages broker-specific quantities
    • Handles warmup period tracking
  • aggregation_manager.py: Orchestrates data flow from sources to aggregators to Redis
    • Creates and manages aggregators for each symbol/timeframe
    • Routes ticks to appropriate aggregators (TimeBar, TickBar, or Passthrough)
    • Handles multi-timeframe subscriptions
    • Publishes bars and partial tick updates to Redis streams
  • trade_state_manager.py: Unified position state management
    • Loads and saves position state to Redis
    • Tracks positions across multiple brokers
    • Provides aggregate position views
    • Validates position state before trades
  • config_loader.py: Strategy and symbol configuration
    • Loads symbol configurations for strategies
    • Validates configuration parameters
    • Provides type-safe config objects
  • backfill_service.py: Historical data warmup
    • Fetches historical bars for strategy initialization
    • Populates Redis with historical data
    • Supports configurable lookback periods
    • Works with any data source (TastyTrade, Schwab, Replay)

Brokers (/core/brokers)

  • tastytrade_broker.py: TastyTrade API integration for futures trading
    • OAuth2 authentication with automatic token refresh
    • Place orders with TastyTrade-specific actions (Buy/Sell to Open/Close)
    • Position and account management
  • schwab_broker.py: Charles Schwab API integration for equity trading
    • OAuth2 authentication with automatic token refresh
    • Market and limit order execution
    • Real-time position tracking
  • paper_broker.py: Paper trading simulation broker
    • Simulates order execution without real API calls
    • Logs the request that would be sent to the real broker
    • Tracks positions in Redis with Trade State Manager
    • Calculates simulated P&L
    • Stores paper trades for analysis
    • Supports both Schwab and TastyTrade mode simulation
  • base_broker.py: Abstract base class for all brokers
    • Defines common broker interface
    • Order, Position, and Account data structures

Data Sources (/core/data_sources)

  • tastytrade_dxfeed_data_source.py:
    • DXLink WebSocket protocol for TastyTrade DXFeed
    • Trade/Quote event subscriptions (not pre-aggregated Candles)
    • Historical data fetching from DXFeed
    • Automatic token refresh
    • Futures contract resolution
  • schwab_data_source.py:
    • Schwab WebSocket for equity 1-second bars
    • Level 1 quote data streaming
    • OAuth2 token management
  • replay_data_source.py:
    • Historical data replay for backtesting
    • Loads bars from Redis historical storage
    • Simulates real-time tick flow from past data
    • Configurable replay speed
    • Supports strategy testing with historical data
  • databento_data_source.py: DEPRECATED - Legacy data source (use TastyTrade/Schwab)
  • base_data_source.py: Abstract base classes (Tick, Bar, BaseDataSource)
  • futures_contracts.py: Futures contract symbol resolution helpers
  • schwab_auth_helper.py: OAuth2 authentication utilities for Schwab

Aggregation (/core/aggregation)

  • time_bar_aggregator.py: Time-based bar aggregation (1min, 5min, 15min, etc.)
    • Bucket-based time windowing
    • OHLCV bar creation from ticks
    • Publishes completed bars to Redis streams
    • Publishes partial updates to tick streams
  • tick_bar_aggregator.py: Tick-count bar aggregation (144t, 512t, etc.)
    • Buffer-based tick accumulation
    • Bar creation after N ticks
    • Real-time buffer progress tracking
    • Publishes partial bar updates with fill percentage
  • bar_passthrough_aggregator.py: Passthrough for pre-aggregated bars
    • Used when data source provides pre-aggregated bars (e.g., Schwab 1-second bars)
    • Minimal processing, direct Redis publication
  • base_aggregator.py: Abstract base class for all aggregators

Indicators (/core/indicators)

  • ema.py: Exponential Moving Average (single period)
    • Configurable period
    • Efficient calculation using pandas
  • triple_ema.py: Triple EMA indicator
    • Fast, medium, and slow EMA periods
    • Configurable displacement
    • Used in Triple EMA ZigZag strategy
  • zigzag_highlow.py: ZigZag high/low indicator
    • Multiple calculation methods (average, ATR, percent, absolute)
    • Configurable reversal thresholds
    • ThinkScript-compatible implementation
  • base_indicator.py: Abstract base class for custom indicators
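
The EMA entry above can be illustrated with the standard recursive formula. This is a minimal pure-Python sketch rather than the contents of ema.py (which, per the list above, uses pandas); the function name ema_series and the choice to seed the series with the first value are assumptions made for illustration.

```python
from typing import List


def ema_series(values: List[float], period: int) -> List[float]:
    """Return the exponential moving average of `values`.

    Uses the standard smoothing factor alpha = 2 / (period + 1) and
    seeds the series with the first input value (an assumption; other
    implementations seed with a simple average of the first N values).
    """
    if period < 1:
        raise ValueError("period must be >= 1")
    if not values:
        return []
    alpha = 2.0 / (period + 1)
    out = [float(values[0])]
    for price in values[1:]:
        # New EMA = alpha * price + (1 - alpha) * previous EMA
        out.append(alpha * price + (1 - alpha) * out[-1])
    return out
```

A longer period gives a smaller alpha, so the line reacts more slowly to new prices; that lag is what a fast/medium/slow EMA combination relies on.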

Strategies (/core/strategies)

  • ema_crossover_strategy.py: EMA crossover trading strategy
    • Configurable fast and slow MA periods
    • Supports EMA, SMA, WMA types
    • Entry on MA crossovers
    • Exit on opposite crossovers
    • One-trade-per-bar limit to prevent duplicates
  • triple_ema_zigzag_strategy.py: Triple EMA + ZigZag strategy
    • Three EMAs (fast, medium, slow) for trend confirmation
    • ZigZag indicator for swing high/low detection
    • Entry when EMAs align with ZigZag signals
    • Configurable parameters matching ThinkScript defaults
    • Support for stop loss and take profit
  • base_strategy.py: Abstract strategy framework
    • Defines strategy interface (on_new_bar, on_tick)
    • Warmup period management
    • Signal calculation framework
    • Extensible design for adding custom strategies
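
The crossover entry/exit rule and the one-trade-per-bar limit described above could look roughly like this. It is a sketch under assumptions: the names crossover_signal and OncePerBarGate are hypothetical and do not come from ema_crossover_strategy.py.

```python
from typing import Any, Optional


def crossover_signal(prev_fast: float, prev_slow: float,
                     fast: float, slow: float) -> Optional[str]:
    """Return 'long' on a bullish cross, 'short' on a bearish cross, else None."""
    if prev_fast <= prev_slow and fast > slow:
        return "long"   # fast MA moved from at/below to above the slow MA
    if prev_fast >= prev_slow and fast < slow:
        return "short"  # fast MA moved from at/above to below the slow MA
    return None


class OncePerBarGate:
    """Allow at most one trade per bar, keyed by the bar's timestamp."""

    def __init__(self) -> None:
        self._last_bar_ts: Any = None

    def allow(self, bar_ts: Any) -> bool:
        if bar_ts == self._last_bar_ts:
            return False  # already traded on this bar
        self._last_bar_ts = bar_ts
        return True
```

Comparing the previous and current values, instead of only checking fast > slow, is what keeps a signal from firing on every bar while the averages stay crossed.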

Storage (/core/storage)

  • redis_streams.py: Redis Streams and ZSET storage for bars
    • Publishes bars to bars_stream:{symbol}:{timeframe}
    • Publishes partial updates to ticks_stream:{symbol}:{timeframe}
    • Stores historical bars in ZSETs with timeframe namespacing
    • Consumer group management for strategy isolation
    • Stream trimming to prevent unbounded growth
  • redis_trade_state.py: Position state persistence
    • Stores position state per symbol/strategy/broker
    • Tracks position side (LONG/SHORT/FLAT), quantity, and average price
    • Provides state validation before trades
    • Used by Trade State Manager
  • redis_paper_trades.py: Paper trading history storage
    • Stores simulated trades in Redis ZSETs
    • Provides pagination and filtering
    • Calculates P&L statistics
    • Supports bar snapshot storage for reproducibility
  • redis_real_trades.py: Real trading history storage
    • Stores actual broker trades in Redis ZSETs
    • Pagination and filtering support
    • Trade count and statistics
  • redis_singletons.py: Singleton pattern for Redis storage instances
    • Provides dependency injection for FastAPI routes
    • Ensures single instance per storage type

Execution (/core/execution)

  • tick_level_executor.py: Tick-level trade execution
    • Monitors tick stream for precise entry/exit timing
    • Executes trades at specific price levels
    • Sub-bar execution for improved fills
  • stop_reversal_handler.py: Stop loss and position reversal handling
    • Monitors positions for stop loss triggers
    • Handles position reversals (long → short, short → long)
    • Configurable stop loss parameters
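
As a rough illustration of the stop and reversal handling described above, the sketch below checks a stop on each tick and builds the two orders a reversal needs. The function names, the plain-string sides, and the use of an absolute price distance for the stop are all assumptions; stop_reversal_handler.py may define these differently.

```python
from typing import Dict, List


def stop_triggered(side: str, entry_price: float, last_price: float,
                   stop_distance: float) -> bool:
    """Return True when the last price has moved `stop_distance` against the position."""
    if side == "LONG":
        return last_price <= entry_price - stop_distance
    if side == "SHORT":
        return last_price >= entry_price + stop_distance
    return False  # FLAT positions have no stop


def reversal_orders(side: str, quantity: int) -> List[Dict]:
    """Build the orders that close the current position and open the opposite one."""
    if side == "LONG":
        action = "SELL"
        reasons = ("close_long", "open_short")
    elif side == "SHORT":
        action = "BUY"
        reasons = ("close_short", "open_long")
    else:
        return []
    # Two orders of equal size: the first flattens, the second opens the new side.
    return [{"side": action, "quantity": quantity, "reason": r} for r in reasons]
```

Keeping the close and the open as separate orders matters for brokers such as TastyTrade, where closing and opening use different order actions.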

Tracking (/core/tracking)

  • order_tracking_service.py: Order lifecycle tracking
    • Tracks order status from submission to fill
    • Monitors broker order updates
    • Provides order history and analytics

Utilities (/core/utils)

  • timezone.py: Timezone conversion utilities
    • EST/UTC conversion helpers
    • Market hours calculations

Setup

Prerequisites

  • Python 3.12+
  • Redis server
  • API credentials for:
    • TastyTrade (futures market data via DXFeed + broker integration)
    • Charles Schwab (equity market data + broker integration)

Installation

  1. Clone the repository and enter the project directory:
git clone <repository-url> fortified-trader-backend
cd fortified-trader-backend
  2. Create and activate a virtual environment:
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
  3. Install dependencies:
pip install -r requirements.txt
  4. Configure environment variables:
cp .env.example .env
# Edit .env with your API keys and configuration
  5. Start Redis:
# Linux/systemd
sudo systemctl start redis

# macOS/Homebrew
brew services start redis

# Docker
docker run -d -p 6379:6379 redis:latest

Environment Configuration

Edit .env with your credentials:

# Server
PORT=8000
ENVIRONMENT=development

# JWT Authentication
JWT_SECRET=your-secure-secret-key
JWT_EXPIRATION_MINUTES=43200  # 30 days

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0

# TastyTrade (Futures Market Data + Broker)
TASTY_CLIENT_ID=your-tastytrade-client-id
TASTY_CLIENT_SECRET=your-tastytrade-client-secret
TASTY_ACCOUNT_ID=your-tastytrade-account-id

# Schwab (Equity Market Data + Broker)
SCHWAB_CLIENT_ID=your-schwab-client-id
SCHWAB_CLIENT_SECRET=your-schwab-client-secret
SCHWAB_REDIRECT_URI=https://localhost:8000/api/brokers/schwab/callback

# Paper Trading (Optional)
PAPER_TRADES_BACKUP=false  # Set to 'true' to enable file backups of paper trades

# Logging
LOG_LEVEL=INFO  # DEBUG, INFO, WARNING, ERROR, CRITICAL

Strategy Configuration

Strategies are configured via ticker configurations stored in Redis. Each symbol/strategy combination has its own configuration:

{
  "symbol": "/NQ",
  "strategy": "ema",
  "timeframe": "144t",
  "trade_enabled": true,
  "schwab_quantity": 10,
  "tastytrade_quantity": 1,
  "period_1": 9,
  "period_2": 21,
  "trend_line_1": "EMA",
  "trend_line_2": "EMA"
}

Configuration Fields:

  • symbol: Trading symbol (e.g., "/NQ" for futures, "SPY" for equities)
  • strategy: Strategy name (ema, supertrend, etc.)
  • timeframe: Bar timeframe (1, 5, 144t, 512t, etc.)
  • trade_enabled: Enable/disable trading for this symbol
  • schwab_quantity: Number of shares/contracts to trade on Schwab
  • tastytrade_quantity: Number of contracts to trade on TastyTrade
  • period_1, period_2: Strategy-specific parameters (e.g., EMA periods)
  • trend_line_1, trend_line_2: MA type (EMA, SMA, WMA)
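
The fields above can be validated before a strategy starts. The sketch below is one way to do it with a dataclass; the class name TickerConfig, the default values, and the specific checks are assumptions and are not taken from config_loader.py.

```python
from dataclasses import dataclass

ALLOWED_MA_TYPES = {"EMA", "SMA", "WMA"}


@dataclass(frozen=True)
class TickerConfig:
    symbol: str
    strategy: str
    timeframe: str
    trade_enabled: bool = False      # default is an assumption
    schwab_quantity: int = 0
    tastytrade_quantity: int = 0
    period_1: int = 9
    period_2: int = 21
    trend_line_1: str = "EMA"
    trend_line_2: str = "EMA"

    def __post_init__(self) -> None:
        if not self.symbol or not self.strategy or not self.timeframe:
            raise ValueError("symbol, strategy and timeframe are required")
        if self.schwab_quantity < 0 or self.tastytrade_quantity < 0:
            raise ValueError("quantities must be non-negative")
        if self.period_1 < 1 or self.period_2 < 1:
            raise ValueError("periods must be positive")
        for ma_type in (self.trend_line_1, self.trend_line_2):
            if ma_type not in ALLOWED_MA_TYPES:
                raise ValueError(f"unsupported MA type: {ma_type}")


if __name__ == "__main__":
    # Build a config from the JSON example shown earlier in this section.
    raw = {"symbol": "/NQ", "strategy": "ema", "timeframe": "144t",
           "trade_enabled": True, "schwab_quantity": 10,
           "tastytrade_quantity": 1, "period_1": 9, "period_2": 21,
           "trend_line_1": "EMA", "trend_line_2": "EMA"}
    print(TickerConfig(**raw))
```

Rejecting a bad configuration at load time is cheaper than discovering it when the first order is routed.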

Paper vs Live Mode:

  • Paper mode (live_mode=false): Uses PaperBroker, simulates all trades
  • Live mode (live_mode=true): Uses real brokers (Schwab/TastyTrade), places actual orders

IMPORTANT: Always test strategies in paper mode before enabling live trading!

Running the Application

Quick Start (Development)

# Using the startup script
./start_api.sh

# Or manually with uvicorn
uvicorn main:app --reload --port 8000

Production Mode with PM2

# Start backend with PM2
pm2 start "python3 -m uvicorn main:app --host 0.0.0.0 --port 8000" --name backend

# Start frontend (if applicable)
cd ../fortified-trader-frontend
pm2 start npm --name frontend -- run dev

# View logs
pm2 logs backend
pm2 logs frontend

# Monitor processes
pm2 monit

# Save PM2 process list
pm2 save

# Setup PM2 to start on boot
pm2 startup

API Documentation

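Once running, access the interactive API docs. Assuming FastAPI's default documentation routes and the PORT=8000 setting shown earlier:

  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc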

Tick Aggregation System

Multi-Timeframe Support

The system supports running multiple timeframes for the same symbol simultaneously. For example, /NQ can have both:

  • Time-based: 1 (1-minute bars), 5 (5-minute bars)
  • Tick-based: 144t (144-tick bars), 512t (512-tick bars)

Each timeframe gets its own aggregator and Redis stream, allowing strategies to consume data at different granularities.
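
To make the per-timeframe routing concrete, here is a sketch of how a timeframe string could be classified and mapped to its stream. The helper names parse_timeframe and plan_aggregators are hypothetical, and the rule that a bare number means minutes is inferred from the examples above, not from aggregation_manager.py.

```python
import re
from typing import Dict, List, Tuple

# Seconds per unit; a bare number is treated as minutes (assumption).
_UNIT_SECONDS = {"": 60, "h": 3600, "d": 86400}


def parse_timeframe(timeframe: str) -> Tuple[str, int]:
    """Classify a timeframe as ('tick', tick_count) or ('time', seconds)."""
    match = re.fullmatch(r"(\d+)([thd]?)", timeframe)
    if not match or int(match.group(1)) <= 0:
        raise ValueError(f"unsupported timeframe: {timeframe!r}")
    size, unit = int(match.group(1)), match.group(2)
    if unit == "t":
        return ("tick", size)
    return ("time", size * _UNIT_SECONDS[unit])


def plan_aggregators(symbol: str, timeframes: List[str]) -> Dict[str, Tuple[str, int]]:
    """Map each bars stream key to the kind and size of aggregator feeding it."""
    return {
        f"bars_stream:{symbol}:{tf}": parse_timeframe(tf)
        for tf in timeframes
    }


if __name__ == "__main__":
    for key, spec in plan_aggregators("/NQ", ["1", "5", "144t", "512t"]).items():
        print(key, spec)
```

Because every timeframe has its own stream key, adding a timeframe for a symbol does not disturb consumers of the existing ones.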

Aggregator Types

TimeBarAggregator (Time-Based)

  • Timeframes: 1, 5, 15, 30, 1h, 4h, 1d
  • Method: Bucket-based time windowing
  • Bar Completion: At the end of each time period
  • Use Case: Traditional time-based trading strategies

Example: 1-minute aggregator

08:02:00 - 08:02:59 → Accumulate ticks → 08:03:00 → Publish bar
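
A bucket-based time window can be sketched as follows. The class and field names are hypothetical, and this version closes a bar when the first tick of the next bucket arrives; an implementation that publishes exactly at the period boundary, as in the example above, would also need a timer.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Bar:
    start: int      # bucket start, epoch seconds
    open: float
    high: float
    low: float
    close: float
    volume: float


class TimeBarSketch:
    """Aggregate ticks into fixed-length OHLCV bars."""

    def __init__(self, seconds: int = 60) -> None:
        self.seconds = seconds
        self.current: Optional[Bar] = None

    def on_tick(self, ts: float, price: float, size: float = 0.0) -> Optional[Bar]:
        """Add a tick; return the completed bar when a new bucket starts."""
        bucket = int(ts // self.seconds) * self.seconds
        completed = None
        if self.current is not None and bucket > self.current.start:
            completed = self.current   # previous bucket is finished
            self.current = None
        if self.current is None:
            self.current = Bar(bucket, price, price, price, price, size)
        else:
            # Same bucket (late ticks from an older bucket are merged here too).
            bar = self.current
            bar.high = max(bar.high, price)
            bar.low = min(bar.low, price)
            bar.close = price
            bar.volume += size
        return completed
```

Flooring the timestamp to a multiple of the bar length aligns bars to clock boundaries regardless of when the first tick arrives.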

TickBarAggregator (Tick-Based)

  • Timeframes: 144t, 512t, 1000t (any number + 't')
  • Method: Buffer-based tick counting
  • Bar Completion: After N ticks received
  • Use Case: Volume-based or tick-based trading strategies

Example: 144-tick aggregator

Tick 1-143 → Buffer accumulates → Tick 144 → Publish bar → Clear buffer

Real-time Progress:

  • Publishes partial bar updates to ticks_stream after each tick
  • Includes buffer fill percentage (e.g., "90/144 = 62.5% full")
  • Allows strategies to see intra-bar price action
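
The buffer-and-count behaviour, including the fill percentage, can be sketched like this. Names and the shape of the returned dictionaries are assumptions, not the format tick_bar_aggregator.py actually publishes.

```python
from typing import Dict, List, Optional, Tuple


class TickBarSketch:
    """Build one bar from every N ticks and report buffer progress."""

    def __init__(self, ticks_per_bar: int = 144) -> None:
        if ticks_per_bar < 1:
            raise ValueError("ticks_per_bar must be >= 1")
        self.ticks_per_bar = ticks_per_bar
        self.buffer: List[float] = []

    def on_tick(self, price: float) -> Tuple[Optional[Dict], Dict]:
        """Return (completed_bar_or_None, progress) after adding one tick."""
        self.buffer.append(price)
        count = len(self.buffer)
        progress = {
            "count": count,
            "target": self.ticks_per_bar,
            "fill_pct": 100.0 * count / self.ticks_per_bar,
        }
        bar = None
        if count >= self.ticks_per_bar:
            bar = {
                "open": self.buffer[0],
                "high": max(self.buffer),
                "low": min(self.buffer),
                "close": self.buffer[-1],
                "ticks": count,
            }
            self.buffer = []  # start accumulating the next bar
        return bar, progress
```

With a 144-tick target, the 90th tick reports a fill of 62.5%, matching the example in the list above.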

Redis Stream Architecture

Stream Keys:

bars_stream:/NQ:1        # Completed 1-minute bars
bars_stream:/NQ:144t     # Completed 144-tick bars
ticks_stream:/NQ:1       # Partial 1-minute bar updates
ticks_stream:/NQ:144t    # Partial 144-tick bar updates

Consumer Groups: Each strategy creates its own consumer group for each stream:

Consumer Group: "ema"
  ├─ bars_stream:/NQ:1 → ema//NQ_timestamp
  └─ bars_stream:/NQ:144t → ema//NQ_timestamp

Benefits:

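  • No duplicate processing: each consumer group receives every bar independently, and within a group each message is delivered to only one consumer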
  • Each strategy can consume at its own pace
  • Automatic message acknowledgment
  • Can reset to any point in stream
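
The consumer-group pattern above maps onto three Redis commands: XGROUP CREATE, XREADGROUP, and XACK. The sketch below assumes `client` is a redis-py `redis.Redis` instance using its default response format; the helper names are hypothetical. Redis requires an explicit XACK unless NOACK is used, so this helper acknowledges each entry after the handler returns, which is one way to make acknowledgment automatic from the strategy's point of view.

```python
from typing import Any, Callable, Dict


def bars_stream_key(symbol: str, timeframe: str) -> str:
    """Build the stream key in the format listed under Stream Keys."""
    return f"bars_stream:{symbol}:{timeframe}"


def ensure_group(client: Any, stream: str, group: str) -> None:
    """Create the consumer group, ignoring the error if it already exists."""
    try:
        # id="0" starts the group at the beginning of the stream;
        # mkstream=True creates the stream if it does not exist yet.
        client.xgroup_create(stream, group, id="0", mkstream=True)
    except Exception as exc:  # redis-py raises ResponseError("BUSYGROUP ...")
        if "BUSYGROUP" not in str(exc):
            raise


def consume_once(client: Any, stream: str, group: str, consumer: str,
                 handler: Callable[[Dict], None],
                 count: int = 10, block_ms: int = 1000) -> int:
    """Read undelivered entries for this group, handle them, then acknowledge."""
    response = client.xreadgroup(group, consumer, {stream: ">"},
                                 count=count, block=block_ms)
    handled = 0
    for _stream_name, entries in response or []:
        for entry_id, fields in entries:
            handler(fields)
            client.xack(stream, group, entry_id)  # mark as processed
            handled += 1
    return handled
```

Acknowledging only after the handler succeeds means an entry that raised stays in the group's pending list and can be retried or inspected later.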

Usage

1. Authentication

# Login to get JWT token
curl -X POST http://localhost:8000/api/login \
  -H "Content-Type: application/json" \
  -d '{"email": "admin@example.com", "password": "your-password"}'

2. Broker Authorization

# Get Schwab authorization URL
curl -X GET http://localhost:8000/api/brokers/schwab/authorize \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Exchange authorization code for tokens
curl -X POST http://localhost:8000/api/brokers/schwab/token \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"code": "AUTH_CODE"}'

3. Ticker Management

# Get tickers for a strategy
curl -X GET http://localhost:8000/api/tickers/triple_ema \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Update ticker configuration
curl -X POST http://localhost:8000/api/tickers \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "strategy": "triple_ema",
    "ticker": "SPY",
    "config": {
      "timeframe": "5Min",
      "quantity": 100,
      "enabled": true
    }
  }'

4. Trading Control

# Start trading in paper mode (default - safe for testing)
curl -X GET "http://localhost:8000/api/start-trading?strategy=ema&live_mode=false" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Start trading in LIVE mode (real money!)
curl -X GET "http://localhost:8000/api/start-trading?strategy=ema&live_mode=true" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Stop trading
curl -X GET "http://localhost:8000/api/stop-trading?strategy=ema" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Get trading status
curl -X GET "http://localhost:8000/api/trading-status?strategy=ema" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Get positions
curl -X GET "http://localhost:8000/api/positions?strategy=ema" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

5. Trade History

# Get all trades (paper + real) with pagination
curl -X GET "http://localhost:8000/api/trades?strategy=ema&page=1&page_size=50" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Filter by trade type (paper or real)
curl -X GET "http://localhost:8000/api/trades?strategy=ema&trade_type=paper" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Filter by symbol and broker
curl -X GET "http://localhost:8000/api/trades?strategy=ema&symbol=/NQ&broker=tastytrade" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Filter by date range
curl -X GET "http://localhost:8000/api/trades?strategy=ema&start_date=2025-01-01&end_date=2025-01-31" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Get paper trades with pagination
curl -X GET "http://localhost:8000/api/paper-trades?strategy=ema&page=1&page_size=100" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Get paper trade statistics
curl -X GET "http://localhost:8000/api/paper-trades/stats?strategy=ema" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

# Get real trades
curl -X GET "http://localhost:8000/api/real-trades?strategy=ema" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"
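
For scripted access, the same trade-history filters can be assembled in Python. This sketch only builds the URL and headers, using the query parameter names from the curl examples above; the function name trades_request is hypothetical, and sending the request is left to whichever HTTP client you prefer.

```python
from typing import Dict, Tuple
from urllib.parse import urlencode


def trades_request(base_url: str, token: str, strategy: str,
                   **filters: object) -> Tuple[str, Dict[str, str]]:
    """Return (url, headers) for GET /api/trades with optional filters.

    Filters set to None are omitted. Values are URL-encoded, so a
    futures symbol such as "/NQ" is sent as "%2FNQ".
    """
    params = {"strategy": strategy}
    params.update({k: v for k, v in filters.items() if v is not None})
    url = f"{base_url.rstrip('/')}/api/trades?{urlencode(params)}"
    headers = {"Authorization": f"Bearer {token}"}
    return url, headers


if __name__ == "__main__":
    url, headers = trades_request(
        "http://localhost:8000", "YOUR_JWT_TOKEN", "ema",
        symbol="/NQ", broker="tastytrade", page=1, page_size=50,
    )
    print(url)
    print(headers)
```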

6. Manual Triggers

# Manual trigger for ZeroDay strategy
curl -X POST http://localhost:8000/api/manual-trigger \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "ticker": "SPX",
    "action": "long"
  }'

# Manual trigger for EMA strategy
curl -X POST http://localhost:8000/api/ema-manual-trigger \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "ticker": "SPY",
    "action": "short"
  }'

Project Structure

fortified-trader-backend/
├── main.py                     # FastAPI application entry point
├── start_api.sh                # Startup script
├── requirements.txt            # Python dependencies
├── .env.example                # Environment variables template
├── utils.py                    # General utilities
├── clear_trade_states.py       # Trade state cleanup script
│
├── api/                        # API routes and endpoints
│   ├── auth.py                 # Authentication routes
│   ├── brokers.py              # Broker integration routes
│   ├── tickers.py              # Ticker management routes
│   ├── trading.py              # Trading control routes (start/stop, trades history)
│   ├── charts.py               # Chart data WebSocket endpoints
│   ├── websocket.py            # Strategy WebSocket endpoints
│   ├── websocket_manager.py    # WebSocket connection manager
│   └── dependencies.py         # Shared dependencies (auth, etc.)
│
├── core/                       # Core trading engine
│   │
│   ├── orchestration/          # Orchestration layer
│   │   ├── trading_manager.py       # Top-level strategy orchestrator
│   │   ├── strategy_runner.py       # Individual strategy executor
│   │   ├── aggregation_manager.py   # Data flow orchestration
│   │   ├── trade_state_manager.py   # Position state management
│   │   ├── config_loader.py         # Strategy/symbol configuration
│   │   └── backfill_service.py      # Historical data warmup
│   │
│   ├── aggregation/            # Tick-to-bar aggregation
│   │   ├── base_aggregator.py           # Abstract base class
│   │   ├── time_bar_aggregator.py       # Time-based (1min, 5min)
│   │   ├── tick_bar_aggregator.py       # Tick-based (144t, 512t)
│   │   └── bar_passthrough_aggregator.py # Passthrough for pre-aggregated
│   │
│   ├── brokers/                # Broker API integrations
│   │   ├── base_broker.py           # Abstract base class
│   │   ├── tastytrade_broker.py     # TastyTrade futures broker
│   │   ├── schwab_broker.py         # Schwab equity broker
│   │   └── paper_broker.py          # Paper trading simulator
│   │
│   ├── data_sources/           # Market data providers
│   │   ├── base_data_source.py                # Base classes (Tick, Bar)
│   │   ├── tastytrade_dxfeed_data_source.py   # DXFeed WebSocket
│   │   ├── schwab_data_source.py              # Schwab WebSocket
│   │   ├── replay_data_source.py              # Historical replay for testing
│   │   ├── databento_data_source.py           # DEPRECATED
│   │   ├── futures_contracts.py               # Futures symbol resolution
│   │   └── schwab_auth_helper.py              # OAuth2 utilities
│   │
│   ├── indicators/             # Technical indicators
│   │   ├── base_indicator.py        # Abstract base class
│   │   ├── ema.py                   # Exponential Moving Average
│   │   ├── triple_ema.py            # Triple EMA
│   │   └── zigzag_highlow.py        # ZigZag high/low
│   │
│   ├── strategies/             # Trading strategies
│   │   ├── base_strategy.py             # Abstract strategy framework
│   │   ├── ema_crossover_strategy.py    # EMA crossover strategy
│   │   └── triple_ema_zigzag_strategy.py # Triple EMA + ZigZag strategy
│   │
│   ├── execution/              # Trade execution
│   │   ├── tick_level_executor.py   # Tick-level execution
│   │   └── stop_reversal_handler.py # Stop loss and reversals
│   │
│   ├── tracking/               # Order tracking
│   │   └── order_tracking_service.py # Order lifecycle tracking
│   │
│   ├── storage/                # Persistent storage
│   │   ├── redis_streams.py         # Redis Streams/ZSET for bars
│   │   ├── redis_trade_state.py     # Position state persistence
│   │   ├── redis_paper_trades.py    # Paper trade history
│   │   ├── redis_real_trades.py     # Real trade history
│   │   └── redis_singletons.py      # Singleton storage instances
│   │
│   └── utils/                  # Core utilities
│       └── timezone.py         # Timezone conversions
│
├── scripts/                    # Utility scripts
│   ├── get_schwab_tokens.py            # OAuth token helper
│   ├── refresh_tastytrade_token.py     # TastyTrade token refresh
│   ├── tastytrade_client_credentials.py # TastyTrade auth setup
│   └── test_*.py                       # Various test scripts
│
├── testing/                    # Integration tests
│   ├── test_1_aggregator_creation.py    # Aggregator tests
│   ├── test_2_real_dxfeed_connection.py # DXFeed tests
│   ├── test_4_real_redis_persistence.py # Redis tests
│   ├── test_5_mixed_symbols_real.py     # Multi-symbol tests
│   ├── test_6_schwab_equity_real.py     # Schwab tests
│   └── test_7_historical_warmup_real.py # Backfill tests
│
├── tokens/                     # Broker tokens (gitignored)
│   ├── schwab_tokens.json
│   └── tastytrade_tokens.json
│
├── credentials/                # API credentials (gitignored)
├── paper_trades/               # Paper trade backups (gitignored)
├── trade_states/               # Trade state backups (gitignored)
└── venv/                       # Python virtual environment

Development

Adding a New Strategy

  1. Create strategy class in core/strategies/:
from core.strategies.base_strategy import BaseStrategy, StrategyConfig
from core.brokers.base_broker import OrderSide
from typing import Optional, List, Dict, Any

class MyStrategy(BaseStrategy):
    def __init__(self, config: StrategyConfig, logger):
        super().__init__(config, logger)
        # Initialize indicators
        # self.indicator = MyIndicator(period=config.parameters.get('period', 14))

    def warmup_periods(self) -> int:
        """Return number of bars needed for warmup"""
        return 50  # Adjust based on your indicators

    def on_new_bar(self, bar: Dict[str, Any], current_position: Optional[Any]) -> List[Dict]:
        """Called on each new bar - main strategy logic"""
        # Update internal data
        # Calculate indicators
        # Generate signals
        # Return list of orders (or empty list)

        orders = []

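        # Example: Entry logic
        # (should_enter_long / should_exit_long are placeholder helpers
        # that you implement on your strategy class)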
        if self.should_enter_long(bar) and not current_position:
            orders.append({
                'symbol': bar['symbol'],
                'side': OrderSide.BUY,
                'reason': 'entry_long'
            })

        # Example: Exit logic
        if self.should_exit_long(bar) and current_position:
            orders.append({
                'symbol': bar['symbol'],
                'side': OrderSide.SELL,
                'reason': 'exit_long'
            })

        return orders

    def on_tick(self, tick: Dict[str, Any], current_position: Optional[Any]) -> List[Dict]:
        """Optional: Called on each tick for intra-bar execution"""
        # Implement tick-level logic if needed
        return []
  2. Register the strategy in the _create_strategy() method of core/orchestration/strategy_runner.py
  3. Create a ticker configuration with strategy-specific parameters
  4. Test in paper mode first!

Adding a New Broker

  1. Create broker class in core/brokers/:
from core.brokers.base_broker import BaseBroker

class MyBroker(BaseBroker):
    async def authorize(self):
        """OAuth flow"""
        pass

    async def place_order(self, symbol, quantity, side):
        """Place order"""
        pass

    async def get_positions(self):
        """Fetch positions"""
        pass
  2. Add broker routes in api/brokers.py
  3. Update environment variables in .env.example

Testing Strategies

Paper Trading Workflow:

  1. Configure strategy in Redis via /api/tickers endpoint
  2. Start strategy in paper mode: live_mode=false
  3. Monitor paper trades via /api/paper-trades
  4. Review P&L and statistics via /api/paper-trades/stats
  5. Analyze trade history with filters
  6. Adjust strategy parameters as needed
  7. Only enable live mode after thorough testing

Historical Replay Testing:

# Use ReplayDataSource to test strategy with historical data
from core.data_sources.replay_data_source import ReplayDataSource

# Load historical bars from Redis
replay_source = ReplayDataSource(
    symbol="/NQ",
    timeframe="144t",
    start_time="2025-01-01T09:30:00",
    end_time="2025-01-01T16:00:00"
)

# Run strategy with replay data
# (See testing/test_7_historical_warmup_real.py for examples)
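
The idea of a configurable replay speed can be illustrated independently of ReplayDataSource. The generator below is a sketch under assumptions: the function name replay is hypothetical, and each bar is assumed to be a dict whose "timestamp" field holds epoch seconds.

```python
import time
from typing import Callable, Dict, Iterable, Iterator


def replay(bars: Iterable[Dict], speed: float = 1.0,
           sleep: Callable[[float], None] = time.sleep) -> Iterator[Dict]:
    """Yield bars in timestamp order, pausing between them.

    speed=1.0 replays in real time, speed=60.0 replays one minute of
    data per second, and speed=0 disables waiting entirely.
    """
    previous_ts = None
    for bar in sorted(bars, key=lambda b: b["timestamp"]):
        if previous_ts is not None and speed > 0:
            # Scale the original gap between bars by the replay speed.
            sleep((bar["timestamp"] - previous_ts) / speed)
        previous_ts = bar["timestamp"]
        yield bar
```

Passing the sleep function as a parameter lets a test record the delays without actually waiting.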

Integration Tests:

# Run integration tests
cd testing/
python test_1_aggregator_creation.py      # Test aggregators
python test_4_real_redis_persistence.py    # Test Redis storage
python test_7_historical_warmup_real.py    # Test backfill and replay

Data Flow

1. Market Data Ingestion

Futures (TastyTrade DXFeed):

DXFeed WebSocket → Trade/Quote Events → Tick Objects → Aggregators

Equities (Schwab):

Schwab WebSocket → 1-second Bars → Tick Objects → Aggregators

2. Tick Aggregation

AggregationManager orchestrates:

  • Creates aggregators based on timeframe type:
    • TimeBarAggregator for time-based (1min, 5min, 15min)
    • TickBarAggregator for tick-based (144t, 512t)
  • Routes incoming ticks to appropriate aggregators
  • Supports multi-timeframe: single symbol can have multiple timeframes
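
The aggregator choice above can be sketched as a small parser. This is a minimal illustration, not the AggregationManager's actual code: `classify_timeframe` is a hypothetical name, and it only understands the two spellings listed above (`144t` style and `5min` style). Other spellings the project may accept are not handled here.

```python
import re
from typing import Tuple

# Accepts "<n>t" (tick-based) or "<n>min" (time-based); an assumption based on the examples above.
_TIMEFRAME = re.compile(r"^(\d+)(t|min)$")


def classify_timeframe(timeframe: str) -> Tuple[str, int]:
    """Return ("tick", n) or ("time", n) for a timeframe string (hypothetical helper)."""
    match = _TIMEFRAME.match(timeframe)
    if match is None:
        raise ValueError(f"unsupported timeframe: {timeframe!r}")
    size = int(match.group(1))
    if size <= 0:
        raise ValueError("timeframe size must be positive")
    kind = "tick" if match.group(2) == "t" else "time"
    return kind, size
```

A manager could use the returned kind to decide whether to build a TickBarAggregator or a TimeBarAggregator for each configured timeframe.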

TimeBarAggregator:

Ticks → Time bucket (1min window) → OHLCV bar → Redis Stream
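
A minimal sketch of time bucketing, assuming ticks carry an epoch timestamp in seconds. `TimeBarSketch` and `bucket_start` are hypothetical names, not the project's TimeBarAggregator. As a simplification, a bar is only emitted when the first tick of the next window arrives; a real aggregator would likely also close bars on a timer.

```python
from typing import Dict, Optional


def bucket_start(epoch_seconds: float, bar_seconds: int) -> int:
    """Floor a timestamp to the start of its bar window."""
    return int(epoch_seconds // bar_seconds) * bar_seconds


class TimeBarSketch:
    """Builds OHLCV bars from ticks, one bar per fixed time window (illustrative only)."""

    def __init__(self, bar_seconds: int) -> None:
        self.bar_seconds = bar_seconds
        self._bar: Optional[Dict[str, float]] = None

    def on_tick(self, epoch_seconds: float, price: float, size: float) -> Optional[Dict[str, float]]:
        """Fold one tick in; return the previous bar if this tick opened a new window."""
        start = bucket_start(epoch_seconds, self.bar_seconds)
        finished = None
        if self._bar is not None and start != self._bar["start"]:
            finished, self._bar = self._bar, None
        if self._bar is None:
            self._bar = {"start": start, "open": price, "high": price,
                         "low": price, "close": price, "volume": 0.0}
        bar = self._bar
        bar["high"] = max(bar["high"], price)
        bar["low"] = min(bar["low"], price)
        bar["close"] = price
        bar["volume"] += size
        return finished
```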

TickBarAggregator:

Ticks → Buffer (144 ticks) → OHLCV bar → Redis Stream
         ↓
    Partial updates → Tick Stream (real-time progress)
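
The tick-based flow can be sketched the same way. `TickBarSketch` is a hypothetical stand-in for TickBarAggregator: it counts ticks, exposes the in-progress bar (the "partial update"), and returns a completed bar once the configured tick count is reached. Publishing to Redis is left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Bar:
    open: float
    high: float
    low: float
    close: float
    volume: float
    tick_count: int


class TickBarSketch:
    """Builds one OHLCV bar per N ticks (illustrative only)."""

    def __init__(self, ticks_per_bar: int) -> None:
        if ticks_per_bar <= 0:
            raise ValueError("ticks_per_bar must be positive")
        self.ticks_per_bar = ticks_per_bar
        self.partial: Optional[Bar] = None  # in-progress bar, None between bars

    def on_tick(self, price: float, size: float) -> Optional[Bar]:
        """Fold one tick in; return the bar if this tick completed it, else None."""
        if self.partial is None:
            self.partial = Bar(price, price, price, price, 0.0, 0)
        bar = self.partial
        bar.high = max(bar.high, price)
        bar.low = min(bar.low, price)
        bar.close = price
        bar.volume += size
        bar.tick_count += 1
        if bar.tick_count == self.ticks_per_bar:
            self.partial = None
            return bar
        return None
```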

3. Redis Streams

Published Streams:

  • bars_stream:{symbol}:{timeframe} - Completed bars
  • ticks_stream:{symbol}:{timeframe} - Partial bar updates

Consumer Groups:

  • Each strategy has its own consumer group (e.g., "ema")
  • Strategies consume from multiple streams simultaneously
  • Within a consumer group, each message is delivered to only one consumer, which avoids duplicate processing
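
A minimal sketch of consuming these streams through a consumer group. The `client` argument is assumed to be a redis-py client created with `decode_responses=True`; `stream_key`, `ensure_group`, and `consume_once` are hypothetical helper names, not functions from this repository.

```python
from typing import Any, Callable, Dict, Iterable


def stream_key(kind: str, symbol: str, timeframe: str) -> str:
    """Build a key such as bars_stream:/NQ:144t (kind is "bars" or "ticks")."""
    return f"{kind}_stream:{symbol}:{timeframe}"


def ensure_group(client: Any, stream: str, group: str) -> None:
    """Create the consumer group if missing; id="$" means only new messages are delivered."""
    try:
        client.xgroup_create(stream, group, id="$", mkstream=True)
    except Exception as exc:
        # Redis replies with a BUSYGROUP error when the group already exists.
        if "BUSYGROUP" not in str(exc):
            raise


def consume_once(client: Any, group: str, consumer: str, streams: Iterable[str],
                 handler: Callable[[str, str, Dict[str, str]], None],
                 count: int = 10, block_ms: int = 1000) -> int:
    """Read undelivered messages from several streams, handle them, then acknowledge each."""
    response = client.xreadgroup(group, consumer, {s: ">" for s in streams},
                                 count=count, block=block_ms)
    handled = 0
    for stream, entries in response or []:
        for entry_id, fields in entries:
            handler(stream, entry_id, fields)
            client.xack(stream, group, entry_id)  # ack only after the handler succeeded
            handled += 1
    return handled
```

Acknowledging after the handler returns means a crash mid-handler leaves the message pending, so it can be inspected or re-processed later.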

4. Strategy Execution

Trading Manager → Strategy Runner Flow:

  1. Trading Manager creates Strategy Runner instances (one per symbol)
  2. Strategy Runner subscribes to Redis bar and tick streams
  3. Receives new bars from stream via consumer group
  4. Feeds bar to Strategy instance
  5. Strategy calculates indicators and generates signals
  6. Strategy returns list of orders
  7. Strategy Runner routes orders to appropriate brokers (paper or live)
  8. Brokers execute orders and update Trade State Manager
  9. Trade history saved to Redis (paper or real)
  10. WebSocket manager broadcasts updates to connected clients
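
Steps 4 through 7 can be sketched as a single coroutine. This is an illustration under assumptions: the strategy method name `on_bar` and the order dictionary shape are hypothetical, while `place_order(symbol, quantity, side)` follows the broker skeleton shown in "Adding a New Broker".

```python
import asyncio
from typing import Any, Dict, List, Protocol


class Strategy(Protocol):
    def on_bar(self, bar: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return a list of orders such as {"side": "BUY", "quantity": 1} (assumed shape)."""
        ...


class Broker(Protocol):
    async def place_order(self, symbol: str, quantity: int, side: str) -> Dict[str, Any]:
        ...


async def process_bar(symbol: str, bar: Dict[str, Any], strategy: Strategy,
                      brokers: List[Broker]) -> List[Dict[str, Any]]:
    """Feed one bar to the strategy and route every resulting order to every broker."""
    orders = strategy.on_bar(bar)
    fills: List[Dict[str, Any]] = []
    for order in orders:
        for broker in brokers:
            fills.append(await broker.place_order(symbol, order["quantity"], order["side"]))
    return fills
```

Because the runner only depends on the `place_order` interface, the same loop works whether the broker list holds a paper broker or live brokers.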

Position State Management:

  • Unified position tracking via Trade State Manager
  • Position state stored in Redis per symbol/strategy/broker
  • Supports multiple brokers per strategy
  • Validates position exists before exit orders
  • Tracks: side (LONG/SHORT/FLAT), quantity, average price
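
A minimal sketch of the tracked state and the exit validation, kept in memory for clarity. The real Trade State Manager stores this in Redis; the `Position` class, the `position_key` layout, and the function names here are assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass
class Position:
    side: str = "FLAT"          # LONG, SHORT, or FLAT
    quantity: int = 0
    average_price: float = 0.0


def position_key(symbol: str, strategy: str, broker: str) -> str:
    """Hypothetical Redis key: one position per symbol/strategy/broker."""
    return f"position:{symbol}:{strategy}:{broker}"


def apply_entry(pos: Position, side: str, quantity: int, price: float) -> Position:
    """Open or add to a position, updating the volume-weighted average price."""
    if quantity <= 0:
        raise ValueError("quantity must be positive")
    if pos.side not in ("FLAT", side):
        raise ValueError(f"cannot enter {side} while {pos.side}")
    total = pos.quantity + quantity
    average = (pos.average_price * pos.quantity + price * quantity) / total
    return Position(side, total, average)


def apply_exit(pos: Position, quantity: int) -> Position:
    """Reduce or close a position; reject exits when nothing (or too little) is held."""
    if pos.side == "FLAT" or quantity <= 0 or quantity > pos.quantity:
        raise ValueError("no matching position to exit")
    remaining = pos.quantity - quantity
    return Position() if remaining == 0 else Position(pos.side, remaining, pos.average_price)
```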

Paper vs Live Execution:

  • Paper mode: PaperBroker simulates fills, logs API requests that would be sent
  • Live mode: Real brokers (Schwab/TastyTrade) execute actual orders
  • Both modes use same Trade State Manager for consistency

5. Real-Time Updates

WebSocket Manager:

  • Publishes strategy updates to connected clients
  • Broadcasts trade execution events
  • Streams chart data for multi-timeframe visualization

Security Considerations

  • JWT Tokens: Change JWT_SECRET in production
  • Broker Tokens: Stored in tokens/ directory (gitignored)
  • API Keys: Never commit .env file
  • CORS: Configure allow_origins in production
  • HTTPS: Use reverse proxy (nginx) with SSL in production

Monitoring and Logging

  • Health Check: /api/health endpoint checks Redis connectivity
  • Logs: Configured via LOG_LEVEL environment variable
  • API Docs: Interactive Swagger UI available at /api/docs

Troubleshooting

Redis Connection Issues

# Check if Redis is running
redis-cli ping

# Check Redis logs (on Debian/Ubuntu the systemd unit is typically named redis-server)
sudo journalctl -u redis -f

Broker Token Expiry

# Refresh tokens manually
curl -X POST http://localhost:8000/api/brokers/schwab/refresh \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

Market Data Issues

TastyTrade DXFeed:

  • Verify TASTY_CLIENT_ID and TASTY_CLIENT_SECRET are valid
  • Check token refresh in logs: tail -f ~/.pm2/logs/backend-error.log | grep "token refresh"
  • Verify DXFeed connection: Look for "Connected to DXLink" in logs
  • Check subscription: Look for "Using Trade/Quote subscription" in logs

Schwab WebSocket:

  • Verify Schwab tokens are valid and not expired
  • Check WebSocket connection in logs
  • Refresh Schwab tokens if needed

Aggregation Issues

Check Redis Streams:

# List all streams (--scan iterates incrementally; KEYS would block Redis on large keyspaces)
redis-cli --scan --pattern "*stream*"

# Check stream length
redis-cli XLEN bars_stream:/NQ:1

# View recent bars
redis-cli XREVRANGE bars_stream:/NQ:1 + - COUNT 5

# Check consumer groups
redis-cli XINFO GROUPS bars_stream:/NQ:1

Check Aggregator Status:

# View aggregation logs
pm2 logs backend | grep -E "(Aggregator|Buffer|BAR PUBLISHED)"

# Check for routing issues
pm2 logs backend | grep "Routing Tick"

Key Architectural Decisions

1. Paper Trading Architecture

Decision: Implement full paper trading broker that simulates order execution and tracks positions in Redis.

Rationale:

  • Safe environment for strategy testing without risk
  • Logs what API requests WOULD be sent to real broker (educational)
  • Uses same Trade State Manager as live brokers (consistency)
  • Calculates realistic P&L for performance analysis
  • Enables rapid iteration and debugging

Implementation:

  • PaperBroker simulates fills at current market price
  • Stores paper trades in Redis with full bar snapshots
  • Supports both Schwab and TastyTrade mode simulation
  • No actual broker API calls made
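
The implementation notes above can be sketched as follows. `PaperBrokerSketch` is a hypothetical class, not the project's PaperBroker: it keeps trades in a list instead of Redis, and the `on_price` hook for feeding the current market price is an assumption.

```python
import asyncio
import logging
from typing import Any, Dict, List

logger = logging.getLogger("paper_broker_sketch")


class PaperBrokerSketch:
    """Simulates fills at the last seen price and never calls a broker API."""

    def __init__(self, mode: str) -> None:
        self.mode = mode                       # which broker is simulated, e.g. "schwab"
        self.last_price: Dict[str, float] = {}
        self.trades: List[Dict[str, Any]] = []  # stands in for Redis storage

    def on_price(self, symbol: str, price: float) -> None:
        """Record the latest market price for a symbol."""
        self.last_price[symbol] = price

    async def place_order(self, symbol: str, quantity: int, side: str) -> Dict[str, Any]:
        """Log the request a live broker would receive, then record a simulated fill."""
        if symbol not in self.last_price:
            raise RuntimeError(f"no market price seen yet for {symbol}")
        would_send = {"broker": self.mode, "symbol": symbol, "quantity": quantity, "side": side}
        logger.info("PAPER: would send %s", would_send)
        fill = {**would_send, "fill_price": self.last_price[symbol], "paper": True}
        self.trades.append(fill)
        return fill
```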

2. Position State Management in Redis

Decision: Centralize all position tracking in Redis via Trade State Manager, eliminating in-memory position tracking.

Rationale:

  • Single source of truth for positions across all brokers
  • Survives application restarts
  • Enables position querying from API
  • Validates state before trades (prevents invalid exits)
  • Supports multi-broker strategies (e.g., Schwab + TastyTrade)

Benefits:

  • Reduces position drift between broker and internal state
  • Easy debugging via Redis CLI
  • Historical position tracking
  • Consistent state across paper and live modes

3. Trading Manager vs Strategy Runner Separation

Decision: Split orchestration into Trading Manager (multi-strategy) and Strategy Runner (single symbol).

Rationale:

  • Trading Manager handles lifecycle (start/stop all symbols)
  • Strategy Runner focuses on single symbol execution
  • Enables parallel execution of multiple symbols
  • Clean separation of concerns
  • Easy to scale (one Strategy Runner per symbol)

Flow:

Trading Manager
  ├─ Strategy Runner (/NQ, 144t)
  ├─ Strategy Runner (/ES, 512t)
  └─ Strategy Runner (SPY, 1)
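
A minimal sketch of this split using asyncio tasks, one task per symbol and timeframe. `TradingManagerSketch` and the `run_symbol` callable are hypothetical; the project's Trading Manager may manage its runners differently.

```python
import asyncio
from typing import Any, Callable, Coroutine, Dict, List, Tuple

RunnerFn = Callable[[str, str], Coroutine[Any, Any, None]]


class TradingManagerSketch:
    """Owns runner lifecycle; each runner is an independent asyncio task."""

    def __init__(self, run_symbol: RunnerFn) -> None:
        self._run_symbol = run_symbol
        self._tasks: Dict[Tuple[str, str], "asyncio.Task[None]"] = {}

    def start(self, symbol: str, timeframe: str) -> None:
        """Start a runner unless one already exists. Must be called from a running event loop."""
        key = (symbol, timeframe)
        if key not in self._tasks:
            self._tasks[key] = asyncio.create_task(self._run_symbol(symbol, timeframe))

    async def stop_all(self) -> None:
        """Cancel every runner and wait until all of them have finished."""
        tasks = list(self._tasks.values())
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        self._tasks.clear()

    @property
    def running(self) -> List[Tuple[str, str]]:
        return sorted(self._tasks)
```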

4. Historical Replay Design

Decision: Create ReplayDataSource that loads bars from Redis and simulates real-time flow.

Rationale:

  • Test strategies with historical data
  • Validates strategy logic before live trading
  • Reproducible testing (same data, same results)
  • Fast testing (no waiting for market hours)
  • Supports backfill service integration

Use Cases:

  • Strategy validation
  • Parameter optimization
  • Historical P&L analysis
  • Regression testing
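
The replay idea can be sketched as an async generator. This is not the ReplayDataSource implementation: `replay_bars` is a hypothetical function, bars are assumed to be dictionaries with an epoch-seconds `timestamp`, and loading from Redis is omitted.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Iterable, Optional


async def replay_bars(bars: Iterable[Dict[str, Any]], speed: float = 0.0) -> AsyncIterator[Dict[str, Any]]:
    """Yield bars in timestamp order.

    speed=0 replays as fast as possible; speed=60 waits 1 second per 60 seconds of history.
    """
    previous_ts: Optional[float] = None
    for bar in sorted(bars, key=lambda b: b["timestamp"]):
        if speed > 0 and previous_ts is not None:
            await asyncio.sleep((bar["timestamp"] - previous_ts) / speed)
        previous_ts = bar["timestamp"]
        yield bar
```

Sorting by timestamp before yielding keeps runs reproducible: the same stored bars always reach the strategy in the same order.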

5. Manual Tick Aggregation vs Pre-Aggregated Bars

Decision: Use Trade/Quote events and manually aggregate ticks instead of subscribing to pre-aggregated Candle events from DXFeed.

Rationale:

  • Full control over aggregation logic and timeframes
  • Single subscription per symbol regardless of number of timeframes
  • Simpler codebase - no need for special handling of pre-aggregated data
  • Easier to debug and verify bar calculations
  • Supports custom timeframes not available from data provider

Trade-offs:

  • Slightly higher computational cost for aggregation
  • Need to handle tick buffering and bar completion logic
  • More complex aggregator implementations

6. Redis Streams vs Pub/Sub

Decision: Use Redis Streams with consumer groups instead of traditional Pub/Sub.

Rationale:

  • Message persistence - can replay messages if needed
  • Consumer groups isolate strategies: each group tracks its own read position, and within a group a message is delivered to only one consumer
  • Each strategy can consume at its own pace
  • Can reset to any point in stream for debugging
  • Message acknowledgment (XACK) leaves unacknowledged messages pending, so they can be detected and re-processed after a failure

Trade-offs:

  • Slightly more complex than simple Pub/Sub
  • Need to manage consumer groups and stream trimming

7. Multi-Timeframe Architecture

Decision: Allow single symbol to have multiple timeframes simultaneously, each with its own aggregator and stream.

Rationale:

  • Strategies can use multiple timeframes for better signals (e.g., 1-min for entries, 144t for exits)
  • Single data source feeds all timeframes efficiently
  • Each timeframe isolated in its own stream
  • Easy to add/remove timeframes without affecting others

8. Broker-Specific Quantities

Decision: Configure separate quantities for each broker (Schwab, TastyTrade) at strategy start and never change them during execution.

Rationale:

  • Different brokers may have different position sizes (e.g., 10 shares vs 1 contract)
  • Strategy logic focuses on signals, not position sizing
  • Strategy Runner handles broker-specific quantity routing
  • Both entry and exit orders use same configured quantities

Example:

Strategy generates: BUY signal
Strategy Runner executes:
  - Schwab: BUY 10 shares
  - TastyTrade: BUY 1 contract
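
A minimal sketch of the routing in the example above. `freeze_quantities` and `expand_signal` are hypothetical helpers; the read-only mapping is one way to express "never change during execution", not necessarily how the Strategy Runner does it.

```python
from types import MappingProxyType
from typing import Dict, List, Mapping


def freeze_quantities(quantities: Dict[str, int]) -> Mapping[str, int]:
    """Validate per-broker quantities once and return a read-only view."""
    for broker, quantity in quantities.items():
        if quantity <= 0:
            raise ValueError(f"quantity for {broker} must be positive")
    return MappingProxyType(dict(quantities))


def expand_signal(side: str, quantities: Mapping[str, int]) -> List[Dict[str, object]]:
    """Turn one strategy signal into one order per configured broker."""
    return [{"broker": broker, "side": side, "quantity": quantity}
            for broker, quantity in quantities.items()]
```

Usage: `expand_signal("BUY", freeze_quantities({"schwab": 10, "tastytrade": 1}))` produces one BUY order of 10 for Schwab and one of 1 for TastyTrade.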

Performance Considerations

  • Tick Rate: System handles 10-100+ ticks/second per symbol
  • Latency: Sub-100ms from tick receipt to bar publication
  • Memory: Buffer sizes are capped to bound memory use
  • Redis: Stream trimming (maxlen=1000) prevents unbounded growth
  • Concurrency: Async/await throughout for non-blocking I/O
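
The stream trimming mentioned above can be sketched with redis-py's `xadd`, which accepts `maxlen` and `approximate` arguments. `publish_bar` is a hypothetical helper and `client` is assumed to be a redis-py client; the key layout follows the `bars_stream:{symbol}:{timeframe}` convention from the Data Flow section.

```python
from typing import Any, Dict


def publish_bar(client: Any, symbol: str, timeframe: str, bar: Dict[str, Any],
                maxlen: int = 1000) -> Any:
    """Append a completed bar to its stream and trim the stream to roughly maxlen entries."""
    key = f"bars_stream:{symbol}:{timeframe}"
    # Stream fields are flat string pairs, so every value is stringified.
    fields = {name: str(value) for name, value in bar.items()}
    # approximate=True lets Redis trim lazily, so the stream may briefly exceed maxlen.
    return client.xadd(key, fields, maxlen=maxlen, approximate=True)
```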

License

Proprietary - All rights reserved

Support

For issues and questions, contact the development team or consult the API documentation at /api/docs.
