diesel/pg/connection/
copy.rs

1use core::ffi;
2use std::io::BufRead;
3use std::io::Read;
4use std::io::Write;
5
6use super::raw::RawConnection;
7use super::result::PgResult;
8use crate::QueryResult;
9
10#[allow(missing_debug_implementations)] // `PgConnection` is not debug
11pub struct CopyFromSink<'conn> {
12    conn: &'conn mut RawConnection,
13}
14
15impl<'conn> CopyFromSink<'conn> {
16    pub(super) fn new(conn: &'conn mut RawConnection) -> Self {
17        Self { conn }
18    }
19
20    pub(super) fn finish(self, err: Option<String>) -> QueryResult<()> {
21        self.conn.finish_copy_from(err)
22    }
23}
24
25impl Write for CopyFromSink<'_> {
26    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
27        self.conn
28            .put_copy_data(buf)
29            .map_err(std::io::Error::other)?;
30        Ok(buf.len())
31    }
32
33    fn flush(&mut self) -> std::io::Result<()> {
34        Ok(())
35    }
36}
37
38#[allow(missing_debug_implementations)] // `PgConnection` is not debug
39pub struct CopyToBuffer<'conn> {
40    conn: &'conn mut RawConnection,
41    ptr: *mut ffi::c_char,
42    offset: usize,
43    len: usize,
44    result: PgResult,
45}
46
47impl<'conn> CopyToBuffer<'conn> {
48    pub(super) fn new(conn: &'conn mut RawConnection, result: PgResult) -> Self {
49        Self {
50            conn,
51            ptr: std::ptr::null_mut(),
52            offset: 0,
53            len: 0,
54            result,
55        }
56    }
57
58    #[allow(unsafe_code)] // construct a slice from a raw ptr
59    pub(crate) fn data_slice(&self) -> &[u8] {
60        if !self.ptr.is_null() && self.offset < self.len {
61            let slice = unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.len - 1) };
62            &slice[self.offset..]
63        } else {
64            &[]
65        }
66    }
67
68    pub(crate) fn get_result(&self) -> &PgResult {
69        &self.result
70    }
71}
72
73impl Drop for CopyToBuffer<'_> {
74    #[allow(unsafe_code)] // ffi code
75    fn drop(&mut self) {
76        if !self.ptr.is_null() {
77            unsafe { pq_sys::PQfreemem(self.ptr as *mut ffi::c_void) };
78            self.ptr = std::ptr::null_mut();
79        }
80    }
81}
82
83impl Read for CopyToBuffer<'_> {
84    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
85        let data = self.fill_buf()?;
86        let len = usize::min(buf.len(), data.len());
87        buf[..len].copy_from_slice(&data[..len]);
88        self.consume(len);
89        Ok(len)
90    }
91}
92
93impl BufRead for CopyToBuffer<'_> {
94    #[allow(unsafe_code)] // ffi code + ptr arithmetic
95    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
96        if self.data_slice().is_empty() {
97            unsafe {
98                if !self.ptr.is_null() {
99                    pq_sys::PQfreemem(self.ptr as *mut ffi::c_void);
100                    self.ptr = std::ptr::null_mut();
101                }
102                let len =
103                    pq_sys::PQgetCopyData(self.conn.internal_connection.as_ptr(), &mut self.ptr, 0);
104                match len {
105                    len if len >= 0 => {
106                        self.len = 1 + usize::try_from(len).map_err(std::io::Error::other)?
107                    }
108                    -1 => self.len = 0,
109                    _ => {
110                        let error = self.conn.last_error_message();
111                        return Err(std::io::Error::other(error));
112                    }
113                }
114                self.offset = 0;
115            }
116        }
117        Ok(self.data_slice())
118    }
119
120    fn consume(&mut self, amt: usize) {
121        self.offset = usize::min(self.len, self.offset + amt);
122    }
123}