npx skills add https://github.com/wshobson/agents --skill spark-optimization

How Spark Optimization fits into a Paperclip company
Spark Optimization drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.
Pre-configured AI company — 18 agents, 18 skills, one-time purchase.
SKILL.md (411 lines)
---
name: spark-optimization
description: Optimize Apache Spark jobs with partitioning, caching, shuffle optimization, and memory tuning. Use when improving Spark performance, debugging slow jobs, or scaling data processing pipelines.
---

# Apache Spark Optimization

Production patterns for optimizing Apache Spark jobs including partitioning strategies, memory management, shuffle optimization, and performance tuning.

## When to Use This Skill

- Optimizing slow Spark jobs
- Tuning memory and executor configuration
- Implementing efficient partitioning strategies
- Debugging Spark performance issues
- Scaling Spark pipelines for large datasets
- Reducing shuffle and data skew

## Core Concepts

### 1. Spark Execution Model

```
Driver Program
  ↓
Job (triggered by action)
  ↓
Stages (separated by shuffles)
  ↓
Tasks (one per partition)
```

### 2. Key Performance Factors

| Factor            | Impact                | Solution                      |
| ----------------- | --------------------- | ----------------------------- |
| **Shuffle**       | Network I/O, disk I/O | Minimize wide transformations |
| **Data Skew**     | Uneven task duration  | Salting, broadcast joins      |
| **Serialization** | CPU overhead          | Use Kryo, columnar formats    |
| **Memory**        | GC pressure, spills   | Tune executor memory          |
| **Partitions**    | Parallelism           | Right-size partitions         |

## Quick Start

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create optimized Spark session
spark = (SparkSession.builder
    .appName("OptimizedJob")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate())

# Read with optimized settings
df = (spark.read
    .format("parquet")
    .option("mergeSchema", "false")
    .load("s3://bucket/data/"))

# Efficient transformations
result = (df
    .filter(F.col("date") >= "2024-01-01")
    .select("id", "amount", "category")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

result.write.mode("overwrite").parquet("s3://bucket/output/")
```

## Patterns

### Pattern 1: Optimal Partitioning

```python
# Calculate optimal partition count
def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int:
    """
    Optimal partition size: 128MB - 256MB
    Too few: Under-utilization, memory pressure
    Too many: Task scheduling overhead
    """
    return max(int(data_size_gb * 1024 / partition_size_mb), 1)

# Repartition for even distribution
df_repartitioned = df.repartition(200, "partition_key")

# Coalesce to reduce partitions (no shuffle)
df_coalesced = df.coalesce(100)

# Partition pruning with predicate pushdown
df = (spark.read.parquet("s3://bucket/data/")
    .filter(F.col("date") == "2024-01-01"))  # Spark pushes this down

# Write with partitioning for future queries
(df.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .parquet("s3://bucket/partitioned_output/"))
```

### Pattern 2: Join Optimization

```python
from pyspark.sql import functions as F
from pyspark.sql.types import *

# 1. Broadcast Join - Small table joins
# Best when: One side < 10MB (configurable)
small_df = spark.read.parquet("s3://bucket/small_table/")  # < 10MB
large_df = spark.read.parquet("s3://bucket/large_table/")  # TBs

# Explicit broadcast hint
result = large_df.join(
    F.broadcast(small_df),
    on="key",
    how="left")

# 2. Sort-Merge Join - Default for large tables
# Requires shuffle, but handles any size
result = large_df1.join(large_df2, on="key", how="inner")
```
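To confirm which strategy the optimizer actually chose, read the physical plan. A minimal check, reusing `large_df` and `small_df` from the example above:

```python
# Inspect the physical plan to see the chosen join strategy
joined = large_df.join(F.broadcast(small_df), on="key", how="left")
joined.explain(mode="formatted")
# Look for "BroadcastHashJoin" (large side is not shuffled) versus
# "SortMergeJoin" preceded by "Exchange" (both sides shuffled on the join key).
```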
```python
# 3. Bucket Join - Pre-sorted, no shuffle at join time
# Write bucketed tables
(df.write
    .bucketBy(200, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("bucketed_orders"))

# Join bucketed tables (no shuffle!)
orders = spark.table("bucketed_orders")
customers = spark.table("bucketed_customers")  # Same bucket count
result = orders.join(customers, on="customer_id")

# 4. Skew Join Handling
# Enable AQE skew join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# Manual salting for severe skew
def salt_join(df_skewed, df_other, key_col, num_salts=10):
    """Add salt to distribute skewed keys (uses the active global `spark` session)."""
    # Add salt to skewed side
    df_salted = df_skewed.withColumn(
        "salt", (F.rand() * num_salts).cast("int")
    ).withColumn(
        "salted_key", F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
    )

    # Explode other side with all salts
    df_exploded = df_other.crossJoin(
        spark.range(num_salts).withColumnRenamed("id", "salt")
    ).withColumn(
        "salted_key", F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
    )

    # Join on salted key
    return df_salted.join(df_exploded, on="salted_key", how="inner")
```

### Pattern 3: Caching and Persistence

```python
from pyspark import StorageLevel

# Cache when reusing a DataFrame multiple times
df = spark.read.parquet("s3://bucket/data/")
df_filtered = df.filter(F.col("status") == "active")

# Cache in memory (MEMORY_AND_DISK is the default)
df_filtered.cache()

# Or with a specific storage level
df_filtered.persist(StorageLevel.MEMORY_AND_DISK_SER)

# Force materialization
df_filtered.count()

# Use in multiple actions
agg1 = df_filtered.groupBy("category").count()
agg2 = df_filtered.groupBy("region").sum("amount")

# Unpersist when done
df_filtered.unpersist()

# Storage levels explained:
# MEMORY_ONLY      - Fast, but may not fit
# MEMORY_AND_DISK  - Spills to disk if needed (recommended)
# MEMORY_ONLY_SER  - Serialized, less memory, more CPU
# DISK_ONLY        - When memory is tight
# OFF_HEAP         - Tungsten off-heap memory

# Checkpoint for complex lineage
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df_complex = (df
    .join(other_df, "key")
    .groupBy("category")
    .agg(F.sum("amount")))
df_complex.checkpoint()  # Breaks lineage, materializes
```

### Pattern 4: Memory Tuning

```python
# Executor memory configuration
# spark-submit --executor-memory 8g --executor-cores 4

# Memory breakdown (8GB executor; figures are approximate, Spark first reserves ~300MB of heap):
# - spark.memory.fraction = 0.6 (60% = ~4.8GB for execution + storage)
# - spark.memory.storageFraction = 0.5 (50% of 4.8GB = ~2.4GB for cache)
# - Remaining ~2.4GB for execution (shuffles, joins, sorts)
# - 40% = ~3.2GB for user data structures and internal metadata

spark = (SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")  # For non-JVM memory
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    .config("spark.sql.shuffle.partitions", "200")  # For memory-intensive operations
    .config("spark.sql.autoBroadcastJoinThreshold", "50MB")
    .config("spark.sql.files.maxPartitionBytes", "128MB")  # Prevent OOM on large shuffles
    .getOrCreate())
```
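As a sanity check on the breakdown above, the unified-memory split can be worked out directly. A back-of-envelope sketch (figures are approximate and depend on the Spark version and the ~300MB reserved heap):

```python
# Approximate unified memory math for an 8 GB executor
heap_gb = 8.0
usable = heap_gb - 0.3           # minus ~300 MB reserved memory
unified = usable * 0.6           # spark.memory.fraction
storage = unified * 0.5          # spark.memory.storageFraction (cache)
execution = unified - storage    # shuffles, joins, sorts
user = usable - unified          # user data structures, internal metadata
print(f"unified={unified:.1f}GB storage={storage:.1f}GB "
      f"execution={execution:.1f}GB user={user:.1f}GB")
```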
```python
# Monitor memory usage (internal JVM APIs; interop details may vary by Spark version)
def print_memory_usage(spark):
    """Print per-executor total and free memory in GB."""
    status = spark.sparkContext._jsc.sc().getExecutorMemoryStatus()
    it = status.iterator()
    while it.hasNext():
        entry = it.next()  # (executorId, (maxMemory, remainingMemory))
        executor = entry._1()
        total = entry._2()._1() / (1024**3)
        free = entry._2()._2() / (1024**3)
        print(f"{executor}: {total:.2f}GB total, {free:.2f}GB free")
```

### Pattern 5: Shuffle Optimization

```python
# Reduce shuffle data size
spark.conf.set("spark.sql.shuffle.partitions", "auto")  # With AQE ("auto" is Databricks-specific; set an integer on open-source Spark)
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")

# Pre-aggregate before shuffle
df_optimized = (df
    # Local aggregation first (combiner)
    .groupBy("key", "partition_col")
    .agg(F.sum("value").alias("partial_sum"))
    # Then global aggregation
    .groupBy("key")
    .agg(F.sum("partial_sum").alias("total")))

# Avoid shuffle with map-side operations
# BAD: Shuffle for each distinct
distinct_count = df.select("category").distinct().count()

# GOOD: Approximate distinct (only small sketches are shuffled)
approx_count = df.select(F.approx_count_distinct("category")).collect()[0][0]

# Use coalesce instead of repartition when reducing partitions
df_reduced = df.coalesce(10)  # No shuffle

# Optimize shuffle with compression
spark.conf.set("spark.io.compression.codec", "lz4")  # Fast compression
```

### Pattern 6: Data Format Optimization

```python
# Parquet optimizations
(df.write
    .option("compression", "snappy")  # Fast compression
    .option("parquet.block.size", 128 * 1024 * 1024)  # 128MB row groups
    .parquet("s3://bucket/output/"))

# Column pruning - only read needed columns
df = (spark.read.parquet("s3://bucket/data/")
    .select("id", "amount", "date"))  # Spark only reads these columns

# Predicate pushdown - filter at storage level
df = (spark.read.parquet("s3://bucket/partitioned/year=2024/")
    .filter(F.col("status") == "active"))  # Pushed to Parquet reader

# Delta Lake optimizations
(df.write
    .format("delta")
    .option("optimizeWrite", "true")  # Bin-packing
    .option("autoCompact", "true")    # Compact small files
    .mode("overwrite")
    .save("s3://bucket/delta_table/"))

# Z-ordering for multi-dimensional queries
spark.sql("""
    OPTIMIZE delta.`s3://bucket/delta_table/`
    ZORDER BY (customer_id, date)
""")
```

### Pattern 7: Monitoring and Debugging

```python
# Enable detailed metrics
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Explain query plan
df.explain(mode="extended")
# Modes: simple, extended, codegen, cost, formatted

# Get physical plan statistics
df.explain(mode="cost")

# Monitor task metrics
def analyze_stage_metrics(spark):
    """Analyze currently active stages."""
    status_tracker = spark.sparkContext.statusTracker()
    for stage_id in status_tracker.getActiveStageIds():
        stage_info = status_tracker.getStageInfo(stage_id)
        print(f"Stage {stage_id}:")
        print(f"  Tasks: {stage_info.numTasks}")
        print(f"  Completed: {stage_info.numCompletedTasks}")
        print(f"  Failed: {stage_info.numFailedTasks}")

# Identify data skew
def check_partition_skew(df):
    """Check for partition skew"""
    partition_counts = (df
        .withColumn("partition_id", F.spark_partition_id())
        .groupBy("partition_id")
        .count()
        .orderBy(F.desc("count")))

    partition_counts.show(20)

    stats = partition_counts.select(
        F.min("count").alias("min"),
        F.max("count").alias("max"),
        F.avg("count").alias("avg"),
        F.stddev("count").alias("stddev")
    ).collect()[0]

    skew_ratio = stats["max"] / stats["avg"]
    print(f"Skew ratio: {skew_ratio:.2f}x (>2x indicates skew)")
```

## Configuration Cheat Sheet

```python
# Production configuration template
spark_configs = {
    # Adaptive Query Execution (AQE)
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",

    # Memory
    "spark.executor.memory": "8g",
    "spark.executor.memoryOverhead": "2g",
    "spark.memory.fraction": "0.6",
    "spark.memory.storageFraction": "0.5",

    # Parallelism
    "spark.sql.shuffle.partitions": "200",
    "spark.default.parallelism": "200",

    # Serialization
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.execution.arrow.pyspark.enabled": "true",

    # Compression
    "spark.io.compression.codec": "lz4",
    "spark.shuffle.compress": "true",

    # Broadcast
    "spark.sql.autoBroadcastJoinThreshold": "50MB",

    # File handling
    "spark.sql.files.maxPartitionBytes": "128MB",
    "spark.sql.files.openCostInBytes": "4MB",
}
```

## Best Practices

### Do's

- **Enable AQE** - Adaptive query execution handles many issues
- **Use Parquet/Delta** - Columnar formats with compression
- **Broadcast small tables** - Avoid shuffle for small joins
- **Monitor Spark UI** - Check for skew, spills, GC
- **Right-size partitions** - 128MB - 256MB per partition

### Don'ts

- **Don't collect large data** - Keep data distributed
- **Don't use UDFs unnecessarily** - Use built-in functions
- **Don't over-cache** - Memory is limited
- **Don't ignore data skew** - It dominates job time
- **Don't use `.count()` for existence** - Use `.take(1)` or `.isEmpty()` (see the sketch below)
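A minimal sketch of the UDF and `.count()` points above, assuming the `df` from the Quick Start (which has an `amount` column); `DataFrame.isEmpty()` requires Spark 3.3+:

```python
# Existence check without a full count
has_rows = len(df.take(1)) > 0       # or: not df.isEmpty()  (Spark 3.3+)

# Prefer built-in functions over Python UDFs (stays on the JVM/codegen path)
df_flagged = df.withColumn("is_large", F.col("amount") > 1000)
```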