diff --git a/.gitignore b/.gitignore index e83ee2d..334d26c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ rel/riak_dt data/* .qc/* .eqc-info -current_counterexample.eqc \ No newline at end of file +current_counterexample.eqc +*.o diff --git a/c_src/Makefile b/c_src/Makefile new file mode 100644 index 0000000..8638fc0 --- /dev/null +++ b/c_src/Makefile @@ -0,0 +1,75 @@ +# Based on c_src.mk from erlang.mk by Loic Hoguin + +.PHONY: all +CURDIR := $(shell pwd) +BASEDIR := $(abspath $(CURDIR)/..) + +PROJECT ?= $(notdir $(BASEDIR)) +PROJECT := $(strip $(PROJECT)) + +ERTS_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s/erts-~s/include/\", [code:root_dir(), erlang:system_info(version)]).") +ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, include)]).") +ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, lib)]).") + +C_SRC_DIR = $(CURDIR) + +# System type and C compiler/flags. + +UNAME_SYS := $(shell uname -s) +ifeq ($(UNAME_SYS), Darwin) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall + CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall + LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress +else ifeq ($(UNAME_SYS), FreeBSD) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall + CXXFLAGS ?= -O3 -finline-functions -Wall +else ifeq ($(UNAME_SYS), Linux) + CC ?= gcc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall + CXXFLAGS ?= -O3 -finline-functions -Wall +endif + +CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) +CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) + +LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei +LDFLAGS += -shared + +# Verbosity. 
+ +c_verbose_0 = @echo " C " $(?F); +c_verbose = $(c_verbose_$(V)) + +cpp_verbose_0 = @echo " CPP " $(?F); +cpp_verbose = $(cpp_verbose_$(V)) + +link_verbose_0 = @echo " LD " $(@F); +link_verbose = $(link_verbose_$(V)) + +SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \)) +OBJECTS = $(addsuffix .o, $(basename $(SOURCES))) + +COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c +COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c + +RIAK_DT_VCLOCK_SO = $(CURDIR)/../priv/riak_dt_vclock.so +$(RIAK_DT_VCLOCK_SO): riak_dt_vclock.o + @mkdir -p $(BASEDIR)/priv/ + $(link_verbose) $(CC) riak_dt_vclock.o $(LDFLAGS) $(LDLIBS) -o $(RIAK_DT_VCLOCK_SO) + +%.o: %.c + $(COMPILE_C) $(OUTPUT_OPTION) $< + +%.o: %.cc + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.C + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.cpp + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +clean: + @rm -f $(OBJECTS) $(CURDIR)/../priv/riak_dt_vclock.so \ No newline at end of file diff --git a/c_src/riak_dt_vclock.c b/c_src/riak_dt_vclock.c new file mode 100644 index 0000000..009b608 --- /dev/null +++ b/c_src/riak_dt_vclock.c @@ -0,0 +1,286 @@ +#include "erl_nif.h" +#include <stdbool.h> + + +static ERL_NIF_TERM atom_true; +static ERL_NIF_TERM atom_false; +/* enif_make_badarg() is only valid as the return value of the NIF call that invokes it, so raise it at each use site (every NIF here names its environment `env`) instead of caching a term in load(). */ +#define badarg enif_make_badarg(env) + + + +static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { + atom_true = enif_make_atom(env, "true"); + atom_false = enif_make_atom(env, "false"); + return 0; +} + +ERL_NIF_TERM is_sorted_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM head1, head2, tail, rest, list = argv[0]; + const ERL_NIF_TERM *tuple1, *tuple2; + int arity1, arity2; + + if(enif_get_list_cell(env, list, &head1, &tail)) { + if(!enif_get_tuple(env, head1, &arity1, &tuple1)) + return badarg; + if(arity1 == 0) + return badarg; + + while(enif_get_list_cell(env, tail, &head2, &rest)) { + if(!enif_get_tuple(env, head2, &arity2, 
&tuple2)) + return badarg; + if (arity2 == 0) + return badarg; + if (enif_compare(tuple1[0], tuple2[0]) >= 0) + return atom_false; + + tuple1 = tuple2; + arity1 = arity2; + head1 = head2; + tail = rest; + } + return atom_true; + } else if (enif_is_empty_list(env, list)) + return atom_true; + else + return badarg; +} + +static inline ErlNifSInt64 max(ErlNifSInt64 i1, ErlNifSInt64 i2) { + if (i1 > i2) + return i1; + else + return i2; +} + +ERL_NIF_TERM merge2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM headlhs, taillhs, listlhs = argv[0]; + ERL_NIF_TERM headrhs, tailrhs, listrhs = argv[1]; + ERL_NIF_TERM rretlist, retlist = enif_make_list(env, 0); + ERL_NIF_TERM newtuple; + + const ERL_NIF_TERM *tuplelhs, *tuplerhs; + int aritylhs, arityrhs, cmp; + ErlNifSInt64 vallhs, valrhs; + + /* We will never receive an empty list either on the left or right side */ + + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && + enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + + if(!enif_get_tuple(env, headlhs, &aritylhs, &tuplelhs)) + return badarg; + if (aritylhs != 2) + return badarg; + + if(!enif_get_tuple(env, headrhs, &arityrhs, &tuplerhs)) + return badarg; + if (arityrhs != 2) + return badarg; + + cmp = enif_compare(tuplelhs[0], tuplerhs[0]); + if (cmp == 0) { + if (!enif_get_int64(env, tuplelhs[1], &vallhs)) + return badarg; + if (!enif_get_int64(env, tuplerhs[1], &valrhs)) + return badarg; + + newtuple = enif_make_tuple2(env, tuplelhs[0], enif_make_int64(env, max(vallhs, valrhs))); + retlist = enif_make_list_cell(env, newtuple, retlist); + /* Pop both lists */ + listlhs = taillhs; + listrhs = tailrhs; + } else if (cmp < 0) { /* lhs < rhs */ + /* Pop something off LHS to have it try to catch up with RHS */ + retlist = enif_make_list_cell(env, headlhs, retlist); + listlhs = taillhs; + } else if (cmp > 0) { /* lhs > rhs */ + /* Pop something off RHS to have it try to catch up with LHS */ + retlist = enif_make_list_cell(env, 
headrhs, retlist); + listrhs = tailrhs; + } + } + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs)) { + retlist = enif_make_list_cell(env, headlhs, retlist); + listlhs = taillhs; + } + + while (enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + retlist = enif_make_list_cell(env, headrhs, retlist); + listrhs = tailrhs; + } + + /* TODO: Maybe have some heuristic here to determine whether to do a reverse here, or in Erlang? */ + enif_make_reverse_list(env, retlist, &rretlist); + return rretlist; +} + +/* Return true if Lhs is a direct descendant of Rhs, else false -- remember, a vclock is its own descendant! */ + +ERL_NIF_TERM descends2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM headlhs, taillhs, listlhs = argv[0]; + ERL_NIF_TERM headrhs, tailrhs, listrhs = argv[1]; + + const ERL_NIF_TERM *tuplelhs, *tuplerhs; + int aritylhs, arityrhs, cmp; + ErlNifSInt64 vallhs, valrhs; + + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && + enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + + if(!enif_get_tuple(env, headlhs, &aritylhs, &tuplelhs)) + return badarg; + if (aritylhs != 2) + return badarg; + + if(!enif_get_tuple(env, headrhs, &arityrhs, &tuplerhs)) + return badarg; + if (arityrhs != 2) + return badarg; + cmp = enif_compare(tuplelhs[0], tuplerhs[0]); + if (cmp == 0) { + if (!enif_get_int64(env, tuplelhs[1], &vallhs)) + return badarg; + if (!enif_get_int64(env, tuplerhs[1], &valrhs)) + return badarg; + if (vallhs < valrhs) + return atom_false; + listlhs = taillhs; + listrhs = tailrhs; + } else if (cmp < 0) { + /* We have an extra actor on the lhs, drop it */ + listlhs = taillhs; + } else + return atom_false; + } + /* Make sure that RHS is empty */ + if(enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) + return atom_false; + + return atom_true; +} + +ERL_NIF_TERM drop_dots_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM headlhs, taillhs, listlhs = argv[0]; + ERL_NIF_TERM 
headrhs, tailrhs, listrhs = argv[1]; + ERL_NIF_TERM rretlist, retlist = enif_make_list(env, 0); + ERL_NIF_TERM newtuple; + + const ERL_NIF_TERM *tuplelhs, *tuplerhs; + int aritylhs, arityrhs, cmp; + ErlNifSInt64 vallhs, valrhs; + + /* We will never receive an empty list either on the left or right side */ + + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && + enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + + if(!enif_get_tuple(env, headlhs, &aritylhs, &tuplelhs)) + return badarg; + if (aritylhs != 2) + return badarg; + + if(!enif_get_tuple(env, headrhs, &arityrhs, &tuplerhs)) + return badarg; + if (arityrhs != 2) + return badarg; + + cmp = enif_compare(tuplelhs[0], tuplerhs[0]); + if (cmp == 0) { + if (!enif_get_int64(env, tuplelhs[1], &vallhs)) + return badarg; + if (!enif_get_int64(env, tuplerhs[1], &valrhs)) + return badarg; + if (vallhs > valrhs) { + newtuple = enif_make_tuple2(env, tuplelhs[0], enif_make_int64(env, vallhs)); + retlist = enif_make_list_cell(env, newtuple, retlist); + } + /* Pop both lists */ + listlhs = taillhs; + listrhs = tailrhs; + } else if (cmp < 0) { /* lhs < rhs */ + /* Pop something off LHS to have it try to catch up with RHS */ + retlist = enif_make_list_cell(env, headlhs, retlist); + listlhs = taillhs; + } else if (cmp > 0) { /* lhs > rhs */ + /* Pop something off RHS to have it try to catch up with LHS */ + listrhs = tailrhs; + } + } + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs)) { + retlist = enif_make_list_cell(env, headlhs, retlist); + listlhs = taillhs; + } + + /* TODO: Maybe have some heuristic here to determine whether to do a reverse here, or in Erlang? 
*/ + enif_make_reverse_list(env, retlist, &rretlist); + return rretlist; +} + +ERL_NIF_TERM dominates2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM headlhs, taillhs, listlhs = argv[0]; + ERL_NIF_TERM headrhs, tailrhs, listrhs = argv[1]; + bool has_dominated = false; + + const ERL_NIF_TERM *tuplelhs, *tuplerhs; + int aritylhs, arityrhs, cmp; + ErlNifSInt64 vallhs, valrhs; + + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && + enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + + if(!enif_get_tuple(env, headlhs, &aritylhs, &tuplelhs)) + return badarg; + if (aritylhs != 2) + return badarg; + + if(!enif_get_tuple(env, headrhs, &arityrhs, &tuplerhs)) + return badarg; + if (arityrhs != 2) + return badarg; + + cmp = enif_compare(tuplelhs[0], tuplerhs[0]); + if (!enif_get_int64(env, tuplelhs[1], &vallhs)) + return badarg; + if (!enif_get_int64(env, tuplerhs[1], &valrhs)) + return badarg; + + /* Extra actor on left hand side, ignore it */ + if (cmp < 0) { + listlhs = taillhs; + } else if (cmp > 0) { + return atom_false; + } else { + if (vallhs < valrhs) + return atom_false; + else if(vallhs > valrhs) + has_dominated = true; + listlhs = taillhs; + listrhs = tailrhs; + } + } + + /* Both sides are empty */ + if(enif_is_empty_list(env, listlhs) && enif_is_empty_list(env, listrhs)) { + if(has_dominated) + return atom_true; + else + return atom_false; + /* LHS is not empty, but RHS is empty */ + } else if(!enif_is_empty_list(env, listlhs) && enif_is_empty_list(env, listrhs)) { + return atom_true; + /* LHS is empty, but RHS is not empty */ + } else { + return atom_false; + } +} + +static ErlNifFunc nif_funcs[] = { + {"descends2", 2, descends2_nif}, + {"is_sorted", 1, is_sorted_nif}, + {"merge2", 2, merge2_nif}, + {"drop_dots", 2, drop_dots_nif}, + {"dominates2", 2, dominates2_nif} +}; + +ERL_NIF_INIT(riak_dt_vclock, nif_funcs, load, NULL, NULL, NULL) diff --git a/include/r18.hrl b/include/r18.hrl new file mode 100644 index 
0000000..a59c504 --- /dev/null +++ b/include/r18.hrl @@ -0,0 +1,2 @@ +-type dict() :: dict:dict(). +-type set() :: sets:set(). \ No newline at end of file diff --git a/rebar.config b/rebar.config index f25336b..15ccd35 100644 --- a/rebar.config +++ b/rebar.config @@ -3,3 +3,22 @@ {erl_opts, [debug_info, warnings_as_errors]}. {eunit_opts, [verbose]}. {xref_checks, [undefined_function_calls]}. + + +{profiles, [ + {bench, [ + {erl_opts, [ + debug_info, + warnings_as_errors, + {platform_define, "^[0-9]+", namespaced_types}, + {d, 'BENCH', true} + ]} + ]} +]}. + +{pre_hooks, + [{"(linux|darwin|solaris)", compile, "make -C c_src"}, + {"(freebsd)", compile, "gmake -C c_src"}]}. +{post_hooks, + [{"(linux|darwin|solaris)", clean, "make -C c_src clean"}, + {"(freebsd)", clean, "gmake -C c_src clean"}]}. \ No newline at end of file diff --git a/src/orset_bm.erl b/src/orset_bm.erl index 6ff2964..b7f1121 100644 --- a/src/orset_bm.erl +++ b/src/orset_bm.erl @@ -19,15 +19,15 @@ -define(N, 10). start(Cmds, Mod) -> - TS = erlang:now(), + TS = erlang:monotonic_time(), Coord = start_coordinator(?N, Mod, self()), lists:foreach(fun(I) -> start_proc(I, Cmds, Mod, Coord) end, lists:seq(1, ?N)), receive {Coord, Results} -> - TE = erlang:now(), + TE = erlang:monotonic_time(), Size = m_size(Mod, Results), - io:format("Results are ~p in ~p~n", [Size, timer:now_diff(TE, TS) / 1000]) + io:format("Results are ~p in ~p~n", [Size, erlang:convert_time_unit(TE - TS, native, micro_seconds) / 1000]) end. diff --git a/src/riak_dt_map.erl b/src/riak_dt_map.erl index 054f7de..93cc2f9 100644 --- a/src/riak_dt_map.erl +++ b/src/riak_dt_map.erl @@ -167,6 +167,9 @@ -behaviour(riak_dt). +-include("r18.hrl"). + + -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). -endif. @@ -186,10 +189,10 @@ -export([gen_op/0, gen_op/1, gen_field/0, gen_field/1, generate/0, size/1]). -endif. --export_type([map/0, binary_map/0, map_op/0]). +-export_type([dt_map/0, binary_map/0, map_op/0]). -type binary_map() :: binary(). 
%% A binary that from_binary/1 will accept --type map() :: {riak_dt_vclock:vclock(), entries(), deferred()}. +-type dt_map() :: {riak_dt_vclock:vclock(), entries(), deferred()}. -type entries() :: [field()]. -type field() :: {field_name(), field_value()}. -type field_name() :: {Name :: binary(), CRDTModule :: crdt_mod()}. @@ -215,7 +218,7 @@ -type crdt() :: riak_dt_emcntr:emcntr() | riak_dt_od_flag:od_flag() | riak_dt_lwwreg:lwwreg() | riak_dt_orswot:orswot() | - riak_dt_map:map(). + riak_dt_map:dt_map(). -type map_op() :: {update, [map_field_update() | map_field_op()]}. @@ -234,18 +237,18 @@ -type precondition_error() :: {error, {precondition, {not_present, field()}}}. %% @doc Create a new, empty Map. --spec new() -> map(). +-spec new() -> dt_map(). new() -> {riak_dt_vclock:fresh(), orddict:new(), orddict:new()}. %% @doc sets the clock in the map to that `Clock'. Used by a %% containing Map for sub-CRDTs --spec parent_clock(riak_dt_vclock:vclock(), map()) -> map(). +-spec parent_clock(riak_dt_vclock:vclock(), dt_map()) -> dt_map(). parent_clock(Clock, {_MapClock, Values, Deferred}) -> {Clock, Values, Deferred}. %% @doc get the current set of values for this Map --spec value(map()) -> values(). +-spec value(dt_map()) -> values(). value({_Clock, Values, _Deferred}) -> orddict:fold(fun({Name, Type}, CRDTs, Acc) -> Merged = merge_crdts(Type, CRDTs), @@ -271,11 +274,11 @@ merge_crdts(Type, {CRDTs, TS}) -> Type:merge(TS, V). %% @doc query map (not implemented yet) --spec value(term(), map()) -> values(). +-spec value(term(), dt_map()) -> values(). value(_, Map) -> value(Map). -%% @doc update the `map()' or a field in the `map()' by executing +%% @doc update the `dt_map()' or a field in the `dt_map()' by executing %% the `map_op()'. `Ops' is a list of one or more of the following %% ops: %% @@ -291,8 +294,8 @@ value(_, Map) -> %% still present, and it's value will contain the concurrent update. %% %% Atomic, all of `Ops' are performed successfully, or none are. 
--spec update(map_op(), riak_dt:actor() | riak_dt:dot(), map()) -> - {ok, map()} | precondition_error(). +-spec update(map_op(), riak_dt:actor() | riak_dt:dot(), dt_map()) -> + {ok, dt_map()} | precondition_error(). update(Op, ActorOrDot, Map) -> update(Op, ActorOrDot, Map, undefined). @@ -302,8 +305,8 @@ update(Op, ActorOrDot, Map) -> %% types. hence the common clock. %% %% @see parent_clock/2 --spec update(map_op(), riak_dt:actor() | riak_dt:dot(), map(), riak_dt:context()) -> - {ok, map()}. +-spec update(map_op(), riak_dt:actor() | riak_dt:dot(), dt_map(), riak_dt:context()) -> + {ok, dt_map()}. update({update, Ops}, ActorOrDot, {Clock0, Values, Deferred}, Ctx) -> {Dot, Clock} = update_clock(ActorOrDot, Clock0), apply_ops(Ops, Dot, {Clock, Values, Deferred}, Ctx). @@ -324,7 +327,7 @@ update_clock(Actor, Clock) -> %% @private -spec apply_ops([map_field_update() | map_field_op()], riak_dt:dot(), {riak_dt_vclock:vclock(), entries() , deferred()}, context()) -> - {ok, map()} | precondition_error(). + {ok, dt_map()} | precondition_error(). apply_ops([], _Dot, Map, _Ctx) -> {ok, Map}; apply_ops([{update, {_Name, Type}=Field, Op} | Rest], Dot, {Clock, Values, Deferred}, Ctx) -> @@ -363,8 +366,8 @@ apply_ops([{remove, Field} | Rest], Dot, Map, Ctx) -> %% %% @see defer_remove/4 for handling of removes of fields that are %% _not_ present --spec remove_field(field(), map(), context()) -> - {ok, map()} | precondition_error(). +-spec remove_field(field(), dt_map(), context()) -> + {ok, dt_map()} | precondition_error(). remove_field(Field, {Clock, Values, Deferred}, undefined) -> case orddict:find(Field, Values) of error -> @@ -437,8 +440,8 @@ defer_remove(Clock, Ctx, Field, Deferred) -> Deferred) end. -%% @doc merge two `map()'s. --spec merge(map(), map()) -> map(). +%% @doc merge two `dt_map()'s. +-spec merge(dt_map(), dt_map()) -> dt_map(). merge(Map, Map) -> Map; %% @TODO is there a way to optimise this, based on clocks maybe? 
@@ -569,8 +572,8 @@ apply_deferred(Clock, Entries, Deferred) -> Deferred). %% @private --spec remove_all([field()], map(), context()) -> - map(). +-spec remove_all([field()], dt_map(), context()) -> + dt_map(). remove_all(Fields, Map, Ctx) -> lists:foldl(fun(Field, MapAcc) -> {ok, MapAcc2}= remove_field(Field, MapAcc, Ctx), @@ -579,10 +582,10 @@ remove_all(Fields, Map, Ctx) -> Map, Fields). -%% @doc compare two `map()'s for equality of structure Both schemas +%% @doc compare two `dt_map()'s for equality of structure Both schemas %% and value list must be equal. Performs a pariwise equals for all %% values in the value lists --spec equal(map(), map()) -> boolean(). +-spec equal(dt_map(), dt_map()) -> boolean(). equal({Clock1, Values1, Deferred1}, {Clock2, Values2, Deferred2}) -> riak_dt_vclock:equal(Clock1, Clock2) andalso Deferred1 == Deferred2 andalso @@ -613,7 +616,7 @@ pairwise_equals(_, _) -> %% that only seen fields are removed. If a field removal operation has %% a context that the Map has not seen, it will be deferred until %% causally relevant. --spec precondition_context(map()) -> riak_dt:context(). +-spec precondition_context(dt_map()) -> riak_dt:context(). precondition_context({Clock, _Field, _Deferred}) -> Clock. @@ -625,11 +628,11 @@ precondition_context({Clock, _Field, _Deferred}) -> %% basically `field_count' - ( unique fields) %% `deferred_length': How many operations on the deferred list, a reasonable expression %% of lag/staleness. --spec stats(map()) -> [{atom(), integer()}]. +-spec stats(dt_map()) -> [{atom(), integer()}]. stats(Map) -> [ {S, stat(S, Map)} || S <- [actor_count, field_count, duplication, deferred_length]]. --spec stat(atom(), map()) -> number() | undefined. +-spec stat(atom(), dt_map()) -> number() | undefined. stat(actor_count, {Clock, _, _}) -> length(Clock); stat(field_count, {_, Fields, _}) -> @@ -650,7 +653,7 @@ stat(_,_) -> undefined. -define(TAG, ?DT_MAP_TAG). -define(V1_VERS, 1). 
-%% @doc returns a binary representation of the provided `map()'. The +%% @doc returns a binary representation of the provided `dt_map()'. The %% resulting binary is tagged and versioned for ease of future %% upgrade. Calling `from_binary/1' with the result of this function %% will return the original map. Use the application env var @@ -658,15 +661,15 @@ stat(_,_) -> undefined. %% (`false') %% %% @see `from_binary/1' --spec to_binary(map()) -> binary_map(). +-spec to_binary(dt_map()) -> binary_map(). to_binary(Map) -> <<?TAG:8/integer, ?V1_VERS:8/integer, (riak_dt:to_binary(Map))/binary>>. %% @doc When the argument is a `binary_map()' produced by -%% `to_binary/1' will return the original `map()'. +%% `to_binary/1' will return the original `dt_map()'. %% %% @see `to_binary/1' --spec from_binary(binary_map()) -> map(). +-spec from_binary(binary_map()) -> dt_map(). from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, B/binary>>) -> riak_dt:from_binary(B). diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index c57c44d..c4da8bb 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -62,6 +62,8 @@ -behaviour(riak_dt). +-include("r18.hrl"). + -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). -define(QC_OUT(P), @@ -88,8 +90,9 @@ -endif. --export_type([orswot/0, orswot_op/0, binary_orswot/0]). +-export_type([legacy_orswot/0, orswot/0, orswot_op/0, binary_orswot/0]). +-opaque legacy_orswot() :: {riak_dt_vclock:vclock(), legacy_entries(), deferred()}. -opaque orswot() :: {riak_dt_vclock:vclock(), entries(), deferred()}. %% Only removes can be deferred, so a list of members to be removed %% per context. @@ -106,7 +109,8 @@ %% a dict of member() -> minimal_clock() mappings. The %% `minimal_clock()' is a more effecient way of storing knowledge %% about adds / removes than a UUID per add. --type entries() :: [{member(), minimal_clock()}]. +-type legacy_entries() ::[{member(), minimal_clock()}]. +-type entries() :: #{member() => minimal_clock()}. 
%% a minimal clock is just the dots for the element, each dot being an %% actor and event counter for when the element was added. @@ -116,9 +120,11 @@ -type precondition_error() :: {error, {precondition ,{not_present, member()}}}. +-define(EMPTY_ORSWOT, {[],#{},[]}). + -spec new() -> orswot(). new() -> - {riak_dt_vclock:fresh(), orddict:new(), orddict:new()}. + {riak_dt_vclock:fresh(), maps:new(), orddict:new()}. %% @doc sets the clock in the Set to that `Clock'. Used by a %% containing Map for sub-CRDTs @@ -126,11 +132,15 @@ new() -> parent_clock(Clock, {_SetClock, Entries, Deferred}) -> {Clock, Entries, Deferred}. --spec value(orswot()) -> [member()]. +-spec value(orswot() | legacy_orswot()) -> [member()]. +value({_Clock, Entries, _Deferred}) when is_list(Entries) -> + [K || {K, _Dots} <- orddict:to_list(Entries)]; value({_Clock, Entries, _Deferred}) -> - [K || {K, _Dots} <- orddict:to_list(Entries)]. + [K || {K, _Dots} <- maps:to_list(Entries)]. --spec value(orswot_q(), orswot()) -> term(). +-spec value(orswot_q(), orswot() | legacy_orswot()) -> term(). +value(size, {_Clock, Entries, _Deferred}) when is_map(Entries) -> + maps:size(Entries); value(size, ORset) -> length(value(ORset)); value({contains, Elem}, ORset) -> @@ -138,15 +148,17 @@ value({contains, Elem}, ORset) -> %% @doc take a list of Set operations and apply them to the set. %% NOTE: either _all_ are applied, or _none_ are. --spec update(orswot_op(), actor() | dot(), orswot()) -> {ok, orswot()} | +-spec update(orswot_op(), actor() | dot(), orswot() | legacy_orswot()) -> {ok, orswot()} | precondition_error(). 
+update(Op, Actor, {Clock, Entries0, Deferred}) when is_list(Entries0) -> + Entries1 = maps:from_list(Entries0), + update(Op, Actor, {Clock, Entries1, Deferred}); update({update, Ops}, Actor, ORSet) -> apply_ops(Ops, Actor, ORSet); update({add, Elem}, Actor, ORSet) -> {ok, add_elem(Actor, ORSet, Elem)}; -update({remove, Elem}, _Actor, ORSet) -> - {_Clock, Entries, _Deferred} = ORSet, - remove_elem(orddict:find(Elem, Entries), Elem, ORSet); +update({remove, Elem}, _Actor, ORSet = {_Clock, Entries, _Deferred}) -> + remove_elem(maps:find(Elem, Entries), Elem, ORSet); update({add_all, Elems}, Actor, ORSet) -> ORSet2 = lists:foldl(fun(E, S) -> add_elem(Actor, S, E) end, @@ -158,8 +170,11 @@ update({add_all, Elems}, Actor, ORSet) -> update({remove_all, Elems}, Actor, ORSet) -> remove_all(Elems, Actor, ORSet). --spec update(orswot_op(), actor() | dot(), orswot(), riak_dt:context()) -> +-spec update(orswot_op(), actor() | dot(), legacy_orswot() | orswot(), riak_dt:context()) -> {ok, orswot()} | precondition_error(). +update(Op, Actor, {Clock, Entries0, Deferred}, Context) when is_list(Entries0) -> + Entries1 = maps:from_list(Entries0), + update(Op, Actor, {Clock, Entries1, Deferred}, Context); update(Op, Actor, ORSet, undefined) -> update(Op, Actor, ORSet); update({add, Elem}, Actor, ORSet, _Ctx) -> @@ -170,14 +185,14 @@ update({remove, Elem}, _Actor, {Clock, Entries, Deferred}, Ctx) -> %% have this element, we can drop any dots it has that the %% Context has seen. 
Deferred2 = defer_remove(Clock, Ctx, Elem, Deferred), - case orddict:find(Elem, Entries) of + case maps:find(Elem, Entries) of {ok, ElemClock} -> ElemClock2 = riak_dt_vclock:subtract_dots(ElemClock, Ctx), case ElemClock2 of [] -> - {ok, {Clock, orddict:erase(Elem, Entries), Deferred2}}; + {ok, {Clock, maps:remove(Elem, Entries), Deferred2}}; _ -> - {ok, {Clock, orddict:store(Elem, ElemClock2, Entries), Deferred2}} + {ok, {Clock, maps:put(Elem, ElemClock2, Entries), Deferred2}} end; error -> %% Do we not have the element because we removed it @@ -258,27 +273,34 @@ remove_all([Elem | Rest], Actor, ORSet, Ctx) -> {ok, ORSet2} = update({remove, Elem}, Actor, ORSet, Ctx), remove_all(Rest, Actor, ORSet2, Ctx). --spec merge(orswot(), orswot()) -> orswot(). +-spec merge(orswot() | legacy_orswot(), orswot() | legacy_orswot()) -> orswot(). +merge({LHSClock, LHSEntries0, LHSDeferred}, RHS) when is_list(LHSEntries0) -> + LHSEntries1 = maps:from_list(LHSEntries0), + merge({LHSClock, LHSEntries1, LHSDeferred}, RHS); +merge(LHS, {RHSClock, RHSEntries0, RHSDeferred}) when is_list(RHSEntries0) -> + RHSEntries1 = maps:from_list(RHSEntries0), + merge(LHS, {RHSClock, RHSEntries1, RHSDeferred}); merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) -> {Clock, Entries, Deferred}; +merge(?EMPTY_ORSWOT, RHS) -> RHS; +merge(LHS, ?EMPTY_ORSWOT) -> LHS; +merge({Clock, Entries, LHSDeferred}, {Clock, Entries, RHSDeferred}) -> + Deffered = merge_deferred(LHSDeferred, RHSDeferred), + apply_deferred(Clock, Entries, Deffered); merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferred}=RHS) -> Clock = riak_dt_vclock:merge([LHSClock, RHSClock]), %% If an element is in both dicts, merge it. If it occurs in one, %% then see if its dots are dominated by the others whole set %% clock. If so, then drop it, if not, keep it. 
- LHSKeys = sets:from_list(orddict:fetch_keys(LHSEntries)), - RHSKeys = sets:from_list(orddict:fetch_keys(RHSEntries)), - CommonKeys = sets:intersection(LHSKeys, RHSKeys), - LHSUnique = sets:subtract(LHSKeys, CommonKeys), - RHSUnique = sets:subtract(RHSKeys, CommonKeys), - Entries00 = merge_common_keys(CommonKeys, LHS, RHS), + LHSUnique = maps:filter(fun(Key, _Value) -> not maps:is_key(Key, RHSEntries) end, LHSEntries), + RHSUnique = maps:filter(fun(Key, _Value) -> not maps:is_key(Key, LHSEntries) end, RHSEntries), + Entries0 = merge_common_keys(LHS, RHS), - Entries0 = merge_disjoint_keys(LHSUnique, LHSEntries, RHSClock, Entries00), - Entries = merge_disjoint_keys(RHSUnique, RHSEntries, LHSClock, Entries0), + Entries1 = merge_disjoint_keys(LHSUnique, LHSEntries, RHSClock, Entries0), + Entries2 = merge_disjoint_keys(RHSUnique, RHSEntries, LHSClock, Entries1), Deffered = merge_deferred(LHSDeferred, RHSDeferred), - - apply_deferred(Clock, Entries, Deffered). + apply_deferred(Clock, Entries2, Deffered). %% @private merge the deffered operations for both sets. -spec merge_deferred(deferred(), deferred()) -> deferred(). @@ -304,17 +326,17 @@ apply_deferred(Clock, Entries, Deferred) -> %% @doc check if each element in `Entries' should be in the merged %% set. --spec merge_disjoint_keys(set(), orddict:orddict(), - riak_dt_vclock:vclock(), orddict:orddict()) -> orddict:orddict(). +-spec merge_disjoint_keys(map(), entries(), + riak_dt_vclock:vclock(), entries()) -> entries(). 
merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) -> - sets:fold(fun(Key, Acc) -> - Dots = orddict:fetch(Key, Entries), + maps:fold(fun(Key, _Val, Acc) -> + Dots = maps:get(Key, Entries), case riak_dt_vclock:descends(SetClock, Dots) of false -> %% Optimise the set of stored dots to %% include only those unseen NewDots = riak_dt_vclock:subtract_dots(Dots, SetClock), - orddict:store(Key, NewDots, Acc); + Acc#{Key => NewDots}; true -> Acc end @@ -323,10 +345,10 @@ merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) -> Keys). %% @doc merges the minimal clocks for the common entries in both sets. --spec merge_common_keys(set(), {riak_dt_vclock:vclock(), entries(), deferred()}, +-spec merge_common_keys({riak_dt_vclock:vclock(), entries(), deferred()}, {riak_dt_vclock:vclock(), entries(), deferred()}) -> - orddict:orddict(). -merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, _}) -> + entries(). +merge_common_keys({LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, _}) -> %% If both sides have the same values, some dots may still need to %% be shed. If LHS has dots for 'X' that RHS does _not_ have, and @@ -334,42 +356,59 @@ merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, %% dots. We only keep dots BOTH side agree on, or dots that are %% not dominated. 
Keep only common dots, and dots that are not %% dominated by the other sides clock - - sets:fold(fun(Key, Acc) -> - V1 = orddict:fetch(Key, LHSEntries), - V2 = orddict:fetch(Key, RHSEntries), - - CommonDots = sets:intersection(sets:from_list(V1), sets:from_list(V2)), - LHSUnique = sets:to_list(sets:subtract(sets:from_list(V1), CommonDots)), - RHSUnique = sets:to_list(sets:subtract(sets:from_list(V2), CommonDots)), - LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), - RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), - V = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]), - %% Perfectly possible that an item in both sets should be dropped - case V of - [] -> - orddict:erase(Key, Acc); - _ -> - orddict:store(Key, V, Acc) - end - end, - orddict:new(), - CommonKeys). - --spec equal(orswot(), orswot()) -> boolean(). + LHSKeys = maps:keys(LHSEntries), + lists:foldl( + fun(Key, Acc) -> + case maps:is_key(Key, RHSEntries) of + true -> + V1 = maps:get(Key, LHSEntries), + V2 = maps:get(Key, RHSEntries), + + CommonDots = sets:intersection(sets:from_list(V1), sets:from_list(V2)), + LHSUnique = sets:to_list(sets:subtract(sets:from_list(V1), CommonDots)), + RHSUnique = sets:to_list(sets:subtract(sets:from_list(V2), CommonDots)), + LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), + RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), + V = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]), + %% Perfectly possible that an item in both sets should be dropped + case V of + [] -> + maps:remove(Key, Acc); + _ -> + maps:put(Key, V, Acc) + end; + false -> + Acc + end + end, + maps:new(), + LHSKeys). + +-spec equal(orswot() | legacy_orswot(), orswot() | legacy_orswot()) -> boolean(). 
+equal({Clock, Entries0, Deferred}, Rhs) when is_list(Entries0) -> + Entries1 = maps:from_list(Entries0), + equal({Clock, Entries1, Deferred}, Rhs); +equal(Lhs, {Clock, Entries0, Deferred}) when is_list(Entries0) -> + Entries1 = maps:from_list(Entries0), + equal(Lhs, {Clock, Entries1, Deferred}); equal({Clock1, Entries1, _}, {Clock2, Entries2, _}) -> riak_dt_vclock:equal(Clock1, Clock2) andalso - orddict:fetch_keys(Entries1) == orddict:fetch_keys(Entries2) andalso + lists:sort(maps:keys(Entries1)) =:= lists:sort(maps:keys(Entries2)) andalso clocks_equal(Entries1, Entries2). --spec clocks_equal(orddict:orddict(), orddict:orddict()) -> boolean(). -clocks_equal([], _) -> +-spec clocks_equal(entries(), entries()) -> boolean(). +clocks_equal(EntriesLhs, EntriesRhs) -> + KeysLhs = maps:keys(EntriesLhs), + clocks_equal(KeysLhs, EntriesLhs, EntriesRhs). + +clocks_equal([], _, _) -> true; -clocks_equal([{Elem, Clock1} | Rest], Entries2) -> - Clock2 = orddict:fetch(Elem, Entries2), - case riak_dt_vclock:equal(Clock1, Clock2) of +clocks_equal([Elem|Rest], EntriesLhs, EntriesRhs) -> + ClockLhs = maps:get(Elem, EntriesLhs), + ClockRhs = maps:get(Elem, EntriesRhs), + case riak_dt_vclock:equal(ClockLhs, ClockRhs) of true -> - clocks_equal(Rest, Entries2); + clocks_equal(Rest, EntriesLhs, EntriesRhs); false -> false end. @@ -377,18 +416,18 @@ clocks_equal([{Elem, Clock1} | Rest], Entries2) -> %% Private -spec add_elem(actor() | dot(), orswot(), member()) -> orswot(). add_elem(Dot, {Clock, Entries, Deferred}, Elem) when is_tuple(Dot) -> - {riak_dt_vclock:merge([Clock, [Dot]]), orddict:store(Elem, [Dot], Entries), Deferred}; + {riak_dt_vclock:merge([Clock, [Dot]]), maps:put(Elem, [Dot], Entries), Deferred}; add_elem(Actor, {Clock, Entries, Deferred}, Elem) -> NewClock = riak_dt_vclock:increment(Actor, Clock), Dot = [{Actor, riak_dt_vclock:get_counter(Actor, NewClock)}], - {NewClock, orddict:store(Elem, Dot, Entries), Deferred}. + {NewClock, maps:put(Elem, Dot, Entries), Deferred}. 
-spec remove_elem({ok, riak_dt_vclock:vclock()} | error, - member(), {riak_dt_vclock:vclock(), orddict:orddict(), deferred()}) -> - {ok, {riak_dt_vclock:vclock(), orddict:orddict(), deferred()}} | + member(), orswot()) -> + {ok, orswot()} | precondition_error(). -remove_elem({ok, _VClock}, Elem, {Clock, Dict, Deferred}) -> - {ok, {Clock, orddict:erase(Elem, Dict), Deferred}}; +remove_elem({ok, _VClock}, Elem, {Clock, Entries, Deferred}) -> + {ok, {Clock, maps:remove(Elem, Entries), Deferred}}; remove_elem(_, Elem, _ORSet) -> {error, {precondition, {not_present, Elem}}}. @@ -412,9 +451,9 @@ stats(ORSWOT) -> stat(actor_count, {Clock, _Dict, _}) -> length(Clock); stat(element_count, {_Clock, Dict, _}) -> - orddict:size(Dict); + maps:size(Dict); stat(max_dot_length, {_Clock, Dict, _}) -> - orddict:fold(fun(_K, Dots, Acc) -> + maps:fold(fun(_K, Dots, Acc) -> max(length(Dots), Acc) end, 0, Dict); stat(deferred_length, {_Clock, _Dict, Deferred}) -> @@ -450,6 +489,8 @@ from_binary(<>) -> %% =================================================================== -ifdef(TEST). +empty_orswot_test() -> + ?assertEqual(new(), ?EMPTY_ORSWOT). stat_test() -> Set = new(), {ok, Set1} = update({add, <<"foo">>}, 1, Set), diff --git a/src/riak_dt_vclock.erl b/src/riak_dt_vclock.erl index 76b8984..120a388 100644 --- a/src/riak_dt_vclock.erl +++ b/src/riak_dt_vclock.erl @@ -30,6 +30,10 @@ %% pp. 215-226 -module(riak_dt_vclock). +-compile(inline_list_funcs). + +-on_load(init/0). + -export([fresh/0,descends/2,merge/1,get_counter/2, subtract_dots/2, increment/2,all_nodes/1, equal/2, @@ -39,39 +43,80 @@ -include_lib("eunit/include/eunit.hrl"). -endif. +-define(APPNAME, riak_dt). +-define(LIBNAME, riak_dt_vclock). + +%% The presence of the special actor indicates the list is sorted +%% This is a very "low" actor # that was chosen randomly, and smaller than -2**31 -export_type([vclock/0, vclock_node/0, binary_vclock/0]). -type vclock() :: [vc_entry()]. 
+-type sorted_vclock() :: orddict:orddict(vclock_node(), counter()). -type binary_vclock() :: binary(). % The timestamp is present but not used, in case a client wishes to inspect it. -type vc_entry() :: {vclock_node(), counter()}. % Nodes can have any term() as a name, but they must differ from each other. -type vclock_node() :: term(). --type counter() :: integer(). +-type counter() :: non_neg_integer(). + +init() -> + SoName = case code:priv_dir(?APPNAME) of + {error, bad_name} -> + case filelib:is_dir(filename:join(["..", priv])) of + true -> + filename:join(["..", priv, ?LIBNAME]); + _ -> + filename:join([priv, ?LIBNAME]) + end; + Dir -> + filename:join(Dir, ?LIBNAME) + end, + erlang:load_nif(SoName, 0). + % @doc Create a brand new vclock. --spec fresh() -> vclock(). +-spec fresh() -> sorted_vclock(). fresh() -> []. -% @doc Return true if Va is a direct descendant of Vb, else false -- remember, a vclock is its own descendant! --spec descends(Va :: vclock()|[], Vb :: vclock()|[]) -> boolean(). -descends(_, []) -> - % all vclocks descend from the empty vclock - true; -descends(Va, Vb) -> - [{NodeB, CtrB} |RestB] = Vb, - case lists:keyfind(NodeB, 1, Va) of +is_sorted(_List) -> + erlang:nif_error({error, not_loaded}). + +-spec(ensure_sorted(vclock()) -> sorted_vclock()). +%ensure_sorted([]) -> +% case is_sorted([]) of +% _ -> [] +% end; +%ensure_sorted(X) -> X. +ensure_sorted(VClock0) -> + case is_sorted(VClock0) of + true -> VClock0; false -> - false; - {_, CtrA} -> - (CtrA >= CtrB) andalso descends(Va,RestB) + lists:usort(VClock0) end. +% @doc Return true if Va is a direct descendant of Vb, else false -- remember, a vclock is its own descendant! +-spec descends(Va :: vclock()|[], Vb :: vclock()|[]) -> boolean(). +descends(Va0, Vb0) -> + Va1 = ensure_sorted(Va0), + Vb1 = ensure_sorted(Vb0), + descends2(Va1, Vb1). + + +-spec descends2(Va :: sorted_vclock()|[], Vb :: sorted_vclock()|[]) -> boolean(). +descends2(_Va, _Vb) -> + erlang:nif_error({error, not_loaded}). 
+ +% @doc Return true if Va strictly dominates Vb, else false! -spec dominates(vclock(), vclock()) -> boolean(). -dominates(A, B) -> - descends(A, B) andalso not descends(B, A). +dominates(Va0, Vb0) -> + Va1 = ensure_sorted(Va0), + Vb1 = ensure_sorted(Vb0), + dominates2(Va1, Vb1). + +dominates2(_Va, _Vb) -> + erlang:nif_error({error, not_loaded}). %% @doc subtract the VClock from the DotList. %% what this means is that any {actor(), count()} pair in @@ -80,51 +125,31 @@ dominates(A, B) -> %% [{a, 4}, {b, 1}, {c, 1}, {d, 14}, {e, 5}, {f, 2}] = %% [{{b, 2}, {g, 22}] -spec subtract_dots(vclock(), vclock()) -> vclock(). -subtract_dots(DotList, VClock) -> - drop_dots(DotList, VClock, []). - -drop_dots([], _Clock, NewDots) -> - lists:sort(NewDots); -drop_dots([{Actor, Count}=Dot | Rest], Clock, Acc) -> - case get_counter(Actor, Clock) of - Cnt when Cnt >= Count -> - %% Dot is dominated by clock, drop it - drop_dots(Rest, Clock, Acc); - _ -> - drop_dots(Rest, Clock, [Dot | Acc]) - end. +subtract_dots(DotList0, VClock0) -> + DotList1 = ensure_sorted(DotList0), + VClock1 = ensure_sorted(VClock0), + drop_dots(DotList1, VClock1). + +drop_dots(_DotList, _VClock) -> + erlang:nif_error({error, not_loaded}). % @doc Combine all VClocks in the input list into their least possible % common descendant. -spec merge(VClocks :: [vclock()]) -> vclock() | []. -merge([]) -> []; -merge([SingleVclock]) -> SingleVclock; -merge([First|Rest]) -> merge(Rest, lists:keysort(1, First)). - -merge([], NClock) -> NClock; -merge([AClock|VClocks],NClock) -> - merge(VClocks, merge(lists:keysort(1, AClock), NClock, [])). 
- -merge([], [], AccClock) -> lists:reverse(AccClock); -merge([], Left, AccClock) -> lists:reverse(AccClock, Left); -merge(Left, [], AccClock) -> lists:reverse(AccClock, Left); -merge(V=[{Node1, Ctr1}=NCT1|VClock], - N=[{Node2,Ctr2}=NCT2|NClock], AccClock) -> - if Node1 < Node2 -> - merge(VClock, N, [NCT1|AccClock]); - Node1 > Node2 -> - merge(V, NClock, [NCT2|AccClock]); - true -> - CT = if Ctr1 > Ctr2 -> Ctr1; - Ctr1 < Ctr2 -> Ctr2; - true -> Ctr1 - end, - merge(VClock, NClock, [{Node1,CT}|AccClock]) - end. +merge(VClocks0) -> + [VClocks1|RestVClocks1] = lists:map(fun ensure_sorted/1, VClocks0), + lists:foldl(fun merge/2, VClocks1, RestVClocks1). + +merge(V1, V2) -> merge2(V1, V2). + +merge2(_V1, _V2) -> erlang:nif_error({error, not_loaded}). % @doc Get the counter value in VClock set from Node. -spec get_counter(Node :: vclock_node(), VClock :: vclock()) -> counter(). get_counter(Node, VClock) -> + %% No reason to try sorting it + %% Best case scenario sort is O(N) + %% This function's worst case scenario is O(N) case lists:keyfind(Node, 1, VClock) of {_, Ctr} -> Ctr; false -> 0 @@ -133,30 +158,26 @@ get_counter(Node, VClock) -> % @doc Increment VClock at Node. -spec increment(Node :: vclock_node(), VClock :: vclock()) -> vclock(). -increment(Node, VClock) -> - {Ctr,NewV} = case lists:keytake(Node, 1, VClock) of - false -> - {1, VClock}; - {value, {_N, C}, ModV} -> - {C + 1, ModV} - end, - [{Node,Ctr}|NewV]. +increment(Node, VClock0) -> + VClock1 = ensure_sorted(VClock0), + orddict:update_counter(Node, 1, VClock1). +%%increment2(_Node, _VClock1) -> erlang:nif_error({error, not_loaded}). % @doc Return the list of all nodes that have ever incremented VClock. -spec all_nodes(VClock :: vclock()) -> [vclock_node()]. -all_nodes(VClock) -> - [X || {X, _} <- sort(VClock)]. +all_nodes(VClock0) -> + lists:usort([X || {X, _} <- VClock0]). % @doc Compares two VClocks for equality. -spec equal(VClockA :: vclock(), VClockB :: vclock()) -> boolean(). 
equal(VA,VB) -> - lists:sort(VA) =:= lists:sort(VB). + ensure_sorted(VA) =:= ensure_sorted(VB). %% @doc sorts the vclock by actor -spec sort(vclock()) -> vclock(). sort(Clock) -> - lists:sort(Clock). + ensure_sorted(Clock). %% @doc an effecient format for disk / wire. %5 @see `from_binary/1` @@ -171,21 +192,28 @@ from_binary(Bin) -> %% @doc take two vclocks and return a vclock that summerizes only the %% events both have seen. --spec glb(vclock(), vclock()) -> vclock(). -glb(Clock1, Clock2) -> - Clock = lists:foldl(fun({Actor, Cnt}, GLB) -> - case lists:keyfind(Actor, 1, Clock2) of - false -> - GLB; - {Actor, Cnt2} when Cnt2 >= Cnt -> - [{Actor, Cnt} | GLB]; - {Actor, Cnt2} -> - [{Actor, Cnt2} | GLB] - end - end, - fresh(), - Clock1), - lists:sort(Clock). +-spec glb(vclock(), vclock()) -> sorted_vclock(). +glb(ClockA0, ClockB0) -> + ClockA1 = ensure_sorted(ClockA0), + ClockB1 = ensure_sorted(ClockB0), + glb(ClockA1, ClockB1, []). + +glb([], [], Acc) -> + lists:reverse(Acc); +glb(_, [], Acc) -> + glb([], [], Acc); +glb([], _, Acc) -> + glb([], [], Acc); +glb([Dot|RestClockA], [Dot|RestClockB], Acc) -> + glb(RestClockA, RestClockB, Acc); +glb([{Actor, CounterA}|RestClockA], [DotB = {Actor, CounterB}|RestClockB], Acc) when CounterA > CounterB -> + glb(RestClockA, RestClockB, [DotB|Acc]); +glb([DotA = {Actor, CounterA}|RestClockA], [{Actor, CounterB}|RestClockB], Acc) when CounterB > CounterA -> + glb(RestClockA, RestClockB, [DotA|Acc]); +glb([{ActorA, _}|RestClockA], ClockB = [{ActorB, _}|_RestClockB], Acc) when ActorA < ActorB -> + glb(RestClockA, ClockB, Acc); +glb(ClockA = [{ActorA, _}|_RestClockA], [{ActorB, _}|RestClockB], Acc) when ActorA > ActorB -> + glb(ClockA, RestClockB, Acc). 
%% =================================================================== %% EUnit tests @@ -224,7 +252,7 @@ merge_test() -> {<<"4">>, 4}], VC2 = [{<<"3">>, 3}, {<<"4">>, 3}], - ?assertEqual([], merge(riak_dt_vclock:fresh())), + ?assertEqual([], merge([riak_dt_vclock:fresh()])), ?assertEqual([{<<"1">>,1},{<<"2">>,2},{<<"3">>,3},{<<"4">>,4}], merge([VC1, VC2])). @@ -246,4 +274,87 @@ merge_same_id_test() -> ?assertEqual([{<<"1">>, 1},{<<"2">>,1},{<<"3">>,1}], riak_dt_vclock:merge([VC1, VC2])). +% if Va strictly dominates Vb, else false! + +dominates_test() -> + ?assertNot(dominates([], [])), + ?assert(dominates([{'minuteman@10.0.3.237',1}], [])), + ?assertNot(dominates([], [{a, 1}])), + ?assertNot(dominates([{a, 1}], [{b,1}])), + ?assertNot(dominates([{b, 1}], [{a,1}])), + ?assert(dominates([{a, 1}, {b,1}, {c, 1}, {d,1}], [{c, 1}])), + ?assertNot(dominates([{c, 1}], [{a, 1}, {b,1}, {c, 1}, {d,1}])), + ?assertNot(dominates([{a, 1}, {c, 1}], [{b, 1}])), + ?assertNot(dominates([{b, 1}], [{a, 1}, {c, 1}])). + +subtract_dots_test() -> + ?assertEqual([{a, 1}, {b, 2}], subtract_dots([{a, 1}, {b, 2}], [])). + +is_sorted_test() -> + ?assertNot(is_sorted([{1},{2},{3},{2}])), + ?assert(is_sorted([{1},{2},{3}])), + ?assertNot(is_sorted([{1},{1}])), + ?assertNot(is_sorted([{1},{2},{1}])), + ?assertNot(is_sorted([{1},{1}])), + ?assertNot(is_sorted([{2},{2}])), + ?assert(is_sorted([{1},{2}])), + ?assert(is_sorted([{1}])), + ?assert(is_sorted([])). + +-ifdef(BENCH). +bench_test_() -> + {timeout, 300, [fun() -> bench() end]}. 
+bench() -> + A = random_clock1(1, 2000), + A1 = lists:usort(A), + B = random_clock1(1, 2000), + B1 = lists:usort(B), + C = random_clock1(500, 3000), + C1 = lists:usort(C), + D = random_clock1(1, 3000), + D1 = lists:usort(D), + E = random_clock1(1, 10), + E1 = lists:usort(E), + +% ?debugFmt("Increment Time (1): ~b~n", [get_time(fun increment/2, ['actor-500', A])]), + ?debugFmt("Increment Time (2): ~b~n", [get_time(fun increment/2, ['actor-500', A1])]), + %?debugFmt("Merge Time (1): ~b~n", [get_time(fun merge/1, [[B, A]])]), + ?debugFmt("Merge Time (2): ~b~n", [get_time(fun merge/1, [[B1, A1]])]), + %?debugFmt("Merge Time (3): ~b~n", [get_time(fun merge/1, [[A, C]])]), + ?debugFmt("Merge Time (4): ~b~n", [get_time(fun merge/1, [[A1, C1]])]), + %?debugFmt("Merge Time (5): ~b~n", [get_time(fun merge/1, [[A, A]])]), + ?debugFmt("Merge Time (6): ~b~n", [get_time(fun merge/1, [[A1, A1]])]), + %?debugFmt("Merge Time (7): ~b~n", [get_time(fun merge/1, [[E, E]])]), + ?debugFmt("Merge Time (8): ~b~n", [get_time(fun merge/1, [[E1, E1]])]), + + %?debugFmt("Descends Time (1): ~b~n", [get_time(fun descends/2, [A, C])]), + ?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A1, C1])]), + %?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A, D])]), + ?debugFmt("Descends Time (3): ~b~n", [get_time(fun descends/2, [A1, D1])]), + %?debugFmt("Descends Time (4): ~b~n", [get_time(fun descends/2, [D, A])]), + ?debugFmt("Descends Time (5): ~b~n", [get_time(fun descends/2, [D1, A1])]), + %?debugFmt("Descends Time (6): ~b~n", [get_time(fun descends/2, [D, D])]), + ?debugFmt("Descends Time (7): ~b~n", [get_time(fun descends/2, [D1, D1])]). + +random_clock1(N1, N2) -> + Seq0 = lists:seq(N1, N2), + Seq1 = [{rand:uniform(1000000), X} || X <- Seq0], + Seq2 = lists:sort(Seq1), + Seq3 = [X || {_, X} <- Seq2], + lists:map( + fun(I) -> + Actor = list_to_atom(lists:flatten(io_lib:format("actor-~b", [I]))), + {Actor, rand:uniform(100000)} + end, + Seq3 + ). 
+get_time(Fun, Args) -> + T1 = os:timestamp(), + lists:foreach(fun(_) -> apply(Fun, Args) end, lists:seq(1, 10000)), + T2 = os:timestamp(), + DiffMicros = timer:now_diff(T2, T1), + round(DiffMicros / 10000.0). + + +-endif. -endif.