Compare commits

..

No commits in common. "refactor/re-structure-project" and "main" have entirely different histories.

22 changed files with 175 additions and 682 deletions

View file

@ -2,9 +2,7 @@
<module type="EMPTY_MODULE" version="4"> <module type="EMPTY_MODULE" version="4">
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$"> <content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/cli/src" isTestSource="false" /> <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/lib_sync_core/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/web/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/target" /> <excludeFolder url="file://$MODULE_DIR$/target" />
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />

View file

@ -1,22 +0,0 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Load Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="buildProfileId" value="dev" />
<option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- load_tasks $USER_HOME$/Downloads/readwise-reader-import.json" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs>
<env name="APP_LOG_LEVEL" value="Debug" />
</envs>
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

View file

@ -1,22 +0,0 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Query Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="buildProfileId" value="dev" />
<option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- query" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs>
<env name="APP_LOG_LEVEL" value="Debug" />
</envs>
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

View file

@ -1,22 +0,0 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="buildProfileId" value="dev" />
<option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- run" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs>
<env name="APP_LOG_LEVEL" value="Debug" />
</envs>
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

5
.idea/vcs.xml generated
View file

@ -1,10 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="GitSharedSettings">
<option name="FORCE_PUSH_PROHIBITED_PATTERNS">
<list />
</option>
</component>
<component name="VcsDirectoryMappings"> <component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" /> <mapping directory="" vcs="Git" />
</component> </component>

182
Cargo.lock generated
View file

@ -17,19 +17,6 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"getrandom 0.3.2",
"once_cell",
"version_check",
"zerocopy",
]
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "1.1.3" version = "1.1.3"
@ -185,12 +172,6 @@ version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "bytecount"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce"
[[package]] [[package]]
name = "bytemuck" name = "bytemuck"
version = "1.23.0" version = "1.23.0"
@ -279,27 +260,6 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
[[package]]
name = "cli"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"directories",
"figment",
"futures",
"lib_sync_core",
"serde",
"serde_json",
"sqlx",
"tabled",
"thiserror",
"tokio",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]] [[package]]
name = "colorchoice" name = "colorchoice"
version = "1.0.3" version = "1.0.3"
@ -529,21 +489,6 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.31"
@ -588,17 +533,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.31"
@ -617,10 +551,8 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
@ -937,26 +869,6 @@ dependencies = [
"spin", "spin",
] ]
[[package]]
name = "lib_sync_core"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"directories",
"figment",
"futures",
"serde",
"serde_json",
"sqlx",
"tabled",
"thiserror",
"tokio",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.172" version = "0.2.172"
@ -1147,17 +1059,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "papergrid"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30268a8d20c2c0d126b2b6610ab405f16517f6ba9f244d8c59ac2c512a8a1ce7"
dependencies = [
"ahash",
"bytecount",
"unicode-width",
]
[[package]] [[package]]
name = "parking" name = "parking"
version = "2.2.1" version = "2.2.1"
@ -1273,28 +1174,6 @@ dependencies = [
"zerocopy", "zerocopy",
] ]
[[package]]
name = "proc-macro-error-attr2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5"
dependencies = [
"proc-macro2",
"quote",
]
[[package]]
name = "proc-macro-error2"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802"
dependencies = [
"proc-macro-error-attr2",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.95" version = "1.0.95"
@ -1362,6 +1241,24 @@ dependencies = [
"getrandom 0.2.16", "getrandom 0.2.16",
] ]
[[package]]
name = "readwise-bulk-upload"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"directories",
"figment",
"serde",
"serde_json",
"sqlx",
"thiserror",
"tokio",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.12" version = "0.5.12"
@ -1865,30 +1762,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tabled"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "228d124371171cd39f0f454b58f73ddebeeef3cef3207a82ffea1c29465aea43"
dependencies = [
"papergrid",
"tabled_derive",
"testing_table",
]
[[package]]
name = "tabled_derive"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea5d1b13ca6cff1f9231ffd62f15eefd72543dab5e468735f1a456728a02846"
dependencies = [
"heck",
"proc-macro-error2",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.19.1" version = "3.19.1"
@ -1902,15 +1775,6 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "testing_table"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f8daae29995a24f65619e19d8d31dea5b389f3d853d8bf297bbf607cd0014cc"
dependencies = [
"unicode-width",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "2.0.12" version = "2.0.12"
@ -2108,12 +1972,6 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
[[package]]
name = "unicode-width"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
[[package]] [[package]]
name = "url" name = "url"
version = "2.5.4" version = "2.5.4"
@ -2240,10 +2098,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "web"
version = "0.1.0"
[[package]] [[package]]
name = "whoami" name = "whoami"
version = "1.6.0" version = "1.6.0"

View file

@ -1,6 +1,18 @@
[workspace] [package]
resolver = "3" name = "readwise-bulk-upload"
version = "0.1.0"
edition = "2024"
members = [ [dependencies]
"cli", "lib_sync_core", "web", thiserror = "2.0.12"
] directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
serde_json = "1.0.140"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"

View file

@ -1,26 +0,0 @@
[package]
name = "cli"
version = "0.1.0"
edition = "2024"
[[bin]]
name = "readwise"
path = "bin/readwise/main.rs"
[dependencies]
lib_sync_core = {path = "../lib_sync_core"}
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
serde_json = "1.0.140"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"
tabled = "0.19.0"
futures = "0.3.31"
thiserror = "2.0.12"

View file

@ -1,76 +0,0 @@
use clap::{CommandFactory, Parser};
use directories::ProjectDirs;
use figment::{
Figment,
providers::{Env, Serialized},
};
use lib_sync_core::task_manager::{TaskManager, TaskStatus};
use cli::config::{Command, Config};
use cli::{Error, Result};
use std::fs::File;
use tabled::Table;
use tracing_subscriber;
use crate::external_interface::DocumentPayload;
mod external_interface;
#[tokio::main]
async fn main() -> Result<()> {
let cli = Config::parse();
let args: Config = Figment::new()
.merge(Serialized::defaults(&cli))
.merge(Env::prefixed("APP_"))
.extract()?;
tracing_subscriber::fmt()
.with_max_level(args.log_level())
.init();
run(&cli.command).await?;
Ok(())
}
async fn run(command: &Command) -> Result<()> {
let project_dir = ProjectDirs::from("", "", "synchronizator_readwise").ok_or(
lib_sync_core::error::Error::Unhandled("Could not get standard directories"),
)?;
let task_manager = TaskManager::new(project_dir.data_dir()).await?;
match command {
Command::LoadTasks { path } => {
let file = File::open(path).map_err(|_| {
Error::Runtime(format!(
r#"The file "{}" could not be open"#,
path.display()
))
})?;
let documents: Vec<DocumentPayload> = serde_json::from_reader(file)?;
task_manager.load_tasks(documents).await?;
}
Command::Query => {
let tasks = task_manager
.get_tasks::<DocumentPayload>(None, Some(25))
.await?;
println!("{}", Table::new(tasks));
}
Command::Run => {
task_manager
.run_tasks::<DocumentPayload>(|task| {
println!("{}", task.get_key());
TaskStatus::Completed
})
.await?;
}
Command::None => {
Config::command().print_help()?;
}
}
Ok(())
}

View file

@ -1,23 +0,0 @@
use clap::Parser;
use figment::{
providers::{Env, Serialized},
Figment,
};
use cli::config::Config;
use cli::Result;
use tracing_subscriber;
#[tokio::main]
async fn main() -> Result<()> {
let cli = Config::parse();
let args: Config = Figment::new()
.merge(Serialized::defaults(&cli))
.merge(Env::prefixed("APP_"))
.extract()?;
tracing_subscriber::fmt()
.with_max_level(args.log_level())
.init();
Ok(())
}

View file

@ -1,20 +0,0 @@
[package]
name = "lib_sync_core"
version = "0.1.0"
edition = "2024"
[dependencies]
directories = "6.0.0"
tokio = { version = "1.45.0", features = ["default", "rt", "rt-multi-thread", "macros"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono", "migrate" ] }
clap = { version = "4.5.37", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
chrono = {version = "0.4.41", features = ["serde"]}
serde_json = "1.0.140"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33"
tabled = "0.19.0"
futures = "0.3.31"
thiserror = "2.0.12"

View file

@ -1,24 +0,0 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("{0}")]
Exception(&'static str),
#[error("{0}")]
Unhandled(&'static str),
#[error(transparent)]
Io(#[from] tokio::io::Error),
#[error(transparent)]
Sqlx(#[from] sqlx::Error),
#[error(transparent)]
Migration(#[from] sqlx::migrate::MigrateError),
#[error(transparent)]
ParseJson(#[from] serde_json::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,19 +0,0 @@
pub mod error;
pub(crate) use error::*;
pub mod task_manager;
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View file

@ -1,200 +0,0 @@
use crate::error::Error;
use chrono::Utc;
use directories::ProjectDirs;
use futures::{StreamExt, TryStreamExt};
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
use std::fmt::Display;
use std::path::PathBuf;
use tabled::Tabled;
use tokio::fs;
use tracing::{info, instrument};
static SQLITE_BIND_LIMIT: usize = 32766;
#[derive(sqlx::Type, Debug)]
#[repr(u8)]
pub enum TaskStatus {
Pending = 1,
InProgress = 2,
Completed = 3,
Failed = 4,
}
impl Display for TaskStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TaskStatus::Pending => {
write!(f, "Pending")
}
TaskStatus::InProgress => {
write!(f, "In Progress")
}
TaskStatus::Completed => {
write!(f, "Completed")
}
TaskStatus::Failed => {
write!(f, "Failed")
}
}
}
}
pub trait TaskPayload {
fn get_key(&self) -> String;
}
pub type TaskJob<T: _Task> = fn(&Task<T>) -> TaskStatus;
#[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task<T: DeserializeOwned + std::fmt::Display> {
id: u32,
payload_key: String,
#[sqlx(json)]
#[tabled(skip)]
payload: T,
#[sqlx(rename = "status_id")]
status: TaskStatus,
created_at: chrono::DateTime<Utc>,
#[tabled(display = "display_option_date")]
updated_at: Option<chrono::DateTime<Utc>>,
}
impl<T: DeserializeOwned + std::fmt::Display> Task<T> {
pub fn get_key(&self) -> String {
self.payload_key.clone()
}
}
fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
match o {
Some(s) => format!("{}", s),
None => String::from(""),
}
}
pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
impl<T: DeserializeOwned + Send + Unpin + 'static + Display> _Task for T {}
#[derive(Debug)]
pub struct TaskManager {
base_path: PathBuf,
pool: SqlitePool,
}
impl TaskManager {
pub async fn new<P: Into<PathBuf>>(base_path: P) -> Result<TaskManager, Error> {
let base_path = base_path.into();
let pool = Self::connect_database(base_path.clone()).await?;
Ok(Self {
base_path,
pool,
})
}
async fn connect_database<P: Into<PathBuf>>(base_path: P) -> crate::Result<SqlitePool> {
let base_path = base_path.into();
let database_file_path = base_path.join("db.sql");
fs::create_dir_all(base_path).await?;
let opts = SqliteConnectOptions::new()
.filename(database_file_path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal);
let pool = SqlitePool::connect_with(opts).await?;
sqlx::migrate!("../migrations").run(&pool).await?;
Ok(pool)
}
fn get_task_builder(
status: Option<TaskStatus>,
limit: Option<u16>,
) -> QueryBuilder<'static, Sqlite> {
let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
);
if let Some(status) = status {
builder.push("where status_id = ").push_bind(status);
}
builder.push("ORDER BY created_at DESC ");
if let Some(limit) = limit {
builder.push("LIMIT ").push_bind(limit);
}
builder
}
pub async fn get_tasks<T: _Task>(
&self,
status: Option<TaskStatus>,
limit: Option<u16>,
) -> crate::Result<Vec<Task<T>>> {
let mut builder = Self::get_task_builder(status, limit);
let tasks: Vec<Task<T>> = builder.build_query_as().fetch_all(&self.pool).await?;
Ok(tasks)
}
#[instrument(skip(self, values))]
pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
where
T: TaskPayload + Serialize + std::fmt::Debug,
{
let mut tx = self.pool.begin().await?;
let mut builder: QueryBuilder<'_, Sqlite> =
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
let args: crate::Result<Vec<(String, String)>> = values
.iter()
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
.collect();
let mut affected_rows = 0;
// Chunk the query by the size limit of bind params
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
builder.push_values(chunk, |mut builder, item| {
builder
.push_bind(&item.0)
.push_bind(&item.1)
.push_bind(TaskStatus::Pending);
});
builder.push("ON conflict (payload_key) DO NOTHING");
let query = builder.build();
affected_rows += query.execute(&mut *tx).await?.rows_affected();
builder.reset();
}
tx.commit().await?;
info!("{} rows inserted.", affected_rows);
Ok(())
}
pub async fn run_tasks<T: _Task>(&self, func: TaskJob<T>) -> crate::Result<()> {
let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
let rows = builder.build_query_as::<Task<T>>().fetch(&self.pool);
let result: Vec<(Task<T>, TaskStatus)> = rows.map(|x| {
let task = x.unwrap();
let status = func(&task);
(task, status)
}).collect().await;
Ok(())
}
}

View file

@ -1,4 +1,4 @@
use clap::{Parser, Subcommand, ValueEnum}; use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt; use std::fmt;
use std::path::PathBuf; use std::path::PathBuf;
@ -57,32 +57,9 @@ impl Into<LevelFilter> for VerbosityLevel {
} }
} }
#[derive(Debug, Subcommand)]
#[clap(rename_all = "snake_case")]
pub enum Command {
/// Load task into the database from [path]
LoadTasks{
/// Path to the file
path: PathBuf,
},
Query,
Run,
#[clap(skip)]
None,
}
impl Default for Command {
fn default() -> Self {
Command::None
}
}
#[derive(Debug, Parser, Serialize, Deserialize)] #[derive(Debug, Parser, Serialize, Deserialize)]
pub struct Config { pub struct Config {
#[command(subcommand)] path: PathBuf,
#[serde(skip)]
pub command: Command,
#[arg( #[arg(
long, long,
short = 'v', short = 'v',
@ -94,6 +71,9 @@ pub struct Config {
} }
impl Config { impl Config {
pub fn path(&self) -> &PathBuf {
&self.path
}
pub fn log_level(&self) -> LevelFilter { pub fn log_level(&self) -> LevelFilter {
self.log_level.clone().into() self.log_level.clone().into()

View file

@ -11,9 +11,6 @@ pub enum Error {
#[error("{0}")] #[error("{0}")]
Unhandled(&'static str), Unhandled(&'static str),
#[error(transparent)]
Sync(#[from] lib_sync_core::error::Error),
#[error(transparent)] #[error(transparent)]
Sqlx(#[from] sqlx::Error), Sqlx(#[from] sqlx::Error),

View file

@ -1,4 +1,6 @@
pub mod config;
mod error; mod error;
pub mod sql;
pub mod config;
pub mod readwise;
pub use error::*; pub use error::*;

35
src/main.rs Normal file
View file

@ -0,0 +1,35 @@
use clap::Parser;
use readwise_bulk_upload::config::Config;
use readwise_bulk_upload::readwise::DocumentPayload;
use readwise_bulk_upload::sql::TaskManager;
use readwise_bulk_upload::{Error, Result};
use std::fs::File;
use tracing_subscriber;
use figment::{Figment, providers::{Serialized, Env}};
#[tokio::main]
async fn main() -> Result<()> {
let args: Config = Figment::new()
.merge(Serialized::defaults(Config::parse()))
.merge(Env::prefixed("APP_"))
.extract()?;
tracing_subscriber::fmt()
.with_max_level(args.log_level())
.init();
let file = File::open(args.path()).map_err(|_| {
Error::Runtime(format!(
r#"The file "{}" could not be open"#,
args.path().display()
))
})?;
let documents: Vec<DocumentPayload> = serde_json::from_reader(file)?;
let task_manager = TaskManager::new().await?;
task_manager.load_tasks(documents).await?;
Ok(())
}

View file

@ -1,8 +1,7 @@
use lib_sync_core::task_manager::TaskPayload;
use chrono::{DateTime, Local}; use chrono::{DateTime, Local};
use serde::{de, Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, de, Serialize};
use serde_json::Value; use serde_json::Value;
use std::fmt::Display; use crate::sql::TaskPayload;
#[derive(Deserialize, Serialize, Debug)] #[derive(Deserialize, Serialize, Debug)]
pub struct DocumentPayload { pub struct DocumentPayload {
@ -15,16 +14,6 @@ pub struct DocumentPayload {
location: String, location: String,
} }
impl Display for DocumentPayload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
serde_json::to_string_pretty(self).map_err(|_| std::fmt::Error)?
)
}
}
impl TaskPayload for DocumentPayload { impl TaskPayload for DocumentPayload {
fn get_key(&self) -> String { fn get_key(&self) -> String {
self.url.clone() self.url.clone()

94
src/sql.rs Normal file
View file

@ -0,0 +1,94 @@
use crate::Error;
use directories::ProjectDirs;
use serde::Serialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
use tokio::fs;
use tracing::{info, instrument};
static SQLITE_BIND_LIMIT: usize = 32766;
#[derive(sqlx::Type)]
#[repr(u8)]
pub enum TaskStatus {
Pending = 1,
InProgress = 2,
Completed = 3,
Failed = 4,
}
pub trait TaskPayload {
fn get_key(&self) -> String;
}
#[derive(Debug)]
pub struct TaskManager {
pool: SqlitePool,
}
impl TaskManager {
pub async fn new() -> Result<TaskManager, Error> {
Ok(Self {
pool: Self::connect_database().await?,
})
}
async fn connect_database() -> crate::Result<SqlitePool> {
let project_dir = ProjectDirs::from("", "", env!("CARGO_PKG_NAME"))
.ok_or(Error::Unhandled("Could not get standard directories"))?;
let database_file_path = project_dir.data_dir().join("db.sql");
fs::create_dir_all(project_dir.data_dir()).await?;
let opts = SqliteConnectOptions::new()
.filename(database_file_path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal);
let pool = SqlitePool::connect_with(opts).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
Ok(pool)
}
#[instrument(skip(self, values))]
pub async fn load_tasks<T>(&self, values: Vec<T>) -> crate::Result<()>
where
T: TaskPayload + Serialize + std::fmt::Debug,
{
let mut tx = self.pool.begin().await?;
let mut builder: QueryBuilder<'_, Sqlite> =
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
let args: crate::Result<Vec<(String, String)>> = values
.iter()
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
.collect();
let mut affected_rows = 0;
// Chunk the query by the size limit of bind params
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
builder.push_values(chunk, |mut builder, item| {
builder
.push_bind(&item.0)
.push_bind(&item.1)
.push_bind(TaskStatus::Pending);
});
builder.push("ON conflict (payload_key) DO NOTHING");
let query = builder.build();
affected_rows += query.execute(&mut *tx).await?.rows_affected();
builder.reset();
}
tx.commit().await?;
info!("{} rows inserted.", affected_rows);
Ok(())
}
}

View file

@ -1,6 +0,0 @@
[package]
name = "web"
version = "0.1.0"
edition = "2024"
[dependencies]

View file

@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}