From 4199a97a195b5da1a976cfc5365eac806ea10de6 Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 15 May 2025 16:53:11 -0400 Subject: [PATCH] refactor(lib_core): introduce database module refs: #5 --- .idea/codeStyles/codeStyleConfig.xml | 5 + Cargo.lock | 23 ++++ lib_sync_core/Cargo.toml | 1 + lib_sync_core/src/database.rs | 197 +++++++++++++++++++++++++++ lib_sync_core/src/lib.rs | 1 + lib_sync_core/src/task_manager.rs | 146 +++----------------- 6 files changed, 249 insertions(+), 124 deletions(-) create mode 100644 .idea/codeStyles/codeStyleConfig.xml create mode 100644 lib_sync_core/src/database.rs diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000..a55e7a1 --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5daa196..827a82a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,28 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atoi" version = "2.0.0" @@ -941,6 +963,7 @@ dependencies = [ name = "lib_sync_core" version = "0.1.0" dependencies = [ + "async-stream", "chrono", "clap", "directories", diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml index c6d73f7..c2bd944 100644 --- a/lib_sync_core/Cargo.toml +++ b/lib_sync_core/Cargo.toml @@ -18,3 +18,4 @@ tracing-core = "0.1.33" tabled = "0.19.0" futures = "0.3.31" thiserror = "2.0.12" +async-stream = "0.3.6" diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs new file mode 100644 index 0000000..0674329 --- /dev/null +++ b/lib_sync_core/src/database.rs @@ -0,0 +1,197 @@ +use crate::task_manager::{Task, TaskPayload, TaskStatus}; +use futures::stream::BoxStream; +use futures::{Stream, StreamExt, TryStreamExt}; +use serde::Serialize; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; +use sqlx::{Error, QueryBuilder, Sqlite, SqlitePool}; +use std::path::PathBuf; +use tokio::fs; +use tracing::{info, instrument}; + +static SQLITE_BIND_LIMIT: usize = 32766; + +#[derive(Default, Clone)] +pub struct TaskPagination { + page_size: usize, + limit: Option, + offset: Option, + status: Option, +} + +impl TaskPagination { + pub fn new() -> Self { + Self::default() + } + + pub fn next(&self) -> Self { + Self { + page_size: self.page_size + self.page_size, + ..self.clone() + } + } + + pub fn prev(&self) -> Self { + Self { + page_size: self.page_size - self.page_size, + ..self.clone() + } + } + + pub fn set_page_size(&mut self, page_size: usize) { + self.page_size = page_size; + } + + pub fn set_limit(&mut self, limit: Option) { + self.limit = limit; + } + + pub fn set_offset(&mut self, offset: Option) { + self.offset = offset; + } + + pub fn set_status(&mut self, status: Option) { + self.status = status; + } +} + +pub struct TasksPage { + tasks: Vec>, + page: TaskPagination +} + +impl TasksPage { + fn new(tasks: Vec>, page: TaskPagination) -> Self { + Self { + tasks, + page + } + } + + pub fn next(&self) -> TaskPagination { + self.page.next() + } + + pub fn prev(&self) -> TaskPagination { + self.page.prev() + } +} + +pub trait TaskStorage { + fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()>; + fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; + + async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result>; +} + +#[derive(Debug)] +pub struct Database { + pool: SqlitePool, +} + +impl Database { + pub async fn new>(base_path: P) -> crate::Result { + Ok(Self { + pool: Self::connect_database(base_path).await?, + }) + } + + async fn connect_database>(base_path: P) -> crate::Result { + let base_path = base_path.into(); + + let database_file_path = base_path.join("db.sql"); + + fs::create_dir_all(base_path).await?; + + let opts = SqliteConnectOptions::new() + .filename(database_file_path) + .create_if_missing(true) + .journal_mode(SqliteJournalMode::Wal); + + let pool = SqlitePool::connect_with(opts).await?; + + sqlx::migrate!("../migrations").run(&pool).await?; + + Ok(pool) + } + + #[instrument(skip(self, values))] + pub async fn load_tasks(&self, values: Vec) -> crate::Result<()> + where + T: TaskPayload + Serialize + std::fmt::Debug, + { + let mut tx = self.pool.begin().await?; + let mut builder: QueryBuilder<'_, Sqlite> = + QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); + + let args: crate::Result> = values + .iter() + .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) + .collect(); + + let mut affected_rows = 0; + // Chunk the query by the size limit of bind params + for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { + builder.push_values(chunk, |mut builder, item| { + builder + .push_bind(&item.0) + .push_bind(&item.1) + .push_bind(TaskStatus::Pending); + }); + builder.push("ON conflict (payload_key) DO NOTHING"); + + let query = builder.build(); + + affected_rows += query.execute(&mut *tx).await?.rows_affected(); + builder.reset(); + } + + tx.commit().await?; + + info!("{} rows inserted.", affected_rows); + + Ok(()) + } +} + +impl TaskStorage for Database { + fn insert_tasks(&self, tasks: Vec>) -> crate::error::Result<()> { + todo!() + } + + fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>> { + let query= sqlx::query_as::<_, Task>( + " + SELECT id, payload_key, payload, status_id, created_at, updated_at + FROM tasks + WHERE status_id = ? + ORDER BY created_at DESC + ", + ).bind(task_status); + + query.fetch(&self.pool).err_into::() + } + + async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result> { + let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new( + "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", + ); + + if let Some(status) = &page.status { + builder.push("where status_id = ").push_bind(status); + } + + builder.push("ORDER BY created_at DESC "); + + if let Some(limit) = &page.offset { + builder.push("OFFSET ").push_bind(limit); + } + + if let Some(limit) = &page.limit { + builder.push("LIMIT ").push_bind(limit); + } + + let tasks = builder.build_query_as::>().fetch_all(&self.pool).await?; + + Ok(TasksPage::new(tasks, page.clone())) + } +} diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs index 810ba8e..0feb7fb 100644 --- a/lib_sync_core/src/lib.rs +++ b/lib_sync_core/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub(crate) use error::*; pub mod task_manager; +mod database; pub fn add(left: u64, right: u64) -> u64 { left + right diff --git a/lib_sync_core/src/task_manager.rs b/lib_sync_core/src/task_manager.rs index 57a2556..8d7cdd4 100644 --- a/lib_sync_core/src/task_manager.rs +++ b/lib_sync_core/src/task_manager.rs @@ -8,13 +8,12 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::{QueryBuilder, Sqlite, SqlitePool}; use std::fmt::Display; use std::path::PathBuf; +use futures::stream::BoxStream; use tabled::Tabled; use tokio::fs; use tracing::{info, instrument}; -static SQLITE_BIND_LIMIT: usize = 32766; - -#[derive(sqlx::Type, Debug)] +#[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] pub enum TaskStatus { Pending = 1, @@ -42,11 +41,14 @@ impl Display for TaskStatus { } } -pub trait TaskPayload { +pub trait TaskPayloadKey { fn get_key(&self) -> String; } -pub type TaskJob = fn(&Task) -> TaskStatus; +pub trait TaskPayload: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey {} +impl TaskPayload for T {} + +pub type TaskJob = fn(&Task) -> TaskStatus; #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { @@ -75,126 +77,22 @@ fn display_option_date(o: &Option>) -> String { } } -pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {} -impl _Task for T {} -#[derive(Debug)] -pub struct TaskManager { - base_path: PathBuf, - pool: SqlitePool, -} +struct TaskManager{} impl TaskManager { - pub async fn new>(base_path: P) -> Result { - let base_path = base_path.into(); - let pool = Self::connect_database(base_path.clone()).await?; - Ok(Self { - base_path, - pool, - }) - } - - async fn connect_database>(base_path: P) -> crate::Result { - let base_path = base_path.into(); - - let database_file_path = base_path.join("db.sql"); - - fs::create_dir_all(base_path).await?; - - let opts = SqliteConnectOptions::new() - .filename(database_file_path) - .create_if_missing(true) - .journal_mode(SqliteJournalMode::Wal); - - let pool = SqlitePool::connect_with(opts).await?; - - sqlx::migrate!("../migrations").run(&pool).await?; - - Ok(pool) - } - - fn get_task_builder( - status: Option, - limit: Option, - ) -> QueryBuilder<'static, Sqlite> { - let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new( - "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", - ); - - if let Some(status) = status { - builder.push("where status_id = ").push_bind(status); - } - - builder.push("ORDER BY created_at DESC "); - - if let Some(limit) = limit { - builder.push("LIMIT ").push_bind(limit); - } - builder - } - - pub async fn get_tasks( - &self, - status: Option, - limit: Option, - ) -> crate::Result>> { - let mut builder = Self::get_task_builder(status, limit); - - let tasks: Vec> = builder.build_query_as().fetch_all(&self.pool).await?; - - Ok(tasks) - } - - #[instrument(skip(self, values))] - pub async fn load_tasks(&self, values: Vec) -> crate::Result<()> - where - T: TaskPayload + Serialize + std::fmt::Debug, - { - let mut tx = self.pool.begin().await?; - let mut builder: QueryBuilder<'_, Sqlite> = - QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); - - let args: crate::Result> = values - .iter() - .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) - .collect(); - - let mut affected_rows = 0; - // Chunk the query by the size limit of bind params - for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { - builder.push_values(chunk, |mut builder, item| { - builder - .push_bind(&item.0) - .push_bind(&item.1) - .push_bind(TaskStatus::Pending); - }); - builder.push("ON conflict (payload_key) DO NOTHING"); - - let query = builder.build(); - - affected_rows += query.execute(&mut *tx).await?.rows_affected(); - builder.reset(); - } - - tx.commit().await?; - - info!("{} rows inserted.", affected_rows); - - Ok(()) - } - - pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { - let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None); - - let rows = builder.build_query_as::>().fetch(&self.pool); - - let result: Vec<(Task, TaskStatus)> = rows.map(|x| { - let task = x.unwrap(); - let status = func(&task); - - (task, status) - }).collect().await; - - Ok(()) - } + // pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + // let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None); + // + // let rows = builder.build_query_as::>().fetch(&self.pool); + // + // let result: Vec<(Task, TaskStatus)> = rows.map(|x| { + // let task = x.unwrap(); + // let status = func(&task); + // + // (task, status) + // }).collect().await; + // + // Ok(()) + // } }