diff --git a/.gitignore b/.gitignore
index ea8c4bf..b60de5b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1 @@
-/target
+**/target
diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml
index 283ef50..38d7bd6 100644
--- a/.idea/dataSources.xml
+++ b/.idea/dataSources.xml
@@ -8,5 +8,12 @@
jdbc:sqlite:$USER_HOME$/.local/share/readwise-bulk-upload/db.sql
$ProjectFileDir$
+
+ sqlite.xerial
+ true
+ org.sqlite.JDBC
+ jdbc:sqlite:$PROJECT_DIR$/lib_sync_core/target/sqlx/test-dbs/lib_sync_core/database/sqlite/tests/it_save_tasks.sqlite
+ $ProjectFileDir$
+
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 827a82a..c144106 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -398,6 +398,40 @@ dependencies = [
"typenum",
]
+[[package]]
+name = "darling"
+version = "0.20.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
+dependencies = [
+ "darling_core",
+ "darling_macro",
+]
+
+[[package]]
+name = "darling_core"
+version = "0.20.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
+dependencies = [
+ "fnv",
+ "ident_case",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "darling_macro"
+version = "0.20.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
+dependencies = [
+ "darling_core",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "der"
version = "0.7.10"
@@ -409,6 +443,12 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "deunicode"
+version = "1.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "abd57806937c9cc163efc8ea3910e00a62e2aeb0b8119f1793a978088f8f6b04"
+
[[package]]
name = "digest"
version = "0.10.7"
@@ -459,6 +499,18 @@ version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
+[[package]]
+name = "dummy"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3bbcf21279103a67372982cb1156a2154a452451dff2b884cf897ccecce389e0"
+dependencies = [
+ "darling",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "either"
version = "1.15.0"
@@ -506,6 +558,21 @@ dependencies = [
"pin-project-lite",
]
+[[package]]
+name = "fake"
+version = "4.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2f5f203b70a419cb8880d1cfe6bebe488add0a0307d404e9f24021e5fd864b80"
+dependencies = [
+ "chrono",
+ "deunicode",
+ "dummy",
+ "http",
+ "rand 0.9.1",
+ "url-escape",
+ "uuid",
+]
+
[[package]]
name = "fastrand"
version = "2.3.0"
@@ -536,6 +603,12 @@ dependencies = [
"spin",
]
+[[package]]
+name = "fnv"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+
[[package]]
name = "foldhash"
version = "0.1.5"
@@ -749,6 +822,17 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "http"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
[[package]]
name = "iana-time-zone"
version = "0.1.63"
@@ -891,6 +975,12 @@ dependencies = [
"syn",
]
+[[package]]
+name = "ident_case"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
+
[[package]]
name = "idna"
version = "1.0.3"
@@ -967,6 +1057,7 @@ dependencies = [
"chrono",
"clap",
"directories",
+ "fake",
"figment",
"futures",
"serde",
@@ -978,6 +1069,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-subscriber",
+ "uuid",
]
[[package]]
@@ -1108,7 +1200,7 @@ dependencies = [
"num-integer",
"num-iter",
"num-traits",
- "rand",
+ "rand 0.8.5",
"smallvec",
"zeroize",
]
@@ -1362,8 +1454,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
- "rand_chacha",
- "rand_core",
+ "rand_chacha 0.3.1",
+ "rand_core 0.6.4",
+]
+
+[[package]]
+name = "rand"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
+dependencies = [
+ "rand_chacha 0.9.0",
+ "rand_core 0.9.3",
]
[[package]]
@@ -1373,7 +1475,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
- "rand_core",
+ "rand_core 0.6.4",
+]
+
+[[package]]
+name = "rand_chacha"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
+dependencies = [
+ "ppv-lite86",
+ "rand_core 0.9.3",
]
[[package]]
@@ -1385,6 +1497,15 @@ dependencies = [
"getrandom 0.2.16",
]
+[[package]]
+name = "rand_core"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
+dependencies = [
+ "getrandom 0.3.2",
+]
+
[[package]]
name = "redox_syscall"
version = "0.5.12"
@@ -1462,7 +1583,7 @@ dependencies = [
"num-traits",
"pkcs1",
"pkcs8",
- "rand_core",
+ "rand_core 0.6.4",
"signature",
"spki",
"subtle",
@@ -1561,6 +1682,12 @@ dependencies = [
"digest",
]
+[[package]]
+name = "sha1_smol"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
+
[[package]]
name = "sha2"
version = "0.10.9"
@@ -1594,7 +1721,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
dependencies = [
"digest",
- "rand_core",
+ "rand_core 0.6.4",
]
[[package]]
@@ -1690,6 +1817,7 @@ dependencies = [
"tokio-stream",
"tracing",
"url",
+ "uuid",
]
[[package]]
@@ -1761,7 +1889,7 @@ dependencies = [
"memchr",
"once_cell",
"percent-encoding",
- "rand",
+ "rand 0.8.5",
"rsa",
"serde",
"sha1",
@@ -1771,6 +1899,7 @@ dependencies = [
"stringprep",
"thiserror",
"tracing",
+ "uuid",
"whoami",
]
@@ -1800,7 +1929,7 @@ dependencies = [
"md-5",
"memchr",
"once_cell",
- "rand",
+ "rand 0.8.5",
"serde",
"serde_json",
"sha2",
@@ -1809,6 +1938,7 @@ dependencies = [
"stringprep",
"thiserror",
"tracing",
+ "uuid",
"whoami",
]
@@ -1835,6 +1965,7 @@ dependencies = [
"thiserror",
"tracing",
"url",
+ "uuid",
]
[[package]]
@@ -2148,6 +2279,15 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "url-escape"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44e0ce4d1246d075ca5abec4b41d33e87a6054d08e2366b63205665e950db218"
+dependencies = [
+ "percent-encoding",
+]
+
[[package]]
name = "utf16_iter"
version = "1.0.5"
@@ -2166,6 +2306,19 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
+[[package]]
+name = "uuid"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
+dependencies = [
+ "atomic",
+ "getrandom 0.3.2",
+ "md-5",
+ "serde",
+ "sha1_smol",
+]
+
[[package]]
name = "valuable"
version = "0.1.1"
diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml
index c2bd944..82df9ee 100644
--- a/lib_sync_core/Cargo.toml
+++ b/lib_sync_core/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2024"
[dependencies]
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
-sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
+sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
@@ -19,3 +19,7 @@ tabled = "0.19.0"
futures = "0.3.31"
thiserror = "2.0.12"
async-stream = "0.3.6"
+uuid = { version = "1.16.0", features = ["serde", "v4"] }
+
+[dev-dependencies]
+fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs
index f2bc877..b01914f 100644
--- a/lib_sync_core/src/database.rs
+++ b/lib_sync_core/src/database.rs
@@ -69,7 +69,7 @@ impl TasksPage {
}
pub trait TaskStorage {
- async fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()>;
+ async fn insert_tasks<'a, I: IntoIterator- >>(&self, tasks: I) -> crate::Result<()>;
fn get_tasks(&self, options: TaskStatus) -> impl Stream
- >>;
async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result>;
diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs
index c7fc993..d231990 100644
--- a/lib_sync_core/src/database/sqlite.rs
+++ b/lib_sync_core/src/database/sqlite.rs
@@ -1,14 +1,14 @@
+use crate::database::{TaskPagination, TaskStorage, TasksPage};
+use crate::tasks::{Task, TaskPayload, TaskStatus};
+use futures::{Stream, TryStreamExt};
+use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{QueryBuilder, SqlitePool};
use std::path::PathBuf;
use tokio::fs;
-use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use tracing::{info, instrument};
-use futures::{Stream, TryStreamExt};
-use crate::database::{TaskPagination, TaskStorage, TasksPage};
-use crate::tasks::{Task, TaskPayload, TaskStatus};
-#[allow(unused)]
static SQLITE_BIND_LIMIT: usize = 32766;
+static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations");
#[derive(Debug)]
pub struct Sqlite {
@@ -36,21 +36,37 @@ impl Sqlite {
let pool = SqlitePool::connect_with(opts).await?;
- sqlx::migrate!("../migrations").run(&pool).await?;
+ MIGRATIONS.run(&pool).await?;
Ok(pool)
}
}
impl TaskStorage for Sqlite {
+ /// Insert task into the database for later processing
+ ///
+ /// # Arguments
+ ///
+ /// * `tasks`: A list of task to be processed, each task has to have a unique key, if a key is repeated, the item will be omitted
+ ///
+ /// returns: Result<(), Error>
+ ///
+ /// # Examples
+ ///
+ /// ```
+ ///
+ /// ```
#[instrument(skip(self, tasks))]
- async fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()> {
+ async fn insert_tasks<'a, I: IntoIterator
- >>(
+ &self,
+ tasks: I,
+ ) -> crate::Result<()> {
let mut tx = self.pool.begin().await?;
let mut builder: QueryBuilder<'_, sqlx::Sqlite> =
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
let args: crate::Result> = tasks
- .iter()
+ .into_iter()
.map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?)))
.collect();
@@ -79,14 +95,15 @@ impl TaskStorage for Sqlite {
}
fn get_tasks(&self, task_status: TaskStatus) -> impl Stream
- >> {
- let query= sqlx::query_as::<_, Task>(
+ let query = sqlx::query_as::<_, Task>(
"
SELECT id, payload_key, payload, status_id, created_at, updated_at
FROM tasks
WHERE status_id = ?
ORDER BY created_at DESC
",
- ).bind(task_status);
+ )
+ .bind(task_status);
query.fetch(&self.pool).err_into::()
}
@@ -110,8 +127,71 @@ impl TaskStorage for Sqlite {
builder.push("LIMIT ").push_bind(limit);
}
- let tasks = builder.build_query_as::>().fetch_all(&self.pool).await?;
+ let tasks = builder
+ .build_query_as::>()
+ .fetch_all(&self.pool)
+ .await?;
Ok(TasksPage::new(tasks, page.clone()))
}
-}
\ No newline at end of file
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use fake::{Dummy, Fake, Faker};
+ use futures::StreamExt;
+ use serde::{Deserialize, Serialize};
+ use sqlx::types::Uuid;
+ use sqlx::Row;
+
+ #[derive(Dummy, Serialize, Deserialize, Debug)]
+struct DummyTaskPayload {
+ key: Uuid,
+ _foo: String,
+ _baar: String,
+ }
+
+ #[sqlx::test(migrator = "MIGRATIONS")]
+ async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
+ let owned_pool = pool.clone();
+ let sqlite = Sqlite { pool: owned_pool };
+
+ let tasks = generate_dummy_tasks();
+
+ sqlite.insert_tasks(&tasks).await.unwrap();
+
+ let result = sqlx::query("select count(id) from tasks")
+ .fetch_one(&pool)
+ .await?;
+
+ let total_rows: u64 = result.get(0);
+
+ assert_eq!(total_rows as usize, tasks.len());
+
+ let saved_tasks: Vec> = sqlite
+ .get_tasks(TaskStatus::Pending)
+ .map(|item| item.unwrap())
+ .collect()
+ .await;
+
+ assert_eq!(tasks.len(), saved_tasks.len());
+
+ let mut zip = tasks.into_iter().zip(saved_tasks.into_iter());
+
+ assert!(zip.all(|(a, b)| {
+ a.get_key() == b.get_key()
+ }));
+
+ Ok(())
+ }
+
+ fn generate_dummy_tasks() -> Vec> {
+ let payloads: Vec = Faker.fake();
+
+ payloads
+ .into_iter()
+ .map(|item| Task::new(item.key.to_string(), item, TaskStatus::Pending))
+ .collect()
+ }
+}
diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs
index 586934c..1030a9a 100644
--- a/lib_sync_core/src/lib.rs
+++ b/lib_sync_core/src/lib.rs
@@ -3,18 +3,3 @@ pub mod error;
pub(crate) use error::*;
pub mod tasks;
mod database;
-
-pub fn add(left: u64, right: u64) -> u64 {
- left + right
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn it_works() {
- let result = add(2, 2);
- assert_eq!(result, 4);
- }
-}
diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs
index 04c22b7..20b61d6 100644
--- a/lib_sync_core/src/tasks.rs
+++ b/lib_sync_core/src/tasks.rs
@@ -1,5 +1,4 @@
use chrono::Utc;
-use futures::StreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Display;
@@ -35,15 +34,11 @@ impl Display for TaskStatus {
}
}
-pub trait TaskPayloadKey {
- fn get_key(&self) -> String;
-}
-
pub trait TaskPayload:
- Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey
+ Serialize + DeserializeOwned + Send + Unpin + 'static
{
}
-impl
+impl
TaskPayload for T
{
}
@@ -64,6 +59,19 @@ pub struct Task {
updated_at: Option>,
}
+impl Task {
+ pub fn new(payload_key: String, payload: T, status: TaskStatus) -> Self {
+ Self {
+ id: 0,
+ payload_key,
+ payload,
+ status,
+ created_at: Default::default(),
+ updated_at: None,
+ }
+ }
+}
+
impl Task {
pub fn payload(&self) -> &T {
&self.payload