File: gtmsource_heartbeat.c

package info (click to toggle)
fis-gtm 6.3-014-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 36,680 kB
  • sloc: ansic: 333,039; asm: 5,180; csh: 4,956; sh: 1,924; awk: 291; makefile: 66; sed: 13
file content (249 lines) | stat: -rwxr-xr-x 10,251 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
/****************************************************************
 *								*
 * Copyright (c) 2006-2017 Fidelity National Information	*
 * Services, Inc. and/or its subsidiaries. All rights reserved.	*
 *								*
 *	This source code contains the intellectual property	*
 *	of its copyright holder(s), and is made available	*
 *	under a license.  If you do not know the terms of	*
 *	the license, please stop and do not read further.	*
 *								*
 ****************************************************************/

#include "mdef.h"

#include "gtm_string.h"
#include "gtm_time.h"
#include "gtm_inet.h"	/* Required for gtmsource.h */
#include "gtm_stdio.h"

#include <sys/time.h>
#include <errno.h>
#ifdef VMS
#include <descrip.h> /* Required for gtmsource.h */
#endif

#include "gdsroot.h"
#include "gdsblk.h"
#include "gtm_facility.h"
#include "fileinfo.h"
#include "gdsbt.h"
#include "gdsfhead.h"
#include "filestruct.h"
#include "jnl.h"
#include "repl_msg.h"
#include "gtmsource.h"
#include "repl_comm.h"
#include "repl_dbg.h"
#include "repl_log.h"
#include "repl_errno.h"
#include "iosp.h"
#include "gt_timer.h"
#include "gtmsource_heartbeat.h"
#include "relqop.h"

/* Globals defined in other modules and referenced by the heartbeat code in this file. */
GBLREF	jnlpool_addrs_ptr_t	jnlpool;		/* source of connect_parms[], jnl_seqno and the shared gtmsource_state */
GBLREF	int			gtmsource_sock_fd;	/* socket heartbeats are sent on; closed here on a connection reset */
GBLREF	boolean_t		gtmsource_logstats;
GBLREF	int			gtmsource_log_fd;
GBLREF 	FILE			*gtmsource_log_fp;	/* handed to repl_log() when a connection reset is reported */
GBLREF  gtmsource_state_t       gtmsource_state;	/* examined while sending a heartbeat; updated on a connection reset */

/* Heartbeat state defined by this module.
 *	heartbeat_stalled	- TRUE initially and after gtmsource_stop_heartbeat(); FALSE once gtmsource_init_heartbeat() ran.
 *	repl_heartbeat_que_head	- start of the malloc'ed entry array; also the head of the queue that
 *				  gtmsource_send_heartbeat() appends to after a successful send.
 *	repl_heartbeat_free_head - second entry of the same array; head of the queue of entries available for sending.
 *	gtmsource_now		- set from time() at init, advanced by heartbeat_period on every timer pop, 0 when stopped.
 *	last_sent_time		- "now" value of the most recent successful heartbeat send (time() at init, 0 when stopped).
 *	earliest_sent_time	- send time of the entry at the front of the sent queue; 0 when nothing is outstanding.
 */
GBLDEF	boolean_t			heartbeat_stalled = TRUE;
GBLDEF	repl_heartbeat_que_entry_t	*repl_heartbeat_que_head = NULL;
GBLDEF	repl_heartbeat_que_entry_t	*repl_heartbeat_free_head = NULL;
GBLDEF	volatile time_t			gtmsource_now;
GBLDEF	time_t				last_sent_time, earliest_sent_time;

/* Error codes raised through rts_error_csa() in this file. */
error_def(ERR_REPLCOMM);
error_def(ERR_TEXT);

/* Both values are in seconds and are copied from jnlpool->gtmsource_local->connect_parms[] by gtmsource_init_heartbeat(). */
static	int				heartbeat_period, heartbeat_max_wait;

/* Timer handler that keeps the source server's coarse clock (gtmsource_now) ticking.
 *
 * On every pop, gtmsource_now moves forward by heartbeat_period seconds and the handler re-arms itself with the same period,
 * so it keeps firing until the timer is cancelled.
 *
 *	tid, interval_len	- not examined here.
 *	interval_ptr		- expected to point at a copy of heartbeat_period; this is only asserted, and only on Unix
 *				  (interval_len and interval_ptr are dummies on VMS).
 */
void gtmsource_heartbeat_timer(TID tid, int4 interval_len, int *interval_ptr)
{
	int	period_msec;

	assert(0 != gtmsource_now);
	UNIX_ONLY(assert(heartbeat_period == *interval_ptr);)
	gtmsource_now += heartbeat_period;	/* use the static, since *interval_ptr cannot be used on VMS */
	REPL_DPRINT4("Repeating heartbeat timer with %d s\tSource now is %ld\tTime now is %ld\n", heartbeat_period,
					gtmsource_now, time(NULL));
	/* heartbeat_period is in seconds whereas start_timer expects its interval in milliseconds */
	period_msec = heartbeat_period * 1000;
	start_timer((TID)gtmsource_heartbeat_timer, period_msec, gtmsource_heartbeat_timer, SIZEOF(heartbeat_period),
			&heartbeat_period);
}

int gtmsource_init_heartbeat(void)
{
	int				num_q_entries;
	repl_heartbeat_que_entry_t	*heartbeat_element;

	assert(NULL == repl_heartbeat_que_head);

	heartbeat_period = jnlpool->gtmsource_local->connect_parms[GTMSOURCE_CONN_HEARTBEAT_PERIOD];
	heartbeat_max_wait = jnlpool->gtmsource_local->connect_parms[GTMSOURCE_CONN_HEARTBEAT_MAX_WAIT];
	num_q_entries = DIVIDE_ROUND_UP(heartbeat_max_wait, heartbeat_period) + 2;
	REPL_DPRINT4("Initialized heartbeat, heartbeat_period = %d s, heartbeat_max_wait = %d s, num_q_entries = %d\n",
			heartbeat_period, heartbeat_max_wait, num_q_entries);
	if (!(repl_heartbeat_que_head = (repl_heartbeat_que_entry_t *)malloc(num_q_entries * SIZEOF(repl_heartbeat_que_entry_t))))
		rts_error_csa(CSA_ARG(NULL) VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
			LEN_AND_LIT("Error in allocating heartbeat queue"), errno);

	memset(repl_heartbeat_que_head, 0, num_q_entries * SIZEOF(repl_heartbeat_que_entry_t));
	repl_heartbeat_free_head = repl_heartbeat_que_head + 1;
	*(gtm_time4_t *)&repl_heartbeat_que_head->heartbeat.ack_time[0] = 0;
	*(gtm_time4_t *)&repl_heartbeat_free_head->heartbeat.ack_time[0] = 0;
	for (heartbeat_element = repl_heartbeat_free_head + 1, num_q_entries -= 2;
	     num_q_entries > 0;
	     num_q_entries--, heartbeat_element++)
	{
		insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_free_head);
	}
	last_sent_time = gtmsource_now = time(NULL);
	/* Ideally, we should use the Greatest Common Factor of heartbeat_period and GTMSOURCE_LOGSTATS_INTERVAL as the time keeper
	 * interval. As it stands now, we may not honor GTMSOURCE_LOGSTATS_INTERVAL if user specifies a heartbeat value
	 * larger than GTMSOURCE_LOGSTATS_INTERVAL. When we make GTMSOURCE_LOGSTATS_INTERVAL a user configurable parameter,
	 * this code may have to be revisited. Also, modify the check in gtmsource_process (prev_now != (save_now = gtmsource_now))
	 * to be something like (hearbeat_period < difftime((save_now = gtmsource_now), prev_now)). Vinaya 2003, Sep 08
	 */
	start_timer((TID)gtmsource_heartbeat_timer, heartbeat_period * 1000, gtmsource_heartbeat_timer, SIZEOF(heartbeat_period),
			&heartbeat_period); /* start_timer expects time interval in milli seconds, heartbeat_period is in seconds */
	REPL_DPRINT4("Started heartbeat timer with %d s\tSource now is %ld\tTime now is %ld\n",
			heartbeat_period, gtmsource_now, time(NULL));
	heartbeat_stalled = FALSE;
	earliest_sent_time = 0;
	return (SS_NORMAL);
}

/* Tear down everything gtmsource_init_heartbeat() set up.
 *
 * Cancels the clock timer, releases the entry array (a single allocation backs both queues) and puts the module's
 * globals back to their "not running" values: both queue heads NULL, all time stamps 0 and heartbeat_stalled TRUE.
 *
 * Returns SS_NORMAL.
 */
int gtmsource_stop_heartbeat(void)
{
	REPL_DPRINT4("Stopping heartbeat timer with %d s\tSource now is %ld\tTime now is %ld\n",
			heartbeat_period, gtmsource_now, time(NULL));
	cancel_timer((TID)gtmsource_heartbeat_timer);
	if (NULL != repl_heartbeat_que_head)
	{	/* repl_heartbeat_free_head points into the same allocation, so one free covers both queues */
		free(repl_heartbeat_que_head);
	}
	repl_heartbeat_free_head = repl_heartbeat_que_head = NULL;
	earliest_sent_time = last_sent_time = 0;
	gtmsource_now = 0;
	heartbeat_stalled = TRUE;
	REPL_DPRINT2("Stopped heartbeat (%d)\n", intrpt_ok_state);
	return SS_NORMAL;
}

/* Check whether the oldest outstanding heartbeat has gone unanswered for more than heartbeat_max_wait seconds.
 *
 *	now			- current time as seen by the caller.
 *	overdue_heartbeat	- output; receives a copy of the overdue heartbeat message when TRUE is returned.
 *
 * Returns FALSE when nothing is outstanding (earliest_sent_time is 0), when the wait has not been exceeded, or when the
 * code is built with REPL_DISABLE_HEARTBEAT. Otherwise the entry at the front of the sent queue is copied to
 * overdue_heartbeat, moved to the free queue, and TRUE is returned. earliest_sent_time is not modified here.
 */
boolean_t gtmsource_is_heartbeat_overdue(time_t *now, repl_heartbeat_msg_ptr_t overdue_heartbeat)
{
	repl_heartbeat_que_entry_t	*oldest_entry;
	double				time_elapsed;
	/* NOTE(review): not referenced directly; presumably used by INT8_PRINT on some platforms - confirm before removing */
	unsigned char			seq_num_str[32], *seq_num_ptr;

#ifdef REPL_DISABLE_HEARTBEAT
	return FALSE;
#else
	if (0 == earliest_sent_time)
		return FALSE;		/* no heartbeat is waiting for a response */
	time_elapsed = difftime(*now, earliest_sent_time);
	if (time_elapsed <= (double)heartbeat_max_wait)
		return FALSE;		/* still within the allowed wait */
	oldest_entry = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_que_head);
	if (NULL == oldest_entry)
	{	/* earliest_sent_time is non-zero yet nothing could be removed from the queue; not expected */
		assert(FALSE);
		return FALSE;
	}
	memcpy(overdue_heartbeat, &oldest_entry->heartbeat, SIZEOF(repl_heartbeat_msg_t));
	REPL_DPRINT5("Overdue heartbeat - SEQNO : "INT8_FMT" time : %ld now : %ld difftime : %00.f\n",
		     INT8_PRINT(*(seq_num *)&overdue_heartbeat->ack_seqno[0]), *(gtm_time4_t *)&overdue_heartbeat->ack_time[0],
		     *now, time_elapsed);
	/* The message has been copied out, so the entry can be reused */
	insqt((que_ent_ptr_t)oldest_entry, (que_ent_ptr_t)repl_heartbeat_free_head);
	return TRUE;
#endif
}

/* Send one heartbeat message on gtmsource_sock_fd and remember it until it is acknowledged.
 *
 *	now	- time stamp to record in the message (truncated to gtm_time4_t) and in last_sent_time.
 *
 * An entry is taken from the free queue, stamped with the journal pool's current jnl_seqno and *now, and sent with
 * REPL_SEND_LOOP. On a successful send the entry is appended to the sent queue (repl_heartbeat_que_head).
 *
 * Returns SS_NORMAL on every path that returns:
 *	- no free entry was available (nothing is sent),
 *	- the source server state changed while polling during the send,
 *	- the heartbeat was sent,
 *	- the connection was reset (socket closed, state set to GTMSOURCE_WAITING_FOR_CONNECTION).
 * Any other send/select failure is raised through rts_error_csa().
 */
int gtmsource_send_heartbeat(time_t *now)
{
	repl_heartbeat_que_entry_t	*heartbeat_element;
	unsigned char			*msg_ptr;				/* needed for REPL_{SEND,RECV}_LOOP */
	int				tosend_len, sent_len, sent_this_iter;	/* needed for REPL_SEND_LOOP */
	int				status, poll_dir;			/* needed for REPL_{SEND,RECV}_LOOP */
	unsigned char			seq_num_str[32], *seq_num_ptr;
	/* NOTE(review): gtmsource_local is not referenced directly anywhere in this function - confirm whether it can go */
	gtmsource_local_ptr_t		gtmsource_local;

	heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_free_head);
	if (NULL == heartbeat_element) /* Too many pending heartbeats, send later */
		return (SS_NORMAL);
	/* Stamp the message with the current journal sequence number and the caller's notion of "now" */
	QWASSIGN(*(seq_num *)&heartbeat_element->heartbeat.ack_seqno[0], jnlpool->jnlpool_ctl->jnl_seqno);
	*(gtm_time4_t *)&heartbeat_element->heartbeat.ack_time[0] = (gtm_time4_t)(*now);

	heartbeat_element->heartbeat.type = REPL_HEARTBEAT;
	heartbeat_element->heartbeat.len = MIN_REPL_MSGLEN;
	/* The block below is the body of the loop that REPL_SEND_LOOP expands to; it runs between send attempts */
	REPL_SEND_LOOP(gtmsource_sock_fd, &heartbeat_element->heartbeat, MIN_REPL_MSGLEN, REPL_POLL_NOWAIT)
	{
		gtmsource_poll_actions(FALSE);  /* Recursive call */
		/* NOTE(review): on this early return heartbeat_element is on neither queue - confirm that the caller
		 * discards the queues (e.g. via gtmsource_stop_heartbeat) in these states.
		 */
		if ((GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state) || (GTMSOURCE_CHANGING_MODE == gtmsource_state)
				|| (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
			return (SS_NORMAL);
	}
	if (SS_NORMAL == status)
	{
		/* Sent: keep the entry on the sent queue until gtmsource_process_heartbeat() releases it */
		insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_que_head);
		last_sent_time = *now;
		if (0 == earliest_sent_time)	/* 0 means nothing was outstanding, so this one is now the oldest */
			earliest_sent_time = last_sent_time;

		REPL_DPRINT4("HEARTBEAT sent with time %ld SEQNO "INT8_FMT" at %ld\n",
			     *(gtm_time4_t *)&heartbeat_element->heartbeat.ack_time[0],
			     INT8_PRINT(*(seq_num *)&heartbeat_element->heartbeat.ack_seqno[0]), time(NULL));

		return (SS_NORMAL);
	}
	if (EREPL_SEND == repl_errno && REPL_CONN_RESET(status))
	{
		/* Connection reset is not fatal: log it, drop the socket and go back to waiting for a connection */
		repl_log(gtmsource_log_fp, TRUE, TRUE, "Connection reset while attempting to send heartbeat. Status = %d ; %s\n",
				status, STRERROR(status));
		repl_close(&gtmsource_sock_fd);
		gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
		return (SS_NORMAL);
	}
	/* Any other failure is reported as an error, with the text identifying which step failed */
	if (EREPL_SEND == repl_errno)
		rts_error_csa(CSA_ARG(NULL) VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
			LEN_AND_LIT("Error sending HEARTBEAT message. Error in send"), status);
	if (EREPL_SELECT == repl_errno)
		rts_error_csa(CSA_ARG(NULL) VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
			LEN_AND_LIT("Error sending HEARTBEAT message. Error in select"), status);
	/* status is known to differ from SS_NORMAL here, so this assertpro always trips if it is reached */
	assertpro((SS_NORMAL == status));
	return -1; /* This will never get executed, added to make compiler happy */
}

/* Handle a heartbeat response: release the sent-queue entries that it acknowledges.
 *
 *	heartbeat_msg	- the received heartbeat message; its ack_time is compared against the outstanding heartbeats.
 *
 * Entries are taken off the front of the sent queue (repl_heartbeat_que_head) and moved to the free queue for as long
 * as the received ack_time is at or past earliest_sent_time. After each release, earliest_sent_time is reloaded from
 * the entry that is now at the front of the sent queue. The first entry that is not yet acknowledged is put back at
 * the front of the sent queue.
 *
 * Returns SS_NORMAL.
 */
int gtmsource_process_heartbeat(repl_heartbeat_msg_ptr_t heartbeat_msg)
{
	repl_heartbeat_que_entry_t	*pending, *front;
	seq_num				ack_seqno;
	gtm_time4_t			acked_time;
	/* NOTE(review): not referenced directly; presumably used by INT8_PRINT on some platforms - confirm before removing */
	unsigned char			seq_num_str[32], *seq_num_ptr;

	QWASSIGN(ack_seqno, *(seq_num *)&heartbeat_msg->ack_seqno[0]);
	acked_time = *(gtm_time4_t *)&heartbeat_msg->ack_time[0];
	REPL_DPRINT4("HEARTBEAT received with time %ld SEQNO "INT8_FMT" at %ld\n",
		     acked_time, INT8_PRINT(ack_seqno), time(NULL));
	for ( ; ; )
	{
		pending = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_que_head);
		if (NULL == pending)
			break;		/* nothing (left) outstanding */
		if (acked_time < (gtm_time4_t)earliest_sent_time)
		{	/* This heartbeat was sent after the one being acknowledged; keep it as the oldest outstanding one */
			insqh((que_ent_ptr_t)pending, (que_ent_ptr_t)repl_heartbeat_que_head);
			break;
		}
		insqt((que_ent_ptr_t)pending, (que_ent_ptr_t)repl_heartbeat_free_head);
		/* que.fl is used as a byte offset from the queue head to the entry at the front of the queue. Presumably it
		 * is 0 when the queue is empty, in which case "front" is the head entry itself, whose ack_time
		 * gtmsource_init_heartbeat() set to 0, so earliest_sent_time drops back to 0 (nothing outstanding).
		 */
		front = (repl_heartbeat_que_entry_t *)((unsigned char *)repl_heartbeat_que_head
							+ repl_heartbeat_que_head->que.fl);
		earliest_sent_time = (time_t)*(gtm_time4_t *)&front->heartbeat.ack_time[0];
	}
	return SS_NORMAL;
}