File: MultithreadedMolSupplier.cpp

package info (click to toggle)
rdkit 202503.6-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 222,000 kB
  • sloc: cpp: 411,111; python: 78,482; ansic: 26,181; java: 8,285; javascript: 4,404; sql: 2,393; yacc: 1,626; lex: 1,267; cs: 1,090; makefile: 581; xml: 229; fortran: 183; sh: 121
file content (194 lines) | stat: -rw-r--r-- 5,708 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#ifdef RDK_BUILD_THREADSAFE_SSS
//
//  Copyright (C) 2020 Shrey Aryan
//
//   @@ All Rights Reserved @@
//  This file is part of the RDKit.
//  The contents are covered by the terms of the BSD license
//  which is included in the file license.txt, found at the root
//  of the RDKit source tree.
//
#include "MultithreadedMolSupplier.h"

#include <mutex>

#include <RDGeneral/RDLog.h>

namespace RDKit {

namespace v2 {
namespace FileParsers {

// Shuts the supplier down. It:
//  - requests a stop (df_forceStop) and marks the output queue done;
//  - drains both queues, deleting any molecules still held in the output queue;
//  - joins the reader and writer threads via endThreads();
//  - calls closeStreams() and resets df_started to false.
// When the threads were never started (df_started == false), the drain and
//  join steps guarded by df_started are skipped.
void MultithreadedMolSupplier::close() {
  // Ask reader()/writer()/next() to stop, and mark the output queue done so
  //  nothing stays blocked waiting for more output.
  // NOTE(review): d_outputQueue is dereferenced here without the null check
  //  used further below; confirm it can never be null at this point.
  df_forceStop = true;
  d_outputQueue->setDone();

  if (df_started) {
    // Clear the queues until they are empty
    //  d_inputQueue->clear is not thread-safe, so drain it with pop()
    //  while the worker threads may still be running
    std::tuple<std::string, unsigned int, unsigned int> r;
    while (d_inputQueue->pop(r)) {
    }
    // clear the output queues, they might be full
    //  and blocking the writer threads, note
    //  that while ending threads the writers may
    //  put a few more items back in the queue
    std::tuple<RWMol *, std::string, unsigned int> mol_r;
    while (d_outputQueue->pop(mol_r)) {
      delete std::get<0>(mol_r);
    }
  }

  // join the writer threads and then the reader thread
  endThreads();

  // notify the queue again that it is done in case
  //  anyone is waiting on it
  d_outputQueue->setDone();

  // destroy all objects in the input and output queues
  //  and anything missed put in the queues while
  //  the threads were ending
  if (df_started) {
    d_inputQueue->clear();
  }

  if (d_outputQueue) {
    // destroy all objects in the output queue
    std::tuple<RWMol *, std::string, unsigned int> r;
    while (d_outputQueue->pop(r)) {
      delete std::get<0>(r);
    }
  }

  // close external streams if any
  //  destructors are called child to parent, however the threads
  //  need to be ended before shutting down streams, so override this
  //  in the child class.
  closeStreams();
  df_started = false;
}

// Reader thread entry point. Pulls raw records from the underlying source via
//  extractNextRecord() and optionally transforms each one with readCallback.
//  It then pushes (record, lineNum, index) tuples onto the input queue for
//  the writer threads. The input queue is marked done on exit so the writers
//  can drain it and finish.
void MultithreadedMolSupplier::reader() {
  std::string record;
  unsigned int lineNum = 0;
  unsigned int index = 0;
  while (!df_forceStop && extractNextRecord(record, lineNum, index)) {
    if (readCallback) {
      // A failing callback must not take down the reader thread: an
      //  exception escaping a std::thread entry point calls std::terminate.
      //  Log it and keep the record untransformed instead (the assignment
      //  below does not happen when the callback throws).
      try {
        record = readCallback(record, index);
      } catch (const std::exception &e) {
        BOOST_LOG(rdErrorLog)
            << "Read callback exception: " << e.what() << std::endl;
      } catch (...) {
        BOOST_LOG(rdErrorLog)
            << "Read callback exception: unknown exception" << std::endl;
      }
    }
    // a stop may have been requested while this record was being read;
    //  don't hand the writers any more work in that case
    if (!df_forceStop) {
      auto r = std::make_tuple(record, lineNum, index);
      d_inputQueue->push(r);
    }
  }
  d_inputQueue->setDone();
}

void MultithreadedMolSupplier::writer() {
  std::tuple<std::string, unsigned int, unsigned int> r;
  while (!df_forceStop && d_inputQueue->pop(r)) {
    try {
      std::unique_ptr<RWMol> mol(
          processMoleculeRecord(std::get<0>(r), std::get<1>(r)));
      if (!df_forceStop && mol && writeCallback) {
        writeCallback(*mol, std::get<0>(r), std::get<2>(r));
      }
      auto temp = std::tuple<RWMol *, std::string, unsigned int>{
          mol.release(), std::get<0>(r), std::get<2>(r)};

      d_outputQueue->push(temp);
    } catch (...) {
      // fill the queue wih a null value
      auto nullValue = std::tuple<RWMol *, std::string, unsigned int>{
          nullptr, std::get<0>(r), std::get<2>(r)};
      d_outputQueue->push(nullValue);
    }
  }

  // we need a lock here otherwise two threads
  //  can increment d_threadCounter even though it's
  //  atomic.
  d_threadCounterMutex.lock();
  if (d_threadCounter < d_params.numWriterThreads) {
    ++d_threadCounter;
    d_threadCounterMutex.unlock();
  } else {
    // Here we need to unlock the threadCounterMutex before we setDone on the
    //  outputQueue.  This causes a notification to the queue which may actually
    //  have elements in it.  This notification may unblock the queue which
    //  allows waiting threads to get their last attempt at adding to it
    //  which will end up here and deadlock.
    d_threadCounterMutex.unlock();
    d_outputQueue->setDone();
  }
}

std::unique_ptr<RWMol> MultithreadedMolSupplier::next() {
  if (!df_started) {
    df_started = true;
    startThreads();
  }
  std::tuple<RWMol *, std::string, unsigned int> r;
  if (!df_forceStop && d_outputQueue->pop(r)) {
    d_lastItemText = std::get<1>(r);
    d_lastRecordId = std::get<2>(r);
    std::unique_ptr<RWMol> res{std::get<0>(r)};
    if (res && nextCallback) {
      try {
        nextCallback(*res, *this);
      } catch (...) {
        // Ignore exception and proceed with mol as is.
      }
    }
    return res;
  }
  return nullptr;
}

// Joins the writer threads and then the reader thread, blocking until they
//  finish. This does not itself ask the threads to stop. To force a stop,
//  call close(), which also handles the input and output queues.
void MultithreadedMolSupplier::endThreads() {
  if (!df_started) {
    return;
  }

  // stop the writers before stopping the readers
  //  otherwise there might be a deadlock
  // Only join threads that are joinable: std::thread::join() throws
  //  std::system_error otherwise. This can happen when the supplier is
  //  restarted by next() after close(). startThreads() then appends new
  //  writers next to the already-joined ones.
  for (auto &thread : d_writerThreads) {
    if (thread.joinable()) {
      thread.join();
    }
  }
  if (d_readerThread.joinable()) {
    d_readerThread.join();
  }
}

void MultithreadedMolSupplier::startThreads() {
  // run the reader function in a seperate thread
  d_readerThread = std::thread(&MultithreadedMolSupplier::reader, this);
  // run the writer function in seperate threads
  for (unsigned int i = 0; i < d_params.numWriterThreads; i++) {
    d_writerThreads.emplace_back(
        std::thread(&MultithreadedMolSupplier::writer, this));
  }
}

// Returns true once no more molecules will be handed out. That requires the
//  output queue to be marked done (by the last writer thread, or by close())
//  and every queued entry to have been consumed.
bool MultithreadedMolSupplier::atEnd() {
  // Check the done flag BEFORE emptiness. With the opposite order, a writer
  //  could push its final molecule and mark the queue done between the two
  //  calls. atEnd() would then return true while that molecule is still
  //  queued. Outside of close(), writers mark the queue done only after all
  //  of them have stopped pushing. So once done is observed, the emptiness
  //  check that follows is reliable.
  return d_outputQueue->getDone() && d_outputQueue->isEmpty();
}

unsigned int MultithreadedMolSupplier::getLastRecordId() const {
  return d_lastRecordId;
}

std::string MultithreadedMolSupplier::getLastItemText() const {
  return d_lastItemText;
}

// Rewinding is not supported by the multithreaded supplier; this only invokes
//  UNDER_CONSTRUCTION with an explanatory message.
// NOTE(review): presumably the macro throws/aborts — confirm its definition.
void MultithreadedMolSupplier::reset() {
  UNDER_CONSTRUCTION("reset() not supported for MultithreadedMolSupplier();");
}
}  // namespace FileParsers
}  // namespace v2
}  // namespace RDKit
#endif