feat: add basic retry capabilities to worker pool

This commit is contained in:
Alexander Navarro 2024-11-28 20:48:56 -03:00
parent 2d1041f167
commit b342437a43
2 changed files with 123 additions and 20 deletions

View file

@ -3,6 +3,7 @@ package synchronizator
import ( import (
"fmt" "fmt"
"iter" "iter"
"math/rand"
"sync" "sync"
"time" "time"
) )
@ -53,6 +54,17 @@ func NewRateLimit(request_per int, time_scale time.Duration) <-chan time.Time {
return rate_limit return rate_limit
} }
// T represent the argument of the function to run
// S represent the return value of the function to run
type WorkUnit[T, S any] struct {
argument T
result S
err error
timeout time.Duration
attempts uint8
}
// Work represents a function that processes a value of type S and returns a // Work represents a function that processes a value of type S and returns a
// result of type T or an error. // result of type T or an error.
type Work[T, S any] func(value T) (S, error) type Work[T, S any] func(value T) (S, error)
@ -60,37 +72,105 @@ type Work[T, S any] func(value T) (S, error)
// Worker represents a worker that processes tasks of type S and sends results // Worker represents a worker that processes tasks of type S and sends results
// of type T. // of type T.
type Worker[T, S any] struct { type Worker[T, S any] struct {
id uint8 // id is the unique identifier of the worker. id uint8 // id is the unique identifier of the worker.
receptor <-chan T // receptor is the channel from which the worker receives tasks. receptor <-chan WorkUnit[T, S] // receptor is the channel from which the worker receives tasks.
transmiter chan<- S // transmiter is the channel to which the worker sends results. transmiter chan<- WorkUnit[T, S] // transmiter is the channel to which the worker sends results.
wg *sync.WaitGroup // wg is the wait group to synchronize the completion of tasks. wg *sync.WaitGroup // wg is the wait group to synchronize the completion of tasks.
work Work[T, S] // work is the function that processes tasks. work Work[T, S] // work is the function that processes tasks.
rate_limit <-chan time.Time rate_limit <-chan time.Time
} }
type WorkerManager[T, S any] struct { type WorkerManager[T, S any] struct {
queue_tasks uint
processed_tasks uint
active_workers sync.WaitGroup active_workers sync.WaitGroup
is_open_to_work bool is_open_to_work bool
workers_receptor chan T max_retries uint8
workers_transmiter chan S base_retry_time time.Duration
failed_units []*WorkUnit[T, S]
workers_receptor chan WorkUnit[T, S]
workers_transmiter chan WorkUnit[T, S]
} }
func (manager *WorkerManager[T, S]) AddWork(value T) error { func (manager *WorkerManager[T, S]) AddWork(value T) error {
if !manager.is_open_to_work { if !manager.is_open_to_work {
return fmt.Errorf("The manager is closed to add more work.") return fmt.Errorf("The manager is closed to add more work.")
} }
manager.workers_receptor <- value
workUnit := WorkUnit[T, S]{
argument: value,
timeout: 0,
attempts: 0,
}
manager.workers_receptor <- workUnit
manager.queue_tasks++
return nil return nil
} }
func (manager *WorkerManager[T, S]) Stop() { func (manager *WorkerManager[T, S]) Stop() {
// Stop receiving new units of work // Stop receiving new units of work
manager.is_open_to_work = false manager.is_open_to_work = false
close(manager.workers_receptor)
} }
func (manager *WorkerManager[T, S]) GetSingleWorkUnit() S { func (manager *WorkerManager[T, S]) GetSingleWorkUnit() S {
return <-manager.workers_transmiter workUnit := <-manager.workers_transmiter
return workUnit.result
}
func (manager *WorkerManager[T, S]) handleFailedWorkUnit(workUnit *WorkUnit[T, S]) bool {
if manager.max_retries <= workUnit.attempts {
manager.failed_units = append(manager.failed_units, workUnit)
manager.processed_tasks++
return false
}
workUnit.attempts++
if workUnit.timeout == 0 {
workUnit.timeout = manager.base_retry_time
} else {
workUnit.timeout *= 2
}
go func() {
jitter := time.Duration(rand.Int63n(int64(workUnit.timeout)))
timeout := workUnit.timeout + jitter
fmt.Printf(
"Unit failed for %v time, retrying in: %v\n",
workUnit.attempts,
timeout,
)
time.Sleep(timeout)
manager.workers_receptor <- *workUnit
}()
return true
}
func (manager *WorkerManager[T, S]) increment_processed_units() {
manager.processed_tasks++
fmt.Printf("processed_tasks: %v\n", manager.processed_tasks)
if manager.processed_tasks >= manager.queue_tasks {
close(manager.workers_receptor)
}
}
func (manager *WorkerManager[T, S]) handleWorkUnit(workUnit *WorkUnit[T, S]) bool {
if workUnit.err != nil {
can_try_again := manager.handleFailedWorkUnit(workUnit)
if !can_try_again {
manager.increment_processed_units()
}
return false
}
manager.increment_processed_units()
return true
} }
func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] { func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] {
@ -102,14 +182,18 @@ func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] {
close(done_channel) close(done_channel)
}() }()
manager.Stop() manager.is_open_to_work = false
return func(yield func(S) bool) { return func(yield func(S) bool) {
for { for {
// TODO: handle tiemouts // TODO: handle tiemouts
select { select {
case value := <-manager.workers_transmiter: case workUnit := <-manager.workers_transmiter:
if !yield(value) { if is_successfull := manager.handleWorkUnit(&workUnit); !is_successfull {
continue
}
if !yield(workUnit.result) {
return return
} }
case <-done_channel: case <-done_channel:
@ -120,29 +204,38 @@ func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] {
} }
} }
func (manager *WorkerManager[T, S]) GetFailedUnits() []*WorkUnit[T, S] {
return manager.failed_units
}
func spawn_worker[T, S any](worker *Worker[T, S]) { func spawn_worker[T, S any](worker *Worker[T, S]) {
defer worker.wg.Done() defer worker.wg.Done()
// TODO: handle errors for workUnit := range worker.receptor {
for unit := range worker.receptor {
// Wait for rate-limit // Wait for rate-limit
<-worker.rate_limit <-worker.rate_limit
value, _ := worker.work(unit) value, err := worker.work(workUnit.argument)
worker.transmiter <- value workUnit.result = value
workUnit.err = err
worker.transmiter <- workUnit
} }
} }
func createWorkerPool[T, S any]( func createWorkerPool[T, S any](
max_workers uint8, max_workers uint8,
max_retries uint8,
rate_limit <-chan time.Time, rate_limit <-chan time.Time,
work Work[T, S], work Work[T, S],
) *WorkerManager[T, S] { ) *WorkerManager[T, S] {
channel_size := max_workers * 3 channel_size := max_workers * 3
manager := &WorkerManager[T, S]{ manager := &WorkerManager[T, S]{
workers_receptor: make(chan T, channel_size), max_retries: max_retries,
workers_transmiter: make(chan S, channel_size), base_retry_time: time.Second,
workers_receptor: make(chan WorkUnit[T, S], channel_size),
workers_transmiter: make(chan WorkUnit[T, S], channel_size),
} }
// create pool of workers // create pool of workers

View file

@ -17,6 +17,10 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
fetchWrapper := func(offset int) ([]*Collection, error) { fetchWrapper := func(offset int) ([]*Collection, error) {
fmt.Printf("Requesting offset: %v\n", offset) fmt.Printf("Requesting offset: %v\n", offset)
if offset == 10 {
return nil, fmt.Errorf("Simulated error jeje")
}
pagination := start_pagination pagination := start_pagination
pagination.Offset = offset pagination.Offset = offset
@ -31,7 +35,7 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
// 5 request per minute // 5 request per minute
rate_limit := NewRateLimit(5, time.Minute) rate_limit := NewRateLimit(5, time.Minute)
manager := createWorkerPool[int, []*Collection](5, rate_limit, fetchWrapper) manager := createWorkerPool[int, []*Collection](5, 2, rate_limit, fetchWrapper)
// TODO: get number of page dynamically and change Fetcher signature // TODO: get number of page dynamically and change Fetcher signature
pages := 4 pages := 4
@ -43,8 +47,14 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
} }
} }
successUnits := 0
for collections := range manager.GetWorkUnit() { for collections := range manager.GetWorkUnit() {
platform.Collections = slices.Concat(platform.Collections, collections) platform.Collections = slices.Concat(platform.Collections, collections)
successUnits++
}
if successUnits != pages {
return fmt.Errorf("Units failed: %v", manager.GetFailedUnits())
} }
err := BulkCreateNode(platform._conn, platform.Collections) err := BulkCreateNode(platform._conn, platform.Collections)