peridot_native_io/
adapter.rs1use pin_project::pin_project;
4use std::io::{Read, Seek};
5
/// Wraps a blob and remembers a byte offset so that the blob can be driven
/// through the cursor-style `Read` / `Seek` implementations in this file.
pub struct RandomBlobReadSeekAdapter<R> {
    /// The wrapped blob that every operation is forwarded to.
    inner: R,
    /// Byte offset passed to the next read; advanced by reads and replaced by seeks.
    pos: u64,
}

impl<R> RandomBlobReadSeekAdapter<R> {
    /// Wraps `inner` with the offset initialised to `0`.
    #[inline(always)]
    pub const fn new(inner: R) -> Self {
        let pos = 0;

        Self { inner, pos }
    }
}
16impl<R> Read for RandomBlobReadSeekAdapter<R>
17where
18 R: crate::RandomReadBlob,
19{
20 #[inline]
21 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
22 let transferred = self.inner.read(self.pos, unsafe {
23 core::mem::transmute::<&mut [_], &mut [core::mem::MaybeUninit<_>]>(buf)
24 })?;
25 self.pos += transferred as u64;
26
27 Ok(transferred)
28 }
29}
30impl<R> Seek for RandomBlobReadSeekAdapter<R>
31where
32 R: crate::BlobMetadata,
33{
34 #[inline]
35 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
36 let new_pos = match pos {
37 std::io::SeekFrom::Start(x) => x,
38 std::io::SeekFrom::Current(x) => (self.pos as i64 + x) as _,
39 std::io::SeekFrom::End(x) => (self.inner.byte_length()? as i64 + x) as _,
40 };
41 self.pos = new_pos;
42 Ok(new_pos)
43 }
44}
45
46#[pin_project(project = RandomBlobAsyncReadSeekAdapterStateProjected)]
47enum RandomBlobAsyncReadSeekAdapterState<'r, R: crate::RandomReadBlobAsync + 'r> {
48 Idle,
49 Reading(#[pin] R::ReadFuture<'r, 'static>),
50 ReadingVec(#[pin] R::ReadVecFuture<'r, 'static, 'static>),
51}
52
53#[pin_project]
54pub struct RandomBlobAsyncReadSeekAdapter<'r, R: crate::RandomReadBlobAsync + 'r> {
55 inner: &'r R,
56 pos: u64,
57 #[pin]
58 state: RandomBlobAsyncReadSeekAdapterState<'r, R>,
59}
60impl<'r, R: crate::RandomReadBlobAsync + 'r> RandomBlobAsyncReadSeekAdapter<'r, R> {
61 #[inline(always)]
62 pub const fn new(inner: &'r R) -> Self {
63 Self::with_pos(inner, 0)
64 }
65
66 #[inline(always)]
67 pub const fn with_pos(inner: &'r R, pos: u64) -> Self {
68 Self {
69 inner,
70 pos,
71 state: RandomBlobAsyncReadSeekAdapterState::Idle,
72 }
73 }
74}
75impl<'r, R: crate::RandomReadBlobAsync + 'r> futures_io::AsyncRead
76 for RandomBlobAsyncReadSeekAdapter<'r, R>
77{
78 fn poll_read(
79 self: std::pin::Pin<&mut Self>,
80 cx: &mut std::task::Context<'_>,
81 buf: &mut [u8],
82 ) -> std::task::Poll<std::io::Result<usize>> {
83 let mut this = self.project();
84
85 loop {
86 match this.state.as_mut().project() {
87 RandomBlobAsyncReadSeekAdapterStateProjected::Idle => {
88 this.state.set(RandomBlobAsyncReadSeekAdapterState::Reading(
89 this.inner.read_async(*this.pos, unsafe {
90 core::mem::transmute::<&mut [_], &mut [core::mem::MaybeUninit<_>]>(buf)
91 }),
92 ));
93 }
94 RandomBlobAsyncReadSeekAdapterStateProjected::Reading(f) => {
95 let r = std::task::ready!(f.poll(cx));
96 this.state.set(RandomBlobAsyncReadSeekAdapterState::Idle);
97 break std::task::Poll::Ready(r);
98 }
99 RandomBlobAsyncReadSeekAdapterStateProjected::ReadingVec(_) => {
100 panic!("poll_read called but poll_read_vectored is ongoing");
101 }
102 }
103 }
104 }
105
106 fn poll_read_vectored(
107 self: std::pin::Pin<&mut Self>,
108 cx: &mut std::task::Context<'_>,
109 bufs: &mut [std::io::IoSliceMut<'_>],
110 ) -> std::task::Poll<std::io::Result<usize>> {
111 let mut this = self.project();
112
113 loop {
114 match this.state.as_mut().project() {
115 RandomBlobAsyncReadSeekAdapterStateProjected::Idle => {
116 this.state
117 .set(RandomBlobAsyncReadSeekAdapterState::ReadingVec(
118 this.inner.readv_async(*this.pos, unsafe {
119 core::mem::transmute::<&mut [_], &mut [_]>(&mut *bufs)
120 }),
121 ));
122 }
123 RandomBlobAsyncReadSeekAdapterStateProjected::ReadingVec(f) => {
124 let r = std::task::ready!(f.poll(cx));
125 this.state.set(RandomBlobAsyncReadSeekAdapterState::Idle);
126 break std::task::Poll::Ready(r);
127 }
128 RandomBlobAsyncReadSeekAdapterStateProjected::Reading(_) => {
129 panic!("poll_read_vectored called but poll_read is ongoing");
130 }
131 }
132 }
133 }
134}
135impl<'r, R: crate::RandomReadBlobAsync + 'r> futures_io::AsyncSeek
136 for RandomBlobAsyncReadSeekAdapter<'r, R>
137{
138 fn poll_seek(
139 self: std::pin::Pin<&mut Self>,
140 _cx: &mut std::task::Context<'_>,
141 pos: std::io::SeekFrom,
142 ) -> std::task::Poll<std::io::Result<u64>> {
143 let pref = self.project().pos;
145 *pref = match pos {
146 std::io::SeekFrom::Start(x) => x,
147 std::io::SeekFrom::Current(x) => pref.checked_add_signed(x).ok_or_else(|| {
148 std::io::Error::new(
149 std::io::ErrorKind::InvalidInput,
150 "resulting file pointer is out of range!",
151 )
152 })?,
153 std::io::SeekFrom::End(x) => pref.checked_add_signed(x).ok_or_else(|| {
154 std::io::Error::new(
155 std::io::ErrorKind::InvalidInput,
156 "resulting file pointer is out of range!",
157 )
158 })?,
159 };
160
161 std::task::Poll::Ready(Ok(*pref))
162 }
163}