This repository has been archived on 2025-05-15. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
synchronizator-go/pkg/fetcher.go

202 lines
4.9 KiB
Go

package synchronizator
import (
"context"
"fmt"
"slices"
)
// StartPagination is the initial pagination configuration: 10 items per
// request, starting at the first item. Total, Pages and Offset are left at
// their zero value; a Pages value of 0 makes fetchWithPagination issue a
// first request to discover the total.
var StartPagination = Pagination{Limit: 10}
// FetchResponse represents the response from a fetch operation.
// It embeds the pagination information returned with the response and
// carries the actual response data.
type FetchResponse[T any] struct {
Pagination // Pagination information returned with the response.
Response T // The actual response data.
}
// Pagination represents the pagination information for fetching data.
type Pagination struct {
Total uint64 // Total number of items.
Pages uint64 // Total number of pages; fetchWithPagination treats 0 as "not yet known".
Limit uint64 // Number of items to fetch per request.
Offset uint64 // Offset for the next set of items to fetch.
}
// calculatePages reports how many full pages of pagination.Limit items lie
// between offset and pagination.Total.
//
// The result is the integer quotient (Total - offset) / Limit, so a trailing
// partial page is not counted. When offset is at or beyond Total there is
// nothing left to page through and 0 is returned; without that guard the
// unsigned subtraction would wrap around and yield an enormous page count.
//
// Parameters:
//   - pagination: The pagination settings; only Total and Limit are read.
//   - offset: The offset value.
//
// Returns:
//   - uint64: The calculated number of pages.
//   - error: Non-nil when pagination.Limit is 0.
func calculatePages(pagination *Pagination, offset uint64) (uint64, error) {
	if pagination.Limit == 0 {
		return 0, fmt.Errorf("The limit cannot be 0 to calculate the pages")
	}

	// uint64 arithmetic wraps silently, so never subtract a larger offset.
	if offset >= pagination.Total {
		return 0, nil
	}

	return (pagination.Total - offset) / pagination.Limit, nil
}
// getPageByOffset returns the zero-based page index that pagination.Offset
// falls into, i.e. Offset / Limit rounded down.
//
// The quotient is computed directly from Offset and Limit. This equals
// (Pages * Offset) / (Pages * Limit) for any non-zero Pages, but cannot
// overflow uint64 and cannot divide by a zero product.
//
// Parameters:
//   - pagination: The pagination settings; Pages, Limit and Offset are read.
//
// Returns:
//   - uint64: The calculated page number.
//   - error: Non-nil when pagination.Pages or pagination.Limit is 0.
func getPageByOffset(pagination *Pagination) (uint64, error) {
	// A zero Limit would otherwise trigger a runtime divide-by-zero panic.
	if pagination.Pages == 0 || pagination.Limit == 0 {
		return 0, fmt.Errorf("division by zero")
	}

	return pagination.Offset / pagination.Limit, nil
}
// fetchWithPagination fetches data with pagination support through the worker
// pool and returns all fetched items in a single slice.
//
// When start_pagination.Pages is 0 the total is unknown, so a first request
// is issued with start_pagination and the page count is derived from the
// Pagination returned by the fetcher. If that first response already covers
// everything (the derived page count is 0), its items are returned directly.
// Otherwise one task per remaining page is queued and the responses are
// concatenated in the order they are received from the results channel.
//
// Parameters:
//   - ctx: The context to control cancellation.
//   - pool_config: The configuration for the worker pool.
//   - fetcher: The fetcher function to execute the work.
//   - start_pagination: The initial pagination settings.
//
// Returns:
//   - []T: A slice of fetched data.
//   - error: An error reported on the errors channel, a page-calculation
//     error, or ctx.Err() when the context is cancelled before a task could
//     be queued or before the first response arrived. Cancellation while the
//     remaining responses are being collected ends the collection and returns
//     what was gathered so far with a nil error.
func fetchWithPagination[T any](
	ctx context.Context,
	pool_config *WorkConfig,
	fetcher Work[Pagination, FetchResponse[[]T]],
	start_pagination Pagination,
) ([]T, error) {
	// The derived context is cancelled on every return path; presumably this
	// is what lets asyncTaskRunner release its workers — verify against it.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	tasks := make(chan Pagination)
	values := make([]T, 0)

	results, errs, doneChannel := asyncTaskRunner(
		ctx,
		tasks,
		pool_config,
		fetcher,
	)

	// enqueue hands one task to the pool but gives up when the context is
	// cancelled, so a send on the unbuffered channel can never block forever.
	enqueue := func(page Pagination) error {
		select {
		case tasks <- page:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	var currentPage uint64
	if start_pagination.Pages == 0 {
		if err := enqueue(start_pagination); err != nil {
			return nil, err
		}

		response, _, err := waitForFetchResponse(ctx, results, errs, nil)
		if err != nil {
			return nil, err
		}
		// waitForFetchResponse returns a nil response without an error when
		// the context is cancelled or a channel is closed; dereferencing it
		// would panic.
		if response == nil {
			if ctxErr := ctx.Err(); ctxErr != nil {
				return nil, ctxErr
			}
			return nil, fmt.Errorf("no response received for the first page")
		}

		values = slices.Concat(values, response.Response)
		start_pagination.Total = response.Pagination.Total

		pages, err := calculatePages(&response.Pagination, start_pagination.Offset)
		if err != nil {
			return nil, err
		}
		// Zero pages means fewer than Limit items remain past the offset, so
		// the first response is everything; getPageByOffset would reject it.
		if pages == 0 {
			close(tasks)
			return values, nil
		}

		start_pagination.Pages = pages
		currentPage++
	}

	// NOTE(review): calculatePages counts pages relative to the start offset
	// while getPageByOffset yields an absolute page index; with a non-zero
	// start offset the loop bound below may stop early — confirm the intent.
	pageOffset, err := getPageByOffset(&start_pagination)
	if err != nil {
		return nil, err
	}
	currentPage += pageOffset

	// NOTE(review): every task is queued before any further result is read;
	// this assumes asyncTaskRunner does not block on undelivered results.
	for currentPage <= start_pagination.Pages {
		fmt.Printf("Total pages: %v, Current page: %v\n", start_pagination.Pages, currentPage)

		page := start_pagination
		page.Offset = currentPage * page.Limit
		if err := enqueue(page); err != nil {
			return nil, err
		}
		currentPage++
	}

	// No more tasks will be sent.
	close(tasks)

	for {
		response, isWorkDone, err := waitForFetchResponse(ctx, results, errs, doneChannel)
		if err != nil {
			return nil, err
		}
		if isWorkDone {
			break
		}
		// A nil response without completion means a channel was closed;
		// keep waiting for the done signal.
		if response == nil {
			continue
		}
		values = slices.Concat(values, response.Response)
	}

	return values, nil
}
// waitForFetchResponse blocks until one of the given channels is ready and
// reports what happened as a (response, finished, error) triple.
//
// Outcomes:
//   - a value arrives on results: (&value, false, nil)
//   - results or errors is closed: (nil, false, nil)
//   - a value arrives on errors: (nil, true, that value)
//   - ctx is cancelled or done fires: (nil, true, nil)
//
// A nil channel is never ready, so callers may pass nil for a channel they
// do not want to wait on.
//
// Parameters:
//   - ctx: The context to control cancellation.
//   - results: A channel to receive fetch responses.
//   - errors: A channel to receive errors.
//   - done: A channel to signal completion.
//
// Returns:
//   - *FetchResponse[[]T]: The fetch response if available.
//   - bool: True if the work is done, failed, or the operation was cancelled.
//   - error: The error received from the errors channel, if any.
func waitForFetchResponse[T any](
	ctx context.Context,
	results <-chan FetchResponse[[]T],
	errors <-chan error,
	done <-chan struct{},
) (*FetchResponse[[]T], bool, error) {
	select {
	case <-done:
		return nil, true, nil
	case <-ctx.Done():
		return nil, true, nil
	case failure, open := <-errors:
		if open {
			return nil, true, failure
		}
		return nil, false, nil
	case item, open := <-results:
		if open {
			return &item, false, nil
		}
		return nil, false, nil
	}
}