diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml
new file mode 100644
index 0000000..4e20976
--- /dev/null
+++ b/.idea/codeStyles/Project.xml
@@ -0,0 +1,29 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
index a55e7a1..6e6eec1 100644
--- a/.idea/codeStyles/codeStyleConfig.xml
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -1,5 +1,6 @@
+
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 43de3fc..3b5e94c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -110,6 +110,84 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "apalis"
+version = "1.0.0-rc.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b"
+dependencies = [
+ "apalis-core",
+ "futures-util",
+ "pin-project",
+ "thiserror",
+ "tower",
+ "tracing",
+]
+
+[[package]]
+name = "apalis-codec"
+version = "0.1.0-rc.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637"
+dependencies = [
+ "apalis-core",
+ "serde",
+ "serde_json",
+]
+
+[[package]]
+name = "apalis-core"
+version = "1.0.0-rc.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6b1557d680ee4a9b42a76ab3a9572cee1a00d45e7eb455427d906c42774766e7"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-sink",
+ "futures-timer",
+ "futures-util",
+ "pin-project",
+ "serde",
+ "thiserror",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "apalis-sql"
+version = "1.0.0-rc.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f"
+dependencies = [
+ "apalis-core",
+ "chrono",
+ "serde",
+ "serde_json",
+ "thiserror",
+]
+
+[[package]]
+name = "apalis-sqlite"
+version = "1.0.0-rc.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fd43020ce13d6cb8c8d8c09657a6691d8490cd1ce0d8bc0f7fef8bf9b23cfe86"
+dependencies = [
+ "apalis-codec",
+ "apalis-core",
+ "apalis-sql",
+ "chrono",
+ "futures",
+ "log",
+ "pin-project",
+ "serde",
+ "serde_json",
+ "sqlx",
+ "thiserror",
+ "tokio",
+ "ulid",
+]
+
[[package]]
name = "async-stream"
version = "0.3.6"
@@ -526,16 +604,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
-[[package]]
-name = "errno"
-version = "0.3.11"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e"
-dependencies = [
- "libc",
- "windows-sys 0.59.0",
-]
-
[[package]]
name = "etcetera"
version = "0.8.0"
@@ -573,12 +641,6 @@ dependencies = [
"uuid",
]
-[[package]]
-name = "fastrand"
-version = "2.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
-
[[package]]
name = "figment"
version = "0.10.19"
@@ -706,6 +768,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
+[[package]]
+name = "futures-timer"
+version = "3.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
+
[[package]]
name = "futures-util"
version = "0.3.31"
@@ -1066,6 +1134,7 @@ dependencies = [
"tabled",
"thiserror",
"tokio",
+ "tokio-stream",
"tracing",
"tracing-core",
"tracing-subscriber",
@@ -1106,12 +1175,6 @@ dependencies = [
"vcpkg",
]
-[[package]]
-name = "linux-raw-sys"
-version = "0.9.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
-
[[package]]
name = "litemap"
version = "0.7.5"
@@ -1341,6 +1404,26 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
+[[package]]
+name = "pin-project"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "pin-project-lite"
version = "0.2.16"
@@ -1571,6 +1654,20 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
+[[package]]
+name = "ring"
+version = "0.17.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "getrandom 0.2.16",
+ "libc",
+ "untrusted",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "rsa"
version = "0.9.8"
@@ -1598,16 +1695,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
-name = "rustix"
-version = "1.0.7"
+name = "rustls"
+version = "0.23.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
+checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f"
dependencies = [
- "bitflags",
- "errno",
- "libc",
- "linux-raw-sys",
- "windows-sys 0.59.0",
+ "once_cell",
+ "ring",
+ "rustls-pki-types",
+ "rustls-webpki",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-pki-types"
+version = "1.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282"
+dependencies = [
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-webpki"
+version = "0.103.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52"
+dependencies = [
+ "ring",
+ "rustls-pki-types",
+ "untrusted",
]
[[package]]
@@ -1715,6 +1833,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
+[[package]]
+name = "signal-hook-registry"
+version = "1.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "signature"
version = "2.2.0"
@@ -1774,9 +1901,9 @@ dependencies = [
[[package]]
name = "sqlx"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f3c3a85280daca669cfd3bcb68a337882a8bc57ec882f72c5d13a430613a738e"
+checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc"
dependencies = [
"sqlx-core",
"sqlx-macros",
@@ -1787,9 +1914,9 @@ dependencies = [
[[package]]
name = "sqlx-core"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f743f2a3cea30a58cd479013f75550e879009e3a02f616f18ca699335aa248c3"
+checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6"
dependencies = [
"base64",
"bytes",
@@ -1809,6 +1936,7 @@ dependencies = [
"memchr",
"once_cell",
"percent-encoding",
+ "rustls",
"serde",
"serde_json",
"sha2",
@@ -1819,13 +1947,14 @@ dependencies = [
"tracing",
"url",
"uuid",
+ "webpki-roots 0.26.11",
]
[[package]]
name = "sqlx-macros"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f4200e0fde19834956d4252347c12a083bdcb237d7a1a1446bffd8768417dce"
+checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d"
dependencies = [
"proc-macro2",
"quote",
@@ -1836,9 +1965,9 @@ dependencies = [
[[package]]
name = "sqlx-macros-core"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "882ceaa29cade31beca7129b6beeb05737f44f82dbe2a9806ecea5a7093d00b7"
+checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b"
dependencies = [
"dotenvy",
"either",
@@ -1855,16 +1984,15 @@ dependencies = [
"sqlx-postgres",
"sqlx-sqlite",
"syn",
- "tempfile",
"tokio",
"url",
]
[[package]]
name = "sqlx-mysql"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0afdd3aa7a629683c2d750c2df343025545087081ab5942593a5288855b1b7a7"
+checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526"
dependencies = [
"atoi",
"base64",
@@ -1906,9 +2034,9 @@ dependencies = [
[[package]]
name = "sqlx-postgres"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a0bedbe1bbb5e2615ef347a5e9d8cd7680fb63e77d9dafc0f29be15e53f1ebe6"
+checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46"
dependencies = [
"atoi",
"base64",
@@ -1945,9 +2073,9 @@ dependencies = [
[[package]]
name = "sqlx-sqlite"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c26083e9a520e8eb87a06b12347679b142dc2ea29e6e409f805644a7a979a5bc"
+checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
dependencies = [
"atoi",
"chrono",
@@ -2009,6 +2137,12 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "sync_wrapper"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
+
[[package]]
name = "synstructure"
version = "0.13.2"
@@ -2045,16 +2179,27 @@ dependencies = [
]
[[package]]
-name = "tempfile"
-version = "3.19.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
+name = "task_queue"
+version = "0.1.0"
dependencies = [
- "fastrand",
- "getrandom 0.3.2",
- "once_cell",
- "rustix",
- "windows-sys 0.59.0",
+ "apalis",
+ "apalis-sqlite",
+ "async-stream",
+ "chrono",
+ "directories",
+ "fake",
+ "figment",
+ "futures",
+ "serde",
+ "serde_json",
+ "sqlx",
+ "thiserror",
+ "tokio",
+ "tokio-stream",
+ "tracing",
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test",
]
[[package]]
@@ -2132,6 +2277,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
+ "signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
@@ -2159,6 +2305,48 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "tokio-util"
+version = "0.7.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+]
+
+[[package]]
+name = "tower"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "pin-project-lite",
+ "sync_wrapper",
+ "tokio",
+ "tokio-util",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tower-layer"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
+
+[[package]]
+name = "tower-service"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
+
[[package]]
name = "tracing"
version = "0.1.41"
@@ -2248,6 +2436,17 @@ version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
+[[package]]
+name = "ulid"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe"
+dependencies = [
+ "rand 0.9.1",
+ "serde",
+ "web-time",
+]
+
[[package]]
name = "uncased"
version = "0.9.10"
@@ -2290,6 +2489,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
+[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
[[package]]
name = "url"
version = "2.5.4"
@@ -2442,6 +2647,34 @@ dependencies = [
name = "web"
version = "0.1.0"
+[[package]]
+name = "web-time"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "webpki-roots"
+version = "0.26.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9"
+dependencies = [
+ "webpki-roots 1.0.4",
+]
+
+[[package]]
+name = "webpki-roots"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e"
+dependencies = [
+ "rustls-pki-types",
+]
+
[[package]]
name = "whoami"
version = "1.6.0"
diff --git a/Cargo.toml b/Cargo.toml
index df1befe..14eaf11 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,5 +2,5 @@
resolver = "3"
members = [
- "cli", "lib_sync_core", "web",
+ "cli", "lib_sync_core", "web", "task_queue",
]
diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml
index 0dcc4f3..be60587 100644
--- a/lib_sync_core/Cargo.toml
+++ b/lib_sync_core/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2024"
[dependencies]
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
+tokio-stream = "0.1.17"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs
index 45db661..89e5da0 100644
--- a/lib_sync_core/src/database.rs
+++ b/lib_sync_core/src/database.rs
@@ -13,12 +13,12 @@ impl TaskPagination {
pub fn new() -> Self {
Self::default()
}
-
+
pub fn next(&self) -> Self {
- Self {
+ Self {
offset: self.offset.saturating_add(self.limit.unwrap_or(0)),
- ..self.clone()
- }
+ ..self.clone()
+ }
}
pub fn prev(&self) -> Self {
@@ -45,20 +45,17 @@ impl TaskPagination {
}
pub struct TasksPage {
- tasks: Vec>,
- page: TaskPagination
+ tasks: Vec>,
+ page: TaskPagination,
}
impl TasksPage {
fn new(tasks: Vec>, page: TaskPagination) -> Self {
- Self {
- tasks,
- page
- }
+ Self { tasks, page }
}
pub fn next(&self) -> TaskPagination {
- self.page.next()
+ self.page.next()
}
pub fn prev(&self) -> TaskPagination {
@@ -67,9 +64,13 @@ impl TasksPage {
}
pub trait TaskStorage {
- async fn insert_tasks<'a, I: IntoIterator- >>(&self, tasks: I) -> crate::Result<()>;
- fn get_tasks(&self, options: TaskStatus) -> impl Stream
- >>;
+ async fn insert_tasks<'a, I: IntoIterator
- >>(
+ &self,
+ tasks: I,
+ ) -> crate::Result<()>;
+ fn get_tasks(&self, task_status: TaskStatus) -> impl Stream
- >>;
+
+ fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream
- >>;
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>;
}
-
diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs
index 6aac53c..6d11c9a 100644
--- a/lib_sync_core/src/database/sqlite.rs
+++ b/lib_sync_core/src/database/sqlite.rs
@@ -108,6 +108,11 @@ impl TaskStorage for Sqlite {
query.fetch(&self.pool).err_into::()
}
+ fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream
- >> {
+ futures::stream::empty()
+ }
+
+
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> {
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
@@ -170,6 +175,7 @@ mod tests {
}
#[sqlx::test(migrator = "MIGRATIONS")]
+ #[traced_test]
async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
let (sqlite, tasks) = setup(pool.clone(), 100);
diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs
index 1030a9a..13a5f6a 100644
--- a/lib_sync_core/src/lib.rs
+++ b/lib_sync_core/src/lib.rs
@@ -1,5 +1,7 @@
+#![allow(dead_code, unused)]
+
pub mod error;
pub(crate) use error::*;
-pub mod tasks;
mod database;
+pub mod tasks;
diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs
index a4bc96c..f455941 100644
--- a/lib_sync_core/src/tasks.rs
+++ b/lib_sync_core/src/tasks.rs
@@ -5,6 +5,9 @@ use std::fmt::Display;
use tabled::Tabled;
mod manager;
+mod jobs;
+mod worker;
+mod bus;
#[derive(sqlx::Type, Debug, Clone)]
#[repr(u8)]
@@ -43,8 +46,6 @@ impl
{
}
-pub type TaskJob = fn(&Task) -> TaskStatus;
-
#[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task {
id: u32,
@@ -76,9 +77,7 @@ impl Task {
pub fn payload(&self) -> &T {
&self.payload
}
-}
-impl Task {
pub fn get_key(&self) -> String {
self.payload_key.clone()
}
diff --git a/lib_sync_core/src/tasks/bus.rs b/lib_sync_core/src/tasks/bus.rs
new file mode 100644
index 0000000..4e96e56
--- /dev/null
+++ b/lib_sync_core/src/tasks/bus.rs
@@ -0,0 +1,4 @@
+#[derive(Clone)]
+pub enum Bus {
+ Local,
+}
\ No newline at end of file
diff --git a/lib_sync_core/src/tasks/jobs.rs b/lib_sync_core/src/tasks/jobs.rs
new file mode 100644
index 0000000..74a8ca0
--- /dev/null
+++ b/lib_sync_core/src/tasks/jobs.rs
@@ -0,0 +1,3 @@
+use crate::tasks::{Task, TaskStatus};
+
+pub type TaskJob = fn(&Task) -> TaskStatus;
\ No newline at end of file
diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs
index bee3cb8..eaaef77 100644
--- a/lib_sync_core/src/tasks/manager.rs
+++ b/lib_sync_core/src/tasks/manager.rs
@@ -1,32 +1,190 @@
+use crate::database::TaskStorage;
+use crate::tasks::bus::Bus;
+use crate::tasks::jobs::TaskJob;
+use crate::tasks::{Task, TaskPayload, TaskStatus};
use futures::StreamExt;
use std::marker::PhantomData;
-use crate::database::TaskStorage;
-use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus};
+use std::pin::pin;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::{mpsc, oneshot};
+use tokio::sync::oneshot::Sender;
+use crate::tasks::worker::TaskMessage;
-struct TaskManager>
-{
+pub enum RateLimit {
+ Buffer(usize),
+ Rate(usize),
+ Ticks(usize),
+ None,
+}
+
+pub struct ManagerOptions {
+ rate_limit: RateLimit,
+ bus: Bus,
+}
+
+impl ManagerOptions {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self {
+ self.rate_limit = rate_limit;
+ self
+ }
+}
+
+impl Default for ManagerOptions {
+ fn default() -> Self {
+ Self {
+ rate_limit: RateLimit::None,
+ bus: Bus::Local,
+ }
+ }
+}
+
+struct TaskManager> {
storage: T,
+ options: ManagerOptions,
_marker: PhantomData
,
}
impl> TaskManager {
- pub fn new(storage: T) -> Self {
- Self {
- storage,
- _marker: PhantomData,
- }
+ pub fn new(storage: T, options: ManagerOptions) -> Self {
+ Self {
+ storage,
+ options,
+ _marker: PhantomData,
+ }
}
-
- pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> {
+
+ pub async fn run_tasks(&self, mut task_sink: TaskMessage) -> crate::Result<()> {
let rows = self.storage.get_tasks(TaskStatus::Pending);
-
- let result: Vec<(Task, TaskStatus)> = rows.map(|x| {
- let task = x.unwrap();
- let status = func(&task);
-
- (task, status)
- }).collect().await;
-
+ let listener = self.storage.listen_tasks(TaskStatus::Pending);
+
+ let mut queue = pin!(rows.chain(listener));
+
+ while let Some(task) = queue.next().await {
+ let task = match task {
+ Ok(task) => task,
+ Err(e) => {
+ continue
+ }
+ };
+
+ let sink = match task_sink.recv().await {
+ Some(s) => s,
+ None => break, // sink has stoped requesting tasks
+ };
+
+ if let Err(_) = sink.send(task) {
+ continue;
+ }
+
+ // (task, status)
+ }
+
Ok(())
}
-}
\ No newline at end of file
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::database::{TaskPagination, TasksPage};
+ use async_stream::stream;
+ use fake::{Dummy, Fake, Faker};
+ use futures::Stream;
+ use serde::{Deserialize, Serialize};
+ use sqlx::types::Uuid;
+ use sync::mpsc;
+ use tokio::sync;
+ use tokio_stream::wrappers::ReceiverStream;
+ use tracing_test::traced_test;
+ use crate::error::Error;
+ use crate::tasks::worker::{Worker, WorkerManager};
+
+ #[derive(Dummy, Serialize, Deserialize, Debug)]
+ struct DummyTaskPayload {
+ key: Uuid,
+ _foo: String,
+ _bar: String,
+ }
+
+ struct DummyTaskStorage {}
+
+ impl TaskStorage for DummyTaskStorage {
+ async fn insert_tasks<'a, I: IntoIterator- >>(
+ &self,
+ _: I,
+ ) -> crate::error::Result<()> {
+ todo!()
+ }
+
+ fn get_tasks(
+ &self,
+ task_status: TaskStatus,
+ ) -> impl Stream
- >> {
+ let payloads: Vec = Faker.fake();
+
+ let tasks = payloads.into_iter().enumerate().map(move |(i, item)| {
+ Ok(Task::new((i + 1).to_string(), item, task_status.clone()))
+ });
+
+ futures::stream::iter(tasks)
+ }
+
+ fn listen_tasks(
+ &self,
+ task_status: TaskStatus,
+ ) -> impl Stream
- >> {
+ let (tx, rx) = mpsc::channel::>>(10);
+
+ tokio::spawn(async move {
+ for _ in 0..10 {
+ tokio::time::sleep(std::time::Duration::from_millis(250)).await;
+
+ let payload: DummyTaskPayload = Faker.fake();
+ let task_status: TaskStatus = task_status.clone();
+ let task = Ok(Task::new(payload.key.to_string(), payload, task_status));
+
+ if let Err(_) = tx.send(task).await {
+ break;
+ }
+ }
+ });
+
+ ReceiverStream::new(rx)
+ }
+
+ async fn get_paginated_tasks(
+ &self,
+ _: TaskPagination,
+ ) -> crate::error::Result> {
+ todo!()
+ }
+ }
+
+ struct DummyWorker;
+
+ impl Worker for DummyWorker {
+ fn process_job(task: &Task) -> crate::error::Result<()> {
+ println!("{:#?}", task);
+ Ok(())
+ }
+
+ async fn on_job_failure(task: &Task, error: Error) -> crate::error::Result<()> {
+ println!("{:#?} {:?}", task, error);
+ Ok(())
+ }
+ }
+
+ #[tokio::test]
+ #[traced_test]
+ async fn manager_runs() {
+ let execute_options = ManagerOptions::new();
+ let local_worker_sink = WorkerManager::get_listener_sink::(execute_options.bus.clone());
+ let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options);
+
+ task_manager.run_tasks(local_worker_sink).await.unwrap()
+ }
+}
diff --git a/lib_sync_core/src/tasks/worker.rs b/lib_sync_core/src/tasks/worker.rs
new file mode 100644
index 0000000..7e3d917
--- /dev/null
+++ b/lib_sync_core/src/tasks/worker.rs
@@ -0,0 +1,48 @@
+use crate::error::Error;
+use crate::tasks::bus::Bus;
+use crate::tasks::{Task, TaskPayload};
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::oneshot::Sender;
+use tokio::sync::{mpsc, oneshot};
+
+pub type TaskMessage = Receiver>>;
+
+pub struct WorkerManager;
+
+impl WorkerManager {
+ pub fn get_listener_sink>(bus: Bus) -> TaskMessage {
+ match bus {
+ Bus::Local => {
+ let (bus_tx, bus_rx) = mpsc::channel(100);
+ tokio::spawn(async move {
+ loop {
+ // TODO: properly catch errors
+ let (tx, rx) = oneshot::channel();
+
+ // Request a task
+ bus_tx.send(tx).await.unwrap();
+
+ // Wait for a task to be returned
+ let task = rx.await.unwrap();
+
+ W::process_job(&task).unwrap();
+ }
+ });
+
+ bus_rx
+ }
+ }
+ }
+}
+
+pub trait Worker {
+ async fn pre_process_job(task: &Task) -> crate::Result<()> {
+ Ok(())
+ }
+ fn process_job(task: &Task) -> crate::Result<()>;
+ async fn post_process_job(task: &Task) -> crate::Result<()> {
+ Ok(())
+ }
+
+ async fn on_job_failure(task: &Task, error: Error) -> crate::Result<()>;
+}
diff --git a/task_queue/Cargo.toml b/task_queue/Cargo.toml
new file mode 100644
index 0000000..5ec2395
--- /dev/null
+++ b/task_queue/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "task_queue"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+directories = "6.0.0"
+tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros", "signal"] }
+tokio-stream = "0.1.17"
+sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
+serde = { version = "1.0.219", features = ["derive"] }
+chrono = {version = "0.4.41", features = ["serde"]}
+serde_json = "1.0.140"
+tracing = "0.1.41"
+tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
+figment = { version = "0.10.19", features = ["env"] }
+tracing-core = "0.1.33"
+futures = "0.3.31"
+thiserror = "2.0.12"
+async-stream = "0.3.6"
+apalis = { version = "1.0.0-rc.1" }
+apalis-sqlite = "1.0.0-rc.1"
+
+[dev-dependencies]
+fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
+tracing-test = "0.2.5"
diff --git a/task_queue/src/error.rs b/task_queue/src/error.rs
new file mode 100644
index 0000000..e6129be
--- /dev/null
+++ b/task_queue/src/error.rs
@@ -0,0 +1,28 @@
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum Error {
+ #[error("{0}")]
+ Exception(&'static str),
+
+ #[error("{0}")]
+ Unhandled(&'static str),
+
+ #[error(transparent)]
+ Io(#[from] tokio::io::Error),
+
+ #[error(transparent)]
+ Sqlx(#[from] sqlx::Error),
+
+ #[error(transparent)]
+ Migration(#[from] sqlx::migrate::MigrateError),
+
+ #[error(transparent)]
+ ParseJson(#[from] serde_json::Error),
+
+ #[error(transparent)]
+ WorkerError(#[from] apalis::prelude::WorkerError),
+}
+
+pub type Result = std::result::Result;
+
diff --git a/task_queue/src/lib.rs b/task_queue/src/lib.rs
new file mode 100644
index 0000000..ad9ee6e
--- /dev/null
+++ b/task_queue/src/lib.rs
@@ -0,0 +1,9 @@
+#![allow(dead_code, unused)]
+
+use apalis::prelude::*;
+use serde::{Deserialize, Serialize};
+
+pub mod error;
+
+pub(crate) use error::*;
+pub mod tasks;
diff --git a/task_queue/src/tasks.rs b/task_queue/src/tasks.rs
new file mode 100644
index 0000000..e1adb58
--- /dev/null
+++ b/task_queue/src/tasks.rs
@@ -0,0 +1,50 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Task {
+ pub id: u32,
+}
+
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use crate::Result;
+
+ use super::*;
+ use apalis::prelude::*;
+ use apalis_sqlite::SqliteStorage;
+ use fake::{Fake, Faker};
+ use sqlx::SqlitePool;
+ use tokio_stream::StreamExt;
+
+ async fn generate_dummy_tasks>(storage: &mut T)
+ where
+ ::Error: std::fmt::Debug,
+ {
+ storage
+ .push(super::Task { id: Faker.fake() })
+ .await
+ .unwrap();
+ }
+
+ #[tokio::test]
+ async fn can_enqueue_tasks() {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ SqliteStorage::setup(&pool).await.unwrap();
+ let mut backend = SqliteStorage::new(&pool);
+
+ generate_dummy_tasks(&mut backend).await;
+
+ async fn process_task(task: super::Task, worker: WorkerContext) -> crate::Result<()> {
+ tokio::time::sleep(Duration::from_millis(100)).await;
+
+ worker.stop()?;
+ Ok(())
+ }
+ let worker = WorkerBuilder::new("rango-tango")
+ .backend(backend)
+ .build(process_task);
+ worker.run().await.unwrap();
+ }
+}