synchronizator-go/pkg/fetcher.go

package synchronizator

import (
	"fmt"
	"iter"
	"sync"
)

// Fetcher is the concurrent manager.
// Upon invocation, it should create a worker pool of 1 to get the first set of
// results; then, based on the Pagination Total and Limit, it should distribute
// the workload.
//
// It also needs to handle errors, rate limits, retry strategies, and graceful
// rejections.
//
// It should return the pages not fetched for later retry.
//
// Pagination should include a max-concurrent-connections and rate-limit
// configuration to prevent errors from external sources.
//
// Maybe change the name to pagination or embed it in another struct.
// An illustrative fetcher sketch follows the StartPagination declaration below.
type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error)

type Pagination struct {
	Total   int
	HasMore bool
	Limit   int
	Offset  int
}

var StartPagination = Pagination{
	Total:   0,
	HasMore: false,
	Limit:   10,
	Offset:  0,
}

// Work represents a function that processes a value of type T and returns a
// result of type S or an error.
type Work[T, S any] func(value T) (S, error)

// Worker represents a worker that processes tasks of type T and sends results
// of type S.
type Worker[T, S any] struct {
	id         uint8           // id is the unique identifier of the worker.
	receptor   <-chan T        // receptor is the channel from which the worker receives tasks.
	transmiter chan<- S        // transmiter is the channel to which the worker sends results.
	wg         *sync.WaitGroup // wg is the wait group to synchronize the completion of tasks.
	work       Work[T, S]      // work is the function that processes tasks.
}

type WorkerManager[T, S any] struct {
	active_workers     sync.WaitGroup // tracks workers that are still running.
	is_open_to_work    bool           // whether AddWork still accepts new units of work.
	workers_receptor   chan T         // tasks queued for the workers.
	workers_transmiter chan S         // results produced by the workers.
}

func (manager *WorkerManager[T, S]) AddWork(value T) error {
	if !manager.is_open_to_work {
		return fmt.Errorf("the manager is closed to new work")
	}
	manager.workers_receptor <- value
	return nil
}

func (manager *WorkerManager[T, S]) Stop() {
	// Stop receiving new units of work
	manager.is_open_to_work = false
	close(manager.workers_receptor)
}

// GetSingleWorkUnit blocks until a single result is available and returns it.
func (manager *WorkerManager[T, S]) GetSingleWorkUnit() S {
	return <-manager.workers_transmiter
}

// GetWorkUnit stops the manager and returns an iterator over the results
// produced by the workers.
func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] {
	// send a message through the done channel when all workers have stopped
	done_channel := make(chan bool)
	go func() {
		manager.active_workers.Wait()
		close(done_channel)
	}()
	manager.Stop()
	return func(yield func(S) bool) {
		for {
			// TODO: handle timeouts and rate-limits
			select {
			case value := <-manager.workers_transmiter:
				if !yield(value) {
					return
				}
			case <-done_channel:
				// all workers are done, so no further sends can happen; close
				// the transmitter and drain any results still buffered in it
				// instead of dropping them
				close(manager.workers_transmiter)
				for value := range manager.workers_transmiter {
					if !yield(value) {
						return
					}
				}
				return
			}
		}
	}
}

func spawn_worker[T, S any](worker *Worker[T, S]) {
	defer worker.wg.Done()
	// TODO: handle errors
	// NOTE: for now a failed work unit still sends the zero value of S, so the
	// error is silently dropped.
	for unit := range worker.receptor {
		value, _ := worker.work(unit)
		worker.transmiter <- value
	}
}

func createWorkerPool[T, S any](max_workers uint8, work Work[T, S]) *WorkerManager[T, S] {
	// convert before multiplying so the buffer size cannot overflow uint8
	channel_size := int(max_workers) * 3
	manager := &WorkerManager[T, S]{
		workers_receptor:   make(chan T, channel_size),
		workers_transmiter: make(chan S, channel_size),
	}
	// create pool of workers
	for i := range max_workers {
		worker := &Worker[T, S]{
			id:         i,
			receptor:   manager.workers_receptor,
			transmiter: manager.workers_transmiter,
			wg:         &manager.active_workers,
			work:       work,
		}
		// register the worker before starting it so Wait cannot observe a
		// Done without a matching Add
		manager.active_workers.Add(1)
		go spawn_worker(worker)
	}
	manager.is_open_to_work = true
	return manager
}
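
// exampleWorkerPoolUsage is a hedged usage sketch, not part of the original
// file: it shows how createWorkerPool, AddWork, and GetWorkUnit are meant to
// compose. The worker count, the doubling Work function, and the function name
// are assumptions made for the example only.
func exampleWorkerPoolUsage() {
	// a Work function that doubles its input
	double := func(value int) (int, error) {
		return value * 2, nil
	}

	manager := createWorkerPool[int, int](4, double)
	for i := 0; i < 10; i++ {
		if err := manager.AddWork(i); err != nil {
			fmt.Println("could not queue work:", err)
		}
	}

	// GetWorkUnit calls Stop internally, so no more work can be added after
	// this point; the iterator yields each result as the workers produce it.
	for result := range manager.GetWorkUnit() {
		fmt.Println("result:", result)
	}
}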