diff --git a/examples/usage.go b/examples/usage.go index 51d3fc6..f39f434 100644 --- a/examples/usage.go +++ b/examples/usage.go @@ -1,6 +1,7 @@ package main import ( + "context" "database/sql" "encoding/json" "fmt" @@ -16,7 +17,7 @@ import ( ) type PokeApiListResponse[T any] struct { - Count int `json:"count"` + Count uint64 `json:"count"` Next string `json:"next"` Previous string `json:"previous"` Results []T `json:"results"` @@ -42,17 +43,22 @@ type Pokemon struct { } func getPokedexs( + ctx context.Context, pagination synchronizator.Pagination, -) ([]*synchronizator.Collection, synchronizator.Pagination, error) { +) (synchronizator.FetchCollectionResponse, error) { + payload := synchronizator.FetchCollectionResponse{ + Pagination: pagination, + } + var collections []*synchronizator.Collection params := url.Values{} - params.Add("offset", strconv.Itoa(pagination.Offset)) - params.Add("limit", strconv.Itoa(pagination.Limit)) + params.Add("offset", strconv.FormatUint(pagination.Offset, 10)) + params.Add("limit", strconv.FormatUint(pagination.Limit, 10)) resp, err := http.Get("https://pokeapi.co/api/v2/pokedex?" + params.Encode()) if err != nil { - return nil, pagination, err + return payload, err } body, err := io.ReadAll(resp.Body) @@ -61,7 +67,7 @@ func getPokedexs( var data PokeApiListResponse[Pokedex] err = json.Unmarshal(body, &data) if err != nil { - return nil, pagination, err + return payload, err } collections = make([]*synchronizator.Collection, 0, len(data.Results)) @@ -71,18 +77,20 @@ func getPokedexs( metadata, err := json.Marshal(pokedex) collection := synchronizator.NewCollection(collection_name, metadata) if err != nil { - return nil, pagination, err + return payload, err } collections = append(collections, collection) } // fmt.Println(data) - pagination.Offset += pagination.Limit - pagination.HasMore = data.Next != "" - pagination.Total = data.Count + payload.Offset += pagination.Limit + payload.HasMore = data.Next != "" + payload.Total = data.Count - return collections, pagination, nil + payload.Response = collections + + return payload, nil } func getPokemons( @@ -158,7 +166,9 @@ func main() { return } - err = pokeApi.FetchCollections(getPokedexs, synchronizator.StartPagination) + pagination := synchronizator.StartPagination + + err = pokeApi.FetchCollections(getPokedexs, pagination) if err != nil { fmt.Println(err) return diff --git a/pkg/fetcher.go b/pkg/fetcher.go index b22d4ee..7d67fda 100644 --- a/pkg/fetcher.go +++ b/pkg/fetcher.go @@ -1,354 +1,53 @@ package synchronizator -import ( - "context" - "fmt" - "math/rand" - "sync" - "time" -) - -// Fetcher is a function type that fetches a collection of items based on the provided pagination information. -// It returns a slice of collections, updated pagination information, and an error if any occurred. -type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error) - -// Pagination represents the pagination information for fetching data. -type Pagination struct { - Total int // Total number of items. - HasMore bool // Indicates if there are more items to fetch. - Limit int // Number of items to fetch per request. - Offset int // Offset for the next set of items to fetch. -} +import "fmt" // StartPagination is the initial pagination configuration. var StartPagination = Pagination{ Total: 0, + Pages: 0, HasMore: false, Limit: 10, Offset: 0, } -// NewRateLimiter creates a rate limiter that emits time events at a specified rate. -// request_per specifies the number of requests allowed per time_scale duration. -// time_scale specifies the duration over which the requests are allowed. -func NewRateLimiter(request_per int, time_scale time.Duration) <-chan time.Time { - rate_limit := make(chan time.Time, request_per) - tickrate := time_scale / time.Duration(request_per) - - for range request_per { - rate_limit <- time.Now() - } - - go func() { - for t := range time.Tick(tickrate) { - rate_limit <- t - } - }() - - return rate_limit +type FetchResponse[T any] struct { + Pagination + Response T } -// WorkUnit represents a unit of work to be processed. -type WorkUnit[T, S any] struct { - argument T // Argument to be processed by a worker. - result S // Result of the processing. - err error // Error encountered during processing. - timeout time.Duration // Timeout for the work unit. - attempts uint8 // Number of attempts made to process the work unit. +// Pagination represents the pagination information for fetching data. +type Pagination struct { + Total uint64 // Total number of items. + Pages uint64 + HasMore bool // Indicates if there are more items to fetch. + Limit uint64 // Number of items to fetch per request. + Offset uint64 // Offset for the next set of items to fetch. } -// Work defines a function type that processes a value of type T and returns a result of type S or an error. -type Work[T, S any] func(value T) (S, error) +func calculatePages(pagination *Pagination, offset uint64) (uint64, error) { + if pagination.Limit == 0 { + return 0, fmt.Errorf("division by zero") + } -// Worker represents a worker that processes tasks of type T and returns results of type S. -type Worker[T, S any] struct { - id uint8 // Unique identifier of the worker. - receptor <-chan WorkUnit[T, S] // Channel from which the worker receives tasks. - transmitter chan<- WorkUnit[T, S] // Channel to which the worker sends results. - wg *sync.WaitGroup // Wait group to synchronize the completion of tasks. should be decremented each time a task has been processed - work Work[T, S] // Function that processes tasks. - rate_limit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter(). - timeout time.Duration // Maximum execution time allowed for a task before beign canceled. + dividend := pagination.Total - offset + divisor := pagination.Limit + + result := dividend / divisor + if dividend%divisor != 0 { + result++ + } + return result, nil } -// WorkConfig represents the configuration for the work processing. -// All fields are optional and have sensible defaults: -// -// - tasks_processed: always a new WaitGroup -// - max_workers: default is 5 -// - amount_of_workers: if 0, retries are disabled -// - base_retry_time: default is 1 second -// - rate_limit: default is 10 requests per second -// - timeout: if 0, timeout is disabled -type WorkConfig struct { - tasks_processed sync.WaitGroup // Wait group to synchronize task completion. - amount_of_workers uint8 // Number of workers to spawn. - max_retries uint8 // Maximum number of retries for a task before beign cancelled. - base_retry_time time.Duration // Base factor to wait for before retrying a task. - rate_limit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter(). - timeout time.Duration // Maximum execution time allowed for a task before beign canceled. -} - -// Group the channels used for task processing for easy access between functions. -type Channels[T, S any] struct { - tasks_queue chan T // Channel for incoming tasks. - tasks_done chan S // Channel for completed tasks. - tasks_failed chan error // Channel for failed tasks. - units_dispatcher chan WorkUnit[T, S] // Channel for dispatching work units. - units_receiver chan WorkUnit[T, S] // Channel for receiving processed work units. -} - -// Starts a worker that processes work units received from the worker's receptor channel. -// It waits for rate limits, processes the work unit, and sends the result to the worker's transmitter channel. -// It also applies rate limit if is enabled. -// -// The function stops processing when the receptor channel is closed. -// -// Parameters: -// - ctx: The context to control the cancellation of the worker. -// - worker: The worker that processes the work units. -func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) { - for workUnit := range worker.receptor { - // Wait for rate-limit - <-worker.rate_limit - - var workCtx context.Context - var cancel context.CancelFunc - - if worker.timeout != 0 { - workCtx, cancel = context.WithTimeout(ctx, worker.timeout) - } else { - workCtx, cancel = context.WithCancel(ctx) - } - - done := make(chan struct{}) - - var value S - var err error - - go func() { - value, err = worker.work(workUnit.argument) - close(done) - }() - - select { - case <-done: - workUnit.result = value - workUnit.err = err - case <-workCtx.Done(): - workUnit.err = workCtx.Err() - } - - cancel() - - worker.transmitter <- workUnit - } -} - -// handleFailedWorkUnit handles a failed work unit by retrying it or marking it as failed. -// If the maximum number of retries is reached, the work unit is marked as failed and no further retries are attempted. -// Otherwise, the work unit is retried with an exponential backoff and jitter. -// -// Parameters: -// - workUnit: The work unit that failed. -// - channels: The channels used for communication between different parts of the system. -// - config: The configuration for the work unit. -// -// Returns: -// - A boolean indicating whether the work unit will be retried (true) or not (false). -func handleFailedWorkUnit[T, S any]( - workUnit *WorkUnit[T, S], - channels *Channels[T, S], - config *WorkConfig, -) bool { - // If retries == 0, retries are disabled, return immediately - if config.max_retries == 0 || config.max_retries <= workUnit.attempts { - channels.tasks_failed <- workUnit.err - config.tasks_processed.Done() - return false - } - - workUnit.attempts++ - workUnit.err = nil - - if workUnit.timeout == 0 { - workUnit.timeout = config.base_retry_time - } else { - workUnit.timeout *= 2 - } - - go func() { - jitter := time.Duration(rand.Int63n(int64(workUnit.timeout))) - timeout := workUnit.timeout + jitter - fmt.Printf( - "Unit with value %v failed for %v time, retrying in: %v\n", - workUnit.argument, - workUnit.attempts, - timeout, - ) - time.Sleep(timeout) - channels.units_dispatcher <- *workUnit - }() - - return true -} - -// Listens for work results from the units_receiver channel and processes them. -// It handles failed work units and sends successful results to the tasks_done channel. -// The function stops processing when the context is done or when the units_receiver channel is closed. -// -// Parameters: -// - ctx: The context to control the cancellation of the listener. -// - channels: The channels used for communication between different parts of the system. -// - config: The configuration for the work listener. -func listenForWorkResults[T, S any]( - ctx context.Context, - channels *Channels[T, S], - config *WorkConfig, -) { - for { - select { - case workUnit, ok := <-channels.units_receiver: - if !ok { - return - } - - if workUnit.err != nil { - handleFailedWorkUnit(&workUnit, channels, config) - continue - } - - // Send message to user - channels.tasks_done <- workUnit.result - config.tasks_processed.Done() - case <-ctx.Done(): - return - } - } -} - -// workUnitDispatcher is responsible for dispatching work units to the units_dispatcher channel. -// It listens for tasks from the tasks_queue channel and sends them to the units_dispatcher channel. -// The function stops processing work when the context is done or when the tasks_queue channel is closed. -// -// Parameters: -// - ctx: The context to control the cancellation of the dispatcher. -// - finish: The cancel function to stop the dispatcher. -// - channels: The channels used for communication between different parts of the system. -// - config: The configuration for the work dispatcher. -func workUnitDispatcher[T, S any]( - ctx context.Context, - finish context.CancelFunc, - channels *Channels[T, S], - config *WorkConfig, -) { - defer stopProcessingWork(finish, channels, config) - - for { - select { - case value, ok := <-channels.tasks_queue: - if !ok { - return - } - - workUnit := WorkUnit[T, S]{ - argument: value, - timeout: 0, - attempts: 0, - } - channels.units_dispatcher <- workUnit - config.tasks_processed.Add(1) - - case <-ctx.Done(): - fmt.Println("context done") - return - } - } -} - -// Stops the processing of work by closing all channels and calling the finish function. -// It waits for all tasks to be processed before closing the channels. -// -// Parameters: -// - finish: The context.CancelFunc to propagate the stop signal. -// - channels: The channels used for communication between different parts of the system. -// - config: The configuration for the work processing. -func stopProcessingWork[T, S any]( - finish context.CancelFunc, - channels *Channels[T, S], - config *WorkConfig, -) { - config.tasks_processed.Wait() - - close(channels.units_receiver) - close(channels.units_dispatcher) - close(channels.tasks_done) - close(channels.tasks_failed) - - finish() -} - -// asyncTaskRunner runs tasks asynchronously using a pool of workers. -// It sets default values for the WorkConfig if not provided, creates channels for communication, -// and starts the workers and dispatcher. -// -// Parameters: -// - ctx: The context to control the cancellation of the task runner. -// - inbound: The channel from which tasks are received. -// - config: The configuration for the task runner. -// - work: The work function to be executed by the workers. -// -// Returns: -// - A channel that receives the results of the tasks. -// - A channel that receives errors from the tasks. -// - A channel that signals when the task runner is done. -func asyncTaskRunner[T, S any]( - ctx context.Context, - inbound chan T, - config *WorkConfig, - work Work[T, S], -) (<-chan S, <-chan error, <-chan struct{}) { - // Set default values for WorkConfig if not provided - if config.amount_of_workers == 0 { - config.amount_of_workers = 5 - } - if config.base_retry_time == 0 { - config.base_retry_time = 1 * time.Second - } - if config.rate_limit == nil { - config.rate_limit = NewRateLimiter(10, time.Second) - } - - // Ensure a clean wait group is ussed - config.tasks_processed = sync.WaitGroup{} - - channel_size := config.amount_of_workers * 3 - - done, finish := context.WithCancel(ctx) - - channels := &Channels[T, S]{ - tasks_queue: inbound, - tasks_done: make(chan S), - tasks_failed: make(chan error), - units_dispatcher: make(chan WorkUnit[T, S], channel_size), - units_receiver: make(chan WorkUnit[T, S], channel_size), - } - - // create pool of workers - for i := range config.amount_of_workers { - worker := &Worker[T, S]{ - id: uint8(i), - receptor: channels.units_dispatcher, - transmitter: channels.units_receiver, - rate_limit: config.rate_limit, - timeout: config.timeout, - work: work, - } - - go spawn_worker(ctx, worker) - } - - go listenForWorkResults(done, channels, config) - go workUnitDispatcher(done, finish, channels, config) - return channels.tasks_done, channels.tasks_failed, done.Done() +func getPageByOffset(pagination *Pagination) (uint64, error) { + if pagination.Pages == 0 { + return 0, fmt.Errorf("division by zero") + } + + total := pagination.Pages * pagination.Limit + + result := (pagination.Pages * pagination.Offset) / total + + return result, nil } diff --git a/pkg/platform.go b/pkg/platform.go index 6997519..bf94b55 100644 --- a/pkg/platform.go +++ b/pkg/platform.go @@ -14,25 +14,12 @@ type Platform struct { Collections []*Collection // Child nodes } -func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error { - fetchWrapper := func(offset int) ([]*Collection, error) { - // fmt.Printf("Requesting offset: %v\n", offset) - - if offset == 10 { - time.Sleep(time.Second * 5) - } - - pagination := start_pagination - - pagination.Offset = offset - collections, _, err := fetcher(pagination) - if err != nil { - return nil, err - } - - return collections, nil - } +type FetchCollectionResponse = FetchResponse[[]*Collection] +func (platform *Platform) FetchCollections( + fetcher Work[Pagination, FetchCollectionResponse], + start_pagination Pagination, +) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -44,15 +31,62 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag timeout: time.Second * 2, } - tasks := make(chan int) + tasks := make(chan Pagination) - // TODO: get number of page dynamically and change Fetcher signature - pages := 4 + results, errors, done := asyncTaskRunner( + ctx, + tasks, + config, + fetcher, + ) - results, errors, done := asyncTaskRunner(ctx, tasks, config, fetchWrapper) + var current_page uint64 = 0 - for i := range pages { - tasks <- i * start_pagination.Limit + if start_pagination.Pages == 0 { + // do the first fetch + tasks <- start_pagination + + select { + case response, ok := <-results: + if !ok { + break + } + platform.Collections = slices.Concat(platform.Collections, response.Response) + + pages, err := calculatePages(&response.Pagination, start_pagination.Offset) + if err != nil { + return err + } + start_pagination.Pages = pages + start_pagination.Total = response.Pagination.Total + current_page++ + + case error, ok := <-errors: + if !ok { + return fmt.Errorf("Could not do first fetch to calculate pages: %v\n", error) + } + + case <-ctx.Done(): + break + case <-done: + break + } + } + + page_offset, err := getPageByOffset(&start_pagination) + if err != nil { + return err + } + + current_page += page_offset + + fmt.Printf("Total pages: %v, Current page: %v\n", start_pagination.Pages, current_page) + + for current_page <= start_pagination.Pages { + page := start_pagination + page.Offset = current_page * page.Limit + tasks <- page + current_page++ } close(tasks) @@ -60,11 +94,11 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag loop: for { select { - case collection, ok := <-results: + case response, ok := <-results: if !ok { continue } - platform.Collections = slices.Concat(platform.Collections, collection) + platform.Collections = slices.Concat(platform.Collections, response.Response) case error, ok := <-errors: if !ok { continue @@ -80,7 +114,7 @@ loop: fmt.Printf("Collections: %v\n", len(platform.Collections)) - err := BulkCreateNode(platform._conn, platform.Collections) + err = BulkCreateNode(platform._conn, platform.Collections) if err != nil { return err } diff --git a/pkg/workpool.go b/pkg/workpool.go new file mode 100644 index 0000000..2a3faf5 --- /dev/null +++ b/pkg/workpool.go @@ -0,0 +1,334 @@ +package synchronizator + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" +) + +// NewRateLimiter creates a rate limiter that emits time events at a specified rate. +// request_per specifies the number of requests allowed per time_scale duration. +// time_scale specifies the duration over which the requests are allowed. +func NewRateLimiter(request_per int, time_scale time.Duration) <-chan time.Time { + rate_limit := make(chan time.Time, request_per) + tickrate := time_scale / time.Duration(request_per) + + for range request_per { + rate_limit <- time.Now() + } + + go func() { + for t := range time.Tick(tickrate) { + rate_limit <- t + } + }() + + return rate_limit +} + +// WorkUnit represents a unit of work to be processed. +type WorkUnit[T, S any] struct { + argument T // Argument to be processed by a worker. + result S // Result of the processing. + err error // Error encountered during processing. + timeout time.Duration // Timeout for the work unit. + attempts uint8 // Number of attempts made to process the work unit. +} + +// Work defines a function type that processes a value of type T and returns a result of type S or an error. +type Work[T, S any] func(ctx context.Context, value T) (S, error) + +// Worker represents a worker that processes tasks of type T and returns results of type S. +type Worker[T, S any] struct { + id uint8 // Unique identifier of the worker. + receptor <-chan WorkUnit[T, S] // Channel from which the worker receives tasks. + transmitter chan<- WorkUnit[T, S] // Channel to which the worker sends results. + wg *sync.WaitGroup // Wait group to synchronize the completion of tasks. should be decremented each time a task has been processed + work Work[T, S] // Function that processes tasks. + rate_limit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter(). + timeout time.Duration // Maximum execution time allowed for a task before beign canceled. +} + +// WorkConfig represents the configuration for the work processing. +// All fields are optional and have sensible defaults: +// +// - tasks_processed: always a new WaitGroup +// - max_workers: default is 5 +// - amount_of_workers: if 0, retries are disabled +// - base_retry_time: default is 1 second +// - rate_limit: default is 10 requests per second +// - timeout: if 0, timeout is disabled +type WorkConfig struct { + tasks_processed sync.WaitGroup // Wait group to synchronize task completion. + amount_of_workers uint8 // Number of workers to spawn. + max_retries uint8 // Maximum number of retries for a task before beign cancelled. + base_retry_time time.Duration // Base factor to wait for before retrying a task. + rate_limit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter(). + timeout time.Duration // Maximum execution time allowed for a task before beign canceled. +} + +// Group the channels used for task processing for easy access between functions. +type Channels[T, S any] struct { + tasks_queue chan T // Channel for incoming tasks. + tasks_done chan S // Channel for completed tasks. + tasks_failed chan error // Channel for failed tasks. + units_dispatcher chan WorkUnit[T, S] // Channel for dispatching work units. + units_receiver chan WorkUnit[T, S] // Channel for receiving processed work units. +} + +// Starts a worker that processes work units received from the worker's receptor channel. +// It waits for rate limits, processes the work unit, and sends the result to the worker's transmitter channel. +// It also applies rate limit if is enabled. +// +// The function stops processing when the receptor channel is closed. +// +// Parameters: +// - ctx: The context to control the cancellation of the worker. +// - worker: The worker that processes the work units. +func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) { + for workUnit := range worker.receptor { + // Wait for rate-limit + <-worker.rate_limit + + var workCtx context.Context + var cancel context.CancelFunc + + if worker.timeout != 0 { + workCtx, cancel = context.WithTimeout(ctx, worker.timeout) + } else { + workCtx, cancel = context.WithCancel(ctx) + } + + done := make(chan struct{}) + + var value S + var err error + + go func() { + value, err = worker.work(ctx, workUnit.argument) + close(done) + }() + + select { + case <-done: + workUnit.result = value + workUnit.err = err + case <-workCtx.Done(): + workUnit.err = workCtx.Err() + } + + cancel() + + worker.transmitter <- workUnit + } +} + +// handleFailedWorkUnit handles a failed work unit by retrying it or marking it as failed. +// If the maximum number of retries is reached, the work unit is marked as failed and no further retries are attempted. +// Otherwise, the work unit is retried with an exponential backoff and jitter. +// +// Parameters: +// - workUnit: The work unit that failed. +// - channels: The channels used for communication between different parts of the system. +// - config: The configuration for the work unit. +// +// Returns: +// - A boolean indicating whether the work unit will be retried (true) or not (false). +func handleFailedWorkUnit[T, S any]( + workUnit *WorkUnit[T, S], + channels *Channels[T, S], + config *WorkConfig, +) bool { + // If retries == 0, retries are disabled, return immediately + if config.max_retries == 0 || config.max_retries <= workUnit.attempts { + channels.tasks_failed <- workUnit.err + config.tasks_processed.Done() + return false + } + + workUnit.attempts++ + workUnit.err = nil + + if workUnit.timeout == 0 { + workUnit.timeout = config.base_retry_time + } else { + workUnit.timeout *= 2 + } + + go func() { + jitter := time.Duration(rand.Int63n(int64(workUnit.timeout))) + timeout := workUnit.timeout + jitter + fmt.Printf( + "Unit with value %v failed for %v time, retrying in: %v\n", + workUnit.argument, + workUnit.attempts, + timeout, + ) + time.Sleep(timeout) + channels.units_dispatcher <- *workUnit + }() + + return true +} + +// Listens for work results from the units_receiver channel and processes them. +// It handles failed work units and sends successful results to the tasks_done channel. +// The function stops processing when the context is done or when the units_receiver channel is closed. +// +// Parameters: +// - ctx: The context to control the cancellation of the listener. +// - channels: The channels used for communication between different parts of the system. +// - config: The configuration for the work listener. +func listenForWorkResults[T, S any]( + ctx context.Context, + channels *Channels[T, S], + config *WorkConfig, +) { + for { + select { + case workUnit, ok := <-channels.units_receiver: + if !ok { + return + } + + if workUnit.err != nil { + handleFailedWorkUnit(&workUnit, channels, config) + continue + } + + // Send message to user + channels.tasks_done <- workUnit.result + config.tasks_processed.Done() + case <-ctx.Done(): + return + } + } +} + +// workUnitDispatcher is responsible for dispatching work units to the units_dispatcher channel. +// It listens for tasks from the tasks_queue channel and sends them to the units_dispatcher channel. +// The function stops processing work when the context is done or when the tasks_queue channel is closed. +// +// Parameters: +// - ctx: The context to control the cancellation of the dispatcher. +// - finish: The cancel function to stop the dispatcher. +// - channels: The channels used for communication between different parts of the system. +// - config: The configuration for the work dispatcher. +func workUnitDispatcher[T, S any]( + ctx context.Context, + finish context.CancelFunc, + channels *Channels[T, S], + config *WorkConfig, +) { + defer stopProcessingWork(finish, channels, config) + + for { + select { + case value, ok := <-channels.tasks_queue: + if !ok { + return + } + + workUnit := WorkUnit[T, S]{ + argument: value, + timeout: 0, + attempts: 0, + } + channels.units_dispatcher <- workUnit + config.tasks_processed.Add(1) + + case <-ctx.Done(): + fmt.Println("context done") + return + } + } +} + +// Stops the processing of work by closing all channels and calling the finish function. +// It waits for all tasks to be processed before closing the channels. +// +// Parameters: +// - finish: The context.CancelFunc to propagate the stop signal. +// - channels: The channels used for communication between different parts of the system. +// - config: The configuration for the work processing. +func stopProcessingWork[T, S any]( + finish context.CancelFunc, + channels *Channels[T, S], + config *WorkConfig, +) { + config.tasks_processed.Wait() + + close(channels.units_receiver) + close(channels.units_dispatcher) + close(channels.tasks_done) + close(channels.tasks_failed) + + finish() +} + +// asyncTaskRunner runs tasks asynchronously using a pool of workers. +// It sets default values for the WorkConfig if not provided, creates channels for communication, +// and starts the workers and dispatcher. +// +// Parameters: +// - ctx: The context to control the cancellation of the task runner. +// - inbound: The channel from which tasks are received. +// - config: The configuration for the task runner. +// - work: The work function to be executed by the workers. +// +// Returns: +// - A channel that receives the results of the tasks. +// - A channel that receives errors from the tasks. +// - A channel that signals when the task runner is done. +func asyncTaskRunner[T, S any]( + ctx context.Context, + inbound chan T, + config *WorkConfig, + work Work[T, S], +) (<-chan S, <-chan error, <-chan struct{}) { + // Set default values for WorkConfig if not provided + if config.amount_of_workers == 0 { + config.amount_of_workers = 5 + } + if config.base_retry_time == 0 { + config.base_retry_time = 1 * time.Second + } + if config.rate_limit == nil { + config.rate_limit = NewRateLimiter(10, time.Second) + } + + // Ensure a clean wait group is ussed + config.tasks_processed = sync.WaitGroup{} + + channel_size := config.amount_of_workers * 3 + + done, finish := context.WithCancel(ctx) + + channels := &Channels[T, S]{ + tasks_queue: inbound, + tasks_done: make(chan S), + tasks_failed: make(chan error), + units_dispatcher: make(chan WorkUnit[T, S], channel_size), + units_receiver: make(chan WorkUnit[T, S], channel_size), + } + + // create pool of workers + for i := range config.amount_of_workers { + worker := &Worker[T, S]{ + id: uint8(i), + receptor: channels.units_dispatcher, + transmitter: channels.units_receiver, + rate_limit: config.rate_limit, + timeout: config.timeout, + work: work, + } + + go spawn_worker(ctx, worker) + } + + go listenForWorkResults(done, channels, config) + go workUnitDispatcher(done, finish, channels, config) + return channels.tasks_done, channels.tasks_failed, done.Done() +}