feat: add basic worker pool

Alexander Navarro 2024-11-26 15:44:08 -03:00
parent d1a0212cb1
commit 56d2e72528
3 changed files with 144 additions and 12 deletions

View file

@@ -164,15 +164,15 @@ func main() {
		return
	}
	for _, pokedex := range pokeApi.Collections {
		if pokedex.IsDefault() {
			continue
		}
		err = pokedex.FetchNodes(getPokemons, synchronizator.StartPagination)
		if err != nil {
			fmt.Println(err)
			return
		}
	}
	// for _, pokedex := range pokeApi.Collections {
	// 	if pokedex.IsDefault() {
	// 		continue
	// 	}
	//
	// 	err = pokedex.FetchNodes(getPokemons, synchronizator.StartPagination)
	// 	if err != nil {
	// 		fmt.Println(err)
	// 		return
	// 	}
	// }
}

View file

@@ -1,5 +1,24 @@
package synchronizator
import (
	"errors"
	"iter"
	"sync"
)
// Fetcher is the concurrent manager.
//
// Upon invocation it should create a worker pool of one to fetch the first
// set of results, then distribute the remaining workload based on the
// Pagination Total and Limit.
//
// It also needs to handle errors, rate limits, retry strategies, and
// graceful rejections.
//
// It should return the pages that were not fetched, for a later retry.
//
// Pagination should include max-concurrent-connections and rate-limit
// configuration to prevent errors from external sources.
//
// Maybe rename this to pagination, or embed it in another struct.
type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error)
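
// For illustration only (not part of this commit): a minimal Fetcher could
// wrap one paginated request, advancing Offset by Limit so the manager can
// fan out the remaining pages once Total is known. requestPage and the exact
// Pagination field names here are assumptions.
//
//	var examplePageFetcher Fetcher = func(p Pagination) ([]*Collection, Pagination, error) {
//		collections, total, err := requestPage(p.Limit, p.Offset) // hypothetical HTTP call
//		if err != nil {
//			return nil, p, err
//		}
//		next := Pagination{Total: total, Limit: p.Limit, Offset: p.Offset + p.Limit}
//		return collections, next, nil
//	}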
type Pagination struct {
@@ -15,3 +34,93 @@ var StartPagination = Pagination{
	Limit:  10,
	Offset: 0,
}
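
// Work is the unit of computation a worker applies to each queued value.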
type Work[T any] func(value T) T
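
// Worker consumes values from receptor, applies work to each, and sends the
// result through transmiter, marking wg done once receptor is closed.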
type Worker[T any] struct {
	id         uint8
	receptor   <-chan T
	transmiter chan<- T
	wg         *sync.WaitGroup
	work       Work[T]
}
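
// WorkerManager owns the pool's lifecycle: workers_receptor carries queued
// work in, workers_transmiter carries results out, and mu guards
// is_open_to_work together with the close of workers_receptor.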
type WorkerManager[T any] struct {
	active_workers     sync.WaitGroup
	mu                 sync.Mutex
	is_open_to_work    bool
	workers_receptor   chan T
	workers_transmiter chan T
}
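
// AddWork queues one value for the pool; it fails once Stop has closed intake.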
func (manager *WorkerManager[T]) AddWork(value T) error {
	manager.mu.Lock()
	defer manager.mu.Unlock()
	if !manager.is_open_to_work {
		return errors.New("manager is closed to new work")
	}
	manager.workers_receptor <- value
	return nil
}
func (manager *WorkerManager[T]) Stop() {
	// Stop receiving new units of work; safe to call more than once.
	manager.mu.Lock()
	defer manager.mu.Unlock()
	if !manager.is_open_to_work {
		return
	}
	manager.is_open_to_work = false
	close(manager.workers_receptor)
}
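
// GetWorkUnit closes intake and returns an iterator over the results,
// finishing once every worker has exited and the buffer has been drained.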
func (manager *WorkerManager[T]) GetWorkUnit() iter.Seq[T] {
	// done_channel closes once all workers have stopped
	done_channel := make(chan bool)
	go func() {
		manager.active_workers.Wait()
		close(done_channel)
	}()
	manager.Stop()
	return func(yield func(T) bool) {
		for {
			select {
			case value := <-manager.workers_transmiter:
				if !yield(value) {
					return
				}
			case <-done_channel:
				// every worker has exited: close the results channel, then
				// drain anything still buffered before finishing
				close(manager.workers_transmiter)
				for value := range manager.workers_transmiter {
					if !yield(value) {
						return
					}
				}
				return
			}
		}
	}
}
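
// spawn_worker runs one worker's loop until its receptor channel is closed.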
func spawn_worker[T any](worker *Worker[T]) {
	defer worker.wg.Done()
	for unit := range worker.receptor {
		worker.transmiter <- worker.work(unit)
	}
}
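
// createWorkerPool starts max_workers goroutines that share one buffered
// work queue and one buffered results queue.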
func createWorkerPool[T any](max_workers uint8, work Work[T]) *WorkerManager[T] {
	manager := &WorkerManager[T]{
		is_open_to_work:    true,
		workers_receptor:   make(chan T, max_workers),
		workers_transmiter: make(chan T, max_workers),
	}
	// create the pool of workers; register each with the WaitGroup before
	// its goroutine starts so active_workers cannot reach zero early
	for i := range max_workers {
		worker := &Worker[T]{
			id:         i,
			receptor:   manager.workers_receptor,
			transmiter: manager.workers_transmiter,
			wg:         &manager.active_workers,
			work:       work,
		}
		manager.active_workers.Add(1)
		go spawn_worker(worker)
	}
	return manager
}
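
A minimal sketch of the intended lifecycle (hypothetical in-package example, since createWorkerPool is unexported; assumes a file that also imports fmt): queue work while the manager is open, then range over GetWorkUnit, which stops intake and drains the results.

func exampleSquarePool() {
	square := func(value int) int { return value * value }
	manager := createWorkerPool[int](3, square)
	for _, unit := range []int{2, 3, 4} {
		if err := manager.AddWork(unit); err != nil {
			fmt.Println(err)
			return
		}
	}
	// GetWorkUnit calls Stop, so later AddWork calls would be rejected.
	for value := range manager.GetWorkUnit() {
		fmt.Println(value) // prints 4, 9 and 16 in completion order
	}
}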

View file

@@ -1,6 +1,9 @@
package synchronizator
import "slices"
import (
"fmt"
"slices"
)
// Utility struct representing a collection of nodes; it is a [Node] itself,
// so all of the node's functionality is available.
@@ -10,6 +13,26 @@ type Platform struct {
}
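
// FetchCollections loads the platform's collections through fetcher. For now
// it also runs a temporary worker-pool demo (squaring integers) before the
// real fetch.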
func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error {
	// Temporary demo of the worker pool: square a value concurrently.
	square := func(value int) int {
		return value * value
	}
	manager := createWorkerPool[int](5, square)
	err := manager.AddWork(5)
	if err != nil {
		return err
	}
	for value := range manager.GetWorkUnit() {
		fmt.Printf("%v: concurrent value: %v\n", start_pagination.Offset, value)
	}
	// GetWorkUnit stopped the manager, so this AddWork is expected to be
	// rejected; log the rejection instead of aborting the real fetch below.
	if err = manager.AddWork(45); err != nil {
		fmt.Println(err)
	}
	collections, pagination, err := fetcher(start_pagination)
	if err != nil {
		return err