File: dasiamrpn_tracker.py

"""
DaSiamRPN tracker.
Original paper: https://arxiv.org/abs/1808.06048
Link to original repo: https://github.com/foolwood/DaSiamRPN
Links to onnx models:
network:     https://www.dropbox.com/s/rr1lk9355vzolqv/dasiamrpn_model.onnx?dl=0
kernel_r1:   https://www.dropbox.com/s/999cqx5zrfi7w4p/dasiamrpn_kernel_r1.onnx?dl=0
kernel_cls1: https://www.dropbox.com/s/qvmtszx5h339a0w/dasiamrpn_kernel_cls1.onnx?dl=0
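
Example (a minimal sketch; assumes the three .onnx files above sit next to this script):
    python dasiamrpn_tracker.py --input path/to/video.mp4
Omit --input to read from the default camera.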
"""

import numpy as np
import cv2 as cv
import argparse
import sys

class DaSiamRPNTracker:
    # Set up tracker hyper-parameters, the penalty window, and the network/kernel models
    def __init__(self, net="dasiamrpn_model.onnx", kernel_r1="dasiamrpn_kernel_r1.onnx", kernel_cls1="dasiamrpn_kernel_cls1.onnx"):
        self.windowing = "cosine"
        self.exemplar_size = 127
        self.instance_size = 271
        self.total_stride = 8
        self.score_size = (self.instance_size - self.exemplar_size) // self.total_stride + 1
        self.context_amount = 0.5
        self.ratios = [0.33, 0.5, 1, 2, 3]
        self.scales = [8, ]
        self.anchor_num = len(self.ratios) * len(self.scales)
        self.penalty_k = 0.055
        self.window_influence = 0.42
        self.lr = 0.295
        self.score = []
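        # Penalty window that down-weights responses far from the previous
        # target position; tiled once per anchor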
        if self.windowing == "cosine":
            self.window = np.outer(np.hanning(self.score_size), np.hanning(self.score_size))
        elif self.windowing == "uniform":
            self.window = np.ones((self.score_size, self.score_size))
        self.window = np.tile(self.window.flatten(), self.anchor_num)
        # Load the main network and the kernel models
        self.net = cv.dnn.readNet(net)
        self.kernel_r1 = cv.dnn.readNet(kernel_r1)
        self.kernel_cls1 = cv.dnn.readNet(kernel_cls1)

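    # init_bb is (cx, cy, w, h): the box centre plus width and height, as
    # produced by the mouse callback in main()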
    def init(self, im, init_bb):
        target_pos, target_sz = np.array([init_bb[0], init_bb[1]]), np.array([init_bb[2], init_bb[3]])
        self.im_h = im.shape[0]
        self.im_w = im.shape[1]
        self.target_pos = target_pos
        self.target_sz = target_sz
        self.avg_chans = np.mean(im, axis=(0, 1))

        # The ONNX model was exported from the pre-trained .pth model using a
        # single network state, captured with a large bounding box. That state
        # cannot handle very small boxes, so reject them up front.
        if (self.target_sz[0] * self.target_sz[1]) / float(self.im_h * self.im_w) < 0.004:
            raise AssertionError(
                "Initial bounding box is too small; restart the tracker with a larger one")

        self.anchor = self.__generate_anchor()
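        # Context-padded exemplar region: each side is enlarged by
        # context_amount * (w + h); s_z is the side of the equivalent square crop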
        wc_z = self.target_sz[0] + self.context_amount * sum(self.target_sz)
        hc_z = self.target_sz[1] + self.context_amount * sum(self.target_sz)
        s_z = round(np.sqrt(wc_z * hc_z))
        z_crop = self.__get_subwindow_tracking(im, self.exemplar_size, s_z)
        z_crop = z_crop.transpose(2, 0, 1).reshape(1, 3, self.exemplar_size, self.exemplar_size).astype(np.float32)
        self.net.setInput(z_crop)
        z_f = self.net.forward('63')
        self.kernel_r1.setInput(z_f)
        r1 = self.kernel_r1.forward()
        self.kernel_cls1.setInput(z_f)
        cls1 = self.kernel_cls1.forward()
        r1 = r1.reshape(20, 256, 4, 4)
        cls1 = cls1.reshape(10, 256, 4, 4)
        self.net.setParam(self.net.getLayerId('65'), 0, r1)
        self.net.setParam(self.net.getLayerId('68'), 0, cls1)

    # Create the anchor grid for the tracking bounding box
    def __generate_anchor(self):
        self.anchor = np.zeros((self.anchor_num, 4), dtype=np.float32)
        size = self.total_stride * self.total_stride
        count = 0

        for ratio in self.ratios:
            ws = int(np.sqrt(size / ratio))
            hs = int(ws * ratio)
            for scale in self.scales:
                wws = ws * scale
                hhs = hs * scale
                self.anchor[count] = [0, 0, wws, hhs]
                count += 1

        score_sz = int(self.score_size)
        self.anchor = np.tile(self.anchor, score_sz * score_sz).reshape((-1, 4))
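        # Replicate every anchor across the score map and assign grid-centred
        # (x, y) offsets spaced by total_stride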
        ori = - (score_sz // 2) * self.total_stride  # integer division keeps the grid centred
        xx, yy = np.meshgrid([ori + self.total_stride * dx for dx in range(score_sz)], [ori + self.total_stride * dy for dy in range(score_sz)])
        xx, yy = np.tile(xx.flatten(), (self.anchor_num, 1)).flatten(), np.tile(yy.flatten(), (self.anchor_num, 1)).flatten()
        self.anchor[:, 0], self.anchor[:, 1] = xx.astype(np.float32), yy.astype(np.float32)
        return self.anchor

    # Track the target in a new frame and return the updated bounding box
    def update(self, im):
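        # target_sz indices are swapped here relative to init(); this is
        # harmless, since only the product wc_z * hc_z is used for s_z below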
        wc_z = self.target_sz[1] + self.context_amount * sum(self.target_sz)
        hc_z = self.target_sz[0] + self.context_amount * sum(self.target_sz)
        s_z = np.sqrt(wc_z * hc_z)
        scale_z = self.exemplar_size / s_z
        d_search = (self.instance_size - self.exemplar_size) / 2
        pad = d_search / scale_z
        s_x = round(s_z + 2 * pad)

        # Crop and preprocess the search region
        x_crop = self.__get_subwindow_tracking(im, self.instance_size, s_x)
        x_crop = x_crop.transpose(2, 0, 1).reshape(1, 3, self.instance_size, self.instance_size).astype(np.float32)
        self.score = self.__tracker_eval(x_crop, scale_z)
        self.target_pos[0] = max(0, min(self.im_w, self.target_pos[0]))
        self.target_pos[1] = max(0, min(self.im_h, self.target_pos[1]))
        self.target_sz[0] = max(10, min(self.im_w, self.target_sz[0]))
        self.target_sz[1] = max(10, min(self.im_h, self.target_sz[1]))

        cx, cy = self.target_pos
        w, h = self.target_sz
        updated_bb = (cx, cy, w, h)
        return True, updated_bb

    # Run the network on the search region and update the bounding box position and size
    def __tracker_eval(self, x_crop, scale_z):
        target_size = self.target_sz * scale_z
        self.net.setInput(x_crop)
        outNames = ['66', '68']  # '66': box regression output, '68': classification output
        delta, score = self.net.forward(outNames)
        delta = np.transpose(delta, (1, 2, 3, 0))
        delta = np.ascontiguousarray(delta, dtype=np.float32)
        delta = np.reshape(delta, (4, -1))
        score = np.transpose(score, (1, 2, 3, 0))
        score = np.ascontiguousarray(score, dtype=np.float32)
        score = np.reshape(score, (2, -1))
        score = self.__softmax(score)[1, :]
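        # Decode the regression output: (dx, dy) are centre offsets scaled by
        # the anchor size, (dw, dh) are log-space width/height scales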
        delta[0, :] = delta[0, :] * self.anchor[:, 2] + self.anchor[:, 0]
        delta[1, :] = delta[1, :] * self.anchor[:, 3] + self.anchor[:, 1]
        delta[2, :] = np.exp(delta[2, :]) * self.anchor[:, 2]
        delta[3, :] = np.exp(delta[3, :]) * self.anchor[:, 3]

        def __change(r):
            return np.maximum(r, 1./r)

        def __sz(w, h):
            pad = (w + h) * 0.5
            sz2 = (w + pad) * (h + pad)
            return np.sqrt(sz2)

        def __sz_wh(wh):
            pad = (wh[0] + wh[1]) * 0.5
            sz2 = (wh[0] + pad) * (wh[1] + pad)
            return np.sqrt(sz2)

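        # Scale (s_c) and aspect-ratio (r_c) change penalties in the SiamRPN
        # style: proposals deviating from the previous target shape are
        # down-weighted before the cosine window is blended in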
        s_c = __change(__sz(delta[2, :], delta[3, :]) / (__sz_wh(target_size)))
        r_c = __change((target_size[0] / target_size[1]) / (delta[2, :] / delta[3, :]))
        penalty = np.exp(-(r_c * s_c - 1.) * self.penalty_k)
        pscore = penalty * score
        pscore = pscore * (1 - self.window_influence) + self.window * self.window_influence
        best_pscore_id = np.argmax(pscore)
        target = delta[:, best_pscore_id] / scale_z
        target_size /= scale_z
        lr = penalty[best_pscore_id] * score[best_pscore_id] * self.lr
        res_x = target[0] + self.target_pos[0]
        res_y = target[1] + self.target_pos[1]
        res_w = target_size[0] * (1 - lr) + target[2] * lr
        res_h = target_size[1] * (1 - lr) + target[3] * lr
        self.target_pos = np.array([res_x, res_y])
        self.target_sz = np.array([res_w, res_h])
        return score[best_pscore_id]

    def __softmax(self, x):
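        # Subtract the per-column maximum before exponentiating for numerical
        # stability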
        x_max = x.max(0)
        e_x = np.exp(x - x_max)
        y = e_x / e_x.sum(axis=0)
        return y

    # Crop a square patch around the target, pad it if needed, and resize it to the model input size
    def __get_subwindow_tracking(self, im, model_size, original_sz):
        im_sz = im.shape
        c = (original_sz + 1) / 2
        context_xmin = round(self.target_pos[0] - c)
        context_xmax = context_xmin + original_sz - 1
        context_ymin = round(self.target_pos[1] - c)
        context_ymax = context_ymin + original_sz - 1
        left_pad = int(max(0., -context_xmin))
        top_pad = int(max(0., -context_ymin))
        right_pad = int(max(0., context_xmax - im_sz[1] + 1))
        bot_pad = int(max(0., context_ymax - im_sz[0] + 1))
        context_xmin += left_pad
        context_xmax += left_pad
        context_ymin += top_pad
        context_ymax += top_pad
        r, c, k = im.shape

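        # If the crop extends beyond the image borders, embed the frame in a
        # larger canvas padded with the per-channel mean computed in init()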
        if any([top_pad, bot_pad, left_pad, right_pad]):
            te_im = np.zeros((
                r + top_pad + bot_pad, c + left_pad + right_pad, k), np.uint8)
            te_im[top_pad:top_pad + r, left_pad:left_pad + c, :] = im
            if top_pad:
                te_im[0:top_pad, left_pad:left_pad + c, :] = self.avg_chans
            if bot_pad:
                te_im[r + top_pad:, left_pad:left_pad + c, :] = self.avg_chans
            if left_pad:
                te_im[:, 0:left_pad, :] = self.avg_chans
            if right_pad:
                te_im[:, c + left_pad:, :] = self.avg_chans
            im_patch_original = te_im[int(context_ymin):int(context_ymax + 1), int(context_xmin):int(context_xmax + 1), :]
        else:
            im_patch_original = im[int(context_ymin):int(context_ymax + 1), int(context_xmin):int(context_xmax + 1), :]

        if not np.array_equal(model_size, original_sz):
            im_patch_original = cv.resize(im_patch_original, (model_size, model_size))
        return im_patch_original

# Demo: run the DaSiamRPN tracker on a video stream
def main():
    parser = argparse.ArgumentParser(description="Run tracker")
    parser.add_argument("--input", type=str, help="Full path to input (empty for camera)")
    parser.add_argument("--net", type=str, default="dasiamrpn_model.onnx", help="Full path to onnx model of net")
    parser.add_argument("--kernel_r1", type=str, default="dasiamrpn_kernel_r1.onnx", help="Full path to onnx model of kernel_r1")
    parser.add_argument("--kernel_cls1", type=str, default="dasiamrpn_kernel_cls1.onnx", help="Full path to onnx model of kernel_cls1")
    args = parser.parse_args()
    point1 = ()
    point2 = ()
    mark = True
    drawing = False
    cx, cy, w, h = 0.0, 0.0, 0, 0
    # Mouse callback for drawing the initial bounding box on the video stream
    def get_bb(event, x, y, flag, param):
        nonlocal point1, point2, cx, cy, w, h, drawing, mark

        if event == cv.EVENT_LBUTTONDOWN:
            if not drawing:
                drawing = True
                point1 = (x, y)
            else:
                drawing = False

        elif event == cv.EVENT_MOUSEMOVE:
            if drawing:
                point2 = (x, y)

        elif event == cv.EVENT_LBUTTONUP:
            cx = point1[0] - (point1[0] - point2[0]) / 2
            cy = point1[1] - (point1[1] - point2[1]) / 2
            w = abs(point1[0] - point2[0])
            h = abs(point1[1] - point2[1])
            mark = False

    # Open the input stream and create the visualization window
    cap = cv.VideoCapture(args.input if args.input else 0)
    cv.namedWindow("DaSiamRPN")
    cv.setMouseCallback("DaSiamRPN", get_bb)

    # Play the stream until the space bar is pressed
    space_key = 32
    while cv.waitKey(40) != space_key:
        has_frame, frame = cap.read()
        if not has_frame:
            sys.exit(0)
        cv.imshow("DaSiamRPN", frame)

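    # Freeze on the current frame until the user finishes drawing the initial
    # bounding box (get_bb sets mark to False on mouse-button release)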
    while mark:
        twin = np.copy(frame)
        if point1 and point2:
            cv.rectangle(twin, point1, point2, (0, 255, 255), 3)
        cv.imshow("DaSiamRPN", twin)
        cv.waitKey(40)

    init_bb = (cx, cy, w, h)
    tracker = DaSiamRPNTracker(args.net, args.kernel_r1, args.kernel_cls1)
    tracker.init(frame, init_bb)

    # Tracking loop
    while cap.isOpened():
        has_frame, frame = cap.read()
        if not has_frame:
            sys.exit(0)
        _, new_bb = tracker.update(frame)
        cx, cy, w, h = new_bb
        cv.rectangle(frame, (int(cx - w // 2), int(cy - h // 2)),
                     (int(cx - w // 2) + int(w), int(cy - h // 2) + int(h)),
                     (0, 255, 255), 3)
        cv.imshow("DaSiamRPN", frame)
        key = cv.waitKey(1)
        if key == ord("q"):
            break

    cap.release()
    cv.destroyAllWindows()

if __name__ == "__main__":
    main()