1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
|
// ============================================================================
/**
* @file Proactor_File_Test.cpp
*
* This program illustrates how the ACE_Proactor can be used to
* implement an application that does asynchronous file IO
* operations.
*
* @author Martin Corino <mcorino@remedy.nl>
*/
// ============================================================================
#include "test_config.h"
#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
// This only works on Win32 platforms and on Unix platforms
// supporting POSIX aio calls.
//////////////////////////////////////////////////////////////////
// This sample application integrates asynch file
// read/write operations with the Proactor, using an ACE_FILE_Connector,
// ACE_FILE_IO, ACE_Asynch_Read_File, ACE_Asynch_Write_File in an ACE_Handler.
// The program sets up asynch read and write on a temporary file, and sends
// out 16 16-character bursts of data at timed intervals.
#include <stdio.h>
#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_unistd.h"
#include "ace/OS_NS_errno.h"
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/FILE_Connector.h"
#include "ace/FILE_IO.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/Time_Value.h"
// How long are our fake serial I/O frames?
#define FILE_FRAME_SIZE 16
class FileIOHandler : public ACE_Handler
{
public:
FileIOHandler ();
~FileIOHandler () override;
int
Connect();
// This method will be called when an asynchronous read
// completes on a file.
void
handle_read_file(const ACE_Asynch_Read_File::Result &result) override;
// This method will be called when an asynchronous write
// completes on a file.
void
handle_write_file(const ACE_Asynch_Write_File::Result &result) override;
// Callback hook invoked by the Proactor's Timer_Queue.
void
handle_time_out(const ACE_Time_Value &tv, const void *arg) override;
// Our I/O objects; they're public so others can access them
ACE_Asynch_Read_File reader_;
ACE_Asynch_Write_File writer_;
private:
int block_count_;
#if defined (ACE_WIN32)
bool read_pending_;
#endif
ACE_FILE_IO peer_;
ACE_FILE_Connector connector_;
};
FileIOHandler::FileIOHandler ()
: ACE_Handler ()
, block_count_ (0)
#if defined (ACE_WIN32)
, read_pending_ (false)
#endif
{
}
FileIOHandler::~FileIOHandler ()
{
ACE_FILE_Addr tmp_addr;
peer_.get_local_addr (tmp_addr);
if (tmp_addr.get_path_name ())
{
peer_.remove ();
}
}
//***************************************************************************
//
// Method: Connect
//
// Description: Establishes connection, primes read process
//
// Inputs: port name, port parameter block
//
// Returns: none
//
//***************************************************************************
int FileIOHandler::Connect()
{
int result = 0;
// create an empty temporary file for the test
if(connector_.connect(peer_,
ACE_sap_any_cast (ACE_FILE_Addr &)) != 0)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"),
ACE_TEXT("FileIOHandler connect failed to create file")));
result = -1;
}
// close opened file but leave it where it is
if (peer_.close () != 0)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"),
ACE_TEXT("FileIOHandler connect failed to close file")));
peer_.remove ();
result = -1;
}
// get file address
ACE_FILE_Addr tmp_addr;
peer_.get_local_addr (tmp_addr);
// reopen new file for asynch IO
if(connector_.connect(peer_,
tmp_addr,
0, //timeout
ACE_Addr::sap_any,
0, //reuse
O_RDWR |FILE_FLAG_OVERLAPPED) != 0)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"),
ACE_TEXT("FileIOHandler connect failed to open file")));
peer_.remove ();
result = -1;
}
else // device connected successfully
{
// keep track of our writes for offset calculations (can't use O_APPEND since
// this is not supported for the Win32_Asynch implementation) and data verifications
this->block_count_ = 0; // start counting
// Set our I/O handle to that of the peer_ object handling our connection
handle(peer_.get_handle());
if (writer_.open(*this) != 0 || reader_.open(*this) != 0)
{
ACE_ERROR(
(LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("FileIOHandler reader or writer open failed")));
result = -1;
}
else // reader and writer opened successfully
{
// Allocate a new message block and initiate a read operation on it
// to prime the asynchronous read pipeline
// The message block is sized for the largest message we expect
ACE_Message_Block *mb;
ACE_NEW_NORETURN(mb, ACE_Message_Block(FILE_FRAME_SIZE));
if (reader_.read(*mb, mb->space()) != 0)
{
int errnr = ACE_OS::last_error ();
ACE_DEBUG(
(LM_INFO, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler begin read failed"), errnr));
mb->release();
#if defined (ACE_WIN32)
// On older Win32 versions (WinXP, Win2003/2008) asynch IO with disk files is not
// reliable and may perform sync IO in certain cases like when the read offset denotes
// current end of file. Instead of scheduling a write operation the read will immediately
// return with an EOF error.
// We circumvent that situation here by not reporting an error and scheduling a read operation
// later when we are sure data has been written at the offset in question (after the write finishes).
if (errnr != ERROR_HANDLE_EOF)
#endif
result = -1;
}
#if defined (ACE_WIN32)
else
{
this->read_pending_ = true;
}
#endif
// If read worked, psMsg is now controlled by Proactor framework.
}
}
return result;
}
//***************************************************************************
//
// Method: handle_read_file
//
// Description: Callback used when a read completes
//
// Inputs: read file result structure containing message block
//
// Returns: none
//
//***************************************************************************
void
FileIOHandler::handle_read_file(const ACE_Asynch_Read_File::Result &result)
{
ACE_Message_Block &mb = result.message_block();
// If the read failed, queue up another one using the same message block
if (!result.success() || result.bytes_transferred() == 0)
{
//ACE_DEBUG((LM_INFO, ACE_TEXT("FileIOHandler receive timeout.\n")));
reader_.read(mb,
mb.space(),
result.offset () + result.bytes_transferred ());
}
else
{
// We have a message block with some read data in it. Send it onward
ACE_DEBUG((LM_INFO, ACE_TEXT("FileIOHandler received %d bytes of data at offset %d\n"),
result.bytes_transferred(), result.offset ()));
// TODO: Process this data in some meaningful way
if (result.offset () != (unsigned long)*reinterpret_cast<unsigned char*> (mb.rd_ptr ()))
{
ACE_DEBUG((LM_ERROR, ACE_TEXT("FileIOHandler received incorrect data: got [%u] expected [%u]\n"),
*reinterpret_cast<unsigned char*> (mb.rd_ptr ()), result.offset ()));
}
// Release the message block when we're done with it
mb.release();
if ((result.offset () + result.bytes_transferred ()) < 256)
{
// Our processing is done; prime the read process again
ACE_Message_Block *new_mb;
ACE_NEW_NORETURN(new_mb, ACE_Message_Block(FILE_FRAME_SIZE));
if (reader_.read(*new_mb, new_mb->space(),
result.offset () + result.bytes_transferred ()) != 0)
{
int errnr = ACE_OS::last_error ();
ACE_DEBUG(
(LM_INFO, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler continuing read failed"), errnr));
new_mb->release();
#if defined (ACE_WIN32)
this->read_pending_ = false;
}
else
{
this->read_pending_ = true;
#endif
}
}
else
{
// we have it all; stop the proactor
ACE_Proactor::instance ()->proactor_end_event_loop ();
}
}
}
//***************************************************************************
//
// Method: handle_write_file
//
// Description: Callback used when a write completes
//
// Inputs: write file result structure containing message block
//
// Returns: none
//
//***************************************************************************
void
FileIOHandler::handle_write_file(const ACE_Asynch_Write_File::Result &result)
{
ACE_DEBUG((LM_INFO, ACE_TEXT("Finished write\n")));
// When the write completes, we get the message block. It's been sent,
// so we just deallocate it.
result.message_block().release();
#if defined (ACE_WIN32)
// to circumvent problems on older Win32 (see above) we schedule a read here if none
// is pending yet.
if (!this->read_pending_)
{
ACE_Message_Block *mb;
ACE_NEW_NORETURN(mb, ACE_Message_Block(FILE_FRAME_SIZE));
if (reader_.read(*mb, mb->space(),
(this->block_count_ - 1) * FILE_FRAME_SIZE) != 0)
{
int errnr = ACE_OS::last_error ();
ACE_DEBUG(
(LM_INFO, ACE_TEXT("%p [%d]\n"), ACE_TEXT("FileIOHandler read after write failed"), errnr));
mb->release();
}
else
{
this->read_pending_ = true;
}
}
#endif
}
//***************************************************************************
//
// Method: handle_time_out
//
// Description: Hook method called when a timer expires
//
// Inputs: time value, completion token passed to timer at scheduling
// The token tells us which timer we're handling
//
// Returns: none
//
//***************************************************************************
void
FileIOHandler::handle_time_out(const ACE_Time_Value & /*tv*/, const void * /*act*/)
{
// do not schedule more than 16 writes
if (this->block_count_ < 16)
{
// In our example, we send a bunch of data every time the timer expires
// setup the next payload
char payload[FILE_FRAME_SIZE];
for (int i=0; i<FILE_FRAME_SIZE ;++i)
{
payload[i] = (this->block_count_ * FILE_FRAME_SIZE) + i;
}
ACE_Message_Block *new_mb;
ACE_NEW_NORETURN(new_mb, ACE_Message_Block(FILE_FRAME_SIZE));
new_mb->copy(payload, FILE_FRAME_SIZE);
// queue up a write (append to end of file) operation, give visual feedback on success or failure.
if (this->writer_.write(*new_mb, new_mb->length(),
this->block_count_ * FILE_FRAME_SIZE) == 0)
{
ACE_DEBUG((LM_INFO, ACE_TEXT("Successfully queued write of %d bytes\n"), new_mb->length ())); // success
this->block_count_ ++; // next block
}
else
{
ACE_DEBUG((LM_ERROR, ACE_TEXT("FAILED to queue write operation\n"))); // failure
};
}
}
int
run_main(int /*argc*/, ACE_TCHAR * /*argv*/[])
{
ACE_START_TEST (ACE_TEXT ("Proactor_File_Test"));
int rc = 0;
FileIOHandler fileIOHandler;
// Initialize the serial port handler
if (0 != fileIOHandler.Connect())
{
rc = 1;
}
else
{
ACE_DEBUG((LM_INFO, ACE_TEXT(" File I/O Handler connected.\n")));
// start the repeating timer for data transmission
ACE_Time_Value repeatTime(0, 50000); // 0.05 second time interval
ACE_Proactor::instance()->schedule_repeating_timer(fileIOHandler,
(void *) (100),
repeatTime);
// Run the Proactor
ACE_Proactor::instance()->proactor_run_event_loop();
}
ACE_END_TEST;
return rc;
}
#else
int
run_main (int, ACE_TCHAR *[])
{
ACE_START_TEST (ACE_TEXT ("Proactor_File_Test"));
ACE_DEBUG ((LM_INFO,
ACE_TEXT ("Asynchronous IO is unsupported.\n")
ACE_TEXT ("Proactor_File_Test will not be run.\n")));
ACE_END_TEST;
return 0;
}
#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */
|