mirror of
https://github.com/neovim/neovim.git
synced 2024-12-20 03:05:11 -07:00
Extract reading boilerplate into rstream.c module
The `RStream` class hides the differences between files and other types of streams with a simpler, general-purpose API for performing non-blocking reads with libuv. Most of the code was adapted from input.c.
This commit is contained in:
parent
b405a64133
commit
001d05541b
@ -6,6 +6,9 @@ src/os/event_defs.h
|
||||
src/os/event.h
|
||||
src/os/input.c
|
||||
src/os/input.h
|
||||
src/os/rstream.c
|
||||
src/os/rstream_defs.h
|
||||
src/os/rstream.h
|
||||
src/os/job.c
|
||||
src/os/job_defs.h
|
||||
src/os/job.h
|
||||
|
232
src/os/rstream.c
Normal file
232
src/os/rstream.c
Normal file
@ -0,0 +1,232 @@
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#include <uv.h>
|
||||
|
||||
#include "os/rstream_defs.h"
|
||||
#include "os/rstream.h"
|
||||
#include "vim.h"
|
||||
#include "memory.h"
|
||||
|
||||
struct rstream {
|
||||
uv_buf_t uvbuf;
|
||||
void *data;
|
||||
char *buffer;
|
||||
uv_stream_t *stream;
|
||||
uv_idle_t *fread_idle;
|
||||
uv_handle_type file_type;
|
||||
uv_file fd;
|
||||
rstream_cb cb;
|
||||
uint32_t buffer_size, rpos, wpos, fpos;
|
||||
bool reading, free_handle;
|
||||
};
|
||||
|
||||
// Callbacks used by libuv
|
||||
static void alloc_cb(uv_handle_t *, size_t, uv_buf_t *);
|
||||
static void read_cb(uv_stream_t *, ssize_t, const uv_buf_t *);
|
||||
static void fread_idle_cb(uv_idle_t *);
|
||||
|
||||
RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data)
|
||||
{
|
||||
RStream *rv = xmalloc(sizeof(RStream));
|
||||
rv->buffer = xmalloc(buffer_size);
|
||||
rv->buffer_size = buffer_size;
|
||||
rv->data = data;
|
||||
rv->cb = cb;
|
||||
rv->rpos = rv->wpos = rv->fpos = 0;
|
||||
rv->stream = NULL;
|
||||
rv->fread_idle = NULL;
|
||||
rv->free_handle = false;
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
void rstream_free(RStream *rstream)
|
||||
{
|
||||
if (rstream->free_handle) {
|
||||
if (rstream->fread_idle != NULL) {
|
||||
uv_close((uv_handle_t *)rstream->fread_idle, NULL);
|
||||
free(rstream->fread_idle);
|
||||
} else {
|
||||
uv_close((uv_handle_t *)rstream->stream, NULL);
|
||||
free(rstream->stream);
|
||||
}
|
||||
}
|
||||
|
||||
free(rstream->buffer);
|
||||
free(rstream);
|
||||
}
|
||||
|
||||
void rstream_set_stream(RStream *rstream, uv_stream_t *stream)
|
||||
{
|
||||
stream->data = rstream;
|
||||
rstream->stream = stream;
|
||||
}
|
||||
|
||||
void rstream_set_file(RStream *rstream, uv_file file)
|
||||
{
|
||||
rstream->file_type = uv_guess_handle(file);
|
||||
|
||||
if (rstream->free_handle) {
|
||||
// If this is the second time we're calling this function, free the
|
||||
// previously allocated memory
|
||||
if (rstream->fread_idle != NULL) {
|
||||
uv_close((uv_handle_t *)rstream->fread_idle, NULL);
|
||||
free(rstream->fread_idle);
|
||||
} else {
|
||||
uv_close((uv_handle_t *)rstream->stream, NULL);
|
||||
free(rstream->stream);
|
||||
}
|
||||
}
|
||||
|
||||
if (rstream->file_type == UV_FILE) {
|
||||
// Non-blocking file reads are simulated with a idle handle that reads
|
||||
// in chunks of rstream->buffer_size, giving time for other events to
|
||||
// be processed between reads.
|
||||
rstream->fread_idle = xmalloc(sizeof(uv_idle_t));
|
||||
uv_idle_init(uv_default_loop(), rstream->fread_idle);
|
||||
rstream->fread_idle->data = rstream;
|
||||
} else {
|
||||
// Only pipes are supported for now
|
||||
assert(rstream->file_type == UV_NAMED_PIPE
|
||||
|| rstream->file_type == UV_TTY);
|
||||
rstream->stream = xmalloc(sizeof(uv_pipe_t));
|
||||
uv_pipe_init(uv_default_loop(), (uv_pipe_t *)rstream->stream, 0);
|
||||
uv_pipe_open((uv_pipe_t *)rstream->stream, file);
|
||||
rstream->stream->data = rstream;
|
||||
}
|
||||
|
||||
rstream->fd = file;
|
||||
rstream->free_handle = true;
|
||||
}
|
||||
|
||||
bool rstream_is_regular_file(RStream *rstream)
|
||||
{
|
||||
return rstream->file_type == UV_FILE;
|
||||
}
|
||||
|
||||
void rstream_start(RStream *rstream)
|
||||
{
|
||||
if (rstream->file_type == UV_FILE) {
|
||||
uv_idle_start(rstream->fread_idle, fread_idle_cb);
|
||||
} else {
|
||||
rstream->reading = false;
|
||||
uv_read_start(rstream->stream, alloc_cb, read_cb);
|
||||
}
|
||||
}
|
||||
|
||||
void rstream_stop(RStream *rstream)
|
||||
{
|
||||
if (rstream->file_type == UV_FILE) {
|
||||
uv_idle_stop(rstream->fread_idle);
|
||||
} else {
|
||||
uv_read_stop(rstream->stream);
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t rstream_read(RStream *rstream, char *buf, uint32_t count)
|
||||
{
|
||||
uint32_t read_count = rstream->wpos - rstream->rpos;
|
||||
|
||||
if (count < read_count) {
|
||||
read_count = count;
|
||||
}
|
||||
|
||||
if (read_count > 0) {
|
||||
memcpy(buf, rstream->buffer + rstream->rpos, read_count);
|
||||
rstream->rpos += read_count;
|
||||
}
|
||||
|
||||
if (rstream->wpos == rstream->buffer_size) {
|
||||
// `wpos` is at the end of the buffer, so free some space by moving unread
|
||||
// data...
|
||||
memmove(
|
||||
rstream->buffer, // ...To the beginning of the buffer(rpos 0)
|
||||
rstream->buffer + rstream->rpos, // ...From the first unread position
|
||||
rstream->wpos - rstream->rpos); // ...By the number of unread bytes
|
||||
rstream->wpos -= rstream->rpos;
|
||||
rstream->rpos = 0;
|
||||
}
|
||||
|
||||
return read_count;
|
||||
}
|
||||
|
||||
uint32_t rstream_available(RStream *rstream)
|
||||
{
|
||||
return rstream->wpos - rstream->rpos;
|
||||
}
|
||||
|
||||
// Called by libuv to allocate memory for reading.
|
||||
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
|
||||
{
|
||||
RStream *rstream = handle->data;
|
||||
|
||||
if (rstream->reading) {
|
||||
buf->len = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
buf->base = rstream->buffer + rstream->wpos;
|
||||
buf->len = rstream->buffer_size - rstream->wpos;
|
||||
// Avoid `alloc_cb`, `alloc_cb` sequences on windows
|
||||
rstream->reading = true;
|
||||
}
|
||||
|
||||
// Callback invoked by libuv after it copies the data into the buffer provided
|
||||
// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
|
||||
// 0-length buffer.
|
||||
static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
|
||||
{
|
||||
RStream *rstream = stream->data;
|
||||
|
||||
if (cnt <= 0) {
|
||||
if (cnt != UV_ENOBUFS) {
|
||||
// Read error or EOF, either way stop the stream and invoke the callback
|
||||
// with eof == true
|
||||
uv_read_stop(stream);
|
||||
rstream->cb(rstream, rstream->data, true);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Data was already written, so all we need is to update 'wpos' to reflect
|
||||
// the space actually used in the buffer.
|
||||
rstream->wpos += cnt;
|
||||
// Invoke the callback passing in the number of bytes available and data
|
||||
// associated with the stream
|
||||
rstream->cb(rstream, rstream->data, false);
|
||||
rstream->reading = false;
|
||||
}
|
||||
|
||||
// Called by the by the 'idle' handle to emulate a reading event
|
||||
static void fread_idle_cb(uv_idle_t *handle)
|
||||
{
|
||||
uv_fs_t req;
|
||||
RStream *rstream = handle->data;
|
||||
|
||||
rstream->uvbuf.base = rstream->buffer + rstream->wpos;
|
||||
rstream->uvbuf.len = rstream->buffer_size - rstream->wpos;
|
||||
|
||||
// Synchronous read
|
||||
uv_fs_read(
|
||||
uv_default_loop(),
|
||||
&req,
|
||||
rstream->fd,
|
||||
&rstream->uvbuf,
|
||||
1,
|
||||
rstream->fpos,
|
||||
NULL);
|
||||
|
||||
uv_fs_req_cleanup(&req);
|
||||
|
||||
if (req.result <= 0) {
|
||||
uv_idle_stop(rstream->fread_idle);
|
||||
rstream->cb(rstream, rstream->data, true);
|
||||
return;
|
||||
}
|
||||
|
||||
rstream->wpos += req.result;
|
||||
rstream->fpos += req.result;
|
||||
rstream->cb(rstream, rstream->data, false);
|
||||
}
|
75
src/os/rstream.h
Normal file
75
src/os/rstream.h
Normal file
@ -0,0 +1,75 @@
|
||||
#ifndef NEOVIM_OS_RSTREAM_H
|
||||
#define NEOVIM_OS_RSTREAM_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include <uv.h>
|
||||
|
||||
#include "os/rstream_defs.h"
|
||||
|
||||
/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
|
||||
/// necessary for reading from a libuv stream.
|
||||
///
|
||||
/// @param cb A function that will be called whenever some data is available
|
||||
/// for reading with `rstream_read`
|
||||
/// @param buffer_size Size in bytes of the internal buffer.
|
||||
/// @param data Some state to associate with the `RStream` instance
|
||||
/// @return The newly-allocated `RStream` instance
|
||||
RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data);
|
||||
|
||||
/// Frees all memory allocated for a RStream instance
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
void rstream_free(RStream *rstream);
|
||||
|
||||
/// Sets the underlying `uv_stream_t` instance
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
/// @param stream The new `uv_stream_t` instance
|
||||
void rstream_set_stream(RStream *rstream, uv_stream_t *stream);
|
||||
|
||||
/// Sets the underlying `uv_file_t` instance
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
/// @param stream The new `uv_stream_t` instance
|
||||
void rstream_set_stream(RStream *rstream, uv_stream_t *stream);
|
||||
|
||||
/// Sets the underlying file descriptor that will be read from. Only pipes
|
||||
/// and regular files are supported for now.
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
/// @param file The file descriptor
|
||||
void rstream_set_file(RStream *rstream, uv_file file);
|
||||
|
||||
/// Tests if the stream is backed by a regular file
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
/// @return True if the underlying file descriptor represents a regular file
|
||||
bool rstream_is_regular_file(RStream *rstream);
|
||||
|
||||
/// Starts watching for events from a `RStream` instance.
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
void rstream_start(RStream *rstream);
|
||||
|
||||
/// Stops watching for events from a `RStream` instance.
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
void rstream_stop(RStream *rstream);
|
||||
|
||||
/// Reads data from a `RStream` instance into a buffer.
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
/// @param buffer The buffer which will receive the data
|
||||
/// @param count Number of bytes that `buffer` can accept
|
||||
/// @return The number of bytes copied into `buffer`
|
||||
uint32_t rstream_read(RStream *rstream, char *buffer, uint32_t count);
|
||||
|
||||
/// Returns the number of bytes available for reading from `rstream`
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
/// @return The number of bytes available
|
||||
uint32_t rstream_available(RStream *rstream);
|
||||
|
||||
#endif // NEOVIM_OS_RSTREAM_H
|
||||
|
14
src/os/rstream_defs.h
Normal file
14
src/os/rstream_defs.h
Normal file
@ -0,0 +1,14 @@
|
||||
#ifndef NEOVIM_OS_RSTREAM_DEFS_H
|
||||
#define NEOVIM_OS_RSTREAM_DEFS_H
|
||||
|
||||
typedef struct rstream RStream;
|
||||
|
||||
/// Function called when the RStream receives data
|
||||
///
|
||||
/// @param rstream The RStream instance
|
||||
/// @param data State associated with the RStream instance
|
||||
/// @param eof If the stream reached EOF.
|
||||
typedef void (*rstream_cb)(RStream *rstream, void *data, bool eof);
|
||||
|
||||
#endif // NEOVIM_OS_RSTREAM_DEFS_H
|
||||
|
Loading…
Reference in New Issue
Block a user