aboutsummaryrefslogtreecommitdiff
path: root/snapshot
diff options
context:
space:
mode:
Diffstat (limited to 'snapshot')
-rw-r--r--snapshot/Cargo.toml27
-rw-r--r--snapshot/src/format.rs85
-rw-r--r--snapshot/src/lib.rs36
-rw-r--r--snapshot/src/manager.rs140
-rw-r--r--snapshot/src/receiver.rs245
-rw-r--r--snapshot/src/snap.rs511
-rw-r--r--snapshot/src/storage.rs176
7 files changed, 1220 insertions, 0 deletions
diff --git a/snapshot/Cargo.toml b/snapshot/Cargo.toml
new file mode 100644
index 0000000..165ba3c
--- /dev/null
+++ b/snapshot/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "snapshot"
+version = "0.0.1"
+authors = [
+ "metamuffin <metamuffin@disroot.org>",
+ "heinrich5991 <heinrich5991@gmail.com>",
+]
+license = "AGPL-3.0-only"
+
+[dependencies]
+buffer = "0.1.9"
+common = { git = "https://github.com/heinrich5991/libtw2" }
+gamenet_teeworlds_0_5 = { git = "https://github.com/heinrich5991/libtw2", optional = true }
+gamenet_teeworlds_0_6 = { git = "https://github.com/heinrich5991/libtw2", optional = true }
+gamenet_teeworlds_0_7 = { git = "https://github.com/heinrich5991/libtw2", optional = true }
+gamenet_ddnet = { git = "https://github.com/heinrich5991/libtw2", optional = true }
+packer = { git = "https://github.com/heinrich5991/libtw2" }
+vec_map = "0.8.0"
+warn = ">=0.1.1,<0.3.0"
+
+
+[features]
+default = ["gamenet_ddnet"]
+gamenet_tw_0_5 = ["gamenet_teeworlds_0_5"]
+gamenet_tw_0_6 = ["gamenet_teeworlds_0_6"]
+gamenet_tw_0_7 = ["gamenet_teeworlds_0_7"]
+gamenet_ddnet_0_6 = ["gamenet_ddnet"]
diff --git a/snapshot/src/format.rs b/snapshot/src/format.rs
new file mode 100644
index 0000000..1a22938
--- /dev/null
+++ b/snapshot/src/format.rs
@@ -0,0 +1,85 @@
+use buffer::CapacityError;
+use packer::Packer;
+use packer::Unpacker;
+use packer;
+use snap::Error;
+use warn::Warn;
+use warn::wrap;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Warning {
+ Packer(packer::Warning),
+ NonZeroPadding,
+ DuplicateDelete,
+ DuplicateUpdate,
+ UnknownDelete,
+ DeleteUpdate,
+ NumUpdatedItems,
+}
+
+impl From<packer::Warning> for Warning {
+ fn from(w: packer::Warning) -> Warning {
+ Warning::Packer(w)
+ }
+}
+
+pub fn key_to_type_id(key: i32) -> u16 {
+ ((key as u32 >> 16) & 0xffff) as u16
+}
+
+pub fn key_to_id(key: i32) -> u16 {
+ ((key as u32) & 0xffff) as u16
+}
+
+pub fn key(type_id: u16, id: u16) -> i32 {
+ (((type_id as u32) << 16) | (id as u32)) as i32
+}
+
+#[derive(Clone, Copy, Debug)]
+pub struct Item<'a> {
+ pub type_id: u16,
+ pub id: u16,
+ pub data: &'a [i32],
+}
+
+impl<'a> Item<'a> {
+ pub fn from_key(key: i32, data: &[i32]) -> Item {
+ Item {
+ type_id: key_to_type_id(key),
+ id: key_to_id(key),
+ data: data,
+ }
+ }
+ pub fn key(&self) -> i32 {
+ key(self.type_id, self.id)
+ }
+}
+
+#[derive(Clone, Copy, Debug)]
+pub struct DeltaHeader {
+ pub num_deleted_items: i32,
+ pub num_updated_items: i32,
+}
+
+impl DeltaHeader {
+ pub fn decode<W: Warn<Warning>>(warn: &mut W, p: &mut Unpacker)
+ -> Result<DeltaHeader, Error>
+ {
+ let result = DeltaHeader {
+ num_deleted_items: packer::positive(p.read_int(wrap(warn))?)?,
+ num_updated_items: packer::positive(p.read_int(wrap(warn))?)?,
+ };
+ if p.read_int(wrap(warn))? != 0 {
+ warn.warn(Warning::NonZeroPadding);
+ }
+ Ok(result)
+ }
+ pub fn encode<'d, 's>(&self, mut p: Packer<'d, 's>)
+ -> Result<&'d [u8], CapacityError>
+ {
+ p.write_int(self.num_deleted_items)?;
+ p.write_int(self.num_updated_items)?;
+ p.write_int(0)?;
+ Ok(p.written())
+ }
+}
diff --git a/snapshot/src/lib.rs b/snapshot/src/lib.rs
new file mode 100644
index 0000000..992d898
--- /dev/null
+++ b/snapshot/src/lib.rs
@@ -0,0 +1,36 @@
+
+#[cfg(feature = "gamenet_ddnet")]
+extern crate gamenet_ddnet as gamenet;
+#[cfg(feature = "gamenet_0_5")]
+extern crate gamenet_teeworlds_0_5 as gamenet;
+#[cfg(feature = "gamenet_0_6")]
+extern crate gamenet_teeworlds_0_6 as gamenet;
+#[cfg(feature = "gamenet_0_7")]
+extern crate gamenet_teeworlds_0_7 as gamenet;
+
+extern crate buffer;
+extern crate common;
+extern crate packer;
+extern crate vec_map;
+extern crate warn;
+
+pub mod format;
+pub mod manager;
+pub mod receiver;
+pub mod snap;
+pub mod storage;
+
+pub use manager::Manager;
+pub use receiver::DeltaReceiver;
+pub use receiver::ReceivedDelta;
+pub use snap::Delta;
+pub use snap::DeltaReader;
+pub use snap::Snap;
+pub use storage::Storage;
+
+use common::num::Cast;
+use std::ops;
+
+fn to_usize(r: ops::Range<u32>) -> ops::Range<usize> {
+ r.start.usize()..r.end.usize()
+}
diff --git a/snapshot/src/manager.rs b/snapshot/src/manager.rs
new file mode 100644
index 0000000..65d0c72
--- /dev/null
+++ b/snapshot/src/manager.rs
@@ -0,0 +1,140 @@
+use Delta;
+use DeltaReader;
+use DeltaReceiver;
+use ReceivedDelta;
+use Snap;
+use Storage;
+use format;
+use gamenet::msg::system;
+use packer::Unpacker;
+use receiver;
+use snap;
+use storage;
+use warn::Warn;
+use warn::wrap;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Error {
+ Receiver(receiver::Error),
+ Snap(snap::Error),
+ Storage(storage::Error),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Warning {
+ Receiver(receiver::Warning),
+ Snap(format::Warning),
+ Storage(storage::Warning),
+}
+
+impl From<receiver::Error> for Error {
+ fn from(err: receiver::Error) -> Error {
+ Error::Receiver(err)
+ }
+}
+
+impl From<snap::Error> for Error {
+ fn from(err: snap::Error) -> Error {
+ Error::Snap(err)
+ }
+}
+
+impl From<storage::Error> for Error {
+ fn from(err: storage::Error) -> Error {
+ Error::Storage(err)
+ }
+}
+
+impl From<receiver::Warning> for Warning {
+ fn from(w: receiver::Warning) -> Warning {
+ Warning::Receiver(w)
+ }
+}
+
+impl From<format::Warning> for Warning {
+ fn from(w: format::Warning) -> Warning {
+ Warning::Snap(w)
+ }
+}
+
+impl From<storage::Warning> for Warning {
+ fn from(w: storage::Warning) -> Warning {
+ Warning::Storage(w)
+ }
+}
+
+#[derive(Clone, Default)]
+struct ManagerInner {
+ temp_delta: Delta,
+ reader: DeltaReader,
+ storage: Storage,
+}
+
+#[derive(Clone, Default)]
+pub struct Manager {
+ inner: ManagerInner,
+ receiver: DeltaReceiver,
+}
+
+impl Manager {
+ pub fn new() -> Manager {
+ Default::default()
+ }
+ pub fn reset(&mut self) {
+ self.inner.storage.reset();
+ self.receiver.reset();
+ }
+ pub fn ack_tick(&self) -> Option<i32> {
+ self.inner.storage.ack_tick()
+ }
+ pub fn snap_empty<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::SnapEmpty)
+ -> Result<Option<&Snap>, Error>
+ where W: Warn<Warning>,
+ O: FnMut(u16) -> Option<u32>,
+ {
+ let res = self.receiver.snap_empty(wrap(warn), snap);
+ self.inner.handle_msg(warn, object_size, res)
+ }
+ pub fn snap_single<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::SnapSingle)
+ -> Result<Option<&Snap>, Error>
+ where W: Warn<Warning>,
+ O: FnMut(u16) -> Option<u32>,
+ {
+ let res = self.receiver.snap_single(wrap(warn), snap);
+ self.inner.handle_msg(warn, object_size, res)
+ }
+ pub fn snap<W, O>(&mut self, warn: &mut W, object_size: O, snap: system::Snap)
+ -> Result<Option<&Snap>, Error>
+ where W: Warn<Warning>,
+ O: FnMut(u16) -> Option<u32>,
+ {
+ let res = self.receiver.snap(wrap(warn), snap);
+ self.inner.handle_msg(warn, object_size, res)
+ }
+}
+
+impl ManagerInner {
+ fn handle_msg<W, O>(&mut self, warn: &mut W, object_size: O, res: Result<Option<ReceivedDelta>, receiver::Error>)
+ -> Result<Option<&Snap>, Error>
+ where W: Warn<Warning>,
+ O: FnMut(u16) -> Option<u32>,
+ {
+ Ok(match res? {
+ Some(delta) => Some(self.add_delta(warn, object_size, delta)?),
+ None => None,
+ })
+ }
+ fn add_delta<W, O>(&mut self, warn: &mut W, object_size: O, delta: ReceivedDelta)
+ -> Result<&Snap, Error>
+ where W: Warn<Warning>,
+ O: FnMut(u16) -> Option<u32>,
+ {
+ let crc = delta.data_and_crc.map(|d| d.1);
+ if let Some((data, _)) = delta.data_and_crc {
+ self.reader.read(wrap(warn), &mut self.temp_delta, object_size, &mut Unpacker::new(data))?;
+ } else {
+ self.temp_delta.clear();
+ }
+ Ok(self.storage.add_delta(wrap(warn), crc, delta.delta_tick, delta.tick, &self.temp_delta)?)
+ }
+}
diff --git a/snapshot/src/receiver.rs b/snapshot/src/receiver.rs
new file mode 100644
index 0000000..49833d1
--- /dev/null
+++ b/snapshot/src/receiver.rs
@@ -0,0 +1,245 @@
+use common::num::Cast;
+use gamenet::msg::system;
+use std::ops;
+use to_usize;
+use vec_map::VecMap;
+use warn::Warn;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Warning {
+ DuplicateSnap,
+ DifferingAttributes,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Error {
+ OldDelta,
+ InvalidNumParts,
+ InvalidPart,
+ DuplicatePart,
+}
+
+// TODO: How to handle `tick` overflowing?
+#[derive(Clone, Debug)]
+struct CurrentDelta {
+ tick: i32,
+ delta_tick: i32,
+ num_parts: i32,
+ crc: i32,
+}
+
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+pub struct ReceivedDelta<'a> {
+ pub delta_tick: i32,
+ pub tick: i32,
+ pub data_and_crc: Option<(&'a [u8], i32)>,
+}
+
+#[derive(Clone, Default)]
+pub struct DeltaReceiver {
+ previous_tick: Option<i32>,
+ current: Option<CurrentDelta>,
+ // `parts` points into `receive_buf`.
+ parts: VecMap<ops::Range<u32>>,
+ receive_buf: Vec<u8>,
+ result: Vec<u8>,
+}
+
+impl DeltaReceiver {
+ pub fn new() -> DeltaReceiver {
+ Default::default()
+ }
+ pub fn reset(&mut self) {
+ self.previous_tick = None;
+ self.current = None;
+ }
+ fn can_receive(&self, tick: i32) -> bool {
+ self.current.as_ref().map(|c| c.tick <= tick)
+ .or(self.previous_tick.map(|t| t < tick))
+ .unwrap_or(true)
+ }
+ fn init_delta(&mut self) {
+ self.parts.clear();
+ self.receive_buf.clear();
+ self.result.clear();
+ }
+ fn finish_delta(&mut self, tick: i32) {
+ self.current = None;
+ self.previous_tick = Some(tick);
+ }
+ pub fn snap_empty<W>(&mut self, warn: &mut W, snap: system::SnapEmpty)
+ -> Result<Option<ReceivedDelta>, Error>
+ where W: Warn<Warning>,
+ {
+ if !self.can_receive(snap.tick) {
+ return Err(Error::OldDelta);
+ }
+ if self.current.as_ref().map(|c| c.tick == snap.tick).unwrap_or(false) {
+ warn.warn(Warning::DuplicateSnap);
+ }
+ self.init_delta();
+ self.finish_delta(snap.tick);
+ Ok(Some(ReceivedDelta {
+ delta_tick: snap.tick.wrapping_sub(snap.delta_tick),
+ tick: snap.tick,
+ data_and_crc: None,
+ }))
+ }
+ pub fn snap_single<W>(&mut self, warn: &mut W, snap: system::SnapSingle)
+ -> Result<Option<ReceivedDelta>, Error>
+ where W: Warn<Warning>,
+ {
+ if !self.can_receive(snap.tick) {
+ return Err(Error::OldDelta);
+ }
+ if self.current.as_ref().map(|c| c.tick == snap.tick).unwrap_or(false) {
+ warn.warn(Warning::DuplicateSnap);
+ }
+ self.init_delta();
+ self.finish_delta(snap.tick);
+ self.result.extend(snap.data);
+ Ok(Some(ReceivedDelta {
+ delta_tick: snap.tick.wrapping_sub(snap.delta_tick),
+ tick: snap.tick,
+ data_and_crc: Some((&self.result, snap.crc)),
+ }))
+ }
+ pub fn snap<W>(&mut self, warn: &mut W, snap: system::Snap)
+ -> Result<Option<ReceivedDelta>, Error>
+ where W: Warn<Warning>,
+ {
+ if !self.can_receive(snap.tick) {
+ return Err(Error::OldDelta);
+ }
+ if !(0 <= snap.num_parts && snap.num_parts <= 32) {
+ return Err(Error::InvalidNumParts);
+ }
+ if !(0 <= snap.part && snap.part < snap.num_parts) {
+ return Err(Error::InvalidPart);
+ }
+ if self.current.as_ref().map(|c| c.tick != snap.tick).unwrap_or(false) {
+ self.current = None;
+ }
+ if let None = self.current {
+ self.init_delta();
+ self.current = Some(CurrentDelta {
+ tick: snap.tick,
+ delta_tick: snap.tick.wrapping_sub(snap.delta_tick),
+ num_parts: snap.num_parts,
+ crc: snap.crc,
+ });
+
+ // Checked above.
+ let num_parts = snap.num_parts.assert_usize();
+ self.parts.reserve_len(num_parts);
+ }
+ let delta_tick;
+ let tick;
+ let crc;
+ let num_parts;
+ {
+ let current: &mut CurrentDelta = self.current.as_mut().unwrap();
+ if snap.delta_tick != current.delta_tick
+ || snap.num_parts != current.num_parts
+ || snap.crc != current.crc
+ {
+ warn.warn(Warning::DifferingAttributes);
+ }
+ delta_tick = current.delta_tick;
+ tick = current.tick;
+ crc = current.crc;
+ num_parts = current.num_parts;
+ }
+ let part = snap.part.assert_usize();
+ if self.parts.contains_key(part) {
+ return Err(Error::DuplicatePart);
+ }
+ let start = self.receive_buf.len().assert_u32();
+ let end = (self.receive_buf.len() + snap.data.len()).assert_u32();
+ self.receive_buf.extend(snap.data);
+ assert!(self.parts.insert(part, start..end).is_none());
+
+ if self.parts.len().assert_i32() != num_parts {
+ return Ok(None);
+ }
+
+ self.finish_delta(tick);
+ self.result.reserve(self.receive_buf.len());
+ for range in self.parts.values() {
+ self.result.extend(&self.receive_buf[to_usize(range.clone())]);
+ }
+
+ Ok(Some(ReceivedDelta {
+ delta_tick: delta_tick,
+ tick: tick,
+ data_and_crc: Some((&self.result, crc)),
+ }))
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use common::num::Cast;
+ use gamenet::msg::system::Snap;
+ use gamenet::msg::system::SnapEmpty;
+ use gamenet::msg::system::SnapSingle;
+ use super::DeltaReceiver;
+ use super::Error;
+ use super::ReceivedDelta;
+ use warn::Panic;
+
+ #[test]
+ fn old() {
+ let mut receiver = DeltaReceiver::new();
+ {
+ let result = receiver.snap_empty(&mut Panic, SnapEmpty {
+ tick: 1,
+ delta_tick: 2,
+ }).unwrap();
+
+ assert_eq!(result, Some(ReceivedDelta {
+ delta_tick: -1,
+ tick: 1,
+ data_and_crc: None,
+ }));
+ }
+
+ assert_eq!(receiver.snap_single(&mut Panic, SnapSingle {
+ tick: 0,
+ delta_tick: 0,
+ data: b"123",
+ crc: 0,
+ }).unwrap_err(), Error::OldDelta);
+ }
+
+ #[test]
+ fn reorder() {
+ let mut receiver = DeltaReceiver::new();
+ let chunks: &[(i32, &[u8])] = &[
+ (3, b"3"),
+ (2, b"2"),
+ (4, b"4_"),
+ (1, b"1__"),
+ (0, b"0"),
+ ];
+ for &(i, c) in chunks {
+ let result = receiver.snap(&mut Panic, Snap {
+ tick: 2,
+ delta_tick: 1,
+ num_parts: chunks.len().assert_i32(),
+ part: i,
+ crc: 3,
+ data: c,
+ }).unwrap();
+ if i != 0 {
+ assert_eq!(result, None);
+ } else {
+ assert_eq!(result, Some(ReceivedDelta {
+ delta_tick: 1,
+ tick: 2,
+ data_and_crc: Some((b"01__234_", 3)),
+ }));
+ }
+ }
+ }
+}
diff --git a/snapshot/src/snap.rs b/snapshot/src/snap.rs
new file mode 100644
index 0000000..7b9768f
--- /dev/null
+++ b/snapshot/src/snap.rs
@@ -0,0 +1,511 @@
+use buffer::CapacityError;
+use common::num::Cast;
+use format::DeltaHeader;
+use format::Item;
+use format::Warning;
+use format::key;
+use format::key_to_id;
+use format::key_to_type_id;
+use gamenet::enums::MAX_SNAPSHOT_PACKSIZE;
+use gamenet::msg::system;
+use packer::Packer;
+use packer::Unpacker;
+use packer::with_packer;
+use packer;
+use std::cmp;
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::collections::hash_map;
+use std::fmt;
+use std::iter;
+use std::mem;
+use std::ops;
+use to_usize;
+use warn::Warn;
+use warn::wrap;
+
+// TODO: Actually obey this the same way as Teeworlds does.
+pub const MAX_SNAPSHOT_SIZE: usize = 64 * 1024; // 64 KB
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Error {
+ UnexpectedEnd,
+ IntOutOfRange,
+ DeletedItemsUnpacking,
+ ItemDiffsUnpacking,
+ TypeIdRange,
+ IdRange,
+ NegativeSize,
+ TooLongDiff,
+ TooLongSnap,
+ DeltaDifferingSizes,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum BuilderError {
+ DuplicateKey,
+ TooLongSnap,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+struct TooLongSnap;
+
+impl From<TooLongSnap> for Error {
+ fn from(_: TooLongSnap) -> Error {
+ Error::TooLongSnap
+ }
+}
+
+impl From<TooLongSnap> for BuilderError {
+ fn from(_: TooLongSnap) -> BuilderError {
+ BuilderError::TooLongSnap
+ }
+}
+
+impl From<packer::IntOutOfRange> for Error {
+ fn from(_: packer::IntOutOfRange) -> Error {
+ Error::IntOutOfRange
+ }
+}
+
+impl From<packer::UnexpectedEnd> for Error {
+ fn from(_: packer::UnexpectedEnd) -> Error {
+ Error::UnexpectedEnd
+ }
+}
+
+fn apply_delta(in_: Option<&[i32]>, delta: &[i32], out: &mut [i32])
+ -> Result<(), Error>
+{
+ assert!(delta.len() == out.len());
+ match in_ {
+ Some(in_) => {
+ if in_.len() != out.len() {
+ return Err(Error::DeltaDifferingSizes);
+ }
+ for i in 0..out.len() {
+ out[i] = in_[i].wrapping_add(delta[i]);
+ }
+ }
+ None => out.copy_from_slice(delta),
+ }
+ Ok(())
+}
+
+fn create_delta(from: Option<&[i32]>, to: &[i32], out: &mut [i32]) {
+ assert!(to.len() == out.len());
+ match from {
+ Some(from) => {
+ assert!(from.len() == to.len());
+ for i in 0..out.len() {
+ out[i] = to[i].wrapping_sub(from[i]);
+ }
+ },
+ None => out.copy_from_slice(to),
+ }
+}
+
+// TODO: Select a faster hasher?
+#[derive(Clone, Default)]
+pub struct Snap {
+ offsets: HashMap<i32, ops::Range<u32>>,
+ buf: Vec<i32>,
+}
+
+impl Snap {
+ pub fn empty() -> Snap {
+ Default::default()
+ }
+ fn clear(&mut self) {
+ self.offsets.clear();
+ self.buf.clear();
+ }
+ fn item_from_offset(&self, offset: ops::Range<u32>) -> &[i32] {
+ &self.buf[to_usize(offset)]
+ }
+ pub fn item(&self, type_id: u16, id: u16) -> Option<&[i32]> {
+ self.offsets.get(&key(type_id, id)).map(|o| &self.buf[to_usize(o.clone())])
+ }
+ pub fn items(&self) -> Items {
+ Items {
+ snap: self,
+ iter: self.offsets.iter(),
+ }
+ }
+ fn prepare_item_vacant<'a>(entry: hash_map::VacantEntry<'a, i32, ops::Range<u32>>, buf: &mut Vec<i32>, size: usize)
+ -> Result<&'a mut ops::Range<u32>, TooLongSnap>
+ {
+ let offset = buf.len();
+ if offset + size > MAX_SNAPSHOT_SIZE {
+ return Err(TooLongSnap);
+ }
+ let start = offset.assert_u32();
+ let end = (offset + size).assert_u32();
+ buf.extend(iter::repeat(0).take(size));
+ Ok(entry.insert(start..end))
+ }
+ fn prepare_item(&mut self, type_id: u16, id: u16, size: usize)
+ -> Result<&mut [i32], Error>
+ {
+ let offset = match self.offsets.entry(key(type_id, id)) {
+ hash_map::Entry::Occupied(o) => o.into_mut(),
+ hash_map::Entry::Vacant(v) => Snap::prepare_item_vacant(v, &mut self.buf, size)?,
+ }.clone();
+ Ok(&mut self.buf[to_usize(offset)])
+ }
+ pub fn read_with_delta<W>(&mut self, warn: &mut W, from: &Snap, delta: &Delta)
+ -> Result<(), Error>
+ where W: Warn<Warning>,
+ {
+ self.clear();
+
+ let mut num_deletions = 0;
+ for item in from.items() {
+ if !delta.deleted_items.contains(&item.key()) {
+ let out = self.prepare_item(item.type_id, item.id, item.data.len())?;
+ out.copy_from_slice(item.data);
+ } else {
+ num_deletions += 1;
+ }
+ }
+ if num_deletions != delta.deleted_items.len() {
+ warn.warn(Warning::UnknownDelete);
+ }
+
+ for (&key, offset) in &delta.updated_items {
+ let type_id = key_to_type_id(key);
+ let id = key_to_id(key);
+ let diff = &delta.buf[to_usize(offset.clone())];
+ let out = self.prepare_item(type_id, id, diff.len())?;
+ let in_ = from.item(type_id, id);
+
+ apply_delta(in_, diff, out)?;
+ }
+ Ok(())
+ }
+ pub fn write<'d, 's>(&self, buf: &mut Vec<i32>, mut p: Packer<'d, 's>)
+ -> Result<&'d [u8], CapacityError>
+ {
+ let keys = buf;
+ keys.clear();
+ keys.extend(self.offsets.keys().cloned());
+ keys.sort_unstable_by_key(|&k| k as u32);
+ let data_size = self.buf.len()
+ .checked_add(self.offsets.len()).expect("snap size overflow")
+ .checked_mul(mem::size_of::<i32>()).expect("snap size overflow")
+ .assert_i32();
+ p.write_int(data_size)?;
+ let num_items = self.offsets.len().assert_i32();
+ p.write_int(num_items)?;
+
+ let mut offset = 0;
+ for &key in &*keys {
+ p.write_int(offset)?;
+ let key_offset = self.offsets[&key].clone();
+ offset = offset
+ .checked_add((key_offset.end - key_offset.start + 1).usize()
+ .checked_mul(mem::size_of::<i32>())
+ .expect("item size overflow").assert_i32())
+ .expect("offset overflow");
+ }
+ for &key in &*keys {
+ p.write_int(key)?;
+ for &i in &self.buf[to_usize(self.offsets[&key].clone())] {
+ p.write_int(i)?;
+ }
+ }
+ Ok(p.written())
+ }
+ pub fn crc(&self) -> i32 {
+ self.buf.iter().fold(0, |s, &a| s.wrapping_add(a))
+ }
+ pub fn recycle(mut self) -> Builder {
+ self.clear();
+ Builder {
+ snap: self,
+ }
+ }
+}
+
+pub struct Items<'a> {
+ snap: &'a Snap,
+ iter: hash_map::Iter<'a, i32, ops::Range<u32>>,
+}
+
+impl<'a> Iterator for Items<'a> {
+ type Item = Item<'a>;
+ fn next(&mut self) -> Option<Item<'a>> {
+ self.iter.next().map(|(&k, o)| {
+ Item::from_key(k, self.snap.item_from_offset(o.clone()))
+ })
+ }
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+}
+
+impl<'a> ExactSizeIterator for Items<'a> {
+ fn len(&self) -> usize {
+ self.iter.len()
+ }
+}
+
+impl fmt::Debug for Snap {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_map().entries(self.items().map(
+ |Item { type_id, id, data }| ((type_id, id), data)
+ )).finish()
+ }
+}
+
+#[derive(Clone, Default)]
+pub struct Delta {
+ deleted_items: HashSet<i32>,
+ updated_items: HashMap<i32, ops::Range<u32>>,
+ buf: Vec<i32>,
+}
+
+impl Delta {
+ pub fn new() -> Delta {
+ Default::default()
+ }
+ pub fn clear(&mut self) {
+ self.deleted_items.clear();
+ self.updated_items.clear();
+ self.buf.clear();
+ }
+ fn prepare_update_item(&mut self, type_id: u16, id: u16, size: usize) -> &mut [i32] {
+ let key = key(type_id, id);
+
+ let offset = self.buf.len();
+ let start = offset.assert_u32();
+ let end = (offset + size).assert_u32();
+ self.buf.extend(iter::repeat(0).take(size));
+ assert!(self.updated_items.insert(key, start..end).is_none());
+ &mut self.buf[to_usize(start..end)]
+ }
+ pub fn create(&mut self, from: &Snap, to: &Snap) {
+ self.clear();
+ for Item { type_id, id, .. } in from.items() {
+ if to.item(type_id, id).is_none() {
+ assert!(self.deleted_items.insert(key(type_id, id)));
+ }
+ }
+ for Item { type_id, id, data } in to.items() {
+ let from_data = from.item(type_id, id);
+ let out_delta = self.prepare_update_item(type_id, id, data.len());
+ create_delta(from_data, data, out_delta);
+ }
+ }
+ pub fn write<'d, 's, O>(&self, object_size: O, mut p: Packer<'d, 's>)
+ -> Result<&'d [u8], CapacityError>
+ where O: FnMut(u16) -> Option<u32>,
+ {
+ let mut object_size = object_size;
+ with_packer(&mut p, |p| DeltaHeader {
+ num_deleted_items: self.deleted_items.len().assert_i32(),
+ num_updated_items: self.updated_items.len().assert_i32()
+ }.encode(p))?;
+ for &key in &self.deleted_items {
+ p.write_int(key)?;
+ }
+ for (&key, range) in &self.updated_items {
+ let data = &self.buf[to_usize(range.clone())];
+ let type_id = key_to_type_id(key);
+ let id = key_to_id(key);
+ p.write_int(type_id.i32())?;
+ p.write_int(id.i32())?;
+ match object_size(type_id) {
+ Some(size) => assert!(size.usize() == data.len()),
+ None => p.write_int(data.len().assert_i32())?,
+ }
+ for &d in data {
+ p.write_int(d)?;
+ }
+ }
+ Ok(p.written())
+ }
+}
+
+#[derive(Clone, Default)]
+pub struct DeltaReader {
+ buf: Vec<i32>,
+}
+
+impl DeltaReader {
+ pub fn new() -> DeltaReader {
+ Default::default()
+ }
+ fn clear(&mut self) {
+ self.buf.clear();
+ }
+ pub fn read<W, O>(&mut self, warn: &mut W, delta: &mut Delta, object_size: O, p: &mut Unpacker)
+ -> Result<(), Error>
+ where W: Warn<Warning>,
+ O: FnMut(u16) -> Option<u32>,
+ {
+ delta.clear();
+ self.clear();
+
+ let mut object_size = object_size;
+
+ let header = DeltaHeader::decode(warn, p)?;
+ while !p.as_slice().is_empty() {
+ self.buf.push(p.read_int(wrap(warn))?);
+ }
+ let split = header.num_deleted_items.assert_usize();
+ if split > self.buf.len() {
+ return Err(Error::DeletedItemsUnpacking);
+ }
+ let (deleted_items, buf) = self.buf.split_at(split);
+ delta.deleted_items.extend(deleted_items);
+ if deleted_items.len() != delta.deleted_items.len() {
+ warn.warn(Warning::DuplicateDelete);
+ }
+
+ let mut num_updates = 0;
+ let mut buf = buf.iter();
+ // FIXME: Use `is_empty`.
+ while buf.len() != 0 {
+ let type_id = buf.next().ok_or(Error::ItemDiffsUnpacking)?;
+ let id = buf.next().ok_or(Error::ItemDiffsUnpacking)?;
+
+ let type_id = type_id.try_u16().ok_or(Error::TypeIdRange)?;
+ let id = id.try_u16().ok_or(Error::IdRange)?;
+
+ let size = match object_size(type_id) {
+ Some(s) => s.usize(),
+ None => {
+ let s = buf.next().ok_or(Error::ItemDiffsUnpacking)?;
+ s.try_usize().ok_or(Error::NegativeSize)?
+ }
+ };
+
+ if size > buf.len() {
+ return Err(Error::ItemDiffsUnpacking);
+ }
+ let (data, b) = buf.as_slice().split_at(size);
+ buf = b.iter();
+
+ let offset = delta.buf.len();
+ let start = offset.try_u32().ok_or(Error::TooLongDiff)?;
+ let end = (offset + data.len()).try_u32().ok_or(Error::TooLongDiff)?;
+ delta.buf.extend(data.iter());
+
+ // In case of conflict, take later update (as the original code does).
+ if delta.updated_items.insert(key(type_id, id), start..end).is_some() {
+ warn.warn(Warning::DuplicateUpdate);
+ }
+
+ if delta.deleted_items.contains(&key(type_id, id)) {
+ warn.warn(Warning::DeleteUpdate);
+ }
+ num_updates += 1;
+ }
+
+ if num_updates != header.num_updated_items {
+ warn.warn(Warning::NumUpdatedItems);
+ }
+
+ Ok(())
+ }
+}
+
+#[derive(Default)]
+pub struct Builder {
+ snap: Snap,
+}
+
+impl Builder {
+ pub fn new() -> Builder {
+ Default::default()
+ }
+ pub fn add_item(&mut self, type_id: u16, id: u16, data: &[i32])
+ -> Result<(), BuilderError>
+ {
+ let offset = match self.snap.offsets.entry(key(type_id, id)) {
+ hash_map::Entry::Occupied(..) => return Err(BuilderError::DuplicateKey),
+ hash_map::Entry::Vacant(v) => {
+ Snap::prepare_item_vacant(v, &mut self.snap.buf, data.len())?
+ }
+ }.clone();
+ self.snap.buf[to_usize(offset)].copy_from_slice(data);
+ Ok(())
+ }
+ pub fn finish(self) -> Snap {
+ self.snap
+ }
+}
+
+pub fn delta_chunks(tick: i32, delta_tick: i32, data: &[u8], crc: i32) -> DeltaChunks {
+ DeltaChunks {
+ tick: tick,
+ delta_tick: tick - delta_tick,
+ crc: crc,
+ cur_part: if !data.is_empty() { 0 } else { -1 },
+ num_parts: ((data.len() + MAX_SNAPSHOT_PACKSIZE as usize - 1) / MAX_SNAPSHOT_PACKSIZE as usize).assert_i32(),
+ data: data,
+ }
+}
+
+impl<'a> Into<system::System<'a>> for SnapMsg<'a> {
+ fn into(self) -> system::System<'a> {
+ match self {
+ SnapMsg::Snap(s) => system::System::Snap(s),
+ SnapMsg::SnapEmpty(s) => system::System::SnapEmpty(s),
+ SnapMsg::SnapSingle(s) => system::System::SnapSingle(s),
+ }
+ }
+}
+
+#[derive(Clone, Copy)]
+pub enum SnapMsg<'a> {
+ Snap(system::Snap<'a>),
+ SnapEmpty(system::SnapEmpty),
+ SnapSingle(system::SnapSingle<'a>),
+}
+
+pub struct DeltaChunks<'a> {
+ tick: i32,
+ delta_tick: i32,
+ crc: i32,
+ cur_part: i32,
+ num_parts: i32,
+ data: &'a [u8],
+}
+
+impl<'a> Iterator for DeltaChunks<'a> {
+ type Item = SnapMsg<'a>;
+ fn next(&mut self) -> Option<SnapMsg<'a>> {
+ if self.cur_part == self.num_parts {
+ return None;
+ }
+ let result = if self.num_parts == 0 {
+ SnapMsg::SnapEmpty(system::SnapEmpty {
+ tick: self.tick,
+ delta_tick: self.delta_tick,
+ })
+ } else if self.num_parts == 1 {
+ SnapMsg::SnapSingle(system::SnapSingle {
+ tick: self.tick,
+ delta_tick: self.delta_tick,
+ crc: self.crc,
+ data: self.data,
+ })
+ } else {
+ let index = self.cur_part.assert_usize();
+ let start = MAX_SNAPSHOT_PACKSIZE as usize * index;
+ let end = cmp::min(MAX_SNAPSHOT_PACKSIZE as usize * (index + 1), self.data.len());
+ SnapMsg::Snap(system::Snap {
+ tick: self.tick,
+ delta_tick: self.delta_tick,
+ num_parts: self.num_parts,
+ part: self.cur_part,
+ crc: self.crc,
+ data: &self.data[start..end],
+ })
+ };
+ self.cur_part += 1;
+ Some(result)
+ }
+}
diff --git a/snapshot/src/storage.rs b/snapshot/src/storage.rs
new file mode 100644
index 0000000..c445d04
--- /dev/null
+++ b/snapshot/src/storage.rs
@@ -0,0 +1,176 @@
+use format;
+use snap::Builder;
+use snap::Delta;
+use snap::Snap;
+use snap;
+use std::collections::VecDeque;
+use warn::Warn;
+use warn::wrap;
+
+// TODO: Separate server storage from client storage.
+// TODO: Delete snapshots over time.
+
+#[derive(Clone)]
+struct StoredSnap {
+ snap: Snap,
+ tick: i32,
+}
+
+const MAX_STORED_SNAPSHOT: usize = 100;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct UnknownSnap;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct WeirdNegativeDeltaTick;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Error {
+ OldDelta,
+ UnknownSnap,
+ InvalidCrc,
+ Unpack(snap::Error),
+}
+
+impl From<snap::Error> for Error {
+ fn from(err: snap::Error) -> Error {
+ Error::Unpack(err)
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Warning {
+ WeirdNegativeDeltaTick,
+ Unpack(format::Warning),
+}
+
+impl From<format::Warning> for Warning {
+ fn from(w: format::Warning) -> Warning {
+ Warning::Unpack(w)
+ }
+}
+
+#[derive(Clone, Default)]
+pub struct Storage {
+ /// Queue that stores received snaps.
+ ///
+ /// The newest elements are in the front.
+ snaps: VecDeque<StoredSnap>,
+ free: Vec<Snap>,
+ ack_tick: Option<i32>,
+ delta: Delta,
+ delta_tick: Option<i32>,
+}
+
+impl Storage {
+ pub fn new() -> Storage {
+ Default::default()
+ }
+ pub fn reset(&mut self) {
+ let self_free = &mut self.free;
+ // FIXME: Replace with something like `exhaust`.
+ self.snaps.drain(..).map(|s| self_free.push(s.snap)).count();
+ self.ack_tick = None;
+ }
+ pub fn ack_tick(&self) -> Option<i32> {
+ self.ack_tick
+ }
+ pub fn add_delta<W>(&mut self, warn: &mut W, crc: Option<i32>, delta_tick: i32, tick: i32, delta: &Delta)
+ -> Result<&Snap, Error>
+ where W: Warn<Warning>,
+ {
+ if self.snaps.front().map(|s| s.tick).unwrap_or(-1) >= tick {
+ return Err(Error::OldDelta);
+ }
+ {
+ let empty = Snap::empty();
+ let delta_snap;
+ if delta_tick >= 0 {
+ if let Some(i) = self.snaps.iter().position(|s| s.tick < delta_tick) {
+ let self_free = &mut self.free;
+ // FIXME: Replace with something like `exhaust`.
+ self.snaps.drain(i..).map(|s| self_free.push(s.snap)).count();
+ }
+ if let Some(d) = self.snaps.back() {
+ if d.tick == delta_tick {
+ delta_snap = &d.snap;
+ } else {
+ self.ack_tick = None;
+ return Err(Error::UnknownSnap);
+ }
+ } else {
+ self.ack_tick = None;
+ return Err(Error::UnknownSnap);
+ }
+ } else {
+ delta_snap = &empty;
+ if delta_tick != -1 {
+ warn.warn(Warning::WeirdNegativeDeltaTick);
+ }
+ }
+ if self.free.is_empty() {
+ self.free.push(Snap::empty());
+ }
+
+ let new_snap: &mut Snap = self.free.last_mut().unwrap();
+ new_snap.read_with_delta(wrap(warn), delta_snap, delta)?;
+ if crc.map(|crc| crc != new_snap.crc()).unwrap_or(false) {
+ self.ack_tick = None;
+ return Err(Error::InvalidCrc);
+ }
+ self.ack_tick = Some(tick);
+ }
+ self.snaps.push_front(StoredSnap {
+ tick: tick,
+ snap: self.free.pop().unwrap(),
+ });
+ if self.snaps.len() > MAX_STORED_SNAPSHOT {
+ self.free.push(self.snaps.pop_back().unwrap().snap);
+ }
+ Ok(&self.snaps.front().unwrap().snap)
+ }
+ pub fn new_builder(&mut self) -> Builder {
+ self.free.pop().unwrap_or_default().recycle()
+ }
+ pub fn set_delta_tick<W>(&mut self, warn: &mut W, tick: i32)
+ -> Result<(), UnknownSnap>
+ where W: Warn<WeirdNegativeDeltaTick>,
+ {
+ if tick < 0 {
+ if tick != -1 {
+ warn.warn(WeirdNegativeDeltaTick);
+ }
+ self.delta_tick = None;
+ return Ok(());
+ }
+ if let Some(i) = self.snaps.iter().position(|s| s.tick < tick) {
+ let self_free = &mut self.free;
+ // FIXME: Replace with something like `exhaust`.
+ self.snaps.drain(i..).map(|s| self_free.push(s.snap)).count();
+ }
+ if !self.snaps.back().map(|s| s.tick == tick).unwrap_or(false) {
+ self.delta_tick = None;
+ return Err(UnknownSnap);
+ }
+ self.delta_tick = Some(tick);
+ Ok(())
+ }
+ pub fn delta_tick(&self) -> Option<i32> {
+ self.delta_tick
+ }
+ pub fn add_snap(&mut self, tick: i32, snap: Snap) -> &Delta {
+ self.snaps.push_front(StoredSnap {
+ snap: snap,
+ tick: tick,
+ });
+ let tmp;
+ let delta_snap = if self.delta_tick.is_some() {
+ &self.snaps.back().unwrap().snap
+ } else {
+ tmp = Snap::empty();
+ &tmp
+ };
+ self.delta.create(delta_snap, &self.snaps.front().unwrap().snap);
+ &self.delta
+ }
+}