diff --git a/pkg/fetcher.go b/pkg/fetcher.go index b7b381a..b22d4ee 100644 --- a/pkg/fetcher.go +++ b/pkg/fetcher.go @@ -8,28 +8,19 @@ import ( "time" ) -// Fetcher is the concurrent manager -// upon invocation, should create a worker pool of 1 to get the first set of results -// then base on the Patination Total and Limit, should distribute the workload -// -// It also needs to handle errors, rate-limits, retries strategies, and gracefull rejections -// -// It should return the pages not fetched for later retry -// -// Pagination should include a max-concurrent connection and rate-limit -// configuration to prevent having errors from external sources -// -// Maybe change the name to pagination or embed in another struct - +// Fetcher is a function type that fetches a collection of items based on the provided pagination information. +// It returns a slice of collections, updated pagination information, and an error if any occurred. type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error) +// Pagination represents the pagination information for fetching data. type Pagination struct { - Total int - HasMore bool - Limit int - Offset int + Total int // Total number of items. + HasMore bool // Indicates if there are more items to fetch. + Limit int // Number of items to fetch per request. + Offset int // Offset for the next set of items to fetch. } +// StartPagination is the initial pagination configuration. var StartPagination = Pagination{ Total: 0, HasMore: false, @@ -37,7 +28,10 @@ var StartPagination = Pagination{ Offset: 0, } -func NewRateLimit(request_per int, time_scale time.Duration) <-chan time.Time { +// NewRateLimiter creates a rate limiter that emits time events at a specified rate. +// request_per specifies the number of requests allowed per time_scale duration. +// time_scale specifies the duration over which the requests are allowed. +func NewRateLimiter(request_per int, time_scale time.Duration) <-chan time.Time { rate_limit := make(chan time.Time, request_per) tickrate := time_scale / time.Duration(request_per) @@ -54,63 +48,77 @@ func NewRateLimit(request_per int, time_scale time.Duration) <-chan time.Time { return rate_limit } -// T represent the argument of the function to run -// S represent the return value of the function to run - +// WorkUnit represents a unit of work to be processed. type WorkUnit[T, S any] struct { - argument T - result S - err error - timeout time.Duration - attempts uint8 + argument T // Argument to be processed by a worker. + result S // Result of the processing. + err error // Error encountered during processing. + timeout time.Duration // Timeout for the work unit. + attempts uint8 // Number of attempts made to process the work unit. } -// Work represents a function that processes a value of type S and returns a -// result of type T or an error. +// Work defines a function type that processes a value of type T and returns a result of type S or an error. type Work[T, S any] func(value T) (S, error) -// Worker represents a worker that processes tasks of type S and sends results -// of type T. +// Worker represents a worker that processes tasks of type T and returns results of type S. type Worker[T, S any] struct { - id uint8 // id is the unique identifier of the worker. - receptor <-chan WorkUnit[T, S] // receptor is the channel from which the worker receives tasks. - transmiter chan<- WorkUnit[T, S] // transmiter is the channel to which the worker sends results. - wg *sync.WaitGroup // wg is the wait group to synchronize the completion of tasks. - work Work[T, S] // work is the function that processes tasks. - rate_limit <-chan time.Time - timeout time.Duration + id uint8 // Unique identifier of the worker. + receptor <-chan WorkUnit[T, S] // Channel from which the worker receives tasks. + transmitter chan<- WorkUnit[T, S] // Channel to which the worker sends results. + wg *sync.WaitGroup // Wait group to synchronize the completion of tasks. should be decremented each time a task has been processed + work Work[T, S] // Function that processes tasks. + rate_limit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter(). + timeout time.Duration // Maximum execution time allowed for a task before beign canceled. } +// WorkConfig represents the configuration for the work processing. +// All fields are optional and have sensible defaults: +// +// - tasks_processed: always a new WaitGroup +// - max_workers: default is 5 +// - amount_of_workers: if 0, retries are disabled +// - base_retry_time: default is 1 second +// - rate_limit: default is 10 requests per second +// - timeout: if 0, timeout is disabled type WorkConfig struct { - tasks_processed sync.WaitGroup - max_workers uint8 - max_retries uint8 - base_retry_time time.Duration - rate_limit <-chan time.Time - timeout time.Duration + tasks_processed sync.WaitGroup // Wait group to synchronize task completion. + amount_of_workers uint8 // Number of workers to spawn. + max_retries uint8 // Maximum number of retries for a task before beign cancelled. + base_retry_time time.Duration // Base factor to wait for before retrying a task. + rate_limit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter(). + timeout time.Duration // Maximum execution time allowed for a task before beign canceled. } +// Group the channels used for task processing for easy access between functions. type Channels[T, S any] struct { - tasks_queue chan T - tasks_done chan S - tasks_failed chan error - units_dispatcher chan WorkUnit[T, S] - units_receiver chan WorkUnit[T, S] + tasks_queue chan T // Channel for incoming tasks. + tasks_done chan S // Channel for completed tasks. + tasks_failed chan error // Channel for failed tasks. + units_dispatcher chan WorkUnit[T, S] // Channel for dispatching work units. + units_receiver chan WorkUnit[T, S] // Channel for receiving processed work units. } +// Starts a worker that processes work units received from the worker's receptor channel. +// It waits for rate limits, processes the work unit, and sends the result to the worker's transmitter channel. +// It also applies rate limit if is enabled. +// +// The function stops processing when the receptor channel is closed. +// +// Parameters: +// - ctx: The context to control the cancellation of the worker. +// - worker: The worker that processes the work units. func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) { - // TODO: handle tiemouts for workUnit := range worker.receptor { // Wait for rate-limit <-worker.rate_limit - var timeout context.Context + var workCtx context.Context var cancel context.CancelFunc if worker.timeout != 0 { - timeout, cancel = context.WithTimeout(ctx, worker.timeout) + workCtx, cancel = context.WithTimeout(ctx, worker.timeout) } else { - timeout, cancel = context.WithCancel(ctx) + workCtx, cancel = context.WithCancel(ctx) } done := make(chan struct{}) @@ -127,22 +135,34 @@ func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) { case <-done: workUnit.result = value workUnit.err = err - case <-timeout.Done(): - workUnit.err = timeout.Err() + case <-workCtx.Done(): + workUnit.err = workCtx.Err() } cancel() - worker.transmiter <- workUnit + worker.transmitter <- workUnit } } +// handleFailedWorkUnit handles a failed work unit by retrying it or marking it as failed. +// If the maximum number of retries is reached, the work unit is marked as failed and no further retries are attempted. +// Otherwise, the work unit is retried with an exponential backoff and jitter. +// +// Parameters: +// - workUnit: The work unit that failed. +// - channels: The channels used for communication between different parts of the system. +// - config: The configuration for the work unit. +// +// Returns: +// - A boolean indicating whether the work unit will be retried (true) or not (false). func handleFailedWorkUnit[T, S any]( workUnit *WorkUnit[T, S], channels *Channels[T, S], config *WorkConfig, ) bool { - if config.max_retries <= workUnit.attempts { + // If retries == 0, retries are disabled, return immediately + if config.max_retries == 0 || config.max_retries <= workUnit.attempts { channels.tasks_failed <- workUnit.err config.tasks_processed.Done() return false @@ -173,8 +193,14 @@ func handleFailedWorkUnit[T, S any]( return true } -// this is in charge of what we return to the user -// exits when units_receiver is closed, which is done when the workers are closed +// Listens for work results from the units_receiver channel and processes them. +// It handles failed work units and sends successful results to the tasks_done channel. +// The function stops processing when the context is done or when the units_receiver channel is closed. +// +// Parameters: +// - ctx: The context to control the cancellation of the listener. +// - channels: The channels used for communication between different parts of the system. +// - config: The configuration for the work listener. func listenForWorkResults[T, S any]( ctx context.Context, channels *Channels[T, S], @@ -201,8 +227,15 @@ func listenForWorkResults[T, S any]( } } -// this is in charge of receive values and transform them into work units -// stops when the queue is empty +// workUnitDispatcher is responsible for dispatching work units to the units_dispatcher channel. +// It listens for tasks from the tasks_queue channel and sends them to the units_dispatcher channel. +// The function stops processing work when the context is done or when the tasks_queue channel is closed. +// +// Parameters: +// - ctx: The context to control the cancellation of the dispatcher. +// - finish: The cancel function to stop the dispatcher. +// - channels: The channels used for communication between different parts of the system. +// - config: The configuration for the work dispatcher. func workUnitDispatcher[T, S any]( ctx context.Context, finish context.CancelFunc, @@ -233,8 +266,13 @@ func workUnitDispatcher[T, S any]( } } -// this wait for all workers to stop, then close the unit channels where the workers send values -// prevent closing the channel before the workers finish +// Stops the processing of work by closing all channels and calling the finish function. +// It waits for all tasks to be processed before closing the channels. +// +// Parameters: +// - finish: The context.CancelFunc to propagate the stop signal. +// - channels: The channels used for communication between different parts of the system. +// - config: The configuration for the work processing. func stopProcessingWork[T, S any]( finish context.CancelFunc, channels *Channels[T, S], @@ -250,13 +288,41 @@ func stopProcessingWork[T, S any]( finish() } +// asyncTaskRunner runs tasks asynchronously using a pool of workers. +// It sets default values for the WorkConfig if not provided, creates channels for communication, +// and starts the workers and dispatcher. +// +// Parameters: +// - ctx: The context to control the cancellation of the task runner. +// - inbound: The channel from which tasks are received. +// - config: The configuration for the task runner. +// - work: The work function to be executed by the workers. +// +// Returns: +// - A channel that receives the results of the tasks. +// - A channel that receives errors from the tasks. +// - A channel that signals when the task runner is done. func asyncTaskRunner[T, S any]( ctx context.Context, inbound chan T, config *WorkConfig, work Work[T, S], ) (<-chan S, <-chan error, <-chan struct{}) { - channel_size := config.max_workers * 3 + // Set default values for WorkConfig if not provided + if config.amount_of_workers == 0 { + config.amount_of_workers = 5 + } + if config.base_retry_time == 0 { + config.base_retry_time = 1 * time.Second + } + if config.rate_limit == nil { + config.rate_limit = NewRateLimiter(10, time.Second) + } + + // Ensure a clean wait group is ussed + config.tasks_processed = sync.WaitGroup{} + + channel_size := config.amount_of_workers * 3 done, finish := context.WithCancel(ctx) @@ -269,14 +335,14 @@ func asyncTaskRunner[T, S any]( } // create pool of workers - for i := range config.max_workers { + for i := range config.amount_of_workers { worker := &Worker[T, S]{ - id: uint8(i), - receptor: channels.units_dispatcher, - transmiter: channels.units_receiver, - rate_limit: config.rate_limit, - timeout: config.timeout, - work: work, + id: uint8(i), + receptor: channels.units_dispatcher, + transmitter: channels.units_receiver, + rate_limit: config.rate_limit, + timeout: config.timeout, + work: work, } go spawn_worker(ctx, worker) diff --git a/pkg/platform.go b/pkg/platform.go index b8822bc..6997519 100644 --- a/pkg/platform.go +++ b/pkg/platform.go @@ -37,11 +37,11 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag defer cancel() config := &WorkConfig{ - max_workers: 5, - max_retries: 2, - base_retry_time: time.Second, - rate_limit: NewRateLimit(5, time.Minute), - timeout: time.Second * 2, + amount_of_workers: 5, + max_retries: 2, + base_retry_time: time.Second, + rate_limit: NewRateLimiter(5, time.Minute), + timeout: time.Second * 2, } tasks := make(chan int) @@ -49,7 +49,7 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag // TODO: get number of page dynamically and change Fetcher signature pages := 4 - results, errors, done := asyncTaskRunner[int, []*Collection](ctx, tasks, config, fetchWrapper) + results, errors, done := asyncTaskRunner(ctx, tasks, config, fetchWrapper) for i := range pages { tasks <- i * start_pagination.Limit