parent
94fe050c4a
commit
c52a497075
7 changed files with 93 additions and 44 deletions
1
.idea/readwise-bulk-upload.iml
generated
1
.idea/readwise-bulk-upload.iml
generated
|
|
@ -5,6 +5,7 @@
|
||||||
<sourceFolder url="file://$MODULE_DIR$/cli/src" isTestSource="false" />
|
<sourceFolder url="file://$MODULE_DIR$/cli/src" isTestSource="false" />
|
||||||
<sourceFolder url="file://$MODULE_DIR$/lib_sync_core/src" isTestSource="false" />
|
<sourceFolder url="file://$MODULE_DIR$/lib_sync_core/src" isTestSource="false" />
|
||||||
<sourceFolder url="file://$MODULE_DIR$/web/src" isTestSource="false" />
|
<sourceFolder url="file://$MODULE_DIR$/web/src" isTestSource="false" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/lib_sync_core/target" />
|
||||||
<excludeFolder url="file://$MODULE_DIR$/target" />
|
<excludeFolder url="file://$MODULE_DIR$/target" />
|
||||||
</content>
|
</content>
|
||||||
<orderEntry type="inheritedJdk" />
|
<orderEntry type="inheritedJdk" />
|
||||||
|
|
|
||||||
22
Cargo.lock
generated
22
Cargo.lock
generated
|
|
@ -1069,6 +1069,7 @@ dependencies = [
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
"tracing-test",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
@ -2220,6 +2221,27 @@ dependencies = [
|
||||||
"tracing-log",
|
"tracing-log",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tracing-test"
|
||||||
|
version = "0.2.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
|
||||||
|
dependencies = [
|
||||||
|
"tracing-core",
|
||||||
|
"tracing-subscriber",
|
||||||
|
"tracing-test-macro",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tracing-test-macro"
|
||||||
|
version = "0.2.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
|
||||||
|
dependencies = [
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "typenum"
|
name = "typenum"
|
||||||
version = "1.18.0"
|
version = "1.18.0"
|
||||||
|
|
|
||||||
|
|
@ -23,3 +23,4 @@ uuid = { version = "1.16.0", features = ["serde", "v4"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
|
fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
|
||||||
|
tracing-test = "0.2.5"
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,11 @@
|
||||||
use crate::tasks::{Task, TaskPayload, TaskStatus};
|
use crate::tasks::{Task, TaskPayload, TaskStatus};
|
||||||
use futures::{Stream};
|
use futures::Stream;
|
||||||
mod sqlite;
|
mod sqlite;
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone, Debug)]
|
||||||
pub struct TaskPagination {
|
pub struct TaskPagination {
|
||||||
page_size: usize,
|
|
||||||
limit: Option<u32>,
|
limit: Option<u32>,
|
||||||
offset: Option<u32>,
|
offset: u32,
|
||||||
status: Option<TaskStatus>,
|
status: Option<TaskStatus>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -17,38 +16,37 @@ impl TaskPagination {
|
||||||
|
|
||||||
pub fn next(&self) -> Self {
|
pub fn next(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
page_size: self.page_size + self.page_size,
|
offset: self.offset.saturating_add(self.limit.unwrap_or(0)),
|
||||||
..self.clone()
|
..self.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn prev(&self) -> Self {
|
pub fn prev(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
page_size: self.page_size - self.page_size,
|
offset: self.offset.saturating_sub(self.limit.unwrap_or(0)),
|
||||||
..self.clone()
|
..self.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_page_size(&mut self, page_size: usize) {
|
pub fn with_limit(mut self, limit: Option<u32>) -> Self {
|
||||||
self.page_size = page_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_limit(&mut self, limit: Option<u32>) {
|
|
||||||
self.limit = limit;
|
self.limit = limit;
|
||||||
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_offset(&mut self, offset: Option<u32>) {
|
pub fn with_offset(mut self, offset: u32) -> Self {
|
||||||
self.offset = offset;
|
self.offset = offset;
|
||||||
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_status(&mut self, status: Option<TaskStatus>) {
|
pub fn with_status(mut self, status: Option<TaskStatus>) -> Self {
|
||||||
self.status = status;
|
self.status = status;
|
||||||
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TasksPage<T: TaskPayload> {
|
pub struct TasksPage<T: TaskPayload> {
|
||||||
tasks: Vec<Task<T>>,
|
tasks: Vec<Task<T>>,
|
||||||
page: TaskPagination
|
page: TaskPagination
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: TaskPayload> TasksPage<T> {
|
impl<T: TaskPayload> TasksPage<T> {
|
||||||
|
|
@ -72,6 +70,6 @@ pub trait TaskStorage<T: TaskPayload> {
|
||||||
async fn insert_tasks<'a, I: IntoIterator<Item=&'a Task<T>>>(&self, tasks: I) -> crate::Result<()>;
|
async fn insert_tasks<'a, I: IntoIterator<Item=&'a Task<T>>>(&self, tasks: I) -> crate::Result<()>;
|
||||||
fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
|
fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;
|
||||||
|
|
||||||
async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>>;
|
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
|
||||||
use sqlx::{QueryBuilder, SqlitePool};
|
use sqlx::{QueryBuilder, SqlitePool};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tracing::{info, instrument};
|
use tracing::{debug, info, instrument};
|
||||||
|
|
||||||
static SQLITE_BIND_LIMIT: usize = 32766;
|
static SQLITE_BIND_LIMIT: usize = 32766;
|
||||||
static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations");
|
static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations");
|
||||||
|
|
@ -108,7 +108,7 @@ impl<T: TaskPayload> TaskStorage<T> for Sqlite {
|
||||||
query.fetch(&self.pool).err_into::<crate::Error>()
|
query.fetch(&self.pool).err_into::<crate::Error>()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<T>> {
|
async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>> {
|
||||||
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
|
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
|
||||||
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
|
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
|
||||||
);
|
);
|
||||||
|
|
@ -119,13 +119,13 @@ impl<T: TaskPayload> TaskStorage<T> for Sqlite {
|
||||||
|
|
||||||
builder.push("ORDER BY created_at DESC ");
|
builder.push("ORDER BY created_at DESC ");
|
||||||
|
|
||||||
if let Some(limit) = &page.offset {
|
|
||||||
builder.push("OFFSET ").push_bind(limit);
|
if let Some(limit) = page.limit {
|
||||||
|
builder.push("LIMIT ").push_bind(limit);
|
||||||
|
builder.push(" OFFSET ").push_bind(page.offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(limit) = &page.limit {
|
debug!("SQL: \"{}\", with options: \"{:?}\"", builder.sql(), page);
|
||||||
builder.push("LIMIT ").push_bind(limit);
|
|
||||||
}
|
|
||||||
|
|
||||||
let tasks = builder
|
let tasks = builder
|
||||||
.build_query_as::<Task<T>>()
|
.build_query_as::<Task<T>>()
|
||||||
|
|
@ -139,25 +139,39 @@ impl<T: TaskPayload> TaskStorage<T> for Sqlite {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use fake::{Dummy, Fake, Faker};
|
use fake::Dummy;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::types::Uuid;
|
use sqlx::types::Uuid;
|
||||||
use sqlx::Row;
|
use sqlx::Row;
|
||||||
|
use tracing_test::traced_test;
|
||||||
|
|
||||||
#[derive(Dummy, Serialize, Deserialize, Debug)]
|
#[derive(Dummy, Serialize, Deserialize, Debug)]
|
||||||
struct DummyTaskPayload {
|
struct DummyTaskPayload {
|
||||||
key: Uuid,
|
key: Uuid,
|
||||||
_foo: String,
|
_foo: String,
|
||||||
_baar: String,
|
_bar: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
//noinspection RsUnresolvedPath
|
||||||
|
fn setup(pool: SqlitePool, len: usize) -> (Sqlite, Vec<Task<DummyTaskPayload>>) {
|
||||||
|
let owned_pool = pool.clone();
|
||||||
|
let sqlite = Sqlite { pool: owned_pool };
|
||||||
|
|
||||||
|
let payloads = fake::vec![DummyTaskPayload; len];
|
||||||
|
|
||||||
|
let tasks = payloads
|
||||||
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, item)| Task::new((i + 1).to_string(), item, TaskStatus::Pending))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
(sqlite, tasks)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[sqlx::test(migrator = "MIGRATIONS")]
|
#[sqlx::test(migrator = "MIGRATIONS")]
|
||||||
async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
|
async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
|
||||||
let owned_pool = pool.clone();
|
let (sqlite, tasks) = setup(pool.clone(), 100);
|
||||||
let sqlite = Sqlite { pool: owned_pool };
|
|
||||||
|
|
||||||
let tasks = generate_dummy_tasks();
|
|
||||||
|
|
||||||
sqlite.insert_tasks(&tasks).await.unwrap();
|
sqlite.insert_tasks(&tasks).await.unwrap();
|
||||||
|
|
||||||
|
|
@ -179,19 +193,32 @@ struct DummyTaskPayload {
|
||||||
|
|
||||||
let mut zip = tasks.into_iter().zip(saved_tasks.into_iter());
|
let mut zip = tasks.into_iter().zip(saved_tasks.into_iter());
|
||||||
|
|
||||||
assert!(zip.all(|(a, b)| {
|
assert!(zip.all(|(a, b)| { a.get_key() == b.get_key() }));
|
||||||
a.get_key() == b.get_key()
|
|
||||||
}));
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_dummy_tasks() -> Vec<Task<DummyTaskPayload>> {
|
#[sqlx::test(migrator = "MIGRATIONS")]
|
||||||
let payloads: Vec<DummyTaskPayload> = Faker.fake();
|
#[traced_test]
|
||||||
|
async fn it_return_paginated_tasks(pool: SqlitePool) -> sqlx::Result<()> {
|
||||||
|
let (sqlite, tasks) = setup(pool.clone(), 300);
|
||||||
|
|
||||||
payloads
|
sqlite.insert_tasks(&tasks).await.unwrap();
|
||||||
.into_iter()
|
|
||||||
.map(|item| Task::new(item.key.to_string(), item, TaskStatus::Pending))
|
let page_options = TaskPagination::new().with_limit(Some(25));
|
||||||
.collect()
|
|
||||||
|
let first_page: TasksPage<DummyTaskPayload> =
|
||||||
|
sqlite.get_paginated_tasks(page_options).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(first_page.tasks.len(), 25);
|
||||||
|
assert_eq!(first_page.tasks.first().unwrap().get_key(), tasks.get(0).unwrap().get_key());
|
||||||
|
|
||||||
|
let second_page: TasksPage<DummyTaskPayload> =
|
||||||
|
sqlite.get_paginated_tasks(first_page.next()).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(second_page.tasks.len(), 25);
|
||||||
|
assert_eq!(second_page.tasks.first().unwrap().get_key(), tasks.get(25).unwrap().get_key());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,10 +35,10 @@ impl Display for TaskStatus {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait TaskPayload:
|
pub trait TaskPayload:
|
||||||
Serialize + DeserializeOwned + Send + Unpin + 'static
|
Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static>
|
impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug>
|
||||||
TaskPayload for T
|
TaskPayload for T
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ create table tasks
|
||||||
id integer not null
|
id integer not null
|
||||||
constraint tasks_pk
|
constraint tasks_pk
|
||||||
primary key autoincrement,
|
primary key autoincrement,
|
||||||
payload_key ANY not null
|
payload_key TEXT not null
|
||||||
constraint tasks_payload_key
|
constraint tasks_payload_key
|
||||||
unique on conflict ignore,
|
unique on conflict ignore,
|
||||||
payload TEXT not null,
|
payload TEXT not null,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue