File: cluster.h

package info (click to toggle)
virtuoso-opensource 7.2.5.1%2Bdfsg1-0.3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 285,240 kB
  • sloc: ansic: 641,220; sql: 490,413; xml: 269,570; java: 83,893; javascript: 79,900; cpp: 36,927; sh: 31,653; cs: 25,702; php: 12,690; yacc: 10,227; lex: 7,601; makefile: 7,129; jsp: 4,523; awk: 1,697; perl: 1,013; ruby: 1,003; python: 326
file content (1713 lines) | stat: -rw-r--r-- 67,702 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
/*
 *  $Id$
 *
 *  Cluster data structures
 *
 *  This file is part of the OpenLink Software Virtuoso Open-Source (VOS)
 *  project.
 *
 *  Copyright (C) 1998-2018 OpenLink Software
 *
 *  This project is free software; you can redistribute it and/or modify it
 *  under the terms of the GNU General Public License as published by the
 *  Free Software Foundation; only version 2 of the License, dated June 1991.
 *
 *  This program is distributed in the hope that it will be useful, but
 *  WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 *  General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with this program; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 *
 */

#ifndef _CLUSTER_H
#define _CLUSTER_H

#include <stddef.h>

#include "../Dk/Dkhash64.h"
#include "clq.h"

/* forward declaration; cl_message_t is used as an opaque type in this header */
typedef struct cl_message_s cl_message_t;

/* enable cluster message tracing whenever mutex metering is compiled in */
#ifdef MTX_METER
#define CM_TRACE
#endif

/* Snapshot of one host's status and load counters.  A previous/current
   pair of these is kept per host in cl_host_s (ch_prev_cst / ch_current_cst). */
typedef struct cl_status_s
{
  int 		cst_host;	/* host no this sample describes */
  int		cst_status;	/* CST_EMPTY / CST_OK / CST_DOWN, see below */
  int		cst_ch_status;
  uint32	cst_ts;		/* timestamp of the sample -- units not shown here, confirm */
  int64		cst_bytes_sent;
  int64		cst_messages_sent;
  int64		cst_cl_wait;
  int64		cst_txn_messages_sent;
  int64		cst_cl_wait_msec;
  int		cst_threads_running;
  int		cst_threads_waiting;
  int		cst_threads_io;
  int		cst_buffers_wired;
  int		cst_buffers_used;
  int		cst_buffers_dirty;
  int64		cst_read_msec;
  int32		cst_cpu;
  int32		cst_majflt;	/* major page faults, presumably as in getrusage -- confirm */
  char		cst_status_line[40];
} cl_status_t;


/* cst_status values */
#define CST_EMPTY 0
#define CST_OK 1
#define CST_DOWN 2

/* status record of the local host (this process) */
extern cl_status_t cl_local_cst;


/* dks_cluster_flags */
#define DKS_TO_CLUSTER 1
#define DKS_LEN_ONLY 2


/* One network interface of a cluster host plus the pool of connections
   opened through it. */
typedef struct cl_interface_s
{
  char		cin_is_local; /* this process */
  char		cin_disable; /* if unplugged, failed switch  or such */
  char		cin_is_local_host; /* other cluster process on same machine */
  caddr_t	cin_connect_string;
  caddr_t	cin_port_only;
  int		cin_n_connections; /* how many exist */
  short		cin_local_interface; /* when going to remote, through which local interface does it go? */
  resource_t *	cin_connections; /* pool of open connections, count in cin_n_connections */
} cl_interface_t;




/* One buffer of a cl_ring_t: raw message bytes plus the index of the
   first cm in the buffer -- exact cm indexing semantics TODO confirm. */
typedef struct clrb_buf_s
{
  int 		clrb_first_cm;
  db_buf_t 	clrb_buf;
} clrb_buf_t;


/* Fixed-size ring of recent message buffers (see msg_ctr_t.mctr_clr). */
typedef struct cl_ring_s
{
  int 		clr_n_buffers;	/* capacity of clr_buffers */
  int 		clr_fill;
  clrb_buf_t *	clr_buffers;
} cl_ring_t;


/* Per-connection message accounting: send/receive counters and a ring
   of recent message buffers. */
typedef struct msg_ctr_s
{
  struct cl_cli_s *	mctr_clses;
  int64 		mctr_conn_id;
  uint32 		mctr_recd;
  uint32 		mctr_sent;
  cl_ring_t 		mctr_clr;
} msg_ctr_t;

/* kinds of cluster connections; see per-value comments */
#define CIN_REQ 1
#define CIN_REPLY 2
#define CIN_SYNC_REQ 3 /* connection for log shipping.  Does not use cluster protocol. Upon init not put in the pool of connections of the accepting process */
#define CIN_TEST 4 /* test connection, high byte is interface no of target host. The server replies and disconnects */

/* A cluster host as known to this process: identity, connection state,
   interfaces and the resources kept locally on its behalf. */
struct cl_host_s
{
  int32			ch_id;
  caddr_t		ch_name;
  char 			ch_status;	/* CH_ONLINE ... CH_NONE, see below */
  char			ch_disconnect_in_progress; /* set by listener thread for time of stopping and freeing activity on behalf of this.  Do not allow connects while this is set */
  char			ch_is_local_host; /* same machine, prefer unix domain */
  uint32		ch_offline_since;
  uint32		ch_atomic_since; /* if host in checkpoint or such, do not expect keep alives */
  uint32		ch_last_disconnect_query_sent; /* last msec time local complained about this host being offline */
  uint32		ch_last_disconnect_query_served; /* last msec time a disconnect query about this host was processed */
  cl_interface_t **	ch_interfaces;
  caddr_t		ch_connect_string;
  dk_set_t		ch_replica;
  /* resources kept locally for this host */
  dk_hash_t *		ch_id_to_cm; /*for suspended cm's for this host */
  dk_mutex_t *		ch_mtx;
  dk_hash_t *		ch_closed_qfs;
  dk_set_t		ch_cl_maps;
  uint32 		ch_boot_time;
  cl_status_t		ch_prev_cst;	/* previous status sample, cf. cl_status_t */
  cl_status_t		ch_current_cst;	/* latest status sample */
};


/* Cluster-wide configuration: request batching parameters, server
   thread count and the list of member hosts. */
typedef struct cl_config_s
{
  int32		ccf_req_batch_size;
  int32		ccf_batches_per_rpc;
  int32		ccf_res_buffer_bytes;
  int32		ccf_cl_threads;
  cl_host_t **	ccf_hosts; /* ((host<n> host:port ...) ...) */
} cl_config_t;


/* Wait-graph node for a cluster transaction: the w ids this one waits
   for and those waiting for it -- presumably used for distributed
   deadlock detection, confirm against the wait graph code. */
typedef struct cl_trx_s
{
  int64		ctrx_w_id;
  dk_set_t	ctrx_waits_for;
  dk_set_t 	ctrx_waiting_for_this;
} cl_trx_t;


/* ch_status values of cl_host_s */
#define CH_ONLINE 0
#define CH_REMOVED 1  /* do not connect to this until otherwise indicated.  Not kept insync, rejoin by special protocol */
#define CH_OFFLINE 2 /* Disconnected, can retry from time to time */
#define CH_SEEK_CONFIG 3
#define CH_RFWD 4 /* roll forward from own log */
#define CH_RFWD_GROUP 5 /* sync by roll forward of log from group peer if partitions in multiple copies */
#define CH_SYNC_SCHEMA 6
#define CH_NONE 7 /* indicates unused host number in a host map string */


/* State of a cluster index operation; the cio_bm* fields describe a
   bitmap range when the target is a bitmap index (cio_is_bitmap). */
typedef struct cl_inxop_s
{
  it_cursor_t *		cio_itc;
  caddr_t *		cio_values;
  int			cio_pos;
  int			cio_values_fill;
  char			cio_is_bitmap:1;
  char			cio_is_local:1;
  dtp_t			cio_dtp;
  short			cio_bm_len;
  int			cio_nth;
  placeholder_t * 	cio_bm_pl;
  db_buf_t		cio_bm;
  bitno_t		cio_bm_start;
  bitno_t		cio_bm_end;
  int			cio_n_results;
  dk_session_t *	cio_strses;
  caddr_t *		cio_local_params;
  caddr_t *		cio_results;
} cl_inxop_t;


/* A cluster operation (clo): a fixed header (op code, sequencing,
   targeting, memory pool) followed by a union payload selected by
   clo_op.  The CLO_*_SIZE macros below rely on '_' being the last
   member, so keep it last. */
typedef struct cl_op_s
{
  char			clo_op;	/* op code, selects the union arm used */
  int			clo_seq_no; /* seq no in the clrg */
  int			clo_nth_param_row; /* no of the param row of the calling sql node */
  slice_id_t 		clo_slice;		/* if requesting clo, id of targeted partition, can be many per host  */
  data_col_t *		clo_set_no; /* number of the param row in vectored mode */
  mem_pool_t *		clo_pool;
  dk_set_t		clo_clibs; /* the set of hosts serving this */
  union
  {
    struct
    {
      it_cursor_t *	itc;
      caddr_t *		local_params;
      cl_message_t *	gb_cm; /* if reading a group by temp of a remote gb, this is the cm of the qf that has the temp */
      short		max_rows;
      char		is_text_seek; /* for given word and d_id <= x, desc seek and asc read */
      bitf_t		is_started:1; /* if local, indicates whether itc_reset or next */
      bitf_t		any_passed:1; /* for oj when the clo represents a set in a select batch */
      bitf_t		is_null_join:1; /* if oj, mark that the result is null because of null criteria or such */
      bitf_t 		sample_cols:1;	/* return itc_st.cols serialized as 2nd col  */
    } select;
    struct
    {
      /* rerun of the previous select with new params -- confirm */
      short		max_rows;
      caddr_t *		params;
    } select_same;
    struct
    {
      row_delta_t *	rd; /* must be first, same as delete */
      slice_id_t *	slices;
      row_delta_t *	prev_rd; /* if rd non-unq, the conflicting pre-existing row is put here */
      char		ins_mode;
      char		ins_result; /* DVC_MATCH if non-unq */
      char		is_local;
      char 		is_autocommit;
      char 		non_txn;
    } insert;
    struct
    {
      row_delta_t *	rd; /* key and key parts for finding the key to delete, must be first, same as insert  */
      slice_id_t *	slices;
    } delete;
    struct
    {
      query_frag_t *	qf;
      query_t *		qr;
      caddr_t *		params;
      caddr_t *		local_params;
      caddr_t *		qst;
      dk_set_t		alt_inputs; /* a dist frag with no qr can get multiple inputs.  Put them here while waiting for the qr */
      int		coord_host; /* host no of host coordinating the query */
      uint32		coord_req_no; /* req no to use form messages to coordinator */
      uint64 		qf_id;		/* if the qr is not known on recipient, use this to fetch it from the coordinator */
      int 		n_initial_sets;
      short		max_rows;
      char		nth_stage; /* stage no of dist frag to which this is going */
      char		isolation;
      char		lock_mode;
      char		is_started;	/* QF_STARTED / QF_SETP_READ, see below */
      char 		is_update;
      char		is_autocommit;
      bitf_t		is_update_replica:1; /* if set and doing upd/del, do not bother with 2nd keys */
      bitf_t 		merits_thread:1;
    } frag;
    struct
    {
      /* pre-distribute a query fragment -- confirm against qf send code */
      int 		coord;
      query_frag_t *	qf;
      query_t *		qr;
    } qf_prepare;
    struct
    {
      caddr_t		func;
      caddr_t *		params;
      slice_id_t *	slices;
      dbe_key_t *	key;
      int		u_id;
      char		is_txn;
      char 		non_txn_insert;
      char		is_update;
    } call;
    struct
    {
      int 		filler;	/* no payload; keeps the arm non-empty */
    } blob;
    struct
    {
      caddr_t *		cols;
      data_col_t **	local_dcs;
      int 		n_rows;
      int 		nth_val;		/* if row consists of dcs, this is the index to 1st non-processed value */
    } row;
    struct
    {
      char		type; /*table, proc, ... */
      caddr_t		name;
      caddr_t		trig_table;
    } ddl;
    struct
    {
      caddr_t		err;	/* boxed error for an error reply */
    } error;
    struct
    {
      struct itc_cluster_s *	itcl;
    } itcl;
    struct
    {
      cl_status_t * 	cst;
    } cst;
    struct
    {
      cl_inxop_t *	cio;
    } inxop;
    struct
    {
      int		in_batches; /* messages recd for this stage node */
      int 		batches_consumed;
      int 		batches_to_report;
      int 		batches_reported;
      int		result_rows; /* rows in last stage's output set.  Not exact but means some progress */
      int64 *		out_counts; /* per host, how many inputs sent to date */
      int64		sets_completed; /* count of inputs processed to end since start */
    } dfg_stat;
    struct
    {
      int		host;
      char 		from_cl_more;	/* in response to cl more ? */
      struct cl_op_s **	stats;
    } dfg_array;
    struct
    {
      caddr_t		in;
      dk_set_t 		in_list;
      int 		n_comp;
      int		read_to;
      int		bytes;
    } stn_in;
    struct
    {
      int		req_no;
    } dfga;
    struct
    {
      /* cluster control op: master change / offline hosts -- confirm */
      int		op;
      int		master;
      caddr_t *		offline;
    } ctl;
  } _;
} cl_op_t;


/* Byte offset of the per-op union '_' inside cl_op_t, i.e. the size of
 * the fixed header shared by all clo variants.  Use the standard
 * offsetof instead of the hand-rolled null-pointer-dereference idiom,
 * which is undefined behavior even though it usually works. */
#define CLO_HEAD_SIZE ((ptrlong) offsetof (cl_op_t, _))
/* Allocation sizes for clo's that use only the given union arm */
#define CLO_ROW_SIZE (CLO_HEAD_SIZE + sizeof (((cl_op_t*)0)->_.row))
#define CLO_CALL_SIZE  (CLO_HEAD_SIZE + sizeof (((cl_op_t*)0)->_.call))



/* is_started in qf clo, i.e. values of cl_op_t _.frag.is_started */
#define QF_STARTED 1
#define QF_SETP_READ 2

/* Server-side thread state (clt) for serving a request from another
   host; clt_is_recursive takes the CLT_* values below. */
struct cl_thread_s
{
  /* data for a thread serving a req from another host */
  cl_queue_t 		clt_queue;
  int64 clt_running_trx_no;	/* if trx no associated to this in cll_trx_thread, this is the trx no, else 0, always consistent with cll_trx_thread, in_cll */
  int 			clt_now_running_coord;	/* for now running, the coord if dfg, the direct requester otherwise */
  uint32		clt_now_running; /* matches cm_req_no */
  int 			clt_clo_start;
  cl_message_t *	clt_current_cm;
  cl_op_t *		clt_clo;
  cl_host_t *		clt_client;	/* the host whose request this thread serves */
  du_thread_t *		clt_thread;
  dk_hash_t *		clt_rec_dfg;	/* host:req_no of recursives for which this is the queue.  For fast clear */
  client_connection_t *clt_cli;
  dk_session_t *	clt_reply_strses;
  dk_session_t *	clt_reply_ses; /* if a batch sends multiple reply messages, they are all on this ses to keep order */
  dk_mutex_t 		clt_reply_mtx;	/* multiple dfg threads need to share one clt reply ses to send to coordinator, needed for message ordering.  Serialize on this */
  cl_host_t *		clt_disconnected; /* when set by listener, this clt is expected to finish and free all things and threads associated with the disconnected host */
  char			clt_is_error; /* Will  the reply message be with error flag set */
  char			clt_commit_scheduled; /* if a 2pc final is scheduled on this thread, set this flag to mark that this must complete even if the client disconnected */
  char 			clt_is_recursive;	/* CLT_TOP_LEVEL ... CLT_REPLY_TEMP, see below */
  char 			clt_has_dfg_stat;	/* on returning, set this to indicate that there is a dfg stat inside the reply */
  int			clt_n_bytes; /* total bytes sent by rpc */
  it_cursor_t *		clt_save_itc;	/* allow reuse of itc between selects with same cond */
  int64			clt_n_affected;
  int64			clt_sample_total;
  dk_hash_t *		clt_col_stat;
  cl_thread_t *		clt_top_clt;	/* a recursive cm on a aq thread on non top coord must feed from a real clt, to which the recursive req is bound.  This is the clt for a temp clt 2nd rec clt */
  int 			clt_n_sample_rows;
  int 			clt_n_row_spec_matches;
  uint32		clt_start_time; /* approx time, use for keep alive */
  uint32 		clt_reply_req_no;	/* use for req no in reply if dissociated from the pending cm, as in recursive dfg */
  int 			clt_id;
} ;

/* clt_is_recursive */
#define CLT_TOP_LEVEL 0		/* an original clt running a top level request */
#define CLT_TOP_LEVEL_REC 1	/* a top level clt running a recursive cm */
#define CLT_2ND_REC 2		/* a clt that is not on a cluster server thread, temporary, running a recursive on top coord or aq thread */
#define CLT_REPLY_TEMP 3	/* not a clt, only used for sending dfg replies to coord */


/* Incremental reader state over a serialized data column (dc):
   position/counts, type bytes, and pointers to the null map and the
   data -- field meanings inferred from names, confirm against the dc
   serialization code. */
typedef struct dc_read_s
{
  int		dre_pos;
  int		dre_bytes;
  int		dre_n_values;
  dtp_t		dre_dtp;
  dtp_t		dre_type;
  dtp_t		dre_non_null;
  dtp_t		dre_any_null;
  db_buf_t	dre_nulls;
  db_buf_t	dre_data;
} dc_read_t;


/* Input box (clib): per target host/slice state of a cluster request
   group -- send-side bookkeeping, received batches and their parse
   state.  See cl_req_group_t for the owning group. */
typedef struct cll_in_box_s
{
  char			clib_is_active; /* results may still come */
  char			clib_waiting; /* true if this clib is part of current wait for multiple */
  char			clib_is_error;
  char			clib_enlist; /* set when transactional clo's are queued, will enlist the branch when sending */
  char			clib_res_type; /* when select, the cm_res_type of last CL_RESULT */
  char			clib_is_update; /* requires 2pc at end of txn because changed something */
  char			clib_is_ac_update; /* autocommitting upd/del */
  char			clib_ac_batch_end; /* if autocommit upd, indicates end of batch.  Must not continue until all are at batch end and the autocommit can be done  */
  char			clib_agg_qf_pending; /* if set, when freeing clib, send a cancel to remote to free unread  temps */
  char			clib_batch_as_reply; /* if set, when sending a batch, use the reply cm op.  This is when going from other to query coordinator in distr frag */
  char			clib_dfg_any; /* set if used for dfg send, means must forward cancellations */
  char			clib_fake_req_no; /* the req no is not registered and expects no answer, so do not rm the clib with the req no when freeing.  Happens with dfg's  */
  char			clib_is_update_replica;
  char 			clib_vectored;		/* a clo_row received will always have dc's */
  char 			clib_local_dfg_advanced;
  char 			clib_is_top_coord;	/* fake clib without a clrg for getting misc recursive calls on top level coordinator thread */
  char 			clib_is_local;
  slice_id_t 		clib_slice;
  int			clib_skip_target_set; /* if would ask for more, start the cl_more at clo with this row no or higher */
  int32			clib_row_low_water;  /*when less than this many and last cm is continuable, ask for more */
  int			clib_batches_requested; /* how many CL_MORE' s + 1 for the initial CL_BATCH */
  int			clib_batches_received; /* how many full batches recd */
  int			clib_batches_read;
  int			clib_rows_done; /* how many done from the batch?  Compare with low water mark */
  int			clib_n_local_rows; /* if local exec, no of rows in the result dcs */
  uint32		clib_req_no;
  uint32 		clib_base_req_no;	/* req no on top coord for top level cl invocation.  Speeds up finding a descendent clib when scheduling recursive cl op */
  int			clib_n_selects;
  int			clib_n_selects_received; /* if less recd than requested, send a close when freeing the clrg */
  uint32		clib_keep_alive; /* for long running, time of last keep alive from server */
  int64			clib_n_affected; /* upd/del changed row count returned here */
  int64 		clib_alt_trx_no;	/* differentiate from other clibs on the same host */
  cl_host_t *		clib_host;	/* the peer this box talks to */
  query_frag_t *	clib_last_serialized_qf; /* when finding a series of qf invokes remember the qf, even if not yet compiled on remote so as not to send multiple times in one batch */
  struct cl_req_group_s * clib_group;	/* owning request group */
  cl_queue_t 		clib_in;		/* raw strings from the remote */
  basket_t		clib_in_parsed; /* responses as cl_op_t's */
  int			clib_prev_read_to; /* debug to see prev msg */
  dk_session_t *	clib_req_strses;
  dk_session_t *	clib_out; /* the connect to the server.  If the req is sent in multiple messages, the same ses must be used to guarantee order */
  basket_t		clib_local_clo;
  int			clib_local_bytes; /* how many processed in local batch */
  int			clib_local_bytes_cum; /* metric.  How many bytes done local while remotes pending */
  int 			clib_n_dcs;
  int 			clib_part_n_rows;		/* during partitioning, this many rows to this clib */
  int64 		clib_dc_start;		/* offset in req strses for start of current dc during partitioning */
  mem_pool_t *		clib_local_pool;
  dk_set_t 		clib_vec_clos;
  cl_op_t *		clib_vec_clo;
  dc_read_t *		clib_dc_read;
  caddr_t *		clib_first_row;
  cl_op_t		clib_first;
  struct itc_cluster_s *clib_itcl;
  dk_session_t		clib_in_strses;
  scheduler_io_data_t	clib_in_siod;
} cll_in_box_t;

/* True iff there is unprocessed input for this clib: queued raw
   messages, parsed clo's, or unread bytes in the inline read session.
   NOTE: evaluates clib multiple times; pass a side-effect-free lvalue. */
#define clib_has_data(clib) \
  ( clq_count (&clib->clib_in) || clib->clib_in_parsed.bsk_count	\
   || (clib->clib_in_strses.dks_in_buffer && clib->clib_in_strses.dks_in_read < clib->clib_in_strses.dks_in_fill))

typedef struct cucurbit_s cucurbit_t;

#define CL_MAX_REPLICAS 3

/* Address the clib of a slice/replica pair: clrg_slice_clibs is a 2D
   array [slice][replica] flattened with clm_n_replicas as row length. */
#define CLRG_CSL_CLIB(clrg, csl_id, nth_host) \
  clrg->clrg_slice_clibs[(csl_id) * clrg->clrg_clm->clm_n_replicas + (nth_host)]


/* A group of related cluster requests (clrg): one or more clibs per
   target host/slice, plus send state, transaction context and error
   state for the whole group.
   NOTE(review): clrg_is_elastic and clrg_need_enlist are bitf_t but
   lack a :1 width amid bit fields -- looks unintentional; changing
   them alters struct layout, so confirm before touching. */
typedef struct cl_req_group_s
{
  int			clrg_ref_count;
  char			clrg_wait_all;
  char			clrg_sent; /* true after the first is sent */
  char			clrg_is_error;
  bitf_t 		clrg_is_elastic;
  bitf_t		clrg_keep_local_clo:1; /* when adding local ops, put them in clib_local_clo ? */
  bitf_t		clrg_all_sent:1;
  bitf_t		clrg_select_same:1;  /* all clo's are the same select with different params */
  bitf_t		clrg_is_dfg:1;
  bitf_t		clrg_is_dfg_rcv:1;
  bitf_t		clrg_need_enlist;
  bitf_t		clrg_no_txn:1; /* for clrg call, do not enlist */
  bitf_t		clrg_retriable:2;	/* CLRG_RETRY_SINGLY / CLRG_RETRY_ALL */
  bitf_t		clrg_best_effort:1;
  char			clrg_cm_control; /* send ops with this ored to cm_enlist, e.g. cm_control, cm_start_atomic */
  short 		clrg_dbg_qf;
  uint32		clrg_send_buffered; /* total waiting send in clibs */
  int			clrg_clo_seq_no;
  uint32		clrg_send_time;
  int32			clrg_timeout;
  uint32		clrg_dfg_req_no; /* if running a distr frag, use this req no and this host for cancel */
  int			clrg_dfg_host;
  dk_set_t 		clrg_clibs;
  cll_in_box_t **	clrg_slice_clibs;	/* indexed via CLRG_CSL_CLIB */
  cll_in_box_t **	clrg_host_clibs;	/* can be many clibs per host but only this one is used for sending */
  cluster_map_t *	clrg_clm;
  caddr_t *		clrg_inst;
  cl_message_t *	clrg_rec_continue;
  struct cl_req_group_s *clrg_next_waiting;
  query_frag_t *	clrg_local_qf;	/* if this clrg exes a local qf, some vec ssls thereof will be from the pool of the clrg.  Upon free, these must be reset to have no ref to dc in freed mem */
  update_node_t *	clrg_dml_node;	/* if used for 2nd keys of del/upd */
  dk_mutex_t 		clrg_mtx;
  lock_trx_t *		clrg_lt;
  int64			clrg_trx_no; /* in cases of terminate the lt may be free but must still send cancels with the right id */
  du_thread_t *	volatile clrg_waiting;
  caddr_t		clrg_error; /* sql error struct for connection errors */
  mem_pool_t *		clrg_pool; /* for params */
  struct itc_cluster_s *	clrg_itcl;
  int			clrg_wait_msec;
  /* use for standalone call clrg */
  int			clrg_nth_param_row; /* count calls on standalone clrg */
  int			clrg_nth_set; /* no of sets received. One set per param row */
  int			clrg_n_sets_requested; /* clrg_nth_param_row at the time of last send.  Do not send  more until this many rec'd */
  int			clrg_u_id;
  char			clrg_error_end; /* transaction error in txn daq.  No more results */
  caddr_t *		clrg_param_rows;
  dk_set_t		clrg_last;
  cucurbit_t *		clrg_cu;
  dk_set_t 		clrg_vec_clos;
} cl_req_group_t;

/* clrg_retriable values */
#define CLRG_RETRY_SINGLY 1 /* clibs can be retried individually */
#define CLRG_RETRY_ALL 2 /* if a retry, all clibs retry */

/* depth of the itcl debug trace ring; leave undefined to disable tracing */
#define ITCL_TRACE_SZ  10

/* Cursor-like state (itcl) for merging result streams of a cluster
   request group, optionally in key order (itcl_order/itcl_merge_order).
   NOTE(review): itc_in_order is the only member without the itcl_
   prefix -- likely a historical naming slip; renaming would break
   callers, so it is left as is. */
typedef struct itc_cluster_s
{
  bitf_t		itc_in_order:1;
  bitf_t		itcl_desc_order:1;
  bitf_t		itcl_return_pl:1;
  bitf_t 		itcl_is_dfg:1;
  unsigned short 	itcl_n_clibs;
  unsigned short 	itcl_merge_fill;
  int			itcl_nth_set;
  int 			itcl_batch_size;
  int 			itcl_n_results;
  dbe_column_t **	itcl_out_cols;
  dk_set_t		itcl_out_slots;
  query_frag_t *	itcl_dfg_qf; /* the qf if this is the control itcl of a distributed frag */
  clo_comp_t **		itcl_order;
  cl_req_group_t *	itcl_clrg;	/* the request group being merged */
  int *			itcl_merge_order;
  caddr_t *		itcl_qst;
  mem_pool_t *		itcl_pool;
  cl_op_t ***		itcl_param_rows;
  cll_in_box_t *	itcl_last_returned; /* when getting non-first  next, pop the top row off this clib first */
  dk_set_t		itcl_last; /* round robin pointer into the clrg_clibs */
  cll_in_box_t *	itcl_local_when_idle; /*when not ordered, run this local clib whenever no remote data */
#ifdef ITCL_TRACE_SZ
  uint32 		itcl_trace_ctr;	/* monotonically increasing index into the trace rings */
  int 			itcl_trace_n_res[ITCL_TRACE_SZ];
  short 		itcl_trace_line[ITCL_TRACE_SZ];
#endif
} itc_cluster_t;

#ifdef ITCL_TRACE_SZ
/* Record the current result count and source line in the itcl trace
   ring, GPF'ing if the count is out of 0..batch_size range.
   NOTE(review): multi-statement body in plain braces, not do..while(0);
   do not use as an unbraced if/else branch. */
#define ITCL_TRACE(itcl) \
  {itcl->itcl_trace_n_res[itcl->itcl_trace_ctr % ITCL_TRACE_SZ] = itcl->itcl_n_results; itcl->itcl_trace_line[itcl->itcl_trace_ctr++ % ITCL_TRACE_SZ] = __LINE__; if (itcl->itcl_n_results < 0 || itcl->itcl_n_results > itcl->itcl_batch_size) GPF_T1("itcl n result out of range"); }
#else
#define ITCL_TRACE(itcl)
#endif



/* True when a buffered batch should be sent: batch full, too many bytes
   buffered for send, pool too large, or a periodic anytime-timeout
   check fires.  Evaluates its arguments multiple times. */
#define ITCL_SHOULD_START(itcl, inst, clb) \
  ((QST_INT (inst, clb.clb_fill) == clb.clb_batch_size) || itcl->itcl_clrg->clrg_send_buffered > cl_send_high_water || itcl->itcl_pool->mp_bytes > 10000000 \
   || (0 == QST_INT (inst, clb.clb_fill) % 20  && ((query_instance_t*)inst)->qi_client->cli_anytime_timeout && qi_anytime_send_check (inst)))


/* True when an anytime (partial result) return is due for this itcl */
#define ITCL_ANYT_DUE(itcl, inst) \
  (itcl->itcl_n_results && ((QI*)inst)->qi_client->cli_anytime_started && itcl_anyt_due (itcl, inst))
int itcl_anyt_due (itc_cluster_t * itcl, caddr_t * inst);

/* itcs_wait_mode */
#define ITCS_WAIT_ALL 1 /* for merging of ordered result streams */
#define ITCS_WAIT_ANY 2 /* for unordered merge of unordered result streams */



/* State of one value being translated in a cucurbit: the original
   value, the result once available, and the references awaiting it. */
typedef struct value_state_s
{
  dk_set_t		vs_references;
  caddr_t		vs_org_value;
  caddr_t		vs_result;
  int			vs_n_steps;
  char			vs_is_value; /* is already translated? */
} value_state_t;



/* Result record returned by a cu_op_func_t dispatch function. */
typedef struct cu_return_s
{
  caddr_t	cur_value;
  caddr_t	cur_is_value;
  caddr_t	cur_step[10];
} cu_return_t;


/* C dispatch entry point of a cu_func_t (see cf_dispatch) */
typedef cu_return_t * (*cu_op_func_t) (cucurbit_t * cu, caddr_t arg, value_state_t * vs);


/* Descriptor of a function callable through a dpipe/cucurbit: name,
   mode flags and the bif / proc / C-dispatch entry points used to
   invoke it. */
typedef struct cu_func_s
{
  caddr_t	cf_name;
  char		cf_is_upd;
  char		cf_1_arg;
  bitf_t 	cf_is_vec:1;
  bitf_t 	cf_vec_checked:1;
  bitf_t	cf_single_action:1;
  caddr_t	cf_proc;
  cu_op_func_t	cf_dispatch;	/* C implementation, if any */
  dbe_key_t *	cf_part_key;
  caddr_t	cf_call_bif;
  caddr_t	cf_call_proc;
  sql_type_t *	cf_arg_sqt;
  caddr_t	cf_extra;
} cu_func_t;


/* One function column of a cucurbit: the function and a hash of its
   values -- presumably keyed for de-duplication, confirm. */
typedef struct cu_line_s
{
  cu_func_t *	cul_func;
  id_hash_t *	cul_values;
} cu_line_t;


/* Cucurbit (cu): state of a dpipe invocation -- input functions per
   column, rows in flight, completed rows in cu_ready, plus RDF bulk
   load specific state (cu_rdf_* / cu_ld_*). */
struct cucurbit_s
{
  char			cu_is_distinct;
  char			cu_is_ordered;
  char			cu_is_in_dp; /* the cu_funcs are owned by a dpipe node */
  char			cu_allow_redo;
  int			cu_nth_set; /* no of result row */
  dk_set_t		cu_lines;	/* list of cu_line_t, one per function column */
  int			cu_n_cols;
  char			cu_input_funcs_allocd;
  cu_func_t **		cu_input_funcs;
  cl_req_group_t *	cu_clrg;
  dk_hash_t * 		cu_seq_no_to_vs;	/* seq no -> value_state_t */
  caddr_t *		cu_rows;
  int *			cu_vec_set_nos; /* if vectored with sparse inputs, correlates input row no to col position for the result. */
  int			cu_fill;
  basket_t		cu_ready;	/* completed rows ready to return */
  caddr_t *		cu_qst; /* for duration of a dipipe_next call */
  void			(*cu_ready_cb) (cucurbit_t* cu, caddr_t * row);
  int			cu_n_redo; /* debug ctr */
  char			cu_rdf_load_mode;	/* RDF_LD_* values/flags, see below */
  caddr_t 		cu_rdf_last_g;
  dk_hash_t *		cu_key_dup;
  id_hash_t *		cu_ld_graphs;	/* distinct graphs */
  dk_set_t 		cu_ld_rows;		/* resolved rows of ids */
  caddr_t *		cu_cd;
};

/* limits of a dpipe, 32 vs 64 bit builds */
#if (SIZEOF_VOID_P == 4)
#define DPIPE_MAX_ROWS 0xfffff
#else
#define DPIPE_MAX_ROWS INT32_MAX
#endif

/* NOTE(review): this test uses SIZEOF_CHAR_P while the one above uses
   SIZEOF_VOID_P for the same 32/64 bit distinction -- confirm the build
   defines both macros, otherwise one branch is always taken. */
#if (SIZEOF_CHAR_P == 4)
#define DPIPE_MAX_LANES (4096 - 3)
#else
#define DPIPE_MAX_LANES (0xffff - 3)
#endif

/* values/flags of cu_rdf_load_mode (see cucurbit_s) -- confirm mapping */
#define RDF_LD_MULTIGRAPH 2
#define RDF_LD_DELETE 3
#define RDF_LD_MASK 3
#define RDF_LD_DEL_INS 4
#define RDF_LD_DEL_GS 8
#define RDF_LD_INS_GS 16

/* cucurbit mode flags -- cf. cu_is_ordered / cu_allow_redo, confirm */
#define CU_ALLOW_REDO 4
#define CU_NO_TXN 2
#define CU_ORDERED 1

/* Global state of this host's cluster listener: transaction maps, connected
   peers, pending query fragments and cluster topology.  Fields are serialized
   on cll_mtx or wi_txn_mtx as noted per field. */
typedef struct cl_listener_s
{
  dk_hash_64_t *	cll_id_to_trx; /* serialized on wi_txn_mtx */
  dk_hash_64_t *	cll_w_id_to_trx; /* serialized on wi_txn_mtx */
  dk_hash_64_t *	cll_dead_w_id; /* The w ids of txns transacted in the last few minutes. Don't do stuff or record things about these.  in txn mtx. */
  dk_hash_64_t *	cll_trx_thread;
  dk_hash_t *		cll_id_to_clib;
  dk_hash_64_t *	cll_rec_dfg;	/* For each recursive dfg, coord:req_no -> clt that feeds it.  Low bit of clt set if thread presently running */
  dk_mutex_t *		cll_mtx;
  dk_set_t		volatile cll_clients;
  dk_session_t *	cll_self; /* other threads on this host use this to signal the cluster listener thread */
  cl_host_t *		cll_local;
  volatile short *	cll_interface_threads; /* for each interface, how many threads writing */
  dk_session_t *	cll_listen;
  int32			cll_this_host;
  int32			cll_master_host;
  volatile uint32	cll_next_req_id;
  cl_host_t **		cll_master_group; /* masters in precedence order. */
  dk_set_t 		cll_cluster_maps;
  dk_hash_t *		cll_id_to_host;
  dk_hash_64_t *	cll_id_to_qf; /* query frags sent here by others */
  dk_hash_t *		cll_qf_hosts; /* hosts having each query frag sent by this */
  dk_hash_t *		cll_local_qf; /* local distr qfs by local id, for serving distr frags where remote asks for the qf */
  dk_hash_64_t * 	cll_replayed_w_ids;
  int64			cll_atomic_trx_id; /* if server in atomic mode, only this trx id is allowed */
  du_thread_t *		cll_atomic_owner;
  int32			cll_max_host; /* highest used host no */
  char			cll_no_ddl; /* during add of nodes, ddl is forbidden */
  char			cll_is_master; /* in master group, must have master functions inited and in sync for fallback */
  char			cll_is_flt; /* some logical clusters in multiple copies, fault tolerant.  Extra logging and 2pc consensus protocols on */
  char			cll_need_network_check; /* if one interface to peer works and the other not, should check  */
  char			cll_is_map_uncertain; /* between change of map and commit of same */
  uint32		cll_synced_time; /* msec time of receiving sync confirmation for flt rejoin */
  uint32		cll_no_disable_of_unavailable;
} cl_listener_t;

typedef int (*cl_ready_t) (dk_session_t * ses);

/* Per-connection state of a cluster peer session, hung off the dk_session
   via DKS_CL_DATA ().  Tracks login/connection status, incremental message
   read and assembly state, and links to the cl_thread being served. */
typedef struct cl_cli_s
{
  cl_ready_t 		clses_ready;	/* ready callback, see cl_ready_t */
  cl_host_t *		clses_host;	/* peer host of this session */
  cl_interface_t *	clses_interface;
  caddr_t *		clses_stat_inst;
  int 			clses_seq_no;
  int 			clses_remove_line;
  char			clses_status;	/* CLSES_NO_INIT / CLSES_OK / CLSES_DISCONNECTED */
  char			clses_is_log_sync;
  char 			clses_reading_req_clo;	/* if reading a dc (dv data) do not allocate it, return only a place marker, will be read later into the right mp */
  char			clses_held_for_write; /* a thread has checked this out and uses this to write a seq message to the host */
  cl_thread_t *		clses_clt; /* when serving req from this client, ref back to trx and other data for blob read */
  cl_thread_t *		clses_reply_ses_of;	/* ref back to clt if this ses is the reply ses of it, in case of reconnect */
  cl_message_t *	clses_head_cm;	/* if multiple cms are read before scheduling due to dfg follows flag, this is the first, the following ones are added to cm_extra of this */
  cl_message_t *	clses_reading_cm;	/* presumably the cm being assembled from the wire — confirm */
  char 			clses_read_state;
  int 			clses_bytes_needed;
  int 			clses_bytes_received;
  int 			clses_in_fill;
  int 			clses_in_size;
  int 			clses_in_parsed;
  int 			clses_n_comp_entries;
  int64 		clses_conn_id;
  msg_ctr_t *		clses_mctr;	/* message counters, set via clses_set_mctr () */
  int 			clses_last_wrtn;
  int 			clses_first_cm;		/* in out buffer start of 1st cm, -1 if none starts in out buffer */
} cl_ses_t;

#define CLSES_NO_INIT 0 /* not logged in */
#define CLSES_OK 1 /* operational */
#define CLSES_DISCONNECTED 2 /* a read has failed.  Not yet free because a thread may hold this for write */


#define DKS_CL_DATA(ses)  (*((cl_ses_t **)&(ses)->dks_cluster_data))
#define DKS_QI_DATA(ses)  (*((query_instance_t **)&(ses)->dks_object_temp))

/* Self-message posted to the cluster listener thread (see cl_self_signal ()):
   the listener invokes cls_func (cls_cd). */
typedef struct cl_self_message_s
{
  void (*cls_func) (void* arg);	/* function to run on the listener thread */
  void * 	cls_cd;		/* opaque argument passed to cls_func */
} cl_self_message_t;


/* Top level messages */
#define CL_INIT  0 /* client host id, pwd, version */
#define CL_BATCH  1
#define CL_RESULT 2
#define CL_MORE 3 /* batch id, sends more replies */
#define CL_ROLLBACK 4
#define CL_COMMIT 5
#define CL_PREPARE  6
#define CL_CANCEL  7
#define CL_WAIT_STATE 8  /* local tx waits for global deadlock detection */
#define CL_DISCONNECT 9 /* send for orderly shut of connection, else disconnect is considered host failure. */
#define CL_STATUS 10
#define CL_SELF_INIT 11
#define CL_1PC_COMMIT 12 /* a distr. commit with 1 changed remote */
#define CL_PING 13
#define CL_QF_PREPARED 14
#define CL_QF_FREE 15
#define CL_TRANSACTED 16  /* notification of completed commit /rb for the monitor */
#define CL_1PC_EXPEDITE 17 /* when killed during 1pc reply wait, use this message to make sure the 1pc does not hang.  */
#define CL_SEQ 18
#define CL_GET_CONFIG 19
#define CL_RECOV 20 /*during 2pc recov cycle,  did cm_trx commit? */
#define CL_BLOB 21
#define CL_BLOB_RESULT 22
#define CL_WAIT_QUERY 23 /* request to send all local wait edges to master */
#define CL_WAIT_QUERY_REQ 24
#define CL_GET_QF 25 /* when running distr frag and having no qf, this gets the query_t from the coordinator */
#define CL_COMMIT_MON 26 /* combines final commit and notify to monitor */
#define CL_KEEP_ALIVE 27 /* inform that the following req nos and lt w ids are pending */
#define CL_ATOMIC 28  /* signal or ask if a host is in atomic state */
#define CL_TXN_QUERY 29  /* Internal, to schedule async exec of final commit check for lt's refd in a keep alive.  Will send cl commits and rbs to the concerned */
#define CL_SEQ_REPL 31 /* message replicate seq ranges to standby master */
#define CL_ROLLBACK_SYNC 32 /* internal, indicate that a reply is expected for the rollback */
#define CL_DISCONNECT_QUERY 33  /* request that the master check availability of node and eventually disable it */
#define CL_ADMIN 34 /* internal admin action like resync or disable of failed */
#define CL_SET_BLOOM 35 /* shipping bloom filter for partitioned hash table */

   /* enlist flag for CL_ATOMIC */
#define  CL_AC_SYNC 3
#define  CL_COL_AC_SYNC 4


/* or'ed to clo_seq_no of batch or set end to indicate db_activity_t for used resources follows */
#define CL_DA_FOLLOWS 0x80000000

/* Opcodes in batches */

#define CLO_NONE 0 /* in a static clo, this means there is no data.  Must be 0 */
#define CLO_SELECT  22
#define CLO_INSERT 1
#define CLO_INSERT_SOFT 2
#define CLO_INSERT_REPL 3
#define CLO_DELETE 4
#define CLO_DELETE_PLACE 5
#define CLO_INXOP 6
#define CLO_CALL 9
#define CLO_BLOB  10
#define CLO_ROW 11 /* result set row */
#define CLO_BATCH_END 12 /* ask for more to get next batch of results */
#define CLO_SET_END 13 /* end of result set for present cl op */
#define CLO_DDL 14  /* shared schema info changed */
#define CLO_ERROR 15
#define CLO_ITCL 16 /* not a message.  A container for a local itc_cluster_t */
#define CLO_SELECT_SAME 18 /* like first CLO_SELECT, except for params.  So only params are given.  */
#define CLO_NON_UNQ 20 /* reply to non unq insert soft */
#define CLO_QF_PREPARE 21  /* sending a compilation frag */
#define CLO_QF_EXEC 23 /* exec a query frag */
#define CLO_AGG_SET_END 24 /* ret val when aggregating qf done, means that the qf must be left registered even though this is not an end of batch */
#define CLO_AGG_END 25 /* end of a full batch of aggregate inputs.  Send when the whole batch is processed, no set-by-set results */
#define CLO_STATUS 26
#define CLO_DFG_STATE 28 /* record with counts of processed and forwarded sets in a distributed frag */
#define CLO_STN_IN 29 /* input string and state for a dist frag */
#define CLO_DFG_ARRAY 30
#define CLO_DFG_AGG 31  /* request for simple aggregate result from dist frag */
#define CLO_CONTROL 32

/* action codes for ddl messages */
#define CLO_DDL_TABLE 1
#define CLO_DDL_PROC 2
#define CLO_DDL_CLUSTER 3
#define CLO_DDL_TYPE 4
#define CLO_DDL_TRIG 5
#define CLO_DDL_ATOMIC 6
#define CLO_DDL_ATOMIC_OVER 7

/* flags as 1st col of result row of clo_call */
#define CLO_CALL_ROW 1
#define CLO_CALL_RESULT 2
#define CLO_CALL_ERROR 3


/* select.is_text_seek */
#define CLO_TEXT_NONE 0 /* not a text inx op */
#define CLO_TEXT_INIT 1
#define CLO_TEXT_SEEK 2
#define CLO_TEXT_SEEK_NEXT 3
#define CLO_TEXT_SEEK_UNQ 4
#define CLO_TEXT_SAMPLE 5 /* not a text lookup at all but a stats sample */

/* CLO_CONTROL opcodes */
#define CLO_CTL_OFFLINES 1 /* notify list of offline hosts */
#define CLO_CTL_SEQ_REPL 2 /* replicate seq ranges between master and spare */
#define CLO_CTL_ONLINE 3 /* send to master, expect list of offlines as an async reply */


/* Under MTX_METER, stamp f with the mutex's enter count in the low half and
   the caller-supplied flag fl in the high 32 bits.  Expands to nothing when
   metering is off. */
#ifdef MTX_METER
#define MTX_TS_SET_2(f, m, fl) \
  f = m->mtx_enters | (((int64)(fl)) << 32)
#else
#define MTX_TS_SET_2(f, m, fl)
#endif


/* One cluster message (cm): opcode, routing and transaction header plus the
   state used while the message is assembled, queued and dispatched.
   (De)serialized by cl_serialize_cl_message_t / cl_deserialize_cl_message_t. */
struct cl_message_s
{
  char			cm_op;	/* CL_* top level message opcode */
  char			cm_is_error;
  slice_id_t 		cm_slice;		/* slice and req no in 1st word of cm */
  uint32 		cm_seq_no;
  uint32 		cm_req_no;
  uint32 		cm_dfg_agg_req_no;	/* 2nd req no for dfg agg reply when first req no is set to a dfg req no */
  int 			cm_n_comp_entries;
  char			cm_enlist; /* the trx id is to be handled with 2pc.  The lt is for use with this id only.  CM_* flags, see below */
  char 			cm_req_flags;	/* CMR_* flags, see below */
  char			cm_cancelled; /* must read the incoming stuff from cm_client but must not do anything else */
  char			cm_registered; /*if suspended waiting for CL_MORE */
  char			cm_res_type; /* whether more is coming from same server, CM_RES_* */
  dtp_t 		cm_dfg_stage;
  unsigned short 	cm_from_host;	/* sender host no */
  unsigned short 	cm_to_host;	/* recipient host no */
  unsigned short 	cm_dfg_coord;	/* coordinator host of the dfg, if any */
  char 			cm_ts[DT_LENGTH];	/* timestamp in dt format */
  int64			cm_bytes;	/* serialized length; cm_in_read_to is compared against this */
  int64 		cm_uncomp_bytes;	/* if compressed, uncompressed length */
  int64			cm_trx;
  int64			cm_trx_w_id;
  int64 		cm_main_trx;		/* a rc read of a can have many threads on a server and all but the first will have an alt trx no since only one thread per trx no is allowed. If low on threads, can also queue on the thread of cm_trx */
  dk_session_t *	cm_strses;
  cl_call_stack_t *	cm_cl_stack;
  caddr_t		cm_in_string; /* str box containing message if message was read by dispatch thread */
  dk_set_t 		cm_in_list;
  int			cm_in_read_to; /* when suspended between select batches, pointer to next clo in in string */
  int			cm_anytime_quota;	/* anytime timeout budget, see cm_set_quota () */
  cl_op_t *		cm_pending_clo; /* Holds select clo when suspended between batches */
  dk_session_t *	cm_client; /* the client detached from served set, worker thread must read the stuff and return this to served set */
  lock_trx_t *		cm_lt; /* While registered, remember the lt so that continuable qi's and itc's keep the lt across transacts */
  cl_thread_t *		cm_clt;		/* for dbg, if reg'd cm running on a clt */
  cl_message_t **	cm_extra_cm;	/* when many consec cms together because of dfg follows, this is the array of non first cms */
  int cm_n_extra;	/* count of entries in cm_extra_cm */

#ifdef MTX_METER
  uint64 		cm_id;	/* unique id for tracing */
  char 			cm_in_dfg_cont;
#endif
};

#define cm_n_affected cm_trx_w_id  /* dual use.  n_affected on CL_RESULT */

/* cm_enlist */
#define CM_ENLIST 1
#define CM_LOG 2
#define CM_COMMIT_RESEND 4
#define CM_ENTER_ALWAYS 8 /* for ops that have to go through despite cpt or atomic, like seq range replication */
#define CM_CONTROL 16 /* if set, dispatch the op even if recipient not fully online */
#define CM_START_ATOMIC 32 /* at transaction enter, set the host to atomic if not yet, proceed then to rb */
#define CM_DIRECT_IO 64 /* the connection will continue with other protocol, the server func will read and write directly to this connection */

/* cm_req_flags */
#define CMR_RC_PARALLEL 1	/* read committed on a slice, can have many per txn */
#define CMR_DFG_FOLLOWS 2
#define CMR_DFG_PARALLEL  4
#define CMR_DFG_CLOSE 8		/* final cancel message for a value dfg, means that the closing dfg does not need to forward the close, the coordinator will close all */
#define CMR_DFG 16		/* This is a dfg, special treatment if recursive, no more than 1 thread per dfg */
#define CMR_MARKED_REC 32
#define CMR_FWD_NO_STACK_TOP 64


#define CL_REC_RUNNING 1	/* or'ed to entry in cll_rec_dfg to indicate a thread presently executing for the rec batch */
#define CL_REC_CANCEL 2		/* or'ed to entry in cll_rec_dfg to indicate cancellation of running recursive batch */

/* cm_res_type */
#define CM_NO_RES 0
#define CM_RES_INTERMEDIATE 1
#define CM_RES_FINAL 2
#define CM_RES_CONTINUABLE 3
#define CM_RES_CANCELLED 4 /* means that the clib is out of the waiting set and can't get results or even timeout */


/* api */





#define CLUSTER_PWD "clu"

#define CL_NO_THREADS "CLNTH"
/* values of clib_error and return codes */
#define CLE_OK 0
#define CLE_DISCONNECT 1 /* remote party dead */
#define CLE_SQL 2 /* misc sql error, later in the message */
#define CLE_TRX 3


void clrg_dml_free (cl_req_group_t * clrg);
int  clrg_destroy (cl_req_group_t * clrg);
cl_req_group_t * clrg_copy (cl_req_group_t * clrg);
void clrg_set_lt (cl_req_group_t * clrg, lock_trx_t * lt);
int clrg_wait (cl_req_group_t * clrg, int mode, caddr_t * qst);
#define CLRG_WAIT_ANY 0
#define CLRG_WAIT_ALL 1
int clrg_wait_1 (cl_req_group_t * clrg, int wait_all);
cl_op_t * clo_allocate (char op);
cl_op_t * clo_allocate_2 (char op);
cl_op_t * clo_allocate_3 (char op);
cl_op_t * clo_allocate_4 (char op);
void clo_local_copy (cl_op_t * clo, cl_req_group_t * clrg);
int clo_destroy  (cl_op_t * clop);
cl_op_t * clib_first (cll_in_box_t * clib);
cl_message_t * cl_deserialize_cl_message_t (dk_session_t * in);
cl_host_t * cl_name_to_host (char * name);
itc_cluster_t * itcl_allocate (lock_trx_t * lt, caddr_t * qst);

extern int32 cl_req_batch_size;
extern int32 cl_dfg_batch_bytes;
extern uint32 cl_send_high_water;
extern int32 cl_batches_per_rpc; /* no of rows to send before stopping to wait for a CL_MORE message */
extern int32  cl_res_buffer_bytes; /* no of bytes before sending to client */
extern long dbf_branch_transact_wait;
extern int32 cl_wait_query_delay;

int clrg_qf_send (cl_req_group_t * clrg);
int  clrg_send (cl_req_group_t * clrg);

void clrg_send_slices (cl_req_group_t * clrg);
int clo_serialize (cll_in_box_t * clib, cl_op_t * clo);
void cl_table_source_input (table_source_t * ts, caddr_t * inst, caddr_t * state);
void cl_insert_node_input (insert_node_t * ins, caddr_t * inst, caddr_t * state);

extern cl_listener_t local_cll;
extern resource_t * cl_strses_rc;
extern int64 cl_cum_messages;
extern int64 cl_cum_txn_messages;
extern int64 cl_cum_bytes;
extern int64 cl_cum_wait;
extern int64 cl_cum_wait_msec;
cl_op_t * cl_deserialize_cl_op_t  (dk_session_t * in);
cl_req_group_t * cl_req_group (lock_trx_t * lt);
int clrg_result_array (cl_req_group_t * clrg, cl_op_t ** res, int * fill_ret, int max, caddr_t * qst);

/*
resource_t * cl_str_1;

resource_t * cl_str_2;
resource_t * cl_str_3;
*/
void cl_msg_string_free (caddr_t str);

void cm_free (cl_message_t * cm);
int cl_process_message (dk_session_t * ses, cl_message_t * cm);
void cl_self_cm_srv (void* cmv);
void clt_process_cm (cl_thread_t * clt, cl_message_t * cm);
void cm_send_reply (cl_message_t * cm, cl_op_t * reply, caddr_t err);
extern du_thread_t * cl_listener_thr;
void  cls_rollback (dk_session_t * ses, cl_message_t * cm);
cl_host_t * cl_id_to_host (int id);
cluster_map_t * cl_name_to_clm (char * name);
void cl_itc_free (it_cursor_t * itc);
void itc_cl_row (it_cursor_t * itc, buffer_desc_t * buf);
dk_session_t * ch_get_connection (cl_host_t * ch, int op, caddr_t * err_ret);
int  itc_insert_rd (it_cursor_t * itc, row_delta_t * rd, buffer_desc_t ** unq_buf);
int itc_rd_cluster_blobs (it_cursor_t * itc, row_delta_t * rd, mem_pool_t * ins_mp);


/**add vec */
void ts_ensure_fs_part (table_source_t * ts);
void clrg_call_flush_if_due (cl_req_group_t * clrg, query_instance_t * qi, int anyway);
void chash_cl_init ();
caddr_t daq_call_1 (cl_req_group_t * clrg, dbe_key_t * key, caddr_t fn, caddr_t * vec, int flags, int * first_seq_ret, caddr_t * host_nos);
extern dk_mutex_t cl_chash_mtx;
extern dk_hash_t cl_id_to_chash;
extern int enable_itc_dfg_ck;


void ks_set_dfg_queue_f (key_source_t * ks, caddr_t * inst, it_cursor_t * itc);
void rb_dfg_flags (rbuf_t * rb, void *elt);
void cl_send_from_cll (cl_host_t * ch, cl_message_t * cm, int always_queue);
rbuf_t *qi_slice_queue (caddr_t * slice_inst, stage_node_t * stn);
extern resource_t *cll_rbuf_rc;
extern int64 cll_entered;
extern int64 cll_lines[1000];
extern int cll_counts[1000];
extern dk_hash_t cl_waiting_clrgs;
extern dk_mutex_t clrg_wait_mtx;


void cl_cancel_waiting (int64 cancel_w_id, int host, int req_no);
void clbing2 ();
extern dk_mutex_t *cl_reply_mtx;
extern semaphore_t *cl_reply_sem;
extern basket_t cl_reply_queue;
void cl_reply_init ();
int cm_may_compress (cl_message_t * cm, cl_host_t * ch);
void strses_compressed_write_out (dk_session_t * strses, dk_session_t * out);
void cm_uncompress (cl_message_t * cm);
void cl_uncompress_in_string (db_buf_t ** str_ret, int64 non_comp_bytes, dk_set_t more, int n_comp);
int cl_read_frag_c (dk_session_t * ses);

#define CM_OFFBAND 1
#define CM_BLOCK 2
#define CM_COMPLETE 3
#define CM_DISCONNECT 4


extern int enable_dfg_follows;
int clm_has_slice (cluster_map_t * clm, cl_host_t * ch, slice_id_t slid);
cl_message_t * clib_dfg_cm (cll_in_box_t * clib, int is_reply, int is_first_stn, stage_node_t * stn);
int clo_frag_is_empty (cl_op_t * clo);
int clib_frag_is_empty (cll_in_box_t * clib);
int clrg_dfg_send_g (cl_req_group_t * clrg, int coord_host, int64 * bytes_ret, int is_first_stn, stage_node_t * stn);

extern timeout_t boot_time;
void clses_set_mctr (cl_ses_t * clses, msg_ctr_t * mctr);
int tcpses_write (session_t * ses, char *buf, int n_out);
extern int enable_clrel;
void clrel_send_ses (dk_session_t ** ses_ret, dk_session_t * ses, cl_message_t * cm, int flush);
int clrel_buf_full (session_t * ses, char *bytes, int n_bytes);
int clrel_flush (session_t * ses, char *buffer, int n_bytes);
void clrel_mark_cm_start (dk_session_t * dks, cl_message_t * cm);
extern resource_t *cl_buf_rc;
msg_ctr_t *mctr_by_id (uint64 id);
void mctr_init ();

cll_in_box_t *clrg_ensure_single_clib (cl_req_group_t * clrg);
void cm_handle_rec_cancel (cl_thread_t * queue_clt, cl_queue_t * bsk, cl_message_t * cm, int in_cll, int line);
void cls_vec_del_rd_layout (row_delta_t * rd);
extern int c_cl_no_unix_domain;

//#define AGG_TRACE
#ifdef AGG_TRACE
void cl_agg_trace_f (ptrlong req_no, int stage);
#define cl_agg_trace(n) cl_agg_trace_f (n, __LINE__)
#else
#define cl_agg_trace(r)
#endif


void cl_clear_dup_cancel ();
cl_thread_t *cli_claq_clt (client_connection_t * cli);
cl_slice_t *clm_id_to_slice (cluster_map_t * clm, slice_id_t slid);
void cl_init_dae_key ();
void cl_dae_blobs (query_instance_t * qi, state_slot_t ** ssls);
void cl_ses_set_options (session_t * ses);
int clm_is_colocated (cluster_map_t * clm1, cluster_map_t * clm2);
slice_id_t clm_slice_in_host (cluster_map_t * clm, int ch);
extern uint32 cl_last_ac_sync;
extern int32 cl_ac_interval;
void qi_free_dfg_queue (query_instance_t * qi, query_t * qr);
int clo_any_for_slice (cl_op_t * clo);
void clo_row_dc_reset (cl_op_t * clo);
int cl_qi_kill (cl_thread_t * clt, query_instance_t * qi);
/* 64-bit id of a dfg: coordinator host no in the high 32 bits, req no in the
   low 32.  NOTE(review): a negative signed req_no would sign-extend over the
   coord bits; callers appear to pass uint32 req nos — confirm. */
#define DFG_ID(coord, req_no)  ((((int64)coord) << 32) | req_no)

extern int enable_rec_dfg_print;
#define rdfg_printf(a) {if (enable_rec_dfg_print) printf a; }
void clib_send_qf (cll_in_box_t * clib, cl_op_t * clo);
int cm_is_running_rec_dfg (cl_message_t * cm, cl_thread_t * clt);
int cm_rec_dfg_done (int coord, uint32 req_no, cl_thread_t * clt, char *file, int line);
int cm_dfg_coord (cl_message_t * cm);

#define QR_MAX_REFS  10000
/* max no of slices per host times max no of concurrent instances per slice */

void qf_check_batch_sz (query_frag_t * qf, caddr_t * inst);
int dfg_fetch_qr (uint64 qf_id, query_t ** qr_ret, cl_thread_t * clt);
int dfg_fetch_qr_local (uint64 qf_id, query_t ** qr_ret, cl_thread_t * clt);
void qf_assign_id (query_frag_t * qf);
stage_node_t **stn_array (dk_set_t nodes, int n_stages);
client_connection_t *cl_cli ();
void clib_add_local_error (cll_in_box_t * clib, caddr_t err);
void basket_delete (basket_t * head, basket_t ** elt_ret);
void da_add_enlist (db_activity_t * da, int host, int change);
void cli_receive_da_enlist (client_connection_t * cli, db_activity_t * da);
void lt_ensure_branch (lock_trx_t * lt, int host, int change);
int clst_is_sib_or_desc (cl_call_stack_t * inner, cl_call_stack_t * outer);
void clrg_top_check (cl_req_group_t * clrg, query_instance_t * top_qi);
void cli_cl_push (client_connection_t * cli, int host, int req_no);
void cli_cl_pop (client_connection_t * cli);
void cli_free_stack (client_connection_t * cli);
void cl_serialize_st_cols (dk_session_t * ses, dk_hash_t * cols);
caddr_t clo_detach_error (cl_op_t * clo);
int qi_n_cl_aq_threads (query_instance_t * qi);
int clo_frag_n_sets (cl_op_t * clo);
void cl_fref_local_result (query_instance_t * qi, query_frag_t * qf, state_slot_t * slice_qis, int is_final);
int dfg_is_slice_continuable (stage_node_t * stn, query_instance_t * slice_qi);
int dfg_feed (stage_node_t * stn, caddr_t * inst, cl_queue_t * bsk);
void dfg_after_feed ();
void clib_dfg_coord_req (cll_in_box_t * clib);
/* Asserts that the calling thread holds the cluster listener mutex.
   NOTE(review): the expansion ends in a semicolon, so `ASSERT_IN_CLL;`
   produces an extra empty statement and the macro cannot be used as the
   body of an unbraced if/else. */
#define ASSERT_IN_CLL \
  ASSERT_IN_MTX (local_cll.cll_mtx);

void cl_dfg_run_local (stage_node_t * stn, caddr_t * inst);
caddr_t *stn_add_slice_inst (state_slot_t * slice_qis, query_frag_t * qf, caddr_t * inst, int coordinator, slice_id_t slice,
    int is_in_cll);
caddr_t *stn_find_slice (state_slot_t * slice_qis, caddr_t * inst, slice_id_t slice);
void qf_new_results (query_frag_t * qf, caddr_t * inst, itc_cluster_t * itcl);
void qf_out_sets (query_frag_t * qf, caddr_t * inst);
void clrg_cancel (cl_req_group_t * clrg);

/* Set the anytime quota on cm when the request group's transaction belongs
   to a client with an anytime timeout (see cm_set_quota () below).
   Wrapped in do { } while (0); so the macro expands to one complete
   statement: the original bare `if` silently captured a following `else`
   at the call site.  Arguments are parenthesized against operator
   precedence surprises. */
#define CM_SET_QUOTA(clrg, cm, time)					      \
  do { \
    if ((clrg)->clrg_lt && (clrg)->clrg_lt->lt_client->cli_anytime_timeout) \
      cm_set_quota ((clrg), (cm), (time)); \
  } while (0);

void cm_set_quota (cl_req_group_t * clrg, cl_message_t * cm, int time);
table_source_t *qn_loc_ts (data_source_t * qn, int must_have);
extern dbe_key_t *cl_blob_dae_key;
void cli_set_slice (client_connection_t * cli, cluster_map_t * clm, slice_id_t slice, caddr_t * err_ret);
dbe_storage_t *dbs_open_slices (char *name);
cl_host_group_t *clm_find_chg (cluster_map_t * clm, int ch_id);
void clrg_target_clm (cl_req_group_t * clrg, cluster_map_t * clm);
query_t *cl_ins_del_qr (dbe_key_t * key, int op, int ins_mode, caddr_t * err_ret);
void clrg_daq_dml_send (cl_req_group_t * clrg);
cll_in_box_t *clrg_csl_clib (cl_req_group_t * clrg, cl_host_t * host, cl_slice_t * slice);
void cls_vec_insert (cl_thread_t * clt, cl_op_t * clo, int is_local);
void cls_vec_delete (cl_thread_t * clt, cl_op_t * clo, int is_local);

int clt_is_update_replica (cl_thread_t * clt);
void ik_ins_del_partition (cl_req_group_t * clrg, ins_key_t * ik, int op, caddr_t * inst);
cl_req_group_t *dml_clrg (delete_node_t * del, caddr_t * inst);
int cl_dml_send (cl_req_group_t * clrg);

void itc_local_partition_param_sort (key_source_t * ks, it_cursor_t * itc, ins_key_t * ik, slice_id_t slid, caddr_t * part_inst,
    int n_sets);
int clt_param_dcs (mem_pool_t * mp, caddr_t * params, char is_vectored_proc_call);
void cl_row_append_out_cols (itc_cluster_t * itcl, caddr_t * inst, cl_op_t * clo);
void clib_vec_read_into_clo (cll_in_box_t * clib);
void clib_vec_read_into_slots (cll_in_box_t * clib, caddr_t * inst, dk_set_t slots);
int qf_param_dcs (query_t * qr, caddr_t * inst, mem_pool_t * mp, caddr_t * params);
void dc_serialize (data_col_t * dc, dk_session_t * ses);
void sslr_dc_serialize (dk_session_t * ses, caddr_t * inst, state_slot_t * sslr, int n_rows, int dbg_qfs, int dbg_col);
int dc_serialize_sliced (dk_session_t * ses, data_col_t * dc, slice_id_t * slices, slice_id_t target_slice);

data_col_t *dc_deserialize (dk_session_t * ses, dtp_t dtp);

void ks_vec_partition (key_source_t * ks, itc_cluster_t * itcl, data_source_t * qn, cl_op_t * clo);
void cl_vec_insert (insert_node_t * ins, caddr_t * inst);
void cl_ins_sync (insert_node_t * ins, caddr_t * inst);
int cl_ins_set_local_mask (caddr_t * inst, ssl_index_t set_mask_slot, ins_key_t * ik, int n_sets, slice_id_t slid,
    caddr_t * part_inst);

/* Iterate csl over the slices this host carries in cluster map clm
   (clm_local_chg->chg_hosted_slices).  Opens scopes that MUST be closed
   with the matching END_DO_LOCAL_CSL below; the unbalanced braces are
   intentional. */
#define DO_LOCAL_CSL(csl, clm)						\
  { int __ci; 								\
  if (clm->clm_local_chg) { \
    DO_BOX (cl_slice_t *, csl, __ci, clm->clm_local_chg->chg_hosted_slices)

/* Closes the scopes opened by DO_LOCAL_CSL. */
#define END_DO_LOCAL_CSL \
  END_DO_BOX; } }


int clrg_add_transact_1 (cl_req_group_t * clrg, cl_host_t * host, int op, caddr_t args);
#define clrg_add_transact(c,h,o) clrg_add_transact_1 (c, h, o, NULL)
int clrg_add (cl_req_group_t * clrg, cl_host_t * host, cl_op_t * clop);
int clrg_add_slice (cl_req_group_t * clrg, cl_host_t * host, cl_op_t * clop, slice_id_t slid);
void cl_serialize_cl_message_t (dk_session_t * out, cl_message_t * cl);
void cl_serialize_cl_op_t (dk_session_t * ses, cl_op_t * clo);
void  cl_serialize_db_activity_t (dk_session_t * out, db_activity_t * s);
void cl_deserialize_db_activity_t (dk_session_t * in, caddr_t * inst);

void clib_parse (cll_in_box_t * clib);
void cluster_init ();
void cluster_schema ();
void cluster_built_in_schema ();
void cluster_listen ();
void cluster_online ();
void cluster_after_online ();

void cls_transact (cl_thread_t * clt, cl_message_t * cm);
int64 read_boxint (dk_session_t * ses);
void clib_local_next (cll_in_box_t * clib);
int rd_is_blob (row_delta_t * rd, int nth);
void  cl_check_in (dk_session_t * ses);
void cl_check_out (dk_session_t * ses);
void cl_self_signal (void (*f)(void* _cd), void* cd);

#ifdef MTX_METER
#define TRY_CLL cll_try_enter ()
int cll_try_enter ();
#define IN_CLL {mutex_enter (local_cll.cll_mtx); cll_entered = rdtsc ();}
#define LEAVE_CLL { cll_counts[__LINE__ % 1000] ++; cll_lines[__LINE__ % 1000] += rdtsc () - cll_entered;  mutex_leave (local_cll.cll_mtx);}
#else
#define TRY_CLL mutex_try_enter (local_cll.cll_mtx)
#define IN_CLL mutex_enter (local_cll.cll_mtx)
#define LEAVE_CLL mutex_leave (local_cll.cll_mtx)
#endif

#define IN_CLL_SR \
  {if (!cll_stay_in_cll_mtx) IN_CLL;}
#define LEAVE_CLL_SR \
  {if (!cll_stay_in_cll_mtx) LEAVE_CLL;}


void cl_write_done (dk_session_t * ses);
caddr_t clrg_error_ck (cl_req_group_t * clrg, int rc, int signal);
void itcl_error_ck (itc_cluster_t * itcl, int rc);
int cl_transact (lock_trx_t * lt, int op, int trx_free);
void cl_rollback_local_cms (lock_trx_t * lt);

/* Write the common head of a batch opcode: the op byte, the sequence no and
   the index of the parameter row.  Converted from a bare brace block to
   do { } while (0) so each macro is a single statement and is safe as the
   body of an unbraced if/else (call sites terminate with ';', cf.
   CLO_HEAD_LAST_STAT below). */
#define CLO_HEAD(ses, op, seq, nth_param_row)			\
  do { \
    session_buffered_write_char (op, ses); \
    print_int (seq, ses); \
    print_int (nth_param_row, ses); \
  } while (0)

void cl_send_db_activity (dk_session_t * ses);

/* Like CLO_HEAD but flags the seq no with CL_DA_FOLLOWS and appends the
   db_activity_t of the request. */
#define CLO_HEAD_STAT(ses, op, seq, nth_param_row)			\
  do { \
    session_buffered_write_char (op, ses); \
    print_int ((uint32)(CL_DA_FOLLOWS | seq), ses);	\
    print_int (nth_param_row, ses); \
    cl_send_db_activity (ses); \
  } while (0)

/* Write the head with activity stats if this is the last clo of the current
   cm (read position at end of message), else a plain head. */
#define CLO_HEAD_LAST_STAT(clt, ses, op, seq, nth) \
  do { \
    if (clt->clt_current_cm->cm_in_read_to == clt->clt_current_cm->cm_bytes) \
      CLO_HEAD_STAT (ses, op, seq, nth); \
    else \
      CLO_HEAD (ses, op, seq, nth); \
  } while (0)


caddr_t cl_ddl (query_instance_t * qi, lock_trx_t * lt, caddr_t name, int type, caddr_t trig_table);
caddr_t cl_start_atomic (query_instance_t * qi, caddr_t name, int type);
void cl_local_atomic_over ();
caddr_t cl_read_partition (query_instance_t * qi, caddr_t tb_name);
caddr_t cl_read_cluster (query_instance_t * qi, caddr_t name, int create);
void  clib_more (cll_in_box_t * clib);
cl_op_t * mp_clo_allocate (mem_pool_t * mp, char op);

#ifdef noMTX_DEBUG
#define cl_printf(a) printf a
#else
#define cl_printf(a)
#endif
void  clib_read_next (cll_in_box_t * clib, caddr_t * inst, dk_set_t out_slots);
void clib_rc_init ();

/* Mark host offline (unless already removed from the cluster) and build a
   08C01 connect error into `error`.
   NOTE(review): `if (&error)` takes the address of the lvalue passed as
   `error`; this amounts to a NULL check only when call sites pass a
   dereference such as *err_ret (then &error == err_ret).  For a plain local
   it is always true — confirm call sites follow the *err_ret pattern. */
#define CL_CONN_ERROR(error, ses, host, errno_save)				\
  { if (CH_REMOVED != host->ch_status) host->ch_status = CH_OFFLINE; \
  if (&error) error = srv_make_new_error ("08C01", "CL...", "Cluster could not connect to host %d %s error %d", (int)host->ch_id, host->ch_connect_string, (int)errno_save);}

void cl_node_init (table_source_t * ts, caddr_t * inst);

#define IS_CL_NODE(ts) \
  (IS_TS (ts) || (qn_input_fn) insert_node_input == ts->src_gen.src_input \
   || (qn_input_fn) query_frag_input == ts->src_gen.src_input \
   || (qn_input_fn) dpipe_node_input == ts->src_gen.src_input \
   || (qn_input_fn) code_node_input == ts->src_gen.src_input \
   || (qn_input_fn) fun_ref_node_input == ts->src_gen.src_input \
   || IS_STN (ts) \
  )


int itc_cl_local (it_cursor_t * itc, buffer_desc_t * buf);
int itc_cl_others_ready (it_cursor_t * itc);
void cl_row_set_out_cols (dk_set_t slots, caddr_t * inst, cl_op_t * clo);
void cl_select_save_env (table_source_t * ts, itc_cluster_t * itcl, caddr_t * inst, cl_op_t * clo, int nth);
void cl_ts_set_context (table_source_t * ts, itc_cluster_t * itcl, caddr_t * inst, int nth_set);
extern dk_hash_t * cl_inx_to_ssl;
query_t * cl_set_id_qf (uint32 ch_id, uint32 qf_id, query_t * qf);
query_t * cl_id_qf (uint32 ch_id, uint32 qf_id);
void cl_serialize_top_query_frag_t (dk_session_t * out, query_frag_t * s);
int cl_send (cl_host_t * host, cl_message_t * cm, caddr_t * err_ret);
int cl_send_ses (cl_host_t * host, cl_message_t * cm, dk_session_t ** ses_ret, caddr_t * err_ret);
int cl_send_flush (cl_host_t * host, dk_session_t * ses, caddr_t * err_ret, int is_final);
void cl_qf_free (query_frag_t * qf);
void cl_qr_done (query_t * qr);
void cl_dml_results (update_node_t * upd, caddr_t * inst);
void cl_delete_keys (delete_node_t * del, caddr_t * inst, row_delta_t * main_rd);
void itc_delete_rd (it_cursor_t * itc, row_delta_t * rd);
int32  strses_out_bytes (dk_session_t * ses);
int clt_flush (cl_thread_t * clt, int is_final);
void cl_update_keys (update_node_t * upd, caddr_t * inst, row_delta_t * main_rd, row_delta_t * new_rd, dk_set_t keys,
    int ins_new_rd);
void cl_dml_ac_check (query_instance_t * qi, dbe_key_t * key);
int cl_is_ac_dml (query_instance_t * qi);



#define QFID_HOST(i) ((int)(((unsigned int64) (i)) >> 32))

void cl_trx_init ();
extern basket_t cl_trx_queue;
extern semaphore_t * cl_trx_sem;
extern dk_mutex_t * cl_trx_mtx;
extern dk_mutex_t * cl_trx_graph_mtx;
extern dk_hash_64_t * cl_w_id_to_ctrx;
extern cl_host_t * cl_master;


#ifdef CM_TRACE

void cm_record_send (cl_message_t * cm, int to);
void cm_record_dispatch_1 (cl_message_t * cm, int clt, int mode, char *file, int line);
void cm_record_dfg_progress (int coord, int req_no, cl_thread_t * clt, char *file, int line);


#define dfg_progress(coord, req_no, clt)  cm_record_dfg_progress (coord, req_no, clt, __FILE__, __LINE__)

#define cm_record_dispatch(cm, clt, f) cm_record_dispatch_1 (cm, clt, f, __FILE__, __LINE__)
#define cm_record_dfg_deliv(cm, f) cm_record_dispatch_1 (cm, 0, 256 + (f), __FILE__, __LINE__);

/* Dispatch trace modes recorded via cm_record_dispatch_1 () under CM_TRACE.
   Each value must be distinct so a recorded trace is unambiguous. */
#define CM_D_TOP_START 1
#define CM_D_TOP_QUEUE 2
#define CM_D_REC_QUEUED 3
#define CM_D_REC_QUEUE_ASG 4
#define CM_D_REC_INT 5
#define CM_D_DISCARD 6
#define CM_D_TOP_REC_START 7
#define CM_D_REC_CLT_DETACH 8
#define CM_D_CANCEL 9
#define CM_D_REPLY_QUEUED 10
#define CM_D_REPLY_QUEUED_NR 14
#define CM_D_REPLY_WAKE 11
#define CM_D_REPLY_DISCARD 12
#define CM_D_RCV 13
/* was 14, which collided with CM_D_REPLY_QUEUED_NR and made traces ambiguous */
#define CM_D_DFG_PROGRESS 15


#define CM_DFGD_INIT 1
#define CM_DFGD_START 2
#define CM_DFGD_SELF 3
#define CM_DFGD_OTHER 4
#define CM_DFGD_DISCARD 5
#define CM_DFGD_R_QUEUE 6


/* CM_TRACE record of one cm send: message id, request no and destination. */
typedef struct _cm_send_s
{
  int64 cms_id;		/* cm_id of the message sent */
  int cms_req_no;	/* cm_req_no of the message */
  short cms_to;		/* destination host no */
} cm_send_t;

/* CM_TRACE record of one dispatch / dfg-delivery event of a cm. */
typedef struct _cm_trace_s
{
  uint64	cmt_id;		/* cm_id of the message */
  int64		cmt_line;	/* source line of the recording call site */
  uint32	cmt_req_no;
  uint32	cmt_disp_ts;	/* presumably time of the dispatch event — confirm units */
  uint32	cmt_dfgd_ts;	/* presumably time of the dfg delivery event — confirm units */
  short		cmt_dfg_coord;	/* dfg coordinator host */
  short		cmt_clt;	/* clt the cm was given to */
  char		cmt_dfg_stage;
  char		cmt_disp_mode;	/* CM_D_* dispatch mode */
  char		cmt_dfgd_mode;	/* CM_DFGD_* delivery mode */
} cm_trace_t;


#else
#define cm_record_send(cm, to)
#define cm_record_dispatch(cm, clt, r)
#define cm_record_dfg_deliv(cm, f)
#define dfg_progress(coord, req_no, clt)
#endif

void cl_notify_wait (gen_lock_t * pl, it_cursor_t * itc, buffer_desc_t * buf);
void cl_notify_transact (lock_trx_t * lt);
void cl_master_notify_transact (int64 w_id);
void lt_kill_while_2pc_prepare (lock_trx_t * lt);
void lt_expedite_1pc (lock_trx_t * lt);
void lt_send_rollbacks (lock_trx_t * lt, int sync);
extern int cl_trx_inited;
/* Split a cluster transaction's 64 bit w_id: host id in the high 32 bits,
   per-host transaction id in the low 32.  The parameter is parenthesized so
   the macros also work for argument expressions such as &v or casts. */
#define CTRX_H_ID(c) ((int) ((c)->ctrx_w_id >> 32))
#define CTRX_W_ID(c) ((int32) ((c)->ctrx_w_id))
/* Debug printf for ctrx tracing; compiled out by default (#if 0). */
#if 0
#define ctrx_printf(x) printf x
#else
#define ctrx_printf(x)
#endif
void cl_clear_dead_w_id ();

/* Sentinel stored in thread debug state while waiting on a clrg. */
#define THR_DBG_CLRG_WAIT ((caddr_t) 1)

void cl_notify_disconnect (int host);
void clo_unlink_clib (cl_op_t * clo, cll_in_box_t * clib, int is_allocd);
void bif_daq_init ();
void cls_call (cl_thread_t * clt, cl_op_t * clo);
void clt_send_error (cl_thread_t * clt, caddr_t err);
uint32 col_part_hash (col_partition_t * cp, caddr_t val, int is_already_cast, int * cast_ret, int32 * rem_ret);
/* for is_already_cast */
#define CP_CAST 0
#define CP_NO_CAST 1
#define CP_CAST_NO_ERROR 2

/* Partition-to-host resolution. */
cl_host_t * clm_choose_host (cluster_map_t * clm, cl_host_t ** group, cl_op_t * clo, int part_mode, int32 rem);
cl_host_t * chg_first (cl_host_t ** group);

cl_host_t **  clm_group (cluster_map_t * clm, uint32 hash, int op);
void  query_frag_run (query_frag_t * qf, caddr_t * inst, caddr_t * state);
int cls_wst_select (cl_thread_t * clt, cl_op_t * clo);
void clt_itc_error (cl_thread_t * clt, it_cursor_t * itc);
int64 itc_cl_sample (it_cursor_t * itc);
void itc_cl_sample_init (it_cursor_t * itc, cl_req_group_t ** clrg_ret);
int64 itc_cl_sample_result (it_cursor_t * itc, cl_req_group_t * clrg);

void cu_free (cucurbit_t * cu);
void dpipe_define (caddr_t name, dbe_key_t * key, caddr_t fn, cu_op_func_t fn_disp, int l);
void dpipe_drop (caddr_t name);
/* dpipe define flags */
#define CF_UPDATE 1
#define CF_1_ARG 2
#define CF_FIRST_RW 4 /* rw of the first partition of the value */
#define CF_REST_RW 8 /* update of replica partitions */
#define CF_READ_NO_TXN 16
#define CF_IS_BIF 32
#define CF_IS_DISPATCH 64
#define CF_SINGLE_ACTION 128 /* one call gets the job done, can be colocated as an ordinary proc call */
#define CF_VECTORED 256

void dpipe_refresh_schema ();
void cl_rdf_init ();
void clrg_check_trx_error (cl_req_group_t * clrg, caddr_t * err);

#define SQLSTATE_IS_TXN(s) (0 == strncmp (s, "400", 3) || 0 == strncmp (s, "08", 2) || 0 == strncmp (s, "S1T0", 4))
void cl_read_dpipes ();
caddr_t * cu_next (cucurbit_t * cu, query_instance_t * qi, int is_flush);
void  dpipe_node_input (dpipe_node_t * dp, caddr_t * inst, caddr_t * state);
void dpipe_node_local_input (dpipe_node_t * dp, caddr_t * inst, caddr_t * stat);
void  dpipe_node_free (dpipe_node_t * dp);
/* True if query node xx is a dpipe node, i.e. its input function is dpipe_node_input. */
#define IS_DP(xx) ((qn_input_fn)dpipe_node_input == ((data_source_t*)(xx))->src_input)
cu_func_t * cu_func (caddr_t name, int must_find);
void cu_ssl_row (cucurbit_t * cu, caddr_t * qst, state_slot_t ** args, int first_ssl);
void cl_fref_result (fun_ref_node_t * fref, caddr_t * inst, cl_op_t ** clo_ret);
int cl_partitioned_fref_start (dk_set_t nodes, caddr_t * inst);
void ch_qf_closed (cl_host_t * ch, uint32 req_no, cl_message_t * cm);
void cl_timeout_closed_qfs ();
void cm_free_pending_clo (cl_message_t * cm);
#ifdef LT_TRACE_SZ
/* Trace builds wrap the ref drop in an LT_TRACE record.  do/while(0) makes
   the macro a single statement, safe in an unbraced if/else. */
#define cl_lt_drop_ref(lt, f) do { LT_TRACE ((lt)); cl_lt_drop_ref_1 ((lt), (f)); } while (0)
void cl_lt_drop_ref_1 (lock_trx_t * lt, int is_cancel);
#else
void cl_lt_drop_ref (lock_trx_t * lt, int is_cancel);
#endif

extern int qf_trace;
int cls_autocommit (cl_thread_t * clt, cl_op_t * clo, caddr_t err, int recov_deadlock);
void  cl_local_insert (caddr_t * inst, cl_op_t * clo);
void  cl_local_delete (caddr_t * inst, cl_op_t * clo);
cl_req_group_t * dpipe_allocate (query_instance_t * qi, int flags, int n_ops, char ** ops);
void cu_row (cucurbit_t * cu, caddr_t * args);
/* NOTE(review): duplicate of the cu_next prototype declared earlier in this file - harmless. */
caddr_t * cu_next (cucurbit_t * cu, query_instance_t * qi, int is_flush);
caddr_t cl_iri_to_id (query_instance_t * qi, caddr_t str, int make_new);
caddr_t cl_id_to_iri (query_instance_t * qi, caddr_t id);
void cu_set_value (cucurbit_t * cu, value_state_t * vs, caddr_t val);
cl_op_t * cl_key_insert_op_vec (caddr_t * qst, dbe_key_t * key, int ins_mode,
    char **col_names, caddr_t * values, cl_req_group_t * clrg, int seq, int nth_set);
cl_op_t * cl_key_delete_op_vec (caddr_t * qst, dbe_key_t * key,
    char **col_names, caddr_t * values, cl_req_group_t * clrg, int seq, int nth_set);

void array_add (caddr_t ** ap, int * fill, caddr_t elt);
void mp_array_add (mem_pool_t * mp, caddr_t ** ap, int * fill, caddr_t elt);
cl_req_group_t * bif_clrg_arg (caddr_t * qst, state_slot_t ** args, int nth, const char *func);
void cl_refresh_status (query_instance_t * qi, int mode);
/* modes for cl_refresh_status */
#define CLST_REFRESH 0
#define CLST_SUMMARY 1
#define CLST_DETAILS 2
void cst_fill_local (cl_status_t * cst);
extern int32 cl_n_hosts;
extern int32 cl_max_hosts;
boxint cl_sequence_next (query_instance_t * qi, caddr_t seq, int step, boxint sz, int in_map, caddr_t * err_ret);

extern int32 cl_stage; /* during init, how far come, one of the values of ch_status */

caddr_t  cl_msg_string (int64 bytes);
int cm_read_in_string (cl_message_t * cm);
int cl_trx_check (int64 trx_no, int retry, int ask_host);
void cls_blob_send (cl_thread_t * clt, cl_message_t * cm);
int  cl_get_blob (lock_trx_t * lt, blob_handle_t * bh, int64 n, int64 skip, dk_session_t ** ses_ret);
dk_set_t  cl_bh_string_list (lock_trx_t * lt, blob_handle_t * bh, int64 n, int64 skip);
void  cls_timeouts (int flags);
#define CL_TIMEOUT_ALL 1 /* cls_timeouts will mark all as timed out to recover from arbitrary hang */
void cu_dispatch (cucurbit_t * cu, value_state_t * vs, cu_func_t * cf, caddr_t val);
void cl_send_kill (int64 lt_w_id, int host_id);
int cl_send_commit (int64 w_id, int to_host);
void cl_schedule_admin (caddr_t text);
void cl_disconnect_query (cl_host_t * ch);
void cl_send_all_atomic (int flag);
void cls_wait_query ();
void cls_seq_alloc (cl_thread_t * clt, cl_message_t * cm);
void lt_io_start (lock_trx_t * lt);
void lt_io_end (lock_trx_t * lt);

void cl_request_wait_query ();
int qn_has_clb_save (data_source_t * qn);
int itcl_fetch_to_set (itc_cluster_t * itcl, int nth);
caddr_t bif_cl_set_switch (caddr_t * qst, caddr_t * err_ret, state_slot_t ** args);
void ssa_set_switch (setp_save_t * ssa, caddr_t * inst, int set);
void cl_set_switch (caddr_t * inst, state_slot_t * set_no, state_slot_t * current_set, state_slot_t * array, state_slot_t ** save,
    int n_save, int target_set, int cl_batch, state_slot_t ** defaults);
void cl_qr_skip_to_set (query_t * qr, caddr_t *  inst, state_slot_t * set_no_ssl, int target_set);
void cl_local_skip_to_set (cll_in_box_t * clib);
int itcl_skip_to_set (itc_cluster_t * itcl, int nth);
void cl_qr_async_flush (query_t * qr, caddr_t * inst, dk_set_t nodes);
void cl_co_flush (table_source_t * ts, caddr_t * inst);

/* True when a max-rows budget m is set and this row exhausts it.
   WARNING: decrements m - pass a side-effect-free lvalue only; the
   argument is evaluated twice. */
#define IS_MAX_ROWS(m) ((m) && 0 == --(m))
void  cl_rdf_inf_init (client_connection_t * cli, caddr_t * err_ret);

int  itcl_dfg_set_end (itc_cluster_t * itcl, cll_in_box_t * clib);
/* Node-type tests by input function; argument parenthesized so casts and
   other expressions work as the macro argument. */
#define IS_QF(qf) ((qn_input_fn)query_frag_input == ((data_source_t*)(qf))->src_input)
#define IS_STN(qf) ((qn_input_fn)stage_node_input == ((data_source_t*)(qf))->src_input)

/* DFG (distributed flow graph) stage node and coordination entry points. */
void stage_node_input (stage_node_t * stn, caddr_t * inst, caddr_t * state);
void stage_node_free (stage_node_t * stn);
void dfg_resume_pending (query_t * subq, query_instance_t * qi, dk_set_t nodes, int * any_done, stage_node_t * successors_only);
void dfg_coord_batch (itc_cluster_t *itcl, cll_in_box_t * clib);
int  clib_enlist (cll_in_box_t * clib, cl_message_t * cm);
void clib_read_into_slots (cll_in_box_t * clib, caddr_t * qst, dk_set_t slots);
void itc_locate (it_cursor_t * itc, dk_set_t * hosts_ret, cl_op_t * op, int *is_local, cl_req_group_t ** clrg_ret, lock_trx_t * lt);
void qf_locate (cl_op_t * op, caddr_t * qst, int *is_local, cl_req_group_t ** clrg_ret, lock_trx_t * lt);
void rd_locate (row_delta_t * rd, dk_set_t * hosts_ret, cl_op_t * op, int *is_local, cl_req_group_t ** clrg_ret, lock_trx_t * lt);

void rd_free_temp_blobs (row_delta_t * rd, lock_trx_t * lt, int is_local);
int cl_handle_reset (cl_op_t * clo, cl_thread_t * clt, query_instance_t * qi, int reset_code);
void clib_assign_req_no (cll_in_box_t * clib);
cll_in_box_t * itcl_local_start (itc_cluster_t * itcl);
cl_op_t * itcl_next (itc_cluster_t * itcl);
cl_op_t * itcl_next_no_order (itc_cluster_t * itcl, cl_buffer_t * clb);

void qf_trace_ret (query_frag_t * qf, itc_cluster_t * itcl, cl_op_t * clo);
void cl_dfg_start_search (query_frag_t * qf, caddr_t * inst);
void cl_dfg_results (query_frag_t * qf, caddr_t * inst);
int cl_dfg_continue (cl_thread_t * clt, cl_op_t * clo);
int cl_dfg_exec (cl_thread_t * clt, cl_op_t * clo);
void dfg_batch_end (stage_node_t ** nodes, caddr_t * inst, int n_stages, cll_in_box_t * local_clib, int is_cl_more);
void cl_dfg_flush (dk_set_t nodes, caddr_t * inst, caddr_t * err_ret, caddr_t * main_inst);
void cm_unregister (cl_message_t * cm, cl_host_t * ch);
int dfg_coord_should_pause (itc_cluster_t * itcl);
table_source_t * sqlg_loc_ts (table_source_t * ts, table_source_t * prev_loc_ts);
void lt_free_branches (lock_trx_t * lt);
void cl_dfg_no_cancel_forward (dk_set_t nodes, caddr_t * inst);
stage_node_t *qf_nth_stage (stage_node_t ** nodes, int nth);

void cluster_bifs ();

/* Account one cluster message of the given byte size to the owning client's
   activity counters; no-op when lt is NULL.  do/while(0) makes the macro a
   single statement (safe under an unbraced if/else); lt and bytes are
   parenthesized, each evaluated once per use below. */
#define CL_MARK_MSG(lt, bytes)			\
  do { if ((lt)) { (lt)->lt_client->cli_activity.da_cl_messages++; (lt)->lt_client->cli_activity.da_cl_bytes += (bytes); } } while (0)

void sel_multistate_top (select_node_t *sel, caddr_t * inst);
void cl_write_disconnect_srv (void* cd);
void cl_rdf_bif_check_init (bif_t bif);

/* If par is an XML entity or a general array, attach it to query instance qi
   via xte_set_qi.  do/while(0) keeps the local dtp scoped and makes the
   macro a single statement; par is evaluated twice, as before. */
#define QF_XML(qi,par) \
  do { \
    dtp_t dtp = DV_TYPE_OF (par); \
    if (DV_XML_ENTITY == dtp || DV_ARRAY_OF_POINTER == dtp) \
      xte_set_qi (par, qi); \
  } while (0)

int qi_anytime_send_check (caddr_t * inst);
char *  cl_thr_stat ();

int key_is_known_partition (dbe_key_t * key, caddr_t * qst, search_spec_t * ksp, search_spec_t * rsp, uint32 * hash_ret,
    it_cursor_t * itc, int32 * rem_ret);
/* KP_*: partition resolution results, presumably returned by key_is_known_partition above. */
#define KP_ALL 0
#define KP_ONE 1
#define KP_NULL 2


/* True if txs is a text node with a cluster-located table source
   (txs_loc_ts set); argument parenthesized for expression safety. */
#define IS_CL_TXS(txs) \
  ((qn_input_fn)txs_input == ((data_source_t*)(txs))->src_input   && ((text_node_t*)(txs))->txs_loc_ts)

int key_is_d_id_partition (dbe_key_t * key);
void lt_set_w_id (lock_trx_t * lt, int64 w_id);
caddr_t cl_read_map ();
void cluster_after_online ();
void cl_qi_count_affected (query_instance_t * qi, cl_req_group_t * clrg);
/* NOTE(review): duplicate of the ch_qf_closed prototype declared earlier in this file - harmless. */
void ch_qf_closed (cl_host_t * ch, uint32 req_no, cl_message_t * cm);


int cl_w_timeout_hook (dk_session_t * ses);
/* io tracing printf; compiled out by default (#if 1 selects the no-op). */
#if 1
#define io_printf(a)
#else
#define io_printf(a) printf a
#endif

/* Signal SQL state 08C06 unless the cluster has confirmed online status
   (or is running local-only).  do/while(0) makes the check a single
   statement, safe in an unbraced if/else. */
#define CL_ONLINE_CK \
  do { if (CH_ONLINE != cl_stage && CL_RUN_CLUSTER == cl_run_local_only) sqlr_new_error ("08C06", "CLNJO", "Cluster operations not allowed until confirmed online"); } while (0)

void cl_flt_init ();
void cl_flt_init_2 ();
extern cluster_map_t * clm_all;
dk_session_t * dks_file (char * name, int flags);
void clib_row_boxes (cll_in_box_t * clib);
void clib_prepare_read_rows (cll_in_box_t * clib);
void clrg_add_clib (cl_req_group_t * clrg, cll_in_box_t * clib);
/* Per-datatype column partition hash functions; each also returns the
   in-slice remainder through rem_ret. */
uint32 cp_string_hash (col_partition_t * cp, caddr_t bytes, int len, int32 * rem_ret);
uint32 cp_int_any_hash (col_partition_t * cp, unsigned int64 i, int32 * rem_ret);
uint32 cp_double_hash (col_partition_t * cp, double d, int32 * rem_ret);
uint32 cp_any_hash (col_partition_t * cp, db_buf_t val, int32 * rem_ret);

#define N_ONES(n) ((1 << (n)) - 1)



extern int enable_small_int_part;
#if 0
/* NOTE(review): disabled identity variant; arity fixed to match the live
   definition (the original took a single argument). */
#define I_PART(i, shift) (i)
#else
/* Partition transform for integer keys: when enabled and i is small
   (below 4 * the slice count), pre-shift it left so small consecutive ids
   spread across slices.  Arguments are parenthesized against precedence
   surprises; i is evaluated more than once - pass side-effect-free
   expressions only. */
#define I_PART(i, shift) \
 (enable_small_int_part && (uint64)(i) < (1 << ((shift) + 2)) \
 ? ((uint64)(i)) << (shift)  \
 : (uint64)(i))
#endif

/* Integer partition hash: stores into *rem_ret the slice shift (high byte)
   or'ed with the in-slice remainder (-1 when the partition has no shift),
   and yields the partition hash bits of the I_PART-transformed value masked
   by cp_mask.  NOTE(review): evaluates cp, i and rem_ret multiple times -
   arguments must be side-effect free. */
#define cp_int_hash(cp, i, rem_ret)				\
  ((*rem_ret = (cp->cp_shift << 24) | (cp->cp_shift ? (I_PART (i, cp->cp_shift) & N_ONES (cp->cp_shift)) : -1)), \
   ((((unsigned int64)I_PART (i, cp->cp_shift)) >> cp->cp_shift) & cp->cp_mask))




extern dk_mutex_t * clrg_ref_mtx;
extern long dbf_cpt_rb;
cll_in_box_t * clib_allocate ();
id_hash_t * dict_ht (id_hash_iterator_t * dict);
void dpipe_signature (caddr_t name, int n_args, ...);
/* Client connection owning cu's request group's transaction, or NULL when
   the request group has no transaction. */
#define CU_CLI(cu) ((cu)->cu_clrg->clrg_lt ? (cu)->cu_clrg->clrg_lt->lt_client : NULL)
void qi_free_dfg_queue_nodes (query_instance_t * qi, dk_set_t nodes);
void qf_set_cost (query_frag_t * qf);


#endif