npx skills add https://github.com/wshobson/agents --skill go-concurrency-patternsHow Go Concurrency Patterns fits into a Paperclip company.
Go Concurrency Patterns drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.
Pre-configured AI company — 18 agents, 18 skills, one-time purchase.
SKILL.md651 linesExpandCollapse
---name: go-concurrency-patternsdescription: Master Go concurrency with goroutines, channels, sync primitives, and context. Use when building concurrent Go applications, implementing worker pools, or debugging race conditions.--- # Go Concurrency Patterns Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management. ## When to Use This Skill - Building concurrent Go applications- Implementing worker pools and pipelines- Managing goroutine lifecycles- Using channels for communication- Debugging race conditions- Implementing graceful shutdown ## Core Concepts ### 1. Go Concurrency Primitives | Primitive | Purpose || ----------------- | -------------------------------- || `goroutine` | Lightweight concurrent execution || `channel` | Communication between goroutines || `select` | Multiplex channel operations || `sync.Mutex` | Mutual exclusion || `sync.WaitGroup` | Wait for goroutines to complete || `context.Context` | Cancellation and deadlines | ### 2. Go Concurrency Mantra ```Don't communicate by sharing memory;share memory by communicating.``` ## Quick Start ```gopackage main import ( "context" "fmt" "sync" "time") func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() results := make(chan string, 10) var wg sync.WaitGroup // Spawn workers for i := 0; i < 3; i++ { wg.Add(1) go worker(ctx, i, results, &wg) } // Close results when done go func() { wg.Wait() close(results) }() // Collect results for result := range results { fmt.Println(result) }} func worker(ctx context.Context, id int, results chan<- string, wg *sync.WaitGroup) { defer wg.Done() select { case <-ctx.Done(): return case results <- fmt.Sprintf("Worker %d done", id): }}``` ## Patterns ### Pattern 1: Worker Pool ```gopackage main import ( "context" "fmt" "sync") type Job struct { ID int Data string} type Result struct { JobID int Output string Err error} func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result { results := make(chan Result, len(jobs)) var wg sync.WaitGroup for i := 0; i < numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for job := range jobs { select { case <-ctx.Done(): return default: result := processJob(job) results <- result } } }(i) } go func() { wg.Wait() close(results) }() return results} func processJob(job Job) Result { // Simulate work return Result{ JobID: job.ID, Output: fmt.Sprintf("Processed: %s", job.Data), }} // Usagefunc main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() jobs := make(chan Job, 100) // Send jobs go func() { for i := 0; i < 50; i++ { jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)} } close(jobs) }() // Process with 5 workers results := WorkerPool(ctx, 5, jobs) for result := range results { fmt.Printf("Result: %+v\n", result) }}``` ### Pattern 2: Fan-Out/Fan-In Pipeline ```gopackage main import ( "context" "sync") // Stage 1: Generate numbersfunc generate(ctx context.Context, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case <-ctx.Done(): return case out <- n: } } }() return out} // Stage 2: Square numbers (can run multiple instances)func square(ctx context.Context, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case <-ctx.Done(): return case out <- n * n: } } }() return out} // Fan-in: Merge multiple channels into onefunc merge(ctx context.Context, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start output goroutine for each input channel output := func(c <-chan int) { defer wg.Done() for n := range c { select { case <-ctx.Done(): return case out <- n: } } } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Close out after all inputs are done go func() { wg.Wait() close(out) }() return out} func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Generate input in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // Fan out to multiple squarers c1 := square(ctx, in) c2 := square(ctx, in) c3 := square(ctx, in) // Fan in results for result := range merge(ctx, c1, c2, c3) { fmt.Println(result) }}``` ### Pattern 3: Bounded Concurrency with Semaphore ```gopackage main import ( "context" "fmt" "golang.org/x/sync/semaphore" "sync") type RateLimitedWorker struct { sem *semaphore.Weighted} func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker { return &RateLimitedWorker{ sem: semaphore.NewWeighted(maxConcurrent), }} func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error { var ( wg sync.WaitGroup mu sync.Mutex errors []error ) for _, task := range tasks { // Acquire semaphore (blocks if at limit) if err := w.sem.Acquire(ctx, 1); err != nil { return []error{err} } wg.Add(1) go func(t func() error) { defer wg.Done() defer w.sem.Release(1) if err := t(); err != nil { mu.Lock() errors = append(errors, err) mu.Unlock() } }(task) } wg.Wait() return errors} // Alternative: Channel-based semaphoretype Semaphore chan struct{} func NewSemaphore(n int) Semaphore { return make(chan struct{}, n)} func (s Semaphore) Acquire() { s <- struct{}{}} func (s Semaphore) Release() { <-s}``` ### Pattern 4: Graceful Shutdown ```gopackage main import ( "context" "fmt" "os" "os/signal" "sync" "syscall" "time") type Server struct { shutdown chan struct{} wg sync.WaitGroup} func NewServer() *Server { return &Server{ shutdown: make(chan struct{}), }} func (s *Server) Start(ctx context.Context) { // Start workers for i := 0; i < 5; i++ { s.wg.Add(1) go s.worker(ctx, i) }} func (s *Server) worker(ctx context.Context, id int) { defer s.wg.Done() defer fmt.Printf("Worker %d stopped\n", id) ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): // Cleanup fmt.Printf("Worker %d cleaning up...\n", id) time.Sleep(500 * time.Millisecond) // Simulated cleanup return case <-ticker.C: fmt.Printf("Worker %d working...\n", id) } }} func (s *Server) Shutdown(timeout time.Duration) { // Signal shutdown close(s.shutdown) // Wait with timeout done := make(chan struct{}) go func() { s.wg.Wait() close(done) }() select { case <-done: fmt.Println("Clean shutdown completed") case <-time.After(timeout): fmt.Println("Shutdown timed out, forcing exit") }} func main() { // Setup signal handling ctx, cancel := context.WithCancel(context.Background()) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) server := NewServer() server.Start(ctx) // Wait for signal sig := <-sigCh fmt.Printf("\nReceived signal: %v\n", sig) // Cancel context to stop workers cancel() // Wait for graceful shutdown server.Shutdown(5 * time.Second)}``` ### Pattern 5: Error Group with Cancellation ```gopackage main import ( "context" "fmt" "golang.org/x/sync/errgroup" "net/http") func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) results := make([]string, len(urls)) for i, url := range urls { i, url := i, url // Capture loop variables g.Go(func() error { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return fmt.Errorf("creating request for %s: %w", url, err) } resp, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("fetching %s: %w", url, err) } defer resp.Body.Close() results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode) return nil }) } // Wait for all goroutines to complete or one to fail if err := g.Wait(); err != nil { return nil, err // First error cancels all others } return results, nil} // With concurrency limitfunc fetchWithLimit(ctx context.Context, urls []string, limit int) ([]string, error) { g, ctx := errgroup.WithContext(ctx) g.SetLimit(limit) // Max concurrent goroutines results := make([]string, len(urls)) var mu sync.Mutex for i, url := range urls { i, url := i, url g.Go(func() error { result, err := fetchURL(ctx, url) if err != nil { return err } mu.Lock() results[i] = result mu.Unlock() return nil }) } if err := g.Wait(); err != nil { return nil, err } return results, nil}``` ### Pattern 6: Concurrent Map with sync.Map ```gopackage main import ( "sync") // For frequent reads, infrequent writestype Cache struct { m sync.Map} func (c *Cache) Get(key string) (interface{}, bool) { return c.m.Load(key)} func (c *Cache) Set(key string, value interface{}) { c.m.Store(key, value)} func (c *Cache) GetOrSet(key string, value interface{}) (interface{}, bool) { return c.m.LoadOrStore(key, value)} func (c *Cache) Delete(key string) { c.m.Delete(key)} // For write-heavy workloads, use sharded maptype ShardedMap struct { shards []*shard numShards int} type shard struct { sync.RWMutex data map[string]interface{}} func NewShardedMap(numShards int) *ShardedMap { m := &ShardedMap{ shards: make([]*shard, numShards), numShards: numShards, } for i := range m.shards { m.shards[i] = &shard{data: make(map[string]interface{})} } return m} func (m *ShardedMap) getShard(key string) *shard { // Simple hash h := 0 for _, c := range key { h = 31*h + int(c) } return m.shards[h%m.numShards]} func (m *ShardedMap) Get(key string) (interface{}, bool) { shard := m.getShard(key) shard.RLock() defer shard.RUnlock() v, ok := shard.data[key] return v, ok} func (m *ShardedMap) Set(key string, value interface{}) { shard := m.getShard(key) shard.Lock() defer shard.Unlock() shard.data[key] = value}``` ### Pattern 7: Select with Timeout and Default ```gofunc selectPatterns() { ch := make(chan int) // Timeout pattern select { case v := <-ch: fmt.Println("Received:", v) case <-time.After(time.Second): fmt.Println("Timeout!") } // Non-blocking send/receive select { case ch <- 42: fmt.Println("Sent") default: fmt.Println("Channel full, skipping") } // Priority select (check high priority first) highPriority := make(chan int) lowPriority := make(chan int) for { select { case msg := <-highPriority: fmt.Println("High priority:", msg) default: select { case msg := <-highPriority: fmt.Println("High priority:", msg) case msg := <-lowPriority: fmt.Println("Low priority:", msg) } } }}``` ## Race Detection ```bash# Run tests with race detectorgo test -race ./... # Build with race detectorgo build -race . # Run with race detectorgo run -race main.go``` ## Best Practices ### Do's - **Use context** - For cancellation and deadlines- **Close channels** - From sender side only- **Use errgroup** - For concurrent operations with errors- **Buffer channels** - When you know the count- **Prefer channels** - Over mutexes when possible ### Don'ts - **Don't leak goroutines** - Always have exit path- **Don't close from receiver** - Causes panic- **Don't use shared memory** - Unless necessary- **Don't ignore context cancellation** - Check ctx.Done()- **Don't use time.Sleep for sync** - Use proper primitivesAccessibility Compliance
This walks you through implementing proper WCAG 2.2 compliance with real code patterns for screen readers, keyboard navigation, and mobile accessibility. It cov
Airflow Dag Patterns
If you're building data pipelines with Airflow, this skill gives you production-ready DAG patterns that actually work in the real world. It covers TaskFlow API
Angular Migration
Migrating from AngularJS to Angular is notoriously painful, and this skill tackles the practical stuff that makes or breaks these projects. It covers hybrid app