#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Tests various GetNCChanges replication scenarios
#
# Copyright (C) Catalyst.Net Ltd. 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# Usage:
# export DC1=dc1_dns_name
# export DC2=dc2_dns_name
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
# getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
import drs_base
import samba.tests
import ldb
from ldb import SCOPE_BASE
import random
from samba.dcerpc import drsuapi, misc
from samba import WERRORError
from samba import werror
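

# Illustrative sketch only (not part of the original test suite): a toy
# model of the paging that test_repl_integrity() exercises. Everything here
# (ToyServer, its methods, toy_replicate, the integer USNs) is hypothetical
# and greatly simplified; it is not the DRSUAPI protocol or Samba's
# implementation. It only shows why an object modified mid-cycle need not be
# lost: the change gives it a new, higher USN, so a later page can still
# return it (roughly the "modified objects last" ordering that the test
# comments attribute to Windows).
class ToyServer(object):
    def __init__(self, names):
        # assign each object a USN in creation order, starting at 1
        self.usn_by_name = {}
        self.next_usn = 1
        for name in names:
            self.touch(name)

    def touch(self, name):
        """Creates or modifies an object, giving it the next USN"""
        self.usn_by_name[name] = self.next_usn
        self.next_usn += 1

    def get_changes(self, hwm, max_objects):
        """Returns (names, new_hwm, more_data) for USNs above hwm"""
        pending = sorted((usn, name)
                         for name, usn in self.usn_by_name.items()
                         if usn > hwm)
        page = pending[:max_objects]
        new_hwm = page[-1][0] if page else hwm
        more_data = len(pending) > len(page)
        return [name for _, name in page], new_hwm, more_data


def toy_replicate(server, max_objects, modify_after_first_page=()):
    """Pages through all changes, optionally modifying objects mid-cycle"""
    received = []
    hwm = 0
    more_data = True
    first_page = True
    while more_data:
        names, hwm, more_data = server.get_changes(hwm, max_objects)
        received += names
        if first_page:
            # simulate a concurrent modification between two pages
            for name in modify_after_first_page:
                server.touch(name)
            first_page = False
    return received
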
class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
    def setUp(self):
        super(DrsReplicaSyncIntegrityTestCase, self).setUp()
        self.init_test_state()
        # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
        # the vampire_dc), so we point this test directly at that DC
        self.set_test_ldb_dc(self.ldb_dc2)
        self.ou = str(samba.tests.create_test_ou(
            self.test_ldb_dc,
            "getncchanges." + self.id().rsplit(".", 1)[1]))
        self.addCleanup(self.ldb_dc2.delete, self.ou, ["tree_delete:1"])
        self.base_dn = self.test_ldb_dc.get_default_basedn()
        self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
        self.set_dc_connection(self.default_conn)

    def init_test_state(self):
        self.rxd_dn_list = []
        self.rxd_links = []
        self.rxd_guids = []
        self.last_ctr = None
        # 100 is the minimum max_objects that Microsoft seems to honour
        # (the max honoured is 400ish), so we use that in these tests
        self.max_objects = 100
        # store whether we used GET_TGT/GET_ANC flags in the requests
        self.used_get_tgt = False
        self.used_get_anc = False

    def add_object(self, dn, objectclass="organizationalunit"):
        """Adds an object (an OU by default) and checks that it exists"""
        self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
        res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
        self.assertEqual(len(res), 1)

    def modify_object(self, dn, attr, value):
        """Modifies an object's USN by adding an attribute value to it"""
        m = ldb.Message()
        m.dn = ldb.Dn(self.test_ldb_dc, dn)
        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
        self.test_ldb_dc.modify(m)

    def delete_attribute(self, dn, attr, value):
        """Deletes an attribute value from an object"""
        m = ldb.Message()
        m.dn = ldb.Dn(self.test_ldb_dc, dn)
        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
        self.test_ldb_dc.modify(m)

    def start_new_repl_cycle(self):
        """Resets enough state info to start a new replication cycle"""
        # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
        # whether a parent/target is unknown and needs GET_ANC/GET_TGT to
        # resolve
        self.rxd_links = []
        self.used_get_tgt = False
        self.used_get_anc = False
        # mostly preserve self.last_ctr, so that we use the last HWM
        if self.last_ctr is not None:
            self.last_ctr.more_data = True

    def create_object_range(self, start, end, prefix="",
                            children=None, parent_list=None):
        """
        Creates a block of objects. Object names are numbered sequentially,
        using the optional prefix supplied. If the children parameter is
        supplied it will create a parent-child hierarchy and return the
        top-level parents separately.
        """
        dn_list = []
        # Use dummy/empty lists if we're not creating a parent/child hierarchy
        if children is None:
            children = []
        if parent_list is None:
            parent_list = []
        # Create the parents first, then the children. By default this leaves
        # the objects in a block of parents followed by a block of children,
        # which makes it easier to see in the debug output when GET_ANC takes
        # effect (the parents and children become interleaved)
        for x in range(start, end):
            ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
            self.add_object(ou)
            dn_list.append(ou)
            # keep track of the top-level parents (if needed)
            parent_list.append(ou)
        # create the block of children (if needed)
        for x in range(start, end):
            for child in children:
                ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
                self.add_object(ou)
                dn_list.append(ou)
        return dn_list

    def assert_expected_data(self, expected_list):
        """
        Asserts that we received all the DNs that we expected and
        none are missing.
        """
        received_list = self.rxd_dn_list
        # Note that with GET_ANC Windows can end up sending the same parent
        # object multiple times, so this might be noteworthy but doesn't
        # warrant failing the test
        num_received = len(received_list)
        num_expected = len(expected_list)
        if num_received != num_expected:
            print("Note: received %d objects but expected %d" % (num_received,
                                                                 num_expected))
        # Check that we received every object that we were expecting
        for dn in expected_list:
            self.assertTrue(dn in received_list,
                            "DN '%s' missing from replication." % dn)

    def test_repl_integrity(self):
        """
        Modify the objects being replicated while the replication is still
        in progress and check that no object loss occurs.
        """
        # The server behaviour differs between samba and Windows. Samba returns
        # the objects in the original order (up to the pre-modify HWM). Windows
        # incorporates the modified objects and returns them in the new order
        # (i.e. modified objects last), up to the post-modify HWM. The
        # Microsoft docs state the Windows behaviour is optional.
        # Create a range of objects to replicate.
        expected_dn_list = self.create_object_range(0, 400)
        (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
        # We ask for the first page of 100 objects.
        # For this test, we don't care what order we receive the objects in,
        # so long as by the end we've received everything
        self.repl_get_next()
        # Modify some of the second page of objects. This should bump the
        # highwatermark
        for x in range(100, 200):
            self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
        (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
        self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
        # Get the remaining blocks of data
        while not self.replication_complete():
            self.repl_get_next()
        # Check we still receive all the objects we're expecting
        self.assert_expected_data(expected_dn_list)

    def is_parent_known(self, dn, known_dn_list):
        """
        Returns True if the parent of the dn specified is in known_dn_list
        """
        # we can sometimes get system objects like the RID Manager returned.
        # Ignore anything that is not under the test OU we created
        if self.ou not in dn:
            return True
        # Remove the child portion from the name to get the parent's DN
        name_substrings = dn.split(",")
        del name_substrings[0]
        parent_dn = ",".join(name_substrings)
        # check either this object is a parent (its parent is the top-level
        # test object), or its parent has been seen previously
        return parent_dn == self.ou or parent_dn in known_dn_list

    def _repl_send_request(self, get_anc=False, get_tgt=False):
        """
        Sends a GetNCChanges request for the next block of replication data.
        """
        # we're just trying to mimic regular client behaviour here, so just
        # use the highwatermark in the last response we received
        if self.last_ctr:
            highwatermark = self.last_ctr.new_highwatermark
            uptodateness_vector = self.last_ctr.uptodateness_vector
        else:
            # this is the first replication chunk
            highwatermark = None
            uptodateness_vector = None
        # Ask for the next block of replication data
        replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
        more_flags = 0
        if get_anc:
            replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
            self.used_get_anc = True
        if get_tgt:
            more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
            self.used_get_tgt = True
        # return the response from the DC
        return self._get_replication(replica_flags,
                                     max_objects=self.max_objects,
                                     highwatermark=highwatermark,
                                     uptodateness_vector=uptodateness_vector,
                                     more_flags=more_flags)

    def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
        """
        Requests the next block of replication data. This tries to simulate
        client behaviour - if we receive a replicated object that we don't know
        the parent of, then re-request the block with the GET_ANC flag set.
        If we don't know the target object for a linked attribute, then
        re-request with GET_TGT.
        """
        # send a request to the DC and get the response
        ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)
        # extract the object DNs and their GUIDs from the response
        rxd_dn_list = self._get_ctr6_dn_list(ctr6)
        rxd_guid_list = self._get_ctr6_object_guids(ctr6)
        # we'll add new objects as we discover them, so take a copy of the
        # ones we already know about, so we can modify these lists safely
        known_objects = self.rxd_dn_list[:]
        known_guids = self.rxd_guids[:]
        # check that we know the parent for every object received
        for i in range(0, len(rxd_dn_list)):
            dn = rxd_dn_list[i]
            guid = rxd_guid_list[i]
            if self.is_parent_known(dn, known_objects):
                # the new DN is now known so add it to the list.
                # It may be the parent of another child in this block
                known_objects.append(dn)
                known_guids.append(guid)
            else:
                # If we've already set the GET_ANC flag then it should mean
                # we receive the parents before the child
                self.assertFalse(get_anc, "Unknown parent for object %s" % dn)
                print("Unknown parent for %s - try GET_ANC" % dn)
                # try the same thing again with the GET_ANC flag set this time
                return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
                                          assert_links=assert_links)
        # check we know about references to any objects in the linked attrs
        received_links = self._get_ctr6_links(ctr6)
        # This is so that older versions of Samba fail - we want the links to
        # be sent roughly with the objects, rather than getting all links at
        # the end
        if assert_links:
            self.assertTrue(len(received_links) > 0,
                            "Links were expected in the GetNCChanges response")
        for link in received_links:
            # skip any links that aren't part of the test
            if self.ou not in link.targetDN:
                continue
            # check the source object is known (Windows can actually send links
            # where we don't know the source object yet). Samba shouldn't ever
            # hit this case because it gets the links based on the source
            if link.identifier not in known_guids:
                # If we've already set the GET_ANC flag then it should mean
                # this case doesn't happen
                self.assertFalse(get_anc, "Unknown source object for GUID %s"
                                 % link.identifier)
                print("Unknown source GUID %s - try GET_ANC" % link.identifier)
                # try the same thing again with the GET_ANC flag set this time
                return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
                                          assert_links=assert_links)
            # check we know the target object
            if link.targetGUID not in known_guids:
                # If we've already set the GET_TGT flag then we should have
                # already received any objects we need to know about
                self.assertFalse(get_tgt, "Unknown linked target for object %s"
                                 % link.targetDN)
                print("Unknown target for %s - try GET_TGT" % link.targetDN)
                # try the same thing again with the GET_TGT flag set this time
                return self.repl_get_next(get_anc=get_anc, get_tgt=True,
                                          assert_links=assert_links)
        # store the last successful result so we know what HWM to request next
        self.last_ctr = ctr6
        # store the objects, GUIDs, and links we received
        self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
        self.rxd_links += self._get_ctr6_links(ctr6)
        self.rxd_guids += self._get_ctr6_object_guids(ctr6)
        return ctr6

    def replication_complete(self):
        """Returns True if the current/last replication cycle is complete"""
        if self.last_ctr is None or self.last_ctr.more_data:
            return False
        else:
            return True

    def test_repl_integrity_get_anc(self):
        """
        Modify the parent objects being replicated while the replication is
        still in progress (using GET_ANC) and check that no object loss occurs.
        """
        # Note that GET_ANC behaviour varies between Windows and Samba.
        # On Samba GET_ANC results in the replication restarting from the very
        # beginning. After that, Samba remembers GET_ANC and also sends the
        # parents in subsequent requests (regardless of whether GET_ANC is
        # specified in the later request).
        # Windows only sends the parents if GET_ANC was specified in the last
        # request. It will also resend a parent, even if it's already sent the
        # parent in a previous response (whereas Samba doesn't).
        # Create a small block of 50 parents, each with 2 children (A and B)
        # This is so that we receive some children in the first block, so we
        # can resend with GET_ANC before we learn too many parents
        parent_dn_list = []
        expected_dn_list = self.create_object_range(0, 50, prefix="parent",
                                                    children=("A", "B"),
                                                    parent_list=parent_dn_list)
        # create the remaining parents and children
        expected_dn_list += self.create_object_range(50, 150, prefix="parent",
                                                     children=("A", "B"),
                                                     parent_list=parent_dn_list)
        # We've now got objects in the following order:
        # [50 parents][100 children][100 parents][200 children]
        # Modify the first parent so that it's now ordered last by USN
        # This means we set the GET_ANC flag pretty much straight away
        # because we receive the first child before the first parent
        self.modify_object(parent_dn_list[0], "displayName", "OU0")
        # modify a later block of parents so they also get reordered
        for x in range(50, 100):
            self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
        # Get the first block of objects - this should resend the request with
        # GET_ANC set because we won't know about the first child's parent.
        # On samba GET_ANC essentially starts the sync from scratch again, so
        # we get this over with early before we learn too many parents
        self.repl_get_next()
        # modify the last chunk of parents. They should now have a USN higher
        # than the highwater-mark for the replication cycle
        for x in range(100, 150):
            self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
        # Get the remaining blocks of data - this will resend the request with
        # GET_ANC if it encounters an object it doesn't have the parent for.
        while not self.replication_complete():
            self.repl_get_next()
        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_ANC flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")
        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_dn_list)

    def assert_expected_links(self, objects_with_links, link_attr="managedBy",
                              num_expected=None):
        """
        Asserts that a GetNCChanges response contains any expected links
        for the objects it contains.
        """
        received_links = self.rxd_links
        if num_expected is None:
            num_expected = len(objects_with_links)
        self.assertTrue(len(received_links) == num_expected,
                        "Received %d links but expected %d"
                        % (len(received_links), num_expected))
        for dn in objects_with_links:
            self.assert_object_has_link(dn, link_attr, received_links)

    def assert_object_has_link(self, dn, link_attr, received_links):
        """
        Queries the object in the DB and asserts there is a link in the
        GetNCChanges response that matches.
        """
        # Look up the link attribute in the DB
        # The extended_dn option will dump the GUID info for the link
        # attribute (as a hex blob)
        res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn),
                                      attrs=[link_attr],
                                      controls=['extended_dn:1:0'],
                                      scope=ldb.SCOPE_BASE)
        # We didn't find the expected link attribute in the DB for the object.
        # Something has gone wrong somewhere...
        self.assertTrue(link_attr in res[0],
                        "%s in DB doesn't have attribute %s" % (dn, link_attr))
        # find the received link in the list and assert that the target and
        # source GUIDs match what's in the DB
        for val in [str(val) for val in res[0][link_attr]]:
            # Work out the expected source and target GUIDs for the DB link
            target_dn = ldb.Dn(self.test_ldb_dc, val)
            targetGUID_blob = target_dn.get_extended_component("GUID")
            sourceGUID_blob = res[0].dn.get_extended_component("GUID")
            found = False
            for link in received_links:
                if link.selfGUID_blob == sourceGUID_blob and \
                   link.targetGUID_blob == targetGUID_blob:
                    found = True
                    if self._debug:
                        print("Link %s --> %s" % (dn[:25],
                                                  link.targetDN[:25]))
                    break
            self.assertTrue(found,
                            "Did not receive expected link for DN %s" % dn)

    def test_repl_get_tgt(self):
        """
        Creates a scenario where we should receive the linked attribute before
        we know about the target object, and therefore need to use GET_TGT.
        Note: Samba currently avoids this problem by sending all its links last
        """
        # create the test objects
        reportees = self.create_object_range(0, 100, prefix="reportee")
        managers = self.create_object_range(0, 100, prefix="manager")
        all_objects = managers + reportees
        expected_links = reportees
        # add a link attribute to each reportee object that points to the
        # corresponding manager object as the target
        for i in range(0, 100):
            self.modify_object(reportees[i], "managedBy", managers[i])
        # touch the managers (the link-target objects) again to make sure the
        # reportees (link source objects) get returned first by the replication
        for i in range(0, 100):
            self.modify_object(managers[i], "displayName", "OU%d" % i)
        links_expected = True
        # Get all the replication data - this code should resend the requests
        # with GET_TGT
        while not self.replication_complete():
            # get the next block of replication data (this sets GET_TGT
            # if needed)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)
        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")
        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)
        # Check we received links for all the reportees
        self.assert_expected_links(expected_links)

    def test_repl_get_tgt_chain(self):
        """
        Tests the behaviour of GET_TGT with a more complicated scenario.
        Here we create a chain of objects linked together, so if we follow
        the link target, then we'd traverse ~200 objects each time.
        """
        # create the test objects
        objectsA = self.create_object_range(0, 100, prefix="AAA")
        objectsB = self.create_object_range(0, 100, prefix="BBB")
        objectsC = self.create_object_range(0, 100, prefix="CCC")
        # create a complex set of object links:
        # A0-->B0-->C1-->B2-->C3-->B4-->and so on...
        # Basically each object-A should link to a circular chain of 200 B/C
        # objects. We create the links in separate chunks here, as it makes it
        # clearer what happens with the USN (links on Windows have their own
        # USN, so this approach means the A->B/B->C links aren't interleaved)
        for i in range(0, 100):
            self.modify_object(objectsA[i], "managedBy", objectsB[i])
        for i in range(0, 100):
            self.modify_object(objectsB[i], "managedBy",
                               objectsC[(i + 1) % 100])
        for i in range(0, 100):
            self.modify_object(objectsC[i], "managedBy",
                               objectsB[(i + 1) % 100])
        all_objects = objectsA + objectsB + objectsC
        expected_links = all_objects
        # the default order the objects now get returned in should be:
        # [A0-A99][B0-B99][C0-C99]
        links_expected = True
        # Get all the replication data - this code should resend the requests
        # with GET_TGT
        while not self.replication_complete():
            # get the next block of replication data (this sets GET_TGT
            # if needed)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)
        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")
        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)
        # Check we received links for all the linked objects (A, B and C)
        self.assert_expected_links(expected_links)

    def test_repl_integrity_link_attr(self):
        """
        Tests adding links to new objects while a replication is in progress.
        """
        # create some source objects for the linked attributes, sandwiched
        # between 2 blocks of filler objects
        filler = self.create_object_range(0, 100, prefix="filler")
        reportees = self.create_object_range(0, 100, prefix="reportee")
        filler += self.create_object_range(100, 200, prefix="filler")
        # Start the replication and get the first block of filler objects
        # (We're being mean here and setting the GET_TGT flag right from the
        # start. On earlier Samba versions, if the client encountered an
        # unknown target object and retried with GET_TGT, it would restart the
        # replication cycle from scratch, which avoids the problem).
        self.repl_get_next(get_tgt=True)
        # create the target objects and add the links. These objects should be
        # outside the scope of the Samba replication cycle, but the links
        # should still get sent with the source object
        managers = self.create_object_range(0, 100, prefix="manager")
        for i in range(0, 100):
            self.modify_object(reportees[i], "managedBy", managers[i])
        expected_objects = managers + reportees + filler
        expected_links = reportees
        # complete the replication
        while not self.replication_complete():
            self.repl_get_next(get_tgt=True)
        # If we didn't receive the most recently created objects in the last
        # replication cycle, then kick off another replication to get them
        if len(self.rxd_dn_list) < len(expected_objects):
            self.repl_get_next()
            while not self.replication_complete():
                self.repl_get_next()
        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_objects)
        # Check we received links for all the reportees
        self.assert_expected_links(expected_links)

    def test_repl_get_anc_link_attr(self):
        """
        A basic GET_ANC test where the parents have linked attributes
        """
        # Create a block of 100 parents and 100 children
        parent_dn_list = []
        expected_dn_list = self.create_object_range(0, 100, prefix="parent",
                                                    children=("A",),
                                                    parent_list=parent_dn_list)
        # Add links from the parents to the children
        for x in range(0, 100):
            self.modify_object(parent_dn_list[x], "managedBy",
                               expected_dn_list[x + 100])
        # add some filler objects at the end. This allows us to easily see
        # which chunk the links get sent in
        expected_dn_list += self.create_object_range(0, 100, prefix="filler")
        # We've now got objects in the following order:
        # [100 x children][100 x parents][100 x filler]
        # Get the replication data - because the block of children come first,
        # this should retry the request with GET_ANC
        while not self.replication_complete():
            self.repl_get_next()
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")
        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_dn_list)
        # Check we received links for all the parents
        self.assert_expected_links(parent_dn_list)

    def test_repl_get_tgt_and_anc(self):
        """
        Check we can resolve an unknown ancestor when fetching the link target,
        i.e. tests using GET_TGT and GET_ANC in combination
        """
        # Create some parent/child objects (the child will be the link target)
        parents = []
        all_objects = self.create_object_range(0, 100, prefix="parent",
                                               children=["la_tgt"],
                                               parent_list=parents)
        children = [item for item in all_objects if item not in parents]
        # create the link source objects and link them to the child/target
        la_sources = self.create_object_range(0, 100, prefix="la_src")
        all_objects += la_sources
        for i in range(0, 100):
            self.modify_object(la_sources[i], "managedBy", children[i])
        expected_links = la_sources
        # modify the children/targets so they come after the link source
        for x in range(0, 100):
            self.modify_object(children[x], "displayName", "OU%d" % x)
        # modify the parents, so they now come last in the replication
        for x in range(0, 100):
            self.modify_object(parents[x], "displayName", "OU%d" % x)
        # We've now got objects in the following order:
        # [100 la_source][100 la_target][100 parents (of la_target)]
        links_expected = True
        # Get all the replication data - this code should resend the requests
        # with GET_TGT and GET_ANC
        while not self.replication_complete():
            # get the next block of replication data (this sets
            # GET_TGT/GET_ANC)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)
        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
        # doesn't actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")
        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)
        # Check we received links for all the link sources
        self.assert_expected_links(expected_links)
        # Second part of test. Add some extra objects and kick off another
        # replication. The test code will use the HWM from the last replication
        # so we'll only receive the objects we modify below
        self.start_new_repl_cycle()
        # add an extra level of grandchildren that hang off a child
        # that got created last time
        new_parent = "OU=test_new_parent,%s" % children[0]
        self.add_object(new_parent)
        new_children = []
        for x in range(0, 50):
            dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
            self.add_object(dn)
            new_children.append(dn)
        # replace half of the links to point to the new children
        for x in range(0, 50):
            self.delete_attribute(la_sources[x], "managedBy", children[x])
            self.modify_object(la_sources[x], "managedBy", new_children[x])
        # add some filler objects to fill up the 1st chunk
        filler = self.create_object_range(0, 100, prefix="filler")
        # modify the new children/targets so they come after the link source
        for x in range(0, 50):
            self.modify_object(new_children[x], "displayName", "OU-%d" % x)
        # modify the parent, so it now comes last in the replication
        self.modify_object(new_parent, "displayName", "OU%d" % x)
        # We should now get the modified objects in the following order:
        # [50 links (x 2)][100 filler][50 new children][new parent]
        # Note that the link sources aren't actually sent (their new linked
        # attributes are sent, but apart from that, nothing has changed)
        all_objects = filler + new_children + [new_parent]
        expected_links = la_sources[:50]
        links_expected = True
        while not self.replication_complete():
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")
        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)
        # Check we received links (50 deleted links and 50 new)
        self.assert_expected_links(expected_links, num_expected=100)
def _repl_integrity_obj_deletion(self, delete_link_source=True):
"""
Tests deleting link objects while a replication is in progress.
"""
# create some objects and link them together, with some filler
# object in between the link sources
la_sources = self.create_object_range(0, 100, prefix="la_source")
la_targets = self.create_object_range(0, 100, prefix="la_targets")
for i in range(0, 50):
self.modify_object(la_sources[i], "managedBy", la_targets[i])
filler = self.create_object_range(0, 100, prefix="filler")
for i in range(50, 100):
self.modify_object(la_sources[i], "managedBy", la_targets[i])
# touch the targets so that the sources get replicated first
for i in range(0, 100):
self.modify_object(la_targets[i], "displayName", "OU%d" % i)
# objects should now be in the following USN order:
# [50 la_source][100 filler][50 la_source][100 la_target]
# Get the first block containing 50 link sources
self.repl_get_next()
# delete either the link targets or link source objects
if delete_link_source:
objects_to_delete = la_sources
# in GET_TGT testenvs we only receive the first 50 source objects
expected_objects = la_sources[:50] + la_targets + filler
else:
objects_to_delete = la_targets
expected_objects = la_sources + filler
for obj in objects_to_delete:
self.ldb_dc2.delete(obj)
# complete the replication
while not self.replication_complete():
self.repl_get_next()
# Check we get all the objects we're expecting
self.assert_expected_data(expected_objects)
# we can't use assert_expected_links() here because it tries to check
# against the deleted objects on the DC. (Although we receive some
# links from the first block processed, the Samba client should end up
# deleting these, as the source/target object involved is deleted)
self.assertEqual(len(self.rxd_links), 50,
"Expected 50 links, not %d" % len(self.rxd_links))
def test_repl_integrity_src_obj_deletion(self):
self._repl_integrity_obj_deletion(delete_link_source=True)
def test_repl_integrity_tgt_obj_deletion(self):
self._repl_integrity_obj_deletion(delete_link_source=False)
def restore_deleted_object(self, guid, new_dn):
"""Re-animates a deleted object"""
guid_str = self._GUID_string(guid)
res = self.test_ldb_dc.search(base="<GUID=%s>" % guid_str,
attrs=["isDeleted"],
controls=['show_deleted:1'],
scope=ldb.SCOPE_BASE)
if len(res) != 1:
return
msg = ldb.Message()
msg.dn = res[0].dn
msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
"isDeleted")
msg["distinguishedName"] = ldb.MessageElement([new_dn],
ldb.FLAG_MOD_REPLACE,
"distinguishedName")
self.test_ldb_dc.modify(msg, ["show_deleted:1"])
def sync_DCs(self, nc_dn=None):
# make sure DC1 has all the changes we've made to DC2
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
nc_dn=nc_dn)
def get_object_guid(self, dn):
res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"],
scope=ldb.SCOPE_BASE)
return res[0]['objectGUID'][0]
def set_dc_connection(self, conn):
"""
Switches over the connection state info that the underlying drs_base
class uses so that we replicate with a different DC.
"""
self.default_hwm = conn.default_hwm
self.default_utdv = conn.default_utdv
self.drs = conn.drs
self.drs_handle = conn.drs_handle
self.set_test_ldb_dc(conn.ldb_dc)
def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
expected_links):
"""
Replicates against both the primary and secondary DCs in the testenv
and checks that both return the expected results.
"""
print("Checking replication against primary test DC...")
# get the replication data from the test DC first
while not self.replication_complete():
self.repl_get_next()
# Check we get all the objects and links we're expecting
self.assert_expected_data(all_objects)
self.assert_expected_links(expected_links)
# switch over the DC state info so we now talk to the peer DC
self.set_dc_connection(peer_conn)
self.init_test_state()
print("Checking replication against secondary test DC...")
# check that we get the same information from the 2nd DC
while not self.replication_complete():
self.repl_get_next()
self.assert_expected_data(all_objects)
self.assert_expected_links(expected_links)
# switch back to using the default connection
self.set_dc_connection(self.default_conn)
def test_repl_integrity_obj_reanimation(self):
"""
Checks receiving links for a re-animated object doesn't lose links.
We test this against the peer DC to make sure it doesn't drop links.
"""
# This test is a little different in that we're particularly interested
# in exercising the replmd client code on the second DC.
# First, make sure the peer DC has the base OU, then connect to it (so
# we store its initial HWM)
self.sync_DCs()
peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
# create the link source/target objects
la_sources = self.create_object_range(0, 100, prefix="la_src")
la_targets = self.create_object_range(0, 100, prefix="la_tgt")
# store the target objects' GUIDs (we need to know these to
# reanimate them)
target_guids = []
for dn in la_targets:
target_guids.append(self.get_object_guid(dn))
# delete the link targets
for x in range(0, 100):
self.ldb_dc2.delete(la_targets[x])
# sync the DCs, then disable replication. We want the peer DC to get
# all the following changes in a single replication cycle
self.sync_DCs()
self._disable_all_repl(self.dnsname_dc2)
# restore the target objects for the linked attributes again
for x in range(0, 100):
self.restore_deleted_object(target_guids[x], la_targets[x])
# add the links
for x in range(0, 100):
self.modify_object(la_sources[x], "managedBy", la_targets[x])
# create some additional filler objects
filler = self.create_object_range(0, 100, prefix="filler")
# modify the targets so they now come last
for x in range(0, 100):
self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
# the objects should now be sent in the following order:
# [la sources + links][filler][la targets]
all_objects = la_sources + la_targets + filler
expected_links = la_sources
# Enable replication again and make sure the 2 DCs are back in sync
self._enable_all_repl(self.dnsname_dc2)
self.sync_DCs()
# Get the replication data from each DC in turn.
# Check that both give us all the objects and links we're expecting,
# i.e. no links were lost
self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
expected_links)
def _test_repl_integrity_cross_partition_links(self, get_tgt=False):
"""
Checks that a cross-partition link to an unknown target object does
not result in missing links.
"""
# check the peer DC is up-to-date, then connect (storing its HWM)
self.sync_DCs()
peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
# stop replication so the peer gets the following objects in one go
self._disable_all_repl(self.dnsname_dc2)
# optionally force the client-side to use GET_TGT locally, by adding a
# one-way link to a missing/deleted target object
if get_tgt:
missing_target = "OU=missing_tgt,%s" % self.ou
self.add_object(missing_target)
get_tgt_source = "CN=get_tgt_src,%s" % self.ou
self.add_object(get_tgt_source,
objectclass="msExchConfigurationContainer")
self.modify_object(get_tgt_source, "addressBookRoots2",
missing_target)
self.test_ldb_dc.delete(missing_target)
# create a link source object in the main NC
la_source = "OU=cross_nc_src,%s" % self.ou
self.add_object(la_source)
# create the link target (a server object) in the config NC
sites_dn = "CN=Sites,%s" % self.config_dn
servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn
rand = random.randint(1, 10000000)
la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn)
self.add_object(la_target, objectclass="server")
# add a cross-partition link between the two
self.modify_object(la_source, "managedBy", la_target)
# First, sync to the peer the NC containing the link source object
self.sync_DCs()
# Now, before the peer has received the partition containing the target
# object, try replicating from the peer. It will only know about half
# of the link at this point, but it should be a valid scenario
self.set_dc_connection(peer_conn)
while not self.replication_complete():
# pretend we've received other link targets out of order and that's
# forced us to use GET_TGT. This checks the peer doesn't fail
# trying to fetch a cross-partition target object that doesn't
# exist
self.repl_get_next(get_tgt=True)
self.set_dc_connection(self.default_conn)
# delete the GET_TGT test object. We're not interested in asserting its
# links - it was just there to make the client use GET_TGT (and it
# creates an inconsistency because one DC correctly ignores the link,
# because it points to a deleted object)
if get_tgt:
self.test_ldb_dc.delete(get_tgt_source)
self.init_test_state()
# Now sync across the partition containing the link target object
self.sync_DCs(nc_dn=self.config_dn)
self._enable_all_repl(self.dnsname_dc2)
# Get the replication data from each DC in turn.
# Check that both return the cross-partition link (note we're not
# checking the config domain NC here for simplicity)
self.assert_DCs_replication_is_consistent(peer_conn,
all_objects=[la_source],
expected_links=[la_source])
# the cross-partition linked attribute has a missing backlink. Check
# that we can still delete it successfully
self.delete_attribute(la_source, "managedBy", la_target)
self.sync_DCs()
res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
attrs=["managedBy"],
controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE)
self.assertFalse("managedBy" in res[0],
"%s in DB still has managedBy attribute" % la_source)
res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
attrs=["managedBy"],
controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE)
self.assertFalse("managedBy" in res[0],
"%s in DB still has managedBy attribute" % la_source)
# Check receiving a cross-partition link to a deleted target.
# Delete the target and make sure the deletion is sync'd between DCs
target_guid = self.get_object_guid(la_target)
self.test_ldb_dc.delete(la_target)
self.sync_DCs(nc_dn=self.config_dn)
self._disable_all_repl(self.dnsname_dc2)
# re-animate the target
self.restore_deleted_object(target_guid, la_target)
self.modify_object(la_source, "managedBy", la_target)
# now sync the link - because the target is in another partition, the
# peer DC receives a link for a deleted target, which it should accept
self.sync_DCs()
res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
attrs=["managedBy"],
controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE)
self.assertTrue("managedBy" in res[0],
"%s in DB missing managedBy attribute" % la_source)
# clean up the server object we created in the Configuration partition
self.test_ldb_dc.delete(la_target)
self._enable_all_repl(self.dnsname_dc2)
def test_repl_integrity_cross_partition_links(self):
self._test_repl_integrity_cross_partition_links(get_tgt=False)
def test_repl_integrity_cross_partition_links_with_tgt(self):
self._test_repl_integrity_cross_partition_links(get_tgt=True)
def test_repl_get_tgt_multivalued_links(self):
"""Tests replication with multi-valued link attributes."""
# create the target/source objects and link them together
la_targets = self.create_object_range(0, 500, prefix="la_tgt")
la_source = "CN=la_src,%s" % self.ou
self.add_object(la_source, objectclass="msExchConfigurationContainer")
for tgt in la_targets:
self.modify_object(la_source, "addressBookRoots2", tgt)
filler = self.create_object_range(0, 100, prefix="filler")
# We should receive the objects/links in the following order:
# [500 targets + 1 source][500 links][100 filler]
expected_objects = la_targets + [la_source] + filler
link_only_chunk = False
# First do the replication without needing GET_TGT
while not self.replication_complete():
ctr6 = self.repl_get_next()
if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
link_only_chunk = True
# we should receive one chunk that contains only links
self.assertTrue(link_only_chunk,
"Expected to receive a chunk containing only links")
# check we received all the expected objects/links
self.assert_expected_data(expected_objects)
self.assert_expected_links([la_source], link_attr="addressBookRoots2",
num_expected=500)
# Do the replication again, forcing the use of GET_TGT this time
self.init_test_state()
for x in range(0, 500):
self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
# The objects/links should get sent in the following order:
# [1 source][500 targets][500 links][100 filler]
while not self.replication_complete():
ctr6 = self.repl_get_next()
self.assertTrue(self.used_get_tgt,
"Test didn't use the GET_TGT flag as expected")
# check we received all the expected objects/links
self.assert_expected_data(expected_objects)
self.assert_expected_links([la_source], link_attr="addressBookRoots2",
num_expected=500)
def test_InvalidNC_DummyDN_InvalidGUID_full_repl(self):
"""Test full replication on a totally invalid GUID fails with the right error code"""
dc_guid_1 = self.ldb_dc1.get_invocation_id()
drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
invocation_id=dc_guid_1,
nc_dn_str="DummyDN",
nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
exop=drsuapi.DRSUAPI_EXOP_NONE,
max_objects=1)
try:
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
self.fail("Expected DsGetNCChanges to fail with WERR_DS_DRA_BAD_NC")
except WERRORError as e1:
(enum, estr) = e1.args
self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
def test_DummyDN_valid_GUID_full_repl(self):
dc_guid_1 = self.ldb_dc1.get_invocation_id()
drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
res = self.ldb_dc1.search(base=self.base_dn, scope=SCOPE_BASE,
attrs=["objectGUID"])
guid = misc.GUID(res[0]["objectGUID"][0])
req8 = self._exop_req8(dest_dsa=None,
invocation_id=dc_guid_1,
nc_dn_str="DummyDN",
nc_guid=guid,
replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
exop=drsuapi.DRSUAPI_EXOP_NONE,
max_objects=1)
try:
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
except WERRORError as e1:
(enum, estr) = e1.args
self.fail(f"Failed to call GetNCChanges with DummyDN and a GUID: {estr}")
# The NC should be the first object returned due to GET_ANC
self.assertEqual(ctr.first_object.object.identifier.guid, guid)
def _test_do_full_repl_no_overlap(self, mix=True, get_anc=False):
self.default_hwm = drsuapi.DsReplicaHighWaterMark()
# With get_anc=True we can assert the BASE DN will be the first
# object; without it we rely on the NC always being sent first
ctr6 = self._repl_send_request(get_anc=get_anc)
guid_list_1 = self._get_ctr6_object_guids(ctr6)
if mix:
dc_guid_1 = self.ldb_dc1.get_invocation_id()
req8 = self._exop_req8(dest_dsa=None,
invocation_id=dc_guid_1,
nc_dn_str=self.ldb_dc1.get_default_basedn(),
exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
(level, ctr_repl_obj) = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
self.assertEqual(ctr_repl_obj.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
repl_obj_guid_list = self._get_ctr6_object_guids(ctr_repl_obj)
self.assertEqual(len(repl_obj_guid_list), 1)
# This should be the first object in the main replication, either
# because of get_anc=True above or because of the rule that the
# NC must come first regardless
self.assertEqual(repl_obj_guid_list[0], guid_list_1[0])
self.last_ctr = ctr6
ctr6 = self._repl_send_request(get_anc=True)
guid_list_2 = self._get_ctr6_object_guids(ctr6)
self.assertNotEqual(guid_list_1, guid_list_2)
def test_do_full_repl_no_overlap_get_anc(self):
"""
Make sure that a full replication of an NC runs to completion despite
needing multiple passes
"""
self._test_do_full_repl_no_overlap(mix=False, get_anc=True)
def test_do_full_repl_no_overlap(self):
"""
Make sure that a full replication of an NC runs to completion despite
needing multiple passes
"""
self._test_do_full_repl_no_overlap(mix=False)
def test_do_full_repl_mix_no_overlap(self):
"""
Make sure that a full replication of an NC runs to completion despite
needing multiple passes.
Assert this is true even if we do a REPL_OBJ in between the replications
"""
self._test_do_full_repl_no_overlap(mix=True)
def nc_change(self):
old_base_msg = self.default_conn.ldb_dc.search(base=self.base_dn,
scope=SCOPE_BASE,
attrs=["oEMInformation"])
rec_cleanup = {"dn": self.base_dn,
"oEMInformation": old_base_msg[0]["oEMInformation"][0]}
m_cleanup = ldb.Message.from_dict(self.default_conn.ldb_dc,
rec_cleanup,
ldb.FLAG_MOD_REPLACE)
self.addCleanup(self.default_conn.ldb_dc.modify, m_cleanup)
rec = {"dn": self.base_dn,
"oEMInformation": f"Tortured by Samba's getncchanges.py {self.id()} against {self.default_conn.dnsname_dc}"}
m = ldb.Message.from_dict(self.default_conn.ldb_dc, rec, ldb.FLAG_MOD_REPLACE)
self.default_conn.ldb_dc.modify(m)
def _test_repl_nc_is_first(self, start_at_zero=True, nc_change=True, ou_change=True, mid_change=False):
"""Tests that the NC is always replicated first, but does not move the
tmp_highest_usn at that point, just like 'early' GET_ANC objects.
"""
# create objects, more than twice the page size of 133
objs = self.create_object_range(0, 300, prefix="obj")
if nc_change:
self.nc_change()
if mid_change:
# create even more objects
objs = self.create_object_range(301, 450, prefix="obj2")
base_msg = self.default_conn.ldb_dc.search(base=self.base_dn,
scope=SCOPE_BASE,
attrs=["uSNChanged",
"objectGUID"])
base_guid = misc.GUID(base_msg[0]["objectGUID"][0])
base_usn = int(base_msg[0]["uSNChanged"][0])
if ou_change:
# Make one more modification. We want to assert we have
# caught up to the base DN, but Windows both promotes the NC
# to the front and skips including it in the tmp_highest_usn,
# so we make a later modification to show that we do receive
# this change.
rec = {"dn": self.ou,
"postalCode": "0"}
m = ldb.Message.from_dict(self.default_conn.ldb_dc, rec, ldb.FLAG_MOD_REPLACE)
self.default_conn.ldb_dc.modify(m)
ou_msg = self.default_conn.ldb_dc.search(base=self.ou,
scope=SCOPE_BASE,
attrs=["uSNChanged",
"objectGUID"])
ou_guid = misc.GUID(ou_msg[0]["objectGUID"][0])
ou_usn = int(ou_msg[0]["uSNChanged"][0])
# Check some predicates about USN ordering that the below tests will rely on
if ou_change and nc_change:
self.assertGreater(ou_usn, base_usn)
elif not ou_change and nc_change:
self.assertGreater(base_usn, ou_usn)
ctr6 = self.repl_get_next()
guid_list_1 = self._get_ctr6_object_guids(ctr6)
if nc_change or start_at_zero:
self.assertEqual(base_guid, misc.GUID(guid_list_1[0]))
self.assertIn(str(base_guid), guid_list_1)
self.assertNotIn(str(base_guid), guid_list_1[1:])
else:
self.assertNotEqual(base_guid, misc.GUID(guid_list_1[0]))
self.assertNotIn(str(base_guid), guid_list_1)
self.assertTrue(ctr6.more_data)
if not ou_change and nc_change:
self.assertLess(ctr6.new_highwatermark.tmp_highest_usn, base_usn)
i = 0
while not self.replication_complete():
i = i + 1
last_tmp_highest_usn = ctr6.new_highwatermark.tmp_highest_usn
ctr6 = self.repl_get_next()
guid_list_2 = self._get_ctr6_object_guids(ctr6)
if len(guid_list_2) > 0:
self.assertNotEqual(last_tmp_highest_usn, ctr6.new_highwatermark.tmp_highest_usn)
if (nc_change or start_at_zero) and base_usn > last_tmp_highest_usn:
self.assertEqual(base_guid, misc.GUID(guid_list_2[0]),
f"pass={i} more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")
self.assertIn(str(base_guid), guid_list_2,
f"pass={i} more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")
else:
self.assertNotIn(str(base_guid), guid_list_2,
f"pass={i} more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")
if ou_change:
# The modification to the base OU should be in the final chunk
self.assertIn(str(ou_guid), guid_list_2)
self.assertGreaterEqual(ctr6.new_highwatermark.highest_usn,
ou_usn)
else:
# Show that the NC root change does not show up in the
# highest_usn. We either get the change before or after
# it.
self.assertNotEqual(ctr6.new_highwatermark.highest_usn,
base_usn)
self.assertEqual(ctr6.new_highwatermark.highest_usn,
ctr6.new_highwatermark.tmp_highest_usn)
self.assertFalse(ctr6.more_data)
def test_repl_nc_is_first_start_zero_nc_change(self):
self.default_hwm = drsuapi.DsReplicaHighWaterMark()
self._test_repl_nc_is_first(start_at_zero=True, nc_change=True, ou_change=True)
def test_repl_nc_is_first_start_zero(self):
# Get the NC change in the middle of the replication stream, certainly not at the start or end
self.nc_change()
self.default_hwm = drsuapi.DsReplicaHighWaterMark()
self._test_repl_nc_is_first(start_at_zero=True, nc_change=False, ou_change=False)
def test_repl_nc_is_first_mid(self):
# This is a modification of the next test, that Samba
# will pass as it will always include the NC in the
# tmp_highest_usn at the point where it belongs
self._test_repl_nc_is_first(start_at_zero=False,
nc_change=True,
ou_change=True,
mid_change=True)
def test_repl_nc_is_first(self):
# This is a modification of the next test, that Samba
# will pass as it will always include the NC in the
# tmp_highest_usn at the point where it belongs
self._test_repl_nc_is_first(start_at_zero=False, nc_change=True, ou_change=True)
def test_repl_nc_is_first_nc_change_only(self):
# This shows that the NC change is not reflected in the tmp_highest_usn
self._test_repl_nc_is_first(start_at_zero=False, nc_change=True, ou_change=False)
def test_repl_nc_is_first_no_change(self):
# The NC should not be present in this replication
self._test_repl_nc_is_first(start_at_zero=False, nc_change=False, ou_change=False)
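

# Illustrative sketch only (hypothetical names, no DRSUAPI calls): the
# "request chunks until more_data is False" loop that the tests above drive
# through repl_get_next()/replication_complete(), modelled in plain Python.
# It assumes a server that returns objects in USN order and reports the
# highest USN it sent as the new high-water mark. Real servers may also
# promote the NC root or GET_ANC ancestors ahead of that order, which this
# model deliberately ignores.
def _paged_fetch_example(usns, chunk_size):
    """Return the chunks a client would receive, resuming from each HWM."""
    pending = sorted(usns)
    chunks = []
    hwm = 0
    more_data = True
    while more_data:
        # only objects changed after the high-water mark are still needed
        todo = [usn for usn in pending if usn > hwm]
        chunk = todo[:chunk_size]
        more_data = len(todo) > chunk_size
        if chunk:
            # the next request resumes after the last USN we received
            hwm = chunk[-1]
        chunks.append(chunk)
    return chunks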
class DrsReplicaSyncFakeAzureAdTests(DrsReplicaSyncIntegrityTestCase):
"""This repeats all of DrsReplicaSyncIntegrityTestCase, but the client
always sets highwatermark.reserved_usn = 0. This is what Azure AD
/ Entra ID Connect does.
"""
@staticmethod
def modify_highwatermark(hwm):
if hwm is not None:
hwm.reserved_usn = 0
SKIPPED_TESTS = {
"test_repl_get_tgt",
"test_repl_get_tgt_and_anc",
"test_repl_get_tgt_chain",
"test_do_full_repl_mix_no_overlap",
"test_do_full_repl_no_overlap",
"test_do_full_repl_no_overlap_get_anc",
"test_DummyDN_valid_GUID_full_repl",
"test_InvalidNC_DummyDN_InvalidGUID_full_repl",
"test_repl_get_anc_link_attr",
"test_repl_integrity_cross_partition_links",
"test_repl_integrity_cross_partition_links_with_tgt",
"test_repl_integrity_get_anc",
"test_repl_integrity_obj_reanimation",
"test_repl_integrity_src_obj_deletion",
"test_repl_integrity_tgt_obj_deletion",
"test_repl_nc_is_first",
"test_repl_nc_is_first_mid",
"test_repl_nc_is_first_nc_change_only", # xfail in parent
"test_repl_nc_is_first_no_change",
"test_repl_nc_is_first_start_zero",
"test_repl_nc_is_first_start_zero_nc_change",
}
def setUp(self):
# Only some tests behave any differently with the zeroed
# reserved_usn. The getncchanges tests are quite slow, so it
# is worth skipping unnecessary tests.
#
# If you think a test is worth running here, add it to the
# list.
testname = self.id().rsplit(".", 1)[1]
if testname in self.SKIPPED_TESTS:
self.skipTest("Probably not affected by reserved_usn = 0")
super().setUp()
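

# Illustrative sketch only (not part of the test suite): the name-based skip
# used in setUp() above, reduced to a standalone unittest case. The class,
# test names and skip reason are hypothetical. The example class is defined
# inside a function so that test discovery does not pick it up.
def _skip_by_name_example():
    """Run two tiny tests, one of which is skipped by name in setUp()."""
    import unittest

    class Example(unittest.TestCase):
        SKIPPED_TESTS = {"test_slow"}

        def setUp(self):
            # self.id() is "module.Class.test_name"; keep the last part
            testname = self.id().rsplit(".", 1)[1]
            if testname in self.SKIPPED_TESTS:
                self.skipTest("skipped by name")
            super().setUp()

        def test_fast(self):
            self.assertTrue(True)

        def test_slow(self):
            self.fail("never reached: setUp() skips this test")

    suite = unittest.TestSuite([Example("test_fast"), Example("test_slow")])
    result = unittest.TestResult()
    suite.run(result)
    return result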
class DcConnection:
"""Helper class to track a connection to another DC"""
def __init__(self, drs_base, ldb_dc, dnsname_dc):
self.ldb_dc = ldb_dc
(self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
(self.default_hwm, utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
self.default_utdv = utdv
self.dnsname_dc = dnsname_dc