generated from alecodes/base-template
also refactor public and internal api to support transaction between multiple methods #2
456 lines
11 KiB
Go
456 lines
11 KiB
Go
// Package synchronizator: Provides a library to help synchronize data from
|
|
// different sources.
|
|
//
|
|
// It does so implementing a graph database representing the relation of the
|
|
// different entities of data called "nodes", helping you find, create or
|
|
// delete the "equivalent" entities in the different sources.
|
|
//
|
|
// In this library we use the following nomemclature:
|
|
// - [struct_name]: The representation of the element in the database
|
|
// - [struct_name]Class: An interface so the users can create a custom representation of the element that make's sence in their application. The interface needs to provide a way to transform the struct into a node and viceversa.
|
|
package synchronizator
|
|
|
|
import (
|
|
sql "database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
)
|
|
|
|
// DB Syncronization database interface
|
|
type DB struct {
|
|
Connection *sql.DB // underliying database connection
|
|
logger *os.File
|
|
log_level LogLevel
|
|
drop_tables bool
|
|
}
|
|
|
|
// Options for the DB struct
|
|
type Options struct {
|
|
Logger *os.File // Where to log, defaults to os.Stdout
|
|
Log_level LogLevel // Log level, defaults to INFO
|
|
DANGEROUSLY_DROP_TABLES bool // Drop tables on initialization, defaults to false
|
|
}
|
|
|
|
type LogLevel int
|
|
|
|
// The log levels, lower levels take precedence
|
|
//
|
|
//go:generate stringer -type=LogLevel
|
|
const (
|
|
ERROR LogLevel = iota
|
|
INFO
|
|
DEBUG
|
|
)
|
|
|
|
var DefaultOptions = &Options{
|
|
Logger: os.Stdout,
|
|
Log_level: INFO,
|
|
DANGEROUSLY_DROP_TABLES: false,
|
|
}
|
|
|
|
// Constructor for the DB struct, it's dicourage to create the struct yourself!
|
|
// It receives a pointer to a sql.DB connection and an optional pointer to an Options struct.
|
|
//
|
|
// You need to initialize the connection yourself, this library does not handle that and only use the provided connection.
|
|
// This also alows to append this labrary tables to your already existing database.
|
|
func New(connection *sql.DB, options *Options) (*DB, error) {
|
|
if options == nil {
|
|
options = DefaultOptions
|
|
}
|
|
|
|
conn := DB{
|
|
Connection: connection,
|
|
logger: options.Logger,
|
|
log_level: options.Log_level,
|
|
drop_tables: options.DANGEROUSLY_DROP_TABLES,
|
|
}
|
|
|
|
err := conn.bootstrap()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &conn, nil
|
|
}
|
|
|
|
// Write a log message to the logger.
|
|
//
|
|
// It receives a LogLevel and a variadic argument that is directly passed to fmt.Fprintln.
|
|
// The log is only written if the LogLevel is the same or lower than the configured value in the struct.
|
|
func (conn *DB) log(level LogLevel, args ...any) {
|
|
if level > conn.log_level {
|
|
return
|
|
}
|
|
|
|
fmt.Fprintf(conn.logger, "[ %s - %-5s ]: ", time.Now().Format(time.DateTime), level.String())
|
|
fmt.Fprintln(conn.logger, args...)
|
|
}
|
|
|
|
// Initialize the database, creating the tables and indexes.
|
|
func (conn *DB) bootstrap() error {
|
|
conn.log(INFO, "Initializing database...")
|
|
|
|
sql := ""
|
|
|
|
if conn.drop_tables {
|
|
sql += `
|
|
DROP TABLE IF EXISTS nodes;
|
|
DROP TABLE IF EXISTS relationships;
|
|
DROP INDEX IF EXISTS node_class;
|
|
DROP INDEX IF EXISTS relationships_class;
|
|
`
|
|
}
|
|
|
|
sql += `
|
|
CREATE TABLE IF NOT EXISTS nodes (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
_class text NOT NULL,
|
|
name TEXT,
|
|
metadata jsonb DEFAULT '{}'
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS node_class on nodes (_class);
|
|
|
|
|
|
CREATE TABLE IF NOT EXISTS relationships (
|
|
node_from INTEGER NOT NULL,
|
|
node_to INTEGER NOT NULL,
|
|
_class text NOT NULL,
|
|
metadata jsonb DEFAULT '{}',
|
|
PRIMARY KEY (node_from, node_to),
|
|
CHECK (node_from != node_to),
|
|
CONSTRAINT fk_node_from_relationships FOREIGN KEY (node_from) REFERENCES nodes(id),
|
|
CONSTRAINT fk_node_to_relationships FOREIGN KEY (node_to) REFERENCES nodes(id)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS relationships_class on relationships (_class);
|
|
`
|
|
|
|
conn.log(DEBUG, sql)
|
|
_, err := conn.Connection.Exec(sql)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Allows you to run the underliying query in a transaction.
|
|
func (conn *DB) withTx(fn func(*sql.Tx) error) error {
|
|
tx, err := conn.Connection.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
if err := fn(tx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (conn *DB) Query(sql string, args ...any) (*sql.Rows, error) {
|
|
conn.log(DEBUG, "Executing query:", sql, args)
|
|
|
|
rows, err := conn.Connection.Query(sql, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return rows, nil
|
|
}
|
|
|
|
// Creates a new Platform with the provided data.
|
|
//
|
|
// A collection is only a Node wrapper with some extended functionality to
|
|
// manage multiple nodes. For more information see [DB.NewNode] method and the
|
|
// [Platform] struct.
|
|
func (conn *DB) NewPlatform(data StandardNode) (*Platform, error) {
|
|
var platform *Platform
|
|
err := conn.withTx(func(tx *sql.Tx) error {
|
|
node, err := conn.newNodewithTx(tx, "PLATFORM", data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
collection, err := conn.newCollectionwithTx(
|
|
tx,
|
|
&default_collection{platform_name: node.name},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
platform := &Platform{
|
|
Node: *node,
|
|
collections: []*Collection{collection},
|
|
}
|
|
|
|
_, err = conn.addRelationwithTx(tx, platform.Id, &collection_relation{}, collection.Id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
return platform, err
|
|
}
|
|
|
|
// Creates a new Collection with the provided data.
|
|
//
|
|
// A collection is only a Node wrapper with some extended functionality to
|
|
// manage multiple nodes. For more information see [DB.NewNode] method and the
|
|
// [Collection] struct.
|
|
//
|
|
// The operation is ran in a database transaction.
|
|
func (conn *DB) NewCollection(data StandardNode) (*Collection, error) {
|
|
var collection *Collection
|
|
|
|
err := conn.withTx(func(tx *sql.Tx) error {
|
|
var err error
|
|
collection, err = conn.newCollectionwithTx(tx, data)
|
|
return err
|
|
})
|
|
|
|
return collection, err
|
|
}
|
|
|
|
// Creates a new Collection with the provided data.
|
|
//
|
|
// A collection is only a Node wrapper with some extended functionality to
|
|
// manage multiple nodes. For more information see [DB.NewNode] method and the
|
|
// [Collection] struct.
|
|
func (conn *DB) newCollectionwithTx(tx *sql.Tx, data StandardNode) (*Collection, error) {
|
|
node, err := conn.newNodewithTx(tx, "COLLECTION", data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
collection := &Collection{
|
|
Node: *node,
|
|
childs: make([]*Node, 0),
|
|
}
|
|
|
|
return collection, err
|
|
}
|
|
|
|
// Creates a new node.
|
|
//
|
|
// The operation is ran in a database transaction.
|
|
func (conn *DB) NewNode(class string, data StandardNode) (*Node, error) {
|
|
var node *Node
|
|
|
|
err := conn.withTx(func(tx *sql.Tx) error {
|
|
var err error
|
|
node, err = conn.newNodewithTx(tx, class, data)
|
|
return err
|
|
})
|
|
|
|
return node, err
|
|
}
|
|
|
|
// Creates a new node
|
|
func (conn *DB) newNodewithTx(tx *sql.Tx, class string, data StandardNode) (*Node, error) {
|
|
name, metadata, err := data.ToNode()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
node := Node{
|
|
_conn: conn,
|
|
_class: class,
|
|
name: name,
|
|
metadata: metadata,
|
|
Id: -1,
|
|
}
|
|
|
|
conn.log(DEBUG, "Creating node:", node)
|
|
|
|
sql := "INSERT INTO nodes (_class, name, metadata) VALUES ($1, $2, $3) RETURNING id;"
|
|
|
|
err = tx.QueryRow(sql, node._class, node.name, metadata).Scan(&node.Id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &node, nil
|
|
}
|
|
|
|
// Updates a node with the provided id and data
|
|
func (conn *DB) UpdateNode(id int64, data StandardNode) (*Node, error) {
|
|
name, metadata, err := data.ToNode()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
node := Node{
|
|
_conn: conn,
|
|
name: name,
|
|
metadata: metadata,
|
|
Id: id,
|
|
}
|
|
|
|
tx, err := conn.Connection.Begin()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
conn.log(DEBUG, "Updating node:", id, node)
|
|
|
|
sql := "UPDATE nodes SET _class = $1, name = $2, metadata = $3 WHERE id = $4;"
|
|
|
|
_, err = tx.Exec(sql, node._class, node.name, node.metadata, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &node, nil
|
|
}
|
|
|
|
// Return a node with the provided id
|
|
func (conn *DB) GetNode(id int64) (*Node, error) {
|
|
node := Node{
|
|
Id: id,
|
|
_conn: conn,
|
|
}
|
|
sql := "SELECT _class, name, metadata FROM nodes WHERE id = $1;"
|
|
conn.log(DEBUG, sql)
|
|
|
|
err := conn.Connection.QueryRow(sql, id).Scan(&node._class, &node.name, &node.metadata)
|
|
if err != nil {
|
|
conn.log(DEBUG, err)
|
|
return nil, fmt.Errorf("No row matching id = %v", id)
|
|
}
|
|
|
|
return &node, nil
|
|
}
|
|
|
|
// Deletes a node with the provided id
|
|
func (conn *DB) DeleteNode(id int64) error {
|
|
tx, err := conn.Connection.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
conn.log(DEBUG, "Deleting node:", id)
|
|
sql := "DELETE FROM nodes WHERE id = $1;"
|
|
|
|
_, err = tx.Exec(sql, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Creates a new node.
|
|
//
|
|
// The operation is ran in a database transaction.
|
|
func (conn *DB) AddRelation(
|
|
from int64,
|
|
data StandardRelationship,
|
|
to int64,
|
|
) (*Relationship, error) {
|
|
var relationship *Relationship
|
|
|
|
err := conn.withTx(func(tx *sql.Tx) error {
|
|
var err error
|
|
relationship, err = conn.addRelationwithTx(tx, from, data, to)
|
|
return err
|
|
})
|
|
|
|
return relationship, err
|
|
}
|
|
|
|
// Creates a new relationship between two nodes.
|
|
//
|
|
// It returns the created relationship representation.
|
|
func (conn *DB) addRelationwithTx(
|
|
tx *sql.Tx,
|
|
from int64,
|
|
data StandardRelationship,
|
|
to int64,
|
|
) (*Relationship, error) {
|
|
class, metadata, err := data.ToRelationship()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
relationship := Relationship{
|
|
_conn: conn,
|
|
_class: class,
|
|
Metadata: metadata,
|
|
From: from,
|
|
To: to,
|
|
}
|
|
|
|
conn.log(DEBUG, "Creating relationship:", from, relationship, to)
|
|
|
|
sql := "INSERT INTO relationships (_class, node_from, node_to, metadata) VALUES ($1, $2, $3, $4) RETURNING node_from, node_to;"
|
|
|
|
_, err = tx.Exec(sql, relationship._class, relationship.From, relationship.To, metadata)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &relationship, nil
|
|
}
|
|
|
|
// Updates a relationship for the provided id and data
|
|
func (conn *DB) UpdateRelation(from int64, metadata any, to int64) error {
|
|
if metadata == nil {
|
|
return fmt.Errorf("metadata cannot be nil")
|
|
}
|
|
json_metadata, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid metadata format: %w", err)
|
|
}
|
|
tx, err := conn.Connection.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
conn.log(DEBUG, "Updating relationship:", from, metadata, to)
|
|
sql := "UPDATE relationships SET metadata = $1 WHERE node_from = $2 AND node_to = $3;"
|
|
_, err = tx.Exec(sql, json_metadata, from, to)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Deletes a relationship for the provided id
|
|
func (conn *DB) DeleteRelation(from int64, to int64) error {
|
|
tx, err := conn.Connection.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer tx.Rollback()
|
|
|
|
sql := "DELETE FROM relationships WHERE node_from = $1 AND node_to = $2;"
|
|
conn.log(DEBUG, "Deleting relationship:", from, to)
|
|
_, err = tx.Exec(sql, from, to)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|