Skip to content

Commit

Permalink
shellcmd: simplify serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed Oct 28, 2024
1 parent 0d677b2 commit 39a7405
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 170 deletions.
8 changes: 5 additions & 3 deletions ci/ssh/linux_Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ FROM ubuntu:20.04
RUN --mount=type=cache,target=/var/cache/apt \
rm -f /etc/apt/apt.conf.d/docker-clean \
&& apt-get update \
&& apt-get -y install iputils-ping \
&& apt-get -y install bind9-utils \
&& apt-get -y install wget openssh-server
&& apt-get -y install \
iputils-ping \
bind9-utils \
wget \
openssh-server

ENV MAMBA_ROOT_PREFIX=/opt/conda
ENV PATH=$MAMBA_ROOT_PREFIX/bin:$PATH
Expand Down
1 change: 0 additions & 1 deletion ci/ssh/linux_docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3"
services:
sshd:
image: ipyparallel-sshd
Expand Down
10 changes: 0 additions & 10 deletions ipyparallel/cluster/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1311,7 +1311,6 @@ def start(self, hostname=None, user=None, port=None):
# do some checks that setting are correct
shell_info = self.ssh_sender.get_shell_info()
python_ok = self.ssh_sender.has_python()
ipython_installed = self.ssh_sender.has_ipython_package()
if self.log:
self.log.info(
f"ssh sender object initiated (break_away_support={self.ssh_sender.breakaway_support})"
Expand Down Expand Up @@ -1364,15 +1363,6 @@ def _start_waiting(self):

def wait_one(self, timeout):
python_code = f"from ipyparallel.cluster.launcher import ssh_waitpid; ssh_waitpid({self.pid}, timeout={timeout})"
# full_cmd = (
# self.ssh_cmd
# + self.ssh_args
# # double-quote for ssh
# + [self.location, "--", self.remote_python, "-c", f"'{python_code}'"]
# )
# out = check_output(full_cmd, input=None, start_new_session=True).decode(
# "utf8", "replace"
# )
out = self.ssh_sender.check_output_python_code(python_code)
values = _ssh_outputs(out)
if 'process_running' not in values:
Expand Down
208 changes: 98 additions & 110 deletions ipyparallel/cluster/shellcmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,57 @@

import base64
import inspect
import json
import logging
import pathlib
import re
import shlex
import sys
import time
import warnings
from subprocess import CalledProcessError, Popen, TimeoutExpired, check_output
from tempfile import NamedTemporaryFile

from .shellcmd_receive import ShellCommandReceive
from . import shellcmd_receive

_shell_cmd_receive_tpl = '''
import base64, json
{define_receive}
def _decode(b64_json):
return json.loads(base64.b64decode(b64_json).decode("utf8"))
inputs_encoded = "{inputs_encoded}"
inputs = _decode(inputs_encoded)
receive_parameters=inputs["receive_parameters"]
command_parameters=inputs["command_parameters"]
with ShellCommandReceive(**receive_parameters) as r:
r.{method}(**command_parameters)
'''

_py_detached_tpl = '''
import json
input_json = """{json_params}"""
inputs = json.loads(json_params)
from subprocess import run, PIPE, STDOUT
with open(inputs['input_filename'], 'r') as f:
stdin = f.read()
with open(inputs['output_filename'], 'w') as f:
run(inputs["cmd_args"], input=stdin, stdout=f, stderr=STDOUT, check=True, text=True)
'''


def _encode(inputs):
"""return encoded input parameters
Should be shell escaping-safe
inverse is defined in _shell_cmd_receive_tpl
"""
return base64.b64encode(json.dumps(inputs).encode("utf8")).decode("ascii")


class ShellCommandSend:
Expand Down Expand Up @@ -64,9 +104,11 @@ class ShellCommandSend:
r"__([a-z][a-z0-9_]+)=([a-z0-9\-\.]+)__", re.IGNORECASE
)
receiver_code = pathlib.Path(
inspect.getfile(inspect.getmodule(ShellCommandReceive))
inspect.getfile(shellcmd_receive)
).read_text() # get full code of receiver side
_python_chars_map = str.maketrans({"\\": "\\\\", "'": "\\'"})
receiver_import = (
"from ipyparallel.cluster.shellcmd_receive import ShellCommandReceive"
)

def __init__(
self,
Expand Down Expand Up @@ -98,15 +140,15 @@ def __init__(
if initialize:
self.initialize()

@staticmethod
def _check_output(cmd, **kwargs):
def _check_output(self, cmd, **kwargs):
kwargs.setdefault("text", True)
self.log.debug("check_output %s", cmd)
return check_output(cmd, **kwargs)

@staticmethod
def _runs_successful(cmd):
def _runs_successful(self, cmd):
self.log.debug("Checking if %s runs successfully", cmd)
try:
check_output(cmd)
check_output(cmd, input="")
except CalledProcessError as e:
return False
return True
Expand All @@ -120,14 +162,7 @@ def _as_list(cmd):
else:
raise TypeError(f"Unknown command type: {cmd!r}")

@staticmethod
def _format_for_python(param):
if isinstance(param, str):
return f"'{param.translate(ShellCommandSend._python_chars_map)}'"
else:
return str(param)

def _cmd_start_windows_no_breakaway(self, cmd_args, py_cmd, cmd):
def _cmd_start_windows_no_breakaway(self, cmd_args, py_cmd):
# if windows platform doesn't support breakaway flag (e.g. Github Runner)
# we need to start a detached process (as work-a-round), the runs until the
# 'remote' process has finished. But we cannot directly start the command as detached
Expand All @@ -150,31 +185,15 @@ def _cmd_start_windows_no_breakaway(self, cmd_args, py_cmd, cmd):
f.write(py_cmd)

# simple python code that starts the actual cmd in a detached process
cmd_args_str = ", ".join(f'{self._format_for_python(c)}' for c in cmd_args)
if self.debugging:
detach_log = "~/ipp_shell_detach.log"
tmp = str(cmd_args_str).replace("'", "")
py_detached = (
f"from subprocess import Popen,PIPE;fo=open(r'{fo_name}','w');"
f"fi=open(r'{fi_name}','r');input=fi.read();del fi;"
"from random import randint;from datetime import datetime;ranid=randint(0,999);"
f"log=open(r'{detach_log}','a');log.write(f'{{datetime.now()}} [{{ranid}}] Popen({tmp})\\n');"
"p=Popen(["
+ cmd_args_str
+ "], stdin=PIPE, stdout=fo, stderr=fo, universal_newlines=True);"
"log.write(f'{{datetime.now()}} [{{ranid}}] after Popen\\n');"
"p.stdin.write(input);p.stdin.flush();p.communicate();"
"log.write(f'{{datetime.now()}} [{{ranid}}] after communicate\\n');"
)
else:
py_detached = (
f"from subprocess import Popen,PIPE;fo=open(r'{fo_name}','w');"
f"fi=open(r'{fi_name}','r');input=fi.read();del fi;"
f"p=Popen(["
+ cmd_args_str
+ "], stdin=PIPE, stdout=fo, stderr=fo, universal_newlines=True);"
"p.stdin.write(input);p.stdin.flush();p.communicate()"
)
inputs = dict(
input_filename=fi_name,
output_filename=fo_name,
cmd_args=cmd_args,
)
self.log.debug("Starting detached process with inputs %s", inputs)
input_json = json.dumps(inputs)
py_detached = _py_detached_tpl.format(input_json=input_json)

# now start proxy process detached
self.log.info("[ShellCommandSend._cmd_send] starting detached process...")
self.log.debug("[ShellCommandSend._cmd_send] python command: \n%s", py_cmd)
Expand All @@ -185,9 +204,7 @@ def _cmd_start_windows_no_breakaway(self, cmd_args, py_cmd, cmd):
creationflags=DETACHED_PROCESS,
)
except Exception as e:
self.log.error(
f"[ShellCommandSend._cmd_send] detached process failed: {str(e)}"
)
self.log.error(f"[ShellCommandSend._cmd_send] detached process failed: {e}")
raise e
self.log.info(
"[ShellCommandSend._cmd_send] detached process started successful. Waiting for redirected output (pid)..."
Expand All @@ -202,7 +219,7 @@ def _cmd_start_windows_no_breakaway(self, cmd_args, py_cmd, cmd):
break
if p.poll() is not None:
if p.returncode != 0:
raise CalledProcessError(p.returncode, cmd)
raise CalledProcessError(p.returncode, "cmd_start")
else:
raise Exception(
"internal error: no pid returned, although exit code of process was 0"
Expand All @@ -217,52 +234,36 @@ def _cmd_start_windows_no_breakaway(self, cmd_args, py_cmd, cmd):

return output

def _cmd_send(self, cmd, *args, **kwargs):
if not self.send_receiver_code:
preamble = "from ipyparallel.cluster.shellcmd import ShellCommandReceive\n"
else:
preamble = f"{self.receiver_code}\n"

def _cmd_send(self, method, **kwargs):
# in send receiver mode it is not required that the ipyparallel.cluster.shellcmd
# exists (or is update to date) on the 'other' side of the shell. This is particular
# useful when doing further development without copying the adapted file before each
# test run. Furthermore, the calls are much faster.
receiver_params = []
param_str = ""
receive_params = {}

# make sure that env is a dictionary with only str entries (for key and value; value can be null as well)
if "env" in kwargs and kwargs["env"]:
env = kwargs["env"]
assert isinstance(env, dict)
for key, value in env.items():
if not isinstance(key, str):
raise TypeError(
f"str expected in env dict: inappropriate key type ({key!r})."
)
if value and not isinstance(value, str):
raise TypeError(
f"str expected in env dict: inappropriate value type ({value!r}) for key '{key}'"
)

if self.debugging:
receiver_params.append("debugging=True")
receive_params["debugging"] = True
receive_params["log"] = '~/ipp_shellcmd.log'
if self.breakaway_support is False:
receiver_params.append("use_breakaway=False")
if self.debugging:
receiver_params.append("log='~/ipp_shellcmd.log'")

py_cmd = f"{preamble}\nwith ShellCommandReceive({', '.join(receiver_params)}) as r:\n r.{cmd}("
for a in args:
py_cmd += self._format_for_python(a)
if len(kwargs) > 0:
py_cmd += ", "
py_cmd += ", ".join(
f'{k}={self._format_for_python(v)}' for k, v in kwargs.items()
receive_params["use_breakaway"] = False

py_cmd = _shell_cmd_receive_tpl.format(
define_receive=self.receiver_code
if self.send_receiver_code
else self.receiver_import,
inputs_encoded=_encode(
{
"receive_parameters": receive_params,
"command_parameters": kwargs,
}
),
method=method,
)
py_cmd += ")"
cmd_args = self.shell + self.args + [self.python_path]
if cmd == 'start' and self.breakaway_support is False:
return self._cmd_start_windows_no_breakaway(cmd_args, py_cmd, cmd)
if method == 'start' and self.breakaway_support is False:
return self._cmd_start_windows_no_breakaway(cmd_args, py_cmd)
else:
return self._check_output(cmd_args, universal_newlines=True, input=py_cmd)

Expand Down Expand Up @@ -387,14 +388,6 @@ def has_python(self, python_path=None):
cmd = self.shell + self.args + [python_path, '--version']
return self._runs_successful(cmd)

def has_ipython_package(self):
"""Check if ipython package is installed in the remote python installation"""
assert self.shell_info # make sure that initialize was called already
cmd = (
self.shell + self.args + [self.python_path, "-m", "pip", "show", "ipython"]
)
return self._runs_successful(cmd)

def check_output(self, cmd, **kwargs):
"""subprocess.check_output call using the shell connection
:param cmd: command (str or list of strs) that should be executed
Expand Down Expand Up @@ -442,13 +435,8 @@ def cmd_start(self, cmd, env=None, output_file=None):
"""
# join commands into a single parameter. otherwise
assert self.shell_info # make sure that initialize was called already
if isinstance(cmd, str):
paramlist = shlex.split(cmd)
else:
paramlist = self._as_list(cmd)

return self._get_pid(
self._cmd_send("cmd_start", paramlist, env=env, output_file=output_file)
self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file)
)

def cmd_start_python_module(self, module_params, env=None, output_file=None):
Expand All @@ -459,9 +447,9 @@ def cmd_start_python_module(self, module_params, env=None, output_file=None):
:return: pid of started process
"""
assert self.shell_info # make sure that initialize was called already
paramlist = [self.python_path, "-m"] + self._as_list(module_params)
cmd = [self.python_path, "-m"] + self._as_list(module_params)
return self._get_pid(
self._cmd_send("cmd_start", paramlist, env=env, output_file=output_file)
self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file)
)

def cmd_start_python_code(self, python_code, env=None, output_file=None):
Expand All @@ -479,15 +467,15 @@ def cmd_start_python_code(self, python_code, env=None, output_file=None):
py_cmd = f'import base64;exec(base64.b64decode({encoded}).decode())'
if self._win:
py_cmd = f'"{py_cmd}"'
paramlist = [self.python_path, "-c", py_cmd]
cmd = [self.python_path, "-c", py_cmd]
return self._get_pid(
self._cmd_send("cmd_start", paramlist, env=env, output_file=output_file)
self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file)
)

def cmd_running(self, pid):
"""check if given (remote) pid is running"""
assert self.shell_info # make sure that initialize was called already
output = self._cmd_send("cmd_running", pid)
output = self._cmd_send("cmd_running", pid=pid)
# check output
if "__running=1__" in output:
return True
Expand All @@ -502,24 +490,24 @@ def cmd_kill(self, pid, sig=None):
"""kill (remote) process with the given pid"""
assert self.shell_info # make sure that initialize was called already
if sig:
self._cmd_send("cmd_kill", pid, sig=int(sig))
self._cmd_send("cmd_kill", pid=pid, sig=int(sig))
else:
self._cmd_send("cmd_kill", pid)
self._cmd_send("cmd_kill", pid=pid)

def cmd_mkdir(self, p):
def cmd_mkdir(self, path):
"""make directory recursively"""
assert self.shell_info # make sure that initialize was called already
self._cmd_send("cmd_mkdir", p)
self._cmd_send("cmd_mkdir", path=path)

def cmd_rmdir(self, p):
def cmd_rmdir(self, path):
"""remove directory recursively"""
assert self.shell_info # make sure that initialize was called already
self._cmd_send("cmd_rmdir", p)
self._cmd_send("cmd_rmdir", path=path)

def cmd_exists(self, p):
def cmd_exists(self, path):
"""check if file/path exists"""
assert self.shell_info # make sure that initialize was called already
output = self._cmd_send("cmd_exists", p)
output = self._cmd_send("cmd_exists", path=path)
# check output
if "__exists=1__" in output:
return True
Expand All @@ -530,7 +518,7 @@ def cmd_exists(self, p):
f"Unexpected output ({output}) returned from by the exists shell command"
)

def cmd_remove(self, p):
def cmd_remove(self, path):
"""delete remote file"""
assert self.shell_info # make sure that initialize was called already
output = self._cmd_send("cmd_remove", p)
output = self._cmd_send("cmd_remove", path=path)
Loading

0 comments on commit 39a7405

Please sign in to comment.