Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #645 #671 asyncio loop support #678

Merged
merged 3 commits into from
Mar 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion autoload/deoplete/init.vim
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ function! deoplete#init#_variables() abort
call deoplete#util#set_default(
\ 'g:deoplete#complete_method', 'complete')
call deoplete#util#set_default(
\ 'g:deoplete#num_processes', 4)
\ 'g:deoplete#num_processes', s:is_windows ? 1 : 4)

call deoplete#util#set_default(
\ 'g:deoplete#keyword_patterns', {})
Expand Down
5 changes: 3 additions & 2 deletions doc/deoplete.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ You can enable Python3 interface with pip: >

pip3 install neovim

Note: deoplete needs neovim-python ver.0.1.8+.
Note: deoplete needs neovim-python ver.0.2.4+.
You need update neovim-python module.
>
pip3 install --upgrade neovim
Expand Down Expand Up @@ -202,7 +202,7 @@ g:deoplete#num_processes
feature. If it is less than equal 1, this feature is
disabled.

Default value: 4
Default value: 1 (Windows) or 4 (Others)

*g:deoplete#max_abbr_width*
g:deoplete#max_abbr_width
Expand Down Expand Up @@ -1652,6 +1652,7 @@ COMPATIBILITY *deoplete-compatibility*

2018.03.18
* limit attribute in source is removed.
* neovim-python 0.2.4+ is needed for asyncio feature.

2017.12.04
* "debug_enabled" is deprecated. Please use "is_debug_enabled" instead.
Expand Down
12 changes: 8 additions & 4 deletions rplugin/python3/deoplete/deoplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

from deoplete import logger
from deoplete.parent import Parent
from deoplete.util import (error_tb, find_rplugins)
# from deoplete.util import error
from deoplete.util import (error_tb, find_rplugins, error)


class Deoplete(logger.LoggingMixin):
Expand All @@ -27,6 +26,10 @@ def __init__(self, vim):
self._max_parents = max(
[1, self._vim.vars['deoplete#num_processes']])

if self._max_parents > 1 and not hasattr(self._vim, 'loop'):
error(self._vim, 'neovim-python 0.2.4+ is required.')
return

# Enable logging before "Init" for more information, and e.g.
# deoplete-jedi picks up the log filename from deoplete's handler in
# its on_init.
Expand Down Expand Up @@ -79,10 +82,11 @@ def completion_begin(self, context):
in context['vars']):
self._vim.call('deoplete#mapping#_restore_completeopt')

# Check the previous completion
# Async update is skipped if same.
prev_candidates = context['vars'][
'deoplete#_prev_completion']['candidates']
if context['event'] == 'Async' and candidates == prev_candidates:
if (context['event'] == 'Async' and
prev_candidates and candidates == prev_candidates):
return

# error(self._vim, candidates)
Expand Down
84 changes: 58 additions & 26 deletions rplugin/python3/deoplete/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
# ============================================================================

import time
import os
import msgpack
import subprocess
from functools import partial
from queue import Queue

from deoplete import logger
from deoplete.process import Process
Expand All @@ -17,10 +22,20 @@ def __init__(self, vim, context):
self.name = 'parent'

self._vim = vim
self._proc = None
self._hnd = None
self._stdin = None
self._child = None
self._queue_id = ''
self._prev_pos = []
self._queue_in = Queue()
self._queue_out = Queue()
self._packer = msgpack.Packer(
use_bin_type=True,
encoding='utf-8',
unicode_errors='surrogateescape')
self._unpacker = msgpack.Unpacker(
encoding='utf-8',
unicode_errors='surrogateescape')
self._start_process(context)

def enable_logging(self):
Expand Down Expand Up @@ -65,48 +80,65 @@ def merge_results(self, context):

def on_event(self, context):
self._put('on_event', [context])
if context['event'] == 'VimLeavePre':
self._stop_process()

def _start_process(self, context):
if self._vim.vars['deoplete#num_processes'] > 1:
# Parallel
python3 = self._vim.vars.get('python3_host_prog', 'python3')
self._proc = Process(
[python3, context['dp_main'],
self._vim.vars['deoplete#_serveraddr']],
context, context['cwd'])

startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

self._hnd = self._vim.loop.create_task(
self._vim.loop.subprocess_exec(
partial(Process, self),
self._vim.vars.get('python3_host_prog', 'python3'),
context['dp_main'],
self._vim.vars['deoplete#_serveraddr'],
stderr=None, cwd=context['cwd'], startupinfo=startupinfo))
else:
# Serial
from deoplete.child import Child
self._child = Child(self._vim)

def _stop_process(self):
if self._proc:
self._proc.kill()
self._proc = None

def _put(self, name, args):
queue_id = str(time.time())

if self._proc:
if self._child:
return self._child.main(name, args, queue_id)

if not self._hnd:
return None

msg = self._packer.pack({
'name': name, 'args': args, 'queue_id': queue_id
})
self._queue_in.put(msg)

if self._stdin:
try:
self._proc.write({
'name': name, 'args': args, 'queue_id': queue_id
})
while not self._queue_in.empty():
self._stdin.write(self._queue_in.get_nowait())
except BrokenPipeError as e:
error_tb(self._vim, 'Crash in child process')
error(self._vim, 'stderr=' + str(self._proc.read_error()))
self._proc.kill()
return queue_id
elif self._child:
return self._child.main(name, args, queue_id)
else:
return None
self._hnd = None
return queue_id

def _get(self, queue_id):
if not self._proc:
if not self._hnd:
return []

return [x for x in self._proc.communicate(0.02)
if x['queue_id'] == queue_id]
outs = []
while not self._queue_out.empty():
outs.append(self._queue_out.get_nowait())
return [x for x in outs if x['queue_id'] == queue_id]

def _on_connection(self, transport):
self._stdin = transport.get_pipe_transport(0)

def _on_output(self, fd, data):
self._unpacker.feed(data)
for child_out in self._unpacker:
self._queue_out.put(child_out)
82 changes: 11 additions & 71 deletions rplugin/python3/deoplete/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,20 @@
# License: MIT license
# ============================================================================

import subprocess
import os
import msgpack
from threading import Thread
from queue import Queue
from time import time, sleep
import asyncio


class Process(object):
def __init__(self, commands, context, cwd):
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self._proc = subprocess.Popen(commands,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
startupinfo=startupinfo,
cwd=cwd)
self._context = context
self._packer = msgpack.Packer(
use_bin_type=True,
encoding='utf-8',
unicode_errors='surrogateescape')
self._unpacker = msgpack.Unpacker(
encoding='utf-8',
unicode_errors='surrogateescape')
self._queue_out = Queue()
self._thread = Thread(target=self.enqueue_output)
self._thread.start()
class Process(asyncio.SubprocessProtocol):

def kill(self):
if not self._proc:
return
self._proc.kill()
self._proc.wait()
self._proc = None
self._queue_out = None
self._thread.join(1.0)
self._thread = None
def __init__(self, plugin):
self._plugin = plugin
self._vim = plugin._vim

def enqueue_output(self):
while self._proc and self._proc.stdout:
b = self._proc.stdout.raw.read(102400)
if b is None:
continue
if b == b'':
# EOF
break
self._unpacker.feed(b)
for child_out in self._unpacker:
self._queue_out.put(child_out)
def connection_made(self, transport):
self._vim.async_call(self._plugin._on_connection, transport)

def communicate(self, timeout):
if not self._proc:
return []
def pipe_data_received(self, fd, data):
self._vim.async_call(self._plugin._on_output, fd, data)

end = time() + timeout
while self._queue_out.empty() and time() < end:
sleep(0.005)

outs = []
while not self._queue_out.empty():
outs.append(self._queue_out.get_nowait())
return outs

def read_error(self):
if not self._proc or not self._proc.stderr:
return ''
return self._proc.stderr.read()

def write(self, expr):
if not self._proc or not self._proc.stdin:
return
self._proc.stdin.write(self._packer.pack(expr))
self._proc.stdin.flush()
def process_exited(self):
pass