From 71c11eaa84988b5907017ca15d6ba696c5e0b6d7 Mon Sep 17 00:00:00 2001 From: aleidk Date: Sun, 11 May 2025 17:09:23 -0400 Subject: [PATCH 01/12] feature: Create command system to handle actions --- .idea/vcs.xml | 5 +++++ src/config.rs | 28 +++++++++++++++++++++++----- src/main.rs | 45 ++++++++++++++++++++++++++++++--------------- 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 35eb1dd..8d3e42f 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,5 +1,10 @@ + + + diff --git a/src/config.rs b/src/config.rs index 680fdf7..3fb8d52 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use clap::{Parser, ValueEnum}; +use clap::{Parser, Subcommand, ValueEnum}; use serde::{Deserialize, Serialize}; use std::fmt; use std::path::PathBuf; @@ -57,9 +57,30 @@ impl Into for VerbosityLevel { } } +#[derive(Debug, Subcommand)] +#[clap(rename_all = "snake_case")] +pub enum Command { + /// Load task into the database from [path] + LoadTasks{ + /// Path to the file + path: PathBuf, + }, + #[clap(skip)] + None, +} + +impl Default for Command { + fn default() -> Self { + Command::None + } +} + #[derive(Debug, Parser, Serialize, Deserialize)] pub struct Config { - path: PathBuf, + #[command(subcommand)] + #[serde(skip)] + pub command: Command, + #[arg( long, short = 'v', @@ -71,9 +92,6 @@ pub struct Config { } impl Config { - pub fn path(&self) -> &PathBuf { - &self.path - } pub fn log_level(&self) -> LevelFilter { self.log_level.clone().into() diff --git a/src/main.rs b/src/main.rs index 985c0e4..b1fb8e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ -use clap::Parser; -use readwise_bulk_upload::config::Config; +use clap::{CommandFactory, Parser}; +use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::readwise::DocumentPayload; use readwise_bulk_upload::sql::TaskManager; use readwise_bulk_upload::{Error, Result}; @@ -9,8 +9,9 @@ use figment::{Figment, providers::{Serialized, Env}}; #[tokio::main] async fn main() -> Result<()> { + let cli = Config::parse(); let args: Config = Figment::new() - .merge(Serialized::defaults(Config::parse())) + .merge(Serialized::defaults(&cli)) .merge(Env::prefixed("APP_")) .extract()?; @@ -18,18 +19,32 @@ async fn main() -> Result<()> { .with_max_level(args.log_level()) .init(); - let file = File::open(args.path()).map_err(|_| { - Error::Runtime(format!( - r#"The file "{}" could not be open"#, - args.path().display() - )) - })?; - - let documents: Vec = serde_json::from_reader(file)?; - - let task_manager = TaskManager::new().await?; - - task_manager.load_tasks(documents).await?; + run(&cli.command).await?; + + Ok(()) +} + +async fn run(command: &Command) -> Result<()> { + + match command { + Command::LoadTasks { path } => { + let file = File::open(path).map_err(|_| { + Error::Runtime(format!( + r#"The file "{}" could not be open"#, + path.display() + )) + })?; + + let documents: Vec = serde_json::from_reader(file)?; + + let task_manager = TaskManager::new().await?; + + task_manager.load_tasks(documents).await?; + } + Command::None => { + Config::command().print_help()?; + } + } Ok(()) } From 63c20cfc87415ad06f68fccbefa95a085dc964fd Mon Sep 17 00:00:00 2001 From: aleidk Date: Mon, 12 May 2025 14:32:44 -0400 Subject: [PATCH 02/12] feature: add command to query tasks --- Cargo.lock | 92 +++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/config.rs | 1 + src/lib.rs | 2 +- src/main.rs | 15 ++++-- src/readwise.rs | 15 +++++- src/{sql.rs => task_manager.rs} | 66 ++++++++++++++++++++++- 7 files changed, 184 insertions(+), 8 deletions(-) rename src/{sql.rs => task_manager.rs} (59%) diff --git a/Cargo.lock b/Cargo.lock index 87f2582..5b6eeaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,19 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "getrandom 0.3.2", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -172,6 +185,12 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "bytecount" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" + [[package]] name = "bytemuck" version = "1.23.0" @@ -1059,6 +1078,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "papergrid" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30268a8d20c2c0d126b2b6610ab405f16517f6ba9f244d8c59ac2c512a8a1ce7" +dependencies = [ + "ahash", + "bytecount", + "unicode-width", +] + [[package]] name = "parking" version = "2.2.1" @@ -1174,6 +1204,28 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -1252,6 +1304,7 @@ dependencies = [ "serde", "serde_json", "sqlx", + "tabled", "thiserror", "tokio", "tracing", @@ -1762,6 +1815,30 @@ dependencies = [ "syn", ] +[[package]] +name = "tabled" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "228d124371171cd39f0f454b58f73ddebeeef3cef3207a82ffea1c29465aea43" +dependencies = [ + "papergrid", + "tabled_derive", + "testing_table", +] + +[[package]] +name = "tabled_derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea5d1b13ca6cff1f9231ffd62f15eefd72543dab5e468735f1a456728a02846" +dependencies = [ + "heck", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tempfile" version = "3.19.1" @@ -1775,6 +1852,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "testing_table" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f8daae29995a24f65619e19d8d31dea5b389f3d853d8bf297bbf607cd0014cc" +dependencies = [ + "unicode-width", +] + [[package]] name = "thiserror" version = "2.0.12" @@ -1972,6 +2058,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + [[package]] name = "url" version = "2.5.4" diff --git a/Cargo.toml b/Cargo.toml index da95ddc..6482ca3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} figment = { version = "0.10.19", features = ["env"] } tracing-core = "0.1.33" +tabled = "0.19.0" \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 3fb8d52..2b71f1c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -65,6 +65,7 @@ pub enum Command { /// Path to the file path: PathBuf, }, + Query, #[clap(skip)] None, } diff --git a/src/lib.rs b/src/lib.rs index dabd6d3..f63d5eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ mod error; -pub mod sql; +pub mod task_manager; pub mod config; pub mod readwise; diff --git a/src/main.rs b/src/main.rs index b1fb8e8..319961a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,15 @@ use clap::{CommandFactory, Parser}; +use figment::{ + providers::{Env, Serialized}, + Figment, +}; use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::readwise::DocumentPayload; -use readwise_bulk_upload::sql::TaskManager; +use readwise_bulk_upload::task_manager::TaskManager; use readwise_bulk_upload::{Error, Result}; use std::fs::File; +use tabled::Table; use tracing_subscriber; -use figment::{Figment, providers::{Serialized, Env}}; #[tokio::main] async fn main() -> Result<()> { @@ -25,7 +29,6 @@ async fn main() -> Result<()> { } async fn run(command: &Command) -> Result<()> { - match command { Command::LoadTasks { path } => { let file = File::open(path).map_err(|_| { @@ -41,6 +44,12 @@ async fn run(command: &Command) -> Result<()> { task_manager.load_tasks(documents).await?; } + Command::Query => { + let task_manager = TaskManager::new().await?; + let tasks = task_manager.get_tasks::(None, 25).await?; + + println!("{}", Table::new(tasks)); + } Command::None => { Config::command().print_help()?; } diff --git a/src/readwise.rs b/src/readwise.rs index 33bf9ed..e3d7a42 100644 --- a/src/readwise.rs +++ b/src/readwise.rs @@ -1,7 +1,8 @@ +use crate::task_manager::TaskPayload; use chrono::{DateTime, Local}; -use serde::{Deserialize, Deserializer, de, Serialize}; +use serde::{de, Deserialize, Deserializer, Serialize}; use serde_json::Value; -use crate::sql::TaskPayload; +use std::fmt::Display; #[derive(Deserialize, Serialize, Debug)] pub struct DocumentPayload { @@ -14,6 +15,16 @@ pub struct DocumentPayload { location: String, } +impl Display for DocumentPayload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + serde_json::to_string_pretty(self).map_err(|_| std::fmt::Error)? + ) + } +} + impl TaskPayload for DocumentPayload { fn get_key(&self) -> String { self.url.clone() diff --git a/src/sql.rs b/src/task_manager.rs similarity index 59% rename from src/sql.rs rename to src/task_manager.rs index 7d2a442..2581187 100644 --- a/src/sql.rs +++ b/src/task_manager.rs @@ -1,14 +1,18 @@ use crate::Error; +use chrono::Utc; use directories::ProjectDirs; +use serde::de::DeserializeOwned; use serde::Serialize; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::{QueryBuilder, Sqlite, SqlitePool}; +use std::fmt::Display; +use tabled::Tabled; use tokio::fs; use tracing::{info, instrument}; static SQLITE_BIND_LIMIT: usize = 32766; -#[derive(sqlx::Type)] +#[derive(sqlx::Type, Debug)] #[repr(u8)] pub enum TaskStatus { Pending = 1, @@ -17,10 +21,50 @@ pub enum TaskStatus { Failed = 4, } +impl Display for TaskStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskStatus::Pending => { + write!(f, "Pending") + } + TaskStatus::InProgress => { + write!(f, "In Progress") + } + TaskStatus::Completed => { + write!(f, "Completed") + } + TaskStatus::Failed => { + write!(f, "Failed") + } + } + } +} + pub trait TaskPayload { fn get_key(&self) -> String; } +#[derive(sqlx::FromRow, Tabled, Debug)] +pub struct Task { + id: u32, + payload_key: String, + #[sqlx(json)] + #[tabled(skip)] + payload: T, + #[sqlx(rename = "status_id")] + status: TaskStatus, + created_at: chrono::DateTime, + #[tabled(display = "display_option_date")] + updated_at: Option>, +} + +fn display_option_date(o: &Option>) -> String { + match o { + Some(s) => format!("{}", s), + None => String::from(""), + } +} + #[derive(Debug)] pub struct TaskManager { pool: SqlitePool, @@ -53,6 +97,25 @@ impl TaskManager { Ok(pool) } + pub async fn get_tasks< + T: DeserializeOwned + Send + Unpin + 'static + Display, + >( + &self, + status: Option, + limit: u16, + ) -> crate::Result>> { + let tasks: Vec> = sqlx::query_as( + "select id, payload_key, payload, status_id, created_at, updated_at from tasks where status_id = ? order by ? limit ?", + ) + .bind(status.unwrap_or(TaskStatus::Pending)) + .bind("created_at DESC") + .bind(limit) + .fetch_all(&self.pool) + .await?; + + Ok(tasks) + } + #[instrument(skip(self, values))] pub async fn load_tasks(&self, values: Vec) -> crate::Result<()> where @@ -67,7 +130,6 @@ impl TaskManager { .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) .collect(); - let mut affected_rows = 0; // Chunk the query by the size limit of bind params for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { From 53872108446597439ac77045b0f656a7f5f4e864 Mon Sep 17 00:00:00 2001 From: aleidk Date: Mon, 12 May 2025 16:40:41 -0400 Subject: [PATCH 03/12] feature: add command to run tasks --- .idea/runConfigurations/Load_Tasks.xml | 22 +++++++++ .idea/runConfigurations/Query_Tasks.xml | 22 +++++++++ .idea/runConfigurations/Run_Tasks.xml | 22 +++++++++ Cargo.lock | 29 +++++++++++ Cargo.toml | 3 +- src/config.rs | 1 + src/main.rs | 14 ++++-- src/task_manager.rs | 64 ++++++++++++++++++++----- 8 files changed, 160 insertions(+), 17 deletions(-) create mode 100644 .idea/runConfigurations/Load_Tasks.xml create mode 100644 .idea/runConfigurations/Query_Tasks.xml create mode 100644 .idea/runConfigurations/Run_Tasks.xml diff --git a/.idea/runConfigurations/Load_Tasks.xml b/.idea/runConfigurations/Load_Tasks.xml new file mode 100644 index 0000000..41f2816 --- /dev/null +++ b/.idea/runConfigurations/Load_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Query_Tasks.xml b/.idea/runConfigurations/Query_Tasks.xml new file mode 100644 index 0000000..ef4b918 --- /dev/null +++ b/.idea/runConfigurations/Query_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Run_Tasks.xml b/.idea/runConfigurations/Run_Tasks.xml new file mode 100644 index 0000000..ef08a15 --- /dev/null +++ b/.idea/runConfigurations/Run_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5b6eeaa..dfdfe03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -508,6 +508,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -552,6 +567,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -570,8 +596,10 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1301,6 +1329,7 @@ dependencies = [ "clap", "directories", "figment", + "futures", "serde", "serde_json", "sqlx", diff --git a/Cargo.toml b/Cargo.toml index 6482ca3..3f51301 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,5 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} figment = { version = "0.10.19", features = ["env"] } tracing-core = "0.1.33" -tabled = "0.19.0" \ No newline at end of file +tabled = "0.19.0" +futures = "0.3.31" \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 2b71f1c..04bdbff 100644 --- a/src/config.rs +++ b/src/config.rs @@ -66,6 +66,7 @@ pub enum Command { path: PathBuf, }, Query, + Run, #[clap(skip)] None, } diff --git a/src/main.rs b/src/main.rs index 319961a..ff1f2ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use figment::{ }; use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::readwise::DocumentPayload; -use readwise_bulk_upload::task_manager::TaskManager; +use readwise_bulk_upload::task_manager::{TaskManager, TaskStatus}; use readwise_bulk_upload::{Error, Result}; use std::fs::File; use tabled::Table; @@ -29,6 +29,7 @@ async fn main() -> Result<()> { } async fn run(command: &Command) -> Result<()> { + let task_manager = TaskManager::new().await?; match command { Command::LoadTasks { path } => { let file = File::open(path).map_err(|_| { @@ -40,16 +41,21 @@ async fn run(command: &Command) -> Result<()> { let documents: Vec = serde_json::from_reader(file)?; - let task_manager = TaskManager::new().await?; task_manager.load_tasks(documents).await?; } Command::Query => { - let task_manager = TaskManager::new().await?; - let tasks = task_manager.get_tasks::(None, 25).await?; + let tasks = task_manager.get_tasks::(None, Some(25)).await?; println!("{}", Table::new(tasks)); } + Command::Run => { + task_manager.run_tasks::(|task| { + println!("{}", task.get_key()); + + TaskStatus::Completed + }).await?; + } Command::None => { Config::command().print_help()?; } diff --git a/src/task_manager.rs b/src/task_manager.rs index 2581187..11401e2 100644 --- a/src/task_manager.rs +++ b/src/task_manager.rs @@ -1,6 +1,7 @@ use crate::Error; use chrono::Utc; use directories::ProjectDirs; +use futures::{StreamExt, TryStreamExt}; use serde::de::DeserializeOwned; use serde::Serialize; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; @@ -44,6 +45,8 @@ pub trait TaskPayload { fn get_key(&self) -> String; } +pub type TaskJob = fn(&Task) -> TaskStatus; + #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { id: u32, @@ -58,6 +61,12 @@ pub struct Task { updated_at: Option>, } +impl Task { + pub fn get_key(&self) -> String { + self.payload_key.clone() + } +} + fn display_option_date(o: &Option>) -> String { match o { Some(s) => format!("{}", s), @@ -65,6 +74,9 @@ fn display_option_date(o: &Option>) -> String { } } +pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {} +impl _Task for T {} + #[derive(Debug)] pub struct TaskManager { pool: SqlitePool, @@ -97,21 +109,34 @@ impl TaskManager { Ok(pool) } - pub async fn get_tasks< - T: DeserializeOwned + Send + Unpin + 'static + Display, - >( + fn get_task_builder( + status: Option, + limit: Option, + ) -> QueryBuilder<'static, Sqlite> { + let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new( + "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", + ); + + if let Some(status) = status { + builder.push("where status_id = ").push_bind(status); + } + + builder.push("ORDER BY created_at DESC "); + + if let Some(limit) = limit { + builder.push("LIMIT ").push_bind(limit); + } + builder + } + + pub async fn get_tasks( &self, status: Option, - limit: u16, + limit: Option, ) -> crate::Result>> { - let tasks: Vec> = sqlx::query_as( - "select id, payload_key, payload, status_id, created_at, updated_at from tasks where status_id = ? order by ? limit ?", - ) - .bind(status.unwrap_or(TaskStatus::Pending)) - .bind("created_at DESC") - .bind(limit) - .fetch_all(&self.pool) - .await?; + let mut builder = Self::get_task_builder(status, limit); + + let tasks: Vec> = builder.build_query_as().fetch_all(&self.pool).await?; Ok(tasks) } @@ -153,4 +178,19 @@ impl TaskManager { Ok(()) } + + pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None); + + let rows = builder.build_query_as::>().fetch(&self.pool); + + let result: Vec<(Task, TaskStatus)> = rows.map(|x| { + let task = x.unwrap(); + let status = func(&task); + + (task, status) + }).collect().await; + + Ok(()) + } } From b31502fb373361026caa3ff8e12e78483586689f Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 15 May 2025 11:38:00 -0400 Subject: [PATCH 04/12] refactor: move library into it's own crate --- .idea/readwise-bulk-upload.iml | 3 ++- Cargo.lock | 21 +++++++++++++++++++ Cargo.toml | 24 +++++----------------- cli/Cargo.toml | 22 ++++++++++++++++++++ {src => cli/src}/config.rs | 0 {src => cli/src}/error.rs | 3 +++ {src => cli/src}/lib.rs | 3 +-- {src => cli/src}/main.rs | 9 ++++++-- {src => cli/src}/readwise.rs | 2 +- lib_sync_core/Cargo.toml | 20 ++++++++++++++++++ lib_sync_core/src/error.rs | 24 ++++++++++++++++++++++ lib_sync_core/src/lib.rs | 19 +++++++++++++++++ {src => lib_sync_core/src}/task_manager.rs | 22 ++++++++++++-------- 13 files changed, 138 insertions(+), 34 deletions(-) create mode 100644 cli/Cargo.toml rename {src => cli/src}/config.rs (100%) rename {src => cli/src}/error.rs (88%) rename {src => cli/src}/lib.rs (74%) rename {src => cli/src}/main.rs (82%) rename {src => cli/src}/readwise.rs (96%) create mode 100644 lib_sync_core/Cargo.toml create mode 100644 lib_sync_core/src/error.rs create mode 100644 lib_sync_core/src/lib.rs rename {src => lib_sync_core/src}/task_manager.rs (89%) diff --git a/.idea/readwise-bulk-upload.iml b/.idea/readwise-bulk-upload.iml index cf84ae4..2b50503 100644 --- a/.idea/readwise-bulk-upload.iml +++ b/.idea/readwise-bulk-upload.iml @@ -2,7 +2,8 @@ - + + diff --git a/Cargo.lock b/Cargo.lock index dfdfe03..12b188e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -916,6 +916,26 @@ dependencies = [ "spin", ] +[[package]] +name = "lib_sync_core" +version = "0.1.0" +dependencies = [ + "chrono", + "clap", + "directories", + "figment", + "futures", + "serde", + "serde_json", + "sqlx", + "tabled", + "thiserror", + "tokio", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "libc" version = "0.2.172" @@ -1330,6 +1350,7 @@ dependencies = [ "directories", "figment", "futures", + "lib_sync_core", "serde", "serde_json", "sqlx", diff --git a/Cargo.toml b/Cargo.toml index 3f51301..37dcffc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,20 +1,6 @@ -[package] -name = "readwise-bulk-upload" -version = "0.1.0" -edition = "2024" +[workspace] +resolver = "3" -[dependencies] -thiserror = "2.0.12" -directories = "6.0.0" -tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } -sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] } -clap = { version = "4.5.37", features = ["derive"] } -serde = { version = "1.0.219", features = ["derive"] } -chrono = {version = "0.4.41", features = ["serde"]} -serde_json = "1.0.140" -tracing = "0.1.41" -tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} -figment = { version = "0.10.19", features = ["env"] } -tracing-core = "0.1.33" -tabled = "0.19.0" -futures = "0.3.31" \ No newline at end of file +members = [ + "cli", "lib_sync_core", +] diff --git a/cli/Cargo.toml b/cli/Cargo.toml new file mode 100644 index 0000000..410a10d --- /dev/null +++ b/cli/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "readwise-bulk-upload" +version = "0.1.0" +edition = "2024" + +[dependencies] +lib_sync_core = {path = "../lib_sync_core"} + +directories = "6.0.0" +tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } +sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] } +clap = { version = "4.5.37", features = ["derive"] } +serde = { version = "1.0.219", features = ["derive"] } +chrono = {version = "0.4.41", features = ["serde"]} +serde_json = "1.0.140" +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} +figment = { version = "0.10.19", features = ["env"] } +tracing-core = "0.1.33" +tabled = "0.19.0" +futures = "0.3.31" +thiserror = "2.0.12" \ No newline at end of file diff --git a/src/config.rs b/cli/src/config.rs similarity index 100% rename from src/config.rs rename to cli/src/config.rs diff --git a/src/error.rs b/cli/src/error.rs similarity index 88% rename from src/error.rs rename to cli/src/error.rs index b55b32e..36572b2 100644 --- a/src/error.rs +++ b/cli/src/error.rs @@ -11,6 +11,9 @@ pub enum Error { #[error("{0}")] Unhandled(&'static str), + #[error(transparent)] + Sync(#[from] lib_sync_core::error::Error), + #[error(transparent)] Sqlx(#[from] sqlx::Error), diff --git a/src/lib.rs b/cli/src/lib.rs similarity index 74% rename from src/lib.rs rename to cli/src/lib.rs index f63d5eb..acba89d 100644 --- a/src/lib.rs +++ b/cli/src/lib.rs @@ -1,6 +1,5 @@ -mod error; -pub mod task_manager; pub mod config; pub mod readwise; +mod error; pub use error::*; \ No newline at end of file diff --git a/src/main.rs b/cli/src/main.rs similarity index 82% rename from src/main.rs rename to cli/src/main.rs index ff1f2ac..23acee5 100644 --- a/src/main.rs +++ b/cli/src/main.rs @@ -5,9 +5,10 @@ use figment::{ }; use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::readwise::DocumentPayload; -use readwise_bulk_upload::task_manager::{TaskManager, TaskStatus}; +use lib_sync_core::task_manager::{TaskManager, TaskStatus}; use readwise_bulk_upload::{Error, Result}; use std::fs::File; +use directories::ProjectDirs; use tabled::Table; use tracing_subscriber; @@ -29,7 +30,11 @@ async fn main() -> Result<()> { } async fn run(command: &Command) -> Result<()> { - let task_manager = TaskManager::new().await?; + let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME")) + .ok_or(lib_sync_core::error::Error::Unhandled("Could not get standard directories"))?; + + let task_manager = TaskManager::new(project_dir.data_dir()).await?; + match command { Command::LoadTasks { path } => { let file = File::open(path).map_err(|_| { diff --git a/src/readwise.rs b/cli/src/readwise.rs similarity index 96% rename from src/readwise.rs rename to cli/src/readwise.rs index e3d7a42..13b2f65 100644 --- a/src/readwise.rs +++ b/cli/src/readwise.rs @@ -1,4 +1,4 @@ -use crate::task_manager::TaskPayload; +use lib_sync_core::task_manager::TaskPayload; use chrono::{DateTime, Local}; use serde::{de, Deserialize, Deserializer, Serialize}; use serde_json::Value; diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml new file mode 100644 index 0000000..c6d73f7 --- /dev/null +++ b/lib_sync_core/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "lib_sync_core" +version = "0.1.0" +edition = "2024" + +[dependencies] +directories = "6.0.0" +tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } +sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] } +clap = { version = "4.5.37", features = ["derive"] } +serde = { version = "1.0.219", features = ["derive"] } +chrono = {version = "0.4.41", features = ["serde"]} +serde_json = "1.0.140" +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} +figment = { version = "0.10.19", features = ["env"] } +tracing-core = "0.1.33" +tabled = "0.19.0" +futures = "0.3.31" +thiserror = "2.0.12" diff --git a/lib_sync_core/src/error.rs b/lib_sync_core/src/error.rs new file mode 100644 index 0000000..93c8a0b --- /dev/null +++ b/lib_sync_core/src/error.rs @@ -0,0 +1,24 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("{0}")] + Exception(&'static str), + + #[error("{0}")] + Unhandled(&'static str), + + #[error(transparent)] + Io(#[from] tokio::io::Error), + + #[error(transparent)] + Sqlx(#[from] sqlx::Error), + + #[error(transparent)] + Migration(#[from] sqlx::migrate::MigrateError), + + #[error(transparent)] + ParseJson(#[from] serde_json::Error), +} + +pub type Result = std::result::Result; \ No newline at end of file diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs new file mode 100644 index 0000000..810ba8e --- /dev/null +++ b/lib_sync_core/src/lib.rs @@ -0,0 +1,19 @@ +pub mod error; + +pub(crate) use error::*; +pub mod task_manager; + +pub fn add(left: u64, right: u64) -> u64 { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/src/task_manager.rs b/lib_sync_core/src/task_manager.rs similarity index 89% rename from src/task_manager.rs rename to lib_sync_core/src/task_manager.rs index 11401e2..57a2556 100644 --- a/src/task_manager.rs +++ b/lib_sync_core/src/task_manager.rs @@ -1,4 +1,4 @@ -use crate::Error; +use crate::error::Error; use chrono::Utc; use directories::ProjectDirs; use futures::{StreamExt, TryStreamExt}; @@ -7,6 +7,7 @@ use serde::Serialize; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::{QueryBuilder, Sqlite, SqlitePool}; use std::fmt::Display; +use std::path::PathBuf; use tabled::Tabled; use tokio::fs; use tracing::{info, instrument}; @@ -79,23 +80,26 @@ impl _Task for T {} #[derive(Debug)] pub struct TaskManager { + base_path: PathBuf, pool: SqlitePool, } impl TaskManager { - pub async fn new() -> Result { + pub async fn new>(base_path: P) -> Result { + let base_path = base_path.into(); + let pool = Self::connect_database(base_path.clone()).await?; Ok(Self { - pool: Self::connect_database().await?, + base_path, + pool, }) } - async fn connect_database() -> crate::Result { - let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME")) - .ok_or(Error::Unhandled("Could not get standard directories"))?; + async fn connect_database>(base_path: P) -> crate::Result { + let base_path = base_path.into(); - let database_file_path = project_dir.data_dir().join("db.sql"); + let database_file_path = base_path.join("db.sql"); - fs::create_dir_all(project_dir.data_dir()).await?; + fs::create_dir_all(base_path).await?; let opts = SqliteConnectOptions::new() .filename(database_file_path) @@ -104,7 +108,7 @@ impl TaskManager { let pool = SqlitePool::connect_with(opts).await?; - sqlx::migrate!("./migrations").run(&pool).await?; + sqlx::migrate!("../migrations").run(&pool).await?; Ok(pool) } From 2827193fd69206fb52a0acd89006eb39868974f3 Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 15 May 2025 13:00:39 -0400 Subject: [PATCH 05/12] refactor: move readwise files to it's own binary --- Cargo.lock | 42 +++++----- cli/Cargo.toml | 6 +- .../readwise/external_interface.rs} | 0 cli/bin/readwise/main.rs | 76 +++++++++++++++++++ cli/src/lib.rs | 1 - cli/src/main.rs | 53 +------------ 6 files changed, 105 insertions(+), 73 deletions(-) rename cli/{src/readwise.rs => bin/readwise/external_interface.rs} (100%) create mode 100644 cli/bin/readwise/main.rs diff --git a/Cargo.lock b/Cargo.lock index 12b188e..3c424bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,6 +279,27 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +[[package]] +name = "cli" +version = "0.1.0" +dependencies = [ + "chrono", + "clap", + "directories", + "figment", + "futures", + "lib_sync_core", + "serde", + "serde_json", + "sqlx", + "tabled", + "thiserror", + "tokio", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -1341,27 +1362,6 @@ dependencies = [ "getrandom 0.2.16", ] -[[package]] -name = "readwise-bulk-upload" -version = "0.1.0" -dependencies = [ - "chrono", - "clap", - "directories", - "figment", - "futures", - "lib_sync_core", - "serde", - "serde_json", - "sqlx", - "tabled", - "thiserror", - "tokio", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "redox_syscall" version = "0.5.12" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 410a10d..c056a52 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,8 +1,12 @@ [package] -name = "readwise-bulk-upload" +name = "cli" version = "0.1.0" edition = "2024" +[[bin]] +name = "readwise" +path = "bin/readwise/main.rs" + [dependencies] lib_sync_core = {path = "../lib_sync_core"} diff --git a/cli/src/readwise.rs b/cli/bin/readwise/external_interface.rs similarity index 100% rename from cli/src/readwise.rs rename to cli/bin/readwise/external_interface.rs diff --git a/cli/bin/readwise/main.rs b/cli/bin/readwise/main.rs new file mode 100644 index 0000000..bdfc584 --- /dev/null +++ b/cli/bin/readwise/main.rs @@ -0,0 +1,76 @@ +use clap::{CommandFactory, Parser}; +use directories::ProjectDirs; +use figment::{ + Figment, + providers::{Env, Serialized}, +}; +use lib_sync_core::task_manager::{TaskManager, TaskStatus}; +use cli::config::{Command, Config}; +use cli::{Error, Result}; +use std::fs::File; +use tabled::Table; +use tracing_subscriber; +use crate::external_interface::DocumentPayload; + +mod external_interface; + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Config::parse(); + let args: Config = Figment::new() + .merge(Serialized::defaults(&cli)) + .merge(Env::prefixed("APP_")) + .extract()?; + + tracing_subscriber::fmt() + .with_max_level(args.log_level()) + .init(); + + run(&cli.command).await?; + + Ok(()) +} + +async fn run(command: &Command) -> Result<()> { + let project_dir = ProjectDirs::from("", "", "synchronizator_readwise").ok_or( + lib_sync_core::error::Error::Unhandled("Could not get standard directories"), + )?; + + let task_manager = TaskManager::new(project_dir.data_dir()).await?; + + match command { + Command::LoadTasks { path } => { + let file = File::open(path).map_err(|_| { + Error::Runtime(format!( + r#"The file "{}" could not be open"#, + path.display() + )) + })?; + + let documents: Vec = serde_json::from_reader(file)?; + + task_manager.load_tasks(documents).await?; + } + Command::Query => { + let tasks = task_manager + .get_tasks::(None, Some(25)) + .await?; + + println!("{}", Table::new(tasks)); + } + Command::Run => { + task_manager + .run_tasks::(|task| { + println!("{}", task.get_key()); + + TaskStatus::Completed + }) + .await?; + } + Command::None => { + Config::command().print_help()?; + } + } + + Ok(()) +} diff --git a/cli/src/lib.rs b/cli/src/lib.rs index acba89d..dabf39f 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -1,5 +1,4 @@ pub mod config; -pub mod readwise; mod error; pub use error::*; \ No newline at end of file diff --git a/cli/src/main.rs b/cli/src/main.rs index 23acee5..75c0be7 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,15 +1,10 @@ -use clap::{CommandFactory, Parser}; +use clap::Parser; use figment::{ providers::{Env, Serialized}, Figment, }; -use readwise_bulk_upload::config::{Command, Config}; -use readwise_bulk_upload::readwise::DocumentPayload; -use lib_sync_core::task_manager::{TaskManager, TaskStatus}; -use readwise_bulk_upload::{Error, Result}; -use std::fs::File; -use directories::ProjectDirs; -use tabled::Table; +use cli::config::Config; +use cli::Result; use tracing_subscriber; #[tokio::main] @@ -24,47 +19,5 @@ async fn main() -> Result<()> { .with_max_level(args.log_level()) .init(); - run(&cli.command).await?; - - Ok(()) -} - -async fn run(command: &Command) -> Result<()> { - let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME")) - .ok_or(lib_sync_core::error::Error::Unhandled("Could not get standard directories"))?; - - let task_manager = TaskManager::new(project_dir.data_dir()).await?; - - match command { - Command::LoadTasks { path } => { - let file = File::open(path).map_err(|_| { - Error::Runtime(format!( - r#"The file "{}" could not be open"#, - path.display() - )) - })?; - - let documents: Vec = serde_json::from_reader(file)?; - - - task_manager.load_tasks(documents).await?; - } - Command::Query => { - let tasks = task_manager.get_tasks::(None, Some(25)).await?; - - println!("{}", Table::new(tasks)); - } - Command::Run => { - task_manager.run_tasks::(|task| { - println!("{}", task.get_key()); - - TaskStatus::Completed - }).await?; - } - Command::None => { - Config::command().print_help()?; - } - } - Ok(()) } From f34142dcba51614c59b4cf92e4a59add7bd48e5d Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 15 May 2025 13:02:32 -0400 Subject: [PATCH 06/12] chore: add wip web package --- .idea/readwise-bulk-upload.iml | 1 + Cargo.lock | 4 ++++ Cargo.toml | 2 +- web/Cargo.toml | 6 ++++++ web/src/main.rs | 3 +++ 5 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 web/Cargo.toml create mode 100644 web/src/main.rs diff --git a/.idea/readwise-bulk-upload.iml b/.idea/readwise-bulk-upload.iml index 2b50503..cc58e9c 100644 --- a/.idea/readwise-bulk-upload.iml +++ b/.idea/readwise-bulk-upload.iml @@ -4,6 +4,7 @@ + diff --git a/Cargo.lock b/Cargo.lock index 3c424bd..5daa196 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2240,6 +2240,10 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web" +version = "0.1.0" + [[package]] name = "whoami" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 37dcffc..df1befe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,5 @@ resolver = "3" members = [ - "cli", "lib_sync_core", + "cli", "lib_sync_core", "web", ] diff --git a/web/Cargo.toml b/web/Cargo.toml new file mode 100644 index 0000000..52dfac4 --- /dev/null +++ b/web/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "web" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/web/src/main.rs b/web/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/web/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} From 4199a97a195b5da1a976cfc5365eac806ea10de6 Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 15 May 2025 16:53:11 -0400 Subject: [PATCH 07/12] refactor(lib_core): introduce database module refs: #5 --- .idea/codeStyles/codeStyleConfig.xml | 5 + Cargo.lock | 23 ++++ lib_sync_core/Cargo.toml | 1 + lib_sync_core/src/database.rs | 197 +++++++++++++++++++++++++++ lib_sync_core/src/lib.rs | 1 + lib_sync_core/src/task_manager.rs | 146 +++----------------- 6 files changed, 249 insertions(+), 124 deletions(-) create mode 100644 .idea/codeStyles/codeStyleConfig.xml create mode 100644 lib_sync_core/src/database.rs diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000..a55e7a1 --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5daa196..827a82a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,28 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atoi" version = "2.0.0" @@ -941,6 +963,7 @@ dependencies = [ name = "lib_sync_core" version = "0.1.0" dependencies = [ + "async-stream", "chrono", "clap", "directories", diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml index c6d73f7..c2bd944 100644 --- a/lib_sync_core/Cargo.toml +++ b/lib_sync_core/Cargo.toml @@ -18,3 +18,4 @@ tracing-core = "0.1.33" tabled = "0.19.0" futures = "0.3.31" thiserror = "2.0.12" +async-stream = "0.3.6" diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs new file mode 100644 index 0000000..0674329 --- /dev/null +++ b/lib_sync_core/src/database.rs @@ -0,0 +1,197 @@ +use crate::task_manager::{Task, TaskPayload, TaskStatus}; +use futures::stream::BoxStream; +use futures::{Stream, StreamExt, TryStreamExt}; +use serde::Serialize; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; +use sqlx::{Error, QueryBuilder, Sqlite, SqlitePool}; +use std::path::PathBuf; +use tokio::fs; +use tracing::{info, instrument}; + +static SQLITE_BIND_LIMIT: usize = 32766; + +#[derive(Default, Clone)] +pub struct TaskPagination { + page_size: usize, + limit: Option, + offset: Option, + status: Option, +} + +impl TaskPagination { + pub fn new() -> Self { + Self::default() + } + + pub fn next(&self) -> Self { + Self { + page_size: self.page_size + self.page_size, + ..self.clone() + } + } + + pub fn prev(&self) -> Self { + Self { + page_size: self.page_size - self.page_size, + ..self.clone() + } + } + + pub fn set_page_size(&mut self, page_size: usize) { + self.page_size = page_size; + } + + pub fn set_limit(&mut self, limit: Option) { + self.limit = limit; + } + + pub fn set_offset(&mut self, offset: Option) { + self.offset = offset; + } + + pub fn set_status(&mut self, status: Option) { + self.status = status; + } +} + +pub struct TasksPage { + tasks: Vec>, + page: TaskPagination +} + +impl TasksPage { + fn new(tasks: Vec>, page: TaskPagination) -> Self { + Self { + tasks, + page + } + } + + pub fn next(&self) -> TaskPagination { + self.page.next() + } + + pub fn prev(&self) -> TaskPagination { + self.page.prev() + } +} + +pub trait TaskStorage { + fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()>; + fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; + + async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result>; +} + +#[derive(Debug)] +pub struct Database { + pool: SqlitePool, +} + +impl Database { + pub async fn new>(base_path: P) -> crate::Result { + Ok(Self { + pool: Self::connect_database(base_path).await?, + }) + } + + async fn connect_database>(base_path: P) -> crate::Result { + let base_path = base_path.into(); + + let database_file_path = base_path.join("db.sql"); + + fs::create_dir_all(base_path).await?; + + let opts = SqliteConnectOptions::new() + .filename(database_file_path) + .create_if_missing(true) + .journal_mode(SqliteJournalMode::Wal); + + let pool = SqlitePool::connect_with(opts).await?; + + sqlx::migrate!("../migrations").run(&pool).await?; + + Ok(pool) + } + + #[instrument(skip(self, values))] + pub async fn load_tasks(&self, values: Vec) -> crate::Result<()> + where + T: TaskPayload + Serialize + std::fmt::Debug, + { + let mut tx = self.pool.begin().await?; + let mut builder: QueryBuilder<'_, Sqlite> = + QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); + + let args: crate::Result> = values + .iter() + .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) + .collect(); + + let mut affected_rows = 0; + // Chunk the query by the size limit of bind params + for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { + builder.push_values(chunk, |mut builder, item| { + builder + .push_bind(&item.0) + .push_bind(&item.1) + .push_bind(TaskStatus::Pending); + }); + builder.push("ON conflict (payload_key) DO NOTHING"); + + let query = builder.build(); + + affected_rows += query.execute(&mut *tx).await?.rows_affected(); + builder.reset(); + } + + tx.commit().await?; + + info!("{} rows inserted.", affected_rows); + + Ok(()) + } +} + +impl TaskStorage for Database { + fn insert_tasks(&self, tasks: Vec>) -> crate::error::Result<()> { + todo!() + } + + fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>> { + let query= sqlx::query_as::<_, Task>( + " + SELECT id, payload_key, payload, status_id, created_at, updated_at + FROM tasks + WHERE status_id = ? + ORDER BY created_at DESC + ", + ).bind(task_status); + + query.fetch(&self.pool).err_into::() + } + + async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result> { + let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new( + "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", + ); + + if let Some(status) = &page.status { + builder.push("where status_id = ").push_bind(status); + } + + builder.push("ORDER BY created_at DESC "); + + if let Some(limit) = &page.offset { + builder.push("OFFSET ").push_bind(limit); + } + + if let Some(limit) = &page.limit { + builder.push("LIMIT ").push_bind(limit); + } + + let tasks = builder.build_query_as::>().fetch_all(&self.pool).await?; + + Ok(TasksPage::new(tasks, page.clone())) + } +} diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs index 810ba8e..0feb7fb 100644 --- a/lib_sync_core/src/lib.rs +++ b/lib_sync_core/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub(crate) use error::*; pub mod task_manager; +mod database; pub fn add(left: u64, right: u64) -> u64 { left + right diff --git a/lib_sync_core/src/task_manager.rs b/lib_sync_core/src/task_manager.rs index 57a2556..8d7cdd4 100644 --- a/lib_sync_core/src/task_manager.rs +++ b/lib_sync_core/src/task_manager.rs @@ -8,13 +8,12 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::{QueryBuilder, Sqlite, SqlitePool}; use std::fmt::Display; use std::path::PathBuf; +use futures::stream::BoxStream; use tabled::Tabled; use tokio::fs; use tracing::{info, instrument}; -static SQLITE_BIND_LIMIT: usize = 32766; - -#[derive(sqlx::Type, Debug)] +#[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] pub enum TaskStatus { Pending = 1, @@ -42,11 +41,14 @@ impl Display for TaskStatus { } } -pub trait TaskPayload { +pub trait TaskPayloadKey { fn get_key(&self) -> String; } -pub type TaskJob = fn(&Task) -> TaskStatus; +pub trait TaskPayload: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey {} +impl TaskPayload for T {} + +pub type TaskJob = fn(&Task) -> TaskStatus; #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { @@ -75,126 +77,22 @@ fn display_option_date(o: &Option>) -> String { } } -pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {} -impl _Task for T {} -#[derive(Debug)] -pub struct TaskManager { - base_path: PathBuf, - pool: SqlitePool, -} +struct TaskManager{} impl TaskManager { - pub async fn new>(base_path: P) -> Result { - let base_path = base_path.into(); - let pool = Self::connect_database(base_path.clone()).await?; - Ok(Self { - base_path, - pool, - }) - } - - async fn connect_database>(base_path: P) -> crate::Result { - let base_path = base_path.into(); - - let database_file_path = base_path.join("db.sql"); - - fs::create_dir_all(base_path).await?; - - let opts = SqliteConnectOptions::new() - .filename(database_file_path) - .create_if_missing(true) - .journal_mode(SqliteJournalMode::Wal); - - let pool = SqlitePool::connect_with(opts).await?; - - sqlx::migrate!("../migrations").run(&pool).await?; - - Ok(pool) - } - - fn get_task_builder( - status: Option, - limit: Option, - ) -> QueryBuilder<'static, Sqlite> { - let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new( - "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", - ); - - if let Some(status) = status { - builder.push("where status_id = ").push_bind(status); - } - - builder.push("ORDER BY created_at DESC "); - - if let Some(limit) = limit { - builder.push("LIMIT ").push_bind(limit); - } - builder - } - - pub async fn get_tasks( - &self, - status: Option, - limit: Option, - ) -> crate::Result>> { - let mut builder = Self::get_task_builder(status, limit); - - let tasks: Vec> = builder.build_query_as().fetch_all(&self.pool).await?; - - Ok(tasks) - } - - #[instrument(skip(self, values))] - pub async fn load_tasks(&self, values: Vec) -> crate::Result<()> - where - T: TaskPayload + Serialize + std::fmt::Debug, - { - let mut tx = self.pool.begin().await?; - let mut builder: QueryBuilder<'_, Sqlite> = - QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); - - let args: crate::Result> = values - .iter() - .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) - .collect(); - - let mut affected_rows = 0; - // Chunk the query by the size limit of bind params - for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { - builder.push_values(chunk, |mut builder, item| { - builder - .push_bind(&item.0) - .push_bind(&item.1) - .push_bind(TaskStatus::Pending); - }); - builder.push("ON conflict (payload_key) DO NOTHING"); - - let query = builder.build(); - - affected_rows += query.execute(&mut *tx).await?.rows_affected(); - builder.reset(); - } - - tx.commit().await?; - - info!("{} rows inserted.", affected_rows); - - Ok(()) - } - - pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { - let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None); - - let rows = builder.build_query_as::>().fetch(&self.pool); - - let result: Vec<(Task, TaskStatus)> = rows.map(|x| { - let task = x.unwrap(); - let status = func(&task); - - (task, status) - }).collect().await; - - Ok(()) - } + // pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + // let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None); + // + // let rows = builder.build_query_as::>().fetch(&self.pool); + // + // let result: Vec<(Task, TaskStatus)> = rows.map(|x| { + // let task = x.unwrap(); + // let status = func(&task); + // + // (task, status) + // }).collect().await; + // + // Ok(()) + // } } From 45a3bf291b5aab694e0093a08149c3899c653cf3 Mon Sep 17 00:00:00 2001 From: aleidk Date: Fri, 16 May 2025 20:00:25 -0400 Subject: [PATCH 08/12] refactor(lib_core): task manager structure refs: #5 --- cli/bin/readwise/external_interface.rs | 2 +- cli/bin/readwise/main.rs | 2 +- lib_sync_core/src/database.rs | 128 +----------------- lib_sync_core/src/database/sqlite.rs | 117 ++++++++++++++++ lib_sync_core/src/lib.rs | 2 +- .../src/{task_manager.rs => tasks.rs} | 53 +++----- lib_sync_core/src/tasks/manager.rs | 32 +++++ 7 files changed, 176 insertions(+), 160 deletions(-) create mode 100644 lib_sync_core/src/database/sqlite.rs rename lib_sync_core/src/{task_manager.rs => tasks.rs} (53%) create mode 100644 lib_sync_core/src/tasks/manager.rs diff --git a/cli/bin/readwise/external_interface.rs b/cli/bin/readwise/external_interface.rs index 13b2f65..a484ca9 100644 --- a/cli/bin/readwise/external_interface.rs +++ b/cli/bin/readwise/external_interface.rs @@ -1,4 +1,4 @@ -use lib_sync_core::task_manager::TaskPayload; +use lib_sync_core::tasks::TaskPayload; use chrono::{DateTime, Local}; use serde::{de, Deserialize, Deserializer, Serialize}; use serde_json::Value; diff --git a/cli/bin/readwise/main.rs b/cli/bin/readwise/main.rs index bdfc584..29eee78 100644 --- a/cli/bin/readwise/main.rs +++ b/cli/bin/readwise/main.rs @@ -4,7 +4,7 @@ use figment::{ Figment, providers::{Env, Serialized}, }; -use lib_sync_core::task_manager::{TaskManager, TaskStatus}; +use lib_sync_core::tasks::{TaskStatus}; use cli::config::{Command, Config}; use cli::{Error, Result}; use std::fs::File; diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index 0674329..f2bc877 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -1,14 +1,6 @@ -use crate::task_manager::{Task, TaskPayload, TaskStatus}; -use futures::stream::BoxStream; -use futures::{Stream, StreamExt, TryStreamExt}; -use serde::Serialize; -use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; -use sqlx::{Error, QueryBuilder, Sqlite, SqlitePool}; -use std::path::PathBuf; -use tokio::fs; -use tracing::{info, instrument}; - -static SQLITE_BIND_LIMIT: usize = 32766; +use crate::tasks::{Task, TaskPayload, TaskStatus}; +use futures::{Stream}; +mod sqlite; #[derive(Default, Clone)] pub struct TaskPagination { @@ -77,121 +69,9 @@ impl TasksPage { } pub trait TaskStorage { - fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()>; + async fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()>; fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result>; } -#[derive(Debug)] -pub struct Database { - pool: SqlitePool, -} - -impl Database { - pub async fn new>(base_path: P) -> crate::Result { - Ok(Self { - pool: Self::connect_database(base_path).await?, - }) - } - - async fn connect_database>(base_path: P) -> crate::Result { - let base_path = base_path.into(); - - let database_file_path = base_path.join("db.sql"); - - fs::create_dir_all(base_path).await?; - - let opts = SqliteConnectOptions::new() - .filename(database_file_path) - .create_if_missing(true) - .journal_mode(SqliteJournalMode::Wal); - - let pool = SqlitePool::connect_with(opts).await?; - - sqlx::migrate!("../migrations").run(&pool).await?; - - Ok(pool) - } - - #[instrument(skip(self, values))] - pub async fn load_tasks(&self, values: Vec) -> crate::Result<()> - where - T: TaskPayload + Serialize + std::fmt::Debug, - { - let mut tx = self.pool.begin().await?; - let mut builder: QueryBuilder<'_, Sqlite> = - QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); - - let args: crate::Result> = values - .iter() - .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) - .collect(); - - let mut affected_rows = 0; - // Chunk the query by the size limit of bind params - for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { - builder.push_values(chunk, |mut builder, item| { - builder - .push_bind(&item.0) - .push_bind(&item.1) - .push_bind(TaskStatus::Pending); - }); - builder.push("ON conflict (payload_key) DO NOTHING"); - - let query = builder.build(); - - affected_rows += query.execute(&mut *tx).await?.rows_affected(); - builder.reset(); - } - - tx.commit().await?; - - info!("{} rows inserted.", affected_rows); - - Ok(()) - } -} - -impl TaskStorage for Database { - fn insert_tasks(&self, tasks: Vec>) -> crate::error::Result<()> { - todo!() - } - - fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>> { - let query= sqlx::query_as::<_, Task>( - " - SELECT id, payload_key, payload, status_id, created_at, updated_at - FROM tasks - WHERE status_id = ? - ORDER BY created_at DESC - ", - ).bind(task_status); - - query.fetch(&self.pool).err_into::() - } - - async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result> { - let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new( - "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", - ); - - if let Some(status) = &page.status { - builder.push("where status_id = ").push_bind(status); - } - - builder.push("ORDER BY created_at DESC "); - - if let Some(limit) = &page.offset { - builder.push("OFFSET ").push_bind(limit); - } - - if let Some(limit) = &page.limit { - builder.push("LIMIT ").push_bind(limit); - } - - let tasks = builder.build_query_as::>().fetch_all(&self.pool).await?; - - Ok(TasksPage::new(tasks, page.clone())) - } -} diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs new file mode 100644 index 0000000..c7fc993 --- /dev/null +++ b/lib_sync_core/src/database/sqlite.rs @@ -0,0 +1,117 @@ +use sqlx::{QueryBuilder, SqlitePool}; +use std::path::PathBuf; +use tokio::fs; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; +use tracing::{info, instrument}; +use futures::{Stream, TryStreamExt}; +use crate::database::{TaskPagination, TaskStorage, TasksPage}; +use crate::tasks::{Task, TaskPayload, TaskStatus}; + +#[allow(unused)] +static SQLITE_BIND_LIMIT: usize = 32766; + +#[derive(Debug)] +pub struct Sqlite { + pool: SqlitePool, +} + +impl Sqlite { + pub async fn new>(base_path: P) -> crate::Result { + Ok(Self { + pool: Self::connect_database(base_path).await?, + }) + } + + async fn connect_database>(base_path: P) -> crate::Result { + let base_path = base_path.into(); + + let database_file_path = base_path.join("db.sql"); + + fs::create_dir_all(base_path).await?; + + let opts = SqliteConnectOptions::new() + .filename(database_file_path) + .create_if_missing(true) + .journal_mode(SqliteJournalMode::Wal); + + let pool = SqlitePool::connect_with(opts).await?; + + sqlx::migrate!("../migrations").run(&pool).await?; + + Ok(pool) + } +} + +impl TaskStorage for Sqlite { + #[instrument(skip(self, tasks))] + async fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()> { + let mut tx = self.pool.begin().await?; + let mut builder: QueryBuilder<'_, sqlx::Sqlite> = + QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); + + let args: crate::Result> = tasks + .iter() + .map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?))) + .collect(); + + let mut affected_rows = 0; + // Chunk the query by the size limit of bind params + for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { + builder.push_values(chunk, |mut builder, item| { + builder + .push_bind(&item.0) + .push_bind(&item.1) + .push_bind(TaskStatus::Pending); + }); + builder.push("ON conflict (payload_key) DO NOTHING"); + + let query = builder.build(); + + affected_rows += query.execute(&mut *tx).await?.rows_affected(); + builder.reset(); + } + + tx.commit().await?; + + info!("{} rows inserted.", affected_rows); + + Ok(()) + } + + fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>> { + let query= sqlx::query_as::<_, Task>( + " + SELECT id, payload_key, payload, status_id, created_at, updated_at + FROM tasks + WHERE status_id = ? + ORDER BY created_at DESC + ", + ).bind(task_status); + + query.fetch(&self.pool).err_into::() + } + + async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result> { + let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( + "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", + ); + + if let Some(status) = &page.status { + builder.push("where status_id = ").push_bind(status); + } + + builder.push("ORDER BY created_at DESC "); + + if let Some(limit) = &page.offset { + builder.push("OFFSET ").push_bind(limit); + } + + if let Some(limit) = &page.limit { + builder.push("LIMIT ").push_bind(limit); + } + + let tasks = builder.build_query_as::>().fetch_all(&self.pool).await?; + + Ok(TasksPage::new(tasks, page.clone())) + } +} \ No newline at end of file diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs index 0feb7fb..586934c 100644 --- a/lib_sync_core/src/lib.rs +++ b/lib_sync_core/src/lib.rs @@ -1,7 +1,7 @@ pub mod error; pub(crate) use error::*; -pub mod task_manager; +pub mod tasks; mod database; pub fn add(left: u64, right: u64) -> u64 { diff --git a/lib_sync_core/src/task_manager.rs b/lib_sync_core/src/tasks.rs similarity index 53% rename from lib_sync_core/src/task_manager.rs rename to lib_sync_core/src/tasks.rs index 8d7cdd4..04c22b7 100644 --- a/lib_sync_core/src/task_manager.rs +++ b/lib_sync_core/src/tasks.rs @@ -1,17 +1,11 @@ -use crate::error::Error; use chrono::Utc; -use directories::ProjectDirs; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use serde::de::DeserializeOwned; use serde::Serialize; -use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; -use sqlx::{QueryBuilder, Sqlite, SqlitePool}; use std::fmt::Display; -use std::path::PathBuf; -use futures::stream::BoxStream; use tabled::Tabled; -use tokio::fs; -use tracing::{info, instrument}; + +mod manager; #[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] @@ -45,13 +39,19 @@ pub trait TaskPayloadKey { fn get_key(&self) -> String; } -pub trait TaskPayload: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey {} -impl TaskPayload for T {} +pub trait TaskPayload: + Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey +{ +} +impl + TaskPayload for T +{ +} -pub type TaskJob = fn(&Task) -> TaskStatus; +pub type TaskJob = fn(&Task) -> TaskStatus; #[derive(sqlx::FromRow, Tabled, Debug)] -pub struct Task { +pub struct Task { id: u32, payload_key: String, #[sqlx(json)] @@ -64,7 +64,13 @@ pub struct Task { updated_at: Option>, } -impl Task { +impl Task { + pub fn payload(&self) -> &T { + &self.payload + } +} + +impl Task { pub fn get_key(&self) -> String { self.payload_key.clone() } @@ -77,22 +83,3 @@ fn display_option_date(o: &Option>) -> String { } } - -struct TaskManager{} - -impl TaskManager { - // pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { - // let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None); - // - // let rows = builder.build_query_as::>().fetch(&self.pool); - // - // let result: Vec<(Task, TaskStatus)> = rows.map(|x| { - // let task = x.unwrap(); - // let status = func(&task); - // - // (task, status) - // }).collect().await; - // - // Ok(()) - // } -} diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs new file mode 100644 index 0000000..bee3cb8 --- /dev/null +++ b/lib_sync_core/src/tasks/manager.rs @@ -0,0 +1,32 @@ +use futures::StreamExt; +use std::marker::PhantomData; +use crate::database::TaskStorage; +use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus}; + +struct TaskManager> +{ + storage: T, + _marker: PhantomData, +} + +impl> TaskManager { + pub fn new(storage: T) -> Self { + Self { + storage, + _marker: PhantomData, + } + } + + pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + let rows = self.storage.get_tasks(TaskStatus::Pending); + + let result: Vec<(Task, TaskStatus)> = rows.map(|x| { + let task = x.unwrap(); + let status = func(&task); + + (task, status) + }).collect().await; + + Ok(()) + } +} \ No newline at end of file From 94fe050c4adab29732d2b0bfcdcd7269a714c791 Mon Sep 17 00:00:00 2001 From: aleidk Date: Sun, 18 May 2025 20:07:47 -0400 Subject: [PATCH 09/12] test(lib_core): it saves and return tasks refs: #5 --- .gitignore | 2 +- .idea/dataSources.xml | 7 ++ Cargo.lock | 169 +++++++++++++++++++++++++-- lib_sync_core/Cargo.toml | 6 +- lib_sync_core/src/database.rs | 2 +- lib_sync_core/src/database/sqlite.rs | 104 +++++++++++++++-- lib_sync_core/src/lib.rs | 15 --- lib_sync_core/src/tasks.rs | 22 ++-- 8 files changed, 282 insertions(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..b60de5b 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1 @@ -/target +**/target diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml index 283ef50..38d7bd6 100644 --- a/.idea/dataSources.xml +++ b/.idea/dataSources.xml @@ -8,5 +8,12 @@ jdbc:sqlite:$USER_HOME$/.local/share/readwise-bulk-upload/db.sql $ProjectFileDir$ + + sqlite.xerial + true + org.sqlite.JDBC + jdbc:sqlite:$PROJECT_DIR$/lib_sync_core/target/sqlx/test-dbs/lib_sync_core/database/sqlite/tests/it_save_tasks.sqlite + $ProjectFileDir$ + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 827a82a..c144106 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,6 +398,40 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "der" version = "0.7.10" @@ -409,6 +443,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deunicode" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abd57806937c9cc163efc8ea3910e00a62e2aeb0b8119f1793a978088f8f6b04" + [[package]] name = "digest" version = "0.10.7" @@ -459,6 +499,18 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "dummy" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bbcf21279103a67372982cb1156a2154a452451dff2b884cf897ccecce389e0" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.15.0" @@ -506,6 +558,21 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fake" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f5f203b70a419cb8880d1cfe6bebe488add0a0307d404e9f24021e5fd864b80" +dependencies = [ + "chrono", + "deunicode", + "dummy", + "http", + "rand 0.9.1", + "url-escape", + "uuid", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -536,6 +603,12 @@ dependencies = [ "spin", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -749,6 +822,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "http" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "iana-time-zone" version = "0.1.63" @@ -891,6 +975,12 @@ dependencies = [ "syn", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.0.3" @@ -967,6 +1057,7 @@ dependencies = [ "chrono", "clap", "directories", + "fake", "figment", "futures", "serde", @@ -978,6 +1069,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-subscriber", + "uuid", ] [[package]] @@ -1108,7 +1200,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand", + "rand 0.8.5", "smallvec", "zeroize", ] @@ -1362,8 +1454,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -1373,7 +1475,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -1385,6 +1497,15 @@ dependencies = [ "getrandom 0.2.16", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.12" @@ -1462,7 +1583,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8", - "rand_core", + "rand_core 0.6.4", "signature", "spki", "subtle", @@ -1561,6 +1682,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -1594,7 +1721,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -1690,6 +1817,7 @@ dependencies = [ "tokio-stream", "tracing", "url", + "uuid", ] [[package]] @@ -1761,7 +1889,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rand", + "rand 0.8.5", "rsa", "serde", "sha1", @@ -1771,6 +1899,7 @@ dependencies = [ "stringprep", "thiserror", "tracing", + "uuid", "whoami", ] @@ -1800,7 +1929,7 @@ dependencies = [ "md-5", "memchr", "once_cell", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha2", @@ -1809,6 +1938,7 @@ dependencies = [ "stringprep", "thiserror", "tracing", + "uuid", "whoami", ] @@ -1835,6 +1965,7 @@ dependencies = [ "thiserror", "tracing", "url", + "uuid", ] [[package]] @@ -2148,6 +2279,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "url-escape" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44e0ce4d1246d075ca5abec4b41d33e87a6054d08e2366b63205665e950db218" +dependencies = [ + "percent-encoding", +] + [[package]] name = "utf16_iter" version = "1.0.5" @@ -2166,6 +2306,19 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" +dependencies = [ + "atomic", + "getrandom 0.3.2", + "md-5", + "serde", + "sha1_smol", +] + [[package]] name = "valuable" version = "0.1.1" diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml index c2bd944..82df9ee 100644 --- a/lib_sync_core/Cargo.toml +++ b/lib_sync_core/Cargo.toml @@ -6,7 +6,7 @@ edition = "2024" [dependencies] directories = "6.0.0" tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } -sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] } +sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] } clap = { version = "4.5.37", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } chrono = {version = "0.4.41", features = ["serde"]} @@ -19,3 +19,7 @@ tabled = "0.19.0" futures = "0.3.31" thiserror = "2.0.12" async-stream = "0.3.6" +uuid = { version = "1.16.0", features = ["serde", "v4"] } + +[dev-dependencies] +fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] } diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index f2bc877..b01914f 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -69,7 +69,7 @@ impl TasksPage { } pub trait TaskStorage { - async fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()>; + async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result>; diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index c7fc993..d231990 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -1,14 +1,14 @@ +use crate::database::{TaskPagination, TaskStorage, TasksPage}; +use crate::tasks::{Task, TaskPayload, TaskStatus}; +use futures::{Stream, TryStreamExt}; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::{QueryBuilder, SqlitePool}; use std::path::PathBuf; use tokio::fs; -use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use tracing::{info, instrument}; -use futures::{Stream, TryStreamExt}; -use crate::database::{TaskPagination, TaskStorage, TasksPage}; -use crate::tasks::{Task, TaskPayload, TaskStatus}; -#[allow(unused)] static SQLITE_BIND_LIMIT: usize = 32766; +static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations"); #[derive(Debug)] pub struct Sqlite { @@ -36,21 +36,37 @@ impl Sqlite { let pool = SqlitePool::connect_with(opts).await?; - sqlx::migrate!("../migrations").run(&pool).await?; + MIGRATIONS.run(&pool).await?; Ok(pool) } } impl TaskStorage for Sqlite { + /// Insert task into the database for later processing + /// + /// # Arguments + /// + /// * `tasks`: A list of task to be processed, each task has to have a unique key, if a key is repeated, the item will be omitted + /// + /// returns: Result<(), Error> + /// + /// # Examples + /// + /// ``` + /// + /// ``` #[instrument(skip(self, tasks))] - async fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()> { + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + tasks: I, + ) -> crate::Result<()> { let mut tx = self.pool.begin().await?; let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); let args: crate::Result> = tasks - .iter() + .into_iter() .map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?))) .collect(); @@ -79,14 +95,15 @@ impl TaskStorage for Sqlite { } fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>> { - let query= sqlx::query_as::<_, Task>( + let query = sqlx::query_as::<_, Task>( " SELECT id, payload_key, payload, status_id, created_at, updated_at FROM tasks WHERE status_id = ? ORDER BY created_at DESC ", - ).bind(task_status); + ) + .bind(task_status); query.fetch(&self.pool).err_into::() } @@ -110,8 +127,71 @@ impl TaskStorage for Sqlite { builder.push("LIMIT ").push_bind(limit); } - let tasks = builder.build_query_as::>().fetch_all(&self.pool).await?; + let tasks = builder + .build_query_as::>() + .fetch_all(&self.pool) + .await?; Ok(TasksPage::new(tasks, page.clone())) } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use super::*; + use fake::{Dummy, Fake, Faker}; + use futures::StreamExt; + use serde::{Deserialize, Serialize}; + use sqlx::types::Uuid; + use sqlx::Row; + + #[derive(Dummy, Serialize, Deserialize, Debug)] +struct DummyTaskPayload { + key: Uuid, + _foo: String, + _baar: String, + } + + #[sqlx::test(migrator = "MIGRATIONS")] + async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> { + let owned_pool = pool.clone(); + let sqlite = Sqlite { pool: owned_pool }; + + let tasks = generate_dummy_tasks(); + + sqlite.insert_tasks(&tasks).await.unwrap(); + + let result = sqlx::query("select count(id) from tasks") + .fetch_one(&pool) + .await?; + + let total_rows: u64 = result.get(0); + + assert_eq!(total_rows as usize, tasks.len()); + + let saved_tasks: Vec> = sqlite + .get_tasks(TaskStatus::Pending) + .map(|item| item.unwrap()) + .collect() + .await; + + assert_eq!(tasks.len(), saved_tasks.len()); + + let mut zip = tasks.into_iter().zip(saved_tasks.into_iter()); + + assert!(zip.all(|(a, b)| { + a.get_key() == b.get_key() + })); + + Ok(()) + } + + fn generate_dummy_tasks() -> Vec> { + let payloads: Vec = Faker.fake(); + + payloads + .into_iter() + .map(|item| Task::new(item.key.to_string(), item, TaskStatus::Pending)) + .collect() + } +} diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs index 586934c..1030a9a 100644 --- a/lib_sync_core/src/lib.rs +++ b/lib_sync_core/src/lib.rs @@ -3,18 +3,3 @@ pub mod error; pub(crate) use error::*; pub mod tasks; mod database; - -pub fn add(left: u64, right: u64) -> u64 { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index 04c22b7..20b61d6 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -1,5 +1,4 @@ use chrono::Utc; -use futures::StreamExt; use serde::de::DeserializeOwned; use serde::Serialize; use std::fmt::Display; @@ -35,15 +34,11 @@ impl Display for TaskStatus { } } -pub trait TaskPayloadKey { - fn get_key(&self) -> String; -} - pub trait TaskPayload: - Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey + Serialize + DeserializeOwned + Send + Unpin + 'static { } -impl +impl TaskPayload for T { } @@ -64,6 +59,19 @@ pub struct Task { updated_at: Option>, } +impl Task { + pub fn new(payload_key: String, payload: T, status: TaskStatus) -> Self { + Self { + id: 0, + payload_key, + payload, + status, + created_at: Default::default(), + updated_at: None, + } + } +} + impl Task { pub fn payload(&self) -> &T { &self.payload From c52a49707574d4897633bcbcffadc8dba4465c6b Mon Sep 17 00:00:00 2001 From: aleidk Date: Mon, 19 May 2025 13:22:36 -0400 Subject: [PATCH 10/12] test(lib_core): sqlite return paginated tasks refs: #5 --- .idea/readwise-bulk-upload.iml | 1 + Cargo.lock | 22 ++++++++ lib_sync_core/Cargo.toml | 1 + lib_sync_core/src/database.rs | 30 +++++------ lib_sync_core/src/database/sqlite.rs | 77 +++++++++++++++++++--------- lib_sync_core/src/tasks.rs | 4 +- migrations/0003_tasks.sql | 2 +- 7 files changed, 93 insertions(+), 44 deletions(-) diff --git a/.idea/readwise-bulk-upload.iml b/.idea/readwise-bulk-upload.iml index cc58e9c..0855a40 100644 --- a/.idea/readwise-bulk-upload.iml +++ b/.idea/readwise-bulk-upload.iml @@ -5,6 +5,7 @@ + diff --git a/Cargo.lock b/Cargo.lock index c144106..43de3fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1069,6 +1069,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-subscriber", + "tracing-test", "uuid", ] @@ -2220,6 +2221,27 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "typenum" version = "1.18.0" diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml index 82df9ee..0dcc4f3 100644 --- a/lib_sync_core/Cargo.toml +++ b/lib_sync_core/Cargo.toml @@ -23,3 +23,4 @@ uuid = { version = "1.16.0", features = ["serde", "v4"] } [dev-dependencies] fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] } +tracing-test = "0.2.5" diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index b01914f..45db661 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -1,12 +1,11 @@ use crate::tasks::{Task, TaskPayload, TaskStatus}; -use futures::{Stream}; +use futures::Stream; mod sqlite; -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub struct TaskPagination { - page_size: usize, limit: Option, - offset: Option, + offset: u32, status: Option, } @@ -17,38 +16,37 @@ impl TaskPagination { pub fn next(&self) -> Self { Self { - page_size: self.page_size + self.page_size, + offset: self.offset.saturating_add(self.limit.unwrap_or(0)), ..self.clone() - } + } } pub fn prev(&self) -> Self { Self { - page_size: self.page_size - self.page_size, + offset: self.offset.saturating_sub(self.limit.unwrap_or(0)), ..self.clone() } } - pub fn set_page_size(&mut self, page_size: usize) { - self.page_size = page_size; - } - - pub fn set_limit(&mut self, limit: Option) { + pub fn with_limit(mut self, limit: Option) -> Self { self.limit = limit; + self } - pub fn set_offset(&mut self, offset: Option) { + pub fn with_offset(mut self, offset: u32) -> Self { self.offset = offset; + self } - pub fn set_status(&mut self, status: Option) { + pub fn with_status(mut self, status: Option) -> Self { self.status = status; + self } } pub struct TasksPage { tasks: Vec>, - page: TaskPagination + page: TaskPagination } impl TasksPage { @@ -72,6 +70,6 @@ pub trait TaskStorage { async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; - async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result>; + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; } diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index d231990..6aac53c 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -5,7 +5,7 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::{QueryBuilder, SqlitePool}; use std::path::PathBuf; use tokio::fs; -use tracing::{info, instrument}; +use tracing::{debug, info, instrument}; static SQLITE_BIND_LIMIT: usize = 32766; static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations"); @@ -108,7 +108,7 @@ impl TaskStorage for Sqlite { query.fetch(&self.pool).err_into::() } - async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result> { + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", ); @@ -119,13 +119,13 @@ impl TaskStorage for Sqlite { builder.push("ORDER BY created_at DESC "); - if let Some(limit) = &page.offset { - builder.push("OFFSET ").push_bind(limit); + + if let Some(limit) = page.limit { + builder.push("LIMIT ").push_bind(limit); + builder.push(" OFFSET ").push_bind(page.offset); } - if let Some(limit) = &page.limit { - builder.push("LIMIT ").push_bind(limit); - } + debug!("SQL: \"{}\", with options: \"{:?}\"", builder.sql(), page); let tasks = builder .build_query_as::>() @@ -139,25 +139,39 @@ impl TaskStorage for Sqlite { #[cfg(test)] mod tests { use super::*; - use fake::{Dummy, Fake, Faker}; + use fake::Dummy; use futures::StreamExt; use serde::{Deserialize, Serialize}; use sqlx::types::Uuid; use sqlx::Row; + use tracing_test::traced_test; #[derive(Dummy, Serialize, Deserialize, Debug)] -struct DummyTaskPayload { + struct DummyTaskPayload { key: Uuid, _foo: String, - _baar: String, + _bar: String, + } + + //noinspection RsUnresolvedPath + fn setup(pool: SqlitePool, len: usize) -> (Sqlite, Vec>) { + let owned_pool = pool.clone(); + let sqlite = Sqlite { pool: owned_pool }; + + let payloads = fake::vec![DummyTaskPayload; len]; + + let tasks = payloads + .into_iter() + .enumerate() + .map(|(i, item)| Task::new((i + 1).to_string(), item, TaskStatus::Pending)) + .collect(); + + (sqlite, tasks) } #[sqlx::test(migrator = "MIGRATIONS")] async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> { - let owned_pool = pool.clone(); - let sqlite = Sqlite { pool: owned_pool }; - - let tasks = generate_dummy_tasks(); + let (sqlite, tasks) = setup(pool.clone(), 100); sqlite.insert_tasks(&tasks).await.unwrap(); @@ -176,22 +190,35 @@ struct DummyTaskPayload { .await; assert_eq!(tasks.len(), saved_tasks.len()); - + let mut zip = tasks.into_iter().zip(saved_tasks.into_iter()); - assert!(zip.all(|(a, b)| { - a.get_key() == b.get_key() - })); - + assert!(zip.all(|(a, b)| { a.get_key() == b.get_key() })); + Ok(()) } - fn generate_dummy_tasks() -> Vec> { - let payloads: Vec = Faker.fake(); + #[sqlx::test(migrator = "MIGRATIONS")] + #[traced_test] + async fn it_return_paginated_tasks(pool: SqlitePool) -> sqlx::Result<()> { + let (sqlite, tasks) = setup(pool.clone(), 300); - payloads - .into_iter() - .map(|item| Task::new(item.key.to_string(), item, TaskStatus::Pending)) - .collect() + sqlite.insert_tasks(&tasks).await.unwrap(); + + let page_options = TaskPagination::new().with_limit(Some(25)); + + let first_page: TasksPage = + sqlite.get_paginated_tasks(page_options).await.unwrap(); + + assert_eq!(first_page.tasks.len(), 25); + assert_eq!(first_page.tasks.first().unwrap().get_key(), tasks.get(0).unwrap().get_key()); + + let second_page: TasksPage = + sqlite.get_paginated_tasks(first_page.next()).await.unwrap(); + + assert_eq!(second_page.tasks.len(), 25); + assert_eq!(second_page.tasks.first().unwrap().get_key(), tasks.get(25).unwrap().get_key()); + + Ok(()) } } diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index 20b61d6..a4bc96c 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -35,10 +35,10 @@ impl Display for TaskStatus { } pub trait TaskPayload: - Serialize + DeserializeOwned + Send + Unpin + 'static + Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug { } -impl +impl TaskPayload for T { } diff --git a/migrations/0003_tasks.sql b/migrations/0003_tasks.sql index 0b7cb54..85d03ec 100644 --- a/migrations/0003_tasks.sql +++ b/migrations/0003_tasks.sql @@ -3,7 +3,7 @@ create table tasks id integer not null constraint tasks_pk primary key autoincrement, - payload_key ANY not null + payload_key TEXT not null constraint tasks_payload_key unique on conflict ignore, payload TEXT not null, From d87843614a87c374b84541f57cd19d1b431955fb Mon Sep 17 00:00:00 2001 From: aleidk Date: Tue, 20 May 2025 16:49:41 -0400 Subject: [PATCH 11/12] wip: add test to task_manager refs: #5 --- .idea/codeStyles/Project.xml | 29 +++++ .idea/codeStyles/codeStyleConfig.xml | 2 +- lib_sync_core/src/database.rs | 4 +- lib_sync_core/src/database/sqlite.rs | 5 + lib_sync_core/src/tasks.rs | 3 +- lib_sync_core/src/tasks/jobs.rs | 3 + lib_sync_core/src/tasks/manager.rs | 155 +++++++++++++++++++++++---- 7 files changed, 177 insertions(+), 24 deletions(-) create mode 100644 .idea/codeStyles/Project.xml create mode 100644 lib_sync_core/src/tasks/jobs.rs diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml new file mode 100644 index 0000000..4e20976 --- /dev/null +++ b/.idea/codeStyles/Project.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml index a55e7a1..79ee123 100644 --- a/.idea/codeStyles/codeStyleConfig.xml +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -1,5 +1,5 @@ - \ No newline at end of file diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index 45db661..216e71d 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -68,8 +68,10 @@ impl TasksPage { pub trait TaskStorage { async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; - fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; + fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>>; + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::Result<()>; + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; } diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index 6aac53c..5c09dc4 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -108,6 +108,10 @@ impl TaskStorage for Sqlite { query.fetch(&self.pool).err_into::() } + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { + todo!() + } + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", @@ -170,6 +174,7 @@ mod tests { } #[sqlx::test(migrator = "MIGRATIONS")] + #[traced_test] async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> { let (sqlite, tasks) = setup(pool.clone(), 100); diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index a4bc96c..1079163 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -5,6 +5,7 @@ use std::fmt::Display; use tabled::Tabled; mod manager; +mod jobs; #[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] @@ -43,8 +44,6 @@ impl { } -pub type TaskJob = fn(&Task) -> TaskStatus; - #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { id: u32, diff --git a/lib_sync_core/src/tasks/jobs.rs b/lib_sync_core/src/tasks/jobs.rs new file mode 100644 index 0000000..74a8ca0 --- /dev/null +++ b/lib_sync_core/src/tasks/jobs.rs @@ -0,0 +1,3 @@ +use crate::tasks::{Task, TaskStatus}; + +pub type TaskJob = fn(&Task) -> TaskStatus; \ No newline at end of file diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs index bee3cb8..f21fb0c 100644 --- a/lib_sync_core/src/tasks/manager.rs +++ b/lib_sync_core/src/tasks/manager.rs @@ -1,32 +1,147 @@ -use futures::StreamExt; -use std::marker::PhantomData; use crate::database::TaskStorage; -use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus}; +use crate::tasks::jobs::TaskJob; +use crate::tasks::{Task, TaskPayload, TaskStatus}; +use futures::StreamExt; +use futures::stream::FuturesOrdered; +use std::marker::PhantomData; -struct TaskManager> -{ +pub enum RateLimit { + Buffer(usize), + Rate(usize), + Ticks(usize), + None, +} + +pub struct ExecuteOptions { + rate_limit: RateLimit, +} + +impl ExecuteOptions { + pub fn new() -> Self { + Self::default() + } + + pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self { + self.rate_limit = rate_limit; + self + } +} + +impl Default for ExecuteOptions { + fn default() -> Self { + Self { + rate_limit: RateLimit::None, + } + } +} + +struct TaskManager> { storage: T, + options: ExecuteOptions, _marker: PhantomData, } impl> TaskManager { - pub fn new(storage: T) -> Self { - Self { - storage, - _marker: PhantomData, - } + pub fn new(storage: T, options: ExecuteOptions) -> Self { + Self { + storage, + options, + _marker: PhantomData, + } } - + pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { let rows = self.storage.get_tasks(TaskStatus::Pending); - - let result: Vec<(Task, TaskStatus)> = rows.map(|x| { - let task = x.unwrap(); - let status = func(&task); - - (task, status) - }).collect().await; - + + let result: Vec<(Task, TaskStatus)> = rows + .map(async |x| { + let task = x.unwrap(); + let status = func(&task); + + (task, status) + }) + .collect() + .await; + Ok(()) } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::database::{TaskPagination, TasksPage}; + use fake::{Dummy, Fake, Faker}; + use futures::{Stream, StreamExt}; + use serde::{Deserialize, Serialize}; + use sqlx::Row; + use sqlx::types::Uuid; + use tracing_test::traced_test; + + #[derive(Dummy, Serialize, Deserialize, Debug)] + struct DummyTaskPayload { + key: Uuid, + _foo: String, + _bar: String, + } + + struct DummyTaskStorage {} + + impl TaskStorage for DummyTaskStorage { + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + _: I, + ) -> crate::error::Result<()> { + todo!() + } + + fn get_tasks( + &self, + task_status: TaskStatus, + ) -> impl Stream>> { + let payloads: Vec = Faker.fake(); + + let tasks = payloads.into_iter().enumerate().map(move |(i, item)| { + Ok(Task::new((i + 1).to_string(), item, task_status.clone())) + }); + + futures::stream::iter(tasks) + } + + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { + todo!() + } + async fn listen_tasks2(&self, task_status: TaskStatus) -> FuturesOrdered> + Sized> { + let mut fifo = FuturesOrdered::new(); + + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + let payload: DummyTaskPayload = Faker.fake(); + let task_status: TaskStatus = task_status.clone(); + fifo.push_back(async move { + Task::new(payload.key.to_string(), payload, task_status) + }); + } + }); + + fifo + } + + async fn get_paginated_tasks( + &self, + _: TaskPagination, + ) -> crate::error::Result> { + todo!() + } + } + + #[tokio::test] + #[traced_test] + async fn manager_runs() { + let execute_options = ExecuteOptions::new(); + let manager = TaskManager::new(DummyTaskStorage {}, execute_options); + + manager.run_tasks(|_| TaskStatus::Completed).await.unwrap(); + } +} From 2c47226dc9e5c5937ec9c0250e4fba9e87feb86f Mon Sep 17 00:00:00 2001 From: aleidk Date: Tue, 20 May 2025 16:49:41 -0400 Subject: [PATCH 12/12] wip: add test to task_manager refs: #5 --- .idea/codeStyles/codeStyleConfig.xml | 1 + Cargo.lock | 1 + lib_sync_core/Cargo.toml | 1 + lib_sync_core/src/database.rs | 29 ++++---- lib_sync_core/src/database/sqlite.rs | 5 +- lib_sync_core/src/tasks.rs | 4 +- lib_sync_core/src/tasks/bus.rs | 4 + lib_sync_core/src/tasks/manager.rs | 107 +++++++++++++++++++-------- lib_sync_core/src/tasks/worker.rs | 48 ++++++++++++ 9 files changed, 149 insertions(+), 51 deletions(-) create mode 100644 lib_sync_core/src/tasks/bus.rs create mode 100644 lib_sync_core/src/tasks/worker.rs diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml index 79ee123..6e6eec1 100644 --- a/.idea/codeStyles/codeStyleConfig.xml +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -1,5 +1,6 @@ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 43de3fc..6292eb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1066,6 +1066,7 @@ dependencies = [ "tabled", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-core", "tracing-subscriber", diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml index 0dcc4f3..be60587 100644 --- a/lib_sync_core/Cargo.toml +++ b/lib_sync_core/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] directories = "6.0.0" tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } +tokio-stream = "0.1.17" sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] } clap = { version = "4.5.37", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index 216e71d..89e5da0 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -13,12 +13,12 @@ impl TaskPagination { pub fn new() -> Self { Self::default() } - + pub fn next(&self) -> Self { - Self { + Self { offset: self.offset.saturating_add(self.limit.unwrap_or(0)), - ..self.clone() - } + ..self.clone() + } } pub fn prev(&self) -> Self { @@ -45,20 +45,17 @@ impl TaskPagination { } pub struct TasksPage { - tasks: Vec>, - page: TaskPagination + tasks: Vec>, + page: TaskPagination, } impl TasksPage { fn new(tasks: Vec>, page: TaskPagination) -> Self { - Self { - tasks, - page - } + Self { tasks, page } } pub fn next(&self) -> TaskPagination { - self.page.next() + self.page.next() } pub fn prev(&self) -> TaskPagination { @@ -67,11 +64,13 @@ impl TasksPage { } pub trait TaskStorage { - async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + tasks: I, + ) -> crate::Result<()>; fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>>; - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::Result<()>; - + fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>>; + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; } - diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index 5c09dc4..6d11c9a 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -108,10 +108,11 @@ impl TaskStorage for Sqlite { query.fetch(&self.pool).err_into::() } - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { - todo!() + fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>> { + futures::stream::empty() } + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index 1079163..f455941 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -6,6 +6,8 @@ use tabled::Tabled; mod manager; mod jobs; +mod worker; +mod bus; #[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] @@ -75,9 +77,7 @@ impl Task { pub fn payload(&self) -> &T { &self.payload } -} -impl Task { pub fn get_key(&self) -> String { self.payload_key.clone() } diff --git a/lib_sync_core/src/tasks/bus.rs b/lib_sync_core/src/tasks/bus.rs new file mode 100644 index 0000000..4e96e56 --- /dev/null +++ b/lib_sync_core/src/tasks/bus.rs @@ -0,0 +1,4 @@ +#[derive(Clone)] +pub enum Bus { + Local, +} \ No newline at end of file diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs index f21fb0c..eaaef77 100644 --- a/lib_sync_core/src/tasks/manager.rs +++ b/lib_sync_core/src/tasks/manager.rs @@ -1,9 +1,14 @@ use crate::database::TaskStorage; +use crate::tasks::bus::Bus; use crate::tasks::jobs::TaskJob; use crate::tasks::{Task, TaskPayload, TaskStatus}; use futures::StreamExt; -use futures::stream::FuturesOrdered; use std::marker::PhantomData; +use std::pin::pin; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot::Sender; +use crate::tasks::worker::TaskMessage; pub enum RateLimit { Buffer(usize), @@ -12,11 +17,12 @@ pub enum RateLimit { None, } -pub struct ExecuteOptions { +pub struct ManagerOptions { rate_limit: RateLimit, + bus: Bus, } -impl ExecuteOptions { +impl ManagerOptions { pub fn new() -> Self { Self::default() } @@ -27,22 +33,23 @@ impl ExecuteOptions { } } -impl Default for ExecuteOptions { +impl Default for ManagerOptions { fn default() -> Self { Self { rate_limit: RateLimit::None, + bus: Bus::Local, } } } struct TaskManager> { storage: T, - options: ExecuteOptions, + options: ManagerOptions, _marker: PhantomData, } impl> TaskManager { - pub fn new(storage: T, options: ExecuteOptions) -> Self { + pub fn new(storage: T, options: ManagerOptions) -> Self { Self { storage, options, @@ -50,18 +57,31 @@ impl> TaskManager { } } - pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + pub async fn run_tasks(&self, mut task_sink: TaskMessage) -> crate::Result<()> { let rows = self.storage.get_tasks(TaskStatus::Pending); + let listener = self.storage.listen_tasks(TaskStatus::Pending); - let result: Vec<(Task, TaskStatus)> = rows - .map(async |x| { - let task = x.unwrap(); - let status = func(&task); + let mut queue = pin!(rows.chain(listener)); - (task, status) - }) - .collect() - .await; + while let Some(task) = queue.next().await { + let task = match task { + Ok(task) => task, + Err(e) => { + continue + } + }; + + let sink = match task_sink.recv().await { + Some(s) => s, + None => break, // sink has stoped requesting tasks + }; + + if let Err(_) = sink.send(task) { + continue; + } + + // (task, status) + } Ok(()) } @@ -71,12 +91,17 @@ impl> TaskManager { mod tests { use super::*; use crate::database::{TaskPagination, TasksPage}; + use async_stream::stream; use fake::{Dummy, Fake, Faker}; - use futures::{Stream, StreamExt}; + use futures::Stream; use serde::{Deserialize, Serialize}; - use sqlx::Row; use sqlx::types::Uuid; + use sync::mpsc; + use tokio::sync; + use tokio_stream::wrappers::ReceiverStream; use tracing_test::traced_test; + use crate::error::Error; + use crate::tasks::worker::{Worker, WorkerManager}; #[derive(Dummy, Serialize, Deserialize, Debug)] struct DummyTaskPayload { @@ -98,7 +123,7 @@ mod tests { fn get_tasks( &self, task_status: TaskStatus, - ) -> impl Stream>> { + ) -> impl Stream>> { let payloads: Vec = Faker.fake(); let tasks = payloads.into_iter().enumerate().map(move |(i, item)| { @@ -108,24 +133,27 @@ mod tests { futures::stream::iter(tasks) } - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { - todo!() - } - async fn listen_tasks2(&self, task_status: TaskStatus) -> FuturesOrdered> + Sized> { - let mut fifo = FuturesOrdered::new(); + fn listen_tasks( + &self, + task_status: TaskStatus, + ) -> impl Stream>> { + let (tx, rx) = mpsc::channel::>>(10); tokio::spawn(async move { - loop { + for _ in 0..10 { tokio::time::sleep(std::time::Duration::from_millis(250)).await; + let payload: DummyTaskPayload = Faker.fake(); let task_status: TaskStatus = task_status.clone(); - fifo.push_back(async move { - Task::new(payload.key.to_string(), payload, task_status) - }); + let task = Ok(Task::new(payload.key.to_string(), payload, task_status)); + + if let Err(_) = tx.send(task).await { + break; + } } }); - - fifo + + ReceiverStream::new(rx) } async fn get_paginated_tasks( @@ -136,12 +164,27 @@ mod tests { } } + struct DummyWorker; + + impl Worker for DummyWorker { + fn process_job(task: &Task) -> crate::error::Result<()> { + println!("{:#?}", task); + Ok(()) + } + + async fn on_job_failure(task: &Task, error: Error) -> crate::error::Result<()> { + println!("{:#?} {:?}", task, error); + Ok(()) + } + } + #[tokio::test] #[traced_test] async fn manager_runs() { - let execute_options = ExecuteOptions::new(); - let manager = TaskManager::new(DummyTaskStorage {}, execute_options); + let execute_options = ManagerOptions::new(); + let local_worker_sink = WorkerManager::get_listener_sink::(execute_options.bus.clone()); + let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options); - manager.run_tasks(|_| TaskStatus::Completed).await.unwrap(); + task_manager.run_tasks(local_worker_sink).await.unwrap() } } diff --git a/lib_sync_core/src/tasks/worker.rs b/lib_sync_core/src/tasks/worker.rs new file mode 100644 index 0000000..7e3d917 --- /dev/null +++ b/lib_sync_core/src/tasks/worker.rs @@ -0,0 +1,48 @@ +use crate::error::Error; +use crate::tasks::bus::Bus; +use crate::tasks::{Task, TaskPayload}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot::Sender; +use tokio::sync::{mpsc, oneshot}; + +pub type TaskMessage = Receiver>>; + +pub struct WorkerManager; + +impl WorkerManager { + pub fn get_listener_sink>(bus: Bus) -> TaskMessage { + match bus { + Bus::Local => { + let (bus_tx, bus_rx) = mpsc::channel(100); + tokio::spawn(async move { + loop { + // TODO: properly catch errors + let (tx, rx) = oneshot::channel(); + + // Request a task + bus_tx.send(tx).await.unwrap(); + + // Wait for a task to be returned + let task = rx.await.unwrap(); + + W::process_job(&task).unwrap(); + } + }); + + bus_rx + } + } + } +} + +pub trait Worker { + async fn pre_process_job(task: &Task) -> crate::Result<()> { + Ok(()) + } + fn process_job(task: &Task) -> crate::Result<()>; + async fn post_process_job(task: &Task) -> crate::Result<()> { + Ok(()) + } + + async fn on_job_failure(task: &Task, error: Error) -> crate::Result<()>; +}