
Data Engineering

Tags: data-engineering, spark, etl, airflow, big-data
Rating: 4.9 (212), 4 stars, License: NOASSERTION, Updated: 2026-01-05

Install this skill

npx skills add pluginagentmarketplace/custom-plugin-ai-data-scientist

Works across Claude Code, Cursor, Codex, Copilot & Antigravity

What this skill does

  • Develop ETL pipelines using Apache Airflow for task orchestration
  • Execute large-scale data transformations with PySpark
  • Implement data quality validation checks via Great Expectations
  • Design star schema models for structured data warehousing
  • Manage real-time data ingestion using Kafka producers and consumers

When to use it

  • When building automated pipelines to ingest raw data into a warehouse
  • When optimizing slow Spark jobs that need better partitioning or caching
  • When establishing data quality gates to keep corrupted data from reaching downstream consumers
  • When integrating real-time streaming events into a batch-based storage system

When not to use it

  • For simple CRUD operations on small, relational database tables
  • When you only need a basic script for one-off data cleanup without infrastructure requirements

How to invoke it

Example prompts that trigger this skill:

  • "Help me set up an Airflow DAG to extract data from an API and load it to Snowflake."
  • "Show me how to optimize this PySpark transformation for large datasets."
  • "Write a Great Expectations suite to validate these incoming CSV files."
  • "Help me implement a Kafka consumer to process streaming logs."
  • "Refactor this SQL query for a star schema data warehouse design."

Example workflow

  1. Define schema requirements for the target fact and dimension tables.
  2. Write PySpark scripts to perform necessary joins and aggregations.
  3. Configure Airflow tasks to manage dependencies between extraction and transformation.
  4. Integrate Great Expectations checks to ensure target data meets business rules.
  5. Schedule the final pipeline and enable Delta Lake versioning for auditing.

Prerequisites

  • Python 3.x
  • Apache Spark environment
  • Access to a data warehouse or lake
  • Kafka broker instance for streaming workflows

Pitfalls & limitations

  • Over-partitioning files in a data lake produces many small files and adds metadata overhead.
  • Failing to cache an intermediate Spark DataFrame that is reused forces Spark to recompute it for every action.
  • Ignoring Airflow backfill settings (such as `catchup`) can trigger reruns of past intervals and duplicate data if loads are not idempotent.
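
The backfill pitfall comes down to whether a rerun of the same logical date appends rows or replaces them. A minimal sketch in plain Python, with a dictionary standing in for a date-partitioned table; `load_append`, `load_partition`, and the sample rows are illustrative names and data, not part of the skill or of Airflow:

```python
from collections import defaultdict

def load_append(table, logical_date, rows):
    """Non-idempotent: every rerun of the same date adds the rows again."""
    table[logical_date].extend(rows)

def load_partition(table, logical_date, rows):
    """Idempotent: a rerun replaces the whole partition for that date."""
    table[logical_date] = list(rows)

# Made-up sample rows for one logical date.
rows = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 5.5}]

appended = defaultdict(list)
replaced = defaultdict(list)
for _ in range(3):  # one scheduled run plus two backfill reruns
    load_append(appended, "2024-01-01", rows)
    load_partition(replaced, "2024-01-01", rows)

print(len(appended["2024-01-01"]))  # 6: the two rows were loaded three times
print(len(replaced["2024-01-01"]))  # 2: reruns left the partition unchanged
```

Replacing a whole partition per run is one common way to make backfills safe; a keyed merge is another.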

FAQ

Can I use this for real-time streaming?
Yes, the skill includes patterns for Kafka integration to handle low-latency data streams.
What is the role of Great Expectations here?
It is used to automate data testing, ensuring that your ETL results remain accurate and consistent.
Does this require a specific warehouse?
No, while examples include Snowflake, the architectural patterns are adaptable to most standard data warehousing solutions.
Is this suitable for small datasets?
While you can use it, the tooling is optimized for big data; smaller sets might be better served by standard Python pandas scripts.

How it compares

Generic prompts often provide fragmented code snippets, whereas this skill provides integrated architectural patterns that link components like Airflow, Spark, and validation tools together.

Source & trust

4 stars, License: NOASSERTION, Updated 2026-01-05, no risky patterns found


View the full SKILL.md source

# Data Engineering

Build scalable data pipelines and infrastructure for big data processing.

## Quick Start with Apache Spark

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum()

# Initialize Spark
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Read data (assumed to contain value, date, category, sales and price columns)
df = spark.read.parquet("s3://bucket/data/")

# Transformations (lazy evaluation: nothing runs until the write below)
df_clean = df \
    .filter(F.col("value") > 0) \
    .groupBy("date", "category") \
    .agg(
        F.sum("sales").alias("total_sales"),
        F.avg("price").alias("avg_price"),
        F.count("*").alias("count")
    ) \
    .orderBy(F.col("total_sales").desc())

# Write results ("date" is kept in the groupBy so it can be a partition column)
df_clean.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet("s3://bucket/output/")
```

## ETL Pipeline with Apache Airflow

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='@daily',  # Airflow 2.4+ prefers the `schedule` argument
    catchup=False
)

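# fetch_api_data, clean_and_transform and load_to_warehouse are placeholders
# that you must define. XCom is meant for small payloads; pass a file path or
# table name instead when the data is large.

def extract(**context):
    # Extract data from source
    data = fetch_api_data()
    context['task_instance'].xcom_push(key='raw_data', value=data)

def transform(**context):
    # Transform data pushed by the extract task
    data = context['task_instance'].xcom_pull(task_ids='extract', key='raw_data')
    cleaned = clean_and_transform(data)
    context['task_instance'].xcom_push(key='clean_data', value=cleaned)

def load(**context):
    # Load the transformed data to the data warehouse
    data = context['task_instance'].xcom_pull(task_ids='transform', key='clean_data')
    load_to_warehouse(data)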

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag
)

extract_task >> transform_task >> load_task
```
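
The `>>` chain at the end of the DAG declares dependencies, and each task runs only after its upstream tasks have finished. As a rough illustration of that ordering rule (this is not Airflow's scheduler), the standard-library `graphlib` module can resolve the same three-task graph:

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)  # ['extract', 'transform', 'load']
```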

## Data Warehousing

### Star Schema Design
```sql
-- Dimension Table (create dimensions before the fact table that references them)
CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    category VARCHAR(100),
    brand VARCHAR(100)
);

-- Fact Table (dim_date and dim_customer are assumed to exist already)
CREATE TABLE fact_sales (
    sale_id SERIAL PRIMARY KEY,
    date_key INT REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    quantity INT,
    revenue DECIMAL(10,2),
    cost DECIMAL(10,2)
);
```
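
To show how a star schema is queried, the sketch below builds a cut-down version of the two tables in an in-memory SQLite database and aggregates the fact table by a dimension attribute. The column subset and the sample rows are made up for illustration, and SQLite types stand in for the warehouse types above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_product (
        product_key INTEGER PRIMARY KEY,
        product_name TEXT,
        category TEXT
    );
    CREATE TABLE fact_sales (
        sale_id INTEGER PRIMARY KEY,
        product_key INTEGER REFERENCES dim_product(product_key),
        quantity INTEGER,
        revenue REAL
    );
    INSERT INTO dim_product VALUES
        (1, 'Laptop', 'Electronics'),
        (2, 'Phone', 'Electronics'),
        (3, 'Desk', 'Furniture');
    INSERT INTO fact_sales (product_key, quantity, revenue) VALUES
        (1, 2, 2000.0),
        (2, 1, 500.0),
        (3, 4, 800.0);
""")

# Facts hold the measures; the dimension supplies the grouping attribute.
revenue_by_category = conn.execute("""
    SELECT p.category, SUM(f.revenue) AS total_revenue
    FROM fact_sales AS f
    JOIN dim_product AS p ON p.product_key = f.product_key
    GROUP BY p.category
    ORDER BY total_revenue DESC
""").fetchall()

print(revenue_by_category)  # [('Electronics', 2500.0), ('Furniture', 800.0)]
conn.close()
```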

### Snowflake Data Warehouse
```sql
-- Create warehouse
CREATE WAREHOUSE compute_wh
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE;

-- Load data from S3 (access requires a storage integration or credentials)
COPY INTO sales
FROM 's3://bucket/data/'
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
ON_ERROR = 'CONTINUE';

-- Clustering
ALTER TABLE sales CLUSTER BY (date, region);

-- Time travel
SELECT * FROM sales AT (OFFSET => -3600);  -- 1 hour ago
```

## Big Data Processing

### Spark SQL
```python
# Register as temp view
df.createOrReplaceTempView("sales")

# SQL queries
result = spark.sql("""
    SELECT
        category,
        SUM(sales) as total_sales,
        AVG(price) as avg_price
    FROM sales
    WHERE date >= '2024-01-01'
    GROUP BY category
    HAVING SUM(sales) > 10000
    ORDER BY total_sales DESC
""")

result.show()
```

### Spark Optimization
```python
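# Cache in memory (lazy: materialized on the first action)
df.cache()

# Repartition returns a new DataFrame, so keep the result
df = df.repartition(200)

# Broadcast small tables (large_df and small_df are placeholders)
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")

# Persist with an explicit storage level (an alternative to cache())
from pyspark.storagelevel import StorageLevel
result.persist(StorageLevel.MEMORY_AND_DISK)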
```

## Stream Processing with Kafka

```python
from kafka import KafkaProducer, KafkaConsumer
import json

# Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send('topic-name', {'key': 'value'})
producer.flush()  # send() is asynchronous; flush so the message is delivered before exit

# Consumer
consumer = KafkaConsumer(
    'topic-name',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='my-group',
    auto_offset_reset='earliest'
)

# Blocks and polls indefinitely; process_message is a placeholder you must define
for message in consumer:
    process_message(message.value)
```
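
The consumer loop above stops at the first message that raises. One common pattern is to retry a bounded number of times and then set the message aside in a dead letter queue. A minimal sketch in plain Python, with lists standing in for the topic and the dead letter queue; `consume`, the `user_id` check, and the sample messages are hypothetical:

```python
import json

def process_message(payload):
    """Hypothetical handler: rejects events that lack a user_id."""
    if "user_id" not in payload:
        raise ValueError("missing user_id")
    return payload["user_id"]

def consume(raw_messages, max_attempts=3):
    """Process each message, dead-lettering it after max_attempts failures."""
    processed, dead_letters = [], []
    for raw in raw_messages:
        for attempt in range(1, max_attempts + 1):
            try:
                processed.append(process_message(json.loads(raw)))
                break
            except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
                if attempt == max_attempts:
                    dead_letters.append({"raw": raw, "error": str(exc)})
    return processed, dead_letters

messages = ['{"user_id": 7}', '{"event": "click"}', 'not json']
processed, dead_letters = consume(messages)
print(processed)          # [7]
print(len(dead_letters))  # 2
```

With a real broker, the dead-lettered records would typically be published to a separate topic for later inspection rather than kept in memory.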

## Data Quality Validation

```python
import great_expectations as ge

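# Load data (legacy dataset API from older 0.x releases; newer releases
# use a different, context-based workflow)
df = ge.read_csv('data.csv')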

# Define expectations
df.expect_column_values_to_not_be_null('user_id')
df.expect_column_values_to_be_unique('email')
df.expect_column_values_to_be_between('age', 0, 120)
df.expect_column_values_to_match_regex(
    'email',
    r'^[\w\.-]+@[\w\.-]+\.\w+$'
)

# Validate
results = df.validate()
print(results)
```

## Delta Lake (Data Lakehouse)

```python
from delta.tables import DeltaTable

# Write to Delta
df.write.format("delta") \
    .mode("overwrite") \
    .save("/path/to/delta-table")

# Read from Delta
df = spark.read.format("delta").load("/path/to/delta-table")

# ACID transactions
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

# Upsert (merge); `updates` is a placeholder DataFrame of new and changed rows
deltaTable.alias("target") \
    .merge(
        updates.alias("source"),
        "target.id = source.id"
    ) \
    .whenMatchedUpdate(set={"value": "source.value"}) \
    .whenNotMatchedInsert(
        values={"id": "source.id", "value": "source.value"}
    ) \
    .execute()

# Time travel
df = spark.read.format("delta") \
    .option("versionAsOf", 10) \
    .load("/path/to/delta-table")
```

## Best Practices

1. **Incremental processing**: Process only new or changed data instead of reloading everything
2. **Idempotency**: Re-running a task on the same input leaves the same final state, with no duplicates
3. **Data validation**: Check quality at every stage
4. **Monitoring**: Track pipeline health and performance
5. **Error handling**: Use retry logic and dead letter queues
6. **Partitioning**: Partition large datasets by date or category
7. **Compression**: Use columnar formats such as Parquet or ORC for storage efficiency
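
Incremental processing is often implemented with a high-water mark: each run reads only rows newer than the last value it saw, then advances the mark. A minimal sketch in plain Python, assuming each row carries an `updated_at` timestamp; `incremental_batch` and the sample rows are illustrative, not part of the skill:

```python
from datetime import datetime

def incremental_batch(rows, watermark):
    """Return rows newer than the watermark, plus the advanced watermark."""
    new_rows = [r for r in rows if r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in new_rows), default=watermark)
    return new_rows, new_watermark

# Made-up source rows.
source = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, 9, 0)},
    {"id": 2, "updated_at": datetime(2024, 1, 2, 9, 0)},
    {"id": 3, "updated_at": datetime(2024, 1, 3, 9, 0)},
]

start = datetime(2024, 1, 1, 12, 0)
first_batch, watermark = incremental_batch(source, start)
print([r["id"] for r in first_batch])  # [2, 3]
print(watermark)                       # 2024-01-03 09:00:00

# A second run with the advanced watermark finds nothing new.
second_batch, watermark = incremental_batch(source, watermark)
print(second_batch)  # []
```

In a real pipeline the watermark would be stored durably between runs, for example in a metadata table.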

Quoted from pluginagentmarketplace/custom-plugin-ai-data-scientist for reference; see the original for the authoritative, latest version.

Full skill instructions (original source: pluginagentmarketplace/custom-plugin-ai-data-scientist)
The Data Engineering skill provides a structured framework for managing big data workflows, building resilient ETL pipelines, and optimizing data architecture. It assists developers in implementing complex data transformations using Apache Spark, automating task orchestration with Airflow, and designing efficient schema layouts like star schemas. The skill covers the end-to-end data lifecycle, from initial extraction and rigorous quality validation with Great Expectations to advanced storage solutions like Delta Lake and Snowflake warehousing. By applying standardized patterns for memory management, partitioning, and streaming integrations, this skill helps translate raw, disconnected data into clean, accessible formats for analytical consumption. It is focused on production-grade performance and reliability, making it essential for engineers responsible for maintaining scalable data infrastructure and ensuring data integrity across high-volume processing tasks.

How to Use This Skill Unit

Option A: Project-Specific (Recommended)

  1. Click "Download" above
  2. In your project, create the directory: .agent/skills/data-engineering/
  3. Save the file as SKILL.md
  4. The agent will automatically discover the skill based on its description.

Option B: Global Installation (All Agents)

Save the file to these locations to make it available across all projects:

  • Claude Code: ~/.claude/skills/pluginagentmarketplace/custom-plugin-ai-data-scientist/data-engineering/SKILL.md
  • Cursor: ~/.cursor/skills/pluginagentmarketplace/custom-plugin-ai-data-scientist/data-engineering/SKILL.md
  • Antigravity: ~/.gemini/antigravity/skills/pluginagentmarketplace/custom-plugin-ai-data-scientist/data-engineering/SKILL.md

Install with CLI:
npx skills add pluginagentmarketplace/custom-plugin-ai-data-scientist


How to use this Skill in Claude Code & Cursor

For Claude Code (CLI)

To use this skill in Claude Code, copy the rule content into your project's custom instructions or follow our Add-Skill CLI guide. This ensures Claude follows your standards during every code generation.

For Cursor & Windsurf

For Cursor or Windsurf, individual skills are best used in the "Rules for AI" section. This specific unit helps the agent avoid backend development issues, leading to cleaner, more efficient code.

Why the skill format matters: the standardized Agent Skills format lets your AI agent load detailed instructions only when they are relevant, keeping your prompt clean while improving results.

Source & attribution

This skill is categorized under Backend Development and is published by pluginagentmarketplace, maintained in pluginagentmarketplace/custom-plugin-ai-data-scientist.
