diff --git a/scripts/msgpack-gen.lua b/scripts/msgpack-gen.lua index e7d5d5a503..e2cc267191 100644 --- a/scripts/msgpack-gen.lua +++ b/scripts/msgpack-gen.lua @@ -91,6 +91,8 @@ output:write([[ #include #include "nvim/os/msgpack_rpc.h" +#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/api/private/helpers.h" ]]) for i = 1, #headers do @@ -120,20 +122,13 @@ output:write([[ }; const unsigned int msgpack_metadata_size = sizeof(msgpack_metadata); -void msgpack_rpc_dispatch(uint64_t channel_id, msgpack_object *req, msgpack_packer *res) +Object msgpack_rpc_dispatch(uint64_t channel_id, + uint64_t method_id, + msgpack_object *req, + Error *error) { - Error error = { .set = false }; - uint64_t method_id = (uint32_t)req->via.array.ptr[2].via.u64; - + Object ret = NIL; switch (method_id) { - case 0: - msgpack_pack_nil(res); - // The result is the [channel_id, metadata] array - msgpack_pack_array(res, 2); - msgpack_pack_uint64(res, channel_id); - msgpack_pack_raw(res, sizeof(msgpack_metadata)); - msgpack_pack_raw_body(res, msgpack_metadata, sizeof(msgpack_metadata)); - return; ]]) -- Visit each function metadata to build the case label with code generated @@ -145,8 +140,7 @@ for i = 1, #api.functions do output:write('\n case '..fn.id..': {') output:write('\n if (req->via.array.ptr[3].via.array.size != '..#fn.parameters..') {') - output:write('\n snprintf(error.msg, sizeof(error.msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);') - output:write('\n msgpack_rpc_error(error.msg, res);') + output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);') output:write('\n goto '..cleanup_label..';') output:write('\n }\n') -- Declare/initialize variables that will hold converted arguments @@ -164,7 +158,9 @@ for i = 1, #api.functions do converted = 'arg_'..j convert_arg = 'msgpack_rpc_to_'..string.lower(param[1]) output:write('\n if (!'..convert_arg..'('..arg..', &'..converted..')) {') - output:write('\n msgpack_rpc_error("Wrong type for argument '..j..', expecting '..param[1]..'", res);') + output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong type for argument '..j..', expecting '..param[1]..'");') + + output:write('\n error->set = true;') output:write('\n goto '..cleanup_label..';') output:write('\n }\n') args[#args + 1] = converted @@ -195,28 +191,20 @@ for i = 1, #api.functions do if fn.can_fail then -- if the function can fail, also pass a pointer to the local error object if #args > 0 then - output:write(', &error);\n') + output:write(', error);\n') else - output:write('&error);\n') + output:write('error);\n') end -- and check for the error - output:write('\n if (error.set) {') - output:write('\n msgpack_rpc_error(error.msg, res);') + output:write('\n if (error->set) {') output:write('\n goto '..cleanup_label..';') output:write('\n }\n') else output:write(');\n') end - -- nil error - output:write('\n msgpack_pack_nil(res);'); - - if fn.return_type == 'void' then - output:write('\n msgpack_pack_nil(res);'); - else - output:write('\n msgpack_rpc_from_'..string.lower(fn.return_type)..'(rv, res);') - -- free the return value - output:write('\n msgpack_rpc_free_'..string.lower(fn.return_type)..'(rv);') + if fn.return_type ~= 'void' then + output:write('\n ret = '..string.upper(fn.return_type)..'_OBJ(rv);') end -- Now generate the cleanup label for freeing memory allocated for the -- arguments @@ -226,7 +214,7 @@ for i = 1, #api.functions do local param = fn.parameters[j] output:write('\n msgpack_rpc_free_'..string.lower(param[1])..'(arg_'..j..');') end - output:write('\n return;'); + output:write('\n break;'); output:write('\n };\n'); end @@ -235,8 +223,10 @@ output:write([[ default: - msgpack_rpc_error("Invalid function id", res); + snprintf(error->msg, sizeof(error->msg), "Invalid function id"); + error->set = true; } + return ret; } ]]) output:close() diff --git a/src/nvim/api/private/defs.h b/src/nvim/api/private/defs.h index ee0fc02c4d..b049412014 100644 --- a/src/nvim/api/private/defs.h +++ b/src/nvim/api/private/defs.h @@ -65,8 +65,16 @@ typedef enum { kObjectTypeInteger, kObjectTypeFloat, kObjectTypeString, + kObjectTypeBuffer, + kObjectTypeWindow, + kObjectTypeTabpage, kObjectTypeArray, - kObjectTypeDictionary + kObjectTypeDictionary, + kObjectTypePosition, + kObjectTypeStringArray, + kObjectTypeBufferArray, + kObjectTypeWindowArray, + kObjectTypeTabpageArray, } ObjectType; struct object { @@ -76,8 +84,16 @@ struct object { Integer integer; Float floating; String string; + Buffer buffer; + Window window; + Tabpage tabpage; Array array; Dictionary dictionary; + Position position; + StringArray stringarray; + BufferArray bufferarray; + WindowArray windowarray; + TabpageArray tabpagearray; } data; }; diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c index 30301e9368..024f0c2405 100644 --- a/src/nvim/api/private/helpers.c +++ b/src/nvim/api/private/helpers.c @@ -341,7 +341,7 @@ String cstr_to_string(const char *str) }; } -static bool object_to_vim(Object obj, typval_T *tv, Error *err) +bool object_to_vim(Object obj, typval_T *tv, Error *err) { tv->v_type = VAR_UNKNOWN; tv->v_lock = 0; @@ -426,6 +426,8 @@ static bool object_to_vim(Object obj, typval_T *tv, Error *err) } tv->vval.v_dict->dv_refcount++; break; + default: + abort(); } return true; diff --git a/src/nvim/api/private/helpers.h b/src/nvim/api/private/helpers.h index e1e1a35490..f1b9dc3bc8 100644 --- a/src/nvim/api/private/helpers.h +++ b/src/nvim/api/private/helpers.h @@ -14,7 +14,9 @@ err->set = true; \ } while (0) -#define BOOL_OBJ(b) ((Object) { \ +#define OBJECT_OBJ(o) o + +#define BOOLEAN_OBJ(b) ((Object) { \ .type = kObjectTypeBoolean, \ .data.boolean = b \ }) @@ -26,26 +28,59 @@ #define STRING_OBJ(s) ((Object) { \ .type = kObjectTypeString, \ - .data.string = cstr_to_string(s) \ + .data.string = s \ }) -#define STRINGL_OBJ(d, s) ((Object) { \ - .type = kObjectTypeString, \ - .data.string = (String) { \ - .size = s, \ - .data = xmemdup(d, s) \ - }}) +#define BUFFER_OBJ(s) ((Object) { \ + .type = kObjectTypeBuffer, \ + .data.buffer = s \ + }) + +#define WINDOW_OBJ(s) ((Object) { \ + .type = kObjectTypeWindow, \ + .data.window = s \ + }) + +#define TABPAGE_OBJ(s) ((Object) { \ + .type = kObjectTypeTabpage, \ + .data.tabpage = s \ + }) #define ARRAY_OBJ(a) ((Object) { \ .type = kObjectTypeArray, \ .data.array = a \ }) +#define STRINGARRAY_OBJ(a) ((Object) { \ + .type = kObjectTypeStringArray, \ + .data.stringarray = a \ + }) + +#define BUFFERARRAY_OBJ(a) ((Object) { \ + .type = kObjectTypeBufferArray, \ + .data.bufferarray = a \ + }) + +#define WINDOWARRAY_OBJ(a) ((Object) { \ + .type = kObjectTypeWindowArray, \ + .data.windowarray = a \ + }) + +#define TABPAGEARRAY_OBJ(a) ((Object) { \ + .type = kObjectTypeTabpageArray, \ + .data.tabpagearray = a \ + }) + #define DICTIONARY_OBJ(d) ((Object) { \ .type = kObjectTypeDictionary, \ .data.dictionary = d \ }) +#define POSITION_OBJ(p) ((Object) { \ + .type = kObjectTypePosition, \ + .data.position = p \ + }) + #define NIL ((Object) {.type = kObjectTypeNil}) #define PUT(dict, k, v) \ diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index e7261e1096..fbeb42cf4b 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -424,8 +424,8 @@ void vim_set_current_tabpage(Tabpage tabpage, Error *err) /// @param event The event type string void vim_subscribe(uint64_t channel_id, String event) { - size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN); - char e[EVENT_MAXLEN + 1]; + size_t length = (event.size < METHOD_MAXLEN ? event.size : METHOD_MAXLEN); + char e[METHOD_MAXLEN + 1]; memcpy(e, event.data, length); e[length] = NUL; channel_subscribe(channel_id, e); @@ -437,8 +437,10 @@ void vim_subscribe(uint64_t channel_id, String event) /// @param event The event type string void vim_unsubscribe(uint64_t channel_id, String event) { - size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN); - char e[EVENT_MAXLEN + 1]; + size_t length = (event.size < METHOD_MAXLEN ? + event.size : + METHOD_MAXLEN); + char e[METHOD_MAXLEN + 1]; memcpy(e, event.data, length); e[length] = NUL; channel_unsubscribe(channel_id, e); diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 92075e46f8..adc411afc7 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -71,6 +71,7 @@ #include "nvim/os/time.h" #include "nvim/os/channel.h" #include "nvim/api/private/helpers.h" +#include "nvim/os/msgpack_rpc_helpers.h" #define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */ @@ -6453,6 +6454,7 @@ static struct fst { {"searchpair", 3, 7, f_searchpair}, {"searchpairpos", 3, 7, f_searchpairpos}, {"searchpos", 1, 4, f_searchpos}, + {"send_call", 3, 3, f_send_call}, {"send_event", 3, 3, f_send_event}, {"setbufvar", 3, 3, f_setbufvar}, {"setcmdpos", 1, 1, f_setcmdpos}, @@ -10474,6 +10476,7 @@ static void f_job_start(typval_T *argvars, typval_T *rettv) on_job_stderr, on_job_exit, true, + 0, &rettv->vval.v_number); if (rettv->vval.v_number <= 0) { @@ -10535,6 +10538,7 @@ static void f_job_write(typval_T *argvars, typval_T *rettv) if (!job) { // Invalid job id EMSG(_(e_invjob)); + return; } WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string), @@ -12523,6 +12527,47 @@ do_searchpair ( return retval; } +// "send_call()" function +static void f_send_call(typval_T *argvars, typval_T *rettv) +{ + rettv->v_type = VAR_NUMBER; + rettv->vval.v_number = 0; + + if (check_restricted() || check_secure()) { + return; + } + + if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) { + EMSG2(_(e_invarg2), "Channel id must be a positive integer"); + return; + } + + if (argvars[1].v_type != VAR_STRING) { + EMSG2(_(e_invarg2), "Method name must be a string"); + return; + } + + bool errored; + Object result; + if (!channel_send_call((uint64_t)argvars[0].vval.v_number, + (char *)argvars[1].vval.v_string, + vim_to_object(&argvars[2]), + &result, + &errored)) { + EMSG2(_(e_invarg2), "Channel doesn't exist"); + return; + } + + Error conversion_error = {.set = false}; + if (errored || !object_to_vim(result, rettv, &conversion_error)) { + EMSG(errored ? + result.data.string.data : + _("Error converting the call result")); + } + + msgpack_rpc_free_object(result); +} + // "send_event()" function static void f_send_event(typval_T *argvars, typval_T *rettv) { diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 653f09756a..9bba247a7b 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -5,6 +5,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/os/channel.h" +#include "nvim/os/event.h" #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/wstream.h" @@ -12,17 +13,24 @@ #include "nvim/os/job.h" #include "nvim/os/job_defs.h" #include "nvim/os/msgpack_rpc.h" +#include "nvim/os/msgpack_rpc_helpers.h" #include "nvim/vim.h" #include "nvim/memory.h" +#include "nvim/message.h" #include "nvim/map.h" #include "nvim/lib/kvec.h" +typedef struct { + uint64_t request_id; + bool errored; + Object result; +} ChannelCallFrame; + typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job; + bool is_job, enabled; msgpack_unpacker *unpacker; - msgpack_sbuffer *sbuffer; union { Job *job; struct { @@ -31,12 +39,15 @@ typedef struct { uv_stream_t *uv; } streams; } data; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; + size_t rpc_call_level; } Channel; static uint64_t next_id = 1; static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; -static msgpack_sbuffer msgpack_event_buffer; +static msgpack_sbuffer out_buffer; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/channel.c.generated.h" @@ -47,7 +58,7 @@ void channel_init() { channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); - msgpack_sbuffer_init(&msgpack_event_buffer); + msgpack_sbuffer_init(&out_buffer); } /// Teardown the module @@ -80,6 +91,7 @@ bool channel_from_job(char **argv) job_err, job_exit, true, + 0, &status); if (status <= 0) { @@ -104,7 +116,7 @@ void channel_from_stream(uv_stream_t *stream) rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream - channel->data.streams.write = wstream_new(1024 * 1024); + channel->data.streams.write = wstream_new(0); wstream_set_stream(channel->data.streams.write, stream); channel->data.streams.uv = stream; } @@ -113,26 +125,98 @@ void channel_from_stream(uv_stream_t *stream) /// /// @param id The channel id. If 0, the event will be sent to all /// channels that have subscribed to the event type -/// @param type The event type, an arbitrary string -/// @param obj The event data +/// @param name The event name, an arbitrary string +/// @param arg The event arg /// @return True if the data was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *type, Object data) +bool channel_send_event(uint64_t id, char *name, Object arg) { Channel *channel = NULL; if (id > 0) { if (!(channel = pmap_get(uint64_t)(channels, id))) { - msgpack_rpc_free_object(data); + msgpack_rpc_free_object(arg); return false; } - send_event(channel, type, data); + send_event(channel, name, arg); } else { - broadcast_event(type, data); + broadcast_event(name, arg); } return true; } +bool channel_send_call(uint64_t id, + char *name, + Object arg, + Object *result, + bool *errored) +{ + Channel *channel = NULL; + + if (!(channel = pmap_get(uint64_t)(channels, id))) { + msgpack_rpc_free_object(arg); + return false; + } + + if (kv_size(channel->call_stack) > 20) { + // 20 stack depth is more than anyone should ever need for RPC calls + *errored = true; + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " was closed due to a high stack depth " + "while processing a RPC call", + channel->id); + *result = STRING_OBJ(cstr_to_string(buf)); + } + + uint64_t request_id = channel->next_request_id++; + // Send the msgpack-rpc request + send_request(channel, request_id, name, arg); + + if (!kv_size(channel->call_stack)) { + // This is the first frame, we must disable event deferral for this + // channel because we won't be returning until the client sends a + // response + if (channel->is_job) { + job_set_defer(channel->data.job, false); + } else { + rstream_set_defer(channel->data.streams.read, false); + } + } + + // Push the frame + ChannelCallFrame frame = {request_id, false, NIL}; + kv_push(ChannelCallFrame *, channel->call_stack, &frame); + size_t size = kv_size(channel->call_stack); + + do { + event_poll(-1); + } while ( + // Continue running if ... + channel->enabled && // the channel is still enabled + kv_size(channel->call_stack) >= size); // the call didn't return + + if (!kv_size(channel->call_stack)) { + // Popped last frame, restore event deferral + if (channel->is_job) { + job_set_defer(channel->data.job, true); + } else { + rstream_set_defer(channel->data.streams.read, true); + } + if (!channel->enabled && !channel->rpc_call_level) { + // Close the channel if it has been disabled and we have not been called + // by `parse_msgpack`(It would be unsafe to close the channel otherwise) + close_channel(channel); + } + } + + *errored = frame.errored; + *result = frame.result; + + return true; +} + /// Subscribes to event broadcasts /// /// @param id The channel id @@ -191,10 +275,17 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) Channel *channel = data; if (eof) { - close_channel(channel); + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed by the client", + channel->id); + disable_channel(channel, buf); return; } + channel->rpc_call_level++; uint32_t count = rstream_available(rstream); // Feed the unpacker with data @@ -205,23 +296,34 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); UnpackResult result; - msgpack_packer response; // Deserialize everything we can. while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked)) == kUnpackResultOk) { - // Each object is a new msgpack-rpc request and requires an empty response - msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); - // Perform the call - msgpack_rpc_call(channel->id, &unpacked.data, &response); - wstream_write(channel->data.streams.write, - wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free)); + if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + if (is_valid_rpc_response(&unpacked.data, channel)) { + call_stack_pop(&unpacked.data, channel); + } else { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " returned a response that doesn't have " + " a matching id for the current RPC call. Ensure the client " + " is properly synchronized", + channel->id); + call_stack_unwind(channel, buf, 1); + } + msgpack_unpacked_destroy(&unpacked); + // Bail out from this event loop iteration + goto end; + } - // Clear the buffer for future calls - msgpack_sbuffer_clear(channel->sbuffer); + // Perform the call + WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); + // write the response + if (!channel_write(channel, resp)) { + goto end; + } } if (result == kUnpackResultFail) { @@ -231,50 +333,87 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // A not so uncommon cause for this might be deserializing objects with // a high nesting level: msgpack will break when it's internal parse stack // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) - msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&response, 4); - msgpack_pack_int(&response, 1); - msgpack_pack_int(&response, 0); - msgpack_rpc_error("Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting", - &response); - wstream_write(channel->data.streams.write, - wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free)); - // Clear the buffer for future calls - msgpack_sbuffer_clear(channel->sbuffer); + send_error(channel, 0, "Invalid msgpack payload. " + "This error can also happen when deserializing " + "an object with high level of nesting"); + } + +end: + channel->rpc_call_level--; + if (!channel->enabled && !kv_size(channel->call_stack)) { + // Now it's safe to destroy the channel + close_channel(channel); } } -static void send_event(Channel *channel, char *type, Object data) +static bool channel_write(Channel *channel, WBuffer *buffer) { - wstream_write(channel->data.streams.write, serialize_event(type, data)); + bool success; + + if (channel->is_job) { + success = job_write(channel->data.job, buffer); + } else { + success = wstream_write(channel->data.streams.write, buffer); + } + + if (!success) { + // If the write failed for any reason, close the channel + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed due to a failed write", + channel->id); + disable_channel(channel, buf); + } + + return success; } -static void broadcast_event(char *type, Object data) +static void send_error(Channel *channel, uint64_t id, char *err) +{ + channel_write(channel, serialize_response(id, err, NIL, &out_buffer)); +} + +static void send_request(Channel *channel, + uint64_t id, + char *name, + Object arg) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(id, method, arg, &out_buffer)); +} + +static void send_event(Channel *channel, + char *name, + Object arg) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(0, method, arg, &out_buffer)); +} + +static void broadcast_event(char *name, Object arg) { kvec_t(Channel *) subscribed; kv_init(subscribed); Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, type)) { + if (pmap_has(cstr_t)(channel->subscribed_events, name)) { kv_push(Channel *, subscribed, channel); } }); if (!kv_size(subscribed)) { - msgpack_rpc_free_object(data); + msgpack_rpc_free_object(arg); goto end; } - WBuffer *buffer = serialize_event(type, data); + String method = {.size = strlen(name), .data = name}; + WBuffer *buffer = serialize_request(0, method, arg, &out_buffer); for (size_t i = 0; i < kv_size(subscribed); i++) { - wstream_write(kv_A(subscribed, i)->data.streams.write, buffer); + channel_write(kv_A(subscribed, i), buffer); } end: @@ -300,7 +439,6 @@ static void unsubscribe(Channel *channel, char *event) static void close_channel(Channel *channel) { pmap_del(uint64_t)(channels, channel->id); - msgpack_sbuffer_free(channel->sbuffer); msgpack_unpacker_free(channel->unpacker); if (channel->is_job) { @@ -320,6 +458,7 @@ static void close_channel(Channel *channel) }); pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); free(channel); } @@ -329,29 +468,69 @@ static void close_cb(uv_handle_t *handle) free(handle); } -static WBuffer *serialize_event(char *type, Object data) -{ - String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type}; - msgpack_packer packer; - msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_notification(event_type, data, &packer); - WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data, - msgpack_event_buffer.size), - msgpack_event_buffer.size, - free); - msgpack_rpc_free_object(data); - msgpack_sbuffer_clear(&msgpack_event_buffer); - - return rv; -} - static Channel *register_channel() { Channel *rv = xmalloc(sizeof(Channel)); + rv->enabled = true; + rv->rpc_call_level = 0; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->sbuffer = msgpack_sbuffer_new(); rv->id = next_id++; rv->subscribed_events = pmap_new(cstr_t)(); + rv->next_request_id = 1; + kv_init(rv->call_stack); pmap_put(uint64_t)(channels, rv->id, rv); return rv; } + +static bool is_rpc_response(msgpack_object *obj) +{ + return obj->type == MSGPACK_OBJECT_ARRAY + && obj->via.array.size == 4 + && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.array.ptr[0].via.u64 == 1 + && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; +} + +static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) +{ + uint64_t response_id = obj->via.array.ptr[1].via.u64; + // Must be equal to the frame at the stack's bottom + return response_id == kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1)->request_id; +} + +static void call_stack_pop(msgpack_object *obj, Channel *channel) +{ + ChannelCallFrame *frame = kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1); + frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; + (void)kv_pop(channel->call_stack); + + if (frame->errored) { + msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); + } else { + msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); + } +} + +static void call_stack_unwind(Channel *channel, char *msg, int count) +{ + while (kv_size(channel->call_stack) && count--) { + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = true; + frame->result = STRING_OBJ(cstr_to_string(msg)); + } +} + +static void disable_channel(Channel *channel, char *msg) +{ + if (kv_size(channel->call_stack)) { + // Channel is currently in the middle of a call, remove all frames and mark + // it as "dead" + channel->enabled = false; + call_stack_unwind(channel, msg, -1); + } else { + // Safe to close it now + close_channel(channel); + } +} diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h index f12d54cede..ce04abb76d 100644 --- a/src/nvim/os/channel.h +++ b/src/nvim/os/channel.h @@ -6,7 +6,7 @@ #include "nvim/api/private/defs.h" #include "nvim/vim.h" -#define EVENT_MAXLEN 512 +#define METHOD_MAXLEN 512 #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/channel.h.generated.h" diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index 6723b97e0c..a8bd6ca886 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -63,11 +63,6 @@ bool event_poll(int32_t ms) { uv_run_mode run_mode = UV_RUN_ONCE; - if (input_ready()) { - // If there's a pending input event to be consumed, do it now - return true; - } - static int recursive = 0; if (!(recursive++)) { @@ -95,17 +90,16 @@ bool event_poll(int32_t ms) run_mode = UV_RUN_NOWAIT; } + bool events_processed; + do { // Run one event loop iteration, blocking for events if run_mode is // UV_RUN_ONCE uv_run(uv_default_loop(), run_mode); - // Process immediate events outside uv_run since libuv event loop not - // support recursion(processing events may cause a recursive event_poll - // call) - event_process(false); + events_processed = event_process(false); } while ( // Continue running if ... - !input_ready() && // we have no input + !events_processed && // we didn't process any immediate events !event_has_deferred() && // no events are waiting to be processed run_mode != UV_RUN_NOWAIT && // ms != 0 !timer_data.timed_out); // we didn't get a timeout @@ -124,7 +118,7 @@ bool event_poll(int32_t ms) event_process(false); } - return input_ready() || event_has_deferred(); + return !timer_data.timed_out && (events_processed || event_has_deferred()); } bool event_has_deferred() @@ -139,11 +133,13 @@ void event_push(Event event, bool deferred) } // Runs the appropriate action for each queued event -void event_process(bool deferred) +bool event_process(bool deferred) { + bool processed_events = false; Event event; while (kl_shift(Event, get_queue(deferred), &event) == 0) { + processed_events = true; switch (event.type) { case kEventSignal: signal_handle(event); @@ -158,6 +154,8 @@ void event_process(bool deferred) abort(); } } + + return processed_events; } // Set a flag in the `event_poll` loop for signaling of a timeout diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 6e42cba4ad..0f6d2df12f 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -37,12 +37,6 @@ void input_init() rstream_set_file(read_stream, read_cmd_fd); } -// Check if there's pending input -bool input_ready() -{ - return rstream_available(read_stream) > 0 || eof; -} - // Listen for input void input_start() { @@ -119,7 +113,7 @@ bool os_char_avail() // In cooked mode we should get SIGINT, no need to check. void os_breakcheck() { - if (curr_tmode == TMODE_RAW && event_poll(0)) + if (curr_tmode == TMODE_RAW && input_poll(0)) fill_input_buf(false); } @@ -132,6 +126,11 @@ bool os_isatty(int fd) return uv_guess_handle(fd) == UV_TTY; } +static bool input_poll(int32_t ms) +{ + return input_ready() || event_poll(ms) || input_ready(); +} + // This is a replacement for the old `WaitForChar` function in os_unix.c static InbufPollResult inbuf_poll(int32_t ms) { @@ -139,7 +138,7 @@ static InbufPollResult inbuf_poll(int32_t ms) return kInputAvail; } - if (event_poll(ms)) { + if (input_poll(ms)) { return eof && rstream_available(read_stream) == 0 ? kInputEof : kInputAvail; @@ -196,3 +195,10 @@ static int push_event_key(uint8_t *buf, int maxlen) return buf_idx; } + +// Check if there's pending input +bool input_ready() +{ + return rstream_available(read_stream) > 0 || eof; +} + diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index b369004e47..dcf50243a9 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -21,7 +21,6 @@ #define EXIT_TIMEOUT 25 #define MAX_RUNNING_JOBS 100 #define JOB_BUFFER_SIZE 1024 -#define JOB_WRITE_MAXMEM 1024 * 1024 struct job { // Job id the index in the job table plus one. @@ -131,6 +130,7 @@ void job_teardown() /// @param exit_cb Callback that will be invoked when the job exits /// @param defer If the job callbacks invocation should be deferred to vim /// main loop +/// @param maxmem Maximum amount of memory used by the job WStream /// @param[out] The job id if the job started successfully, 0 if the job table /// is full, -1 if the program could not be executed. /// @return The job pointer if the job started successfully, NULL otherwise @@ -140,6 +140,7 @@ Job *job_start(char **argv, rstream_cb stderr_cb, job_exit_cb job_exit_cb, bool defer, + size_t maxmem, int *status) { int i; @@ -210,7 +211,7 @@ Job *job_start(char **argv, handle_set_job((uv_handle_t *)&job->proc_stdout, job); handle_set_job((uv_handle_t *)&job->proc_stderr, job); - job->in = wstream_new(JOB_WRITE_MAXMEM); + job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer); diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c index 63e1245028..85569372da 100644 --- a/src/nvim/os/msgpack_rpc.c +++ b/src/nvim/os/msgpack_rpc.c @@ -3,405 +3,73 @@ #include -#include "nvim/os/msgpack_rpc.h" #include "nvim/vim.h" #include "nvim/memory.h" +#include "nvim/os/wstream.h" +#include "nvim/os/msgpack_rpc.h" +#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/api/private/helpers.h" +#include "nvim/func_attr.h" -#define REMOTE_FUNCS_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ - { \ - *arg = obj->via.u64; \ - return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER; \ - } \ - \ - void msgpack_rpc_from_##lt(t result, msgpack_packer *res) \ - { \ - msgpack_pack_uint64(res, result); \ - } +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "os/msgpack_rpc.c.generated.h" +#endif -#define TYPED_ARRAY_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt##array(msgpack_object *obj, t##Array *arg) \ - { \ - if (obj->type != MSGPACK_OBJECT_ARRAY) { \ - return false; \ - } \ - \ - arg->size = obj->via.array.size; \ - arg->items = xcalloc(obj->via.array.size, sizeof(t)); \ - \ - for (size_t i = 0; i < obj->via.array.size; i++) { \ - if (!msgpack_rpc_to_##lt(obj->via.array.ptr + i, &arg->items[i])) { \ - return false; \ - } \ - } \ - \ - return true; \ - } \ - \ - void msgpack_rpc_from_##lt##array(t##Array result, msgpack_packer *res) \ - { \ - msgpack_pack_array(res, result.size); \ - \ - for (size_t i = 0; i < result.size; i++) { \ - msgpack_rpc_from_##lt(result.items[i], res); \ - } \ - } \ - \ - void msgpack_rpc_free_##lt##array(t##Array value) { \ - for (size_t i = 0; i < value.size; i++) { \ - msgpack_rpc_free_##lt(value.items[i]); \ - } \ - \ - free(value.items); \ - } +extern const uint8_t msgpack_metadata[]; +extern const unsigned int msgpack_metadata_size; -void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res) +/// Validates the basic structure of the msgpack-rpc call and fills `res` +/// with the basic response structure. +/// +/// @param channel_id The channel id +/// @param req The parsed request object +/// @param res A packer that contains the response +WBuffer *msgpack_rpc_call(uint64_t channel_id, + msgpack_object *req, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(2) + FUNC_ATTR_NONNULL_ARG(3) { - // The initial response structure is the same no matter what happens, - // we set it up here - // Array of size 4 - msgpack_pack_array(res, 4); - // Response type is 1 - msgpack_pack_int(res, 1); + uint64_t response_id; + char *err = msgpack_rpc_validate(&response_id, req); - // Validate the basic structure of the msgpack-rpc payload - if (req->type != MSGPACK_OBJECT_ARRAY) { - msgpack_pack_int(res, 0); // no message id yet - msgpack_rpc_error("Request is not an array", res); - return; + if (err) { + return serialize_response(response_id, err, NIL, sbuffer); } - if (req->via.array.size != 4) { - msgpack_pack_int(res, 0); // no message id yet - char error_msg[256]; - snprintf(error_msg, - sizeof(error_msg), - "Request array size is %u, it should be 4", - req->via.array.size); - msgpack_rpc_error(error_msg, res); - return; + uint64_t method_id = req->via.array.ptr[2].via.u64; + + if (method_id == 0) { + return serialize_metadata(response_id, channel_id, sbuffer); } - if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - msgpack_pack_int(res, 0); // no message id yet - msgpack_rpc_error("Id must be a positive integer", res); - return; + // dispatch the call + Error error = { .set = false }; + Object rv = msgpack_rpc_dispatch(channel_id, method_id, req, &error); + // send the response + msgpack_packer response; + msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); + + if (error.set) { + return serialize_response(response_id, error.msg, NIL, sbuffer); } - // Set the response id, which is the same as the request - msgpack_pack_uint64(res, req->via.array.ptr[1].via.u64); - - if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - msgpack_rpc_error("Message type must be an integer", res); - return; - } - - if (req->via.array.ptr[0].via.u64 != 0) { - msgpack_rpc_error("Message type must be 0", res); - return; - } - - if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - msgpack_rpc_error("Method id must be a positive integer", res); - return; - } - - if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { - msgpack_rpc_error("Paremeters must be an array", res); - return; - } - - // dispatch the message - msgpack_rpc_dispatch(id, req, res); -} - -void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac) -{ - msgpack_pack_array(pac, 3); - msgpack_pack_int(pac, 2); - msgpack_pack_raw(pac, type.size); - msgpack_pack_raw_body(pac, type.data, type.size); - msgpack_rpc_from_object(data, pac); -} - -void msgpack_rpc_error(char *msg, msgpack_packer *res) -{ - size_t len = strlen(msg); - - // error message - msgpack_pack_raw(res, len); - msgpack_pack_raw_body(res, msg, len); - // Nil result - msgpack_pack_nil(res); -} - -bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) -{ - *arg = obj->via.boolean; - return obj->type == MSGPACK_OBJECT_BOOLEAN; -} - -bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) -{ - if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.u64 <= INT64_MAX) { - *arg = (int64_t)obj->via.u64; - return true; - } - - *arg = obj->via.i64; - return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; -} - -bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) -{ - *arg = obj->via.dec; - return obj->type == MSGPACK_OBJECT_DOUBLE; -} - -bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) -{ - if (obj->type != MSGPACK_OBJECT_RAW) { - return false; - } - - arg->data = xmemdupz(obj->via.raw.ptr, obj->via.raw.size); - arg->size = obj->via.raw.size; - return true; -} - -bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) -{ - switch (obj->type) { - case MSGPACK_OBJECT_NIL: - arg->type = kObjectTypeNil; - return true; - - case MSGPACK_OBJECT_BOOLEAN: - arg->type = kObjectTypeBoolean; - return msgpack_rpc_to_boolean(obj, &arg->data.boolean); - - case MSGPACK_OBJECT_POSITIVE_INTEGER: - case MSGPACK_OBJECT_NEGATIVE_INTEGER: - arg->type = kObjectTypeInteger; - return msgpack_rpc_to_integer(obj, &arg->data.integer); - - case MSGPACK_OBJECT_DOUBLE: - arg->type = kObjectTypeFloat; - return msgpack_rpc_to_float(obj, &arg->data.floating); - - case MSGPACK_OBJECT_RAW: - arg->type = kObjectTypeString; - return msgpack_rpc_to_string(obj, &arg->data.string); - - case MSGPACK_OBJECT_ARRAY: - arg->type = kObjectTypeArray; - return msgpack_rpc_to_array(obj, &arg->data.array); - - case MSGPACK_OBJECT_MAP: - arg->type = kObjectTypeDictionary; - return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); - - default: - return false; - } -} - -bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg) -{ - return obj->type == MSGPACK_OBJECT_ARRAY - && obj->via.array.size == 2 - && msgpack_rpc_to_integer(obj->via.array.ptr, &arg->row) - && msgpack_rpc_to_integer(obj->via.array.ptr + 1, &arg->col); -} - - -bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) -{ - if (obj->type != MSGPACK_OBJECT_ARRAY) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.array.size, sizeof(Object)); - - for (uint32_t i = 0; i < obj->via.array.size; i++) { - if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { - return false; - } - } - - return true; -} - -bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) -{ - if (obj->type != MSGPACK_OBJECT_MAP) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); - - - for (uint32_t i = 0; i < obj->via.map.size; i++) { - if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, - &arg->items[i].key)) { - return false; - } - - if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, - &arg->items[i].value)) { - return false; - } - } - - return true; -} - -void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) -{ - if (result) { - msgpack_pack_true(res); - } else { - msgpack_pack_false(res); - } -} - -void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) -{ - msgpack_pack_int64(res, result); -} - -void msgpack_rpc_from_float(Float result, msgpack_packer *res) -{ - msgpack_pack_double(res, result); -} - -void msgpack_rpc_from_string(String result, msgpack_packer *res) -{ - msgpack_pack_raw(res, result.size); - msgpack_pack_raw_body(res, result.data, result.size); -} - -void msgpack_rpc_from_object(Object result, msgpack_packer *res) -{ - switch (result.type) { - case kObjectTypeNil: - msgpack_pack_nil(res); - break; - - case kObjectTypeBoolean: - msgpack_rpc_from_boolean(result.data.boolean, res); - break; - - case kObjectTypeInteger: - msgpack_rpc_from_integer(result.data.integer, res); - break; - - case kObjectTypeFloat: - msgpack_rpc_from_float(result.data.floating, res); - break; - - case kObjectTypeString: - msgpack_rpc_from_string(result.data.string, res); - break; - - case kObjectTypeArray: - msgpack_rpc_from_array(result.data.array, res); - break; - - case kObjectTypeDictionary: - msgpack_rpc_from_dictionary(result.data.dictionary, res); - break; - - default: - abort(); - } -} - -void msgpack_rpc_from_position(Position result, msgpack_packer *res) -{ - msgpack_pack_array(res, 2);; - msgpack_pack_int64(res, result.row); - msgpack_pack_int64(res, result.col); -} - -void msgpack_rpc_from_array(Array result, msgpack_packer *res) -{ - msgpack_pack_array(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_object(result.items[i], res); - } -} - -void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) -{ - msgpack_pack_map(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_string(result.items[i].key, res); - msgpack_rpc_from_object(result.items[i].value, res); - } -} - -void msgpack_rpc_free_string(String value) -{ - if (!value.data) { - return; - } - - free(value.data); -} - -void msgpack_rpc_free_object(Object value) -{ - switch (value.type) { - case kObjectTypeNil: - case kObjectTypeBoolean: - case kObjectTypeInteger: - case kObjectTypeFloat: - break; - - case kObjectTypeString: - msgpack_rpc_free_string(value.data.string); - break; - - case kObjectTypeArray: - msgpack_rpc_free_array(value.data.array); - break; - - case kObjectTypeDictionary: - msgpack_rpc_free_dictionary(value.data.dictionary); - break; - - default: - abort(); - } -} - -void msgpack_rpc_free_array(Array value) -{ - for (uint32_t i = 0; i < value.size; i++) { - msgpack_rpc_free_object(value.items[i]); - } - - free(value.items); -} - -void msgpack_rpc_free_dictionary(Dictionary value) -{ - for (uint32_t i = 0; i < value.size; i++) { - msgpack_rpc_free_string(value.items[i].key); - msgpack_rpc_free_object(value.items[i].value); - } - - free(value.items); + return serialize_response(response_id, NULL, rv, sbuffer); } +/// Try to unpack a msgpack document from the data in the unpacker buffer. This +/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets +/// the called know if the unpacking failed due to bad input or due to missing +/// data. +/// +/// @param unpacker The unpacker containing the parse buffer +/// @param result The result which will contain the parsed object +/// @return kUnpackResultOk : An object was parsed +/// kUnpackResultFail : Got bad input +/// kUnpackResultNeedMore: Need more data UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker, msgpack_unpacked* result) + FUNC_ATTR_NONNULL_ALL { if (result->zone != NULL) { msgpack_zone_free(result->zone); @@ -425,12 +93,144 @@ UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker, return kUnpackResultNeedMore; } -REMOTE_FUNCS_IMPL(Buffer, buffer) -REMOTE_FUNCS_IMPL(Window, window) -REMOTE_FUNCS_IMPL(Tabpage, tabpage) +/// Finishes the msgpack-rpc call with an error message. +/// +/// @param msg The error message +/// @param res A packer that contains the response +void msgpack_rpc_error(char *msg, msgpack_packer *res) + FUNC_ATTR_NONNULL_ALL +{ + size_t len = strlen(msg); -TYPED_ARRAY_IMPL(Buffer, buffer) -TYPED_ARRAY_IMPL(Window, window) -TYPED_ARRAY_IMPL(Tabpage, tabpage) -TYPED_ARRAY_IMPL(String, string) + // error message + msgpack_pack_raw(res, len); + msgpack_pack_raw_body(res, msg, len); + // Nil result + msgpack_pack_nil(res); +} +/// Serializes a msgpack-rpc request or notification(id == 0) +WBuffer *serialize_request(uint64_t request_id, + String method, + Object arg, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(4) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, request_id ? 4 : 3); + msgpack_pack_int(&pac, request_id ? 0 : 2); + + if (request_id) { + msgpack_pack_uint64(&pac, request_id); + } + + msgpack_pack_raw(&pac, method.size); + msgpack_pack_raw_body(&pac, method.data, method.size); + msgpack_rpc_from_object(arg, &pac); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + free); + msgpack_rpc_free_object(arg); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +/// Serializes a msgpack-rpc response +WBuffer *serialize_response(uint64_t response_id, + char *err_msg, + Object arg, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(4) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, 4); + msgpack_pack_int(&pac, 1); + msgpack_pack_uint64(&pac, response_id); + + if (err_msg) { + String err = {.size = strlen(err_msg), .data = err_msg}; + // error message + msgpack_pack_raw(&pac, err.size); + msgpack_pack_raw_body(&pac, err.data, err.size); + // Nil result + msgpack_pack_nil(&pac); + } else { + // Nil error + msgpack_pack_nil(&pac); + // Return value + msgpack_rpc_from_object(arg, &pac); + } + + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + free); + msgpack_rpc_free_object(arg); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +WBuffer *serialize_metadata(uint64_t id, + uint64_t channel_id, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ALL +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, 4); + msgpack_pack_int(&pac, 1); + msgpack_pack_uint64(&pac, id); + // Nil error + msgpack_pack_nil(&pac); + // The result is the [channel_id, metadata] array + msgpack_pack_array(&pac, 2); + msgpack_pack_uint64(&pac, channel_id); + msgpack_pack_raw(&pac, msgpack_metadata_size); + msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + free); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +static char *msgpack_rpc_validate(uint64_t *response_id, msgpack_object *req) +{ + // response id not known yet + + *response_id = 0; + // Validate the basic structure of the msgpack-rpc payload + if (req->type != MSGPACK_OBJECT_ARRAY) { + return "Request is not an array"; + } + + if (req->via.array.size != 4) { + return "Request array size should be 4"; + } + + if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + return "Id must be a positive integer"; + } + + // Set the response id, which is the same as the request + *response_id = req->via.array.ptr[1].via.u64; + + if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + return "Message type must be an integer"; + } + + if (req->via.array.ptr[0].via.u64 != 0) { + return "Message type must be 0"; + } + + if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + return "Method id must be a positive integer"; + } + + if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { + return "Paremeters must be an array"; + } + + return NULL; +} diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h index baabff20aa..b8b947c0ec 100644 --- a/src/nvim/os/msgpack_rpc.h +++ b/src/nvim/os/msgpack_rpc.h @@ -8,6 +8,7 @@ #include "nvim/func_attr.h" #include "nvim/api/private/defs.h" +#include "nvim/os/wstream.h" typedef enum { kUnpackResultOk, /// Successfully parsed a document @@ -15,167 +16,26 @@ typedef enum { kUnpackResultNeedMore /// Need more data } UnpackResult; -/// Validates the basic structure of the msgpack-rpc call and fills `res` -/// with the basic response structure. -/// -/// @param id The channel id -/// @param req The parsed request object -/// @param res A packer that contains the response -void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); - -/// Packs a notification message -/// -/// @param type The message type, an arbitrary string -/// @param data The notification data -/// @param packer Where the notification will be packed to -void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac) - FUNC_ATTR_NONNULL_ARG(3); - /// Dispatches to the actual API function after basic payload validation by /// `msgpack_rpc_call`. It is responsible for validating/converting arguments /// to C types, and converting the return value back to msgpack types. /// The implementation is generated at compile time with metadata extracted /// from the api/*.h headers, /// -/// @param id The channel id +/// @param channel_id The channel id +/// @param method_id The method id /// @param req The parsed request object -/// @param res A packer that contains the response -void msgpack_rpc_dispatch(uint64_t id, - msgpack_object *req, - msgpack_packer *res) +/// @param err Pointer to error structure +/// @return Some object +Object msgpack_rpc_dispatch(uint64_t channel_id, + uint64_t method_id, + msgpack_object *req, + Error *err) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); -/// Try to unpack a msgpack document from the data in the unpacker buffer. This -/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets -/// the called know if the unpacking failed due to bad input or due to missing -/// data. -/// -/// @param unpacker The unpacker containing the parse buffer -/// @param result The result which will contain the parsed object -/// @return kUnpackResultOk : An object was parsed -/// kUnpackResultFail : Got bad input -/// kUnpackResultNeedMore: Need more data -UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker, - msgpack_unpacked* result); - -/// Finishes the msgpack-rpc call with an error message. -/// -/// @param msg The error message -/// @param res A packer that contains the response -void msgpack_rpc_error(char *msg, msgpack_packer *res) - FUNC_ATTR_NONNULL_ALL; - -/// Functions for validating and converting from msgpack types to C types. -/// These are used by `msgpack_rpc_dispatch` to validate and convert each -/// argument. -/// -/// @param obj The object to convert -/// @param[out] arg A pointer to the avalue -/// @return true if the conversion succeeded, false otherwise -bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) - FUNC_ATTR_NONNULL_ALL; - -/// Functions for converting from C types to msgpack types. -/// These are used by `msgpack_rpc_dispatch` to convert return values -/// from the API -/// -/// @param result A pointer to the result -/// @param res A packer that contains the response -void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_float(Float result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_position(Position result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_string(String result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_window(Window result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_object(Object result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_array(Array result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); - -/// Helpers for initializing types that may be freed later -#define msgpack_rpc_init_boolean -#define msgpack_rpc_init_integer -#define msgpack_rpc_init_float -#define msgpack_rpc_init_position -#define msgpack_rpc_init_string = STRING_INIT -#define msgpack_rpc_init_buffer -#define msgpack_rpc_init_window -#define msgpack_rpc_init_tabpage -#define msgpack_rpc_init_object = {.type = kObjectTypeNil} -#define msgpack_rpc_init_stringarray = ARRAY_DICT_INIT -#define msgpack_rpc_init_bufferarray = ARRAY_DICT_INIT -#define msgpack_rpc_init_windowarray = ARRAY_DICT_INIT -#define msgpack_rpc_init_tabpagearray = ARRAY_DICT_INIT -#define msgpack_rpc_init_array = ARRAY_DICT_INIT -#define msgpack_rpc_init_dictionary = ARRAY_DICT_INIT - -/// Helpers for freeing arguments/return value -/// -/// @param value The value to be freed -#define msgpack_rpc_free_boolean(value) -#define msgpack_rpc_free_integer(value) -#define msgpack_rpc_free_float(value) -#define msgpack_rpc_free_position(value) -void msgpack_rpc_free_string(String value); -#define msgpack_rpc_free_buffer(value) -#define msgpack_rpc_free_window(value) -#define msgpack_rpc_free_tabpage(value) -void msgpack_rpc_free_object(Object value); -void msgpack_rpc_free_stringarray(StringArray value); -void msgpack_rpc_free_bufferarray(BufferArray value); -void msgpack_rpc_free_windowarray(WindowArray value); -void msgpack_rpc_free_tabpagearray(TabpageArray value); -void msgpack_rpc_free_array(Array value); -void msgpack_rpc_free_dictionary(Dictionary value); +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "os/msgpack_rpc.h.generated.h" +#endif #endif // NVIM_OS_MSGPACK_RPC_H diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/os/msgpack_rpc_helpers.c new file mode 100644 index 0000000000..e2c277abe4 --- /dev/null +++ b/src/nvim/os/msgpack_rpc_helpers.c @@ -0,0 +1,380 @@ +#include +#include + +#include + +#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/vim.h" +#include "nvim/memory.h" + +#define REMOTE_FUNCS_IMPL(t, lt) \ + bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ + { \ + *arg = obj->via.u64; \ + return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER; \ + } \ + \ + void msgpack_rpc_from_##lt(t result, msgpack_packer *res) \ + { \ + msgpack_pack_uint64(res, result); \ + } + +#define TYPED_ARRAY_IMPL(t, lt) \ + bool msgpack_rpc_to_##lt##array(msgpack_object *obj, t##Array *arg) \ + { \ + if (obj->type != MSGPACK_OBJECT_ARRAY) { \ + return false; \ + } \ + \ + arg->size = obj->via.array.size; \ + arg->items = xcalloc(obj->via.array.size, sizeof(t)); \ + \ + for (size_t i = 0; i < obj->via.array.size; i++) { \ + if (!msgpack_rpc_to_##lt(obj->via.array.ptr + i, &arg->items[i])) { \ + return false; \ + } \ + } \ + \ + return true; \ + } \ + \ + void msgpack_rpc_from_##lt##array(t##Array result, msgpack_packer *res) \ + { \ + msgpack_pack_array(res, result.size); \ + \ + for (size_t i = 0; i < result.size; i++) { \ + msgpack_rpc_from_##lt(result.items[i], res); \ + } \ + } \ + \ + void msgpack_rpc_free_##lt##array(t##Array value) { \ + for (size_t i = 0; i < value.size; i++) { \ + msgpack_rpc_free_##lt(value.items[i]); \ + } \ + \ + free(value.items); \ + } + +bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) +{ + *arg = obj->via.boolean; + return obj->type == MSGPACK_OBJECT_BOOLEAN; +} + +bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) +{ + if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.u64 <= INT64_MAX) { + *arg = (int64_t)obj->via.u64; + return true; + } + + *arg = obj->via.i64; + return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; +} + +bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) +{ + *arg = obj->via.dec; + return obj->type == MSGPACK_OBJECT_DOUBLE; +} + +bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) +{ + if (obj->type != MSGPACK_OBJECT_RAW) { + return false; + } + + arg->data = xmemdupz(obj->via.raw.ptr, obj->via.raw.size); + arg->size = obj->via.raw.size; + return true; +} + +bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) +{ + switch (obj->type) { + case MSGPACK_OBJECT_NIL: + arg->type = kObjectTypeNil; + return true; + + case MSGPACK_OBJECT_BOOLEAN: + arg->type = kObjectTypeBoolean; + return msgpack_rpc_to_boolean(obj, &arg->data.boolean); + + case MSGPACK_OBJECT_POSITIVE_INTEGER: + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + arg->type = kObjectTypeInteger; + return msgpack_rpc_to_integer(obj, &arg->data.integer); + + case MSGPACK_OBJECT_DOUBLE: + arg->type = kObjectTypeFloat; + return msgpack_rpc_to_float(obj, &arg->data.floating); + + case MSGPACK_OBJECT_RAW: + arg->type = kObjectTypeString; + return msgpack_rpc_to_string(obj, &arg->data.string); + + case MSGPACK_OBJECT_ARRAY: + arg->type = kObjectTypeArray; + return msgpack_rpc_to_array(obj, &arg->data.array); + + case MSGPACK_OBJECT_MAP: + arg->type = kObjectTypeDictionary; + return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); + + default: + return false; + } +} + +bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg) +{ + return obj->type == MSGPACK_OBJECT_ARRAY + && obj->via.array.size == 2 + && msgpack_rpc_to_integer(obj->via.array.ptr, &arg->row) + && msgpack_rpc_to_integer(obj->via.array.ptr + 1, &arg->col); +} + + +bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) +{ + if (obj->type != MSGPACK_OBJECT_ARRAY) { + return false; + } + + arg->size = obj->via.array.size; + arg->items = xcalloc(obj->via.array.size, sizeof(Object)); + + for (uint32_t i = 0; i < obj->via.array.size; i++) { + if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { + return false; + } + } + + return true; +} + +bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) +{ + if (obj->type != MSGPACK_OBJECT_MAP) { + return false; + } + + arg->size = obj->via.array.size; + arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); + + + for (uint32_t i = 0; i < obj->via.map.size; i++) { + if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, + &arg->items[i].key)) { + return false; + } + + if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, + &arg->items[i].value)) { + return false; + } + } + + return true; +} + +void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) +{ + if (result) { + msgpack_pack_true(res); + } else { + msgpack_pack_false(res); + } +} + +void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) +{ + msgpack_pack_int64(res, result); +} + +void msgpack_rpc_from_float(Float result, msgpack_packer *res) +{ + msgpack_pack_double(res, result); +} + +void msgpack_rpc_from_string(String result, msgpack_packer *res) +{ + msgpack_pack_raw(res, result.size); + msgpack_pack_raw_body(res, result.data, result.size); +} + +void msgpack_rpc_from_object(Object result, msgpack_packer *res) +{ + switch (result.type) { + case kObjectTypeNil: + msgpack_pack_nil(res); + break; + + case kObjectTypeBoolean: + msgpack_rpc_from_boolean(result.data.boolean, res); + break; + + case kObjectTypeInteger: + msgpack_rpc_from_integer(result.data.integer, res); + break; + + case kObjectTypeFloat: + msgpack_rpc_from_float(result.data.floating, res); + break; + + case kObjectTypeString: + msgpack_rpc_from_string(result.data.string, res); + break; + + case kObjectTypeArray: + msgpack_rpc_from_array(result.data.array, res); + break; + + case kObjectTypePosition: + msgpack_rpc_from_position(result.data.position, res); + break; + + case kObjectTypeBuffer: + msgpack_rpc_from_buffer(result.data.buffer, res); + break; + + case kObjectTypeWindow: + msgpack_rpc_from_window(result.data.window, res); + break; + + case kObjectTypeTabpage: + msgpack_rpc_from_tabpage(result.data.tabpage, res); + break; + + case kObjectTypeStringArray: + msgpack_rpc_from_stringarray(result.data.stringarray, res); + break; + + case kObjectTypeBufferArray: + msgpack_rpc_from_bufferarray(result.data.bufferarray, res); + break; + + case kObjectTypeWindowArray: + msgpack_rpc_from_windowarray(result.data.windowarray, res); + break; + + case kObjectTypeTabpageArray: + msgpack_rpc_from_tabpagearray(result.data.tabpagearray, res); + break; + + case kObjectTypeDictionary: + msgpack_rpc_from_dictionary(result.data.dictionary, res); + break; + } +} + +void msgpack_rpc_from_position(Position result, msgpack_packer *res) +{ + msgpack_pack_array(res, 2);; + msgpack_pack_int64(res, result.row); + msgpack_pack_int64(res, result.col); +} + +void msgpack_rpc_from_array(Array result, msgpack_packer *res) +{ + msgpack_pack_array(res, result.size); + + for (size_t i = 0; i < result.size; i++) { + msgpack_rpc_from_object(result.items[i], res); + } +} + +void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) +{ + msgpack_pack_map(res, result.size); + + for (size_t i = 0; i < result.size; i++) { + msgpack_rpc_from_string(result.items[i].key, res); + msgpack_rpc_from_object(result.items[i].value, res); + } +} + +void msgpack_rpc_free_string(String value) +{ + if (!value.data) { + return; + } + + free(value.data); +} + +void msgpack_rpc_free_object(Object value) +{ + switch (value.type) { + case kObjectTypeNil: + case kObjectTypeBoolean: + case kObjectTypeInteger: + case kObjectTypeFloat: + case kObjectTypePosition: + case kObjectTypeBuffer: + case kObjectTypeWindow: + case kObjectTypeTabpage: + break; + + case kObjectTypeString: + msgpack_rpc_free_string(value.data.string); + break; + + case kObjectTypeArray: + msgpack_rpc_free_array(value.data.array); + break; + + case kObjectTypeStringArray: + msgpack_rpc_free_stringarray(value.data.stringarray); + break; + + case kObjectTypeBufferArray: + msgpack_rpc_free_bufferarray(value.data.bufferarray); + break; + + case kObjectTypeWindowArray: + msgpack_rpc_free_windowarray(value.data.windowarray); + break; + + case kObjectTypeTabpageArray: + msgpack_rpc_free_tabpagearray(value.data.tabpagearray); + break; + + case kObjectTypeDictionary: + msgpack_rpc_free_dictionary(value.data.dictionary); + break; + + default: + abort(); + } +} + +void msgpack_rpc_free_array(Array value) +{ + for (uint32_t i = 0; i < value.size; i++) { + msgpack_rpc_free_object(value.items[i]); + } + + free(value.items); +} + +void msgpack_rpc_free_dictionary(Dictionary value) +{ + for (uint32_t i = 0; i < value.size; i++) { + msgpack_rpc_free_string(value.items[i].key); + msgpack_rpc_free_object(value.items[i].value); + } + + free(value.items); +} + +REMOTE_FUNCS_IMPL(Buffer, buffer) +REMOTE_FUNCS_IMPL(Window, window) +REMOTE_FUNCS_IMPL(Tabpage, tabpage) + +TYPED_ARRAY_IMPL(Buffer, buffer) +TYPED_ARRAY_IMPL(Window, window) +TYPED_ARRAY_IMPL(Tabpage, tabpage) +TYPED_ARRAY_IMPL(String, string) + diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h new file mode 100644 index 0000000000..e3d1e756ef --- /dev/null +++ b/src/nvim/os/msgpack_rpc_helpers.h @@ -0,0 +1,124 @@ +#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H +#define NVIM_OS_MSGPACK_RPC_HELPERS_H + +#include +#include + +#include + +#include "nvim/func_attr.h" +#include "nvim/api/private/defs.h" + +/// Functions for validating and converting from msgpack types to C types. +/// These are used by `msgpack_rpc_dispatch` to validate and convert each +/// argument. +/// +/// @param obj The object to convert +/// @param[out] arg A pointer to the avalue +/// @return true if the conversion succeeded, false otherwise +bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) + FUNC_ATTR_NONNULL_ALL; + +/// Functions for converting from C types to msgpack types. +/// These are used by `msgpack_rpc_dispatch` to convert return values +/// from the API +/// +/// @param result A pointer to the result +/// @param res A packer that contains the response +void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_float(Float result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_position(Position result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_string(String result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_window(Window result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_object(Object result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_array(Array result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); + +/// Helpers for initializing types that may be freed later +#define msgpack_rpc_init_boolean +#define msgpack_rpc_init_integer +#define msgpack_rpc_init_float +#define msgpack_rpc_init_position +#define msgpack_rpc_init_string = STRING_INIT +#define msgpack_rpc_init_buffer +#define msgpack_rpc_init_window +#define msgpack_rpc_init_tabpage +#define msgpack_rpc_init_object = {.type = kObjectTypeNil} +#define msgpack_rpc_init_stringarray = ARRAY_DICT_INIT +#define msgpack_rpc_init_bufferarray = ARRAY_DICT_INIT +#define msgpack_rpc_init_windowarray = ARRAY_DICT_INIT +#define msgpack_rpc_init_tabpagearray = ARRAY_DICT_INIT +#define msgpack_rpc_init_array = ARRAY_DICT_INIT +#define msgpack_rpc_init_dictionary = ARRAY_DICT_INIT + +/// Helpers for freeing arguments/return value +/// +/// @param value The value to be freed +#define msgpack_rpc_free_boolean(value) +#define msgpack_rpc_free_integer(value) +#define msgpack_rpc_free_float(value) +#define msgpack_rpc_free_position(value) +void msgpack_rpc_free_string(String value); +#define msgpack_rpc_free_buffer(value) +#define msgpack_rpc_free_window(value) +#define msgpack_rpc_free_tabpage(value) +void msgpack_rpc_free_object(Object value); +void msgpack_rpc_free_stringarray(StringArray value); +void msgpack_rpc_free_bufferarray(BufferArray value); +void msgpack_rpc_free_windowarray(WindowArray value); +void msgpack_rpc_free_tabpagearray(TabpageArray value); +void msgpack_rpc_free_array(Array value); +void msgpack_rpc_free_dictionary(Dictionary value); + +#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H + diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c index 9a908a4348..13b8e8d9dc 100644 --- a/src/nvim/os/wstream.c +++ b/src/nvim/os/wstream.c @@ -9,6 +9,8 @@ #include "nvim/vim.h" #include "nvim/memory.h" +#define DEFAULT_MAXMEM 1024 * 1024 * 10 + struct wstream { uv_stream_t *stream; // Memory currently used by pending buffers @@ -43,6 +45,10 @@ typedef struct { /// @return The newly-allocated `WStream` instance WStream * wstream_new(size_t maxmem) { + if (!maxmem) { + maxmem = DEFAULT_MAXMEM; + } + WStream *rv = xmalloc(sizeof(WStream)); rv->maxmem = maxmem; rv->stream = NULL; @@ -91,11 +97,12 @@ bool wstream_write(WStream *wstream, WBuffer *buffer) // This should not be called after a wstream was freed assert(!wstream->freed); + buffer->refcount++; + if (wstream->curmem > wstream->maxmem) { - return false; + goto err; } - buffer->refcount++; wstream->curmem += buffer->size; data = xmalloc(sizeof(WriteData)); data->wstream = wstream; @@ -105,9 +112,16 @@ bool wstream_write(WStream *wstream, WBuffer *buffer) uvbuf.base = buffer->data; uvbuf.len = buffer->size; wstream->pending_reqs++; - uv_write(req, wstream->stream, &uvbuf, 1, write_cb); + + if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) { + goto err; + } return true; + +err: + release_wbuffer(buffer); + return false; } /// Creates a WBuffer object for holding output data. Instances of this @@ -138,10 +152,7 @@ static void write_cb(uv_write_t *req, int status) free(req); data->wstream->curmem -= data->buffer->size; - if (!--data->buffer->refcount) { - data->buffer->cb(data->buffer->data); - free(data->buffer); - } + release_wbuffer(data->buffer); data->wstream->pending_reqs--; if (data->wstream->freed && data->wstream->pending_reqs == 0) { @@ -152,3 +163,10 @@ static void write_cb(uv_write_t *req, int status) free(data); } +static void release_wbuffer(WBuffer *buffer) +{ + if (!--buffer->refcount) { + buffer->cb(buffer->data); + free(buffer); + } +}