1use std::{
2 cell::Cell,
3 os::fd::{AsRawFd, RawFd},
4 path::Path,
5 sync::{Arc, atomic::AtomicU32},
6};
7
8use crate::generic_unix::{UnixFile, UnixFileUnmapData};
9
10#[repr(transparent)]
11pub struct File(RawFd);
12impl Drop for File {
13 #[inline]
14 fn drop(&mut self) {
15 let _ = unsafe { libc::close(self.0) };
16 }
17}
18impl File {
19 #[inline]
20 pub fn open(pathname: &core::ffi::CStr, flags: core::ffi::c_int) -> std::io::Result<Self> {
21 let fd = unsafe { libc::open(pathname.as_ptr(), flags) };
22 if fd < 0 {
23 Err(std::io::Error::last_os_error())
24 } else {
25 Ok(Self(fd))
26 }
27 }
28
29 #[inline]
30 pub fn lseek64(&self, offset: libc::off64_t, whence: core::ffi::c_int) -> std::io::Result<u64> {
31 let r = unsafe { libc::lseek64(self.0, offset, whence) };
32 if r < 0 {
33 Err(std::io::Error::last_os_error())
34 } else {
35 Ok(r.cast_unsigned())
36 }
37 }
38}
39
40#[repr(transparent)]
41pub struct NativeFileBlobRandomReader(UnixFile);
42impl NativeFileBlobRandomReader {
43 #[inline(always)]
44 pub fn open(name: impl AsRef<Path>) -> std::io::Result<Self> {
45 Ok(Self(UnixFile::open(
46 &std::ffi::CString::new(name.as_ref().to_str().expect("invalid utf-8 sequence"))
47 .expect("invalid for cstr"),
48 libc::O_CLOEXEC,
49 )?))
50 }
51}
52impl super::BlobMetadata for NativeFileBlobRandomReader {
53 fn byte_length(&self) -> std::io::Result<u64> {
54 let mut stat = core::mem::MaybeUninit::uninit();
55 self.0.stat64(&mut stat)?;
56 let stat = unsafe { stat.assume_init_ref() };
57
58 Ok(stat.st_size.cast_unsigned())
59 }
60}
61impl super::RandomReadBlob for NativeFileBlobRandomReader {
62 #[inline(always)]
63 fn read(&self, pos: u64, buf: &mut [core::mem::MaybeUninit<u8>]) -> std::io::Result<usize> {
64 self.0.pread(pos as _, buf)
65 }
66
67 #[inline(always)]
68 fn readv(&self, pos: u64, buf: &mut [std::io::IoSliceMut]) -> std::io::Result<usize> {
69 self.0.preadv(pos as _, unsafe {
70 core::mem::transmute::<&mut [std::io::IoSliceMut], &mut [libc::iovec]>(buf)
71 })
72 }
73}
74impl super::MemoryMapBlob for NativeFileBlobRandomReader {
75 type MemoryUnmapData = UnixFileUnmapData;
76
77 #[inline(always)]
78 fn mmap(
79 &self,
80 offs: u64,
81 len: usize,
82 ) -> std::io::Result<(*mut core::ffi::c_void, Self::MemoryUnmapData)> {
83 let r = self
84 .0
85 .mmap(len, libc::PROT_READ, libc::MAP_PRIVATE, offs as _)?;
86
87 Ok((r.data_addr(), r))
88 }
89
90 #[inline(always)]
91 fn munmap(&self, data: Self::MemoryUnmapData) -> std::io::Result<()> {
92 data.unmap()
93 }
94}
95
96pub struct MemoryUnmapData {
97 addr: *mut core::ffi::c_void,
98 len: usize,
99}
100
101#[repr(transparent)]
102pub struct NativeFileAsyncBlobRandomReader(File);
103impl NativeFileAsyncBlobRandomReader {
104 #[inline]
105 pub fn open(name: impl AsRef<Path>) -> std::io::Result<Self> {
106 Ok(Self(File::open(
107 &std::ffi::CString::new(name.as_ref().to_str().expect("invalid utf-8 sequence"))
108 .expect("invalid for cstr"),
109 libc::O_CLOEXEC | libc::O_NONBLOCK,
110 )?))
111 }
112}
113impl super::BlobMetadataAsync for NativeFileAsyncBlobRandomReader {
114 #[inline(always)]
115 fn byte_length_async(&self) -> impl core::future::Future<Output = std::io::Result<u64>> {
116 AsyncNativeFileByteLengthFuture {
117 fd: self,
118 buf: core::mem::MaybeUninit::uninit(),
119 state: Arc::new(Cell::new(AsyncNativeFileReadState::Init)),
120 }
121 }
122}
123impl super::RandomReadBlobAsync for NativeFileAsyncBlobRandomReader {
124 type ReadFuture<'a, 'b>
125 = AsyncNativeFileReadFuture<'a, 'b>
126 where
127 Self: 'a;
128 type ReadVecFuture<'a, 'b, 'b2>
129 = AsyncNativeFileReadVecFuture<'a, 'b, 'b2>
130 where
131 Self: 'a,
132 'b2: 'b;
133
134 #[inline(always)]
135 fn read_async<'a, 'b>(
136 &'a self,
137 pos: u64,
138 buf: &'b mut [core::mem::MaybeUninit<u8>],
139 ) -> Self::ReadFuture<'a, 'b> {
140 AsyncNativeFileReadFuture {
141 fd: self,
142 pos,
143 buf,
144 state: Arc::new(Cell::new(AsyncNativeFileReadState::Init)),
145 }
146 }
147
148 #[inline(always)]
149 fn readv_async<'a, 'b, 'b2>(
150 &'a self,
151 pos: u64,
152 buf: &'b mut [std::io::IoSliceMut<'b2>],
153 ) -> Self::ReadVecFuture<'a, 'b, 'b2> {
154 AsyncNativeFileReadVecFuture {
155 fd: self,
156 pos,
157 iovecs: buf,
158 state: Arc::new(Cell::new(AsyncNativeFileReadState::Init)),
159 }
160 }
161}
162impl super::MemoryMapBlob for NativeFileAsyncBlobRandomReader {
163 type MemoryUnmapData = MemoryUnmapData;
164
165 #[inline]
166 fn mmap(
167 &self,
168 offs: u64,
169 len: usize,
170 ) -> std::io::Result<(*mut core::ffi::c_void, Self::MemoryUnmapData)> {
171 let r = unsafe {
172 libc::mmap(
173 core::ptr::null_mut(),
174 len,
175 libc::PROT_READ,
176 libc::MAP_PRIVATE,
177 self.0.0,
178 offs as _,
179 )
180 };
181 if r == libc::MAP_FAILED {
182 Err(std::io::Error::last_os_error())
183 } else {
184 Ok((r, MemoryUnmapData { addr: r, len }))
185 }
186 }
187
188 #[inline]
189 fn munmap(&self, data: Self::MemoryUnmapData) -> std::io::Result<()> {
190 let r = unsafe { libc::munmap(data.addr, data.len) };
191 if r < 0 {
192 Err(std::io::Error::last_os_error())
193 } else {
194 Ok(())
195 }
196 }
197}
198
199#[derive(Clone, Copy)]
200pub enum AsyncNativeFileReadState {
201 Init,
202 Pending,
203 CompletedSuccess(usize),
204 CompletedFailure(i32),
205}
206
207pub struct AsyncNativeFileReadFuture<'a, 'b> {
208 fd: &'a NativeFileAsyncBlobRandomReader,
209 pos: u64,
210 buf: &'b mut [core::mem::MaybeUninit<u8>],
211 state: Arc<Cell<AsyncNativeFileReadState>>,
212}
213impl<'a, 'b> Future for AsyncNativeFileReadFuture<'a, 'b> {
214 type Output = std::io::Result<usize>;
215
216 fn poll(
217 self: std::pin::Pin<&mut Self>,
218 cx: &mut std::task::Context<'_>,
219 ) -> std::task::Poll<Self::Output> {
220 let this = self.get_mut();
221
222 match this.state.get() {
223 AsyncNativeFileReadState::Init => {
224 IoReactorHandle::current()
226 .expect("no reactor running")
227 .pusher
228 .push(|sqe| unsafe {
229 core::ptr::write_volatile(&mut sqe.fd, this.fd.0.0);
230 core::ptr::write_volatile(
231 &mut sqe.opcode,
232 linux_io_uring::ffi::IORING_OP_READ as _,
233 );
234 core::ptr::write_volatile(&mut sqe.union1.off, this.pos);
235 core::ptr::write_volatile(
236 &mut sqe.union2.addr,
237 this.buf.as_mut_ptr() as usize as _,
238 );
239 core::ptr::write_volatile(&mut sqe.len, this.buf.len() as _);
240 core::ptr::write_volatile(
242 &mut sqe.user_data,
243 Box::into_raw(Box::new(ReadFutureQueueData::Read {
244 state: Arc::downgrade(&this.state),
245 waker: cx.waker().clone(),
246 })) as usize as _,
247 );
248 });
249
250 this.state.set(AsyncNativeFileReadState::Pending);
251 core::task::Poll::Pending
252 }
253 AsyncNativeFileReadState::Pending => core::task::Poll::Pending,
254 AsyncNativeFileReadState::CompletedSuccess(res) => core::task::Poll::Ready(Ok(res)),
255 AsyncNativeFileReadState::CompletedFailure(e) => {
256 core::task::Poll::Ready(Err(std::io::Error::from_raw_os_error(e)))
257 }
258 }
259 }
260}
261
262pub struct AsyncNativeFileReadVecFuture<'a, 'b, 'b2> {
263 fd: &'a NativeFileAsyncBlobRandomReader,
264 pos: u64,
265 iovecs: &'b mut [std::io::IoSliceMut<'b2>],
266 state: Arc<Cell<AsyncNativeFileReadState>>,
267}
268impl<'a, 'b, 'b2> Future for AsyncNativeFileReadVecFuture<'a, 'b, 'b2> {
269 type Output = std::io::Result<usize>;
270
271 fn poll(
272 self: std::pin::Pin<&mut Self>,
273 cx: &mut std::task::Context<'_>,
274 ) -> std::task::Poll<Self::Output> {
275 let this = self.get_mut();
276
277 match this.state.get() {
278 AsyncNativeFileReadState::Init => {
279 IoReactorHandle::current()
282 .expect("no reactor running")
283 .pusher
284 .push(|sqe| unsafe {
285 core::ptr::write_volatile(&mut sqe.fd, this.fd.0.0);
286 core::ptr::write_volatile(
287 &mut sqe.opcode,
288 linux_io_uring::ffi::IORING_OP_READV as _,
289 );
290 core::ptr::write_volatile(&mut sqe.union1.off, this.pos);
291 core::ptr::write_volatile(
292 &mut sqe.union2.addr,
293 this.iovecs.as_mut_ptr() as usize as _,
294 );
295 core::ptr::write_volatile(&mut sqe.len, this.iovecs.len() as _);
296 core::ptr::write_volatile(
298 &mut sqe.user_data,
299 Box::into_raw(Box::new(ReadFutureQueueData::Read {
300 state: Arc::downgrade(&this.state),
301 waker: cx.waker().clone(),
302 })) as usize as _,
303 );
304 });
305
306 this.state.set(AsyncNativeFileReadState::Pending);
307 core::task::Poll::Pending
308 }
309 AsyncNativeFileReadState::Pending => core::task::Poll::Pending,
310 AsyncNativeFileReadState::CompletedSuccess(res) => core::task::Poll::Ready(Ok(res)),
311 AsyncNativeFileReadState::CompletedFailure(e) => {
312 core::task::Poll::Ready(Err(std::io::Error::from_raw_os_error(e)))
313 }
314 }
315 }
316}
317
318const EMPTY_CSTR: &'static core::ffi::CStr =
319 unsafe { core::ffi::CStr::from_bytes_with_nul_unchecked(&[0]) };
320
321pub struct AsyncNativeFileByteLengthFuture<'a> {
322 fd: &'a NativeFileAsyncBlobRandomReader,
323 buf: core::mem::MaybeUninit<libc::statx>,
324 state: Arc<Cell<AsyncNativeFileReadState>>,
325}
326impl<'a> Future for AsyncNativeFileByteLengthFuture<'a> {
327 type Output = std::io::Result<u64>;
328
329 fn poll(
330 self: std::pin::Pin<&mut Self>,
331 cx: &mut std::task::Context<'_>,
332 ) -> std::task::Poll<Self::Output> {
333 let this = self.get_mut();
334
335 match this.state.get() {
336 AsyncNativeFileReadState::Init => {
337 IoReactorHandle::current()
339 .expect("no reactor running")
340 .pusher
341 .push(|sqe| unsafe {
342 core::ptr::write_volatile(&mut sqe.fd, this.fd.0.0);
343 core::ptr::write_volatile(
344 &mut sqe.opcode,
345 linux_io_uring::ffi::IORING_OP_STATX as _,
346 );
347 core::ptr::write_volatile(&mut sqe.union2.addr, EMPTY_CSTR.as_ptr() as _);
348 core::ptr::write_volatile(
349 &mut sqe.union3.statx_flags,
350 libc::AT_EMPTY_PATH as _,
351 );
352 core::ptr::write_volatile(&mut sqe.len, libc::STATX_SIZE);
353 core::ptr::write_volatile(
354 &mut sqe.union1.off,
355 this.buf.as_mut_ptr().addr() as _,
356 );
357 core::ptr::write_volatile(
359 &mut sqe.user_data,
360 Box::into_raw(Box::new(ReadFutureQueueData::Read {
361 state: Arc::downgrade(&this.state),
362 waker: cx.waker().clone(),
363 })) as usize as _,
364 );
365 });
366
367 this.state.set(AsyncNativeFileReadState::Pending);
368 core::task::Poll::Pending
369 }
370 AsyncNativeFileReadState::Pending => core::task::Poll::Pending,
371 AsyncNativeFileReadState::CompletedSuccess(_) => {
372 core::task::Poll::Ready(Ok(unsafe { this.buf.assume_init_ref().stx_size }))
373 }
374 AsyncNativeFileReadState::CompletedFailure(e) => {
375 core::task::Poll::Ready(Err(std::io::Error::from_raw_os_error(e)))
376 }
377 }
378 }
379}
380
381pub enum ReadFutureQueueData {
382 Read {
383 state: std::sync::Weak<Cell<AsyncNativeFileReadState>>,
384 waker: core::task::Waker,
385 },
386}
387
388#[derive(Clone)]
389pub struct IoReactorHandle {
390 pusher: SubmissionQueuePusher,
391}
392impl IoReactorHandle {
393 #[inline]
394 pub fn current() -> Option<Self> {
395 IO_REACTOR_CURRENT_HANDLE.write().expect("poisoned").clone()
396 }
397}
398
399static IO_REACTOR_CURRENT_HANDLE: std::sync::RwLock<Option<IoReactorHandle>> =
400 std::sync::RwLock::new(None);
401
402struct IoUringContext {
403 uring: linux_io_uring::IoUring,
404 sq_ptr: *mut core::ffi::c_void,
405 sq_size: usize,
406 cq: Option<(*mut core::ffi::c_void, usize)>,
408 sring_tail_ptr: *mut u32,
409 sring_mask_ptr: *mut u32,
410 sring_array_head_ptr: *mut u32,
411 cring_head_ptr: *mut u32,
412 cring_tail_ptr: *mut u32,
413 cring_mask_ptr: *mut u32,
414 cqes_ptr: *mut linux_io_uring::ffi::io_uring_cqe,
415 sqes: *mut linux_io_uring::ffi::io_uring_sqe,
416 sqes_size: usize,
417}
418unsafe impl Sync for IoUringContext {}
419unsafe impl Send for IoUringContext {}
420impl Drop for IoUringContext {
421 fn drop(&mut self) {
422 let _ = unsafe { libc::munmap(self.sq_ptr, self.sq_size) };
423 if let Some((ptr, size)) = self.cq.take() {
424 let _ = unsafe { libc::munmap(ptr, size) };
425 }
426 let _ = unsafe { libc::munmap(self.sqes.cast(), self.sqes_size) };
427 }
428}
429impl IoUringContext {
430 fn new() -> Self {
431 let mut params = linux_io_uring::ffi::io_uring_params {
432 ..unsafe { core::mem::MaybeUninit::zeroed().assume_init() }
433 };
434 let uring = linux_io_uring::IoUring::new(32, &mut params).expect("IoUring::new");
435
436 let is_shared_cq_sq = (params.features & linux_io_uring::ffi::IORING_FEAT_SINGLE_MMAP) != 0;
437 let mut sring_size = params.sq_off.array as usize
438 + params.sq_entries as usize * core::mem::size_of::<core::ffi::c_uint>();
439 let mut cring_size = params.cq_off.cqes as usize
440 + params.cq_entries as usize
441 * core::mem::size_of::<linux_io_uring::ffi::io_uring_cqe>();
442 if is_shared_cq_sq {
443 let ring_size = sring_size.max(cring_size);
445 sring_size = ring_size;
446 cring_size = ring_size;
447 }
448 let sq_ptr = unsafe {
449 libc::mmap(
450 core::ptr::null_mut(),
451 sring_size,
452 libc::PROT_READ | libc::PROT_WRITE,
453 libc::MAP_SHARED | libc::MAP_POPULATE,
454 uring.as_raw_fd(),
455 linux_io_uring::ffi::IORING_OFF_SQ_RING as _,
456 )
457 };
458 if sq_ptr == libc::MAP_FAILED {
459 panic!("sq map failed");
460 }
461
462 let cq_ptr = if is_shared_cq_sq {
463 sq_ptr
464 } else {
465 let p = unsafe {
466 libc::mmap(
467 core::ptr::null_mut(),
468 cring_size,
469 libc::PROT_READ | libc::PROT_WRITE,
470 libc::MAP_SHARED | libc::MAP_POPULATE,
471 uring.as_raw_fd(),
472 linux_io_uring::ffi::IORING_OFF_CQ_RING as _,
473 )
474 };
475 if p == libc::MAP_FAILED {
476 panic!("cq map failed");
477 }
478 p
479 };
480
481 let sqes_size =
482 params.sq_entries as usize * core::mem::size_of::<linux_io_uring::ffi::io_uring_sqe>();
483 let sqes = unsafe {
484 libc::mmap(
485 core::ptr::null_mut(),
486 sqes_size,
487 libc::PROT_READ | libc::PROT_WRITE,
488 libc::MAP_SHARED | libc::MAP_POPULATE,
489 uring.as_raw_fd(),
490 linux_io_uring::ffi::IORING_OFF_SQES as _,
491 )
492 };
493 if sqes == libc::MAP_FAILED {
494 panic!("sqes map failed");
495 }
496
497 let sring_tail = unsafe { sq_ptr.byte_add(params.sq_off.tail as _) };
498 let sring_mask = unsafe { sq_ptr.byte_add(params.sq_off.ring_mask as _) };
499 let sring_array = unsafe { sq_ptr.byte_add(params.sq_off.array as _) };
500 let cring_head = unsafe { cq_ptr.byte_add(params.cq_off.head as _) };
501 let cring_tail = unsafe { cq_ptr.byte_add(params.cq_off.tail as _) };
502 let cring_mask = unsafe { cq_ptr.byte_add(params.cq_off.ring_mask as _) };
503 let cqes = unsafe { cq_ptr.byte_add(params.cq_off.cqes as _) };
504
505 Self {
506 uring,
507 sq_ptr,
508 sq_size: sring_size,
509 cq: if is_shared_cq_sq {
510 None
511 } else {
512 Some((cq_ptr, cring_size))
513 },
514 sring_tail_ptr: sring_tail.cast(),
515 sring_mask_ptr: sring_mask.cast(),
516 sring_array_head_ptr: sring_array.cast(),
517 cring_head_ptr: cring_head.cast(),
518 cring_tail_ptr: cring_tail.cast(),
519 cring_mask_ptr: cring_mask.cast(),
520 cqes_ptr: cqes.cast(),
521 sqes: sqes.cast(),
522 sqes_size,
523 }
524 }
525
526 #[inline(always)]
527 const fn sring_tail(&self) -> &AtomicU32 {
528 unsafe { AtomicU32::from_ptr(self.sring_tail_ptr) }
529 }
530
531 #[inline(always)]
532 const fn cring_head(&self) -> &AtomicU32 {
533 unsafe { AtomicU32::from_ptr(self.cring_head_ptr) }
534 }
535
536 #[inline(always)]
537 const fn cring_tail(&self) -> &AtomicU32 {
538 unsafe { AtomicU32::from_ptr(self.cring_tail_ptr) }
539 }
540}
541
542struct CompletionQueueTaker {
543 context: Arc<IoUringContext>,
544}
545impl CompletionQueueTaker {
546 fn new(context: &Arc<IoUringContext>) -> Self {
547 Self {
548 context: context.clone(),
549 }
550 }
551
552 fn try_take(&self, process: impl FnOnce(&linux_io_uring::ffi::io_uring_cqe)) -> bool {
553 let head = unsafe { *self.context.cring_head_ptr };
554 if head
555 == self
556 .context
557 .cring_tail()
558 .load(core::sync::atomic::Ordering::Acquire)
559 {
560 return false;
562 }
563
564 let index = head & unsafe { *self.context.cring_mask_ptr };
565 process(unsafe { &*self.context.cqes_ptr.add(index as _) });
566 self.context
567 .cring_head()
568 .store(head + 1, core::sync::atomic::Ordering::Release);
569 true
570 }
571}
572
573#[derive(Clone)]
574struct SubmissionQueuePusher {
575 context: Arc<IoUringContext>,
576}
577impl SubmissionQueuePusher {
578 fn new(context: &Arc<IoUringContext>) -> Self {
579 Self {
580 context: context.clone(),
581 }
582 }
583
584 fn push(&self, describe_io: impl FnOnce(&mut linux_io_uring::ffi::io_uring_sqe)) {
585 let tail = unsafe { *self.context.sring_tail_ptr };
586 let index = tail & unsafe { *self.context.sring_mask_ptr };
587 describe_io(unsafe { &mut *self.context.sqes.add(index as _) });
588 unsafe {
589 core::ptr::write_volatile(self.context.sring_array_head_ptr.add(index as _), index);
590 }
591 self.context
592 .sring_tail()
593 .store(tail + 1, core::sync::atomic::Ordering::Release);
594 self.context
595 .uring
596 .enter(1, 0, 0, core::ptr::null_mut())
597 .expect("io_uring_enter");
598 }
599}
600
601pub struct IoReactorThread {
602 join_handle: Option<std::thread::JoinHandle<()>>,
603}
604impl Drop for IoReactorThread {
605 fn drop(&mut self) {
606 let Some(join_handle) = self.join_handle.take() else {
607 return;
609 };
610 let Some(reactor_handle) = IO_REACTOR_CURRENT_HANDLE.write().expect("poisoned").take()
611 else {
612 return;
614 };
615
616 reactor_handle.pusher.push(|sqe| {
617 sqe.opcode = linux_io_uring::ffi::IORING_OP_MSG_RING as _;
618 sqe.flags = linux_io_uring::ffi::IOSQE_CQE_SKIP_SUCCESS;
619 sqe.fd = reactor_handle.pusher.context.uring.as_raw_fd();
620 sqe.len = 0;
621 sqe.union1.off = 0;
622 sqe.union2.addr = 0; });
624 join_handle.join().expect("err in IoReactorThread");
625 }
626}
627impl IoReactorThread {
628 pub fn spawn() -> Self {
629 let uring = Arc::new(IoUringContext::new());
630 let cq_taker = CompletionQueueTaker::new(&uring);
631
632 *IO_REACTOR_CURRENT_HANDLE.write().expect("poisoned") = Some(IoReactorHandle {
633 pusher: SubmissionQueuePusher::new(&uring),
634 });
635
636 let join_handle = std::thread::Builder::new()
637 .name("Peridot NativeIO Reactor".into())
638 .spawn({
639 let uring = uring.clone();
640
641 move || {
642 let mut terminated = false;
643 while !terminated {
644 uring
645 .uring
646 .enter(
647 0,
648 1,
649 linux_io_uring::ffi::IORING_ENTER_GETEVENTS,
650 core::ptr::null_mut(),
651 )
652 .expect("uring enter for wait events");
653 cq_taker.try_take(|cqe| {
654 let res = cqe.res;
656 let user_data = cqe.user_data;
657
658 if user_data == 0 {
659 terminated = true;
661 return;
662 }
663
664 let queue_data: Box<ReadFutureQueueData> = unsafe {
665 Box::from_raw(core::ptr::with_exposed_provenance_mut(
666 user_data as _,
667 ))
668 };
669 match &*queue_data {
670 ReadFutureQueueData::Read { state, waker } => {
671 if let Some(st) = state.upgrade() {
672 st.set(if res < 0 {
673 AsyncNativeFileReadState::CompletedFailure(-res)
674 } else {
675 AsyncNativeFileReadState::CompletedSuccess(
676 res.cast_unsigned() as _,
677 )
678 });
679 }
680
681 waker.wake_by_ref();
682 }
683 }
684 });
685 }
686 }
687 })
688 .expect("Failed to spawn async fileio thread");
689
690 Self {
691 join_handle: Some(join_handle),
692 }
693 }
694}