diff --git a/examples/readeck/main.go b/examples/readeck/main.go index fd4cc55..cf23c4b 100644 --- a/examples/readeck/main.go +++ b/examples/readeck/main.go @@ -1,9 +1,12 @@ package main import ( + "bytes" + "context" "database/sql" "encoding/json" "fmt" + "net/http" "time" _ "modernc.org/sqlite" @@ -11,6 +14,8 @@ import ( synchronizator "git.alecodes.page/alecodes/synchronizator/pkg" ) +const API_TOKEN = "" + type ReadwiseDocument struct { Id string `json:"id"` Url string `json:"url"` @@ -75,8 +80,9 @@ type ReadeckBookmark struct { func readwiseToReadeck(document *ReadwiseDocument) (*synchronizator.Node, error) { bookmark := &ReadeckBookmark{ - Url: document.SourceUrl, - Title: document.Title, + Url: document.SourceUrl, + Title: document.Title, + IsArchived: document.Location == "archive", } for _, tag := range document.Tags { @@ -93,6 +99,72 @@ func readwiseToReadeck(document *ReadwiseDocument) (*synchronizator.Node, error) return node, nil } +func createReadeckBookmark(ctx context.Context, bookmark ReadeckBookmark) (string, error) { + fmt.Printf("Bookmark: %v\n", bookmark) + + url := "https://web-archive.alecodes.page/api/bookmarks" + method := "POST" + + payload, err := json.Marshal(bookmark) + if err != nil { + return "", err + } + + client := &http.Client{} + req, err := http.NewRequest(method, url, bytes.NewBuffer(payload)) + if err != nil { + return "", err + } + req.Header.Add("accept", "application/json") + req.Header.Add("content-type", "application/json") + req.Header.Add("authorization", "Bearer "+API_TOKEN) + + res, err := client.Do(req) + if err != nil { + return "", err + } + defer res.Body.Close() + + if res.StatusCode > 202 { + return "", fmt.Errorf( + "Request for bookmark %v failed with status code %v", + bookmark.Id, + res.Status, + ) + } + + if !bookmark.IsArchived { + return "", nil + } + + updatePayload := []byte(fmt.Sprintf(`{"is_archived": %t}`, bookmark.IsArchived)) + + url = "https://web-archive.alecodes.page/api/bookmarks/" + res.Header.Get("bookmark-id") + updateReq, err := http.NewRequest("PATCH", url, bytes.NewBuffer(updatePayload)) + if err != nil { + return "", err + } + updateReq.Header.Add("accept", "application/json") + updateReq.Header.Add("authorization", "Bearer "+API_TOKEN) + updateReq.Header.Add("content-type", "application/json") + + updateRes, err := client.Do(updateReq) + if err != nil { + return "", nil + } + defer updateRes.Body.Close() + + if res.StatusCode > 202 { + return "", fmt.Errorf( + "Request for bookmark %v failed with status code %v", + bookmark.Id, + res.Status, + ) + } + + return "", nil +} + func drop_data(conn *sql.DB) error { sql := ` DELETE FROM nodes WHERE name = 'READECK'; @@ -122,12 +194,12 @@ func main() { opts := synchronizator.DefaultOptions // opts.Log_level = synchronizator.DEBUG - err = drop_data(connection) - if err != nil { - fmt.Println(err) - - return - } + // err = drop_data(connection) + // if err != nil { + // fmt.Println(err) + // + // return + // } sync, err := synchronizator.New(connection, opts) if err != nil { @@ -136,27 +208,63 @@ func main() { return } + // err = reconciliateCollections(sync) + // if err != nil { + // fmt.Println(err) + // return + // } + + err = pushChanges(sync) + if err != nil { + fmt.Printf("ERROR: %v\n", err) + return + } +} + +func pushChanges(sync *synchronizator.DB) error { + readeckBookmarks, err := sync.GetCollection(3167) + if err != nil { + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + poolConfig := &synchronizator.WorkConfig{ + AmountOfWorkers: 5, + MaxRetries: 2, + BaseRetryTime: time.Second * 30, + RateLimit: synchronizator.NewRateLimiter(10, time.Minute), + Timeout: time.Second * 2, + } + + err = synchronizator.PushNodes(ctx, readeckBookmarks, createReadeckBookmark, poolConfig) + if err != nil { + return err + } + + return nil +} + +func reconciliateCollections(sync *synchronizator.DB) error { readeck, err := sync.NewPlatform("READECK", nil, nil) if err != nil { - fmt.Println(err) - return + return nil } collection, err := readeck.GetDefaultCollection() if err != nil { - fmt.Println(err) - return + return nil } readwiseDocuments, err := sync.GetCollection(3) if err != nil { - fmt.Println(err) - return + return nil } err = synchronizator.ReconciliateCollections(readwiseDocuments, collection, readwiseToReadeck) if err != nil { - fmt.Println(err) - return + return nil } + return nil } diff --git a/pkg/collection.go b/pkg/collection.go index 70345f4..3c6b675 100644 --- a/pkg/collection.go +++ b/pkg/collection.go @@ -199,6 +199,49 @@ func (collection *Collection) FetchNodes( return nil } +func PushNodes[T any]( + ctx context.Context, + collection *Collection, + fetcher Work[T, string], + poolConfig *WorkConfig, +) error { + fmt.Printf("Pushing nodes for Collection: %d - %v\n", collection.Id, collection.name) + + tasks := make(chan T) + + values, errors, doneChannel := asyncTaskRunner[T, string]( + ctx, + tasks, + poolConfig, + fetcher, + ) + + for _, node := range collection.childs { + var child T + json.Unmarshal(node.GetMetadata(), &child) + tasks <- child + } + + close(tasks) + + for { + select { + case <-values: + // TODO: add external id to node, so we can update it later + continue + case error, ok := <-errors: + if !ok { + continue + } + return error + case <-ctx.Done(): + return ctx.Err() + case <-doneChannel: + return nil + } + } +} + type Reconcilation[T any] func(src *T) (*Node, error) func ReconciliateCollections[T any](src, dst *Collection, work Reconcilation[T]) error { diff --git a/pkg/synchronizator.go b/pkg/synchronizator.go index 03ba0b0..49cc946 100644 --- a/pkg/synchronizator.go +++ b/pkg/synchronizator.go @@ -109,6 +109,7 @@ func (conn *DB) bootstrap() error { id INTEGER PRIMARY KEY AUTOINCREMENT, _class text NOT NULL, name TEXT, + external_id TEXT, metadata jsonb DEFAULT '{}', original_data jsonb DEFAULT '{}' );