File: MultithreadedMolSupplier.h

package info (click to toggle)
rdkit 202503.1-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 220,160 kB
  • sloc: cpp: 399,240; python: 77,453; ansic: 25,517; java: 8,173; javascript: 4,005; sql: 2,389; yacc: 1,565; lex: 1,263; cs: 1,081; makefile: 580; xml: 229; fortran: 183; sh: 105
file content (187 lines) | stat: -rw-r--r-- 6,575 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
//
//  Copyright (C) 2020 Shrey Aryan
//
//   @@ All Rights Reserved @@
//  This file is part of the RDKit.
//  The contents are covered by the terms of the BSD license
//  which is included in the file license.txt, found at the root
//  of the RDKit source tree.
//
#ifdef RDK_BUILD_THREADSAFE_SSS
#ifndef MULTITHREADED_MOL_SUPPLIER
#define MULTITHREADED_MOL_SUPPLIER

#include <GraphMol/SmilesParse/SmilesParse.h>
#include <RDGeneral/BadFileException.h>
#include <RDGeneral/ConcurrentQueue.h>
#include <RDGeneral/FileParseException.h>
#include <RDGeneral/RDLog.h>
#include <RDGeneral/RDThreads.h>
#include <RDGeneral/StreamOps.h>

#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include <boost/tokenizer.hpp>

#include "FileParsers.h"
#include "MolSupplier.h"

//! global-scope alias for a boost tokenizer driven by a char_separator<char>
using tokenizer = boost::tokenizer<boost::char_separator<char>>;

namespace RDKit {
namespace v2 {
namespace FileParsers {
class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
  //! this is an abstract base class to concurrently supply molecules one at a
  //! time
 public:
  //! settings for the supplier; by default one writer thread and input and
  //! output queue sizes of 5
  struct Parameters {
    unsigned int numWriterThreads = 1;
    size_t sizeInputQueue = 5;
    size_t sizeOutputQueue = 5;
  };

  MultithreadedMolSupplier() {}
  ~MultithreadedMolSupplier() override;
  //! pop elements from the output queue
  std::unique_ptr<RWMol> next() override;

  //! returns true when all records have been read from the supplier
  bool atEnd() override;

  //! included for the interface, always returns false
  bool getEOFHitOnRead() const { return false; }

  //! returns the record id of the last extracted item
  //! Note: d_lastRecordId = 0, initially therefore the value 0 is returned
  //! if and only if the function is called before extracting the first
  //! record
  unsigned int getLastRecordId() const;
  //! returns the text block for the last extracted item
  std::string getLastItemText() const;

  //! sets the callback to be applied to molecules before they are returned by
  //! the next() function
  /*!
    \param cb: a function that takes a reference to an RWMol and a const
    reference to the MultithreadedMolSupplier. This can modify the molecule in
    place

   */
  template <typename T>
  void setNextCallback(T cb) {
    // cb is already our own copy; move it into the std::function instead of
    // copying the callable a second time
    nextCallback = std::move(cb);
  }
  //! sets the callback to be applied to molecules after they are processed, but
  //! before they are written to the output queue
  /*!
    \param cb: a function that takes a reference to an RWMol, a const reference
    to the string record, and an unsigned int record id. This can modify the
    molecule in place
  */
  template <typename T>
  void setWriteCallback(T cb) {
    // cb is already our own copy; move it rather than copy it again
    writeCallback = std::move(cb);
  }
  //! sets the callback to be applied to input text records before they are
  //! added to the input queue
  /*!
    \param cb: a function that takes a const reference to the string record and
    an unsigned int record id and returns the modified string record
  */
  template <typename T>
  void setReadCallback(T cb) {
    // cb is already our own copy; move it rather than copy it again
    readCallback = std::move(cb);
  }

 protected:
  //! starts reader and writer threads
  void startThreads();

 private:
  //! reads lines from input stream to populate the input queue
  void reader();
  //! parses lines from the input queue converting them to RWMol objects
  //! populating the output queue
  void writer();
  //! finalizes the reader and writer threads
  void endThreads();
  //! disable automatic copy constructors and assignment operators
  //! for this class and its subclasses.  They will likely be
  //! carrying around stream pointers and copying those is a recipe
  //! for disaster.
  //! Explicitly deleted so that any attempted copy is rejected at compile
  //! time instead of surfacing as an unresolved symbol at link time.
  MultithreadedMolSupplier(const MultithreadedMolSupplier &) = delete;
  MultithreadedMolSupplier &operator=(const MultithreadedMolSupplier &) =
      delete;
  //! not yet implemented
  void reset() override;
  //! must be provided by subclasses
  void init() override = 0;
  //! must be provided by subclasses
  virtual bool getEnd() const = 0;
  //! extracts next record from the input file or stream
  virtual bool extractNextRecord(std::string &record, unsigned int &lineNum,
                                 unsigned int &index) = 0;
  //! processes the record into an RWMol object
  virtual RWMol *processMoleculeRecord(const std::string &record,
                                       unsigned int lineNum) = 0;

  std::atomic<unsigned int> d_threadCounter{1};  //!< thread counter
  std::vector<std::thread> d_writerThreads;      //!< vector of writer threads
  std::thread d_readerThread;                    //!< single reader thread

 protected:
  std::atomic<bool> df_started = false;
  std::atomic<unsigned int> d_lastRecordId =
      0;                       //!< stores last extracted record id
  std::string d_lastItemText;  //!< stores last extracted record
  const unsigned int d_numReaderThread = 1;  //!< number of reader threads

  std::unique_ptr<
      ConcurrentQueue<std::tuple<std::string, unsigned int, unsigned int>>>
      d_inputQueue;  //!< concurrent input queue
  std::unique_ptr<
      ConcurrentQueue<std::tuple<RWMol *, std::string, unsigned int>>>
      d_outputQueue;  //!< concurrent output queue
  Parameters d_params;
  //! callback slots; empty (nullptr) until set through the setters above
  std::function<void(RWMol &, const MultithreadedMolSupplier &)> nextCallback =
      nullptr;
  std::function<void(RWMol &, const std::string &, unsigned int)>
      writeCallback = nullptr;
  std::function<std::string(const std::string &, unsigned int)> readCallback =
      nullptr;
};
}  // namespace FileParsers
}  // namespace v2

inline namespace v1 {
class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
  //! v1-namespace base class for suppliers that concurrently supply molecules
  //! one at a time. The accessors below delegate to dp_supplier, viewed as a
  //! v2::FileParsers::MultithreadedMolSupplier.
 public:
  using ContainedType = v2::FileParsers::MultithreadedMolSupplier;
  MultithreadedMolSupplier() {}

  //! included for the interface: yields false when no supplier is set,
  //! otherwise whatever the contained supplier reports
  bool getEOFHitOnRead() const {
    if (!dp_supplier) {
      return false;
    }
    auto *contained = static_cast<ContainedType *>(dp_supplier.get());
    return contained->getEOFHitOnRead();
  }

  //! returns the record id of the last extracted item, as reported by the
  //! contained supplier
  //! Note: the contained supplier's d_lastRecordId starts at 0, so 0 is
  //! returned if and only if this is called before the first record has been
  //! extracted
  //! \pre dp_supplier must be set ("no supplier" otherwise)
  unsigned int getLastRecordId() const {
    PRECONDITION(dp_supplier, "no supplier");
    auto *contained = static_cast<ContainedType *>(dp_supplier.get());
    return contained->getLastRecordId();
  }
  //! returns the text block for the last extracted item, as reported by the
  //! contained supplier
  //! \pre dp_supplier must be set ("no supplier" otherwise)
  std::string getLastItemText() const {
    PRECONDITION(dp_supplier, "no supplier");
    auto *contained = static_cast<ContainedType *>(dp_supplier.get());
    return contained->getLastItemText();
  }
};
}  // namespace v1
}  // namespace RDKit
#endif
#endif