diesel/pg/connection/
copy.rs
1use core::ffi;
2use std::io::BufRead;
3use std::io::Read;
4use std::io::Write;
5
6use super::raw::RawConnection;
7use super::result::PgResult;
8use crate::QueryResult;
9
10#[allow(missing_debug_implementations)] pub(in crate::pg) struct CopyFromSink<'conn> {
12 conn: &'conn mut RawConnection,
13}
14
15impl<'conn> CopyFromSink<'conn> {
16 pub(super) fn new(conn: &'conn mut RawConnection) -> Self {
17 Self { conn }
18 }
19
20 pub(super) fn finish(self, err: Option<String>) -> QueryResult<()> {
21 self.conn.finish_copy_from(err)
22 }
23}
24
25impl Write for CopyFromSink<'_> {
26 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
27 self.conn
28 .put_copy_data(buf)
29 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
30 Ok(buf.len())
31 }
32
33 fn flush(&mut self) -> std::io::Result<()> {
34 Ok(())
35 }
36}
37
38#[allow(missing_debug_implementations)] pub struct CopyToBuffer<'conn> {
40 conn: &'conn mut RawConnection,
41 ptr: *mut ffi::c_char,
42 offset: usize,
43 len: usize,
44 result: PgResult,
45}
46
47impl<'conn> CopyToBuffer<'conn> {
48 pub(super) fn new(conn: &'conn mut RawConnection, result: PgResult) -> Self {
49 Self {
50 conn,
51 ptr: std::ptr::null_mut(),
52 offset: 0,
53 len: 0,
54 result,
55 }
56 }
57
58 #[allow(unsafe_code)] pub(crate) fn data_slice(&self) -> &[u8] {
60 if !self.ptr.is_null() && self.offset < self.len {
61 let slice = unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.len - 1) };
62 &slice[self.offset..]
63 } else {
64 &[]
65 }
66 }
67
68 pub(crate) fn get_result(&self) -> &PgResult {
69 &self.result
70 }
71}
72
73impl Drop for CopyToBuffer<'_> {
74 #[allow(unsafe_code)] fn drop(&mut self) {
76 if !self.ptr.is_null() {
77 unsafe { pq_sys::PQfreemem(self.ptr as *mut ffi::c_void) };
78 self.ptr = std::ptr::null_mut();
79 }
80 }
81}
82
83impl Read for CopyToBuffer<'_> {
84 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
85 let data = self.fill_buf()?;
86 let len = usize::min(buf.len(), data.len());
87 buf[..len].copy_from_slice(&data[..len]);
88 self.consume(len);
89 Ok(len)
90 }
91}
92
93impl BufRead for CopyToBuffer<'_> {
94 #[allow(unsafe_code)] fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
96 if self.data_slice().is_empty() {
97 unsafe {
98 if !self.ptr.is_null() {
99 pq_sys::PQfreemem(self.ptr as *mut ffi::c_void);
100 self.ptr = std::ptr::null_mut();
101 }
102 let len =
103 pq_sys::PQgetCopyData(self.conn.internal_connection.as_ptr(), &mut self.ptr, 0);
104 match len {
105 len if len >= 0 => {
106 self.len = 1 + usize::try_from(len)
107 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
108 }
109 -1 => self.len = 0,
110 _ => {
111 let error = self.conn.last_error_message();
112 return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
113 }
114 }
115 self.offset = 0;
116 }
117 }
118 Ok(self.data_slice())
119 }
120
121 fn consume(&mut self, amt: usize) {
122 self.offset = usize::min(self.len, self.offset + amt);
123 }
124}