# -*- coding: utf-8 -*-
# Copyright (C) 2006-2011 Vodafone España, S.A.
# Copyright (C) 2008-2009 Warp Networks, S.L.
# Author: Pablo Martí
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Wrapper around :class:`~core.protocol.WCDMAProtocol`
It basically provides error control and more high-level operations.
N-tier folks can see this as a Business Logic class.
"""
from collections import deque
import dbus
import serial
from gobject import timeout_add_seconds, source_remove
from math import floor
from time import time
from twisted.python import log
from twisted.internet import defer, reactor, task
import wader.common.aterrors as E
from wader.common.consts import (WADER_SERVICE, MDM_INTFACE, CRD_INTFACE,
NET_INTFACE, USD_INTFACE, MM_NETWORK_BAND_ANY,
MM_NETWORK_MODE_ANY, MM_NETWORK_MODE_LAST,
MM_MODEM_STATE_DISABLED,
MM_MODEM_STATE_ENABLING,
MM_MODEM_STATE_ENABLED,
MM_MODEM_STATE_SEARCHING,
MM_MODEM_STATE_REGISTERED,
MM_MODEM_STATE_DISCONNECTING,
MM_MODEM_STATE_CONNECTING,
MM_MODEM_STATE_CONNECTED,
MM_GSM_ACCESS_TECH_GSM_COMPAT,
MM_GSM_ACCESS_TECH_GPRS,
MM_GSM_ACCESS_TECH_EDGE,
MM_GSM_ACCESS_TECH_UMTS,
MM_GSM_ACCESS_TECH_HSDPA,
MM_GSM_ACCESS_TECH_HSUPA,
MM_GSM_ACCESS_TECH_HSPA,
MM_GSM_ACCESS_TECH_HSPA_PLUS,
MM_GSM_ACCESS_TECH_LTE,
MM_IP_METHOD_PPP)
from wader.common.encoding import (from_ucs2, from_u, unpack_ucs2_bytes,
pack_ucs2_bytes, check_if_ucs2, LATIN_EX_B,
from_8bit_in_gsm_or_ts31101)
import wader.common.exceptions as ex
from wader.common.signals import SIG_CREG
from wader.common.sms import Message
from wader.common.utils import rssi_to_percentage
from core.contact import Contact
from core.mal import MessageAssemblyLayer
from core.mms import (send_m_send_req, send_m_notifyresp_ind,
get_payload)
from core.oal import get_os_object
from core.protocol import WCDMAProtocol
from core.sim import (COM_READ_BINARY, EF_AD, EF_SPN, EF_ICCID, SW_OK,
RETRY_ATTEMPTS, RETRY_TIMEOUT)
class WCDMAWrapper(WCDMAProtocol):
"""
I am a wrapper around :class:`~core.protocol.WCDMAProtocol`
My main objective is to provide error control on some operations
and a cleaner API to deal with their results.
"""
def __init__(self, device):
super(WCDMAWrapper, self).__init__(device)
# unfortunately some methods require me to save some state
# between runs. This dict contains 'em.
self.state_dict = {}
# message assembly layer (initialized on do_enable_device)
self.mal = MessageAssemblyLayer(self)
self.signal_matchs = []
# timeout_add_seconds task ID
self.__time = 0
self.__rx_bytes = 0
self.__tx_bytes = 0
self.stats_id = None
# iface name
self.iface = None
self.apn_range = None
self.caches = {
'signal': (0, 0),
'registration': (0, (0, '', '')),
}
def maybecached(self, name, fn, cblist):
if self.caches[name][0] >= time(): # cache hasn't expired
log.msg("get '%s' (cached path)" % name, system='CACHE')
d = defer.succeed(self.caches[name][1])
else:
log.msg("get '%s' (noncached path)" % name, system='CACHE')
d = fn()
for cb in cblist:
d.addCallback(cb)
d.addCallback(self.updatecache, name)
return d
def updatecache(self, value, name):
CACHETIME = 5
self.caches[name] = (time() + CACHETIME, value)
return value
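# A minimal, synchronous sketch of the expiry-based caching that
# maybecached/updatecache implement above. ``TimedCache``, its injectable
# ``clock`` and the ``fetch`` callable are hypothetical names used only for
# illustration; the real methods work on Twisted Deferreds and also run the
# callbacks in ``cblist`` on the non-cached path.

```python
import time

CACHETIME = 5  # seconds; same lifetime that updatecache uses


class TimedCache(object):
    """Keep one (expiry timestamp, value) pair per name."""

    def __init__(self, clock=time.time):
        # ``clock`` is injectable so the expiry logic can be exercised
        # without sleeping (an assumption of this sketch)
        self._clock = clock
        self._entries = {}

    def get(self, name, fetch):
        expiry, value = self._entries.get(name, (0, None))
        if expiry >= self._clock():
            # cache hasn't expired: reuse the stored value
            return value
        # expired or never fetched: call ``fetch`` and restart the timer
        value = fetch()
        self._entries[name] = (self._clock() + CACHETIME, value)
        return value
```

# With a fake clock, two reads inside the 5 second window trigger a single
# ``fetch`` call, and a read after the window triggers a second one.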
def emit_network_mode(self, value):
"""
Validates ``value``, converts it to a dbus UInt32 and emits it
"""
if value < 0 or value > MM_NETWORK_MODE_LAST:
return
self.device.exporter.NetworkMode(dbus.UInt32(value))
def emit_rssi(self, value):
"""
Validates ``value``, converts it to a dbus UInt32 and emits it
"""
if value < 0 or value > 100:
return
self.device.exporter.SignalQuality(dbus.UInt32(value))
def connect_to_signals(self):
bus = dbus.SystemBus()
device = bus.get_object(WADER_SERVICE, self.device.sconn.device.opath)
sm = device.connect_to_signal(SIG_CREG, self.on_creg_cb)
self.signal_matchs.append(sm)
def clean_signals(self):
while self.signal_matchs:
sm = self.signal_matchs.pop()
sm.remove()
def __str__(self):
return self.device.__remote_name__
def acknowledge_mms(self, index, extra_info):
"""
Acknowledges the Mms identified by ``index`` using ``extra_info``
"""
return self.mal.acknowledge_mms(index, extra_info)
def do_acknowledge_mms(self, index, extra_info):
if 'wap2' not in extra_info:
raise ValueError("Only WAP2.0 is supported at the moment")
if 'mmsc' not in extra_info:
raise ValueError("No mmsc key in %s" % extra_info)
try:
notification = self.mal.wap_map[index].get_last_notification()
except IndexError:
raise E.ExpiredNotification("Could not find "
"notification %d" % index)
d = send_m_notifyresp_ind(extra_info,
notification.headers['Transaction-Id'])
return d
def add_contact(self, contact):
"""
Adds ``contact`` to the SIM and returns the index where it was stored
"""
ucs2 = 'UCS2' in self.device.sim.charset
name = pack_ucs2_bytes(contact.name) if ucs2 else from_u(contact.name)
# common arguments for both operations (name and number)
args = [name, from_u(contact.number)]
if contact.index:
# contact.index is set, user probably wants to overwrite an
# existing contact
args.append(contact.index)
d = super(WCDMAWrapper, self).add_contact(*args)
d.addCallback(lambda _: contact.index)
return d
def get_next_id_cb(index):
args.append(index)
d2 = super(WCDMAWrapper, self).add_contact(*args)
# now we just fake add_contact's response and we return the index
d2.addCallback(lambda _: index)
return d2
# contact.index is not set, this means that we need to obtain the
# first free slot on the phonebook and then add the contact
d = self._get_next_contact_id()
d.addCallback(get_next_id_cb)
return d
def cancel_ussd(self):
"""Cancels an ongoing USSD session"""
d = super(WCDMAWrapper, self).cancel_ussd()
def set_idle(result):
self.device.set_property(USD_INTFACE, 'State', 'idle')
return result[0].group('resp')
d.addCallback(set_idle)
return d
def change_pin(self, oldpin, newpin):
"""Changes PIN from ``oldpin`` to ``newpin``"""
d = super(WCDMAWrapper, self).change_pin(oldpin, newpin)
d.addCallback(lambda result: result[0].group('resp'))
return d
def check_pin(self):
"""
Returns the SIM's auth state
:raise SimPinRequired: Raised if SIM PIN is required
:raise SimPukRequired: Raised if SIM PUK is required
:raise SimPuk2Required: Raised if SIM PUK2 is required
"""
if self.device.quirks.get('needs_enable_before_pin_check', False):
# Note: this device needs to be enabled before pin can be checked
d = self.enable_radio(True)
d.addCallback(lambda x: super(WCDMAWrapper, self).check_pin())
else:
d = super(WCDMAWrapper, self).check_pin()
def process_result(resp):
result = resp[0].group('resp')
if result == 'READY':
return result
elif result == 'SIM PIN':
raise E.SimPinRequired()
elif result == 'SIM PUK':
raise E.SimPukRequired()
elif result == 'SIM PUK2':
raise E.SimPuk2Required()
else:
log.err("unknown authentication state %s" % result)
d.addCallback(process_result)
return d
def delete_contact(self, index):
"""Deletes contact at ``index``"""
d = super(WCDMAWrapper, self).delete_contact(index)
d.addCallback(lambda result: result[0].group('resp'))
return d
def delete_sms(self, index):
return self.mal.delete_sms(index)
def do_delete_sms(self, index):
"""Deletes SMS at ``index``"""
d = super(WCDMAWrapper, self).delete_sms(index)
d.addCallback(lambda result: result[0].group('resp'))
return d
def download_mms(self, index, extra_info):
"""Downloads the Mms identified by ``index``"""
return self.mal.download_mms(index, extra_info)
def do_download_mms(self, notification, extra_info):
uri = notification.headers['Content-Location']
return get_payload(uri, extra_info)
def disable_echo(self):
"""Disables echo"""
d = super(WCDMAWrapper, self).disable_echo()
d.addCallback(lambda result: result[0].group('resp'))
return d
def enable_pin(self, pin, enable):
"""
Enables or disables PIN auth with ``pin`` according to ``enable``
"""
def cache_and_return_response(response):
self.device.set_property(CRD_INTFACE, 'PinEnabled', enable)
return response[0].group('resp')
d = super(WCDMAWrapper, self).enable_pin(pin, enable)
d.addCallback(cache_and_return_response)
return d
def enable_echo(self):
"""
Enables echo
Use this with caution as it might leave Wader in an unusable state
"""
d = super(WCDMAWrapper, self).enable_echo()
d.addCallback(lambda result: result[0].group('resp'))
return d
def find_contacts(self, pattern):
"""
Returns all the `Contact` objects whose name matches ``pattern``
:rtype: list
"""
if 'UCS2' in self.device.sim.charset:
pattern = pack_ucs2_bytes(pattern)
d = super(WCDMAWrapper, self).find_contacts(pattern)
d.addCallback(lambda matches: map(self._regexp_to_contact, matches))
return d
def get_apns(self):
"""
Returns all the APNs in the SIM
:rtype: list
"""
d = super(WCDMAWrapper, self).get_apns()
d.addCallback(lambda resp:
[(int(r.group('index')), r.group('apn')) for r in resp])
return d
def get_apn_range(self):
"""
Returns the range of context IDs in the SIM
:rtype: tuple
"""
d = super(WCDMAWrapper, self).get_apn_range()
d.addCallback(lambda r: (int(r.group('lo4')), int(r.group('hi4'))))
return d
def get_band(self):
"""Returns the current band used"""
raise NotImplementedError()
def get_bands(self):
"""
Returns the available bands
:rtype: list
"""
bands = self.custom.band_dict.keys()
if MM_NETWORK_BAND_ANY in bands:
# bands is a list, so remove by value (list.pop takes an index)
bands.remove(MM_NETWORK_BAND_ANY)
# cast it to UInt32
return defer.succeed(dbus.UInt32(sum(bands)))
def get_card_model(self):
"""Returns the card model"""
d = super(WCDMAWrapper, self).get_card_model()
d.addCallback(lambda response: response[0].group('model'))
return d
def get_card_version(self):
"""Returns the firmware version"""
d = super(WCDMAWrapper, self).get_card_version()
d.addCallback(lambda response: response[0].group('version'))
return d
def get_charset(self):
"""Returns the current charset"""
d = super(WCDMAWrapper, self).get_charset()
d.addCallback(lambda response: response[0].group('lang'))
return d
def get_charsets(self):
"""
Returns the available charsets
:rtype: list
"""
d = super(WCDMAWrapper, self).get_charsets()
d.addCallback(lambda resp: [match.group('lang') for match in resp])
return d
def get_contact(self, index):
"""Returns the contact at ``index``"""
d = super(WCDMAWrapper, self).get_contact(index)
d.addCallback(lambda match: self._regexp_to_contact(match[0]))
return d
def get_hardware_info(self):
"""Returns the manufacturer name, card model and firmware version"""
dlist = [self.get_manufacturer_name(),
self.get_card_model(),
self.get_card_version()]
return defer.gatherResults(dlist)
def sim_access_restricted(self, command, fileid=None, p1=None,
p2=None, p3=None, data=None, pathid=None):
d = super(WCDMAWrapper, self).sim_access_restricted(
command, fileid, p1, p2, p3, data, pathid)
def cb(response):
try:
sw1 = int(response[0].group('sw1'))
except (IndexError, TypeError, ValueError):
sw1 = None
try:
sw2 = int(response[0].group('sw2'))
except (IndexError, TypeError, ValueError):
sw2 = None
if (sw1 not in SW_OK) or (sw1 == 0x90 and sw2 != 0):
# Status error.
raise E.General('Bad Status response for AT+CRSM')
data = response[0].group('response')
return (sw1, sw2, data)
d.addCallback(cb)
return d
def get_iccid(self):
"""Returns ICC identification number"""
d = self.sim_access_restricted(COM_READ_BINARY, EF_ICCID, 0, 0, 10)
def get_iccid_cb(response):
sw1, sw2, data = response
if data is None:
raise E.General('ICCID not available')
if len(data) != 20:
raise E.General('ICCID length not correct')
# Parse BCD F padded string.
result = ''
i = 0
while (i + 1 < len(data)):
msd = data[i]
lsd = data[i + 1]
i += 2
if lsd in ['f', 'F']:
break
result += lsd
if msd in ['f', 'F']:
break
result += msd
return result
d.addCallback(get_iccid_cb)
return d
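# The BCD parsing done by get_iccid_cb above can be sketched as a standalone
# function. ``decode_iccid`` is a hypothetical name, the sample strings in
# the usage note are made up for illustration, and ValueError stands in for
# E.General so that the sketch has no Wader dependency.

```python
def decode_iccid(data):
    """Decode a 20-character, F-padded, nibble-swapped BCD hex string."""
    if data is None or len(data) != 20:
        raise ValueError("expected 20 hex characters")

    digits = []
    for i in range(0, len(data), 2):
        msd, lsd = data[i], data[i + 1]
        # each byte is stored with its two digits swapped, so the second
        # character of the pair comes first in the decoded number
        if lsd in ('f', 'F'):
            break
        digits.append(lsd)
        # an 'F' in the first position pads an odd-length number
        if msd in ('f', 'F'):
            break
        digits.append(msd)

    return ''.join(digits)
```

# For instance, decode_iccid("981014301211811570F2") swaps every pair and
# stops at the F padding, giving the 19-digit string "8901410321111851072".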
def get_imei(self):
"""Returns the IMEI"""
d = super(WCDMAWrapper, self).get_imei()
d.addCallback(lambda response: response[0].group('imei'))
return d
def get_imsi(self):
"""Returns the IMSI"""
d = super(WCDMAWrapper, self).get_imsi()
d.addCallback(lambda response: response[0].group('imsi'))
return d
def get_ip4_config(self):
"""Returns the IP4Config info related to IpMethod"""
raise NotImplementedError()
def get_manufacturer_name(self):
"""Returns the manufacturer name"""
d = super(WCDMAWrapper, self).get_manufacturer_name()
d.addCallback(lambda response: response[0].group('name'))
return d
def _get_netreg_info(self, status):
# Ugly but it works. The naive approach with DeferredList won't work
# as the call order is not guaranteed
resp = [status]
def get_netinfo_cb(info):
new = info[1]
cur = self.device.get_property(NET_INTFACE, 'AccessTechnology')
# Don't stamp on the value provided by a richer method
if (new == MM_GSM_ACCESS_TECH_GPRS and
cur == MM_GSM_ACCESS_TECH_EDGE) or \
(new == MM_GSM_ACCESS_TECH_UMTS and
cur in [MM_GSM_ACCESS_TECH_HSDPA,
MM_GSM_ACCESS_TECH_HSUPA,
MM_GSM_ACCESS_TECH_HSPA,
MM_GSM_ACCESS_TECH_HSPA_PLUS]):
self.device.set_property(NET_INTFACE, 'AccessTechnology', cur)
else:
self.device.set_property(NET_INTFACE, 'AccessTechnology', new)
return resp.append(info[0])
def get_netinfo_eb(failure):
failure.trap(E.NoNetwork, ex.LimitedServiceNetworkError)
resp.append('')
d = self.get_network_info('numeric')
d.addCallback(get_netinfo_cb)
d.addErrback(get_netinfo_eb)
d.addCallback(lambda _: self.get_network_info('name'))
d.addCallback(get_netinfo_cb)
d.addErrback(get_netinfo_eb)
d.addCallback(lambda _: tuple(resp))
return d
def _get_netreg_info_emit(self, _reginfo):
"""Convert type and emit RegistrationInfo signal"""
if self.device.status <= MM_MODEM_STATE_REGISTERED:
if _reginfo[0] in [1, 5]:
self.device.set_status(MM_MODEM_STATE_REGISTERED)
else:
self.device.set_status(MM_MODEM_STATE_SEARCHING)
reginfo = (dbus.UInt32(_reginfo[0]), _reginfo[1], _reginfo[2])
self.device.exporter.RegistrationInfo(*reginfo)
return reginfo
def get_netreg_info(self):
"""Get the registration status and the current operator"""
return self.maybecached('registration', self.get_netreg_status,
[lambda info: info[1], self._get_netreg_info,
self._get_netreg_info_emit])
def on_creg_cb(self, status):
"""Callback for +CREG notifications"""
d = defer.succeed(status)
d.addCallback(self._get_netreg_info)
d.addCallback(self._get_netreg_info_emit)
d.addCallback(self.updatecache, 'registration')
return d
def get_netreg_status(self):
"""Returns a tuple with the network registration status"""
d = super(WCDMAWrapper, self).get_netreg_status()
def convert_cb(resp):
# convert them to int
return int(resp[0].group('mode')), int(resp[0].group('status'))
d.addCallback(convert_cb)
return d
def get_network_info(self, _type=None):
"""
Returns the network info (a.k.a AT+COPS?)
The response is an (OperatorName, ConnectionType) tuple. A
(None, None) response means that some error occurred while
obtaining the info; the class that requested the info is
responsible for retrying in that case. This method will convert
numeric network IDs to alphanumeric.
"""
d = super(WCDMAWrapper, self).get_network_info(_type)
def get_net_info_cb(netinfo):
"""
Returns a (Networkname, ConnType) tuple
It returns None if there's no info
"""
if not netinfo:
return None
netinfo = netinfo[0]
if netinfo.group('error'):
# this means that we've received a response like
# +COPS: 0 which means that we temporarily don't have network
# we should raise an exception here
raise E.NoNetwork()
# TS 27007 got updated as of 10.4
_map = {
'0': MM_GSM_ACCESS_TECH_GPRS, # strictly GSM
'1': MM_GSM_ACCESS_TECH_GSM_COMPAT,
'2': MM_GSM_ACCESS_TECH_UMTS, # strictly UTRAN
'3': MM_GSM_ACCESS_TECH_EDGE,
'4': MM_GSM_ACCESS_TECH_HSDPA,
'5': MM_GSM_ACCESS_TECH_HSUPA,
'6': MM_GSM_ACCESS_TECH_HSPA,
'7': MM_GSM_ACCESS_TECH_LTE,
}
conn_type = _map.get(netinfo.group('status'))
netname = netinfo.group('netname')
if netname in ['Limited Service',
pack_ucs2_bytes('Limited Service')]:
raise ex.LimitedServiceNetworkError
# netname can be in UCS2, as a string, or as a network id (int)
if check_if_ucs2(netname):
return unpack_ucs2_bytes(netname), conn_type
else:
# now can be either a string or a network id (int)
try:
netname = int(netname)
except ValueError:
# we got a string ID
return netname, conn_type
# if we have arrived here, that means that the network id
# is a five, six or seven digit integer
return str(netname), conn_type
d.addCallback(get_net_info_cb)
return d
def get_network_mode(self):
"""Returns the current network mode"""
raise NotImplementedError()
def get_network_modes(self):
"""Returns the supported network modes"""
modes = self.custom.conn_dict.copy()
if MM_NETWORK_MODE_ANY in modes:
modes.pop(MM_NETWORK_MODE_ANY)
return defer.succeed(sum(modes.keys()))
def get_network_names(self):
"""
Performs a network search
:rtype: list of :class:`NetworkOperator`
"""
d = super(WCDMAWrapper, self).get_network_names()
d.addCallback(lambda resp:
[NetworkOperator(*match.groups()) for match in resp])
return d
def _get_free_contact_ids(self):
"""Returns a deque with the not used contact ids"""
def list_contacts_cb(contacts):
if not contacts:
return deque(range(1, self.device.sim.size))
busy_ids = [contact.index for contact in contacts]
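# set difference: a busy id outside the range must never be
# reported as free; sort so the first free slot comes first
free = set(range(1, self.device.sim.size)) - set(busy_ids)
return deque(sorted(free))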
def list_contacts_eb(failure):
failure.trap(E.NotFound, E.General)
return deque(range(1, self.device.sim.size))
d = self.list_contacts()
d.addCallbacks(list_contacts_cb, list_contacts_eb)
return d
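# A standalone sketch of the free-slot computation performed in
# list_contacts_cb. ``free_contact_ids`` is a hypothetical helper; using set
# difference and sorting the result is an assumption about the intended
# behaviour (lowest free slot first), and the ``range(1, size)`` bound is
# copied as-is from the method above.

```python
from collections import deque


def free_contact_ids(size, busy_ids):
    """Return a deque with the unused phonebook ids, lowest first."""
    candidates = set(range(1, size))
    # drop every id that is already taken; busy ids outside the candidate
    # range are simply ignored
    free = candidates - set(busy_ids)
    return deque(sorted(free))
```

# free_contact_ids(6, [1, 3]).popleft() yields 2, the first unused slot.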
def _get_next_contact_id(self):
"""
Returns the next unused contact id
Provides some error control and won't fail if sim.size
is None as the card might be a bit difficult
"""
def do_get_it():
d = self._get_free_contact_ids()
d.addCallback(lambda free: free.popleft())
return d
if self.device.sim.size and self.device.sim.size != 0:
return do_get_it()
deferred = defer.Deferred()
self.state_dict['phonebook_retries'] = 0
def get_it(auxdef=None):
def get_phonebook_size_cb(size):
self.device.sim.size = size
d = do_get_it()
d.chainDeferred(deferred)
def get_phonebook_size_eb(failure):
self.state_dict['phonebook_retries'] += 1
if self.state_dict['phonebook_retries'] > RETRY_ATTEMPTS:
raise RuntimeError("Could not obtain phonebook size")
reactor.callLater(RETRY_TIMEOUT, get_it, auxdef)
d = self.get_phonebook_size()
d.addCallback(get_phonebook_size_cb)
d.addErrback(get_phonebook_size_eb)
return auxdef
return get_it(deferred)
def get_operator_id(self):
"""
Returns the ID of the network operator that issued the SIM card,
formatted as a 5 or 6-digit MCC/MNC code (e.g. "310410").
:raise General: When MCC+MNC cannot be retrieved.
"""
d = defer.Deferred()
# Another way to handle global variables.
d.imsi = None
d.mnc_length = None
def get_op_id_eb(failure):
log.msg("get_operator_id FAILURE: %s" % failure.value)
failure.raiseException()
d.addErrback(get_op_id_eb)
d_mnc = self.sim_access_restricted(COM_READ_BINARY, EF_AD, 0, 0, 4)
def get_op_id_mnc_digits_cb(response):
sw1, sw2, number = response
if number is None or len(number) < 8:
raise E.General('Bad MNC length response')
number = int(number[6:8], 16)
if number in range(2, 5):
# We got MNC number of digits right.
return number
else:
raise E.General('Incorrect MNC length')
def get_op_id_mnc_digits_eb(failure):
log.msg("get_operator_id mnc_digits FAILURE %s" % failure.value)
failure.raiseException()
d_mnc.addCallback(get_op_id_mnc_digits_cb)
d_mnc.addCallback(d.callback)
d_mnc.addErrback(get_op_id_mnc_digits_eb)
d_mnc.addErrback(d.errback)
def store_mnc_length(mnc_length):
self.mnc_length = mnc_length
d_imsi = self.get_imsi()
d_imsi.addErrback(get_op_id_imsi_eb)
d_imsi.addErrback(d.errback)
return d_imsi
d.addCallback(store_mnc_length)
def get_op_id_imsi_eb(failure):
log.msg("get_operator_id imsi FAILURE %s" % failure.value)
failure.raiseException()
def store_imsi(imsi):
self.imsi = imsi
return None
d.addCallback(store_imsi)
def get_op_id_cb(response):
number = self.mnc_length # An integer.
imsi = self.imsi
if number is None or imsi is None:
raise E.General('Bad MNC length or IMSI')
length = number + 3
if len(imsi) < length:
raise E.General('IMSI length too short')
return imsi[0:length]
d.addCallback(get_op_id_cb)
return d
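# The arithmetic behind get_operator_id, without the Deferred plumbing, can
# be sketched as follows. ``operator_id`` is a hypothetical helper, the IMSI
# and EF_AD values in the usage note are made-up samples, and ValueError
# stands in for E.General.

```python
def operator_id(imsi, ef_ad_hex):
    """Return the MCC+MNC prefix of ``imsi``.

    ``ef_ad_hex`` is the hex string read from EF_AD; its fourth byte
    (hex characters 6 and 7) holds the number of MNC digits.
    """
    if ef_ad_hex is None or len(ef_ad_hex) < 8:
        raise ValueError("Bad MNC length response")

    mnc_length = int(ef_ad_hex[6:8], 16)
    # same accepted range as get_op_id_mnc_digits_cb
    if mnc_length not in range(2, 5):
        raise ValueError("Incorrect MNC length")

    # the MCC is always three digits, the MNC follows it
    length = 3 + mnc_length
    if imsi is None or len(imsi) < length:
        raise ValueError("IMSI length too short")

    return imsi[:length]
```

# operator_id("310410123456789", "00000003") gives "310410", and the same
# call with an MNC length byte of 02 would give the 5-digit form.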
def get_phonebook_size(self):
"""Returns the phonebook size"""
d = super(WCDMAWrapper, self).get_phonebook_size()
d.addCallback(lambda resp: int(resp[0].group('size')))
return d
def get_pin_status(self):
"""Returns 1 if PIN auth is active and 0 if it's not"""
def pinreq_errback(failure):
failure.trap(E.SimPinRequired)
return 1
def aterror_eb(failure):
failure.trap(E.General)
# return the failure, otherwise it won't propagate
return failure
d = super(WCDMAWrapper, self).get_pin_status()
d.addCallback(lambda response: int(response[0].group('status')))
d.addErrback(pinreq_errback)
d.addErrback(aterror_eb)
return d
def get_radio_status(self):
"""Returns whether the radio is enabled or disabled"""
d = super(WCDMAWrapper, self).get_radio_status()
d.addCallback(lambda resp: int(resp[0].group('status')))
return d
def get_roaming_ids(self):
"""Returns the network ids stored in the SIM for roaming"""
# a.k.a. AT+CPOL?
d = super(WCDMAWrapper, self).get_roaming_ids()
d.addCallback(lambda raw: [BasicNetworkOperator(
obj.group('netid')) for obj in raw if int(obj.group('netid'))])
return d
def get_signal_quality(self):
"""Returns the signal level quality"""
if self.device.status < MM_MODEM_STATE_ENABLED:
return defer.succeed(0)
return self.maybecached('signal',
super(WCDMAWrapper, self).get_signal_quality,
[lambda response: int(response[0].group('rssi')),
rssi_to_percentage])
def get_sms(self, index):
return self.mal.get_sms(index)
def do_get_sms(self, index):
"""
Returns a ``Message`` object representing the SMS at ``index``
"""
d = super(WCDMAWrapper, self).get_sms(index)
def get_sms_cb(rawsms):
try:
sms = Message.from_pdu(rawsms[0].group('pdu'))
sms.where = int(rawsms[0].group('where'))
sms.index = index
except IndexError:
# handle bogus CMTI notifications, see #180
return None
return sms
d.addCallback(get_sms_cb)
return d
def get_sms_format(self):
"""
Returns 1 if SMS format is text and 0 if SMS format is PDU
"""
d = super(WCDMAWrapper, self).get_sms_format()
d.addCallback(lambda response: int(response[0].group('format')))
return d
def get_smsc(self):
"""Returns the SMSC number stored in the SIM"""
d = super(WCDMAWrapper, self).get_smsc()
def get_smsc_cb(response):
try:
smsc = response[0].group('smsc')
if not smsc.startswith('+'):
if check_if_ucs2(smsc):
smsc = from_u(unpack_ucs2_bytes(smsc))
return smsc
except (IndexError, KeyError):
# an empty response or a missing group raises IndexError
raise E.NotFound()
d.addCallback(get_smsc_cb)
return d
def get_spn(self):
"""
Returns the Service Provider Name (SPN) from the SIM.
"""
# AT+CRSM=176,28486,0,1,16
d = self.sim_access_restricted(COM_READ_BINARY, EF_SPN, 0, 1, 16)
def get_spn_cb(response):
sw1, sw2, spn = response
if spn is None:
raise E.General('SPN not returned.')
if spn:
spn = from_8bit_in_gsm_or_ts31101(spn)
return spn or ''
d.addCallback(get_spn_cb)
return d
def list_available_mms(self):
return self.mal.list_available_mms_notifications()
def list_contacts(self):
"""
Returns all the contacts in the SIM
:rtype: list
"""
def not_found_eb(failure):
failure.trap(E.NotFound, E.InvalidIndex, E.General)
return []
def get_them(ignored=None):
d = super(WCDMAWrapper, self).list_contacts()
d.addCallback(lambda matches:
map(self._regexp_to_contact, matches))
d.addErrback(not_found_eb)
return d
if self.device.sim.size:
return get_them()
else:
d = self._get_next_contact_id()
d.addCallback(get_them)
return d
def _regexp_to_contact(self, match):
"""
Returns a :class:`core.contact.Contact` out of ``match``
:type match: ``re.MatchObject``
"""
name = match.group('name')
if self.device.sim.charset == 'UCS2':
name = from_ucs2(name)
number = match.group('number')
index = int(match.group('id'))
return Contact(name, number, index=index)
def list_sms(self):
return self.mal.list_sms()
def do_list_sms(self):
"""
Returns all the SMS in the SIM card
:rtype: list
"""
d = super(WCDMAWrapper, self).list_sms()
def get_all_sms_cb(messages):
sms_list = []
for rawsms in messages:
try:
sms = Message.from_pdu(rawsms.group('pdu'))
sms.index = int(rawsms.group('id'))
sms.where = int(rawsms.group('where'))
sms_list.append(sms)
except ValueError:
log.err(ex.MalformedSMSError,
"Malformed PDU: %s" % rawsms.group('pdu'))
return sms_list
d.addCallback(get_all_sms_cb)
return d
def save_sms(self, sms):
return self.mal.save_sms(sms)
def do_save_sms(self, sms):
"""
Stores ``sms`` and returns a list of indexes
``sms`` might span several messages if it is a multipart SMS
"""
save_sms = super(WCDMAWrapper, self).save_sms
ret = [save_sms(p.pdu, p.length) for p in sms.to_pdu(store=True)]
d = defer.gatherResults(ret)
# the order is important! You need to run gatherResults and add
# the callback to its result, not the other way around!
d.addCallback(lambda response:
[int(resp[0].group('index')) for resp in response])
return d
def send_at(self, atstr, name='send_at', callback=None):
"""Sends an arbitrary AT string ``atstr``"""
d = super(WCDMAWrapper, self).send_at(atstr, name=name)
if callback is None:
d.addCallback(lambda response: response[0].group('resp'))
else:
d.addCallback(callback)
return d
def send_pin(self, pin):
"""
Sends ``pin`` to authenticate
Most devices need some time to settle after a successful auth.
It is the caller's responsibility to give the device at least
15 seconds to settle; the exact time varies from device to device
"""
from core.startup import attach_to_serial_port
d = attach_to_serial_port(self.device)
d.addCallback(lambda _: super(WCDMAWrapper, self).send_pin(pin))
d.addCallback(lambda response: response[0].group('resp'))
return d
def send_puk(self, puk, pin):
"""
Sends ``puk`` and ``pin`` to authenticate
Most devices need some time to settle after a successful auth.
It is the caller's responsibility to give the device at least
15 seconds to settle; the exact time varies from device to device
"""
d = super(WCDMAWrapper, self).send_puk(puk, pin)
d.addCallback(lambda response: response[0].group('resp'))
return d
def send_mms(self, mms, extra_info):
"""Sends ``mms`` and returns the Message-Id"""
return self.mal.send_mms(mms, extra_info)
def do_send_mms(self, mms, extra_info):
if 'wap2' not in extra_info:
raise ValueError("Only WAP2.0 is supported at the moment")
if 'mmsc' not in extra_info:
raise ValueError("No mmsc key in %s" % extra_info)
return send_m_send_req(extra_info, mms)
def send_sms(self, sms):
"""
Sends ``sms`` and returns the indexes
``sms`` might span several messages if it is a multipart SMS
"""
return self.mal.send_sms(sms)
def do_send_sms(self, sms):
def send_sms_cb(response):
return int(response[0].group('index'))
ret = []
for pdu in sms.to_pdu():
d = super(WCDMAWrapper, self).send_sms(pdu.pdu, pdu.length)
d.addCallback(send_sms_cb)
ret.append(d)
return defer.gatherResults(ret)
def send_sms_from_storage(self, index):
"""Sends the SMS stored at ``index`` and returns the new index"""
return self.mal.send_sms_from_storage(index)
def do_send_sms_from_storage(self, index):
d = super(WCDMAWrapper, self).send_sms_from_storage(index)
d.addCallback(lambda response: int(response[0].group('index')))
return d
def send_ussd(self, ussd, force_ascii=False, loose_charset_check=False):
"""Sends the ussd command ``ussd``"""
def convert_response(response):
index = response[0].group('index')
if index == '1':
self.device.set_property(USD_INTFACE, 'State', 'user-response')
else:
self.device.set_property(USD_INTFACE, 'State', 'idle')
resp = response[0].group('resp')
if resp is None:
return "" # returning an empty string is valid
if not check_if_ucs2(resp, limit=LATIN_EX_B):
if 'UCS2' in self.device.sim.charset and \
not loose_charset_check:
raise E.MalformedUssdPduError(resp)
else:
return resp
else:
try:
return unpack_ucs2_bytes(resp)
except (TypeError, UnicodeDecodeError):
raise E.MalformedUssdPduError(resp)
def reset_state(failure):
if self.device.get_property(USD_INTFACE, 'State') != 'idle':
self.device.set_property(USD_INTFACE, 'State', 'idle')
failure.raiseException() # re-raise
if 'UCS2' in self.device.sim.charset and not force_ascii:
ussd = pack_ucs2_bytes(ussd)
d = super(WCDMAWrapper, self).send_ussd(str(ussd))
d.addCallback(convert_response)
d.addErrback(reset_state)
return d
def set_allowed_mode(self, mode):
raise NotImplementedError("Implement it in the device family wrapper")
def set_apn(self, apn):
"""Sets the APN to ``apn``"""
def process_apns(apns, the_apn):
for _index, _apn in apns:
if _apn == the_apn:
self.state_dict['conn_id'] = _index
return defer.succeed('OK')
try:
conn_id = max([idx for idx, _ in apns]) + 1
if conn_id < self.apn_range[0] or conn_id > self.apn_range[1]:
raise ValueError
except (ValueError, TypeError):
conn_id = self.apn_range[0]
self.state_dict['conn_id'] = conn_id
d = super(WCDMAWrapper, self).set_apn(conn_id, the_apn)
d.addCallback(lambda response: response[0].group('resp'))
return d
d = self.get_apns()
d.addCallback(process_apns, apn)
return d
def set_band(self, band):
"""Sets the device band to ``band``"""
raise NotImplementedError()
def set_charset(self, charset):
"""Sets the SIM's charset to ``charset``"""
d = super(WCDMAWrapper, self).set_charset(charset)
d.addCallback(lambda ignored: self.device.sim.set_charset(charset))
return d
def set_error_level(self, level):
"""Sets the modem's error reporting level to ``level``"""
d = super(WCDMAWrapper, self).set_error_level(level)
d.addCallback(lambda response: response[0].group('resp'))
return d
def set_network_mode(self, mode):
"""Sets the network mode to ``mode``"""
raise NotImplementedError()
def enable_radio(self, enable):
"""
Enables the radio according to ``enable``
Does nothing if the radio is already in the requested state
"""
def check_if_necessary(status):
if (status == 1 and enable) or (status == 0 and not enable):
return defer.succeed('OK')
d = super(WCDMAWrapper, self).enable_radio(enable)
d.addCallback(lambda response: response[0].group('resp'))
return d
d = self.get_radio_status()
d.addCallback(check_if_necessary)
return d
def set_sms_format(self, _format=0):
"""Sets PDU mode or text mode in the SIM"""
d = super(WCDMAWrapper, self).set_sms_format(_format)
d.addCallback(lambda response: response[0].group('resp'))
return d
def set_smsc(self, smsc):
"""Sets the SIM's SMSC number to ``smsc``"""
if 'UCS2' in self.device.sim.charset:
smsc = pack_ucs2_bytes(smsc)
d = super(WCDMAWrapper, self).set_smsc(smsc)
d.addCallback(lambda response: response[0].group('resp'))
return d
# some high-level methods exported over DBus
def init_properties(self):
# XXX: Implement UnlockRetries
self.device.set_property(MDM_INTFACE,
'UnlockRetries', dbus.UInt32(999))
# There's no way to query this, so we have to assume :-(
self.device.set_property(USD_INTFACE, 'State', 'idle')
d = self.get_bands()
d.addCallback(lambda bands: self.device.set_property(CRD_INTFACE,
'SupportedBands', dbus.UInt32(bands)))
d.addCallback(lambda _: self.get_network_modes())
d.addCallback(lambda modes: self.device.set_property(CRD_INTFACE,
'SupportedModes', dbus.UInt32(modes)))
d.addCallback(lambda _: self.get_pin_status())
d.addCallback(lambda active: self.device.set_property(CRD_INTFACE,
'PinEnabled', dbus.Boolean(active)))
d.addCallback(lambda _: self.get_imei())
d.addCallback(lambda imei: self.device.set_property(MDM_INTFACE,
'EquipmentIdentifier', imei))
d.addCallback(lambda _: self.get_iccid())
def iccid_eb(failure):
failure.trap(E.General)
self.device.set_property(CRD_INTFACE, 'SimIdentifier', '')
d.addCallbacks(lambda iccid:
self.device.set_property(CRD_INTFACE,
'SimIdentifier', iccid), iccid_eb)
return d
def get_simple_status(self):
"""Returns the status for o.fd.MM.Modem.Simple.GetStatus"""
if self.device.status < MM_MODEM_STATE_ENABLED:
return defer.succeed(dict(state=self.device.status))
def get_simple_status_cb((sigstrength, netinfo, band, net_mode)):
return dict(state=self.device.status,
signal_quality=sigstrength,
operator_code=netinfo[1],
operator_name=netinfo[2],
band=band,
network_mode=net_mode)
deferred_list = []
deferred_list.append(self.get_signal_quality())
deferred_list.append(self.get_netreg_info())
deferred_list.append(self.get_band())
deferred_list.append(self.get_network_mode())
d = defer.gatherResults(deferred_list)
d.addCallback(get_simple_status_cb)
d.addErrback(log.err)
return d
def connect_simple(self, settings):
"""Connects with the given ``settings``"""
if self.device.status == MM_MODEM_STATE_CONNECTED:
# this cannot happen
raise E.Connected("we are already connected")
if self.device.status == MM_MODEM_STATE_CONNECTING:
raise E.SimBusy("we are already connecting")
def connect_eb(failure):
log.msg("connect_simple errorback")
if self.device.status >= MM_MODEM_STATE_REGISTERED:
self.device.set_status(MM_MODEM_STATE_REGISTERED)
failure.raiseException() # re-raise
simplesm = self.device.custom.simp_klass(self.device, settings)
d = simplesm.start_simple()
d.addCallback(lambda _:
self.device.set_status(MM_MODEM_STATE_CONNECTED))
d.addErrback(connect_eb)
return d
def connect_to_internet(self, settings):
# Note: this is called by:
# 1/ connect_simple via simple state machine
# 2/ directly by Connect() dbus method
self.device.set_property(NET_INTFACE, 'LastApn',
unicode(settings.get('apn', '')))
ip_method = self.device.get_property(MDM_INTFACE, 'IpMethod')
if ip_method == MM_IP_METHOD_PPP:
d = self.connect_to_internet_ppp(settings)
else:
d = self.connect_to_internet_hso(settings)
d.addCallback(self.start_traffic_monitoring)
return d
def connect_to_internet_hso(self, settings):
username = settings.get('username', '')
password = settings.get('password', '')
auth = settings.get('allowed_auth', None)
d = self.hso_authenticate(username, password, auth)
d.addCallback(lambda _: self.hso_connect())
return d
def connect_to_internet_ppp(self, settings):
"""Opens the data port and dials ``settings['number']``"""
if self.device.status == MM_MODEM_STATE_CONNECTED:
# this cannot happen
raise E.Connected("we are already connected")
if self.device.status == MM_MODEM_STATE_CONNECTING:
raise E.SimBusy("we are already connecting")
self.device.set_status(MM_MODEM_STATE_CONNECTING)
# open the data port
port = self.device.ports.dport
# this will raise a SerialException if port is busy
port.obj = serial.Serial(port.path)
port.obj.flush()
# send ATDT and convert number to string as pyserial does
# not like to write unicode to serial ports
number = settings.get('number')
d = defer.maybeDeferred(port.obj.write,
"ATDT%s\r\n" % str(number))
# we should detect error or success here and set state
return d
def disconnect_from_internet(self):
"""Disconnects the modem"""
ip_method = self.device.get_property(MDM_INTFACE, 'IpMethod')
if ip_method == MM_IP_METHOD_PPP:
d = self.disconnect_from_internet_ppp()
else:
d = self.disconnect_from_internet_hso()
d.addCallback(self.stop_traffic_monitoring)
return d
def disconnect_from_internet_hso(self):
# NM usually issues disconnect as part of a connect sequence
if self.device.status < MM_MODEM_STATE_CONNECTED:
return defer.succeed(True)
return self.hso_disconnect()
def disconnect_from_internet_ppp(self):
"""Disconnects the modem temporarily lowering the DTR"""
# NM usually issues disconnect as part of a connect sequence
if self.device.status < MM_MODEM_STATE_CONNECTED:
return defer.succeed(True)
port = self.device.ports.dport
if not port.obj.isOpen():
raise AttributeError("Data serial port is not open")
self.device.set_status(MM_MODEM_STATE_DISCONNECTING)
# XXX: should check that we did stop the connection and set status
def restore_speed(speed):
try:
port.obj.setBaudrate(speed)
except serial.SerialException:
pass
port.obj.close()
# XXX: perhaps we should check the registration status here
if self.device.status > MM_MODEM_STATE_REGISTERED:
self.device.set_status(MM_MODEM_STATE_REGISTERED)
return True
# lower and raise baud speed
speed = port.obj.getBaudrate()
try:
port.obj.setBaudrate(0)
except serial.SerialException:
pass
# restore the speed in .1 seconds
return task.deferLater(reactor, .1, restore_speed, speed)
def get_stats(self):
"""
Returns a tuple with the connection statistics for this dialer,
or None if no interface is being monitored
:return: (rx_bytes, tx_bytes, rx_rate, tx_rate)
"""
if self.iface is not None:
now = time()
rx_bytes, tx_bytes = get_os_object().get_iface_stats(self.iface)
# if any of these three are not 0, it means that this is at
# least the second time this method is executed, thus we
# should have cached meaningful data
if self.__rx_bytes or self.__tx_bytes or self.__time:
rx_delta = rx_bytes - self.__rx_bytes
tx_delta = tx_bytes - self.__tx_bytes
interval = now - self.__time
raw_rx_rate = int(floor(rx_delta / interval))
raw_tx_rate = int(floor(tx_delta / interval))
rx_rate = raw_rx_rate if raw_rx_rate >= 0 else 0
tx_rate = raw_tx_rate if raw_tx_rate >= 0 else 0
else:
# first time this is executed, we cannot reliably compute
# the rate. It is better to lie just once
rx_rate = tx_rate = 0
self.__rx_bytes, self.__tx_bytes = rx_bytes, tx_bytes
self.__time = now
return rx_bytes, tx_bytes, rx_rate, tx_rate
def register_with_netid(self, netid):
"""
I will try my best to register with ``netid``
If ``netid`` is an empty string, I will register with my home network
"""
netr_klass = self.device.custom.netr_klass
netsm = netr_klass(self.device.sconn, netid)
return netsm.start_netreg()
def start_traffic_monitoring(self, x=None):
ip_method = self.device.get_property(MDM_INTFACE, 'IpMethod')
if ip_method == MM_IP_METHOD_PPP:
self.iface = 'ppp0' # XXX: don't hardcode to first PPP instance
else:
self.iface = self.device.get_property(MDM_INTFACE, 'Device')
if self.stats_id is None:
self.stats_id = timeout_add_seconds(1, self._emit_dial_stats)
return x
def stop_traffic_monitoring(self, x=None):
# remove the emit stats task
if self.stats_id is not None:
source_remove(self.stats_id)
self.stats_id = None
return x
def enable_device(self, enable):
"""
I enable or disable myself according to ``enable``
If enable is True, I check the auth state of a device and will try to
initialize it. Otherwise I will disable myself
"""
if enable:
return self._do_enable_device()
else:
return self._do_disable_device()
def _do_disable_device(self):
self.clean_signals()
if self.device.status == MM_MODEM_STATE_CONNECTED:
def on_disconnect_from_internet(_):
if self.device.status >= MM_MODEM_STATE_REGISTERED:
self.device.set_status(MM_MODEM_STATE_REGISTERED)
self.device.close()
d = self.disconnect_from_internet()
d.addCallback(on_disconnect_from_internet)
d.addErrback(log.err)
return d
if self.device.status >= MM_MODEM_STATE_ENABLED:
self.device.close()
return defer.succeed(None)
def _do_enable_device(self):
if self.device.status >= MM_MODEM_STATE_ENABLED:
return defer.succeed(self.device)
if self.device.status == MM_MODEM_STATE_ENABLING:
raise E.SimBusy()
self.device.set_status(MM_MODEM_STATE_ENABLING)
def signals(resp):
self.connect_to_signals()
# XXX: This netreg notification seems to be unrelated to enable,
# perhaps it should be moved?
self.device.sconn.set_netreg_notification(1)
if self.device.status < MM_MODEM_STATE_ENABLED:
self.device.set_status(MM_MODEM_STATE_ENABLED)
return resp
def setdefaults(resp):
def apn_range_cb(result):
self.apn_range = result
return resp
def apn_range_eb(e):
self.apn_range = (1, 3)
return resp
d = self.device.sconn.get_apn_range()
d.addCallback(apn_range_cb)
d.addErrback(apn_range_eb)
return d
from core.startup import attach_to_serial_port
def process_device_and_initialize(device):
self.device = device
auth_klass = self.device.custom.auth_klass
authsm = auth_klass(self.device)
def set_status(failure):
self.device.set_status(MM_MODEM_STATE_DISABLED)
failure.raiseException() # re-raise
d = authsm.start_auth()
# if auth is ready, the device will initialize straight away
# if auth isn't ready the callback chain won't be executed and
# will just return the given exception
d.addErrback(set_status)
d.addCallback(self.device.initialize)
d.addCallback(setdefaults)
d.addCallback(signals)
return d
d = attach_to_serial_port(self.device)
d.addCallback(process_device_and_initialize)
return d
def _check_initted_device(self, result):
"""
To be executed after successful authentication over DBus
Network Manager calls this via SendPin() even when it is not performing
an Enable or SimpleConnect; it does so as part of noticing that a new
device has appeared. After unlocking we therefore save a timestamp so
that any subsequent initialisation can check whether the requisite
settling delay has elapsed, rather than continuing blindly to
initialisation as used to be the case.
"""
self.device.set_property(MDM_INTFACE, 'UnlockRequired', '')
self.device.set_authtime(time())
return result
def _emit_dial_stats(self):
stats = self.get_stats()
if stats is not None:
self.device.exporter.DialStats(stats)
# make sure this is repeatedly called
return True
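
# Illustrative sketch only, not part of the original module: the rate
# arithmetic performed inside ``WCDMAWrapper.get_stats`` pulled out into
# a pure function so it can be read (and tested) on its own. The name
# ``compute_rates_sketch`` and its signature are assumptions made for
# this example. Unlike ``get_stats`` it also guards against a
# non-positive interval, which is an addition and not original behaviour.
from math import floor as _floor


def compute_rates_sketch(prev, cur):
    """
    Returns ``(rx_rate, tx_rate)`` in bytes per second

    ``prev`` and ``cur`` are ``(rx_bytes, tx_bytes, timestamp)`` samples.
    Negative rates (e.g. after a counter reset) are clamped to 0.
    """
    interval = cur[2] - prev[2]
    if interval <= 0:
        # assumption: report no traffic rather than divide by zero
        return 0, 0
    rx_rate = int(_floor((cur[0] - prev[0]) / float(interval)))
    tx_rate = int(_floor((cur[1] - prev[1]) / float(interval)))
    return max(rx_rate, 0), max(tx_rate, 0)

# Usage: two samples taken two seconds apart, e.g.
# compute_rates_sketch((1000, 500, 10.0), (3000, 1500, 12.0))
# gives (1000, 500).
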
class BasicNetworkOperator(object):
"""A Network operator with a netid"""
def __init__(self, netid):
super(BasicNetworkOperator, self).__init__()
self.netid = from_ucs2(netid)
def __repr__(self):
return '<BasicNetworkOperator: %s>' % self.netid
def __cmp__(self, o):
try:
a = int(self.netid)
except (AttributeError, NameError, TypeError, ValueError):
a = None
try:
b = int(o.netid)
except (AttributeError, NameError, TypeError, ValueError):
b = None
return cmp(a, b)
def __eq__(self, o):
return self.netid == o.netid
def __ne__(self, o):
return not self.__eq__(o)
class NetworkOperator(BasicNetworkOperator):
"""I represent a network operator on a mobile network"""
def __init__(self, stat, long_name, short_name, netid, rat):
super(NetworkOperator, self).__init__(netid)
self.stat = int(stat)
self.long_name = from_ucs2(long_name)
self.short_name = from_ucs2(short_name)
self.rat = int(rat)
def __repr__(self):
args = (self.long_name, self.netid)
return '<NetworkOperator "%s" netid: %s>' % args
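

# Illustrative sketch only, not part of the original module: the context
# id selection that ``WCDMAWrapper.set_apn`` performs in ``process_apns``,
# written as a pure function without any deferreds or modem access. The
# name ``pick_conn_id_sketch`` is hypothetical; the default range (1, 3)
# mirrors the fallback used in ``_do_enable_device``.
def pick_conn_id_sketch(apns, the_apn, apn_range=(1, 3)):
    """
    Returns the context id to use for ``the_apn``

    ``apns`` is a list of ``(index, apn)`` tuples as configured on the SIM
    """
    # reuse the slot if the APN is already configured
    for index, apn in apns:
        if apn == the_apn:
            return index
    try:
        # otherwise try the slot after the highest one in use
        conn_id = max(idx for idx, _ in apns) + 1
    except ValueError:
        # no APNs configured at all
        return apn_range[0]
    # out of range: fall back to (and overwrite) the first slot
    if conn_id < apn_range[0] or conn_id > apn_range[1]:
        return apn_range[0]
    return conn_id
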