diff --git a/.idea/runConfigurations/Load_Tasks.xml b/.idea/runConfigurations/Load_Tasks.xml
new file mode 100644
index 0000000..41f2816
--- /dev/null
+++ b/.idea/runConfigurations/Load_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/Query_Tasks.xml b/.idea/runConfigurations/Query_Tasks.xml
new file mode 100644
index 0000000..ef4b918
--- /dev/null
+++ b/.idea/runConfigurations/Query_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/Run_Tasks.xml b/.idea/runConfigurations/Run_Tasks.xml
new file mode 100644
index 0000000..ef08a15
--- /dev/null
+++ b/.idea/runConfigurations/Run_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 5b6eeaa..dfdfe03 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -508,6 +508,21 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "futures"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
[[package]]
name = "futures-channel"
version = "0.3.31"
@@ -552,6 +567,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "futures-sink"
version = "0.3.31"
@@ -570,8 +596,10 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
+ "futures-channel",
"futures-core",
"futures-io",
+ "futures-macro",
"futures-sink",
"futures-task",
"memchr",
@@ -1301,6 +1329,7 @@ dependencies = [
"clap",
"directories",
"figment",
+ "futures",
"serde",
"serde_json",
"sqlx",
diff --git a/Cargo.toml b/Cargo.toml
index 6482ca3..3f51301 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,4 +16,5 @@ tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"
-tabled = "0.19.0"
\ No newline at end of file
+tabled = "0.19.0"
+futures = "0.3.31"
\ No newline at end of file
diff --git a/src/config.rs b/src/config.rs
index 2b71f1c..04bdbff 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -66,6 +66,7 @@ pub enum Command {
path: PathBuf,
},
Query,
+ Run,
#[clap(skip)]
None,
}
diff --git a/src/main.rs b/src/main.rs
index 319961a..ff1f2ac 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,7 +5,7 @@ use figment::{
};
use readwise_bulk_upload::config::{Command, Config};
use readwise_bulk_upload::readwise::DocumentPayload;
-use readwise_bulk_upload::task_manager::TaskManager;
+use readwise_bulk_upload::task_manager::{TaskManager, TaskStatus};
use readwise_bulk_upload::{Error, Result};
use std::fs::File;
use tabled::Table;
@@ -29,6 +29,7 @@ async fn main() -> Result<()> {
}
async fn run(command: &Command) -> Result<()> {
+ let task_manager = TaskManager::new().await?;
match command {
Command::LoadTasks { path } => {
let file = File::open(path).map_err(|_| {
@@ -40,16 +41,21 @@ async fn run(command: &Command) -> Result<()> {
let documents: Vec = serde_json::from_reader(file)?;
- let task_manager = TaskManager::new().await?;
task_manager.load_tasks(documents).await?;
}
Command::Query => {
- let task_manager = TaskManager::new().await?;
- let tasks = task_manager.get_tasks::(None, 25).await?;
+ let tasks = task_manager.get_tasks::(None, Some(25)).await?;
println!("{}", Table::new(tasks));
}
+ Command::Run => {
+ task_manager.run_tasks::(|task| {
+ println!("{}", task.get_key());
+
+ TaskStatus::Completed
+ }).await?;
+ }
Command::None => {
Config::command().print_help()?;
}
diff --git a/src/task_manager.rs b/src/task_manager.rs
index 2581187..11401e2 100644
--- a/src/task_manager.rs
+++ b/src/task_manager.rs
@@ -1,6 +1,7 @@
use crate::Error;
use chrono::Utc;
use directories::ProjectDirs;
+use futures::{StreamExt, TryStreamExt};
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
@@ -44,6 +45,8 @@ pub trait TaskPayload {
fn get_key(&self) -> String;
}
+pub type TaskJob = fn(&Task) -> TaskStatus;
+
#[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task {
id: u32,
@@ -58,6 +61,12 @@ pub struct Task {
updated_at: Option>,
}
+impl Task {
+ pub fn get_key(&self) -> String {
+ self.payload_key.clone()
+ }
+}
+
fn display_option_date(o: &Option>) -> String {
match o {
Some(s) => format!("{}", s),
@@ -65,6 +74,9 @@ fn display_option_date(o: &Option>) -> String {
}
}
+pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
+impl _Task for T {}
+
#[derive(Debug)]
pub struct TaskManager {
pool: SqlitePool,
@@ -97,21 +109,34 @@ impl TaskManager {
Ok(pool)
}
- pub async fn get_tasks<
- T: DeserializeOwned + Send + Unpin + 'static + Display,
- >(
+ fn get_task_builder(
+ status: Option,
+ limit: Option,
+ ) -> QueryBuilder<'static, Sqlite> {
+ let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
+ "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
+ );
+
+ if let Some(status) = status {
+ builder.push("where status_id = ").push_bind(status);
+ }
+
+ builder.push("ORDER BY created_at DESC ");
+
+ if let Some(limit) = limit {
+ builder.push("LIMIT ").push_bind(limit);
+ }
+ builder
+ }
+
+ pub async fn get_tasks(
&self,
status: Option,
- limit: u16,
+ limit: Option,
) -> crate::Result>> {
- let tasks: Vec> = sqlx::query_as(
- "select id, payload_key, payload, status_id, created_at, updated_at from tasks where status_id = ? order by ? limit ?",
- )
- .bind(status.unwrap_or(TaskStatus::Pending))
- .bind("created_at DESC")
- .bind(limit)
- .fetch_all(&self.pool)
- .await?;
+ let mut builder = Self::get_task_builder(status, limit);
+
+ let tasks: Vec> = builder.build_query_as().fetch_all(&self.pool).await?;
Ok(tasks)
}
@@ -153,4 +178,19 @@ impl TaskManager {
Ok(())
}
+
+ pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> {
+ let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
+
+ let rows = builder.build_query_as::>().fetch(&self.pool);
+
+ let result: Vec<(Task, TaskStatus)> = rows.map(|x| {
+ let task = x.unwrap();
+ let status = func(&task);
+
+ (task, status)
+ }).collect().await;
+
+ Ok(())
+ }
}