Worker Pools

The worker pool pattern is a concurrency pattern in which a fixed set of worker goroutines handles tasks from a shared task queue. By capping the number of active goroutines, it makes efficient use of system resources and lets you process tasks concurrently without overwhelming the system.

Key Concepts

  • Task Queue: A buffered channel that holds tasks to be processed.
  • Workers: Goroutines that pick tasks from the task queue and process them.
  • Dispatcher: A goroutine that distributes tasks to the workers; see the sketch below.
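
The examples on this page dispatch tasks directly from main, but the dispatcher can also run as its own goroutine. Here is a minimal sketch of how the three concepts map to code under that assumption; the process helper and the worker/task counts are purely illustrative and not part of the examples that follow.

package main

import "fmt"

type task struct{ id int }

// process stands in for real work; it is only an illustration.
func process(workerID int, t task) {
    fmt.Printf("Worker %d processed task %d\n", workerID, t.id)
}

func main() {
    numWorkers := 3
    numTasks := 10

    tasks := make(chan task, numTasks) // task queue
    done := make(chan struct{})

    // workers: goroutines that drain the task queue
    for i := 1; i <= numWorkers; i++ {
        go func(id int) {
            for t := range tasks {
                process(id, t)
            }
            done <- struct{}{} // signal that this worker has exited
        }(i)
    }

    // dispatcher: a goroutine that feeds the queue and closes it when done
    go func() {
        for i := 1; i <= numTasks; i++ {
            tasks <- task{id: i}
        }
        close(tasks)
    }()

    // wait for every worker to exit before main returns
    for i := 1; i <= numWorkers; i++ {
        <-done
    }
}

The examples below do the same work with the dispatch loop in main, which is usually simpler when no separate dispatcher goroutine is needed.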

Worker pool basics

package main

import (
    "fmt"
    "time"
)

type task struct{ id int }
type result struct{ id int }

func worker(id int, tasks <-chan task, results chan<- result) {
    for task := range tasks {
        fmt.Printf("Worker %d processing %#v\n", id, task)
        time.Sleep(time.Second) // Simulate work
        results <- result{id: task.id}
    }
}

func main() {
    numWorkers := 3
    numTasks := 5

    tasks := make(chan task, numTasks)
    results := make(chan result, numTasks)

    // launch workers
    for i := 1; i <= numWorkers; i++ {
        go worker(i, tasks, results)
    }

    // dispatch some tasks
    for i := 1; i <= numTasks; i++ {
        tasks <- task{id: i}
    }
    close(tasks) // indicates we're done adding tasks to the queue

    // collect results
    for a := 1; a <= numTasks; a++ {
        fmt.Printf("%#v returned\n", <-results)
    }
}

Running this program yields output like the following (the ordering varies from run to run because the workers process tasks concurrently):

Worker 3 processing main.task{id:1}
Worker 1 processing main.task{id:2}
Worker 2 processing main.task{id:3}
Worker 3 processing main.task{id:4}
Worker 2 processing main.task{id:5}
main.result{id:1} returned
main.result{id:3} returned
main.result{id:2} returned
main.result{id:5} returned
main.result{id:4} returned

An alternative implementation uses a sync.WaitGroup to wait for the workers to finish and then close the results channel:

package main

import (
    "fmt"
    "sync"
    "time"
)

type task struct{ id int }
type result struct{ id int }

func worker(id int, tasks <-chan task, results chan<- result, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range tasks {
        fmt.Printf("Worker %d processing %#v\n", id, task)
        time.Sleep(time.Second) // Simulate work
        results <- result{id: task.id}
    }
}

func main() {
    numWorkers := 3
    numTasks := 5

    tasks := make(chan task, numTasks)
    results := make(chan result, numTasks)

    var wg sync.WaitGroup
    wg.Add(numWorkers)

    // launch workers
    for i := 1; i <= numWorkers; i++ {
        go worker(i, tasks, results, &wg)
    }

    // dispatch some tasks
    for i := 1; i <= numTasks; i++ {
        tasks <- task{id: i}
    }
    close(tasks) // indicates we're done adding tasks to the queue

    // close the results channel once all workers have finished,
    // so the range loop below terminates
    go func() {
        wg.Wait()
        close(results)
    }()

    // collect results
    for res := range results {
        fmt.Printf("%#v returned\n", res)
    }

    fmt.Println("All tasks processed.")
}

Exercise 2

You are now well equipped to take on exercise 2.