diesel/pg/connection/
copy.rs

1use core::ffi;
2use std::io::BufRead;
3use std::io::Read;
4use std::io::Write;
5
6use super::raw::RawConnection;
7use super::result::PgResult;
8use crate::QueryResult;
9
10#[allow(missing_debug_implementations)] // `PgConnection` is not debug
11pub(in crate::pg) struct CopyFromSink<'conn> {
12    conn: &'conn mut RawConnection,
13}
14
15impl<'conn> CopyFromSink<'conn> {
16    pub(super) fn new(conn: &'conn mut RawConnection) -> Self {
17        Self { conn }
18    }
19
20    pub(super) fn finish(self, err: Option<String>) -> QueryResult<()> {
21        self.conn.finish_copy_from(err)
22    }
23}
24
25impl Write for CopyFromSink<'_> {
26    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
27        self.conn
28            .put_copy_data(buf)
29            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
30        Ok(buf.len())
31    }
32
33    fn flush(&mut self) -> std::io::Result<()> {
34        Ok(())
35    }
36}
37
38#[allow(missing_debug_implementations)] // `PgConnection` is not debug
39pub struct CopyToBuffer<'conn> {
40    conn: &'conn mut RawConnection,
41    ptr: *mut ffi::c_char,
42    offset: usize,
43    len: usize,
44    result: PgResult,
45}
46
47impl<'conn> CopyToBuffer<'conn> {
48    pub(super) fn new(conn: &'conn mut RawConnection, result: PgResult) -> Self {
49        Self {
50            conn,
51            ptr: std::ptr::null_mut(),
52            offset: 0,
53            len: 0,
54            result,
55        }
56    }
57
58    #[allow(unsafe_code)] // construct a slice from a raw ptr
59    pub(crate) fn data_slice(&self) -> &[u8] {
60        if !self.ptr.is_null() && self.offset < self.len {
61            let slice = unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.len - 1) };
62            &slice[self.offset..]
63        } else {
64            &[]
65        }
66    }
67
68    pub(crate) fn get_result(&self) -> &PgResult {
69        &self.result
70    }
71}
72
73impl Drop for CopyToBuffer<'_> {
74    #[allow(unsafe_code)] // ffi code
75    fn drop(&mut self) {
76        if !self.ptr.is_null() {
77            unsafe { pq_sys::PQfreemem(self.ptr as *mut ffi::c_void) };
78            self.ptr = std::ptr::null_mut();
79        }
80    }
81}
82
83impl Read for CopyToBuffer<'_> {
84    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
85        let data = self.fill_buf()?;
86        let len = usize::min(buf.len(), data.len());
87        buf[..len].copy_from_slice(&data[..len]);
88        self.consume(len);
89        Ok(len)
90    }
91}
92
93impl BufRead for CopyToBuffer<'_> {
94    #[allow(unsafe_code)] // ffi code + ptr arithmetic
95    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
96        if self.data_slice().is_empty() {
97            unsafe {
98                if !self.ptr.is_null() {
99                    pq_sys::PQfreemem(self.ptr as *mut ffi::c_void);
100                    self.ptr = std::ptr::null_mut();
101                }
102                let len =
103                    pq_sys::PQgetCopyData(self.conn.internal_connection.as_ptr(), &mut self.ptr, 0);
104                match len {
105                    len if len >= 0 => {
106                        self.len = 1 + usize::try_from(len)
107                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
108                    }
109                    -1 => self.len = 0,
110                    _ => {
111                        let error = self.conn.last_error_message();
112                        return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
113                    }
114                }
115                self.offset = 0;
116            }
117        }
118        Ok(self.data_slice())
119    }
120
121    fn consume(&mut self, amt: usize) {
122        self.offset = usize::min(self.len, self.offset + amt);
123    }
124}