File: retrievalengine.cpp

package info (click to toggle)
classified-ads 0.13-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 6,772 kB
  • sloc: cpp: 34,291; tcl: 1,175; xml: 64; makefile: 40
file content (340 lines) | stat: -rw-r--r-- 15,965 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
/*     -*-C++-*- -*-coding: utf-8-unix;-*-
  Classified Ads is Copyright (c) Antti Järvinen 2013-2017.

  This file is part of Classified Ads.

  Classified Ads is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  Classified Ads is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with Classified Ads; if not, write to the Free Software
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "retrievalengine.h"
#include "../log.h"
#include "../controller.h"
#include "../datamodel/model.h"
#include "node.h"
#include "connection.h"

//
// Constructor. The engine is itself a QTimer whose parent object is
// the controller; the timer's timeout() signal is wired to run(), so
// every tick advances the retrieval state machine by one step.
// The timer is not started here.
//
// aController: application controller, kept in iController and used
//              as parent of the timer
// aModel:      datamodel reference, kept in iModel
//
RetrievalEngine::RetrievalEngine(Controller* aController,
                                 Model& aModel) :
    QTimer(aController),
    iController(aController),
    iModel(aModel),
    iNowRunning(false) {
    // start with an empty list of nodes to ask
    iNodeCandidatesToTryQuery.clear() ;
    // each timer tick performs one pass of run()
    connect(this, SIGNAL(timeout()), this, SLOT(run()));
}

//
// Destructor. Empties the list of node candidates and logs
// entry and exit.
//
RetrievalEngine::~RetrievalEngine() {
    LOG_STR("RetrievalEngine::~RetrievalEngine") ;
    emptyNodeCandidateList() ;
    LOG_STR("RetrievalEngine::~RetrievalEngine out") ;
}


void RetrievalEngine::run() {
    if ( iNowRunning == false ) {
        iNowRunning = true ;
        if ( iObjectBeingRetrieved.iRequestedItem == KNullHash ) {
            // we had nothing to retrieve, go on and start
            iModel.lock() ;
            if ( iDownloadQueue.size() > 0 ) {
                iObjectBeingRetrieved = iDownloadQueue.takeAt(0) ;
                iObjectBeingRetrieved.iState = NetworkRequestExecutor::NewRequest ;
            }
            iModel.unlock() ;
            iObjectBeingRetrieved.iTimeStampOfLastActivity= QDateTime::currentDateTimeUtc().toTime_t();
        }

        if ( iObjectBeingRetrieved.iRequestedItem != KNullHash ) {
            iModel.lock() ;
            switch ( iObjectBeingRetrieved.iState ) {

            case NetworkRequestExecutor::NewRequest: {
                // do internal initialization:
                iNodesSuccessfullyConnected.clear();
                iNodesFailurefullyConnected.clear();
                emptyNodeCandidateList() ;

                // initial stage, check if we have named node:
                if( iObjectBeingRetrieved.iDestinationNode != KNullHash ) {
                    // we have candidate, check if it is connected, or should
                    // we have it on wishlist:
                    if(iModel.nodeModel().isNodeAlreadyConnected(iObjectBeingRetrieved.iDestinationNode)) {
                        sendQueryToNode(iObjectBeingRetrieved.iDestinationNode) ;
                    } else {
                        iNodeCandidatesToTryQuery.append(iObjectBeingRetrieved.iDestinationNode) ;
                        iModel.nodeModel().addNodeToConnectionWishList(iObjectBeingRetrieved.iDestinationNode);
                    }
                }
                // additionally, to our connected nodes send a query
                // about nodes around that hash we want:
                NetworkRequestExecutor::NetworkRequestQueueItem hashReq ;
                hashReq.iRequestType = RequestForNodesAroundHash ;
                hashReq.iRequestedItem = iObjectBeingRetrieved.iRequestedItem ;
                hashReq.iState = NetworkRequestExecutor::NewRequest ;
                hashReq.iMaxNumberOfItems = 10 ; // ask from 10 nodes currently connected
                iModel.addNetworkRequest(hashReq) ; // datamodel will add to queue
                // and netreq-executor will send to 10 nodes
                iObjectBeingRetrieved.iState = NetworkRequestExecutor::Processing ;
                iObjectBeingRetrieved.iTimeStampOfLastActivity= QDateTime::currentDateTimeUtc().toTime_t();
            }
                break ;
            case NetworkRequestExecutor::Processing: {
                if ( iObjectBeingRetrieved.iTimeStampOfLastActivity + 10 <
                     QDateTime::currentDateTimeUtc().toTime_t() ) {
                    // 10 seconds passed since we asked for node references
                    // around hash: proceed to ask for connections to
                    // given node
                    QList<Node *>* nodesToTry =
                        iModel.nodeModel().getNodesAfterHash(iObjectBeingRetrieved.iRequestedItem,
                                                             20, // 20 nodes
                                                             300 ) ;// at most 5 hours old

                    while ( ! nodesToTry->isEmpty() ) {
                        Node* connectCandidate ( nodesToTry->takeFirst() ) ;
                        if (  connectCandidate->nodeFingerPrint() != iController->getNode().nodeFingerPrint() ) {
                            if ( iModel.nodeModel().isNodeAlreadyConnected(connectCandidate->nodeFingerPrint()) ) {
                                sendQueryToNode(connectCandidate->nodeFingerPrint()) ;
                                QLOG_STR("In retrieve::processing node " + connectCandidate->nodeFingerPrint().toString() + " was already connected") ;
                                delete connectCandidate ;
                            } else {
                                iNodeCandidatesToTryQuery.append(connectCandidate->nodeFingerPrint()) ;
                                iModel.nodeModel().addNodeToConnectionWishList(connectCandidate) ;
                                QLOG_STR("In retrieve::processing node " + connectCandidate->nodeFingerPrint().toString() + " was added to wishlist") ;
                            }
                        }
                    }
                    if ( iNodeCandidatesToTryQuery.size() > 0 ) {
                        iObjectBeingRetrieved.iState = NetworkRequestExecutor::NodeIsInWishList ;
                        iObjectBeingRetrieved.iTimeStampOfLastActivity= QDateTime::currentDateTimeUtc().toTime_t();
                    } else {
                        // did not add anything to wishlist ; set timestamp back
                        // to past so this state-machine will spam the query to
                        // every connected node ; better than nothing
                        iObjectBeingRetrieved.iState = NetworkRequestExecutor::NodeIsInWishList ;
                        iObjectBeingRetrieved.iTimeStampOfLastActivity= QDateTime::currentDateTimeUtc().toTime_t() - 120;
                    }
                }
            }
                break ;
            case NetworkRequestExecutor::NodeIsInWishList: {
                if ( iObjectBeingRetrieved.iTimeStampOfLastActivity + 60 <
                     QDateTime::currentDateTimeUtc().toTime_t() ) {
                    // after one minute decide that we're not gonna get
                    // it from nodes where we requested connection to ..
                    // spam the request to our connected nodes
                    const QList <Connection *>& currentlyOpenConnections ( iModel.getConnections() ) ;
                    Node* n ( NULL ) ;
                    Connection* c (NULL) ;
                    for ( int i = 0 ; i < 10 &&
                              i< currentlyOpenConnections.size() ;
                          i++ ) {
                        c = currentlyOpenConnections.at(i) ;
                        if ( c && ( ( n = c->node() ) != NULL ) ) {
                            sendQueryToNode(n->nodeFingerPrint()) ;
                        }
                    }
                    iObjectBeingRetrieved.iState=NetworkRequestExecutor::RequestBeingSentAround;
                    iObjectBeingRetrieved.iTimeStampOfLastActivity= QDateTime::currentDateTimeUtc().toTime_t();
                } else {
                    checkForSuccessfullyConnectedNodes() ;
                    checkForUnSuccessfullyConnectedNodes() ;
                }
            }
                break ;
            case NetworkRequestExecutor::RequestBeingSentAround: {
                if ( iObjectBeingRetrieved.iTimeStampOfLastActivity + 60 <
                     QDateTime::currentDateTimeUtc().toTime_t() ) {
                    // after one minute+one minute give up
                    emit notifyOfContentNotReceived(iObjectBeingRetrieved.iRequestedItem ,
                                                    iObjectBeingRetrieved.iRequestType ) ;
                    iObjectBeingRetrieved.iRequestedItem= KNullHash ;
                    iObjectBeingRetrieved.iState=NetworkRequestExecutor::ReadyToSend;
                } else {
                    checkForSuccessfullyConnectedNodes() ;
                    checkForUnSuccessfullyConnectedNodes() ;
                }
            }
                break;
            case NetworkRequestExecutor::ReadyToSend:
                // final stage, do nothing here
                break ;
            }
            iModel.unlock() ;
        } // if iRequestedItem was not nullhash
        iNowRunning = false ;
    }
}


//
// Forgets every node in the list of nodes that we have asked
// connection to (iNodeCandidatesToTryQuery).
//
void RetrievalEngine::emptyNodeCandidateList() {
    iNodeCandidatesToTryQuery.clear() ;
}


//
// Receives the outcome of a connection attempt to a node.
//
// If the node is one that this engine is waiting for (it is found in
// iNodeCandidatesToTryQuery), its hash is recorded, without
// duplicates, either in iNodesSuccessfullyConnected (status is
// Connection::Open) or in iNodesFailurefullyConnected (any other
// status). Nothing else is done here: run() later picks up the
// recorded hashes and acts on them.
//
// aStatus:              result of the connection attempt
// aHashOfAttemptedNode: fingerprint of the node that was tried
//
void RetrievalEngine::nodeConnectionAttemptStatus(Connection::ConnectionState aStatus,
        const Hash aHashOfAttemptedNode ) {
    LOG_STR2("RetrievalEngine::nodeConnectionAttemptStatus %d in", aStatus) ;
    LOG_STR2("RetrievalEngine::nodeConnectionAttemptStatus %s ", qPrintable(aHashOfAttemptedNode.toString())) ;

    // datamodel lock protects the lists of this engine too
    iModel.lock() ;
    if ( iNodeCandidatesToTryQuery.contains(aHashOfAttemptedNode) ) {
        const bool connectionIsOpen ( aStatus == Connection::Open ) ;
        if ( connectionIsOpen ) {
            if ( iNodesSuccessfullyConnected.contains(aHashOfAttemptedNode) == false ) {
                iNodesSuccessfullyConnected.append(aHashOfAttemptedNode) ;
            }
        } else if ( iNodesFailurefullyConnected.contains(aHashOfAttemptedNode) == false ) {
            iNodesFailurefullyConnected.append(aHashOfAttemptedNode) ;
        }
    }
    iModel.unlock() ;
}


void RetrievalEngine::sendQueryToNode(const Hash& aNode) {
    if ( iObjectBeingRetrieved.iRequestedItem != KNullHash ) {
        if ( iNodesFailurefullyConnected.contains ( aNode ) ) {
            iNodesFailurefullyConnected.removeOne(aNode) ;
        }
        if ( iNodesSuccessfullyConnected.contains ( aNode ) ) {
            iNodesSuccessfullyConnected.removeOne(aNode) ;
        }
    }
    struct NetworkRequestExecutor::NetworkRequestQueueItem queryReq = iObjectBeingRetrieved  ;
    queryReq.iDestinationNode = aNode ;
    queryReq.iState = NetworkRequestExecutor::NewRequest ;
    iModel.addNetworkRequest(queryReq) ;
}
//
// this method is for periodically checking if any nodes we've
// asked to be connected to, has actually been connected.
// if such miracle has taken place, send the stuff and remove
// node from list-of-nodes-to-ask (iNodeCandidatesToTryQuery)
//
void RetrievalEngine::checkForSuccessfullyConnectedNodes() {
    for ( int i = iNodeCandidatesToTryQuery.size()-1 ;
            ( i >=0 ) && ( iObjectBeingRetrieved.iRequestedItem != KNullHash ) ;
            i-- ) {
        if ( iNodesSuccessfullyConnected.contains(iNodeCandidatesToTryQuery[i] ) ) {
            // if we came here it means that we've asked for connection to
            // particular node and it happened
            LOG_STR2("RetEng: After asking, node has been connected: %s", qPrintable(iNodeCandidatesToTryQuery[i].toString())) ;
            sendQueryToNode(iNodeCandidatesToTryQuery[i]) ;
        }
    }
    // can be emptied every time ; if there were anything important, it
    // has been processed above
    iNodesSuccessfullyConnected.clear() ;
}

//
// Periodic check: removes from the ask-list (iNodeCandidatesToTryQuery)
// every node whose connection attempt has been reported as failed
// (iNodesFailurefullyConnected). The list of failed connections is
// emptied at the end.
//
void RetrievalEngine::checkForUnSuccessfullyConnectedNodes() {
    int idx ( iNodeCandidatesToTryQuery.size() ) ;
    // walk backwards so that removal does not disturb remaining indexes
    while ( --idx >= 0 ) {
        if ( iObjectBeingRetrieved.iRequestedItem == KNullHash ) {
            break ; // nothing is being retrieved
        }
        if ( iNodesFailurefullyConnected.contains(iNodeCandidatesToTryQuery.at(idx)) == false ) {
            continue ;
        }
        // connection was asked for this node but it did not open
        LOG_STR2("RetEng: After asking, node has not been connected: %s", qPrintable(iNodeCandidatesToTryQuery.at(idx).toString())) ;
        iNodeCandidatesToTryQuery.removeAt(idx) ;
    }
    // everything of interest was handled above, rest can be forgotten
    iNodesFailurefullyConnected.clear() ;
}

//
// Asks the engine to retrieve an object from the network.
//
// Normal work (aIsPriorityWork == false) is appended to the end of
// iDownloadQueue unless an item with the same hash and request type is
// already queued.
//
// Priority work pre-empts whatever is in progress: the item currently
// being retrieved, if there is one, is put back to the front of the
// queue, the per-retrieval node lists are reset and the new item is
// placed first in the queue so that next run() picks it up. If the
// priority item is the one already being retrieved, nothing is done.
//
// Compared to the earlier implementation, priority path
//  - does not push a dummy item into the queue when nothing was being
//    retrieved (iRequestedItem of the current item is KNullHash)
//  - drops an identical entry already waiting in the queue so that
//    the same object is not fetched (or reported as failed) twice.
//
// NOTE(review): unlike run(), this method does not take the datamodel
// lock before touching iDownloadQueue ; presumably callers hold it -
// confirm.
//
// aObject:         the request to carry out
// aIsPriorityWork: true if the request must bypass the queue
//
void RetrievalEngine::startRetrieving(  NetworkRequestExecutor::NetworkRequestQueueItem aObject,
                                        bool aIsPriorityWork) {
    if ( aIsPriorityWork ) {
        if ( iObjectBeingRetrieved.iRequestedItem == aObject.iRequestedItem ) {
            // already downloading said item -> do no thing
            return ;
        }
        // the new item goes to the front ; remove possible older entry
        // of the same request from the queue
        for ( int i = iDownloadQueue.size()-1 ; i >= 0 ; i-- ) {
            if ( iDownloadQueue.at(i).iRequestedItem == aObject.iRequestedItem &&
                    iDownloadQueue.at(i).iRequestType == aObject.iRequestType ) {
                iDownloadQueue.removeAt(i) ;
            }
        }
        if ( iObjectBeingRetrieved.iRequestedItem != KNullHash ) {
            // insert previously active item in front so it will be
            // continued after the priority work
            iDownloadQueue.insert(0, iObjectBeingRetrieved) ;
            iObjectBeingRetrieved.iRequestedItem = KNullHash ;
        }
        emptyNodeCandidateList() ;
        iNodesSuccessfullyConnected.clear();
        iNodesFailurefullyConnected.clear() ;
        iDownloadQueue.insert(0, aObject) ; // insert in front
        return ;
    }

    // normal work: queue it unless the same request is already waiting
    bool isAlreadyInQueue ( false ) ;
    for ( int i = 0 ; i < iDownloadQueue.size() ; i++ ) {
        if (iDownloadQueue.at(i).iRequestedItem == aObject.iRequestedItem &&
                iDownloadQueue.at(i).iRequestType == aObject.iRequestType ) {
            isAlreadyInQueue = true ;
            break ;
        }
    }
    if ( isAlreadyInQueue == false ) {
        iDownloadQueue.append(aObject) ;
    }
}

void RetrievalEngine::notifyOfContentReceived(const Hash& aHashOfContent,
        const ProtocolItemType aTypeOfReceivdContent ) {
    ProtocolItemType typeToCheckFromQueue ;
    bool doCheck ( false ) ;
    switch ( aTypeOfReceivdContent ) {
    case BinaryBlob:
        typeToCheckFromQueue = RequestForBinaryBlob ;
        doCheck = true ;
        break ;
    case UserProfile:
        typeToCheckFromQueue = RequestForUserProfile ;
        doCheck = true ;
        break ;
    case ClassifiedAd:
        typeToCheckFromQueue = RequestForClassifiedAd ;
        doCheck = true ;
        break ;
    default:
        doCheck = false ;
        break ;
    }
    if ( doCheck ) {
        iModel.lock() ;
        if ( iObjectBeingRetrieved.iRequestedItem == aHashOfContent &&
                iObjectBeingRetrieved.iRequestType == typeToCheckFromQueue ) {
            // hurray, we got what we were waiting for
            iObjectBeingRetrieved.iRequestedItem = KNullHash ;
            iObjectBeingRetrieved.iState=NetworkRequestExecutor::ReadyToSend;
            iNodesSuccessfullyConnected.clear();
            iNodesFailurefullyConnected.clear() ;
            emptyNodeCandidateList() ;

        }
        // also check queue:
        for ( int i = iDownloadQueue.size()-1 ; i >= 0  ; i-- ) {
            if (iDownloadQueue.at(i).iRequestedItem == aHashOfContent &&
                    iDownloadQueue.at(i).iRequestType == typeToCheckFromQueue ) {
                iDownloadQueue.removeAt(i) ;
            }
        }
        iModel.unlock() ;
    }
}