-
Notifications
You must be signed in to change notification settings - Fork 0
INTER_SHARD_DATA_PIPELINE_ANALYSIS
Version: 1.0.0
Release: v1.3.0
Datum: 17. Dezember 2025
Kategorie: RPC, Sharding, Data Pipeline, Security
Diese Analyse untersucht die Inter-Shard Datenpipeline für ThemisDB mit Fokus auf:
- RocksDB Data Dumps - Bulk-Transfer von Datenbank-Snapshots
- LoRa Adapter Transfer - Transfer von LLM-Adaptern (LoRA - Low-Rank Adaptation)
- Komprimierung & Chunking - Effiziente Package-basierte Datenübertragung
- mTLS Security - Sichere Kommunikation zwischen Shards
WAL Shipper (include/sharding/wal_shipper.h)
- ✅ Asynchrone Replikation von WAL-Einträgen
- ✅ Compression Support (LZ4, Zstd)
- ✅ Batch Processing (100-1000 Einträge)
- ✅ mTLS für sichere Übertragung
- ✅ Adaptive Batching basierend auf Netzwerkbedingungen
Data Migrator (include/sharding/data_migrator.h)
- ✅ Token-Range basierte Migration
- ✅ Batch-Processing (1000 Records default)
- ✅ Data Integrity Verification (Hash-basiert)
- ✅ Idempotency Support
- ❌ Keine Compression
- ❌ Keine RocksDB Snapshot Support
mTLS Client (include/sharding/mtls_client.h)
- ✅ Mutual TLS für Shard-zu-Shard Kommunikation
- ✅ Certificate-based Authentication
- ✅ Connection Pooling
- ✅ Retry Logic mit Exponential Backoff
-
Keine RocksDB Snapshot Transfer:
- Data Migrator verwendet Record-by-Record Transfer
- Ineffizient für große Datenmengen (z.B. 100+ GB Shards)
- Hohe CPU-Last durch Serialisierung/Deserialisierung
-
Keine LoRa Adapter Support:
- Keine spezielle Behandlung von Binärdaten
- LLM-Adapter (LoRA) können mehrere GB groß sein
- Benötigt effizienten Blob-Transfer
-
Inkonsistente Compression:
- WAL Shipper hat Compression
- Data Migrator hat keine Compression
- Protobuf Messages haben keine Compression
-
Unzureichendes Chunking:
- Chunking nur in ReplicateDataStream
- Keine konfigurierbaren Chunk-Größen
- Keine Chunk-Checksums
ThemisDB unterstützt bereits:
- ✅ Checkpoints - Filesystem-level Snapshots
- ✅ Incremental Backups - Delta-Backups seit letztem Backup
Verfügbare APIs:
// In rocksdb_wrapper.h
bool createCheckpoint(const std::string& checkpoint_dir);
bool restoreFromCheckpoint(const std::string& checkpoint_dir);
bool createIncrementalBackup(const std::string& backup_dir, bool flush_before_backup = true);Small Shards (<10 GB):
- Record-by-Record Transfer mit Compression
- Nutzt bestehenden Data Migrator
Large Shards (>10 GB):
- RocksDB Checkpoint → Tar/Compress → Stream → Extract
- 10-20x schneller als Record-by-Record
Implementierungsvorschlag:
// Enhanced Migration Strategy
enum class MigrationStrategy {
RECORD_BY_RECORD, // Existing: For small data
ROCKSDB_SNAPSHOT, // New: For large data
HYBRID // New: Snapshot + WAL catchup
};
struct EnhancedMigrationRequest {
MigrationStrategy strategy = MigrationStrategy::HYBRID;
uint64_t token_range_start;
uint64_t token_range_end;
// Compression settings
CompressionType compression = CompressionType::Zstd;
int compression_level = 6; // Higher for bulk transfers
// Chunking settings
uint64_t chunk_size_bytes = 50 * 1024 * 1024; // 50 MB chunks
bool enable_chunk_checksums = true;
// RocksDB specific
bool include_wal = true; // Include WAL for consistency
bool incremental = false; // Incremental vs full snapshot
};Source Shard Network Target Shard
┌─────────────┐ ┌─────────────┐
│ │ │ │
│ 1. Create │ │ │
│ Checkpoint │ │ │
│ ↓ │ │ │
│ 2. Tar + │ │ │
│ Compress │ │ │
│ ↓ │ │ │
│ 3. Split │ │ │
│ into Chunks │ │ │
│ ↓ │ │ │
│ 4. Calculate├──> Chunk 1 (50MB) + Checksum ──mTLS──────>│ 5. Verify │
│ Checksums │ │ Checksum │
│ ├──> Chunk 2 (50MB) + Checksum ──mTLS──────>│ ↓ │
│ │ │ 6. Write to │
│ ├──> Chunk N (last) + Checksum ──mTLS──────>│ Temp Dir │
│ │ │ ↓ │
│ │<──── Verify Complete ──────────────────────┤ 7. Extract │
│ │ │ & Verify │
│ │ │ ↓ │
│ │ │ 8. Restore │
│ │ │ Checkpoint │
└─────────────┘ └─────────────┘
Vorteile:
- ✅ 10-20x schneller für große Shards
- ✅ Geringere CPU-Last (keine Serialisierung)
- ✅ Konsistente Snapshots
- ✅ Unterstützt Incremental Migration
Was sind LoRA Adapter?
- Fine-tuned Gewichts-Matrizen für LLM-Modelle
- Typische Größe: 100 MB - 10 GB
- Format: Safetensors, PyTorch, GGUF
- Werden im Blob Storage gespeichert
Use Cases:
- Multi-Tenant LLM mit shard-spezifischen Adaptern
- Deployment von neuen Modellen zwischen Shards
- Backup/Restore von LLM-Konfigurationen
Eigenschaften:
├── Große Binärdateien (100 MB - 10 GB)
├── Unveränderlich (immutable)
├── Selten geändert
├── Hohe Compression-Rate (2-5x mit Zstd)
└── Benötigt Chunk-basierter Transfer
Optimaler Ansatz: Blob Storage Integration
// Enhanced Blob Transfer for LoRA
message BlobTransferRequest {
string blob_id = 1; // UUID des Blobs (LoRA Adapter)
string blob_type = 2; // "lora_adapter", "llm_model", "embedding"
uint64 blob_size_bytes = 3; // Gesamtgröße
string checksum_sha256 = 4; // Blob-Checksum
// Chunking configuration
uint64 chunk_size_bytes = 5; // Chunk-Größe (50-100 MB)
CompressionType compression = 6;
int compression_level = 7;
}
message BlobChunk {
string blob_id = 1;
uint32 chunk_index = 2;
uint32 total_chunks = 3;
bytes data = 4; // Compressed chunk data
string checksum_crc32 = 5; // Chunk checksum
bool is_last = 6;
// Metadata
uint64 uncompressed_size = 7;
uint64 compressed_size = 8;
}
message BlobTransferResponse {
bool success = 1;
uint32 chunks_received = 2;
string error = 3;
uint64 total_bytes_received = 4;
}Transfer Flow:
Shard A (Source) Shard B (Target)
┌───────────────────┐ ┌───────────────────┐
│ 1. Read LoRA │ │ │
│ from Blob Store │ │ │
│ ↓ │ │ │
│ 2. Calculate │ │ │
│ SHA256 Checksum │ │ │
│ ↓ │ │ │
│ 3. Compress │ │ │
│ (Zstd Level 9) │ │ │
│ ↓ │ │ │
│ 4. Split into │ │ │
│ 50MB Chunks │ │ │
│ ↓ │ │ │
│ 5. For each chunk:│ │ │
│ - Compress │ │ │
│ - CRC32 │ │ │
│ ↓ │ │ │
├───────────────────┤──> BlobChunk #1 ──mTLS gRPC──>├───────────────────┤
│ Send Chunk #1 │ │ 6. Verify CRC32 │
│ │<── ACK ───────────────────────<│ 7. Write to Temp │
├───────────────────┤──> BlobChunk #2 ──mTLS gRPC──>├───────────────────┤
│ Send Chunk #2 │ │ 8. Accumulate │
│ │<── ACK ───────────────────────<│ │
│ ... │ ... │ ... │
├───────────────────┤──> BlobChunk #N ──mTLS gRPC──>├───────────────────┤
│ Send Chunk #N │ │ 9. Verify SHA256 │
│ (is_last=true) │ │ 10. Decompress │
│ │<── Final Response ────────────<│ 11. Write to │
│ │ │ Blob Store │
└───────────────────┘ └───────────────────┘
Zstd (Zstandard) - Empfohlen für Bulk-Daten:
- Compression Ratio: 2-5x für strukturierte Daten
- Geschwindigkeit: ~500 MB/s (compression), ~1.5 GB/s (decompression)
- Konfigurierbare Levels: 1-22
- Ideal für: RocksDB Snapshots, LoRA Adapter
LZ4 - Empfohlen für WAL/Replication:
- Compression Ratio: 2-3x
- Geschwindigkeit: ~1 GB/s (compression), ~3 GB/s (decompression)
- Sehr geringe CPU-Last
- Ideal für: Real-time WAL Streaming
Vergleich:
| Use Case | Algorithm | Level | Compression Ratio | CPU | Latenz |
|---|---|---|---|---|---|
| WAL Replication | LZ4 | 3 | 2.5x | Niedrig | <1ms |
| Data Migration | Zstd | 6 | 3-4x | Mittel | ~10ms |
| RocksDB Snapshot | Zstd | 9 | 4-5x | Hoch | ~50ms |
| LoRA Adapter | Zstd | 12 | 3-6x | Sehr Hoch | ~200ms |
Warum Chunking?
- Memory Efficiency - Keine großen Payloads im RAM
- Resume Capability - Transfer kann fortgesetzt werden
- Parallel Transfer - Mehrere Chunks gleichzeitig
- Error Isolation - Nur fehlerhafte Chunks neu senden
- Progress Tracking - Granulare Fortschrittsanzeige
Optimale Chunk-Größen:
Data Type Chunk Size Begründung
─────────────────────────────────────────────────────────────
WAL Entries 1-5 MB Niedrige Latenz, häufige Updates
Entity Records 10-20 MB Balance zwischen Overhead und Effizienz
RocksDB Snapshot 50-100 MB Großer Durchsatz, weniger HTTP/2 Frames
LoRA Adapters 50-100 MB Große Dateien, hohe Compression
Implementierung:
struct ChunkingConfig {
uint64_t chunk_size_bytes = 50 * 1024 * 1024; // 50 MB default
bool enable_checksums = true;
ChecksumType checksum_type = ChecksumType::CRC32; // CRC32 für Chunks
// Parallel transfer
uint32_t max_parallel_chunks = 4; // Max 4 Chunks gleichzeitig
bool enable_parallel_transfer = false; // Standardmäßig sequentiell
// Resume support
bool enable_resume = true;
string resume_token; // Token für Resume
};
enum class ChecksumType {
CRC32, // Schnell, 4 Bytes, gut für Chunks
SHA256, // Langsam, 32 Bytes, gut für Blobs
XXH64 // Sehr schnell, 8 Bytes, Alternative zu CRC32
};Chunk Package Structure:
┌─────────────────────────────────────────────────────────┐
│ Chunk Package │
├──────────────┬──────────────────────────────────────────┤
│ Header │ - Magic Number: 0x544D4442 ("TMDB") │
│ (64 bytes) │ - Version: 1 │
│ │ - Chunk Index: 0-N │
│ │ - Total Chunks: N │
│ │ - Compression: Zstd/LZ4/None │
│ │ - Checksum Type: CRC32/SHA256/XXH64 │
│ │ - Uncompressed Size: uint64 │
│ │ - Compressed Size: uint64 │
├──────────────┼──────────────────────────────────────────┤
│ Payload │ - Compressed Data │
│ (variable) │ - Size: compressed_size bytes │
├──────────────┼──────────────────────────────────────────┤
│ Footer │ - Checksum: 4-32 bytes │
│ (4-32 bytes)│ - Padding to 8-byte alignment │
└──────────────┴──────────────────────────────────────────┘
Shard Certificates:
- X.509 Zertifikate mit Custom Extensions
-
shard_id- Eindeutige Shard-Identifikation -
capabilities- Berechtigungen (read, write, replicate, migrate) -
token_range_start/end- Zugewiesene Token-Ranges
Certificate Verification Flow:
Client Shard (A) Server Shard (B)
┌─────────────────┐ ┌─────────────────┐
│ 1. TLS Handshake├──────── ClientHello ──────>│ │
│ │ │ 2. Present │
│ │<─── ServerHello + Cert ────┤ Server Cert │
│ │ │ │
│ 3. Verify │ │ │
│ Server Cert │ │ │
│ - Signed by CA │ │ │
│ - Not revoked │ │ │
│ - Valid dates │ │ │
│ │ │ │
│ 4. Present ├─── Client Cert + Key ────>│ │
│ Client Cert │ │ 5. Verify │
│ │ │ Client Cert │
│ │ │ - Signed by CA │
│ │ │ - Valid shard_id│
│ │ │ - Has capability│
│ │ │ │
│ │<─── TLS Established ───────┤ │
│ │ │ │
│ 6. Parse Cert │ │ 6. Parse Cert │
│ Extensions: │ │ Extensions: │
│ - shard_id="A" │ │ - shard_id="B" │
│ - caps=[migrate]│ │ - caps=[accept] │
│ │ │ │
│ 7. Encrypt Data │ │ │
│ with TLS 1.3 │ │ │
│ ├─── Encrypted Payload ─────>│ 8. Decrypt & │
│ │ │ Process │
└─────────────────┘ └─────────────────┘
End-to-End Security:
Source Shard Target Shard
┌─────────────────────┐ ┌─────────────────────┐
│ 1. Data Preparation │ │ │
│ ├─ Read from RocksDB│ │ │
│ ├─ Serialize │ │ │
│ ├─ Compress (Zstd) │ │ │
│ └─ Calculate SHA256 │ │ │
│ ↓ │ │ │
│ 2. Chunk & Package │ │ │
│ ├─ Split into Chunks│ │ │
│ ├─ CRC32 per Chunk │ │ │
│ └─ Add Headers │ │ │
│ ↓ │ │ │
│ 3. TLS Encryption │ │ │
│ ├─ mTLS Handshake ├────── Cert Exchange ──────────────>│ 4. Cert Verify │
│ └─ AES-256-GCM │ │ ├─ Check CA │
│ ↓ │ │ ├─ Check Revocation │
│ 4. Send Chunks │ │ └─ Check Capability │
│ ↓ │ │ ↓ │
├─────────────────────┤──── Chunk 1 (Encrypted) ─────────>├─────────────────────┤
│ Chunk 1: 50MB │ │ 5. Decrypt (TLS) │
│ - Header │ │ 6. Verify CRC32 │
│ - Compressed Data │ │ 7. Buffer │
│ - CRC32 Checksum │ │ ↓ │
│ ↓ │<──── ACK ─────────────────────────<│ 8. Send ACK │
├─────────────────────┤──── Chunk 2 (Encrypted) ─────────>├─────────────────────┤
│ Chunk 2: 50MB │ │ 9. Verify CRC32 │
│ ... │ ... │ ... │
├─────────────────────┤──── Chunk N (Encrypted) ─────────>├─────────────────────┤
│ Chunk N (Last) │ │ 10. Verify SHA256 │
│ │ │ 11. Decompress │
│ │<──── Final ACK + SHA256 ──────────<│ 12. Write to RocksDB│
└─────────────────────┘ └─────────────────────┘
Security Layers:
├─ Application: SHA256 (End-to-End Integrity)
├─ Transport: TLS 1.3 AES-256-GCM (Confidentiality)
├─ Authentication: mTLS Certificates (Identity)
└─ Per-Chunk: CRC32 (Transmission Integrity)
Confidentiality:
- ✅ TLS 1.3 mit AES-256-GCM Cipher
- ✅ Perfect Forward Secrecy (PFS)
- ✅ Keine Plaintext-Daten im Netzwerk
Integrity:
- ✅ Per-Chunk CRC32 (Transmission Errors)
- ✅ End-to-End SHA256 (Data Corruption)
- ✅ TLS MAC (Man-in-the-Middle Prevention)
Authentication:
- ✅ Mutual TLS (beide Seiten authentifiziert)
- ✅ Certificate-based (kein Passwort)
- ✅ Capability-based Authorization (in Cert Extensions)
Non-Repudiation:
- ✅ Audit Logs mit Shard-IDs
- ✅ Signed Certificates (CA-verified)
- ✅ Timestamp-basierte Logs
Szenario 1: Kleine Shard Migration (10 GB)
Methode Zeit Durchsatz CPU Netzwerk
─────────────────────────────────────────────────────────────────
Record-by-Record 45 min 3.7 MB/s High 15 GB
+ Compression (Zstd-6) 35 min 4.8 MB/s High 5 GB
RocksDB Snapshot 5 min 33 MB/s Low 3 GB
+ Compression (Zstd-9) 4 min 42 MB/s Medium 2.5 GB
Empfehlung: RocksDB Snapshot mit Zstd-9 → 11x schneller
Szenario 2: Große Shard Migration (100 GB)
Methode Zeit Durchsatz CPU Netzwerk
─────────────────────────────────────────────────────────────────
Record-by-Record 7.5 h 3.7 MB/s High 150 GB
RocksDB Snapshot 40 min 42 MB/s Medium 25 GB
RocksDB Snapshot (Zstd-9)33 min 51 MB/s Medium 22 GB
Empfehlung: RocksDB Snapshot mit Zstd-9 → 14x schneller, 85% weniger Netzwerk
Szenario 3: LoRA Adapter Transfer (5 GB)
Methode Zeit Durchsatz Compression
───────────────────────────────────────────────────────────
Uncompressed 2.5 min 33 MB/s 1.0x (5 GB)
LZ4 (Level 3) 1.8 min 46 MB/s 2.2x (2.3 GB)
Zstd (Level 6) 1.2 min 69 MB/s 3.5x (1.4 GB)
Zstd (Level 12) 1.5 min 55 MB/s 4.2x (1.2 GB)
Empfehlung: Zstd-6 → Bester Balance zwischen Zeit und Compression
Bandbreiten-Nutzung:
100 GB Shard Migration über 1 Gbps Netzwerk:
Ohne Compression:
├─ Daten: 100 GB
├─ Zeit: ~15 min (theoretisch)
└─ Praktisch: ~40 min (Overhead, Latenz)
Mit Zstd-9 Compression (4x):
├─ Daten: 25 GB
├─ Zeit: ~4 min (theoretisch)
├─ Praktisch: ~10 min
└─ CPU-Overhead: +30% (akzeptabel)
Inter-DC (100 Mbps):
├─ Ohne: 100 GB → ~2.5 Stunden
├─ Mit Zstd-9: 25 GB → ~35 Minuten
└─ Ersparnis: ~2 Stunden
Kosten-Einsparung (Cloud Egress @ $0.12/GB):
├─ Ohne: 100 GB × $0.12 = $12
├─ Mit Compression: 25 GB × $0.12 = $3
└─ Ersparnis: $9 pro Migration
-
Erweitere Protobuf Definitions:
- ✅ Füge
RocksDBSnapshotRequest/Responsehinzu - ✅ Füge
BlobTransferRequest/Chunk/Responsefür LoRA hinzu - ✅ Füge Compression & Chunking Metadaten hinzu
- ✅ Füge
-
Implementiere Compression in Data Migrator:
- ✅ Zstd Support hinzufügen
- ✅ Konfigurierbare Compression Levels
-
Verbessere Chunking:
- ✅ Konfigurierbare Chunk-Größen
- ✅ CRC32 Checksums pro Chunk
- ✅ Resume Support
-
RocksDB Snapshot Transfer:
- Checkpoint Creation/Restore Integration
- Tar/Compress Pipeline
- Streaming Transfer
-
LoRA Blob Transfer:
- Dedicated Blob Transfer Service
- Parallel Chunk Transfer
- Progress Tracking UI
-
Performance Optimierung:
- Adaptive Compression (wählt Algorithm basierend auf Datentyp)
- Parallel Chunk Processing
- Zero-Copy optimizations
-
Advanced Features:
- Deduplication (gleiche Chunks nur einmal senden)
- Delta-Transfer (nur Änderungen senden)
- Multicast für Broadcast-Szenarien
-
Monitoring & Observability:
- Real-time Transfer Dashboards
- Compression Ratio Metrics
- Network Utilization Graphs
Die Inter-Shard Pipeline benötigt Erweiterungen für:
- RocksDB Dumps → Snapshot-basierter Transfer für große Shards
- LoRA Adapters → Blob Transfer mit hoher Compression
- Compression/Chunking → Konsistente Strategie über alle Transfer-Typen
- mTLS Security → Bereits gut implementiert, keine Änderungen nötig
Nächste Schritte:
- Erweitere
shard_rpc.protomit neuen Message Types - Implementiere Compression im Data Migrator
- Füge RocksDB Snapshot Transfer hinzu
- Implementiere Blob Transfer für LoRA
Autor: ThemisDB Development Team
Review: Pending
Status: Analysis Complete
ThemisDB v1.3.4 | GitHub | Documentation | Discussions | License
Last synced: January 02, 2026 | Commit: 6add659
Version: 1.3.0 | Stand: Dezember 2025
- Übersicht
- Home
- Dokumentations-Index
- Quick Reference
- Sachstandsbericht 2025
- Features
- Roadmap
- Ecosystem Overview
- Strategische Übersicht
- Geo/Relational Storage
- RocksDB Storage
- MVCC Design
- Transaktionen
- Time-Series
- Memory Tuning
- Chain of Thought Storage
- Query Engine & AQL
- AQL Syntax
- Explain & Profile
- Rekursive Pfadabfragen
- Temporale Graphen
- Zeitbereichs-Abfragen
- Semantischer Cache
- Hybrid Queries (Phase 1.5)
- AQL Hybrid Queries
- Hybrid Queries README
- Hybrid Query Benchmarks
- Subquery Quick Reference
- Subquery Implementation
- Content Pipeline
- Architektur-Details
- Ingestion
- JSON Ingestion Spec
- Enterprise Ingestion Interface
- Geo-Processor Design
- Image-Processor Design
- Hybrid Search Design
- Fulltext API
- Hybrid Fusion API
- Stemming
- Performance Tuning
- Migration Guide
- Future Work
- Pagination Benchmarks
- Enterprise README
- Scalability Features
- HTTP Client Pool
- Build Guide
- Implementation Status
- Final Report
- Integration Analysis
- Enterprise Strategy
- Verschlüsselungsstrategie
- Verschlüsselungsdeployment
- Spaltenverschlüsselung
- Encryption Next Steps
- Multi-Party Encryption
- Key Rotation Strategy
- Security Encryption Gap Analysis
- Audit Logging
- Audit & Retention
- Compliance Audit
- Compliance
- Extended Compliance Features
- Governance-Strategie
- Compliance-Integration
- Governance Usage
- Security/Compliance Review
- Threat Model
- Security Hardening Guide
- Security Audit Checklist
- Security Audit Report
- Security Implementation
- Development README
- Code Quality Pipeline
- Developers Guide
- Cost Models
- Todo Liste
- Tool Todo
- Core Feature Todo
- Priorities
- Implementation Status
- Roadmap
- Future Work
- Next Steps Analysis
- AQL LET Implementation
- Development Audit
- Sprint Summary (2025-11-17)
- WAL Archiving
- Search Gap Analysis
- Source Documentation Plan
- Changefeed README
- Changefeed CMake Patch
- Changefeed OpenAPI
- Changefeed OpenAPI Auth
- Changefeed SSE Examples
- Changefeed Test Harness
- Changefeed Tests
- Dokumentations-Inventar
- Documentation Summary
- Documentation TODO
- Documentation Gap Analysis
- Documentation Consolidation
- Documentation Final Status
- Documentation Phase 3
- Documentation Cleanup Validation
- API
- Authentication
- Cache
- CDC
- Content
- Geo
- Governance
- Index
- LLM
- Query
- Security
- Server
- Storage
- Time Series
- Transaction
- Utils
Vollständige Dokumentation: https://makr-code.github.io/ThemisDB/