#include "stdafx.h"
#include "LinuxIO.h"
#ifdef LINUX_IO_URING
#include <sys/mman.h>
#include <sys/syscall.h>
namespace os {

	// Define the system calls that usually don't have wrappers (they are provided by liburing,
	// which we don't want to depend on). Luckily, the data structures and system call numbers
	// are at least defined in the system headers! Parameter names are taken from the liburing
	// manpages.
	int io_uring_setup(unsigned entries, struct io_uring_params *p) {
		return (int)syscall(__NR_io_uring_setup, entries, p);
	}

	int io_uring_register(int fd, unsigned opcode, void *arg, unsigned nr_args) {
		return (int)syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
	}

	int io_uring_enter(int ring_fd, unsigned to_submit, unsigned min_complete, unsigned flags) {
		return (int)syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete, flags, NULL, 0);
	}
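
	// For reference: liburing wraps roughly the same functionality as io_uring_queue_init,
	// io_uring_register_eventfd, and io_uring_submit.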

	LinuxIO::LinuxIO() {
		struct io_uring_params params;
		memset(&params, 0, sizeof(params));

		// Specify the size of the completion queue. Since we call 'io_uring_enter' after each
		// submission, we don't need many entries in the submission queue at all.
		const unsigned submission_size = 2;
		params.flags = IORING_SETUP_CQSIZE;
		params.cq_entries = LINUX_IO_URING_QUEUE;
		// TODO? We could use IORING_SETUP_ATTACH_WQ to share the same kernel thread pool between
		// all threads in Storm. I am not sure whether that is desirable.

		uringfd = io_uring_setup(submission_size, &params);
		if (uringfd < 0) {
			perror("io_uring_setup");
			assert(false);
		}

		submission.size = params.sq_off.array + params.sq_entries * sizeof(uint32_t);
		submissionArray.size = params.sq_entries * sizeof(struct io_uring_sqe);
		completion.size = params.cq_off.cqes + params.cq_entries * sizeof(struct io_uring_cqe);

		if (params.features & IORING_FEAT_SINGLE_MMAP) {
			// The kernel supports mapping both rings through a single mmap call.
			submission.size = max(submission.size, completion.size);
			submission.map(uringfd, IORING_OFF_SQ_RING);
			// This makes 'completion' not munmap the allocation from 'submission'.
			completion.size = 0;
			completion.base = submission.base;
		} else {
			submission.map(uringfd, IORING_OFF_SQ_RING);
			completion.map(uringfd, IORING_OFF_CQ_RING);
		}
		submissionArray.map(uringfd, IORING_OFF_SQES);

		submission.init(params.sq_off);
		completion.init(params.cq_off);
	}
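
	/* Rough layout of the mappings set up above (all offsets come from 'params'):
	 *
	 *   IORING_OFF_SQ_RING: head, tail, ring_mask, ... , array of sq_entries uint32_t indices
	 *   IORING_OFF_SQES:    sq_entries contiguous 'struct io_uring_sqe'
	 *   IORING_OFF_CQ_RING: head, tail, ring_mask, ... , cq_entries contiguous 'struct io_uring_cqe'
	 *
	 * The SQ indirection (indices into the sqe array) is why 'submit' below stores both the sqe
	 * itself and its index. With IORING_FEAT_SINGLE_MMAP, the two rings share one mapping.
	 */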

	LinuxIO::~LinuxIO() {
		close(uringfd);
	}

	void LinuxIO::attachEventfd(int fd) {
		int result = io_uring_register(uringfd, IORING_REGISTER_EVENTFD, &fd, 1);
		if (result != 0) {
			perror("io_uring_register");
			assert(false);
		}
	}
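
	/* The registered eventfd becomes readable whenever completions arrive, so it can sit in an
	 * epoll/poll set alongside other file descriptors. A hypothetical caller might do:
	 *
	 *   int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);  // from <sys/eventfd.h>
	 *   io.attachEventfd(efd);
	 *   // ...when 'efd' becomes readable: drain it, then call 'get' until it returns false.
	 */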

	void LinuxIO::submit(const struct io_uring_sqe &job) {
		// Note: We don't actually need to check 'head', since 'io_uring_enter' guarantees that
		// elements from the submission queue are consumed when it returns.
		uint32_t tail = atomicRead(*submission.tail);
		uint32_t index = tail & *submission.ringMask;
		submissionArray[index] = job;
		// I don't think we necessarily need to have this identity-mapped, but it is convenient.
		submission.sqes[index] = index;
		atomicWrite(*submission.tail, tail + 1);
		io_uring_enter(uringfd, 1, 0, 0);
	}

	bool LinuxIO::get(struct io_uring_cqe &out) {
		uint32_t head = atomicRead(*completion.head);
		uint32_t tail = atomicRead(*completion.tail);

		// Empty?
		if (head == tail)
			return false;

		out = completion.cqes[head & *completion.ringMask];
		atomicWrite(*completion.head, head + 1);
		return true;
	}
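
	/* Ordering note: the io_uring ABI requires the CQ tail to be read with acquire semantics (so
	 * the cqe contents written by the kernel are visible before the copy above), and the CQ head
	 * to be written with release semantics (so the kernel only reuses the slot after we have
	 * read it). This assumes 'atomicRead'/'atomicWrite' provide at least those guarantees.
	 */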

	LinuxIO::Memory::Memory() : base(null), size(0) {}

	LinuxIO::Memory::~Memory() {
		if (base != null && size > 0)
			munmap(base, size);
		base = null;
	}

	void LinuxIO::Memory::map(int fd, size_t offset) {
		base = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, offset);
		// Note: 'mmap' signals failure with MAP_FAILED, not with a null pointer.
		if (base == MAP_FAILED) {
			perror("mmap");
			assert(false);
			base = null; // Keep the destructor from passing MAP_FAILED to munmap.
		}
	}

	// Compute pointers to the kernel-shared control variables inside the mapped SQ ring.
	void LinuxIO::SubmissionQueue::init(struct io_sqring_offsets &offsets) {
		byte *b = reinterpret_cast<byte *>(base);
		head = reinterpret_cast<uint32_t *>(b + offsets.head);
		tail = reinterpret_cast<uint32_t *>(b + offsets.tail);
		ringMask = reinterpret_cast<uint32_t *>(b + offsets.ring_mask);
		ringEntries = reinterpret_cast<uint32_t *>(b + offsets.ring_entries);
		flags = reinterpret_cast<uint32_t *>(b + offsets.flags);
		sqes = reinterpret_cast<uint32_t *>(b + offsets.array);
	}

	// Compute pointers to the kernel-shared control variables inside the mapped CQ ring.
	void LinuxIO::CompletionQueue::init(struct io_cqring_offsets &offsets) {
		byte *b = reinterpret_cast<byte *>(base);
		head = reinterpret_cast<uint32_t *>(b + offsets.head);
		tail = reinterpret_cast<uint32_t *>(b + offsets.tail);
		ringMask = reinterpret_cast<uint32_t *>(b + offsets.ring_mask);
		ringEntries = reinterpret_cast<uint32_t *>(b + offsets.ring_entries);
		flags = reinterpret_cast<uint32_t *>(b + offsets.flags);
		cqes = reinterpret_cast<struct io_uring_cqe *>(b + offsets.cqes);
	}

}

#endif
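
/* A minimal usage sketch (hypothetical caller code; IORING_OP_NOP is a request that completes
 * immediately without performing any I/O):
 *
 *   os::LinuxIO io;
 *
 *   struct io_uring_sqe sqe;
 *   memset(&sqe, 0, sizeof(sqe));
 *   sqe.opcode = IORING_OP_NOP;
 *   sqe.user_data = 1;            // tag so the completion can be recognized
 *   io.submit(sqe);
 *
 *   struct io_uring_cqe cqe;
 *   while (!io.get(cqe))
 *       ;                         // real code would wait on the attached eventfd instead
 *   assert(cqe.user_data == 1 && cqe.res == 0);
 */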