Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenSTEF Integration #585

Closed
wants to merge 82 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
3685c59
Added OpenSTEF components
rodalynbarce Nov 27, 2023
c22e137
Remove commented out code
rodalynbarce Nov 27, 2023
a144296
Added ENTSO-E and MFFBAS source components
rodalynbarce Nov 27, 2023
08f908a
Black formatting
rodalynbarce Nov 27, 2023
f620c5a
Update environment.yml
rodalynbarce Nov 27, 2023
1547cf0
Added init
rodalynbarce Nov 27, 2023
4b6d2b7
Black formatting
rodalynbarce Nov 27, 2023
0c15a4c
Update openstef-dbc version
rodalynbarce Nov 27, 2023
a058109
Updates
rodalynbarce Nov 27, 2023
a24fbeb
Test Updates
rodalynbarce Nov 27, 2023
76c2742
Updates
rodalynbarce Nov 27, 2023
a7e242b
Test updates
rodalynbarce Nov 28, 2023
2ae2747
Environment changes
rodalynbarce Nov 30, 2023
abbf34a
Update env
rodalynbarce Nov 30, 2023
0dd717f
Black formatting
rodalynbarce Nov 30, 2023
6212be0
Updates
rodalynbarce Dec 1, 2023
b762495
Summary Statistics API (#588)
cching95 Nov 27, 2023
a79e793
fix standard deviation bug (#591)
cching95 Nov 28, 2023
3631116
Update documentation links on APIs (#592)
cching95 Nov 28, 2023
c6dff0e
Source connector for CASIO System Level Data (#579)
IW-SS Nov 29, 2023
6dcdcec
Refactored Time series Unit Tests (#598)
cching95 Nov 30, 2023
48ecec5
update api links (#602)
cching95 Dec 1, 2023
dfb9cf1
Update serializer
rodalynbarce Dec 8, 2023
377c709
Added package version requirement
rodalynbarce Dec 8, 2023
8c7f452
Blog OPC UA to Delta Lake (#604)
srjhunjhunwalacorp Dec 5, 2023
73215cb
Source connector for ERCOT System Level Data (#583)
IW-SS Dec 6, 2023
02c4e39
Black formatting
rodalynbarce Dec 8, 2023
da7d53d
Update tests
rodalynbarce Dec 8, 2023
23258fb
Updates
rodalynbarce Dec 8, 2023
d5b477e
Black formatting
rodalynbarce Dec 8, 2023
a4d6f33
Updates
rodalynbarce Dec 8, 2023
13242d7
Revert changes back
rodalynbarce Dec 11, 2023
185ad12
Add test skip clause
rodalynbarce Dec 11, 2023
661feb2
Formatting
rodalynbarce Dec 11, 2023
c40d3a7
Removed 3.8
rodalynbarce Dec 13, 2023
ae486a5
Update documentation links on APIs (#592)
cching95 Nov 28, 2023
b4cb271
update api links (#602)
cching95 Dec 1, 2023
51b9aff
Mirico Transformer, Documentation and Unit Tests (#608)
cching95 Dec 8, 2023
c19c660
add mirico transformer to init (#609)
cching95 Dec 8, 2023
36b6247
Merge branch 'develop' into feature/integration_a
rodalynbarce Dec 13, 2023
6bc6e45
Summary of Statistics Query with Documentation and Unit Tests (#582)
cching95 Nov 24, 2023
7cbbc09
Added OpenSTEF components
rodalynbarce Nov 27, 2023
ff72dcc
Remove commented out code
rodalynbarce Nov 27, 2023
f1bd4d1
Added ENTSO-E and MFFBAS source components
rodalynbarce Nov 27, 2023
18b218c
Black formatting
rodalynbarce Nov 27, 2023
dee363f
Update environment.yml
rodalynbarce Nov 27, 2023
fe43e08
Added init
rodalynbarce Nov 27, 2023
f7c5035
Black formatting
rodalynbarce Nov 27, 2023
32acac5
Update openstef-dbc version
rodalynbarce Nov 27, 2023
1a1c486
Updates
rodalynbarce Nov 27, 2023
c83bd41
Test Updates
rodalynbarce Nov 27, 2023
34238c7
Updates
rodalynbarce Nov 27, 2023
81685d1
Test updates
rodalynbarce Nov 28, 2023
b824eac
Environment changes
rodalynbarce Nov 30, 2023
92ef18d
Update env
rodalynbarce Nov 30, 2023
38d6e9b
Black formatting
rodalynbarce Nov 30, 2023
c82288a
Updates
rodalynbarce Dec 1, 2023
dfcad22
Summary Statistics API (#588)
cching95 Nov 27, 2023
67e075c
fix standard deviation bug (#591)
cching95 Nov 28, 2023
106e588
Update documentation links on APIs (#592)
cching95 Nov 28, 2023
f4d6700
Source connector for CASIO System Level Data (#579)
IW-SS Nov 29, 2023
07b03d6
Refactored Time series Unit Tests (#598)
cching95 Nov 30, 2023
43101ba
update api links (#602)
cching95 Dec 1, 2023
3255d20
Update serializer
rodalynbarce Dec 8, 2023
7efe268
Added package version requirement
rodalynbarce Dec 8, 2023
4712b92
Blog OPC UA to Delta Lake (#604)
srjhunjhunwalacorp Dec 5, 2023
9815f2c
Source connector for ERCOT System Level Data (#583)
IW-SS Dec 6, 2023
e8faa7a
Black formatting
rodalynbarce Dec 8, 2023
69bc110
Update tests
rodalynbarce Dec 8, 2023
c6b4ed9
Updates
rodalynbarce Dec 8, 2023
c0735c3
Black formatting
rodalynbarce Dec 8, 2023
9e87eb5
Updates
rodalynbarce Dec 8, 2023
f9fb15e
Revert changes back
rodalynbarce Dec 11, 2023
849b6ac
Add test skip clause
rodalynbarce Dec 11, 2023
cd2b8c8
Formatting
rodalynbarce Dec 11, 2023
af212ff
Removed 3.8
rodalynbarce Dec 13, 2023
e0fed4c
Update documentation links on APIs (#592)
cching95 Nov 28, 2023
d58e6d2
update api links (#602)
cching95 Dec 1, 2023
3403b12
Mirico Transformer, Documentation and Unit Tests (#608)
cching95 Dec 8, 2023
15b7b22
add mirico transformer to init (#609)
cching95 Dec 8, 2023
eb46640
Merge remote-tracking branch 'origin/feature/integration_a' into feat…
rodalynbarce Dec 13, 2023
407bf0b
Merge branch 'develop' into feature/integration_a
rodalynbarce Dec 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/security_vulnerability.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ If applicable, add screenshots to help explain your problem.
**Installation Setup (please complete the following information):**

- OS: [e.g. iOS]
- Python Version: [e.g. 3.11, 3.8]
- Python Version: [e.g. 3.11, 3.9]
- SDK Version: [e.g. 22]

**Additional context**
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- pyspark: "3.4.1"
python-version: "3.9"
- pyspark: "3.4.1"
python-version: "3.10"
python-version: "3.10"
- pyspark: "3.4.0"
python-version: "3.9"
- pyspark: "3.4.0"
Expand Down
2 changes: 2 additions & 0 deletions docs/sdk/code-reference/integrations/openstef/database.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# OpenSTEF Integration with RTDIP
::: src.sdk.python.rtdip_sdk.integrations.openstef.database
2 changes: 2 additions & 0 deletions docs/sdk/code-reference/pipelines/sources/python/entsoe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Read from ENTSO-E API
::: src.sdk.python.rtdip_sdk.pipelines.sources.python.entsoe
2 changes: 2 additions & 0 deletions docs/sdk/code-reference/pipelines/sources/python/mffbas.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Read from MFFBAS API
::: src.sdk.python.rtdip_sdk.pipelines.sources.python.mffbas
4 changes: 3 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,6 @@ dependencies:
- build==0.10.0
- deltalake==0.10.1
- trio==0.22.1

- openstef-dbc==3.6.17
- sqlparams==5.1.0
- entsoe-py==0.5.10
4 changes: 4 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ nav:
- Azure Active Directory: sdk/authentication/azure.md
- Databricks: sdk/authentication/databricks.md
- Code Reference:
- Integrations:
- OpenSTEF: sdk/code-reference/integrations/openstef/database.md
- Pipelines:
- Sources:
- Spark:
Expand Down Expand Up @@ -182,6 +184,8 @@ nav:
- Python:
- Delta: sdk/code-reference/pipelines/sources/python/delta.md
- Delta Sharing: sdk/code-reference/pipelines/sources/python/delta_sharing.md
- ENTSO-E: sdk/code-reference/pipelines/sources/python/entsoe.md
- MFFBAS: sdk/code-reference/pipelines/sources/python/mffbas.md
- Transformers:
- Spark:
- Binary To String: sdk/code-reference/pipelines/transformers/spark/binary_to_string.md
Expand Down
17 changes: 17 additions & 0 deletions src/sdk/python/rtdip_sdk/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .openstef.database import *
from .openstef.interfaces import *
from .openstef.serializer import *
13 changes: 13 additions & 0 deletions src/sdk/python/rtdip_sdk/integrations/openstef/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
250 changes: 250 additions & 0 deletions src/sdk/python/rtdip_sdk/integrations/openstef/_query_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from jinja2 import Template

# Regex fragment matching a Flux ``|> aggregateWindow`` stage; used both to
# dispatch queries to the resample translation and to count window stages.
aggregate_window = r"\|> aggregateWindow"


def _build_parameters(query: str) -> dict:
    """Extract Jinja2 templating parameters from an InfluxDB Flux query string.

    Pulls the measurement (table) name, time range, aggregation-window
    settings, filter predicates, ``yield`` name and ``createEmpty`` flag out
    of *query* using regular expressions, and returns them as a dict consumed
    by the SQL templates in this module.
    """
    # Known tag columns per measurement; list order matches each tag's
    # position inside the colon-delimited TagName used by the SQL templates.
    columns = {
        "weather": ["input_city", "source"],
        "power": ["system"],
        "prediction_taheads": ["customer", "pid", "tAhead", "type"],
        "prediction": ["customer", "pid", "type"],
        "marketprices": ["Name"],
        "sjv": ["year_created"],
    }

    parameters = {}

    # Matches either r._measurement == "x" or r["_measurement"] == "x";
    # group(2) holds the measurement name in both alternatives.
    measurement_pattern = r'r(?:\._measurement|(\["_measurement"\]))\s*==\s*"([^"]+)"'
    table = re.search(measurement_pattern, query).group(2)
    parameters["table"] = table
    parameters["columns"] = columns[parameters["table"].lower()]

    # Bounds of the Flux ``range(start: ..., stop: ...)`` call; the character
    # classes deliberately exclude a trailing "Z" from the captured timestamp.
    start = re.search(r"start:\s+([^|,Z]+)", query).group(1)
    stop = re.search(r"stop:\s+([^|Z)]+)", query).group(1).strip()
    parameters["start"] = start
    parameters["stop"] = stop

    window_pattern = r"\|> aggregateWindow\((.*)\)"
    window = re.findall(window_pattern, query)
    if window:
        # ``every: <N>m`` -> interval magnitude, assumed to be in minutes.
        every = re.findall(r"every: ([^,]+)m", str(window))
        parameters["time_interval_rate"] = every

        # ``fn: <agg>`` -> aggregation function (mean, sum, ...).
        fn = re.findall(r"fn: ([^,\]']+)", str(window))
        parameters["agg_method"] = fn

        # NOTE(review): indentation reconstructed — these two assignments are
        # assumed to live inside the ``if window:`` branch, since raw queries
        # (no aggregateWindow) never set time_interval_rate — confirm.
        parameters["time_interval_unit"] = "minute"
        parameters["range_join_seconds"] = int(parameters["time_interval_rate"][0]) * 60

    # Collect every ``filter(fn: (r) => ...)`` predicate and AND them together.
    filter_sections = re.findall(
        r"\|> filter\(fn: \(r\) => (.*?)(?=\s*\||$)", query, re.DOTALL
    )
    _filter = " AND ".join(["(" + i.strip() for i in filter_sections])

    # Strip the Flux record prefix: r.foo / r["foo"] -> foo.
    where = re.sub(r'r\.([\w]+)|r\["([^"]+)"\]', r"\1\2", _filter)
    if where.count("(") != where.count(")"):
        # Re-balance a parenthesis lost while slicing the filter sections.
        where = "(" + where

    parameters["where"] = where

    yields = re.findall(r"\|> yield\(name: \"(.*?)\"\)", query)
    if yields:
        parameters["yield"] = yields

    create_empty = re.search(r"createEmpty: (.*?)\)", query)
    # Default mirrors Flux's behavior when createEmpty is not specified.
    parameters["createEmpty"] = "true"
    if create_empty:
        parameters["createEmpty"] = create_empty.group(1)

    return parameters


def _raw_query(query: str) -> list:
    """Translate a plain (non-aggregated) Flux query into one Spark SQL statement.

    The statement reconstructs Influx-style columns (_time, _value, _field and
    the per-measurement tag columns) from the colon-delimited TagName, then
    applies the query's filters and time range. Returns a single-element list.
    """
    params = _build_parameters(query)

    # The weather measurement carries extra physical columns, so it gets its
    # own CTE shape; every other table is built generically from its tag list.
    template_text = (
        "{% if table == 'weather'%}"
        'WITH raw_events AS (SELECT Latitude, Longitude, EnqueuedTime, EventTime AS _time, Value AS _value, Status, Latest, EventDate, TagName, split(TagName, ":") AS tags_array, tags_array[0] AS _field, tags_array[1] AS input_city, tags_array[2] AS source, "weather" AS _measurement FROM `weather`) '
        "{% else %}"
        'WITH raw_events AS (SELECT EventTime AS _time, Value AS _value, Status, TagName, split(TagName, ":") AS tags_array, '
        "tags_array[0] AS _field, "
        "{% for col in columns %}"
        "tags_array[{{ columns.index(col) + 1 }}] AS {{ col }}, "
        "{% endfor %}"
        '"{{ table }}" AS _measurement FROM `{{ table }}`)'
        "{% endif %}"
        'SELECT * FROM raw_events WHERE {{ where }} AND _time BETWEEN to_timestamp("{{ start }}") AND to_timestamp("{{ stop }}")'
    )

    rendered = Template(template_text).render(params)
    return [rendered]


def _resample_query(query: str) -> list:
    """Translate a Flux ``aggregateWindow`` query into Spark SQL resample statements.

    A single aggregateWindow stage yields one statement that buckets events
    into fixed time windows and aggregates them (optionally gap-filling when
    ``createEmpty`` is true). Two or more stages yield a [sum, count] pair of
    statements, mirroring the load/nEntries results OpenSTEF expects.
    Returns a list of one or two rendered SQL strings.
    """
    parameters = _build_parameters(query)
    # Systems referenced in the Flux filters (used by the shared templates).
    parameters["filters"] = re.findall(r'r\.system == "([^"]+)"', query)

    # Shared CTE chain: raw events -> filtered -> per-tag window buckets ->
    # aggregated "resample" CTE (RANGE_JOIN hint speeds the interval join).
    resample_base_query = (
        "{% if table == 'weather'%}"
        'WITH raw_events AS (SELECT Latitude, Longitude, EnqueuedTime, EventTime AS _time, Value AS _value, Status, Latest, EventDate, TagName, split(TagName, ":") AS tags_array, tags_array[0] AS _field, tags_array[1] AS input_city, tags_array[2] AS source, "weather" AS _measurement FROM `weather`) '
        "{% else %}"
        'WITH raw_events AS (SELECT EventTime AS _time, Value AS _value, Status, TagName, split(TagName, ":") AS tags_array, '
        "tags_array[0] AS _field, "
        "{% for col in columns %}"
        "tags_array[{{ columns.index(col) + 1 }}] AS {{ col }}, "
        "{% endfor %}"
        '"{{ table }}" AS _measurement FROM `{{ table }}`)'
        "{% endif %}"
        ', raw_events_filtered AS (SELECT * FROM raw_events WHERE {{ where }} AND _time BETWEEN to_timestamp("{{ start }}") AND to_timestamp("{{ stop }}"))'
        ', date_array AS (SELECT DISTINCT TagName, explode(sequence(to_timestamp("{{ start }}") - INTERVAL "{{ time_interval_rate[0] + " " + time_interval_unit }}", to_timestamp("{{ stop }}"), INTERVAL "{{ time_interval_rate[0] + " " + time_interval_unit }}")) AS timestamp_array FROM raw_events_filtered) '
        ', date_intervals AS (SELECT TagName, date_trunc("{{time_interval_unit}}", timestamp_array) - {{time_interval_unit}}(timestamp_array) %% {{ time_interval_rate[0] }} * INTERVAL 1 {{time_interval_unit}} AS timestamp_array FROM date_array) '
        ", window_buckets AS (SELECT TagName, timestamp_array AS window_start, timestampadd({{ time_interval_unit }}, {{ time_interval_rate[0] }}, timestamp_array) as window_end FROM date_intervals) "
        ", resample AS (SELECT /*+ RANGE_JOIN(a, {{ range_join_seconds }}) */ a.TagName, window_end AS _time, {{ agg_method[0] }}(_value) AS _value, Status, _field"
        "{% for col in columns if columns is defined and columns|length > 0 %}"
        ", b.{{ col }}"
        "{% endfor %}"
        " FROM window_buckets a LEFT JOIN raw_events_filtered b ON a.window_start <= b._time AND a.window_end > b._time AND a.TagName = b.TagName GROUP BY ALL) "
    )

    if len(re.findall(aggregate_window, query)) == 1:
        # Single window stage: one statement. When createEmpty is true, empty
        # buckets are gap-filled by carrying tag/_field values forward and
        # backward with window functions before selecting the final result.
        flux_query = (
            f"{resample_base_query}"
            "{% if createEmpty == 'true' %}"
            ", fill_nulls AS (SELECT *, last_value(_field, true) OVER (PARTITION BY TagName ORDER BY TagName ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS _field_forward, first_value(_field, true) OVER (PARTITION BY TagName ORDER BY TagName ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS _field_backward "
            "{% for col in columns if columns is defined and columns|length > 0 %}"
            ", last_value({{ col }}, true) OVER (PARTITION BY TagName ORDER BY TagName ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS {{ col }}_forward, first_value({{ col }}, true) OVER (PARTITION BY TagName ORDER BY TagName ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING ) AS {{ col }}_backward "
            "{% endfor %}"
            " FROM resample"
            "{% if yield is defined and yield|length > 0 %}"
            ' ), resample_results AS (SELECT "{{ yield[0] }}" AS result, '
            "{% else %}"
            ' ), resample_results AS (SELECT "_result" AS result, '
            "{% endif %}"
            '_time, _value, "Good" AS Status, TagName, coalesce(_field_forward, _field_backward) AS _field '
            "{% for col in columns if columns is defined and columns|length > 0 %}"
            ", CAST(coalesce({{ col }}_forward, {{ col }}_backward) AS STRING) AS {{ col }} "
            "{% endfor %}"
            'FROM fill_nulls WHERE _time > to_timestamp("{{ start }}") GROUP BY ALL '
            "ORDER BY TagName, _time) "
            "{% else %}"
            "{% if yield is defined and yield|length > 0 %}"
            ', resample_results AS (SELECT "{{ yield[0] }}" AS result, '
            "{% else %}"
            ', resample_results AS (SELECT "_result" AS result, '
            "{% endif %}"
            '_time, _value, "Good" AS Status, TagName, _field '
            "{% for col in columns if columns is defined and columns|length > 0 %}"
            ", CAST({{ col }} AS STRING) "
            "{% endfor %}"
            'FROM resample WHERE _time > to_timestamp("{{ start }}") AND _field IS NOT NULL '
            "ORDER BY TagName, _time) "
            "{% endif %}"
        )

        flux_query = f"{flux_query}" "SELECT * FROM resample_results "

        sql_template = Template(flux_query)
        sql_query = sql_template.render(parameters)
        return [sql_query]

    elif len(re.findall(aggregate_window, query)) > 1:
        # Multiple window stages: produce a sum ("load") and a count
        # ("nEntries") statement over the shared resample CTE.
        sql_query = (
            f"{resample_base_query}"
            ', resample_sum AS (SELECT /*+ RANGE_JOIN(a, {{ range_join_seconds }}) */ "load" AS result, _time, sum(_value) AS _value, "Good" AS Status FROM window_buckets a LEFT JOIN resample b ON a.window_start <= b._time AND a.window_end > b._time AND a.TagName = b.TagName WHERE _time < to_timestamp("{{ stop }}") GROUP BY ALL)'
            ', resample_count AS (SELECT /*+ RANGE_JOIN(a, {{ range_join_seconds }}) */ "nEntries" AS result, _time, count(_value) AS _value, "Good" AS Status FROM window_buckets a LEFT JOIN resample b ON a.window_start <= b._time AND a.window_end > b._time AND a.TagName = b.TagName WHERE _time < to_timestamp("{{ stop }}") GROUP BY ALL)'
        )

        sum_query = f"{sql_query}" " SELECT * FROM resample_sum ORDER BY _time"

        count_query = f"{sql_query}" " SELECT * FROM resample_count ORDER BY _time"

        sum_template = Template(sum_query)
        sum_query = sum_template.render(parameters)

        count_template = Template(count_query)
        count_query = count_template.render(parameters)
        return [sum_query, count_query]


def _pivot_query(query: str) -> list:
    """Translate a Flux ``pivot`` query over the power measurement into Spark SQL.

    Each system referenced by the Flux filters becomes its own PIVOT column;
    the final SELECT keeps only rows where every pivoted column is non-null.
    Returns a single-element list with the rendered statement.
    """
    params = _build_parameters(query)
    # The systems named in the Flux filters drive the PIVOT column list.
    params["filters"] = re.findall(r'r\.system == "([^"]+)"', query)

    template_text = (
        'WITH raw_events AS (SELECT EventTime AS _time, Value AS _value, Status, TagName, split(TagName, ":") AS tags_array, tags_array [0] AS _field, tags_array [1] AS system, "power" AS _measurement FROM `power`)'
        ', raw_events_filtered AS (SELECT *, ROW_NUMBER() OVER (PARTITION BY system ORDER BY _time) AS ordered FROM raw_events WHERE {{ where }} AND _time BETWEEN to_timestamp("{{ start }}") AND to_timestamp("{{ stop }}"))'
        ", pivot_table AS (SELECT _time, Status, TagName, _field, _measurement, "
        "{% for filter in filters %}"
        " first_value({{ filter }}, true) OVER (PARTITION BY _time ORDER BY _time, TagName ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS {{ filter }} "
        "{% if not loop.last %}"
        ", "
        "{% endif %}"
        "{% endfor %}"
        " FROM raw_events_filtered PIVOT (MAX(_value) FOR system IN ("
        "{% for filter in filters %}"
        ' "{{ filter }}" '
        "{% if not loop.last %}"
        ", "
        "{% endif %}"
        "{% endfor %}"
        " )) ORDER BY TagName, _time) "
        " SELECT _time, Status"
        "{% for filter in filters %}"
        ", {{ filter }}"
        "{% endfor %}"
        " FROM pivot_table WHERE "
        "{% for filter in filters %}"
        " {{ filter }} IS NOT NULL "
        "{% if not loop.last %}"
        "AND "
        "{% endif %}"
        "{% endfor %}"
        " ORDER BY _time"
    )

    rendered = Template(template_text).render(params)
    return [rendered]


def _max_query() -> list:
sql_query = (
'WITH raw_events AS (SELECT Latitude, Longitude, EnqueuedTime, EventTime AS _time, Value AS _value, Status, Latest, EventDate, TagName, split(TagName, ":") AS tags_array, tags_array [0] AS _field, tags_array [1] AS input_city, tags_array [2] AS source, "weather" AS _measurement FROM `weather`)'
', raw_events_filtered AS (SELECT * FROM raw_events WHERE (_measurement == "weather" and source == "harm_arome" and _field == "source_run") AND _time >= to_timestamp(timestampadd(day, -2, current_timestamp())))'
", max_events AS (SELECT _time, MAX(_value) OVER (PARTITION BY TagName) AS _value, Status, TagName, _field, _measurement, input_city, source FROM raw_events_filtered)"
", results AS (SELECT a._time, a._value, a.Status, a.TagName, a._field, a._measurement, a.input_city, a.source, ROW_NUMBER() OVER (PARTITION BY a.TagName ORDER BY a._time) AS ordered FROM max_events a INNER JOIN raw_events_filtered b ON a._time = b._time AND a._value = b._value)"
"SELECT _time, _value, Status, TagName, _field, input_city, source FROM results WHERE ordered = 1 ORDER BY input_city, _field, _time"
)

return [sql_query]


def _query_builder(query: str) -> list:
    """Route a Flux query string to the matching SQL translation.

    Dispatch inspects the Flux pipeline for distinctive stages:
    ``aggregateWindow`` -> resample, ``pivot`` -> pivot, ``max()`` -> max;
    anything else is treated as a plain raw-data read.
    Returns the list of rendered SQL statements from the chosen translator.
    """
    if re.search(aggregate_window, query):
        return _resample_query(query)

    if re.search(r"\|> pivot", query):
        return _pivot_query(query)

    if re.search(r"\|> max\(\)", query):
        return _max_query()

    return _raw_query(query)
Loading