This module implements a persistent, on-disk hash tree that is used predominately for active anti-entropy exchange in Riak. The tree consists of two parts, a set of unbounded on-disk segments and a fixed size hash tree (that may be on-disk or in-memory) constructed over these segments.
A graphical description of this design can be found in: docs/hashtree.md
Each segment logically represents an on-disk list of (key, hash) pairs. Whereas the hash tree is represented as a set of levels and buckets, with a fixed width (or fan-out) between levels that determines how many buckets of a child level are grouped together and hashed to represent a bucket at the parent level. Each leaf in the tree corresponds to a hash of one of the on-disk segments. For example, a tree with a width of 4 and 16 segments would look like the following:
level buckets 1: [0] 2: [0 1 2 3] 3: [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
With each bucket entry of the form {bucket-id, hash}, eg. {0,
binary()}. The hash for each of the entries at level 3 would come from
one of the 16 segments, while the hashes for entries at level 1 and 2 are
derived from the lower levels.
Specifically, the bucket entries in level 2 would come from level 3: 0: hash([ 0 1 2 3]) 1: hash([ 4 5 6 7]) 2: hash([ 8 9 10 11]) 3: hash([12 13 14 15])
And the bucket entries in level 1 would come from level 2: 1: hash([hash([ 0 1 2 3]) hash([ 4 5 6 7]) hash([ 8 9 10 11]) hash([12 13 14 15])])
When a (key, hash) pair is added to the tree, the key is hashed to determine which segment it belongs to and inserted/upserted into the segment. Rather than update the hash tree on every insert, a dirty bit is set to note that a given segment has changed. The hashes are then updated in bulk before performing a tree exchange
To update the hash tree, the code iterates over each dirty segment, building a list of (key, hash) pairs. A hash is computed over this list, and the leaf node in the hash tree corresponding to the given segment is updated. After iterating over all dirty segments, and thus updating all leaf nodes, the update then continues to update the tree bottom-up, updating only paths that have changed. As designed, the update requires a single sparse scan over the on-disk segments and a minimal traversal up the hash tree.
The heavy-lifting of this module is provided by LevelDB. What is logically viewed as sorted on-disk segments is in reality a range of on-disk (segment, key, hash) values written to LevelDB. Each insert of a (key, hash) pair therefore corresponds to a single LevelDB write (no read necessary). Likewise, the update operation is performed using LevelDB iterators.
When used for active anti-entropy in Riak, the hash tree is built once and then updated in real-time as writes occur. A key design goal is to ensure that adding (key, hash) pairs to the tree is non-blocking, even during a tree update or a tree exchange. This is accomplished using LevelDB snapshots. Inserts into the tree always write directly to the active LevelDB instance, however updates and exchanges operate over a snapshot of the tree.
In order to improve performance, writes are buffered in memory and sent to LevelDB using a single batch write. Writes are flushed whenever the buffer becomes full, as well as before updating the hashtree.
Tree exchange is provided by thecompare/4 function.
The behavior of this function is determined through a provided function
that implements logic to get buckets and segments for a given remote tree,
as well as a callback invoked as key differences are determined. This
generic interface allows for tree exchange to be implemented in a variety
of ways, including directly against to local hash tree instances, over
distributed Erlang, or over a custom protocol over a TCP socket. See
local_compare/2 and do_remote/1 for examples (-ifdef(TEST) only).
acc_fun(Acc) = fun(([keydiff()], Acc) -> Acc)
abstract datatype: hashtree()
index() = non_neg_integer()
index_n() = {index(), pos_integer()}
keydiff() = {missing | remote_missing | different, binary()}
next_rebuild() = full | incremental
orddict() = orddict:orddict()
proplist() = proplists:proplist()
remote_fun() = fun((get_bucket | key_hashes | start_exchange_level | start_exchange_segments | init | final, {integer(), integer()} | integer() | term()) -> any())
select_fun(T) = fun((orddict()) -> T)
tree_id_bin() = <<_:176>>
| close/1 | |
| compare/4 | |
| compare2/4 | |
| delete/2 | |
| destroy/1 | |
| estimate_keys/1 | Estimate number of keys stored in the AAE tree. |
| flush_buffer/1 | |
| get_bucket/3 | |
| insert/3 | |
| insert/4 | |
| key_hashes/2 | |
| levels/1 | |
| mark_clean_close/2 | Call on a clean-close to update the meta for a tree-id's closed count
to match the current opened count, which is checked on new/reopen. |
| mark_open_and_check/2 | Check if shutdown/closing of tree-id was clean/dirty by comparing
closed to opened metadata count for the hashtree, and,
increment opened count for hashtree-id. |
| mark_open_empty/2 | Mark/clear metadata for tree-id opened/closed. |
| mem_levels/1 | |
| multi_select_segment/3 | |
| new/0 | |
| new/2 | |
| new/3 | |
| next_rebuild/1 | |
| path/1 | |
| read_meta/2 | |
| rehash_tree/1 | |
| safe_decode/1 | |
| segments/1 | |
| set_next_rebuild/2 | |
| top_hash/1 | |
| update_perform/1 | |
| update_snapshot/1 | |
| update_tree/1 | |
| width/1 | |
| write_meta/3 |
close(State::hashtree()) -> hashtree()
compare(Tree, Remote, AccFun, Acc) -> any()
compare2(Tree, Remote, AccFun, Acc) -> any()
delete(Key::binary(), State::hashtree()) -> hashtree()
destroy(Path::string() | hashtree()) -> ok | hashtree()
estimate_keys(State::hashtree()) -> {ok, integer()}
Estimate number of keys stored in the AAE tree. This is determined by sampling segments to to calculate an estimated keys-per-segment value, which is then multiplied by the number of segments. Segments are sampled until either 1% of segments have been visited or 1000 keys have been observed.
Note: this function must be called on a tree with a valid iterator, such as the snapshotted tree returned from update_snapshot/1 or a recently updated tree returned from update_tree/1 (which internally creates a snapshot). Using update_tree/1 is the best choice since that ensures segments are updated giving a better estimate.flush_buffer(State) -> any()
get_bucket(Level::integer(), Bucket::integer(), State::hashtree()) -> orddict()
insert(Key::binary(), ObjHash::binary(), State::hashtree()) -> hashtree()
insert(Key::binary(), ObjHash::binary() | tombstone, State::hashtree(), Opts::proplist()) -> hashtree()
key_hashes(State::hashtree(), Segment::integer()) -> [{integer(), orddict()}]
levels(State::hashtree()) -> pos_integer()
mark_clean_close(TreeId::index_n() | binary(), State::hashtree()) -> hashtree()
Call on a clean-close to update the meta for a tree-id's closed count
to match the current opened count, which is checked on new/reopen.
mark_open_and_check(TreeId::index_n() | binary(), State::hashtree()) -> hashtree()
Check if shutdown/closing of tree-id was clean/dirty by comparing
closed to opened metadata count for the hashtree, and,
increment opened count for hashtree-id.
next_rebuild to be an incremental one.
Otherwise, if it was a dirty shutdown, set next_rebuild, instead,
to be a full one.
mark_open_empty(TreeId::index_n() | binary(), State::hashtree()) -> hashtree()
Mark/clear metadata for tree-id opened/closed. Set next_rebuild to be incremental.
mem_levels(State::hashtree()) -> integer()
multi_select_segment(State::hashtree(), Segments::['*' | integer()], F::select_fun(T)) -> [{integer(), T}]
new() -> hashtree()
new(TreeId::{index(), tree_id_bin() | non_neg_integer()}, Options::proplist()) -> hashtree()
new(TreeId::{index(), tree_id_bin() | non_neg_integer()}, Options::hashtree()) -> hashtree()
new(X1::{index(), tree_id_bin() | non_neg_integer()}, LinkedStore::hashtree(), Options::proplist()) -> hashtree()
next_rebuild(State::hashtree()) -> next_rebuild()
path(State::hashtree()) -> string()
read_meta(Key::binary(), State::hashtree()) -> {ok, binary()} | undefined
rehash_tree(State::hashtree()) -> hashtree()
safe_decode(Bin::binary()) -> {tree_id_bin() | bad, integer(), binary()}
segments(State::hashtree()) -> pos_integer()
set_next_rebuild(Tree::hashtree(), NextRebuild::next_rebuild()) -> hashtree()
top_hash(State::hashtree()) -> [] | [{0, binary()}]
update_perform(State::hashtree()) -> hashtree()
update_snapshot(State::hashtree()) -> {hashtree(), hashtree()}
update_tree(State::hashtree()) -> hashtree()
width(State::hashtree()) -> pos_integer()
write_meta(Key::binary(), Value::binary() | term(), State::hashtree()) -> hashtree()
Generated by EDoc