diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
new file mode 100644
index 0000000..a55e7a1
--- /dev/null
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -0,0 +1,5 @@
+
+
+
+
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 5daa196..827a82a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -110,6 +110,28 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "async-stream"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "atoi"
 version = "2.0.0"
@@ -941,6 +963,7 @@ dependencies = [
 name = "lib_sync_core"
 version = "0.1.0"
 dependencies = [
+ "async-stream",
  "chrono",
  "clap",
  "directories",
diff --git a/cli/bin/readwise/external_interface.rs b/cli/bin/readwise/external_interface.rs
index 13b2f65..a484ca9 100644
--- a/cli/bin/readwise/external_interface.rs
+++ b/cli/bin/readwise/external_interface.rs
@@ -1,4 +1,4 @@
-use lib_sync_core::task_manager::TaskPayload;
+use lib_sync_core::tasks::TaskPayload;
 use chrono::{DateTime, Local};
 use serde::{de, Deserialize, Deserializer, Serialize};
 use serde_json::Value;
diff --git a/cli/bin/readwise/main.rs b/cli/bin/readwise/main.rs
index bdfc584..29eee78 100644
--- a/cli/bin/readwise/main.rs
+++ b/cli/bin/readwise/main.rs
@@ -4,7 +4,7 @@ use figment::{
     Figment,
     providers::{Env, Serialized},
 };
-use lib_sync_core::task_manager::{TaskManager, TaskStatus};
+use lib_sync_core::tasks::{TaskStatus};
 use cli::config::{Command, Config};
 use cli::{Error, Result};
 use std::fs::File;
diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml
index c6d73f7..c2bd944 100644
--- a/lib_sync_core/Cargo.toml
+++ b/lib_sync_core/Cargo.toml
@@ -18,3 +18,4 @@ tracing-core = "0.1.33"
 tabled = "0.19.0"
 futures = "0.3.31"
 thiserror = "2.0.12"
+async-stream = "0.3.6"
diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs
new file mode 100644
index 0000000..f2bc877
--- /dev/null
+++ b/lib_sync_core/src/database.rs
@@ -0,0 +1,77 @@
+use crate::tasks::{Task, TaskPayload, TaskStatus};
+use futures::{Stream};
+mod sqlite;
+
+#[derive(Default, Clone)]
+pub struct TaskPagination {
+    page_size: usize,
+    limit: Option<u32>,
+    offset: Option<u32>,
+    status: Option<TaskStatus>,
+}
+
+impl TaskPagination {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn next(&self) -> Self {
+        Self {
+            page_size: self.page_size + self.page_size,
+            ..self.clone()
+        }
+    }
+
+    pub fn prev(&self) -> Self {
+        Self {
+            page_size: self.page_size - self.page_size,
+            ..self.clone()
+        }
+    }
+
+    pub fn set_page_size(&mut self, page_size: usize) {
+        self.page_size = page_size;
+    }
+
+    pub fn set_limit(&mut self, limit: Option<u32>) {
+        self.limit = limit;
+    }
+
+    pub fn set_offset(&mut self, offset: Option<u32>) {
+        self.offset = offset;
+    }
+
+    pub fn set_status(&mut self, status: Option<TaskStatus>) {
+        self.status = status;
+    }
+}
+
+pub struct TasksPage<T> {
+    tasks: Vec<Task<T>>,
+    page: TaskPagination
+}
+
+impl<T> TasksPage<T> {
+    fn new(tasks: Vec<Task<T>>, page: TaskPagination) -> Self {
+        Self {
+            tasks,
+            page
+        }
+    }
+
+    pub fn next(&self) -> TaskPagination {
+        self.page.next()
+    }
+
+    pub fn prev(&self) -> TaskPagination {
+        self.page.prev()
+    }
+}
+
+pub trait TaskStorage<T: TaskPayload> {
+    async fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()>;
+    fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
+
+    async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>>;
+}
+
diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs
new file mode 100644
index 0000000..c7fc993
--- /dev/null
+++ b/lib_sync_core/src/database/sqlite.rs
@@ -0,0 +1,117 @@
+use sqlx::{QueryBuilder, SqlitePool};
+use std::path::PathBuf;
+use tokio::fs;
+use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
+use tracing::{info, instrument};
+use futures::{Stream, TryStreamExt};
+use crate::database::{TaskPagination, TaskStorage, TasksPage};
+use crate::tasks::{Task, TaskPayload, TaskStatus};
+
+#[allow(unused)]
+static SQLITE_BIND_LIMIT: usize = 32766;
+
+#[derive(Debug)]
+pub struct Sqlite {
+    pool: SqlitePool,
+}
+
+impl Sqlite {
+    pub async fn new<P: Into<PathBuf>>(base_path: P) -> crate::Result<Self> {
+        Ok(Self {
+            pool: Self::connect_database(base_path).await?,
+        })
+    }
+
+    async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
+        let base_path = base_path.into();
+
+        let database_file_path = base_path.join("db.sql");
+
+        fs::create_dir_all(base_path).await?;
+
+        let opts = SqliteConnectOptions::new()
+            .filename(database_file_path)
+            .create_if_missing(true)
+            .journal_mode(SqliteJournalMode::Wal);
+
+        let pool = SqlitePool::connect_with(opts).await?;
+
+        sqlx::migrate!("../migrations").run(&pool).await?;
+
+        Ok(pool)
+    }
+}
+
+impl<T: TaskPayload> TaskStorage<T> for Sqlite {
+    #[instrument(skip(self, tasks))]
+    async fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()> {
+        let mut tx = self.pool.begin().await?;
+        let mut builder: QueryBuilder<'_, sqlx::Sqlite> =
+            QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
+
+        let args: crate::Result<Vec<(String, String)>> = tasks
+            .iter()
+            .map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?)))
+            .collect();
+
+        let mut affected_rows = 0;
+        // Chunk the query by the size limit of bind params
+        for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
+            builder.push_values(chunk, |mut builder, item| {
+                builder
+                    .push_bind(&item.0)
+                    .push_bind(&item.1)
+                    .push_bind(TaskStatus::Pending);
+            });
+            builder.push("ON conflict (payload_key) DO NOTHING");
+
+            let query = builder.build();
+
+            affected_rows += query.execute(&mut *tx).await?.rows_affected();
+            builder.reset();
+        }
+
+        tx.commit().await?;
+
+        info!("{} rows inserted.", affected_rows);
+
+        Ok(())
+    }
+
+    fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>> {
+        let query = sqlx::query_as::<_, Task<T>>(
+            "
+            SELECT id, payload_key, payload, status_id, created_at, updated_at
+            FROM tasks
+            WHERE status_id = ?
+            ORDER BY created_at DESC
+            ",
+        ).bind(task_status);
+
+        query.fetch(&self.pool).err_into::<crate::Error>()
+    }
+
+    async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>> {
+        let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
+            "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
+        );
+
+        if let Some(status) = &page.status {
+            builder.push("where status_id = ").push_bind(status);
+        }
+
+        builder.push("ORDER BY created_at DESC ");
+
+        if let Some(limit) = &page.limit {
+            builder.push("LIMIT ").push_bind(limit);
+        }
+
+        if let Some(offset) = &page.offset {
+            builder.push("OFFSET ").push_bind(offset);
+        }
+
+        let tasks = builder.build_query_as::<Task<T>>().fetch_all(&self.pool).await?;
+
+        Ok(TasksPage::new(tasks, page.clone()))
+    }
+}
\ No newline at end of file
diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs
index 810ba8e..586934c 100644
--- a/lib_sync_core/src/lib.rs
+++ b/lib_sync_core/src/lib.rs
@@ -1,7 +1,8 @@
 pub mod error;
 pub(crate) use error::*;
 
-pub mod task_manager;
+pub mod tasks;
+mod database;
 
 pub fn add(left: u64, right: u64) -> u64 {
     left + right
diff --git a/lib_sync_core/src/task_manager.rs b/lib_sync_core/src/task_manager.rs
deleted file mode 100644
index 57a2556..0000000
--- a/lib_sync_core/src/task_manager.rs
+++ /dev/null
@@ -1,200 +0,0 @@
-use crate::error::Error;
-use chrono::Utc;
-use directories::ProjectDirs;
-use futures::{StreamExt, TryStreamExt};
-use serde::de::DeserializeOwned;
-use serde::Serialize;
-use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
-use sqlx::{QueryBuilder, Sqlite, SqlitePool};
-use std::fmt::Display;
-use std::path::PathBuf;
-use tabled::Tabled;
-use tokio::fs;
-use tracing::{info, instrument};
-
-static SQLITE_BIND_LIMIT: usize = 32766;
-
-#[derive(sqlx::Type, Debug)]
-#[repr(u8)]
-pub enum TaskStatus {
-    Pending = 1,
-    InProgress = 2,
-    Completed = 3,
-    Failed = 4,
-}
-
-impl Display for TaskStatus {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            TaskStatus::Pending => {
-                write!(f, "Pending")
-            }
-            TaskStatus::InProgress => {
-                write!(f, "In Progress")
-            }
-            TaskStatus::Completed => {
-                write!(f, "Completed")
-            }
-            TaskStatus::Failed => {
-                write!(f, "Failed")
-            }
-        }
-    }
-}
-
-pub trait TaskPayload {
-    fn get_key(&self) -> String;
-}
-
-pub type TaskJob<T> = fn(&Task<T>) -> TaskStatus;
-
-#[derive(sqlx::FromRow, Tabled, Debug)]
-pub struct Task<T> {
-    id: u32,
-    payload_key: String,
-    #[sqlx(json)]
-    #[tabled(skip)]
-    payload: T,
-    #[sqlx(rename = "status_id")]
-    status: TaskStatus,
-    created_at: chrono::DateTime<Utc>,
-    #[tabled(display = "display_option_date")]
-    updated_at: Option<chrono::DateTime<Utc>>,
-}
-
-impl<T> Task<T> {
-    pub fn get_key(&self) -> String {
-        self.payload_key.clone()
-    }
-}
-
-fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
-    match o {
-        Some(s) => format!("{}", s),
-        None => String::from(""),
-    }
-}
-
-pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
-impl<T: DeserializeOwned + Send + Unpin + 'static + Display> _Task for T {}
-
-#[derive(Debug)]
-pub struct TaskManager {
-    base_path: PathBuf,
-    pool: SqlitePool,
-}
-
-impl TaskManager {
-    pub async fn new<P: Into<PathBuf>>(base_path: P) -> Result<Self, Error> {
-        let base_path = base_path.into();
-        let pool = Self::connect_database(base_path.clone()).await?;
-        Ok(Self {
-            base_path,
-            pool,
-        })
-    }
-
-    async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
-        let base_path = base_path.into();
-
-        let database_file_path = base_path.join("db.sql");
-
-        fs::create_dir_all(base_path).await?;
-
-        let opts = SqliteConnectOptions::new()
-            .filename(database_file_path)
-            .create_if_missing(true)
-            .journal_mode(SqliteJournalMode::Wal);
-
-        let pool = SqlitePool::connect_with(opts).await?;
-
-        sqlx::migrate!("../migrations").run(&pool).await?;
-
-        Ok(pool)
-    }
-
-    fn get_task_builder(
-        status: Option<TaskStatus>,
-        limit: Option<u32>,
-    ) -> QueryBuilder<'static, Sqlite> {
-        let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
-            "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
-        );
-
-        if let Some(status) = status {
-            builder.push("where status_id = ").push_bind(status);
-        }
-
-        builder.push("ORDER BY created_at DESC ");
-
-        if let Some(limit) = limit {
-            builder.push("LIMIT ").push_bind(limit);
-        }
-        builder
-    }
-
-    pub async fn get_tasks<T: _Task>(
-        &self,
-        status: Option<TaskStatus>,
-        limit: Option<u32>,
-    ) -> crate::Result<Vec<Task<T>>> {
-        let mut builder = Self::get_task_builder(status, limit);
-
-        let tasks: Vec<Task<T>> = builder.build_query_as().fetch_all(&self.pool).await?;
-
-        Ok(tasks)
-    }
-
-    #[instrument(skip(self, values))]
-    pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
-    where
-        T: TaskPayload + Serialize + std::fmt::Debug,
-    {
-        let mut tx = self.pool.begin().await?;
-        let mut builder: QueryBuilder<'_, Sqlite> =
-            QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
-
-        let args: crate::Result<Vec<(String, String)>> = values
-            .iter()
-            .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
-            .collect();
-
-        let mut affected_rows = 0;
-        // Chunk the query by the size limit of bind params
-        for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
-            builder.push_values(chunk, |mut builder, item| {
-                builder
-                    .push_bind(&item.0)
-                    .push_bind(&item.1)
-                    .push_bind(TaskStatus::Pending);
-            });
-            builder.push("ON conflict (payload_key) DO NOTHING");
-
-            let query = builder.build();
-
-            affected_rows += query.execute(&mut *tx).await?.rows_affected();
-            builder.reset();
-        }
-
-        tx.commit().await?;
-
-        info!("{} rows inserted.", affected_rows);
-
-        Ok(())
-    }
-
-    pub async fn run_tasks<T: _Task>(&self, func: TaskJob<T>) -> crate::Result<()> {
-        let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
-
-        let rows = builder.build_query_as::<Task<T>>().fetch(&self.pool);
-
-        let result: Vec<(Task<T>, TaskStatus)> = rows.map(|x| {
-            let task = x.unwrap();
-            let status = func(&task);
-
-            (task, status)
-        }).collect().await;
-
-        Ok(())
-    }
-}
diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs
new file mode 100644
index 0000000..04c22b7
--- /dev/null
+++ b/lib_sync_core/src/tasks.rs
@@ -0,0 +1,85 @@
+use chrono::Utc;
+use futures::StreamExt;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Display;
+use tabled::Tabled;
+
+mod manager;
+
+#[derive(sqlx::Type, Debug, Clone)]
+#[repr(u8)]
+pub enum TaskStatus {
+    Pending = 1,
+    InProgress = 2,
+    Completed = 3,
+    Failed = 4,
+}
+
+impl Display for TaskStatus {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            TaskStatus::Pending => {
+                write!(f, "Pending")
+            }
+            TaskStatus::InProgress => {
+                write!(f, "In Progress")
+            }
+            TaskStatus::Completed => {
+                write!(f, "Completed")
+            }
+            TaskStatus::Failed => {
+                write!(f, "Failed")
+            }
+        }
+    }
+}
+
+pub trait TaskPayloadKey {
+    fn get_key(&self) -> String;
+}
+
+pub trait TaskPayload:
+    Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey
+{
+}
+impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey>
+    TaskPayload for T
+{
+}
+
+pub type TaskJob<T> = fn(&Task<T>) -> TaskStatus;
+
+#[derive(sqlx::FromRow, Tabled, Debug)]
+pub struct Task<T> {
+    id: u32,
+    payload_key: String,
+    #[sqlx(json)]
+    #[tabled(skip)]
+    payload: T,
+    #[sqlx(rename = "status_id")]
+    status: TaskStatus,
+    created_at: chrono::DateTime<Utc>,
+    #[tabled(display = "display_option_date")]
+    updated_at: Option<chrono::DateTime<Utc>>,
+}
+
+impl<T> Task<T> {
+    pub fn payload(&self) -> &T {
+        &self.payload
+    }
+}
+
+impl<T> Task<T> {
+    pub fn get_key(&self) -> String {
+        self.payload_key.clone()
+    }
+}
+
+fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
+    match o {
+        Some(s) => format!("{}", s),
+        None => String::from(""),
+    }
+}
+
diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs
new file mode 100644
index 0000000..bee3cb8
--- /dev/null
+++ b/lib_sync_core/src/tasks/manager.rs
@@ -0,0 +1,32 @@
+use futures::StreamExt;
+use std::marker::PhantomData;
+use crate::database::TaskStorage;
+use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus};
+
+struct TaskManager<P: TaskPayload, T: TaskStorage<P>>
+{
+    storage: T,
+    _marker: PhantomData<P>,
+}
+
+impl<P: TaskPayload, T: TaskStorage<P>> TaskManager<P, T> {
+    pub fn new(storage: T) -> Self {
+        Self {
+            storage,
+            _marker: PhantomData,
+        }
+    }
+
+    pub async fn run_tasks(&self, func: TaskJob<P>) -> crate::Result<()> {
+        let rows = self.storage.get_tasks(TaskStatus::Pending);
+
+        let result: Vec<(Task<P>, TaskStatus)> = rows.map(|x| {
+            let task = x.unwrap();
+            let status = func(&task);
+
+            (task, status)
+        }).collect().await;
+
+        Ok(())
+    }
+}
\ No newline at end of file
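
Usage note (not part of the diff): a minimal sketch of how the new TaskStorage / TaskManager split could be exercised from inside lib_sync_core. DemoPayload, the temp-dir path, and the pub(crate) visibility on database::sqlite, tasks::manager, and TaskManager are illustrative assumptions; in this change those items are private to their modules.

// Crate-internal sketch, e.g. a #[tokio::test] somewhere in lib_sync_core.
// Assumes `database::sqlite`, `tasks::manager`, and `TaskManager` are widened
// to pub(crate); as committed they are module-private.
use serde::{Deserialize, Serialize};
use std::fmt;

use crate::database::sqlite::Sqlite;
use crate::tasks::manager::TaskManager;
use crate::tasks::{TaskPayloadKey, TaskStatus};

// Hypothetical payload type; any Serialize + DeserializeOwned + Display +
// TaskPayloadKey type picks up the blanket TaskPayload impl from tasks.rs.
#[derive(Debug, Serialize, Deserialize)]
struct DemoPayload {
    url: String,
}

impl fmt::Display for DemoPayload {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.url)
    }
}

impl TaskPayloadKey for DemoPayload {
    // payload_key doubles as the de-duplication key for ON CONFLICT DO NOTHING.
    fn get_key(&self) -> String {
        self.url.clone()
    }
}

async fn demo() -> crate::Result<()> {
    // Open (or create) the SQLite-backed store and hand it to the manager.
    let storage = Sqlite::new("/tmp/lib_sync_core_demo").await?;
    let manager: TaskManager<DemoPayload, Sqlite> = TaskManager::new(storage);

    // Stream every Pending task through the job. Note that run_tasks in this
    // diff only computes the new statuses; persisting them is not implemented yet.
    manager.run_tasks(|_task| TaskStatus::Completed).await
}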