/****************************************************************
* *
* Copyright (c) 2008-2023 Fidelity National Information *
* Services, Inc. and/or its subsidiaries. All rights reserved. *
* *
* This source code contains the intellectual property *
* of its copyright holder(s), and is made available *
* under a license. If you do not know the terms of *
* the license, please stop and do not read further. *
* *
****************************************************************/
#include "mdef.h"
#include "gtm_stdio.h"
#include "gtm_string.h"
#include "gtm_unistd.h"
#include "gtm_time.h"
#include <sys/wait.h>
#include <errno.h>
#include "gtm_inet.h"
#include "gdsroot.h"
#include "gdsblk.h"
#include "gtm_facility.h"
#include "fileinfo.h"
#include "gdsbt.h"
#include "gdsfhead.h"
#include "filestruct.h"
#include "repl_shutdcode.h"
#include "gtmrecv.h"
#include "repl_comm.h"
#include "repl_msg.h"
#include "repl_dbg.h"
#include "repl_log.h"
#include "repl_errno.h"
#include "iosp.h"
#include "eintr_wrappers.h"
#include "gt_timer.h"
#include "gtmio.h"
#include "util.h"
#include "tp_change_reg.h"
#include "memcoherency.h"
#include "replgbl.h"
#include "gtmsource.h"
#ifdef GTM_TLS
#include "gtm_repl.h"
#endif
#include "jnl.h"
#include "repl_filter.h"
GBLREF repl_msg_ptr_t gtmrecv_msgp;
GBLREF int gtmrecv_max_repl_msglen;
GBLREF int gtmrecv_listen_sock_fd;
GBLREF int gtmrecv_sock_fd;
GBLREF boolean_t repl_connection_reset;
GBLREF recvpool_addrs recvpool;
GBLREF int gtmrecv_log_fd;
GBLREF FILE *gtmrecv_log_fp;
GBLREF boolean_t gtmrecv_logstats;
GBLREF boolean_t gtmrecv_wait_for_jnl_seqno;
GBLREF boolean_t gtmrecv_bad_trans_sent;
GBLREF uint4 log_interval;
GBLREF volatile time_t gtmrecv_now;
GBLREF boolean_t gtmrecv_send_cmp2uncmp;
GBLREF repl_conn_info_t *remote_side;
GBLREF jnlpool_addrs_ptr_t jnlpool;
GBLREF int gtmrecv_filter;
error_def(ERR_RECVPOOLSETUP);
error_def(ERR_REPLCOMM);
error_def(ERR_TEXT);
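/* Note : where INT8_SUPPORTED, seq_num is a native 8-byte integer; otherwise it is evidently a structure of two
 * 4-byte halves (hence the {0, 0} initializer below).
 */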
#ifdef INT8_SUPPORTED
static seq_num last_ack_seqno = 0;
#else
static seq_num last_ack_seqno = {0, 0};
#endif
#define GTMRECV_NEXT_REPORT_FACTOR 2
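/* Reports about a dead update process are issued at poll counts 1, 2, 4, 8, ... : next_report_at is multiplied by
 * GTMRECV_NEXT_REPORT_FACTOR after each report, so repeated alerts back off exponentially instead of flooding the log.
 */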
enum
{
CONTINUE_POLL,
STOP_POLL
};
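/* Return values of gtmrecv_poll_actions1 : CONTINUE_POLL means more draining remains and the caller should invoke it
 * again; STOP_POLL means the current poll cycle is complete. The gtmrecv_poll_actions wrapper at the bottom of this
 * file loops until STOP_POLL.
 */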
int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned char *buffp)
{
static int report_cnt = 1;
static int next_report_at = 1;
static boolean_t send_xoff = FALSE;
static boolean_t xoff_sent = FALSE;
static seq_num send_seqno;
static boolean_t log_draining_msg = FALSE;
static boolean_t send_badtrans = FALSE;
static boolean_t send_cmp2uncmp = FALSE;
static boolean_t upd_shut_too_early_logged = FALSE;
static time_t last_reap_time = 0;
repl_msg_t xoff_msg;
repl_badtrans_msg_t bad_trans_msg;
boolean_t alert = FALSE, info = FALSE;
int return_status;
gd_region *region_top;
unsigned char *msg_ptr; /* needed for REPL_{SEND,RECV}_LOOP */
int tosend_len, sent_len, sent_this_iter; /* needed for REPL_SEND_LOOP */
int torecv_len, recvd_len, recvd_this_iter; /* needed for REPL_RECV_LOOP */
int status, poll_dir; /* needed for REPL_{SEND,RECV}_LOOP */
int temp_len, pending_msg_size;
int upd_start_status, upd_start_attempts;
int buffered_data_len;
int upd_exit_status;
seq_num temp_send_seqno;
boolean_t bad_trans_detected = FALSE, onln_rlbk_flg_set = FALSE;
recvpool_ctl_ptr_t recvpool_ctl;
upd_proc_local_ptr_t upd_proc_local;
gtmrecv_local_ptr_t gtmrecv_local;
upd_helper_ctl_ptr_t upd_helper_ctl;
pid_t waitpid_res;
int4 msg_type, msg_len = 0;
DCL_THREADGBL_ACCESS;
SETUP_THREADGBL_ACCESS;
recvpool_ctl = recvpool.recvpool_ctl;
upd_proc_local = recvpool.upd_proc_local;
gtmrecv_local = recvpool.gtmrecv_local;
upd_helper_ctl = recvpool.upd_helper_ctl;
if (SHUTDOWN == gtmrecv_local->shutdown)
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Shutdown signalled\n");
gtmrecv_end(); /* Won't return */
}
# ifdef GTM_TLS
/* If we sent a REPL_RENEG_ACK, then we cannot afford to send any more asynchronous messages (like XOFF_ACK_ME) until
 * we receive a REPL_RENEG_COMPLETE from the source server. This ensures that while the source server attempts an
 * SSL/TLS renegotiation, it doesn't have any application data (like XOFF_ACK_ME) sitting in the pipe.
*/
if (REPLTLS_WAITING_FOR_RENEG_COMPLETE == repl_tls.renegotiate_state)
return STOP_POLL;
# endif
/* Reset report_cnt and next_report_at to 1 when a new upd proc is forked */
if ((1 == report_cnt) || (report_cnt == next_report_at))
{
/* A comment on the usage of NO_SHUTDOWN below for the alert variable. Since upd_proc_local->upd_proc_shutdown is
* a shared memory field (and could be concurrently changed by either the receiver server or the update process),
* we want to make sure it is the same value BEFORE and AFTER checking whether the update process is alive or not.
* If it is not NO_SHUTDOWN (i.e. it is SHUTDOWN, NORMAL_SHUTDOWN or ABNORMAL_SHUTDOWN), it has shut down due to
* an external request, so we do NOT want to send out a false update-process-is-not-alive alert.
*/
if ((alert = ((NO_SHUTDOWN == upd_proc_local->upd_proc_shutdown) && (SRV_DEAD == is_updproc_alive())
&& (NO_SHUTDOWN == upd_proc_local->upd_proc_shutdown)))
|| (info = (((NORMAL_SHUTDOWN == upd_proc_local->upd_proc_shutdown)
|| (ABNORMAL_SHUTDOWN == upd_proc_local->upd_proc_shutdown)) && (SRV_DEAD == is_updproc_alive()))))
{
if (alert)
repl_log(gtmrecv_log_fp, TRUE, TRUE,
"ALERT : Receiver Server detected that Update Process is not ALIVE\n");
else
repl_log(gtmrecv_log_fp, TRUE, TRUE,
"INFO : Update process not running due to user initiated shutdown\n");
if (1 == report_cnt)
{
send_xoff = TRUE;
recvpool_ctl->old_jnl_seqno = recvpool_ctl->jnl_seqno;
recvpool_ctl->jnl_seqno = 0;
/* Even though we have identified that the update process is NOT alive, a waitpid on the update
* process PID is necessary so that the system doesn't leave any zombie process lying around.
* This is possible since any child process that dies without the parent doing a waitpid on it
* will be defunct unless the parent dies at which point the "init" process takes the role of
* the parent and invokes waitpid to remove the zombies.
* NOTE: It is possible that the update process was killed before the receiver server got a
* chance to record its PID in the recvpool.upd_proc_local structure. In such a case, don't
* invoke waitpid as that will block us (receiver server) if this instance of the receiver
* server was started with helper processes.
*/
if (0 < upd_proc_local->upd_proc_pid)
{
WAITPID(upd_proc_local->upd_proc_pid, &upd_exit_status, 0, waitpid_res);
/* Since the update process as part of its shutdown does NOT reset the upd_proc_pid, reset
* it here ONLY if the update process was NOT kill -9ed. This is needed because receiver
* server as part of its shutdown relies on this field (upd_proc_pid) to determine if the
* update process was cleanly shutdown or was kill -9ed.
*/
if (!alert)
upd_proc_local->upd_proc_pid = 0;
}
upd_proc_local->bad_trans = FALSE; /* No point in doing bad transaction processing */
upd_proc_local->onln_rlbk_flg = FALSE; /* No point handling online rollback */
}
gtmrecv_wait_for_jnl_seqno = TRUE;
REPL_DPRINT1(
"gtmrecv_poll_actions : Setting gtmrecv_wait_for_jnl_seqno to TRUE because of upd crash/shutdown\n");
next_report_at *= GTMRECV_NEXT_REPORT_FACTOR;
report_cnt++;
}
} else
report_cnt++;
/* Check if REPL_CMP2UNCMP or REPL_BADTRANS message needs to be sent */
if (upd_proc_local->onln_rlbk_flg)
{ /* Update process detected an online rollback and is requesting us to restart the connection. But before that, send
* REPL_XOFF to the source side and drain the replication pipe.
*/
onln_rlbk_flg_set = TRUE;
send_xoff = TRUE;
} else if (!send_cmp2uncmp && gtmrecv_send_cmp2uncmp)
{
send_xoff = TRUE;
send_seqno = recvpool_ctl->jnl_seqno;
send_cmp2uncmp = TRUE;
} else if (!send_badtrans && upd_proc_local->bad_trans)
{
send_xoff = TRUE;
send_seqno = upd_proc_local->read_jnl_seqno;
send_badtrans = TRUE;
bad_trans_detected = TRUE;
} else if (!upd_proc_local->bad_trans && send_badtrans && 1 != report_cnt)
{
send_badtrans = FALSE;
bad_trans_detected = FALSE;
}
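/* Each of the conditions above requests a REPL_XOFF_ACK_ME via send_xoff. xoff_sent records that the message actually
 * went out; once it has, the replication pipe is drained below until the matching REPL_XOFF_ACK arrives.
 */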
if (send_xoff && !xoff_sent)
{ /* Send XOFF_ACK_ME if the receiver has a connection to the source. Do not attempt to send it if we don't even
* know the endianness of the remote side. In that case, we are guaranteed no initial handshake occurred and
* so no point sending the XOFF too. This saves us lots of trouble in case of cross-endian replication connections.
*/
assert((FD_INVALID != gtmrecv_sock_fd) || repl_connection_reset);
if ((FD_INVALID != gtmrecv_sock_fd) && remote_side->endianness_known)
{
send_seqno = upd_proc_local->read_jnl_seqno;
if (!remote_side->cross_endian)
{
xoff_msg.type = REPL_XOFF_ACK_ME;
xoff_msg.len = MIN_REPL_MSGLEN;
memcpy((uchar_ptr_t)&xoff_msg.msg[0], (uchar_ptr_t)&send_seqno, SIZEOF(seq_num));
} else
{
xoff_msg.type = GTM_BYTESWAP_32(REPL_XOFF_ACK_ME);
xoff_msg.len = GTM_BYTESWAP_32(MIN_REPL_MSGLEN);
temp_send_seqno = GTM_BYTESWAP_64(send_seqno);
memcpy((uchar_ptr_t)&xoff_msg.msg[0], (uchar_ptr_t)&temp_send_seqno, SIZEOF(seq_num));
}
REPL_SEND_LOOP(gtmrecv_sock_fd, &xoff_msg, MIN_REPL_MSGLEN, REPL_POLL_NOWAIT)
; /* Empty Body */
if (SS_NORMAL != status)
{
if (REPL_CONN_RESET(status) && (EREPL_SEND == repl_errno))
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Connection reset while sending XOFF_ACK_ME. "
"Status = %d ; %s\n", status, STRERROR(status));
repl_close(&gtmrecv_sock_fd);
repl_connection_reset = TRUE;
xoff_sent = FALSE;
send_badtrans = FALSE;
} else if (EREPL_SEND == repl_errno)
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_LIT("Error sending XOFF msg due to BAD_TRANS or UPD crash/shutdown. "
"Error in send"), status);
else
{
assert(EREPL_SELECT == repl_errno);
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_LIT("Error sending XOFF msg due to BAD_TRANS or UPD crash/shutdown. "
"Error in select"), status);
}
} else
{
xoff_sent = TRUE;
log_draining_msg = TRUE;
}
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL_XOFF_ACK_ME sent due to upd shutdown/crash or bad trans "
"or ONLINE_ROLLBACK\n");
send_xoff = FALSE;
} else
{ /* Connection has been lost OR initial handshake needs to happen again, so no point sending XOFF/BADTRANS */
send_xoff = FALSE;
send_badtrans = FALSE;
}
}
/* Drain pipe */
if (xoff_sent)
{
if (log_draining_msg)
{ /* avoid multiple logs per instance */
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL INFO - Draining replication pipe due to %s\n",
send_cmp2uncmp ? "CMP2UNCMP" : (send_badtrans ? "BAD_TRANS" :
(onln_rlbk_flg_set ? "ONLINE_ROLLBACK" : "UPD shutdown/crash")));
log_draining_msg = FALSE;
}
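/* The drain proceeds in two steps : first discard whatever is already buffered (the remainder of the current message
 * plus any whole messages behind it), then keep reading message headers off the socket until REPL_XOFF_ACK shows up.
 */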
if (0 != *buff_unprocessed)
{ /* Throw away the current contents of the buffer */
buffered_data_len = ((*pending_data_len <= *buff_unprocessed) ? *pending_data_len : *buff_unprocessed);
*buff_unprocessed -= buffered_data_len;
buffp += buffered_data_len;
*pending_data_len -= buffered_data_len;
REPL_DPRINT2("gtmrecv_poll_actions : (1) Throwing away %d bytes from old buffer while draining\n",
buffered_data_len);
assert(remote_side->endianness_known); /* only then is remote_side->cross_endian reliable */
while (REPL_MSG_HDRLEN <= *buff_unprocessed)
{
assert(0 == (((unsigned long)buffp) % REPL_MSG_ALIGN));
msg_len = ((repl_msg_ptr_t)buffp)->len;
msg_type = ((repl_msg_ptr_t)buffp)->type;
if (remote_side->cross_endian)
{
msg_len = GTM_BYTESWAP_32(msg_len);
msg_type = GTM_BYTESWAP_32(msg_type);
}
msg_type = (msg_type & REPL_TR_CMP_MSG_TYPE_MASK);
assert((REPL_TR_CMP_JNL_RECS == msg_type) || (0 == (msg_len % REPL_MSG_ALIGN)));
*pending_data_len = ROUND_UP2(msg_len, REPL_MSG_ALIGN);
buffered_data_len = ((*pending_data_len <= *buff_unprocessed) ?
*pending_data_len : *buff_unprocessed);
*buff_unprocessed -= buffered_data_len;
buffp += buffered_data_len;
*pending_data_len -= buffered_data_len;
REPL_DPRINT3("gtmrecv_poll_actions : (1) Throwing away message of "
"type %d and length %d from old buffer while draining\n", msg_type, buffered_data_len);
}
if (0 < *buff_unprocessed)
{
memmove((unsigned char *)gtmrecv_msgp, buffp, *buff_unprocessed);
REPL_DPRINT2("gtmrecv_poll_actions : Incomplete header of length %d while draining\n",
*buff_unprocessed);
}
}
status = SS_NORMAL;
if (0 != *buff_unprocessed || 0 == *pending_data_len)
{ /* Receive the header of a message */
assert(REPL_MSG_HDRLEN > *buff_unprocessed); /* so we don't pass negative length in REPL_RECV_LOOP */
REPL_RECV_LOOP(gtmrecv_sock_fd, ((unsigned char *)gtmrecv_msgp) + *buff_unprocessed,
(REPL_MSG_HDRLEN - *buff_unprocessed), REPL_POLL_WAIT)
; /* Empty Body */
if (SS_NORMAL == status)
{
assert(remote_side->endianness_known); /* only then is remote_side->cross_endian reliable */
if (!remote_side->cross_endian)
{
msg_len = gtmrecv_msgp->len;
msg_type = gtmrecv_msgp->type;
} else
{
msg_len = GTM_BYTESWAP_32(gtmrecv_msgp->len);
msg_type = GTM_BYTESWAP_32(gtmrecv_msgp->type);
}
msg_type = (msg_type & REPL_TR_CMP_MSG_TYPE_MASK);
assert((REPL_TR_CMP_JNL_RECS == msg_type) || (0 == (msg_len % REPL_MSG_ALIGN)));
msg_len = ROUND_UP2(msg_len, REPL_MSG_ALIGN);
REPL_DPRINT3("gtmrecv_poll_actions : Received message of type %d and length %d while draining\n",
msg_type, msg_len);
}
}
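/* If the header just read (or already buffered) is the REPL_XOFF_ACK, receive the rest of that message and treat the
 * drain as complete; any other message type is discarded wholesale below.
 */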
if ((SS_NORMAL == status) && (0 != *buff_unprocessed || 0 == *pending_data_len) && (REPL_XOFF_ACK == msg_type))
{ /* Receive the rest of the XOFF_ACK msg and signal the drain as complete */
REPL_RECV_LOOP(gtmrecv_sock_fd, gtmrecv_msgp, (MIN_REPL_MSGLEN - REPL_MSG_HDRLEN), REPL_POLL_WAIT)
; /* Empty Body */
if (SS_NORMAL == status)
{
repl_log(gtmrecv_log_fp, TRUE, TRUE,
"REPL INFO - XOFF_ACK received. Drained replication pipe completely\n");
upd_shut_too_early_logged = FALSE;
xoff_sent = FALSE;
return_status = STOP_POLL;
}
} else if (SS_NORMAL == status)
{ /* Drain the rest of the message */
if (0 < *pending_data_len)
{
pending_msg_size = *pending_data_len;
REPL_DPRINT2("gtmrecv_poll_actions : (2) Throwing away %d bytes from pipe\n", pending_msg_size);
} else
{
pending_msg_size = msg_len - REPL_MSG_HDRLEN;
REPL_DPRINT3("gtmrecv_poll_actions : (2) Throwing away message of "
"type %d and length %d from pipe\n", msg_type, msg_len);
}
for ( ; SS_NORMAL == status && 0 < pending_msg_size; pending_msg_size -= gtmrecv_max_repl_msglen)
{
temp_len = (pending_msg_size < gtmrecv_max_repl_msglen)? pending_msg_size : gtmrecv_max_repl_msglen;
REPL_RECV_LOOP(gtmrecv_sock_fd, gtmrecv_msgp, temp_len, REPL_POLL_WAIT)
; /* Empty Body */
}
*buff_unprocessed = 0; *pending_data_len = 0;
if (SS_NORMAL == status && info && !upd_shut_too_early_logged)
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "ALERT : User initiated shutdown of Update Process done "
"when there was data in the replication pipe\n");
upd_shut_too_early_logged = TRUE;
}
return_status = CONTINUE_POLL;
}
if (SS_NORMAL != status)
{
if (EREPL_RECV == repl_errno)
{
if (REPL_CONN_RESET(status))
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Connection reset while receiving XOFF_ACK. "
"Status = %d ; %s\n", status, STRERROR(status));
repl_close(&gtmrecv_sock_fd);
repl_connection_reset = TRUE;
xoff_sent = FALSE;
send_badtrans = FALSE;
return_status = STOP_POLL;
} else
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_LIT("Error while draining replication pipe. Error in recv"), status);
} else
{
assert(EREPL_SELECT == repl_errno);
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_LIT("Error while draining replication pipe. Error in select"), status);
}
}
} else
return_status = STOP_POLL;
/* Like was done before for the XOFF_ACK_ME message, send a BADTRANS/CMP2UNCMP message only if we know
* the endianness of the other side. If not, there is no point in sending one anyway, and skipping it saves us
* trouble in case of cross-endian replication connections.
*/
if ((STOP_POLL == return_status) && (send_badtrans || send_cmp2uncmp)
&& (FD_INVALID != gtmrecv_sock_fd) && remote_side->endianness_known)
{ /* Send REPL_BADTRANS or REPL_CMP2UNCMP message */
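/* Both message types carry the restart seqno. REPL_BADTRANS asks the source server to resend starting at send_seqno;
 * REPL_CMP2UNCMP additionally requests a fall-back from compressed to uncompressed operation (see the REPL_DPRINT1
 * message further down).
 */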
if (!remote_side->cross_endian)
{
bad_trans_msg.type = send_cmp2uncmp ? REPL_CMP2UNCMP : REPL_BADTRANS;
bad_trans_msg.len = MIN_REPL_MSGLEN;
bad_trans_msg.start_seqno = send_seqno;
} else
{
bad_trans_msg.type = send_cmp2uncmp ? GTM_BYTESWAP_32(REPL_CMP2UNCMP) : GTM_BYTESWAP_32(REPL_BADTRANS);
bad_trans_msg.len = GTM_BYTESWAP_32(MIN_REPL_MSGLEN);
bad_trans_msg.start_seqno = GTM_BYTESWAP_64(send_seqno);
}
REPL_SEND_LOOP(gtmrecv_sock_fd, &bad_trans_msg, MIN_REPL_MSGLEN, REPL_POLL_NOWAIT)
; /* Empty Body */
if (SS_NORMAL == status)
{
if (send_cmp2uncmp)
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL_CMP2UNCMP message sent with seqno %llu\n", send_seqno);
else
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL_BADTRANS message sent with seqno %llu\n", send_seqno);
} else
{
if (REPL_CONN_RESET(status) && (EREPL_SEND == repl_errno))
{
if (send_cmp2uncmp)
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Connection reset while sending REPL_CMP2UNCMP. "
"Status = %d ; %s\n", status, STRERROR(status));
} else
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Connection reset while sending REPL_BADTRANS. "
"Status = %d ; %s\n", status, STRERROR(status));
}
repl_close(&gtmrecv_sock_fd);
repl_connection_reset = TRUE;
return_status = STOP_POLL;
} else if (EREPL_SEND == repl_errno)
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_LIT("Error sending REPL_BADTRANS/REPL_CMP2UNCMP. Error in send"), status);
else
{
assert(EREPL_SELECT == repl_errno);
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_LIT("Error sending REPL_BADTRANS/REPL_CMP2UNCMP. Error in select"), status);
}
}
send_badtrans = FALSE;
if (send_cmp2uncmp)
{
REPL_DPRINT1("gtmrecv_poll_actions : Setting gtmrecv_wait_for_jnl_seqno to TRUE because this receiver"
"server requested a fall-back from compressed to uncompressed operation\n");
gtmrecv_wait_for_jnl_seqno = TRUE;/* set this to TRUE to break out and go back to a fresh "do_main_loop" */
gtmrecv_bad_trans_sent = TRUE;
gtmrecv_send_cmp2uncmp = FALSE;
send_cmp2uncmp = FALSE;
}
}
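/* Reset the receive pool and restart processing if (a) a bad transaction was detected (and REPL_BADTRANS sent),
 * (b) an online rollback was flagged, or (c) an update process (re)start was requested after an earlier crash report.
 */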
if ((upd_proc_local->bad_trans && bad_trans_detected) || onln_rlbk_flg_set
|| ((UPDPROC_START == upd_proc_local->start_upd) && (1 != report_cnt)))
{
if (UPDPROC_START == upd_proc_local->start_upd)
{
assert(is_updproc_alive() != SRV_ALIVE);
upd_proc_local->upd_proc_shutdown = NO_SHUTDOWN;
}
recvpool_ctl->wrapped = FALSE;
recvpool_ctl->write_wrap = recvpool_ctl->recvpool_size;
recvpool_ctl->write = 0;
/* Reset last_rcvd_histinfo, last_valid_histinfo etc. as they reflect context from unprocessed data
* in the receive pool and those are no longer valid because we have drained the receive pool.
*/
GTMRECV_CLEAR_CACHED_HISTINFO(recvpool.recvpool_ctl, jnlpool, INSERT_STRM_HISTINFO_FALSE);
if (UPDPROC_START == upd_proc_local->start_upd)
{
/* Attempt starting the update process */
for (upd_start_attempts = 0;
UPDPROC_START_ERR == (upd_start_status = gtmrecv_upd_proc_init(FALSE)) &&
GTMRECV_MAX_UPDSTART_ATTEMPTS > upd_start_attempts;
upd_start_attempts++)
{
if (EREPL_UPDSTART_SEMCTL == repl_errno || EREPL_UPDSTART_BADPATH == repl_errno)
{
gtmrecv_autoshutdown();
} else if (EREPL_UPDSTART_FORK == repl_errno)
{
/* Couldn't start up update now, can try later */
LONG_SLEEP(GTMRECV_WAIT_FOR_PROC_SLOTS);
continue;
} else if (EREPL_UPDSTART_EXEC == repl_errno)
{
/* In forked child, could not exec, should exit */
gtmrecv_exit(ABNORMAL_SHUTDOWN);
}
}
if (UPDPROC_STARTED == (upd_proc_local->start_upd = upd_start_status))
{
REPL_DPRINT1("gtmrecv_poll_actions : Setting gtmrecv_wait_for_jnl_seqno to TRUE because of "
"upd restart\n");
gtmrecv_wait_for_jnl_seqno = TRUE;
report_cnt = next_report_at = 1;
if (send_xoff && (FD_INVALID == gtmrecv_sock_fd))
{
/* Update start command was issued before connection was established,
* no point in sending XOFF. */
send_xoff = FALSE;
}
} else
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "%d failed attempts to fork update process. Try later\n",
upd_start_attempts);
}
} else
{
gtmrecv_wait_for_jnl_seqno = TRUE;/* set this to TRUE to break out and go back to a fresh "do_main_loop" */
if (onln_rlbk_flg_set)
{
assert(jnlpool && (NULL != jnlpool->jnlpool_ctl));
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Closing connection due to ONLINE ROLLBACK\n");
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL INFO - Current Jnlpool Seqno : %llu\n",
jnlpool->jnlpool_ctl->jnl_seqno);
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL INFO - Current Receive Pool Seqno : %llu\n",
recvpool_ctl->jnl_seqno);
repl_close(&gtmrecv_sock_fd);
repl_connection_reset = TRUE;
xoff_sent = FALSE;
send_badtrans = FALSE;
upd_proc_local->onln_rlbk_flg = FALSE;
/* Before restarting afresh, sync the online rollback cycles. This way any future grab_lock that
* we do after restarting should not realize an unhandled online rollback. For receiver, it is
* just syncing the journal pool cycles as the databases are not opened. But, to be safe, grab
* the lock and sync the cycles.
*/
grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, GRAB_LOCK_ONLY);
SYNC_ONLN_RLBK_CYCLES;
rel_lock(jnlpool->jnlpool_dummy_reg);
return_status = STOP_POLL;
recvpool_ctl->jnl_seqno = 0;
} else
{
REPL_DPRINT1("gtmrecv_poll_actions : Setting gtmrecv_wait_for_jnl_seqno to TRUE because bad trans"
"sent\n");
gtmrecv_bad_trans_sent = TRUE;
upd_proc_local->bad_trans = FALSE;
recvpool_ctl->jnl_seqno = upd_proc_local->read_jnl_seqno;
}
}
}
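/* The remaining housekeeping (changelog, statistics logging, filter shutdown and helper management) is deferred until
 * no partial message is pending (0 == *pending_data_len) so it never interleaves with an in-flight message.
 */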
if ((0 == *pending_data_len) && (0 != gtmrecv_local->changelog))
{
if (gtmrecv_local->changelog & REPLIC_CHANGE_LOGINTERVAL)
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Changing log interval from %u to %u\n",
log_interval, gtmrecv_local->log_interval);
log_interval = gtmrecv_local->log_interval;
gtmrecv_reinit_logseqno(); /* will force a LOG on the first recv following the interval change */
}
if (gtmrecv_local->changelog & REPLIC_CHANGE_LOGFILE)
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Changing log file to %s\n", gtmrecv_local->log_file);
repl_log_init(REPL_GENERAL_LOG, &gtmrecv_log_fd, gtmrecv_local->log_file);
repl_log_fd2fp(&gtmrecv_log_fp, gtmrecv_log_fd);
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Change log to %s successful\n", gtmrecv_local->log_file);
}
/* NOTE: update process and receiver each ignore any setting specific to the other (REPLIC_CHANGE_UPD_LOGINTERVAL,
* REPLIC_CHANGE_LOGINTERVAL) */
if (REPLIC_CHANGE_LOGINTERVAL == gtmrecv_local->changelog)
upd_proc_local->changelog = 0;
else
upd_proc_local->changelog = gtmrecv_local->changelog; /* Pass changelog request to the update process */
gtmrecv_local->changelog = 0;
}
if (0 == *pending_data_len && !gtmrecv_logstats && gtmrecv_local->statslog)
{
gtmrecv_logstats = TRUE;
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Begin statistics logging\n");
} else if (0 == *pending_data_len && gtmrecv_logstats && !gtmrecv_local->statslog)
{
gtmrecv_logstats = FALSE;
/* Force all data out to the file before closing the file */
repl_log(gtmrecv_log_fp, TRUE, TRUE, "End statistics logging\n");
}
if ((0 == *pending_data_len) && (gtmrecv_filter & EXTERNAL_FILTER) && ('\0' == gtmrecv_local->filter_cmd[0]))
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Stopping filter\n");
repl_stop_filter();
gtmrecv_filter &= ~EXTERNAL_FILTER;
}
if (0 == *pending_data_len)
{
if (upd_helper_ctl->start_helpers)
{
gtmrecv_helpers_init(upd_helper_ctl->start_n_readers, upd_helper_ctl->start_n_writers);
upd_helper_ctl->start_helpers = FALSE;
}
if (HELPER_REAP_NONE != (status = upd_helper_ctl->reap_helpers) ||
(double)GTMRECV_REAP_HELPERS_INTERVAL <= difftime(gtmrecv_now, last_reap_time))
{
gtmrecv_reap_helpers(HELPER_REAP_WAIT == status);
last_reap_time = gtmrecv_now;
}
}
return (return_status);
}
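/* External entry point : repeatedly invoke gtmrecv_poll_actions1 on local copies of the lengths until the current
 * poll cycle completes (i.e. until it stops returning CONTINUE_POLL).
 */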
int gtmrecv_poll_actions(int pending_data_len, int buff_unprocessed, unsigned char *buffp)
{
while (CONTINUE_POLL == gtmrecv_poll_actions1(&pending_data_len, &buff_unprocessed, buffp))
;
return (SS_NORMAL);
}