From f9babbcbc0c67f2fbdda8aed76ffb13bb664a973 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcus=20M=C3=BCller?= <mmueller@gnuradio.org>
Date: Sun, 23 Feb 2025 00:36:12 +0100
Subject: [PATCH 23/41] blocks/annotator_raw: use set instead of vector to
 avoid sorting
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Marcus Müller <mmueller@gnuradio.org>
---
 gr-blocks/lib/annotator_raw_impl.cc         | 45 +++++-----
 gr-blocks/lib/annotator_raw_impl.h          | 27 +++++-
 gr-blocks/python/blocks/qa_annotator_raw.py | 94 +++++++++++++++++++++
 3 files changed, 141 insertions(+), 25 deletions(-)
 create mode 100644 gr-blocks/python/blocks/qa_annotator_raw.py

diff --git a/gr-blocks/lib/annotator_raw_impl.cc b/gr-blocks/lib/annotator_raw_impl.cc
index 4939946d51..d9da9264e3 100644
--- a/gr-blocks/lib/annotator_raw_impl.cc
+++ b/gr-blocks/lib/annotator_raw_impl.cc
@@ -8,12 +8,10 @@
  *
  */
 
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
 
 #include "annotator_raw_impl.h"
 #include <gnuradio/io_signature.h>
+#include <cstdint>
 #include <cstring>
 #include <stdexcept>
 
@@ -42,20 +40,20 @@ void annotator_raw_impl::add_tag(uint64_t offset, pmt::pmt_t key, pmt::pmt_t val
     gr::thread::scoped_lock l(d_mutex);
 
     tag_t tag;
-    tag.srcid = pmt::intern(name());
+    tag.offset = offset;
     tag.key = key;
     tag.value = val;
-    tag.offset = offset;
+    tag.srcid = pmt::intern(name());
 
-    // add our new tag
-    d_queued_tags.push_back(tag);
-    // make sure our tags are in offset order
-    std::sort(d_queued_tags.begin(), d_queued_tags.end(), tag_t::offset_compare);
     // make sure we are not adding an item in the past!
-    if (tag.offset > nitems_read(0)) {
-        throw std::runtime_error(
-            "annotator_raw::add_tag: item added too far in the past.");
+    if (tag.offset < nitems_read(0)) {
+        throw std::runtime_error(fmt::format("annotator_raw::add_tag: item added too far "
+                                             "in the past at {}; we're already at {}.",
+                                             tag.offset,
+                                             nitems_read(0)));
     }
+    // add our new tag
+    d_queued_tags.insert(tag);
 }
 
 int annotator_raw_impl::work(int noutput_items,
@@ -64,25 +62,26 @@ int annotator_raw_impl::work(int noutput_items,
 {
     gr::thread::scoped_lock l(d_mutex);
 
-    const char* in = (const char*)input_items[0];
-    char* out = (char*)output_items[0];
-
     uint64_t start_N = nitems_read(0);
     uint64_t end_N = start_N + (uint64_t)(noutput_items);
 
     // locate queued tags that fall in this range and insert them when appropriate
-    std::vector<tag_t>::iterator i = d_queued_tags.begin();
-    while (i != d_queued_tags.end()) {
-        if ((*i).offset >= start_N && (*i).offset < end_N) {
-            add_item_tag(0, (*i).offset, (*i).key, (*i).value, (*i).srcid);
-            i = d_queued_tags.erase(i);
-        } else {
-            break;
+    // first queued tag that is not already behind the read position
+    const auto first = d_queued_tags.lower_bound(start_N);
+    if (first != d_queued_tags.end()) {
+        // half-open range [start_N, end_N): a tag at end_N waits for the next call
+        const auto last = d_queued_tags.lower_bound(end_N);
+        for (auto it = first; it != last; ++it) {
+            add_item_tag(0, *it);
         }
+        if (first != d_queued_tags.begin()) {
+            d_logger->error("dropping in-past tags");
+        }
+        d_queued_tags.erase(d_queued_tags.begin(), last);
     }
 
     // copy data across
-    memcpy(out, in, noutput_items * d_itemsize);
+    memcpy(output_items[0], input_items[0], noutput_items * d_itemsize);
     return noutput_items;
 }
 
diff --git a/gr-blocks/lib/annotator_raw_impl.h b/gr-blocks/lib/annotator_raw_impl.h
index a17b6ae27c..ac84187ee3 100644
--- a/gr-blocks/lib/annotator_raw_impl.h
+++ b/gr-blocks/lib/annotator_raw_impl.h
@@ -13,6 +13,9 @@
 
 #include <gnuradio/blocks/annotator_raw.h>
 #include <gnuradio/thread/thread.h>
+#include <type_traits>
+#include <set>
+#include <tuple>
 
 namespace gr {
 namespace blocks {
@@ -20,9 +23,29 @@ namespace blocks {
 class annotator_raw_impl : public annotator_raw
 {
 private:
-    const size_t d_itemsize;
-    std::vector<tag_t> d_queued_tags;
+    struct tag_comparator {
+        using is_transparent = std::true_type;
+        //!\brief comparator over all fields in a tag, not just the offset
+        constexpr bool operator()(const tag_t& l, const tag_t& r) const
+        {
+            return std::tie(l.offset, l.key, l.value, l.srcid) <
+                   std::tie(r.offset, r.key, r.value, r.srcid);
+        }
+        constexpr bool operator()(const tag_t& l,
+                                  const decltype(tag_t::offset)& r_offset) const
+        {
+            return l.offset < r_offset;
+        }
+        constexpr bool operator()(const decltype(tag_t::offset)& l_offset,
+                                  const tag_t& r) const
+        {
+            return l_offset < r.offset;
+        }
+    };
+    using tag_container = std::set<tag_t, tag_comparator>;
+    tag_container d_queued_tags;
     gr::thread::mutex d_mutex;
+    const size_t d_itemsize;
 
 public:
     annotator_raw_impl(size_t sizeof_stream_item);
diff --git a/gr-blocks/python/blocks/qa_annotator_raw.py b/gr-blocks/python/blocks/qa_annotator_raw.py
new file mode 100644
index 0000000000..708ee98262
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_annotator_raw.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+#
+# Copyright 2025 Marcus Müller
+#
+# This file is part of GNU Radio
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+#
+
+
+from gnuradio import gr, gr_unittest
+from gnuradio.blocks import vector_sink_b, vector_source_b, annotator_raw, throttle
+from time import sleep
+import pmt
+
+
+class test_annotator_raw(gr_unittest.TestCase):
+
+    def compare_tag_iterables(self, tags_out: list, tags_in: list, name: str):
+        """Check tags_out against (offset, key, value) tuples and the srcid."""
+        # zip() stops at the shorter input, so pin the tag count first
+        self.assertEqual(len(tags_out), len(tags_in), "tag count differs")
+        for tag, ref in zip(tags_out, tags_in):
+            got = (tag.offset, tag.key, tag.value)
+            self.assertTupleEqual(got, ref, "tag contents differ")
+            self.assertEqual(pmt.to_python(tag.srcid), name)
+
+    def setUp(self):
+        self.tb = gr.top_block()
+
+    def tearDown(self):
+        self.tb = None
+
+    def test_001_instantiation(self):
+        blk = annotator_raw(1)
+        self.assertTrue(blk)
+
+    def test_002_preseed(self):
+        N = 1000
+        tags_in = [(n * N, pmt.mp(f"key_{n}"), pmt.from_long(n * 10)) for n in range(N)]
+
+        source = vector_source_b([i % 256 for i in range(N * N)], repeat=False)
+        blk = annotator_raw(gr.sizeof_char)
+        sink = vector_sink_b(reserve_items=N * N)
+        self.tb.connect(source, blk, sink)
+
+        for tag_tuple in tags_in:
+            blk.add_tag(*tag_tuple)
+
+        self.tb.run()
+
+        self.assertEqual(
+            N * N, len(sink.data()), "did not get correct number of samples"
+        )
+
+        tags_out = sink.tags()
+        self.compare_tag_iterables(tags_out, tags_in, blk.name())
+
+    def test_003_late_insertion(self):
+        N = 1000
+        total_time = 0.5
+        tags_in = [
+            (n * N, pmt.mp(f"key_{n}"), pmt.from_long(n * 10)) for n in range(N // 2, N)
+        ]
+
+        source = vector_source_b([i % 256 for i in range(N * N)], repeat=False)
+        slower = throttle(
+            gr.sizeof_char, N * N / total_time, maximum_items_per_chunk=16
+        )
+        blk = annotator_raw(gr.sizeof_char)
+        sink = vector_sink_b(reserve_items=N * N)
+        self.tb.connect(source, slower, blk, sink)
+
+        self.tb.start()
+        sleep(0.4 * total_time)
+        # we should be a fair bit into the input, but not yet halfway through
+        for tag_tuple in tags_in:
+            blk.add_tag(*tag_tuple)
+        self.assertRaises(
+            RuntimeError, lambda: blk.add_tag(0, pmt.PMT_NIL, pmt.PMT_NIL)
+        )
+        self.tb.wait()
+
+        self.assertEqual(
+            N * N, len(sink.data()), "did not get correct number of samples"
+        )
+
+        tags_out = sink.tags()
+        self.compare_tag_iterables(tags_out, tags_in, blk.name())
+
+
+if __name__ == "__main__":
+    gr_unittest.run(test_annotator_raw)
-- 
2.47.3

