diff options
Diffstat (limited to 'run-command.c')
-rw-r--r-- | run-command.c | 1335 |
1 files changed, 1335 insertions, 0 deletions
diff --git a/run-command.c b/run-command.c new file mode 100644 index 0000000..5a4dbb6 --- /dev/null +++ b/run-command.c @@ -0,0 +1,1335 @@ +#include "cache.h" +#include "run-command.h" +#include "exec_cmd.h" +#include "sigchain.h" +#include "argv-array.h" +#include "thread-utils.h" +#include "strbuf.h" + +void child_process_init(struct child_process *child) +{ + memset(child, 0, sizeof(*child)); + argv_array_init(&child->args); + argv_array_init(&child->env_array); +} + +void child_process_clear(struct child_process *child) +{ + argv_array_clear(&child->args); + argv_array_clear(&child->env_array); +} + +struct child_to_clean { + pid_t pid; + struct child_to_clean *next; +}; +static struct child_to_clean *children_to_clean; +static int installed_child_cleanup_handler; + +static void cleanup_children(int sig, int in_signal) +{ + while (children_to_clean) { + struct child_to_clean *p = children_to_clean; + children_to_clean = p->next; + kill(p->pid, sig); + if (!in_signal) + free(p); + } +} + +static void cleanup_children_on_signal(int sig) +{ + cleanup_children(sig, 1); + sigchain_pop(sig); + raise(sig); +} + +static void cleanup_children_on_exit(void) +{ + cleanup_children(SIGTERM, 0); +} + +static void mark_child_for_cleanup(pid_t pid) +{ + struct child_to_clean *p = xmalloc(sizeof(*p)); + p->pid = pid; + p->next = children_to_clean; + children_to_clean = p; + + if (!installed_child_cleanup_handler) { + atexit(cleanup_children_on_exit); + sigchain_push_common(cleanup_children_on_signal); + installed_child_cleanup_handler = 1; + } +} + +static void clear_child_for_cleanup(pid_t pid) +{ + struct child_to_clean **pp; + + for (pp = &children_to_clean; *pp; pp = &(*pp)->next) { + struct child_to_clean *clean_me = *pp; + + if (clean_me->pid == pid) { + *pp = clean_me->next; + free(clean_me); + return; + } + } +} + +static inline void close_pair(int fd[2]) +{ + close(fd[0]); + close(fd[1]); +} + +#ifndef GIT_WINDOWS_NATIVE +static inline void dup_devnull(int to) +{ + int fd = open("/dev/null", O_RDWR); + if (fd < 0) + die_errno(_("open /dev/null failed")); + if (dup2(fd, to) < 0) + die_errno(_("dup2(%d,%d) failed"), fd, to); + close(fd); +} +#endif + +static char *locate_in_PATH(const char *file) +{ + const char *p = getenv("PATH"); + struct strbuf buf = STRBUF_INIT; + + if (!p || !*p) + return NULL; + + while (1) { + const char *end = strchrnul(p, ':'); + + strbuf_reset(&buf); + + /* POSIX specifies an empty entry as the current directory. */ + if (end != p) { + strbuf_add(&buf, p, end - p); + strbuf_addch(&buf, '/'); + } + strbuf_addstr(&buf, file); + + if (!access(buf.buf, F_OK)) + return strbuf_detach(&buf, NULL); + + if (!*end) + break; + p = end + 1; + } + + strbuf_release(&buf); + return NULL; +} + +static int exists_in_PATH(const char *file) +{ + char *r = locate_in_PATH(file); + free(r); + return r != NULL; +} + +int sane_execvp(const char *file, char * const argv[]) +{ + if (!execvp(file, argv)) + return 0; /* cannot happen ;-) */ + + /* + * When a command can't be found because one of the directories + * listed in $PATH is unsearchable, execvp reports EACCES, but + * careful usability testing (read: analysis of occasional bug + * reports) reveals that "No such file or directory" is more + * intuitive. + * + * We avoid commands with "/", because execvp will not do $PATH + * lookups in that case. + * + * The reassignment of EACCES to errno looks like a no-op below, + * but we need to protect against exists_in_PATH overwriting errno. + */ + if (errno == EACCES && !strchr(file, '/')) + errno = exists_in_PATH(file) ? EACCES : ENOENT; + else if (errno == ENOTDIR && !strchr(file, '/')) + errno = ENOENT; + return -1; +} + +static const char **prepare_shell_cmd(struct argv_array *out, const char **argv) +{ + if (!argv[0]) + die("BUG: shell command is empty"); + + if (strcspn(argv[0], "|&;<>()$`\\\"' \t\n*?[#~=%") != strlen(argv[0])) { +#ifndef GIT_WINDOWS_NATIVE + argv_array_push(out, SHELL_PATH); +#else + argv_array_push(out, "sh"); +#endif + argv_array_push(out, "-c"); + + /* + * If we have no extra arguments, we do not even need to + * bother with the "$@" magic. + */ + if (!argv[1]) + argv_array_push(out, argv[0]); + else + argv_array_pushf(out, "%s \"$@\"", argv[0]); + } + + argv_array_pushv(out, argv); + return out->argv; +} + +#ifndef GIT_WINDOWS_NATIVE +static int execv_shell_cmd(const char **argv) +{ + struct argv_array nargv = ARGV_ARRAY_INIT; + prepare_shell_cmd(&nargv, argv); + trace_argv_printf(nargv.argv, "trace: exec:"); + sane_execvp(nargv.argv[0], (char **)nargv.argv); + argv_array_clear(&nargv); + return -1; +} +#endif + +#ifndef GIT_WINDOWS_NATIVE +static int child_notifier = -1; + +static void notify_parent(void) +{ + /* + * execvp failed. If possible, we'd like to let start_command + * know, so failures like ENOENT can be handled right away; but + * otherwise, finish_command will still report the error. + */ + xwrite(child_notifier, "", 1); +} +#endif + +static inline void set_cloexec(int fd) +{ + int flags = fcntl(fd, F_GETFD); + if (flags >= 0) + fcntl(fd, F_SETFD, flags | FD_CLOEXEC); +} + +static int wait_or_whine(pid_t pid, const char *argv0, int in_signal) +{ + int status, code = -1; + pid_t waiting; + int failed_errno = 0; + + while ((waiting = waitpid(pid, &status, 0)) < 0 && errno == EINTR) + ; /* nothing */ + if (in_signal) + return 0; + + if (waiting < 0) { + failed_errno = errno; + error_errno("waitpid for %s failed", argv0); + } else if (waiting != pid) { + error("waitpid is confused (%s)", argv0); + } else if (WIFSIGNALED(status)) { + code = WTERMSIG(status); + if (code != SIGINT && code != SIGQUIT && code != SIGPIPE) + error("%s died of signal %d", argv0, code); + /* + * This return value is chosen so that code & 0xff + * mimics the exit code that a POSIX shell would report for + * a program that died from this signal. + */ + code += 128; + } else if (WIFEXITED(status)) { + code = WEXITSTATUS(status); + /* + * Convert special exit code when execvp failed. + */ + if (code == 127) { + code = -1; + failed_errno = ENOENT; + } + } else { + error("waitpid is confused (%s)", argv0); + } + + clear_child_for_cleanup(pid); + + errno = failed_errno; + return code; +} + +int start_command(struct child_process *cmd) +{ + int need_in, need_out, need_err; + int fdin[2], fdout[2], fderr[2]; + int failed_errno; + char *str; + + if (!cmd->argv) + cmd->argv = cmd->args.argv; + if (!cmd->env) + cmd->env = cmd->env_array.argv; + + /* + * In case of errors we must keep the promise to close FDs + * that have been passed in via ->in and ->out. + */ + + need_in = !cmd->no_stdin && cmd->in < 0; + if (need_in) { + if (pipe(fdin) < 0) { + failed_errno = errno; + if (cmd->out > 0) + close(cmd->out); + str = "standard input"; + goto fail_pipe; + } + cmd->in = fdin[1]; + } + + need_out = !cmd->no_stdout + && !cmd->stdout_to_stderr + && cmd->out < 0; + if (need_out) { + if (pipe(fdout) < 0) { + failed_errno = errno; + if (need_in) + close_pair(fdin); + else if (cmd->in) + close(cmd->in); + str = "standard output"; + goto fail_pipe; + } + cmd->out = fdout[0]; + } + + need_err = !cmd->no_stderr && cmd->err < 0; + if (need_err) { + if (pipe(fderr) < 0) { + failed_errno = errno; + if (need_in) + close_pair(fdin); + else if (cmd->in) + close(cmd->in); + if (need_out) + close_pair(fdout); + else if (cmd->out) + close(cmd->out); + str = "standard error"; +fail_pipe: + error("cannot create %s pipe for %s: %s", + str, cmd->argv[0], strerror(failed_errno)); + child_process_clear(cmd); + errno = failed_errno; + return -1; + } + cmd->err = fderr[0]; + } + + trace_argv_printf(cmd->argv, "trace: run_command:"); + fflush(NULL); + +#ifndef GIT_WINDOWS_NATIVE +{ + int notify_pipe[2]; + if (pipe(notify_pipe)) + notify_pipe[0] = notify_pipe[1] = -1; + + cmd->pid = fork(); + failed_errno = errno; + if (!cmd->pid) { + /* + * Redirect the channel to write syscall error messages to + * before redirecting the process's stderr so that all die() + * in subsequent call paths use the parent's stderr. + */ + if (cmd->no_stderr || need_err) { + int child_err = dup(2); + set_cloexec(child_err); + set_error_handle(fdopen(child_err, "w")); + } + + close(notify_pipe[0]); + set_cloexec(notify_pipe[1]); + child_notifier = notify_pipe[1]; + atexit(notify_parent); + + if (cmd->no_stdin) + dup_devnull(0); + else if (need_in) { + dup2(fdin[0], 0); + close_pair(fdin); + } else if (cmd->in) { + dup2(cmd->in, 0); + close(cmd->in); + } + + if (cmd->no_stderr) + dup_devnull(2); + else if (need_err) { + dup2(fderr[1], 2); + close_pair(fderr); + } else if (cmd->err > 1) { + dup2(cmd->err, 2); + close(cmd->err); + } + + if (cmd->no_stdout) + dup_devnull(1); + else if (cmd->stdout_to_stderr) + dup2(2, 1); + else if (need_out) { + dup2(fdout[1], 1); + close_pair(fdout); + } else if (cmd->out > 1) { + dup2(cmd->out, 1); + close(cmd->out); + } + + if (cmd->dir && chdir(cmd->dir)) + die_errno("exec '%s': cd to '%s' failed", cmd->argv[0], + cmd->dir); + if (cmd->env) { + for (; *cmd->env; cmd->env++) { + if (strchr(*cmd->env, '=')) + putenv((char *)*cmd->env); + else + unsetenv(*cmd->env); + } + } + if (cmd->git_cmd) + execv_git_cmd(cmd->argv); + else if (cmd->use_shell) + execv_shell_cmd(cmd->argv); + else + sane_execvp(cmd->argv[0], (char *const*) cmd->argv); + if (errno == ENOENT) { + if (!cmd->silent_exec_failure) + error("cannot run %s: %s", cmd->argv[0], + strerror(ENOENT)); + exit(127); + } else { + die_errno("cannot exec '%s'", cmd->argv[0]); + } + } + if (cmd->pid < 0) + error_errno("cannot fork() for %s", cmd->argv[0]); + else if (cmd->clean_on_exit) + mark_child_for_cleanup(cmd->pid); + + /* + * Wait for child's execvp. If the execvp succeeds (or if fork() + * failed), EOF is seen immediately by the parent. Otherwise, the + * child process sends a single byte. + * Note that use of this infrastructure is completely advisory, + * therefore, we keep error checks minimal. + */ + close(notify_pipe[1]); + if (read(notify_pipe[0], ¬ify_pipe[1], 1) == 1) { + /* + * At this point we know that fork() succeeded, but execvp() + * failed. Errors have been reported to our stderr. + */ + wait_or_whine(cmd->pid, cmd->argv[0], 0); + failed_errno = errno; + cmd->pid = -1; + } + close(notify_pipe[0]); +} +#else +{ + int fhin = 0, fhout = 1, fherr = 2; + const char **sargv = cmd->argv; + struct argv_array nargv = ARGV_ARRAY_INIT; + + if (cmd->no_stdin) + fhin = open("/dev/null", O_RDWR); + else if (need_in) + fhin = dup(fdin[0]); + else if (cmd->in) + fhin = dup(cmd->in); + + if (cmd->no_stderr) + fherr = open("/dev/null", O_RDWR); + else if (need_err) + fherr = dup(fderr[1]); + else if (cmd->err > 2) + fherr = dup(cmd->err); + + if (cmd->no_stdout) + fhout = open("/dev/null", O_RDWR); + else if (cmd->stdout_to_stderr) + fhout = dup(fherr); + else if (need_out) + fhout = dup(fdout[1]); + else if (cmd->out > 1) + fhout = dup(cmd->out); + + if (cmd->git_cmd) + cmd->argv = prepare_git_cmd(&nargv, cmd->argv); + else if (cmd->use_shell) + cmd->argv = prepare_shell_cmd(&nargv, cmd->argv); + + cmd->pid = mingw_spawnvpe(cmd->argv[0], cmd->argv, (char**) cmd->env, + cmd->dir, fhin, fhout, fherr); + failed_errno = errno; + if (cmd->pid < 0 && (!cmd->silent_exec_failure || errno != ENOENT)) + error_errno("cannot spawn %s", cmd->argv[0]); + if (cmd->clean_on_exit && cmd->pid >= 0) + mark_child_for_cleanup(cmd->pid); + + argv_array_clear(&nargv); + cmd->argv = sargv; + if (fhin != 0) + close(fhin); + if (fhout != 1) + close(fhout); + if (fherr != 2) + close(fherr); +} +#endif + + if (cmd->pid < 0) { + if (need_in) + close_pair(fdin); + else if (cmd->in) + close(cmd->in); + if (need_out) + close_pair(fdout); + else if (cmd->out) + close(cmd->out); + if (need_err) + close_pair(fderr); + else if (cmd->err) + close(cmd->err); + child_process_clear(cmd); + errno = failed_errno; + return -1; + } + + if (need_in) + close(fdin[0]); + else if (cmd->in) + close(cmd->in); + + if (need_out) + close(fdout[1]); + else if (cmd->out) + close(cmd->out); + + if (need_err) + close(fderr[1]); + else if (cmd->err) + close(cmd->err); + + return 0; +} + +int finish_command(struct child_process *cmd) +{ + int ret = wait_or_whine(cmd->pid, cmd->argv[0], 0); + child_process_clear(cmd); + return ret; +} + +int finish_command_in_signal(struct child_process *cmd) +{ + return wait_or_whine(cmd->pid, cmd->argv[0], 1); +} + + +int run_command(struct child_process *cmd) +{ + int code; + + if (cmd->out < 0 || cmd->err < 0) + die("BUG: run_command with a pipe can cause deadlock"); + + code = start_command(cmd); + if (code) + return code; + return finish_command(cmd); +} + +int run_command_v_opt(const char **argv, int opt) +{ + return run_command_v_opt_cd_env(argv, opt, NULL, NULL); +} + +int run_command_v_opt_cd_env(const char **argv, int opt, const char *dir, const char *const *env) +{ + struct child_process cmd = CHILD_PROCESS_INIT; + cmd.argv = argv; + cmd.no_stdin = opt & RUN_COMMAND_NO_STDIN ? 1 : 0; + cmd.git_cmd = opt & RUN_GIT_CMD ? 1 : 0; + cmd.stdout_to_stderr = opt & RUN_COMMAND_STDOUT_TO_STDERR ? 1 : 0; + cmd.silent_exec_failure = opt & RUN_SILENT_EXEC_FAILURE ? 1 : 0; + cmd.use_shell = opt & RUN_USING_SHELL ? 1 : 0; + cmd.clean_on_exit = opt & RUN_CLEAN_ON_EXIT ? 1 : 0; + cmd.dir = dir; + cmd.env = env; + return run_command(&cmd); +} + +#ifndef NO_PTHREADS +static pthread_t main_thread; +static int main_thread_set; +static pthread_key_t async_key; +static pthread_key_t async_die_counter; + +static void *run_thread(void *data) +{ + struct async *async = data; + intptr_t ret; + + if (async->isolate_sigpipe) { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGPIPE); + if (pthread_sigmask(SIG_BLOCK, &mask, NULL) < 0) { + ret = error("unable to block SIGPIPE in async thread"); + return (void *)ret; + } + } + + pthread_setspecific(async_key, async); + ret = async->proc(async->proc_in, async->proc_out, async->data); + return (void *)ret; +} + +static NORETURN void die_async(const char *err, va_list params) +{ + vreportf("fatal: ", err, params); + + if (in_async()) { + struct async *async = pthread_getspecific(async_key); + if (async->proc_in >= 0) + close(async->proc_in); + if (async->proc_out >= 0) + close(async->proc_out); + pthread_exit((void *)128); + } + + exit(128); +} + +static int async_die_is_recursing(void) +{ + void *ret = pthread_getspecific(async_die_counter); + pthread_setspecific(async_die_counter, (void *)1); + return ret != NULL; +} + +int in_async(void) +{ + if (!main_thread_set) + return 0; /* no asyncs started yet */ + return !pthread_equal(main_thread, pthread_self()); +} + +void NORETURN async_exit(int code) +{ + pthread_exit((void *)(intptr_t)code); +} + +#else + +static struct { + void (**handlers)(void); + size_t nr; + size_t alloc; +} git_atexit_hdlrs; + +static int git_atexit_installed; + +static void git_atexit_dispatch(void) +{ + size_t i; + + for (i=git_atexit_hdlrs.nr ; i ; i--) + git_atexit_hdlrs.handlers[i-1](); +} + +static void git_atexit_clear(void) +{ + free(git_atexit_hdlrs.handlers); + memset(&git_atexit_hdlrs, 0, sizeof(git_atexit_hdlrs)); + git_atexit_installed = 0; +} + +#undef atexit +int git_atexit(void (*handler)(void)) +{ + ALLOC_GROW(git_atexit_hdlrs.handlers, git_atexit_hdlrs.nr + 1, git_atexit_hdlrs.alloc); + git_atexit_hdlrs.handlers[git_atexit_hdlrs.nr++] = handler; + if (!git_atexit_installed) { + if (atexit(&git_atexit_dispatch)) + return -1; + git_atexit_installed = 1; + } + return 0; +} +#define atexit git_atexit + +static int process_is_async; +int in_async(void) +{ + return process_is_async; +} + +void NORETURN async_exit(int code) +{ + exit(code); +} + +#endif + +int start_async(struct async *async) +{ + int need_in, need_out; + int fdin[2], fdout[2]; + int proc_in, proc_out; + + need_in = async->in < 0; + if (need_in) { + if (pipe(fdin) < 0) { + if (async->out > 0) + close(async->out); + return error_errno("cannot create pipe"); + } + async->in = fdin[1]; + } + + need_out = async->out < 0; + if (need_out) { + if (pipe(fdout) < 0) { + if (need_in) + close_pair(fdin); + else if (async->in) + close(async->in); + return error_errno("cannot create pipe"); + } + async->out = fdout[0]; + } + + if (need_in) + proc_in = fdin[0]; + else if (async->in) + proc_in = async->in; + else + proc_in = -1; + + if (need_out) + proc_out = fdout[1]; + else if (async->out) + proc_out = async->out; + else + proc_out = -1; + +#ifdef NO_PTHREADS + /* Flush stdio before fork() to avoid cloning buffers */ + fflush(NULL); + + async->pid = fork(); + if (async->pid < 0) { + error_errno("fork (async) failed"); + goto error; + } + if (!async->pid) { + if (need_in) + close(fdin[1]); + if (need_out) + close(fdout[0]); + git_atexit_clear(); + process_is_async = 1; + exit(!!async->proc(proc_in, proc_out, async->data)); + } + + mark_child_for_cleanup(async->pid); + + if (need_in) + close(fdin[0]); + else if (async->in) + close(async->in); + + if (need_out) + close(fdout[1]); + else if (async->out) + close(async->out); +#else + if (!main_thread_set) { + /* + * We assume that the first time that start_async is called + * it is from the main thread. + */ + main_thread_set = 1; + main_thread = pthread_self(); + pthread_key_create(&async_key, NULL); + pthread_key_create(&async_die_counter, NULL); + set_die_routine(die_async); + set_die_is_recursing_routine(async_die_is_recursing); + } + + if (proc_in >= 0) + set_cloexec(proc_in); + if (proc_out >= 0) + set_cloexec(proc_out); + async->proc_in = proc_in; + async->proc_out = proc_out; + { + int err = pthread_create(&async->tid, NULL, run_thread, async); + if (err) { + error_errno("cannot create thread"); + goto error; + } + } +#endif + return 0; + +error: + if (need_in) + close_pair(fdin); + else if (async->in) + close(async->in); + + if (need_out) + close_pair(fdout); + else if (async->out) + close(async->out); + return -1; +} + +int finish_async(struct async *async) +{ +#ifdef NO_PTHREADS + return wait_or_whine(async->pid, "child process", 0); +#else + void *ret = (void *)(intptr_t)(-1); + + if (pthread_join(async->tid, &ret)) + error("pthread_join failed"); + return (int)(intptr_t)ret; +#endif +} + +const char *find_hook(const char *name) +{ + static struct strbuf path = STRBUF_INIT; + + strbuf_reset(&path); + strbuf_git_path(&path, "hooks/%s", name); + if (access(path.buf, X_OK) < 0) + return NULL; + return path.buf; +} + +int run_hook_ve(const char *const *env, const char *name, va_list args) +{ + struct child_process hook = CHILD_PROCESS_INIT; + const char *p; + + p = find_hook(name); + if (!p) + return 0; + + argv_array_push(&hook.args, p); + while ((p = va_arg(args, const char *))) + argv_array_push(&hook.args, p); + hook.env = env; + hook.no_stdin = 1; + hook.stdout_to_stderr = 1; + + return run_command(&hook); +} + +int run_hook_le(const char *const *env, const char *name, ...) +{ + va_list args; + int ret; + + va_start(args, name); + ret = run_hook_ve(env, name, args); + va_end(args); + + return ret; +} + +struct io_pump { + /* initialized by caller */ + int fd; + int type; /* POLLOUT or POLLIN */ + union { + struct { + const char *buf; + size_t len; + } out; + struct { + struct strbuf *buf; + size_t hint; + } in; + } u; + + /* returned by pump_io */ + int error; /* 0 for success, otherwise errno */ + + /* internal use */ + struct pollfd *pfd; +}; + +static int pump_io_round(struct io_pump *slots, int nr, struct pollfd *pfd) +{ + int pollsize = 0; + int i; + + for (i = 0; i < nr; i++) { + struct io_pump *io = &slots[i]; + if (io->fd < 0) + continue; + pfd[pollsize].fd = io->fd; + pfd[pollsize].events = io->type; + io->pfd = &pfd[pollsize++]; + } + + if (!pollsize) + return 0; + + if (poll(pfd, pollsize, -1) < 0) { + if (errno == EINTR) + return 1; + die_errno("poll failed"); + } + + for (i = 0; i < nr; i++) { + struct io_pump *io = &slots[i]; + + if (io->fd < 0) + continue; + + if (!(io->pfd->revents & (POLLOUT|POLLIN|POLLHUP|POLLERR|POLLNVAL))) + continue; + + if (io->type == POLLOUT) { + ssize_t len = xwrite(io->fd, + io->u.out.buf, io->u.out.len); + if (len < 0) { + io->error = errno; + close(io->fd); + io->fd = -1; + } else { + io->u.out.buf += len; + io->u.out.len -= len; + if (!io->u.out.len) { + close(io->fd); + io->fd = -1; + } + } + } + + if (io->type == POLLIN) { + ssize_t len = strbuf_read_once(io->u.in.buf, + io->fd, io->u.in.hint); + if (len < 0) + io->error = errno; + if (len <= 0) { + close(io->fd); + io->fd = -1; + } + } + } + + return 1; +} + +static int pump_io(struct io_pump *slots, int nr) +{ + struct pollfd *pfd; + int i; + + for (i = 0; i < nr; i++) + slots[i].error = 0; + + ALLOC_ARRAY(pfd, nr); + while (pump_io_round(slots, nr, pfd)) + ; /* nothing */ + free(pfd); + + /* There may be multiple errno values, so just pick the first. */ + for (i = 0; i < nr; i++) { + if (slots[i].error) { + errno = slots[i].error; + return -1; + } + } + return 0; +} + + +int pipe_command(struct child_process *cmd, + const char *in, size_t in_len, + struct strbuf *out, size_t out_hint, + struct strbuf *err, size_t err_hint) +{ + struct io_pump io[3]; + int nr = 0; + + if (in) + cmd->in = -1; + if (out) + cmd->out = -1; + if (err) + cmd->err = -1; + + if (start_command(cmd) < 0) + return -1; + + if (in) { + io[nr].fd = cmd->in; + io[nr].type = POLLOUT; + io[nr].u.out.buf = in; + io[nr].u.out.len = in_len; + nr++; + } + if (out) { + io[nr].fd = cmd->out; + io[nr].type = POLLIN; + io[nr].u.in.buf = out; + io[nr].u.in.hint = out_hint; + nr++; + } + if (err) { + io[nr].fd = cmd->err; + io[nr].type = POLLIN; + io[nr].u.in.buf = err; + io[nr].u.in.hint = err_hint; + nr++; + } + + if (pump_io(io, nr) < 0) { + finish_command(cmd); /* throw away exit code */ + return -1; + } + + return finish_command(cmd); +} + +enum child_state { + GIT_CP_FREE, + GIT_CP_WORKING, + GIT_CP_WAIT_CLEANUP, +}; + +struct parallel_processes { + void *data; + + int max_processes; + int nr_processes; + + get_next_task_fn get_next_task; + start_failure_fn start_failure; + task_finished_fn task_finished; + + struct { + enum child_state state; + struct child_process process; + struct strbuf err; + void *data; + } *children; + /* + * The struct pollfd is logically part of *children, + * but the system call expects it as its own array. + */ + struct pollfd *pfd; + + unsigned shutdown : 1; + + int output_owner; + struct strbuf buffered_output; /* of finished children */ +}; + +static int default_start_failure(struct strbuf *out, + void *pp_cb, + void *pp_task_cb) +{ + return 0; +} + +static int default_task_finished(int result, + struct strbuf *out, + void *pp_cb, + void *pp_task_cb) +{ + return 0; +} + +static void kill_children(struct parallel_processes *pp, int signo) +{ + int i, n = pp->max_processes; + + for (i = 0; i < n; i++) + if (pp->children[i].state == GIT_CP_WORKING) + kill(pp->children[i].process.pid, signo); +} + +static struct parallel_processes *pp_for_signal; + +static void handle_children_on_signal(int signo) +{ + kill_children(pp_for_signal, signo); + sigchain_pop(signo); + raise(signo); +} + +static void pp_init(struct parallel_processes *pp, + int n, + get_next_task_fn get_next_task, + start_failure_fn start_failure, + task_finished_fn task_finished, + void *data) +{ + int i; + + if (n < 1) + n = online_cpus(); + + pp->max_processes = n; + + trace_printf("run_processes_parallel: preparing to run up to %d tasks", n); + + pp->data = data; + if (!get_next_task) + die("BUG: you need to specify a get_next_task function"); + pp->get_next_task = get_next_task; + + pp->start_failure = start_failure ? start_failure : default_start_failure; + pp->task_finished = task_finished ? task_finished : default_task_finished; + + pp->nr_processes = 0; + pp->output_owner = 0; + pp->shutdown = 0; + pp->children = xcalloc(n, sizeof(*pp->children)); + pp->pfd = xcalloc(n, sizeof(*pp->pfd)); + strbuf_init(&pp->buffered_output, 0); + + for (i = 0; i < n; i++) { + strbuf_init(&pp->children[i].err, 0); + child_process_init(&pp->children[i].process); + pp->pfd[i].events = POLLIN | POLLHUP; + pp->pfd[i].fd = -1; + } + + pp_for_signal = pp; + sigchain_push_common(handle_children_on_signal); +} + +static void pp_cleanup(struct parallel_processes *pp) +{ + int i; + + trace_printf("run_processes_parallel: done"); + for (i = 0; i < pp->max_processes; i++) { + strbuf_release(&pp->children[i].err); + child_process_clear(&pp->children[i].process); + } + + free(pp->children); + free(pp->pfd); + + /* + * When get_next_task added messages to the buffer in its last + * iteration, the buffered output is non empty. + */ + strbuf_write(&pp->buffered_output, stderr); + strbuf_release(&pp->buffered_output); + + sigchain_pop_common(); +} + +/* returns + * 0 if a new task was started. + * 1 if no new jobs was started (get_next_task ran out of work, non critical + * problem with starting a new command) + * <0 no new job was started, user wishes to shutdown early. Use negative code + * to signal the children. + */ +static int pp_start_one(struct parallel_processes *pp) +{ + int i, code; + + for (i = 0; i < pp->max_processes; i++) + if (pp->children[i].state == GIT_CP_FREE) + break; + if (i == pp->max_processes) + die("BUG: bookkeeping is hard"); + + code = pp->get_next_task(&pp->children[i].process, + &pp->children[i].err, + pp->data, + &pp->children[i].data); + if (!code) { + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + return 1; + } + pp->children[i].process.err = -1; + pp->children[i].process.stdout_to_stderr = 1; + pp->children[i].process.no_stdin = 1; + + if (start_command(&pp->children[i].process)) { + code = pp->start_failure(&pp->children[i].err, + pp->data, + &pp->children[i].data); + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + if (code) + pp->shutdown = 1; + return code; + } + + pp->nr_processes++; + pp->children[i].state = GIT_CP_WORKING; + pp->pfd[i].fd = pp->children[i].process.err; + return 0; +} + +static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout) +{ + int i; + + while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) { + if (errno == EINTR) + continue; + pp_cleanup(pp); + die_errno("poll"); + } + + /* Buffer output from all pipes. */ + for (i = 0; i < pp->max_processes; i++) { + if (pp->children[i].state == GIT_CP_WORKING && + pp->pfd[i].revents & (POLLIN | POLLHUP)) { + int n = strbuf_read_once(&pp->children[i].err, + pp->children[i].process.err, 0); + if (n == 0) { + close(pp->children[i].process.err); + pp->children[i].state = GIT_CP_WAIT_CLEANUP; + } else if (n < 0) + if (errno != EAGAIN) + die_errno("read"); + } + } +} + +static void pp_output(struct parallel_processes *pp) +{ + int i = pp->output_owner; + if (pp->children[i].state == GIT_CP_WORKING && + pp->children[i].err.len) { + strbuf_write(&pp->children[i].err, stderr); + strbuf_reset(&pp->children[i].err); + } +} + +static int pp_collect_finished(struct parallel_processes *pp) +{ + int i, code; + int n = pp->max_processes; + int result = 0; + + while (pp->nr_processes > 0) { + for (i = 0; i < pp->max_processes; i++) + if (pp->children[i].state == GIT_CP_WAIT_CLEANUP) + break; + if (i == pp->max_processes) + break; + + code = finish_command(&pp->children[i].process); + + code = pp->task_finished(code, + &pp->children[i].err, pp->data, + &pp->children[i].data); + + if (code) + result = code; + if (code < 0) + break; + + pp->nr_processes--; + pp->children[i].state = GIT_CP_FREE; + pp->pfd[i].fd = -1; + child_process_init(&pp->children[i].process); + + if (i != pp->output_owner) { + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); + strbuf_reset(&pp->children[i].err); + } else { + strbuf_write(&pp->children[i].err, stderr); + strbuf_reset(&pp->children[i].err); + + /* Output all other finished child processes */ + strbuf_write(&pp->buffered_output, stderr); + strbuf_reset(&pp->buffered_output); + + /* + * Pick next process to output live. + * NEEDSWORK: + * For now we pick it randomly by doing a round + * robin. Later we may want to pick the one with + * the most output or the longest or shortest + * running process time. + */ + for (i = 0; i < n; i++) + if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING) + break; + pp->output_owner = (pp->output_owner + i) % n; + } + } + return result; +} + +int run_processes_parallel(int n, + get_next_task_fn get_next_task, + start_failure_fn start_failure, + task_finished_fn task_finished, + void *pp_cb) +{ + int i, code; + int output_timeout = 100; + int spawn_cap = 4; + struct parallel_processes pp; + + pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb); + while (1) { + for (i = 0; + i < spawn_cap && !pp.shutdown && + pp.nr_processes < pp.max_processes; + i++) { + code = pp_start_one(&pp); + if (!code) + continue; + if (code < 0) { + pp.shutdown = 1; + kill_children(&pp, -code); + } + break; + } + if (!pp.nr_processes) + break; + pp_buffer_stderr(&pp, output_timeout); + pp_output(&pp); + code = pp_collect_finished(&pp); + if (code) { + pp.shutdown = 1; + if (code < 0) + kill_children(&pp, -code); + } + } + + pp_cleanup(&pp); + return 0; +} |