diff --git a/.idea/readwise-bulk-upload.iml b/.idea/readwise-bulk-upload.iml
index cc58e9c..0855a40 100644
--- a/.idea/readwise-bulk-upload.iml
+++ b/.idea/readwise-bulk-upload.iml
@@ -5,6 +5,7 @@
+
diff --git a/Cargo.lock b/Cargo.lock
index c144106..43de3fc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1069,6 +1069,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-subscriber",
+ "tracing-test",
"uuid",
]
@@ -2220,6 +2221,27 @@ dependencies = [
"tracing-log",
]
+[[package]]
+name = "tracing-test"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
+dependencies = [
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test-macro",
+]
+
+[[package]]
+name = "tracing-test-macro"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
+dependencies = [
+ "quote",
+ "syn",
+]
+
[[package]]
name = "typenum"
version = "1.18.0"
diff --git a/lib_sync_core/Cargo.toml b/lib_sync_core/Cargo.toml
index 82df9ee..0dcc4f3 100644
--- a/lib_sync_core/Cargo.toml
+++ b/lib_sync_core/Cargo.toml
@@ -23,3 +23,4 @@ uuid = { version = "1.16.0", features = ["serde", "v4"] }
[dev-dependencies]
fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
+tracing-test = "0.2.5"
diff --git a/lib_sync_core/src/database.rs b/lib_sync_core/src/database.rs
index b01914f..45db661 100644
--- a/lib_sync_core/src/database.rs
+++ b/lib_sync_core/src/database.rs
@@ -1,12 +1,11 @@
use crate::tasks::{Task, TaskPayload, TaskStatus};
-use futures::{Stream};
+use futures::Stream;
mod sqlite;
-#[derive(Default, Clone)]
+#[derive(Default, Clone, Debug)]
pub struct TaskPagination {
- page_size: usize,
limit: Option<u32>,
- offset: Option<u32>,
+ offset: u32,
status: Option<TaskStatus>,
}
@@ -17,38 +16,37 @@ impl TaskPagination {
pub fn next(&self) -> Self {
Self {
- page_size: self.page_size + self.page_size,
+ offset: self.offset.saturating_add(self.limit.unwrap_or(0)),
..self.clone()
- }
+ }
}
pub fn prev(&self) -> Self {
Self {
- page_size: self.page_size - self.page_size,
+ offset: self.offset.saturating_sub(self.limit.unwrap_or(0)),
..self.clone()
}
}
- pub fn set_page_size(&mut self, page_size: usize) {
- self.page_size = page_size;
- }
-
- pub fn set_limit(&mut self, limit: Option<u32>) {
+ pub fn with_limit(mut self, limit: Option<u32>) -> Self {
self.limit = limit;
+ self
}
- pub fn set_offset(&mut self, offset: Option<u32>) {
+ pub fn with_offset(mut self, offset: u32) -> Self {
self.offset = offset;
+ self
}
- pub fn set_status(&mut self, status: Option<TaskStatus>) {
+ pub fn with_status(mut self, status: Option<TaskStatus>) -> Self {
self.status = status;
+ self
}
}
pub struct TasksPage<P: TaskPayload> {
tasks: Vec<Task<P>>,
- page: TaskPagination
+ page: TaskPagination
}
impl TasksPage {
@@ -72,6 +70,6 @@ pub trait TaskStorage {
async fn insert_tasks<'a, I: IntoIterator<Item = &'a Task<P>>>(&self, tasks: I) -> crate::Result<()>;
fn get_tasks(&self, options: TaskStatus) -> impl Stream<Item = crate::Result<Task<P>>>;
- async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<P>>;
+ async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<P>>;
}
diff --git a/lib_sync_core/src/database/sqlite.rs b/lib_sync_core/src/database/sqlite.rs
index d231990..6aac53c 100644
--- a/lib_sync_core/src/database/sqlite.rs
+++ b/lib_sync_core/src/database/sqlite.rs
@@ -5,7 +5,7 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{QueryBuilder, SqlitePool};
use std::path::PathBuf;
use tokio::fs;
-use tracing::{info, instrument};
+use tracing::{debug, info, instrument};
static SQLITE_BIND_LIMIT: usize = 32766;
static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations");
@@ -108,7 +108,7 @@ impl TaskStorage for Sqlite {
query.fetch(&self.pool).err_into::<crate::Error>()
}
- async fn get_paginated_tasks(&self, page: &TaskPagination) -> crate::Result<TasksPage<P>> {
+ async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<P>> {
let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
);
@@ -119,13 +119,13 @@ impl TaskStorage for Sqlite {
builder.push("ORDER BY created_at DESC ");
- if let Some(limit) = &page.offset {
- builder.push("OFFSET ").push_bind(limit);
+
+ if let Some(limit) = page.limit {
+ builder.push("LIMIT ").push_bind(limit);
+ builder.push(" OFFSET ").push_bind(page.offset);
}
- if let Some(limit) = &page.limit {
- builder.push("LIMIT ").push_bind(limit);
- }
+ debug!("SQL: \"{}\", with options: \"{:?}\"", builder.sql(), page);
let tasks = builder
.build_query_as::<Task<P>>()
@@ -139,25 +139,39 @@ impl TaskStorage for Sqlite {
#[cfg(test)]
mod tests {
use super::*;
- use fake::{Dummy, Fake, Faker};
+ use fake::Dummy;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use sqlx::types::Uuid;
use sqlx::Row;
+ use tracing_test::traced_test;
#[derive(Dummy, Serialize, Deserialize, Debug)]
-struct DummyTaskPayload {
+ struct DummyTaskPayload {
key: Uuid,
_foo: String,
- _baar: String,
+ _bar: String,
+ }
+
+ //noinspection RsUnresolvedPath
+ fn setup(pool: SqlitePool, len: usize) -> (Sqlite, Vec<Task<DummyTaskPayload>>) {
+ let owned_pool = pool.clone();
+ let sqlite = Sqlite { pool: owned_pool };
+
+ let payloads = fake::vec![DummyTaskPayload; len];
+
+ let tasks = payloads
+ .into_iter()
+ .enumerate()
+ .map(|(i, item)| Task::new((i + 1).to_string(), item, TaskStatus::Pending))
+ .collect();
+
+ (sqlite, tasks)
}
#[sqlx::test(migrator = "MIGRATIONS")]
async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
- let owned_pool = pool.clone();
- let sqlite = Sqlite { pool: owned_pool };
-
- let tasks = generate_dummy_tasks();
+ let (sqlite, tasks) = setup(pool.clone(), 100);
sqlite.insert_tasks(&tasks).await.unwrap();
@@ -176,22 +190,35 @@ struct DummyTaskPayload {
.await;
assert_eq!(tasks.len(), saved_tasks.len());
-
+
let mut zip = tasks.into_iter().zip(saved_tasks.into_iter());
- assert!(zip.all(|(a, b)| {
- a.get_key() == b.get_key()
- }));
-
+ assert!(zip.all(|(a, b)| { a.get_key() == b.get_key() }));
+
Ok(())
}
- fn generate_dummy_tasks() -> Vec<Task<DummyTaskPayload>> {
- let payloads: Vec<DummyTaskPayload> = Faker.fake();
+ #[sqlx::test(migrator = "MIGRATIONS")]
+ #[traced_test]
+ async fn it_return_paginated_tasks(pool: SqlitePool) -> sqlx::Result<()> {
+ let (sqlite, tasks) = setup(pool.clone(), 300);
- payloads
- .into_iter()
- .map(|item| Task::new(item.key.to_string(), item, TaskStatus::Pending))
- .collect()
+ sqlite.insert_tasks(&tasks).await.unwrap();
+
+ let page_options = TaskPagination::new().with_limit(Some(25));
+
+ let first_page: TasksPage<DummyTaskPayload> =
+ sqlite.get_paginated_tasks(page_options).await.unwrap();
+
+ assert_eq!(first_page.tasks.len(), 25);
+ assert_eq!(first_page.tasks.first().unwrap().get_key(), tasks.get(0).unwrap().get_key());
+
+ let second_page: TasksPage<DummyTaskPayload> =
+ sqlite.get_paginated_tasks(first_page.next()).await.unwrap();
+
+ assert_eq!(second_page.tasks.len(), 25);
+ assert_eq!(second_page.tasks.first().unwrap().get_key(), tasks.get(25).unwrap().get_key());
+
+ Ok(())
}
}
diff --git a/lib_sync_core/src/tasks.rs b/lib_sync_core/src/tasks.rs
index 20b61d6..a4bc96c 100644
--- a/lib_sync_core/src/tasks.rs
+++ b/lib_sync_core/src/tasks.rs
@@ -35,10 +35,10 @@ impl Display for TaskStatus {
}
pub trait TaskPayload:
- Serialize + DeserializeOwned + Send + Unpin + 'static
+ Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug
{
}
-impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static>
+impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug>
TaskPayload for T
{
}
diff --git a/migrations/0003_tasks.sql b/migrations/0003_tasks.sql
index 0b7cb54..85d03ec 100644
--- a/migrations/0003_tasks.sql
+++ b/migrations/0003_tasks.sql
@@ -3,7 +3,7 @@ create table tasks
id integer not null
constraint tasks_pk
primary key autoincrement,
- payload_key ANY not null
+ payload_key TEXT not null
constraint tasks_payload_key
unique on conflict ignore,
payload TEXT not null,