diff --git a/lithops/serverless/backends/one/gate.py b/lithops/serverless/backends/one/gate.py
index 94153f3d..c0b85995 100644
--- a/lithops/serverless/backends/one/gate.py
+++ b/lithops/serverless/backends/one/gate.py
@@ -49,16 +49,20 @@ def get(self, path):
         except ValueError as e:
             raise OneGateError(f"Failed to parse JSON response: {e}")

-    def post(self, path, cardinality, role="worker", force=False):
+    def scale(self, cardinality, role="worker"):
         """
-        Make a POST request to OneGate API.
+        Make a PUT request to OneGate API.
         """
-        url = f"{self.endpoint}/{path}"
-        headers = {"X-ONEGATE-TOKEN": self.token, "X-ONEGATE-VMID": self.vm_id}
-        data = {"role_name": role, "cardinality": cardinality, "force": force}
+        url = f"{self.endpoint}/service/role/{role}"
+        headers = {
+            "X-ONEGATE-TOKEN": self.token,
+            "X-ONEGATE-VMID": self.vm_id,
+            "Content-Type": "application/json",
+        }
+        data = {"cardinality": cardinality}
         try:
-            response = requests.post(url, headers=headers, json=data)
+            response = requests.put(url, headers=headers, json=data)
             response.raise_for_status()
         except requests.exceptions.RequestException as e:
             status_code = e.response.status_code if e.response else None
-            raise OneGateError(f"POST request to {url} failed: {e}", status_code)
+            raise OneGateError(f"PUT request to {url} failed: {e}", status_code)
diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py
index 8a5e4e89..a4b1c41f 100644
--- a/lithops/serverless/backends/one/one.py
+++ b/lithops/serverless/backends/one/one.py
@@ -54,11 +54,9 @@ def __init__(self, one_config, internal_storage):

     def invoke(self, docker_image_name, runtime_memory, job_payload):
         # Wait for nodes to become available in Kubernetes
-        oneke_workers = sum(
-            1
-            for role in self.client.get("service").get("SERVICE", {}).get("roles", [])
-            if role.get("name").lower() == "worker"
-        )
+        for role in self.client.get("service").get("SERVICE", {}).get("roles", []):
+            if role.get("name").lower() == "worker":
+                oneke_workers = role.get("cardinality")
         self._wait_kubernetes_nodes(oneke_workers)

         # Scale nodes
@@ -90,7 +88,10 @@ def clear(self, job_keys=None):
         super().clear(job_keys)
         super()._get_nodes()

-        if self.auto_scale in {"all", "down"}:
+        if (
+            self.auto_scale in {"all", "down"}
+            and len(self.nodes) > self.minimum_nodes
+        ):
             self._scale_oneke(self.nodes, self.minimum_nodes)

     def _check_service_status(self):
@@ -128,14 +129,14 @@ def _get_kube_config(self):
         logger.debug(f"OpenNebula OneKE Kubeconfig: {decoded_kubeconfig}")
         return decoded_kubeconfig

-    def _wait_for_state(self, state):
+    def _wait_for_state(self, state, timeout=300):
         start_time = time.time()
-        minutes_timeout = int(self.timeout / 60)
+        minutes_timeout = int(timeout / 60)
         logger.info(
             f"Waiting for OneKE service to become {state}. "
             f"Wait time: {minutes_timeout} minutes"
         )
-        while (elapsed_time := time.time() - start_time) <= self.timeout:
+        while (elapsed_time := time.time() - start_time) <= timeout:
             service_state = (
                 self.client.get("service", {}).get("SERVICE", {}).get("state")
             )
@@ -146,7 +147,7 @@ def _wait_for_state(self, state):
                 return
             time.sleep(1)
         raise OneError(
-            f"Unable to reach {state} state. OneKE timed out after {self.timeout} seconds. "
+            f"Unable to reach {state} state. OneKE timed out after {timeout} seconds. "
             f"Please retry when OneKE is in the 'RUNNING' state."
         )
@@ -178,7 +179,7 @@ def _scale_oneke(self, nodes, scale_nodes):
                 "OneKE service is in 'COOLDOWN' state and does not need to be scaled"
             )
             return
-        self.client.post(f'service/{service.get("id")}/scale', int(scale_nodes))
+        self.client.scale(int(scale_nodes))
         self._wait_for_state("COOLDOWN")

     def deploy_runtime(self, docker_image_name, memory, timeout):
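
Note (not part of the patch): a minimal standalone sketch of the OneGate request that the new OneGateClient.scale() helper issues, mirroring the PUT built in gate.py above. The endpoint, token, VM id, and cardinality below are illustrative placeholders, not values from this repository.

    import requests

    # Illustrative values only; in gate.py these come from the client instance.
    endpoint = "http://onegate:5030"   # hypothetical OneGate endpoint
    token = "<ONEGATE_TOKEN>"          # hypothetical service token
    vm_id = "<VM_ID>"                  # hypothetical VM id

    # PUT /service/role/<role> with the desired cardinality rescales that role.
    response = requests.put(
        f"{endpoint}/service/role/worker",
        headers={
            "X-ONEGATE-TOKEN": token,
            "X-ONEGATE-VMID": vm_id,
            "Content-Type": "application/json",
        },
        json={"cardinality": 3},       # desired number of worker VMs
    )
    response.raise_for_status()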