
spark-optimization

Apache Spark · Big Data · Performance Tuning · Data Engineering · ETL · Distributed Computing · Scala · PySpark
⭐ 36.8k · 📄 MIT · 🕒 2026-06-16 · Source ↗

Install this skill

npx skills add wshobson/agents

Works across Claude Code, Cursor, Codex, Copilot & Antigravity

The spark-optimization skill provides a structured methodology for tuning Apache Spark workloads. It targets resource waste caused by poor partitioning, excessive data shuffling, and memory pressure. It configures Adaptive Query Execution (AQE), applies broadcast joins, and handles data skew through manual salting or bucketing, which helps turn inefficient pipelines into performant distributed jobs. It focuses on the mechanics of the Spark execution engine, including stage boundaries, task distribution, and serialization overhead. The skill gives concrete logic for calculating partition counts. It also defines patterns for avoiding common bottlenecks such as disk spills and uneven executor utilization. The goal is Spark applications that handle large-scale datasets at lower cost, with lower latency and better use of cluster resources.

When to Use This Skill

  • Reducing runtime for long-duration ETL jobs on massive S3 datasets
  • Resolving executor out-of-memory errors and frequent garbage collection cycles
  • Eliminating bottlenecks caused by heavy data skew in group-by operations
  • Minimizing cluster network congestion during large-scale shuffles

How to Invoke This Skill

Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:

  • “How do I speed up my slow PySpark job?”
  • “My Spark job is failing due to data skew on this join.”
  • “Help me optimize my Spark session configuration for performance.”
  • “Why are my Spark executors running out of memory?”
  • “Show me how to reduce shuffle read/write in Spark.”

Pro Tips

  • 💡 Always start with monitoring (Spark UI, Prometheus, Grafana) to pinpoint the actual bottleneck before applying any optimization. Guessing wastes time.
  • 💡 Prioritize reducing shuffles, which are typically the most expensive operations. Use broadcast joins for small lookup tables, and repartition only where it pays off.
  • 💡 Experiment with Kryo serialization and columnar storage formats (e.g., Parquet, ORC) to reduce I/O and CPU overhead.

What this skill does

  • Configuration of Adaptive Query Execution (AQE) settings
  • Formula-based calculation of RDD/DataFrame partition counts
  • Join strategies for large and skewed data, including broadcasting, bucketing, and salting
  • Serialization tuning via Kryo configuration
  • Predicate pushdown and column pruning techniques

When not to use it

  • ✕ Small data volumes where cluster management overhead exceeds processing time
  • ✕ Streaming applications requiring specific low-latency tuning beyond batch optimization
  • ✕ Non-distributed workloads where local processing tools are more appropriate

Example workflow

  1. Initialize Spark session with AQE enabled and Kryo serialization configured
  2. Analyze current partition distribution to identify hotspots
  3. Apply predicate pushdown filters to limit input data size
  4. Use broadcast joins for small-to-large table joins
  5. Implement salting or bucketing for highly skewed keys
  6. Write the processed output using partitioned file structures

Prerequisites

  • Active PySpark development environment
  • Access to cluster monitoring tools
  • Understanding of Spark DAG execution

Pitfalls & limitations

  • Over-partitioning leads to excessive task-scheduling overhead
  • Broadcast joins can cause out-of-memory failures on the driver or executors if the broadcast table exceeds memory limits
  • Salting adds complexity: the other side of the join must be replicated once per salt value, and the helper columns must be managed carefully

FAQ

What is the primary benefit of enabling AQE?
AQE allows Spark to re-optimize query plans at runtime based on statistics collected between stages, effectively handling skew and partition coalescing dynamically.
How large should my Spark partitions be?
Aim for a size between 128MB and 256MB to balance task parallelism with scheduling overhead.
When should I use coalesce over repartition?
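Use coalesce to reduce partitions without a full shuffle, which is usually much faster. Because coalesce merges existing partitions, a drastic coalesce can also reduce the parallelism of the computation that precedes it. Use repartition when you need to increase partitions or force a full, even redistribution of data.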
How do I know if I have data skew?
Check the Spark UI to see if a small number of tasks are taking significantly longer to complete compared to others, often resulting in high shuffle read times.

How it compares

Unlike generic performance prompts, this skill uses production-ready patterns for AQE and bucketed joins, reducing the need to experiment manually with low-level Spark configurations.

Source & trust

📄 Full skill instructions (original source: wshobson/agents)
# Apache Spark Optimization

Production patterns for optimizing Apache Spark jobs including partitioning strategies, memory management, shuffle optimization, and performance tuning.

## When to Use This Skill

- Optimizing slow Spark jobs
- Tuning memory and executor configuration
- Implementing efficient partitioning strategies
- Debugging Spark performance issues
- Scaling Spark pipelines for large datasets
- Reducing shuffle and data skew

## Core Concepts

### 1. Spark Execution Model

```
Driver Program
    ↓
Job (triggered by action)
    ↓
Stages (separated by shuffles)
    ↓
Tasks (one per partition)
```
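The diagram can be turned into a simplified model. In a linear chain of operations, each wide (shuffle) dependency ends one stage and starts the next, and each stage runs one task per partition. This is only a sketch. The `Op` type, the operation list, and the partition counts are hypothetical. The model lists the shuffle-producing operation at the start of the stage that reads its output, and it ignores two-input joins, AQE re-planning, and skipped stages.

```python
from dataclasses import dataclass


@dataclass
class Op:
    name: str
    wide: bool           # True if the op needs a shuffle (groupBy, join, repartition)
    partitions: int = 0  # Output partitions of the shuffle (ignored for narrow ops)


def plan_stages(input_partitions, ops):
    """Split a linear chain of ops into stages: [(op names, task count), ...]."""
    stages = [([], input_partitions)]
    for op in ops:
        if op.wide:
            # Shuffle boundary: the next stage runs one task per shuffle partition
            stages.append(([op.name], op.partitions))
        else:
            # Narrow ops are pipelined into the current stage
            stages[-1][0].append(op.name)
    return stages


# Hypothetical job: 120 input partitions, filter/select, then aggregate into 200
job = [
    Op("filter", wide=False),
    Op("select", wide=False),
    Op("groupBy", wide=True, partitions=200),
    Op("agg", wide=False),
]
for i, (names, tasks) in enumerate(plan_stages(120, job)):
    print(f"Stage {i}: {names} -> {tasks} tasks")
```

Narrow operations never add a stage, so the stage count is the number of shuffles plus one.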


### 2. Key Performance Factors

| Factor | Impact | Solution |
| ----------------- | --------------------- | ----------------------------- |
| **Shuffle** | Network I/O, disk I/O | Minimize wide transformations |
| **Data Skew** | Uneven task duration | Salting, broadcast joins |
| **Serialization** | CPU overhead | Use Kryo, columnar formats |
| **Memory** | GC pressure, spills | Tune executor memory |
| **Partitions** | Parallelism | Right-size partitions |

## Quick Start

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create optimized Spark session
spark = (SparkSession.builder
    .appName("OptimizedJob")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate())

# Read with optimized settings
df = (spark.read
    .format("parquet")
    .option("mergeSchema", "false")
    .load("s3://bucket/data/"))

# Efficient transformations
result = (df
    .filter(F.col("date") >= "2024-01-01")
    .select("id", "amount", "category")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

result.write.mode("overwrite").parquet("s3://bucket/output/")
```


## Patterns

### Pattern 1: Optimal Partitioning

```python
from pyspark.sql import functions as F

# Assumes `spark` and `df` from the Quick Start

# Calculate optimal partition count
def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int:
    """
    Optimal partition size: 128MB - 256MB
    Too few: Under-utilization, memory pressure
    Too many: Task scheduling overhead
    """
    return max(int(data_size_gb * 1024 / partition_size_mb), 1)

# Repartition by key (full shuffle; only as even as the key distribution)
df_repartitioned = df.repartition(200, "partition_key")

# Coalesce to reduce partitions (no shuffle)
df_coalesced = df.coalesce(100)

# Partition pruning if `date` is a partition column; otherwise Parquet predicate pushdown
df = (spark.read.parquet("s3://bucket/data/")
    .filter(F.col("date") == "2024-01-01"))  # Spark pushes this down

# Write with partitioning for future queries
(df.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .parquet("s3://bucket/partitioned_output/"))
```
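`calculate_partitions` can be combined with the cluster's core count so that tasks run in full "waves" of one task per core. This sketch assumes a common heuristic: round the size-based count down to a multiple of the total cores, but never below one full wave. Rounding down keeps partitions roughly between the target size and twice the target. The helper name and cluster size are hypothetical.

```python
def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int:
    """Size-based partition count (same formula as in the pattern above)."""
    return max(int(data_size_gb * 1024 / partition_size_mb), 1)


def partitions_for_cluster(data_size_gb: float, total_cores: int,
                           partition_size_mb: int = 128) -> int:
    """Round the size-based count down to whole waves of tasks.

    With total_cores cores, that many tasks run at once. A multiple of
    total_cores avoids a final, mostly idle wave. Heuristic, not a Spark rule.
    """
    base = calculate_partitions(data_size_gb, partition_size_mb)
    waves = max(base // total_cores, 1)  # At least one full wave
    return waves * total_cores


# Hypothetical cluster: 10 executors x 4 cores = 40 cores
print(calculate_partitions(100))        # 800
print(partitions_for_cluster(100, 40))  # 800 (already 20 full waves)
print(calculate_partitions(7))          # 56
print(partitions_for_cluster(7, 40))    # 40 (one wave of ~179MB partitions)
```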


### Pattern 2: Join Optimization

```python
from pyspark.sql import functions as F

# 1. Broadcast Join - Small table joins
# Best when: One side < 10MB (spark.sql.autoBroadcastJoinThreshold, default 10MB)
small_df = spark.read.parquet("s3://bucket/small_table/")  # < 10MB
large_df = spark.read.parquet("s3://bucket/large_table/")  # TBs

# Explicit broadcast hint (applies regardless of the threshold)
result = large_df.join(
    F.broadcast(small_df),
    on="key",
    how="left"
)

# 2. Sort-Merge Join - Default for large tables
# Requires shuffle, but handles any size (large_df1/large_df2: any two large DataFrames)
result = large_df1.join(large_df2, on="key", how="inner")

# 3. Bucket Join - Pre-sorted, no shuffle at join time
# Write bucketed tables
(df.write
    .bucketBy(200, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("bucketed_orders"))

# Join bucketed tables (no shuffle!)
orders = spark.table("bucketed_orders")
customers = spark.table("bucketed_customers")  # Bucketed by customer_id, same bucket count
result = orders.join(customers, on="customer_id")

# 4. Skew Join Handling
# Enable AQE skew join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# Manual salting for severe skew
def salt_join(df_skewed, df_other, key_col, num_salts=10):
    """Spread each hot key on the skewed side across num_salts sub-keys."""
    # Add a random salt (0..num_salts-1) to the skewed side
    df_salted = df_skewed.withColumn(
        "salt",
        (F.rand() * num_salts).cast("int")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col).cast("string"), F.lit("_"), F.col("salt").cast("string"))
    )

    # Replicate every row of the other side once per salt value
    df_exploded = df_other.crossJoin(
        spark.range(num_salts).withColumnRenamed("id", "salt")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col).cast("string"), F.lit("_"), F.col("salt").cast("string"))
    )

    # Join on the salted key; drop duplicate and helper columns from the result
    return (df_salted
        .join(df_exploded.drop(key_col, "salt"), on="salted_key", how="inner")
        .drop("salted_key", "salt"))
```
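The effect of `salt_join` can be checked without a cluster. This pure-Python model uses hypothetical helper names and data. CRC32 stands in for Spark's Murmur3 hash partitioner, and salts are assigned round-robin instead of with `F.rand()` so the output is reproducible. It shows two things: salting spreads a hot key across several partitions, and replicating the other side once per salt leaves the set of matched rows unchanged.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 8
NUM_SALTS = 4


def partition_of(key: str) -> int:
    """Deterministic stand-in for Spark's hash partitioner (CRC32, not Murmur3)."""
    return zlib.crc32(key.encode()) % NUM_PARTITIONS


def salt_skewed(rows):
    """Add salted_key = '<key>_<salt>'; salts cycle 0..NUM_SALTS-1 (instead of F.rand())."""
    return [dict(r, salted_key=f"{r['key']}_{i % NUM_SALTS}") for i, r in enumerate(rows)]


def explode_other(rows):
    """Replicate each row once per salt so every salted key has exactly one match."""
    return [dict(r, salted_key=f"{r['key']}_{s}") for r in rows for s in range(NUM_SALTS)]


def hash_join(left, right, on):
    """Inner equi-join of two lists of dicts on column `on`."""
    index = {}
    for right_row in right:
        index.setdefault(right_row[on], []).append(right_row)
    return [(left_row, match) for left_row in left for match in index.get(left_row[on], [])]


# Hypothetical data: 1,000 rows for hot key "a", 10 rows each for "b".."e"
facts = [{"key": "a", "v": i} for i in range(1000)]
facts += [{"key": k, "v": i} for k in "bcde" for i in range(10)]
dims = [{"key": k, "name": k.upper()} for k in "abcde"]

salted_facts = salt_skewed(facts)
plain_load = Counter(partition_of(r["key"]) for r in facts)
salted_load = Counter(partition_of(r["salted_key"]) for r in salted_facts)
print("largest partition, unsalted:", max(plain_load.values()))
print("largest partition, salted:  ", max(salted_load.values()))

# Salting must not change which rows match
plain_pairs = hash_join(facts, dims, "key")
salted_pairs = hash_join(salted_facts, explode_other(dims), "salted_key")
print(len(plain_pairs), len(salted_pairs))  # 1040 1040
```

The cost of this approach is visible in `explode_other`: the smaller side grows by a factor of `NUM_SALTS`. For that reason, salting is usually reserved for severe skew.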


### Pattern 3: Caching and Persistence

```python
from pyspark import StorageLevel
from pyspark.sql import functions as F

# Cache when reusing DataFrame multiple times
df = spark.read.parquet("s3://bucket/data/")
df_filtered = df.filter(F.col("status") == "active")

# Cache with the default storage level (memory, spilling to disk)
df_filtered.cache()

# Alternative to .cache(): choose the level explicitly. PySpark always stores
# data serialized, so it does not expose the *_SER levels, e.g.:
# df_filtered.persist(StorageLevel.MEMORY_AND_DISK)

# Force materialization
df_filtered.count()

# Reuse the cached data in multiple actions
agg1 = df_filtered.groupBy("category").count()
agg2 = df_filtered.groupBy("region").sum("amount")
agg1.show()
agg2.show()

# Unpersist when done
df_filtered.unpersist()

# Storage levels explained:
# MEMORY_ONLY     - Fast, but partitions that don't fit are recomputed
# MEMORY_AND_DISK - Spills to disk if needed (recommended)
# MEMORY_ONLY_SER - Serialized, less memory, more CPU (Scala/Java API only)
# DISK_ONLY       - When memory is tight
# OFF_HEAP        - Tungsten off-heap memory

# Checkpoint for complex lineage (other_df: any second DataFrame sharing "key")
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df_complex = (df
    .join(other_df, "key")
    .groupBy("category")
    .agg(F.sum("amount")))
# checkpoint() returns a NEW DataFrame with truncated lineage; keep the result
df_complex = df_complex.checkpoint()
```


### Pattern 4: Memory Tuning

```python
import json
from urllib.request import urlopen

from pyspark.sql import SparkSession

# Executor memory configuration
# spark-submit --executor-memory 8g --executor-cores 4

# Memory breakdown (8GB executor heap):
# - 300MB is reserved; unified memory = (8GB - 300MB) * spark.memory.fraction
# - spark.memory.fraction = 0.6 (~4.6GB shared by execution + storage)
# - spark.memory.storageFraction = 0.5 (~2.3GB of that protected for cached data)
# - Execution (shuffles, joins, sorts) gets the rest and can borrow unused storage
# - The remaining ~40% (~3.1GB) holds user data structures and internal metadata

spark = (SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")  # Non-heap: JVM overhead, native libs, Python workers
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    .config("spark.sql.shuffle.partitions", "200")
    # Broadcast tables up to 50MB (more driver/executor memory, fewer shuffles)
    .config("spark.sql.autoBroadcastJoinThreshold", "50MB")
    # Max bytes packed into one input partition when reading files
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    .getOrCreate())

# Monitor memory usage via the Spark monitoring REST API (requires the Spark UI)
def print_memory_usage(spark):
    """Print storage memory per executor"""
    sc = spark.sparkContext
    url = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}/executors"
    with urlopen(url) as resp:
        executors = json.load(resp)
    for executor in executors:
        total = executor["maxMemory"] / (1024**3)
        used = executor["memoryUsed"] / (1024**3)
        print(f"{executor['id']}: {total:.2f}GB total, {total - used:.2f}GB free")
```
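The breakdown in the comments follows Spark's unified memory model, in which a fixed 300MB of heap is reserved before `spark.memory.fraction` is applied. A small calculator makes that arithmetic explicit. The function name is hypothetical. The storage share is a soft boundary: execution can borrow unused storage memory and evict cached blocks down to that share, while cached blocks within it are not evicted. Spark sizes the regions from the JVM's actual max heap, so real values come out slightly smaller.

```python
RESERVED_MB = 300  # Reserved system memory in Spark's unified memory manager


def executor_memory_breakdown(heap_mb, memory_fraction=0.6, storage_fraction=0.5,
                              overhead_mb=None):
    """Approximate per-executor memory regions, in MB."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction      # Execution + storage
    storage = unified * storage_fraction    # Cached blocks immune to eviction by execution
    execution = unified - storage           # Shuffles, joins, sorts; may borrow free storage
    user = usable - unified                 # User data structures, internal metadata
    if overhead_mb is None:
        # Default spark.executor.memoryOverhead: max(10% of executor memory, 384MB)
        overhead_mb = max(0.10 * heap_mb, 384)
    return {
        "unified": unified,
        "storage": storage,
        "execution": execution,
        "user": user,
        "reserved": RESERVED_MB,
        # Heap + overhead per executor container (ignores off-heap/PySpark memory settings)
        "container": heap_mb + overhead_mb,
    }


# The 8g executor with 2g memoryOverhead configured above
for region, mb in executor_memory_breakdown(8 * 1024, overhead_mb=2 * 1024).items():
    print(f"{region:>9}: {mb / 1024:.2f} GB")
```

For the 8g example, this gives about 4.6GB of unified memory, 2.3GB each for storage and execution, and 3.1GB of user memory. The cluster manager must allocate a 10GB container per executor.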


### Pattern 5: Shuffle Optimization

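```python
from pyspark.sql import functions as F

# Shuffle partitions: open-source Spark needs an integer here ("auto" is a
# Databricks extension). With AQE on, start with enough partitions and let
# AQE coalesce small ones at runtime.
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Shuffle compression uses core (non-SQL) settings that are read at startup,
# so pass them at submit time rather than via spark.conf.set.
# Both flags default to true, and lz4 is the default codec.
#   spark-submit --conf spark.shuffle.compress=true \
#                --conf spark.shuffle.spill.compress=true \
#                --conf spark.io.compression.codec=lz4 ...

# Two-stage aggregation: Spark already does map-side partial aggregation for
# built-in aggregates, so an explicit extra grouping column mainly helps when
# a few hot keys would otherwise land on a single reducer task
df_optimized = (df
    # Stage 1: partial sums per (key, partition_col)
    .groupBy("key", "partition_col")
    .agg(F.sum("value").alias("partial_sum"))
    # Stage 2: combine partial sums per key
    .groupBy("key")
    .agg(F.sum("partial_sum").alias("total")))

# Exact distinct count: shuffles every distinct value
distinct_count = df.select("category").distinct().count()

# Approximate distinct count: each partition ships only a small HyperLogLog++ sketch
approx_count = df.select(F.approx_count_distinct("category")).collect()[0][0]

# Use coalesce instead of repartition when reducing partitions
df_reduced = df.coalesce(10)  # No shuffle, but may lower upstream parallelism
```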
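When one key dominates, the two-stage aggregation can use a salt instead of an existing column. This pure-Python sketch shows why that is safe for sums: aggregating per (key, salt) and then per key gives the same totals as a direct group-by, while no first-stage group holds all of a hot key's rows. The function names and data are hypothetical, and `i % num_salts` stands in for a random salt so the results are reproducible. The same reasoning applies only to associative, commutative aggregates such as sum, count, min, and max, not to medians or exact percentiles.

```python
from collections import defaultdict


def direct_sum(rows):
    """Single-stage group-by sum."""
    totals = defaultdict(int)
    for key, value in rows:
        totals[key] += value
    return dict(totals)


def two_stage_sum(rows, num_salts=4):
    """Stage 1 aggregates per (key, salt); stage 2 merges partial sums per key."""
    partial = defaultdict(int)
    for i, (key, value) in enumerate(rows):
        partial[(key, i % num_salts)] += value  # Spreads a hot key over num_salts groups
    totals = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        totals[key] += subtotal
    return dict(totals), dict(partial)


rows = [("hot", 1)] * 1000 + [("cold", 5)] * 3
totals, partial = two_stage_sum(rows)
print(totals)  # {'hot': 1000, 'cold': 15}
print(max(v for (k, _), v in partial.items() if k == "hot"))  # 250
```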


### Pattern 6: Data Format Optimization

```python
from pyspark.sql import functions as F

# Parquet optimizations
(df.write
    .option("compression", "snappy")  # Fast compression
    .option("parquet.block.size", 128 * 1024 * 1024)  # 128MB row groups
    .parquet("s3://bucket/output/"))

# Column pruning - only read needed columns
df = (spark.read.parquet("s3://bucket/data/")
    .select("id", "amount", "date"))  # Spark only reads these columns

# Predicate pushdown - filter at storage level
df = (spark.read.parquet("s3://bucket/partitioned/year=2024/")
    .filter(F.col("status") == "active"))  # Pushed to Parquet reader

# Delta Lake optimizations (option support depends on Delta/Databricks version)
(df.write
    .format("delta")
    .option("optimizeWrite", "true")  # Bin-packing
    .option("autoCompact", "true")  # Compact small files
    .mode("overwrite")
    .save("s3://bucket/delta_table/"))

# Z-ordering for multi-dimensional queries (path-based tables need backquotes)
spark.sql("""
    OPTIMIZE delta.`s3://bucket/delta_table/`
    ZORDER BY (customer_id, date)
""")
```


### Pattern 7: Monitoring and Debugging

```python
from pyspark.sql import functions as F

# Whole-stage codegen (on by default) and Arrow-based pandas conversion
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Explain query plan
df.explain(mode="extended")
# Modes: simple, extended, codegen, cost, formatted

# Optimized logical plan with statistics (size, row count when available)
df.explain(mode="cost")

# Monitor task progress
def analyze_stage_metrics(spark):
    """Print task progress for currently active stages"""
    status_tracker = spark.sparkContext.statusTracker()

    for stage_id in status_tracker.getActiveStageIds():
        stage_info = status_tracker.getStageInfo(stage_id)
        if stage_info is None:  # Stage already finished and was cleaned up
            continue
        print(f"Stage {stage_id}:")
        print(f"  Tasks: {stage_info.numTasks}")
        print(f"  Completed: {stage_info.numCompletedTasks}")
        print(f"  Failed: {stage_info.numFailedTasks}")

# Identify data skew
def check_partition_skew(df):
    """Check for partition skew"""
    partition_counts = (df
        .withColumn("partition_id", F.spark_partition_id())
        .groupBy("partition_id")
        .count()
        .orderBy(F.desc("count")))

    partition_counts.show(20)

    stats = partition_counts.select(
        F.min("count").alias("min"),
        F.max("count").alias("max"),
        F.avg("count").alias("avg"),
        F.stddev("count").alias("stddev")
    ).collect()[0]

    skew_ratio = stats["max"] / stats["avg"]
    print(f"Skew ratio: {skew_ratio:.2f}x (>2x suggests skew)")
```
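The same max-versus-typical comparison also works on task durations, which is what the Spark UI's stage page shows. Using the median rather than the mean keeps a single straggler from inflating the baseline. This is a minimal sketch. The function name and input format are assumptions, and the 2x cut-off is borrowed from the heuristic printed by `check_partition_skew`; it is not a Spark setting.

```python
from statistics import median


def task_skew_report(durations_s, ratio_threshold=2.0):
    """Compare the slowest task with the median for one stage.

    durations_s: task durations in seconds (e.g., copied from the Spark UI).
    ratio_threshold: assumed cut-off for max / median; not a Spark setting.
    """
    if not durations_s:
        raise ValueError("need at least one task duration")
    med = median(durations_s)
    slowest = max(durations_s)
    ratio = slowest / med if med > 0 else float("inf")
    stragglers = [i for i, d in enumerate(durations_s) if d > ratio_threshold * med]
    return {
        "median_s": med,
        "max_s": slowest,
        "max_to_median": ratio,
        "skewed": ratio > ratio_threshold,
        "straggler_tasks": stragglers,
    }


# Hypothetical stage with one straggler (task 4)
report = task_skew_report([12, 11, 13, 12, 95, 12, 10, 14])
print(f"max/median = {report['max_to_median']:.1f}x, stragglers: {report['straggler_tasks']}")
```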


## Configuration Cheat Sheet

```python
# Production configuration template
spark_configs = {
    # Adaptive Query Execution (AQE)
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",

    # Memory
    "spark.executor.memory": "8g",
    "spark.executor.memoryOverhead": "2g",
    "spark.memory.fraction": "0.6",
    "spark.memory.storageFraction": "0.5",

    # Parallelism
    "spark.sql.shuffle.partitions": "200",
    "spark.default.parallelism": "200",

    # Serialization
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.execution.arrow.pyspark.enabled": "true",

    # Compression
    "spark.io.compression.codec": "lz4",
    "spark.shuffle.compress": "true",

    # Broadcast
    "spark.sql.autoBroadcastJoinThreshold": "50MB",

    # File handling
    "spark.sql.files.maxPartitionBytes": "128MB",
    "spark.sql.files.openCostInBytes": "4MB",
}
```
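Several entries in this template, such as executor memory, memory overhead, the serializer, and shuffle compression, are read when the application starts. They therefore belong on `spark-submit` or `SparkSession.builder` rather than `spark.conf.set`. The helpers below render a config dict in both forms. Both helper names are hypothetical, `job.py` is a placeholder script, and the sample dict is only a subset of the template above.

```python
import shlex


def to_spark_submit_args(configs):
    """Render a {key: value} config dict as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(configs.items()):
        args += ["--conf", f"{key}={value}"]
    return args


def to_builder(builder, configs):
    """Apply the dict to a SparkSession.builder (any object with .config(key, value))."""
    for key, value in configs.items():
        builder = builder.config(key, value)
    return builder


# A subset of the template above
configs = {
    "spark.sql.adaptive.enabled": "true",
    "spark.executor.memory": "8g",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
}
cmd = ["spark-submit", *to_spark_submit_args(configs), "job.py"]
print(shlex.join(cmd))
```

With PySpark installed, `to_builder(SparkSession.builder, spark_configs).getOrCreate()` applies the full template at session creation.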


## Best Practices

### Do's

- **Enable AQE** - Adaptive query execution handles many issues
- **Use Parquet/Delta** - Columnar formats with compression
- **Broadcast small tables** - Avoid shuffle for small joins
- **Monitor Spark UI** - Check for skew, spills, GC
- **Right-size partitions** - 128MB - 256MB per partition

### Don'ts

- **Don't collect large data** - Keep data distributed
- **Don't use UDFs unnecessarily** - Use built-in functions
- **Don't over-cache** - Memory is limited
- **Don't ignore data skew** - It dominates job time
- **Don't use .count() for existence** - Use .take(1) or .isEmpty()

## Resources

- [Spark Performance Tuning](https://spark.apache.org/docs/latest/sql-performance-tuning.html)
- [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html)
- [Databricks Optimization Guide](https://docs.databricks.com/en/optimizations/index.html)

How to Use This Skill Unit

Option A: Project-Specific (Recommended)

  1. Click "Download" above
  2. In your project, create the directory: .agent/skills/spark-optimization/
  3. Save the file as SKILL.md
  4. The agent will automatically discover the skill based on its description.

Option B: Global Installation (All Agents)

Save the file to these locations to make it available across all projects:

  • Claude Code: ~/.claude/skills/wshobson/agents/spark-optimization/SKILL.md
  • Cursor: ~/.cursor/skills/wshobson/agents/spark-optimization/SKILL.md
  • Antigravity: ~/.gemini/antigravity/skills/wshobson/agents/spark-optimization/SKILL.md

🚀 Install with CLI:
npx skills add wshobson/agents


How to use this Skill in Claude Code & Cursor

For Claude Code (CLI)

To use this skill in Claude Code, copy the rule content into your project's custom instructions or follow our Add-Skill CLI guide. This ensures Claude follows your standards during every code generation.

For Cursor & Windsurf

For Cursor or Windsurf, individual skills are best used in the "Rules for AI" section. This specific unit helps the agent avoid devops & ci/cd issues, leading to cleaner, more efficient code.

Why the skill format matters: the standardized Agent Skills format lets your AI agent load detailed instructions only when they are relevant, keeping your prompt clean while improving results.

Source & attribution

This skill is categorized under DevOps & CI/CD and is published by W. Shobson, maintained in wshobson/agents.
