File: message_queue_test.cpp

package info (click to toggle)
boost1.35 1.35.0-5
  • links: PTS
  • area: main
  • in suites: lenny
  • size: 203,856 kB
  • ctags: 337,867
  • sloc: cpp: 938,683; xml: 56,847; ansic: 41,589; python: 18,999; sh: 11,566; makefile: 664; perl: 494; yacc: 456; asm: 353; csh: 6
file content (264 lines) | stat: -rw-r--r-- 8,243 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Ion Gaztanaga 2004-2007. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// See http://www.boost.org/libs/interprocess for documentation.
//
//////////////////////////////////////////////////////////////////////////////

#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/managed_external_buffer.hpp>
#include <boost/interprocess/managed_heap_memory.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/set.hpp>
#include <boost/interprocess/allocators/node_allocator.hpp>
#include <vector>
#include <cstddef>
#include <limits>
#include <boost/thread.hpp>
#include <memory>
#include <string>
#include "get_process_id_name.hpp"

#ifdef max
#undef max
#endif

////////////////////////////////////////////////////////////////////////////////
//                                                                            //
//  This example tests the process shared message queue.                      //
//                                                                            //
////////////////////////////////////////////////////////////////////////////////

using namespace boost::interprocess;

//This test inserts messages with different priorities and marks them with a
//time-stamp to check that the receiver obtains the highest priority messages
//first and that messages with the same priority are received in FIFO order.
//
//Returns true if every message arrived with the expected size and in the
//expected order, false otherwise. The named queue is removed before
//returning on both outcomes.
bool test_priority_order()
{
   //Number of messages exchanged and number of distinct priorities (0-9).
   //The queue is created with room for NumMessages because all of them are
   //sent before the first one is received.
   const std::size_t  NumMessages   = 100;
   const unsigned int NumPriorities = 10;

   //Overall verdict. A flag is used instead of early returns so that the
   //queue clean-up at the end of the function runs on failure too.
   bool ordered = true;

   message_queue::remove(test::get_process_id_name());
   {
      //mq2 is a second object created with the same name; it is not used
      //for any transfer (presumably it exercises the "open" half of
      //open_or_create)
      message_queue mq1
         (open_or_create, test::get_process_id_name(), NumMessages, sizeof(std::size_t)),
         mq2
         (open_or_create, test::get_process_id_name(), NumMessages, sizeof(std::size_t));

      std::size_t  recvd    = 0;
      unsigned int priority = 0;
      std::size_t  tstamp   = 0;

      //Send NumMessages messages with priorities cycling through 0-9.
      //Each message carries its sequence number as a timestamp.
      for(std::size_t i = 0; i < NumMessages; ++i){
         tstamp = i;
         mq1.send(&tstamp, sizeof(tstamp), static_cast<unsigned int>(i % NumPriorities));
      }

      //Start above any real priority so the first message always passes
      unsigned int priority_prev = std::numeric_limits<unsigned int>::max();
      std::size_t  tstamp_prev   = 0;

      //Receive all messages and test that they are ordered by priority
      //and, within the same priority, by FIFO. Stop at the first violation.
      for(std::size_t i = 0; ordered && i < NumMessages; ++i){
         mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
         if(recvd != sizeof(tstamp)){
            //A truncated or oversized message means tstamp is not reliable
            ordered = false;
         }
         else if(priority > priority_prev){
            //A higher priority message was overtaken by a lower one
            ordered = false;
         }
         else if(priority == priority_prev && tstamp <= tstamp_prev){
            //Same priority but not in sending order
            ordered = false;
         }
         priority_prev = priority;
         tstamp_prev   = tstamp;
      }
   }
   message_queue::remove(test::get_process_id_name());
   return ordered;
}

//[message_queue_test_test_serialize_db
//This test creates a in memory data-base using Interprocess machinery and 
//serializes it through a message queue. Then rebuilds the data-base in 
//another buffer and checks it against the original data-base
bool test_serialize_db()
{
   //Typedef data to create a Interprocess map   
   typedef std::pair<const std::size_t, std::size_t> MyPair;
   typedef std::less<std::size_t>   MyLess;
   typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
      node_allocator_t;
   typedef map<std::size_t, 
               std::size_t, 
               std::less<std::size_t>, 
               node_allocator_t>
               MyMap;

   //Some constants
   const std::size_t BufferSize  = 65536;
   const std::size_t MaxMsgSize  = 100;

   //Allocate a memory buffer to hold the destiny database using vector<char>
   std::vector<char> buffer_destiny(BufferSize, 0);

   message_queue::remove(test::get_process_id_name());
   {
      //Create the message-queues
      message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);

      //Open previously created message-queue simulating other process
      message_queue mq2(open_only, test::get_process_id_name());

      //A managed heap memory to create the origin database
      managed_heap_memory db_origin(buffer_destiny.size());

      //Construct the map in the first buffer
      MyMap *map1 = db_origin.construct<MyMap>("MyMap")
                                       (MyLess(), 
                                       db_origin.get_segment_manager());
      if(!map1)
         return false;

      //Fill map1 until is full 
      try{
         std::size_t i = 0;
         while(1){
            (*map1)[i] = i;
            ++i;
         }
      }
      catch(boost::interprocess::bad_alloc &){}

      //Data control data sending through the message queue
      std::size_t sent = 0;
      std::size_t recvd = 0;
      std::size_t total_recvd = 0;
      unsigned int priority;

      //Send whole first buffer through the mq1, read it 
      //through mq2 to the second buffer
      while(1){
         //Send a fragment of buffer1 through mq1
         std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ? 
                                       MaxMsgSize : (db_origin.get_size() - sent);
         mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
               , bytes_to_send
               , 0);
         sent += bytes_to_send;
         //Receive the fragment through mq2 to buffer_destiny
         mq2.receive( &buffer_destiny[total_recvd]
                  , BufferSize - recvd
                  , recvd
                  , priority);
         total_recvd += recvd;

         //Check if we have received all the buffer
         if(total_recvd == BufferSize){
            break;
         }
      }
      
      //The buffer will contain a copy of the original database 
      //so let's interpret the buffer with managed_external_buffer
      managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);

      //Let's find the map
      std::pair<MyMap *, std::size_t> ret = db_destiny.find<MyMap>("MyMap");
      MyMap *map2 = ret.first;

      //Check if we have found it
      if(!map2){
         return false;
      }

      //Check if it is a single variable (not an array)
      if(ret.second != 1){
         return false;
      }

      //Now let's compare size
      if(map1->size() != map2->size()){
         return false;
      }

      //Now let's compare all db values
      for(std::size_t i = 0, num_elements = map1->size(); i < num_elements; ++i){
         if((*map1)[i] != (*map2)[i]){
            return false;
         }
      }
      
      //Destroy maps from db-s
      db_origin.destroy_ptr(map1);
      db_destiny.destroy_ptr(map2);
   }
   message_queue::remove(test::get_process_id_name());
   return true;
}
//]
//Size in bytes of every message exchanged by the buffer overflow test
static const int MsgSize = 10;
//Number of messages the sender sends and the receiver thread receives
static const int NumMsg  = 1000;
//Send and receive buffers. They are sized from MsgSize (instead of repeating
//the literal) so they always hold exactly one message of the size that is
//passed to send/receive.
static char msgsend [MsgSize];
static char msgrecv [MsgSize];


//Queue used by the buffer overflow test: assigned in test_buffer_overflow()
//and read by receiver(), which runs in a separate thread
static boost::interprocess::message_queue *pmessage_queue = 0;

void receiver()
{
   std::size_t recvd_size;
   unsigned int priority;
   int nummsg = NumMsg;

   while(nummsg--){
      pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
   }
}

bool test_buffer_overflow()
{
   boost::interprocess::message_queue::remove(test::get_process_id_name());
   {
      std::auto_ptr<boost::interprocess::message_queue>
         ptr(new boost::interprocess::message_queue
               (create_only, test::get_process_id_name(), 10, 10));
      pmessage_queue = ptr.get();

      //Launch the receiver thread
      boost::thread thread(&receiver);
      boost::thread::yield();

      int nummsg = NumMsg;

      while(nummsg--){
         pmessage_queue->send(msgsend, MsgSize, 0);
      }

      thread.join();
   }
   boost::interprocess::message_queue::remove(test::get_process_id_name());
   return true;
}

//Runs the three message queue tests in order and stops at the first one
//that fails. Returns 0 if all of them pass, 1 otherwise.
int main ()
{
   const bool all_passed = test_priority_order()
                        && test_serialize_db()
                        && test_buffer_overflow();

   return all_passed ? 0 : 1;
}

#include <boost/interprocess/detail/config_end.hpp>