diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
index 79ee123..6e6eec1 100644
--- a/.idea/codeStyles/codeStyleConfig.xml
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -1,5 +1,6 @@
+
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 43de3fc..6292eb7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1066,6 +1066,7 @@ dependencies = [
"tabled",
"thiserror",
"tokio",
+ "tokio-stream",
"tracing",
"tracing-core",
"tracing-subscriber",
diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml
index 0dcc4f3..be60587 100644
--- a/lib_sync_core/Cargo.toml
+++ b/lib_sync_core/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2024"
[dependencies]
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
+tokio-stream = "0.1.17"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs
index 216e71d..89e5da0 100644
--- a/lib_sync_core/src/database.rs
+++ b/lib_sync_core/src/database.rs
@@ -13,12 +13,12 @@ impl TaskPagination {
pub fn new() -> Self {
Self::default()
}
-
+
pub fn next(&self) -> Self {
- Self {
+ Self {
offset: self.offset.saturating_add(self.limit.unwrap_or(0)),
- ..self.clone()
- }
+ ..self.clone()
+ }
}
pub fn prev(&self) -> Self {
@@ -45,20 +45,17 @@ impl TaskPagination {
}
pub struct TasksPage {
- tasks: Vec>,
- page: TaskPagination
+ tasks: Vec>,
+ page: TaskPagination,
}
impl TasksPage {
fn new(tasks: Vec>, page: TaskPagination) -> Self {
- Self {
- tasks,
- page
- }
+ Self { tasks, page }
}
pub fn next(&self) -> TaskPagination {
- self.page.next()
+ self.page.next()
}
pub fn prev(&self) -> TaskPagination {
@@ -67,11 +64,13 @@ impl TasksPage {
}
pub trait TaskStorage {
- async fn insert_tasks<'a, I: IntoIterator- >>(&self, tasks: I) -> crate::Result<()>;
+ async fn insert_tasks<'a, I: IntoIterator
- >>(
+ &self,
+ tasks: I,
+ ) -> crate::Result<()>;
fn get_tasks(&self, task_status: TaskStatus) -> impl Stream
- >>;
- async fn listen_tasks(&self, task_status: TaskStatus) -> crate::Result<()>;
-
+ fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream
- >>;
+
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result>;
}
-
diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs
index 5c09dc4..6d11c9a 100644
--- a/lib_sync_core/src/database/sqlite.rs
+++ b/lib_sync_core/src/database/sqlite.rs
@@ -108,10 +108,11 @@ impl TaskStorage for Sqlite {
query.fetch(&self.pool).err_into::()
}
- async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> {
- todo!()
+ fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream
- >> {
+ futures::stream::empty()
}
+
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result> {
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs
index 1079163..f455941 100644
--- a/lib_sync_core/src/tasks.rs
+++ b/lib_sync_core/src/tasks.rs
@@ -6,6 +6,8 @@ use tabled::Tabled;
mod manager;
mod jobs;
+mod worker;
+mod bus;
#[derive(sqlx::Type, Debug, Clone)]
#[repr(u8)]
@@ -75,9 +77,7 @@ impl Task {
pub fn payload(&self) -> &T {
&self.payload
}
-}
-impl Task {
pub fn get_key(&self) -> String {
self.payload_key.clone()
}
diff --git a/lib_sync_core/src/tasks/bus.rs b/lib_sync_core/src/tasks/bus.rs
new file mode 100644
index 0000000..4e96e56
--- /dev/null
+++ b/lib_sync_core/src/tasks/bus.rs
@@ -0,0 +1,4 @@
+#[derive(Clone)]
+pub enum Bus {
+ Local,
+}
\ No newline at end of file
diff --git a/lib_sync_core/src/tasks/manager.rs b/lib_sync_core/src/tasks/manager.rs
index f21fb0c..eaaef77 100644
--- a/lib_sync_core/src/tasks/manager.rs
+++ b/lib_sync_core/src/tasks/manager.rs
@@ -1,9 +1,14 @@
use crate::database::TaskStorage;
+use crate::tasks::bus::Bus;
use crate::tasks::jobs::TaskJob;
use crate::tasks::{Task, TaskPayload, TaskStatus};
use futures::StreamExt;
-use futures::stream::FuturesOrdered;
use std::marker::PhantomData;
+use std::pin::pin;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::{mpsc, oneshot};
+use tokio::sync::oneshot::Sender;
+use crate::tasks::worker::TaskMessage;
pub enum RateLimit {
Buffer(usize),
@@ -12,11 +17,12 @@ pub enum RateLimit {
None,
}
-pub struct ExecuteOptions {
+pub struct ManagerOptions {
rate_limit: RateLimit,
+ bus: Bus,
}
-impl ExecuteOptions {
+impl ManagerOptions {
pub fn new() -> Self {
Self::default()
}
@@ -27,22 +33,23 @@ impl ExecuteOptions {
}
}
-impl Default for ExecuteOptions {
+impl Default for ManagerOptions {
fn default() -> Self {
Self {
rate_limit: RateLimit::None,
+ bus: Bus::Local,
}
}
}
struct TaskManager> {
storage: T,
- options: ExecuteOptions,
+ options: ManagerOptions,
_marker: PhantomData
,
}
impl> TaskManager {
- pub fn new(storage: T, options: ExecuteOptions) -> Self {
+ pub fn new(storage: T, options: ManagerOptions) -> Self {
Self {
storage,
options,
@@ -50,18 +57,31 @@ impl> TaskManager {
}
}
- pub async fn run_tasks(&self, func: TaskJob) -> crate::Result<()> {
+ pub async fn run_tasks(&self, mut task_sink: TaskMessage) -> crate::Result<()> {
let rows = self.storage.get_tasks(TaskStatus::Pending);
+ let listener = self.storage.listen_tasks(TaskStatus::Pending);
- let result: Vec<(Task, TaskStatus)> = rows
- .map(async |x| {
- let task = x.unwrap();
- let status = func(&task);
+ let mut queue = pin!(rows.chain(listener));
- (task, status)
- })
- .collect()
- .await;
+ while let Some(task) = queue.next().await {
+ let task = match task {
+ Ok(task) => task,
+ Err(e) => {
+ continue
+ }
+ };
+
+ let sink = match task_sink.recv().await {
+ Some(s) => s,
+ None => break, // sink has stoped requesting tasks
+ };
+
+ if let Err(_) = sink.send(task) {
+ continue;
+ }
+
+ // (task, status)
+ }
Ok(())
}
@@ -71,12 +91,17 @@ impl> TaskManager {
mod tests {
use super::*;
use crate::database::{TaskPagination, TasksPage};
+ use async_stream::stream;
use fake::{Dummy, Fake, Faker};
- use futures::{Stream, StreamExt};
+ use futures::Stream;
use serde::{Deserialize, Serialize};
- use sqlx::Row;
use sqlx::types::Uuid;
+ use sync::mpsc;
+ use tokio::sync;
+ use tokio_stream::wrappers::ReceiverStream;
use tracing_test::traced_test;
+ use crate::error::Error;
+ use crate::tasks::worker::{Worker, WorkerManager};
#[derive(Dummy, Serialize, Deserialize, Debug)]
struct DummyTaskPayload {
@@ -98,7 +123,7 @@ mod tests {
fn get_tasks(
&self,
task_status: TaskStatus,
- ) -> impl Stream- >> {
+ ) -> impl Stream
- >> {
let payloads: Vec = Faker.fake();
let tasks = payloads.into_iter().enumerate().map(move |(i, item)| {
@@ -108,24 +133,27 @@ mod tests {
futures::stream::iter(tasks)
}
- async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> {
- todo!()
- }
- async fn listen_tasks2(&self, task_status: TaskStatus) -> FuturesOrdered> + Sized> {
- let mut fifo = FuturesOrdered::new();
+ fn listen_tasks(
+ &self,
+ task_status: TaskStatus,
+ ) -> impl Stream
- >> {
+ let (tx, rx) = mpsc::channel::>>(10);
tokio::spawn(async move {
- loop {
+ for _ in 0..10 {
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
+
let payload: DummyTaskPayload = Faker.fake();
let task_status: TaskStatus = task_status.clone();
- fifo.push_back(async move {
- Task::new(payload.key.to_string(), payload, task_status)
- });
+ let task = Ok(Task::new(payload.key.to_string(), payload, task_status));
+
+ if let Err(_) = tx.send(task).await {
+ break;
+ }
}
});
-
- fifo
+
+ ReceiverStream::new(rx)
}
async fn get_paginated_tasks(
@@ -136,12 +164,27 @@ mod tests {
}
}
+ struct DummyWorker;
+
+ impl Worker for DummyWorker {
+ fn process_job(task: &Task) -> crate::error::Result<()> {
+ println!("{:#?}", task);
+ Ok(())
+ }
+
+ async fn on_job_failure(task: &Task, error: Error) -> crate::error::Result<()> {
+ println!("{:#?} {:?}", task, error);
+ Ok(())
+ }
+ }
+
#[tokio::test]
#[traced_test]
async fn manager_runs() {
- let execute_options = ExecuteOptions::new();
- let manager = TaskManager::new(DummyTaskStorage {}, execute_options);
+ let execute_options = ManagerOptions::new();
+ let local_worker_sink = WorkerManager::get_listener_sink::(execute_options.bus.clone());
+ let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options);
- manager.run_tasks(|_| TaskStatus::Completed).await.unwrap();
+ task_manager.run_tasks(local_worker_sink).await.unwrap()
}
}
diff --git a/lib_sync_core/src/tasks/worker.rs b/lib_sync_core/src/tasks/worker.rs
new file mode 100644
index 0000000..7e3d917
--- /dev/null
+++ b/lib_sync_core/src/tasks/worker.rs
@@ -0,0 +1,48 @@
+use crate::error::Error;
+use crate::tasks::bus::Bus;
+use crate::tasks::{Task, TaskPayload};
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::oneshot::Sender;
+use tokio::sync::{mpsc, oneshot};
+
+pub type TaskMessage = Receiver>>;
+
+pub struct WorkerManager;
+
+impl WorkerManager {
+ pub fn get_listener_sink>(bus: Bus) -> TaskMessage {
+ match bus {
+ Bus::Local => {
+ let (bus_tx, bus_rx) = mpsc::channel(100);
+ tokio::spawn(async move {
+ loop {
+ // TODO: properly catch errors
+ let (tx, rx) = oneshot::channel();
+
+ // Request a task
+ bus_tx.send(tx).await.unwrap();
+
+ // Wait for a task to be returned
+ let task = rx.await.unwrap();
+
+ W::process_job(&task).unwrap();
+ }
+ });
+
+ bus_rx
+ }
+ }
+ }
+}
+
+pub trait Worker {
+ async fn pre_process_job(task: &Task) -> crate::Result<()> {
+ Ok(())
+ }
+ fn process_job(task: &Task) -> crate::Result<()>;
+ async fn post_process_job(task: &Task) -> crate::Result<()> {
+ Ok(())
+ }
+
+ async fn on_job_failure(task: &Task, error: Error) -> crate::Result<()>;
+}