Python parallel plugin management.

* Main differences from ruby are inside Command class, notably it
poll/sleeps on the subprocess to check output & timeout.
* Another difference, interrupt is not instaneous due to checking var.
* Otherwise, I mainly just mirrored code into objects.
* Note that due to GVim freeze, disabling use on Windows
This commit is contained in:
Jeremy Pallats/starcraft.man
2015-02-12 11:59:50 -05:00
parent 74169f3761
commit e362fd5931
4 changed files with 423 additions and 16 deletions

404
plug.vim
View File

@@ -73,6 +73,7 @@ let s:plug_tab = get(s:, 'plug_tab', -1)
let s:plug_buf = get(s:, 'plug_buf', -1)
let s:mac_gui = has('gui_macvim') && has('gui_running')
let s:is_win = has('win32') || has('win64')
let s:py2 = has('python') && !s:is_win
let s:ruby = has('ruby') && (v:version >= 703 || v:version == 702 && has('patch374'))
let s:nvim = has('nvim') && !s:is_win
let s:me = resolve(expand('<sfile>:p'))
@@ -745,7 +746,7 @@ function! s:update_impl(pull, force, args) abort
\ 'pull': a:pull,
\ 'force': a:force,
\ 'new': {},
\ 'threads': (s:ruby || s:nvim) ? min([len(todo), threads]) : 1,
\ 'threads': (s:py2 || s:ruby || s:nvim) ? min([len(todo), threads]) : 1,
\ 'bar': '',
\ 'fin': 0
\ }
@@ -754,13 +755,17 @@ function! s:update_impl(pull, force, args) abort
call append(0, ['', ''])
normal! 2G
if s:ruby && s:update.threads > 1
if (s:py2 || s:ruby) && !s:nvim && s:update.threads > 1
try
let imd = &imd
if s:mac_gui
set noimd
endif
call s:update_ruby()
if s:ruby
call s:update_ruby()
else
call s:update_python()
endif
catch
let lines = getline(4, '$')
let printed = {}
@@ -984,6 +989,386 @@ while 1 " Without TCO, Vim stack is bound to explode
endwhile
endfunction
function! s:update_python()
python << EOF
""" Due to use of signals this function is POSIX only. """
import datetime
import functools
import os
import Queue
import random
import re
import shutil
import signal
import subprocess
import tempfile
import threading as thr
import time
import traceback
import vim
G_PULL = vim.eval('s:update.pull') == '1'
G_RETRIES = int(vim.eval('get(g:, "plug_retries", 2)')) + 1
G_TIMEOUT = int(vim.eval('get(g:, "plug_timeout", 60)'))
G_PROGRESS = vim.eval('s:progress_opt(1)')
G_LOG_PROB = 1.0 / int(vim.eval('s:update.threads'))
G_STOP = thr.Event()
class CmdTimedOut(Exception):
pass
class CmdFailed(Exception):
pass
class InvalidURI(Exception):
pass
class Action(object):
INSTALL, UPDATE, ERROR, DONE = ['+', '*', 'x', '-']
class GLog(object):
ON = None
LOGDIR = None
@classmethod
def write(cls, msg):
if cls.ON is None:
cls.ON = int(vim.eval('get(g:, "plug_log_on", 0)'))
cls.LOGDIR = os.path.expanduser(vim.eval('get(g:, "plug_logs", "~/plug_logs")'))
if cls.ON:
if not os.path.exists(cls.LOGDIR):
os.makedirs(cls.LOGDIR)
cls._write(msg)
@classmethod
def _write(cls, msg):
name = thr.current_thread().name
fname = cls.LOGDIR + os.path.sep + name
with open(fname, 'ab') as flog:
ltime = datetime.datetime.now().strftime("%H:%M:%S.%f")
msg = '[{},{}] {}{}'.format(name, ltime, msg, '\n')
flog.write(msg)
class Buffer(object):
def __init__(self, lock, num_plugs):
self.bar = ''
self.event = 'Updating' if vim.eval('s:update.pull') == '1' else 'Installing'
self.is_win = vim.eval('s:is_win') == '1'
self.lock = lock
self.maxy = int(vim.eval('winheight(".")'))
self.num_plugs = num_plugs
def _where(self, name):
""" Find first line with name in current buffer. Return line num. """
found, lnum = False, 0
matcher = re.compile('^[-+x*] {}:'.format(name))
for line in vim.current.buffer:
if matcher.search(line) is not None:
found = True
break
lnum += 1
if not found:
lnum = -1
return lnum
def header(self):
curbuf = vim.current.buffer
curbuf[0] = self.event + ' plugins ({}/{})'.format(len(self.bar), self.num_plugs)
num_spaces = self.num_plugs - len(self.bar)
curbuf[1] = '[{}{}]'.format(self.bar, num_spaces * ' ')
vim.command('normal! 2G')
if not self.is_win:
vim.command('redraw')
def write(self, *args, **kwargs):
with self.lock:
self._write(*args, **kwargs)
def _write(self, action, name, lines):
first, rest = lines[0], lines[1:]
msg = ['{} {}{}{}'.format(action, name, ': ' if first else '', first)]
padded_rest = [' ' + line for line in rest]
msg.extend(padded_rest)
try:
if action == Action.ERROR:
self.bar += 'x'
vim.command("call add(s:update.errors, '{}')".format(name))
elif action == Action.DONE:
self.bar += '='
curbuf = vim.current.buffer
lnum = self._where(name)
if lnum != -1: # Found matching line num
del curbuf[lnum]
if lnum > self.maxy and action in {Action.INSTALL, Action.UPDATE}:
lnum = 3
else:
lnum = 3
curbuf.append(msg, lnum)
self.header()
except vim.error:
GLog.write('Buffer Update FAILED.')
class Command(object):
def __init__(self, cmd, cmd_dir=None, timeout=60, ntries=3, cb=None, clean=None):
self.cmd = cmd
self.cmd_dir = cmd_dir
self.timeout = timeout
self.ntries = ntries
self.callback = cb if cb else (lambda msg: None)
self.clean = clean
def attempt_cmd(self):
""" Tries to run the command, returns result if no exceptions. """
attempt = 0
finished = False
limit = self.timeout
while not finished:
try:
attempt += 1
result = self.timeout_cmd()
finished = True
except CmdTimedOut:
if attempt != self.ntries:
for count in range(3, 0, -1):
if G_STOP.is_set():
raise KeyboardInterrupt
msg = 'Timeout. Will retry in {} second{} ...'.format(
count, 's' if count != 1 else '')
self.callback([msg])
time.sleep(1)
self.timeout += limit
self.callback(['Retrying ...'])
else:
raise
return result
def timeout_cmd(self):
""" Execute a cmd & poll for callback. Returns list of output.
Raises CmdFailed -> return code for Popen isn't 0
Raises CmdTimedOut -> command exceeded timeout without new output
"""
proc = None
first_line = True
try:
tfile = tempfile.NamedTemporaryFile()
proc = subprocess.Popen(self.cmd, cwd=self.cmd_dir, stdout=tfile,
stderr=subprocess.STDOUT, shell=True, preexec_fn=os.setsid)
while proc.poll() is None:
# Yield this thread
time.sleep(0.2)
if G_STOP.is_set():
raise KeyboardInterrupt
if first_line or random.random() < G_LOG_PROB:
first_line = False
line = nonblock_read(tfile.name)
if line:
self.callback([line])
time_diff = time.time() - os.path.getmtime(tfile.name)
if time_diff > self.timeout:
raise CmdTimedOut(['Timeout!'])
tfile.seek(0)
result = [line.rstrip() for line in tfile]
if proc.returncode != 0:
msg = ['']
msg.extend(result)
raise CmdFailed(msg)
except:
if proc and proc.poll() is None:
os.killpg(proc.pid, signal.SIGTERM)
if self.clean:
self.clean()
raise
return result
class Plugin(object):
def __init__(self, name, args, buf, lock):
self.name = name
self.args = args
self.buf = buf
self.lock = lock
tag = args.get('tag', 0)
self.checkout = esc(tag if tag else args['branch'])
self.merge = esc(tag if tag else 'origin/' + args['branch'])
def manage(self):
try:
if os.path.exists(self.args['dir']):
self.update()
else:
self.install()
with self.lock:
vim.command("let s:update.new['{}'] = 1".format(self.name))
except (CmdTimedOut, CmdFailed, InvalidURI) as exc:
self.write(Action.ERROR, self.name, exc.message)
except KeyboardInterrupt:
G_STOP.set()
self.write(Action.ERROR, self.name, ['Interrupted!'])
except:
# Any exception except those above print stack trace
msg = 'Trace:\n{}'.format(traceback.format_exc().rstrip())
self.write(Action.ERROR, self.name, msg.split('\n'))
raise
def install(self):
target = self.args['dir']
def clean(target):
def _clean():
try:
shutil.rmtree(target)
except OSError:
pass
return _clean
self.write(Action.INSTALL, self.name, ['Installing ...'])
callback = functools.partial(self.buf.write, Action.INSTALL, self.name)
cmd = 'git clone {} --recursive {} -b {} {} 2>&1'.format(
G_PROGRESS, self.args['uri'], self.checkout, esc(target))
com = Command(cmd, None, G_TIMEOUT, G_RETRIES, callback, clean(target))
result = com.attempt_cmd()
self.write(Action.DONE, self.name, result[-1:])
def update(self):
match = re.compile(r'git::?@')
actual_uri = re.sub(match, '', self.repo_uri())
expect_uri = re.sub(match, '', self.args['uri'])
if actual_uri != expect_uri:
msg = ['',
'Invalid URI: {}'.format(actual_uri),
'Expected {}'.format(expect_uri),
'PlugClean required.']
raise InvalidURI(msg)
if G_PULL:
self.write(Action.UPDATE, self.name, ['Updating ...'])
callback = functools.partial(self.buf.write, Action.UPDATE, self.name)
cmds = ['git fetch {}'.format(G_PROGRESS),
'git checkout -q {}'.format(self.checkout),
'git merge --ff-only {}'.format(self.merge),
'git submodule update --init --recursive']
cmd = ' 2>&1 && '.join(cmds)
GLog.write(cmd)
com = Command(cmd, self.args['dir'], G_TIMEOUT, G_RETRIES, callback)
result = com.attempt_cmd()
GLog.write(result)
self.write(Action.DONE, self.name, result[-1:])
else:
self.write(Action.DONE, self.name, ['Already installed'])
def repo_uri(self):
cmd = 'git rev-parse --abbrev-ref HEAD 2>&1 && git config remote.origin.url'
command = Command(cmd, self.args['dir'], G_TIMEOUT, G_RETRIES)
result = command.attempt_cmd()
return result[-1]
def write(self, action, name, msg):
GLog.write('{} {}: {}'.format(action, name, '\n'.join(msg)))
self.buf.write(action, name, msg)
class PlugThread(thr.Thread):
def __init__(self, tname, args):
super(PlugThread, self).__init__()
self.tname = tname
self.args = args
def run(self):
thr.current_thread().name = self.tname
work_q, lock, buf = self.args
try:
while not G_STOP.is_set():
name, args = work_q.get_nowait()
GLog.write('{}: Dir {}'.format(name, args['dir']))
plug = Plugin(name, args, buf, lock)
plug.manage()
work_q.task_done()
except Queue.Empty:
GLog.write('Queue now empty.')
class RefreshThread(thr.Thread):
def __init__(self, lock):
super(RefreshThread, self).__init__()
self.lock = lock
self.running = True
def run(self):
while self.running:
with self.lock:
vim.command('noautocmd normal! a')
time.sleep(0.2)
def stop(self):
self.running = False
def esc(name):
return '"' + name.replace('"', '\"') + '"'
def nonblock_read(fname):
""" Read a file with nonblock flag. Return the last line. """
fread = os.open(fname, os.O_RDONLY | os.O_NONBLOCK)
buf = os.read(fread, 100000)
os.close(fread)
line = buf.rstrip('\r\n')
left = max(line.rfind('\r'), line.rfind('\n'))
if left != -1:
left += 1
line = line[left:]
return line
def main():
thr.current_thread().name = 'main'
GLog.write('')
if GLog.ON and os.path.exists(GLog.LOGDIR):
shutil.rmtree(GLog.LOGDIR)
threads = []
nthreads = int(vim.eval('s:update.threads'))
plugs = vim.eval('s:update.todo')
mac_gui = vim.eval('s:mac_gui') == '1'
is_win = vim.eval('s:is_win') == '1'
GLog.write('Plugs: {}'.format(plugs))
GLog.write('PULL: {}, WIN: {}, MAC: {}'.format(G_PULL, is_win, mac_gui))
GLog.write('Num Threads: {}'.format(nthreads))
lock = thr.Lock()
buf = Buffer(lock, len(plugs))
work_q = Queue.Queue()
for work in plugs.items():
work_q.put(work)
GLog.write('Starting Threads')
for num in range(nthreads):
tname = 'PlugT-{0:02}'.format(num)
thread = PlugThread(tname, (work_q, lock, buf))
thread.start()
threads.append(thread)
if mac_gui:
rthread = RefreshThread(lock)
rthread.start()
GLog.write('Joining Live Threads')
for thread in threads:
thread.join()
if mac_gui:
rthread.stop()
rthread.join()
GLog.write('Cleanly Exited Main')
main()
EOF
endfunction
function! s:update_ruby()
ruby << EOF
module PlugStream
@@ -1355,8 +1740,10 @@ function! s:upgrade()
endif
elseif s:ruby
call s:upgrade_using_ruby(new)
elseif s:py2
call s:upgrade_using_python(new)
else
return s:err('curl executable or ruby support not found')
return s:err('Missing: curl executable, ruby support or python support')
endif
catch
return s:err('Error upgrading vim-plug: '. v:exception)
@@ -1384,6 +1771,15 @@ function! s:upgrade_using_ruby(new)
EOF
endfunction
function! s:upgrade_using_python(new)
python << EOF
import urllib
import vim
psrc, dest = vim.eval('s:plug_src'), vim.eval('a:new')
urllib.urlretrieve(psrc, dest)
EOF
endfunction
function! s:upgrade_specs()
for spec in values(g:plugs)
let spec.frozen = get(spec, 'frozen', 0)