File: publishingengine.cpp

package info (click to toggle)
classified-ads 0.13-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 6,772 kB
  • sloc: cpp: 34,291; tcl: 1,175; xml: 64; makefile: 40
file content (394 lines) | stat: -rw-r--r-- 19,480 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
/*     -*-C++-*- -*-coding: utf-8-unix;-*-
  Classified Ads is Copyright (c) Antti Järvinen 2013-2017.

  This file is part of Classified Ads.

  Classified Ads is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  Classified Ads is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with Classified Ads; if not, write to the Free Software
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "publishingengine.h"
#include "../log.h"
#include "../controller.h"
#include "../datamodel/model.h"
#include "node.h"
#include "connection.h"

// Constructs the publishing engine. aController is handed to the QTimer
// base class constructor; aModel is the datamodel that is queried for items
// to publish and for nodes to publish to. The timer's timeout() signal is
// wired to run(), so the engine advances each time the timer fires.
PublishingEngine::PublishingEngine(Controller* aController,
                                   Model& aModel) :
    QTimer(aController),
    iNeedsToRun(true),
    iController(aController),
    iModel(aModel),
    iNowRunning(false) {
    // the candidate list is a freshly constructed container and is therefore
    // already empty, no need to clear it here.

    // give the publish stage a defined value from the start ; it is
    // assigned in the body (not in the initializer list) so that member
    // declaration order does not matter.
    iStageOfPublish = InitialStage ;
    iWorkItem.iObjectHash = KNullHash ; // NULL hash in the beginning,
    connect(this, SIGNAL(timeout()), this, SLOT(run()));
}

// Destructor: releases every Node still waiting in the candidate list.
// Entry and exit are both logged.
PublishingEngine::~PublishingEngine()
{
    LOG_STR("PublishingEngine::~PublishingEngine") ;
    // the candidate list owns its Node objects, delete what is left
    emptyNodeCandidateList() ;
    LOG_STR("PublishingEngine::~PublishingEngine out") ;
}

// lets have the logic work like this:
//  quite often poll the table of published works.
//  when there is an item in the table, ask for nodes
//  that are destination addresses for the published items.
//  match that list with list of already-connected nodes ;
//  if we already have connections to nodes that are on
//  our list, do the following loop:
//     1. put published item into send queue
//     2. ask for notification that item has been sent ;
//        upon receiving, update the bangpath in publish-tables
//     3. select next already-connected host
//  when already-connected hosts run out and we still have
//  nodes in our list where content should be sent, run following loop
//     1. ask network-connection engine to obtain connection to host
//     2. if unsuccessful, move to next host
//     3. if success, put published item into send queue
//     4. ask for notification that item has been sent ;
//        upon receiving, update the bangpath in publish-tables
//     5. select next non-connected host
//  with these steps this method should push the published content
//  into 5 hosts. each of these will do the same with publish-count=4,
//  each of those 4 will do the same with publish-count=3, each of
//  those 3 will do the same with publish-count=2
// Slot connected to the timer's timeout() signal. Each invocation advances
// the publishing state machine by one step:
//  - when no item is in progress, the next item to publish is fetched from
//    the model together with a list of nodes to publish to ; nodes that
//    are already connected get the item immediately, the rest are stored
//    in iNodeCandidatesToTryPush
//  - when an item is in progress, the action depends on iStageOfPublish
// iNowRunning guards against overlapping invocations.
void PublishingEngine::run() {
    if ( iNowRunning ) {
        // previous invocation has not finished yet
        return ;
    }
    iNowRunning = true ;
    if ( iNeedsToRun ) {
        if ( iWorkItem.iObjectHash == KNullHash ) {
            // no item in progress: ask the model for the next one
            iModel.lock() ;
            bool found = false ;
            iNodesSuccessfullyConnected.clear();
            iNodesFailurefullyConnected.clear();
            iWorkItem = iModel.nextItemToPublish(&found) ;
            if ( !found ) {
                iWorkItem.iObjectHash = KNullHash ;
            } else {
                LOG_STR2("PublishingEngine::run got item to pub %d ", iWorkItem.iObjectType) ;
                QList<Node *>* nodesToTry = NULL ;
                switch ( iWorkItem.iObjectType ) {
                case ClassifiedAd2NdAddr:
                case UserProfileComment:
                case DbRecord:
                    // user profile comments, ads with 2nd addr and
                    // db records get published to secondary address,
                    // not content address
                    nodesToTry    =
                        iModel.nodeModel().getNodesAfterHash(iWorkItem.i2NdAddr,
                                20, // 20 nodes
                                300 ) ;// at most 5 hours old
                    break ;
                default:
                    // all other content types get published to
                    // primary address, or addr of the object itself
                    nodesToTry    =
                        iModel.nodeModel().getNodesAfterHash(iWorkItem.iObjectHash,
                                20, // 20 nodes
                                300 ) ;// at most 5 hours old
                    break ;
                }

                if ( nodesToTry == NULL || nodesToTry->size() == 0 ) {
                    // uh, oh. do not try then.
                    iWorkItem.iObjectHash = KNullHash ;
                    LOG_STR("But got no nodes to try? ") ;
                } else {
                    emptyNodeCandidateList() ;
                    while ( ! nodesToTry->isEmpty() ) {
                        // nodes taken from the list are owned by us: they
                        // are either deleted right away or stored in
                        // iNodeCandidatesToTryPush
                        Node* connectCandidate ( nodesToTry->takeFirst() ) ;
                        if ( iModel.nodeModel().isNodeAlreadyConnected(*connectCandidate) ) {
                            publishToNode(connectCandidate->nodeFingerPrint()) ;
                            delete connectCandidate ;
                        } else {
                            iNodeCandidatesToTryPush.append(connectCandidate) ;
                        }
                    }
                    iStageOfPublish = InitialStage;
                }
                delete nodesToTry ;
            }
            iModel.unlock() ;
        }
        // ok, we've got something..?
        if ( iWorkItem.iObjectHash != KNullHash ) {
            switch ( iStageOfPublish ) {
            case InitialStage:
                // ok, we just started. first look how many our desired nodes
                // are already connected and do those:
                sendPublishItemToAlreadyConnectedNodes() ;
                iStageOfPublish = AwaitingConnection;
                askConnectionsForNodesOnPublishList() ;
                break ;
            case AwaitingConnection:
                checkForSuccessfullyConnectedNodes() ;
                checkForUnSuccessfullyConnectedNodes() ;
                // check if we managed to run out of nodes:
                if ( iNodeCandidatesToTryPush.isEmpty() ) {
                    // then pick up next publish-item and leave this behind
                    iWorkItem.iObjectHash = KNullHash ;
                    LOG_STR("PubEng: No more nodes to try ; picking up next item..") ;
                }
                break ;
            }
        }
    }
    // the guard must be released also when iNeedsToRun is false ; otherwise
    // one such call would leave iNowRunning set and every later invocation
    // would return at the guard above without doing anything.
    iNowRunning = false ;
}

// Deletes every Node held in iNodeCandidatesToTryPush and leaves the list
// empty. The remaining list length is logged before each removal.
void PublishingEngine::emptyNodeCandidateList() {
    for ( ; ! iNodeCandidatesToTryPush.isEmpty() ; ) {
        LOG_STR2("in emptyNodeCandidateList len of list is %d", (int)(iNodeCandidatesToTryPush.size())) ;
        // detach the head of the list and free it in one go
        delete iNodeCandidatesToTryPush.takeFirst() ;
    }
}

void 	  PublishingEngine::askConnectionsForNodesOnPublishList() {
    // this is run at publish begin: we have item, we have
    // connected nodes. publish happens automatically to
    // nodes that are connected but some nodes then remain..
    // here add the remaining nodes to network engines wishlist
    // so that it will attempt to connect:
    iModel.lock() ;
    for ( int i = 0 ; i < iNodeCandidatesToTryPush.size() ; i++ ) {
        Node* n = iNodeCandidatesToTryPush.at(i) ;
        // make copy of the node, because datamodel takes ownership:
        Node* wishListItem = Node::fromQVariant(n->asQVariant().toMap(), false) ;
        if ( wishListItem ) {
            iModel.nodeModel().addNodeToConnectionWishList(wishListItem) ;
        }
    }
    iModel.unlock() ;
}

// Receives the outcome of a connection attempt to node aHashOfAttemptedNode.
// When the node is one of our publish candidates its fingerprint is
// recorded (once) in iNodesSuccessfullyConnected if aStatus is
// Connection::Open, otherwise in iNodesFailurefullyConnected. Nothing else
// is done here ; run() later picks the hashes up from those lists.
void PublishingEngine::nodeConnectionAttemptStatus(Connection::ConnectionState aStatus,
        const Hash aHashOfAttemptedNode ) {
    LOG_STR2("PublishingEngine::nodeConnectionAttemptStatus %d in", aStatus) ;
    LOG_STR2("PublishingEngine::nodeConnectionAttemptStatus %s ", qPrintable(aHashOfAttemptedNode.toString())) ;
    // the model lock protects our own lists too
    iModel.lock() ;
    // first find out whether the node is on our candidate list at all
    bool isOurCandidate = false ;
    for ( int idx = 0 ;
            !isOurCandidate && ( idx < iNodeCandidatesToTryPush.size() ) ;
            ++idx ) {
        isOurCandidate =
            ( iNodeCandidatesToTryPush[idx]->nodeFingerPrint() == aHashOfAttemptedNode ) ;
    }
    if ( isOurCandidate ) {
        if ( aStatus == Connection::Open ) {
            // connection is open: remember as success, no duplicates
            if ( ! iNodesSuccessfullyConnected.contains(aHashOfAttemptedNode) ) {
                iNodesSuccessfullyConnected.append(aHashOfAttemptedNode) ;
            }
        } else {
            // any other state counts as failure, no duplicates
            if ( ! iNodesFailurefullyConnected.contains(aHashOfAttemptedNode) ) {
                iNodesFailurefullyConnected.append(aHashOfAttemptedNode) ;
            }
        }
    }
    iModel.unlock() ;
}

void PublishingEngine::sendPublishItemToAlreadyConnectedNodes() {
    iModel.lock() ;
    const QList <Connection *>& currentlyOpenConnections ( iModel.getConnections() ) ;
    for ( int i = iNodeCandidatesToTryPush.size()-1 ;
            ( i >=0 ) && ( iWorkItem.iObjectHash != KNullHash ) ;
            i-- ) {
        for ( int j = currentlyOpenConnections.size()-1 ;
                ( j >= 0 ) && ( iWorkItem.iObjectHash != KNullHash ) ;
                j-- ) {
            const Node* conn_n = currentlyOpenConnections[j]->node() ;
            if ( ( conn_n!= NULL ) &&
                    ( conn_n->nodeFingerPrint() ==
                      iNodeCandidatesToTryPush[i]->nodeFingerPrint() ) ) {
                // yes, a match.
                Node* n = iNodeCandidatesToTryPush[i] ;
                // here check if that node already got it:
                for ( int k = 0 ; k < iWorkItem.iAlreadyPushedHosts.size() ; k++ ) {
                    LOG_STR2("iWorkItem.iAlreadyPushedHosts contains %u",
                             iWorkItem.iAlreadyPushedHosts[k]) ;
                }
                if ( iWorkItem.iAlreadyPushedHosts.size() == 0) {
                    LOG_STR("iWorkItem.iAlreadyPushedHosts is empty") ;
                }
                if ( iWorkItem.iAlreadyPushedHosts.contains(n->nodeFingerPrint().iHash160bits[4])) {
                    LOG_STR("Node already had this content ; skipping in publish") ;
                    // and also remove from list:
                    iNodeCandidatesToTryPush.takeAt(i) ;
                    delete n ;
                    n = NULL ;
                } else {
                    // note that publishtonode takes node from iAlreadyPushedHosts
                    // and deletes it ; after the line above, do not try
                    // to reference it again ..
                    publishToNode(n->nodeFingerPrint()) ;
                    n = NULL ; // and now gone
                }
                break ; // out from loop of j (so that i won't get tried again because it was just deleted
            }
        }
    }
    iModel.unlock() ;
}

void PublishingEngine::publishToNode(const Hash& aNode) {
    if ( iWorkItem.iObjectHash != KNullHash ) {
        if ( iWorkItem.iAlreadyPushedHosts.contains(aNode.iHash160bits[4])) {
            LOG_STR("Node already had this content ; skipping in publish") ;
        } else {
            struct NetworkRequestExecutor::NetworkRequestQueueItem sendReq ;
            sendReq.iDestinationNode = aNode ;
            sendReq.iRequestedItem = iWorkItem.iObjectHash;
            sendReq.iMaxNumberOfItems = 1 ;
            sendReq.iBangPath = iWorkItem.iAlreadyPushedHosts ;
            sendReq.iState = NetworkRequestExecutor::NewRequest ;

            switch ( iWorkItem.iObjectType ) {
            case UserProfile:
                sendReq.iRequestType =  UserProfilePublish ;
                iModel.addNetworkRequest(sendReq) ;
                iWorkItem = iModel.addNodeToPublishedItem(iWorkItem.iObjectHash, aNode) ;
                LOG_STR2("Published profile to node %s", qPrintable(aNode.toString())) ;
                LOG_STR2("Published profile to node low-order bits %u", aNode.iHash160bits[4]) ;
                LOG_STR2("Now work-item type is %d",(int) iWorkItem.iObjectType) ;
                for ( int i = 0 ; i <iWorkItem.iAlreadyPushedHosts.size() ; i++ ) {
                    LOG_STR2("Already published to to node low-order bits %u", iWorkItem.iAlreadyPushedHosts[i]) ;
                }
                break ;
            case BinaryBlob:
                sendReq.iRequestType =  BinaryFilePublish ;
                iModel.addNetworkRequest(sendReq) ;
                iWorkItem = iModel.addNodeToPublishedItem(iWorkItem.iObjectHash, aNode) ;
                LOG_STR2("Published binary file to node %s", qPrintable(aNode.toString())) ;
                LOG_STR2("Published binary file to node low-order bits %u", aNode.iHash160bits[4]) ;
                LOG_STR2("Now work-item type is %d",(int) iWorkItem.iObjectType) ;
                for ( int i = 0 ; i <iWorkItem.iAlreadyPushedHosts.size() ; i++ ) {
                    LOG_STR2("Already published to to node low-order bits %u", iWorkItem.iAlreadyPushedHosts[i]) ;
                }
                break ;
            case UserProfileComment:
                sendReq.iRequestType =  ProfileCommentPublish ;
                iModel.addNetworkRequest(sendReq) ;
                iWorkItem = iModel.addNodeToPublishedItem(iWorkItem.iObjectHash, aNode) ;
                LOG_STR2("Published profile comment to node %s", qPrintable(aNode.toString())) ;
                LOG_STR2("Published profile comment to node low-order bits %u", aNode.iHash160bits[4]) ;
                LOG_STR2("Now work-item type is %d",(int) iWorkItem.iObjectType) ;
                for ( int i = 0 ; i <iWorkItem.iAlreadyPushedHosts.size() ; i++ ) {
                    LOG_STR2("Already published to to node low-order bits %u", iWorkItem.iAlreadyPushedHosts[i]) ;
                }
                break ;
            case ClassifiedAdPublish:
            case ClassifiedAd2NdAddr:
                sendReq.iRequestType = ClassifiedAdPublish ;
                iModel.addNetworkRequest(sendReq) ;
                iWorkItem = iModel.addNodeToPublishedItem(iWorkItem.iObjectHash, aNode) ;
                LOG_STR2("Published ca to node %s", qPrintable(aNode.toString())) ;
                LOG_STR2("Published ca to node low-order bits %u", aNode.iHash160bits[4]) ;
                LOG_STR2("Now work-item type is %d",(int) iWorkItem.iObjectType) ;
                for ( int i = 0 ; i <iWorkItem.iAlreadyPushedHosts.size() ; i++ ) {
                    LOG_STR2("Already published to to node low-order bits %u", iWorkItem.iAlreadyPushedHosts[i]) ;
                }
                break ;
            case PrivateMessage:
                sendReq.iRequestType = PrivateMessagePublish ;
                iModel.addNetworkRequest(sendReq) ;
                iWorkItem = iModel.addNodeToPublishedItem(iWorkItem.iObjectHash, aNode) ;
                LOG_STR2("Published private msg to node %s", qPrintable(aNode.toString())) ;
                LOG_STR2("Published private msg to node low-order bits %u", aNode.iHash160bits[4]) ;
                LOG_STR2("Now work-item type is %d",(int) iWorkItem.iObjectType) ;
                for ( int i = 0 ; i <iWorkItem.iAlreadyPushedHosts.size() ; i++ ) {
                    LOG_STR2("Already published to to node low-order bits %u", iWorkItem.iAlreadyPushedHosts[i]) ;
                }
                break ;
            case DbRecord:
                sendReq.iRequestType = DbRecordPublish ;
                iModel.addNetworkRequest(sendReq) ;
                iWorkItem = iModel.addNodeToPublishedItem(iWorkItem.iObjectHash, aNode) ;
                LOG_STR2("Published db record to node %s", qPrintable(aNode.toString())) ;
                LOG_STR2("Published db record to node low-order bits %u", aNode.iHash160bits[4]) ;
                LOG_STR2("Now work-item type is %d",(int) iWorkItem.iObjectType) ;
                for ( int i = 0 ; i <iWorkItem.iAlreadyPushedHosts.size() ; i++ ) {
                    LOG_STR2("Already published to to node low-order bits %u", iWorkItem.iAlreadyPushedHosts[i]) ;
                }
                break ;
            default:
                LOG_STR2("TODO: unimplemented publish of type %d", iWorkItem.iObjectType) ;
                break ;
            }
        }
        for ( int i = iNodeCandidatesToTryPush.size()-1 ; i >= 0 ; i-- ) {
            if ( aNode == iNodeCandidatesToTryPush[i]->nodeFingerPrint() ) {
                Node* n = iNodeCandidatesToTryPush.takeAt(i) ;
                delete n ;
                break ;
            }
        }
        if ( iNodesFailurefullyConnected.contains ( aNode ) ) {
            iNodesFailurefullyConnected.removeOne(aNode) ;
        }
        if ( iNodesSuccessfullyConnected.contains ( aNode ) ) {
            iNodesSuccessfullyConnected.removeOne(aNode) ;
        }
    }
}
//
// this method is for periodically checking if any nodes we've
// asked to be connected to, has actually been connected.
// if such miracle has taken place, send the stuff and remove
// node from list-of-nodes-to-ask (iNodeCandidatesToTryPush)
//
// Goes through the candidate list from end to beginning and publishes the
// work item to each candidate whose fingerprint is found in
// iNodesSuccessfullyConnected. Stops early if the work item hash becomes
// KNullHash. The success list is emptied before returning ; the model is
// locked for the duration.
void PublishingEngine::checkForSuccessfullyConnectedNodes() {
    iModel.lock() ;
    int candidateIdx = iNodeCandidatesToTryPush.size() ;
    while ( ( --candidateIdx >= 0 ) && ( iWorkItem.iObjectHash != KNullHash ) ) {
        if ( ! iNodesSuccessfullyConnected.contains(iNodeCandidatesToTryPush[candidateIdx]->nodeFingerPrint()) ) {
            continue ; // no successful connection reported for this one
        }
        // connection was asked for and it has been opened
        LOG_STR2("PubEng: After asking, node has been connected: %s", qPrintable(iNodeCandidatesToTryPush[candidateIdx]->nodeFingerPrint().toString())) ;
        // publishToNode() also removes the node from the candidate list
        publishToNode(iNodeCandidatesToTryPush[candidateIdx]->nodeFingerPrint()) ;
    }
    // everything of interest on the list was handled in the loop above,
    // so the list can be emptied on every call
    iNodesSuccessfullyConnected.clear() ;
    iModel.unlock() ;
}

//
// this is for pruning from ask-list those nodes that we've tried
// to connect with no success
//
// Goes through the candidate list from end to beginning and removes (and
// deletes) each candidate whose fingerprint is found in
// iNodesFailurefullyConnected. Stops early if the work item hash is
// KNullHash. The failure list is emptied before returning ; the model is
// locked for the duration.
void PublishingEngine::checkForUnSuccessfullyConnectedNodes() {
    iModel.lock() ;
    int candidateIdx = iNodeCandidatesToTryPush.size() ;
    while ( ( --candidateIdx >= 0 ) && ( iWorkItem.iObjectHash != KNullHash ) ) {
        if ( ! iNodesFailurefullyConnected.contains(iNodeCandidatesToTryPush[candidateIdx]->nodeFingerPrint()) ) {
            continue ; // no failed attempt reported for this one
        }
        // connection was asked for but the attempt did not succeed
        LOG_STR2("PubEng: After asking, node has not been connected: %s", qPrintable(iNodeCandidatesToTryPush[candidateIdx]->nodeFingerPrint().toString())) ;
        delete iNodeCandidatesToTryPush.takeAt(candidateIdx) ;
    }
    // everything of interest on the list was handled in the loop above,
    // so the list can be emptied on every call
    iNodesFailurefullyConnected.clear() ;
    iModel.unlock() ;
}