:orphan:
.. _remote-reference-protocol:
Remote Reference Protocol
=========================
This note describes the design details of the Remote Reference protocol and walks
through message flows in different scenarios. Make sure you're familiar with the
:ref:`distributed-rpc-framework` before proceeding.
Background
^^^^^^^^^^
RRef stands for Remote REFerence. It is a reference to an object located on a
local or remote worker, and it transparently handles reference counting under
the hood. Conceptually, it can be considered a distributed shared pointer.
Applications can create an RRef by calling
:meth:`~torch.distributed.rpc.remote`. Each RRef is owned by the callee worker
of the :meth:`~torch.distributed.rpc.remote` call (i.e., owner) and can be used
by multiple users. The owner stores the real data and keeps track of the global
reference count. Every RRef can be uniquely identified by a global ``RRefId``,
which is assigned at the time of creation on the caller of the
:meth:`~torch.distributed.rpc.remote` call.
On the owner worker, there is only one ``OwnerRRef`` instance, which contains
the real data, while on user workers, there can be as many ``UserRRefs`` as
necessary, and ``UserRRef`` does not hold the data. All usage on the owner will
retrieve the unique ``OwnerRRef`` instance using the globally unique ``RRefId``.
A ``UserRRef`` will be created when the RRef is used as an argument or return value in an
:meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async` or
:meth:`~torch.distributed.rpc.remote` invocation, and the owner will be notified
accordingly to update the reference count. An ``OwnerRRef`` and its data will be
deleted when there are no ``UserRRef`` instances globally and there are no
references to the ``OwnerRRef`` on the owner either.
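For example, assuming two workers named ``"worker0"`` and ``"worker1"`` (the
names are illustrative, and the RPC framework is assumed to be initialized), an
application can create and use an RRef as in this minimal sketch:
.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on "worker0": create an RRef whose data lives on "worker1" (the owner)
    rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))

    # fetch a copy of the remote value back to "worker0"
    result = rref.to_here()  # tensor([2., 2.])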
Assumptions
^^^^^^^^^^^
The RRef protocol is designed with the following assumptions.
- **Transient Network Failures**: The RRef design handles transient
  network failures by retrying messages. It cannot handle node crashes or
  permanent network partitions. When those incidents occur, the application
  should take down all workers, revert to the previous checkpoint, and resume
  training.
- **Non-idempotent UDFs**: We assume the user functions (UDF) provided to
  :meth:`~torch.distributed.rpc.rpc_sync`,
  :meth:`~torch.distributed.rpc.rpc_async` or
  :meth:`~torch.distributed.rpc.remote` are not idempotent and therefore
  cannot be retried. However, internal RRef control messages are idempotent and
  retried upon message failure.
- **Out of Order Message Delivery**: We do not assume message delivery order
  between any pair of nodes, because both sender and receiver are using multiple
  threads. There is no guarantee on which message will be processed first.
RRef Lifetime
^^^^^^^^^^^^^
The goal of the protocol is to delete an ``OwnerRRef`` at an appropriate time.
The right time to delete an ``OwnerRRef`` is when there are no living
``UserRRef`` instances and user code is not holding references to the
``OwnerRRef`` either. The tricky part is to determine if there are any living
``UserRRef`` instances.
Design Reasoning
----------------
A user can get a ``UserRRef`` in three situations:
1) Receiving a ``UserRRef`` from the owner.
2) Receiving a ``UserRRef`` from another user.
3) Creating a new ``UserRRef`` owned by another worker.
Case 1 is the simplest: the owner passes its RRef to a user, i.e., the
owner calls :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` and uses its RRef as an argument. In this
case a new ``UserRRef`` will be created on the user. As the owner is the caller,
it can easily update its local reference count on the ``OwnerRRef``.
The only requirement is that any
``UserRRef`` must notify the owner upon destruction. Hence, we need the first
guarantee:
**G1. The owner will be notified when any UserRRef is deleted.**
As messages might come delayed or out-of-order, we need one more guarantee to
make sure the delete message is not processed too soon. If A sends a message to
B that involves an RRef, we call the RRef on A the parent RRef and the RRef on
B the child RRef.
**G2. Parent RRef will NOT be deleted until the child RRef is confirmed by the
owner.**
In cases 2 and 3, it is possible that the owner has only partial or no knowledge
at all about the RRef fork graph. For example, an RRef could be
constructed on a user, and before the owner receives any RPC call, the
creator user might have already shared the RRef with other users, and those
users could further share the RRef. One invariant is that the fork graph of
any RRef is always a tree, because forking an RRef always
creates a new ``UserRRef`` instance on the callee (except if the callee is the
owner), and hence every RRef has a single parent.
The owner's view on any ``UserRRef`` in the tree has three stages:
.. code::

    1) unknown -> 2) known -> 3) deleted.
The owner's view of the entire tree keeps changing. The owner deletes its
``OwnerRRef`` instance when it thinks there are no living ``UserRRef``
instances, i.e., when the ``OwnerRRef`` is deleted, every ``UserRRef`` instance
is either indeed deleted or unknown to the owner. The dangerous case is when
some forks are unknown and others are already deleted.
**G2** trivially guarantees that no parent ``UserRRef`` can be deleted before
the owner knows all of its children ``UserRRef`` instances. However, it is
possible that the child ``UserRRef`` may be deleted before the owner knows its
parent ``UserRRef``.
Consider the following example, where the ``OwnerRRef`` forks to A, then A forks
to Y, and Y forks to Z:
.. code::

    OwnerRRef -> A -> Y -> Z
If all of Z's messages, including the delete message, are processed by the
owner before Y's messages, the owner will learn of Z's deletion before
knowing Y exists. Nevertheless, this does not cause any problem, because at
least one of Y's ancestors (A) will be alive, and it will
prevent the owner from deleting the ``OwnerRRef``. More specifically, if the
owner does not know Y, A cannot be deleted due to **G2**, and the owner knows A
since it is A's parent.
Things get a little trickier if the RRef is created on a user:
.. code::

    OwnerRRef
        ^
        |
        A -> Y -> Z
If Z calls :meth:`~torch.distributed.rpc.RRef.to_here` on the ``UserRRef``, the
owner at least knows A when Z is deleted, because otherwise,
:meth:`~torch.distributed.rpc.RRef.to_here` wouldn't finish. If Z does not call
:meth:`~torch.distributed.rpc.RRef.to_here`, it is possible that the owner
receives all messages from Z before any message from A and Y. In this case, as
the real data of the ``OwnerRRef`` has not been created yet, there is nothing to
be deleted either. It is the same as if Z did not exist at all. Hence, it's still
OK.
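To make this concrete, the user-created chain above could be produced by code
along the following lines. This is only a sketch: the worker names and the
helper functions are illustrative, and the RPC framework is assumed to be
initialized on all workers.
.. code::

    import torch
    import torch.distributed.rpc as rpc

    # helper functions must be importable on every worker
    def use_rref(rref):
        # runs on worker Z; may or may not fetch the value from the owner
        return rref.to_here()

    def forward_rref(rref):
        # runs on worker Y; forwards the RRef to Z, forking it again
        return rpc.rpc_sync("Z", use_rref, args=(rref,))

    # on worker A: create an RRef owned by B, then immediately share it with Y
    rref = rpc.remote("B", torch.add, args=(torch.ones(2), 1))
    rpc.rpc_async("Y", forward_rref, args=(rref,))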
Implementation
--------------
**G1** is implemented by sending out a delete message in the ``UserRRef``
destructor. To provide **G2**, the parent ``UserRRef`` is put into a context
whenever it is forked, indexed by the new ``ForkId``. The parent ``UserRRef`` is
only removed from the context when it receives an acknowledgement message (ACK)
from the child, and the child will only send out the ACK when it is confirmed by
the owner.
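The real bookkeeping is implemented in the C++ core of
``torch.distributed.rpc``; the following Python-style sketch only illustrates
the idea behind **G1** and **G2**, and every name in it is made up for this
note.
.. code::

    # Illustrative pseudocode only; not the real implementation.

    # Parent RRefs waiting for child confirmation, indexed by the child ForkId.
    # Holding the parent here keeps it alive until the owner confirms the child (G2).
    pending_children = {}

    def send_delete_message(owner, rref_id, fork_id):
        # stand-in for the real control message sent to the owner
        print(f"delete {{{rref_id}, {fork_id}}} -> {owner}")

    class UserRRefSketch:
        def __init__(self, owner, rref_id, fork_id):
            self.owner, self.rref_id, self.fork_id = owner, rref_id, fork_id

        def fork_to(self, child_fork_id):
            # about to send this RRef to another worker: remember the parent
            # until the child is confirmed by the owner (G2)
            pending_children[child_fork_id] = self

        def child_confirmed(self, child_fork_id):
            # the child sends its ACK only after the owner confirmed it
            del pending_children[child_fork_id]

        def __del__(self):
            # G1: the owner is notified when a UserRRef is destructed
            send_delete_message(self.owner, self.rref_id, self.fork_id)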
Protocol Scenarios
^^^^^^^^^^^^^^^^^^
Let's now discuss how the above designs translate to the protocol in four
scenarios.
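The snippets in the scenarios below omit process setup; they assume the RPC
framework has already been initialized on every worker, for example along the
following lines (the worker name and world size are illustrative):
.. code::

    import torch.distributed.rpc as rpc

    # run once per process; each worker uses a distinct name and rank, and
    # MASTER_ADDR/MASTER_PORT must be set in the environment
    rpc.init_rpc("A", rank=0, world_size=2)

    # ... application code using rpc_sync / rpc_async / remote ...

    rpc.shutdown()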
User Share RRef with Owner as Return Value
------------------------------------------
.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rref.to_here()
In this case, the ``UserRRef`` is created on the user worker A, then it is
passed to the owner worker B together with the remote message, and then B
creates the ``OwnerRRef``. The method :meth:`~torch.distributed.rpc.remote`
returns immediately, meaning that the ``UserRRef`` can be forked/used before
the owner knows about it.
When the owner receives the :meth:`~torch.distributed.rpc.remote` call, it
creates the ``OwnerRRef`` and returns an ACK to acknowledge ``{100, 1}``
(``RRefId``, ``ForkId``). Only after receiving this ACK can A delete its
``UserRRef``. This involves both **G1** and **G2**. **G1** is obvious. For
**G2**, the ``OwnerRRef`` is a child of the ``UserRRef``, and the ``UserRRef``
is not deleted until it receives the ACK from the owner.
.. image:: https://user-images.githubusercontent.com/16999635/69164772-98181300-0abe-11ea-93a7-9ad9f757cd94.png
    :alt: user_to_owner_ret.png
    :width: 500 px
The diagram above shows the message flow, where solid arrows carry user
functions and dashed arrows are built-in messages. Note that the first two messages
from A to B (:meth:`~torch.distributed.rpc.remote` and
:meth:`~torch.distributed.rpc.RRef.to_here`) may
arrive at B in any order, but the final delete message will only be sent out
when:
- B acknowledges ``UserRRef {100, 1}`` (G2), and
- Python GC agrees to delete the local ``UserRRef`` instance. This occurs when
  the RRef is no longer in scope and is eligible for garbage collection, as
  sketched below.
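For example, continuing the snippet above on worker A, the delete message only
goes out once both conditions hold (this is a sketch of the timing, not
additional API):
.. code::

    # on worker A, continuing the example above
    rref.to_here()   # use the result
    del rref         # drop the last local reference; once B has also
                     # acknowledged UserRRef {100, 1}, A sends the delete
                     # message for {100, 1} to B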
User Share RRef with Owner as Argument
--------------------------------------
.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A and worker B
    def func(rref):
        pass

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rpc.rpc_async('B', func, args=(rref, ))
In this case, after creating the ``UserRRef`` on A, A uses it as an argument in
a follow-up RPC call to B. A will keep ``UserRRef {100, 1}`` alive until it
receives the acknowledgement from B (**G2**, not the return value of the RPC
call). This is necessary because A should not send out the delete message until
all previous messages have been received; otherwise, the ``OwnerRRef`` could be
deleted before it is used, as we do not guarantee message delivery order. This
is done by creating a child ``ForkId`` of the RRef and holding it in a map
until the owner confirms the child ``ForkId``. The figure below shows the
message flow.
.. image:: https://user-images.githubusercontent.com/16999635/69164845-b67e0e80-0abe-11ea-93fa-d24674e75a2b.png
    :alt: user_to_owner_arg.png
    :width: 500 px
Note that the ``UserRRef`` could be deleted on B before ``func`` finishes or even
starts. However, this is OK, as by the time B sends out the ACK for the child
``ForkId``, it has already acquired the ``OwnerRRef`` instance, which prevents
it from being deleted too soon.
Owner Share RRef with User
--------------------------
Owner to user is the simplest case, where the owner can update the reference
count locally and does not need any additional control messages to notify
others. Regarding **G2**, it is as if the parent received the ACK from the
owner immediately, since the parent is the owner itself.
.. code::

    import torch
    import torch.distributed.rpc as rpc
    from torch.distributed.rpc import RRef

    # on worker B and worker C
    def func(rref):
        pass

    # on worker B, creating a local RRef
    rref = RRef("data")
    # say the rref has RRefId 100
    rpc.rpc_async('C', func, args=(rref, ))
.. image:: https://user-images.githubusercontent.com/16999635/69164921-c990de80-0abe-11ea-9250-d32ad00cf4ae.png
    :alt: owner_to_user.png
    :width: 500 px
The figure above shows the message flow. Note that when the ``OwnerRRef`` exits
scope after the ``rpc_async`` call, it will not be deleted, because internally
there is a map that keeps it alive as long as there are any known forks, which
in this case is ``UserRRef {100, 1}``. (**G2**)
User Share RRef with User
-------------------------
This is the most complicated case, where the caller user (parent ``UserRRef``),
the callee user (child ``UserRRef``), and the owner all need to get involved.
.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A and worker C
    def func(rref):
        pass

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rpc.rpc_async('C', func, args=(rref, ))
.. image:: https://user-images.githubusercontent.com/16999635/69164971-d6adcd80-0abe-11ea-971d-6b7af131f0fd.png
    :alt: user_to_user.png
    :width: 500 px
When C receives the child ``UserRRef`` from A, it sends out a fork request to
the owner B. Later, when B confirms the ``UserRRef`` on C, C will perform
two actions in parallel: 1) send out the child ACK to A, and 2) run the
user-provided function. During this time, the parent (A) will hold its
``UserRRef {100, 1}`` alive to achieve **G2**.