2017-01-14 01:47:33 -07:00
|
|
|
-- Test server -> client RPC scenarios. Note: unlike `rpcnotify`, to evaluate
|
|
|
|
-- `rpcrequest` calls we need the client event loop to be running.
|
2024-04-20 08:44:13 -07:00
|
|
|
local t = require('test.testutil')
|
|
|
|
local n = require('test.functional.testnvim')()
|
2024-04-08 02:03:20 -07:00
|
|
|
|
2024-04-20 08:44:13 -07:00
|
|
|
local clear, eval = n.clear, n.eval
|
|
|
|
local eq, neq, run, stop = t.eq, t.neq, n.run, n.stop
|
|
|
|
local nvim_prog, command, fn = n.nvim_prog, n.command, n.fn
|
|
|
|
local source, next_msg = n.source, n.next_msg
|
2024-04-08 02:03:20 -07:00
|
|
|
local ok = t.ok
|
2024-04-20 08:44:13 -07:00
|
|
|
local api = n.api
|
|
|
|
local spawn, merge_args = n.spawn, n.merge_args
|
|
|
|
local set_session = n.set_session
|
2024-04-08 02:03:20 -07:00
|
|
|
local pcall_err = t.pcall_err
|
2024-04-20 08:44:13 -07:00
|
|
|
local assert_alive = n.assert_alive
|
2014-10-08 09:56:01 -07:00
|
|
|
|
|
|
|
describe('server -> client', function()
|
|
|
|
local cid
|
|
|
|
|
|
|
|
before_each(function()
|
|
|
|
clear()
|
2024-02-12 06:50:39 -07:00
|
|
|
cid = api.nvim_get_chan_info(0).id
|
2014-10-08 09:56:01 -07:00
|
|
|
end)
|
|
|
|
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
it('handles unexpected closed stream while preparing RPC response', function()
|
|
|
|
source([[
|
2018-09-20 10:19:38 -07:00
|
|
|
let g:_nvim_args = [v:progpath, '--embed', '--headless', '-n', '-u', 'NONE', '-i', 'NONE', ]
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
let ch1 = jobstart(g:_nvim_args, {'rpc': v:true})
|
2024-02-12 06:50:39 -07:00
|
|
|
let child1_ch = rpcrequest(ch1, "nvim_get_chan_info", 0).id
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
call rpcnotify(ch1, 'nvim_eval', 'rpcrequest('.child1_ch.', "nvim_get_api_info")')
|
|
|
|
|
|
|
|
let ch2 = jobstart(g:_nvim_args, {'rpc': v:true})
|
2024-02-12 06:50:39 -07:00
|
|
|
let child2_ch = rpcrequest(ch2, "nvim_get_chan_info", 0).id
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
call rpcnotify(ch2, 'nvim_eval', 'rpcrequest('.child2_ch.', "nvim_get_api_info")')
|
|
|
|
|
|
|
|
call jobstop(ch1)
|
|
|
|
]])
|
2021-09-01 09:42:53 -07:00
|
|
|
assert_alive()
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
end)
|
|
|
|
|
2014-10-08 09:56:01 -07:00
|
|
|
describe('simple call', function()
|
|
|
|
it('works', function()
|
|
|
|
local function on_setup()
|
|
|
|
eq({ 4, 5, 6 }, eval('rpcrequest(' .. cid .. ', "scall", 1, 2, 3)'))
|
|
|
|
stop()
|
|
|
|
end
|
|
|
|
|
|
|
|
local function on_request(method, args)
|
|
|
|
eq('scall', method)
|
|
|
|
eq({ 1, 2, 3 }, args)
|
2024-01-12 06:11:28 -07:00
|
|
|
command('let g:result = [4, 5, 6]')
|
2014-10-08 09:56:01 -07:00
|
|
|
return eval('g:result')
|
|
|
|
end
|
|
|
|
run(on_request, nil, on_setup)
|
|
|
|
end)
|
|
|
|
end)
|
|
|
|
|
2015-12-19 03:29:39 -07:00
|
|
|
describe('empty string handling in arrays', function()
|
|
|
|
-- Because the msgpack encoding for an empty string was interpreted as an
|
|
|
|
-- error, msgpack arrays with an empty string looked like
|
|
|
|
-- [..., '', 0, ..., 0] after the conversion, regardless of the array
|
|
|
|
-- elements following the empty string.
|
|
|
|
it('works', function()
|
|
|
|
local function on_setup()
|
|
|
|
eq({ 1, 2, '', 3, 'asdf' }, eval('rpcrequest(' .. cid .. ', "nstring")'))
|
|
|
|
stop()
|
|
|
|
end
|
|
|
|
|
2016-02-02 12:23:12 -07:00
|
|
|
local function on_request()
|
2015-12-19 03:29:39 -07:00
|
|
|
-- No need to evaluate the args, we are only interested in
|
|
|
|
-- a response that contains an array with an empty string.
|
|
|
|
return { 1, 2, '', 3, 'asdf' }
|
|
|
|
end
|
|
|
|
run(on_request, nil, on_setup)
|
|
|
|
end)
|
|
|
|
end)
|
|
|
|
|
2014-10-08 09:56:01 -07:00
|
|
|
describe('recursive call', function()
|
|
|
|
it('works', function()
|
|
|
|
local function on_setup()
|
2024-01-12 10:59:57 -07:00
|
|
|
api.nvim_set_var('result1', 0)
|
|
|
|
api.nvim_set_var('result2', 0)
|
|
|
|
api.nvim_set_var('result3', 0)
|
|
|
|
api.nvim_set_var('result4', 0)
|
2024-01-12 06:11:28 -07:00
|
|
|
command('let g:result1 = rpcrequest(' .. cid .. ', "rcall", 2)')
|
2024-01-12 10:59:57 -07:00
|
|
|
eq(4, api.nvim_get_var('result1'))
|
|
|
|
eq(8, api.nvim_get_var('result2'))
|
|
|
|
eq(16, api.nvim_get_var('result3'))
|
|
|
|
eq(32, api.nvim_get_var('result4'))
|
2014-10-08 09:56:01 -07:00
|
|
|
stop()
|
|
|
|
end
|
|
|
|
|
|
|
|
local function on_request(method, args)
|
|
|
|
eq('rcall', method)
|
2024-04-20 08:44:13 -07:00
|
|
|
local _n = unpack(args) * 2
|
|
|
|
if _n <= 16 then
|
2014-10-08 09:56:01 -07:00
|
|
|
local cmd
|
2024-04-20 08:44:13 -07:00
|
|
|
if _n == 4 then
|
|
|
|
cmd = 'let g:result2 = rpcrequest(' .. cid .. ', "rcall", ' .. _n .. ')'
|
|
|
|
elseif _n == 8 then
|
|
|
|
cmd = 'let g:result3 = rpcrequest(' .. cid .. ', "rcall", ' .. _n .. ')'
|
|
|
|
elseif _n == 16 then
|
|
|
|
cmd = 'let g:result4 = rpcrequest(' .. cid .. ', "rcall", ' .. _n .. ')'
|
2014-10-08 09:56:01 -07:00
|
|
|
end
|
2024-01-12 06:11:28 -07:00
|
|
|
command(cmd)
|
2014-10-08 09:56:01 -07:00
|
|
|
end
|
2024-04-20 08:44:13 -07:00
|
|
|
return _n
|
2014-10-08 09:56:01 -07:00
|
|
|
end
|
|
|
|
run(on_request, nil, on_setup)
|
|
|
|
end)
|
|
|
|
end)
|
2014-11-05 09:55:53 -07:00
|
|
|
|
|
|
|
describe('requests and notifications interleaved', function()
|
2017-10-28 19:06:53 -07:00
|
|
|
it('does not delay notifications during pending request', function()
|
|
|
|
local received = false
|
|
|
|
local function on_setup()
|
2024-01-12 10:59:57 -07:00
|
|
|
eq('retval', fn.rpcrequest(cid, 'doit'))
|
2017-10-28 19:06:53 -07:00
|
|
|
stop()
|
|
|
|
end
|
|
|
|
local function on_request(method)
|
|
|
|
if method == 'doit' then
|
2024-01-12 10:59:57 -07:00
|
|
|
fn.rpcnotify(cid, 'headsup')
|
2017-10-28 19:06:53 -07:00
|
|
|
eq(true, received)
|
|
|
|
return 'retval'
|
|
|
|
end
|
|
|
|
end
|
|
|
|
local function on_notification(method)
|
|
|
|
if method == 'headsup' then
|
|
|
|
received = true
|
|
|
|
end
|
|
|
|
end
|
|
|
|
run(on_request, on_notification, on_setup)
|
|
|
|
end)
|
|
|
|
|
|
|
|
-- This tests the following scenario:
|
2014-11-05 09:55:53 -07:00
|
|
|
--
|
|
|
|
-- server->client [request ] (1)
|
|
|
|
-- client->server [request ] (2) triggered by (1)
|
|
|
|
-- server->client [notification] (3) triggered by (2)
|
|
|
|
-- server->client [response ] (4) response to (2)
|
|
|
|
-- client->server [request ] (4) triggered by (3)
|
|
|
|
-- server->client [request ] (5) triggered by (4)
|
|
|
|
-- client->server [response ] (6) response to (1)
|
|
|
|
--
|
|
|
|
-- If the above scenario ever happens, the client connection will be closed
|
|
|
|
-- because (6) is returned after request (5) is sent, and nvim
|
|
|
|
-- only deals with one server->client request at a time. (In other words,
|
|
|
|
-- the client cannot send a response to a request that is not at the top
|
|
|
|
-- of nvim's request stack).
|
2017-10-28 19:06:53 -07:00
|
|
|
pending('will close connection if not properly synchronized', function()
|
2014-11-05 09:55:53 -07:00
|
|
|
local function on_setup()
|
|
|
|
eq('notified!', eval('rpcrequest(' .. cid .. ', "notify")'))
|
|
|
|
end
|
|
|
|
|
2015-11-17 15:31:22 -07:00
|
|
|
local function on_request(method)
|
2017-10-28 19:06:53 -07:00
|
|
|
if method == 'notify' then
|
|
|
|
eq(1, eval('rpcnotify(' .. cid .. ', "notification")'))
|
|
|
|
return 'notified!'
|
|
|
|
elseif method == 'nested' then
|
|
|
|
-- do some busywork, so the first request will return
|
|
|
|
-- before this one
|
|
|
|
for _ = 1, 5 do
|
2021-09-01 09:42:53 -07:00
|
|
|
assert_alive()
|
2017-10-28 19:06:53 -07:00
|
|
|
end
|
|
|
|
eq(1, eval('rpcnotify(' .. cid .. ', "nested_done")'))
|
|
|
|
return 'done!'
|
|
|
|
end
|
2014-11-05 09:55:53 -07:00
|
|
|
end
|
|
|
|
|
2015-11-17 15:31:22 -07:00
|
|
|
local function on_notification(method)
|
2017-10-28 19:06:53 -07:00
|
|
|
if method == 'notification' then
|
|
|
|
eq('done!', eval('rpcrequest(' .. cid .. ', "nested")'))
|
|
|
|
elseif method == 'nested_done' then
|
2022-06-30 04:16:46 -07:00
|
|
|
ok(false, 'never sent', 'sent')
|
2014-11-05 09:55:53 -07:00
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
run(on_request, on_notification, on_setup)
|
2017-10-28 19:06:53 -07:00
|
|
|
-- ignore disconnect failure, otherwise detected by after_each
|
|
|
|
clear()
|
2014-11-05 09:55:53 -07:00
|
|
|
end)
|
|
|
|
end)
|
2014-11-10 11:58:37 -07:00
|
|
|
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
describe('recursive (child) nvim client', function()
|
2014-11-10 11:58:37 -07:00
|
|
|
before_each(function()
|
2018-09-20 10:19:38 -07:00
|
|
|
command(
|
|
|
|
"let vim = rpcstart('"
|
|
|
|
.. nvim_prog
|
|
|
|
.. "', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed', '--headless'])"
|
|
|
|
)
|
2014-11-10 11:58:37 -07:00
|
|
|
neq(0, eval('vim'))
|
|
|
|
end)
|
|
|
|
|
2016-05-12 13:25:15 -07:00
|
|
|
after_each(function()
|
|
|
|
command('call rpcstop(vim)')
|
|
|
|
end)
|
2014-11-10 11:58:37 -07:00
|
|
|
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
it('can send/receive notifications and make requests', function()
|
2024-01-12 06:11:28 -07:00
|
|
|
command("call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')")
|
2014-11-10 11:58:37 -07:00
|
|
|
|
|
|
|
-- Wait for the notification to complete.
|
2024-01-12 06:11:28 -07:00
|
|
|
command("call rpcrequest(vim, 'vim_eval', '0')")
|
2014-11-10 11:58:37 -07:00
|
|
|
|
|
|
|
eq('SOME TEXT', eval("rpcrequest(vim, 'vim_get_current_line')"))
|
|
|
|
end)
|
|
|
|
|
|
|
|
it('can communicate buffers, tabpages, and windows', function()
|
2016-09-16 21:30:36 -07:00
|
|
|
eq({ 1 }, eval("rpcrequest(vim, 'nvim_list_tabpages')"))
|
2023-11-29 05:32:40 -07:00
|
|
|
-- Window IDs start at 1000 (LOWEST_WIN_ID in window.h)
|
2016-10-07 08:05:59 -07:00
|
|
|
eq({ 1000 }, eval("rpcrequest(vim, 'nvim_list_wins')"))
|
2014-11-10 11:58:37 -07:00
|
|
|
|
2016-09-16 21:30:36 -07:00
|
|
|
local buf = eval("rpcrequest(vim, 'nvim_list_bufs')")[1]
|
2016-06-19 12:06:03 -07:00
|
|
|
eq(1, buf)
|
2014-11-10 11:58:37 -07:00
|
|
|
|
|
|
|
eval("rpcnotify(vim, 'buffer_set_line', " .. buf .. ", 0, 'SOME TEXT')")
|
2024-01-12 06:11:28 -07:00
|
|
|
command("call rpcrequest(vim, 'vim_eval', '0')") -- wait
|
2014-11-10 11:58:37 -07:00
|
|
|
|
|
|
|
eq('SOME TEXT', eval("rpcrequest(vim, 'buffer_get_line', " .. buf .. ', 0)'))
|
|
|
|
|
2016-03-12 10:49:18 -07:00
|
|
|
-- Call get_lines(buf, range [0,0], strict_indexing)
|
|
|
|
eq({ 'SOME TEXT' }, eval("rpcrequest(vim, 'buffer_get_lines', " .. buf .. ', 0, 1, 1)'))
|
2014-11-10 11:58:37 -07:00
|
|
|
end)
|
2015-04-06 10:37:09 -07:00
|
|
|
|
|
|
|
it('returns an error if the request failed', function()
|
2019-09-03 13:51:45 -07:00
|
|
|
eq(
|
|
|
|
"Vim:Error invoking 'does-not-exist' on channel 3:\nInvalid method: does-not-exist",
|
|
|
|
pcall_err(eval, "rpcrequest(vim, 'does-not-exist')")
|
2024-01-02 18:09:18 -07:00
|
|
|
)
|
2015-04-06 10:37:09 -07:00
|
|
|
end)
|
2014-11-10 11:58:37 -07:00
|
|
|
end)
|
2016-05-12 13:25:15 -07:00
|
|
|
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
describe('jobstart()', function()
|
2016-05-12 13:25:15 -07:00
|
|
|
local jobid
|
|
|
|
before_each(function()
|
2024-02-12 06:50:39 -07:00
|
|
|
local channel = api.nvim_get_chan_info(0).id
|
2024-01-12 10:59:57 -07:00
|
|
|
api.nvim_set_var('channel', channel)
|
2016-05-12 13:25:15 -07:00
|
|
|
source([[
|
|
|
|
function! s:OnEvent(id, data, event)
|
|
|
|
call rpcnotify(g:channel, a:event, 0, a:data)
|
|
|
|
endfunction
|
|
|
|
let g:job_opts = {
|
|
|
|
\ 'on_stderr': function('s:OnEvent'),
|
|
|
|
\ 'on_exit': function('s:OnEvent'),
|
|
|
|
\ 'user': 0,
|
|
|
|
\ 'rpc': v:true
|
|
|
|
\ }
|
|
|
|
]])
|
2024-01-12 10:59:57 -07:00
|
|
|
api.nvim_set_var('args', {
|
2023-02-01 04:54:22 -07:00
|
|
|
nvim_prog,
|
|
|
|
'-ll',
|
2019-09-16 10:16:39 -07:00
|
|
|
'test/functional/api/rpc_fixture.lua',
|
|
|
|
package.path,
|
|
|
|
package.cpath,
|
|
|
|
})
|
2016-05-12 13:25:15 -07:00
|
|
|
jobid = eval('jobstart(g:args, g:job_opts)')
|
2019-09-16 10:16:39 -07:00
|
|
|
neq(0, jobid)
|
2016-05-12 13:25:15 -07:00
|
|
|
end)
|
|
|
|
|
|
|
|
after_each(function()
|
2024-01-12 10:59:57 -07:00
|
|
|
pcall(fn.jobstop, jobid)
|
2016-05-12 13:25:15 -07:00
|
|
|
end)
|
|
|
|
|
2024-04-08 02:03:20 -07:00
|
|
|
if t.skip(t.is_os('win')) then
|
2022-11-21 17:13:30 -07:00
|
|
|
return
|
|
|
|
end
|
2017-01-11 21:08:19 -07:00
|
|
|
|
2016-05-12 13:25:15 -07:00
|
|
|
it('rpc and text stderr can be combined', function()
|
2024-01-12 10:59:57 -07:00
|
|
|
local status, rv = pcall(fn.rpcrequest, jobid, 'poll')
|
2019-09-16 10:16:39 -07:00
|
|
|
if not status then
|
|
|
|
error(string.format('missing nvim Lua module? (%s)', rv))
|
|
|
|
end
|
|
|
|
eq('ok', rv)
|
2024-01-12 10:59:57 -07:00
|
|
|
fn.rpcnotify(jobid, 'ping')
|
2018-03-08 16:24:38 -07:00
|
|
|
eq({ 'notification', 'pong', {} }, next_msg())
|
2024-01-12 10:59:57 -07:00
|
|
|
eq('done!', fn.rpcrequest(jobid, 'write_stderr', 'fluff\n'))
|
2018-03-08 16:24:38 -07:00
|
|
|
eq({ 'notification', 'stderr', { 0, { 'fluff', '' } } }, next_msg())
|
2024-01-12 10:59:57 -07:00
|
|
|
pcall(fn.rpcrequest, jobid, 'exit')
|
2018-03-08 16:24:38 -07:00
|
|
|
eq({ 'notification', 'stderr', { 0, { '' } } }, next_msg())
|
|
|
|
eq({ 'notification', 'exit', { 0, 0 } }, next_msg())
|
2016-05-12 13:25:15 -07:00
|
|
|
end)
|
|
|
|
end)
|
|
|
|
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
describe('connecting to another (peer) nvim', function()
|
2024-04-20 08:44:13 -07:00
|
|
|
local nvim_argv = merge_args(n.nvim_argv, { '--headless' })
|
2017-04-26 04:10:21 -07:00
|
|
|
local function connect_test(server, mode, address)
|
2024-01-12 10:59:57 -07:00
|
|
|
local serverpid = fn.getpid()
|
2021-06-12 14:23:05 -07:00
|
|
|
local client = spawn(nvim_argv, false, nil, true)
|
|
|
|
set_session(client)
|
|
|
|
|
2024-01-12 10:59:57 -07:00
|
|
|
local clientpid = fn.getpid()
|
2017-04-26 04:10:21 -07:00
|
|
|
neq(serverpid, clientpid)
|
2024-01-12 10:59:57 -07:00
|
|
|
local id = fn.sockconnect(mode, address, { rpc = true })
|
2017-04-26 04:10:21 -07:00
|
|
|
ok(id > 0)
|
|
|
|
|
2024-01-12 10:59:57 -07:00
|
|
|
fn.rpcrequest(id, 'nvim_set_current_line', 'hello')
|
2024-02-12 06:50:39 -07:00
|
|
|
local client_id = fn.rpcrequest(id, 'nvim_get_chan_info', 0).id
|
2017-04-26 04:10:21 -07:00
|
|
|
|
2021-06-12 14:23:05 -07:00
|
|
|
set_session(server)
|
2024-01-12 10:59:57 -07:00
|
|
|
eq(serverpid, fn.getpid())
|
|
|
|
eq('hello', api.nvim_get_current_line())
|
2017-04-26 04:10:21 -07:00
|
|
|
|
|
|
|
-- method calls work both ways
|
2024-01-12 10:59:57 -07:00
|
|
|
fn.rpcrequest(client_id, 'nvim_set_current_line', 'howdy!')
|
2024-02-12 06:50:39 -07:00
|
|
|
eq(id, fn.rpcrequest(client_id, 'nvim_get_chan_info', 0).id)
|
2017-04-26 04:10:21 -07:00
|
|
|
|
2021-06-12 14:23:05 -07:00
|
|
|
set_session(client)
|
2024-01-12 10:59:57 -07:00
|
|
|
eq(clientpid, fn.getpid())
|
|
|
|
eq('howdy!', api.nvim_get_current_line())
|
2017-04-26 04:10:21 -07:00
|
|
|
|
|
|
|
server:close()
|
|
|
|
client:close()
|
|
|
|
end
|
|
|
|
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
it('via named pipe', function()
|
2017-04-26 04:10:21 -07:00
|
|
|
local server = spawn(nvim_argv)
|
|
|
|
set_session(server)
|
2024-01-12 10:59:57 -07:00
|
|
|
local address = fn.serverlist()[1]
|
2017-04-26 04:10:21 -07:00
|
|
|
local first = string.sub(address, 1, 1)
|
|
|
|
ok(first == '/' or first == '\\')
|
|
|
|
connect_test(server, 'pipe', address)
|
|
|
|
end)
|
|
|
|
|
2017-11-02 02:45:38 -07:00
|
|
|
it('via ipv4 address', function()
|
2017-04-26 04:10:21 -07:00
|
|
|
local server = spawn(nvim_argv)
|
|
|
|
set_session(server)
|
2024-01-12 10:59:57 -07:00
|
|
|
local status, address = pcall(fn.serverstart, '127.0.0.1:')
|
2018-05-09 12:44:18 -07:00
|
|
|
if not status then
|
2019-10-17 19:46:30 -07:00
|
|
|
pending('no ipv4 stack')
|
2017-11-02 02:45:38 -07:00
|
|
|
end
|
2017-04-26 04:10:21 -07:00
|
|
|
eq('127.0.0.1:', string.sub(address, 1, 10))
|
|
|
|
connect_test(server, 'tcp', address)
|
|
|
|
end)
|
|
|
|
|
2017-11-02 02:45:38 -07:00
|
|
|
it('via ipv6 address', function()
|
|
|
|
local server = spawn(nvim_argv)
|
|
|
|
set_session(server)
|
2024-01-12 10:59:57 -07:00
|
|
|
local status, address = pcall(fn.serverstart, '::1:')
|
2018-05-09 12:44:18 -07:00
|
|
|
if not status then
|
2019-10-17 19:46:30 -07:00
|
|
|
pending('no ipv6 stack')
|
2017-11-02 02:45:38 -07:00
|
|
|
end
|
|
|
|
eq('::1:', string.sub(address, 1, 4))
|
|
|
|
connect_test(server, 'tcp', address)
|
|
|
|
end)
|
|
|
|
|
rpc: close channel if stream was closed
f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc)
closes the write-stream of a RPC channel. But there might be
a pending RPC notification on the queue, which may get processed just
before the channel is closed.
To handle that case, check the Stream.closed in
channel.c:receive_msgpack().
Before this change, the above scenario could trigger
this assert(!stream->closed) in wstream_write():
0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
0x00007f96e1cd502a in __GI_abort () at abort.c:89
0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed",
file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77,
function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92
0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77,
function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101
0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77
0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551
0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523
0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503
0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423
0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false)
at ../src/nvim/msgpack_rpc/channel.c:389
0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190
0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150
0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908
0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137
0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61
0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467
0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554
Alternative approach suggested by bfredl is to use close_cb of the
process. My unsuccessful attempt is below. (It seems close_cb is queued
too late, which is the similar problem addressed by this commit):
commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e
Author: Justin M. Keyes <justinkz@gmail.com>
Date: Fri Aug 18 01:30:41 2017 +0200
rpc: use Stream's close_cb instead of explicit check in receive_msgpack()
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8371d3cd482e..e52da23cdc40 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -416,6 +416,10 @@ static void on_process_exit(Process *proc)
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
+ ILOG("on_process_stream_close");
+ if (proc->stream_close_cb != NULL) {
+ proc->stream_close_cb(stream, proc->stream_close_data);
+ }
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ecd5..34a8d54f6f8c 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,6 +26,11 @@ struct process {
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
+
+ // Called when any of the process streams (in/out/err) closes.
+ stream_close_cb stream_close_cb;
+ void *stream_close_data;
+
bool closed, detach;
MultiQueue *events;
};
@@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.closed = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
+ .stream_close_cb = NULL,
+ .stream_close_data = NULL,
.detach = false
};
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e8c..c8720d1e45d9 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
+ ILOG("stream=%d", stream);
+ // LOG_CALLSTACK();
if (stream->uvstream) {
+ // problem: this schedules on the queue, but channel.c:receive_msgpack may
+ // be processed before close_cb is called by libuv.
uv_close((uv_handle_t *)stream->uvstream, close_cb);
} else {
uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
@@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 782eabe04e4a..dc2b794e366a 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
+ channel->data.proc->stream_close_cb = close_cb2;
+ channel->data.proc->stream_close_data = channel;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
@@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
@@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan)
abort();
}
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+static void close_cb2(Stream *stream, void *data)
+{
+ ILOG("close_cb2");
+ close_channel(data);
+}
+
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
|
|
|
it('via hostname', function()
|
2017-04-26 04:10:21 -07:00
|
|
|
local server = spawn(nvim_argv)
|
|
|
|
set_session(server)
|
2024-01-12 10:59:57 -07:00
|
|
|
local address = fn.serverstart('localhost:')
|
2017-04-26 04:10:21 -07:00
|
|
|
eq('localhost:', string.sub(address, 1, 10))
|
|
|
|
connect_test(server, 'tcp', address)
|
|
|
|
end)
|
2023-02-08 19:53:47 -07:00
|
|
|
|
|
|
|
it('does not crash on receiving UI events', function()
|
|
|
|
local server = spawn(nvim_argv)
|
|
|
|
set_session(server)
|
2024-01-12 10:59:57 -07:00
|
|
|
local address = fn.serverlist()[1]
|
2023-02-08 19:53:47 -07:00
|
|
|
local client = spawn(nvim_argv, false, nil, true)
|
|
|
|
set_session(client)
|
|
|
|
|
2024-01-12 10:59:57 -07:00
|
|
|
local id = fn.sockconnect('pipe', address, { rpc = true })
|
|
|
|
fn.rpcrequest(id, 'nvim_ui_attach', 80, 24, {})
|
2023-02-08 19:53:47 -07:00
|
|
|
assert_alive()
|
|
|
|
|
|
|
|
server:close()
|
|
|
|
client:close()
|
|
|
|
end)
|
2024-03-21 02:08:31 -07:00
|
|
|
|
|
|
|
it('via stdio, with many small flushes does not crash #23781', function()
|
|
|
|
source([[
|
|
|
|
let chan = jobstart([v:progpath, '--embed', '--headless', '-n', '-u', 'NONE', '-i', 'NONE'], { 'rpc':v:false })
|
|
|
|
call chansend(chan, 0Z94)
|
|
|
|
sleep 50m
|
|
|
|
call chansend(chan, 0Z00)
|
|
|
|
call chansend(chan, 0Z01)
|
|
|
|
call chansend(chan, 0ZAC)
|
|
|
|
call chansend(chan, 0Z6E76696D5F636F6D6D616E64)
|
|
|
|
call chansend(chan, 0Z91)
|
|
|
|
call chansend(chan, 0ZA5)
|
|
|
|
call chansend(chan, 0Z71616C6C21)
|
|
|
|
let g:statuses = jobwait([chan])
|
|
|
|
]])
|
|
|
|
eq(eval('g:statuses'), { 0 })
|
|
|
|
assert_alive()
|
|
|
|
end)
|
2017-04-26 04:10:21 -07:00
|
|
|
end)
|
|
|
|
|
2017-07-01 15:30:00 -07:00
|
|
|
describe('connecting to its own pipe address', function()
|
|
|
|
it('does not deadlock', function()
|
2024-01-12 10:59:57 -07:00
|
|
|
local address = fn.serverlist()[1]
|
2017-05-06 00:31:36 -07:00
|
|
|
local first = string.sub(address, 1, 1)
|
|
|
|
ok(first == '/' or first == '\\')
|
2024-01-12 10:59:57 -07:00
|
|
|
local serverpid = fn.getpid()
|
2017-05-06 00:31:36 -07:00
|
|
|
|
2024-01-12 10:59:57 -07:00
|
|
|
local id = fn.sockconnect('pipe', address, { rpc = true })
|
2017-05-06 00:31:36 -07:00
|
|
|
|
2024-01-12 10:59:57 -07:00
|
|
|
fn.rpcrequest(id, 'nvim_set_current_line', 'hello')
|
|
|
|
eq('hello', api.nvim_get_current_line())
|
|
|
|
eq(serverpid, fn.rpcrequest(id, 'nvim_eval', 'getpid()'))
|
2017-05-06 00:31:36 -07:00
|
|
|
|
2024-02-12 06:50:39 -07:00
|
|
|
eq(id, fn.rpcrequest(id, 'nvim_get_chan_info', 0).id)
|
2017-05-06 00:31:36 -07:00
|
|
|
end)
|
|
|
|
end)
|
2014-10-08 09:56:01 -07:00
|
|
|
end)
|