diff --git a/.gitignore b/.gitignore index b60de5b..ea8c4bf 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1 @@ -**/target +/target diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml deleted file mode 100644 index 4e20976..0000000 --- a/.idea/codeStyles/Project.xml +++ /dev/null @@ -1,29 +0,0 @@ - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml deleted file mode 100644 index 6e6eec1..0000000 --- a/.idea/codeStyles/codeStyleConfig.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml index 38d7bd6..283ef50 100644 --- a/.idea/dataSources.xml +++ b/.idea/dataSources.xml @@ -8,12 +8,5 @@ jdbc:sqlite:$USER_HOME$/.local/share/readwise-bulk-upload/db.sql $ProjectFileDir$ - - sqlite.xerial - true - org.sqlite.JDBC - jdbc:sqlite:$PROJECT_DIR$/lib_sync_core/target/sqlx/test-dbs/lib_sync_core/database/sqlite/tests/it_save_tasks.sqlite - $ProjectFileDir$ - \ No newline at end of file diff --git a/.idea/readwise-bulk-upload.iml b/.idea/readwise-bulk-upload.iml index 0855a40..cf84ae4 100644 --- a/.idea/readwise-bulk-upload.iml +++ b/.idea/readwise-bulk-upload.iml @@ -2,10 +2,7 @@ - - - - + diff --git a/.idea/runConfigurations/Load_Tasks.xml b/.idea/runConfigurations/Load_Tasks.xml deleted file mode 100644 index 41f2816..0000000 --- a/.idea/runConfigurations/Load_Tasks.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/runConfigurations/Query_Tasks.xml b/.idea/runConfigurations/Query_Tasks.xml deleted file mode 100644 index ef4b918..0000000 --- a/.idea/runConfigurations/Query_Tasks.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/runConfigurations/Run_Tasks.xml b/.idea/runConfigurations/Run_Tasks.xml deleted file mode 100644 index ef08a15..0000000 --- a/.idea/runConfigurations/Run_Tasks.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 8d3e42f..35eb1dd 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,10 +1,5 @@ - - - diff --git a/Cargo.lock b/Cargo.lock index 6292eb7..87f2582 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,19 +17,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" -[[package]] -name = "ahash" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" -dependencies = [ - "cfg-if", - "getrandom 0.3.2", - "once_cell", - "version_check", - "zerocopy", -] - [[package]] name = "aho-corasick" version = "1.1.3" @@ -110,28 +97,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "async-stream" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atoi" version = "2.0.0" @@ -207,12 +172,6 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" -[[package]] -name = "bytecount" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" - [[package]] name = "bytemuck" version = "1.23.0" @@ -301,27 +260,6 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" -[[package]] -name = "cli" -version = "0.1.0" -dependencies = [ - "chrono", - "clap", - "directories", - "figment", - "futures", - "lib_sync_core", - "serde", - "serde_json", - "sqlx", - "tabled", - "thiserror", - "tokio", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "colorchoice" version = "1.0.3" @@ -398,40 +336,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "darling" -version = "0.20.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" -dependencies = [ - "darling_core", - "darling_macro", -] - -[[package]] -name = "darling_core" -version = "0.20.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "darling_macro" -version = "0.20.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" -dependencies = [ - "darling_core", - "quote", - "syn", -] - [[package]] name = "der" version = "0.7.10" @@ -443,12 +347,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "deunicode" -version = "1.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abd57806937c9cc163efc8ea3910e00a62e2aeb0b8119f1793a978088f8f6b04" - [[package]] name = "digest" version = "0.10.7" @@ -499,18 +397,6 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" -[[package]] -name = "dummy" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bbcf21279103a67372982cb1156a2154a452451dff2b884cf897ccecce389e0" -dependencies = [ - "darling", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "either" version = "1.15.0" @@ -558,21 +444,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "fake" -version = "4.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f5f203b70a419cb8880d1cfe6bebe488add0a0307d404e9f24021e5fd864b80" -dependencies = [ - "chrono", - "deunicode", - "dummy", - "http", - "rand 0.9.1", - "url-escape", - "uuid", -] - [[package]] name = "fastrand" version = "2.3.0" @@ -603,12 +474,6 @@ dependencies = [ "spin", ] -[[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - [[package]] name = "foldhash" version = "0.1.5" @@ -624,21 +489,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.31" @@ -683,17 +533,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-macro" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures-sink" version = "0.3.31" @@ -712,10 +551,8 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ - "futures-channel", "futures-core", "futures-io", - "futures-macro", "futures-sink", "futures-task", "memchr", @@ -822,17 +659,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "http" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "iana-time-zone" version = "0.1.63" @@ -975,12 +801,6 @@ dependencies = [ "syn", ] -[[package]] -name = "ident_case" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" - [[package]] name = "idna" version = "1.0.3" @@ -1049,31 +869,6 @@ dependencies = [ "spin", ] -[[package]] -name = "lib_sync_core" -version = "0.1.0" -dependencies = [ - "async-stream", - "chrono", - "clap", - "directories", - "fake", - "figment", - "futures", - "serde", - "serde_json", - "sqlx", - "tabled", - "thiserror", - "tokio", - "tokio-stream", - "tracing", - "tracing-core", - "tracing-subscriber", - "tracing-test", - "uuid", -] - [[package]] name = "libc" version = "0.2.172" @@ -1202,7 +997,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand 0.8.5", + "rand", "smallvec", "zeroize", ] @@ -1264,17 +1059,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" -[[package]] -name = "papergrid" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30268a8d20c2c0d126b2b6610ab405f16517f6ba9f244d8c59ac2c512a8a1ce7" -dependencies = [ - "ahash", - "bytecount", - "unicode-width", -] - [[package]] name = "parking" version = "2.2.1" @@ -1390,28 +1174,6 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "proc-macro-error-attr2" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" -dependencies = [ - "proc-macro2", - "quote", -] - -[[package]] -name = "proc-macro-error2" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" -dependencies = [ - "proc-macro-error-attr2", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "proc-macro2" version = "1.0.95" @@ -1456,18 +1218,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - -[[package]] -name = "rand" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" -dependencies = [ - "rand_chacha 0.9.0", - "rand_core 0.9.3", + "rand_chacha", + "rand_core", ] [[package]] @@ -1477,17 +1229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_chacha" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" -dependencies = [ - "ppv-lite86", - "rand_core 0.9.3", + "rand_core", ] [[package]] @@ -1500,12 +1242,21 @@ dependencies = [ ] [[package]] -name = "rand_core" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +name = "readwise-bulk-upload" +version = "0.1.0" dependencies = [ - "getrandom 0.3.2", + "chrono", + "clap", + "directories", + "figment", + "serde", + "serde_json", + "sqlx", + "thiserror", + "tokio", + "tracing", + "tracing-core", + "tracing-subscriber", ] [[package]] @@ -1585,7 +1336,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8", - "rand_core 0.6.4", + "rand_core", "signature", "spki", "subtle", @@ -1684,12 +1435,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sha1_smol" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" - [[package]] name = "sha2" version = "0.10.9" @@ -1723,7 +1468,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -1819,7 +1564,6 @@ dependencies = [ "tokio-stream", "tracing", "url", - "uuid", ] [[package]] @@ -1891,7 +1635,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rand 0.8.5", + "rand", "rsa", "serde", "sha1", @@ -1901,7 +1645,6 @@ dependencies = [ "stringprep", "thiserror", "tracing", - "uuid", "whoami", ] @@ -1931,7 +1674,7 @@ dependencies = [ "md-5", "memchr", "once_cell", - "rand 0.8.5", + "rand", "serde", "serde_json", "sha2", @@ -1940,7 +1683,6 @@ dependencies = [ "stringprep", "thiserror", "tracing", - "uuid", "whoami", ] @@ -1967,7 +1709,6 @@ dependencies = [ "thiserror", "tracing", "url", - "uuid", ] [[package]] @@ -2021,30 +1762,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tabled" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "228d124371171cd39f0f454b58f73ddebeeef3cef3207a82ffea1c29465aea43" -dependencies = [ - "papergrid", - "tabled_derive", - "testing_table", -] - -[[package]] -name = "tabled_derive" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea5d1b13ca6cff1f9231ffd62f15eefd72543dab5e468735f1a456728a02846" -dependencies = [ - "heck", - "proc-macro-error2", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tempfile" version = "3.19.1" @@ -2058,15 +1775,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "testing_table" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f8daae29995a24f65619e19d8d31dea5b389f3d853d8bf297bbf607cd0014cc" -dependencies = [ - "unicode-width", -] - [[package]] name = "thiserror" version = "2.0.12" @@ -2222,27 +1930,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "tracing-test" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" -dependencies = [ - "tracing-core", - "tracing-subscriber", - "tracing-test-macro", -] - -[[package]] -name = "tracing-test-macro" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "typenum" version = "1.18.0" @@ -2285,12 +1972,6 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" -[[package]] -name = "unicode-width" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" - [[package]] name = "url" version = "2.5.4" @@ -2302,15 +1983,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "url-escape" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44e0ce4d1246d075ca5abec4b41d33e87a6054d08e2366b63205665e950db218" -dependencies = [ - "percent-encoding", -] - [[package]] name = "utf16_iter" version = "1.0.5" @@ -2329,19 +2001,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" -[[package]] -name = "uuid" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" -dependencies = [ - "atomic", - "getrandom 0.3.2", - "md-5", - "serde", - "sha1_smol", -] - [[package]] name = "valuable" version = "0.1.1" @@ -2439,10 +2098,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "web" -version = "0.1.0" - [[package]] name = "whoami" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index df1befe..da95ddc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,18 @@ -[workspace] -resolver = "3" +[package] +name = "readwise-bulk-upload" +version = "0.1.0" +edition = "2024" -members = [ - "cli", "lib_sync_core", "web", -] +[dependencies] +thiserror = "2.0.12" +directories = "6.0.0" +tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } +sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] } +clap = { version = "4.5.37", features = ["derive"] } +serde = { version = "1.0.219", features = ["derive"] } +chrono = {version = "0.4.41", features = ["serde"]} +serde_json = "1.0.140" +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} +figment = { version = "0.10.19", features = ["env"] } +tracing-core = "0.1.33" diff --git a/cli/Cargo.toml b/cli/Cargo.toml deleted file mode 100644 index c056a52..0000000 --- a/cli/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "cli" -version = "0.1.0" -edition = "2024" - -[[bin]] -name = "readwise" -path = "bin/readwise/main.rs" - -[dependencies] -lib_sync_core = {path = "../lib_sync_core"} - -directories = "6.0.0" -tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } -sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] } -clap = { version = "4.5.37", features = ["derive"] } -serde = { version = "1.0.219", features = ["derive"] } -chrono = {version = "0.4.41", features = ["serde"]} -serde_json = "1.0.140" -tracing = "0.1.41" -tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} -figment = { version = "0.10.19", features = ["env"] } -tracing-core = "0.1.33" -tabled = "0.19.0" -futures = "0.3.31" -thiserror = "2.0.12" \ No newline at end of file diff --git a/cli/bin/readwise/main.rs b/cli/bin/readwise/main.rs deleted file mode 100644 index 29eee78..0000000 --- a/cli/bin/readwise/main.rs +++ /dev/null @@ -1,76 +0,0 @@ -use clap::{CommandFactory, Parser}; -use directories::ProjectDirs; -use figment::{ - Figment, - providers::{Env, Serialized}, -}; -use lib_sync_core::tasks::{TaskStatus}; -use cli::config::{Command, Config}; -use cli::{Error, Result}; -use std::fs::File; -use tabled::Table; -use tracing_subscriber; -use crate::external_interface::DocumentPayload; - -mod external_interface; - -#[tokio::main] -async fn main() -> Result<()> { - let cli = Config::parse(); - let args: Config = Figment::new() - .merge(Serialized::defaults(&cli)) - .merge(Env::prefixed("APP_")) - .extract()?; - - tracing_subscriber::fmt() - .with_max_level(args.log_level()) - .init(); - - run(&cli.command).await?; - - Ok(()) -} - -async fn run(command: &Command) -> Result<()> { - let project_dir = ProjectDirs::from("", "", "synchronizator_readwise").ok_or( - lib_sync_core::error::Error::Unhandled("Could not get standard directories"), - )?; - - let task_manager = TaskManager::new(project_dir.data_dir()).await?; - - match command { - Command::LoadTasks { path } => { - let file = File::open(path).map_err(|_| { - Error::Runtime(format!( - r#"The file "{}" could not be open"#, - path.display() - )) - })?; - - let documents: Vec = serde_json::from_reader(file)?; - - task_manager.load_tasks(documents).await?; - } - Command::Query => { - let tasks = task_manager - .get_tasks::(None, Some(25)) - .await?; - - println!("{}", Table::new(tasks)); - } - Command::Run => { - task_manager - .run_tasks::(|task| { - println!("{}", task.get_key()); - - TaskStatus::Completed - }) - .await?; - } - Command::None => { - Config::command().print_help()?; - } - } - - Ok(()) -} diff --git a/cli/src/main.rs b/cli/src/main.rs deleted file mode 100644 index 75c0be7..0000000 --- a/cli/src/main.rs +++ /dev/null @@ -1,23 +0,0 @@ -use clap::Parser; -use figment::{ - providers::{Env, Serialized}, - Figment, -}; -use cli::config::Config; -use cli::Result; -use tracing_subscriber; - -#[tokio::main] -async fn main() -> Result<()> { - let cli = Config::parse(); - let args: Config = Figment::new() - .merge(Serialized::defaults(&cli)) - .merge(Env::prefixed("APP_")) - .extract()?; - - tracing_subscriber::fmt() - .with_max_level(args.log_level()) - .init(); - - Ok(()) -} diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml deleted file mode 100644 index be60587..0000000 --- a/lib_sync_core/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "lib_sync_core" -version = "0.1.0" -edition = "2024" - -[dependencies] -directories = "6.0.0" -tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } -tokio-stream = "0.1.17" -sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] } -clap = { version = "4.5.37", features = ["derive"] } -serde = { version = "1.0.219", features = ["derive"] } -chrono = {version = "0.4.41", features = ["serde"]} -serde_json = "1.0.140" -tracing = "0.1.41" -tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} -figment = { version = "0.10.19", features = ["env"] } -tracing-core = "0.1.33" -tabled = "0.19.0" -futures = "0.3.31" -thiserror = "2.0.12" -async-stream = "0.3.6" -uuid = { version = "1.16.0", features = ["serde", "v4"] } - -[dev-dependencies] -fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] } -tracing-test = "0.2.5" diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs deleted file mode 100644 index 89e5da0..0000000 --- a/lib_sync_core/src/database.rs +++ /dev/null @@ -1,76 +0,0 @@ -use crate::tasks::{Task, TaskPayload, TaskStatus}; -use futures::Stream; -mod sqlite; - -#[derive(Default, Clone, Debug)] -pub struct TaskPagination { - limit: Option, - offset: u32, - status: Option, -} - -impl TaskPagination { - pub fn new() -> Self { - Self::default() - } - - pub fn next(&self) -> Self { - Self { - offset: self.offset.saturating_add(self.limit.unwrap_or(0)), - ..self.clone() - } - } - - pub fn prev(&self) -> Self { - Self { - offset: self.offset.saturating_sub(self.limit.unwrap_or(0)), - ..self.clone() - } - } - - pub fn with_limit(mut self, limit: Option) -> Self { - self.limit = limit; - self - } - - pub fn with_offset(mut self, offset: u32) -> Self { - self.offset = offset; - self - } - - pub fn with_status(mut self, status: Option) -> Self { - self.status = status; - self - } -} - -pub struct TasksPage { - tasks: Vec>, - page: TaskPagination, -} - -impl TasksPage { - fn new(tasks: Vec>, page: TaskPagination) -> Self { - Self { tasks, page } - } - - pub fn next(&self) -> TaskPagination { - self.page.next() - } - - pub fn prev(&self) -> TaskPagination { - self.page.prev() - } -} - -pub trait TaskStorage { - async fn insert_tasks<'a, I: IntoIterator>>( - &self, - tasks: I, - ) -> crate::Result<()>; - fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>>; - - fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>>; - - async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; -} diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs deleted file mode 100644 index 6d11c9a..0000000 --- a/lib_sync_core/src/database/sqlite.rs +++ /dev/null @@ -1,230 +0,0 @@ -use crate::database::{TaskPagination, TaskStorage, TasksPage}; -use crate::tasks::{Task, TaskPayload, TaskStatus}; -use futures::{Stream, TryStreamExt}; -use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; -use sqlx::{QueryBuilder, SqlitePool}; -use std::path::PathBuf; -use tokio::fs; -use tracing::{debug, info, instrument}; - -static SQLITE_BIND_LIMIT: usize = 32766; -static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations"); - -#[derive(Debug)] -pub struct Sqlite { - pool: SqlitePool, -} - -impl Sqlite { - pub async fn new>(base_path: P) -> crate::Result { - Ok(Self { - pool: Self::connect_database(base_path).await?, - }) - } - - async fn connect_database>(base_path: P) -> crate::Result { - let base_path = base_path.into(); - - let database_file_path = base_path.join("db.sql"); - - fs::create_dir_all(base_path).await?; - - let opts = SqliteConnectOptions::new() - .filename(database_file_path) - .create_if_missing(true) - .journal_mode(SqliteJournalMode::Wal); - - let pool = SqlitePool::connect_with(opts).await?; - - MIGRATIONS.run(&pool).await?; - - Ok(pool) - } -} - -impl TaskStorage for Sqlite { - /// Insert task into the database for later processing - /// - /// # Arguments - /// - /// * `tasks`: A list of task to be processed, each task has to have a unique key, if a key is repeated, the item will be omitted - /// - /// returns: Result<(), Error> - /// - /// # Examples - /// - /// ``` - /// - /// ``` - #[instrument(skip(self, tasks))] - async fn insert_tasks<'a, I: IntoIterator>>( - &self, - tasks: I, - ) -> crate::Result<()> { - let mut tx = self.pool.begin().await?; - let mut builder: QueryBuilder<'_, sqlx::Sqlite> = - QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); - - let args: crate::Result> = tasks - .into_iter() - .map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?))) - .collect(); - - let mut affected_rows = 0; - // Chunk the query by the size limit of bind params - for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { - builder.push_values(chunk, |mut builder, item| { - builder - .push_bind(&item.0) - .push_bind(&item.1) - .push_bind(TaskStatus::Pending); - }); - builder.push("ON conflict (payload_key) DO NOTHING"); - - let query = builder.build(); - - affected_rows += query.execute(&mut *tx).await?.rows_affected(); - builder.reset(); - } - - tx.commit().await?; - - info!("{} rows inserted.", affected_rows); - - Ok(()) - } - - fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>> { - let query = sqlx::query_as::<_, Task>( - " - SELECT id, payload_key, payload, status_id, created_at, updated_at - FROM tasks - WHERE status_id = ? - ORDER BY created_at DESC - ", - ) - .bind(task_status); - - query.fetch(&self.pool).err_into::() - } - - fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>> { - futures::stream::empty() - } - - - async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { - let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( - "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", - ); - - if let Some(status) = &page.status { - builder.push("where status_id = ").push_bind(status); - } - - builder.push("ORDER BY created_at DESC "); - - - if let Some(limit) = page.limit { - builder.push("LIMIT ").push_bind(limit); - builder.push(" OFFSET ").push_bind(page.offset); - } - - debug!("SQL: \"{}\", with options: \"{:?}\"", builder.sql(), page); - - let tasks = builder - .build_query_as::>() - .fetch_all(&self.pool) - .await?; - - Ok(TasksPage::new(tasks, page.clone())) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use fake::Dummy; - use futures::StreamExt; - use serde::{Deserialize, Serialize}; - use sqlx::types::Uuid; - use sqlx::Row; - use tracing_test::traced_test; - - #[derive(Dummy, Serialize, Deserialize, Debug)] - struct DummyTaskPayload { - key: Uuid, - _foo: String, - _bar: String, - } - - //noinspection RsUnresolvedPath - fn setup(pool: SqlitePool, len: usize) -> (Sqlite, Vec>) { - let owned_pool = pool.clone(); - let sqlite = Sqlite { pool: owned_pool }; - - let payloads = fake::vec![DummyTaskPayload; len]; - - let tasks = payloads - .into_iter() - .enumerate() - .map(|(i, item)| Task::new((i + 1).to_string(), item, TaskStatus::Pending)) - .collect(); - - (sqlite, tasks) - } - - #[sqlx::test(migrator = "MIGRATIONS")] - #[traced_test] - async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> { - let (sqlite, tasks) = setup(pool.clone(), 100); - - sqlite.insert_tasks(&tasks).await.unwrap(); - - let result = sqlx::query("select count(id) from tasks") - .fetch_one(&pool) - .await?; - - let total_rows: u64 = result.get(0); - - assert_eq!(total_rows as usize, tasks.len()); - - let saved_tasks: Vec> = sqlite - .get_tasks(TaskStatus::Pending) - .map(|item| item.unwrap()) - .collect() - .await; - - assert_eq!(tasks.len(), saved_tasks.len()); - - let mut zip = tasks.into_iter().zip(saved_tasks.into_iter()); - - assert!(zip.all(|(a, b)| { a.get_key() == b.get_key() })); - - Ok(()) - } - - #[sqlx::test(migrator = "MIGRATIONS")] - #[traced_test] - async fn it_return_paginated_tasks(pool: SqlitePool) -> sqlx::Result<()> { - let (sqlite, tasks) = setup(pool.clone(), 300); - - sqlite.insert_tasks(&tasks).await.unwrap(); - - let page_options = TaskPagination::new().with_limit(Some(25)); - - let first_page: TasksPage = - sqlite.get_paginated_tasks(page_options).await.unwrap(); - - assert_eq!(first_page.tasks.len(), 25); - assert_eq!(first_page.tasks.first().unwrap().get_key(), tasks.get(0).unwrap().get_key()); - - let second_page: TasksPage = - sqlite.get_paginated_tasks(first_page.next()).await.unwrap(); - - assert_eq!(second_page.tasks.len(), 25); - assert_eq!(second_page.tasks.first().unwrap().get_key(), tasks.get(25).unwrap().get_key()); - - Ok(()) - } -} diff --git a/lib_sync_core/src/error.rs b/lib_sync_core/src/error.rs deleted file mode 100644 index 93c8a0b..0000000 --- a/lib_sync_core/src/error.rs +++ /dev/null @@ -1,24 +0,0 @@ -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum Error { - #[error("{0}")] - Exception(&'static str), - - #[error("{0}")] - Unhandled(&'static str), - - #[error(transparent)] - Io(#[from] tokio::io::Error), - - #[error(transparent)] - Sqlx(#[from] sqlx::Error), - - #[error(transparent)] - Migration(#[from] sqlx::migrate::MigrateError), - - #[error(transparent)] - ParseJson(#[from] serde_json::Error), -} - -pub type Result = std::result::Result; \ No newline at end of file diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs deleted file mode 100644 index 1030a9a..0000000 --- a/lib_sync_core/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod error; - -pub(crate) use error::*; -pub mod tasks; -mod database; diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs deleted file mode 100644 index f455941..0000000 --- a/lib_sync_core/src/tasks.rs +++ /dev/null @@ -1,92 +0,0 @@ -use chrono::Utc; -use serde::de::DeserializeOwned; -use serde::Serialize; -use std::fmt::Display; -use tabled::Tabled; - -mod manager; -mod jobs; -mod worker; -mod bus; - -#[derive(sqlx::Type, Debug, Clone)] -#[repr(u8)] -pub enum TaskStatus { - Pending = 1, - InProgress = 2, - Completed = 3, - Failed = 4, -} - -impl Display for TaskStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TaskStatus::Pending => { - write!(f, "Pending") - } - TaskStatus::InProgress => { - write!(f, "In Progress") - } - TaskStatus::Completed => { - write!(f, "Completed") - } - TaskStatus::Failed => { - write!(f, "Failed") - } - } - } -} - -pub trait TaskPayload: - Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug -{ -} -impl - TaskPayload for T -{ -} - -#[derive(sqlx::FromRow, Tabled, Debug)] -pub struct Task { - id: u32, - payload_key: String, - #[sqlx(json)] - #[tabled(skip)] - payload: T, - #[sqlx(rename = "status_id")] - status: TaskStatus, - created_at: chrono::DateTime, - #[tabled(display = "display_option_date")] - updated_at: Option>, -} - -impl Task { - pub fn new(payload_key: String, payload: T, status: TaskStatus) -> Self { - Self { - id: 0, - payload_key, - payload, - status, - created_at: Default::default(), - updated_at: None, - } - } -} - -impl Task { - pub fn payload(&self) -> &T { - &self.payload - } - - pub fn get_key(&self) -> String { - self.payload_key.clone() - } -} - -fn display_option_date(o: &Option>) -> String { - match o { - Some(s) => format!("{}", s), - None => String::from(""), - } -} - diff --git a/lib_sync_core/src/tasks/bus.rs b/lib_sync_core/src/tasks/bus.rs deleted file mode 100644 index 4e96e56..0000000 --- a/lib_sync_core/src/tasks/bus.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[derive(Clone)] -pub enum Bus { - Local, -} \ No newline at end of file diff --git a/lib_sync_core/src/tasks/jobs.rs b/lib_sync_core/src/tasks/jobs.rs deleted file mode 100644 index 74a8ca0..0000000 --- a/lib_sync_core/src/tasks/jobs.rs +++ /dev/null @@ -1,3 +0,0 @@ -use crate::tasks::{Task, TaskStatus}; - -pub type TaskJob = fn(&Task) -> TaskStatus; \ No newline at end of file diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs deleted file mode 100644 index eaaef77..0000000 --- a/lib_sync_core/src/tasks/manager.rs +++ /dev/null @@ -1,190 +0,0 @@ -use crate::database::TaskStorage; -use crate::tasks::bus::Bus; -use crate::tasks::jobs::TaskJob; -use crate::tasks::{Task, TaskPayload, TaskStatus}; -use futures::StreamExt; -use std::marker::PhantomData; -use std::pin::pin; -use tokio::sync::mpsc::Receiver; -use tokio::sync::{mpsc, oneshot}; -use tokio::sync::oneshot::Sender; -use crate::tasks::worker::TaskMessage; - -pub enum RateLimit { - Buffer(usize), - Rate(usize), - Ticks(usize), - None, -} - -pub struct ManagerOptions { - rate_limit: RateLimit, - bus: Bus, -} - -impl ManagerOptions { - pub fn new() -> Self { - Self::default() - } - - pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self { - self.rate_limit = rate_limit; - self - } -} - -impl Default for ManagerOptions { - fn default() -> Self { - Self { - rate_limit: RateLimit::None, - bus: Bus::Local, - } - } -} - -struct TaskManager> { - storage: T, - options: ManagerOptions, - _marker: PhantomData, -} - -impl> TaskManager { - pub fn new(storage: T, options: ManagerOptions) -> Self { - Self { - storage, - options, - _marker: PhantomData, - } - } - - pub async fn run_tasks(&self, mut task_sink: TaskMessage) -> crate::Result<()> { - let rows = self.storage.get_tasks(TaskStatus::Pending); - let listener = self.storage.listen_tasks(TaskStatus::Pending); - - let mut queue = pin!(rows.chain(listener)); - - while let Some(task) = queue.next().await { - let task = match task { - Ok(task) => task, - Err(e) => { - continue - } - }; - - let sink = match task_sink.recv().await { - Some(s) => s, - None => break, // sink has stoped requesting tasks - }; - - if let Err(_) = sink.send(task) { - continue; - } - - // (task, status) - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::database::{TaskPagination, TasksPage}; - use async_stream::stream; - use fake::{Dummy, Fake, Faker}; - use futures::Stream; - use serde::{Deserialize, Serialize}; - use sqlx::types::Uuid; - use sync::mpsc; - use tokio::sync; - use tokio_stream::wrappers::ReceiverStream; - use tracing_test::traced_test; - use crate::error::Error; - use crate::tasks::worker::{Worker, WorkerManager}; - - #[derive(Dummy, Serialize, Deserialize, Debug)] - struct DummyTaskPayload { - key: Uuid, - _foo: String, - _bar: String, - } - - struct DummyTaskStorage {} - - impl TaskStorage for DummyTaskStorage { - async fn insert_tasks<'a, I: IntoIterator>>( - &self, - _: I, - ) -> crate::error::Result<()> { - todo!() - } - - fn get_tasks( - &self, - task_status: TaskStatus, - ) -> impl Stream>> { - let payloads: Vec = Faker.fake(); - - let tasks = payloads.into_iter().enumerate().map(move |(i, item)| { - Ok(Task::new((i + 1).to_string(), item, task_status.clone())) - }); - - futures::stream::iter(tasks) - } - - fn listen_tasks( - &self, - task_status: TaskStatus, - ) -> impl Stream>> { - let (tx, rx) = mpsc::channel::>>(10); - - tokio::spawn(async move { - for _ in 0..10 { - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - - let payload: DummyTaskPayload = Faker.fake(); - let task_status: TaskStatus = task_status.clone(); - let task = Ok(Task::new(payload.key.to_string(), payload, task_status)); - - if let Err(_) = tx.send(task).await { - break; - } - } - }); - - ReceiverStream::new(rx) - } - - async fn get_paginated_tasks( - &self, - _: TaskPagination, - ) -> crate::error::Result> { - todo!() - } - } - - struct DummyWorker; - - impl Worker for DummyWorker { - fn process_job(task: &Task) -> crate::error::Result<()> { - println!("{:#?}", task); - Ok(()) - } - - async fn on_job_failure(task: &Task, error: Error) -> crate::error::Result<()> { - println!("{:#?} {:?}", task, error); - Ok(()) - } - } - - #[tokio::test] - #[traced_test] - async fn manager_runs() { - let execute_options = ManagerOptions::new(); - let local_worker_sink = WorkerManager::get_listener_sink::(execute_options.bus.clone()); - let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options); - - task_manager.run_tasks(local_worker_sink).await.unwrap() - } -} diff --git a/lib_sync_core/src/tasks/worker.rs b/lib_sync_core/src/tasks/worker.rs deleted file mode 100644 index 7e3d917..0000000 --- a/lib_sync_core/src/tasks/worker.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::error::Error; -use crate::tasks::bus::Bus; -use crate::tasks::{Task, TaskPayload}; -use tokio::sync::mpsc::Receiver; -use tokio::sync::oneshot::Sender; -use tokio::sync::{mpsc, oneshot}; - -pub type TaskMessage = Receiver>>; - -pub struct WorkerManager; - -impl WorkerManager { - pub fn get_listener_sink>(bus: Bus) -> TaskMessage { - match bus { - Bus::Local => { - let (bus_tx, bus_rx) = mpsc::channel(100); - tokio::spawn(async move { - loop { - // TODO: properly catch errors - let (tx, rx) = oneshot::channel(); - - // Request a task - bus_tx.send(tx).await.unwrap(); - - // Wait for a task to be returned - let task = rx.await.unwrap(); - - W::process_job(&task).unwrap(); - } - }); - - bus_rx - } - } - } -} - -pub trait Worker { - async fn pre_process_job(task: &Task) -> crate::Result<()> { - Ok(()) - } - fn process_job(task: &Task) -> crate::Result<()>; - async fn post_process_job(task: &Task) -> crate::Result<()> { - Ok(()) - } - - async fn on_job_failure(task: &Task, error: Error) -> crate::Result<()>; -} diff --git a/migrations/0003_tasks.sql b/migrations/0003_tasks.sql index 85d03ec..0b7cb54 100644 --- a/migrations/0003_tasks.sql +++ b/migrations/0003_tasks.sql @@ -3,7 +3,7 @@ create table tasks id integer not null constraint tasks_pk primary key autoincrement, - payload_key TEXT not null + payload_key ANY not null constraint tasks_payload_key unique on conflict ignore, payload TEXT not null, diff --git a/cli/src/config.rs b/src/config.rs similarity index 80% rename from cli/src/config.rs rename to src/config.rs index 04bdbff..680fdf7 100644 --- a/cli/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use clap::{Parser, Subcommand, ValueEnum}; +use clap::{Parser, ValueEnum}; use serde::{Deserialize, Serialize}; use std::fmt; use std::path::PathBuf; @@ -57,32 +57,9 @@ impl Into for VerbosityLevel { } } -#[derive(Debug, Subcommand)] -#[clap(rename_all = "snake_case")] -pub enum Command { - /// Load task into the database from [path] - LoadTasks{ - /// Path to the file - path: PathBuf, - }, - Query, - Run, - #[clap(skip)] - None, -} - -impl Default for Command { - fn default() -> Self { - Command::None - } -} - #[derive(Debug, Parser, Serialize, Deserialize)] pub struct Config { - #[command(subcommand)] - #[serde(skip)] - pub command: Command, - + path: PathBuf, #[arg( long, short = 'v', @@ -94,6 +71,9 @@ pub struct Config { } impl Config { + pub fn path(&self) -> &PathBuf { + &self.path + } pub fn log_level(&self) -> LevelFilter { self.log_level.clone().into() diff --git a/cli/src/error.rs b/src/error.rs similarity index 88% rename from cli/src/error.rs rename to src/error.rs index 36572b2..b55b32e 100644 --- a/cli/src/error.rs +++ b/src/error.rs @@ -11,9 +11,6 @@ pub enum Error { #[error("{0}")] Unhandled(&'static str), - #[error(transparent)] - Sync(#[from] lib_sync_core::error::Error), - #[error(transparent)] Sqlx(#[from] sqlx::Error), diff --git a/cli/src/lib.rs b/src/lib.rs similarity index 59% rename from cli/src/lib.rs rename to src/lib.rs index dabf39f..dabd6d3 100644 --- a/cli/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ -pub mod config; mod error; +pub mod sql; +pub mod config; +pub mod readwise; pub use error::*; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..985c0e4 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,35 @@ +use clap::Parser; +use readwise_bulk_upload::config::Config; +use readwise_bulk_upload::readwise::DocumentPayload; +use readwise_bulk_upload::sql::TaskManager; +use readwise_bulk_upload::{Error, Result}; +use std::fs::File; +use tracing_subscriber; +use figment::{Figment, providers::{Serialized, Env}}; + +#[tokio::main] +async fn main() -> Result<()> { + let args: Config = Figment::new() + .merge(Serialized::defaults(Config::parse())) + .merge(Env::prefixed("APP_")) + .extract()?; + + tracing_subscriber::fmt() + .with_max_level(args.log_level()) + .init(); + + let file = File::open(args.path()).map_err(|_| { + Error::Runtime(format!( + r#"The file "{}" could not be open"#, + args.path().display() + )) + })?; + + let documents: Vec = serde_json::from_reader(file)?; + + let task_manager = TaskManager::new().await?; + + task_manager.load_tasks(documents).await?; + + Ok(()) +} diff --git a/cli/bin/readwise/external_interface.rs b/src/readwise.rs similarity index 69% rename from cli/bin/readwise/external_interface.rs rename to src/readwise.rs index a484ca9..33bf9ed 100644 --- a/cli/bin/readwise/external_interface.rs +++ b/src/readwise.rs @@ -1,8 +1,7 @@ -use lib_sync_core::tasks::TaskPayload; use chrono::{DateTime, Local}; -use serde::{de, Deserialize, Deserializer, Serialize}; +use serde::{Deserialize, Deserializer, de, Serialize}; use serde_json::Value; -use std::fmt::Display; +use crate::sql::TaskPayload; #[derive(Deserialize, Serialize, Debug)] pub struct DocumentPayload { @@ -15,16 +14,6 @@ pub struct DocumentPayload { location: String, } -impl Display for DocumentPayload { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - serde_json::to_string_pretty(self).map_err(|_| std::fmt::Error)? - ) - } -} - impl TaskPayload for DocumentPayload { fn get_key(&self) -> String { self.url.clone() diff --git a/src/sql.rs b/src/sql.rs new file mode 100644 index 0000000..7d2a442 --- /dev/null +++ b/src/sql.rs @@ -0,0 +1,94 @@ +use crate::Error; +use directories::ProjectDirs; +use serde::Serialize; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; +use sqlx::{QueryBuilder, Sqlite, SqlitePool}; +use tokio::fs; +use tracing::{info, instrument}; + +static SQLITE_BIND_LIMIT: usize = 32766; + +#[derive(sqlx::Type)] +#[repr(u8)] +pub enum TaskStatus { + Pending = 1, + InProgress = 2, + Completed = 3, + Failed = 4, +} + +pub trait TaskPayload { + fn get_key(&self) -> String; +} + +#[derive(Debug)] +pub struct TaskManager { + pool: SqlitePool, +} + +impl TaskManager { + pub async fn new() -> Result { + Ok(Self { + pool: Self::connect_database().await?, + }) + } + + async fn connect_database() -> crate::Result { + let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME")) + .ok_or(Error::Unhandled("Could not get standard directories"))?; + + let database_file_path = project_dir.data_dir().join("db.sql"); + + fs::create_dir_all(project_dir.data_dir()).await?; + + let opts = SqliteConnectOptions::new() + .filename(database_file_path) + .create_if_missing(true) + .journal_mode(SqliteJournalMode::Wal); + + let pool = SqlitePool::connect_with(opts).await?; + + sqlx::migrate!("./migrations").run(&pool).await?; + + Ok(pool) + } + + #[instrument(skip(self, values))] + pub async fn load_tasks(&self, values: Vec) -> crate::Result<()> + where + T: TaskPayload + Serialize + std::fmt::Debug, + { + let mut tx = self.pool.begin().await?; + let mut builder: QueryBuilder<'_, Sqlite> = + QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); + + let args: crate::Result> = values + .iter() + .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) + .collect(); + + + let mut affected_rows = 0; + // Chunk the query by the size limit of bind params + for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { + builder.push_values(chunk, |mut builder, item| { + builder + .push_bind(&item.0) + .push_bind(&item.1) + .push_bind(TaskStatus::Pending); + }); + builder.push("ON conflict (payload_key) DO NOTHING"); + + let query = builder.build(); + + affected_rows += query.execute(&mut *tx).await?.rows_affected(); + builder.reset(); + } + + tx.commit().await?; + + info!("{} rows inserted.", affected_rows); + + Ok(()) + } +} diff --git a/web/Cargo.toml b/web/Cargo.toml deleted file mode 100644 index 52dfac4..0000000 --- a/web/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "web" -version = "0.1.0" -edition = "2024" - -[dependencies] diff --git a/web/src/main.rs b/web/src/main.rs deleted file mode 100644 index e7a11a9..0000000 --- a/web/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -}