job: Replace by a better process abstraction layer

- New libuv/pty process abstraction with simplified API and no globals.
- Remove nvim/os/job*. Jobs are now a concept that apply only to programs
  spawned by vimscript job* functions.
- Refactor shell.c/channel.c to use the new module, which brings a number of
  advantages:
  - Simplified API, less code
  - No slots in the user job table are used
  - Not possible to acidentally receive data from vimscript
- Implement job table in eval.c, which is now a hash table with unilimited job
  slots and unique job ids.
This commit is contained in:
Thiago de Arruda 2015-07-17 00:32:07 -03:00
parent 9d8d2b7fa8
commit aa9cb48bf0
21 changed files with 791 additions and 1016 deletions

View File

@ -55,6 +55,7 @@
#include "nvim/misc1.h"
#include "nvim/misc2.h"
#include "nvim/keymap.h"
#include "nvim/map.h"
#include "nvim/file_search.h"
#include "nvim/garray.h"
#include "nvim/move.h"
@ -82,8 +83,10 @@
#include "nvim/version.h"
#include "nvim/window.h"
#include "nvim/os/os.h"
#include "nvim/os/job.h"
#include "nvim/event/uv_process.h"
#include "nvim/event/pty_process.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/os/time.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
@ -442,14 +445,19 @@ static dictitem_T vimvars_var; /* variable used for v: */
#define vimvarht vimvardict.dv_hashtab
typedef struct {
Job *job;
union {
UvProcess uv;
PtyProcess pty;
} proc;
Stream in, out, err;
Terminal *term;
bool stopped;
bool exited;
bool stdin_closed;
int refcount;
ufunc_T *on_stdout, *on_stderr, *on_exit;
dict_T *self;
int *status_ptr;
uint64_t id;
} TerminalJobData;
@ -462,7 +470,6 @@ typedef struct {
valid character */
// Memory pool for reusing JobEvent structures
typedef struct {
int job_id;
TerminalJobData *data;
ufunc_T *callback;
const char *type;
@ -470,12 +477,15 @@ typedef struct {
int status;
} JobEvent;
static int disable_job_defer = 0;
static uint64_t current_job_id = 1;
static PMap(uint64_t) *jobs = NULL;
/*
* Initialize the global and v: variables.
*/
void eval_init(void)
{
jobs = pmap_new(uint64_t)();
int i;
struct vimvar *p;
@ -10692,29 +10702,27 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
return;
}
Job *job = job_find(argvars[0].vval.v_number);
if (!is_user_job(job)) {
// Invalid job id
TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
}
Process *proc = (Process *)&data->proc;
if (argvars[1].v_type == VAR_STRING) {
char *stream = (char *)argvars[1].vval.v_string;
if (!strcmp(stream, "stdin")) {
job_close_in(job);
((TerminalJobData *)job_data(job))->stdin_closed = true;
process_close_in(proc);
} else if (!strcmp(stream, "stdout")) {
job_close_out(job);
process_close_out(proc);
} else if (!strcmp(stream, "stderr")) {
job_close_err(job);
process_close_err(proc);
} else {
EMSG2(_("Invalid job stream \"%s\""), stream);
}
} else {
((TerminalJobData *)job_data(job))->stdin_closed = true;
job_close_streams(job);
process_close_streams(proc);
}
}
@ -10735,15 +10743,13 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
return;
}
Job *job = job_find(argvars[0].vval.v_number);
if (!is_user_job(job)) {
// Invalid job id
TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
}
if (((TerminalJobData *)job_data(job))->stdin_closed) {
if (((Process *)&data->proc)->in->closed) {
EMSG(_("Can't send data to the job: stdin is closed"));
return;
}
@ -10757,7 +10763,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
}
WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree);
rettv->vval.v_number = job_write(job, buf);
rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf);
}
// "jobresize(job, width, height)" function
@ -10777,19 +10783,20 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv)
return;
}
Job *job = job_find(argvars[0].vval.v_number);
if (!is_user_job(job)) {
// Probably an invalid job id
TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
}
if (!job_resize(job, argvars[1].vval.v_number, argvars[2].vval.v_number)) {
if (data->proc.uv.process.type != kProcessTypePty) {
EMSG(_(e_jobnotpty));
return;
}
pty_process_resize(&data->proc.pty, argvars[1].vval.v_number,
argvars[2].vval.v_number);
rettv->vval.v_number = 1;
}
@ -10878,37 +10885,33 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
}
}
JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit,
job_opts);
bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0;
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
job_opts, pty);
Process *proc = (Process *)&data->proc;
if (!job_opts) {
goto start;
}
opts.pty = get_dict_number(job_opts, (uint8_t *)"pty");
if (opts.pty) {
if (pty) {
uint16_t width = get_dict_number(job_opts, (uint8_t *)"width");
if (width > 0) {
opts.width = width;
data->proc.pty.width = width;
}
uint16_t height = get_dict_number(job_opts, (uint8_t *)"height");
if (height > 0) {
opts.height = height;
data->proc.pty.height = height;
}
char *term = (char *)get_dict_string(job_opts, (uint8_t *)"TERM", true);
if (term) {
opts.term_name = term;
data->proc.pty.term_name = term;
}
}
start:
if (!on_stdout) {
opts.stdout_cb = NULL;
proc->out = NULL;
}
if (!on_stderr) {
opts.stderr_cb = NULL;
proc->err = NULL;
}
common_job_start(opts, rettv);
common_job_start(data, rettv);
}
// "jobstop()" function
@ -10927,14 +10930,15 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv)
return;
}
Job *job = job_find(argvars[0].vval.v_number);
if (!is_user_job(job)) {
TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
if (!data || data->stopped) {
EMSG(_(e_invjob));
return;
}
job_stop(job);
process_stop((Process *)&data->proc);
data->stopped = true;
rettv->vval.v_number = 1;
}
@ -10971,13 +10975,11 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
// is used to represent an invalid job id, -2 is for a interrupted job and
// -1 for jobs that were skipped or timed out.
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
Job *job = NULL;
TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
|| !(job = job_find(arg->li_tv.vval.v_number))
|| !is_user_job(job)) {
|| !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
list_append_number(rv, -3);
} else {
TerminalJobData *data = job_data(job);
// append the list item and set the status pointer so we'll collect the
// status code when the job exits
list_append_number(rv, -1);
@ -10993,18 +10995,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
}
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
Job *job = NULL;
TerminalJobData *data = NULL;
if (remaining == 0) {
// timed out
break;
}
if (arg->li_tv.v_type != VAR_NUMBER
|| !(job = job_find(arg->li_tv.vval.v_number))
|| !is_user_job(job)) {
|| !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
continue;
}
TerminalJobData *data = job_data(job);
int status = job_wait(job, remaining);
int status = process_wait((Process *)&data->proc, remaining);
if (status < 0) {
// interrupted or timed out, skip remaining jobs.
if (status == -2) {
@ -11028,13 +11028,11 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
loop_poll_events(&loop, 0);
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
Job *job = NULL;
TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
|| !(job = job_find(arg->li_tv.vval.v_number))
|| !is_user_job(job)) {
|| !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
continue;
}
TerminalJobData *data = job_data(job);
// remove the status pointer because the list may be freed before the
// job exits
data->status_ptr = NULL;
@ -12951,7 +12949,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)
// The last item of argv must be NULL
argv[i] = NULL;
uint64_t channel_id = channel_from_job(argv);
uint64_t channel_id = channel_from_process(argv);
if (!channel_id) {
EMSG(_(e_api_spawn_failed));
@ -15225,19 +15223,15 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
}
}
JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit,
job_opts);
opts.pty = true;
opts.width = curwin->w_width;
opts.height = curwin->w_height;
opts.term_name = xstrdup("xterm-256color");
Job *job = common_job_start(opts, rettv);
if (!job) {
shell_free_argv(argv);
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
job_opts, true);
data->proc.pty.width = curwin->w_width;
data->proc.pty.height = curwin->w_height;
data->proc.pty.term_name = xstrdup("xterm-256color");
if (!common_job_start(data, rettv)) {
return;
}
TerminalJobData *data = opts.data;
TerminalOptions topts = TERMINAL_OPTIONS_INIT;
TerminalOptions topts;
topts.data = data;
topts.width = curwin->w_width;
topts.height = curwin->w_height;
@ -15250,7 +15244,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
&& os_isdir(argvars[1].vval.v_string)) {
cwd = (char *)argvars[1].vval.v_string;
}
int pid = job_pid(job);
int pid = data->proc.pty.process.pid;
// Get the desired name of the buffer.
char *name = job_opts ?
@ -20222,21 +20216,27 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
return ret;
}
static inline JobOptions common_job_options(char **argv, ufunc_T *on_stdout,
ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self)
static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout,
ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self, bool pty)
{
TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData));
data->stopped = false;
data->on_stdout = on_stdout;
data->on_stderr = on_stderr;
data->on_exit = on_exit;
data->self = self;
JobOptions opts = JOB_OPTIONS_INIT;
opts.argv = argv;
opts.data = data;
opts.stdout_cb = on_job_stdout;
opts.stderr_cb = on_job_stderr;
opts.exit_cb = on_job_exit;
return opts;
if (pty) {
data->proc.pty = pty_process_init(data);
} else {
data->proc.uv = uv_process_init(data);
}
Process *proc = (Process *)&data->proc;
proc->argv = argv;
proc->in = &data->in;
proc->out = &data->out;
proc->err = &data->err;
proc->cb = on_process_exit;
return data;
}
/// Return true/false on success/failure.
@ -20262,24 +20262,28 @@ static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,
return false;
}
static inline Job *common_job_start(JobOptions opts, typval_T *rettv)
static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
{
TerminalJobData *data = opts.data;
data->refcount++;
Job *job = job_start(opts, &rettv->vval.v_number);
if (rettv->vval.v_number <= 0) {
if (rettv->vval.v_number == 0) {
EMSG(_(e_jobtblfull));
xfree(opts.term_name);
free_term_job_data(data);
} else {
EMSG(_(e_jobexe));
}
return NULL;
Process *proc = (Process *)&data->proc;
if (!process_spawn(&loop, proc)) {
EMSG(_(e_jobexe));
return false;
}
data->job = job;
return job;
data->id = current_job_id++;
wstream_init(proc->in, 0);
if (proc->out) {
rstream_init(proc->out, 0);
rstream_start(proc->out, on_job_stdout);
}
if (proc->err) {
rstream_init(proc->err, 0);
rstream_start(proc->err, on_job_stderr);
}
pmap_put(uint64_t)(jobs, data->id, data);
rettv->vval.v_number = data->id;
return true;
}
static inline void free_term_job_data(TerminalJobData *data) {
@ -20300,25 +20304,15 @@ static inline void free_term_job_data(TerminalJobData *data) {
xfree(data);
}
static inline bool is_user_job(Job *job)
{
if (!job) {
return false;
}
JobOptions *opts = job_opts(job);
return opts->exit_cb == on_job_exit;
}
// vimscript job callbacks must be executed on Nvim main loop
static inline void push_job_event(Job *job, ufunc_T *callback,
const char *type, char *data, size_t count, int status)
static inline void push_job_event(TerminalJobData *data, ufunc_T *callback,
const char *type, char *buf, size_t count, int status)
{
JobEvent *event_data = xmalloc(sizeof(JobEvent));
event_data->received = NULL;
if (data) {
if (buf) {
event_data->received = list_alloc();
char *ptr = data;
char *ptr = buf;
size_t remaining = count;
size_t off = 0;
@ -20342,8 +20336,7 @@ static inline void push_job_event(Job *job, ufunc_T *callback,
} else {
event_data->status = status;
}
event_data->job_id = job_id(job);
event_data->data = job_data(job);
event_data->data = data;
event_data->callback = callback;
event_data->type = type;
loop_push_event(&loop, (Event) {
@ -20354,24 +20347,23 @@ static inline void push_job_event(Job *job, ufunc_T *callback,
static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof)
{
TerminalJobData *data = job_data(job);
TerminalJobData *data = job;
on_job_output(stream, job, buf, eof, data->on_stdout, "stdout");
}
static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof)
{
TerminalJobData *data = job_data(job);
TerminalJobData *data = job;
on_job_output(stream, job, buf, eof, data->on_stderr, "stderr");
}
static void on_job_output(Stream *stream, Job *job, RBuffer *buf, bool eof,
ufunc_T *callback, const char *type)
static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf,
bool eof, ufunc_T *callback, const char *type)
{
if (eof) {
return;
}
TerminalJobData *data = job_data(job);
RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
// The order here matters, the terminal must receive the data first because
// push_job_event will modify the read buffer(convert NULs into NLs)
@ -20380,17 +20372,16 @@ static void on_job_output(Stream *stream, Job *job, RBuffer *buf, bool eof,
}
if (callback) {
push_job_event(job, callback, type, ptr, len, 0);
push_job_event(data, callback, type, ptr, len, 0);
}
rbuffer_consumed(buf, len);
}
}
static void on_job_exit(Job *job, int status, void *d)
static void on_process_exit(Process *proc, int status, void *d)
{
TerminalJobData *data = d;
if (data->term && !data->exited) {
data->exited = true;
terminal_close(data->term,
@ -20401,19 +20392,20 @@ static void on_job_exit(Job *job, int status, void *d)
*data->status_ptr = status;
}
push_job_event(job, data->on_exit, "exit", NULL, 0, status);
push_job_event(data, data->on_exit, "exit", NULL, 0, status);
}
static void term_write(char *buf, size_t size, void *data)
static void term_write(char *buf, size_t size, void *d)
{
Job *job = ((TerminalJobData *)data)->job;
TerminalJobData *data = d;
WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
job_write(job, wbuf);
wstream_write(&data->in, wbuf);
}
static void term_resize(uint16_t width, uint16_t height, void *data)
static void term_resize(uint16_t width, uint16_t height, void *d)
{
job_resize(((TerminalJobData *)data)->job, width, height);
TerminalJobData *data = d;
pty_process_resize(&data->proc.pty, width, height);
}
static void term_close(void *d)
@ -20421,7 +20413,7 @@ static void term_close(void *d)
TerminalJobData *data = d;
if (!data->exited) {
data->exited = true;
job_stop(data->job);
process_stop((Process *)&data->proc);
}
terminal_destroy(data->term);
term_job_data_decref(d);
@ -20448,7 +20440,7 @@ static void on_job_event(Event event)
if (argc > 0) {
argv[0].v_type = VAR_NUMBER;
argv[0].v_lock = 0;
argv[0].vval.v_number = ev->job_id;
argv[0].vval.v_number = ev->data->id;
}
if (argc > 1) {
@ -20479,6 +20471,7 @@ static void on_job_event(Event event)
end:
if (!ev->received) {
// exit event, safe to free job data now
pmap_del(uint64_t)(jobs, ev->data->id);
term_job_data_decref(ev->data);
}
xfree(ev);

View File

@ -3,6 +3,7 @@
#include <uv.h>
#include "nvim/event/loop.h"
#include "nvim/event/process.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/loop.c.generated.h"
@ -15,6 +16,10 @@ void loop_init(Loop *loop, void *data)
loop->uv.data = loop;
loop->deferred_events = kl_init(Event);
loop->immediate_events = kl_init(Event);
loop->children = kl_init(WatcherPtr);
loop->children_stop_requests = 0;
uv_signal_init(&loop->uv, &loop->children_watcher);
uv_timer_init(&loop->uv, &loop->children_kill_timer);
}
void loop_poll_events(Loop *loop, int ms)
@ -113,6 +118,8 @@ void loop_stop(Loop *loop)
void loop_close(Loop *loop)
{
uv_close((uv_handle_t *)&loop->children_watcher, NULL);
uv_close((uv_handle_t *)&loop->children_kill_timer, NULL);
do {
uv_run(&loop->uv, UV_RUN_DEFAULT);
} while (uv_loop_close(&loop->uv));

View File

@ -26,6 +26,10 @@ typedef struct loop {
uv_loop_t uv;
klist_t(Event) *deferred_events, *immediate_events;
int deferred_events_allowed;
klist_t(WatcherPtr) *children;
uv_signal_t children_watcher;
uv_timer_t children_kill_timer;
size_t children_stop_requests;
} Loop;
// Poll for events until a condition or timeout

325
src/nvim/event/process.c Normal file
View File

@ -0,0 +1,325 @@
#include <assert.h>
#include <stdlib.h>
#include <uv.h>
#include "nvim/os/shell.h"
#include "nvim/event/loop.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/event/process.h"
#include "nvim/event/uv_process.h"
#include "nvim/event/pty_process.h"
#include "nvim/globals.h"
#include "nvim/log.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/process.c.generated.h"
#endif
// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a process has to cleanly
// exit before we send SIGNAL to it
#define TERM_TIMEOUT 1000000000
#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
#define CLOSE_PROC_STREAM(proc, stream) \
do { \
if (proc->stream && !proc->stream->closed) { \
stream_close(proc->stream, NULL); \
} \
} while (0)
bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
{
proc->loop = loop;
if (proc->in) {
uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0);
}
if (proc->out) {
uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0);
}
if (proc->err) {
uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0);
}
bool success;
switch (proc->type) {
case kProcessTypeUv:
success = uv_process_spawn((UvProcess *)proc);
break;
case kProcessTypePty:
success = pty_process_spawn((PtyProcess *)proc);
break;
default:
abort();
}
if (!success) {
if (proc->in) {
uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL);
}
if (proc->out) {
uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL);
}
if (proc->err) {
uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL);
}
process_close(proc);
shell_free_argv(proc->argv);
proc->status = -1;
return false;
}
void *data = proc->data;
if (proc->in) {
stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close;
proc->refcount++;
}
if (proc->out) {
stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close;
proc->refcount++;
}
if (proc->err) {
stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close;
proc->refcount++;
}
proc->internal_exit_cb = on_process_exit;
proc->internal_close_cb = decref;
proc->refcount++;
kl_push(WatcherPtr, loop->children, proc);
return true;
}
void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
{
kl_iter(WatcherPtr, loop->children, current) {
Process *proc = (*current)->data;
uv_kill(proc->pid, SIGTERM);
proc->term_sent = true;
process_stop(proc);
}
// Wait until all children exit
LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children));
pty_process_teardown(loop);
}
// Wrappers around `stream_close` that protect against double-closing.
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
{
process_close_in(proc);
process_close_out(proc);
process_close_err(proc);
}
void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, in);
}
void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, out);
}
void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, err);
}
/// Synchronously wait for a process to finish
///
/// @param process The Process instance
/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for
/// waiting until the process quits.
/// @return returns the status code of the exited process. -1 if the process is
/// still running and the `timeout` has expired. Note that this is
/// indistinguishable from the process returning -1 by itself. Which
/// is possible on some OS. Returns -2 if an user has interruped the
/// wait.
int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
{
// The default status is -1, which represents a timeout
int status = -1;
bool interrupted = false;
// Increase refcount to stop the exit callback from being called(and possibly
// being freed) before we have a chance to get the status.
proc->refcount++;
LOOP_POLL_EVENTS_UNTIL(proc->loop, ms,
// Until...
got_int || // interrupted by the user
proc->refcount == 1); // job exited
// we'll assume that a user frantically hitting interrupt doesn't like
// the current job. Signal that it has to be killed.
if (got_int) {
interrupted = true;
got_int = false;
process_stop(proc);
if (ms == -1) {
// We can only return, if all streams/handles are closed and the job
// exited.
LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1);
} else {
loop_poll_events(proc->loop, 0);
}
}
if (proc->refcount == 1) {
// Job exited, collect status and manually invoke close_cb to free the job
// resources
status = interrupted ? -2 : proc->status;
decref(proc);
} else {
proc->refcount--;
}
return status;
}
/// Ask a process to terminate and eventually kill if it doesn't respond
void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
{
if (proc->stopped_time) {
return;
}
proc->stopped_time = os_hrtime();
switch (proc->type) {
case kProcessTypeUv:
// Close the process's stdin. If the process doesn't close its own
// stdout/stderr, they will be closed when it exits(possibly due to being
// terminated after a timeout)
process_close_in(proc);
break;
case kProcessTypePty:
// close all streams for pty processes to send SIGHUP to the process
process_close_streams(proc);
pty_process_close_master((PtyProcess *)proc);
break;
default:
abort();
}
Loop *loop = proc->loop;
if (!loop->children_stop_requests++) {
// When there's at least one stop request pending, start a timer that
// will periodically check if a signal should be send to a to the job
DLOG("Starting job kill timer");
uv_timer_start(&loop->children_kill_timer, children_kill_cb, 100, 100);
}
}
/// Iterates the process list sending SIGTERM to stopped processes and SIGKILL
/// to those that didn't die from SIGTERM after a while(exit_timeout is 0).
static void children_kill_cb(uv_timer_t *handle)
{
Loop *loop = handle->loop->data;
uint64_t now = os_hrtime();
kl_iter(WatcherPtr, loop->children, current) {
Process *proc = (*current)->data;
if (!proc->stopped_time) {
continue;
}
uint64_t elapsed = now - proc->stopped_time;
if (!proc->term_sent && elapsed >= TERM_TIMEOUT) {
ILOG("Sending SIGTERM to pid %d", proc->pid);
uv_kill(proc->pid, SIGTERM);
proc->term_sent = true;
} else if (elapsed >= KILL_TIMEOUT) {
ILOG("Sending SIGKILL to pid %d", proc->pid);
uv_kill(proc->pid, SIGKILL);
}
}
}
static void decref(Process *proc)
{
if (--proc->refcount != 0) {
return;
}
Loop *loop = proc->loop;
kliter_t(WatcherPtr) **node = NULL;
kl_iter(WatcherPtr, loop->children, current) {
if ((*current)->data == proc) {
node = current;
break;
}
}
assert(node);
kl_shift_at(WatcherPtr, loop->children, node);
shell_free_argv(proc->argv);
if (proc->type == kProcessTypePty) {
xfree(((PtyProcess *)proc)->term_name);
}
if (proc->cb) {
proc->cb(proc, proc->status, proc->data);
}
}
static void process_close(Process *proc)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!proc->closed);
proc->closed = true;
switch (proc->type) {
case kProcessTypeUv:
uv_process_close((UvProcess *)proc);
break;
case kProcessTypePty:
pty_process_close((PtyProcess *)proc);
break;
default:
abort();
}
}
static void on_process_exit(Process *proc)
{
if (exiting) {
on_process_exit_event((Event) {.data = proc});
} else {
loop_push_event(proc->loop,
(Event) {.handler = on_process_exit_event, .data = proc}, false);
}
Loop *loop = proc->loop;
if (loop->children_stop_requests && !--loop->children_stop_requests) {
// Stop the timer if no more stop requests are pending
DLOG("Stopping process kill timer");
uv_timer_stop(&loop->children_kill_timer);
}
}
static void on_process_exit_event(Event event)
{
Process *proc = event.data;
process_close_streams(proc);
process_close(proc);
}
static void on_process_stream_close(Stream *stream, void *data)
{
Process *proc = data;
decref(proc);
}

56
src/nvim/event/process.h Normal file
View File

@ -0,0 +1,56 @@
#ifndef NVIM_EVENT_PROCESS_H
#define NVIM_EVENT_PROCESS_H
#include "nvim/event/loop.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
typedef enum {
kProcessTypeUv,
kProcessTypePty
} ProcessType;
typedef struct process Process;
typedef void (*process_exit_cb)(Process *proc, int status, void *data);
typedef void (*internal_process_cb)(Process *proc);
struct process {
ProcessType type;
Loop *loop;
void *data;
int pid, status, refcount;
// set to the hrtime of when process_stop was called for the process.
uint64_t stopped_time;
char **argv;
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
bool closed, term_sent;
};
static inline Process process_init(ProcessType type, void *data)
{
return (Process) {
.type = type,
.data = data,
.loop = NULL,
.pid = 0,
.status = 0,
.refcount = 0,
.stopped_time = 0,
.argv = NULL,
.in = NULL,
.out = NULL,
.err = NULL,
.cb = NULL,
.closed = false,
.term_sent = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL
};
}
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/process.h.generated.h"
#endif
#endif // NVIM_EVENT_PROCESS_H

View File

@ -20,58 +20,39 @@
#include <uv.h>
#include "nvim/func_attr.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/job_private.h"
#include "nvim/os/pty_process.h"
#include "nvim/memory.h"
#include "nvim/vim.h"
#include "nvim/globals.h"
#include "nvim/lib/klist.h"
#include "nvim/event/loop.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/event/process.h"
#include "nvim/event/pty_process.h"
#include "nvim/log.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/pty_process.c.generated.h"
# include "event/pty_process.c.generated.h"
#endif
static const unsigned int KILL_RETRIES = 5;
static const unsigned int KILL_TIMEOUT = 2; // seconds
bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL
bool pty_process_spawn(PtyProcess *ptyproc)
FUNC_ATTR_NONNULL_ALL
{
PtyProcess *ptyproc = &job->process.pty;
ptyproc->tty_fd = -1;
if (job->opts.writable) {
uv_pipe_init(&loop.uv, &ptyproc->proc_stdin, 0);
ptyproc->proc_stdin.data = NULL;
}
if (job->opts.stdout_cb) {
uv_pipe_init(&loop.uv, &ptyproc->proc_stdout, 0);
ptyproc->proc_stdout.data = NULL;
}
if (job->opts.stderr_cb) {
uv_pipe_init(&loop.uv, &ptyproc->proc_stderr, 0);
ptyproc->proc_stderr.data = NULL;
}
job->proc_stdin = (uv_stream_t *)&ptyproc->proc_stdin;
job->proc_stdout = (uv_stream_t *)&ptyproc->proc_stdout;
job->proc_stderr = (uv_stream_t *)&ptyproc->proc_stderr;
int master;
ptyproc->winsize = (struct winsize){job->opts.height, job->opts.width, 0, 0};
Process *proc = (Process *)ptyproc;
uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0};
struct termios termios;
init_termios(&termios);
uv_disable_stdio_inheritance();
int master;
int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
if (pid < 0) {
ELOG("forkpty failed: %s", strerror(errno));
return false;
} else if (pid == 0) {
init_child(job);
init_child(ptyproc);
abort();
}
@ -86,23 +67,18 @@ bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL
goto error;
}
if (job->opts.writable
&& !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdin)) {
if (proc->in && !set_duplicating_descriptor(master, &proc->in->uv.pipe)) {
goto error;
}
if (job->opts.stdout_cb
&& !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdout)) {
if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) {
goto error;
}
if (job->opts.stderr_cb
&& !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stderr)) {
if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) {
goto error;
}
ptyproc->tty_fd = master;
job->pid = pid;
proc->pid = pid;
return true;
error:
@ -117,54 +93,44 @@ error:
}
if (child != pid) {
kill(pid, SIGKILL);
waitpid(pid, NULL, 0);
}
return false;
}
static bool set_pipe_duplicating_descriptor(int fd, uv_pipe_t *pipe)
void pty_process_resize(PtyProcess *ptyproc, uint16_t width,
uint16_t height)
FUNC_ATTR_NONNULL_ALL
{
int fd_dup = dup(fd);
if (fd_dup < 0) {
ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno));
return false;
}
int uv_result = uv_pipe_open(pipe, fd_dup);
if (uv_result) {
ELOG("Failed to set pipe to descriptor %d: %s",
fd_dup, uv_strerror(uv_result));
close(fd_dup);
return false;
}
return true;
ptyproc->winsize = (struct winsize){height, width, 0, 0};
ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize);
}
void pty_process_close(Job *job) FUNC_ATTR_NONNULL_ALL
void pty_process_close(PtyProcess *ptyproc)
FUNC_ATTR_NONNULL_ALL
{
pty_process_close_master(job);
job_close_streams(job);
job_decref(job);
pty_process_close_master(ptyproc);
Process *proc = (Process *)ptyproc;
if (proc->internal_close_cb) {
proc->internal_close_cb(proc);
}
}
void pty_process_close_master(Job *job) FUNC_ATTR_NONNULL_ALL
void pty_process_close_master(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
{
PtyProcess *ptyproc = &job->process.pty;
if (ptyproc->tty_fd >= 0) {
close(ptyproc->tty_fd);
ptyproc->tty_fd = -1;
}
}
void pty_process_resize(Job *job, uint16_t width, uint16_t height)
FUNC_ATTR_NONNULL_ALL
void pty_process_teardown(Loop *loop)
{
PtyProcess *ptyproc = &job->process.pty;
ptyproc->winsize = (struct winsize){height, width, 0, 0};
ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize);
uv_signal_stop(&loop->children_watcher);
}
static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL
static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
{
unsetenv("COLUMNS");
unsetenv("LINES");
@ -179,8 +145,8 @@ static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL
signal(SIGTERM, SIG_DFL);
signal(SIGALRM, SIG_DFL);
setenv("TERM", job->opts.term_name ? job->opts.term_name : "ansi", 1);
execvp(job->opts.argv[0], job->opts.argv);
setenv("TERM", ptyproc->term_name ? ptyproc->term_name : "ansi", 1);
execvp(ptyproc->process.argv[0], ptyproc->process.argv);
fprintf(stderr, "execvp failed: %s\n", strerror(errno));
}
@ -239,3 +205,50 @@ static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL
termios->c_cc[VMIN] = 1;
termios->c_cc[VTIME] = 0;
}
static bool set_duplicating_descriptor(int fd, uv_pipe_t *pipe)
FUNC_ATTR_NONNULL_ALL
{
int fd_dup = dup(fd);
if (fd_dup < 0) {
ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno));
return false;
}
int uv_result = uv_pipe_open(pipe, fd_dup);
if (uv_result) {
ELOG("Failed to set pipe to descriptor %d: %s",
fd_dup, uv_strerror(uv_result));
close(fd_dup);
return false;
}
return true;
}
static void chld_handler(uv_signal_t *handle, int signum)
{
int stat = 0;
int pid;
do {
pid = waitpid(-1, &stat, WNOHANG);
} while (pid < 0 && errno == EINTR);
if (pid <= 0) {
return;
}
Loop *loop = handle->loop->data;
kl_iter(WatcherPtr, loop->children, current) {
Process *proc = (*current)->data;
if (proc->pid == pid) {
if (WIFEXITED(stat)) {
proc->status = WEXITSTATUS(stat);
} else if (WIFSIGNALED(stat)) {
proc->status = WTERMSIG(stat);
}
proc->internal_exit_cb(proc);
break;
}
}
}

View File

@ -0,0 +1,30 @@
#ifndef NVIM_EVENT_PTY_PROCESS_H
#define NVIM_EVENT_PTY_PROCESS_H
#include <sys/ioctl.h>
#include "nvim/event/process.h"
typedef struct pty_process {
Process process;
char *term_name;
uint16_t width, height;
struct winsize winsize;
int tty_fd;
} PtyProcess;
static inline PtyProcess pty_process_init(void *data)
{
PtyProcess rv;
rv.process = process_init(kProcessTypePty, data);
rv.term_name = NULL;
rv.width = 80;
rv.height = 24;
rv.tty_fd = -1;
return rv;
}
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/pty_process.h.generated.h"
#endif
#endif // NVIM_EVENT_PTY_PROCESS_H

View File

@ -0,0 +1,77 @@
#include <assert.h>
#include <uv.h>
#include "nvim/event/loop.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/event/process.h"
#include "nvim/event/uv_process.h"
#include "nvim/log.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/uv_process.c.generated.h"
#endif
bool uv_process_spawn(UvProcess *uvproc)
FUNC_ATTR_NONNULL_ALL
{
Process *proc = (Process *)uvproc;
uvproc->uvopts.file = proc->argv[0];
uvproc->uvopts.args = proc->argv;
uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE;
uvproc->uvopts.exit_cb = exit_cb;
uvproc->uvopts.cwd = NULL;
uvproc->uvopts.env = NULL;
uvproc->uvopts.stdio = uvproc->uvstdio;
uvproc->uvopts.stdio_count = 3;
uvproc->uvstdio[0].flags = UV_IGNORE;
uvproc->uvstdio[1].flags = UV_IGNORE;
uvproc->uvstdio[2].flags = UV_IGNORE;
uvproc->uv.data = proc;
if (proc->in) {
uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
uvproc->uvstdio[0].data.stream = (uv_stream_t *)&proc->in->uv.pipe;
}
if (proc->out) {
uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[1].data.stream = (uv_stream_t *)&proc->out->uv.pipe;
}
if (proc->err) {
uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[2].data.stream = (uv_stream_t *)&proc->err->uv.pipe;
}
int status;
if ((status = uv_spawn(&proc->loop->uv, &uvproc->uv, &uvproc->uvopts))) {
ELOG("uv_spawn failed: %s", uv_strerror(status));
return false;
}
proc->pid = uvproc->uv.pid;
return true;
}
void uv_process_close(UvProcess *uvproc)
FUNC_ATTR_NONNULL_ARG(1)
{
uv_close((uv_handle_t *)&uvproc->uv, close_cb);
}
static void close_cb(uv_handle_t *handle)
{
Process *proc = handle->data;
if (proc->internal_close_cb) {
proc->internal_close_cb(proc);
}
}
static void exit_cb(uv_process_t *handle, int64_t status, int term_signal)
{
Process *proc = handle->data;
proc->status = (int)status;
proc->internal_exit_cb(proc);
}

View File

@ -0,0 +1,25 @@
#ifndef NVIM_EVENT_UV_PROCESS_H
#define NVIM_EVENT_UV_PROCESS_H
#include <uv.h>
#include "nvim/event/process.h"
typedef struct uv_process {
Process process;
uv_process_t uv;
uv_process_options_t uvopts;
uv_stdio_container_t uvstdio[3];
} UvProcess;
static inline UvProcess uv_process_init(void *data)
{
UvProcess rv;
rv.process = process_init(kProcessTypeUv, data);
return rv;
}
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/uv_process.h.generated.h"
#endif
#endif // NVIM_EVENT_UV_PROCESS_H

View File

@ -136,6 +136,6 @@
// `break` statement is executed before the next iteration.
#define kl_iter(name, kl, p) kl_iter_at(name, kl, p, NULL)
#define kl_iter_at(name, kl, p, h) \
for (kl1_##name *p = h ? h : kl->head; p != kl->tail; p = p->next)
for (kl1_##name **p = h ? h : &kl->head; *p != kl->tail; p = &(*p)->next)
#endif

View File

@ -63,7 +63,7 @@
#include "nvim/os/time.h"
#include "nvim/event/loop.h"
#include "nvim/os/signal.h"
#include "nvim/os/job.h"
#include "nvim/event/process.h"
#include "nvim/msgpack_rpc/defs.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/msgpack_rpc/server.h"
@ -149,7 +149,6 @@ void event_init(void)
// `event_poll`
// Signals
signal_init();
job_init();
// finish mspgack-rpc initialization
channel_init();
server_init();
@ -165,7 +164,7 @@ void event_teardown(void)
loop_process_all_events(&loop);
input_stop();
channel_teardown();
job_teardown();
process_teardown(&loop);
server_teardown();
signal_teardown();
terminal_teardown();

View File

@ -10,11 +10,10 @@
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/remote_ui.h"
#include "nvim/event/loop.h"
#include "nvim/event/uv_process.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/event/socket.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
@ -35,7 +34,7 @@
typedef enum {
kChannelTypeSocket,
kChannelTypeJob,
kChannelTypeProc,
kChannelTypeStdio
} ChannelType;
@ -54,8 +53,13 @@ typedef struct {
ChannelType type;
msgpack_unpacker *unpacker;
union {
Job *job;
Stream stream;
struct {
UvProcess uvproc;
Stream in;
Stream out;
Stream err;
} process;
struct {
Stream in;
Stream out;
@ -110,34 +114,35 @@ void channel_teardown(void)
});
}
/// Creates an API channel by starting a job and connecting to its
/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
uint64_t channel_from_job(char **argv)
uint64_t channel_from_process(char **argv)
{
Channel *channel = register_channel(kChannelTypeJob);
incref(channel); // job channels are only closed by the exit_cb
int status;
JobOptions opts = JOB_OPTIONS_INIT;
opts.argv = argv;
opts.data = channel;
opts.stdout_cb = job_out;
opts.stderr_cb = job_err;
opts.exit_cb = job_exit;
channel->data.job = job_start(opts, &status);
if (status <= 0) {
if (status == 0) { // Two decrefs needed if status == 0.
decref(channel); // Only one needed if status < 0,
} // because exit_cb will do the second one.
Channel *channel = register_channel(kChannelTypeProc);
channel->data.process.uvproc = uv_process_init(channel);
Process *proc = &channel->data.process.uvproc.process;
proc->argv = argv;
proc->in = &channel->data.process.in;
proc->out = &channel->data.process.out;
proc->err = &channel->data.process.err;
proc->cb = process_exit;
if (!process_spawn(&loop, proc)) {
loop_poll_events(&loop, 0);
decref(channel);
return 0;
}
incref(channel); // process channels are only closed by the exit_cb
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
rstream_start(proc->out, parse_msgpack);
rstream_init(proc->err, 0);
rstream_start(proc->err, forward_stderr);
return channel->id;
}
@ -319,24 +324,17 @@ static void channel_from_stdio(void)
wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof)
{
Job *job = data;
parse_msgpack(stream, buf, job_data(job), eof);
}
static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof)
static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
while (rbuffer_size(rbuf)) {
char buf[256];
size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
buf[read] = NUL;
ELOG("Channel %" PRIu64 " stderr: %s",
((Channel *)job_data(data))->id, buf);
ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
}
}
static void job_exit(Job *job, int status, void *data)
static void process_exit(Process *proc, int status, void *data)
{
decref(data);
}
@ -511,8 +509,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
case kChannelTypeSocket:
success = wstream_write(&channel->data.stream, buffer);
break;
case kChannelTypeJob:
success = job_write(channel->data.job, buffer);
case kChannelTypeProc:
success = wstream_write(&channel->data.process.in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
@ -627,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
/// Close the channel streams/job and free the channel resources.
/// Close the channel streams/process and free the channel resources.
static void close_channel(Channel *channel)
{
if (channel->closed) {
@ -640,9 +638,9 @@ static void close_channel(Channel *channel)
case kChannelTypeSocket:
stream_close(&channel->data.stream, close_cb);
break;
case kChannelTypeJob:
if (channel->data.job) {
job_stop(channel->data.job);
case kChannelTypeProc:
if (!channel->data.process.uvproc.process.closed) {
process_stop(&channel->data.process.uvproc.process);
}
break;
case kChannelTypeStdio:

View File

@ -1,444 +0,0 @@
#include <stdint.h>
#include <stdbool.h>
#include <uv.h>
#include "nvim/event/loop.h"
#include "nvim/event/time.h"
#include "nvim/event/signal.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/job_private.h"
#include "nvim/os/pty_process.h"
#include "nvim/os/time.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#ifdef HAVE_SYS_WAIT_H
# include <sys/wait.h>
#endif
// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit
// before we send SIGNAL to it
#define TERM_TIMEOUT 1000000000
#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
#define JOB_BUFFER_SIZE 0xFFFF
#define close_job_stream(job, stream) \
do { \
if (!job->stream.closed) { \
stream_close(&job->stream, on_##stream_close); \
} \
} while (0)
#define close_job_in(job) close_job_stream(job, in)
#define close_job_out(job) close_job_stream(job, out)
#define close_job_err(job) close_job_stream(job, err)
Job *table[MAX_RUNNING_JOBS] = {NULL};
size_t stop_requests = 0;
TimeWatcher job_stop_timer;
SignalWatcher schld;
// Some helpers shared in this module
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/job.c.generated.h"
#endif
// Callbacks for libuv
/// Initializes job control resources
void job_init(void)
{
uv_disable_stdio_inheritance();
time_watcher_init(&loop, &job_stop_timer, NULL);
signal_watcher_init(&loop, &schld, NULL);
signal_watcher_start(&schld, chld_handler, SIGCHLD);
}
/// Releases job control resources and terminates running jobs
void job_teardown(void)
{
// Stop all jobs
for (int i = 0; i < MAX_RUNNING_JOBS; i++) {
Job *job;
if ((job = table[i]) != NULL) {
uv_kill(job->pid, SIGTERM);
job->term_sent = true;
job_stop(job);
}
}
// Wait until all jobs are closed
LOOP_POLL_EVENTS_UNTIL(&loop, -1, !stop_requests);
signal_watcher_stop(&schld);
signal_watcher_close(&schld, NULL);
// Close the timer
time_watcher_close(&job_stop_timer, NULL);
}
/// Tries to start a new job.
///
/// @param[out] status The job id if the job started successfully, 0 if the job
/// table is full, -1 if the program could not be executed.
/// @return The job pointer if the job started successfully, NULL otherwise
Job *job_start(JobOptions opts, int *status)
{
int i;
Job *job;
// Search for a free slot in the table
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
if (table[i] == NULL) {
break;
}
}
if (i == MAX_RUNNING_JOBS) {
// No free slots
shell_free_argv(opts.argv);
*status = 0;
return NULL;
}
job = xmalloc(sizeof(Job));
// Initialize
job->id = i + 1;
*status = job->id;
job->status = -1;
job->refcount = 1;
job->stopped_time = 0;
job->term_sent = false;
job->opts = opts;
job->closed = false;
job->in.closed = true;
job->out.closed = true;
job->err.closed = true;
// Spawn the job
if (!process_spawn(job)) {
if (job->opts.writable) {
uv_close((uv_handle_t *)job->proc_stdin, NULL);
}
if (job->opts.stdout_cb) {
uv_close((uv_handle_t *)job->proc_stdout, NULL);
}
if (job->opts.stderr_cb) {
uv_close((uv_handle_t *)job->proc_stderr, NULL);
}
process_close(job);
loop_poll_events(&loop, 0);
*status = -1;
return NULL;
}
if (opts.writable) {
job->refcount++;
wstream_init_stream(&job->in, job->proc_stdin, opts.maxmem, job);
}
// Start the readable streams
if (opts.stdout_cb) {
job->refcount++;
rstream_init_stream(&job->out, job->proc_stdout, JOB_BUFFER_SIZE, job);
rstream_start(&job->out, read_cb);
}
if (opts.stderr_cb) {
job->refcount++;
rstream_init_stream(&job->err, job->proc_stderr, JOB_BUFFER_SIZE, job);
rstream_start(&job->err, read_cb);
}
// Save the job to the table
table[i] = job;
return job;
}
/// Finds a job instance by id
///
/// @param id The job id
/// @return the Job instance
Job *job_find(int id)
{
Job *job;
if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1])
|| job->stopped_time) {
return NULL;
}
return job;
}
/// Terminates a job. This is a non-blocking operation, but if the job exists
/// it's guaranteed to succeed(SIGKILL will eventually be sent)
///
/// @param job The Job instance
void job_stop(Job *job)
{
if (job->stopped_time) {
return;
}
job->stopped_time = os_hrtime();
if (job->opts.pty) {
// close all streams for pty jobs to send SIGHUP to the process
job_close_streams(job);
pty_process_close_master(job);
} else {
// Close the job's stdin. If the job doesn't close its own stdout/stderr,
// they will be closed when the job exits(possibly due to being terminated
// after a timeout)
job_close_in(job);
}
if (!stop_requests++) {
// When there's at least one stop request pending, start a timer that
// will periodically check if a signal should be send to a to the job
DLOG("Starting job kill timer");
time_watcher_start(&job_stop_timer, job_stop_timer_cb, 100, 100);
}
}
/// job_wait - synchronously wait for a job to finish
///
/// @param job The job instance
/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for
/// waiting until the job quits.
/// @return returns the status code of the exited job. -1 if the job is
/// still running and the `timeout` has expired. Note that this is
/// indistinguishable from the process returning -1 by itself. Which
/// is possible on some OS. Returns -2 if the job was interrupted.
int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
{
// The default status is -1, which represents a timeout
int status = -1;
bool interrupted = false;
// Increase refcount to stop the job from being freed before we have a
// chance to get the status.
job->refcount++;
LOOP_POLL_EVENTS_UNTIL(&loop, ms,
// Until...
got_int || // interrupted by the user
job->refcount == 1); // job exited
// we'll assume that a user frantically hitting interrupt doesn't like
// the current job. Signal that it has to be killed.
if (got_int) {
interrupted = true;
got_int = false;
job_stop(job);
if (ms == -1) {
// We can only return, if all streams/handles are closed and the job
// exited.
LOOP_POLL_EVENTS_UNTIL(&loop, -1, job->refcount == 1);
} else {
loop_poll_events(&loop, 0);
}
}
if (job->refcount == 1) {
// Job exited, collect status and manually invoke close_cb to free the job
// resources
status = interrupted ? -2 : job->status;
job_close_streams(job);
job_decref(job);
} else {
job->refcount--;
}
return status;
}
/// Close the pipe used to write to the job.
///
/// This can be used for example to indicate to the job process that no more
/// input is coming, and that it should shut down cleanly.
///
/// It has no effect when the input pipe doesn't exist or was already
/// closed.
///
/// @param job The job instance
void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
{
close_job_in(job);
}
// Close the job stdout stream.
void job_close_out(Job *job) FUNC_ATTR_NONNULL_ALL
{
close_job_out(job);
}
// Close the job stderr stream.
void job_close_err(Job *job) FUNC_ATTR_NONNULL_ALL
{
close_job_out(job);
}
/// All writes that complete after calling this function will be reported
/// to `cb`.
///
/// Use this function to be notified about the status of an in-flight write.
///
/// @see {wstream_set_write_cb}
///
/// @param job The job instance
/// @param cb The function that will be called on write completion or
/// failure. It will be called with the job as the `data` argument.
void job_write_cb(Job *job, stream_write_cb cb) FUNC_ATTR_NONNULL_ALL
{
wstream_set_write_cb(&job->in, cb);
}
/// Writes data to the job's stdin. This is a non-blocking operation, it
/// returns when the write request was sent.
///
/// @param job The Job instance
/// @param buffer The buffer which contains the data to be written
/// @return true if the write request was successfully sent, false if writing
/// to the job stream failed (possibly because the OS buffer is full)
bool job_write(Job *job, WBuffer *buffer)
{
return wstream_write(&job->in, buffer);
}
/// Get the job id
///
/// @param job A pointer to the job
/// @return The job id
int job_id(Job *job)
{
return job->id;
}
// Get the job pid
int job_pid(Job *job)
{
return job->pid;
}
/// Get data associated with a job
///
/// @param job A pointer to the job
/// @return The job data
void *job_data(Job *job)
{
return job->opts.data;
}
/// Resize the window for a pty job
bool job_resize(Job *job, uint16_t width, uint16_t height)
{
if (!job->opts.pty) {
return false;
}
pty_process_resize(job, width, height);
return true;
}
void job_close_streams(Job *job)
{
close_job_in(job);
close_job_out(job);
close_job_err(job);
}
JobOptions *job_opts(Job *job)
{
return &job->opts;
}
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
static void job_stop_timer_cb(TimeWatcher *watcher, void *data)
{
Job *job;
uint64_t now = os_hrtime();
for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) {
if ((job = table[i]) == NULL || !job->stopped_time) {
continue;
}
uint64_t elapsed = now - job->stopped_time;
if (!job->term_sent && elapsed >= TERM_TIMEOUT) {
ILOG("Sending SIGTERM to job(id: %d)", job->id);
uv_kill(job->pid, SIGTERM);
job->term_sent = true;
} else if (elapsed >= KILL_TIMEOUT) {
ILOG("Sending SIGKILL to job(id: %d)", job->id);
uv_kill(job->pid, SIGKILL);
process_close(job);
}
}
}
// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
{
Job *job = data;
if (stream == &job->out) {
job->opts.stdout_cb(stream, buf, data, eof);
if (eof) {
close_job_out(job);
}
} else {
job->opts.stderr_cb(stream, buf, data, eof);
if (eof) {
close_job_err(job);
}
}
}
static void on_stream_close(Stream *stream, void *data)
{
job_decref(data);
}
static void job_exited(Event event)
{
Job *job = event.data;
process_close(job);
}
static void chld_handler(SignalWatcher *watcher, int signum, void *data)
{
int stat = 0;
int pid;
do {
pid = waitpid(-1, &stat, WNOHANG);
} while (pid < 0 && errno == EINTR);
if (pid <= 0) {
return;
}
Job *job = NULL;
// find the job corresponding to the exited pid
for (int i = 0; i < MAX_RUNNING_JOBS; i++) {
if ((job = table[i]) != NULL && job->pid == pid) {
if (WIFEXITED(stat)) {
job->status = WEXITSTATUS(stat);
} else if (WIFSIGNALED(stat)) {
job->status = WTERMSIG(stat);
}
if (exiting) {
// don't enqueue more events when exiting
process_close(job);
} else {
loop_push_event(&loop,
(Event) {.handler = job_exited, .data = job}, false);
}
break;
}
}
}

View File

@ -1,20 +0,0 @@
// Job is a short name we use to refer to child processes that run in parallel
// with the editor, probably executing long-running tasks and sending updates
// asynchronously. Communication happens through anonymous pipes connected to
// the job's std{in,out,err}. They are more like bash/zsh co-processes than the
// usual shell background job. The name 'Job' was chosen because it applies to
// the concept while being significantly shorter.
#ifndef NVIM_OS_JOB_H
#define NVIM_OS_JOB_H
#include <stdint.h>
#include <stdbool.h>
#include "nvim/os/job_defs.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/job.h.generated.h"
#endif
#endif // NVIM_OS_JOB_H

View File

@ -1,64 +0,0 @@
#ifndef NVIM_OS_JOB_DEFS_H
#define NVIM_OS_JOB_DEFS_H
#include <uv.h>
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#define MAX_RUNNING_JOBS 100
typedef struct job Job;
/// Function called when the job reads data
///
/// @param id The job id
/// @param data Some data associated with the job by the caller
typedef void (*job_exit_cb)(Job *job, int status, void *data);
// Job startup options
// job_exit_cb Callback that will be invoked when the job exits
// maxmem Maximum amount of memory used by the job WStream
typedef struct {
// Argument vector for the process. The first item is the
// executable to run.
// [consumed]
char **argv;
// Caller data that will be associated with the job
void *data;
// If true the job stdin will be available for writing with job_write,
// otherwise it will be redirected to /dev/null
bool writable;
// Callback that will be invoked when data is available on stdout. If NULL
// stdout will be redirected to /dev/null.
stream_read_cb stdout_cb;
// Callback that will be invoked when data is available on stderr. If NULL
// stderr will be redirected to /dev/null.
stream_read_cb stderr_cb;
// Callback that will be invoked when the job has exited and will not send
// data
job_exit_cb exit_cb;
// Maximum memory used by the job's WStream
size_t maxmem;
// Connect the job to a pseudo terminal
bool pty;
// Initial window dimensions if the job is connected to a pseudo terminal
uint16_t width, height;
// Value for the $TERM environment variable. A default value of "ansi" is
// assumed if NULL
char *term_name;
} JobOptions;
#define JOB_OPTIONS_INIT ((JobOptions) { \
.argv = NULL, \
.data = NULL, \
.writable = true, \
.stdout_cb = NULL, \
.stderr_cb = NULL, \
.exit_cb = NULL, \
.maxmem = 0, \
.pty = false, \
.width = 80, \
.height = 24, \
.term_name = NULL \
})
#endif // NVIM_OS_JOB_DEFS_H

View File

@ -1,101 +0,0 @@
#ifndef NVIM_OS_JOB_PRIVATE_H
#define NVIM_OS_JOB_PRIVATE_H
#include <stdlib.h>
#include <uv.h>
#include "nvim/event/time.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/os/pipe_process.h"
#include "nvim/os/pty_process.h"
#include "nvim/os/shell.h"
#include "nvim/log.h"
#include "nvim/memory.h"
struct job {
// Job id the index in the job table plus one.
int id;
// Process id
int pid;
// Exit status code of the job process
int status;
// Number of references to the job. The job resources will only be freed by
// close_cb when this is 0
int refcount;
// Time when job_stop was called for the job.
uint64_t stopped_time;
// If SIGTERM was already sent to the job(only send one before SIGKILL)
bool term_sent;
// stdio streams(std{in,out,err})
Stream in, out, err;
// Libuv streams representing stdin/stdout/stderr
uv_stream_t *proc_stdin, *proc_stdout, *proc_stderr;
// Extra data set by the process spawner
union {
UvProcess uv;
PtyProcess pty;
} process;
// If process_close has been called on this job
bool closed;
// Startup options
JobOptions opts;
};
extern Job *table[];
extern size_t stop_requests;
extern TimeWatcher job_stop_timer;
static inline bool process_spawn(Job *job)
{
return job->opts.pty ? pty_process_spawn(job) : pipe_process_spawn(job);
}
static inline void process_close(Job *job)
{
if (job->closed) {
return;
}
job->closed = true;
if (job->opts.pty) {
pty_process_close(job);
} else {
pipe_process_close(job);
}
}
static inline void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
// this one
table[job->id - 1] = NULL;
if (job->opts.exit_cb) {
// Invoke the exit callback
job->opts.exit_cb(job, job->status, job->opts.data);
}
if (stop_requests && !--stop_requests) {
// Stop the timer if no more stop requests are pending
DLOG("Stopping job kill timer");
time_watcher_stop(&job_stop_timer);
}
}
static inline void job_decref(Job *job)
{
if (--job->refcount == 0) {
// Invoke the exit_cb
job_exit_callback(job);
// Free all memory allocated for the job
shell_free_argv(job->opts.argv);
if (job->opts.pty) {
xfree(job->opts.term_name);
}
xfree(job);
}
}
#endif // NVIM_OS_JOB_PRIVATE_H

View File

@ -1,88 +0,0 @@
#include <stdbool.h>
#include <stdlib.h>
#include <uv.h>
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/job_private.h"
#include "nvim/os/pipe_process.h"
#include "nvim/memory.h"
#include "nvim/vim.h"
#include "nvim/globals.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/pipe_process.c.generated.h"
#endif
bool pipe_process_spawn(Job *job)
{
UvProcess *pipeproc = &job->process.uv;
pipeproc->proc_opts.file = job->opts.argv[0];
pipeproc->proc_opts.args = job->opts.argv;
pipeproc->proc_opts.stdio = pipeproc->stdio;
pipeproc->proc_opts.stdio_count = 3;
pipeproc->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE;
pipeproc->proc_opts.exit_cb = exit_cb;
pipeproc->proc_opts.cwd = NULL;
pipeproc->proc_opts.env = NULL;
pipeproc->proc.data = NULL;
pipeproc->proc_stdin.data = NULL;
pipeproc->proc_stdout.data = NULL;
pipeproc->proc_stderr.data = NULL;
// Initialize the job std{in,out,err}
pipeproc->stdio[0].flags = UV_IGNORE;
pipeproc->stdio[1].flags = UV_IGNORE;
pipeproc->stdio[2].flags = UV_IGNORE;
pipeproc->proc.data = job;
if (job->opts.writable) {
uv_pipe_init(&loop.uv, &pipeproc->proc_stdin, 0);
pipeproc->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
pipeproc->stdio[0].data.stream = (uv_stream_t *)&pipeproc->proc_stdin;
}
if (job->opts.stdout_cb) {
uv_pipe_init(&loop.uv, &pipeproc->proc_stdout, 0);
pipeproc->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
pipeproc->stdio[1].data.stream = (uv_stream_t *)&pipeproc->proc_stdout;
}
if (job->opts.stderr_cb) {
uv_pipe_init(&loop.uv, &pipeproc->proc_stderr, 0);
pipeproc->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
pipeproc->stdio[2].data.stream = (uv_stream_t *)&pipeproc->proc_stderr;
}
job->proc_stdin = (uv_stream_t *)&pipeproc->proc_stdin;
job->proc_stdout = (uv_stream_t *)&pipeproc->proc_stdout;
job->proc_stderr = (uv_stream_t *)&pipeproc->proc_stderr;
if (uv_spawn(&loop.uv, &pipeproc->proc, &pipeproc->proc_opts) != 0) {
return false;
}
job->pid = pipeproc->proc.pid;
return true;
}
void pipe_process_close(Job *job)
{
uv_close((uv_handle_t *)&job->process.uv.proc, close_cb);
}
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
Job *job = proc->data;
job->status = (int)status;
pipe_process_close(job);
}
static void close_cb(uv_handle_t *handle)
{
Job *job = handle->data;
job_close_streams(job);
job_decref(job);
}

View File

@ -1,17 +0,0 @@
#ifndef NVIM_OS_PIPE_PROCESS_H
#define NVIM_OS_PIPE_PROCESS_H
#include <uv.h>
typedef struct {
// Structures for process spawning/management used by libuv
uv_process_t proc;
uv_process_options_t proc_opts;
uv_stdio_container_t stdio[3];
uv_pipe_t proc_stdin, proc_stdout, proc_stderr;
} UvProcess;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/pipe_process.h.generated.h"
#endif
#endif // NVIM_OS_PIPE_PROCESS_H

View File

@ -1,17 +0,0 @@
#ifndef NVIM_OS_PTY_PROCESS_H
#define NVIM_OS_PTY_PROCESS_H
#include <sys/ioctl.h>
#include <uv.h>
typedef struct {
struct winsize winsize;
uv_pipe_t proc_stdin, proc_stdout, proc_stderr;
int tty_fd;
} PtyProcess;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/pty_process.h.generated.h"
#endif
#endif // NVIM_OS_PTY_PROCESS_H

View File

@ -9,7 +9,7 @@
#include "nvim/lib/kvec.h"
#include "nvim/log.h"
#include "nvim/event/loop.h"
#include "nvim/os/job.h"
#include "nvim/event/uv_process.h"
#include "nvim/event/rstream.h"
#include "nvim/os/shell.h"
#include "nvim/os/signal.h"
@ -204,17 +204,15 @@ static int do_os_system(char **argv,
char prog[MAXPATHL];
xstrlcpy(prog, argv[0], MAXPATHL);
int status;
JobOptions opts = JOB_OPTIONS_INIT;
opts.argv = argv;
opts.data = &buf;
opts.writable = input != NULL;
opts.stdout_cb = data_cb;
opts.stderr_cb = data_cb;
opts.exit_cb = NULL;
Job *job = job_start(opts, &status);
if (status <= 0) {
Stream in, out, err;
UvProcess uvproc = uv_process_init(&buf);
Process *proc = &uvproc.process;
proc->argv = argv;
proc->in = input != NULL ? &in : NULL;
proc->out = &out;
proc->err = &err;
if (!process_spawn(&loop, proc)) {
loop_poll_events(&loop, 0);
// Failed, probably due to `sh` not being executable
if (!silent) {
MSG_PUTS(_("\nCannot execute "));
@ -224,28 +222,32 @@ static int do_os_system(char **argv,
return -1;
}
if (input != NULL) {
wstream_init(proc->in, 0);
}
rstream_init(proc->out, 0);
rstream_start(proc->out, data_cb);
rstream_init(proc->err, 0);
rstream_start(proc->err, data_cb);
// write the input, if any
if (input) {
WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL);
if (!job_write(job, input_buffer)) {
// couldn't write, stop the job and tell the user about it
job_stop(job);
if (!wstream_write(&in, input_buffer)) {
// couldn't write, stop the process and tell the user about it
process_stop(proc);
return -1;
}
// close the input stream after everything is written
job_write_cb(job, shell_write_cb);
} else {
// close the input stream, let the process know that no more input is
// coming
job_close_in(job);
wstream_set_write_cb(&in, shell_write_cb);
}
// invoke busy_start here so event_poll_until wont change the busy state for
// the UI
ui_busy_start();
ui_flush();
status = job_wait(job, -1);
int status = process_wait(proc, -1);
ui_busy_stop();
// prepare the out parameters if requested
@ -285,8 +287,7 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
{
Job *job = data;
DynamicBuffer *dbuf = job_data(job);
DynamicBuffer *dbuf = data;
size_t nread = buf->size;
dynamic_buffer_ensure(dbuf, dbuf->len + nread + 1);
@ -472,6 +473,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,
static void shell_write_cb(Stream *stream, void *data, int status)
{
Job *job = data;
job_close_in(job);
stream_close(stream, NULL);
}

View File

@ -50,7 +50,6 @@
#include "nvim/os/input.h"
#include "nvim/os/shell.h"
#include "nvim/os/signal.h"
#include "nvim/os/job.h"
#include "nvim/msgpack_rpc/helpers.h"
#ifdef HAVE_STROPTS_H