#!/usr/bin/python
# Tests p2p_invite
######### MAY NEED TO RUN AS SUDO #############
import dbus
import sys, os
import time
import gobject
import getopt
import threading
from dbus.mainloop.glib import DBusGMainLoop
def usage():
    """Print the command-line help text for this script to stdout.

    The program name shown in the synopsis and in the example is taken
    from sys.argv[0].
    """
    prog = sys.argv[0]
    help_lines = [
        "Usage:",
        # The trailing backslash is printed literally as a shell-style
        # line-continuation marker for the synopsis.
        " %s -i <interface_name> -a <addr> \\ " % prog,
        " [-o <persistent_group_object>] [-w <wpas_dbus_interface>]",
        "Options:",
        " -i = interface name",
        " -a = address of peer",
        " -o = persistent group object path",
        " -w = wpas dbus interface = fi.w1.wpa_supplicant1",
        "Example:",
        " %s -i p2p-wlan0-0 -a 00150083523c" % prog,
    ]
    print("\n".join(help_lines))
# Required Signals
def InvitationResult(invite_result):
    """Handle the InvitationResult signal and terminate the process.

    Prints the 'status' entry of the signal payload and, when the payload
    carries a 'BSSID' entry, its first six elements as hex values separated
    by " : ".  The process is then ended with exit status 0.

    invite_result -- dictionary delivered with the signal; 'status' is
                     read unconditionally, 'BSSID' only when present.
    """
    print("Invitation Result signal :")
    print("status =  %s" % (invite_result['status'],))

    # dict.has_key() is deprecated; "in" is the supported membership test.
    if 'BSSID' in invite_result:
        bssid = invite_result['BSSID']
        octets = [hex(bssid[index]) for index in range(6)]
        print("BSSID =  " + " : ".join(octets))

    # os._exit() skips normal interpreter shutdown, including flushing of
    # buffered stdio, so flush explicitly or the lines above can be lost
    # when stdout is not a terminal.
    sys.stdout.flush()
    os._exit(0)
class P2P_Invite (threading.Thread):
    """Worker thread that issues a P2PDevice Invite call over D-Bus.

    Construction looks up the supplicant interface object for
    interface_name on the system bus and registers InvitationResult as the
    receiver of the signal with the same name.  constructArguements() must
    be called before start(): it builds the dictionary that run() passes
    to Invite.

    On failure the methods print a message followed by the usage text and
    terminate the whole process with exit status 1.
    """

    def __init__(self, interface_name, wpas_dbus_interface, addr,
                 persistent_group_object):
        """Resolve the D-Bus objects needed for the invite.

        interface_name          -- network interface name passed to
                                   GetInterface
        wpas_dbus_interface     -- D-Bus interface name of the supplicant;
                                   object paths are derived from it
        addr                    -- peer address, appended to the peer
                                   object path
        persistent_group_object -- optional value for the
                                   'persistent_group_object' argument, or
                                   None to omit it
        """
        # Initializes variables and threads
        self.interface_name = interface_name
        self.wpas_dbus_interface = wpas_dbus_interface
        self.addr = addr
        self.persistent_group_object = persistent_group_object

        # Initializes thread and daemon allows for ctrl-c kill
        threading.Thread.__init__(self)
        self.daemon = True

        # Generating interface/object paths
        self.wpas_dbus_opath = \
            "/" + self.wpas_dbus_interface.replace(".", "/")
        self.wpas_dbus_interfaces_opath = \
            self.wpas_dbus_opath + "/Interfaces"
        # Earlier revisions stored the path above under this misspelled
        # name; keep it as an alias so existing readers do not break.
        self.wpas_wpas_dbus_interfaces_opath = \
            self.wpas_dbus_interfaces_opath
        self.wpas_dbus_interfaces_interface = \
            self.wpas_dbus_interface + ".Interface"
        self.wpas_dbus_interfaces_p2pdevice = \
            self.wpas_dbus_interfaces_interface + ".P2PDevice"

        # Getting interfaces and objects
        DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()
        self.wpas_object = self.bus.get_object(
            self.wpas_dbus_interface,
            self.wpas_dbus_opath)
        self.wpas = dbus.Interface(self.wpas_object,
                                   self.wpas_dbus_interface)

        # Try to see if supplicant knows about interface
        # If not, report it and terminate the process
        try:
            self.path = self.wpas.GetInterface(self.interface_name)
        except dbus.DBusException:
            error = 'Error:\n Interface ' + self.interface_name \
                + ' was not found'
            print(error)
            usage()
            # os._exit() does not flush stdio buffers; do it by hand so
            # the message is not lost when stdout is redirected.
            sys.stdout.flush()
            os._exit(1)

        self.interface_object = self.bus.get_object(
            self.wpas_dbus_interface, self.path)
        self.p2p_interface = dbus.Interface(
            self.interface_object,
            self.wpas_dbus_interfaces_p2pdevice)

        # Adds listeners
        self.bus.add_signal_receiver(
            InvitationResult,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="InvitationResult")

    def constructArguements(self):
        """Build self.P2PDictionary, the argument dictionary for Invite.

        The dictionary always holds 'peer' (object path made of the
        interface path, '/Peers/' and the peer address) and additionally
        'persistent_group_object' when one was supplied.
        """
        self.P2PDictionary = {
            'peer': dbus.ObjectPath(self.path + '/Peers/' + self.addr),
        }
        if self.persistent_group_object is not None:
            self.P2PDictionary["persistent_group_object"] = \
                self.persistent_group_object

    def run(self):
        """Send the invitation, then run a GLib main loop.

        Any exception raised while issuing the Invite call is reported as
        an argument error and ends the process with exit status 1.
        """
        try:
            self.p2p_interface.Invite(self.P2PDictionary)
        except Exception:
            print("Error:\n Invalid Arguments")
            usage()
            # See __init__: os._exit() needs an explicit flush first.
            sys.stdout.flush()
            os._exit(1)

        # Allows other threads to keep working while MainLoop runs
        # Required for timeout implementation
        gobject.MainLoop().get_context().iteration(True)
        gobject.threads_init()
        gobject.MainLoop().run()
# Script entry point: parse the command line, start the invite thread and
# give it ten seconds to produce a result before reporting a timeout.
# Exit status is 0 for -h and non-zero for every error path.
if __name__ == "__main__":
    # Defaults for optional inputs
    addr = None
    persistent_group_object = None
    wpas_dbus_interface = 'fi.w1.wpa_supplicant1'

    # interface_name is required
    interface_name = None

    # Using getopt to handle options
    try:
        options, args = getopt.getopt(sys.argv[1:], "hi:o:w:a:")
    except getopt.GetoptError:
        usage()
        # sys.exit() instead of quit(): quit() is a helper installed by the
        # site module and is not guaranteed to exist.
        sys.exit(1)

    # If there is a switch, override the default option
    for key, value in options:
        if key == "-h":
            # Help was requested explicitly, so this is a successful exit.
            usage()
            sys.exit(0)
        elif key == "-i":
            # Interface name
            interface_name = value
        elif key == "-a":
            # Peer address
            addr = value
        elif key == "-o":
            # Persistent group object path
            persistent_group_object = value
        elif key == "-w":
            # D-Bus interface
            wpas_dbus_interface = value
        else:
            # Raised explicitly because an assert statement would be
            # stripped when running with -O.
            raise AssertionError("unhandled option")

    # Interface name is required and was not given
    if interface_name is None:
        print("Error:\n interface_name is required")
        usage()
        sys.exit(1)

    # Peer address is required and was not given
    if addr is None:
        print("Error:\n peer address is required")
        usage()
        sys.exit(1)

    try:
        p2p_invite_test = \
            P2P_Invite(interface_name, wpas_dbus_interface,
                       addr, persistent_group_object)
    except Exception:
        print("Error:\n Invalid Arguments")
        usage()
        # os._exit() does not flush stdio buffers; flush first so the
        # message survives when stdout is redirected.
        sys.stdout.flush()
        os._exit(1)

    p2p_invite_test.constructArguements()
    p2p_invite_test.start()

    # The signal handler ends the process when a result arrives; still
    # being here after the wait means no result was received in time.
    time.sleep(10)
    print("Error:\n p2p_invite timed out")
    sys.stdout.flush()
    os._exit(1)