From f58db2ecaa19ded2cd7f8efd555e3600c28616d4 Mon Sep 17 00:00:00 2001
From: aleidk
Date: Sun, 1 Dec 2024 12:26:28 -0300
Subject: [PATCH] feat: update Collection.FetchNodes to use worker pool

---
 examples/usage.go | 68 ++++++++++++++++++++++++++++++-----------------
 pkg/collection.go | 59 ++++++++++++++++++++++++++++------------
 2 files changed, 86 insertions(+), 41 deletions(-)

diff --git a/examples/usage.go b/examples/usage.go
index 31c510a..90d89d5 100644
--- a/examples/usage.go
+++ b/examples/usage.go
@@ -93,17 +93,24 @@ func getPokedexs(
 }
 
 func getPokemons(
-	metadata []byte,
+	ctx context.Context,
 	pagination synchronizator.Pagination,
-) ([]*synchronizator.Node, synchronizator.Pagination, error) {
-	var nodes []*synchronizator.Node
+) (synchronizator.FetchNodesResponse, error) {
+	payload := synchronizator.FetchNodesResponse{
+		Pagination: pagination,
+	}
 
-	pokedex := &Pokedex{}
-	json.Unmarshal(metadata, pokedex)
+	var pokemons []*synchronizator.Node
+
+	pokedex, ok := ctx.Value("pokedex").(*Pokedex)
+
+	if !ok {
+		return payload, fmt.Errorf("couldn't retrieve pokedex from context")
+	}
 
 	resp, err := http.Get(pokedex.Url)
 	if err != nil {
-		return nil, pagination, err
+		return payload, err
 	}
 
 	body, err := io.ReadAll(resp.Body)
@@ -112,23 +119,23 @@ func getPokemons(
 	var data PokeApiPokedexResponse
 	err = json.Unmarshal(body, &data)
 	if err != nil {
-		return nil, pagination, err
+		return payload, err
 	}
 
-	nodes = make([]*synchronizator.Node, 0, len(data.Pokemons))
+	pokemons = make([]*synchronizator.Node, 0, len(data.Pokemons))
 
 	for _, pokemon := range data.Pokemons {
 		metadata, err := json.Marshal(pokemon)
 		node := synchronizator.NewNode(pokemon.PokemonSpecies.Name, metadata)
 		if err != nil {
-			return nil, pagination, err
+			return payload, err
 		}
-		nodes = append(nodes, node)
+		pokemons = append(pokemons, node)
 	}
 
-	// fmt.Println(data)
+	payload.Response = pokemons
 
-	return nodes, pagination, nil
+	return payload, nil
 }
 
 func main() {
@@ -171,7 +178,7 @@
 	pagination := synchronizator.StartPagination
 	pool_config := &synchronizator.WorkConfig{
 		AmountOfWorkers: 5,
-		MaxRetries: 3,
+		MaxRetries: 1,
 		BaseRetryTime: time.Second * 2,
 		RateLimit: synchronizator.NewRateLimiter(5, time.Minute),
 		Timeout: time.Second * 2,
@@ -183,15 +190,28 @@
 	if err != nil {
 		fmt.Println(err)
 		return
 	}
-	// for _, pokedex := range pokeApi.Collections {
-	// 	if pokedex.IsDefault() {
-	// 		continue
-	// 	}
-	//
-	// 	err = pokedex.FetchNodes(getPokemons, synchronizator.StartPagination)
-	// 	if err != nil {
-	// 		fmt.Println(err)
-	// 		return
-	// 	}
-	// }
+	for _, collection := range pokeApi.Collections {
+		if collection.IsDefault() {
+			continue
+		}
+
+		pagination := synchronizator.StartPagination
+		// The API returns all the pokemons of a pokedex in a single request, so one page is enough
+		pagination.Pages = 1
+
+		pokedex := &Pokedex{}
+		err := json.Unmarshal(collection.GetMetadata(), pokedex)
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
+
+		ctx := context.WithValue(ctx, "pokedex", pokedex)
+
+		err = collection.FetchNodes(ctx, getPokemons, pagination, pool_config)
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
+	}
 }
diff --git a/pkg/collection.go b/pkg/collection.go
index b97ed22..a033936 100644
--- a/pkg/collection.go
+++ b/pkg/collection.go
@@ -1,6 +1,7 @@
 package synchronizator
 
 import (
+	"context"
 	"fmt"
 	"slices"
 	"strings"
@@ -86,33 +87,57 @@ func (collection *Collection) IsDefault() bool {
 	return collection.is_default
 }
 
-type NodeFetcher = func(metadata []byte, pagination Pagination) ([]*Node, Pagination, error)
+// FetchNodesResponse is a type alias for FetchResponse containing a slice of Node pointers.
+type FetchNodesResponse = FetchResponse[[]*Node]
 
-func (collection *Collection) FetchNodes(fetcher NodeFetcher, start_pagination Pagination) error {
-	nodes, _, err := fetcher(collection.GetMetadata(), start_pagination)
-	if err != nil {
-		return err
-	}
-	collection.childs = slices.Concat(collection.childs, nodes)
-
-	err = BulkCreateNode(collection._conn, collection.childs)
+// FetchNodes fetches the collection's nodes using the provided fetcher and pagination settings.
+// It appends the fetched nodes to the collection's children and creates relationships between the collection and each node.
+//
+// Parameters:
+// - ctx: The context to control cancellation.
+// - fetcher: The fetcher function executed by the worker pool.
+// - startPagination: The initial pagination settings.
+// - poolConfig: The configuration for the worker pool.
+//
+// Returns:
+// - error: The first error encountered, if any.
+func (collection *Collection) FetchNodes(
+	ctx context.Context,
+	fetcher Work[Pagination, FetchNodesResponse],
+	startPagination Pagination,
+	poolConfig *WorkConfig,
+) error {
+	values, err := fetchWithPagination(ctx, poolConfig, fetcher, startPagination)
 	if err != nil {
 		return err
 	}
 
-	for _, item := range collection.childs {
-		err := collection.AddRelationship(
-			&Relationship{
-				_class: "COLLECTION_HAS_NODE",
-				From: collection.Id,
-				To: item.Id,
-			})
+	collection.childs = slices.Concat(collection.childs, values)
+
+	fmt.Printf("Nodes: %v\n", len(collection.childs))
+
+	err = BulkCreateNode(collection._conn, values)
+	if err != nil {
+		return err
+	}
+
+	relationships := make([]*Relationship, 0, len(values))
+
+	for _, item := range values {
+		relation := &Relationship{
+			_class: "COLLECTION_HAS_NODE",
+			From: collection.Id,
+			To: item.Id,
+		}
+
+		err := collection.AddRelationship(relation)
 		if err != nil {
 			return err
 		}
+		relationships = append(relationships, relation)
 	}
 
-	err = BulkCreateRelationships(collection._conn, collection._relationships)
+	err = BulkCreateRelationships(collection._conn, relationships)
 	if err != nil {
 		return err
 	}
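// A possible follow-up sketch, not part of this patch: the example passes the
// pokedex through the context with a plain string key ("pokedex"), which
// `go vet` and staticcheck flag because string keys from different packages can
// collide. An unexported key type avoids that. The helper names below
// (pokedexKey, WithPokedex, PokedexFrom) are hypothetical, and the sketch
// assumes the Pokedex type declared in examples/usage.go (package main).
package main

import "context"

// pokedexKey is an unexported key type, so no other package can accidentally
// read or overwrite the same context entry.
type pokedexKey struct{}

// WithPokedex returns a copy of ctx that carries the given pokedex.
func WithPokedex(ctx context.Context, p *Pokedex) context.Context {
	return context.WithValue(ctx, pokedexKey{}, p)
}

// PokedexFrom extracts the pokedex stored by WithPokedex and reports whether it was set.
func PokedexFrom(ctx context.Context) (*Pokedex, bool) {
	p, ok := ctx.Value(pokedexKey{}).(*Pokedex)
	return p, ok
}

// The example would then read:
//
//	ctx := WithPokedex(ctx, pokedex)  // instead of context.WithValue(ctx, "pokedex", pokedex)
//	pokedex, ok := PokedexFrom(ctx)   // instead of ctx.Value("pokedex").(*Pokedex)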