STREAMING_JSONL_TRAINING
As of: December 5, 2025
Version: 1.0.0
Category: API
The ThemisDB JSONL export API supports true streaming for on-demand LLM training. The LoRA/QLoRA training process can pull its data directly from the database, without a full export.
✅ Chunked transfer encoding - the server streams JSONL line by line
✅ No intermediate storage - data is read on the fly from RocksDB
✅ Backpressure support - client-side rate limiting is possible
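Backpressure here simply means the client controls how fast it reads the chunked response: with `stream=True`, data is only pulled from the socket as the client iterates. If explicit rate limiting is wanted on top, a token bucket around the line iterator is one option. A minimal, dependency-free sketch (the `TokenBucket` class and the injectable clock are illustrative, not part of ThemisDB):

```python
class TokenBucket:
    """Illustrative token-bucket rate limiter (not part of ThemisDB)."""

    def __init__(self, rate, capacity, clock):
        self.rate = rate          # tokens replenished per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity    # start with a full bucket
        self.clock = clock        # injectable time source (e.g. time.monotonic)
        self.last = clock()

    def try_consume(self):
        # Refill proportionally to elapsed time, capped at capacity
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Simulated clock for illustration: the first value is consumed by the constructor
ticks = iter([0.0, 0.0, 0.0, 0.0, 5.0])
bucket = TokenBucket(rate=1.0, capacity=2, clock=lambda: next(ticks))
results = [bucket.try_consume() for _ in range(4)]
# Two immediate reads succeed (burst), the third is throttled,
# and after 5 simulated seconds the bucket has refilled.
```

In a real client, `try_consume()` would gate each call to `response.iter_lines()`, with a short sleep when it returns `False`.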
```
┌─────────────┐      HTTP Stream      ┌──────────────┐
│ LoRA/QLoRA  │ ◄──────────────────── │   ThemisDB   │
│  Training   │   Chunked Transfer    │ HTTP Server  │
│  Process    │                       └──────┬───────┘
└──────┬──────┘                              │
       │                                     │
       ▼                                     ▼
┌─────────────┐                       ┌──────────────┐
│ Local Cache │                       │   RocksDB    │
│ (Optional)  │                       │   Storage    │
└─────────────┘                       └──────────────┘
```
```http
POST /api/export/jsonl_llm/stream
Authorization: Bearer <token>
Transfer-Encoding: chunked

{
  "theme": "Rechtssprechung",
  "from_date": "2020-01-01",
  "batch_size": 100,
  "stream_mode": "continuous"
}
```

Response `200 OK`:

```
Transfer-Encoding: chunked
Content-Type: application/x-ndjson

{"instruction": "...", "output": "...", "weight": 1.2}\n
{"instruction": "...", "output": "...", "weight": 0.9}\n
...
```
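Each response line is one self-contained JSON object, so the client can parse the stream line by line without buffering the whole body. A small sketch with inline sample bytes standing in for the HTTP response (the field values are placeholders):

```python
import json

# Inline sample bytes standing in for response.iter_lines()
raw_lines = [
    b'{"instruction": "...", "output": "...", "weight": 1.2}',
    b'',  # empty keep-alive lines are skipped
    b'{"instruction": "...", "output": "...", "weight": 0.9}',
]

samples = [json.loads(line) for line in raw_lines if line]
total_weight = sum(s["weight"] for s in samples)
```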
```python
import json
import requests
from torch.utils.data import IterableDataset, DataLoader

class ThemisDBStreamDataset(IterableDataset):
    """
    On-demand JSONL streaming dataset for LoRA/QLoRA training.

    Features:
    - Direct DB connection (no local export)
    - Optional: local cache for repeated epochs
    - Backpressure: training speed determines the query rate
    """
    def __init__(self, base_url, token, query_params, cache_dir=None):
        self.base_url = base_url
        self.token = token
        self.query_params = query_params
        self.cache_dir = cache_dir
        self.cache = []
        self.cache_enabled = cache_dir is not None

    def __iter__(self):
        # If the cache is populated, serve from it
        if self.cache_enabled and len(self.cache) > 0:
            yield from self.cache
            return
        # Otherwise: stream from ThemisDB
        response = requests.post(
            f'{self.base_url}/api/export/jsonl_llm/stream',
            headers={'Authorization': f'Bearer {self.token}'},
            json=self.query_params,
            stream=True  # Important: enable streaming
        )
        for line in response.iter_lines():
            if line:
                item = json.loads(line)
                # Optional: cache for later epochs
                if self.cache_enabled:
                    self.cache.append(item)
                yield item

# Usage in training
dataset = ThemisDBStreamDataset(
    base_url='https://themisdb',
    token=ADMIN_TOKEN,
    query_params={
        'theme': 'Rechtssprechung',
        'from_date': '2020-01-01',
        'batch_size': 100,
        'weighting': {'auto_weight_by_freshness': True}
    },
    cache_dir='./cache/epoch1'  # Optional: cache for epoch 2+
)
dataloader = DataLoader(dataset, batch_size=32)

# Training loop
for epoch in range(num_epochs):
    for batch in dataloader:
        # Training step: data is streamed on demand from ThemisDB,
        # no full export required
        loss = train_step(batch)
```
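One caveat with `IterableDataset`: if the `DataLoader` runs with `num_workers > 0`, every worker process executes `__iter__` and would stream the full dataset, duplicating each sample once per worker. The usual fix is modulo sharding per worker (in PyTorch the worker id comes from `torch.utils.data.get_worker_info()`). The sharding rule itself, shown dependency-free:

```python
def shard(stream, worker_id, num_workers):
    """Keep every num_workers-th item, offset by worker_id, so the
    union over all workers is the full stream with no duplicates."""
    for i, item in enumerate(stream):
        if i % num_workers == worker_id:
            yield item

# The three shards of 0..9 partition the stream without overlap
parts = [list(shard(range(10), w, 3)) for w in range(3)]
```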
```python
from datasets import IterableDataset
import requests
import json

def themisdb_generator(base_url, token, query_params):
    """Generator for a HuggingFace IterableDataset"""
    response = requests.post(
        f'{base_url}/api/export/jsonl_llm/stream',
        headers={'Authorization': f'Bearer {token}'},
        json=query_params,
        stream=True
    )
    for line in response.iter_lines():
        if line:
            yield json.loads(line)

# Create the HuggingFace dataset
dataset = IterableDataset.from_generator(
    themisdb_generator,
    gen_kwargs={
        'base_url': 'https://themisdb',
        'token': ADMIN_TOKEN,
        'query_params': {
            'theme': 'Immissionsschutz',
            'format': 'instruction_tuning'
        }
    }
)

# PEFT LoRA training
from transformers import Trainer, TrainingArguments
from peft import LoraConfig, get_peft_model

trainer = Trainer(
    model=peft_model,
    args=training_args,
    train_dataset=dataset  # streaming dataset!
)
trainer.train()
```
```python
# For very large datasets that do not fit in RAM:
# every epoch streams from the DB again
dataset = ThemisDBStreamDataset(..., cache_dir=None)
```

```python
# First epoch: stream from the DB + cache in RAM
# Subsequent epochs: served from the RAM cache
dataset = ThemisDBStreamDataset(..., cache_dir=None)
dataset.cache_enabled = True  # enables the RAM cache
```

```python
# First epoch: stream from the DB + cache on disk
# Later training runs: served from the disk cache
dataset = ThemisDBStreamDataset(..., cache_dir='./cache/rechtssprechung')
```

```python
from functools import lru_cache

class CachedThemisDBDataset(ThemisDBStreamDataset):
    @lru_cache(maxsize=10000)
    def _fetch_item(self, index):
        # Automatic LRU cache for frequently used items
        pass
```
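Note that the `ThemisDBStreamDataset` sketch above keeps its cache in RAM even when `cache_dir` is set. A disk-backed variant would write each streamed line to a JSONL file during the first epoch and replay it afterwards. A standard-library-only sketch of that write/read cycle (file layout and helper names are illustrative, not ThemisDB API):

```python
import json
import os
import tempfile

def write_cache(items, path):
    # Persist streamed items as JSONL, one object per line
    with open(path, 'w', encoding='utf-8') as f:
        for item in items:
            f.write(json.dumps(item) + '\n')

def read_cache(path):
    # Replay a cached epoch from disk
    with open(path, encoding='utf-8') as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

cache_path = os.path.join(tempfile.mkdtemp(), 'epoch1.jsonl')
write_cache([{'instruction': 'q', 'output': 'a', 'weight': 1.0}], cache_path)
cached = list(read_cache(cache_path))
```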
```cpp
// export_api_handler.cpp - already implemented
void handleStreamExport(const HttpRequest& req, HttpResponse& res) {
    // Chunked transfer encoding
    res.setHeader("Transfer-Encoding", "chunked");
    res.setHeader("Content-Type", "application/x-ndjson");

    // Stream the query results
    size_t batch_size = config.batch_size;
    std::vector<BaseEntity> batch;
    while (query.hasMore()) {
        batch = query.fetchBatch(batch_size);  // batched DB access
        for (const auto& entity : batch) {
            std::string jsonl_line = exporter.formatEntity(entity);
            res.writeChunk(jsonl_line + "\n");  // sent immediately
        }
    }
    res.endChunks();
}
```
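For reference, each `writeChunk` call corresponds to one HTTP/1.1 chunk frame on the wire: a hexadecimal length, CRLF, the payload, CRLF, with a zero-length chunk terminating the stream (RFC 9112). A sketch of the framing (`encode_chunk` is illustrative, not ThemisDB code):

```python
def encode_chunk(payload: bytes) -> bytes:
    # HTTP/1.1 chunk frame: hex size, CRLF, payload, CRLF
    return f"{len(payload):X}\r\n".encode() + payload + b"\r\n"

line = b'{"instruction": "...", "output": "...", "weight": 1.2}\n'
frame = encode_chunk(line)
terminator = b"0\r\n\r\n"  # what endChunks() puts on the wire
```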
```python
class OptimizedStreamDataset(IterableDataset):
    def __init__(self, ..., prefetch_size=1000):
        self.prefetch_size = prefetch_size
        self.buffer = []

    def __iter__(self):
        response = requests.post(..., stream=True)
        # Prefetching for better performance
        for line in response.iter_lines(chunk_size=self.prefetch_size):
            if line:
                self.buffer.append(json.loads(line))
                if len(self.buffer) >= self.prefetch_size:
                    # Yield the buffered batch
                    yield from self.buffer
                    self.buffer = []
        # Flush whatever remains when the stream ends
        yield from self.buffer
        self.buffer = []
```
- ❌ No full export (avoids 100 GB of JSONL)
- ✅ Only the current batch in RAM (~10-100 MB)
- ✅ Training starts immediately (no waiting for an export)
- ✅ Always the latest data from the DB
- ✅ Incremental updates during training
- ✅ Dynamic filtering (e.g. only data after date X)
- ✅ Different queries per epoch
- ✅ A/B testing with different weightings
- ✅ Adaptive sampling based on training metrics
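The per-sample `weight` field is what makes A/B weighting and freshness-based weighting effective on the training side: the client folds it into the loss. A dependency-free sketch of a weighted mean loss (`weighted_loss` is illustrative; in practice this would operate on torch tensors inside `train_step`):

```python
def weighted_loss(losses, weights):
    # Weighted mean: fresher samples (higher weight) count more
    assert len(losses) == len(weights) and sum(weights) > 0
    return sum(w * l for w, l in zip(weights, losses)) / sum(weights)

# Two samples with the weights from the response example above
avg = weighted_loss([0.5, 1.0], [1.2, 0.9])
```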
```python
# Training with multiple themes in rotation
themes = ['Rechtssprechung', 'Immissionsschutz', 'Datenschutz']

for epoch in range(num_epochs):
    for theme in themes:
        dataset = ThemisDBStreamDataset(
            query_params={
                'theme': theme,
                'from_date': get_recent_date(days=365),
                'weighting': {'auto_weight_by_freshness': True}
            },
            cache_dir=f'./cache/{theme}_epoch{epoch}'
        )
        # Train on the theme-specific dataset
        trainer.train(dataset, max_steps=1000)
```
✅ Server-side streaming - already implemented (commit 6b4129b)
✅ Chunked transfer encoding - working
✅ Backpressure support - the client determines the rate
📝 Python client library - example code above
📝 Cache strategies - implementable by the client
- Python package: themisdb-datasets (PyPI)
  - ThemisDBStreamDataset
  - HuggingFace integration
  - Auto-caching
- WebSocket support (optional)
  - Bidirectional communication
  - Training feedback to the DB (which samples were effective)
- Adaptive sampling (optional)
  - The server tracks which samples are hard to learn
  - Dynamic weighting during training
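Until server-side tracking exists, the adaptive-sampling idea can be approximated client-side: keep a running loss per sample and draw harder samples more often. A minimal sketch (the function names, the loss floor, and the scheme itself are hypothetical, not a ThemisDB feature):

```python
import random

def adaptive_weights(recent_losses, floor=0.1):
    # Harder samples (higher recent loss) get a larger sampling weight;
    # the floor keeps easy samples from disappearing entirely
    return [max(loss, floor) for loss in recent_losses]

def sample_indices(weights, k, seed=42):
    # Weighted sampling with replacement, seeded for reproducibility
    rng = random.Random(seed)
    return rng.choices(range(len(weights)), weights=weights, k=k)

weights = adaptive_weights([2.0, 0.01, 0.5])
picked = sample_indices(weights, k=5)
```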
ThemisDB v1.3.4 | GitHub | Documentation | Discussions | License
Last synced: January 02, 2026 | Commit: 6add659
Version: 1.3.0 | As of: December 2025
Full documentation: https://makr-code.github.io/ThemisDB/