From 30cf6dfff2bdf2da702f0e61394976e58d46d4ab Mon Sep 17 00:00:00 2001 From: aleidk Date: Tue, 26 Nov 2024 15:44:08 -0300 Subject: [PATCH] feat: add basic worker pool --- examples/usage.go | 22 ++++----- pkg/fetcher.go | 122 ++++++++++++++++++++++++++++++++++++++++++++++ pkg/platform.go | 41 ++++++++++++---- 3 files changed, 165 insertions(+), 20 deletions(-) diff --git a/examples/usage.go b/examples/usage.go index ea35ed7..51d3fc6 100644 --- a/examples/usage.go +++ b/examples/usage.go @@ -164,15 +164,15 @@ func main() { return } - for _, pokedex := range pokeApi.Collections { - if pokedex.IsDefault() { - continue - } - - err = pokedex.FetchNodes(getPokemons, synchronizator.StartPagination) - if err != nil { - fmt.Println(err) - return - } - } + // for _, pokedex := range pokeApi.Collections { + // if pokedex.IsDefault() { + // continue + // } + // + // err = pokedex.FetchNodes(getPokemons, synchronizator.StartPagination) + // if err != nil { + // fmt.Println(err) + // return + // } + // } } diff --git a/pkg/fetcher.go b/pkg/fetcher.go index 7c17c24..6473a7c 100644 --- a/pkg/fetcher.go +++ b/pkg/fetcher.go @@ -1,5 +1,24 @@ package synchronizator +import ( + "fmt" + "iter" + "sync" +) + +// Fetcher is the concurrent manager +// upon invocation, should create a worker pool of 1 to get the first set of results +// then based on the Pagination Total and Limit, should distribute the workload +// +// It also needs to handle errors, rate-limits, retry strategies, and graceful rejections +// +// It should return the pages not fetched for later retry +// +// Pagination should include a max-concurrent connection and rate-limit +// configuration to prevent having errors from external sources +// +// Maybe change the name to pagination or embed in another struct + type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error) type Pagination struct { @@ -15,3 +34,106 @@ var StartPagination = Pagination{ Limit: 10, Offset: 0, } + +// Work represents a function
that processes a value of type T and returns a +// result of type S or an error. +type Work[T, S any] func(value T) (S, error) + +// Worker represents a worker that processes tasks of type T and sends results +// of type S. +type Worker[T, S any] struct { + id uint8 // id is the unique identifier of the worker. + receptor <-chan T // receptor is the channel from which the worker receives tasks. + transmiter chan<- S // transmiter is the channel to which the worker sends results. + wg *sync.WaitGroup // wg is the wait group the worker signals (Done) when its loop exits. + work Work[T, S] // work is the function that processes tasks. +} + +type WorkerManager[T, S any] struct { // WorkerManager is the handle returned by createWorkerPool for queueing tasks and reading results. + active_workers sync.WaitGroup // active_workers counts worker goroutines that have not exited yet. + is_open_to_work bool // is_open_to_work is true while AddWork accepts tasks; Stop sets it to false. + workers_receptor chan T // workers_receptor is the buffered task queue every worker reads from. + workers_transmiter chan S // workers_transmiter is the buffered result queue every worker writes to. +} + +func (manager *WorkerManager[T, S]) AddWork(value T) error { // AddWork queues value for the workers, or returns an error once the manager is stopped. + if !manager.is_open_to_work { + return fmt.Errorf("The manager is closed to add more work.") + } + manager.workers_receptor <- value // blocks while the task buffer is full + return nil +} + +func (manager *WorkerManager[T, S]) Stop() { // Stop rejects further AddWork calls and closes the task queue. + // Stop receiving new units of work + manager.is_open_to_work = false + close(manager.workers_receptor) // ends each worker's range loop once the queue drains; NOTE(review): calling Stop twice would close an already-closed channel and panic +} + +func (manager *WorkerManager[T, S]) GetSingleWorkUnit() S { // GetSingleWorkUnit returns the next available result. + return <-manager.workers_transmiter // blocks until a result is available (yields the zero value if the channel was closed) +} + +func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] { // GetWorkUnit stops the manager and returns an iterator over the results the workers produce. + // done_channel is closed (nothing is sent on it) once all workers have stopped + done_channel := make(chan bool) + + go func() { + manager.active_workers.Wait() + close(done_channel) + }() + + manager.Stop() // runs as soon as GetWorkUnit is called, before the returned iterator is ranged over + + return func(yield func(S) bool) { + for { + // TODO: handle timeouts and rate-limits + select { + case value := <-manager.workers_transmiter: + if !yield(value) { + return // consumer stopped early; NOTE(review): workers_transmiter stays open and unread here, so workers may block on send + } + case <-done_channel: // NOTE(review): select chooses randomly among ready cases, so results still buffered in workers_transmiter may be dropped here - confirm + close(manager.workers_transmiter) + return + } + } + } +} + +func spawn_worker[T, S any](worker *Worker[T, S]) { // spawn_worker is the worker loop: it runs work on each task until receptor is closed and drained. + defer worker.wg.Done() + + // TODO: handle errors + for unit := range worker.receptor { + value, _ := worker.work(unit) // the error is discarded, so value is forwarded even when work failed + worker.transmiter <-
value + } +} + +func createWorkerPool[T, S any](max_workers uint8, work Work[T, S]) *WorkerManager[T, S] { // createWorkerPool starts max_workers goroutines running work and returns the manager that feeds them. + channel_size := max_workers * 3 // computed in uint8, so the product wraps around for max_workers > 85 + + manager := &WorkerManager[T, S]{ + workers_receptor: make(chan T, channel_size), + workers_transmiter: make(chan S, channel_size), + } + + // start the workers; they all share the manager's task and result channels + for i := range max_workers { + worker := &Worker[T, S]{ + id: uint8(i), + receptor: manager.workers_receptor, + transmiter: manager.workers_transmiter, + wg: &manager.active_workers, + work: work, + } + + go spawn_worker(worker) + manager.active_workers.Add(1) // registered after the goroutine is started; NOTE(review): the usual order is Add before the go statement + } + + manager.is_open_to_work = true + + return manager +} diff --git a/pkg/platform.go b/pkg/platform.go index 0b6a304..2931ae2 100644 --- a/pkg/platform.go +++ b/pkg/platform.go @@ -1,6 +1,9 @@ package synchronizator -import "slices" +import ( + "fmt" + "slices" +) // Utility struct to represent a collection of nodes, it's a [Node] itself so all // the node's functionality is available. @@ -10,17 +13,37 @@ type Platform struct { } func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error { - collections, pagination, err := fetcher(start_pagination) - if err != nil { - return err - } - platform.Collections = slices.Concat(platform.Collections, collections) + fetchWrapper := func(offset int) ([]*Collection, error) { + fmt.Printf("Requesting offset: %v\n", offset) - if pagination.HasMore { - return platform.FetchCollections(fetcher, pagination) + pagination := start_pagination + + pagination.Offset = offset + collections, _, err := fetcher(pagination) + if err != nil { + return nil, err + } + + return collections, nil } - err = BulkCreateNode(platform._conn, platform.Collections) + manager := createWorkerPool[int, []*Collection](5, fetchWrapper) + + // TODO: get number of page dynamically and change Fetcher signature + pages := 4 + + for i := range pages { + err := manager.AddWork(i * start_pagination.Limit) + if err != nil { + return err + } + } + + for collections
:= range manager.GetWorkUnit() { + platform.Collections = slices.Concat(platform.Collections, collections) + } + + err := BulkCreateNode(platform._conn, platform.Collections) if err != nil { return err }