Commit 7bcdbdf
* upgrade elasticsearch and elasticsearch-dsl to 8.13.0
* update tests and elasticsearch logs to handle new major version
Marcusk19 committed Nov 22, 2024
1 parent ece794e commit 7bcdbdf
Showing 6 changed files with 71 additions and 19 deletions.
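
The heart of the change is the 7.x-to-8.x client migration in data/logs_model/elastic_logs.py: each host entry must now carry an explicit "scheme", the requests-based transport moved from elasticsearch.RequestsHttpConnection to elastic_transport.RequestsHttpNode (passed via node_class rather than connection_class), and the use_ssl flag is gone. A minimal sketch of the new connection pattern, assuming a hypothetical local endpoint:

    from elastic_transport import RequestsHttpNode
    from elasticsearch_dsl.connections import connections

    # "scheme" is now required per host; it replaces the old use_ssl flag.
    client = connections.create_connection(
        hosts=[{"host": "localhost", "port": 9200, "scheme": "https"}],  # hypothetical host/port
        node_class=RequestsHttpNode,  # 8.x replacement for connection_class=RequestsHttpConnection
        verify_certs=True,
    )
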
41 changes: 28 additions & 13 deletions data/logs_model/elastic_logs.py
@@ -3,8 +3,12 @@
 import re
 from datetime import datetime, timedelta
 
-from elasticsearch import RequestsHttpConnection
-from elasticsearch.exceptions import AuthorizationException, NotFoundError
+from elastic_transport import RequestsHttpNode, SerializationError
+from elasticsearch.exceptions import (
+    AuthorizationException,
+    NotFoundError,
+    UnsupportedProductError,
+)
 from elasticsearch_dsl import Date, Document, Index, Integer, Ip, Keyword, Object, Text
 from elasticsearch_dsl.connections import connections
 from requests_aws4auth import AWS4Auth
@@ -75,7 +79,10 @@ def init(cls, index_prefix, index_settings=None, skip_template_init=False):
     def create_or_update_template(cls):
         assert cls._index and cls._index_prefix
         index_template = cls._index.as_template(cls._index_prefix)
-        index_template.save(using=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS)
+        try:
+            index_template.save(using=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS)
+        except SerializationError:
+            pass
 
     def save(self, **kwargs):
         # We group the logs based on year, month and day as different indexes, so that
@@ -127,6 +134,7 @@ def _initialize(self):
         """
         if not self._initialized:
             http_auth = None
+            scheme = "https" if self._use_ssl else "http"
             if self._access_key and self._secret_key and self._aws_region:
                 http_auth = AWS4Auth(self._access_key, self._secret_key, self._aws_region, "es")
             elif self._access_key and self._secret_key:
@@ -135,11 +143,10 @@ def _initialize(self):
                 logger.warning("Connecting to Elasticsearch without HTTP auth")
 
             self._client = connections.create_connection(
-                hosts=[{"host": self._host, "port": self._port}],
+                hosts=[{"host": self._host, "port": self._port, "scheme": scheme}],
                 http_auth=http_auth,
-                use_ssl=self._use_ssl,
                 verify_certs=True,
-                connection_class=RequestsHttpConnection,
+                node_class=RequestsHttpNode,
                 timeout=ELASTICSEARCH_DEFAULT_CONNECTION_TIMEOUT,
             )
 
@@ -149,24 +156,25 @@ def _initialize(self):
             # This only needs to be done once to initialize the index template
             connections.create_connection(
                 alias=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS,
-                hosts=[{"host": self._host, "port": self._port}],
+                hosts=[{"host": self._host, "port": self._port, "scheme": scheme}],
                 http_auth=http_auth,
-                use_ssl=self._use_ssl,
                 verify_certs=True,
-                connection_class=RequestsHttpConnection,
+                node_class=RequestsHttpNode,
                 timeout=ELASTICSEARCH_TEMPLATE_CONNECTION_TIMEOUT,
             )
 
             try:
                 force_template_update = ELASTICSEARCH_FORCE_INDEX_TEMPLATE_UPDATE.lower() == "true"
-                self._client.indices.get_template(self._index_prefix)
+                self._client.indices.get_template(name=self._index_prefix)
                 LogEntry.init(
                     self._index_prefix,
                     self._index_settings,
                     skip_template_init=not force_template_update,
                 )
             except NotFoundError:
                 LogEntry.init(self._index_prefix, self._index_settings, skip_template_init=False)
+            except SerializationError:
+                LogEntry.init(self._index_prefix, self._index_settings, skip_template_init=False)
             finally:
                 try:
                     connections.remove_connection(ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS)
@@ -187,9 +195,11 @@ def index_name(self, day):
 
     def index_exists(self, index):
         try:
-            return index in self._client.indices.get(index)
+            return index in self._client.indices.get(index=index)
         except NotFoundError:
             return False
+        except SerializationError:
+            return False
 
     @staticmethod
     def _valid_index_prefix(prefix):
@@ -229,20 +239,25 @@ def can_delete_index(self, index, cutoff_date):
     def list_indices(self):
         self._initialize()
         try:
-            return list(self._client.indices.get(self._index_prefix + "*").keys())
+            return list(self._client.indices.get(index=(self._index_prefix + "*")).keys())
         except NotFoundError as nfe:
             logger.exception("`%s` indices not found: %s", self._index_prefix, nfe.info)
             return []
         except AuthorizationException as ae:
             logger.exception("Unauthorized for indices `%s`: %s", self._index_prefix, ae.info)
             return None
+        except SerializationError as se:
+            logger.exception(
+                "Serialization error for indices `%s`: %s", self._index_prefix, se.info
+            )
+            return None
 
     def delete_index(self, index):
         self._initialize()
         assert self._valid_index_name(index)
 
         try:
-            self._client.indices.delete(index)
+            self._client.indices.delete(index=index)
             return index
         except NotFoundError as nfe:
             logger.exception("`%s` indices not found: %s", index, nfe.info)
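
Beyond the transport changes, the 8.x client made API parameters keyword-only, which is why index= and name= keywords were added to the indices calls above; the old positional forms now raise a TypeError. A short sketch of the before/after, assuming a hypothetical endpoint:

    from elasticsearch import Elasticsearch

    client = Elasticsearch("https://localhost:9200")  # hypothetical endpoint

    # client.indices.get("logentry_*")                       # 7.x style; TypeError under 8.x
    indices = client.indices.get(index="logentry_*")         # keyword form required
    template = client.indices.get_template(name="logentry")  # same for name=
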
4 changes: 2 additions & 2 deletions data/logs_model/logs_producer/elasticsearch_logs_producer.py
@@ -1,6 +1,6 @@
 import logging
 
-from elasticsearch.exceptions import ElasticsearchException
+from elasticsearch.exceptions import ApiError
 
 from data.logs_model.logs_producer import LogSendException
 from data.logs_model.logs_producer.interface import LogProducerInterface
@@ -18,7 +18,7 @@ class ElasticsearchLogsProducer(LogProducerInterface):
     def send(self, logentry):
         try:
             logentry.save()
-        except ElasticsearchException as ex:
+        except ApiError as ex:
             logger.exception("ElasticsearchLogsProducer error sending log to Elasticsearch: %s", ex)
             raise LogSendException(
                 "ElasticsearchLogsProducer error sending log to Elasticsearch: %s" % ex
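
ElasticsearchException no longer exists in elasticsearch-py 8.x. ApiError is the 8.x base class for errors returned by the server (NotFoundError, AuthorizationException, and friends subclass it), making it the closest drop-in for the producer's broad except clause. A hedged sketch of the resulting pattern, with logentry standing in for any elasticsearch-dsl document:

    from elasticsearch.exceptions import ApiError

    def send(logentry):
        # logentry is any elasticsearch-dsl Document instance (hypothetical here).
        try:
            logentry.save()
        except ApiError as ex:
            # 4xx/5xx responses from the server end up here under 8.x.
            raise RuntimeError("error sending log to Elasticsearch: %s" % ex)

Note that transport-level failures (connection errors, timeouts) derive from elastic_transport's TransportError in 8.x, so the replacement clause is somewhat narrower than the 7.x catch-all.
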
12 changes: 12 additions & 0 deletions data/logs_model/test/fake_elasticsearch.py
@@ -8,6 +8,8 @@
 import dateutil.parser
 from httmock import HTTMock, urlmatch
 
+from data.logs_model.test.test_elasticsearch import add_elastic_headers
+
 FAKE_ES_HOST = "fakees"
 
 EMPTY_RESULT = {
@@ -42,6 +44,7 @@ def transform(value, field_name):
 
         return value
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/_template/(.+)", method="GET")
     def get_template(url, request):
         template_name = url[len("/_template/") :]
@@ -50,12 +53,14 @@
 
         return {"status_code": 404}
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/_template/(.+)", method="PUT")
     def put_template(url, request):
         template_name = url[len("/_template/") :]
         templates[template_name] = True
         return {"status_code": 201}
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_doc", method="POST")
     def post_doc(url, request):
         index_name, _ = url.path[1:].split("/")
@@ -75,6 +80,7 @@ def post_doc(url, request):
             ),
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)$", method="DELETE")
     def index_delete(url, request):
         index_name_or_pattern = url.path[1:]
@@ -96,6 +102,7 @@ def index_delete(url, request):
             "content": {"acknowledged": True},
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)$", method="GET")
     def index_lookup(url, request):
         index_name_or_pattern = url.path[1:]
@@ -184,6 +191,7 @@ def _is_match(doc, current_query):
 
         return found, found_index or (index_name_or_pattern.find("*") >= 0)
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_count$", method="POST")
     def count_docs(url, request):
         request = json.loads(request.body)
@@ -203,6 +211,7 @@ def count_docs(url, request):
             "content": json.dumps({"count": len(found)}),
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/_search/scroll$", method="POST")
     def lookup_scroll(url, request):
         request_obj = json.loads(request.body)
@@ -220,6 +229,7 @@ def lookup_scroll(url, request):
             "status_code": 404,
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/_search/scroll$", method="DELETE")
     def delete_scroll(url, request):
         request = json.loads(request.body)
@@ -230,6 +240,7 @@ def delete_scroll(url, request):
             "status_code": 404,
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_search$", method="POST")
     def lookup_docs(url, request):
         query_params = parse_query(url.query)
@@ -383,6 +394,7 @@ def _aggregate(query_config, results):
             "content": json.dumps(final_result),
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST)
     def catchall_handler(url, request):
         print(
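
All of the mocked handlers gain the add_elastic_headers decorator because the 8.x client checks every response for an X-Elastic-Product: Elasticsearch header before parsing it and raises UnsupportedProductError when the header is absent. A sketch of what a decorated handler effectively returns, using a hypothetical /_cluster/health route that is not part of this file:

    import json

    from httmock import urlmatch

    @urlmatch(netloc="fakees", path=r"/_cluster/health", method="GET")
    def cluster_health(url, request):
        return {
            "status_code": 200,
            "content": json.dumps({"status": "green"}),
            # Without this header the 8.x client raises UnsupportedProductError.
            "headers": {"X-Elastic-Product": "Elasticsearch"},
        }
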
3 changes: 2 additions & 1 deletion data/logs_model/test/mock_elasticsearch.py
@@ -315,11 +315,12 @@ def _scroll(d):
             "query": {
                 "range": {"datetime": {"lt": "2018-04-02T00:00:00", "gte": "2018-03-08T00:00:00"}}
             },
+            "size": 1,
         },
     ],
     [{"scroll": "5m", "scroll_id": _scroll_id}],
     [{"scroll": "5m", "scroll_id": _scroll_id}],
-    [{"scroll_id": [_scroll_id]}],
+    [{"scroll_id": _scroll_id}],
 ]
 
 SCROLL_RESPONSES = [SCROLL_CREATE, SCROLL_GET, SCROLL_GET_2, SCROLL_DELETE]
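
Two fixture changes here: the expected search body now carries an explicit "size", and the expected clear-scroll DELETE body holds scroll_id as a plain string rather than a one-element list, which appears to match how the 8.x client serializes a single scroll id. A sketch of the calling side, assuming a hypothetical endpoint:

    from elasticsearch import Elasticsearch

    client = Elasticsearch("http://localhost:9200")  # hypothetical endpoint

    resp = client.search(index="logentry_*", scroll="5m", size=1, query={"match_all": {}})
    client.clear_scroll(scroll_id=resp["_scroll_id"])  # sends {"scroll_id": "<id>"}
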
26 changes: 25 additions & 1 deletion data/logs_model/test/test_elasticsearch.py
@@ -26,6 +26,8 @@
 FAKE_AWS_SECRET_KEY = None
 FAKE_AWS_REGION = None
 
+DEFAULT_SEARCH_SCROLL_SIZE = 1
+
 
 @pytest.fixture()
 def logs_model_config():
@@ -142,6 +144,21 @@ def parse_query(query):
     return {s.split("=")[0]: s.split("=")[1] for s in query.split("&") if s != ""}
 
 
+def add_elastic_headers(response_fn):
+    # Adding special headers to the mocked response
+    # to prevent an UnsupportedProductError exception from occurring
+    # during testing
+    def wrapper(*args, **kwargs):
+        response = response_fn(*args, **kwargs)
+        if isinstance(response, dict):
+            headers = response.get("headers", {})
+            headers["X-Elastic-Product"] = "Elasticsearch"
+            response["headers"] = headers
+        return response
+
+    return wrapper
+
+
 @pytest.fixture()
 def mock_elasticsearch():
     mock = Mock()
@@ -156,6 +173,7 @@ def mock_elasticsearch():
     mock.list_indices.side_effect = NotImplementedError
 
     @urlmatch(netloc=r".*", path=r".*")
+    @add_elastic_headers
     def default(url, req):
         raise Exception(
             "\nurl={}\nmethod={}\nreq.url={}\nheaders={}\nbody={}".format(
@@ -164,25 +182,30 @@ def default(url, req):
             )
         )
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/_template/.*")
+    @add_elastic_headers
     def template(url, req):
         return mock.template(url.query.split("/")[-1], req.body)
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_(\*|[0-9\-]+)")
+    @add_elastic_headers
     def list_indices(url, req):
         return mock.list_indices()
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_[0-9\-]*/_doc")
+    @add_elastic_headers
     def index(url, req):
         index = url.path.split("/")[1]
         body = json.loads(req.body)
 
         return mock.index(index, body)
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_([0-9\-]*|\*)/_count")
+    @add_elastic_headers
     def count(_, req):
         return mock.count(json.loads(req.body))
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/_search/scroll")
+    @add_elastic_headers
     def scroll(url, req):
         if req.method == "DELETE":
             return mock.scroll_delete(json.loads(req.body))
@@ -192,11 +215,12 @@ def scroll(url, req):
         raise NotImplementedError()
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_(\*|[0-9\-]*)/_search")
+    @add_elastic_headers
     def search(url, req):
         if "scroll" in url.query:
             query = parse_query(url.query)
             window_size = query["scroll"]
-            maximum_result_size = int(query["size"])
+            maximum_result_size = int(query.get("size", DEFAULT_SEARCH_SCROLL_SIZE))
             return mock.search_scroll_create(window_size, maximum_result_size, json.loads(req.body))
         elif b"aggs" in req.body:
             return mock.search_aggs(json.loads(req.body))
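
The search handler can no longer count on a "size" query-string parameter; judging by the mock_elasticsearch.py change above, the 8.x DSL sends the page size in the request body, so the handler falls back to DEFAULT_SEARCH_SCROLL_SIZE rather than raising a KeyError. A sketch of the fallback using the parse_query helper defined in this file:

    query = parse_query("scroll=5m")  # -> {"scroll": "5m"}, no "size" key under 8.x
    maximum_result_size = int(query.get("size", DEFAULT_SEARCH_SCROLL_SIZE))  # -> 1
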
4 changes: 2 additions & 2 deletions requirements.txt
@@ -25,8 +25,8 @@ debtcollector==1.22.0
 decorator==5.1.1
 Deprecated==1.2.14
 dumb-init==1.2.5.post1
-elasticsearch==7.6.0
-elasticsearch-dsl==7.0.0
+elasticsearch==8.13.0
+elasticsearch-dsl==8.13.0
 Flask==2.3.2
 Flask-Login==0.6.3
 Flask-Mail==0.9.1
