diff options
Diffstat (limited to 'snapshot')
-rw-r--r-- | snapshot/Cargo.toml | 27 | ||||
-rw-r--r-- | snapshot/src/format.rs | 85 | ||||
-rw-r--r-- | snapshot/src/lib.rs | 36 | ||||
-rw-r--r-- | snapshot/src/manager.rs | 140 | ||||
-rw-r--r-- | snapshot/src/receiver.rs | 245 | ||||
-rw-r--r-- | snapshot/src/snap.rs | 511 | ||||
-rw-r--r-- | snapshot/src/storage.rs | 176 |
7 files changed, 0 insertions, 1220 deletions
diff --git a/snapshot/Cargo.toml b/snapshot/Cargo.toml deleted file mode 100644 index 165ba3c..0000000 --- a/snapshot/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "snapshot" -version = "0.0.1" -authors = [ - "metamuffin <metamuffin@disroot.org>", - "heinrich5991 <heinrich5991@gmail.com>", -] -license = "AGPL-3.0-only" - -[dependencies] -buffer = "0.1.9" -common = { git = "https://github.com/heinrich5991/libtw2" } -gamenet_teeworlds_0_5 = { git = "https://github.com/heinrich5991/libtw2", optional = true } -gamenet_teeworlds_0_6 = { git = "https://github.com/heinrich5991/libtw2", optional = true } -gamenet_teeworlds_0_7 = { git = "https://github.com/heinrich5991/libtw2", optional = true } -gamenet_ddnet = { git = "https://github.com/heinrich5991/libtw2", optional = true } -packer = { git = "https://github.com/heinrich5991/libtw2" } -vec_map = "0.8.0" -warn = ">=0.1.1,<0.3.0" - - -[features] -default = ["gamenet_ddnet"] -gamenet_tw_0_5 = ["gamenet_teeworlds_0_5"] -gamenet_tw_0_6 = ["gamenet_teeworlds_0_6"] -gamenet_tw_0_7 = ["gamenet_teeworlds_0_7"] -gamenet_ddnet_0_6 = ["gamenet_ddnet"] diff --git a/snapshot/src/format.rs b/snapshot/src/format.rs deleted file mode 100644 index 1a22938..0000000 --- a/snapshot/src/format.rs +++ /dev/null @@ -1,85 +0,0 @@ -use buffer::CapacityError; -use packer::Packer; -use packer::Unpacker; -use packer; -use snap::Error; -use warn::Warn; -use warn::wrap; - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Warning { - Packer(packer::Warning), - NonZeroPadding, - DuplicateDelete, - DuplicateUpdate, - UnknownDelete, - DeleteUpdate, - NumUpdatedItems, -} - -impl From<packer::Warning> for Warning { - fn from(w: packer::Warning) -> Warning { - Warning::Packer(w) - } -} - -pub fn key_to_type_id(key: i32) -> u16 { - ((key as u32 >> 16) & 0xffff) as u16 -} - -pub fn key_to_id(key: i32) -> u16 { - ((key as u32) & 0xffff) as u16 -} - -pub fn key(type_id: u16, id: u16) -> i32 { - (((type_id as u32) << 16) | (id as u32)) as i32 -} - -#[derive(Clone, Copy, Debug)] -pub struct Item<'a> { - pub type_id: u16, - pub id: u16, - pub data: &'a [i32], -} - -impl<'a> Item<'a> { - pub fn from_key(key: i32, data: &[i32]) -> Item { - Item { - type_id: key_to_type_id(key), - id: key_to_id(key), - data: data, - } - } - pub fn key(&self) -> i32 { - key(self.type_id, self.id) - } -} - -#[derive(Clone, Copy, Debug)] -pub struct DeltaHeader { - pub num_deleted_items: i32, - pub num_updated_items: i32, -} - -impl DeltaHeader { - pub fn decode<W: Warn<Warning>>(warn: &mut W, p: &mut Unpacker) - -> Result<DeltaHeader, Error> - { - let result = DeltaHeader { - num_deleted_items: packer::positive(p.read_int(wrap(warn))?)?, - num_updated_items: packer::positive(p.read_int(wrap(warn))?)?, - }; - if p.read_int(wrap(warn))? != 0 { - warn.warn(Warning::NonZeroPadding); - } - Ok(result) - } - pub fn encode<'d, 's>(&self, mut p: Packer<'d, 's>) - -> Result<&'d [u8], CapacityError> - { - p.write_int(self.num_deleted_items)?; - p.write_int(self.num_updated_items)?; - p.write_int(0)?; - Ok(p.written()) - } -} diff --git a/snapshot/src/lib.rs b/snapshot/src/lib.rs deleted file mode 100644 index 992d898..0000000 --- a/snapshot/src/lib.rs +++ /dev/null @@ -1,36 +0,0 @@ - -#[cfg(feature = "gamenet_ddnet")] -extern crate gamenet_ddnet as gamenet; -#[cfg(feature = "gamenet_0_5")] -extern crate gamenet_teeworlds_0_5 as gamenet; -#[cfg(feature = "gamenet_0_6")] -extern crate gamenet_teeworlds_0_6 as gamenet; -#[cfg(feature = "gamenet_0_7")] -extern crate gamenet_teeworlds_0_7 as gamenet; - -extern crate buffer; -extern crate common; -extern crate packer; -extern crate vec_map; -extern crate warn; - -pub mod format; -pub mod manager; -pub mod receiver; -pub mod snap; -pub mod storage; - -pub use manager::Manager; -pub use receiver::DeltaReceiver; -pub use receiver::ReceivedDelta; -pub use snap::Delta; -pub use snap::DeltaReader; -pub use snap::Snap; -pub use storage::Storage; - -use common::num::Cast; -use std::ops; - -fn to_usize(r: ops::Range<u32>) -> ops::Range<usize> { - r.start.usize()..r.end.usize() -} diff --git a/snapshot/src/manager.rs b/snapshot/src/manager.rs deleted file mode 100644 index 65d0c72..0000000 --- a/snapshot/src/manager.rs +++ /dev/null @@ -1,140 +0,0 @@ -use Delta; -use DeltaReader; -use DeltaReceiver; -use ReceivedDelta; -use Snap; -use Storage; -use format; -use gamenet::msg::system; -use packer::Unpacker; -use receiver; -use snap; -use storage; -use warn::Warn; -use warn::wrap; - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Error { - Receiver(receiver::Error), - Snap(snap::Error), - Storage(storage::Error), -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Warning { - Receiver(receiver::Warning), - Snap(format::Warning), - Storage(storage::Warning), -} - -impl From<receiver::Error> for Error { - fn from(err: receiver::Error) -> Error { - Error::Receiver(err) - } -} - -impl From<snap::Error> for Error { - fn from(err: snap::Error) -> Error { - Error::Snap(err) - } -} - -impl From<storage::Error> for Error { - fn from(err: storage::Error) -> Error { - Error::Storage(err) - } -} - -impl From<receiver::Warning> for Warning { - fn from(w: receiver::Warning) -> Warning { - Warning::Receiver(w) - } -} - -impl From<format::Warning> for Warning { - fn from(w: format::Warning) -> Warning { - Warning::Snap(w) - } -} - -impl From<storage::Warning> for Warning { - fn from(w: storage::Warning) -> Warning { - Warning::Storage(w) - } -} - -#[derive(Clone, Default)] -struct ManagerInner { - temp_delta: Delta, - reader: DeltaReader, - storage: Storage, -} - -#[derive(Clone, Default)] -pub struct Manager { - inner: ManagerInner, - receiver: DeltaReceiver, -} - -impl Manager { - pub fn new() -> Manager { - Default::default() - } - pub fn reset(&mut self) { - self.inner.storage.reset(); - self.receiver.reset(); - } - pub fn ack_tick(&self) -> Option<i32> { - self.inner.storage.ack_tick() - } - pub fn snap_empty<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::SnapEmpty) - -> Result<Option<&Snap>, Error> - where W: Warn<Warning>, - O: FnMut(u16) -> Option<u32>, - { - let res = self.receiver.snap_empty(wrap(warn), snap); - self.inner.handle_msg(warn, object_size, res) - } - pub fn snap_single<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::SnapSingle) - -> Result<Option<&Snap>, Error> - where W: Warn<Warning>, - O: FnMut(u16) -> Option<u32>, - { - let res = self.receiver.snap_single(wrap(warn), snap); - self.inner.handle_msg(warn, object_size, res) - } - pub fn snap<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::Snap) - -> Result<Option<&Snap>, Error> - where W: Warn<Warning>, - O: FnMut(u16) -> Option<u32>, - { - let res = self.receiver.snap(wrap(warn), snap); - self.inner.handle_msg(warn, object_size, res) - } -} - -impl ManagerInner { - fn handle_msg<W, O>(&mut self, warn: &mut W, object_size: O, res: Result<Option<ReceivedDelta>, receiver::Error>) - -> Result<Option<&Snap>, Error> - where W: Warn<Warning>, - O: FnMut(u16) -> Option<u32>, - { - Ok(match res? { - Some(delta) => Some(self.add_delta(warn, object_size, delta)?), - None => None, - }) - } - fn add_delta<W, O>(&mut self, warn: &mut W, object_size: O, delta: ReceivedDelta) - -> Result<&Snap, Error> - where W: Warn<Warning>, - O: FnMut(u16) -> Option<u32>, - { - let crc = delta.data_and_crc.map(|d| d.1); - if let Some((data, _)) = delta.data_and_crc { - self.reader.read(wrap(warn), &mut self.temp_delta, object_size, &mut Unpacker::new(data))?; - } else { - self.temp_delta.clear(); - } - Ok(self.storage.add_delta(wrap(warn), crc, delta.delta_tick, delta.tick, &self.temp_delta)?) - } -} diff --git a/snapshot/src/receiver.rs b/snapshot/src/receiver.rs deleted file mode 100644 index 49833d1..0000000 --- a/snapshot/src/receiver.rs +++ /dev/null @@ -1,245 +0,0 @@ -use common::num::Cast; -use gamenet::msg::system; -use std::ops; -use to_usize; -use vec_map::VecMap; -use warn::Warn; - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Warning { - DuplicateSnap, - DifferingAttributes, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Error { - OldDelta, - InvalidNumParts, - InvalidPart, - DuplicatePart, -} - -// TODO: How to handle `tick` overflowing? -#[derive(Clone, Debug)] -struct CurrentDelta { - tick: i32, - delta_tick: i32, - num_parts: i32, - crc: i32, -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub struct ReceivedDelta<'a> { - pub delta_tick: i32, - pub tick: i32, - pub data_and_crc: Option<(&'a [u8], i32)>, -} - -#[derive(Clone, Default)] -pub struct DeltaReceiver { - previous_tick: Option<i32>, - current: Option<CurrentDelta>, - // `parts` points into `receive_buf`. - parts: VecMap<ops::Range<u32>>, - receive_buf: Vec<u8>, - result: Vec<u8>, -} - -impl DeltaReceiver { - pub fn new() -> DeltaReceiver { - Default::default() - } - pub fn reset(&mut self) { - self.previous_tick = None; - self.current = None; - } - fn can_receive(&self, tick: i32) -> bool { - self.current.as_ref().map(|c| c.tick <= tick) - .or(self.previous_tick.map(|t| t < tick)) - .unwrap_or(true) - } - fn init_delta(&mut self) { - self.parts.clear(); - self.receive_buf.clear(); - self.result.clear(); - } - fn finish_delta(&mut self, tick: i32) { - self.current = None; - self.previous_tick = Some(tick); - } - pub fn snap_empty<W>(&mut self, warn: &mut W, snap: system::SnapEmpty) - -> Result<Option<ReceivedDelta>, Error> - where W: Warn<Warning>, - { - if !self.can_receive(snap.tick) { - return Err(Error::OldDelta); - } - if self.current.as_ref().map(|c| c.tick == snap.tick).unwrap_or(false) { - warn.warn(Warning::DuplicateSnap); - } - self.init_delta(); - self.finish_delta(snap.tick); - Ok(Some(ReceivedDelta { - delta_tick: snap.tick.wrapping_sub(snap.delta_tick), - tick: snap.tick, - data_and_crc: None, - })) - } - pub fn snap_single<W>(&mut self, warn: &mut W, snap: system::SnapSingle) - -> Result<Option<ReceivedDelta>, Error> - where W: Warn<Warning>, - { - if !self.can_receive(snap.tick) { - return Err(Error::OldDelta); - } - if self.current.as_ref().map(|c| c.tick == snap.tick).unwrap_or(false) { - warn.warn(Warning::DuplicateSnap); - } - self.init_delta(); - self.finish_delta(snap.tick); - self.result.extend(snap.data); - Ok(Some(ReceivedDelta { - delta_tick: snap.tick.wrapping_sub(snap.delta_tick), - tick: snap.tick, - data_and_crc: Some((&self.result, snap.crc)), - })) - } - pub fn snap<W>(&mut self, warn: &mut W, snap: system::Snap) - -> Result<Option<ReceivedDelta>, Error> - where W: Warn<Warning>, - { - if !self.can_receive(snap.tick) { - return Err(Error::OldDelta); - } - if !(0 <= snap.num_parts && snap.num_parts <= 32) { - return Err(Error::InvalidNumParts); - } - if !(0 <= snap.part && snap.part < snap.num_parts) { - return Err(Error::InvalidPart); - } - if self.current.as_ref().map(|c| c.tick != snap.tick).unwrap_or(false) { - self.current = None; - } - if let None = self.current { - self.init_delta(); - self.current = Some(CurrentDelta { - tick: snap.tick, - delta_tick: snap.tick.wrapping_sub(snap.delta_tick), - num_parts: snap.num_parts, - crc: snap.crc, - }); - - // Checked above. - let num_parts = snap.num_parts.assert_usize(); - self.parts.reserve_len(num_parts); - } - let delta_tick; - let tick; - let crc; - let num_parts; - { - let current: &mut CurrentDelta = self.current.as_mut().unwrap(); - if snap.delta_tick != current.delta_tick - || snap.num_parts != current.num_parts - || snap.crc != current.crc - { - warn.warn(Warning::DifferingAttributes); - } - delta_tick = current.delta_tick; - tick = current.tick; - crc = current.crc; - num_parts = current.num_parts; - } - let part = snap.part.assert_usize(); - if self.parts.contains_key(part) { - return Err(Error::DuplicatePart); - } - let start = self.receive_buf.len().assert_u32(); - let end = (self.receive_buf.len() + snap.data.len()).assert_u32(); - self.receive_buf.extend(snap.data); - assert!(self.parts.insert(part, start..end).is_none()); - - if self.parts.len().assert_i32() != num_parts { - return Ok(None); - } - - self.finish_delta(tick); - self.result.reserve(self.receive_buf.len()); - for range in self.parts.values() { - self.result.extend(&self.receive_buf[to_usize(range.clone())]); - } - - Ok(Some(ReceivedDelta { - delta_tick: delta_tick, - tick: tick, - data_and_crc: Some((&self.result, crc)), - })) - } -} - -#[cfg(test)] -mod test { - use common::num::Cast; - use gamenet::msg::system::Snap; - use gamenet::msg::system::SnapEmpty; - use gamenet::msg::system::SnapSingle; - use super::DeltaReceiver; - use super::Error; - use super::ReceivedDelta; - use warn::Panic; - - #[test] - fn old() { - let mut receiver = DeltaReceiver::new(); - { - let result = receiver.snap_empty(&mut Panic, SnapEmpty { - tick: 1, - delta_tick: 2, - }).unwrap(); - - assert_eq!(result, Some(ReceivedDelta { - delta_tick: -1, - tick: 1, - data_and_crc: None, - })); - } - - assert_eq!(receiver.snap_single(&mut Panic, SnapSingle { - tick: 0, - delta_tick: 0, - data: b"123", - crc: 0, - }).unwrap_err(), Error::OldDelta); - } - - #[test] - fn reorder() { - let mut receiver = DeltaReceiver::new(); - let chunks: &[(i32, &[u8])] = &[ - (3, b"3"), - (2, b"2"), - (4, b"4_"), - (1, b"1__"), - (0, b"0"), - ]; - for &(i, c) in chunks { - let result = receiver.snap(&mut Panic, Snap { - tick: 2, - delta_tick: 1, - num_parts: chunks.len().assert_i32(), - part: i, - crc: 3, - data: c, - }).unwrap(); - if i != 0 { - assert_eq!(result, None); - } else { - assert_eq!(result, Some(ReceivedDelta { - delta_tick: 1, - tick: 2, - data_and_crc: Some((b"01__234_", 3)), - })); - } - } - } -} diff --git a/snapshot/src/snap.rs b/snapshot/src/snap.rs deleted file mode 100644 index 7b9768f..0000000 --- a/snapshot/src/snap.rs +++ /dev/null @@ -1,511 +0,0 @@ -use buffer::CapacityError; -use common::num::Cast; -use format::DeltaHeader; -use format::Item; -use format::Warning; -use format::key; -use format::key_to_id; -use format::key_to_type_id; -use gamenet::enums::MAX_SNAPSHOT_PACKSIZE; -use gamenet::msg::system; -use packer::Packer; -use packer::Unpacker; -use packer::with_packer; -use packer; -use std::cmp; -use std::collections::HashMap; -use std::collections::HashSet; -use std::collections::hash_map; -use std::fmt; -use std::iter; -use std::mem; -use std::ops; -use to_usize; -use warn::Warn; -use warn::wrap; - -// TODO: Actually obey this the same way as Teeworlds does. -pub const MAX_SNAPSHOT_SIZE: usize = 64 * 1024; // 64 KB - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Error { - UnexpectedEnd, - IntOutOfRange, - DeletedItemsUnpacking, - ItemDiffsUnpacking, - TypeIdRange, - IdRange, - NegativeSize, - TooLongDiff, - TooLongSnap, - DeltaDifferingSizes, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum BuilderError { - DuplicateKey, - TooLongSnap, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -struct TooLongSnap; - -impl From<TooLongSnap> for Error { - fn from(_: TooLongSnap) -> Error { - Error::TooLongSnap - } -} - -impl From<TooLongSnap> for BuilderError { - fn from(_: TooLongSnap) -> BuilderError { - BuilderError::TooLongSnap - } -} - -impl From<packer::IntOutOfRange> for Error { - fn from(_: packer::IntOutOfRange) -> Error { - Error::IntOutOfRange - } -} - -impl From<packer::UnexpectedEnd> for Error { - fn from(_: packer::UnexpectedEnd) -> Error { - Error::UnexpectedEnd - } -} - -fn apply_delta(in_: Option<&[i32]>, delta: &[i32], out: &mut [i32]) - -> Result<(), Error> -{ - assert!(delta.len() == out.len()); - match in_ { - Some(in_) => { - if in_.len() != out.len() { - return Err(Error::DeltaDifferingSizes); - } - for i in 0..out.len() { - out[i] = in_[i].wrapping_add(delta[i]); - } - } - None => out.copy_from_slice(delta), - } - Ok(()) -} - -fn create_delta(from: Option<&[i32]>, to: &[i32], out: &mut [i32]) { - assert!(to.len() == out.len()); - match from { - Some(from) => { - assert!(from.len() == to.len()); - for i in 0..out.len() { - out[i] = to[i].wrapping_sub(from[i]); - } - }, - None => out.copy_from_slice(to), - } -} - -// TODO: Select a faster hasher? -#[derive(Clone, Default)] -pub struct Snap { - offsets: HashMap<i32, ops::Range<u32>>, - buf: Vec<i32>, -} - -impl Snap { - pub fn empty() -> Snap { - Default::default() - } - fn clear(&mut self) { - self.offsets.clear(); - self.buf.clear(); - } - fn item_from_offset(&self, offset: ops::Range<u32>) -> &[i32] { - &self.buf[to_usize(offset)] - } - pub fn item(&self, type_id: u16, id: u16) -> Option<&[i32]> { - self.offsets.get(&key(type_id, id)).map(|o| &self.buf[to_usize(o.clone())]) - } - pub fn items(&self) -> Items { - Items { - snap: self, - iter: self.offsets.iter(), - } - } - fn prepare_item_vacant<'a>(entry: hash_map::VacantEntry<'a, i32, ops::Range<u32>>, buf: &mut Vec<i32>, size: usize) - -> Result<&'a mut ops::Range<u32>, TooLongSnap> - { - let offset = buf.len(); - if offset + size > MAX_SNAPSHOT_SIZE { - return Err(TooLongSnap); - } - let start = offset.assert_u32(); - let end = (offset + size).assert_u32(); - buf.extend(iter::repeat(0).take(size)); - Ok(entry.insert(start..end)) - } - fn prepare_item(&mut self, type_id: u16, id: u16, size: usize) - -> Result<&mut [i32], Error> - { - let offset = match self.offsets.entry(key(type_id, id)) { - hash_map::Entry::Occupied(o) => o.into_mut(), - hash_map::Entry::Vacant(v) => Snap::prepare_item_vacant(v, &mut self.buf, size)?, - }.clone(); - Ok(&mut self.buf[to_usize(offset)]) - } - pub fn read_with_delta<W>(&mut self, warn: &mut W, from: &Snap, delta: &Delta) - -> Result<(), Error> - where W: Warn<Warning>, - { - self.clear(); - - let mut num_deletions = 0; - for item in from.items() { - if !delta.deleted_items.contains(&item.key()) { - let out = self.prepare_item(item.type_id, item.id, item.data.len())?; - out.copy_from_slice(item.data); - } else { - num_deletions += 1; - } - } - if num_deletions != delta.deleted_items.len() { - warn.warn(Warning::UnknownDelete); - } - - for (&key, offset) in &delta.updated_items { - let type_id = key_to_type_id(key); - let id = key_to_id(key); - let diff = &delta.buf[to_usize(offset.clone())]; - let out = self.prepare_item(type_id, id, diff.len())?; - let in_ = from.item(type_id, id); - - apply_delta(in_, diff, out)?; - } - Ok(()) - } - pub fn write<'d, 's>(&self, buf: &mut Vec<i32>, mut p: Packer<'d, 's>) - -> Result<&'d [u8], CapacityError> - { - let keys = buf; - keys.clear(); - keys.extend(self.offsets.keys().cloned()); - keys.sort_unstable_by_key(|&k| k as u32); - let data_size = self.buf.len() - .checked_add(self.offsets.len()).expect("snap size overflow") - .checked_mul(mem::size_of::<i32>()).expect("snap size overflow") - .assert_i32(); - p.write_int(data_size)?; - let num_items = self.offsets.len().assert_i32(); - p.write_int(num_items)?; - - let mut offset = 0; - for &key in &*keys { - p.write_int(offset)?; - let key_offset = self.offsets[&key].clone(); - offset = offset - .checked_add((key_offset.end - key_offset.start + 1).usize() - .checked_mul(mem::size_of::<i32>()) - .expect("item size overflow").assert_i32()) - .expect("offset overflow"); - } - for &key in &*keys { - p.write_int(key)?; - for &i in &self.buf[to_usize(self.offsets[&key].clone())] { - p.write_int(i)?; - } - } - Ok(p.written()) - } - pub fn crc(&self) -> i32 { - self.buf.iter().fold(0, |s, &a| s.wrapping_add(a)) - } - pub fn recycle(mut self) -> Builder { - self.clear(); - Builder { - snap: self, - } - } -} - -pub struct Items<'a> { - snap: &'a Snap, - iter: hash_map::Iter<'a, i32, ops::Range<u32>>, -} - -impl<'a> Iterator for Items<'a> { - type Item = Item<'a>; - fn next(&mut self) -> Option<Item<'a>> { - self.iter.next().map(|(&k, o)| { - Item::from_key(k, self.snap.item_from_offset(o.clone())) - }) - } - fn size_hint(&self) -> (usize, Option<usize>) { - self.iter.size_hint() - } -} - -impl<'a> ExactSizeIterator for Items<'a> { - fn len(&self) -> usize { - self.iter.len() - } -} - -impl fmt::Debug for Snap { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_map().entries(self.items().map( - |Item { type_id, id, data }| ((type_id, id), data) - )).finish() - } -} - -#[derive(Clone, Default)] -pub struct Delta { - deleted_items: HashSet<i32>, - updated_items: HashMap<i32, ops::Range<u32>>, - buf: Vec<i32>, -} - -impl Delta { - pub fn new() -> Delta { - Default::default() - } - pub fn clear(&mut self) { - self.deleted_items.clear(); - self.updated_items.clear(); - self.buf.clear(); - } - fn prepare_update_item(&mut self, type_id: u16, id: u16, size: usize) -> &mut [i32] { - let key = key(type_id, id); - - let offset = self.buf.len(); - let start = offset.assert_u32(); - let end = (offset + size).assert_u32(); - self.buf.extend(iter::repeat(0).take(size)); - assert!(self.updated_items.insert(key, start..end).is_none()); - &mut self.buf[to_usize(start..end)] - } - pub fn create(&mut self, from: &Snap, to: &Snap) { - self.clear(); - for Item { type_id, id, .. } in from.items() { - if to.item(type_id, id).is_none() { - assert!(self.deleted_items.insert(key(type_id, id))); - } - } - for Item { type_id, id, data } in to.items() { - let from_data = from.item(type_id, id); - let out_delta = self.prepare_update_item(type_id, id, data.len()); - create_delta(from_data, data, out_delta); - } - } - pub fn write<'d, 's, O>(&self, object_size: O, mut p: Packer<'d, 's>) - -> Result<&'d [u8], CapacityError> - where O: FnMut(u16) -> Option<u32>, - { - let mut object_size = object_size; - with_packer(&mut p, |p| DeltaHeader { - num_deleted_items: self.deleted_items.len().assert_i32(), - num_updated_items: self.updated_items.len().assert_i32() - }.encode(p))?; - for &key in &self.deleted_items { - p.write_int(key)?; - } - for (&key, range) in &self.updated_items { - let data = &self.buf[to_usize(range.clone())]; - let type_id = key_to_type_id(key); - let id = key_to_id(key); - p.write_int(type_id.i32())?; - p.write_int(id.i32())?; - match object_size(type_id) { - Some(size) => assert!(size.usize() == data.len()), - None => p.write_int(data.len().assert_i32())?, - } - for &d in data { - p.write_int(d)?; - } - } - Ok(p.written()) - } -} - -#[derive(Clone, Default)] -pub struct DeltaReader { - buf: Vec<i32>, -} - -impl DeltaReader { - pub fn new() -> DeltaReader { - Default::default() - } - fn clear(&mut self) { - self.buf.clear(); - } - pub fn read<W, O>(&mut self, warn: &mut W, delta: &mut Delta, object_size: O, p: &mut Unpacker) - -> Result<(), Error> - where W: Warn<Warning>, - O: FnMut(u16) -> Option<u32>, - { - delta.clear(); - self.clear(); - - let mut object_size = object_size; - - let header = DeltaHeader::decode(warn, p)?; - while !p.as_slice().is_empty() { - self.buf.push(p.read_int(wrap(warn))?); - } - let split = header.num_deleted_items.assert_usize(); - if split > self.buf.len() { - return Err(Error::DeletedItemsUnpacking); - } - let (deleted_items, buf) = self.buf.split_at(split); - delta.deleted_items.extend(deleted_items); - if deleted_items.len() != delta.deleted_items.len() { - warn.warn(Warning::DuplicateDelete); - } - - let mut num_updates = 0; - let mut buf = buf.iter(); - // FIXME: Use `is_empty`. - while buf.len() != 0 { - let type_id = buf.next().ok_or(Error::ItemDiffsUnpacking)?; - let id = buf.next().ok_or(Error::ItemDiffsUnpacking)?; - - let type_id = type_id.try_u16().ok_or(Error::TypeIdRange)?; - let id = id.try_u16().ok_or(Error::IdRange)?; - - let size = match object_size(type_id) { - Some(s) => s.usize(), - None => { - let s = buf.next().ok_or(Error::ItemDiffsUnpacking)?; - s.try_usize().ok_or(Error::NegativeSize)? - } - }; - - if size > buf.len() { - return Err(Error::ItemDiffsUnpacking); - } - let (data, b) = buf.as_slice().split_at(size); - buf = b.iter(); - - let offset = delta.buf.len(); - let start = offset.try_u32().ok_or(Error::TooLongDiff)?; - let end = (offset + data.len()).try_u32().ok_or(Error::TooLongDiff)?; - delta.buf.extend(data.iter()); - - // In case of conflict, take later update (as the original code does). - if delta.updated_items.insert(key(type_id, id), start..end).is_some() { - warn.warn(Warning::DuplicateUpdate); - } - - if delta.deleted_items.contains(&key(type_id, id)) { - warn.warn(Warning::DeleteUpdate); - } - num_updates += 1; - } - - if num_updates != header.num_updated_items { - warn.warn(Warning::NumUpdatedItems); - } - - Ok(()) - } -} - -#[derive(Default)] -pub struct Builder { - snap: Snap, -} - -impl Builder { - pub fn new() -> Builder { - Default::default() - } - pub fn add_item(&mut self, type_id: u16, id: u16, data: &[i32]) - -> Result<(), BuilderError> - { - let offset = match self.snap.offsets.entry(key(type_id, id)) { - hash_map::Entry::Occupied(..) => return Err(BuilderError::DuplicateKey), - hash_map::Entry::Vacant(v) => { - Snap::prepare_item_vacant(v, &mut self.snap.buf, data.len())? - } - }.clone(); - self.snap.buf[to_usize(offset)].copy_from_slice(data); - Ok(()) - } - pub fn finish(self) -> Snap { - self.snap - } -} - -pub fn delta_chunks(tick: i32, delta_tick: i32, data: &[u8], crc: i32) -> DeltaChunks { - DeltaChunks { - tick: tick, - delta_tick: tick - delta_tick, - crc: crc, - cur_part: if !data.is_empty() { 0 } else { -1 }, - num_parts: ((data.len() + MAX_SNAPSHOT_PACKSIZE as usize - 1) / MAX_SNAPSHOT_PACKSIZE as usize).assert_i32(), - data: data, - } -} - -impl<'a> Into<system::System<'a>> for SnapMsg<'a> { - fn into(self) -> system::System<'a> { - match self { - SnapMsg::Snap(s) => system::System::Snap(s), - SnapMsg::SnapEmpty(s) => system::System::SnapEmpty(s), - SnapMsg::SnapSingle(s) => system::System::SnapSingle(s), - } - } -} - -#[derive(Clone, Copy)] -pub enum SnapMsg<'a> { - Snap(system::Snap<'a>), - SnapEmpty(system::SnapEmpty), - SnapSingle(system::SnapSingle<'a>), -} - -pub struct DeltaChunks<'a> { - tick: i32, - delta_tick: i32, - crc: i32, - cur_part: i32, - num_parts: i32, - data: &'a [u8], -} - -impl<'a> Iterator for DeltaChunks<'a> { - type Item = SnapMsg<'a>; - fn next(&mut self) -> Option<SnapMsg<'a>> { - if self.cur_part == self.num_parts { - return None; - } - let result = if self.num_parts == 0 { - SnapMsg::SnapEmpty(system::SnapEmpty { - tick: self.tick, - delta_tick: self.delta_tick, - }) - } else if self.num_parts == 1 { - SnapMsg::SnapSingle(system::SnapSingle { - tick: self.tick, - delta_tick: self.delta_tick, - crc: self.crc, - data: self.data, - }) - } else { - let index = self.cur_part.assert_usize(); - let start = MAX_SNAPSHOT_PACKSIZE as usize * index; - let end = cmp::min(MAX_SNAPSHOT_PACKSIZE as usize * (index + 1), self.data.len()); - SnapMsg::Snap(system::Snap { - tick: self.tick, - delta_tick: self.delta_tick, - num_parts: self.num_parts, - part: self.cur_part, - crc: self.crc, - data: &self.data[start..end], - }) - }; - self.cur_part += 1; - Some(result) - } -} diff --git a/snapshot/src/storage.rs b/snapshot/src/storage.rs deleted file mode 100644 index c445d04..0000000 --- a/snapshot/src/storage.rs +++ /dev/null @@ -1,176 +0,0 @@ -use format; -use snap::Builder; -use snap::Delta; -use snap::Snap; -use snap; -use std::collections::VecDeque; -use warn::Warn; -use warn::wrap; - -// TODO: Separate server storage from client storage. -// TODO: Delete snapshots over time. - -#[derive(Clone)] -struct StoredSnap { - snap: Snap, - tick: i32, -} - -const MAX_STORED_SNAPSHOT: usize = 100; - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct UnknownSnap; - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct WeirdNegativeDeltaTick; - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Error { - OldDelta, - UnknownSnap, - InvalidCrc, - Unpack(snap::Error), -} - -impl From<snap::Error> for Error { - fn from(err: snap::Error) -> Error { - Error::Unpack(err) - } -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Warning { - WeirdNegativeDeltaTick, - Unpack(format::Warning), -} - -impl From<format::Warning> for Warning { - fn from(w: format::Warning) -> Warning { - Warning::Unpack(w) - } -} - -#[derive(Clone, Default)] -pub struct Storage { - /// Queue that stores received snaps. - /// - /// The newest elements are in the front. - snaps: VecDeque<StoredSnap>, - free: Vec<Snap>, - ack_tick: Option<i32>, - delta: Delta, - delta_tick: Option<i32>, -} - -impl Storage { - pub fn new() -> Storage { - Default::default() - } - pub fn reset(&mut self) { - let self_free = &mut self.free; - // FIXME: Replace with something like `exhaust`. - self.snaps.drain(..).map(|s| self_free.push(s.snap)).count(); - self.ack_tick = None; - } - pub fn ack_tick(&self) -> Option<i32> { - self.ack_tick - } - pub fn add_delta<W>(&mut self, warn: &mut W, crc: Option<i32>, delta_tick: i32, tick: i32, delta: &Delta) - -> Result<&Snap, Error> - where W: Warn<Warning>, - { - if self.snaps.front().map(|s| s.tick).unwrap_or(-1) >= tick { - return Err(Error::OldDelta); - } - { - let empty = Snap::empty(); - let delta_snap; - if delta_tick >= 0 { - if let Some(i) = self.snaps.iter().position(|s| s.tick < delta_tick) { - let self_free = &mut self.free; - // FIXME: Replace with something like `exhaust`. - self.snaps.drain(i..).map(|s| self_free.push(s.snap)).count(); - } - if let Some(d) = self.snaps.back() { - if d.tick == delta_tick { - delta_snap = &d.snap; - } else { - self.ack_tick = None; - return Err(Error::UnknownSnap); - } - } else { - self.ack_tick = None; - return Err(Error::UnknownSnap); - } - } else { - delta_snap = ∅ - if delta_tick != -1 { - warn.warn(Warning::WeirdNegativeDeltaTick); - } - } - if self.free.is_empty() { - self.free.push(Snap::empty()); - } - - let new_snap: &mut Snap = self.free.last_mut().unwrap(); - new_snap.read_with_delta(wrap(warn), delta_snap, delta)?; - if crc.map(|crc| crc != new_snap.crc()).unwrap_or(false) { - self.ack_tick = None; - return Err(Error::InvalidCrc); - } - self.ack_tick = Some(tick); - } - self.snaps.push_front(StoredSnap { - tick: tick, - snap: self.free.pop().unwrap(), - }); - if self.snaps.len() > MAX_STORED_SNAPSHOT { - self.free.push(self.snaps.pop_back().unwrap().snap); - } - Ok(&self.snaps.front().unwrap().snap) - } - pub fn new_builder(&mut self) -> Builder { - self.free.pop().unwrap_or_default().recycle() - } - pub fn set_delta_tick<W>(&mut self, warn: &mut W, tick: i32) - -> Result<(), UnknownSnap> - where W: Warn<WeirdNegativeDeltaTick>, - { - if tick < 0 { - if tick != -1 { - warn.warn(WeirdNegativeDeltaTick); - } - self.delta_tick = None; - return Ok(()); - } - if let Some(i) = self.snaps.iter().position(|s| s.tick < tick) { - let self_free = &mut self.free; - // FIXME: Replace with something like `exhaust`. - self.snaps.drain(i..).map(|s| self_free.push(s.snap)).count(); - } - if !self.snaps.back().map(|s| s.tick == tick).unwrap_or(false) { - self.delta_tick = None; - return Err(UnknownSnap); - } - self.delta_tick = Some(tick); - Ok(()) - } - pub fn delta_tick(&self) -> Option<i32> { - self.delta_tick - } - pub fn add_snap(&mut self, tick: i32, snap: Snap) -> &Delta { - self.snaps.push_front(StoredSnap { - snap: snap, - tick: tick, - }); - let tmp; - let delta_snap = if self.delta_tick.is_some() { - &self.snaps.back().unwrap().snap - } else { - tmp = Snap::empty(); - &tmp - }; - self.delta.create(delta_snap, &self.snaps.front().unwrap().snap); - &self.delta - } -} |