summaryrefslogtreecommitdiff
path: root/crdt/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crdt/src/lib.rs')
-rw-r--r--crdt/src/lib.rs359
1 files changed, 32 insertions, 327 deletions
diff --git a/crdt/src/lib.rs b/crdt/src/lib.rs
index 50ccd20..977763f 100644
--- a/crdt/src/lib.rs
+++ b/crdt/src/lib.rs
@@ -1,76 +1,43 @@
-mod vector_clock;
-
-use std::{collections::BTreeSet, fmt::Display, mem};
+//! This crate defines the distributed state management system for
+//! Notive. The state is an operation-based CRDT, which given the same
+//! set of operations on any client, will realize the same state.
-use num_rational::Ratio;
+use thiserror::Error;
use uuid::Uuid;
-use crate::vector_clock::VectorClock;
+use crate::{
+ doc::{ApplyOpError, Doc},
+ op::{Op, OpKind},
+ vector_clock::VectorClock,
+};
-use thiserror::Error;
+mod doc;
+mod op;
+mod vector_clock;
#[derive(Error, Debug)]
pub enum Error {
- #[error("object with ID {0} not found")]
- NotFound(DerivedId),
+ #[error("error while realizing state")]
+ RealizeError(#[from] ApplyOpError),
}
-#[derive(Default, Clone)]
+#[derive(Default)]
pub struct State {
ops: Vec<Op>,
}
impl State {
- pub fn from_ops(actor_id: &Uuid, data: &[OpData]) -> Self {
- let mut clock = VectorClock::new();
-
- let ops = data
- .iter()
- .cloned()
- .map(|data| {
- clock = clock.inc(&actor_id);
-
- Op {
- id: Uuid::now_v7(),
- clock: clock.clone(),
- data,
- }
- })
- .collect();
-
- State { ops }
- }
-
- pub fn append_op(&mut self, actor_id: &Uuid, data: OpData) {
- // Increment the last clock for the provided actor
+ pub fn append_op(&mut self, actor_id: &Uuid, kind: OpKind) {
let clock = self
.ops
.last()
- .map(|Op { clock, .. }| clock.inc(actor_id))
- // For an empty document, initialize a new clock
- .unwrap_or_else(|| VectorClock::default().inc(actor_id));
+ .map(|op| op.clock.inc(actor_id))
+ .unwrap_or_else(|| VectorClock::new().inc(actor_id));
self.ops.push(Op {
id: Uuid::now_v7(),
clock,
- data,
- });
- }
-
- pub fn merge(&mut self, other: &State) {
- let op_ids: BTreeSet<Uuid> = self.ops.iter().map(|op| op.id).collect();
-
- for op in &other.ops {
- if !op_ids.contains(&op.id) {
- self.ops.push(op.clone());
- }
- }
-
- self.ops.sort_by(|op1, op2| {
- op1.clock
- .partial_cmp(&op2.clock)
- // Tie-breaker: yse op ID
- .unwrap_or_else(|| op1.id.cmp(&op2.id))
+ kind,
});
}
@@ -78,297 +45,35 @@ impl State {
let mut doc = Doc::default();
for op in &self.ops {
- op.apply(&mut doc)?;
+ doc.apply_op(op)?;
}
Ok(doc)
}
}
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Op {
- id: Uuid,
- clock: VectorClock,
- data: OpData,
-}
-
-impl Op {
- fn apply(&self, doc: &mut Doc) -> Result<(), Error> {
- match &self.data {
- OpData::CreateGrid {
- rows,
- base_cells_per_row,
- } => {
- let duration: Ratio<u32> = Ratio::new(1, *base_cells_per_row as u32);
-
- let rows = (0..*rows)
- .map(|row_idx| {
- let cells = (0..*base_cells_per_row)
- .map(|cell_idx| {
- Entry::active(Cell {
- id: self.id.derive_id("cell", cell_idx),
- duration,
- })
- })
- .collect();
-
- Entry::active(Row {
- id: self.id.derive_id("row", row_idx),
- cells,
- })
- })
- .collect();
-
- doc.grids.push(Entry::active(Grid {
- id: self.id.derive_id("grid", 0),
- rows,
- }));
- }
-
- OpData::ChangeSubdivisions {
- grid_id,
- row_id,
- start_cell_id,
- end_cell_id,
- subdivisions,
- } => {
- let grid = doc
- .grids
- .iter_mut()
- .find(|entry| entry.is_active_and(|g| g.id == *grid_id))
- .ok_or(Error::NotFound(grid_id.clone()))?;
-
- let row = grid
- .value_mut()
- .rows
- .iter_mut()
- .find(|entry| entry.is_active_and(|r| r.id == *row_id))
- .ok_or(Error::NotFound(row_id.clone()))?;
-
- let start_cell_idx = row
- .value()
- .cells
- .iter()
- .position(|entry| entry.is_active_and(|c| c.id == *start_cell_id))
- .ok_or(Error::NotFound(start_cell_id.clone()))?;
-
- let end_cell_idx = row
- .value()
- .cells
- .iter()
- .position(|entry| entry.is_active_and(|c| c.id == *end_cell_id))
- .ok_or(Error::NotFound(end_cell_id.clone()))?;
-
- let (i, j) = if start_cell_idx <= end_cell_idx {
- (start_cell_idx, end_cell_idx)
- } else {
- (end_cell_idx, start_cell_idx)
- };
-
- for entry in row.value_mut().cells[i..(j + 1)].iter_mut() {
- entry.delete(self.id);
- }
-
- let span_duration: Ratio<u32> = row.value().cells[i..j + 1]
- .iter()
- .map(|cell| cell.value().duration)
- .sum();
-
- let duration: Ratio<u32> = span_duration / *subdivisions as u32;
-
- row.value_mut().cells.splice(
- i..i,
- (0..*subdivisions).map(|subdivision_idx| {
- Entry::active(Cell {
- id: self.id.derive_id("cell", subdivision_idx),
- duration,
- })
- }),
- );
- }
- }
-
- Ok(())
- }
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum OpData {
- CreateGrid {
- rows: usize,
- base_cells_per_row: usize,
- },
-
- ChangeSubdivisions {
- grid_id: DerivedId,
- row_id: DerivedId,
- start_cell_id: DerivedId,
- end_cell_id: DerivedId,
- subdivisions: usize,
- },
-}
-
-#[derive(Default, Debug)]
-pub struct Doc {
- grids: Vec<Entry<Grid>>,
-}
-
-#[derive(Debug)]
-pub enum Entry<T: Default> {
- Active { value: T },
- Deleted { value: T, deleted_by: Uuid },
-}
-
-impl<T: Default> Default for Entry<T> {
- fn default() -> Self {
- Self::Active {
- value: T::default(),
- }
- }
-}
-
-impl<T: Default> Entry<T> {
- pub fn active(value: T) -> Self {
- Self::Active { value }
- }
-
- pub fn is_active(&self) -> bool {
- matches!(self, Self::Active { .. })
- }
-
- pub fn is_active_and(&self, f: impl FnOnce(&T) -> bool) -> bool {
- match self {
- Self::Active { value } => f(value),
- _ => false,
- }
- }
-
- pub fn value(&self) -> &T {
- match self {
- Self::Active { value } => value,
- Self::Deleted { value, .. } => value,
- }
- }
-
- pub fn value_mut(&mut self) -> &mut T {
- match self {
- Self::Active { value } => value,
- Self::Deleted { value, .. } => value,
- }
- }
-
- pub fn delete(&mut self, op_id: Uuid) {
- if matches!(self, Self::Active { .. }) {
- *self = match mem::take(self) {
- Self::Active { value } => Self::Deleted {
- value,
- deleted_by: op_id,
- },
- _ => unreachable!(),
- }
- }
- }
-}
-
-#[derive(Debug, Default)]
-pub struct Grid {
- id: DerivedId,
- rows: Vec<Entry<Row>>,
-}
-
-#[derive(Debug, Default)]
-pub struct Row {
- id: DerivedId,
- cells: Vec<Entry<Cell>>,
-}
-
-#[derive(Debug, Default)]
-pub struct Cell {
- id: DerivedId,
- duration: Ratio<u32>,
-}
-
-#[derive(PartialEq, Eq, Debug, Clone, Copy, Default)]
-pub struct DerivedId {
- // TODO These IDs can be interned on the Doc
- id: Uuid,
- tag: &'static str,
- index: usize,
-}
-
-impl Display for DerivedId {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{}:{}:{}", self.id, self.tag, self.index)
- }
-}
-
-trait DerivableId {
- fn derive_id(&self, tag: &'static str, index: usize) -> DerivedId;
-}
-
-impl DerivableId for Uuid {
- fn derive_id(&self, tag: &'static str, index: usize) -> DerivedId {
- DerivedId {
- id: self.clone(),
- tag,
- index,
- }
- }
-}
-
#[cfg(test)]
mod tests {
+ use crate::op::CreateGrid;
+
use super::*;
#[test]
fn test() {
- let actor_id = Uuid::now_v7();
+ let alice = Uuid::now_v7();
- let mut state = State::from_ops(
- &actor_id,
- &vec![OpData::CreateGrid {
+ let mut state = State::default();
+
+ state.append_op(
+ &alice,
+ OpKind::CreateGrid(CreateGrid {
rows: 4,
base_cells_per_row: 16,
- }],
+ }),
);
- {
- let doc = state.realize().unwrap();
- let grid = doc.grids[0].value();
- let row = grid.rows[0].value();
-
- assert_eq!(doc.grids.len(), 1);
- assert_eq!(grid.rows.len(), 4);
- assert_eq!(row.cells.len(), 16);
-
- state.append_op(
- &actor_id,
- OpData::ChangeSubdivisions {
- grid_id: grid.id,
- row_id: row.id,
- start_cell_id: row.cells[0].value().id,
- end_cell_id: row.cells[3].value().id,
- subdivisions: 3,
- },
- );
- }
-
- {
- let doc = state.realize().unwrap();
- let grid = doc.grids[0].value();
- let row = grid.rows[0].value();
-
- assert_eq!(doc.grids.len(), 1);
- assert_eq!(grid.rows.len(), 4);
- assert_eq!(row.cells.len(), 19);
-
- assert_eq!(
- vec![
- true, true, true, false, false, false, false, true, true, true, true, true,
- true, true, true, true, true, true, true
- ],
- row.cells.iter().map(|e| e.is_active()).collect::<Vec<_>>()
- );
- }
+ let doc = state.realize().unwrap();
+ let grid = doc.grids.first().unwrap();
+ assert_eq!(grid.rows.len(), 4);
}
}