-
Notifications
You must be signed in to change notification settings - Fork 3
Worker Pools
Johnny Boursiquot edited this page May 31, 2024
·
1 revision
The worker pool pattern is a concurrency pattern that manages a set of worker goroutines to handle tasks from a shared task queue. It efficiently utilizes system resources by controlling the number of active goroutines, making it useful for processing tasks concurrently without overwhelming the system.
- Task Queue: A buffered channel that holds tasks to be processed.
- Workers: Goroutines that pick tasks from the task queue and process them.
- Dispatcher: A goroutine that distributes tasks to the workers.
package main
import (
"fmt"
"sync"
"time"
)
type task struct {id int}
type result struct{id int}
func worker(id int, tasks <-chan task, results chan<- result) {
for task := range tasks {
fmt.Printf("Worker %d processing %#v\n", id, task)
time.Sleep(time.Second) // Simulate work
results <- result{id: task.id}
}
}
func main() {
numWorkers := 3
numTasks := 5
tasks := make(chan task, numTasks)
results := make(chan result, numTasks)
// launch workers
for i := 1; i <= numWorkers; i++ {
go worker(i, tasks, results)
}
// dispatch some tasks
for i := 1; i <= numTasks; i++ {
tasks <- task{id: i}
}
close(tasks) // indicates we're done adding tasks to the queue
// collect results
for a := 1; a <= numTasks; a++ {
fmt.Printf("%#v returned", <-results)
}
}
Running this program yields something like this:
Worker 3 processing main.task{id:1}
Worker 1 processing main.task{id:2}
Worker 2 processing main.task{id:3}
Worker 3 processing main.task{id:4}
Worker 2 processing main.task{id:5}
main.result{id:1} returned
main.result{id:3} returned
main.result{id:2} returned
main.result{id:5} returned
main.result{id:4} returned
Alternative implementation using a sync.WaitGroup
to wait for the results:
package main
import (
"fmt"
"sync"
"time"
)
type task struct{ id int }
type result struct{ id int }
func worker(id int, tasks <-chan task, results chan<- result, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Worker %d processing %#v\n", id, task)
time.Sleep(time.Second) // Simulate work
results <- result{id: task.id}
}
}
func main() {
numWorkers := 3
numTasks := 5
tasks := make(chan task, numTasks)
results := make(chan result, numTasks)
var wg sync.WaitGroup
wg.Add(numWorkers)
// launch workers
for i := 1; i <= numWorkers; i++ {
go worker(i, tasks, results, &wg)
}
// dispatch some tasks
for i := 1; i <= numTasks; i++ {
tasks <- task{id: i}
}
close(tasks) // indicates we're done adding tasks to the queue
// wait for all workers to finish
go func() {
wg.Wait()
close(results)
}()
// collect results
for res := range results {
fmt.Printf("%#v returned", res)
}
fmt.Println("All tasks processed.")
}
You are now well equiped to take on exercise 2.