diff --git a/pkg/fetcher.go b/pkg/fetcher.go index 5b046ec..65a6d05 100644 --- a/pkg/fetcher.go +++ b/pkg/fetcher.go @@ -3,6 +3,7 @@ package synchronizator import ( "fmt" "iter" + "math/rand" "sync" "time" ) @@ -53,6 +54,17 @@ func NewRateLimit(request_per int, time_scale time.Duration) <-chan time.Time { return rate_limit } +// T represent the argument of the function to run +// S represent the return value of the function to run + +type WorkUnit[T, S any] struct { + argument T + result S + err error + timeout time.Duration + attempts uint8 +} + // Work represents a function that processes a value of type S and returns a // result of type T or an error. type Work[T, S any] func(value T) (S, error) @@ -60,37 +72,105 @@ type Work[T, S any] func(value T) (S, error) // Worker represents a worker that processes tasks of type S and sends results // of type T. type Worker[T, S any] struct { - id uint8 // id is the unique identifier of the worker. - receptor <-chan T // receptor is the channel from which the worker receives tasks. - transmiter chan<- S // transmiter is the channel to which the worker sends results. - wg *sync.WaitGroup // wg is the wait group to synchronize the completion of tasks. - work Work[T, S] // work is the function that processes tasks. + id uint8 // id is the unique identifier of the worker. + receptor <-chan WorkUnit[T, S] // receptor is the channel from which the worker receives tasks. + transmiter chan<- WorkUnit[T, S] // transmiter is the channel to which the worker sends results. + wg *sync.WaitGroup // wg is the wait group to synchronize the completion of tasks. + work Work[T, S] // work is the function that processes tasks. rate_limit <-chan time.Time } type WorkerManager[T, S any] struct { + queue_tasks uint + processed_tasks uint active_workers sync.WaitGroup is_open_to_work bool - workers_receptor chan T - workers_transmiter chan S + max_retries uint8 + base_retry_time time.Duration + failed_units []*WorkUnit[T, S] + workers_receptor chan WorkUnit[T, S] + workers_transmiter chan WorkUnit[T, S] } func (manager *WorkerManager[T, S]) AddWork(value T) error { if !manager.is_open_to_work { return fmt.Errorf("The manager is closed to add more work.") } - manager.workers_receptor <- value + + workUnit := WorkUnit[T, S]{ + argument: value, + timeout: 0, + attempts: 0, + } + + manager.workers_receptor <- workUnit + manager.queue_tasks++ return nil } func (manager *WorkerManager[T, S]) Stop() { // Stop receiving new units of work manager.is_open_to_work = false - close(manager.workers_receptor) } func (manager *WorkerManager[T, S]) GetSingleWorkUnit() S { - return <-manager.workers_transmiter + workUnit := <-manager.workers_transmiter + + return workUnit.result +} + +func (manager *WorkerManager[T, S]) handleFailedWorkUnit(workUnit *WorkUnit[T, S]) bool { + if manager.max_retries <= workUnit.attempts { + manager.failed_units = append(manager.failed_units, workUnit) + manager.processed_tasks++ + return false + } + + workUnit.attempts++ + + if workUnit.timeout == 0 { + workUnit.timeout = manager.base_retry_time + } else { + workUnit.timeout *= 2 + } + + go func() { + jitter := time.Duration(rand.Int63n(int64(workUnit.timeout))) + timeout := workUnit.timeout + jitter + fmt.Printf( + "Unit failed for %v time, retrying in: %v\n", + workUnit.attempts, + timeout, + ) + time.Sleep(timeout) + manager.workers_receptor <- *workUnit + }() + + return true +} + +func (manager *WorkerManager[T, S]) increment_processed_units() { + manager.processed_tasks++ + fmt.Printf("processed_tasks: %v\n", manager.processed_tasks) + + if manager.processed_tasks >= manager.queue_tasks { + close(manager.workers_receptor) + } +} + +func (manager *WorkerManager[T, S]) handleWorkUnit(workUnit *WorkUnit[T, S]) bool { + if workUnit.err != nil { + can_try_again := manager.handleFailedWorkUnit(workUnit) + + if !can_try_again { + manager.increment_processed_units() + } + return false + } + + manager.increment_processed_units() + + return true } func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] { @@ -102,14 +182,18 @@ func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] { close(done_channel) }() - manager.Stop() + manager.is_open_to_work = false return func(yield func(S) bool) { for { // TODO: handle tiemouts select { - case value := <-manager.workers_transmiter: - if !yield(value) { + case workUnit := <-manager.workers_transmiter: + if is_successfull := manager.handleWorkUnit(&workUnit); !is_successfull { + continue + } + + if !yield(workUnit.result) { return } case <-done_channel: @@ -120,29 +204,38 @@ func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] { } } +func (manager *WorkerManager[T, S]) GetFailedUnits() []*WorkUnit[T, S] { + return manager.failed_units +} + func spawn_worker[T, S any](worker *Worker[T, S]) { defer worker.wg.Done() - // TODO: handle errors - for unit := range worker.receptor { + for workUnit := range worker.receptor { // Wait for rate-limit <-worker.rate_limit - value, _ := worker.work(unit) - worker.transmiter <- value + value, err := worker.work(workUnit.argument) + workUnit.result = value + workUnit.err = err + + worker.transmiter <- workUnit } } func createWorkerPool[T, S any]( max_workers uint8, + max_retries uint8, rate_limit <-chan time.Time, work Work[T, S], ) *WorkerManager[T, S] { channel_size := max_workers * 3 manager := &WorkerManager[T, S]{ - workers_receptor: make(chan T, channel_size), - workers_transmiter: make(chan S, channel_size), + max_retries: max_retries, + base_retry_time: time.Second, + workers_receptor: make(chan WorkUnit[T, S], channel_size), + workers_transmiter: make(chan WorkUnit[T, S], channel_size), } // create pool of workers diff --git a/pkg/platform.go b/pkg/platform.go index 12aec74..e66f1ef 100644 --- a/pkg/platform.go +++ b/pkg/platform.go @@ -17,6 +17,10 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag fetchWrapper := func(offset int) ([]*Collection, error) { fmt.Printf("Requesting offset: %v\n", offset) + if offset == 10 { + return nil, fmt.Errorf("Simulated error jeje") + } + pagination := start_pagination pagination.Offset = offset @@ -31,7 +35,7 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag // 5 request per minute rate_limit := NewRateLimit(5, time.Minute) - manager := createWorkerPool[int, []*Collection](5, rate_limit, fetchWrapper) + manager := createWorkerPool[int, []*Collection](5, 2, rate_limit, fetchWrapper) // TODO: get number of page dynamically and change Fetcher signature pages := 4 @@ -43,8 +47,14 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag } } + successUnits := 0 for collections := range manager.GetWorkUnit() { platform.Collections = slices.Concat(platform.Collections, collections) + successUnits++ + } + + if successUnits != pages { + return fmt.Errorf("Units failed: %v", manager.GetFailedUnits()) } err := BulkCreateNode(platform._conn, platform.Collections)