parent
c52a497075
commit
d87843614a
7 changed files with 177 additions and 24 deletions
29
.idea/codeStyles/Project.xml
generated
Normal file
29
.idea/codeStyles/Project.xml
generated
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
<component name="ProjectCodeStyleConfiguration">
|
||||||
|
<code_scheme name="Project" version="173">
|
||||||
|
<RsCodeStyleSettings>
|
||||||
|
<option name="INDENT_WHERE_CLAUSE" value="true" />
|
||||||
|
</RsCodeStyleSettings>
|
||||||
|
<SqlCodeStyleSettings version="7">
|
||||||
|
<option name="KEYWORD_CASE" value="2" />
|
||||||
|
<option name="SUBQUERY_OPENING" value="1" />
|
||||||
|
<option name="SUBQUERY_CONTENT" value="4" />
|
||||||
|
<option name="SUBQUERY_CLOSING" value="4" />
|
||||||
|
<option name="SUBQUERY_PAR_SPACE_BEFORE" value="1" />
|
||||||
|
<option name="INSERT_INTO_NL" value="2" />
|
||||||
|
<option name="INSERT_COLLAPSE_MULTI_ROW_VALUES" value="true" />
|
||||||
|
<option name="INSERT_MATRIX_ALIGN" value="true" />
|
||||||
|
<option name="INSERT_MATRIX_INCLUDING_HEADER" value="true" />
|
||||||
|
<option name="FROM_ALIGN_JOIN_TABLES" value="true" />
|
||||||
|
<option name="FROM_ALIGN_ALIASES" value="true" />
|
||||||
|
</SqlCodeStyleSettings>
|
||||||
|
<codeStyleSettings language="Rust">
|
||||||
|
<indentOptions>
|
||||||
|
<option name="INDENT_SIZE" value="2" />
|
||||||
|
<option name="CONTINUATION_INDENT_SIZE" value="2" />
|
||||||
|
<option name="TAB_SIZE" value="2" />
|
||||||
|
<option name="USE_TAB_CHARACTER" value="true" />
|
||||||
|
<option name="SMART_TABS" value="true" />
|
||||||
|
</indentOptions>
|
||||||
|
</codeStyleSettings>
|
||||||
|
</code_scheme>
|
||||||
|
</component>
|
||||||
2
.idea/codeStyles/codeStyleConfig.xml
generated
2
.idea/codeStyles/codeStyleConfig.xml
generated
|
|
@ -1,5 +1,5 @@
|
||||||
<component name="ProjectCodeStyleConfiguration">
|
<component name="ProjectCodeStyleConfiguration">
|
||||||
<state>
|
<state>
|
||||||
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
|
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
|
||||||
</state>
|
</state>
|
||||||
</component>
|
</component>
|
||||||
|
|
@ -68,8 +68,10 @@ impl<T: TaskPayload> TasksPage<T> {
|
||||||
|
|
||||||
pub trait TaskStorage<T: TaskPayload> {
|
pub trait TaskStorage<T: TaskPayload> {
|
||||||
async fn insert_tasks<'a, I: IntoIterator<Item=&'a Task<T>>>(&self, tasks: I) -> crate::Result<()>;
|
async fn insert_tasks<'a, I: IntoIterator<Item=&'a Task<T>>>(&self, tasks: I) -> crate::Result<()>;
|
||||||
fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
|
fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
|
||||||
|
|
||||||
|
async fn listen_tasks(&self, task_status: TaskStatus) -> crate::Result<()>;
|
||||||
|
|
||||||
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>>;
|
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -108,6 +108,10 @@ impl<T: TaskPayload> TaskStorage<T> for Sqlite {
|
||||||
query.fetch(&self.pool).err_into::<crate::Error>()
|
query.fetch(&self.pool).err_into::<crate::Error>()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>> {
|
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>> {
|
||||||
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
|
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
|
||||||
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
|
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
|
||||||
|
|
@ -170,6 +174,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[sqlx::test(migrator = "MIGRATIONS")]
|
#[sqlx::test(migrator = "MIGRATIONS")]
|
||||||
|
#[traced_test]
|
||||||
async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
|
async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
|
||||||
let (sqlite, tasks) = setup(pool.clone(), 100);
|
let (sqlite, tasks) = setup(pool.clone(), 100);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ use std::fmt::Display;
|
||||||
use tabled::Tabled;
|
use tabled::Tabled;
|
||||||
|
|
||||||
mod manager;
|
mod manager;
|
||||||
|
mod jobs;
|
||||||
|
|
||||||
#[derive(sqlx::Type, Debug, Clone)]
|
#[derive(sqlx::Type, Debug, Clone)]
|
||||||
#[repr(u8)]
|
#[repr(u8)]
|
||||||
|
|
@ -43,8 +44,6 @@ impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug>
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type TaskJob<T> = fn(&Task<T>) -> TaskStatus;
|
|
||||||
|
|
||||||
#[derive(sqlx::FromRow, Tabled, Debug)]
|
#[derive(sqlx::FromRow, Tabled, Debug)]
|
||||||
pub struct Task<T: TaskPayload> {
|
pub struct Task<T: TaskPayload> {
|
||||||
id: u32,
|
id: u32,
|
||||||
|
|
|
||||||
3
lib_sync_core/src/tasks/jobs.rs
Normal file
3
lib_sync_core/src/tasks/jobs.rs
Normal file
|
|
@ -0,0 +1,3 @@
|
||||||
|
use crate::tasks::{Task, TaskStatus};
|
||||||
|
|
||||||
|
pub type TaskJob<T> = fn(&Task<T>) -> TaskStatus;
|
||||||
|
|
@ -1,32 +1,147 @@
|
||||||
use futures::StreamExt;
|
|
||||||
use std::marker::PhantomData;
|
|
||||||
use crate::database::TaskStorage;
|
use crate::database::TaskStorage;
|
||||||
use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus};
|
use crate::tasks::jobs::TaskJob;
|
||||||
|
use crate::tasks::{Task, TaskPayload, TaskStatus};
|
||||||
|
use futures::StreamExt;
|
||||||
|
use futures::stream::FuturesOrdered;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
struct TaskManager<S: TaskPayload, T: TaskStorage<S>>
|
pub enum RateLimit {
|
||||||
{
|
Buffer(usize),
|
||||||
|
Rate(usize),
|
||||||
|
Ticks(usize),
|
||||||
|
None,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ExecuteOptions {
|
||||||
|
rate_limit: RateLimit,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ExecuteOptions {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self {
|
||||||
|
self.rate_limit = rate_limit;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ExecuteOptions {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
rate_limit: RateLimit::None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TaskManager<S: TaskPayload, T: TaskStorage<S>> {
|
||||||
storage: T,
|
storage: T,
|
||||||
|
options: ExecuteOptions,
|
||||||
_marker: PhantomData<S>,
|
_marker: PhantomData<S>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: TaskPayload, T: TaskStorage<S>> TaskManager<S, T> {
|
impl<S: TaskPayload, T: TaskStorage<S>> TaskManager<S, T> {
|
||||||
pub fn new(storage: T) -> Self {
|
pub fn new(storage: T, options: ExecuteOptions) -> Self {
|
||||||
Self {
|
Self {
|
||||||
storage,
|
storage,
|
||||||
_marker: PhantomData,
|
options,
|
||||||
}
|
_marker: PhantomData,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_tasks(&self, func: TaskJob<S>) -> crate::Result<()> {
|
pub async fn run_tasks(&self, func: TaskJob<S>) -> crate::Result<()> {
|
||||||
let rows = self.storage.get_tasks(TaskStatus::Pending);
|
let rows = self.storage.get_tasks(TaskStatus::Pending);
|
||||||
|
|
||||||
let result: Vec<(Task<S>, TaskStatus)> = rows.map(|x| {
|
let result: Vec<(Task<S>, TaskStatus)> = rows
|
||||||
let task = x.unwrap();
|
.map(async |x| {
|
||||||
let status = func(&task);
|
let task = x.unwrap();
|
||||||
|
let status = func(&task);
|
||||||
(task, status)
|
|
||||||
}).collect().await;
|
(task, status)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
.await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::database::{TaskPagination, TasksPage};
|
||||||
|
use fake::{Dummy, Fake, Faker};
|
||||||
|
use futures::{Stream, StreamExt};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use sqlx::Row;
|
||||||
|
use sqlx::types::Uuid;
|
||||||
|
use tracing_test::traced_test;
|
||||||
|
|
||||||
|
#[derive(Dummy, Serialize, Deserialize, Debug)]
|
||||||
|
struct DummyTaskPayload {
|
||||||
|
key: Uuid,
|
||||||
|
_foo: String,
|
||||||
|
_bar: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DummyTaskStorage {}
|
||||||
|
|
||||||
|
impl TaskStorage<DummyTaskPayload> for DummyTaskStorage {
|
||||||
|
async fn insert_tasks<'a, I: IntoIterator<Item = &'a Task<DummyTaskPayload>>>(
|
||||||
|
&self,
|
||||||
|
_: I,
|
||||||
|
) -> crate::error::Result<()> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_tasks(
|
||||||
|
&self,
|
||||||
|
task_status: TaskStatus,
|
||||||
|
) -> impl Stream<Item = crate::error::Result<Task<DummyTaskPayload>>> {
|
||||||
|
let payloads: Vec<DummyTaskPayload> = Faker.fake();
|
||||||
|
|
||||||
|
let tasks = payloads.into_iter().enumerate().map(move |(i, item)| {
|
||||||
|
Ok(Task::new((i + 1).to_string(), item, task_status.clone()))
|
||||||
|
});
|
||||||
|
|
||||||
|
futures::stream::iter(tasks)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen_tasks(&self, task_status: TaskStatus) -> crate::error::Result<()> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
async fn listen_tasks2(&self, task_status: TaskStatus) -> FuturesOrdered<impl Future<Output=Task<DummyTaskPayload>> + Sized> {
|
||||||
|
let mut fifo = FuturesOrdered::new();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
|
||||||
|
let payload: DummyTaskPayload = Faker.fake();
|
||||||
|
let task_status: TaskStatus = task_status.clone();
|
||||||
|
fifo.push_back(async move {
|
||||||
|
Task::new(payload.key.to_string(), payload, task_status)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
fifo
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_paginated_tasks(
|
||||||
|
&self,
|
||||||
|
_: TaskPagination,
|
||||||
|
) -> crate::error::Result<TasksPage<DummyTaskPayload>> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn manager_runs() {
|
||||||
|
let execute_options = ExecuteOptions::new();
|
||||||
|
let manager = TaskManager::new(DummyTaskStorage {}, execute_options);
|
||||||
|
|
||||||
|
manager.run_tasks(|_| TaskStatus::Completed).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue