feat: implement database handlerImplement a database api to manage nodes and relationships in a "graph databaselike" implemented in sqlite.closes #1

This commit is contained in:
Alexander Navarro 2024-11-13 19:55:02 +00:00
parent 5aaacb10e3
commit b2d8dadcee
17 changed files with 963 additions and 0 deletions

37
pkg/collection.go Normal file
View file

@ -0,0 +1,37 @@
package synchronizator
import "fmt"
// Utility struct to represent a collection of nodes, it's a [Node] itself so all
// the node's functionality is available.
type Collection struct {
Node // Underlaying node info
childs []*Node // Child nodes
}
// Internal RelationshipClass to handle the collection to node relationship
type collection_relation struct{}
func (collection *collection_relation) ToRelationship() (string, []byte, error) {
return "COLLECTION_HAS", nil, nil
}
func (collection *collection_relation) FromRelationship(_class string, metadata []byte) error {
if _class != "COLLECTION_HAS" {
return fmt.Errorf("invalid class %s", _class)
}
return nil
}
// Adds a new child to this collection. Use the underlaying node's AddRelation
// method.
func (collection *Collection) AddChild(node *Node) error {
_, err := collection.AddRelation(&collection_relation{}, node.Id)
if err != nil {
return err
}
collection.childs = append(collection.childs, node)
return nil
}

25
pkg/loglevel_string.go Normal file
View file

@ -0,0 +1,25 @@
// Code generated by "stringer -type=LogLevel"; DO NOT EDIT.
package synchronizator
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[ERROR-0]
_ = x[INFO-1]
_ = x[DEBUG-2]
}
const _LogLevel_name = "ERRORINFODEBUG"
var _LogLevel_index = [...]uint8{0, 5, 9, 14}
func (i LogLevel) String() string {
if i < 0 || i >= LogLevel(len(_LogLevel_index)-1) {
return "LogLevel(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _LogLevel_name[_LogLevel_index[i]:_LogLevel_index[i+1]]
}

150
pkg/node.go Normal file
View file

@ -0,0 +1,150 @@
package synchronizator
// A user representation of a node, this interface should be implemented by the user
// to provide the ability to parse the database node into a user defined
// struct that fulfills it's requirements.
//
// Example usage:
//
// type Library struct {
// Name string `json:"name"`
// Category string `json:"category"`
// Metadata map[string]interface{} `json:"metadata"`
// }
//
// func (library *Library) ToNode() (string, string, []byte, error) {
// metadata, err := json.Marshal(library.Metadata)
// if err != nil {
// return "", "", nil, err
// }
// return "LIBRARY", library.Name, metadata, nil
// }
//
// func (library *Library) FromNode(_class string, name string, metadata []byte) error {
// if _class != "LIBRARY" {
// return fmt.Errorf("invalid class %s", _class)
// }
// if err := json.Unmarshal(metadata, &library.Metadata); err != nil {
// return err
// }
// library.Name = name
// return nil
// }
type NodeClass interface {
// How to transform the struct into a node. It needs to return the class,
// name and a []byte representation of the metadata.
//
// - class: Is used for classification and query pourposes. It's recomended to provide a constante string to increase consistency.
// - name: A user friendly name
// - metadata: Arbitrary data. This will be stored as a jsonb in the database
//
ToNode() (string, string, []byte, error)
// How to transform a node into the struct. This method should modify the
// struct directly as it receives a pointer.
//
// - class: Is used for classification and query pourposes.
// - name: A user friendly name
// - metadata: Arbitrary data. This is stored as a jsonb in the database
FromNode(string, string, []byte) error
}
// A node in the database.
// It adds some helper methods to easily manage the node.
type Node struct {
_conn *DB // Underlaying connection to the database.
_class string // The class of the node, should not be modified to avoid inconsistencies.
_relationships []*Relationship // Relationships of the node
Id int64 // The id of the node
name string // The name of the node
metadata []byte // Arbitrary data. This is stored as a jsonb in the database
}
// Creates a new relation of type RelationshipClass to the node with the
// provided id. An error is returned if the relation already exists.
//
// This method is a wrapper around the AddRelation method of the connection.
func (node *Node) AddRelation(relation RelationshipClass, to int64) (*Relationship, error) {
return node._conn.AddRelation(node.Id, relation, to)
}
// Update a relation with the node of the provided id.
//
// This method is a wrapper around the UpdateRelation method of the connection.
func (node *Node) UpdateRelation(metadata any, to int64) error {
return node._conn.UpdateRelation(node.Id, metadata, to)
}
// Delete a relation with the node of the provided id.
//
// This method is a wrapper around the DeleteRelation method of the connection.
func (node *Node) DeleteRelation(to int64) error {
return node._conn.DeleteRelation(node.Id, to)
}
// Fetch all the outgoing relations for this node. This method will return a
// slice of relationships pointers and also store them in the node for further
// use.
func (node *Node) GetOutRelations() ([]*Relationship, error) {
sql := `
WITH RECURSIVE NodeRelationships AS (
SELECT
*,
relationships._class AS relationship_class,
relationships.metadata AS relationship_metadata
FROM
nodes as src
JOIN relationships ON src.id = relationships.node_from
WHERE src.id = $1
)
SELECT
NodeRelationships.id as src_node,
NodeRelationships.relationship_class,
NodeRelationships.relationship_metadata,
dst.id as dst_id
FROM
NodeRelationships
JOIN nodes as dst ON dst.id = NodeRelationships.node_to
`
rows, err := node._conn.Query(sql, node.Id)
if err != nil {
return nil, err
}
defer rows.Close()
node._relationships = make([]*Relationship, 0)
for rows.Next() {
relationship := &Relationship{
_conn: node._conn,
}
if err := rows.Scan(&relationship.From, &relationship._class, &relationship.Metadata, &relationship.To); err != nil {
return nil, err
}
node._relationships = append(node._relationships, relationship)
}
return node._relationships, nil
}
// Deletes this node from the database.
// This method is a wrapper around the DeleteNode method of the connection.
func (node *Node) Delete() error {
return node._conn.DeleteNode(node.Id)
}
// Allows to retreive the saved information back into the user struct. This
// method will call the [NodeClass.FromNode] of the provided struct.
//
// Example:
//
// data := &Library{}
// if err := node.Unmarshall(data); err != nil {
// println(err)
// }
func (node *Node) Unmarshall(dst NodeClass) error {
return dst.FromNode(node._class, node.name, node.metadata)
}

50
pkg/relationship.go Normal file
View file

@ -0,0 +1,50 @@
package synchronizator
// A user representation of a relationship, this interface should be implemented by the user
// to provide the ability to parse the database relationship into a user defined
// struct that fulfills it's requirements.
//
// Example usage:
//
// type collection_relation struct{}
//
// func (collection *collection_relation) ToRelationship() (string, []byte, error) {
// return "COLLECTION_HAS", nil, nil
// }
//
// func (collection *collection_relation) FromRelationship(_class string, metadata []byte) error {
// if _class != "COLLECTION_HAS" {
// return fmt.Errorf("invalid class %s", _class)
// }
// return nil
// }
type RelationshipClass interface {
// How to transform the struct into a collection. It needs to return the class,
// and a []byte representation of the metadata.
//
// - class: Is used for classification and query pourposes. It's recomended to provide a constante string to increase consistency.
// - metadata: Arbitrary data. This will be stored as a jsonb in the database
//
ToRelationship() (string, []byte, error)
// How to transform a relationship into the struct. This method should modify the
// struct directly as it receives a pointer.
//
// - class: Is used for classification and query pourposes.
// - metadata: Arbitrary data. This is stored as a jsonb in the database
FromRelationship(string, []byte) error
}
// A relationship in the database.
// It adds some helper methods to easily manage the node.
type Relationship struct {
_conn *DB // Underlaying connection to the database.
_class string // The class of the node, should not be modified to avoid inconsistencies.
From int64 // From what node this relation comes from
To int64 // To what node this relation goes to
Metadata []byte // Arbitrary data. This is stored as a jsonb in the database
}
func (relationship *Relationship) GetClass() string {
return relationship._class
}

368
pkg/synchronizator.go Normal file
View file

@ -0,0 +1,368 @@
// Package synchronizator: Provides a library to help synchronize data from
// different sources.
//
// It does so implementing a graph database representing the relation of the
// different entities of data called "nodes", helping you find, create or
// delete the "equivalent" entities in the different sources.
package synchronizator
import (
sql "database/sql"
"encoding/json"
"fmt"
"os"
"time"
)
// DB Syncronization database interface
type DB struct {
Connection *sql.DB // underliying database connection
logger *os.File
log_level LogLevel
drop_tables bool
}
// Options for the DB struct
type Options struct {
Logger *os.File // Where to log, defaults to os.Stdout
Log_level LogLevel // Log level, defaults to INFO
DANGEROUSLY_DROP_TABLES bool // Drop tables on initialization, defaults to false
}
type LogLevel int
// The log levels, lower levels take precedence
//
//go:generate stringer -type=LogLevel
const (
ERROR LogLevel = iota
INFO
DEBUG
)
var DefaultOptions = &Options{
Logger: os.Stdout,
Log_level: INFO,
DANGEROUSLY_DROP_TABLES: false,
}
// Constructor for the DB struct, it's dicourage to create the struct yourself!
// It receives a pointer to a sql.DB connection and an optional pointer to an Options struct.
//
// You need to initialize the connection yourself, this library does not handle that and only use the provided connection.
// This also alows to append this labrary tables to your already existing database.
func New(connection *sql.DB, options *Options) (*DB, error) {
if options == nil {
options = DefaultOptions
}
conn := DB{
Connection: connection,
logger: options.Logger,
log_level: options.Log_level,
drop_tables: options.DANGEROUSLY_DROP_TABLES,
}
err := conn.bootstrap()
if err != nil {
return nil, err
}
return &conn, nil
}
// Write a log message to the logger.
//
// It receives a LogLevel and a variadic argument that is directly passed to fmt.Fprintln.
// The log is only written if the LogLevel is the same or lower than the configured value in the struct.
func (conn *DB) log(level LogLevel, args ...any) {
if level > conn.log_level {
return
}
fmt.Fprintf(conn.logger, "[ %s - %-5s ]: ", time.Now().Format(time.DateTime), level.String())
fmt.Fprintln(conn.logger, args...)
}
// Initialize the database, creating the tables and indexes.
func (conn *DB) bootstrap() error {
conn.log(INFO, "Initializing database...")
sql := ""
if conn.drop_tables {
sql += `
DROP TABLE IF EXISTS nodes;
DROP TABLE IF EXISTS relationships;
DROP INDEX IF EXISTS node_class;
DROP INDEX IF EXISTS relationships_class;
`
}
sql += `
CREATE TABLE IF NOT EXISTS nodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
_class text NOT NULL,
name TEXT,
metadata jsonb DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS node_class on nodes (_class);
CREATE TABLE IF NOT EXISTS relationships (
node_from INTEGER NOT NULL,
node_to INTEGER NOT NULL,
_class text NOT NULL,
metadata jsonb DEFAULT '{}',
PRIMARY KEY (node_from, node_to),
CHECK (node_from != node_to),
CONSTRAINT fk_node_from_relationships FOREIGN KEY (node_from) REFERENCES nodes(id),
CONSTRAINT fk_node_to_relationships FOREIGN KEY (node_to) REFERENCES nodes(id)
);
CREATE INDEX IF NOT EXISTS relationships_class on relationships (_class);
`
conn.log(DEBUG, sql)
_, err := conn.Connection.Exec(sql)
if err != nil {
return err
}
return nil
}
func (conn *DB) Query(sql string, args ...any) (*sql.Rows, error) {
conn.log(DEBUG, "Executing query:", sql, args)
rows, err := conn.Connection.Query(sql, args...)
if err != nil {
return nil, err
}
return rows, nil
}
// Creates a new Collection with the provided data.
//
// A collection is only a Node wrapper with some extended functionality to
// manage multiple nodes. For more information see [DB.NewNode] method and the
// [Collection] struct.
func (conn *DB) NewCollection(data NodeClass) (*Collection, error) {
node, err := conn.NewNode(data)
if err != nil {
return nil, err
}
collection := &Collection{
Node: *node,
childs: make([]*Node, 0),
}
return collection, nil
}
// Creates a new node
func (conn *DB) NewNode(data NodeClass) (*Node, error) {
class, name, metadata, err := data.ToNode()
if err != nil {
return nil, err
}
node := Node{
_conn: conn,
_class: class,
name: name,
metadata: metadata,
Id: -1,
}
tx, err := conn.Connection.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
conn.log(DEBUG, "Creating node:", node)
sql := "INSERT INTO nodes (_class, name, metadata) VALUES ($1, $2, $3) RETURNING id;"
err = tx.QueryRow(sql, node._class, node.name, metadata).Scan(&node.Id)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return &node, nil
}
// Updates a node with the provided id and data
func (conn *DB) UpdateNode(id int64, data NodeClass) (*Node, error) {
class, name, metadata, err := data.ToNode()
if err != nil {
return nil, err
}
node := Node{
_conn: conn,
_class: class,
name: name,
metadata: metadata,
Id: id,
}
tx, err := conn.Connection.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
conn.log(DEBUG, "Updating node:", id, node)
sql := "UPDATE nodes SET _class = $1, name = $2, metadata = $3 WHERE id = $4;"
_, err = tx.Exec(sql, node._class, node.name, node.metadata, id)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return &node, nil
}
// Return a node with the provided id
func (conn *DB) GetNode(id int64) (*Node, error) {
node := Node{
Id: id,
_conn: conn,
}
sql := "SELECT _class, name, metadata FROM nodes WHERE id = $1;"
conn.log(DEBUG, sql)
err := conn.Connection.QueryRow(sql, id).Scan(&node._class, &node.name, &node.metadata)
if err != nil {
conn.log(DEBUG, err)
return nil, fmt.Errorf("No row matching id = %v", id)
}
return &node, nil
}
// Deletes a node with the provided id
func (conn *DB) DeleteNode(id int64) error {
tx, err := conn.Connection.Begin()
if err != nil {
return err
}
defer tx.Rollback()
conn.log(DEBUG, "Deleting node:", id)
sql := "DELETE FROM nodes WHERE id = $1;"
_, err = tx.Exec(sql, id)
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
// Creates a new relationship between two nodes.
//
// It returns the created relationship representation.
func (conn *DB) AddRelation(
from int64,
data RelationshipClass,
to int64,
) (*Relationship, error) {
class, metadata, err := data.ToRelationship()
if err != nil {
return nil, err
}
relationship := Relationship{
_conn: conn,
_class: class,
Metadata: metadata,
From: from,
To: to,
}
tx, err := conn.Connection.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
conn.log(DEBUG, "Creating relationship:", from, relationship, to)
sql := "INSERT INTO relationships (_class, node_from, node_to, metadata) VALUES ($1, $2, $3, $4) RETURNING node_from, node_to;"
_, err = tx.Exec(sql, relationship._class, relationship.From, relationship.To, metadata)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return &relationship, nil
}
// Updates a relationship for the provided id and data
func (conn *DB) UpdateRelation(from int64, metadata any, to int64) error {
if metadata == nil {
return fmt.Errorf("metadata cannot be nil")
}
json_metadata, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("invalid metadata format: %w", err)
}
tx, err := conn.Connection.Begin()
if err != nil {
return err
}
defer tx.Rollback()
conn.log(DEBUG, "Updating relationship:", from, metadata, to)
sql := "UPDATE relationships SET metadata = $1 WHERE node_from = $2 AND node_to = $3;"
_, err = tx.Exec(sql, json_metadata, from, to)
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
// Deletes a relationship for the provided id
func (conn *DB) DeleteRelation(from int64, to int64) error {
tx, err := conn.Connection.Begin()
if err != nil {
return err
}
defer tx.Rollback()
sql := "DELETE FROM relationships WHERE node_from = $1 AND node_to = $2;"
conn.log(DEBUG, "Deleting relationship:", from, to)
_, err = tx.Exec(sql, from, to)
if err != nil {
return nil
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}