Merge branch 'remove-multiple-protocol-support'

This commit is contained in:
Thiago de Arruda 2014-05-27 15:09:14 -03:00
commit 1b43e5c47e
5 changed files with 35 additions and 107 deletions

View File

@ -5,7 +5,6 @@
#include "nvim/api/private/helpers.h"
#include "nvim/os/channel.h"
#include "nvim/os/channel_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h"
@ -19,14 +18,9 @@
typedef struct {
uint64_t id;
ChannelProtocol protocol;
bool is_job;
union {
struct {
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
} msgpack;
} proto;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
union {
int job_id;
struct {
@ -44,7 +38,6 @@ static msgpack_sbuffer msgpack_event_buffer;
static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
static void send_msgpack(Channel *channel, String type, Object data);
static void close_channel(Channel *channel);
static void close_cb(uv_handle_t *handle);
@ -67,48 +60,28 @@ void channel_teardown()
});
}
void channel_from_job(char **argv, ChannelProtocol prot)
void channel_from_job(char **argv)
{
Channel *channel = xmalloc(sizeof(Channel));
rstream_cb rcb = NULL;
switch (prot) {
case kChannelProtocolMsgpack:
rcb = on_job_stdout;
channel->proto.msgpack.unpacker =
msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
break;
default:
abort();
}
rstream_cb rcb = on_job_stdout;
channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->sbuffer = msgpack_sbuffer_new();
channel->id = next_id++;
channel->protocol = prot;
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
map_put(uint64_t)(channels, channel->id, channel);
}
void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
void channel_from_stream(uv_stream_t *stream)
{
Channel *channel = xmalloc(sizeof(Channel));
rstream_cb rcb = NULL;
switch (prot) {
case kChannelProtocolMsgpack:
rcb = parse_msgpack;
channel->proto.msgpack.unpacker =
msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
break;
default:
abort();
}
rstream_cb rcb = parse_msgpack;
channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->sbuffer = msgpack_sbuffer_new();
stream->data = NULL;
channel->id = next_id++;
channel->protocol = prot;
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
@ -131,16 +104,18 @@ bool channel_send_event(uint64_t id, char *type, typval_T *data)
String event_type = {.size = strnlen(type, 1024), .data = type};
Object event_data = vim_to_object(data);
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
msgpack_rpc_notification(event_type, event_data, &packer);
char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
switch (channel->protocol) {
case kChannelProtocolMsgpack:
send_msgpack(channel, event_type, event_data);
break;
default:
abort();
}
wstream_write(channel->data.streams.write,
bytes,
msgpack_event_buffer.size,
true);
msgpack_rpc_free_object(event_data);
msgpack_sbuffer_clear(&msgpack_event_buffer);
return true;
}
@ -168,62 +143,35 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count);
rstream_read(rstream,
msgpack_unpacker_buffer(channel->proto.msgpack.unpacker),
count);
msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count);
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
// Deserialize everything we can.
while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) {
while (msgpack_unpacker_next(channel->unpacker, &unpacked)) {
// Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response;
msgpack_packer_init(&response,
channel->proto.msgpack.sbuffer,
msgpack_sbuffer_write);
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
xmemdup(channel->proto.msgpack.sbuffer->data,
channel->proto.msgpack.sbuffer->size),
channel->proto.msgpack.sbuffer->size,
xmemdup(channel->sbuffer->data, channel->sbuffer->size),
channel->sbuffer->size,
true);
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer);
msgpack_sbuffer_clear(channel->sbuffer);
}
}
static void send_msgpack(Channel *channel, String type, Object data)
{
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
msgpack_rpc_notification(type, data, &packer);
char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
wstream_write(channel->data.streams.write,
bytes,
msgpack_event_buffer.size,
true);
msgpack_sbuffer_clear(&msgpack_event_buffer);
}
static void close_channel(Channel *channel)
{
map_del(uint64_t)(channels, channel->id);
switch (channel->protocol) {
case kChannelProtocolMsgpack:
msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
msgpack_unpacker_free(channel->proto.msgpack.unpacker);
break;
default:
abort();
}
msgpack_sbuffer_free(channel->sbuffer);
msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
job_stop(channel->data.job_id);

View File

@ -4,7 +4,6 @@
#include <uv.h>
#include "nvim/vim.h"
#include "nvim/os/channel_defs.h"
/// Initializes the module
void channel_init(void);
@ -16,15 +15,13 @@ void channel_teardown(void);
/// pipe/socket client connection
///
/// @param stream The established connection
/// @param prot The rpc protocol used
void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot);
void channel_from_stream(uv_stream_t *stream);
/// Creates an API channel by starting a job and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process
/// @param prot The rpc protocol used
void channel_from_job(char **argv, ChannelProtocol prot);
void channel_from_job(char **argv);
/// Sends event/data to channel
///

View File

@ -1,8 +0,0 @@
#ifndef NVIM_OS_CHANNEL_DEFS_H
#define NVIM_OS_CHANNEL_DEFS_H
typedef enum {
kChannelProtocolMsgpack
} ChannelProtocol;
#endif // NVIM_OS_CHANNEL_DEFS_H

View File

@ -5,7 +5,6 @@
#include <uv.h>
#include "nvim/os/channel_defs.h"
#include "nvim/os/channel.h"
#include "nvim/os/server.h"
#include "nvim/os/os.h"
@ -25,8 +24,6 @@ typedef enum {
} ServerType;
typedef struct {
// Protocol for channels established through this server
ChannelProtocol protocol;
// Type of the union below
ServerType type;
@ -59,8 +56,7 @@ void server_init()
free(listen_address);
}
server_start((char *)os_getenv("NEOVIM_LISTEN_ADDRESS"),
kChannelProtocolMsgpack);
server_start((char *)os_getenv("NEOVIM_LISTEN_ADDRESS"));
}
void server_teardown()
@ -80,7 +76,7 @@ void server_teardown()
});
}
void server_start(char *endpoint, ChannelProtocol prot)
void server_start(char *endpoint)
{
char addr[ADDRESS_MAX_SIZE];
@ -101,8 +97,6 @@ void server_start(char *endpoint, ChannelProtocol prot)
Server *server = xmalloc(sizeof(Server));
char ip[16], *ip_end = strrchr(addr, ':');
server->protocol = prot;
if (!ip_end) {
ip_end = strchr(addr, NUL);
}
@ -229,7 +223,7 @@ static void connection_cb(uv_stream_t *server, int status)
return;
}
channel_from_stream(client, srv->protocol);
channel_from_stream(client);
}
static void free_client(uv_handle_t *handle)

View File

@ -1,8 +1,6 @@
#ifndef NVIM_OS_SERVER_H
#define NVIM_OS_SERVER_H
#include "nvim/os/channel_defs.h"
/// Initializes the module
void server_init();
@ -18,8 +16,7 @@ void server_teardown();
/// @param endpoint Address of the server. Either a 'ip:port' string or an
/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or
/// named pipe.
/// @param prot The rpc protocol to be used
void server_start(char *endpoint, ChannelProtocol prot);
void server_start(char *endpoint);
/// Stops listening on the address specified by `endpoint`.
///