From 7bcdbdfe18d7e2c764d2695be8c8aa3599ceb14b Mon Sep 17 00:00:00 2001 From: Marcus Kok Date: Thu, 21 Nov 2024 15:00:44 -0500 Subject: [PATCH] * upgrade elasticsearch and elasticsearch-dsl to 8.13.0 * update tests and elasticsearch logs to handle new major version --- data/logs_model/elastic_logs.py | 41 +++++++++++++------ .../elasticsearch_logs_producer.py | 4 +- data/logs_model/test/fake_elasticsearch.py | 12 ++++++ data/logs_model/test/mock_elasticsearch.py | 3 +- data/logs_model/test/test_elasticsearch.py | 26 +++++++++++- requirements.txt | 4 +- 6 files changed, 71 insertions(+), 19 deletions(-) diff --git a/data/logs_model/elastic_logs.py b/data/logs_model/elastic_logs.py index 0e667f23b8..8be42e5b23 100644 --- a/data/logs_model/elastic_logs.py +++ b/data/logs_model/elastic_logs.py @@ -3,8 +3,12 @@ import re from datetime import datetime, timedelta -from elasticsearch import RequestsHttpConnection -from elasticsearch.exceptions import AuthorizationException, NotFoundError +from elastic_transport import RequestsHttpNode, SerializationError +from elasticsearch.exceptions import ( + AuthorizationException, + NotFoundError, + UnsupportedProductError, +) from elasticsearch_dsl import Date, Document, Index, Integer, Ip, Keyword, Object, Text from elasticsearch_dsl.connections import connections from requests_aws4auth import AWS4Auth @@ -75,7 +79,10 @@ def init(cls, index_prefix, index_settings=None, skip_template_init=False): def create_or_update_template(cls): assert cls._index and cls._index_prefix index_template = cls._index.as_template(cls._index_prefix) - index_template.save(using=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS) + try: + index_template.save(using=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS) + except SerializationError: + pass def save(self, **kwargs): # We group the logs based on year, month and day as different indexes, so that @@ -127,6 +134,7 @@ def _initialize(self): """ if not self._initialized: http_auth = None + scheme = "https" if self._use_ssl 
else "http" if self._access_key and self._secret_key and self._aws_region: http_auth = AWS4Auth(self._access_key, self._secret_key, self._aws_region, "es") elif self._access_key and self._secret_key: @@ -135,11 +143,10 @@ def _initialize(self): logger.warning("Connecting to Elasticsearch without HTTP auth") self._client = connections.create_connection( - hosts=[{"host": self._host, "port": self._port}], + hosts=[{"host": self._host, "port": self._port, "scheme": scheme}], http_auth=http_auth, - use_ssl=self._use_ssl, verify_certs=True, - connection_class=RequestsHttpConnection, + node_class=RequestsHttpNode, timeout=ELASTICSEARCH_DEFAULT_CONNECTION_TIMEOUT, ) @@ -149,17 +156,16 @@ def _initialize(self): # This only needs to be done once to initialize the index template connections.create_connection( alias=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS, - hosts=[{"host": self._host, "port": self._port}], + hosts=[{"host": self._host, "port": self._port, "scheme": scheme}], http_auth=http_auth, - use_ssl=self._use_ssl, verify_certs=True, - connection_class=RequestsHttpConnection, + node_class=RequestsHttpNode, timeout=ELASTICSEARCH_TEMPLATE_CONNECTION_TIMEOUT, ) try: force_template_update = ELASTICSEARCH_FORCE_INDEX_TEMPLATE_UPDATE.lower() == "true" - self._client.indices.get_template(self._index_prefix) + self._client.indices.get_template(name=self._index_prefix) LogEntry.init( self._index_prefix, self._index_settings, @@ -167,6 +173,8 @@ def _initialize(self): ) except NotFoundError: LogEntry.init(self._index_prefix, self._index_settings, skip_template_init=False) + except SerializationError: + LogEntry.init(self._index_prefix, self._index_settings, skip_template_init=False) finally: try: connections.remove_connection(ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS) @@ -187,9 +195,11 @@ def index_name(self, day): def index_exists(self, index): try: - return index in self._client.indices.get(index) + return index in self._client.indices.get(index=index) except NotFoundError: 
return False + except SerializationError: + return False @staticmethod def _valid_index_prefix(prefix): @@ -229,20 +239,25 @@ def can_delete_index(self, index, cutoff_date): def list_indices(self): self._initialize() try: - return list(self._client.indices.get(self._index_prefix + "*").keys()) + return list(self._client.indices.get(index=(self._index_prefix + "*")).keys()) except NotFoundError as nfe: logger.exception("`%s` indices not found: %s", self._index_prefix, nfe.info) return [] except AuthorizationException as ae: logger.exception("Unauthorized for indices `%s`: %s", self._index_prefix, ae.info) return None + except SerializationError as se: + logger.exception( + "Serialization error for indices `%s`: %s", self._index_prefix, se.info + ) + return None def delete_index(self, index): self._initialize() assert self._valid_index_name(index) try: - self._client.indices.delete(index) + self._client.indices.delete(index=index) return index except NotFoundError as nfe: logger.exception("`%s` indices not found: %s", index, nfe.info) diff --git a/data/logs_model/logs_producer/elasticsearch_logs_producer.py b/data/logs_model/logs_producer/elasticsearch_logs_producer.py index b93b9c0c37..64d0e09727 100644 --- a/data/logs_model/logs_producer/elasticsearch_logs_producer.py +++ b/data/logs_model/logs_producer/elasticsearch_logs_producer.py @@ -1,6 +1,6 @@ import logging -from elasticsearch.exceptions import ElasticsearchException +from elasticsearch.exceptions import ApiError from data.logs_model.logs_producer import LogSendException from data.logs_model.logs_producer.interface import LogProducerInterface @@ -18,7 +18,7 @@ class ElasticsearchLogsProducer(LogProducerInterface): def send(self, logentry): try: logentry.save() - except ElasticsearchException as ex: + except ApiError as ex: logger.exception("ElasticsearchLogsProducer error sending log to Elasticsearch: %s", ex) raise LogSendException( "ElasticsearchLogsProducer error sending log to Elasticsearch: %s" % ex 
diff --git a/data/logs_model/test/fake_elasticsearch.py b/data/logs_model/test/fake_elasticsearch.py index 205e704528..d2c8c110b4 100644 --- a/data/logs_model/test/fake_elasticsearch.py +++ b/data/logs_model/test/fake_elasticsearch.py @@ -8,6 +8,8 @@ import dateutil.parser from httmock import HTTMock, urlmatch +from data.logs_model.test.test_elasticsearch import add_elastic_headers + FAKE_ES_HOST = "fakees" EMPTY_RESULT = { @@ -42,6 +44,7 @@ def transform(value, field_name): return value + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST, path=r"/_template/(.+)", method="GET") def get_template(url, request): template_name = url[len("/_template/") :] @@ -50,12 +53,14 @@ def get_template(url, request): return {"status_code": 404} + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST, path=r"/_template/(.+)", method="PUT") def put_template(url, request): template_name = url[len("/_template/") :] templates[template_name] = True return {"status_code": 201} + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_doc", method="POST") def post_doc(url, request): index_name, _ = url.path[1:].split("/") @@ -75,6 +80,7 @@ def post_doc(url, request): ), } + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)$", method="DELETE") def index_delete(url, request): index_name_or_pattern = url.path[1:] @@ -96,6 +102,7 @@ def index_delete(url, request): "content": {"acknowledged": True}, } + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)$", method="GET") def index_lookup(url, request): index_name_or_pattern = url.path[1:] @@ -184,6 +191,7 @@ def _is_match(doc, current_query): return found, found_index or (index_name_or_pattern.find("*") >= 0) + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_count$", method="POST") def count_docs(url, request): request = json.loads(request.body) @@ -203,6 +211,7 @@ def count_docs(url, request): "content": json.dumps({"count": len(found)}), } + @add_elastic_headers 
@urlmatch(netloc=FAKE_ES_HOST, path=r"/_search/scroll$", method="POST") def lookup_scroll(url, request): request_obj = json.loads(request.body) @@ -220,6 +229,7 @@ def lookup_scroll(url, request): "status_code": 404, } + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST, path=r"/_search/scroll$", method="DELETE") def delete_scroll(url, request): request = json.loads(request.body) @@ -230,6 +240,7 @@ def delete_scroll(url, request): "status_code": 404, } + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_search$", method="POST") def lookup_docs(url, request): query_params = parse_query(url.query) @@ -383,6 +394,7 @@ def _aggregate(query_config, results): "content": json.dumps(final_result), } + @add_elastic_headers @urlmatch(netloc=FAKE_ES_HOST) def catchall_handler(url, request): print( diff --git a/data/logs_model/test/mock_elasticsearch.py b/data/logs_model/test/mock_elasticsearch.py index 0b13c8cf03..4654baaaf1 100644 --- a/data/logs_model/test/mock_elasticsearch.py +++ b/data/logs_model/test/mock_elasticsearch.py @@ -315,11 +315,12 @@ def _scroll(d): "query": { "range": {"datetime": {"lt": "2018-04-02T00:00:00", "gte": "2018-03-08T00:00:00"}} }, + "size": 1, }, ], [{"scroll": "5m", "scroll_id": _scroll_id}], [{"scroll": "5m", "scroll_id": _scroll_id}], - [{"scroll_id": [_scroll_id]}], + [{"scroll_id": _scroll_id}], ] SCROLL_RESPONSES = [SCROLL_CREATE, SCROLL_GET, SCROLL_GET_2, SCROLL_DELETE] diff --git a/data/logs_model/test/test_elasticsearch.py b/data/logs_model/test/test_elasticsearch.py index 7156bc7b73..836c619580 100644 --- a/data/logs_model/test/test_elasticsearch.py +++ b/data/logs_model/test/test_elasticsearch.py @@ -26,6 +26,8 @@ FAKE_AWS_SECRET_KEY = None FAKE_AWS_REGION = None +DEFAULT_SEARCH_SCROLL_SIZE = 1 + @pytest.fixture() def logs_model_config(): @@ -142,6 +144,21 @@ def parse_query(query): return {s.split("=")[0]: s.split("=")[1] for s in query.split("&") if s != ""} +def add_elastic_headers(response_fn): + # Adding 
special headers to the mocked response + # to prevent an UnsupportedProductError exception from occurring + # during testing + def wrapper(*args, **kwargs): + response = response_fn(*args, **kwargs) + if isinstance(response, dict): + headers = response.get("headers", {}) + headers["X-Elastic-Product"] = "Elasticsearch" + response["headers"] = headers + return response + + return wrapper + + @pytest.fixture() def mock_elasticsearch(): mock = Mock() @@ -156,6 +173,7 @@ def mock_elasticsearch(): mock.list_indices.side_effect = NotImplementedError @urlmatch(netloc=r".*", path=r".*") + @add_elastic_headers def default(url, req): raise Exception( "\nurl={}\nmethod={}\nreq.url={}\nheaders={}\nbody={}".format( @@ -164,14 +182,17 @@ def default(url, req): ) @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/_template/.*") + @add_elastic_headers def template(url, req): return mock.template(url.query.split("/")[-1], req.body) @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_(\*|[0-9\-]+)") + @add_elastic_headers def list_indices(url, req): return mock.list_indices() @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_[0-9\-]*/_doc") + @add_elastic_headers def index(url, req): index = url.path.split("/")[1] body = json.loads(req.body) @@ -179,10 +200,12 @@ def index(url, req): return mock.index(index, body) @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_([0-9\-]*|\*)/_count") + @add_elastic_headers def count(_, req): return mock.count(json.loads(req.body)) @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/_search/scroll") + @add_elastic_headers def scroll(url, req): if req.method == "DELETE": return mock.scroll_delete(json.loads(req.body)) @@ -192,11 +215,12 @@ def scroll(url, req): raise NotImplementedError() @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_(\*|[0-9\-]*)/_search") + @add_elastic_headers def search(url, req): if "scroll" in url.query: query = parse_query(url.query) window_size = query["scroll"] - maximum_result_size = int(query["size"]) + 
maximum_result_size = int(query.get("size", DEFAULT_SEARCH_SCROLL_SIZE)) return mock.search_scroll_create(window_size, maximum_result_size, json.loads(req.body)) elif b"aggs" in req.body: return mock.search_aggs(json.loads(req.body)) diff --git a/requirements.txt b/requirements.txt index ce50b79a9d..f7877c36fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,8 +25,8 @@ debtcollector==1.22.0 decorator==5.1.1 Deprecated==1.2.14 dumb-init==1.2.5.post1 -elasticsearch==7.6.0 -elasticsearch-dsl==7.0.0 +elasticsearch==8.13.0 +elasticsearch-dsl==8.13.0 Flask==2.3.2 Flask-Login==0.6.3 Flask-Mail==0.9.1