fix: load tasks in chunks
This commit is contained in:
parent
f288527ea4
commit
d84c58fd1c
1 changed files with 18 additions and 15 deletions
27
src/sql.rs
27
src/sql.rs
|
|
@ -5,7 +5,7 @@ use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
|
||||||
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
|
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
|
|
||||||
static SQLITE_QUERY_LIMIT: usize = 32766;
|
static SQLITE_BIND_LIMIT: usize = 32766;
|
||||||
|
|
||||||
#[derive(sqlx::Type)]
|
#[derive(sqlx::Type)]
|
||||||
#[repr(u8)]
|
#[repr(u8)]
|
||||||
|
|
@ -55,6 +55,7 @@ impl TaskManager {
|
||||||
where
|
where
|
||||||
T: TaskPayload + Serialize,
|
T: TaskPayload + Serialize,
|
||||||
{
|
{
|
||||||
|
let mut tx = self.pool.begin().await?;
|
||||||
let mut builder: QueryBuilder<'_, Sqlite> =
|
let mut builder: QueryBuilder<'_, Sqlite> =
|
||||||
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
|
QueryBuilder::new("insert into tasks(payload_key, payload, status_id)");
|
||||||
|
|
||||||
|
|
@ -63,22 +64,24 @@ impl TaskManager {
|
||||||
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
|
.map(|value| Ok((value.get_key(), serde_json::to_string(value)?)))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// FIXME: create chunk based insert by SQLITE_QUERY_LIMIT / N° of binds
|
|
||||||
builder.push_values(
|
|
||||||
args?.into_iter().take(SQLITE_QUERY_LIMIT / 3),
|
|
||||||
|mut builder, item| {
|
|
||||||
builder
|
|
||||||
.push_bind(item.0)
|
|
||||||
.push_bind(item.1)
|
|
||||||
.push_bind(TaskStatus::Pending);
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
|
// Chunk the query by the size limit of bind params
|
||||||
|
for chunk in args?.chunks(SQLITE_BIND_LIMIT / 3) {
|
||||||
|
builder.push_values(chunk, |mut builder, item| {
|
||||||
|
builder
|
||||||
|
.push_bind(&item.0)
|
||||||
|
.push_bind(&item.1)
|
||||||
|
.push_bind(TaskStatus::Pending);
|
||||||
|
});
|
||||||
builder.push("ON conflict (payload_key) DO NOTHING");
|
builder.push("ON conflict (payload_key) DO NOTHING");
|
||||||
|
|
||||||
let query = builder.build();
|
let query = builder.build();
|
||||||
|
|
||||||
query.execute(&self.pool).await?;
|
query.execute(&mut *tx).await?;
|
||||||
|
builder.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.commit().await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue