From 3cf643c83d59f81538c9be02cbe06d9c9a7e7348 Mon Sep 17 00:00:00 2001 From: aleidk Date: Fri, 22 Nov 2024 20:20:22 -0300 Subject: [PATCH] perf: do a bulk insert in fetchCollection Improve performance over inserting each item individually --- examples/usage.go | 12 +++----- pkg/collection.go | 14 ++++++++++ pkg/node.go | 29 +++++++++++++++++++- pkg/platform.go | 11 ++++++-- pkg/synchronizator.go | 64 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 118 insertions(+), 12 deletions(-) diff --git a/examples/usage.go b/examples/usage.go index 0bc6ab5..7680766 100644 --- a/examples/usage.go +++ b/examples/usage.go @@ -32,7 +32,6 @@ type Pokemon struct { } func getPokemons( - sync *synchronizator.DB, pagination synchronizator.Pagination, ) ([]*synchronizator.Collection, synchronizator.Pagination, error) { var collections []*synchronizator.Collection @@ -57,12 +56,9 @@ func getPokemons( collections = make([]*synchronizator.Collection, 0, len(data.Results)) - // TODO: this writes to the database in each collection creation - // add better strategies, like returning a collection to the platform and the - // platform do a final bulk update for _, pokedex := range data.Results { collection_name := "Pokedex_" + pokedex.Name - collection, err := sync.NewCollection(collection_name, nil) + collection := synchronizator.NewCollection(collection_name, nil) if err != nil { return nil, pagination, err } @@ -89,7 +85,7 @@ func main() { defer connection.Close() opts := synchronizator.DefaultOptions - // opts.Log_level = synchronizator.DEBUG + opts.Log_level = synchronizator.DEBUG opts.DANGEROUSLY_DROP_TABLES = true sync, err := synchronizator.New(connection, opts) @@ -105,11 +101,11 @@ func main() { return } - fmt.Println(pokeApi) - err = pokeApi.FetchCollections(getPokemons, synchronizator.StartPagination) if err != nil { fmt.Println(err) return } + + fmt.Println(pokeApi) } diff --git a/pkg/collection.go b/pkg/collection.go index 20282de..79c554f 100644 --- a/pkg/collection.go +++ b/pkg/collection.go @@ -28,6 +28,20 @@ type Collection struct { childs []*Node // Child nodes } +// NewCollectionObject creates a new Collection instance without persisting it to the database. +// This is useful when you want to prepare a Collection for later storage. +func NewCollection(name string, metadata []byte) *Collection { + return &Collection{ + Node: Node{ + name: name, + _class: "COLLECTION", + metadata: metadata, + Id: -1, // Use -1 to indicate not persisted + }, + childs: make([]*Node, 0), + } +} + // Internal RelationshipClass to handle the collection to node relationship type collection_relation struct{} diff --git a/pkg/node.go b/pkg/node.go index fd14907..4a6e736 100644 --- a/pkg/node.go +++ b/pkg/node.go @@ -1,6 +1,13 @@ package synchronizator -type StandardNode interface{} +type StandardNode interface { + GetClass() string + GetName() string + GetMetadata() []byte + + SetId(int64) + SetConnection(*DB) +} // A node in the database. // It adds some helper methods to easily manage the node. @@ -13,6 +20,26 @@ type Node struct { metadata []byte // Arbitrary data. This is stored as a jsonb in the database } +func (node *Node) GetClass() string { + return node._class +} + +func (node *Node) GetName() string { + return node.name +} + +func (node *Node) GetMetadata() []byte { + return node.metadata +} + +func (node *Node) SetId(id int64) { + node.Id = id +} + +func (node *Node) SetConnection(conn *DB) { + node._conn = conn +} + // Creates a new relation of type StandardRelationship to the node with the // provided id. An error is returned if the relation already exists. // diff --git a/pkg/platform.go b/pkg/platform.go index ccb8135..41ff26d 100644 --- a/pkg/platform.go +++ b/pkg/platform.go @@ -1,5 +1,7 @@ package synchronizator +import "slices" + type PlatformClass interface { // How to transform the struct into a node. It needs to return the class, // name and a []byte representation of the metadata. @@ -24,7 +26,7 @@ type Platform struct { collections []*Collection // Child nodes } -type Fetcher = func(conn *DB, pagination Pagination) ([]*Collection, Pagination, error) +type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error) type Pagination struct { Total int @@ -41,14 +43,17 @@ var StartPagination = Pagination{ } func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error { - collections, pagination, err := fetcher(platform._conn, start_pagination) + collections, pagination, err := fetcher(start_pagination) if err != nil { return err } - platform.collections = collections + platform.collections = slices.Concat(platform.collections, collections) if pagination.HasMore { return platform.FetchCollections(fetcher, pagination) } + + err = BulkCreateNode(platform._conn, platform.collections) + return nil } diff --git a/pkg/synchronizator.go b/pkg/synchronizator.go index 54995a4..370229a 100644 --- a/pkg/synchronizator.go +++ b/pkg/synchronizator.go @@ -451,3 +451,67 @@ func (conn *DB) DeleteRelation(from int64, to int64) error { return nil } + +// Creates a new node +func BulkCreateNode[T StandardNode]( + conn *DB, + nodes []T, +) error { + if len(nodes) == 0 { + return nil + } + + tx, err := conn.Connection.Begin() + if err != nil { + return err + } + + defer tx.Rollback() + + // Build the query dynamically based on number of nodes + valueStrings := make([]string, 0, len(nodes)) + valueArgs := make([]interface{}, 0, len(nodes)*3) + + for i := range nodes { + // Create ($1, $2, $3), ($4, $5, $6), etc. + n := i * 3 + valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", n+1, n+2, n+3)) + valueArgs = append( + valueArgs, + nodes[i].GetClass(), + nodes[i].GetName(), + nodes[i].GetMetadata(), + ) + } + + sql := fmt.Sprintf(` + INSERT INTO nodes (_class, name, metadata) + VALUES %s + RETURNING id;`, strings.Join(valueStrings, ",")) + + conn.log(DEBUG, "Bulk creating nodes:", sql, valueArgs) + + // Execute and scan returned IDs + rows, err := tx.Query(sql, valueArgs...) + if err != nil { + return fmt.Errorf("bulk insert failed: %w", err) + } + defer rows.Close() + + // Assign IDs back to the nodes + i := 0 + for rows.Next() { + var id int64 + if err := rows.Scan(&id); err != nil { + return fmt.Errorf("scanning returned id failed: %w", err) + } + + nodes[i].SetId(id) + nodes[i].SetConnection(conn) + i++ + } + + tx.Commit() + + return rows.Err() +}