Skip to content

Commit

Permalink
Fix #645 #671 asyncio loop support
Browse files Browse the repository at this point in the history
  • Loading branch information
Shougo committed Mar 12, 2018
1 parent c3c9406 commit 47b86cf
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 101 deletions.
2 changes: 1 addition & 1 deletion doc/deoplete.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ You can enable Python3 interface with pip: >
pip3 install neovim
Note: deoplete needs neovim-python ver.0.1.8+.
Note: deoplete needs neovim-python ver.0.2.4+.
You need update neovim-python module.
>
pip3 install --upgrade neovim
Expand Down
10 changes: 7 additions & 3 deletions rplugin/python3/deoplete/deoplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

from deoplete import logger
from deoplete.parent import Parent
from deoplete.util import (error_tb, find_rplugins)
# from deoplete.util import error
from deoplete.util import (error_tb, find_rplugins, error)


class Deoplete(logger.LoggingMixin):
Expand All @@ -27,6 +26,10 @@ def __init__(self, vim):
self._max_parents = max(
[1, self._vim.vars['deoplete#num_processes']])

if self._max_parents > 1 and not hasattr(self._vim, 'loop'):
error(self._vim, 'neovim-python 0.2.4+ is required.')
return

# Enable logging before "Init" for more information, and e.g.
# deoplete-jedi picks up the log filename from deoplete's handler in
# its on_init.
Expand Down Expand Up @@ -82,7 +85,8 @@ def completion_begin(self, context):
# Check the previous completion
prev_candidates = context['vars'][
'deoplete#_prev_completion']['candidates']
if context['event'] == 'Async' and candidates == prev_candidates:
if (context['event'] == 'Async' and
not prev_candidates and candidates == prev_candidates):

This comment has been minimized.

Copy link
@blueyed

blueyed Mar 12, 2018

Contributor

The condition looks strange. Should the last and be an or?

This comment has been minimized.

Copy link
@Shougo

Shougo Mar 12, 2018

Author Owner

I don't understand.
Please describe it more clearly.

This comment has been minimized.

Copy link
@blueyed

blueyed Mar 12, 2018

Contributor

and not prev_candidates and candidates == prev_candidates is the same as and not prev_candidates and not prev_candidates (given lists).
Maybe the Check the previous completion comment above should get extended to what this check is supposed to do.

This comment has been minimized.

Copy link
@Shougo

Shougo Mar 12, 2018

Author Owner

I get it.
I have fixed it.

return

# error(self._vim, candidates)
Expand Down
84 changes: 58 additions & 26 deletions rplugin/python3/deoplete/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
# ============================================================================

import time
import os
import msgpack
import subprocess
from functools import partial
from queue import Queue

from deoplete import logger
from deoplete.process import Process
Expand All @@ -17,10 +22,20 @@ def __init__(self, vim, context):
self.name = 'parent'

self._vim = vim
self._proc = None
self._hnd = None
self._stdin = None
self._child = None
self._queue_id = ''
self._prev_pos = []
self._queue_in = Queue()
self._queue_out = Queue()
self._packer = msgpack.Packer(
use_bin_type=True,
encoding='utf-8',
unicode_errors='surrogateescape')
self._unpacker = msgpack.Unpacker(
encoding='utf-8',
unicode_errors='surrogateescape')
self._start_process(context)

def enable_logging(self):
Expand Down Expand Up @@ -65,48 +80,65 @@ def merge_results(self, context):

def on_event(self, context):
self._put('on_event', [context])
if context['event'] == 'VimLeavePre':
self._stop_process()

def _start_process(self, context):
if self._vim.vars['deoplete#num_processes'] > 1:
# Parallel
python3 = self._vim.vars.get('python3_host_prog', 'python3')
self._proc = Process(
[python3, context['dp_main'],
self._vim.vars['deoplete#_serveraddr']],
context, context['cwd'])

startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

self._hnd = self._vim.loop.create_task(
self._vim.loop.subprocess_exec(
partial(Process, self),
self._vim.vars.get('python3_host_prog', 'python3'),
context['dp_main'],
self._vim.vars['deoplete#_serveraddr'],
stderr=None, cwd=context['cwd'], startupinfo=startupinfo))
else:
# Serial
from deoplete.child import Child
self._child = Child(self._vim)

def _stop_process(self):
if self._proc:
self._proc.kill()
self._proc = None

def _put(self, name, args):
queue_id = str(time.time())

if self._proc:
if self._child:
return self._child.main(name, args, queue_id)

if not self._hnd:
return None

msg = self._packer.pack({
'name': name, 'args': args, 'queue_id': queue_id
})
self._queue_in.put(msg)

if self._stdin:
try:
self._proc.write({
'name': name, 'args': args, 'queue_id': queue_id
})
while not self._queue_in.empty():
self._stdin.write(self._queue_in.get_nowait())
except BrokenPipeError as e:
error_tb(self._vim, 'Crash in child process')
error(self._vim, 'stderr=' + str(self._proc.read_error()))
self._proc.kill()
return queue_id
elif self._child:
return self._child.main(name, args, queue_id)
else:
return None
self._hnd = None
return queue_id

def _get(self, queue_id):
if not self._proc:
if not self._hnd:
return []

return [x for x in self._proc.communicate(0.02)
if x['queue_id'] == queue_id]
outs = []
while not self._queue_out.empty():
outs.append(self._queue_out.get_nowait())
return [x for x in outs if x['queue_id'] == queue_id]

def _on_connection(self, transport):
self._stdin = transport.get_pipe_transport(0)

def _on_output(self, fd, data):
self._unpacker.feed(data)
for child_out in self._unpacker:
self._queue_out.put(child_out)
82 changes: 11 additions & 71 deletions rplugin/python3/deoplete/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,20 @@
# License: MIT license
# ============================================================================

import subprocess
import os
import msgpack
from threading import Thread
from queue import Queue
from time import time, sleep
import asyncio


class Process(object):
def __init__(self, commands, context, cwd):
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self._proc = subprocess.Popen(commands,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
startupinfo=startupinfo,
cwd=cwd)
self._context = context
self._packer = msgpack.Packer(
use_bin_type=True,
encoding='utf-8',
unicode_errors='surrogateescape')
self._unpacker = msgpack.Unpacker(
encoding='utf-8',
unicode_errors='surrogateescape')
self._queue_out = Queue()
self._thread = Thread(target=self.enqueue_output)
self._thread.start()
class Process(asyncio.SubprocessProtocol):

def kill(self):
if not self._proc:
return
self._proc.kill()
self._proc.wait()
self._proc = None
self._queue_out = None
self._thread.join(1.0)
self._thread = None
def __init__(self, plugin):
self._plugin = plugin
self._vim = plugin._vim

def enqueue_output(self):
while self._proc and self._proc.stdout:
b = self._proc.stdout.raw.read(102400)
if b is None:
continue
if b == b'':
# EOF
break
self._unpacker.feed(b)
for child_out in self._unpacker:
self._queue_out.put(child_out)
def connection_made(self, transport):
self._vim.async_call(self._plugin._on_connection, transport)

def communicate(self, timeout):
if not self._proc:
return []
def pipe_data_received(self, fd, data):
self._vim.async_call(self._plugin._on_output, fd, data)

end = time() + timeout
while self._queue_out.empty() and time() < end:
sleep(0.005)

outs = []
while not self._queue_out.empty():
outs.append(self._queue_out.get_nowait())
return outs

def read_error(self):
if not self._proc or not self._proc.stderr:
return ''
return self._proc.stderr.read()

def write(self, expr):
if not self._proc or not self._proc.stdin:
return
self._proc.stdin.write(self._packer.pack(expr))
self._proc.stdin.flush()
def process_exited(self):
pass

0 comments on commit 47b86cf

Please sign in to comment.