peridot_native_io/
buffered.rs

1use core::ops::Range;
2use std::io::{IoSliceMut, Read};
3
/// A small read-ahead cache in front of a random-access blob reader `R`.
///
/// The cached bytes are stored in the *spare capacity* of `buf`, starting at
/// the beginning of its allocation; the readers in this file never raise the
/// `Vec`'s length, so `buf.len()` stays `0` and the cached data must be reached
/// through the allocation pointer rather than by indexing `buf`.
#[derive(Debug)]
pub struct BufferedRandomBlobReader<R> {
    /// The wrapped blob reader that all cache misses are forwarded to.
    inner: R,
    /// Backing allocation for the cache (see the type-level note on how it is used).
    buf: Vec<u8>,
    /// Half-open range of blob offsets currently held in `buf`; empty when
    /// nothing is cached.
    buffered_range: Range<u64>,
}
9impl<R> BufferedRandomBlobReader<R> {
10    const BUFFER_SIZE: usize = 4096;
11
12    pub fn new(inner: R) -> Self {
13        Self {
14            inner,
15            buf: Vec::with_capacity(Self::BUFFER_SIZE),
16            buffered_range: 0..0,
17        }
18    }
19
20    #[inline(always)]
21    pub const fn inner_ref(&self) -> &R {
22        &self.inner
23    }
24}
25impl<R> BufferedRandomBlobReader<R>
26where
27    R: crate::RandomReadBlob,
28{
29    pub fn hint_buffering(&mut self, start: u64) -> std::io::Result<()> {
30        if !self.buffered_range.is_empty() && self.buffered_range.contains(&start) {
31            // already buffered
32            return Ok(());
33        }
34
35        self.fill_buf_at(start)?;
36        Ok(())
37    }
38
39    fn fill_buf_at(&mut self, start: u64) -> std::io::Result<usize> {
40        let bufread = self.inner.read(start, self.buf.spare_capacity_mut())?;
41        self.buffered_range = start..start + bufread as u64;
42        Ok(bufread)
43    }
44
45    pub fn read_at(
46        &mut self,
47        pos: u64,
48        buf: &mut [core::mem::MaybeUninit<u8>],
49    ) -> std::io::Result<usize> {
50        self.hint_buffering(pos)?;
51
52        let offs = pos - self.buffered_range.start;
53        let fill_size = ((self.buffered_range.end - pos) as usize).min(buf.len());
54        unsafe {
55            core::ptr::copy_nonoverlapping(
56                self.buf.spare_capacity_mut().as_ptr().add(offs as usize),
57                buf.as_mut_ptr(),
58                fill_size,
59            );
60        }
61
62        Ok(fill_size)
63    }
64
65    pub fn readv_at(&mut self, pos: u64, iovecs: &mut [IoSliceMut]) -> std::io::Result<usize> {
66        self.hint_buffering(pos)?;
67
68        let offs = pos - self.buffered_range.start;
69        let max_size = self.buffered_range.end - pos;
70        unsafe {
71            core::slice::from_raw_parts(self.buf.as_ptr().add(offs as _), max_size as _)
72                .read_vectored(iovecs)
73        }
74    }
75
76    pub fn read_byte_at(&mut self, pos: u64) -> std::io::Result<u8> {
77        self.hint_buffering(pos)?;
78        Ok(self.buf[(pos - self.buffered_range.start) as usize])
79    }
80
81    pub fn read_exact_at(
82        &mut self,
83        pos: u64,
84        buf: &mut [core::mem::MaybeUninit<u8>],
85    ) -> std::io::Result<()> {
86        if self.buffered_range.contains(&pos)
87            && self.buffered_range.contains(&(pos + buf.len() as u64))
88        {
89            // perfectly contained
90            let offs = pos - self.buffered_range.start;
91            unsafe {
92                core::ptr::copy_nonoverlapping(
93                    self.buf.spare_capacity_mut().as_ptr().add(offs as usize),
94                    buf.as_mut_ptr(),
95                    buf.len(),
96                );
97            }
98
99            return Ok(());
100        }
101
102        // simple read from blob
103        self.inner.read_exact(pos, buf)
104    }
105
106    pub fn readv_all_at(
107        &mut self,
108        mut pos: u64,
109        mut iovecs: &mut [IoSliceMut],
110    ) -> std::io::Result<()> {
111        IoSliceMut::advance_slices(&mut iovecs, 0);
112        if iovecs.is_empty() {
113            // no iovecs or empty
114            return Ok(());
115        }
116
117        if self.buffered_range.contains(&pos)
118            && self
119                .buffered_range
120                .contains(&(pos + iovecs[0].len() as u64))
121        {
122            // can fill from buffer at least one slice
123            let offs = pos - self.buffered_range.start;
124            let max_size = self.buffered_range.end - pos;
125            let filled = unsafe {
126                core::slice::from_raw_parts(self.buf.as_ptr().add(offs as _), max_size as _)
127                    .read_vectored(iovecs)?
128            };
129            IoSliceMut::advance_slices(&mut iovecs, filled);
130            pos += filled as u64;
131        }
132
133        self.inner.readv_all(pos, iovecs)
134    }
135}
136impl<R> BufferedRandomBlobReader<R>
137where
138    R: crate::RandomReadBlobAsync,
139{
140    pub async fn hint_buffering_async(&mut self, start: u64) -> std::io::Result<()> {
141        if !self.buffered_range.is_empty() && self.buffered_range.contains(&start) {
142            // already buffered
143            return Ok(());
144        }
145
146        self.fill_buf_at_async(start).await?;
147        Ok(())
148    }
149
150    async fn fill_buf_at_async(&mut self, start: u64) -> std::io::Result<usize> {
151        let bufread = self
152            .inner
153            .read_async(start, self.buf.spare_capacity_mut())
154            .await?;
155        self.buffered_range = start..start + bufread as u64;
156        Ok(bufread)
157    }
158
159    pub async fn read_at_async(
160        &mut self,
161        pos: u64,
162        buf: &mut [core::mem::MaybeUninit<u8>],
163    ) -> std::io::Result<usize> {
164        self.hint_buffering_async(pos).await?;
165
166        let offs = pos - self.buffered_range.start;
167        let fill_size = ((self.buffered_range.end - pos) as usize).min(buf.len());
168        unsafe {
169            core::ptr::copy_nonoverlapping(
170                self.buf.spare_capacity_mut().as_ptr().add(offs as usize),
171                buf.as_mut_ptr(),
172                fill_size,
173            );
174        }
175
176        Ok(fill_size)
177    }
178
179    pub async fn readv_at_async(
180        &mut self,
181        pos: u64,
182        iovecs: &mut [IoSliceMut<'_>],
183    ) -> std::io::Result<usize> {
184        self.hint_buffering_async(pos).await?;
185
186        let offs = pos - self.buffered_range.start;
187        let max_size = self.buffered_range.end - pos;
188        unsafe {
189            core::slice::from_raw_parts(self.buf.as_ptr().add(offs as _), max_size as _)
190                .read_vectored(iovecs)
191        }
192    }
193
194    pub async fn read_byte_at_async(&mut self, pos: u64) -> std::io::Result<u8> {
195        self.hint_buffering_async(pos).await?;
196        Ok(self.buf[(pos - self.buffered_range.start) as usize])
197    }
198
199    pub async fn read_exact_at_async(
200        &mut self,
201        pos: u64,
202        buf: &mut [core::mem::MaybeUninit<u8>],
203    ) -> std::io::Result<()> {
204        if self.buffered_range.contains(&pos)
205            && self.buffered_range.contains(&(pos + buf.len() as u64))
206        {
207            // perfectly contained
208            let offs = pos - self.buffered_range.start;
209            unsafe {
210                core::ptr::copy_nonoverlapping(
211                    self.buf.spare_capacity_mut().as_ptr().add(offs as usize),
212                    buf.as_mut_ptr(),
213                    buf.len(),
214                );
215            }
216
217            return Ok(());
218        }
219
220        // simple read from blob
221        self.inner.read_exact_async(pos, buf).await
222    }
223
224    pub async fn readv_all_at_async(
225        &mut self,
226        mut pos: u64,
227        mut iovecs: &mut [IoSliceMut<'_>],
228    ) -> std::io::Result<()> {
229        IoSliceMut::advance_slices(&mut iovecs, 0);
230        if iovecs.is_empty() {
231            // no iovecs or empty
232            return Ok(());
233        }
234
235        if self.buffered_range.contains(&pos)
236            && self
237                .buffered_range
238                .contains(&(pos + iovecs[0].len() as u64))
239        {
240            // can fill from buffer at least one slice
241            let offs = pos - self.buffered_range.start;
242            let max_size = self.buffered_range.end - pos;
243            let filled = unsafe {
244                core::slice::from_raw_parts(self.buf.as_ptr().add(offs as _), max_size as _)
245                    .read_vectored(iovecs)?
246            };
247            IoSliceMut::advance_slices(&mut iovecs, filled);
248            pos += filled as u64;
249        }
250
251        self.inner.readv_all_async(pos, iovecs).await
252    }
253}