From 018714fe05a12436e767af32b0a1d867b2acd9d1 Mon Sep 17 00:00:00 2001 From: "tetsuya.hasegawa" Date: Sat, 8 Jul 2023 18:23:20 +0900 Subject: [PATCH 1/4] add jupyter server api runtime --- rplugin/python3/magma/jupyter_server_api.py | 114 ++++++++++++++++++++ rplugin/python3/magma/runtime.py | 32 +++--- rplugin/python3/magma/runtime_state.py | 7 ++ 3 files changed, 140 insertions(+), 13 deletions(-) create mode 100644 rplugin/python3/magma/jupyter_server_api.py create mode 100644 rplugin/python3/magma/runtime_state.py diff --git a/rplugin/python3/magma/jupyter_server_api.py b/rplugin/python3/magma/jupyter_server_api.py new file mode 100644 index 0000000..076ad35 --- /dev/null +++ b/rplugin/python3/magma/jupyter_server_api.py @@ -0,0 +1,114 @@ +import json +import uuid +import re +from queue import Empty as EmptyQueueException +from typing import Any, Dict +from threading import Thread +from queue import Queue +from urllib.parse import urlparse + +import requests +import websocket + +from magma.runtime_state import RuntimeState + + +class JupyterAPIClient: + def __init__(self, + url: str, + kernel_info: Dict[str, Any], + headers: Dict[str, str]): + self._base_url = url + self._kernel_info = kernel_info + self._headers = headers + + self._recv_queue: Queue[Dict[str, Any]] = Queue() + + def wait_for_ready(self, timeout=0): + pass + + def start_channels(self) -> None: + parsed_url = urlparse(self._base_url) + self._socket = websocket.create_connection(f"ws://{parsed_url.hostname}:{parsed_url.port}" + f"/api/kernels/{self._kernel_info['id']}/channels", + header=self._headers, + ) + self._kernel_api_base = f"{self._base_url}/api/kernels/{self._kernel_info['id']}" + + self._iopub_recv_thread = Thread(target=self._recv_message) + self._iopub_recv_thread.start() + + def _recv_message(self) -> None: + while True: + response = json.loads(self._socket.recv()) + self._recv_queue.put(response) + + def get_iopub_msg(self, **kwargs): + if self._recv_queue.empty(): + raise EmptyQueueException + + response = self._recv_queue.get() + + return response + + def execute(self, code: str): + header = { + 'msg_type': 'execute_request', + 'msg_id': uuid.uuid1().hex, + 'session': uuid.uuid1().hex + } + + message = json.dumps({ + 'header': header, + 'parent_header': header, + 'metadata': {}, + 'content': { + 'code': code, + 'silent': False + } + }) + self._socket.send(message) + + def shutdown(self): + requests.delete(self._kernel_api_base, + headers=self._headers) + self._socket.close() + + +class JupyterAPIManager: + def __init__(self, + url: str, + ): + parsed_url = urlparse(url) + self._base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" + + token_part = re.search(r"token=(.*)", parsed_url.query) + + if token_part: + token = token_part.groups()[0] + self._headers = {'Authorization': 'token ' + token} + else: + # Run notebook with --NotebookApp.disable_check_xsrf="True". + self._headers = {} + + def start_kernel(self) -> None: + url = f"{self._base_url}/api/kernels" + response = requests.post(url, + headers=self._headers) + self._kernel_info = json.loads(response.text) + assert "id" in self._kernel_info, "Could not connect to Jupyter Server API. The URL specified may be incorrect." + self._kernel_api_base = f"{url}/{self._kernel_info['id']}" + + def client(self) -> JupyterAPIClient: + return JupyterAPIClient(url=self._base_url, + kernel_info=self._kernel_info, + headers=self._headers) + + def interrupt_kernel(self) -> None: + requests.post(f"{self._kernel_api_base}/interrupt", + headers=self._headers) + + def restart_kernel(self) -> None: + self.state = RuntimeState.STARTING + requests.post(f"{self._kernel_api_base}/restart", + headers=self._headers) diff --git a/rplugin/python3/magma/runtime.py b/rplugin/python3/magma/runtime.py index 7a994e6..6af9eb2 100644 --- a/rplugin/python3/magma/runtime.py +++ b/rplugin/python3/magma/runtime.py @@ -1,5 +1,4 @@ -from typing import Optional, Tuple, List, Dict, Generator, IO, Any -from enum import Enum +from typing import Optional, Tuple, List, Dict, Generator, IO, Any, Union from contextlib import contextmanager from queue import Empty as EmptyQueueException import os @@ -8,6 +7,7 @@ import jupyter_client +from magma.runtime_state import RuntimeState from magma.options import MagmaOptions from magma.outputchunks import ( Output, @@ -18,20 +18,15 @@ to_outputchunk, clean_up_text ) - - -class RuntimeState(Enum): - STARTING = 0 - IDLE = 1 - RUNNING = 2 +from magma.jupyter_server_api import JupyterAPIClient, JupyterAPIManager class JupyterRuntime: state: RuntimeState kernel_name: str - kernel_manager: jupyter_client.KernelManager - kernel_client: jupyter_client.KernelClient + kernel_manager: Union[jupyter_client.KernelManager, JupyterAPIManager] + kernel_client: Union[jupyter_client.KernelClient, JupyterAPIClient] allocated_files: List[str] @@ -41,7 +36,18 @@ def __init__(self, kernel_name: str, options: MagmaOptions): self.state = RuntimeState.STARTING self.kernel_name = kernel_name - if ".json" not in self.kernel_name: + if kernel_name.startswith("http://"): + self.external_kernel = True + self.kernel_manager = JupyterAPIManager(kernel_name) + self.kernel_manager.start_kernel() + self.kernel_client = self.kernel_manager.client() + self.kernel_client.start_channels() + + self.allocated_files = [] + + self.options = options + + elif ".json" not in self.kernel_name: self.external_kernel = True self.kernel_manager = jupyter_client.manager.KernelManager( @@ -202,8 +208,8 @@ def tick(self, output: Optional[Output]) -> bool: assert isinstance( self.kernel_client, jupyter_client.blocking.client.BlockingKernelClient, - ) - + ) or isinstance( + self.kernel_client, JupyterAPIClient) if not self.is_ready(): try: self.kernel_client.wait_for_ready(timeout=0) diff --git a/rplugin/python3/magma/runtime_state.py b/rplugin/python3/magma/runtime_state.py new file mode 100644 index 0000000..123041d --- /dev/null +++ b/rplugin/python3/magma/runtime_state.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class RuntimeState(Enum): + STARTING = 0 + IDLE = 1 + RUNNING = 2 From 46f18cce57228dcd8fa6a1e72ab4808576a3e8eb Mon Sep 17 00:00:00 2001 From: "tetsuya.hasegawa" Date: Sat, 8 Jul 2023 18:59:16 +0900 Subject: [PATCH 2/4] add wait_for_ready --- rplugin/python3/magma/jupyter_server_api.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rplugin/python3/magma/jupyter_server_api.py b/rplugin/python3/magma/jupyter_server_api.py index 076ad35..00a18a0 100644 --- a/rplugin/python3/magma/jupyter_server_api.py +++ b/rplugin/python3/magma/jupyter_server_api.py @@ -1,6 +1,7 @@ import json import uuid import re +import time from queue import Empty as EmptyQueueException from typing import Any, Dict from threading import Thread @@ -24,8 +25,13 @@ def __init__(self, self._recv_queue: Queue[Dict[str, Any]] = Queue() - def wait_for_ready(self, timeout=0): - pass + def wait_for_ready(self, **kwargs): + while True: + response = requests.get(self._kernel_api_base, + headers=self._headers) + response = json.loads(response.text) + if response["execution_state"] == "idle": + return def start_channels(self) -> None: parsed_url = urlparse(self._base_url) From 2b38121a46853ce7ef64cf5a46e9bcd37679ec66 Mon Sep 17 00:00:00 2001 From: "tetsuya.hasegawa" Date: Sat, 8 Jul 2023 21:37:33 +0900 Subject: [PATCH 3/4] Add https to the conditions for using the jupyter server api --- rplugin/python3/magma/runtime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rplugin/python3/magma/runtime.py b/rplugin/python3/magma/runtime.py index 6af9eb2..c5b1c9b 100644 --- a/rplugin/python3/magma/runtime.py +++ b/rplugin/python3/magma/runtime.py @@ -36,7 +36,7 @@ def __init__(self, kernel_name: str, options: MagmaOptions): self.state = RuntimeState.STARTING self.kernel_name = kernel_name - if kernel_name.startswith("http://"): + if kernel_name.startswith("http://") or kernel_name.startswith("https://"): self.external_kernel = True self.kernel_manager = JupyterAPIManager(kernel_name) self.kernel_manager.start_kernel() From 2aa12bf6bd266aade6f33dbf6a17d61a2690ab38 Mon Sep 17 00:00:00 2001 From: opqrstuvcut Date: Mon, 10 Jul 2023 17:39:08 +0900 Subject: [PATCH 4/4] fix wait condition --- rplugin/python3/magma/jupyter_server_api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rplugin/python3/magma/jupyter_server_api.py b/rplugin/python3/magma/jupyter_server_api.py index 00a18a0..bb5b9de 100644 --- a/rplugin/python3/magma/jupyter_server_api.py +++ b/rplugin/python3/magma/jupyter_server_api.py @@ -30,8 +30,10 @@ def wait_for_ready(self, **kwargs): response = requests.get(self._kernel_api_base, headers=self._headers) response = json.loads(response.text) - if response["execution_state"] == "idle": + if response["execution_state"] in ("idle", "starting"): return + time.sleep(0.1) + def start_channels(self) -> None: parsed_url = urlparse(self._base_url)