diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml new file mode 100644 index 0000000..4e20976 --- /dev/null +++ b/.idea/codeStyles/Project.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml index a55e7a1..79ee123 100644 --- a/.idea/codeStyles/codeStyleConfig.xml +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -1,5 +1,5 @@ - \ No newline at end of file diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs index 45db661..216e71d 100644 --- a/lib_sync_core/src/database.rs +++ b/lib_sync_core/src/database.rs @@ -68,8 +68,10 @@ impl TasksPage { pub trait TaskStorage { async fn insert_tasks<'a, I: IntoIterator>>(&self, tasks: I) -> crate::Result<()>; - fn get_tasks(&self, options: TaskStatus) -> impl Stream>>; + fn get_tasks(&self, task_status: TaskStatus) -> impl Stream>>; + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::Result<()>; + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>; } diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs index 6aac53c..5c09dc4 100644 --- a/lib_sync_core/src/database/sqlite.rs +++ b/lib_sync_core/src/database/sqlite.rs @@ -108,6 +108,10 @@ impl TaskStorage for Sqlite { query.fetch(&self.pool).err_into::() } + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { + todo!() + } + async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> { let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new( "select id, payload_key, payload, status_id, created_at, updated_at from tasks ", @@ -170,6 +174,7 @@ mod tests { } #[sqlx::test(migrator = "MIGRATIONS")] + #[traced_test] async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> { let (sqlite, tasks) = setup(pool.clone(), 100); diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs index a4bc96c..1079163 100644 --- a/lib_sync_core/src/tasks.rs +++ b/lib_sync_core/src/tasks.rs @@ -5,6 +5,7 @@ use std::fmt::Display; use tabled::Tabled; mod manager; +mod jobs; #[derive(sqlx::Type, Debug, Clone)] #[repr(u8)] @@ -43,8 +44,6 @@ impl { } -pub type TaskJob = fn(&Task) -> TaskStatus; - #[derive(sqlx::FromRow, Tabled, Debug)] pub struct Task { id: u32, diff --git a/lib_sync_core/src/tasks/jobs.rs b/lib_sync_core/src/tasks/jobs.rs new file mode 100644 index 0000000..74a8ca0 --- /dev/null +++ b/lib_sync_core/src/tasks/jobs.rs @@ -0,0 +1,3 @@ +use crate::tasks::{Task, TaskStatus}; + +pub type TaskJob = fn(&Task) -> TaskStatus; \ No newline at end of file diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs index bee3cb8..f21fb0c 100644 --- a/lib_sync_core/src/tasks/manager.rs +++ b/lib_sync_core/src/tasks/manager.rs @@ -1,32 +1,147 @@ -use futures::StreamExt; -use std::marker::PhantomData; use crate::database::TaskStorage; -use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus}; +use crate::tasks::jobs::TaskJob; +use crate::tasks::{Task, TaskPayload, TaskStatus}; +use futures::StreamExt; +use futures::stream::FuturesOrdered; +use std::marker::PhantomData; -struct TaskManager> -{ +pub enum RateLimit { + Buffer(usize), + Rate(usize), + Ticks(usize), + None, +} + +pub struct ExecuteOptions { + rate_limit: RateLimit, +} + +impl ExecuteOptions { + pub fn new() -> Self { + Self::default() + } + + pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self { + self.rate_limit = rate_limit; + self + } +} + +impl Default for ExecuteOptions { + fn default() -> Self { + Self { + rate_limit: RateLimit::None, + } + } +} + +struct TaskManager> { storage: T, + options: ExecuteOptions, _marker: PhantomData, } impl> TaskManager { - pub fn new(storage: T) -> Self { - Self { - storage, - _marker: PhantomData, - } + pub fn new(storage: T, options: ExecuteOptions) -> Self { + Self { + storage, + options, + _marker: PhantomData, + } } - + pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> { let rows = self.storage.get_tasks(TaskStatus::Pending); - - let result: Vec<(Task, TaskStatus)> = rows.map(|x| { - let task = x.unwrap(); - let status = func(&task); - - (task, status) - }).collect().await; - + + let result: Vec<(Task, TaskStatus)> = rows + .map(async |x| { + let task = x.unwrap(); + let status = func(&task); + + (task, status) + }) + .collect() + .await; + Ok(()) } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::database::{TaskPagination, TasksPage}; + use fake::{Dummy, Fake, Faker}; + use futures::{Stream, StreamExt}; + use serde::{Deserialize, Serialize}; + use sqlx::Row; + use sqlx::types::Uuid; + use tracing_test::traced_test; + + #[derive(Dummy, Serialize, Deserialize, Debug)] + struct DummyTaskPayload { + key: Uuid, + _foo: String, + _bar: String, + } + + struct DummyTaskStorage {} + + impl TaskStorage for DummyTaskStorage { + async fn insert_tasks<'a, I: IntoIterator>>( + &self, + _: I, + ) -> crate::error::Result<()> { + todo!() + } + + fn get_tasks( + &self, + task_status: TaskStatus, + ) -> impl Stream>> { + let payloads: Vec = Faker.fake(); + + let tasks = payloads.into_iter().enumerate().map(move |(i, item)| { + Ok(Task::new((i + 1).to_string(), item, task_status.clone())) + }); + + futures::stream::iter(tasks) + } + + async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> { + todo!() + } + async fn listen_tasks2(&self, task_status: TaskStatus) -> FuturesOrdered> + Sized> { + let mut fifo = FuturesOrdered::new(); + + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + let payload: DummyTaskPayload = Faker.fake(); + let task_status: TaskStatus = task_status.clone(); + fifo.push_back(async move { + Task::new(payload.key.to_string(), payload, task_status) + }); + } + }); + + fifo + } + + async fn get_paginated_tasks( + &self, + _: TaskPagination, + ) -> crate::error::Result> { + todo!() + } + } + + #[tokio::test] + #[traced_test] + async fn manager_runs() { + let execute_options = ExecuteOptions::new(); + let manager = TaskManager::new(DummyTaskStorage {}, execute_options); + + manager.run_tasks(|_| TaskStatus::Completed).await.unwrap(); + } +}