diff --git a/.idea/runConfigurations/Load_Tasks.xml b/.idea/runConfigurations/Load_Tasks.xml
new file mode 100644
index 0000000..41f2816
--- /dev/null
+++ b/.idea/runConfigurations/Load_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/Query_Tasks.xml b/.idea/runConfigurations/Query_Tasks.xml
new file mode 100644
index 0000000..ef4b918
--- /dev/null
+++ b/.idea/runConfigurations/Query_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/Run_Tasks.xml b/.idea/runConfigurations/Run_Tasks.xml
new file mode 100644
index 0000000..ef08a15
--- /dev/null
+++ b/.idea/runConfigurations/Run_Tasks.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 35eb1dd..8d3e42f 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -1,5 +1,10 @@
+
+
+
diff --git a/Cargo.lock b/Cargo.lock
index 87f2582..dfdfe03 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17,6 +17,19 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
+[[package]]
+name = "ahash"
+version = "0.8.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
+dependencies = [
+ "cfg-if",
+ "getrandom 0.3.2",
+ "once_cell",
+ "version_check",
+ "zerocopy",
+]
+
[[package]]
name = "aho-corasick"
version = "1.1.3"
@@ -172,6 +185,12 @@ version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
+[[package]]
+name = "bytecount"
+version = "0.6.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce"
+
[[package]]
name = "bytemuck"
version = "1.23.0"
@@ -489,6 +508,21 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "futures"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
[[package]]
name = "futures-channel"
version = "0.3.31"
@@ -533,6 +567,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "futures-sink"
version = "0.3.31"
@@ -551,8 +596,10 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
+ "futures-channel",
"futures-core",
"futures-io",
+ "futures-macro",
"futures-sink",
"futures-task",
"memchr",
@@ -1059,6 +1106,17 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+[[package]]
+name = "papergrid"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30268a8d20c2c0d126b2b6610ab405f16517f6ba9f244d8c59ac2c512a8a1ce7"
+dependencies = [
+ "ahash",
+ "bytecount",
+ "unicode-width",
+]
+
[[package]]
name = "parking"
version = "2.2.1"
@@ -1174,6 +1232,28 @@ dependencies = [
"zerocopy",
]
+[[package]]
+name = "proc-macro-error-attr2"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5"
+dependencies = [
+ "proc-macro2",
+ "quote",
+]
+
+[[package]]
+name = "proc-macro-error2"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802"
+dependencies = [
+ "proc-macro-error-attr2",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "proc-macro2"
version = "1.0.95"
@@ -1249,9 +1329,11 @@ dependencies = [
"clap",
"directories",
"figment",
+ "futures",
"serde",
"serde_json",
"sqlx",
+ "tabled",
"thiserror",
"tokio",
"tracing",
@@ -1762,6 +1844,30 @@ dependencies = [
"syn",
]
+[[package]]
+name = "tabled"
+version = "0.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "228d124371171cd39f0f454b58f73ddebeeef3cef3207a82ffea1c29465aea43"
+dependencies = [
+ "papergrid",
+ "tabled_derive",
+ "testing_table",
+]
+
+[[package]]
+name = "tabled_derive"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ea5d1b13ca6cff1f9231ffd62f15eefd72543dab5e468735f1a456728a02846"
+dependencies = [
+ "heck",
+ "proc-macro-error2",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "tempfile"
version = "3.19.1"
@@ -1775,6 +1881,15 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "testing_table"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f8daae29995a24f65619e19d8d31dea5b389f3d853d8bf297bbf607cd0014cc"
+dependencies = [
+ "unicode-width",
+]
+
[[package]]
name = "thiserror"
version = "2.0.12"
@@ -1972,6 +2087,12 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
+[[package]]
+name = "unicode-width"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
+
[[package]]
name = "url"
version = "2.5.4"
diff --git a/Cargo.toml b/Cargo.toml
index da95ddc..3f51301 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,3 +16,5 @@ tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"
+tabled = "0.19.0"
+futures = "0.3.31"
\ No newline at end of file
diff --git a/src/config.rs b/src/config.rs
index 680fdf7..04bdbff 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,4 +1,4 @@
-use clap::{Parser, ValueEnum};
+use clap::{Parser, Subcommand, ValueEnum};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
@@ -57,9 +57,32 @@ impl Into for VerbosityLevel {
}
}
+#[derive(Debug, Subcommand)]
+#[clap(rename_all = "snake_case")]
+pub enum Command {
+ /// Load task into the database from [path]
+ LoadTasks{
+ /// Path to the file
+ path: PathBuf,
+ },
+ Query,
+ Run,
+ #[clap(skip)]
+ None,
+}
+
+impl Default for Command {
+ fn default() -> Self {
+ Command::None
+ }
+}
+
#[derive(Debug, Parser, Serialize, Deserialize)]
pub struct Config {
- path: PathBuf,
+ #[command(subcommand)]
+ #[serde(skip)]
+ pub command: Command,
+
#[arg(
long,
short = 'v',
@@ -71,9 +94,6 @@ pub struct Config {
}
impl Config {
- pub fn path(&self) -> &PathBuf {
- &self.path
- }
pub fn log_level(&self) -> LevelFilter {
self.log_level.clone().into()
diff --git a/src/lib.rs b/src/lib.rs
index dabd6d3..f63d5eb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,5 @@
mod error;
-pub mod sql;
+pub mod task_manager;
pub mod config;
pub mod readwise;
diff --git a/src/main.rs b/src/main.rs
index 985c0e4..ff1f2ac 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,16 +1,21 @@
-use clap::Parser;
-use readwise_bulk_upload::config::Config;
+use clap::{CommandFactory, Parser};
+use figment::{
+ providers::{Env, Serialized},
+ Figment,
+};
+use readwise_bulk_upload::config::{Command, Config};
use readwise_bulk_upload::readwise::DocumentPayload;
-use readwise_bulk_upload::sql::TaskManager;
+use readwise_bulk_upload::task_manager::{TaskManager, TaskStatus};
use readwise_bulk_upload::{Error, Result};
use std::fs::File;
+use tabled::Table;
use tracing_subscriber;
-use figment::{Figment, providers::{Serialized, Env}};
#[tokio::main]
async fn main() -> Result<()> {
+ let cli = Config::parse();
let args: Config = Figment::new()
- .merge(Serialized::defaults(Config::parse()))
+ .merge(Serialized::defaults(&cli))
.merge(Env::prefixed("APP_"))
.extract()?;
@@ -18,18 +23,43 @@ async fn main() -> Result<()> {
.with_max_level(args.log_level())
.init();
- let file = File::open(args.path()).map_err(|_| {
- Error::Runtime(format!(
- r#"The file "{}" could not be open"#,
- args.path().display()
- ))
- })?;
-
- let documents: Vec = serde_json::from_reader(file)?;
-
- let task_manager = TaskManager::new().await?;
-
- task_manager.load_tasks(documents).await?;
+ run(&cli.command).await?;
+
+ Ok(())
+}
+
+async fn run(command: &Command) -> Result<()> {
+ let task_manager = TaskManager::new().await?;
+ match command {
+ Command::LoadTasks { path } => {
+ let file = File::open(path).map_err(|_| {
+ Error::Runtime(format!(
+ r#"The file "{}" could not be open"#,
+ path.display()
+ ))
+ })?;
+
+ let documents: Vec = serde_json::from_reader(file)?;
+
+
+ task_manager.load_tasks(documents).await?;
+ }
+ Command::Query => {
+ let tasks = task_manager.get_tasks::(None, Some(25)).await?;
+
+ println!("{}", Table::new(tasks));
+ }
+ Command::Run => {
+ task_manager.run_tasks::(|task| {
+ println!("{}", task.get_key());
+
+ TaskStatus::Completed
+ }).await?;
+ }
+ Command::None => {
+ Config::command().print_help()?;
+ }
+ }
Ok(())
}
diff --git a/src/readwise.rs b/src/readwise.rs
index 33bf9ed..e3d7a42 100644
--- a/src/readwise.rs
+++ b/src/readwise.rs
@@ -1,7 +1,8 @@
+use crate::task_manager::TaskPayload;
use chrono::{DateTime, Local};
-use serde::{Deserialize, Deserializer, de, Serialize};
+use serde::{de, Deserialize, Deserializer, Serialize};
use serde_json::Value;
-use crate::sql::TaskPayload;
+use std::fmt::Display;
#[derive(Deserialize, Serialize, Debug)]
pub struct DocumentPayload {
@@ -14,6 +15,16 @@ pub struct DocumentPayload {
location: String,
}
+impl Display for DocumentPayload {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{}",
+ serde_json::to_string_pretty(self).map_err(|_| std::fmt::Error)?
+ )
+ }
+}
+
impl TaskPayload for DocumentPayload {
fn get_key(&self) -> String {
self.url.clone()
diff --git a/src/sql.rs b/src/sql.rs
deleted file mode 100644
index 7d2a442..0000000
--- a/src/sql.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-use crate::Error;
-use directories::ProjectDirs;
-use serde::Serialize;
-use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
-use sqlx::{QueryBuilder, Sqlite, SqlitePool};
-use tokio::fs;
-use tracing::{info, instrument};
-
-static SQLITE_BIND_LIMIT: usize = 32766;
-
-#[derive(sqlx::Type)]
-#[repr(u8)]
-pub enum TaskStatus {
- Pending = 1,
- InProgress = 2,
- Completed = 3,
- Failed = 4,
-}
-
-pub trait TaskPayload {
- fn get_key(&self) -> String;
-}
-
-#[derive(Debug)]
-pub struct TaskManager {
- pool: SqlitePool,
-}
-
-impl TaskManager {
- pub async fn new() -> Result {
- Ok(Self {
- pool: Self::connect_database().await?,
- })
- }
-
- async fn connect_database() -> crate::Result {
- let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME"))
- .ok_or(Error::Unhandled("Could not get standard directories"))?;
-
- let database_file_path = project_dir.data_dir().join("db.sql");
-
- fs::create_dir_all(project_dir.data_dir()).await?;
-
- let opts = SqliteConnectOptions::new()
- .filename(database_file_path)
- .create_if_missing(true)
- .journal_mode(SqliteJournalMode::Wal);
-
- let pool = SqlitePool::connect_with(opts).await?;
-
- sqlx::migrate!("./migrations").run(&pool).await?;
-
- Ok(pool)
- }
-
- #[instrument(skip(self, values))]
- pub async fn load_tasks(&self, values: Vec) -> crate::Result<()>
- where
- T: TaskPayload + Serialize + std::fmt::Debug,
- {
- let mut tx = self.pool.begin().await?;
- let mut builder: QueryBuilder<'_, Sqlite> =
- QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
-
- let args: crate::Result> = values
- .iter()
- .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
- .collect();
-
-
- let mut affected_rows = 0;
- // Chunk the query by the size limit of bind params
- for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
- builder.push_values(chunk, |mut builder, item| {
- builder
- .push_bind(&item.0)
- .push_bind(&item.1)
- .push_bind(TaskStatus::Pending);
- });
- builder.push("ON conflict (payload_key) DO NOTHING");
-
- let query = builder.build();
-
- affected_rows += query.execute(&mut *tx).await?.rows_affected();
- builder.reset();
- }
-
- tx.commit().await?;
-
- info!("{} rows inserted.", affected_rows);
-
- Ok(())
- }
-}
diff --git a/src/task_manager.rs b/src/task_manager.rs
new file mode 100644
index 0000000..11401e2
--- /dev/null
+++ b/src/task_manager.rs
@@ -0,0 +1,196 @@
+use crate::Error;
+use chrono::Utc;
+use directories::ProjectDirs;
+use futures::{StreamExt, TryStreamExt};
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
+use sqlx::{QueryBuilder, Sqlite, SqlitePool};
+use std::fmt::Display;
+use tabled::Tabled;
+use tokio::fs;
+use tracing::{info, instrument};
+
+static SQLITE_BIND_LIMIT: usize = 32766;
+
+#[derive(sqlx::Type, Debug)]
+#[repr(u8)]
+pub enum TaskStatus {
+ Pending = 1,
+ InProgress = 2,
+ Completed = 3,
+ Failed = 4,
+}
+
+impl Display for TaskStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ TaskStatus::Pending => {
+ write!(f, "Pending")
+ }
+ TaskStatus::InProgress => {
+ write!(f, "In Progress")
+ }
+ TaskStatus::Completed => {
+ write!(f, "Completed")
+ }
+ TaskStatus::Failed => {
+ write!(f, "Failed")
+ }
+ }
+ }
+}
+
+pub trait TaskPayload {
+ fn get_key(&self) -> String;
+}
+
+pub type TaskJob = fn(&Task) -> TaskStatus;
+
+#[derive(sqlx::FromRow, Tabled, Debug)]
+pub struct Task {
+ id: u32,
+ payload_key: String,
+ #[sqlx(json)]
+ #[tabled(skip)]
+ payload: T,
+ #[sqlx(rename = "status_id")]
+ status: TaskStatus,
+ created_at: chrono::DateTime,
+ #[tabled(display = "display_option_date")]
+ updated_at: Option>,
+}
+
+impl Task {
+ pub fn get_key(&self) -> String {
+ self.payload_key.clone()
+ }
+}
+
+fn display_option_date(o: &Option>) -> String {
+ match o {
+ Some(s) => format!("{}", s),
+ None => String::from(""),
+ }
+}
+
+pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
+impl _Task for T {}
+
+#[derive(Debug)]
+pub struct TaskManager {
+ pool: SqlitePool,
+}
+
+impl TaskManager {
+ pub async fn new() -> Result {
+ Ok(Self {
+ pool: Self::connect_database().await?,
+ })
+ }
+
+ async fn connect_database() -> crate::Result {
+ let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME"))
+ .ok_or(Error::Unhandled("Could not get standard directories"))?;
+
+ let database_file_path = project_dir.data_dir().join("db.sql");
+
+ fs::create_dir_all(project_dir.data_dir()).await?;
+
+ let opts = SqliteConnectOptions::new()
+ .filename(database_file_path)
+ .create_if_missing(true)
+ .journal_mode(SqliteJournalMode::Wal);
+
+ let pool = SqlitePool::connect_with(opts).await?;
+
+ sqlx::migrate!("./migrations").run(&pool).await?;
+
+ Ok(pool)
+ }
+
+ fn get_task_builder(
+ status: Option,
+ limit: Option,
+ ) -> QueryBuilder<'static, Sqlite> {
+ let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
+ "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
+ );
+
+ if let Some(status) = status {
+ builder.push("where status_id = ").push_bind(status);
+ }
+
+ builder.push("ORDER BY created_at DESC ");
+
+ if let Some(limit) = limit {
+ builder.push("LIMIT ").push_bind(limit);
+ }
+ builder
+ }
+
+ pub async fn get_tasks(
+ &self,
+ status: Option,
+ limit: Option,
+ ) -> crate::Result>> {
+ let mut builder = Self::get_task_builder(status, limit);
+
+ let tasks: Vec> = builder.build_query_as().fetch_all(&self.pool).await?;
+
+ Ok(tasks)
+ }
+
+ #[instrument(skip(self, values))]
+ pub async fn load_tasks(&self, values: Vec) -> crate::Result<()>
+ where
+ T: TaskPayload + Serialize + std::fmt::Debug,
+ {
+ let mut tx = self.pool.begin().await?;
+ let mut builder: QueryBuilder<'_, Sqlite> =
+ QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
+
+ let args: crate::Result> = values
+ .iter()
+ .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
+ .collect();
+
+ let mut affected_rows = 0;
+ // Chunk the query by the size limit of bind params
+ for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
+ builder.push_values(chunk, |mut builder, item| {
+ builder
+ .push_bind(&item.0)
+ .push_bind(&item.1)
+ .push_bind(TaskStatus::Pending);
+ });
+ builder.push("ON conflict (payload_key) DO NOTHING");
+
+ let query = builder.build();
+
+ affected_rows += query.execute(&mut *tx).await?.rows_affected();
+ builder.reset();
+ }
+
+ tx.commit().await?;
+
+ info!("{} rows inserted.", affected_rows);
+
+ Ok(())
+ }
+
+ pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> {
+ let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
+
+ let rows = builder.build_query_as::>().fetch(&self.pool);
+
+ let result: Vec<(Task, TaskStatus)> = rows.map(|x| {
+ let task = x.unwrap();
+ let status = func(&task);
+
+ (task, status)
+ }).collect().await;
+
+ Ok(())
+ }
+}