refactor: change custom logic for context in worker pool

Author: Alexander Navarro
Date: 2024-11-29 15:06:27 -03:00
Parent: b342437a43
Commit: eeefceb2fc

2 changed files with 187 additions and 164 deletions

File 1 of 2:

@@ -1,8 +1,8 @@
 package synchronizator
 
 import (
+	"context"
 	"fmt"
-	"iter"
 	"math/rand"
 	"sync"
 	"time"
@@ -80,137 +80,24 @@ type Worker[T, S any] struct {
 	rate_limit <-chan time.Time
 }
 
-type WorkerManager[T, S any] struct {
-	queue_tasks     uint
-	processed_tasks uint
-	active_workers  sync.WaitGroup
-	is_open_to_work bool
+type WorkConfig struct {
+	tasks_processed sync.WaitGroup
+	max_workers     uint8
 	max_retries     uint8
 	base_retry_time time.Duration
-	failed_units    []*WorkUnit[T, S]
-	workers_receptor   chan WorkUnit[T, S]
-	workers_transmiter chan WorkUnit[T, S]
+	rate_limit      <-chan time.Time
 }
 
-func (manager *WorkerManager[T, S]) AddWork(value T) error {
-	if !manager.is_open_to_work {
-		return fmt.Errorf("The manager is closed to add more work.")
-	}
-
-	workUnit := WorkUnit[T, S]{
-		argument: value,
-		timeout:  0,
-		attempts: 0,
-	}
-
-	manager.workers_receptor <- workUnit
-	manager.queue_tasks++
-
-	return nil
-}
-
-func (manager *WorkerManager[T, S]) Stop() {
-	// Stop receiving new units of work
-	manager.is_open_to_work = false
-}
-
-func (manager *WorkerManager[T, S]) GetSingleWorkUnit() S {
-	workUnit := <-manager.workers_transmiter
-	return workUnit.result
-}
-
-func (manager *WorkerManager[T, S]) handleFailedWorkUnit(workUnit *WorkUnit[T, S]) bool {
-	if manager.max_retries <= workUnit.attempts {
-		manager.failed_units = append(manager.failed_units, workUnit)
-		manager.processed_tasks++
-		return false
-	}
-
-	workUnit.attempts++
-
-	if workUnit.timeout == 0 {
-		workUnit.timeout = manager.base_retry_time
-	} else {
-		workUnit.timeout *= 2
-	}
-
-	go func() {
-		jitter := time.Duration(rand.Int63n(int64(workUnit.timeout)))
-		timeout := workUnit.timeout + jitter
-		fmt.Printf(
-			"Unit failed for %v time, retrying in: %v\n",
-			workUnit.attempts,
-			timeout,
-		)
-		time.Sleep(timeout)
-		manager.workers_receptor <- *workUnit
-	}()
-
-	return true
-}
-
-func (manager *WorkerManager[T, S]) increment_processed_units() {
-	manager.processed_tasks++
-	fmt.Printf("processed_tasks: %v\n", manager.processed_tasks)
-	if manager.processed_tasks >= manager.queue_tasks {
-		close(manager.workers_receptor)
-	}
-}
-
-func (manager *WorkerManager[T, S]) handleWorkUnit(workUnit *WorkUnit[T, S]) bool {
-	if workUnit.err != nil {
-		can_try_again := manager.handleFailedWorkUnit(workUnit)
-		if !can_try_again {
-			manager.increment_processed_units()
-		}
-		return false
-	}
-
-	manager.increment_processed_units()
-	return true
-}
-
-func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] {
-	// send a message through the done channel when all workers have stopped
-	done_channel := make(chan bool)
-	go func() {
-		manager.active_workers.Wait()
-		close(done_channel)
-	}()
-
-	manager.is_open_to_work = false
-
-	return func(yield func(S) bool) {
-		for {
-			// TODO: handle tiemouts
-			select {
-			case workUnit := <-manager.workers_transmiter:
-				if is_successfull := manager.handleWorkUnit(&workUnit); !is_successfull {
-					continue
-				}
-				if !yield(workUnit.result) {
-					return
-				}
-			case <-done_channel:
-				close(manager.workers_transmiter)
-				return
-			}
-		}
-	}
-}
-
-func (manager *WorkerManager[T, S]) GetFailedUnits() []*WorkUnit[T, S] {
-	return manager.failed_units
-}
+type Channels[T, S any] struct {
+	tasks_queue      chan T
+	tasks_done       chan S
+	tasks_failed     chan error
+	units_dispatcher chan WorkUnit[T, S]
+	units_receiver   chan WorkUnit[T, S]
+}
 
 func spawn_worker[T, S any](worker *Worker[T, S]) {
-	defer worker.wg.Done()
+	// TODO: handle tiemouts
 	for workUnit := range worker.receptor {
 		// Wait for rate-limit
 		<-worker.rate_limit
@@ -223,37 +110,151 @@ func spawn_worker[T, S any](worker *Worker[T, S]) {
 	}
 }
 
-func createWorkerPool[T, S any](
-	max_workers uint8,
-	max_retries uint8,
-	rate_limit <-chan time.Time,
-	work Work[T, S],
-) *WorkerManager[T, S] {
-	channel_size := max_workers * 3
-
-	manager := &WorkerManager[T, S]{
-		max_retries:        max_retries,
-		base_retry_time:    time.Second,
-		workers_receptor:   make(chan WorkUnit[T, S], channel_size),
-		workers_transmiter: make(chan WorkUnit[T, S], channel_size),
+func handleFailedWorkUnit[T, S any](
+	workUnit *WorkUnit[T, S],
+	channels *Channels[T, S],
+	config *WorkConfig,
+) bool {
+	if config.max_retries <= workUnit.attempts {
+		channels.tasks_failed <- workUnit.err
+		config.tasks_processed.Done()
+		return false
+	}
+
+	workUnit.attempts++
+	workUnit.err = nil
+
+	if workUnit.timeout == 0 {
+		workUnit.timeout = config.base_retry_time
+	} else {
+		workUnit.timeout *= 2
+	}
+
+	go func() {
+		jitter := time.Duration(rand.Int63n(int64(workUnit.timeout)))
+		timeout := workUnit.timeout + jitter
+		fmt.Printf(
+			"Unit with value %v failed for %v time, retrying in: %v\n",
+			workUnit.argument,
+			workUnit.attempts,
+			timeout,
+		)
+		time.Sleep(timeout)
+		channels.units_dispatcher <- *workUnit
+	}()
+
+	return true
+}
+
+// this is in charge of what we return to the user
+// exits when units_receiver is closed, which is done when the workers are closed
+func listenForWorkResults[T, S any](
+	ctx context.Context,
+	channels *Channels[T, S],
+	config *WorkConfig,
+) {
+	for {
+		select {
+		case workUnit, ok := <-channels.units_receiver:
+			if !ok {
+				return
+			}
+			if workUnit.err != nil {
+				handleFailedWorkUnit(&workUnit, channels, config)
+				continue
+			}
+			// Send message to user
+			channels.tasks_done <- workUnit.result
+			config.tasks_processed.Done()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// this is in charge of receive values and transform them into work units
+// stops when the queue is empty
+func workUnitDispatcher[T, S any](
+	ctx context.Context,
+	finish context.CancelFunc,
+	channels *Channels[T, S],
+	config *WorkConfig,
+) {
+	defer stopProcessingWork(finish, channels, config)
+	for {
+		select {
+		case value, ok := <-channels.tasks_queue:
+			if !ok {
+				return
+			}
+			workUnit := WorkUnit[T, S]{
+				argument: value,
+				timeout:  0,
+				attempts: 0,
+			}
+			channels.units_dispatcher <- workUnit
+			config.tasks_processed.Add(1)
+		case <-ctx.Done():
+			fmt.Println("context done")
+			return
+		}
+	}
+}
+
+// this wait for all workers to stop, then close the unit channels where the workers send values
+// prevent closing the channel before the workers finish
+func stopProcessingWork[T, S any](
+	finish context.CancelFunc,
+	channels *Channels[T, S],
+	config *WorkConfig,
+) {
+	config.tasks_processed.Wait()
+	close(channels.units_receiver)
+	close(channels.units_dispatcher)
+	close(channels.tasks_done)
+	close(channels.tasks_failed)
+	finish()
+}
+
+func asyncTaskRunner[T, S any](
+	ctx context.Context,
+	inbound chan T,
+	config *WorkConfig,
+	work Work[T, S],
+) (<-chan S, <-chan error, <-chan struct{}) {
+	channel_size := config.max_workers * 3
+	done, finish := context.WithCancel(ctx)
+
+	channels := &Channels[T, S]{
+		tasks_queue:      inbound,
+		tasks_done:       make(chan S),
+		tasks_failed:     make(chan error),
+		units_dispatcher: make(chan WorkUnit[T, S], channel_size),
+		units_receiver:   make(chan WorkUnit[T, S], channel_size),
 	}
 
 	// create pool of workers
-	for i := range max_workers {
+	for i := range config.max_workers {
 		worker := &Worker[T, S]{
 			id:         uint8(i),
-			receptor:   manager.workers_receptor,
-			transmiter: manager.workers_transmiter,
-			rate_limit: rate_limit,
-			wg:         &manager.active_workers,
+			receptor:   channels.units_dispatcher,
+			transmiter: channels.units_receiver,
+			rate_limit: config.rate_limit,
 			work:       work,
 		}
 		go spawn_worker(worker)
-		manager.active_workers.Add(1)
 	}
-	manager.is_open_to_work = true
 
-	return manager
+	go listenForWorkResults(done, channels, config)
+	go workUnitDispatcher(done, finish, channels, config)
+
+	return channels.tasks_done, channels.tasks_failed, done.Done()
 }
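
Not part of the commit, but for orientation before the call-site changes in the second file: a minimal, self-contained sketch of the fan-out/fan-in shape this refactor moves toward, with a context for cancellation, a producer-closed task channel, and a WaitGroup that gates closing the results channel (the same wait-then-close ordering stopProcessingWork enforces above). The names runPool, result, and double are illustrative only, not part of the package; successes and failures share one channel here, whereas the commit splits them into tasks_done and tasks_failed.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// result pairs a task's output with any error; the commit keeps these on
// separate tasks_done / tasks_failed channels instead.
type result struct {
	value int
	err   error
}

// runPool fans tasks out to a fixed number of workers and fans results back in.
// The results channel is closed only after every worker has returned, so the
// caller can simply range over it; ctx cancellation stops the workers early.
func runPool(ctx context.Context, workers int, tasks <-chan int, work func(int) (int, error)) <-chan result {
	results := make(chan result)
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range tasks {
				value, err := work(task)
				select {
				case results <- result{value: value, err: err}:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Wait before close: the same ordering stopProcessingWork enforces with
	// config.tasks_processed before closing the unit channels.
	go func() {
		wg.Wait()
		close(results)
	}()

	return results
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The producer owns the tasks channel and closes it when the queue is done,
	// just as FetchCollections closes tasks after feeding the page offsets.
	tasks := make(chan int)
	go func() {
		defer close(tasks)
		for i := 0; i < 8; i++ {
			tasks <- i
		}
	}()

	double := func(n int) (int, error) {
		if n == 5 {
			return 0, errors.New("simulated failure")
		}
		return n * 2, nil
	}

	for r := range runPool(ctx, 3, tasks, double) {
		if r.err != nil {
			fmt.Println("error:", r.err)
			continue
		}
		fmt.Println("result:", r.value)
	}
}

The key ordering is the same in both the sketch and the commit: wait for the in-flight work, then close, so receivers can treat channel closure as the completion signal.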

File 2 of 2:

@@ -1,6 +1,7 @@
 package synchronizator
 
 import (
+	"context"
 	"fmt"
 	"slices"
 	"time"
@@ -15,7 +16,7 @@ type Platform struct {
 func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error {
 	fetchWrapper := func(offset int) ([]*Collection, error) {
-		fmt.Printf("Requesting offset: %v\n", offset)
+		// fmt.Printf("Requesting offset: %v\n", offset)
 		if offset == 10 {
 			return nil, fmt.Errorf("Simulated error jeje")
@@ -32,30 +33,51 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
 		return collections, nil
 	}
 
-	// 5 request per minute
-	rate_limit := NewRateLimit(5, time.Minute)
-	manager := createWorkerPool[int, []*Collection](5, 2, rate_limit, fetchWrapper)
+	ctx, cancel := context.WithCancel(context.Background())
+
+	config := &WorkConfig{
+		max_workers:     5,
+		max_retries:     2,
+		base_retry_time: time.Second,
+		rate_limit:      NewRateLimit(5, time.Minute),
+	}
+
+	tasks := make(chan int)
 
 	// TODO: get number of page dynamically and change Fetcher signature
 	pages := 4
+
+	results, errors, done := asyncTaskRunner[int, []*Collection](ctx, tasks, config, fetchWrapper)
+
 	for i := range pages {
-		err := manager.AddWork(i * start_pagination.Limit)
-		if err != nil {
-			return err
-		}
+		tasks <- i * start_pagination.Limit
 	}
 
-	successUnits := 0
-	for collections := range manager.GetWorkUnit() {
-		platform.Collections = slices.Concat(platform.Collections, collections)
-		successUnits++
-	}
-
-	if successUnits != pages {
-		return fmt.Errorf("Units failed: %v", manager.GetFailedUnits())
-	}
+	close(tasks)
+
+loop:
+	for {
+		select {
+		case collection, ok := <-results:
+			if !ok {
+				continue
+			}
+			platform.Collections = slices.Concat(platform.Collections, collection)
+		case error, ok := <-errors:
+			if !ok {
+				continue
+			}
+			fmt.Printf("There was an error: %v\n", error)
+			cancel()
+		case <-ctx.Done():
+			break loop
+		case <-done:
+			break loop
+		}
+	}
+
+	fmt.Printf("Collections: %v\n", len(platform.Collections))
 
 	err := BulkCreateNode(platform._conn, platform.Collections)
 	if err != nil {