docs: Add signature comments to worker pool

This commit is contained in:
Alexander Navarro 2024-11-29 19:06:41 -03:00
parent f8c068a7f3
commit 837e703bc4
2 changed files with 142 additions and 76 deletions

View file

@ -8,28 +8,19 @@ import (
"time" "time"
) )
// Fetcher is the concurrent manager // Fetcher is a function type that fetches a collection of items based on the provided pagination information.
// upon invocation, should create a worker pool of 1 to get the first set of results // It returns a slice of collections, updated pagination information, and an error if any occurred.
// then base on the Patination Total and Limit, should distribute the workload
//
// It also needs to handle errors, rate-limits, retries strategies, and gracefull rejections
//
// It should return the pages not fetched for later retry
//
// Pagination should include a max-concurrent connection and rate-limit
// configuration to prevent having errors from external sources
//
// Maybe change the name to pagination or embed in another struct
type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error) type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error)
// Pagination represents the pagination information for fetching data.
type Pagination struct { type Pagination struct {
Total int Total int // Total number of items.
HasMore bool HasMore bool // Indicates if there are more items to fetch.
Limit int Limit int // Number of items to fetch per request.
Offset int Offset int // Offset for the next set of items to fetch.
} }
// StartPagination is the initial pagination configuration.
var StartPagination = Pagination{ var StartPagination = Pagination{
Total: 0, Total: 0,
HasMore: false, HasMore: false,
@ -37,7 +28,10 @@ var StartPagination = Pagination{
Offset: 0, Offset: 0,
} }
func NewRateLimit(request_per int, time_scale time.Duration) <-chan time.Time { // NewRateLimiter creates a rate limiter that emits time events at a specified rate.
// request_per specifies the number of requests allowed per time_scale duration.
// time_scale specifies the duration over which the requests are allowed.
func NewRateLimiter(request_per int, time_scale time.Duration) <-chan time.Time {
rate_limit := make(chan time.Time, request_per) rate_limit := make(chan time.Time, request_per)
tickrate := time_scale / time.Duration(request_per) tickrate := time_scale / time.Duration(request_per)
@ -54,63 +48,77 @@ func NewRateLimit(request_per int, time_scale time.Duration) <-chan time.Time {
return rate_limit return rate_limit
} }
// T represent the argument of the function to run // WorkUnit represents a unit of work to be processed.
// S represent the return value of the function to run
type WorkUnit[T, S any] struct { type WorkUnit[T, S any] struct {
argument T argument T // Argument to be processed by a worker.
result S result S // Result of the processing.
err error err error // Error encountered during processing.
timeout time.Duration timeout time.Duration // Timeout for the work unit.
attempts uint8 attempts uint8 // Number of attempts made to process the work unit.
} }
// Work represents a function that processes a value of type S and returns a // Work defines a function type that processes a value of type T and returns a result of type S or an error.
// result of type T or an error.
type Work[T, S any] func(value T) (S, error) type Work[T, S any] func(value T) (S, error)
// Worker represents a worker that processes tasks of type S and sends results // Worker represents a worker that processes tasks of type T and returns results of type S.
// of type T.
type Worker[T, S any] struct { type Worker[T, S any] struct {
id uint8 // id is the unique identifier of the worker. id uint8 // Unique identifier of the worker.
receptor <-chan WorkUnit[T, S] // receptor is the channel from which the worker receives tasks. receptor <-chan WorkUnit[T, S] // Channel from which the worker receives tasks.
transmiter chan<- WorkUnit[T, S] // transmiter is the channel to which the worker sends results. transmitter chan<- WorkUnit[T, S] // Channel to which the worker sends results.
wg *sync.WaitGroup // wg is the wait group to synchronize the completion of tasks. wg *sync.WaitGroup // Wait group to synchronize the completion of tasks. Should be decremented each time a task has been processed.
work Work[T, S] // work is the function that processes tasks. work Work[T, S] // Function that processes tasks.
rate_limit <-chan time.Time rate_limit <-chan time.Time // Ticker to limit the number of requests. It is recommended to pass the result of calling NewRateLimiter().
timeout time.Duration timeout time.Duration // Maximum execution time allowed for a task before being canceled.
} }
// WorkConfig represents the configuration for the work processing.
// All fields are optional and have sensible defaults:
//
// - tasks_processed: always a new WaitGroup
// - amount_of_workers: default is 5
// - max_retries: if 0, retries are disabled
// - base_retry_time: default is 1 second
// - rate_limit: default is 10 requests per second
// - timeout: if 0, timeout is disabled
type WorkConfig struct { type WorkConfig struct {
tasks_processed sync.WaitGroup tasks_processed sync.WaitGroup // Wait group to synchronize task completion.
max_workers uint8 amount_of_workers uint8 // Number of workers to spawn.
max_retries uint8 max_retries uint8 // Maximum number of retries for a task before being canceled.
base_retry_time time.Duration base_retry_time time.Duration // Base factor to wait for before retrying a task.
rate_limit <-chan time.Time rate_limit <-chan time.Time // Ticker to limit the number of requests. It is recommended to pass the result of calling NewRateLimiter().
timeout time.Duration timeout time.Duration // Maximum execution time allowed for a task before being canceled.
} }
// Channels groups the channels used for task processing for easy access between functions.
type Channels[T, S any] struct { type Channels[T, S any] struct {
tasks_queue chan T tasks_queue chan T // Channel for incoming tasks.
tasks_done chan S tasks_done chan S // Channel for completed tasks.
tasks_failed chan error tasks_failed chan error // Channel for failed tasks.
units_dispatcher chan WorkUnit[T, S] units_dispatcher chan WorkUnit[T, S] // Channel for dispatching work units.
units_receiver chan WorkUnit[T, S] units_receiver chan WorkUnit[T, S] // Channel for receiving processed work units.
} }
// Starts a worker that processes work units received from the worker's receptor channel.
// It waits for rate limits, processes the work unit, and sends the result to the worker's transmitter channel.
// It also applies the rate limit, if enabled.
//
// The function stops processing when the receptor channel is closed.
//
// Parameters:
// - ctx: The context to control the cancellation of the worker.
// - worker: The worker that processes the work units.
func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) { func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) {
// TODO: handle tiemouts
for workUnit := range worker.receptor { for workUnit := range worker.receptor {
// Wait for rate-limit // Wait for rate-limit
<-worker.rate_limit <-worker.rate_limit
var timeout context.Context var workCtx context.Context
var cancel context.CancelFunc var cancel context.CancelFunc
if worker.timeout != 0 { if worker.timeout != 0 {
timeout, cancel = context.WithTimeout(ctx, worker.timeout) workCtx, cancel = context.WithTimeout(ctx, worker.timeout)
} else { } else {
timeout, cancel = context.WithCancel(ctx) workCtx, cancel = context.WithCancel(ctx)
} }
done := make(chan struct{}) done := make(chan struct{})
@ -127,22 +135,34 @@ func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) {
case <-done: case <-done:
workUnit.result = value workUnit.result = value
workUnit.err = err workUnit.err = err
case <-timeout.Done(): case <-workCtx.Done():
workUnit.err = timeout.Err() workUnit.err = workCtx.Err()
} }
cancel() cancel()
worker.transmiter <- workUnit worker.transmitter <- workUnit
} }
} }
// handleFailedWorkUnit handles a failed work unit by retrying it or marking it as failed.
// If the maximum number of retries is reached, the work unit is marked as failed and no further retries are attempted.
// Otherwise, the work unit is retried with an exponential backoff and jitter.
//
// Parameters:
// - workUnit: The work unit that failed.
// - channels: The channels used for communication between different parts of the system.
// - config: The configuration for the work unit.
//
// Returns:
// - A boolean indicating whether the work unit will be retried (true) or not (false).
func handleFailedWorkUnit[T, S any]( func handleFailedWorkUnit[T, S any](
workUnit *WorkUnit[T, S], workUnit *WorkUnit[T, S],
channels *Channels[T, S], channels *Channels[T, S],
config *WorkConfig, config *WorkConfig,
) bool { ) bool {
if config.max_retries <= workUnit.attempts { // If retries == 0, retries are disabled, return immediately
if config.max_retries == 0 || config.max_retries <= workUnit.attempts {
channels.tasks_failed <- workUnit.err channels.tasks_failed <- workUnit.err
config.tasks_processed.Done() config.tasks_processed.Done()
return false return false
@ -173,8 +193,14 @@ func handleFailedWorkUnit[T, S any](
return true return true
} }
// this is in charge of what we return to the user // Listens for work results from the units_receiver channel and processes them.
// exits when units_receiver is closed, which is done when the workers are closed // It handles failed work units and sends successful results to the tasks_done channel.
// The function stops processing when the context is done or when the units_receiver channel is closed.
//
// Parameters:
// - ctx: The context to control the cancellation of the listener.
// - channels: The channels used for communication between different parts of the system.
// - config: The configuration for the work listener.
func listenForWorkResults[T, S any]( func listenForWorkResults[T, S any](
ctx context.Context, ctx context.Context,
channels *Channels[T, S], channels *Channels[T, S],
@ -201,8 +227,15 @@ func listenForWorkResults[T, S any](
} }
} }
// this is in charge of receive values and transform them into work units // workUnitDispatcher is responsible for dispatching work units to the units_dispatcher channel.
// stops when the queue is empty // It listens for tasks from the tasks_queue channel and sends them to the units_dispatcher channel.
// The function stops processing work when the context is done or when the tasks_queue channel is closed.
//
// Parameters:
// - ctx: The context to control the cancellation of the dispatcher.
// - finish: The cancel function to stop the dispatcher.
// - channels: The channels used for communication between different parts of the system.
// - config: The configuration for the work dispatcher.
func workUnitDispatcher[T, S any]( func workUnitDispatcher[T, S any](
ctx context.Context, ctx context.Context,
finish context.CancelFunc, finish context.CancelFunc,
@ -233,8 +266,13 @@ func workUnitDispatcher[T, S any](
} }
} }
// this wait for all workers to stop, then close the unit channels where the workers send values // Stops the processing of work by closing all channels and calling the finish function.
// prevent closing the channel before the workers finish // It waits for all tasks to be processed before closing the channels.
//
// Parameters:
// - finish: The context.CancelFunc to propagate the stop signal.
// - channels: The channels used for communication between different parts of the system.
// - config: The configuration for the work processing.
func stopProcessingWork[T, S any]( func stopProcessingWork[T, S any](
finish context.CancelFunc, finish context.CancelFunc,
channels *Channels[T, S], channels *Channels[T, S],
@ -250,13 +288,41 @@ func stopProcessingWork[T, S any](
finish() finish()
} }
// asyncTaskRunner runs tasks asynchronously using a pool of workers.
// It sets default values for the WorkConfig if not provided, creates channels for communication,
// and starts the workers and dispatcher.
//
// Parameters:
// - ctx: The context to control the cancellation of the task runner.
// - inbound: The channel from which tasks are received.
// - config: The configuration for the task runner.
// - work: The work function to be executed by the workers.
//
// Returns:
// - A channel that receives the results of the tasks.
// - A channel that receives errors from the tasks.
// - A channel that signals when the task runner is done.
func asyncTaskRunner[T, S any]( func asyncTaskRunner[T, S any](
ctx context.Context, ctx context.Context,
inbound chan T, inbound chan T,
config *WorkConfig, config *WorkConfig,
work Work[T, S], work Work[T, S],
) (<-chan S, <-chan error, <-chan struct{}) { ) (<-chan S, <-chan error, <-chan struct{}) {
channel_size := config.max_workers * 3 // Set default values for WorkConfig if not provided
if config.amount_of_workers == 0 {
config.amount_of_workers = 5
}
if config.base_retry_time == 0 {
config.base_retry_time = 1 * time.Second
}
if config.rate_limit == nil {
config.rate_limit = NewRateLimiter(10, time.Second)
}
// Ensure a clean wait group is used
config.tasks_processed = sync.WaitGroup{}
channel_size := config.amount_of_workers * 3
done, finish := context.WithCancel(ctx) done, finish := context.WithCancel(ctx)
@ -269,14 +335,14 @@ func asyncTaskRunner[T, S any](
} }
// create pool of workers // create pool of workers
for i := range config.max_workers { for i := range config.amount_of_workers {
worker := &Worker[T, S]{ worker := &Worker[T, S]{
id: uint8(i), id: uint8(i),
receptor: channels.units_dispatcher, receptor: channels.units_dispatcher,
transmiter: channels.units_receiver, transmitter: channels.units_receiver,
rate_limit: config.rate_limit, rate_limit: config.rate_limit,
timeout: config.timeout, timeout: config.timeout,
work: work, work: work,
} }
go spawn_worker(ctx, worker) go spawn_worker(ctx, worker)

View file

@ -37,11 +37,11 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
defer cancel() defer cancel()
config := &WorkConfig{ config := &WorkConfig{
max_workers: 5, amount_of_workers: 5,
max_retries: 2, max_retries: 2,
base_retry_time: time.Second, base_retry_time: time.Second,
rate_limit: NewRateLimit(5, time.Minute), rate_limit: NewRateLimiter(5, time.Minute),
timeout: time.Second * 2, timeout: time.Second * 2,
} }
tasks := make(chan int) tasks := make(chan int)
@ -49,7 +49,7 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
// TODO: get number of page dynamically and change Fetcher signature // TODO: get number of page dynamically and change Fetcher signature
pages := 4 pages := 4
results, errors, done := asyncTaskRunner[int, []*Collection](ctx, tasks, config, fetchWrapper) results, errors, done := asyncTaskRunner(ctx, tasks, config, fetchWrapper)
for i := range pages { for i := range pages {
tasks <- i * start_pagination.Limit tasks <- i * start_pagination.Limit