Wednesday, April 22, 2015

A Smarter Stream Builder

Background


We are going to provide the "user-land" API using Java 8 Streams. For example, you can print all Hares older than 8 like this:

Hare.stream().filter(h->h.getAge()>8).forEach(System.out::println);

The problem is that with standard Java 8 streams, you will iterate over all Hares. With a SQL storage engine, this corresponds to

select * from Hare;

If we have 1 000 000 hares, fetching every row only to discard most of them in Java is not a viable solution.
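The full scan is easy to observe in plain Java. The sketch below uses a hypothetical in-memory `Hare` class (not the real entity) and a `peek` counter to show that an ordinary `filter` inspects every element, even though only one in ten matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Hare entity; only the age matters here.
class Hare {
    private final int age;

    Hare(int age) {
        this.age = age;
    }

    int getAge() {
        return age;
    }
}

class FullScanDemo {
    // Creates n hares with ages cycling 0..9, so one in ten is older than 8.
    static List<Hare> hares(int n) {
        List<Hare> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add(new Hare(i % 10));
        }
        return list;
    }

    // Runs the filter from the example and returns how many hares it inspected.
    static int inspected(List<Hare> source) {
        AtomicInteger seen = new AtomicInteger();
        List<Hare> old = source.stream()
                .peek(h -> seen.incrementAndGet()) // counts every element reaching the filter
                .filter(h -> h.getAge() > 8)
                .collect(Collectors.toList());
        System.out.println(old.size() + " matches, " + seen.get() + " inspected");
        return seen.get();
    }

    public static void main(String[] args) {
        inspected(hares(1_000_000)); // prints "100000 matches, 1000000 inspected"
    }
}
```

A SQL-backed stream built the same way would likewise have to pull every row across before the filter could discard it.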

The Stream Builder

We have developed a smarter Stream Builder concept that implements Stream, IntStream, LongStream and DoubleStream. The Stream Builder acts like a builder and adds all intermediate operations (like map and filter) to a Pipeline instead of executing them. Whenever a terminal operation is encountered, a pluggable StreamTerminator can inspect the Pipeline, optionally modify it, and select a different upstream data source before the real Stream is actually started.
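A heavily simplified sketch of the idea follows. `Pipeline`, `Step`, `StreamTerminator` and `StreamBuilder` here are hypothetical stand-ins, not the classes in the demo package: the builder only records `filter` and `map`, and its single terminal operation, `count()`, hands the recorded Pipeline to a pluggable terminator that may rewrite it before a real `java.util.stream.Stream` is created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

// One recorded intermediate operation, replayed later on a real Stream.
interface Step extends Function<Stream<Object>, Stream<Object>> {}

// The recorded steps plus an upstream source that a terminator may swap out.
class Pipeline {
    Supplier<Stream<Object>> source;
    final List<Step> steps = new ArrayList<>();

    Pipeline(Supplier<Stream<Object>> source) {
        this.source = source;
    }

    // Builds the real java.util.stream.Stream only when asked.
    Stream<Object> toStream() {
        Stream<Object> s = source.get();
        for (Step step : steps) {
            s = step.apply(s);
        }
        return s;
    }
}

// Pluggable hook with one callback per terminal operation (only count() here).
// The default just replays the pipeline; an override may edit it first.
interface StreamTerminator {
    default long count(Pipeline pipeline) {
        return pipeline.toStream().count();
    }
}

// Records operations instead of executing them.
class StreamBuilder<T> {
    private final Pipeline pipeline;
    private final StreamTerminator terminator;

    StreamBuilder(Pipeline pipeline, StreamTerminator terminator) {
        this.pipeline = pipeline;
        this.terminator = terminator;
    }

    @SuppressWarnings("unchecked")
    StreamBuilder<T> filter(Predicate<? super T> predicate) {
        pipeline.steps.add(s -> s.filter(o -> predicate.test((T) o)));
        return this;
    }

    @SuppressWarnings("unchecked")
    <R> StreamBuilder<R> map(Function<? super T, ? extends R> mapper) {
        pipeline.steps.add(s -> s.map(o -> mapper.apply((T) o)));
        return new StreamBuilder<>(pipeline, terminator);
    }

    // Terminal operation: nothing has executed yet; the terminator decides how.
    long count() {
        return terminator.count(pipeline);
    }
}

class StreamBuilderDemo {
    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline(() -> Stream.<Object>of(3, 7, 9, 12));
        long n = new StreamBuilder<Integer>(pipeline, new StreamTerminator() {})
                .filter(i -> i > 8)
                .map(i -> "hare" + i)
                .count();
        System.out.println(n); // prints 2
    }
}
```

Because the source is a `Supplier` and the steps are plain data, a custom terminator is free to drop steps or replace `source` before calling `toStream()`.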

If we take the example above:

Hare.stream().filter(h->h.getAge()>8).forEach(System.out::println);

the StreamTerminator can, in theory, inspect the filter Predicate and see that it can be translated into SQL. It then removes the filter operation from the Pipeline and modifies the upstream data source like this:

select * from Hare where age>8;
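One way a terminator could perform this rewrite is sketched below, assuming the filter is a hypothetical `SqlPredicate` that knows its own SQL form (an arbitrary lambda does not, which is the problem discussed under The Hard Part). Recognized filters are folded into the `where` clause; anything else is handed back to run in Java.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;

// Hypothetical: a Predicate that can also render itself as SQL.
interface SqlPredicate<T> extends Predicate<T> {
    String toSql();
}

// Hypothetical recognizable predicate: "<column> > <value>" on an int column.
class GreaterThan<T> implements SqlPredicate<T> {
    private final String column;
    private final ToIntFunction<T> getter;
    private final int value;

    GreaterThan(String column, ToIntFunction<T> getter, int value) {
        this.column = column;
        this.getter = getter;
        this.value = value;
    }

    @Override
    public boolean test(T t) {
        return getter.applyAsInt(t) > value;
    }

    @Override
    public String toSql() {
        return column + ">" + value;
    }
}

class SqlRewriter {
    // Moves every SqlPredicate out of the filter list and into the WHERE clause.
    // Filters it cannot translate are added to `remaining` and must still run in Java.
    static <T> String rewrite(String table, List<Predicate<T>> filters, List<Predicate<T>> remaining) {
        List<String> where = new ArrayList<>();
        for (Predicate<T> filter : filters) {
            if (filter instanceof SqlPredicate) {
                where.add(((SqlPredicate<T>) filter).toSql());
            } else {
                remaining.add(filter);
            }
        }
        String sql = "select * from " + table;
        return (where.isEmpty() ? sql : sql + " where " + String.join(" and ", where)) + ";";
    }
}

// Hypothetical minimal entity for the demo.
class Hare {
    private final int age;

    Hare(int age) {
        this.age = age;
    }

    int getAge() {
        return age;
    }
}

class SqlRewriterDemo {
    public static void main(String[] args) {
        List<Predicate<Hare>> filters = new ArrayList<>();
        filters.add(new GreaterThan<>("age", Hare::getAge, 8));
        List<Predicate<Hare>> remaining = new ArrayList<>();
        System.out.println(SqlRewriter.rewrite("Hare", filters, remaining));
        // prints "select * from Hare where age>8;" and leaves `remaining` empty
    }
}
```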

Terminal Operation Driven

Each terminal operation will have its own callback in the StreamTerminator so it can act in its best interest. Let's take Stream::count as an example.

Suppose we have:
Hare.stream().filter(h->"gray".equals(h.getColor()))
    .sorted().map(h->h.getName()).count();

as input. Then the StreamTerminator's count() callback will determine that sorted() and map() do not change the number of items in the stream and can thus be removed from the Pipeline altogether. It can then modify the upstream source to reflect the predicate, so we end up with:

select count(*) from Hare where color='gray';
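The reasoning of the count() callback can be sketched as a pass over the recorded operations. `Op`, its kinds and the pre-recognized SQL fragment carried by each filter are hypothetical simplifications; the point is that `map` and `sorted` are one-to-one, so they can be dropped before counting, while anything that may change the number of elements has to stay.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical recorded operation: its kind and, for filters, a SQL fragment
// the terminator has already recognized (e.g. "color='gray'").
class Op {
    enum Kind { FILTER, MAP, SORTED, DISTINCT }

    final Kind kind;
    final String sql;

    Op(Kind kind, String sql) {
        this.kind = kind;
        this.sql = sql;
    }
}

class CountTerminator {
    // map() and sorted() emit exactly one element per input element, so they
    // cannot change count(). filter() and distinct() can, so they are kept.
    static List<Op> reduceForCount(List<Op> pipeline) {
        return pipeline.stream()
                .filter(op -> op.kind != Op.Kind.MAP && op.kind != Op.Kind.SORTED)
                .collect(Collectors.toList());
    }

    // Folds the surviving filters into one count query; gives up on anything else.
    static String toCountSql(String table, List<Op> pipeline) {
        List<String> where = new ArrayList<>();
        for (Op op : reduceForCount(pipeline)) {
            if (op.kind != Op.Kind.FILTER) {
                throw new UnsupportedOperationException("cannot push down " + op.kind);
            }
            where.add(op.sql);
        }
        String sql = "select count(*) from " + table;
        return (where.isEmpty() ? sql : sql + " where " + String.join(" and ", where)) + ";";
    }

    public static void main(String[] args) {
        // filter(gray) -> sorted() -> map(getName), as in the example above
        List<Op> pipeline = Arrays.asList(
                new Op(Op.Kind.FILTER, "color='gray'"),
                new Op(Op.Kind.SORTED, null),
                new Op(Op.Kind.MAP, null));
        System.out.println(toCountSql("Hare", pipeline));
        // prints "select count(*) from Hare where color='gray';"
    }
}
```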

For in-memory storage engines, the StreamTerminator can be made very efficient because it may evaluate a number of options to reduce the Pipeline, for example by short-circuiting combinations of Predicates and by selecting the upstream source that yields the lowest number of iterations for the optimized stream/pipeline.
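For the in-memory case, one possible strategy, sketched here under the assumption that the store keeps hypothetical per-condition indexes, is to treat each index bucket matching one AND-ed condition as a candidate upstream source, start from the smallest one, and still apply the full predicate so the result is unchanged. `SourceSelector` and the index lists are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class SourceSelector {
    // Each candidate must contain every element the predicate accepts (for example
    // an index bucket for one AND-ed condition). Because the full predicate is
    // still applied, choosing the smallest candidate changes only the cost.
    static <T> List<T> run(List<List<T>> candidates, Predicate<T> predicate) {
        List<T> smallest = candidates.stream()
                .min(Comparator.comparingInt(List::size))
                .orElseThrow(IllegalArgumentException::new);
        return smallest.stream().filter(predicate).collect(Collectors.toList());
    }
}

// Hypothetical minimal entity for the demo.
class Hare {
    final String color;
    final int age;

    Hare(String color, int age) {
        this.color = color;
        this.age = age;
    }
}

class SourceSelectorDemo {
    public static void main(String[] args) {
        List<Hare> all = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            all.add(new Hare(i % 3 == 0 ? "gray" : "brown", i % 10));
        }
        // Hypothetical indexes, built eagerly here for brevity.
        List<Hare> gray = all.stream().filter(h -> h.color.equals("gray")).collect(Collectors.toList());
        List<Hare> older = all.stream().filter(h -> h.age > 8).collect(Collectors.toList());

        Predicate<Hare> both = h -> h.color.equals("gray") && h.age > 8;
        List<Hare> result = SourceSelector.run(Arrays.asList(all, gray, older), both);
        // iterates the 100-element "older" bucket instead of all 1000 hares
        System.out.println(result.size()); // prints 34
    }
}
```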

Check It Out

Check out the com.speedment.util.stream.builder.demo package.


The Hard Part

Initially, we are going to use specific predicates that we can recognize easily. One way is to provide support classes/methods like Predicates.equals("columnName", value) that we can detect in the Pipeline. To cover the generic case, like the lambdas in the examples above, we have to decompile the predicates on the fly to find out how they operate.
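A sketch of what such a support method might look like follows. `Predicates.equals`, `EqualsPredicate` and the `Entity` interface are hypothetical: the returned object still behaves as an ordinary `Predicate`, but it also exposes the column and value it compares, so a terminator can recognize it with a simple `instanceof` check, whereas an equivalent lambda is opaque.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical entity contract: lets a predicate read a column by name.
interface Entity {
    Object get(String column);
}

// Works as a plain Predicate, but also exposes what it compares.
final class EqualsPredicate<T extends Entity> implements Predicate<T> {
    final String column;
    final Object value;

    EqualsPredicate(String column, Object value) {
        this.column = column;
        this.value = value;
    }

    @Override
    public boolean test(T entity) {
        return Objects.equals(entity.get(column), value);
    }
}

final class Predicates {
    private Predicates() {}

    static <T extends Entity> Predicate<T> equals(String column, Object value) {
        return new EqualsPredicate<>(column, value);
    }

    // What a terminator might do: translate recognized predicates to SQL and
    // return null for anything else (e.g. a lambda), which must then run in Java.
    static String toSqlOrNull(Predicate<?> predicate) {
        if (predicate instanceof EqualsPredicate) {
            EqualsPredicate<?> eq = (EqualsPredicate<?>) predicate;
            return eq.column + "=" + literal(eq.value);
        }
        return null;
    }

    // Quotes strings (doubling embedded quotes); other values use toString().
    private static String literal(Object value) {
        return value instanceof String
                ? "'" + ((String) value).replace("'", "''") + "'"
                : String.valueOf(value);
    }
}

// Hypothetical minimal entity for the demo.
class Hare implements Entity {
    private final String color;

    Hare(String color) {
        this.color = color;
    }

    @Override
    public Object get(String column) {
        return "color".equals(column) ? color : null;
    }
}

class PredicatesDemo {
    public static void main(String[] args) {
        Predicate<Hare> recognized = Predicates.equals("color", "gray");
        Predicate<Hare> opaque = h -> "gray".equals(h.get("color"));
        System.out.println(Predicates.toSqlOrNull(recognized)); // prints color='gray'
        System.out.println(Predicates.toSqlOrNull(opaque));     // prints null
        System.out.println(recognized.test(new Hare("gray")));  // prints true
    }
}
```

Both predicates select the same hares; only the first one can be pushed down without decompiling anything.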

Future Work

In the future, the Stream Builder could also be applied to normal Java collections so that they may be optimized by removing redundant steps, reordering the Pipeline, and modifying the upstream data source. Take this seemingly O(N log N) operation (the sort dominates):

collection
    .stream()
    .map((User u)-> u.getName())
    .sorted()
    .limit(limit)
    .count();

would be reduced by the StreamTerminator to:


Math.min(limit, collection.size());

which is an O(1) operation.
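The claimed equivalence is easy to check for a concrete case. The sketch below uses a hypothetical `User` class and assumes a non-negative `limit` (as `Stream.limit` requires); since `map` and `sorted` never change the number of elements, only `limit` and the collection's size determine the count.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical element type from the example above.
class User {
    private final String name;

    User(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

class CountShortcut {
    // The pipeline as written: map and sort every element, then count at most `limit`.
    static long viaStream(Collection<User> collection, long limit) {
        return collection.stream()
                .map((User u) -> u.getName())
                .sorted()
                .limit(limit)
                .count();
    }

    // The reduced form: O(1) for collections whose size() is O(1). Assumes limit >= 0.
    static long reduced(Collection<User> collection, long limit) {
        return Math.min(limit, collection.size());
    }

    public static void main(String[] args) {
        List<User> users = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            users.add(new User("user" + i));
        }
        for (long limit : new long[] {0, 3, 5, 10}) {
            System.out.println(limit + ": " + viaStream(users, limit) + " == " + reduced(users, limit));
        }
    }
}
```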