diff --git a/pkg/fetcher.go b/pkg/fetcher.go index 6473a7c..5b046ec 100644 --- a/pkg/fetcher.go +++ b/pkg/fetcher.go @@ -4,6 +4,7 @@ import ( "fmt" "iter" "sync" + "time" ) // Fetcher is the concurrent manager @@ -35,6 +36,23 @@ var StartPagination = Pagination{ Offset: 0, } +func NewRateLimit(request_per int, time_scale time.Duration) <-chan time.Time { + rate_limit := make(chan time.Time, request_per) + tickrate := time_scale / time.Duration(request_per) + + for range request_per { + rate_limit <- time.Now() + } + + go func() { + for t := range time.Tick(tickrate) { + rate_limit <- t + } + }() + + return rate_limit +} + // Work represents a function that processes a value of type S and returns a // result of type T or an error. type Work[T, S any] func(value T) (S, error) @@ -47,6 +65,7 @@ type Worker[T, S any] struct { transmiter chan<- S // transmiter is the channel to which the worker sends results. wg *sync.WaitGroup // wg is the wait group to synchronize the completion of tasks. work Work[T, S] // work is the function that processes tasks. + rate_limit <-chan time.Time } type WorkerManager[T, S any] struct { @@ -87,7 +106,7 @@ func (manager *WorkerManager[T, S]) GetWorkUnit() iter.Seq[S] { return func(yield func(S) bool) { for { - // TODO: handle tiemouts and rate-limits + // TODO: handle tiemouts select { case value := <-manager.workers_transmiter: if !yield(value) { @@ -106,12 +125,19 @@ func spawn_worker[T, S any](worker *Worker[T, S]) { // TODO: handle errors for unit := range worker.receptor { + // Wait for rate-limit + <-worker.rate_limit + value, _ := worker.work(unit) worker.transmiter <- value } } -func createWorkerPool[T, S any](max_workers uint8, work Work[T, S]) *WorkerManager[T, S] { +func createWorkerPool[T, S any]( + max_workers uint8, + rate_limit <-chan time.Time, + work Work[T, S], +) *WorkerManager[T, S] { channel_size := max_workers * 3 manager := &WorkerManager[T, S]{ @@ -125,6 +151,7 @@ func createWorkerPool[T, S any](max_workers uint8, work Work[T, S]) *WorkerManag id: uint8(i), receptor: manager.workers_receptor, transmiter: manager.workers_transmiter, + rate_limit: rate_limit, wg: &manager.active_workers, work: work, } diff --git a/pkg/platform.go b/pkg/platform.go index 2931ae2..12aec74 100644 --- a/pkg/platform.go +++ b/pkg/platform.go @@ -3,6 +3,7 @@ package synchronizator import ( "fmt" "slices" + "time" ) // Utility struct to represent a collection of nodes, it's a [Node] itself so all @@ -27,7 +28,10 @@ func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pag return collections, nil } - manager := createWorkerPool[int, []*Collection](5, fetchWrapper) + // 5 request per minute + rate_limit := NewRateLimit(5, time.Minute) + + manager := createWorkerPool[int, []*Collection](5, rate_limit, fetchWrapper) // TODO: get number of page dynamically and change Fetcher signature pages := 4