channel: Extract 'channel_write' function

This commit is contained in:
Thiago de Arruda 2014-06-20 10:52:56 -03:00
parent 0dea2682dc
commit c8297e462a

View File

@ -14,13 +14,14 @@
#include "nvim/os/msgpack_rpc.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/message.h"
#include "nvim/map.h"
#include "nvim/lib/kvec.h"
typedef struct {
uint64_t id;
PMap(cstr_t) *subscribed_events;
bool is_job;
bool is_job, is_alive;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
union {
@ -215,12 +216,13 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
free));
WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
free);
if (!channel_write(channel, buffer)) {
return;
}
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
@ -240,19 +242,40 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
"This error can also happen when deserializing "
"an object with high level of nesting",
&response);
wstream_write(channel->data.streams.write,
wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
free));
WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
free);
if (!channel_write(channel, buffer)) {
return;
}
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
}
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success;
if (channel->is_job) {
success = job_write(channel->data.job, buffer);
} else {
success = wstream_write(channel->data.streams.write, buffer);
}
if (!success) {
// If the write failed for whatever reason, mark the channel as not alive so
// it can be freed later
channel->is_alive = false;
}
return success;
}
static void send_event(Channel *channel, char *type, Object data)
{
wstream_write(channel->data.streams.write, serialize_event(type, data));
channel_write(channel, serialize_event(type, data));
}
static void broadcast_event(char *type, Object data)
@ -275,7 +298,7 @@ static void broadcast_event(char *type, Object data)
WBuffer *buffer = serialize_event(type, data);
for (size_t i = 0; i < kv_size(subscribed); i++) {
wstream_write(kv_A(subscribed, i)->data.streams.write, buffer);
channel_write(kv_A(subscribed, i), buffer);
}
end:
@ -349,6 +372,7 @@ static WBuffer *serialize_event(char *type, Object data)
static Channel *register_channel()
{
Channel *rv = xmalloc(sizeof(Channel));
rv->is_alive = true;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->sbuffer = msgpack_sbuffer_new();
rv->id = next_id++;