Peano
Loading...
Searching...
No Matches
dma.h
Go to the documentation of this file.
1#pragma once
2
#include <aio.h>
#include <fcntl.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <string_view>
#include <type_traits>

#include <lang/assert.h>
#include <lang/type.h>
9
10constexpr i32 DMA_FLAGS = O_RDWR | O_CREAT | O_NONBLOCK | O_TRUNC | O_DIRECT;
11// O_DIRECT works only when all of the above are true:
12// - buffer is BLOCK-aligned (typically 512 bytes) this happens automatically for ~1kiB+ allocations
13// - length to be written is divisible by BLOCK-size
14// - offset is divisible by BLOCK-size
15
16template<u64 T_BUFF_SIZE = 1024 * 1024 * 1, u8 WIDTH = 4, i32 BLOCK = 512>
17class DmaWriter {
18 static constexpr u64 BUFF_SIZE() {
19 return T_BUFF_SIZE / WIDTH;
20 }
21
22 // for fast path the buffer size has to be a multiple of BLOCK size
23 static_assert(T_BUFF_SIZE % BLOCK == 0);
24
25 static_assert(BUFF_SIZE() >= BLOCK);
26
27 aiocb *cbs = nullptr;
28
29 u64 len = 0;
33 i32 fd = -1;
34 u8 idx = 0;
35
36 aiocb *getCb() {
37 return this->cbs + this->idx;
38 }
39
40 aiocb *pickNextCb() {
41 auto localIdx = (this->idx + 1) % WIDTH;
42
43 while (true) {
44 auto *cbCandidate = this->cbs + localIdx;
45
46 if (cbCandidate->aio_fildes == -1) [[unlikely]] {
47 this->idx = localIdx;
48
49 return cbCandidate;
50 }
51
52 auto error = aio_error(cbCandidate);
53 if (error != EINPROGRESS) [[unlikely]] {
54 assert(error == 0)
55
56 auto bytes = aio_return(cbCandidate);
57 assert(bytes == cbCandidate->aio_nbytes)
58
59 this->idx = localIdx;
60 cbCandidate->aio_fildes = -1;
61
62 return cbCandidate;
63 }
64
65 localIdx = (localIdx + 1) % WIDTH;
66 }
67 }
68
69 void syncCb(aiocb *cb) {
70 if (cb->aio_fildes < 0) [[unlikely]] return;
71 auto error = aio_error(cb);
72
73 while (error == EINPROGRESS) {
74 error = aio_error(cb);
75 }
76
77 assert(error == 0)
78
79 auto bytes = aio_return(cb);
80 assert(bytes == cb->aio_nbytes)
81 }
82
83 /*
84 * We have to adhere to the BLOCK boundaries
85 */
86 void flushCb(aiocb *cb) {
87 cb->aio_fildes = this->fd;
88 cb->aio_offset = this->offset;
89
90 this->max_len = std::max(this->max_len, this->offset + this->len);
91
92 auto postfill = (BLOCK - this->len % BLOCK) % BLOCK;
93
94 if (this->prefill == 0 & postfill == 0) [[likely]] { // fast path, we're updating whole block(s)
95 cb->aio_nbytes = this->len;
96
97 auto code = aio_write(cb);
98 assert(code == 0)
99
100 this->offset += this->len;
101 this->len = 0;
102
103 return;
104 }
105
106 alignas(BLOCK) char buf[BLOCK] = {};
107
108 if (this->prefill > 0) {
109 auto bytes = pread(this->fd, buf, BLOCK, this->offset);
110 assert(bytes >= 0) // usually bytes == this->prefill unless seek() is called before any writes
111
112 memcpy((char *) cb->aio_buf, buf, this->prefill);
113 }
114
115 if (postfill == 0) {
116 cb->aio_nbytes = this->len;
117
118 auto code = aio_write(cb);
119 assert(code == 0)
120
121 this->offset += this->len;
122 this->len = 0;
123
124 return;
125 }
126
127 if (this->len + postfill > BLOCK | this->prefill == 0) {
128 // write spans multiple buffers (or prefill is zero), cannot reuse buf
129 auto bytes = pread(this->fd, buf, BLOCK, this->offset);
130 assert(bytes >= 0)
131 }
132
133 memcpy((char *) cb->aio_buf + this->len, buf + BLOCK - postfill, postfill);
134
135 cb->aio_nbytes = this->len + postfill;
136
137 auto code = aio_write(cb);
138 assert(code == 0)
139
140 this->offset += this->len + postfill - BLOCK;
141 this->prefill = BLOCK - postfill;
142 this->len = this->prefill;
143 }
144
146 if (this->len == 0 | this->len == this->prefill) [[unlikely]] return;
147
148 this->flushCb(this->getCb());
149
150 this->pickNextCb();
151 }
152
153 char *getBuf() {
154 return ((char *) this->getCb()->aio_buf) + this->len;
155 }
156
157 struct alignas(BLOCK) BUF {
158 char b[BUFF_SIZE()];
159 };
160
161public:
162 explicit DmaWriter(i32 fd) noexcept: fd(fd), cbs(nullptr) {
163 this->cbs = new aiocb[WIDTH];
164 std::memset(this->cbs, 0, sizeof(aiocb) * WIDTH);
165 for (i32 i = 0; i < WIDTH; i++) {
166 this->cbs[i].aio_fildes = -1;
167 this->cbs[i].aio_buf = new BUF;
168 assert(((u64) this->cbs[i].aio_buf) % BLOCK == 0) // buffers must be BLOCK-aligned
169 }
170 }
171
172 explicit DmaWriter() = default;
173
174 DmaWriter(const DmaWriter &) = delete;
175
176 DmaWriter& operator=(DmaWriter&& other) noexcept {
177 if (this->cbs) {
178 for (i32 i = 0; i < WIDTH; i++) delete (BUF *) this->cbs[i].aio_buf;
179 delete[] this->cbs;
180 }
181 this->cbs = other.cbs;
182
183 this->len = other.len;
184 this->offset = other.offset;
185 this->max_len = other.max_len;
186 this->prefill = other.prefill;
187 this->fd = other.fd;
188 this->idx = other.idx;
189
190 other.cbs = nullptr;
191 other.fd = -1;
192
193 return *this;
194 }
195
196 template<typename T>
197 requires (!std::is_pointer_v<T>)
198 void write(T val) {
199 static_assert(sizeof(T) <= BUFF_SIZE());
200
201 if (this->len + sizeof(T) <= BUFF_SIZE()) [[likely]] {
202 *((T *) this->getBuf()) = val;
203 this->len += sizeof(T);
204 return;
205 }
206
207 this->write(&val, sizeof(T));
208 }
209
210 void write(std::string_view s) {
211 this->write(s.data(), s.length());
212 }
213
214 void write(const void *ptr, u64 n) {
215 auto *data = (char *) ptr;
216 auto freeCap = BUFF_SIZE() - this->len;
217
218 do {
219 auto bytesToWrite = std::min(n, freeCap);
220 memcpy(this->getBuf(), data, bytesToWrite);
221
222 if (this->len == BUFF_SIZE()) this->flushCbAsync();
223
224 data += bytesToWrite;
225 n -= bytesToWrite;
226 this->len += bytesToWrite;
227 freeCap = BUFF_SIZE() - this->len;
228 } while (n > 0);
229 }
230
231 void seek(u64 pos) {
232 this->sync();
233
234 this->prefill = pos % BLOCK;
235 this->offset = pos - this->prefill;
236 assert(this->offset % BLOCK == 0)
237 this->len = this->prefill;
238 }
239
240 void *buffer(u64 n) {
241 if (this->len + n > BUFF_SIZE()) [[unlikely]] return nullptr;
242 auto *buf = this->getBuf();
243 this->len += n;
244 return buf;
245 }
246
247 void sync() {
248 this->flushCbAsync();
249 i32 localIdx = this->idx;
250 for (i32 i = 1; i < WIDTH; i++) {
251 this->syncCb(this->cbs + (localIdx + i) % WIDTH);
252 }
253 auto code = ftruncate(this->fd, this->max_len);
254 assert(code == 0)
255 code = syncfs(this->fd);
256 assert(code == 0)
257 }
258
260 auto pos = this->offset + this->len;
261 return pos;
262 }
263
265 if (this->fd == -1) return;
266 for (i32 i = 0; i < WIDTH; i++) delete (BUF *) this->cbs[i].aio_buf;
267 delete[] this->cbs;
268 }
269};
270
applications::exahype2::acoustic::VariableShortcuts s
Definition Acoustic.cpp:9
#define assert(...)
Definition LinuxAMD.h:28
void syncCb(aiocb *cb)
Definition dma.h:69
aiocb * cbs
Definition dma.h:27
void write(std::string_view s)
Definition dma.h:210
void sync()
Definition dma.h:247
void write(const void *ptr, u64 n)
Definition dma.h:214
DmaWriter(const DmaWriter &)=delete
DmaWriter()=default
u64 getPos()
Definition dma.h:259
void flushCbAsync()
Definition dma.h:145
DmaWriter(i32 fd) noexcept
Definition dma.h:162
void * buffer(u64 n)
Definition dma.h:240
aiocb * pickNextCb()
Definition dma.h:40
u64 max_len
Definition dma.h:31
DmaWriter & operator=(DmaWriter &&other) noexcept
Definition dma.h:176
u64 len
Definition dma.h:29
void seek(u64 pos)
Definition dma.h:231
~DmaWriter()
Definition dma.h:264
void write(T val)
Definition dma.h:198
u8 idx
Definition dma.h:34
char * getBuf()
Definition dma.h:153
i32 fd
Definition dma.h:33
u32 prefill
Definition dma.h:32
static constexpr u64 BUFF_SIZE()
Definition dma.h:18
u64 offset
Definition dma.h:30
void flushCb(aiocb *cb)
Definition dma.h:86
aiocb * getCb()
Definition dma.h:36
constexpr i32 DMA_FLAGS
Definition dma.h:10
Definition type.h:27
std::uint8_t u8
Definition type.h:7
std::uint32_t u32
Definition type.h:11
std::uint64_t u64
Definition type.h:13
std::int32_t i32
Definition type.h:10