1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
|
/****************************************************************
* *
* Copyright (c) 2006-2023 Fidelity National Information *
* Services, Inc. and/or its subsidiaries. All rights reserved. *
* *
* This source code contains the intellectual property *
* of its copyright holder(s), and is made available *
* under a license. If you do not know the terms of *
* the license, please stop and do not read further. *
* *
****************************************************************/
#ifndef GTMRECV_H
#define GTMRECV_H
/* Needs mdef.h, gdsfhead.h and its dependencies, and iosp.h */
#define DEFAULT_SHUTDOWN_TIMEOUT 30 /* seconds */
#define MAX_FILTER_CMD_LEN 512 /* characters */
#define UPD_HELPERS_DELIM ','
#define MAX_UPD_HELPERS 128 /* Max helper process (incl. readers and writers) one instance can support */
#define MIN_UPD_HELPERS 1 /* Minimum number of helper processes, one for reading or writing */
#define DEFAULT_UPD_HELPERS 8 /* If value for -HELPERS is not specified, start these many helpers. Change
* DEFAULT_UPD_HELPERS_STR if you change DEFAULT_UPD_HELPERS */
#define DEFAULT_UPD_HELP_READERS 5 /* If -HELPERS is not specified, or specified as -HELPERS=,n start these many
* readers. Change DEFAULT_UPD_HELPERS_STR if you change DEFAULT_UPD_HELP_READERS */
#define DEFAULT_UPD_HELPERS_STR "8,5" /* Built as "DEFAULT_UPD_HELPERS,DEFAULT_UPD_HELP_READERS". Maintain DEFAULT for
* /helpers in vvms:mupip_cmd.cld in sync with DEFAULT_UPD_HELPERS_STR */
#ifdef VMS
#define MAX_GSEC_KEY_LEN 32 /* 31 is allowed + 1 for NULL terminator */
#endif
typedef enum
{
GTMRECV_DUMMY_STATE = 0,
GTMRECV_START,
GTMRECV_WAITING_FOR_CONNECTION,
GTMRECV_RECEIVING_MSGS,
GTMRECV_WAITING_FOR_UPD_CRASH_RESTART,
GTMRECV_WAITING_FOR_UPD_SHUT_RESTART
} gtmrecv_state_t;
enum
{
UPDPROC_STARTED,
UPDPROC_START,
UPDPROC_EXISTS,
UPDPROC_START_ERR
};
enum
{
GTMRECV_NO_RESTART,
GTMRECV_RCVR_RESTARTED,
GTMRECV_UPD_RESTARTED
};
enum
{
HELPER_REAP_NONE = 0,
HELPER_REAP_NOWAIT,
HELPER_REAP_WAIT
};
#define GTMRECV_WAIT_FOR_PROC_SLOTS 1 /* s */
#define GTMRECV_WAIT_FOR_UPDSTART (1000 - 1) /* ms */
#define GTMRECV_WAIT_FOR_UPD_SHUTDOWN 10 /* ms */
#define GTMRECV_MAX_UPDSTART_ATTEMPTS 16
#define GTMRECV_WAIT_FOR_RECVSTART (1000 - 1) /* ms */
#define GTMRECV_WAIT_FOR_SRV_START 10 /* ms */
#define GTMRECV_REAP_HELPERS_INTERVAL 300 /* s */
#define SRV_ALIVE 0x0
#define SRV_DEAD 0x1
#define SRV_ERR 0x2
/* The exit status of checkhealth is BIT-OR of the Receiver status and the
* Update status */
#define RECEIVER_SRV_ALIVE 0x00
#define RECEIVER_SRV_DEAD 0x01
#define RECEIVER_CHECKHEALTH_ERR 0x02
#define UPDATE_PROC_ALIVE 0x00
#define UPDATE_PROC_DEAD 0x04
#define UPDATE_CHECKHEALTH_ERR 0x08
#define RECVPOOL_SEGMENT 'R'
#define DEFAULT_RECVPOOL_SIZE (2 << 25) /* 64MiB */
#define MIN_RECVPOOL_SIZE (2 << 19) /* 1MiB */
#define MAX_RECVPOOL_SIZE (0x1000000000LL) /* 64GB */
#define GTMRECV_MIN_TCP_SEND_BUFSIZE (512) /* anything less than this, issue a warning */
#define GTMRECV_TCP_SEND_BUFSIZE (1024) /* not much outbound traffic, we can live with a low limit */
#define GTMRECV_MIN_TCP_RECV_BUFSIZE (16 * 1024) /* anything less than this, issue a warning */
#define GTMRECV_TCP_RECV_BUFSIZE_INCR (32 * 1024) /* attempt to get a larger buffer with this increment */
#define GTMRECV_TCP_RECV_BUFSIZE (1024 * 1024) /* desirable to set the buffer size to be able to receive large chunks */
#define IS_RCVR_SRVR_FALSE FALSE
#define IS_RCVR_SRVR_TRUE TRUE
#include <pthread.h>
/* Note: fields shared between the receiver and update processes
* really need to have memory barriers or other appropriate
* synchronization constructs to ensure changes by one
* process are actually seen by the other process. Cache
* line spacing should also be taken into account.
* Adding volatile is only a start at this.
*/
typedef struct
{
replpool_identifier recvpool_id; /* Shared memory identification */
volatile seq_num jnl_seqno; /* Sequence number of the next transaction expected to be received from source
* server. Updated by Receiver Server */
seq_num old_jnl_seqno; /* Stores the value of jnl_seqno before it is set to 0 when upd crash/shut */
repl_conn_info_t this_side; /* Replication connection details of this side/instance */
gtm_uint64_t recvdata_base_off; /* Receive pool offset from where journal data starts */
gtm_uint64_t recvpool_size; /* Available space for journal data in bytes */
volatile gtm_uint64_t write; /* Relative offset from recvdata_base_off for for the next journal record to be
* written. Updated by Receiver Server */
volatile gtm_uint64_t write_wrap; /* Relative offset from recvdata_base_off where write was wrapped by recvr srvr */
volatile uint4 wrapped; /* Boolean, set by Receiver Server when it wraps. Reset by Update Process when it
* wraps. Used for detecting space used in the receive pool */
uint4 initialized; /* Boolean, has receive pool been initialized? */
uint4 fresh_start; /* Boolean, fresh_start or crash_start? */
repl_histinfo last_rcvd_histinfo; /* history from the last received REPL_HISTREC message */
repl_histinfo last_valid_histinfo; /* history corresponding to last logical record written
* into receive pool by the receiver. This is almost always
* the same as last_rcvd_histinfo except in the window
* between receiving a new REPL_HISTREC and the first
* logical records corresponding to it.
*/
repl_histinfo last_rcvd_strm_histinfo[MAX_SUPPL_STRMS]; /* same as last_rcvd_histinfo but for each
* non-supplementary stream. Used only in a
* propagating supplementary instance.
*/
repl_histinfo last_valid_strm_histinfo[MAX_SUPPL_STRMS]; /* same as last_valid_histinfo but for each
* non-supplementary stream. Used only in a
* propagating supplementary instance.
*/
boolean_t is_valid_strm_histinfo[MAX_SUPPL_STRMS]; /* TRUE if corresponding entry in
* "last_rcvd_strm_histinfo[]" array is valid
*/
uint4 max_strm_histinfo; /* maximum valid index in "is_valid_strm_histinfo" array */
boolean_t insert_strm_histinfo; /* true for a supplementary propagating primary if the receiver
* has to insert REPL_HISTREC records for each valid stream
* into the receive pool.
*/
pthread_mutex_t write_updated_ctl;
pthread_cond_t write_updated;
} recvpool_ctl_struct;
#define INSERT_STRM_HISTINFO_FALSE FALSE
#define INSERT_STRM_HISTINFO_TRUE TRUE
#define GTMRECV_CLEAR_CACHED_HISTINFO(RECVPOOL_CTL, JNLPOOL, INSERT_STRM_HISTINFO) \
{ \
memset(&RECVPOOL_CTL->last_rcvd_histinfo, 0, SIZEOF(RECVPOOL_CTL->last_rcvd_histinfo)); \
memset(&RECVPOOL_CTL->last_valid_histinfo, 0, SIZEOF(RECVPOOL_CTL->last_valid_histinfo)); \
assert((NULL != JNLPOOL) && (NULL != JNLPOOL->repl_inst_filehdr)); \
assert(NULL != JNLPOOL->jnlpool_ctl); \
if (JNLPOOL->repl_inst_filehdr->is_supplementary && JNLPOOL->jnlpool_ctl->upd_disabled) \
{ /* The below fields are used only in case of a supplementary instance where updates are disabled. \
* So avoid initializing them in any other case. \
*/ \
memset(&RECVPOOL_CTL->last_rcvd_strm_histinfo[0], 0, SIZEOF(RECVPOOL_CTL->last_rcvd_strm_histinfo)); \
memset(&RECVPOOL_CTL->last_valid_strm_histinfo[0], 0, SIZEOF(RECVPOOL_CTL->last_valid_strm_histinfo)); \
memset(&RECVPOOL_CTL->is_valid_strm_histinfo[0], 0, SIZEOF(RECVPOOL_CTL->is_valid_strm_histinfo)); \
RECVPOOL_CTL->max_strm_histinfo = 0; \
assert((0 == RECVPOOL_CTL->jnl_seqno) \
|| ((0 < RECVPOOL_CTL->jnl_seqno) && (RECVPOOL_CTL->jnl_seqno >= JNLPOOL->jnlpool_ctl->jnl_seqno))); \
assert(0 < JNLPOOL->jnlpool_ctl->jnl_seqno); \
RECVPOOL_CTL->insert_strm_histinfo = INSERT_STRM_HISTINFO; \
} \
}
/*
* The following structure contains Update Process related data items.
* Maintaining this structure in the Receive pool provides for
* persistence across instantiations of the Update Process (across crashes,
* the receive pool is preserved)
*/
typedef struct
{
uint4 upd_proc_pid; /* Process identification of update server */
uint4 upd_proc_pid_prev; /* Save for reporting old pid if we fail */
volatile seq_num read_jnl_seqno; /* Next jnl_seqno to be read; keep aligned at 8 byte boundary for performance */
volatile gtm_uint64_t read; /* Relative offset from recvdata_base_off of the next journal record to be
* read from the receive pool */
volatile uint4 upd_proc_shutdown; /* Used to communicate shutdown related values between Receiver and Update */
volatile int4 upd_proc_shutdown_time; /* Time allowed for update process to shut down */
volatile uint4 bad_trans; /* Boolean, set by Update Process that it received a bad transaction record */
volatile uint4 changelog; /* Boolean - change the log file */
int4 start_upd; /* Used to communicate upd only startup values */
volatile uint4 log_interval; /* Interval (in seqnos) at which update process logs its progress */
char log_file[MAX_FN_LEN + 1];
volatile uint4 onln_rlbk_flg; /* Set to TRUE every time update process sees an online rollback. Set to FALSE ONLY
* by receiver server */
} upd_proc_local_struct;
/*
* The following structure contains data items local to the Receiver Server,
* but are in the Receive Pool to provide for persistence across instantiations
* of the Receiver Server (across Receiver Server crashes, the Receive
* Pool is preserved).
*/
typedef struct
{
uint4 recv_serv_pid; /* Process identification of receiver server */
int4 lastrecvd_time; /* unused */
/* Data items used in communicating action qualifiers (show statistics, shutdown) and
* qualifier values (log file, shutdown time, etc). */
volatile uint4 statslog; /* Boolean - detailed log on/off? */
volatile uint4 shutdown; /* Used to communicate shutdown related values between process initiating shutdown
* and Receiver Server */
int4 shutdown_time; /* Time allowed for shutdown in seconds */
int4 listen_port; /* Port at which the Receiver Server is listening */
volatile uint4 restart; /* Used by receiver server to coordinate crash restart with update process */
volatile uint4 changelog; /* Boolean - change the log file */
volatile uint4 log_interval; /* Interval (in seqnos) at which receiver logs its progress */
char filter_cmd[MAX_FILTER_CMD_LEN]; /* Receiver filters incoming records using this process */
char log_file[MAX_FN_LEN + 1]; /* File to log receiver progress */
char statslog_file[MAX_FN_LEN + 1]; /* File to log statistics */
repl_conn_info_t remote_side; /* Details of the remote side/instance of the connection */
int4 strm_index;
boolean_t updateresync; /* Copy of gtmrecv_options.updateresync; This is cleared once first history
* record gets applied on the receiver after the first connect with a source.
*/
boolean_t noresync; /* Copy of gtmrecv_options.noresync; This is cleared once first history
* record gets applied on the receiver after the first connect with a source.
*/
int updresync_instfile_fd; /* fd of the instance file name specified in -UPDATERESYNC= */
int4 updresync_num_histinfo; /* "num_histinfo" member of instance file header from -UPDATERESYNC=<INSTFILE> */
boolean_t updresync_cross_endian; /* is the -updateresync instance file cross endian relative to current instance */
int4 updresync_num_histinfo_strm[MAX_SUPPL_STRMS]; /* "last_histinfo_num[]" member of instance file header
* from -UPDATERESYNC=<INSTFILE> */
repl_inst_uuid updresync_lms_group; /* "lms_group_info" member of instance file header from -UPDATERESYNC=<INSTFILE> */
seq_num updresync_jnl_seqno; /* "jnl_seqno" member of instance file header from -UPDATERESYNC=<INSTFILE> */
repl_inst_uuid remote_lms_group; /* "lms_group_info" member of remote instance file header.
* Initialized only if the receiving instance is a supplementary root primary. */
} gtmrecv_local_struct;
#ifdef VMS
typedef struct
{
char name[MAX_GSEC_KEY_LEN];
struct dsc$descriptor_s desc;
char filler[3];
} vms_shm_key;
#endif
/*
* The following structure contains data items local to the Update Helpers,
* but are in the Receive Pool to provide for persistence across instantiations
* of the Helpers (the Receive Pool is preserved across helper crashes).
*/
typedef struct
{
uint4 helper_pid; /* Owner of this entry. Non-zero indicates entry occupied */
uint4 helper_pid_prev;/* Copy of helper_pid, used to recognize helpers that are now gone and salvage entries */
uint4 helper_type; /* READER or WRITER */
volatile uint4 helper_shutdown;/* used to communicate to the helpers to shut down */
} upd_helper_entry_struct;
typedef struct
{
global_latch_t pre_read_lock; /* operated by pre-readers. Used to control access to next_read_offset */
volatile uint4 pre_read_offset; /* updated by updproc, read-only by pre-readers */
volatile boolean_t first_done; /* pre-readers use this to elect ONE that computes where to begin/resume */
volatile uint4 next_read_offset; /* offset in recvpool of the next record to be pre-read by pre-readers */
uint4 start_helpers; /* TRUE: receiver to start helpers, FALSE: receiver finished helper start */
uint4 start_n_readers; /* start/started these many readers */
uint4 start_n_writers; /* start/started these many writers */
uint4 reap_helpers; /* receiver to salvage slots vacated by dead helpers */
upd_helper_entry_struct helper_list[MAX_UPD_HELPERS]; /* helper information */
} upd_helper_ctl_struct;
/*
* Receive pool shared memory layout -
*
* recvpool_ctl_struct
* upd_proc_local_struct
* gtmrecv_local_struct
* upd_helper_ctl_struct
* zero or more journal records
*/
#define RECVPOOL_CTL_SIZE ROUND_UP(SIZEOF(recvpool_ctl_struct), CACHELINE_SIZE)
#define UPD_PROC_LOCAL_SIZE ROUND_UP(SIZEOF(upd_proc_local_struct), CACHELINE_SIZE)
#define GTMRECV_LOCAL_SIZE ROUND_UP(SIZEOF(gtmrecv_local_struct), CACHELINE_SIZE)
#define UPD_HELPER_CTL_SIZE ROUND_UP(SIZEOF(upd_helper_ctl_struct), CACHELINE_SIZE)
#define RECVDATA_BASE_OFF ROUND_UP(RECVPOOL_CTL_SIZE + UPD_HELPER_CTL_SIZE + GTMRECV_LOCAL_SIZE + UPD_HELPER_CTL_SIZE, \
JNL_REC_START_BNDRY)
#if defined(__osf__) && defined(__alpha)
# pragma pointer_size(save)
# pragma pointer_size(long)
#endif
typedef recvpool_ctl_struct *recvpool_ctl_ptr_t;
typedef upd_proc_local_struct *upd_proc_local_ptr_t;
typedef gtmrecv_local_struct *gtmrecv_local_ptr_t;
typedef upd_helper_entry_struct *upd_helper_entry_ptr_t;
typedef upd_helper_ctl_struct *upd_helper_ctl_ptr_t;
#if defined(__osf__) && defined(__alpha)
# pragma pointer_size(restore)
#endif
typedef struct
{
recvpool_ctl_ptr_t recvpool_ctl;
upd_proc_local_ptr_t upd_proc_local;
gtmrecv_local_ptr_t gtmrecv_local;
upd_helper_ctl_ptr_t upd_helper_ctl;
sm_uc_ptr_t recvdata_base;
#ifdef UNIX
gd_region *recvpool_dummy_reg;
#elif VMS
int4 shm_range[2];
int4 shm_lockid;
vms_shm_key vms_recvpool_key;
#endif
} recvpool_addrs;
typedef enum
{
UPDPROC,
UPD_HELPER_READER,
UPD_HELPER_WRITER,
GTMRECV,
# ifdef UNIX
GTMZPEEK
# endif
# ifdef VMS
GTMRECV_CHILD
# endif
} recvpool_user;
typedef struct
{
boolean_t start;
boolean_t shut_down;
boolean_t checkhealth;
boolean_t statslog;
boolean_t showbacklog;
boolean_t updateonly;
boolean_t stopsourcefilter;
boolean_t changelog;
gtm_uint64_t buffsize;
int4 shutdown_time;
int4 listen_port;
boolean_t updateresync;
boolean_t noresync;
uint4 rcvr_log_interval;
uint4 upd_log_interval;
boolean_t helpers;
boolean_t reuse_specified;
boolean_t resume_specified;
boolean_t initialize_specified;
int4 resume_strm_num;
int4 n_readers;
int4 n_writers;
int4 cmplvl;
char log_file[MAX_FN_LEN + 1];
char updresync_instfilename[MAX_FN_LEN + 1];
char filter_cmd[MAX_FILTER_CMD_LEN];
char reuse_instname[MAX_INSTNAME_LEN];
boolean_t autorollback;
boolean_t autorollback_verbose;
boolean_t stopreceiverfilter;
int4 send_buffsize;
int4 recv_buffsize;
} gtmrecv_options_t;
#include "gtm_inet.h"
/********** Receiver server function prototypes **********/
int gtmrecv(void);
int gtmrecv_changelog(void);
int gtmrecv_checkhealth(void);
int gtmrecv_comm_init(in_port_t port);
int gtmrecv_end1(boolean_t auto_shutdown);
int gtmrecv_endupd(void);
void gtmrecv_end(void);
int gtmrecv_get_opt(void);
int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned char *buffp);
int gtmrecv_poll_actions(int pending_data_len, int buff_unprocessed, unsigned char *buffp);
void gtmrecv_process(boolean_t crash_restart);
int gtmrecv_showbacklog(void);
int gtmrecv_shutdown(boolean_t auto_shutdown, int exit_status);
void gtmrecv_sigstop(void);
void gtmrecv_autoshutdown(void);
int gtmrecv_statslog(void);
int gtmrecv_ipc_cleanup(boolean_t auto_shutdown, int *exit_status);
int gtmrecv_start_updonly(void);
int gtmrecv_upd_proc_init(boolean_t fresh_start);
int gtmrecv_wait_for_detach(void);
void gtmrecv_exit(int exit_status);
int gtmrecv_alloc_msgbuff(void);
void gtmrecv_free_msgbuff(void);
int gtmrecv_alloc_filter_buff(int bufsiz);
void gtmrecv_free_filter_buff(void);
int is_updproc_alive(void);
int is_srv_alive(int srv_type);
int is_recv_srv_alive(void);
void recvpool_init(recvpool_user pool_user, boolean_t gtmrecv_startup);
void gtmrecv_reinit_logseqno(void);
int gtmrecv_helpers_init(int n_readers, int n_writers);
int gtmrecv_start_helpers(int n_readers, int n_writers);
void gtmrecv_reap_helpers(boolean_t wait);
int gtmrecv_end_helpers(boolean_t is_rcvr_srvr);
void gtmrecv_onln_rlbk_clnup(void);
int gtmrecv_stopfilter(void);
#endif
|