generated from alecodes/base-template
feat: auto-calculate pagination in fetcher
also refactor the worker pool a little
This commit is contained in:
parent 837e703bc4
commit 96af51ee68
4 changed files with 451 additions and 374 deletions
369 pkg/fetcher.go
@@ -1,354 +1,53 @@
 package synchronizator

-import (
-    "context"
-    "fmt"
-    "math/rand"
-    "sync"
-    "time"
-)
-
-// Fetcher is a function type that fetches a collection of items based on the provided pagination information.
-// It returns a slice of collections, updated pagination information, and an error if any occurred.
-type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error)
-
-// Pagination represents the pagination information for fetching data.
-type Pagination struct {
-    Total   int  // Total number of items.
-    HasMore bool // Indicates if there are more items to fetch.
-    Limit   int  // Number of items to fetch per request.
-    Offset  int  // Offset for the next set of items to fetch.
-}
+import "fmt"

-// StartPagination is the initial pagination configuration.
-var StartPagination = Pagination{
-    Total:   0,
-    Pages:   0,
-    HasMore: false,
-    Limit:   10,
-    Offset:  0,
-}
-
-// NewRateLimiter creates a rate limiter that emits time events at a specified rate.
-// request_per specifies the number of requests allowed per time_scale duration.
-// time_scale specifies the duration over which the requests are allowed.
-func NewRateLimiter(request_per int, time_scale time.Duration) <-chan time.Time {
-    rate_limit := make(chan time.Time, request_per)
-    tickrate := time_scale / time.Duration(request_per)
-
-    for range request_per {
-        rate_limit <- time.Now()
-    }
-
-    go func() {
-        for t := range time.Tick(tickrate) {
-            rate_limit <- t
-        }
-    }()
-
-    return rate_limit
-}
+type FetchResponse[T any] struct {
+    Pagination
+    Response T
+}

-// WorkUnit represents a unit of work to be processed.
-type WorkUnit[T, S any] struct {
-    argument T             // Argument to be processed by a worker.
-    result   S             // Result of the processing.
-    err      error         // Error encountered during processing.
-    timeout  time.Duration // Timeout for the work unit.
-    attempts uint8         // Number of attempts made to process the work unit.
-}
+// Pagination represents the pagination information for fetching data.
+type Pagination struct {
+    Total   uint64 // Total number of items.
+    Pages   uint64
+    HasMore bool   // Indicates if there are more items to fetch.
+    Limit   uint64 // Number of items to fetch per request.
+    Offset  uint64 // Offset for the next set of items to fetch.
+}

-// Work defines a function type that processes a value of type T and returns a result of type S or an error.
-type Work[T, S any] func(value T) (S, error)
+func calculatePages(pagination *Pagination, offset uint64) (uint64, error) {
+    if pagination.Limit == 0 {
+        return 0, fmt.Errorf("division by zero")
+    }
+
+    dividend := pagination.Total - offset
+    divisor := pagination.Limit
+
+    result := dividend / divisor
+    if dividend%divisor != 0 {
+        result++
+    }
+    return result, nil
+}

-// Worker represents a worker that processes tasks of type T and returns results of type S.
-type Worker[T, S any] struct {
-    id          uint8                 // Unique identifier of the worker.
-    receptor    <-chan WorkUnit[T, S] // Channel from which the worker receives tasks.
-    transmitter chan<- WorkUnit[T, S] // Channel to which the worker sends results.
-    wg          *sync.WaitGroup       // Wait group to synchronize the completion of tasks. Should be decremented each time a task has been processed.
-    work        Work[T, S]            // Function that processes tasks.
-    rate_limit  <-chan time.Time      // Ticker to limit the number of requests. It is recommended to pass the result of calling NewRateLimiter().
-    timeout     time.Duration         // Maximum execution time allowed for a task before being canceled.
-}
-
-// WorkConfig represents the configuration for the work processing.
-// All fields are optional and have sensible defaults:
-//
-// - tasks_processed: always a new WaitGroup
-// - amount_of_workers: default is 5
-// - max_retries: if 0, retries are disabled
-// - base_retry_time: default is 1 second
-// - rate_limit: default is 10 requests per second
-// - timeout: if 0, timeout is disabled
-type WorkConfig struct {
-    tasks_processed   sync.WaitGroup   // Wait group to synchronize task completion.
-    amount_of_workers uint8            // Number of workers to spawn.
-    max_retries       uint8            // Maximum number of retries for a task before being cancelled.
-    base_retry_time   time.Duration    // Base factor to wait for before retrying a task.
-    rate_limit        <-chan time.Time // Ticker to limit the number of requests. It is recommended to pass the result of calling NewRateLimiter().
-    timeout           time.Duration    // Maximum execution time allowed for a task before being canceled.
-}
-
-// Group the channels used for task processing for easy access between functions.
-type Channels[T, S any] struct {
-    tasks_queue      chan T              // Channel for incoming tasks.
-    tasks_done       chan S              // Channel for completed tasks.
-    tasks_failed     chan error          // Channel for failed tasks.
-    units_dispatcher chan WorkUnit[T, S] // Channel for dispatching work units.
-    units_receiver   chan WorkUnit[T, S] // Channel for receiving processed work units.
-}
-
-// Starts a worker that processes work units received from the worker's receptor channel.
-// It waits for rate limits, processes the work unit, and sends the result to the worker's transmitter channel.
-// It also applies the rate limit if it is enabled.
-//
-// The function stops processing when the receptor channel is closed.
-//
-// Parameters:
-// - ctx: The context to control the cancellation of the worker.
-// - worker: The worker that processes the work units.
-func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) {
-    for workUnit := range worker.receptor {
-        // Wait for rate-limit
-        <-worker.rate_limit
-
-        var workCtx context.Context
-        var cancel context.CancelFunc
-
-        if worker.timeout != 0 {
-            workCtx, cancel = context.WithTimeout(ctx, worker.timeout)
-        } else {
-            workCtx, cancel = context.WithCancel(ctx)
-        }
-
-        done := make(chan struct{})
-
-        var value S
-        var err error
-
-        go func() {
-            value, err = worker.work(workUnit.argument)
-            close(done)
-        }()
-
-        select {
-        case <-done:
-            workUnit.result = value
-            workUnit.err = err
-        case <-workCtx.Done():
-            workUnit.err = workCtx.Err()
-        }
-
-        cancel()
-
-        worker.transmitter <- workUnit
-    }
-}
-
-// handleFailedWorkUnit handles a failed work unit by retrying it or marking it as failed.
-// If the maximum number of retries is reached, the work unit is marked as failed and no further retries are attempted.
-// Otherwise, the work unit is retried with an exponential backoff and jitter.
-//
-// Parameters:
-// - workUnit: The work unit that failed.
-// - channels: The channels used for communication between different parts of the system.
-// - config: The configuration for the work unit.
-//
-// Returns:
-// - A boolean indicating whether the work unit will be retried (true) or not (false).
-func handleFailedWorkUnit[T, S any](
-    workUnit *WorkUnit[T, S],
-    channels *Channels[T, S],
-    config *WorkConfig,
-) bool {
-    // If retries == 0, retries are disabled, return immediately
-    if config.max_retries == 0 || config.max_retries <= workUnit.attempts {
-        channels.tasks_failed <- workUnit.err
-        config.tasks_processed.Done()
-        return false
-    }
-
-    workUnit.attempts++
-    workUnit.err = nil
-
-    if workUnit.timeout == 0 {
-        workUnit.timeout = config.base_retry_time
-    } else {
-        workUnit.timeout *= 2
-    }
-
-    go func() {
-        jitter := time.Duration(rand.Int63n(int64(workUnit.timeout)))
-        timeout := workUnit.timeout + jitter
-        fmt.Printf(
-            "Unit with value %v failed for %v time, retrying in: %v\n",
-            workUnit.argument,
-            workUnit.attempts,
-            timeout,
-        )
-        time.Sleep(timeout)
-        channels.units_dispatcher <- *workUnit
-    }()
-
-    return true
-}
-
-// Listens for work results from the units_receiver channel and processes them.
-// It handles failed work units and sends successful results to the tasks_done channel.
-// The function stops processing when the context is done or when the units_receiver channel is closed.
-//
-// Parameters:
-// - ctx: The context to control the cancellation of the listener.
-// - channels: The channels used for communication between different parts of the system.
-// - config: The configuration for the work listener.
-func listenForWorkResults[T, S any](
-    ctx context.Context,
-    channels *Channels[T, S],
-    config *WorkConfig,
-) {
-    for {
-        select {
-        case workUnit, ok := <-channels.units_receiver:
-            if !ok {
-                return
-            }
-
-            if workUnit.err != nil {
-                handleFailedWorkUnit(&workUnit, channels, config)
-                continue
-            }
-
-            // Send message to user
-            channels.tasks_done <- workUnit.result
-            config.tasks_processed.Done()
-        case <-ctx.Done():
-            return
-        }
-    }
-}
-
-// workUnitDispatcher is responsible for dispatching work units to the units_dispatcher channel.
-// It listens for tasks from the tasks_queue channel and sends them to the units_dispatcher channel.
-// The function stops processing work when the context is done or when the tasks_queue channel is closed.
-//
-// Parameters:
-// - ctx: The context to control the cancellation of the dispatcher.
-// - finish: The cancel function to stop the dispatcher.
-// - channels: The channels used for communication between different parts of the system.
-// - config: The configuration for the work dispatcher.
-func workUnitDispatcher[T, S any](
-    ctx context.Context,
-    finish context.CancelFunc,
-    channels *Channels[T, S],
-    config *WorkConfig,
-) {
-    defer stopProcessingWork(finish, channels, config)
-
-    for {
-        select {
-        case value, ok := <-channels.tasks_queue:
-            if !ok {
-                return
-            }
-
-            workUnit := WorkUnit[T, S]{
-                argument: value,
-                timeout:  0,
-                attempts: 0,
-            }
-            channels.units_dispatcher <- workUnit
-            config.tasks_processed.Add(1)
-
-        case <-ctx.Done():
-            fmt.Println("context done")
-            return
-        }
-    }
-}
-
-// Stops the processing of work by closing all channels and calling the finish function.
-// It waits for all tasks to be processed before closing the channels.
-//
-// Parameters:
-// - finish: The context.CancelFunc to propagate the stop signal.
-// - channels: The channels used for communication between different parts of the system.
-// - config: The configuration for the work processing.
-func stopProcessingWork[T, S any](
-    finish context.CancelFunc,
-    channels *Channels[T, S],
-    config *WorkConfig,
-) {
-    config.tasks_processed.Wait()
-
-    close(channels.units_receiver)
-    close(channels.units_dispatcher)
-    close(channels.tasks_done)
-    close(channels.tasks_failed)
-
-    finish()
-}
-
-// asyncTaskRunner runs tasks asynchronously using a pool of workers.
-// It sets default values for the WorkConfig if not provided, creates channels for communication,
-// and starts the workers and dispatcher.
-//
-// Parameters:
-// - ctx: The context to control the cancellation of the task runner.
-// - inbound: The channel from which tasks are received.
-// - config: The configuration for the task runner.
-// - work: The work function to be executed by the workers.
-//
-// Returns:
-// - A channel that receives the results of the tasks.
-// - A channel that receives errors from the tasks.
-// - A channel that signals when the task runner is done.
-func asyncTaskRunner[T, S any](
-    ctx context.Context,
-    inbound chan T,
-    config *WorkConfig,
-    work Work[T, S],
-) (<-chan S, <-chan error, <-chan struct{}) {
-    // Set default values for WorkConfig if not provided
-    if config.amount_of_workers == 0 {
-        config.amount_of_workers = 5
-    }
-    if config.base_retry_time == 0 {
-        config.base_retry_time = 1 * time.Second
-    }
-    if config.rate_limit == nil {
-        config.rate_limit = NewRateLimiter(10, time.Second)
-    }
-
-    // Ensure a clean wait group is used
-    config.tasks_processed = sync.WaitGroup{}
-
-    channel_size := config.amount_of_workers * 3
-
-    done, finish := context.WithCancel(ctx)
-
-    channels := &Channels[T, S]{
-        tasks_queue:      inbound,
-        tasks_done:       make(chan S),
-        tasks_failed:     make(chan error),
-        units_dispatcher: make(chan WorkUnit[T, S], channel_size),
-        units_receiver:   make(chan WorkUnit[T, S], channel_size),
-    }
-
-    // create pool of workers
-    for i := range config.amount_of_workers {
-        worker := &Worker[T, S]{
-            id:          uint8(i),
-            receptor:    channels.units_dispatcher,
-            transmitter: channels.units_receiver,
-            rate_limit:  config.rate_limit,
-            timeout:     config.timeout,
-            work:        work,
-        }
-
-        go spawn_worker(ctx, worker)
-    }
-
-    go listenForWorkResults(done, channels, config)
-    go workUnitDispatcher(done, finish, channels, config)
-    return channels.tasks_done, channels.tasks_failed, done.Done()
-}
+
+func getPageByOffset(pagination *Pagination) (uint64, error) {
+    if pagination.Pages == 0 {
+        return 0, fmt.Errorf("division by zero")
+    }
+
+    total := pagination.Pages * pagination.Limit
+
+    result := (pagination.Pages * pagination.Offset) / total
+
+    return result, nil
+}
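For orientation, the new helpers are plain integer arithmetic: calculatePages is a ceiling division of the remaining items by the page size, and getPageByOffset maps an offset back to a page index. A minimal standalone sketch of the same arithmetic (hypothetical main package, not part of the commit):

package main

import "fmt"

// pagesFor mirrors calculatePages from this commit:
// ceiling division of the remaining items by the page size.
// limit is assumed non-zero; the commit guards this with an error.
func pagesFor(total, limit, offset uint64) uint64 {
    dividend := total - offset
    pages := dividend / limit
    if dividend%limit != 0 {
        pages++ // a partial last page still counts
    }
    return pages
}

func main() {
    // 45 items at 10 per request: pages at offsets 0, 10, 20, 30, 40.
    fmt.Println(pagesFor(45, 10, 0)) // 5
}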
@@ -14,25 +14,12 @@ type Platform struct {
 	Collections []*Collection // Child nodes
 }

-func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error {
-    fetchWrapper := func(offset int) ([]*Collection, error) {
-        // fmt.Printf("Requesting offset: %v\n", offset)
-
-        if offset == 10 {
-            time.Sleep(time.Second * 5)
-        }
-
-        pagination := start_pagination
-
-        pagination.Offset = offset
-        collections, _, err := fetcher(pagination)
-        if err != nil {
-            return nil, err
-        }
-
-        return collections, nil
-    }
+type FetchCollectionResponse = FetchResponse[[]*Collection]
+
+func (platform *Platform) FetchCollections(
+    fetcher Work[Pagination, FetchCollectionResponse],
+    start_pagination Pagination,
+) error {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
@@ -44,15 +31,62 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
         timeout: time.Second * 2,
     }

-    tasks := make(chan int)
+    tasks := make(chan Pagination)

-    // TODO: get number of page dynamically and change Fetcher signature
-    pages := 4
+    results, errors, done := asyncTaskRunner(
+        ctx,
+        tasks,
+        config,
+        fetcher,
+    )

-    results, errors, done := asyncTaskRunner(ctx, tasks, config, fetchWrapper)
+    var current_page uint64 = 0

-    for i := range pages {
-        tasks <- i * start_pagination.Limit
-    }
+    if start_pagination.Pages == 0 {
+        // do the first fetch
+        tasks <- start_pagination
+
+        select {
+        case response, ok := <-results:
+            if !ok {
+                break
+            }
+            platform.Collections = slices.Concat(platform.Collections, response.Response)
+
+            pages, err := calculatePages(&response.Pagination, start_pagination.Offset)
+            if err != nil {
+                return err
+            }
+            start_pagination.Pages = pages
+            start_pagination.Total = response.Pagination.Total
+            current_page++
+
+        case error, ok := <-errors:
+            if !ok {
+                return fmt.Errorf("Could not do first fetch to calculate pages: %v\n", error)
+            }
+
+        case <-ctx.Done():
+            break
+        case <-done:
+            break
+        }
+    }
+
+    page_offset, err := getPageByOffset(&start_pagination)
+    if err != nil {
+        return err
+    }
+
+    current_page += page_offset
+
+    fmt.Printf("Total pages: %v, Current page: %v\n", start_pagination.Pages, current_page)
+
+    for current_page <= start_pagination.Pages {
+        page := start_pagination
+        page.Offset = current_page * page.Limit
+        tasks <- page
+        current_page++
+    }

     close(tasks)
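The hunk above is the heart of the commit: the fetcher no longer receives a hard-coded page count, it bootstraps pagination from the first response. One request goes out with the caller's start_pagination, the response's Total sizes the remaining work, and the rest of the pages are enqueued up front. A condensed, self-contained sketch of that pattern (hypothetical fakeFetch stub and simplified types, not the commit's exact code):

package main

import "fmt"

type Pagination struct {
    Total, Limit, Offset uint64
}

// fakeFetch stands in for the real fetcher; it only reports the total.
func fakeFetch(p Pagination) Pagination {
    p.Total = 45
    return p
}

func main() {
    start := Pagination{Limit: 10}
    first := fakeFetch(start) // the first fetch discovers Total

    pages := (first.Total + first.Limit - 1) / first.Limit // ceiling division: 5

    // Page 0 is already fetched; enqueue the remaining offsets.
    for page := uint64(1); page < pages; page++ {
        fmt.Println("enqueue offset", page*first.Limit) // 10, 20, 30, 40
    }
}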
@@ -60,11 +94,11 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
 loop:
     for {
         select {
-        case collection, ok := <-results:
+        case response, ok := <-results:
             if !ok {
                 continue
             }
-            platform.Collections = slices.Concat(platform.Collections, collection)
+            platform.Collections = slices.Concat(platform.Collections, response.Response)
         case error, ok := <-errors:
             if !ok {
                 continue
@@ -80,7 +114,7 @@ loop:

     fmt.Printf("Collections: %v\n", len(platform.Collections))

-    err := BulkCreateNode(platform._conn, platform.Collections)
+    err = BulkCreateNode(platform._conn, platform.Collections)
     if err != nil {
         return err
     }
334 pkg/workpool.go Normal file
@@ -0,0 +1,334 @@
+package synchronizator
+
+import (
+    "context"
+    "fmt"
+    "math/rand"
+    "sync"
+    "time"
+)
+
+// NewRateLimiter creates a rate limiter that emits time events at a specified rate.
+// request_per specifies the number of requests allowed per time_scale duration.
+// time_scale specifies the duration over which the requests are allowed.
+func NewRateLimiter(request_per int, time_scale time.Duration) <-chan time.Time {
+    rate_limit := make(chan time.Time, request_per)
+    tickrate := time_scale / time.Duration(request_per)
+
+    for range request_per {
+        rate_limit <- time.Now()
+    }
+
+    go func() {
+        for t := range time.Tick(tickrate) {
+            rate_limit <- t
+        }
+    }()
+
+    return rate_limit
+}
+
+// WorkUnit represents a unit of work to be processed.
+type WorkUnit[T, S any] struct {
+    argument T             // Argument to be processed by a worker.
+    result   S             // Result of the processing.
+    err      error         // Error encountered during processing.
+    timeout  time.Duration // Timeout for the work unit.
+    attempts uint8         // Number of attempts made to process the work unit.
+}
+
+// Work defines a function type that processes a value of type T and returns a result of type S or an error.
+type Work[T, S any] func(ctx context.Context, value T) (S, error)
+
+// Worker represents a worker that processes tasks of type T and returns results of type S.
+type Worker[T, S any] struct {
+    id          uint8                 // Unique identifier of the worker.
+    receptor    <-chan WorkUnit[T, S] // Channel from which the worker receives tasks.
+    transmitter chan<- WorkUnit[T, S] // Channel to which the worker sends results.
+    wg          *sync.WaitGroup       // Wait group to synchronize the completion of tasks. Should be decremented each time a task has been processed.
+    work        Work[T, S]            // Function that processes tasks.
+    rate_limit  <-chan time.Time      // Ticker to limit the number of requests. It is recommended to pass the result of calling NewRateLimiter().
+    timeout     time.Duration         // Maximum execution time allowed for a task before being canceled.
+}
+
+// WorkConfig represents the configuration for the work processing.
+// All fields are optional and have sensible defaults:
+//
+// - tasks_processed: always a new WaitGroup
+// - amount_of_workers: default is 5
+// - max_retries: if 0, retries are disabled
+// - base_retry_time: default is 1 second
+// - rate_limit: default is 10 requests per second
+// - timeout: if 0, timeout is disabled
+type WorkConfig struct {
+    tasks_processed   sync.WaitGroup   // Wait group to synchronize task completion.
+    amount_of_workers uint8            // Number of workers to spawn.
+    max_retries       uint8            // Maximum number of retries for a task before being cancelled.
+    base_retry_time   time.Duration    // Base factor to wait for before retrying a task.
+    rate_limit        <-chan time.Time // Ticker to limit the number of requests. It is recommended to pass the result of calling NewRateLimiter().
+    timeout           time.Duration    // Maximum execution time allowed for a task before being canceled.
+}
+
+// Group the channels used for task processing for easy access between functions.
+type Channels[T, S any] struct {
+    tasks_queue      chan T              // Channel for incoming tasks.
+    tasks_done       chan S              // Channel for completed tasks.
+    tasks_failed     chan error          // Channel for failed tasks.
+    units_dispatcher chan WorkUnit[T, S] // Channel for dispatching work units.
+    units_receiver   chan WorkUnit[T, S] // Channel for receiving processed work units.
+}
+
+// Starts a worker that processes work units received from the worker's receptor channel.
+// It waits for rate limits, processes the work unit, and sends the result to the worker's transmitter channel.
+// It also applies the rate limit if it is enabled.
+//
+// The function stops processing when the receptor channel is closed.
+//
+// Parameters:
+// - ctx: The context to control the cancellation of the worker.
+// - worker: The worker that processes the work units.
+func spawn_worker[T, S any](ctx context.Context, worker *Worker[T, S]) {
+    for workUnit := range worker.receptor {
+        // Wait for rate-limit
+        <-worker.rate_limit
+
+        var workCtx context.Context
+        var cancel context.CancelFunc
+
+        if worker.timeout != 0 {
+            workCtx, cancel = context.WithTimeout(ctx, worker.timeout)
+        } else {
+            workCtx, cancel = context.WithCancel(ctx)
+        }
+
+        done := make(chan struct{})
+
+        var value S
+        var err error
+
+        go func() {
+            value, err = worker.work(ctx, workUnit.argument)
+            close(done)
+        }()
+
+        select {
+        case <-done:
+            workUnit.result = value
+            workUnit.err = err
+        case <-workCtx.Done():
+            workUnit.err = workCtx.Err()
+        }
+
+        cancel()
+
+        worker.transmitter <- workUnit
+    }
+}
+
+// handleFailedWorkUnit handles a failed work unit by retrying it or marking it as failed.
+// If the maximum number of retries is reached, the work unit is marked as failed and no further retries are attempted.
+// Otherwise, the work unit is retried with an exponential backoff and jitter.
+//
+// Parameters:
+// - workUnit: The work unit that failed.
+// - channels: The channels used for communication between different parts of the system.
+// - config: The configuration for the work unit.
+//
+// Returns:
+// - A boolean indicating whether the work unit will be retried (true) or not (false).
+func handleFailedWorkUnit[T, S any](
+    workUnit *WorkUnit[T, S],
+    channels *Channels[T, S],
+    config *WorkConfig,
+) bool {
+    // If retries == 0, retries are disabled, return immediately
+    if config.max_retries == 0 || config.max_retries <= workUnit.attempts {
+        channels.tasks_failed <- workUnit.err
+        config.tasks_processed.Done()
+        return false
+    }
+
+    workUnit.attempts++
+    workUnit.err = nil
+
+    if workUnit.timeout == 0 {
+        workUnit.timeout = config.base_retry_time
+    } else {
+        workUnit.timeout *= 2
+    }
+
+    go func() {
+        jitter := time.Duration(rand.Int63n(int64(workUnit.timeout)))
+        timeout := workUnit.timeout + jitter
+        fmt.Printf(
+            "Unit with value %v failed for %v time, retrying in: %v\n",
+            workUnit.argument,
+            workUnit.attempts,
+            timeout,
+        )
+        time.Sleep(timeout)
+        channels.units_dispatcher <- *workUnit
+    }()
+
+    return true
+}
+
+// Listens for work results from the units_receiver channel and processes them.
+// It handles failed work units and sends successful results to the tasks_done channel.
+// The function stops processing when the context is done or when the units_receiver channel is closed.
+//
+// Parameters:
+// - ctx: The context to control the cancellation of the listener.
+// - channels: The channels used for communication between different parts of the system.
+// - config: The configuration for the work listener.
+func listenForWorkResults[T, S any](
+    ctx context.Context,
+    channels *Channels[T, S],
+    config *WorkConfig,
+) {
+    for {
+        select {
+        case workUnit, ok := <-channels.units_receiver:
+            if !ok {
+                return
+            }
+
+            if workUnit.err != nil {
+                handleFailedWorkUnit(&workUnit, channels, config)
+                continue
+            }
+
+            // Send message to user
+            channels.tasks_done <- workUnit.result
+            config.tasks_processed.Done()
+        case <-ctx.Done():
+            return
+        }
+    }
+}
+
+// workUnitDispatcher is responsible for dispatching work units to the units_dispatcher channel.
+// It listens for tasks from the tasks_queue channel and sends them to the units_dispatcher channel.
+// The function stops processing work when the context is done or when the tasks_queue channel is closed.
+//
+// Parameters:
+// - ctx: The context to control the cancellation of the dispatcher.
+// - finish: The cancel function to stop the dispatcher.
+// - channels: The channels used for communication between different parts of the system.
+// - config: The configuration for the work dispatcher.
+func workUnitDispatcher[T, S any](
+    ctx context.Context,
+    finish context.CancelFunc,
+    channels *Channels[T, S],
+    config *WorkConfig,
+) {
+    defer stopProcessingWork(finish, channels, config)
+
+    for {
+        select {
+        case value, ok := <-channels.tasks_queue:
+            if !ok {
+                return
+            }
+
+            workUnit := WorkUnit[T, S]{
+                argument: value,
+                timeout:  0,
+                attempts: 0,
+            }
+            channels.units_dispatcher <- workUnit
+            config.tasks_processed.Add(1)
+
+        case <-ctx.Done():
+            fmt.Println("context done")
+            return
+        }
+    }
+}
+
+// Stops the processing of work by closing all channels and calling the finish function.
+// It waits for all tasks to be processed before closing the channels.
+//
+// Parameters:
+// - finish: The context.CancelFunc to propagate the stop signal.
+// - channels: The channels used for communication between different parts of the system.
+// - config: The configuration for the work processing.
+func stopProcessingWork[T, S any](
+    finish context.CancelFunc,
+    channels *Channels[T, S],
+    config *WorkConfig,
+) {
+    config.tasks_processed.Wait()
+
+    close(channels.units_receiver)
+    close(channels.units_dispatcher)
+    close(channels.tasks_done)
+    close(channels.tasks_failed)
+
+    finish()
+}
+
+// asyncTaskRunner runs tasks asynchronously using a pool of workers.
+// It sets default values for the WorkConfig if not provided, creates channels for communication,
+// and starts the workers and dispatcher.
+//
+// Parameters:
+// - ctx: The context to control the cancellation of the task runner.
+// - inbound: The channel from which tasks are received.
+// - config: The configuration for the task runner.
+// - work: The work function to be executed by the workers.
+//
+// Returns:
+// - A channel that receives the results of the tasks.
+// - A channel that receives errors from the tasks.
+// - A channel that signals when the task runner is done.
+func asyncTaskRunner[T, S any](
+    ctx context.Context,
+    inbound chan T,
+    config *WorkConfig,
+    work Work[T, S],
+) (<-chan S, <-chan error, <-chan struct{}) {
+    // Set default values for WorkConfig if not provided
+    if config.amount_of_workers == 0 {
+        config.amount_of_workers = 5
+    }
+    if config.base_retry_time == 0 {
+        config.base_retry_time = 1 * time.Second
+    }
+    if config.rate_limit == nil {
+        config.rate_limit = NewRateLimiter(10, time.Second)
+    }
+
+    // Ensure a clean wait group is used
+    config.tasks_processed = sync.WaitGroup{}
+
+    channel_size := config.amount_of_workers * 3
+
+    done, finish := context.WithCancel(ctx)
+
+    channels := &Channels[T, S]{
+        tasks_queue:      inbound,
+        tasks_done:       make(chan S),
+        tasks_failed:     make(chan error),
+        units_dispatcher: make(chan WorkUnit[T, S], channel_size),
+        units_receiver:   make(chan WorkUnit[T, S], channel_size),
+    }
+
+    // create pool of workers
+    for i := range config.amount_of_workers {
+        worker := &Worker[T, S]{
+            id:          uint8(i),
+            receptor:    channels.units_dispatcher,
+            transmitter: channels.units_receiver,
+            rate_limit:  config.rate_limit,
+            timeout:     config.timeout,
+            work:        work,
+        }
+
+        go spawn_worker(ctx, worker)
+    }
+
+    go listenForWorkResults(done, channels, config)
+    go workUnitDispatcher(done, finish, channels, config)
+    return channels.tasks_done, channels.tasks_failed, done.Done()
+}
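For reference, driving the refactored pool from inside the package looks roughly like the sketch below (exampleUsage is hypothetical and not part of the commit; field values are illustrative, and the package's existing context/fmt/time imports are assumed). Note the work function now receives the context, the signature change introduced by this commit:

func exampleUsage() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    config := &WorkConfig{
        amount_of_workers: 3,
        max_retries:       2,
        rate_limit:        NewRateLimiter(5, time.Second),
        timeout:           2 * time.Second,
    }

    tasks := make(chan int)

    // T and S are inferred as int from the tasks channel and the work function.
    results, errors, done := asyncTaskRunner(ctx, tasks, config, func(ctx context.Context, v int) (int, error) {
        return v * v, nil
    })

    go func() {
        for i := range 5 {
            tasks <- i
        }
        close(tasks) // closing the queue lets the dispatcher wind the pool down
    }()

    for {
        select {
        case r, ok := <-results:
            if ok {
                fmt.Println("result:", r)
            }
        case err, ok := <-errors:
            if ok {
                fmt.Println("failed:", err)
            }
        case <-done:
            return
        }
    }
}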