1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
#!/usr/bin/env python
import json
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import Column
from sqlalchemy.types import DateTime, Integer, LargeBinary
from libnmap.plugins.backendplugin import NmapBackendPlugin
from libnmap.reportjson import ReportDecoder, ReportEncoder
# Declarative base shared by the ORM models of this module: the Reports
# model derives from it, and its metadata is used to create the tables.
Base = declarative_base()
class NmapSqlPlugin(NmapBackendPlugin):
    """
    Persist NmapReport objects in a SQL backend.

    The implementation is built on sqlalchemy (written against 0.8.1).
    Each report is serialised to JSON and stored as one row of the
    ``reports`` table.

    usage :

        #get a nmapReport object
        from libnmap.parser import NmapParser
        from libnmap.reportjson import ReportDecoder, ReportEncoder
        import json
        nmap_report_obj = NmapParser.parse_fromfile(
            '/home/vagrant/python-nmap-lib/libnmap/test/files/1_hosts.xml')

        #get a backend with in memory sqlite
        from libnmap.plugins.backendpluginFactory import BackendPluginFactory
        mybackend_mem = BackendPluginFactory.create(plugin_name='sql',
                                                    url='sqlite://',
                                                    echo=True)

        mybackend_mysql = BackendPluginFactory.create(plugin_name='sql',
                            url='mysql+mysqldb://scott:tiger@localhost/foo',
                            echo=True)
        mybackend = BackendPluginFactory.create(plugin_name='sql',
                                        url='sqlite:////tmp/reportdb.sql',
                                        echo=True)
        #lets save!!
        nmap_report_obj.save(mybackend)
        mybackend.getall()
        mybackend.get(1)
    """

    class Reports(Base):
        """
        Embedded ORM class mapping a NmapReport to a
        simple three column table (id, insertion date, JSON payload)
        """

        __tablename__ = "reports"

        id = Column("report_id", Integer, primary_key=True)
        # The default must be a callable evaluated at insert time; a plain
        # string such as "now" is not a valid value for a DateTime column.
        inserted = Column("inserted", DateTime(), default=datetime.now)
        report_json = Column("report_json", LargeBinary())

        def __init__(self, obj_NmapReport):
            """
            Build a row from a NmapReport

            :param obj_NmapReport: report to store; its ``endtime`` is used
                as insertion date and the whole object is serialised to
                JSON with ReportEncoder
            """
            self.inserted = datetime.fromtimestamp(obj_NmapReport.endtime)
            dumped_json = json.dumps(obj_NmapReport, cls=ReportEncoder)
            # stored as bytes because the column is a LargeBinary
            self.report_json = dumped_json.encode("utf-8")

        def decode(self):
            """
            Rebuild the report stored in this row

            :returns: the object produced by ReportDecoder from the
                stored JSON
            """
            json_decoded = self.report_json.decode("utf-8")
            return json.loads(json_decoded, cls=ReportDecoder)

    def __init__(self, **kwargs):
        """
        constructor receive a **kwargs as the **kwargs in the sqlalchemy
        create_engine() method (see sqlalchemy docs)
        You must add to this **kwargs an 'url' key with the url to your
        database
        This constructor will :
        - create all the necessary obj to discuss with the DB
        - create all the mapping(ORM)

        todo : support the : sqlalchemy.engine_from_config

        :param **kwargs: 'url' plus any create_engine() keyword argument
        :raises: ValueError if no url is given,
                all exception sqlalchemy can throw
        ie sqlite in memory url='sqlite://' echo=True
        ie sqlite file on hd url='sqlite:////tmp/reportdb.sql' echo=True
        ie mysql url='mysql+mysqldb://scott:tiger@localhost/foo'
        """
        NmapBackendPlugin.__init__(self)
        self.engine = None
        self.url = None
        self.Session = sessionmaker()

        if "url" not in kwargs:
            raise ValueError("NmapSqlPlugin requires an 'url' keyword argument")

        # 'url' is ours; everything left is forwarded to create_engine()
        self.url = kwargs.pop("url")
        self.engine = create_engine(self.url, **kwargs)
        Base.metadata.create_all(bind=self.engine, checkfirst=True)
        self.Session.configure(bind=self.engine)

    def insert(self, nmap_report):
        """
        insert NmapReport in the backend

        :param nmap_report: the NmapReport to store
        :returns: the ident of the object in the backend for future usage
                or None
        """
        sess = self.Session()
        try:
            report = NmapSqlPlugin.Reports(nmap_report)
            sess.add(report)
            sess.commit()
            # read the generated key while the session is still open
            reportid = report.id
        finally:
            # always release the connection, even if the commit failed
            sess.close()
        return reportid if reportid else None

    def get(self, report_id=None):
        """
        retrieve a NmapReport from the backend

        :param report_id: ident of the report, as returned by insert()
        :returns: NmapReport or None if no report has this ident
        :raises: ValueError if report_id is None
        """
        if report_id is None:
            raise ValueError("report_id is required")

        sess = self.Session()
        try:
            query = sess.query(NmapSqlPlugin.Reports).filter_by(id=report_id)
            our_report = query.first()
        finally:
            sess.close()
        return our_report.decode() if our_report else None

    def getall(self):
        """
        retrieve every report of the backend, ordered by insertion date

        todo : nice to have, implement a filter capability

        :returns: collection of tuple (id,NmapReport)
        """
        sess = self.Session()
        try:
            query = sess.query(NmapSqlPlugin.Reports).order_by(
                NmapSqlPlugin.Reports.inserted
            )
            return [(report.id, report.decode()) for report in query]
        finally:
            sess.close()

    def delete(self, report_id=None):
        """
        Remove a report from the backend

        :param report_id: ident of the report, as returned by insert()
        :returns: The number of rows deleted
        :raises: ValueError if report_id is None
        """
        if report_id is None:
            raise ValueError("report_id is required")

        sess = self.Session()
        try:
            query = sess.query(NmapSqlPlugin.Reports).filter_by(id=report_id)
            nb_line = query.delete()
            sess.commit()
        finally:
            sess.close()
        return nb_line
|