refactor: add separation of concerns in fetcher and platform

Alexander Navarro 2024-11-30 18:14:04 -03:00
parent 96af51ee68
commit 4094f71a7d
5 changed files with 227 additions and 145 deletions

View file

@@ -85,7 +85,6 @@ func getPokedexs(
 	// fmt.Println(data)
 	payload.Offset += pagination.Limit
-	payload.HasMore = data.Next != ""
 	payload.Total = data.Count
 	payload.Response = collections
@@ -166,9 +165,19 @@ func main() {
 		return
 	}
 
-	pagination := synchronizator.StartPagination
-
-	err = pokeApi.FetchCollections(getPokedexs, pagination)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	pagination := synchronizator.StartPagination
+	pool_config := &synchronizator.WorkConfig{
+		AmountOfWorkers: 5,
+		MaxRetries:      3,
+		BaseRetryTime:   time.Second * 2,
+		RateLimit:       synchronizator.NewRateLimiter(5, time.Minute),
+		Timeout:         time.Second * 2,
+	}
+
+	err = pokeApi.FetchCollections(ctx, getPokedexs, pagination, pool_config)
 	if err != nil {
 		fmt.Println(err)
 		return
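The caller now owns the worker-pool configuration. WorkConfig.RateLimit is declared as <-chan time.Time in the worker changes below, so NewRateLimiter(5, time.Minute) presumably hands back a ticker-style channel. A minimal sketch of a compatible limiter, assuming that shape (signature guessed from the call sites; the project's actual implementation may differ):

package synchronizator

import "time"

// Sketch only: a limiter compatible with WorkConfig.RateLimit, which workers
// receive from before starting a task. NewRateLimiter(5, time.Minute) would
// then allow roughly five task starts per minute.
func NewRateLimiter(requests int64, per time.Duration) <-chan time.Time {
	return time.Tick(per / time.Duration(requests))
}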

View file

@@ -89,16 +89,12 @@ func (collection *Collection) IsDefault() bool {
 type NodeFetcher = func(metadata []byte, pagination Pagination) ([]*Node, Pagination, error)
 
 func (collection *Collection) FetchNodes(fetcher NodeFetcher, start_pagination Pagination) error {
-	nodes, pagination, err := fetcher(collection.GetMetadata(), start_pagination)
+	nodes, _, err := fetcher(collection.GetMetadata(), start_pagination)
 	if err != nil {
 		return err
 	}
 	collection.childs = slices.Concat(collection.childs, nodes)
 
-	if pagination.HasMore {
-		return collection.FetchNodes(fetcher, pagination)
-	}
-
 	err = BulkCreateNode(collection._conn, collection.childs)
 	if err != nil {
 		return err
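With the HasMore recursion gone, FetchNodes calls the fetcher exactly once for the starting pagination and discards the returned pagination; a NodeFetcher is now only responsible for a single page. A hypothetical fetcher, for illustration only (the placeholder nodes stand in for a real API call):

// Hypothetical NodeFetcher: produces Limit placeholder nodes for the requested
// page instead of hitting a real API. Not part of the codebase.
func stubNodeFetcher(metadata []byte, pagination Pagination) ([]*Node, Pagination, error) {
	nodes := make([]*Node, 0, pagination.Limit)
	for i := uint64(0); i < pagination.Limit; i++ {
		nodes = append(nodes, &Node{})
	}
	return nodes, pagination, nil
}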

View file

@@ -1,45 +1,62 @@
 package synchronizator
 
-import "fmt"
+import (
+	"context"
+	"fmt"
+	"slices"
+)
 
-// StartPagination is the initial pagination configuration.
+// The initial pagination configuration.
 var StartPagination = Pagination{
 	Total:  0,
 	Pages:  0,
-	HasMore: false,
 	Limit:  10,
 	Offset: 0,
 }
 
+// Represents the response from a fetch operation.
+// It includes pagination information and the actual response data.
 type FetchResponse[T any] struct {
 	Pagination
-	Response T
+	Response T // The actual response data.
 }
 
-// Pagination represents the pagination information for fetching data.
+// Represents the pagination information for fetching data.
 type Pagination struct {
 	Total   uint64 // Total number of items.
-	Pages   uint64
-	HasMore bool   // Indicates if there are more items to fetch.
+	Pages   uint64 // Total number of pages.
 	Limit   uint64 // Number of items to fetch per request.
 	Offset  uint64 // Offset for the next set of items to fetch.
 }
 
+// Calculates the number of pages based on the total items, offset, and limit.
+// It returns the calculated number of pages and an error if any.
+//
+// Parameters:
+// - pagination: The pagination settings.
+// - offset: The offset value.
+//
+// Returns:
+// - uint64: The calculated number of pages.
+// - error: The error if any occurred.
 func calculatePages(pagination *Pagination, offset uint64) (uint64, error) {
 	if pagination.Limit == 0 {
-		return 0, fmt.Errorf("division by zero")
+		return 0, fmt.Errorf("The limit cannot be 0 to calculate the pages")
 	}
-	dividend := pagination.Total - offset
-	divisor := pagination.Limit
-	result := dividend / divisor
-	if dividend%divisor != 0 {
-		result++
-	}
+	result := (pagination.Total - offset) / pagination.Limit
 	return result, nil
 }
 
+// Calculates the page number based on the offset and pagination settings.
+// It returns the calculated page number and an error if any.
+//
+// Parameters:
+// - pagination: The pagination settings.
+//
+// Returns:
+// - uint64: The calculated page number.
+// - error: The error if any occurred.
 func getPageByOffset(pagination *Pagination) (uint64, error) {
 	if pagination.Pages == 0 {
 		return 0, fmt.Errorf("division by zero")
@@ -51,3 +68,135 @@ func getPageByOffset(pagination *Pagination) (uint64, error) {
 	return result, nil
 }
+
+// Fetches data with pagination support.
+// It returns a slice of fetched data and an error if any.
+//
+// The pagination is going to try to guess the total number of pages if not
+// provided, by executing a first fetch and then calculating with the
+// pagination.Total value returned by the fetcher function.
+//
+// Parameters:
+// - ctx: The context to control cancellation.
+// - pool_config: The configuration for the worker pool.
+// - fetcher: The fetcher function to execute the work.
+// - start_pagination: The initial pagination settings.
+//
+// Returns:
+// - []T: A slice of fetched data.
+// - error: The error if any occurred.
+func fetchWithPagination[T any](
+	ctx context.Context,
+	pool_config *WorkConfig,
+	fetcher Work[Pagination, FetchResponse[[]T]],
+	start_pagination Pagination,
+) ([]T, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	tasks := make(chan Pagination)
+	values := make([]T, 0)
+
+	results, errors, doneChannel := asyncTaskRunner(
+		ctx,
+		tasks,
+		pool_config,
+		fetcher,
+	)
+
+	var current_page uint64 = 0
+	if start_pagination.Pages == 0 {
+		tasks <- start_pagination
+		response, _, err := waitForFetchResponse(ctx, results, errors, nil)
+		if err != nil {
+			return nil, err
+		}
+		values = slices.Concat(values, response.Response)
+		start_pagination.Total = response.Pagination.Total
+
+		pages, err := calculatePages(&response.Pagination, start_pagination.Offset)
+		if err != nil {
+			return nil, err
+		}
+		start_pagination.Pages = pages
+		current_page++
+	}
+
+	page_offset, err := getPageByOffset(&start_pagination)
+	if err != nil {
+		return nil, err
+	}
+	current_page += page_offset
+
+	for current_page <= start_pagination.Pages {
+		fmt.Printf("Total pages: %v, Current page: %v\n", start_pagination.Pages, current_page)
+		page := start_pagination
+		page.Offset = current_page * page.Limit
+		tasks <- page
+		current_page++
+	}
+	close(tasks)
+
+	for {
+		response, isWorkDone, err := waitForFetchResponse(ctx, results, errors, doneChannel)
+		if err != nil {
+			return nil, err
+		}
+		if isWorkDone {
+			break
+		}
+		if response == nil {
+			continue
+		}
+		values = slices.Concat(values, response.Response)
+	}
+
+	return values, nil
+}
+
+// Waits for a fetch response from the provided channels.
+// It returns the fetch response, a boolean indicating if the work is done, and an error if any.
+//
+// Parameters:
+// - ctx: The context to control cancellation.
+// - results: A channel to receive fetch responses.
+// - errors: A channel to receive errors.
+// - done: A channel to signal completion.
+//
+// Returns:
+// - *FetchResponse[[]T]: The fetch response if available.
+// - bool: True if the work is done or the operation was cancelled.
+// - error: The error if any occurred.
+func waitForFetchResponse[T any](
+	ctx context.Context,
+	results <-chan FetchResponse[[]T],
+	errors <-chan error,
+	done <-chan struct{},
+) (*FetchResponse[[]T], bool, error) {
+	var values *FetchResponse[[]T]
+	select {
+	case response, ok := <-results:
+		if !ok {
+			return nil, false, nil
+		}
+		values = &response
+	case err, ok := <-errors:
+		if !ok {
+			return nil, false, nil
+		}
+		return nil, true, err
+	case <-ctx.Done():
+		return nil, true, nil
+	case <-done:
+		return nil, true, nil
+	}
+	return values, false, nil
+}
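A worked example of the new page math: with Limit 10 and a first fetch reporting Total 151, calculatePages returns (151 - 0) / 10 = 15 (integer division), and the dispatch loop then runs current_page from 1 through 15, queueing offsets 10, 20, ..., 150 after the initial offset-0 fetch (assuming getPageByOffset yields 0 for a zero offset). A hypothetical in-package test pinning that arithmetic down:

package synchronizator

import "testing"

// Hypothetical test illustrating the truncating page calculation.
func TestCalculatePages(t *testing.T) {
	p := Pagination{Total: 151, Limit: 10, Offset: 0}
	pages, err := calculatePages(&p, 0)
	if err != nil {
		t.Fatal(err)
	}
	if pages != 15 { // (151 - 0) / 10, truncated
		t.Fatalf("expected 15 pages, got %d", pages)
	}
}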

View file

@@ -4,113 +4,41 @@ import (
 	"context"
 	"fmt"
 	"slices"
-	"time"
 )
 
-// Utility struct to represent a collection of nodes, it's a [Node] itself so all
-// the node's functionality is available.
+// Platform represents a collection of nodes. It embeds a Node, so all the
+// node's functionality is available.
 type Platform struct {
-	Node                      // Underlaying node info
+	Node                      // Underlying node info
 	Collections []*Collection // Child nodes
 }
 
+// Is a type alias for FetchResponse containing a slice of Collection pointers.
 type FetchCollectionResponse = FetchResponse[[]*Collection]
 
+// Fetches collections using the provided fetcher and pagination settings.
+// It updates the platform's collections and creates relationships between the platform and the collections.
+//
+// Parameters:
+// - ctx: The context to control cancellation.
+// - fetcher: The fetcher function to execute the work.
+// - start_pagination: The initial pagination settings.
+// - pool_config: The configuration for the worker pool.
+//
+// Returns:
+// - error: The error if any occurred.
 func (platform *Platform) FetchCollections(
+	ctx context.Context,
 	fetcher Work[Pagination, FetchCollectionResponse],
-	start_pagination Pagination,
+	startPagination Pagination,
+	poolConfig *WorkConfig,
 ) error {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	config := &WorkConfig{
-		amount_of_workers: 5,
-		max_retries:       2,
-		base_retry_time:   time.Second,
-		rate_limit:        NewRateLimiter(5, time.Minute),
-		timeout:           time.Second * 2,
-	}
-	tasks := make(chan Pagination)
-	results, errors, done := asyncTaskRunner(
-		ctx,
-		tasks,
-		config,
-		fetcher,
-	)
-	var current_page uint64 = 0
-	if start_pagination.Pages == 0 {
-		// do the first fetch
-		tasks <- start_pagination
-		select {
-		case response, ok := <-results:
-			if !ok {
-				break
-			}
-			platform.Collections = slices.Concat(platform.Collections, response.Response)
-			pages, err := calculatePages(&response.Pagination, start_pagination.Offset)
-			if err != nil {
-				return err
-			}
-			start_pagination.Pages = pages
-			start_pagination.Total = response.Pagination.Total
-			current_page++
-		case error, ok := <-errors:
-			if !ok {
-				return fmt.Errorf("Could not do first fetch to calculate pages: %v\n", error)
-			}
-		case <-ctx.Done():
-			break
-		case <-done:
-			break
-		}
-	}
-	page_offset, err := getPageByOffset(&start_pagination)
+	values, err := fetchWithPagination(ctx, poolConfig, fetcher, startPagination)
 	if err != nil {
 		return err
 	}
-	current_page += page_offset
-	fmt.Printf("Total pages: %v, Current page: %v\n", start_pagination.Pages, current_page)
-	for current_page <= start_pagination.Pages {
-		page := start_pagination
-		page.Offset = current_page * page.Limit
-		tasks <- page
-		current_page++
-	}
-	close(tasks)
-loop:
-	for {
-		select {
-		case response, ok := <-results:
-			if !ok {
-				continue
-			}
-			platform.Collections = slices.Concat(platform.Collections, response.Response)
-		case error, ok := <-errors:
-			if !ok {
-				continue
-			}
-			fmt.Printf("There was an error: %v\n", error)
-		case <-ctx.Done():
-			break loop
-		case <-done:
-			break loop
-		}
-	}
+	platform.Collections = slices.Concat(platform.Collections, values)
 
 	fmt.Printf("Collections: %v\n", len(platform.Collections))

View file

@@ -61,12 +61,12 @@ type Worker[T, S any] struct {
 // - rate_limit: default is 10 requests per second
 // - timeout: if 0, timeout is disabled
 type WorkConfig struct {
-	tasks_processed   sync.WaitGroup   // Wait group to synchronize task completion.
-	amount_of_workers uint8            // Number of workers to spawn.
-	max_retries       uint8            // Maximum number of retries for a task before being cancelled.
-	base_retry_time   time.Duration    // Base factor to wait for before retrying a task.
-	rate_limit        <-chan time.Time // Ticker to limit the number of requests. It is recommended to pass the result of calling NewRateLimiter().
-	timeout           time.Duration    // Maximum execution time allowed for a task before being canceled.
+	TasksProcessed  sync.WaitGroup   // Wait group to synchronize task completion.
+	AmountOfWorkers uint8            // Number of workers to spawn.
+	MaxRetries      uint8            // Maximum number of retries for a task before being cancelled.
+	BaseRetryTime   time.Duration    // Base factor to wait for before retrying a task.
+	RateLimit       <-chan time.Time // Ticker to limit the number of requests. It is recommended to pass the result of calling NewRateLimiter().
+	Timeout         time.Duration    // Maximum execution time allowed for a task before being canceled.
 }
 
 // Group the channels used for task processing for easy access between functions.
@@ -142,9 +142,9 @@ func handleFailedWorkUnit[T, S any](
 	config *WorkConfig,
 ) bool {
 	// If retries == 0, retries are disabled, return immediately
-	if config.max_retries == 0 || config.max_retries <= workUnit.attempts {
+	if config.MaxRetries == 0 || config.MaxRetries <= workUnit.attempts {
 		channels.tasks_failed <- workUnit.err
-		config.tasks_processed.Done()
+		config.TasksProcessed.Done()
 		return false
 	}
@@ -152,7 +152,7 @@ func handleFailedWorkUnit[T, S any](
 	workUnit.err = nil
 	if workUnit.timeout == 0 {
-		workUnit.timeout = config.base_retry_time
+		workUnit.timeout = config.BaseRetryTime
 	} else {
 		workUnit.timeout *= 2
 	}
@@ -200,7 +200,7 @@ func listenForWorkResults[T, S any](
 			// Send message to user
 			channels.tasks_done <- workUnit.result
-			config.tasks_processed.Done()
+			config.TasksProcessed.Done()
 		case <-ctx.Done():
 			return
 		}
@@ -237,7 +237,7 @@ func workUnitDispatcher[T, S any](
 				attempts: 0,
 			}
 			channels.units_dispatcher <- workUnit
-			config.tasks_processed.Add(1)
+			config.TasksProcessed.Add(1)
 		case <-ctx.Done():
 			fmt.Println("context done")
@@ -258,7 +258,7 @@ func stopProcessingWork[T, S any](
 	channels *Channels[T, S],
 	config *WorkConfig,
 ) {
-	config.tasks_processed.Wait()
+	config.TasksProcessed.Wait()
 	close(channels.units_receiver)
 	close(channels.units_dispatcher)
@@ -289,20 +289,20 @@ func asyncTaskRunner[T, S any](
 	work Work[T, S],
 ) (<-chan S, <-chan error, <-chan struct{}) {
 	// Set default values for WorkConfig if not provided
-	if config.amount_of_workers == 0 {
-		config.amount_of_workers = 5
+	if config.AmountOfWorkers == 0 {
+		config.AmountOfWorkers = 5
 	}
-	if config.base_retry_time == 0 {
-		config.base_retry_time = 1 * time.Second
+	if config.BaseRetryTime == 0 {
+		config.BaseRetryTime = 1 * time.Second
 	}
-	if config.rate_limit == nil {
-		config.rate_limit = NewRateLimiter(10, time.Second)
+	if config.RateLimit == nil {
+		config.RateLimit = NewRateLimiter(10, time.Second)
 	}
 
 	// Ensure a clean wait group is used
-	config.tasks_processed = sync.WaitGroup{}
-	channel_size := config.amount_of_workers * 3
+	config.TasksProcessed = sync.WaitGroup{}
+	channel_size := config.AmountOfWorkers * 3
 
 	done, finish := context.WithCancel(ctx)
@@ -315,13 +315,13 @@ func asyncTaskRunner[T, S any](
 	}
 
 	// create pool of workers
-	for i := range config.amount_of_workers {
+	for i := range config.AmountOfWorkers {
 		worker := &Worker[T, S]{
 			id:          uint8(i),
 			receptor:    channels.units_dispatcher,
 			transmitter: channels.units_receiver,
-			rate_limit:  config.rate_limit,
-			timeout:     config.timeout,
+			rate_limit:  config.RateLimit,
+			timeout:     config.Timeout,
 			work:        work,
 		}
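Since the configuration fields are exported now, callers of FetchCollections can pass a partially filled WorkConfig and let asyncTaskRunner's zero-value defaults apply: 5 workers, a 1-second base retry time, and a 10-requests-per-second rate limit, with MaxRetries 0 meaning retries are disabled and Timeout 0 disabling the per-task timeout. A minimal fragment relying on those defaults might look like:

// Only the per-task timeout is set explicitly; AmountOfWorkers, BaseRetryTime
// and RateLimit fall back to the defaults asyncTaskRunner fills in.
cfg := &synchronizator.WorkConfig{
	Timeout: 5 * time.Second,
}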