Merge pull request #10021 from bfredl/chanevent

channel: refactor events, prevent recursive invocation of callbacks
This commit is contained in:
Björn Linse 2019-06-18 13:58:14 +02:00 committed by GitHub
commit 149dcbf2c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 174 additions and 120 deletions

View File

@ -22,19 +22,10 @@ PMap(uint64_t) *channels = NULL;
/// 2 is reserved for stderr channel /// 2 is reserved for stderr channel
static uint64_t next_chan_id = CHAN_STDERR+1; static uint64_t next_chan_id = CHAN_STDERR+1;
typedef struct {
Channel *chan;
Callback *callback;
const char *type;
// if reader is set, status is ignored.
CallbackReader *reader;
int status;
} ChannelEvent;
#ifdef INCLUDE_GENERATED_DECLARATIONS #ifdef INCLUDE_GENERATED_DECLARATIONS
# include "channel.c.generated.h" # include "channel.c.generated.h"
#endif #endif
/// Teardown the module /// Teardown the module
void channel_teardown(void) void channel_teardown(void)
{ {
@ -179,6 +170,7 @@ static Channel *channel_alloc(ChannelStreamType type)
} }
chan->events = multiqueue_new_child(main_loop.events); chan->events = multiqueue_new_child(main_loop.events);
chan->refcount = 1; chan->refcount = 1;
chan->exit_status = -1;
chan->streamtype = type; chan->streamtype = type;
pmap_put(uint64_t)(channels, chan->id, chan); pmap_put(uint64_t)(channels, chan->id, chan);
return chan; return chan;
@ -234,9 +226,10 @@ void callback_reader_free(CallbackReader *reader)
ga_clear(&reader->buffer); ga_clear(&reader->buffer);
} }
void callback_reader_start(CallbackReader *reader) void callback_reader_start(CallbackReader *reader, const char *type)
{ {
ga_init(&reader->buffer, sizeof(char *), 32); ga_init(&reader->buffer, sizeof(char *), 32);
reader->type = type;
} }
static void free_channel_event(void **argv) static void free_channel_event(void **argv)
@ -246,7 +239,7 @@ static void free_channel_event(void **argv)
rpc_free(chan); rpc_free(chan);
} }
callback_reader_free(&chan->on_stdout); callback_reader_free(&chan->on_data);
callback_reader_free(&chan->on_stderr); callback_reader_free(&chan->on_stderr);
callback_free(&chan->on_exit); callback_free(&chan->on_exit);
@ -286,7 +279,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
assert(cwd == NULL || os_isdir_executable(cwd)); assert(cwd == NULL || os_isdir_executable(cwd));
Channel *chan = channel_alloc(kChannelStreamProc); Channel *chan = channel_alloc(kChannelStreamProc);
chan->on_stdout = on_stdout; chan->on_data = on_stdout;
chan->on_stderr = on_stderr; chan->on_stderr = on_stderr;
chan->on_exit = on_exit; chan->on_exit = on_exit;
@ -326,7 +319,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
has_out = true; has_out = true;
has_err = false; has_err = false;
} else { } else {
has_out = rpc || callback_reader_set(chan->on_stdout); has_out = rpc || callback_reader_set(chan->on_data);
has_err = callback_reader_set(chan->on_stderr); has_err = callback_reader_set(chan->on_stderr);
} }
int status = process_spawn(proc, true, has_out, has_err); int status = process_spawn(proc, true, has_out, has_err);
@ -352,13 +345,13 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
rpc_start(chan); rpc_start(chan);
} else { } else {
if (has_out) { if (has_out) {
callback_reader_start(&chan->on_stdout); callback_reader_start(&chan->on_data, "stdout");
rstream_start(&proc->out, on_job_stdout, chan); rstream_start(&proc->out, on_channel_data, chan);
} }
} }
if (has_err) { if (has_err) {
callback_reader_start(&chan->on_stderr); callback_reader_start(&chan->on_stderr, "stderr");
rstream_init(&proc->err, 0); rstream_init(&proc->err, 0);
rstream_start(&proc->err, on_job_stderr, chan); rstream_start(&proc->err, on_job_stderr, chan);
} }
@ -402,9 +395,9 @@ uint64_t channel_connect(bool tcp, const char *address,
if (rpc) { if (rpc) {
rpc_start(channel); rpc_start(channel);
} else { } else {
channel->on_stdout = on_output; channel->on_data = on_output;
callback_reader_start(&channel->on_stdout); callback_reader_start(&channel->on_data, "data");
rstream_start(&channel->stream.socket, on_socket_output, channel); rstream_start(&channel->stream.socket, on_channel_data, channel);
} }
end: end:
@ -452,9 +445,9 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
if (rpc) { if (rpc) {
rpc_start(channel); rpc_start(channel);
} else { } else {
channel->on_stdout = on_output; channel->on_data = on_output;
callback_reader_start(&channel->on_stdout); callback_reader_start(&channel->on_data, "stdin");
rstream_start(&channel->stream.stdio.in, on_stdio_input, channel); rstream_start(&channel->stream.stdio.in, on_channel_data, channel);
} }
return channel->id; return channel->id;
@ -519,55 +512,22 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
return l; return l;
} }
// vimscript job callbacks must be executed on Nvim main loop void on_channel_data(Stream *stream, RBuffer *buf, size_t count,
static inline void process_channel_event(Channel *chan, Callback *callback,
const char *type,
CallbackReader *reader, int status)
{
assert(callback);
ChannelEvent *event_data = xmalloc(sizeof(*event_data));
event_data->reader = reader;
event_data->status = status;
channel_incref(chan); // Hold on ref to callback
event_data->chan = chan;
event_data->callback = callback;
event_data->type = type;
multiqueue_put(chan->events, on_channel_event, 1, event_data);
}
void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof) void *data, bool eof)
{ {
Channel *chan = data; Channel *chan = data;
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout"); on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
} }
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof) void *data, bool eof)
{ {
Channel *chan = data; Channel *chan = data;
on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr"); on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
} }
static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof)
{
Channel *chan = data;
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
}
static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof)
{
Channel *chan = data;
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
}
/// @param type must have static lifetime
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
size_t count, bool eof, CallbackReader *reader, size_t count, bool eof, CallbackReader *reader)
const char *type)
{ {
// stub variable, to keep reading consistent with the order of events, only // stub variable, to keep reading consistent with the order of events, only
// consider the count parameter. // consider the count parameter.
@ -575,31 +535,8 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
char *ptr = rbuffer_read_ptr(buf, &r); char *ptr = rbuffer_read_ptr(buf, &r);
if (eof) { if (eof) {
if (reader->buffered) { reader->eof = true;
if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, reader, 0);
} else if (reader->self) {
if (tv_dict_find(reader->self, type, -1) == NULL) {
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
(size_t)reader->buffer.ga_len);
tv_dict_add_list(reader->self, type, strlen(type), data);
} else { } else {
// can't display error message now, defer it.
channel_incref(chan);
multiqueue_put(chan->events, on_buffered_error, 2, chan, type);
}
ga_clear(&reader->buffer);
} else {
abort();
}
} else if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, reader, 0);
}
return;
}
// The order here matters, the terminal must receive the data first because
// process_channel_event will modify the read buffer(convert NULs into NLs)
if (chan->term) { if (chan->term) {
terminal_receive(chan->term, ptr, count); terminal_receive(chan->term, ptr, count);
terminal_flush_output(chan->term); terminal_flush_output(chan->term);
@ -607,25 +544,84 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
rbuffer_consumed(buf, count); rbuffer_consumed(buf, count);
if (callback_reader_set(*reader) || reader->buffered) { if (callback_reader_set(*reader)) {
// if buffer wasn't consumed, a pending callback is stalled. Aggregate the
// received data and avoid a "burst" of multiple callbacks.
bool buffer_set = reader->buffer.ga_len > 0;
ga_concat_len(&reader->buffer, ptr, count); ga_concat_len(&reader->buffer, ptr, count);
if (callback_reader_set(*reader) && !reader->buffered && !buffer_set) {
process_channel_event(chan, &reader->cb, type, reader, 0);
} }
} }
if (callback_reader_set(*reader)) {
schedule_channel_event(chan);
}
} }
static void on_buffered_error(void **args) /// schedule the necessary callbacks to be invoked as a deferred event
static void schedule_channel_event(Channel *chan)
{
if (!chan->callback_scheduled) {
if (!chan->callback_busy) {
multiqueue_put(chan->events, on_channel_event, 1, chan);
channel_incref(chan);
}
chan->callback_scheduled = true;
}
}
static void on_channel_event(void **args)
{ {
Channel *chan = (Channel *)args[0]; Channel *chan = (Channel *)args[0];
const char *stream = (const char *)args[1];
EMSG3(_(e_streamkey), stream, chan->id); chan->callback_busy = true;
chan->callback_scheduled = false;
int exit_status = chan->exit_status;
channel_reader_callbacks(chan, &chan->on_data);
channel_reader_callbacks(chan, &chan->on_stderr);
if (exit_status > -1) {
channel_callback_call(chan, NULL);
chan->exit_status = -1;
}
chan->callback_busy = false;
if (chan->callback_scheduled) {
// further callback was deferred to avoid recursion.
multiqueue_put(chan->events, on_channel_event, 1, chan);
channel_incref(chan);
}
channel_decref(chan); channel_decref(chan);
} }
void channel_reader_callbacks(Channel *chan, CallbackReader *reader)
{
if (reader->buffered) {
if (reader->eof) {
if (reader->self) {
if (tv_dict_find(reader->self, reader->type, -1) == NULL) {
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
(size_t)reader->buffer.ga_len);
tv_dict_add_list(reader->self, reader->type, strlen(reader->type),
data);
} else {
EMSG3(_(e_streamkey), reader->type, chan->id);
}
} else {
channel_callback_call(chan, reader);
}
reader->eof = false;
}
} else {
bool is_eof = reader->eof;
if (reader->buffer.ga_len > 0) {
channel_callback_call(chan, reader);
}
// if the stream reached eof, invoke extra callback with no data
if (is_eof) {
channel_callback_call(chan, reader);
reader->eof = false;
}
}
}
static void channel_process_exit_cb(Process *proc, int status, void *data) static void channel_process_exit_cb(Process *proc, int status, void *data)
{ {
Channel *chan = data; Channel *chan = data;
@ -637,45 +633,46 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
// If process did not exit, we only closed the handle of a detached process. // If process did not exit, we only closed the handle of a detached process.
bool exited = (status >= 0); bool exited = (status >= 0);
if (exited) { if (exited && chan->on_exit.type != kCallbackNone) {
process_channel_event(chan, &chan->on_exit, "exit", NULL, status); schedule_channel_event(chan);
chan->exit_status = status;
} }
channel_decref(chan); channel_decref(chan);
} }
static void on_channel_event(void **args) static void channel_callback_call(Channel *chan, CallbackReader *reader)
{ {
ChannelEvent *ev = (ChannelEvent *)args[0]; Callback *cb;
typval_T argv[4]; typval_T argv[4];
argv[0].v_type = VAR_NUMBER; argv[0].v_type = VAR_NUMBER;
argv[0].v_lock = VAR_UNLOCKED; argv[0].v_lock = VAR_UNLOCKED;
argv[0].vval.v_number = (varnumber_T)ev->chan->id; argv[0].vval.v_number = (varnumber_T)chan->id;
if (ev->reader) { if (reader) {
argv[1].v_type = VAR_LIST; argv[1].v_type = VAR_LIST;
argv[1].v_lock = VAR_UNLOCKED; argv[1].v_lock = VAR_UNLOCKED;
argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data, argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data,
(size_t)ev->reader->buffer.ga_len); (size_t)reader->buffer.ga_len);
tv_list_ref(argv[1].vval.v_list); tv_list_ref(argv[1].vval.v_list);
ga_clear(&ev->reader->buffer); ga_clear(&reader->buffer);
cb = &reader->cb;
argv[2].vval.v_string = (char_u *)reader->type;
} else { } else {
argv[1].v_type = VAR_NUMBER; argv[1].v_type = VAR_NUMBER;
argv[1].v_lock = VAR_UNLOCKED; argv[1].v_lock = VAR_UNLOCKED;
argv[1].vval.v_number = ev->status; argv[1].vval.v_number = chan->exit_status;
cb = &chan->on_exit;
argv[2].vval.v_string = (char_u *)"exit";
} }
argv[2].v_type = VAR_STRING; argv[2].v_type = VAR_STRING;
argv[2].v_lock = VAR_UNLOCKED; argv[2].v_lock = VAR_UNLOCKED;
argv[2].vval.v_string = (uint8_t *)ev->type;
typval_T rettv = TV_INITIAL_VALUE; typval_T rettv = TV_INITIAL_VALUE;
callback_call(ev->callback, 3, argv, &rettv); callback_call(cb, 3, argv, &rettv);
tv_clear(&rettv); tv_clear(&rettv);
channel_decref(ev->chan);
xfree(ev);
} }

View File

@ -42,13 +42,16 @@ typedef struct {
Callback cb; Callback cb;
dict_T *self; dict_T *self;
garray_T buffer; garray_T buffer;
bool eof;
bool buffered; bool buffered;
const char *type;
} CallbackReader; } CallbackReader;
#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \ #define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
.self = NULL, \ .self = NULL, \
.buffer = GA_EMPTY_INIT_VALUE, \ .buffer = GA_EMPTY_INIT_VALUE, \
.buffered = false }) .buffered = false, \
.type = NULL })
static inline bool callback_reader_set(CallbackReader reader) static inline bool callback_reader_set(CallbackReader reader)
{ {
return reader.cb.type != kCallbackNone || reader.self; return reader.cb.type != kCallbackNone || reader.self;
@ -73,9 +76,13 @@ struct Channel {
RpcState rpc; RpcState rpc;
Terminal *term; Terminal *term;
CallbackReader on_stdout; CallbackReader on_data;
CallbackReader on_stderr; CallbackReader on_stderr;
Callback on_exit; Callback on_exit;
int exit_status;
bool callback_busy;
bool callback_scheduled;
}; };
EXTERN PMap(uint64_t) *channels; EXTERN PMap(uint64_t) *channels;

View File

@ -5165,7 +5165,7 @@ bool garbage_collect(bool testing)
{ {
Channel *data; Channel *data;
map_foreach_value(channels, data, { map_foreach_value(channels, data, {
set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL); set_ref_in_callback_reader(&data->on_data, copyID, NULL, NULL);
set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL); set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL);
set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); set_ref_in_callback(&data->on_exit, copyID, NULL, NULL);
}) })

View File

@ -439,16 +439,66 @@ describe('jobs', function()
call add(self.data, Normalize(a:data)) call add(self.data, Normalize(a:data))
sleep 200m sleep 200m
endfunction endfunction
function! d.on_exit(job, data, event) dict
let g:exit_data = copy(self.data)
endfunction
if has('win32') if has('win32')
let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)' let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)'
else else
let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done'] let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done']
endif endif
call jobwait([jobstart(cmd, d)]) let g:id = jobstart(cmd, d)
sleep 1500m
call jobwait([g:id])
]]) ]])
local expected = {'1', '2', '3', '4', '5', ''} local expected = {'1', '2', '3', '4', '5', ''}
local chunks = eval('d.data') local chunks = eval('d.data')
-- check nothing was received after exit, including EOF
eq(eval('g:exit_data'), chunks)
local received = {''}
for i, chunk in ipairs(chunks) do
if i < #chunks then
-- if chunks got joined, a spurious [''] callback was not sent
neq({''}, chunk)
else
-- but EOF callback is still sent
eq({''}, chunk)
end
received[#received] = received[#received]..chunk[1]
for j = 2, #chunk do
received[#received+1] = chunk[j]
end
end
eq(expected, received)
end)
it('does not invoke callbacks recursively', function()
source([[
let d = {'data': []}
function! d.on_stdout(job, data, event) dict
" if callbacks were invoked recursively, this would cause on_stdout
" to be invoked recursively and the data reversed on the call stack
sleep 200m
call add(self.data, Normalize(a:data))
endfunction
function! d.on_exit(job, data, event) dict
let g:exit_data = copy(self.data)
endfunction
if has('win32')
let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)'
else
let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done']
endif
let g:id = jobstart(cmd, d)
sleep 1500m
call jobwait([g:id])
]])
local expected = {'1', '2', '3', '4', '5', ''}
local chunks = eval('d.data')
-- check nothing was received after exit, including EOF
eq(eval('g:exit_data'), chunks)
local received = {''} local received = {''}
for i, chunk in ipairs(chunks) do for i, chunk in ipairs(chunks) do
if i < #chunks then if i < #chunks then