From d84c58fd1c0c320c227218a14f811d0a8cf45df3 Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 8 May 2025 12:02:10 -0400 Subject: [PATCH] fix: load tasks in chunks --- src/sql.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/sql.rs b/src/sql.rs index b081c89..c4b26da 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -5,7 +5,7 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode}; use sqlx::{QueryBuilder, Sqlite, SqlitePool}; use tokio::fs; -static SQLITE_QUERY_LIMIT: usize = 32766; +static SQLITE_BIND_LIMIT: usize = 32766; #[derive(sqlx::Type)] #[repr(u8)] @@ -55,6 +55,7 @@ impl TaskManager { where T: TaskPayload + Serialize, { + let mut tx = self.pool.begin().await?; let mut builder: QueryBuilder<'_, Sqlite> = QueryBuilder::new("insert into tasks(payload_key, payload, status_id)"); @@ -63,23 +64,25 @@ impl TaskManager { .map(|value| Ok((value.get_key(), serde_json::to_string(value)?))) .collect(); - // FIXME: create chunk based insert by SQLITE_QUERY_LIMIT / N° of binds - builder.push_values( - args?.into_iter().take(SQLITE_QUERY_LIMIT / 3), - |mut builder, item| { + + // Chunk the query by the size limit of bind params + for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) { + builder.push_values(chunk, |mut builder, item| { builder - .push_bind(item.0) - .push_bind(item.1) + .push_bind(&item.0) + .push_bind(&item.1) .push_bind(TaskStatus::Pending); - }, - ); - - builder.push("ON conflict (payload_key) DO NOTHING"); - - let query = builder.build(); - - query.execute(&self.pool).await?; + }); + builder.push("ON conflict (payload_key) DO NOTHING"); + + let query = builder.build(); + + query.execute(&mut *tx).await?; + builder.reset(); + } + tx.commit().await?; + Ok(()) } }