Compare commits


14 commits

37 changed files with 1763 additions and 231 deletions

.gitignore vendored (2 changes)

@@ -1 +1 @@
/target
**/target

.idea/codeStyles/Project.xml generated Normal file (29 changes)

@@ -0,0 +1,29 @@
<component name="ProjectCodeStyleConfiguration">
  <code_scheme name="Project" version="173">
    <RsCodeStyleSettings>
      <option name="INDENT_WHERE_CLAUSE" value="true" />
    </RsCodeStyleSettings>
    <SqlCodeStyleSettings version="7">
      <option name="KEYWORD_CASE" value="2" />
      <option name="SUBQUERY_OPENING" value="1" />
      <option name="SUBQUERY_CONTENT" value="4" />
      <option name="SUBQUERY_CLOSING" value="4" />
      <option name="SUBQUERY_PAR_SPACE_BEFORE" value="1" />
      <option name="INSERT_INTO_NL" value="2" />
      <option name="INSERT_COLLAPSE_MULTI_ROW_VALUES" value="true" />
      <option name="INSERT_MATRIX_ALIGN" value="true" />
      <option name="INSERT_MATRIX_INCLUDING_HEADER" value="true" />
      <option name="FROM_ALIGN_JOIN_TABLES" value="true" />
      <option name="FROM_ALIGN_ALIASES" value="true" />
    </SqlCodeStyleSettings>
    <codeStyleSettings language="Rust">
      <indentOptions>
        <option name="INDENT_SIZE" value="2" />
        <option name="CONTINUATION_INDENT_SIZE" value="2" />
        <option name="TAB_SIZE" value="2" />
        <option name="USE_TAB_CHARACTER" value="true" />
        <option name="SMART_TABS" value="true" />
      </indentOptions>
    </codeStyleSettings>
  </code_scheme>
</component>

.idea/codeStyles/codeStyleConfig.xml generated Normal file (6 changes)

@@ -0,0 +1,6 @@
<component name="ProjectCodeStyleConfiguration">
  <state>
    <option name="USE_PER_PROJECT_SETTINGS" value="true" />
    <option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
  </state>
</component>

.idea/dataSources.xml generated (7 changes)

@@ -8,5 +8,12 @@
      <jdbc-url>jdbc:sqlite:$USER_HOME$/.local/share/readwise-bulk-upload/db.sql</jdbc-url>
      <working-dir>$ProjectFileDir$</working-dir>
    </data-source>
    <data-source source="LOCAL" name="it_save_tasks.sqlite" uuid="313640db-1124-4bac-bbf4-8b3990555416">
      <driver-ref>sqlite.xerial</driver-ref>
      <synchronize>true</synchronize>
      <jdbc-driver>org.sqlite.JDBC</jdbc-driver>
      <jdbc-url>jdbc:sqlite:$PROJECT_DIR$/lib_sync_core/target/sqlx/test-dbs/lib_sync_core/database/sqlite/tests/it_save_tasks.sqlite</jdbc-url>
      <working-dir>$ProjectFileDir$</working-dir>
    </data-source>
  </component>
</project>

View file

@@ -2,7 +2,10 @@
<module type="EMPTY_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$">
      <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
      <sourceFolder url="file://$MODULE_DIR$/cli/src" isTestSource="false" />
      <sourceFolder url="file://$MODULE_DIR$/lib_sync_core/src" isTestSource="false" />
      <sourceFolder url="file://$MODULE_DIR$/web/src" isTestSource="false" />
      <excludeFolder url="file://$MODULE_DIR$/lib_sync_core/target" />
      <excludeFolder url="file://$MODULE_DIR$/target" />
    </content>
    <orderEntry type="inheritedJdk" />

.idea/runConfigurations/Load_Tasks.xml generated Normal file (22 changes)

@@ -0,0 +1,22 @@
<component name="ProjectRunConfigurationManager">
  <configuration default="false" name="Load Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
    <option name="buildProfileId" value="dev" />
    <option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- load_tasks $USER_HOME$/Downloads/readwise-reader-import.json" />
    <option name="workingDirectory" value="file://$PROJECT_DIR$" />
    <envs>
      <env name="APP_LOG_LEVEL" value="Debug" />
    </envs>
    <option name="emulateTerminal" value="true" />
    <option name="channel" value="DEFAULT" />
    <option name="requiredFeatures" value="true" />
    <option name="allFeatures" value="false" />
    <option name="withSudo" value="false" />
    <option name="buildTarget" value="REMOTE" />
    <option name="backtrace" value="SHORT" />
    <option name="isRedirectInput" value="false" />
    <option name="redirectInputPath" value="" />
    <method v="2">
      <option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
    </method>
  </configuration>
</component>

.idea/runConfigurations/Query_Tasks.xml generated Normal file (22 changes)

@@ -0,0 +1,22 @@
<component name="ProjectRunConfigurationManager">
  <configuration default="false" name="Query Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
    <option name="buildProfileId" value="dev" />
    <option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- query" />
    <option name="workingDirectory" value="file://$PROJECT_DIR$" />
    <envs>
      <env name="APP_LOG_LEVEL" value="Debug" />
    </envs>
    <option name="emulateTerminal" value="true" />
    <option name="channel" value="DEFAULT" />
    <option name="requiredFeatures" value="true" />
    <option name="allFeatures" value="false" />
    <option name="withSudo" value="false" />
    <option name="buildTarget" value="REMOTE" />
    <option name="backtrace" value="SHORT" />
    <option name="isRedirectInput" value="false" />
    <option name="redirectInputPath" value="" />
    <method v="2">
      <option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
    </method>
  </configuration>
</component>

.idea/runConfigurations/Run_Tasks.xml generated Normal file (22 changes)

@@ -0,0 +1,22 @@
<component name="ProjectRunConfigurationManager">
  <configuration default="false" name="Run Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
    <option name="buildProfileId" value="dev" />
    <option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- run" />
    <option name="workingDirectory" value="file://$PROJECT_DIR$" />
    <envs>
      <env name="APP_LOG_LEVEL" value="Debug" />
    </envs>
    <option name="emulateTerminal" value="true" />
    <option name="channel" value="DEFAULT" />
    <option name="requiredFeatures" value="true" />
    <option name="allFeatures" value="false" />
    <option name="withSudo" value="false" />
    <option name="buildTarget" value="REMOTE" />
    <option name="backtrace" value="SHORT" />
    <option name="isRedirectInput" value="false" />
    <option name="redirectInputPath" value="" />
    <method v="2">
      <option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
    </method>
  </configuration>
</component>

.idea/vcs.xml generated (5 changes)

@@ -1,5 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="GitSharedSettings">
    <option name="FORCE_PUSH_PROHIBITED_PATTERNS">
      <list />
    </option>
  </component>
  <component name="VcsDirectoryMappings">
    <mapping directory="" vcs="Git" />
  </component>

Cargo.lock generated (721 changes)

File diff suppressed because it is too large.

View file

@@ -1,18 +1,6 @@
[package]
name = "readwise-bulk-upload"
version = "0.1.0"
edition = "2024"

[workspace]
resolver = "3"

[dependencies]
thiserror = "2.0.12"
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
serde_json = "1.0.140"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"

members = [
    "cli", "lib_sync_core", "web", "task_queue",
]

cli/Cargo.toml Normal file (26 changes)

@@ -0,0 +1,26 @@
[package]
name = "cli"
version = "0.1.0"
edition = "2024"

[[bin]]
name = "readwise"
path = "bin/readwise/main.rs"

[dependencies]
lib_sync_core = {path = "../lib_sync_core"}
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
serde_json = "1.0.140"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"
tabled = "0.19.0"
futures = "0.3.31"
thiserror = "2.0.12"

View file

@@ -1,7 +1,8 @@
use lib_sync_core::tasks::TaskPayload;
use chrono::{DateTime, Local};
use serde::{Deserialize, Deserializer, de, Serialize};
use serde::{de, Deserialize, Deserializer, Serialize};
use serde_json::Value;
use crate::sql::TaskPayload;
use std::fmt::Display;

#[derive(Deserialize, Serialize, Debug)]
pub struct DocumentPayload {
@@ -14,6 +15,16 @@ pub struct DocumentPayload {
    location: String,
}

impl Display for DocumentPayload {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            serde_json::to_string_pretty(self).map_err(|_| std::fmt::Error)?
        )
    }
}

impl TaskPayload for DocumentPayload {
    fn get_key(&self) -> String {
        self.url.clone()
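
Note: the `Display` impl added above simply pretty-prints the payload as JSON. A minimal standalone sketch of the same pattern (the `Payload` struct here is hypothetical, not the crate's type):

```rust
use serde::Serialize;
use std::fmt;

#[derive(Serialize)]
struct Payload {
    url: String,
}

impl fmt::Display for Payload {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Map a serialization failure to fmt::Error, as the diff does.
        let json = serde_json::to_string_pretty(self).map_err(|_| fmt::Error)?;
        write!(f, "{json}")
    }
}

fn main() {
    let p = Payload { url: "https://example.com".into() };
    println!("{p}"); // prints the payload as pretty JSON
}
```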

cli/bin/readwise/main.rs Normal file (76 changes)

@@ -0,0 +1,76 @@
use clap::{CommandFactory, Parser};
use directories::ProjectDirs;
use figment::{
    Figment,
    providers::{Env, Serialized},
};
use lib_sync_core::tasks::{TaskStatus};
use cli::config::{Command, Config};
use cli::{Error, Result};
use std::fs::File;
use tabled::Table;
use tracing_subscriber;
use crate::external_interface::DocumentPayload;

mod external_interface;

#[tokio::main]
async fn main() -> Result<()> {
    let cli = Config::parse();
    let args: Config = Figment::new()
        .merge(Serialized::defaults(&cli))
        .merge(Env::prefixed("APP_"))
        .extract()?;
    tracing_subscriber::fmt()
        .with_max_level(args.log_level())
        .init();
    run(&cli.command).await?;
    Ok(())
}

async fn run(command: &Command) -> Result<()> {
    let project_dir = ProjectDirs::from("", "", "synchronizator_readwise").ok_or(
        lib_sync_core::error::Error::Unhandled("Could not get standard directories"),
    )?;
    let task_manager = TaskManager::new(project_dir.data_dir()).await?;
    match command {
        Command::LoadTasks { path } => {
            let file = File::open(path).map_err(|_| {
                Error::Runtime(format!(
                    r#"The file "{}" could not be open"#,
                    path.display()
                ))
            })?;
            let documents: Vec<DocumentPayload> = serde_json::from_reader(file)?;
            task_manager.load_tasks(documents).await?;
        }
        Command::Query => {
            let tasks = task_manager
                .get_tasks::<DocumentPayload>(None, Some(25))
                .await?;
            println!("{}", Table::new(tasks));
        }
        Command::Run => {
            task_manager
                .run_tasks::<DocumentPayload>(|task| {
                    println!("{}", task.get_key());
                    TaskStatus::Completed
                })
                .await?;
        }
        Command::None => {
            Config::command().print_help()?;
        }
    }
    Ok(())
}

View file

@@ -1,4 +1,4 @@
use clap::{Parser, ValueEnum};
use clap::{Parser, Subcommand, ValueEnum};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
@@ -57,9 +57,32 @@ impl Into<LevelFilter> for VerbosityLevel {
    }
}

#[derive(Debug, Subcommand)]
#[clap(rename_all = "snake_case")]
pub enum Command {
    /// Load task into the database from [path]
    LoadTasks {
        /// Path to the file
        path: PathBuf,
    },
    Query,
    Run,
    #[clap(skip)]
    None,
}

impl Default for Command {
    fn default() -> Self {
        Command::None
    }
}

#[derive(Debug, Parser, Serialize, Deserialize)]
pub struct Config {
    path: PathBuf,
    #[command(subcommand)]
    #[serde(skip)]
    pub command: Command,
    #[arg(
        long,
        short = 'v',
@@ -71,9 +94,6 @@ pub struct Config {
}

impl Config {
    pub fn path(&self) -> &PathBuf {
        &self.path
    }

    pub fn log_level(&self) -> LevelFilter {
        self.log_level.clone().into()
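
Note: `Config` derives both `Parser` (clap) and `Serialize`/`Deserialize`, which is what lets `main.rs` layer `APP_`-prefixed environment variables over CLI values with figment. A minimal sketch of that layering, using a simplified config struct rather than the crate's full `Config`:

```rust
use figment::{Figment, providers::{Env, Serialized}};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Config {
    log_level: String,
}

fn main() -> Result<(), figment::Error> {
    // Defaults first; environment variables prefixed with APP_ win.
    let defaults = Config { log_level: "info".into() };
    let config: Config = Figment::new()
        .merge(Serialized::defaults(&defaults))
        .merge(Env::prefixed("APP_"))
        .extract()?;
    // APP_LOG_LEVEL=debug ./app  ->  config.log_level == "debug"
    println!("{config:?}");
    Ok(())
}
```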

View file

@@ -11,6 +11,9 @@ pub enum Error {
    #[error("{0}")]
    Unhandled(&'static str),
    #[error(transparent)]
    Sync(#[from] lib_sync_core::error::Error),
    #[error(transparent)]
    Sqlx(#[from] sqlx::Error),

View file

@@ -1,6 +1,4 @@
mod error;
pub mod sql;
pub mod config;
pub mod readwise;
mod error;
pub use error::*;

cli/src/main.rs Normal file (23 changes)

@@ -0,0 +1,23 @@
use clap::Parser;
use figment::{
    providers::{Env, Serialized},
    Figment,
};
use cli::config::Config;
use cli::Result;
use tracing_subscriber;

#[tokio::main]
async fn main() -> Result<()> {
    let cli = Config::parse();
    let args: Config = Figment::new()
        .merge(Serialized::defaults(&cli))
        .merge(Env::prefixed("APP_"))
        .extract()?;
    tracing_subscriber::fmt()
        .with_max_level(args.log_level())
        .init();
    Ok(())
}

lib_sync_core/Cargo.toml Normal file (27 changes)

@@ -0,0 +1,27 @@
[package]
name = "lib_sync_core"
version = "0.1.0"
edition = "2024"

[dependencies]
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
tokio-stream = "0.1.17"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
serde_json = "1.0.140"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"
tabled = "0.19.0"
futures = "0.3.31"
thiserror = "2.0.12"
async-stream = "0.3.6"
uuid = { version = "1.16.0", features = ["serde", "v4"] }

[dev-dependencies]
fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
tracing-test = "0.2.5"

View file

@@ -0,0 +1,76 @@
use crate::tasks::{Task, TaskPayload, TaskStatus};
use futures::Stream;

mod sqlite;

#[derive(Default, Clone, Debug)]
pub struct TaskPagination {
    limit: Option<u32>,
    offset: u32,
    status: Option<TaskStatus>,
}

impl TaskPagination {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn next(&self) -> Self {
        Self {
            offset: self.offset.saturating_add(self.limit.unwrap_or(0)),
            ..self.clone()
        }
    }

    pub fn prev(&self) -> Self {
        Self {
            offset: self.offset.saturating_sub(self.limit.unwrap_or(0)),
            ..self.clone()
        }
    }

    pub fn with_limit(mut self, limit: Option<u32>) -> Self {
        self.limit = limit;
        self
    }

    pub fn with_offset(mut self, offset: u32) -> Self {
        self.offset = offset;
        self
    }

    pub fn with_status(mut self, status: Option<TaskStatus>) -> Self {
        self.status = status;
        self
    }
}

pub struct TasksPage<T: TaskPayload> {
    tasks: Vec<Task<T>>,
    page: TaskPagination,
}

impl<T: TaskPayload> TasksPage<T> {
    fn new(tasks: Vec<Task<T>>, page: TaskPagination) -> Self {
        Self { tasks, page }
    }

    pub fn next(&self) -> TaskPagination {
        self.page.next()
    }

    pub fn prev(&self) -> TaskPagination {
        self.page.prev()
    }
}

pub trait TaskStorage<T: TaskPayload> {
    async fn insert_tasks<'a, I: IntoIterator<Item = &'a Task<T>>>(
        &self,
        tasks: I,
    ) -> crate::Result<()>;

    fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;

    fn listen_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>>;

    async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>>;
}
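
Note: `TaskPagination` is a by-value builder, and `next()`/`prev()` only move `offset` when a limit is set (with `limit == None`, `unwrap_or(0)` leaves the offset where it is). A sketch of paging through pending tasks, assuming it sits inside this crate where the types above are in scope (`first_two_pages` is a hypothetical helper, not part of the diff):

```rust
// Page through pending tasks 25 at a time, generic over any storage backend.
async fn first_two_pages<P: TaskPayload, S: TaskStorage<P>>(storage: &S) -> crate::Result<()> {
    let first = TaskPagination::new()
        .with_limit(Some(25))
        .with_status(Some(TaskStatus::Pending));

    let page1 = storage.get_paginated_tasks(first).await?;         // rows 0..25
    let _page2 = storage.get_paginated_tasks(page1.next()).await?; // rows 25..50
    Ok(())
}
```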

View file

@@ -0,0 +1,230 @@
use crate::database::{TaskPagination, TaskStorage, TasksPage};
use crate::tasks::{Task, TaskPayload, TaskStatus};
use futures::{Stream, TryStreamExt};
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{QueryBuilder, SqlitePool};
use std::path::PathBuf;
use tokio::fs;
use tracing::{debug, info, instrument};

static SQLITE_BIND_LIMIT: usize = 32766;
static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("../migrations");

#[derive(Debug)]
pub struct Sqlite {
    pool: SqlitePool,
}

impl Sqlite {
    pub async fn new<P: Into<PathBuf>>(base_path: P) -> crate::Result<Self> {
        Ok(Self {
            pool: Self::connect_database(base_path).await?,
        })
    }

    async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
        let base_path = base_path.into();
        let database_file_path = base_path.join("db.sql");
        fs::create_dir_all(base_path).await?;
        let opts = SqliteConnectOptions::new()
            .filename(database_file_path)
            .create_if_missing(true)
            .journal_mode(SqliteJournalMode::Wal);
        let pool = SqlitePool::connect_with(opts).await?;
        MIGRATIONS.run(&pool).await?;
        Ok(pool)
    }
}

impl<T: TaskPayload> TaskStorage<T> for Sqlite {
    /// Inserts tasks into the database for later processing.
    ///
    /// # Arguments
    ///
    /// * `tasks`: The tasks to be processed. Each task must have a unique key; if a key is repeated, the duplicate item is omitted.
    ///
    /// returns: Result<(), Error>
    ///
    /// # Examples
    ///
    /// ```
    ///
    /// ```
    #[instrument(skip(self, tasks))]
    async fn insert_tasks<'a, I: IntoIterator<Item = &'a Task<T>>>(
        &self,
        tasks: I,
    ) -> crate::Result<()> {
        let mut tx = self.pool.begin().await?;
        let mut builder: QueryBuilder<'_, sqlx::Sqlite> =
            QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
        let args: crate::Result<Vec<(String, String)>> = tasks
            .into_iter()
            .map(|value| Ok((value.get_key(), serde_json::to_string(value.payload())?)))
            .collect();
        let mut affected_rows = 0;
        // Chunk the query by the size limit of bind params
        for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
            builder.push_values(chunk, |mut builder, item| {
                builder
                    .push_bind(&item.0)
                    .push_bind(&item.1)
                    .push_bind(TaskStatus::Pending);
            });
            builder.push("ON conflict (payload_key) DO NOTHING");
            let query = builder.build();
            affected_rows += query.execute(&mut *tx).await?.rows_affected();
            builder.reset();
        }
        tx.commit().await?;
        info!("{} rows inserted.", affected_rows);
        Ok(())
    }

    fn get_tasks(&self, task_status: TaskStatus) -> impl Stream<Item = crate::Result<Task<T>>> {
        let query = sqlx::query_as::<_, Task<T>>(
            "
            SELECT id, payload_key, payload, status_id, created_at, updated_at
            FROM tasks
            WHERE status_id = ?
            ORDER BY created_at DESC
            ",
        )
        .bind(task_status);
        query.fetch(&self.pool).err_into::<crate::Error>()
    }

    fn listen_tasks(
        &self,
        task_status: TaskStatus,
    ) -> impl Stream<Item = crate::error::Result<Task<T>>> {
        futures::stream::empty()
    }

    async fn get_paginated_tasks(&self, page: TaskPagination) -> crate::Result<TasksPage<T>> {
        let mut builder: QueryBuilder<'_, sqlx::Sqlite> = QueryBuilder::new(
            "select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
        );
        if let Some(status) = &page.status {
            builder.push("where status_id = ").push_bind(status);
        }
        builder.push("ORDER BY created_at DESC ");
        if let Some(limit) = page.limit {
            builder.push("LIMIT ").push_bind(limit);
            builder.push(" OFFSET ").push_bind(page.offset);
        }
        debug!("SQL: \"{}\", with options: \"{:?}\"", builder.sql(), page);
        let tasks = builder
            .build_query_as::<Task<T>>()
            .fetch_all(&self.pool)
            .await?;
        Ok(TasksPage::new(tasks, page.clone()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use fake::Dummy;
    use futures::StreamExt;
    use serde::{Deserialize, Serialize};
    use sqlx::types::Uuid;
    use sqlx::Row;
    use tracing_test::traced_test;

    #[derive(Dummy, Serialize, Deserialize, Debug)]
    struct DummyTaskPayload {
        key: Uuid,
        _foo: String,
        _bar: String,
    }

    //noinspection RsUnresolvedPath
    fn setup(pool: SqlitePool, len: usize) -> (Sqlite, Vec<Task<DummyTaskPayload>>) {
        let owned_pool = pool.clone();
        let sqlite = Sqlite { pool: owned_pool };
        let payloads = fake::vec![DummyTaskPayload; len];
        let tasks = payloads
            .into_iter()
            .enumerate()
            .map(|(i, item)| Task::new((i + 1).to_string(), item, TaskStatus::Pending))
            .collect();
        (sqlite, tasks)
    }

    #[sqlx::test(migrator = "MIGRATIONS")]
    #[traced_test]
    async fn it_save_tasks(pool: SqlitePool) -> sqlx::Result<()> {
        let (sqlite, tasks) = setup(pool.clone(), 100);
        sqlite.insert_tasks(&tasks).await.unwrap();
        let result = sqlx::query("select count(id) from tasks")
            .fetch_one(&pool)
            .await?;
        let total_rows: u64 = result.get(0);
        assert_eq!(total_rows as usize, tasks.len());
        let saved_tasks: Vec<Task<DummyTaskPayload>> = sqlite
            .get_tasks(TaskStatus::Pending)
            .map(|item| item.unwrap())
            .collect()
            .await;
        assert_eq!(tasks.len(), saved_tasks.len());
        let mut zip = tasks.into_iter().zip(saved_tasks.into_iter());
        assert!(zip.all(|(a, b)| { a.get_key() == b.get_key() }));
        Ok(())
    }

    #[sqlx::test(migrator = "MIGRATIONS")]
    #[traced_test]
    async fn it_return_paginated_tasks(pool: SqlitePool) -> sqlx::Result<()> {
        let (sqlite, tasks) = setup(pool.clone(), 300);
        sqlite.insert_tasks(&tasks).await.unwrap();
        let page_options = TaskPagination::new().with_limit(Some(25));
        let first_page: TasksPage<DummyTaskPayload> =
            sqlite.get_paginated_tasks(page_options).await.unwrap();
        assert_eq!(first_page.tasks.len(), 25);
        assert_eq!(
            first_page.tasks.first().unwrap().get_key(),
            tasks.get(0).unwrap().get_key()
        );
        let second_page: TasksPage<DummyTaskPayload> =
            sqlite.get_paginated_tasks(first_page.next()).await.unwrap();
        assert_eq!(second_page.tasks.len(), 25);
        assert_eq!(
            second_page.tasks.first().unwrap().get_key(),
            tasks.get(25).unwrap().get_key()
        );
        Ok(())
    }
}
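
Note on the chunking in `insert_tasks`: SQLite caps bound parameters per statement (the code assumes 32766), and each row binds three values (`payload_key`, `payload`, `status_id`), so every INSERT batch holds at most 32766 / 3 = 10922 rows. A standalone sketch of that arithmetic:

```rust
const SQLITE_BIND_LIMIT: usize = 32766; // assumed max binds per statement
const BINDS_PER_ROW: usize = 3;         // payload_key, payload, status_id

fn main() {
    let rows_per_chunk = SQLITE_BIND_LIMIT / BINDS_PER_ROW; // 10922
    let rows: Vec<u32> = (0..25_000).collect();
    // 25_000 rows -> 3 batches: 10922 + 10922 + 3156
    for (i, chunk) in rows.chunks(rows_per_chunk).enumerate() {
        println!("batch {i}: {} rows", chunk.len());
    }
}
```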

View file

@@ -0,0 +1,24 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
    #[error("{0}")]
    Exception(&'static str),
    #[error("{0}")]
    Unhandled(&'static str),
    #[error(transparent)]
    Io(#[from] tokio::io::Error),
    #[error(transparent)]
    Sqlx(#[from] sqlx::Error),
    #[error(transparent)]
    Migration(#[from] sqlx::migrate::MigrateError),
    #[error(transparent)]
    ParseJson(#[from] serde_json::Error),
}

pub type Result<T> = std::result::Result<T, Error>;
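
Note: the `#[from]` attributes give `?` an automatic conversion into `Error` from each wrapped error type, which is what the `?` calls throughout the crate rely on. A minimal runnable sketch of the same pattern with just the `serde_json` variant:

```rust
use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
    #[error(transparent)]
    ParseJson(#[from] serde_json::Error),
}

pub type Result<T> = std::result::Result<T, Error>;

fn parse(input: &str) -> Result<serde_json::Value> {
    // `?` converts serde_json::Error into Error::ParseJson via #[from].
    Ok(serde_json::from_str(input)?)
}

fn main() {
    assert!(parse(r#"{"ok": true}"#).is_ok());
    assert!(parse("not json").is_err());
}
```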

lib_sync_core/src/lib.rs Normal file (7 changes)

@@ -0,0 +1,7 @@
#![allow(dead_code, unused)]

pub mod error;
pub(crate) use error::*;

mod database;
pub mod tasks;

View file

@@ -0,0 +1,92 @@
use chrono::Utc;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Display;
use tabled::Tabled;

mod manager;
mod jobs;
mod worker;
mod bus;

#[derive(sqlx::Type, Debug, Clone)]
#[repr(u8)]
pub enum TaskStatus {
    Pending = 1,
    InProgress = 2,
    Completed = 3,
    Failed = 4,
}

impl Display for TaskStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskStatus::Pending => {
                write!(f, "Pending")
            }
            TaskStatus::InProgress => {
                write!(f, "In Progress")
            }
            TaskStatus::Completed => {
                write!(f, "Completed")
            }
            TaskStatus::Failed => {
                write!(f, "Failed")
            }
        }
    }
}

pub trait TaskPayload:
    Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug
{
}

impl<T: Serialize + DeserializeOwned + Send + Unpin + 'static + std::fmt::Debug>
    TaskPayload for T
{
}

#[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task<T: TaskPayload> {
    id: u32,
    payload_key: String,
    #[sqlx(json)]
    #[tabled(skip)]
    payload: T,
    #[sqlx(rename = "status_id")]
    status: TaskStatus,
    created_at: chrono::DateTime<Utc>,
    #[tabled(display = "display_option_date")]
    updated_at: Option<chrono::DateTime<Utc>>,
}

impl<T: TaskPayload> Task<T> {
    pub fn new(payload_key: String, payload: T, status: TaskStatus) -> Self {
        Self {
            id: 0,
            payload_key,
            payload,
            status,
            created_at: Default::default(),
            updated_at: None,
        }
    }
}

impl<T: TaskPayload> Task<T> {
    pub fn payload(&self) -> &T {
        &self.payload
    }

    pub fn get_key(&self) -> String {
        self.payload_key.clone()
    }
}

fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
    match o {
        Some(s) => format!("{}", s),
        None => String::from(""),
    }
}
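
Note: `TaskPayload` is a marker trait with a blanket impl, so any `Serialize + DeserializeOwned + Send + Unpin + Debug + 'static` type qualifies without a manual impl. A sketch (assumes it sits in this crate with `TaskPayload` in scope; `UrlPayload` is hypothetical):

```rust
use serde::{Deserialize, Serialize};

// No `impl TaskPayload for UrlPayload` is needed; the blanket impl covers it.
#[derive(Serialize, Deserialize, Debug)]
struct UrlPayload {
    url: String,
}

fn assert_payload<T: TaskPayload>() {}

fn check() {
    assert_payload::<UrlPayload>(); // compiles thanks to the blanket impl
}
```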

View file

@@ -0,0 +1,4 @@
#[derive(Clone)]
pub enum Bus {
    Local,
}

View file

@@ -0,0 +1,3 @@
use crate::tasks::{Task, TaskStatus};
pub type TaskJob<T> = fn(&Task<T>) -> TaskStatus;
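
Note: `TaskJob<T>` is a plain fn pointer from a borrowed task to its resulting status. A sketch of a conforming job, assuming it lives in this crate (the payload type `P` is any `TaskPayload`):

```rust
use crate::tasks::{Task, TaskPayload, TaskStatus};

// Any fn with this shape coerces to `TaskJob<P>` once `P` is fixed.
fn complete<P: TaskPayload>(task: &Task<P>) -> TaskStatus {
    println!("processing {}", task.get_key());
    TaskStatus::Completed
}
```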

View file

@@ -0,0 +1,190 @@
use crate::database::TaskStorage;
use crate::tasks::bus::Bus;
use crate::tasks::jobs::TaskJob;
use crate::tasks::{Task, TaskPayload, TaskStatus};
use futures::StreamExt;
use std::marker::PhantomData;
use std::pin::pin;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot::Sender;
use crate::tasks::worker::TaskMessage;

pub enum RateLimit {
    Buffer(usize),
    Rate(usize),
    Ticks(usize),
    None,
}

pub struct ManagerOptions {
    rate_limit: RateLimit,
    bus: Bus,
}

impl ManagerOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_rate_limit(mut self, rate_limit: RateLimit) -> Self {
        self.rate_limit = rate_limit;
        self
    }
}

impl Default for ManagerOptions {
    fn default() -> Self {
        Self {
            rate_limit: RateLimit::None,
            bus: Bus::Local,
        }
    }
}

struct TaskManager<S: TaskPayload, T: TaskStorage<S>> {
    storage: T,
    options: ManagerOptions,
    _marker: PhantomData<S>,
}

impl<S: TaskPayload, T: TaskStorage<S>> TaskManager<S, T> {
    pub fn new(storage: T, options: ManagerOptions) -> Self {
        Self {
            storage,
            options,
            _marker: PhantomData,
        }
    }

    pub async fn run_tasks(&self, mut task_sink: TaskMessage<S>) -> crate::Result<()> {
        let rows = self.storage.get_tasks(TaskStatus::Pending);
        let listener = self.storage.listen_tasks(TaskStatus::Pending);
        let mut queue = pin!(rows.chain(listener));
        while let Some(task) = queue.next().await {
            let task = match task {
                Ok(task) => task,
                Err(e) => {
                    continue
                }
            };
            let sink = match task_sink.recv().await {
                Some(s) => s,
                None => break, // the sink has stopped requesting tasks
            };
            if let Err(_) = sink.send(task) {
                continue;
            }
            // (task, status)
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::{TaskPagination, TasksPage};
    use async_stream::stream;
    use fake::{Dummy, Fake, Faker};
    use futures::Stream;
    use serde::{Deserialize, Serialize};
    use sqlx::types::Uuid;
    use sync::mpsc;
    use tokio::sync;
    use tokio_stream::wrappers::ReceiverStream;
    use tracing_test::traced_test;
    use crate::error::Error;
    use crate::tasks::worker::{Worker, WorkerManager};

    #[derive(Dummy, Serialize, Deserialize, Debug)]
    struct DummyTaskPayload {
        key: Uuid,
        _foo: String,
        _bar: String,
    }

    struct DummyTaskStorage {}

    impl TaskStorage<DummyTaskPayload> for DummyTaskStorage {
        async fn insert_tasks<'a, I: IntoIterator<Item = &'a Task<DummyTaskPayload>>>(
            &self,
            _: I,
        ) -> crate::error::Result<()> {
            todo!()
        }

        fn get_tasks(
            &self,
            task_status: TaskStatus,
        ) -> impl Stream<Item = crate::Result<Task<DummyTaskPayload>>> {
            let payloads: Vec<DummyTaskPayload> = Faker.fake();
            let tasks = payloads.into_iter().enumerate().map(move |(i, item)| {
                Ok(Task::new((i + 1).to_string(), item, task_status.clone()))
            });
            futures::stream::iter(tasks)
        }

        fn listen_tasks(
            &self,
            task_status: TaskStatus,
        ) -> impl Stream<Item = crate::Result<Task<DummyTaskPayload>>> {
            let (tx, rx) = mpsc::channel::<crate::Result<Task<DummyTaskPayload>>>(10);
            tokio::spawn(async move {
                for _ in 0..10 {
                    tokio::time::sleep(std::time::Duration::from_millis(250)).await;
                    let payload: DummyTaskPayload = Faker.fake();
                    let task_status: TaskStatus = task_status.clone();
                    let task = Ok(Task::new(payload.key.to_string(), payload, task_status));
                    if let Err(_) = tx.send(task).await {
                        break;
                    }
                }
            });
            ReceiverStream::new(rx)
        }

        async fn get_paginated_tasks(
            &self,
            _: TaskPagination,
        ) -> crate::error::Result<TasksPage<DummyTaskPayload>> {
            todo!()
        }
    }

    struct DummyWorker;

    impl Worker<DummyTaskPayload> for DummyWorker {
        fn process_job(task: &Task<DummyTaskPayload>) -> crate::error::Result<()> {
            println!("{:#?}", task);
            Ok(())
        }

        async fn on_job_failure(
            task: &Task<DummyTaskPayload>,
            error: Error,
        ) -> crate::error::Result<()> {
            println!("{:#?} {:?}", task, error);
            Ok(())
        }
    }

    #[tokio::test]
    #[traced_test]
    async fn manager_runs() {
        let execute_options = ManagerOptions::new();
        let local_worker_sink =
            WorkerManager::get_listener_sink::<DummyTaskPayload, DummyWorker>(execute_options.bus.clone());
        let task_manager = TaskManager::new(DummyTaskStorage {}, execute_options);
        task_manager.run_tasks(local_worker_sink).await.unwrap()
    }
}

View file

@@ -0,0 +1,48 @@
use crate::error::Error;
use crate::tasks::bus::Bus;
use crate::tasks::{Task, TaskPayload};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot};

pub type TaskMessage<T> = Receiver<Sender<Task<T>>>;

pub struct WorkerManager;

impl WorkerManager {
    pub fn get_listener_sink<T: TaskPayload, W: Worker<T>>(bus: Bus) -> TaskMessage<T> {
        match bus {
            Bus::Local => {
                let (bus_tx, bus_rx) = mpsc::channel(100);
                tokio::spawn(async move {
                    loop {
                        // TODO: properly catch errors
                        let (tx, rx) = oneshot::channel();
                        // Request a task
                        bus_tx.send(tx).await.unwrap();
                        // Wait for a task to be returned
                        let task = rx.await.unwrap();
                        W::process_job(&task).unwrap();
                    }
                });
                bus_rx
            }
        }
    }
}

pub trait Worker<T: TaskPayload> {
    async fn pre_process_job(task: &Task<T>) -> crate::Result<()> {
        Ok(())
    }

    fn process_job(task: &Task<T>) -> crate::Result<()>;

    async fn post_process_job(task: &Task<T>) -> crate::Result<()> {
        Ok(())
    }

    async fn on_job_failure(task: &Task<T>, error: Error) -> crate::Result<()>;
}
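
Note: the `WorkerManager`/`TaskManager` handoff above is an mpsc channel of `oneshot::Sender`s: the worker mails a reply slot to ask for work, and `run_tasks` answers each slot with one task. A self-contained sketch of just that pattern, with `String` standing in for `Task<T>` (requires tokio with the `rt`, `macros`, and `sync` features):

```rust
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // Channel of "task requests": each message is a reply slot.
    let (req_tx, mut req_rx) = mpsc::channel::<oneshot::Sender<String>>(100);

    // Worker side: request a task, then wait for the reply.
    let worker = tokio::spawn(async move {
        for _ in 0..3 {
            let (reply_tx, reply_rx) = oneshot::channel();
            req_tx.send(reply_tx).await.unwrap(); // ask for work
            let task = reply_rx.await.unwrap();   // receive one task
            println!("processing {task}");
        }
        // req_tx dropped here: the manager loop below sees `None` and stops.
    });

    // Manager side: hand out one task per request, like run_tasks does.
    let mut tasks = vec!["a".to_string(), "b".into(), "c".into()].into_iter();
    while let Some(reply_slot) = req_rx.recv().await {
        match tasks.next() {
            Some(task) => {
                let _ = reply_slot.send(task);
            }
            None => break,
        }
    }

    worker.await.unwrap();
}
```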

View file

@@ -3,7 +3,7 @@ create table tasks
    id integer not null
        constraint tasks_pk
        primary key autoincrement,
    payload_key ANY not null
    payload_key TEXT not null
        constraint tasks_payload_key
        unique on conflict ignore,
    payload TEXT not null,

View file

@@ -1,35 +0,0 @@
use clap::Parser;
use readwise_bulk_upload::config::Config;
use readwise_bulk_upload::readwise::DocumentPayload;
use readwise_bulk_upload::sql::TaskManager;
use readwise_bulk_upload::{Error, Result};
use std::fs::File;
use tracing_subscriber;
use figment::{Figment, providers::{Serialized, Env}};

#[tokio::main]
async fn main() -> Result<()> {
    let args: Config = Figment::new()
        .merge(Serialized::defaults(Config::parse()))
        .merge(Env::prefixed("APP_"))
        .extract()?;
    tracing_subscriber::fmt()
        .with_max_level(args.log_level())
        .init();
    let file = File::open(args.path()).map_err(|_| {
        Error::Runtime(format!(
            r#"The file "{}" could not be open"#,
            args.path().display()
        ))
    })?;
    let documents: Vec<DocumentPayload> = serde_json::from_reader(file)?;
    let task_manager = TaskManager::new().await?;
    task_manager.load_tasks(documents).await?;
    Ok(())
}

View file

@@ -1,94 +0,0 @@
use crate::Error;
use directories::ProjectDirs;
use serde::Serialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
use tokio::fs;
use tracing::{info, instrument};

static SQLITE_BIND_LIMIT: usize = 32766;

#[derive(sqlx::Type)]
#[repr(u8)]
pub enum TaskStatus {
    Pending = 1,
    InProgress = 2,
    Completed = 3,
    Failed = 4,
}

pub trait TaskPayload {
    fn get_key(&self) -> String;
}

#[derive(Debug)]
pub struct TaskManager {
    pool: SqlitePool,
}

impl TaskManager {
    pub async fn new() -> Result<TaskManager, Error> {
        Ok(Self {
            pool: Self::connect_database().await?,
        })
    }

    async fn connect_database() -> crate::Result<SqlitePool> {
        let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME"))
            .ok_or(Error::Unhandled("Could not get standard directories"))?;
        let database_file_path = project_dir.data_dir().join("db.sql");
        fs::create_dir_all(project_dir.data_dir()).await?;
        let opts = SqliteConnectOptions::new()
            .filename(database_file_path)
            .create_if_missing(true)
            .journal_mode(SqliteJournalMode::Wal);
        let pool = SqlitePool::connect_with(opts).await?;
        sqlx::migrate!("./migrations").run(&pool).await?;
        Ok(pool)
    }

    #[instrument(skip(self, values))]
    pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
    where
        T: TaskPayload + Serialize + std::fmt::Debug,
    {
        let mut tx = self.pool.begin().await?;
        let mut builder: QueryBuilder<'_, Sqlite> =
            QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
        let args: crate::Result<Vec<(String, String)>> = values
            .iter()
            .map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
            .collect();
        let mut affected_rows = 0;
        // Chunk the query by the size limit of bind params
        for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
            builder.push_values(chunk, |mut builder, item| {
                builder
                    .push_bind(&item.0)
                    .push_bind(&item.1)
                    .push_bind(TaskStatus::Pending);
            });
            builder.push("ON conflict (payload_key) DO NOTHING");
            let query = builder.build();
            affected_rows += query.execute(&mut *tx).await?.rows_affected();
            builder.reset();
        }
        tx.commit().await?;
        info!("{} rows inserted.", affected_rows);
        Ok(())
    }
}

task_queue/Cargo.toml Normal file (26 changes)

@@ -0,0 +1,26 @@
[package]
name = "task_queue"
version = "0.1.0"
edition = "2024"

[dependencies]
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros", "signal"] }
tokio-stream = "0.1.17"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono", "migrate", "uuid"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
serde_json = "1.0.140"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"
futures = "0.3.31"
thiserror = "2.0.12"
async-stream = "0.3.6"
apalis = { version = "1.0.0-rc.1" }
apalis-sqlite = "1.0.0-rc.1"

[dev-dependencies]
fake = { version = "4.3.0", features = ["derive", "chrono", "http", "uuid"] }
tracing-test = "0.2.5"

task_queue/src/error.rs Normal file (28 changes)

@@ -0,0 +1,28 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
    #[error("{0}")]
    Exception(&'static str),
    #[error("{0}")]
    Unhandled(&'static str),
    #[error(transparent)]
    Io(#[from] tokio::io::Error),
    #[error(transparent)]
    Sqlx(#[from] sqlx::Error),
    #[error(transparent)]
    Migration(#[from] sqlx::migrate::MigrateError),
    #[error(transparent)]
    ParseJson(#[from] serde_json::Error),
    #[error(transparent)]
    WorkerError(#[from] apalis::prelude::WorkerError),
}

pub type Result<T> = std::result::Result<T, Error>;

task_queue/src/lib.rs Normal file (9 changes)

@@ -0,0 +1,9 @@
#![allow(dead_code, unused)]

use apalis::prelude::*;
use serde::{Deserialize, Serialize};

pub mod error;
pub(crate) use error::*;
pub mod tasks;

task_queue/src/tasks.rs Normal file (50 changes)

@@ -0,0 +1,50 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Task {
    pub id: u32,
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::Result;
    use super::*;
    use apalis::prelude::*;
    use apalis_sqlite::SqliteStorage;
    use fake::{Fake, Faker};
    use sqlx::SqlitePool;
    use tokio_stream::StreamExt;

    async fn generate_dummy_tasks<T: TaskSink<super::Task>>(storage: &mut T)
    where
        <T as apalis::prelude::Backend>::Error: std::fmt::Debug,
    {
        storage
            .push(super::Task { id: Faker.fake() })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn can_enqueue_tasks() {
        let pool = SqlitePool::connect(":memory:").await.unwrap();
        SqliteStorage::setup(&pool).await.unwrap();
        let mut backend = SqliteStorage::new(&pool);
        generate_dummy_tasks(&mut backend).await;

        async fn process_task(task: super::Task, worker: WorkerContext) -> crate::Result<()> {
            tokio::time::sleep(Duration::from_millis(100)).await;
            worker.stop()?;
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .build(process_task);
        worker.run().await.unwrap();
    }
}

web/Cargo.toml Normal file (6 changes)

@@ -0,0 +1,6 @@
[package]
name = "web"
version = "0.1.0"
edition = "2024"

[dependencies]

web/src/main.rs Normal file (3 changes)

@@ -0,0 +1,3 @@
fn main() {
    println!("Hello, world!");
}