refactor(lib_core): create new lib core with apalis

use apalis library for task management
This commit is contained in:
Alexander Navarro 2025-12-29 19:45:19 -03:00
parent 2c47226dc9
commit 040c53cebd
7 changed files with 403 additions and 56 deletions

28
task_queue/src/error.rs Normal file
View file

@ -0,0 +1,28 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("{0}")]
Exception(&'static str),
#[error("{0}")]
Unhandled(&'static str),
#[error(transparent)]
Io(#[from] tokio::io::Error),
#[error(transparent)]
Sqlx(#[from] sqlx::Error),
#[error(transparent)]
Migration(#[from] sqlx::migrate::MigrateError),
#[error(transparent)]
ParseJson(#[from] serde_json::Error),
#[error(transparent)]
WorkerError(#[from] apalis::prelude::WorkerError),
}
pub type Result<T> = std::result::Result<T, Error>;

9
task_queue/src/lib.rs Normal file
View file

@ -0,0 +1,9 @@
#![allow(dead_code, unused)]
use apalis::prelude::*;
use serde::{Deserialize, Serialize};
pub mod error;
pub(crate) use error::*;
pub mod tasks;

50
task_queue/src/tasks.rs Normal file
View file

@ -0,0 +1,50 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Task {
pub id: u32,
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::Result;
use super::*;
use apalis::prelude::*;
use apalis_sqlite::SqliteStorage;
use fake::{Fake, Faker};
use sqlx::SqlitePool;
use tokio_stream::StreamExt;
async fn generate_dummy_tasks<T: TaskSink<super::Task>>(storage: &mut T)
where
<T as apalis::prelude::Backend>::Error: std::fmt::Debug,
{
storage
.push(super::Task { id: Faker.fake() })
.await
.unwrap();
}
#[tokio::test]
async fn can_enqueue_tasks() {
let pool = SqlitePool::connect(":memory:").await.unwrap();
SqliteStorage::setup(&pool).await.unwrap();
let mut backend = SqliteStorage::new(&pool);
generate_dummy_tasks(&mut backend).await;
async fn process_task(task: super::Task, worker: WorkerContext) -> crate::Result<()> {
tokio::time::sleep(Duration::from_millis(100)).await;
worker.stop()?;
Ok(())
}
let worker = WorkerBuilder::new("rango-tango")
.backend(backend)
.build(process_task);
worker.run().await.unwrap();
}
}