refactor(lib_core): task manager structure

refs: #5
This commit is contained in:
Alexander Navarro 2025-05-16 20:00:25 -04:00
parent 4199a97a19
commit 45a3bf291b
7 changed files with 176 additions and 160 deletions

View file

@@ -1,14 +1,6 @@
use crate::task_manager::{Task, TaskPayload, TaskStatus};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt};
use serde::Serialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{Error, QueryBuilder, Sqlite, SqlitePool};
use std::path::PathBuf;
use tokio::fs;
use tracing::{info, instrument};
static SQLITE_BIND_LIMIT: usize = 32766;
use crate::tasks::{Task, TaskPayload, TaskStatus};
use futures::{Stream};
mod sqlite;
#[derive(Default, Clone)]
pub struct TaskPagination {
@@ -77,121 +69,9 @@ impl<T: TaskPayload> TasksPage<T> {
}
/// Persistence interface for task queues.
///
/// `T` is the task payload type; implementors decide how and where tasks
/// are stored (e.g. the SQLite-backed store in this module).
pub trait TaskStorage<T: TaskPayload> {
    /// Persists a batch of tasks.
    // NOTE: the diff showed both a sync and an async declaration of this
    // method; only one can exist. The async form is kept — it is the diff's
    // added line and matches `get_paginated_tasks`.
    async fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()>;

    /// Streams every task currently in the given status.
    fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;

    /// Fetches one page of tasks according to the pagination options.
    async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>>;
}
/// SQLite-backed task store.
///
/// Constructed via [`Database::new`], which opens (or creates) the database
/// file and runs migrations before handing back the pool-wrapping store.
#[derive(Debug)]
pub struct Database {
// Shared sqlx connection pool; cloned cheaply by sqlx for each query.
pool: SqlitePool,
}
impl Database {
/// Builds a [`Database`] rooted at `base_path`: the directory is created,
/// the SQLite file is opened (created if missing) and migrations are run.
pub async fn new<P: Into<PathBuf>>(base_path: P) -> crate::Result<Self> {
Ok(Self {
pool: Self::connect_database(base_path).await?,
})
}
/// Opens (or creates) `<base_path>/db.sql` with the journal in WAL mode,
/// then applies the migrations from `../migrations` before returning the pool.
async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
let base_path = base_path.into();
let database_file_path = base_path.join("db.sql");
// Make sure the directory that will hold db.sql exists before SQLite
// tries to create the file inside it.
fs::create_dir_all(base_path).await?;
let opts = SqliteConnectOptions::new()
.filename(database_file_path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal);
let pool = SqlitePool::connect_with(opts).await?;
sqlx::migrate!("../migrations").run(&pool).await?;
Ok(pool)
}
/// Bulk-inserts `values` as `Pending` tasks inside a single transaction.
///
/// Each row binds three parameters (key, JSON payload, status), so the
/// batch is chunked at `SQLITE_BIND_LIMIT / 3` to stay under SQLite's
/// bind-parameter cap. Rows whose `payload_key` already exists are
/// skipped via `ON CONFLICT ... DO NOTHING`; the number of rows actually
/// inserted is logged.
///
/// Errors: propagates serde serialization failures and any sqlx error;
/// the transaction is only committed after every chunk succeeds.
#[instrument(skip(self, values))]
pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
where
T: TaskPayload + Serialize + std::fmt::Debug,
{
let mut tx = self.pool.begin().await?;
let mut builder: QueryBuilder<'_, Sqlite> =
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
// Serialize every payload up front; any serde failure aborts the whole
// load before the first chunk executes.
let args: crate::Result<Vec<(String, String)>> = values
.iter()
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
.collect();
let mut affected_rows = 0;
// Chunk the query by the size limit of bind params
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
builder.push_values(chunk, |mut builder, item| {
builder
.push_bind(&item.0)
.push_bind(&item.1)
.push_bind(TaskStatus::Pending);
});
builder.push("ON conflict (payload_key) DO NOTHING");
let query = builder.build();
affected_rows += query.execute(&mut *tx).await?.rows_affected();
// reset() discards the VALUES/ON CONFLICT tail so the builder starts
// from its initial INSERT prefix for the next chunk.
builder.reset();
}
tx.commit().await?;
info!("{} rows inserted.", affected_rows);
Ok(())
}
}
impl<T: TaskPayload> TaskStorage<T> for Database {
    /// Batch insert is not implemented yet; [`Database::load_tasks`] is the
    /// working path for enqueueing payloads in the meantime.
    fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::error::Result<()> {
        todo!()
    }

    /// Streams every task in `task_status`, newest first.
    fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>> {
        let query = sqlx::query_as::<_, Task<T>>(
            "
SELECT id, payload_key, payload, status_id, created_at, updated_at
FROM tasks
WHERE status_id = ?
ORDER BY created_at DESC
",
        )
        .bind(task_status);
        query.fetch(&self.pool).err_into::<crate::Error>()
    }

    /// Fetches one page of tasks, optionally filtered by status.
    ///
    /// Fix: the previous version emitted `OFFSET ? LIMIT ?`, but SQLite's
    /// grammar is `LIMIT expr [OFFSET expr]` — OFFSET may not precede LIMIT
    /// and is a syntax error without it, so any query using pagination
    /// failed to prepare. LIMIT is now emitted first, and an offset-only
    /// page uses `LIMIT -1` (SQLite's "no limit") so the OFFSET is legal.
    async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>> {
        let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
            "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
        );
        if let Some(status) = &page.status {
            builder.push("where status_id = ").push_bind(status);
        }
        builder.push("ORDER BY created_at DESC ");
        match (&page.limit, &page.offset) {
            (Some(limit), Some(offset)) => {
                builder.push("LIMIT ").push_bind(limit);
                builder.push(" OFFSET ").push_bind(offset);
            }
            (Some(limit), None) => {
                builder.push("LIMIT ").push_bind(limit);
            }
            // SQLite rejects a bare OFFSET; LIMIT -1 means "unlimited".
            (None, Some(offset)) => {
                builder.push("LIMIT -1 OFFSET ").push_bind(offset);
            }
            (None, None) => {}
        }
        let tasks = builder.build_query_as::<Task<T>>().fetch_all(&self.pool).await?;
        Ok(TasksPage::new(tasks, page.clone()))
    }
}