1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
|
/*
* %CopyrightBegin%
*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Ericsson AB 2001-2025. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
#ifndef ERL_NODE_TABLES_BASIC__
#define ERL_NODE_TABLES_BASIC__
typedef struct dist_entry_ DistEntry;
typedef struct ErtsDistOutputBuf_ ErtsDistOutputBuf;
typedef struct ErtsDistOutputBufsContainer_ ErtsDistOutputBufsContainer;
void erts_ref_dist_entry(DistEntry *dep);
void erts_deref_dist_entry(DistEntry *dep);
#endif
#if !defined(ERL_NODE_TABLES_BASIC_ONLY) && !defined(ERL_NODE_TABLES_H__)
#define ERL_NODE_TABLES_H__
/*
* The "node_tables module" contain two (hash) tables: the node_table
* and the dist_table.
*
* The elements of the node_table represents a specific incarnation of
* an Erlang node and has {Nodename, Creation} pairs as keys. Elements
* in the node_table are referred to from node containers (see
* node_container_utils.h).
*
* The elements of the dist_table represents a (potential) connection
* to an Erlang node and has Nodename as key. Elements in the
* dist_table are either referred to from elements in the node_table
* or from the process or port structure of the entity controlling
* the connection.
*
* Both tables are garbage collected by reference counting.
*/
#include "sys.h"
#include "hash.h"
#include "erl_alloc.h"
#define ERTS_PORT_TASK_ONLY_BASIC_TYPES__
#include "erl_port_task.h"
#undef ERTS_PORT_TASK_ONLY_BASIC_TYPES__
#include "erl_process.h"
#define ERTS_BINARY_TYPES_ONLY__
#include "erl_binary.h"
#undef ERTS_BINARY_TYPES_ONLY__
#include "erl_monitor_link.h"
#define ERTS_NODE_TAB_DELAY_GC_DEFAULT (60)
#define ERTS_NODE_TAB_DELAY_GC_MAX (100*1000*1000)
#define ERTS_NODE_TAB_DELAY_GC_INFINITY (ERTS_NODE_TAB_DELAY_GC_MAX+1)
#define ERST_INTERNAL_CHANNEL_NO 0
enum dist_entry_state {
ERTS_DE_STATE_IDLE,
ERTS_DE_STATE_PENDING,
ERTS_DE_STATE_CONNECTED,
ERTS_DE_STATE_EXITING
};
#define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0)
#define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1)
#define ERTS_DE_QFLGS_ALL (ERTS_DE_QFLG_BUSY \
| ERTS_DE_QFLG_EXIT)
#if defined(ARCH_64)
#define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713f713f713UL)
#else
#define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713)
#endif
struct ErtsDistOutputBuf_ {
#ifdef DEBUG
Uint dbg_pattern;
#endif
ErtsDistOutputBuf *next;
Binary *bin;
/*
* iov[0] reserved for driver
* iov[1] reserved for distribution header
* iov[2 ... vsize-1] data
*/
ErlIOVec *eiov;
int ignore_busy;
};
struct ErtsDistOutputBufsContainer_ {
Sint fragments;
byte *extp;
ErtsDistOutputBuf obuf[1]; /* longer if fragmented... */
};
typedef struct {
ErtsDistOutputBuf *first;
ErtsDistOutputBuf *last;
} ErtsDistOutputQueue;
struct ErtsProcList_;
/*
* Lock order:
* 1. dist_entry->rwmtx
* 2. erts_node_table_rwmtx
* 3. erts_dist_table_rwmtx
*
* Lock mutexes with lower numbers before mutexes with higher numbers and
* unlock mutexes with higher numbers before mutexes with higher numbers.
*/
struct dist_entry_ {
HashBucket hash_bucket; /* Hash bucket */
struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */
struct dist_entry_ *prev; /* Previous entry in dist_table (not sorted) */
erts_rwmtx_t rwmtx; /* Protects all fields below until lck_mtx. */
Eterm sysname; /* name@host atom for efficiency */
Uint32 creation; /* creation of connected node */
erts_atomic_t input_handler; /* Input handler */
Eterm cid; /* connection handler (pid or port),
NIL == free */
Uint32 connection_id; /* Connection id incremented on connect */
enum dist_entry_state state;
int pending_nodedown;
Process* suspended_nodeup;
Uint64 dflags; /* Distribution flags, like hidden,
atom cache etc. */
Uint32 opts;
ErtsMonLnkDist *mld; /* Monitors and links */
erts_mtx_t qlock; /* Protects qflgs and out_queue */
erts_atomic32_t qflgs;
erts_atomic32_t notify; /* User wants queue notification? */
erts_atomic_t qsize; /* Size of data in queue respecting busy dist */
erts_atomic_t total_qsize; /* Total size of data in queue */
erts_atomic64_t in;
erts_atomic64_t out;
ErtsDistOutputQueue out_queue;
struct ErtsProcList_ *suspended;
ErtsDistOutputQueue tmp_out_queue;
ErtsDistOutputQueue finalized_out_queue;
erts_atomic_t dist_cmd_scheduled;
ErtsPortTaskHandle dist_cmd;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
struct cache* cache; /* The atom cache */
ErtsThrPrgrLaterOp later_op;
struct dist_sequences *sequences; /* Ongoing distribution sequences */
};
/*
#define ERL_NODE_BOOKKEEP
* Bookkeeping of ErlNode inc and dec operations to help debug refc problems.
* This is best used together with cerl -rr. Type the below into gdb:
* gdb:
set pagination off
set $i = 0
set $node = referred_nodes[$node_ix].node
while $i < $node->slot.counter
printf "%s:%d ", $node->books[$i].file, $node->books[$i].line
printf "%p: ", $node->books[$i].term
etp-1 $node->books[$i].who
printf " "
p $node->books[$i].what
set $i++
end
* Then save that into a file called test.txt and run the below in
* an erlang shell in order to get all inc/dec that do not have a
* match.
f(), {ok, B} = file:read_file("test.txt").
Vs = [begin [Val, _, _, _, What] = All = string:lexemes(Ln, " "),{Val,What,All} end || Ln <- string:lexemes(B,"\n")].
Accs = lists:foldl(fun({V,<<"ERL_NODE_INC">>,_},M) -> Val = maps:get(V,M,0), M#{ V => Val + 1 }; ({V,<<"ERL_NODE_DEC">>,_},M) -> Val = maps:get(V,M,0), M#{ V => Val - 1 } end, #{}, Vs).
lists:usort(lists:filter(fun({V,N}) -> N /= 0 end, maps:to_list(Accs))).
* There are bound to be bugs in the instrumentation code, but
* at least this is a place to start when hunting refc bugs.
*
*/
#ifdef ERL_NODE_BOOKKEEP
struct erl_node_bookkeeping {
Eterm who;
Eterm term;
char *file;
int line;
enum { ERL_NODE_INC, ERL_NODE_DEC } what;
};
#define ERTS_BOOKKEEP_SIZE (1024)
#endif
typedef struct erl_node_ {
HashBucket hash_bucket; /* Hash bucket */
erts_refc_t refc; /* Reference count */
Eterm sysname; /* name@host atom for efficiency */
Uint32 creation; /* Creation */
DistEntry *dist_entry; /* Corresponding dist entry */
#ifdef ERL_NODE_BOOKKEEP
struct erl_node_bookkeeping books[ERTS_BOOKKEEP_SIZE];
erts_atomic_t slot;
#endif
} ErlNode;
extern Hash erts_dist_table;
extern Hash erts_node_table;
extern erts_rwmtx_t erts_dist_table_rwmtx;
extern erts_rwmtx_t erts_node_table_rwmtx;
extern DistEntry *erts_hidden_dist_entries;
extern DistEntry *erts_visible_dist_entries;
extern DistEntry *erts_pending_dist_entries;
extern DistEntry *erts_not_connected_dist_entries;
extern Sint erts_no_of_hidden_dist_entries;
extern Sint erts_no_of_visible_dist_entries;
extern Sint erts_no_of_pending_dist_entries;
extern Sint erts_no_of_not_connected_dist_entries;
extern DistEntry *erts_this_dist_entry;
extern ErlNode *erts_this_node;
extern char *erts_this_node_sysname; /* must match erl_node_tables.c */
Uint erts_delayed_node_table_gc(void);
DistEntry *erts_channel_no_to_dist_entry(Uint);
DistEntry *erts_sysname_to_connected_dist_entry(Eterm);
DistEntry *erts_find_or_insert_dist_entry(Eterm);
DistEntry *erts_find_dist_entry(Eterm);
void erts_schedule_delete_dist_entry(DistEntry *);
Uint erts_dist_table_size(void);
void erts_dist_table_info(fmtfn_t, void *);
void erts_set_dist_entry_not_connected(DistEntry *);
void erts_set_dist_entry_pending(DistEntry *);
void erts_set_dist_entry_connected(DistEntry *, Eterm, Uint64);
ErlNode *erts_find_or_insert_node(Eterm, Uint32, Eterm);
ErlNode *erts_find_node(Eterm, Uint32);
void erts_schedule_delete_node(ErlNode *);
void erts_set_this_node(Eterm, Uint32);
Uint erts_node_table_size(void);
void erts_init_node_tables(int);
void erts_node_table_info(fmtfn_t, void *);
void erts_print_node_info(fmtfn_t, void *, Eterm, int*, int*);
Eterm erts_get_node_and_dist_references(struct process *);
#if defined(ERTS_ENABLE_LOCK_CHECK)
int erts_lc_is_de_rwlocked(DistEntry *);
int erts_lc_is_de_rlocked(DistEntry *);
#endif
int erts_dist_entry_destructor(Binary *bin);
DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle, Uint32* connection_id);
#define ERTS_DHANDLE_SIZE (3+ERTS_MAGIC_REF_THING_SIZE)
Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*, Uint32 conn_id);
Eterm erts_make_dhandle(Process *c_p, DistEntry*, Uint32 conn_id);
ERTS_GLB_INLINE void erts_init_node_entry(ErlNode *np, erts_aint_t val);
#ifdef ERL_NODE_BOOKKEEP
#define erts_ref_node_entry(NP, MIN, T) erts_ref_node_entry__((NP), (MIN), (T), __FILE__, __LINE__)
#define erts_deref_node_entry(NP, T) erts_deref_node_entry__((NP), (T), __FILE__, __LINE__)
ERTS_GLB_INLINE erts_aint_t erts_ref_node_entry__(ErlNode *np, int min_val, Eterm term, char *file, int line);
ERTS_GLB_INLINE void erts_deref_node_entry__(ErlNode *np, Eterm term, char *file, int line);
#else
ERTS_GLB_INLINE erts_aint_t erts_ref_node_entry(ErlNode *np, int min_val, Eterm term);
ERTS_GLB_INLINE void erts_deref_node_entry(ErlNode *np, Eterm term);
#endif
ERTS_GLB_INLINE erts_aint_t erts_node_refc(ErlNode *np);
ERTS_GLB_INLINE void erts_de_rlock(DistEntry *dep);
ERTS_GLB_INLINE void erts_de_runlock(DistEntry *dep);
ERTS_GLB_INLINE void erts_de_rwlock(DistEntry *dep);
ERTS_GLB_INLINE void erts_de_rwunlock(DistEntry *dep);
#ifdef ERL_NODE_BOOKKEEP
void erts_node_bookkeep(ErlNode *, Eterm , int, char *file, int line);
#else
#define erts_node_bookkeep(...)
#endif
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void
erts_init_node_entry(ErlNode *np, erts_aint_t val)
{
erts_refc_init(&np->refc, val);
}
#ifdef ERL_NODE_BOOKKEEP
ERTS_GLB_INLINE erts_aint_t
erts_ref_node_entry__(ErlNode *np, int min_val, Eterm term, char *file, int line)
{
erts_node_bookkeep(np, term, ERL_NODE_INC, file, line);
return erts_refc_inctest(&np->refc, min_val);
}
ERTS_GLB_INLINE void
erts_deref_node_entry__(ErlNode *np, Eterm term, char *file, int line)
{
erts_node_bookkeep(np, term, ERL_NODE_DEC, file, line);
if (erts_refc_dectest(&np->refc, 0) == 0)
erts_schedule_delete_node(np);
}
#else
ERTS_GLB_INLINE erts_aint_t
erts_ref_node_entry(ErlNode *np, int min_val, Eterm term)
{
return erts_refc_inctest(&np->refc, min_val);
}
ERTS_GLB_INLINE void
erts_deref_node_entry(ErlNode *np, Eterm term)
{
if (erts_refc_dectest(&np->refc, 0) == 0)
erts_schedule_delete_node(np);
}
ERTS_GLB_INLINE erts_aint_t
erts_node_refc(ErlNode *np)
{
return erts_refc_read(&np->refc, 0);
}
#endif
ERTS_GLB_INLINE void
erts_de_rlock(DistEntry *dep)
{
erts_rwmtx_rlock(&dep->rwmtx);
}
ERTS_GLB_INLINE void
erts_de_runlock(DistEntry *dep)
{
erts_rwmtx_runlock(&dep->rwmtx);
}
ERTS_GLB_INLINE void
erts_de_rwlock(DistEntry *dep)
{
erts_rwmtx_rwlock(&dep->rwmtx);
}
ERTS_GLB_INLINE void
erts_de_rwunlock(DistEntry *dep)
{
erts_rwmtx_rwunlock(&dep->rwmtx);
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
void erts_debug_test_node_tab_delayed_delete(Sint64 millisecs);
void erts_lcnt_update_distribution_locks(int enable);
#endif
|