peridot_native_io/
adapter.rs

//! Adapter structs that implement other crates' I/O traits.
2
3use pin_project::pin_project;
4use std::io::{Read, Seek};
5
/// Wraps a blob together with a byte cursor so it can be driven through the
/// cursor-based [`std::io::Read`] and [`std::io::Seek`] traits.
pub struct RandomBlobReadSeekAdapter<R> {
    /// The wrapped blob.
    inner: R,
    /// Cursor position, in bytes, passed to the blob as the read offset.
    pos: u64,
}
impl<R> RandomBlobReadSeekAdapter<R> {
    /// Wraps `inner` with the cursor placed at offset 0.
    #[inline(always)]
    pub const fn new(inner: R) -> Self {
        Self { pos: 0, inner }
    }
}
16impl<R> Read for RandomBlobReadSeekAdapter<R>
17where
18    R: crate::RandomReadBlob,
19{
20    #[inline]
21    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
22        let transferred = self.inner.read(self.pos, unsafe {
23            core::mem::transmute::<&mut [_], &mut [core::mem::MaybeUninit<_>]>(buf)
24        })?;
25        self.pos += transferred as u64;
26
27        Ok(transferred)
28    }
29}
30impl<R> Seek for RandomBlobReadSeekAdapter<R>
31where
32    R: crate::BlobMetadata,
33{
34    #[inline]
35    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
36        let new_pos = match pos {
37            std::io::SeekFrom::Start(x) => x,
38            std::io::SeekFrom::Current(x) => (self.pos as i64 + x) as _,
39            std::io::SeekFrom::End(x) => (self.inner.byte_length()? as i64 + x) as _,
40        };
41        self.pos = new_pos;
42        Ok(new_pos)
43    }
44}
45
/// Operation currently in flight inside a `RandomBlobAsyncReadSeekAdapter`.
///
/// The buffer lifetime parameters of the stored futures are filled with `'static`;
/// the `poll_read*` methods transmute the caller's buffer to match.
/// NOTE(review): nothing in this type ties a stored future to the real lifetime of
/// the buffer it was created with — confirm callers keep the buffer alive (and pass
/// the same one) until the read completes.
#[pin_project(project = RandomBlobAsyncReadSeekAdapterStateProjected)]
enum RandomBlobAsyncReadSeekAdapterState<'r, R: crate::RandomReadBlobAsync + 'r> {
    /// No read is pending.
    Idle,
    /// A future returned by `read_async` that has not completed yet.
    Reading(#[pin] R::ReadFuture<'r, 'static>),
    /// A future returned by `readv_async` that has not completed yet.
    ReadingVec(#[pin] R::ReadVecFuture<'r, 'static, 'static>),
}
52
/// Wraps a borrowed blob together with a byte cursor so it can be driven through the
/// `futures_io` read/seek traits.
#[pin_project]
pub struct RandomBlobAsyncReadSeekAdapter<'r, R: crate::RandomReadBlobAsync + 'r> {
    /// Borrowed blob that reads are issued against.
    inner: &'r R,
    /// Offset, in bytes, handed to the blob when a new read is started.
    pos: u64,
    /// The read future currently in flight, if any; pinned so it can be polled in place.
    #[pin]
    state: RandomBlobAsyncReadSeekAdapterState<'r, R>,
}
60impl<'r, R: crate::RandomReadBlobAsync + 'r> RandomBlobAsyncReadSeekAdapter<'r, R> {
61    #[inline(always)]
62    pub const fn new(inner: &'r R) -> Self {
63        Self::with_pos(inner, 0)
64    }
65
66    #[inline(always)]
67    pub const fn with_pos(inner: &'r R, pos: u64) -> Self {
68        Self {
69            inner,
70            pos,
71            state: RandomBlobAsyncReadSeekAdapterState::Idle,
72        }
73    }
74}
75impl<'r, R: crate::RandomReadBlobAsync + 'r> futures_io::AsyncRead
76    for RandomBlobAsyncReadSeekAdapter<'r, R>
77{
78    fn poll_read(
79        self: std::pin::Pin<&mut Self>,
80        cx: &mut std::task::Context<'_>,
81        buf: &mut [u8],
82    ) -> std::task::Poll<std::io::Result<usize>> {
83        let mut this = self.project();
84
85        loop {
86            match this.state.as_mut().project() {
87                RandomBlobAsyncReadSeekAdapterStateProjected::Idle => {
88                    this.state.set(RandomBlobAsyncReadSeekAdapterState::Reading(
89                        this.inner.read_async(*this.pos, unsafe {
90                            core::mem::transmute::<&mut [_], &mut [core::mem::MaybeUninit<_>]>(buf)
91                        }),
92                    ));
93                }
94                RandomBlobAsyncReadSeekAdapterStateProjected::Reading(f) => {
95                    let r = std::task::ready!(f.poll(cx));
96                    this.state.set(RandomBlobAsyncReadSeekAdapterState::Idle);
97                    break std::task::Poll::Ready(r);
98                }
99                RandomBlobAsyncReadSeekAdapterStateProjected::ReadingVec(_) => {
100                    panic!("poll_read called but poll_read_vectored is ongoing");
101                }
102            }
103        }
104    }
105
106    fn poll_read_vectored(
107        self: std::pin::Pin<&mut Self>,
108        cx: &mut std::task::Context<'_>,
109        bufs: &mut [std::io::IoSliceMut<'_>],
110    ) -> std::task::Poll<std::io::Result<usize>> {
111        let mut this = self.project();
112
113        loop {
114            match this.state.as_mut().project() {
115                RandomBlobAsyncReadSeekAdapterStateProjected::Idle => {
116                    this.state
117                        .set(RandomBlobAsyncReadSeekAdapterState::ReadingVec(
118                            this.inner.readv_async(*this.pos, unsafe {
119                                core::mem::transmute::<&mut [_], &mut [_]>(&mut *bufs)
120                            }),
121                        ));
122                }
123                RandomBlobAsyncReadSeekAdapterStateProjected::ReadingVec(f) => {
124                    let r = std::task::ready!(f.poll(cx));
125                    this.state.set(RandomBlobAsyncReadSeekAdapterState::Idle);
126                    break std::task::Poll::Ready(r);
127                }
128                RandomBlobAsyncReadSeekAdapterStateProjected::Reading(_) => {
129                    panic!("poll_read_vectored called but poll_read is ongoing");
130                }
131            }
132        }
133    }
134}
135impl<'r, R: crate::RandomReadBlobAsync + 'r> futures_io::AsyncSeek
136    for RandomBlobAsyncReadSeekAdapter<'r, R>
137{
138    fn poll_seek(
139        self: std::pin::Pin<&mut Self>,
140        _cx: &mut std::task::Context<'_>,
141        pos: std::io::SeekFrom,
142    ) -> std::task::Poll<std::io::Result<u64>> {
143        // これは同期的にできる
144        let pref = self.project().pos;
145        *pref = match pos {
146            std::io::SeekFrom::Start(x) => x,
147            std::io::SeekFrom::Current(x) => pref.checked_add_signed(x).ok_or_else(|| {
148                std::io::Error::new(
149                    std::io::ErrorKind::InvalidInput,
150                    "resulting file pointer is out of range!",
151                )
152            })?,
153            std::io::SeekFrom::End(x) => pref.checked_add_signed(x).ok_or_else(|| {
154                std::io::Error::new(
155                    std::io::ErrorKind::InvalidInput,
156                    "resulting file pointer is out of range!",
157                )
158            })?,
159        };
160
161        std::task::Poll::Ready(Ok(*pref))
162    }
163}