File: PlacesSemanticHistoryManager.sys.mjs

Package: firefox 142.0.1-1

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/**
 * PlacesSemanticHistoryManager manages the semantic.sqlite database and provides helper
 * methods for initializing, querying, and updating semantic data.
 *
 * This module handles embeddings-based semantic search capabilities using the
 * Places database and an ML engine for vector operations.
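 *
 * A minimal usage sketch (illustrative only; all names below are exported or
 * defined by this module):
 *
 *   const manager = getPlacesSemanticHistoryManager();
 *   if (
 *     manager.canUseSemanticSearch &&
 *     (await manager.hasSufficientEntriesForSearching())
 *   ) {
 *     let { results } = await manager.infer({ searchString: "hiking trails" });
 *   }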
 */

/**
 * @import {OpenedConnection} from "resource://gre/modules/Sqlite.sys.mjs"
 */
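
// Preferences consulted by this module (as referenced in the code below):
//  - browser.ml.enable
//  - places.semanticHistory.featureGate
//  - places.semanticHistory.initialized
//  - places.semanticHistory.removeOnStartup
//  - places.semanticHistory.completionThreshold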

const lazy = {};

ChromeUtils.defineESModuleGetters(lazy, {
  DeferredTask: "resource://gre/modules/DeferredTask.sys.mjs",
  AsyncShutdown: "resource://gre/modules/AsyncShutdown.sys.mjs",
  PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs",
  PlacesSemanticHistoryDatabase:
    "resource://gre/modules/PlacesSemanticHistoryDatabase.sys.mjs",
  EmbeddingsGenerator: "chrome://global/content/ml/EmbeddingsGenerator.sys.mjs",
});

ChromeUtils.defineLazyGetter(lazy, "logger", function () {
  return lazy.PlacesUtils.getLogger({ prefix: "PlacesSemanticHistoryManager" });
});

// Constants to support an alternative frecency algorithm.
ChromeUtils.defineLazyGetter(lazy, "PAGES_FRECENCY_FIELD", () => {
  return lazy.PlacesUtils.history.isAlternativeFrecencyEnabled
    ? "alt_frecency"
    : "frecency";
});

// Time between deferred task executions.
const DEFERRED_TASK_INTERVAL_MS = 3000;
// Maximum time to wait for an idle before the task is executed anyway.
const DEFERRED_TASK_MAX_IDLE_WAIT_MS = 2 * 60000;
// Number of entries to update at once.
const DEFAULT_CHUNK_SIZE = 50;
const ONE_MiB = 1024 * 1024;
// Minimum combined length of title and description; rows qualify only when
// length(title || description) > MIN_TITLE_LENGTH.
const MIN_TITLE_LENGTH = 4;

class PlacesSemanticHistoryManager {
  #promiseConn;
  #engine = undefined;
  #embeddingSize;
  #rowLimit;
  #samplingAttrib;
  #changeThresholdCount;
  #distanceThreshold;
  #finalized = false;
  #updateTask = null;
  #prevPagesRankChangedCount = 0;
  #pendingUpdates = true;
  testFlag = false;
  #updateTaskLatency = [];
  embedder;
  qualifiedForSemanticSearch = false;
  #promiseRemoved = null;
  enoughEntries = false;
  #shutdownProgress = { state: "Not started" };
  #deferredTaskInterval = DEFERRED_TASK_INTERVAL_MS;
  #lastMaxChunksCount = 0;

  /**
   * Constructor for PlacesSemanticHistoryManager.
   *
   * @param {Object} options - Configuration options.
   * @param {number} [options.embeddingSize=384] - Size of embeddings used for vector operations.
   * @param {number} [options.rowLimit=10000] - Maximum number of rows to process from the database.
   * @param {string} [options.samplingAttrib="frecency"] - Attribute used for sampling rows.
   * @param {number} [options.changeThresholdCount=3] - Threshold of changed rows to trigger updates.
   * @param {number} [options.distanceThreshold=0.6] - Cosine distance threshold to determine similarity.
   * @param {boolean} [options.testFlag=false] - Flag for test behavior.
   */
  constructor({
    embeddingSize = 384,
    rowLimit = 10000,
    samplingAttrib = "frecency",
    changeThresholdCount = 3,
    distanceThreshold = 0.6,
    testFlag = false,
  } = {}) {
    this.QueryInterface = ChromeUtils.generateQI([
      "nsIObserver",
      "nsISupportsWeakReference",
    ]);

    // Do not initialize during shutdown.
    if (
      Services.startup.isInOrBeyondShutdownPhase(
        Ci.nsIAppStartup.SHUTDOWN_PHASE_APPSHUTDOWNCONFIRMED
      )
    ) {
      this.#finalized = true;
      return;
    }
    this.embedder = new lazy.EmbeddingsGenerator(embeddingSize);
    this.semanticDB = new lazy.PlacesSemanticHistoryDatabase({
      embeddingSize,
      fileName: "places_semantic.sqlite",
    });
    this.qualifiedForSemanticSearch =
      this.embedder.isEnoughPhysicalMemoryAvailable() &&
      this.embedder.isEnoughCpuCoresAvailable();

    lazy.AsyncShutdown.appShutdownConfirmed.addBlocker(
      "SemanticManager: shutdown",
      () => this.shutdown(),
      { fetchState: () => this.#shutdownProgress }
    );

    // Add the observer for pages-rank-changed and history-cleared topics
    this.handlePlacesEvents = this.handlePlacesEvents.bind(this);
    lazy.PlacesUtils.observers.addListener(
      ["pages-rank-changed", "history-cleared", "page-removed"],
      this.handlePlacesEvents
    );

    this.#rowLimit = rowLimit;
    this.#embeddingSize = embeddingSize;
    this.#samplingAttrib = samplingAttrib;
    this.#changeThresholdCount = changeThresholdCount;
    this.#distanceThreshold = distanceThreshold;
    this.testFlag = testFlag;
    this.#updateTaskLatency = [];
    lazy.logger.trace("PlaceSemanticManager constructor");

    // When semantic history is disabled, or no longer available due to system
    // requirements, we want to remove the database files. We don't want to
    // check on disk on every startup, so we track initialization with a pref;
    // the removal itself happens on startup, where it's less likely to fail.
    // We check prefHasUserValue rather than the pref's value because users may
    // set it to false trying to disable the feature; if we checked the value,
    // the files would never be removed.
    let wasInitialized = Services.prefs.prefHasUserValue(
      "places.semanticHistory.initialized"
    );
    let isAvailable = this.canUseSemanticSearch;
    let removeFiles =
      (wasInitialized && !isAvailable) ||
      Services.prefs.getBoolPref(
        "places.semanticHistory.removeOnStartup",
        false
      );
    if (removeFiles) {
      lazy.logger.info("Removing database files on startup");
      Services.prefs.clearUserPref("places.semanticHistory.removeOnStartup");
      this.#promiseRemoved = this.semanticDB
        .removeDatabaseFiles()
        .catch(console.error);
    }
    if (!isAvailable) {
      Services.prefs.clearUserPref("places.semanticHistory.initialized");
    } else if (!wasInitialized) {
      Services.prefs.setBoolPref("places.semanticHistory.initialized", true);
    }
  }

  /**
   * Connects to the semantic.sqlite database and attaches the Places DB.
   *
   * @returns {Promise<object>}
   *   A promise resolving to the database connection.
   */
  async getConnection() {
    if (
      Services.startup.isInOrBeyondShutdownPhase(
        Ci.nsIAppStartup.SHUTDOWN_PHASE_APPSHUTDOWNCONFIRMED
      ) ||
      !this.canUseSemanticSearch
    ) {
      return null;
    }
    // We must eventually wait for removal to finish.
    await this.#promiseRemoved;

    // Avoid re-entrance using a cached promise rather than handing off a conn.
    if (!this.#promiseConn) {
      this.#promiseConn = this.semanticDB.getConnection().then(conn => {
        // Kick off updates.
        this.#createOrUpdateTask();
        this.onPagesRankChanged();
        return conn;
      });
    }
    return this.#promiseConn;
  }

  /**
   * Checks whether the semantic-history vector DB is *sufficiently populated*.
   *
   * We look at the **top N** Places entries (N = `#rowLimit`, ordered by
   * `#samplingAttrib`) and count how many of them already have an embedding in
   * `vec_history_mapping`. If the completed fraction reaches the
   * `places.semanticHistory.completionThreshold` pref, we consider the DB
   * **ready** for semantic searching.
   *
   * A positive result is memoised in `this.enoughEntries`; subsequent
   * calls return that cached value to avoid repeating the query.
   *
   * @returns {Promise<boolean>}
   *   `true`  – DB is sufficiently populated (completed / total ≥ completionThreshold)
   *   `false` – not enough entries yet (completed / total < completionThreshold)
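   *
   *   For example, with the default rowLimit of 10000 and the default
   *   completionThreshold of 0.5, the DB is considered ready once at least
   *   5000 of the top 10000 candidate pages already have an embedding.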
   */
  async hasSufficientEntriesForSearching() {
    if (this.enoughEntries) {
      // Return cached answer if we already ran once.
      return true;
    }
    let conn = await this.getConnection();

    // Compute total candidates and how many of them updated with vectors.
    const [row] = await conn.execute(
      `
      WITH top_places AS (
        SELECT url_hash FROM moz_places
        WHERE title NOTNULL
          AND length(title || ifnull(description,'')) > :min_title_length
          AND last_visit_date NOTNULL
          AND frecency > 0
        ORDER BY ${this.#samplingAttrib} DESC
        LIMIT :rowLimit
      )
      SELECT
        (SELECT COUNT(*) FROM top_places) AS total,
        (SELECT COUNT(*) FROM top_places tp
         JOIN vec_history_mapping map USING (url_hash)) AS completed
      `,
      {
        rowLimit: this.#rowLimit,
        min_title_length: MIN_TITLE_LENGTH,
      }
    );

    const total = row.getResultByName("total");
    const completed = row.getResultByName("completed");
    const ratio = total ? completed / total : 0;

    const completionThreshold = Services.prefs.getFloatPref(
      "places.semanticHistory.completionThreshold",
      0.5
    );
    // Ready once ≥ completionThreshold % completed.
    this.enoughEntries = ratio >= completionThreshold;

    lazy.logger.debug(
      `Semantic-DB status — completed: ${completed}/${total} ` +
        `(${(ratio * 100).toFixed(1)} %). ` +
        (this.enoughEntries
          ? "Threshold met; update task can run at normal cadence."
          : "Below threshold; updater remains armed for frequent updates.")
    );

    return this.enoughEntries;
  }

  /**
   * Determines if semantic search can be used based on preferences
   * and hardware qualification criteria
   *
   * @returns {boolean} - Returns `true` if semantic search can be used,
   *   else false
   */
  get canUseSemanticSearch() {
    return (
      this.qualifiedForSemanticSearch &&
      Services.prefs.getBoolPref("browser.ml.enable", true) &&
      Services.prefs.getBoolPref("places.semanticHistory.featureGate", false)
    );
  }

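  /**
   * Places observer callback. Any of the observed events ("pages-rank-changed",
   * "history-cleared", "page-removed") may invalidate vector DB contents, so
   * they all funnel into onPagesRankChanged() to (re-)arm the update task.
   *
   * @param {Array} events
   *   The batch of Places events being notified.
   */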
  handlePlacesEvents(events) {
    for (const { type } of events) {
      switch (type) {
        case "pages-rank-changed":
        case "history-cleared":
        case "page-removed":
          this.onPagesRankChanged();
          break;
      }
    }
  }

  /**
   * Handles updates triggered by database changes or rank changes.
   *
   * This is invoked whenever the `"pages-rank-changed"` or
   * `"history-cleared"` event is observed.
   * It re-arms the DeferredTask for updates if not finalized.
   *
   * @private
   * @returns {Promise<void>}
   */
  async onPagesRankChanged() {
    if (this.#updateTask && !this.#updateTask.isFinalized) {
      lazy.logger.trace("Arm update task");
      this.#updateTask.arm();
    }
  }

  // getter for testing purposes
  getUpdateTaskLatency() {
    return this.#updateTaskLatency;
  }

  /**
   * Sets the DeferredTask interval for testing purposes.
   * @param {number} val minimum milliseconds between deferred task executions.
   */
  setDeferredTaskIntervalForTests(val) {
    this.#deferredTaskInterval = val;
  }

  /**
   * Creates or updates the DeferredTask for managing updates to the semantic DB.
   */
  #createOrUpdateTask() {
    if (this.#finalized) {
      lazy.logger.trace(`Not resurrecting #updateTask because finalized`);
      return;
    }
    if (this.#updateTask) {
      this.#updateTask.disarm();
      this.#updateTask.finalize().catch(console.error);
    }

    // Syncs the semantic search database with history changes. It first checks
    // if enough page changes have occurred to warrant an update. If so, it
    // finds history entries that need to be added or removed from the vector
    // database. It then processes a chunk of additions, for which it generates
    // embeddings, and deletions in batches. It will re-arm itself if more work
    // remains, otherwise marks the update as complete and notifies.
    this.#updateTask = new lazy.DeferredTask(
      async () => {
        if (this.#finalized) {
          return;
        }

        // Capture updateTask startTime.
        const updateStartTime = Cu.now();

        try {
          lazy.logger.info("Running vector DB update task...");
          let conn = await this.getConnection();
          let pagesRankChangedCount =
            PlacesObservers.counts.get("pages-rank-changed") +
            PlacesObservers.counts.get("history-cleared") +
            PlacesObservers.counts.get("page-removed");
          if (
            pagesRankChangedCount - this.#prevPagesRankChangedCount <
              this.#changeThresholdCount &&
            !this.#pendingUpdates &&
            !this.testFlag
          ) {
            lazy.logger.info("No significant changes detected.");
            return;
          }

          this.#prevPagesRankChangedCount = pagesRankChangedCount;
          const startTime = Cu.now();

          lazy.logger.info(
            `Changes exceed threshold (${this.#changeThresholdCount}).`
          );

          let { count: addCount, results: addRows } =
            await this.findAddsChunk(conn);
          let { count: deleteCount, results: deleteRows } =
            await this.findDeletesChunk(conn);

          // We already have startTime for profile markers, so just use it
          // instead of tracking timer within the distribution.
          Glean.places.semanticHistoryFindChunksTime.accumulateSingleSample(
            Cu.now() - startTime
          );

          lazy.logger.info(
            `Total rows to add: ${addCount}, delete: ${deleteCount}`
          );

          if (addCount || deleteCount) {
            let chunkTimer =
              Glean.places.semanticHistoryChunkCalculateTime.start();

            let chunksCount =
              Math.ceil(addCount / DEFAULT_CHUNK_SIZE) +
              Math.ceil(deleteCount / DEFAULT_CHUNK_SIZE);
            if (chunksCount > this.#lastMaxChunksCount) {
              this.#lastMaxChunksCount = chunksCount;
              Glean.places.semanticHistoryMaxChunksCount.set(chunksCount);
            }

            await this.updateVectorDB(conn, addRows, deleteRows);
            ChromeUtils.addProfilerMarker(
              "updateVectorDB",
              startTime,
              "Details about updateVectorDB event"
            );

            Glean.places.semanticHistoryChunkCalculateTime.stopAndAccumulate(
              chunkTimer
            );
          }

          if (
            addCount > DEFAULT_CHUNK_SIZE ||
            deleteCount > DEFAULT_CHUNK_SIZE
          ) {
            // There are still entries to update; re-arm the task.
            this.#pendingUpdates = true;
            this.#updateTask.arm();
            return;
          }

          this.#pendingUpdates = false;
          Services.obs.notifyObservers(
            null,
            "places-semantichistorymanager-update-complete"
          );
          if (this.testFlag) {
            this.#updateTask.arm();
          }
        } catch (error) {
          lazy.logger.error("Error executing vector DB update task:", error);
        } finally {
          lazy.logger.info("Vector DB update task completed.");
          const updateEndTime = Cu.now();
          const updateTaskTime = updateEndTime - updateStartTime;
          this.#updateTaskLatency.push(updateTaskTime);

          lazy.logger.info(
            `DeferredTask update completed in ${updateTaskTime} ms.`
          );
        }
      },
      this.#deferredTaskInterval,
      DEFERRED_TASK_MAX_IDLE_WAIT_MS
    );
    lazy.logger.info("Update task armed.");
  }

  /**
   * Finalizes the PlacesSemanticHistoryManager by cleaning up resources.
   *
   * This ensures any tasks are finalized and the manager is properly
   * cleaned up during shutdown.
   *
   */
  #finalize() {
    lazy.logger.trace("Finalizing SemanticManager");
    // We don't care about task completion, since we can execute the tasks in
    // the next session.
    this.#updateTask?.disarm();
    this.#updateTask?.finalize().catch(console.error);
    this.#finalized = true;
  }

  /**
   * Find semantic vector entries to be added.
   *
   * @param {OpenedConnection} conn a SQLite connection to the database.
   * @returns {Promise<{count: number, results: {url_hash: string, content: string}[]}>}
   *   Resolves to an object with the total count of found entries and a
   *   results array limited to DEFAULT_CHUNK_SIZE elements.
   */
  async findAddsChunk(conn) {
    // find any adds after successful checkForChanges
    const rows = await conn.executeCached(
      `
      WITH top_places AS (
        SELECT url_hash, trim(title || " " || IFNULL(description, '')) AS content
        FROM moz_places
        WHERE title NOTNULL
          AND length(title || ifnull(description,'')) > :min_title_length
          AND last_visit_date NOTNULL
          AND frecency > 0
        ORDER BY ${this.#samplingAttrib} DESC
        LIMIT :rowLimit
      ),
      updates AS (
        SELECT top.url_hash, top.content
        FROM top_places top
        LEFT JOIN vec_history_mapping map USING (url_hash)
        WHERE map.url_hash IS NULL
      )
      SELECT url_hash, content, (SELECT count(*) FROM updates) AS total
      FROM updates
      LIMIT :chunkSize
    `,
      {
        rowLimit: this.#rowLimit,
        min_title_length: MIN_TITLE_LENGTH,
        chunkSize: DEFAULT_CHUNK_SIZE,
      }
    );

    return {
      count: rows[0]?.getResultByName("total") || 0,
      results: rows.map(row => ({
        url_hash: row.getResultByName("url_hash"),
        content: row.getResultByName("content"),
      })),
    };
  }

  /**
   * Find semantic vector entries to eventually delete due to:
   * - Orphaning: URLs no longer in top_places
   * - Broken Mappings: rowid has no corresponding entry in vec_history
   *
   * @param {OpenedConnection} conn a SQLite connection to the database.
   * @returns {Promise<{count: number, results: {url_hash: string}[]}>}
   *   Resolves to an object with the total count of found entries and a
   *   results array limited to DEFAULT_CHUNK_SIZE elements.
   */
  async findDeletesChunk(conn) {
    // find any deletes after successful checkForChanges
    const rows = await conn.executeCached(
      `
      WITH top_places AS (
        SELECT url_hash
        FROM moz_places
        WHERE title NOTNULL
          AND length(title || ifnull(description,'')) > :min_title_length
          AND last_visit_date NOTNULL
          AND frecency > 0
        ORDER BY ${this.#samplingAttrib} DESC
        LIMIT :rowLimit
      ),
      orphans AS (
        SELECT url_hash FROM vec_history_mapping
        EXCEPT
        SELECT url_hash FROM top_places
      ),
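      /* Entries to delete: orphaned mappings plus mappings whose rowid has
         no corresponding embedding in vec_history. */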
      updates AS (
        SELECT url_hash FROM orphans
        UNION
        SELECT url_hash FROM vec_history_mapping
        LEFT JOIN vec_history v USING (rowid)
        WHERE v.rowid IS NULL
      )
      SELECT url_hash, (SELECT count(*) FROM updates) AS total
      FROM updates
      LIMIT :chunkSize
    `,
      {
        rowLimit: this.#rowLimit,
        min_title_length: MIN_TITLE_LENGTH,
        chunkSize: DEFAULT_CHUNK_SIZE,
      }
    );

    return {
      count: rows[0]?.getResultByName("total") || 0,
      results: rows.map(row => ({
        url_hash: row.getResultByName("url_hash"),
      })),
    };
  }

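  /**
   * Applies a chunk of additions and deletions to the vector DB. Embeddings
   * for the added rows are generated in a single batch, then mappings and
   * embeddings are inserted and removed inside one transaction.
   *
   * @param {OpenedConnection} conn a SQLite connection to the database.
   * @param {Array<{url_hash: string, content: string}>} rowsToAdd
   *   Rows returned by findAddsChunk().
   * @param {Array<{url_hash: string}>} rowsToDelete
   *   Rows returned by findDeletesChunk().
   */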
  async updateVectorDB(conn, rowsToAdd, rowsToDelete) {
    await this.embedder.createEngineIfNotPresent();

    let batchTensors;
    if (rowsToAdd.length) {
      // Instead of calling the embedder once per row, prepare the whole
      // chunk and embed it in a single batch.
      batchTensors = await this.embedder.embedMany(
        rowsToAdd.map(r => r.content)
      );
      if (batchTensors.length != rowsToAdd.length) {
        throw new Error(
          `Expected ${rowsToAdd.length} tensors, got ${batchTensors.length}`
        );
      }
    }

    await conn.executeTransaction(async () => {
      // Process each new row and the corresponding tensor.
      for (let i = 0; i < rowsToAdd.length; i++) {
        const { url_hash } = rowsToAdd[i];
        const tensor = batchTensors[i];
        try {
          if (!Array.isArray(tensor) || tensor.length !== this.#embeddingSize) {
            lazy.logger.error(
              `Got tensor with invalid length: ${Array.isArray(tensor) ? tensor.length : "non-array value"}`
            );
            continue;
          }

          // We first insert the url into vec_history_mapping, get the rowid
          // and then insert the embedding into vec_history using that.
          // Doing the opposite doesn't work, as RETURNING is not properly
          // supported by the vec extension.
          // See https://github.com/asg017/sqlite-vec/issues/229.

          // Normally there should be no conflict on url_hash, as we previously
          // checked for its existence in vec_history_mapping. Though, since
          // the hash is not unique, we may try to insert two pages with the
          // same hash value as part of the same chunk.
          let rows = await conn.executeCached(
            `
            INSERT INTO vec_history_mapping (rowid, url_hash)
            VALUES (NULL, :url_hash)
            /* This is apparently useless, but it makes RETURNING always return
               a value, while DO NOTHING would not. */
            ON CONFLICT(url_hash) DO UPDATE SET url_hash = :url_hash
            RETURNING rowid
            `,
            { url_hash }
          );
          const rowid = rows[0].getResultByName("rowid");
          if (!rowid) {
            lazy.logger.error(`Unable to get inserted rowid for: ${url_hash}`);
            continue;
          }

          // UPSERT or INSERT OR REPLACE are not yet supported by the sqlite-vec
          // extension, so we must manage the conflict manually.
          // See https://github.com/asg017/sqlite-vec/issues/127.
          try {
            await conn.executeCached(
              `
              INSERT INTO vec_history (rowid, embedding, embedding_coarse)
              VALUES (:rowid, :vector, vec_quantize_binary(:vector))
              `,
              { rowid, vector: this.tensorToBindable(tensor) }
            );
          } catch (error) {
            lazy.logger.trace(
              `Error while inserting new vector, possible conflict. Error (${error.result}): ${error.message}`
            );
            // Ideally we'd check for `error.result == Cr.NS_ERROR_STORAGE_CONSTRAINT`,
            // unfortunately sqlite-vec doesn't generate a SQLITE_CONSTRAINT
            // error in this case, so we get a generic NS_ERROR_FAILURE.
            await conn.executeCached(
              `
              DELETE FROM vec_history WHERE rowid = :rowid
              `,
              { rowid }
            );
            await conn.executeCached(
              `
              INSERT INTO vec_history (rowid, embedding, embedding_coarse)
              VALUES (:rowid, :vector, vec_quantize_binary(:vector))
              `,
              { rowid, vector: this.tensorToBindable(tensor) }
            );
          }

          lazy.logger.info(
            `Added embedding and mapping for url_hash: ${url_hash}`
          );
        } catch (error) {
          lazy.logger.error(
            `Failed to insert embedding for url_hash: ${url_hash}. Error: ${error.message}`
          );
        }
      }

      // Now apply deletions.
      for (let { url_hash } of rowsToDelete) {
        try {
          // Delete the mapping from vec_history_mapping table
          const rows = await conn.executeCached(
            `
            DELETE FROM vec_history_mapping
            WHERE url_hash = :url_hash
            RETURNING rowid
            `,
            { url_hash }
          );

          if (rows.length === 0) {
            lazy.logger.warn(`No mapping found for url_hash: ${url_hash}`);
            continue;
          }

          const rowid = rows[0].getResultByName("rowid");

          // Delete the embedding from vec_history table
          await conn.executeCached(
            `
            DELETE FROM vec_history
            WHERE rowid = :rowid
            `,
            { rowid }
          );

          lazy.logger.info(
            `Deleted embedding and mapping for url_hash: ${url_hash}`
          );
        } catch (error) {
          lazy.logger.error(
            `Failed to delete for url_hash: ${url_hash}. Error: ${error.message}`
          );
        }
      }
    });
  }

  /**
   * Shuts down the manager, ensuring cleanup of tasks and connections.
   */
  async shutdown() {
    this.#shutdownProgress.state = "In progress";
    await this.#finalize();
    this.#shutdownProgress.state = "Task finalized";
    await this.semanticDB.closeConnection();
    this.#shutdownProgress.state = "Connection closed";

    lazy.PlacesUtils.observers.removeListener(
      ["pages-rank-changed", "history-cleared", "page-removed"],
      this.handlePlacesEvents
    );

    this.#shutdownProgress.state = "Complete";
    lazy.logger.info("PlacesSemanticHistoryManager shut down.");
  }

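  /**
   * Converts an embedding tensor (an array of floats) into a byte view that
   * can be bound as a BLOB parameter in SQL statements.
   *
   * @param {number[]} tensor The embedding vector.
   * @returns {Uint8ClampedArray} A view over the Float32 representation.
   */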
  tensorToBindable(tensor) {
    if (!tensor) {
      throw new Error("tensorToBindable received an undefined tensor");
    }
    return new Uint8ClampedArray(new Float32Array(tensor).buffer);
  }

  /**
   * Executes an inference operation using the ML engine.
   *
   * This runs the engine's inference pipeline on the provided request and
   * checks if changes to the rank warrant triggering an update.
   *
   * @param {object} queryContext
   *   The request to run through the engine.
   * @param {string} queryContext.searchString
   *   The search string used for the request.
   * @returns {Promise<object|Array>}
   *   An object of the form `{ results, metrics }` on success; if no valid
   *   embedding could be computed for the search string, an empty results
   *   array is returned instead.
   */
  async infer(queryContext) {
    const inferStartTime = Cu.now();
    let results = [];
    await this.embedder.ensureEngine();
    let tensor = await this.embedder.embed(queryContext.searchString);

    if (!tensor) {
      return results;
    }

    let metrics = tensor.metrics;

    // If tensor is a nested array with a single element, extract the inner array.
    if (
      Array.isArray(tensor) &&
      tensor.length === 1 &&
      Array.isArray(tensor[0])
    ) {
      tensor = tensor[0];
    }

    if (!Array.isArray(tensor) || tensor.length !== this.#embeddingSize) {
      lazy.logger.info(`Got tensor with length ${tensor.length}`);
      return results;
    }
    let conn = await this.getConnection();

    let rows = await conn.executeCached(
      `
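      /* Two-stage search: first use the binary-quantized embedding to cheaply
         select coarse candidates, then re-rank them by exact cosine distance
         against the full embedding and join back to moz_places. */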
      WITH coarse_matches AS (
        SELECT rowid,
               embedding
        FROM vec_history
        WHERE embedding_coarse match vec_quantize_binary(:vector)
        ORDER BY distance
        LIMIT 100
      ),
      matches AS (
        SELECT url_hash, vec_distance_cosine(embedding, :vector) AS distance
        FROM vec_history_mapping
        JOIN coarse_matches USING (rowid)
        WHERE distance <= :distanceThreshold
        ORDER BY distance
        LIMIT 2
      )
      SELECT id,
             title,
             url,
             distance,
             frecency,
             last_visit_date
      FROM moz_places
      JOIN matches USING (url_hash)
      WHERE ${lazy.PAGES_FRECENCY_FIELD} <> 0
      ORDER BY distance
      `,
      {
        vector: this.tensorToBindable(tensor),
        distanceThreshold: this.#distanceThreshold,
      }
    );

    for (let row of rows) {
      results.push({
        id: row.getResultByName("id"),
        title: row.getResultByName("title"),
        url: row.getResultByName("url"),
        distance: row.getResultByName("distance"),
        frecency: row.getResultByName("frecency"),
        lastVisit: row.getResultByName("last_visit_date"),
      });
    }

    // Add a duration marker, representing a span of time, with some additional text
    ChromeUtils.addProfilerMarker(
      "semanticHistorySearch",
      inferStartTime,
      "semanticHistorySearch details"
    );

    return { results, metrics };
  }

  // For easier testing purposes.
  async engineRun(request) {
    return await this.#engine.run(request);
  }

  /**
   * Performs a WAL checkpoint to flush all pending writes from WAL to the main database file.
   * Then measures the final disk size of semantic.sqlite.
   * **This method is for test purposes only.**
   *
   * @returns {Promise<number|null>} - The size of `semantic.sqlite` in MiB after checkpointing, or null on error.
   */
  async checkpointAndMeasureDbSize() {
    let conn = await this.getConnection();

    try {
      lazy.logger.info("Starting WAL checkpoint on semantic.sqlite");

      // Perform a full checkpoint to move WAL data into the main database file
      await conn.execute(`PRAGMA wal_checkpoint(FULL);`);
      await conn.execute(`PRAGMA wal_checkpoint(TRUNCATE);`);

      // Ensure database is in WAL mode
      let journalMode = await conn.execute(`PRAGMA journal_mode;`);
      lazy.logger.info(
        `Journal Mode after checkpoint: ${journalMode[0].getResultByName("journal_mode")}`
      );

      // Measure the size of `semantic.sqlite` after checkpoint
      const semanticDbPath = this.semanticDB.databaseFilePath;
      let { size } = await IOUtils.stat(semanticDbPath);
      const sizeInMB = size / ONE_MiB;

      lazy.logger.info(
        `Size of semantic.sqlite after checkpoint: ${sizeInMB} MiB`
      );
      return sizeInMB;
    } catch (error) {
      lazy.logger.error(
        "Error during WAL checkpoint and size measurement:",
        error
      );
      return null;
    }
  }

  // Getters.
  getEmbeddingSize() {
    return this.#embeddingSize;
  }

  getRowLimit() {
    return this.#rowLimit;
  }

  getPrevPagesRankChangeCount() {
    return this.#prevPagesRankChangedCount;
  }

  getPendingUpdatesStatus() {
    return this.#pendingUpdates;
  }

  // For test purposes.
  stopProcess() {
    this.#finalize();
  }
}

/**
 * @type {PlacesSemanticHistoryManager}
 *   Internal holder for the singleton.
 */
let gSingleton = null;

/**
 * Get the one shared semantic-history manager.
 *
 * @param {Object} [options]
 *   Options forwarded to the PlacesSemanticHistoryManager constructor on the
 *   first call, or when recreate is true.
 * @param {boolean} [recreate]
 *   Set to true only for testing purposes; must never be true in production.
 */
export function getPlacesSemanticHistoryManager(
  options = {},
  recreate = false
) {
  if (!gSingleton || recreate) {
    gSingleton = new PlacesSemanticHistoryManager(options);
  }
  return gSingleton;
}