Skip to content

Commit

Permalink
feat: assemble runtime crdt state (#408)
Browse files Browse the repository at this point in the history
* feat: calc clock in update refs

* feat: add refs

* feat: impl store

* feat: add utils for doc store

* feat: add state vector & self check

* feat: doc integrate skeleton

* feat: new id

* test: `get_state` & `get_state_vector`

* test: `add_item` & `get_item`

* feat: init doc struct

* feat: integrate part2

* feat: crdt trait

* chore: move update_missing_sv

* feat: add split item

* feat: doc integrate part3

* chore: ignore dead code temporarily

* chore: fix clippy lint
  • Loading branch information
darkskygit authored Apr 28, 2023
1 parent d0f1b1c commit 154546a
Show file tree
Hide file tree
Showing 15 changed files with 868 additions and 139 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions libs/jwst-codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ license = "AGPL-3.0-only"
[dependencies]
bitvec = "1.0.1"
byteorder = "1.4.3"
nanoid = "0.4.0"
nom = "7.1.3"
rand = "0.8.5"
serde_json = "1.0.94"
thiserror = "1.0.40"

# ======= workspace dependencies =======
jwst-logger = { path = "../jwst-logger" }

[dev-dependencies]
criterion = { version = "0.4.0", features = ["html_reports"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use nom::{
use super::*;
use std::collections::HashMap;

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum Any {
Undefined,
Null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use nom::{
};
use serde_json::Value as JsonValue;

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum YType {
Array,
Map,
Expand All @@ -17,7 +17,7 @@ pub enum YType {
XmlHook(String),
}

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum Content {
Deleted(u64),
JSON(Vec<Option<String>>),
Expand All @@ -30,6 +30,27 @@ pub enum Content {
Doc { guid: String, opts: Vec<Any> },
}

impl Content {
    /// Returns how many clock ticks this content occupies.
    ///
    /// * `Deleted` carries its length explicitly.
    /// * `JSON` and `Any` count one tick per element.
    /// * `String` counts UTF-16 code units.
    /// * Every other variant occupies exactly one tick.
    pub fn clock_len(&self) -> u64 {
        match self {
            Content::Deleted(len) => *len,
            Content::JSON(strings) => strings.len() as u64,
            // Yjs measures string content with JavaScript's `String.length`,
            // i.e. in UTF-16 code units. `str::len()` would return the UTF-8
            // byte count, which differs for any non-ASCII text and would
            // shift the clock of every struct that follows.
            Content::String(string) => string.encode_utf16().count() as u64,
            Content::Any(any) => any.len() as u64,
            Content::Binary(_)
            | Content::Embed(_)
            | Content::Format { .. }
            | Content::Type(_)
            | Content::Doc { .. } => 1,
        }
    }

    /// Splits this content into a left and a right part at offset `diff`.
    ///
    /// # Errors
    ///
    /// Splitting is not implemented yet, so this currently always returns
    /// `JwstCodecError::ContentSplitNotSupport` carrying `diff`.
    pub fn split(&self, diff: u64) -> JwstCodecResult<(Content, Content)> {
        // TODO: implement split for other types
        Err(JwstCodecError::ContentSplitNotSupport(diff))
    }
}

pub fn read_content(input: &[u8], tag_type: u8) -> IResult<&[u8], Content> {
match tag_type {
1 => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use super::*;

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
/// Identifier made of a client number and a clock value.
pub struct Id {
    /// Client component of the identifier.
    pub client: u64,
    /// Clock component of the identifier.
    pub clock: u64,
}

impl Id {
    /// Builds an `Id` from its `client` and `clock` components.
    pub fn new(client: u64, clock: u64) -> Self {
        Id { client, clock }
    }
}

/// Parses an [`Id`] from the front of `input`.
///
/// Two values are read with `read_var_u64`, in this order: the client, then
/// the clock. On success the unread remainder of the input is returned
/// together with the parsed id; a failure of either read is propagated.
pub fn read_item_id(input: &[u8]) -> IResult<&[u8], Id> {
let (tail, client) = read_var_u64(input)?;
let (tail, clock) = read_var_u64(tail)?;
Ok((tail, Id { client, clock }))
Ok((tail, Id::new(client, clock)))
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use super::*;

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
/// Parent reference of an item: either a string or an [`Id`].
pub enum Parent {
/// Parent given as a string (presumably the name of a root type — TODO confirm).
String(String),
/// Parent given as an id.
Id(Id),
}

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct Item {
pub info: u8,
pub left_id: Option<Id>,
pub right_id: Option<Id>,
pub parent: Option<Parent>,
Expand All @@ -32,7 +31,6 @@ pub fn read_item(input: &[u8], info: u8, first_5_bit: u8) -> IResult<&[u8], Item
// NOTE: read order must keep the same as the order in yjs
// TODO: this data structure design defeats CPU out-of-order execution (OoOE) and needs to be optimized
let item = Item {
info,
left_id: if has_left_id {
let (tail, id) = read_item_id(input)?;
input = tail;
Expand Down
20 changes: 20 additions & 0 deletions libs/jwst-codec/src/doc/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
mod any;
mod content;
mod id;
mod item;
mod refs;
mod update;

pub use any::Any;
pub use content::Content;
pub use id::Id;
pub use item::Item;
pub use refs::StructInfo;
pub use update::{read_update, Update};

use super::*;
use any::read_any;
use content::read_content;
use id::read_item_id;
use item::read_item;
use refs::read_client_struct_refs;
177 changes: 177 additions & 0 deletions libs/jwst-codec/src/doc/codec/refs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
use super::*;
use nom::{multi::count, number::complete::be_u8};
use std::collections::HashMap;

/// A struct as produced by `read_struct`, before `read_refs` attaches an
/// [`Id`] to it.
enum RawStructInfo {
/// Produced for tag `0`; the payload is the length read from the input.
GC(u64),
/// Produced for tag `10`; the payload is the length read from the input.
Skip(u64),
/// Produced for every other tag; the payload is the item parsed by `read_item`.
Item(Item),
}

/// Result of `read_refs`: the structs decoded for one client.
struct RawRefs {
/// Client number read from the input; also used as the client of every id in `refs`.
client: u64,
/// Decoded structs in input order, each carrying the id assigned by `read_refs`.
refs: Vec<StructInfo>,
}

/// A decoded struct together with the [`Id`] assigned to it.
#[derive(Debug, Clone, PartialEq)]
pub enum StructInfo {
/// GC entry with an explicit length.
GC { id: Id, len: u64 },
/// Skip entry with an explicit length.
Skip { id: Id, len: u64 },
/// Item entry; its length is derived from the item's content (see `len`).
Item { id: Id, item: Item },
}

impl StructInfo {
    /// Borrows the [`Id`] stored in this struct, whichever variant it is.
    pub fn id(&self) -> &Id {
        match self {
            Self::GC { id, .. } | Self::Skip { id, .. } | Self::Item { id, .. } => id,
        }
    }

    /// Client component of this struct's id.
    pub fn client_id(&self) -> u64 {
        self.id().client
    }

    /// Clock component of this struct's id.
    pub fn clock(&self) -> u64 {
        self.id().clock
    }

    /// Number of clock ticks covered by this struct.
    ///
    /// `GC` and `Skip` report their stored length; `Item` asks its content.
    pub fn len(&self) -> u64 {
        match self {
            Self::Item { item, .. } => item.content.clock_len(),
            Self::GC { len, .. } | Self::Skip { len, .. } => *len,
        }
    }

    /// `true` when this is the `GC` variant.
    pub fn is_gc(&self) -> bool {
        matches!(self, Self::GC { .. })
    }

    /// `true` when this is the `Skip` variant.
    pub fn is_skip(&self) -> bool {
        matches!(self, Self::Skip { .. })
    }

    /// `true` when this is the `Item` variant.
    pub fn is_item(&self) -> bool {
        matches!(self, Self::Item { .. })
    }

    /// Splits an `Item` struct at offset `diff` into a `(left, right)` pair.
    ///
    /// The left half keeps the original id and points to the right half via
    /// `right_id`. The right half gets the id `clock + diff`, points back to
    /// the last clock of the left half via `left_id`, and inherits the
    /// original `right_id`, `parent` and `parent_sub`.
    ///
    /// # Errors
    ///
    /// * `JwstCodecError::ItemSplitNotSupport` when `self` is not an `Item`.
    /// * Any error returned by splitting the item's content.
    pub fn split_item(&mut self, diff: u64) -> JwstCodecResult<(Self, Self)> {
        match self {
            Self::Item { id, item } => {
                let right_id = Id::new(id.client, id.clock + diff);
                let (left_content, right_content) = item.content.split(diff)?;

                // Left half: a copy of the original whose right neighbour and
                // content are replaced.
                let mut left = item.clone();
                left.right_id = Some(right_id.clone());
                left.content = left_content;

                // Right half: its left neighbour is the last clock of the left half.
                let right = Item {
                    left_id: Some(Id::new(id.client, id.clock + diff - 1)),
                    right_id: item.right_id.clone(),
                    parent: item.parent.clone(),
                    parent_sub: item.parent_sub.clone(),
                    content: right_content,
                };

                Ok((
                    Self::Item {
                        id: id.clone(),
                        item: left,
                    },
                    Self::Item {
                        id: right_id,
                        item: right,
                    },
                ))
            }
            Self::GC { .. } | Self::Skip { .. } => Err(JwstCodecError::ItemSplitNotSupport),
        }
    }
}

/// Parses one struct from `input`.
///
/// The first byte is the info byte; its low five bits select the kind:
/// `0` reads a var-u64 length and yields `GC`, `10` reads a var-u64 length
/// and yields `Skip`, and anything else is handed to `read_item` together
/// with the full info byte and the five-bit tag.
fn read_struct(input: &[u8]) -> IResult<&[u8], RawStructInfo> {
    let (rest, info) = be_u8(input)?;
    let tag = info & 0b11111;

    match tag {
        0 => read_var_u64(rest).map(|(rest, len)| (rest, RawStructInfo::GC(len))),
        10 => read_var_u64(rest).map(|(rest, len)| (rest, RawStructInfo::Skip(len))),
        _ => read_item(rest, info, tag).map(|(rest, item)| (rest, RawStructInfo::Item(item))),
    }
}

/// Parses the structs of a single client.
///
/// Reads, in order: the number of structs, the client, the starting clock,
/// and then that many structs. Each struct is given the id
/// `(client, clock)`, where `clock` starts at the value read from the input
/// and advances by the length of every struct that precedes it.
fn read_refs(input: &[u8]) -> IResult<&[u8], RawRefs> {
    let (input, num_of_structs) = read_var_u64(input)?;
    let (input, client) = read_var_u64(input)?;
    let (input, first_clock) = read_var_u64(input)?;
    let (input, raw_structs) = count(read_struct, num_of_structs as usize)(input)?;

    let mut refs = Vec::with_capacity(raw_structs.len());
    let mut clock = first_clock;
    for raw in raw_structs {
        let id = Id::new(client, clock);
        let (info, len) = match raw {
            RawStructInfo::GC(len) => (StructInfo::GC { id, len }, len),
            RawStructInfo::Skip(len) => (StructInfo::Skip { id, len }, len),
            RawStructInfo::Item(item) => {
                // The length has to be taken before `item` is moved into the struct.
                let len = item.content.clock_len();
                (StructInfo::Item { id, item }, len)
            }
        };
        refs.push(info);
        clock += len;
    }

    Ok((input, RawRefs { client, refs }))
}

pub fn read_client_struct_refs(input: &[u8]) -> IResult<&[u8], HashMap<u64, Vec<StructInfo>>> {
let (input, num_of_updates) = read_var_u64(input)?;
let (tail, updates) = count(read_refs, num_of_updates as usize)(input)?;

Ok((
tail,
updates.into_iter().map(|u| (u.client, u.refs)).collect(),
))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Accessors and variant predicates of the `GC` and `Skip` variants.
    #[test]
    fn test_struct_info() {
        {
            let struct_info = StructInfo::GC {
                id: Id::new(1, 0),
                len: 10,
            };
            assert_eq!(struct_info.len(), 10);
            assert_eq!(struct_info.client_id(), 1);
            assert_eq!(struct_info.clock(), 0);
            assert_eq!(struct_info.id(), &Id::new(1, 0));
            assert!(struct_info.is_gc());
            assert!(!struct_info.is_skip());
            assert!(!struct_info.is_item());
        }

        {
            let struct_info = StructInfo::Skip {
                id: Id::new(2, 0),
                len: 20,
            };
            assert_eq!(struct_info.len(), 20);
            assert_eq!(struct_info.client_id(), 2);
            assert_eq!(struct_info.clock(), 0);
            assert_eq!(struct_info.id(), &Id::new(2, 0));
            assert!(struct_info.is_skip());
            assert!(!struct_info.is_gc());
            assert!(!struct_info.is_item());
        }
    }

    /// Only `Item` structs can be split; `GC` and `Skip` must report an error.
    #[test]
    fn test_split_item_rejects_non_item() {
        let mut gc = StructInfo::GC {
            id: Id::new(1, 0),
            len: 10,
        };
        assert!(matches!(
            gc.split_item(1),
            Err(JwstCodecError::ItemSplitNotSupport)
        ));

        let mut skip = StructInfo::Skip {
            id: Id::new(2, 0),
            len: 20,
        };
        assert!(matches!(
            skip.split_item(1),
            Err(JwstCodecError::ItemSplitNotSupport)
        ));
    }
}
54 changes: 54 additions & 0 deletions libs/jwst-codec/src/doc/codec/update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use super::*;
use nom::multi::count;
use std::collections::HashMap;

/// One entry parsed by `read_delete`: two var-u64 values, `clock` then `clock_len`.
/// Presumably a run of `clock_len` deleted clocks starting at `clock` — TODO confirm.
#[derive(Debug)]
pub struct Delete {
/// First value of the entry.
pub clock: u64,
/// Second value of the entry.
pub clock_len: u64,
}

/// The delete entries parsed for a single client by `parse_delete_set`.
#[derive(Debug)]
pub struct DeleteSets {
/// Client number read ahead of the entries.
pub client: u64,
/// Entries in the order they were read.
pub deletes: Vec<Delete>,
}

/// A parsed update as returned by `read_update`.
#[derive(Debug)]
pub struct Update {
/// Per-client delete entries; parsed after the structs.
pub delete_sets: Vec<DeleteSets>,
/// Decoded structs, keyed by client.
pub structs: HashMap<u64, Vec<StructInfo>>,
}

/// Parses one [`Delete`]: a var-u64 `clock` followed by a var-u64 `clock_len`.
fn read_delete(input: &[u8]) -> IResult<&[u8], Delete> {
    let (rest, clock) = read_var_u64(input)?;
    read_var_u64(rest).map(|(rest, clock_len)| (rest, Delete { clock, clock_len }))
}

/// Parses the delete entries of one client.
///
/// Reads the client, then the number of entries, then that many entries
/// via `read_delete`.
fn parse_delete_set(input: &[u8]) -> IResult<&[u8], DeleteSets> {
    let (rest, client) = read_var_u64(input)?;
    let (rest, num_of_deletes) = read_var_u64(rest)?;

    count(read_delete, num_of_deletes as usize)(rest)
        .map(|(tail, deletes)| (tail, DeleteSets { client, deletes }))
}

/// Parses the whole delete section: a client count followed by that many
/// per-client groups, each read with `parse_delete_set`.
fn read_delete_set(input: &[u8]) -> IResult<&[u8], Vec<DeleteSets>> {
    let (rest, num_of_clients) = read_var_u64(input)?;
    count(parse_delete_set, num_of_clients as usize)(rest)
}

/// Parses a complete [`Update`] from `input`.
///
/// The struct section is read first via `read_client_struct_refs`, then the
/// delete section via `read_delete_set`. The unread remainder of the input
/// is returned alongside the update.
pub fn read_update(input: &[u8]) -> IResult<&[u8], Update> {
    let (rest, structs) = read_client_struct_refs(input)?;
    let (rest, delete_sets) = read_delete_set(rest)?;

    let update = Update {
        delete_sets,
        structs,
    };
    Ok((rest, update))
}
Loading

1 comment on commit 154546a

@vercel
Copy link

@vercel vercel bot commented on 154546a Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.