From 35ae80f183904466667af73c7921b4ade399569a Mon Sep 17 00:00:00 2001
From: metamuffin
Date: Wed, 30 Apr 2025 11:46:28 +0200
Subject: split base into asset_token and db

---
 database/src/lib.rs    | 555 +++++++++++++++++++++++++++++++++++++++++++++++++
 database/src/search.rs |  64 ++++++
 2 files changed, 619 insertions(+)
 create mode 100644 database/src/lib.rs
 create mode 100644 database/src/search.rs

diff --git a/database/src/lib.rs b/database/src/lib.rs
new file mode 100644
index 0000000..b84ddc9
--- /dev/null
+++ b/database/src/lib.rs
@@ -0,0 +1,555 @@
+/*
+    This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+    Copyright (C) 2025 metamuffin
+*/
+pub mod search;
+
+use anyhow::{Context, Result, anyhow, bail};
+use bincode::{Decode, Encode, config::standard};
+use jellycommon::{
+    Node, NodeID,
+    user::{NodeUserData, User},
+};
+use log::info;
+use redb::{Durability, ReadableTable, StorageError, TableDefinition};
+use search::NodeTextSearchIndex;
+use std::{
+    fs::create_dir_all,
+    hash::{DefaultHasher, Hasher},
+    path::{Path, PathBuf},
+    str::FromStr,
+    sync::Arc,
+    time::SystemTime,
+};
+use tantivy::{
+    DateTime, TantivyDocument,
+    collector::{Count, TopDocs},
+    doc,
+    query::QueryParser,
+    schema::Value,
+};
+
+const T_USER: TableDefinition<&str, Ser<User>> = TableDefinition::new("user");
+const T_USER_NODE: TableDefinition<(&str, [u8; 32]), Ser<NodeUserData>> =
+    TableDefinition::new("user_node");
+const T_INVITE: TableDefinition<&str, ()> = TableDefinition::new("invite");
+const T_NODE: TableDefinition<[u8; 32], Ser<Node>> = TableDefinition::new("node");
+const T_NODE_CHILDREN: TableDefinition<([u8; 32], [u8; 32]), ()> =
+    TableDefinition::new("node_children");
+const T_TAG_NODE: TableDefinition<(&str, [u8; 32]), ()> = TableDefinition::new("tag_node");
+const T_NODE_EXTERNAL_ID: TableDefinition<(&str, &str), [u8; 32]> =
+    TableDefinition::new("node_external_id");
+const T_IMPORT_FILE_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import_file_mtime");
+const T_NODE_MTIME: TableDefinition<[u8; 32], u64> = TableDefinition::new("node_mtime");
+const T_NODE_MEDIA_PATHS: TableDefinition<([u8; 32], &str), ()> =
+    TableDefinition::new("node_media_paths");
+
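+// Note on the table layout above: NodeID is stored as its raw [u8; 32], and
+// the relation tables (node_children, tag_node, node_media_paths) use
+// composite tuple keys so that all entries sharing a first element are
+// adjacent and can be listed with a single range scan.
+
+/// Handle to all persistent state: a redb key-value store plus a tantivy
+/// full-text index. Cloning is cheap since both parts sit behind `Arc`s.
+///
+/// Illustrative usage only; the path and slug here are made up and not part
+/// of this commit:
+/// ```ignore
+/// let db = Database::open(Path::new("/var/lib/jellything/db"))?;
+/// if let Some(node) = db.get_node_slug("some-movie")? {
+///     let children = db.get_node_children(NodeID::from_slug("some-movie"))?;
+///     println!("{:?} has {} children", node.title, children.len());
+/// }
+/// ```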
+#[derive(Clone)]
+pub struct Database {
+    inner: Arc<redb::Database>,
+    text_search: Arc<NodeTextSearchIndex>,
+}
+
+impl Database {
+    pub fn open(path: &Path) -> Result<Self> {
+        create_dir_all(path).context("creating database directory")?;
+        info!("opening kv store...");
+        let db = redb::Database::create(path.join("data")).context("opening kv store")?;
+        info!("opening node index...");
+        let ft_node = NodeTextSearchIndex::new(path).context("in node index")?;
+        let r = Self {
+            inner: db.into(),
+            text_search: ft_node.into(),
+        };
+
+        {
+            // this creates all tables such that read operations on them do not fail.
+            let txn = r.inner.begin_write()?;
+            txn.open_table(T_INVITE)?;
+            txn.open_table(T_USER)?;
+            txn.open_table(T_USER_NODE)?;
+            txn.open_table(T_NODE)?;
+            txn.open_table(T_NODE_MTIME)?;
+            txn.open_table(T_NODE_CHILDREN)?;
+            txn.open_table(T_NODE_EXTERNAL_ID)?;
+            txn.open_table(T_NODE_MEDIA_PATHS)?;
+            txn.open_table(T_TAG_NODE)?;
+            txn.open_table(T_IMPORT_FILE_MTIME)?;
+            txn.commit()?;
+        }
+
+        info!("ready");
+        Ok(r)
+    }
+
+    pub fn get_node_slug(&self, slug: &str) -> Result<Option<Arc<Node>>> {
+        self.get_node(NodeID::from_slug(slug))
+    }
+
+    pub fn get_node(&self, id: NodeID) -> Result<Option<Arc<Node>>> {
+        let txn = self.inner.begin_read()?;
+        let t_node = txn.open_table(T_NODE)?;
+        if let Some(node) = t_node.get(id.0)? {
+            Ok(Some(node.value().0.into()))
+        } else {
+            Ok(None)
+        }
+    }
+    pub fn get_node_external_id(&self, platform: &str, eid: &str) -> Result<Option<NodeID>> {
+        let txn = self.inner.begin_read()?;
+        let t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?;
+        if let Some(id) = t_node_external_id.get((platform, eid))? {
+            Ok(Some(NodeID(id.value())))
+        } else {
+            Ok(None)
+        }
+    }
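+    // The child and tag lookups below are prefix range scans over composite
+    // keys: (key, NodeID::MIN.0)..(key, NodeID::MAX.0) visits exactly the
+    // entries whose first tuple element matches, in key order. Note the upper
+    // bound is exclusive, so an id equal to NodeID::MAX itself is skipped.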
+    pub fn get_node_children(&self, id: NodeID) -> Result<Vec<NodeID>> {
+        let txn = self.inner.begin_read()?;
+        let t_node_children = txn.open_table(T_NODE_CHILDREN)?;
+        Ok(t_node_children
+            .range((id.0, NodeID::MIN.0)..(id.0, NodeID::MAX.0))?
+            .map(|r| r.map(|r| NodeID(r.0.value().1)))
+            .collect::<Result<Vec<_>, StorageError>>()?)
+    }
+    pub fn get_tag_nodes(&self, tag: &str) -> Result<Vec<NodeID>> {
+        let txn = self.inner.begin_read()?;
+        let t_tag_node = txn.open_table(T_TAG_NODE)?;
+        Ok(t_tag_node
+            .range((tag, NodeID::MIN.0)..(tag, NodeID::MAX.0))?
+            .map(|r| r.map(|r| NodeID(r.0.value().1)))
+            .collect::<Result<Vec<_>, StorageError>>()?)
+    }
+    pub fn get_nodes_modified_since(&self, since: u64) -> Result<Vec<NodeID>> {
+        let txn = self.inner.begin_read()?;
+        let t_node_mtime = txn.open_table(T_NODE_MTIME)?;
+        Ok(t_node_mtime
+            .iter()?
+            .flat_map(|r| r.map(|r| (NodeID(r.0.value()), r.1.value())))
+            .filter(|(_, mtime)| *mtime >= since)
+            .map(|(id, _)| id)
+            .collect())
+    }
+
+    pub fn clear_nodes(&self) -> Result<()> {
+        let mut txn = self.inner.begin_write()?;
+        let mut t_node = txn.open_table(T_NODE)?;
+        let mut t_node_mtime = txn.open_table(T_NODE_MTIME)?;
+        let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?;
+        let mut t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?;
+        let mut t_import_file_mtime = txn.open_table(T_IMPORT_FILE_MTIME)?;
+        let mut t_node_media_paths = txn.open_table(T_NODE_MEDIA_PATHS)?;
+        let mut t_tag_node = txn.open_table(T_TAG_NODE)?;
+        t_node.retain(|_, _| false)?;
+        t_node_mtime.retain(|_, _| false)?;
+        t_node_children.retain(|_, _| false)?;
+        t_node_external_id.retain(|_, _| false)?;
+        t_import_file_mtime.retain(|_, _| false)?;
+        t_node_media_paths.retain(|_, _| false)?;
+        // tag associations reference the nodes removed above, drop them too
+        t_tag_node.retain(|_, _| false)?;
+        drop((
+            t_node,
+            t_node_mtime,
+            t_node_children,
+            t_node_external_id,
+            t_import_file_mtime,
+            t_node_media_paths,
+            t_tag_node,
+        ));
+        txn.set_durability(Durability::Eventual);
+        txn.commit()?;
+        Ok(())
+    }
+
+    pub fn get_node_udata(&self, id: NodeID, username: &str) -> Result<Option<NodeUserData>> {
+        let txn = self.inner.begin_read()?;
+        let t_node = txn.open_table(T_USER_NODE)?;
+        if let Some(node) = t_node.get((username, id.0))? {
+            Ok(Some(node.value().0))
+        } else {
+            Ok(None)
+        }
+    }
+
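+    // update_node_init loads the node (or starts from Node::default()),
+    // applies `update`, and persists only when the hash of the bincode
+    // encoding changed (see HashWriter near the end of this file), so no-op
+    // imports neither rewrite the node nor bump its mtime.
+    // Durability::Eventual skips the synchronous fsync on commit, presumably
+    // to keep bulk imports fast.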
+    pub fn update_node_init(
+        &self,
+        id: NodeID,
+        update: impl FnOnce(&mut Node) -> Result<()>,
+    ) -> Result<()> {
+        let time = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_secs();
+        let mut txn = self.inner.begin_write()?;
+        let mut t_node = txn.open_table(T_NODE)?;
+        let mut t_node_mtime = txn.open_table(T_NODE_MTIME)?;
+        let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?;
+        let mut t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?;
+        let mut t_tag_node = txn.open_table(T_TAG_NODE)?;
+        let mut node = t_node.get(id.0)?.map(|v| v.value().0).unwrap_or_default();
+
+        let mut dh_before = HashWriter(DefaultHasher::new());
+        bincode::encode_into_writer(&node, &mut dh_before, standard()).unwrap();
+        update(&mut node)?;
+        let mut dh_after = HashWriter(DefaultHasher::new());
+        bincode::encode_into_writer(&node, &mut dh_after, standard()).unwrap();
+
+        if dh_before.0.finish() == dh_after.0.finish() {
+            return Ok(());
+        }
+
+        for parent in &node.parents {
+            t_node_children.insert((parent.0, id.0), ())?;
+        }
+        for (pl, eid) in &node.external_ids {
+            t_node_external_id.insert((pl.as_str(), eid.as_str()), id.0)?;
+        }
+        for tag in &node.tags {
+            t_tag_node.insert((tag.as_str(), id.0), ())?;
+        }
+        t_node.insert(&id.0, Ser(node))?;
+        t_node_mtime.insert(&id.0, time)?;
+        drop((
+            t_node,
+            t_node_mtime,
+            t_node_children,
+            t_node_external_id,
+            t_tag_node,
+        ));
+        txn.set_durability(Durability::Eventual);
+        txn.commit()?;
+        Ok(())
+    }
+    pub fn get_node_media_paths(&self, id: NodeID) -> Result<Vec<PathBuf>> {
+        let txn = self.inner.begin_read()?;
+        let table = txn.open_table(T_NODE_MEDIA_PATHS)?;
+        let mut paths = Vec::new();
+        // TODO fix this
+        for p in table.range((id.0, "\0")..(id.0, "\x7f"))? {
+            paths.push(PathBuf::from_str(p?.0.value().1)?);
+        }
+        Ok(paths)
+    }
+    pub fn insert_node_media_path(&self, id: NodeID, path: &Path) -> Result<()> {
+        let txn = self.inner.begin_write()?;
+        let mut table = txn.open_table(T_NODE_MEDIA_PATHS)?;
+        table.insert((id.0, path.to_str().unwrap()), ())?;
+        drop(table);
+        txn.commit()?;
+        Ok(())
+    }
+
+    pub fn update_node_udata(
+        &self,
+        node: NodeID,
+        username: &str,
+        update: impl FnOnce(&mut NodeUserData) -> Result<()>,
+    ) -> Result<()> {
+        let txn = self.inner.begin_write()?;
+        let mut user_nodes = txn.open_table(T_USER_NODE)?;
+
+        let mut udata = user_nodes
+            .get((username, node.0))?
+            .map(|x| x.value().0)
+            .unwrap_or_default();
+
+        update(&mut udata)?;
+
+        user_nodes.insert((username, node.0), Ser(udata))?;
+        drop(user_nodes);
+        txn.commit()?;
+        Ok(())
+    }
+
+    pub fn get_user(&self, username: &str) -> Result<Option<User>> {
+        let txn = self.inner.begin_read()?;
+        let t_user = txn.open_table(T_USER)?;
+        if let Some(user) = t_user.get(username)? {
+            Ok(Some(user.value().0))
+        } else {
+            Ok(None)
+        }
+    }
+    pub fn update_user(
+        &self,
+        username: &str,
+        update: impl FnOnce(&mut User) -> Result<()>,
+    ) -> Result<()> {
+        let txn = self.inner.begin_write()?;
+        let mut users = txn.open_table(T_USER)?;
+        let mut user = users
+            .get(username)?
+            .ok_or(anyhow!("user does not exist"))?
+            .value()
+            .0;
+        update(&mut user)?;
+        users.insert(&username, Ser(user))?;
+        drop(users);
+        txn.commit()?;
+
+        Ok(())
+    }
+    pub fn delete_user(&self, username: &str) -> Result<bool> {
+        let txn = self.inner.begin_write()?;
+        let mut table = txn.open_table(T_USER)?;
+        let r = table.remove(username)?.is_some();
+        drop(table);
+        txn.commit()?;
+        Ok(r)
+    }
+    pub fn list_users(&self) -> Result<Vec<User>> {
+        let txn = self.inner.begin_read()?;
+        let table = txn.open_table(T_USER)?;
+        let i = table
+            .iter()?
+            .map(|a| {
+                let (_, y) = a.unwrap(); // TODO
+                y.value().0
+            })
+            .collect::<Vec<_>>();
+        drop(table);
+        Ok(i)
+    }
+    pub fn list_invites(&self) -> Result<Vec<String>> {
+        let txn = self.inner.begin_read()?;
+        let table = txn.open_table(T_INVITE)?;
+        let i = table
+            .iter()?
+            .map(|a| {
+                let (x, _) = a.unwrap();
+                x.value().to_owned()
+            })
+            .collect::<Vec<_>>();
+        drop(table);
+        Ok(i)
+    }
+    pub fn create_invite(&self, inv: &str) -> Result<()> {
+        let txn = self.inner.begin_write()?;
+        let mut table = txn.open_table(T_INVITE)?;
+        table.insert(inv, ())?;
+        drop(table);
+        txn.commit()?;
+        Ok(())
+    }
+    pub fn delete_invite(&self, inv: &str) -> Result<bool> {
+        let txn = self.inner.begin_write()?;
+        let mut table = txn.open_table(T_INVITE)?;
+        let r = table.remove(inv)?.is_some();
+        drop(table);
+        txn.commit()?;
+        Ok(r)
+    }
+    pub fn register_user(&self, invite: &str, username: &str, user: User) -> Result<()> {
+        let txn = self.inner.begin_write()?;
+        let mut invites = txn.open_table(T_INVITE)?;
+        let mut users = txn.open_table(T_USER)?;
+
+        if invites.remove(invite)?.is_none() {
+            bail!("invitation invalid");
+        }
+        let prev_user = users.insert(username, Ser(user))?.map(|x| x.value().0);
+        if prev_user.is_some() {
+            bail!("username taken");
+        }
+
+        drop(users);
+        drop(invites);
+        txn.commit()?;
+        Ok(())
+    }
+    pub fn list_nodes_with_udata(&self, username: &str) -> Result<Vec<(Arc<Node>, NodeUserData)>> {
+        let txn = self.inner.begin_read()?;
+        let nodes = txn.open_table(T_NODE)?;
+        let node_users = txn.open_table(T_USER_NODE)?;
+        let i = nodes
+            .iter()?
+            .map(|a| {
+                let (x, y) = a.unwrap();
+                let (x, y) = (x.value().to_owned(), Arc::new(y.value().0));
+                let z = node_users
+                    .get(&(username, x))
+                    .unwrap()
+                    .map(|z| z.value().0)
+                    .unwrap_or_default();
+                (y, z)
+            })
+            .collect::<Vec<_>>();
+        drop(nodes);
+        Ok(i)
+    }
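+    // Full-text search: the query is parsed against the title and description
+    // fields; TopDocs fetches one page of hits (limit/offset) while Count
+    // computes the total number of matches, and each hit's stored id field is
+    // decoded back into a NodeID.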
+    pub fn search(&self, query: &str, limit: usize, offset: usize) -> Result<(usize, Vec<NodeID>)> {
+        let query = QueryParser::for_index(
+            &self.text_search.index,
+            vec![self.text_search.title, self.text_search.description],
+        )
+        .parse_query(query)
+        .context("parsing query")?;
+
+        let searcher = self.text_search.reader.searcher();
+        let sres = searcher.search(&query, &TopDocs::with_limit(limit).and_offset(offset))?;
+        let scount = searcher.search(&query, &Count)?;
+
+        let mut results = Vec::new();
+        for (_, daddr) in sres {
+            let doc: TantivyDocument = searcher.doc(daddr)?;
+            let id = doc
+                .get_first(self.text_search.id)
+                .unwrap()
+                .as_bytes()
+                .unwrap();
+            let id = NodeID(id.try_into().unwrap());
+            results.push(id);
+        }
+        Ok((scount, results))
+    }
+
+    pub fn search_create_index(&self) -> Result<()> {
+        let mut w = self.text_search.writer.write().unwrap();
+        w.delete_all_documents()?;
+
+        let txn = self.inner.begin_read()?;
+        let nodes = txn.open_table(T_NODE)?;
+        for node in nodes.iter()? {
+            let (x, y) = node?;
+            let (id, node) = (x.value().to_owned(), y.value().0);
+
+            w.add_document(doc!(
+                self.text_search.id => id.to_vec(),
+                self.text_search.title => node.title.unwrap_or_default(),
+                self.text_search.description => node.description.unwrap_or_default(),
+                self.text_search.releasedate => DateTime::from_timestamp_millis(node.release_date.unwrap_or_default()),
+                self.text_search.f_index => node.index.unwrap_or_default() as u64,
+            ))?;
+        }
+
+        w.commit()?;
+        Ok(())
+    }
+
+    pub fn create_admin_user(&self, username: &str, password_hash: Vec<u8>) -> Result<()> {
+        let txn = self.inner.begin_write().unwrap();
+        let mut users = txn.open_table(T_USER).unwrap();
+
+        let admin = users.get(username).unwrap().map(|x| x.value().0);
+        users
+            .insert(
+                username,
+                Ser(User {
+                    admin: true,
+                    name: username.to_owned(),
+                    password: password_hash,
+                    ..admin.unwrap_or_else(|| User {
+                        display_name: "Admin".to_string(),
+                        ..Default::default()
+                    })
+                }),
+            )
+            .unwrap();
+
+        drop(users);
+        txn.commit().unwrap();
+        Ok(())
+    }
+    pub fn get_import_file_mtime(&self, path: &Path) -> Result<Option<u64>> {
+        let bytes = path.as_os_str().as_encoded_bytes();
+        let txn = self.inner.begin_read()?;
+        let table = txn.open_table(T_IMPORT_FILE_MTIME)?;
+        if let Some(v) = table.get(bytes)? {
+            Ok(Some(v.value()))
+        } else {
+            Ok(None)
+        }
+    }
+    pub fn set_import_file_mtime(&self, path: &Path, mtime: u64) -> Result<()> {
+        let bytes = path.as_os_str().as_encoded_bytes();
+        let txn = self.inner.begin_write()?;
+        let mut table = txn.open_table(T_IMPORT_FILE_MTIME)?;
+        table.insert(bytes, mtime)?;
+        drop(table);
+        txn.commit()?;
+        Ok(())
+    }
+}
+
+pub struct HashWriter(DefaultHasher);
+impl bincode::enc::write::Writer for HashWriter {
+    fn write(&mut self, bytes: &[u8]) -> std::result::Result<(), bincode::error::EncodeError> {
+        self.0.write(bytes);
+        Ok(())
+    }
+}
+
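+// Ser<T> wraps values for storage as redb table values. The serialization
+// backend is chosen at compile time: bincode (legacy config) by default, or
+// serde_json with the `db_json` feature enabled, presumably to make the
+// stored data easier to inspect with external tools.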
+#[derive(Debug)]
+#[cfg(not(feature = "db_json"))]
+pub struct Ser<T>(pub T);
+#[cfg(not(feature = "db_json"))]
+impl<T: Encode + Decode<()> + std::fmt::Debug> redb::Value for Ser<T> {
+    type SelfType<'a>
+        = Ser<T>
+    where
+        Self: 'a;
+    type AsBytes<'a>
+        = Vec<u8>
+    where
+        Self: 'a;
+
+    fn fixed_width() -> Option<usize> {
+        None
+    }
+
+    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
+    where
+        Self: 'a,
+    {
+        Ser(bincode::decode_from_slice(data, bincode::config::legacy())
+            .unwrap()
+            .0)
+    }
+
+    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
+    where
+        Self: 'a,
+        Self: 'b,
+    {
+        bincode::encode_to_vec(&value.0, bincode::config::legacy()).unwrap()
+    }
+
+    fn type_name() -> redb::TypeName {
+        redb::TypeName::new("bincode")
+    }
+}
+
+#[cfg(feature = "db_json")]
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug)]
+#[cfg(feature = "db_json")]
+pub struct Ser<T>(pub T);
+#[cfg(feature = "db_json")]
+impl<T: Serialize + for<'a> Deserialize<'a> + std::fmt::Debug> redb::Value for Ser<T> {
+    type SelfType<'a>
+        = Ser<T>
+    where
+        Self: 'a;
+    type AsBytes<'a>
+        = Vec<u8>
+    where
+        Self: 'a;
+
+    fn fixed_width() -> Option<usize> {
+        None
+    }
+
+    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
+    where
+        Self: 'a,
+    {
+        Ser(serde_json::from_slice(data).unwrap())
+    }
+
+    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
+    where
+        Self: 'a,
+        Self: 'b,
+    {
+        serde_json::to_vec(&value.0).unwrap()
+    }
+
+    fn type_name() -> redb::TypeName {
+        redb::TypeName::new("json")
+    }
+}
diff --git a/database/src/search.rs b/database/src/search.rs
new file mode 100644
index 0000000..a7c074f
--- /dev/null
+++ b/database/src/search.rs
@@ -0,0 +1,64 @@
+/*
+    This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+    Copyright (C) 2025 metamuffin
+*/
+use anyhow::Context;
+use std::{fs::create_dir_all, path::Path, sync::RwLock};
+use tantivy::{
+    DateOptions, Index, IndexReader, IndexWriter, ReloadPolicy,
+    directory::MmapDirectory,
+    schema::{FAST, Field, INDEXED, STORED, STRING, Schema, TEXT},
+};
+
+pub struct NodeTextSearchIndex {
+    pub schema: Schema,
+    pub reader: IndexReader,
+    pub writer: RwLock<IndexWriter>,
+    pub index: Index,
+    pub id: Field,
+    pub title: Field,
+    pub releasedate: Field,
+    pub description: Field,
+    pub parent: Field,
+    pub f_index: Field,
+}
+impl NodeTextSearchIndex {
+    pub(crate) fn new(path: &Path) -> anyhow::Result<Self> {
+        let mut schema = Schema::builder();
+        let id = schema.add_text_field("id", TEXT | STORED | FAST);
+        let title = schema.add_text_field("title", TEXT);
+        let description = schema.add_text_field("description", TEXT);
+        let parent = schema.add_text_field("parent", STRING | FAST);
+        let f_index = schema.add_u64_field("index", FAST);
+        let releasedate = schema.add_date_field(
+            "releasedate",
+            DateOptions::from(INDEXED)
+                .set_fast()
+                .set_precision(tantivy::DateTimePrecision::Seconds),
+        );
+        let schema = schema.build();
+        create_dir_all(path.join("node_index"))?;
+        let directory =
+            MmapDirectory::open(path.join("node_index")).context("opening index directory")?;
+        let index = Index::open_or_create(directory, schema.clone()).context("creating index")?;
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::OnCommitWithDelay)
+            .try_into()
+            .context("creating reader")?;
+        let writer = index.writer(30_000_000).context("creating writer")?;
+        Ok(Self {
+            index,
+            writer: writer.into(),
+            reader,
+            schema,
+            parent,
+            f_index,
+            releasedate,
+            id,
+            description,
+            title,
+        })
+    }
+}
--
cgit v1.2.3-70-g09d2