diff --git a/.thumbs.yml b/.thumbs.yml new file mode 100644 index 0000000..f28fff7 --- /dev/null +++ b/.thumbs.yml @@ -0,0 +1,11 @@ +minimum_reviewers: 2 +merge: true +build_steps: + - make clean + - make deps + - make compile + - make test + - make xref + - make dialyzer +org_mode: true +timeout: 1800 diff --git a/README.md b/README.md index 7b01d3a..2b87f53 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ## WHAT? -A set of state based CRDTs implemented in Erlang and on the paper - +A set of state-based CRDTs implemented in Erlang, and based on the paper - [A Comprehensive study of Convergent and Commutative Replicated Data Types] (http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf) - which you may find an interesting read. @@ -11,6 +11,6 @@ may find an interesting read. Riak is getting CRDT support built in, so we've archived the old riak_dt in the branch `prototype`. No further work will be done on -it. This repo is now a resuable library of Quickcheck tested +it. This repo is now a reusable library of QuickCheck tested implementations of CRDTs. diff --git a/include/riak_dt.hrl b/include/riak_dt.hrl index ec07097..708941d 100644 --- a/include/riak_dt.hrl +++ b/include/riak_dt.hrl @@ -1,7 +1,7 @@ -ifdef(namespaced_types). -type riak_dt_dict() :: dict:dict(). --type riak_dt_set() :: sets:set(). -else. -type riak_dt_dict() :: dict(). --type riak_dt_set() :: set(). -endif. + +-type riak_dt_set() :: ordsets:ordset(_). diff --git a/src/riak_dt.app.src b/src/riak_dt.app.src index 09146a0..4e0a847 100644 --- a/src/riak_dt.app.src +++ b/src/riak_dt.app.src @@ -1,7 +1,7 @@ %% -*- erlang -*- {application, riak_dt, [ - {description, ""}, + {description, "riak CRDT datatypes"}, {vsn, git}, {registered, []}, {applications, [ @@ -14,5 +14,8 @@ %% indicates the level of compression. Higher number means more %% compression, but more time to compress. 
In tests so far 1 has %% been enough for CRDTs - {env, [{binary_compression, 1}]} + {env, [{binary_compression, 1}]}, + {maintainers, ["Basho", "Heinz N. Gies"]}, + {licenses, ["Apache"]}, + {links, [{"Github", "https://github.com/basho/riak_dt"}]} ]}. diff --git a/src/riak_dt_gset.erl b/src/riak_dt_gset.erl index fb54ff6..dcb26e6 100644 --- a/src/riak_dt_gset.erl +++ b/src/riak_dt_gset.erl @@ -3,7 +3,7 @@ %% %% riak_dt_gset: A convergent, replicated, state based grow only set %% -%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2007-2016 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -58,7 +58,7 @@ -type binary_gset() :: binary(). %% A binary that from_binary/1 will operate on. --type gset_op() :: {add, member()}. +-type gset_op() :: {add, member()} | {add_all, members()}. -type actor() :: riak_dt:actor(). @@ -78,10 +78,21 @@ value(GSet) -> value(_, GSet) -> value(GSet). +-spec apply_ops([gset_op()], actor(), gset()) -> + {ok, gset()}. +apply_ops([], _Actor, GSet) -> + {ok, GSet}; +apply_ops([Op | Rest], Actor, GSet) -> + {ok, GSet2} = update(Op, Actor, GSet), + apply_ops(Rest, Actor, GSet2). -spec update(gset_op(), actor(), gset()) -> {ok, gset()}. update({add, Elem}, _Actor, GSet) -> {ok, ordsets:add_element(Elem, GSet)}; + +update({update, Ops}, _Actor, GSet) -> +apply_ops(Ops,_Actor,GSet); + update({add_all, Elems}, _Actor, GSet) -> {ok, ordsets:union(GSet, ordsets:from_list(Elems))}. @@ -105,21 +116,27 @@ equal(GSet1, GSet2) -> -include("riak_dt_tags.hrl"). -define(TAG, ?DT_GSET_TAG). -define(V1_VERS, 1). +-define(V2_VERS, 2). -spec to_binary(gset()) -> binary_gset(). to_binary(GSet) -> - <>. + %%<>. + {ok, B} = to_binary(?V2_VERS, GSet), + B. -spec to_binary(Vers :: pos_integer(), gset()) -> {ok, binary()} | ?UNSUPPORTED_VERSION. 
-to_binary(1, S) -> - B = to_binary(S), - {ok, B}; -to_binary(Vers, _S) -> +to_binary(?V1_VERS, S) -> + {ok, <>}; +to_binary(?V2_VERS, S) -> + {ok, <>}; +to_binary(Vers, _S0) -> ?UNSUPPORTED_VERSION(Vers). -spec from_binary(binary()) -> {ok, gset()} | ?UNSUPPORTED_VERSION | ?INVALID_BINARY. from_binary(<>) -> {ok, riak_dt:from_binary(Bin)}; +from_binary(<>) -> + {ok, riak_dt:from_binary(Bin)}; from_binary(<>) -> ?UNSUPPORTED_VERSION(Vers); from_binary(_B) -> @@ -157,6 +174,11 @@ stat_test() -> ?assertEqual(15, stat(max_element_size, S1)), ?assertEqual(undefined, stat(actor_count, S1)). +to_binary_test() -> + GSet = update({add, <<"foo">>}, undefined_actor, riak_dt_gset:new()), + Bin = riak_dt_gset:to_binary(GSet), + ?assertMatch( <<82:8/integer, ?V2_VERS:8/integer, _/binary>> , Bin). + -ifdef(EQC). eqc_value_test_() -> crdt_statem_eqc:run(?MODULE, 1000). diff --git a/src/riak_dt_map.erl b/src/riak_dt_map.erl index 82162b6..7b1ec5b 100644 --- a/src/riak_dt_map.erl +++ b/src/riak_dt_map.erl @@ -483,7 +483,7 @@ merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred}) %% only. -spec filter_unique(riak_dt_set(), entries(), riak_dt_vclock:vclock(), entries()) -> entries(). filter_unique(FieldSet, Entries, Clock, Acc) -> - sets:fold(fun({_Name, Type}=Field, Keep) -> + ordsets:fold(fun({_Name, Type}=Field, Keep) -> {Dots, TS} = ?DICT:fetch(Field, Entries), KeepDots = ?DICT:filter(fun(Dot, _CRDT) -> is_dot_unseen(Dot, Clock) @@ -518,7 +518,7 @@ is_dot_unseen(Dot, Clock) -> %% @doc Get the keys from an ?DICT as a ?SET -spec key_set(riak_dt_dict()) -> riak_dt_set(). key_set(Dict) -> - sets:from_list(?DICT:fetch_keys(Dict)). + ordsets:from_list(?DICT:fetch_keys(Dict)). %% @doc break the keys from an two ?DICTs out into three ?SETs, the %% common keys, those unique to one, and those unique to the other. 
@@ -526,22 +526,22 @@ key_set(Dict) -> key_sets(LHS, RHS) -> LHSet = key_set(LHS), RHSet = key_set(RHS), - {sets:intersection(LHSet, RHSet), - sets:subtract(LHSet, RHSet), - sets:subtract(RHSet, LHSet)}. + {ordsets:intersection(LHSet, RHSet), + ordsets:subtract(LHSet, RHSet), + ordsets:subtract(RHSet, LHSet)}. %% @private for a set of dots (that are unique to one side) decide %% whether to keep, or drop each. -spec filter_dots(riak_dt_set(), riak_dt_dict(), riak_dt_vclock:vclock()) -> entries(). filter_dots(Dots, CRDTs, Clock) -> - DotsToKeep = sets:filter(fun(Dot) -> + DotsToKeep = ordsets:filter(fun(Dot) -> is_dot_unseen(Dot, Clock) end, Dots), ?DICT:filter(fun(Dot, _CRDT) -> - sets:is_element(Dot, DotsToKeep) + ordsets:is_element(Dot, DotsToKeep) end, CRDTs). @@ -549,13 +549,13 @@ filter_dots(Dots, CRDTs, Clock) -> %% tombstone per field. If a dot is on both sides, keep it. If it is %% only on one side, drop it if dominated by the otherside's clock. merge_common(FieldSet, LHS, RHS, LHSClock, RHSClock, Acc) -> - sets:fold(fun({_, Type}=Field, Keep) -> + ordsets:fold(fun({_, Type}=Field, Keep) -> {LHSDots, LHTS} = ?DICT:fetch(Field, LHS), {RHSDots, RHTS} = ?DICT:fetch(Field, RHS), {CommonDots, LHSUniqe, RHSUnique} = key_sets(LHSDots, RHSDots), TS = Type:merge(RHTS, LHTS), - CommonSurviving = sets:fold(fun(Dot, Common) -> + CommonSurviving = ordsets:fold(fun(Dot, Common) -> L = ?DICT:fetch(Dot, LHSDots), ?DICT:store(Dot, L, Common) end, diff --git a/src/riak_dt_od_flag.erl b/src/riak_dt_od_flag.erl index b028f99..4a02754 100644 --- a/src/riak_dt_od_flag.erl +++ b/src/riak_dt_od_flag.erl @@ -128,12 +128,12 @@ merge({LHSClock, LHSDots, LHSDeferred}, {RHSClock, RHSDots, RHSDeferred}) -> %% drop all the RHS dots that dominated by the LHS clock %% keep all the dots that are in both %% save value as value of flag - CommonDots = sets:intersection(sets:from_list(LHSDots), sets:from_list(RHSDots)), - LHSUnique = sets:to_list(sets:subtract(sets:from_list(LHSDots), 
CommonDots)), - RHSUnique = sets:to_list(sets:subtract(sets:from_list(RHSDots), CommonDots)), + CommonDots = ordsets:intersection(ordsets:from_list(LHSDots), ordsets:from_list(RHSDots)), + LHSUnique = ordsets:to_list(ordsets:subtract(ordsets:from_list(LHSDots), CommonDots)), + RHSUnique = ordsets:to_list(ordsets:subtract(ordsets:from_list(RHSDots), CommonDots)), LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), - Flag = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]), + Flag = riak_dt_vclock:merge([ordsets:to_list(CommonDots), LHSKeep, RHSKeep]), Deferred = ordsets:union(LHSDeferred, RHSDeferred), apply_deferred(NewClock, Flag, Deferred). diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index 90e0b4a..d7ca388 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -294,50 +294,56 @@ merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) -> {Clock, Entries, Deferred}; merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred}) -> Clock = riak_dt_vclock:merge([LHSClock, RHSClock]), - {Keep, RHSElems} = ?DICT:fold(fun(Elem, Dots, {Acc, RHSRemaining}) -> - case ?DICT:find(Elem, RHSEntries) of - error -> - %% Only on left, trim dots and keep - %% surviving - case riak_dt_vclock:subtract_dots(Dots, RHSClock) of + {Keep, RHSElems} = + ?DICT:fold(fun(Elem, Dots, {Acc, RHSRemaining}) -> + case ?DICT:find(Elem, RHSEntries) of + error -> + %% Only on left, trim dots and keep surviving + case riak_dt_vclock:subtract_dots(Dots, RHSClock) of + [] -> + %% Removed + {Acc, RHSRemaining}; + NewDots -> + {?DICT:store(Elem, NewDots, Acc), RHSRemaining} + end; + {ok, RHSDots} -> + %% On both sides + CommonDots = ordsets:intersection( + ordsets:from_list(Dots), + ordsets:from_list(RHSDots)), + LHSUnique = ordsets:to_list( + ordsets:subtract(ordsets:from_list(Dots), + CommonDots)), + RHSUnique = ordsets:to_list( + 
ordsets:subtract(ordsets:from_list(RHSDots), + CommonDots)), + LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), + RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), + V = riak_dt_vclock:merge([ordsets:to_list(CommonDots), LHSKeep, RHSKeep]), + %% Perfectly possible that an item in both sets should be dropped + case V of + [] -> + %% Removed from both sides + {Acc, ?DICT:erase(Elem, RHSRemaining)}; + _ -> + {?DICT:store(Elem, V, Acc), ?DICT:erase(Elem, RHSRemaining)} + end + end + end, + {?DICT:new(), RHSEntries}, + LHSEntries), + %%Now what about the stuff left from the right hand side? Do the same to that! + Entries = ?DICT:fold(fun(Elem, Dots, Acc) -> + case riak_dt_vclock:subtract_dots(Dots, LHSClock) of [] -> %% Removed - {Acc, RHSRemaining}; + Acc; NewDots -> - {?DICT:store(Elem, NewDots, Acc), RHSRemaining} - end; - {ok, RHSDots} -> - %% On both sides - CommonDots = sets:intersection(sets:from_list(Dots), sets:from_list(RHSDots)), - LHSUnique = sets:to_list(sets:subtract(sets:from_list(Dots), CommonDots)), - RHSUnique = sets:to_list(sets:subtract(sets:from_list(RHSDots), CommonDots)), - LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), - RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), - V = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]), - %% Perfectly possible that an item in both sets should be dropped - case V of - [] -> - %% Removed from both sides - {Acc, ?DICT:erase(Elem, RHSRemaining)}; - _ -> - {?DICT:store(Elem, V, Acc), ?DICT:erase(Elem, RHSRemaining)} + ?DICT:store(Elem, NewDots, Acc) end - end - end, - {?DICT:new(), RHSEntries}, - LHSEntries), - %%Now what about the stuff left from the right hand side? Do the same to that! 
- Entries = ?DICT:fold(fun(Elem, Dots, Acc) -> - case riak_dt_vclock:subtract_dots(Dots, LHSClock) of - [] -> - %% Removed - Acc; - NewDots -> - ?DICT:store(Elem, NewDots, Acc) - end - end, - Keep, - RHSElems), + end, + Keep, + RHSElems), Deffered = merge_deferred(LHSDeferred, RHSDeferred), apply_deferred(Clock, Entries, Deffered). diff --git a/test/riak_dt_gset_tests.erl b/test/riak_dt_gset_tests.erl new file mode 100644 index 0000000..ffe9980 --- /dev/null +++ b/test/riak_dt_gset_tests.erl @@ -0,0 +1,39 @@ +%% ------------------------------------------------------------------- +%% +%% riak_dt_gset_test: trivial assertive tests to illustrate module behavior +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(riak_dt_gset_tests). + +-include_lib("eunit/include/eunit.hrl"). +-import(riak_dt_gset, [update/3]). + +-define(ACTOR_VAL, undefined). +-define(SINGLE_VAL, <<"binarytemple">>). +-define(FRANK_BOOTH, [<<"frank">>, <<"booth">>]). +-define(BOOTH_FRANK, [<<"booth">>, <<"frank">>]). + +update_add_test() -> + N = riak_dt_gset:new(), + ?assertEqual({ok, [?SINGLE_VAL]}, update({add, ?SINGLE_VAL}, ?ACTOR_VAL, N)) +. 
+%% add_all builds the result with ordsets:from_list/ordsets:union, so members come back in ordset order rather than insertion order.
+update_add_all_test() ->
+    Result = riak_dt_gset:update({add_all, ?FRANK_BOOTH}, ?ACTOR_VAL, riak_dt_gset:new()),
+    ?assertEqual({ok, ?BOOTH_FRANK}, Result),
+    ?assertNotEqual({ok, ?FRANK_BOOTH}, Result).