#include "cache.h" #include "simple-ipc.h" #include "strbuf.h" #include "pkt-line.h" #include "thread-utils.h" #include "unix-socket.h" #include "unix-stream-server.h" #ifndef SUPPORTS_SIMPLE_IPC /* * This source file should only be compiled when Simple IPC is supported. * See the top-level Makefile. */ #error SUPPORTS_SIMPLE_IPC not defined #endif enum ipc_active_state ipc_get_active_state(const char *path) { enum ipc_active_state state = IPC_STATE__OTHER_ERROR; struct ipc_client_connect_options options = IPC_CLIENT_CONNECT_OPTIONS_INIT; struct stat st; struct ipc_client_connection *connection_test = NULL; options.wait_if_busy = 0; options.wait_if_not_found = 0; if (lstat(path, &st) == -1) { switch (errno) { case ENOENT: case ENOTDIR: return IPC_STATE__NOT_LISTENING; default: return IPC_STATE__INVALID_PATH; } } /* also complain if a plain file is in the way */ if ((st.st_mode & S_IFMT) != S_IFSOCK) return IPC_STATE__INVALID_PATH; /* * Just because the filesystem has a S_IFSOCK type inode * at `path`, doesn't mean it that there is a server listening. * Ping it to be sure. */ state = ipc_client_try_connect(path, &options, &connection_test); ipc_client_close_connection(connection_test); return state; } /* * Retry frequency when trying to connect to a server. * * This value should be short enough that we don't seriously delay our * caller, but not fast enough that our spinning puts pressure on the * system. */ #define WAIT_STEP_MS (50) /* * Try to connect to the server. If the server is just starting up or * is very busy, we may not get a connection the first time. */ static enum ipc_active_state connect_to_server( const char *path, int timeout_ms, const struct ipc_client_connect_options *options, int *pfd) { int k; *pfd = -1; for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) { int fd = unix_stream_connect(path, options->uds_disallow_chdir); if (fd != -1) { *pfd = fd; return IPC_STATE__LISTENING; } if (errno == ENOENT) { if (!options->wait_if_not_found) return IPC_STATE__PATH_NOT_FOUND; goto sleep_and_try_again; } if (errno == ETIMEDOUT) { if (!options->wait_if_busy) return IPC_STATE__NOT_LISTENING; goto sleep_and_try_again; } if (errno == ECONNREFUSED) { if (!options->wait_if_busy) return IPC_STATE__NOT_LISTENING; goto sleep_and_try_again; } return IPC_STATE__OTHER_ERROR; sleep_and_try_again: sleep_millisec(WAIT_STEP_MS); } return IPC_STATE__NOT_LISTENING; } /* * The total amount of time that we are willing to wait when trying to * connect to a server. * * When the server is first started, it might take a little while for * it to become ready to service requests. Likewise, the server may * be very (temporarily) busy and not respond to our connections. * * We should gracefully and silently handle those conditions and try * again for a reasonable time period. * * The value chosen here should be long enough for the server * to reliably heal from the above conditions. */ #define MY_CONNECTION_TIMEOUT_MS (1000) enum ipc_active_state ipc_client_try_connect( const char *path, const struct ipc_client_connect_options *options, struct ipc_client_connection **p_connection) { enum ipc_active_state state = IPC_STATE__OTHER_ERROR; int fd = -1; *p_connection = NULL; trace2_region_enter("ipc-client", "try-connect", NULL); trace2_data_string("ipc-client", NULL, "try-connect/path", path); state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS, options, &fd); trace2_data_intmax("ipc-client", NULL, "try-connect/state", (intmax_t)state); trace2_region_leave("ipc-client", "try-connect", NULL); if (state == IPC_STATE__LISTENING) { (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection)); (*p_connection)->fd = fd; } return state; } void ipc_client_close_connection(struct ipc_client_connection *connection) { if (!connection) return; if (connection->fd != -1) close(connection->fd); free(connection); } int ipc_client_send_command_to_connection( struct ipc_client_connection *connection, const char *message, size_t message_len, struct strbuf *answer) { int ret = 0; strbuf_setlen(answer, 0); trace2_region_enter("ipc-client", "send-command", NULL); if (write_packetized_from_buf_no_flush(message, message_len, connection->fd) < 0 || packet_flush_gently(connection->fd) < 0) { ret = error(_("could not send IPC command")); goto done; } if (read_packetized_to_strbuf( connection->fd, answer, PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) { ret = error(_("could not read IPC response")); goto done; } done: trace2_region_leave("ipc-client", "send-command", NULL); return ret; } int ipc_client_send_command(const char *path, const struct ipc_client_connect_options *options, const char *message, size_t message_len, struct strbuf *answer) { int ret = -1; enum ipc_active_state state; struct ipc_client_connection *connection = NULL; state = ipc_client_try_connect(path, options, &connection); if (state != IPC_STATE__LISTENING) return ret; ret = ipc_client_send_command_to_connection(connection, message, message_len, answer); ipc_client_close_connection(connection); return ret; } static int set_socket_blocking_flag(int fd, int make_nonblocking) { int flags; flags = fcntl(fd, F_GETFL, NULL); if (flags < 0) return -1; if (make_nonblocking) flags |= O_NONBLOCK; else flags &= ~O_NONBLOCK; return fcntl(fd, F_SETFL, flags); } /* * Magic numbers used to annotate callback instance data. * These are used to help guard against accidentally passing the * wrong instance data across multiple levels of callbacks (which * is easy to do if there are `void*` arguments). */ enum magic { MAGIC_SERVER_REPLY_DATA, MAGIC_WORKER_THREAD_DATA, MAGIC_ACCEPT_THREAD_DATA, MAGIC_SERVER_DATA, }; struct ipc_server_reply_data { enum magic magic; int fd; struct ipc_worker_thread_data *worker_thread_data; }; struct ipc_worker_thread_data { enum magic magic; struct ipc_worker_thread_data *next_thread; struct ipc_server_data *server_data; pthread_t pthread_id; }; struct ipc_accept_thread_data { enum magic magic; struct ipc_server_data *server_data; struct unix_ss_socket *server_socket; int fd_send_shutdown; int fd_wait_shutdown; pthread_t pthread_id; }; /* * With unix-sockets, the conceptual "ipc-server" is implemented as a single * controller "accept-thread" thread and a pool of "worker-thread" threads. * The former does the usual `accept()` loop and dispatches connections * to an idle worker thread. The worker threads wait in an idle loop for * a new connection, communicate with the client and relay data to/from * the `application_cb` and then wait for another connection from the * server thread. This avoids the overhead of constantly creating and * destroying threads. */ struct ipc_server_data { enum magic magic; ipc_server_application_cb *application_cb; void *application_data; struct strbuf buf_path; struct ipc_accept_thread_data *accept_thread; struct ipc_worker_thread_data *worker_thread_list; pthread_mutex_t work_available_mutex; pthread_cond_t work_available_cond; /* * Accepted but not yet processed client connections are kept * in a circular buffer FIFO. The queue is empty when the * positions are equal. */ int *fifo_fds; int queue_size; int back_pos; int front_pos; int shutdown_requested; int is_stopped; }; /* * Remove and return the oldest queued connection. * * Returns -1 if empty. */ static int fifo_dequeue(struct ipc_server_data *server_data) { /* ASSERT holding mutex */ int fd; if (server_data->back_pos == server_data->front_pos) return -1; fd = server_data->fifo_fds[server_data->front_pos]; server_data->fifo_fds[server_data->front_pos] = -1; server_data->front_pos++; if (server_data->front_pos == server_data->queue_size) server_data->front_pos = 0; return fd; } /* * Push a new fd onto the back of the queue. * * Drop it and return -1 if queue is already full. */ static int fifo_enqueue(struct ipc_server_data *server_data, int fd) { /* ASSERT holding mutex */ int next_back_pos; next_back_pos = server_data->back_pos + 1; if (next_back_pos == server_data->queue_size) next_back_pos = 0; if (next_back_pos == server_data->front_pos) { /* Queue is full. Just drop it. */ close(fd); return -1; } server_data->fifo_fds[server_data->back_pos] = fd; server_data->back_pos = next_back_pos; return fd; } /* * Wait for a connection to be queued to the FIFO and return it. * * Returns -1 if someone has already requested a shutdown. */ static int worker_thread__wait_for_connection( struct ipc_worker_thread_data *worker_thread_data) { /* ASSERT NOT holding mutex */ struct ipc_server_data *server_data = worker_thread_data->server_data; int fd = -1; pthread_mutex_lock(&server_data->work_available_mutex); for (;;) { if (server_data->shutdown_requested) break; fd = fifo_dequeue(server_data); if (fd >= 0) break; pthread_cond_wait(&server_data->work_available_cond, &server_data->work_available_mutex); } pthread_mutex_unlock(&server_data->work_available_mutex); return fd; } /* * Forward declare our reply callback function so that any compiler * errors are reported when we actually define the function (in addition * to any errors reported when we try to pass this callback function as * a parameter in a function call). The former are easier to understand. */ static ipc_server_reply_cb do_io_reply_callback; /* * Relay application's response message to the client process. * (We do not flush at this point because we allow the caller * to chunk data to the client thru us.) */ static int do_io_reply_callback(struct ipc_server_reply_data *reply_data, const char *response, size_t response_len) { if (reply_data->magic != MAGIC_SERVER_REPLY_DATA) BUG("reply_cb called with wrong instance data"); return write_packetized_from_buf_no_flush(response, response_len, reply_data->fd); } /* A randomly chosen value. */ #define MY_WAIT_POLL_TIMEOUT_MS (10) /* * If the client hangs up without sending any data on the wire, just * quietly close the socket and ignore this client. * * This worker thread is committed to reading the IPC request data * from the client at the other end of this fd. Wait here for the * client to actually put something on the wire -- because if the * client just does a ping (connect and hangup without sending any * data), our use of the pkt-line read routines will spew an error * message. * * Return -1 if the client hung up. * Return 0 if data (possibly incomplete) is ready. */ static int worker_thread__wait_for_io_start( struct ipc_worker_thread_data *worker_thread_data, int fd) { struct ipc_server_data *server_data = worker_thread_data->server_data; struct pollfd pollfd[1]; int result; for (;;) { pollfd[0].fd = fd; pollfd[0].events = POLLIN; result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS); if (result < 0) { if (errno == EINTR) continue; goto cleanup; } if (result == 0) { /* a timeout */ int in_shutdown; pthread_mutex_lock(&server_data->work_available_mutex); in_shutdown = server_data->shutdown_requested; pthread_mutex_unlock(&server_data->work_available_mutex); /* * If a shutdown is already in progress and this * client has not started talking yet, just drop it. */ if (in_shutdown) goto cleanup; continue; } if (pollfd[0].revents & POLLHUP) goto cleanup; if (pollfd[0].revents & POLLIN) return 0; goto cleanup; } cleanup: close(fd); return -1; } /* * Receive the request/command from the client and pass it to the * registered request-callback. The request-callback will compose * a response and call our reply-callback to send it to the client. */ static int worker_thread__do_io( struct ipc_worker_thread_data *worker_thread_data, int fd) { /* ASSERT NOT holding lock */ struct strbuf buf = STRBUF_INIT; struct ipc_server_reply_data reply_data; int ret = 0; reply_data.magic = MAGIC_SERVER_REPLY_DATA; reply_data.worker_thread_data = worker_thread_data; reply_data.fd = fd; ret = read_packetized_to_strbuf( reply_data.fd, &buf, PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR); if (ret >= 0) { ret = worker_thread_data->server_data->application_cb( worker_thread_data->server_data->application_data, buf.buf, buf.len, do_io_reply_callback, &reply_data); packet_flush_gently(reply_data.fd); } else { /* * The client probably disconnected/shutdown before it * could send a well-formed message. Ignore it. */ } strbuf_release(&buf); close(reply_data.fd); return ret; } /* * Block SIGPIPE on the current thread (so that we get EPIPE from * write() rather than an actual signal). * * Note that using sigchain_push() and _pop() to control SIGPIPE * around our IO calls is not thread safe: * [] It uses a global stack of handler frames. * [] It uses ALLOC_GROW() to resize it. * [] Finally, according to the `signal(2)` man-page: * "The effects of `signal()` in a multithreaded process are unspecified." */ static void thread_block_sigpipe(sigset_t *old_set) { sigset_t new_set; sigemptyset(&new_set); sigaddset(&new_set, SIGPIPE); sigemptyset(old_set); pthread_sigmask(SIG_BLOCK, &new_set, old_set); } /* * Thread proc for an IPC worker thread. It handles a series of * connections from clients. It pulls the next fd from the queue * processes it, and then waits for the next client. * * Block SIGPIPE in this worker thread for the life of the thread. * This avoids stray (and sometimes delayed) SIGPIPE signals caused * by client errors and/or when we are under extremely heavy IO load. * * This means that the application callback will have SIGPIPE blocked. * The callback should not change it. */ static void *worker_thread_proc(void *_worker_thread_data) { struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data; struct ipc_server_data *server_data = worker_thread_data->server_data; sigset_t old_set; int fd, io; int ret; trace2_thread_start("ipc-worker"); thread_block_sigpipe(&old_set); for (;;) { fd = worker_thread__wait_for_connection(worker_thread_data); if (fd == -1) break; /* in shutdown */ io = worker_thread__wait_for_io_start(worker_thread_data, fd); if (io == -1) continue; /* client hung up without sending anything */ ret = worker_thread__do_io(worker_thread_data, fd); if (ret == SIMPLE_IPC_QUIT) { trace2_data_string("ipc-worker", NULL, "queue_stop_async", "application_quit"); /* * The application layer is telling the ipc-server * layer to shutdown. * * We DO NOT have a response to send to the client. * * Queue an async stop (to stop the other threads) and * allow this worker thread to exit now (no sense waiting * for the thread-pool shutdown signal). * * Other non-idle worker threads are allowed to finish * responding to their current clients. */ ipc_server_stop_async(server_data); break; } } trace2_thread_exit(); return NULL; } /* A randomly chosen value. */ #define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000) /* * Accept a new client connection on our socket. This uses non-blocking * IO so that we can also wait for shutdown requests on our socket-pair * without actually spinning on a fast timeout. */ static int accept_thread__wait_for_connection( struct ipc_accept_thread_data *accept_thread_data) { struct pollfd pollfd[2]; int result; for (;;) { pollfd[0].fd = accept_thread_data->fd_wait_shutdown; pollfd[0].events = POLLIN; pollfd[1].fd = accept_thread_data->server_socket->fd_socket; pollfd[1].events = POLLIN; result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS); if (result < 0) { if (errno == EINTR) continue; return result; } if (result == 0) { /* a timeout */ /* * If someone deletes or force-creates a new unix * domain socket at our path, all future clients * will be routed elsewhere and we silently starve. * If that happens, just queue a shutdown. */ if (unix_ss_was_stolen( accept_thread_data->server_socket)) { trace2_data_string("ipc-accept", NULL, "queue_stop_async", "socket_stolen"); ipc_server_stop_async( accept_thread_data->server_data); } continue; } if (pollfd[0].revents & POLLIN) { /* shutdown message queued to socketpair */ return -1; } if (pollfd[1].revents & POLLIN) { /* a connection is available on server_socket */ int client_fd = accept(accept_thread_data->server_socket->fd_socket, NULL, NULL); if (client_fd >= 0) return client_fd; /* * An error here is unlikely -- it probably * indicates that the connecting process has * already dropped the connection. */ continue; } BUG("unandled poll result errno=%d r[0]=%d r[1]=%d", errno, pollfd[0].revents, pollfd[1].revents); } } /* * Thread proc for the IPC server "accept thread". This waits for * an incoming socket connection, appends it to the queue of available * connections, and notifies a worker thread to process it. * * Block SIGPIPE in this thread for the life of the thread. This * avoids any stray SIGPIPE signals when closing pipe fds under * extremely heavy loads (such as when the fifo queue is full and we * drop incomming connections). */ static void *accept_thread_proc(void *_accept_thread_data) { struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data; struct ipc_server_data *server_data = accept_thread_data->server_data; sigset_t old_set; trace2_thread_start("ipc-accept"); thread_block_sigpipe(&old_set); for (;;) { int client_fd = accept_thread__wait_for_connection( accept_thread_data); pthread_mutex_lock(&server_data->work_available_mutex); if (server_data->shutdown_requested) { pthread_mutex_unlock(&server_data->work_available_mutex); if (client_fd >= 0) close(client_fd); break; } if (client_fd < 0) { /* ignore transient accept() errors */ } else { fifo_enqueue(server_data, client_fd); pthread_cond_broadcast(&server_data->work_available_cond); } pthread_mutex_unlock(&server_data->work_available_mutex); } trace2_thread_exit(); return NULL; } /* * We can't predict the connection arrival rate relative to the worker * processing rate, therefore we allow the "accept-thread" to queue up * a generous number of connections, since we'd rather have the client * not unnecessarily timeout if we can avoid it. (The assumption is * that this will be used for FSMonitor and a few second wait on a * connection is better than having the client timeout and do the full * computation itself.) * * The FIFO queue size is set to a multiple of the worker pool size. * This value chosen at random. */ #define FIFO_SCALE (100) /* * The backlog value for `listen(2)`. This doesn't need to huge, * rather just large enough for our "accept-thread" to wake up and * queue incoming connections onto the FIFO without the kernel * dropping any. * * This value chosen at random. */ #define LISTEN_BACKLOG (50) static int create_listener_socket( const char *path, const struct ipc_server_opts *ipc_opts, struct unix_ss_socket **new_server_socket) { struct unix_ss_socket *server_socket = NULL; struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT; int ret; uslg_opts.listen_backlog_size = LISTEN_BACKLOG; uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir; ret = unix_ss_create(path, &uslg_opts, -1, &server_socket); if (ret) return ret; if (set_socket_blocking_flag(server_socket->fd_socket, 1)) { int saved_errno = errno; unix_ss_free(server_socket); errno = saved_errno; return -1; } *new_server_socket = server_socket; trace2_data_string("ipc-server", NULL, "listen-with-lock", path); return 0; } static int setup_listener_socket( const char *path, const struct ipc_server_opts *ipc_opts, struct unix_ss_socket **new_server_socket) { int ret, saved_errno; trace2_region_enter("ipc-server", "create-listener_socket", NULL); ret = create_listener_socket(path, ipc_opts, new_server_socket); saved_errno = errno; trace2_region_leave("ipc-server", "create-listener_socket", NULL); errno = saved_errno; return ret; } /* * Start IPC server in a pool of background threads. */ int ipc_server_run_async(struct ipc_server_data **returned_server_data, const char *path, const struct ipc_server_opts *opts, ipc_server_application_cb *application_cb, void *application_data) { struct unix_ss_socket *server_socket = NULL; struct ipc_server_data *server_data; int sv[2]; int k; int ret; int nr_threads = opts->nr_threads; *returned_server_data = NULL; /* * Create a socketpair and set sv[1] to non-blocking. This * will used to send a shutdown message to the accept-thread * and allows the accept-thread to wait on EITHER a client * connection or a shutdown request without spinning. */ if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0) return -1; if (set_socket_blocking_flag(sv[1], 1)) { int saved_errno = errno; close(sv[0]); close(sv[1]); errno = saved_errno; return -1; } ret = setup_listener_socket(path, opts, &server_socket); if (ret) { int saved_errno = errno; close(sv[0]); close(sv[1]); errno = saved_errno; return ret; } server_data = xcalloc(1, sizeof(*server_data)); server_data->magic = MAGIC_SERVER_DATA; server_data->application_cb = application_cb; server_data->application_data = application_data; strbuf_init(&server_data->buf_path, 0); strbuf_addstr(&server_data->buf_path, path); if (nr_threads < 1) nr_threads = 1; pthread_mutex_init(&server_data->work_available_mutex, NULL); pthread_cond_init(&server_data->work_available_cond, NULL); server_data->queue_size = nr_threads * FIFO_SCALE; CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size); server_data->accept_thread = xcalloc(1, sizeof(*server_data->accept_thread)); server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA; server_data->accept_thread->server_data = server_data; server_data->accept_thread->server_socket = server_socket; server_data->accept_thread->fd_send_shutdown = sv[0]; server_data->accept_thread->fd_wait_shutdown = sv[1]; if (pthread_create(&server_data->accept_thread->pthread_id, NULL, accept_thread_proc, server_data->accept_thread)) die_errno(_("could not start accept_thread '%s'"), path); for (k = 0; k < nr_threads; k++) { struct ipc_worker_thread_data *wtd; wtd = xcalloc(1, sizeof(*wtd)); wtd->magic = MAGIC_WORKER_THREAD_DATA; wtd->server_data = server_data; if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc, wtd)) { if (k == 0) die(_("could not start worker[0] for '%s'"), path); /* * Limp along with the thread pool that we have. */ break; } wtd->next_thread = server_data->worker_thread_list; server_data->worker_thread_list = wtd; } *returned_server_data = server_data; return 0; } /* * Gently tell the IPC server treads to shutdown. * Can be run on any thread. */ int ipc_server_stop_async(struct ipc_server_data *server_data) { /* ASSERT NOT holding mutex */ int fd; if (!server_data) return 0; trace2_region_enter("ipc-server", "server-stop-async", NULL); pthread_mutex_lock(&server_data->work_available_mutex); server_data->shutdown_requested = 1; /* * Write a byte to the shutdown socket pair to wake up the * accept-thread. */ if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0) error_errno("could not write to fd_send_shutdown"); /* * Drain the queue of existing connections. */ while ((fd = fifo_dequeue(server_data)) != -1) close(fd); /* * Gently tell worker threads to stop processing new connections * and exit. (This does not abort in-process conversations.) */ pthread_cond_broadcast(&server_data->work_available_cond); pthread_mutex_unlock(&server_data->work_available_mutex); trace2_region_leave("ipc-server", "server-stop-async", NULL); return 0; } /* * Wait for all IPC server threads to stop. */ int ipc_server_await(struct ipc_server_data *server_data) { pthread_join(server_data->accept_thread->pthread_id, NULL); if (!server_data->shutdown_requested) BUG("ipc-server: accept-thread stopped for '%s'", server_data->buf_path.buf); while (server_data->worker_thread_list) { struct ipc_worker_thread_data *wtd = server_data->worker_thread_list; pthread_join(wtd->pthread_id, NULL); server_data->worker_thread_list = wtd->next_thread; free(wtd); } server_data->is_stopped = 1; return 0; } void ipc_server_free(struct ipc_server_data *server_data) { struct ipc_accept_thread_data * accept_thread_data; if (!server_data) return; if (!server_data->is_stopped) BUG("cannot free ipc-server while running for '%s'", server_data->buf_path.buf); accept_thread_data = server_data->accept_thread; if (accept_thread_data) { unix_ss_free(accept_thread_data->server_socket); if (accept_thread_data->fd_send_shutdown != -1) close(accept_thread_data->fd_send_shutdown); if (accept_thread_data->fd_wait_shutdown != -1) close(accept_thread_data->fd_wait_shutdown); free(server_data->accept_thread); } while (server_data->worker_thread_list) { struct ipc_worker_thread_data *wtd = server_data->worker_thread_list; server_data->worker_thread_list = wtd->next_thread; free(wtd); } pthread_cond_destroy(&server_data->work_available_cond); pthread_mutex_destroy(&server_data->work_available_mutex); strbuf_release(&server_data->buf_path); free(server_data->fifo_fds); free(server_data); }