// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifdef UNSAFE_BUFFERS_BUILD
// TODO(crbug.com/351564777): Remove this and convert code to safer constructs.
#pragma allow_unsafe_buffers
#endif
#include "mojo/core/ports/node.h"
#include <string.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/notreached.h"
#include "base/rand_util.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_local.h"
#include "build/build_config.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/node_delegate.h"
#include "mojo/core/ports/port_locker.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
namespace mojo {
namespace core {
namespace ports {
namespace {
constexpr size_t kRandomNameCacheSize = 256;
// Random port name generator which maintains a cache of random bytes to draw
// from. This amortizes the cost of random name generation on platforms where
// RandBytes may have significant per-call overhead.
//
// Note that the use of this cache means one has to be careful about fork()ing
// a process once any port names have been generated, as that behavior can lead
// to collisions between independently generated names in different processes.
class RandomNameGenerator {
public:
RandomNameGenerator() = default;
RandomNameGenerator(const RandomNameGenerator&) = delete;
RandomNameGenerator& operator=(const RandomNameGenerator&) = delete;
~RandomNameGenerator() = default;
PortName GenerateRandomPortName() {
base::AutoLock lock(lock_);
if (cache_index_ == kRandomNameCacheSize) {
base::RandBytes(base::as_writable_byte_span(cache_));
cache_index_ = 0;
}
return cache_[cache_index_++];
}
private:
base::Lock lock_;
PortName cache_[kRandomNameCacheSize];
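// Starts exhausted so the first GenerateRandomPortName() call refills the
// cache.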
size_t cache_index_ = kRandomNameCacheSize;
};
base::LazyInstance<RandomNameGenerator>::Leaky g_name_generator =
LAZY_INSTANCE_INITIALIZER;
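// DebugError() does not return: NOTREACHED() reports (and terminates on) the
// failure. OOPS(x) captures the failing expression as text so port-state
// bugs crash with a readable message.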
int DebugError(const char* message, int error_code) {
NOTREACHED() << "Oops: " << message;
}
#define OOPS(x) DebugError(#x, x)
bool CanAcceptMoreMessages(const Port* port) {
// Have we already doled out the last message (i.e., do we expect to NOT
// receive further messages)?
uint64_t next_sequence_num = port->message_queue.next_sequence_num();
if (port->state == Port::kClosed)
return false;
if (port->peer_closed || port->remove_proxy_on_last_message) {
if (port->peer_lost_unexpectedly)
return port->message_queue.HasNextMessage();
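// next_sequence_num is the number the next dequeued message would carry; if
// it is one past the last message we expect, everything has been consumed.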
if (port->last_sequence_num_to_receive == next_sequence_num - 1)
return false;
}
return true;
}
void GenerateRandomPortName(PortName* name) {
*name = g_name_generator.Get().GenerateRandomPortName();
}
} // namespace
Node::Node(const NodeName& name, NodeDelegate* delegate)
: name_(name), delegate_(this, delegate) {}
Node::~Node() {
if (!ports_.empty())
DLOG(WARNING) << "Unclean shutdown for node " << name_;
}
bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_lock(ports_lock_);
if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
#if DCHECK_IS_ON()
for (auto& entry : ports_) {
DVLOG(2) << "Port " << entry.first << " referencing node "
<< entry.second->peer_node_name << " is blocking shutdown of "
<< "node " << name_ << " (state=" << entry.second->state << ")";
}
#endif
return ports_.empty();
}
DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
// NOTE: This is not efficient, though it probably doesn't need to be since
// relatively few ports should be open during shutdown and shutdown doesn't
// need to be blazingly fast.
bool can_shutdown = true;
for (auto& entry : ports_) {
PortRef port_ref(entry.first, entry.second);
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
can_shutdown = false;
#if DCHECK_IS_ON()
DVLOG(2) << "Port " << entry.first << " referencing node "
<< port->peer_node_name << " is blocking shutdown of "
<< "node " << name_ << " (state=" << port->state << ")";
#else
// Exit early when not debugging.
break;
#endif
}
}
return can_shutdown;
}
int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock lock(ports_lock_);
auto iter = ports_.find(port_name);
if (iter == ports_.end())
return ERROR_PORT_UNKNOWN;
#if BUILDFLAG(IS_ANDROID) && defined(ARCH_CPU_ARM64)
// Workaround for https://crbug.com/665869.
std::atomic_thread_fence(std::memory_order_seq_cst);
#endif
*port_ref = PortRef(port_name, iter->second);
return OK;
}
int Node::CreateUninitializedPort(PortRef* port_ref) {
PortName port_name;
GenerateRandomPortName(&port_name);
scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
int rv = AddPortWithName(port_name, port);
if (rv != OK)
return rv;
*port_ref = PortRef(port_name, std::move(port));
return OK;
}
int Node::InitializePort(const PortRef& port_ref,
const NodeName& peer_node_name,
const PortName& peer_port_name,
const NodeName& prev_node_name,
const PortName& prev_port_name) {
{
// Must be acquired for UpdatePortPeerAddress below.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kUninitialized)
return ERROR_PORT_STATE_UNEXPECTED;
port->state = Port::kReceiving;
UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
peer_port_name);
port->prev_node_name = prev_node_name;
port->prev_port_name = prev_port_name;
}
delegate_->PortStatusChanged(port_ref);
return OK;
}
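// Creates a pair of fully initialized local ports which are each other's
// peer (and previous peer), forming the smallest possible port cycle on this
// node.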
int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
int rv;
rv = CreateUninitializedPort(port0_ref);
if (rv != OK)
return rv;
rv = CreateUninitializedPort(port1_ref);
if (rv != OK)
return rv;
rv = InitializePort(*port0_ref, name_, port1_ref->name(), name_,
port1_ref->name());
if (rv != OK)
return rv;
rv = InitializePort(*port1_ref, name_, port0_ref->name(), name_,
port0_ref->name());
if (rv != OK)
return rv;
return OK;
}
int Node::SetUserData(const PortRef& port_ref,
scoped_refptr<UserData> user_data) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state == Port::kClosed)
return ERROR_PORT_STATE_UNEXPECTED;
port->user_data = std::move(user_data);
return OK;
}
int Node::GetUserData(const PortRef& port_ref,
scoped_refptr<UserData>* user_data) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state == Port::kClosed)
return ERROR_PORT_STATE_UNEXPECTED;
*user_data = port->user_data;
return OK;
}
int Node::ClosePort(const PortRef& port_ref) {
std::vector<std::unique_ptr<UserMessageEvent>> undelivered_messages;
NodeName peer_node_name;
PortName peer_port_name;
uint64_t sequence_num = 0;
uint64_t last_sequence_num = 0;
bool was_initialized = false;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
switch (port->state) {
case Port::kUninitialized:
break;
case Port::kReceiving:
was_initialized = true;
port->state = Port::kClosed;
// We pass along the sequence number of the last message sent from this
// port to give the peer the opportunity to consume all inbound
// messages before notifying the embedder that this port is closed.
last_sequence_num = port->next_sequence_num_to_send - 1;
peer_node_name = port->peer_node_name;
peer_port_name = port->peer_port_name;
sequence_num = port->next_control_sequence_num_to_send++;
// If the port being closed still has unread messages, we must close any
// ports attached to those messages to avoid leaking them.
port->message_queue.TakeAllMessages(&undelivered_messages);
port->TakePendingMessages(undelivered_messages);
break;
default:
return ERROR_PORT_STATE_UNEXPECTED;
}
}
ErasePort(port_ref.name());
if (was_initialized) {
DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
<< name_ << " to " << peer_port_name << "@" << peer_node_name;
delegate_->ForwardEvent(
peer_node_name,
std::make_unique<ObserveClosureEvent>(peer_port_name, port_ref.name(),
sequence_num, last_sequence_num));
for (const auto& message : undelivered_messages) {
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef ref;
if (GetPort(message->ports()[i], &ref) == OK)
ClosePort(ref);
}
}
}
return OK;
}
int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kReceiving)
return ERROR_PORT_STATE_UNEXPECTED;
// TODO(sroettger): include messages pending sender verification here?
port_status->has_messages = port->message_queue.HasNextMessage();
port_status->receiving_messages = CanAcceptMoreMessages(port);
port_status->peer_closed = port->peer_closed;
port_status->peer_remote = port->peer_node_name != name_;
port_status->queued_message_count =
port->message_queue.queued_message_count();
port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
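// Sequence numbers are contiguous, so the number of sent-but-unacknowledged
// messages is the gap between the next number to send and the last number
// acknowledged, minus one.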
port_status->unacknowledged_message_count =
port->next_sequence_num_to_send - port->last_sequence_num_acknowledged -
1;
return OK;
}
int Node::GetMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter) {
*message = nullptr;
DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
NodeName peer_node_name;
ScopedEvent ack_event;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
// This could also be treated like the port being unknown since the
// embedder should no longer be referring to a port that has been sent.
if (port->state != Port::kReceiving)
return ERROR_PORT_STATE_UNEXPECTED;
// Let the embedder get messages until there are no more before reporting
// that the peer closed its end.
if (!CanAcceptMoreMessages(port))
return ERROR_PORT_PEER_CLOSED;
port->message_queue.GetNextMessage(message, filter);
if (*message &&
(*message)->sequence_num() == port->sequence_num_to_acknowledge) {
peer_node_name = port->peer_node_name;
ack_event = std::make_unique<UserMessageReadAckEvent>(
port->peer_port_name, port_ref.name(),
port->next_control_sequence_num_to_send++,
port->sequence_num_to_acknowledge);
}
if (*message) {
// The message will be passed to the user, so no need to block the queue.
port->message_queue.MessageProcessed();
}
}
if (ack_event)
delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
// Allow referenced ports to trigger PortStatusChanged calls.
if (*message) {
for (size_t i = 0; i < (*message)->num_ports(); ++i) {
PortRef new_port_ref;
int rv = GetPort((*message)->ports()[i], &new_port_ref);
DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
<< " does not exist!";
SinglePortLocker locker(&new_port_ref);
DCHECK_EQ(locker.port()->state, Port::kReceiving);
locker.port()->message_queue.set_signalable(true);
}
// The user may retransmit this message from another port. We reset the
// sequence number so that the message will get a new one if that happens.
(*message)->set_sequence_num(0);
}
return OK;
}
int Node::SendUserMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent> message) {
int rv = SendUserMessageInternal(port_ref, &message);
if (rv != OK) {
// If send failed, close all carried ports. Note that we're careful not to
// close the sending port itself if it happened to be one of the encoded
// ports (an invalid but possible condition).
for (size_t i = 0; i < message->num_ports(); ++i) {
if (message->ports()[i] == port_ref.name())
continue;
PortRef port;
if (GetPort(message->ports()[i], &port) == OK)
ClosePort(port);
}
}
return rv;
}
int Node::SetAcknowledgeRequestInterval(
const PortRef& port_ref,
uint64_t sequence_num_acknowledge_interval) {
NodeName peer_node_name;
PortName peer_port_name;
uint64_t sequence_num_to_request_ack = 0;
uint64_t sequence_num = 0;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kReceiving)
return ERROR_PORT_STATE_UNEXPECTED;
port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
if (!sequence_num_acknowledge_interval)
return OK;
peer_node_name = port->peer_node_name;
peer_port_name = port->peer_port_name;
sequence_num_to_request_ack = port->last_sequence_num_acknowledged +
sequence_num_acknowledge_interval;
sequence_num = port->next_control_sequence_num_to_send++;
}
delegate_->ForwardEvent(peer_node_name,
std::make_unique<UserMessageReadAckRequestEvent>(
peer_port_name, port_ref.name(), sequence_num,
sequence_num_to_request_ack));
return OK;
}
bool Node::IsEventFromPreviousPeer(const Event& event) {
switch (event.type()) {
case Event::Type::kUserMessage:
return true;
case Event::Type::kPortAccepted:
// PortAccepted is sent by the next peer
return false;
case Event::Type::kObserveProxy:
// ObserveProxy with an invalid port name is a broadcast event
return event.port_name() != kInvalidPortName;
case Event::Type::kObserveProxyAck:
return true;
case Event::Type::kObserveClosure:
return true;
case Event::Type::kMergePort:
// MergePort is not from the previous peer
return false;
case Event::Type::kUserMessageReadAckRequest:
return true;
case Event::Type::kUserMessageReadAck:
return true;
case Event::Type::kUpdatePreviousPeer:
return true;
default:
// No need to check unknown event types here since AcceptEventInternal
// will return an error for them.
return false;
}
}
int Node::AcceptEventInternal(const PortRef& port_ref,
const NodeName& from_node,
ScopedEvent event) {
switch (event->type()) {
case Event::Type::kUserMessage:
return OnUserMessage(port_ref, from_node,
Event::Cast<UserMessageEvent>(&event));
case Event::Type::kPortAccepted:
return OnPortAccepted(port_ref, Event::Cast<PortAcceptedEvent>(&event));
case Event::Type::kObserveProxy:
return OnObserveProxy(port_ref, Event::Cast<ObserveProxyEvent>(&event));
case Event::Type::kObserveProxyAck:
return OnObserveProxyAck(port_ref,
Event::Cast<ObserveProxyAckEvent>(&event));
case Event::Type::kObserveClosure:
return OnObserveClosure(port_ref,
Event::Cast<ObserveClosureEvent>(&event));
case Event::Type::kMergePort:
return OnMergePort(port_ref, Event::Cast<MergePortEvent>(&event));
case Event::Type::kUserMessageReadAckRequest:
return OnUserMessageReadAckRequest(
port_ref, Event::Cast<UserMessageReadAckRequestEvent>(&event));
case Event::Type::kUserMessageReadAck:
return OnUserMessageReadAck(port_ref,
Event::Cast<UserMessageReadAckEvent>(&event));
case Event::Type::kUpdatePreviousPeer:
return OnUpdatePreviousPeer(port_ref,
Event::Cast<UpdatePreviousPeerEvent>(&event));
}
return OOPS(ERROR_NOT_IMPLEMENTED);
}
int Node::AcceptEvent(const NodeName& from_node, ScopedEvent event) {
PortRef port_ref;
GetPort(event->port_name(), &port_ref);
#ifndef MOJO_BACKWARDS_COMPAT
DVLOG(2) << "AcceptEvent type: " << event->type() << ", "
<< event->from_port() << "@" << from_node << " => "
<< port_ref.name() << "@" << name_
<< " seq nr: " << event->control_sequence_num() << " port valid? "
<< port_ref.is_valid();
if (!IsEventFromPreviousPeer(*event)) {
DCHECK_EQ(event->control_sequence_num(), kInvalidSequenceNum);
// Some events do not come from the previous peer, e.g. broadcasts or
// PortAccepted events. No need to check the sequence number or sender.
return AcceptEventInternal(port_ref, from_node, std::move(event));
}
DCHECK_NE(event->control_sequence_num(), kInvalidSequenceNum);
if (!port_ref.is_valid()) {
// If we don't have a valid port, there's nothing for us to check. However,
// we pass the ref on to AcceptEventInternal to make sure there's no race
// where it becomes valid and we skipped the peer check.
return AcceptEventInternal(port_ref, from_node, std::move(event));
}
// Before processing the event, verify the sender and sequence number.
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (!port->IsNextEvent(from_node, *event)) {
DVLOG(2) << "Buffering event (type " << event->type()
<< "): " << event->from_port() << "@" << from_node << " => "
<< port_ref.name() << "@" << name_
<< " seq nr: " << event->control_sequence_num() << " / "
<< port->next_control_sequence_num_to_receive << ", want "
<< port->prev_port_name << "@" << port->prev_node_name;
port->BufferEvent(from_node, std::move(event));
return OK;
}
}
int ret = AcceptEventInternal(port_ref, from_node, std::move(event));
// More events might have been enqueued during processing.
while (true) {
ScopedEvent next_event;
NodeName next_from_node;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
// We always increment the control sequence number after we have finished
// processing the event. That way we ensure that the events are handled
// in order without keeping a lock the whole time.
port->next_control_sequence_num_to_receive++;
port->NextEvent(&next_from_node, &next_event);
if (next_event) {
DVLOG(2) << "Handling buffered event (type " << next_event->type()
<< "): " << next_event->from_port() << "@" << next_from_node
<< " => " << port_ref.name() << "@" << name_
<< " seq nr: " << next_event->control_sequence_num() << " / "
<< port->next_control_sequence_num_to_receive;
}
}
if (!next_event)
break;
AcceptEventInternal(port_ref, next_from_node, std::move(next_event));
}
return ret;
#else
return AcceptEventInternal(port_ref, from_node, std::move(event));
#endif
}
int Node::MergePorts(const PortRef& port_ref,
const NodeName& destination_node_name,
const PortName& destination_port_name) {
PortName new_port_name;
Event::PortDescriptor new_port_descriptor;
PendingUpdatePreviousPeer pending_update_event{.from_port = port_ref.name()};
{
// Must be held for ConvertToProxy.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
SinglePortLocker locker(&port_ref);
DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
<< " to " << destination_port_name << "@" << destination_node_name;
// Send the port-to-merge over to the destination node so it can be merged
// into the port cycle atomically there.
new_port_name = port_ref.name();
ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
&new_port_descriptor, &pending_update_event);
}
#ifndef MOJO_BACKWARDS_COMPAT
delegate_->ForwardEvent(
pending_update_event.receiver,
std::make_unique<UpdatePreviousPeerEvent>(
pending_update_event.port, pending_update_event.from_port,
pending_update_event.sequence_num, pending_update_event.new_prev_node,
pending_update_event.new_prev_port));
#endif
if (new_port_descriptor.peer_node_name == name_ &&
destination_node_name != name_) {
// Ensure that the locally retained peer of the new proxy gets a status
// update so it notices that its peer is now remote.
PortRef local_peer;
if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK)
delegate_->PortStatusChanged(local_peer);
}
delegate_->ForwardEvent(
destination_node_name,
std::make_unique<MergePortEvent>(destination_port_name, kInvalidPortName,
kInvalidSequenceNum, new_port_name,
new_port_descriptor));
return OK;
}
int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
<< " and " << port1_ref.name() << "@" << name_;
return MergePortsInternal(port0_ref, port1_ref,
true /* allow_close_on_bad_state */);
}
int Node::LostConnectionToNode(const NodeName& node_name) {
// We can no longer send events to the given node. We also can't expect any
// PortAccepted events.
DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
<< node_name;
DestroyAllPortsWithPeer(node_name, kInvalidPortName);
return OK;
}
int Node::OnUserMessage(const PortRef& port_ref,
const NodeName& from_node,
std::unique_ptr<UserMessageEvent> message) {
#if DCHECK_IS_ON()
std::ostringstream ports_buf;
for (size_t i = 0; i < message->num_ports(); ++i) {
if (i > 0)
ports_buf << ",";
ports_buf << message->ports()[i];
}
DVLOG(4) << "OnUserMessage " << message->sequence_num()
<< " [ports=" << ports_buf.str() << "] at " << message->port_name()
<< "@" << name_;
#endif
// Even if this port does not exist, can no longer receive messages, or is
// buffering or proxying messages, we still need these ports to be bound to
// this node. When the message is forwarded, these ports will get transferred
// following the usual method. If the message cannot be accepted, then the
// newly bound ports will simply be closed.
if (from_node != name_) {
for (size_t i = 0; i < message->num_ports(); ++i) {
Event::PortDescriptor& descriptor = message->port_descriptors()[i];
int rv = AcceptPort(message->ports()[i], descriptor);
if (rv != OK)
return rv;
}
}
bool has_next_message = false;
bool message_accepted = false;
bool should_forward_messages = false;
if (port_ref.is_valid()) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
// Reject spurious messages if we've already received the last expected
// message.
if (CanAcceptMoreMessages(port)) {
message_accepted = true;
port->message_queue.AcceptMessage(std::move(message), &has_next_message);
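// A buffering port must hold messages until its new peer is known, and a
// proxying port forwards them below, so neither should signal the embedder
// that a message is available here.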
if (port->state == Port::kBuffering) {
has_next_message = false;
} else if (port->state == Port::kProxying) {
has_next_message = false;
should_forward_messages = true;
}
}
}
if (should_forward_messages) {
int rv = ForwardUserMessagesFromProxy(port_ref);
if (rv != OK)
return rv;
TryRemoveProxy(port_ref);
}
if (!message_accepted) {
DVLOG(2) << "Message not accepted!";
// Close all newly accepted ports as they are effectively orphaned.
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef attached_port_ref;
if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
ClosePort(attached_port_ref);
} else {
DLOG(WARNING) << "Cannot close non-existent port!";
}
}
} else if (has_next_message) {
delegate_->PortStatusChanged(port_ref);
}
return OK;
}
int Node::OnPortAccepted(const PortRef& port_ref,
std::unique_ptr<PortAcceptedEvent> event) {
if (!port_ref.is_valid())
return ERROR_PORT_UNKNOWN;
#if DCHECK_IS_ON()
{
SinglePortLocker locker(&port_ref);
DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
<< " pointing to " << locker.port()->peer_port_name << "@"
<< locker.port()->peer_node_name;
}
#endif
return BeginProxying(port_ref);
}
int Node::OnObserveProxy(const PortRef& port_ref,
std::unique_ptr<ObserveProxyEvent> event) {
if (event->port_name() == kInvalidPortName) {
// An ObserveProxy with an invalid target port name is a broadcast used to
// inform ports when their peer (which was itself a proxy) has become
// defunct due to unexpected node disconnection.
//
// Receiving ports affected by this treat it as equivalent to peer closure.
// Proxies affected by this can be removed and will in turn broadcast their
// own death with a similar message.
DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
return OK;
}
// The port may have already been closed locally, in which case the
// ObserveClosure message will contain the last_sequence_num field.
// We can then silently ignore this message.
if (!port_ref.is_valid()) {
DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
<< " not found";
return OK;
}
DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
<< ", proxy at " << event->proxy_port_name() << "@"
<< event->proxy_node_name() << " pointing to "
<< event->proxy_target_port_name() << "@"
<< event->proxy_target_node_name();
bool peer_changed = false;
ScopedEvent event_to_forward;
NodeName event_target_node;
{
// Must be acquired for UpdatePortPeerAddress below.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->peer_node_name == event->proxy_node_name() &&
port->peer_port_name == event->proxy_port_name()) {
if (port->state == Port::kReceiving) {
// Updating the port peer will reset the sequence num, so grab it now.
uint64_t sequence_num = port->next_control_sequence_num_to_send++;
UpdatePortPeerAddress(port_ref.name(), port,
event->proxy_target_node_name(),
event->proxy_target_port_name());
event_target_node = event->proxy_node_name();
event_to_forward = std::make_unique<ObserveProxyAckEvent>(
event->proxy_port_name(), port_ref.name(), sequence_num,
port->next_sequence_num_to_send - 1);
peer_changed = true;
DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
<< "@" << name_ << " to " << event->proxy_port_name() << "@"
<< event_target_node;
} else {
// As a proxy ourselves, we don't know how to honor the ObserveProxy
// event or to populate the last_sequence_num field of ObserveProxyAck.
// After all, another port could be sending messages to our peer now
// that we've sent out our own ObserveProxy event. Instead, we will
// send an ObserveProxyAck indicating that the ObserveProxy event
// should be re-sent (last_sequence_num set to kInvalidSequenceNum).
// However, this has to be done after we are removed as a proxy.
// Otherwise, we might just find ourselves back here again, which
// would be akin to a busy loop.
DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
<< "@" << event->proxy_node_name();
port->send_on_proxy_removal =
std::make_unique<std::pair<NodeName, ScopedEvent>>(
event->proxy_node_name(),
std::make_unique<ObserveProxyAckEvent>(
event->proxy_port_name(), port_ref.name(),
kInvalidSequenceNum, kInvalidSequenceNum));
}
} else {
// Forward this event along to our peer. Eventually, it should find the
// port referring to the proxy.
event_target_node = port->peer_node_name;
event->set_port_name(port->peer_port_name);
event->set_from_port(port_ref.name());
event->set_control_sequence_num(
port->next_control_sequence_num_to_send++);
if (port->state == Port::kBuffering) {
port->control_message_queue.push({event_target_node, std::move(event)});
} else {
event_to_forward = std::move(event);
}
}
}
if (event_to_forward)
delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
if (peer_changed) {
// Re-send ack and/or ack requests, as the previous peer proxy may not have
// forwarded the previous request before it died.
MaybeResendAck(port_ref);
MaybeResendAckRequest(port_ref);
delegate_->PortStatusChanged(port_ref);
}
return OK;
}
int Node::OnObserveProxyAck(const PortRef& port_ref,
std::unique_ptr<ObserveProxyAckEvent> event) {
DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
<< " (last_sequence_num=" << event->last_sequence_num() << ")";
if (!port_ref.is_valid())
return ERROR_PORT_UNKNOWN; // The port may have observed closure first.
bool try_remove_proxy_immediately;
bool erase_port = false;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state == Port::kProxying) {
// If the last sequence number is invalid, this is a signal that we need
// to retransmit the ObserveProxy event for this port rather than flagging
// the proxy for removal ASAP.
try_remove_proxy_immediately =
event->last_sequence_num() != kInvalidSequenceNum;
if (try_remove_proxy_immediately) {
// We can now remove this port once we have received and forwarded the
// last message addressed to this port.
port->remove_proxy_on_last_message = true;
port->last_sequence_num_to_receive = event->last_sequence_num();
}
} else if (port->state == Port::kClosed) {
erase_port = true;
} else {
return OOPS(ERROR_PORT_STATE_UNEXPECTED);
}
}
if (erase_port) {
ErasePort(port_ref.name());
return OK;
}
if (try_remove_proxy_immediately)
TryRemoveProxy(port_ref);
else
InitiateProxyRemoval(port_ref);
return OK;
}
int Node::OnObserveClosure(const PortRef& port_ref,
std::unique_ptr<ObserveClosureEvent> event) {
// OK if the port doesn't exist, as it may have been closed already.
if (!port_ref.is_valid())
return OK;
// This message tells the port that it should no longer expect more messages
// beyond last_sequence_num. This message is forwarded along until we reach
// the receiving end, and this message serves as an equivalent to
// ObserveProxyAck.
bool notify_delegate = false;
NodeName peer_node_name;
bool try_remove_proxy = false;
bool erase_port = false;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
port->peer_closed = true;
port->last_sequence_num_to_receive = event->last_sequence_num();
DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
<< " (state=" << port->state << ") pointing to "
<< port->peer_port_name << "@" << port->peer_node_name
<< " (last_sequence_num=" << event->last_sequence_num() << ")";
// We always forward ObserveClosure, even beyond the receiving port which
// cares about it. This ensures that any dead-end proxies beyond that port
// are notified to remove themselves.
if (port->state == Port::kReceiving) {
notify_delegate = true;
// When forwarding along the other half of the port cycle, this will only
// reach dead-end proxies. Tell them we've sent our last message so they
// can go away.
//
// TODO: Repurposing ObserveClosure for this has the desired result but
// may be semantically confusing since the forwarding port is not actually
// closed. Consider replacing this with a new event type.
event->set_last_sequence_num(port->next_sequence_num_to_send - 1);
// Treat the closure as an acknowledge that all sent messages have been
// read from the other end.
port->last_sequence_num_acknowledged =
port->next_sequence_num_to_send - 1;
} else if (port->state == Port::kClosed) {
// This is the ack for a closed proxy port notification. Now it's fine to
// delete the port.
erase_port = true;
} else {
// We haven't yet reached the receiving peer of the closed port, so we'll
// forward the message along as-is.
// Also try to remove the port now if it is a proxy, since our peer won't
// be able to participate in proxy removal.
port->remove_proxy_on_last_message = true;
if (port->state == Port::kProxying)
try_remove_proxy = true;
}
DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
<< name_ << " to peer " << port->peer_port_name << "@"
<< port->peer_node_name
<< " (last_sequence_num=" << event->last_sequence_num() << ")";
event->set_port_name(port->peer_port_name);
event->set_from_port(port_ref.name());
event->set_control_sequence_num(port->next_control_sequence_num_to_send++);
peer_node_name = port->peer_node_name;
if (port->state == Port::kBuffering) {
port->control_message_queue.push({peer_node_name, std::move(event)});
}
}
if (try_remove_proxy)
TryRemoveProxy(port_ref);
if (erase_port)
ErasePort(port_ref.name());
if (event)
delegate_->ForwardEvent(peer_node_name, std::move(event));
if (notify_delegate)
delegate_->PortStatusChanged(port_ref);
return OK;
}
int Node::OnMergePort(const PortRef& port_ref,
std::unique_ptr<MergePortEvent> event) {
DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
<< " merging with proxy " << event->new_port_name() << "@" << name_
<< " pointing to " << event->new_port_descriptor().peer_port_name
<< "@" << event->new_port_descriptor().peer_node_name
<< " referred by "
<< event->new_port_descriptor().referring_port_name << "@"
<< event->new_port_descriptor().referring_node_name;
// Accept the new port. This is now the receiving end of the other port cycle
// to be merged with ours. Note that we always attempt to accept the new port
// first as otherwise its peer receiving port could be left stranded
// indefinitely.
if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
if (port_ref.is_valid())
ClosePort(port_ref);
return ERROR_PORT_STATE_UNEXPECTED;
}
PortRef new_port_ref;
GetPort(event->new_port_name(), &new_port_ref);
if (!port_ref.is_valid() && new_port_ref.is_valid()) {
ClosePort(new_port_ref);
return ERROR_PORT_UNKNOWN;
} else if (port_ref.is_valid() && !new_port_ref.is_valid()) {
ClosePort(port_ref);
return ERROR_PORT_UNKNOWN;
}
bool peer_allowed = true;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (!port->pending_merge_peer) {
LOG(ERROR) << "MergePort called on unexpected port: "
<< event->port_name();
peer_allowed = false;
} else {
port->pending_merge_peer = false;
}
}
if (!peer_allowed) {
ClosePort(port_ref);
return ERROR_PORT_STATE_UNEXPECTED;
}
return MergePortsInternal(port_ref, new_port_ref,
false /* allow_close_on_bad_state */);
}
int Node::OnUserMessageReadAckRequest(
const PortRef& port_ref,
std::unique_ptr<UserMessageReadAckRequestEvent> event) {
DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
<< event->sequence_num_to_acknowledge();
if (!port_ref.is_valid())
return ERROR_PORT_UNKNOWN;
NodeName peer_node_name;
std::unique_ptr<Event> event_to_send;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
peer_node_name = port->peer_node_name;
if (port->state == Port::kProxying) {
// Proxies simply forward the ack request to their peer.
event->set_port_name(port->peer_port_name);
event->set_from_port(port_ref.name());
event->set_control_sequence_num(
port->next_control_sequence_num_to_send++);
event_to_send = std::move(event);
} else {
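// next_sequence_num() names the next unread message, so the last message
// actually read carries a sequence number one less than that.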
uint64_t current_sequence_num =
port->message_queue.next_sequence_num() - 1;
// Either this is requesting an ack for a sequence number already read, or
// else for a sequence number that is yet to be read.
if (current_sequence_num >= event->sequence_num_to_acknowledge()) {
// If the current sequence number to read already exceeds the ack
// request, send an ack immediately.
event_to_send = std::make_unique<UserMessageReadAckEvent>(
port->peer_port_name, port_ref.name(),
port->next_control_sequence_num_to_send++, current_sequence_num);
if (port->state == Port::kBuffering) {
port->control_message_queue.push(
{peer_node_name, std::move(event_to_send)});
}
// This might be a late or duplicate acknowledge request asking to
// acknowledge an already-read message. A request for a future read may
// already be pending, so take care not to move the requested acknowledge
// counter backwards.
if (current_sequence_num > port->sequence_num_to_acknowledge)
port->sequence_num_to_acknowledge = current_sequence_num;
} else {
// This is a request to ack a sequence number that hasn't been read yet.
// The state of the port can either be that it already has a
// future-requested ack, or not. Because ack requests aren't guaranteed
// to arrive in order, store the earlier of the current queued request
// and the new one, if one was already requested.
bool has_queued_ack_request =
port->sequence_num_to_acknowledge > current_sequence_num;
if (!has_queued_ack_request ||
port->sequence_num_to_acknowledge >
event->sequence_num_to_acknowledge()) {
port->sequence_num_to_acknowledge =
event->sequence_num_to_acknowledge();
}
return OK;
}
}
}
if (event_to_send)
delegate_->ForwardEvent(peer_node_name, std::move(event_to_send));
return OK;
}
int Node::OnUserMessageReadAck(const PortRef& port_ref,
std::unique_ptr<UserMessageReadAckEvent> event) {
DVLOG(1) << "Acknowledge " << port_ref.name() << "@" << name_ << " sequence "
<< event->sequence_num_acknowledged();
NodeName peer_node_name;
ScopedEvent ack_request_event;
if (port_ref.is_valid()) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (event->sequence_num_acknowledged() >= port->next_sequence_num_to_send) {
// TODO(http://crbug.com/980952): This is a malformed event.
// This could return a new error "ERROR_MALFORMED_EVENT" which the
// delegate could use as a signal to drop the peer node.
return OK;
}
// Keep the largest acknowledge seen.
if (event->sequence_num_acknowledged() <=
port->last_sequence_num_acknowledged) {
// The acknowledge was late or a duplicate, it's safe to ignore it.
return OK;
}
port->last_sequence_num_acknowledged = event->sequence_num_acknowledged();
// Send another ack request if the interval is non-zero and the peer has
// not been closed.
if (port->sequence_num_acknowledge_interval && !port->peer_closed) {
peer_node_name = port->peer_node_name;
ack_request_event = std::make_unique<UserMessageReadAckRequestEvent>(
port->peer_port_name, port_ref.name(),
port->next_control_sequence_num_to_send++,
port->last_sequence_num_acknowledged +
port->sequence_num_acknowledge_interval);
DCHECK_NE(port->state, Port::kBuffering);
}
}
if (ack_request_event)
delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
if (port_ref.is_valid())
delegate_->PortStatusChanged(port_ref);
return OK;
}
int Node::OnUpdatePreviousPeer(const PortRef& port_ref,
std::unique_ptr<UpdatePreviousPeerEvent> event) {
DVLOG(1) << "OnUpdatePreviousPeer port: " << event->port_name()
<< " changing to " << event->new_node_name()
<< ", port: " << event->from_port() << " => "
<< event->new_port_name();
if (!port_ref.is_valid()) {
return ERROR_PORT_UNKNOWN;
}
const NodeName& new_node_name = event->new_node_name();
const PortName& new_port_name = event->new_port_name();
DCHECK_NE(new_node_name, kInvalidNodeName);
DCHECK_NE(new_port_name, kInvalidPortName);
if (new_node_name == kInvalidNodeName || new_port_name == kInvalidPortName) {
return ERROR_PORT_STATE_UNEXPECTED;
}
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
port->prev_node_name = new_node_name;
port->prev_port_name = new_port_name;
// The sequence number will get incremented after this event has been
// handled.
port->next_control_sequence_num_to_receive = kInitialSequenceNum - 1;
}
return OK;
}
int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock lock(ports_lock_);
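// Index the port by its peer's address in |peer_port_maps_| so that
// peer-keyed lookups (e.g. DestroyAllPortsWithPeer()) can find it.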
if (port->peer_port_name != kInvalidPortName) {
DCHECK_NE(kInvalidNodeName, port->peer_node_name);
peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
port_name, PortRef(port_name, port));
}
if (!ports_.emplace(port_name, std::move(port)).second)
return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
DVLOG(2) << "Created port " << port_name << "@" << name_;
return OK;
}
void Node::ErasePort(const PortName& port_name) {
PortLocker::AssertNoPortsLockedOnCurrentThread();
scoped_refptr<Port> port;
{
base::AutoLock lock(ports_lock_);
auto it = ports_.find(port_name);
if (it == ports_.end())
return;
port = std::move(it->second);
ports_.erase(it);
RemoveFromPeerPortMap(port_name, port.get());
}
// NOTE: We are careful not to release the port's messages while holding any
// locks, since they may run arbitrary user code upon destruction.
std::vector<std::unique_ptr<UserMessageEvent>> messages;
{
PortRef port_ref(port_name, std::move(port));
SinglePortLocker locker(&port_ref);
locker.port()->message_queue.TakeAllMessages(&messages);
}
DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}
int Node::SendUserMessageInternal(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message) {
std::unique_ptr<UserMessageEvent>& m = *message;
m->set_from_port(port_ref.name());
for (size_t i = 0; i < m->num_ports(); ++i) {
if (m->ports()[i] == port_ref.name())
return ERROR_PORT_CANNOT_SEND_SELF;
}
NodeName target_node;
int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
false /* ignore_closed_peer */, m.get(),
&target_node);
if (rv != OK)
return rv;
// Beyond this point there's no sense in returning anything but OK. Even if
// message forwarding or acceptance fails, there's nothing the embedder can
// do to recover. Assume that failure beyond this point must be treated as a
// transport failure.
DCHECK_NE(kInvalidNodeName, target_node);
if (target_node != name_) {
delegate_->ForwardEvent(target_node, std::move(m));
return OK;
}
int accept_result = AcceptEvent(name_, std::move(m));
if (accept_result != OK) {
// See comment above for why we don't return an error in this case.
DVLOG(2) << "AcceptEvent failed: " << accept_result;
}
return OK;
}
int Node::MergePortsInternal(const PortRef& port0_ref,
const PortRef& port1_ref,
bool allow_close_on_bad_state) {
const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
PendingUpdatePreviousPeer pending_update_events[2];
uint64_t original_sequence_number[2];
{
// Needed to swap peer map entries below.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::ReleasableAutoLock ports_locker(&ports_lock_);
std::optional<PortLocker> locker(std::in_place, port_refs, 2);
auto* port0 = locker->GetPort(port0_ref);
auto* port1 = locker->GetPort(port1_ref);
// There are several conditions which must be met before we'll consider
// merging two ports:
//
// - They must both be in the kReceiving state
// - They must not be each other's peer
// - They must have never sent a user message
//
// If any of these criteria are not met, we fail early.
if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
(port0->peer_node_name == name_ &&
port0->peer_port_name == port1_ref.name()) ||
(port1->peer_node_name == name_ &&
port1->peer_port_name == port0_ref.name()) ||
port0->next_sequence_num_to_send != kInitialSequenceNum ||
port1->next_sequence_num_to_send != kInitialSequenceNum) {
// On failure, we only close a port if it was at least properly in the
// |kReceiving| state. This avoids getting the system in an inconsistent
// state by e.g. closing a proxy abruptly.
//
// Note that we must release the port locks before closing ports.
const bool close_port0 =
port0->state == Port::kReceiving || allow_close_on_bad_state;
const bool close_port1 =
port1->state == Port::kReceiving || allow_close_on_bad_state;
locker.reset();
ports_locker.Release();
if (close_port0)
ClosePort(port0_ref);
if (close_port1)
ClosePort(port1_ref);
return ERROR_PORT_STATE_UNEXPECTED;
}
pending_update_events[0] = {
.receiver = port0->peer_node_name,
.port = port0->peer_port_name,
.from_port = port0_ref.name(),
.sequence_num = port0->next_control_sequence_num_to_send++,
.new_prev_node = name_,
.new_prev_port = port1_ref.name()};
pending_update_events[1] = {
.receiver = port1->peer_node_name,
.port = port1->peer_port_name,
.from_port = port1_ref.name(),
.sequence_num = port1->next_control_sequence_num_to_send++,
.new_prev_node = name_,
.new_prev_port = port0_ref.name()};
// Swap the ports' peer information and switch them both to proxying mode.
SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
port0->state = Port::kProxying;
port1->state = Port::kProxying;
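// Each port now addresses a different peer, so control sequence numbering
// restarts from kInitialSequenceNum; the old values are stashed in case the
// merge fails and the swap must be undone below.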
original_sequence_number[0] = port0->next_control_sequence_num_to_send;
original_sequence_number[1] = port1->next_control_sequence_num_to_send;
port0->next_control_sequence_num_to_send = kInitialSequenceNum;
port1->next_control_sequence_num_to_send = kInitialSequenceNum;
if (port0->peer_closed)
port0->remove_proxy_on_last_message = true;
if (port1->peer_closed)
port1->remove_proxy_on_last_message = true;
}
// Flush any queued messages from the new proxies and, if successful, complete
// the merge by initiating proxy removals.
if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
ForwardUserMessagesFromProxy(port1_ref) == OK) {
#ifndef MOJO_BACKWARDS_COMPAT
// Send the prev peer updates out only after forwarding the user messages
// succeeded. Otherwise, we won't be able to restore the previous state
// below.
for (const auto& pending_update_event : pending_update_events) {
delegate_->ForwardEvent(
pending_update_event.receiver,
std::make_unique<UpdatePreviousPeerEvent>(
pending_update_event.port, pending_update_event.from_port,
pending_update_event.sequence_num,
pending_update_event.new_prev_node,
pending_update_event.new_prev_port));
}
#endif
for (const auto* const port_ref : port_refs) {
bool try_remove_proxy_immediately = false;
ScopedEvent closure_event;
NodeName closure_event_target_node;
{
SinglePortLocker locker(port_ref);
auto* port = locker.port();
DCHECK_EQ(port->state, Port::kProxying);
try_remove_proxy_immediately = port->remove_proxy_on_last_message;
if (try_remove_proxy_immediately || port->peer_closed) {
// If either end of the port cycle is closed, we propagate an
// ObserveClosure event.
closure_event_target_node = port->peer_node_name;
closure_event = std::make_unique<ObserveClosureEvent>(
port->peer_port_name, port_ref->name(),
port->next_control_sequence_num_to_send++,
port->last_sequence_num_to_receive);
}
}
if (try_remove_proxy_immediately)
TryRemoveProxy(*port_ref);
else
InitiateProxyRemoval(*port_ref);
if (closure_event) {
delegate_->ForwardEvent(closure_event_target_node,
std::move(closure_event));
}
}
return OK;
}
// If we failed to forward proxied messages, we keep the system in a
// consistent state by undoing the peer swap and closing the ports.
{
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
PortLocker locker(port_refs, 2);
auto* port0 = locker.GetPort(port0_ref);
auto* port1 = locker.GetPort(port1_ref);
SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
port0->remove_proxy_on_last_message = false;
port1->remove_proxy_on_last_message = false;
DCHECK_EQ(Port::kProxying, port0->state);
DCHECK_EQ(Port::kProxying, port1->state);
port0->state = Port::kReceiving;
port1->state = Port::kReceiving;
port0->next_control_sequence_num_to_send = original_sequence_number[0];
port1->next_control_sequence_num_to_send = original_sequence_number[1];
}
ClosePort(port0_ref);
ClosePort(port1_ref);
return ERROR_PORT_STATE_UNEXPECTED;
}
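// Converts a receiving port into a buffering proxy. The local port keeps its
// name but is re-pointed at a freshly named port on |to_node_name|;
// |*port_name| and |port_descriptor| are rewritten to describe that new
// remote port for the destination node.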
void Node::ConvertToProxy(Port* port,
const NodeName& to_node_name,
PortName* port_name,
Event::PortDescriptor* port_descriptor,
PendingUpdatePreviousPeer* pending_update) {
port->AssertLockAcquired();
PortName local_port_name = *port_name;
PortName new_port_name;
GenerateRandomPortName(&new_port_name);
pending_update->receiver = port->peer_node_name;
pending_update->port = port->peer_port_name;
pending_update->sequence_num = port->next_control_sequence_num_to_send++;
pending_update->new_prev_node = to_node_name;
pending_update->new_prev_port = new_port_name;
// Make sure we don't send messages to the new peer until after we know it
// exists. In the meantime, just buffer messages locally.
DCHECK_EQ(port->state, Port::kReceiving);
port->state = Port::kBuffering;
// If we already know our peer is closed, we already know this proxy can
// be removed once it receives and forwards its last expected message.
if (port->peer_closed)
port->remove_proxy_on_last_message = true;
*port_name = new_port_name;
port_descriptor->peer_node_name = port->peer_node_name;
port_descriptor->peer_port_name = port->peer_port_name;
port_descriptor->referring_node_name = name_;
port_descriptor->referring_port_name = local_port_name;
port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
port_descriptor->next_sequence_num_to_receive =
port->message_queue.next_sequence_num();
port_descriptor->last_sequence_num_to_receive =
port->last_sequence_num_to_receive;
port_descriptor->peer_closed = port->peer_closed;
memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
// Configure the local port to point to the new port.
UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name);
}
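// Builds a local port from the descriptor attached to an incoming event and
// notifies the referring node via PortAcceptedEvent so it can begin proxying.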
int Node::AcceptPort(const PortName& port_name,
const Event::PortDescriptor& port_descriptor) {
scoped_refptr<Port> port =
base::MakeRefCounted<Port>(port_descriptor.next_sequence_num_to_send,
port_descriptor.next_sequence_num_to_receive);
port->state = Port::kReceiving;
port->peer_node_name = port_descriptor.peer_node_name;
port->peer_port_name = port_descriptor.peer_port_name;
port->next_control_sequence_num_to_send = kInitialSequenceNum;
port->next_control_sequence_num_to_receive = kInitialSequenceNum;
port->prev_node_name = port_descriptor.referring_node_name;
port->prev_port_name = port_descriptor.referring_port_name;
port->last_sequence_num_to_receive =
port_descriptor.last_sequence_num_to_receive;
port->peer_closed = port_descriptor.peer_closed;
DVLOG(2) << "Accepting port " << port_name
<< " [peer_closed=" << port->peer_closed
<< "; last_sequence_num_to_receive="
<< port->last_sequence_num_to_receive << "]";
// A newly accepted port is not signalable until the message referencing the
// new port finds its way to the consumer (see GetMessage).
port->message_queue.set_signalable(false);
int rv = AddPortWithName(port_name, std::move(port));
if (rv != OK)
return rv;
// Allow referring port to forward messages.
delegate_->ForwardEvent(port_descriptor.referring_node_name,
std::make_unique<PortAcceptedEvent>(
port_descriptor.referring_port_name,
kInvalidPortName, kInvalidSequenceNum));
return OK;
}
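// Prepares |message| for forwarding from |forwarding_port_ref| toward that
// port's peer. This stamps the message with user and control sequence
// numbers, validates any attached ports, and, when the message is leaving
// this node, converts the attached ports into proxies. On success,
// |*forward_to_node| names the node the caller should forward the message
// to, e.g. (sketch of the calling pattern, cf. ForwardUserMessagesFromProxy):
//
//   NodeName target;
//   if (PrepareToForwardUserMessage(port_ref, Port::kReceiving,
//                                   false /* ignore_closed_peer */,
//                                   message.get(), &target) == OK)
//     delegate_->ForwardEvent(target, std::move(message));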
int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
Port::State expected_port_state,
bool ignore_closed_peer,
UserMessageEvent* message,
NodeName* forward_to_node) {
bool target_is_remote = false;
base::queue<PendingUpdatePreviousPeer> peer_update_events;
for (;;) {
NodeName target_node_name;
{
SinglePortLocker locker(&forwarding_port_ref);
target_node_name = locker.port()->peer_node_name;
}
// NOTE: This may call out to arbitrary user code, so it's important to call
// it only while no port locks are held on the calling thread.
if (target_node_name != name_) {
if (!message->NotifyWillBeRoutedExternally()) {
LOG(ERROR) << "NotifyWillBeRoutedExternally failed unexpectedly.";
return ERROR_PORT_STATE_UNEXPECTED;
}
}
// Must be held because ConvertToProxy needs to update |peer_port_maps_|.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
// Simultaneously lock the forwarding port as well as all attached ports.
absl::InlinedVector<PortRef, 4> attached_port_refs;
absl::InlinedVector<const PortRef*, 5> ports_to_lock;
attached_port_refs.resize(message->num_ports());
ports_to_lock.resize(message->num_ports() + 1);
ports_to_lock[0] = &forwarding_port_ref;
for (size_t i = 0; i < message->num_ports(); ++i) {
const PortName& attached_port_name = message->ports()[i];
auto iter = ports_.find(attached_port_name);
CHECK(iter != ports_.end());
attached_port_refs[i] = PortRef(attached_port_name, iter->second);
ports_to_lock[i + 1] = &attached_port_refs[i];
}
PortLocker locker(ports_to_lock.data(), ports_to_lock.size());
auto* forwarding_port = locker.GetPort(forwarding_port_ref);
if (forwarding_port->peer_node_name != target_node_name) {
// The target node has already changed since we last held the lock.
if (target_node_name == name_) {
// If the target node was previously this local node, we need to restart
// the loop, since that means we may now route the message externally.
continue;
}
target_node_name = forwarding_port->peer_node_name;
}
target_is_remote = target_node_name != name_;
if (forwarding_port->state != expected_port_state)
return ERROR_PORT_STATE_UNEXPECTED;
if (forwarding_port->peer_closed && !ignore_closed_peer)
return ERROR_PORT_PEER_CLOSED;
// Messages may already have a sequence number if they're being forwarded by
// a proxy. Otherwise, use the next outgoing sequence number.
if (message->sequence_num() == 0)
message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
#if DCHECK_IS_ON()
std::ostringstream ports_buf;
for (size_t i = 0; i < message->num_ports(); ++i) {
if (i > 0)
ports_buf << ",";
ports_buf << message->ports()[i];
}
#endif
if (message->num_ports() > 0) {
// Sanity check to make sure we can actually send all the attached ports.
// They must all be in the |kReceiving| state and must not be the sender's
// own peer.
DCHECK_EQ(message->num_ports(), attached_port_refs.size());
for (size_t i = 0; i < message->num_ports(); ++i) {
auto* attached_port = locker.GetPort(attached_port_refs[i]);
int error = OK;
if (attached_port->state != Port::kReceiving) {
error = ERROR_PORT_STATE_UNEXPECTED;
} else if (attached_port_refs[i].name() ==
forwarding_port->peer_port_name) {
error = ERROR_PORT_CANNOT_SEND_PEER;
}
if (error != OK) {
// Not going to send. Backpedal on the sequence number.
forwarding_port->next_sequence_num_to_send--;
return error;
}
}
if (target_is_remote) {
// We only bother to proxy and rewrite ports in the event if it's
// going to be routed to an external node. This substantially reduces
// the amount of port churn in the system, as many port-carrying
// events are routed at least 1 or 2 intra-node hops before (if ever)
// being routed externally.
Event::PortDescriptor* port_descriptors = message->port_descriptors();
for (size_t i = 0; i < message->num_ports(); ++i) {
auto* port = locker.GetPort(attached_port_refs[i]);
PendingUpdatePreviousPeer update_event = {
.from_port = attached_port_refs[i].name()};
ConvertToProxy(port, target_node_name, message->ports() + i,
port_descriptors + i, &update_event);
peer_update_events.push(update_event);
}
}
}
#if DCHECK_IS_ON()
DVLOG(4) << "Sending message " << message->sequence_num()
<< " [ports=" << ports_buf.str() << "]"
<< " from " << forwarding_port_ref.name() << "@" << name_ << " to "
<< forwarding_port->peer_port_name << "@" << target_node_name;
#endif
*forward_to_node = target_node_name;
message->set_port_name(forwarding_port->peer_port_name);
message->set_from_port(forwarding_port_ref.name());
message->set_control_sequence_num(
forwarding_port->next_control_sequence_num_to_send++);
break;
}
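// Tell each converted port's old peer about its new previous peer. This step
// is presumably skipped for older protocol versions that predate
// UpdatePreviousPeerEvent, hence the MOJO_BACKWARDS_COMPAT guard.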
#ifndef MOJO_BACKWARDS_COMPAT
while (!peer_update_events.empty()) {
auto pending_update_event = peer_update_events.front();
peer_update_events.pop();
delegate_->ForwardEvent(
pending_update_event.receiver,
std::make_unique<UpdatePreviousPeerEvent>(
pending_update_event.port, pending_update_event.from_port,
pending_update_event.sequence_num,
pending_update_event.new_prev_node,
pending_update_event.new_prev_port));
}
#endif
if (target_is_remote) {
for (size_t i = 0; i < message->num_ports(); ++i) {
// For any ports that were converted to proxies above, make sure their
// prior local peer (if applicable) receives a status update so it can be
// made aware of its peer's location.
const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
if (descriptor.peer_node_name == name_) {
PortRef local_peer;
if (GetPort(descriptor.peer_port_name, &local_peer) == OK)
delegate_->PortStatusChanged(local_peer);
}
}
}
return OK;
}
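// Transitions a buffering port into full proxy mode once its new peer is
// known to exist. Flushes any control and user messages buffered during the
// transition, then begins proxy removal, attempting immediate removal if the
// proxy is already approved for removal (e.g. its peer was known closed).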
int Node::BeginProxying(const PortRef& port_ref) {
base::queue<std::pair<NodeName, ScopedEvent>> control_message_queue;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kBuffering)
return OOPS(ERROR_PORT_STATE_UNEXPECTED);
port->state = Port::kProxying;
std::swap(port->control_message_queue, control_message_queue);
}
while (!control_message_queue.empty()) {
auto node_event_pair = std::move(control_message_queue.front());
control_message_queue.pop();
delegate_->ForwardEvent(node_event_pair.first,
std::move(node_event_pair.second));
}
int rv = ForwardUserMessagesFromProxy(port_ref);
if (rv != OK)
return rv;
// Forward any pending acknowledge request.
MaybeForwardAckRequest(port_ref);
bool try_remove_proxy_immediately;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kProxying)
return OOPS(ERROR_PORT_STATE_UNEXPECTED);
try_remove_proxy_immediately = port->remove_proxy_on_last_message;
}
if (try_remove_proxy_immediately) {
TryRemoveProxy(port_ref);
} else {
InitiateProxyRemoval(port_ref);
}
return OK;
}
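// Drains this proxy's queue of buffered user messages, forwarding each one
// toward the port's current peer in strict sequence order.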
int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
for (;;) {
// NOTE: We forward messages in sequential order here so that we maintain
// the message queue's notion of next sequence number. That's useful for the
// proxy removal process as we can tell when this port has seen all of the
// messages it is expected to see.
std::unique_ptr<UserMessageEvent> message;
{
SinglePortLocker locker(&port_ref);
locker.port()->message_queue.GetNextMessage(&message, nullptr);
if (!message)
break;
}
NodeName target_node;
int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
true /* ignore_closed_peer */,
message.get(), &target_node);
{
// Mark the message as processed only after PrepareToForwardUserMessage has
// run. This is important to prevent another thread from deleting the port
// before we have grabbed a sequence number for the message.
SinglePortLocker locker(&port_ref);
locker.port()->message_queue.MessageProcessed();
}
if (rv != OK)
return rv;
delegate_->ForwardEvent(target_node, std::move(message));
}
return OK;
}
void Node::InitiateProxyRemoval(const PortRef& port_ref) {
NodeName peer_node_name;
PortName peer_port_name;
uint64_t sequence_num;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state == Port::kClosed)
return;
peer_node_name = port->peer_node_name;
peer_port_name = port->peer_port_name;
sequence_num = port->next_control_sequence_num_to_send++;
DCHECK_EQ(port->state, Port::kProxying);
}
// To remove this proxying port, we start by notifying the connected graph
// that it is a proxy. This allows whatever port is referencing it to skip
// it. Eventually, this port will receive ObserveProxyAck (or ObserveClosure
// if the peer was closed in the meantime).
delegate_->ForwardEvent(
peer_node_name, std::make_unique<ObserveProxyEvent>(
peer_port_name, port_ref.name(), sequence_num, name_,
port_ref.name(), peer_node_name, peer_port_name));
}
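// Attempts to remove this proxy right away. Removal only proceeds if it has
// already been approved (see |remove_proxy_on_last_message|) and the proxy
// has received every message it is expected to forward; otherwise the proxy
// stays alive until the remaining messages arrive.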
void Node::TryRemoveProxy(const PortRef& port_ref) {
bool should_erase = false;
NodeName removal_target_node;
ScopedEvent removal_event;
PendingUpdatePreviousPeer pending_update_event;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state == Port::kClosed)
return;
DCHECK_EQ(port->state, Port::kProxying);
// Make sure we have seen ObserveProxyAck before removing the port.
if (!port->remove_proxy_on_last_message)
return;
if (!CanAcceptMoreMessages(port)) {
DCHECK_EQ(port->message_queue.queued_message_count(), 0lu);
should_erase = true;
if (port->send_on_proxy_removal) {
removal_target_node = port->send_on_proxy_removal->first;
removal_event = std::move(port->send_on_proxy_removal->second);
if (removal_event) {
removal_event->set_control_sequence_num(
port->next_control_sequence_num_to_send++);
DCHECK_EQ(removal_target_node, port->peer_node_name);
DCHECK_EQ(removal_event->port_name(), port->peer_port_name);
}
}
// Tell the peer node to accept messages from |prev_node_name| from now on.
pending_update_event = {
.receiver = port->peer_node_name,
.port = port->peer_port_name,
.from_port = port_ref.name(),
.sequence_num = port->next_control_sequence_num_to_send++,
.new_prev_node = port->prev_node_name,
.new_prev_port = port->prev_port_name};
} else {
DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
<< " now; waiting for more messages";
}
}
if (should_erase) {
#ifndef MOJO_BACKWARDS_COMPAT
delegate_->ForwardEvent(
pending_update_event.receiver,
std::make_unique<UpdatePreviousPeerEvent>(
pending_update_event.port, pending_update_event.from_port,
pending_update_event.sequence_num,
pending_update_event.new_prev_node,
pending_update_event.new_prev_port));
#endif
ErasePort(port_ref.name());
}
if (removal_event)
delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
}
void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
const PortName& port_name) {
// Wipes out all ports whose peer node matches |node_name| and whose peer port
// matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
// node is matched.
std::vector<PortRef> ports_to_notify;
std::vector<PortName> dead_proxies_to_broadcast;
std::vector<std::unique_ptr<UserMessageEvent>> undelivered_messages;
ScopedEvent closure_event;
NodeName closure_event_target_node;
{
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_lock(ports_lock_);
auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
if (node_peer_port_map_iter == peer_port_maps_.end())
return;
auto& node_peer_port_map = node_peer_port_map_iter->second;
auto peer_ports_begin = node_peer_port_map.begin();
auto peer_ports_end = node_peer_port_map.end();
if (port_name != kInvalidPortName) {
// If |port_name| is given, we limit the set of local ports to the ones
// with that specific port as their peer.
peer_ports_begin = node_peer_port_map.find(port_name);
if (peer_ports_begin == node_peer_port_map.end())
return;
peer_ports_end = peer_ports_begin;
++peer_ports_end;
}
for (auto peer_port_iter = peer_ports_begin;
peer_port_iter != peer_ports_end; ++peer_port_iter) {
auto& local_ports = peer_port_iter->second;
// NOTE: This inner loop almost always has only one element. There are
// relatively short-lived cases where more than one local port points to
// the same peer, and they arise only when the extra ports are bypassed
// proxies waiting to be torn down.
for (auto local_port_iter = local_ports.begin();
local_port_iter != local_ports.end(); ++local_port_iter) {
auto& local_port_ref = local_port_iter->second;
SinglePortLocker locker(&local_port_ref);
auto* port = locker.port();
if (port_name != kInvalidPortName) {
// If this is a targeted dead-proxy notification (i.e. |port_name| was
// given), send out an ObserveClosure to acknowledge it.
closure_event_target_node = port->peer_node_name;
closure_event = std::make_unique<ObserveClosureEvent>(
port->peer_port_name, local_port_ref.name(),
port->next_control_sequence_num_to_send++,
port->last_sequence_num_to_receive);
}
if (!port->peer_closed) {
// Treat this as immediate peer closure. It's an exceptional
// condition akin to a broken pipe, so we don't care about losing
// messages.
port->peer_closed = true;
port->peer_lost_unexpectedly = true;
if (port->state == Port::kReceiving)
ports_to_notify.push_back(local_port_ref);
}
// We don't expect to forward any further messages, and we don't
// expect to receive a Port{Accepted,Rejected} event. Because we're
// a proxy with no active peer, we cannot use the normal proxy removal
// procedure of forward-propagating an ObserveProxy. Instead we
// broadcast our own death so it can be back-propagated. This is
// inefficient but rare.
if (port->state == Port::kBuffering || port->state == Port::kProxying) {
port->state = Port::kClosed;
dead_proxies_to_broadcast.push_back(local_port_ref.name());
std::vector<std::unique_ptr<UserMessageEvent>> messages;
port->message_queue.TakeAllMessages(&messages);
port->TakePendingMessages(messages);
for (auto& message : messages)
undelivered_messages.emplace_back(std::move(message));
}
}
}
}
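// In backwards-compatible mode, dead proxies are erased immediately;
// otherwise their (now closed) ports remain registered while their death is
// broadcast below, presumably to be cleaned up by later events.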
#ifdef MOJO_BACKWARDS_COMPAT
for (const auto& proxy_name : dead_proxies_to_broadcast) {
ErasePort(proxy_name);
DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
}
#endif
if (closure_event) {
delegate_->ForwardEvent(closure_event_target_node,
std::move(closure_event));
}
// Wake up any receiving ports that have just observed simulated peer closure.
for (const auto& port : ports_to_notify)
delegate_->PortStatusChanged(port);
for (const auto& proxy_name : dead_proxies_to_broadcast) {
// Broadcast an event signifying that this proxy is no longer functioning.
delegate_->BroadcastEvent(std::make_unique<ObserveProxyEvent>(
kInvalidPortName, kInvalidPortName, kInvalidSequenceNum, name_,
proxy_name, kInvalidNodeName, kInvalidPortName));
// Also process the death locally, since the port that points to this
// closed one could be on the current node.
// Note: Although this is recursive, only a single port is involved, which
// limits the expected branching factor to 1.
DestroyAllPortsWithPeer(name_, proxy_name);
}
// Close any ports referenced by undelivered messages.
for (const auto& message : undelivered_messages) {
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef ref;
if (GetPort(message->ports()[i], &ref) == OK)
ClosePort(ref);
}
}
}
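// Points |local_port| at a new peer and keeps |peer_port_maps_| consistent
// with the change. Control-message sequence numbering restarts for the new
// peer link.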
void Node::UpdatePortPeerAddress(const PortName& local_port_name,
Port* local_port,
const NodeName& new_peer_node,
const PortName& new_peer_port) {
ports_lock_.AssertAcquired();
local_port->AssertLockAcquired();
RemoveFromPeerPortMap(local_port_name, local_port);
local_port->peer_node_name = new_peer_node;
local_port->peer_port_name = new_peer_port;
local_port->next_control_sequence_num_to_send = kInitialSequenceNum;
if (new_peer_port != kInvalidPortName) {
peer_port_maps_[new_peer_node][new_peer_port].emplace(
local_port_name,
PortRef(local_port_name, base::WrapRefCounted<Port>(local_port)));
}
}
void Node::RemoveFromPeerPortMap(const PortName& local_port_name,
Port* local_port) {
if (local_port->peer_port_name == kInvalidPortName)
return;
auto node_iter = peer_port_maps_.find(local_port->peer_node_name);
if (node_iter == peer_port_maps_.end())
return;
auto& node_peer_port_map = node_iter->second;
auto ports_iter = node_peer_port_map.find(local_port->peer_port_name);
if (ports_iter == node_peer_port_map.end())
return;
auto& local_ports_with_this_peer = ports_iter->second;
local_ports_with_this_peer.erase(local_port_name);
if (local_ports_with_this_peer.empty())
node_peer_port_map.erase(ports_iter);
if (node_peer_port_map.empty())
peer_port_maps_.erase(node_iter);
}
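// Exchanges the peers of |port0| and |port1| under the global ports lock,
// updating |peer_port_maps_| so that reverse lookups remain accurate, e.g.
// when merging two ports.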
void Node::SwapPortPeers(const PortName& port0_name,
Port* port0,
const PortName& port1_name,
Port* port1) {
ports_lock_.AssertAcquired();
port0->AssertLockAcquired();
port1->AssertLockAcquired();
auto& peer0_ports =
peer_port_maps_[port0->peer_node_name][port0->peer_port_name];
auto& peer1_ports =
peer_port_maps_[port1->peer_node_name][port1->peer_port_name];
peer0_ports.erase(port0_name);
peer1_ports.erase(port1_name);
peer0_ports.emplace(port1_name,
PortRef(port1_name, base::WrapRefCounted<Port>(port1)));
peer1_ports.emplace(port0_name,
PortRef(port0_name, base::WrapRefCounted<Port>(port0)));
std::swap(port0->peer_node_name, port1->peer_node_name);
std::swap(port0->peer_port_name, port1->peer_port_name);
}
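// Resends the outstanding acknowledge request to this port's peer, provided
// the port is still receiving and has an acknowledge interval configured.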
void Node::MaybeResendAckRequest(const PortRef& port_ref) {
NodeName peer_node_name;
ScopedEvent ack_request_event;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kReceiving)
return;
if (!port->sequence_num_acknowledge_interval)
return;
peer_node_name = port->peer_node_name;
ack_request_event = std::make_unique<UserMessageReadAckRequestEvent>(
port->peer_port_name, port_ref.name(),
port->next_control_sequence_num_to_send++,
port->last_sequence_num_acknowledged +
port->sequence_num_acknowledge_interval);
}
delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
}
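// Forwards a pending acknowledge request past this proxy to its current
// peer, clearing |sequence_num_to_acknowledge| so the request is forwarded
// only once.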
void Node::MaybeForwardAckRequest(const PortRef& port_ref) {
NodeName peer_node_name;
ScopedEvent ack_request_event;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kProxying)
return;
if (!port->sequence_num_to_acknowledge)
return;
peer_node_name = port->peer_node_name;
ack_request_event = std::make_unique<UserMessageReadAckRequestEvent>(
port->peer_port_name, port_ref.name(),
port->next_control_sequence_num_to_send++,
port->sequence_num_to_acknowledge);
port->sequence_num_to_acknowledge = 0;
}
delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
}
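// Resends an acknowledgement of the last user message read from this port,
// provided an acknowledgement is outstanding and at least one message has
// been read.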
void Node::MaybeResendAck(const PortRef& port_ref) {
NodeName peer_node_name;
ScopedEvent ack_event;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kReceiving)
return;
uint64_t last_sequence_num_read =
port->message_queue.next_sequence_num() - 1;
if (!port->sequence_num_to_acknowledge || !last_sequence_num_read)
return;
peer_node_name = port->peer_node_name;
ack_event = std::make_unique<UserMessageReadAckEvent>(
port->peer_port_name, port_ref.name(),
port->next_control_sequence_num_to_send++, last_sequence_num_read);
}
delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
}
Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
: node_(node), delegate_(delegate) {
DCHECK(node_);
}
Node::DelegateHolder::~DelegateHolder() = default;
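// Debug-only sanity check: the calling thread must not hold any port locks
// when calling out to the delegate.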
#if DCHECK_IS_ON()
void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock lock(node_->ports_lock_);
}
#endif
} // namespace ports
} // namespace core
} // namespace mojo