/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ pub mod search; use anyhow::{Context, Result, anyhow, bail}; use bincode::{Decode, Encode, config::standard}; use jellycommon::{ Node, NodeID, user::{NodeUserData, User}, }; use log::info; use redb::{Durability, ReadableTable, StorageError, TableDefinition}; use search::NodeTextSearchIndex; use std::{ fs::create_dir_all, hash::{DefaultHasher, Hasher}, path::{Path, PathBuf}, str::FromStr, sync::Arc, time::SystemTime, }; use tantivy::{ DateTime, TantivyDocument, collector::{Count, TopDocs}, doc, query::QueryParser, schema::Value, }; const T_USER: TableDefinition<&str, Ser> = TableDefinition::new("user"); const T_USER_NODE: TableDefinition<(&str, [u8; 32]), Ser> = TableDefinition::new("user_node"); const T_INVITE: TableDefinition<&str, ()> = TableDefinition::new("invite"); const T_NODE: TableDefinition<[u8; 32], Ser> = TableDefinition::new("node"); const T_NODE_CHILDREN: TableDefinition<([u8; 32], [u8; 32]), ()> = TableDefinition::new("node_children"); const T_TAG_NODE: TableDefinition<(&str, [u8; 32]), ()> = TableDefinition::new("tag_node"); const T_NODE_EXTERNAL_ID: TableDefinition<(&str, &str), [u8; 32]> = TableDefinition::new("node_external_id"); const T_IMPORT_FILE_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import_file_mtime"); const T_NODE_MTIME: TableDefinition<[u8; 32], u64> = TableDefinition::new("node_mtime"); const T_NODE_MEDIA_PATHS: TableDefinition<([u8; 32], &str), ()> = TableDefinition::new("node_media_paths"); #[derive(Clone)] pub struct Database { inner: Arc, text_search: Arc, } impl Database { pub fn open(path: &Path) -> Result { create_dir_all(path).context("creating database directory")?; info!("opening kv store..."); let db = redb::Database::create(path.join("data")).context("opening kv store")?; info!("opening node index..."); let ft_node = NodeTextSearchIndex::new(path).context("in node index")?; let r = Self { inner: db.into(), text_search: ft_node.into(), }; { // this creates all tables such that read operations on them do not fail. let txn = r.inner.begin_write()?; txn.open_table(T_INVITE)?; txn.open_table(T_USER)?; txn.open_table(T_USER_NODE)?; txn.open_table(T_NODE)?; txn.open_table(T_NODE_MTIME)?; txn.open_table(T_NODE_CHILDREN)?; txn.open_table(T_NODE_EXTERNAL_ID)?; txn.open_table(T_NODE_MEDIA_PATHS)?; txn.open_table(T_IMPORT_FILE_MTIME)?; txn.commit()?; } info!("ready"); Ok(r) } pub fn get_node_slug(&self, slug: &str) -> Result>> { self.get_node(NodeID::from_slug(slug)) } pub fn get_node(&self, id: NodeID) -> Result>> { let txn = self.inner.begin_read()?; let t_node = txn.open_table(T_NODE)?; if let Some(node) = t_node.get(id.0)? { Ok(Some(node.value().0.into())) } else { Ok(None) } } pub fn get_node_external_id(&self, platform: &str, eid: &str) -> Result> { let txn = self.inner.begin_read()?; let t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?; if let Some(id) = t_node_external_id.get((platform, eid))? { Ok(Some(NodeID(id.value()))) } else { Ok(None) } } pub fn get_node_children(&self, id: NodeID) -> Result> { let txn = self.inner.begin_read()?; let t_node_children = txn.open_table(T_NODE_CHILDREN)?; Ok(t_node_children .range((id.0, NodeID::MIN.0)..(id.0, NodeID::MAX.0))? .map(|r| r.map(|r| NodeID(r.0.value().1))) .collect::, StorageError>>()?) } pub fn get_tag_nodes(&self, tag: &str) -> Result> { let txn = self.inner.begin_read()?; let t_tag_node = txn.open_table(T_TAG_NODE)?; Ok(t_tag_node .range((tag, NodeID::MIN.0)..(tag, NodeID::MAX.0))? .map(|r| r.map(|r| NodeID(r.0.value().1))) .collect::, StorageError>>()?) } pub fn get_nodes_modified_since(&self, since: u64) -> Result> { let txn = self.inner.begin_read()?; let t_node_mtime = txn.open_table(T_NODE_MTIME)?; Ok(t_node_mtime .iter()? .flat_map(|r| r.map(|r| (NodeID(r.0.value()), r.1.value()))) .filter(|(_, mtime)| *mtime >= since) .map(|(id, _)| id) .collect()) } pub fn clear_nodes(&self) -> Result<()> { let mut txn = self.inner.begin_write()?; let mut t_node = txn.open_table(T_NODE)?; let mut t_node_mtime = txn.open_table(T_NODE_MTIME)?; let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?; let mut t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?; let mut t_import_file_mtime = txn.open_table(T_IMPORT_FILE_MTIME)?; let mut t_node_media_paths = txn.open_table(T_NODE_MEDIA_PATHS)?; t_node.retain(|_, _| false)?; t_node_mtime.retain(|_, _| false)?; t_node_children.retain(|_, _| false)?; t_node_external_id.retain(|_, _| false)?; t_import_file_mtime.retain(|_, _| false)?; t_node_media_paths.retain(|_, _| false)?; drop(( t_node, t_node_mtime, t_node_children, t_node_external_id, t_import_file_mtime, t_node_media_paths, )); txn.set_durability(Durability::Eventual); txn.commit()?; Ok(()) } pub fn get_node_udata(&self, id: NodeID, username: &str) -> Result> { let txn = self.inner.begin_read()?; let t_node = txn.open_table(T_USER_NODE)?; if let Some(node) = t_node.get((username, id.0))? { Ok(Some(node.value().0)) } else { Ok(None) } } pub fn update_node_init( &self, id: NodeID, update: impl FnOnce(&mut Node) -> Result<()>, ) -> Result<()> { let time = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); let mut txn = self.inner.begin_write()?; let mut t_node = txn.open_table(T_NODE)?; let mut t_node_mtime = txn.open_table(T_NODE_MTIME)?; let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?; let mut t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?; let mut t_tag_node = txn.open_table(T_TAG_NODE)?; let mut node = t_node.get(id.0)?.map(|v| v.value().0).unwrap_or_default(); let mut dh_before = HashWriter(DefaultHasher::new()); bincode::encode_into_writer(&node, &mut dh_before, standard()).unwrap(); update(&mut node)?; let mut dh_after = HashWriter(DefaultHasher::new()); bincode::encode_into_writer(&node, &mut dh_after, standard()).unwrap(); if dh_before.0.finish() == dh_after.0.finish() { return Ok(()); } for parent in &node.parents { t_node_children.insert((parent.0, id.0), ())?; } for (pl, eid) in &node.external_ids { t_node_external_id.insert((pl.as_str(), eid.as_str()), id.0)?; } for tag in &node.tags { t_tag_node.insert((tag.as_str(), id.0), ())?; } t_node.insert(&id.0, Ser(node))?; t_node_mtime.insert(&id.0, time)?; drop(( t_node, t_node_mtime, t_node_children, t_node_external_id, t_tag_node, )); txn.set_durability(Durability::Eventual); txn.commit()?; Ok(()) } pub fn get_node_media_paths(&self, id: NodeID) -> Result> { let txn = self.inner.begin_read()?; let table = txn.open_table(T_NODE_MEDIA_PATHS)?; let mut paths = Vec::new(); // TODO fix this for p in table.range((id.0, "\0")..(id.0, "\x7f"))? { paths.push(PathBuf::from_str(p?.0.value().1)?); } Ok(paths) } pub fn insert_node_media_path(&self, id: NodeID, path: &Path) -> Result<()> { let txn = self.inner.begin_write()?; let mut table = txn.open_table(T_NODE_MEDIA_PATHS)?; table.insert((id.0, path.to_str().unwrap()), ())?; drop(table); txn.commit()?; Ok(()) } pub fn update_node_udata( &self, node: NodeID, username: &str, update: impl FnOnce(&mut NodeUserData) -> Result<()>, ) -> Result<()> { let txn = self.inner.begin_write()?; let mut user_nodes = txn.open_table(T_USER_NODE)?; let mut udata = user_nodes .get((username, node.0))? .map(|x| x.value().0) .unwrap_or_default(); update(&mut udata)?; user_nodes.insert((username, node.0), Ser(udata))?; drop(user_nodes); txn.commit()?; Ok(()) } pub fn get_user(&self, username: &str) -> Result> { let txn = self.inner.begin_read()?; let t_user = txn.open_table(T_USER)?; if let Some(user) = t_user.get(username)? { Ok(Some(user.value().0)) } else { Ok(None) } } pub fn update_user( &self, username: &str, update: impl FnOnce(&mut User) -> Result<()>, ) -> Result<()> { let txn = self.inner.begin_write()?; let mut users = txn.open_table(T_USER)?; let mut user = users .get(username)? .ok_or(anyhow!("user does not exist"))? .value() .0; update(&mut user)?; users.insert(&username, Ser(user))?; drop(users); txn.commit()?; Ok(()) } pub fn delete_user(&self, username: &str) -> Result { let txn = self.inner.begin_write()?; let mut table = txn.open_table(T_USER)?; let r = table.remove(username)?.is_some(); drop(table); txn.commit()?; Ok(r) } pub fn list_users(&self) -> Result> { let txn = self.inner.begin_read()?; let table = txn.open_table(T_USER)?; let i = table .iter()? .map(|a| { let (_, y) = a.unwrap(); // TODO y.value().0 }) .collect::>(); drop(table); Ok(i) } pub fn list_invites(&self) -> Result> { let txn = self.inner.begin_read()?; let table = txn.open_table(T_INVITE)?; let i = table .iter()? .map(|a| { let (x, _) = a.unwrap(); x.value().to_owned() }) .collect::>(); drop(table); Ok(i) } pub fn create_invite(&self, inv: &str) -> Result<()> { let txn = self.inner.begin_write()?; let mut table = txn.open_table(T_INVITE)?; table.insert(inv, ())?; drop(table); txn.commit()?; Ok(()) } pub fn delete_invite(&self, inv: &str) -> Result { let txn = self.inner.begin_write()?; let mut table = txn.open_table(T_INVITE)?; let r = table.remove(inv)?.is_some(); drop(table); txn.commit()?; Ok(r) } pub fn register_user(&self, invite: &str, username: &str, user: User) -> Result<()> { let txn = self.inner.begin_write()?; let mut invites = txn.open_table(T_INVITE)?; let mut users = txn.open_table(T_USER)?; if invites.remove(invite)?.is_none() { bail!("invitation invalid"); } let prev_user = users.insert(username, Ser(user))?.map(|x| x.value().0); if prev_user.is_some() { bail!("username taken"); } drop(users); drop(invites); txn.commit()?; Ok(()) } pub fn list_nodes_with_udata(&self, username: &str) -> Result, NodeUserData)>> { let txn = self.inner.begin_read()?; let nodes = txn.open_table(T_NODE)?; let node_users = txn.open_table(T_USER_NODE)?; let i = nodes .iter()? .map(|a| { let (x, y) = a.unwrap(); let (x, y) = (x.value().to_owned(), Arc::new(y.value().0)); let z = node_users .get(&(username, x)) .unwrap() .map(|z| z.value().0) .unwrap_or_default(); (y, z) }) .collect::>(); drop(nodes); Ok(i) } pub fn search(&self, query: &str, limit: usize, offset: usize) -> Result<(usize, Vec)> { let query = QueryParser::for_index( &self.text_search.index, vec![self.text_search.title, self.text_search.description], ) .parse_query(query) .context("parsing query")?; let searcher = self.text_search.reader.searcher(); let sres = searcher.search(&query, &TopDocs::with_limit(limit).and_offset(offset))?; let scount = searcher.search(&query, &Count)?; let mut results = Vec::new(); for (_, daddr) in sres { let doc: TantivyDocument = searcher.doc(daddr)?; let id = doc .get_first(self.text_search.id) .unwrap() .as_bytes() .unwrap(); let id = NodeID(id.try_into().unwrap()); results.push(id); } Ok((scount, results)) } pub fn search_create_index(&self) -> Result<()> { let mut w = self.text_search.writer.write().unwrap(); w.delete_all_documents()?; let txn = self.inner.begin_read()?; let nodes = txn.open_table(T_NODE)?; for node in nodes.iter()? { let (x, y) = node?; let (id, node) = (x.value().to_owned(), y.value().0); w.add_document(doc!( self.text_search.id => id.to_vec(), self.text_search.title => node.title.unwrap_or_default(), self.text_search.description => node.description.unwrap_or_default(), self.text_search.releasedate => DateTime::from_timestamp_millis(node.release_date.unwrap_or_default()), self.text_search.f_index => node.index.unwrap_or_default() as u64, ))?; } w.commit()?; Ok(()) } pub fn create_admin_user(&self, username: &str, password_hash: Vec) -> Result<()> { let txn = self.inner.begin_write().unwrap(); let mut users = txn.open_table(T_USER).unwrap(); let admin = users.get(username).unwrap().map(|x| x.value().0); users .insert( username, Ser(User { admin: true, name: username.to_owned(), password: password_hash, ..admin.unwrap_or_else(|| User { display_name: "Admin".to_string(), ..Default::default() }) }), ) .unwrap(); drop(users); txn.commit().unwrap(); Ok(()) } pub fn get_import_file_mtime(&self, path: &Path) -> Result> { let bytes = path.as_os_str().as_encoded_bytes(); let txn = self.inner.begin_read()?; let table = txn.open_table(T_IMPORT_FILE_MTIME)?; if let Some(v) = table.get(bytes)? { Ok(Some(v.value())) } else { Ok(None) } } pub fn set_import_file_mtime(&self, path: &Path, mtime: u64) -> Result<()> { let bytes = path.as_os_str().as_encoded_bytes(); let txn = self.inner.begin_write()?; let mut table = txn.open_table(T_IMPORT_FILE_MTIME)?; table.insert(bytes, mtime)?; drop(table); txn.commit()?; Ok(()) } } pub struct HashWriter(DefaultHasher); impl bincode::enc::write::Writer for HashWriter { fn write(&mut self, bytes: &[u8]) -> std::result::Result<(), bincode::error::EncodeError> { self.0.write(bytes); Ok(()) } } #[derive(Debug)] #[cfg(not(feature = "db_json"))] pub struct Ser(pub T); #[cfg(not(feature = "db_json"))] impl + std::fmt::Debug> redb::Value for Ser { type SelfType<'a> = Ser where Self: 'a; type AsBytes<'a> = Vec where Self: 'a; fn fixed_width() -> Option { None } fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> where Self: 'a, { Ser(bincode::decode_from_slice(data, bincode::config::legacy()) .unwrap() .0) } fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> where Self: 'a, Self: 'b, { bincode::encode_to_vec(&value.0, bincode::config::legacy()).unwrap() } fn type_name() -> redb::TypeName { redb::TypeName::new("bincode") } } #[derive(Debug)] #[cfg(feature = "db_json")] pub struct Ser(pub T); #[cfg(feature = "db_json")] impl Deserialize<'a> + std::fmt::Debug> redb::Value for Ser { type SelfType<'a> = Ser where Self: 'a; type AsBytes<'a> = Vec where Self: 'a; fn fixed_width() -> Option { None } fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> where Self: 'a, { Ser(serde_json::from_slice(data).unwrap()) } fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> where Self: 'a, Self: 'b, { serde_json::to_vec(&value.0).unwrap() } fn type_name() -> redb::TypeName { redb::TypeName::new("json") } }