peridot_native_io/
linux.rs

1use std::{
2    cell::Cell,
3    os::fd::{AsRawFd, RawFd},
4    path::Path,
5    sync::{Arc, atomic::AtomicU32},
6};
7
8use crate::generic_unix::{UnixFile, UnixFileUnmapData};
9
10#[repr(transparent)]
11pub struct File(RawFd);
12impl Drop for File {
13    #[inline]
14    fn drop(&mut self) {
15        let _ = unsafe { libc::close(self.0) };
16    }
17}
18impl File {
19    #[inline]
20    pub fn open(pathname: &core::ffi::CStr, flags: core::ffi::c_int) -> std::io::Result<Self> {
21        let fd = unsafe { libc::open(pathname.as_ptr(), flags) };
22        if fd < 0 {
23            Err(std::io::Error::last_os_error())
24        } else {
25            Ok(Self(fd))
26        }
27    }
28
29    #[inline]
30    pub fn lseek64(&self, offset: libc::off64_t, whence: core::ffi::c_int) -> std::io::Result<u64> {
31        let r = unsafe { libc::lseek64(self.0, offset, whence) };
32        if r < 0 {
33            Err(std::io::Error::last_os_error())
34        } else {
35            Ok(r.cast_unsigned())
36        }
37    }
38}
39
40#[repr(transparent)]
41pub struct NativeFileBlobRandomReader(UnixFile);
42impl NativeFileBlobRandomReader {
43    #[inline(always)]
44    pub fn open(name: impl AsRef<Path>) -> std::io::Result<Self> {
45        Ok(Self(UnixFile::open(
46            &std::ffi::CString::new(name.as_ref().to_str().expect("invalid utf-8 sequence"))
47                .expect("invalid for cstr"),
48            libc::O_CLOEXEC,
49        )?))
50    }
51}
52impl super::BlobMetadata for NativeFileBlobRandomReader {
53    fn byte_length(&self) -> std::io::Result<u64> {
54        let mut stat = core::mem::MaybeUninit::uninit();
55        self.0.stat64(&mut stat)?;
56        let stat = unsafe { stat.assume_init_ref() };
57
58        Ok(stat.st_size.cast_unsigned())
59    }
60}
61impl super::RandomReadBlob for NativeFileBlobRandomReader {
62    #[inline(always)]
63    fn read(&self, pos: u64, buf: &mut [core::mem::MaybeUninit<u8>]) -> std::io::Result<usize> {
64        self.0.pread(pos as _, buf)
65    }
66
67    #[inline(always)]
68    fn readv(&self, pos: u64, buf: &mut [std::io::IoSliceMut]) -> std::io::Result<usize> {
69        self.0.preadv(pos as _, unsafe {
70            core::mem::transmute::<&mut [std::io::IoSliceMut], &mut [libc::iovec]>(buf)
71        })
72    }
73}
74impl super::MemoryMapBlob for NativeFileBlobRandomReader {
75    type MemoryUnmapData = UnixFileUnmapData;
76
77    #[inline(always)]
78    fn mmap(
79        &self,
80        offs: u64,
81        len: usize,
82    ) -> std::io::Result<(*mut core::ffi::c_void, Self::MemoryUnmapData)> {
83        let r = self
84            .0
85            .mmap(len, libc::PROT_READ, libc::MAP_PRIVATE, offs as _)?;
86
87        Ok((r.data_addr(), r))
88    }
89
90    #[inline(always)]
91    fn munmap(&self, data: Self::MemoryUnmapData) -> std::io::Result<()> {
92        data.unmap()
93    }
94}
95
/// Bookkeeping returned alongside a mapping created by `NativeFileAsyncBlobRandomReader`'s
/// `mmap`; it carries exactly what `munmap(2)` needs to release the region.
#[derive(Debug)]
pub struct MemoryUnmapData {
    /// Start address returned by `mmap(2)`.
    addr: *mut core::ffi::c_void,
    /// Length in bytes that was passed to `mmap(2)`.
    len: usize,
}
100
101#[repr(transparent)]
102pub struct NativeFileAsyncBlobRandomReader(File);
103impl NativeFileAsyncBlobRandomReader {
104    #[inline]
105    pub fn open(name: impl AsRef<Path>) -> std::io::Result<Self> {
106        Ok(Self(File::open(
107            &std::ffi::CString::new(name.as_ref().to_str().expect("invalid utf-8 sequence"))
108                .expect("invalid for cstr"),
109            libc::O_CLOEXEC | libc::O_NONBLOCK,
110        )?))
111    }
112}
113impl super::BlobMetadataAsync for NativeFileAsyncBlobRandomReader {
114    #[inline(always)]
115    fn byte_length_async(&self) -> impl core::future::Future<Output = std::io::Result<u64>> {
116        AsyncNativeFileByteLengthFuture {
117            fd: self,
118            buf: core::mem::MaybeUninit::uninit(),
119            state: Arc::new(Cell::new(AsyncNativeFileReadState::Init)),
120        }
121    }
122}
123impl super::RandomReadBlobAsync for NativeFileAsyncBlobRandomReader {
124    type ReadFuture<'a, 'b>
125        = AsyncNativeFileReadFuture<'a, 'b>
126    where
127        Self: 'a;
128    type ReadVecFuture<'a, 'b, 'b2>
129        = AsyncNativeFileReadVecFuture<'a, 'b, 'b2>
130    where
131        Self: 'a,
132        'b2: 'b;
133
134    #[inline(always)]
135    fn read_async<'a, 'b>(
136        &'a self,
137        pos: u64,
138        buf: &'b mut [core::mem::MaybeUninit<u8>],
139    ) -> Self::ReadFuture<'a, 'b> {
140        AsyncNativeFileReadFuture {
141            fd: self,
142            pos,
143            buf,
144            state: Arc::new(Cell::new(AsyncNativeFileReadState::Init)),
145        }
146    }
147
148    #[inline(always)]
149    fn readv_async<'a, 'b, 'b2>(
150        &'a self,
151        pos: u64,
152        buf: &'b mut [std::io::IoSliceMut<'b2>],
153    ) -> Self::ReadVecFuture<'a, 'b, 'b2> {
154        AsyncNativeFileReadVecFuture {
155            fd: self,
156            pos,
157            iovecs: buf,
158            state: Arc::new(Cell::new(AsyncNativeFileReadState::Init)),
159        }
160    }
161}
162impl super::MemoryMapBlob for NativeFileAsyncBlobRandomReader {
163    type MemoryUnmapData = MemoryUnmapData;
164
165    #[inline]
166    fn mmap(
167        &self,
168        offs: u64,
169        len: usize,
170    ) -> std::io::Result<(*mut core::ffi::c_void, Self::MemoryUnmapData)> {
171        let r = unsafe {
172            libc::mmap(
173                core::ptr::null_mut(),
174                len,
175                libc::PROT_READ,
176                libc::MAP_PRIVATE,
177                self.0.0,
178                offs as _,
179            )
180        };
181        if r == libc::MAP_FAILED {
182            Err(std::io::Error::last_os_error())
183        } else {
184            Ok((r, MemoryUnmapData { addr: r, len }))
185        }
186    }
187
188    #[inline]
189    fn munmap(&self, data: Self::MemoryUnmapData) -> std::io::Result<()> {
190        let r = unsafe { libc::munmap(data.addr, data.len) };
191        if r < 0 {
192            Err(std::io::Error::last_os_error())
193        } else {
194            Ok(())
195        }
196    }
197}
198
/// Progress of one io_uring request, shared between the submitting future and the reactor
/// thread that records its completion.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AsyncNativeFileReadState {
    /// Created but not yet submitted (the future has not been polled).
    Init,
    /// Submitted; waiting for the matching completion queue entry.
    Pending,
    /// Completed with a non-negative `cqe.res` (e.g. the number of bytes read).
    CompletedSuccess(usize),
    /// Completed with an error; holds the positive errno (`-cqe.res`).
    CompletedFailure(i32),
}
206
207pub struct AsyncNativeFileReadFuture<'a, 'b> {
208    fd: &'a NativeFileAsyncBlobRandomReader,
209    pos: u64,
210    buf: &'b mut [core::mem::MaybeUninit<u8>],
211    state: Arc<Cell<AsyncNativeFileReadState>>,
212}
213impl<'a, 'b> Future for AsyncNativeFileReadFuture<'a, 'b> {
214    type Output = std::io::Result<usize>;
215
216    fn poll(
217        self: std::pin::Pin<&mut Self>,
218        cx: &mut std::task::Context<'_>,
219    ) -> std::task::Poll<Self::Output> {
220        let this = self.get_mut();
221
222        match this.state.get() {
223            AsyncNativeFileReadState::Init => {
224                // first call
225                IoReactorHandle::current()
226                    .expect("no reactor running")
227                    .pusher
228                    .push(|sqe| unsafe {
229                        core::ptr::write_volatile(&mut sqe.fd, this.fd.0.0);
230                        core::ptr::write_volatile(
231                            &mut sqe.opcode,
232                            linux_io_uring::ffi::IORING_OP_READ as _,
233                        );
234                        core::ptr::write_volatile(&mut sqe.union1.off, this.pos);
235                        core::ptr::write_volatile(
236                            &mut sqe.union2.addr,
237                            this.buf.as_mut_ptr() as usize as _,
238                        );
239                        core::ptr::write_volatile(&mut sqe.len, this.buf.len() as _);
240                        // TODO: 毎回mallocするのはちょっとやめたい気もする うまい感じのpoolつくれないか......
241                        core::ptr::write_volatile(
242                            &mut sqe.user_data,
243                            Box::into_raw(Box::new(ReadFutureQueueData::Read {
244                                state: Arc::downgrade(&this.state),
245                                waker: cx.waker().clone(),
246                            })) as usize as _,
247                        );
248                    });
249
250                this.state.set(AsyncNativeFileReadState::Pending);
251                core::task::Poll::Pending
252            }
253            AsyncNativeFileReadState::Pending => core::task::Poll::Pending,
254            AsyncNativeFileReadState::CompletedSuccess(res) => core::task::Poll::Ready(Ok(res)),
255            AsyncNativeFileReadState::CompletedFailure(e) => {
256                core::task::Poll::Ready(Err(std::io::Error::from_raw_os_error(e)))
257            }
258        }
259    }
260}
261
262pub struct AsyncNativeFileReadVecFuture<'a, 'b, 'b2> {
263    fd: &'a NativeFileAsyncBlobRandomReader,
264    pos: u64,
265    iovecs: &'b mut [std::io::IoSliceMut<'b2>],
266    state: Arc<Cell<AsyncNativeFileReadState>>,
267}
268impl<'a, 'b, 'b2> Future for AsyncNativeFileReadVecFuture<'a, 'b, 'b2> {
269    type Output = std::io::Result<usize>;
270
271    fn poll(
272        self: std::pin::Pin<&mut Self>,
273        cx: &mut std::task::Context<'_>,
274    ) -> std::task::Poll<Self::Output> {
275        let this = self.get_mut();
276
277        match this.state.get() {
278            AsyncNativeFileReadState::Init => {
279                // first call
280
281                IoReactorHandle::current()
282                    .expect("no reactor running")
283                    .pusher
284                    .push(|sqe| unsafe {
285                        core::ptr::write_volatile(&mut sqe.fd, this.fd.0.0);
286                        core::ptr::write_volatile(
287                            &mut sqe.opcode,
288                            linux_io_uring::ffi::IORING_OP_READV as _,
289                        );
290                        core::ptr::write_volatile(&mut sqe.union1.off, this.pos);
291                        core::ptr::write_volatile(
292                            &mut sqe.union2.addr,
293                            this.iovecs.as_mut_ptr() as usize as _,
294                        );
295                        core::ptr::write_volatile(&mut sqe.len, this.iovecs.len() as _);
296                        // TODO: 毎回mallocするのはちょっとやめたい気もする うまい感じのpoolつくれないか......
297                        core::ptr::write_volatile(
298                            &mut sqe.user_data,
299                            Box::into_raw(Box::new(ReadFutureQueueData::Read {
300                                state: Arc::downgrade(&this.state),
301                                waker: cx.waker().clone(),
302                            })) as usize as _,
303                        );
304                    });
305
306                this.state.set(AsyncNativeFileReadState::Pending);
307                core::task::Poll::Pending
308            }
309            AsyncNativeFileReadState::Pending => core::task::Poll::Pending,
310            AsyncNativeFileReadState::CompletedSuccess(res) => core::task::Poll::Ready(Ok(res)),
311            AsyncNativeFileReadState::CompletedFailure(e) => {
312                core::task::Poll::Ready(Err(std::io::Error::from_raw_os_error(e)))
313            }
314        }
315    }
316}
317
/// Empty path passed to `IORING_OP_STATX` together with `AT_EMPTY_PATH`, so the request
/// targets the submitted file descriptor itself.
///
/// Built without `unsafe`; the NUL terminator is validated at compile time.
const EMPTY_CSTR: &core::ffi::CStr = match core::ffi::CStr::from_bytes_with_nul(b"\0") {
    Ok(s) => s,
    Err(_) => panic!("a single NUL byte is a valid C string"),
};
320
321pub struct AsyncNativeFileByteLengthFuture<'a> {
322    fd: &'a NativeFileAsyncBlobRandomReader,
323    buf: core::mem::MaybeUninit<libc::statx>,
324    state: Arc<Cell<AsyncNativeFileReadState>>,
325}
326impl<'a> Future for AsyncNativeFileByteLengthFuture<'a> {
327    type Output = std::io::Result<u64>;
328
329    fn poll(
330        self: std::pin::Pin<&mut Self>,
331        cx: &mut std::task::Context<'_>,
332    ) -> std::task::Poll<Self::Output> {
333        let this = self.get_mut();
334
335        match this.state.get() {
336            AsyncNativeFileReadState::Init => {
337                // first call
338                IoReactorHandle::current()
339                    .expect("no reactor running")
340                    .pusher
341                    .push(|sqe| unsafe {
342                        core::ptr::write_volatile(&mut sqe.fd, this.fd.0.0);
343                        core::ptr::write_volatile(
344                            &mut sqe.opcode,
345                            linux_io_uring::ffi::IORING_OP_STATX as _,
346                        );
347                        core::ptr::write_volatile(&mut sqe.union2.addr, EMPTY_CSTR.as_ptr() as _);
348                        core::ptr::write_volatile(
349                            &mut sqe.union3.statx_flags,
350                            libc::AT_EMPTY_PATH as _,
351                        );
352                        core::ptr::write_volatile(&mut sqe.len, libc::STATX_SIZE);
353                        core::ptr::write_volatile(
354                            &mut sqe.union1.off,
355                            this.buf.as_mut_ptr().addr() as _,
356                        );
357                        // TODO: 毎回mallocするのはちょっとやめたい気もする うまい感じのpoolつくれないか......
358                        core::ptr::write_volatile(
359                            &mut sqe.user_data,
360                            Box::into_raw(Box::new(ReadFutureQueueData::Read {
361                                state: Arc::downgrade(&this.state),
362                                waker: cx.waker().clone(),
363                            })) as usize as _,
364                        );
365                    });
366
367                this.state.set(AsyncNativeFileReadState::Pending);
368                core::task::Poll::Pending
369            }
370            AsyncNativeFileReadState::Pending => core::task::Poll::Pending,
371            AsyncNativeFileReadState::CompletedSuccess(_) => {
372                core::task::Poll::Ready(Ok(unsafe { this.buf.assume_init_ref().stx_size }))
373            }
374            AsyncNativeFileReadState::CompletedFailure(e) => {
375                core::task::Poll::Ready(Err(std::io::Error::from_raw_os_error(e)))
376            }
377        }
378    }
379}
380
381pub enum ReadFutureQueueData {
382    Read {
383        state: std::sync::Weak<Cell<AsyncNativeFileReadState>>,
384        waker: core::task::Waker,
385    },
386}
387
388#[derive(Clone)]
389pub struct IoReactorHandle {
390    pusher: SubmissionQueuePusher,
391}
392impl IoReactorHandle {
393    #[inline]
394    pub fn current() -> Option<Self> {
395        IO_REACTOR_CURRENT_HANDLE.write().expect("poisoned").clone()
396    }
397}
398
399static IO_REACTOR_CURRENT_HANDLE: std::sync::RwLock<Option<IoReactorHandle>> =
400    std::sync::RwLock::new(None);
401
/// Owns one io_uring instance plus the userspace mappings used to drive it directly:
/// the SQ ring, the CQ ring (which may share the SQ ring's mapping) and the SQE array.
///
/// The `*_ptr` fields point into those mappings at the offsets the kernel reported in
/// `io_uring_params` during setup; `Drop` unmaps the regions.
struct IoUringContext {
    uring: linux_io_uring::IoUring,
    // Base address and length of the SQ ring mapping (`IORING_OFF_SQ_RING`).
    sq_ptr: *mut core::ffi::c_void,
    sq_size: usize,
    // Separate CQ ring mapping (base, length), or `None` when the kernel reports
    // `IORING_FEAT_SINGLE_MMAP` and the CQ ring lives inside the SQ ring mapping.
    cq: Option<(*mut core::ffi::c_void, usize)>,
    // SQ ring tail and index mask (from `params.sq_off`).
    sring_tail_ptr: *mut u32,
    sring_mask_ptr: *mut u32,
    // Start of the SQ index array (`params.sq_off.array`); despite the name this is the whole
    // array, indexed by slot.
    sring_array_head_ptr: *mut u32,
    // CQ ring head, tail and index mask (from `params.cq_off`).
    cring_head_ptr: *mut u32,
    cring_tail_ptr: *mut u32,
    cring_mask_ptr: *mut u32,
    // First completion queue entry (`params.cq_off.cqes`).
    cqes_ptr: *mut linux_io_uring::ffi::io_uring_cqe,
    // SQE array mapping (`IORING_OFF_SQES`) and its length in bytes.
    sqes: *mut linux_io_uring::ffi::io_uring_sqe,
    sqes_size: usize,
}
// NOTE(review): these impls let the context be shared across threads via `Arc` (pusher clones
// on any thread, the completion taker on the reactor thread). The raw ring pointers carry no
// synchronization of their own; soundness relies on the callers' protocol — confirm.
unsafe impl Sync for IoUringContext {}
unsafe impl Send for IoUringContext {}
impl Drop for IoUringContext {
    /// Unmaps the SQ ring, the separate CQ ring (if any) and the SQE array. Errors are ignored.
    /// The `uring` field itself is dropped afterwards by normal field drop.
    fn drop(&mut self) {
        let _ = unsafe { libc::munmap(self.sq_ptr, self.sq_size) };
        if let Some((ptr, size)) = self.cq.take() {
            let _ = unsafe { libc::munmap(ptr, size) };
        }
        let _ = unsafe { libc::munmap(self.sqes.cast(), self.sqes_size) };
    }
}
impl IoUringContext {
    /// Creates the ring and maps its SQ ring, CQ ring and SQE array into this process.
    ///
    /// # Panics
    /// Panics if ring creation or any of the mappings fails.
    fn new() -> Self {
        // All-zero setup parameters: no setup flags are requested.
        let mut params = linux_io_uring::ffi::io_uring_params {
            ..unsafe { core::mem::MaybeUninit::zeroed().assume_init() }
        };
        // Presumably wraps io_uring_setup(32 entries, &params); the kernel writes the actual
        // ring sizes, offsets and feature bits back into `params`.
        let uring = linux_io_uring::IoUring::new(32, &mut params).expect("IoUring::new");

        let is_shared_cq_sq = (params.features & linux_io_uring::ffi::IORING_FEAT_SINGLE_MMAP) != 0;
        // The SQ ring ends with the u32 index array; the CQ ring ends with the CQE array.
        let mut sring_size = params.sq_off.array as usize
            + params.sq_entries as usize * core::mem::size_of::<core::ffi::c_uint>();
        let mut cring_size = params.cq_off.cqes as usize
            + params.cq_entries as usize
                * core::mem::size_of::<linux_io_uring::ffi::io_uring_cqe>();
        if is_shared_cq_sq {
            // can be shared with sring and cring:
            // one mapping serves both rings, so it must be large enough for either.
            let ring_size = sring_size.max(cring_size);
            sring_size = ring_size;
            cring_size = ring_size;
        }
        let sq_ptr = unsafe {
            libc::mmap(
                core::ptr::null_mut(),
                sring_size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_SHARED | libc::MAP_POPULATE,
                uring.as_raw_fd(),
                linux_io_uring::ffi::IORING_OFF_SQ_RING as _,
            )
        };
        if sq_ptr == libc::MAP_FAILED {
            panic!("sq map failed");
        }

        // With a single mmap the CQ ring is read through the SQ ring mapping.
        let cq_ptr = if is_shared_cq_sq {
            sq_ptr
        } else {
            let p = unsafe {
                libc::mmap(
                    core::ptr::null_mut(),
                    cring_size,
                    libc::PROT_READ | libc::PROT_WRITE,
                    libc::MAP_SHARED | libc::MAP_POPULATE,
                    uring.as_raw_fd(),
                    linux_io_uring::ffi::IORING_OFF_CQ_RING as _,
                )
            };
            if p == libc::MAP_FAILED {
                panic!("cq map failed");
            }
            p
        };

        let sqes_size =
            params.sq_entries as usize * core::mem::size_of::<linux_io_uring::ffi::io_uring_sqe>();
        let sqes = unsafe {
            libc::mmap(
                core::ptr::null_mut(),
                sqes_size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_SHARED | libc::MAP_POPULATE,
                uring.as_raw_fd(),
                linux_io_uring::ffi::IORING_OFF_SQES as _,
            )
        };
        if sqes == libc::MAP_FAILED {
            panic!("sqes map failed");
        }

        // Resolve the ring field addresses from the kernel-reported offsets.
        let sring_tail = unsafe { sq_ptr.byte_add(params.sq_off.tail as _) };
        let sring_mask = unsafe { sq_ptr.byte_add(params.sq_off.ring_mask as _) };
        let sring_array = unsafe { sq_ptr.byte_add(params.sq_off.array as _) };
        let cring_head = unsafe { cq_ptr.byte_add(params.cq_off.head as _) };
        let cring_tail = unsafe { cq_ptr.byte_add(params.cq_off.tail as _) };
        let cring_mask = unsafe { cq_ptr.byte_add(params.cq_off.ring_mask as _) };
        let cqes = unsafe { cq_ptr.byte_add(params.cq_off.cqes as _) };

        Self {
            uring,
            sq_ptr,
            sq_size: sring_size,
            cq: if is_shared_cq_sq {
                None
            } else {
                Some((cq_ptr, cring_size))
            },
            sring_tail_ptr: sring_tail.cast(),
            sring_mask_ptr: sring_mask.cast(),
            sring_array_head_ptr: sring_array.cast(),
            cring_head_ptr: cring_head.cast(),
            cring_tail_ptr: cring_tail.cast(),
            cring_mask_ptr: cring_mask.cast(),
            cqes_ptr: cqes.cast(),
            sqes: sqes.cast(),
            sqes_size,
        }
    }

    /// Atomic view of the SQ ring tail.
    #[inline(always)]
    const fn sring_tail(&self) -> &AtomicU32 {
        unsafe { AtomicU32::from_ptr(self.sring_tail_ptr) }
    }

    /// Atomic view of the CQ ring head.
    #[inline(always)]
    const fn cring_head(&self) -> &AtomicU32 {
        unsafe { AtomicU32::from_ptr(self.cring_head_ptr) }
    }

    /// Atomic view of the CQ ring tail.
    #[inline(always)]
    const fn cring_tail(&self) -> &AtomicU32 {
        unsafe { AtomicU32::from_ptr(self.cring_tail_ptr) }
    }
}
541
542struct CompletionQueueTaker {
543    context: Arc<IoUringContext>,
544}
545impl CompletionQueueTaker {
546    fn new(context: &Arc<IoUringContext>) -> Self {
547        Self {
548            context: context.clone(),
549        }
550    }
551
552    fn try_take(&self, process: impl FnOnce(&linux_io_uring::ffi::io_uring_cqe)) -> bool {
553        let head = unsafe { *self.context.cring_head_ptr };
554        if head
555            == self
556                .context
557                .cring_tail()
558                .load(core::sync::atomic::Ordering::Acquire)
559        {
560            // empty
561            return false;
562        }
563
564        let index = head & unsafe { *self.context.cring_mask_ptr };
565        process(unsafe { &*self.context.cqes_ptr.add(index as _) });
566        self.context
567            .cring_head()
568            .store(head + 1, core::sync::atomic::Ordering::Release);
569        true
570    }
571}
572
573#[derive(Clone)]
574struct SubmissionQueuePusher {
575    context: Arc<IoUringContext>,
576}
577impl SubmissionQueuePusher {
578    fn new(context: &Arc<IoUringContext>) -> Self {
579        Self {
580            context: context.clone(),
581        }
582    }
583
584    fn push(&self, describe_io: impl FnOnce(&mut linux_io_uring::ffi::io_uring_sqe)) {
585        let tail = unsafe { *self.context.sring_tail_ptr };
586        let index = tail & unsafe { *self.context.sring_mask_ptr };
587        describe_io(unsafe { &mut *self.context.sqes.add(index as _) });
588        unsafe {
589            core::ptr::write_volatile(self.context.sring_array_head_ptr.add(index as _), index);
590        }
591        self.context
592            .sring_tail()
593            .store(tail + 1, core::sync::atomic::Ordering::Release);
594        self.context
595            .uring
596            .enter(1, 0, 0, core::ptr::null_mut())
597            .expect("io_uring_enter");
598    }
599}
600
601pub struct IoReactorThread {
602    join_handle: Option<std::thread::JoinHandle<()>>,
603}
604impl Drop for IoReactorThread {
605    fn drop(&mut self) {
606        let Some(join_handle) = self.join_handle.take() else {
607            // already dropped?
608            return;
609        };
610        let Some(reactor_handle) = IO_REACTOR_CURRENT_HANDLE.write().expect("poisoned").take()
611        else {
612            // already dropped?
613            return;
614        };
615
616        reactor_handle.pusher.push(|sqe| {
617            sqe.opcode = linux_io_uring::ffi::IORING_OP_MSG_RING as _;
618            sqe.flags = linux_io_uring::ffi::IOSQE_CQE_SKIP_SUCCESS;
619            sqe.fd = reactor_handle.pusher.context.uring.as_raw_fd();
620            sqe.len = 0;
621            sqe.union1.off = 0;
622            sqe.union2.addr = 0; // これが必要
623        });
624        join_handle.join().expect("err in IoReactorThread");
625    }
626}
impl IoReactorThread {
    /// Creates a new io_uring instance, registers it as the global reactor handle (replacing
    /// any previously registered one), and starts a thread that waits for completions.
    ///
    /// For each completion the thread rebuilds the boxed [`ReadFutureQueueData`] whose pointer
    /// was stored in `user_data`. If the submitting future is still alive, the thread records the
    /// result in its shared state; either way it wakes the stored waker. A completion with
    /// `user_data == 0` stops the thread.
    ///
    /// # Panics
    /// Panics if the ring cannot be set up or the thread cannot be spawned. On the reactor
    /// thread, a failing wait (`enter`) panics that thread. NOTE(review): presumably this
    /// includes EINTR if `enter` surfaces it as an error — confirm.
    pub fn spawn() -> Self {
        let uring = Arc::new(IoUringContext::new());
        let cq_taker = CompletionQueueTaker::new(&uring);

        // Publish the submission side before the reactor thread starts.
        *IO_REACTOR_CURRENT_HANDLE.write().expect("poisoned") = Some(IoReactorHandle {
            pusher: SubmissionQueuePusher::new(&uring),
        });

        let join_handle = std::thread::Builder::new()
            .name("Peridot NativeIO Reactor".into())
            .spawn({
                let uring = uring.clone();

                move || {
                    let mut terminated = false;
                    while !terminated {
                        // Block until at least one completion is available; the arguments are
                        // presumably (to_submit = 0, min_complete = 1, flags, sigset).
                        uring
                            .uring
                            .enter(
                                0,
                                1,
                                linux_io_uring::ffi::IORING_ENTER_GETEVENTS,
                                core::ptr::null_mut(),
                            )
                            .expect("uring enter for wait events");
                        // Only one CQE is consumed per wait; if more are ready, the next wait
                        // should return immediately.
                        cq_taker.try_take(|cqe| {
                            // process cqe
                            let res = cqe.res;
                            let user_data = cqe.user_data;

                            if user_data == 0 {
                                // termination message (posted by `IoReactorThread`'s Drop)
                                terminated = true;
                                return;
                            }

                            // Reclaim the allocation the submitting future leaked with
                            // `Box::into_raw`; it is freed when `queue_data` goes out of scope.
                            let queue_data: Box<ReadFutureQueueData> = unsafe {
                                Box::from_raw(core::ptr::with_exposed_provenance_mut(
                                    user_data as _,
                                ))
                            };
                            match &*queue_data {
                                ReadFutureQueueData::Read { state, waker } => {
                                    // `upgrade` fails when the future was already dropped; the
                                    // result is then discarded.
                                    if let Some(st) = state.upgrade() {
                                        // Negative `res` is a negated errno.
                                        st.set(if res < 0 {
                                            AsyncNativeFileReadState::CompletedFailure(-res)
                                        } else {
                                            AsyncNativeFileReadState::CompletedSuccess(
                                                res.cast_unsigned() as _,
                                            )
                                        });
                                    }

                                    waker.wake_by_ref();
                                }
                            }
                        });
                    }
                }
            })
            .expect("Failed to spawn async fileio thread");

        Self {
            join_handle: Some(join_handle),
        }
    }
}
694}