LLM Data Processor is a high-performance toolkit for large language model (LLM) data cleaning and preprocessing, built around a minimal-configuration philosophy: a single YAML file is all it takes to define a complex data processing workflow. The tool uses an asynchronous architecture and intelligent load balancing to keep throughput high when processing large-scale datasets.
- **Extreme Performance**: Asynchronous multi-threaded architecture, 10-20x faster than traditional processing methods
- **Intelligent Load Balancing**: Automatically distributes load across multiple LLM models, balancing performance and cost-effectiveness
- **High Compatibility**: Unified interfaces designed to support various databases and LLM platforms
- **Minimal Configuration**: One YAML file handles all configuration, no code writing required
- **Robust Fault Tolerance**: Automatically handles API errors, rate limits, and network issues
- LLM interfaces: OpenAI API compatible interfaces only
- Databases: PostgreSQL only (with asynchronous, multi-threaded access)
- Usage: command-line interface only
```bash
pip install llm-data-processor
```

Simply create a configuration file and run the command:
```bash
# Process data using the configuration file, no code writing required
llm-data-processor task/company_extract.yaml
```

It's that simple!
```yaml
# Model configuration - Supports intelligent load balancing across multiple models
model_configs:
  - api_key: ${OPENAI_API_KEY}
    base_url: "https://api.openai.com/v1"
    model_name: "gpt-4o"
    initial_weight: 1.0
    max_tokens: 4000
    temperature: 0.2
    max_concurrency: 20
    cost_per_token: 0.00001

  - api_key: ${AZURE_API_KEY}
    base_url: "https://your-azure-endpoint.openai.azure.com"
    model_name: "gpt-35-turbo"
    initial_weight: 3.0
    max_tokens: 2000
    temperature: 0.3
    max_concurrency: 50
    cost_per_token: 0.000002

  - api_key: "xinference"
    base_url: "http://localhost:9997/v1"
    model_name: "llama3"
    initial_weight: 2.0
    max_tokens: 2000
    temperature: 0.7
    max_concurrency: 4
    night_only: true      # Only use during off-peak hours

  - api_key: ${ANOTHER_API_KEY}
    base_url: "https://api.another-provider.com/v1"
    model_name: "custom-model"
    initial_weight: 0.5
    max_tokens: 4000
    retry_priority: 1     # Retry priority; lower numbers are retried first
    fallback_only: true   # Only used as a fallback option
```

The system automatically optimizes model selection and request allocation based on the following factors (a simplified sketch of the weighting logic follows this list):
- **Response Time**: Dynamically adjusts weights to prioritize models with faster responses
- **Cost Effectiveness**: Considers the cost of each model to optimize overall processing costs
- **Error Rate**: Automatically reduces the weight of models that frequently error
- **Load**: Distributes load based on current concurrent requests
- **Time Strategy**: Supports using specific models during specific time periods (e.g., local models at night)
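The README does not prescribe the exact scheduling algorithm, so the following is only a minimal illustrative sketch of how feedback-driven weighted selection over `model_configs` could work; `ModelState`, `pick_model`, and `record_result` are hypothetical names, not the tool's API.

```python
import random
from dataclasses import dataclass

@dataclass
class ModelState:
    """Hypothetical per-model bookkeeping for weighted selection."""
    name: str
    weight: float             # starts at initial_weight from the YAML
    max_concurrency: int
    in_flight: int = 0
    avg_latency: float = 1.0  # seconds, exponential moving average
    error_rate: float = 0.0   # exponential moving average of failures

    def effective_weight(self) -> float:
        # Penalize slow, error-prone, or saturated models.
        headroom = 1.0 - self.in_flight / self.max_concurrency
        if headroom <= 0:
            return 0.0
        return self.weight * headroom / (self.avg_latency * (1.0 + 10.0 * self.error_rate))

def pick_model(models: list[ModelState]) -> ModelState:
    """Weighted random choice over models that still have capacity."""
    weights = [m.effective_weight() for m in models]
    if sum(weights) == 0.0:
        raise RuntimeError("all models are saturated")
    return random.choices(models, weights=weights, k=1)[0]

def record_result(m: ModelState, latency: float, ok: bool, alpha: float = 0.2) -> None:
    """Feed observed latency and success back in, so future picks favor fast, reliable models."""
    m.avg_latency = (1 - alpha) * m.avg_latency + alpha * latency
    m.error_rate = (1 - alpha) * m.error_rate + alpha * (0.0 if ok else 1.0)
```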
Suppose you have a database table containing company descriptions and need to extract structured company information from them.
```sql
CREATE TABLE public.companies (
  id SERIAL PRIMARY KEY,
  company_name VARCHAR(255),
  description TEXT,
  website VARCHAR(255),
  processed BOOLEAN DEFAULT FALSE,
  extracted_info JSONB,
  processed_at TIMESTAMP
);
```

```yaml
# Database configuration
db:
  database: "business_db"
  user: ${DB_USER}
  password: ${DB_PASSWORD}
  host: "localhost"
  port: 5432
# Task configuration
table: "public.companies"
query_fields: ["id", "company_name", "description", "website"]
result_fields: ["extracted_info"]
id_field: "id"
input_field: "description"
filter_condition: "processed = false AND description IS NOT NULL AND length(description) > 50"
model_flag: "processed"
timestamp_field: "processed_at"
# Processing parameters - High performance configuration
batch_size: 100
concurrency: 8
db_pool_min_size: 3
db_pool_max_size: 10
queue_max_size: 20
max_retries: 3
retry_delay: 2.0
exponential_backoff: true
# Model configuration - Supports intelligent load balancing across multiple models
model_configs:
  - api_key: ${OPENAI_API_KEY}
    base_url: "https://api.openai.com/v1"
    model_name: "gpt-3.5-turbo"
    initial_weight: 1.0
    max_tokens: 2000
    temperature: 0.2
  - api_key: "xinference"
    base_url: "http://localhost:9997/v1"
    model_name: "llama3"
    initial_weight: 0.5
    max_tokens: 2000
    night_only: true  # Only use during off-peak hours
# Prompt configuration
system_prompt: |
  You are a business information analysis expert. Your task is to extract key business information from company descriptions.
  Please extract the following information:
  1. Industry categories (maximum 3)
  2. Main products or services (maximum 5)
  3. Target customer groups
  4. Estimated company size (e.g., startup, SME, large enterprise)
  5. Year of company founding (if mentioned)
  6. Company geographic location/headquarters (if mentioned)
  Please respond in JSON format, containing the following fields:
  {
    "industries": [],
    "products_services": [],
    "target_customers": "",
    "company_size": "",
    "founding_year": null,
    "headquarters": ""
  }
user_prompt_template: |
  Company Name: {company_name}
  Company Website: {website}
  Company Description:
  {description}
  Based on the information above, please extract key business information.
```

```bash
# Set environment variables
export DB_USER="postgres"
export DB_PASSWORD="your_password"
export OPENAI_API_KEY="your_openai_key"
# Run the processing task
llm-data-processor company_extract.yaml
```

As a second example, suppose you have a table of customer reviews and want to analyze each one for sentiment, key topics, and a short summary:

```sql
CREATE TABLE public.customer_reviews (
review_id SERIAL PRIMARY KEY,
product_id INTEGER,
review_text TEXT,
date_created TIMESTAMP DEFAULT NOW(),
analyzed BOOLEAN DEFAULT FALSE,
sentiment VARCHAR(20),
key_topics JSONB,
summary TEXT,
analysis_date TIMESTAMP
);
```

```yaml
# Database configuration
db:
  database: "ecommerce_db"
  user: ${DB_USER}
  password: ${DB_PASSWORD}
  host: "localhost"
  port: 5432
# Task configuration
table: "public.customer_reviews"
query_fields: ["review_id", "review_text", "product_id", "date_created"]
result_fields: ["sentiment", "key_topics", "summary"]
id_field: "review_id"
input_field: "review_text"
filter_condition: "analyzed = false AND length(review_text) > 10"
model_flag: "analyzed"
timestamp_field: "analysis_date"
# Processing parameters
batch_size: 200
concurrency: 10
db_pool_min_size: 5
db_pool_max_size: 15
queue_max_size: 20
max_retries: 3
# Model configuration
model_configs:
  - api_key: ${OPENAI_API_KEY}
    base_url: "https://api.openai.com/v1"
    model_name: "gpt-3.5-turbo"
    initial_weight: 1.0
# Prompt configuration
system_prompt: |
  You are a customer feedback analyst. Your task is to analyze customer reviews and extract:
  1. Overall sentiment (positive, negative, or neutral)
  2. Key topics mentioned in the review (maximum 3)
  3. Brief summary (maximum 50 words)
  Please respond in JSON format, containing the following fields:
  {
    "sentiment": "",
    "key_topics": [],
    "summary": ""
  }
user_prompt_template: |
  Product ID: {product_id}
  Review Date: {date_created}
  Customer Review:
  {review_text}
  Please analyze the customer review above for sentiment, key topics, and provide a brief summary.
```

**Database configuration**

| Parameter | Description |
|---|---|
| `database` | Database name |
| `user` | Database username |
| `password` | Database password |
| `host` | Database host address |
| `port` | Database port number |
**Task configuration**

| Parameter | Description |
|---|---|
| `table` | Table name to process (including schema) |
| `query_fields` | List of fields to query |
| `result_fields` | List of fields to store results |
| `id_field` | Primary key field name |
| `input_field` | Field name containing text to process |
| `filter_condition` | SQL WHERE condition to filter records for processing |
| `model_flag` | Boolean field name to mark records as processed |
| `timestamp_field` | Timestamp field to record processing time (optional) |
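To make these parameters concrete, here is a hypothetical sketch of how they (together with `batch_size` from the processing parameters below) might translate into the SELECT that fetches a batch of unprocessed rows; `build_fetch_query` is illustrative, not part of the tool.

```python
def build_fetch_query(cfg: dict) -> str:
    """Compose the batch-fetch SELECT from the task configuration (illustrative only)."""
    fields = ", ".join(cfg["query_fields"])
    return (
        f"SELECT {fields} FROM {cfg['table']} "
        f"WHERE {cfg['filter_condition']} "
        f"ORDER BY {cfg['id_field']} "
        f"LIMIT {cfg['batch_size']}"
    )

cfg = {
    "table": "public.companies",
    "query_fields": ["id", "company_name", "description", "website"],
    "id_field": "id",
    "filter_condition": "processed = false AND description IS NOT NULL",
    "batch_size": 100,
}
print(build_fetch_query(cfg))
# SELECT id, company_name, description, website FROM public.companies
# WHERE processed = false AND description IS NOT NULL ORDER BY id LIMIT 100
```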
**Processing parameters**

| Parameter | Description |
|---|---|
| `batch_size` | Number of records to process per batch |
| `concurrency` | Number of concurrent processing threads |
| `db_pool_min_size` | Minimum number of database connections in the pool |
| `db_pool_max_size` | Maximum number of database connections in the pool |
| `queue_max_size` | Maximum size of the task queue |
| `max_retries` | Maximum number of retries for failed tasks |
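A bounded queue such as `queue_max_size` describes gives natural backpressure between the database reader and the LLM workers. The sketch below is purely illustrative (hypothetical `reader`/`worker` names, not the tool's internals) of that producer-consumer shape.

```python
import asyncio

async def reader(queue: asyncio.Queue, batches: list[list[dict]]) -> None:
    # put() awaits when the queue is full, throttling the database reader.
    for batch in batches:
        await queue.put(batch)
    await queue.put(None)  # sentinel: no more work

async def worker(queue: asyncio.Queue) -> None:
    while True:
        batch = await queue.get()
        if batch is None:
            await queue.put(None)  # let sibling workers see the sentinel too
            break
        # ... call the LLM and write results back to the database ...
        await asyncio.sleep(0.01 * len(batch))  # simulate work

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=20)  # mirrors queue_max_size
    batches = [[{"id": i}] * 5 for i in range(100)]   # pretend DB batches
    await asyncio.gather(reader(queue, batches), *(worker(queue) for _ in range(10)))

asyncio.run(main())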
**Model configuration**

| Parameter | Description |
|---|---|
| `api_key` | API key |
| `base_url` | API base URL |
| `model_name` | Model name |
| `initial_weight` | Initial model weight (for multi-model configuration) |
| `max_tokens` | Maximum tokens for model response |
| `temperature` | Temperature setting for model response |
| `night_only` | Whether to use this model only during night hours (optional) |
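Values such as `${OPENAI_API_KEY}` in the examples are environment-variable references. How the tool resolves them is not documented here; one common approach, shown purely as a sketch, expands them after loading the YAML:

```python
import os

def resolve_env(value):
    """Recursively expand ${VAR} references in a loaded YAML structure."""
    if isinstance(value, str):
        # os.path.expandvars replaces ${VAR} with its environment value
        # (unset variables are left unchanged).
        return os.path.expandvars(value)
    if isinstance(value, dict):
        return {k: resolve_env(v) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_env(v) for v in value]
    return value
```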
LLM Data Processor employs an asynchronous architecture to ensure optimal performance (a minimal sketch combining several of the ideas below follows this list):
- Asynchronous I/O Engine: Efficient non-blocking I/O system built on Python asyncio
- Dynamic Batching: Intelligently combines requests to maximize throughput
- Adaptive Concurrency Control: Dynamically adjusts concurrency levels based on system load and API limits
- Multi-level Caching System: Reduces duplicate processing and API calls
- Intelligent Retry Mechanism: Exponential backoff strategy and automatic failover
- Resource Monitoring: Real-time monitoring of system resource usage to prevent overload
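The sketch below is illustrative rather than the tool's actual code; it combines three of the ideas above: non-blocking I/O via asyncio, bounded concurrency via a semaphore (mirroring the `concurrency` setting), and retries with exponential backoff (mirroring `max_retries`, `retry_delay`, and `exponential_backoff`). `call_llm` and `process_record` are hypothetical names.

```python
import asyncio
import random

async def call_llm(record: dict) -> str:
    """Placeholder for the real API call; raises on transient errors."""
    await asyncio.sleep(0.1)  # simulate network latency
    if random.random() < 0.1:
        raise RuntimeError("simulated transient API error")
    return f"processed:{record['id']}"

async def process_record(record: dict, sem: asyncio.Semaphore,
                         max_retries: int = 3, retry_delay: float = 2.0) -> str:
    async with sem:  # cap concurrent in-flight requests
        for attempt in range(max_retries + 1):
            try:
                return await call_llm(record)
            except RuntimeError:
                if attempt == max_retries:
                    raise
                # exponential backoff with jitter before the next attempt
                await asyncio.sleep(retry_delay * 2 ** attempt * random.uniform(0.5, 1.5))

async def main() -> None:
    sem = asyncio.Semaphore(8)                # mirrors `concurrency`
    batch = [{"id": i} for i in range(100)]   # mirrors `batch_size`
    results = await asyncio.gather(*(process_record(r, sem) for r in batch),
                                   return_exceptions=True)
    ok = [r for r in results if isinstance(r, str)]
    print(f"{len(ok)}/{len(batch)} records processed")

asyncio.run(main())
```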
| Processing Method | Records Processed per Hour | Relative Performance |
|---|---|---|
| Traditional Single-threaded Processing | ~500 | 1x |
| Simple Multi-threading | ~2,000 | 4x |
| LLM Data Processor | ~10,000+ | 20x+ |
Currently supported:

- Databases: PostgreSQL
- LLM interfaces: OpenAI API compatible interfaces

Planned:

- Databases:
  - MySQL/MariaDB
  - SQLite
  - MongoDB
  - Redis
  - Microsoft SQL Server
  - Oracle
  - Elasticsearch
  - ClickHouse
- LLM platforms:
  - Ollama
  - Anthropic Claude
  - Qwen
  - Baichuan
  - Volcengine
  - Xorbits Inference
  - Zhipuai
  - Google Gemini
  - Mistral AI
  - Cohere
  - Hugging Face Inference API
- [x] Simple command-line interface based on configuration files
- [x] High-performance asynchronous processing for PostgreSQL databases
- [x] Support for OpenAI API compatible interfaces
- [ ] Automated logging and error handling
- [ ] Support for more LLM platforms: Ollama, Anthropic, Qwen, Baichuan, Volcengine, etc.
- [ ] Support for more databases: MySQL/MariaDB, SQLite, MongoDB, Redis
- [ ] Python API, while maintaining simplicity of use
- [ ] Data quality metrics and reporting
- [ ] Performance optimization for very large datasets
- [ ] Simple web configuration interface
- [ ] Automatic tuning of processing parameters
- [ ] Streaming data support
- [ ] Distributed processing support
Contributions are welcome! Please feel free to submit a Pull Request.
