This repository has been archived on 2025-05-15. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
synchronizator-go/pkg/synchronizator.go

589 lines
14 KiB
Go

// Package synchronizator provides a library to help synchronize data from
// different sources.
//
// It does so by implementing a graph database that represents the relations
// between the different data entities, called "nodes", helping you find,
// create or delete the "equivalent" entities in the different sources.
//
// In this library we use the following nomenclature:
// - [struct_name]: The representation of the element in the database
// - [struct_name]Class: An interface that lets users create a custom representation of the element that makes sense in their application. The interface must provide a way to transform the struct into a node and vice versa.
package synchronizator
import (
	sql "database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"sort"
	"strings"
	"time"
)
// DB is the synchronization database handle.
//
// Create it with [New] instead of a struct literal: New fills in the
// unexported fields from [Options] and creates the schema.
type DB struct {
Connection *sql.DB // underlying database connection, opened by the caller
logger *os.File // destination of log messages
log_level LogLevel // messages with a level above this one are not logged
drop_tables bool // drop the existing tables before creating them (see Options.DANGEROUSLY_DROP_TABLES)
}
// Options configures [New]. A nil *Options makes New use [DefaultOptions].
type Options struct {
Logger *os.File // Where to log; DefaultOptions uses os.Stdout
Log_level LogLevel // Most verbose level that is written; DefaultOptions uses INFO (the zero value is ERROR)
DANGEROUSLY_DROP_TABLES bool // Drop the existing tables, and all their data, on initialization; defaults to false
}
// LogLevel selects how much a DB logs. Levels are ordered from least to
// most verbose; a message is written only when its level is less than or
// equal to the configured level.
type LogLevel int

// Available log levels. Their String method is generated by stringer.
//
//go:generate stringer -type=LogLevel
const (
	ERROR LogLevel = iota // least verbose
	INFO                  // level used by DefaultOptions
	DEBUG                 // most verbose; also logs the executed SQL
)
// DefaultOptions is what [New] uses when it receives nil options: log to
// os.Stdout at INFO level and keep any existing tables.
//
// It is a shared package-level value, so modifying it changes the defaults
// seen by every later call to New.
var DefaultOptions = &Options{
	DANGEROUSLY_DROP_TABLES: false,
	Log_level:               INFO,
	Logger:                  os.Stdout,
}
// New builds a DB on top of connection and creates the tables and indexes
// the library needs (if they do not exist yet). Use it instead of building a
// DB literal yourself, since the unexported fields are only set here.
//
// options may be nil, in which case DefaultOptions is used. A nil
// options.Logger falls back to os.Stdout, so a partially filled Options does
// not silently discard every log line. Note that the zero Log_level is ERROR.
//
// The connection must already be open: New does not open or close it and
// only uses what it is given. This also allows the library tables to live
// inside an already existing application database.
func New(connection *sql.DB, options *Options) (*DB, error) {
	if connection == nil {
		return nil, fmt.Errorf("synchronizator: nil database connection")
	}
	if options == nil {
		options = DefaultOptions
	}

	logger := options.Logger
	if logger == nil {
		logger = os.Stdout
	}

	conn := &DB{
		Connection:  connection,
		logger:      logger,
		log_level:   options.Log_level,
		drop_tables: options.DANGEROUSLY_DROP_TABLES,
	}
	if err := conn.bootstrap(); err != nil {
		return nil, err
	}
	return conn, nil
}
// log writes one line to the configured logger, but only when level is less
// than or equal to the DB's configured level (ERROR < INFO < DEBUG).
//
// The line starts with "[ <local time> - <LEVEL> ]: " and the args follow,
// formatted as fmt.Fprintln would (space separated, newline terminated).
func (conn *DB) log(level LogLevel, args ...any) {
	if level <= conn.log_level {
		stamp := time.Now().Format(time.DateTime)
		fmt.Fprintf(conn.logger, "[ %s - %-5s ]: %s", stamp, level.String(), fmt.Sprintln(args...))
	}
}
// bootstrap creates the schema used by this package: the nodes table, the
// relationships table (edges keyed by (node_from, node_to)) and an index on
// the _class column of each. Every CREATE uses IF NOT EXISTS, so running it
// against an already initialized database is harmless.
//
// When the DB was built with DANGEROUSLY_DROP_TABLES, the tables and their
// indexes are dropped first, deleting all stored nodes and relationships.
func (conn *DB) bootstrap() error {
	conn.log(INFO, "Initializing database...")

	query := ""
	if conn.drop_tables {
		// relationships holds foreign keys into nodes, so it must be dropped
		// first: dropping nodes while relationships still references it fails
		// when foreign key enforcement is enabled.
		query += `
DROP INDEX IF EXISTS relationships_class;
DROP INDEX IF EXISTS node_class;
DROP TABLE IF EXISTS relationships;
DROP TABLE IF EXISTS nodes;
`
	}
	query += `
CREATE TABLE IF NOT EXISTS nodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
_class text NOT NULL,
name TEXT,
metadata jsonb DEFAULT '{}',
original_data jsonb DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS node_class on nodes (_class);
CREATE TABLE IF NOT EXISTS relationships (
node_from INTEGER NOT NULL,
node_to INTEGER NOT NULL,
_class text NOT NULL,
metadata jsonb DEFAULT '{}',
PRIMARY KEY (node_from, node_to),
CHECK (node_from != node_to),
CONSTRAINT fk_node_from_relationships FOREIGN KEY (node_from) REFERENCES nodes(id),
CONSTRAINT fk_node_to_relationships FOREIGN KEY (node_to) REFERENCES nodes(id)
);
CREATE INDEX IF NOT EXISTS relationships_class on relationships (_class);
`
	conn.log(DEBUG, query)

	// All statements are sent in a single Exec; this relies on the driver
	// accepting multiple statements per call.
	if _, err := conn.Connection.Exec(query); err != nil {
		return fmt.Errorf("creating schema: %w", err)
	}
	return nil
}
// withTx runs fn inside a database transaction started on conn.Connection.
//
// If Begin fails, its error is returned. If fn fails, its error is returned
// unchanged and the deferred Rollback discards the work. Otherwise the
// transaction is committed and the Commit error (if any) is returned; the
// deferred Rollback after a successful Commit is a harmless no-op.
func (conn *DB) withTx(fn func(*sql.Tx) error) error {
	tx, err := conn.Connection.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	if err = fn(tx); err == nil {
		err = tx.Commit()
	}
	return err
}
// Query runs the given statement with args directly on the underlying
// connection (not inside a transaction) and returns the resulting rows.
// The statement and its arguments are logged at DEBUG level first.
//
// As with database/sql, the caller must Close the returned rows.
func (conn *DB) Query(sql string, args ...any) (*sql.Rows, error) {
	conn.log(DEBUG, "Executing query:", sql, args)
	rows, err := conn.Connection.Query(sql, args...)
	if err == nil {
		return rows, nil
	}
	return nil, err
}
// NewPlatform creates a platform node called name, together with its default
// collection (named strings.ToUpper(name)+"_DEFAULT"), and links the platform
// to that collection. All three rows are written in a single transaction, so
// either all of them are stored or none are.
//
// A platform is a Node wrapper with extra functionality to manage its
// collections. For more information see the [DB.NewNode] method and the
// [Platform] struct.
//
// On error the returned *Platform is nil.
func (conn *DB) NewPlatform(name string, metadata []byte, originalData []byte) (*Platform, error) {
	var platform *Platform
	err := conn.withTx(func(tx *sql.Tx) error {
		node, err := conn.newNodewithTx(tx, name, "PLATFORM", metadata, originalData)
		if err != nil {
			return err
		}

		collection, err := conn.newCollectionwithTx(
			tx,
			strings.ToUpper(name)+"_DEFAULT",
			nil,
			nil,
		)
		if err != nil {
			return err
		}
		collection.is_default = true

		// NOTE(review): the platform -> collection edge uses the
		// collection_has_node relationship class; confirm this is intended.
		if _, err := conn.addRelationwithTx(tx, node.Id, &collection_has_node{}, collection.Id); err != nil {
			return err
		}

		platform = &Platform{
			Node:        *node,
			Collections: []*Collection{collection},
		}
		return nil
	})
	if err != nil {
		// Never hand back a platform whose rows were rolled back.
		return nil, err
	}
	return platform, nil
}
// NewCollection creates a new Collection with the provided data.
//
// A collection is only a Node wrapper with some extended functionality to
// manage multiple nodes. For more information see the [DB.NewNode] method
// and the [Collection] struct.
//
// The operation runs in its own database transaction. On error the returned
// *Collection is nil.
func (conn *DB) NewCollection(name string, metadata, originalData []byte) (*Collection, error) {
	var collection *Collection
	err := conn.withTx(func(tx *sql.Tx) error {
		var err error
		collection, err = conn.newCollectionwithTx(tx, name, metadata, originalData)
		return err
	})
	if err != nil {
		// collection may already be set if only the Commit failed; its Id
		// would then refer to a row that was rolled back.
		return nil, err
	}
	return collection, nil
}
// newCollectionwithTx inserts a node of class "COLLECTION" using tx and wraps
// it in a Collection with no children that is not marked as default. The
// caller owns tx and decides whether to commit it.
//
// See the [DB.NewNode] method and the [Collection] struct for details.
func (conn *DB) newCollectionwithTx(
	tx *sql.Tx,
	name string,
	metadata, originalData []byte,
) (*Collection, error) {
	created, insertErr := conn.newNodewithTx(tx, name, "COLLECTION", metadata, originalData)
	if insertErr != nil {
		return nil, insertErr
	}
	return &Collection{
		Node:       *created,
		childs:     []*Node{},
		is_default: false,
	}, nil
}
// NewNode creates a node with the given name, class, metadata and original
// data, and returns it with its database Id set.
//
// The operation runs in its own database transaction. On error the returned
// *Node is nil.
func (conn *DB) NewNode(name string, class string, metadata, originalData []byte) (*Node, error) {
	var node *Node
	err := conn.withTx(func(tx *sql.Tx) error {
		var err error
		node, err = conn.newNodewithTx(tx, name, class, metadata, originalData)
		return err
	})
	if err != nil {
		// node may already be set if only the Commit failed.
		return nil, err
	}
	return node, nil
}
// newNodewithTx inserts a node of the given class into the nodes table using
// tx, and returns its in-memory representation with Id set to the id
// generated by the database. The caller owns tx and decides whether to
// commit it.
func (conn *DB) newNodewithTx(
	tx *sql.Tx,
	name string,
	class string,
	metadata []byte,
	originalData []byte,
) (*Node, error) {
	node := Node{
		_conn:        conn,
		_class:       class,
		name:         name,
		metadata:     metadata,
		originalData: originalData,
		Id:           -1, // placeholder until the INSERT returns the real id
	}
	conn.log(DEBUG, "Creating node:", node)

	// One placeholder per column: original_data must bind the fourth
	// argument, not reuse the metadata placeholder.
	query := "INSERT INTO nodes (_class, name, metadata, original_data) VALUES ($1, $2, $3, $4) RETURNING id;"
	if err := tx.QueryRow(query, node._class, node.name, node.metadata, node.originalData).Scan(&node.Id); err != nil {
		return nil, err
	}
	return &node, nil
}
// UpdateNode sets the name and metadata of the node with the given id and
// returns the updated node. The node's class is left unchanged and is read
// back from the database; original_data is not modified (and not returned).
//
// When no node has that id, the returned error wraps sql.ErrNoRows.
func (conn *DB) UpdateNode(id int64, name string, metadata []byte) (*Node, error) {
	node := Node{
		_conn:    conn,
		name:     name,
		metadata: metadata,
		Id:       id,
	}
	conn.log(DEBUG, "Updating node:", id, node)

	err := conn.withTx(func(tx *sql.Tx) error {
		// _class is deliberately not in the SET list: the caller does not
		// supply a class, so writing node._class (always "" here) would
		// erase the stored one.
		query := "UPDATE nodes SET name = $1, metadata = $2 WHERE id = $3 RETURNING _class;"
		return tx.QueryRow(query, node.name, node.metadata, id).Scan(&node._class)
	})
	if err != nil {
		return nil, fmt.Errorf("updating node %d: %w", id, err)
	}
	return &node, nil
}
// GetNode loads the node with the given id, including its class, name,
// metadata and original data.
//
// When no node has that id it returns an error reading
// "No row matching id = <id>". Any other database error (connection failure,
// NULL in a non-nullable field, ...) is wrapped and returned instead of being
// reported as a missing row.
func (conn *DB) GetNode(id int64) (*Node, error) {
	node := Node{
		Id:    id,
		_conn: conn,
	}
	query := "SELECT _class, name, metadata, original_data FROM nodes WHERE id = $1;"
	conn.log(DEBUG, query)

	err := conn.Connection.QueryRow(query, id).Scan(&node._class, &node.name, &node.metadata, &node.originalData)
	if err != nil {
		conn.log(DEBUG, err)
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("No row matching id = %v", id)
		}
		return nil, fmt.Errorf("getting node %d: %w", id, err)
	}
	return &node, nil
}
// DeleteNode deletes the node with the given id together with every
// relationship that starts or ends at it, in a single transaction.
//
// The relationships are removed first because they reference the node
// through foreign keys. Leaving them would either make the node DELETE fail
// (when foreign key enforcement is on) or leave dangling edges behind.
// Deleting an id that does not exist is not an error.
func (conn *DB) DeleteNode(id int64) error {
	conn.log(DEBUG, "Deleting node:", id)
	return conn.withTx(func(tx *sql.Tx) error {
		if _, err := tx.Exec("DELETE FROM relationships WHERE node_from = $1 OR node_to = $1;", id); err != nil {
			return fmt.Errorf("deleting relationships of node %d: %w", id, err)
		}
		if _, err := tx.Exec("DELETE FROM nodes WHERE id = $1;", id); err != nil {
			return fmt.Errorf("deleting node %d: %w", id, err)
		}
		return nil
	})
}
// AddRelation creates a relationship from node `from` to node `to`. The
// relationship class and metadata are taken from data.
//
// The operation runs in its own database transaction. On error the returned
// *Relationship is nil.
func (conn *DB) AddRelation(
	from int64,
	data StandardRelationship,
	to int64,
) (*Relationship, error) {
	var relationship *Relationship
	err := conn.withTx(func(tx *sql.Tx) error {
		var err error
		relationship, err = conn.addRelationwithTx(tx, from, data, to)
		return err
	})
	if err != nil {
		// relationship may already be set if only the Commit failed.
		return nil, err
	}
	return relationship, nil
}
// addRelationwithTx inserts an edge from node `from` to node `to` using tx,
// taking the relationship class and metadata from data, and returns its
// in-memory representation. The caller owns tx and decides whether to
// commit it.
//
// Input is validated with the same rules as BulkCreateRelationships:
// data must be non-nil, the class non-empty and both ids positive. Unsaved
// nodes carry Id -1, and foreign keys may not be enforced by the database,
// so these checks catch edges to nodes that were never stored.
func (conn *DB) addRelationwithTx(
	tx *sql.Tx,
	from int64,
	data StandardRelationship,
	to int64,
) (*Relationship, error) {
	if data == nil {
		return nil, fmt.Errorf("relationship data cannot be nil")
	}
	class := data.GetClass()
	if class == "" || from <= 0 || to <= 0 {
		return nil, fmt.Errorf("Invalid relationship: %v, %v, %v", class, from, to)
	}

	relationship := Relationship{
		_conn:    conn,
		_class:   class,
		metadata: data.GetMetadata(),
		From:     from,
		To:       to,
	}
	conn.log(DEBUG, "Creating relationship:", from, relationship, to)

	// Nothing is read back, so the statement needs no RETURNING clause.
	query := "INSERT INTO relationships (_class, node_from, node_to, metadata) VALUES ($1, $2, $3, $4);"
	if _, err := tx.Exec(query, relationship._class, relationship.From, relationship.To, relationship.metadata); err != nil {
		return nil, err
	}
	return &relationship, nil
}
// UpdateRelation replaces the metadata of the relationship from node `from`
// to node `to`.
//
// metadata is encoded with encoding/json before it is stored. To store
// already-encoded JSON as-is, pass a json.RawMessage; a plain []byte would be
// encoded as a base64 JSON string. A nil metadata is rejected. Updating a
// relationship that does not exist is not reported as an error.
func (conn *DB) UpdateRelation(from int64, metadata any, to int64) error {
	if metadata == nil {
		return fmt.Errorf("metadata cannot be nil")
	}
	encoded, err := json.Marshal(metadata)
	if err != nil {
		return fmt.Errorf("invalid metadata format: %w", err)
	}

	conn.log(DEBUG, "Updating relationship:", from, metadata, to)
	return conn.withTx(func(tx *sql.Tx) error {
		_, err := tx.Exec(
			"UPDATE relationships SET metadata = $1 WHERE node_from = $2 AND node_to = $3;",
			encoded, from, to,
		)
		return err
	})
}
// DeleteRelation removes the relationship from node `from` to node `to`.
//
// Any database error is returned to the caller. Deleting a relationship that
// does not exist is not an error.
func (conn *DB) DeleteRelation(from int64, to int64) error {
	conn.log(DEBUG, "Deleting relationship:", from, to)
	return conn.withTx(func(tx *sql.Tx) error {
		_, err := tx.Exec("DELETE FROM relationships WHERE node_from = $1 AND node_to = $2;", from, to)
		return err
	})
}
// BulkCreateNode inserts all nodes with a single multi-row INSERT inside one
// transaction. Only after the transaction commits does every node receive its
// new id (SetId) and this connection (SetConnection); on any error nothing is
// stored and the nodes are left untouched. An empty slice is a no-op.
//
// Each node uses four bind parameters.
// NOTE(review): very large slices may exceed the database's bound-parameter
// limit; consider chunking if callers pass thousands of nodes.
func BulkCreateNode[T StandardNode](
	conn *DB,
	nodes []T,
) error {
	if len(nodes) == 0 {
		return nil
	}

	// Build one "($n+1, $n+2, $n+3, $n+4)" tuple per node.
	const colsPerNode = 4
	placeholders := make([]string, 0, len(nodes))
	args := make([]any, 0, len(nodes)*colsPerNode)
	for i := range nodes {
		n := i * colsPerNode
		placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d, $%d)", n+1, n+2, n+3, n+4))
		args = append(
			args,
			nodes[i].GetClass(),
			nodes[i].GetName(),
			nodes[i].GetMetadata(),
			nodes[i].GetOriginalData(),
		)
	}
	query := fmt.Sprintf(`
INSERT INTO nodes (_class, name, metadata, original_data)
VALUES %s
RETURNING id;`, strings.Join(placeholders, ","))
	conn.log(DEBUG, "Bulk creating nodes:", query, args)

	ids := make([]int64, 0, len(nodes))
	err := conn.withTx(func(tx *sql.Tx) error {
		rows, err := tx.Query(query, args...)
		if err != nil {
			return fmt.Errorf("bulk insert failed: %w", err)
		}
		defer rows.Close()

		for rows.Next() {
			var id int64
			if err := rows.Scan(&id); err != nil {
				return fmt.Errorf("scanning returned id failed: %w", err)
			}
			ids = append(ids, id)
		}
		// Check iteration errors before withTx commits, so a failure while
		// reading the RETURNING rows rolls the insert back.
		if err := rows.Err(); err != nil {
			return err
		}
		if len(ids) != len(nodes) {
			return fmt.Errorf("bulk insert returned %d ids for %d nodes", len(ids), len(nodes))
		}
		return nil
	})
	if err != nil {
		return err
	}

	// SQLite emits RETURNING rows in an arbitrary order. The AUTOINCREMENT
	// ids of a single INSERT grow in VALUES order, so sorting them restores
	// the node-to-id mapping.
	sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })
	for i := range nodes {
		nodes[i].SetId(ids[i])
		nodes[i].SetConnection(conn)
	}
	return nil
}
// BulkCreateRelationships inserts all relationships with a single multi-row
// INSERT inside one transaction. An empty slice is a no-op.
//
// Every relationship is validated first: its class must be non-empty and
// both node ids must be positive. Validation happens before any database
// work. SetConnection is only called on the relationships after the
// transaction commits successfully, so on error they are left untouched.
func BulkCreateRelationships[T StandardRelationship](
	conn *DB,
	relationships []T,
) error {
	if len(relationships) == 0 {
		return nil
	}

	// Build one "($n+1, $n+2, $n+3, $n+4)" tuple per relationship.
	const colsPerRelationship = 4
	placeholders := make([]string, 0, len(relationships))
	args := make([]any, 0, len(relationships)*colsPerRelationship)
	for i := range relationships {
		from, to := relationships[i].GetNodes()
		class := relationships[i].GetClass()
		if class == "" || from <= 0 || to <= 0 {
			return fmt.Errorf("Invalid relationship: %v, %v, %v", class, from, to)
		}
		n := i * colsPerRelationship
		placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d, $%d) ", n+1, n+2, n+3, n+4))
		args = append(args, class, from, to, relationships[i].GetMetadata())
	}
	query := fmt.Sprintf(`
INSERT INTO relationships
(_class, node_from, node_to, metadata)
VALUES %s
`, strings.Join(placeholders, ","))
	conn.log(DEBUG, "Bulk creating relationships:", query, args)

	err := conn.withTx(func(tx *sql.Tx) error {
		if _, err := tx.Exec(query, args...); err != nil {
			return fmt.Errorf("bulk insert failed: %w", err)
		}
		return nil
	})
	if err != nil {
		return err
	}

	for i := range relationships {
		relationships[i].SetConnection(conn)
	}
	return nil
}