This documentation describes:
- Client-to-query processing
- Query validation before execution
- Query rewriting for execution
This section shows the life-cycle of an SQL SELECT request:
Client-side    +--+
text query        |   Protocol
                  |   Buffers
                  +------------> Lexer +---> Parser +----> Canonicaliser +-----> Proplist
                  |                                          (in Parser)
Client-side       |
parameterised  +--+
query
It is worth making a point about conformance with the spec at this stage. Below is an extract from the riak_ql_parser.yrl language definition of a Numeric Value Expression:
%% 6.27 NUMERIC VALUE EXPRESSION
NumericValueExpression -> Term : '$1'.
NumericValueExpression ->
NumericValueExpression plus_sign Term :
make_expr('$1', '$2', '$3').
NumericValueExpression ->
NumericValueExpression minus_sign Term :
make_expr('$1', '$2', '$3').
Term -> Factor : '$1'.
Term ->
Term asterisk Factor :
make_expr('$1', '$2', '$3').
Term ->
Term solidus Factor :
make_expr('$1', '$2', '$3').
(...)
The corresponding section of the SQL Foundation Document can be transcribed almost directly into the riak SQL parser - and it likewise determines the structure of the riak SQL lexer, as shown in the extract below:
PLUS = (\+)
MINUS = (\-)
ASTERISK = (\*)
SOLIDUS = (/)
The lexer/parser output is canonicalised in a number of ways - this is to ensure that SQL statements that are logically equivalent have an identical (not merely equivalent) output.
A trivial example of this is keyword canonicalisation in the lexer - which will be discussed here for expository purposes. Consider the following equivalent SQL statements:
SELECT * FROM mytable;
SelECt * frOm mytable;
select * from mytable;
The lexer canonicalises this trivially in the regexes that define the SQL keyword tokens:
QUANTUM = (Q|q)(U|u)(A|a)(N|n)(T|t)(U|u)(M|m)
SELECT = (S|s)(E|e)(L|l)(E|e)(C|c)(T|t)
DESCRIBE = (D|d)(E|e)(S|s)(C|c)(R|r)(I|i)(B|b)(E|e)
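The practical effect is that all three statements above lex to one and the same token stream - schematically something like the following (the exact tuple shapes are an implementation detail of riak_ql_lexer; this is an illustration only):

[{select,     <<"select">>},
 {asterisk,   <<"*">>},
 {from,       <<"from">>},
 {identifier, <<"mytable">>}]

so by the time the parser sees the query, keyword case has already been erased.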
The important canonicalisation (which is implemented in the parser) is the canonicalisation of the WHERE clause. Consider the following equivalent statements:
SELECT * FROM mytable WHERE field1 >= 10 AND (field2 < 2 OR field3 = 'alice');
SELECT * FROM mytable WHERE (field3 = 'alice' or field2 < 2) AND field1 >= 10;
The WHERE clause is a specialist mini-lisp, being a tree whose leaves consist of operators that return booleans:
{'>=', <<"field1">>, {integer, 10}}
{'<', <<"field2">>, {integer, 2}}
{'=', <<"field3">>, {binary, <<"alice">>}}
which can be composed with the standard set of boolean operators:
{or, EXPR1, EXPR2}
{and, EXPR1, EXPR2}
The WHERE clause is canonicalised here - with a particular eye on simplifying the query compiler. At this stage the SQL statement is still declarative - it signals the intent to retrieve data - but it is not yet executable.
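Both of the equivalent statements above should therefore reduce to one and the same tree. A sketch of that canonical form, using the notation above (the actual ordering of operands chosen by the canonicaliser is an implementation detail), might be:

{and,
    {'>=', <<"field1">>, {integer, 10}},
    {or,
        {'<', <<"field2">>, {integer, 2}},
        {'=', <<"field3">>, {binary, <<"alice">>}}}}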
TODO: break the canonicaliser out into its own module
The output of this stage is emitted as a proplist (or set of nested proplists) whose keys equate to record fields. This is to decouple dependencies on .hrl files between riak_kv and riak_ql and make multi-repo releases less of a pain in the neck to wrestle out of the door.
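As a sketch only (the exact keys and value shapes are illustrative, not a contract - they simply mirror the record fields shown later), the emitted proplist for a simple query might look like:

[{'SELECT',   [<<"weather">>, <<"temperature">>]},
 {'FROM',     <<"GeoCheckin">>},
 {'WHERE',    [{'>', <<"time">>, {integer, 0}}]},
 {'ORDER BY', []},
 {'LIMIT',    []}]

riak_kv can then fold a structure like this into its own records without the two repositories having to share a .hrl.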
The output of the lexer/parser is a valid SQL statement - but one that does not correspond to a valid query. There are three different types of validation that the query must pass before it is executable:
- do the fields in the various query clauses correspond to fields in the underlying declared table defined by a CREATE TABLE statement? The clauses that must be checked are:
  - SELECT
  - WHERE
  - ORDER BY (not implemented yet)
  - GROUP BY (not implemented yet)
- TS can only run a subset of queries at the moment: 'the WHERE clause of the query must cover the partition and local keys'
- are any arithmetic operations in the SELECT (or the other as-yet-unimplemented clauses) type safe? For instance you can't divide a varchar by an integer
                               |
                      DDL                      DDL            DDL      Function
                    Helper     |             Helper        Helper     Type Sigs
                       +                        +             +           +
                       |       |                |             +-----+-----+
                       |       |                |
                       |       |                |                   |
                       v                        v                   v
Declarative        Are fields   YES    Is valid TS     YES    Is query   YES    To Query
SQL Select +----> in table? +----------> WHERE clause? +-----> typesafe? +-----> Rewriter
Record                 +                        +                   +
                    NO |                     NO |                NO |
                       |       |                |                   |
                       v                        v                   v
                     Error     |              Error               Error
                               |
Lexer/Parser - QL              |             Query Compiler KV
Query validation happens in riak_kv after the query is handed over to be executed.
In future the restriction on key-covering queries will be relaxed, but other validation will be required - for instance when the AS keyword (which implements aliasing) is added, or when multi-table inputs (with the requirement for column name disambiguation, etc) are allowed.
The DDL helper functions emitted by the DDL Compiler are key here - see the DDL Compiler
This stage of the validation is straightforward: iterate over the Lisp outputs that correspond to the SELECT and WHERE clauses and, for every leaf that is a field reference, check that its name corresponds to a declared field.
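A minimal sketch of that walk, assuming a DDL helper callback named is_field_valid/1 (the real helper API may differ in name and shape):

-module(field_check).   %% illustrative only
-export([unknown_fields/2]).

%% Walk a canonicalised SELECT/WHERE tree and collect any field
%% references that the DDL helper module does not recognise.
unknown_fields(Mod, {_Op, Field, _Val}) when is_binary(Field) ->
    case Mod:is_field_valid([Field]) of    %% assumed helper callback
        true  -> [];
        false -> [Field]
    end;
unknown_fields(Mod, {_BoolOp, LHS, RHS}) ->
    unknown_fields(Mod, LHS) ++ unknown_fields(Mod, RHS);
unknown_fields(_Mod, _Leaf) ->
    [].

An empty result means every field referenced in the query was declared in the corresponding CREATE TABLE statement.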
The key restrictions are enforced with the BFI methodology - brute force and ignorance. WHERE clauses that completely cover the key space are accepted, the rest are errored.
This is similar to the first validation - the Lisp of the SELECT clause needs to be iterated over - and two checks performed:
- is the application of a function to a field valid?
  - for instance you CAN COUNT a sint64 or a double or a varchar
  - for instance you CANNOT AVG a varchar
- how do the types compose up an arithmetic structure?
To perform this validation the validator requires information from the DDL helper module, as well as the type specs from the modules that implement the functions (eg COUNT, AVG, SUM) and the arithmetic operators (+, -, / or *).
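A sketch of how types might be composed up such a tree - the get_field_type/1 helper call and the particular coercion rules shown are assumptions for illustration, not the shipped rules:

-module(type_check).   %% illustrative only
-export([expr_type/2]).

%% Fold the type of an arithmetic expression bottom-up. Leaf types
%% come from the DDL helper module; mixing a varchar into arithmetic
%% is rejected outright.
expr_type(Mod, {Op, LHS, RHS})
        when Op == '+'; Op == '-'; Op == '*'; Op == '/' ->
    case {expr_type(Mod, LHS), expr_type(Mod, RHS)} of
        {T1, T2} when T1 == varchar; T2 == varchar ->
            error({invalid_query, {not_numeric, Op}});
        {sint64, sint64} when Op /= '/' -> sint64;
        {_, _}                          -> double
    end;
expr_type(_Mod, {integer, _}) -> sint64;
expr_type(_Mod, {double, _})  -> double;
expr_type(Mod, Field) when is_binary(Field) ->
    Mod:get_field_type([Field]).   %% assumed DDL helper call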
NOTE: wherever possible functions that validate queries should be generated directly into the helper modules - they are fast, efficient, testable and behave predictably with different underlying data schemas
The SQL query is not changed by the validation process. A given query is either valid - in which case it is passed on to the query sub-system for execution - or it is invalid - in which case an appropriate error message is sent back to the user who submitted it.
The query rewriter can be thought of in the following terms:
- the semantic meaning of the query remains unchanged
- the syntactic form is transformed
- the input is declarative
  - this is a statement of the results the user would like to see
- the output is executable
  - this is how Riak will return results that match the user's declaration
- there is NOT a one-to-one correspondence between the declarative input and the executable output
  - the query compiler may, based on heuristics, put two queries with the same declarative form through different executable query plans that have different execution costs
- the data structure that describes a query can be made recursive by hoisting SELECT, WHERE, (not-yet-implemented) GROUP BY or (not-yet-implemented) ORDER BY clauses into the FROM clause and rewriting them
-record(riak_select_v1,
{
'SELECT' :: #riak_sel_clause_v1{},
'FROM' = <<>> :: binary() | {list, [binary()]} | {regex, list()},
'WHERE' = [] :: [filter()],
'ORDER BY' = [] :: [sorter()],
'LIMIT' = [] :: [limit()],
helper_mod :: atom(),
%% will include group by
%% when we get that far
partition_key = none :: none | #key_v1{},
%% indicates whether this
%% query has already been
%% compiled to a sub query
is_executable = false :: boolean(),
type = sql :: sql | timeseries,
cover_context = undefined :: term(), %% for parallel queries
local_key %% prolly a mistake to put this here - should be in DDL
}).
Notice that the fields in the record fall into 2 distinct categories:
- declarative fields which contain the user's intention:
  - SELECT
  - FROM
  - WHERE
  - ORDER BY
  - LIMIT
- runtime information - required for execution:
  - helper_mod
  - is_executable
  - type
  - cover_context
  - local_key
At the heart of thinking about query pipelines is the notion of tables which have column descriptors and rows of data. These are made visible by the riak-shell:
✅ riak_shell(27)>select time, weather, temperature from GeoCheckin where myfamily='family1' and myseries='seriesX' and time > 0 and time < 1000;
+----+----------------+---------------------------+
|time| weather | temperature |
+----+----------------+---------------------------+
| 1 | z«êPò¹ |4.19111744258298777600e+18 |
| 2 | ^OOgz^Blu7) |6.07861409217513676800e+18 |
| 3 | ÔÖã |6.84034338181623808000e+17 |
| 4 | ^G |-5.55785206740398080000e+16|
| 5 | ¸LËäà«d |-3.62555783091625574400e+18|
| 6 | ^AE^S¥ |1.11236574770119680000e+18 |
| 7 | ïö?ï^Fv |5.51455556936744140800e+18 |
| 8 | ^FtFVÅë=+#^Y5 |2.44525777392835584000e+17 |
| 9 |ðÁÖ·©Ü^GV^^^DkU |6.90864738609726668800e+18 |
| 10 | QÝZa^QËfQ |5.08590022245487001600e+18 |
+----+----------------+---------------------------+
The columns in this table have implicit types: time is a timestamp, weather is a varchar and temperature is a double.
In discussing the query pipeline we regard column headers as one data structure and row sets as another and discuss how both of them may (or may not) be transformed in different contexts.
Conventionally we also consider SQL queries to unroll left to right:
single SQL statement written by user +----------------> many SQL fragments executed across the ring by riak
query rewriter
Consequently the query pipeline operates right to left:
table returned to user <-----------+ table fragments created in pipeline
Operation
We can conceptualise the declarative statements as being logically related. Consider the following transforms (operating right to left, of course):
Table In Shell Data On Disk
+-------+-------+ +-------+-------+-------+
| ColX | ColY | | Col1 | Col2 | Col3 |
| Type1 | Type2 | | Type1 | Type2 | Type3 |
+-------+-------+ SQL Query +-------+-------+-------+
<--------------+
+-------+-------+ +-------+-------+-------+
| Val1X | Val1Y | | Val1a | Val1b | Val1c |
+---------------+ +-----------------------+
| Val2X | Val2Y | | Val2a | Val2b | Val2c |
+-------+-------+ +-----------------------+
| Val3a | Val3b | Val3c |
+-------+-------+-------+
WHERE, ORDER BY and GROUP BY are all row operations:
+-------+-------+-------+ +-------+-------+-------+
| Col1 | Col2 | Col3 | | Col1 | Col2 | Col3 |
| Type1 | Type2 | Type3 | | Type1 | Type2 | Type3 |
+-------+-------+-------+ Operation +-------+-------+-------+
<-------------+
+-------+-------+-------+ +-------+-------+-------+
| Val1a | Val1b | Val1c | WHERE | Val1a | Val1b | Val1c |
+-----------------------+ GROUP BY +-----------------------+
| Val3a | Val3b | Val3c | ORDER BY | Val2a | Val2b | Val2c |
+-----------------------+ +-----------------------+
| Val6a | Val6b | Val6c | | Val3a | Val3b | Val3c |
+-----------------------+ +-----------------------+
| Val5a | Val5b | Val5c | | Val4a | Val4b | Val4c |
+-------+-------+-------+ +-----------------------+
| Val5a | Val5b | Val5c |
+-----------------------+
| Val6a | Val6b | Val6c |
+-------+-------+-------+
Row operations preserve column names and column types.
SELECT is both a column operator and a row operator:
+-------+-------+ +-------+-------+-------+
| ColX | ColY | | Col1 | Col2 | Col3 |
| Type1 | Type2 | | Type1 | Type2 | Type3 |
+-------+-------+ Operation +-------+-------+-------+
<-------------+
+-------+-------+ +-------+-------+-------+
| Val1X | Val1Y | SELECT | Val1a | Val1b | Val1c |
+---------------+ +-----------------------+
| Val2X | Val2Y | | Val2a | Val2b | Val2c |
+-------+-------+ +-----------------------+
| Val3a | Val3b | Val3c |
+-------+-------+-------+
SELECT can transform both column names and column types.
You can see how and why this happens if you consider:
SELECT COUNT(myvarcharfield) AS bobcat FROM mytable;
SELECT COUNT(myintegerfield)/SUM(myintegerfield) FROM mytable;
NOTE: the AS keyword is not yet implemented.
In this context FROM can be considered to be the identity operator:
+-------+-------+-------+ +-------+-------+-------+
| Col1 | Col2 | Col3 | | Col1 | Col2 | Col3 |
| Type1 | Type2 | Type3 | | Type1 | Type2 | Type3 |
+-------+-------+-------+ Operation +-------+-------+-------+
<--------------+
+-------+-------+-------+ +-------+-------+-------+
| Val1a | Val1b | Val1c | FROM | Val1a | Val1b | Val1c |
+-----------------------+ +-----------------------+
| Val2a | Val2b | Val2c | | Val2a | Val2b | Val2c |
+-----------------------+ +-----------------------+
| Val3a | Val3b | Val3c | | Val3a | Val3b | Val3c |
+-----------------------+ +-----------------------+
| Val4a | Val4b | Val4c | | Val4a | Val4b | Val4c |
+-----------------------+ +-----------------------+
| Val5a | Val5b | Val5c | | Val5a | Val5b | Val5c |
+-----------------------+ +-----------------------+
| Val6a | Val6b | Val6c | | Val6a | Val6b | Val6c |
+-------+-------+-------+ +-------+-------+-------+
FROM does 2 things - if it is right at the metal it is an identity operator, where the right-hand data is on disk and the left-hand data is in memory. If it is executed at a higher level it just ingests data for one of the other operators to act on.
Using this knowledge we can trivially rewrite an SQL statement as a series of nested SQL statements - with the nesting being implemented as a logical pipeline.
Let us see this happening for a simple SQL statement (Note this is not a valid timeseries query - it is for expository purposes only):
SELECT height, weight FROM mytable WHERE height > 10;
This can be rewritten into a pair of queries - the results of the right-hand one are operated on by the left-hand query's FROM clause, which ingests them:
NOTE: we have unrolled the query left-to-right, but this pipeline then executes right-to-left.
+ FROM <---------------------+ + FROM mytable
| |
| SELECT height, weight | SELECT *
| |
| GROUP BY [] | GROUP BY []
| |
| ORDER BY [] | ORDER BY []
| |
+ WHERE [] + WHERE height > 10
This process is trivially recursive. For operational reasons these operations must be chunked to prevent memory overflow (unchunked queries would simply load all the data into memory, which kinda obviates the very existence of databases as a technology).
+-------+-------+ +-------+-------+-------+
| ColX | ColY | | Col1 | Col2 | Col3 |
| Type1 | Type2 | | Type1 | Type2 | Type3 |
+-------+-------+ SQL Query +-------+-------+-------+
<-----+
+-------+-------+ | +-------+-------+-------+
| Val1X | Val1Y | | | Val1a | Val1b | Val1c |
+---------------+ | Chunk1 +-----------------------+
| Val4X | Val4Y | +----------+ Val2a | Val2b | Val2c |
+-------+-------+ | +-----------------------+
| | Val3a | Val3b | Val3c |
| +-------+-------+-------+
|
|
| +-------+-------+-------+
| | Val4a | Val4b | Val4c |
| Chunk2 +-----------------------+
+----------+ Val5a | Val5b | Val5c |
| +-----------------------+
| | Val6a | Val6b | Val6c |
| +-------+-------+-------+
|
|
| +-------+-------+-------+
| | Val7a | Val7b | Val7c |
| Chunk3 +-----------------------+
+----------+ Val8a | Val8b | Val8c |
+-----------------------+
| Val9a | Val9b | Val0c |
+-------+-------+-------+
The chunking may be for 2 reasons:
- there is too much data from a particular vnode to appear in memory
- the data we want is on more than one vnode
Let us now rewrite a Time Series query to see something real:
SELECT device, temp FROM mytimeseries WHERE family = 'myfamily' and series = 'myseries' and timestamp > 1233 and timestamp < 6789 and temp > 18;
This becomes (again executing right-to-left):
<----Erlang Coordinator-----> <-----LevelDB C++ Code----->
<---Network--->
+ FROM <----------------+ + FROM mytable on vnode X
| | |
| SELECT device, temp | | SELECT *
| | Chunk1 |
| GROUP BY [] +--------+ GROUP BY []
| | |
| ORDER BY [] | | ORDER BY []
| | |
+ WHERE [] | + WHERE + start_key = {myfamily, myseries, 1233}
| | end_key = {myfamily, myseries, 4000}
| + temp > 18
|
|
| + FROM mytable on vnode Y
| |
| | SELECT *
| Chunk2 |
+--------+ GROUP BY []
|
| ORDER BY []
|
+ WHERE + start_key = {myfamily, myseries, 4001}
| end_key = {myfamily, myseries, 6789}
+ temp > 18
Note 3 things:
- the FROM clauses are now no longer logical FROMs, they are physical FROMs - a coverage plan has been constructed and these SQL statements have been dispatched to vnodes to run
- the WHERE clause has now been rewritten from a declarative one to an executable one - it has a leveldb start and end key to scan, and then a clause to run on all values between those end points (a sketch of this structure follows the list)
- this diagram includes the physical locations the various pipeline functions run at - on the right hand side the C++ vnodes, on the left the Erlang coordinator (inside the query sub-system)
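In Erlang terms the executable WHERE of the first sub-query above can be pictured roughly as follows - the key names mirror the diagram and are illustrative, not the literal riak_kv structure:

[{start_key, {<<"myfamily">>, <<"myseries">>, 1233}},
 {end_key,   {<<"myfamily">>, <<"myseries">>, 4000}},
 {filter,    {'>', <<"temp">>, {integer, 18}}}]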
This is an accurate view of how Riak TS 1.2 would rewrite that statement. But there is another obvious, semantically equivalent form that it could be rewritten to, and which would be more efficient (this will be implemented from Riak TS 1.3 onwards):
<----Erlang Coordinator-----> <-----LevelDB C++ Code----->
<---Network--->
+ FROM <----------------+ + FROM mytable on vnode X
| | |
| SELECT * | | SELECT device, temp
| | Chunk1 |
| GROUP BY [] +--------+ GROUP BY []
| | |
| ORDER BY [] | | ORDER BY []
| | |
+ WHERE [] | + WHERE + start_key = {myfamily, myseries, 1233}
| | end_key = {myfamily, myseries, 4000}
| + temp > 18
|
|
| + FROM mytable on vnode Y
| |
| | SELECT device, temp
| Chunk2 |
+--------+ GROUP BY []
|
| ORDER BY []
|
+ WHERE + start_key = {myfamily, myseries, 4001}
| end_key = {myfamily, myseries, 6789}
+ temp > 18
Performing the selection (which is a column reduction) at the vnode (the right hand side) means that less data is shipped around the cluster than in the case where the selection is performed at the left hand side (in the query coordinator). Imagine if the table mytable had 100 columns.
This is the last essential operation of a query system: building a query plan - the most efficient way to rewrite this query so that it executes against this table, loaded with data of this shape, on a riak cluster with this many nodes and a ring of this size.
Strategically the operations are undifferentiated between the Erlang coordinator and the leveldb C++ code. We can see why this is the general case if we rewrite the following SQL query:
SELECT AVG(temp) FROM mytimeseries WHERE family = 'myfamily' and series = 'myseries' and timestamp > 1233 and timestamp < 6789 and temp > 18;
This becomes (again executing right-to-left):
<----Erlang Coordinator-----> <-----LevelDB C++ Code----->
<---Network--->
+ FROM <-----------------------+ + FROM mytable on vnode X
| | |
| SELECT SUM(STemp)/SUM(NoTemp) | | SELECT SUM(temp) AS STemp, COUNT(temp) AS NoTemp
| | Chunk1 |
| GROUP BY [] +--------+ GROUP BY []
| | |
| ORDER BY [] | | ORDER BY []
| | |
+ WHERE [] | + WHERE + start_key = {myfamily, myseries, 1233}
| | end_key = {myfamily, myseries, 4000}
| + temp > 18
|
|
| + FROM mytable on vnode Y
| |
| | SELECT SUM(temp) AS STemp, COUNT(temp) AS NoTemp
| Chunk2 |
+--------+ GROUP BY []
|
| ORDER BY []
|
+ WHERE + start_key = {myfamily, myseries, 4001}
| end_key = {myfamily, myseries, 6789}
+ temp > 18
Notice that the AVG function has been broken into a SUM and a COUNT to be run in the C++ code, and another pair of SUMs (with a final division) to be run in the Erlang node.
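A back-of-the-envelope sketch of why this decomposition is safe: each vnode returns a partial {Sum, Count} pair and the coordinator only divides once it has combined them all (illustrative code, not the riak_kv implementation):

-module(avg_example).   %% illustrative only
-export([avg/1]).

%% Recombine per-chunk {Sum, Count} partials into the overall average.
avg(Partials) ->
    {STemp, NoTemp} =
        lists:foldl(fun({S, C}, {AccS, AccC}) -> {AccS + S, AccC + C} end,
                    {0, 0}, Partials),
    STemp / NoTemp.

%% e.g. avg([{40.0, 2}, {20.0, 1}]) =:= 20.0 - the same answer as
%% computing AVG over all three underlying rows in one place.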
Strategically we wish to implement all operators in the pipeline (FROM, SELECT, WHERE, GROUP BY, ORDER BY) as an Erlang behaviour (or a behaviour-like construct in C++ - a set of function callbacks with defined specs), and this behaviour MUST be identically callable from both the Erlang coordinator and within the C++ leveldb code:
+-------+-------+ +-------+-------+-------+
| ColX | ColY | | Col1 | Col2 | Col3 |
| Type1 | Type2 | | Type1 | Type2 | Type3 |
+-------+-------+ Behaviour +-------+-------+-------+
<--------------+
+-------+-------+ +-------+-------+-------+
| Val1X | Val1Y | | Val1a | Val1b | Val1c |
+---------------+ +-----------------------+
| Val2X | Val2Y | | Val2a | Val2b | Val2c |
+-------+-------+ +-----------------------+
| Val3a | Val3b | Val3c |
+-------+-------+-------+
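A minimal sketch of what such a behaviour might look like on the Erlang side - the module name, callback names and types are assumptions for illustration, not an agreed interface:

-module(riak_ql_pipeline_op).   %% hypothetical behaviour name

-type col_name() :: binary().
-type col_type() :: atom().      %% e.g. sint64 | double | varchar | timestamp
-type columns()  :: [{col_name(), col_type()}].
-type row()      :: [term()].

%% Given the incoming column descriptors and the operator's own
%% arguments, return the outgoing column descriptors.
-callback columns(columns(), Args :: term()) -> columns().

%% Transform one chunk of rows; the intent is that this is callable
%% identically from the Erlang coordinator and from the leveldb C++ code.
-callback rows(columns(), [row()], Args :: term()) -> [row()].

A WHERE implementation would leave the column descriptors untouched and filter rows; a SELECT implementation may rewrite both, exactly as the tables above show.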
There are multiple different strategies for querying data in Riak. It is important to understand how TS is different.
There is a blanket query mechanism that requires the query to access the entire keyspace. An example of this would be the 'list buckets' operation - every underlying key in the database is iterated over and a list of unique buckets is returned. To work correctly for all buckets and bucket types (including those created with an n_val of 1) such a query needs to iterate over each vnode's keyspace.
Other operations, like listing the keys in a bucket, operate on a narrower basis. By examining the n_val for a bucket a sub-set of vnodes can be queried. For instance in normal operations ring-size/n_val + 1 vnodes contain all the relevant information. A coverage plan that visits those vnodes with multiple key ranges will find all the keys.
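For example, on a ring of size 64 with the default n_val of 3, that is roughly 64/3 + 1 ≈ 22 vnodes rather than all 64.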
2i behaves similarly with respect to a sub-set of the ring - except it visits the index space of the ring, not the key-value part.
Time Series is much more like a single key read. The query comes in and is compiled into up to 5 sub-queries. Each of these sub-queries is like a normal KV single-key read for coverage purposes: go to one of the primaries of this vnode and get me the data (in this case a sub-range, not a value).
The important thing about the #riak_select_v1{} record is that it takes many forms. It contains a number of fields which are semantically consistent but which have implementation differences. SQL is a declarative language and the SQL clause structure is preserved, so fields like SELECT, FROM, WHERE, ORDER BY and LIMIT may contain different data structures for the purposes of execution, but they carry the same semantic burden.
It would have been possible to have each stage of the pipeline have its own record - and this seems sensible in Time Series when there are basically 2 major types (sql and timeseries) - but already there are several unimplemented ones looming on the road map (eg TS full table scan, composite key read). The Spark connector integration could easily be constructed as a new record. This approach would lead to a lot of different records with very similar names and structures.