-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsubprocess_nonblocking.py
168 lines (139 loc) · 5.62 KB
/
subprocess_nonblocking.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# recipe-440554-1
# This is from http://code.activestate.com/recipes/440554/
# With modifications from Blake Oliver <oliver22213@me.com>
import os
import subprocess
import errno
import time
import sys
PIPE = subprocess.PIPE
# python 3.x does not have a mswindows property in the subprocess module
# therefore, use os/sys instead to determine the OS.
if sys.platform == "win32":
from win32file import ReadFile, WriteFile
from win32pipe import PeekNamedPipe
import msvcrt
else: # unix-like OS
import select
import fcntl
class Popen(subprocess.Popen):
def recv(self, maxsize=None):
return self._recv('stdout', maxsize)
def recv_err(self, maxsize=None):
"""Return up to maxsize characters from stderr."""
return self._recv('stderr', maxsize)
def send_recv(self, input='', maxsize=None):
"""Send input, and receive up to maxsize chars from stdout and stderr. Return a tuple of the form (sent_chars, stdout_output, stderr_output)."""
return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
def get_conn_maxsize(self, which, maxsize):
"""Return a sane number for the maxsize parameter."""
if maxsize is None:
maxsize = 1024
elif maxsize < 1:
maxsize = 1
return getattr(self, which), maxsize
def _close(self, which):
getattr(self, which).close()
setattr(self, which, None)
if sys.platform == "win32":
# define the send method for windows
def send(self, input):
"""Send the given string to the subprocess's stdin stream. Return None if stdin was not directed to a pipe."""
if not self.stdin:
return None
try:
x = msvcrt.get_osfhandle(self.stdin.fileno())
(errCode, written) = WriteFile(x, input)
except ValueError:
return self._close('stdin')
except (subprocess.pywintypes.error, Exception) as why:
if why[0] in (109, errno.ESHUTDOWN):
return self._close('stdin')
raise
return written
def _recv(self, which, maxsize):
"""Return up to maxsize chars from eithe r'stdin' or 'stderr'."""
conn, maxsize = self.get_conn_maxsize(which, maxsize)
if conn is None:
return None
try:
x = msvcrt.get_osfhandle(conn.fileno())
(read, nAvail, nMessage) = PeekNamedPipe(x, 0)
if maxsize < nAvail:
nAvail = maxsize
if nAvail > 0:
(errCode, read) = ReadFile(x, nAvail, None)
except ValueError:
return self._close(which)
except (subprocess.pywintypes.error, Exception) as why:
if why[0] in (109, errno.ESHUTDOWN):
return self._close(which)
raise
if self.universal_newlines:
if sys.version_info.major == 2:
read = self._translate_newlines(read)
else:
read = self._translate_newlines(read, "utf-8", "strict")
return read
else: # define for unix-like OSs
def send(self, input):
if not self.stdin:
return None
if not select.select([], [self.stdin], [], 0)[1]:
return 0
try:
written = os.write(self.stdin.fileno(), input)
except OSError as why:
if why[0] == errno.EPIPE: #broken pipe
return self._close('stdin')
raise
return written
def _recv(self, which, maxsize):
conn, maxsize = self.get_conn_maxsize(which, maxsize)
if conn is None:
return None
flags = fcntl.fcntl(conn, fcntl.F_GETFL)
if not conn.closed:
fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
try:
if not select.select([conn], [], [], 0)[0]:
return ''
r = conn.read(maxsize)
if not r:
return self._close(which)
if self.universal_newlines:
r = self._translate_newlines(r)
return r
finally:
if not conn.closed:
fcntl.fcntl(conn, fcntl.F_SETFL, flags)
def recv_some(subproc, timeout=0.1, throw_exception=True, tries=5, stderr=False):
if tries < 1:
tries = 1
x = time.time()+timeout
data = [] # a list of chunks of data we've received from the pipe
received = '' # string of characters we've received from the pipe
pipe_receive = subproc.recv
if stderr:
pipe_receive = subproc.recv_err
while time.time() < x or received:
received = pipe_receive()
if received == None: # if we didn't get any output
if throw_exception:
raise PipeClosedError
else: # do not raise an exception
break
elif received: # we did get some data
data.append(received)
else: # we probably got an empty string, so the pipe is open but there's nothing new
time.sleep(max((x-time.time())/tries, 0))
# return all of the data concatenated
return ''.join(data)
def send_all(subproc, data):
while len(data):
sent = subproc.send(data)
if sent is None:
raise PipeClosedError
data = buffer(data, sent)
class PipeClosedError(Exception):
pass