File: edit_message_definition.py

package info (click to toggle)
rosbags 0.11.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,940 kB
  • sloc: python: 19,988; makefile: 4
file content (100 lines) | stat: -rw-r--r-- 3,197 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""Example: Edit message definition.

Between ROS1 and ROS2 the CameraInfo message definition has changed.
The D, K, R, and P field names have been changed from upper case to
lower case. This example shows how to downgrade the messages in a
rosbag1 to the proper message definition.

"""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from rosbags.highlevel.anyreader import AnyReader
from rosbags.rosbag1 import Writer
from rosbags.typesys import Stores, get_types_from_msg, get_typestore

if TYPE_CHECKING:
    from pathlib import Path

    from rosbags.interfaces import ConnectionExtRosbag1
    from rosbags.typesys.stores.ros2_foxy import sensor_msgs__msg__CameraInfo

# Noetic camera info message definition, taken from:
# http://docs.ros.org/en/noetic/api/sensor_msgs/html/msg/CameraInfo.html
CAMERAINFO_DEFINITION = """
std_msgs/Header header
uint32 height
uint32 width
string distortion_model
float64[] D
float64[9] K
float64[9] R
float64[12] P
uint32 binning_x
uint32 binning_y
sensor_msgs/RegionOfInterest roi
"""


def downgrade_camerainfo_to_rosbag1(src: Path, dst: Path) -> None:
    """Edit message definitions in a rosbag1.

    Args:
        src: Rosbag1 source path.
        dst: Destination path.
        topic: Name of topic to remove.

    """
    typename = 'sensor_msgs/msg/CameraInfo'
    typestore = get_typestore(Stores.EMPTY)
    typestore.register(
        get_types_from_msg(CAMERAINFO_DEFINITION, typename),
    )
    CameraInfo = typestore.types[typename]  # noqa: N806

    with AnyReader([src]) as reader, Writer(dst) as writer:
        conn_map = {}

        for conn in reader.connections:
            ext = cast('ConnectionExtRosbag1', conn.ext)

            # Use updated message definition and md5sum for CameraInfo.
            if conn.msgtype == 'sensor_msgs/msg/CameraInfo':
                from_typestore = typestore
            else:
                from_typestore = reader.typestore

            conn_map[conn.id] = writer.add_connection(
                conn.topic,
                conn.msgtype,
                typestore=from_typestore,
                callerid=ext.callerid,
                latching=ext.latching,
            )

        for conn, timestamp, data in reader.messages():
            wconn = conn_map[conn.id]

            if conn.msgtype == 'sensor_msgs/msg/CameraInfo':
                msg = cast('sensor_msgs__msg__CameraInfo', reader.deserialize(data, conn.msgtype))
                converted_msg = CameraInfo(
                    header=msg.header,
                    height=msg.height,
                    width=msg.width,
                    distortion_model=msg.distortion_model,
                    # Map lower case names to upper case.
                    D=msg.d,
                    K=msg.k,
                    R=msg.r,
                    P=msg.p,
                    binning_x=msg.binning_x,
                    binning_y=msg.binning_y,
                    roi=msg.roi,
                )
                outdata: memoryview | bytes = typestore.serialize_ros1(converted_msg, wconn.msgtype)
            else:
                outdata = data

            writer.write(wconn, timestamp, outdata)