aboutsummaryrefslogtreecommitdiff
path: root/server/src/ui/admin/import.rs
blob: b263705ebcb82a146920f017304f04ec7d8491bc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*
    This file is part of jellything (https://codeberg.org/metamuffin/jellything)
    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
    Copyright (C) 2026 metamuffin <metamuffin.org>
*/

use crate::{request_info::RequestInfo, ui::error::MyResult, ui_responder::UiResponse};
use jellycommon::{
    ADMIN_IMPORT_BUSY, ADMIN_IMPORT_ERROR, VIEW_ADMIN_IMPORT,
    jellyobject::{Object, ObjectBuffer},
    routes::u_admin_import,
};
use jellyimport::{
    ImportConfig, import_wrap, is_importing,
    reporting::{IMPORT_ERRORS, IMPORT_PROGRESS},
};
use jellyui::tr;
use rocket::{
    get, post,
    response::{Flash, Redirect},
};
use rocket_ws::{Message, Stream, WebSocket};
use std::time::Duration;
use tokio::{spawn, time::sleep};

/// Renders the admin import page (`GET /admin/import`, rank 2).
///
/// The view object placed under `VIEW_ADMIN_IMPORT` contains:
/// - `ADMIN_IMPORT_BUSY` (unit value), only when `is_importing()` reports true;
/// - `ADMIN_IMPORT_ERROR`, holding every entry currently stored in `IMPORT_ERRORS`.
///
/// # Errors
/// Returns early with whatever error `ri.require_admin()` produces.
#[get("/admin/import", rank = 2)]
pub async fn r_admin_import(ri: RequestInfo<'_>) -> MyResult<UiResponse> {
    ri.require_admin()?;

    // Work on a private copy of the recorded errors; the insert below takes
    // borrowed strings, so the owned copy has to outlive `messages`.
    let recorded = IMPORT_ERRORS.read().await.clone();
    let messages: Vec<_> = recorded.iter().map(|msg| msg.as_str()).collect();

    // Start from an empty buffer and add the busy marker only while an import runs.
    let view = ObjectBuffer::empty();
    let view = if is_importing() {
        view.as_object().insert(ADMIN_IMPORT_BUSY, ())
    } else {
        view
    };
    let view = view
        .as_object()
        .insert_multi(ADMIN_IMPORT_ERROR, &messages);

    Ok(ri.respond_ui(Object::EMPTY.insert(VIEW_ADMIN_IMPORT, view.as_object())))
}

/// Starts an import in the background (`POST /admin/import?<incremental>`).
///
/// The import is handed to a spawned task and is not awaited here: the
/// handler answers right away with a redirect to the admin import page
/// carrying a success flash (translation key `admin.import_success`).
/// The value returned by `import_wrap` is discarded inside the task.
///
/// # Errors
/// Returns early with whatever error `ri.require_admin()` produces.
#[post("/admin/import?<incremental>")]
pub async fn r_admin_import_post(
    ri: RequestInfo<'_>,
    incremental: bool,
) -> MyResult<Flash<Redirect>> {
    ri.require_admin()?;

    // Gather everything the import needs as owned values so the task is
    // independent of this request.
    let job = ImportConfig {
        config: ri.state.config.import.clone(),
        cache: ri.state.cache.clone(),
        db: ri.state.database.clone(),
    };
    spawn(async move {
        // Outcome intentionally ignored here.
        let _ = import_wrap(job, incremental).await;
    });

    let target = Redirect::to(u_admin_import());
    let notice = tr(ri.lang, "admin.import_success");
    Ok(Flash::success(target, notice))
}

/// Streams import progress over a WebSocket (`GET /admin/import`, rank 1).
///
/// While `IMPORT_PROGRESS` holds a value, a clone of it is serialized to JSON
/// and sent as a text message, followed by a 0.05 s pause. Once it is `None`
/// the loop ends and a final `"done"` text message is sent.
///
/// # Errors
/// Returns early with whatever error `ri.require_admin()` produces.
///
/// # Panics
/// The stream panics if the progress value fails to serialize to JSON.
#[get("/admin/import", rank = 1)]
pub fn r_admin_import_stream(ri: RequestInfo<'_>, ws: WebSocket) -> MyResult<Stream!['static]> {
    ri.require_admin()?;
    Ok({
        Stream! { ws =>
            // Pause between two consecutive progress messages.
            let tick = Duration::from_secs_f32(0.05);
            loop {
                // Snapshot in its own statement so whatever `read()` handed out
                // is dropped before we yield or sleep.
                let snapshot = IMPORT_PROGRESS.read().await.clone();
                let Some(progress) = snapshot else { break };
                let payload = serde_json::to_string(&progress).unwrap();
                yield Message::Text(payload);
                sleep(tick).await;
            }
            yield Message::Text(String::from("done"));
            // Mention the channel binding so it counts as used.
            let _ = ws;
        }
    })
}