Compare commits

...

2 commits

Author SHA1 Message Date
a3adb9ffa8 refactor(lib_core): create new lib core with apalis
use apalis library for task management
2025-12-29 19:52:33 -03:00
cfa2474606 test(lib_core): update task manager tests
refs: #5

wip: add test to task_manager

refs: #5
2025-12-29 19:52:23 -03:00
17 changed files with 692 additions and 94 deletions

29
.idea/codeStyles/Project.xml generated Normal file
View file

@ -0,0 +1,29 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<RsCodeStyleSettings>
<option name="INDENT_WHERE_CLAUSE" value="true" />
</RsCodeStyleSettings>
<SqlCodeStyleSettings version="7">
<option name="KEYWORD_CASE" value="2" />
<option name="SUBQUERY_OPENING" value="1" />
<option name="SUBQUERY_CONTENT" value="4" />
<option name="SUBQUERY_CLOSING" value="4" />
<option name="SUBQUERY_PAR_SPACE_BEFORE" value="1" />
<option name="INSERT_INTO_NL" value="2" />
<option name="INSERT_COLLAPSE_MULTI_ROW_VALUES" value="true" />
<option name="INSERT_MATRIX_ALIGN" value="true" />
<option name="INSERT_MATRIX_INCLUDING_HEADER" value="true" />
<option name="FROM_ALIGN_JOIN_TABLES" value="true" />
<option name="FROM_ALIGN_ALIASES" value="true" />
</SqlCodeStyleSettings>
<codeStyleSettings language="Rust">
<indentOptions>
<option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="2" />
<option name="TAB_SIZE" value="2" />
<option name="USE_TAB_CHARACTER" value="true" />
<option name="SMART_TABS" value="true" />
</indentOptions>
</codeStyleSettings>
</code_scheme>
</component>

View file

@ -1,5 +1,6 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

341
Cargo.lock generated
View file

@ -110,6 +110,84 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "apalis"
version = "1.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b"
dependencies = [
"apalis-core",
"futures-util",
"pin-project",
"thiserror",
"tower",
"tracing",
]
[[package]]
name = "apalis-codec"
version = "0.1.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637"
dependencies = [
"apalis-core",
"serde",
"serde_json",
]
[[package]]
name = "apalis-core"
version = "1.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b1557d680ee4a9b42a76ab3a9572cee1a00d45e7eb455427d906c42774766e7"
dependencies = [
"futures-channel",
"futures-core",
"futures-sink",
"futures-timer",
"futures-util",
"pin-project",
"serde",
"thiserror",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "apalis-sql"
version = "1.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f"
dependencies = [
"apalis-core",
"chrono",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "apalis-sqlite"
version = "1.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd43020ce13d6cb8c8d8c09657a6691d8490cd1ce0d8bc0f7fef8bf9b23cfe86"
dependencies = [
"apalis-codec",
"apalis-core",
"apalis-sql",
"chrono",
"futures",
"log",
"pin-project",
"serde",
"serde_json",
"sqlx",
"thiserror",
"tokio",
"ulid",
]
[[package]]
name = "async-stream"
version = "0.3.6"
@ -526,16 +604,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "errno"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e"
dependencies = [
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "etcetera"
version = "0.8.0"
@ -573,12 +641,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "figment"
version = "0.10.19"
@ -706,6 +768,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
version = "0.3.31"
@ -1066,6 +1134,7 @@ dependencies = [
"tabled",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"tracing-core",
"tracing-subscriber",
@ -1106,12 +1175,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
[[package]]
name = "litemap"
version = "0.7.5"
@ -1341,6 +1404,26 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "pin-project"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
@ -1571,6 +1654,20 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "ring"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.16",
"libc",
"untrusted",
"windows-sys 0.52.0",
]
[[package]]
name = "rsa"
version = "0.9.8"
@ -1598,16 +1695,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustix"
version = "1.0.7"
name = "rustls"
version = "0.23.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f"
dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.59.0",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-pki-types"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282"
dependencies = [
"zeroize",
]
[[package]]
name = "rustls-webpki"
version = "0.103.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
@ -1715,6 +1833,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad"
dependencies = [
"libc",
]
[[package]]
name = "signature"
version = "2.2.0"
@ -1774,9 +1901,9 @@ dependencies = [
[[package]]
name = "sqlx"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3c3a85280daca669cfd3bcb68a337882a8bc57ec882f72c5d13a430613a738e"
checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc"
dependencies = [
"sqlx-core",
"sqlx-macros",
@ -1787,9 +1914,9 @@ dependencies = [
[[package]]
name = "sqlx-core"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f743f2a3cea30a58cd479013f75550e879009e3a02f616f18ca699335aa248c3"
checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6"
dependencies = [
"base64",
"bytes",
@ -1809,6 +1936,7 @@ dependencies = [
"memchr",
"once_cell",
"percent-encoding",
"rustls",
"serde",
"serde_json",
"sha2",
@ -1819,13 +1947,14 @@ dependencies = [
"tracing",
"url",
"uuid",
"webpki-roots 0.26.11",
]
[[package]]
name = "sqlx-macros"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f4200e0fde19834956d4252347c12a083bdcb237d7a1a1446bffd8768417dce"
checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d"
dependencies = [
"proc-macro2",
"quote",
@ -1836,9 +1965,9 @@ dependencies = [
[[package]]
name = "sqlx-macros-core"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "882ceaa29cade31beca7129b6beeb05737f44f82dbe2a9806ecea5a7093d00b7"
checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b"
dependencies = [
"dotenvy",
"either",
@ -1855,16 +1984,15 @@ dependencies = [
"sqlx-postgres",
"sqlx-sqlite",
"syn",
"tempfile",
"tokio",
"url",
]
[[package]]
name = "sqlx-mysql"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0afdd3aa7a629683c2d750c2df343025545087081ab5942593a5288855b1b7a7"
checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526"
dependencies = [
"atoi",
"base64",
@ -1906,9 +2034,9 @@ dependencies = [
[[package]]
name = "sqlx-postgres"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bedbe1bbb5e2615ef347a5e9d8cd7680fb63e77d9dafc0f29be15e53f1ebe6"
checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46"
dependencies = [
"atoi",
"base64",
@ -1945,9 +2073,9 @@ dependencies = [
[[package]]
name = "sqlx-sqlite"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c26083e9a520e8eb87a06b12347679b142dc2ea29e6e409f805644a7a979a5bc"
checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
dependencies = [
"atoi",
"chrono",
@ -2009,6 +2137,12 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
[[package]]
name = "synstructure"
version = "0.13.2"
@ -2045,16 +2179,27 @@ dependencies = [
]
[[package]]
name = "tempfile"
version = "3.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
name = "task_queue"
version = "0.1.0"
dependencies = [
"fastrand",
"getrandom 0.3.2",
"once_cell",
"rustix",
"windows-sys 0.59.0",
"apalis",
"apalis-sqlite",
"async-stream",
"chrono",
"directories",
"fake",
"figment",
"futures",
"serde",
"serde_json",
"sqlx",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"tracing-core",
"tracing-subscriber",
"tracing-test",
]
[[package]]
@ -2132,6 +2277,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
@ -2159,6 +2305,48 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tower"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-service"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.41"
@ -2248,6 +2436,17 @@ version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
[[package]]
name = "ulid"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe"
dependencies = [
"rand 0.9.1",
"serde",
"web-time",
]
[[package]]
name = "uncased"
version = "0.9.10"
@ -2290,6 +2489,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.4"
@ -2442,6 +2647,34 @@ dependencies = [
name = "web"
version = "0.1.0"
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "0.26.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9"
dependencies = [
"webpki-roots 1.0.4",
]
[[package]]
name = "webpki-roots"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "whoami"
version = "1.6.0"

View file

@ -2,5 +2,5 @@
resolver = "3"
members = [
"cli", "lib_sync_core", "web",
"cli", "lib_sync_core", "web", "task_queue",
]

View file

@ -6,6 +6,7 @@ edition = "2024"
[dependencies]
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
tokio-stream = "0.1.17"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }

View file

@ -15,10 +15,10 @@ impl TaskPagination {
}
pub fn next(&self) -> Self {
Self {
Self {
offset: self.offset.saturating_add(self.limit.unwrap_or(0)),
..self.clone()
}
..self.clone()
}
}
pub fn prev(&self) -> Self {
@ -45,20 +45,17 @@ impl TaskPagination {
}
pub struct TasksPage<T: TaskPayload> {
tasks: Vec<Task<T>>,
page: TaskPagination
tasks: Vec<Task<T>>,
page: TaskPagination,
}
impl<T: TaskPayload> TasksPage<T> {
fn new(tasks: Vec<Task<T>>, page: TaskPagination) -> Self {
Self {
tasks,
page
}
Self { tasks, page }
}
pub fn next(&self) -> TaskPagination {
self.page.next()
self.page.next()
}
pub fn prev(&self) -> TaskPagination {
@ -67,9 +64,13 @@ impl<T: TaskPayload> TasksPage<T> {
}
pub trait TaskStorage<T: TaskPayload> {
async fn insert_tasks<'a, I: IntoIterator<Item=&'a Task<T>>>(&self, tasks: I) -> crate::Result<()>;
fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
async fn insert_tasks<'a, I: IntoIterator<Item = &'a Task<T>>>(
&self,
tasks: I,
) -> crate::Result<()>;
fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>>;
}

View file

@ -108,6 +108,11 @@ impl<T: TaskPayload> TaskStorage<T> for Sqlite {
query.fetch(&self.pool).err_into::<crate::Error>()
}
fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream<Item=crate::error::Result<Task<T>>> {
futures::stream::empty()
}
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>> {
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
@ -170,6 +175,7 @@ mod tests {
}
#[sqlx::test(migrator = "MIGRATIONS")]
#[traced_test]
async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
let (sqlite, tasks) = setup(pool.clone(), 100);

View file

@ -1,5 +1,7 @@
#![allow(dead_code, unused)]
pub mod error;
pub(crate) use error::*;
pub mod tasks;
mod database;
pub mod tasks;

View file

@ -5,6 +5,9 @@ use std::fmt::Display;
use tabled::Tabled;
mod manager;
mod jobs;
mod worker;
mod bus;
#[derive(sqlx::Type, Debug, Clone)]
#[repr(u8)]
@ -43,8 +46,6 @@ impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug>
{
}
pub type TaskJob<T> = fn(&Task<T>) -> TaskStatus;
#[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task<T: TaskPayload> {
id: u32,
@ -76,9 +77,7 @@ impl<T: TaskPayload> Task<T> {
pub fn payload(&self) -> &T {
&self.payload
}
}
impl<T: TaskPayload> Task<T> {
pub fn get_key(&self) -> String {
self.payload_key.clone()
}

View file

@ -0,0 +1,4 @@
#[derive(Clone)]
pub enum Bus {
Local,
}

View file

@ -0,0 +1,3 @@
use crate::tasks::{Task, TaskStatus};
pub type TaskJob<T> = fn(&Task<T>) -> TaskStatus;

View file

@ -1,32 +1,190 @@
use crate::database::TaskStorage;
use crate::tasks::bus::Bus;
use crate::tasks::jobs::TaskJob;
use crate::tasks::{Task, TaskPayload, TaskStatus};
use futures::StreamExt;
use std::marker::PhantomData;
use crate::database::TaskStorage;
use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus};
use std::pin::pin;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot::Sender;
use crate::tasks::worker::TaskMessage;
struct TaskManager<S: TaskPayload, T: TaskStorage<S>>
{
pub enum RateLimit {
Buffer(usize),
Rate(usize),
Ticks(usize),
None,
}
pub struct ManagerOptions {
rate_limit: RateLimit,
bus: Bus,
}
impl ManagerOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self {
self.rate_limit = rate_limit;
self
}
}
impl Default for ManagerOptions {
fn default() -> Self {
Self {
rate_limit: RateLimit::None,
bus: Bus::Local,
}
}
}
struct TaskManager<S: TaskPayload, T: TaskStorage<S>> {
storage: T,
options: ManagerOptions,
_marker: PhantomData<S>,
}
impl<S: TaskPayload, T: TaskStorage<S>> TaskManager<S, T> {
pub fn new(storage: T) -> Self {
Self {
storage,
_marker: PhantomData,
}
pub fn new(storage: T, options: ManagerOptions) -> Self {
Self {
storage,
options,
_marker: PhantomData,
}
}
pub async fn run_tasks(&self, func: TaskJob<S>) -> crate::Result<()> {
pub async fn run_tasks(&self, mut task_sink: TaskMessage<S>) -> crate::Result<()> {
let rows = self.storage.get_tasks(TaskStatus::Pending);
let listener = self.storage.listen_tasks(TaskStatus::Pending);
let result: Vec<(Task<S>, TaskStatus)> = rows.map(|x| {
let task = x.unwrap();
let status = func(&task);
let mut queue = pin!(rows.chain(listener));
(task, status)
}).collect().await;
while let Some(task) = queue.next().await {
let task = match task {
Ok(task) => task,
Err(e) => {
continue
}
};
let sink = match task_sink.recv().await {
Some(s) => s,
None => break, // sink has stoped requesting tasks
};
if let Err(_) = sink.send(task) {
continue;
}
// (task, status)
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::database::{TaskPagination, TasksPage};
use async_stream::stream;
use fake::{Dummy, Fake, Faker};
use futures::Stream;
use serde::{Deserialize, Serialize};
use sqlx::types::Uuid;
use sync::mpsc;
use tokio::sync;
use tokio_stream::wrappers::ReceiverStream;
use tracing_test::traced_test;
use crate::error::Error;
use crate::tasks::worker::{Worker, WorkerManager};
#[derive(Dummy, Serialize, Deserialize, Debug)]
struct DummyTaskPayload {
key: Uuid,
_foo: String,
_bar: String,
}
struct DummyTaskStorage {}
impl TaskStorage<DummyTaskPayload> for DummyTaskStorage {
async fn insert_tasks<'a, I: IntoIterator<Item = &'a Task<DummyTaskPayload>>>(
&self,
_: I,
) -> crate::error::Result<()> {
todo!()
}
fn get_tasks(
&self,
task_status: TaskStatus,
) -> impl Stream<Item = crate::Result<Task<DummyTaskPayload>>> {
let payloads: Vec<DummyTaskPayload> = Faker.fake();
let tasks = payloads.into_iter().enumerate().map(move |(i, item)| {
Ok(Task::new((i + 1).to_string(), item, task_status.clone()))
});
futures::stream::iter(tasks)
}
fn listen_tasks(
&self,
task_status: TaskStatus,
) -> impl Stream<Item = crate::Result<Task<DummyTaskPayload>>> {
let (tx, rx) = mpsc::channel::<crate::Result<Task<DummyTaskPayload>>>(10);
tokio::spawn(async move {
for _ in 0..10 {
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
let payload: DummyTaskPayload = Faker.fake();
let task_status: TaskStatus = task_status.clone();
let task = Ok(Task::new(payload.key.to_string(), payload, task_status));
if let Err(_) = tx.send(task).await {
break;
}
}
});
ReceiverStream::new(rx)
}
async fn get_paginated_tasks(
&self,
_: TaskPagination,
) -> crate::error::Result<TasksPage<DummyTaskPayload>> {
todo!()
}
}
struct DummyWorker;
impl Worker<DummyTaskPayload> for DummyWorker {
fn process_job(task: &Task<DummyTaskPayload>) -> crate::error::Result<()> {
println!("{:#?}", task);
Ok(())
}
async fn on_job_failure(task: &Task<DummyTaskPayload>, error: Error) -> crate::error::Result<()> {
println!("{:#?} {:?}", task, error);
Ok(())
}
}
#[tokio::test]
#[traced_test]
async fn manager_runs() {
let execute_options = ManagerOptions::new();
let local_worker_sink = WorkerManager::get_listener_sink::<DummyTaskPayload, DummyWorker>(execute_options.bus.clone());
let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options);
task_manager.run_tasks(local_worker_sink).await.unwrap()
}
}

View file

@ -0,0 +1,48 @@
use crate::error::Error;
use crate::tasks::bus::Bus;
use crate::tasks::{Task, TaskPayload};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot};
pub type TaskMessage<T> = Receiver<Sender<Task<T>>>;
pub struct WorkerManager;
impl WorkerManager {
pub fn get_listener_sink<T: TaskPayload, W: Worker<T>>(bus: Bus) -> TaskMessage<T> {
match bus {
Bus::Local => {
let (bus_tx, bus_rx) = mpsc::channel(100);
tokio::spawn(async move {
loop {
// TODO: properly catch errors
let (tx, rx) = oneshot::channel();
// Request a task
bus_tx.send(tx).await.unwrap();
// Wait for a task to be returned
let task = rx.await.unwrap();
W::process_job(&task).unwrap();
}
});
bus_rx
}
}
}
}
pub trait Worker<T: TaskPayload> {
async fn pre_process_job(task: &Task<T>) -> crate::Result<()> {
Ok(())
}
fn process_job(task: &Task<T>) -> crate::Result<()>;
async fn post_process_job(task: &Task<T>) -> crate::Result<()> {
Ok(())
}
async fn on_job_failure(task: &Task<T>, error: Error) -> crate::Result<()>;
}

26
task_queue/Cargo.toml Normal file
View file

@ -0,0 +1,26 @@
[package]
name = "task_queue"
version = "0.1.0"
edition = "2024"
[dependencies]
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros", "signal"] }
tokio-stream = "0.1.17"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
serde_json = "1.0.140"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"
futures = "0.3.31"
thiserror = "2.0.12"
async-stream = "0.3.6"
apalis = { version = "1.0.0-rc.1" }
apalis-sqlite = "1.0.0-rc.1"
[dev-dependencies]
fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
tracing-test = "0.2.5"

28
task_queue/src/error.rs Normal file
View file

@ -0,0 +1,28 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("{0}")]
Exception(&'static str),
#[error("{0}")]
Unhandled(&'static str),
#[error(transparent)]
Io(#[from] tokio::io::Error),
#[error(transparent)]
Sqlx(#[from] sqlx::Error),
#[error(transparent)]
Migration(#[from] sqlx::migrate::MigrateError),
#[error(transparent)]
ParseJson(#[from] serde_json::Error),
#[error(transparent)]
WorkerError(#[from] apalis::prelude::WorkerError),
}
pub type Result<T> = std::result::Result<T, Error>;

9
task_queue/src/lib.rs Normal file
View file

@ -0,0 +1,9 @@
#![allow(dead_code, unused)]
use apalis::prelude::*;
use serde::{Deserialize, Serialize};
pub mod error;
pub(crate) use error::*;
pub mod tasks;

50
task_queue/src/tasks.rs Normal file
View file

@ -0,0 +1,50 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Task {
pub id: u32,
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::Result;
use super::*;
use apalis::prelude::*;
use apalis_sqlite::SqliteStorage;
use fake::{Fake, Faker};
use sqlx::SqlitePool;
use tokio_stream::StreamExt;
async fn generate_dummy_tasks<T: TaskSink<super::Task>>(storage: &mut T)
where
<T as apalis::prelude::Backend>::Error: std::fmt::Debug,
{
storage
.push(super::Task { id: Faker.fake() })
.await
.unwrap();
}
#[tokio::test]
async fn can_enqueue_tasks() {
let pool = SqlitePool::connect(":memory:").await.unwrap();
SqliteStorage::setup(&pool).await.unwrap();
let mut backend = SqliteStorage::new(&pool);
generate_dummy_tasks(&mut backend).await;
async fn process_task(task: super::Task, worker: WorkerContext) -> crate::Result<()> {
tokio::time::sleep(Duration::from_millis(100)).await;
worker.stop()?;
Ok(())
}
let worker = WorkerBuilder::new("rango-tango")
.backend(backend)
.build(process_task);
worker.run().await.unwrap();
}
}