Claude Agent Skill · by Wshobson

Go Concurrency Patterns

A solid collection of battle-tested Go concurrency patterns that covers the essentials: worker pools, fan-out/fan-in pipelines, semaphore-based rate limiting, and more (graceful shutdown, errgroup cancellation, concurrent maps, and select idioms).

Install
Terminal · npx
$ npx skills add https://github.com/wshobson/agents --skill go-concurrency-patterns
Works with Paperclip

How Go Concurrency Patterns fits into a Paperclip company.

Go Concurrency Patterns drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.

S
SaaS Factory · Paired

Pre-configured AI company — 18 agents, 18 skills, one-time purchase.

$27$59
Explore pack
Source file
SKILL.md · 651 lines
Expand
---
name: go-concurrency-patterns
description: Master Go concurrency with goroutines, channels, sync primitives, and context. Use when building concurrent Go applications, implementing worker pools, or debugging race conditions.
---

# Go Concurrency Patterns

Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management.

## When to Use This Skill

- Building concurrent Go applications
- Implementing worker pools and pipelines
- Managing goroutine lifecycles
- Using channels for communication
- Debugging race conditions
- Implementing graceful shutdown

## Core Concepts

### 1. Go Concurrency Primitives

| Primitive         | Purpose                          |
| ----------------- | -------------------------------- |
| `goroutine`       | Lightweight concurrent execution |
| `channel`         | Communication between goroutines |
| `select`          | Multiplex channel operations     |
| `sync.Mutex`      | Mutual exclusion                 |
| `sync.WaitGroup`  | Wait for goroutines to complete  |
| `context.Context` | Cancellation and deadlines       |

### 2. Go Concurrency Mantra

```
Don't communicate by sharing memory;
share memory by communicating.
```

## Quick Start

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    results := make(chan string, 10)
    var wg sync.WaitGroup

    // Spawn workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(ctx, i, results, &wg)
    }

    // Close results when done
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    for result := range results {
        fmt.Println(result)
    }
}

// worker sends one completion message on results, or gives up if ctx is done
// first. It always marks itself finished on wg.
func worker(ctx context.Context, id int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()

    select {
    case <-ctx.Done():
        return
    case results <- fmt.Sprintf("Worker %d done", id):
    }
}
```

## Patterns

### Pattern 1: Worker Pool

```go
package main

import (
    "context"
    "fmt"
    "sync"
)

// Job is one unit of work handed to the pool.
type Job struct {
    ID   int
    Data string
}

// Result is the outcome of processing a single Job.
type Result struct {
    JobID  int
    Output string
    Err    error
}

// WorkerPool starts numWorkers goroutines that consume jobs until the channel
// is closed or ctx is cancelled, and returns the channel their results are
// delivered on. The returned channel is closed once every worker has exited.
func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
    // len(jobs) only reports how many jobs happen to be queued right now
    // (often 0), so the buffer is sized by the number of workers instead.
    buffer := numWorkers
    if buffer < 0 {
        buffer = 0
    }
    results := make(chan Result, buffer)

    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case job, ok := <-jobs:
                    if !ok {
                        return // jobs closed: no more work
                    }
                    // Send under select so a cancelled ctx releases the
                    // worker even when nobody is reading results any more.
                    select {
                    case results <- processJob(job):
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }

    // Close results only after every worker has returned.
    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

// processJob builds the Result for job; it stands in for real work.
func processJob(job Job) Result {
    // Simulate work
    return Result{
        JobID:  job.ID,
        Output: fmt.Sprintf("Processed: %s", job.Data),
    }
}

// Usage
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan Job, 100)

    // Send jobs
    go
func() {
        for i := 0; i < 50; i++ {
            jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
        }
        close(jobs)
    }()

    // Process with 5 workers
    results := WorkerPool(ctx, 5, jobs)

    for result := range results {
        fmt.Printf("Result: %+v\n", result)
    }
}
```

### Pattern 2: Fan-Out/Fan-In Pipeline

```go
package main

import (
    "context"
    "fmt"
    "sync"
)

// Stage 1: Generate numbers
//
// generate emits nums in order on the returned channel and closes it when all
// values are sent or ctx is done.
func generate(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case <-ctx.Done():
                return
            case out <- n:
            }
        }
    }()
    return out
}

// Stage 2: Square numbers (can run multiple instances)
//
// square reads values from in and emits their squares. The returned channel is
// closed when in is drained or ctx is done.
func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case <-ctx.Done():
                return
            case out <- n * n:
            }
        }
    }()
    return out
}

// Fan-in: Merge multiple channels into one
//
// merge forwards every value from each channel in cs onto a single channel,
// which is closed once all forwarding goroutines have returned.
func merge(ctx context.Context, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start output goroutine for each input channel
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case <-ctx.Done():
                return
            case out <- n:
            }
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Close out after all inputs are done
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Generate input
    in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // Fan out to multiple squarers
    c1 := square(ctx, in)
    c2 := square(ctx, in)
    c3 := square(ctx, in)

    // Fan in results
    for result := range
merge(ctx, c1, c2, c3) {
        fmt.Println(result)
    }
}
```

### Pattern 3: Bounded Concurrency with Semaphore

```go
package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/semaphore"
)

// RateLimitedWorker runs tasks while capping how many execute at once.
type RateLimitedWorker struct {
    sem *semaphore.Weighted
}

// NewRateLimitedWorker returns a worker that allows at most maxConcurrent
// tasks to run simultaneously.
func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
    return &RateLimitedWorker{
        sem: semaphore.NewWeighted(maxConcurrent),
    }
}

// Do runs each task in its own goroutine, holding one semaphore slot per
// running task, and returns every error produced. If acquiring a slot fails
// (ctx is done), no further tasks are started; Do still waits for the tasks
// already running and appends the wrapped acquire error to the result.
func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
    var (
        wg   sync.WaitGroup
        mu   sync.Mutex
        errs []error
    )

    for _, task := range tasks {
        // Acquire semaphore (blocks if at limit)
        if err := w.sem.Acquire(ctx, 1); err != nil {
            mu.Lock()
            errs = append(errs, fmt.Errorf("acquiring semaphore: %w", err))
            mu.Unlock()
            // Stop launching, but fall through to wg.Wait so running tasks
            // are not abandoned and their errors are not lost.
            break
        }

        wg.Add(1)
        go func(t func() error) {
            defer wg.Done()
            defer w.sem.Release(1)

            if err := t(); err != nil {
                mu.Lock()
                errs = append(errs, err)
                mu.Unlock()
            }
        }(task)
    }

    wg.Wait()
    return errs
}

// Alternative: Channel-based semaphore
//
// Semaphore is a counting semaphore backed by a buffered channel; the buffer
// capacity is the number of slots.
type Semaphore chan struct{}

// NewSemaphore returns a Semaphore with n slots.
func NewSemaphore(n int) Semaphore {
    return make(chan struct{}, n)
}

// Acquire takes a slot, blocking while all slots are in use.
func (s Semaphore) Acquire() {
    s <- struct{}{}
}

// Release frees a slot previously taken with Acquire.
func (s Semaphore) Release() {
    <-s
}
```

### Pattern 4: Graceful Shutdown

```go
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

// Server owns a set of background workers and tracks them for shutdown.
type Server struct {
    shutdown chan struct{}
    wg       sync.WaitGroup
}

// NewServer returns a Server with its shutdown channel initialised.
func NewServer() *Server {
    return &Server{
        shutdown: make(chan struct{}),
    }
}

// Start launches five workers, each registered on the server's WaitGroup.
func (s *Server) Start(ctx context.Context) {
    // Start workers
    for i := 0; i < 5; i++ {
        s.wg.Add(1)
        go s.worker(ctx, i)
    }
}

// worker prints a heartbeat once per second until it is told to stop.
func (s *Server) worker(ctx context.Context, id int) {
    defer s.wg.Done()
    defer fmt.Printf("Worker %d stopped\n", id)

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case
<-ctx.Done():
            // Cleanup
            fmt.Printf("Worker %d cleaning up...\n", id)
            time.Sleep(500 * time.Millisecond) // Simulated cleanup
            return
        case <-s.shutdown:
            // Shutdown closes this channel; stop the same way so Shutdown
            // works even when the caller has not cancelled ctx.
            fmt.Printf("Worker %d cleaning up...\n", id)
            time.Sleep(500 * time.Millisecond) // Simulated cleanup
            return
        case <-ticker.C:
            fmt.Printf("Worker %d working...\n", id)
        }
    }
}

// Shutdown closes the shutdown channel and waits up to timeout for all
// workers to finish. It must be called at most once: a second call would
// close an already-closed channel and panic.
func (s *Server) Shutdown(timeout time.Duration) {
    // Signal shutdown
    close(s.shutdown)

    // Wait with timeout
    done := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        fmt.Println("Clean shutdown completed")
    case <-time.After(timeout):
        fmt.Println("Shutdown timed out, forcing exit")
    }
}

func main() {
    // Setup signal handling
    ctx, cancel := context.WithCancel(context.Background())

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    server := NewServer()
    server.Start(ctx)

    // Wait for signal
    sig := <-sigCh
    fmt.Printf("\nReceived signal: %v\n", sig)

    // Cancel context to stop workers
    cancel()

    // Wait for graceful shutdown
    server.Shutdown(5 * time.Second)
}
```

### Pattern 5: Error Group with Cancellation

```go
package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

// fetchAllURLs issues a GET for every URL concurrently and returns one
// "<url>: <status code>" string per input, in input order. If any request
// fails, the group's context is cancelled and the first error is returned.
func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)

    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // Capture loop variables

        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return fmt.Errorf("creating request for %s: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetching %s: %w", url, err)
            }
            defer resp.Body.Close()

            // Each goroutine writes only its own index, so no lock is needed.
            results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
            return nil
        })
    }

    // Wait for
// all goroutines to complete or one to fail
    if err := g.Wait(); err != nil {
        return nil, err // First error cancels all others
    }

    return results, nil
}

// With concurrency limit
//
// fetchWithLimit behaves like a bounded fan-out: at most limit fetches run at
// a time. Results are returned in input order; the first error aborts the
// group and is returned.
func fetchWithLimit(ctx context.Context, urls []string, limit int) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(limit) // Max concurrent goroutines

    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // Capture loop variables

        g.Go(func() error {
            result, err := fetchURL(ctx, url)
            if err != nil {
                return err
            }

            // Each goroutine owns exactly one slot of results, so the writes
            // never overlap and no mutex is required.
            results[i] = result
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }

    return results, nil
}

// fetchURL issues a GET for url bound to ctx and returns "<url>: <status code>".
func fetchURL(ctx context.Context, url string) (string, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return "", fmt.Errorf("creating request for %s: %w", url, err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", fmt.Errorf("fetching %s: %w", url, err)
    }
    defer resp.Body.Close()

    return fmt.Sprintf("%s: %d", url, resp.StatusCode), nil
}
```

### Pattern 6: Concurrent Map with sync.Map

```go
package main

import (
    "sync"
)

// For frequent reads, infrequent writes
//
// Cache is a thin string-keyed wrapper around sync.Map.
type Cache struct {
    m sync.Map
}

// Get returns the value stored under key and whether it was present.
func (c *Cache) Get(key string) (interface{}, bool) {
    return c.m.Load(key)
}

// Set stores value under key, replacing any existing entry.
func (c *Cache) Set(key string, value interface{}) {
    c.m.Store(key, value)
}

// GetOrSet returns the existing value for key if present; otherwise it stores
// and returns value. The bool is true when an existing value was loaded.
func (c *Cache) GetOrSet(key string, value interface{}) (interface{}, bool) {
    return c.m.LoadOrStore(key, value)
}

// Delete removes key from the cache.
func (c *Cache) Delete(key string) {
    c.m.Delete(key)
}

// For write-heavy workloads, use sharded map
//
// ShardedMap spreads keys over independently locked shards.
type ShardedMap struct {
    shards    []*shard
    numShards int
}

// shard is one lock-protected partition of a ShardedMap.
type shard struct {
    sync.RWMutex
    data map[string]interface{}
}

// NewShardedMap returns a map split into numShards shards. Values below 1 are
// treated as 1, because shard selection takes a modulo by the shard count.
func NewShardedMap(numShards int) *ShardedMap {
    if numShards < 1 {
        numShards = 1
    }
    m := &ShardedMap{
        shards:    make([]*shard, numShards),
        numShards: numShards,
    }
    for i := range m.shards {
        m.shards[i] = &shard{data: make(map[string]interface{})}
    }
    return m
}

// getShard picks the shard responsible for key.
func (m *ShardedMap) getShard(key string) *shard {
    // Simple hash. It is computed unsigned: a signed accumulator overflows to
    // a negative value on longer keys, and a negative modulo result would
    // index out of range.
    var h uint64
    for _, c := range key {
        h = 31*h + uint64(c)
    }
    return m.shards[h%uint64(m.numShards)]
}

// Get returns the value stored under key and whether it was present.
func (m *ShardedMap) Get(key string) (interface{}, bool) {
    shard :=
m.getShard(key)
    shard.RLock()
    defer shard.RUnlock()
    v, ok := shard.data[key]
    return v, ok
}

// Set stores value under key in the shard that owns it.
func (m *ShardedMap) Set(key string, value interface{}) {
    shard := m.getShard(key)
    shard.Lock()
    defer shard.Unlock()
    shard.data[key] = value
}
```

### Pattern 7: Select with Timeout and Default

```go
// selectPatterns demonstrates three select idioms. The final loop never
// returns; it is shown for its shape only.
func selectPatterns() {
    ch := make(chan int)

    // Timeout pattern
    select {
    case v := <-ch:
        fmt.Println("Received:", v)
    case <-time.After(time.Second):
        fmt.Println("Timeout!")
    }

    // Non-blocking send/receive
    // (ch is unbuffered, so default runs whenever no receiver is ready.)
    select {
    case ch <- 42:
        fmt.Println("Sent")
    default:
        fmt.Println("Channel full, skipping")
    }

    // Priority select (check high priority first)
    highPriority := make(chan int)
    lowPriority := make(chan int)

    for {
        select {
        case msg := <-highPriority:
            fmt.Println("High priority:", msg)
        default:
            // Nothing urgent is pending: block until either channel has a
            // message.
            select {
            case msg := <-highPriority:
                fmt.Println("High priority:", msg)
            case msg := <-lowPriority:
                fmt.Println("Low priority:", msg)
            }
        }
    }
}
```

## Race Detection

```bash
# Run tests with race detector
go test -race ./...

# Build with race detector
go build -race .

# Run with race detector
go run -race main.go
```

## Best Practices

### Do's

- **Use context** - For cancellation and deadlines
- **Close channels** - From sender side only
- **Use errgroup** - For concurrent operations with errors
- **Buffer channels** - When you know the count
- **Prefer channels** - Over mutexes when possible

### Don'ts

- **Don't leak goroutines** - Always have an exit path
- **Don't close from receiver** - A sender that writes to the closed channel panics
- **Don't use shared memory** - Unless necessary
- **Don't ignore context cancellation** - Check ctx.Done()
- **Don't use time.Sleep for sync** - Use proper primitives