neovim/test/functional/api/server_requests_spec.lua

362 lines
12 KiB
Lua
Raw Normal View History

-- Test server -> client RPC scenarios. Note: unlike `rpcnotify`, to evaluate
-- `rpcrequest` calls we need the client event loop to be running.
local helpers = require('test.functional.helpers')(after_each)
local clear, nvim, eval = helpers.clear, helpers.nvim, helpers.eval
local eq, neq, run, stop = helpers.eq, helpers.neq, helpers.run, helpers.stop
local nvim_prog, command, funcs = helpers.nvim_prog, helpers.command, helpers.funcs
local source, next_msg = helpers.source, helpers.next_msg
local ok = helpers.ok
local meths = helpers.meths
local spawn, merge_args = helpers.spawn, helpers.merge_args
local set_session = helpers.set_session
local meth_pcall = helpers.meth_pcall
describe('server -> client', function()
local cid
before_each(function()
clear()
cid = nvim('get_api_info')[1]
end)
rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes <justinkz@gmail.com> Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
it('handles unexpected closed stream while preparing RPC response', function()
source([[
let g:_nvim_args = [v:progpath, '--embed', '--headless', '-n', '-u', 'NONE', '-i', 'NONE', ]
rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes <justinkz@gmail.com> Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
let ch1 = jobstart(g:_nvim_args, {'rpc': v:true})
let child1_ch = rpcrequest(ch1, "nvim_get_api_info")[0]
call rpcnotify(ch1, 'nvim_eval', 'rpcrequest('.child1_ch.', "nvim_get_api_info")')
let ch2 = jobstart(g:_nvim_args, {'rpc': v:true})
let child2_ch = rpcrequest(ch2, "nvim_get_api_info")[0]
call rpcnotify(ch2, 'nvim_eval', 'rpcrequest('.child2_ch.', "nvim_get_api_info")')
call jobstop(ch1)
]])
eq(2, eval("1+1")) -- Still alive?
end)
describe('simple call', function()
it('works', function()
local function on_setup()
eq({4, 5, 6}, eval('rpcrequest('..cid..', "scall", 1, 2, 3)'))
stop()
end
local function on_request(method, args)
eq('scall', method)
eq({1, 2, 3}, args)
nvim('command', 'let g:result = [4, 5, 6]')
return eval('g:result')
end
run(on_request, nil, on_setup)
end)
end)
describe('empty string handling in arrays', function()
-- Because the msgpack encoding for an empty string was interpreted as an
-- error, msgpack arrays with an empty string looked like
-- [..., '', 0, ..., 0] after the conversion, regardless of the array
-- elements following the empty string.
it('works', function()
local function on_setup()
eq({1, 2, '', 3, 'asdf'}, eval('rpcrequest('..cid..', "nstring")'))
stop()
end
2016-02-02 12:23:12 -07:00
local function on_request()
-- No need to evaluate the args, we are only interested in
-- a response that contains an array with an empty string.
return {1, 2, '', 3, 'asdf'}
end
run(on_request, nil, on_setup)
end)
end)
describe('recursive call', function()
it('works', function()
local function on_setup()
nvim('set_var', 'result1', 0)
nvim('set_var', 'result2', 0)
nvim('set_var', 'result3', 0)
nvim('set_var', 'result4', 0)
nvim('command', 'let g:result1 = rpcrequest('..cid..', "rcall", 2)')
eq(4, nvim('get_var', 'result1'))
eq(8, nvim('get_var', 'result2'))
eq(16, nvim('get_var', 'result3'))
eq(32, nvim('get_var', 'result4'))
stop()
end
local function on_request(method, args)
eq('rcall', method)
local n = unpack(args) * 2
if n <= 16 then
local cmd
if n == 4 then
cmd = 'let g:result2 = rpcrequest('..cid..', "rcall", '..n..')'
elseif n == 8 then
cmd = 'let g:result3 = rpcrequest('..cid..', "rcall", '..n..')'
elseif n == 16 then
cmd = 'let g:result4 = rpcrequest('..cid..', "rcall", '..n..')'
end
nvim('command', cmd)
end
return n
end
run(on_request, nil, on_setup)
end)
end)
describe('requests and notifications interleaved', function()
it('does not delay notifications during pending request', function()
local received = false
local function on_setup()
eq("retval", funcs.rpcrequest(cid, "doit"))
stop()
end
local function on_request(method)
if method == "doit" then
funcs.rpcnotify(cid, "headsup")
eq(true,received)
return "retval"
end
end
local function on_notification(method)
if method == "headsup" then
received = true
end
end
run(on_request, on_notification, on_setup)
end)
-- This tests the following scenario:
--
-- server->client [request ] (1)
-- client->server [request ] (2) triggered by (1)
-- server->client [notification] (3) triggered by (2)
-- server->client [response ] (4) response to (2)
-- client->server [request ] (4) triggered by (3)
-- server->client [request ] (5) triggered by (4)
-- client->server [response ] (6) response to (1)
--
-- If the above scenario ever happens, the client connection will be closed
-- because (6) is returned after request (5) is sent, and nvim
-- only deals with one server->client request at a time. (In other words,
-- the client cannot send a response to a request that is not at the top
-- of nvim's request stack).
pending('will close connection if not properly synchronized', function()
local function on_setup()
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end
local function on_request(method)
if method == "notify" then
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
elseif method == "nested" then
-- do some busywork, so the first request will return
-- before this one
for _ = 1, 5 do
eq(2, eval("1+1"))
end
eq(1, eval('rpcnotify('..cid..', "nested_done")'))
return 'done!'
end
end
local function on_notification(method)
if method == "notification" then
eq('done!', eval('rpcrequest('..cid..', "nested")'))
elseif method == "nested_done" then
ok(false, 'this should never have been sent')
end
end
run(on_request, on_notification, on_setup)
-- ignore disconnect failure, otherwise detected by after_each
clear()
end)
end)
rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes <justinkz@gmail.com> Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
describe('recursive (child) nvim client', function()
if helpers.isCI('travis') and helpers.is_os('mac') then
-- XXX: Hangs Travis macOS since e9061117a5b8f195c3f26a5cb94e18ddd7752d86.
pending("[Hangs on Travis macOS. #5002]", function() end)
return
end
before_each(function()
command("let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed', '--headless'])")
neq(0, eval('vim'))
end)
after_each(function() command('call rpcstop(vim)') end)
rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes <justinkz@gmail.com> Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
it('can send/receive notifications and make requests', function()
nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')")
-- Wait for the notification to complete.
nvim('command', "call rpcrequest(vim, 'vim_eval', '0')")
eq('SOME TEXT', eval("rpcrequest(vim, 'vim_get_current_line')"))
end)
it('can communicate buffers, tabpages, and windows', function()
eq({1}, eval("rpcrequest(vim, 'nvim_list_tabpages')"))
-- Window IDs start at 1000 (LOWEST_WIN_ID in vim.h)
eq({1000}, eval("rpcrequest(vim, 'nvim_list_wins')"))
local buf = eval("rpcrequest(vim, 'nvim_list_bufs')")[1]
eq(1, buf)
eval("rpcnotify(vim, 'buffer_set_line', "..buf..", 0, 'SOME TEXT')")
nvim('command', "call rpcrequest(vim, 'vim_eval', '0')") -- wait
eq('SOME TEXT', eval("rpcrequest(vim, 'buffer_get_line', "..buf..", 0)"))
-- Call get_lines(buf, range [0,0], strict_indexing)
eq({'SOME TEXT'}, eval("rpcrequest(vim, 'buffer_get_lines', "..buf..", 0, 1, 1)"))
end)
2015-04-06 10:37:09 -07:00
it('returns an error if the request failed', function()
eq({false, "Vim:Error invoking 'does-not-exist' on channel 3:\nInvalid method: does-not-exist" },
meth_pcall(eval, "rpcrequest(vim, 'does-not-exist')"))
2015-04-06 10:37:09 -07:00
end)
end)
rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes <justinkz@gmail.com> Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
describe('jobstart()', function()
local jobid
before_each(function()
local channel = nvim('get_api_info')[1]
nvim('set_var', 'channel', channel)
source([[
function! s:OnEvent(id, data, event)
call rpcnotify(g:channel, a:event, 0, a:data)
endfunction
let g:job_opts = {
\ 'on_stderr': function('s:OnEvent'),
\ 'on_exit': function('s:OnEvent'),
\ 'user': 0,
\ 'rpc': v:true
\ }
]])
meths.set_var("args", {helpers.test_lua_prg,
'test/functional/api/rpc_fixture.lua'})
jobid = eval("jobstart(g:args, g:job_opts)")
neq(0, 'jobid')
end)
after_each(function()
pcall(funcs.jobstop, jobid)
end)
2017-01-11 21:08:19 -07:00
if helpers.pending_win32(pending) then return end
it('rpc and text stderr can be combined', function()
eq("ok",funcs.rpcrequest(jobid, "poll"))
funcs.rpcnotify(jobid, "ping")
eq({'notification', 'pong', {}}, next_msg())
eq("done!",funcs.rpcrequest(jobid, "write_stderr", "fluff\n"))
eq({'notification', 'stderr', {0, {'fluff', ''}}}, next_msg())
pcall(funcs.rpcrequest, jobid, "exit")
eq({'notification', 'stderr', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
end)
rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes <justinkz@gmail.com> Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
describe('connecting to another (peer) nvim', function()
local nvim_argv = merge_args(helpers.nvim_argv, {'--headless'})
local function connect_test(server, mode, address)
local serverpid = funcs.getpid()
local client = spawn(nvim_argv)
set_session(client, true)
local clientpid = funcs.getpid()
neq(serverpid, clientpid)
local id = funcs.sockconnect(mode, address, {rpc=true})
ok(id > 0)
funcs.rpcrequest(id, 'nvim_set_current_line', 'hello')
local client_id = funcs.rpcrequest(id, 'nvim_get_api_info')[1]
set_session(server, true)
eq(serverpid, funcs.getpid())
eq('hello', meths.get_current_line())
-- method calls work both ways
funcs.rpcrequest(client_id, 'nvim_set_current_line', 'howdy!')
eq(id, funcs.rpcrequest(client_id, 'nvim_get_api_info')[1])
set_session(client, true)
eq(clientpid, funcs.getpid())
eq('howdy!', meths.get_current_line())
server:close()
client:close()
end
rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes <justinkz@gmail.com> Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
it('via named pipe', function()
local server = spawn(nvim_argv)
set_session(server)
local address = funcs.serverlist()[1]
local first = string.sub(address,1,1)
ok(first == '/' or first == '\\')
connect_test(server, 'pipe', address)
end)
it('via ipv4 address', function()
local server = spawn(nvim_argv)
set_session(server)
local status, address = pcall(funcs.serverstart, "127.0.0.1:")
if not status then
pending('no ipv4 stack', function() end)
return
end
eq('127.0.0.1:', string.sub(address,1,10))
connect_test(server, 'tcp', address)
end)
it('via ipv6 address', function()
local server = spawn(nvim_argv)
set_session(server)
local status, address = pcall(funcs.serverstart, '::1:')
if not status then
pending('no ipv6 stack', function() end)
return
end
eq('::1:', string.sub(address,1,4))
connect_test(server, 'tcp', address)
end)
rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=<optimized out>, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes <justinkz@gmail.com> Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source)
2017-07-26 16:08:50 -07:00
it('via hostname', function()
local server = spawn(nvim_argv)
set_session(server)
local address = funcs.serverstart("localhost:")
eq('localhost:', string.sub(address,1,10))
connect_test(server, 'tcp', address)
end)
end)
describe('connecting to its own pipe address', function()
it('does not deadlock', function()
if not helpers.isCI('travis') and helpers.is_os('mac') then
-- It does, in fact, deadlock on QuickBuild. #6851
pending("deadlocks on QuickBuild", function() end)
return
end
local address = funcs.serverlist()[1]
local first = string.sub(address,1,1)
ok(first == '/' or first == '\\')
local serverpid = funcs.getpid()
local id = funcs.sockconnect('pipe', address, {rpc=true})
funcs.rpcrequest(id, 'nvim_set_current_line', 'hello')
eq('hello', meths.get_current_line())
eq(serverpid, funcs.rpcrequest(id, "nvim_eval", "getpid()"))
eq(id, funcs.rpcrequest(id, 'nvim_get_api_info')[1])
end)
end)
end)