diesel/pg/connection/
copy.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use core::ffi;
use std::io::BufRead;
use std::io::Read;
use std::io::Write;

use super::raw::RawConnection;
use super::result::PgResult;
use crate::QueryResult;

/// Sink that forwards written bytes to a `RawConnection` through `put_copy_data`.
///
/// The connection stays mutably borrowed for `'conn`, i.e. for as long as the sink exists.
#[allow(missing_debug_implementations)] // `PgConnection` is not debug
pub(in crate::pg) struct CopyFromSink<'conn> {
    // Connection that receives the copy data
    conn: &'conn mut RawConnection,
}

impl<'conn> CopyFromSink<'conn> {
    pub(super) fn new(conn: &'conn mut RawConnection) -> Self {
        Self { conn }
    }

    pub(super) fn finish(self, err: Option<String>) -> QueryResult<()> {
        self.conn.finish_copy_from(err)
    }
}

impl Write for CopyFromSink<'_> {
    /// Passes the whole of `buf` to `put_copy_data`.
    ///
    /// On success the full buffer length is reported as written; a failure is wrapped
    /// into an `std::io::Error` of kind `Other`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match self.conn.put_copy_data(buf) {
            Ok(_) => Ok(buf.len()),
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
        }
    }

    /// Does nothing and always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// Buffered reader over data fetched from a connection with `PQgetCopyData`.
///
/// Implements `Read` and `BufRead`. The buffer most recently handed out by libpq is
/// released with `PQfreemem` when it is replaced or when this value is dropped.
#[allow(missing_debug_implementations)] // `PgConnection` is not debug
pub struct CopyToBuffer<'conn> {
    // Connection the data is read from
    conn: &'conn mut RawConnection,
    // Buffer most recently filled in by `PQgetCopyData`; null while none is held
    ptr: *mut ffi::c_char,
    // Number of bytes of the current buffer that have already been consumed
    offset: usize,
    // Length reported by `PQgetCopyData` plus one; 0 when no data is available
    len: usize,
    // Result passed to `new`, exposed through `get_result`
    result: PgResult,
}

impl<'conn> CopyToBuffer<'conn> {
    pub(super) fn new(conn: &'conn mut RawConnection, result: PgResult) -> Self {
        Self {
            conn,
            ptr: std::ptr::null_mut(),
            offset: 0,
            len: 0,
            result,
        }
    }

    #[allow(unsafe_code)] // construct a slice from a raw ptr
    pub(crate) fn data_slice(&self) -> &[u8] {
        if !self.ptr.is_null() && self.offset < self.len {
            let slice = unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.len - 1) };
            &slice[self.offset..]
        } else {
            &[]
        }
    }

    pub(crate) fn get_result(&self) -> &PgResult {
        &self.result
    }
}

impl Drop for CopyToBuffer<'_> {
    #[allow(unsafe_code)] // ffi code
    fn drop(&mut self) {
        if !self.ptr.is_null() {
            unsafe { pq_sys::PQfreemem(self.ptr as *mut ffi::c_void) };
            self.ptr = std::ptr::null_mut();
        }
    }
}

impl Read for CopyToBuffer<'_> {
    /// Copies as many buffered bytes as fit into `buf`, marks them as consumed and
    /// returns the number of bytes copied.
    ///
    /// Any error from `fill_buf` is passed on unchanged.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available = self.fill_buf()?;
        let count = buf.len().min(available.len());
        let source = &available[..count];
        let target = &mut buf[..count];
        target.copy_from_slice(source);
        self.consume(count);
        Ok(count)
    }
}

impl BufRead for CopyToBuffer<'_> {
    /// Returns the unread bytes of the current buffer, fetching the next one from the
    /// connection with `PQgetCopyData` once the current buffer is exhausted.
    ///
    /// An empty slice is returned when `PQgetCopyData` reports `-1`.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` of kind `Other` that carries the connection's last
    /// error message when `PQgetCopyData` returns a value below `-1`, or the conversion
    /// error if the reported length does not fit into a `usize`.
    #[allow(unsafe_code)] // ffi code + ptr arithmetic
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        if self.data_slice().is_empty() {
            // SAFETY: `self.ptr` is only ever null or a buffer written by
            // `PQgetCopyData`, so a non-null value passed to `PQfreemem` is memory
            // handed out by libpq. NOTE(review): validity of the raw connection
            // pointer is the responsibility of `RawConnection` — confirm there.
            unsafe {
                // Release the previous buffer before asking libpq for a new one
                if !self.ptr.is_null() {
                    pq_sys::PQfreemem(self.ptr as *mut ffi::c_void);
                    self.ptr = std::ptr::null_mut();
                }
                let len =
                    pq_sys::PQgetCopyData(self.conn.internal_connection.as_ptr(), &mut self.ptr, 0);
                match len {
                    // `len` is stored one larger than the reported length;
                    // `data_slice` subtracts that extra byte again.
                    // NOTE(review): presumably this accounts for the trailing NUL
                    // libpq appends — see the `PQgetCopyData` documentation.
                    len if len >= 0 => {
                        self.len = 1 + usize::try_from(len)
                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
                    }
                    // No more data: a zero length makes `data_slice` return an empty slice
                    -1 => self.len = 0,
                    _ => {
                        let error = self.conn.last_error_message();
                        return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
                    }
                }
                self.offset = 0;
            }
        }
        Ok(self.data_slice())
    }

    /// Marks `amt` bytes of the current buffer as consumed.
    ///
    /// The offset never moves past `self.len`; values of `amt` larger than the
    /// remaining data simply exhaust the buffer.
    fn consume(&mut self, amt: usize) {
        // Saturate so an oversized `amt` clamps to `self.len` instead of overflowing,
        // which would panic in debug builds and could rewind `offset` in release builds.
        self.offset = usize::min(self.len, self.offset.saturating_add(amt));
    }
}