wip(lib_sync_core): introduce database module
This commit is contained in:
parent
ac3ca325f3
commit
cd8b631d2c
3 changed files with 134 additions and 110 deletions
125
lib_sync_core/src/database.rs
Normal file
125
lib_sync_core/src/database.rs
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
use crate::task_manager::{Task, TaskPayload, TaskStatus};
|
||||
use futures::{Stream, StreamExt};
|
||||
use serde::Serialize;
|
||||
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
|
||||
use sqlx::{Error, QueryBuilder, Sqlite, SqlitePool};
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
use tracing::{info, instrument};
|
||||
|
||||
/// Maximum number of bound parameters SQLite accepts in one statement
/// (default `SQLITE_MAX_VARIABLE_NUMBER` in modern SQLite builds).
// `const` rather than `static`: this is a plain compile-time value with no
// identity/address requirements (clippy would flag the `static`).
const SQLITE_BIND_LIMIT: usize = 32766;
|
||||
|
||||
#[derive(Default)]
|
||||
struct SelectQueryOptions {
|
||||
limit: Option<u32>,
|
||||
status: Option<TaskStatus>,
|
||||
}
|
||||
|
||||
impl SelectQueryOptions {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Persistence interface for [`Task`] records with payload type `T`.
pub trait TaskStorage<T: TaskPayload> {
    /// Persists `tasks`, returning an error if the write fails.
    fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()>;
    /// Streams tasks matching `options`, yielding one `Result` per row.
    // NOTE(review): `-> impl Stream` in a trait (RPITIT) requires
    // Rust 1.75+ — confirm the crate's MSRV allows this.
    fn get_tasks(&self, options: SelectQueryOptions) -> impl Stream<Item = crate::Result<Task<T>>>;
}
|
||||
|
||||
/// SQLite-backed task store.
#[derive(Debug)]
pub struct Database {
    // Connection pool opened by `connect_database` (WAL journal mode).
    pool: SqlitePool,
}
|
||||
|
||||
impl Database {
|
||||
pub async fn new<P: Into<PathBuf>>(base_path: P) -> crate::Result<Self> {
|
||||
Ok(Self {
|
||||
pool: Self::connect_database(base_path).await?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
|
||||
let base_path = base_path.into();
|
||||
|
||||
let database_file_path = base_path.join("db.sql");
|
||||
|
||||
fs::create_dir_all(base_path).await?;
|
||||
|
||||
let opts = SqliteConnectOptions::new()
|
||||
.filename(database_file_path)
|
||||
.create_if_missing(true)
|
||||
.journal_mode(SqliteJournalMode::Wal);
|
||||
|
||||
let pool = SqlitePool::connect_with(opts).await?;
|
||||
|
||||
sqlx::migrate!("../migrations").run(&pool).await?;
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, values))]
|
||||
pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
|
||||
where
|
||||
T: TaskPayload + Serialize + std::fmt::Debug,
|
||||
{
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let mut builder: QueryBuilder<'_, Sqlite> =
|
||||
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
|
||||
|
||||
let args: crate::Result<Vec<(String, String)>> = values
|
||||
.iter()
|
||||
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
|
||||
.collect();
|
||||
|
||||
let mut affected_rows = 0;
|
||||
// Chunk the query by the size limit of bind params
|
||||
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
|
||||
builder.push_values(chunk, |mut builder, item| {
|
||||
builder
|
||||
.push_bind(&item.0)
|
||||
.push_bind(&item.1)
|
||||
.push_bind(TaskStatus::Pending);
|
||||
});
|
||||
builder.push("ON conflict (payload_key) DO NOTHING");
|
||||
|
||||
let query = builder.build();
|
||||
|
||||
affected_rows += query.execute(&mut *tx).await?.rows_affected();
|
||||
builder.reset();
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
info!("{} rows inserted.", affected_rows);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TaskPayload> TaskStorage<T> for Database {
    /// Not yet implemented; panics if called.
    // NOTE(review): trait declares the return as `crate::Result<()>`;
    // `crate::error::Result<()>` here presumably re-exports the same type —
    // confirm, and prefer one spelling for consistency.
    fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::error::Result<()> {
        todo!()
    }

    /// Streams tasks ordered by `created_at` descending, optionally
    /// filtered by `options.status` and capped by `options.limit`.
    // NOTE(review): `build_query_as` ties the query's lifetime to the local
    // `builder`, so the returned stream borrows a local — this likely fails
    // borrow-check (E0515, "returns a value referencing data owned by the
    // current function"). Confirm; if so, restructure so the SQL string and
    // arguments are owned by the returned stream.
    fn get_tasks(&self, options: SelectQueryOptions) -> impl Stream<Item = crate::Result<Task<T>>> {
        let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
            "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
        );

        if let Some(status) = options.status {
            // Bound parameter, not string interpolation — safe from injection.
            // NOTE(review): no trailing space after the bind; the next push
            // produces `?ORDER BY ...`, which parses only because `?` is a
            // tokenizer delimiter — consider adding a space.
            builder.push("where status_id = ").push_bind(status);
        }

        builder.push("ORDER BY created_at DESC ");

        if let Some(limit) = options.limit {
            builder.push("LIMIT ").push_bind(limit);
        }

        let tasks = builder.build_query_as::<Task<T>>().fetch(&self.pool);

        // Map sqlx errors into the crate-wide error type.
        tasks.map(|item| {
            item.map_err(Into::into)
        })
    }
}
|
||||
|
|
@ -2,6 +2,7 @@ pub mod error;
|
|||
|
||||
pub(crate) use error::*;
|
||||
pub mod task_manager;
|
||||
mod database;
|
||||
|
||||
pub fn add(left: u64, right: u64) -> u64 {
|
||||
left + right
|
||||
|
|
|
|||
|
|
@ -8,12 +8,11 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
|
|||
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
|
||||
use std::fmt::Display;
|
||||
use std::path::PathBuf;
|
||||
use futures::stream::BoxStream;
|
||||
use tabled::Tabled;
|
||||
use tokio::fs;
|
||||
use tracing::{info, instrument};
|
||||
|
||||
static SQLITE_BIND_LIMIT: usize = 32766;
|
||||
|
||||
#[derive(sqlx::Type, Debug)]
|
||||
#[repr(u8)]
|
||||
pub enum TaskStatus {
|
||||
|
|
@ -42,11 +41,14 @@ impl Display for TaskStatus {
|
|||
}
|
||||
}
|
||||
|
||||
pub trait TaskPayload {
|
||||
pub trait TaskPayloadKey {
|
||||
fn get_key(&self) -> String;
|
||||
}
|
||||
|
||||
pub type TaskJob<T: _Task> = fn(&Task<T>) -> TaskStatus;
|
||||
pub trait TaskPayload: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey {}
|
||||
impl<T: DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey> TaskPayload for T {}
|
||||
|
||||
pub type TaskJob<T: TaskPayload> = fn(&Task<T>) -> TaskStatus;
|
||||
|
||||
#[derive(sqlx::FromRow, Tabled, Debug)]
|
||||
pub struct Task<T: DeserializeOwned + std::fmt::Display> {
|
||||
|
|
@ -75,115 +77,11 @@ fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
|
|||
}
|
||||
}
|
||||
|
||||
pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
|
||||
impl<T: DeserializeOwned + Send + Unpin + 'static + Display> _Task for T {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TaskManager {
|
||||
base_path: PathBuf,
|
||||
pool: SqlitePool,
|
||||
}
|
||||
struct TaskManager{}
|
||||
|
||||
impl TaskManager {
|
||||
pub async fn new<P: Into<PathBuf>>(base_path: P) -> Result<TaskManager, Error> {
|
||||
let base_path = base_path.into();
|
||||
let pool = Self::connect_database(base_path.clone()).await?;
|
||||
Ok(Self {
|
||||
base_path,
|
||||
pool,
|
||||
})
|
||||
}
|
||||
|
||||
async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
|
||||
let base_path = base_path.into();
|
||||
|
||||
let database_file_path = base_path.join("db.sql");
|
||||
|
||||
fs::create_dir_all(base_path).await?;
|
||||
|
||||
let opts = SqliteConnectOptions::new()
|
||||
.filename(database_file_path)
|
||||
.create_if_missing(true)
|
||||
.journal_mode(SqliteJournalMode::Wal);
|
||||
|
||||
let pool = SqlitePool::connect_with(opts).await?;
|
||||
|
||||
sqlx::migrate!("../migrations").run(&pool).await?;
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
fn get_task_builder(
|
||||
status: Option<TaskStatus>,
|
||||
limit: Option<u16>,
|
||||
) -> QueryBuilder<'static, Sqlite> {
|
||||
let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
|
||||
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
|
||||
);
|
||||
|
||||
if let Some(status) = status {
|
||||
builder.push("where status_id = ").push_bind(status);
|
||||
}
|
||||
|
||||
builder.push("ORDER BY created_at DESC ");
|
||||
|
||||
if let Some(limit) = limit {
|
||||
builder.push("LIMIT ").push_bind(limit);
|
||||
}
|
||||
builder
|
||||
}
|
||||
|
||||
pub async fn get_tasks<T: _Task>(
|
||||
&self,
|
||||
status: Option<TaskStatus>,
|
||||
limit: Option<u16>,
|
||||
) -> crate::Result<Vec<Task<T>>> {
|
||||
let mut builder = Self::get_task_builder(status, limit);
|
||||
|
||||
let tasks: Vec<Task<T>> = builder.build_query_as().fetch_all(&self.pool).await?;
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, values))]
|
||||
pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
|
||||
where
|
||||
T: TaskPayload + Serialize + std::fmt::Debug,
|
||||
{
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let mut builder: QueryBuilder<'_, Sqlite> =
|
||||
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
|
||||
|
||||
let args: crate::Result<Vec<(String, String)>> = values
|
||||
.iter()
|
||||
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
|
||||
.collect();
|
||||
|
||||
let mut affected_rows = 0;
|
||||
// Chunk the query by the size limit of bind params
|
||||
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
|
||||
builder.push_values(chunk, |mut builder, item| {
|
||||
builder
|
||||
.push_bind(&item.0)
|
||||
.push_bind(&item.1)
|
||||
.push_bind(TaskStatus::Pending);
|
||||
});
|
||||
builder.push("ON conflict (payload_key) DO NOTHING");
|
||||
|
||||
let query = builder.build();
|
||||
|
||||
affected_rows += query.execute(&mut *tx).await?.rows_affected();
|
||||
builder.reset();
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
info!("{} rows inserted.", affected_rows);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run_tasks<T: _Task>(&self, func: TaskJob<T>) -> crate::Result<()> {
|
||||
pub async fn run_tasks<T: TaskPayload>(&self, func: TaskJob<T>) -> crate::Result<()> {
|
||||
let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
|
||||
|
||||
let rows = builder.build_query_as::<Task<T>>().fetch(&self.pool);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue