From 040c53cebd0e4b8d01f96861949e13bb29391745 Mon Sep 17 00:00:00 2001 From: aleidk Date: Mon, 29 Dec 2025 19:45:19 -0300 Subject: [PATCH] refactor(lib_core): create new lib core with apalis use apalis library for task management --- Cargo.lock | 340 ++++++++++++++++++++++++++++++++------- Cargo.toml | 2 +- lib_sync_core/src/lib.rs | 4 +- task_queue/Cargo.toml | 26 +++ task_queue/src/error.rs | 28 ++++ task_queue/src/lib.rs | 9 ++ task_queue/src/tasks.rs | 50 ++++++ 7 files changed, 403 insertions(+), 56 deletions(-) create mode 100644 task_queue/Cargo.toml create mode 100644 task_queue/src/error.rs create mode 100644 task_queue/src/lib.rs create mode 100644 task_queue/src/tasks.rs diff --git a/Cargo.lock b/Cargo.lock index 6292eb7..3b5e94c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,84 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "apalis" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b" +dependencies = [ + "apalis-core", + "futures-util", + "pin-project", + "thiserror", + "tower", + "tracing", +] + +[[package]] +name = "apalis-codec" +version = "0.1.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637" +dependencies = [ + "apalis-core", + "serde", + "serde_json", +] + +[[package]] +name = "apalis-core" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b1557d680ee4a9b42a76ab3a9572cee1a00d45e7eb455427d906c42774766e7" +dependencies = [ + "futures-channel", + "futures-core", + "futures-sink", + "futures-timer", + "futures-util", + "pin-project", + "serde", + "thiserror", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "apalis-sql" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f" +dependencies = [ + "apalis-core", + "chrono", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "apalis-sqlite" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd43020ce13d6cb8c8d8c09657a6691d8490cd1ce0d8bc0f7fef8bf9b23cfe86" +dependencies = [ + "apalis-codec", + "apalis-core", + "apalis-sql", + "chrono", + "futures", + "log", + "pin-project", + "serde", + "serde_json", + "sqlx", + "thiserror", + "tokio", + "ulid", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -526,16 +604,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" -[[package]] -name = "errno" -version = "0.3.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" -dependencies = [ - "libc", - "windows-sys 0.59.0", -] - [[package]] name = "etcetera" version = "0.8.0" @@ -573,12 +641,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "fastrand" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" - [[package]] name = "figment" version = "0.10.19" @@ -706,6 +768,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -1107,12 +1175,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linux-raw-sys" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" - [[package]] name = "litemap" version = "0.7.5" @@ -1342,6 +1404,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1572,6 +1654,20 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.16", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rsa" version = "0.9.8" @@ -1599,16 +1695,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] -name = "rustix" -version = "1.0.7" +name = "rustls" +version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ - "bitflags", - "errno", - "libc", - "linux-raw-sys", - "windows-sys 0.59.0", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", ] [[package]] @@ -1716,6 +1833,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -1775,9 +1901,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3c3a85280daca669cfd3bcb68a337882a8bc57ec882f72c5d13a430613a738e" +checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc" dependencies = [ "sqlx-core", "sqlx-macros", @@ -1788,9 +1914,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f743f2a3cea30a58cd479013f75550e879009e3a02f616f18ca699335aa248c3" +checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" dependencies = [ "base64", "bytes", @@ -1810,6 +1936,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", + "rustls", "serde", "serde_json", "sha2", @@ -1820,13 +1947,14 @@ dependencies = [ "tracing", "url", "uuid", + "webpki-roots 0.26.11", ] [[package]] name = "sqlx-macros" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4200e0fde19834956d4252347c12a083bdcb237d7a1a1446bffd8768417dce" +checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d" dependencies = [ "proc-macro2", "quote", @@ -1837,9 +1965,9 @@ dependencies = [ [[package]] name = "sqlx-macros-core" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "882ceaa29cade31beca7129b6beeb05737f44f82dbe2a9806ecea5a7093d00b7" +checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b" dependencies = [ "dotenvy", "either", @@ -1856,16 +1984,15 @@ dependencies = [ "sqlx-postgres", "sqlx-sqlite", "syn", - "tempfile", "tokio", "url", ] [[package]] name = "sqlx-mysql" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0afdd3aa7a629683c2d750c2df343025545087081ab5942593a5288855b1b7a7" +checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", "base64", @@ -1907,9 +2034,9 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bedbe1bbb5e2615ef347a5e9d8cd7680fb63e77d9dafc0f29be15e53f1ebe6" +checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", "base64", @@ -1946,9 +2073,9 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c26083e9a520e8eb87a06b12347679b142dc2ea29e6e409f805644a7a979a5bc" +checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" dependencies = [ "atoi", "chrono", @@ -2010,6 +2137,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.13.2" @@ -2046,16 +2179,27 @@ dependencies = [ ] [[package]] -name = "tempfile" -version = "3.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +name = "task_queue" +version = "0.1.0" dependencies = [ - "fastrand", - "getrandom 0.3.2", - "once_cell", - "rustix", - "windows-sys 0.59.0", + "apalis", + "apalis-sqlite", + "async-stream", + "chrono", + "directories", + "fake", + "figment", + "futures", + "serde", + "serde_json", + "sqlx", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "tracing-core", + "tracing-subscriber", + "tracing-test", ] [[package]] @@ -2133,6 +2277,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -2160,6 +2305,48 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.41" @@ -2249,6 +2436,17 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "ulid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" +dependencies = [ + "rand 0.9.1", + "serde", + "web-time", +] + [[package]] name = "uncased" version = "0.9.10" @@ -2291,6 +2489,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.4" @@ -2443,6 +2647,34 @@ dependencies = [ name = "web" version = "0.1.0" +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.4", +] + +[[package]] +name = "webpki-roots" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "whoami" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index df1befe..14eaf11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,5 @@ resolver = "3" members = [ - "cli", "lib_sync_core", "web", + "cli", "lib_sync_core", "web", "task_queue", ] diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs index 1030a9a..13a5f6a 100644 --- a/lib_sync_core/src/lib.rs +++ b/lib_sync_core/src/lib.rs @@ -1,5 +1,7 @@ +#![allow(dead_code, unused)] + pub mod error; pub(crate) use error::*; -pub mod tasks; mod database; +pub mod tasks; diff --git a/task_queue/Cargo.toml b/task_queue/Cargo.toml new file mode 100644 index 0000000..5ec2395 --- /dev/null +++ b/task_queue/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "task_queue" +version = "0.1.0" +edition = "2024" + +[dependencies] +directories = "6.0.0" +tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros", "signal"] } +tokio-stream = "0.1.17" +sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] } +serde = { version = "1.0.219", features = ["derive"] } +chrono = {version = "0.4.41", features = ["serde"]} +serde_json = "1.0.140" +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} +figment = { version = "0.10.19", features = ["env"] } +tracing-core = "0.1.33" +futures = "0.3.31" +thiserror = "2.0.12" +async-stream = "0.3.6" +apalis = { version = "1.0.0-rc.1" } +apalis-sqlite = "1.0.0-rc.1" + +[dev-dependencies] +fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] } +tracing-test = "0.2.5" diff --git a/task_queue/src/error.rs b/task_queue/src/error.rs new file mode 100644 index 0000000..e6129be --- /dev/null +++ b/task_queue/src/error.rs @@ -0,0 +1,28 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("{0}")] + Exception(&'static str), + + #[error("{0}")] + Unhandled(&'static str), + + #[error(transparent)] + Io(#[from] tokio::io::Error), + + #[error(transparent)] + Sqlx(#[from] sqlx::Error), + + #[error(transparent)] + Migration(#[from] sqlx::migrate::MigrateError), + + #[error(transparent)] + ParseJson(#[from] serde_json::Error), + + #[error(transparent)] + WorkerError(#[from] apalis::prelude::WorkerError), +} + +pub type Result = std::result::Result; + diff --git a/task_queue/src/lib.rs b/task_queue/src/lib.rs new file mode 100644 index 0000000..ad9ee6e --- /dev/null +++ b/task_queue/src/lib.rs @@ -0,0 +1,9 @@ +#![allow(dead_code, unused)] + +use apalis::prelude::*; +use serde::{Deserialize, Serialize}; + +pub mod error; + +pub(crate) use error::*; +pub mod tasks; diff --git a/task_queue/src/tasks.rs b/task_queue/src/tasks.rs new file mode 100644 index 0000000..e1adb58 --- /dev/null +++ b/task_queue/src/tasks.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Task { + pub id: u32, +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::Result; + + use super::*; + use apalis::prelude::*; + use apalis_sqlite::SqliteStorage; + use fake::{Fake, Faker}; + use sqlx::SqlitePool; + use tokio_stream::StreamExt; + + async fn generate_dummy_tasks>(storage: &mut T) + where + ::Error: std::fmt::Debug, + { + storage + .push(super::Task { id: Faker.fake() }) + .await + .unwrap(); + } + + #[tokio::test] + async fn can_enqueue_tasks() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + SqliteStorage::setup(&pool).await.unwrap(); + let mut backend = SqliteStorage::new(&pool); + + generate_dummy_tasks(&mut backend).await; + + async fn process_task(task: super::Task, worker: WorkerContext) -> crate::Result<()> { + tokio::time::sleep(Duration::from_millis(100)).await; + + worker.stop()?; + Ok(()) + } + let worker = WorkerBuilder::new("rango-tango") + .backend(backend) + .build(process_task); + worker.run().await.unwrap(); + } +}