1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
#!/usr/bin/python
import asyncio
import logging
import traceback
from pyrad.dictionary import Dictionary
from pyrad.server_async import ServerAsync
from pyrad.packet import AccessAccept
from pyrad.server import RemoteHost
# uvloop is an optional accelerator: use its event loop policy when the
# package is installed, otherwise keep the stock asyncio policy.
# Only a failed import is treated as "not installed"; anything else
# (including KeyboardInterrupt/SystemExit) is allowed to surface.
try:
    import uvloop
except ImportError:
    pass
else:
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Log everything from DEBUG upwards, prefixed with a timestamp and a
# left-aligned, 8-character-wide level name.
logging.basicConfig(level="DEBUG",
                    format="%(asctime)s [%(levelname)-8s] %(message)s")
class FakeServer(ServerAsync):
    """ServerAsync subclass that prints every request and sends a fixed reply."""

    def __init__(self, loop, dictionary):
        """Initialise the base server with packet verification and debug enabled.

        Both arguments are forwarded unchanged to ServerAsync.
        """
        super().__init__(loop=loop, dictionary=dictionary,
                         enable_pkt_verify=True, debug=True)

    @staticmethod
    def _print_attributes(pkt):
        """Print the "Attributes: " header, then one "name: value" line per key."""
        print("Attributes: ")
        # Go through pkt.keys() explicitly rather than iterating pkt itself,
        # so the printed names are exactly the ones keys() exposes.
        for name in pkt.keys():
            print("%s: %s" % (name, pkt[name]))

    def handle_auth_packet(self, protocol, pkt, addr):
        """Print an authentication request and reply with an Access-Accept."""
        print("Received an authentication request with id ", pkt.id)
        print('Authenticator ', pkt.authenticator.hex())
        print('Secret ', pkt.secret)
        self._print_attributes(pkt)

        # Fixed set of attributes returned to every client.
        reply_attributes = {
            "Service-Type": "Framed-User",
            "Framed-IP-Address": '192.168.0.1',
            "Framed-IPv6-Prefix": "fc66::1/64",
        }
        accept = self.CreateReplyPacket(pkt, **reply_attributes)
        accept.code = AccessAccept
        protocol.send_response(accept, addr)

    def handle_acct_packet(self, protocol, pkt, addr):
        """Print an accounting request and send back the default reply packet."""
        print("Received an accounting request")
        self._print_attributes(pkt)

        response = self.CreateReplyPacket(pkt)
        protocol.send_response(response, addr)

    def handle_coa_packet(self, protocol, pkt, addr):
        """Print a CoA request and send back the default reply packet."""
        print("Received an coa request")
        self._print_attributes(pkt)

        response = self.CreateReplyPacket(pkt)
        protocol.send_response(response, addr)

    def handle_disconnect_packet(self, protocol, pkt, addr):
        """Print a disconnect request and reply with packet code 45."""
        print("Received an disconnect request")
        self._print_attributes(pkt)

        nak = self.CreateReplyPacket(pkt)
        # COA NAK
        # NOTE(review): RFC 5176 assigns 45 to CoA-NAK and 42 to
        # Disconnect-NAK -- confirm that 45 is really intended here.
        nak.code = 45
        protocol.send_response(nak, addr)
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is current and raises RuntimeError on recent Python versions.
    # new_event_loop() still honours whatever event loop policy is installed.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        # create server and read dictionary
        server = FakeServer(loop=loop, dictionary=Dictionary('dictionary'))

        # add clients (address, secret, name)
        server.hosts["127.0.0.1"] = RemoteHost("127.0.0.1",
                                               b"Kah3choteereethiejeimaeziecumi",
                                               "localhost")

        try:
            # Initialize transports
            loop.run_until_complete(
                server.initialize_transports(enable_auth=True,
                                             enable_acct=True,
                                             enable_coa=True))

            # start server; runs until interrupted
            loop.run_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the server.
            pass
        except Exception as exc:
            print('Error: ', exc)
            print('\n'.join(traceback.format_exc().splitlines()))
        finally:
            # Close transports exactly once, whichever way we got here.
            loop.run_until_complete(server.deinitialize_transports())
    finally:
        # Always release the loop, even if setup or cleanup raised.
        loop.close()
|