diff --git a/examples/usage.go b/examples/usage.go index f39f434..31c510a 100644 --- a/examples/usage.go +++ b/examples/usage.go @@ -85,7 +85,6 @@ func getPokedexs( // fmt.Println(data) payload.Offset += pagination.Limit - payload.HasMore = data.Next != "" payload.Total = data.Count payload.Response = collections @@ -166,9 +165,19 @@ func main() { return } - pagination := synchronizator.StartPagination + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - err = pokeApi.FetchCollections(getPokedexs, pagination) + pagination := synchronizator.StartPagination + pool_config := &synchronizator.WorkConfig{ + AmountOfWorkers: 5, + MaxRetries: 3, + BaseRetryTime: time.Second * 2, + RateLimit: synchronizator.NewRateLimiter(5, time.Minute), + Timeout: time.Second * 2, + } + + err = pokeApi.FetchCollections(ctx, getPokedexs, pagination, pool_config) if err != nil { fmt.Println(err) return diff --git a/pkg/collection.go b/pkg/collection.go index 59fc476..b97ed22 100644 --- a/pkg/collection.go +++ b/pkg/collection.go @@ -89,16 +89,12 @@ func (collection *Collection) IsDefault() bool { type NodeFetcher = func(metadata []byte, pagination Pagination) ([]*Node, Pagination, error) func (collection *Collection) FetchNodes(fetcher NodeFetcher, start_pagination Pagination) error { - nodes, pagination, err := fetcher(collection.GetMetadata(), start_pagination) + nodes, _, err := fetcher(collection.GetMetadata(), start_pagination) if err != nil { return err } collection.childs = slices.Concat(collection.childs, nodes) - if pagination.HasMore { - return collection.FetchNodes(fetcher, pagination) - } - err = BulkCreateNode(collection._conn, collection.childs) if err != nil { return err diff --git a/pkg/fetcher.go b/pkg/fetcher.go index 7d67fda..058a43d 100644 --- a/pkg/fetcher.go +++ b/pkg/fetcher.go @@ -1,45 +1,62 @@ package synchronizator -import "fmt" +import ( + "context" + "fmt" + "slices" +) -// StartPagination is the initial pagination configuration. +// The initial pagination configuration. var StartPagination = Pagination{ - Total: 0, - Pages: 0, - HasMore: false, - Limit: 10, - Offset: 0, + Total: 0, + Pages: 0, + Limit: 10, + Offset: 0, } +// Represents the response from a fetch operation. +// It includes pagination information and the actual response data. type FetchResponse[T any] struct { Pagination - Response T + Response T // The actual response data. } -// Pagination represents the pagination information for fetching data. +// Represents the pagination information for fetching data. type Pagination struct { - Total uint64 // Total number of items. - Pages uint64 - HasMore bool // Indicates if there are more items to fetch. - Limit uint64 // Number of items to fetch per request. - Offset uint64 // Offset for the next set of items to fetch. + Total uint64 // Total number of items. + Pages uint64 // Total number of pages. + Limit uint64 // Number of items to fetch per request. + Offset uint64 // Offset for the next set of items to fetch. } +// Calculates the number of pages based on the total items, offset, and limit. +// It returns the calculated number of pages and an error if any. +// +// Parameters: +// - pagination: The pagination settings. +// - offset: The offset value. +// +// Returns: +// - uint64: The calculated number of pages. +// - error: The error if any occurred. func calculatePages(pagination *Pagination, offset uint64) (uint64, error) { if pagination.Limit == 0 { - return 0, fmt.Errorf("division by zero") + return 0, fmt.Errorf("The limit cannot be 0 to calculate the pages") } - dividend := pagination.Total - offset - divisor := pagination.Limit - - result := dividend / divisor - if dividend%divisor != 0 { - result++ - } + result := (pagination.Total - offset) / pagination.Limit return result, nil } +// Calculates the page number based on the offset and pagination settings. +// It returns the calculated page number and an error if any. +// +// Parameters: +// - pagination: The pagination settings. +// +// Returns: +// - uint64: The calculated page number. +// - error: The error if any occurred. func getPageByOffset(pagination *Pagination) (uint64, error) { if pagination.Pages == 0 { return 0, fmt.Errorf("division by zero") @@ -51,3 +68,135 @@ func getPageByOffset(pagination *Pagination) (uint64, error) { return result, nil } + +// Fetches data with pagination support. +// It returns a slice of fetched data and an error if any. +// +// The pagination is gonna try to guess the total amount of pages if not +// provided by executing a first fetch and then calculating with the +// pagination.Total value returned by the fetcher function. +// +// Parameters: +// - ctx: The context to control cancellation. +// - pool_config: The configuration for the worker pool. +// - fetcher: The fetcher function to execute the work. +// - start_pagination: The initial pagination settings. +// +// Returns: +// - []T: A slice of fetched data. +// - error: The error if any occurred. +func fetchWithPagination[T any]( + ctx context.Context, + pool_config *WorkConfig, + fetcher Work[Pagination, FetchResponse[[]T]], + start_pagination Pagination, +) ([]T, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + tasks := make(chan Pagination) + values := make([]T, 0) + + results, errors, doneChannel := asyncTaskRunner( + ctx, + tasks, + pool_config, + fetcher, + ) + + var current_page uint64 = 0 + + if start_pagination.Pages == 0 { + tasks <- start_pagination + + response, _, err := waitForFetchResponse(ctx, results, errors, nil) + if err != nil { + return nil, err + } + + values = slices.Concat(values, response.Response) + + start_pagination.Total = response.Pagination.Total + pages, err := calculatePages(&response.Pagination, start_pagination.Offset) + if err != nil { + return nil, err + } + start_pagination.Pages = pages + current_page++ + } + + page_offset, err := getPageByOffset(&start_pagination) + if err != nil { + return nil, err + } + + current_page += page_offset + + for current_page <= start_pagination.Pages { + fmt.Printf("Total pages: %v, Current page: %v\n", start_pagination.Pages, current_page) + page := start_pagination + page.Offset = current_page * page.Limit + tasks <- page + current_page++ + } + + close(tasks) + + for { + response, isWorkDone, err := waitForFetchResponse(ctx, results, errors, doneChannel) + if err != nil { + return nil, err + } + + if isWorkDone { + break + } + + if response == nil { + continue + } + + values = slices.Concat(values, response.Response) + } + + return values, nil +} + +// Waits for a fetch response from the provided channels. +// It returns the fetch response, a boolean indicating if the work is done, and an error if any. +// +// Parameters: +// - ctx: The context to control cancellation. +// - results: A channel to receive fetch responses. +// - errors: A channel to receive errors. +// - done: A channel to signal completion. +// +// Returns: +// - *FetchResponse[[]T]: The fetch response if available. +// - bool: True if the work is done or the operation was cancelled. +// - error: The error if any occurred. +func waitForFetchResponse[T any]( + ctx context.Context, + results <-chan FetchResponse[[]T], + errors <-chan error, + done <-chan struct{}, +) (*FetchResponse[[]T], bool, error) { + var values *FetchResponse[[]T] + select { + case response, ok := <-results: + if !ok { + return nil, false, nil + } + values = &response + case err, ok := <-errors: + if !ok { + return nil, false, nil + } + return nil, true, err + case <-ctx.Done(): + return nil, true, nil + case <-done: + return nil, true, nil + } + return values, false, nil +} diff --git a/pkg/platform.go b/pkg/platform.go index bf94b55..999e075 100644 --- a/pkg/platform.go +++ b/pkg/platform.go @@ -4,113 +4,41 @@ import ( "context" "fmt" "slices" - "time" ) -// Utility struct to represent a collection of nodes, it's a [Node] itself so all -// the node's functionality is available. +// Platform represents a collection of nodes. It embeds a Node, so all the +// node's functionality is available. type Platform struct { - Node // Underlaying node info + Node // Underlying node info Collections []*Collection // Child nodes } +// Is a type alias for FetchResponse containing a slice of Collection pointers. type FetchCollectionResponse = FetchResponse[[]*Collection] +// Fetches collections using the provided fetcher and pagination settings. +// It updates the platform's collections and creates relationships between the platform and the collections. +// +// Parameters: +// - ctx: The context to control cancellation. +// - fetcher: The fetcher function to execute the work. +// - start_pagination: The initial pagination settings. +// - pool_config: The configuration for the worker pool. +// +// Returns: +// - error: The error if any occurred. func (platform *Platform) FetchCollections( + ctx context.Context, fetcher Work[Pagination, FetchCollectionResponse], - start_pagination Pagination, + startPagination Pagination, + poolConfig *WorkConfig, ) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - config := &WorkConfig{ - amount_of_workers: 5, - max_retries: 2, - base_retry_time: time.Second, - rate_limit: NewRateLimiter(5, time.Minute), - timeout: time.Second * 2, - } - - tasks := make(chan Pagination) - - results, errors, done := asyncTaskRunner( - ctx, - tasks, - config, - fetcher, - ) - - var current_page uint64 = 0 - - if start_pagination.Pages == 0 { - // do the first fetch - tasks <- start_pagination - - select { - case response, ok := <-results: - if !ok { - break - } - platform.Collections = slices.Concat(platform.Collections, response.Response) - - pages, err := calculatePages(&response.Pagination, start_pagination.Offset) - if err != nil { - return err - } - start_pagination.Pages = pages - start_pagination.Total = response.Pagination.Total - current_page++ - - case error, ok := <-errors: - if !ok { - return fmt.Errorf("Could not do first fetch to calculate pages: %v\n", error) - } - - case <-ctx.Done(): - break - case <-done: - break - } - } - - page_offset, err := getPageByOffset(&start_pagination) + values, err := fetchWithPagination(ctx, poolConfig, fetcher, startPagination) if err != nil { return err } - current_page += page_offset - - fmt.Printf("Total pages: %v, Current page: %v\n", start_pagination.Pages, current_page) - - for current_page <= start_pagination.Pages { - page := start_pagination - page.Offset = current_page * page.Limit - tasks <- page - current_page++ - } - - close(tasks) - -loop: - for { - select { - case response, ok := <-results: - if !ok { - continue - } - platform.Collections = slices.Concat(platform.Collections, response.Response) - case error, ok := <-errors: - if !ok { - continue - } - - fmt.Printf("There was an error: %v\n", error) - case <-ctx.Done(): - break loop - case <-done: - break loop - } - } + platform.Collections = slices.Concat(platform.Collections, values) fmt.Printf("Collections: %v\n", len(platform.Collections)) diff --git a/pkg/workpool.go b/pkg/workpool.go index 2a3faf5..58d4cc1 100644 --- a/pkg/workpool.go +++ b/pkg/workpool.go @@ -61,12 +61,12 @@ type Worker[T, S any] struct { // - rate_limit: default is 10 requests per second // - timeout: if 0, timeout is disabled type WorkConfig struct { - tasks_processed sync.WaitGroup // Wait group to synchronize task completion. - amount_of_workers uint8 // Number of workers to spawn. - max_retries uint8 // Maximum number of retries for a task before beign cancelled. - base_retry_time time.Duration // Base factor to wait for before retrying a task. - rate_limit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter(). - timeout time.Duration // Maximum execution time allowed for a task before beign canceled. + TasksProcessed sync.WaitGroup // Wait group to synchronize task completion. + AmountOfWorkers uint8 // Number of workers to spawn. + MaxRetries uint8 // Maximum number of retries for a task before beign cancelled. + BaseRetryTime time.Duration // Base factor to wait for before retrying a task. + RateLimit <-chan time.Time // Ticker to limit the amount of request. Is recomended to pass the result of calling NewRateLimiter(). + Timeout time.Duration // Maximum execution time allowed for a task before beign canceled. } // Group the channels used for task processing for easy access between functions. @@ -142,9 +142,9 @@ func handleFailedWorkUnit[T, S any]( config *WorkConfig, ) bool { // If retries == 0, retries are disabled, return immediately - if config.max_retries == 0 || config.max_retries <= workUnit.attempts { + if config.MaxRetries == 0 || config.MaxRetries <= workUnit.attempts { channels.tasks_failed <- workUnit.err - config.tasks_processed.Done() + config.TasksProcessed.Done() return false } @@ -152,7 +152,7 @@ func handleFailedWorkUnit[T, S any]( workUnit.err = nil if workUnit.timeout == 0 { - workUnit.timeout = config.base_retry_time + workUnit.timeout = config.BaseRetryTime } else { workUnit.timeout *= 2 } @@ -200,7 +200,7 @@ func listenForWorkResults[T, S any]( // Send message to user channels.tasks_done <- workUnit.result - config.tasks_processed.Done() + config.TasksProcessed.Done() case <-ctx.Done(): return } @@ -237,7 +237,7 @@ func workUnitDispatcher[T, S any]( attempts: 0, } channels.units_dispatcher <- workUnit - config.tasks_processed.Add(1) + config.TasksProcessed.Add(1) case <-ctx.Done(): fmt.Println("context done") @@ -258,7 +258,7 @@ func stopProcessingWork[T, S any]( channels *Channels[T, S], config *WorkConfig, ) { - config.tasks_processed.Wait() + config.TasksProcessed.Wait() close(channels.units_receiver) close(channels.units_dispatcher) @@ -289,20 +289,20 @@ func asyncTaskRunner[T, S any]( work Work[T, S], ) (<-chan S, <-chan error, <-chan struct{}) { // Set default values for WorkConfig if not provided - if config.amount_of_workers == 0 { - config.amount_of_workers = 5 + if config.AmountOfWorkers == 0 { + config.AmountOfWorkers = 5 } - if config.base_retry_time == 0 { - config.base_retry_time = 1 * time.Second + if config.BaseRetryTime == 0 { + config.BaseRetryTime = 1 * time.Second } - if config.rate_limit == nil { - config.rate_limit = NewRateLimiter(10, time.Second) + if config.RateLimit == nil { + config.RateLimit = NewRateLimiter(10, time.Second) } // Ensure a clean wait group is ussed - config.tasks_processed = sync.WaitGroup{} + config.TasksProcessed = sync.WaitGroup{} - channel_size := config.amount_of_workers * 3 + channel_size := config.AmountOfWorkers * 3 done, finish := context.WithCancel(ctx) @@ -315,13 +315,13 @@ func asyncTaskRunner[T, S any]( } // create pool of workers - for i := range config.amount_of_workers { + for i := range config.AmountOfWorkers { worker := &Worker[T, S]{ id: uint8(i), receptor: channels.units_dispatcher, transmitter: channels.units_receiver, - rate_limit: config.rate_limit, - timeout: config.timeout, + rate_limit: config.RateLimit, + timeout: config.Timeout, work: work, }