File: write-to-take.md

package info (click to toggle)
cyclonedds 0.10.5-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 21,372 kB
  • sloc: ansic: 224,361; perl: 1,904; xml: 1,894; yacc: 1,018; sh: 882; python: 106; makefile: 94
file content (279 lines) | stat: -rw-r--r-- 14,344 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
## Write-to-take with memory allocations

Path traversed by a sample, skipping some trivial functions and functions that are simply
"more of the same", as well as with a bit of license in the "local subscription bypass"
(skips straight from `write_sample_gc` to `rhc_store`, which is a bit of a lie when you
look carefully at the code).  This is annotated with memory allocation activity.

Currently in default configuration, everything from `dds_write` to `sendmsg` happens on
the application thread.  With current "asynchronous" mode, outgoing packets get queued and
are transmitted on a separate thread.

    APPLICATION
      |
      v
    dds_write
      |
      |  serdata: from_sample
      |    - allocates "serdata" and serializes into it
      |  key to instance_handle mapping (1)
      |    allocates for a unique key
      |    - tkmap_instance (entry in table)
      |    - "untyped serdata" for insertion in table
      |    - possibly resizes hash table
      |    (if no readers, undoes the above)
      |
      v
    write_sample_gc
      |         \
      |          \ (local subscriptiosn bypass)
      |           \
      |            \
      |             |
      |  whc_insert (for reliable data)
      |    - allocates whc_node (2)
      |    - inserts in seq# hash (which may grow hash table)
      |    - adds to seq# interval tree (which may require an interval tree node)
      |    - may push out old samples, which may cause frees
      |    - if keyed topic: insert in index on instance_handle
      |      - may grow hash table (1)
      |             |
      v             |
    transmit_sample |
      |             |
      |  allocate and initialise new "xmsg" (3)
      |    - large samples: needs many, for DATAFRAG and HEARTBEATFRAG
      |    - serialised data integrated by-reference
      |    - may also need one for a HEARTBEAT
      |             |
      |  xpack_add called for each "xmsg" to combine into RTPS messages
      |    - fills a (lazily allocated per writer) scatter/gather list
      |    - hands off the packet for synchronous/asynchronous publication
      |      when full/flushed
      |             |
    nn_xpack_send.. | ......+ (*current* asynchronous mode)
      |             |       |
      |             |       v
      |             |     nn_xpack_sendq_thread
      |             |       |
      |             |       |
      v             |       v
    nn_xpack_send_real    nn_xpack_send_real
      |             |       |
      |    - transmits the packet using sendmsg()
      |    - releases record of samples just sent to track the highest seq#
      |      that was actually transmitted, which may release samples if
      |      these were ACK'd already (unlikely, but possible) (3)
      |             |       |
      |             |       |
      v             |       v
    sendmsg         |     sendmsg
      .             |
      .             |
      .             |
    [network]       | (local subscriptions bypass)
      .             |
      .             |
      .             |
    nn_rmsg_new     |
      |             |
      |  ensure receive buffer space is available (5)
      |    - may allocate new buffers: data is left in these buffers while
      |      defragmenting or dealing with samples received out-of-order
      |    - these buffers are huge in the default config to reduce number of allocations
      v             |
    recvmsg         |
      |             |
      |             |
      v             |
    do_packet/handle_submsg_sequence (5)
      |             |
      |  typically allocates memory, typically contiguous with received datagram by
      |  bumping a pointer
      |    - COW receiver state on state changes
      |             |
      |  DATA/DATAFRAG/GAP:
      |    - allocate message defragmenting state
      |    - allocate message reordering state
      |    - (typically GAP doesn't require the above)
      |    - may result in delivering data or discarding fragments, which may free memory
      |             |
      |  ACKNACK:   |
      |    - may drop messages from WHC, freeing (2):
      |      - whc_node, interval tree entry, index entries, possibly serdata
      |      - possible "keyless serdata" and instance_handle index entry
      |  ACKNACK/NACKFRAG:
      |    - possibly queues retransmits, GAPs and HEARTBEATs
      |      - allocates "xmsg"s (like data path) (3)
      |      - allocates queue entries (4)
      |      - freed upon sending
      |             |
      |  HEARTBEAT: |
      |    - may result in delivering data or discarding fragments, which may free memory
      |             |
      |             |
      |  Note: asynchronous delivery queues samples ready for delivery; the
      |  matching delivery thread then calls deliver_user_data_synchronously
      |  to deliver the data (no allocations needed for enqueuing)
      |             |
      v             |
    deliver_user_data_synchronously
      |             |
      |  serdata: from_ser
      |    - allocates a "serdata" and, depending on the implementation,
      |      validates the serialized data and stores it (e.g., the C version),
      |      deserialises it immediately (e.g., the C++ version), or leaves
      |      if in the receive buffers (incrementing refcounts; not done currently,
      |      probably not a wise choice either)
      |             |
      |  frees receive buffer claims after having created the "serdata" (5)
      |  typical synchronous delivery path without message loss:
      |    - resets receive buffer allocator pointer to what it was prior to processing
      |      datagram, re-using the memory for the next packet
      |    - (but typical is not so interesting in a worst-case analysis ...)
      |             |
      |  key to instance_handle mapping (1)
      |    allocates for a unique key
      |    - tkmap_instance (entry in table)
      |    - "untyped serdata" for insertion in table
      |    - possibly resizes hash table
      |    (if no readers, undoes the above)
      |             |
      |            /
      |           /
      |          / (local subscriptions bypass)
      v         /
    rhc_store (once per reader)
      |
      |  - allocates new "instance" if instance handle not yet present in its hash table
      |    may grow instance hash table
      |  - allocates new sample (typically, though never for KEEP_LAST with depth 1 nor
      |    for pushing old samples out of the history
      |  - may free serdatas that were pushed out of the history
      |    this won't affect the instance_handle mappings nor the "untyped serdata"
      |    because overwriting data in the history doesn't change the set of keys
      |  - may require insertion of a "registration" in a hash table, which in turn
      |    may require allocating/growing the hash table (the "registration" itself
      |    is not allocated)
      |
      v
    dds_take/dds_read
      |
      |  - serdata: to_sample, deserialises into application representation
      |  dds_take:
      |  - frees the "serdata" if the refcount drops to zero
      |  - removes the sample from the reader history (possibly the instance as well)
      |    which may involve
      |  - freeing memory allocated for the instance handle mapping and the "untyped
      |    serdata" (1)
      |
      v
    APPLICATION

There are a number of points worth noting in the above:

* Cyclone defers freeing memory in some cases, relying on a garbage collector, but this
  garbage collector is one in the sense of the garbage trucks that drive through the
  streets collecting the garbage that has been put on the sidewalk, rather than the
  stop-the-world/"thief in the night" model used in Java, C#, Haskell, &c.
  
  The deferring is so that some data can be used without having to do reference counting
  for dynamic references caused by using some data for a very short period of time, as
  well as, to some extent, to not incur the cost of freeing at the point of use.  This is
  currently used for:
  
  * mapping entries for key value to instance handle
  * all DDSI-level entities (writers, readers, participants, topics and their "proxy"
    variants for remote ones)
  * hash table buckets for the concurrent hash tables used to index the above
  * active manual-by-participant lease objects in proxy participants
  
  Freeing these requires enqueueing them for the garbage collector; that in turn is
  currently implemented by allocating a "gcreq" queue entry.

* If one only uses keyless topics (like ROS 2 in its current version) for each topic there
  is at most a single "instance" and consequently, at most a single instance handle and
  mapping entry at any one time.  For administrating these instance handles, the
  implementation reduces the sample to its key value and erases the topic from it (the
  "untyped serdata" in the above).  This way, if different topics from the same underlying
  type implementation have the same key value, they may get the same instance handle and
  the same mapping entry.

* The allocations of `whc_node` (marked `(2)`) for tracking individual samples in the
  writer history cache potentially happen at very high rates (> 1M/s for throughput tests
  with small samples) and I have seen the allocator becomes a bottleneck.  Caching is the
  current trick to speed this up; the cache today is bounded by what can reasonably be
  expected to be needed.
  
  The interval tree is allocated/freed without caching, because there is far less churn
  for those (e.g., for a simple queue, there is only one interval needed).
  
  The number of samples in the writer history cache is, of course, bounded by history
  settings, and so these could just as easily be pre-allocated.  Even better is to use the
  fact that the WHC has been abstracted in the code.  A simple pre-allocated circular
  array is sufficient for a implementing a queue with limited depth, and so using a
  different implementation is altogether more sensible.

* The "xmsg" (marked `(3)`) that represent RTPS submessages (or more precisely: groups of
  submessages that must be kept together, like an INFO_TS and the DATA it applies to) are
  cached for the same reason the WHC entries are cached.  Unlike the latter, the lifetime
  of the "xmsg" is the actual sending of data.  The number of "xmsg" that can be packed
  into a message is bounded, and certainly for the data path these can be preallocated.
  Currently they are cached.
  
  For generating responses to ACKNACKs and HEARTBEATs and queueing them, maintaining a
  pool with a sensible policy in case the pool is too small should be possible: not
  sending a message in response because no "xmsg" is available is equivalent to it getting
  lost on the network, and that is supported.  The "sensible" part of the policy is that
  it may be better to prioritise some types over others, e.g., perhaps it would be better
  to prioritise acknowledgements over retransmits.  That's something worth investigating.

* Responses to received RTPS messages are not sent by the same thread, but rather by a
  separate thread (marked `(4)`).  Retransmits (for example) are generated and queued by
  the thread handling the incoming packet, but (for example) ACKNACKs are generated by
  rescheduling a pre-existing event that then generates an "xmsg" in that separate thread
  and transmits it similar to regular data transmission.
  
  Pre-allocating the queue entries (or better yet: turning the queue into a circular
  array) has similar considerations to pre-allocating the "xmsg".

* Receive buffers are troublesome (marked `(5)`): to support platforms without
  scatter/gather support in the kernel (are there still any left?) it allocates large
  chunks of memory so it can accept a full UDP datagram in it with some room to spare.
  That room to spare is then used to store data generated as a consequence of interpreting
  the packet: receiver state, sample information, administration for tracking fragments or
  samples that were received out of order.  Data is the longest lived of all this, and so
  releasing the memory happens when all data in the buffer has been delivered (or
  discarded).
  
  If fragmented data is received or data is received out-of-order, that means the packets
  can hang around for a while.  The benefit (and primary reason why I made this experiment
  way back when) is that there is no copying of anything until the received data is turned
  into "serdata"s by `from_ser`.  Even at that stage, it is still possible to not copy the
  data but instead leave it in the receive buffer, but I have never actually tried that.
 
  The allocator used within the buffers is a simple bump allocator with the optimisation
  that it resets completely if no part of the packet remains live after processing all its
  submessages, and so for non-fragmented data with negligible packet loss, you're
  basically never allocating memory.  Conversely, when data is fragmented or there is
  significant packet loss, it becomes a bit of a mess.
  
  Eliminating the allocating and freeing of these buffers needs some thought ... quite
  possibly the best way is to assume scatter/gather support and accept an additional copy
  if the data can't be delivered to the reader history immediately.

* The reader history cache is really vastly overcomplicated for many simlpe use cases.
  Similarly to the writer history cache, it can be replaced by an alternate
  implementation, and that's probably the most straightforward path to eliminating
  allocations.  Alternatively, one could pre-allocate instances and samples.
  
  The "registrations" that track which writer is writing which instances do not incur any
  allocations unless there are multiple writers writing the same instance.  Cyclone
  follows the DDS spec in treating topics that have no key fields as topics having a
  single instance, and so multiple publishers for the same topic will cause these to be
  tracked.  That means typical queuing patterns result in as many "registrations" for a
  topic as there are writers for that topic.

  It may be a good idea to add a setting to avoid tracking registrations.  Note that the
  DDSI spec already describes a model wherein there are no registrations for keyless
  topics.