mirror of
https://github.com/neovim/neovim.git
synced 2024-12-24 05:05:00 -07:00
Merge pull request #10021 from bfredl/chanevent
channel: refactor events, prevent recursive invocation of callbacks
This commit is contained in:
commit
149dcbf2c7
@ -22,19 +22,10 @@ PMap(uint64_t) *channels = NULL;
|
|||||||
/// 2 is reserved for stderr channel
|
/// 2 is reserved for stderr channel
|
||||||
static uint64_t next_chan_id = CHAN_STDERR+1;
|
static uint64_t next_chan_id = CHAN_STDERR+1;
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
Channel *chan;
|
|
||||||
Callback *callback;
|
|
||||||
const char *type;
|
|
||||||
// if reader is set, status is ignored.
|
|
||||||
CallbackReader *reader;
|
|
||||||
int status;
|
|
||||||
} ChannelEvent;
|
|
||||||
|
|
||||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||||
# include "channel.c.generated.h"
|
# include "channel.c.generated.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/// Teardown the module
|
/// Teardown the module
|
||||||
void channel_teardown(void)
|
void channel_teardown(void)
|
||||||
{
|
{
|
||||||
@ -179,6 +170,7 @@ static Channel *channel_alloc(ChannelStreamType type)
|
|||||||
}
|
}
|
||||||
chan->events = multiqueue_new_child(main_loop.events);
|
chan->events = multiqueue_new_child(main_loop.events);
|
||||||
chan->refcount = 1;
|
chan->refcount = 1;
|
||||||
|
chan->exit_status = -1;
|
||||||
chan->streamtype = type;
|
chan->streamtype = type;
|
||||||
pmap_put(uint64_t)(channels, chan->id, chan);
|
pmap_put(uint64_t)(channels, chan->id, chan);
|
||||||
return chan;
|
return chan;
|
||||||
@ -234,9 +226,10 @@ void callback_reader_free(CallbackReader *reader)
|
|||||||
ga_clear(&reader->buffer);
|
ga_clear(&reader->buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
void callback_reader_start(CallbackReader *reader)
|
void callback_reader_start(CallbackReader *reader, const char *type)
|
||||||
{
|
{
|
||||||
ga_init(&reader->buffer, sizeof(char *), 32);
|
ga_init(&reader->buffer, sizeof(char *), 32);
|
||||||
|
reader->type = type;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void free_channel_event(void **argv)
|
static void free_channel_event(void **argv)
|
||||||
@ -246,7 +239,7 @@ static void free_channel_event(void **argv)
|
|||||||
rpc_free(chan);
|
rpc_free(chan);
|
||||||
}
|
}
|
||||||
|
|
||||||
callback_reader_free(&chan->on_stdout);
|
callback_reader_free(&chan->on_data);
|
||||||
callback_reader_free(&chan->on_stderr);
|
callback_reader_free(&chan->on_stderr);
|
||||||
callback_free(&chan->on_exit);
|
callback_free(&chan->on_exit);
|
||||||
|
|
||||||
@ -286,7 +279,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
|
|||||||
assert(cwd == NULL || os_isdir_executable(cwd));
|
assert(cwd == NULL || os_isdir_executable(cwd));
|
||||||
|
|
||||||
Channel *chan = channel_alloc(kChannelStreamProc);
|
Channel *chan = channel_alloc(kChannelStreamProc);
|
||||||
chan->on_stdout = on_stdout;
|
chan->on_data = on_stdout;
|
||||||
chan->on_stderr = on_stderr;
|
chan->on_stderr = on_stderr;
|
||||||
chan->on_exit = on_exit;
|
chan->on_exit = on_exit;
|
||||||
|
|
||||||
@ -326,7 +319,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
|
|||||||
has_out = true;
|
has_out = true;
|
||||||
has_err = false;
|
has_err = false;
|
||||||
} else {
|
} else {
|
||||||
has_out = rpc || callback_reader_set(chan->on_stdout);
|
has_out = rpc || callback_reader_set(chan->on_data);
|
||||||
has_err = callback_reader_set(chan->on_stderr);
|
has_err = callback_reader_set(chan->on_stderr);
|
||||||
}
|
}
|
||||||
int status = process_spawn(proc, true, has_out, has_err);
|
int status = process_spawn(proc, true, has_out, has_err);
|
||||||
@ -352,13 +345,13 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
|
|||||||
rpc_start(chan);
|
rpc_start(chan);
|
||||||
} else {
|
} else {
|
||||||
if (has_out) {
|
if (has_out) {
|
||||||
callback_reader_start(&chan->on_stdout);
|
callback_reader_start(&chan->on_data, "stdout");
|
||||||
rstream_start(&proc->out, on_job_stdout, chan);
|
rstream_start(&proc->out, on_channel_data, chan);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (has_err) {
|
if (has_err) {
|
||||||
callback_reader_start(&chan->on_stderr);
|
callback_reader_start(&chan->on_stderr, "stderr");
|
||||||
rstream_init(&proc->err, 0);
|
rstream_init(&proc->err, 0);
|
||||||
rstream_start(&proc->err, on_job_stderr, chan);
|
rstream_start(&proc->err, on_job_stderr, chan);
|
||||||
}
|
}
|
||||||
@ -402,9 +395,9 @@ uint64_t channel_connect(bool tcp, const char *address,
|
|||||||
if (rpc) {
|
if (rpc) {
|
||||||
rpc_start(channel);
|
rpc_start(channel);
|
||||||
} else {
|
} else {
|
||||||
channel->on_stdout = on_output;
|
channel->on_data = on_output;
|
||||||
callback_reader_start(&channel->on_stdout);
|
callback_reader_start(&channel->on_data, "data");
|
||||||
rstream_start(&channel->stream.socket, on_socket_output, channel);
|
rstream_start(&channel->stream.socket, on_channel_data, channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
@ -452,9 +445,9 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
|
|||||||
if (rpc) {
|
if (rpc) {
|
||||||
rpc_start(channel);
|
rpc_start(channel);
|
||||||
} else {
|
} else {
|
||||||
channel->on_stdout = on_output;
|
channel->on_data = on_output;
|
||||||
callback_reader_start(&channel->on_stdout);
|
callback_reader_start(&channel->on_data, "stdin");
|
||||||
rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
|
rstream_start(&channel->stream.stdio.in, on_channel_data, channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
return channel->id;
|
return channel->id;
|
||||||
@ -519,55 +512,22 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
|
|||||||
return l;
|
return l;
|
||||||
}
|
}
|
||||||
|
|
||||||
// vimscript job callbacks must be executed on Nvim main loop
|
void on_channel_data(Stream *stream, RBuffer *buf, size_t count,
|
||||||
static inline void process_channel_event(Channel *chan, Callback *callback,
|
void *data, bool eof)
|
||||||
const char *type,
|
|
||||||
CallbackReader *reader, int status)
|
|
||||||
{
|
|
||||||
assert(callback);
|
|
||||||
ChannelEvent *event_data = xmalloc(sizeof(*event_data));
|
|
||||||
event_data->reader = reader;
|
|
||||||
event_data->status = status;
|
|
||||||
channel_incref(chan); // Hold on ref to callback
|
|
||||||
event_data->chan = chan;
|
|
||||||
event_data->callback = callback;
|
|
||||||
event_data->type = type;
|
|
||||||
|
|
||||||
multiqueue_put(chan->events, on_channel_event, 1, event_data);
|
|
||||||
}
|
|
||||||
|
|
||||||
void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
|
|
||||||
void *data, bool eof)
|
|
||||||
{
|
{
|
||||||
Channel *chan = data;
|
Channel *chan = data;
|
||||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout");
|
on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
|
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
|
||||||
void *data, bool eof)
|
void *data, bool eof)
|
||||||
{
|
{
|
||||||
Channel *chan = data;
|
Channel *chan = data;
|
||||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr");
|
on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
|
|
||||||
void *data, bool eof)
|
|
||||||
{
|
|
||||||
Channel *chan = data;
|
|
||||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
|
|
||||||
}
|
|
||||||
|
|
||||||
static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
|
|
||||||
void *data, bool eof)
|
|
||||||
{
|
|
||||||
Channel *chan = data;
|
|
||||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// @param type must have static lifetime
|
|
||||||
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
|
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
|
||||||
size_t count, bool eof, CallbackReader *reader,
|
size_t count, bool eof, CallbackReader *reader)
|
||||||
const char *type)
|
|
||||||
{
|
{
|
||||||
// stub variable, to keep reading consistent with the order of events, only
|
// stub variable, to keep reading consistent with the order of events, only
|
||||||
// consider the count parameter.
|
// consider the count parameter.
|
||||||
@ -575,57 +535,93 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
|
|||||||
char *ptr = rbuffer_read_ptr(buf, &r);
|
char *ptr = rbuffer_read_ptr(buf, &r);
|
||||||
|
|
||||||
if (eof) {
|
if (eof) {
|
||||||
if (reader->buffered) {
|
reader->eof = true;
|
||||||
if (reader->cb.type != kCallbackNone) {
|
} else {
|
||||||
process_channel_event(chan, &reader->cb, type, reader, 0);
|
if (chan->term) {
|
||||||
} else if (reader->self) {
|
terminal_receive(chan->term, ptr, count);
|
||||||
if (tv_dict_find(reader->self, type, -1) == NULL) {
|
terminal_flush_output(chan->term);
|
||||||
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
|
}
|
||||||
(size_t)reader->buffer.ga_len);
|
|
||||||
tv_dict_add_list(reader->self, type, strlen(type), data);
|
rbuffer_consumed(buf, count);
|
||||||
} else {
|
|
||||||
// can't display error message now, defer it.
|
if (callback_reader_set(*reader)) {
|
||||||
channel_incref(chan);
|
ga_concat_len(&reader->buffer, ptr, count);
|
||||||
multiqueue_put(chan->events, on_buffered_error, 2, chan, type);
|
|
||||||
}
|
|
||||||
ga_clear(&reader->buffer);
|
|
||||||
} else {
|
|
||||||
abort();
|
|
||||||
}
|
|
||||||
} else if (reader->cb.type != kCallbackNone) {
|
|
||||||
process_channel_event(chan, &reader->cb, type, reader, 0);
|
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// The order here matters, the terminal must receive the data first because
|
if (callback_reader_set(*reader)) {
|
||||||
// process_channel_event will modify the read buffer(convert NULs into NLs)
|
schedule_channel_event(chan);
|
||||||
if (chan->term) {
|
|
||||||
terminal_receive(chan->term, ptr, count);
|
|
||||||
terminal_flush_output(chan->term);
|
|
||||||
}
|
|
||||||
|
|
||||||
rbuffer_consumed(buf, count);
|
|
||||||
|
|
||||||
if (callback_reader_set(*reader) || reader->buffered) {
|
|
||||||
// if buffer wasn't consumed, a pending callback is stalled. Aggregate the
|
|
||||||
// received data and avoid a "burst" of multiple callbacks.
|
|
||||||
bool buffer_set = reader->buffer.ga_len > 0;
|
|
||||||
ga_concat_len(&reader->buffer, ptr, count);
|
|
||||||
if (callback_reader_set(*reader) && !reader->buffered && !buffer_set) {
|
|
||||||
process_channel_event(chan, &reader->cb, type, reader, 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void on_buffered_error(void **args)
|
/// schedule the necessary callbacks to be invoked as a deferred event
|
||||||
|
static void schedule_channel_event(Channel *chan)
|
||||||
|
{
|
||||||
|
if (!chan->callback_scheduled) {
|
||||||
|
if (!chan->callback_busy) {
|
||||||
|
multiqueue_put(chan->events, on_channel_event, 1, chan);
|
||||||
|
channel_incref(chan);
|
||||||
|
}
|
||||||
|
chan->callback_scheduled = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void on_channel_event(void **args)
|
||||||
{
|
{
|
||||||
Channel *chan = (Channel *)args[0];
|
Channel *chan = (Channel *)args[0];
|
||||||
const char *stream = (const char *)args[1];
|
|
||||||
EMSG3(_(e_streamkey), stream, chan->id);
|
chan->callback_busy = true;
|
||||||
|
chan->callback_scheduled = false;
|
||||||
|
|
||||||
|
int exit_status = chan->exit_status;
|
||||||
|
channel_reader_callbacks(chan, &chan->on_data);
|
||||||
|
channel_reader_callbacks(chan, &chan->on_stderr);
|
||||||
|
if (exit_status > -1) {
|
||||||
|
channel_callback_call(chan, NULL);
|
||||||
|
chan->exit_status = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
chan->callback_busy = false;
|
||||||
|
if (chan->callback_scheduled) {
|
||||||
|
// further callback was deferred to avoid recursion.
|
||||||
|
multiqueue_put(chan->events, on_channel_event, 1, chan);
|
||||||
|
channel_incref(chan);
|
||||||
|
}
|
||||||
|
|
||||||
channel_decref(chan);
|
channel_decref(chan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void channel_reader_callbacks(Channel *chan, CallbackReader *reader)
|
||||||
|
{
|
||||||
|
if (reader->buffered) {
|
||||||
|
if (reader->eof) {
|
||||||
|
if (reader->self) {
|
||||||
|
if (tv_dict_find(reader->self, reader->type, -1) == NULL) {
|
||||||
|
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
|
||||||
|
(size_t)reader->buffer.ga_len);
|
||||||
|
tv_dict_add_list(reader->self, reader->type, strlen(reader->type),
|
||||||
|
data);
|
||||||
|
} else {
|
||||||
|
EMSG3(_(e_streamkey), reader->type, chan->id);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
channel_callback_call(chan, reader);
|
||||||
|
}
|
||||||
|
reader->eof = false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
bool is_eof = reader->eof;
|
||||||
|
if (reader->buffer.ga_len > 0) {
|
||||||
|
channel_callback_call(chan, reader);
|
||||||
|
}
|
||||||
|
// if the stream reached eof, invoke extra callback with no data
|
||||||
|
if (is_eof) {
|
||||||
|
channel_callback_call(chan, reader);
|
||||||
|
reader->eof = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void channel_process_exit_cb(Process *proc, int status, void *data)
|
static void channel_process_exit_cb(Process *proc, int status, void *data)
|
||||||
{
|
{
|
||||||
Channel *chan = data;
|
Channel *chan = data;
|
||||||
@ -637,45 +633,46 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
|
|||||||
|
|
||||||
// If process did not exit, we only closed the handle of a detached process.
|
// If process did not exit, we only closed the handle of a detached process.
|
||||||
bool exited = (status >= 0);
|
bool exited = (status >= 0);
|
||||||
if (exited) {
|
if (exited && chan->on_exit.type != kCallbackNone) {
|
||||||
process_channel_event(chan, &chan->on_exit, "exit", NULL, status);
|
schedule_channel_event(chan);
|
||||||
|
chan->exit_status = status;
|
||||||
}
|
}
|
||||||
|
|
||||||
channel_decref(chan);
|
channel_decref(chan);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void on_channel_event(void **args)
|
static void channel_callback_call(Channel *chan, CallbackReader *reader)
|
||||||
{
|
{
|
||||||
ChannelEvent *ev = (ChannelEvent *)args[0];
|
Callback *cb;
|
||||||
|
|
||||||
typval_T argv[4];
|
typval_T argv[4];
|
||||||
|
|
||||||
argv[0].v_type = VAR_NUMBER;
|
argv[0].v_type = VAR_NUMBER;
|
||||||
argv[0].v_lock = VAR_UNLOCKED;
|
argv[0].v_lock = VAR_UNLOCKED;
|
||||||
argv[0].vval.v_number = (varnumber_T)ev->chan->id;
|
argv[0].vval.v_number = (varnumber_T)chan->id;
|
||||||
|
|
||||||
if (ev->reader) {
|
if (reader) {
|
||||||
argv[1].v_type = VAR_LIST;
|
argv[1].v_type = VAR_LIST;
|
||||||
argv[1].v_lock = VAR_UNLOCKED;
|
argv[1].v_lock = VAR_UNLOCKED;
|
||||||
argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data,
|
argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data,
|
||||||
(size_t)ev->reader->buffer.ga_len);
|
(size_t)reader->buffer.ga_len);
|
||||||
tv_list_ref(argv[1].vval.v_list);
|
tv_list_ref(argv[1].vval.v_list);
|
||||||
ga_clear(&ev->reader->buffer);
|
ga_clear(&reader->buffer);
|
||||||
|
cb = &reader->cb;
|
||||||
|
argv[2].vval.v_string = (char_u *)reader->type;
|
||||||
} else {
|
} else {
|
||||||
argv[1].v_type = VAR_NUMBER;
|
argv[1].v_type = VAR_NUMBER;
|
||||||
argv[1].v_lock = VAR_UNLOCKED;
|
argv[1].v_lock = VAR_UNLOCKED;
|
||||||
argv[1].vval.v_number = ev->status;
|
argv[1].vval.v_number = chan->exit_status;
|
||||||
|
cb = &chan->on_exit;
|
||||||
|
argv[2].vval.v_string = (char_u *)"exit";
|
||||||
}
|
}
|
||||||
|
|
||||||
argv[2].v_type = VAR_STRING;
|
argv[2].v_type = VAR_STRING;
|
||||||
argv[2].v_lock = VAR_UNLOCKED;
|
argv[2].v_lock = VAR_UNLOCKED;
|
||||||
argv[2].vval.v_string = (uint8_t *)ev->type;
|
|
||||||
|
|
||||||
typval_T rettv = TV_INITIAL_VALUE;
|
typval_T rettv = TV_INITIAL_VALUE;
|
||||||
callback_call(ev->callback, 3, argv, &rettv);
|
callback_call(cb, 3, argv, &rettv);
|
||||||
tv_clear(&rettv);
|
tv_clear(&rettv);
|
||||||
channel_decref(ev->chan);
|
|
||||||
xfree(ev);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -42,13 +42,16 @@ typedef struct {
|
|||||||
Callback cb;
|
Callback cb;
|
||||||
dict_T *self;
|
dict_T *self;
|
||||||
garray_T buffer;
|
garray_T buffer;
|
||||||
|
bool eof;
|
||||||
bool buffered;
|
bool buffered;
|
||||||
|
const char *type;
|
||||||
} CallbackReader;
|
} CallbackReader;
|
||||||
|
|
||||||
#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
|
#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
|
||||||
.self = NULL, \
|
.self = NULL, \
|
||||||
.buffer = GA_EMPTY_INIT_VALUE, \
|
.buffer = GA_EMPTY_INIT_VALUE, \
|
||||||
.buffered = false })
|
.buffered = false, \
|
||||||
|
.type = NULL })
|
||||||
static inline bool callback_reader_set(CallbackReader reader)
|
static inline bool callback_reader_set(CallbackReader reader)
|
||||||
{
|
{
|
||||||
return reader.cb.type != kCallbackNone || reader.self;
|
return reader.cb.type != kCallbackNone || reader.self;
|
||||||
@ -73,9 +76,13 @@ struct Channel {
|
|||||||
RpcState rpc;
|
RpcState rpc;
|
||||||
Terminal *term;
|
Terminal *term;
|
||||||
|
|
||||||
CallbackReader on_stdout;
|
CallbackReader on_data;
|
||||||
CallbackReader on_stderr;
|
CallbackReader on_stderr;
|
||||||
Callback on_exit;
|
Callback on_exit;
|
||||||
|
int exit_status;
|
||||||
|
|
||||||
|
bool callback_busy;
|
||||||
|
bool callback_scheduled;
|
||||||
};
|
};
|
||||||
|
|
||||||
EXTERN PMap(uint64_t) *channels;
|
EXTERN PMap(uint64_t) *channels;
|
||||||
|
@ -5165,7 +5165,7 @@ bool garbage_collect(bool testing)
|
|||||||
{
|
{
|
||||||
Channel *data;
|
Channel *data;
|
||||||
map_foreach_value(channels, data, {
|
map_foreach_value(channels, data, {
|
||||||
set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL);
|
set_ref_in_callback_reader(&data->on_data, copyID, NULL, NULL);
|
||||||
set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL);
|
set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL);
|
||||||
set_ref_in_callback(&data->on_exit, copyID, NULL, NULL);
|
set_ref_in_callback(&data->on_exit, copyID, NULL, NULL);
|
||||||
})
|
})
|
||||||
|
@ -439,16 +439,66 @@ describe('jobs', function()
|
|||||||
call add(self.data, Normalize(a:data))
|
call add(self.data, Normalize(a:data))
|
||||||
sleep 200m
|
sleep 200m
|
||||||
endfunction
|
endfunction
|
||||||
|
function! d.on_exit(job, data, event) dict
|
||||||
|
let g:exit_data = copy(self.data)
|
||||||
|
endfunction
|
||||||
if has('win32')
|
if has('win32')
|
||||||
let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)'
|
let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)'
|
||||||
else
|
else
|
||||||
let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done']
|
let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done']
|
||||||
endif
|
endif
|
||||||
call jobwait([jobstart(cmd, d)])
|
let g:id = jobstart(cmd, d)
|
||||||
|
sleep 1500m
|
||||||
|
call jobwait([g:id])
|
||||||
]])
|
]])
|
||||||
|
|
||||||
local expected = {'1', '2', '3', '4', '5', ''}
|
local expected = {'1', '2', '3', '4', '5', ''}
|
||||||
local chunks = eval('d.data')
|
local chunks = eval('d.data')
|
||||||
|
-- check nothing was received after exit, including EOF
|
||||||
|
eq(eval('g:exit_data'), chunks)
|
||||||
|
local received = {''}
|
||||||
|
for i, chunk in ipairs(chunks) do
|
||||||
|
if i < #chunks then
|
||||||
|
-- if chunks got joined, a spurious [''] callback was not sent
|
||||||
|
neq({''}, chunk)
|
||||||
|
else
|
||||||
|
-- but EOF callback is still sent
|
||||||
|
eq({''}, chunk)
|
||||||
|
end
|
||||||
|
received[#received] = received[#received]..chunk[1]
|
||||||
|
for j = 2, #chunk do
|
||||||
|
received[#received+1] = chunk[j]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
eq(expected, received)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it('does not invoke callbacks recursively', function()
|
||||||
|
source([[
|
||||||
|
let d = {'data': []}
|
||||||
|
function! d.on_stdout(job, data, event) dict
|
||||||
|
" if callbacks were invoked recursively, this would cause on_stdout
|
||||||
|
" to be invoked recursively and the data reversed on the call stack
|
||||||
|
sleep 200m
|
||||||
|
call add(self.data, Normalize(a:data))
|
||||||
|
endfunction
|
||||||
|
function! d.on_exit(job, data, event) dict
|
||||||
|
let g:exit_data = copy(self.data)
|
||||||
|
endfunction
|
||||||
|
if has('win32')
|
||||||
|
let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)'
|
||||||
|
else
|
||||||
|
let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done']
|
||||||
|
endif
|
||||||
|
let g:id = jobstart(cmd, d)
|
||||||
|
sleep 1500m
|
||||||
|
call jobwait([g:id])
|
||||||
|
]])
|
||||||
|
|
||||||
|
local expected = {'1', '2', '3', '4', '5', ''}
|
||||||
|
local chunks = eval('d.data')
|
||||||
|
-- check nothing was received after exit, including EOF
|
||||||
|
eq(eval('g:exit_data'), chunks)
|
||||||
local received = {''}
|
local received = {''}
|
||||||
for i, chunk in ipairs(chunks) do
|
for i, chunk in ipairs(chunks) do
|
||||||
if i < #chunks then
|
if i < #chunks then
|
||||||
|
Loading…
Reference in New Issue
Block a user