feat: update getNodes from collection to use worker pool

Alexander Navarro 2024-12-01 12:26:28 -03:00
parent 26b678b348
commit f58db2ecaa
2 changed files with 86 additions and 41 deletions
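
The diff leans on generic worker-pool plumbing (FetchResponse, Work, WorkConfig, fetchWithPagination) whose definitions are not part of this commit. Below is a minimal sketch of the shapes the new code appears to assume; every definition here is inferred from usage in the diff, not taken from the real package, and may differ from it:

	// sketch.go — inferred from usage in this commit; the real definitions
	// live elsewhere in the synchronizator package and may differ.
	package synchronizator

	import (
		"context"
		"time"
	)

	// Pagination: the example sets pagination.Pages, so it carries at least
	// a page count; any other fields are unknown from this diff.
	type Pagination struct {
		Pages int
	}

	// StartPagination is referenced in main(); a zero value is assumed here.
	var StartPagination = Pagination{}

	// FetchResponse pairs one page of fetched values with updated pagination
	// state (the diff aliases FetchNodesResponse = FetchResponse[[]*Node]).
	type FetchResponse[T any] struct {
		Response   T
		Pagination Pagination
	}

	// Work is the unit a pool worker executes: context in, typed result out.
	// getPokemons matches Work[Pagination, FetchNodesResponse].
	type Work[In any, Out any] func(ctx context.Context, input In) (Out, error)

	// RateLimiter and WorkConfig mirror the fields configured in main().
	type RateLimiter struct {
		requests int
		per      time.Duration
	}

	func NewRateLimiter(requests int, per time.Duration) *RateLimiter {
		return &RateLimiter{requests: requests, per: per}
	}

	type WorkConfig struct {
		AmountOfWorkers int
		MaxRetries      int
		BaseRetryTime   time.Duration
		RateLimit       *RateLimiter
		Timeout         time.Duration
	}

The first changed file is the PokeAPI example program, where both the fetcher callback and the main() wiring move to this contract.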


@@ -93,17 +93,24 @@ func getPokedexs(
 }
 
 func getPokemons(
-	metadata []byte,
+	ctx context.Context,
 	pagination synchronizator.Pagination,
-) ([]*synchronizator.Node, synchronizator.Pagination, error) {
-	var nodes []*synchronizator.Node
+) (synchronizator.FetchNodesResponse, error) {
+	payload := synchronizator.FetchNodesResponse{
+		Pagination: pagination,
+	}
+	var pokemons []*synchronizator.Node
 
-	pokedex := &Pokedex{}
-	json.Unmarshal(metadata, pokedex)
+	pokedex, ok := ctx.Value("pokedex").(*Pokedex)
+	if !ok {
+		return payload, fmt.Errorf("couldn't retrieve pokedex from context")
+	}
 
 	resp, err := http.Get(pokedex.Url)
 	if err != nil {
-		return nil, pagination, err
+		return payload, err
 	}
 	body, err := io.ReadAll(resp.Body)

@@ -112,23 +119,23 @@ func getPokemons(
 	var data PokeApiPokedexResponse
 	err = json.Unmarshal(body, &data)
 	if err != nil {
-		return nil, pagination, err
+		return payload, err
 	}
 
-	nodes = make([]*synchronizator.Node, 0, len(data.Pokemons))
+	pokemons = make([]*synchronizator.Node, 0, len(data.Pokemons))
 	for _, pokemon := range data.Pokemons {
 		metadata, err := json.Marshal(pokemon)
 		node := synchronizator.NewNode(pokemon.PokemonSpecies.Name, metadata)
 		if err != nil {
-			return nil, pagination, err
+			return payload, err
 		}
-		nodes = append(nodes, node)
+		pokemons = append(pokemons, node)
 	}
 
-	// fmt.Println(data)
-	return nodes, pagination, nil
+	payload.Response = pokemons
+	return payload, nil
 }
 
 func main() {
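
The rewritten fetcher no longer receives the pokedex metadata as an argument; it pulls a *Pokedex out of the context with a comma-ok type assertion. A plain string key such as "pokedex" works, but staticcheck (SA1029) warns against built-in types as context keys because they can collide across packages; the usual remedy is an unexported key type. A small self-contained sketch of that variant — the pokedexKey type is hypothetical, not part of this commit:

	package main

	import (
		"context"
		"fmt"
	)

	// Pokedex stands in for the example's type of the same name.
	type Pokedex struct {
		Url string
	}

	// Hypothetical unexported key type: avoids collisions with other
	// packages that might also store a value under the string "pokedex".
	type pokedexKeyType struct{}

	var pokedexKey pokedexKeyType

	func main() {
		ctx := context.WithValue(context.Background(), pokedexKey, &Pokedex{Url: "https://pokeapi.co/api/v2/pokedex/2/"})

		// The comma-ok assertion mirrors the retrieval in getPokemons.
		pokedex, ok := ctx.Value(pokedexKey).(*Pokedex)
		if !ok {
			fmt.Println("couldn't retrieve pokedex from context")
			return
		}
		fmt.Println(pokedex.Url)
	}

Either way, the comma-ok form keeps a missing or mistyped value from panicking, which is why getPokemons returns an error instead of asserting unconditionally.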
@@ -171,7 +178,7 @@ func main() {
 	pagination := synchronizator.StartPagination
 	pool_config := &synchronizator.WorkConfig{
 		AmountOfWorkers: 5,
-		MaxRetries:      3,
+		MaxRetries:      1,
 		BaseRetryTime:   time.Second * 2,
 		RateLimit:       synchronizator.NewRateLimiter(5, time.Minute),
 		Timeout:         time.Second * 2,
@@ -183,15 +190,28 @@ func main() {
 		return
 	}
 
-	// for _, pokedex := range pokeApi.Collections {
-	// 	if pokedex.IsDefault() {
-	// 		continue
-	// 	}
-	//
-	// 	err = pokedex.FetchNodes(getPokemons, synchronizator.StartPagination)
-	// 	if err != nil {
-	// 		fmt.Println(err)
-	// 		return
-	// 	}
-	// }
+	for _, collection := range pokeApi.Collections {
+		if collection.IsDefault() {
+			continue
+		}
+
+		pagination := synchronizator.StartPagination
+		// It returns all the pokemons of 1 pokedex in a single request
+		pagination.Pages = 1
+
+		pokedex := &Pokedex{}
+		err := json.Unmarshal(collection.GetMetadata(), pokedex)
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
+
+		ctx := context.WithValue(ctx, "pokedex", pokedex)
+
+		err = collection.FetchNodes(ctx, getPokemons, pagination, pool_config)
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
+	}
 }
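
The retry knobs above (MaxRetries, BaseRetryTime) suggest exponential backoff, though the pool's actual policy is not visible in this commit. A sketch of the common reading of those two fields — sleep BaseRetryTime * 2^attempt between attempts, give up after MaxRetries retries:

	package main

	import (
		"errors"
		"fmt"
		"time"
	)

	// retry runs fn up to maxRetries+1 times, doubling the wait after each
	// failure — one plausible reading of the MaxRetries/BaseRetryTime knobs
	// in WorkConfig (the real pool's policy may differ).
	func retry(maxRetries int, base time.Duration, fn func() error) error {
		var err error
		for attempt := 0; ; attempt++ {
			if err = fn(); err == nil {
				return nil
			}
			if attempt >= maxRetries {
				return fmt.Errorf("giving up after %d retries: %w", maxRetries, err)
			}
			time.Sleep(base << attempt) // 2s, 4s, 8s, ...
		}
	}

	func main() {
		calls := 0
		err := retry(1, 2*time.Second, func() error {
			calls++
			return errors.New("transient failure")
		})
		fmt.Println(calls, err) // 2 giving up after 1 retries: transient failure
	}

The second changed file is the synchronizator package itself, where FetchNodes picks up the new worker-pool signature.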


@@ -1,6 +1,7 @@
 package synchronizator
 
 import (
+	"context"
 	"fmt"
 	"slices"
 	"strings"
@@ -86,33 +87,57 @@ func (collection *Collection) IsDefault() bool {
 	return collection.is_default
 }
 
-type NodeFetcher = func(metadata []byte, pagination Pagination) ([]*Node, Pagination, error)
+// FetchNodesResponse is a type alias for FetchResponse containing a slice of Node pointers.
+type FetchNodesResponse = FetchResponse[[]*Node]
 
-func (collection *Collection) FetchNodes(fetcher NodeFetcher, start_pagination Pagination) error {
-	nodes, _, err := fetcher(collection.GetMetadata(), start_pagination)
-	if err != nil {
-		return err
-	}
-	collection.childs = slices.Concat(collection.childs, nodes)
+// FetchNodes fetches the collection's child nodes using the provided fetcher and pagination settings.
+// It appends the fetched nodes to the collection's children and creates relationships between the collection and each node.
+//
+// Parameters:
+//   - ctx: The context to control cancellation.
+//   - fetcher: The fetcher function to execute the work.
+//   - startPagination: The initial pagination settings.
+//   - poolConfig: The configuration for the worker pool.
+//
+// Returns:
+//   - error: The error if any occurred.
+func (collection *Collection) FetchNodes(
+	ctx context.Context,
+	fetcher Work[Pagination, FetchNodesResponse],
+	startPagination Pagination,
+	poolConfig *WorkConfig,
+) error {
+	values, err := fetchWithPagination(ctx, poolConfig, fetcher, startPagination)
+	if err != nil {
+		return err
+	}
+	collection.childs = slices.Concat(collection.childs, values)
 
-	err = BulkCreateNode(collection._conn, collection.childs)
+	fmt.Printf("Nodes: %v\n", len(collection.childs))
+
+	err = BulkCreateNode(collection._conn, values)
 	if err != nil {
 		return err
 	}
-	for _, item := range collection.childs {
-		err := collection.AddRelationship(
-			&Relationship{
-				_class: "COLLECTION_HAS_NODE",
-				From:   collection.Id,
-				To:     item.Id,
-			})
+
+	relationships := make([]*Relationship, 0, len(values))
+	for _, item := range values {
+		relation := &Relationship{
+			_class: "COLLECTION_HAS_NODE",
+			From:   collection.Id,
+			To:     item.Id,
+		}
+		err := collection.AddRelationship(relation)
 		if err != nil {
 			return err
 		}
+		relationships = append(relationships, relation)
 	}
-	err = BulkCreateRelationships(collection._conn, collection._relationships)
+	err = BulkCreateRelationships(collection._conn, relationships)
 	if err != nil {
 		return err
 	}
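
Everything above funnels into fetchWithPagination, which is called here but defined outside this diff. From the call site one can infer its contract: run the fetcher once per page (through the worker pool, presumably honoring poolConfig) and flatten each page's Response slice into a single result. A sequential sketch of that contract, reusing the hypothetical types from the note under the commit header:

	// A minimal, sequential sketch of the inferred contract. The real
	// implementation presumably fans pages out to poolConfig.AmountOfWorkers
	// workers with retries, rate limiting, and the configured timeout.
	func fetchWithPagination[T any](
		ctx context.Context,
		poolConfig *WorkConfig,
		fetcher Work[Pagination, FetchResponse[[]T]],
		start Pagination,
	) ([]T, error) {
		var out []T
		pagination := start
		for page := 0; page < pagination.Pages; page++ {
			resp, err := fetcher(ctx, pagination)
			if err != nil {
				return nil, err
			}
			out = append(out, resp.Response...)
			// The fetcher returns updated pagination, so it can advance or
			// extend the paging window as it learns the total page count.
			pagination = resp.Pagination
		}
		return out, nil
	}

With T instantiated as *Node, Work[Pagination, FetchResponse[[]*Node]] is exactly Work[Pagination, FetchNodesResponse], which is why the example sets pagination.Pages = 1: PokeAPI returns a whole pokedex in one request, so each collection needs only a single unit of work.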