From 2c47226dc9e5c5937ec9c0250e4fba9e87feb86f Mon Sep 17 00:00:00 2001 From: aleidk Date: Tue, 20 May 2025 16:49:41 -0400 Subject: [PATCH] wip: add test to task_manager refs: #5 --- .idea/codeStyles/codeStyleConfig.xml | 1 + Cargo.lock | 1 + lib_sync_core/Cargo.toml | 1 + lib_sync_core/src/database.rs | 29 ++++---- lib_sync_core/src/database/sqlite.rs | 5 +- lib_sync_core/src/tasks.rs | 4 +- lib_sync_core/src/tasks/bus.rs | 4 + lib_sync_core/src/tasks/manager.rs | 107 +++++++++++++++++++-------- lib_sync_core/src/tasks/worker.rs | 48 ++++++++++++ 9 files changed, 149 insertions(+), 51 deletions(-) create mode 100644 lib_sync_core/src/tasks/bus.rs create mode 100644 lib_sync_core/src/tasks/worker.rs diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml index 79ee123..6e6eec1 100644 --- a/.idea/codeStyles/codeStyleConfig.xml +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -1,5 +1,6 @@ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 43de3fc..6292eb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1066,6 +1066,7 @@ dependencies = [ "tabled", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-core", "tracing-subscriber", diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml index 0dcc4f3..be60587 100644 --- a/lib_sync_core/Cargo.toml +++ b/lib_sync_core/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] directories = "6.0.0" tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } +tokio-stream = "0.1.17" sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] } clap = { version = "4.5.37", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index 216e71d..89e5da0 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -13,12 +13,12 @@ impl TaskPagination { pub fn new() -> Self { Self::default() } - + pub fn next(&self) -> Self { - Self { + Self { offset: self.offset.saturating_add(self.limit.unwrap_or(0)), - ..self.clone() - } + ..self.clone() + } } pub fn prev(&self) -> Self { @@ -45,20 +45,17 @@ impl TaskPagination { } pub struct TasksPage { - tasks: Vec>, - page: TaskPagination + tasks: Vec>, + page: TaskPagination, } impl TasksPage { fn new(tasks: Vec>, page: TaskPagination) -> Self { - Self { - tasks, - page - } + Self { tasks, page } } pub fn next(&self) -> TaskPagination { - self.page.next() + self.page.next() } pub fn prev(&self) -> TaskPagination { @@ -67,11 +64,13 @@ impl TasksPage { } pub trait TaskStorage { - async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + tasks: I, + ) -> crate::Result<()>; fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>>; - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::Result<()>; - + fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>>; + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; } - diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index 5c09dc4..6d11c9a 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -108,10 +108,11 @@ impl TaskStorage for Sqlite { query.fetch(&self.pool).err_into::() } - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { - todo!() + fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>> { + futures::stream::empty() } + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index 1079163..f455941 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -6,6 +6,8 @@ use tabled::Tabled; mod manager; mod jobs; +mod worker; +mod bus; #[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] @@ -75,9 +77,7 @@ impl Task { pub fn payload(&self) -> &T { &self.payload } -} -impl Task { pub fn get_key(&self) -> String { self.payload_key.clone() } diff --git a/lib_sync_core/src/tasks/bus.rs b/lib_sync_core/src/tasks/bus.rs new file mode 100644 index 0000000..4e96e56 --- /dev/null +++ b/lib_sync_core/src/tasks/bus.rs @@ -0,0 +1,4 @@ +#[derive(Clone)] +pub enum Bus { + Local, +} \ No newline at end of file diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs index f21fb0c..eaaef77 100644 --- a/lib_sync_core/src/tasks/manager.rs +++ b/lib_sync_core/src/tasks/manager.rs @@ -1,9 +1,14 @@ use crate::database::TaskStorage; +use crate::tasks::bus::Bus; use crate::tasks::jobs::TaskJob; use crate::tasks::{Task, TaskPayload, TaskStatus}; use futures::StreamExt; -use futures::stream::FuturesOrdered; use std::marker::PhantomData; +use std::pin::pin; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot::Sender; +use crate::tasks::worker::TaskMessage; pub enum RateLimit { Buffer(usize), @@ -12,11 +17,12 @@ pub enum RateLimit { None, } -pub struct ExecuteOptions { +pub struct ManagerOptions { rate_limit: RateLimit, + bus: Bus, } -impl ExecuteOptions { +impl ManagerOptions { pub fn new() -> Self { Self::default() } @@ -27,22 +33,23 @@ impl ExecuteOptions { } } -impl Default for ExecuteOptions { +impl Default for ManagerOptions { fn default() -> Self { Self { rate_limit: RateLimit::None, + bus: Bus::Local, } } } struct TaskManager> { storage: T, - options: ExecuteOptions, + options: ManagerOptions, _marker: PhantomData, } impl> TaskManager { - pub fn new(storage: T, options: ExecuteOptions) -> Self { + pub fn new(storage: T, options: ManagerOptions) -> Self { Self { storage, options, @@ -50,18 +57,31 @@ impl> TaskManager { } } - pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + pub async fn run_tasks(&self, mut task_sink: TaskMessage) -> crate::Result<()> { let rows = self.storage.get_tasks(TaskStatus::Pending); + let listener = self.storage.listen_tasks(TaskStatus::Pending); - let result: Vec<(Task, TaskStatus)> = rows - .map(async |x| { - let task = x.unwrap(); - let status = func(&task); + let mut queue = pin!(rows.chain(listener)); - (task, status) - }) - .collect() - .await; + while let Some(task) = queue.next().await { + let task = match task { + Ok(task) => task, + Err(e) => { + continue + } + }; + + let sink = match task_sink.recv().await { + Some(s) => s, + None => break, // sink has stoped requesting tasks + }; + + if let Err(_) = sink.send(task) { + continue; + } + + // (task, status) + } Ok(()) } @@ -71,12 +91,17 @@ impl> TaskManager { mod tests { use super::*; use crate::database::{TaskPagination, TasksPage}; + use async_stream::stream; use fake::{Dummy, Fake, Faker}; - use futures::{Stream, StreamExt}; + use futures::Stream; use serde::{Deserialize, Serialize}; - use sqlx::Row; use sqlx::types::Uuid; + use sync::mpsc; + use tokio::sync; + use tokio_stream::wrappers::ReceiverStream; use tracing_test::traced_test; + use crate::error::Error; + use crate::tasks::worker::{Worker, WorkerManager}; #[derive(Dummy, Serialize, Deserialize, Debug)] struct DummyTaskPayload { @@ -98,7 +123,7 @@ mod tests { fn get_tasks( &self, task_status: TaskStatus, - ) -> impl Stream>> { + ) -> impl Stream>> { let payloads: Vec = Faker.fake(); let tasks = payloads.into_iter().enumerate().map(move |(i, item)| { @@ -108,24 +133,27 @@ mod tests { futures::stream::iter(tasks) } - async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { - todo!() - } - async fn listen_tasks2(&self, task_status: TaskStatus) -> FuturesOrdered> + Sized> { - let mut fifo = FuturesOrdered::new(); + fn listen_tasks( + &self, + task_status: TaskStatus, + ) -> impl Stream>> { + let (tx, rx) = mpsc::channel::>>(10); tokio::spawn(async move { - loop { + for _ in 0..10 { tokio::time::sleep(std::time::Duration::from_millis(250)).await; + let payload: DummyTaskPayload = Faker.fake(); let task_status: TaskStatus = task_status.clone(); - fifo.push_back(async move { - Task::new(payload.key.to_string(), payload, task_status) - }); + let task = Ok(Task::new(payload.key.to_string(), payload, task_status)); + + if let Err(_) = tx.send(task).await { + break; + } } }); - - fifo + + ReceiverStream::new(rx) } async fn get_paginated_tasks( @@ -136,12 +164,27 @@ mod tests { } } + struct DummyWorker; + + impl Worker for DummyWorker { + fn process_job(task: &Task) -> crate::error::Result<()> { + println!("{:#?}", task); + Ok(()) + } + + async fn on_job_failure(task: &Task, error: Error) -> crate::error::Result<()> { + println!("{:#?} {:?}", task, error); + Ok(()) + } + } + #[tokio::test] #[traced_test] async fn manager_runs() { - let execute_options = ExecuteOptions::new(); - let manager = TaskManager::new(DummyTaskStorage {}, execute_options); + let execute_options = ManagerOptions::new(); + let local_worker_sink = WorkerManager::get_listener_sink::(execute_options.bus.clone()); + let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options); - manager.run_tasks(|_| TaskStatus::Completed).await.unwrap(); + task_manager.run_tasks(local_worker_sink).await.unwrap() } } diff --git a/lib_sync_core/src/tasks/worker.rs b/lib_sync_core/src/tasks/worker.rs new file mode 100644 index 0000000..7e3d917 --- /dev/null +++ b/lib_sync_core/src/tasks/worker.rs @@ -0,0 +1,48 @@ +use crate::error::Error; +use crate::tasks::bus::Bus; +use crate::tasks::{Task, TaskPayload}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot::Sender; +use tokio::sync::{mpsc, oneshot}; + +pub type TaskMessage = Receiver>>; + +pub struct WorkerManager; + +impl WorkerManager { + pub fn get_listener_sink>(bus: Bus) -> TaskMessage { + match bus { + Bus::Local => { + let (bus_tx, bus_rx) = mpsc::channel(100); + tokio::spawn(async move { + loop { + // TODO: properly catch errors + let (tx, rx) = oneshot::channel(); + + // Request a task + bus_tx.send(tx).await.unwrap(); + + // Wait for a task to be returned + let task = rx.await.unwrap(); + + W::process_job(&task).unwrap(); + } + }); + + bus_rx + } + } + } +} + +pub trait Worker { + async fn pre_process_job(task: &Task) -> crate::Result<()> { + Ok(()) + } + fn process_job(task: &Task) -> crate::Result<()>; + async fn post_process_job(task: &Task) -> crate::Result<()> { + Ok(()) + } + + async fn on_job_failure(task: &Task, error: Error) -> crate::Result<()>; +}