diff options
| author | Josh Kingsley <josh@joshkingsley.me> | 2025-11-23 16:03:53 +0200 |
|---|---|---|
| committer | Josh Kingsley <josh@joshkingsley.me> | 2025-11-23 16:03:53 +0200 |
| commit | be5cae1beb714680f56d7598b8f5c4816000cb1d (patch) | |
| tree | 7722b7a4970d36420a45a7bc9ab8835cddddf389 /crdt | |
| parent | 2418583618ece59fd28b5b04bedffccdae11a170 (diff) | |
feat(crdt): re-factor into modules
Diffstat (limited to 'crdt')
| -rw-r--r-- | crdt/src/doc.rs | 101 | ||||
| -rw-r--r-- | crdt/src/lib.rs | 359 | ||||
| -rw-r--r-- | crdt/src/op.rs | 21 |
3 files changed, 154 insertions, 327 deletions
diff --git a/crdt/src/doc.rs b/crdt/src/doc.rs new file mode 100644 index 0000000..bbd24d0 --- /dev/null +++ b/crdt/src/doc.rs @@ -0,0 +1,101 @@ +use thiserror::Error; +use uuid::Uuid; + +use crate::op::{ChangeSubdivisions, CreateGrid, Op, OpKind}; + +/// An deterministically derived ID, e.g. a grid ID derived from the +/// op ID which creates it. +pub struct DerivedId { + base: String, + tag: &'static str, + index: usize, +} + +impl ToString for DerivedId { + fn to_string(&self) -> String { + format!("{}:{}={}", self.base, self.tag, self.index) + } +} + +trait DerivableId { + fn derive_id(&self, tag: &'static str, index: usize) -> DerivedId; +} + +impl DerivableId for Uuid { + fn derive_id(&self, tag: &'static str, index: usize) -> DerivedId { + DerivedId { + base: self.to_string(), + tag, + index, + } + } +} + +impl DerivableId for DerivedId { + fn derive_id(&self, tag: &'static str, index: usize) -> DerivedId { + DerivedId { + base: self.to_string(), + tag, + index, + } + } +} + +#[derive(Default)] +pub struct Doc { + pub(crate) grids: Vec<Grid>, +} + +pub struct Grid { + pub(crate) id: DerivedId, + pub(crate) rows: Vec<Row>, +} + +pub struct Row { + pub(crate) id: DerivedId, + pub(crate) cells: Vec<Cell>, +} + +pub struct Cell { + pub(crate) id: DerivedId, +} + +#[derive(Error, Debug)] +pub enum ApplyOpError {} + +pub type ApplyOpResult = Result<(), ApplyOpError>; + +impl Doc { + pub fn apply_op(&mut self, op: &Op) -> ApplyOpResult { + match &op.kind { + OpKind::CreateGrid(data) => apply_create_grid(self, &op.id, data), + OpKind::ChangeSubdivisions(data) => apply_change_subdivisions(self, data), + } + } +} + +fn apply_create_grid(doc: &mut Doc, op_id: &Uuid, data: &CreateGrid) -> ApplyOpResult { + let grid_id = op_id.derive_id("grid", 0); + + let rows = (0..data.rows) + .map(|row_idx| { + let row_id = grid_id.derive_id("row", row_idx); + + let cells = (0..data.base_cells_per_row) + .map(|cell_idx| Cell { + id: row_id.derive_id("cell", cell_idx), + }) + .collect(); + + Row { id: row_id, cells } + }) + .collect(); + + doc.grids.push(Grid { id: grid_id, rows }); + + Ok(()) +} + +fn apply_change_subdivisions(doc: &mut Doc, data: &ChangeSubdivisions) -> ApplyOpResult { + todo!() +} diff --git a/crdt/src/lib.rs b/crdt/src/lib.rs index 50ccd20..977763f 100644 --- a/crdt/src/lib.rs +++ b/crdt/src/lib.rs @@ -1,76 +1,43 @@ -mod vector_clock; - -use std::{collections::BTreeSet, fmt::Display, mem}; +//! This crate defines the distributed state management system for +//! Notive. The state is an operation-based CRDT, which given the same +//! set of operations on any client, will realize the same state. -use num_rational::Ratio; +use thiserror::Error; use uuid::Uuid; -use crate::vector_clock::VectorClock; +use crate::{ + doc::{ApplyOpError, Doc}, + op::{Op, OpKind}, + vector_clock::VectorClock, +}; -use thiserror::Error; +mod doc; +mod op; +mod vector_clock; #[derive(Error, Debug)] pub enum Error { - #[error("object with ID {0} not found")] - NotFound(DerivedId), + #[error("error while realizing state")] + RealizeError(#[from] ApplyOpError), } -#[derive(Default, Clone)] +#[derive(Default)] pub struct State { ops: Vec<Op>, } impl State { - pub fn from_ops(actor_id: &Uuid, data: &[OpData]) -> Self { - let mut clock = VectorClock::new(); - - let ops = data - .iter() - .cloned() - .map(|data| { - clock = clock.inc(&actor_id); - - Op { - id: Uuid::now_v7(), - clock: clock.clone(), - data, - } - }) - .collect(); - - State { ops } - } - - pub fn append_op(&mut self, actor_id: &Uuid, data: OpData) { - // Increment the last clock for the provided actor + pub fn append_op(&mut self, actor_id: &Uuid, kind: OpKind) { let clock = self .ops .last() - .map(|Op { clock, .. }| clock.inc(actor_id)) - // For an empty document, initialize a new clock - .unwrap_or_else(|| VectorClock::default().inc(actor_id)); + .map(|op| op.clock.inc(actor_id)) + .unwrap_or_else(|| VectorClock::new().inc(actor_id)); self.ops.push(Op { id: Uuid::now_v7(), clock, - data, - }); - } - - pub fn merge(&mut self, other: &State) { - let op_ids: BTreeSet<Uuid> = self.ops.iter().map(|op| op.id).collect(); - - for op in &other.ops { - if !op_ids.contains(&op.id) { - self.ops.push(op.clone()); - } - } - - self.ops.sort_by(|op1, op2| { - op1.clock - .partial_cmp(&op2.clock) - // Tie-breaker: yse op ID - .unwrap_or_else(|| op1.id.cmp(&op2.id)) + kind, }); } @@ -78,297 +45,35 @@ impl State { let mut doc = Doc::default(); for op in &self.ops { - op.apply(&mut doc)?; + doc.apply_op(op)?; } Ok(doc) } } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Op { - id: Uuid, - clock: VectorClock, - data: OpData, -} - -impl Op { - fn apply(&self, doc: &mut Doc) -> Result<(), Error> { - match &self.data { - OpData::CreateGrid { - rows, - base_cells_per_row, - } => { - let duration: Ratio<u32> = Ratio::new(1, *base_cells_per_row as u32); - - let rows = (0..*rows) - .map(|row_idx| { - let cells = (0..*base_cells_per_row) - .map(|cell_idx| { - Entry::active(Cell { - id: self.id.derive_id("cell", cell_idx), - duration, - }) - }) - .collect(); - - Entry::active(Row { - id: self.id.derive_id("row", row_idx), - cells, - }) - }) - .collect(); - - doc.grids.push(Entry::active(Grid { - id: self.id.derive_id("grid", 0), - rows, - })); - } - - OpData::ChangeSubdivisions { - grid_id, - row_id, - start_cell_id, - end_cell_id, - subdivisions, - } => { - let grid = doc - .grids - .iter_mut() - .find(|entry| entry.is_active_and(|g| g.id == *grid_id)) - .ok_or(Error::NotFound(grid_id.clone()))?; - - let row = grid - .value_mut() - .rows - .iter_mut() - .find(|entry| entry.is_active_and(|r| r.id == *row_id)) - .ok_or(Error::NotFound(row_id.clone()))?; - - let start_cell_idx = row - .value() - .cells - .iter() - .position(|entry| entry.is_active_and(|c| c.id == *start_cell_id)) - .ok_or(Error::NotFound(start_cell_id.clone()))?; - - let end_cell_idx = row - .value() - .cells - .iter() - .position(|entry| entry.is_active_and(|c| c.id == *end_cell_id)) - .ok_or(Error::NotFound(end_cell_id.clone()))?; - - let (i, j) = if start_cell_idx <= end_cell_idx { - (start_cell_idx, end_cell_idx) - } else { - (end_cell_idx, start_cell_idx) - }; - - for entry in row.value_mut().cells[i..(j + 1)].iter_mut() { - entry.delete(self.id); - } - - let span_duration: Ratio<u32> = row.value().cells[i..j + 1] - .iter() - .map(|cell| cell.value().duration) - .sum(); - - let duration: Ratio<u32> = span_duration / *subdivisions as u32; - - row.value_mut().cells.splice( - i..i, - (0..*subdivisions).map(|subdivision_idx| { - Entry::active(Cell { - id: self.id.derive_id("cell", subdivision_idx), - duration, - }) - }), - ); - } - } - - Ok(()) - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum OpData { - CreateGrid { - rows: usize, - base_cells_per_row: usize, - }, - - ChangeSubdivisions { - grid_id: DerivedId, - row_id: DerivedId, - start_cell_id: DerivedId, - end_cell_id: DerivedId, - subdivisions: usize, - }, -} - -#[derive(Default, Debug)] -pub struct Doc { - grids: Vec<Entry<Grid>>, -} - -#[derive(Debug)] -pub enum Entry<T: Default> { - Active { value: T }, - Deleted { value: T, deleted_by: Uuid }, -} - -impl<T: Default> Default for Entry<T> { - fn default() -> Self { - Self::Active { - value: T::default(), - } - } -} - -impl<T: Default> Entry<T> { - pub fn active(value: T) -> Self { - Self::Active { value } - } - - pub fn is_active(&self) -> bool { - matches!(self, Self::Active { .. }) - } - - pub fn is_active_and(&self, f: impl FnOnce(&T) -> bool) -> bool { - match self { - Self::Active { value } => f(value), - _ => false, - } - } - - pub fn value(&self) -> &T { - match self { - Self::Active { value } => value, - Self::Deleted { value, .. } => value, - } - } - - pub fn value_mut(&mut self) -> &mut T { - match self { - Self::Active { value } => value, - Self::Deleted { value, .. } => value, - } - } - - pub fn delete(&mut self, op_id: Uuid) { - if matches!(self, Self::Active { .. }) { - *self = match mem::take(self) { - Self::Active { value } => Self::Deleted { - value, - deleted_by: op_id, - }, - _ => unreachable!(), - } - } - } -} - -#[derive(Debug, Default)] -pub struct Grid { - id: DerivedId, - rows: Vec<Entry<Row>>, -} - -#[derive(Debug, Default)] -pub struct Row { - id: DerivedId, - cells: Vec<Entry<Cell>>, -} - -#[derive(Debug, Default)] -pub struct Cell { - id: DerivedId, - duration: Ratio<u32>, -} - -#[derive(PartialEq, Eq, Debug, Clone, Copy, Default)] -pub struct DerivedId { - // TODO These IDs can be interned on the Doc - id: Uuid, - tag: &'static str, - index: usize, -} - -impl Display for DerivedId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}:{}:{}", self.id, self.tag, self.index) - } -} - -trait DerivableId { - fn derive_id(&self, tag: &'static str, index: usize) -> DerivedId; -} - -impl DerivableId for Uuid { - fn derive_id(&self, tag: &'static str, index: usize) -> DerivedId { - DerivedId { - id: self.clone(), - tag, - index, - } - } -} - #[cfg(test)] mod tests { + use crate::op::CreateGrid; + use super::*; #[test] fn test() { - let actor_id = Uuid::now_v7(); + let alice = Uuid::now_v7(); - let mut state = State::from_ops( - &actor_id, - &vec![OpData::CreateGrid { + let mut state = State::default(); + + state.append_op( + &alice, + OpKind::CreateGrid(CreateGrid { rows: 4, base_cells_per_row: 16, - }], + }), ); - { - let doc = state.realize().unwrap(); - let grid = doc.grids[0].value(); - let row = grid.rows[0].value(); - - assert_eq!(doc.grids.len(), 1); - assert_eq!(grid.rows.len(), 4); - assert_eq!(row.cells.len(), 16); - - state.append_op( - &actor_id, - OpData::ChangeSubdivisions { - grid_id: grid.id, - row_id: row.id, - start_cell_id: row.cells[0].value().id, - end_cell_id: row.cells[3].value().id, - subdivisions: 3, - }, - ); - } - - { - let doc = state.realize().unwrap(); - let grid = doc.grids[0].value(); - let row = grid.rows[0].value(); - - assert_eq!(doc.grids.len(), 1); - assert_eq!(grid.rows.len(), 4); - assert_eq!(row.cells.len(), 19); - - assert_eq!( - vec![ - true, true, true, false, false, false, false, true, true, true, true, true, - true, true, true, true, true, true, true - ], - row.cells.iter().map(|e| e.is_active()).collect::<Vec<_>>() - ); - } + let doc = state.realize().unwrap(); + let grid = doc.grids.first().unwrap(); + assert_eq!(grid.rows.len(), 4); } } diff --git a/crdt/src/op.rs b/crdt/src/op.rs new file mode 100644 index 0000000..8f5f8b5 --- /dev/null +++ b/crdt/src/op.rs @@ -0,0 +1,21 @@ +use uuid::Uuid; + +use crate::vector_clock::VectorClock; + +pub struct Op { + pub(crate) id: Uuid, + pub(crate) clock: VectorClock, + pub(crate) kind: OpKind, +} + +pub enum OpKind { + CreateGrid(CreateGrid), + ChangeSubdivisions(ChangeSubdivisions), +} + +pub struct CreateGrid { + pub(crate) rows: usize, + pub(crate) base_cells_per_row: usize, +} + +pub struct ChangeSubdivisions {} |
