Skip to content

Commit

Permalink
B OpenNebula/one-aiops#569: fix scaling operation in OneGate
Browse files Browse the repository at this point in the history
  • Loading branch information
MarioRobres committed Dec 18, 2024
1 parent 13347d7 commit c70dacb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
18 changes: 11 additions & 7 deletions lithops/serverless/backends/one/gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,20 @@ def get(self, path):
except ValueError as e:
raise OneGateError(f"Failed to parse JSON response: {e}")

def scale(self, cardinality, role="worker"):
    """
    Make a PUT request to OneGate API to set the cardinality of a service role.

    :param cardinality: value sent as the "cardinality" field of the JSON body
    :param role: role name appended to the "service/role/" URL path
    :raises OneGateError: if the request cannot be sent or the server answers
        with an error status; the HTTP status code is attached when a
        response was received
    """
    url = f"{self.endpoint}/service/role/{role}"
    headers = {
        "X-ONEGATE-TOKEN": self.token,
        "X-ONEGATE-VMID": self.vm_id,
        "Content-Type": "application/json",
    }
    data = {"cardinality": cardinality}
    try:
        response = requests.put(url, headers=headers, json=data)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # A Response object is falsy for 4xx/5xx statuses (its truth value is
        # `Response.ok`), so test against None explicitly; otherwise the
        # status code would be lost precisely for HTTP error responses.
        status_code = e.response.status_code if e.response is not None else None
        raise OneGateError(f"PUT request to {url} failed: {e}", status_code)
23 changes: 12 additions & 11 deletions lithops/serverless/backends/one/one.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,9 @@ def __init__(self, one_config, internal_storage):

def invoke(self, docker_image_name, runtime_memory, job_payload):
# Wait for nodes to become available in Kubernetes
oneke_workers = sum(
1
for role in self.client.get("service").get("SERVICE", {}).get("roles", [])
if role.get("name").lower() == "worker"
)
for role in self.client.get("service").get("SERVICE", {}).get("roles", []):
if role.get("name").lower() == "worker":
oneke_workers = role.get("cardinality")
self._wait_kubernetes_nodes(oneke_workers)

# Scale nodes
Expand Down Expand Up @@ -90,7 +88,10 @@ def clear(self, job_keys=None):
super().clear(job_keys)
super()._get_nodes()

if self.auto_scale in {"all", "down"}:
if (
self.auto_scale in {"all", "down"}
and len(self.nodes) > self.minimum_nodes
):
self._scale_oneke(self.nodes, self.minimum_nodes)

def _check_service_status(self):
Expand Down Expand Up @@ -128,14 +129,14 @@ def _get_kube_config(self):
logger.debug(f"OpenNebula OneKE Kubeconfig: {decoded_kubeconfig}")
return decoded_kubeconfig

def _wait_for_state(self, state):
def _wait_for_state(self, state, timeout=300):
start_time = time.time()
minutes_timeout = int(self.timeout / 60)
minutes_timeout = int(timeout / 60)
logger.info(
f"Waiting for OneKE service to become {state}. "
f"Wait time: {minutes_timeout} minutes"
)
while (elapsed_time := time.time() - start_time) <= self.timeout:
while (elapsed_time := time.time() - start_time) <= timeout:
service_state = (
self.client.get("service", {}).get("SERVICE", {}).get("state")
)
Expand All @@ -146,7 +147,7 @@ def _wait_for_state(self, state):
return
time.sleep(1)
raise OneError(
f"Unable to reach {state} state. OneKE timed out after {self.timeout} seconds. "
f"Unable to reach {state} state. OneKE timed out after {timeout} seconds. "
f"Please retry when OneKE is in the 'RUNNING' state."
)

Expand Down Expand Up @@ -178,7 +179,7 @@ def _scale_oneke(self, nodes, scale_nodes):
"OneKE service is in 'COOLDOWN' state and does not need to be scaled"
)
return
self.client.post(f'service/{service.get("id")}/scale', int(scale_nodes))
self.client.scale(int(scale_nodes))
self._wait_for_state("COOLDOWN")

def deploy_runtime(self, docker_image_name, memory, timeout):
Expand Down

0 comments on commit c70dacb

Please sign in to comment.