1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
#!/usr/bin/python
# Tests P2P_Disconnect
# Will perform disconnect on interface_name
######### MAY NEED TO RUN AS SUDO #############
import dbus
import sys, os
import time
import gobject
import threading
import getopt
from dbus.mainloop.glib import DBusGMainLoop
def usage():
    """Print the command-line help text for this script to stdout.

    The script name shown in the text is taken from sys.argv[0].
    """
    prog = sys.argv[0]
    # Single-argument print(...) produces the same output on Python 2 and 3.
    print("Usage:")
    # The trailing backslash is part of the displayed text (shell line
    # continuation), so it is written as an explicit escaped backslash.
    print(" %s -i <interface_name> \\ " % prog)
    print(" [-w <wpas_dbus_interface>]")
    print("Options:")
    print(" -i = interface name")
    print(" -w = wpas dbus interface = fi.w1.wpa_supplicant1")
    print("Example:")
    print(" %s -i p2p-wlan0-0" % prog)
# Required Signals
def GroupFinished(status, etc):
    """Handle the GroupFinished signal: report success and end the process.

    Both arguments are accepted because the signal delivers them, but
    neither is used. The process terminates with exit status 0.
    """
    print("Disconnected")
    # os._exit() terminates immediately without flushing stdio buffers, so
    # the message above would be lost when stdout is a pipe or a file.
    sys.stdout.flush()
    # os._exit() (rather than sys.exit()) ends the whole process no matter
    # which thread dispatched this callback.
    os._exit(0)
class P2P_Disconnect (threading.Thread):
    """Thread that issues a P2P Disconnect on one wpa_supplicant interface.

    Construction connects to the system D-Bus, looks up the interface by
    name and registers GroupFinished() as receiver for the P2PDevice
    "GroupFinished" signal. start()/run() then calls Disconnect() and runs
    a glib main loop.

    Instance attributes set by the constructor:
      interface_name, wpas_dbus_interface, timeout  -- constructor arguments
      wpas_dbus_opath, wpas_dbus_interfaces_opath,
      wpas_dbus_interfaces_interface,
      wpas_dbus_interfaces_p2pdevice                -- derived D-Bus names
      bus, wpas_object, wpas, path,
      interface_object, p2p_interface               -- D-Bus handles
    """

    def __init__(self, interface_name, wpas_dbus_interface, timeout):
        """Connect to wpa_supplicant and prepare the disconnect request.

        interface_name       -- name passed to wpa_supplicant's GetInterface()
        wpas_dbus_interface  -- D-Bus name of wpa_supplicant,
                                e.g. fi.w1.wpa_supplicant1
        timeout              -- stored on the instance; not used by this
                                class itself

        If GetInterface() raises dbus.DBusException, an error and the usage
        text are printed and the process exits with status 1. Exceptions
        from the other D-Bus calls propagate to the caller.
        """
        self.interface_name = interface_name
        self.wpas_dbus_interface = wpas_dbus_interface
        self.timeout = timeout

        # Daemon thread: does not keep the process alive once the main
        # thread finishes (ctrl-c, timeout).
        threading.Thread.__init__(self)
        self.daemon = True

        self._init_dbus_names()

        # Getting interfaces and objects
        DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()
        self.wpas_object = self.bus.get_object(
            self.wpas_dbus_interface,
            self.wpas_dbus_opath)
        self.wpas = dbus.Interface(self.wpas_object,
                                   self.wpas_dbus_interface)

        # Ask the supplicant whether it knows the interface; bail out of
        # the whole process if it does not.
        try:
            self.path = self.wpas.GetInterface(self.interface_name)
        except dbus.DBusException:
            print('Error:\n Interface ' + self.interface_name
                  + ' was not found')
            usage()
            # os._exit() does not flush stdio buffers, so flush explicitly,
            # and report the failure through a non-zero exit status.
            sys.stdout.flush()
            os._exit(1)

        self.interface_object = self.bus.get_object(
            self.wpas_dbus_interface, self.path)
        self.p2p_interface = dbus.Interface(
            self.interface_object,
            self.wpas_dbus_interfaces_p2pdevice)

        # Signals
        self.bus.add_signal_receiver(
            GroupFinished,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="GroupFinished")

    def _init_dbus_names(self):
        """Derive object paths and interface names from wpas_dbus_interface."""
        base = self.wpas_dbus_interface
        self.wpas_dbus_opath = "/" + base.replace(".", "/")
        self.wpas_dbus_interfaces_opath = self.wpas_dbus_opath + "/Interfaces"
        # Earlier revisions stored this path under a misspelled name; keep
        # the old spelling as an alias for any code that still reads it.
        self.wpas_wpas_dbus_interfaces_opath = self.wpas_dbus_interfaces_opath
        self.wpas_dbus_interfaces_interface = base + ".Interface"
        self.wpas_dbus_interfaces_p2pdevice = \
            self.wpas_dbus_interfaces_interface + ".P2PDevice"

    def run(self):
        """Thread body: request the disconnect, then run the glib main loop."""
        # Allows other threads to keep working while MainLoop runs
        # Required for timeout implementation
        # NOTE(review): threads_init() is called after the first context
        # iteration; order kept as in the original -- confirm it is intended.
        gobject.MainLoop().get_context().iteration(True)
        gobject.threads_init()
        self.p2p_interface.Disconnect()
        gobject.MainLoop().run()
def _parse_options(argv):
    """Parse the command-line switches in argv (without the program name).

    Returns a tuple (interface_name, wpas_dbus_interface, help_requested):
    interface_name is None when -i was not given, wpas_dbus_interface
    defaults to 'fi.w1.wpa_supplicant1', and help_requested is True when -h
    was present. Raises getopt.GetoptError for unknown or malformed switches.
    """
    interface_name = None
    wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
    help_requested = False

    options, _args = getopt.getopt(argv, "hi:w:")
    # getopt only returns the switches declared above, so no fallback
    # branch is needed.
    for key, value in options:
        if key == "-h":
            help_requested = True
        elif key == "-i":
            interface_name = value
        elif key == "-w":
            wpas_dbus_interface = value
    return interface_name, wpas_dbus_interface, help_requested


def main():
    """Run the disconnect test from the command line.

    Exit status: 0 after -h; 2 for bad or missing command-line arguments;
    1 when the P2P_Disconnect object cannot be built, on ctrl-c, or when
    no GroupFinished signal ended the process within the timeout.
    """
    # Seconds to wait for the disconnect before giving up
    timeout = 5

    try:
        interface_name, wpas_dbus_interface, help_requested = \
            _parse_options(sys.argv[1:])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    if help_requested:
        usage()
        sys.exit(0)

    # Interface name is required and was not given
    if interface_name is None:
        print("Error:\n interface_name is required")
        usage()
        sys.exit(2)

    # Constructor. Exception (not a bare except) so that ctrl-c during the
    # D-Bus setup is not reported as an invalid interface.
    try:
        p2p_disconnect_test = P2P_Disconnect(interface_name,
                                             wpas_dbus_interface, timeout)
    except Exception:
        print("Error:\n Invalid wpas_dbus_interface")
        usage()
        sys.exit(1)

    # Start P2P_Disconnect
    p2p_disconnect_test.start()

    # Wait for the worker; the process normally ends from the signal
    # handler before this sleep returns.
    try:
        time.sleep(int(p2p_disconnect_test.timeout))
    except KeyboardInterrupt:
        # Interrupted by the user: leave without claiming a timeout.
        sys.exit(1)

    print("Disconnect timed out")
    sys.exit(1)


if __name__ == "__main__":
    main()
|