diff --git a/src/nvim/channel.c b/src/nvim/channel.c index 8b8d27affd..3a45a8aec7 100644 --- a/src/nvim/channel.c +++ b/src/nvim/channel.c @@ -22,19 +22,10 @@ PMap(uint64_t) *channels = NULL; /// 2 is reserved for stderr channel static uint64_t next_chan_id = CHAN_STDERR+1; - -typedef struct { - Channel *chan; - Callback *callback; - const char *type; - // if reader is set, status is ignored. - CallbackReader *reader; - int status; -} ChannelEvent; - #ifdef INCLUDE_GENERATED_DECLARATIONS # include "channel.c.generated.h" #endif + /// Teardown the module void channel_teardown(void) { @@ -179,6 +170,7 @@ static Channel *channel_alloc(ChannelStreamType type) } chan->events = multiqueue_new_child(main_loop.events); chan->refcount = 1; + chan->exit_status = -1; chan->streamtype = type; pmap_put(uint64_t)(channels, chan->id, chan); return chan; @@ -234,9 +226,10 @@ void callback_reader_free(CallbackReader *reader) ga_clear(&reader->buffer); } -void callback_reader_start(CallbackReader *reader) +void callback_reader_start(CallbackReader *reader, const char *type) { ga_init(&reader->buffer, sizeof(char *), 32); + reader->type = type; } static void free_channel_event(void **argv) @@ -246,7 +239,7 @@ static void free_channel_event(void **argv) rpc_free(chan); } - callback_reader_free(&chan->on_stdout); + callback_reader_free(&chan->on_data); callback_reader_free(&chan->on_stderr); callback_free(&chan->on_exit); @@ -286,7 +279,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout, assert(cwd == NULL || os_isdir_executable(cwd)); Channel *chan = channel_alloc(kChannelStreamProc); - chan->on_stdout = on_stdout; + chan->on_data = on_stdout; chan->on_stderr = on_stderr; chan->on_exit = on_exit; @@ -326,7 +319,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout, has_out = true; has_err = false; } else { - has_out = rpc || callback_reader_set(chan->on_stdout); + has_out = rpc || callback_reader_set(chan->on_data); has_err = callback_reader_set(chan->on_stderr); } int status = process_spawn(proc, true, has_out, has_err); @@ -352,13 +345,13 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout, rpc_start(chan); } else { if (has_out) { - callback_reader_start(&chan->on_stdout); - rstream_start(&proc->out, on_job_stdout, chan); + callback_reader_start(&chan->on_data, "stdout"); + rstream_start(&proc->out, on_channel_data, chan); } } if (has_err) { - callback_reader_start(&chan->on_stderr); + callback_reader_start(&chan->on_stderr, "stderr"); rstream_init(&proc->err, 0); rstream_start(&proc->err, on_job_stderr, chan); } @@ -402,9 +395,9 @@ uint64_t channel_connect(bool tcp, const char *address, if (rpc) { rpc_start(channel); } else { - channel->on_stdout = on_output; - callback_reader_start(&channel->on_stdout); - rstream_start(&channel->stream.socket, on_socket_output, channel); + channel->on_data = on_output; + callback_reader_start(&channel->on_data, "data"); + rstream_start(&channel->stream.socket, on_channel_data, channel); } end: @@ -452,9 +445,9 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, if (rpc) { rpc_start(channel); } else { - channel->on_stdout = on_output; - callback_reader_start(&channel->on_stdout); - rstream_start(&channel->stream.stdio.in, on_stdio_input, channel); + channel->on_data = on_output; + callback_reader_start(&channel->on_data, "stdin"); + rstream_start(&channel->stream.stdio.in, on_channel_data, channel); } return channel->id; @@ -519,55 +512,22 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) return l; } -// vimscript job callbacks must be executed on Nvim main loop -static inline void process_channel_event(Channel *chan, Callback *callback, - const char *type, - CallbackReader *reader, int status) -{ - assert(callback); - ChannelEvent *event_data = xmalloc(sizeof(*event_data)); - event_data->reader = reader; - event_data->status = status; - channel_incref(chan); // Hold on ref to callback - event_data->chan = chan; - event_data->callback = callback; - event_data->type = type; - - multiqueue_put(chan->events, on_channel_event, 1, event_data); -} - -void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, - void *data, bool eof) +void on_channel_data(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof) { Channel *chan = data; - on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout"); + on_channel_output(stream, chan, buf, count, eof, &chan->on_data); } void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) { Channel *chan = data; - on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr"); + on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr); } -static void on_socket_output(Stream *stream, RBuffer *buf, size_t count, - void *data, bool eof) -{ - Channel *chan = data; - on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data"); -} - -static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count, - void *data, bool eof) -{ - Channel *chan = data; - on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin"); -} - -/// @param type must have static lifetime static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, - size_t count, bool eof, CallbackReader *reader, - const char *type) + size_t count, bool eof, CallbackReader *reader) { // stub variable, to keep reading consistent with the order of events, only // consider the count parameter. @@ -575,57 +535,93 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, char *ptr = rbuffer_read_ptr(buf, &r); if (eof) { - if (reader->buffered) { - if (reader->cb.type != kCallbackNone) { - process_channel_event(chan, &reader->cb, type, reader, 0); - } else if (reader->self) { - if (tv_dict_find(reader->self, type, -1) == NULL) { - list_T *data = buffer_to_tv_list(reader->buffer.ga_data, - (size_t)reader->buffer.ga_len); - tv_dict_add_list(reader->self, type, strlen(type), data); - } else { - // can't display error message now, defer it. - channel_incref(chan); - multiqueue_put(chan->events, on_buffered_error, 2, chan, type); - } - ga_clear(&reader->buffer); - } else { - abort(); - } - } else if (reader->cb.type != kCallbackNone) { - process_channel_event(chan, &reader->cb, type, reader, 0); + reader->eof = true; + } else { + if (chan->term) { + terminal_receive(chan->term, ptr, count); + terminal_flush_output(chan->term); + } + + rbuffer_consumed(buf, count); + + if (callback_reader_set(*reader)) { + ga_concat_len(&reader->buffer, ptr, count); } - return; } - // The order here matters, the terminal must receive the data first because - // process_channel_event will modify the read buffer(convert NULs into NLs) - if (chan->term) { - terminal_receive(chan->term, ptr, count); - terminal_flush_output(chan->term); - } - - rbuffer_consumed(buf, count); - - if (callback_reader_set(*reader) || reader->buffered) { - // if buffer wasn't consumed, a pending callback is stalled. Aggregate the - // received data and avoid a "burst" of multiple callbacks. - bool buffer_set = reader->buffer.ga_len > 0; - ga_concat_len(&reader->buffer, ptr, count); - if (callback_reader_set(*reader) && !reader->buffered && !buffer_set) { - process_channel_event(chan, &reader->cb, type, reader, 0); - } + if (callback_reader_set(*reader)) { + schedule_channel_event(chan); } } -static void on_buffered_error(void **args) +/// schedule the necessary callbacks to be invoked as a deferred event +static void schedule_channel_event(Channel *chan) +{ + if (!chan->callback_scheduled) { + if (!chan->callback_busy) { + multiqueue_put(chan->events, on_channel_event, 1, chan); + channel_incref(chan); + } + chan->callback_scheduled = true; + } +} + +static void on_channel_event(void **args) { Channel *chan = (Channel *)args[0]; - const char *stream = (const char *)args[1]; - EMSG3(_(e_streamkey), stream, chan->id); + + chan->callback_busy = true; + chan->callback_scheduled = false; + + int exit_status = chan->exit_status; + channel_reader_callbacks(chan, &chan->on_data); + channel_reader_callbacks(chan, &chan->on_stderr); + if (exit_status > -1) { + channel_callback_call(chan, NULL); + chan->exit_status = -1; + } + + chan->callback_busy = false; + if (chan->callback_scheduled) { + // further callback was deferred to avoid recursion. + multiqueue_put(chan->events, on_channel_event, 1, chan); + channel_incref(chan); + } + channel_decref(chan); } +void channel_reader_callbacks(Channel *chan, CallbackReader *reader) +{ + if (reader->buffered) { + if (reader->eof) { + if (reader->self) { + if (tv_dict_find(reader->self, reader->type, -1) == NULL) { + list_T *data = buffer_to_tv_list(reader->buffer.ga_data, + (size_t)reader->buffer.ga_len); + tv_dict_add_list(reader->self, reader->type, strlen(reader->type), + data); + } else { + EMSG3(_(e_streamkey), reader->type, chan->id); + } + } else { + channel_callback_call(chan, reader); + } + reader->eof = false; + } + } else { + bool is_eof = reader->eof; + if (reader->buffer.ga_len > 0) { + channel_callback_call(chan, reader); + } + // if the stream reached eof, invoke extra callback with no data + if (is_eof) { + channel_callback_call(chan, reader); + reader->eof = false; + } + } +} + static void channel_process_exit_cb(Process *proc, int status, void *data) { Channel *chan = data; @@ -637,45 +633,46 @@ static void channel_process_exit_cb(Process *proc, int status, void *data) // If process did not exit, we only closed the handle of a detached process. bool exited = (status >= 0); - if (exited) { - process_channel_event(chan, &chan->on_exit, "exit", NULL, status); + if (exited && chan->on_exit.type != kCallbackNone) { + schedule_channel_event(chan); + chan->exit_status = status; } channel_decref(chan); } -static void on_channel_event(void **args) +static void channel_callback_call(Channel *chan, CallbackReader *reader) { - ChannelEvent *ev = (ChannelEvent *)args[0]; - + Callback *cb; typval_T argv[4]; argv[0].v_type = VAR_NUMBER; argv[0].v_lock = VAR_UNLOCKED; - argv[0].vval.v_number = (varnumber_T)ev->chan->id; + argv[0].vval.v_number = (varnumber_T)chan->id; - if (ev->reader) { + if (reader) { argv[1].v_type = VAR_LIST; argv[1].v_lock = VAR_UNLOCKED; - argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data, - (size_t)ev->reader->buffer.ga_len); + argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data, + (size_t)reader->buffer.ga_len); tv_list_ref(argv[1].vval.v_list); - ga_clear(&ev->reader->buffer); + ga_clear(&reader->buffer); + cb = &reader->cb; + argv[2].vval.v_string = (char_u *)reader->type; } else { argv[1].v_type = VAR_NUMBER; argv[1].v_lock = VAR_UNLOCKED; - argv[1].vval.v_number = ev->status; + argv[1].vval.v_number = chan->exit_status; + cb = &chan->on_exit; + argv[2].vval.v_string = (char_u *)"exit"; } argv[2].v_type = VAR_STRING; argv[2].v_lock = VAR_UNLOCKED; - argv[2].vval.v_string = (uint8_t *)ev->type; typval_T rettv = TV_INITIAL_VALUE; - callback_call(ev->callback, 3, argv, &rettv); + callback_call(cb, 3, argv, &rettv); tv_clear(&rettv); - channel_decref(ev->chan); - xfree(ev); } diff --git a/src/nvim/channel.h b/src/nvim/channel.h index b856d197f1..c733e276be 100644 --- a/src/nvim/channel.h +++ b/src/nvim/channel.h @@ -42,13 +42,16 @@ typedef struct { Callback cb; dict_T *self; garray_T buffer; + bool eof; bool buffered; + const char *type; } CallbackReader; #define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \ .self = NULL, \ .buffer = GA_EMPTY_INIT_VALUE, \ - .buffered = false }) + .buffered = false, \ + .type = NULL }) static inline bool callback_reader_set(CallbackReader reader) { return reader.cb.type != kCallbackNone || reader.self; @@ -73,9 +76,13 @@ struct Channel { RpcState rpc; Terminal *term; - CallbackReader on_stdout; + CallbackReader on_data; CallbackReader on_stderr; Callback on_exit; + int exit_status; + + bool callback_busy; + bool callback_scheduled; }; EXTERN PMap(uint64_t) *channels; diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 15fc994957..42ff6ceef2 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -5165,7 +5165,7 @@ bool garbage_collect(bool testing) { Channel *data; map_foreach_value(channels, data, { - set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL); + set_ref_in_callback_reader(&data->on_data, copyID, NULL, NULL); set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL); set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); }) diff --git a/test/functional/core/job_spec.lua b/test/functional/core/job_spec.lua index eb02610df0..86466415e5 100644 --- a/test/functional/core/job_spec.lua +++ b/test/functional/core/job_spec.lua @@ -439,16 +439,66 @@ describe('jobs', function() call add(self.data, Normalize(a:data)) sleep 200m endfunction + function! d.on_exit(job, data, event) dict + let g:exit_data = copy(self.data) + endfunction if has('win32') let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)' else let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done'] endif - call jobwait([jobstart(cmd, d)]) + let g:id = jobstart(cmd, d) + sleep 1500m + call jobwait([g:id]) ]]) local expected = {'1', '2', '3', '4', '5', ''} local chunks = eval('d.data') + -- check nothing was received after exit, including EOF + eq(eval('g:exit_data'), chunks) + local received = {''} + for i, chunk in ipairs(chunks) do + if i < #chunks then + -- if chunks got joined, a spurious [''] callback was not sent + neq({''}, chunk) + else + -- but EOF callback is still sent + eq({''}, chunk) + end + received[#received] = received[#received]..chunk[1] + for j = 2, #chunk do + received[#received+1] = chunk[j] + end + end + eq(expected, received) + end) + + it('does not invoke callbacks recursively', function() + source([[ + let d = {'data': []} + function! d.on_stdout(job, data, event) dict + " if callbacks were invoked recursively, this would cause on_stdout + " to be invoked recursively and the data reversed on the call stack + sleep 200m + call add(self.data, Normalize(a:data)) + endfunction + function! d.on_exit(job, data, event) dict + let g:exit_data = copy(self.data) + endfunction + if has('win32') + let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)' + else + let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done'] + endif + let g:id = jobstart(cmd, d) + sleep 1500m + call jobwait([g:id]) + ]]) + + local expected = {'1', '2', '3', '4', '5', ''} + local chunks = eval('d.data') + -- check nothing was received after exit, including EOF + eq(eval('g:exit_data'), chunks) local received = {''} for i, chunk in ipairs(chunks) do if i < #chunks then