feat: add rate limit to worker pool

Alexander Navarro 2024-11-27 13:46:42 -03:00
parent 30cf6dfff2
commit 2d1041f167
2 changed files with 34 additions and 3 deletions

@@ -4,6 +4,7 @@ import (
     "fmt"
     "iter"
     "sync"
+    "time"
 )

 // Fetcher is the concurrent manager
@@ -35,6 +36,23 @@ var StartPagination = Pagination{
     Offset: 0,
 }

+func NewRateLimit(request_per int, time_scale time.Duration) <-chan time.Time {
+    rate_limit := make(chan time.Time, request_per)
+    tickrate := time_scale / time.Duration(request_per)
+
+    for range request_per {
+        rate_limit <- time.Now()
+    }
+
+    go func() {
+        for t := range time.Tick(tickrate) {
+            rate_limit <- t
+        }
+    }()
+
+    return rate_limit
+}
+
 // Work represents a function that processes a value of type S and returns a
 // result of type T or an error.
 type Work[T, S any] func(value T) (S, error)
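
NewRateLimit is a channel-based token bucket: the buffered channel is pre-filled with request_per tokens, so the first request_per receives return immediately (an initial burst), and a background goroutine then adds one token every time_scale/request_per via time.Tick, capping the steady-state rate. Because time.Tick never stops its ticker, that goroutine lives for the lifetime of the program. A minimal, self-contained sketch of the same pattern (the names below are illustrative, not part of this commit):

package main

import (
    "fmt"
    "time"
)

// newRateLimit mirrors the commit's pattern: a buffered channel acts as a
// token bucket allowing an initial burst of n calls, then one call per window/n.
func newRateLimit(n int, window time.Duration) <-chan time.Time {
    tokens := make(chan time.Time, n)
    for range n { // pre-fill: the first n receives do not block
        tokens <- time.Now()
    }
    go func() {
        for t := range time.Tick(window / time.Duration(n)) {
            tokens <- t // one token per tick; blocks while the bucket is full
        }
    }()
    return tokens
}

func main() {
    limit := newRateLimit(3, 3*time.Second) // burst of 3, then about one per second
    start := time.Now()
    for i := 0; i < 5; i++ {
        <-limit // block until a token is available
        fmt.Printf("call %d at %v\n", i, time.Since(start).Round(100*time.Millisecond))
    }
}
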
@@ -47,6 +65,7 @@ type Worker[T, S any] struct {
     transmiter chan<- S         // transmiter is the channel to which the worker sends results.
     wg         *sync.WaitGroup  // wg is the wait group to synchronize the completion of tasks.
     work       Work[T, S]       // work is the function that processes tasks.
+    rate_limit <-chan time.Time
 }

 type WorkerManager[T, S any] struct {
@@ -87,7 +106,7 @@ func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] {
     return func(yield func(S) bool) {
         for {
-            // TODO: handle timeouts and rate-limits
+            // TODO: handle timeouts
             select {
             case value := <-manager.workers_transmiter:
                 if !yield(value) {
@@ -106,12 +125,19 @@ func spawn_worker[T, S any](worker *Worker[T, S]) {
     // TODO: handle errors
     for unit := range worker.receptor {
+        // Wait for rate-limit
+        <-worker.rate_limit
+
         value, _ := worker.work(unit)
         worker.transmiter <- value
     }
 }

-func createWorkerPool[T, S any](max_workers uint8, work Work[T, S]) *WorkerManager[T, S] {
+func createWorkerPool[T, S any](
+    max_workers uint8,
+    rate_limit <-chan time.Time,
+    work Work[T, S],
+) *WorkerManager[T, S] {
     channel_size := max_workers * 3

     manager := &WorkerManager[T, S]{
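
Because spawn_worker receives from worker.rate_limit before every call to work, and createWorkerPool hands the same channel to every worker, the limit is global across the pool: adding workers raises concurrency but not the request rate. A small sketch of that sharing, using hypothetical names rather than the commit's types:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Three workers share one token channel, so their combined throughput is
// capped by the refill rate regardless of how many goroutines are running.
func main() {
    tokens := make(chan time.Time, 1)
    tokens <- time.Now() // allow one immediate call
    go func() {
        for t := range time.Tick(500 * time.Millisecond) {
            tokens <- t
        }
    }()

    jobs := make(chan int)
    var wg sync.WaitGroup
    start := time.Now()

    for w := 0; w < 3; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                <-tokens // wait on the shared limiter, as spawn_worker does
                fmt.Printf("worker %d ran job %d at %v\n",
                    id, job, time.Since(start).Round(100*time.Millisecond))
            }
        }(w)
    }

    for i := 0; i < 4; i++ {
        jobs <- i
    }
    close(jobs)
    wg.Wait()
}
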
@@ -125,6 +151,7 @@ func createWorkerPool[T, S any](max_workers uint8, work Work[T, S]) *WorkerManag
             id:         uint8(i),
             receptor:   manager.workers_receptor,
             transmiter: manager.workers_transmiter,
+            rate_limit: rate_limit,
             wg:         &manager.active_workers,
             work:       work,
         }

@@ -3,6 +3,7 @@ package synchronizator
 import (
     "fmt"
     "slices"
+    "time"
 )

 // Utility struct to represent a collection of nodes, it's a [Node] itself so all
@@ -27,7 +28,10 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag
         return collections, nil
     }

-    manager := createWorkerPool[int, []*Collection](5, fetchWrapper)
+    // 5 requests per minute
+    rate_limit := NewRateLimit(5, time.Minute)
+    manager := createWorkerPool[int, []*Collection](5, rate_limit, fetchWrapper)

     // TODO: get number of page dynamically and change Fetcher signature
     pages := 4
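
With NewRateLimit(5, time.Minute), the pool can issue five fetches immediately and then one every 12 seconds (time.Minute / 5). Assuming one fetch per page, the four hard-coded pages here all fall within the initial burst, so the limiter only starts delaying work once the TODO above makes the page count dynamic and it exceeds five.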