author    metamuffin <metamuffin@disroot.org>    2025-01-29 23:09:54 +0100
committer metamuffin <metamuffin@disroot.org>    2025-01-29 23:09:54 +0100
commit    59825d68efa1077382fd6acac73f75ae9dc3680a (patch)
tree      5f948eddf7b4c85f245359c35c653ab8768ba8f4
parent    8099c51e56b6d253c05cac9c235f52027ad736fa (diff)
mtime-based incremental import
-rw-r--r--  base/src/database.rs               22
-rw-r--r--  import/src/lib.rs                  30
-rw-r--r--  server/src/routes/ui/admin/mod.rs  12
3 files changed, 55 insertions(+), 9 deletions(-)
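
The approach: record each media file's last-seen modification time in the database, and on an incremental run skip any file whose stored mtime is at least as new as the one on disk. A minimal sketch of that check, assuming the jellybase Database handle with the two helpers this commit adds (the function name should_import is illustrative, not part of the commit, and errors are propagated with anyhow rather than the unwrap the commit uses):

    use std::{path::Path, time::UNIX_EPOCH};
    use anyhow::Result;
    use jellybase::database::Database;

    // Returns Ok(false) when an incremental run may skip `path` as unchanged.
    fn should_import(db: &Database, path: &Path, incremental: bool) -> Result<bool> {
        let mtime = path
            .metadata()?
            .modified()?
            .duration_since(UNIX_EPOCH)? // errors only for pre-1970 timestamps
            .as_secs();
        if incremental {
            if let Some(last) = db.get_import_file_mtime(path)? {
                if last >= mtime {
                    return Ok(false); // stored mtime is current; nothing to do
                }
            }
        }
        Ok(true)
    }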
diff --git a/base/src/database.rs b/base/src/database.rs
index 1f3efbf..a213a40 100644
--- a/base/src/database.rs
+++ b/base/src/database.rs
@@ -29,7 +29,7 @@ const T_USER_NODE: TableDefinition<(&str, [u8; 32]), Ser<NodeUserData>> =
TableDefinition::new("user_node");
const T_INVITE: TableDefinition<&str, ()> = TableDefinition::new("invite");
const T_NODE: TableDefinition<[u8; 32], Ser<Node>> = TableDefinition::new("node");
-const T_IMPORT_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import-mtime");
+const T_IMPORT_FILE_MTIME: TableDefinition<&[u8], u64> = TableDefinition::new("import_file_mtime");
#[derive(Clone)]
pub struct Database {
@@ -56,6 +56,7 @@ impl Database {
drop(txn.open_table(T_USER)?);
drop(txn.open_table(T_USER_NODE)?);
drop(txn.open_table(T_NODE)?);
+ drop(txn.open_table(T_IMPORT_FILE_MTIME)?);
txn.commit()?;
}
@@ -284,6 +285,25 @@ impl Database {
Ok(())
}
+ pub fn get_import_file_mtime(&self, path: &Path) -> Result<Option<u64>> {
+ let bytes = path.as_os_str().as_encoded_bytes();
+ let txn = self.inner.begin_read()?;
+ let table = txn.open_table(T_IMPORT_FILE_MTIME)?;
+ if let Some(v) = table.get(bytes)? {
+ Ok(Some(v.value()))
+ } else {
+ Ok(None)
+ }
+ }
+ pub fn set_import_file_mtime(&self, path: &Path, mtime: u64) -> Result<()> {
+ let bytes = path.as_os_str().as_encoded_bytes();
+ let txn = self.inner.begin_write()?;
+ let mut table = txn.open_table(T_IMPORT_FILE_MTIME)?;
+ table.insert(bytes, mtime)?;
+ drop(table);
+ txn.commit()?;
+ Ok(())
+ }
}
pub struct NodeIndex {
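
Both new helpers open their own transaction per call: a read transaction for the lookup and a write transaction, committed before returning, for the update. A hedged usage sketch; the path and mtime value here are made up for illustration:

    use std::path::Path;
    use anyhow::Result;
    use jellybase::database::Database;

    fn example(db: &Database) -> Result<()> {
        let path = Path::new("/media/movies/example.mkv"); // hypothetical file
        match db.get_import_file_mtime(path)? {
            Some(t) => println!("last imported at unix mtime {t}"),
            None => println!("not imported yet"),
        }
        db.set_import_file_mtime(path, 1_738_188_594)?; // illustrative value
        Ok(())
    }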
diff --git a/import/src/lib.rs b/import/src/lib.rs
index ab410eb..34780a8 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -11,7 +11,8 @@ use ebml_struct::{
};
use jellybase::{assetfed::AssetInner, cache::cache_file, database::Database, CONF, SECRETS};
use jellycommon::{
- Chapter, LocalTrack, MediaInfo, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind, TrackSource,
+ Chapter, LocalTrack, MediaInfo, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind,
+ TrackSource,
};
use log::info;
use regex::Regex;
@@ -21,6 +22,7 @@ use std::{
io::{BufReader, ErrorKind, Read, Write},
path::Path,
sync::LazyLock,
+ time::UNIX_EPOCH,
};
use tmdb::Tmdb;
use tokio::{
@@ -48,11 +50,11 @@ pub fn is_importing() -> bool {
IMPORT_SEM.available_permits() == 0
}
-pub async fn import_wrap(db: Database) -> Result<()> {
+pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
let _sem = IMPORT_SEM.try_acquire()?;
let jh = spawn_blocking(move || {
- let errs = match import(&db) {
+ let errs = match import(&db, incremental) {
Err(e) => vec![format!("{e:#}")],
Ok(e) => e,
};
@@ -64,7 +66,7 @@ pub async fn import_wrap(db: Database) -> Result<()> {
Ok(())
}
-fn import(db: &Database) -> Result<Vec<String>> {
+fn import(db: &Database, incremental: bool) -> Result<Vec<String>> {
let mut queue = VecDeque::from_iter(Some(CONF.media_path.clone()));
let mut errors = Vec::new();
@@ -72,6 +74,8 @@ fn import(db: &Database) -> Result<Vec<String>> {
trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)),
tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
};
+ let mut num_skipped = 0;
+ let mut num_imported = 0;
while let Some(path) = queue.pop_front() {
if path.is_dir() {
@@ -80,11 +84,29 @@ fn import(db: &Database) -> Result<Vec<String>> {
}
}
if path.is_file() {
+ let meta = path.metadata()?;
+ let mtime = meta
+ .modified()?
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+
+ if incremental {
+ if let Some(last_mtime) = db.get_import_file_mtime(&path)? {
+ if last_mtime >= mtime {
+ num_skipped += 1;
+ continue;
+ }
+ }
+ }
+ num_imported += 1;
if let Err(e) = import_file(db, &path).context(anyhow!("{path:?}")) {
errors.push(format!("{e:#}"));
}
+ db.set_import_file_mtime(&path, mtime)?;
}
}
+ info!("import finished. skipped={num_skipped} imported={num_imported}");
Ok(errors)
}
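
Note that set_import_file_mtime runs even when import_file returns an error, so a file that failed to import is still skipped by later incremental runs until it changes on disk again. Callers of import_wrap now choose the mode explicitly; a sketch of triggering an incremental pass from async code (the crate already drives the import through tokio, and refresh_library is an illustrative name):

    use jellybase::database::Database;

    async fn refresh_library(db: Database) -> anyhow::Result<()> {
        // true = incremental: files with an unchanged stored mtime are skipped
        import_wrap(db, true).await
    }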
diff --git a/server/src/routes/ui/admin/mod.rs b/server/src/routes/ui/admin/mod.rs
index 2993be0..463319a 100644
--- a/server/src/routes/ui/admin/mod.rs
+++ b/server/src/routes/ui/admin/mod.rs
@@ -68,8 +68,11 @@ pub async fn admin_dashboard<'a>(
@if is_transcoding() {
section.message { p.warn { "Currently transcoding posters." } }
}
- form[method="POST", action=uri!(r_admin_import())] {
- input[type="submit", disabled=is_importing(), value="(Re-)Import Library"];
+ form[method="POST", action=uri!(r_admin_import(true))] {
+ input[type="submit", disabled=is_importing(), value="Start incremental import"];
+ }
+ form[method="POST", action=uri!(r_admin_import(false))] {
+ input[type="submit", disabled=is_importing(), value="Start full import"];
}
form[method="POST", action=uri!(r_admin_transcode_posters())] {
input[type="submit", disabled=is_transcoding(), value="Transcode all posters with low resolution"];
@@ -131,15 +134,16 @@ pub async fn r_admin_remove_invite(
admin_dashboard(database, Some(Ok("Invite invalidated".into()))).await
}
-#[post("/admin/import")]
+#[post("/admin/import?<incremental>")]
pub async fn r_admin_import(
session: AdminSession,
database: &State<Database>,
_federation: &State<Federation>,
+ incremental: bool,
) -> MyResult<DynLayoutPage<'static>> {
drop(session);
let t = Instant::now();
- let r = import_wrap((*database).clone()).await;
+ let r = import_wrap((*database).clone(), incremental).await;
admin_dashboard(
database,
Some(