Sunday, July 26, 2009

Ets Ordered Set Select Efficiency

Ets (and tcerl) ordered_sets have the useful property that range queries on them can be resolved more efficiently than a full-table scan. For instance, given the operation

ets:select (Tab, [ { { { foo, 1, '_' }, bar }, [], [ '$_' ] } ])

only the range of keys corresponding to 3-element tuples whose first two elements are 'foo' and 1 will be examined, and this is potentially far less than the entire set of keys.
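As a small illustration, here is a minimal sketch that builds an ordered_set with the same key shape used later in this post and runs the prefix-bound select against it. The module name prefix_demo and the function run/0 are made up for the example, and the table is deliberately tiny.

```erlang
-module (prefix_demo).
-export ([ run/0 ]).

%% Hypothetical demo: keys are { foo, M div 10, M }, values are bar.
run () ->
  Tab = ets:new (prefix_demo, [ ordered_set ]),
  try
    [ ets:insert (Tab, { { foo, M div 10, M }, bar })
      || M <- lists:seq (1, 30) ],
    %% The key prefix { foo, 1 } is bound; only the third element is
    %% a wildcard, so the matching keys are { foo, 1, 10 } through
    %% { foo, 1, 19 }.
    ets:select (Tab, [ { { { foo, 1, '_' }, bar }, [], [ '$_' ] } ])
  after
    ets:delete (Tab)
  end.
```

Calling prefix_demo:run () returns the ten matching objects in key order.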

Recently I've been working on a project that requires querying a range of partially bound keys. This would correspond to an operation such as

ets:select (Tab, [ { { { foo, '$1', '_' }, bar }, [ { '>=', '$1', 1 }, { '=<', '$1', 5 } ], [ '$_' ] } ])

and so I modified tcerl to detect match conditions of this form and constrain the query range. I started to wonder if I was reinventing the wheel and whether ets had sophisticated match specification analysis under the hood somewhere. I asked the mailing list, but the answers were inconclusive, so I decided to do some measurements using the following module:

-module (etsselect).
-compile (export_all).

%% Fill Tab with N objects keyed by { foo, M div 10, M }.
populate_ets (Tab, N) ->
  ets:delete_all_objects (Tab),
  lists:foreach (fun (M) ->
                   ets:insert (Tab, { { foo, M div 10, M }, bar })
                 end,
                 lists:seq (1, N)).

%% Repeat small tables more often to get a stable average.
repeat_factor (Size) when Size < 1000 -> 100;
repeat_factor (Size) when Size < 100000 -> 30;
repeat_factor (_) -> 10.

%% Run the same select Repeat times.
select (MatchSpec, Tab, Repeat) when Repeat > 0 ->
  ets:select (Tab, MatchSpec),
  select (MatchSpec, Tab, Repeat - 1);
select (_, _, _) ->
  ok.

%% Return [ { N, AverageMicrosecondsPerSelect } ] for N = 5^1 .. 5^9.
select_time (MatchSpec) ->
  Tab = ets:new (?MODULE, [ ordered_set ]),
  try
    [ { N,
        (fun () ->
           populate_ets (Tab, N),
           { Time, _ } = timer:tc (?MODULE, select, [ MatchSpec, Tab, Repeat ]),
           Time / Repeat
         end) ()
      }
      || X <- lists:seq (1, 9),
         N <- [ trunc (math:pow (5, X)) ],
         Repeat <- [ repeat_factor (N) ]
    ]
  after
    ets:delete (Tab)
  end.

I did some timing tests with different match specifications and table sizes. The match specifications were:

[{ '_', [], [ '$_' ] }] % unconstrained
[{{{ foo, 1, '_' }, bar }, [], [ '$_' ] }] % prefix bound
[{{{ foo, '$1', '_' }, bar },
[{ '>=', '$1', 1 }, { '=<', '$1', 1 }],
[ '$_' ]
}] % prefix range bound

The results (using R12B5) indicate that the prefix range bound condition is not being optimized by ets.
The tcerl timings indicate that the prefix range bound condition is being optimized (they also indicate much worse constant factors than ets).
Note that I haven't released these changes yet, but I will soon, as version 1.3.1h. Versions of tcerl prior to 1.3.1h will do a full table scan in the prefix range bound case.
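If the bound element only takes integer values, one possible workaround on the ets side is to issue one prefix-bound select per value and concatenate the results, so that each individual select can use the optimized path measured above. This is only a sketch under that assumption; the module name prefix_union and the function select_range/3 are hypothetical, and it has not been benchmarked here.

```erlang
-module (prefix_union).
-export ([ select_range/3 ]).

%% Hypothetical helper: return every object whose key is
%% { foo, P, _ } with Lo =< P =< Hi, assuming P is always an integer.
%% Each per-value select has a fully bound key prefix.
select_range (Tab, Lo, Hi)
    when is_integer (Lo), is_integer (Hi), Lo =< Hi ->
  lists:append (
    [ ets:select (Tab, [ { { { foo, P, '_' }, bar }, [], [ '$_' ] } ])
      || P <- lists:seq (Lo, Hi) ]).
```

On an ordered_set the concatenation comes back in key order, so it should be interchangeable with the result of the guarded match specification.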

Wednesday, July 1, 2009

osmos

I just released the first version of osmos, a pure Erlang library that provides on-disk ordered set tables which allow thousands of updates per second with ACID properties.

It achieves that update rate using a rather different structure from a traditional B-tree based table (like the ones used by most RDBMSs or provided by DBM libraries like BDB or tokyocabinet): an incremental merge sort with user-defined merge semantics.

Motivation

Ordinarily, the rate of updates to an on-disk table is limited by the need to seek around and update an index in place. With a typical seek time on the order of 10 ms, this makes it challenging to scale past about 100 updates/s. Most strategies for going beyond that involve some kind of partitioning, either over multiple disks, multiple machines, or both.

However, a few key observations[1] point to a different strategy:
  1. The reason for updating an index on every write is the expectation that reads are much more frequent than writes, so that read efficiency is the dominating factor. But if writes are far more frequent than reads, you can use some kind of lazy updating to delay the work until absolutely necessary, and combine the work from multiple updates.
  2. An extreme example of a write-dominated, lazily updated database is a full-text inverted index for a search engine. To build one, you might typically read in billions of term-document pairs, sort them by term using some form of external merge sort, and then create a highly optimized index before ever handling a single query.
  3. A merge sort can operate continuously, by injecting new records in sorted batches, and then merging the batches as necessary to maintain a set of sorted files with exponentially increasing sizes. And crucially, this kind of incremental merge sort process can allow relatively efficient searches of the data while it's operating, by binary-searching each sorted file. (An example of this is the incremental indexing provided by Lucene.)
This gives you an ordered set table with a slight increase in the cost of a search (with N records, maybe an extra factor of log N). But the cost of a write is tiny, and mostly delayed: about log N future comparisons during merging, and log N future disk writes, but since all the disk writes are sequential, they will be buffered, and writing to the table requires no explicit seeking at all.[2]
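To make the idea concrete, here is a toy in-memory sketch of incremental merging. It is not how osmos is implemented: the batches are plain sorted lists rather than files, lookups scan each batch linearly instead of binary-searching it, and the names (merge_levels, add_batch/2, and so on) are invented for the example. What it does show is the merge policy that keeps batch sizes growing roughly exponentially.

```erlang
-module(merge_levels).
-export([new/0, add_batch/2, member/2, sizes/1]).

%% State: a list of sorted, duplicate-free batches, newest first.
new() -> [].

%% Sort the incoming keys into a batch, then merge for as long as the
%% newest batch is at least as large as the one behind it.
add_batch(Keys, Levels) ->
    compact([lists:usort(Keys) | Levels]).

compact([New, Old | Rest]) when length(New) >= length(Old) ->
    compact([lists:umerge(Old, New) | Rest]);
compact(Levels) ->
    Levels.

%% A key is present if any batch contains it.
member(Key, Levels) ->
    lists:any(fun(Batch) -> lists:member(Key, Batch) end, Levels).

%% Batch sizes, newest first (useful for watching the merges happen).
sizes(Levels) ->
    [length(Batch) || Batch <- Levels].
```

Adding five single-key batches one after another leaves two batches of sizes 1 and 4, much like the bits of a binary counter, which is why the number of batches stays around log N.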

User-defined merging

Things get even more interesting when you let the user control how records are merged. In the osmos model, there is at most one record for any given key; if two records with the same key are encountered during the merge sort, the user's merge function is called to merge the two records into a single record.

The merge function can be any function

Merge(Key, EarlierValue, LaterValue) -> MergedValue

that is associative, i.e.,

Merge(K, Merge(K, V1, V2), V3) =:=
    Merge(K, V1, Merge(K, V2, V3))

for any key K and any consecutive sequence of values V1, V2, V3.
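The associativity requirement is easy to spot-check on sample values. The sketch below is illustrative only: the module assoc_check and its function names are made up, and passing the check for one triple of values does not prove the law in general.

```erlang
-module(assoc_check).
-export([keep_later/3, add/3, subtract/3, holds/3]).

%% Two merge functions that satisfy the law...
keep_later(_K, _Earlier, Later) -> Later.
add(_K, V1, V2) -> V1 + V2.

%% ...and one that does not, for contrast.
subtract(_K, V1, V2) -> V1 - V2.

%% Check Merge(K, Merge(K, V1, V2), V3) =:= Merge(K, V1, Merge(K, V2, V3))
%% for a single key and a single triple of values.
holds(Merge, K, {V1, V2, V3}) ->
    Merge(K, Merge(K, V1, V2), V3) =:= Merge(K, V1, Merge(K, V2, V3)).
```

For example, assoc_check:holds(fun assoc_check:subtract/3, k, {1, 2, 3}) is false, because (1 - 2) - 3 and 1 - (2 - 3) differ, so subtraction would give results that depend on the order in which records happen to be merged.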

This allows a wide variety of semantics for writing to the table. For example:
  • If the merge function always returns the later value, then a write replaces any previous value, like an ordinary key-value store.
  • If the values are integers, and the merge function returns the sum of the two values, then writing to the table acts like transactionally incrementing a counter.
Similarly, you could use any associative function of two numbers; you could apply such a function to each element of a vector of numbers; or you could apply a different function to each element. For example, to keep a minimum, maximum, and average over some set of keys, you could use something like:

merge(_K, N1, N2)
  when is_number(N1), is_number(N2) ->
    {min(N1, N2), max(N1, N2), N1 + N2, 2};
merge(_K, N1, {Min2, Max2, Sum2, Count2})
  when is_number(N1) ->
    {min(N1, Min2), max(N1, Max2),
     N1 + Sum2, 1 + Count2};
merge(_K, {Min1, Max1, Sum1, Count1}, N2)
  when is_number(N2) ->
    {min(Min1, N2), max(Max1, N2),
     Sum1 + N2, Count1 + 1};
merge(_K, {Min1, Max1, Sum1, Count1},
      {Min2, Max2, Sum2, Count2}) ->
    {min(Min1, Min2), max(Max1, Max2),
     Sum1 + Sum2, Count1 + Count2}.
This lets you write single numbers as values, but read back either {Min, Max, Sum, Count} tuples (if more than one number has been written for a key) or single numbers (if that was the only value written). To do this with an ordinary key-value table and multiple writers would require expensive transactions, but with osmos, operations like this are no more expensive than replacement, but still ACID.
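On the reading side, the average has to be derived from whichever of the two shapes comes back. The following sketch simulates a sequence of writes to one key with a fold and then computes the average. The module stats_read and its functions are hypothetical; the merge is intended to be equivalent to the four-clause version above, written by first normalizing each value to a tuple.

```erlang
-module(stats_read).
-export([fold_writes/1, average/1]).

%% Normalize a value to {Min, Max, Sum, Count} form.
to_stats(N) when is_number(N) -> {N, N, N, 1};
to_stats({_, _, _, _} = Stats) -> Stats.

%% Merge two values of either shape into a stats tuple.
merge(_K, V1, V2) ->
    {Min1, Max1, Sum1, Count1} = to_stats(V1),
    {Min2, Max2, Sum2, Count2} = to_stats(V2),
    {min(Min1, Min2), max(Max1, Max2), Sum1 + Sum2, Count1 + Count2}.

%% Simulate the value stored after writing each number in turn.
fold_writes([First | Rest]) ->
    lists:foldl(fun(V, Acc) -> merge(key, Acc, V) end, First, Rest).

%% A lone number is its own average; otherwise divide sum by count.
average(N) when is_number(N) -> N;
average({_Min, _Max, Sum, Count}) -> Sum / Count.
```

Writing 3, 9, and 6 to the same key yields {3, 9, 18, 3}, whose average is 6.0, while a key written only once reads back as the bare number.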

As you can see, keeping statistics for reporting (when the reports are queried infrequently relative to data collection) is one of the killer applications for a merge sort table.

Among the possibilities for even wackier merge operations are:
  • Always return the earlier value. (What was the first value that occurred for this key?)
  • Take the union of two sets. (What are all the values that have occurred for this key?)
  • Take the intersection of two sets. (What values have always occurred for this key?)
  • Multiply two NxN matrices, e.g., to keep track of a series of rotations applied to a vector in R^N.
  • Compose a series of arbitrary operations applied to a space of replaceable objects, e.g.:

    merge(_K, _, V) when ?is_value(V) ->
        V;
    merge(_K, V, Op) when ?is_value(V),
                          ?is_operation(Op) ->
        do_operation(Op, V);
    merge(_K, V, Ops) when ?is_value(V),
                           is_list(Ops) ->
        %% Apply the queued operations in order, oldest first.
        lists:foldl (fun (Op, Acc) ->
                       do_operation(Op, Acc)
                     end,
                     V,
                     Ops);
    merge(_K, Op1, Op2) when ?is_operation(Op1),
                             ?is_operation(Op2) ->
        [Op1, Op2];
    merge(_K, Op, Ops) when ?is_operation(Op),
                            is_list(Ops) ->
        [Op | Ops];
    merge(_K, Ops, Op) when is_list(Ops),
                            ?is_operation(Op) ->
        Ops ++ [Op];
    merge(_K, Ops1, Ops2) when is_list(Ops1),
                               is_list(Ops2) ->
        Ops1 ++ Ops2.
    The values could be employee records, and the operations could be things like, “change street address to X,” “increase salary by Y%.” Using this pattern, you can get extremely cheap transactional safety for any single-key operation, as long as your merge function implements it.
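The simpler entries in the list above are one-liners. As a sketch, assuming the values are stored as ordsets (sorted lists without duplicates), they might look like this; the module name set_merge is made up for the example.

```erlang
-module(set_merge).
-export([first/3, union/3, intersection/3]).

%% What was the first value that occurred for this key?
first(_K, Earlier, _Later) -> Earlier.

%% What are all the values that have occurred for this key?
union(_K, S1, S2) -> ordsets:union(S1, S2).

%% What values have always occurred for this key?
intersection(_K, S1, S2) -> ordsets:intersection(S1, S2).
```

With the union merge, a writer would record a single observation X by writing the singleton set ordsets:from_list([X]).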

API

The basic API is quite simple:

{ok, Table} = osmos:open(Table, [{directory, D}, {format, F}])
to open a table named Table with the format F in the directory D;

ok = osmos:write(Table, Key, Value)
to write a record to the table;

case osmos:read(Table, Key) of
    {ok, Value} -> ...;
    not_found -> ...
end
to read the record for a key; and

ok = osmos:close(Table)
to close the table.
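To show how write and read interact with the merge function, here is a toy in-memory model of the semantics. It is emphatically not osmos: the module toy_table is hypothetical, nothing touches the disk, the merge happens eagerly at write time rather than lazily, and write/3 returns a new table value instead of ok. Only the shape of the read results ({ok, Value} or not_found) is taken from the API above.

```erlang
-module(toy_table).
-export([open/1, write/3, read/2]).

%% A toy table is just {MergeFun, Map}.
open(Merge) when is_function(Merge, 3) ->
    {Merge, #{}}.

%% Writing to an existing key merges the earlier and later values;
%% writing to a new key simply stores the value.
write({Merge, Map}, Key, Value) ->
    New = case maps:find(Key, Map) of
              {ok, Earlier} -> Merge(Key, Earlier, Value);
              error -> Value
          end,
    {Merge, Map#{Key => New}}.

%% Reads see the merged value, or not_found for an unknown key.
read({_Merge, Map}, Key) ->
    case maps:find(Key, Map) of
        {ok, Value} -> {ok, Value};
        error -> not_found
    end.
```

With a summing merge function, writing 1 and then 2 under the same key makes a subsequent read return {ok, 3}.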

You can also iterate over a range of keys in chunks using osmos:select_range/5 and osmos:select_continue/3. The results from a select provide a consistent snapshot of the table, meaning that the results always reflect the contents of the table at the time of the original call to select_range. (In other words, any writes that happen between subsequent calls to select_continue won't affect the results.)

A table format is a record with the following fields:
  • block_size::integer(): block size of the table in bytes, controlling the size of disk reads (which are always whole blocks), and the fanout of the on-disk search trees.
  • key_format::#osmos_format{}: on-disk format for keys. (A pair of functions to convert some set of terms to binaries and back.)
  • key_less::(KeyA, KeyB) -> bool(): comparison function defining the order of keys in the table. Takes two native-format keys, and returns true if the first argument is less than the second argument.
  • value_format::#osmos_format{}: on-disk format for values.
  • merge::(Key, EarlierValue, LaterValue) -> MergedValue: the merge function described above.
  • short_circuit::(Key, Value) -> bool(): function which allows searches of the table to be terminated early (short-circuited) if it can be determined from a record that any earlier records with the same key are irrelevant.
  • delete::(Key, Value) -> bool(): function controlling when records are deleted from the table.
There are several pre-canned formats available from the function osmos_table_format:new/3, or you can build your own #osmos_table_format{} record as needed.
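As a rough illustration of how the merge, short_circuit, and delete callbacks might fit together, here are two hypothetical bundles. They are returned as plain maps rather than as a real #osmos_table_format{} record, the module name format_funs is invented, and the choices below (for instance, dropping counters that reach zero) are assumptions based only on the field descriptions above.

```erlang
-module(format_funs).
-export([replace_funs/0, counter_funs/0]).

%% Replacement semantics: the latest record decides the value, so any
%% earlier records are irrelevant and a search can always stop early.
replace_funs() ->
    #{merge => fun(_K, _Earlier, Later) -> Later end,
      short_circuit => fun(_K, _V) -> true end,
      delete => fun(_K, _V) -> false end}.

%% Counter semantics: every earlier record contributes to the sum, so
%% a search can never stop early. Assumption: a zero count is treated
%% as not worth keeping.
counter_funs() ->
    #{merge => fun(_K, V1, V2) -> V1 + V2 end,
      short_circuit => fun(_K, _V) -> false end,
      delete => fun(_K, V) -> V =:= 0 end}.
```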

Performance

The file tests/osmos_benchmark_1.erl in the source distribution contains a benchmark that uses variable-length binaries as keys (some more frequent than others, with an average length of 15 bytes), and nonnegative integers as values, encoded in 64 bits, where merging takes the sum of the two values. One process writes random keys and values as fast as it can, while another process reads random keys with a 10 ms sleep between reads.

I ran the benchmark for 15 minutes on a 2.2 GHz dual-core MacBook, and got the following numbers:
  • 5028735 total records written, for an average of 5587 writes/second (including the time to compute random keys, etc.)
  • an average of 109.8 microseconds per write call, which would mean a theoretical maximum write rate of 9111 writes/second (for this table format, machine, etc.)
  • the median time per write call was 30 microseconds, and the 90th percentile was 54 microseconds, indicating that the vast majority of writes are extremely quick
  • an average of 1900 microseconds (1.9 ms) per read call
  • the median time per read call was 979 microseconds, and the 90th percentile was 2567 microseconds

I reran the benchmark for 2 hours on the same machine, and got the following numbers:
  • 17247676 total records written, for an average of 2396 writes/second
  • an average of 329.7 microseconds per write call, for a theoretical maximum write rate of 3033 writes/second
  • the median time per write call was 39 microseconds, and the 90th percentile was 88 microseconds
  • an average of 13076 microseconds (13 ms) per read call
  • the median time per read call was 6081 microseconds, and the 90th percentile was 34094 microseconds
The table had about 400 MB of data on the disk (in 7 files) at the end of the 2-hour run. This shows that read performance does start to suffer a bit as the amount of data on the disk grows, but writes remain very fast. (In fact, if there were no reads competing with writes, I wouldn't expect the write times to degrade even that much, since all that's happening synchronously during a write call is a buffered write to a journal file, and an insert into an in-memory tree.)
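The derived rates quoted above follow from simple arithmetic, sketched here with made-up helper names. The average rate is the record count divided by the run length in seconds, and the theoretical maximum is one million microseconds divided by the mean time per write call.

```erlang
-module(bench_math).
-export([avg_rate/2, max_rate/1]).

%% Average writes per second over a run of the given length.
avg_rate(Records, Seconds) ->
    Records / Seconds.

%% Writes per second if write calls ran back to back.
max_rate(MicrosPerWrite) ->
    1.0e6 / MicrosPerWrite.
```

For the 15-minute run, bench_math:avg_rate(5028735, 15 * 60) rounds to 5587, and for the 2-hour run bench_math:avg_rate(17247676, 2 * 3600) rounds to 2396 and bench_math:max_rate(329.7) to 3033. Using the rounded 109.8 microsecond figure, max_rate gives about 9107 rather than the 9111 quoted, presumably because the quoted rate was computed from the unrounded mean.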

[1] I have to thank my good friend Dave Benson for introducing me to these ideas, and the generalized merge sort table. His excellent library GSK provides a very similar table API (GskTable) for people who want to write single-threaded servers in C. (I ripped off several of his design ideas, including prefix compression and the variable-length integer encoding.)
[2] Of course there may be implicit seeking due to the need to access multiple files at the same time. But for sequential access, the OS and disk hardware can mitigate this to a large degree as long as the number of files isn't too large.