1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
# ----------------------------------------------------------------------------
# - Open3D: www.open3d.org -
# ----------------------------------------------------------------------------
# Copyright (c) 2018-2024 www.open3d.org
# SPDX-License-Identifier: MIT
# ----------------------------------------------------------------------------
# examples/python/reconstruction_system/sensors/azure_kinect_mkv_reader.py
import argparse
import open3d as o3d
import os
import json
import sys
pwd = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(pwd, '..'))
from initialize_config import initialize_config
class ReaderWithCallback:
def __init__(self, input, output):
self.flag_exit = False
self.flag_play = True
self.input = input
self.output = output
self.reader = o3d.io.AzureKinectMKVReader()
self.reader.open(self.input)
if not self.reader.is_opened():
raise RuntimeError("Unable to open file {}".format(args.input))
def escape_callback(self, vis):
self.flag_exit = True
return False
def space_callback(self, vis):
if self.flag_play:
print('Playback paused, press [SPACE] to continue.')
else:
print('Playback resumed, press [SPACE] to pause.')
self.flag_play = not self.flag_play
return False
def run(self):
glfw_key_escape = 256
glfw_key_space = 32
vis = o3d.visualization.VisualizerWithKeyCallback()
vis.register_key_callback(glfw_key_escape, self.escape_callback)
vis.register_key_callback(glfw_key_space, self.space_callback)
vis_geometry_added = False
vis.create_window('reader', 1920, 540)
print(
"MKV reader initialized. Press [SPACE] to pause/start, [ESC] to exit."
)
if self.output is not None:
abspath = os.path.abspath(self.output)
metadata = self.reader.get_metadata()
o3d.io.write_azure_kinect_mkv_metadata(
'{}/intrinsic.json'.format(abspath), metadata)
config = {
'path_dataset': abspath,
'path_intrinsic': '{}/intrinsic.json'.format(abspath)
}
initialize_config(config)
with open('{}/config.json'.format(abspath), 'w') as f:
json.dump(config, f, indent=4)
idx = 0
while not self.reader.is_eof() and not self.flag_exit:
if self.flag_play:
rgbd = self.reader.next_frame()
if rgbd is None:
continue
if not vis_geometry_added:
vis.add_geometry(rgbd)
vis_geometry_added = True
if self.output is not None:
color_filename = '{0}/color/{1:05d}.jpg'.format(
self.output, idx)
print('Writing to {}'.format(color_filename))
o3d.io.write_image(color_filename, rgbd.color)
depth_filename = '{0}/depth/{1:05d}.png'.format(
self.output, idx)
print('Writing to {}'.format(depth_filename))
o3d.io.write_image(depth_filename, rgbd.depth)
idx += 1
try:
vis.update_geometry(rgbd)
except NameError:
pass
vis.poll_events()
vis.update_renderer()
self.reader.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Azure kinect mkv reader.')
parser.add_argument('--input',
type=str,
required=True,
help='input mkv file')
parser.add_argument('--output',
type=str,
help='output path to store color/ and depth/ images')
args = parser.parse_args()
if args.input is None:
parser.print_help()
exit()
if args.output is None:
print('No output path, only play mkv')
elif os.path.isdir(args.output):
print('Output path {} already existing, only play mkv'.format(
args.output))
args.output = None
else:
try:
os.mkdir(args.output)
os.mkdir('{}/color'.format(args.output))
os.mkdir('{}/depth'.format(args.output))
except (PermissionError, FileExistsError):
print('Unable to mkdir {}, only play mkv'.format(args.output))
args.output = None
reader = ReaderWithCallback(args.input, args.output)
reader.run()
|