diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
new file mode 100644
index 0000000..a55e7a1
--- /dev/null
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 5daa196..827a82a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -110,6 +110,28 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "async-stream"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "atoi"
version = "2.0.0"
@@ -941,6 +963,7 @@ dependencies = [
name = "lib_sync_core"
version = "0.1.0"
dependencies = [
+ "async-stream",
"chrono",
"clap",
"directories",
diff --git a/cli/bin/readwise/external_interface.rs b/cli/bin/readwise/external_interface.rs
index 13b2f65..a484ca9 100644
--- a/cli/bin/readwise/external_interface.rs
+++ b/cli/bin/readwise/external_interface.rs
@@ -1,4 +1,4 @@
-use lib_sync_core::task_manager::TaskPayload;
+use lib_sync_core::tasks::TaskPayload;
use chrono::{DateTime, Local};
use serde::{de, Deserialize, Deserializer, Serialize};
use serde_json::Value;
diff --git a/cli/bin/readwise/main.rs b/cli/bin/readwise/main.rs
index bdfc584..29eee78 100644
--- a/cli/bin/readwise/main.rs
+++ b/cli/bin/readwise/main.rs
@@ -4,7 +4,7 @@ use figment::{
Figment,
providers::{Env, Serialized},
};
-use lib_sync_core::task_manager::{TaskManager, TaskStatus};
+use lib_sync_core::tasks::{TaskStatus};
use cli::config::{Command, Config};
use cli::{Error, Result};
use std::fs::File;
diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml
index c6d73f7..c2bd944 100644
--- a/lib_sync_core/Cargo.toml
+++ b/lib_sync_core/Cargo.toml
@@ -18,3 +18,4 @@ tracing-core = "0.1.33"
tabled = "0.19.0"
futures = "0.3.31"
thiserror = "2.0.12"
+async-stream = "0.3.6"
diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs
new file mode 100644
index 0000000..f2bc877
--- /dev/null
+++ b/lib_sync_core/src/database.rs
@@ -0,0 +1,77 @@
+use crate::tasks::{Task, TaskPayload, TaskStatus};
+use futures::{Stream};
+mod sqlite;
+
+#[derive(Default, Clone)]
+pub struct TaskPagination {
+ page_size: usize,
+ limit: Option,
+ offset: Option,
+ status: Option,
+}
+
+impl TaskPagination {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn next(&self) -> Self {
+ Self {
+ page_size: self.page_size + self.page_size,
+ ..self.clone()
+ }
+ }
+
+ pub fn prev(&self) -> Self {
+ Self {
+ page_size: self.page_size - self.page_size,
+ ..self.clone()
+ }
+ }
+
+ pub fn set_page_size(&mut self, page_size: usize) {
+ self.page_size = page_size;
+ }
+
+ pub fn set_limit(&mut self, limit: Option) {
+ self.limit = limit;
+ }
+
+ pub fn set_offset(&mut self, offset: Option) {
+ self.offset = offset;
+ }
+
+ pub fn set_status(&mut self, status: Option) {
+ self.status = status;
+ }
+}
+
+pub struct TasksPage {
+ tasks: Vec>,
+ page: TaskPagination
+}
+
+impl TasksPage {
+ fn new(tasks: Vec>, page: TaskPagination) -> Self {
+ Self {
+ tasks,
+ page
+ }
+ }
+
+ pub fn next(&self) -> TaskPagination {
+ self.page.next()
+ }
+
+ pub fn prev(&self) -> TaskPagination {
+ self.page.prev()
+ }
+}
+
+pub trait TaskStorage {
+ async fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()>;
+ fn get_tasks(&self, options: TaskStatus) -> impl Stream- >>;
+
+ async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result>;
+}
+
diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs
new file mode 100644
index 0000000..c7fc993
--- /dev/null
+++ b/lib_sync_core/src/database/sqlite.rs
@@ -0,0 +1,117 @@
+use sqlx::{QueryBuilder, SqlitePool};
+use std::path::PathBuf;
+use tokio::fs;
+use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
+use tracing::{info, instrument};
+use futures::{Stream, TryStreamExt};
+use crate::database::{TaskPagination, TaskStorage, TasksPage};
+use crate::tasks::{Task, TaskPayload, TaskStatus};
+
+#[allow(unused)]
+static SQLITE_BIND_LIMIT: usize = 32766;
+
+#[derive(Debug)]
+pub struct Sqlite {
+ pool: SqlitePool,
+}
+
+impl Sqlite {
+ pub async fn new>(base_path: P) -> crate::Result {
+ Ok(Self {
+ pool: Self::connect_database(base_path).await?,
+ })
+ }
+
+ async fn connect_database>(base_path: P) -> crate::Result {
+ let base_path = base_path.into();
+
+ let database_file_path = base_path.join("db.sql");
+
+ fs::create_dir_all(base_path).await?;
+
+ let opts = SqliteConnectOptions::new()
+ .filename(database_file_path)
+ .create_if_missing(true)
+ .journal_mode(SqliteJournalMode::Wal);
+
+ let pool = SqlitePool::connect_with(opts).await?;
+
+ sqlx::migrate!("../migrations").run(&pool).await?;
+
+ Ok(pool)
+ }
+}
+
+impl TaskStorage for Sqlite {
+ #[instrument(skip(self, tasks))]
+ async fn insert_tasks(&self, tasks: Vec>) -> crate::Result<()> {
+ let mut tx = self.pool.begin().await?;
+ let mut builder: QueryBuilder<'_, sqlx::Sqlite> =
+ QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
+
+ let args: crate::Result> = tasks
+ .iter()
+ .map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?)))
+ .collect();
+
+ let mut affected_rows = 0;
+ // Chunk the query by the size limit of bind params
+ for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
+ builder.push_values(chunk, |mut builder, item| {
+ builder
+ .push_bind(&item.0)
+ .push_bind(&item.1)
+ .push_bind(TaskStatus::Pending);
+ });
+ builder.push("ON conflict (payload_key) DO NOTHING");
+
+ let query = builder.build();
+
+ affected_rows += query.execute(&mut *tx).await?.rows_affected();
+ builder.reset();
+ }
+
+ tx.commit().await?;
+
+ info!("{} rows inserted.", affected_rows);
+
+ Ok(())
+ }
+
+ fn get_tasks(&self, task_status: TaskStatus) -> impl Stream
- >> {
+ let query= sqlx::query_as::<_, Task>(
+ "
+ SELECT id, payload_key, payload, status_id, created_at, updated_at
+ FROM tasks
+ WHERE status_id = ?
+ ORDER BY created_at DESC
+ ",
+ ).bind(task_status);
+
+ query.fetch(&self.pool).err_into::()
+ }
+
+ async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result> {
+ let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
+ "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
+ );
+
+ if let Some(status) = &page.status {
+ builder.push("where status_id = ").push_bind(status);
+ }
+
+ builder.push("ORDER BY created_at DESC ");
+
+ if let Some(limit) = &page.offset {
+ builder.push("OFFSET ").push_bind(limit);
+ }
+
+ if let Some(limit) = &page.limit {
+ builder.push("LIMIT ").push_bind(limit);
+ }
+
+ let tasks = builder.build_query_as::>().fetch_all(&self.pool).await?;
+
+ Ok(TasksPage::new(tasks, page.clone()))
+ }
+}
\ No newline at end of file
diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs
index 810ba8e..586934c 100644
--- a/lib_sync_core/src/lib.rs
+++ b/lib_sync_core/src/lib.rs
@@ -1,7 +1,8 @@
pub mod error;
pub(crate) use error::*;
-pub mod task_manager;
+pub mod tasks;
+mod database;
pub fn add(left: u64, right: u64) -> u64 {
left + right
diff --git a/lib_sync_core/src/task_manager.rs b/lib_sync_core/src/task_manager.rs
deleted file mode 100644
index 57a2556..0000000
--- a/lib_sync_core/src/task_manager.rs
+++ /dev/null
@@ -1,200 +0,0 @@
-use crate::error::Error;
-use chrono::Utc;
-use directories::ProjectDirs;
-use futures::{StreamExt, TryStreamExt};
-use serde::de::DeserializeOwned;
-use serde::Serialize;
-use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
-use sqlx::{QueryBuilder, Sqlite, SqlitePool};
-use std::fmt::Display;
-use std::path::PathBuf;
-use tabled::Tabled;
-use tokio::fs;
-use tracing::{info, instrument};
-
-static SQLITE_BIND_LIMIT: usize = 32766;
-
-#[derive(sqlx::Type, Debug)]
-#[repr(u8)]
-pub enum TaskStatus {
- Pending = 1,
- InProgress = 2,
- Completed = 3,
- Failed = 4,
-}
-
-impl Display for TaskStatus {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- TaskStatus::Pending => {
- write!(f, "Pending")
- }
- TaskStatus::InProgress => {
- write!(f, "In Progress")
- }
- TaskStatus::Completed => {
- write!(f, "Completed")
- }
- TaskStatus::Failed => {
- write!(f, "Failed")
- }
- }
- }
-}
-
-pub trait TaskPayload {
- fn get_key(&self) -> String;
-}
-
-pub type TaskJob = fn(&Task) -> TaskStatus;
-
-#[derive(sqlx::FromRow, Tabled, Debug)]
-pub struct Task {
- id: u32,
- payload_key: String,
- #[sqlx(json)]
- #[tabled(skip)]
- payload: T,
- #[sqlx(rename = "status_id")]
- status: TaskStatus,
- created_at: chrono::DateTime,
- #[tabled(display = "display_option_date")]
- updated_at: Option>,
-}
-
-impl Task {
- pub fn get_key(&self) -> String {
- self.payload_key.clone()
- }
-}
-
-fn display_option_date(o: &Option>) -> String {
- match o {
- Some(s) => format!("{}", s),
- None => String::from(""),
- }
-}
-
-pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
-impl _Task for T {}
-
-#[derive(Debug)]
-pub struct TaskManager {
- base_path: PathBuf,
- pool: SqlitePool,
-}
-
-impl TaskManager {
- pub async fn new>(base_path: P) -> Result {
- let base_path = base_path.into();
- let pool = Self::connect_database(base_path.clone()).await?;
- Ok(Self {
- base_path,
- pool,
- })
- }
-
- async fn connect_database>(base_path: P) -> crate::Result {
- let base_path = base_path.into();
-
- let database_file_path = base_path.join("db.sql");
-
- fs::create_dir_all(base_path).await?;
-
- let opts = SqliteConnectOptions::new()
- .filename(database_file_path)
- .create_if_missing(true)
- .journal_mode(SqliteJournalMode::Wal);
-
- let pool = SqlitePool::connect_with(opts).await?;
-
- sqlx::migrate!("../migrations").run(&pool).await?;
-
- Ok(pool)
- }
-
- fn get_task_builder(
- status: Option,
- limit: Option,
- ) -> QueryBuilder<'static, Sqlite> {
- let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
- "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
- );
-
- if let Some(status) = status {
- builder.push("where status_id = ").push_bind(status);
- }
-
- builder.push("ORDER BY created_at DESC ");
-
- if let Some(limit) = limit {
- builder.push("LIMIT ").push_bind(limit);
- }
- builder
- }
-
- pub async fn get_tasks(
- &self,
- status: Option,
- limit: Option,
- ) -> crate::Result>> {
- let mut builder = Self::get_task_builder(status, limit);
-
- let tasks: Vec> = builder.build_query_as().fetch_all(&self.pool).await?;
-
- Ok(tasks)
- }
-
- #[instrument(skip(self, values))]
- pub async fn load_tasks(&self, values: Vec) -> crate::Result<()>
- where
- T: TaskPayload + Serialize + std::fmt::Debug,
- {
- let mut tx = self.pool.begin().await?;
- let mut builder: QueryBuilder<'_, Sqlite> =
- QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
-
- let args: crate::Result> = values
- .iter()
- .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
- .collect();
-
- let mut affected_rows = 0;
- // Chunk the query by the size limit of bind params
- for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
- builder.push_values(chunk, |mut builder, item| {
- builder
- .push_bind(&item.0)
- .push_bind(&item.1)
- .push_bind(TaskStatus::Pending);
- });
- builder.push("ON conflict (payload_key) DO NOTHING");
-
- let query = builder.build();
-
- affected_rows += query.execute(&mut *tx).await?.rows_affected();
- builder.reset();
- }
-
- tx.commit().await?;
-
- info!("{} rows inserted.", affected_rows);
-
- Ok(())
- }
-
- pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> {
- let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
-
- let rows = builder.build_query_as::>().fetch(&self.pool);
-
- let result: Vec<(Task, TaskStatus)> = rows.map(|x| {
- let task = x.unwrap();
- let status = func(&task);
-
- (task, status)
- }).collect().await;
-
- Ok(())
- }
-}
diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs
new file mode 100644
index 0000000..04c22b7
--- /dev/null
+++ b/lib_sync_core/src/tasks.rs
@@ -0,0 +1,85 @@
+use chrono::Utc;
+use futures::StreamExt;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Display;
+use tabled::Tabled;
+
+mod manager;
+
+#[derive(sqlx::Type, Debug, Clone)]
+#[repr(u8)]
+pub enum TaskStatus {
+ Pending = 1,
+ InProgress = 2,
+ Completed = 3,
+ Failed = 4,
+}
+
+impl Display for TaskStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ TaskStatus::Pending => {
+ write!(f, "Pending")
+ }
+ TaskStatus::InProgress => {
+ write!(f, "In Progress")
+ }
+ TaskStatus::Completed => {
+ write!(f, "Completed")
+ }
+ TaskStatus::Failed => {
+ write!(f, "Failed")
+ }
+ }
+ }
+}
+
+pub trait TaskPayloadKey {
+ fn get_key(&self) -> String;
+}
+
+pub trait TaskPayload:
+ Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey
+{
+}
+impl
+ TaskPayload for T
+{
+}
+
+pub type TaskJob = fn(&Task) -> TaskStatus;
+
+#[derive(sqlx::FromRow, Tabled, Debug)]
+pub struct Task {
+ id: u32,
+ payload_key: String,
+ #[sqlx(json)]
+ #[tabled(skip)]
+ payload: T,
+ #[sqlx(rename = "status_id")]
+ status: TaskStatus,
+ created_at: chrono::DateTime,
+ #[tabled(display = "display_option_date")]
+ updated_at: Option>,
+}
+
+impl Task {
+ pub fn payload(&self) -> &T {
+ &self.payload
+ }
+}
+
+impl Task {
+ pub fn get_key(&self) -> String {
+ self.payload_key.clone()
+ }
+}
+
+fn display_option_date(o: &Option>) -> String {
+ match o {
+ Some(s) => format!("{}", s),
+ None => String::from(""),
+ }
+}
+
diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs
new file mode 100644
index 0000000..bee3cb8
--- /dev/null
+++ b/lib_sync_core/src/tasks/manager.rs
@@ -0,0 +1,32 @@
+use futures::StreamExt;
+use std::marker::PhantomData;
+use crate::database::TaskStorage;
+use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus};
+
+struct TaskManager>
+{
+ storage: T,
+ _marker: PhantomData
,
+}
+
+impl> TaskManager {
+ pub fn new(storage: T) -> Self {
+ Self {
+ storage,
+ _marker: PhantomData,
+ }
+ }
+
+ pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> {
+ let rows = self.storage.get_tasks(TaskStatus::Pending);
+
+ let result: Vec<(Task, TaskStatus)> = rows.map(|x| {
+ let task = x.unwrap();
+ let status = func(&task);
+
+ (task, status)
+ }).collect().await;
+
+ Ok(())
+ }
+}
\ No newline at end of file