File: controller.h

package info (click to toggle)
meshlab 2020.09%2Bdfsg1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 45,124 kB
  • sloc: cpp: 400,238; ansic: 31,952; javascript: 1,578; sh: 387; yacc: 238; lex: 139; python: 86; makefile: 29
file content (182 lines) | stat: -rw-r--r-- 4,918 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#ifndef GCACHE_CONTROLLER_H
#define GCACHE_CONTROLLER_H

#include "cache.h"

/** Allows to insert tokens, update priorities and generally control the cache.
*/
namespace vcg {

template <class Token>
class Controller {
 public:
  ///tokens waiting to be inserted into the provider heap on the next
  ///updatePriorities() call; should be private
  std::vector<Token *> tokens;
  ///true while cache threads are still running but every queue door is locked;
  ///transfers already in flight might still be going on!
  bool paused;
  ///true when all cache threads are stopped (or were never started)
  bool stopped;

 public:
  ///source of all tokens and head of the cache chain; should be protected
  Provider<Token> provider;
  ///cache chain, ordered from the lowest to the highest level; should be protected
  std::vector<Cache<Token> *> caches;

  Controller(): paused(false), stopped(true) {}
  ///stops threads and flushes every cache if still running
  ~Controller() { if(!stopped) finish(); }

  ///called before the cache is started to add a cache in the chain
  /** The order in which the caches are added is from the lowest to the highest. */
  void addCache(Cache<Token> *cache) {
    if(caches.size() == 0)
      cache->setInputCache(&provider); //first cache reads straight from the provider
    else
      cache->setInputCache(caches.back());
    assert(cache->input);
    caches.push_back(cache);
  }

  ///insert the token in the cache if not already present (actual insertion is done on updatePriorities)
  /** @return true if the token was OUTSIDE and has now been queued for insertion. */
  bool addToken(Token *token) {
    //atomically claim the token: succeeds only if it was not already in the cache system
    if(token->count.testAndSetOrdered(Token::OUTSIDE, Token::CACHE)) {
      tokens.push_back(token);
      return true;
    }
    return false;
  }

  ///WARNING: might stall for the time needed to drop tokens from cache.
  //FUNCTOR has bool operator(Token *) and returns true to remove
  template<class FUNCTOR> void removeTokens(FUNCTOR functor) {
    pause(); //this might actually be unnecessary if you mark tokens to be removed
    //flush from the highest cache down so dropped tokens cascade toward the provider
    for(int i = (int)caches.size()-1; i >= 0; i--)
      caches[i]->flush(functor);
    provider.flush(functor);

    resume();
  }

  ///if more tokens than m present in the provider, lowest priority ones will be removed
  void setMaxTokens(int m) {
    mt::mutexlocker l(&provider.heap_lock);
    provider.max_tokens = m;
  }

  ///ensure that added tokens are processed and existing ones have their priority updated.
  ///potential bug! update is done on the heaps, if something is in transit...
  void updatePriorities() {
    //move the tokens queued by addToken() into the provider heap, under its lock
    if(tokens.size()) {
      mt::mutexlocker l(&provider.heap_lock);
      for(unsigned int i = 0; i < tokens.size(); i++)
        provider.heap.push(tokens[i]);
      tokens.clear();
    }

    provider.pushPriorities();
    for(unsigned int i = 0; i < caches.size(); i++)
      caches[i]->pushPriorities();
  }

  ///start the various cache threads.
  void start() {
    assert(stopped);
    assert(!paused);
    assert(caches.size() > 1); //NOTE(review): chain must hold at least two caches — confirm intended minimum
    caches.back()->final = true;
    //the provider itself has no thread; only the caches are started here
    for(unsigned int i = 0; i < caches.size(); i++)
      caches[i]->start();
    stopped = false;
  }

  ///stops the cache threads
  void stop() {
    if(stopped) return; //idempotent: already stopped
    assert(!paused);

    //signal all caches to quit
    for(unsigned int i = 0; i < caches.size(); i++)
      caches[i]->quit = true;

    //abort current gets
    for(unsigned int i = 0; i < caches.size(); i++)
      caches[i]->abort();

    //make sure all caches actually run a cycle.
    for(unsigned int i = 0; i < caches.size(); i++)
      caches[i]->input->check_queue.open();

    //join every cache thread before declaring the controller stopped
    for(unsigned int i = 0; i < caches.size(); i++)
      caches[i]->wait();

    stopped = true;
  }

  ///stop the threads and empty all caches (removes all tokens)
  void finish() {
    stop();
    flush();
  }

  ///freeze the cache threads without stopping them; resume() restarts them
  void pause() {
    assert(!stopped);
    assert(!paused);

    //lock all doors.
    for(unsigned int i = 0; i < caches.size(); i++)
      caches[i]->input->check_queue.lock();

    //abort all pending calls
    for(unsigned int i = 0; i < caches.size(); i++)
      caches[i]->abort();

    //make sure no cache is running (must be done after abort! otherwise we have to wait for the get)
    //the last cache is excluded; NOTE(review): relies on caches.size() >= 1 or the unsigned bound underflows
    for(unsigned int i = 0; i < caches.size()-1; i++)
      caches[i]->input->check_queue.room.lock();

    paused = true;
  }

  ///undo a pause(): reopen all doors and let the cache threads run again
  void resume() {
    assert(!stopped);
    assert(paused);
    //(leftover debug print removed: it used unqualified cout/endl without <iostream>)

    //unlock and open all doors
    for(unsigned int i = 0; i < caches.size(); i++) {
      caches[i]->input->check_queue.unlock();
      caches[i]->input->check_queue.open();
    }

    //allow all cache to enter again (mirrors the room.lock() loop in pause())
    for(unsigned int i = 0; i < caches.size()-1; i++)
      caches[i]->input->check_queue.room.unlock();

    paused = false;
  }
  ///empty all caches AND REMOVES ALL TOKENS!
  void flush() {
    //flush top-down so evicted tokens fall through the chain, then clear the provider heap
    for(int i = (int)caches.size()-1; i >= 0; i--)
      caches[i]->flush();
    provider.heap.clear();
  }

  ///true if any cache in the chain reports new data
  bool newData() {
    bool c = false;
    for(int i = (int)caches.size() -1; i >= 0; i--) {
      c |= caches[i]->newData();
    }
    return c;
  }

  ///true only if every cache thread is idle waiting on its input queue
  ///(vacuously true when the chain is empty)
  bool isWaiting() {
    bool waiting = true;
    for(int i = (int)caches.size() -1; i >= 0; i--) {
      waiting &= caches[i]->input->check_queue.isWaiting();
    }
    return waiting;
  }
};

} //namespace
#endif // GCACHE_CONTROLLER_H