diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
new file mode 100644
index 0000000..a55e7a1
--- /dev/null
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -0,0 +1,5 @@
+<component name="ProjectCodeStyleConfiguration">
+  <state>
+    <option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
+  </state>
+</component>
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 5daa196..827a82a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -110,6 +110,28 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "async-stream"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "atoi"
version = "2.0.0"
@@ -941,6 +963,7 @@ dependencies = [
name = "lib_sync_core"
version = "0.1.0"
dependencies = [
+ "async-stream",
"chrono",
"clap",
"directories",
diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml
index c6d73f7..c2bd944 100644
--- a/lib_sync_core/Cargo.toml
+++ b/lib_sync_core/Cargo.toml
@@ -18,3 +18,4 @@ tracing-core = "0.1.33"
tabled = "0.19.0"
futures = "0.3.31"
thiserror = "2.0.12"
+async-stream = "0.3.6"
diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs
new file mode 100644
index 0000000..0674329
--- /dev/null
+++ b/lib_sync_core/src/database.rs
@@ -0,0 +1,197 @@
+use crate::task_manager::{Task, TaskPayload, TaskStatus};
+use futures::stream::BoxStream;
+use futures::{Stream, StreamExt, TryStreamExt};
+use serde::Serialize;
+use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
+use sqlx::{Error, QueryBuilder, Sqlite, SqlitePool};
+use std::path::PathBuf;
+use tokio::fs;
+use tracing::{info, instrument};
+
+static SQLITE_BIND_LIMIT: usize = 32766;
+
+#[derive(Default, Clone)]
+pub struct TaskPagination {
+ page_size: usize,
+ limit: Option<i64>,
+ offset: Option<i64>,
+ status: Option<TaskStatus>,
+}
+
+impl TaskPagination {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn next(&self) -> Self {
+ Self {
+ page_size: self.page_size + self.page_size,
+ ..self.clone()
+ }
+ }
+
+ pub fn prev(&self) -> Self {
+ Self {
+ page_size: self.page_size - self.page_size,
+ ..self.clone()
+ }
+ }
+
+ pub fn set_page_size(&mut self, page_size: usize) {
+ self.page_size = page_size;
+ }
+
+ pub fn set_limit(&mut self, limit: Option<i64>) {
+ self.limit = limit;
+ }
+
+ pub fn set_offset(&mut self, offset: Option<i64>) {
+ self.offset = offset;
+ }
+
+ pub fn set_status(&mut self, status: Option<TaskStatus>) {
+ self.status = status;
+ }
+}
+
+pub struct TasksPage {
+ tasks: Vec<Task<String>>,
+ page: TaskPagination
+}
+
+impl TasksPage {
+ fn new(tasks: Vec<Task<String>>, page: TaskPagination) -> Self {
+ Self {
+ tasks,
+ page
+ }
+ }
+
+ pub fn next(&self) -> TaskPagination {
+ self.page.next()
+ }
+
+ pub fn prev(&self) -> TaskPagination {
+ self.page.prev()
+ }
+}
+
+pub trait TaskStorage {
+ fn insert_tasks(&self, tasks: Vec<Task<String>>) -> crate::Result<()>;
+ fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<String>>>;
+
+ async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage>;
+}
+
+#[derive(Debug)]
+pub struct Database {
+ pool: SqlitePool,
+}
+
+impl Database {
+ pub async fn new<P: Into<PathBuf>>(base_path: P) -> crate::Result<Self> {
+ Ok(Self {
+ pool: Self::connect_database(base_path).await?,
+ })
+ }
+
+ async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
+ let base_path = base_path.into();
+
+ let database_file_path = base_path.join("db.sql");
+
+ fs::create_dir_all(base_path).await?;
+
+ let opts = SqliteConnectOptions::new()
+ .filename(database_file_path)
+ .create_if_missing(true)
+ .journal_mode(SqliteJournalMode::Wal);
+
+ let pool = SqlitePool::connect_with(opts).await?;
+
+ sqlx::migrate!("../migrations").run(&pool).await?;
+
+ Ok(pool)
+ }
+
+ #[instrument(skip(self, values))]
+ pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
+ where
+ T: TaskPayload + Serialize + std::fmt::Debug,
+ {
+ let mut tx = self.pool.begin().await?;
+ let mut builder: QueryBuilder<'_, Sqlite> =
+ QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
+
+ let args: crate::Result<Vec<(String, String)>> = values
+ .iter()
+ .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
+ .collect();
+
+ let mut affected_rows = 0;
+ // Chunk the query by the size limit of bind params
+ for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
+ builder.push_values(chunk, |mut builder, item| {
+ builder
+ .push_bind(&item.0)
+ .push_bind(&item.1)
+ .push_bind(TaskStatus::Pending);
+ });
+ builder.push("ON conflict (payload_key) DO NOTHING");
+
+ let query = builder.build();
+
+ affected_rows += query.execute(&mut *tx).await?.rows_affected();
+ builder.reset();
+ }
+
+ tx.commit().await?;
+
+ info!("{} rows inserted.", affected_rows);
+
+ Ok(())
+ }
+}
+
+impl TaskStorage for Database {
+ fn insert_tasks(&self, tasks: Vec<Task<String>>) -> crate::error::Result<()> {
+ todo!()
+ }
+
+ fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<
+ Item = crate::Result<Task<String>>> {
+ let query= sqlx::query_as::<_, Task>(
+ "
+ SELECT id, payload_key, payload, status_id, created_at, updated_at
+ FROM tasks
+ WHERE status_id = ?
+ ORDER BY created_at DESC
+ ",
+ ).bind(task_status);
+
+ query.fetch(&self.pool).err_into::<crate::Error>()
+ }
+
+ async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage> {
+ let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
+ "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
+ );
+
+ if let Some(status) = &page.status {
+ builder.push("where status_id = ").push_bind(status);
+ }
+
+ builder.push("ORDER BY created_at DESC ");
+
+ if let Some(limit) = &page.offset {
+ builder.push("OFFSET ").push_bind(limit);
+ }
+
+ if let Some(limit) = &page.limit {
+ builder.push("LIMIT ").push_bind(limit);
+ }
+
+ let tasks = builder.build_query_as::<Task<String>>().fetch_all(&self.pool).await?;
+
+ Ok(TasksPage::new(tasks, page.clone()))
+ }
+}
diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs
index 810ba8e..0feb7fb 100644
--- a/lib_sync_core/src/lib.rs
+++ b/lib_sync_core/src/lib.rs
@@ -2,6 +2,7 @@ pub mod error;
pub(crate) use error::*;
pub mod task_manager;
+mod database;
pub fn add(left: u64, right: u64) -> u64 {
left + right
diff --git a/lib_sync_core/src/task_manager.rs b/lib_sync_core/src/task_manager.rs
index 57a2556..8d7cdd4 100644
--- a/lib_sync_core/src/task_manager.rs
+++ b/lib_sync_core/src/task_manager.rs
@@ -8,13 +8,12 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
use std::fmt::Display;
use std::path::PathBuf;
+use futures::stream::BoxStream;
use tabled::Tabled;
use tokio::fs;
use tracing::{info, instrument};
-static SQLITE_BIND_LIMIT: usize = 32766;
-
-#[derive(sqlx::Type, Debug)]
+#[derive(sqlx::Type, Debug, Clone)]
#[repr(u8)]
pub enum TaskStatus {
Pending = 1,
@@ -42,11 +41,14 @@ impl Display for TaskStatus {
}
}
-pub trait TaskPayload {
+pub trait TaskPayloadKey {
fn get_key(&self) -> String;
}
-pub type TaskJob = fn(&Task) -> TaskStatus;
+pub trait TaskPayload: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey {}
+impl<T: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey> TaskPayload for T {}
+
+pub type TaskJob = fn(&Task<String>) -> TaskStatus;
#[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task<T> {
@@ -75,126 +77,22 @@ fn display_option_date(o: &Option<DateTime<Utc>>) -> String {
}
}
-pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
-impl<T: DeserializeOwned + Send + Unpin + 'static + Display> _Task for T {}
-#[derive(Debug)]
-pub struct TaskManager {
- base_path: PathBuf,
- pool: SqlitePool,
-}
+struct TaskManager{}
impl TaskManager {
- pub async fn new<P: Into<PathBuf>>(base_path: P) -> Result<Self> {
- let base_path = base_path.into();
- let pool = Self::connect_database(base_path.clone()).await?;
- Ok(Self {
- base_path,
- pool,
- })
- }
-
- async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
- let base_path = base_path.into();
-
- let database_file_path = base_path.join("db.sql");
-
- fs::create_dir_all(base_path).await?;
-
- let opts = SqliteConnectOptions::new()
- .filename(database_file_path)
- .create_if_missing(true)
- .journal_mode(SqliteJournalMode::Wal);
-
- let pool = SqlitePool::connect_with(opts).await?;
-
- sqlx::migrate!("../migrations").run(&pool).await?;
-
- Ok(pool)
- }
-
- fn get_task_builder(
- status: Option<TaskStatus>,
- limit: Option<i64>,
- ) -> QueryBuilder<'static, Sqlite> {
- let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
- "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
- );
-
- if let Some(status) = status {
- builder.push("where status_id = ").push_bind(status);
- }
-
- builder.push("ORDER BY created_at DESC ");
-
- if let Some(limit) = limit {
- builder.push("LIMIT ").push_bind(limit);
- }
- builder
- }
-
- pub async fn get_tasks(
- &self,
- status: Option<TaskStatus>,
- limit: Option<i64>,
- ) -> crate::Result<Vec<Task<String>>> {
- let mut builder = Self::get_task_builder(status, limit);
-
- let tasks: Vec<Task<String>> = builder.build_query_as().fetch_all(&self.pool).await?;
-
- Ok(tasks)
- }
-
- #[instrument(skip(self, values))]
- pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
- where
- T: TaskPayload + Serialize + std::fmt::Debug,
- {
- let mut tx = self.pool.begin().await?;
- let mut builder: QueryBuilder<'_, Sqlite> =
- QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
-
- let args: crate::Result<Vec<(String, String)>> = values
- .iter()
- .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
- .collect();
-
- let mut affected_rows = 0;
- // Chunk the query by the size limit of bind params
- for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
- builder.push_values(chunk, |mut builder, item| {
- builder
- .push_bind(&item.0)
- .push_bind(&item.1)
- .push_bind(TaskStatus::Pending);
- });
- builder.push("ON conflict (payload_key) DO NOTHING");
-
- let query = builder.build();
-
- affected_rows += query.execute(&mut *tx).await?.rows_affected();
- builder.reset();
- }
-
- tx.commit().await?;
-
- info!("{} rows inserted.", affected_rows);
-
- Ok(())
- }
-
- pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> {
- let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
-
- let rows = builder.build_query_as::<Task<String>>().fetch(&self.pool);
-
- let result: Vec<(Task<String>, TaskStatus)> = rows.map(|x| {
- let task = x.unwrap();
- let status = func(&task);
-
- (task, status)
- }).collect().await;
-
- Ok(())
- }
+ // pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> {
+ // let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
+ //
+ // let rows = builder.build_query_as::<Task<String>>().fetch(&self.pool);
+ //
+ // let result: Vec<(Task<String>, TaskStatus)> = rows.map(|x| {
+ // let task = x.unwrap();
+ // let status = func(&task);
+ //
+ // (task, status)
+ // }).collect().await;
+ //
+ // Ok(())
+ // }
}