feature: add command to run tasks

This commit is contained in:
Alexander Navarro 2025-05-12 16:40:41 -04:00
parent 63c20cfc87
commit 5387210844
8 changed files with 160 additions and 17 deletions

View file

@ -1,6 +1,7 @@
use crate::Error;
use chrono::Utc;
use directories::ProjectDirs;
use futures::{StreamExt, TryStreamExt};
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
@ -44,6 +45,8 @@ pub trait TaskPayload {
fn get_key(&self) -> String;
}
pub type TaskJob<T: _Task> = fn(&Task<T>) -> TaskStatus;
#[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task<T: DeserializeOwned + std::fmt::Display> {
id: u32,
@ -58,6 +61,12 @@ pub struct Task<T: DeserializeOwned + std::fmt::Display> {
updated_at: Option<chrono::DateTime<Utc>>,
}
impl<T: DeserializeOwned + std::fmt::Display> Task<T> {
pub fn get_key(&self) -> String {
self.payload_key.clone()
}
}
fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
match o {
Some(s) => format!("{}", s),
@ -65,6 +74,9 @@ fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
}
}
pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
impl<T: DeserializeOwned + Send + Unpin + 'static + Display> _Task for T {}
#[derive(Debug)]
pub struct TaskManager {
pool: SqlitePool,
@ -97,21 +109,34 @@ impl TaskManager {
Ok(pool)
}
pub async fn get_tasks<
T: DeserializeOwned + Send + Unpin + 'static + Display,
>(
fn get_task_builder(
status: Option<TaskStatus>,
limit: Option<u16>,
) -> QueryBuilder<'static, Sqlite> {
let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
);
if let Some(status) = status {
builder.push("where status_id = ").push_bind(status);
}
builder.push("ORDER BY created_at DESC ");
if let Some(limit) = limit {
builder.push("LIMIT ").push_bind(limit);
}
builder
}
pub async fn get_tasks<T: _Task>(
&self,
status: Option<TaskStatus>,
limit: u16,
limit: Option<u16>,
) -> crate::Result<Vec<Task<T>>> {
let tasks: Vec<Task<T>> = sqlx::query_as(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks where status_id = ? order by ? limit ?",
)
.bind(status.unwrap_or(TaskStatus::Pending))
.bind("created_at DESC")
.bind(limit)
.fetch_all(&self.pool)
.await?;
let mut builder = Self::get_task_builder(status, limit);
let tasks: Vec<Task<T>> = builder.build_query_as().fetch_all(&self.pool).await?;
Ok(tasks)
}
@ -153,4 +178,19 @@ impl TaskManager {
Ok(())
}
pub async fn run_tasks<T: _Task>(&self, func: TaskJob<T>) -> crate::Result<()> {
let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
let rows = builder.build_query_as::<Task<T>>().fetch(&self.pool);
let result: Vec<(Task<T>, TaskStatus)> = rows.map(|x| {
let task = x.unwrap();
let status = func(&task);
(task, status)
}).collect().await;
Ok(())
}
}