1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030
|
#include "git-compat-util.h"
#include "gettext.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "thread-utils.h"
#include "trace2.h"
#include "unix-socket.h"
#include "unix-stream-server.h"
#ifndef SUPPORTS_SIMPLE_IPC
/*
* This source file should only be compiled when Simple IPC is supported.
* See the top-level Makefile.
*/
#error SUPPORTS_SIMPLE_IPC not defined
#endif
enum ipc_active_state ipc_get_active_state(const char *path)
{
enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
struct ipc_client_connect_options options
= IPC_CLIENT_CONNECT_OPTIONS_INIT;
struct stat st;
struct ipc_client_connection *connection_test = NULL;
options.wait_if_busy = 0;
options.wait_if_not_found = 0;
if (lstat(path, &st) == -1) {
switch (errno) {
case ENOENT:
case ENOTDIR:
return IPC_STATE__NOT_LISTENING;
default:
return IPC_STATE__INVALID_PATH;
}
}
#ifdef __CYGWIN__
/*
* Cygwin emulates Unix sockets by writing special-crafted files whose
* `system` bit is set.
*
* If we are too fast, Cygwin might still be in the process of marking
* the underlying file as a system file. Until then, we will not see a
* Unix socket here, but a plain file instead. Just in case that this
* is happening, wait a little and try again.
*/
{
static const int delay[] = { 1, 10, 20, 40, -1 };
int i;
for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
sleep_millisec(delay[i]);
if (lstat(path, &st) == -1)
return IPC_STATE__INVALID_PATH;
}
}
#endif
/* also complain if a plain file is in the way */
if ((st.st_mode & S_IFMT) != S_IFSOCK)
return IPC_STATE__INVALID_PATH;
/*
* Just because the filesystem has a S_IFSOCK type inode
* at `path`, doesn't mean it that there is a server listening.
* Ping it to be sure.
*/
state = ipc_client_try_connect(path, &options, &connection_test);
ipc_client_close_connection(connection_test);
return state;
}
/*
* Retry frequency when trying to connect to a server.
*
* This value should be short enough that we don't seriously delay our
* caller, but not fast enough that our spinning puts pressure on the
* system.
*/
#define WAIT_STEP_MS (50)
/*
* Try to connect to the server. If the server is just starting up or
* is very busy, we may not get a connection the first time.
*/
static enum ipc_active_state connect_to_server(
const char *path,
int timeout_ms,
const struct ipc_client_connect_options *options,
int *pfd)
{
int k;
*pfd = -1;
for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
int fd = unix_stream_connect(path, options->uds_disallow_chdir);
if (fd != -1) {
*pfd = fd;
return IPC_STATE__LISTENING;
}
if (errno == ENOENT) {
if (!options->wait_if_not_found)
return IPC_STATE__PATH_NOT_FOUND;
goto sleep_and_try_again;
}
if (errno == ETIMEDOUT) {
if (!options->wait_if_busy)
return IPC_STATE__NOT_LISTENING;
goto sleep_and_try_again;
}
if (errno == ECONNREFUSED) {
if (!options->wait_if_busy)
return IPC_STATE__NOT_LISTENING;
goto sleep_and_try_again;
}
return IPC_STATE__OTHER_ERROR;
sleep_and_try_again:
sleep_millisec(WAIT_STEP_MS);
}
return IPC_STATE__NOT_LISTENING;
}
/*
* The total amount of time that we are willing to wait when trying to
* connect to a server.
*
* When the server is first started, it might take a little while for
* it to become ready to service requests. Likewise, the server may
* be very (temporarily) busy and not respond to our connections.
*
* We should gracefully and silently handle those conditions and try
* again for a reasonable time period.
*
* The value chosen here should be long enough for the server
* to reliably heal from the above conditions.
*/
#define MY_CONNECTION_TIMEOUT_MS (1000)
enum ipc_active_state ipc_client_try_connect(
const char *path,
const struct ipc_client_connect_options *options,
struct ipc_client_connection **p_connection)
{
enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
int fd = -1;
*p_connection = NULL;
trace2_region_enter("ipc-client", "try-connect", NULL);
trace2_data_string("ipc-client", NULL, "try-connect/path", path);
state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
options, &fd);
trace2_data_intmax("ipc-client", NULL, "try-connect/state",
(intmax_t)state);
trace2_region_leave("ipc-client", "try-connect", NULL);
if (state == IPC_STATE__LISTENING) {
(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
(*p_connection)->fd = fd;
}
return state;
}
void ipc_client_close_connection(struct ipc_client_connection *connection)
{
if (!connection)
return;
if (connection->fd != -1)
close(connection->fd);
free(connection);
}
int ipc_client_send_command_to_connection(
struct ipc_client_connection *connection,
const char *message, size_t message_len,
struct strbuf *answer)
{
int ret = 0;
strbuf_setlen(answer, 0);
trace2_region_enter("ipc-client", "send-command", NULL);
if (write_packetized_from_buf_no_flush(message, message_len,
connection->fd) < 0 ||
packet_flush_gently(connection->fd) < 0) {
ret = error(_("could not send IPC command"));
goto done;
}
if (read_packetized_to_strbuf(
connection->fd, answer,
PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
ret = error(_("could not read IPC response"));
goto done;
}
done:
trace2_region_leave("ipc-client", "send-command", NULL);
return ret;
}
int ipc_client_send_command(const char *path,
const struct ipc_client_connect_options *options,
const char *message, size_t message_len,
struct strbuf *answer)
{
int ret = -1;
enum ipc_active_state state;
struct ipc_client_connection *connection = NULL;
state = ipc_client_try_connect(path, options, &connection);
if (state != IPC_STATE__LISTENING)
return ret;
ret = ipc_client_send_command_to_connection(connection,
message, message_len,
answer);
ipc_client_close_connection(connection);
return ret;
}
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
int flags;
flags = fcntl(fd, F_GETFL, NULL);
if (flags < 0)
return -1;
if (make_nonblocking)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
return fcntl(fd, F_SETFL, flags);
}
/*
* Magic numbers used to annotate callback instance data.
* These are used to help guard against accidentally passing the
* wrong instance data across multiple levels of callbacks (which
* is easy to do if there are `void*` arguments).
*/
enum magic {
MAGIC_SERVER_REPLY_DATA,
MAGIC_WORKER_THREAD_DATA,
MAGIC_ACCEPT_THREAD_DATA,
MAGIC_SERVER_DATA,
};
struct ipc_server_reply_data {
enum magic magic;
int fd;
struct ipc_worker_thread_data *worker_thread_data;
};
struct ipc_worker_thread_data {
enum magic magic;
struct ipc_worker_thread_data *next_thread;
struct ipc_server_data *server_data;
pthread_t pthread_id;
};
struct ipc_accept_thread_data {
enum magic magic;
struct ipc_server_data *server_data;
struct unix_ss_socket *server_socket;
int fd_send_shutdown;
int fd_wait_shutdown;
pthread_t pthread_id;
};
/*
* With unix-sockets, the conceptual "ipc-server" is implemented as a single
* controller "accept-thread" thread and a pool of "worker-thread" threads.
* The former does the usual `accept()` loop and dispatches connections
* to an idle worker thread. The worker threads wait in an idle loop for
* a new connection, communicate with the client and relay data to/from
* the `application_cb` and then wait for another connection from the
* server thread. This avoids the overhead of constantly creating and
* destroying threads.
*/
struct ipc_server_data {
enum magic magic;
ipc_server_application_cb *application_cb;
void *application_data;
struct strbuf buf_path;
struct ipc_accept_thread_data *accept_thread;
struct ipc_worker_thread_data *worker_thread_list;
pthread_mutex_t work_available_mutex;
pthread_cond_t work_available_cond;
/*
* Accepted but not yet processed client connections are kept
* in a circular buffer FIFO. The queue is empty when the
* positions are equal.
*/
int *fifo_fds;
int queue_size;
int back_pos;
int front_pos;
int shutdown_requested;
int is_stopped;
};
/*
* Remove and return the oldest queued connection.
*
* Returns -1 if empty.
*/
static int fifo_dequeue(struct ipc_server_data *server_data)
{
/* ASSERT holding mutex */
int fd;
if (server_data->back_pos == server_data->front_pos)
return -1;
fd = server_data->fifo_fds[server_data->front_pos];
server_data->fifo_fds[server_data->front_pos] = -1;
server_data->front_pos++;
if (server_data->front_pos == server_data->queue_size)
server_data->front_pos = 0;
return fd;
}
/*
* Push a new fd onto the back of the queue.
*
* Drop it and return -1 if queue is already full.
*/
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
/* ASSERT holding mutex */
int next_back_pos;
next_back_pos = server_data->back_pos + 1;
if (next_back_pos == server_data->queue_size)
next_back_pos = 0;
if (next_back_pos == server_data->front_pos) {
/* Queue is full. Just drop it. */
close(fd);
return -1;
}
server_data->fifo_fds[server_data->back_pos] = fd;
server_data->back_pos = next_back_pos;
return fd;
}
/*
* Wait for a connection to be queued to the FIFO and return it.
*
* Returns -1 if someone has already requested a shutdown.
*/
static int worker_thread__wait_for_connection(
struct ipc_worker_thread_data *worker_thread_data)
{
/* ASSERT NOT holding mutex */
struct ipc_server_data *server_data = worker_thread_data->server_data;
int fd = -1;
pthread_mutex_lock(&server_data->work_available_mutex);
for (;;) {
if (server_data->shutdown_requested)
break;
fd = fifo_dequeue(server_data);
if (fd >= 0)
break;
pthread_cond_wait(&server_data->work_available_cond,
&server_data->work_available_mutex);
}
pthread_mutex_unlock(&server_data->work_available_mutex);
return fd;
}
/*
* Forward declare our reply callback function so that any compiler
* errors are reported when we actually define the function (in addition
* to any errors reported when we try to pass this callback function as
* a parameter in a function call). The former are easier to understand.
*/
static ipc_server_reply_cb do_io_reply_callback;
/*
* Relay application's response message to the client process.
* (We do not flush at this point because we allow the caller
* to chunk data to the client thru us.)
*/
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
const char *response, size_t response_len)
{
if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
BUG("reply_cb called with wrong instance data");
return write_packetized_from_buf_no_flush(response, response_len,
reply_data->fd);
}
/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)
/*
* If the client hangs up without sending any data on the wire, just
* quietly close the socket and ignore this client.
*
* This worker thread is committed to reading the IPC request data
* from the client at the other end of this fd. Wait here for the
* client to actually put something on the wire -- because if the
* client just does a ping (connect and hangup without sending any
* data), our use of the pkt-line read routines will spew an error
* message.
*
* Return -1 if the client hung up.
* Return 0 if data (possibly incomplete) is ready.
*/
static int worker_thread__wait_for_io_start(
struct ipc_worker_thread_data *worker_thread_data,
int fd)
{
struct ipc_server_data *server_data = worker_thread_data->server_data;
struct pollfd pollfd[1];
int result;
for (;;) {
pollfd[0].fd = fd;
pollfd[0].events = POLLIN;
result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
if (result < 0) {
if (errno == EINTR)
continue;
goto cleanup;
}
if (result == 0) {
/* a timeout */
int in_shutdown;
pthread_mutex_lock(&server_data->work_available_mutex);
in_shutdown = server_data->shutdown_requested;
pthread_mutex_unlock(&server_data->work_available_mutex);
/*
* If a shutdown is already in progress and this
* client has not started talking yet, just drop it.
*/
if (in_shutdown)
goto cleanup;
continue;
}
if (pollfd[0].revents & POLLHUP)
goto cleanup;
if (pollfd[0].revents & POLLIN)
return 0;
goto cleanup;
}
cleanup:
close(fd);
return -1;
}
/*
* Receive the request/command from the client and pass it to the
* registered request-callback. The request-callback will compose
* a response and call our reply-callback to send it to the client.
*/
static int worker_thread__do_io(
struct ipc_worker_thread_data *worker_thread_data,
int fd)
{
/* ASSERT NOT holding lock */
struct strbuf buf = STRBUF_INIT;
struct ipc_server_reply_data reply_data;
int ret = 0;
reply_data.magic = MAGIC_SERVER_REPLY_DATA;
reply_data.worker_thread_data = worker_thread_data;
reply_data.fd = fd;
ret = read_packetized_to_strbuf(
reply_data.fd, &buf,
PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
if (ret >= 0) {
ret = worker_thread_data->server_data->application_cb(
worker_thread_data->server_data->application_data,
buf.buf, buf.len, do_io_reply_callback, &reply_data);
packet_flush_gently(reply_data.fd);
}
else {
/*
* The client probably disconnected/shutdown before it
* could send a well-formed message. Ignore it.
*/
}
strbuf_release(&buf);
close(reply_data.fd);
return ret;
}
/*
* Block SIGPIPE on the current thread (so that we get EPIPE from
* write() rather than an actual signal).
*
* Note that using sigchain_push() and _pop() to control SIGPIPE
* around our IO calls is not thread safe:
* [] It uses a global stack of handler frames.
* [] It uses ALLOC_GROW() to resize it.
* [] Finally, according to the `signal(2)` man-page:
* "The effects of `signal()` in a multithreaded process are unspecified."
*/
static void thread_block_sigpipe(sigset_t *old_set)
{
sigset_t new_set;
sigemptyset(&new_set);
sigaddset(&new_set, SIGPIPE);
sigemptyset(old_set);
pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}
/*
* Thread proc for an IPC worker thread. It handles a series of
* connections from clients. It pulls the next fd from the queue
* processes it, and then waits for the next client.
*
* Block SIGPIPE in this worker thread for the life of the thread.
* This avoids stray (and sometimes delayed) SIGPIPE signals caused
* by client errors and/or when we are under extremely heavy IO load.
*
* This means that the application callback will have SIGPIPE blocked.
* The callback should not change it.
*/
static void *worker_thread_proc(void *_worker_thread_data)
{
struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
struct ipc_server_data *server_data = worker_thread_data->server_data;
sigset_t old_set;
int fd, io;
int ret;
trace2_thread_start("ipc-worker");
thread_block_sigpipe(&old_set);
for (;;) {
fd = worker_thread__wait_for_connection(worker_thread_data);
if (fd == -1)
break; /* in shutdown */
io = worker_thread__wait_for_io_start(worker_thread_data, fd);
if (io == -1)
continue; /* client hung up without sending anything */
ret = worker_thread__do_io(worker_thread_data, fd);
if (ret == SIMPLE_IPC_QUIT) {
trace2_data_string("ipc-worker", NULL, "queue_stop_async",
"application_quit");
/*
* The application layer is telling the ipc-server
* layer to shutdown.
*
* We DO NOT have a response to send to the client.
*
* Queue an async stop (to stop the other threads) and
* allow this worker thread to exit now (no sense waiting
* for the thread-pool shutdown signal).
*
* Other non-idle worker threads are allowed to finish
* responding to their current clients.
*/
ipc_server_stop_async(server_data);
break;
}
}
trace2_thread_exit();
return NULL;
}
/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
/*
* Accept a new client connection on our socket. This uses non-blocking
* IO so that we can also wait for shutdown requests on our socket-pair
* without actually spinning on a fast timeout.
*/
static int accept_thread__wait_for_connection(
struct ipc_accept_thread_data *accept_thread_data)
{
struct pollfd pollfd[2];
int result;
for (;;) {
pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
pollfd[0].events = POLLIN;
pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
pollfd[1].events = POLLIN;
result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
if (result < 0) {
if (errno == EINTR)
continue;
return result;
}
if (result == 0) {
/* a timeout */
/*
* If someone deletes or force-creates a new unix
* domain socket at our path, all future clients
* will be routed elsewhere and we silently starve.
* If that happens, just queue a shutdown.
*/
if (unix_ss_was_stolen(
accept_thread_data->server_socket)) {
trace2_data_string("ipc-accept", NULL,
"queue_stop_async",
"socket_stolen");
ipc_server_stop_async(
accept_thread_data->server_data);
}
continue;
}
if (pollfd[0].revents & POLLIN) {
/* shutdown message queued to socketpair */
return -1;
}
if (pollfd[1].revents & POLLIN) {
/* a connection is available on server_socket */
int client_fd =
accept(accept_thread_data->server_socket->fd_socket,
NULL, NULL);
if (client_fd >= 0)
return client_fd;
/*
* An error here is unlikely -- it probably
* indicates that the connecting process has
* already dropped the connection.
*/
continue;
}
BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
errno, pollfd[0].revents, pollfd[1].revents);
}
}
/*
* Thread proc for the IPC server "accept thread". This waits for
* an incoming socket connection, appends it to the queue of available
* connections, and notifies a worker thread to process it.
*
* Block SIGPIPE in this thread for the life of the thread. This
* avoids any stray SIGPIPE signals when closing pipe fds under
* extremely heavy loads (such as when the fifo queue is full and we
* drop incomming connections).
*/
static void *accept_thread_proc(void *_accept_thread_data)
{
struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
struct ipc_server_data *server_data = accept_thread_data->server_data;
sigset_t old_set;
trace2_thread_start("ipc-accept");
thread_block_sigpipe(&old_set);
for (;;) {
int client_fd = accept_thread__wait_for_connection(
accept_thread_data);
pthread_mutex_lock(&server_data->work_available_mutex);
if (server_data->shutdown_requested) {
pthread_mutex_unlock(&server_data->work_available_mutex);
if (client_fd >= 0)
close(client_fd);
break;
}
if (client_fd < 0) {
/* ignore transient accept() errors */
}
else {
fifo_enqueue(server_data, client_fd);
pthread_cond_broadcast(&server_data->work_available_cond);
}
pthread_mutex_unlock(&server_data->work_available_mutex);
}
trace2_thread_exit();
return NULL;
}
/*
* We can't predict the connection arrival rate relative to the worker
* processing rate, therefore we allow the "accept-thread" to queue up
* a generous number of connections, since we'd rather have the client
* not unnecessarily timeout if we can avoid it. (The assumption is
* that this will be used for FSMonitor and a few second wait on a
* connection is better than having the client timeout and do the full
* computation itself.)
*
* The FIFO queue size is set to a multiple of the worker pool size.
* This value chosen at random.
*/
#define FIFO_SCALE (100)
/*
* The backlog value for `listen(2)`. This doesn't need to huge,
* rather just large enough for our "accept-thread" to wake up and
* queue incoming connections onto the FIFO without the kernel
* dropping any.
*
* This value chosen at random.
*/
#define LISTEN_BACKLOG (50)
static int create_listener_socket(
const char *path,
const struct ipc_server_opts *ipc_opts,
struct unix_ss_socket **new_server_socket)
{
struct unix_ss_socket *server_socket = NULL;
struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
int ret;
uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
if (ret)
return ret;
if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
int saved_errno = errno;
unix_ss_free(server_socket);
errno = saved_errno;
return -1;
}
*new_server_socket = server_socket;
trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
return 0;
}
static int setup_listener_socket(
const char *path,
const struct ipc_server_opts *ipc_opts,
struct unix_ss_socket **new_server_socket)
{
int ret, saved_errno;
trace2_region_enter("ipc-server", "create-listener_socket", NULL);
ret = create_listener_socket(path, ipc_opts, new_server_socket);
saved_errno = errno;
trace2_region_leave("ipc-server", "create-listener_socket", NULL);
errno = saved_errno;
return ret;
}
/*
* Start IPC server in a pool of background threads.
*/
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data)
{
struct unix_ss_socket *server_socket = NULL;
struct ipc_server_data *server_data;
int sv[2];
int k;
int ret;
int nr_threads = opts->nr_threads;
*returned_server_data = NULL;
/*
* Create a socketpair and set sv[1] to non-blocking. This
* will used to send a shutdown message to the accept-thread
* and allows the accept-thread to wait on EITHER a client
* connection or a shutdown request without spinning.
*/
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
return -1;
if (set_socket_blocking_flag(sv[1], 1)) {
int saved_errno = errno;
close(sv[0]);
close(sv[1]);
errno = saved_errno;
return -1;
}
ret = setup_listener_socket(path, opts, &server_socket);
if (ret) {
int saved_errno = errno;
close(sv[0]);
close(sv[1]);
errno = saved_errno;
return ret;
}
server_data = xcalloc(1, sizeof(*server_data));
server_data->magic = MAGIC_SERVER_DATA;
server_data->application_cb = application_cb;
server_data->application_data = application_data;
strbuf_init(&server_data->buf_path, 0);
strbuf_addstr(&server_data->buf_path, path);
if (nr_threads < 1)
nr_threads = 1;
pthread_mutex_init(&server_data->work_available_mutex, NULL);
pthread_cond_init(&server_data->work_available_cond, NULL);
server_data->queue_size = nr_threads * FIFO_SCALE;
CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
server_data->accept_thread =
xcalloc(1, sizeof(*server_data->accept_thread));
server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
server_data->accept_thread->server_data = server_data;
server_data->accept_thread->server_socket = server_socket;
server_data->accept_thread->fd_send_shutdown = sv[0];
server_data->accept_thread->fd_wait_shutdown = sv[1];
if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
accept_thread_proc, server_data->accept_thread))
die_errno(_("could not start accept_thread '%s'"), path);
for (k = 0; k < nr_threads; k++) {
struct ipc_worker_thread_data *wtd;
wtd = xcalloc(1, sizeof(*wtd));
wtd->magic = MAGIC_WORKER_THREAD_DATA;
wtd->server_data = server_data;
if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
wtd)) {
if (k == 0)
die(_("could not start worker[0] for '%s'"),
path);
/*
* Limp along with the thread pool that we have.
*/
break;
}
wtd->next_thread = server_data->worker_thread_list;
server_data->worker_thread_list = wtd;
}
*returned_server_data = server_data;
return 0;
}
/*
* Gently tell the IPC server treads to shutdown.
* Can be run on any thread.
*/
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
/* ASSERT NOT holding mutex */
int fd;
if (!server_data)
return 0;
trace2_region_enter("ipc-server", "server-stop-async", NULL);
pthread_mutex_lock(&server_data->work_available_mutex);
server_data->shutdown_requested = 1;
/*
* Write a byte to the shutdown socket pair to wake up the
* accept-thread.
*/
if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
error_errno("could not write to fd_send_shutdown");
/*
* Drain the queue of existing connections.
*/
while ((fd = fifo_dequeue(server_data)) != -1)
close(fd);
/*
* Gently tell worker threads to stop processing new connections
* and exit. (This does not abort in-process conversations.)
*/
pthread_cond_broadcast(&server_data->work_available_cond);
pthread_mutex_unlock(&server_data->work_available_mutex);
trace2_region_leave("ipc-server", "server-stop-async", NULL);
return 0;
}
/*
* Wait for all IPC server threads to stop.
*/
int ipc_server_await(struct ipc_server_data *server_data)
{
pthread_join(server_data->accept_thread->pthread_id, NULL);
if (!server_data->shutdown_requested)
BUG("ipc-server: accept-thread stopped for '%s'",
server_data->buf_path.buf);
while (server_data->worker_thread_list) {
struct ipc_worker_thread_data *wtd =
server_data->worker_thread_list;
pthread_join(wtd->pthread_id, NULL);
server_data->worker_thread_list = wtd->next_thread;
free(wtd);
}
server_data->is_stopped = 1;
return 0;
}
void ipc_server_free(struct ipc_server_data *server_data)
{
struct ipc_accept_thread_data * accept_thread_data;
if (!server_data)
return;
if (!server_data->is_stopped)
BUG("cannot free ipc-server while running for '%s'",
server_data->buf_path.buf);
accept_thread_data = server_data->accept_thread;
if (accept_thread_data) {
unix_ss_free(accept_thread_data->server_socket);
if (accept_thread_data->fd_send_shutdown != -1)
close(accept_thread_data->fd_send_shutdown);
if (accept_thread_data->fd_wait_shutdown != -1)
close(accept_thread_data->fd_wait_shutdown);
free(server_data->accept_thread);
}
while (server_data->worker_thread_list) {
struct ipc_worker_thread_data *wtd =
server_data->worker_thread_list;
server_data->worker_thread_list = wtd->next_thread;
free(wtd);
}
pthread_cond_destroy(&server_data->work_available_cond);
pthread_mutex_destroy(&server_data->work_available_mutex);
strbuf_release(&server_data->buf_path);
free(server_data->fifo_fds);
free(server_data);
}
|