package synchronizator

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// NewRateLimiter creates a rate limiter that emits time events at a specified rate.
//
// The returned channel is buffered with room for request_per tokens and is
// pre-filled, so an initial burst of request_per receives succeeds immediately.
// Afterwards a background goroutine adds one token every
// time_scale / request_per. That goroutine is never stopped; it lives for the
// rest of the program.
//
// Parameters:
//   - request_per: number of requests allowed per time_scale duration. Values
//     below 1 are treated as 1, because a zero or negative rate would otherwise
//     panic (division by zero / negative channel capacity).
//   - time_scale: duration over which the requests are allowed.
//
// Returns:
//   - A receive-only channel; receive one value from it before each request.
func NewRateLimiter(request_per int, time_scale time.Duration) <-chan time.Time {
	// Clamp instead of panicking on a zero or negative rate.
	if request_per < 1 {
		request_per = 1
	}

	rate_limit := make(chan time.Time, request_per)
	tickrate := time_scale / time.Duration(request_per)

	// Pre-fill the bucket so the first request_per callers are not delayed.
	for i := 0; i < request_per; i++ {
		rate_limit <- time.Now()
	}

	go func() {
		// Blocks on the send while the bucket is full, which caps the burst
		// size at request_per.
		for t := range time.Tick(tickrate) {
			rate_limit <- t
		}
	}()

	return rate_limit
}

// WorkUnit represents a unit of work to be processed.
type WorkUnit[T, S any] struct {
	argument T             // Argument to be processed by a worker.
	result   S             // Result of the processing.
	err      error         // Error encountered during processing.
	timeout  time.Duration // Current retry backoff of the work unit; 0 until the first retry.
	attempts uint8         // Number of attempts made to process the work unit.
}

// Work defines a function type that processes a value of type T and returns a result of type S or an error.
type Work[T, S any] func(ctx context.Context, value T) (S, error)

// Worker represents a worker that processes tasks of type T and returns results of type S.
type Worker[T, S any] struct {
	id          uint8                 // Unique identifier of the worker.
	receptor    <-chan WorkUnit[T, S] // Channel from which the worker receives tasks.
	transmitter chan<- WorkUnit[T, S] // Channel to which the worker sends results.
	wg          *sync.WaitGroup       // Wait group to synchronize the completion of tasks. Should be decremented each time a task has been processed.
	work        Work[T, S]            // Function that processes tasks.
	rate_limit  <-chan time.Time      // Ticker to limit the amount of requests. It is recommended to pass the result of calling NewRateLimiter().
	timeout     time.Duration         // Maximum execution time allowed for a task before being canceled.
}

// WorkConfig represents the configuration for the work processing.
// All fields are optional; asyncTaskRunner fills in the following defaults:
//
//   - TasksProcessed: always replaced with a new WaitGroup
//   - AmountOfWorkers: default is 5
//   - MaxRetries: if 0, retries are disabled
//   - BaseRetryTime: default is 1 second
//   - RateLimit: default is 10 requests per second
//   - Timeout: if 0, timeout is disabled
type WorkConfig struct {
	TasksProcessed  sync.WaitGroup   // Wait group to synchronize task completion.
	AmountOfWorkers uint8            // Number of workers to spawn.
	MaxRetries      uint8            // Maximum number of retries for a task before being cancelled.
	BaseRetryTime   time.Duration    // Base factor to wait for before retrying a task.
	RateLimit       <-chan time.Time // Ticker to limit the amount of requests. It is recommended to pass the result of calling NewRateLimiter().
	Timeout         time.Duration    // Maximum execution time allowed for a task before being canceled.
}

// Channels groups the channels used for task processing for easy access between functions.
type Channels[T, S any] struct {
	tasks_queue      chan T              // Channel for incoming tasks.
	tasks_done       chan S              // Channel for completed tasks.
	tasks_failed     chan error          // Channel for failed tasks.
	units_dispatcher chan WorkUnit[T, S] // Channel for dispatching work units.
	units_receiver   chan WorkUnit[T, S] // Channel for receiving processed work units.
}

// spawn_worker starts a worker that processes work units received from the worker's receptor channel.
// For each unit it waits for a rate-limit token, runs the work function, and sends the
// processed unit (result or error) to the worker's transmitter channel.
//
// The function stops processing when the receptor channel is closed.
//
// Parameters:
//   - ctx: The context to control the cancellation of the worker.
//   - worker: The worker that processes the work units.
func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) { for workUnit := range worker.receptor { // Wait for rate-limit <-worker.rate_limit var workCtx context.Context var cancel context.CancelFunc if worker.timeout != 0 { workCtx, cancel = context.WithTimeout(ctx, worker.timeout) } else { workCtx, cancel = context.WithCancel(ctx) } done := make(chan struct{}) var value S var err error go func() { value, err = worker.work(ctx, workUnit.argument) close(done) }() select { case <-done: workUnit.result = value workUnit.err = err case <-workCtx.Done(): workUnit.err = workCtx.Err() } cancel() worker.transmitter <- workUnit } } // handleFailedWorkUnit handles a failed work unit by retrying it or marking it as failed. // If the maximum number of retries is reached, the work unit is marked as failed and no further retries are attempted. // Otherwise, the work unit is retried with an exponential backoff and jitter. // // Parameters: // - workUnit: The work unit that failed. // - channels: The channels used for communication between different parts of the system. // - config: The configuration for the work unit. // // Returns: // - A boolean indicating whether the work unit will be retried (true) or not (false). 
func handleFailedWorkUnit[T, S any]( workUnit *WorkUnit[T, S], channels *Channels[T, S], config *WorkConfig, ) bool { // If retries == 0, retries are disabled, return immediately if config.MaxRetries == 0 || config.MaxRetries <= workUnit.attempts { channels.tasks_failed <- workUnit.err config.TasksProcessed.Done() return false } workUnit.attempts++ workUnit.err = nil if workUnit.timeout == 0 { workUnit.timeout = config.BaseRetryTime } else { workUnit.timeout *= 2 } go func() { jitter := time.Duration(rand.Int63n(int64(workUnit.timeout))) timeout := workUnit.timeout + jitter fmt.Printf( "Unit with value %v failed for %v time, retrying in: %v\n", workUnit.argument, workUnit.attempts, timeout, ) time.Sleep(timeout) channels.units_dispatcher <- *workUnit }() return true } // Listens for work results from the units_receiver channel and processes them. // It handles failed work units and sends successful results to the tasks_done channel. // The function stops processing when the context is done or when the units_receiver channel is closed. // // Parameters: // - ctx: The context to control the cancellation of the listener. // - channels: The channels used for communication between different parts of the system. // - config: The configuration for the work listener. func listenForWorkResults[T, S any]( ctx context.Context, channels *Channels[T, S], config *WorkConfig, ) { for { select { case workUnit, ok := <-channels.units_receiver: if !ok { return } if workUnit.err != nil { handleFailedWorkUnit(&workUnit, channels, config) continue } // Send message to user channels.tasks_done <- workUnit.result config.TasksProcessed.Done() case <-ctx.Done(): return } } } // workUnitDispatcher is responsible for dispatching work units to the units_dispatcher channel. // It listens for tasks from the tasks_queue channel and sends them to the units_dispatcher channel. // The function stops processing work when the context is done or when the tasks_queue channel is closed. 
// // Parameters: // - ctx: The context to control the cancellation of the dispatcher. // - finish: The cancel function to stop the dispatcher. // - channels: The channels used for communication between different parts of the system. // - config: The configuration for the work dispatcher. func workUnitDispatcher[T, S any]( ctx context.Context, finish context.CancelFunc, channels *Channels[T, S], config *WorkConfig, ) { defer stopProcessingWork(finish, channels, config) for { select { case value, ok := <-channels.tasks_queue: if !ok { return } workUnit := WorkUnit[T, S]{ argument: value, timeout: 0, attempts: 0, } channels.units_dispatcher <- workUnit config.TasksProcessed.Add(1) case <-ctx.Done(): fmt.Println("context done") return } } } // Stops the processing of work by closing all channels and calling the finish function. // It waits for all tasks to be processed before closing the channels. // // Parameters: // - finish: The context.CancelFunc to propagate the stop signal. // - channels: The channels used for communication between different parts of the system. // - config: The configuration for the work processing. func stopProcessingWork[T, S any]( finish context.CancelFunc, channels *Channels[T, S], config *WorkConfig, ) { config.TasksProcessed.Wait() close(channels.units_receiver) close(channels.units_dispatcher) close(channels.tasks_done) close(channels.tasks_failed) finish() } // asyncTaskRunner runs tasks asynchronously using a pool of workers. // It sets default values for the WorkConfig if not provided, creates channels for communication, // and starts the workers and dispatcher. // // Parameters: // - ctx: The context to control the cancellation of the task runner. // - inbound: The channel from which tasks are received. // - config: The configuration for the task runner. // - work: The work function to be executed by the workers. // // Returns: // - A channel that receives the results of the tasks. // - A channel that receives errors from the tasks. 
// - A channel that signals when the task runner is done. func asyncTaskRunner[T, S any]( ctx context.Context, inbound chan T, config *WorkConfig, work Work[T, S], ) (<-chan S, <-chan error, <-chan struct{}) { // Set default values for WorkConfig if not provided if config.AmountOfWorkers == 0 { config.AmountOfWorkers = 5 } if config.BaseRetryTime == 0 { config.BaseRetryTime = 1 * time.Second } if config.RateLimit == nil { config.RateLimit = NewRateLimiter(10, time.Second) } // Ensure a clean wait group is ussed config.TasksProcessed = sync.WaitGroup{} channel_size := config.AmountOfWorkers * 3 done, finish := context.WithCancel(ctx) channels := &Channels[T, S]{ tasks_queue: inbound, tasks_done: make(chan S), tasks_failed: make(chan error), units_dispatcher: make(chan WorkUnit[T, S], channel_size), units_receiver: make(chan WorkUnit[T, S], channel_size), } // create pool of workers for i := range config.AmountOfWorkers { worker := &Worker[T, S]{ id: uint8(i), receptor: channels.units_dispatcher, transmitter: channels.units_receiver, rate_limit: config.RateLimit, timeout: config.Timeout, work: work, } go spawn_worker(ctx, worker) } go listenForWorkResults(done, channels, config) go workUnitDispatcher(done, finish, channels, config) return channels.tasks_done, channels.tasks_failed, done.Done() }