mirror of
https://github.com/neovim/neovim.git
synced 2024-12-19 18:55:14 -07:00
Merge pull request #10021 from bfredl/chanevent
channel: refactor events, prevent recursive invocation of callbacks
This commit is contained in:
commit
149dcbf2c7
@ -22,19 +22,10 @@ PMap(uint64_t) *channels = NULL;
|
||||
/// 2 is reserved for stderr channel
|
||||
static uint64_t next_chan_id = CHAN_STDERR+1;
|
||||
|
||||
|
||||
typedef struct {
|
||||
Channel *chan;
|
||||
Callback *callback;
|
||||
const char *type;
|
||||
// if reader is set, status is ignored.
|
||||
CallbackReader *reader;
|
||||
int status;
|
||||
} ChannelEvent;
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "channel.c.generated.h"
|
||||
#endif
|
||||
|
||||
/// Teardown the module
|
||||
void channel_teardown(void)
|
||||
{
|
||||
@ -179,6 +170,7 @@ static Channel *channel_alloc(ChannelStreamType type)
|
||||
}
|
||||
chan->events = multiqueue_new_child(main_loop.events);
|
||||
chan->refcount = 1;
|
||||
chan->exit_status = -1;
|
||||
chan->streamtype = type;
|
||||
pmap_put(uint64_t)(channels, chan->id, chan);
|
||||
return chan;
|
||||
@ -234,9 +226,10 @@ void callback_reader_free(CallbackReader *reader)
|
||||
ga_clear(&reader->buffer);
|
||||
}
|
||||
|
||||
void callback_reader_start(CallbackReader *reader)
|
||||
void callback_reader_start(CallbackReader *reader, const char *type)
|
||||
{
|
||||
ga_init(&reader->buffer, sizeof(char *), 32);
|
||||
reader->type = type;
|
||||
}
|
||||
|
||||
static void free_channel_event(void **argv)
|
||||
@ -246,7 +239,7 @@ static void free_channel_event(void **argv)
|
||||
rpc_free(chan);
|
||||
}
|
||||
|
||||
callback_reader_free(&chan->on_stdout);
|
||||
callback_reader_free(&chan->on_data);
|
||||
callback_reader_free(&chan->on_stderr);
|
||||
callback_free(&chan->on_exit);
|
||||
|
||||
@ -286,7 +279,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
|
||||
assert(cwd == NULL || os_isdir_executable(cwd));
|
||||
|
||||
Channel *chan = channel_alloc(kChannelStreamProc);
|
||||
chan->on_stdout = on_stdout;
|
||||
chan->on_data = on_stdout;
|
||||
chan->on_stderr = on_stderr;
|
||||
chan->on_exit = on_exit;
|
||||
|
||||
@ -326,7 +319,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
|
||||
has_out = true;
|
||||
has_err = false;
|
||||
} else {
|
||||
has_out = rpc || callback_reader_set(chan->on_stdout);
|
||||
has_out = rpc || callback_reader_set(chan->on_data);
|
||||
has_err = callback_reader_set(chan->on_stderr);
|
||||
}
|
||||
int status = process_spawn(proc, true, has_out, has_err);
|
||||
@ -352,13 +345,13 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
|
||||
rpc_start(chan);
|
||||
} else {
|
||||
if (has_out) {
|
||||
callback_reader_start(&chan->on_stdout);
|
||||
rstream_start(&proc->out, on_job_stdout, chan);
|
||||
callback_reader_start(&chan->on_data, "stdout");
|
||||
rstream_start(&proc->out, on_channel_data, chan);
|
||||
}
|
||||
}
|
||||
|
||||
if (has_err) {
|
||||
callback_reader_start(&chan->on_stderr);
|
||||
callback_reader_start(&chan->on_stderr, "stderr");
|
||||
rstream_init(&proc->err, 0);
|
||||
rstream_start(&proc->err, on_job_stderr, chan);
|
||||
}
|
||||
@ -402,9 +395,9 @@ uint64_t channel_connect(bool tcp, const char *address,
|
||||
if (rpc) {
|
||||
rpc_start(channel);
|
||||
} else {
|
||||
channel->on_stdout = on_output;
|
||||
callback_reader_start(&channel->on_stdout);
|
||||
rstream_start(&channel->stream.socket, on_socket_output, channel);
|
||||
channel->on_data = on_output;
|
||||
callback_reader_start(&channel->on_data, "data");
|
||||
rstream_start(&channel->stream.socket, on_channel_data, channel);
|
||||
}
|
||||
|
||||
end:
|
||||
@ -452,9 +445,9 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
|
||||
if (rpc) {
|
||||
rpc_start(channel);
|
||||
} else {
|
||||
channel->on_stdout = on_output;
|
||||
callback_reader_start(&channel->on_stdout);
|
||||
rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
|
||||
channel->on_data = on_output;
|
||||
callback_reader_start(&channel->on_data, "stdin");
|
||||
rstream_start(&channel->stream.stdio.in, on_channel_data, channel);
|
||||
}
|
||||
|
||||
return channel->id;
|
||||
@ -519,55 +512,22 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
|
||||
return l;
|
||||
}
|
||||
|
||||
// vimscript job callbacks must be executed on Nvim main loop
|
||||
static inline void process_channel_event(Channel *chan, Callback *callback,
|
||||
const char *type,
|
||||
CallbackReader *reader, int status)
|
||||
{
|
||||
assert(callback);
|
||||
ChannelEvent *event_data = xmalloc(sizeof(*event_data));
|
||||
event_data->reader = reader;
|
||||
event_data->status = status;
|
||||
channel_incref(chan); // Hold on ref to callback
|
||||
event_data->chan = chan;
|
||||
event_data->callback = callback;
|
||||
event_data->type = type;
|
||||
|
||||
multiqueue_put(chan->events, on_channel_event, 1, event_data);
|
||||
}
|
||||
|
||||
void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
void on_channel_data(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout");
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
|
||||
}
|
||||
|
||||
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr");
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
|
||||
}
|
||||
|
||||
static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
|
||||
}
|
||||
|
||||
static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
|
||||
}
|
||||
|
||||
/// @param type must have static lifetime
|
||||
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
|
||||
size_t count, bool eof, CallbackReader *reader,
|
||||
const char *type)
|
||||
size_t count, bool eof, CallbackReader *reader)
|
||||
{
|
||||
// stub variable, to keep reading consistent with the order of events, only
|
||||
// consider the count parameter.
|
||||
@ -575,57 +535,93 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
|
||||
char *ptr = rbuffer_read_ptr(buf, &r);
|
||||
|
||||
if (eof) {
|
||||
if (reader->buffered) {
|
||||
if (reader->cb.type != kCallbackNone) {
|
||||
process_channel_event(chan, &reader->cb, type, reader, 0);
|
||||
} else if (reader->self) {
|
||||
if (tv_dict_find(reader->self, type, -1) == NULL) {
|
||||
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
|
||||
(size_t)reader->buffer.ga_len);
|
||||
tv_dict_add_list(reader->self, type, strlen(type), data);
|
||||
} else {
|
||||
// can't display error message now, defer it.
|
||||
channel_incref(chan);
|
||||
multiqueue_put(chan->events, on_buffered_error, 2, chan, type);
|
||||
}
|
||||
ga_clear(&reader->buffer);
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
} else if (reader->cb.type != kCallbackNone) {
|
||||
process_channel_event(chan, &reader->cb, type, reader, 0);
|
||||
reader->eof = true;
|
||||
} else {
|
||||
if (chan->term) {
|
||||
terminal_receive(chan->term, ptr, count);
|
||||
terminal_flush_output(chan->term);
|
||||
}
|
||||
|
||||
rbuffer_consumed(buf, count);
|
||||
|
||||
if (callback_reader_set(*reader)) {
|
||||
ga_concat_len(&reader->buffer, ptr, count);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// The order here matters, the terminal must receive the data first because
|
||||
// process_channel_event will modify the read buffer(convert NULs into NLs)
|
||||
if (chan->term) {
|
||||
terminal_receive(chan->term, ptr, count);
|
||||
terminal_flush_output(chan->term);
|
||||
}
|
||||
|
||||
rbuffer_consumed(buf, count);
|
||||
|
||||
if (callback_reader_set(*reader) || reader->buffered) {
|
||||
// if buffer wasn't consumed, a pending callback is stalled. Aggregate the
|
||||
// received data and avoid a "burst" of multiple callbacks.
|
||||
bool buffer_set = reader->buffer.ga_len > 0;
|
||||
ga_concat_len(&reader->buffer, ptr, count);
|
||||
if (callback_reader_set(*reader) && !reader->buffered && !buffer_set) {
|
||||
process_channel_event(chan, &reader->cb, type, reader, 0);
|
||||
}
|
||||
if (callback_reader_set(*reader)) {
|
||||
schedule_channel_event(chan);
|
||||
}
|
||||
}
|
||||
|
||||
static void on_buffered_error(void **args)
|
||||
/// schedule the necessary callbacks to be invoked as a deferred event
|
||||
static void schedule_channel_event(Channel *chan)
|
||||
{
|
||||
if (!chan->callback_scheduled) {
|
||||
if (!chan->callback_busy) {
|
||||
multiqueue_put(chan->events, on_channel_event, 1, chan);
|
||||
channel_incref(chan);
|
||||
}
|
||||
chan->callback_scheduled = true;
|
||||
}
|
||||
}
|
||||
|
||||
static void on_channel_event(void **args)
|
||||
{
|
||||
Channel *chan = (Channel *)args[0];
|
||||
const char *stream = (const char *)args[1];
|
||||
EMSG3(_(e_streamkey), stream, chan->id);
|
||||
|
||||
chan->callback_busy = true;
|
||||
chan->callback_scheduled = false;
|
||||
|
||||
int exit_status = chan->exit_status;
|
||||
channel_reader_callbacks(chan, &chan->on_data);
|
||||
channel_reader_callbacks(chan, &chan->on_stderr);
|
||||
if (exit_status > -1) {
|
||||
channel_callback_call(chan, NULL);
|
||||
chan->exit_status = -1;
|
||||
}
|
||||
|
||||
chan->callback_busy = false;
|
||||
if (chan->callback_scheduled) {
|
||||
// further callback was deferred to avoid recursion.
|
||||
multiqueue_put(chan->events, on_channel_event, 1, chan);
|
||||
channel_incref(chan);
|
||||
}
|
||||
|
||||
channel_decref(chan);
|
||||
}
|
||||
|
||||
void channel_reader_callbacks(Channel *chan, CallbackReader *reader)
|
||||
{
|
||||
if (reader->buffered) {
|
||||
if (reader->eof) {
|
||||
if (reader->self) {
|
||||
if (tv_dict_find(reader->self, reader->type, -1) == NULL) {
|
||||
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
|
||||
(size_t)reader->buffer.ga_len);
|
||||
tv_dict_add_list(reader->self, reader->type, strlen(reader->type),
|
||||
data);
|
||||
} else {
|
||||
EMSG3(_(e_streamkey), reader->type, chan->id);
|
||||
}
|
||||
} else {
|
||||
channel_callback_call(chan, reader);
|
||||
}
|
||||
reader->eof = false;
|
||||
}
|
||||
} else {
|
||||
bool is_eof = reader->eof;
|
||||
if (reader->buffer.ga_len > 0) {
|
||||
channel_callback_call(chan, reader);
|
||||
}
|
||||
// if the stream reached eof, invoke extra callback with no data
|
||||
if (is_eof) {
|
||||
channel_callback_call(chan, reader);
|
||||
reader->eof = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void channel_process_exit_cb(Process *proc, int status, void *data)
|
||||
{
|
||||
Channel *chan = data;
|
||||
@ -637,45 +633,46 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
|
||||
|
||||
// If process did not exit, we only closed the handle of a detached process.
|
||||
bool exited = (status >= 0);
|
||||
if (exited) {
|
||||
process_channel_event(chan, &chan->on_exit, "exit", NULL, status);
|
||||
if (exited && chan->on_exit.type != kCallbackNone) {
|
||||
schedule_channel_event(chan);
|
||||
chan->exit_status = status;
|
||||
}
|
||||
|
||||
channel_decref(chan);
|
||||
}
|
||||
|
||||
static void on_channel_event(void **args)
|
||||
static void channel_callback_call(Channel *chan, CallbackReader *reader)
|
||||
{
|
||||
ChannelEvent *ev = (ChannelEvent *)args[0];
|
||||
|
||||
Callback *cb;
|
||||
typval_T argv[4];
|
||||
|
||||
argv[0].v_type = VAR_NUMBER;
|
||||
argv[0].v_lock = VAR_UNLOCKED;
|
||||
argv[0].vval.v_number = (varnumber_T)ev->chan->id;
|
||||
argv[0].vval.v_number = (varnumber_T)chan->id;
|
||||
|
||||
if (ev->reader) {
|
||||
if (reader) {
|
||||
argv[1].v_type = VAR_LIST;
|
||||
argv[1].v_lock = VAR_UNLOCKED;
|
||||
argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data,
|
||||
(size_t)ev->reader->buffer.ga_len);
|
||||
argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data,
|
||||
(size_t)reader->buffer.ga_len);
|
||||
tv_list_ref(argv[1].vval.v_list);
|
||||
ga_clear(&ev->reader->buffer);
|
||||
ga_clear(&reader->buffer);
|
||||
cb = &reader->cb;
|
||||
argv[2].vval.v_string = (char_u *)reader->type;
|
||||
} else {
|
||||
argv[1].v_type = VAR_NUMBER;
|
||||
argv[1].v_lock = VAR_UNLOCKED;
|
||||
argv[1].vval.v_number = ev->status;
|
||||
argv[1].vval.v_number = chan->exit_status;
|
||||
cb = &chan->on_exit;
|
||||
argv[2].vval.v_string = (char_u *)"exit";
|
||||
}
|
||||
|
||||
argv[2].v_type = VAR_STRING;
|
||||
argv[2].v_lock = VAR_UNLOCKED;
|
||||
argv[2].vval.v_string = (uint8_t *)ev->type;
|
||||
|
||||
typval_T rettv = TV_INITIAL_VALUE;
|
||||
callback_call(ev->callback, 3, argv, &rettv);
|
||||
callback_call(cb, 3, argv, &rettv);
|
||||
tv_clear(&rettv);
|
||||
channel_decref(ev->chan);
|
||||
xfree(ev);
|
||||
}
|
||||
|
||||
|
||||
|
@ -42,13 +42,16 @@ typedef struct {
|
||||
Callback cb;
|
||||
dict_T *self;
|
||||
garray_T buffer;
|
||||
bool eof;
|
||||
bool buffered;
|
||||
const char *type;
|
||||
} CallbackReader;
|
||||
|
||||
#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
|
||||
.self = NULL, \
|
||||
.buffer = GA_EMPTY_INIT_VALUE, \
|
||||
.buffered = false })
|
||||
.buffered = false, \
|
||||
.type = NULL })
|
||||
static inline bool callback_reader_set(CallbackReader reader)
|
||||
{
|
||||
return reader.cb.type != kCallbackNone || reader.self;
|
||||
@ -73,9 +76,13 @@ struct Channel {
|
||||
RpcState rpc;
|
||||
Terminal *term;
|
||||
|
||||
CallbackReader on_stdout;
|
||||
CallbackReader on_data;
|
||||
CallbackReader on_stderr;
|
||||
Callback on_exit;
|
||||
int exit_status;
|
||||
|
||||
bool callback_busy;
|
||||
bool callback_scheduled;
|
||||
};
|
||||
|
||||
EXTERN PMap(uint64_t) *channels;
|
||||
|
@ -5165,7 +5165,7 @@ bool garbage_collect(bool testing)
|
||||
{
|
||||
Channel *data;
|
||||
map_foreach_value(channels, data, {
|
||||
set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL);
|
||||
set_ref_in_callback_reader(&data->on_data, copyID, NULL, NULL);
|
||||
set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL);
|
||||
set_ref_in_callback(&data->on_exit, copyID, NULL, NULL);
|
||||
})
|
||||
|
@ -439,16 +439,66 @@ describe('jobs', function()
|
||||
call add(self.data, Normalize(a:data))
|
||||
sleep 200m
|
||||
endfunction
|
||||
function! d.on_exit(job, data, event) dict
|
||||
let g:exit_data = copy(self.data)
|
||||
endfunction
|
||||
if has('win32')
|
||||
let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)'
|
||||
else
|
||||
let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done']
|
||||
endif
|
||||
call jobwait([jobstart(cmd, d)])
|
||||
let g:id = jobstart(cmd, d)
|
||||
sleep 1500m
|
||||
call jobwait([g:id])
|
||||
]])
|
||||
|
||||
local expected = {'1', '2', '3', '4', '5', ''}
|
||||
local chunks = eval('d.data')
|
||||
-- check nothing was received after exit, including EOF
|
||||
eq(eval('g:exit_data'), chunks)
|
||||
local received = {''}
|
||||
for i, chunk in ipairs(chunks) do
|
||||
if i < #chunks then
|
||||
-- if chunks got joined, a spurious [''] callback was not sent
|
||||
neq({''}, chunk)
|
||||
else
|
||||
-- but EOF callback is still sent
|
||||
eq({''}, chunk)
|
||||
end
|
||||
received[#received] = received[#received]..chunk[1]
|
||||
for j = 2, #chunk do
|
||||
received[#received+1] = chunk[j]
|
||||
end
|
||||
end
|
||||
eq(expected, received)
|
||||
end)
|
||||
|
||||
it('does not invoke callbacks recursively', function()
|
||||
source([[
|
||||
let d = {'data': []}
|
||||
function! d.on_stdout(job, data, event) dict
|
||||
" if callbacks were invoked recursively, this would cause on_stdout
|
||||
" to be invoked recursively and the data reversed on the call stack
|
||||
sleep 200m
|
||||
call add(self.data, Normalize(a:data))
|
||||
endfunction
|
||||
function! d.on_exit(job, data, event) dict
|
||||
let g:exit_data = copy(self.data)
|
||||
endfunction
|
||||
if has('win32')
|
||||
let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)'
|
||||
else
|
||||
let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done']
|
||||
endif
|
||||
let g:id = jobstart(cmd, d)
|
||||
sleep 1500m
|
||||
call jobwait([g:id])
|
||||
]])
|
||||
|
||||
local expected = {'1', '2', '3', '4', '5', ''}
|
||||
local chunks = eval('d.data')
|
||||
-- check nothing was received after exit, including EOF
|
||||
eq(eval('g:exit_data'), chunks)
|
||||
local received = {''}
|
||||
for i, chunk in ipairs(chunks) do
|
||||
if i < #chunks then
|
||||
|
Loading…
Reference in New Issue
Block a user