diff --git a/.idea/runConfigurations/Load_Tasks.xml b/.idea/runConfigurations/Load_Tasks.xml new file mode 100644 index 0000000..41f2816 --- /dev/null +++ b/.idea/runConfigurations/Load_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Query_Tasks.xml b/.idea/runConfigurations/Query_Tasks.xml new file mode 100644 index 0000000..ef4b918 --- /dev/null +++ b/.idea/runConfigurations/Query_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Run_Tasks.xml b/.idea/runConfigurations/Run_Tasks.xml new file mode 100644 index 0000000..ef08a15 --- /dev/null +++ b/.idea/runConfigurations/Run_Tasks.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5b6eeaa..dfdfe03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -508,6 +508,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -552,6 +567,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -570,8 +596,10 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1301,6 +1329,7 @@ dependencies = [ "clap", "directories", "figment", + "futures", "serde", "serde_json", "sqlx", diff --git a/Cargo.toml b/Cargo.toml index 6482ca3..3f51301 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,5 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} figment = { version = "0.10.19", features = ["env"] } tracing-core = "0.1.33" -tabled = "0.19.0" \ No newline at end of file +tabled = "0.19.0" +futures = "0.3.31" \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 2b71f1c..04bdbff 100644 --- a/src/config.rs +++ b/src/config.rs @@ -66,6 +66,7 @@ pub enum Command { path: PathBuf, }, Query, + Run, #[clap(skip)] None, } diff --git a/src/main.rs b/src/main.rs index 319961a..ff1f2ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use figment::{ }; use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::readwise::DocumentPayload; -use readwise_bulk_upload::task_manager::TaskManager; +use readwise_bulk_upload::task_manager::{TaskManager, TaskStatus}; use readwise_bulk_upload::{Error, Result}; use std::fs::File; use tabled::Table; @@ -29,6 +29,7 @@ async fn main() -> Result<()> { } async fn run(command: &Command) -> Result<()> { + let task_manager = TaskManager::new().await?; match command { Command::LoadTasks { path } => { let file = File::open(path).map_err(|_| { @@ -40,16 +41,21 @@ async fn run(command: &Command) -> Result<()> { let documents: Vec = serde_json::from_reader(file)?; - let task_manager = TaskManager::new().await?; task_manager.load_tasks(documents).await?; } Command::Query => { - let task_manager = TaskManager::new().await?; - let tasks = task_manager.get_tasks::(None, 25).await?; + let tasks = task_manager.get_tasks::(None, Some(25)).await?; println!("{}", Table::new(tasks)); } + Command::Run => { + task_manager.run_tasks::(|task| { + println!("{}", task.get_key()); + + TaskStatus::Completed + }).await?; + } Command::None => { Config::command().print_help()?; } diff --git a/src/task_manager.rs b/src/task_manager.rs index 2581187..11401e2 100644 --- a/src/task_manager.rs +++ b/src/task_manager.rs @@ -1,6 +1,7 @@ use crate::Error; use chrono::Utc; use directories::ProjectDirs; +use futures::{StreamExt, TryStreamExt}; use serde::de::DeserializeOwned; use serde::Serialize; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; @@ -44,6 +45,8 @@ pub trait TaskPayload { fn get_key(&self) -> String; } +pub type TaskJob = fn(&Task) -> TaskStatus; + #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { id: u32, @@ -58,6 +61,12 @@ pub struct Task { updated_at: Option>, } +impl Task { + pub fn get_key(&self) -> String { + self.payload_key.clone() + } +} + fn display_option_date(o: &Option>) -> String { match o { Some(s) => format!("{}", s), @@ -65,6 +74,9 @@ fn display_option_date(o: &Option>) -> String { } } +pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {} +impl _Task for T {} + #[derive(Debug)] pub struct TaskManager { pool: SqlitePool, @@ -97,21 +109,34 @@ impl TaskManager { Ok(pool) } - pub async fn get_tasks< - T: DeserializeOwned + Send + Unpin + 'static + Display, - >( + fn get_task_builder( + status: Option, + limit: Option, + ) -> QueryBuilder<'static, Sqlite> { + let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new( + "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", + ); + + if let Some(status) = status { + builder.push("where status_id = ").push_bind(status); + } + + builder.push("ORDER BY created_at DESC "); + + if let Some(limit) = limit { + builder.push("LIMIT ").push_bind(limit); + } + builder + } + + pub async fn get_tasks( &self, status: Option, - limit: u16, + limit: Option, ) -> crate::Result>> { - let tasks: Vec> = sqlx::query_as( - "select id, payload_key, payload, status_id, created_at, updated_at from tasks where status_id = ? order by ? limit ?", - ) - .bind(status.unwrap_or(TaskStatus::Pending)) - .bind("created_at DESC") - .bind(limit) - .fetch_all(&self.pool) - .await?; + let mut builder = Self::get_task_builder(status, limit); + + let tasks: Vec> = builder.build_query_as().fetch_all(&self.pool).await?; Ok(tasks) } @@ -153,4 +178,19 @@ impl TaskManager { Ok(()) } + + pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None); + + let rows = builder.build_query_as::>().fetch(&self.pool); + + let result: Vec<(Task, TaskStatus)> = rows.map(|x| { + let task = x.unwrap(); + let status = func(&task); + + (task, status) + }).collect().await; + + Ok(()) + } }