diff --git a/.gitignore b/.gitignore
index ea8c4bf..b60de5b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1 @@
-/target
+**/target
diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml
new file mode 100644
index 0000000..4e20976
--- /dev/null
+++ b/.idea/codeStyles/Project.xml
@@ -0,0 +1,29 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
new file mode 100644
index 0000000..6e6eec1
--- /dev/null
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml
index 283ef50..38d7bd6 100644
--- a/.idea/dataSources.xml
+++ b/.idea/dataSources.xml
@@ -8,5 +8,12 @@
jdbc:sqlite:$USER_HOME$/.local/share/readwise-bulk-upload/db.sql
$ProjectFileDir$
+
+ sqlite.xerial
+ true
+ org.sqlite.JDBC
+ jdbc:sqlite:$PROJECT_DIR$/lib_sync_core/target/sqlx/test-dbs/lib_sync_core/database/sqlite/tests/it_save_tasks.sqlite
+ $ProjectFileDir$
+
\ No newline at end of file
diff --git a/.idea/readwise-bulk-upload.iml b/.idea/readwise-bulk-upload.iml
index cf84ae4..0855a40 100644
--- a/.idea/readwise-bulk-upload.iml
+++ b/.idea/readwise-bulk-upload.iml
@@ -2,7 +2,10 @@
-
+
+
+
+
diff --git a/.idea/runConfigurations/Load_Tasks.xml b/.idea/runConfigurations/Load_Tasks.xml
new file mode 100644
index 0000000..41f2816
--- /dev/null
+++ b/.idea/runConfigurations/Load_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/Query_Tasks.xml b/.idea/runConfigurations/Query_Tasks.xml
new file mode 100644
index 0000000..ef4b918
--- /dev/null
+++ b/.idea/runConfigurations/Query_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/Run_Tasks.xml b/.idea/runConfigurations/Run_Tasks.xml
new file mode 100644
index 0000000..ef08a15
--- /dev/null
+++ b/.idea/runConfigurations/Run_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 35eb1dd..8d3e42f 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -1,5 +1,10 @@
+
+
+
diff --git a/Cargo.lock b/Cargo.lock
index 87f2582..6292eb7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17,6 +17,19 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
+[[package]]
+name = "ahash"
+version = "0.8.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
+dependencies = [
+ "cfg-if",
+ "getrandom 0.3.2",
+ "once_cell",
+ "version_check",
+ "zerocopy",
+]
+
[[package]]
name = "aho-corasick"
version = "1.1.3"
@@ -97,6 +110,28 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "async-stream"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "atoi"
version = "2.0.0"
@@ -172,6 +207,12 @@ version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
+[[package]]
+name = "bytecount"
+version = "0.6.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce"
+
[[package]]
name = "bytemuck"
version = "1.23.0"
@@ -260,6 +301,27 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
+[[package]]
+name = "cli"
+version = "0.1.0"
+dependencies = [
+ "chrono",
+ "clap",
+ "directories",
+ "figment",
+ "futures",
+ "lib_sync_core",
+ "serde",
+ "serde_json",
+ "sqlx",
+ "tabled",
+ "thiserror",
+ "tokio",
+ "tracing",
+ "tracing-core",
+ "tracing-subscriber",
+]
+
[[package]]
name = "colorchoice"
version = "1.0.3"
@@ -336,6 +398,40 @@ dependencies = [
"typenum",
]
+[[package]]
+name = "darling"
+version = "0.20.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
+dependencies = [
+ "darling_core",
+ "darling_macro",
+]
+
+[[package]]
+name = "darling_core"
+version = "0.20.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
+dependencies = [
+ "fnv",
+ "ident_case",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "darling_macro"
+version = "0.20.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
+dependencies = [
+ "darling_core",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "der"
version = "0.7.10"
@@ -347,6 +443,12 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "deunicode"
+version = "1.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "abd57806937c9cc163efc8ea3910e00a62e2aeb0b8119f1793a978088f8f6b04"
+
[[package]]
name = "digest"
version = "0.10.7"
@@ -397,6 +499,18 @@ version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
+[[package]]
+name = "dummy"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3bbcf21279103a67372982cb1156a2154a452451dff2b884cf897ccecce389e0"
+dependencies = [
+ "darling",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "either"
version = "1.15.0"
@@ -444,6 +558,21 @@ dependencies = [
"pin-project-lite",
]
+[[package]]
+name = "fake"
+version = "4.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2f5f203b70a419cb8880d1cfe6bebe488add0a0307d404e9f24021e5fd864b80"
+dependencies = [
+ "chrono",
+ "deunicode",
+ "dummy",
+ "http",
+ "rand 0.9.1",
+ "url-escape",
+ "uuid",
+]
+
[[package]]
name = "fastrand"
version = "2.3.0"
@@ -474,6 +603,12 @@ dependencies = [
"spin",
]
+[[package]]
+name = "fnv"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+
[[package]]
name = "foldhash"
version = "0.1.5"
@@ -489,6 +624,21 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "futures"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
[[package]]
name = "futures-channel"
version = "0.3.31"
@@ -533,6 +683,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "futures-sink"
version = "0.3.31"
@@ -551,8 +712,10 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
+ "futures-channel",
"futures-core",
"futures-io",
+ "futures-macro",
"futures-sink",
"futures-task",
"memchr",
@@ -659,6 +822,17 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "http"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
[[package]]
name = "iana-time-zone"
version = "0.1.63"
@@ -801,6 +975,12 @@ dependencies = [
"syn",
]
+[[package]]
+name = "ident_case"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
+
[[package]]
name = "idna"
version = "1.0.3"
@@ -869,6 +1049,31 @@ dependencies = [
"spin",
]
+[[package]]
+name = "lib_sync_core"
+version = "0.1.0"
+dependencies = [
+ "async-stream",
+ "chrono",
+ "clap",
+ "directories",
+ "fake",
+ "figment",
+ "futures",
+ "serde",
+ "serde_json",
+ "sqlx",
+ "tabled",
+ "thiserror",
+ "tokio",
+ "tokio-stream",
+ "tracing",
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test",
+ "uuid",
+]
+
[[package]]
name = "libc"
version = "0.2.172"
@@ -997,7 +1202,7 @@ dependencies = [
"num-integer",
"num-iter",
"num-traits",
- "rand",
+ "rand 0.8.5",
"smallvec",
"zeroize",
]
@@ -1059,6 +1264,17 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+[[package]]
+name = "papergrid"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30268a8d20c2c0d126b2b6610ab405f16517f6ba9f244d8c59ac2c512a8a1ce7"
+dependencies = [
+ "ahash",
+ "bytecount",
+ "unicode-width",
+]
+
[[package]]
name = "parking"
version = "2.2.1"
@@ -1174,6 +1390,28 @@ dependencies = [
"zerocopy",
]
+[[package]]
+name = "proc-macro-error-attr2"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5"
+dependencies = [
+ "proc-macro2",
+ "quote",
+]
+
+[[package]]
+name = "proc-macro-error2"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802"
+dependencies = [
+ "proc-macro-error-attr2",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "proc-macro2"
version = "1.0.95"
@@ -1218,8 +1456,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
- "rand_chacha",
- "rand_core",
+ "rand_chacha 0.3.1",
+ "rand_core 0.6.4",
+]
+
+[[package]]
+name = "rand"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
+dependencies = [
+ "rand_chacha 0.9.0",
+ "rand_core 0.9.3",
]
[[package]]
@@ -1229,7 +1477,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
- "rand_core",
+ "rand_core 0.6.4",
+]
+
+[[package]]
+name = "rand_chacha"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
+dependencies = [
+ "ppv-lite86",
+ "rand_core 0.9.3",
]
[[package]]
@@ -1242,21 +1500,12 @@ dependencies = [
]
[[package]]
-name = "readwise-bulk-upload"
-version = "0.1.0"
+name = "rand_core"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
- "chrono",
- "clap",
- "directories",
- "figment",
- "serde",
- "serde_json",
- "sqlx",
- "thiserror",
- "tokio",
- "tracing",
- "tracing-core",
- "tracing-subscriber",
+ "getrandom 0.3.2",
]
[[package]]
@@ -1336,7 +1585,7 @@ dependencies = [
"num-traits",
"pkcs1",
"pkcs8",
- "rand_core",
+ "rand_core 0.6.4",
"signature",
"spki",
"subtle",
@@ -1435,6 +1684,12 @@ dependencies = [
"digest",
]
+[[package]]
+name = "sha1_smol"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
+
[[package]]
name = "sha2"
version = "0.10.9"
@@ -1468,7 +1723,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
dependencies = [
"digest",
- "rand_core",
+ "rand_core 0.6.4",
]
[[package]]
@@ -1564,6 +1819,7 @@ dependencies = [
"tokio-stream",
"tracing",
"url",
+ "uuid",
]
[[package]]
@@ -1635,7 +1891,7 @@ dependencies = [
"memchr",
"once_cell",
"percent-encoding",
- "rand",
+ "rand 0.8.5",
"rsa",
"serde",
"sha1",
@@ -1645,6 +1901,7 @@ dependencies = [
"stringprep",
"thiserror",
"tracing",
+ "uuid",
"whoami",
]
@@ -1674,7 +1931,7 @@ dependencies = [
"md-5",
"memchr",
"once_cell",
- "rand",
+ "rand 0.8.5",
"serde",
"serde_json",
"sha2",
@@ -1683,6 +1940,7 @@ dependencies = [
"stringprep",
"thiserror",
"tracing",
+ "uuid",
"whoami",
]
@@ -1709,6 +1967,7 @@ dependencies = [
"thiserror",
"tracing",
"url",
+ "uuid",
]
[[package]]
@@ -1762,6 +2021,30 @@ dependencies = [
"syn",
]
+[[package]]
+name = "tabled"
+version = "0.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "228d124371171cd39f0f454b58f73ddebeeef3cef3207a82ffea1c29465aea43"
+dependencies = [
+ "papergrid",
+ "tabled_derive",
+ "testing_table",
+]
+
+[[package]]
+name = "tabled_derive"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ea5d1b13ca6cff1f9231ffd62f15eefd72543dab5e468735f1a456728a02846"
+dependencies = [
+ "heck",
+ "proc-macro-error2",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "tempfile"
version = "3.19.1"
@@ -1775,6 +2058,15 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "testing_table"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f8daae29995a24f65619e19d8d31dea5b389f3d853d8bf297bbf607cd0014cc"
+dependencies = [
+ "unicode-width",
+]
+
[[package]]
name = "thiserror"
version = "2.0.12"
@@ -1930,6 +2222,27 @@ dependencies = [
"tracing-log",
]
+[[package]]
+name = "tracing-test"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
+dependencies = [
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test-macro",
+]
+
+[[package]]
+name = "tracing-test-macro"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
+dependencies = [
+ "quote",
+ "syn",
+]
+
[[package]]
name = "typenum"
version = "1.18.0"
@@ -1972,6 +2285,12 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
+[[package]]
+name = "unicode-width"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
+
[[package]]
name = "url"
version = "2.5.4"
@@ -1983,6 +2302,15 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "url-escape"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44e0ce4d1246d075ca5abec4b41d33e87a6054d08e2366b63205665e950db218"
+dependencies = [
+ "percent-encoding",
+]
+
[[package]]
name = "utf16_iter"
version = "1.0.5"
@@ -2001,6 +2329,19 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
+[[package]]
+name = "uuid"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
+dependencies = [
+ "atomic",
+ "getrandom 0.3.2",
+ "md-5",
+ "serde",
+ "sha1_smol",
+]
+
[[package]]
name = "valuable"
version = "0.1.1"
@@ -2098,6 +2439,10 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "web"
+version = "0.1.0"
+
[[package]]
name = "whoami"
version = "1.6.0"
diff --git a/Cargo.toml b/Cargo.toml
index da95ddc..df1befe 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,18 +1,6 @@
-[package]
-name = "readwise-bulk-upload"
-version = "0.1.0"
-edition = "2024"
+[workspace]
+resolver = "3"
-[dependencies]
-thiserror = "2.0.12"
-directories = "6.0.0"
-tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
-sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
-clap = { version = "4.5.37", features = ["derive"] }
-serde = { version = "1.0.219", features = ["derive"] }
-chrono = {version = "0.4.41", features = ["serde"]}
-serde_json = "1.0.140"
-tracing = "0.1.41"
-tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
-figment = { version = "0.10.19", features = ["env"] }
-tracing-core = "0.1.33"
+members = [
+ "cli", "lib_sync_core", "web",
+]
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
new file mode 100644
index 0000000..c056a52
--- /dev/null
+++ b/cli/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "cli"
+version = "0.1.0"
+edition = "2024"
+
+[[bin]]
+name = "readwise"
+path = "bin/readwise/main.rs"
+
+[dependencies]
+lib_sync_core = {path = "../lib_sync_core"}
+
+directories = "6.0.0"
+tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
+sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
+clap = { version = "4.5.37", features = ["derive"] }
+serde = { version = "1.0.219", features = ["derive"] }
+chrono = {version = "0.4.41", features = ["serde"]}
+serde_json = "1.0.140"
+tracing = "0.1.41"
+tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
+figment = { version = "0.10.19", features = ["env"] }
+tracing-core = "0.1.33"
+tabled = "0.19.0"
+futures = "0.3.31"
+thiserror = "2.0.12"
\ No newline at end of file
diff --git a/src/readwise.rs b/cli/bin/readwise/external_interface.rs
similarity index 69%
rename from src/readwise.rs
rename to cli/bin/readwise/external_interface.rs
index 33bf9ed..a484ca9 100644
--- a/src/readwise.rs
+++ b/cli/bin/readwise/external_interface.rs
@@ -1,7 +1,8 @@
+use lib_sync_core::tasks::TaskPayload;
use chrono::{DateTime, Local};
-use serde::{Deserialize, Deserializer, de, Serialize};
+use serde::{de, Deserialize, Deserializer, Serialize};
use serde_json::Value;
-use crate::sql::TaskPayload;
+use std::fmt::Display;
#[derive(Deserialize, Serialize, Debug)]
pub struct DocumentPayload {
@@ -14,6 +15,16 @@ pub struct DocumentPayload {
location: String,
}
+impl Display for DocumentPayload {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{}",
+ serde_json::to_string_pretty(self).map_err(|_| std::fmt::Error)?
+ )
+ }
+}
+
impl TaskPayload for DocumentPayload {
fn get_key(&self) -> String {
self.url.clone()
diff --git a/cli/bin/readwise/main.rs b/cli/bin/readwise/main.rs
new file mode 100644
index 0000000..29eee78
--- /dev/null
+++ b/cli/bin/readwise/main.rs
@@ -0,0 +1,76 @@
+use clap::{CommandFactory, Parser};
+use directories::ProjectDirs;
+use figment::{
+ Figment,
+ providers::{Env, Serialized},
+};
+use lib_sync_core::tasks::{TaskStatus};
+use cli::config::{Command, Config};
+use cli::{Error, Result};
+use std::fs::File;
+use tabled::Table;
+use tracing_subscriber;
+use crate::external_interface::DocumentPayload;
+
+mod external_interface;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let cli = Config::parse();
+ let args: Config = Figment::new()
+ .merge(Serialized::defaults(&cli))
+ .merge(Env::prefixed("APP_"))
+ .extract()?;
+
+ tracing_subscriber::fmt()
+ .with_max_level(args.log_level())
+ .init();
+
+ run(&cli.command).await?;
+
+ Ok(())
+}
+
+async fn run(command: &Command) -> Result<()> {
+ let project_dir = ProjectDirs::from("", "", "synchronizator_readwise").ok_or(
+ lib_sync_core::error::Error::Unhandled("Could not get standard directories"),
+ )?;
+
+ let task_manager = TaskManager::new(project_dir.data_dir()).await?;
+
+ match command {
+ Command::LoadTasks { path } => {
+ let file = File::open(path).map_err(|_| {
+ Error::Runtime(format!(
+ r#"The file "{}" could not be open"#,
+ path.display()
+ ))
+ })?;
+
+ let documents: Vec = serde_json::from_reader(file)?;
+
+ task_manager.load_tasks(documents).await?;
+ }
+ Command::Query => {
+ let tasks = task_manager
+ .get_tasks::(None, Some(25))
+ .await?;
+
+ println!("{}", Table::new(tasks));
+ }
+ Command::Run => {
+ task_manager
+ .run_tasks::(|task| {
+ println!("{}", task.get_key());
+
+ TaskStatus::Completed
+ })
+ .await?;
+ }
+ Command::None => {
+ Config::command().print_help()?;
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/config.rs b/cli/src/config.rs
similarity index 80%
rename from src/config.rs
rename to cli/src/config.rs
index 680fdf7..04bdbff 100644
--- a/src/config.rs
+++ b/cli/src/config.rs
@@ -1,4 +1,4 @@
-use clap::{Parser, ValueEnum};
+use clap::{Parser, Subcommand, ValueEnum};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
@@ -57,9 +57,32 @@ impl Into for VerbosityLevel {
}
}
+#[derive(Debug, Subcommand)]
+#[clap(rename_all = "snake_case")]
+pub enum Command {
+ /// Load task into the database from [path]
+ LoadTasks{
+ /// Path to the file
+ path: PathBuf,
+ },
+ Query,
+ Run,
+ #[clap(skip)]
+ None,
+}
+
+impl Default for Command {
+ fn default() -> Self {
+ Command::None
+ }
+}
+
#[derive(Debug, Parser, Serialize, Deserialize)]
pub struct Config {
- path: PathBuf,
+ #[command(subcommand)]
+ #[serde(skip)]
+ pub command: Command,
+
#[arg(
long,
short = 'v',
@@ -71,9 +94,6 @@ pub struct Config {
}
impl Config {
- pub fn path(&self) -> &PathBuf {
- &self.path
- }
pub fn log_level(&self) -> LevelFilter {
self.log_level.clone().into()
diff --git a/src/error.rs b/cli/src/error.rs
similarity index 88%
rename from src/error.rs
rename to cli/src/error.rs
index b55b32e..36572b2 100644
--- a/src/error.rs
+++ b/cli/src/error.rs
@@ -11,6 +11,9 @@ pub enum Error {
#[error("{0}")]
Unhandled(&'static str),
+ #[error(transparent)]
+ Sync(#[from] lib_sync_core::error::Error),
+
#[error(transparent)]
Sqlx(#[from] sqlx::Error),
diff --git a/src/lib.rs b/cli/src/lib.rs
similarity index 59%
rename from src/lib.rs
rename to cli/src/lib.rs
index dabd6d3..dabf39f 100644
--- a/src/lib.rs
+++ b/cli/src/lib.rs
@@ -1,6 +1,4 @@
-mod error;
-pub mod sql;
pub mod config;
-pub mod readwise;
+mod error;
pub use error::*;
\ No newline at end of file
diff --git a/cli/src/main.rs b/cli/src/main.rs
new file mode 100644
index 0000000..75c0be7
--- /dev/null
+++ b/cli/src/main.rs
@@ -0,0 +1,23 @@
+use clap::Parser;
+use figment::{
+ providers::{Env, Serialized},
+ Figment,
+};
+use cli::config::Config;
+use cli::Result;
+use tracing_subscriber;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let cli = Config::parse();
+ let args: Config = Figment::new()
+ .merge(Serialized::defaults(&cli))
+ .merge(Env::prefixed("APP_"))
+ .extract()?;
+
+ tracing_subscriber::fmt()
+ .with_max_level(args.log_level())
+ .init();
+
+ Ok(())
+}
diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml
new file mode 100644
index 0000000..be60587
--- /dev/null
+++ b/lib_sync_core/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "lib_sync_core"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+directories = "6.0.0"
+tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
+tokio-stream = "0.1.17"
+sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
+clap = { version = "4.5.37", features = ["derive"] }
+serde = { version = "1.0.219", features = ["derive"] }
+chrono = {version = "0.4.41", features = ["serde"]}
+serde_json = "1.0.140"
+tracing = "0.1.41"
+tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
+figment = { version = "0.10.19", features = ["env"] }
+tracing-core = "0.1.33"
+tabled = "0.19.0"
+futures = "0.3.31"
+thiserror = "2.0.12"
+async-stream = "0.3.6"
+uuid = { version = "1.16.0", features = ["serde", "v4"] }
+
+[dev-dependencies]
+fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
+tracing-test = "0.2.5"
diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs
new file mode 100644
index 0000000..89e5da0
--- /dev/null
+++ b/lib_sync_core/src/database.rs
@@ -0,0 +1,76 @@
+use crate::tasks::{Task, TaskPayload, TaskStatus};
+use futures::Stream;
+mod sqlite;
+
+#[derive(Default, Clone, Debug)]
+pub struct TaskPagination {
+ limit: Option,
+ offset: u32,
+ status: Option,
+}
+
+impl TaskPagination {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn next(&self) -> Self {
+ Self {
+ offset: self.offset.saturating_add(self.limit.unwrap_or(0)),
+ ..self.clone()
+ }
+ }
+
+ pub fn prev(&self) -> Self {
+ Self {
+ offset: self.offset.saturating_sub(self.limit.unwrap_or(0)),
+ ..self.clone()
+ }
+ }
+
+ pub fn with_limit(mut self, limit: Option) -> Self {
+ self.limit = limit;
+ self
+ }
+
+ pub fn with_offset(mut self, offset: u32) -> Self {
+ self.offset = offset;
+ self
+ }
+
+ pub fn with_status(mut self, status: Option) -> Self {
+ self.status = status;
+ self
+ }
+}
+
+pub struct TasksPage {
+ tasks: Vec>,
+ page: TaskPagination,
+}
+
+impl TasksPage {
+ fn new(tasks: Vec>, page: TaskPagination) -> Self {
+ Self { tasks, page }
+ }
+
+ pub fn next(&self) -> TaskPagination {
+ self.page.next()
+ }
+
+ pub fn prev(&self) -> TaskPagination {
+ self.page.prev()
+ }
+}
+
+pub trait TaskStorage {
+ async fn insert_tasks<'a, I: IntoIterator- >>(
+ &self,
+ tasks: I,
+ ) -> crate::Result<()>;
+ fn get_tasks(&self, task_status: TaskStatus) -> impl Stream
- >>;
+
+ fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream
- >>;
+
+ async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>;
+}
diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs
new file mode 100644
index 0000000..6d11c9a
--- /dev/null
+++ b/lib_sync_core/src/database/sqlite.rs
@@ -0,0 +1,230 @@
+use crate::database::{TaskPagination, TaskStorage, TasksPage};
+use crate::tasks::{Task, TaskPayload, TaskStatus};
+use futures::{Stream, TryStreamExt};
+use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
+use sqlx::{QueryBuilder, SqlitePool};
+use std::path::PathBuf;
+use tokio::fs;
+use tracing::{debug, info, instrument};
+
+static SQLITE_BIND_LIMIT: usize = 32766;
+static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations");
+
+#[derive(Debug)]
+pub struct Sqlite {
+ pool: SqlitePool,
+}
+
+impl Sqlite {
+ pub async fn new>(base_path: P) -> crate::Result {
+ Ok(Self {
+ pool: Self::connect_database(base_path).await?,
+ })
+ }
+
+ async fn connect_database>(base_path: P) -> crate::Result {
+ let base_path = base_path.into();
+
+ let database_file_path = base_path.join("db.sql");
+
+ fs::create_dir_all(base_path).await?;
+
+ let opts = SqliteConnectOptions::new()
+ .filename(database_file_path)
+ .create_if_missing(true)
+ .journal_mode(SqliteJournalMode::Wal);
+
+ let pool = SqlitePool::connect_with(opts).await?;
+
+ MIGRATIONS.run(&pool).await?;
+
+ Ok(pool)
+ }
+}
+
+impl TaskStorage for Sqlite {
+ /// Insert task into the database for later processing
+ ///
+ /// # Arguments
+ ///
+ /// * `tasks`: A list of task to be processed, each task has to have a unique key, if a key is repeated, the item will be omitted
+ ///
+ /// returns: Result<(), Error>
+ ///
+ /// # Examples
+ ///
+ /// ```
+ ///
+ /// ```
+ #[instrument(skip(self, tasks))]
+ async fn insert_tasks<'a, I: IntoIterator
- >>(
+ &self,
+ tasks: I,
+ ) -> crate::Result<()> {
+ let mut tx = self.pool.begin().await?;
+ let mut builder: QueryBuilder<'_, sqlx::Sqlite> =
+ QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
+
+ let args: crate::Result> = tasks
+ .into_iter()
+ .map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?)))
+ .collect();
+
+ let mut affected_rows = 0;
+ // Chunk the query by the size limit of bind params
+ for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
+ builder.push_values(chunk, |mut builder, item| {
+ builder
+ .push_bind(&item.0)
+ .push_bind(&item.1)
+ .push_bind(TaskStatus::Pending);
+ });
+ builder.push("ON conflict (payload_key) DO NOTHING");
+
+ let query = builder.build();
+
+ affected_rows += query.execute(&mut *tx).await?.rows_affected();
+ builder.reset();
+ }
+
+ tx.commit().await?;
+
+ info!("{} rows inserted.", affected_rows);
+
+ Ok(())
+ }
+
+ fn get_tasks(&self, task_status: TaskStatus) -> impl Stream
- >> {
+ let query = sqlx::query_as::<_, Task>(
+ "
+ SELECT id, payload_key, payload, status_id, created_at, updated_at
+ FROM tasks
+ WHERE status_id = ?
+ ORDER BY created_at DESC
+ ",
+ )
+ .bind(task_status);
+
+ query.fetch(&self.pool).err_into::()
+ }
+
+ fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream
- >> {
+ futures::stream::empty()
+ }
+
+
+ async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> {
+ let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
+ "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
+ );
+
+ if let Some(status) = &page.status {
+ builder.push("where status_id = ").push_bind(status);
+ }
+
+ builder.push("ORDER BY created_at DESC ");
+
+
+ if let Some(limit) = page.limit {
+ builder.push("LIMIT ").push_bind(limit);
+ builder.push(" OFFSET ").push_bind(page.offset);
+ }
+
+ debug!("SQL: \"{}\", with options: \"{:?}\"", builder.sql(), page);
+
+ let tasks = builder
+ .build_query_as::>()
+ .fetch_all(&self.pool)
+ .await?;
+
+ Ok(TasksPage::new(tasks, page.clone()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use fake::Dummy;
+ use futures::StreamExt;
+ use serde::{Deserialize, Serialize};
+ use sqlx::types::Uuid;
+ use sqlx::Row;
+ use tracing_test::traced_test;
+
+ #[derive(Dummy, Serialize, Deserialize, Debug)]
+ struct DummyTaskPayload {
+ key: Uuid,
+ _foo: String,
+ _bar: String,
+ }
+
+ //noinspection RsUnresolvedPath
+ fn setup(pool: SqlitePool, len: usize) -> (Sqlite, Vec>) {
+ let owned_pool = pool.clone();
+ let sqlite = Sqlite { pool: owned_pool };
+
+ let payloads = fake::vec![DummyTaskPayload; len];
+
+ let tasks = payloads
+ .into_iter()
+ .enumerate()
+ .map(|(i, item)| Task::new((i + 1).to_string(), item, TaskStatus::Pending))
+ .collect();
+
+ (sqlite, tasks)
+ }
+
+ #[sqlx::test(migrator = "MIGRATIONS")]
+ #[traced_test]
+ async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
+ let (sqlite, tasks) = setup(pool.clone(), 100);
+
+ sqlite.insert_tasks(&tasks).await.unwrap();
+
+ let result = sqlx::query("select count(id) from tasks")
+ .fetch_one(&pool)
+ .await?;
+
+ let total_rows: u64 = result.get(0);
+
+ assert_eq!(total_rows as usize, tasks.len());
+
+ let saved_tasks: Vec> = sqlite
+ .get_tasks(TaskStatus::Pending)
+ .map(|item| item.unwrap())
+ .collect()
+ .await;
+
+ assert_eq!(tasks.len(), saved_tasks.len());
+
+ let mut zip = tasks.into_iter().zip(saved_tasks.into_iter());
+
+ assert!(zip.all(|(a, b)| { a.get_key() == b.get_key() }));
+
+ Ok(())
+ }
+
+ #[sqlx::test(migrator = "MIGRATIONS")]
+ #[traced_test]
+ async fn it_return_paginated_tasks(pool: SqlitePool) -> sqlx::Result<()> {
+ let (sqlite, tasks) = setup(pool.clone(), 300);
+
+ sqlite.insert_tasks(&tasks).await.unwrap();
+
+ let page_options = TaskPagination::new().with_limit(Some(25));
+
+ let first_page: TasksPage =
+ sqlite.get_paginated_tasks(page_options).await.unwrap();
+
+ assert_eq!(first_page.tasks.len(), 25);
+ assert_eq!(first_page.tasks.first().unwrap().get_key(), tasks.get(0).unwrap().get_key());
+
+ let second_page: TasksPage =
+ sqlite.get_paginated_tasks(first_page.next()).await.unwrap();
+
+ assert_eq!(second_page.tasks.len(), 25);
+ assert_eq!(second_page.tasks.first().unwrap().get_key(), tasks.get(25).unwrap().get_key());
+
+ Ok(())
+ }
+}
diff --git a/lib_sync_core/src/error.rs b/lib_sync_core/src/error.rs
new file mode 100644
index 0000000..93c8a0b
--- /dev/null
+++ b/lib_sync_core/src/error.rs
@@ -0,0 +1,24 @@
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum Error {
+ #[error("{0}")]
+ Exception(&'static str),
+
+ #[error("{0}")]
+ Unhandled(&'static str),
+
+ #[error(transparent)]
+ Io(#[from] tokio::io::Error),
+
+ #[error(transparent)]
+ Sqlx(#[from] sqlx::Error),
+
+ #[error(transparent)]
+ Migration(#[from] sqlx::migrate::MigrateError),
+
+ #[error(transparent)]
+ ParseJson(#[from] serde_json::Error),
+}
+
+pub type Result = std::result::Result;
\ No newline at end of file
diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs
new file mode 100644
index 0000000..1030a9a
--- /dev/null
+++ b/lib_sync_core/src/lib.rs
@@ -0,0 +1,5 @@
+pub mod error;
+
+pub(crate) use error::*;
+pub mod tasks;
+mod database;
diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs
new file mode 100644
index 0000000..f455941
--- /dev/null
+++ b/lib_sync_core/src/tasks.rs
@@ -0,0 +1,92 @@
+use chrono::Utc;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Display;
+use tabled::Tabled;
+
+mod manager;
+mod jobs;
+mod worker;
+mod bus;
+
+#[derive(sqlx::Type, Debug, Clone)]
+#[repr(u8)]
+pub enum TaskStatus {
+ Pending = 1,
+ InProgress = 2,
+ Completed = 3,
+ Failed = 4,
+}
+
+impl Display for TaskStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ TaskStatus::Pending => {
+ write!(f, "Pending")
+ }
+ TaskStatus::InProgress => {
+ write!(f, "In Progress")
+ }
+ TaskStatus::Completed => {
+ write!(f, "Completed")
+ }
+ TaskStatus::Failed => {
+ write!(f, "Failed")
+ }
+ }
+ }
+}
+
+pub trait TaskPayload:
+ Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug
+{
+}
+impl
+ TaskPayload for T
+{
+}
+
+#[derive(sqlx::FromRow, Tabled, Debug)]
+pub struct Task {
+ id: u32,
+ payload_key: String,
+ #[sqlx(json)]
+ #[tabled(skip)]
+ payload: T,
+ #[sqlx(rename = "status_id")]
+ status: TaskStatus,
+ created_at: chrono::DateTime,
+ #[tabled(display = "display_option_date")]
+ updated_at: Option>,
+}
+
+impl Task {
+ pub fn new(payload_key: String, payload: T, status: TaskStatus) -> Self {
+ Self {
+ id: 0,
+ payload_key,
+ payload,
+ status,
+ created_at: Default::default(),
+ updated_at: None,
+ }
+ }
+}
+
+impl Task {
+ pub fn payload(&self) -> &T {
+ &self.payload
+ }
+
+ pub fn get_key(&self) -> String {
+ self.payload_key.clone()
+ }
+}
+
+fn display_option_date(o: &Option>) -> String {
+ match o {
+ Some(s) => format!("{}", s),
+ None => String::from(""),
+ }
+}
+
diff --git a/lib_sync_core/src/tasks/bus.rs b/lib_sync_core/src/tasks/bus.rs
new file mode 100644
index 0000000..4e96e56
--- /dev/null
+++ b/lib_sync_core/src/tasks/bus.rs
@@ -0,0 +1,4 @@
+#[derive(Clone)]
+pub enum Bus {
+ Local,
+}
\ No newline at end of file
diff --git a/lib_sync_core/src/tasks/jobs.rs b/lib_sync_core/src/tasks/jobs.rs
new file mode 100644
index 0000000..74a8ca0
--- /dev/null
+++ b/lib_sync_core/src/tasks/jobs.rs
@@ -0,0 +1,3 @@
+use crate::tasks::{Task, TaskStatus};
+
+pub type TaskJob = fn(&Task) -> TaskStatus;
\ No newline at end of file
diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs
new file mode 100644
index 0000000..eaaef77
--- /dev/null
+++ b/lib_sync_core/src/tasks/manager.rs
@@ -0,0 +1,190 @@
+use crate::database::TaskStorage;
+use crate::tasks::bus::Bus;
+use crate::tasks::jobs::TaskJob;
+use crate::tasks::{Task, TaskPayload, TaskStatus};
+use futures::StreamExt;
+use std::marker::PhantomData;
+use std::pin::pin;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::{mpsc, oneshot};
+use tokio::sync::oneshot::Sender;
+use crate::tasks::worker::TaskMessage;
+
+pub enum RateLimit {
+ Buffer(usize),
+ Rate(usize),
+ Ticks(usize),
+ None,
+}
+
+pub struct ManagerOptions {
+ rate_limit: RateLimit,
+ bus: Bus,
+}
+
+impl ManagerOptions {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self {
+ self.rate_limit = rate_limit;
+ self
+ }
+}
+
+impl Default for ManagerOptions {
+ fn default() -> Self {
+ Self {
+ rate_limit: RateLimit::None,
+ bus: Bus::Local,
+ }
+ }
+}
+
+struct TaskManager> {
+ storage: T,
+ options: ManagerOptions,
+ _marker: PhantomData
,
+}
+
+impl> TaskManager {
+ pub fn new(storage: T, options: ManagerOptions) -> Self {
+ Self {
+ storage,
+ options,
+ _marker: PhantomData,
+ }
+ }
+
+ pub async fn run_tasks(&self, mut task_sink: TaskMessage) -> crate::Result<()> {
+ let rows = self.storage.get_tasks(TaskStatus::Pending);
+ let listener = self.storage.listen_tasks(TaskStatus::Pending);
+
+ let mut queue = pin!(rows.chain(listener));
+
+ while let Some(task) = queue.next().await {
+ let task = match task {
+ Ok(task) => task,
+ Err(e) => {
+ continue
+ }
+ };
+
+ let sink = match task_sink.recv().await {
+ Some(s) => s,
+ None => break, // sink has stoped requesting tasks
+ };
+
+ if let Err(_) = sink.send(task) {
+ continue;
+ }
+
+ // (task, status)
+ }
+
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::database::{TaskPagination, TasksPage};
+ use async_stream::stream;
+ use fake::{Dummy, Fake, Faker};
+ use futures::Stream;
+ use serde::{Deserialize, Serialize};
+ use sqlx::types::Uuid;
+ use sync::mpsc;
+ use tokio::sync;
+ use tokio_stream::wrappers::ReceiverStream;
+ use tracing_test::traced_test;
+ use crate::error::Error;
+ use crate::tasks::worker::{Worker, WorkerManager};
+
+ #[derive(Dummy, Serialize, Deserialize, Debug)]
+ struct DummyTaskPayload {
+ key: Uuid,
+ _foo: String,
+ _bar: String,
+ }
+
+ struct DummyTaskStorage {}
+
+ impl TaskStorage for DummyTaskStorage {
+ async fn insert_tasks<'a, I: IntoIterator- >>(
+ &self,
+ _: I,
+ ) -> crate::error::Result<()> {
+ todo!()
+ }
+
+ fn get_tasks(
+ &self,
+ task_status: TaskStatus,
+ ) -> impl Stream
- >> {
+ let payloads: Vec = Faker.fake();
+
+ let tasks = payloads.into_iter().enumerate().map(move |(i, item)| {
+ Ok(Task::new((i + 1).to_string(), item, task_status.clone()))
+ });
+
+ futures::stream::iter(tasks)
+ }
+
+ fn listen_tasks(
+ &self,
+ task_status: TaskStatus,
+ ) -> impl Stream
- >> {
+ let (tx, rx) = mpsc::channel::>>(10);
+
+ tokio::spawn(async move {
+ for _ in 0..10 {
+ tokio::time::sleep(std::time::Duration::from_millis(250)).await;
+
+ let payload: DummyTaskPayload = Faker.fake();
+ let task_status: TaskStatus = task_status.clone();
+ let task = Ok(Task::new(payload.key.to_string(), payload, task_status));
+
+ if let Err(_) = tx.send(task).await {
+ break;
+ }
+ }
+ });
+
+ ReceiverStream::new(rx)
+ }
+
+ async fn get_paginated_tasks(
+ &self,
+ _: TaskPagination,
+ ) -> crate::error::Result> {
+ todo!()
+ }
+ }
+
+ struct DummyWorker;
+
+ impl Worker for DummyWorker {
+ fn process_job(task: &Task) -> crate::error::Result<()> {
+ println!("{:#?}", task);
+ Ok(())
+ }
+
+ async fn on_job_failure(task: &Task, error: Error) -> crate::error::Result<()> {
+ println!("{:#?} {:?}", task, error);
+ Ok(())
+ }
+ }
+
+ #[tokio::test]
+ #[traced_test]
+ async fn manager_runs() {
+ let execute_options = ManagerOptions::new();
+ let local_worker_sink = WorkerManager::get_listener_sink::(execute_options.bus.clone());
+ let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options);
+
+ task_manager.run_tasks(local_worker_sink).await.unwrap()
+ }
+}
diff --git a/lib_sync_core/src/tasks/worker.rs b/lib_sync_core/src/tasks/worker.rs
new file mode 100644
index 0000000..7e3d917
--- /dev/null
+++ b/lib_sync_core/src/tasks/worker.rs
@@ -0,0 +1,48 @@
+use crate::error::Error;
+use crate::tasks::bus::Bus;
+use crate::tasks::{Task, TaskPayload};
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::oneshot::Sender;
+use tokio::sync::{mpsc, oneshot};
+
+pub type TaskMessage = Receiver>>;
+
+pub struct WorkerManager;
+
+impl WorkerManager {
+ pub fn get_listener_sink>(bus: Bus) -> TaskMessage {
+ match bus {
+ Bus::Local => {
+ let (bus_tx, bus_rx) = mpsc::channel(100);
+ tokio::spawn(async move {
+ loop {
+ // TODO: properly catch errors
+ let (tx, rx) = oneshot::channel();
+
+ // Request a task
+ bus_tx.send(tx).await.unwrap();
+
+ // Wait for a task to be returned
+ let task = rx.await.unwrap();
+
+ W::process_job(&task).unwrap();
+ }
+ });
+
+ bus_rx
+ }
+ }
+ }
+}
+
+pub trait Worker {
+ async fn pre_process_job(task: &Task) -> crate::Result<()> {
+ Ok(())
+ }
+ fn process_job(task: &Task) -> crate::Result<()>;
+ async fn post_process_job(task: &Task) -> crate::Result<()> {
+ Ok(())
+ }
+
+ async fn on_job_failure(task: &Task, error: Error) -> crate::Result<()>;
+}
diff --git a/migrations/0003_tasks.sql b/migrations/0003_tasks.sql
index 0b7cb54..85d03ec 100644
--- a/migrations/0003_tasks.sql
+++ b/migrations/0003_tasks.sql
@@ -3,7 +3,7 @@ create table tasks
id integer not null
constraint tasks_pk
primary key autoincrement,
- payload_key ANY not null
+ payload_key TEXT not null
constraint tasks_payload_key
unique on conflict ignore,
payload TEXT not null,
diff --git a/src/main.rs b/src/main.rs
deleted file mode 100644
index 985c0e4..0000000
--- a/src/main.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-use clap::Parser;
-use readwise_bulk_upload::config::Config;
-use readwise_bulk_upload::readwise::DocumentPayload;
-use readwise_bulk_upload::sql::TaskManager;
-use readwise_bulk_upload::{Error, Result};
-use std::fs::File;
-use tracing_subscriber;
-use figment::{Figment, providers::{Serialized, Env}};
-
-#[tokio::main]
-async fn main() -> Result<()> {
- let args: Config = Figment::new()
- .merge(Serialized::defaults(Config::parse()))
- .merge(Env::prefixed("APP_"))
- .extract()?;
-
- tracing_subscriber::fmt()
- .with_max_level(args.log_level())
- .init();
-
- let file = File::open(args.path()).map_err(|_| {
- Error::Runtime(format!(
- r#"The file "{}" could not be open"#,
- args.path().display()
- ))
- })?;
-
- let documents: Vec = serde_json::from_reader(file)?;
-
- let task_manager = TaskManager::new().await?;
-
- task_manager.load_tasks(documents).await?;
-
- Ok(())
-}
diff --git a/src/sql.rs b/src/sql.rs
deleted file mode 100644
index 7d2a442..0000000
--- a/src/sql.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-use crate::Error;
-use directories::ProjectDirs;
-use serde::Serialize;
-use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
-use sqlx::{QueryBuilder, Sqlite, SqlitePool};
-use tokio::fs;
-use tracing::{info, instrument};
-
-static SQLITE_BIND_LIMIT: usize = 32766;
-
-#[derive(sqlx::Type)]
-#[repr(u8)]
-pub enum TaskStatus {
- Pending = 1,
- InProgress = 2,
- Completed = 3,
- Failed = 4,
-}
-
-pub trait TaskPayload {
- fn get_key(&self) -> String;
-}
-
-#[derive(Debug)]
-pub struct TaskManager {
- pool: SqlitePool,
-}
-
-impl TaskManager {
- pub async fn new() -> Result {
- Ok(Self {
- pool: Self::connect_database().await?,
- })
- }
-
- async fn connect_database() -> crate::Result {
- let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME"))
- .ok_or(Error::Unhandled("Could not get standard directories"))?;
-
- let database_file_path = project_dir.data_dir().join("db.sql");
-
- fs::create_dir_all(project_dir.data_dir()).await?;
-
- let opts = SqliteConnectOptions::new()
- .filename(database_file_path)
- .create_if_missing(true)
- .journal_mode(SqliteJournalMode::Wal);
-
- let pool = SqlitePool::connect_with(opts).await?;
-
- sqlx::migrate!("./migrations").run(&pool).await?;
-
- Ok(pool)
- }
-
- #[instrument(skip(self, values))]
- pub async fn load_tasks(&self, values: Vec) -> crate::Result<()>
- where
- T: TaskPayload + Serialize + std::fmt::Debug,
- {
- let mut tx = self.pool.begin().await?;
- let mut builder: QueryBuilder<'_, Sqlite> =
- QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
-
- let args: crate::Result> = values
- .iter()
- .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
- .collect();
-
-
- let mut affected_rows = 0;
- // Chunk the query by the size limit of bind params
- for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
- builder.push_values(chunk, |mut builder, item| {
- builder
- .push_bind(&item.0)
- .push_bind(&item.1)
- .push_bind(TaskStatus::Pending);
- });
- builder.push("ON conflict (payload_key) DO NOTHING");
-
- let query = builder.build();
-
- affected_rows += query.execute(&mut *tx).await?.rows_affected();
- builder.reset();
- }
-
- tx.commit().await?;
-
- info!("{} rows inserted.", affected_rows);
-
- Ok(())
- }
-}
diff --git a/web/Cargo.toml b/web/Cargo.toml
new file mode 100644
index 0000000..52dfac4
--- /dev/null
+++ b/web/Cargo.toml
@@ -0,0 +1,6 @@
+[package]
+name = "web"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
diff --git a/web/src/main.rs b/web/src/main.rs
new file mode 100644
index 0000000..e7a11a9
--- /dev/null
+++ b/web/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}