feat: add simple reconsiliation capabilities

This commit is contained in:
Alexander Navarro 2024-12-02 15:13:44 -03:00
parent 5d00b7c336
commit 28fa3ed3cc
5 changed files with 451 additions and 8 deletions

View file

@ -2,6 +2,7 @@ package synchronizator
import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
@ -35,6 +36,58 @@ type Collection struct {
is_default bool
}
func (collection *Collection) loadNodes() error {
sql := `
WITH RECURSIVE NodeRelationships AS (
SELECT
*,
relationships._class AS relationship_class,
relationships.metadata AS relationship_metadata
FROM
nodes as src
JOIN relationships ON src.id = relationships.node_from
WHERE src.id = $1 AND relationships._class = 'COLLECTION_HAS_NODE'
)
SELECT
dst.id,
dst.name,
dst._class,
dst.metadata,
dst.original_data
FROM
NodeRelationships
JOIN nodes as dst ON dst.id = NodeRelationships.node_to
`
rows, err := collection._conn.Query(sql, collection.Id)
if err != nil {
return err
}
defer rows.Close()
collection._relationships = make([]StandardRelationship, 0)
collection.childs = make([]*Node, 0)
for rows.Next() {
node := &Node{
_conn: collection._conn,
}
if err := rows.Scan(
&node.Id,
&node.name,
&node._class,
&node.metadata,
&node.originalData,
); err != nil {
return err
}
collection.childs = append(collection.childs, node)
}
return nil
}
// NewCollectionObject creates a new Collection instance without persisting it to the database.
// This is useful when you want to prepare a Collection for later storage.
func NewCollection(name string, metadata []byte) *Collection {
@ -145,3 +198,74 @@ func (collection *Collection) FetchNodes(
return nil
}
type Reconcilation[T any] func(src *T) (*Node, error)
func ReconciliateCollections[T any](src, dst *Collection, work Reconcilation[T]) error {
newNodes := make([]*Node, 0, len(src.childs))
updateNodes := make([]*Node, 0, len(src.childs))
for _, srcNode := range src.childs {
var metadata T
err := json.Unmarshal(srcNode.metadata, &metadata)
if err != nil {
return err
}
dstNode, err := work(&metadata)
if err != nil {
return err
}
if dstNode == nil {
return fmt.Errorf("Node pointer is null")
}
if dstNode.Id == -1 {
newNodes = append(newNodes, dstNode)
} else {
updateNodes = append(updateNodes, dstNode)
}
}
BulkCreateNode(src._conn, newNodes)
// BulkUpdateNode(src._conn, updateNodes)
relationships := make([]*Relationship, 0, len(newNodes)+len(updateNodes))
appendRelationship := func(item *Node) error {
relation := &Relationship{
_class: "COLLECTION_HAS_NODE",
From: dst.Id,
To: item.Id,
}
err := dst.AddRelationship(relation)
if err != nil {
return err
}
relationships = append(relationships, relation)
return nil
}
for _, item := range newNodes {
err := appendRelationship(item)
if err != nil {
return err
}
}
for _, item := range updateNodes {
err := appendRelationship(item)
if err != nil {
return err
}
}
err := BulkCreateRelationships(dst._conn, relationships)
if err != nil {
return err
}
return nil
}

View file

@ -2,10 +2,33 @@ package synchronizator
import (
"context"
sql "database/sql"
"fmt"
"slices"
)
type platform_has_collection struct {
Relationship
}
func (col_has_node *platform_has_collection) GetClass() string {
return "PLATFORM_HAS_COLLECTION"
}
func (col_has_node *platform_has_collection) GetMetadata() []byte {
return nil
}
func (col_has_node *platform_has_collection) FromRelationship(
_class string,
metadata []byte,
) error {
if _class != "PLATFORM_HAS_COLLECTION" {
return fmt.Errorf("invalid class %s", _class)
}
return nil
}
// Platform represents a collection of nodes. It embeds a Node, so all the
// node's functionality is available.
type Platform struct {
@ -17,16 +40,90 @@ func (platform *Platform) AddCollection(
name string,
metadata, originalData []byte,
) (*Collection, error) {
collection, err := platform._conn.NewCollection(name, metadata, originalData)
var collection *Collection
err := platform._conn.withTx(func(tx *sql.Tx) error {
node, err := platform._conn.newCollectionwithTx(tx, name, metadata, originalData)
if err != nil {
return err
}
_, err = platform._conn.addRelationwithTx(
tx,
platform.Id,
&platform_has_collection{},
node.Id,
)
if err != nil {
return err
}
collection = node
return nil
})
if err != nil {
return nil, err
}
platform.Collections = append(platform.Collections, collection)
return collection, nil
}
func (platform *Platform) loadCollections() error {
sql := `
WITH RECURSIVE NodeRelationships AS (
SELECT
*,
relationships._class AS relationship_class,
relationships.metadata AS relationship_metadata
FROM
nodes as src
JOIN relationships ON src.id = relationships.node_from
WHERE src.id = $1 AND relationships._class = 'PLATFORM_HAS_COLLECTION'
)
SELECT
dst.id,
dst.name,
dst._class,
dst.metadata,
dst.original_data
FROM
NodeRelationships
JOIN nodes as dst ON dst.id = NodeRelationships.node_to
`
rows, err := platform._conn.Query(sql, platform.Id)
if err != nil {
return err
}
defer rows.Close()
platform._relationships = make([]StandardRelationship, 0)
platform.Collections = make([]*Collection, 0)
for rows.Next() {
collection := &Collection{
Node: Node{
_conn: platform._conn,
},
}
if err := rows.Scan(
&collection.Id,
&collection.name,
&collection._class,
&collection.metadata,
&collection.originalData,
); err != nil {
return err
}
platform.Collections = append(platform.Collections, collection)
}
return nil
}
func (platform *Platform) GetDefaultCollection() (*Collection, error) {
for _, collection := range platform.Collections {
if collection.IsDefault() {

View file

@ -195,7 +195,7 @@ func (conn *DB) NewPlatform(name string, metadata []byte, originalData []byte) (
Collections: []*Collection{collection},
}
_, err = conn.addRelationwithTx(tx, platform.Id, &collection_has_node{}, collection.Id)
_, err = conn.addRelationwithTx(tx, platform.Id, &platform_has_collection{}, collection.Id)
if err != nil {
return err
}
@ -340,6 +340,60 @@ func (conn *DB) GetNode(id int64) (*Node, error) {
return &node, nil
}
func (conn *DB) GetPlatform(id int64) (*Platform, error) {
node := Node{
Id: id,
_conn: conn,
}
sql := "SELECT _class, name, metadata, original_data FROM nodes WHERE id = $1 AND _class = 'PLATFORM';"
conn.log(DEBUG, sql)
err := conn.Connection.QueryRow(sql, id).
Scan(&node._class, &node.name, &node.metadata, &node.originalData)
if err != nil {
conn.log(DEBUG, err)
return nil, fmt.Errorf("No row matching id = %v", id)
}
platform := &Platform{
Node: node,
}
err = platform.loadCollections()
if err != nil {
return nil, err
}
return platform, nil
}
func (conn *DB) GetCollection(id int64) (*Collection, error) {
node := Node{
Id: id,
_conn: conn,
}
sql := "SELECT _class, name, metadata, original_data FROM nodes WHERE id = $1 AND _class = 'COLLECTION';"
conn.log(DEBUG, sql)
err := conn.Connection.QueryRow(sql, id).
Scan(&node._class, &node.name, &node.metadata, &node.originalData)
if err != nil {
conn.log(DEBUG, err)
return nil, fmt.Errorf("No row matching id = %v", id)
}
collection := &Collection{
Node: node,
}
err = collection.loadNodes()
if err != nil {
return nil, err
}
return collection, nil
}
// Deletes a node with the provided id
func (conn *DB) DeleteNode(id int64) error {
tx, err := conn.Connection.Begin()