1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
|
#if __APPLE__ || __FreeBSD__
/// Stub entry point for platforms where this benchmark is compiled out: does nothing and reports success.
int main(int /*argc*/, char ** /*argv*/)
{
    return 0;
}
#else
#include <fcntl.h>
#include <port/unistd.h>
#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <iomanip>
#include <vector>
#include <Poco/Exception.h>
#include <Common/Exception.h>
#include <common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadHelpers.h>
#include <stdlib.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <IO/AIO.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
#include <sys/syscall.h>
/// Error codes used by this program. They are only declared here (extern);
/// the definitions are expected to come from elsewhere in the project.
namespace DB
{
    namespace ErrorCodes
    {
        /// Passed to throwFromErrno when open() fails.
        extern const int CANNOT_OPEN_FILE;
        /// Passed to throwFromErrno when close() fails.
        extern const int CANNOT_CLOSE_FILE;
        /// Passed to throwFromErrno when io_submit() fails.
        extern const int CANNOT_IO_SUBMIT;
        /// Passed to throwFromErrno when io_getevents() fails.
        extern const int CANNOT_IO_GETEVENTS;
    }
}
/// Direction of the I/O requests issued by the benchmark.
/// Kept as a plain (unscoped) enum because the values are stored in and passed around as `int`.
enum Mode
{
    /// Requests are submitted with IOCB_CMD_PREAD; the file is opened O_RDONLY.
    MODE_READ = 1,

    /// Requests are submitted with IOCB_CMD_PWRITE; the file is opened O_WRONLY.
    MODE_WRITE = 2
};
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count)
{
using namespace DB;
AIOContext ctx;
std::vector<Memory> buffers(buffers_count);
for (size_t i = 0; i < buffers_count; ++i)
buffers[i] = Memory(block_size, sysconf(_SC_PAGESIZE));
drand48_data rand_data;
timespec times;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, ×);
srand48_r(times.tv_nsec, &rand_data);
size_t in_progress = 0;
size_t blocks_sent = 0;
std::vector<bool> buffer_used(buffers_count, false);
std::vector<iocb> iocbs(buffers_count);
std::vector<iocb*> query_cbs;
std::vector<io_event> events(buffers_count);
while (blocks_sent < count || in_progress > 0)
{
/// Prepare queries.
query_cbs.clear();
for (size_t i = 0; i < buffers_count; ++i)
{
if (blocks_sent >= count || in_progress >= buffers_count)
break;
if (buffer_used[i])
continue;
buffer_used[i] = true;
++blocks_sent;
++in_progress;
char * buf = buffers[i].data();
long rand_result1 = 0;
long rand_result2 = 0;
long rand_result3 = 0;
lrand48_r(&rand_data, &rand_result1);
lrand48_r(&rand_data, &rand_result2);
lrand48_r(&rand_data, &rand_result3);
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
iocb & cb = iocbs[i];
memset(&cb, 0, sizeof(cb));
cb.aio_buf = reinterpret_cast<UInt64>(buf);
cb.aio_fildes = fd;
cb.aio_nbytes = block_size;
cb.aio_offset = offset;
cb.aio_data = static_cast<UInt64>(i);
if (mode == MODE_READ)
{
cb.aio_lio_opcode = IOCB_CMD_PREAD;
}
else
{
cb.aio_lio_opcode = IOCB_CMD_PWRITE;
}
query_cbs.push_back(&cb);
}
/// Send queries.
if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0)
throwFromErrno("io_submit failed", ErrorCodes::CANNOT_IO_SUBMIT);
/// Receive answers. If we have something else to send, then receive at least one answer (after that send them), otherwise wait all answers.
memset(&events[0], 0, buffers_count * sizeof(events[0]));
int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr);
if (evs < 0)
throwFromErrno("io_getevents failed", ErrorCodes::CANNOT_IO_GETEVENTS);
for (int i = 0; i < evs; ++i)
{
int b = static_cast<int>(events[i].data);
if (events[i].res != static_cast<int>(block_size))
throw Poco::Exception("read/write error");
--in_progress;
buffer_used[b] = false;
}
}
}
int mainImpl(int argc, char ** argv)
{
using namespace DB;
const char * file_name = 0;
int mode = MODE_READ;
UInt64 min_offset = 0;
UInt64 max_offset = 0;
UInt64 block_size = 0;
UInt64 buffers_count = 0;
UInt64 threads_count = 0;
UInt64 count = 0;
if (argc != 9)
{
std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl;
return 1;
}
file_name = argv[1];
if (argv[2][0] == 'w')
mode = MODE_WRITE;
min_offset = parse<UInt64>(argv[3]);
max_offset = parse<UInt64>(argv[4]);
block_size = parse<UInt64>(argv[5]);
threads_count = parse<UInt64>(argv[6]);
buffers_count = parse<UInt64>(argv[7]);
count = parse<UInt64>(argv[8]);
int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT);
if (-1 == fd)
throwFromErrno("Cannot open file", ErrorCodes::CANNOT_OPEN_FILE);
ThreadPool pool(threads_count);
Stopwatch watch;
for (size_t i = 0; i < threads_count; ++i)
pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count));
pool.wait();
watch.stop();
if (0 != close(fd))
throwFromErrno("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
std::cout << std::fixed << std::setprecision(2)
<< "Done " << count << " * " << threads_count << " ops";
std::cout << " in " << watch.elapsedSeconds() << " sec."
<< ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec."
<< ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
<< std::endl;
return 0;
}
int main(int argc, char ** argv)
{
try
{
return mainImpl(argc, argv);
}
catch (const Poco::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
}
#endif
|