From 71c11eaa84988b5907017ca15d6ba696c5e0b6d7 Mon Sep 17 00:00:00 2001 From: aleidk Date: Sun, 11 May 2025 17:09:23 -0400 Subject: [PATCH 1/3] feature: Create command system to handle actions --- .idea/vcs.xml | 5 +++++ src/config.rs | 28 +++++++++++++++++++++++----- src/main.rs | 45 ++++++++++++++++++++++++++++++--------------- 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 35eb1dd..8d3e42f 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,5 +1,10 @@ + + + diff --git a/src/config.rs b/src/config.rs index 680fdf7..3fb8d52 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use clap::{Parser, ValueEnum}; +use clap::{Parser, Subcommand, ValueEnum}; use serde::{Deserialize, Serialize}; use std::fmt; use std::path::PathBuf; @@ -57,9 +57,30 @@ impl Into for VerbosityLevel { } } +#[derive(Debug, Subcommand)] +#[clap(rename_all = "snake_case")] +pub enum Command { + /// Load task into the database from [path] + LoadTasks{ + /// Path to the file + path: PathBuf, + }, + #[clap(skip)] + None, +} + +impl Default for Command { + fn default() -> Self { + Command::None + } +} + #[derive(Debug, Parser, Serialize, Deserialize)] pub struct Config { - path: PathBuf, + #[command(subcommand)] + #[serde(skip)] + pub command: Command, + #[arg( long, short = 'v', @@ -71,9 +92,6 @@ pub struct Config { } impl Config { - pub fn path(&self) -> &PathBuf { - &self.path - } pub fn log_level(&self) -> LevelFilter { self.log_level.clone().into() diff --git a/src/main.rs b/src/main.rs index 985c0e4..b1fb8e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ -use clap::Parser; -use readwise_bulk_upload::config::Config; +use clap::{CommandFactory, Parser}; +use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::readwise::DocumentPayload; use readwise_bulk_upload::sql::TaskManager; use readwise_bulk_upload::{Error, Result}; @@ -9,8 +9,9 @@ use figment::{Figment, providers::{Serialized, Env}}; #[tokio::main] async fn main() -> Result<()> { + let cli = Config::parse(); let args: Config = Figment::new() - .merge(Serialized::defaults(Config::parse())) + .merge(Serialized::defaults(&cli)) .merge(Env::prefixed("APP_")) .extract()?; @@ -18,18 +19,32 @@ async fn main() -> Result<()> { .with_max_level(args.log_level()) .init(); - let file = File::open(args.path()).map_err(|_| { - Error::Runtime(format!( - r#"The file "{}" could not be open"#, - args.path().display() - )) - })?; - - let documents: Vec = serde_json::from_reader(file)?; - - let task_manager = TaskManager::new().await?; - - task_manager.load_tasks(documents).await?; + run(&cli.command).await?; + + Ok(()) +} + +async fn run(command: &Command) -> Result<()> { + + match command { + Command::LoadTasks { path } => { + let file = File::open(path).map_err(|_| { + Error::Runtime(format!( + r#"The file "{}" could not be open"#, + path.display() + )) + })?; + + let documents: Vec = serde_json::from_reader(file)?; + + let task_manager = TaskManager::new().await?; + + task_manager.load_tasks(documents).await?; + } + Command::None => { + Config::command().print_help()?; + } + } Ok(()) } From 63c20cfc87415ad06f68fccbefa95a085dc964fd Mon Sep 17 00:00:00 2001 From: aleidk Date: Mon, 12 May 2025 14:32:44 -0400 Subject: [PATCH 2/3] feature: add command to query tasks --- Cargo.lock | 92 +++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/config.rs | 1 + src/lib.rs | 2 +- src/main.rs | 15 ++++-- src/readwise.rs | 15 +++++- src/{sql.rs => task_manager.rs} | 66 ++++++++++++++++++++++- 7 files changed, 184 insertions(+), 8 deletions(-) rename src/{sql.rs => task_manager.rs} (59%) diff --git a/Cargo.lock b/Cargo.lock index 87f2582..5b6eeaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,19 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "getrandom 0.3.2", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -172,6 +185,12 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "bytecount" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" + [[package]] name = "bytemuck" version = "1.23.0" @@ -1059,6 +1078,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "papergrid" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30268a8d20c2c0d126b2b6610ab405f16517f6ba9f244d8c59ac2c512a8a1ce7" +dependencies = [ + "ahash", + "bytecount", + "unicode-width", +] + [[package]] name = "parking" version = "2.2.1" @@ -1174,6 +1204,28 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -1252,6 +1304,7 @@ dependencies = [ "serde", "serde_json", "sqlx", + "tabled", "thiserror", "tokio", "tracing", @@ -1762,6 +1815,30 @@ dependencies = [ "syn", ] +[[package]] +name = "tabled" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "228d124371171cd39f0f454b58f73ddebeeef3cef3207a82ffea1c29465aea43" +dependencies = [ + "papergrid", + "tabled_derive", + "testing_table", +] + +[[package]] +name = "tabled_derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea5d1b13ca6cff1f9231ffd62f15eefd72543dab5e468735f1a456728a02846" +dependencies = [ + "heck", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tempfile" version = "3.19.1" @@ -1775,6 +1852,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "testing_table" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f8daae29995a24f65619e19d8d31dea5b389f3d853d8bf297bbf607cd0014cc" +dependencies = [ + "unicode-width", +] + [[package]] name = "thiserror" version = "2.0.12" @@ -1972,6 +2058,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + [[package]] name = "url" version = "2.5.4" diff --git a/Cargo.toml b/Cargo.toml index da95ddc..6482ca3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} figment = { version = "0.10.19", features = ["env"] } tracing-core = "0.1.33" +tabled = "0.19.0" \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 3fb8d52..2b71f1c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -65,6 +65,7 @@ pub enum Command { /// Path to the file path: PathBuf, }, + Query, #[clap(skip)] None, } diff --git a/src/lib.rs b/src/lib.rs index dabd6d3..f63d5eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ mod error; -pub mod sql; +pub mod task_manager; pub mod config; pub mod readwise; diff --git a/src/main.rs b/src/main.rs index b1fb8e8..319961a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,15 @@ use clap::{CommandFactory, Parser}; +use figment::{ + providers::{Env, Serialized}, + Figment, +}; use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::readwise::DocumentPayload; -use readwise_bulk_upload::sql::TaskManager; +use readwise_bulk_upload::task_manager::TaskManager; use readwise_bulk_upload::{Error, Result}; use std::fs::File; +use tabled::Table; use tracing_subscriber; -use figment::{Figment, providers::{Serialized, Env}}; #[tokio::main] async fn main() -> Result<()> { @@ -25,7 +29,6 @@ async fn main() -> Result<()> { } async fn run(command: &Command) -> Result<()> { - match command { Command::LoadTasks { path } => { let file = File::open(path).map_err(|_| { @@ -41,6 +44,12 @@ async fn run(command: &Command) -> Result<()> { task_manager.load_tasks(documents).await?; } + Command::Query => { + let task_manager = TaskManager::new().await?; + let tasks = task_manager.get_tasks::(None, 25).await?; + + println!("{}", Table::new(tasks)); + } Command::None => { Config::command().print_help()?; } diff --git a/src/readwise.rs b/src/readwise.rs index 33bf9ed..e3d7a42 100644 --- a/src/readwise.rs +++ b/src/readwise.rs @@ -1,7 +1,8 @@ +use crate::task_manager::TaskPayload; use chrono::{DateTime, Local}; -use serde::{Deserialize, Deserializer, de, Serialize}; +use serde::{de, Deserialize, Deserializer, Serialize}; use serde_json::Value; -use crate::sql::TaskPayload; +use std::fmt::Display; #[derive(Deserialize, Serialize, Debug)] pub struct DocumentPayload { @@ -14,6 +15,16 @@ pub struct DocumentPayload { location: String, } +impl Display for DocumentPayload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + serde_json::to_string_pretty(self).map_err(|_| std::fmt::Error)? + ) + } +} + impl TaskPayload for DocumentPayload { fn get_key(&self) -> String { self.url.clone() diff --git a/src/sql.rs b/src/task_manager.rs similarity index 59% rename from src/sql.rs rename to src/task_manager.rs index 7d2a442..2581187 100644 --- a/src/sql.rs +++ b/src/task_manager.rs @@ -1,14 +1,18 @@ use crate::Error; +use chrono::Utc; use directories::ProjectDirs; +use serde::de::DeserializeOwned; use serde::Serialize; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::{QueryBuilder, Sqlite, SqlitePool}; +use std::fmt::Display; +use tabled::Tabled; use tokio::fs; use tracing::{info, instrument}; static SQLITE_BIND_LIMIT: usize = 32766; -#[derive(sqlx::Type)] +#[derive(sqlx::Type, Debug)] #[repr(u8)] pub enum TaskStatus { Pending = 1, @@ -17,10 +21,50 @@ pub enum TaskStatus { Failed = 4, } +impl Display for TaskStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskStatus::Pending => { + write!(f, "Pending") + } + TaskStatus::InProgress => { + write!(f, "In Progress") + } + TaskStatus::Completed => { + write!(f, "Completed") + } + TaskStatus::Failed => { + write!(f, "Failed") + } + } + } +} + pub trait TaskPayload { fn get_key(&self) -> String; } +#[derive(sqlx::FromRow, Tabled, Debug)] +pub struct Task { + id: u32, + payload_key: String, + #[sqlx(json)] + #[tabled(skip)] + payload: T, + #[sqlx(rename = "status_id")] + status: TaskStatus, + created_at: chrono::DateTime, + #[tabled(display = "display_option_date")] + updated_at: Option>, +} + +fn display_option_date(o: &Option>) -> String { + match o { + Some(s) => format!("{}", s), + None => String::from(""), + } +} + #[derive(Debug)] pub struct TaskManager { pool: SqlitePool, @@ -53,6 +97,25 @@ impl TaskManager { Ok(pool) } + pub async fn get_tasks< + T: DeserializeOwned + Send + Unpin + 'static + Display, + >( + &self, + status: Option, + limit: u16, + ) -> crate::Result>> { + let tasks: Vec> = sqlx::query_as( + "select id, payload_key, payload, status_id, created_at, updated_at from tasks where status_id = ? order by ? limit ?", + ) + .bind(status.unwrap_or(TaskStatus::Pending)) + .bind("created_at DESC") + .bind(limit) + .fetch_all(&self.pool) + .await?; + + Ok(tasks) + } + #[instrument(skip(self, values))] pub async fn load_tasks(&self, values: Vec) -> crate::Result<()> where @@ -67,7 +130,6 @@ impl TaskManager { .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) .collect(); - let mut affected_rows = 0; // Chunk the query by the size limit of bind params for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { From 53872108446597439ac77045b0f656a7f5f4e864 Mon Sep 17 00:00:00 2001 From: aleidk Date: Mon, 12 May 2025 16:40:41 -0400 Subject: [PATCH 3/3] feature: add command to run tasks --- .idea/runConfigurations/Load_Tasks.xml | 22 +++++++++ .idea/runConfigurations/Query_Tasks.xml | 22 +++++++++ .idea/runConfigurations/Run_Tasks.xml | 22 +++++++++ Cargo.lock | 29 +++++++++++ Cargo.toml | 3 +- src/config.rs | 1 + src/main.rs | 14 ++++-- src/task_manager.rs | 64 ++++++++++++++++++++----- 8 files changed, 160 insertions(+), 17 deletions(-) create mode 100644 .idea/runConfigurations/Load_Tasks.xml create mode 100644 .idea/runConfigurations/Query_Tasks.xml create mode 100644 .idea/runConfigurations/Run_Tasks.xml diff --git a/.idea/runConfigurations/Load_Tasks.xml b/.idea/runConfigurations/Load_Tasks.xml new file mode 100644 index 0000000..41f2816 --- /dev/null +++ b/.idea/runConfigurations/Load_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Query_Tasks.xml b/.idea/runConfigurations/Query_Tasks.xml new file mode 100644 index 0000000..ef4b918 --- /dev/null +++ b/.idea/runConfigurations/Query_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Run_Tasks.xml b/.idea/runConfigurations/Run_Tasks.xml new file mode 100644 index 0000000..ef08a15 --- /dev/null +++ b/.idea/runConfigurations/Run_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5b6eeaa..dfdfe03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -508,6 +508,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -552,6 +567,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -570,8 +596,10 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1301,6 +1329,7 @@ dependencies = [ "clap", "directories", "figment", + "futures", "serde", "serde_json", "sqlx", diff --git a/Cargo.toml b/Cargo.toml index 6482ca3..3f51301 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,5 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} figment = { version = "0.10.19", features = ["env"] } tracing-core = "0.1.33" -tabled = "0.19.0" \ No newline at end of file +tabled = "0.19.0" +futures = "0.3.31" \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 2b71f1c..04bdbff 100644 --- a/src/config.rs +++ b/src/config.rs @@ -66,6 +66,7 @@ pub enum Command { path: PathBuf, }, Query, + Run, #[clap(skip)] None, } diff --git a/src/main.rs b/src/main.rs index 319961a..ff1f2ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use figment::{ }; use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::readwise::DocumentPayload; -use readwise_bulk_upload::task_manager::TaskManager; +use readwise_bulk_upload::task_manager::{TaskManager, TaskStatus}; use readwise_bulk_upload::{Error, Result}; use std::fs::File; use tabled::Table; @@ -29,6 +29,7 @@ async fn main() -> Result<()> { } async fn run(command: &Command) -> Result<()> { + let task_manager = TaskManager::new().await?; match command { Command::LoadTasks { path } => { let file = File::open(path).map_err(|_| { @@ -40,16 +41,21 @@ async fn run(command: &Command) -> Result<()> { let documents: Vec = serde_json::from_reader(file)?; - let task_manager = TaskManager::new().await?; task_manager.load_tasks(documents).await?; } Command::Query => { - let task_manager = TaskManager::new().await?; - let tasks = task_manager.get_tasks::(None, 25).await?; + let tasks = task_manager.get_tasks::(None, Some(25)).await?; println!("{}", Table::new(tasks)); } + Command::Run => { + task_manager.run_tasks::(|task| { + println!("{}", task.get_key()); + + TaskStatus::Completed + }).await?; + } Command::None => { Config::command().print_help()?; } diff --git a/src/task_manager.rs b/src/task_manager.rs index 2581187..11401e2 100644 --- a/src/task_manager.rs +++ b/src/task_manager.rs @@ -1,6 +1,7 @@ use crate::Error; use chrono::Utc; use directories::ProjectDirs; +use futures::{StreamExt, TryStreamExt}; use serde::de::DeserializeOwned; use serde::Serialize; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; @@ -44,6 +45,8 @@ pub trait TaskPayload { fn get_key(&self) -> String; } +pub type TaskJob = fn(&Task) -> TaskStatus; + #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { id: u32, @@ -58,6 +61,12 @@ pub struct Task { updated_at: Option>, } +impl Task { + pub fn get_key(&self) -> String { + self.payload_key.clone() + } +} + fn display_option_date(o: &Option>) -> String { match o { Some(s) => format!("{}", s), @@ -65,6 +74,9 @@ fn display_option_date(o: &Option>) -> String { } } +pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {} +impl _Task for T {} + #[derive(Debug)] pub struct TaskManager { pool: SqlitePool, @@ -97,21 +109,34 @@ impl TaskManager { Ok(pool) } - pub async fn get_tasks< - T: DeserializeOwned + Send + Unpin + 'static + Display, - >( + fn get_task_builder( + status: Option, + limit: Option, + ) -> QueryBuilder<'static, Sqlite> { + let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new( + "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", + ); + + if let Some(status) = status { + builder.push("where status_id = ").push_bind(status); + } + + builder.push("ORDER BY created_at DESC "); + + if let Some(limit) = limit { + builder.push("LIMIT ").push_bind(limit); + } + builder + } + + pub async fn get_tasks( &self, status: Option, - limit: u16, + limit: Option, ) -> crate::Result>> { - let tasks: Vec> = sqlx::query_as( - "select id, payload_key, payload, status_id, created_at, updated_at from tasks where status_id = ? order by ? limit ?", - ) - .bind(status.unwrap_or(TaskStatus::Pending)) - .bind("created_at DESC") - .bind(limit) - .fetch_all(&self.pool) - .await?; + let mut builder = Self::get_task_builder(status, limit); + + let tasks: Vec> = builder.build_query_as().fetch_all(&self.pool).await?; Ok(tasks) } @@ -153,4 +178,19 @@ impl TaskManager { Ok(()) } + + pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None); + + let rows = builder.build_query_as::>().fetch(&self.pool); + + let result: Vec<(Task, TaskStatus)> = rows.map(|x| { + let task = x.unwrap(); + let status = func(&task); + + (task, status) + }).collect().await; + + Ok(()) + } }