File: optical_flow.py

package info (click to toggle)
opencv 4.5.1%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 268,248 kB
  • sloc: cpp: 969,170; xml: 682,525; python: 36,732; lisp: 30,170; java: 25,155; ansic: 7,927; javascript: 5,643; objc: 2,041; sh: 935; cs: 601; perl: 494; makefile: 145
file content (103 lines) | stat: -rw-r--r-- 4,303 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python
'''
This sample uses the FlowNet v2 model to calculate optical flow.
Original paper: https://arxiv.org/abs/1612.01925.
Original repo:  https://github.com/lmb-freiburg/flownet2.

Download the converted .caffemodel model from https://drive.google.com/open?id=16qvE9VNmU39NttpZwZs81Ga8VYQJDaWZ
and the .prototxt from https://drive.google.com/file/d/1RyNIUsan1ZOh2hpYIH36A-jofAvJlT6a/view?usp=sharing.
Alternatively, download the original model from https://lmb.informatik.uni-freiburg.de/resources/binaries/flownet2/flownet2-models.tar.gz,
convert the .h5 model to .caffemodel and modify the original .prototxt using the .prototxt from the link above.
'''

import argparse
import os.path
import numpy as np
import cv2 as cv


class OpticalFlow(object):
    '''Optical-flow estimator that runs a Caffe network through OpenCV's DNN module.

    Args:
        proto: Network definition, handed unchanged to ``cv.dnn.readNetFromCaffe``.
        model: Network weights, handed unchanged to ``cv.dnn.readNetFromCaffe``.
        height: Height passed as the blob size when frames are converted.
        width: Width passed as the blob size when frames are converted.
    '''

    def __init__(self, proto, model, height, width):
        self.net = cv.dnn.readNetFromCaffe(proto, model)
        self.net.setPreferableBackend(cv.dnn.DNN_BACKEND_OPENCV)
        self.height = height
        self.width = width

    def compute_flow(self, first_img, second_img):
        '''Run the network on a pair of frames and return the colorized flow.

        Args:
            first_img: Earlier frame, fed to the network input named "img0".
            second_img: Later frame, fed to the network input named "img1".

        Returns:
            The uint8 color image produced by ``motion_to_color`` for the
            network's output.
        '''
        inp0 = cv.dnn.blobFromImage(first_img, size=(self.width, self.height))
        inp1 = cv.dnn.blobFromImage(second_img, size=(self.width, self.height))
        self.net.setInput(inp0, "img0")
        self.net.setInput(inp1, "img1")
        flow = self.net.forward()
        output = self.motion_to_color(flow)
        return output

    def motion_to_color(self, flow):
        '''Render a flow field as a color image using OpenCV's HSV colormap.

        Args:
            flow: Array whose first axis has length 1 and whose second axis
                holds the x component at index 0 and the y component at
                index 1 (presumably shaped (1, 2, H, W) - confirm against
                the network output).

        Returns:
            uint8 array with one 3-channel color per flow vector: direction
            selects the palette color, magnitude (relative to the largest
            magnitude in this field) controls how far the color is from white.
        '''
        # 255 palette entries (indices 0..254) sampled from the HSV colormap.
        arr = np.arange(0, 255, dtype=np.uint8)
        colormap = cv.applyColorMap(arr, cv.COLORMAP_HSV)
        colormap = colormap.squeeze(1)
        return self._blend_palette(flow, colormap)

    @staticmethod
    def _blend_palette(flow, colormap):
        '''Map flow vectors to colors by interpolating in ``colormap`` (pure NumPy).

        Args:
            flow: Flow array as accepted by ``motion_to_color``.
            colormap: (N, 3) uint8 palette indexed by flow direction.

        Returns:
            uint8 array of shape ``flow.shape[2:] + (3,)``.
        '''
        flow = flow.squeeze(0)
        fx, fy = flow[0, ...], flow[1, ...]
        rad = np.sqrt(fx**2 + fy**2)
        # Normalise by the largest magnitude; fall back to 1 for an all-zero
        # field so the divisions below stay finite.
        peak = rad.max()
        maxrad = peak if peak != 0 else 1

        ncols = colormap.shape[0]
        rad = rad[..., np.newaxis] / maxrad
        # Direction as a value in [-1, 1], then as a fractional palette index.
        a = np.arctan2(-fy / maxrad, -fx / maxrad) / np.pi
        fk = (a + 1) / 2.0 * (ncols - 1)
        # Builtin ``int`` replaces the ``np.int`` alias, which was removed in
        # NumPy 1.24; the resulting dtype is the same as before.
        k0 = fk.astype(int)
        k1 = (k0 + 1) % ncols  # wrap around so the last entry blends into the first
        f = fk[..., np.newaxis] - k0[..., np.newaxis]

        # Linear interpolation between the two neighbouring palette entries.
        col0 = colormap[k0] / 255.0
        col1 = colormap[k1] / 255.0
        col = (1 - f) * col0 + f * col1
        # Small magnitudes fade towards white; anything above 1 is dimmed.
        col = np.where(rad <= 1, 1 - rad * (1 - col), col * 0.75)
        output = (255.0 * col).astype(np.uint8)
        return output


def main():
    '''Parse the command line and display optical flow for a video or camera stream.

    Raises:
        OSError: If the prototxt or caffemodel file is missing, or if no frame
            can be read from the selected input.
    '''
    parser = argparse.ArgumentParser(description='Use this script to calculate optical flow using FlowNetv2',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # Deliberately not required: leaving it out selects camera 0 below, which
    # is what the help text promises.
    parser.add_argument('--input', '-input', '-i', help='Path to input video file. Skip this argument to capture frames from a camera.')
    parser.add_argument('--height', default=320, type=int, help='Input height')
    parser.add_argument('--width', default=448, type=int, help='Input width')
    parser.add_argument('--proto', '-p', default='FlowNet2_deploy_anysize.prototxt', help='Path to prototxt.')
    parser.add_argument('--model', '-m', default='FlowNet2_weights.caffemodel', help='Path to caffemodel.')
    args, _ = parser.parse_known_args()

    if not os.path.isfile(args.model) or not os.path.isfile(args.proto):
        raise OSError("Prototxt or caffemodel not exist")

    # The network input is rounded up to the next multiple of 64 in each
    # dimension; the scale entries relate the requested size to that size.
    divisor = 64.
    var = {}
    var['ADAPTED_WIDTH'] = int(np.ceil(args.width/divisor) * divisor)
    var['ADAPTED_HEIGHT'] = int(np.ceil(args.height/divisor) * divisor)
    var['SCALE_WIDTH'] = args.width / float(var['ADAPTED_WIDTH'])
    var['SCALE_HEIGHT'] = args.height / float(var['ADAPTED_HEIGHT'])

    # Fill in the $KEY$ placeholders of the prototxt template.
    with open(args.proto) as proto_file:
        config = proto_file.read()
    for key, value in var.items():
        config = config.replace("$%s$" % key, str(value))

    with open(args.model, 'rb') as model_file:
        caffemodel = model_file.read()

    winName = 'Calculation optical flow in OpenCV'
    cv.namedWindow(winName, cv.WINDOW_NORMAL)
    cap = cv.VideoCapture(args.input if args.input else 0)
    try:
        hasFrame, first_frame = cap.read() if cap.isOpened() else (False, None)
        if not hasFrame:
            # Fail loudly instead of silently exiting (or crashing later on a
            # None frame) when the source cannot deliver a first frame.
            raise OSError("Cannot read frames from the input")

        opt_flow = OpticalFlow(bytearray(config.encode()), caffemodel, var['ADAPTED_HEIGHT'], var['ADAPTED_WIDTH'])
        # Any key press ends the loop; so does the end of the stream.
        while cv.waitKey(1) < 0:
            hasFrame, second_frame = cap.read()
            if not hasFrame:
                break
            flow = opt_flow.compute_flow(first_frame, second_frame)
            first_frame = second_frame
            cv.imshow(winName, flow)
    finally:
        # Release the capture device and close the window on every exit path.
        cap.release()
        cv.destroyAllWindows()


if __name__ == '__main__':
    main()