File: LinuxIO.cpp

package info (click to toggle)
storm-lang 0.7.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 52,028 kB
  • sloc: ansic: 261,471; cpp: 140,432; sh: 14,891; perl: 9,846; python: 2,525; lisp: 2,504; asm: 860; makefile: 678; pascal: 70; java: 52; xml: 37; awk: 12
file content (145 lines) | stat: -rw-r--r-- 4,721 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#include "stdafx.h"
#include "LinuxIO.h"

#ifdef LINUX_IO_URING

#include <sys/mman.h>
#include <sys/syscall.h>

namespace os {

	// Define system calls that usually don't have wrappers (they are in liburing, which we don't
	// want to depend on). Luckily, the data structures and system call numbers are defined in
	// system headers at least! Names of parameters are from the manpages from liburing.

	int io_uring_setup(unsigned entries, struct io_uring_params *p) {
		// Raw wrapper for the io_uring_setup system call (glibc provides none).
		// Returns the ring file descriptor on success, or -1 with errno set.
		long result = syscall(__NR_io_uring_setup, entries, p);
		return static_cast<int>(result);
	}

	int io_uring_register(int fd, unsigned opcode, void *arg, unsigned nr_args) {
		// Raw wrapper for the io_uring_register system call (glibc provides none).
		// Returns 0 on success, or -1 with errno set.
		long result = syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
		return static_cast<int>(result);
	}

	int io_uring_enter(int ring_fd, unsigned to_submit, unsigned min_complete, unsigned flags) {
		// Raw wrapper for the io_uring_enter system call (glibc provides none).
		// The trailing NULL/0 pair is the optional signal mask and its size, which
		// we never use. Returns the number of consumed SQEs, or -1 with errno set.
		long result = syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete, flags, NULL, 0);
		return static_cast<int>(result);
	}


	LinuxIO::LinuxIO() {
		// Create an io_uring instance and map its submission/completion rings
		// into our address space.
		struct io_uring_params params;
		memset(&params, 0, sizeof(params));

		// Specify the size of the completion queue. Since we call 'io_uring_enter' after each
		// submission, we don't need many entries in the submission queue at all.
		const unsigned submission_size = 2;
		params.flags = IORING_SETUP_CQSIZE;
		params.cq_entries = LINUX_IO_URING_QUEUE;

		// TODO? We could use the IORING_SETUP_ATTACH_WQ to use the same kernel thread pool for all
		// threads in Storm. I am not sure if that is desirable or not.
		uringfd = io_uring_setup(submission_size, &params);
		if (uringfd < 0) {
			// Fail loudly here: if setup failed, 'params' was never filled in by the
			// kernel, so the size computations below would be garbage and the mmap
			// calls would fail confusingly later.
			perror("io_uring_setup");
			assert(false);
		}

		// Compute the sizes of the mappings from the offsets the kernel reported.
		submission.size = params.sq_off.array + params.sq_entries * sizeof(uint32_t);
		submissionArray.size = params.sq_entries * sizeof(struct io_uring_sqe);
		completion.size = params.cq_off.cqes + params.cq_entries * sizeof(struct io_uring_cqe);

		if (params.features & IORING_FEAT_SINGLE_MMAP) {
			// The kernel exposes both rings through a single mapping: map the larger
			// of the two and share the base pointer.
			submission.size = max(submission.size, completion.size);
			submission.map(uringfd, IORING_OFF_SQ_RING);

			// This makes 'completion' not munmap the allocation from 'submission'.
			completion.size = 0;
			completion.base = submission.base;
		} else {
			submission.map(uringfd, IORING_OFF_SQ_RING);
			completion.map(uringfd, IORING_OFF_CQ_RING);
		}

		submissionArray.map(uringfd, IORING_OFF_SQES);

		// Resolve the kernel-provided offsets into typed pointers.
		submission.init(params.sq_off);
		completion.init(params.cq_off);
	}

	LinuxIO::~LinuxIO() {
		// Close the ring file descriptor. The mapped ring regions are released by
		// the destructors of the Memory members (RAII).
		close(uringfd);
	}

	void LinuxIO::attachEventfd(int fd) {
		// Ask the kernel to signal 'fd' whenever a completion is posted to this
		// ring. The fd is read during the call, so passing a stack address is fine.
		if (io_uring_register(uringfd, IORING_REGISTER_EVENTFD, &fd, 1) != 0) {
			perror("io_uring_register");
			assert(false);
		}
	}

	void LinuxIO::submit(const struct io_uring_sqe &job) {
		// Publish one SQE to the submission ring and immediately tell the kernel
		// about it. The order below matters: the entry must be fully written
		// before the tail is advanced, since the kernel reads entries up to tail.
		// NOTE(review): this assumes a single submitting thread at a time (the
		// tail is read and advanced non-atomically as a pair) — confirm callers
		// serialize submissions.
		// Note: We don't actually need to check 'head' since 'io_uring_enter' guarantees that
		// elements from the submission queue are consumed when it returns.
		uint32_t tail = atomicRead(*submission.tail);

		// Ring is a power of two; masking the tail yields the slot index.
		uint32_t index = tail & *submission.ringMask;
		submissionArray[index] = job;
		// I don't think we necessarily need to have this identity-mapped, but it is convenient.
		submission.sqes[index] = index;

		// Advance the tail only after the entry above is in place.
		atomicWrite(*submission.tail, tail + 1);

		// Submit one entry; don't wait for any completions here.
		io_uring_enter(uringfd, 1, 0, 0);
	}

	bool LinuxIO::get(struct io_uring_cqe &out) {
		// Pop one completion entry from the completion ring into 'out'.
		// Returns false if the ring is currently empty.
		uint32_t head = atomicRead(*completion.head);
		uint32_t tail = atomicRead(*completion.tail);

		// Empty?
		if (head == tail)
			return false;

		// Copy the entry out *before* advancing the head: once the head moves,
		// the kernel is free to overwrite the slot with a new completion.
		out = completion.cqes[head & *completion.ringMask];

		atomicWrite(*completion.head, head + 1);

		return true;
	}


	// Starts out unmapped; the owner sets 'size' and then calls 'map'.
	LinuxIO::Memory::Memory() : base(null), size(0) {}

	LinuxIO::Memory::~Memory() {
		// A size of zero means this object merely aliases another mapping (the
		// single-mmap case in the LinuxIO constructor), so only unmap what we own.
		if (size > 0 && base != null)
			munmap(base, size);
		base = null;
	}

	void LinuxIO::Memory::map(int fd, size_t offset) {
		// Map 'size' bytes of the ring fd at the given kernel-defined offset
		// (IORING_OFF_*). MAP_POPULATE pre-faults the pages.
		base = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, offset);
		if (base == MAP_FAILED) {
			// Note: mmap signals failure with MAP_FAILED ((void *)-1), not a null
			// pointer, so comparing against null would never detect an error.
			perror("mmap");
			// Reset to null so the destructor does not munmap MAP_FAILED.
			base = null;
			assert(false);
		}
	}

	void LinuxIO::SubmissionQueue::init(struct io_sqring_offsets &offsets) {
		byte *b = reinterpret_cast<byte *>(base);
		head = reinterpret_cast<uint32_t *>(b + offsets.head);
		tail = reinterpret_cast<uint32_t *>(b + offsets.tail);
		ringMask = reinterpret_cast<uint32_t *>(b + offsets.ring_mask);
		ringEntries = reinterpret_cast<uint32_t *>(b + offsets.ring_entries);
		flags = reinterpret_cast<uint32_t *>(b + offsets.flags);
		sqes = reinterpret_cast<uint32_t *>(b + offsets.array);
	}

	void LinuxIO::CompletionQueue::init(struct io_cqring_offsets &offsets) {
		byte *b = reinterpret_cast<byte *>(base);
		head = reinterpret_cast<uint32_t *>(b + offsets.head);
		tail = reinterpret_cast<uint32_t *>(b + offsets.tail);
		ringMask = reinterpret_cast<uint32_t *>(b + offsets.ring_mask);
		ringEntries = reinterpret_cast<uint32_t *>(b + offsets.ring_entries);
		flags = reinterpret_cast<uint32_t *>(b + offsets.flags);
		cqes = reinterpret_cast<struct io_uring_cqe *>(b + offsets.cqes);
	}

}

#endif