File: video_camera_testing.py

package info (click to toggle)
morse-simulator 1.4-8
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 187,116 kB
  • sloc: ansic: 108,311; python: 25,694; cpp: 786; makefile: 126; xml: 34; sh: 7
file content (269 lines) | stat: -rw-r--r-- 10,420 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
This script tests the 'data stream' oriented feature of the socket interface.
"""

from morse.testing.testing import MorseTestCase

try:
    # Include this import to be able to use your test file as a regular
    # builder script, ie, usable with: 'morse [run|exec] <your test>.py
    from morse.builder import *
except ImportError:
    pass

import os
import sys
import time
import base64
import struct
import zlib
from pymorse import Morse

IMAGE_WIDTH=320
IMAGE_HEIGHT=240
DEBUG_PGM=False # turn this to True to save difference images

def send_angles(orientation_stream, yaw, pitch, roll):
    orientation_stream.publish({'yaw' : yaw, 'pitch' : pitch, 'roll' : roll})

# a,b in radians.  d in digits of accuracy (ugh!) on the unit circle.
def are_angles_almost_equal(a, b, delta):
    return abs(math.cos(a) - math.cos(b)) <= delta and \
           abs(math.sin(a) - math.sin(b)) <= delta

def wait_yaw(gyroscope_stream, morse, timeout, yaw, precision):
    print("time::now %f" % morse.time())
    timeout_t = morse.time() + timeout
    print("wait_yaw %f %f" % (morse.time(), timeout_t))
    while morse.time() < timeout_t:
        print("%f %f" % (morse.time(), timeout_t))
        angles = gyroscope_stream.get()
        if are_angles_almost_equal(angles['yaw'], yaw, precision):
            return True
        morse.sleep(.1)

    return False

def rotate_robot_and_wait(orientation_stream, gyroscope_stream, morse, \
                          yaw, timeout, precision):
    send_angles(orientation_stream, yaw, 0.0, 0.0)
    # wait for the robot to be at the desired Z angle (yaw)
    return wait_yaw(gyroscope_stream, morse, timeout, yaw, precision)

def flush_camera(camera_stream, morse, timeout):
    timeout_t = morse.time() + timeout
    print("flush_camera %f" % timeout_t)
    while morse.time() < timeout_t:
        print("%f %f" % (morse.time(), timeout_t))
        # get a new image from the camera stream
        camera_stream.get()
        morse.sleep(.1)

def rgba2gray8u(image_rgba_base64):
    # Grayscale model used for HDTV developed by the ATSC (Wikipedia)
    image = base64.b64decode( image_rgba_base64 )
    return [ int(0.2126 * image[index] +
                 0.7152 * image[index + 1] +
                 0.0722 * image[index + 2] )
             for index in range(0, len(image), 4) ]

def load_image(filepath, size):
    image = []
    with open(filepath, 'rb') as f:
        # Read binary image (Bytes)
        image = struct.unpack('%iB'%size, zlib.decompress(f.read()))

    return image

def save_image(filepath, image8u):
    with open(filepath, 'wb') as f:
        # Write binary image (Bytes)
        f.write(zlib.compress(struct.pack('%iB'%len(image8u), *image8u)))

def save_pgm_ascii(filepath, image8u, width, height):
    assert(len(image8u) == width * height)
    with open(filepath, 'w') as f:
        # Write PGM P2 image (ASCII)
        f.write('P2\n%i %i\n255\n'%(width, height))
        f.write(' '.join(['%i'%px for px in image8u]))

# http://stackoverflow.com/questions/7368739/numpy-and-16-bit-pgm
def read_pgm_ascii(filepath):
    with open(filepath, 'r') as f:
        buff = f.read()
    try:
        import re
        header, width, height, maxval = re.search(
            "(^P2\s(?:\s*#.*[\r\n])*"
            "(\d+)\s(?:\s*#.*[\r\n])*"
            "(\d+)\s(?:\s*#.*[\r\n])*"
            "(\d+)\s(?:\s*#.*[\r\n]\s)*)", buff).groups()
    except AttributeError:
        raise ValueError("Not PGM P2 file: '%s'" % filepath)
    image8u = [int(px) for px in buff[len(header):].split()]
    assert(len(image8u) == int(width) * int(height))
    assert(max(image8u) <= int(maxval))
    return image8u

def capture8u(cam_stream, image_path=None):
    # get new RGBA image
    capture = cam_stream.get()
    # convert it to grayscale
    image8u = rgba2gray8u( capture['image'] )
    # or if we use Video8uPublisher
    # image8u = base64.b64decode( capture['image'] )
    # save the image (for debug)
    if image_path:
        save_pgm_ascii(image_path, image8u, IMAGE_WIDTH, IMAGE_HEIGHT)

    return image8u

def diff_image(imageA, imageB, debug=None):
    assert(len(imageA) == len(imageB))
    diff = [abs(pxA - pxB) for pxA, pxB in zip(imageA, imageB)]
    # returns sum(abs( imageA - imageB )) pixel per pixel
    sum_diff = sum(diff)
    if debug:
        pgm_path = '%s.%i.%s.pgm'%(os.path.abspath(__file__), \
                                   int(time.time()*1000), debug)
        save_pgm_ascii(pgm_path, diff, IMAGE_WIDTH, IMAGE_HEIGHT)
        print("debug: %8i -> %5.3f %% %s"% \
              (sum_diff, sum_diff/(len(diff)*2.55), pgm_path))

    return sum_diff

def normalize_radians(angle):
    two_pi = 2 * math.pi
    # First, normalize angle between 0 and 2*PI
    new_angle = angle % two_pi
    # Then, between -PI and PI
    if new_angle > math.pi:
        new_angle = new_angle - two_pi

    return new_angle

class CameraTest(MorseTestCase):

    def setUpEnv(self):

        atrv = ATRV('atrv')
        atrv.rotate(0.0, 0.0, math.pi)

        camera = VideoCamera('camera')
        camera.properties(capturing = True)
        camera.properties(cam_width = 320)
        camera.properties(cam_height = 240)
        camera.properties(cam_focal = 25.0000)
        camera.properties(Vertical_Flip = True)
        camera.translate(x=0.2, z=0.9)
        atrv.append(camera)
        camera.add_stream('socket')

        orientation = Orientation('orientation')
        orientation.add_stream('socket')
        atrv.append(orientation)

        gyroscope = Gyroscope('gyroscope')
        gyroscope.add_stream('socket')
        atrv.append(gyroscope)

        env = Environment('indoors-1/boxes')
        # Shadow may vary depending on the GPU, MULTITEXTURE mode = no shadow.
        # We can't use SOLID viewport or SINGLETEXTURE mode since they do not
        # provide image in bge.texture
        env.set_material_mode('MULTITEXTURE')
        camera.profile()

    def assert_image_file_diff_less(self, filepath, image8u, delta):
        image_from_file = read_pgm_ascii(filepath)
        self.assert_images_diff_less(image8u, image_from_file, delta)

    def assert_images_diff_less(self, imageA, imageB, delta):
        debug = "less" if DEBUG_PGM else None
        diff = diff_image(imageA, imageB, debug=debug)
        # Diff max is: width * height * 255
        # delta in percent (of the maximum difference)
        self.assertLess(diff, delta * 2.55 * len(imageA))

    def assert_images_diff_greater(self, imageA, imageB, delta):
        debug = "greater" if DEBUG_PGM else None
        diff = diff_image(imageA, imageB, debug=debug)
        # Diff max is: width * height * 255
        # delta in percent (of the maximum difference)
        self.assertGreater(diff, delta * 2.55 * len(imageA))

    def assert_orientation(self, gyroscope_stream, yaw, pitch, roll, precision):
        angles = gyroscope_stream.get()
        self.assertAnglesAlmostEqual(angles['yaw'], yaw, precision)
        self.assertAnglesAlmostEqual(angles['pitch'], pitch, precision)
        self.assertAnglesAlmostEqual(angles['roll'], roll, precision)

    # a,b in radians.  d in digits of accuracy (ugh!) on the unit circle.
    # http://astrometry.net/svn/trunk/projects/ephemeris/py/test_celestial_mechanics.py
    def assertAnglesAlmostEqual(self, a, b, delta=0.0):
        self.assertAlmostEqual(math.cos(a), math.cos(b), delta=delta)
        self.assertAlmostEqual(math.sin(a), math.sin(b), delta=delta)

    def test_camera(self):
        """ Test if we can connect to the pose data stream, and read from it.

            The test has been constructed in the following way
                - put the robot in a specific position and takes an
                  image
                - using an editor image, look for the rgb value of some
                  specific zone, and setup some threshold
                - retrieve the threshold in the scene and tests it is
                  mostly invariant

            It will allow to detect that :
                - the camera is broken and don't have a real image
                - blender has changed the way they transmit the image
                - blender changes the orientation of their image
                - possibly other bugs
        """

        with Morse() as morse:
            camera_stream = morse.atrv.camera
            orientation_stream = morse.atrv.orientation
            gyroscope_stream = morse.atrv.gyroscope

            imageA_path = '%s.A.pgm'%os.path.abspath(__file__)
            imageB_path = '%s.B.pgm'%os.path.abspath(__file__)

            precision = 0.01
            # assert robot orienation is correct
            self.assert_orientation(gyroscope_stream, math.pi, 0.0, 0.0, \
                                    precision)

            # get a new image from the camera in gray
            imageA = capture8u(camera_stream)#, imageA_path)
            # assert that the camera image differ < .1 percent from the expected
            self.assert_image_file_diff_less(imageA_path, imageA, 0.1)

            # command the robot to rotate and wait that he does for 5 seconds max
            in_time = rotate_robot_and_wait(orientation_stream, \
                                            gyroscope_stream, morse, 2.70, 5, precision)
            # XXX robot might have not graphically turned yet! happens randomly!
            # gyroscope can give its new orientation while the physics didnt update yet.
            if DEBUG_PGM:
                print("debug: rotate in time: %s (False = timeout)"%str(in_time))
            # "flush" the camera stream for 1 second
            flush_camera(camera_stream, morse, 1.0)

            # assert robot orienation is correct
            self.assert_orientation(gyroscope_stream, 2.70, 0.0, 0.0, precision)

            # get a new image from the camera in gray
            imageB = capture8u(camera_stream)#, imageB_path)
            # assert that the camera image differ < .1 percent from the expected
            self.assert_image_file_diff_less(imageB_path, imageB, 0.2)

            # assert that the second image differ > 4 percent from the first
            self.assert_images_diff_greater(imageA, imageB, 4)

########################## Run these tests ##########################
if __name__ == "__main__":
    from morse.testing.testing import main
    main(CameraTest)