From d87843614a87c374b84541f57cd19d1b431955fb Mon Sep 17 00:00:00 2001 From: aleidk Date: Tue, 20 May 2025 16:49:41 -0400 Subject: [PATCH 1/3] wip: add test to task_manager refs: #5 --- .idea/codeStyles/Project.xml | 29 +++++ .idea/codeStyles/codeStyleConfig.xml | 2 +- lib_sync_core/src/database.rs | 4 +- lib_sync_core/src/database/sqlite.rs | 5 + lib_sync_core/src/tasks.rs | 3 +- lib_sync_core/src/tasks/jobs.rs | 3 + lib_sync_core/src/tasks/manager.rs | 155 +++++++++++++++++++++++---- 7 files changed, 177 insertions(+), 24 deletions(-) create mode 100644 .idea/codeStyles/Project.xml create mode 100644 lib_sync_core/src/tasks/jobs.rs diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml new file mode 100644 index 0000000..4e20976 --- /dev/null +++ b/.idea/codeStyles/Project.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml index a55e7a1..79ee123 100644 --- a/.idea/codeStyles/codeStyleConfig.xml +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -1,5 +1,5 @@ - \ No newline at end of file diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index 45db661..216e71d 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -68,8 +68,10 @@ impl TasksPage { pub trait TaskStorage { async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; - fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; + fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>>; + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::Result<()>; + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; } diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index 6aac53c..5c09dc4 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -108,6 +108,10 @@ impl TaskStorage for Sqlite { query.fetch(&self.pool).err_into::() } + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { + todo!() + } + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", @@ -170,6 +174,7 @@ mod tests { } #[sqlx::test(migrator = "MIGRATIONS")] + #[traced_test] async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> { let (sqlite, tasks) = setup(pool.clone(), 100); diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index a4bc96c..1079163 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -5,6 +5,7 @@ use std::fmt::Display; use tabled::Tabled; mod manager; +mod jobs; #[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] @@ -43,8 +44,6 @@ impl { } -pub type TaskJob = fn(&Task) -> TaskStatus; - #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { id: u32, diff --git a/lib_sync_core/src/tasks/jobs.rs b/lib_sync_core/src/tasks/jobs.rs new file mode 100644 index 0000000..74a8ca0 --- /dev/null +++ b/lib_sync_core/src/tasks/jobs.rs @@ -0,0 +1,3 @@ +use crate::tasks::{Task, TaskStatus}; + +pub type TaskJob = fn(&Task) -> TaskStatus; \ No newline at end of file diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs index bee3cb8..f21fb0c 100644 --- a/lib_sync_core/src/tasks/manager.rs +++ b/lib_sync_core/src/tasks/manager.rs @@ -1,32 +1,147 @@ -use futures::StreamExt; -use std::marker::PhantomData; use crate::database::TaskStorage; -use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus}; +use crate::tasks::jobs::TaskJob; +use crate::tasks::{Task, TaskPayload, TaskStatus}; +use futures::StreamExt; +use futures::stream::FuturesOrdered; +use std::marker::PhantomData; -struct TaskManager> -{ +pub enum RateLimit { + Buffer(usize), + Rate(usize), + Ticks(usize), + None, +} + +pub struct ExecuteOptions { + rate_limit: RateLimit, +} + +impl ExecuteOptions { + pub fn new() -> Self { + Self::default() + } + + pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self { + self.rate_limit = rate_limit; + self + } +} + +impl Default for ExecuteOptions { + fn default() -> Self { + Self { + rate_limit: RateLimit::None, + } + } +} + +struct TaskManager> { storage: T, + options: ExecuteOptions, _marker: PhantomData, } impl> TaskManager { - pub fn new(storage: T) -> Self { - Self { - storage, - _marker: PhantomData, - } + pub fn new(storage: T, options: ExecuteOptions) -> Self { + Self { + storage, + options, + _marker: PhantomData, + } } - + pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { let rows = self.storage.get_tasks(TaskStatus::Pending); - - let result: Vec<(Task, TaskStatus)> = rows.map(|x| { - let task = x.unwrap(); - let status = func(&task); - - (task, status) - }).collect().await; - + + let result: Vec<(Task, TaskStatus)> = rows + .map(async |x| { + let task = x.unwrap(); + let status = func(&task); + + (task, status) + }) + .collect() + .await; + Ok(()) } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::database::{TaskPagination, TasksPage}; + use fake::{Dummy, Fake, Faker}; + use futures::{Stream, StreamExt}; + use serde::{Deserialize, Serialize}; + use sqlx::Row; + use sqlx::types::Uuid; + use tracing_test::traced_test; + + #[derive(Dummy, Serialize, Deserialize, Debug)] + struct DummyTaskPayload { + key: Uuid, + _foo: String, + _bar: String, + } + + struct DummyTaskStorage {} + + impl TaskStorage for DummyTaskStorage { + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + _: I, + ) -> crate::error::Result<()> { + todo!() + } + + fn get_tasks( + &self, + task_status: TaskStatus, + ) -> impl Stream>> { + let payloads: Vec = Faker.fake(); + + let tasks = payloads.into_iter().enumerate().map(move |(i, item)| { + Ok(Task::new((i + 1).to_string(), item, task_status.clone())) + }); + + futures::stream::iter(tasks) + } + + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { + todo!() + } + async fn listen_tasks2(&self, task_status: TaskStatus) -> FuturesOrdered> + Sized> { + let mut fifo = FuturesOrdered::new(); + + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + let payload: DummyTaskPayload = Faker.fake(); + let task_status: TaskStatus = task_status.clone(); + fifo.push_back(async move { + Task::new(payload.key.to_string(), payload, task_status) + }); + } + }); + + fifo + } + + async fn get_paginated_tasks( + &self, + _: TaskPagination, + ) -> crate::error::Result> { + todo!() + } + } + + #[tokio::test] + #[traced_test] + async fn manager_runs() { + let execute_options = ExecuteOptions::new(); + let manager = TaskManager::new(DummyTaskStorage {}, execute_options); + + manager.run_tasks(|_| TaskStatus::Completed).await.unwrap(); + } +} From 2c47226dc9e5c5937ec9c0250e4fba9e87feb86f Mon Sep 17 00:00:00 2001 From: aleidk Date: Tue, 20 May 2025 16:49:41 -0400 Subject: [PATCH 2/3] wip: add test to task_manager refs: #5 --- .idea/codeStyles/codeStyleConfig.xml | 1 + Cargo.lock | 1 + lib_sync_core/Cargo.toml | 1 + lib_sync_core/src/database.rs | 29 ++++---- lib_sync_core/src/database/sqlite.rs | 5 +- lib_sync_core/src/tasks.rs | 4 +- lib_sync_core/src/tasks/bus.rs | 4 + lib_sync_core/src/tasks/manager.rs | 107 +++++++++++++++++++-------- lib_sync_core/src/tasks/worker.rs | 48 ++++++++++++ 9 files changed, 149 insertions(+), 51 deletions(-) create mode 100644 lib_sync_core/src/tasks/bus.rs create mode 100644 lib_sync_core/src/tasks/worker.rs diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml index 79ee123..6e6eec1 100644 --- a/.idea/codeStyles/codeStyleConfig.xml +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -1,5 +1,6 @@ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 43de3fc..6292eb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1066,6 +1066,7 @@ dependencies = [ "tabled", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-core", "tracing-subscriber", diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml index 0dcc4f3..be60587 100644 --- a/lib_sync_core/Cargo.toml +++ b/lib_sync_core/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] directories = "6.0.0" tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } +tokio-stream = "0.1.17" sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] } clap = { version = "4.5.37", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index 216e71d..89e5da0 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -13,12 +13,12 @@ impl TaskPagination { pub fn new() -> Self { Self::default() } - + pub fn next(&self) -> Self { - Self { + Self { offset: self.offset.saturating_add(self.limit.unwrap_or(0)), - ..self.clone() - } + ..self.clone() + } } pub fn prev(&self) -> Self { @@ -45,20 +45,17 @@ impl TaskPagination { } pub struct TasksPage { - tasks: Vec>, - page: TaskPagination + tasks: Vec>, + page: TaskPagination, } impl TasksPage { fn new(tasks: Vec>, page: TaskPagination) -> Self { - Self { - tasks, - page - } + Self { tasks, page } } pub fn next(&self) -> TaskPagination { - self.page.next() + self.page.next() } pub fn prev(&self) -> TaskPagination { @@ -67,11 +64,13 @@ impl TasksPage { } pub trait TaskStorage { - async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + tasks: I, + ) -> crate::Result<()>; fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>>; - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::Result<()>; - + fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>>; + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; } - diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index 5c09dc4..6d11c9a 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -108,10 +108,11 @@ impl TaskStorage for Sqlite { query.fetch(&self.pool).err_into::() } - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { - todo!() + fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>> { + futures::stream::empty() } + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index 1079163..f455941 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -6,6 +6,8 @@ use tabled::Tabled; mod manager; mod jobs; +mod worker; +mod bus; #[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] @@ -75,9 +77,7 @@ impl Task { pub fn payload(&self) -> &T { &self.payload } -} -impl Task { pub fn get_key(&self) -> String { self.payload_key.clone() } diff --git a/lib_sync_core/src/tasks/bus.rs b/lib_sync_core/src/tasks/bus.rs new file mode 100644 index 0000000..4e96e56 --- /dev/null +++ b/lib_sync_core/src/tasks/bus.rs @@ -0,0 +1,4 @@ +#[derive(Clone)] +pub enum Bus { + Local, +} \ No newline at end of file diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs index f21fb0c..eaaef77 100644 --- a/lib_sync_core/src/tasks/manager.rs +++ b/lib_sync_core/src/tasks/manager.rs @@ -1,9 +1,14 @@ use crate::database::TaskStorage; +use crate::tasks::bus::Bus; use crate::tasks::jobs::TaskJob; use crate::tasks::{Task, TaskPayload, TaskStatus}; use futures::StreamExt; -use futures::stream::FuturesOrdered; use std::marker::PhantomData; +use std::pin::pin; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot::Sender; +use crate::tasks::worker::TaskMessage; pub enum RateLimit { Buffer(usize), @@ -12,11 +17,12 @@ pub enum RateLimit { None, } -pub struct ExecuteOptions { +pub struct ManagerOptions { rate_limit: RateLimit, + bus: Bus, } -impl ExecuteOptions { +impl ManagerOptions { pub fn new() -> Self { Self::default() } @@ -27,22 +33,23 @@ impl ExecuteOptions { } } -impl Default for ExecuteOptions { +impl Default for ManagerOptions { fn default() -> Self { Self { rate_limit: RateLimit::None, + bus: Bus::Local, } } } struct TaskManager> { storage: T, - options: ExecuteOptions, + options: ManagerOptions, _marker: PhantomData, } impl> TaskManager { - pub fn new(storage: T, options: ExecuteOptions) -> Self { + pub fn new(storage: T, options: ManagerOptions) -> Self { Self { storage, options, @@ -50,18 +57,31 @@ impl> TaskManager { } } - pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + pub async fn run_tasks(&self, mut task_sink: TaskMessage) -> crate::Result<()> { let rows = self.storage.get_tasks(TaskStatus::Pending); + let listener = self.storage.listen_tasks(TaskStatus::Pending); - let result: Vec<(Task, TaskStatus)> = rows - .map(async |x| { - let task = x.unwrap(); - let status = func(&task); + let mut queue = pin!(rows.chain(listener)); - (task, status) - }) - .collect() - .await; + while let Some(task) = queue.next().await { + let task = match task { + Ok(task) => task, + Err(e) => { + continue + } + }; + + let sink = match task_sink.recv().await { + Some(s) => s, + None => break, // sink has stoped requesting tasks + }; + + if let Err(_) = sink.send(task) { + continue; + } + + // (task, status) + } Ok(()) } @@ -71,12 +91,17 @@ impl> TaskManager { mod tests { use super::*; use crate::database::{TaskPagination, TasksPage}; + use async_stream::stream; use fake::{Dummy, Fake, Faker}; - use futures::{Stream, StreamExt}; + use futures::Stream; use serde::{Deserialize, Serialize}; - use sqlx::Row; use sqlx::types::Uuid; + use sync::mpsc; + use tokio::sync; + use tokio_stream::wrappers::ReceiverStream; use tracing_test::traced_test; + use crate::error::Error; + use crate::tasks::worker::{Worker, WorkerManager}; #[derive(Dummy, Serialize, Deserialize, Debug)] struct DummyTaskPayload { @@ -98,7 +123,7 @@ mod tests { fn get_tasks( &self, task_status: TaskStatus, - ) -> impl Stream>> { + ) -> impl Stream>> { let payloads: Vec = Faker.fake(); let tasks = payloads.into_iter().enumerate().map(move |(i, item)| { @@ -108,24 +133,27 @@ mod tests { futures::stream::iter(tasks) } - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { - todo!() - } - async fn listen_tasks2(&self, task_status: TaskStatus) -> FuturesOrdered> + Sized> { - let mut fifo = FuturesOrdered::new(); + fn listen_tasks( + &self, + task_status: TaskStatus, + ) -> impl Stream>> { + let (tx, rx) = mpsc::channel::>>(10); tokio::spawn(async move { - loop { + for _ in 0..10 { tokio::time::sleep(std::time::Duration::from_millis(250)).await; + let payload: DummyTaskPayload = Faker.fake(); let task_status: TaskStatus = task_status.clone(); - fifo.push_back(async move { - Task::new(payload.key.to_string(), payload, task_status) - }); + let task = Ok(Task::new(payload.key.to_string(), payload, task_status)); + + if let Err(_) = tx.send(task).await { + break; + } } }); - - fifo + + ReceiverStream::new(rx) } async fn get_paginated_tasks( @@ -136,12 +164,27 @@ mod tests { } } + struct DummyWorker; + + impl Worker for DummyWorker { + fn process_job(task: &Task) -> crate::error::Result<()> { + println!("{:#?}", task); + Ok(()) + } + + async fn on_job_failure(task: &Task, error: Error) -> crate::error::Result<()> { + println!("{:#?} {:?}", task, error); + Ok(()) + } + } + #[tokio::test] #[traced_test] async fn manager_runs() { - let execute_options = ExecuteOptions::new(); - let manager = TaskManager::new(DummyTaskStorage {}, execute_options); + let execute_options = ManagerOptions::new(); + let local_worker_sink = WorkerManager::get_listener_sink::(execute_options.bus.clone()); + let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options); - manager.run_tasks(|_| TaskStatus::Completed).await.unwrap(); + task_manager.run_tasks(local_worker_sink).await.unwrap() } } diff --git a/lib_sync_core/src/tasks/worker.rs b/lib_sync_core/src/tasks/worker.rs new file mode 100644 index 0000000..7e3d917 --- /dev/null +++ b/lib_sync_core/src/tasks/worker.rs @@ -0,0 +1,48 @@ +use crate::error::Error; +use crate::tasks::bus::Bus; +use crate::tasks::{Task, TaskPayload}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot::Sender; +use tokio::sync::{mpsc, oneshot}; + +pub type TaskMessage = Receiver>>; + +pub struct WorkerManager; + +impl WorkerManager { + pub fn get_listener_sink>(bus: Bus) -> TaskMessage { + match bus { + Bus::Local => { + let (bus_tx, bus_rx) = mpsc::channel(100); + tokio::spawn(async move { + loop { + // TODO: properly catch errors + let (tx, rx) = oneshot::channel(); + + // Request a task + bus_tx.send(tx).await.unwrap(); + + // Wait for a task to be returned + let task = rx.await.unwrap(); + + W::process_job(&task).unwrap(); + } + }); + + bus_rx + } + } + } +} + +pub trait Worker { + async fn pre_process_job(task: &Task) -> crate::Result<()> { + Ok(()) + } + fn process_job(task: &Task) -> crate::Result<()>; + async fn post_process_job(task: &Task) -> crate::Result<()> { + Ok(()) + } + + async fn on_job_failure(task: &Task, error: Error) -> crate::Result<()>; +} From 040c53cebd0e4b8d01f96861949e13bb29391745 Mon Sep 17 00:00:00 2001 From: aleidk Date: Mon, 29 Dec 2025 19:45:19 -0300 Subject: [PATCH 3/3] refactor(lib_core): create new lib core with apalis use apalis library for task management --- Cargo.lock | 340 ++++++++++++++++++++++++++++++++------- Cargo.toml | 2 +- lib_sync_core/src/lib.rs | 4 +- task_queue/Cargo.toml | 26 +++ task_queue/src/error.rs | 28 ++++ task_queue/src/lib.rs | 9 ++ task_queue/src/tasks.rs | 50 ++++++ 7 files changed, 403 insertions(+), 56 deletions(-) create mode 100644 task_queue/Cargo.toml create mode 100644 task_queue/src/error.rs create mode 100644 task_queue/src/lib.rs create mode 100644 task_queue/src/tasks.rs diff --git a/Cargo.lock b/Cargo.lock index 6292eb7..3b5e94c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,84 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "apalis" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b" +dependencies = [ + "apalis-core", + "futures-util", + "pin-project", + "thiserror", + "tower", + "tracing", +] + +[[package]] +name = "apalis-codec" +version = "0.1.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637" +dependencies = [ + "apalis-core", + "serde", + "serde_json", +] + +[[package]] +name = "apalis-core" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b1557d680ee4a9b42a76ab3a9572cee1a00d45e7eb455427d906c42774766e7" +dependencies = [ + "futures-channel", + "futures-core", + "futures-sink", + "futures-timer", + "futures-util", + "pin-project", + "serde", + "thiserror", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "apalis-sql" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f" +dependencies = [ + "apalis-core", + "chrono", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "apalis-sqlite" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd43020ce13d6cb8c8d8c09657a6691d8490cd1ce0d8bc0f7fef8bf9b23cfe86" +dependencies = [ + "apalis-codec", + "apalis-core", + "apalis-sql", + "chrono", + "futures", + "log", + "pin-project", + "serde", + "serde_json", + "sqlx", + "thiserror", + "tokio", + "ulid", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -526,16 +604,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" -[[package]] -name = "errno" -version = "0.3.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" -dependencies = [ - "libc", - "windows-sys 0.59.0", -] - [[package]] name = "etcetera" version = "0.8.0" @@ -573,12 +641,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "fastrand" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" - [[package]] name = "figment" version = "0.10.19" @@ -706,6 +768,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -1107,12 +1175,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linux-raw-sys" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" - [[package]] name = "litemap" version = "0.7.5" @@ -1342,6 +1404,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1572,6 +1654,20 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.16", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rsa" version = "0.9.8" @@ -1599,16 +1695,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] -name = "rustix" -version = "1.0.7" +name = "rustls" +version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ - "bitflags", - "errno", - "libc", - "linux-raw-sys", - "windows-sys 0.59.0", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", ] [[package]] @@ -1716,6 +1833,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -1775,9 +1901,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3c3a85280daca669cfd3bcb68a337882a8bc57ec882f72c5d13a430613a738e" +checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc" dependencies = [ "sqlx-core", "sqlx-macros", @@ -1788,9 +1914,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f743f2a3cea30a58cd479013f75550e879009e3a02f616f18ca699335aa248c3" +checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" dependencies = [ "base64", "bytes", @@ -1810,6 +1936,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", + "rustls", "serde", "serde_json", "sha2", @@ -1820,13 +1947,14 @@ dependencies = [ "tracing", "url", "uuid", + "webpki-roots 0.26.11", ] [[package]] name = "sqlx-macros" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4200e0fde19834956d4252347c12a083bdcb237d7a1a1446bffd8768417dce" +checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d" dependencies = [ "proc-macro2", "quote", @@ -1837,9 +1965,9 @@ dependencies = [ [[package]] name = "sqlx-macros-core" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "882ceaa29cade31beca7129b6beeb05737f44f82dbe2a9806ecea5a7093d00b7" +checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b" dependencies = [ "dotenvy", "either", @@ -1856,16 +1984,15 @@ dependencies = [ "sqlx-postgres", "sqlx-sqlite", "syn", - "tempfile", "tokio", "url", ] [[package]] name = "sqlx-mysql" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0afdd3aa7a629683c2d750c2df343025545087081ab5942593a5288855b1b7a7" +checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", "base64", @@ -1907,9 +2034,9 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bedbe1bbb5e2615ef347a5e9d8cd7680fb63e77d9dafc0f29be15e53f1ebe6" +checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", "base64", @@ -1946,9 +2073,9 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c26083e9a520e8eb87a06b12347679b142dc2ea29e6e409f805644a7a979a5bc" +checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" dependencies = [ "atoi", "chrono", @@ -2010,6 +2137,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.13.2" @@ -2046,16 +2179,27 @@ dependencies = [ ] [[package]] -name = "tempfile" -version = "3.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +name = "task_queue" +version = "0.1.0" dependencies = [ - "fastrand", - "getrandom 0.3.2", - "once_cell", - "rustix", - "windows-sys 0.59.0", + "apalis", + "apalis-sqlite", + "async-stream", + "chrono", + "directories", + "fake", + "figment", + "futures", + "serde", + "serde_json", + "sqlx", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "tracing-core", + "tracing-subscriber", + "tracing-test", ] [[package]] @@ -2133,6 +2277,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -2160,6 +2305,48 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.41" @@ -2249,6 +2436,17 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "ulid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" +dependencies = [ + "rand 0.9.1", + "serde", + "web-time", +] + [[package]] name = "uncased" version = "0.9.10" @@ -2291,6 +2489,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.4" @@ -2443,6 +2647,34 @@ dependencies = [ name = "web" version = "0.1.0" +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.4", +] + +[[package]] +name = "webpki-roots" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "whoami" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index df1befe..14eaf11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,5 @@ resolver = "3" members = [ - "cli", "lib_sync_core", "web", + "cli", "lib_sync_core", "web", "task_queue", ] diff --git a/lib_sync_core/src/lib.rs b/lib_sync_core/src/lib.rs index 1030a9a..13a5f6a 100644 --- a/lib_sync_core/src/lib.rs +++ b/lib_sync_core/src/lib.rs @@ -1,5 +1,7 @@ +#![allow(dead_code, unused)] + pub mod error; pub(crate) use error::*; -pub mod tasks; mod database; +pub mod tasks; diff --git a/task_queue/Cargo.toml b/task_queue/Cargo.toml new file mode 100644 index 0000000..5ec2395 --- /dev/null +++ b/task_queue/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "task_queue" +version = "0.1.0" +edition = "2024" + +[dependencies] +directories = "6.0.0" +tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros", "signal"] } +tokio-stream = "0.1.17" +sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] } +serde = { version = "1.0.219", features = ["derive"] } +chrono = {version = "0.4.41", features = ["serde"]} +serde_json = "1.0.140" +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]} +figment = { version = "0.10.19", features = ["env"] } +tracing-core = "0.1.33" +futures = "0.3.31" +thiserror = "2.0.12" +async-stream = "0.3.6" +apalis = { version = "1.0.0-rc.1" } +apalis-sqlite = "1.0.0-rc.1" + +[dev-dependencies] +fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] } +tracing-test = "0.2.5" diff --git a/task_queue/src/error.rs b/task_queue/src/error.rs new file mode 100644 index 0000000..e6129be --- /dev/null +++ b/task_queue/src/error.rs @@ -0,0 +1,28 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("{0}")] + Exception(&'static str), + + #[error("{0}")] + Unhandled(&'static str), + + #[error(transparent)] + Io(#[from] tokio::io::Error), + + #[error(transparent)] + Sqlx(#[from] sqlx::Error), + + #[error(transparent)] + Migration(#[from] sqlx::migrate::MigrateError), + + #[error(transparent)] + ParseJson(#[from] serde_json::Error), + + #[error(transparent)] + WorkerError(#[from] apalis::prelude::WorkerError), +} + +pub type Result = std::result::Result; + diff --git a/task_queue/src/lib.rs b/task_queue/src/lib.rs new file mode 100644 index 0000000..ad9ee6e --- /dev/null +++ b/task_queue/src/lib.rs @@ -0,0 +1,9 @@ +#![allow(dead_code, unused)] + +use apalis::prelude::*; +use serde::{Deserialize, Serialize}; + +pub mod error; + +pub(crate) use error::*; +pub mod tasks; diff --git a/task_queue/src/tasks.rs b/task_queue/src/tasks.rs new file mode 100644 index 0000000..e1adb58 --- /dev/null +++ b/task_queue/src/tasks.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Task { + pub id: u32, +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::Result; + + use super::*; + use apalis::prelude::*; + use apalis_sqlite::SqliteStorage; + use fake::{Fake, Faker}; + use sqlx::SqlitePool; + use tokio_stream::StreamExt; + + async fn generate_dummy_tasks>(storage: &mut T) + where + ::Error: std::fmt::Debug, + { + storage + .push(super::Task { id: Faker.fake() }) + .await + .unwrap(); + } + + #[tokio::test] + async fn can_enqueue_tasks() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + SqliteStorage::setup(&pool).await.unwrap(); + let mut backend = SqliteStorage::new(&pool); + + generate_dummy_tasks(&mut backend).await; + + async fn process_task(task: super::Task, worker: WorkerContext) -> crate::Result<()> { + tokio::time::sleep(Duration::from_millis(100)).await; + + worker.stop()?; + Ok(()) + } + let worker = WorkerBuilder::new("rango-tango") + .backend(backend) + .build(process_task); + worker.run().await.unwrap(); + } +}