From f1de097dbb236ea400150f80b909407ca9af7441 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Thu, 13 Aug 2015 11:53:19 -0300 Subject: [PATCH] eval: Fix jobwait() to process multiple jobs concurrently The new event processing architecture changed `jobwait()` semantics: Only one job is processed at time since process_wait only focuses on one queue. This fixes the problem with a few changes: - Allow the event queue polled by `process_wait` to be overriden by a new argument. - Allow the parent queue to be overriden with `queue_replace_parent` - Create a temporary queue that serves as the parent for all jobs passed to `jobwait()` --- src/nvim/eval.c | 25 ++++++++++++++++++++----- src/nvim/event/loop.c | 4 ++-- src/nvim/event/process.c | 16 ++++++++++------ src/nvim/event/queue.c | 6 ++++++ src/nvim/os/shell.c | 2 +- 5 files changed, 39 insertions(+), 14 deletions(-) diff --git a/src/nvim/eval.c b/src/nvim/eval.c index ab0c7d79bb..c7c67cfca4 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -11037,6 +11037,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) list_T *rv = list_alloc(); ui_busy_start(); + Queue *waiting_jobs = queue_new_parent(loop_on_put, &loop); // For each item in the input list append an integer to the output list. -3 // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. @@ -11050,6 +11051,10 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) // status code when the job exits list_append_number(rv, -1); data->status_ptr = &rv->lv_last->li_tv.vval.v_number; + // Process any pending events for the job because we'll temporarily + // replace the parent queue + queue_process_events(data->events); + queue_replace_parent(data->events, waiting_jobs); } } @@ -11070,7 +11075,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) || !(data = find_job(arg->li_tv.vval.v_number))) { continue; } - int status = process_wait((Process *)&data->proc, remaining); + int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -11090,9 +11095,6 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) } } - // poll to ensure any pending callbacks from the last job are invoked - loop_poll_events(&loop, 0); - for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER @@ -11103,8 +11105,21 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) // job exits data->status_ptr = NULL; } - ui_busy_stop(); + // restore the parent queue for any jobs still alive + for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { + TerminalJobData *data = NULL; + if (arg->li_tv.v_type != VAR_NUMBER + || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + continue; + } + // restore the parent queue for the job + queue_process_events(data->events); + queue_replace_parent(data->events, loop.events); + } + + queue_free(waiting_jobs); + ui_busy_stop(); rv->lv_refcount++; rettv->v_type = VAR_LIST; rettv->vval.v_list = rv; diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index 1a50ec0d9a..3d3288f858 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -22,7 +22,7 @@ void loop_init(Loop *loop, void *data) loop->uv.data = loop; loop->children = kl_init(WatcherPtr); loop->children_stop_requests = 0; - loop->events = queue_new_parent(on_put, loop); + loop->events = queue_new_parent(loop_on_put, loop); loop->fast_events = queue_new_child(loop->events); uv_signal_init(&loop->uv, &loop->children_watcher); uv_timer_init(&loop->uv, &loop->children_kill_timer); @@ -59,7 +59,7 @@ void loop_poll_events(Loop *loop, int ms) queue_process_events(loop->fast_events); } -static void on_put(Queue *queue, void *data) +void loop_on_put(Queue *queue, void *data) { Loop *loop = data; // Sometimes libuv will run pending callbacks(timer for example) before diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 54dbc11a03..81d4e690c3 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -152,7 +152,7 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL /// indistinguishable from the process returning -1 by itself. Which /// is possible on some OS. Returns -2 if an user has interruped the /// wait. -int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL +int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1) { // The default status is -1, which represents a timeout int status = -1; @@ -162,10 +162,14 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL return proc->status; } + if (!events) { + events = proc->events; + } + // Increase refcount to stop the exit callback from being called(and possibly // being freed) before we have a chance to get the status. proc->refcount++; - LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, ms, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms, // Until... got_int || // interrupted by the user proc->refcount == 1); // job exited @@ -179,10 +183,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL if (ms == -1) { // We can only return if all streams/handles are closed and the job // exited. - LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, -1, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1, proc->refcount == 1); } else { - LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); + LOOP_PROCESS_EVENTS(proc->loop, events, 0); } } @@ -191,9 +195,9 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // resources status = interrupted ? -2 : proc->status; decref(proc); - if (proc->events) { + if (events) { // the decref call created an exit event, process it now - queue_process_events(proc->events); + queue_process_events(events); } } else { proc->refcount--; diff --git a/src/nvim/event/queue.c b/src/nvim/event/queue.c index 3f03dd444e..19eca14144 100644 --- a/src/nvim/event/queue.c +++ b/src/nvim/event/queue.c @@ -152,6 +152,12 @@ bool queue_empty(Queue *queue) return QUEUE_EMPTY(&queue->headtail); } +void queue_replace_parent(Queue *queue, Queue *new_parent) +{ + assert(queue_empty(queue)); + queue->parent = new_parent; +} + static Event queue_remove(Queue *queue) { assert(!queue_empty(queue)); diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 77750bb077..2d97c4bf4f 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -257,7 +257,7 @@ static int do_os_system(char **argv, // the UI ui_busy_start(); ui_flush(); - int status = process_wait(proc, -1); + int status = process_wait(proc, -1, NULL); ui_busy_stop(); // prepare the out parameters if requested