File: binarygrammar.py

package info (click to toggle)
gavodachs 2.3%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 7,260 kB
  • sloc: python: 58,359; xml: 8,882; javascript: 3,453; ansic: 661; sh: 158; makefile: 22
file content (149 lines) | stat: -rw-r--r-- 4,849 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
A grammar reading from (fixed-record) binary files.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import re
import struct


from gavo import base
from gavo import utils
from gavo.grammars.common import Grammar, FileRowIterator
from gavo.utils import misctricks
from gavo.utils import parsetricks


class BinaryRowIterator(FileRowIterator):
	"""A row iterator producing rowdicts from binary files.

	The record layout (struct format, record length, field names), the
	number of leading bytes to skip, and the record armoring are all
	read from the grammar this iterator works for.
	"""
	fileMode = "rb"

	def _iterUnarmoredRecords(self):
		"""yields chunks of the grammar's record length from the input
		file until a read returns an empty byte string.

		A short final chunk is passed on unchanged.
		"""
		recordLength = self.grammar.fieldDefs.recordLength
		# iter with a sentinel ends the loop on the first empty read
		for rawRec in iter(lambda: self.inputFile.read(recordLength), b""):
			yield rawRec

	def _iterInRecords(self):
		"""returns an iterator over the raw records of the input file.

		Before the iterator is created, skipBytes bytes are read from the
		input and discarded.
		"""
		self.inputFile.read(self.grammar.skipBytes)

		armor = self.grammar.armor
		if armor is None:
			return self._iterUnarmoredRecords()
		if armor=="fortran":
			return misctricks.iterFortranRecs(self.inputFile)
		# no other armor value is handled here
		assert False

	def _iterRows(self):
		"""yields a dictionary mapping field names to unpacked values
		for each raw record.

		Exceptions occurring while reading or unpacking are replaced by
		what base.ui.logOldExc returns for a base.SourceParseError carrying
		the original message, the current file position and the source token.
		"""
		fieldDefs = self.grammar.fieldDefs
		structFormat = fieldDefs.structFormat
		names = fieldDefs.fieldNames
		try:
			for rawRec in self._iterInRecords():
				values = struct.unpack(structFormat, rawRec)
				yield dict(zip(names, values))
		except Exception as ex:
			parseError = base.SourceParseError(str(ex),
				location="byte %s"%self.inputFile.tell(),
				source=str(self.sourceToken))
			raise base.ui.logOldExc(parseError)


def _getFieldsGrammar():
	"""returns a parser for sequences of <name>(<code>) field definitions.

	The parser is built from parsetricks elements while blank, newline,
	tab and carriage return are set as whitespace characters.  Each field
	is turned into a dictionary with the keys identifier and formatCode;
	the whole input must consist of one or more such fields.
	"""
	with parsetricks.pyparsingWhitechars(" \n\t\r"):
		# the last character of the identifier pattern is cut off here
		# (presumably a trailing end-of-string anchor -- confirm against
		# utils.identifierPattern) so that text can follow the identifier.
		identifier = parsetricks.Regex(utils.identifierPattern.pattern[:-1]
			).setName("identifier")
		# This must be a raw string: \d in a plain string literal is an
		# invalid escape sequence that newer pythons warn about.
		formatCode = parsetricks.Regex(r"\d+s|[bBhHiIqQfd]"
			).setName("fieldSpec")
		field = ( identifier("identifier")
			+ parsetricks.Suppress(parsetricks.Literal("("))
			+ formatCode("formatCode")
			+ parsetricks.Suppress(parsetricks.Literal(")"))).setParseAction(
				lambda s, p, t: dict(t))
		return parsetricks.OneOrMore(field)+parsetricks.StringEnd()

		
class BinaryRecordDef(base.Structure):
	"""A definition of a binary record.

	A binary record consists of a number of binary fields, each of which
	is defined by a name and a format code.  The format codes supported
	here are a subset of what python's struct module supports.  The
	widths given below are for big, little, and packed binfmts.
	For native (which is the default), it depends on your platform.

	* <number>s -- <number> characters making up a string
	* b,B -- signed and unsigned byte (8 bit)
	* h,H -- signed and unsigned short (16 bit)
	* i,I -- signed and unsigned int (32 bit)
	* q,Q -- signed and unsigned long (64 bit)
	* f,d -- float and double.

	The content of this element gives the record structure in the format
	<name>(<code>){<whitespace><name>(<code>)} where <name> is a c-style
	identifier.
	"""
	name_ = "binaryRecordDef"

	# the parser for the element content, built once at class creation
	_fieldsGrammar = _getFieldsGrammar()

	_binfmt = base.EnumeratedUnicodeAttribute("binfmt",
		default="native", 
		validValues=["big", "little", "native", "packed"],
		description="Binary format of the input data; big and little stand"
			" for msb first and lsb first, and"
			" packed is like native except no alignment takes place.")

	_fields = base.DataContent(description="The enumeration of"
		" the record fields.")

	# maps binfmt values to the prefix prepended to the struct format string
	_binfmtToStructCode = {
		"native": "",
		"packed": "=",
		"big": ">",
		"little": "<"}

	def completeElement(self, ctx):
		"""parses the element content and sets structFormat, recordLength,
		and fieldNames from the result.

		Content that the fields grammar rejects is reported through a
		base.LiteralParseError (passed through base.ui.logOldExc).
		"""
		try:
			parsedFields = utils.pyparseString(self._fieldsGrammar, self.content_)
		except parsetricks.ParseBaseException as ex:
			# whitespace runs are collapsed (raw-string regex) so the literal
			# in the error message stays on a single line
			raise base.ui.logOldExc(base.LiteralParseError("binaryRecordDef", 
				re.sub(r"\s+", " ", self.content_),
				pos=str(ex.loc), hint="The parser said: '%s'"%str(ex)))
		# XXX TODO: Position should probably be position during XML parse.
		# Fix when we have source positions on parsed elements.
		self.structFormat = (self._binfmtToStructCode[self.binfmt]
			+"".join(f["formatCode"] for f in parsedFields))
		self.recordLength = struct.calcsize(self.structFormat)
		self.fieldNames = tuple(f["identifier"] for f in parsedFields)
		self._completeElementNext(BinaryRecordDef, ctx)


class BinaryGrammar(Grammar):
	"""A grammar that builds rowdicts from binary data.

	The grammar expects the input to be in fixed-length records.
	The actual specification of the fields is done via a binaryRecordDef
	element.
	"""
	name_ = "binaryGrammar"
	rowIterator = BinaryRowIterator

	_til = base.IntAttribute(
		"skipBytes",
		default=0,
		description="Number of bytes to skip before parsing records.")

	_fdefs = base.StructAttribute(
		"fieldDefs",
		childFactory=BinaryRecordDef,
		description="Definition of the record.")

	_armoring = base.EnumeratedUnicodeAttribute(
		"armor",
		default=None,
		validValues=["fortran"],
		description="Record armoring; by default it's None meaning the data was"
			" dumped to the file sequentially.  Set it to fortran for fortran"
			" unformatted files (4 byte length before and after the payload).")