mirror of
https://github.com/neovim/neovim.git
synced 2025-01-01 17:23:36 -07:00
Merge #7081 from justinmk/rpcstop
rpc: close channel if stream was closed
This commit is contained in:
commit
6e7a8c3fe2
@ -233,8 +233,7 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
switch (proc->type) {
|
||||
case kProcessTypeUv:
|
||||
// Close the process's stdin. If the process doesn't close its own
|
||||
// stdout/stderr, they will be closed when it exits(possibly due to being
|
||||
// terminated after a timeout)
|
||||
// stdout/stderr, they will be closed when it exits (voluntarily or not).
|
||||
process_close_in(proc);
|
||||
ILOG("Sending SIGTERM to pid %d", proc->pid);
|
||||
uv_kill(proc->pid, SIGTERM);
|
||||
|
@ -118,7 +118,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
|
||||
// to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
|
||||
// won't be called)
|
||||
&& cnt != 0) {
|
||||
DLOG("Closing Stream (%p): %s (%s)", stream,
|
||||
DLOG("closing Stream: %p: %s (%s)", stream,
|
||||
uv_err_name((int)cnt), os_strerror((int)cnt));
|
||||
// Read error or EOF, either way stop the stream and invoke the callback
|
||||
// with eof == true
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
#include <uv.h>
|
||||
|
||||
#include "nvim/log.h"
|
||||
#include "nvim/rbuffer.h"
|
||||
#include "nvim/macros.h"
|
||||
#include "nvim/event/stream.h"
|
||||
@ -81,6 +82,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
|
||||
FUNC_ATTR_NONNULL_ARG(1)
|
||||
{
|
||||
assert(!stream->closed);
|
||||
DLOG("closing Stream: %p", stream);
|
||||
stream->closed = true;
|
||||
stream->close_cb = on_stream_close;
|
||||
stream->close_cb_data = data;
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
#include <uv.h>
|
||||
|
||||
#include "nvim/log.h"
|
||||
#include "nvim/event/loop.h"
|
||||
#include "nvim/event/wstream.h"
|
||||
#include "nvim/vim.h"
|
||||
|
@ -250,7 +250,7 @@ static bool v_do_log_to_file(FILE *log_file, int log_level,
|
||||
static const char *log_levels[] = {
|
||||
[DEBUG_LOG_LEVEL] = "DEBUG",
|
||||
[INFO_LOG_LEVEL] = "INFO ",
|
||||
[WARNING_LOG_LEVEL] = "WARN ",
|
||||
[WARN_LOG_LEVEL] = "WARN ",
|
||||
[ERROR_LOG_LEVEL] = "ERROR",
|
||||
};
|
||||
assert(log_level >= DEBUG_LOG_LEVEL && log_level <= ERROR_LOG_LEVEL);
|
||||
|
@ -6,7 +6,7 @@
|
||||
|
||||
#define DEBUG_LOG_LEVEL 0
|
||||
#define INFO_LOG_LEVEL 1
|
||||
#define WARNING_LOG_LEVEL 2
|
||||
#define WARN_LOG_LEVEL 2
|
||||
#define ERROR_LOG_LEVEL 3
|
||||
|
||||
#define DLOG(...)
|
||||
@ -43,12 +43,12 @@
|
||||
__VA_ARGS__)
|
||||
#endif
|
||||
|
||||
#if MIN_LOG_LEVEL <= WARNING_LOG_LEVEL
|
||||
#if MIN_LOG_LEVEL <= WARN_LOG_LEVEL
|
||||
# undef WLOG
|
||||
# undef WLOGN
|
||||
# define WLOG(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, true, \
|
||||
# define WLOG(...) do_log(WARN_LOG_LEVEL, __func__, __LINE__, true, \
|
||||
__VA_ARGS__)
|
||||
# define WLOGN(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, false, \
|
||||
# define WLOGN(...) do_log(WARN_LOG_LEVEL, __func__, __LINE__, false, \
|
||||
__VA_ARGS__)
|
||||
#endif
|
||||
|
||||
|
@ -62,7 +62,7 @@ typedef struct {
|
||||
ChannelType type;
|
||||
msgpack_unpacker *unpacker;
|
||||
union {
|
||||
Stream stream;
|
||||
Stream stream; // bidirectional (socket)
|
||||
Process *proc;
|
||||
struct {
|
||||
Stream in;
|
||||
@ -133,6 +133,9 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
|
||||
rstream_init(proc->out, 0);
|
||||
rstream_start(proc->out, receive_msgpack, channel);
|
||||
|
||||
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in,
|
||||
proc->out);
|
||||
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
@ -150,6 +153,9 @@ void channel_from_connection(SocketWatcher *watcher)
|
||||
wstream_init(&channel->data.stream, 0);
|
||||
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.stream, receive_msgpack, channel);
|
||||
|
||||
DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id,
|
||||
&channel->data.stream);
|
||||
}
|
||||
|
||||
/// @param source description of source function, rplugin name, TCP addr, etc
|
||||
@ -344,6 +350,9 @@ void channel_from_stdio(void)
|
||||
rstream_start(&channel->data.std.in, receive_msgpack, channel);
|
||||
// write stream
|
||||
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
|
||||
|
||||
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
|
||||
&channel->data.std.in, &channel->data.std.out);
|
||||
}
|
||||
|
||||
/// Creates a loopback channel. This is used to avoid deadlock
|
||||
@ -363,6 +372,7 @@ void channel_process_exit(uint64_t id, int status)
|
||||
decref(channel);
|
||||
}
|
||||
|
||||
// rstream.c:read_event() invokes this as stream->read_cb().
|
||||
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
|
||||
void *data, bool eof)
|
||||
{
|
||||
@ -374,12 +384,24 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
|
||||
char buf[256];
|
||||
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
|
||||
channel->id);
|
||||
call_set_error(channel, buf, WARNING_LOG_LEVEL);
|
||||
call_set_error(channel, buf, WARN_LOG_LEVEL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
|
||||
|| (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
|
||||
char buf[256];
|
||||
snprintf(buf, sizeof(buf),
|
||||
"ch %" PRIu64 ": stream closed unexpectedly. "
|
||||
"closing channel",
|
||||
channel->id);
|
||||
call_set_error(channel, buf, WARN_LOG_LEVEL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
size_t count = rbuffer_size(rbuf);
|
||||
DLOG("parsing %u bytes of msgpack data from Stream(%p)", count, stream);
|
||||
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
|
||||
channel->id, count, stream);
|
||||
|
||||
// Feed the unpacker with data
|
||||
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
|
||||
@ -435,7 +457,7 @@ static void parse_msgpack(Channel *channel)
|
||||
// causes for this error(search for 'goto _failed')
|
||||
//
|
||||
// A not so uncommon cause for this might be deserializing objects with
|
||||
// a high nesting level: msgpack will break when it's internal parse stack
|
||||
// a high nesting level: msgpack will break when its internal parse stack
|
||||
// size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
|
||||
send_error(channel, 0, "Invalid msgpack payload. "
|
||||
"This error can also happen when deserializing "
|
||||
@ -534,6 +556,39 @@ static void on_request_event(void **argv)
|
||||
api_clear_error(&error);
|
||||
}
|
||||
|
||||
/// Returns the Stream that a Channel writes to.
|
||||
static Stream *chan_wstream(Channel *chan)
|
||||
{
|
||||
switch (chan->type) {
|
||||
case kChannelTypeSocket:
|
||||
return &chan->data.stream;
|
||||
case kChannelTypeProc:
|
||||
return chan->data.proc->in;
|
||||
case kChannelTypeStdio:
|
||||
return &chan->data.std.out;
|
||||
case kChannelTypeInternal:
|
||||
return NULL;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
/// Returns the Stream that a Channel reads from.
|
||||
static Stream *chan_rstream(Channel *chan)
|
||||
{
|
||||
switch (chan->type) {
|
||||
case kChannelTypeSocket:
|
||||
return &chan->data.stream;
|
||||
case kChannelTypeProc:
|
||||
return chan->data.proc->out;
|
||||
case kChannelTypeStdio:
|
||||
return &chan->data.std.in;
|
||||
case kChannelTypeInternal:
|
||||
return NULL;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
|
||||
static bool channel_write(Channel *channel, WBuffer *buffer)
|
||||
{
|
||||
bool success = false;
|
||||
@ -545,13 +600,9 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
|
||||
|
||||
switch (channel->type) {
|
||||
case kChannelTypeSocket:
|
||||
success = wstream_write(&channel->data.stream, buffer);
|
||||
break;
|
||||
case kChannelTypeProc:
|
||||
success = wstream_write(channel->data.proc->in, buffer);
|
||||
break;
|
||||
case kChannelTypeStdio:
|
||||
success = wstream_write(&channel->data.std.out, buffer);
|
||||
success = wstream_write(chan_wstream(channel), buffer);
|
||||
break;
|
||||
case kChannelTypeInternal:
|
||||
incref(channel);
|
||||
@ -565,8 +616,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
|
||||
char buf[256];
|
||||
snprintf(buf,
|
||||
sizeof(buf),
|
||||
"Before returning from a RPC call, ch %" PRIu64 " was "
|
||||
"closed due to a failed write",
|
||||
"ch %" PRIu64 ": stream write failed. "
|
||||
"RPC canceled; closing channel",
|
||||
channel->id);
|
||||
call_set_error(channel, buf, ERROR_LOG_LEVEL);
|
||||
}
|
||||
@ -817,6 +868,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel)
|
||||
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
|
||||
frame->returned = true;
|
||||
frame->errored = true;
|
||||
api_free_object(frame->result);
|
||||
frame->result = STRING_OBJ(cstr_to_string(msg));
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,12 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg)
|
||||
{
|
||||
bool ret = true;
|
||||
kvec_t(MPToAPIObjectStackItem) stack = KV_INITIAL_VALUE;
|
||||
kv_push(stack, ((MPToAPIObjectStackItem) { obj, arg, false, 0 }));
|
||||
kv_push(stack, ((MPToAPIObjectStackItem) {
|
||||
.mobj = obj,
|
||||
.aobj = arg,
|
||||
.container = false,
|
||||
.idx = 0,
|
||||
}));
|
||||
while (ret && kv_size(stack)) {
|
||||
MPToAPIObjectStackItem cur = kv_last(stack);
|
||||
if (!cur.container) {
|
||||
@ -361,7 +366,7 @@ typedef struct {
|
||||
size_t idx;
|
||||
} APIToMPObjectStackItem;
|
||||
|
||||
/// Convert type used by Neovim API to msgpack
|
||||
/// Convert type used by Nvim API to msgpack type.
|
||||
///
|
||||
/// @param[in] result Object to convert.
|
||||
/// @param[out] res Structure that defines where conversion results are saved.
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <stdbool.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "nvim/log.h"
|
||||
#include "nvim/vim.h"
|
||||
#include "nvim/ascii.h"
|
||||
#include "nvim/normal.h"
|
||||
|
@ -20,6 +20,22 @@ describe('server -> client', function()
|
||||
cid = nvim('get_api_info')[1]
|
||||
end)
|
||||
|
||||
it('handles unexpected closed stream while preparing RPC response', function()
|
||||
source([[
|
||||
let g:_nvim_args = [v:progpath, '--embed', '-n', '-u', 'NONE', '-i', 'NONE', ]
|
||||
let ch1 = jobstart(g:_nvim_args, {'rpc': v:true})
|
||||
let child1_ch = rpcrequest(ch1, "nvim_get_api_info")[0]
|
||||
call rpcnotify(ch1, 'nvim_eval', 'rpcrequest('.child1_ch.', "nvim_get_api_info")')
|
||||
|
||||
let ch2 = jobstart(g:_nvim_args, {'rpc': v:true})
|
||||
let child2_ch = rpcrequest(ch2, "nvim_get_api_info")[0]
|
||||
call rpcnotify(ch2, 'nvim_eval', 'rpcrequest('.child2_ch.', "nvim_get_api_info")')
|
||||
|
||||
call jobstop(ch1)
|
||||
]])
|
||||
eq(2, eval("1+1")) -- Still alive?
|
||||
end)
|
||||
|
||||
describe('simple call', function()
|
||||
it('works', function()
|
||||
local function on_setup()
|
||||
@ -141,7 +157,7 @@ describe('server -> client', function()
|
||||
end)
|
||||
end)
|
||||
|
||||
describe('when the client is a recursive vim instance', function()
|
||||
describe('recursive (child) nvim client', function()
|
||||
if os.getenv("TRAVIS") and helpers.os_name() == "osx" then
|
||||
-- XXX: Hangs Travis macOS since e9061117a5b8f195c3f26a5cb94e18ddd7752d86.
|
||||
pending("[Hangs on Travis macOS. #5002]", function() end)
|
||||
@ -155,7 +171,7 @@ describe('server -> client', function()
|
||||
|
||||
after_each(function() command('call rpcstop(vim)') end)
|
||||
|
||||
it('can send/recieve notifications and make requests', function()
|
||||
it('can send/receive notifications and make requests', function()
|
||||
nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')")
|
||||
|
||||
-- Wait for the notification to complete.
|
||||
@ -188,7 +204,7 @@ describe('server -> client', function()
|
||||
end)
|
||||
end)
|
||||
|
||||
describe('when using jobstart', function()
|
||||
describe('jobstart()', function()
|
||||
local jobid
|
||||
before_each(function()
|
||||
local channel = nvim('get_api_info')[1]
|
||||
@ -227,7 +243,7 @@ describe('server -> client', function()
|
||||
end)
|
||||
end)
|
||||
|
||||
describe('when connecting to another nvim instance', function()
|
||||
describe('connecting to another (peer) nvim', function()
|
||||
local function connect_test(server, mode, address)
|
||||
local serverpid = funcs.getpid()
|
||||
local client = spawn(nvim_argv)
|
||||
@ -256,7 +272,7 @@ describe('server -> client', function()
|
||||
client:close()
|
||||
end
|
||||
|
||||
it('over a named pipe', function()
|
||||
it('via named pipe', function()
|
||||
local server = spawn(nvim_argv)
|
||||
set_session(server)
|
||||
local address = funcs.serverlist()[1]
|
||||
@ -265,7 +281,7 @@ describe('server -> client', function()
|
||||
connect_test(server, 'pipe', address)
|
||||
end)
|
||||
|
||||
it('to an ip adress', function()
|
||||
it('via ip address', function()
|
||||
local server = spawn(nvim_argv)
|
||||
set_session(server)
|
||||
local address = funcs.serverstart("127.0.0.1:")
|
||||
@ -273,7 +289,7 @@ describe('server -> client', function()
|
||||
connect_test(server, 'tcp', address)
|
||||
end)
|
||||
|
||||
it('to a hostname', function()
|
||||
it('via hostname', function()
|
||||
local server = spawn(nvim_argv)
|
||||
set_session(server)
|
||||
local address = funcs.serverstart("localhost:")
|
||||
|
Loading…
Reference in New Issue
Block a user