File: decimal.py

package info (click to toggle)
node-postgres 8.16.3+~cs35.24.27-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,788 kB
  • sloc: javascript: 15,879; python: 79; makefile: 57
file content (68 lines) | stat: -rwxr-xr-x 1,591 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3
import binascii
import csv
import io
import os.path
import re
import struct
import subprocess
import sys


# Accepts a test line made up solely of '-', digits, '.', 'N' and 'a' (enough
# to spell signed decimals and "NaN"). main() applies it with match() to lines
# that still carry their newline; '$' also matches just before a final newline.
TEST = re.compile(r'[-0-9.Na]+$')


def _read_exact(f, size):
	"""Read exactly *size* bytes from *f*, raising EOFError if the stream ends early."""
	chunks = []
	remaining = size

	# A single read() may legitimately return fewer bytes than requested on
	# some stream types, so keep reading until satisfied or EOF is reached.
	while remaining > 0:
		chunk = f.read(remaining)

		if not chunk:
			raise EOFError(f'Unexpected end of data: wanted {size} bytes, got {size - remaining}')

		chunks.append(chunk)
		remaining -= len(chunk)

	return b''.join(chunks)


def read_numerics(f):
	"""Parse single-column PostgreSQL binary COPY output into raw field values.

	Args:
		f: Binary file-like object positioned at the start of the COPY data.

	Returns:
		A list of ``bytes`` objects, one per row, each holding the undecoded
		contents of that row's only field.

	Raises:
		EOFError: If the stream ends in the middle of a row or before the
			trailer.
		Exception: If the header is not the expected 19 bytes, a row does not
			have exactly one field, a field has a negative size (e.g. the -1
			that marks NULL), or data follows the trailer.
	"""
	# Signature followed by nine zero bytes; per the PostgreSQL binary COPY
	# format these are the flags word and header-extension length, both zero.
	header = f.read(19)

	if header != b'PGCOPY\n\xff\r\n\0\0\0\0\0\0\0\0\0':
		raise Exception('Unexpected header')

	results = []

	while True:
		(field_count,) = struct.unpack('>h', _read_exact(f, 2))

		# A field count of -1 is the file trailer.
		if field_count == -1:
			# Reading to EOF also drains the stream so the producer can exit.
			# Raise explicitly instead of using assert, which is removed
			# under ``python -O``.
			if f.read() != b'':
				raise Exception('Unexpected data after trailer')

			break

		if field_count != 1:
			raise Exception(f'Unexpected field count: {field_count!r}')

		(field_size,) = struct.unpack('>i', _read_exact(f, 4))

		# Never pass a negative size to read(): read(-1) means "read to EOF"
		# and would silently swallow every remaining row.
		if field_size < 0:
			raise Exception(f'Unexpected field size: {field_size!r}')

		results.append(_read_exact(f, field_size))

	return results


def main():
	"""Generate binary/text test vectors for PostgreSQL numerics as CSV.

	Reads candidate values from ``decimal.in`` located next to this script
	(whitespace-only lines and lines starting with ``#`` are skipped), has
	``psql`` emit each one as a ``numeric`` in binary COPY format, and writes
	a CSV with a ``binary,expected`` header to standard output, where
	``binary`` is the hex-encoded field and ``expected`` is the input text.

	Raises:
		ValueError: If a line of ``decimal.in`` does not match ``TEST`` or the
			file contains no tests.
		subprocess.CalledProcessError: If ``psql`` exits with a non-zero
			status.
		Exception: If the COPY output cannot be parsed or does not contain
			exactly one row per test.
	"""
	tests = []

	with open(os.path.join(os.path.dirname(__file__), 'decimal.in'), 'r') as f:
		for line in f:
			if line.isspace() or line.startswith('#'):
				continue

			if not TEST.match(line):
				raise ValueError(f'Invalid test: {line!r}')

			tests.append(line.rstrip())

	# An empty VALUES list is not valid SQL; fail here with a clear message
	# instead of a psql syntax error.
	if not tests:
		raise ValueError('No tests found in decimal.in')

	# Interpolating into the SQL text is safe only because TEST restricts each
	# value to characters that cannot terminate the quoted literal.
	values = b', '.join(b"(numeric '%s')" % (test.encode('ascii'),) for test in tests)
	query = b'COPY (VALUES ' + values + b') TO STDOUT (FORMAT binary)'

	# check=True reports a failing psql as CalledProcessError *before* any
	# parsing is attempted, so a psql error is not masked by a misleading
	# "Unexpected header" from parsing its empty output.
	psql = subprocess.run(
		['psql', '--set=ON_ERROR_STOP=1'],
		input=query,
		stdout=subprocess.PIPE,
		check=True,
	)

	results = read_numerics(io.BytesIO(psql.stdout))

	# zip() below would silently drop rows on a mismatch, mispairing or
	# omitting test vectors.
	if len(results) != len(tests):
		raise Exception(f'Expected {len(tests)} results, got {len(results)}')

	writer = csv.writer(sys.stdout, lineterminator='\n')
	writer.writerow(('binary', 'expected'))
	writer.writerows((binascii.hexlify(binary).decode('ascii'), expected) for binary, expected in zip(results, tests))


# Run the generator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
	main()