aboutsummaryrefslogtreecommitdiff
path: root/import/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-01-20 00:50:20 +0100
committermetamuffin <metamuffin@disroot.org>2024-01-20 00:50:20 +0100
commit46c251655db7bb3d9aa814b1a5dde85336b0b9b1 (patch)
treeab0696f2c92e8854ce6aa0737877cc15184bd8b6 /import/src
parent1c37d32a0985ff7390313833345b9299f9f0b196 (diff)
downloadjellything-46c251655db7bb3d9aa814b1a5dde85336b0b9b1.tar
jellything-46c251655db7bb3d9aa814b1a5dde85336b0b9b1.tar.bz2
jellything-46c251655db7bb3d9aa814b1a5dde85336b0b9b1.tar.zst
replace sled with redb
Diffstat (limited to 'import/src')
-rw-r--r--import/src/infojson.rs6
-rw-r--r--import/src/lib.rs119
-rw-r--r--import/src/tmdb.rs6
3 files changed, 85 insertions, 46 deletions
diff --git a/import/src/infojson.rs b/import/src/infojson.rs
index c18c19d..e6b001f 100644
--- a/import/src/infojson.rs
+++ b/import/src/infojson.rs
@@ -5,7 +5,7 @@
*/
use anyhow::Context;
-use jellycommon::chrono::{format::Parsed, DateTime, Utc};
+use jellycommon::chrono::{format::Parsed, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -123,7 +123,7 @@ pub struct YHeatmapSample {
pub value: f64,
}
-pub fn parse_upload_date(d: &str) -> anyhow::Result<DateTime<Utc>> {
+pub fn parse_upload_date(d: &str) -> anyhow::Result<i64> {
let (year, month, day) = (&d[0..4], &d[4..6], &d[6..8]);
let (year, month, day) = (
year.parse().context("parsing year")?,
@@ -139,5 +139,5 @@ pub fn parse_upload_date(d: &str) -> anyhow::Result<DateTime<Utc>> {
p.hour_mod_12 = Some(0);
p.minute = Some(0);
p.second = Some(0);
- Ok(p.to_datetime_with_timezone(&Utc)?)
+ Ok(p.to_datetime_with_timezone(&Utc)?.timestamp_millis())
}
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 692cf7f..a4d1611 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -13,7 +13,7 @@ use async_recursion::async_recursion;
use futures::{executor::block_on, stream::FuturesUnordered, StreamExt};
use jellybase::{
cache::{async_cache_file, cache_memory},
- database::Database,
+ database::{DataAcid, ReadableTable, Ser, T_NODE, T_NODE_IMPORT},
federation::Federation,
AssetLocationExt, CONF,
};
@@ -42,32 +42,54 @@ use tokio::{io::AsyncWriteExt, sync::Semaphore, task::spawn_blocking};
static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
-pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> {
+pub async fn import(db: &DataAcid, fed: &Federation) -> anyhow::Result<()> {
let permit = IMPORT_SEM.try_acquire()?;
- if !db.node_import.is_empty() {
- info!("clearing temporary node tree from an aborted last import...");
- db.node_import.clear()?;
+
+ {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+ if !table.is_empty()? {
+ info!("clearing temporary node tree from an aborted last import...");
+ table.drain::<&str>(..)?;
+ }
+ drop(table);
+ txn.commit()?;
}
info!("loading sources...");
import_path(CONF.library_path.clone(), vec![], db, fed)
.await
.context("indexing")?;
info!("removing old nodes...");
- db.node.clear()?;
+ {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ }
info!("merging nodes...");
merge_nodes(db).context("merging nodes")?;
info!("generating paths...");
generate_node_paths(db).context("generating paths")?;
info!("clearing temporary node tree...");
- db.node_import.clear()?;
+ {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+ table.drain::<&str>(..)?;
+ drop(table);
+ txn.commit()?;
+ }
info!("import completed");
drop(permit);
Ok(())
}
-pub fn merge_nodes(db: &Database) -> anyhow::Result<()> {
- for r in db.node_import.iter() {
- let (id, mut nodes) = r?;
+pub fn merge_nodes(db: &DataAcid) -> anyhow::Result<()> {
+ let txn_read = db.inner.begin_read()?;
+ let t_node_import = txn_read.open_table(T_NODE_IMPORT)?;
+ for r in t_node_import.iter()? {
+ let (id, nodes) = r?;
+ let mut nodes = nodes.value().0;
nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y));
@@ -77,27 +99,36 @@ pub fn merge_nodes(db: &Database) -> anyhow::Result<()> {
.reduce(|x, y| merge_node(x, y))
.unwrap();
- node.public.id = Some(id.clone());
+ node.public.id = Some(id.value().to_owned());
node.public.path = vec![]; // will be reconstructed in the next pass
- db.node.insert(&id, &node)?;
+ {
+ let txn_write = db.inner.begin_write()?;
+ let mut t_node = txn_write.open_table(T_NODE)?;
+ t_node.insert(id.value(), Ser(node))?;
+ drop(t_node);
+ txn_write.commit()?;
+ }
}
Ok(())
}
-pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> {
- fn traverse(db: &Database, c: String, mut path: Vec<String>) -> anyhow::Result<()> {
- let node = db
- .node
- .update_and_fetch(&c, |mut nc| {
- if let Some(nc) = &mut nc {
- if nc.public.path.is_empty() {
- nc.public.path = path.clone();
- }
- }
- nc
- })?
- .ok_or(anyhow!("node {c:?} missing"))?;
+pub fn generate_node_paths(db: &DataAcid) -> anyhow::Result<()> {
+ fn traverse(db: &DataAcid, c: String, mut path: Vec<String>) -> anyhow::Result<()> {
+ let node = {
+ let txn = db.inner.begin_write()?;
+ let table = txn.open_table(T_NODE)?;
+
+ let mut node = table.get(&*c)?.ok_or(anyhow!("your mum"))?.value().0;
+
+ if node.public.path.is_empty() {
+ node.public.path = path.clone();
+ }
+
+ drop(table);
+ txn.commit()?;
+ node
+ };
path.push(c);
for c in node.public.children {
@@ -126,7 +157,7 @@ fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering {
pub async fn import_path(
path: PathBuf,
index_path: Vec<usize>,
- db: &Database,
+ db: &DataAcid,
fed: &Federation,
) -> anyhow::Result<()> {
if path.is_dir() {
@@ -190,15 +221,19 @@ async fn process_source(
s: ImportSource,
path: &Path,
index_path: &[usize],
- db: &Database,
+ db: &DataAcid,
fed: &Federation,
) -> anyhow::Result<()> {
- let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> {
- db.node_import.fetch_and_update(id, |l| {
- let mut l = l.unwrap_or_default();
- l.push((index_path.to_vec(), n.clone()));
- Some(l)
- })?;
+ let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+
+ let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
+ node.push((index_path.to_vec(), n.clone()));
+ table.insert(id, Ser(node))?;
+
+ drop(table);
+ txn.commit()?;
Ok(())
};
match s {
@@ -497,16 +532,20 @@ static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16);
async fn import_remote(
id: String,
host: &str,
- db: &Database,
+ db: &DataAcid,
session: &Arc<Session>,
index_path: &[usize],
) -> anyhow::Result<()> {
- let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> {
- db.node_import.fetch_and_update(id, |l| {
- let mut l = l.unwrap_or_default();
- l.push((index_path.to_vec(), n.clone()));
- Some(l)
- })?;
+ let insert_node = move |id: &str, n: Node| -> anyhow::Result<()> {
+ let txn = db.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_IMPORT)?;
+
+ let mut node = table.get(id)?.map(|a| a.value().0).unwrap_or_default();
+ node.push((index_path.to_vec(), n.clone()));
+ table.insert(id, Ser(node))?;
+
+ drop(table);
+ txn.commit()?;
Ok(())
};
let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap();
diff --git a/import/src/tmdb.rs b/import/src/tmdb.rs
index 37447e6..95ebef4 100644
--- a/import/src/tmdb.rs
+++ b/import/src/tmdb.rs
@@ -5,7 +5,7 @@
*/
use anyhow::Context;
use bincode::{Decode, Encode};
-use jellycommon::chrono::{format::Parsed, DateTime, Utc};
+use jellycommon::chrono::{format::Parsed, Utc};
use log::info;
use serde::Deserialize;
@@ -115,7 +115,7 @@ pub async fn tmdb_image(path: &str) -> anyhow::Result<Vec<u8>> {
Ok(res.bytes().await?.to_vec())
}
-pub fn parse_release_date(d: &str) -> anyhow::Result<DateTime<Utc>> {
+pub fn parse_release_date(d: &str) -> anyhow::Result<i64> {
let (year, month, day) = (&d[0..4], &d[5..7], &d[8..10]);
let (year, month, day) = (
year.parse().context("parsing year")?,
@@ -131,5 +131,5 @@ pub fn parse_release_date(d: &str) -> anyhow::Result<DateTime<Utc>> {
p.hour_mod_12 = Some(0);
p.minute = Some(0);
p.second = Some(0);
- Ok(p.to_datetime_with_timezone(&Utc)?)
+ Ok(p.to_datetime_with_timezone(&Utc)?.timestamp_millis())
}