diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..5966153 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +.gitattributes export-ignore +.gitignore export-ignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5bf6653 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +__pycache__/ +*.py[cod] +.mypy_cache/ +.cache/ +.eggs/ +cfei_smap.egg-info + +build/ +dist/ +MANIFEST diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..47a2f85 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,81 @@ +stages: + - build + - test + - deploy + - documentation + +test: + stage: test + script: + - env PYTHONPATH=. pytest-3 -v . + + +style check: + stage: test + script: + - pycodestyle + allow_failure: true + + +type check: + stage: test + script: + - mypy --package ${CI_PROJECT_NAME//-/.} + allow_failure: true + + +create deb package: + stage: deploy + only: + - tags + script: + - python3 setup.py --command-packages=stdeb.command sdist_dsc --with-python2=False --with-python3=True bdist_deb + artifacts: + paths: + - deb_dist/python3-${CI_PROJECT_NAME}_${CI_COMMIT_REF_NAME:1}-1_all.deb + + +create wheel package: + stage: deploy + only: + - tags + script: + - pip3 wheel . + artifacts: + paths: + - '${CI_PROJECT_NAME//-/_}-${CI_COMMIT_REF_NAME:1}-py3-none-any.whl' + + +create tarball: + stage: deploy + only: + - tags + script: + - git archive --format=tar.gz ${CI_COMMIT_REF_NAME} --prefix=${CI_PROJECT_NAME}-${CI_COMMIT_REF_NAME:1}/ --output ${CI_PROJECT_NAME}-${CI_COMMIT_REF_NAME:1}.tar.gz + artifacts: + paths: + - ${CI_PROJECT_NAME}-${CI_COMMIT_REF_NAME:1}.tar.gz + + +create html documentation: + stage: documentation + only: + - tags + script: + - cd docs + - make html + artifacts: + paths: + - docs/build/html/ + + +create man pages: + stage: documentation + only: + - tags + script: + - cd docs + - make man + artifacts: + paths: + - docs/build/man/ diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..b145ce6 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include *.txt *.md +recursive-include cfei *.json diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..a27e32b --- /dev/null +++ b/Readme.md @@ -0,0 +1,310 @@ +CFEI sMAP Python Library +=========== + +This is a Python 3 library to interface to [sMAP] archivers. +It uses the [asyncio] framework to connect to the archiver, and it returns [pandas] data-frames and time-series. + + +Features +---- + +* **Fetching historical data**. + The user specifies a time interval and a query, and the library returns all time-series that satisfy the query. + + The user can also request a single time-series. + In that case, all resulting time-series will be intertwined to generate a single one. + This is useful when time-series are split in multiple streams, e.g., in case of gateway jumps. + +* **Subscribing to real-time data**. + The user specifies a query and a callback, and the library will call the callback whenever a new reading is available. + + Since the library supports IO concurrency, the application can perform other operations, while waiting for new data. + +* **Posting data**. + The users specifies a set of readings, a source name, a path and a UUID. + It can optionally specify properties and metadata. + + A simpler interface is available when posting data to existing streams. + The user specifies only UUIDs and readings, and the library will transparently retrieve and cache source names and paths. + +* **Concurrent requests**. 
+ The user can instantiate multiple concurrent requests to the sMAP archiver, the library (or, better, the asyncio framework) will execute them in parallel. + + A parameter defines how many concurrent requests to execute (default: 10) to not overload the sMAP archiver. + Any further request will be transparently enqueued and executed as soon as a slot is free. + +* **Local caching**. + Readings can be cached on the local machine. + The library can detect when the requested time interval (or larger) is available locally. + This saves execution time, network traffic and server load. + If the cache does not correspond to the user's query, it is invalidated and data is automatically fetched from the server. + + *Note*: the library cannot detect when readings were replaced/added on the server. + Cache should be used only for immutable historical data, not for data that can change. + +* **Command line tool**. + The tool can be used to quickly plot single time-series data, or to export to CSV files. + + +Installation +---- + +The library is available as three different packages: + +- Debian package, usable on Debian/Ubuntu and derivatives (anything that uses `apt`/`apt-get`). + Install it with the following command (possibly prepended with `sudo`). + + dpkg -i /path/to/python3-cfei-smap_1.2.0-1_all.deb + +- Python wheel package, usable on Windows and almost every system with a Python 3 distribution. + Install it with the following command (possibly prepended with `sudo`, or passing the `--user` option). + + pip3 install /path/to/cfei_smap-1.1-py3-none-any.whl + +- Tarball source package. + It can be used by maintainers to generate a custom package. + + +Usage +---- + +### General Instructions + +This library uses the [asyncio] framework, and returns [pandas] data-frames and time-series. + +Most of the features are available as member functions of objects implementing `SmapInterface`. +Clients should first create one of the concrete classes, e.g., `SmapAiohttpInterface`, and then call its methods. + +~~~~python +from cfei.smap import SmapAiohttpInterface + +... + +smap = SmapAiohttpInterface("http://hostname.com:8079") + +await smap.fetch_readings(...) +~~~~ + + +### Important Note about Timezones + +Whenever using time-series data, proper care should be taken to correctly store and represent timezones. +Unfortunately, many tools lack support for timezones, and many users assume localtime is good enough. +This often results in issues when sharing time-series with other persons, or storing them in databases. +It also causes problems at daylight saving time changes, when the time offset of a timezone changes. +Therefore, this library enforces using timezone-aware datetimes. +Whenever a date is expected, it *must* be a timezone-aware datetime object in UTC, otherwise an exception will be generated. + +This could make things more complicate and cumbersome when users are interested in localtime dynamics, such as occupancy behaviour or price trends, because they have to manually convert to and from UTC. +An example on how to convert back and forth from UTC is included in :ref:`timezones`. + + +### Fetching Data + +To fetch data from sMAP, the caller must specify the interval as a pair of `datetime` objects and a *where* query. 
+ +~~~~python +from datetime import datetime + +import pytz + +from cfei.smap import SmapAiohttpInterface + + +async def main(): + smap = SmapAiohttpInterface("http://hostname.com:8079") + + start = pytz.UTC.localize(datetime(2018, 1, 1, 10, 15)) + end = pytz.UTC.localize(datetime(2018, 1, 8, 4, 5)) + + where = ( + "Metadata/Location/Building = 'MyBuilding' and " + "Metadata/Location/Room = 'MyRoom' and " + "Metadata/Location/Media = 'air' and " + "Metadata/Location/Media = 'co2'" + ) + + readings_by_uuid = await smap.fetch_readings(start, end, where) + + # It returns a dict[UUID, pd.Series] + + readings = await smap.fetch_readings_intertwined(start, end, where) + + # It returns a single pd.Series obtained by intertwining all the results +~~~~ + +#### Note + +If there are readings at `start` and `end` instants, the returned time-series will be defined on the *closed* interval [`start`, `end`]. +Otherwise, which is a more common scenario, the returned time-series will be defined on the *open* interval ]`start`, `end`[. + + +### Posting Data + +Users can post both data (a sequence of readings, i.e., timestamp, value pairs) and metadata (a set of key, value pairs) to any number of sMAP streams. + +There are two options to post data to a stream: + +1. Providing *path*, *source name* and *UUID* (and optionally metadata and properties). + This option can be used to create non-existing streams. + + from datetime import datetime + from uuid import UUID + + import pandas as pd + + from cfei.smap import SmapAiohttpInterface + + async def main(): + series = pd.Series(...) + uuid = UUID(...) + + await smap.post_data(series, uuid, source_name, path) + +2. Providing only the *UUIDs*. + In this case the library will retrieve and cache the corresponding *path* and *source name*, before actually posting data. + This option is only available if the streams already exist. + + from datetime import datetime + from uuid import UUID + + import pandas as pd + + from cfei.smap import SmapAiohttpInterface + + async def main(): + series = pd.Series(...) + uuid = UUID(...) + + await smap.post_readings({ + uuid: series + }) + + +### Subscribing to real-time data + +Users can subscribe to sMAP, and be notified of every new available reading. + +~~~~python +from uuid import UUID + +from cfei.smap import SmapAiohttpInterface + + +async def callback(uuid, series): + for timestamp, value in series.iteritems(): + print("{}: {}".format(timestamp, value)) + + +async def main(): + smap = SmapAiohttpInterface("http://hostname.com:8079") + + where = ( + "Metadata/Location/Building = 'MyBuilding' and " + "Metadata/Location/Room = 'MyRoom' and " + "Metadata/Location/Media = 'air' and " + "Metadata/Location/Media = 'co2'" + ) + + await smap.subscribe(where, callback) +~~~~ + + +### Enabling Local Cache + +Pass `cache=True` to the `SmapInterface` constructor to enable local cache. + +~~~~python +from cfei.smap import SmapAiohttpInterface + + +async def main(): + smap = SmapAiohttpInterface("http://hostname.com:8079", cache=True) + + # The first time data are fetched from server and cached locally. + await smap.fetch_readings(start, end, where) + + # The second time they are loaded from local cache. + await smap.fetch_readings(start, end, where) + + # This interval is strictly contained in the previous one, so data can + # still be loaded from local cache. 
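+    # (This example assumes the `start`, `end` and `where` values defined in
+    # the fetching example above, plus `from datetime import timedelta`.)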
+ await smap.fetch_readings( + start + timedelta(days=3), + end - timedelta(days=2), + where + ) + + # This interval is *NOT* strictly contained in the previous one, cache + # will be invalidated and data will be fetched from server. + await smap.fetch_readings( + start - timedelta(days=3), + end, + where + ) +~~~~ + + +### Note about *asyncio* + +This library uses the *asyncio* framework. +This means that all functions and methods are actually coroutines, and they need to be called accordingly. +The caller can retrieve the event loop, and explicitly execute a coroutine + +~~~~python +import asyncio + +loop = asyncio.get_event_loop() + +result = loop.run_until_complete( + smap.method(arguments) +) +~~~~ + +Otherwise, if the caller is itself a coroutine, it can use the corresponding syntax in Pyhton 3.5+ + +~~~~python +async def external_coroutine(): + result = await smap.method(arguments) +~~~~ + + +Command Line Utility +---- + +This library includes a command line utility named `smap`, which can be used to retrieve and plot data from sMAP archiver. + +~~~~ +usage: smap [-h] [-v] --url URL [--plot] [--plot-markers] [--csv] + [--start DATETIME] [--end DATETIME] + WHERE + +Fetch data from sMAP + +positional arguments: + WHERE sMAP query where + +optional arguments: + -h, --help show this help message and exit + -v, --verbose increase output + --url URL sMAP archiver URL + --plot plot results + --plot-markers show plot markers + --csv print results to stdout in CSV format + --start DATETIME initial time (default: 24h ago) + --end DATETIME final time (default: now) +~~~~ + +For instance, to export a single time-series to CSV file: + +~~~~bash +smap --url http://hostname.com:8079 "uuid = '12345678-1234-1234-1234-12345678abcd'" \ + --start 2018-01-17T00:00:00Z --csv -vv > output.csv +~~~~ + + +[sMAP]: https://pythonhosted.org/Smap/en/2.0/archiver.html + +[asyncio]: https://docs.python.org/3/library/asyncio.html + +[pandas]: https://pandas.pydata.org/ diff --git a/cfei/__init__.py b/cfei/__init__.py new file mode 100644 index 0000000..b1a3c78 --- /dev/null +++ b/cfei/__init__.py @@ -0,0 +1,5 @@ +# Allow other packages to use the prefix `cfei.` +# +# https://packaging.python.org/guides/packaging-namespace-packages/#pkgutil-style-namespace-packages + +__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore diff --git a/cfei/smap/__init__.py b/cfei/smap/__init__.py new file mode 100644 index 0000000..7fa385c --- /dev/null +++ b/cfei/smap/__init__.py @@ -0,0 +1,774 @@ +from json import dumps as json_dumps +from datetime import datetime +from uuid import UUID, uuid3 +import logging +import heapq +from hashlib import md5 + +import pandas as pd +import typing + +from .types import SeriesCallbackType +from .transport import TransportInterface +from .transport.aiohttp import AiohttpTransportInterface +from .validation import parse_and_validate +from .cache import ( + load_readings_from_cache, + save_readings_to_cache, + load_raw_from_cache, + save_raw_to_cache, +) + + +class SmapInterface(object): + """ + An interface to sMAP protocol. + + Each object has a local cache for metadata, so querying multiple times for + a stream results in only one query for its metadata. + """ + + def __init__( + self, + transport_interface: TransportInterface, + cache: bool=False, + loose_validation: bool=False, + ) -> None: + """ + Create a `SmapInterface` object. + + :param transport_interface: transport interface. 
+ :type transport_interface: TransportInterface + :param cache: set to True to use local cache. + :type cache: bool + :param cache: set to True to ignore JSON validation errors. + :type cache: bool + """ + + super(SmapInterface, self).__init__() + + self.logger = logging.getLogger(__name__) + + self.transport_interface = transport_interface + + self.use_local_cache = cache + self.loose_validation = loose_validation + + self.metadata_cache = {} # type: typing.Dict[UUID, typing.Any] + + self.path_cache = {} # type: typing.Dict[UUID, typing.Text] + + async def fetch_uuids( + self, + where: typing.Text, + ) -> typing.Set[UUID]: + """ + Query for UUIDs. + + Query the sMAP archiver for all streams that satisfy the `where` + condition and return their UUIDs. + + :param where: Query `WHERE` condition. + :type where: str + + :returns: The UUIDs. + :rtype: set(UUID) + """ + + query = "select uuid where {}".format(where) + output = await self.fetch_raw(query) + results = parse_and_validate( + output, + 'select_response_schema', + self.loose_validation + ) + return set([UUID(result['uuid']) for result in results]) + + async def fetch_metadata( + self, + where: typing.Text, + ) -> typing.Dict[UUID, typing.Any]: + """ + Query for metadata. + + Query the sMAP archiver for all streams that satisfy the `where` + condition and return their metadata. + + :param where: Query `WHERE` condition. + :type where: str + + :returns: A mapping UUID -> metadata. + :rtype: dict(UUID, object) + """ + + results = await self.fetch_everything(where) + return dict( + (uuid, everything['Metadata']) + for uuid, everything in results.items() + ) + + async def fetch_properties( + self, + where: typing.Text, + ) -> typing.Dict[UUID, typing.Any]: + """ + Query for properties. + + Query the sMAP archiver for all streams that satisfy the `where` + condition and return their properties. + + :param where: Query `WHERE` condition. + :type where: str + + :returns: A mapping UUID -> properties. + :rtype: dict(UUID, object) + """ + + results = await self.fetch_everything(where) + return dict( + (uuid, everything['Properties']) + for uuid, everything in results.items() + ) + + async def fetch_everything( + self, + where: typing.Text, + ) -> typing.Dict[UUID, typing.Any]: + """ + Query for everything. + + Query the sMAP archiver for all streams that satisfy the `where` + condition and return everything. + + :param where: Query `WHERE` condition. + :type where: str + + :returns: A mapping UUID -> metadata. + :rtype: dict(UUID, everything) + """ + + query = "select * where {}".format(where) + output = await self.fetch_raw(query) + results = parse_and_validate( + output, + 'select_response_schema', + self.loose_validation + ) + return dict( + (UUID(result['uuid']), result) + for result in results + ) + + async def fetch_raw( + self, + query: typing.Text, + ) -> typing.Text: + """ + Query for raw query. + + Query the sMAP archiver using a custom query and return. + + :param where: Raw query. + :type where: str + + :returns: Parsed JSON output. 
+ :rtype: Any + """ + + if self.use_local_cache: + try: + data = load_raw_from_cache(query) + except Exception as exception: + self.logger.debug("Could not load from cache: %s", str(exception)) + data = await self.transport_interface.fetch(query) + save_raw_to_cache(data, query) + else: + data = await self.transport_interface.fetch(query) + + return data + + async def fetch_readings_intertwined( + self, + start: datetime, + end: datetime, + where: typing.Text, + limit: int=-1, + streamlimit: int=10000, + ) -> pd.Series: + """ + Query for readings and return a single stream. + + Query the sMAP archiver for all readings between `start` and `end` for + all streams that satisfy the `where` condition. The results are + intertwined. + + :param start: Query start time. + :type start: datetime + :param end: Query end time. + :type end: datetime + :param where: Query `WHERE` condition. + :type where: str + :param limit: Maximum number of readings (-1 for unlimited) + :type limit: int + :param streamlimit: Maximum number of streams + :type streamlimit: int + + :returns: Intertwined series. + :rtype: pd.Series + + .. warning :: When :code:`limit` is equal to -1 the returned + streams are actually limited to one million readings (possibly + depending on the server implementation). Using a higher value for + :code:`limit` works correctly. + """ + results = await self.fetch_readings( + start, end, where, limit, streamlimit + ) + return intertwine_series(list(results.values())) + + async def fetch_readings( + self, + start: datetime, + end: datetime, + where: typing.Text, + limit: int=-1, + streamlimit: int=10000, + ) -> typing.Dict[UUID, pd.Series]: + """ + Query for readings and return by UUID. + + Query the sMAP archiver for all readings between `start` and `end` for + all streams that satisfy the `where` condition. The results are + returned by UUID. + + :param start: Query start time. + :type start: datetime + :param end: Query end time. + :type end: datetime + :param where: Query `WHERE` condition. + :type where: str + :param limit: Maximum number of readings (-1 for unlimited) + :type limit: int + :param streamlimit: Maximum number of streams + :type streamlimit: int + + :returns: A mapping UUID -> values. + :rtype: dict(UUID, pd.Series) + + .. warning :: When :code:`limit` is equal to -1 the returned + streams are actually limited to one million readings (possibly + depending on the server implementation). Using a higher value for + :code:`limit` works correctly. + """ + + if self.use_local_cache: + try: + data = load_readings_from_cache( + start, end, where, limit, streamlimit + ) + except Exception as exception: + self.logger.debug("Could not load from cache: %s", str(exception)) + data = await self._fetch_readings( + start, + end, + where, + limit, + streamlimit + ) + save_readings_to_cache( + data, + start, + end, + where, + limit, + streamlimit, + ) + + else: + data = await self._fetch_readings( + start, + end, + where, + limit, + streamlimit + ) + + if limit == -1: + for uuid, stream in data.items(): + if len(stream) == 1000000: + self.logger.warn( + "UUID %s has exactly one million readings, " + "if limit was set to -1 they could be incomplete", + uuid + ) + + # Cached data might be larger than the requested period, truncate it. 
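+        # Truncation is inclusive at both ends: readings exactly at `start`
+        # and `end` are kept, matching the closed-interval case described
+        # in the Readme.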
+ def truncate(stream: pd.Series) -> pd.Series: + stream = stream[stream.index >= start] + stream = stream[stream.index <= end] + return stream + + return dict( + (uuid, truncate(stream)) + for uuid, stream in data.items() + ) + + async def _fetch_readings( + self, + start: datetime, + end: datetime, + where: typing.Text, + limit: int, + streamlimit: int + ) -> typing.Dict[UUID, pd.Series]: + + query = ( + "select data in ({start}, {end}) " + "limit {limit} streamlimit {streamlimit} " + "where {where}").format( + start=_to_milli_timestamp(start), + end=_to_milli_timestamp(end), + limit=limit, + streamlimit=streamlimit, + where=where, + ) + + # Do not use self.fetch_raw() + # That method uses a general local cache, while for data it is not + # necessary to match the entire query, a larger cache can be used as + # well. + output = await self.transport_interface.fetch(query) + results = parse_and_validate( + output, + 'select_response_schema', + self.loose_validation + ) + return dict( + ( + UUID(result['uuid']), + _parse_readings(UUID(result['uuid']), result['Readings']) + ) + for result in results + ) + + async def fetch_latest_readings( + self, + where: typing.Text, + limit: int=1, + streamlimit: int=10000, + instant: typing.Optional[datetime]=None + ) -> typing.Dict[UUID, pd.Series]: + """ + Query for readings and return by UUID. + + Query the sMAP archiver for all the readings before current or + specified instant for all streams that satisfy the `where` condition. + The results are returned by UUID. + + :param where: Query `WHERE` condition. + :type where: str + :param limit: Maximum number of readings (-1 for unlimited) + :type limit: int + :param streamlimit: Maximum number of streams + :type streamlimit: int + :param instant: Instant (if `None`, current instant is used) + :type instant: datetime + + :returns: A mapping UUID -> values. + :rtype: dict(UUID, pd.Series) + """ + + if instant is None: + query = ( + "select data before now " + "limit {limit} streamlimit {streamlimit} " + "where {where}").format( + limit=limit, + streamlimit=streamlimit, + where=where, + ) + else: + query = ( + "select data before {instant} " + "limit {limit} streamlimit {streamlimit} " + "where {where}").format( + instant=_to_milli_timestamp(instant), + limit=limit, + streamlimit=streamlimit, + where=where, + ) + output = await self.transport_interface.fetch(query) + results = parse_and_validate( + output, + 'select_response_schema', + self.loose_validation + ) + return dict( + ( + UUID(result['uuid']), + _parse_readings(UUID(result['uuid']), result['Readings']) + ) + for result in results + ) + + async def subscribe( + self, + where: typing.Optional[typing.Text], + callback: SeriesCallbackType, + timeout: int=30 + ) -> None: + """ + Subscribe to sMAP republish interface. + + The callback function will be called with new readings as soon as the + archiver will republish them. + + Args: + :param where: Query text. + :type where: str + :param callback: Callback to process new readings. + :type callback: SeriesCallbackType + :param timeout: Connection timeout in seconds. 
+ :type timeout: int + """ + + async def raw_callback( + line: typing.Text + ) -> None: + payloads = parse_and_validate( + line, + 'subscribe_response_schema', + self.loose_validation + ) + for payload in payloads.values(): + uuid = UUID(payload['uuid']) + readings = payload['Readings'] + series = _parse_readings(uuid, readings) + + await callback(uuid, series) + + return await self.transport_interface.subscribe(where, raw_callback, timeout) + + async def post_readings( + self, + readings: typing.Union[typing.Dict[UUID, pd.Series], typing.Tuple[UUID, pd.Series]], + ) -> None: + """ + Post new readings to existing streams. + + The specified streams' paths will be fetched and cached before posting + the readings. All streams must, therefore, exist beforehand. + + :param readings: Either a dictionary {UUID: series} or a pair (UUID, pd.Series). + :type readings: dict|pd.Series + """ + + if isinstance(readings, dict): + streams = readings # type: typing.Dict[UUID, pd.Stream] + elif isinstance(readings, tuple): + streams = { + readings[0]: readings[1] + } + + unknown_uuids = set( + uuid + for uuid in streams.keys() + if uuid not in self.path_cache + ) + + if len(unknown_uuids) > 0: + self.logger.debug( + "Unknown UUIDs %s, getting their path...", + ', '.join(map(str, unknown_uuids)) + ) + query = "select Path where {}".format( + ' or '.join( + "uuid = '{}'".format(uuid) + for uuid in unknown_uuids + ) + ) + + output = await self.fetch_raw(query) + results = parse_and_validate( + output, + 'select_response_schema', + self.loose_validation + ) + for result in results: + uuid = UUID(result['uuid']) + path = result['Path'] + self.logger.debug("Path for UUID %s is %s", uuid, path) + self.path_cache[uuid] = path + + payload = dict() + + for uuid, stream in streams.items(): + try: + path = self.path_cache[uuid] + except KeyError: + raise RuntimeError( + "Stream with UUID {} does not exist, its path is unknown".format( + uuid + ) + ) + + payload[path] = _make_update_payload( + uuid, + stream=stream + ) + + data = json_dumps(payload) + self.logger.debug("Payload: %s", data) + + await self.transport_interface.post_data(data) + + async def post_data( + self, + stream: pd.Series, + uuid: UUID, + source_name: typing.Text, + path: typing.Text, + metadata: typing.Optional[typing.Dict]=None, + properties: typing.Optional[typing.Dict]=None, + ) -> None: + """ + Post new readings to a stream. + + :param streams: New readings. + :type streams: pd.Series + :param uuid: Stream's UUID. + :type uuid: UUID + :param source_name: Stream's source name. + :type source_name: str + :param path: Stream's path. 
+ :type path: str + :param metadata: Stream's metadata + :type metadata: dict + :param properties: Stream's properties + :type properties: dict + """ + + if metadata is None: + metadata = dict() + + metadata = metadata.copy() + metadata['SourceName'] = source_name + + payload = { + path: _make_update_payload( + uuid, + stream=stream, + metadata=metadata, + properties=properties + ) + } + data = json_dumps(payload) + self.logger.debug("Payload: %s", data) + + await self.transport_interface.post_data(data) + + async def _update_metadata_cache_for( + self, + uuids: typing.Set[UUID] + ) -> None: + where = ' or '.join("uuid = '{}'".format(str(uuid)) for uuid in uuids) + results = await self.fetch_metadata(where) + for uuid in uuids: + self.metadata_cache[uuid] = results[uuid] + + +def _make_update_payload( + uuid: UUID, + stream: typing.Optional[pd.Series]=None, + metadata: typing.Optional[typing.Dict]=None, + properties: typing.Optional[typing.Dict]=None, + ) -> typing.Dict: + payload = { + 'uuid': str(uuid), + } # type: typing.Dict[typing.Text, typing.Any] + + if stream is not None: + readings = list(map(lambda a: list(a), zip( + [_to_milli_timestamp(ts) for ts in stream.index], + [float(v) for v in stream.values] + ))) + payload['Readings'] = readings + else: + payload['Readings'] = [] + if properties is not None: + payload['Properties'] = properties + if metadata is not None: + payload['Metadata'] = metadata + return payload + + +def _parse_readings( + uuid: UUID, + readings: typing.List[typing.List] + ) -> pd.Series: + if len(readings) == 0: + timestamps = [] # type: typing.List[int] + values = [] # type: typing.List[float] + else: + timestamps, values = zip(*readings) + + index = pd.to_datetime(timestamps, unit='ms', utc=True) + return pd.Series(values, index=index, name=str(uuid)) + + +def _to_milli_timestamp(date: datetime) -> int: + if date.tzinfo is None: + raise RuntimeError( + "Cannot convert a timezone-naive datetime to a timestamp" + ) + return 1000 * int(date.timestamp()) + + +class SmapAiohttpInterface(SmapInterface): + """ + An interface to sMAP protocol over HTTP. + + .. versionadded:: 1.1 + """ + + def __init__( + self, + url: typing.Text, + key: typing.Text='', + encoding: typing.Text='utf-8', + buffer_size: int=2**16, + max_concurrent_requests: int=10, + cache: bool=False + ) -> None: + """ + Create a `SmapAiohttpInterface` object. + + All arguments are passed to `cfei.smap.transport.AiohttpTransportInterface`. + + :param url: sMAP archiver URL. + :type url: str + :param key: sMAP archiver key. + :type key: str + :param encoding: sMAP archiver text encoding (utf-8 or similar). + :type encoding: str + :param buffer_size: buffer size for subscriptions. + :type buffer_size: int + :param max_concurrent_requests: maximum amount of concurrent requests. + :type max_concurrent_requests: int + :param cache: set to True to use local cache. + :type cache: bool + """ + super(SmapAiohttpInterface, self).__init__( + AiohttpTransportInterface( + url, + key, + encoding, + buffer_size, + max_concurrent_requests, + ), + cache=cache + ) + + +class SmapHttpInterface(SmapInterface): + """ + An interface to sMAP protocol over HTTP. + + .. deprecated:: 1.1 + Use :class:`SmapAiohttpInterface` instead. + """ + + def __init__( + self, + url: typing.Text, + key: typing.Text='', + encoding: typing.Text='utf-8', + buffer_size: int=2**16, + max_concurrent_requests: int=10, + cache: bool=False + ) -> None: + """ + Create a `SmapHttpInterface` object. 
+ + All arguments are passed to `cfei.smap.transport.AiohttpTransportInterface`. + + :param url: sMAP archiver host name. + :type url: str + :param key: sMAP archiver key. + :type key: str + :param encoding: sMAP archiver text encoding (utf-8 or similar). + :type encoding: str + :param buffer_size: buffer size for subscriptions. + :type buffer_size: int + :param max_concurrent_requests: maximum amount of concurrent requests. + :type max_concurrent_requests: int + :param cache: set to True to use local cache. + :type cache: bool + """ + super(SmapHttpInterface, self).__init__( + AiohttpTransportInterface( + url, + key, + encoding, + buffer_size, + max_concurrent_requests, + ), + cache=cache + ) + + +def intertwine_series(serieses: typing.List[pd.Series]) -> pd.Series: + ''' + Convert a list of time-series to a single time-series, such that readings + are in order + + :param series: List of time-series. + :type series: typing.List[pd.Series] + :param uuids: List of uuids. + :type uuids: typing.List[UUID] + + :returns: A new intertwined time-series. + :rtype: pd.Series + ''' + + lists = [ + [ + (index, value) + for index, value in series.iteritems() + ] + for series in serieses + if len(series) > 0 + ] + + if len(lists) == 0: + return pd.Series() + + index, values = zip(*heapq.merge(*lists)) + + series = pd.Series(values, index=index) + series = series[~series.index.duplicated(keep='first')] + return series + + +def generate_uuid_as_in_java( + source_name: typing.Text, + path: typing.Text + ) -> UUID: + ''' + Generate a UUID from source name and path + + This function has the same implementation of the Java library, therefore, + with same inputs it will generate the same UUID. + + :param source_name: Source name + :type source_name: typing.Text + :param path: Path. + :type path: typing.Text + + :returns: A UUID. + :rtype: UUID + ''' + class NULL_NAMESPACE: + bytes = md5((source_name + path).encode('utf-8')).digest() + + return uuid3(typing.cast(UUID, NULL_NAMESPACE), '') diff --git a/cfei/smap/cache.py b/cfei/smap/cache.py new file mode 100644 index 0000000..086b38a --- /dev/null +++ b/cfei/smap/cache.py @@ -0,0 +1,191 @@ +from datetime import datetime +import logging +import pickle +from uuid import UUID +from os.path import join +from os import makedirs +from hashlib import sha512 +from base64 import b32encode + +import pandas as pd +import typing + +import appdirs + + +def load_readings_from_cache( + start: datetime, + end: datetime, + where: typing.Text, + limit: int, + streamlimit: int, + ) -> typing.Dict[UUID, pd.Series]: + """ + Load readings from local cache + + Cache file name is generated from the where clause. If such file exists, + it is loaded using `pickle` library. Loading fails if: + + - Data contained in cache does not cover the entire period [start, end]; + - Data contained in cache had different `limit` or `streamlimit` arguments. + + :param start: Query start time. + :type start: datetime + :param end: Query end time. + :type end: datetime + :param where: Query `WHERE` condition. + :type where: str + :param limit: Maximum number of readings + :type limit: int + :param streamlimit: Maximum number of streams + :type streamlimit: int + + :returns: A mapping UUID -> values. + :rtype: dict(UUID, pd.Series) + + :raises RuntimeError: If cache loading failed. 
+ """ + + logger = logging.getLogger(__name__) + cache_file_path = _get_cache_file_path(where) + logger.debug("Loading data from cache file: %s", cache_file_path) + with open(cache_file_path, 'rb') as cache_file: + entry = pickle.load(cache_file) # typing.Dict[typing.Text, typing.Any] + if entry['start'] > start: + raise RuntimeError('cache starts too late') + if entry['end'] < end: + raise RuntimeError('cache ends too early') + if entry['limit'] != limit: + raise RuntimeError('cache limit mismatch') + if entry['streamlimit'] != streamlimit: + raise RuntimeError('cache stream limit mismatch') + data = entry['data'] # type: typing.Dict[UUID, typing.Any] + return data + + +def save_readings_to_cache( + data: typing.Dict[UUID, typing.Any], + start: datetime, + end: datetime, + where: typing.Text, + limit: int, + streamlimit: int, + ) -> None: + """ + Save readings to local cache + + Cache file name is generated from the where clause. + + :param data: Readings. + :type data: dict(UUID, typing.Any) + :param start: Query start time. + :type start: datetime + :param end: Query end time. + :type end: datetime + :param where: Query `WHERE` condition. + :type where: str + :param limit: Maximum number of readings + :type limit: int + :param streamlimit: Maximum number of streams + :type streamlimit: int + """ + + logger = logging.getLogger(__name__) + + entry = { + 'start': start, + 'end': end, + 'data': data, + 'limit': limit, + 'streamlimit': streamlimit, + } + try: + cache_file_path = _get_cache_file_path(where) + logger.debug('Saving to cache') + cache_dir = _get_cache_dir() + makedirs(cache_dir, exist_ok=True) + with open(cache_file_path, 'wb') as cache_file: + pickle.dump(entry, cache_file) + except Exception as other_exception: + logger.warning( + "Could not save cache: %s", + str(other_exception) + ) + + +def load_raw_from_cache( + query: typing.Text, + ) -> typing.Text: + """ + Load raw content from local cache + + Cache file name is generated from the entire query. If such file exists, + it is loaded using `pickle` library. + + :param query: Query. + :type query: str + + :returns: Raw data. + :rtype: str + + :raises RuntimeError: If cache loading failed. + """ + + logger = logging.getLogger(__name__) + + cache_file_path = _get_cache_file_path(query) + logger.debug("Loading data from cache file: %s", cache_file_path) + with open(cache_file_path, 'rb') as cache_file: + data = pickle.load(cache_file) # type: typing.Text + return data + + +def save_raw_to_cache( + data: typing.Text, + query: typing.Text, + ) -> None: + """ + Save raw data to local cache + + Cache file name is generated from the entire query. + + :param data: Raw data. + :type data: str + :param query: Query. + :type query: str + """ + + logger = logging.getLogger(__name__) + + try: + cache_file_path = _get_cache_file_path(query) + logger.debug('Saving to cache') + cache_dir = _get_cache_dir() + makedirs(cache_dir, exist_ok=True) + with open(cache_file_path, 'wb') as cache_file: + pickle.dump(data, cache_file) + except Exception as other_exception: + logger.warning( + "Could not save cache: %s", + str(other_exception) + ) + + +def _get_cache_dir() -> typing.Text: + cache_dir = appdirs.user_cache_dir( + appname='sMAP-Python', + appauthor='dk.sdu.mmmi.cfei', + ) # type: typing.Text + return cache_dir + + +def _get_cache_file_path(text: typing.Text) -> typing.Text: + cache_dir = _get_cache_dir() + + # Leave UTF-8 here, do not use configurable encoding. + # This is used to generate the cache file name. 
+ hash_bytes = sha512(text.encode('utf-8')).digest() + cache_file_name = b32encode(hash_bytes).decode('utf-8') + + cache_file_path = join(cache_dir, cache_file_name) # type: typing.Text + return cache_file_path diff --git a/cfei/smap/select_response_schema.json b/cfei/smap/select_response_schema.json new file mode 100644 index 0000000..6b72bb2 --- /dev/null +++ b/cfei/smap/select_response_schema.json @@ -0,0 +1,57 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "definitions": { + "uuid": { + "type": "string", + "minLength": 36, + "maxLength": 36, + "pattern": "^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$" + }, + "timestamp": { + "type": "number" + } + }, + "type": "array", + "items": { + "type": "object", + "additionalProperties": false, + "required": ["uuid"], + "properties": { + "uuid": { + "$ref": "#/definitions/uuid" + }, + "Readings": { + "type": "array", + "items": { + "type": "array", + "minItems": 2, + "maxItems": 2, + "additionalItems": false, + "items": [ + { + "$ref": "#/definitions/timestamp" + }, + { + "type": "number" + } + ] + } + }, + "Metadata": { + "type": "object", + "additionalProperties": true, + "properties": { + "SourceName": { + "type": "string" + } + } + }, + "Path": { + "type": "string" + }, + "Properties": { + "type": "object" + } + } + } +} diff --git a/cfei/smap/subscribe_response_schema.json b/cfei/smap/subscribe_response_schema.json new file mode 100644 index 0000000..7e905cc --- /dev/null +++ b/cfei/smap/subscribe_response_schema.json @@ -0,0 +1,60 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "definitions": { + "uuid": { + "type": "string", + "minLength": 36, + "maxLength": 36, + "pattern": "^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$" + }, + "timestamp": { + "type": "number" + } + }, + "type": "object", + "additionalProperties": false, + "patternProperties": { + ".*": { + "type": "object", + "additionalProperties": false, + "required": ["uuid"], + "properties": { + "uuid": { + "$ref": "#/definitions/uuid" + }, + "Readings": { + "type": "array", + "items": { + "type": "array", + "minItems": 2, + "maxItems": 2, + "additionalItems": false, + "items": [ + { + "$ref": "#/definitions/timestamp" + }, + { + "type": "number" + } + ] + } + }, + "Metadata": { + "type": "object", + "additionalProperties": true, + "properties": { + "SourceName": { + "type": "string" + } + } + }, + "Path": { + "type": "string" + }, + "Properties": { + "type": "object" + } + } + } + } +} diff --git a/cfei/smap/tests/__init__.py b/cfei/smap/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cfei/smap/tests/mockup.py b/cfei/smap/tests/mockup.py new file mode 100644 index 0000000..c933214 --- /dev/null +++ b/cfei/smap/tests/mockup.py @@ -0,0 +1,41 @@ +import typing + +from cfei.smap.types import RawCallbackType +from cfei.smap.transport import TransportInterface + + +class DummyTransportInterface(TransportInterface): + """docstring for DummyTransportInterface""" + def __init__( + self, + result: typing.Text, + ) -> None: + super(DummyTransportInterface, self).__init__() + + self.result = result + + async def post_data( + self, + data: typing.Text + ) -> None: + self.received_post_data = data + + async def fetch( + self, + query: typing.Text + ) -> typing.Text: + + self.received_fetch_query = query + + return self.result + + def synchronous_fetch(self, query: typing.Text) -> typing.Text: + raise RuntimeError('Not implemented') + + async def subscribe( + self, + where: typing.Optional[typing.Text], 
+ callback: RawCallbackType, + timeout: int=30 + ) -> None: + raise RuntimeError('Not implemented') diff --git a/cfei/smap/tests/test_smap_freefunctions.py b/cfei/smap/tests/test_smap_freefunctions.py new file mode 100644 index 0000000..4de36d4 --- /dev/null +++ b/cfei/smap/tests/test_smap_freefunctions.py @@ -0,0 +1,104 @@ +from uuid import UUID +from datetime import datetime, timezone + +import pandas as pd +from pandas.util.testing import assert_series_equal + +import cfei.smap + + +def test_parse_readings() -> None: + uuid = UUID('12345678-1234-1234-1234-123456789abc') + readings = [ + [1484006400000, 12], + [1491004800000, -34], + [1491523200000, 53] + ] + + expected = pd.Series( + [12, -34, 53], + index=pd.to_datetime([ + 1484006400000, + 1491004800000, + 1491523200000, + ], + unit='ms', + utc=True + ), + name=str(uuid) + ) + + actual = cfei.smap._parse_readings(uuid, readings) + + assert_series_equal(actual, expected) + + +def test_to_milli_timestamp() -> None: + expected = 1491004800000 + actual = cfei.smap._to_milli_timestamp(datetime(2017, 4, 1, tzinfo=timezone.utc)) + + assert expected == actual + + +def test_intertwine_series() -> None: + a = pd.Series([1, 2, 3, 4], index=[1, 3, 4, 9]) + b = pd.Series([5, 6, 7, 8], index=[2, 3, 8, 9]) + + expected = pd.Series([1, 5, 2, 3, 7, 4], index=[1, 2, 3, 4, 8, 9]) + + actual = cfei.smap.intertwine_series([a, b]) + + assert_series_equal(actual, expected) + + +def test_make_update_payload() -> None: + uuid = UUID('12345678-1234-1234-1234-123456789abc') + + readings = pd.Series( + [12, -34, 53], + index=pd.to_datetime([ + 1484006400000, + 1491004800000, + 1491523200000, + ], + unit='ms', + utc=True + ), + name=str(uuid) + ) + + metadata = { + 'SourceName': 'Some Source Name' + } + + expected = { + 'uuid': str(uuid), + 'Readings': [ + [1484006400000, 12], + [1491004800000, -34], + [1491523200000, 53] + ], + 'Metadata': { + 'SourceName': 'Some Source Name' + } + } + + actual = cfei.smap._make_update_payload( + uuid, + readings, + metadata, + properties=None, + ) + + assert expected == actual + + +def test_generate_uuid_as_in_java() -> None: + source_name = 'My Source Name' + path = '/path/to/time-series' + + expected = UUID('c5d98d49-8af6-37e1-9248-9285be1ae8dd') + + actual = cfei.smap.generate_uuid_as_in_java(source_name, path) + + assert expected == actual diff --git a/cfei/smap/tests/test_smap_interface.py b/cfei/smap/tests/test_smap_interface.py new file mode 100644 index 0000000..ae2bd0f --- /dev/null +++ b/cfei/smap/tests/test_smap_interface.py @@ -0,0 +1,171 @@ +from uuid import UUID +import json +from datetime import datetime, timezone +import asyncio + +import pandas as pd +from pandas.util.testing import assert_series_equal + +from cfei.smap import SmapInterface +from .mockup import DummyTransportInterface + + +def test_fetch_readings() -> None: + loop = asyncio.get_event_loop() + + start = datetime(2017, 1, 1, tzinfo=timezone.utc) + end = datetime(2017, 10, 1, tzinfo=timezone.utc) + where = "Metadata/Location/Building = 'Some Building'" + + uuid = UUID('12345678-1234-5678-9abc-123456789abc') + + expected_query = ( + "select data in (1483228800000, 1506816000000) " + "limit -1 streamlimit 10000 " + "where Metadata/Location/Building = 'Some Building'" + ) + + transport_result = [ + { + 'uuid': str(uuid), + 'Readings': [ + [1483228800000, 5.0], + [1485907200000, 8.0], + [1488326400000, 12.0], + [1491004800000, -5.3] + ] + } + ] + + expected_result = pd.Series( + [5.0, 8.0, 12.0, -5.3], + index=pd.to_datetime( + [ + datetime(2017, 1, 
1, tzinfo=timezone.utc), + datetime(2017, 2, 1, tzinfo=timezone.utc), + datetime(2017, 3, 1, tzinfo=timezone.utc), + datetime(2017, 4, 1, tzinfo=timezone.utc), + ], + utc=True + ), + name=str(uuid) + ) + + transport_interface = DummyTransportInterface(json.dumps(transport_result)) + smap = SmapInterface(transport_interface) + + result = loop.run_until_complete( + smap.fetch_readings(start, end, where) + ) + + assert transport_interface.received_fetch_query == expected_query + + assert len(result) == 1 + + assert list(result.keys())[0] == uuid + + assert_series_equal(list(result.values())[0], expected_result) + + +def test_post_data() -> None: + loop = asyncio.get_event_loop() + + uuid = UUID('12345678-1234-5678-9abc-123456789abc') + series = pd.Series( + [5.0, 8.0, 12.0, -5.3], + index=pd.to_datetime( + [ + datetime(2017, 1, 1, tzinfo=timezone.utc), + datetime(2017, 2, 1, tzinfo=timezone.utc), + datetime(2017, 3, 1, tzinfo=timezone.utc), + datetime(2017, 4, 1, tzinfo=timezone.utc), + ], + utc=True + ), + name=str(uuid) + ) + source_name = 'My Source Name' + path = '/some/path' + metadata = { + 'Location': { + 'Building': 'Some Building' + } + } + + expected_data = { + path: { + 'uuid': str(uuid), + 'Readings': [ + [1483228800000, 5.0], + [1485907200000, 8.0], + [1488326400000, 12.0], + [1491004800000, -5.3] + ], + 'Metadata': { + 'SourceName': source_name, + 'Location': { + 'Building': 'Some Building' + } + } + } + } + + transport_interface = DummyTransportInterface('') + smap = SmapInterface(transport_interface) + + loop.run_until_complete( + smap.post_data(series, uuid, source_name, path, metadata) + ) + + assert json.loads(transport_interface.received_post_data) == expected_data + + +def test_post_readings() -> None: + loop = asyncio.get_event_loop() + + uuid = UUID('12345678-1234-5678-9abc-123456789abc') + series = pd.Series( + [5.0, 8.0, 12.0, -5.3], + index=pd.to_datetime( + [ + datetime(2017, 1, 1, tzinfo=timezone.utc), + datetime(2017, 2, 1, tzinfo=timezone.utc), + datetime(2017, 3, 1, tzinfo=timezone.utc), + datetime(2017, 4, 1, tzinfo=timezone.utc), + ], + utc=True + ), + name=str(uuid) + ) + path = '/some/path' + + transport_result = [ + { + "Path": path, + "uuid": str(uuid) + } + ] + + expected_query = "select Path where uuid = '{}'".format(uuid) + + expected_data = { + path: { + 'uuid': str(uuid), + 'Readings': [ + [1483228800000, 5.0], + [1485907200000, 8.0], + [1488326400000, 12.0], + [1491004800000, -5.3] + ] + } + } + + transport_interface = DummyTransportInterface(json.dumps(transport_result)) + smap = SmapInterface(transport_interface) + + loop.run_until_complete( + smap.post_readings((uuid, series)) + ) + + assert transport_interface.received_fetch_query == expected_query + assert json.loads(transport_interface.received_post_data) == expected_data diff --git a/cfei/smap/transport/__init__.py b/cfei/smap/transport/__init__.py new file mode 100644 index 0000000..6053623 --- /dev/null +++ b/cfei/smap/transport/__init__.py @@ -0,0 +1,73 @@ +import typing + +from ..types import RawCallbackType + + +class TransportInterface(object): + """ + Interface for transport layer. + """ + + def __init__(self) -> None: + super(TransportInterface, self).__init__() + + async def post_data( + self, + data: typing.Text + ) -> None: + """ + Post new data. + + :param data: data. + :type data: str + """ + raise RuntimeError('Not implemented') + + async def fetch( + self, + query: typing.Text + ) -> typing.Text: + """ + Fetch data by query. + + :param query: Query. 
+ :type query: str + + :return: Query result. + :rtype: typing.Text + """ + raise RuntimeError('Not implemented') + + def synchronous_fetch(self, query: typing.Text) -> typing.Text: + """ + Synchronously fetch data by query. + + :param query: Query. + :type query: str + + + :return: Query result. + :rtype: typing.Text + """ + raise RuntimeError('Not implemented') + + async def subscribe( + self, + where: typing.Optional[typing.Text], + callback: RawCallbackType, + timeout: int=30 + ) -> None: + """ + Subscribe to republish interface. + + The callback function will be called with new readings as soon as the + archiver will republish them. + + :param where: Query text. + :type where: str + :param callback: Callback to process new readings. + :type callback: CallbackType + :param timeout: Connection timeout in seconds. + :type timeout: int + """ + raise RuntimeError('Not implemented') diff --git a/cfei/smap/transport/aiohttp.py b/cfei/smap/transport/aiohttp.py new file mode 100644 index 0000000..fefab0e --- /dev/null +++ b/cfei/smap/transport/aiohttp.py @@ -0,0 +1,215 @@ +import logging +import typing + +import urllib.request +import urllib.parse + +import asyncio + +import aiohttp + +from ..types import RawCallbackType +from ..transport import TransportInterface + + +class AiohttpTransportInterface(TransportInterface): + """ + Interface for HTTP transport layer. + + .. versionchanged:: 1.1 + This class was previously called `cfei.smap.transport.HttpTransportInterface`. + """ + + def __init__( + self, + url: typing.Text, + key: typing.Text='', + encoding: typing.Text='utf-8', + buffer_size: int=2**16, + max_concurrent_requests: int=10, + ) -> None: + """ + Create a `AiohttpTransportInterface` object. + + Args: + :param url: sMAP archiver URL. + :type: url: str + :param key: sMAP archiver key. + :type: key: str + :param encoding: sMAP archiver text encoding (utf-8 or similar). + :type: encoding: str + :param buffer_size: buffer size for subscriptions. + :type: buffer_size: int + :param max_concurrent_requests: maximum amount of concurrent requests. + :type: max_concurrent_requests: int + """ + super(AiohttpTransportInterface, self).__init__() + self.logger = logging.getLogger(__name__) + + self.base_url = urllib.parse.urlparse(url).geturl() + self.query_url = urllib.parse.urljoin(self.base_url, '/api/query') + self.subscribe_url = urllib.parse.urljoin(self.base_url, '/republish') + self.post_url = urllib.parse.urljoin(self.base_url, '/add/' + key) + + self.buffer_size = buffer_size + self.encoding = encoding + self.semaphore = asyncio.Semaphore(max_concurrent_requests) + + async def post_data( + self, + data: typing.Text + ) -> None: + """ + Post new data. + + :param data: data. + :type data: str + """ + + async with self.semaphore: + async with aiohttp.ClientSession() as session: + async with session.post( + self.post_url, + data=data.encode(self.encoding) + ) as response: + self.logger.debug("Response status: %d", response.status) + if response.status != 200: + raise RuntimeError("Can't post data") + + async def fetch( + self, + query: typing.Text + ) -> typing.Text: + """ + Fetch data by query. + + :param query: Query. + :type query: str + + :return: Query result. 
+ :rtype: JsonType + """ + + async with self.semaphore: + self.logger.debug("Query: %s", query) + data = query.encode(self.encoding) + + async with aiohttp.ClientSession( + conn_timeout=60, + read_timeout=None + ) as session: + async with session.post( + self.query_url, + data=data, + timeout=None + ) as response: + self.logger.debug("Response status: %d", response.status) + + # This could easily be implemented with stream.readline(), however, + # that method raises an exception if a line is longer than 2**16 + # bytes. + stream = response.content + all_bytes = bytes() + + while not stream.at_eof(): + next_bytes = await stream.read(self.buffer_size) + all_bytes += next_bytes + + text = all_bytes.decode(self.encoding) + + if (response.status // 100) == 2: + # Success HTTP response + return text + else: + raise RuntimeError( + "HTTP Response Status: {}\n{}".format( + response.status, + text + ) + ) + + def synchronous_fetch(self, query: typing.Text) -> typing.Text: + """ + Synchronously fetch data by query. + + :param query: Query. + :type query: str + + + :return: Query result. + :rtype: JsonType + """ + + data = query.encode(self.encoding) + response = urllib.request.urlopen(self.query_url, data) + str_response = response.read().decode(self.encoding) + return str_response + + async def subscribe( + self, + where: typing.Optional[typing.Text], + callback: RawCallbackType, + timeout: int=30 + ) -> None: + """ + Subscribe to republish interface. + + The callback function will be called with new readings as soon as the + archiver will republish them. + + :param where: Query text. + :type where: str + :param callback: Callback to process new readings. + :type callback: CallbackType + :param timeout: Connection timeout in seconds. + :type timeout: int + """ + + self.logger.debug("Subscribing to %s", self.subscribe_url) + async with aiohttp.ClientSession( + read_timeout=None, + conn_timeout=timeout + ) as session: + if where: + self.logger.debug("Where: %s", where) + data = where.encode(self.encoding) # type: typing.Optional[bytes] + else: + data = None + async with session.post(self.subscribe_url, data=data) as response: + self.logger.debug("Response status: %d", response.status) + stream = response.content + await self._process_subscription_stream(stream, callback) + + async def _process_subscription_stream( + self, + stream: aiohttp.StreamReader, + callback: RawCallbackType + ) -> None: + self.logger.debug('Processing subscription StreamReader') + + remaining_bytes = bytes() + + while not stream.at_eof(): + # This could easily be implemented with stream.readline(), however, + # that method raises an exception if a line is longer than 2**16 + # bytes. 
+ + next_bytes = await stream.read(self.buffer_size) + chunks = (remaining_bytes + next_bytes).split(b'\n') + + for chunk in chunks[:-1]: + try: + await self._process_chunk(chunk, callback) + except Exception: + self.logger.error("Error processing chunk", exc_info=True) + + remaining_bytes = chunks[-1] + + async def _process_chunk( + self, + chunk: bytes, + callback: RawCallbackType + ) -> None: + line = chunk.decode(self.encoding).strip() + if len(line) > 0: + await callback(line) diff --git a/cfei/smap/types.py b/cfei/smap/types.py new file mode 100644 index 0000000..df2f7b6 --- /dev/null +++ b/cfei/smap/types.py @@ -0,0 +1,36 @@ +from uuid import UUID +import typing + +import pandas as pd + + +JsonType = typing.Any + +RawCallbackType = typing.Callable[ + [typing.Text], + typing.Awaitable[None] +] +""" +A raw callback is an `async` function (or a `@asyncio.coroutine` decorated +function) taking as arguments a string, i.e., a line returned by the +republish interface, and calls a callback with each update. +""" + +CallbackType = typing.Callable[ + [UUID, typing.List], + typing.Awaitable[None] +] +""" +A callback is an `async` function (or a `@asyncio.coroutine` decorated +function) taking as arguments a UUID and a list of readings, where each +reading is a 2-list [timestamp, value]. +""" + +SeriesCallbackType = typing.Callable[ + [UUID, pd.Series], + typing.Awaitable[None] +] +""" +A series callback is an `async` function (or a `@asyncio.coroutine` decorated +function) taking as arguments a UUID and a time-series. +""" diff --git a/cfei/smap/validation.py b/cfei/smap/validation.py new file mode 100644 index 0000000..f96f7a6 --- /dev/null +++ b/cfei/smap/validation.py @@ -0,0 +1,58 @@ +import logging +import jsonschema +import json +import pkgutil + +import typing + + +def parse_and_validate( + text: typing.Text, + schema_name: typing.Text, + loose_validation: bool, + ) -> typing.Any: + """ + Parse and validate JSON payload + + The JSON schema is loaded from the package's content and used to validate + the JSON payload. + + :param text: JSON payload. + :type text: str + :param schema_name: JSON schema to use for validation. + :type schema_name: str + :param loose_validation: If True only logs a warning in case of invalid data. + :type loose_validation: bool + + :returns: Parsed data structure. + :rtype: obj + + :raises ValidationError: If the payload does not follow the schema. + :raises SchemaError: If the schema is not valid. + """ + logger = logging.getLogger(__name__) + + payload = json.loads(text) + + schema_file_name = schema_name + '.json' + schema_bytes = pkgutil.get_data(__name__, schema_file_name) + if schema_bytes is not None: + schema = json.loads(schema_bytes.decode('utf-8')) + try: + jsonschema.validate( + payload, + schema, + format_checker=jsonschema.FormatChecker() + ) + except jsonschema.exceptions.ValidationError as e: + if loose_validation: + logger.warn("Cannot validate JSON: %s", e) + else: + raise + except jsonschema.exceptions.SchemaError as e: + if loose_validation: + logger.warn("Invalid JSON schema: %s", e) + else: + raise + + return payload diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..1c5eac0 --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line. 
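+# For example, `make html SPHINXOPTS="-W"` turns Sphinx warnings into errors.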
+SPHINXOPTS    =
+SPHINXBUILD   = python3 -msphinx
+SPHINXPROJ    = CFEIsMAPPythonLibrary
+SOURCEDIR     = source
+BUILDDIR      = build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
\ No newline at end of file
diff --git a/docs/source/_static/timezones.svg b/docs/source/_static/timezones.svg
new file mode 100644
index 0000000..1d72bf7
--- /dev/null
+++ b/docs/source/_static/timezones.svg
@@ -0,0 +1,5659 @@
+[5659 lines of SVG markup omitted: vector figure used by the timezones documentation page]
diff --git a/docs/source/conf.py b/docs/source/conf.py
new file mode 100644
index 0000000..1d0047d
--- /dev/null
+++ b/docs/source/conf.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# CFEI sMAP Python Library documentation build configuration file, created by
+# sphinx-quickstart on Thu Feb 15 10:34:57 2018.
+#
+# This file is execfile()d with the current directory set to its
+# containing dir.
+#
+# Note that not all possible configuration values are present in this
+# autogenerated file.
+#
+# All configuration values have a default; values that are commented out
+# serve to show the default.
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+#
+import os
+import sys
+sys.path.insert(0, os.path.abspath('../..'))
+
+
+# -- General configuration ------------------------------------------------
+
+# If your documentation needs a minimal Sphinx version, state it here.
+#
+# needs_sphinx = '1.0'
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = [
+    'sphinx.ext.autodoc',
+    'sphinx.ext.coverage',
+    'sphinx.ext.autosectionlabel',
+    'm2r',
+]
+
+numfig = True
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# The suffix(es) of source filenames.
+# You can specify multiple suffix as a list of string:
+#
+source_suffix = ['.rst', '.md']
+
+# The master toctree document.
+master_doc = 'index'
+
+# Other documents.
+smap_cmdline_doc = 'smap'
+
+# General information about the project.
+project = 'CFEI sMAP Python Library'
+copyright = '2018, Center for Energy Informatics, University of Southern Denmark'
+author = 'Claudio Giovanni Mattera cgim@mmmi.sdu.dk'
+
+# The version info for the project you're documenting, acts as replacement for
+# |version| and |release|, also used in various other places throughout the
+# built documents.
+#
+# The short X.Y version.
+version = '1.2.0'
+# The full version, including alpha/beta/rc tags.
+release = '1.2.0'
+
+# The language for content autogenerated by Sphinx. Refer to documentation
+# for a list of supported languages.
+#
+# This is also used if you do content translation via gettext catalogs.
+# Usually you set "language" from the command line for these cases.
+language = None
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+# This patterns also effect to html_static_path and html_extra_path
+exclude_patterns = []
+
+# The name of the Pygments (syntax highlighting) style to use.
+pygments_style = 'sphinx'
+
+# If true, `todo` and `todoList` produce output, else they produce nothing.
+todo_include_todos = False
+
+
+# -- Options for HTML output ----------------------------------------------
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+#
+# html_theme = 'alabaster'
+html_theme = 'sphinx_rtd_theme'
+
+# Theme options are theme-specific and customize the look and feel of a theme
For a list of options available for each theme, see the +# documentation. +# +html_theme_options = { + 'collapse_navigation': False, + 'navigation_depth': 4, +} +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Custom sidebar templates, must be a dictionary that maps document names +# to template names. +# +# This is required for the alabaster theme +# refs: http://alabaster.readthedocs.io/en/latest/installation.html#sidebars +html_sidebars = { + '**': [ + 'about.html', + 'navigation.html', + 'searchbox.html', + ] +} + + +# -- Options for manual page output --------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + (master_doc, 'cfei-smap', 'CFEI sMAP Python Library Documentation', [author], 3), + (smap_cmdline_doc, 'smap', 'CFEI sMAP Command Line Tool', [author], 1), +]
diff --git a/docs/source/index.rst b/docs/source/index.rst new file mode 100644 index 0000000..cd07231 --- /dev/null +++ b/docs/source/index.rst @@ -0,0 +1,25 @@ + +************************ +CFEI Python sMAP Library +************************ + + +Python library for querying sMAP servers to fetch data, subscribing to receive real-time updates, and posting new data. + + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + library + reference + timezones + internals + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search`
diff --git a/docs/source/internals.rst b/docs/source/internals.rst new file mode 100644 index 0000000..2c88bc7 --- /dev/null +++ b/docs/source/internals.rst @@ -0,0 +1,55 @@ + +********* +Internals +********* + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + +========= +Structure +========= + +The library consists of a `SmapInterface` class that defines the main interface over sMAP. +This class uses an abstract class `TransportInterface` that defines primitives to send and retrieve data over HTTP. +Splitting the transport implementation from its interface makes it possible to use this library with multiple backends. +The only supported backend at the moment is based on the aiohttp_ library, plus a mock backend used for testing. + + + +======= +Caching +======= + +The library supports caching time-series and metadata on the local machine, to reduce fetch time, network traffic and server load. +When a query is executed for the first time, a filename is obtained by hashing its where-clause, and the query results are saved to that file. +On subsequent executions of the same query, the results are read from the local cache file. + +The cache also contains the selected period, so that queries with the same where-clause but different periods can use the cache, provided that the requested period is contained in the cached period. +Other parameters, such as limits, are also stored in the cache; if they do not match, the cache is invalidated. + +The local cache is disabled by default, and can be enabled by passing ``cache=True`` to the ``SmapInterface`` constructor.
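+For example, a minimal sketch of enabling the local cache (this assumes the concrete ``SmapAiohttpInterface`` constructor forwards the ``cache`` flag to ``SmapInterface``):
+
+.. code-block:: python
+
+    from cfei.smap import SmapAiohttpInterface
+
+    # With cache=True, fetched readings are stored on disk and reused by later
+    # queries with the same where-clause over the same (or a narrower) period.
+    smap = SmapAiohttpInterface("http://hostname.com:8079", cache=True)
+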
+The cache directory is obtained from the appdirs_ library, and is usually located at: + +- ``/home/USERNAME/.cache/sMAP-Python`` on Linux/Unix +- ``C:\\Users\\USERNAME\\AppData\\Local\\Acme\\sMAP-Python\\Cache`` on Windows + +Invalidated cache files are deleted and re-created, but the local cache is otherwise never deleted. +If it becomes too large, it should be deleted manually. + + + +================= +Schema Validation +================= + +The sMAP server returns results encoded as JSON. +This library can optionally check whether the returned JSON payloads conform to the expected jsonschema_. +This can be useful to diagnose a faulty server implementation. + + +.. _aiohttp: https://aiohttp.readthedocs.io/ +.. _appdirs: https://pypi.org/project/appdirs/ +.. _jsonschema: https://json-schema.org/
diff --git a/docs/source/library.rst b/docs/source/library.rst new file mode 100644 index 0000000..11975ce --- /dev/null +++ b/docs/source/library.rst @@ -0,0 +1,7 @@ + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + +.. mdinclude:: ../../Readme.md
diff --git a/docs/source/reference.rst b/docs/source/reference.rst new file mode 100644 index 0000000..ad53831 --- /dev/null +++ b/docs/source/reference.rst @@ -0,0 +1,64 @@ + +********* +Reference +********* + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + +================================ +Reference for :code:`cfei.smap` +================================ + +.. automodule:: cfei.smap + :members: + :undoc-members: + :inherited-members: + :show-inheritance: + + +====================================== +Reference for :code:`cfei.smap.types` +====================================== + +.. automodule:: cfei.smap.types + :members: + :undoc-members: + :inherited-members: + :show-inheritance: + + +========================================== +Reference for :code:`cfei.smap.transport` +========================================== + +.. automodule:: cfei.smap.transport + :members: + :undoc-members: + :inherited-members: + :show-inheritance: + + +================================================== +Reference for :code:`cfei.smap.transport.aiohttp` +================================================== + +.. automodule:: cfei.smap.transport.aiohttp + :members: + :undoc-members: + :inherited-members: + :show-inheritance: + + +========================================== +Reference for :code:`cfei.smap.cache` +========================================== + +.. automodule:: cfei.smap.cache + :members: + :undoc-members: + :inherited-members: + :show-inheritance: + 
diff --git a/docs/source/smap.rst b/docs/source/smap.rst new file mode 100644 index 0000000..d7efeee --- /dev/null +++ b/docs/source/smap.rst @@ -0,0 +1,71 @@ + +********************************** +CFEI Python sMAP Command Line Tool +********************************** + + +======== +SYNOPSIS +======== + +:: + usage: smap [-h] [-v] --url URL [--plot] [--plot-markers] [--csv] + [--start DATETIME] [--end DATETIME] + WHERE + + + +=========== +DESCRIPTION +=========== + +*smap* is a command line tool to fetch data from a sMAP server.
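+
+For example, the following invocation (the archiver URL and UUID are placeholders) exports one week of a stream to CSV, redirecting standard output to a file::
+
+    smap --url http://localhost:8079 --csv \
+        --start 2018-02-01T00:00:00Z --end 2018-02-08T00:00:00Z \
+        "uuid = '12345678-1234-1234-1234-12345678abcd'" > readings.csv
+
+The full list of arguments is given in OPTIONS below.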
+ + +======= +OPTIONS +======= + +--------------- +General options +--------------- + +``-h``, ``--help`` Show documentation + +``-v`` Increase verbosity + + +--------------------- +sMAP archiver options +--------------------- + +``--url`` sMAP archiver URL + + +------------ +Data options +------------ + +``--start`` Initial time [default: 24h ago] + +``--end`` Final time [default: now] + + +-------------- +Output options +-------------- + +``--csv`` Print results to stdout in CSV format + +``--plot`` Plot results + +``--plot-markers`` Show plot markers + + +=========== +EXAMPLES +=========== + +The following command plots the specified UUID over the past 24h:: + + smap --url http://localhost --plot "uuid='12345678-1234-1234-1234-12345678abcd'"
diff --git a/docs/source/timezones.rst b/docs/source/timezones.rst new file mode 100644 index 0000000..16a5c39 --- /dev/null +++ b/docs/source/timezones.rst @@ -0,0 +1,125 @@ + +.. _timezones: + +******************************** +Converting to and from Timezones +******************************** + +The following code shows how to convert between localtime and UTC, query an sMAP server for data, and display the resulting time-series. + +This example shows occupancy data, which has dynamics in localtime. +That is, people arrive on-site around seven in the morning and leave around three in the afternoon. +Assuming the current timezone is Europe/Copenhagen, people arrive on-site around 06:00 UTC and leave around 14:00 UTC. +However, it can be useful to access the time-series with a localtime index, for instance for plotting, or for using the time-series with programs that do not support timezones. + +.. code-block:: python + + import logging + from datetime import datetime, timedelta + import asyncio + + import pytz + + from matplotlib import pyplot as plt + import matplotlib.dates as mdates + + from cfei.smap import SmapAiohttpInterface + + + async def main(): + hours = mdates.HourLocator(interval=1) + + smap = SmapAiohttpInterface("http://hostname:8079") + + # This is localtime naive, i.e., there is no information about timezone. + timezone_naive_local_datetime = datetime(2018, 2, 6, 4) + # 2018-02-06T04:00:00 + + # This creates a timezone object to represent the desired timezone. + timezone = pytz.timezone("Europe/Copenhagen") + + # This converts the timezone-naive datetime to a timezone-aware datetime. + # This is still localtime, but it carries information about the timezone. + timezone_aware_local_datetime = timezone.localize(timezone_naive_local_datetime) + # 2018-02-06T04:00:00+01:00 + + # This converts the localtime to UTC. + # It is necessary because comparing datetimes with different timezones + # is not well supported in Python and pandas. + timezone_aware_utc_datetime = timezone_aware_local_datetime.astimezone(pytz.UTC) + # 2018-02-06T03:00:00Z + + # A UTC timezone-aware datetime can be used with this library. + readings = await smap.fetch_readings_intertwined( + timezone_aware_utc_datetime, + timezone_aware_utc_datetime + timedelta(hours=14), + "uuid = '12345678-1234-1234-1234-123412341234'", + ) + + # The resulting time-series always have their index in UTC. + timezone_aware_utc_index = readings.index + + # Most time-series processing can be done in UTC, but sometimes it may + # be necessary to convert it to localtime. + # This is localtime, but it still carries information about the timezone.
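+        # Note that tz_convert only changes how the timestamps are represented;
+        # the underlying instants are unchanged.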
+ timezone_aware_localtime_index = timezone_aware_utc_index.tz_convert(timezone) + + # And occasionally it may be necessary to drop the timezone information + # entirely. + # These situations are critical and must be carefully handled. + + # First the index is converted to localtime, and then its timezone is set + # to None. + timezone_naive_localtime_index = timezone_aware_localtime_index.tz_localize(None) + + figure, axes = plt.subplots(nrows=3) + + axes[0].set_title("Timezone-aware UTC index") + axes[0].step(timezone_aware_utc_index, readings.values) + axes[0].xaxis.set_major_locator(hours) + axes[0].xaxis.set_major_formatter( + mdates.DateFormatter("%H:%M\n%z", tz=timezone_aware_utc_index.tz) + ) + axes[0].xaxis.grid() + + axes[1].set_title("Timezone-aware localtime index") + axes[1].step(timezone_aware_localtime_index, readings.values) + axes[1].xaxis.set_major_locator(hours) + axes[1].xaxis.set_major_formatter( + mdates.DateFormatter("%H:%M\n%z", tz=timezone_aware_localtime_index.tz) + ) + axes[1].xaxis.grid() + + axes[2].set_title("Timezone-naive index") + axes[2].step(timezone_naive_localtime_index, readings.values) + axes[2].xaxis.set_major_locator(hours) + axes[2].xaxis.set_major_formatter(mdates.DateFormatter("%H:%M\n%z")) + axes[2].xaxis.grid() + + figure.tight_layout() + + plt.show() + + + if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) + finally: + loop.close() + + +The generated plot is shown in :numref:`label`. +The first time-series rises at 06:00+0000 and the second one at 07:00+0100, which are indeed the same instant in two different timezones. +The third time-series instead rises at 07:00, which is ambiguous. +However, this time-series can be processed by other programs that do not support timezones. +It is the responsibility of the user to properly manage the timezone information for the rest of the pipeline. + +.. _label: +.. figure:: _static/timezones.svg + :scale: 100 % + :alt: The same time-series plotted with three different indices + :align: center + + The same time-series with three different indices: one in UTC, one in the local timezone, and one without timezone
diff --git a/scripts/smap b/scripts/smap new file mode 100755 index 0000000..9d8ba88 --- /dev/null +++ b/scripts/smap @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +from datetime import datetime, timedelta +import argparse +import logging +import asyncio + +import matplotlib.pyplot as plt + +import iso8601 + +from aiohttp.client_exceptions import ClientError + +from cfei.smap import SmapAiohttpInterface + + +async def main(): + arguments = parse_arguments() + setup_logging(arguments.verbose) + logger = logging.getLogger(__name__) + + logger.debug("Using sMAP server %s", arguments.url) + smap = SmapAiohttpInterface(arguments.url) + + logger.debug("Requesting data between %s and %s", arguments.start, arguments.end) + try: + readings = await smap.fetch_readings_intertwined( + arguments.start, + arguments.end, + arguments.where + ) + + logger.debug( + "Fetched %d readings from %s to %s", + len(readings), + readings.index[0], + readings.index[-1] + ) + + if arguments.csv: + print(readings.to_csv()) + + if arguments.plot: + figure, ax = plt.subplots() + marker = '.'
if arguments.plot_markers else None + ax.step(readings.index, readings.values, where='post', marker=marker) + + plt.show() + except ClientError as e: + logger.error("%s", e) + + +def parse_arguments(): + parser = argparse.ArgumentParser( + description='Fetch data from sMAP' + ) + + now = datetime.utcnow().replace(tzinfo=iso8601.UTC) + default_end = now + default_start = now - timedelta(days=1) + + parser.add_argument( + 'where', + metavar='WHERE', + type=str, + help='sMAP query where' + ) + parser.add_argument( + '-v', '--verbose', + action='count', + help='increase output' + ) + parser.add_argument( + '--url', + type=str, + required=True, + help='sMAP archiver URL' + ) + parser.add_argument( + '--plot', + action='store_true', + default=False, + help='plot results' + ) + parser.add_argument( + '--plot-markers', + action='store_true', + default=False, + help='show plot markers' + ) + parser.add_argument( + '--csv', + action='store_true', + default=False, + help='print results to stdout in CSV format' + ) + parser.add_argument( + '--start', + metavar='DATETIME', + default=default_start, + type=parse_datetime, + help='initial time (default: 24h ago)' + ) + parser.add_argument( + '--end', + metavar='DATETIME', + default=default_end, + type=parse_datetime, + help='final time (default: now)' + ) + + return parser.parse_args() + + +def parse_datetime(text): + return iso8601.parse_date(text) + + +def setup_logging(verbose): + if verbose is None or verbose <= 0: + level = logging.WARN + elif verbose == 1: + level = logging.INFO + else: + level = logging.DEBUG + + logging.basicConfig(level=level) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) + finally: + loop.close() diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..4d40716 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,28 @@ +[aliases] +test=pytest + +[mypy] +show_error_context = True +show_column_numbers = True + +follow_imports = normal +ignore_missing_imports = True +strict_optional = True +disallow_any_unimported = False +disallow_any_expr = False +disallow_any_decorated = False +disallow_any_explicit = False +disallow_any_generics = False +disallow_subclassing_any = True +disallow_untyped_calls = True +disallow_untyped_defs = True +check_untyped_defs = True +warn_no_return = True +warn_return_any = True +warn_unused_ignores = True +warn_incomplete_stub = True +warn_redundant_casts = True +no_implicit_optional = True + +[pycodestyle] +max-line-length = 120 diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..9f64e6b --- /dev/null +++ b/setup.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +import sys + +from setuptools import setup + +needs_pytest = {'pytest', 'test', 'ptr'}.intersection(sys.argv) +pytest_runner = ['pytest-runner'] if needs_pytest else [] + +setup( + name='cfei-smap', + version='1.2.0', + description='Interface to sMAP', + long_description=open('Readme.md').read(), + author='Claudio Giovanni Mattera', + author_email='cgim@mmmi.sdu.dk', + url='https://github.com/sdu-cfei/cfei-smap/', + license='MIT', + packages=[ + 'cfei.smap', + 'cfei.smap.transport', + ], + include_package_data=True, + scripts=[ + 'scripts/smap', + ], + install_requires=[ + 'pandas', + 'typing', + 'aiohttp', + 'appdirs', + 'jsonschema', + 'iso8601', + ], + setup_requires=pytest_runner, + tests_require=['pytest'], +)