package synchronizator import ( "context" "encoding/json" "fmt" "slices" "strings" ) type default_collection struct { platform_name string } func (collection *default_collection) GetClass() string { platform_name := strings.ToUpper(collection.platform_name) return platform_name + "_DEFAULT" } func (collection *default_collection) GetMetadata() []byte { return nil } func (collection *default_collection) FromNode(_class string, name string, metadata []byte) error { if _class != "DEFAULT" { return fmt.Errorf("invalid class %s", _class) } return nil } // Utility struct to represent a collection of nodes, it's a [Node] itself so all // the node's functionality is available. type Collection struct { Node // Underlaying node info childs []*Node // Child nodes is_default bool } func (collection *Collection) loadNodes() error { sql := ` WITH RECURSIVE NodeRelationships AS ( SELECT *, relationships._class AS relationship_class, relationships.metadata AS relationship_metadata FROM nodes as src JOIN relationships ON src.id = relationships.node_from WHERE src.id = $1 AND relationships._class = 'COLLECTION_HAS_NODE' ) SELECT dst.id, dst.name, dst._class, dst.metadata, dst.original_data FROM NodeRelationships JOIN nodes as dst ON dst.id = NodeRelationships.node_to ` rows, err := collection._conn.Query(sql, collection.Id) if err != nil { return err } defer rows.Close() collection._relationships = make([]StandardRelationship, 0) collection.childs = make([]*Node, 0) for rows.Next() { node := &Node{ _conn: collection._conn, } if err := rows.Scan( &node.Id, &node.name, &node._class, &node.metadata, &node.originalData, ); err != nil { return err } collection.childs = append(collection.childs, node) } return nil } // NewCollectionObject creates a new Collection instance without persisting it to the database. // This is useful when you want to prepare a Collection for later storage. func NewCollection(name string, metadata []byte) *Collection { return &Collection{ Node: Node{ name: name, _class: "COLLECTION", metadata: metadata, Id: -1, // Use -1 to indicate not persisted }, childs: make([]*Node, 0), is_default: false, } } // Internal RelationshipClass to handle the collection to node relationship type collection_has_node struct { Relationship } func (col_has_node *collection_has_node) GetClass() string { return "COLLECTION_HAS" } func (col_has_node *collection_has_node) GetMetadata() []byte { return nil } func (col_has_node *collection_has_node) FromRelationship(_class string, metadata []byte) error { if _class != "COLLECTION_HAS" { return fmt.Errorf("invalid class %s", _class) } return nil } // Adds a new child to this collection. Use the underlaying node's AddRelation // method. func (collection *Collection) AddChild(node *Node) error { _, err := collection.StoreRelation(&collection_has_node{}, node.Id) if err != nil { return err } collection.childs = append(collection.childs, node) return nil } func (collection *Collection) IsDefault() bool { return collection.is_default } // Is a type alias for FetchResponse containing a slice of Collection pointers. type FetchNodesResponse = FetchResponse[[]*Node] // Fetches collections using the provided fetcher and pagination settings. // It updates the platform's collections and creates relationships between the platform and the collections. // // Parameters: // - ctx: The context to control cancellation. // - fetcher: The fetcher function to execute the work. // - start_pagination: The initial pagination settings. // - pool_config: The configuration for the worker pool. // // Returns: // - error: The error if any occurred. func (collection *Collection) FetchNodes( ctx context.Context, fetcher Work[Pagination, FetchNodesResponse], startPagination Pagination, poolConfig *WorkConfig, ) error { fmt.Printf("Fetching nodes for Collection: %d - %v\n", collection.Id, collection.name) values, err := fetchWithPagination(ctx, poolConfig, fetcher, startPagination) if err != nil { return err } collection.childs = slices.Concat(collection.childs, values) fmt.Printf("Nodes added: %d, Nodes in collection: %d\n", len(values), len(collection.childs)) err = BulkCreateNode(collection._conn, values) if err != nil { return err } relationships := make([]*Relationship, 0, len(values)) for _, item := range values { relation := &Relationship{ _class: "COLLECTION_HAS_NODE", From: collection.Id, To: item.Id, } err := collection.AddRelationship(relation) if err != nil { return err } relationships = append(relationships, relation) } err = BulkCreateRelationships(collection._conn, relationships) if err != nil { return err } return nil } func PushNodes[T any]( ctx context.Context, collection *Collection, fetcher Work[T, string], poolConfig *WorkConfig, ) error { fmt.Printf("Pushing nodes for Collection: %d - %v\n", collection.Id, collection.name) tasks := make(chan T) values, errors, doneChannel := asyncTaskRunner[T, string]( ctx, tasks, poolConfig, fetcher, ) for _, node := range collection.childs { var child T json.Unmarshal(node.GetMetadata(), &child) tasks <- child } close(tasks) for { select { case <-values: // TODO: add external id to node, so we can update it later continue case error, ok := <-errors: if !ok { continue } return error case <-ctx.Done(): return ctx.Err() case <-doneChannel: return nil } } } type Reconcilation[T any] func(src *T) (*Node, error) func ReconciliateCollections[T any](src, dst *Collection, work Reconcilation[T]) error { newNodes := make([]*Node, 0, len(src.childs)) updateNodes := make([]*Node, 0, len(src.childs)) for _, srcNode := range src.childs { var metadata T err := json.Unmarshal(srcNode.metadata, &metadata) if err != nil { return err } dstNode, err := work(&metadata) if err != nil { return err } if dstNode == nil { return fmt.Errorf("Node pointer is null") } if dstNode.Id == -1 { newNodes = append(newNodes, dstNode) } else { updateNodes = append(updateNodes, dstNode) } } BulkCreateNode(src._conn, newNodes) // BulkUpdateNode(src._conn, updateNodes) relationships := make([]*Relationship, 0, len(newNodes)+len(updateNodes)) appendRelationship := func(item *Node) error { relation := &Relationship{ _class: "COLLECTION_HAS_NODE", From: dst.Id, To: item.Id, } err := dst.AddRelationship(relation) if err != nil { return err } relationships = append(relationships, relation) return nil } for _, item := range newNodes { err := appendRelationship(item) if err != nil { return err } } for _, item := range updateNodes { err := appendRelationship(item) if err != nil { return err } } err := BulkCreateRelationships(dst._conn, relationships) if err != nil { return err } return nil }