generated from alecodes/base-template
314 lines
6.9 KiB
Go
314 lines
6.9 KiB
Go
package synchronizator
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"slices"
|
|
"strings"
|
|
)
|
|
|
|
// default_collection is the node class used for a platform's default
// collection. Its class name is derived from the platform name.
type default_collection struct {
	platform_name string
}

// GetClass returns the upper-cased platform name followed by "_DEFAULT",
// e.g. "GITHUB_DEFAULT" for platform_name "github".
func (collection *default_collection) GetClass() string {
	return strings.ToUpper(collection.platform_name) + "_DEFAULT"
}

// GetMetadata always returns nil: a default collection carries no metadata.
func (collection *default_collection) GetMetadata() []byte {
	return nil
}

// FromNode validates that _class denotes a default collection.
//
// It accepts the bare "DEFAULT" class as well as any platform-prefixed class
// of the form "<PLATFORM>_DEFAULT", which is what GetClass produces.
// Previously only "DEFAULT" was accepted, so a class generated by GetClass was
// rejected by FromNode. The name and metadata arguments are not used.
//
// It returns an error when _class is neither form.
func (collection *default_collection) FromNode(_class string, name string, metadata []byte) error {
	if _class != "DEFAULT" && !strings.HasSuffix(_class, "_DEFAULT") {
		return fmt.Errorf("invalid class %s", _class)
	}
	return nil
}
|
|
|
|
// Utility struct to represent a collection of nodes, it's a [Node] itself so all
|
|
// the node's functionality is available.
|
|
type Collection struct {
|
|
Node // Underlaying node info
|
|
childs []*Node // Child nodes
|
|
is_default bool
|
|
}
|
|
|
|
func (collection *Collection) loadNodes() error {
|
|
sql := `
|
|
WITH RECURSIVE NodeRelationships AS (
|
|
SELECT
|
|
*,
|
|
relationships._class AS relationship_class,
|
|
relationships.metadata AS relationship_metadata
|
|
FROM
|
|
nodes as src
|
|
JOIN relationships ON src.id = relationships.node_from
|
|
WHERE src.id = $1 AND relationships._class = 'COLLECTION_HAS_NODE'
|
|
)
|
|
SELECT
|
|
dst.id,
|
|
dst.name,
|
|
dst._class,
|
|
dst.metadata,
|
|
dst.original_data
|
|
FROM
|
|
NodeRelationships
|
|
JOIN nodes as dst ON dst.id = NodeRelationships.node_to
|
|
`
|
|
|
|
rows, err := collection._conn.Query(sql, collection.Id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer rows.Close()
|
|
|
|
collection._relationships = make([]StandardRelationship, 0)
|
|
collection.childs = make([]*Node, 0)
|
|
for rows.Next() {
|
|
node := &Node{
|
|
_conn: collection._conn,
|
|
}
|
|
if err := rows.Scan(
|
|
&node.Id,
|
|
&node.name,
|
|
&node._class,
|
|
&node.metadata,
|
|
&node.originalData,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
|
|
collection.childs = append(collection.childs, node)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewCollectionObject creates a new Collection instance without persisting it to the database.
|
|
// This is useful when you want to prepare a Collection for later storage.
|
|
func NewCollection(name string, metadata []byte) *Collection {
|
|
return &Collection{
|
|
Node: Node{
|
|
name: name,
|
|
_class: "COLLECTION",
|
|
metadata: metadata,
|
|
Id: -1, // Use -1 to indicate not persisted
|
|
},
|
|
childs: make([]*Node, 0),
|
|
is_default: false,
|
|
}
|
|
}
|
|
|
|
// Internal RelationshipClass to handle the collection to node relationship
|
|
type collection_has_node struct {
|
|
Relationship
|
|
}
|
|
|
|
func (col_has_node *collection_has_node) GetClass() string {
|
|
return "COLLECTION_HAS"
|
|
}
|
|
|
|
func (col_has_node *collection_has_node) GetMetadata() []byte {
|
|
return nil
|
|
}
|
|
|
|
func (col_has_node *collection_has_node) FromRelationship(_class string, metadata []byte) error {
|
|
if _class != "COLLECTION_HAS" {
|
|
return fmt.Errorf("invalid class %s", _class)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Adds a new child to this collection. Use the underlaying node's AddRelation
|
|
// method.
|
|
func (collection *Collection) AddChild(node *Node) error {
|
|
_, err := collection.StoreRelation(&collection_has_node{}, node.Id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
collection.childs = append(collection.childs, node)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (collection *Collection) IsDefault() bool {
|
|
return collection.is_default
|
|
}
|
|
|
|
// Is a type alias for FetchResponse containing a slice of Collection pointers.
|
|
type FetchNodesResponse = FetchResponse[[]*Node]
|
|
|
|
// Fetches collections using the provided fetcher and pagination settings.
|
|
// It updates the platform's collections and creates relationships between the platform and the collections.
|
|
//
|
|
// Parameters:
|
|
// - ctx: The context to control cancellation.
|
|
// - fetcher: The fetcher function to execute the work.
|
|
// - start_pagination: The initial pagination settings.
|
|
// - pool_config: The configuration for the worker pool.
|
|
//
|
|
// Returns:
|
|
// - error: The error if any occurred.
|
|
func (collection *Collection) FetchNodes(
|
|
ctx context.Context,
|
|
fetcher Work[Pagination, FetchNodesResponse],
|
|
startPagination Pagination,
|
|
poolConfig *WorkConfig,
|
|
) error {
|
|
fmt.Printf("Fetching nodes for Collection: %d - %v\n", collection.Id, collection.name)
|
|
values, err := fetchWithPagination(ctx, poolConfig, fetcher, startPagination)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
collection.childs = slices.Concat(collection.childs, values)
|
|
|
|
fmt.Printf("Nodes added: %d, Nodes in collection: %d\n", len(values), len(collection.childs))
|
|
|
|
err = BulkCreateNode(collection._conn, values)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
relationships := make([]*Relationship, 0, len(values))
|
|
|
|
for _, item := range values {
|
|
relation := &Relationship{
|
|
_class: "COLLECTION_HAS_NODE",
|
|
From: collection.Id,
|
|
To: item.Id,
|
|
}
|
|
|
|
err := collection.AddRelationship(relation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
relationships = append(relationships, relation)
|
|
}
|
|
|
|
err = BulkCreateRelationships(collection._conn, relationships)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func PushNodes[T any](
|
|
ctx context.Context,
|
|
collection *Collection,
|
|
fetcher Work[T, string],
|
|
poolConfig *WorkConfig,
|
|
) error {
|
|
fmt.Printf("Pushing nodes for Collection: %d - %v\n", collection.Id, collection.name)
|
|
|
|
tasks := make(chan T)
|
|
|
|
values, errors, doneChannel := asyncTaskRunner[T, string](
|
|
ctx,
|
|
tasks,
|
|
poolConfig,
|
|
fetcher,
|
|
)
|
|
|
|
for _, node := range collection.childs {
|
|
var child T
|
|
json.Unmarshal(node.GetMetadata(), &child)
|
|
tasks <- child
|
|
}
|
|
|
|
close(tasks)
|
|
|
|
for {
|
|
select {
|
|
case <-values:
|
|
// TODO: add external id to node, so we can update it later
|
|
continue
|
|
case error, ok := <-errors:
|
|
if !ok {
|
|
continue
|
|
}
|
|
return error
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-doneChannel:
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reconcilation is the callback used by ReconciliateCollections: it receives a
// value of type T decoded from a source node's metadata and returns the node
// to associate with the destination collection, or an error.
type Reconcilation[T any] func(src *T) (*Node, error)
|
|
|
|
func ReconciliateCollections[T any](src, dst *Collection, work Reconcilation[T]) error {
|
|
newNodes := make([]*Node, 0, len(src.childs))
|
|
updateNodes := make([]*Node, 0, len(src.childs))
|
|
for _, srcNode := range src.childs {
|
|
var metadata T
|
|
err := json.Unmarshal(srcNode.metadata, &metadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dstNode, err := work(&metadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if dstNode == nil {
|
|
return fmt.Errorf("Node pointer is null")
|
|
}
|
|
|
|
if dstNode.Id == -1 {
|
|
newNodes = append(newNodes, dstNode)
|
|
} else {
|
|
updateNodes = append(updateNodes, dstNode)
|
|
}
|
|
}
|
|
|
|
BulkCreateNode(src._conn, newNodes)
|
|
// BulkUpdateNode(src._conn, updateNodes)
|
|
|
|
relationships := make([]*Relationship, 0, len(newNodes)+len(updateNodes))
|
|
|
|
appendRelationship := func(item *Node) error {
|
|
relation := &Relationship{
|
|
_class: "COLLECTION_HAS_NODE",
|
|
From: dst.Id,
|
|
To: item.Id,
|
|
}
|
|
|
|
err := dst.AddRelationship(relation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
relationships = append(relationships, relation)
|
|
|
|
return nil
|
|
}
|
|
|
|
for _, item := range newNodes {
|
|
err := appendRelationship(item)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, item := range updateNodes {
|
|
err := appendRelationship(item)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err := BulkCreateRelationships(dst._conn, relationships)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|