Assuming you have the following database table:
user | date | column_a | column_b |
---|---|---|---|
user1 | 2012-01-01 | 1 | 1 |
user1 | 2012-01-06 | 2 | 0 |
user2 | 2012-02-19 | 0 | 3 |
You can use sql-agg to extract aggregated data from the table as follows:
from datetime import date
from sqlagg import QueryContext
from sqlagg.columns import CountColumn, SimpleColumn, SumColumn
from sqlagg.filters import GT, LT
# create the columns
user = SimpleColumn("user")
i_a = SumColumn("column_a")
i_b = CountColumn("column_b")
# initialise the query context and add the columns to it
vc = QueryContext("table_name",
                  filters=[GT('date', 'startdate'), LT('date', 'enddate')],
                  group_by=["user"])
vc.append_column(user)
vc.append_column(i_a)
vc.append_column(i_b)
filter_values = {
    "startdate": date(2011, 12, 31),  # GT is exclusive, so start before the first row
    "enddate": date(2012, 3, 1)
}
# resolve the query context with the filter values (connection is an SQLAlchemy connection)
vc.resolve(connection, filter_values=filter_values)
data = vc.data
The resulting data variable will be a dictionary as follows:
{
"user1": {
"user": "user1",
"column_a": 3,
"column_b": 2
},
"user2": {
"user": "user2",
"column_a": 0,
"column_b": 1
}
}
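To make the aggregation concrete, here is a minimal sketch of the kind of SQL the example corresponds to, run against an in-memory SQLite database with the standard library. It does not use sqlagg, and the SQL that sqlagg actually generates may differ. The strict `>` and `<` comparisons are an assumption about how GT and LT behave, so the start date here is set one day before the first row to keep every row in range.

```python
import sqlite3

# In-memory stand-in for the example table (sqlagg itself is not used here).
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE table_name ("user" TEXT, "date" TEXT, column_a INTEGER, column_b INTEGER)'
)
conn.executemany(
    "INSERT INTO table_name VALUES (?, ?, ?, ?)",
    [
        ("user1", "2012-01-01", 1, 1),
        ("user1", "2012-01-06", 2, 0),
        ("user2", "2012-02-19", 0, 3),
    ],
)

# Assumed SQL equivalent of SimpleColumn / SumColumn / CountColumn grouped by user.
query = """
    SELECT "user", SUM(column_a), COUNT(column_b)
    FROM table_name
    WHERE "date" > :startdate AND "date" < :enddate
    GROUP BY "user"
"""
# Strict bounds: the start date is one day before the first row so that row is included.
params = {"startdate": "2011-12-31", "enddate": "2012-03-01"}

# Key each result row by its group value, mirroring the dictionary shown above.
data = {
    row[0]: {"user": row[0], "column_a": row[1], "column_b": row[2]}
    for row in conn.execute(query, params)
}
```

Note that COUNT counts non-NULL values, which is why user1 gets a count of 2 even though one of its column_b values is 0.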
Multi-level grouping can be done by adding multiple SimpleColumn instances to the QueryContext and listing the corresponding column names in its group_by parameter.
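region = SimpleColumn("region")
sub_region = SimpleColumn("sub_region")
column_a = SumColumn("column_a")
vc = QueryContext("table_name",
                  filters=None,
                  group_by=["region", "sub_region"])
# add the grouping columns and the aggregate column to the context
vc.append_column(region)
vc.append_column(sub_region)
vc.append_column(column_a)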
The resultant data would look as follows:
{
("region1", "sub_region1"): {
"region": "region1",
"sub_region": "sub_region1",
"column_a": 1
},
("region1", "sub_region2"): {
"region": "region1",
"sub_region": "sub_region2",
"column_a": 3
},
("region2", "sub_region3"): {
"region": "region2",
"sub_region": "sub_region3",
"column_a": 2
}
}
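The tuple keys can be illustrated with a small standalone sketch. It uses SQLite from the standard library rather than sqlagg, and the rows are hypothetical, chosen only so that the sums match the result above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table_name (region TEXT, sub_region TEXT, column_a INTEGER)")
# Hypothetical rows; the source does not show the underlying table.
conn.executemany(
    "INSERT INTO table_name VALUES (?, ?, ?)",
    [
        ("region1", "sub_region1", 1),
        ("region1", "sub_region2", 1),
        ("region1", "sub_region2", 2),
        ("region2", "sub_region3", 2),
    ],
)

rows = conn.execute(
    "SELECT region, sub_region, SUM(column_a) FROM table_name GROUP BY region, sub_region"
)

# With more than one group_by column, each result is keyed by a tuple of group values.
data = {
    (region, sub_region): {"region": region, "sub_region": sub_region, "column_a": total}
    for region, sub_region, total in rows
}
```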
For each column you can specify its own table_name, filters and group_by arguments. Using these features you can supply different filters per column or select data from different tables.
column_a = SumColumn("column_a")
column_b = SumColumn("column_b", filters=[LT('date', 'enddate')])
In this case column_a will get the filters supplied to the QueryContext, while column_b will be resolved with its own filters. This will result in two queries being run on the database.
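The following sketch shows why different filters lead to two queries. It runs two grouped queries against an in-memory SQLite table and merges them on the group key. The date window, the `grouped_sum` helper and the merge strategy are assumptions made for illustration, not sqlagg internals.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE table_name ("user" TEXT, "date" TEXT, column_a INTEGER, column_b INTEGER)'
)
conn.executemany(
    "INSERT INTO table_name VALUES (?, ?, ?, ?)",
    [
        ("user1", "2012-01-01", 1, 1),
        ("user1", "2012-01-06", 2, 0),
        ("user2", "2012-02-19", 0, 3),
    ],
)


def grouped_sum(column, where, params):
    """Hypothetical helper: run one grouped SUM query and return {user: total}."""
    sql = f'SELECT "user", SUM({column}) FROM table_name WHERE {where} GROUP BY "user"'
    return dict(conn.execute(sql, params).fetchall())


# Query 1: column_a with the context-level filters (hypothetical date window).
sum_a = grouped_sum(
    "column_a",
    '"date" > :startdate AND "date" < :enddate',
    {"startdate": "2012-01-01", "enddate": "2012-03-01"},
)
# Query 2: column_b with only its own filter.
sum_b = grouped_sum("column_b", '"date" < :enddate', {"enddate": "2012-03-01"})

# Merge the two result sets on the group key (assumed merge strategy).
data = {
    user: {"column_a": sum_a.get(user), "column_b": sum_b.get(user)}
    for user in sorted(set(sum_a) | set(sum_b))
}
```

Because the first query uses a strict lower bound, the 2012-01-01 row contributes to column_b but not to column_a.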
It is possible to select data from different tables by giving columns different table_name values.
column_a = SumColumn("column_a")
column_b = SumColumn("column_b", table_name="table_b", group_by=["user"])
Here column_a will be selected from the table configured in the QueryContext, while column_b will be selected from table_b and will be grouped by user. This will result in two queries being run on the database.
It is possible to use the same database column in multiple sqlagg columns by specifying the alias argument of each column.
sum_a = SumColumn("column_a", alias="sum_a")
count_a = CountColumn("column_a", alias="count_a")
The resulting data will use the aliases as keys for the values.
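In SQL terms, an alias plays the role of an `AS` name on each aggregate. The sketch below shows this with SQLite from the standard library. It is only an analogy for how the result keys come out, not the query sqlagg builds.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets rows be read by column alias
conn.execute('CREATE TABLE table_name ("user" TEXT, column_a INTEGER)')
conn.executemany(
    "INSERT INTO table_name VALUES (?, ?)",
    [("user1", 1), ("user1", 2), ("user2", 0)],
)

# The same database column is aggregated twice under two different aliases.
rows = conn.execute(
    'SELECT "user" AS "user", SUM(column_a) AS sum_a, COUNT(column_a) AS count_a '
    'FROM table_name GROUP BY "user"'
)
data = {row["user"]: dict(row) for row in rows}
```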
Simple
num_wheels = SumWhen("vehicle", whens={"unicycle": 1, "bicycle": 2, "car": 4}, else_=0, alias="num_wheels")
Complex
num_children = SumWhen(whens={"users.age < 13": 1}, else_=0, alias="children")
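The simple form reads like a SQL `SUM(CASE ... END)` expression. The sketch below builds such an expression from the same `whens` mapping and runs it on SQLite. The `garage` table and its rows are hypothetical, and whether sqlagg emits exactly this SQL is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE garage (vehicle TEXT)")  # hypothetical table
conn.executemany(
    "INSERT INTO garage VALUES (?)",
    [("unicycle",), ("bicycle",), ("car",), ("car",), ("boat",)],
)

whens = {"unicycle": 1, "bicycle": 2, "car": 4}
else_ = 0

# One "WHEN ? THEN ?" pair per mapping entry, with the fallback value last.
case_sql = "CASE vehicle " + " ".join("WHEN ? THEN ?" for _ in whens) + " ELSE ? END"
params = [value for pair in whens.items() for value in pair] + [else_]

num_wheels = conn.execute(
    f"SELECT SUM({case_sql}) AS num_wheels FROM garage", params
).fetchone()[0]
```

Here the boat matches no condition and contributes the `else_` value of 0.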
AliasColumn is useful if you want to use a column more than once but don't want to recalculate its value.
sum_a = SumColumn("column_a")
# reuse the value of sum_a via AliasColumn instead of computing it again
aggregate = AggregateColumn(lambda x, y: x / y,
                            AliasColumn("column_a"),
                            SumColumn("column_b"))
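Conceptually, an aggregate column applies a function to the values of other columns within each group. The plain Python sketch below illustrates that idea only. The `aggregate` helper and the per-user sums are hypothetical and do not reflect how sqlagg implements AggregateColumn.

```python
# Per-group values as two column queries might return them (hypothetical data).
sum_a = {"user1": 3, "user2": 0}
sum_b = {"user1": 1, "user2": 3}


def aggregate(fn, *columns):
    """Apply fn to the values of each column, group by group."""
    keys = columns[0].keys()
    return {key: fn(*(column[key] for column in columns)) for key in keys}


# Same function as in the example: divide the first value by the second.
ratio = aggregate(lambda x, y: x / y, sum_a, sum_b)
```

A division like this fails for any group whose divisor is 0, so real data may need a guard in the function.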
TODO: custom queries
The QueryContext and most column classes accept a filters parameter, which must be iterable. Each element of this iterable must be an instance of a subclass of sqlagg.filters.SqlFilter. The elements of this parameter are combined using the AND operator.
i.e.
filters = [EQ('user', 'username'), EQ('role', 'admin')]
is equivalent to:
filters = [AND([
EQ('user', 'username'), EQ('role', 'admin')
])]
Any filter expression can be expressed using a RawFilter:
RawFilter('"user" = :username AND "date" between :start and :end')
In this case the same filter could be expressed as follows:
AND([EQ('user', 'username'), BETWEEN('date', 'start', 'end')])
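The equivalence of the raw and composed forms can be checked with a small sketch. It uses SQLite with named parameters and joins separate clauses with AND by hand. This mirrors the idea only, and the way sqlagg renders its filter objects to SQL is not shown in the source.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE table_name ("user" TEXT, "date" TEXT)')
conn.executemany(
    "INSERT INTO table_name VALUES (?, ?)",
    [("user1", "2012-01-01"), ("user1", "2012-01-06"), ("user2", "2012-02-19")],
)

# Filter values are bound by name, as with :username, :start and :end above.
params = {"username": "user1", "start": "2012-01-01", "end": "2012-01-31"}

# The raw expression, and the same condition composed from two clauses joined by AND.
raw_where = '"user" = :username AND "date" between :start and :end'
clauses = ['"user" = :username', '"date" BETWEEN :start AND :end']
composed_where = " AND ".join(f"({clause})" for clause in clauses)


def select(where):
    """Return the rows matching the given WHERE clause, ordered by date."""
    sql = f'SELECT "user", "date" FROM table_name WHERE {where} ORDER BY "date"'
    return conn.execute(sql, params).fetchall()


raw_rows = select(raw_where)
composed_rows = select(composed_where)
```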
To install dependencies, create/activate a virtualenv and run:
pip install -e .[test]
First create an environment variable for the appropriate connection string:
export SQLAGG_TEST_CONNECTION_STRING='postgresql://user:pass@localhost:5432/sqlagg_test'
Then run the tests:
pytest
To publish a new release on pypi.org:
- Update __version__ in sqlagg/__init__.py.
- Create and push a git tag with the new version number.
    git tag vX.Y.Z
    git push --tags
- Wait for the pypi.yml workflow to build and publish the new release.
A dev release is published on test.pypi.org on each new push or merge to master. A dev release may also be published on-demand for any branch with workflow dispatch.