diff options
author | metamuffin <metamuffin@disroot.org> | 2024-01-20 00:50:20 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-01-20 00:50:20 +0100 |
commit | 46c251655db7bb3d9aa814b1a5dde85336b0b9b1 (patch) | |
tree | ab0696f2c92e8854ce6aa0737877cc15184bd8b6 /import/src | |
parent | 1c37d32a0985ff7390313833345b9299f9f0b196 (diff) | |
download | jellything-46c251655db7bb3d9aa814b1a5dde85336b0b9b1.tar jellything-46c251655db7bb3d9aa814b1a5dde85336b0b9b1.tar.bz2 jellything-46c251655db7bb3d9aa814b1a5dde85336b0b9b1.tar.zst |
replace sled with redb
Diffstat (limited to 'import/src')
-rw-r--r-- | import/src/infojson.rs | 6 | ||||
-rw-r--r-- | import/src/lib.rs | 119 | ||||
-rw-r--r-- | import/src/tmdb.rs | 6 |
3 files changed, 85 insertions, 46 deletions
diff --git a/import/src/infojson.rs b/import/src/infojson.rs index c18c19d..e6b001f 100644 --- a/import/src/infojson.rs +++ b/import/src/infojson.rs @@ -5,7 +5,7 @@ */ use anyhow::Context; -use jellycommon::chrono::{format::Parsed, DateTime, Utc}; +use jellycommon::chrono::{format::Parsed, Utc}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -123,7 +123,7 @@ pub struct YHeatmapSample { pub value: f64, } -pub fn parse_upload_date(d: &str) -> anyhow::Result<DateTime<Utc>> { +pub fn parse_upload_date(d: &str) -> anyhow::Result<i64> { let (year, month, day) = (&d[0..4], &d[4..6], &d[6..8]); let (year, month, day) = ( year.parse().context("parsing year")?, @@ -139,5 +139,5 @@ pub fn parse_upload_date(d: &str) -> anyhow::Result<DateTime<Utc>> { p.hour_mod_12 = Some(0); p.minute = Some(0); p.second = Some(0); - Ok(p.to_datetime_with_timezone(&Utc)?) + Ok(p.to_datetime_with_timezone(&Utc)?.timestamp_millis()) } diff --git a/import/src/lib.rs b/import/src/lib.rs index 692cf7f..a4d1611 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -13,7 +13,7 @@ use async_recursion::async_recursion; use futures::{executor::block_on, stream::FuturesUnordered, StreamExt}; use jellybase::{ cache::{async_cache_file, cache_memory}, - database::Database, + database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_IMPORT}, federation::Federation, AssetLocationExt, CONF, }; @@ -42,32 +42,54 @@ use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking}; static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); -pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { +pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> { let permit = IMPORT_SEM.try_acquire()?; - if !db.node_import.is_empty() { - info!("clearing temporary node tree from an aborted last import..."); - db.node_import.clear()?; + + { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + if !table.is_empty()? { + info!("clearing temporary node tree from an aborted last import..."); + table.drain::<&str>(..)?; + } + drop(table); + txn.commit()?; } info!("loading sources..."); import_path(CONF.library_path.clone(), vec![], db, fed) .await .context("indexing")?; info!("removing old nodes..."); - db.node.clear()?; + { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE)?; + table.drain::<&str>(..)?; + drop(table); + txn.commit()?; + } info!("merging nodes..."); merge_nodes(db).context("merging nodes")?; info!("generating paths..."); generate_node_paths(db).context("generating paths")?; info!("clearing temporary node tree..."); - db.node_import.clear()?; + { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + table.drain::<&str>(..)?; + drop(table); + txn.commit()?; + } info!("import completed"); drop(permit); Ok(()) } -pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { - for r in db.node_import.iter() { - let (id, mut nodes) = r?; +pub fn merge_nodes(db: &DataAcid) -> anyhow::Result<()> { + let txn_read = db.inner.begin_read()?; + let t_node_import = txn_read.open_table(T_NODE_IMPORT)?; + for r in t_node_import.iter()? { + let (id, nodes) = r?; + let mut nodes = nodes.value().0; nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); @@ -77,27 +99,36 @@ pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { .reduce(|x, y| merge_node(x, y)) .unwrap(); - node.public.id = Some(id.clone()); + node.public.id = Some(id.value().to_owned()); node.public.path = vec![]; // will be reconstructed in the next pass - db.node.insert(&id, &node)?; + { + let txn_write = db.inner.begin_write()?; + let mut t_node = txn_write.open_table(T_NODE)?; + t_node.insert(id.value(), Ser(node))?; + drop(t_node); + txn_write.commit()?; + } } Ok(()) } -pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> { - fn traverse(db: &Database, c: String, mut path: Vec<String>) -> anyhow::Result<()> { - let node = db - .node - .update_and_fetch(&c, |mut nc| { - if let Some(nc) = &mut nc { - if nc.public.path.is_empty() { - nc.public.path = path.clone(); - } - } - nc - })? - .ok_or(anyhow!("node {c:?} missing"))?; +pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> { + fn traverse(db: &DataAcid, c: String, mut path: Vec<String>) -> anyhow::Result<()> { + let node = { + let txn = db.inner.begin_write()?; + let table = txn.open_table(T_NODE)?; + + let mut node = table.get(&*c)?.ok_or(anyhow!("your mum"))?.value().0; + + if node.public.path.is_empty() { + node.public.path = path.clone(); + } + + drop(table); + txn.commit()?; + node + }; path.push(c); for c in node.public.children { @@ -126,7 +157,7 @@ fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { pub async fn import_path( path: PathBuf, index_path: Vec<usize>, - db: &Database, + db: &DataAcid, fed: &Federation, ) -> anyhow::Result<()> { if path.is_dir() { @@ -190,15 +221,19 @@ async fn process_source( s: ImportSource, path: &Path, index_path: &[usize], - db: &Database, + db: &DataAcid, fed: &Federation, ) -> anyhow::Result<()> { - let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { - db.node_import.fetch_and_update(id, |l| { - let mut l = l.unwrap_or_default(); - l.push((index_path.to_vec(), n.clone())); - Some(l) - })?; + let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + + let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); + node.push((index_path.to_vec(), n.clone())); + table.insert(id, Ser(node))?; + + drop(table); + txn.commit()?; Ok(()) }; match s { @@ -497,16 +532,20 @@ static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); async fn import_remote( id: String, host: &str, - db: &Database, + db: &DataAcid, session: &Arc<Session>, index_path: &[usize], ) -> anyhow::Result<()> { - let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { - db.node_import.fetch_and_update(id, |l| { - let mut l = l.unwrap_or_default(); - l.push((index_path.to_vec(), n.clone())); - Some(l) - })?; + let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> { + let txn = db.inner.begin_write()?; + let mut table = txn.open_table(T_NODE_IMPORT)?; + + let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default(); + node.push((index_path.to_vec(), n.clone())); + table.insert(id, Ser(node))?; + + drop(table); + txn.commit()?; Ok(()) }; let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); diff --git a/import/src/tmdb.rs b/import/src/tmdb.rs index 37447e6..95ebef4 100644 --- a/import/src/tmdb.rs +++ b/import/src/tmdb.rs @@ -5,7 +5,7 @@ */ use anyhow::Context; use bincode::{Decode, Encode}; -use jellycommon::chrono::{format::Parsed, DateTime, Utc}; +use jellycommon::chrono::{format::Parsed, Utc}; use log::info; use serde::Deserialize; @@ -115,7 +115,7 @@ pub async fn tmdb_image(path: &str) -> anyhow::Result<Vec<u8>> { Ok(res.bytes().await?.to_vec()) } -pub fn parse_release_date(d: &str) -> anyhow::Result<DateTime<Utc>> { +pub fn parse_release_date(d: &str) -> anyhow::Result<i64> { let (year, month, day) = (&d[0..4], &d[5..7], &d[8..10]); let (year, month, day) = ( year.parse().context("parsing year")?, @@ -131,5 +131,5 @@ pub fn parse_release_date(d: &str) -> anyhow::Result<DateTime<Utc>> { p.hour_mod_12 = Some(0); p.minute = Some(0); p.second = Some(0); - Ok(p.to_datetime_with_timezone(&Utc)?) + Ok(p.to_datetime_with_timezone(&Utc)?.timestamp_millis()) } |