Compare commits

...

1 commit

Author SHA1 Message Date
2126a0f83d wip(lib_core): introduce database module
refs: #5
2025-05-16 19:07:13 -04:00
6 changed files with 249 additions and 124 deletions

5
.idea/codeStyles/codeStyleConfig.xml generated Normal file
View file

@ -0,0 +1,5 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

23
Cargo.lock generated
View file

@ -110,6 +110,28 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "atoi" name = "atoi"
version = "2.0.0" version = "2.0.0"
@ -941,6 +963,7 @@ dependencies = [
name = "lib_sync_core" name = "lib_sync_core"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-stream",
"chrono", "chrono",
"clap", "clap",
"directories", "directories",

View file

@ -18,3 +18,4 @@ tracing-core = "0.1.33"
tabled = "0.19.0" tabled = "0.19.0"
futures = "0.3.31" futures = "0.3.31"
thiserror = "2.0.12" thiserror = "2.0.12"
async-stream = "0.3.6"

View file

@ -0,0 +1,197 @@
use crate::task_manager::{Task, TaskPayload, TaskStatus};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt};
use serde::Serialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{Error, QueryBuilder, Sqlite, SqlitePool};
use std::path::PathBuf;
use tokio::fs;
use tracing::{info, instrument};
/// Maximum number of bind parameters SQLite accepts in one statement
/// (default `SQLITE_MAX_VARIABLE_NUMBER` since SQLite 3.32). Batch inserts
/// are chunked so every statement stays under this limit.
/// `const` rather than `static`: the value is an immutable primitive, so a
/// compile-time constant is the idiomatic item kind.
const SQLITE_BIND_LIMIT: usize = 32766;
/// Cursor-style pagination settings for task queries.
///
/// `page_size` is the step used when moving between pages; `limit`/`offset`
/// are applied verbatim to the generated SQL; `status` optionally filters by
/// task status.
#[derive(Default, Clone)]
pub struct TaskPagination {
    page_size: usize,
    limit: Option<u32>,
    offset: Option<u32>,
    status: Option<TaskStatus>,
}

impl TaskPagination {
    /// Creates an empty pagination (no limit, no offset, no filter).
    pub fn new() -> Self {
        Self::default()
    }

    /// Pagination for the following page: the offset advances by one
    /// `page_size`, saturating at `u32::MAX`.
    ///
    /// Bug fix: this previously returned `page_size + page_size`, which
    /// doubled the page *length* instead of moving the cursor forward.
    pub fn next(&self) -> Self {
        let step = u32::try_from(self.page_size).unwrap_or(u32::MAX);
        Self {
            offset: Some(self.offset.unwrap_or(0).saturating_add(step)),
            ..self.clone()
        }
    }

    /// Pagination for the preceding page: the offset moves back by one
    /// `page_size`, saturating at 0.
    ///
    /// Bug fix: this previously returned `page_size - page_size`, i.e. a
    /// page size of 0 — every "previous" page was empty.
    pub fn prev(&self) -> Self {
        let step = u32::try_from(self.page_size).unwrap_or(u32::MAX);
        Self {
            offset: Some(self.offset.unwrap_or(0).saturating_sub(step)),
            ..self.clone()
        }
    }

    /// Sets the step used by [`Self::next`] / [`Self::prev`].
    pub fn set_page_size(&mut self, page_size: usize) {
        self.page_size = page_size;
    }

    /// Sets the SQL `LIMIT`; `None` leaves the result unbounded.
    pub fn set_limit(&mut self, limit: Option<u32>) {
        self.limit = limit;
    }

    /// Sets the SQL `OFFSET`; `None` starts at the first row.
    pub fn set_offset(&mut self, offset: Option<u32>) {
        self.offset = offset;
    }

    /// Sets the status filter; `None` matches every status.
    pub fn set_status(&mut self, status: Option<TaskStatus>) {
        self.status = status;
    }
}
/// One page of task query results together with the pagination that
/// produced it.
pub struct TasksPage<T: TaskPayload> {
    tasks: Vec<Task<T>>,
    page: TaskPagination,
}

impl<T: TaskPayload> TasksPage<T> {
    /// Bundles fetched tasks with their originating pagination state.
    fn new(tasks: Vec<Task<T>>, page: TaskPagination) -> Self {
        Self { tasks, page }
    }

    /// The tasks contained in this page.
    ///
    /// Added because the field is private and no accessor existed, so
    /// callers had no way to read the fetched rows at all.
    pub fn tasks(&self) -> &[Task<T>] {
        &self.tasks
    }

    /// Consumes the page, yielding its tasks by value.
    pub fn into_tasks(self) -> Vec<Task<T>> {
        self.tasks
    }

    /// Pagination describing the page after this one.
    pub fn next(&self) -> TaskPagination {
        self.page.next()
    }

    /// Pagination describing the page before this one.
    pub fn prev(&self) -> TaskPagination {
        self.page.prev()
    }
}
/// Persistence operations for [`Task`] records.
pub trait TaskStorage<T: TaskPayload> {
    /// Persists the given tasks.
    fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()>;

    /// Streams all tasks matching `status`, newest first.
    ///
    /// Parameter renamed from `options` — it is a single status filter, and
    /// the implementation already calls it `task_status`.
    fn get_tasks(&self, status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;

    /// Fetches one page of tasks according to `page`'s filter/limit/offset.
    async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>>;
}
/// SQLite-backed task store; owns the connection pool for the lifetime of
/// the value.
#[derive(Debug)]
pub struct Database {
    // Pool created by `connect_database` (WAL mode, migrations applied).
    pool: SqlitePool,
}
impl Database {
    /// Opens (creating if necessary) the SQLite database under `base_path`
    /// and runs any pending migrations.
    pub async fn new<P: Into<PathBuf>>(base_path: P) -> crate::Result<Self> {
        Ok(Self {
            pool: Self::connect_database(base_path).await?,
        })
    }

    /// Builds the connection pool: ensures `base_path` exists, opens
    /// `<base_path>/db.sql` (created if missing) in WAL journal mode, then
    /// applies the migrations embedded from `../migrations`.
    async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
        let base_path = base_path.into();

        let database_file_path = base_path.join("db.sql");

        fs::create_dir_all(base_path).await?;

        let opts = SqliteConnectOptions::new()
            .filename(database_file_path)
            .create_if_missing(true)
            // WAL permits concurrent readers while a writer is active.
            .journal_mode(SqliteJournalMode::Wal);

        let pool = SqlitePool::connect_with(opts).await?;

        sqlx::migrate!("../migrations").run(&pool).await?;

        Ok(pool)
    }

    /// Bulk-inserts `values` as pending tasks inside a single transaction.
    ///
    /// Each payload is serialized to JSON and keyed by its `get_key()`;
    /// rows whose `payload_key` already exists are skipped via
    /// `ON CONFLICT ... DO NOTHING`. Statements are chunked so each stays
    /// under SQLite's bind-parameter limit (3 binds per row).
    #[instrument(skip(self, values))]
    pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
    where
        T: TaskPayload + Serialize + std::fmt::Debug,
    {
        let mut tx = self.pool.begin().await?;

        let mut builder: QueryBuilder<'_, Sqlite> =
            QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");

        // Serialize everything up front so a bad payload aborts before any
        // SQL is executed.
        let args: crate::Result<Vec<(String, String)>> = values
            .iter()
            .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
            .collect();

        let mut affected_rows = 0;

        // Chunk the query by the size limit of bind params
        for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
            builder.push_values(chunk, |mut builder, item| {
                builder
                    .push_bind(&item.0)
                    .push_bind(&item.1)
                    .push_bind(TaskStatus::Pending);
            });

            builder.push("ON conflict (payload_key) DO NOTHING");

            let query = builder.build();
            affected_rows += query.execute(&mut *tx).await?.rows_affected();

            // Restore the builder to its initial INSERT prefix for the next chunk.
            builder.reset();
        }

        tx.commit().await?;

        info!("{} rows inserted.", affected_rows);

        Ok(())
    }
}
impl<T: TaskPayload> TaskStorage<T> for Database {
    fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::error::Result<()> {
        todo!()
    }

    /// Streams tasks with the given status, newest first, straight from the
    /// pool without buffering the full result set.
    fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>> {
        let query = sqlx::query_as::<_, Task<T>>(
            "
            SELECT id, payload_key, payload, status_id, created_at, updated_at
            FROM tasks
            WHERE status_id = ?
            ORDER BY created_at DESC
            ",
        )
        .bind(task_status);

        query.fetch(&self.pool).err_into::<crate::Error>()
    }

    /// Fetches one page of tasks, applying `page`'s status filter, limit and
    /// offset, newest first.
    async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>> {
        let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
            "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
        );

        if let Some(status) = &page.status {
            builder.push("where status_id = ").push_bind(status);
        }

        builder.push(" ORDER BY created_at DESC ");

        // Bug fix: SQLite's grammar is `LIMIT n [OFFSET m]` — OFFSET must
        // follow LIMIT and is invalid on its own. The previous code emitted
        // OFFSET first (bound to a variable misleadingly named `limit`) and
        // without any LIMIT, producing a syntax error whenever an offset was
        // set.
        match (page.limit, page.offset) {
            (Some(limit), Some(offset)) => {
                builder
                    .push("LIMIT ")
                    .push_bind(limit)
                    .push(" OFFSET ")
                    .push_bind(offset);
            }
            (Some(limit), None) => {
                builder.push("LIMIT ").push_bind(limit);
            }
            (None, Some(offset)) => {
                // `LIMIT -1` means "no limit" in SQLite, which makes a bare
                // offset legal.
                builder.push("LIMIT -1 OFFSET ").push_bind(offset);
            }
            (None, None) => {}
        }

        let tasks = builder
            .build_query_as::<Task<T>>()
            .fetch_all(&self.pool)
            .await?;

        Ok(TasksPage::new(tasks, page.clone()))
    }
}

View file

@ -2,6 +2,7 @@ pub mod error;
pub(crate) use error::*; pub(crate) use error::*;
pub mod task_manager; pub mod task_manager;
mod database;
pub fn add(left: u64, right: u64) -> u64 { pub fn add(left: u64, right: u64) -> u64 {
left + right left + right

View file

@ -8,13 +8,12 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{QueryBuilder, Sqlite, SqlitePool}; use sqlx::{QueryBuilder, Sqlite, SqlitePool};
use std::fmt::Display; use std::fmt::Display;
use std::path::PathBuf; use std::path::PathBuf;
use futures::stream::BoxStream;
use tabled::Tabled; use tabled::Tabled;
use tokio::fs; use tokio::fs;
use tracing::{info, instrument}; use tracing::{info, instrument};
static SQLITE_BIND_LIMIT: usize = 32766; #[derive(sqlx::Type, Debug, Clone)]
#[derive(sqlx::Type, Debug)]
#[repr(u8)] #[repr(u8)]
pub enum TaskStatus { pub enum TaskStatus {
Pending = 1, Pending = 1,
@ -42,11 +41,14 @@ impl Display for TaskStatus {
} }
} }
pub trait TaskPayload { pub trait TaskPayloadKey {
fn get_key(&self) -> String; fn get_key(&self) -> String;
} }
pub type TaskJob<T: _Task> = fn(&Task<T>) -> TaskStatus; pub trait TaskPayload: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey {}
impl<T: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey> TaskPayload for T {}
pub type TaskJob<T: TaskPayload> = fn(&Task<T>) -> TaskStatus;
#[derive(sqlx::FromRow, Tabled, Debug)] #[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task<T: DeserializeOwned + std::fmt::Display> { pub struct Task<T: DeserializeOwned + std::fmt::Display> {
@ -75,126 +77,22 @@ fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
} }
} }
pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
impl<T: DeserializeOwned + Send + Unpin + 'static + Display> _Task for T {}
#[derive(Debug)] struct TaskManager{}
pub struct TaskManager {
base_path: PathBuf,
pool: SqlitePool,
}
impl TaskManager { impl TaskManager {
pub async fn new<P: Into<PathBuf>>(base_path: P) -> Result<TaskManager, Error> { // pub async fn run_tasks<T: TaskPayload>(&self, func: TaskJob<T>) -> crate::Result<()> {
let base_path = base_path.into(); // let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
let pool = Self::connect_database(base_path.clone()).await?; //
Ok(Self { // let rows = builder.build_query_as::<Task<T>>().fetch(&self.pool);
base_path, //
pool, // let result: Vec<(Task<T>, TaskStatus)> = rows.map(|x| {
}) // let task = x.unwrap();
} // let status = func(&task);
//
async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> { // (task, status)
let base_path = base_path.into(); // }).collect().await;
//
let database_file_path = base_path.join("db.sql"); // Ok(())
// }
fs::create_dir_all(base_path).await?;
let opts = SqliteConnectOptions::new()
.filename(database_file_path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal);
let pool = SqlitePool::connect_with(opts).await?;
sqlx::migrate!("../migrations").run(&pool).await?;
Ok(pool)
}
fn get_task_builder(
status: Option<TaskStatus>,
limit: Option<u16>,
) -> QueryBuilder<'static, Sqlite> {
let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
);
if let Some(status) = status {
builder.push("where status_id = ").push_bind(status);
}
builder.push("ORDER BY created_at DESC ");
if let Some(limit) = limit {
builder.push("LIMIT ").push_bind(limit);
}
builder
}
pub async fn get_tasks<T: _Task>(
&self,
status: Option<TaskStatus>,
limit: Option<u16>,
) -> crate::Result<Vec<Task<T>>> {
let mut builder = Self::get_task_builder(status, limit);
let tasks: Vec<Task<T>> = builder.build_query_as().fetch_all(&self.pool).await?;
Ok(tasks)
}
#[instrument(skip(self, values))]
pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
where
T: TaskPayload + Serialize + std::fmt::Debug,
{
let mut tx = self.pool.begin().await?;
let mut builder: QueryBuilder<'_, Sqlite> =
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
let args: crate::Result<Vec<(String, String)>> = values
.iter()
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
.collect();
let mut affected_rows = 0;
// Chunk the query by the size limit of bind params
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
builder.push_values(chunk, |mut builder, item| {
builder
.push_bind(&item.0)
.push_bind(&item.1)
.push_bind(TaskStatus::Pending);
});
builder.push("ON conflict (payload_key) DO NOTHING");
let query = builder.build();
affected_rows += query.execute(&mut *tx).await?.rows_affected();
builder.reset();
}
tx.commit().await?;
info!("{} rows inserted.", affected_rows);
Ok(())
}
pub async fn run_tasks<T: _Task>(&self, func: TaskJob<T>) -> crate::Result<()> {
let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
let rows = builder.build_query_as::<Task<T>>().fetch(&self.pool);
let result: Vec<(Task<T>, TaskStatus)> = rows.map(|x| {
let task = x.unwrap();
let status = func(&task);
(task, status)
}).collect().await;
Ok(())
}
} }