File: gtmsource_process.c

package info (click to toggle)
fis-gtm 7.1-006-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 32,908 kB
  • sloc: ansic: 344,906; asm: 5,184; csh: 4,859; sh: 2,000; awk: 294; makefile: 73; sed: 13
file content (2028 lines) | stat: -rw-r--r-- 92,134 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
 /***************************************************************
 *								*
 * Copyright (c) 2006-2023 Fidelity National Information	*
 * Services, Inc. and/or its subsidiaries. All rights reserved.	*
 *								*
 *	This source code contains the intellectual property	*
 *	of its copyright holder(s), and is made available	*
 *	under a license.  If you do not know the terms of	*
 *	the license, please stop and do not read further.	*
 *								*
 ****************************************************************/

#if defined(__MVS__) && !defined(_ISOC99_SOURCE)
#define _ISOC99_SOURCE
#endif

#include "mdef.h"

#include "gtm_string.h"
#include "gtm_stdio.h"
#include "gtm_socket.h"
#include "gtm_netdb.h"
#include "gtm_inet.h"
#include "gtm_fcntl.h"
#include "gtm_unistd.h"
#include "gtm_time.h"
#include "gtm_stat.h"
#include "gtm_signal.h"
#include <sys/time.h>

#include <errno.h>
#include "gdsroot.h"
#include "gdsblk.h"
#include "gtm_facility.h"
#include "fileinfo.h"
#include "gdsbt.h"
#include "gdsfhead.h"
#include "filestruct.h"
#include "repl_msg.h"
#include "gtmsource.h"
#include "sgtm_putmsg.h"
#include "repl_comm.h"
#include "jnl.h"
#include "hashtab_mname.h"    /* needed for muprec.h */
#include "hashtab_int4.h"     /* needed for muprec.h */
#include "hashtab_int8.h"     /* needed for muprec.h */
#include "buddy_list.h"
#include "muprec.h"
#include "repl_ctl.h"
#include "repl_errno.h"
#include "repl_dbg.h"
#include "repl_shutdcode.h"
#include "iosp.h"
#include "gt_timer.h"
#include "gtmsource_heartbeat.h"
#include "repl_filter.h"
#include "repl_log.h"
#include "min_max.h"
#include "copy.h"
#include "ftok_sems.h"
#include "repl_instance.h"
#include "gtmmsg.h"
#include "repl_sem.h"
#include "have_crit.h"			/* needed for ZLIB_COMPRESS */
#include "deferred_signal_handler.h"	/* needed for ZLIB_COMPRESS */
#include "gtm_zlib.h"
#include "repl_sort_tr_buff.h"
#include "replgbl.h"
#include "gtmsource_srv_latch.h"
#include "gv_trigger_common.h"
#include "wbox_test_init.h"
#ifdef GTM_TLS
#include "gtm_repl.h"
#endif

#define MAX_HEXDUMP_CHARS_PER_LINE	26		/* 2 characters per byte + space, 80 column assumed */

/* Log a diagnostic (and trip a debug-build assert) when CMPRET is one of the zlib failure codes Z_MEM_ERROR,
 * Z_BUF_ERROR or Z_STREAM_ERROR. Any other CMPRET value results in no action.
 *	CMPRET      : return value of the compress call being checked.
 *	SEND_TR_LEN : number of bytes that were being compressed; used only in the log text.
 * Note: the "break" statements below only leave the switch inside this macro. They do NOT break out of any loop or
 * switch at the point of invocation, so the caller still has to examine CMPRET itself.
 */
#define BREAK_IF_CMP_ERROR(CMPRET, SEND_TR_LEN)											\
{																\
	switch(CMPRET)														\
	{															\
		case Z_MEM_ERROR:												\
			repl_log(gtmsource_log_fp, TRUE, FALSE, "Out-of-memory error from compress function "			\
					"while compressing %d bytes\n", SEND_TR_LEN);						\
			assert(FALSE);												\
			break;													\
		case Z_BUF_ERROR:												\
			repl_log(gtmsource_log_fp, TRUE, FALSE, "Insufficient output buffer error from compress function "	\
					"while compressing %d bytes\n", SEND_TR_LEN);						\
			assert(FALSE);												\
			break;													\
		case Z_STREAM_ERROR:												\
			repl_log(gtmsource_log_fp, TRUE, FALSE, "Compression level %d invalid error from compress function "	\
					"while compressing %d bytes\n", repl_zlib_cmp_level, SEND_TR_LEN);			\
			assert(FALSE);												\
			break;													\
	}															\
}

/* Fill in the 8-byte (type + len) header of a compressed journal-record message.
 *	SEND_MSGP   : pointer to the message whose "type" and "len" fields are set.
 *	SEND_TR_LEN : lvalue. On entry its value is folded into the "type" field (shifted left by
 *		      REPL_TR_CMP_MSG_TYPE_BITS and OR-ed with REPL_TR_CMP_JNL_RECS). On exit it holds the
 *		      message length rounded up to REPL_MSG_ALIGN, i.e. the number of bytes to transmit.
 *	CMPBUFLEN   : length of the compressed data.
 *	MSGHDRLEN   : length of the message header.
 * The macro uses only its own parameters; it does not depend on any identically named variables at the call site.
 */
#define SET_8BYTE_CMP_MSGHDR(SEND_MSGP, SEND_TR_LEN, CMPBUFLEN, MSGHDRLEN)							\
{																\
	(SEND_MSGP)->type = ((SEND_TR_LEN) << REPL_TR_CMP_MSG_TYPE_BITS) | REPL_TR_CMP_JNL_RECS;				\
	(SEND_MSGP)->len = (int4)(CMPBUFLEN) + (MSGHDRLEN);									\
	/* Note that a compressed message need not be 8-byte aligned even though the input message was. So round it up to	\
	 * the nearest align boundary. The actual message will contain the unaligned length which is what the receiver will	\
	 * receive. But the # of bytes transmitted across will be the aligned length.						\
	 */															\
	SEND_TR_LEN = ROUND_UP((SEND_MSGP)->len, REPL_MSG_ALIGN);								\
}

/* Fill in the header of a compressed journal-record message using the repl_cmpmsg_ptr_t layout.
 *	SEND_MSGP   : pointer to the message; it is re-interpreted as a repl_cmpmsg_ptr_t (the asserts check that the
 *		      "type" and "len" fields sit at the same addresses in both views).
 *	SEND_TR_LEN : lvalue. On entry its value is stored in the "uncmplen" field. On exit it holds the "len" field
 *		      value, i.e. CMPBUFLEN + MSGHDRLEN rounded up to REPL_MSG_ALIGN.
 *	CMPBUFLEN   : length of the compressed data; stored unrounded in the "cmplen" field.
 *	MSGHDRLEN   : length of the message header.
 * In this variant the "len" field carries the aligned length and "cmplen" carries the unaligned compressed length.
 */
#define SET_16BYTE_CMP_MSGHDR(SEND_MSGP, SEND_TR_LEN, CMPBUFLEN, MSGHDRLEN)							\
{																\
	repl_cmpmsg_ptr_t		send_cmpmsgp;										\
																\
	send_cmpmsgp = (repl_cmpmsg_ptr_t)SEND_MSGP;										\
	assert(&send_cmpmsgp->type == &SEND_MSGP->type);									\
	assert(&send_cmpmsgp->len == &SEND_MSGP->len);										\
	send_cmpmsgp->type = REPL_TR_CMP_JNL_RECS2;										\
	/* Note that a compressed message need not be 8-byte aligned even though the input message was. So round it up to	\
	 * the nearest align boundary. The actual message will contain the unaligned length which is what the receiver will	\
	 * receive. But the # of bytes transmitted across will be the aligned length.						\
	 */															\
	send_cmpmsgp->len = (int4)(ROUND_UP(CMPBUFLEN + MSGHDRLEN, REPL_MSG_ALIGN));						\
	send_cmpmsgp->uncmplen = SEND_TR_LEN;											\
	send_cmpmsgp->cmplen = (int4)CMPBUFLEN;											\
	SEND_TR_LEN = SEND_MSGP->len;												\
}

#ifdef GTM_TRIGGER
/* Log a one-time warning when a transaction that used triggers is being sent to a remote side that does not
 * support triggers. The warning is logged only if it has not been issued before, a suspect seqno has been
 * recorded in (TREF(replgbl)).trig_replic_suspect_seqno and remote_side->trigger_supported is FALSE. After
 * logging, the "issued" flag is set and the suspect seqno is reset to seq_num_zero.
 * The seqno is a seq_num, so it is formatted with INT8_FMT like the other seqno log messages in this file.
 */
#define ISSUE_TRIG2NOTRIG_IF_NEEDED												\
{																\
	DCL_THREADGBL_ACCESS;													\
																\
	SETUP_THREADGBL_ACCESS;													\
	if (!(TREF(replgbl)).trig_replic_warning_issued && (TREF(replgbl)).trig_replic_suspect_seqno				\
		&& !remote_side->trigger_supported)										\
	{ 	/* Note: The below repl_log text is copied from TRIG2NOTRIG error message content from merrors.msg. Change 	\
		 * to one should be reflected in another									\
		 */														\
		repl_log(gtmsource_log_fp, TRUE, TRUE, "Warning: Sending transaction sequence number "INT8_FMT" which used "	\
			"triggers to a replicator that does not support triggers\n", (TREF(replgbl)).trig_replic_suspect_seqno);\
		(TREF(replgbl)).trig_replic_warning_issued = TRUE; /* No more warnings till restart */				\
		(TREF(replgbl)).trig_replic_suspect_seqno = seq_num_zero;							\
	}															\
}
#endif

#ifdef GTM_TLS
/* Renegotiate the TLS/SSL session on SOCK (or update its session keys, as reported by gtm_tls_does_renegotiate).
 *	SOCK   : TLS socket handle passed to gtm_tls_renegotiate.
 *	STATUS : lvalue receiving the gtm_tls_renegotiate return value (0 on success, -1 on failure).
 * On success gtmsource_local->num_renegotiations is incremented. On failure caused by a connection reset, the
 * event is logged, the source server state is moved to GTMSOURCE_WAITING_FOR_CONNECTION and the socket is closed.
 * Any other failure raises ERR_TLSRENEGOTIATE.
 */
#define REPLTLS_RENEGOTIATE(SOCK, STATUS)											\
{																\
	char	*errp;														\
	int	save_errno;													\
																\
	if (0 != (STATUS = gtm_tls_renegotiate(SOCK)))										\
	{															\
		assert(-1 == STATUS);												\
		save_errno = gtm_tls_errno();											\
		if (REPL_CONN_RESET(save_errno))										\
		{	/* The format needs a %s for the operation name that is passed as the argument */			\
			repl_log(gtmsource_log_fp, TRUE, TRUE,									\
					"Connection reset while attempting to %s on TLS/SSL connection.\n",			\
					(gtm_tls_does_renegotiate(SOCK)) ? "renegotiate" : "update TLS session keys");		\
			gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;			\
			repl_close(&gtmsource_sock_fd);										\
		} else														\
		{														\
			errp = (-1 == save_errno) ? (char *)gtm_tls_get_error(NULL) : STRERROR(save_errno);			\
			rts_error_csa(CSA_ARG(NULL) VARLSTCNT(7) ERR_TLSRENEGOTIATE, 1, 					\
					(gtm_tls_does_renegotiate(SOCK)) ? "renegotiate" : "update TLS session keys",		\
					ERR_TEXT, 2, LEN_AND_STR(errp));							\
		}														\
	} else															\
		gtmsource_local->num_renegotiations++;										\
}
#endif

#define	PHASE2_COMMIT_WAIT_CNT	8	/* 8 iterations of each 1 msec sleep before we decide to invoke "repl_phase2_cleanup" */

/* Globals defined (GBLDEF) by the source server processing module */
GBLDEF	repl_msg_ptr_t		gtmsource_msgp = NULL;
GBLDEF	int			gtmsource_msgbufsiz = 0;
GBLDEF	repl_msg_ptr_t		gtmsource_cmpmsgp = NULL;
GBLDEF	int			gtmsource_cmpmsgbufsiz = 0;
/* Set to TRUE by "gtmsource_recv_ctl" when a REPL_CMP2UNCMP message is received (connection falls back to NO compression) */
GBLDEF	boolean_t		gtmsource_received_cmp2uncmp_msg;
GBLDEF	qw_num			repl_source_data_sent = 0;
GBLDEF	qw_num			repl_source_msg_sent = 0;
GBLDEF	qw_num			repl_source_cmp_sent = 0;
GBLDEF	qw_num			repl_source_lastlog_data_sent = 0;
GBLDEF	qw_num			repl_source_lastlog_msg_sent = 0;
GBLDEF	time_t			repl_source_prev_log_time;
GBLDEF  time_t			repl_source_this_log_time;
GBLDEF	time_t			gtmsource_last_flush_time;

GBLREF	gtmsource_state_t	gtmsource_state;
GBLREF	uchar_ptr_t		repl_filter_buff;
GBLREF	int			repl_filter_bufsiz;
GBLREF	volatile time_t		gtmsource_now;
GBLREF	int			gtmsource_sock_fd;
GBLREF	jnlpool_addrs_ptr_t	jnlpool;
GBLREF	sgmnt_addrs		*cs_addrs;
GBLREF	sgmnt_data_ptr_t	cs_data;
GBLREF	gd_region		*gv_cur_region;
GBLREF	repl_ctl_element	*repl_ctl_list;
GBLREF	gtmsource_options_t	gtmsource_options;
GBLREF	int			gtmsource_log_fd;
GBLREF	FILE			*gtmsource_log_fp;
GBLREF	boolean_t		gtmsource_logstats;
GBLREF	int			gtmsource_filter;
GBLREF	gd_addr			*gd_header;
GBLREF	seq_num			seq_num_zero, seq_num_minus_one, seq_num_one;
GBLREF	unsigned int		jnl_source_datalen, jnl_dest_maxdatalen;
GBLREF	unsigned char		jnl_source_rectype, jnl_dest_maxrectype;
GBLREF	int			repl_max_send_buffsize, repl_max_recv_buffsize;
GBLREF	seq_num			lastlog_seqno;
GBLREF	uint4			log_interval;
GBLREF	qw_num			trans_sent_cnt, last_log_tr_sent_cnt;
GBLREF	repl_conn_info_t	*this_side, *remote_side;
GBLREF	int4			strm_index;
GBLREF	uint4			process_id;
GBLREF	seq_num			gtmsource_save_read_jnl_seqno;

/* Reset to FALSE by "gtmsource_recv_ctl" each time a REPL_XOFF/REPL_XOFF_ACK_ME message is received */
STATICDEF	boolean_t	xon_wait_logged = FALSE;
/* Set to TRUE on receipt of REPL_BADTRANS/REPL_CMP2UNCMP and to FALSE on receipt of REPL_START_JNL_SEQNO */
STATICDEF	boolean_t	already_communicated = FALSE;
/* Sequence number carried by the last REPL_BADTRANS/REPL_CMP2UNCMP/REPL_START_JNL_SEQNO message received */
STATICDEF	seq_num		recvd_seqno = 0;
/* start_flags of the last REPL_START_JNL_SEQNO message; START_FLAG_NONE after REPL_BADTRANS/REPL_CMP2UNCMP */
STATICDEF	int		recvd_start_flags = START_FLAG_NONE;
/* Poll timeout handed to REPL_RECV_LOOP in "gtmsource_recv_ctl"; "gtmsource_recv_ctl_nowait" temporarily forces
 * it to REPL_POLL_NOWAIT.
 */
STATICDEF	int		poll_time = REPL_POLL_NOWAIT;
#ifdef GTM_TLS
/* Set to TRUE by the heartbeat handling when a renegotiation is due; checked at the start of "gtmsource_recv_ctl"
 * before the REPL_RENEG_ACK_ME message is sent.
 */
STATICDEF	boolean_t	next_renegotiate_hrtbt = FALSE;
/* Incremented for every REPL_HEARTBEAT message received */
STATICDEF	int4		hrtbt_cnt = 0;
/* When positive, renegotiation is considered each time hrtbt_cnt is a multiple of this value */
STATICDEF 	int4		renegotiate_factor = 0;
#ifdef DEBUG
/* Debug-only: set to TRUE when a REPL_RENEG_ACK_ME is about to be sent; relaxes the poll_time assert in
 * "gtmsource_recv_ctl".
 */
STATICDEF	boolean_t	renegotiation_pending = FALSE;
#endif
#endif

#define	OUT_LINE	(256 + 1)	/* parenthesized so the macro expands safely inside larger expressions */
#define PROC_SRCOPS_PRINT_MSG_LEN	2048
error_def(ERR_JNLNEWREC);
error_def(ERR_JNLSETDATA2LONG);
error_def(ERR_REPLCOMM);
error_def(ERR_REPLFTOKSEM);
error_def(ERR_REPLINSTNOHIST);
error_def(ERR_REPLNOTLS);
error_def(ERR_REPLXENDIANFAIL);
error_def(ERR_REPLAHEAD);
error_def(ERR_TRIG2NOTRIG);
error_def(ERR_TLSIOERROR);
error_def(ERR_TLSRENEGOTIATE);
error_def(ERR_TEXT);

/* Endian converts the given set of journal records (possibly multiple sequence numbers) so that the secondary can consume them
 * as-is. This is done only in the case when the primary is running on a GT.M version less than the GT.M version on secondary
 * side. Otherwise, the secondary takes the responsibility of doing the endian conversion. Note that the endian conversion happens
 * in-place. The below function is based on gtmrecv_process.c/repl_tr_endian_convert()
 *
 * Parameters:
 *	send_msgp      : first replication message of the buffer. Its "len" header field gives the length of the first
 *			 message (header included) and its "msg" field is where the first journal record starts.
 *	send_tr_len    : total number of bytes to convert, counting every message's header and journal records.
 *	pre_read_seqno : only used as the sequence number reported in the REPLXENDIANFAIL error.
 *
 * For each message, every journal record's forwptr, backptr, jnl_seqno and strm_seqno are byte-swapped. SET/KILL/ZKILL/
 * ZTWORM/LGTRIG/ZTRIG type records additionally get update_num and the nodeflags+length word swapped (plus the value
 * length for records carrying a value); TCOM records get num_participants swapped. A malformed record, or message
 * lengths that do not add up to send_tr_len, result in a REPLXENDIANFAIL error.
 */
static void repl_tr_endian_convert(repl_msg_ptr_t send_msgp, int send_tr_len, seq_num pre_read_seqno)
{
	uchar_ptr_t		buffp, jb;
	DEBUG_ONLY(uchar_ptr_t	jstart;)
	int			buflen, remaining_len, jlen, reclen, status, nodeflags_keylen, temp_val, keylen;
	jnl_record		*rec;
	enum jnl_record_type	rectype;
	jrec_suffix		*suffixp;
	jnl_string		*keystr;
	mstr_len_t		*vallen_ptr;
	/* seq_num		good_seqno; */

	buffp = send_msgp->msg;
	buflen = send_msgp->len - REPL_MSG_HDRLEN;
	remaining_len = send_tr_len;
	/* QWASSIGN(good_seqno, seq_num_zero); */
	status = 0;
	while (0 < remaining_len)
	{
		jlen = buflen;
		jb = buffp;
		while (JREC_PREFIX_SIZE <= jlen)
		{
			DEBUG_ONLY(jstart = jb);
			rec = (jnl_record *)(jb);
			/* endian convert the prefix fields. Not all of the prefix fields are used by the secondary. Only rectype
			 * and forwptr are needed.
			 */
			rectype = (enum jnl_record_type)rec->prefix.jrec_type;
			reclen = rec->prefix.forwptr;	/* keep the native-endian length; it drives the rest of the loop */
			rec->prefix.forwptr = GTM_BYTESWAP_24(reclen);
			if (!IS_REPLICATED(rectype) || (0 == reclen) || (reclen > jlen) || (reclen > MAX_LOGI_JNL_REC_SIZE))
			{
				assert(FALSE);
				status = -1;
				break;
			}
			assert(!IS_ZTP(rectype));
			assert(IS_SET_KILL_ZKILL_ZTWORM_LGTRIG_ZTRIG(rectype) || (JRT_TCOM == rectype) || (JRT_NULL == rectype));
			/* endian convert the suffix fields. Only backptr needs endian conversion as the other field - suffix_code
			 * is 8 bit.
			 */
			suffixp = ((jrec_suffix *)((unsigned char *)rec + reclen - JREC_SUFFIX_SIZE));
			suffixp->backptr = GTM_BYTESWAP_24(suffixp->backptr);
			/* QWASSIGN(good_seqno, rec->jrec_null.jnl_seqno); */ /* update good_seqno */
			rec->jrec_null.jnl_seqno = GTM_BYTESWAP_64(rec->jrec_null.jnl_seqno);
			/* At this point, we could have a TCOM or NULL or SET/KILL/ZKILL/ZTRIG type of record.
			 * Assert that all of them have "strm_seqno" at the exact same offset so we can avoid
			 * an if/then/else check on the record types in order to endian convert "strm_seqno".
			 */
			assert(&rec->jrec_null.strm_seqno == &rec->jrec_set_kill.strm_seqno);
			assert(&rec->jrec_null.strm_seqno == &rec->jrec_tcom.strm_seqno);
			rec->jrec_null.strm_seqno = GTM_BYTESWAP_64(rec->jrec_null.strm_seqno);
			if (IS_SET_KILL_ZKILL_ZTWORM_LGTRIG_ZTRIG(rectype))
			{
				keystr = (jnl_string *)&rec->jrec_set_kill.mumps_node;
				assert(keystr == (jnl_string *)&rec->jrec_ztworm.ztworm_str);
				assert(keystr == (jnl_string *)&rec->jrec_lgtrig.lgtrig_str);
				assert(&rec->jrec_set_kill.update_num == &rec->jrec_ztworm.update_num);
				assert(&rec->jrec_set_kill.update_num == &rec->jrec_lgtrig.update_num);
				rec->jrec_set_kill.update_num = GTM_BYTESWAP_32(rec->jrec_set_kill.update_num);
				/* From V19 onwards, the 'length' field is divided into 8 bit 'nodeflags' and 24 bit 'length'
				 * fields.
				 */
				keylen = keystr->length;	/* read the native-endian key length before the word is swapped */
				nodeflags_keylen = *(jnl_str_len_t *)keystr;
				*(jnl_str_len_t *)keystr = GTM_BYTESWAP_32(nodeflags_keylen);
				if (IS_SET(rectype) || IS_ZTWORM(rectype) || IS_LGTRIG(rectype))
				{ 	/* SET/ZTWORM/LGTRIG records have a 'key/value' part whose length needs endian conversion */
					vallen_ptr = (mstr_len_t *)&keystr->text[keylen];
					GET_MSTR_LEN(temp_val, vallen_ptr);
					temp_val = GTM_BYTESWAP_32(temp_val);
					PUT_MSTR_LEN(vallen_ptr, temp_val);
					/* The 'key/value' itself is a character array and hence needs no endian conversion */
				}
			} else if (JRT_TCOM == rectype)
			{
				assert((unsigned char *)&rec->jrec_tcom.token_seq
					+ SIZEOF(token_seq_t) == (unsigned char *)&rec->jrec_tcom.filler_short);
				/* endian convert num_participants */
				rec->jrec_tcom.num_participants = GTM_BYTESWAP_16(rec->jrec_tcom.num_participants);
			}
			/* else records can only be JRT_NULL. The only relevant field in JRT_NULL is the sequence number which is
			 * already endian converted.
			 */
			assert(jstart == jb); /* endian conversion should always happen in-place. */
			jlen -= reclen;
			jb += reclen;
		}
		if ((-1 == status) || (0 != jlen))
		{
			assert(FALSE);
			RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(5) ERR_REPLXENDIANFAIL, 3,
				LEN_AND_LIT("Originating"), &pre_read_seqno);
		}
		/* move on to the next transaction */
		remaining_len -= (buflen + REPL_MSG_HDRLEN);
		if (0 >= remaining_len)
			break;	/* nothing follows; do not read a message header beyond the data we were asked to convert */
		buffp += buflen;
		assert(REPL_TR_JNL_RECS == ((repl_msg_ptr_t)(buffp))->type);
		buflen = ((repl_msg_ptr_t)(buffp))->len - REPL_MSG_HDRLEN;
		buffp += REPL_MSG_HDRLEN;
	}
	if (0 != remaining_len)
	{	/* Message lengths did not add up to send_tr_len. Assert first (as in the error path above) since the
		 * error call below does not return.
		 */
		assert(FALSE);
		rts_error_csa(CSA_ARG(NULL) VARLSTCNT(5) ERR_REPLXENDIANFAIL, 3, LEN_AND_LIT("Originating"), &pre_read_seqno);
	}
}

/* Returns TRUE if the state changed to a transitional state while handling control messages */
boolean_t gtmsource_recv_ctl_nowait(void)
{
	gtmsource_state_t	gtmsource_state_sav;
	int			poll_time_sav;

	GTMSOURCE_SAVE_STATE(gtmsource_state_sav);
	poll_time_sav = poll_time;
	poll_time = REPL_POLL_NOWAIT;
	gtmsource_recv_ctl();
	/* If we changed state, keep the poll_time associated with the new state.
	 * However, TLS messaging changes poll_time without changing state, so restore poll_time to match the prior state.
	 */
	if (!GTMSOURCE_CHANGED_STATE(gtmsource_state_sav))
		poll_time = poll_time_sav;
	return (GTMSOURCE_NOW_TRANSITIONAL(gtmsource_state_sav));
}

void gtmsource_recv_ctl(void)
{
	gtmsource_local_ptr_t		gtmsource_local;
	repl_msg_t			renegotiate_msg;
	repl_msg_t			xoff_ack;
	repl_heartbeat_msg_ptr_t	heartbeat_msg;
	repl_msg_t			recv_msg, *recv_msgp;			/* gtmsource_msgp may be in use; use this instead */
	unsigned char			*msg_ptr;				/* needed for REPL_{SEND,RECV}_LOOP */
	int				torecv_len, recvd_len, recvd_this_iter;	/* needed for REPL_RECV_LOOP */
	int				status, poll_dir;			/* needed for REPL_{SEND,RECV}_LOOP */
	boolean_t			msg_is_cross_endian;
	seq_num				tmp_seqno;
	gtm_time4_t			tmp_time4;
	int				index;
	char				err_string[1024];

	/* Check if receiver sent us any control message. Typically, the traffic from receiver to source is very
	 * low compared to traffic in the other direction. More often than not, there will be nothing on the pipe
	 * to receive. Ideally, we should let TCP notify us when there is data on the pipe (async I/O on Unix and
	 * VMS). But, we are not there yet. Since we do a select() before a recv(), we won't block if there is
	 * nothing in the pipe. So, it shouldn't be an expensive operation even if done before every send. Also,
	 * in doing so, we react to an XOFF sooner than later.
	 */
	/* Make sure we don't sleep for an extended period of time if there is something to be sent across */
	assert((GTMSOURCE_SENDING_JNLRECS != gtmsource_state)
			|| ((0 == poll_time) || (GTMSOURCE_IDLE_POLL_WAIT == poll_time))
			GTMTLS_ONLY(DEBUG_ONLY(|| renegotiation_pending)));
	if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
		return;
	if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
		return;
#	ifdef GTM_TLS
	if (repl_tls.enabled && (REPLTLS_WAITING_FOR_RENEG_TIMEOUT == repl_tls.renegotiate_state) && next_renegotiate_hrtbt)
	{	/* Time to renegotiate the TLS/SSL parameters. */
		heartbeat_stalled = TRUE;	/* Defer heartbeats until renegotiation is done. */
		DEBUG_ONLY(renegotiation_pending = TRUE);
		/* Send REPL_RENEG_ACK_ME message to the receiver. */
		renegotiate_msg.type = REPL_RENEG_ACK_ME;
		renegotiate_msg.len = MIN_REPL_MSGLEN;
		gtmsource_repl_send((repl_msg_ptr_t)&renegotiate_msg, "REPL_RENEG_ACK_ME",
						MAX_SEQNO, INVALID_SUPPL_STRM);
		if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
			return;
		if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
			return;
		/* We now have to wait for REPL_RENEG_ACK from the receiver. Until then we defer sending journal
		 * records to the other side. This way, we don't end up having outbound data in the TCP/IP pipe
		 * during the time of renegotiation. TLS/SSL protocol doesn't handle application data when it is
		 * in the middle of renegotiation. Similarly, the receiver on receipt of the REPL_RENEG_ACK_ME
		 * message will defer sending any more messages to us until the renegotiation is completed.
		 */
		repl_tls.renegotiate_state = REPLTLS_WAITING_FOR_RENEG_ACK;
		repl_log(gtmsource_log_fp, TRUE, TRUE, "Waiting for SSL/TLS connection %s acknowledgement message.\n",
					(gtm_tls_does_renegotiate(repl_tls.sock)) ?  "renegotiate" : "session keys update");
		poll_time = REPL_POLL_WAIT; /* because we are waiting for a REPL_RENEG_ACK */
	}
#	endif
	recv_msgp = &recv_msg;
	REPL_RECV_LOOP(gtmsource_sock_fd, recv_msgp, MIN_REPL_MSGLEN, poll_time)
	{
		if (0 == recvd_len) /* nothing received in the first attempt, let's try again later */
			break;
		gtmsource_poll_actions(TRUE);
		if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
		{
			poll_time = REPL_POLL_NOWAIT;
			return;
		}
		else if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
			break;
	}
	gtmsource_local = jnlpool->gtmsource_local;
	if ((SS_NORMAL == status) && (0 != recvd_len))
	{	/* Process the received control message */
		assert(MIN_REPL_MSGLEN == recvd_len);
		REPL_DPRINT3("gtmsource_process: %d bytes received, type is %d\n", recvd_len, recv_msgp->type);
		/* One is not always guaranteed the received message is in source native endian format.
		 * See endianness related comments in gtmsource_recv_restart for why. So be safe and handle
		 * it just like how gtmsource_recv_restart does. The below check works as all messages we
		 * expect at this point have a fixed length of MIN_REPL_MSGLEN.
		 */
		msg_is_cross_endian = (((unsigned)MIN_REPL_MSGLEN < (unsigned)recv_msgp->len)
				&& ((unsigned)MIN_REPL_MSGLEN == GTM_BYTESWAP_32((unsigned)recv_msgp->len)));
		if (msg_is_cross_endian)
		{
			recv_msgp->type = GTM_BYTESWAP_32(recv_msgp->type);
			recv_msgp->len = GTM_BYTESWAP_32(recv_msgp->len);
		}
		assert(MIN_REPL_MSGLEN == recv_msgp->len);
		assert(remote_side->endianness_known);
		switch(recv_msgp->type)
		{
			case REPL_XOFF:
			case REPL_XOFF_ACK_ME:
				gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_XON;
				poll_time = REPL_POLL_WAIT; /* because we are waiting for a REPL_XON */
				repl_log(gtmsource_log_fp, TRUE, TRUE,
					 "REPL_XOFF/REPL_XOFF_ACK_ME received. Send stalled...\n");
				xon_wait_logged = FALSE;
				if (REPL_XOFF_ACK_ME == recv_msgp->type)
				{
					xoff_ack.type = REPL_XOFF_ACK;
					tmp_seqno = *(seq_num *)&recv_msgp->msg[0];
					if (msg_is_cross_endian)
						tmp_seqno = GTM_BYTESWAP_64(tmp_seqno);
					*(seq_num *)&xoff_ack.msg[0] = tmp_seqno;
					xoff_ack.len = MIN_REPL_MSGLEN;
					gtmsource_repl_send((repl_msg_ptr_t)&xoff_ack, "REPL_XOFF_ACK",
						MAX_SEQNO, INVALID_SUPPL_STRM);
					if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
						return;	/* "gtmsource_repl_send" did not complete */
					if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
						break;	/* "gtmsource_repl_send" did not complete */
					/* REPL_XOFF_ACK_ME is always followed by either a REPL_START_JNL_SEQNO,
					 * REPL_CMP2UNCMP or REPL_BADTRANS. We don't want to be doing TLS/SSL
					 * renegotiation in the middle of these messages as the logic on the
					 * receiver side is complicated enough to include TLS/SSL renegotiation.
					 * In all three cases, we go break out of this loop and redo the replication
					 * handshake. So, set the state to skip renegotiation in the mean time.
					 */
#					ifdef GTM_TLS
					if (repl_tls.enabled)
						repl_tls.renegotiate_state = REPLTLS_SKIP_RENEGOTIATION;
#					endif
				}
				break;
			case REPL_XON:
				gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_SENDING_JNLRECS;
				poll_time = REPL_POLL_NOWAIT; /* because we received XON and data ready for send */
				repl_log(gtmsource_log_fp, TRUE, TRUE, "REPL_XON received\n");
				GTMTLS_ONLY(if (REPLTLS_WAITING_FOR_RENEG_ACK != repl_tls.renegotiate_state))
					heartbeat_stalled = FALSE;
				REPL_DPRINT1("Restarting HEARTBEAT\n");
				break;
			case REPL_BADTRANS:
			case REPL_CMP2UNCMP:
			case REPL_START_JNL_SEQNO:
				/* A REPL_XOFF_ACK_ME must have been sent before. Ensure by asserting that we are
				 * waiting for an XON.
				 */
				assert(GTMSOURCE_WAITING_FOR_XON == gtmsource_state);
				QWASSIGN(recvd_seqno, *(seq_num *)&recv_msgp->msg[0]);
				if (msg_is_cross_endian)
					recvd_seqno = GTM_BYTESWAP_64(recvd_seqno);
				gtmsource_state = gtmsource_local->gtmsource_state
					= GTMSOURCE_SEARCHING_FOR_RESTART;
				if ((REPL_BADTRANS == recv_msgp->type)
					|| (REPL_CMP2UNCMP == recv_msgp->type))
				{
					already_communicated = TRUE;
					recvd_start_flags = START_FLAG_NONE;
					if (REPL_BADTRANS == recv_msgp->type)
						repl_log(gtmsource_log_fp, TRUE, TRUE, "Received REPL_BADTRANS "
							"message with SEQNO "INT8_FMT" "INT8_FMTX"\n",
							recvd_seqno, recvd_seqno);
					else
					{
						repl_log(gtmsource_log_fp, TRUE, TRUE, "Received REPL_CMP2UNCMP "
							"message with SEQNO "INT8_FMT" "INT8_FMTX"\n",
							recvd_seqno, recvd_seqno);
						repl_log(gtmsource_log_fp, TRUE, FALSE,
							"Defaulting to NO compression for this connection\n");
						gtmsource_received_cmp2uncmp_msg = TRUE;
					}
				} else
				{
					recvd_start_flags = ((repl_start_msg_ptr_t)recv_msgp)->start_flags;
					if (msg_is_cross_endian)
						recvd_start_flags = GTM_BYTESWAP_32(recvd_start_flags);
					already_communicated = FALSE;
					repl_log(gtmsource_log_fp, TRUE, TRUE,
						"Received REPL_START_JNL_SEQNO message with SEQNO "INT8_FMT" "
						INT8_FMTX". Possible crash of recvr/update process\n",
						recvd_seqno, recvd_seqno);
				}
				break;
			case REPL_HEARTBEAT:
				if (msg_is_cross_endian)
				{
					heartbeat_msg = (repl_heartbeat_msg_ptr_t)recv_msgp;
					tmp_seqno = *(seq_num *)&heartbeat_msg->ack_seqno[0];
					tmp_seqno = GTM_BYTESWAP_64(tmp_seqno);
					*(seq_num *)&heartbeat_msg->ack_seqno[0] = tmp_seqno;
					tmp_time4 = *(gtm_time4_t *)&heartbeat_msg->ack_time[0];
					tmp_time4 = GTM_BYTESWAP_32(tmp_time4);
					*(gtm_time4_t *)&heartbeat_msg->ack_time[0] = tmp_time4;
				}
				if ((!gtmsource_is_heartbeat_stalled) && (SHUTDOWN != gtmsource_local->shutdown))
					gtmsource_process_heartbeat((repl_heartbeat_msg_ptr_t)recv_msgp);
#				ifdef GTM_TLS
				hrtbt_cnt++;
				/* Check whether it is time for renegotiation */
				if ((0 < renegotiate_factor) && (0 == (hrtbt_cnt % renegotiate_factor))
					&& (SHUTDOWN != gtmsource_local->shutdown))
				{
					switch(repl_tls.renegotiate_state)
					{
						case REPLTLS_RENEG_STATE_NONE:
						case REPLTLS_WAITING_FOR_RENEG_TIMEOUT:
							next_renegotiate_hrtbt = TRUE;
							repl_tls.renegotiate_state = REPLTLS_WAITING_FOR_RENEG_TIMEOUT;
							poll_time = REPL_POLL_WAIT;
							break;
						case REPLTLS_WAITING_FOR_RENEG_ACK:
						/* On slower systems, heartbeat responses may arrive late.
						   In such a case, defer renegotiation */
							break;
						default:
							assert(FALSE);
							break;
					}
				}
#				endif
				break;
#			ifdef GTM_TLS
			case REPL_RENEG_ACK:
				repl_log(gtmsource_log_fp, TRUE, TRUE, "Received SSL/TLS connection %s acknowledgement message.\n",
						((gtm_tls_does_renegotiate(repl_tls.sock)) ?
						 	"renegotiated" : "session keys updated"));
				REPLTLS_RENEGOTIATE(repl_tls.sock, status);
				poll_time = REPL_POLL_NOWAIT; /* because we are back to sending data */
				if (0 != status)
				{
					assert(GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state);
					break;
				}
				/* Send the REPL_RENEG_COMPLETE message. */
				repl_log(gtmsource_log_fp, TRUE, TRUE, "Sending SSL/TLS connection %s message.\n",
							((gtm_tls_does_renegotiate(repl_tls.sock)) ?
							 	"renegotiated" : "session keys updated"));
				renegotiate_msg.type = REPL_RENEG_COMPLETE;
				renegotiate_msg.len = MIN_REPL_MSGLEN;
				gtmsource_repl_send((repl_msg_ptr_t)&renegotiate_msg, "REPL_RENEG_COMPLETE",
							MAX_SEQNO, INVALID_SUPPL_STRM);
				if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
					return;
				if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
					break;
				repl_log(gtmsource_log_fp, TRUE, TRUE, "Sent SSL/TLS connection %s message.\n",
							((gtm_tls_does_renegotiate(repl_tls.sock)) ?
							 	"renegotiated" : "session keys updated"));
				assert(heartbeat_stalled);
				if (GTMSOURCE_WAITING_FOR_XON != gtmsource_state)
					heartbeat_stalled = FALSE;
				/* else, heartbeat_stalled will be set back to FALSE when REPL_XON is received. */
				DEBUG_ONLY(renegotiation_pending = FALSE);
				repl_tls.renegotiate_state = REPLTLS_RENEG_STATE_NONE;
				next_renegotiate_hrtbt = FALSE;
				hrtbt_cnt = 0;
				if (gtm_tls_does_renegotiate(repl_tls.sock))	/* Announce results only if renegotiated */
					repl_log_tls_info(gtmsource_log_fp, repl_tls.sock);
				break;
#			endif
			default:
				repl_log(gtmsource_log_fp, TRUE, TRUE, "Message of unknown type %d of length %d "
					"bytes received; hex dump follows\n", recv_msgp->type, recvd_len);
				for (index = 0; index < MIN(recvd_len, gtmsource_msgbufsiz - REPL_MSG_HDRLEN); )
				{
					repl_log(gtmsource_log_fp, FALSE, FALSE, "%.2x ",
							recv_msgp->msg[index]);
					if ((++index) % MAX_HEXDUMP_CHARS_PER_LINE == 0)
						repl_log(gtmsource_log_fp, FALSE, TRUE, "\n");
				}
				repl_log(gtmsource_log_fp, FALSE, TRUE, "\n"); /* flush BEFORE the assert */
				assert(FALSE);
				break;
		}
#		ifdef GTM_TLS
		/* On receipt of a REPL_XOFF_ACK_ME, we should no longer wait-for/attempt TLS/SSL
		 * renegotiation.
		 */
		assert((REPL_XOFF_ACK_ME != recv_msgp->type)
				|| !repl_tls.enabled || (REPLTLS_SKIP_RENEGOTIATION == repl_tls.renegotiate_state));
#		endif
	} else if (SS_NORMAL != status)
	{
		if (EREPL_RECV == repl_errno)
		{
			if (REPL_CONN_RESET(status))
			{
				/* Connection reset */
				repl_log(gtmsource_log_fp, TRUE, TRUE,
					"Connection reset while attempting to receive from secondary."
					" Status = %d ; %s\n", status, STRERROR(status));
				repl_log_conn_info(gtmsource_sock_fd, gtmsource_log_fp, TRUE);
				SEND_SYSMSG_REPLCOMM(LEN_AND_LIT("Error receiving Control message from Receiver. Error in recv."));
			}
			repl_close(&gtmsource_sock_fd);
			SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
			gtmsource_state = gtmsource_local->gtmsource_state
				= GTMSOURCE_WAITING_FOR_CONNECTION;
			return;
		} else if (EREPL_SELECT == repl_errno)
		{
			SNPRINTF(err_string, SIZEOF(err_string),
					"Error receiving Control message from Receiver. Error in select : %s",
					STRERROR(status));
			SEND_SYSMSG_REPLCOMM(LEN_AND_STR(err_string));
			RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2,
				LEN_AND_STR(err_string));
		}
	}
}

/* The work-horse of the Source Server */
int gtmsource_process(void)
{
	gtmsource_local_ptr_t		gtmsource_local;
	jnlpool_ctl_ptr_t		jctl;
	seq_num				sav_read_jnl_seqno;
	seq_num				recvd_jnl_seqno, tmp_read_jnl_seqno;
	int				data_len, srch_status;
	unsigned char			*msg_ptr;				/* needed for REPL_{SEND,RECV}_LOOP */
	int				tosend_len, sent_len, sent_this_iter;	/* needed for REPL_SEND_LOOP */
	int				status, poll_dir;			/* needed for REPL_{SEND,RECV}_LOOP */
	int				tot_tr_len, send_tr_len, remaining_len, pre_cmpmsglen;
	int				recvd_msg_type;
	uchar_ptr_t			in_buff, out_buff, out_buffmsg;
	uint4				in_buflen, out_buflen, out_bufsiz;
	seq_num				log_seqno, diff_seqno, pre_read_seqno, post_read_seqno, jnl_seqno;
	char				err_string[1024];
	double				time_elapsed;
	seq_num				resync_seqno, zqgblmod_seqno, filter_seqno;
	gd_region			*reg, *region_top;
	sgmnt_addrs			*csa, *repl_csa;
	qw_num				delta_sent_cnt, delta_data_sent, delta_msg_sent;
	time_t				prev_now;
	int				index;
	uint4				temp_ulong;
	unix_db_info			*udi;
	repl_histinfo			remote_histinfo, local_histinfo;
	int4				num_histinfo, max_epoch_interval;
	seq_num				local_jnl_seqno, tmp_seqno;
	repl_msg_t			instnohist_msg, losttncomplete_msg;
	repl_msg_ptr_t			send_msgp;
	repl_cmpmsg_ptr_t		send_cmpmsgp;
	repl_start_reply_msg_ptr_t	reply_msgp;
	boolean_t			rollback_first, secondary_ahead, secondary_was_rootprimary;
	boolean_t			intfilter_error, skip_last_histinfo_check, msg_is_cross_endian, retval;
	int				semval, cmpret;
	uLongf				cmpbuflen;
	int4				msghdrlen;
	Bytef				*cmpbufptr;
	char				histdetail[OUT_LINE];
	sm_global_latch_ptr_t		gtmsource_srv_latch;
	gtmsource_state_t		gtmsource_state_sav;
	uint4				phase2_commit_index1;
	int				phase2_commit_wait_cnt;
	boolean_t			close_retry = FALSE;
	DEBUG_ONLY(uchar_ptr_t		save_inbuff;)
	DEBUG_ONLY(uchar_ptr_t		save_outbuff;)
	DCL_THREADGBL_ACCESS;
        char				print_msg_src[PROC_SRCOPS_PRINT_MSG_LEN];
        char				print_msg_t[PROC_SRCOPS_PRINT_MSG_LEN];

	SETUP_THREADGBL_ACCESS;
	assert((NULL != jnlpool) && (NULL != jnlpool->jnlpool_dummy_reg) && jnlpool->jnlpool_dummy_reg->open);
#	ifdef DEBUG
	repl_csa = &FILE_INFO(jnlpool->jnlpool_dummy_reg)->s_addrs;
	assert(!repl_csa->hold_onto_crit); /* so we can do unconditional grab_lock/rel_lock */
	ASSERT_VALID_JNLPOOL(repl_csa);
#	endif
	assert(REPL_MSG_HDRLEN == SIZEOF(jnldata_hdr_struct)); /* necessary for reading multiple transactions from jnlpool in
								* a single attempt */
	jctl = jnlpool->jnlpool_ctl;
	gtmsource_local = jnlpool->gtmsource_local;
	gtmsource_msgp = NULL;
	gtmsource_msgbufsiz = MAX_REPL_MSGLEN;
	if (ZLIB_CMPLVL_NONE != gtm_zlib_cmp_level)
		gtmsource_cmpmsgp = NULL;

	assert(REPL_POLL_WAIT < MILLISECS_IN_SEC);
	assert(GTMSOURCE_IDLE_POLL_WAIT < REPL_POLL_WAIT);

	if (0 < gtmsource_options.renegotiate_interval)
	{
		/* When renegotiate_interval is a multiple of the heartbeat period, perform
		   renegotiation once every (renegotiate_interval / heartbeat period) heartbeats. */
		renegotiate_factor = (int) DIVIDE_ROUND_DOWN(gtmsource_options.renegotiate_interval,
				gtmsource_local->connect_parms[GTMSOURCE_CONN_HEARTBEAT_PERIOD]);
		/* When heartbeat period is about as high as the renegotiate interval
	        (1 == renegotiate_factor), perform renegotiation every other heartbeat.*/
		if (1 == renegotiate_factor)
			renegotiate_factor++;
	}

	gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
	gtmsource_srv_latch = &gtmsource_local->gtmsource_srv_latch;
	/* Below is a simplistic representation of the state diagram of a source server.
	 *
	 *      ------------------------------
	 *            GTMSOURCE_START
	 *      ------------------------------
	 *                    |
	 *                    | (startup state)
	 *                    v
	 *      ------------------------------
	 *     GTMSOURCE_WAITING_FOR_CONNECTION
	 *      ------------------------------
	 *                    |
	 *                    | (gtmsource_est_conn)
	 *                    v
	 *      ------------------------------
	 *       GTMSOURCE_WAITING_FOR_RESTART
	 *      ------------------------------
	 *                    |
	 *                    | (gtmsource_recv_restart)
	 *                    v
	 *      ------------------------------
	 *     GTMSOURCE_SEARCHING_FOR_RESTART
	 *      ------------------------------
	 *                    |
	 *                    | (gtmsource_srch_restart)
	 *                    v
	 *      ------------------------------
	 *        GTMSOURCE_SENDING_JNLRECS         <---------\
	 *      ------------------------------                |
	 *                    |                               |
	 *                    | (receive REPL_XOFF)           |
	 *                    | (receive REPL_XOFF_ACK_ME)    |
	 *                    v                               |
	 *            ------------------------------          ^
	 *              GTMSOURCE_WAITING_FOR_XON             |
	 *            ------------------------------          |
	 *                    |                               |
	 *                    v (receive REPL_XON)            |
	 *                    |                               |
	 *                    \--------------------->---------/
	 */
	udi = FILE_INFO(jnlpool->jnlpool_dummy_reg);
	/* Before entering the loop find the max EPOCH interval (for use in lock waits) */
	max_epoch_interval = 0;
	for (reg = gd_header->regions, region_top = gd_header->regions + gd_header->n_regions; reg < region_top; reg++)
	{
		csa = &FILE_INFO(reg)->s_addrs;
		if ((max_epoch_interval < csa->hdr->epoch_interval) && (MAX_EPOCH_INTERVAL >= csa->hdr->epoch_interval))
			max_epoch_interval = csa->hdr->epoch_interval;
	}
	/* Since we want to wait at least a couple of minutes before timing out on the latch, ensure max_epoch_interval
	 * is at least 1 minute even if the individual file header epoch intervals are not that big.
	 */
	max_epoch_interval = MAX(60, max_epoch_interval);
	while (TRUE)
	{
		assert(!udi->grabbed_ftok_sem);
		gtmsource_stop_heartbeat();
		if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
		{
			poll_time = REPL_POLL_NOWAIT;
			/* Ensure we don't hold any locks at this moment */
			assert(process_id != gtmsource_local->gtmsource_srv_latch.u.parts.latch_pid);
			assert(!have_crit(CRIT_HAVE_ANY_REG)); /* checks both journal pool lock and database crit lock */
			assert(FD_INVALID != gtmsource_sock_fd);
			if (FD_INVALID != gtmsource_sock_fd)
			{
				repl_log(gtmsource_log_fp, TRUE, TRUE, "Closing connection due to ONLINE ROLLBACK\n");
				repl_close(&gtmsource_sock_fd);
				SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
			}
			jnl_seqno = jnlpool->jnlpool_ctl->jnl_seqno;
			repl_log(gtmsource_log_fp, TRUE, TRUE, "REPL INFO - Current Jnlpool Seqno : "INT8_FMT"\n",
					jnl_seqno);
			repl_log(gtmsource_log_fp, TRUE, TRUE, "REPL INFO - Last Seqno sent : "INT8_FMT"\n",
					gtmsource_local->read_jnl_seqno - 1);
			/* gtmsource_save_read_jnl_seqno is kept uptodate with gtmsource_local->read_addr and gtmsource_local->read
			 * fields in gtmsource_onln_rlbk_clnup. But, gtmsource_local->read_jnl_seqno is still pointing to the last
			 * sequence number that we sent to the receiver (which could have been rolled back now). We don't want to
			 * continue with a stale value of read_jnl_seqno. So, set it to gtmsource_save_read_jnl_seqno which itself
			 * is taken from the jnlpool_ctl->jnl_seqno right when we detected the online rollback. We could have set
			 * this right when we set gtmsource_save_read_jnl_seqno but we don't do that because we want to print the
			 * old value in the log file but we can't use repl_log/gtmsource_log_fp in gtmsource_onln_rlbk_clnup() as
			 * it is bundled up as part of libgtmshr.so whereas repl_log is bundled in libmupip.a.
			 */
			gtmsource_local->read_jnl_seqno = gtmsource_save_read_jnl_seqno;
			repl_log(gtmsource_log_fp, TRUE, TRUE, "REPL INFO - Source Server Read Seqno is now set to : "INT8_FMT"\n",
					gtmsource_local->read_jnl_seqno);
			gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
			assert(READ_FILE == gtmsource_local->read_state);
			gtmsource_ctl_close(); /* can't rely on the journal files anymore since rollback could have touched them */
		}
		if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
		{
			gtmsource_start_jnl_release_timer();
			gtmsource_est_conn();
			gtmsource_stop_jnl_release_timer();
			if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
				return (SS_NORMAL);
			repl_source_data_sent = repl_source_msg_sent = repl_source_cmp_sent = 0;
			repl_source_lastlog_data_sent = 0;
			repl_source_lastlog_msg_sent = 0;

			gtmsource_alloc_msgbuff(MAX_REPL_MSGLEN, TRUE);
			gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_RESTART;
			recvd_start_flags = START_FLAG_NONE;
			repl_source_prev_log_time = time(NULL);
		}
		if (GTMSOURCE_WAITING_FOR_RESTART == gtmsource_state &&
		    SS_NORMAL != (status = gtmsource_recv_restart(&recvd_seqno, &recvd_msg_type, &recvd_start_flags)))
		{
			if (EREPL_RECV == repl_errno)
			{
				if (REPL_CONN_RESET(status))
				{	/* Connection reset */
					repl_log(gtmsource_log_fp, TRUE, TRUE,
							"Connection reset while receiving restart SEQNO. Status = %d ; %s\n",
							status, STRERROR(status));
					repl_close(&gtmsource_sock_fd);
					SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
					gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
					continue;
				} else
				{
					SNPRINTF(err_string, SIZEOF(err_string),
							"Error receiving RESTART SEQNO. Error in recv : %s", STRERROR(status));
					RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2,
						LEN_AND_STR(err_string));
				}
			} else if (EREPL_SEND == repl_errno)
			{
				if (REPL_CONN_RESET(status))
				{
					repl_log(gtmsource_log_fp, TRUE, TRUE,
					       "Connection reset while sending XOFF_ACK due to possible update process shutdown. "
					       "Status = %d ; %s\n", status, STRERROR(status));
					repl_log_conn_info(gtmsource_sock_fd, gtmsource_log_fp, TRUE);
					close_retry = TRUE;
				}
				if (close_retry)
				{
		    			repl_close(&gtmsource_sock_fd);
					SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
					gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
					close_retry = FALSE;
					continue;
				} else
				{
					SNPRINTF(err_string, SIZEOF(err_string),
						"Error sending XOFF_ACK_ME message. Error in send : %s\n",
							STRERROR(status));
					SEND_SYSMSG_REPLCOMM(LEN_AND_STR(err_string));
					RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2,
						LEN_AND_STR(err_string));
				}
			} else if (EREPL_SELECT == repl_errno)
			{
				SNPRINTF(err_string, SIZEOF(err_string), "Error receiving RESTART SEQNO/sending XOFF_ACK_ME.  "
						"Error in select : %s", STRERROR(status));
				RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2,
					LEN_AND_STR(err_string));
			}
		}
		if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
			return (SS_NORMAL);
		/* Connection might have been closed if "gtmsource_recv_restart" got an unexpected message. In that case
		 * re-establish the same by continuing to the beginning of this loop. */
		if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
			continue;
		assert(REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver);
		assert((GTMSOURCE_SEARCHING_FOR_RESTART == gtmsource_state) || (GTMSOURCE_WAITING_FOR_RESTART == gtmsource_state));
		/* Receiver runs on a version of GT.M that supports multi-site capability */
		/* If gtmsource_state == GTMSOURCE_SEARCHING_FOR_RESTART, we have already communicated with the
		 * receiver and hence checked the instance info so no need to do it again.
		 */
		if (GTMSOURCE_WAITING_FOR_RESTART == gtmsource_state)
		{	/* Get replication instance info */
			DEBUG_ONLY(secondary_was_rootprimary = -1;)
			/* Note: As part of the REPL_INSTINFO message, the receiver could be sending a non-zero "strm_jnl_seqno"
			 * in some cases. If so, it will override "recvd_seqno" we saw before in the REPL_START_JNL_SEQNO message.
			 */
			if (!gtmsource_get_instance_info(&secondary_was_rootprimary, &recvd_seqno))
			{
				if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
					return (SS_NORMAL);
				else if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
					continue;
				else
				{	/* Got a REPL_XOFF_ACK_ME from the receiver. Restart the initial handshake */
					assert(GTMSOURCE_WAITING_FOR_RESTART == gtmsource_state);
					continue;
				}
			}
			assert((FALSE == secondary_was_rootprimary) || (TRUE == secondary_was_rootprimary));
		}
		rollback_first = FALSE;
		secondary_ahead = FALSE;
		grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
		if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
		{
			poll_time = REPL_POLL_NOWAIT;
			continue;
		}
		local_jnl_seqno = jctl->jnl_seqno;
		rel_lock(jnlpool->jnlpool_dummy_reg);
		/* Take care to set the flush parameter in repl_log calls below to FALSE until at least the first message
		 * gets sent back. This is so the fetchresync rollback on the other side does not timeout before receiving
		 * a response. */
		poll_time = REPL_POLL_NOWAIT;
		assert(0 == GET_STRM_INDEX(recvd_seqno));
		assert(0 == GET_STRM_INDEX(local_jnl_seqno));
		if (recvd_seqno > local_jnl_seqno)
		{	/* Secondary journal seqno is greater than that of the Primary. We know it is ahead of the primary. */
			secondary_ahead = TRUE;
			REPL_DPRINT5("Secondary instance journal seqno "INT8_FMT" "INT8_FMTX" is greater than Primary "
				"instance journal seqno "INT8_FMT" "INT8_FMTX"", recvd_seqno, recvd_seqno,
				local_jnl_seqno, local_jnl_seqno);
			sgtm_putmsg(print_msg_src, PROC_SRCOPS_PRINT_MSG_LEN, VARLSTCNT(4) ERR_REPLAHEAD, 2, LEN_AND_LIT(""));
			repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg_src);
			SNPRINTF(print_msg_t, SIZEOF(print_msg_t), "Replicating instance : "INT8_FMT", Originating instance : "
				""INT8_FMT". ",	recvd_seqno, local_jnl_seqno);
			sgtm_putmsg(print_msg_src, PROC_SRCOPS_PRINT_MSG_LEN, VARLSTCNT(4) ERR_TEXT, 2, LEN_AND_STR(print_msg_t));
			repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg_src);
			/* Since the secondary is at least multi-site, the determination of the rollback seqno involves comparing
			 * the histinfo records between the primary and secondary starting down from "local_jnl_seqno-1"
			 * (done below). In either case, the secondary has to roll back to at most "local_jnl_seqno".
			 * Reset "recvd_seqno" to this number given that we have already recorded that the secondary is
			 * ahead of the primary.
			 */
			recvd_seqno = local_jnl_seqno;
		}else
		{
			repl_log(gtmsource_log_fp, TRUE, FALSE, "Current Journal Seqno of the instance is "INT8_FMT" "INT8_FMTX"\n",
				local_jnl_seqno, local_jnl_seqno);
		}
		/* Before setting "next_histinfo_seqno", check if we have at least one histinfo record in the replication instance
		 * file. The only case when there can be no histinfo records is if this instance is a propagating primary. Assert
		 * that. In this case, wait for this instance's primary to send the first histinfo record before setting the
		 * next_histinfo_seqno. Note that we are fetching the value of "num_histinfo" without holding a lock on the instance
		 * file but that is ok since all we care about is if it is 0 or not. We do not rely on the actual value.
		 */
		num_histinfo = jnlpool->repl_inst_filehdr->num_histinfo;
		assert(0 <= num_histinfo);
		assert(num_histinfo || jctl->upd_disabled);
		gtmsource_local->next_histinfo_num = -1;/* Initial value. Reset by the call to "gtmsource_set_next_histinfo_seqno"
							 * invoked in turn by "gtmsource_send_new_histrec" down below */
		if (jctl->upd_disabled && !num_histinfo)
		{	/* Wait for corresponding primary to send a new histinfo record and the receiver server on this instance
			 * to write that to the replication instance file.
			 */
			assert(-1 == gtmsource_local->next_histinfo_num);
			repl_log(gtmsource_log_fp, TRUE, TRUE, "Source server waiting for first history record to be written by "
				"update process\n");
			do
			{
				SHORT_SLEEP(GTMSOURCE_WAIT_FOR_FIRSTHISTINFO);
				gtmsource_poll_actions(FALSE);
				if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
					return (SS_NORMAL);
				else if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
				{
					poll_time = REPL_POLL_NOWAIT;
					break; /* Break this loop */
				}
				num_histinfo = jnlpool->repl_inst_filehdr->num_histinfo;
				if (num_histinfo)	/* Number of histinfos is non-zero */
					break;
			} while (TRUE);
			if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
			{
				poll_time = REPL_POLL_NOWAIT;
				continue; /* Restart the outer loop */
			}
			repl_log(gtmsource_log_fp, TRUE, TRUE,
				"First history record written by update process. Source server proceeding.\n");
		}
		/* Now get the latest histinfo record from the secondary. There are a few exceptions though.
		 * 	1) If we came here because of a BAD_TRANS or CMP2UNCMP message from the receiver server.
		 *		In this case, we have already been communicating with the receiver so no need to
		 *		compare the histinfo record between primary and secondary.
		 *	2) If receiver server was started with -UPDATERESYNC and receiver is running pre-V55000.
		 *		In this case there is no history record on the receiver side to compare against.
		 *		In case the receiver is post-V55000, the -UPDATERESYNC would have required an instance
		 *		file name as the value which would be used towards history record verification.
		 *	3) If receiver server was started with -UPDATERESYNC and receiver is >= V55000 and at a seqno
		 *		which is EQUAL to the earliest seqno for which we have a history record on the primary.
		 *		We have	no way of verifying histories since we definitely don't have the history record
		 *		for the receiver side seqno. Since -updateresync was used, assume they are in sync and
		 *		start replicating from the earliest seqno for which we have a history record on the primary.
		 *	4) If recvd_seqno is 1. In this case, the receiver instance has been created afresh so its instance
		 *		file is empty and we are guaranteed there is nothing to compare. So no point requesting it.
		 *		Besides, this is a very common situation in practice that requiring -updateresync in this
		 *		case seems user-unfriendly so we will let this one go by without a -updateresync particularly
		 *		because there is no harm that can happen by allowing two such instances to connect/replicate.
		 */
		assert(0 != recvd_seqno);
		if (1 == recvd_seqno)
			skip_last_histinfo_check = TRUE;
		else
		{
			if ((GTMSOURCE_WAITING_FOR_RESTART != gtmsource_state) && already_communicated)
				skip_last_histinfo_check = TRUE;
			else if (START_FLAG_UPDATERESYNC & recvd_start_flags)
			{
				repl_log(gtmsource_log_fp, TRUE, TRUE, "REPL_START_JNL_SEQNO message has "
					"START_FLAG_UPDATERESYNC bit set\n");
				if (REPL_PROTO_VER_SUPPLEMENTARY > remote_side->proto_ver)
					skip_last_histinfo_check = TRUE;
				else
				{
					assert(jnlpool->repl_inst_filehdr->num_histinfo); /* should be at least 1 history record */
					/* If -updateresync is specified and receiver instance seqno is exactly equal to the
					 * start_seqno of the earliest history record in the instance file, then skip last
					 * histinfo check. Note that in case both source and receiver instances are supplementary,
					 * we should be looking at the 0th stream only. Even in that case, we are guaranteed that
					 * the 0th history record in the instance file corresponds to the 0th stream. So it is
					 * safe to look at the start_seqno of just the 0th history record in all cases.
					 */
					grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
					if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
					{
						poll_time = REPL_POLL_NOWAIT;
						continue;
					}
					status = repl_inst_histinfo_get(0, &local_histinfo);
					assert(0 == status); /* Since we pass histinfo_num of 0 which is >=0 and < num_histinfo */
					rel_lock(jnlpool->jnlpool_dummy_reg);
					if (local_histinfo.start_seqno == recvd_seqno)
						skip_last_histinfo_check = TRUE;
					else
						skip_last_histinfo_check = FALSE;
				}
			} else
				skip_last_histinfo_check = FALSE;
		}
		if (!skip_last_histinfo_check)
		{	/* Find histinfo record in the local instance file corresponding to seqno "recvd_seqno-1" */
			grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
			if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
			{
				poll_time = REPL_POLL_NOWAIT;
				continue;
			}
			assert(recvd_seqno <= local_jnl_seqno);
			assert(recvd_seqno <= jctl->jnl_seqno);
			assert((INVALID_SUPPL_STRM == strm_index) || (0 == strm_index));
			status = repl_inst_histinfo_find_seqno(recvd_seqno, strm_index, &local_histinfo);
			rel_lock(jnlpool->jnlpool_dummy_reg);
			assert((0 != status) || (local_histinfo.start_seqno < recvd_seqno));
			if (0 != status)
			{	/* If recvd_seqno is the earliest history record's start_seqno and -updateresync was
				 * specified, assume the two instances are in sync. Otherwise issue error and close connection.
				 * Send this error status to the receiver server before closing the connection.
				 * This way the receiver will know to shut down rather than loop back trying to
				 * reconnect. This avoids an infinite loop of connection open and closes
				 * between the source server and receiver server.
				 */
				assert(ERR_REPLINSTNOHIST == status); /* only error returned by "repl_inst_histinfo_find_seqno" */
				assert((INVALID_HISTINFO_NUM == local_histinfo.histinfo_num)
						|| (local_histinfo.start_seqno >= recvd_seqno));
				if (!(START_FLAG_UPDATERESYNC & recvd_start_flags)
					|| (INVALID_HISTINFO_NUM == local_histinfo.histinfo_num)
					|| (local_histinfo.start_seqno > recvd_seqno))
				{	/* recvd_seqno is PRIOR to the starting seqno of the instance file.
					 * In that case, issue error and close the connection.
					 */

					SNPRINTF(histdetail, OUT_LINE, "seqno "INT8_FMT" "INT8_FMTX, recvd_seqno - 1,
						recvd_seqno - 1);
					gtm_putmsg_csa(CSA_ARG(NULL) VARLSTCNT(6) ERR_REPLINSTNOHIST, 4,
							LEN_AND_STR(histdetail), LEN_AND_STR(udi->fn));
					instnohist_msg.type = REPL_INST_NOHIST;
					instnohist_msg.len = MIN_REPL_MSGLEN;
					memset(&instnohist_msg.msg[0], 0, SIZEOF(instnohist_msg.msg));
					gtmsource_repl_send((repl_msg_ptr_t)&instnohist_msg, "REPL_INST_NOHIST",
						MAX_SEQNO, INVALID_SUPPL_STRM);
					repl_log(gtmsource_log_fp, TRUE, TRUE,
					       "Connection reset due to above REPLINSTNOHIST error\n");
					repl_close(&gtmsource_sock_fd);
					SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
					gtmsource_state = gtmsource_local->gtmsource_state
						= GTMSOURCE_WAITING_FOR_CONNECTION;
					continue;
				}
				assert((0 == local_histinfo.histinfo_num) && (local_histinfo.start_seqno == recvd_seqno));
			}
			if (local_histinfo.start_seqno < recvd_seqno)
			{
				/* Find histinfo record in the remote instance file corresponding to seqno "recvd_seqno-1" */
				retval = gtmsource_get_remote_histinfo(recvd_seqno, &remote_histinfo);
				if (retval)
				{
					assert(remote_histinfo.start_seqno < recvd_seqno);
					/* Check if primary and secondary have same histinfo for "recvd_seqno-1" */
					rollback_first = !gtmsource_is_histinfo_identical(&remote_histinfo, &local_histinfo,
												recvd_seqno, OK_TO_LOG_TRUE);
					/* If local and remote sides are supplementary (i.e. P->Q replication), verify each
					 * stream level history as well. Do this only if the remote side is a receiver server
					 * (i.e. not rollback) and if we still intend on sending a REPL_WILL_RESTART_WITH_INFO
					 * message.
					 */
					assert(this_side->is_supplementary == jnlpool->repl_inst_filehdr->is_supplementary);
					if (this_side->is_supplementary && remote_side->is_supplementary
							&& (REPL_START_JNL_SEQNO == recvd_msg_type)
							&& !rollback_first && !secondary_ahead)
						retval = gtmsource_check_remote_strm_histinfo(recvd_seqno, &rollback_first);
				}
				if (!retval)
				{
					if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
						return (SS_NORMAL);
					else if ((GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
							|| (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
					{
						poll_time = REPL_POLL_NOWAIT;
						continue;
					}
					else
					{	/* Got a REPL_XOFF_ACK_ME from receiver. Restart the initial handshake */
						assert(GTMSOURCE_WAITING_FOR_RESTART == gtmsource_state);
						continue;
					}
				}
			}
		}
		QWASSIGN(sav_read_jnl_seqno, gtmsource_local->read_jnl_seqno);
		reply_msgp = (repl_start_reply_msg_ptr_t)gtmsource_msgp;
		memset(reply_msgp, 0, SIZEOF(*reply_msgp)); /* to identify older releases in the future */
		reply_msgp->len = MIN_REPL_MSGLEN;
		reply_msgp->proto_ver = REPL_PROTO_VER_THIS;
		reply_msgp->node_endianness = NODE_ENDIANNESS;
		reply_msgp->is_supplementary = jnlpool->repl_inst_filehdr->is_supplementary;
		assert((1 != recvd_seqno) || !rollback_first);
		if ((GTMSOURCE_SEARCHING_FOR_RESTART == gtmsource_state) || (REPL_START_JNL_SEQNO == recvd_msg_type))
		{
			gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_SEARCHING_FOR_RESTART;
			/* If the last histinfo record in both instances are NOT the same ("rollback_first" is TRUE)
			 * (possible only if the secondary is multi-site), or if secondary is ahead of the primary
			 * ("secondary_ahead" is TRUE) we do want the secondary to rollback first. Issue message to
			 * do rollback fetchresync. There is one exception though. And that is if -NORESYNC was
			 * specified on the receiver side. In this case, determine the resync/common point by comparing
			 * local and remote histinfo records from the tail of the instance file until we reach
			 * one seqno whose histinfo information is identical in both.
			 * Use this as the common point to send a REPL_WILL_RESTART_WITH_INFO message.
			 */
			poll_time = REPL_POLL_NOWAIT;
			if (!rollback_first && !secondary_ahead)
				resync_seqno = recvd_seqno;
			else if (START_FLAG_NORESYNC & recvd_start_flags)
			{
				repl_log(gtmsource_log_fp, TRUE, TRUE, "REPL_START_JNL_SEQNO message has "
					"START_FLAG_NORESYNC bit set\n");
				assert(!skip_last_histinfo_check);
				assert(1 != recvd_seqno);
				if (!rollback_first)
				{
					assert(secondary_ahead);
					assert(recvd_seqno == local_jnl_seqno);
					/* The primary and secondary are in sync as of "recvd_seqno" the jnl seqno of the
					 * primary. So that is the common point. Send it across.
					 */
					resync_seqno = recvd_seqno;
				} else
				{
					resync_seqno = gtmsource_find_resync_seqno(&local_histinfo, &remote_histinfo);
					assert((MAX_SEQNO != resync_seqno) || (GTMSOURCE_CHANGING_MODE == gtmsource_state)
						|| (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
						|| (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state));
					rollback_first = FALSE;
				}
			} else
			{	/* Ask secondary to issue a fetchresync rollback */
				REPL_DPRINT1("Secondary instance needs to first do MUPIP JOURNAL ROLLBACK FETCHRESYNC\n");
				resync_seqno = local_jnl_seqno;
				poll_time = REPL_POLL_NOWAIT;
				rollback_first = TRUE;
			}
			if (MAX_SEQNO != resync_seqno)
			{
				QWASSIGN(*(seq_num *)&reply_msgp->start_seqno[0], resync_seqno);
				if (!rollback_first)
				{
					assert(NULL != gd_header);
					assert(0 < gd_header->n_regions);
					grab_gtmsource_srv_latch(gtmsource_srv_latch, 2 * gd_header->n_regions * max_epoch_interval,
									HANDLE_CONCUR_ONLINE_ROLLBACK);
#					ifdef DEBUG
					if (WBTEST_ENABLED(WBTEST_HOLD_GTMSOURCE_SRV_LATCH))
						while (0 == TREF(continue_proc_cnt))
							LONG_SLEEP(1);
#					endif
					if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
					{
						poll_time = REPL_POLL_NOWAIT;
						continue;
					}
					srch_status = gtmsource_srch_restart(resync_seqno, recvd_start_flags);
					rel_gtmsource_srv_latch(&gtmsource_local->gtmsource_srv_latch);
					assert(resync_seqno == gtmsource_local->read_jnl_seqno);
					assert(SS_NORMAL == srch_status);
					reply_msgp->type = REPL_WILL_RESTART_WITH_INFO;
					reply_msgp->jnl_ver = this_side->jnl_ver;
					temp_ulong = (0 == this_side->is_std_null_coll) ?  START_FLAG_NONE : START_FLAG_COLL_M;
					GTMTRIG_ONLY(
						assert(this_side->trigger_supported);
						temp_ulong |= START_FLAG_TRIGGER_SUPPORT;
					)
#					ifdef GTM_TLS
					if (REPL_TLS_REQUESTED)
					{
						if (!remote_side->tls_requested)
						{
							ISSUE_REPLNOTLS(ERR_REPLNOTLS, SRC_SIDE_STR, RCVR_SIDE_STR);
							CLEAR_REPL_TLS_REQUESTED; /* As if -tlsid qualifier was never specified. */
							repl_log(gtmsource_log_fp, TRUE, TRUE, "Plaintext fallback enabled. "
									"Continuing without TLS/SSL.\n");
						} else
							temp_ulong |= START_FLAG_ENABLE_TLS;
					}
#					endif
					PUT_ULONG(reply_msgp->start_flags, temp_ulong);
					recvd_start_flags = START_FLAG_NONE;
					gtmsource_repl_send((repl_msg_ptr_t)reply_msgp, "REPL_WILL_RESTART_WITH_INFO",
						resync_seqno, INVALID_SUPPL_STRM);
				} else
				{	/* Secondary needs to first do FETCHRESYNC rollback to synchronize with primary */
					reply_msgp->type = REPL_ROLLBACK_FIRST;
					poll_time = REPL_POLL_NOWAIT;
					gtmsource_repl_send((repl_msg_ptr_t)reply_msgp, "REPL_ROLLBACK_FIRST",
						resync_seqno, INVALID_SUPPL_STRM);
				}
			}
		} else
		{	/* REPL_FETCH_RESYNC received and state is WAITING_FOR_RESTART */
			if (rollback_first || secondary_ahead)
			{	/* Primary and Secondary are currently not in sync */
				if (!rollback_first)
				{	/* We know the secondary is ahead of the primary in terms of journal seqno but the last
					 * histinfo records are identical. This means that the secondary is in sync with the
					 * primary until the primary's journal seqno ("local_jnl_seqno") which should be the new
					 * resync seqno.
					 */
					resync_seqno = local_jnl_seqno;
				} else
				{	/* Determine the resync seqno between this primary and secondary by comparing
					 * local and remote histinfo records from the tail of the instance file until we reach
					 * one seqno whose histinfo information is identical in both.
					 */
					assert(1 != recvd_seqno);
					resync_seqno = gtmsource_find_resync_seqno(&local_histinfo, &remote_histinfo);
					assert((MAX_SEQNO != resync_seqno) || (GTMSOURCE_CHANGING_MODE == gtmsource_state)
						|| (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
						|| (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state));
				}
			} else
			{	/* Primary and Secondary are in sync upto "recvd_seqno". Send it back as the new resync seqno. */
				resync_seqno = recvd_seqno;
			}
			if (MAX_SEQNO != resync_seqno)
			{
				assert(GTMSOURCE_WAITING_FOR_RESTART == gtmsource_state && REPL_FETCH_RESYNC == recvd_msg_type);
				reply_msgp->type = REPL_RESYNC_SEQNO;
				QWASSIGN(*(seq_num *)&reply_msgp->start_seqno[0], resync_seqno);
				gtmsource_repl_send((repl_msg_ptr_t)reply_msgp, "REPL_RESYNC_SEQNO",
					resync_seqno, INVALID_SUPPL_STRM);
			}
		}
		if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
			return (SS_NORMAL);	/* "gtmsource_repl_send" or "gtmsource_find_resync_seqno" did not complete */
		if ((GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
		{
			poll_time = REPL_POLL_NOWAIT;
			continue;	/* "gtmsource_repl_send" or "gtmsource_find_resync_seqno" did not complete */
		}
		assert(MAX_SEQNO != resync_seqno);
		/* After having established connection, initialize a few fields in the gtmsource_local
		 * structure and flush those changes to the instance file on disk.
		 */
		grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
		if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
		{
			poll_time = REPL_POLL_NOWAIT;
			continue;
		}
		gtmsource_local->connect_jnl_seqno = jctl->jnl_seqno;
		gtmsource_local->send_losttn_complete = jctl->send_losttn_complete;
		rel_lock(jnlpool->jnlpool_dummy_reg);
		/* Now that "connect_jnl_seqno" has been updated, flush it to corresponding gtmsrc_lcl on disk */
		grab_gtmsource_srv_latch(gtmsource_srv_latch, UINT32_MAX, HANDLE_CONCUR_ONLINE_ROLLBACK);
		if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
		{
			poll_time = REPL_POLL_NOWAIT;
			continue;
		}
		repl_inst_flush_gtmsrc_lcl();
		rel_gtmsource_srv_latch(gtmsource_srv_latch);
		if (REPL_WILL_RESTART_WITH_INFO != reply_msgp->type)
		{
			assert(reply_msgp->type == REPL_RESYNC_SEQNO || reply_msgp->type == REPL_ROLLBACK_FIRST);
			if ((REPL_RESYNC_SEQNO == reply_msgp->type) && secondary_was_rootprimary)
			{
				repl_log(gtmsource_log_fp, TRUE, TRUE, "Sent REPL_RESYNC_SEQNO message with SEQNO "
					INT8_FMT" "INT8_FMTX"\n",
					(*(seq_num *)&reply_msgp->start_seqno[0]), (*(seq_num *)&reply_msgp->start_seqno[0]));
				region_top = gd_header->regions + gd_header->n_regions;
				assert(NULL != jnlpool->jnlpool_dummy_reg);
				grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
				if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
				{
					poll_time = REPL_POLL_NOWAIT;
					continue;
				}
				zqgblmod_seqno = jctl->max_zqgblmod_seqno;
				if (0 == zqgblmod_seqno)
				{	/* If zqgblmod_seqno in all file headers is 0, it implies that this is the first
					 * FETCHRESYNC rollback after the most recent MUPIP REPLIC -LOSTTNCOMPLETE command.
					 * Therefore reset zqgblmod_seqno to the rollback seqno. If not the first rollback,
					 * then zqgblmod_seqno will be reset only if the new rollback seqno is lesser
					 * than the current value.
					 */
					zqgblmod_seqno = MAX_SEQNO;	/* actually 0xFFFFFFFFFFFFFFFF (max possible seqno) */
					/* Reset any pending MUPIP REPLIC -SOURCE -LOSTTNCOMPLETE */
					jctl->send_losttn_complete = FALSE;
					gtmsource_local->send_losttn_complete = jctl->send_losttn_complete;
					poll_time = REPL_POLL_NOWAIT;
				}
				rel_lock(jnlpool->jnlpool_dummy_reg);
				REPL_DPRINT2("BEFORE FINDING RESYNC - zqgblmod_seqno is "INT8_FMT, zqgblmod_seqno);
				REPL_DPRINT2(", curr_seqno is "INT8_FMT"\n", jctl->jnl_seqno);
				if (zqgblmod_seqno > resync_seqno)
				{	/* reset "zqgblmod_seqno" and "zqgblmod_tn" in all fileheaders to "resync_seqno" */
					GTMSOURCE_SAVE_STATE(gtmsource_state_sav);
					if (SS_NORMAL != gtmsource_update_zqgblmod_seqno_and_tn(resync_seqno))
					{
						assert(GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state);
						if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
						{
							poll_time = REPL_POLL_NOWAIT;
							continue;
						}
					}
					if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
						return (SS_NORMAL);
					if (GTMSOURCE_NOW_TRANSITIONAL(gtmsource_state_sav))
						continue;
				}
			}
			/* Could send a REPL_CLOSE_CONN message here */
			/* It is expected that on receiving this msg, the Receiver Server will break the connection and exit. */
		 	repl_close(&gtmsource_sock_fd);
		 	LONG_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_TO_QUIT); /* may not be needed after REPL_CLOSE_CONN is sent */
		 	gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
		 	continue;
		}
		/* Now that REPL_WILL_RESTART_WITH_INFO message has been sent, if compression of the replication stream is
		 * requested, check if the receiver server supports ability to decompress. Don't do this if this receiver has
		 * previously sent a REPL_CMP2UNCMP message.
		 */
		gtmsource_local->repl_zlib_cmp_level = repl_zlib_cmp_level = ZLIB_CMPLVL_NONE;	/* no compression by default */
		if (!gtmsource_received_cmp2uncmp_msg && (ZLIB_CMPLVL_NONE != gtm_zlib_cmp_level))
		{
			if (REPL_PROTO_VER_MULTISITE_CMP <= remote_side->proto_ver)
			{	/* Receiver server is running a version of GT.M that supports compression of replication stream.
				 * Send test message with compressed data to check if it is able to decompress properly. If so,
				 * enable compression on the replication pipe. Compression level set in repl_zlib_cmp_level.
				 */
				if (!gtmsource_get_cmp_info(&repl_zlib_cmp_level))
				{
					if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
						return (SS_NORMAL);
					else if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
						continue;
					else
					{	/* Got a REPL_XOFF_ACK_ME from receiver. Restart the initial handshake */
						assert(GTMSOURCE_WAITING_FOR_RESTART == gtmsource_state);
						continue;
					}
				}
				/* Note down replication cmp_level in this source-server specific structure in journal pool */
				gtmsource_local->repl_zlib_cmp_level = repl_zlib_cmp_level;
			} else
			{
				repl_log(gtmsource_log_fp, TRUE, FALSE,
					"Receiver server does not support compressed data on the replication pipe\n");
				repl_log(gtmsource_log_fp, TRUE, FALSE, "Defaulting to NO compression\n");
			}
		}
#		ifdef GTM_TLS
		if (!repl_tls.enabled && REPL_TLS_REQUESTED && remote_side->tls_requested)
		{	/* Now that START_FLAGS have been exchanged and both sides request TLS/SSL, do the handshake and establish
			 * a TLS/SSL connection. Even though the TCP connection is established much earlier, we want to do the
			 * TLS/SSL handshake at a point that is close to its purpose which is to send the journal records.
			 */
			assert(REPL_PROTO_VER_TLS_SUPPORT <= REPL_PROTO_VER_THIS);
			assert(REPL_PROTO_VER_TLS_SUPPORT <= remote_side->proto_ver);
			if (!gtmsource_exchange_tls_info())
			{
				switch (gtmsource_state)
				{
					case GTMSOURCE_CHANGING_MODE:		/* ACTIVE->PASSIVE mode change in poll actions. */
						return SS_NORMAL;
					case GTMSOURCE_WAITING_FOR_CONNECTION:	/* Disconnect during gtmsource_repl_{send,recv} */
					case GTMSOURCE_WAITING_FOR_RESTART:	/* Got a REPL_XOFF_ACK_ME from receiver. Restart. */
						continue;
					default:
						assert(FALSE);
				}
			} else
			{
				repl_tls.enabled = TRUE; /* From here on, all communications are secured with TLS/SSL. */
				repl_tls.renegotiate_state = REPLTLS_RENEG_STATE_NONE;
				next_renegotiate_hrtbt = FALSE;
				hrtbt_cnt = 0;
			}
		}
#		endif
		if (NULL != repl_ctl_list)
		{	/* The journal files may have been positioned
			 *	a) Ahead of the read_jnl_seqno for the next read in which case indicate that they have to be
			 *		repositioned into the past OR
			 *	b) Behind the read_jnl_seqno for the next read and ctl->lookback might have been set when
			 *		the disconnect of the previous connection occurred and the newer connection happens
			 *		with a receiver database that is much more ahead in time than when it disconnected.
			 *		In this case fix ctl->lookback. (GTM-8547)
			 */
			assert(READ_FILE == gtmsource_local->read_state);
			gtmsource_set_lookback();	/* In case we read ahead, enable looking back. */
		}
		/* The variable poll_time indicates if we should wait for the receive pipe to be I/O ready and should be set to
		 * a non-zero value ONLY if the source server has nothing to send. At this point we have data to send and so
		 * set poll_time to no-wait.
		 */
		poll_time = REPL_POLL_NOWAIT;
		gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_SENDING_JNLRECS;
		assert(1 <= gtmsource_local->read_jnl_seqno);
		/* Now that "gtmsource_local->read_jnl_seqno" is initialized, ensure the first send gets logged. */
		gtmsource_reinit_logseqno();
		gtmsource_init_heartbeat();
		/* Internal filters are needed as long as the filter format of the originating side is greater or equal to the
		 * filter format of the secondary side
		 */
		if ((this_side->jnl_ver >= remote_side->jnl_ver)
			&& (IF_NONE != repl_filter_cur2old[remote_side->jnl_ver - JNL_VER_EARLIEST_REPL]))
		{
			assert(IF_INVALID != repl_filter_cur2old[remote_side->jnl_ver - JNL_VER_EARLIEST_REPL]);
			assert(IF_INVALID != repl_filter_old2cur[remote_side->jnl_ver - JNL_VER_EARLIEST_REPL]);
			/* reverse transformation should exist */
			assert(IF_NONE != repl_filter_old2cur[remote_side->jnl_ver - JNL_VER_EARLIEST_REPL]);
			if (this_side->is_std_null_coll != remote_side->is_std_null_coll)
				remote_side->null_subs_xform = (this_side->is_std_null_coll ?
							STDNULL_TO_GTMNULL_COLL : GTMNULL_TO_STDNULL_COLL);
			else
				remote_side->null_subs_xform = FALSE;
			gtmsource_filter |= INTERNAL_FILTER;
			gtmsource_alloc_filter_buff(gtmsource_msgbufsiz);
		} else
		{
			gtmsource_filter &= ~INTERNAL_FILTER;
			if (NO_FILTER == gtmsource_filter)
				gtmsource_free_filter_buff();
		}
		/* Reset some variables to their initial value before entering the while loop (to safeguard against stale values) */
		xon_wait_logged = FALSE;
		heartbeat_stalled = FALSE;
#		ifdef GTM_TLS
		DEBUG_ONLY(renegotiation_pending = FALSE);
		if (repl_tls.enabled && (0 < gtmsource_options.renegotiate_interval))
			repl_tls.renegotiate_state = REPLTLS_WAITING_FOR_RENEG_TIMEOUT;
#		endif
		/* Flush "gtmsource_local->read_jnl_seqno" to disk right now. This will serve as a reference point for next timed
		 * flush to occur.
		 */
		gtmsource_flush_fh(gtmsource_local->read_jnl_seqno, true);
		if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
		{
			poll_time = REPL_POLL_NOWAIT;
			continue;
		}
		gtmsource_local->send_new_histrec = TRUE;	/* Send new histinfo unconditionally at start of connection */
		gtmsource_local->next_histinfo_seqno = MAX_SEQNO; /* Initial value. Reset by "gtmsource_send_new_histrec" below */
		assert(-1 == gtmsource_local->next_histinfo_num);
		prev_now = gtmsource_now;
		phase2_commit_index1 = 0;
		phase2_commit_wait_cnt = 0;
		while (TRUE)
		{
			gtmsource_poll_actions(TRUE);
			if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
				return (SS_NORMAL);
			else if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
			{
				poll_time = REPL_POLL_NOWAIT;
				break; /* The outerloop will continue */
			}
			if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
				break;
			if (gtmsource_local->send_losttn_complete)
			{	/* Send LOSTTNCOMPLETE across to the secondary and reset flag to FALSE */
				losttncomplete_msg.type = REPL_LOSTTNCOMPLETE;
				losttncomplete_msg.len = MIN_REPL_MSGLEN;
				gtmsource_repl_send((repl_msg_ptr_t)&losttncomplete_msg, "REPL_LOSTTNCOMPLETE",
					MAX_SEQNO, INVALID_SUPPL_STRM);
				grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
				if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
				{
					poll_time = REPL_POLL_NOWAIT;
					break; /* the outerloop will continue */
				}
				gtmsource_local->send_losttn_complete = FALSE;
				rel_lock(jnlpool->jnlpool_dummy_reg);
			}
			if (gtmsource_local->send_new_histrec)
			{	/* We are at the beginning of a new histinfo record boundary. Send a REPL_HISTREC message
				 * before sending journal records for seqnos corresponding to this histinfo.
				 */
				assert(REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver);
				/* Remote version supports multi-site functionality. Send REPL_HISTREC and friends */
				gtmsource_send_new_histrec();
				if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
					return (SS_NORMAL);
				if ((GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
					|| (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
					break;
				assert(FALSE == gtmsource_local->send_new_histrec);
			}
			if (prev_now != gtmsource_now)
			{
				prev_now = gtmsource_now;
				if (gtmsource_msgbufsiz - MAX_REPL_MSGLEN > 2 * OS_PAGE_SIZE)
				{	/* We have expanded the buffer by too much (could have been avoided had we sent one
					 * transaction at a time while reading from journal files); let's revert back to our
					 * initial buffer size. If we don't reduce our buffer, it is possible that the buffer keeps
					 * growing (while reading from journal file) thus making the size of sends while reading
					 * from journal pool very large (> 1 MB).
					 */
					gtmsource_free_filter_buff();
					gtmsource_free_msgbuff();
					gtmsource_alloc_msgbuff(MAX_REPL_MSGLEN, TRUE); /* will also allocate filter buffer */
				}
			}
#			if defined(DEBUG) && defined(GTM_TLS)
			if (repl_tls.enabled && (WBTEST_ENABLED(WBTEST_INDUCE_TLSIOERR)))
				RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(4) ERR_TLSIOERROR, 2, LEN_AND_LIT("WBTEST_INDUCE_TLSIOERR"));
#			endif
			/* GTMSOURCE_SAVE_STATE() and GTMSOURCE_NOW_TRANSITIONAL() check are not needed
			 * here as the existing logic handles transitions.
			 */
			gtmsource_recv_ctl();
			if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
				return SS_NORMAL;
			if ((GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
					|| (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
			{
				poll_time = REPL_POLL_NOWAIT;
				break;
			}
#			ifdef GTM_TLS
			/* If we are waiting for a REPL_RENEG_ACK from the receiver, don't send any more messages (even journal
			 * records) before completing the renegotiation.
			 */
			if (REPLTLS_WAITING_FOR_RENEG_ACK == repl_tls.renegotiate_state)
				continue;
#			endif
			if (GTMSOURCE_WAITING_FOR_XON == gtmsource_state)
			{
				if (!xon_wait_logged)
				{
					repl_log(gtmsource_log_fp, TRUE, TRUE, "Waiting to receive XON\n");
					heartbeat_stalled = TRUE;
					REPL_DPRINT1("Stalling HEARTBEAT\n");
					xon_wait_logged = TRUE;
				}
				continue;
			}
			if ((GTMSOURCE_SEARCHING_FOR_RESTART  == gtmsource_state)
					|| (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
				break;
			assert(gtmsource_state == GTMSOURCE_SENDING_JNLRECS);
			pre_read_seqno = gtmsource_local->read_jnl_seqno;
			GTMTLS_ONLY(assert(!renegotiation_pending));
			grab_gtmsource_srv_latch(gtmsource_srv_latch, 2 * gd_header->n_regions * max_epoch_interval,
							HANDLE_CONCUR_ONLINE_ROLLBACK);
			if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
			{
				poll_time = REPL_POLL_NOWAIT;
				break; /* the outerloop will continue */
			}
			GTMSOURCE_SAVE_STATE(gtmsource_state_sav);
			tot_tr_len = gtmsource_get_jnlrecs(&gtmsource_msgp->msg[0], &data_len,
							   gtmsource_msgbufsiz - REPL_MSG_HDRLEN,
							   !(gtmsource_filter & EXTERNAL_FILTER));
			rel_gtmsource_srv_latch(&gtmsource_local->gtmsource_srv_latch);
			/* It is safe to send the journal records as we are guaranteed NO online rollback happened in between
			 * and so we won't be sending garbage
			 */
			if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
				return (SS_NORMAL);
			if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
				break;
			if (GTMSOURCE_NOW_TRANSITIONAL(gtmsource_state_sav))
				continue;
			if (GTMSOURCE_SEND_NEW_HISTINFO == gtmsource_state)
			{	/* This is a signal from "gtmsource_get_jnlrecs" to send a REPL_HISTREC message first
				 * before sending any more seqnos across. Set "gtmsource_local->send_new_histrec" to TRUE.
				 */
				assert(0 == tot_tr_len);
				gtmsource_local->send_new_histrec = TRUE; /* Will cause a new histinfo record to be sent first */
				gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_SENDING_JNLRECS;
				poll_time = REPL_POLL_NOWAIT;
				continue;	/* Send a REPL_HISTREC message first and then send journal records */
			}
			post_read_seqno = gtmsource_local->read_jnl_seqno;
			if (0 <= tot_tr_len)
			{
				if (0 < data_len)
				{
					APPLY_EXT_FILTER_IF_NEEDED(gtmsource_filter, gtmsource_msgp, data_len, tot_tr_len);
					gtmsource_msgp->type = REPL_TR_JNL_RECS;
					gtmsource_msgp->len = data_len + REPL_MSG_HDRLEN;
					send_msgp = gtmsource_msgp;
					send_tr_len = tot_tr_len;
					intfilter_error = FALSE;
					if (gtmsource_filter & INTERNAL_FILTER)
					{
						in_buff = gtmsource_msgp->msg;
						in_buflen = data_len; /* jrec size of the FIRST SEQNO in the converted buffer */
						out_buffmsg = repl_filter_buff;
						out_buff = out_buffmsg + REPL_MSG_HDRLEN;
						out_bufsiz = repl_filter_bufsiz - REPL_MSG_HDRLEN;
						remaining_len = tot_tr_len;	/* jrec size of ALL SEQNOs ( >= 1) in buffer */
						while (JREC_PREFIX_SIZE <= remaining_len)
						{
							filter_seqno = ((struct_jrec_null *)(in_buff))->jnl_seqno;
							DEBUG_ONLY(
								save_inbuff = in_buff;
								save_outbuff = out_buff;
							)
							APPLY_INT_FILTER(in_buff, in_buflen, out_buff, out_buflen,
											out_bufsiz, status);
							/* Internal filters should not modify the incoming pointers. Assert that. */
							assert((save_inbuff == in_buff) && (save_outbuff == out_buff));
							if (SS_NORMAL == status)
							{	/* adjust various pointers and book-keeping values to move to next
								 * record.
								 */
								((repl_msg_ptr_t)(out_buffmsg))->type = REPL_TR_JNL_RECS;
								((repl_msg_ptr_t)(out_buffmsg))->len = out_buflen + REPL_MSG_HDRLEN;
								out_buffmsg = (out_buff + out_buflen);
								remaining_len -= (in_buflen + REPL_MSG_HDRLEN);
								assert(0 <= remaining_len);
								if (0 >= remaining_len)
									break;
								in_buff += in_buflen;
								in_buflen = ((repl_msg_ptr_t)(in_buff))->len - REPL_MSG_HDRLEN;
								in_buff += REPL_MSG_HDRLEN;
								out_buff = (out_buffmsg + REPL_MSG_HDRLEN);
								out_bufsiz -= (out_buflen + REPL_MSG_HDRLEN);
								assert(0 <= (int)out_bufsiz);
							} else if (EREPL_INTLFILTER_NOSPC == repl_errno)
							{
								REALLOCATE_INT_FILTER_BUFF(out_buff, out_buffmsg, out_bufsiz);
								/* note that in_buff and in_buflen is not changed so that we can
								 * start from where we left
								 */
							} else /* fatal error from the internal filter */
							{
								intfilter_error = TRUE;
								break;
							}
						}
						assert((0 == remaining_len) || intfilter_error);
						GTMTRIG_ONLY(ISSUE_TRIG2NOTRIG_IF_NEEDED;)
						send_msgp = (repl_msg_ptr_t)repl_filter_buff;
						send_tr_len = out_buffmsg - repl_filter_buff;
						if (0 == send_tr_len)
						{	/* This is possible ONLY if the first transaction in the buffer read from
							 * journal pool or disk encountered error while doing internal filter
							 * conversion. Issue rts_error right away as there is nothing much we can
							 * do at this point.
							 */
							assert(intfilter_error);
							assert(filter_seqno == pre_read_seqno);
							INT_FILTER_RTS_ERROR(filter_seqno, repl_errno); /* no return */
						}
					}
					assert(send_tr_len && (0 == (send_tr_len % REPL_MSG_ALIGN)));
					/* ensure that the head of the buffer has the correct type and len */
					assert((REPL_TR_JNL_RECS == send_msgp->type)
							&& (0 == (send_msgp->len % JNL_REC_START_BNDRY)));
					/* At this point send_msgp is the buffer to be sent and send_tr_len is the send size */
					assert(remote_side->endianness_known);
					if (remote_side->cross_endian && (this_side->jnl_ver < remote_side->jnl_ver))
					{	/* Cross-endian replication with GT.M version on primary being lesser than that
						 * of the secondary. Do the endian conversion in the primary so that the secondary
						 * can consume it as-is.
						 * No return if the below call to repl_tr_endian_convert fails.
						 */
						repl_tr_endian_convert(send_msgp, send_tr_len, pre_read_seqno);
					}
					pre_cmpmsglen = send_tr_len; /* send_tr_len will be updated below */
					if (ZLIB_CMPLVL_NONE != repl_zlib_cmp_level)
					{	/* Compress the journal records before replicating them across the pipe.
						 * Depending on whether the total data length to be sent is within a threshold
						 * or not (see repl_msg.h before REPL_TR_CMP_THRESHOLD #define for why), send
						 * either a REPL_TR_CMP_JNL_RECS or REPL_TR_CMP_JNL_RECS2 message
						 */
						msghdrlen = (REPL_TR_CMP_THRESHOLD > send_tr_len)
									? REPL_MSG_HDRLEN : REPL_MSG_HDRLEN2;
						cmpbuflen = gtmsource_cmpmsgbufsiz - msghdrlen;
						cmpbufptr = ((Bytef *)gtmsource_cmpmsgp) + msghdrlen;
						ZLIB_COMPRESS(cmpbufptr, cmpbuflen, send_msgp, send_tr_len,
								repl_zlib_cmp_level, cmpret);
						BREAK_IF_CMP_ERROR(cmpret, send_tr_len); /* Note: break stmt. inside the macro */
						if (Z_OK == cmpret)
						{	/* Send compressed buffer */
							send_msgp = gtmsource_cmpmsgp;
							if (REPL_TR_CMP_THRESHOLD > send_tr_len)
							{	/* Send REPL_TR_CMP_JNL_RECS message with 8-byte header */
								SET_8BYTE_CMP_MSGHDR(send_msgp, send_tr_len, cmpbuflen, msghdrlen);
							} else
							{	/* Send REPL_TR_CMP_JNL_RECS2 message with 16-byte header */
								SET_16BYTE_CMP_MSGHDR(send_msgp, send_tr_len, cmpbuflen, msghdrlen);
							}
						} else
						{	/* Send normal buffer */
							repl_log(gtmsource_log_fp, TRUE, FALSE, "Defaulting to NO compression\n");
							repl_zlib_cmp_level = ZLIB_CMPLVL_NONE;	/* no compression */
							gtmsource_local->repl_zlib_cmp_level = repl_zlib_cmp_level;
						}

					}
					assert((send_tr_len == pre_cmpmsglen) || (ZLIB_CMPLVL_NONE != repl_zlib_cmp_level));
					assert(0 == (send_tr_len % REPL_MSG_ALIGN));
					/* The following loop tries to send multiple seqnos in one shot. resync_seqno gets
					 * updated once the send is completely successful. If an error occurs in the middle
					 * of the send, it is possible that we successfully sent a few seqnos to the other side.
					 * In this case resync_seqno should be updated to reflect those seqnos. Not doing so
					 * might cause the secondary to get ahead of the primary in terms of resync_seqno.
					 * Although it is possible to determine the exact seqno where the send partially failed,
					 * we update resync_seqno as if all seqnos were successfully sent (It is ok for the
					 * resync_seqno on the primary side to be a little more than the actual value as long as
					 * the secondary side has an accurate value of resync_seqno. This is because the
					 * resync_seqno of the system is the minimum of the resync_seqno of both primary
					 * and secondary). This is done by the call to gtmsource_flush_fh() done within
					 * gtmsource_poll_actions() as well as in the (SS_NORMAL != status) if condition below.
					 * Note that all of this is applicable only in a dualsite replication scenario. In
					 * case of a multisite scenario, it is always the receiver server that tells the
					 * sequence number from where the source server should start sending. So, even if
					 * the source server notes down a higher value of journal sequence number in
					 * jnlpool->gtmsource_local->read_jnl_seqno, it is not a problem since the receiver
					 * server will communicate the appropriate sequence number as part of the histinfo
					 * exchange.
					 */
					REPL_SEND_LOOP(gtmsource_sock_fd, send_msgp, send_tr_len, REPL_POLL_WAIT)
					{
						gtmsource_poll_actions(FALSE);
						if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
							return (SS_NORMAL);
						else if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
						{
							poll_time = REPL_POLL_NOWAIT;
							break;
						}
					}
					if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
					{
						poll_time = REPL_POLL_NOWAIT;
						break; /* The outerloop will continue */
					}
					if (SS_NORMAL != status)
					{
						gtmsource_flush_fh(post_read_seqno, true);
						if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
						{
							poll_time = REPL_POLL_NOWAIT;
							break; /* the outerloop will continue */
						}
						if (EREPL_SEND == repl_errno)
						{
							if (REPL_CONN_RESET(status))
							{
								repl_log(gtmsource_log_fp, TRUE, TRUE,
									"Connection reset while sending seqno data from "
									INT8_FMT" "INT8_FMTX" to "INT8_FMT" "INT8_FMTX
									". Status = %d ; %s\n", pre_read_seqno,
									pre_read_seqno, post_read_seqno, post_read_seqno,
									 status, STRERROR(status));
								repl_log_conn_info(gtmsource_sock_fd, gtmsource_log_fp,
								TRUE);
								close_retry = TRUE;
							}
							if (close_retry)
							{
								repl_close(&gtmsource_sock_fd);
								SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
								gtmsource_state = gtmsource_local->gtmsource_state
									= GTMSOURCE_WAITING_FOR_CONNECTION;
								close_retry = FALSE;
								break;
							} else
							{
								SNPRINTF(err_string, SIZEOF(err_string),
                                                                	"Error sending DATA. Error in send : %s",
										STRERROR(status));
								SEND_SYSMSG_REPLCOMM(LEN_AND_STR(err_string));
								RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0,
									ERR_TEXT, 2, LEN_AND_STR(err_string));
							}
						}
						if (EREPL_SELECT == repl_errno)
						{
							SNPRINTF(err_string, SIZEOF(err_string),
								"Error sending DATA. Error in select : %s",
											STRERROR(status));
							SEND_SYSMSG_REPLCOMM(LEN_AND_STR(err_string));
							RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2,
								LEN_AND_STR(err_string));
						}
					}
					if (intfilter_error)
					{	/* Now that we are done sending whatever buffer was filter converted, issue
						 * the error. This will bring down the source server (due to the rts_error).
						 * At this point, jnlpool->gtmsource_local->read_jnl_seqno could effectively
						 * be behind the receiver server's journal sequence number. But, that is
						 * okay since as part of reconnection (when the source server comes back up),
						 * the receiver server will communicate the appropriate sequence number as
						 * part of the histinfo exchange.
						 */
						assert(filter_seqno <= post_read_seqno);
						INT_FILTER_RTS_ERROR(filter_seqno, repl_errno); /* no return */
					}
					jnlpool->gtmsource_local->read_jnl_seqno = post_read_seqno;
					repl_source_cmp_sent += (qw_num)send_tr_len;
					repl_source_msg_sent += (qw_num)pre_cmpmsglen;
					repl_source_data_sent += (qw_num)(pre_cmpmsglen)
								- (post_read_seqno - pre_read_seqno) * REPL_MSG_HDRLEN;
					log_seqno = post_read_seqno - 1; /* post_read_seqno is the "next" seqno to be sent,
									      * not the last one we sent */
					if (gtmsource_logstats || (log_seqno - lastlog_seqno >= log_interval))
					{	/* print always when STATSLOG is ON, or when the log interval has passed */
						trans_sent_cnt += (log_seqno - lastlog_seqno);
						/* jctl->jnl_seqno >= post_read_seqno is the most common case;
						 * see gtmsource_readpool() for when the rare case can occur */
						jnl_seqno = jctl->jnl_seqno;
						if (jnl_seqno >= post_read_seqno - 1)
						{
							diff_seqno = (jnl_seqno >= post_read_seqno) ?
									(jnl_seqno - post_read_seqno) : 0;
							repl_log(gtmsource_log_fp, TRUE, TRUE, "REPL INFO - Seqno : "INT8_FMT" "
								INT8_FMTX"  Jnl Total : "INT8_FMT" "INT8_FMTX
								"  Msg Total : "INT8_FMT" "INT8_FMTX"  CmpMsg Total : "
								INT8_FMT" "INT8_FMTX"  Current backlog : "INT8_FMT" "INT8_FMTX"\n",
								log_seqno, log_seqno, repl_source_data_sent, repl_source_data_sent,
								repl_source_msg_sent, repl_source_msg_sent,
								repl_source_cmp_sent, repl_source_cmp_sent, diff_seqno, diff_seqno);
							/* gtmsource_now is updated by the heartbeat protocol every heartbeat
							 * interval. To cut down on calls to time(), we use gtmsource_now as the
							 * time to figure out if we have to log statistics. This works well because
							 * the logging interval is generally larger than the heartbeat interval, and
							 * the heartbeat protocol is running whenever we are sending data. The
							 * consequence, though, is that we may defer logging when we would otherwise
							 * have logged. We can live with that given the benefit of not calling time
							 * related system calls. Currently, the logging interval is not changeable
							 * by users. When/if we provide means of choosing log interval, this code
							 * may have to be re-examined. Vinaya 2003, Sep 08
							 */
							assert(0 != gtmsource_now); /* must hold if we are sending data */
							repl_source_this_log_time = gtmsource_now; /* approximate time, in the
												    * worst case, behind by
												    * heartbeat interval */
							assert(repl_source_this_log_time >= repl_source_prev_log_time);
							time_elapsed = difftime(repl_source_this_log_time,
											repl_source_prev_log_time);
							if ((double)GTMSOURCE_LOGSTATS_INTERVAL <= time_elapsed)
							{
								delta_sent_cnt = trans_sent_cnt - last_log_tr_sent_cnt;
								delta_data_sent = repl_source_data_sent
											- repl_source_lastlog_data_sent;
								delta_msg_sent = repl_source_msg_sent
											- repl_source_lastlog_msg_sent;
								repl_log(gtmsource_log_fp, TRUE, FALSE,
									"REPL INFO since last log : "
									"Time elapsed : %00.f  Tr sent : "INT8_FMT" "INT8_FMTX"  "
									"Tr bytes : "INT8_FMT" "INT8_FMTX
									"  Msg bytes : "INT8_FMT" "INT8_FMTX"\n", time_elapsed,
									delta_sent_cnt, delta_sent_cnt, delta_data_sent,
									delta_data_sent, delta_msg_sent, delta_msg_sent);
								repl_log(gtmsource_log_fp, TRUE, TRUE, "REPL INFO since last log : "
									"Time elapsed : %00.f  Tr sent/s : %f  Tr bytes/s : %f  "
									"Msg bytes/s : %f\n", time_elapsed,
									(float)delta_sent_cnt / time_elapsed,
									(float)delta_data_sent / time_elapsed,
									(float)delta_msg_sent / time_elapsed);
								repl_source_lastlog_data_sent = repl_source_data_sent;
								repl_source_lastlog_msg_sent = repl_source_msg_sent;
								last_log_tr_sent_cnt = trans_sent_cnt;
								repl_source_prev_log_time = repl_source_this_log_time;
							}
							lastlog_seqno = log_seqno;
						} /* else an online rollback occurred. Fall-through and the subsequent iteration
						   * will take care of re-establishing the connection
						   */
					}
					/* Because we sent data to the other side and there might be more to be sent across, don't
					 * wait for the receive pipe to be ready.
					 */
					poll_time = REPL_POLL_NOWAIT;
				} else /* data_len == 0 */
				{	/* nothing to send */
					gtmsource_flush_fh(post_read_seqno, true);
					if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
					{
						poll_time = REPL_POLL_NOWAIT;
						break; /* the outerloop will continue */
					}
					if ((READ_POOL == gtmsource_local->read_state)
						&& (jctl->write_addr != jctl->rsrv_write_addr))
					{	/* We found nothing to send in the journal pool but there is some phase2 commit
						 * in progress. Check if it is dead and needs to be cleaned up by noting its
						 * progress across various iterations of this "while" loop.
						 */
						if (phase2_commit_index1 == jctl->phase2_commit_index1)
						{	/* We have been through one iteration of this "while" loop with
							 * an intervening sleep and no changes happening. Do check.
							 */
							phase2_commit_wait_cnt++;
							if (PHASE2_COMMIT_WAIT_CNT <= phase2_commit_wait_cnt)
							{	/* Need to get "grab_lock" in order for "repl_phase2_cleanup"
								 * to invoke "repl_phase2_salvage" logic. So try getting it
								 * in immediate mode (second parameter FALSE below). If not
								 * available, then try again next iteration.
								 */
								if (grab_lock(jnlpool->jnlpool_dummy_reg, FALSE, GRAB_LOCK_ONLY))
								{
									repl_phase2_cleanup(jnlpool);
									rel_lock(jnlpool->jnlpool_dummy_reg);
									phase2_commit_wait_cnt = 0;
								} else
									phase2_commit_wait_cnt--; /* Try again next iteration */
							}
						} else
						{
							phase2_commit_wait_cnt = 0; /* there is progress since we last took note */
							phase2_commit_index1 = jctl->phase2_commit_index1;
						}
					}
					/* Sleep for a while (as part of the next REPL_RECV_LOOP) to avoid spinning when there is no
					 * data to be sent
					 */
					poll_time = GTMSOURCE_IDLE_POLL_WAIT;
				}
			} else /* else tot_tr_len < 0, error */
			{
				assertpro(0 < data_len); /* Else major problems */
				/* Insufficient buffer space, increase the buffer space */
				gtmsource_alloc_msgbuff(data_len + REPL_MSG_HDRLEN, TRUE);
			}
		}
	}
}