wip: add command to run tasks

This commit is contained in:
Alexander Navarro 2025-05-12 16:40:41 -04:00
parent 63c20cfc87
commit 1315a61b87
8 changed files with 160 additions and 17 deletions

22
.idea/runConfigurations/Load_Tasks.xml generated Normal file
View file

@ -0,0 +1,22 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Load Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="buildProfileId" value="dev" />
<option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- load_tasks $USER_HOME$/Downloads/readwise-reader-import.json" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs>
<env name="APP_LOG_LEVEL" value="Debug" />
</envs>
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

22
.idea/runConfigurations/Query_Tasks.xml generated Normal file
View file

@ -0,0 +1,22 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Query Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="buildProfileId" value="dev" />
<option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- query" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs>
<env name="APP_LOG_LEVEL" value="Debug" />
</envs>
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

22
.idea/runConfigurations/Run_Tasks.xml generated Normal file
View file

@ -0,0 +1,22 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run Tasks" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="buildProfileId" value="dev" />
<option name="command" value="run --package readwise-bulk-upload --bin readwise-bulk-upload -- run" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs>
<env name="APP_LOG_LEVEL" value="Debug" />
</envs>
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>

29
Cargo.lock generated
View file

@ -508,6 +508,21 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.31"
@ -552,6 +567,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.31"
@ -570,8 +596,10 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
@ -1301,6 +1329,7 @@ dependencies = [
"clap", "clap",
"directories", "directories",
"figment", "figment",
"futures",
"serde", "serde",
"serde_json", "serde_json",
"sqlx", "sqlx",

View file

@ -17,3 +17,4 @@ tracing-subscriber = { version = "0.3.19" , features = ["env-filter"]}
figment = { version = "0.10.19", features = ["env"] } figment = { version = "0.10.19", features = ["env"] }
tracing-core = "0.1.33" tracing-core = "0.1.33"
tabled = "0.19.0" tabled = "0.19.0"
futures = "0.3.31"

View file

@ -66,6 +66,7 @@ pub enum Command {
path: PathBuf, path: PathBuf,
}, },
Query, Query,
Run,
#[clap(skip)] #[clap(skip)]
None, None,
} }

View file

@ -5,7 +5,7 @@ use figment::{
}; };
use readwise_bulk_upload::config::{Command, Config}; use readwise_bulk_upload::config::{Command, Config};
use readwise_bulk_upload::readwise::DocumentPayload; use readwise_bulk_upload::readwise::DocumentPayload;
use readwise_bulk_upload::task_manager::TaskManager; use readwise_bulk_upload::task_manager::{TaskManager, TaskStatus};
use readwise_bulk_upload::{Error, Result}; use readwise_bulk_upload::{Error, Result};
use std::fs::File; use std::fs::File;
use tabled::Table; use tabled::Table;
@ -29,6 +29,7 @@ async fn main() -> Result<()> {
} }
async fn run(command: &Command) -> Result<()> { async fn run(command: &Command) -> Result<()> {
let task_manager = TaskManager::new().await?;
match command { match command {
Command::LoadTasks { path } => { Command::LoadTasks { path } => {
let file = File::open(path).map_err(|_| { let file = File::open(path).map_err(|_| {
@ -40,16 +41,21 @@ async fn run(command: &Command) -> Result<()> {
let documents: Vec<DocumentPayload> = serde_json::from_reader(file)?; let documents: Vec<DocumentPayload> = serde_json::from_reader(file)?;
let task_manager = TaskManager::new().await?;
task_manager.load_tasks(documents).await?; task_manager.load_tasks(documents).await?;
} }
Command::Query => { Command::Query => {
let task_manager = TaskManager::new().await?; let tasks = task_manager.get_tasks::<DocumentPayload>(None, Some(25)).await?;
let tasks = task_manager.get_tasks::<DocumentPayload>(None, 25).await?;
println!("{}", Table::new(tasks)); println!("{}", Table::new(tasks));
} }
Command::Run => {
task_manager.run_tasks::<DocumentPayload>(|task| {
println!("{}", task.get_key());
TaskStatus::Completed
}).await?;
}
Command::None => { Command::None => {
Config::command().print_help()?; Config::command().print_help()?;
} }

View file

@ -1,6 +1,7 @@
use crate::Error; use crate::Error;
use chrono::Utc; use chrono::Utc;
use directories::ProjectDirs; use directories::ProjectDirs;
use futures::{StreamExt, TryStreamExt};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
@ -44,6 +45,8 @@ pub trait TaskPayload {
fn get_key(&self) -> String; fn get_key(&self) -> String;
} }
pub type TaskJob<T: _Task> = fn(&Task<T>) -> TaskStatus;
#[derive(sqlx::FromRow, Tabled, Debug)] #[derive(sqlx::FromRow, Tabled, Debug)]
pub struct Task<T: DeserializeOwned + std::fmt::Display> { pub struct Task<T: DeserializeOwned + std::fmt::Display> {
id: u32, id: u32,
@ -58,6 +61,12 @@ pub struct Task<T: DeserializeOwned + std::fmt::Display> {
updated_at: Option<chrono::DateTime<Utc>>, updated_at: Option<chrono::DateTime<Utc>>,
} }
impl<T: DeserializeOwned + std::fmt::Display> Task<T> {
pub fn get_key(&self) -> String {
self.payload_key.clone()
}
}
fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String { fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
match o { match o {
Some(s) => format!("{}", s), Some(s) => format!("{}", s),
@ -65,6 +74,9 @@ fn display_option_date(o: &Option<chrono::DateTime<Utc>>) -> String {
} }
} }
pub trait _Task: DeserializeOwned + Send + Unpin + 'static + Display {}
impl<T: DeserializeOwned + Send + Unpin + 'static + Display> _Task for T {}
#[derive(Debug)] #[derive(Debug)]
pub struct TaskManager { pub struct TaskManager {
pool: SqlitePool, pool: SqlitePool,
@ -97,21 +109,34 @@ impl TaskManager {
Ok(pool) Ok(pool)
} }
pub async fn get_tasks< fn get_task_builder(
T: DeserializeOwned + Send + Unpin + 'static + Display, status: Option<TaskStatus>,
>( limit: Option<u16>,
) -> QueryBuilder<'static, Sqlite> {
let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new(
"select id, payload_key, payload, status_id, created_at, updated_at from tasks ",
);
if let Some(status) = status {
builder.push("where status_id = ").push_bind(status);
}
builder.push("ORDER BY created_at DESC ");
if let Some(limit) = limit {
builder.push("LIMIT ").push_bind(limit);
}
builder
}
pub async fn get_tasks<T: _Task>(
&self, &self,
status: Option<TaskStatus>, status: Option<TaskStatus>,
limit: u16, limit: Option<u16>,
) -> crate::Result<Vec<Task<T>>> { ) -> crate::Result<Vec<Task<T>>> {
let tasks: Vec<Task<T>> = sqlx::query_as( let mut builder = Self::get_task_builder(status, limit);
"select id, payload_key, payload, status_id, created_at, updated_at from tasks where status_id = ? order by ? limit ?",
) let tasks: Vec<Task<T>> = builder.build_query_as().fetch_all(&self.pool).await?;
.bind(status.unwrap_or(TaskStatus::Pending))
.bind("created_at DESC")
.bind(limit)
.fetch_all(&self.pool)
.await?;
Ok(tasks) Ok(tasks)
} }
@ -153,4 +178,19 @@ impl TaskManager {
Ok(()) Ok(())
} }
pub async fn run_tasks<T: _Task>(&self, func: TaskJob<T>) -> crate::Result<()> {
let mut builder = Self::get_task_builder(Some(TaskStatus::Pending), None);
let rows = builder.build_query_as::<Task<T>>().fetch(&self.pool);
let result: Vec<(Task<T>, TaskStatus)> = rows.map(|x| {
let task = x.unwrap();
let status = func(&task);
(task, status)
}).collect().await;
Ok(())
}
} }