Package: gem / 1:0.93.3-7

threadfixes.patch Patch series | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
Author: IOhannes m zmölnig
Description: fixed race-condition with threaded image loading
Applied-Upstream: 0.93.4
--- gem.orig/src/Gem/ThreadMutex.cpp
+++ gem/src/Gem/ThreadMutex.cpp
@@ -21,7 +21,6 @@
 # include <winsock2.h>
 #endif
 
-
 class gem::thread::Mutex::PIMPL {
 public:
   pthread_mutex_t*mutex;
@@ -31,10 +30,10 @@
     pthread_mutex_init(mutex, NULL); 
   }
   PIMPL(const PIMPL&org) : mutex(org.mutex), refcount(org.refcount) {
-    *refcount++;
+    ++*refcount;
   }
   ~PIMPL(void) {
-    *refcount--;
+    --*refcount;
     if(*refcount==0) {
       pthread_mutex_destroy(mutex); 
       delete mutex;
--- gem.orig/src/Gem/WorkerThread.cpp
+++ gem/src/Gem/WorkerThread.cpp
@@ -49,7 +49,7 @@
   class WorkerThread::PIMPL {
   public:
     WorkerThread*owner;
-    WorkerThread::id_t ID;
+    WorkerThread::id_t ID; /* for generating the next ID */
 
     bool keeprunning;
     bool isrunning;
@@ -60,12 +60,15 @@
     Mutex m_done;
     Semaphore s_newdata;
 
+    WorkerThread::id_t processingID; /* the ID currently processed or INVALID: must only be written in the thread! */
+
     pthread_t p_thread;
 
     PIMPL(WorkerThread*x) : owner(x), ID(0),
                             keeprunning(true), isrunning(false),
                             m_todo(Mutex()), m_done(Mutex()),
-                            s_newdata(Semaphore())
+                            s_newdata(Semaphore()),
+                            processingID(WorkerThread::INVALID)
     {
 
     }
@@ -91,6 +94,7 @@
 
         me->m_todo.lock();
         if(me->q_todo.empty()) {
+        empty:
           me->m_todo.unlock();
           //std::cerr << "THREAD: waiting for new data...freeze"<<std::endl;
           me->s_newdata.freeze();
@@ -102,8 +106,11 @@
 
           me->m_todo.lock();
         }
+        if(me->q_todo.empty())
+          goto empty;
         in=me->q_todo.front();
-        me->q_todo.POP();
+         me->processingID=in.first;
+         me->q_todo.POP();
         me->m_todo.unlock();
 
         //std::cerr << "THREAD: processing data " << in.second  << " as "<<in.first<<std::endl;
@@ -117,6 +124,7 @@
         bool newdata=true;//me->q_done.empty();
         //std::cerr<<"THREAD: processed "<< out.first <<" -> "<< newdata<<std::endl;
         me->q_done.PUSH(out);
+        me->processingID=WorkerThread::INVALID;
         me->m_done.unlock();
         //std::cerr << "THREAD: signaling newdata "<<newdata<<" for "<< out.first << std::endl;
         if(newdata)wt->signal();
@@ -216,24 +224,42 @@
   bool WorkerThread::cancel(WorkerThread::id_t ID) {
     bool success=false;
 #ifdef WORKERTHREAD_DEQUEUE
-    QUEUE< std::pair<WorkerThread::id_t, void*> > :: iterator it;
-    m_pimpl->m_todo.lock();
+    if(!success) {
+      /* cancel from TODO list */
+      QUEUE< std::pair<WorkerThread::id_t, void*> > :: iterator it;
+      //std::cerr << "cancelling "<< (int)ID <<" from TODO" << std::endl;
+      m_pimpl->m_todo.lock();
+
+      for(it=m_pimpl->q_todo.begin(); it!=m_pimpl->q_todo.end(); it++) {
+        if(it->first == ID) {
+          m_pimpl->q_todo.erase(it);
+          success=true;
+          break;
+        }
+      }
+      m_pimpl->m_todo.unlock();
 
-    for(it=m_pimpl->q_todo.begin(); it!=m_pimpl->q_todo.end(); it++) {
-      if(it->first == ID) {
-        m_pimpl->q_todo.erase(it);
-        break;
+      /* TODO: if ID is currently in the process, cancel that as well ... */
+      if(WorkerThread::INVALID != ID) {
+        /* ... or at least block until it is done... */
+        struct timeval sleep;
+        while(ID==m_pimpl->processingID) {
+          sleep.tv_sec=0;
+          sleep.tv_usec=10;
+          select(0,0,0,0,&sleep);
+        }
       }
     }
     m_pimpl->m_todo.unlock();
 #endif
+    //    std::cerr << "cancelling "<< (int)ID <<" success " << success << std::endl;
     return success;
   }
   bool WorkerThread::dequeue(WorkerThread::id_t&ID, void*&data) {
     std::pair <id_t, void*> DATA;
     DATA.first=WorkerThread::INVALID;
     DATA.second=0;
-
+    //std::cerr << "dequeuing "<< (int)ID << std::endl;
     m_pimpl->m_done.lock();
     if(!m_pimpl->q_done.empty()) {
       DATA=m_pimpl->q_done.front();
@@ -243,7 +269,7 @@
 
     ID=DATA.first;
     data=DATA.second;
-    //    std::cerr<<"dequeuing "<<data<<" as "<< ID<<std::endl;
+    //std::cerr<<"dequeuing "<<data<<" as "<< ID<<std::endl;
 
     return (WorkerThread::INVALID != ID);
   }