parent
45a3bf291b
commit
94fe050c4a
8 changed files with 282 additions and 45 deletions
|
|
@ -6,7 +6,7 @@ edition = "2024"
|
|||
[dependencies]
|
||||
directories = "6.0.0"
|
||||
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
|
||||
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
|
||||
clap = { version = "4.5.37", features = ["derive"] }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
chrono = {version = "0.4.41", features = ["serde"]}
|
||||
|
|
@ -19,3 +19,7 @@ tabled = "0.19.0"
|
|||
futures = "0.3.31"
|
||||
thiserror = "2.0.12"
|
||||
async-stream = "0.3.6"
|
||||
uuid = { version = "1.16.0", features = ["serde", "v4"] }
|
||||
|
||||
[dev-dependencies]
|
||||
fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ impl<T: TaskPayload> TasksPage<T> {
|
|||
}
|
||||
|
||||
pub trait TaskStorage<T: TaskPayload> {
|
||||
async fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()>;
|
||||
async fn insert_tasks<'a, I: IntoIterator<Item=&'a Task<T>>>(&self, tasks: I) -> crate::Result<()>;
|
||||
fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
|
||||
|
||||
async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>>;
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
use crate::database::{TaskPagination, TaskStorage, TasksPage};
|
||||
use crate::tasks::{Task, TaskPayload, TaskStatus};
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
|
||||
use sqlx::{QueryBuilder, SqlitePool};
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
|
||||
use tracing::{info, instrument};
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use crate::database::{TaskPagination, TaskStorage, TasksPage};
|
||||
use crate::tasks::{Task, TaskPayload, TaskStatus};
|
||||
|
||||
#[allow(unused)]
|
||||
static SQLITE_BIND_LIMIT: usize = 32766;
|
||||
static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations");
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Sqlite {
|
||||
|
|
@ -36,21 +36,37 @@ impl Sqlite {
|
|||
|
||||
let pool = SqlitePool::connect_with(opts).await?;
|
||||
|
||||
sqlx::migrate!("../migrations").run(&pool).await?;
|
||||
MIGRATIONS.run(&pool).await?;
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TaskPayload> TaskStorage<T> for Sqlite {
|
||||
/// Insert task into the database for later processing
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tasks`: A list of task to be processed, each task has to have a unique key, if a key is repeated, the item will be omitted
|
||||
///
|
||||
/// returns: Result<(), Error>
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
///
|
||||
/// ```
|
||||
#[instrument(skip(self, tasks))]
|
||||
async fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()> {
|
||||
async fn insert_tasks<'a, I: IntoIterator<Item = &'a Task<T>>>(
|
||||
&self,
|
||||
tasks: I,
|
||||
) -> crate::Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let mut builder: QueryBuilder<'_, sqlx::Sqlite> =
|
||||
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
|
||||
|
||||
let args: crate::Result<Vec<(String, String)>> = tasks
|
||||
.iter()
|
||||
.into_iter()
|
||||
.map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?)))
|
||||
.collect();
|
||||
|
||||
|
|
@ -79,14 +95,15 @@ impl<T: TaskPayload> TaskStorage<T> for Sqlite {
|
|||
}
|
||||
|
||||
fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>> {
|
||||
let query= sqlx::query_as::<_, Task<T>>(
|
||||
let query = sqlx::query_as::<_, Task<T>>(
|
||||
"
|
||||
SELECT id, payload_key, payload, status_id, created_at, updated_at
|
||||
FROM tasks
|
||||
WHERE status_id = ?
|
||||
ORDER BY created_at DESC
|
||||
",
|
||||
).bind(task_status);
|
||||
)
|
||||
.bind(task_status);
|
||||
|
||||
query.fetch(&self.pool).err_into::<crate::Error>()
|
||||
}
|
||||
|
|
@ -110,8 +127,71 @@ impl<T: TaskPayload> TaskStorage<T> for Sqlite {
|
|||
builder.push("LIMIT ").push_bind(limit);
|
||||
}
|
||||
|
||||
let tasks = builder.build_query_as::<Task<T>>().fetch_all(&self.pool).await?;
|
||||
let tasks = builder
|
||||
.build_query_as::<Task<T>>()
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(TasksPage::new(tasks, page.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use fake::{Dummy, Fake, Faker};
|
||||
use futures::StreamExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::types::Uuid;
|
||||
use sqlx::Row;
|
||||
|
||||
#[derive(Dummy, Serialize, Deserialize, Debug)]
|
||||
struct DummyTaskPayload {
|
||||
key: Uuid,
|
||||
_foo: String,
|
||||
_baar: String,
|
||||
}
|
||||
|
||||
#[sqlx::test(migrator = "MIGRATIONS")]
|
||||
async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
|
||||
let owned_pool = pool.clone();
|
||||
let sqlite = Sqlite { pool: owned_pool };
|
||||
|
||||
let tasks = generate_dummy_tasks();
|
||||
|
||||
sqlite.insert_tasks(&tasks).await.unwrap();
|
||||
|
||||
let result = sqlx::query("select count(id) from tasks")
|
||||
.fetch_one(&pool)
|
||||
.await?;
|
||||
|
||||
let total_rows: u64 = result.get(0);
|
||||
|
||||
assert_eq!(total_rows as usize, tasks.len());
|
||||
|
||||
let saved_tasks: Vec<Task<DummyTaskPayload>> = sqlite
|
||||
.get_tasks(TaskStatus::Pending)
|
||||
.map(|item| item.unwrap())
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
assert_eq!(tasks.len(), saved_tasks.len());
|
||||
|
||||
let mut zip = tasks.into_iter().zip(saved_tasks.into_iter());
|
||||
|
||||
assert!(zip.all(|(a, b)| {
|
||||
a.get_key() == b.get_key()
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn generate_dummy_tasks() -> Vec<Task<DummyTaskPayload>> {
|
||||
let payloads: Vec<DummyTaskPayload> = Faker.fake();
|
||||
|
||||
payloads
|
||||
.into_iter()
|
||||
.map(|item| Task::new(item.key.to_string(), item, TaskStatus::Pending))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,18 +3,3 @@ pub mod error;
|
|||
pub(crate) use error::*;
|
||||
pub mod tasks;
|
||||
mod database;
|
||||
|
||||
pub fn add(left: u64, right: u64) -> u64 {
|
||||
left + right
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn it_works() {
|
||||
let result = add(2, 2);
|
||||
assert_eq!(result, 4);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
use chrono::Utc;
|
||||
use futures::StreamExt;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
use std::fmt::Display;
|
||||
|
|
@ -35,15 +34,11 @@ impl Display for TaskStatus {
|
|||
}
|
||||
}
|
||||
|
||||
pub trait TaskPayloadKey {
|
||||
fn get_key(&self) -> String;
|
||||
}
|
||||
|
||||
pub trait TaskPayload:
|
||||
Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey
|
||||
Serialize + DeserializeOwned + Send + Unpin + 'static
|
||||
{
|
||||
}
|
||||
impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey>
|
||||
impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static>
|
||||
TaskPayload for T
|
||||
{
|
||||
}
|
||||
|
|
@ -64,6 +59,19 @@ pub struct Task<T: TaskPayload> {
|
|||
updated_at: Option<chrono::DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl<T: TaskPayload> Task<T> {
|
||||
pub fn new(payload_key: String, payload: T, status: TaskStatus) -> Self {
|
||||
Self {
|
||||
id: 0,
|
||||
payload_key,
|
||||
payload,
|
||||
status,
|
||||
created_at: Default::default(),
|
||||
updated_at: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TaskPayload> Task<T> {
|
||||
pub fn payload(&self) -> &T {
|
||||
&self.payload
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue