diff options
Diffstat (limited to 'snapshot/src')
-rw-r--r-- | snapshot/src/format.rs | 85 | ||||
-rw-r--r-- | snapshot/src/lib.rs | 36 | ||||
-rw-r--r-- | snapshot/src/manager.rs | 140 | ||||
-rw-r--r-- | snapshot/src/receiver.rs | 245 | ||||
-rw-r--r-- | snapshot/src/snap.rs | 511 | ||||
-rw-r--r-- | snapshot/src/storage.rs | 176 |
6 files changed, 1193 insertions, 0 deletions
diff --git a/snapshot/src/format.rs b/snapshot/src/format.rs new file mode 100644 index 0000000..1a22938 --- /dev/null +++ b/snapshot/src/format.rs @@ -0,0 +1,85 @@ +use buffer::CapacityError; +use packer::Packer; +use packer::Unpacker; +use packer; +use snap::Error; +use warn::Warn; +use warn::wrap; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Warning { + Packer(packer::Warning), + NonZeroPadding, + DuplicateDelete, + DuplicateUpdate, + UnknownDelete, + DeleteUpdate, + NumUpdatedItems, +} + +impl From<packer::Warning> for Warning { + fn from(w: packer::Warning) -> Warning { + Warning::Packer(w) + } +} + +pub fn key_to_type_id(key: i32) -> u16 { + ((key as u32 >> 16) & 0xffff) as u16 +} + +pub fn key_to_id(key: i32) -> u16 { + ((key as u32) & 0xffff) as u16 +} + +pub fn key(type_id: u16, id: u16) -> i32 { + (((type_id as u32) << 16) | (id as u32)) as i32 +} + +#[derive(Clone, Copy, Debug)] +pub struct Item<'a> { + pub type_id: u16, + pub id: u16, + pub data: &'a [i32], +} + +impl<'a> Item<'a> { + pub fn from_key(key: i32, data: &[i32]) -> Item { + Item { + type_id: key_to_type_id(key), + id: key_to_id(key), + data: data, + } + } + pub fn key(&self) -> i32 { + key(self.type_id, self.id) + } +} + +#[derive(Clone, Copy, Debug)] +pub struct DeltaHeader { + pub num_deleted_items: i32, + pub num_updated_items: i32, +} + +impl DeltaHeader { + pub fn decode<W: Warn<Warning>>(warn: &mut W, p: &mut Unpacker) + -> Result<DeltaHeader, Error> + { + let result = DeltaHeader { + num_deleted_items: packer::positive(p.read_int(wrap(warn))?)?, + num_updated_items: packer::positive(p.read_int(wrap(warn))?)?, + }; + if p.read_int(wrap(warn))? != 0 { + warn.warn(Warning::NonZeroPadding); + } + Ok(result) + } + pub fn encode<'d, 's>(&self, mut p: Packer<'d, 's>) + -> Result<&'d [u8], CapacityError> + { + p.write_int(self.num_deleted_items)?; + p.write_int(self.num_updated_items)?; + p.write_int(0)?; + Ok(p.written()) + } +} diff --git a/snapshot/src/lib.rs b/snapshot/src/lib.rs new file mode 100644 index 0000000..992d898 --- /dev/null +++ b/snapshot/src/lib.rs @@ -0,0 +1,36 @@ + +#[cfg(feature = "gamenet_ddnet")] +extern crate gamenet_ddnet as gamenet; +#[cfg(feature = "gamenet_0_5")] +extern crate gamenet_teeworlds_0_5 as gamenet; +#[cfg(feature = "gamenet_0_6")] +extern crate gamenet_teeworlds_0_6 as gamenet; +#[cfg(feature = "gamenet_0_7")] +extern crate gamenet_teeworlds_0_7 as gamenet; + +extern crate buffer; +extern crate common; +extern crate packer; +extern crate vec_map; +extern crate warn; + +pub mod format; +pub mod manager; +pub mod receiver; +pub mod snap; +pub mod storage; + +pub use manager::Manager; +pub use receiver::DeltaReceiver; +pub use receiver::ReceivedDelta; +pub use snap::Delta; +pub use snap::DeltaReader; +pub use snap::Snap; +pub use storage::Storage; + +use common::num::Cast; +use std::ops; + +fn to_usize(r: ops::Range<u32>) -> ops::Range<usize> { + r.start.usize()..r.end.usize() +} diff --git a/snapshot/src/manager.rs b/snapshot/src/manager.rs new file mode 100644 index 0000000..65d0c72 --- /dev/null +++ b/snapshot/src/manager.rs @@ -0,0 +1,140 @@ +use Delta; +use DeltaReader; +use DeltaReceiver; +use ReceivedDelta; +use Snap; +use Storage; +use format; +use gamenet::msg::system; +use packer::Unpacker; +use receiver; +use snap; +use storage; +use warn::Warn; +use warn::wrap; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Error { + Receiver(receiver::Error), + Snap(snap::Error), + Storage(storage::Error), +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Warning { + Receiver(receiver::Warning), + Snap(format::Warning), + Storage(storage::Warning), +} + +impl From<receiver::Error> for Error { + fn from(err: receiver::Error) -> Error { + Error::Receiver(err) + } +} + +impl From<snap::Error> for Error { + fn from(err: snap::Error) -> Error { + Error::Snap(err) + } +} + +impl From<storage::Error> for Error { + fn from(err: storage::Error) -> Error { + Error::Storage(err) + } +} + +impl From<receiver::Warning> for Warning { + fn from(w: receiver::Warning) -> Warning { + Warning::Receiver(w) + } +} + +impl From<format::Warning> for Warning { + fn from(w: format::Warning) -> Warning { + Warning::Snap(w) + } +} + +impl From<storage::Warning> for Warning { + fn from(w: storage::Warning) -> Warning { + Warning::Storage(w) + } +} + +#[derive(Clone, Default)] +struct ManagerInner { + temp_delta: Delta, + reader: DeltaReader, + storage: Storage, +} + +#[derive(Clone, Default)] +pub struct Manager { + inner: ManagerInner, + receiver: DeltaReceiver, +} + +impl Manager { + pub fn new() -> Manager { + Default::default() + } + pub fn reset(&mut self) { + self.inner.storage.reset(); + self.receiver.reset(); + } + pub fn ack_tick(&self) -> Option<i32> { + self.inner.storage.ack_tick() + } + pub fn snap_empty<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::SnapEmpty) + -> Result<Option<&Snap>, Error> + where W: Warn<Warning>, + O: FnMut(u16) -> Option<u32>, + { + let res = self.receiver.snap_empty(wrap(warn), snap); + self.inner.handle_msg(warn, object_size, res) + } + pub fn snap_single<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::SnapSingle) + -> Result<Option<&Snap>, Error> + where W: Warn<Warning>, + O: FnMut(u16) -> Option<u32>, + { + let res = self.receiver.snap_single(wrap(warn), snap); + self.inner.handle_msg(warn, object_size, res) + } + pub fn snap<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::Snap) + -> Result<Option<&Snap>, Error> + where W: Warn<Warning>, + O: FnMut(u16) -> Option<u32>, + { + let res = self.receiver.snap(wrap(warn), snap); + self.inner.handle_msg(warn, object_size, res) + } +} + +impl ManagerInner { + fn handle_msg<W, O>(&mut self, warn: &mut W, object_size: O, res: Result<Option<ReceivedDelta>, receiver::Error>) + -> Result<Option<&Snap>, Error> + where W: Warn<Warning>, + O: FnMut(u16) -> Option<u32>, + { + Ok(match res? { + Some(delta) => Some(self.add_delta(warn, object_size, delta)?), + None => None, + }) + } + fn add_delta<W, O>(&mut self, warn: &mut W, object_size: O, delta: ReceivedDelta) + -> Result<&Snap, Error> + where W: Warn<Warning>, + O: FnMut(u16) -> Option<u32>, + { + let crc = delta.data_and_crc.map(|d| d.1); + if let Some((data, _)) = delta.data_and_crc { + self.reader.read(wrap(warn), &mut self.temp_delta, object_size, &mut Unpacker::new(data))?; + } else { + self.temp_delta.clear(); + } + Ok(self.storage.add_delta(wrap(warn), crc, delta.delta_tick, delta.tick, &self.temp_delta)?) + } +} diff --git a/snapshot/src/receiver.rs b/snapshot/src/receiver.rs new file mode 100644 index 0000000..49833d1 --- /dev/null +++ b/snapshot/src/receiver.rs @@ -0,0 +1,245 @@ +use common::num::Cast; +use gamenet::msg::system; +use std::ops; +use to_usize; +use vec_map::VecMap; +use warn::Warn; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Warning { + DuplicateSnap, + DifferingAttributes, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Error { + OldDelta, + InvalidNumParts, + InvalidPart, + DuplicatePart, +} + +// TODO: How to handle `tick` overflowing? +#[derive(Clone, Debug)] +struct CurrentDelta { + tick: i32, + delta_tick: i32, + num_parts: i32, + crc: i32, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct ReceivedDelta<'a> { + pub delta_tick: i32, + pub tick: i32, + pub data_and_crc: Option<(&'a [u8], i32)>, +} + +#[derive(Clone, Default)] +pub struct DeltaReceiver { + previous_tick: Option<i32>, + current: Option<CurrentDelta>, + // `parts` points into `receive_buf`. + parts: VecMap<ops::Range<u32>>, + receive_buf: Vec<u8>, + result: Vec<u8>, +} + +impl DeltaReceiver { + pub fn new() -> DeltaReceiver { + Default::default() + } + pub fn reset(&mut self) { + self.previous_tick = None; + self.current = None; + } + fn can_receive(&self, tick: i32) -> bool { + self.current.as_ref().map(|c| c.tick <= tick) + .or(self.previous_tick.map(|t| t < tick)) + .unwrap_or(true) + } + fn init_delta(&mut self) { + self.parts.clear(); + self.receive_buf.clear(); + self.result.clear(); + } + fn finish_delta(&mut self, tick: i32) { + self.current = None; + self.previous_tick = Some(tick); + } + pub fn snap_empty<W>(&mut self, warn: &mut W, snap: system::SnapEmpty) + -> Result<Option<ReceivedDelta>, Error> + where W: Warn<Warning>, + { + if !self.can_receive(snap.tick) { + return Err(Error::OldDelta); + } + if self.current.as_ref().map(|c| c.tick == snap.tick).unwrap_or(false) { + warn.warn(Warning::DuplicateSnap); + } + self.init_delta(); + self.finish_delta(snap.tick); + Ok(Some(ReceivedDelta { + delta_tick: snap.tick.wrapping_sub(snap.delta_tick), + tick: snap.tick, + data_and_crc: None, + })) + } + pub fn snap_single<W>(&mut self, warn: &mut W, snap: system::SnapSingle) + -> Result<Option<ReceivedDelta>, Error> + where W: Warn<Warning>, + { + if !self.can_receive(snap.tick) { + return Err(Error::OldDelta); + } + if self.current.as_ref().map(|c| c.tick == snap.tick).unwrap_or(false) { + warn.warn(Warning::DuplicateSnap); + } + self.init_delta(); + self.finish_delta(snap.tick); + self.result.extend(snap.data); + Ok(Some(ReceivedDelta { + delta_tick: snap.tick.wrapping_sub(snap.delta_tick), + tick: snap.tick, + data_and_crc: Some((&self.result, snap.crc)), + })) + } + pub fn snap<W>(&mut self, warn: &mut W, snap: system::Snap) + -> Result<Option<ReceivedDelta>, Error> + where W: Warn<Warning>, + { + if !self.can_receive(snap.tick) { + return Err(Error::OldDelta); + } + if !(0 <= snap.num_parts && snap.num_parts <= 32) { + return Err(Error::InvalidNumParts); + } + if !(0 <= snap.part && snap.part < snap.num_parts) { + return Err(Error::InvalidPart); + } + if self.current.as_ref().map(|c| c.tick != snap.tick).unwrap_or(false) { + self.current = None; + } + if let None = self.current { + self.init_delta(); + self.current = Some(CurrentDelta { + tick: snap.tick, + delta_tick: snap.tick.wrapping_sub(snap.delta_tick), + num_parts: snap.num_parts, + crc: snap.crc, + }); + + // Checked above. + let num_parts = snap.num_parts.assert_usize(); + self.parts.reserve_len(num_parts); + } + let delta_tick; + let tick; + let crc; + let num_parts; + { + let current: &mut CurrentDelta = self.current.as_mut().unwrap(); + if snap.delta_tick != current.delta_tick + || snap.num_parts != current.num_parts + || snap.crc != current.crc + { + warn.warn(Warning::DifferingAttributes); + } + delta_tick = current.delta_tick; + tick = current.tick; + crc = current.crc; + num_parts = current.num_parts; + } + let part = snap.part.assert_usize(); + if self.parts.contains_key(part) { + return Err(Error::DuplicatePart); + } + let start = self.receive_buf.len().assert_u32(); + let end = (self.receive_buf.len() + snap.data.len()).assert_u32(); + self.receive_buf.extend(snap.data); + assert!(self.parts.insert(part, start..end).is_none()); + + if self.parts.len().assert_i32() != num_parts { + return Ok(None); + } + + self.finish_delta(tick); + self.result.reserve(self.receive_buf.len()); + for range in self.parts.values() { + self.result.extend(&self.receive_buf[to_usize(range.clone())]); + } + + Ok(Some(ReceivedDelta { + delta_tick: delta_tick, + tick: tick, + data_and_crc: Some((&self.result, crc)), + })) + } +} + +#[cfg(test)] +mod test { + use common::num::Cast; + use gamenet::msg::system::Snap; + use gamenet::msg::system::SnapEmpty; + use gamenet::msg::system::SnapSingle; + use super::DeltaReceiver; + use super::Error; + use super::ReceivedDelta; + use warn::Panic; + + #[test] + fn old() { + let mut receiver = DeltaReceiver::new(); + { + let result = receiver.snap_empty(&mut Panic, SnapEmpty { + tick: 1, + delta_tick: 2, + }).unwrap(); + + assert_eq!(result, Some(ReceivedDelta { + delta_tick: -1, + tick: 1, + data_and_crc: None, + })); + } + + assert_eq!(receiver.snap_single(&mut Panic, SnapSingle { + tick: 0, + delta_tick: 0, + data: b"123", + crc: 0, + }).unwrap_err(), Error::OldDelta); + } + + #[test] + fn reorder() { + let mut receiver = DeltaReceiver::new(); + let chunks: &[(i32, &[u8])] = &[ + (3, b"3"), + (2, b"2"), + (4, b"4_"), + (1, b"1__"), + (0, b"0"), + ]; + for &(i, c) in chunks { + let result = receiver.snap(&mut Panic, Snap { + tick: 2, + delta_tick: 1, + num_parts: chunks.len().assert_i32(), + part: i, + crc: 3, + data: c, + }).unwrap(); + if i != 0 { + assert_eq!(result, None); + } else { + assert_eq!(result, Some(ReceivedDelta { + delta_tick: 1, + tick: 2, + data_and_crc: Some((b"01__234_", 3)), + })); + } + } + } +} diff --git a/snapshot/src/snap.rs b/snapshot/src/snap.rs new file mode 100644 index 0000000..7b9768f --- /dev/null +++ b/snapshot/src/snap.rs @@ -0,0 +1,511 @@ +use buffer::CapacityError; +use common::num::Cast; +use format::DeltaHeader; +use format::Item; +use format::Warning; +use format::key; +use format::key_to_id; +use format::key_to_type_id; +use gamenet::enums::MAX_SNAPSHOT_PACKSIZE; +use gamenet::msg::system; +use packer::Packer; +use packer::Unpacker; +use packer::with_packer; +use packer; +use std::cmp; +use std::collections::HashMap; +use std::collections::HashSet; +use std::collections::hash_map; +use std::fmt; +use std::iter; +use std::mem; +use std::ops; +use to_usize; +use warn::Warn; +use warn::wrap; + +// TODO: Actually obey this the same way as Teeworlds does. +pub const MAX_SNAPSHOT_SIZE: usize = 64 * 1024; // 64 KB + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Error { + UnexpectedEnd, + IntOutOfRange, + DeletedItemsUnpacking, + ItemDiffsUnpacking, + TypeIdRange, + IdRange, + NegativeSize, + TooLongDiff, + TooLongSnap, + DeltaDifferingSizes, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum BuilderError { + DuplicateKey, + TooLongSnap, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct TooLongSnap; + +impl From<TooLongSnap> for Error { + fn from(_: TooLongSnap) -> Error { + Error::TooLongSnap + } +} + +impl From<TooLongSnap> for BuilderError { + fn from(_: TooLongSnap) -> BuilderError { + BuilderError::TooLongSnap + } +} + +impl From<packer::IntOutOfRange> for Error { + fn from(_: packer::IntOutOfRange) -> Error { + Error::IntOutOfRange + } +} + +impl From<packer::UnexpectedEnd> for Error { + fn from(_: packer::UnexpectedEnd) -> Error { + Error::UnexpectedEnd + } +} + +fn apply_delta(in_: Option<&[i32]>, delta: &[i32], out: &mut [i32]) + -> Result<(), Error> +{ + assert!(delta.len() == out.len()); + match in_ { + Some(in_) => { + if in_.len() != out.len() { + return Err(Error::DeltaDifferingSizes); + } + for i in 0..out.len() { + out[i] = in_[i].wrapping_add(delta[i]); + } + } + None => out.copy_from_slice(delta), + } + Ok(()) +} + +fn create_delta(from: Option<&[i32]>, to: &[i32], out: &mut [i32]) { + assert!(to.len() == out.len()); + match from { + Some(from) => { + assert!(from.len() == to.len()); + for i in 0..out.len() { + out[i] = to[i].wrapping_sub(from[i]); + } + }, + None => out.copy_from_slice(to), + } +} + +// TODO: Select a faster hasher? +#[derive(Clone, Default)] +pub struct Snap { + offsets: HashMap<i32, ops::Range<u32>>, + buf: Vec<i32>, +} + +impl Snap { + pub fn empty() -> Snap { + Default::default() + } + fn clear(&mut self) { + self.offsets.clear(); + self.buf.clear(); + } + fn item_from_offset(&self, offset: ops::Range<u32>) -> &[i32] { + &self.buf[to_usize(offset)] + } + pub fn item(&self, type_id: u16, id: u16) -> Option<&[i32]> { + self.offsets.get(&key(type_id, id)).map(|o| &self.buf[to_usize(o.clone())]) + } + pub fn items(&self) -> Items { + Items { + snap: self, + iter: self.offsets.iter(), + } + } + fn prepare_item_vacant<'a>(entry: hash_map::VacantEntry<'a, i32, ops::Range<u32>>, buf: &mut Vec<i32>, size: usize) + -> Result<&'a mut ops::Range<u32>, TooLongSnap> + { + let offset = buf.len(); + if offset + size > MAX_SNAPSHOT_SIZE { + return Err(TooLongSnap); + } + let start = offset.assert_u32(); + let end = (offset + size).assert_u32(); + buf.extend(iter::repeat(0).take(size)); + Ok(entry.insert(start..end)) + } + fn prepare_item(&mut self, type_id: u16, id: u16, size: usize) + -> Result<&mut [i32], Error> + { + let offset = match self.offsets.entry(key(type_id, id)) { + hash_map::Entry::Occupied(o) => o.into_mut(), + hash_map::Entry::Vacant(v) => Snap::prepare_item_vacant(v, &mut self.buf, size)?, + }.clone(); + Ok(&mut self.buf[to_usize(offset)]) + } + pub fn read_with_delta<W>(&mut self, warn: &mut W, from: &Snap, delta: &Delta) + -> Result<(), Error> + where W: Warn<Warning>, + { + self.clear(); + + let mut num_deletions = 0; + for item in from.items() { + if !delta.deleted_items.contains(&item.key()) { + let out = self.prepare_item(item.type_id, item.id, item.data.len())?; + out.copy_from_slice(item.data); + } else { + num_deletions += 1; + } + } + if num_deletions != delta.deleted_items.len() { + warn.warn(Warning::UnknownDelete); + } + + for (&key, offset) in &delta.updated_items { + let type_id = key_to_type_id(key); + let id = key_to_id(key); + let diff = &delta.buf[to_usize(offset.clone())]; + let out = self.prepare_item(type_id, id, diff.len())?; + let in_ = from.item(type_id, id); + + apply_delta(in_, diff, out)?; + } + Ok(()) + } + pub fn write<'d, 's>(&self, buf: &mut Vec<i32>, mut p: Packer<'d, 's>) + -> Result<&'d [u8], CapacityError> + { + let keys = buf; + keys.clear(); + keys.extend(self.offsets.keys().cloned()); + keys.sort_unstable_by_key(|&k| k as u32); + let data_size = self.buf.len() + .checked_add(self.offsets.len()).expect("snap size overflow") + .checked_mul(mem::size_of::<i32>()).expect("snap size overflow") + .assert_i32(); + p.write_int(data_size)?; + let num_items = self.offsets.len().assert_i32(); + p.write_int(num_items)?; + + let mut offset = 0; + for &key in &*keys { + p.write_int(offset)?; + let key_offset = self.offsets[&key].clone(); + offset = offset + .checked_add((key_offset.end - key_offset.start + 1).usize() + .checked_mul(mem::size_of::<i32>()) + .expect("item size overflow").assert_i32()) + .expect("offset overflow"); + } + for &key in &*keys { + p.write_int(key)?; + for &i in &self.buf[to_usize(self.offsets[&key].clone())] { + p.write_int(i)?; + } + } + Ok(p.written()) + } + pub fn crc(&self) -> i32 { + self.buf.iter().fold(0, |s, &a| s.wrapping_add(a)) + } + pub fn recycle(mut self) -> Builder { + self.clear(); + Builder { + snap: self, + } + } +} + +pub struct Items<'a> { + snap: &'a Snap, + iter: hash_map::Iter<'a, i32, ops::Range<u32>>, +} + +impl<'a> Iterator for Items<'a> { + type Item = Item<'a>; + fn next(&mut self) -> Option<Item<'a>> { + self.iter.next().map(|(&k, o)| { + Item::from_key(k, self.snap.item_from_offset(o.clone())) + }) + } + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} + +impl<'a> ExactSizeIterator for Items<'a> { + fn len(&self) -> usize { + self.iter.len() + } +} + +impl fmt::Debug for Snap { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_map().entries(self.items().map( + |Item { type_id, id, data }| ((type_id, id), data) + )).finish() + } +} + +#[derive(Clone, Default)] +pub struct Delta { + deleted_items: HashSet<i32>, + updated_items: HashMap<i32, ops::Range<u32>>, + buf: Vec<i32>, +} + +impl Delta { + pub fn new() -> Delta { + Default::default() + } + pub fn clear(&mut self) { + self.deleted_items.clear(); + self.updated_items.clear(); + self.buf.clear(); + } + fn prepare_update_item(&mut self, type_id: u16, id: u16, size: usize) -> &mut [i32] { + let key = key(type_id, id); + + let offset = self.buf.len(); + let start = offset.assert_u32(); + let end = (offset + size).assert_u32(); + self.buf.extend(iter::repeat(0).take(size)); + assert!(self.updated_items.insert(key, start..end).is_none()); + &mut self.buf[to_usize(start..end)] + } + pub fn create(&mut self, from: &Snap, to: &Snap) { + self.clear(); + for Item { type_id, id, .. } in from.items() { + if to.item(type_id, id).is_none() { + assert!(self.deleted_items.insert(key(type_id, id))); + } + } + for Item { type_id, id, data } in to.items() { + let from_data = from.item(type_id, id); + let out_delta = self.prepare_update_item(type_id, id, data.len()); + create_delta(from_data, data, out_delta); + } + } + pub fn write<'d, 's, O>(&self, object_size: O, mut p: Packer<'d, 's>) + -> Result<&'d [u8], CapacityError> + where O: FnMut(u16) -> Option<u32>, + { + let mut object_size = object_size; + with_packer(&mut p, |p| DeltaHeader { + num_deleted_items: self.deleted_items.len().assert_i32(), + num_updated_items: self.updated_items.len().assert_i32() + }.encode(p))?; + for &key in &self.deleted_items { + p.write_int(key)?; + } + for (&key, range) in &self.updated_items { + let data = &self.buf[to_usize(range.clone())]; + let type_id = key_to_type_id(key); + let id = key_to_id(key); + p.write_int(type_id.i32())?; + p.write_int(id.i32())?; + match object_size(type_id) { + Some(size) => assert!(size.usize() == data.len()), + None => p.write_int(data.len().assert_i32())?, + } + for &d in data { + p.write_int(d)?; + } + } + Ok(p.written()) + } +} + +#[derive(Clone, Default)] +pub struct DeltaReader { + buf: Vec<i32>, +} + +impl DeltaReader { + pub fn new() -> DeltaReader { + Default::default() + } + fn clear(&mut self) { + self.buf.clear(); + } + pub fn read<W, O>(&mut self, warn: &mut W, delta: &mut Delta, object_size: O, p: &mut Unpacker) + -> Result<(), Error> + where W: Warn<Warning>, + O: FnMut(u16) -> Option<u32>, + { + delta.clear(); + self.clear(); + + let mut object_size = object_size; + + let header = DeltaHeader::decode(warn, p)?; + while !p.as_slice().is_empty() { + self.buf.push(p.read_int(wrap(warn))?); + } + let split = header.num_deleted_items.assert_usize(); + if split > self.buf.len() { + return Err(Error::DeletedItemsUnpacking); + } + let (deleted_items, buf) = self.buf.split_at(split); + delta.deleted_items.extend(deleted_items); + if deleted_items.len() != delta.deleted_items.len() { + warn.warn(Warning::DuplicateDelete); + } + + let mut num_updates = 0; + let mut buf = buf.iter(); + // FIXME: Use `is_empty`. + while buf.len() != 0 { + let type_id = buf.next().ok_or(Error::ItemDiffsUnpacking)?; + let id = buf.next().ok_or(Error::ItemDiffsUnpacking)?; + + let type_id = type_id.try_u16().ok_or(Error::TypeIdRange)?; + let id = id.try_u16().ok_or(Error::IdRange)?; + + let size = match object_size(type_id) { + Some(s) => s.usize(), + None => { + let s = buf.next().ok_or(Error::ItemDiffsUnpacking)?; + s.try_usize().ok_or(Error::NegativeSize)? + } + }; + + if size > buf.len() { + return Err(Error::ItemDiffsUnpacking); + } + let (data, b) = buf.as_slice().split_at(size); + buf = b.iter(); + + let offset = delta.buf.len(); + let start = offset.try_u32().ok_or(Error::TooLongDiff)?; + let end = (offset + data.len()).try_u32().ok_or(Error::TooLongDiff)?; + delta.buf.extend(data.iter()); + + // In case of conflict, take later update (as the original code does). + if delta.updated_items.insert(key(type_id, id), start..end).is_some() { + warn.warn(Warning::DuplicateUpdate); + } + + if delta.deleted_items.contains(&key(type_id, id)) { + warn.warn(Warning::DeleteUpdate); + } + num_updates += 1; + } + + if num_updates != header.num_updated_items { + warn.warn(Warning::NumUpdatedItems); + } + + Ok(()) + } +} + +#[derive(Default)] +pub struct Builder { + snap: Snap, +} + +impl Builder { + pub fn new() -> Builder { + Default::default() + } + pub fn add_item(&mut self, type_id: u16, id: u16, data: &[i32]) + -> Result<(), BuilderError> + { + let offset = match self.snap.offsets.entry(key(type_id, id)) { + hash_map::Entry::Occupied(..) => return Err(BuilderError::DuplicateKey), + hash_map::Entry::Vacant(v) => { + Snap::prepare_item_vacant(v, &mut self.snap.buf, data.len())? + } + }.clone(); + self.snap.buf[to_usize(offset)].copy_from_slice(data); + Ok(()) + } + pub fn finish(self) -> Snap { + self.snap + } +} + +pub fn delta_chunks(tick: i32, delta_tick: i32, data: &[u8], crc: i32) -> DeltaChunks { + DeltaChunks { + tick: tick, + delta_tick: tick - delta_tick, + crc: crc, + cur_part: if !data.is_empty() { 0 } else { -1 }, + num_parts: ((data.len() + MAX_SNAPSHOT_PACKSIZE as usize - 1) / MAX_SNAPSHOT_PACKSIZE as usize).assert_i32(), + data: data, + } +} + +impl<'a> Into<system::System<'a>> for SnapMsg<'a> { + fn into(self) -> system::System<'a> { + match self { + SnapMsg::Snap(s) => system::System::Snap(s), + SnapMsg::SnapEmpty(s) => system::System::SnapEmpty(s), + SnapMsg::SnapSingle(s) => system::System::SnapSingle(s), + } + } +} + +#[derive(Clone, Copy)] +pub enum SnapMsg<'a> { + Snap(system::Snap<'a>), + SnapEmpty(system::SnapEmpty), + SnapSingle(system::SnapSingle<'a>), +} + +pub struct DeltaChunks<'a> { + tick: i32, + delta_tick: i32, + crc: i32, + cur_part: i32, + num_parts: i32, + data: &'a [u8], +} + +impl<'a> Iterator for DeltaChunks<'a> { + type Item = SnapMsg<'a>; + fn next(&mut self) -> Option<SnapMsg<'a>> { + if self.cur_part == self.num_parts { + return None; + } + let result = if self.num_parts == 0 { + SnapMsg::SnapEmpty(system::SnapEmpty { + tick: self.tick, + delta_tick: self.delta_tick, + }) + } else if self.num_parts == 1 { + SnapMsg::SnapSingle(system::SnapSingle { + tick: self.tick, + delta_tick: self.delta_tick, + crc: self.crc, + data: self.data, + }) + } else { + let index = self.cur_part.assert_usize(); + let start = MAX_SNAPSHOT_PACKSIZE as usize * index; + let end = cmp::min(MAX_SNAPSHOT_PACKSIZE as usize * (index + 1), self.data.len()); + SnapMsg::Snap(system::Snap { + tick: self.tick, + delta_tick: self.delta_tick, + num_parts: self.num_parts, + part: self.cur_part, + crc: self.crc, + data: &self.data[start..end], + }) + }; + self.cur_part += 1; + Some(result) + } +} diff --git a/snapshot/src/storage.rs b/snapshot/src/storage.rs new file mode 100644 index 0000000..c445d04 --- /dev/null +++ b/snapshot/src/storage.rs @@ -0,0 +1,176 @@ +use format; +use snap::Builder; +use snap::Delta; +use snap::Snap; +use snap; +use std::collections::VecDeque; +use warn::Warn; +use warn::wrap; + +// TODO: Separate server storage from client storage. +// TODO: Delete snapshots over time. + +#[derive(Clone)] +struct StoredSnap { + snap: Snap, + tick: i32, +} + +const MAX_STORED_SNAPSHOT: usize = 100; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct UnknownSnap; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct WeirdNegativeDeltaTick; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Error { + OldDelta, + UnknownSnap, + InvalidCrc, + Unpack(snap::Error), +} + +impl From<snap::Error> for Error { + fn from(err: snap::Error) -> Error { + Error::Unpack(err) + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Warning { + WeirdNegativeDeltaTick, + Unpack(format::Warning), +} + +impl From<format::Warning> for Warning { + fn from(w: format::Warning) -> Warning { + Warning::Unpack(w) + } +} + +#[derive(Clone, Default)] +pub struct Storage { + /// Queue that stores received snaps. + /// + /// The newest elements are in the front. + snaps: VecDeque<StoredSnap>, + free: Vec<Snap>, + ack_tick: Option<i32>, + delta: Delta, + delta_tick: Option<i32>, +} + +impl Storage { + pub fn new() -> Storage { + Default::default() + } + pub fn reset(&mut self) { + let self_free = &mut self.free; + // FIXME: Replace with something like `exhaust`. + self.snaps.drain(..).map(|s| self_free.push(s.snap)).count(); + self.ack_tick = None; + } + pub fn ack_tick(&self) -> Option<i32> { + self.ack_tick + } + pub fn add_delta<W>(&mut self, warn: &mut W, crc: Option<i32>, delta_tick: i32, tick: i32, delta: &Delta) + -> Result<&Snap, Error> + where W: Warn<Warning>, + { + if self.snaps.front().map(|s| s.tick).unwrap_or(-1) >= tick { + return Err(Error::OldDelta); + } + { + let empty = Snap::empty(); + let delta_snap; + if delta_tick >= 0 { + if let Some(i) = self.snaps.iter().position(|s| s.tick < delta_tick) { + let self_free = &mut self.free; + // FIXME: Replace with something like `exhaust`. + self.snaps.drain(i..).map(|s| self_free.push(s.snap)).count(); + } + if let Some(d) = self.snaps.back() { + if d.tick == delta_tick { + delta_snap = &d.snap; + } else { + self.ack_tick = None; + return Err(Error::UnknownSnap); + } + } else { + self.ack_tick = None; + return Err(Error::UnknownSnap); + } + } else { + delta_snap = ∅ + if delta_tick != -1 { + warn.warn(Warning::WeirdNegativeDeltaTick); + } + } + if self.free.is_empty() { + self.free.push(Snap::empty()); + } + + let new_snap: &mut Snap = self.free.last_mut().unwrap(); + new_snap.read_with_delta(wrap(warn), delta_snap, delta)?; + if crc.map(|crc| crc != new_snap.crc()).unwrap_or(false) { + self.ack_tick = None; + return Err(Error::InvalidCrc); + } + self.ack_tick = Some(tick); + } + self.snaps.push_front(StoredSnap { + tick: tick, + snap: self.free.pop().unwrap(), + }); + if self.snaps.len() > MAX_STORED_SNAPSHOT { + self.free.push(self.snaps.pop_back().unwrap().snap); + } + Ok(&self.snaps.front().unwrap().snap) + } + pub fn new_builder(&mut self) -> Builder { + self.free.pop().unwrap_or_default().recycle() + } + pub fn set_delta_tick<W>(&mut self, warn: &mut W, tick: i32) + -> Result<(), UnknownSnap> + where W: Warn<WeirdNegativeDeltaTick>, + { + if tick < 0 { + if tick != -1 { + warn.warn(WeirdNegativeDeltaTick); + } + self.delta_tick = None; + return Ok(()); + } + if let Some(i) = self.snaps.iter().position(|s| s.tick < tick) { + let self_free = &mut self.free; + // FIXME: Replace with something like `exhaust`. + self.snaps.drain(i..).map(|s| self_free.push(s.snap)).count(); + } + if !self.snaps.back().map(|s| s.tick == tick).unwrap_or(false) { + self.delta_tick = None; + return Err(UnknownSnap); + } + self.delta_tick = Some(tick); + Ok(()) + } + pub fn delta_tick(&self) -> Option<i32> { + self.delta_tick + } + pub fn add_snap(&mut self, tick: i32, snap: Snap) -> &Delta { + self.snaps.push_front(StoredSnap { + snap: snap, + tick: tick, + }); + let tmp; + let delta_snap = if self.delta_tick.is_some() { + &self.snaps.back().unwrap().snap + } else { + tmp = Snap::empty(); + &tmp + }; + self.delta.create(delta_snap, &self.snaps.front().unwrap().snap); + &self.delta + } +} |