diff --git a/pkg/fetcher.go b/pkg/fetcher.go
index 65a6d05..7b73061 100644
--- a/pkg/fetcher.go
+++ b/pkg/fetcher.go
@@ -1,8 +1,8 @@
 package synchronizator
 
 import (
+	"context"
 	"fmt"
-	"iter"
 	"math/rand"
 	"sync"
 	"time"
@@ -80,137 +80,24 @@ type Worker[T, S any] struct {
 	rate_limit <-chan time.Time
 }
 
-type WorkerManager[T, S any] struct {
-	queue_tasks        uint
-	processed_tasks    uint
-	active_workers     sync.WaitGroup
-	is_open_to_work    bool
-	max_retries        uint8
-	base_retry_time    time.Duration
-	failed_units       []*WorkUnit[T, S]
-	workers_receptor   chan WorkUnit[T, S]
-	workers_transmiter chan WorkUnit[T, S]
+type WorkConfig struct {
+	tasks_processed sync.WaitGroup
+	max_workers     uint8
+	max_retries     uint8
+	base_retry_time time.Duration
+	rate_limit      <-chan time.Time
 }
 
-func (manager *WorkerManager[T, S]) AddWork(value T) error {
-	if !manager.is_open_to_work {
-		return fmt.Errorf("The manager is closed to add more work.")
-	}
-
-	workUnit := WorkUnit[T, S]{
-		argument: value,
-		timeout:  0,
-		attempts: 0,
-	}
-
-	manager.workers_receptor <- workUnit
-	manager.queue_tasks++
-	return nil
-}
-
-func (manager *WorkerManager[T, S]) Stop() {
-	// Stop receiving new units of work
-	manager.is_open_to_work = false
-}
-
-func (manager *WorkerManager[T, S]) GetSingleWorkUnit() S {
-	workUnit := <-manager.workers_transmiter
-
-	return workUnit.result
-}
-
-func (manager *WorkerManager[T, S]) handleFailedWorkUnit(workUnit *WorkUnit[T, S]) bool {
-	if manager.max_retries <= workUnit.attempts {
-		manager.failed_units = append(manager.failed_units, workUnit)
-		manager.processed_tasks++
-		return false
-	}
-
-	workUnit.attempts++
-
-	if workUnit.timeout == 0 {
-		workUnit.timeout = manager.base_retry_time
-	} else {
-		workUnit.timeout *= 2
-	}
-
-	go func() {
-		jitter := time.Duration(rand.Int63n(int64(workUnit.timeout)))
-		timeout := workUnit.timeout + jitter
-		fmt.Printf(
-			"Unit failed for %v time, retrying in: %v\n",
-			workUnit.attempts,
-			timeout,
-		)
-		time.Sleep(timeout)
-		manager.workers_receptor <- *workUnit
-	}()
-
-	return true
-}
-
-func (manager *WorkerManager[T, S]) increment_processed_units() {
-	manager.processed_tasks++
-	fmt.Printf("processed_tasks: %v\n", manager.processed_tasks)
-
-	if manager.processed_tasks >= manager.queue_tasks {
-		close(manager.workers_receptor)
-	}
-}
-
-func (manager *WorkerManager[T, S]) handleWorkUnit(workUnit *WorkUnit[T, S]) bool {
-	if workUnit.err != nil {
-		can_try_again := manager.handleFailedWorkUnit(workUnit)
-
-		if !can_try_again {
-			manager.increment_processed_units()
-		}
-		return false
-	}
-
-	manager.increment_processed_units()
-
-	return true
-}
-
-func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] {
-	// send a message through the done channel when all workers have stopped
-	done_channel := make(chan bool)
-
-	go func() {
-		manager.active_workers.Wait()
-		close(done_channel)
-	}()
-
-	manager.is_open_to_work = false
-
-	return func(yield func(S) bool) {
-		for {
-			// TODO: handle tiemouts
-			select {
-			case workUnit := <-manager.workers_transmiter:
-				if is_successfull := manager.handleWorkUnit(&workUnit); !is_successfull {
-					continue
-				}
-
-				if !yield(workUnit.result) {
-					return
-				}
-			case <-done_channel:
-				close(manager.workers_transmiter)
-				return
-			}
-		}
-	}
-}
-
-func (manager *WorkerManager[T, S]) GetFailedUnits() []*WorkUnit[T, S] {
-	return manager.failed_units
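+// Channels groups the plumbing of the pool: tasks_queue carries incoming values
+// from the caller, units_dispatcher feeds work units to the workers, units_receiver
+// collects what the workers produce, and tasks_done / tasks_failed report successes
+// and exhausted retries back to the caller.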
+type Channels[T, S any] struct {
+	tasks_queue      chan T
+	tasks_done       chan S
+	tasks_failed     chan error
+	units_dispatcher chan WorkUnit[T, S]
+	units_receiver   chan WorkUnit[T, S]
 }
 
 func spawn_worker[T, S any](worker *Worker[T, S]) {
-	defer worker.wg.Done()
-
+	// TODO: handle timeouts
 	for workUnit := range worker.receptor {
 		// Wait for rate-limit
 		<-worker.rate_limit
@@ -223,37 +110,151 @@ func spawn_worker[T, S any](worker *Worker[T, S]) {
 	}
 }
 
-func createWorkerPool[T, S any](
-	max_workers uint8,
-	max_retries uint8,
-	rate_limit <-chan time.Time,
-	work Work[T, S],
-) *WorkerManager[T, S] {
-	channel_size := max_workers * 3
+func handleFailedWorkUnit[T, S any](
+	workUnit *WorkUnit[T, S],
+	channels *Channels[T, S],
+	config *WorkConfig,
+) bool {
+	if config.max_retries <= workUnit.attempts {
+		channels.tasks_failed <- workUnit.err
+		config.tasks_processed.Done()
+		return false
+	}
 
-	manager := &WorkerManager[T, S]{
-		max_retries:        max_retries,
-		base_retry_time:    time.Second,
-		workers_receptor:   make(chan WorkUnit[T, S], channel_size),
-		workers_transmiter: make(chan WorkUnit[T, S], channel_size),
+	workUnit.attempts++
+	workUnit.err = nil
+
+	if workUnit.timeout == 0 {
+		workUnit.timeout = config.base_retry_time
+	} else {
+		workUnit.timeout *= 2
+	}
+
+	go func() {
+		jitter := time.Duration(rand.Int63n(int64(workUnit.timeout)))
+		timeout := workUnit.timeout + jitter
+		fmt.Printf(
+			"Unit with value %v failed %v times, retrying in: %v\n",
+			workUnit.argument,
+			workUnit.attempts,
+			timeout,
+		)
+		time.Sleep(timeout)
+		channels.units_dispatcher <- *workUnit
+	}()
+
+	return true
+}
+
+// listenForWorkResults is in charge of what we return to the user;
+// it exits when units_receiver is closed, which happens once the workers have stopped
+func listenForWorkResults[T, S any](
+	ctx context.Context,
+	channels *Channels[T, S],
+	config *WorkConfig,
+) {
+	for {
+		select {
+		case workUnit, ok := <-channels.units_receiver:
+			if !ok {
+				return
+			}
+
+			if workUnit.err != nil {
+				handleFailedWorkUnit(&workUnit, channels, config)
+				continue
+			}
+
+			// Send message to user
+			channels.tasks_done <- workUnit.result
+			config.tasks_processed.Done()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// workUnitDispatcher receives values and turns them into work units;
+// it stops when the task queue is closed
+func workUnitDispatcher[T, S any](
+	ctx context.Context,
+	finish context.CancelFunc,
+	channels *Channels[T, S],
+	config *WorkConfig,
+) {
+	defer stopProcessingWork(finish, channels, config)
+
+	for {
+		select {
+		case value, ok := <-channels.tasks_queue:
+			if !ok {
+				return
+			}
+
+			workUnit := WorkUnit[T, S]{
+				argument: value,
+				timeout:  0,
+				attempts: 0,
+			}
+			// Add before dispatching so Done can never run ahead of the matching Add
+			config.tasks_processed.Add(1)
+			channels.units_dispatcher <- workUnit
+
+		case <-ctx.Done():
+			fmt.Println("context done")
+			return
+		}
+	}
+}
+
+// stopProcessingWork waits for every dispatched work unit to be processed, then
+// closes the channels the workers and listeners use; this prevents closing a
+// channel while work is still in flight
+func stopProcessingWork[T, S any](
+	finish context.CancelFunc,
+	channels *Channels[T, S],
+	config *WorkConfig,
+) {
+	config.tasks_processed.Wait()
+
+	close(channels.units_receiver)
+	close(channels.units_dispatcher)
+	close(channels.tasks_done)
+	close(channels.tasks_failed)
+
+	finish()
+}
+
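+// asyncTaskRunner wires the pool together: it starts the workers, the result
+// listener and the dispatcher, and returns the channels the caller reads from
+// (results and errors) plus a done channel that is closed once all work has been
+// processed or the parent context is cancelled. A rough usage sketch (types and
+// names here are illustrative; see platform.go for the real call site):
+//
+//	tasks := make(chan string)
+//	results, errs, done := asyncTaskRunner[string, int](ctx, tasks, config, work)
+//	go func() {
+//		for _, value := range inputs {
+//			tasks <- value
+//		}
+//		close(tasks)
+//	}()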
+func asyncTaskRunner[T, S any](
+	ctx context.Context,
+	inbound chan T,
+	config *WorkConfig,
+	work Work[T, S],
+) (<-chan S, <-chan error, <-chan struct{}) {
+	channel_size := config.max_workers * 3
+
+	done, finish := context.WithCancel(ctx)
+
+	channels := &Channels[T, S]{
+		tasks_queue:      inbound,
+		tasks_done:       make(chan S),
+		tasks_failed:     make(chan error),
+		units_dispatcher: make(chan WorkUnit[T, S], channel_size),
+		units_receiver:   make(chan WorkUnit[T, S], channel_size),
 	}
 
 	// create pool of workers
-	for i := range max_workers {
+	for i := range config.max_workers {
 		worker := &Worker[T, S]{
 			id:         uint8(i),
-			receptor:   manager.workers_receptor,
-			transmiter: manager.workers_transmiter,
-			rate_limit: rate_limit,
-			wg:         &manager.active_workers,
+			receptor:   channels.units_dispatcher,
+			transmiter: channels.units_receiver,
+			rate_limit: config.rate_limit,
 			work:       work,
 		}
 
 		go spawn_worker(worker)
-		manager.active_workers.Add(1)
 	}
 
-	manager.is_open_to_work = true
-
-	return manager
+	go listenForWorkResults(done, channels, config)
+	go workUnitDispatcher(done, finish, channels, config)
+	return channels.tasks_done, channels.tasks_failed, done.Done()
 }
diff --git a/pkg/platform.go b/pkg/platform.go
index e66f1ef..293fd35 100644
--- a/pkg/platform.go
+++ b/pkg/platform.go
@@ -1,6 +1,7 @@
 package synchronizator
 
 import (
+	"context"
 	"fmt"
 	"slices"
 	"time"
@@ -15,7 +16,7 @@ type Platform struct {
 
 func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error {
 	fetchWrapper := func(offset int) ([]*Collection, error) {
-		fmt.Printf("Requesting offset: %v\n", offset)
+		// fmt.Printf("Requesting offset: %v\n", offset)
 
 		if offset == 10 {
 			return nil, fmt.Errorf("Simulated error jeje")
@@ -32,30 +33,51 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
 		return collections, nil
 	}
 
-	// 5 request per minute
-	rate_limit := NewRateLimit(5, time.Minute)
+	ctx, cancel := context.WithCancel(context.Background())
 
-	manager := createWorkerPool[int, []*Collection](5, 2, rate_limit, fetchWrapper)
+	config := &WorkConfig{
+		max_workers:     5,
+		max_retries:     2,
+		base_retry_time: time.Second,
+		rate_limit:      NewRateLimit(5, time.Minute),
+	}
+
+	tasks := make(chan int)
 
 	// TODO: get number of page dynamically and change Fetcher signature
 	pages := 4
 
+	results, errors, done := asyncTaskRunner[int, []*Collection](ctx, tasks, config, fetchWrapper)
+
 	for i := range pages {
-		err := manager.AddWork(i * start_pagination.Limit)
-		if err != nil {
-			return err
+		tasks <- i * start_pagination.Limit
+	}
+
+	close(tasks)
+
+loop:
+	for {
+		select {
+		case collection, ok := <-results:
+			if !ok {
+				continue
+			}
+			platform.Collections = slices.Concat(platform.Collections, collection)
+		case err, ok := <-errors:
+			if !ok {
+				continue
+			}
+
+			fmt.Printf("There was an error: %v\n", err)
+			cancel()
+		case <-ctx.Done():
+			break loop
+		case <-done:
+			break loop
 		}
 	}
 
-	successUnits := 0
-	for collections := range manager.GetWorkUnit() {
-		platform.Collections = slices.Concat(platform.Collections, collections)
-		successUnits++
-	}
-
-	if successUnits != pages {
-		return fmt.Errorf("Units failed: %v", manager.GetFailedUnits())
-	}
+	fmt.Printf("Collections: %v\n", len(platform.Collections))
 
 	err := BulkCreateNode(platform._conn, platform.Collections)
 	if err != nil {