use anyhow::{bail, Context}; use log::error; use redb::{Database, ReadableTable, TableDefinition}; use rocket::tokio::{ sync::mpsc::{self, Receiver, Sender}, task, time::{timeout_at, Instant}, }; use serde::Deserialize; use std::time::Duration; use std::{collections::HashMap, net::IpAddr, path::PathBuf}; use std::{process::exit, sync::Arc}; #[derive(Deserialize)] pub struct AdInfo { pub image: PathBuf, pub target: String, } #[derive(Deserialize)] pub struct Config { bloom_filter_size: usize, impression_weight_falloff: f64, pub image_base: PathBuf, database_path: PathBuf, pub port: u16, pub ads: HashMap, } pub struct Logic { pub config: Config, database: Database, pub ad_keys: Vec, event_dispatch: Sender, } struct ImpressionEvent { site: String, adid: String, address_hash: u64, } static T_TOTAL: TableDefinition<'static, (), u64> = TableDefinition::new("t"); static T_IMPRESSIONS_RAW: TableDefinition<'static, &str, u64> = TableDefinition::new("ir"); static T_IMPRESSIONS_WEIGHTED: TableDefinition<'static, &str, f64> = TableDefinition::new("iw"); static T_IMPRESSIONS_ADS: TableDefinition<'static, &str, u64> = TableDefinition::new("ia"); impl Logic { pub fn new(config: Config) -> Arc { let (tx, rx) = mpsc::channel(4096); let state = Arc::new(Self { database: { let db = Database::create(&config.database_path).expect("database open failed"); { let txn = db.begin_write().unwrap(); txn.open_table(T_IMPRESSIONS_RAW).unwrap(); txn.open_table(T_IMPRESSIONS_WEIGHTED).unwrap(); txn.open_table(T_IMPRESSIONS_ADS).unwrap(); } db }, ad_keys: config.ads.keys().map(String::from).collect(), config, event_dispatch: tx, }); { let state = state.clone(); task::spawn(async move { if let Err(e) = state.commit_db(rx).await { error!("{e:?}"); exit(1) } }); } state } pub async fn get_leaderboard(&self) -> anyhow::Result> { let txn = self.database.begin_read()?; let mut d = { let raw = txn.open_table(T_IMPRESSIONS_RAW)?; let weighted = txn.open_table(T_IMPRESSIONS_WEIGHTED)?; weighted.iter()?.try_fold(Vec::new(), |mut s, k| { let (k, v) = k?; s.push(( k.value().to_owned(), raw.get(k.value())?.map(|e| e.value()).unwrap_or_default(), v.value(), )); Ok::<_, anyhow::Error>(s) })? }; #[derive(PartialEq, PartialOrd)] struct OrdAnyway(f64); impl Eq for OrdAnyway {} impl Ord for OrdAnyway { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.partial_cmp(other).unwrap() } } d.sort_by_key(|(_, _, w)| OrdAnyway(*w)); while d.len() > 16 { d.pop(); } Ok(d) } async fn commit_db(&self, mut rx: Receiver) -> anyhow::Result<()> { let mut deadline = None; let mut impressions_by_addr = vec![0u16; self.config.bloom_filter_size]; let mut imp_raw = HashMap::::new(); let mut imp_weighted = HashMap::::new(); let mut imp_ads = HashMap::::new(); let mut total = 0; loop { while Instant::now() < deadline.unwrap_or(Instant::now() + Duration::from_days(1)) { match timeout_at( deadline.unwrap_or(Instant::now() + Duration::from_days(1)), rx.recv(), ) .await { Ok(Some(ImpressionEvent { site, adid, address_hash, })) => { let num_impressions = { let ind = (address_hash % impressions_by_addr.len() as u64) as usize; impressions_by_addr[ind] = impressions_by_addr[ind].saturating_add(1); impressions_by_addr[ind] } as f64; let weight = self.config.impression_weight_falloff.powf(num_impressions); *imp_ads.entry(adid).or_default() += 1; *imp_raw.entry(site.clone()).or_default() += 1; *imp_weighted.entry(site).or_default() += weight; total += 1; if deadline.is_none() { deadline = Some(Instant::now() + Duration::from_secs(10)); } } Ok(None) => bail!("receiver end?!"), Err(_) => {} } } let txn = self.database.begin_write().context("database failure")?; { let mut raw = txn.open_table(T_TOTAL)?; let v = raw.get(())?.map(|g| g.value()).unwrap_or_default(); raw.insert((), v + 1)?; } { let mut raw = txn.open_table(T_TOTAL)?; let v = raw.get(())?.map(|g| g.value()).unwrap_or_default(); raw.insert((), v + total)?; total = 0; } for (site, amount) in imp_raw.drain() { let mut raw = txn.open_table(T_IMPRESSIONS_RAW)?; let v = raw .get(site.as_str())? .map(|g| g.value()) .unwrap_or_default(); raw.insert(site.as_str(), v + amount)?; } for (site, amount) in imp_weighted.drain() { let mut raw = txn.open_table(T_IMPRESSIONS_WEIGHTED)?; let v = raw .get(site.as_str())? .map(|g| g.value()) .unwrap_or_default(); raw.insert(site.as_str(), v + amount)?; } for (adid, amount) in imp_ads.drain() { let mut raw = txn.open_table(T_IMPRESSIONS_ADS)?; let v = raw .get(adid.as_str())? .map(|g| g.value()) .unwrap_or_default(); raw.insert(adid.as_str(), v + amount)?; } txn.commit().context("database failure")?; deadline = None; } } pub async fn register_impression( &self, site: &str, adid: &str, address: IpAddr, ) -> anyhow::Result<()> { let address_hash = xorshift(xorshift(xorshift( match address { IpAddr::V4(a) => a.to_ipv6_mapped(), IpAddr::V6(a) => a, } .to_bits() as u64, ))); self.event_dispatch .send(ImpressionEvent { address_hash, adid: adid.to_owned(), site: site.to_owned(), }) .await?; Ok(()) } } fn xorshift(mut x: u64) -> u64 { x ^= x << 13; x ^= x >> 7; x ^= x << 17; x }