diff options
| author | metamuffin <metamuffin@disroot.org> | 2025-12-15 15:09:37 +0100 |
|---|---|---|
| committer | metamuffin <metamuffin@disroot.org> | 2025-12-15 15:09:37 +0100 |
| commit | 0e48299889c3c2b81bf351ffe5da71e0bcd4c22a (patch) | |
| tree | 8a7ff2bd2330c206070b2062723ba471b2d62544 /database/src/lib.rs | |
| parent | 7552a4ff0e027334398d28d5687a339ad77c0871 (diff) | |
| download | jellything-0e48299889c3c2b81bf351ffe5da71e0bcd4c22a.tar jellything-0e48299889c3c2b81bf351ffe5da71e0bcd4c22a.tar.bz2 jellything-0e48299889c3c2b81bf351ffe5da71e0bcd4c22a.tar.zst | |
db
Diffstat (limited to 'database/src/lib.rs')
| -rw-r--r-- | database/src/lib.rs | 513 |
1 files changed, 30 insertions, 483 deletions
diff --git a/database/src/lib.rs b/database/src/lib.rs index 0e89873..828761e 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -3,499 +3,46 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin <metamuffin.org> */ -pub mod search; -use anyhow::{Context, Result, anyhow, bail}; -use jellycommon::{ - IdentifierType, Node, NodeID, - user::{NodeUserData, User}, -}; -use log::info; -use redb::{Durability, ReadableDatabase, ReadableTable, StorageError, TableDefinition}; -use search::NodeTextSearchIndex; -use serde::{Serialize, de::DeserializeOwned}; -use std::{ - fs::create_dir_all, - path::{Path, PathBuf}, - str::FromStr, - sync::Arc, - time::SystemTime, -}; -use tantivy::{ - DateTime, TantivyDocument, - collector::{Count, TopDocs}, - doc, - query::QueryParser, - schema::Value, +use anyhow::{Result, bail}; + +use crate::{ + backends::{KV, memory::Memory, redb::Redb, rocksdb::Rocksdb}, + indices::Index, }; +use std::{path::Path, sync::Arc}; -const T_USER: TableDefinition<&str, Ser<User>> = TableDefinition::new("user"); -const T_USER_NODE: TableDefinition<(&str, [u8; 32]), Ser<NodeUserData>> = - TableDefinition::new("user_node"); -const T_INVITE: TableDefinition<&str, ()> = TableDefinition::new("invite"); -const T_NODE: TableDefinition<[u8; 32], Ser<Node>> = TableDefinition::new("node"); -const T_NODE_CHILDREN: TableDefinition<([u8; 32], [u8; 32]), ()> = - TableDefinition::new("node_children"); -const T_TAG_NODE: TableDefinition<(&str, [u8; 32]), ()> = TableDefinition::new("tag_node"); -const T_NODE_IDENTIFIER: TableDefinition<(u8, &str), [u8; 32]> = TableDefinition::new("node_ids"); -const T_IMPORT_FILE_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import_file_mtime"); -const T_NODE_MTIME: TableDefinition<[u8; 32], u64> = TableDefinition::new("node_mtime"); -const T_NODE_MEDIA_PATHS: TableDefinition<([u8; 32], &str), ()> = - TableDefinition::new("node_media_paths"); +pub mod backends; +pub mod indices; -#[derive(Clone)] pub struct Database { - inner: Arc<redb::Database>, - text_search: Arc<NodeTextSearchIndex>, + storage: Arc<dyn KV>, } impl Database { - pub fn open(path: &Path) -> Result<Self, anyhow::Error> { - create_dir_all(path).context("creating database directory")?; - info!("opening kv store..."); - let db = redb::Database::create(path.join("data")).context("opening kv store")?; - info!("opening node index..."); - let ft_node = NodeTextSearchIndex::new(path).context("in node index")?; - let r = Self { - inner: db.into(), - text_search: ft_node.into(), - }; - - { - // this creates all tables such that read operations on them do not fail. - let txn = r.inner.begin_write()?; - txn.open_table(T_INVITE)?; - txn.open_table(T_USER)?; - txn.open_table(T_USER_NODE)?; - txn.open_table(T_NODE)?; - txn.open_table(T_NODE_MTIME)?; - txn.open_table(T_NODE_CHILDREN)?; - txn.open_table(T_NODE_IDENTIFIER)?; - txn.open_table(T_NODE_MEDIA_PATHS)?; - txn.open_table(T_IMPORT_FILE_MTIME)?; - txn.commit()?; - } - - info!("ready"); - Ok(r) - } - - pub fn get_node_slug(&self, slug: &str) -> Result<Option<Arc<Node>>> { - self.get_node(NodeID::from_slug(slug)) - } - - pub fn get_node(&self, id: NodeID) -> Result<Option<Arc<Node>>> { - let txn = self.inner.begin_read()?; - let t_node = txn.open_table(T_NODE)?; - if let Some(node) = t_node.get(id.0)? { - Ok(Some(node.value().0.into())) - } else { - Ok(None) - } - } - pub fn get_node_by_identifier( - &self, - ty: IdentifierType, - value: &str, - ) -> Result<Option<NodeID>> { - let txn = self.inner.begin_read()?; - let t_node_external_id = txn.open_table(T_NODE_IDENTIFIER)?; - if let Some(id) = t_node_external_id.get((ty as u8, value))? { - Ok(Some(NodeID(id.value()))) - } else { - Ok(None) - } - } - pub fn get_node_children(&self, id: NodeID) -> Result<Vec<NodeID>> { - let txn = self.inner.begin_read()?; - let t_node_children = txn.open_table(T_NODE_CHILDREN)?; - Ok(t_node_children - .range((id.0, NodeID::MIN.0)..(id.0, NodeID::MAX.0))? - .map(|r| r.map(|r| NodeID(r.0.value().1))) - .collect::<Result<Vec<_>, StorageError>>()?) - } - pub fn get_tag_nodes(&self, tag: &str) -> Result<Vec<NodeID>> { - let txn = self.inner.begin_read()?; - let t_tag_node = txn.open_table(T_TAG_NODE)?; - Ok(t_tag_node - .range((tag, NodeID::MIN.0)..(tag, NodeID::MAX.0))? - .map(|r| r.map(|r| NodeID(r.0.value().1))) - .collect::<Result<Vec<_>, StorageError>>()?) - } - pub fn get_nodes_modified_since(&self, since: u64) -> Result<Vec<NodeID>> { - let txn = self.inner.begin_read()?; - let t_node_mtime = txn.open_table(T_NODE_MTIME)?; - Ok(t_node_mtime - .iter()? - .flat_map(|r| r.map(|r| (NodeID(r.0.value()), r.1.value()))) - .filter(|(_, mtime)| *mtime >= since) - .map(|(id, _)| id) - .collect()) - } - - pub fn clear_nodes(&self) -> Result<()> { - let mut txn = self.inner.begin_write()?; - let mut t_node = txn.open_table(T_NODE)?; - let mut t_node_mtime = txn.open_table(T_NODE_MTIME)?; - let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?; - let mut t_node_external_id = txn.open_table(T_NODE_IDENTIFIER)?; - let mut t_import_file_mtime = txn.open_table(T_IMPORT_FILE_MTIME)?; - let mut t_node_media_paths = txn.open_table(T_NODE_MEDIA_PATHS)?; - t_node.retain(|_, _| false)?; - t_node_mtime.retain(|_, _| false)?; - t_node_children.retain(|_, _| false)?; - t_node_external_id.retain(|_, _| false)?; - t_import_file_mtime.retain(|_, _| false)?; - t_node_media_paths.retain(|_, _| false)?; - drop(( - t_node, - t_node_mtime, - t_node_children, - t_node_external_id, - t_import_file_mtime, - t_node_media_paths, - )); - txn.set_durability(Durability::Immediate)?; - txn.commit()?; - Ok(()) - } - - pub fn get_node_udata(&self, id: NodeID, username: &str) -> Result<Option<NodeUserData>> { - let txn = self.inner.begin_read()?; - let t_node = txn.open_table(T_USER_NODE)?; - if let Some(node) = t_node.get((username, id.0))? { - Ok(Some(node.value().0)) - } else { - Ok(None) - } - } - - pub fn update_node_init(&self, id: NodeID, update: impl FnOnce(&mut Node)) -> Result<()> { - let time = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - let mut txn = self.inner.begin_write()?; - let mut t_node = txn.open_table(T_NODE)?; - let mut t_node_mtime = txn.open_table(T_NODE_MTIME)?; - let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?; - let mut t_node_external_id = txn.open_table(T_NODE_IDENTIFIER)?; - let mut t_tag_node = txn.open_table(T_TAG_NODE)?; - let mut node = t_node.get(id.0)?.map(|v| v.value().0).unwrap_or_default(); - - let dh_before = serde_json::to_vec(&node).unwrap(); - update(&mut node); - let dh_after = serde_json::to_vec(&node).unwrap(); - - if dh_before == dh_after { - return Ok(()); - } - - for parent in &node.parents { - t_node_children.insert((parent.0, id.0), ())?; - } - for (pl, eid) in &node.identifiers { - t_node_external_id.insert((*pl as u8, eid.as_str()), id.0)?; - } - for tag in &node.tags { - t_tag_node.insert((tag.as_str(), id.0), ())?; - } - t_node.insert(&id.0, Ser(node))?; - t_node_mtime.insert(&id.0, time)?; - drop(( - t_node, - t_node_mtime, - t_node_children, - t_node_external_id, - t_tag_node, - )); - txn.set_durability(Durability::Immediate)?; - txn.commit()?; - Ok(()) - } - pub fn get_node_media_paths(&self, id: NodeID) -> Result<Vec<PathBuf>> { - let txn = self.inner.begin_read()?; - let table = txn.open_table(T_NODE_MEDIA_PATHS)?; - let mut paths = Vec::new(); - // TODO fix this - for p in table.range((id.0, "\0")..(id.0, "\x7f"))? { - paths.push(PathBuf::from_str(p?.0.value().1)?); - } - Ok(paths) - } - pub fn insert_node_media_path(&self, id: NodeID, path: &Path) -> Result<()> { - let txn = self.inner.begin_write()?; - let mut table = txn.open_table(T_NODE_MEDIA_PATHS)?; - table.insert((id.0, path.to_str().unwrap()), ())?; - drop(table); - txn.commit()?; - Ok(()) - } - - pub fn update_node_udata( - &self, - node: NodeID, - username: &str, - update: impl FnOnce(&mut NodeUserData) -> Result<()>, - ) -> Result<()> { - let txn = self.inner.begin_write()?; - let mut user_nodes = txn.open_table(T_USER_NODE)?; - - let mut udata = user_nodes - .get((username, node.0))? - .map(|x| x.value().0) - .unwrap_or_default(); - - update(&mut udata)?; - - user_nodes.insert((username, node.0), Ser(udata))?; - drop(user_nodes); - txn.commit()?; - Ok(()) - } - - pub fn get_user(&self, username: &str) -> Result<Option<User>> { - let txn = self.inner.begin_read()?; - let t_user = txn.open_table(T_USER)?; - if let Some(user) = t_user.get(username)? { - Ok(Some(user.value().0)) - } else { - Ok(None) - } - } - pub fn update_user( - &self, - username: &str, - update: impl FnOnce(&mut User) -> Result<()>, - ) -> Result<()> { - let txn = self.inner.begin_write()?; - let mut users = txn.open_table(T_USER)?; - let mut user = users - .get(username)? - .ok_or(anyhow!("user does not exist"))? - .value() - .0; - update(&mut user)?; - users.insert(&username, Ser(user))?; - drop(users); - txn.commit()?; - - Ok(()) - } - pub fn delete_user(&self, username: &str) -> Result<bool> { - let txn = self.inner.begin_write()?; - let mut table = txn.open_table(T_USER)?; - let r = table.remove(username)?.is_some(); - drop(table); - txn.commit()?; - Ok(r) - } - pub fn list_users(&self) -> Result<Vec<User>> { - let txn = self.inner.begin_read()?; - let table = txn.open_table(T_USER)?; - let i = table - .iter()? - .map(|a| { - let (_, y) = a.unwrap(); // TODO - y.value().0 - }) - .collect::<Vec<_>>(); - drop(table); - Ok(i) - } - pub fn list_invites(&self) -> Result<Vec<String>> { - let txn = self.inner.begin_read()?; - let table = txn.open_table(T_INVITE)?; - let i = table - .iter()? - .map(|a| { - let (x, _) = a.unwrap(); - x.value().to_owned() - }) - .collect::<Vec<_>>(); - drop(table); - Ok(i) - } - pub fn create_invite(&self, inv: &str) -> Result<()> { - let txn = self.inner.begin_write()?; - let mut table = txn.open_table(T_INVITE)?; - table.insert(inv, ())?; - drop(table); - txn.commit()?; - Ok(()) - } - pub fn delete_invite(&self, inv: &str) -> Result<bool> { - let txn = self.inner.begin_write()?; - let mut table = txn.open_table(T_INVITE)?; - let r = table.remove(inv)?.is_some(); - drop(table); - txn.commit()?; - Ok(r) - } - pub fn register_user(&self, invite: &str, username: &str, user: User) -> Result<()> { - let txn = self.inner.begin_write()?; - let mut invites = txn.open_table(T_INVITE)?; - let mut users = txn.open_table(T_USER)?; - - if invites.remove(invite)?.is_none() { - bail!("invitation invalid"); - } - let prev_user = users.insert(username, Ser(user))?.map(|x| x.value().0); - if prev_user.is_some() { - bail!("username taken"); - } - - drop(users); - drop(invites); - txn.commit()?; - Ok(()) - } - pub fn list_nodes_with_udata(&self, username: &str) -> Result<Vec<(Arc<Node>, NodeUserData)>> { - let txn = self.inner.begin_read()?; - let nodes = txn.open_table(T_NODE)?; - let node_users = txn.open_table(T_USER_NODE)?; - let i = nodes - .iter()? - .map(|a| { - let (x, y) = a.unwrap(); - let (x, y) = (x.value().to_owned(), Arc::new(y.value().0)); - let z = node_users - .get(&(username, x)) - .unwrap() - .map(|z| z.value().0) - .unwrap_or_default(); - (y, z) - }) - .collect::<Vec<_>>(); - drop(nodes); - Ok(i) - } - pub fn search(&self, query: &str, limit: usize, offset: usize) -> Result<(usize, Vec<NodeID>)> { - let query = QueryParser::for_index( - &self.text_search.index, - vec![self.text_search.title, self.text_search.description], - ) - .parse_query(query) - .context("parsing query")?; - - let searcher = self.text_search.reader.searcher(); - let sres = searcher.search(&query, &TopDocs::with_limit(limit).and_offset(offset))?; - let scount = searcher.search(&query, &Count)?; - - let mut results = Vec::new(); - for (_, daddr) in sres { - let doc: TantivyDocument = searcher.doc(daddr)?; - let id = doc - .get_first(self.text_search.id) - .unwrap() - .as_bytes() - .unwrap(); - let id = NodeID(id.try_into().unwrap()); - results.push(id); - } - Ok((scount, results)) - } - - pub fn search_create_index(&self) -> Result<()> { - let mut w = self.text_search.writer.write().unwrap(); - w.delete_all_documents()?; - - let txn = self.inner.begin_read()?; - let nodes = txn.open_table(T_NODE)?; - for node in nodes.iter()? { - let (x, y) = node?; - let (id, node) = (x.value().to_owned(), y.value().0); - - w.add_document(doc!( - self.text_search.id => id.to_vec(), - self.text_search.title => node.title.unwrap_or_default(), - self.text_search.description => node.description.unwrap_or_default(), - self.text_search.releasedate => DateTime::from_timestamp_millis(node.release_date.unwrap_or_default()), - self.text_search.f_index => node.index.unwrap_or_default() as u64, - ))?; - } - - w.commit()?; - Ok(()) - } - - pub fn create_admin_user(&self, username: &str, password_hash: Vec<u8>) -> Result<()> { - let txn = self.inner.begin_write().unwrap(); - let mut users = txn.open_table(T_USER).unwrap(); - - let admin = users.get(username).unwrap().map(|x| x.value().0); - users - .insert( - username, - Ser(User { - admin: true, - name: username.to_owned(), - password: password_hash, - ..admin.unwrap_or_else(|| User { - display_name: "Admin".to_string(), - ..Default::default() - }) - }), - ) - .unwrap(); - - drop(users); - txn.commit().unwrap(); - Ok(()) - } - pub fn get_import_file_mtime(&self, path: &Path) -> Result<Option<u64>> { - let bytes = path.as_os_str().as_encoded_bytes(); - let txn = self.inner.begin_read()?; - let table = txn.open_table(T_IMPORT_FILE_MTIME)?; - if let Some(v) = table.get(bytes)? { - Ok(Some(v.value())) - } else { - Ok(None) - } - } - pub fn set_import_file_mtime(&self, path: &Path, mtime: u64) -> Result<()> { - let bytes = path.as_os_str().as_encoded_bytes(); - let txn = self.inner.begin_write()?; - let mut table = txn.open_table(T_IMPORT_FILE_MTIME)?; - table.insert(bytes, mtime)?; - drop(table); - txn.commit()?; - Ok(()) + pub fn new(driver: &str, path: &Path) -> Result<Self> { + Ok(Self { + storage: match driver { + "rocksdb" => Arc::new(Rocksdb::new(path)?), + "redb" => Arc::new(Redb::new(path)?), + "memory" => Arc::new(Memory::new()), + _ => bail!("unknown db driver"), + }, + }) } } -#[derive(Debug)] -pub struct Ser<T>(pub T); -impl<T: DeserializeOwned + Serialize + std::fmt::Debug> redb::Value for Ser<T> { - type SelfType<'a> - = Ser<T> - where - Self: 'a; - type AsBytes<'a> - = Vec<u8> - where - Self: 'a; - - fn fixed_width() -> Option<usize> { - None - } - - fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> - where - Self: 'a, - { - Ser(serde_json::from_slice(data).unwrap()) - } - - fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> - where - Self: 'a, - Self: 'b, - { - serde_json::to_vec(&value.0).unwrap() - } - - fn type_name() -> redb::TypeName { - redb::TypeName::new("json") +pub struct Table<T> { + id: u32, + indices: Vec<Box<dyn Index<T>>>, + db: Arc<dyn KV>, +} +impl<T> Table<T> { + pub fn new(db: &Database, id: u32) -> Self { + Self { + id, + indices: Vec::new(), + db: db.storage.clone(), + } } } |