refactor(lib_core): create new lib core with apalis
use apalis library for task management
This commit is contained in:
parent
cfa2474606
commit
a3adb9ffa8
7 changed files with 403 additions and 56 deletions
26
task_queue/Cargo.toml
Normal file
26
task_queue/Cargo.toml
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
[package]
|
||||
name = "task_queue"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
directories = "6.0.0"
|
||||
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros", "signal"] }
|
||||
tokio-stream = "0.1.17"
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
chrono = {version = "0.4.41", features = ["serde"]}
|
||||
serde_json = "1.0.140"
|
||||
tracing = "0.1.41"
|
||||
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
|
||||
figment = { version = "0.10.19", features = ["env"] }
|
||||
tracing-core = "0.1.33"
|
||||
futures = "0.3.31"
|
||||
thiserror = "2.0.12"
|
||||
async-stream = "0.3.6"
|
||||
apalis = { version = "1.0.0-rc.1" }
|
||||
apalis-sqlite = "1.0.0-rc.1"
|
||||
|
||||
[dev-dependencies]
|
||||
fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
|
||||
tracing-test = "0.2.5"
|
||||
28
task_queue/src/error.rs
Normal file
28
task_queue/src/error.rs
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("{0}")]
|
||||
Exception(&'static str),
|
||||
|
||||
#[error("{0}")]
|
||||
Unhandled(&'static str),
|
||||
|
||||
#[error(transparent)]
|
||||
Io(#[from] tokio::io::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
Sqlx(#[from] sqlx::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
Migration(#[from] sqlx::migrate::MigrateError),
|
||||
|
||||
#[error(transparent)]
|
||||
ParseJson(#[from] serde_json::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
WorkerError(#[from] apalis::prelude::WorkerError),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
9
task_queue/src/lib.rs
Normal file
9
task_queue/src/lib.rs
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
#![allow(dead_code, unused)]
|
||||
|
||||
use apalis::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod error;
|
||||
|
||||
pub(crate) use error::*;
|
||||
pub mod tasks;
|
||||
50
task_queue/src/tasks.rs
Normal file
50
task_queue/src/tasks.rs
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct Task {
|
||||
pub id: u32,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::Result;
|
||||
|
||||
use super::*;
|
||||
use apalis::prelude::*;
|
||||
use apalis_sqlite::SqliteStorage;
|
||||
use fake::{Fake, Faker};
|
||||
use sqlx::SqlitePool;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
async fn generate_dummy_tasks<T: TaskSink<super::Task>>(storage: &mut T)
|
||||
where
|
||||
<T as apalis::prelude::Backend>::Error: std::fmt::Debug,
|
||||
{
|
||||
storage
|
||||
.push(super::Task { id: Faker.fake() })
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_enqueue_tasks() {
|
||||
let pool = SqlitePool::connect(":memory:").await.unwrap();
|
||||
SqliteStorage::setup(&pool).await.unwrap();
|
||||
let mut backend = SqliteStorage::new(&pool);
|
||||
|
||||
generate_dummy_tasks(&mut backend).await;
|
||||
|
||||
async fn process_task(task: super::Task, worker: WorkerContext) -> crate::Result<()> {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
worker.stop()?;
|
||||
Ok(())
|
||||
}
|
||||
let worker = WorkerBuilder::new("rango-tango")
|
||||
.backend(backend)
|
||||
.build(process_task);
|
||||
worker.run().await.unwrap();
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue