diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml new file mode 100644 index 0000000..4e20976 --- /dev/null +++ b/.idea/codeStyles/Project.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml index a55e7a1..6e6eec1 100644 --- a/.idea/codeStyles/codeStyleConfig.xml +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -1,5 +1,6 @@ + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 43de3fc..6292eb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1066,6 +1066,7 @@ dependencies = [ "tabled", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-core", "tracing-subscriber", diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml index 0dcc4f3..be60587 100644 --- a/lib_sync_core/Cargo.toml +++ b/lib_sync_core/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] directories = "6.0.0" tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] } +tokio-stream = "0.1.17" sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] } clap = { version = "4.5.37", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index 45db661..89e5da0 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -13,12 +13,12 @@ impl TaskPagination { pub fn new() -> Self { Self::default() } - + pub fn next(&self) -> Self { - Self { + Self { offset: self.offset.saturating_add(self.limit.unwrap_or(0)), - ..self.clone() - } + ..self.clone() + } } pub fn prev(&self) -> Self { @@ -45,20 +45,17 @@ impl TaskPagination { } pub struct TasksPage { - tasks: Vec>, - page: TaskPagination + tasks: Vec>, + page: TaskPagination, } impl TasksPage { fn new(tasks: Vec>, page: TaskPagination) -> Self { - Self { - tasks, - page - } + Self { tasks, page } } pub fn next(&self) -> TaskPagination { - self.page.next() + self.page.next() } pub fn prev(&self) -> TaskPagination { @@ -67,9 +64,13 @@ impl TasksPage { } pub trait TaskStorage { - async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; - fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + tasks: I, + ) -> crate::Result<()>; + fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>>; + + fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>>; async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; } - diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index 6aac53c..6d11c9a 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -108,6 +108,11 @@ impl TaskStorage for Sqlite { query.fetch(&self.pool).err_into::() } + fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream>> { + futures::stream::empty() + } + + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", @@ -170,6 +175,7 @@ mod tests { } #[sqlx::test(migrator = "MIGRATIONS")] + #[traced_test] async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> { let (sqlite, tasks) = setup(pool.clone(), 100); diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index a4bc96c..f455941 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -5,6 +5,9 @@ use std::fmt::Display; use tabled::Tabled; mod manager; +mod jobs; +mod worker; +mod bus; #[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] @@ -43,8 +46,6 @@ impl { } -pub type TaskJob = fn(&Task) -> TaskStatus; - #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { id: u32, @@ -76,9 +77,7 @@ impl Task { pub fn payload(&self) -> &T { &self.payload } -} -impl Task { pub fn get_key(&self) -> String { self.payload_key.clone() } diff --git a/lib_sync_core/src/tasks/bus.rs b/lib_sync_core/src/tasks/bus.rs new file mode 100644 index 0000000..4e96e56 --- /dev/null +++ b/lib_sync_core/src/tasks/bus.rs @@ -0,0 +1,4 @@ +#[derive(Clone)] +pub enum Bus { + Local, +} \ No newline at end of file diff --git a/lib_sync_core/src/tasks/jobs.rs b/lib_sync_core/src/tasks/jobs.rs new file mode 100644 index 0000000..74a8ca0 --- /dev/null +++ b/lib_sync_core/src/tasks/jobs.rs @@ -0,0 +1,3 @@ +use crate::tasks::{Task, TaskStatus}; + +pub type TaskJob = fn(&Task) -> TaskStatus; \ No newline at end of file diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs index bee3cb8..eaaef77 100644 --- a/lib_sync_core/src/tasks/manager.rs +++ b/lib_sync_core/src/tasks/manager.rs @@ -1,32 +1,190 @@ +use crate::database::TaskStorage; +use crate::tasks::bus::Bus; +use crate::tasks::jobs::TaskJob; +use crate::tasks::{Task, TaskPayload, TaskStatus}; use futures::StreamExt; use std::marker::PhantomData; -use crate::database::TaskStorage; -use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus}; +use std::pin::pin; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot::Sender; +use crate::tasks::worker::TaskMessage; -struct TaskManager> -{ +pub enum RateLimit { + Buffer(usize), + Rate(usize), + Ticks(usize), + None, +} + +pub struct ManagerOptions { + rate_limit: RateLimit, + bus: Bus, +} + +impl ManagerOptions { + pub fn new() -> Self { + Self::default() + } + + pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self { + self.rate_limit = rate_limit; + self + } +} + +impl Default for ManagerOptions { + fn default() -> Self { + Self { + rate_limit: RateLimit::None, + bus: Bus::Local, + } + } +} + +struct TaskManager> { storage: T, + options: ManagerOptions, _marker: PhantomData, } impl> TaskManager { - pub fn new(storage: T) -> Self { - Self { - storage, - _marker: PhantomData, - } + pub fn new(storage: T, options: ManagerOptions) -> Self { + Self { + storage, + options, + _marker: PhantomData, + } } - - pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { + + pub async fn run_tasks(&self, mut task_sink: TaskMessage) -> crate::Result<()> { let rows = self.storage.get_tasks(TaskStatus::Pending); - - let result: Vec<(Task, TaskStatus)> = rows.map(|x| { - let task = x.unwrap(); - let status = func(&task); - - (task, status) - }).collect().await; - + let listener = self.storage.listen_tasks(TaskStatus::Pending); + + let mut queue = pin!(rows.chain(listener)); + + while let Some(task) = queue.next().await { + let task = match task { + Ok(task) => task, + Err(e) => { + continue + } + }; + + let sink = match task_sink.recv().await { + Some(s) => s, + None => break, // sink has stoped requesting tasks + }; + + if let Err(_) = sink.send(task) { + continue; + } + + // (task, status) + } + Ok(()) } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::database::{TaskPagination, TasksPage}; + use async_stream::stream; + use fake::{Dummy, Fake, Faker}; + use futures::Stream; + use serde::{Deserialize, Serialize}; + use sqlx::types::Uuid; + use sync::mpsc; + use tokio::sync; + use tokio_stream::wrappers::ReceiverStream; + use tracing_test::traced_test; + use crate::error::Error; + use crate::tasks::worker::{Worker, WorkerManager}; + + #[derive(Dummy, Serialize, Deserialize, Debug)] + struct DummyTaskPayload { + key: Uuid, + _foo: String, + _bar: String, + } + + struct DummyTaskStorage {} + + impl TaskStorage for DummyTaskStorage { + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + _: I, + ) -> crate::error::Result<()> { + todo!() + } + + fn get_tasks( + &self, + task_status: TaskStatus, + ) -> impl Stream>> { + let payloads: Vec = Faker.fake(); + + let tasks = payloads.into_iter().enumerate().map(move |(i, item)| { + Ok(Task::new((i + 1).to_string(), item, task_status.clone())) + }); + + futures::stream::iter(tasks) + } + + fn listen_tasks( + &self, + task_status: TaskStatus, + ) -> impl Stream>> { + let (tx, rx) = mpsc::channel::>>(10); + + tokio::spawn(async move { + for _ in 0..10 { + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + + let payload: DummyTaskPayload = Faker.fake(); + let task_status: TaskStatus = task_status.clone(); + let task = Ok(Task::new(payload.key.to_string(), payload, task_status)); + + if let Err(_) = tx.send(task).await { + break; + } + } + }); + + ReceiverStream::new(rx) + } + + async fn get_paginated_tasks( + &self, + _: TaskPagination, + ) -> crate::error::Result> { + todo!() + } + } + + struct DummyWorker; + + impl Worker for DummyWorker { + fn process_job(task: &Task) -> crate::error::Result<()> { + println!("{:#?}", task); + Ok(()) + } + + async fn on_job_failure(task: &Task, error: Error) -> crate::error::Result<()> { + println!("{:#?} {:?}", task, error); + Ok(()) + } + } + + #[tokio::test] + #[traced_test] + async fn manager_runs() { + let execute_options = ManagerOptions::new(); + let local_worker_sink = WorkerManager::get_listener_sink::(execute_options.bus.clone()); + let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options); + + task_manager.run_tasks(local_worker_sink).await.unwrap() + } +} diff --git a/lib_sync_core/src/tasks/worker.rs b/lib_sync_core/src/tasks/worker.rs new file mode 100644 index 0000000..7e3d917 --- /dev/null +++ b/lib_sync_core/src/tasks/worker.rs @@ -0,0 +1,48 @@ +use crate::error::Error; +use crate::tasks::bus::Bus; +use crate::tasks::{Task, TaskPayload}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot::Sender; +use tokio::sync::{mpsc, oneshot}; + +pub type TaskMessage = Receiver>>; + +pub struct WorkerManager; + +impl WorkerManager { + pub fn get_listener_sink>(bus: Bus) -> TaskMessage { + match bus { + Bus::Local => { + let (bus_tx, bus_rx) = mpsc::channel(100); + tokio::spawn(async move { + loop { + // TODO: properly catch errors + let (tx, rx) = oneshot::channel(); + + // Request a task + bus_tx.send(tx).await.unwrap(); + + // Wait for a task to be returned + let task = rx.await.unwrap(); + + W::process_job(&task).unwrap(); + } + }); + + bus_rx + } + } + } +} + +pub trait Worker { + async fn pre_process_job(task: &Task) -> crate::Result<()> { + Ok(()) + } + fn process_job(task: &Task) -> crate::Result<()>; + async fn post_process_job(task: &Task) -> crate::Result<()> { + Ok(()) + } + + async fn on_job_failure(task: &Task, error: Error) -> crate::Result<()>; +}