author     metamuffin <metamuffin@disroot.org>  2025-04-30 11:46:28 +0200
committer  metamuffin <metamuffin@disroot.org>  2025-04-30 11:46:28 +0200
commit     35ae80f183904466667af73c7921b4ade399569a (patch)
tree       3a7eec95debbd9ba292436ff989742b8a9b1dff4 /database/src
parent     d6a039a10ac3c81d410beb9b648d29524ca1e278 (diff)
split base into asset_token and db
Diffstat (limited to 'database/src')
-rw-r--r--  database/src/lib.rs     555
-rw-r--r--  database/src/search.rs   64
2 files changed, 619 insertions, 0 deletions
diff --git a/database/src/lib.rs b/database/src/lib.rs
new file mode 100644
index 0000000..b84ddc9
--- /dev/null
+++ b/database/src/lib.rs
@@ -0,0 +1,555 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+pub mod search;
+
+use anyhow::{Context, Result, anyhow, bail};
+use bincode::{Decode, Encode, config::standard};
+use jellycommon::{
+ Node, NodeID,
+ user::{NodeUserData, User},
+};
+use log::info;
+use redb::{Durability, ReadableTable, StorageError, TableDefinition};
+use search::NodeTextSearchIndex;
+use std::{
+ fs::create_dir_all,
+ hash::{DefaultHasher, Hasher},
+ path::{Path, PathBuf},
+ str::FromStr,
+ sync::Arc,
+ time::SystemTime,
+};
+use tantivy::{
+ DateTime, TantivyDocument,
+ collector::{Count, TopDocs},
+ doc,
+ query::QueryParser,
+ schema::Value,
+};
+
+const T_USER: TableDefinition<&str, Ser<User>> = TableDefinition::new("user");
+const T_USER_NODE: TableDefinition<(&str, [u8; 32]), Ser<NodeUserData>> =
+ TableDefinition::new("user_node");
+const T_INVITE: TableDefinition<&str, ()> = TableDefinition::new("invite");
+const T_NODE: TableDefinition<[u8; 32], Ser<Node>> = TableDefinition::new("node");
+const T_NODE_CHILDREN: TableDefinition<([u8; 32], [u8; 32]), ()> =
+ TableDefinition::new("node_children");
+const T_TAG_NODE: TableDefinition<(&str, [u8; 32]), ()> = TableDefinition::new("tag_node");
+const T_NODE_EXTERNAL_ID: TableDefinition<(&str, &str), [u8; 32]> =
+ TableDefinition::new("node_external_id");
+const T_IMPORT_FILE_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import_file_mtime");
+const T_NODE_MTIME: TableDefinition<[u8; 32], u64> = TableDefinition::new("node_mtime");
+const T_NODE_MEDIA_PATHS: TableDefinition<([u8; 32], &str), ()> =
+ TableDefinition::new("node_media_paths");
+
+#[derive(Clone)]
+pub struct Database {
+ inner: Arc<redb::Database>,
+ text_search: Arc<NodeTextSearchIndex>,
+}
+
+impl Database {
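+    /// Opens (or creates) the database directory: a redb key-value store in
+    /// `data` plus a tantivy full-text index for nodes.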
+ pub fn open(path: &Path) -> Result<Self, anyhow::Error> {
+ create_dir_all(path).context("creating database directory")?;
+ info!("opening kv store...");
+ let db = redb::Database::create(path.join("data")).context("opening kv store")?;
+ info!("opening node index...");
+ let ft_node = NodeTextSearchIndex::new(path).context("in node index")?;
+ let r = Self {
+ inner: db.into(),
+ text_search: ft_node.into(),
+ };
+
+ {
+ // this creates all tables such that read operations on them do not fail.
+ let txn = r.inner.begin_write()?;
+ txn.open_table(T_INVITE)?;
+ txn.open_table(T_USER)?;
+ txn.open_table(T_USER_NODE)?;
+ txn.open_table(T_NODE)?;
+ txn.open_table(T_NODE_MTIME)?;
+ txn.open_table(T_NODE_CHILDREN)?;
+            txn.open_table(T_NODE_EXTERNAL_ID)?;
+            txn.open_table(T_TAG_NODE)?;
+ txn.open_table(T_NODE_MEDIA_PATHS)?;
+ txn.open_table(T_IMPORT_FILE_MTIME)?;
+ txn.commit()?;
+ }
+
+ info!("ready");
+ Ok(r)
+ }
+
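+    /// Convenience lookup by human-readable slug; `NodeID::from_slug`
+    /// derives the 32-byte node ID from the slug.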
+ pub fn get_node_slug(&self, slug: &str) -> Result<Option<Arc<Node>>> {
+ self.get_node(NodeID::from_slug(slug))
+ }
+
+ pub fn get_node(&self, id: NodeID) -> Result<Option<Arc<Node>>> {
+ let txn = self.inner.begin_read()?;
+ let t_node = txn.open_table(T_NODE)?;
+ if let Some(node) = t_node.get(id.0)? {
+ Ok(Some(node.value().0.into()))
+ } else {
+ Ok(None)
+ }
+ }
+ pub fn get_node_external_id(&self, platform: &str, eid: &str) -> Result<Option<NodeID>> {
+ let txn = self.inner.begin_read()?;
+ let t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?;
+ if let Some(id) = t_node_external_id.get((platform, eid))? {
+ Ok(Some(NodeID(id.value())))
+ } else {
+ Ok(None)
+ }
+ }
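+    /// Children are stored as composite `(parent, child)` keys with unit
+    /// values, so enumerating them is a range scan over the parent's key
+    /// prefix. Tags (below) use the same pattern with `(tag, node)` keys.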
+ pub fn get_node_children(&self, id: NodeID) -> Result<Vec<NodeID>> {
+ let txn = self.inner.begin_read()?;
+ let t_node_children = txn.open_table(T_NODE_CHILDREN)?;
+ Ok(t_node_children
+            .range((id.0, NodeID::MIN.0)..=(id.0, NodeID::MAX.0))?
+ .map(|r| r.map(|r| NodeID(r.0.value().1)))
+ .collect::<Result<Vec<_>, StorageError>>()?)
+ }
+ pub fn get_tag_nodes(&self, tag: &str) -> Result<Vec<NodeID>> {
+ let txn = self.inner.begin_read()?;
+ let t_tag_node = txn.open_table(T_TAG_NODE)?;
+ Ok(t_tag_node
+            .range((tag, NodeID::MIN.0)..=(tag, NodeID::MAX.0))?
+ .map(|r| r.map(|r| NodeID(r.0.value().1)))
+ .collect::<Result<Vec<_>, StorageError>>()?)
+ }
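+    /// Returns every node whose mtime (seconds since the Unix epoch) is at
+    /// or after `since`, by scanning the whole mtime table.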
+ pub fn get_nodes_modified_since(&self, since: u64) -> Result<Vec<NodeID>> {
+ let txn = self.inner.begin_read()?;
+ let t_node_mtime = txn.open_table(T_NODE_MTIME)?;
+ Ok(t_node_mtime
+ .iter()?
+ .flat_map(|r| r.map(|r| (NodeID(r.0.value()), r.1.value())))
+ .filter(|(_, mtime)| *mtime >= since)
+ .map(|(id, _)| id)
+ .collect())
+ }
+
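+    /// Wipes all node-derived tables ahead of a full re-import. Commits with
+    /// `Durability::Eventual` since the data can be regenerated.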
+ pub fn clear_nodes(&self) -> Result<()> {
+ let mut txn = self.inner.begin_write()?;
+ let mut t_node = txn.open_table(T_NODE)?;
+ let mut t_node_mtime = txn.open_table(T_NODE_MTIME)?;
+ let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?;
+        let mut t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?;
+        let mut t_tag_node = txn.open_table(T_TAG_NODE)?;
+        let mut t_import_file_mtime = txn.open_table(T_IMPORT_FILE_MTIME)?;
+        let mut t_node_media_paths = txn.open_table(T_NODE_MEDIA_PATHS)?;
+ t_node.retain(|_, _| false)?;
+ t_node_mtime.retain(|_, _| false)?;
+ t_node_children.retain(|_, _| false)?;
+        t_node_external_id.retain(|_, _| false)?;
+        t_tag_node.retain(|_, _| false)?;
+        t_import_file_mtime.retain(|_, _| false)?;
+        t_node_media_paths.retain(|_, _| false)?;
+ drop((
+ t_node,
+ t_node_mtime,
+ t_node_children,
+            t_node_external_id,
+            t_tag_node,
+ t_import_file_mtime,
+ t_node_media_paths,
+ ));
+ txn.set_durability(Durability::Eventual);
+ txn.commit()?;
+ Ok(())
+ }
+
+ pub fn get_node_udata(&self, id: NodeID, username: &str) -> Result<Option<NodeUserData>> {
+ let txn = self.inner.begin_read()?;
+        let t_user_node = txn.open_table(T_USER_NODE)?;
+        if let Some(udata) = t_user_node.get((username, id.0))? {
+            Ok(Some(udata.value().0))
+ } else {
+ Ok(None)
+ }
+ }
+
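+    /// Read-modify-write for a node: loads the stored value (or a default
+    /// for new nodes), applies `update`, and skips the write entirely when
+    /// nothing changed. Change detection bincode-encodes the node into a
+    /// `DefaultHasher` before and after the closure runs and compares the
+    /// hashes, which avoids bumping the mtime on no-op imports. Secondary
+    /// indexes (children, external IDs, tags) are only ever added to here;
+    /// stale entries persist until `clear_nodes` runs.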
+ pub fn update_node_init(
+ &self,
+ id: NodeID,
+ update: impl FnOnce(&mut Node) -> Result<()>,
+ ) -> Result<()> {
+ let time = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ let mut txn = self.inner.begin_write()?;
+ let mut t_node = txn.open_table(T_NODE)?;
+ let mut t_node_mtime = txn.open_table(T_NODE_MTIME)?;
+ let mut t_node_children = txn.open_table(T_NODE_CHILDREN)?;
+ let mut t_node_external_id = txn.open_table(T_NODE_EXTERNAL_ID)?;
+ let mut t_tag_node = txn.open_table(T_TAG_NODE)?;
+ let mut node = t_node.get(id.0)?.map(|v| v.value().0).unwrap_or_default();
+
+ let mut dh_before = HashWriter(DefaultHasher::new());
+ bincode::encode_into_writer(&node, &mut dh_before, standard()).unwrap();
+ update(&mut node)?;
+ let mut dh_after = HashWriter(DefaultHasher::new());
+ bincode::encode_into_writer(&node, &mut dh_after, standard()).unwrap();
+
+ if dh_before.0.finish() == dh_after.0.finish() {
+ return Ok(());
+ }
+
+ for parent in &node.parents {
+ t_node_children.insert((parent.0, id.0), ())?;
+ }
+ for (pl, eid) in &node.external_ids {
+ t_node_external_id.insert((pl.as_str(), eid.as_str()), id.0)?;
+ }
+ for tag in &node.tags {
+ t_tag_node.insert((tag.as_str(), id.0), ())?;
+ }
+ t_node.insert(&id.0, Ser(node))?;
+ t_node_mtime.insert(&id.0, time)?;
+ drop((
+ t_node,
+ t_node_mtime,
+ t_node_children,
+ t_node_external_id,
+ t_tag_node,
+ ));
+ txn.set_durability(Durability::Eventual);
+ txn.commit()?;
+ Ok(())
+ }
+ pub fn get_node_media_paths(&self, id: NodeID) -> Result<Vec<PathBuf>> {
+ let txn = self.inner.begin_read()?;
+ let table = txn.open_table(T_NODE_MEDIA_PATHS)?;
+ let mut paths = Vec::new();
+        // TODO fix this: the upper bound "\x7f" excludes paths containing
+        // non-ASCII bytes; this should scan the whole key prefix for id.0.
+        for p in table.range((id.0, "\0")..(id.0, "\x7f"))? {
+ paths.push(PathBuf::from_str(p?.0.value().1)?);
+ }
+ Ok(paths)
+ }
+ pub fn insert_node_media_path(&self, id: NodeID, path: &Path) -> Result<()> {
+ let txn = self.inner.begin_write()?;
+ let mut table = txn.open_table(T_NODE_MEDIA_PATHS)?;
+ table.insert((id.0, path.to_str().unwrap()), ())?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+
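+    /// Read-modify-write for per-user node data keyed by `(username, node)`,
+    /// starting from a default `NodeUserData` if none is stored yet.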
+ pub fn update_node_udata(
+ &self,
+ node: NodeID,
+ username: &str,
+ update: impl FnOnce(&mut NodeUserData) -> Result<()>,
+ ) -> Result<()> {
+ let txn = self.inner.begin_write()?;
+ let mut user_nodes = txn.open_table(T_USER_NODE)?;
+
+ let mut udata = user_nodes
+ .get((username, node.0))?
+ .map(|x| x.value().0)
+ .unwrap_or_default();
+
+ update(&mut udata)?;
+
+ user_nodes.insert((username, node.0), Ser(udata))?;
+ drop(user_nodes);
+ txn.commit()?;
+ Ok(())
+ }
+
+ pub fn get_user(&self, username: &str) -> Result<Option<User>> {
+ let txn = self.inner.begin_read()?;
+ let t_user = txn.open_table(T_USER)?;
+ if let Some(user) = t_user.get(username)? {
+ Ok(Some(user.value().0))
+ } else {
+ Ok(None)
+ }
+ }
+ pub fn update_user(
+ &self,
+ username: &str,
+ update: impl FnOnce(&mut User) -> Result<()>,
+ ) -> Result<()> {
+ let txn = self.inner.begin_write()?;
+ let mut users = txn.open_table(T_USER)?;
+ let mut user = users
+ .get(username)?
+ .ok_or(anyhow!("user does not exist"))?
+ .value()
+ .0;
+ update(&mut user)?;
+ users.insert(&username, Ser(user))?;
+ drop(users);
+ txn.commit()?;
+
+ Ok(())
+ }
+ pub fn delete_user(&self, username: &str) -> Result<bool> {
+ let txn = self.inner.begin_write()?;
+ let mut table = txn.open_table(T_USER)?;
+ let r = table.remove(username)?.is_some();
+ drop(table);
+ txn.commit()?;
+ Ok(r)
+ }
+ pub fn list_users(&self) -> Result<Vec<User>> {
+ let txn = self.inner.begin_read()?;
+ let table = txn.open_table(T_USER)?;
+ let i = table
+ .iter()?
+ .map(|a| {
+                let (_, y) = a.unwrap(); // TODO: propagate storage errors instead of panicking
+ y.value().0
+ })
+ .collect::<Vec<_>>();
+ drop(table);
+ Ok(i)
+ }
+ pub fn list_invites(&self) -> Result<Vec<String>> {
+ let txn = self.inner.begin_read()?;
+ let table = txn.open_table(T_INVITE)?;
+ let i = table
+ .iter()?
+ .map(|a| {
+ let (x, _) = a.unwrap();
+ x.value().to_owned()
+ })
+ .collect::<Vec<_>>();
+ drop(table);
+ Ok(i)
+ }
+ pub fn create_invite(&self, inv: &str) -> Result<()> {
+ let txn = self.inner.begin_write()?;
+ let mut table = txn.open_table(T_INVITE)?;
+ table.insert(inv, ())?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+ pub fn delete_invite(&self, inv: &str) -> Result<bool> {
+ let txn = self.inner.begin_write()?;
+ let mut table = txn.open_table(T_INVITE)?;
+ let r = table.remove(inv)?.is_some();
+ drop(table);
+ txn.commit()?;
+ Ok(r)
+ }
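+    /// Consumes an invite and creates the user in a single write
+    /// transaction, so a failed registration leaves the invite intact.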
+ pub fn register_user(&self, invite: &str, username: &str, user: User) -> Result<()> {
+ let txn = self.inner.begin_write()?;
+ let mut invites = txn.open_table(T_INVITE)?;
+ let mut users = txn.open_table(T_USER)?;
+
+ if invites.remove(invite)?.is_none() {
+ bail!("invitation invalid");
+ }
+ let prev_user = users.insert(username, Ser(user))?.map(|x| x.value().0);
+ if prev_user.is_some() {
+ bail!("username taken");
+ }
+
+ drop(users);
+ drop(invites);
+ txn.commit()?;
+ Ok(())
+ }
+ pub fn list_nodes_with_udata(&self, username: &str) -> Result<Vec<(Arc<Node>, NodeUserData)>> {
+ let txn = self.inner.begin_read()?;
+ let nodes = txn.open_table(T_NODE)?;
+ let node_users = txn.open_table(T_USER_NODE)?;
+ let i = nodes
+ .iter()?
+ .map(|a| {
+ let (x, y) = a.unwrap();
+ let (x, y) = (x.value().to_owned(), Arc::new(y.value().0));
+ let z = node_users
+ .get(&(username, x))
+ .unwrap()
+ .map(|z| z.value().0)
+ .unwrap_or_default();
+ (y, z)
+ })
+ .collect::<Vec<_>>();
+ drop(nodes);
+ Ok(i)
+ }
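+    /// Full-text search over node titles and descriptions. Returns the total
+    /// match count alongside one page of node IDs selected by `limit` and
+    /// `offset`.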
+ pub fn search(&self, query: &str, limit: usize, offset: usize) -> Result<(usize, Vec<NodeID>)> {
+ let query = QueryParser::for_index(
+ &self.text_search.index,
+ vec![self.text_search.title, self.text_search.description],
+ )
+ .parse_query(query)
+ .context("parsing query")?;
+
+ let searcher = self.text_search.reader.searcher();
+ let sres = searcher.search(&query, &TopDocs::with_limit(limit).and_offset(offset))?;
+ let scount = searcher.search(&query, &Count)?;
+
+ let mut results = Vec::new();
+ for (_, daddr) in sres {
+ let doc: TantivyDocument = searcher.doc(daddr)?;
+ let id = doc
+ .get_first(self.text_search.id)
+ .unwrap()
+ .as_bytes()
+ .unwrap();
+ let id = NodeID(id.try_into().unwrap());
+ results.push(id);
+ }
+ Ok((scount, results))
+ }
+
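+    /// Rebuilds the full-text index from scratch: clears all documents, then
+    /// adds one document per node in the kv store and commits.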
+ pub fn search_create_index(&self) -> Result<()> {
+ let mut w = self.text_search.writer.write().unwrap();
+ w.delete_all_documents()?;
+
+ let txn = self.inner.begin_read()?;
+ let nodes = txn.open_table(T_NODE)?;
+ for node in nodes.iter()? {
+ let (x, y) = node?;
+ let (id, node) = (x.value().to_owned(), y.value().0);
+
+ w.add_document(doc!(
+ self.text_search.id => id.to_vec(),
+ self.text_search.title => node.title.unwrap_or_default(),
+ self.text_search.description => node.description.unwrap_or_default(),
+ self.text_search.releasedate => DateTime::from_timestamp_millis(node.release_date.unwrap_or_default()),
+ self.text_search.f_index => node.index.unwrap_or_default() as u64,
+ ))?;
+ }
+
+ w.commit()?;
+ Ok(())
+ }
+
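+    /// Creates the admin account or resets its password, keeping any other
+    /// fields of an existing user with that name.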
+ pub fn create_admin_user(&self, username: &str, password_hash: Vec<u8>) -> Result<()> {
+ let txn = self.inner.begin_write().unwrap();
+ let mut users = txn.open_table(T_USER).unwrap();
+
+ let admin = users.get(username).unwrap().map(|x| x.value().0);
+ users
+ .insert(
+ username,
+ Ser(User {
+ admin: true,
+ name: username.to_owned(),
+ password: password_hash,
+ ..admin.unwrap_or_else(|| User {
+ display_name: "Admin".to_string(),
+ ..Default::default()
+ })
+ }),
+ )
+ .unwrap();
+
+ drop(users);
+ txn.commit().unwrap();
+ Ok(())
+ }
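+    /// Tracks the last-seen mtime per import file (keyed by the path's raw
+    /// OS-encoded bytes) so unchanged files can be skipped.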
+ pub fn get_import_file_mtime(&self, path: &Path) -> Result<Option<u64>> {
+ let bytes = path.as_os_str().as_encoded_bytes();
+ let txn = self.inner.begin_read()?;
+ let table = txn.open_table(T_IMPORT_FILE_MTIME)?;
+ if let Some(v) = table.get(bytes)? {
+ Ok(Some(v.value()))
+ } else {
+ Ok(None)
+ }
+ }
+ pub fn set_import_file_mtime(&self, path: &Path, mtime: u64) -> Result<()> {
+ let bytes = path.as_os_str().as_encoded_bytes();
+ let txn = self.inner.begin_write()?;
+ let mut table = txn.open_table(T_IMPORT_FILE_MTIME)?;
+ table.insert(bytes, mtime)?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
+}
+
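+/// Adapts a `DefaultHasher` to bincode's `Writer` so a value can be hashed
+/// directly from its encoded bytes without allocating a buffer.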
+pub struct HashWriter(DefaultHasher);
+impl bincode::enc::write::Writer for HashWriter {
+ fn write(&mut self, bytes: &[u8]) -> std::result::Result<(), bincode::error::EncodeError> {
+ self.0.write(bytes);
+ Ok(())
+ }
+}
+
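+/// Transparent serialization wrapper implementing `redb::Value`: values are
+/// encoded with bincode by default, or as JSON when the `db_json` feature is
+/// enabled.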
+#[derive(Debug)]
+#[cfg(not(feature = "db_json"))]
+pub struct Ser<T>(pub T);
+#[cfg(not(feature = "db_json"))]
+impl<T: Encode + Decode + std::fmt::Debug> redb::Value for Ser<T> {
+ type SelfType<'a>
+ = Ser<T>
+ where
+ Self: 'a;
+ type AsBytes<'a>
+ = Vec<u8>
+ where
+ Self: 'a;
+
+ fn fixed_width() -> Option<usize> {
+ None
+ }
+
+ fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
+ where
+ Self: 'a,
+ {
+ Ser(bincode::decode_from_slice(data, bincode::config::legacy())
+ .unwrap()
+ .0)
+ }
+
+ fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
+ where
+ Self: 'a,
+ Self: 'b,
+ {
+ bincode::encode_to_vec(&value.0, bincode::config::legacy()).unwrap()
+ }
+
+ fn type_name() -> redb::TypeName {
+ redb::TypeName::new("bincode")
+ }
+}
+
+#[cfg(feature = "db_json")]
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug)]
+#[cfg(feature = "db_json")]
+pub struct Ser<T>(pub T);
+#[cfg(feature = "db_json")]
+impl<T: Serialize + for<'a> Deserialize<'a> + std::fmt::Debug> redb::Value for Ser<T> {
+ type SelfType<'a>
+ = Ser<T>
+ where
+ Self: 'a;
+ type AsBytes<'a>
+ = Vec<u8>
+ where
+ Self: 'a;
+
+ fn fixed_width() -> Option<usize> {
+ None
+ }
+
+ fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
+ where
+ Self: 'a,
+ {
+ Ser(serde_json::from_slice(data).unwrap())
+ }
+
+ fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
+ where
+ Self: 'a,
+ Self: 'b,
+ {
+ serde_json::to_vec(&value.0).unwrap()
+ }
+
+ fn type_name() -> redb::TypeName {
+ redb::TypeName::new("json")
+ }
+}
diff --git a/database/src/search.rs b/database/src/search.rs
new file mode 100644
index 0000000..a7c074f
--- /dev/null
+++ b/database/src/search.rs
@@ -0,0 +1,64 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use anyhow::Context;
+use std::{fs::create_dir_all, path::Path, sync::RwLock};
+use tantivy::{
+ DateOptions, Index, IndexReader, IndexWriter, ReloadPolicy,
+ directory::MmapDirectory,
+ schema::{FAST, Field, INDEXED, STORED, STRING, Schema, TEXT},
+};
+
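+/// Tantivy-backed full-text index over nodes, bundling the schema, a shared
+/// reader, a lock-guarded writer, and handles to the indexed fields.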
+pub struct NodeTextSearchIndex {
+ pub schema: Schema,
+ pub reader: IndexReader,
+ pub writer: RwLock<IndexWriter>,
+ pub index: Index,
+ pub id: Field,
+ pub title: Field,
+ pub releasedate: Field,
+ pub description: Field,
+ pub parent: Field,
+ pub f_index: Field,
+}
+impl NodeTextSearchIndex {
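+    /// Builds the schema and opens (or creates) the index under
+    /// `<path>/node_index`, with a reader that reloads after each commit and
+    /// a writer using a 30 MB heap.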
+ pub(crate) fn new(path: &Path) -> anyhow::Result<Self> {
+ let mut schema = Schema::builder();
+ let id = schema.add_text_field("id", TEXT | STORED | FAST);
+ let title = schema.add_text_field("title", TEXT);
+ let description = schema.add_text_field("description", TEXT);
+ let parent = schema.add_text_field("parent", STRING | FAST);
+ let f_index = schema.add_u64_field("index", FAST);
+ let releasedate = schema.add_date_field(
+ "releasedate",
+ DateOptions::from(INDEXED)
+ .set_fast()
+ .set_precision(tantivy::DateTimePrecision::Seconds),
+ );
+ let schema = schema.build();
+ create_dir_all(path.join("node_index"))?;
+ let directory =
+ MmapDirectory::open(path.join("node_index")).context("opening index directory")?;
+ let index = Index::open_or_create(directory, schema.clone()).context("creating index")?;
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::OnCommitWithDelay)
+ .try_into()
+ .context("creating reader")?;
+ let writer = index.writer(30_000_000).context("creating writer")?;
+ Ok(Self {
+ index,
+ writer: writer.into(),
+ reader,
+ schema,
+ parent,
+ f_index,
+ releasedate,
+ id,
+ description,
+ title,
+ })
+ }
+}