Claude Agent Skill · by Wshobson

Data Quality Frameworks

This gives you production-ready data quality validation using Great Expectations, dbt tests, and data contracts. You get specific expectation suites covering completeness, uniqueness, validity, and the other core quality dimensions.

Install
npx skills add https://github.com/wshobson/agents --skill data-quality-frameworks
Works with Paperclip

How Data Quality Frameworks fits into a Paperclip company.

Data Quality Frameworks drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.

Paired pack: SaaS Factory — a pre-configured AI company with 18 agents and 18 skills, one-time purchase ($27, regularly $59).
Source file: SKILL.md (583 lines)
---
name: data-quality-frameworks
description: Implement data quality validation with Great Expectations, dbt tests, and data contracts. Use when building data quality pipelines, implementing validation rules, or establishing data contracts.
---

# Data Quality Frameworks

Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.

## When to Use This Skill

- Implementing data quality checks in pipelines
- Setting up Great Expectations validation
- Building comprehensive dbt test suites
- Establishing data contracts between teams
- Monitoring data quality metrics
- Automating data validation in CI/CD

## Core Concepts

### 1. Data Quality Dimensions

| Dimension        | Description              | Example Check                                      |
| ---------------- | ------------------------ | -------------------------------------------------- |
| **Completeness** | No missing values        | `expect_column_values_to_not_be_null`              |
| **Uniqueness**   | No duplicates            | `expect_column_values_to_be_unique`                |
| **Validity**     | Values in expected range | `expect_column_values_to_be_in_set`                |
| **Accuracy**     | Data matches reality     | Cross-reference validation                         |
| **Consistency**  | No contradictions        | `expect_column_pair_values_A_to_be_greater_than_B` |
| **Timeliness**   | Data is recent           | `expect_column_max_to_be_between`                  |

### 2. Testing Pyramid for Data

```
          /\
         /  \     Integration Tests (cross-table)
        /────\
       /      \   Unit Tests (single column)
      /────────\
     /          \ Schema Tests (structure)
    /────────────\
```

## Quick Start

### Great Expectations Setup

```bash
# Install
pip install great_expectations

# Initialize project
great_expectations init

# Create datasource
great_expectations datasource new
```

```python
import great_expectations as gx

# Create context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Validate
results = context.run_checkpoint(checkpoint_name="daily_orders")
```

## Patterns

### Pattern 1: Great Expectations Suite

```python
# expectations/orders_suite.py
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration


def build_orders_suite() -> ExpectationSuite:
    """Build comprehensive orders expectation suite"""

    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Schema expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False  # Allow additional columns
        }
    ))

    # Primary key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"}
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    ))

    # Foreign key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    ))

    # Categorical values
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"]
        }
    ))

    # Numeric ranges
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True  # amount > 0
        }
    ))

    # Date validity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"}
    ))

    # Freshness - data should be recent
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now"}
        }
    ))

    # Row count sanity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # Expect at least 1000 rows
            "max_value": 10000000
        }
    ))

    # Statistical expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500
        }
    ))

    return suite
```

### Pattern 2: Great Expectations Checkpoint

```yaml
# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"

validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1 # Latest batch
    expectation_suite_name: orders_suite

action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction

  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction

  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction

  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer
```

```python
# Run checkpoint
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    # run_results maps validation identifiers to dicts that hold the validation result
    failed_expectations = [
        v["validation_result"]
        for v in result.run_results.values()
        if not v["validation_result"].success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")
```

### Pattern 3: dbt Data Tests

```yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.at_least_one
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"

    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null

      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: order_status
        tests:
          - accepted_values:
              values:
                ["pending", "processing", "shipped", "delivered", "cancelled"]

      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
```

### Pattern 4: Custom dbt Tests

```sql
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000
```

```sql
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}
```

```sql
-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule

with orders_customers as (
    select distinct customer_id from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders
-- Test passes if this returns 0 rows
```

### Pattern 5: Data Contracts

```yaml
# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract

metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features

servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false

    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect

    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD

    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true

    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status

quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
```

### Pattern 6: Automated Quality Pipeline

```python
# quality_pipeline.py
from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import datetime

import great_expectations as gx


@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime


class DataQualityPipeline:
    """Orchestrate data quality checks across tables"""

    def __init__(self, context: gx.DataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against expectation suite"""

        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }

        result = self.context.run_checkpoint(**checkpoint_config)

        # Parse results: run_results values are dicts keyed by "validation_result"
        validation_result = list(result.run_results.values())[0]["validation_result"]
        results = validation_result.results

        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now()
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables"""
        results = {}

        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)

        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate quality report"""
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)

        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")

            if not result.passed:
                report.append("- Failed checks:")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)


# Usage
context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")
```

## Best Practices

### Do's

- **Test early** - Validate source data before transformations
- **Test incrementally** - Add tests as you find issues
- **Document expectations** - Clear descriptions for each test
- **Alert on failures** - Integrate with monitoring
- **Version contracts** - Track schema changes

### Don'ts

- **Don't test everything** - Focus on critical columns
- **Don't ignore warnings** - They often precede failures
- **Don't skip freshness** - Stale data is bad data
- **Don't hardcode thresholds** - Use dynamic baselines
- **Don't test in isolation** - Test relationships too
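The dimensions table under Core Concepts maps each quality dimension to a Great Expectations check, but the first two also reduce to simple ratio metrics that are handy for dashboards. A minimal plain-Python sketch (the function names are illustrative, not part of any library):

```python
def completeness(values):
    """Completeness: share of values that are non-null."""
    if not values:
        return 0.0
    non_null = [v for v in values if v is not None]
    return len(non_null) / len(values)


def uniqueness(values):
    """Uniqueness: share of non-null values that are distinct."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return 0.0
    return len(set(non_null)) / len(non_null)


# A column with one null and one duplicate
col = ["a", "b", None, "b"]
print(completeness(col))  # 0.75 (3 of 4 values present)
print(uniqueness(col))    # ~0.67 (2 distinct of 3 non-null)
```

Tracking these ratios over time often surfaces drift before a hard expectation starts failing.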
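Pattern 5 declares the contract; enforcement needs a consumer-side check. The sketch below validates a single record against a stripped-down subset of the contract's `schema.properties` (`required`, `enum`, `minimum`/`maximum`) in plain Python — a real deployment would use a contract-aware tool or a JSON Schema validator instead:

```python
# Hand-copied subset of the orders contract's schema.properties
CONTRACT = {
    "order_id": {"required": True},
    "status": {"enum": ["pending", "processing", "shipped", "delivered", "cancelled"]},
    "total_amount": {"minimum": 0, "maximum": 100000},
}


def validate_record(record, contract):
    """Return a list of violations for one record; an empty list means it conforms."""
    errors = []
    for field, rules in contract.items():
        value = record.get(field)
        if value is None:
            if rules.get("required"):
                errors.append(f"{field}: required field is missing")
            continue
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{field}: {value!r} not in allowed set")
        if "minimum" in rules and value < rules["minimum"]:
            errors.append(f"{field}: {value} below minimum {rules['minimum']}")
        if "maximum" in rules and value > rules["maximum"]:
            errors.append(f"{field}: {value} above maximum {rules['maximum']}")
    return errors


good = {"order_id": "o-1", "status": "shipped", "total_amount": 42.5}
bad = {"status": "returned", "total_amount": -5}
print(validate_record(good, CONTRACT))  # []
print(validate_record(bad, CONTRACT))   # three violations
```

Running this at the pipeline boundary gives the producer and consumer a shared, executable definition of "valid".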
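The "don't hardcode thresholds" guidance can be made concrete: derive the row-count bounds used by `expect_table_row_count_to_be_between` from recent load history instead of fixed constants. A sketch assuming you can query recent daily row counts (the helper name is illustrative):

```python
from statistics import mean, stdev


def dynamic_row_count_bounds(daily_counts, sigma=3.0):
    """Derive (min, max) row-count bounds as mean +/- sigma * stddev of recent loads."""
    mu = mean(daily_counts)
    sd = stdev(daily_counts) if len(daily_counts) > 1 else 0.0
    lower = max(0, int(mu - sigma * sd))
    upper = int(mu + sigma * sd)
    return lower, upper


# Last 7 daily row counts for the orders table
history = [10200, 9800, 10050, 10400, 9900, 10100, 10250]
low, high = dynamic_row_count_bounds(history)
print(low, high)
```

The resulting pair can be passed as `min_value`/`max_value` when the suite is built, recomputed on each run, so the expectation tightens or loosens with the table's actual behavior.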