The Generic Type-to-Qdrant Conversion System replaces manual conversion logic with a unified, type-safe approach that leverages TypeScript's advanced type system for automatic field detection and mapping. This system eliminates code duplication, improves maintainability, and provides consistent data transformation across all Qdrant payload types.
- Manual field extraction: Each payload type has hardcoded field mapping
- Repetitive compression logic: Same compression patterns repeated across methods
- Type-specific metadata injection: Manual metadata assignment per type
- Scattered validation: No centralized validation or transformation rules
- Comprehensive type coverage: 9 well-structured type files covering all domain objects
- Consistent patterns: Base interfaces with extensions (e.g.,
QdrantPayloadBase) - Type guards: Existing runtime type checking in
dataproc-responses.ts - Compression integration: Well-established compression service with field-level support
graph TB
subgraph "Generic Conversion System"
A[Source Type] --> B[Type Analyzer]
B --> C[Field Mapper]
C --> D[Transformation Engine]
D --> E[Compression Handler]
E --> F[Metadata Injector]
F --> G[Qdrant Payload]
end
subgraph "TypeScript Utilities"
H[Mapped Types]
I[Conditional Types]
J[Utility Types]
K[Type Guards]
end
subgraph "Configuration Layer"
L[Field Rules]
M[Compression Rules]
N[Metadata Rules]
O[Override Config]
end
B --> H
C --> I
D --> J
E --> M
F --> N
L --> C
O --> D
// File: src/types/generic-converter.ts
export interface GenericConverter<TSource, TTarget extends QdrantPayloadBase> {
convert(source: TSource, metadata: QdrantStorageMetadata): Promise<TTarget>;
validate(source: TSource): boolean;
getCompressionFields(): (keyof TSource)[];
}
export interface ConversionConfig<TSource> {
fieldMappings?: Partial<Record<keyof TSource, string>>;
compressionRules?: CompressionFieldConfig<TSource>;
transformations?: FieldTransformations<TSource>;
metadata?: MetadataInjectionRules;
}// File: src/services/generic-converter.ts
export class GenericQdrantConverter {
private compressionService: CompressionService;
private fieldAnalyzer: FieldAnalyzer;
private transformationEngine: TransformationEngine;
async convert<TSource, TTarget extends QdrantPayloadBase>(
source: TSource,
metadata: QdrantStorageMetadata,
config?: ConversionConfig<TSource>
): Promise<TTarget> {
// 1. Analyze source type structure
const fieldMap = await this.fieldAnalyzer.analyzeFields(source);
// 2. Apply automatic field mapping with overrides
const mappedFields = this.applyFieldMappings(fieldMap, config?.fieldMappings);
// 3. Handle compression for large fields
const compressedFields = await this.handleCompression(mappedFields, config?.compressionRules);
// 4. Apply transformations
const transformedFields = this.applyTransformations(compressedFields, config?.transformations);
// 5. Inject metadata
const payload = this.injectMetadata(transformedFields, metadata, config?.metadata);
return payload as TTarget;
}
}export interface CompressionFieldConfig<T> {
fields: (keyof T)[];
sizeThreshold?: number;
compressionType?: 'gzip' | 'deflate';
securityLevel?: 'none' | 'basic' | 'encrypted'; // Future security consideration
}
export class CompressionHandler {
async compressFields<T>(
data: T,
config: CompressionFieldConfig<T>
): Promise<Partial<Record<keyof T, CompressibleField<any>>>> {
const result: any = {};
for (const field of config.fields) {
const fieldValue = data[field];
const compressed = await this.compressionService.compressIfNeeded(
fieldValue,
config.sizeThreshold
);
result[field] = {
data: compressed.data,
isCompressed: compressed.isCompressed,
compressionType: compressed.compressionType,
originalSize: compressed.originalSize,
compressedSize: compressed.compressedSize,
};
}
return result;
}
}const converter = new GenericQdrantConverter(compressionService);
// Automatic field detection and mapping
const queryPayload = await converter.convert<QueryResultData, QdrantQueryResultPayload>(
queryData,
metadata
);
const clusterPayload = await converter.convert<ExtendedClusterData, QdrantClusterPayload>(
clusterData,
metadata
);const config: ConversionConfig<QueryResultData> = {
fieldMappings: {
results: 'rows', // Custom field mapping
executionTime: 'duration'
},
compressionRules: {
fields: ['schema', 'rows'],
sizeThreshold: 5120, // 5KB threshold
compressionType: 'gzip'
},
transformations: {
totalRows: (value) => value || 0, // Default value transformation
timestamp: () => new Date().toISOString() // Auto-generated timestamp
}
};
const payload = await converter.convert(queryData, metadata, config);// Compile-time type safety for field mappings
type QueryFields = ExtractQdrantFields<QueryResultData>;
type ClusterFields = ExtractQdrantFields<ExtendedClusterData>;
// Automatic payload type inference
type QueryPayload = InferQdrantPayload<QueryResultData>; // QdrantQueryResultPayload
type ClusterPayload = InferQdrantPayload<ExtendedClusterData>; // QdrantClusterPayload// Old approach - manual, repetitive, error-prone
private async createQueryResultPayload(data: any, basePayload: any): Promise<QdrantQueryResultPayload> {
const queryData = data as any;
// Manual field extraction
const schema = queryData.schema;
const rows = queryData.rows;
const summary = queryData.summary;
// Manual compression
const schemaCompression = await this.compressionService.compressIfNeeded(schema || {});
const rowsCompression = await this.compressionService.compressIfNeeded(rows || []);
// Manual payload construction
const payload: QdrantQueryResultPayload = {
...basePayload,
jobId: queryData.jobId || basePayload.jobId || 'unknown',
contentType: queryData.contentType || 'structured_data',
// ... 20+ lines of manual mapping
};
return payload;
}// New approach - automatic, type-safe, configurable
const result = await this.genericConverter.convert(data, metadata);
// That's it! Automatic field detection, compression, and mapping// Add new imports
import { GenericQdrantConverter, createGenericConverter } from './generic-converter.js';
import { ConversionConfig } from '../types/generic-converter.js';export class QdrantStorageService {
private genericConverter: GenericQdrantConverter;
constructor(config: QdrantConfig) {
// ... existing initialization
this.genericConverter = createGenericConverter(this.compressionService);
}
}Before: Manual Query Result Conversion
private async createQueryResultPayload(data: any, basePayload: any): Promise<QdrantQueryResultPayload> {
const queryData = data as any;
const schema = queryData.schema;
const rows = queryData.rows;
const summary = queryData.summary;
const searchableContent = queryData.searchableContent;
const schemaCompression = await this.compressionService.compressIfNeeded(schema || {});
const rowsCompression = await this.compressionService.compressIfNeeded(rows || []);
const payload: QdrantQueryResultPayload = {
...basePayload,
jobId: queryData.jobId || basePayload.jobId || 'unknown',
contentType: queryData.contentType || 'structured_data',
totalRows: queryData.totalRows || (Array.isArray(rows) ? rows.length : 0),
schemaFields: queryData.schemaFields || (schema?.fields?.length || 0),
dataSize: queryData.dataSize || JSON.stringify(data).length,
schema: schemaCompression.data,
rows: rowsCompression.data,
summary,
searchableContent,
isCompressed: schemaCompression.isCompressed || rowsCompression.isCompressed,
compressionType: schemaCompression.compressionType || rowsCompression.compressionType,
originalSize: (schemaCompression.originalSize || 0) + (rowsCompression.originalSize || 0),
compressedSize: (schemaCompression.compressedSize || 0) + (rowsCompression.compressedSize || 0),
};
return payload;
}After: Generic Conversion
private async createQueryResultPayload(data: any, basePayload: any): Promise<QdrantQueryResultPayload> {
// Optional: Create custom configuration for specific needs
const config: ConversionConfig<any> = {
fieldMappings: {
results: 'rows',
totalRows: 'totalRows'
},
compressionRules: {
fields: ['schema', 'rows'],
sizeThreshold: 10240,
compressionType: 'gzip'
},
transformations: {
totalRows: (value) => value || 0,
contentType: (value) => value || 'structured_data'
}
};
const result = await this.genericConverter.convert(data, basePayload, config);
return result.payload as QdrantQueryResultPayload;
}Before: Type-Specific Branching
private async createStructuredPayload(data: unknown, metadata: QdrantStorageMetadata): Promise<QdrantPayload> {
const basePayload = {
...metadata,
storedAt: new Date().toISOString(),
};
if (metadata.responseType === 'query_results' && metadata.type === 'query_result') {
return await this.createQueryResultPayload(data, basePayload);
} else if (metadata.responseType === 'cluster_data' || metadata.type === 'cluster') {
return await this.createClusterPayload(data, basePayload);
} else if (metadata.type === 'job' || metadata.responseType === 'job_submission') {
return await this.createJobPayload(data, basePayload);
} else {
return await this.createLegacyPayload(data, basePayload);
}
}After: Generic with Fallback
private async createStructuredPayload(data: unknown, metadata: QdrantStorageMetadata): Promise<QdrantPayload> {
// Try generic converter first
if (data && typeof data === 'object' && data !== null) {
try {
const result = await this.genericConverter.convert(
data as Record<string, any>,
metadata
);
logger.debug('Used generic converter for payload creation', {
type: metadata.type,
responseType: metadata.responseType,
fieldsProcessed: result.metadata.fieldsProcessed,
compressionRatio: result.metadata.compressionRatio,
});
return result.payload as QdrantPayload;
} catch (error) {
logger.warn('Generic converter failed, falling back to legacy methods', {
error: error instanceof Error ? error.message : String(error),
type: metadata.type,
responseType: metadata.responseType,
});
}
}
// Fallback to legacy methods (keep existing code for backward compatibility)
// ... existing legacy conversion logic
}// Let the system automatically detect and configure conversion
const result = await genericConverter.convert(data, metadata);const config: ConversionConfig<MyDataType> = {
fieldMappings: {
old_field_name: 'newFieldName',
snake_case_field: 'camelCaseField'
}
};
const result = await genericConverter.convert(data, metadata, config);const config: ConversionConfig<MyDataType> = {
compressionRules: {
fields: ['largeDataField', 'anotherLargeField'],
sizeThreshold: 5120, // 5KB
compressionType: 'gzip'
}
};const config: ConversionConfig<MyDataType> = {
transformations: {
timestamp: (value) => new Date(value).toISOString(),
status: (value) => value?.toUpperCase() || 'UNKNOWN',
metrics: (value) => ({
...value,
processed: true,
processedAt: new Date().toISOString()
})
}
};const config: ConversionConfig<MyDataType> = {
metadata: {
autoTimestamp: true,
autoUUID: true,
customFields: {
version: () => '2.0.0',
environment: () => process.env.NODE_ENV || 'development'
}
}
};Before
// Manual extraction and mapping
const schema = queryData.schema;
const rows = queryData.rows;
const totalRows = queryData.totalRows || (Array.isArray(rows) ? rows.length : 0);
const schemaFields = queryData.schemaFields || (schema?.fields?.length || 0);After
// Automatic with optional customization
const config = await genericConverter.createConfigForType(queryData, 'query');
const result = await genericConverter.convert(queryData, metadata, config);Before
// Manual field extraction
const clusterConfig = clusterData.config || clusterData.clusterConfig;
const machineTypes = clusterData.machineTypes || clusterData.config?.masterConfig;
const networkConfig = clusterData.networkConfig || clusterData.config?.networkConfig;After
// Automatic detection and mapping
const config = await genericConverter.createConfigForType(clusterData, 'cluster');
const result = await genericConverter.convert(clusterData, metadata, config);Before
// Manual job field handling
const jobId = jobData.jobId || basePayload.jobId || 'unknown';
const jobType = jobData.jobType || 'unknown';
const status = jobData.status || 'unknown';
const submissionTime = jobData.submissionTime || new Date().toISOString();After
// Automatic with smart defaults
const config = await genericConverter.createConfigForType(jobData, 'job');
const result = await genericConverter.convert(jobData, metadata, config);const validation = await genericConverter.validateSource(data);
if (!validation.isValid) {
logger.error('Data validation failed', {
errors: validation.errors,
suggestions: validation.suggestions
});
// Handle validation errors
}
if (validation.warnings.length > 0) {
logger.warn('Data validation warnings', {
warnings: validation.warnings
});
}try {
const result = await genericConverter.convert(data, metadata, config);
return result.payload;
} catch (error) {
logger.error('Generic conversion failed', {
error: error instanceof Error ? error.message : String(error),
dataType: typeof data,
fallbackToLegacy: true
});
// Fallback to legacy conversion
return await this.legacyConversionMethod(data, metadata);
}// After multiple conversions
const metrics = genericConverter.getMetrics();
logger.info('Conversion performance metrics', {
totalConversions: metrics.totalConversions,
averageProcessingTime: metrics.averageProcessingTime,
averageCompressionRatio: metrics.averageCompressionRatio,
fieldCompressionStats: metrics.fieldCompressionStats
});const result = await genericConverter.convert(data, metadata, config);
logger.info('Conversion completed', {
fieldsProcessed: result.metadata.fieldsProcessed,
fieldsCompressed: result.metadata.fieldsCompressed,
compressionRatio: result.metadata.compressionRatio,
processingTime: result.metadata.processingTime,
totalOriginalSize: result.metadata.totalOriginalSize,
totalCompressedSize: result.metadata.totalCompressedSize
});- Create generic converter types in
src/types/generic-converter.ts - Implement field analyzer for automatic type introspection
- Build transformation engine with TypeScript utility types
- Extend compression service for field-level rules
- Implement
GenericQdrantConverterclass - Create field mapping utilities using mapped types
- Build metadata injection system with automatic timestamp/UUID generation
- Add validation layer with type guards
- Update
qdrant-storage.tsto use generic converter - Create conversion configs for existing types (Query, Cluster, Job)
- Migrate existing conversion methods to use generic system
- Add backward compatibility layer for existing payloads
- Add new type support through configuration
- Implement security/encryption hooks for sensitive fields
- Performance optimization for large data structures
- Enhanced type inference for complex nested objects
- Reduced boilerplate: Eliminate manual conversion methods
- Type safety: Compile-time validation of field mappings
- IntelliSense support: Full IDE support for field mapping and transformations
- Extensibility: Easy addition of new types through configuration
- Centralized logic: Single conversion system for all types
- Consistent patterns: Standardized approach across all data types
- Future-proof: Easy to extend for new requirements (security, encryption)
- Backward compatibility: Existing payloads continue to work
- Optimized compression: Smart field-level compression based on data characteristics
- Lazy evaluation: Only compress fields that exceed thresholds
- Memory efficiency: Process large objects without loading everything into memory
- Caching potential: Reusable conversion configurations
Successfully integrated the generic converter system into the KnowledgeIndexer service, replacing manual ClusterData transformation and Qdrant payload creation with automated, type-safe conversions.
-
Imports and Dependencies
- Added generic converter imports:
GenericQdrantConverter,createGenericConverter,quickConvert - Added compression service:
CompressionService - Added type imports:
ConversionConfig,ConversionResult,QdrantStorageMetadata
- Added generic converter imports:
-
Class Properties
- Added
genericConverter: GenericQdrantConverter- Main conversion engine - Added
compressionService: CompressionService- Handles field-level compression
- Added
-
Constructor Updates
- Initialize compression service:
new CompressionService() - Initialize generic converter:
createGenericConverter(this.compressionService) - Updated
initializeWithConnectionManager()to recreate converter instances
- Initialize compression service:
indexClusterConfiguration() - Enhanced with Generic Converter
- Before: Manual field extraction and basic validation
- After:
- Generic converter validation with
validateClusterData() - Automatic field mapping with
extractClusterIdentifiers() - Performance tracking with start/end timing
- Enhanced error handling with fallback mechanisms
- Generic converter validation with
updateClusterKnowledge() - Automated Field Extraction
- Before: Manual extraction of machine types, worker counts, components, etc.
- After:
extractClusterDataWithConverter()- Automatic field extraction using generic convertermergeExtractedClusterData()- Type-safe data mergingupdateClusterKnowledgeManual()- Fallback to original logic- Comprehensive error handling and logging
storeClusterKnowledge() - Intelligent Payload Creation
- Before: Manual payload creation with static metadata
- After:
convertClusterKnowledgeToPayload()- Generic converter with custom configuration- Automatic compression for large fields (configurations, pipPackages, etc.)
- Dynamic metadata with actual compression metrics
storeClusterKnowledgeManual()- Fallback to original logic
storeJobKnowledge() - Enhanced Job Data Processing
- Before: Simple data wrapping with type annotation
- After:
convertJobKnowledgeToPayload()- Generic converter for job data- Compression for query, results, outputSample, errorInfo fields
- Enhanced metadata with processing metrics
storeJobKnowledgeManual()- Fallback to original logic
Performance and Metrics
getConversionMetrics()- Access generic converter performance metricsresetConversionMetrics()- Reset conversion statisticsgetPerformanceStats()- Comprehensive performance and usage statisticstestGenericConverterIntegration()- Integration testing with sample data
Data Processing Helpers
validateClusterData()- Enhanced validation using generic converterextractClusterIdentifiers()- Automatic field mapping for cluster identificationinitializeClusterKnowledge()- Clean knowledge structure initializationextractClusterDataWithConverter()- Automated cluster data extractionextractConfigurationData()- Configuration-specific field extractionextractLabelData()- Label and metadata extractionextractPipPackages()- Pip package parsing and extractionmergeExtractedClusterData()- Type-safe data merging
-
Performance Improvements
- Automatic Compression: Large fields (configurations, packages, scripts) are automatically compressed
- Processing Time Tracking: All conversions are timed and logged
- Compression Metrics: Real-time compression ratio tracking and reporting
-
Enhanced Error Handling
- Graceful Degradation: Falls back to manual methods if generic converter fails
- Comprehensive Logging: Detailed error messages with context
- Validation Integration: Uses generic converter validation with fallbacks
-
Type Safety
- Automatic Field Mapping: Type-safe field transformations
- Configuration-Driven: Conversion rules defined in strongly-typed configurations
- Runtime Validation: Source data validation before processing
-
Maintainability
- Backward Compatibility: All existing functionality preserved
- Modular Design: Clear separation between generic converter and fallback logic
- Comprehensive Testing: Built-in integration testing capabilities
src/types/generic-converter.ts- Core conversion interfaces and utility typessrc/services/generic-converter.ts- Main conversion engine implementationsrc/services/field-analyzer.ts- Automatic field detection and analysissrc/services/transformation-engine.ts- Field transformation and mapping logicsrc/utils/type-inference.ts- Advanced TypeScript utility typestests/unit/generic-converter.test.ts- Comprehensive test suite
src/services/qdrant-storage.ts- Integrate generic convertersrc/types/qdrant-payload.ts- Add generic payload supportsrc/services/compression.ts- Extend for field-level compression rulessrc/types/response-filter.ts- Add conversion metadata types
- Code reduction: 50%+ reduction in conversion-related code
- Type coverage: 100% compile-time type safety for all conversions
- Performance: No degradation in conversion speed
- Memory usage: Optimized memory footprint for large objects
- Development time: 75% reduction in time to add new type support
- Bug reduction: Elimination of manual field mapping errors
- Maintainability: Single point of change for conversion logic
- Documentation: Self-documenting through TypeScript types
This architectural design provides a robust, type-safe, and extensible foundation for converting any type in your system to Qdrant-compatible payloads while maintaining backward compatibility and optimizing for developer experience.