From 1ec7db70ecac75736b5042203e57aae57b65abe6 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 23 Feb 2015 12:34:02 -0300 Subject: [PATCH] job: Refactor process spawning and startup arguments - process spawning was decoupled from the rest of the job control logic. The goal is reusing it for spawning processes connected to pseudo terminal file descriptors. - job_start now receives a JobOptions structure containing all the startup options. --- .valgrind.supp | 2 + src/nvim/eval.c | 16 ++- src/nvim/msgpack_rpc/channel.c | 15 ++- src/nvim/os/job.c | 195 +++++++-------------------------- src/nvim/os/job_defs.h | 37 +++++++ src/nvim/os/job_private.h | 104 ++++++++++++++++++ src/nvim/os/pipe_process.c | 110 +++++++++++++++++++ src/nvim/os/pipe_process.h | 7 ++ src/nvim/os/shell.c | 16 +-- 9 files changed, 323 insertions(+), 179 deletions(-) create mode 100644 src/nvim/os/job_private.h create mode 100644 src/nvim/os/pipe_process.c create mode 100644 src/nvim/os/pipe_process.h diff --git a/.valgrind.supp b/.valgrind.supp index cad2cd9ee2..8b630fcaaf 100644 --- a/.valgrind.supp +++ b/.valgrind.supp @@ -10,5 +10,7 @@ Memcheck:Leak fun:malloc fun:uv_spawn + fun:pipe_process_spawn + fun:process_spawn fun:job_start } diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 0049c9f59a..c2a46ed206 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -10725,15 +10725,13 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv) // The last item of argv must be NULL argv[i] = NULL; - - job_start(argv, - xstrdup((char *)argvars[0].vval.v_string), - true, - on_job_stdout, - on_job_stderr, - on_job_exit, - 0, - &rettv->vval.v_number); + JobOptions opts = JOB_OPTIONS_INIT; + opts.argv = argv; + opts.data = xstrdup((char *)argvars[0].vval.v_string); + opts.stdout_cb = on_job_stdout; + opts.stderr_cb = on_job_stderr; + opts.exit_cb = on_job_exit; + job_start(opts, &rettv->vval.v_number); if (rettv->vval.v_number <= 0) { if (rettv->vval.v_number == 0) { diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 3df3200d3d..00b8cd072f 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -132,14 +132,13 @@ uint64_t channel_from_job(char **argv) incref(channel); // job channels are only closed by the exit_cb int status; - channel->data.job = job_start(argv, - channel, - true, - job_out, - job_err, - job_exit, - 0, - &status); + JobOptions opts = JOB_OPTIONS_INIT; + opts.argv = argv; + opts.data = channel; + opts.stdout_cb = job_out; + opts.stderr_cb = job_err; + opts.exit_cb = job_exit; + channel->data.job = job_start(opts, &status); if (status <= 0) { if (status == 0) { // Two decrefs needed if status == 0. diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 33429b364f..94bb9067ed 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -6,13 +6,13 @@ #include "nvim/os/uv_helpers.h" #include "nvim/os/job.h" #include "nvim/os/job_defs.h" +#include "nvim/os/job_private.h" #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/wstream.h" #include "nvim/os/wstream_defs.h" #include "nvim/os/event.h" #include "nvim/os/event_defs.h" -#include "nvim/os/shell.h" #include "nvim/os/time.h" #include "nvim/vim.h" #include "nvim/memory.h" @@ -29,8 +29,8 @@ if (job->stream) { \ type##stream_free(job->stream); \ job->stream = NULL; \ - if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \ - uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \ + if (!uv_is_closing((uv_handle_t *)job->proc_std##stream)) { \ + uv_close((uv_handle_t *)job->proc_std##stream, close_cb); \ } \ } \ } while (0) @@ -39,37 +39,9 @@ #define close_job_out(job) close_job_stream(job, out, r) #define close_job_err(job) close_job_stream(job, err, r) -struct job { - // Job id the index in the job table plus one. - int id; - // Exit status code of the job process - int status; - // Number of references to the job. The job resources will only be freed by - // close_cb when this is 0 - int refcount; - // Time when job_stop was called for the job. - uint64_t stopped_time; - // If SIGTERM was already sent to the job(only send one before SIGKILL) - bool term_sent; - // Data associated with the job - void *data; - // Callbacks - job_exit_cb exit_cb; - rstream_cb stdout_cb, stderr_cb; - // Readable streams(std{out,err}) - RStream *out, *err; - // Writable stream(stdin) - WStream *in; - // Structures for process spawning/management used by libuv - uv_process_t proc; - uv_process_options_t proc_opts; - uv_stdio_container_t stdio[3]; - uv_pipe_t proc_stdin, proc_stdout, proc_stderr; -}; - -static Job *table[MAX_RUNNING_JOBS] = {NULL}; +Job *table[MAX_RUNNING_JOBS] = {NULL}; size_t stop_requests = 0; -static uv_timer_t job_stop_timer; +uv_timer_t job_stop_timer; // Some helpers shared in this module @@ -106,29 +78,10 @@ void job_teardown(void) /// Tries to start a new job. /// -/// @param argv Argument vector for the process. The first item is the -/// executable to run. -/// [consumed] -/// @param data Caller data that will be associated with the job -/// @param writable If true the job stdin will be available for writing with -/// job_write, otherwise it will be redirected to /dev/null -/// @param stdout_cb Callback that will be invoked when data is available -/// on stdout. If NULL stdout will be redirected to /dev/null. -/// @param stderr_cb Callback that will be invoked when data is available -/// on stderr. If NULL stderr will be redirected to /dev/null. -/// @param job_exit_cb Callback that will be invoked when the job exits -/// @param maxmem Maximum amount of memory used by the job WStream /// @param[out] status The job id if the job started successfully, 0 if the job /// table is full, -1 if the program could not be executed. /// @return The job pointer if the job started successfully, NULL otherwise -Job *job_start(char **argv, - void *data, - bool writable, - rstream_cb stdout_cb, - rstream_cb stderr_cb, - job_exit_cb job_exit_cb, - size_t maxmem, - int *status) +Job *job_start(JobOptions opts, int *status) { int i; Job *job; @@ -142,7 +95,7 @@ Job *job_start(char **argv, if (i == MAX_RUNNING_JOBS) { // No free slots - shell_free_argv(argv); + shell_free_argv(opts.argv); *status = 0; return NULL; } @@ -153,92 +106,64 @@ Job *job_start(char **argv, *status = job->id; job->status = -1; job->refcount = 1; - job->data = data; - job->stdout_cb = stdout_cb; - job->stderr_cb = stderr_cb; - job->exit_cb = job_exit_cb; job->stopped_time = 0; job->term_sent = false; - job->proc_opts.file = argv[0]; - job->proc_opts.args = argv; - job->proc_opts.stdio = job->stdio; - job->proc_opts.stdio_count = 3; - job->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE; - job->proc_opts.exit_cb = exit_cb; - job->proc_opts.cwd = NULL; - job->proc_opts.env = NULL; - job->proc.data = NULL; - job->proc_stdin.data = NULL; - job->proc_stdout.data = NULL; - job->proc_stderr.data = NULL; job->in = NULL; job->out = NULL; job->err = NULL; + job->opts = opts; + job->closed = false; - // Initialize the job std{in,out,err} - job->stdio[0].flags = UV_IGNORE; - job->stdio[1].flags = UV_IGNORE; - job->stdio[2].flags = UV_IGNORE; + process_init(job); - if (writable) { - uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); - job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; - job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; - handle_set_job((uv_handle_t *)&job->proc_stdin, job); + if (opts.writable) { + handle_set_job((uv_handle_t *)job->proc_stdin, job); job->refcount++; } - if (stdout_cb) { - uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); - job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; - handle_set_job((uv_handle_t *)&job->proc_stdout, job); + if (opts.stdout_cb) { + handle_set_job((uv_handle_t *)job->proc_stdout, job); job->refcount++; } - if (stderr_cb) { - uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); - job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; - handle_set_job((uv_handle_t *)&job->proc_stderr, job); + if (opts.stderr_cb) { + handle_set_job((uv_handle_t *)job->proc_stderr, job); job->refcount++; } - handle_set_job((uv_handle_t *)&job->proc, job); - // Spawn the job - if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { - if (writable) { + if (!process_spawn(job)) { + if (opts.writable) { uv_close((uv_handle_t *)&job->proc_stdin, close_cb); } - if (stdout_cb) { + if (opts.stdout_cb) { uv_close((uv_handle_t *)&job->proc_stdout, close_cb); } - if (stderr_cb) { + if (opts.stderr_cb) { uv_close((uv_handle_t *)&job->proc_stderr, close_cb); } - uv_close((uv_handle_t *)&job->proc, close_cb); + process_close(job); event_poll(0); // Manually invoke the close_cb to free the job resources *status = -1; return NULL; } - if (writable) { - job->in = wstream_new(maxmem); - wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); + if (opts.writable) { + job->in = wstream_new(opts.maxmem); + wstream_set_stream(job->in, job->proc_stdin); } // Start the readable streams - if (stdout_cb) { + if (opts.stdout_cb) { job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); - rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); + rstream_set_stream(job->out, job->proc_stdout); rstream_start(job->out); } - if (stderr_cb) { + if (opts.stderr_cb) { job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); - rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); + rstream_set_stream(job->err, job->proc_stderr); rstream_start(job->err); } // Save the job to the table @@ -327,7 +252,8 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL // Job exited, collect status and manually invoke close_cb to free the job // resources status = job->status; - close_cb((uv_handle_t *)&job->proc); + job_close_streams(job); + job_decref(job); } else { job->refcount--; } @@ -391,25 +317,7 @@ int job_id(Job *job) /// @return The job data void *job_data(Job *job) { - return job->data; -} - -static void job_exit_callback(Job *job) -{ - // Free the slot now, 'exit_cb' may want to start another job to replace - // this one - table[job->id - 1] = NULL; - - if (job->exit_cb) { - // Invoke the exit callback - job->exit_cb(job, job->data); - } - - if (stop_requests && !--stop_requests) { - // Stop the timer if no more stop requests are pending - DLOG("Stopping job kill timer"); - uv_timer_stop(&job_stop_timer); - } + return job->opts.data; } /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those @@ -428,11 +336,12 @@ static void job_stop_timer_cb(uv_timer_t *handle) if (!job->term_sent && elapsed >= TERM_TIMEOUT) { ILOG("Sending SIGTERM to job(id: %d)", job->id); - uv_process_kill(&job->proc, SIGTERM); + uv_kill(job->pid, SIGTERM); job->term_sent = true; } else if (elapsed >= KILL_TIMEOUT) { ILOG("Sending SIGKILL to job(id: %d)", job->id); - uv_process_kill(&job->proc, SIGKILL); + uv_kill(job->pid, SIGKILL); + process_close(job); } } } @@ -443,48 +352,26 @@ static void read_cb(RStream *rstream, void *data, bool eof) Job *job = data; if (rstream == job->out) { - job->stdout_cb(rstream, data, eof); + job->opts.stdout_cb(rstream, data, eof); if (eof) { close_job_out(job); } } else { - job->stderr_cb(rstream, data, eof); + job->opts.stderr_cb(rstream, data, eof); if (eof) { close_job_err(job); } } } -// Emits a JobExit event if both rstreams are closed -static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) +void job_close_streams(Job *job) { - Job *job = handle_get_job((uv_handle_t *)proc); - - job->status = (int)status; - uv_close((uv_handle_t *)&job->proc, close_cb); + close_job_in(job); + close_job_out(job); + close_job_err(job); } static void close_cb(uv_handle_t *handle) { - Job *job = handle_get_job(handle); - - if (handle == (uv_handle_t *)&job->proc) { - // Make sure all streams are properly closed to trigger callback invocation - // when job->proc is closed - close_job_in(job); - close_job_out(job); - close_job_err(job); - } - - if (--job->refcount == 0) { - // Invoke the exit_cb - job_exit_callback(job); - // Free all memory allocated for the job - free(job->proc.data); - free(job->proc_stdin.data); - free(job->proc_stdout.data); - free(job->proc_stderr.data); - shell_free_argv(job->proc_opts.args); - free(job); - } + job_decref(handle_get_job(handle)); } diff --git a/src/nvim/os/job_defs.h b/src/nvim/os/job_defs.h index a9caa169a8..340ef551be 100644 --- a/src/nvim/os/job_defs.h +++ b/src/nvim/os/job_defs.h @@ -1,7 +1,9 @@ #ifndef NVIM_OS_JOB_DEFS_H #define NVIM_OS_JOB_DEFS_H +#include #include "nvim/os/rstream_defs.h" +#include "nvim/os/wstream_defs.h" typedef struct job Job; @@ -11,4 +13,39 @@ typedef struct job Job; /// @param data Some data associated with the job by the caller typedef void (*job_exit_cb)(Job *job, void *data); +// Job startup options +// job_exit_cb Callback that will be invoked when the job exits +// maxmem Maximum amount of memory used by the job WStream +typedef struct { + // Argument vector for the process. The first item is the + // executable to run. + // [consumed] + char **argv; + // Caller data that will be associated with the job + void *data; + // If true the job stdin will be available for writing with job_write, + // otherwise it will be redirected to /dev/null + bool writable; + // Callback that will be invoked when data is available on stdout. If NULL + // stdout will be redirected to /dev/null. + rstream_cb stdout_cb; + // Callback that will be invoked when data is available on stderr. If NULL + // stderr will be redirected to /dev/null. + rstream_cb stderr_cb; + // Callback that will be invoked when the job has exited and will not send + // data + job_exit_cb exit_cb; + // Maximum memory used by the job's WStream + size_t maxmem; +} JobOptions; + +#define JOB_OPTIONS_INIT ((JobOptions) { \ + .argv = NULL, \ + .data = NULL, \ + .writable = true, \ + .stdout_cb = NULL, \ + .stderr_cb = NULL, \ + .exit_cb = NULL, \ + .maxmem = 0 \ + }) #endif // NVIM_OS_JOB_DEFS_H diff --git a/src/nvim/os/job_private.h b/src/nvim/os/job_private.h new file mode 100644 index 0000000000..1beaa1bd70 --- /dev/null +++ b/src/nvim/os/job_private.h @@ -0,0 +1,104 @@ +#ifndef NVIM_OS_JOB_PRIVATE_H +#define NVIM_OS_JOB_PRIVATE_H + +#include + +#include + +#include "nvim/os/rstream_defs.h" +#include "nvim/os/wstream_defs.h" +#include "nvim/os/pipe_process.h" +#include "nvim/os/shell.h" +#include "nvim/log.h" + +struct job { + // Job id the index in the job table plus one. + int id; + // Process id + int pid; + // Exit status code of the job process + int status; + // Number of references to the job. The job resources will only be freed by + // close_cb when this is 0 + int refcount; + // Time when job_stop was called for the job. + uint64_t stopped_time; + // If SIGTERM was already sent to the job(only send one before SIGKILL) + bool term_sent; + // Readable streams(std{out,err}) + RStream *out, *err; + // Writable stream(stdin) + WStream *in; + // Libuv streams representing stdin/stdout/stderr + uv_stream_t *proc_stdin, *proc_stdout, *proc_stderr; + // Extra data set by the process spawner + void *process; + // If process_close has been called on this job + bool closed; + // Startup options + JobOptions opts; +}; + +extern Job *table[]; +extern size_t stop_requests; +extern uv_timer_t job_stop_timer; + +static inline bool process_spawn(Job *job) +{ + return pipe_process_spawn(job); +} + +static inline void process_init(Job *job) +{ + pipe_process_init(job); +} + +static inline void process_close(Job *job) +{ + if (job->closed) { + return; + } + job->closed = true; + pipe_process_close(job); +} + +static inline void process_destroy(Job *job) +{ + pipe_process_destroy(job); +} + +static inline void job_exit_callback(Job *job) +{ + // Free the slot now, 'exit_cb' may want to start another job to replace + // this one + table[job->id - 1] = NULL; + + if (job->opts.exit_cb) { + // Invoke the exit callback + job->opts.exit_cb(job, job->opts.data); + } + + if (stop_requests && !--stop_requests) { + // Stop the timer if no more stop requests are pending + DLOG("Stopping job kill timer"); + uv_timer_stop(&job_stop_timer); + } +} + +static inline void job_decref(Job *job) +{ + if (--job->refcount == 0) { + // Invoke the exit_cb + job_exit_callback(job); + // Free all memory allocated for the job + free(job->proc_stdin->data); + free(job->proc_stdout->data); + free(job->proc_stderr->data); + shell_free_argv(job->opts.argv); + process_destroy(job); + free(job); + } +} + + +#endif // NVIM_OS_JOB_PRIVATE_H diff --git a/src/nvim/os/pipe_process.c b/src/nvim/os/pipe_process.c new file mode 100644 index 0000000000..5535c3fe93 --- /dev/null +++ b/src/nvim/os/pipe_process.c @@ -0,0 +1,110 @@ +#include +#include + +#include + +#include "nvim/os/uv_helpers.h" +#include "nvim/os/job.h" +#include "nvim/os/job_defs.h" +#include "nvim/os/job_private.h" +#include "nvim/os/pipe_process.h" +#include "nvim/memory.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "os/pipe_process.c.generated.h" +#endif + +typedef struct { + // Structures for process spawning/management used by libuv + uv_process_t proc; + uv_process_options_t proc_opts; + uv_stdio_container_t stdio[3]; + uv_pipe_t proc_stdin, proc_stdout, proc_stderr; +} UvProcess; + +void pipe_process_init(Job *job) +{ + UvProcess *pipeproc = xmalloc(sizeof(UvProcess)); + pipeproc->proc_opts.file = job->opts.argv[0]; + pipeproc->proc_opts.args = job->opts.argv; + pipeproc->proc_opts.stdio = pipeproc->stdio; + pipeproc->proc_opts.stdio_count = 3; + pipeproc->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE; + pipeproc->proc_opts.exit_cb = exit_cb; + pipeproc->proc_opts.cwd = NULL; + pipeproc->proc_opts.env = NULL; + pipeproc->proc.data = NULL; + pipeproc->proc_stdin.data = NULL; + pipeproc->proc_stdout.data = NULL; + pipeproc->proc_stderr.data = NULL; + + // Initialize the job std{in,out,err} + pipeproc->stdio[0].flags = UV_IGNORE; + pipeproc->stdio[1].flags = UV_IGNORE; + pipeproc->stdio[2].flags = UV_IGNORE; + + handle_set_job((uv_handle_t *)&pipeproc->proc, job); + + if (job->opts.writable) { + uv_pipe_init(uv_default_loop(), &pipeproc->proc_stdin, 0); + pipeproc->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + pipeproc->stdio[0].data.stream = (uv_stream_t *)&pipeproc->proc_stdin; + } + + if (job->opts.stdout_cb) { + uv_pipe_init(uv_default_loop(), &pipeproc->proc_stdout, 0); + pipeproc->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + pipeproc->stdio[1].data.stream = (uv_stream_t *)&pipeproc->proc_stdout; + } + + if (job->opts.stderr_cb) { + uv_pipe_init(uv_default_loop(), &pipeproc->proc_stderr, 0); + pipeproc->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + pipeproc->stdio[2].data.stream = (uv_stream_t *)&pipeproc->proc_stderr; + } + + job->proc_stdin = (uv_stream_t *)&pipeproc->proc_stdin; + job->proc_stdout = (uv_stream_t *)&pipeproc->proc_stdout; + job->proc_stderr = (uv_stream_t *)&pipeproc->proc_stderr; + job->process = pipeproc; +} + +void pipe_process_destroy(Job *job) +{ + UvProcess *pipeproc = job->process; + free(pipeproc->proc.data); + free(pipeproc); + job->process = NULL; +} + +bool pipe_process_spawn(Job *job) +{ + UvProcess *pipeproc = job->process; + + if (uv_spawn(uv_default_loop(), &pipeproc->proc, &pipeproc->proc_opts) != 0) { + return false; + } + + job->pid = pipeproc->proc.pid; + return true; +} + +void pipe_process_close(Job *job) +{ + UvProcess *pipeproc = job->process; + uv_close((uv_handle_t *)&pipeproc->proc, close_cb); +} + +static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) +{ + Job *job = handle_get_job((uv_handle_t *)proc); + job->status = (int)status; + pipe_process_close(job); +} + +static void close_cb(uv_handle_t *handle) +{ + Job *job = handle_get_job(handle); + job_close_streams(job); + job_decref(job); +} diff --git a/src/nvim/os/pipe_process.h b/src/nvim/os/pipe_process.h new file mode 100644 index 0000000000..17a4255ddc --- /dev/null +++ b/src/nvim/os/pipe_process.h @@ -0,0 +1,7 @@ +#ifndef NVIM_OS_PIPE_PROCESS_H +#define NVIM_OS_PIPE_PROCESS_H + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "os/pipe_process.h.generated.h" +#endif +#endif // NVIM_OS_PIPE_PROCESS_H diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 32c7ea564d..8cf7e7161d 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -201,14 +201,14 @@ static int shell(const char *cmd, char **argv = shell_build_argv(cmd, extra_args); int status; - Job *job = job_start(argv, - &buf, - input != NULL, - data_cb, - data_cb, - NULL, - 0, - &status); + JobOptions opts = JOB_OPTIONS_INIT; + opts.argv = argv; + opts.data = &buf; + opts.writable = input != NULL; + opts.stdout_cb = data_cb; + opts.stderr_cb = data_cb; + opts.exit_cb = NULL; + Job *job = job_start(opts, &status); if (status <= 0) { // Failed, probably due to `sh` not being executable