generated from alecodes/base-template
This is a workaround for the fact that Readwise uses cursor pagination instead of offset pagination; proper handling will be added later.
333 lines
11 KiB
Go
333 lines
11 KiB
Go
package synchronizator
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// NewRateLimiter creates a rate limiter that emits time events at a specified rate.
|
|
// request_per specifies the number of requests allowed per time_scale duration.
|
|
// time_scale specifies the duration over which the requests are allowed.
|
|
func NewRateLimiter(request_per int, time_scale time.Duration) <-chan time.Time {
|
|
rate_limit := make(chan time.Time, request_per)
|
|
tickrate := time_scale / time.Duration(request_per)
|
|
|
|
for range request_per {
|
|
rate_limit <- time.Now()
|
|
}
|
|
|
|
go func() {
|
|
for t := range time.Tick(tickrate) {
|
|
rate_limit <- t
|
|
}
|
|
}()
|
|
|
|
return rate_limit
|
|
}
|
|
|
|
// WorkUnit carries one task through the pipeline. The dispatcher creates it,
// a worker records result/err on it, and the result listener then delivers
// the result, schedules a retry, or reports the unit as failed.
type WorkUnit[T, S any] struct {
	argument T             // Value passed to the Work function.
	result   S             // Value returned by the Work function when err is nil.
	err      error         // Error from the most recent attempt; nil on success.
	timeout  time.Duration // Current retry backoff, NOT an execution timeout: 0 until the first retry, then WorkConfig.BaseRetryTime, doubled on each later retry (jitter is added when the retry is scheduled).
	attempts uint8         // Number of retries scheduled so far for this unit.
}
|
|
|
|
// Work is the task function run by the worker pool. It receives a context
// and one input value and returns either a result or an error. A non-nil
// error makes the task go through the retry/failure handling instead of
// being delivered as a result.
type Work[T, S any] func(ctx context.Context, value T) (S, error)
|
|
|
|
// Worker represents a worker that processes tasks of type T and returns results of type S.
|
|
type Worker[T, S any] struct {
|
|
id uint8 // Unique identifier of the worker.
|
|
receptor <-chan WorkUnit[T, S] // Channel from which the worker receives tasks.
|
|
transmitter chan<- WorkUnit[T, S] // Channel to which the worker sends results.
|
|
wg *sync.WaitGroup // Wait group to synchronize the completion of tasks. should be decremented each time a task has been processed
|
|
work Work[T, S] // Function that processes tasks.
|
|
rate_limit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter().
|
|
timeout time.Duration // Maximum execution time allowed for a task before beign canceled.
|
|
}
|
|
|
|
// WorkConfig configures asyncTaskRunner. Every field is optional.
// asyncTaskRunner replaces zero values with defaults, mutating the struct in
// place:
//
//   - TasksProcessed: always replaced with a fresh WaitGroup
//   - AmountOfWorkers: 0 means 5 workers
//   - MaxRetries: 0 disables retries
//   - BaseRetryTime: 0 means 1 second
//   - RateLimit: nil means NewRateLimiter(10, time.Second), i.e. 10 requests per second
//   - Timeout: 0 disables the per-task timeout
//
// WorkConfig contains a sync.WaitGroup, so it must not be copied once in
// use; always pass it by pointer.
type WorkConfig struct {
	TasksProcessed  sync.WaitGroup   // Counts tasks that are queued, running, or waiting for a retry.
	AmountOfWorkers uint8            // Number of workers to spawn.
	MaxRetries      uint8            // Maximum number of retries for a task before it is reported as failed.
	BaseRetryTime   time.Duration    // Backoff before the first retry; doubled for each later retry, plus random jitter.
	RateLimit       <-chan time.Time // Token source shared by all workers; NewRateLimiter() output is recommended.
	Timeout         time.Duration    // Maximum time to wait for a task before it is reported as failed with context.DeadlineExceeded.
}
|
|
|
|
// Group the channels used for task processing for easy access between functions.
|
|
type Channels[T, S any] struct {
|
|
tasks_queue chan T // Channel for incoming tasks.
|
|
tasks_done chan S // Channel for completed tasks.
|
|
tasks_failed chan error // Channel for failed tasks.
|
|
units_dispatcher chan WorkUnit[T, S] // Channel for dispatching work units.
|
|
units_receiver chan WorkUnit[T, S] // Channel for receiving processed work units.
|
|
}
|
|
|
|
// Starts a worker that processes work units received from the worker's receptor channel.
|
|
// It waits for rate limits, processes the work unit, and sends the result to the worker's transmitter channel.
|
|
// It also applies rate limit if is enabled.
|
|
//
|
|
// The function stops processing when the receptor channel is closed.
|
|
//
|
|
// Parameters:
|
|
// - ctx: The context to control the cancellation of the worker.
|
|
// - worker: The worker that processes the work units.
|
|
func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) {
|
|
for workUnit := range worker.receptor {
|
|
// Wait for rate-limit
|
|
<-worker.rate_limit
|
|
|
|
var workCtx context.Context
|
|
var cancel context.CancelFunc
|
|
|
|
if worker.timeout != 0 {
|
|
workCtx, cancel = context.WithTimeout(ctx, worker.timeout)
|
|
} else {
|
|
workCtx, cancel = context.WithCancel(ctx)
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
|
|
var value S
|
|
var err error
|
|
|
|
go func() {
|
|
value, err = worker.work(ctx, workUnit.argument)
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
workUnit.result = value
|
|
workUnit.err = err
|
|
case <-workCtx.Done():
|
|
workUnit.err = workCtx.Err()
|
|
}
|
|
|
|
cancel()
|
|
|
|
worker.transmitter <- workUnit
|
|
}
|
|
}
|
|
|
|
// handleFailedWorkUnit handles a failed work unit by retrying it or marking it as failed.
|
|
// If the maximum number of retries is reached, the work unit is marked as failed and no further retries are attempted.
|
|
// Otherwise, the work unit is retried with an exponential backoff and jitter.
|
|
//
|
|
// Parameters:
|
|
// - workUnit: The work unit that failed.
|
|
// - channels: The channels used for communication between different parts of the system.
|
|
// - config: The configuration for the work unit.
|
|
//
|
|
// Returns:
|
|
// - A boolean indicating whether the work unit will be retried (true) or not (false).
|
|
func handleFailedWorkUnit[T, S any](
|
|
workUnit *WorkUnit[T, S],
|
|
channels *Channels[T, S],
|
|
config *WorkConfig,
|
|
) bool {
|
|
// If retries == 0, retries are disabled, return immediately
|
|
if config.MaxRetries == 0 || config.MaxRetries <= workUnit.attempts {
|
|
channels.tasks_failed <- workUnit.err
|
|
config.TasksProcessed.Done()
|
|
return false
|
|
}
|
|
|
|
workUnit.attempts++
|
|
workUnit.err = nil
|
|
|
|
if workUnit.timeout == 0 {
|
|
workUnit.timeout = config.BaseRetryTime
|
|
} else {
|
|
workUnit.timeout *= 2
|
|
}
|
|
|
|
go func() {
|
|
jitter := time.Duration(rand.Int63n(int64(workUnit.timeout)))
|
|
timeout := workUnit.timeout + jitter
|
|
fmt.Printf(
|
|
"Unit with value %v failed for %v time, retrying in: %v\n",
|
|
workUnit.argument,
|
|
workUnit.attempts,
|
|
timeout,
|
|
)
|
|
time.Sleep(timeout)
|
|
channels.units_dispatcher <- *workUnit
|
|
}()
|
|
|
|
return true
|
|
}
|
|
|
|
// Listens for work results from the units_receiver channel and processes them.
|
|
// It handles failed work units and sends successful results to the tasks_done channel.
|
|
// The function stops processing when the context is done or when the units_receiver channel is closed.
|
|
//
|
|
// Parameters:
|
|
// - ctx: The context to control the cancellation of the listener.
|
|
// - channels: The channels used for communication between different parts of the system.
|
|
// - config: The configuration for the work listener.
|
|
func listenForWorkResults[T, S any](
|
|
ctx context.Context,
|
|
channels *Channels[T, S],
|
|
config *WorkConfig,
|
|
) {
|
|
for {
|
|
select {
|
|
case workUnit, ok := <-channels.units_receiver:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if workUnit.err != nil {
|
|
handleFailedWorkUnit(&workUnit, channels, config)
|
|
continue
|
|
}
|
|
|
|
// Send message to user
|
|
channels.tasks_done <- workUnit.result
|
|
config.TasksProcessed.Done()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// workUnitDispatcher is responsible for dispatching work units to the units_dispatcher channel.
|
|
// It listens for tasks from the tasks_queue channel and sends them to the units_dispatcher channel.
|
|
// The function stops processing work when the context is done or when the tasks_queue channel is closed.
|
|
//
|
|
// Parameters:
|
|
// - ctx: The context to control the cancellation of the dispatcher.
|
|
// - finish: The cancel function to stop the dispatcher.
|
|
// - channels: The channels used for communication between different parts of the system.
|
|
// - config: The configuration for the work dispatcher.
|
|
func workUnitDispatcher[T, S any](
|
|
ctx context.Context,
|
|
finish context.CancelFunc,
|
|
channels *Channels[T, S],
|
|
config *WorkConfig,
|
|
) {
|
|
defer stopProcessingWork(finish, channels, config)
|
|
|
|
for {
|
|
select {
|
|
case value, ok := <-channels.tasks_queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
workUnit := WorkUnit[T, S]{
|
|
argument: value,
|
|
timeout: 0,
|
|
attempts: 0,
|
|
}
|
|
channels.units_dispatcher <- workUnit
|
|
config.TasksProcessed.Add(1)
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stops the processing of work by closing all channels and calling the finish function.
|
|
// It waits for all tasks to be processed before closing the channels.
|
|
//
|
|
// Parameters:
|
|
// - finish: The context.CancelFunc to propagate the stop signal.
|
|
// - channels: The channels used for communication between different parts of the system.
|
|
// - config: The configuration for the work processing.
|
|
func stopProcessingWork[T, S any](
|
|
finish context.CancelFunc,
|
|
channels *Channels[T, S],
|
|
config *WorkConfig,
|
|
) {
|
|
config.TasksProcessed.Wait()
|
|
|
|
close(channels.units_receiver)
|
|
close(channels.units_dispatcher)
|
|
close(channels.tasks_done)
|
|
close(channels.tasks_failed)
|
|
|
|
finish()
|
|
}
|
|
|
|
// asyncTaskRunner runs tasks asynchronously using a pool of workers.
|
|
// It sets default values for the WorkConfig if not provided, creates channels for communication,
|
|
// and starts the workers and dispatcher.
|
|
//
|
|
// Parameters:
|
|
// - ctx: The context to control the cancellation of the task runner.
|
|
// - inbound: The channel from which tasks are received.
|
|
// - config: The configuration for the task runner.
|
|
// - work: The work function to be executed by the workers.
|
|
//
|
|
// Returns:
|
|
// - A channel that receives the results of the tasks.
|
|
// - A channel that receives errors from the tasks.
|
|
// - A channel that signals when the task runner is done.
|
|
func asyncTaskRunner[T, S any](
|
|
ctx context.Context,
|
|
inbound chan T,
|
|
config *WorkConfig,
|
|
work Work[T, S],
|
|
) (<-chan S, <-chan error, <-chan struct{}) {
|
|
// Set default values for WorkConfig if not provided
|
|
if config.AmountOfWorkers == 0 {
|
|
config.AmountOfWorkers = 5
|
|
}
|
|
if config.BaseRetryTime == 0 {
|
|
config.BaseRetryTime = 1 * time.Second
|
|
}
|
|
if config.RateLimit == nil {
|
|
config.RateLimit = NewRateLimiter(10, time.Second)
|
|
}
|
|
|
|
// Ensure a clean wait group is ussed
|
|
config.TasksProcessed = sync.WaitGroup{}
|
|
|
|
channel_size := config.AmountOfWorkers * 3
|
|
|
|
done, finish := context.WithCancel(ctx)
|
|
|
|
channels := &Channels[T, S]{
|
|
tasks_queue: inbound,
|
|
tasks_done: make(chan S),
|
|
tasks_failed: make(chan error),
|
|
units_dispatcher: make(chan WorkUnit[T, S], channel_size),
|
|
units_receiver: make(chan WorkUnit[T, S], channel_size),
|
|
}
|
|
|
|
// create pool of workers
|
|
for i := range config.AmountOfWorkers {
|
|
worker := &Worker[T, S]{
|
|
id: uint8(i),
|
|
receptor: channels.units_dispatcher,
|
|
transmitter: channels.units_receiver,
|
|
rate_limit: config.RateLimit,
|
|
timeout: config.Timeout,
|
|
work: work,
|
|
}
|
|
|
|
go spawn_worker(ctx, worker)
|
|
}
|
|
|
|
go listenForWorkResults(done, channels, config)
|
|
go workUnitDispatcher(done, finish, channels, config)
|
|
return channels.tasks_done, channels.tasks_failed, done.Done()
|
|
}
|