mirror of
https://github.com/neovim/neovim.git
synced 2024-12-31 17:13:26 -07:00
rstream: Pass read count to read events
This is necessary to keep events in the same order received from the OS.
This commit is contained in:
parent
166d8c799f
commit
6b3cd381dc
@ -21176,38 +21176,43 @@ static inline void process_job_event(TerminalJobData *data, ufunc_T *callback,
|
||||
on_job_event(&event_data);
|
||||
}
|
||||
|
||||
static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof)
|
||||
static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *job, bool eof)
|
||||
{
|
||||
TerminalJobData *data = job;
|
||||
on_job_output(stream, job, buf, eof, data->on_stdout, "stdout");
|
||||
on_job_output(stream, job, buf, count, eof, data->on_stdout, "stdout");
|
||||
}
|
||||
|
||||
static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof)
|
||||
static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *job, bool eof)
|
||||
{
|
||||
TerminalJobData *data = job;
|
||||
on_job_output(stream, job, buf, eof, data->on_stderr, "stderr");
|
||||
on_job_output(stream, job, buf, count, eof, data->on_stderr, "stderr");
|
||||
}
|
||||
|
||||
static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf,
|
||||
bool eof, ufunc_T *callback, const char *type)
|
||||
size_t count, bool eof, ufunc_T *callback, const char *type)
|
||||
{
|
||||
if (eof) {
|
||||
return;
|
||||
}
|
||||
|
||||
RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
|
||||
// The order here matters, the terminal must receive the data first because
|
||||
// process_job_event will modify the read buffer(convert NULs into NLs)
|
||||
if (data->term) {
|
||||
terminal_receive(data->term, ptr, len);
|
||||
}
|
||||
// stub variable, to keep reading consistent with the order of events, only
|
||||
// consider the count parameter.
|
||||
size_t r;
|
||||
char *ptr = rbuffer_read_ptr(buf, &r);
|
||||
|
||||
if (callback) {
|
||||
process_job_event(data, callback, type, ptr, len, 0);
|
||||
}
|
||||
|
||||
rbuffer_consumed(buf, len);
|
||||
// The order here matters, the terminal must receive the data first because
|
||||
// process_job_event will modify the read buffer(convert NULs into NLs)
|
||||
if (data->term) {
|
||||
terminal_receive(data->term, ptr, count);
|
||||
}
|
||||
|
||||
if (callback) {
|
||||
process_job_event(data, callback, type, ptr, count, 0);
|
||||
}
|
||||
|
||||
rbuffer_consumed(buf, count);
|
||||
}
|
||||
|
||||
static void on_process_exit(Process *proc, int status, void *d)
|
||||
|
@ -114,7 +114,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
|
||||
// Read error or EOF, either way stop the stream and invoke the callback
|
||||
// with eof == true
|
||||
uv_read_stop(uvstream);
|
||||
invoke_read_cb(stream, true);
|
||||
invoke_read_cb(stream, 0, true);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -124,7 +124,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
|
||||
// Data was already written, so all we need is to update 'wpos' to reflect
|
||||
// the space actually used in the buffer.
|
||||
rbuffer_produced(stream->buffer, nread);
|
||||
invoke_read_cb(stream, false);
|
||||
invoke_read_cb(stream, nread, false);
|
||||
}
|
||||
|
||||
// Called by the by the 'idle' handle to emulate a reading event
|
||||
@ -158,7 +158,7 @@ static void fread_idle_cb(uv_idle_t *handle)
|
||||
|
||||
if (req.result <= 0) {
|
||||
uv_idle_stop(&stream->uv.idle);
|
||||
invoke_read_cb(stream, true);
|
||||
invoke_read_cb(stream, 0, true);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -166,19 +166,21 @@ static void fread_idle_cb(uv_idle_t *handle)
|
||||
size_t nread = (size_t) req.result;
|
||||
rbuffer_produced(stream->buffer, nread);
|
||||
stream->fpos += nread;
|
||||
invoke_read_cb(stream, false);
|
||||
invoke_read_cb(stream, nread, false);
|
||||
}
|
||||
|
||||
static void read_event(void **argv)
|
||||
{
|
||||
Stream *stream = argv[0];
|
||||
if (stream->read_cb) {
|
||||
bool eof = (uintptr_t)argv[1];
|
||||
stream->read_cb(stream, stream->buffer, stream->data, eof);
|
||||
size_t count = (uintptr_t)argv[1];
|
||||
bool eof = (uintptr_t)argv[2];
|
||||
stream->read_cb(stream, stream->buffer, count, stream->data, eof);
|
||||
}
|
||||
}
|
||||
|
||||
static void invoke_read_cb(Stream *stream, bool eof)
|
||||
static void invoke_read_cb(Stream *stream, size_t count, bool eof)
|
||||
{
|
||||
CREATE_EVENT(stream->events, read_event, 2, stream, (void *)(uintptr_t)eof);
|
||||
CREATE_EVENT(stream->events, read_event, 3, stream,
|
||||
(void *)(uintptr_t *)count, (void *)(uintptr_t)eof);
|
||||
}
|
||||
|
@ -14,10 +14,14 @@ typedef struct stream Stream;
|
||||
///
|
||||
/// @param stream The Stream instance
|
||||
/// @param rbuffer The associated RBuffer instance
|
||||
/// @param count Number of bytes to read. This must be respected if keeping
|
||||
/// the order of events is a requirement. This is because events
|
||||
/// may be queued and only processed later when more data is copied
|
||||
/// into to the buffer, so one read may starve another.
|
||||
/// @param data User-defined data
|
||||
/// @param eof If the stream reached EOF.
|
||||
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data,
|
||||
bool eof);
|
||||
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof);
|
||||
|
||||
/// Type of function called when the Stream has information about a write
|
||||
/// request.
|
||||
|
@ -328,7 +328,8 @@ static void channel_from_stdio(void)
|
||||
wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
|
||||
}
|
||||
|
||||
static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof)
|
||||
static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
while (rbuffer_size(rbuf)) {
|
||||
char buf[256];
|
||||
@ -343,7 +344,8 @@ static void process_exit(Process *proc, int status, void *data)
|
||||
decref(data);
|
||||
}
|
||||
|
||||
static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof)
|
||||
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
|
||||
bool eof)
|
||||
{
|
||||
Channel *channel = data;
|
||||
incref(channel);
|
||||
|
@ -316,7 +316,8 @@ static InbufPollResult inbuf_poll(int ms)
|
||||
return input_eof ? kInputEof : kInputNone;
|
||||
}
|
||||
|
||||
static void read_cb(Stream *stream, RBuffer *buf, void *data, bool at_eof)
|
||||
static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
|
||||
bool at_eof)
|
||||
{
|
||||
if (at_eof) {
|
||||
input_eof = true;
|
||||
|
@ -298,7 +298,8 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
|
||||
buf->data = xrealloc(buf->data, buf->cap);
|
||||
}
|
||||
|
||||
static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
|
||||
static void system_data_cb(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
DynamicBuffer *dbuf = data;
|
||||
|
||||
@ -308,7 +309,8 @@ static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
|
||||
dbuf->len += nread;
|
||||
}
|
||||
|
||||
static void out_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
|
||||
static void out_data_cb(Stream *stream, RBuffer *buf, size_t count, void *data,
|
||||
bool eof)
|
||||
{
|
||||
size_t cnt;
|
||||
char *ptr = rbuffer_read_ptr(buf, &cnt);
|
||||
|
@ -208,7 +208,8 @@ static bool handle_forced_escape(TermInput *input)
|
||||
|
||||
static void restart_reading(void **argv);
|
||||
|
||||
static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
|
||||
static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
|
||||
bool eof)
|
||||
{
|
||||
TermInput *input = data;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user