diff --git a/doc/deoplete.txt b/doc/deoplete.txt index f9ecd77b..6cad145d 100644 --- a/doc/deoplete.txt +++ b/doc/deoplete.txt @@ -64,7 +64,7 @@ You can enable Python3 interface with pip: > pip3 install neovim -Note: deoplete needs neovim-python ver.0.1.8+. +Note: deoplete needs neovim-python ver.0.2.4+. You need update neovim-python module. > pip3 install --upgrade neovim diff --git a/rplugin/python3/deoplete/deoplete.py b/rplugin/python3/deoplete/deoplete.py index 6bbbc42a..24e43bb1 100644 --- a/rplugin/python3/deoplete/deoplete.py +++ b/rplugin/python3/deoplete/deoplete.py @@ -6,8 +6,7 @@ from deoplete import logger from deoplete.parent import Parent -from deoplete.util import (error_tb, find_rplugins) -# from deoplete.util import error +from deoplete.util import (error_tb, find_rplugins, error) class Deoplete(logger.LoggingMixin): @@ -27,6 +26,10 @@ def __init__(self, vim): self._max_parents = max( [1, self._vim.vars['deoplete#num_processes']]) + if self._max_parents > 1 and not hasattr(self._vim, 'loop'): + error(self._vim, 'neovim-python 0.2.4+ is required.') + return + # Enable logging before "Init" for more information, and e.g. # deoplete-jedi picks up the log filename from deoplete's handler in # its on_init. @@ -82,7 +85,8 @@ def completion_begin(self, context): # Check the previous completion prev_candidates = context['vars'][ 'deoplete#_prev_completion']['candidates'] - if context['event'] == 'Async' and candidates == prev_candidates: + if (context['event'] == 'Async' and + not prev_candidates and candidates == prev_candidates): return # error(self._vim, candidates) diff --git a/rplugin/python3/deoplete/parent.py b/rplugin/python3/deoplete/parent.py index a22b9798..2ba08aea 100644 --- a/rplugin/python3/deoplete/parent.py +++ b/rplugin/python3/deoplete/parent.py @@ -5,6 +5,11 @@ # ============================================================================ import time +import os +import msgpack +import subprocess +from functools import partial +from queue import Queue from deoplete import logger from deoplete.process import Process @@ -17,10 +22,20 @@ def __init__(self, vim, context): self.name = 'parent' self._vim = vim - self._proc = None + self._hnd = None + self._stdin = None self._child = None self._queue_id = '' self._prev_pos = [] + self._queue_in = Queue() + self._queue_out = Queue() + self._packer = msgpack.Packer( + use_bin_type=True, + encoding='utf-8', + unicode_errors='surrogateescape') + self._unpacker = msgpack.Unpacker( + encoding='utf-8', + unicode_errors='surrogateescape') self._start_process(context) def enable_logging(self): @@ -65,48 +80,65 @@ def merge_results(self, context): def on_event(self, context): self._put('on_event', [context]) - if context['event'] == 'VimLeavePre': - self._stop_process() def _start_process(self, context): if self._vim.vars['deoplete#num_processes'] > 1: # Parallel - python3 = self._vim.vars.get('python3_host_prog', 'python3') - self._proc = Process( - [python3, context['dp_main'], - self._vim.vars['deoplete#_serveraddr']], - context, context['cwd']) + + startupinfo = None + if os.name == 'nt': + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + + self._hnd = self._vim.loop.create_task( + self._vim.loop.subprocess_exec( + partial(Process, self), + self._vim.vars.get('python3_host_prog', 'python3'), + context['dp_main'], + self._vim.vars['deoplete#_serveraddr'], + stderr=None, cwd=context['cwd'], startupinfo=startupinfo)) else: # Serial from deoplete.child import Child self._child = Child(self._vim) - def _stop_process(self): - if self._proc: - self._proc.kill() - self._proc = None - def _put(self, name, args): queue_id = str(time.time()) - if self._proc: + if self._child: + return self._child.main(name, args, queue_id) + + if not self._hnd: + return None + + msg = self._packer.pack({ + 'name': name, 'args': args, 'queue_id': queue_id + }) + self._queue_in.put(msg) + + if self._stdin: try: - self._proc.write({ - 'name': name, 'args': args, 'queue_id': queue_id - }) + while not self._queue_in.empty(): + self._stdin.write(self._queue_in.get_nowait()) except BrokenPipeError as e: error_tb(self._vim, 'Crash in child process') error(self._vim, 'stderr=' + str(self._proc.read_error())) - self._proc.kill() - return queue_id - elif self._child: - return self._child.main(name, args, queue_id) - else: - return None + self._hnd = None + return queue_id def _get(self, queue_id): - if not self._proc: + if not self._hnd: return [] - return [x for x in self._proc.communicate(0.02) - if x['queue_id'] == queue_id] + outs = [] + while not self._queue_out.empty(): + outs.append(self._queue_out.get_nowait()) + return [x for x in outs if x['queue_id'] == queue_id] + + def _on_connection(self, transport): + self._stdin = transport.get_pipe_transport(0) + + def _on_output(self, fd, data): + self._unpacker.feed(data) + for child_out in self._unpacker: + self._queue_out.put(child_out) diff --git a/rplugin/python3/deoplete/process.py b/rplugin/python3/deoplete/process.py index c85e3d48..54ac3e72 100644 --- a/rplugin/python3/deoplete/process.py +++ b/rplugin/python3/deoplete/process.py @@ -4,80 +4,20 @@ # License: MIT license # ============================================================================ -import subprocess -import os -import msgpack -from threading import Thread -from queue import Queue -from time import time, sleep +import asyncio -class Process(object): - def __init__(self, commands, context, cwd): - startupinfo = None - if os.name == 'nt': - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - self._proc = subprocess.Popen(commands, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - startupinfo=startupinfo, - cwd=cwd) - self._context = context - self._packer = msgpack.Packer( - use_bin_type=True, - encoding='utf-8', - unicode_errors='surrogateescape') - self._unpacker = msgpack.Unpacker( - encoding='utf-8', - unicode_errors='surrogateescape') - self._queue_out = Queue() - self._thread = Thread(target=self.enqueue_output) - self._thread.start() +class Process(asyncio.SubprocessProtocol): - def kill(self): - if not self._proc: - return - self._proc.kill() - self._proc.wait() - self._proc = None - self._queue_out = None - self._thread.join(1.0) - self._thread = None + def __init__(self, plugin): + self._plugin = plugin + self._vim = plugin._vim - def enqueue_output(self): - while self._proc and self._proc.stdout: - b = self._proc.stdout.raw.read(102400) - if b is None: - continue - if b == b'': - # EOF - break - self._unpacker.feed(b) - for child_out in self._unpacker: - self._queue_out.put(child_out) + def connection_made(self, transport): + self._vim.async_call(self._plugin._on_connection, transport) - def communicate(self, timeout): - if not self._proc: - return [] + def pipe_data_received(self, fd, data): + self._vim.async_call(self._plugin._on_output, fd, data) - end = time() + timeout - while self._queue_out.empty() and time() < end: - sleep(0.005) - - outs = [] - while not self._queue_out.empty(): - outs.append(self._queue_out.get_nowait()) - return outs - - def read_error(self): - if not self._proc or not self._proc.stderr: - return '' - return self._proc.stderr.read() - - def write(self, expr): - if not self._proc or not self._proc.stdin: - return - self._proc.stdin.write(self._packer.pack(expr)) - self._proc.stdin.flush() + def process_exited(self): + pass