1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
# Code to generate EventTypes
import xml.etree.ElementTree as ET
import opcua.ua.object_ids as obIds
import generate_model_event as gme
class EventsCodeGenerator(object):
    """Write Python source defining one class per event of a parsed model.

    The generated module contains a header, one class per event (the event
    named ``BaseEvent`` derives from ``Event``; every other event derives from
    the class named after its parent) and a final ``IMPLEMENTED_EVENTS``
    dictionary literal.
    """

    def __init__(self, event_model, output):
        """Remember the model and the output target.

        :param event_model: stored unchanged; ``eventList`` hands it to
            ``ET.parse``.
        :param output: path that ``generateEventsCode`` opens for writing.
        """
        self.output_file = output
        self.event_model = event_model
        # One indentation unit; ``write`` repeats it ``iidx`` times.
        # NOTE(review): this is a single space as found here -- confirm that
        # is the intended width for the generated code.
        self.indent = " "
        self.iidx = 0  # indent index

    def eventList(self):
        """Print the attributes of every element whose tag ends with ``UAObjectType``."""
        tree = ET.parse(self.event_model)
        root = tree.getroot()
        for child in root:
            if child.tag.endswith("UAObjectType"):
                print(child.attrib)

    def write(self, line):
        """Write ``line`` plus a newline to the current output stream.

        Non-empty lines are prefixed with the current indentation; an empty
        line is written as a bare newline so blank lines carry no whitespace.
        """
        if line:
            line = self.indent * self.iidx + line
        self.output_file.write(line + "\n")

    def make_header(self):
        """Write the module docstring and the imports of the generated file."""
        self.write('"""')
        self.write("Autogenerated code from xml spec")
        self.write('"""')
        self.write("")
        self.write("from opcua import ua")
        self.write("from opcua.common.events import Event")

    def addProperties(self, event):
        """Write one ``self.add_property(...)`` call per ``HasProperty`` reference."""
        for ref in event.references:
            if ref.referenceType == "HasProperty":
                self.write("self.add_property('{0}', {1}, {2})".format(
                    ref.refBrowseName,
                    self.getPropertyValue(ref),
                    self.getPropertyDataType(ref)))

    def _objectIdExpression(self, nodeId):
        """Return the ``ua.NodeId(ua.ObjectIds.<name>)`` source text for ``nodeId``.

        The number following ``=`` in ``nodeId`` is looked up in
        ``obIds.ObjectIdNames``; only the part of that name before the first
        underscore is used.
        """
        numeric_id = int(str(nodeId).split("=")[1])
        name = str(obIds.ObjectIdNames[numeric_id]).split("_")[0]
        return "ua.NodeId(ua.ObjectIds.{0})".format(name)

    def getPropertyValue(self, reference):
        """Return the source text used as the initial value of a property.

        Well-known browse names map to constructor arguments or fixed values;
        other references of data type ``NodeId`` resolve to an object id
        expression; everything else defaults to ``None``.
        """
        if reference.refBrowseName == "SourceNode":
            return "sourcenode"
        elif reference.refBrowseName == "Severity":
            return "severity"
        elif reference.refBrowseName == "Status":
            return "False"
        elif reference.refBrowseName == "Message":
            return "ua.LocalizedText(message)"
        elif reference.refDataType == "NodeId":
            return self._objectIdExpression(reference.refId)
        else:
            return "None"

    def getPropertyDataType(self, reference):
        """Return the source text naming the data type of a property."""
        browse_name = str(reference.refBrowseName)
        data_type = str(reference.refDataType)
        if browse_name in ("Time", "ReceiveTime"):
            return "ua.VariantType.DateTime"
        elif browse_name == "LocalTime":
            return "ua.VariantType.ExtensionObject"
        elif data_type.startswith("i="):
            # The data type is itself given as a node id.
            return self._objectIdExpression(reference.refDataType)
        else:
            return "ua.VariantType.{0}".format(reference.refDataType)

    def generateEventclass(self, event, *parentEventBrowseName):
        """Write the class definition for ``event``.

        :param event: object providing ``browseName``, ``description`` and
            ``references``.
        :param parentEventBrowseName: for every event other than
            ``BaseEvent``, the first extra argument is the name of the class
            to derive from.
        :raises IndexError: if a non-base event is given without a parent name.
        """
        self.write("")
        is_base = event.browseName == "BaseEvent"
        base_class = "Event" if is_base else parentEventBrowseName[0]
        self.write("class {0}({1}):".format(event.browseName, base_class))
        self.iidx += 1

        # Class docstring: "<name>: <description>", description may be absent.
        description = event.description if event.description is not None else ""
        self.write('"""')
        self.write(event.browseName + ": " + description)
        self.write('"""')

        self.write("def __init__(self, sourcenode=None, message=None, severity=1):")
        self.iidx += 1
        if is_base:
            self.write("Event.__init__(self)")
        else:
            self.write("super({0}, self).__init__(sourcenode, message, severity)".format(event.browseName))
            self.write("self.EventType = ua.NodeId(ua.ObjectIds.{0}Type)".format(event.browseName))
        self.addProperties(event)
        # Leave both the method body and the class body.
        self.iidx -= 2

    def generateEventsCode(self, model):
        """Generate the whole output file from ``model``.

        :param model: mapping of node id to event; each non-base event's
            ``parentNodeId`` must be a key of this mapping.

        The file named by ``self.output_file`` is opened for writing and is
        closed (and therefore flushed) before this method returns, even when
        generation fails. Afterwards ``self.output_file`` holds the path again
        and the indent index is back at its previous value, so the method can
        be called repeatedly.
        """
        output_path = self.output_file
        starting_indent = self.iidx
        try:
            with open(output_path, "w") as stream:
                # ``write`` targets ``self.output_file``; point it at the stream.
                self.output_file = stream
                self.make_header()
                for event in model.values():
                    if event.browseName == "BaseEvent":
                        self.generateEventclass(event)
                    else:
                        parentNode = model[event.parentNodeId]
                        self.generateEventclass(event, parentNode.browseName)
                self.write("")
                self.write("")
                self.write("IMPLEMENTED_EVENTS = {")
                self.iidx += 1
                for event in model.values():
                    self.write("ua.ObjectIds.{0}Type: {0},".format(event.browseName))
                self.write("}")
        finally:
            self.output_file = output_path
            self.iidx = starting_indent
if __name__ == "__main__":
    # Script entry point: parse the XML file with gme.Parser, then write the
    # generated event classes to the target module path.
    nodeset_path = "Opc.Ua.NodeSet2.xml"
    target_path = "../opcua/common/event_objects.py"
    parsed_model = gme.Parser(nodeset_path).parse()
    generator = EventsCodeGenerator(parsed_model, target_path)
    generator.generateEventsCode(parsed_model)
|