From 7e2416f757889b4333d151e57c95bab4c4b059c5 Mon Sep 17 00:00:00 2001 From: Jeremiah Uy Date: Tue, 10 Nov 2020 11:25:22 +0100 Subject: [PATCH] Feature/is alive check and api error handling (#10) * UPDATE: is alive check to change when unable to connect to k8s * UPDATE: to handle api exception --- collector/nais_stream.py | 37 ++++++++++++++++++++++++------------- main.py | 15 +++++++++++++-- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/collector/nais_stream.py b/collector/nais_stream.py index ba120dc..72bd7df 100644 --- a/collector/nais_stream.py +++ b/collector/nais_stream.py @@ -1,6 +1,9 @@ import re from typing import Callable + +from kubernetes.client import ApiException + from collector.nais import init_nais_logging logger = init_nais_logging() @@ -18,24 +21,32 @@ def __str__(self): def resource_version(self): return self._resource_version -class NaisStream: +class NaisStream: def __init__(self, callback_function: Callable, v1, w): self.callback_function = callback_function self.v1 = v1 self.w = w def watch(self, **kwargs): - for event in self.w.stream(self.v1.list_cluster_custom_object, - group="nais.io", - version="v1alpha1", - plural="applications", - **kwargs): - if event["type"] not in ["ERROR", "DELETED"]: - self.callback_function(event) - elif event["type"] == "ERROR" and event["object"]["code"] == 410: - logger.warning("") - logger.warning(event) - logger.warning("") - resource_version = event["object"]["message"].split('(') + try: + for event in self.w.stream(self.v1.list_cluster_custom_object, + group="nais.io", + version="v1alpha1", + plural="applications", + **kwargs): + if event["type"] not in ["ERROR", "DELETED"]: + self.callback_function(event) + elif event["type"] == "ERROR" and event["object"]["code"] == 410: + logger.warning("") + logger.warning(event) + logger.warning("") + resource_version = event["object"]["message"].split('(') + raise TooOldResourceVersionError(resource_version[1][:-1]) + except ApiException as api_exc: + logger.warning(f"\n\nAPI exception: {api_exc.status}: Reason: {api_exc.reason}\n\n") + if api_exc.status == 410: + resource_version = api_exc.reason.split('(') raise TooOldResourceVersionError(resource_version[1][:-1]) + else: + raise api_exc diff --git a/main.py b/main.py index b65558c..9f3a8be 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,8 @@ import backoff # noinspection PyPackageRequirements from fastapi import FastAPI +from starlette.responses import JSONResponse +from starlette import status from collector.nais import init_nais_logging from collector.kube_api import watch_nais_apps from kubernetes import client, config @@ -12,6 +14,7 @@ # initiating logging logger = init_nais_logging() app = FastAPI() +is_alive = True @backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_tries=10) @@ -43,7 +46,11 @@ def init_kube_client(): def watch_nais_task() -> None: - watch_nais_apps(watch_nais_callback) + try: + watch_nais_apps(watch_nais_callback) + except: + global is_alive + is_alive = False @app.on_event('startup') @@ -71,7 +78,11 @@ def root(): @app.get('/is-alive') def is_healthy(): - return 'OK' + if is_alive: + return 'OK' + else: + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"Error": "K8s stream stopped"}) @app.get('/is-ready')