perf: do a bulk insert in fetchCollection

Improves performance compared to inserting each item individually.
This commit is contained in:
Alexander Navarro 2024-11-22 20:20:22 -03:00
parent 8c660053e5
commit 3cf643c83d
5 changed files with 118 additions and 12 deletions

View file

@ -32,7 +32,6 @@ type Pokemon struct {
} }
func getPokemons( func getPokemons(
sync *synchronizator.DB,
pagination synchronizator.Pagination, pagination synchronizator.Pagination,
) ([]*synchronizator.Collection, synchronizator.Pagination, error) { ) ([]*synchronizator.Collection, synchronizator.Pagination, error) {
var collections []*synchronizator.Collection var collections []*synchronizator.Collection
@ -57,12 +56,9 @@ func getPokemons(
collections = make([]*synchronizator.Collection, 0, len(data.Results)) collections = make([]*synchronizator.Collection, 0, len(data.Results))
// TODO: this writes to the database in each collection creation
// add better strategies, like returning a collection to the platform and the
// platform do a final bulk update
for _, pokedex := range data.Results { for _, pokedex := range data.Results {
collection_name := "Pokedex_" + pokedex.Name collection_name := "Pokedex_" + pokedex.Name
collection, err := sync.NewCollection(collection_name, nil) collection := synchronizator.NewCollection(collection_name, nil)
if err != nil { if err != nil {
return nil, pagination, err return nil, pagination, err
} }
@ -89,7 +85,7 @@ func main() {
defer connection.Close() defer connection.Close()
opts := synchronizator.DefaultOptions opts := synchronizator.DefaultOptions
// opts.Log_level = synchronizator.DEBUG opts.Log_level = synchronizator.DEBUG
opts.DANGEROUSLY_DROP_TABLES = true opts.DANGEROUSLY_DROP_TABLES = true
sync, err := synchronizator.New(connection, opts) sync, err := synchronizator.New(connection, opts)
@ -105,11 +101,11 @@ func main() {
return return
} }
fmt.Println(pokeApi)
err = pokeApi.FetchCollections(getPokemons, synchronizator.StartPagination) err = pokeApi.FetchCollections(getPokemons, synchronizator.StartPagination)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return
} }
fmt.Println(pokeApi)
} }

View file

@ -28,6 +28,20 @@ type Collection struct {
childs []*Node // Child nodes childs []*Node // Child nodes
} }
// NewCollection creates a new Collection instance without persisting it to
// the database. This is useful when you want to prepare a Collection for
// later storage (e.g. via a bulk insert).
func NewCollection(name string, metadata []byte) *Collection {
	return &Collection{
		Node: Node{
			name:     name,
			_class:   "COLLECTION",
			metadata: metadata,
			Id:       -1, // Use -1 to indicate not persisted
		},
		childs: make([]*Node, 0),
	}
}
// Internal RelationshipClass to handle the collection to node relationship // Internal RelationshipClass to handle the collection to node relationship
type collection_relation struct{} type collection_relation struct{}

View file

@ -1,6 +1,13 @@
package synchronizator package synchronizator
type StandardNode interface{} type StandardNode interface {
GetClass() string
GetName() string
GetMetadata() []byte
SetId(int64)
SetConnection(*DB)
}
// A node in the database. // A node in the database.
// It adds some helper methods to easily manage the node. // It adds some helper methods to easily manage the node.
@ -13,6 +20,26 @@ type Node struct {
metadata []byte // Arbitrary data. This is stored as a jsonb in the database metadata []byte // Arbitrary data. This is stored as a jsonb in the database
} }
// GetClass returns the node's class discriminator (e.g. "COLLECTION").
func (node *Node) GetClass() string {
	return node._class
}
// GetName returns the node's name.
func (node *Node) GetName() string {
	return node.name
}
// GetMetadata returns the node's raw metadata (stored as jsonb in the
// database).
func (node *Node) GetMetadata() []byte {
	return node.metadata
}
// SetId assigns the database-generated id to the node, typically after it
// has been persisted.
func (node *Node) SetId(id int64) {
	node.Id = id
}
// SetConnection attaches a database connection to the node so its helper
// methods can reach the database.
func (node *Node) SetConnection(conn *DB) {
	node._conn = conn
}
// Creates a new relation of type StandardRelationship to the node with the // Creates a new relation of type StandardRelationship to the node with the
// provided id. An error is returned if the relation already exists. // provided id. An error is returned if the relation already exists.
// //

View file

@ -1,5 +1,7 @@
package synchronizator package synchronizator
import "slices"
type PlatformClass interface { type PlatformClass interface {
// How to transform the struct into a node. It needs to return the class, // How to transform the struct into a node. It needs to return the class,
// name and a []byte representation of the metadata. // name and a []byte representation of the metadata.
@ -24,7 +26,7 @@ type Platform struct {
collections []*Collection // Child nodes collections []*Collection // Child nodes
} }
type Fetcher = func(conn *DB, pagination Pagination) ([]*Collection, Pagination, error) type Fetcher = func(pagination Pagination) ([]*Collection, Pagination, error)
type Pagination struct { type Pagination struct {
Total int Total int
@ -41,14 +43,17 @@ var StartPagination = Pagination{
} }
func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error { func (platform *Platform) FetchCollections(fetcher Fetcher, start_pagination Pagination) error {
collections, pagination, err := fetcher(platform._conn, start_pagination) collections, pagination, err := fetcher(start_pagination)
if err != nil { if err != nil {
return err return err
} }
platform.collections = collections platform.collections = slices.Concat(platform.collections, collections)
if pagination.HasMore { if pagination.HasMore {
return platform.FetchCollections(fetcher, pagination) return platform.FetchCollections(fetcher, pagination)
} }
err = BulkCreateNode(platform._conn, platform.collections)
return nil return nil
} }

View file

@ -451,3 +451,67 @@ func (conn *DB) DeleteRelation(from int64, to int64) error {
return nil return nil
} }
// BulkCreateNode persists all given nodes with a single multi-row INSERT
// inside one transaction: either every node is stored or none are. On
// success the database-generated ids and the connection are assigned back
// onto each node; on any error the transaction is rolled back and the nodes
// are left untouched.
func BulkCreateNode[T StandardNode](
	conn *DB,
	nodes []T,
) error {
	if len(nodes) == 0 {
		return nil // nothing to insert
	}

	tx, err := conn.Connection.Begin()
	if err != nil {
		return err
	}
	// Rollback is a no-op once Commit has succeeded.
	defer tx.Rollback()

	// Build "($1, $2, $3), ($4, $5, $6), ..." — one placeholder triple per
	// node, with the values flattened into valueArgs in the same order.
	valueStrings := make([]string, 0, len(nodes))
	valueArgs := make([]any, 0, len(nodes)*3)
	for i := range nodes {
		n := i * 3
		valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", n+1, n+2, n+3))
		valueArgs = append(
			valueArgs,
			nodes[i].GetClass(),
			nodes[i].GetName(),
			nodes[i].GetMetadata(),
		)
	}

	sql := fmt.Sprintf(`
	INSERT INTO nodes (_class, name, metadata)
	VALUES %s
	RETURNING id;`, strings.Join(valueStrings, ","))

	conn.log(DEBUG, "Bulk creating nodes:", sql, valueArgs)

	rows, err := tx.Query(sql, valueArgs...)
	if err != nil {
		return fmt.Errorf("bulk insert failed: %w", err)
	}

	// Drain and close the result set before committing: database/sql
	// requires rows to be closed before the transaction can complete.
	ids := make([]int64, 0, len(nodes))
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			rows.Close()
			return fmt.Errorf("scanning returned id failed: %w", err)
		}
		ids = append(ids, id)
	}
	if err := rows.Err(); err != nil {
		rows.Close()
		return err
	}
	rows.Close()

	// Guard against a row-count mismatch before indexing nodes below.
	if len(ids) != len(nodes) {
		return fmt.Errorf("bulk insert returned %d ids for %d nodes", len(ids), len(nodes))
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing bulk insert failed: %w", err)
	}

	// Assign ids only after the commit succeeded, so nodes never carry ids
	// that were rolled back.
	for i, id := range ids {
		nodes[i].SetId(id)
		nodes[i].SetConnection(conn)
	}
	return nil
}