Compare commits
2 commits
2126a0f83d
...
45a3bf291b
| Author | SHA1 | Date | |
|---|---|---|---|
| 45a3bf291b | |||
| 4199a97a19 |
11 changed files with 344 additions and 203 deletions
5
.idea/codeStyles/codeStyleConfig.xml
generated
Normal file
5
.idea/codeStyles/codeStyleConfig.xml
generated
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
<component name="ProjectCodeStyleConfiguration">
|
||||
<state>
|
||||
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
|
||||
</state>
|
||||
</component>
|
||||
23
Cargo.lock
generated
23
Cargo.lock
generated
|
|
@ -110,6 +110,28 @@ dependencies = [
|
|||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
|
||||
dependencies = [
|
||||
"async-stream-impl",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream-impl"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atoi"
|
||||
version = "2.0.0"
|
||||
|
|
@ -941,6 +963,7 @@ dependencies = [
|
|||
name = "lib_sync_core"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"chrono",
|
||||
"clap",
|
||||
"directories",
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use lib_sync_core::task_manager::TaskPayload;
|
||||
use lib_sync_core::tasks::TaskPayload;
|
||||
use chrono::{DateTime, Local};
|
||||
use serde::{de, Deserialize, Deserializer, Serialize};
|
||||
use serde_json::Value;
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use figment::{
|
|||
Figment,
|
||||
providers::{Env, Serialized},
|
||||
};
|
||||
use lib_sync_core::task_manager::{TaskManager, TaskStatus};
|
||||
use lib_sync_core::tasks::{TaskStatus};
|
||||
use cli::config::{Command, Config};
|
||||
use cli::{Error, Result};
|
||||
use std::fs::File;
|
||||
|
|
|
|||
|
|
@ -18,3 +18,4 @@ tracing-core = "0.1.33"
|
|||
tabled = "0.19.0"
|
||||
futures = "0.3.31"
|
||||
thiserror = "2.0.12"
|
||||
async-stream = "0.3.6"
|
||||
|
|
|
|||
77
lib_sync_core/src/database.rs
Normal file
77
lib_sync_core/src/database.rs
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
use crate::tasks::{Task, TaskPayload, TaskStatus};
|
||||
use futures::{Stream};
|
||||
mod sqlite;
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct TaskPagination {
|
||||
page_size: usize,
|
||||
limit: Option<u32>,
|
||||
offset: Option<u32>,
|
||||
status: Option<TaskStatus>,
|
||||
}
|
||||
|
||||
impl TaskPagination {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn next(&self) -> Self {
|
||||
Self {
|
||||
page_size: self.page_size + self.page_size,
|
||||
..self.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn prev(&self) -> Self {
|
||||
Self {
|
||||
page_size: self.page_size - self.page_size,
|
||||
..self.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_page_size(&mut self, page_size: usize) {
|
||||
self.page_size = page_size;
|
||||
}
|
||||
|
||||
pub fn set_limit(&mut self, limit: Option<u32>) {
|
||||
self.limit = limit;
|
||||
}
|
||||
|
||||
pub fn set_offset(&mut self, offset: Option<u32>) {
|
||||
self.offset = offset;
|
||||
}
|
||||
|
||||
pub fn set_status(&mut self, status: Option<TaskStatus>) {
|
||||
self.status = status;
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TasksPage<T: TaskPayload> {
|
||||
tasks: Vec<Task<T>>,
|
||||
page: TaskPagination
|
||||
}
|
||||
|
||||
impl<T: TaskPayload> TasksPage<T> {
|
||||
fn new(tasks: Vec<Task<T>>, page: TaskPagination) -> Self {
|
||||
Self {
|
||||
tasks,
|
||||
page
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next(&self) -> TaskPagination {
|
||||
self.page.next()
|
||||
}
|
||||
|
||||
pub fn prev(&self) -> TaskPagination {
|
||||
self.page.prev()
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TaskStorage<T: TaskPayload> {
|
||||
async fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()>;
|
||||
fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
|
||||
|
||||
async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>>;
|
||||
}
|
||||
|
||||
117
lib_sync_core/src/database/sqlite.rs
Normal file
117
lib_sync_core/src/database/sqlite.rs
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
use sqlx::{QueryBuilder, SqlitePool};
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
|
||||
use tracing::{info, instrument};
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use crate::database::{TaskPagination, TaskStorage, TasksPage};
|
||||
use crate::tasks::{Task, TaskPayload, TaskStatus};
|
||||
|
||||
#[allow(unused)]
|
||||
static SQLITE_BIND_LIMIT: usize = 32766;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Sqlite {
|
||||
pool: SqlitePool,
|
||||
}
|
||||
|
||||
impl Sqlite {
|
||||
pub async fn new<P: Into<PathBuf>>(base_path: P) -> crate::Result<Self> {
|
||||
Ok(Self {
|
||||
pool: Self::connect_database(base_path).await?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
|
||||
let base_path = base_path.into();
|
||||
|
||||
let database_file_path = base_path.join("db.sql");
|
||||
|
||||
fs::create_dir_all(base_path).await?;
|
||||
|
||||
let opts = SqliteConnectOptions::new()
|
||||
.filename(database_file_path)
|
||||
.create_if_missing(true)
|
||||
.journal_mode(SqliteJournalMode::Wal);
|
||||
|
||||
let pool = SqlitePool::connect_with(opts).await?;
|
||||
|
||||
sqlx::migrate!("../migrations").run(&pool).await?;
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TaskPayload> TaskStorage<T> for Sqlite {
|
||||
#[instrument(skip(self, tasks))]
|
||||
async fn insert_tasks(&self, tasks: Vec<Task<T>>) -> crate::Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let mut builder: QueryBuilder<'_, sqlx::Sqlite> =
|
||||
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
|
||||
|
||||
let args: crate::Result<Vec<(String, String)>> = tasks
|
||||
.iter()
|
||||
.map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?)))
|
||||
.collect();
|
||||
|
||||
let mut affected_rows = 0;
|
||||
// Chunk the query by the size limit of bind params
|
||||
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
|
||||
builder.push_values(chunk, |mut builder, item| {
|
||||
builder
|
||||
.push_bind(&item.0)
|
||||
.push_bind(&item.1)
|
||||
.push_bind(TaskStatus::Pending);
|
||||
});
|
||||
builder.push("ON conflict (payload_key) DO NOTHING");
|
||||
|
||||
let query = builder.build();
|
||||
|
||||
affected_rows += query.execute(&mut *tx).await?.rows_affected();
|
||||
builder.reset();
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
info!("{} rows inserted.", affected_rows);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>> {
|
||||
let query= sqlx::query_as::<_, Task<T>>(
|
||||
"
|
||||
SELECT id, payload_key, payload, status_id, created_at, updated_at
|
||||
FROM tasks
|
||||
WHERE status_id = ?
|
||||
ORDER BY created_at DESC
|
||||
",
|
||||
).bind(task_status);
|
||||
|
||||
query.fetch(&self.pool).err_into::<crate::Error>()
|
||||
}
|
||||
|
||||
async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>> {
|
||||
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
|
||||
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
|
||||
);
|
||||
|
||||
if let Some(status) = &page.status {
|
||||
builder.push("where status_id = ").push_bind(status);
|
||||
}
|
||||
|
||||
builder.push("ORDER BY created_at DESC ");
|
||||
|
||||
if let Some(limit) = &page.offset {
|
||||
builder.push("OFFSET ").push_bind(limit);
|
||||
}
|
||||
|
||||
if let Some(limit) = &page.limit {
|
||||
builder.push("LIMIT ").push_bind(limit);
|
||||
}
|
||||
|
||||
let tasks = builder.build_query_as::<Task<T>>().fetch_all(&self.pool).await?;
|
||||
|
||||
Ok(TasksPage::new(tasks, page.clone()))
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +1,8 @@
|
|||
pub mod error;
|
||||
|
||||
pub(crate) use error::*;
|
||||
pub mod task_manager;
|
||||
pub mod tasks;
|
||||
mod database;
|
||||
|
||||
pub fn add(left: u64, right: u64) -> u64 {
|
||||
left + right
|
||||
|
|
|
|||
|
|
@ -1,200 +0,0 @@
|
|||
use crate::error::Error;
|
||||
use chrono::Utc;
|
||||
use directories::ProjectDirs;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
|
||||
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
|
||||
use std::fmt::Display;
|
||||
use std::path::PathBuf;
|
||||
use tabled::Tabled;
|
||||
use tokio::fs;
|
||||
use tracing::{info, instrument};
|
||||
|
||||
static SQLITE_BIND_LIMIT: usize = 32766;
|
||||
|
||||
#[derive(sqlx::Type, Debug)]
|
||||
#[repr(u8)]
|
||||
pub enum TaskStatus {
|
||||
Pending = 1,
|
||||
InProgress = 2,
|
||||
Completed = 3,
|
||||
Failed = 4,
|
||||
}
|
||||
|
||||
impl Display for TaskStatus {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TaskStatus::Pending => {
|
||||
write!(f, "Pending")
|
||||
}
|
||||
TaskStatus::InProgress => {
|
||||
write!(f, "In Progress")
|
||||
}
|
||||
TaskStatus::Completed => {
|
||||
write!(f, "Completed")
|
||||
}
|
||||
TaskStatus::Failed => {
|
||||
write!(f, "Failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TaskPayload {
|
||||
fn get_key(&self) -> String;
|
||||
}
|
||||
|
||||
pub type TaskJob<T: _Task> = fn(&Task<T>) -> TaskStatus;
|
||||
|
||||
#[derive(sqlx::FromRow, Tabled, Debug)]
|
||||
pub struct Task<T: DeserializeOwned + std::fmt::Display> {
|
||||
id: u32,
|
||||
payload_key: String,
|
||||
#[sqlx(json)]
|
||||
#[tabled(skip)]
|
||||
payload: T,
|
||||
#[sqlx(rename = "status_id")]
|
||||
status: TaskStatus,
|
||||
created_at: chrono::DateTime<Utc>,
|
||||
#[tabled(display = "display_option_date")]
|
||||
updated_at: Option<chrono::DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl<T: DeserializeOwned + std::fmt::Display> Task<T> {
|
||||
pub fn get_key(&self) -> String {
|
||||
self.payload_key.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
|
||||
match o {
|
||||
Some(s) => format!("{}", s),
|
||||
None => String::from(""),
|
||||
}
|
||||
}
|
||||
|
||||
pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
|
||||
impl<T: DeserializeOwned + Send + Unpin + 'static + Display> _Task for T {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TaskManager {
|
||||
base_path: PathBuf,
|
||||
pool: SqlitePool,
|
||||
}
|
||||
|
||||
impl TaskManager {
|
||||
pub async fn new<P: Into<PathBuf>>(base_path: P) -> Result<TaskManager, Error> {
|
||||
let base_path = base_path.into();
|
||||
let pool = Self::connect_database(base_path.clone()).await?;
|
||||
Ok(Self {
|
||||
base_path,
|
||||
pool,
|
||||
})
|
||||
}
|
||||
|
||||
async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
|
||||
let base_path = base_path.into();
|
||||
|
||||
let database_file_path = base_path.join("db.sql");
|
||||
|
||||
fs::create_dir_all(base_path).await?;
|
||||
|
||||
let opts = SqliteConnectOptions::new()
|
||||
.filename(database_file_path)
|
||||
.create_if_missing(true)
|
||||
.journal_mode(SqliteJournalMode::Wal);
|
||||
|
||||
let pool = SqlitePool::connect_with(opts).await?;
|
||||
|
||||
sqlx::migrate!("../migrations").run(&pool).await?;
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
fn get_task_builder(
|
||||
status: Option<TaskStatus>,
|
||||
limit: Option<u16>,
|
||||
) -> QueryBuilder<'static, Sqlite> {
|
||||
let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
|
||||
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
|
||||
);
|
||||
|
||||
if let Some(status) = status {
|
||||
builder.push("where status_id = ").push_bind(status);
|
||||
}
|
||||
|
||||
builder.push("ORDER BY created_at DESC ");
|
||||
|
||||
if let Some(limit) = limit {
|
||||
builder.push("LIMIT ").push_bind(limit);
|
||||
}
|
||||
builder
|
||||
}
|
||||
|
||||
pub async fn get_tasks<T: _Task>(
|
||||
&self,
|
||||
status: Option<TaskStatus>,
|
||||
limit: Option<u16>,
|
||||
) -> crate::Result<Vec<Task<T>>> {
|
||||
let mut builder = Self::get_task_builder(status, limit);
|
||||
|
||||
let tasks: Vec<Task<T>> = builder.build_query_as().fetch_all(&self.pool).await?;
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, values))]
|
||||
pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
|
||||
where
|
||||
T: TaskPayload + Serialize + std::fmt::Debug,
|
||||
{
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let mut builder: QueryBuilder<'_, Sqlite> =
|
||||
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
|
||||
|
||||
let args: crate::Result<Vec<(String, String)>> = values
|
||||
.iter()
|
||||
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
|
||||
.collect();
|
||||
|
||||
let mut affected_rows = 0;
|
||||
// Chunk the query by the size limit of bind params
|
||||
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
|
||||
builder.push_values(chunk, |mut builder, item| {
|
||||
builder
|
||||
.push_bind(&item.0)
|
||||
.push_bind(&item.1)
|
||||
.push_bind(TaskStatus::Pending);
|
||||
});
|
||||
builder.push("ON conflict (payload_key) DO NOTHING");
|
||||
|
||||
let query = builder.build();
|
||||
|
||||
affected_rows += query.execute(&mut *tx).await?.rows_affected();
|
||||
builder.reset();
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
info!("{} rows inserted.", affected_rows);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run_tasks<T: _Task>(&self, func: TaskJob<T>) -> crate::Result<()> {
|
||||
let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
|
||||
|
||||
let rows = builder.build_query_as::<Task<T>>().fetch(&self.pool);
|
||||
|
||||
let result: Vec<(Task<T>, TaskStatus)> = rows.map(|x| {
|
||||
let task = x.unwrap();
|
||||
let status = func(&task);
|
||||
|
||||
(task, status)
|
||||
}).collect().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
85
lib_sync_core/src/tasks.rs
Normal file
85
lib_sync_core/src/tasks.rs
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
use chrono::Utc;
|
||||
use futures::StreamExt;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
use std::fmt::Display;
|
||||
use tabled::Tabled;
|
||||
|
||||
mod manager;
|
||||
|
||||
#[derive(sqlx::Type, Debug, Clone)]
|
||||
#[repr(u8)]
|
||||
pub enum TaskStatus {
|
||||
Pending = 1,
|
||||
InProgress = 2,
|
||||
Completed = 3,
|
||||
Failed = 4,
|
||||
}
|
||||
|
||||
impl Display for TaskStatus {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TaskStatus::Pending => {
|
||||
write!(f, "Pending")
|
||||
}
|
||||
TaskStatus::InProgress => {
|
||||
write!(f, "In Progress")
|
||||
}
|
||||
TaskStatus::Completed => {
|
||||
write!(f, "Completed")
|
||||
}
|
||||
TaskStatus::Failed => {
|
||||
write!(f, "Failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TaskPayloadKey {
|
||||
fn get_key(&self) -> String;
|
||||
}
|
||||
|
||||
pub trait TaskPayload:
|
||||
Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey
|
||||
{
|
||||
}
|
||||
impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static + Display + TaskPayloadKey>
|
||||
TaskPayload for T
|
||||
{
|
||||
}
|
||||
|
||||
pub type TaskJob<T> = fn(&Task<T>) -> TaskStatus;
|
||||
|
||||
#[derive(sqlx::FromRow, Tabled, Debug)]
|
||||
pub struct Task<T: TaskPayload> {
|
||||
id: u32,
|
||||
payload_key: String,
|
||||
#[sqlx(json)]
|
||||
#[tabled(skip)]
|
||||
payload: T,
|
||||
#[sqlx(rename = "status_id")]
|
||||
status: TaskStatus,
|
||||
created_at: chrono::DateTime<Utc>,
|
||||
#[tabled(display = "display_option_date")]
|
||||
updated_at: Option<chrono::DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl<T: TaskPayload> Task<T> {
|
||||
pub fn payload(&self) -> &T {
|
||||
&self.payload
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TaskPayload> Task<T> {
|
||||
pub fn get_key(&self) -> String {
|
||||
self.payload_key.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
|
||||
match o {
|
||||
Some(s) => format!("{}", s),
|
||||
None => String::from(""),
|
||||
}
|
||||
}
|
||||
|
||||
32
lib_sync_core/src/tasks/manager.rs
Normal file
32
lib_sync_core/src/tasks/manager.rs
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
use futures::StreamExt;
|
||||
use std::marker::PhantomData;
|
||||
use crate::database::TaskStorage;
|
||||
use crate::tasks::{Task, TaskJob, TaskPayload, TaskStatus};
|
||||
|
||||
struct TaskManager<S: TaskPayload, T: TaskStorage<S>>
|
||||
{
|
||||
storage: T,
|
||||
_marker: PhantomData<S>,
|
||||
}
|
||||
|
||||
impl<S: TaskPayload, T: TaskStorage<S>> TaskManager<S, T> {
|
||||
pub fn new(storage: T) -> Self {
|
||||
Self {
|
||||
storage,
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_tasks(&self, func: TaskJob<S>) -> crate::Result<()> {
|
||||
let rows = self.storage.get_tasks(TaskStatus::Pending);
|
||||
|
||||
let result: Vec<(Task<S>, TaskStatus)> = rows.map(|x| {
|
||||
let task = x.unwrap();
|
||||
let status = func(&task);
|
||||
|
||||
(task, status)
|
||||
}).collect().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue