diff --git a/.justfile b/.justfile index 30e466e..49fce98 100644 --- a/.justfile +++ b/.justfile @@ -1,2 +1,5 @@ run: - go run examples/usage.go + go run examples/usage/main.go + +run-example example: + go run examples/{{example}}/main.go diff --git a/examples/mock_data/Readwise/Document List.bru b/examples/mock_data/Readwise/Document List.bru new file mode 100644 index 0000000..f283088 --- /dev/null +++ b/examples/mock_data/Readwise/Document List.bru @@ -0,0 +1,11 @@ +meta { + name: Document List + type: http + seq: 2 +} + +get { + url: https://readwise.io/api/v3/list/ + body: none + auth: none +} diff --git a/examples/mock_data/Readwise/bruno.json b/examples/mock_data/Readwise/bruno.json new file mode 100644 index 0000000..2885ab6 --- /dev/null +++ b/examples/mock_data/Readwise/bruno.json @@ -0,0 +1,9 @@ +{ + "version": "1", + "name": "Readwise", + "type": "collection", + "ignore": [ + "node_modules", + ".git" + ] +} \ No newline at end of file diff --git a/examples/mock_data/Readwise/environments/Readwise.bru b/examples/mock_data/Readwise/environments/Readwise.bru new file mode 100644 index 0000000..d9d33b2 --- /dev/null +++ b/examples/mock_data/Readwise/environments/Readwise.bru @@ -0,0 +1,3 @@ +vars:secret [ + API-KEY +] diff --git a/examples/readwise/main.go b/examples/readwise/main.go new file mode 100644 index 0000000..bb8f70b --- /dev/null +++ b/examples/readwise/main.go @@ -0,0 +1,207 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + _ "modernc.org/sqlite" + + synchronizator "git.alecodes.page/alecodes/synchronizator/pkg" +) + +const API_TOKEN = "" + +type ReadwiseCursor struct { + Cursor string +} + +type ReadwiseApiResponse struct { + Count uint64 `json:"count"` + NextPageCursor string `json:"nextPageCursor"` + Results []ReadwiseDocument `json:"results"` +} + +type RawReadwiseApiResponse struct { + Count uint64 `json:"count"` + NextPageCursor string `json:"nextPageCursor"` + 
Results []json.RawMessage `json:"results"` // All as raw +} + +type ReadwiseDocument struct { + Id string `json:"id"` + Url string `json:"url"` + Title string `json:"title"` + // Author string `json:"author"` + // Source string `json:"source"` + // Category string `json:"category"` + Location string `json:"location"` + // Tags map[string]string `json:"tags"` + // SiteName string `json:"site_name"` + // CreatedAt string `json:"created_at"` + // UpdatedAt string `json:"updated_at"` + // Summary string `json:"summary"` + SourceUrl string `json:"source_url"` + // Notes string `json:"notes"` + // ParentId interface{} `json:"parent_id"` + // SavedAt string `json:"saved_at"` + // LastMovedAt string `json:"last_moved_at"` +} + +func getReadwiseDocuments( + ctx context.Context, + pagination synchronizator.Pagination, +) (synchronizator.FetchNodesResponse, error) { + payload := synchronizator.FetchNodesResponse{ + Pagination: pagination, + } + + cursor, ok := ctx.Value("readwise-cursor").(*ReadwiseCursor) + + if !ok { + return payload, fmt.Errorf("Couldn't retreive cursor from context!") + } + + var documents []*synchronizator.Node + + params := url.Values{} + if cursor.Cursor != "" { + params.Add("pageCursor", cursor.Cursor) + } + + url := "https://readwise.io/api/v3/list?" 
+ params.Encode() + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + fmt.Println("Error creating request:", err) + return payload, err + } + + // Add the authorization header + req.Header.Set("Authorization", "Token "+API_TOKEN) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return payload, err + } + + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return payload, err + } + + // A non-200 body that is still a JSON object would decode to an empty page and end the sync silently + if resp.StatusCode != http.StatusOK { + return payload, fmt.Errorf("readwise: unexpected status %s", resp.Status) + } + + var data ReadwiseApiResponse + err = json.Unmarshal(body, &data) + if err != nil { + return payload, err + } + + var rawData RawReadwiseApiResponse + err = json.Unmarshal(body, &rawData) + if err != nil { + return payload, err + } + + cursor.Cursor = data.NextPageCursor + + documents = make([]*synchronizator.Node, 0, len(data.Results)) + + for i, document := range data.Results { + metadata, err := json.Marshal(document) + if err != nil { + return payload, err + } + + node := synchronizator.NewNode( + document.Title, + "DOCUMENT", + metadata, + rawData.Results[i], + ) + documents = append(documents, node) + } + + payload.Response = documents + + return payload, nil +} + +func main() { + start := time.Now() + + defer func() { + elapsed := time.Now().Sub(start) + fmt.Printf("\n\nExecution time took: %s", elapsed) + }() + + connection, err := sql.Open("sqlite", "readwise.sql") + if err != nil { + fmt.Println(err) + + return + } + + defer connection.Close() + + opts := synchronizator.DefaultOptions + // opts.Log_level = synchronizator.DEBUG + opts.DANGEROUSLY_DROP_TABLES = true + + sync, err := synchronizator.New(connection, opts) + if err != nil { + fmt.Println(err) + + return + } + + readwiseReader, err := sync.NewPlatform("readwise_reader", nil, nil) + if err != nil { + fmt.Println(err) + return + } + + pagination := synchronizator.StartPagination + pagination.Pages = 0 + pagination.Total = 100 + pagination.Limit = 100 + pool_config := &synchronizator.WorkConfig{ + AmountOfWorkers: 5, + MaxRetries: 1, + BaseRetryTime: time.Second * 2, + RateLimit: 
synchronizator.NewRateLimiter(20, time.Minute), + Timeout: time.Second * 2, + } + + collection, err := readwiseReader.GetDefaultCollection() + if err != nil { + fmt.Println(err) + return + } + + cursor := &ReadwiseCursor{} + + ctx := context.WithValue(context.Background(), "readwise-cursor", cursor) + + for { + err = collection.FetchNodes(ctx, getReadwiseDocuments, pagination, pool_config) + if err != nil { + fmt.Println(err) + return + } + + if cursor.Cursor == "" { + break + } + } +} diff --git a/examples/usage.bkp.go b/examples/usage.bkp.go deleted file mode 100644 index 5c1c47d..0000000 --- a/examples/usage.bkp.go +++ /dev/null @@ -1,222 +0,0 @@ -package main - -import ( - "database/sql" - "encoding/csv" - "encoding/json" - "fmt" - "io" - "os" - "path/filepath" - "strings" - - synchronizator "git.alecodes.page/alecodes/synchronizator/pkg" - _ "modernc.org/sqlite" -) - -type ProgrammingLanguage struct { - Name string -} - -func (language *ProgrammingLanguage) ToNode() (string, string, []byte, error) { - metadata, err := json.Marshal("{\"test\": \"foo\"}") - if err != nil { - return "", "", nil, err - } - return "PROGRAMMING_LANGUAGE", language.Name, metadata, nil -} - -func (language *ProgrammingLanguage) FromNode(_class string, name string, metadata []byte) error { - if _class != "PROGRAMMING_LANGUAGE" { - return fmt.Errorf("invalid class %s", _class) - } - language.Name = name - return nil -} - -type Library struct { - Name string `json:"name"` - Category string `json:"category"` - Metadata map[string]interface{} `json:"metadata"` -} - -func (library *Library) ToNode() (string, string, []byte, error) { - metadata, err := json.Marshal(library.Metadata) - if err != nil { - return "", "", nil, err - } - return "LIBRARY", library.Name, metadata, nil -} - -func (library *Library) FromNode(_class string, name string, metadata []byte) error { - if _class != "LIBRARY" { - return fmt.Errorf("invalid class %s", _class) - } - if err := json.Unmarshal(metadata, &library.Metadata); err != nil { - return err - } - 
library.Name = name - return nil -} - -type ( - BelognsTo struct{} - IsSame struct{} -) - -func main2() { - connection, err := sql.Open("sqlite", "db.sql") - if err != nil { - fmt.Println(err) - - return - } - - defer connection.Close() - - opts := synchronizator.DefaultOptions - // opts.Log_level = synchronizator.DEBUG - opts.DANGEROUSLY_DROP_TABLES = true - - sync, err := synchronizator.New(connection, opts) - if err != nil { - fmt.Println(err) - - return - } - - languages, err := loadData() - if err != nil { - fmt.Println(err) - } - - for language, libraries := range languages { - _, err := generateCollection( - &ProgrammingLanguage{Name: strings.ToUpper(language)}, - libraries, - sync, - ) - if err != nil { - println(err) - } - - // fmt.Fprintf( - // os.Stderr, - // "libraries_collection%+v\n", - // libraries_collection, - // ) - } - - golang, err := sync.GetNode(1) - if err != nil { - println(err) - } - fmt.Println("%v", golang) - relationships, err := golang.GetOutRelations() - if err != nil { - panic(err) - } - - for _, relationship := range relationships { - fmt.Printf("%v -> %v -> %v\n", relationship.From, relationship.GetClass(), relationship.To) - } -} - -// generateCollection Main example of the usage of the synchronizator package -func generateCollection( - language *ProgrammingLanguage, - libraries []Library, - sync *synchronizator.DB, -) (*synchronizator.Collection, error) { - language_libraries, err := sync.NewCollection(language) - if err != nil { - return nil, err - } - - for _, library := range libraries { - node, err := sync.NewNode(&library) - if err != nil { - return nil, err - } - data := &Library{} - if err := node.Unmarshall(data); err != nil { - println(err) - } - - if err := language_libraries.AddChild(node); err != nil { - return nil, err - } - } - - return language_libraries, nil -} - -func loadData() (map[string][]Library, error) { - // Find all CSV files - files, err := filepath.Glob("examples/mock_data/*.csv") - if err != nil { - 
return nil, fmt.Errorf("failed to glob files: %w", err) - } - - result := make(map[string][]Library) - - for _, file := range files { - // Load CSV file - libraries, err := processCSVFile(file) - if err != nil { - return nil, fmt.Errorf("failed to process %s: %w", file, err) - } - - // Use base filename without extension as language_name - language_name := filepath.Base(file) - language_name = language_name[:len(language_name)-len(filepath.Ext(language_name))] - - result[language_name] = libraries - } - - return result, nil -} - -func processCSVFile(filename string) ([]Library, error) { - file, err := os.Open(filename) - if err != nil { - return nil, err - } - defer file.Close() - - reader := csv.NewReader(file) - - // Skip header - _, err = reader.Read() - if err != nil { - return nil, err - } - - var libraries []Library - - // Read records - for { - record, err := reader.Read() - if err == io.EOF { - break - } - if err != nil { - return nil, err - } - - // Parse metadata JSON - var metadata map[string]interface{} - if err := json.Unmarshal([]byte(record[2]), &metadata); err != nil { - return nil, fmt.Errorf("failed to parse metadata: %w", err) - } - - library := Library{ - Name: record[0], - Category: record[1], - Metadata: metadata, - } - libraries = append(libraries, library) - } - - return libraries, nil -} diff --git a/examples/usage.go b/examples/usage/main.go similarity index 100% rename from examples/usage.go rename to examples/usage/main.go diff --git a/pkg/node.go b/pkg/node.go index 93896f6..c49d14d 100644 --- a/pkg/node.go +++ b/pkg/node.go @@ -3,12 +3,14 @@ package synchronizator import ( "fmt" "slices" + "strings" ) type StandardNode interface { GetClass() string GetName() string GetMetadata() []byte + GetOriginalData() []byte AddRelationship(relationship StandardRelationship) error AddRelationships(to int64, relationship []StandardRelationship) @@ -26,14 +28,16 @@ type Node struct { Id int64 // The id of the node name string // The name of the node 
metadata []byte // Arbitrary data. This is stored as a jsonb in the database + originalData []byte // Original response from remote platform } -func NewNode(name string, metadata []byte) *Node { +func NewNode(name string, class string, metadata, originalData []byte) *Node { return &Node{ - name: name, - _class: "NODE", - metadata: metadata, - Id: -1, // Use -1 to indicate not persisted + name: name, + _class: strings.ToUpper(class), + metadata: metadata, + originalData: originalData, + Id: -1, // Use -1 to indicate not persisted } } @@ -49,6 +53,10 @@ func (node *Node) GetMetadata() []byte { return node.metadata } +func (node *Node) GetOriginalData() []byte { + return node.originalData +} + func (node *Node) AddRelationship(relationship StandardRelationship) error { node1, node2 := relationship.GetNodes() diff --git a/pkg/platform.go b/pkg/platform.go index a0bfdea..7c84d4f 100644 --- a/pkg/platform.go +++ b/pkg/platform.go @@ -13,6 +13,16 @@ type Platform struct { Collections []*Collection // Child nodes } +func (platform *Platform) GetDefaultCollection() (*Collection, error) { + for _, collection := range platform.Collections { + if collection.IsDefault() { + return collection, nil + } + } + + return nil, fmt.Errorf("Default collection not found") +} + // Is a type alias for FetchResponse containing a slice of Collection pointers. 
type FetchCollectionResponse = FetchResponse[[]*Collection] diff --git a/pkg/synchronizator.go b/pkg/synchronizator.go index a2f5e91..966262b 100644 --- a/pkg/synchronizator.go +++ b/pkg/synchronizator.go @@ -109,7 +109,8 @@ func (conn *DB) bootstrap() error { id INTEGER PRIMARY KEY AUTOINCREMENT, _class text NOT NULL, name TEXT, - metadata jsonb DEFAULT '{}' + metadata jsonb DEFAULT '{}', + original_data jsonb DEFAULT '{}' ); CREATE INDEX IF NOT EXISTS node_class on nodes (_class); @@ -169,10 +170,10 @@ func (conn *DB) Query(sql string, args ...any) (*sql.Rows, error) { // A collection is only a Node wrapper with some extended functionality to // manage multiple nodes. For more information see [DB.NewNode] method and the // [Platform] struct. -func (conn *DB) NewPlatform(name string, metadata []byte) (*Platform, error) { +func (conn *DB) NewPlatform(name string, metadata []byte, originalData []byte) (*Platform, error) { var platform *Platform err := conn.withTx(func(tx *sql.Tx) error { - node, err := conn.newNodewithTx(tx, name, "PLATFORM", metadata) + node, err := conn.newNodewithTx(tx, name, "PLATFORM", metadata, originalData) if err != nil { return err } @@ -181,6 +182,7 @@ func (conn *DB) NewPlatform(name string, metadata []byte) (*Platform, error) { tx, strings.ToUpper(name)+"_DEFAULT", nil, + nil, ) if err != nil { return err @@ -210,12 +212,12 @@ func (conn *DB) NewPlatform(name string, metadata []byte) (*Platform, error) { // [Collection] struct. // // The operation is ran in a database transaction. 
-func (conn *DB) NewCollection(name string, metadata []byte) (*Collection, error) { +func (conn *DB) NewCollection(name string, metadata, originalData []byte) (*Collection, error) { var collection *Collection err := conn.withTx(func(tx *sql.Tx) error { var err error - collection, err = conn.newCollectionwithTx(tx, name, metadata) + collection, err = conn.newCollectionwithTx(tx, name, metadata, originalData) return err }) @@ -227,8 +229,12 @@ func (conn *DB) NewCollection(name string, metadata []byte) (*Collection, error) // A collection is only a Node wrapper with some extended functionality to // manage multiple nodes. For more information see [DB.NewNode] method and the // [Collection] struct. -func (conn *DB) newCollectionwithTx(tx *sql.Tx, name string, metadata []byte) (*Collection, error) { - node, err := conn.newNodewithTx(tx, name, "COLLECTION", metadata) +func (conn *DB) newCollectionwithTx( + tx *sql.Tx, + name string, + metadata, originalData []byte, +) (*Collection, error) { + node, err := conn.newNodewithTx(tx, name, "COLLECTION", metadata, originalData) if err != nil { return nil, err } @@ -245,12 +251,12 @@ func (conn *DB) newCollectionwithTx(tx *sql.Tx, name string, metadata []byte) (* // Creates a new node. // // The operation is ran in a database transaction. 
-func (conn *DB) NewNode(name string, class string, metadata []byte) (*Node, error) { +func (conn *DB) NewNode(name string, class string, metadata, originalData []byte) (*Node, error) { var node *Node err := conn.withTx(func(tx *sql.Tx) error { var err error - node, err = conn.newNodewithTx(tx, name, class, metadata) + node, err = conn.newNodewithTx(tx, name, class, metadata, originalData) return err }) @@ -263,20 +269,22 @@ func (conn *DB) newNodewithTx( name string, class string, metadata []byte, + originalData []byte, ) (*Node, error) { node := Node{ - _conn: conn, - _class: class, - name: name, - metadata: metadata, - Id: -1, + _conn: conn, + _class: class, + name: name, + metadata: metadata, + originalData: originalData, + Id: -1, } conn.log(DEBUG, "Creating node:", node) - sql := "INSERT INTO nodes (_class, name, metadata) VALUES ($1, $2, $3) RETURNING id;" + sql := "INSERT INTO nodes (_class, name, metadata, original_data) VALUES ($1, $2, $3, $4) RETURNING id;" - err := tx.QueryRow(sql, node._class, node.name, metadata).Scan(&node.Id) + err := tx.QueryRow(sql, node._class, node.name, metadata, originalData).Scan(&node.Id) if err != nil { return nil, err } @@ -474,19 +482,19 @@ func BulkCreateNode[T StandardNode]( - valueArgs := make([]interface{}, 0, len(nodes)*3) + valueArgs := make([]interface{}, 0, len(nodes)*4) for i := range nodes { - // Create ($1, $2, $3), ($4, $5, $6), etc. - n := i * 3 - valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", n+1, n+2, n+3)) + n := i * 4 + valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d)", n+1, n+2, n+3, n+4)) valueArgs = append( valueArgs, nodes[i].GetClass(), nodes[i].GetName(), nodes[i].GetMetadata(), + nodes[i].GetOriginalData(), ) } sql := fmt.Sprintf(` - INSERT INTO nodes (_class, name, metadata) + INSERT INTO nodes (_class, name, metadata, original_data) VALUES %s RETURNING id;`, strings.Join(valueStrings, ","))