3 lines of code • Any data source • 10x faster
One API for CSV, Parquet, Snowflake, Databricks, BigQuery, and 15+ sources. No boilerplate.
pip install duckguard
from duckguard import connect
orders = connect("s3://warehouse/orders.parquet") # Cloud, local, or warehouse
assert orders.customer_id.is_not_null() # Just like pytest!
assert orders.total_amount.between(0, 10000) # Readable validations
assert orders.status.isin(["pending", "shipped", "delivered"])
quality = orders.score()
print(f"Grade: {quality.grade}") # A, B, C, D, or FThat's it. Same 3 lines whether your data lives in S3, Snowflake, Databricks, or a local CSV. No context. No datasource. No validator. No expectation suite. Just data quality.
from duckguard import connect
# Data Lakes
orders = connect("s3://bucket/orders.parquet") # AWS S3
orders = connect("gs://bucket/orders.parquet") # Google Cloud
orders = connect("az://container/orders.parquet") # Azure Blob
# Data Warehouses
orders = connect("snowflake://account/db", table="orders") # Snowflake
orders = connect("databricks://host/catalog", table="orders") # Databricks
orders = connect("bigquery://project", table="orders") # BigQuery
orders = connect("redshift://cluster/db", table="orders") # Redshift
orders = connect("fabric://workspace/lakehouse/Tables/orders") # Microsoft Fabric
# Modern Table Formats
orders = connect("delta://path/to/delta_table") # Delta Lake
orders = connect("iceberg://path/to/iceberg_table") # Apache Iceberg
# Databases
orders = connect("postgres://localhost/db", table="orders") # PostgreSQL
orders = connect("mysql://localhost/db", table="orders") # MySQL
# Files & DataFrames
orders = connect("orders.parquet") # Parquet, CSV, JSON, Excel
orders = connect(pandas_dataframe) # pandas DataFrame
# Hugging Face Datasets
from datasets import load_dataset
df = load_dataset("my_dataset", split="train").to_pandas()
orders = connect(df)                               # HF → DuckGuard
15+ connectors. Install what you need: `pip install duckguard[snowflake]`, `duckguard[databricks]`, or `duckguard[all]`.
Every data quality tool asks you to write 50+ lines of boilerplate before you can validate a single column. DuckGuard gives you a pytest-like API powered by DuckDB's speed.
Great Expectations:
# 50+ lines of setup required
from great_expectations import get_context
context = get_context()
datasource = context.sources.add_pandas("my_ds")
asset = datasource.add_dataframe_asset(
name="orders", dataframe=df
)
batch_request = asset.build_batch_request()
expectation_suite = context.add_expectation_suite(
"orders_suite"
)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders_suite"
)
validator.expect_column_values_to_not_be_null(
"customer_id"
)
validator.expect_column_values_to_be_between(
"amount", min_value=0, max_value=10000
)
# ... and more configuration
45 seconds | 4GB RAM | 20+ dependencies

DuckGuard:
from duckguard import connect
orders = connect(
"snowflake://account/db",
table="orders"
)
assert orders.customer_id.is_not_null()
assert orders.total_amount.between(0, 10000)
4 seconds | 200MB RAM | 7 dependencies
| Feature | DuckGuard | Great Expectations | Soda Core | Pandera |
|---|---|---|---|---|
| Lines of code to start | 3 | 50+ | 10+ | 5+ |
| Time for 1GB CSV* | ~4 sec | ~45 sec | ~20 sec | ~15 sec |
| Memory for 1GB CSV* | ~200 MB | ~4 GB | ~1.5 GB | ~1.5 GB |
| Learning curve | Minutes | Days | Hours | Minutes |
| Pytest-like API | Yes | - | - | - |
| DuckDB-powered | Yes | - | Partial | - |
| Cloud storage (S3/GCS/Azure) | Yes | Yes | Yes | - |
| Database connectors | 11+ | Yes | Yes | - |
| PII detection | Built-in | - | - | - |
| Anomaly detection (7 methods) | Built-in | - | Partial | - |
| Schema evolution tracking | Built-in | - | Yes | - |
| Freshness monitoring | Built-in | - | Yes | - |
| Data contracts | Yes | - | Yes | Yes |
| Row-level error details | Yes | Yes | - | Yes |
| Cross-dataset & FK checks | Built-in | Partial | Yes | - |
| Reconciliation | Built-in | - | - | - |
| Distribution drift | Built-in | - | - | - |
| Conditional checks | Built-in | - | - | - |
| Query-based checks | Built-in | - | Yes | - |
| YAML rules | Yes | Yes | Yes | - |
| dbt integration | Yes | Yes | Yes | - |
| Slack/Teams/Email alerts | Yes | Yes | Yes | - |
| HTML/PDF reports | Yes | Yes | Yes | - |
*Performance varies by hardware and data characteristics. Based on typical usage patterns with DuckDB's columnar engine.
pip install duckguard
# With optional features
pip install duckguard[reports] # HTML/PDF reports
pip install duckguard[snowflake] # Snowflake connector
pip install duckguard[databricks] # Databricks connector
pip install duckguard[airflow] # Airflow integration
pip install duckguard[all]         # Everything
| Feature | Description |
|---|---|
| Quality Scoring | A-F grades with 4 quality dimensions |
| PII Detection | Auto-detect emails, SSNs, phones |
| Anomaly Detection | Z-score, IQR, KS-test, ML baselines |
| Alerts | Slack, Teams, Email |
| Freshness Monitoring | Detect stale data automatically |
| Schema Evolution | Track and detect breaking changes |
| Data Contracts | Schema + SLA enforcement |
| Row-Level Errors | See exactly which rows failed |
| HTML/PDF Reports | Beautiful shareable reports |
| Historical Tracking | Quality trends over time |
| Cross-Dataset Checks | FK, reconciliation, drift |
| CI/CD Ready | dbt, Airflow, GitHub Actions |
| YAML Rules | Declarative validation rules |
| Auto-Profiling | Semantic types & rule suggestions |
| Conditional Checks | Validate when conditions are met |
| Group-By Validation | Segmented per-group checks |
orders = connect("orders.csv")
# Null & uniqueness
orders.order_id.is_not_null() # No nulls allowed
orders.order_id.is_unique() # All values distinct
orders.order_id.has_no_duplicates() # Alias for is_unique
# Range & comparison
orders.total_amount.between(0, 10000) # Inclusive range
orders.total_amount.greater_than(0) # Minimum (exclusive)
orders.total_amount.less_than(100000) # Maximum (exclusive)
# Pattern & enum
orders.email.matches(r'^[\w.+-]+@[\w-]+\.[\w.]+$')
orders.status.isin(["pending", "shipped", "delivered"])
# String length
orders.order_id.value_lengths_between(5, 10)
Every validation returns a `ValidationResult` with `.passed`, `.message`, `.summary()`, and `.failed_rows`.
result = orders.quantity.between(1, 100)
if not result.passed:
    print(result.summary())
    # Column 'quantity' has 3 values outside [1, 100]
    #
    # Sample of 3 failing rows (total: 3):
    #   Row 5: quantity=500 - Value outside range [1, 100]
    #   Row 23: quantity=-2 - Value outside range [1, 100]
    #   Row 29: quantity=0 - Value outside range [1, 100]

for row in result.failed_rows:
    print(f"Row {row.row_number}: {row.value} ({row.reason})")

print(result.get_failed_values())        # [500, -2, 0]
print(result.get_failed_row_indices())   # [5, 23, 29]
score = orders.score()
print(score.grade) # A, B, C, D, or F
print(score.overall) # 0-100 composite score
print(score.completeness) # % non-null across all columns
print(score.uniqueness) # % unique across key columns
print(score.validity) # % values passing type/range checks
print(score.consistency)   # % consistent formatting
orders = connect("orders.csv")
customers = connect("customers.csv")
# Foreign key check
result = orders.customer_id.exists_in(customers.customer_id)
# FK with null handling
result = orders.customer_id.references(customers.customer_id, allow_nulls=True)
# Get orphan values for debugging
orphans = orders.customer_id.find_orphans(customers.customer_id)
print(f"Invalid IDs: {orphans}")
# Compare value sets
result = orders.status.matches_values(lookup.code)
# Compare row counts with tolerance
result = orders.row_count_matches(backup, tolerance=10)
source = connect("orders_source.parquet")
target = connect("orders_migrated.parquet")
recon = source.reconcile(
target,
key_columns=["order_id"],
compare_columns=["amount", "status", "customer_id"],
)
print(recon.match_percentage) # 95.5
print(recon.missing_in_target) # 3
print(recon.extra_in_target) # 1
print(recon.value_mismatches) # {'amount': 5, 'status': 2}
print(recon.summary())
baseline = connect("orders_jan.parquet")
current = connect("orders_feb.parquet")
drift = current.amount.detect_drift(baseline.amount)
print(drift.is_drifted) # True/False
print(drift.p_value) # 0.0023
print(drift.statistic) # KS statistic
print(drift.message)      # Human-readable summary
grouped = orders.group_by("region")
print(grouped.groups)        # [{'region': 'North'}, ...]
print(grouped.group_count)   # 4

for stat in grouped.stats():
    print(stat)              # {'region': 'North', 'row_count': 150}

# Ensure every group has at least 10 rows
result = grouped.row_count_greater_than(10)
for g in result.get_failed_groups():
    print(f"{g.key_string}: only {g.row_count} rows")
DuckGuard 3.2 adds AI-powered data quality — the first data quality library with native LLM integration.
# Explain quality issues in plain English
duckguard explain orders.csv
# AI suggests validation rules based on your data
duckguard suggest orders.csv
# Get AI-powered fix suggestions for quality issues
duckguard fix orders.csv
from duckguard.ai import explainer, rules_generator, fixer
# Natural language quality explanation
summary = explainer.explain(dataset)
# AI-generated validation rules
rules = rules_generator.suggest_rules(dataset)
# Suggest fixes for data quality issues
fixes = fixer.suggest_fixes(dataset, results)
Supports OpenAI, Anthropic, and Ollama (local models). Configure via environment variables or `AIConfig`.
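A minimal configuration sketch, not the documented API: the environment-variable names below are the providers' standard ones, and whether DuckGuard reads exactly these names (and the `AIConfig` fields) should be verified against the documentation site.

```python
# Hedged sketch: point the duckguard.ai helpers at an LLM provider.
# Variable names are the vendors' usual ones; DuckGuard reading them directly
# is an assumption, as is any AIConfig field layout.
import os

os.environ.setdefault("OPENAI_API_KEY", "sk-...")     # OpenAI
# os.environ.setdefault("ANTHROPIC_API_KEY", "...")   # Anthropic
# Ollama: assumes a local server at its default address, no key needed.

from duckguard import connect
from duckguard.ai import explainer

dataset = connect("orders.csv")
print(explainer.explain(dataset))   # same call as above, backed by the chosen provider
```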
- 🔍 Improved semantic type detection — smarter column classification, fewer false positives
- 📄 Apache 2.0 license — OSI-approved, enterprise-friendly
- 🛡️ SQL injection prevention — multi-layer escaping in all string-based checks
- 📖 Full documentation site — xdatahubai.github.io/duckguard
- 🔒 PEP 561 typed — `py.typed` marker for mypy/pyright (see the sketch below)
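A sketch of what the `py.typed` marker buys you, assuming DuckGuard's public API is annotated; the `int` return type below is an illustration, not a documented signature.

```python
# check_orders.py — hypothetical file to run through `mypy check_orders.py`
# or `pyright check_orders.py`. Because the package ships a py.typed marker,
# the checker reads DuckGuard's own annotations instead of treating it as untyped.
from duckguard import connect

def order_count(path: str) -> int:
    orders = connect(path)
    # row_count is used elsewhere in this README; annotating it as int here
    # is an assumption about the library's type hints.
    return orders.row_count
```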
DuckGuard 3.0 introduces conditional checks, multi-column validation, query-based expectations, distributional tests, and 7 anomaly detection methods.
Apply validation rules only when a SQL condition is met:
# Email required only for shipped orders
orders.email.not_null_when("status = 'shipped'")
# Quantity must be 1-100 for US orders
orders.quantity.between_when(1, 100, "country = 'US'")
# Status must be shipped or delivered for UK
orders.status.isin_when(["shipped", "delivered"], "country = 'UK'")
# Also: unique_when(), matches_when()
Validate relationships across columns:
# Ship date must come after created date
orders.expect_column_pair_satisfy(
column_a="ship_date",
column_b="created_at",
expression="ship_date >= created_at",
)
# Composite key uniqueness
orders.expect_columns_unique(columns=["order_id", "customer_id"])
# Multi-column sum check
orders.expect_multicolumn_sum_to_equal(
columns=["subtotal", "tax", "shipping"],
expected_sum=59.50,
)
Run custom SQL for unlimited flexibility:
# No rows should have negative quantities
orders.expect_query_to_return_no_rows(
"SELECT * FROM table WHERE quantity < 0"
)
# Verify data exists
orders.expect_query_to_return_rows(
"SELECT * FROM table WHERE status = 'shipped'"
)
# Exact value check on aggregate
orders.expect_query_result_to_equal(
"SELECT COUNT(*) FROM table", expected=1000
)
# Range check on aggregate
orders.expect_query_result_to_be_between(
"SELECT AVG(amount) FROM table", min_value=50, max_value=500
)
Statistical tests for distribution shape (requires scipy):
# Test for normal distribution
orders.total_amount.expect_distribution_normal(significance_level=0.05)
# Kolmogorov-Smirnov test
orders.quantity.expect_ks_test(distribution="norm")
# Chi-square goodness of fit
orders.status.expect_chi_square_test()
from duckguard import detect_anomalies, AnomalyDetector
from duckguard.anomaly import BaselineMethod, KSTestMethod, SeasonalMethod
# High-level API: detect anomalies across columns
report = detect_anomalies(orders, method="zscore", columns=["quantity", "amount"])
print(report.has_anomalies, report.anomaly_count)
for a in report.anomalies:
    print(f"{a.column}: score={a.score:.2f}, anomaly={a.is_anomaly}")
# AnomalyDetector with IQR
detector = AnomalyDetector(method="iqr", threshold=1.5)
report = detector.detect(orders, columns=["quantity"])
# ML Baseline: fit on historical data, score new values
baseline = BaselineMethod(sensitivity=2.0)
baseline.fit([100, 102, 98, 105, 97, 103])
print(baseline.baseline_mean, baseline.baseline_std)
score = baseline.score(250) # Single value
print(score.is_anomaly, score.score)
scores = baseline.score(orders.total_amount) # Entire column
print(max(scores))
# KS-Test: detect distribution drift
ks = KSTestMethod(p_value_threshold=0.05)
ks.fit([1, 2, 3, 4, 5])
comparison = ks.compare_distributions([10, 11, 12, 13, 14])
print(comparison.is_drift, comparison.p_value, comparison.message)
# Seasonal: time-aware anomaly detection
seasonal = SeasonalMethod(period="daily", sensitivity=2.0)
seasonal.fit([10, 12, 11, 13, 9, 14])
Available methods: `zscore`, `iqr`, `modified_zscore`, `percent_change`, `baseline`, `ks_test`, `seasonal`.
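The two methods not demonstrated above, `modified_zscore` and `percent_change`, follow the same high-level pattern; a sketch assuming they accept the same `detect_anomalies()` arguments as the `zscore` example:

```python
# Sketch only: argument set assumed identical to the zscore call shown earlier;
# `orders` and `detect_anomalies` come from the example above.
report = detect_anomalies(orders, method="modified_zscore", columns=["amount"])
print(report.has_anomalies, report.anomaly_count)

report = detect_anomalies(orders, method="percent_change", columns=["amount"])
print(report.has_anomalies, report.anomaly_count)
```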
# duckguard.yaml
name: orders_validation
description: Quality checks for the orders dataset
checks:
  order_id:
    - not_null
    - unique
  quantity:
    - between: [1, 1000]
  status:
    - allowed_values: [pending, shipped, delivered, cancelled, returned]
  email:
    - not_null:
        severity: warning
from duckguard import load_rules, execute_rules
rules = load_rules("duckguard.yaml")
result = execute_rules(rules, "orders.csv")
print(f"Passed: {result.passed_count}/{result.total_checks}")
for r in result.results:
    tag = "PASS" if r.passed else "FAIL"
    print(f" [{tag}] {r.message}")
from duckguard import connect, generate_rules
orders = connect("orders.csv")
yaml_rules = generate_rules(orders, dataset_name="orders")
print(yaml_rules)   # Ready-to-use YAML
from duckguard import generate_contract, validate_contract, diff_contracts
from duckguard.contracts import contract_to_yaml
# Generate a contract from existing data
contract = generate_contract(orders, name="orders_v1", owner="data-team")
print(contract.name, contract.version, len(contract.schema))
# Validate data against a contract
validation = validate_contract(contract, "orders.csv")
print(validation.passed)
# Export to YAML
print(contract_to_yaml(contract))
# Detect breaking changes between versions
diff = diff_contracts(contract_v1, contract_v2)
if diff.has_breaking_changes:
    for change in diff.changes:
        print(change)
from duckguard import AutoProfiler, SemanticAnalyzer, detect_type, detect_types_for_dataset
# Profile entire dataset — quality scores, pattern detection, and rule suggestions included
profiler = AutoProfiler()
profile = profiler.profile(orders)
print(f"Columns: {profile.column_count}, Rows: {profile.row_count}")
print(f"Quality: {profile.overall_quality_grade} ({profile.overall_quality_score:.1f}/100)")
# Per-column quality grades and percentiles
for col in profile.columns:
    print(f" {col.name}: grade={col.quality_grade}, nulls={col.null_percent:.1f}%")
    if col.median_value is not None:
        print(f" p25={col.p25_value}, median={col.median_value}, p75={col.p75_value}")
# Suggested rules (25+ pattern types: email, SSN, UUID, credit card, etc.)
print(f"Suggested rules: {len(profile.suggested_rules)}")
for rule in profile.suggested_rules[:5]:
    print(f" {rule}")
# Deep profiling — distribution analysis + outlier detection (numeric columns)
deep_profiler = AutoProfiler(deep=True)
deep_profile = deep_profiler.profile(orders)
for col in deep_profile.columns:
    if col.distribution_type:
        print(f" {col.name}: {col.distribution_type}, skew={col.skewness:.2f}")
    if col.outlier_count is not None:
        print(f" outliers: {col.outlier_count} ({col.outlier_percentage:.1f}%)")
# Configurable thresholds
strict = AutoProfiler(null_threshold=0.0, unique_threshold=100.0, pattern_min_confidence=95.0)
strict_profile = strict.profile(orders)
# Detect semantic type for a single column
print(detect_type(orders, "email")) # SemanticType.EMAIL
print(detect_type(orders, "country")) # SemanticType.COUNTRY_CODE
# Detect types for all columns at once
type_map = detect_types_for_dataset(orders)
for col, stype in type_map.items():
    print(f" {col}: {stype}")
# Full PII analysis
analysis = SemanticAnalyzer().analyze(orders)
print(f"PII columns: {analysis.pii_columns}") # ['email', 'phone']
for col in analysis.columns:
    if col.is_pii:
        print(f" {col.name}: {col.semantic_type.value} (confidence: {col.confidence:.0%})")
Supported semantic types: email, phone, url, ip_address, ssn, credit_card, person_name, address, country, state, city, zipcode, latitude, longitude, date, datetime, currency, percentage, boolean, uuid, identifier, and more.
from datetime import timedelta
from duckguard.freshness import FreshnessMonitor
# Quick check
print(orders.freshness.last_modified) # 2024-01-30 14:22:01
print(orders.freshness.age_human) # "2 hours ago"
print(orders.freshness.is_fresh) # True
# Custom threshold
print(orders.is_fresh(timedelta(hours=6)))
# Structured monitoring
monitor = FreshnessMonitor(threshold=timedelta(hours=1))
result = monitor.check(orders)
print(result.is_fresh, result.age_human)
from duckguard.schema_history import SchemaTracker, SchemaChangeAnalyzer
# Capture a snapshot
tracker = SchemaTracker()
snapshot = tracker.capture(orders)
for col in snapshot.columns[:5]:
    print(f" {col.name}: {col.dtype}")
# View history
history = tracker.get_history(orders.source)
print(f"Snapshots: {len(history)}")
# Detect breaking changes
analyzer = SchemaChangeAnalyzer()
report = analyzer.detect_changes(orders)
print(report.has_breaking_changes, len(report.changes))
from duckguard.history import HistoryStorage, TrendAnalyzer
# Store validation results
storage = HistoryStorage()
storage.store(exec_result)
# Query past runs
runs = storage.get_runs("orders.csv", limit=10)
for run in runs:
    print(f" {run.run_id}: passed={run.passed}, checks={run.total_checks}")
# Analyze quality trends
trends = TrendAnalyzer(storage).analyze("orders.csv", days=30)
print(trends.summary())
DuckGuard generates self-contained HTML reports with dark mode, trend charts, collapsible sections, sortable tables, and search — all in a single file with zero JavaScript dependencies.
Live demos: Light / Auto Theme • Dark Theme
from duckguard.reports import HTMLReporter, ReportConfig, generate_html_report
# Quick one-liner
generate_html_report(exec_result, "report.html")
# Full-featured report with trends and metadata
config = ReportConfig(
title="Orders Quality Report",
dark_mode="auto", # "auto", "light", or "dark"
include_trends=True,
include_metadata=True,
)
reporter = HTMLReporter(config=config)
reporter.generate(
exec_result,
"report.html",
trend_data=trend_data, # from HistoryStorage.get_trend()
row_count=dataset.row_count,
column_count=dataset.column_count,
)
# PDF export (requires weasyprint)
from duckguard.reports import generate_pdf_report
generate_pdf_report(exec_result, "report.pdf")
from duckguard.notifications import (
    SlackNotifier, TeamsNotifier, EmailNotifier,
    format_results_text, format_results_markdown,
)
slack = SlackNotifier(webhook_url="https://hooks.slack.com/services/XXX")
teams = TeamsNotifier(webhook_url="https://outlook.office.com/webhook/XXX")
email = EmailNotifier(
smtp_host="smtp.example.com", smtp_port=587,
smtp_user="user", smtp_password="pass",
to_addresses=["team@example.com"],
)
# Format for custom integrations
print(format_results_text(exec_result))
print(format_results_markdown(exec_result))
from duckguard.integrations.dbt import rules_to_dbt_tests
dbt_tests = rules_to_dbt_tests(rules)
from airflow import DAG
from airflow.operators.python import PythonOperator
def validate_orders():
    from duckguard import connect, load_rules, execute_rules
    rules = load_rules("duckguard.yaml")
    result = execute_rules(rules, "s3://bucket/orders.parquet")
    if not result.passed:
        raise Exception(f"Quality check failed: {result.failed_count} failures")

dag = DAG("data_quality", schedule_interval="@daily", ...)
PythonOperator(task_id="validate", python_callable=validate_orders, dag=dag)
name: Data Quality
on: [push]
jobs:
  quality-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: pip install duckguard
      - run: duckguard check data/orders.csv --rules duckguard.yaml
# tests/test_data_quality.py
from duckguard import connect
def test_orders_quality():
    orders = connect("data/orders.csv")
    assert orders.row_count > 0
    assert orders.order_id.is_not_null()
    assert orders.order_id.is_unique()
    assert orders.quantity.between(0, 10000)
    assert orders.status.isin(["pending", "shipped", "delivered", "cancelled"])
# Validate data against rules
duckguard check orders.csv --config duckguard.yaml
# Auto-discover rules from data
duckguard discover orders.csv > duckguard.yaml
# Generate reports (with dark mode and trend charts)
duckguard report orders.csv --output report.html --dark-mode auto --trends
# Anomaly detection
duckguard anomaly orders.csv --method zscore
# Freshness check
duckguard freshness orders.csv --max-age 6h
# Schema tracking
duckguard schema orders.csv --action capture
duckguard schema orders.csv --action changes
# Data contracts
duckguard contract generate orders.csv
duckguard contract validate orders.csv
# Dataset info
duckguard info orders.csv
# Profile dataset with quality scoring
duckguard profile orders.csv
duckguard profile orders.csv --deep --format json
Built on DuckDB for fast, memory-efficient validation:
| Dataset | Great Expectations | DuckGuard | Speedup |
|---|---|---|---|
| 1GB CSV | 45 sec, 4GB RAM | 4 sec, 200MB RAM | 10x faster |
| 10GB Parquet | 8 min, 32GB RAM | 45 sec, 2GB RAM | 10x faster |
| 100M rows | Minutes | Seconds | 10x faster |
- DuckDB engine: Columnar, vectorized, SIMD-optimized
- Zero copy: Direct file access, no DataFrame conversion
- Lazy evaluation: Only compute what's needed
- Memory efficient: Stream large files without loading entirely
| Data Size | Recommendation |
|---|---|
| < 10M rows | DuckGuard directly |
| 10-100M rows | Use Parquet, configure memory_limit |
| 100GB+ | Use database connectors (Snowflake, BigQuery, Databricks) |
from duckguard import DuckGuardEngine, connect
engine = DuckGuardEngine(memory_limit="8GB")
dataset = connect("large_data.parquet", engine=engine)col.null_count # Number of null values
col.null_percent # Percentage of null values
col.unique_count # Number of distinct values
col.min, col.max # Min/max values (numeric)
col.mean, col.median # Mean and median (numeric)
col.stddev            # Standard deviation (numeric)

| Method | Description |
|---|---|
| `col.is_not_null()` | No nulls allowed |
| `col.is_unique()` | All values distinct |
| `col.between(min, max)` | Range check (inclusive) |
| `col.greater_than(val)` | Minimum (exclusive) |
| `col.less_than(val)` | Maximum (exclusive) |
| `col.matches(regex)` | Regex pattern check |
| `col.isin(values)` | Allowed values |
| `col.has_no_duplicates()` | No duplicate values |
| `col.value_lengths_between(min, max)` | String length range |
| `col.exists_in(ref_col)` | FK: values exist in reference |
| `col.references(ref_col, allow_nulls)` | FK with null handling |
| `col.find_orphans(ref_col)` | List orphan values |
| `col.matches_values(other_col)` | Compare value sets |
| `col.detect_drift(ref_col)` | KS-test drift detection |
| `col.not_null_when(condition)` | Conditional not-null |
| `col.unique_when(condition)` | Conditional uniqueness |
| `col.between_when(min, max, condition)` | Conditional range |
| `col.isin_when(values, condition)` | Conditional enum |
| `col.matches_when(pattern, condition)` | Conditional pattern |
| `col.expect_distribution_normal()` | Normality test |
| `col.expect_ks_test(distribution)` | KS distribution test |
| `col.expect_chi_square_test()` | Chi-square test |
| Method | Description |
|---|---|
| `ds.score()` | Quality score (completeness, uniqueness, validity, consistency) |
| `ds.reconcile(target, key_columns, compare_columns)` | Full reconciliation |
| `ds.row_count_matches(other, tolerance)` | Row count comparison |
| `ds.group_by(columns)` | Group-level validation |
| `ds.expect_column_pair_satisfy(a, b, expr)` | Column pair check |
| `ds.expect_columns_unique(columns)` | Composite key uniqueness |
| `ds.expect_multicolumn_sum_to_equal(columns, sum)` | Multi-column sum |
| `ds.expect_query_to_return_no_rows(sql)` | Custom SQL: no violations |
| `ds.expect_query_to_return_rows(sql)` | Custom SQL: data exists |
| `ds.expect_query_result_to_equal(sql, val)` | Custom SQL: exact value |
| `ds.expect_query_result_to_be_between(sql, min, max)` | Custom SQL: range |
| `ds.is_fresh(max_age)` | Data freshness check |
| `ds.head(n)` | Preview first n rows |
DuckGuard provides helpful, actionable error messages with suggestions:
try:
    orders.nonexistent_column
except ColumnNotFoundError as e:
    print(e)
    # Column 'nonexistent_column' not found.
    # Available columns: order_id, customer_id, product_name, ...

try:
    connect("ftp://data.example.com/file.xyz")
except UnsupportedConnectorError as e:
    print(e)
    # No connector found for: ftp://data.example.com/file.xyz
    # Supported formats: CSV, Parquet, JSON, PostgreSQL, MySQL, ...
We'd love to hear from you! Whether you have a question, idea, or want to share how you're using DuckGuard:
- GitHub Discussions — Ask questions, share ideas, show what you've built
- GitHub Issues — Report bugs or request features
- Contributing Guide — Learn how to contribute code, tests, or docs
We welcome contributions! See CONTRIBUTING.md for guidelines.
git clone https://github.com/XDataHubAI/duckguard.git
cd duckguard
pip install -e ".[dev]"
pytest # Run tests
black src tests # Format code
ruff check src tests   # Lint
Apache License 2.0 - see LICENSE
Built with ❤️ by the DuckGuard Team