1use super::private::QueryFragmentHelper;
2use super::raw::RawConnection;
3use super::result::PgResult;
4use super::row::PgRow;
5use std::rc::Rc;
6
7#[allow(missing_debug_implementations)]
8pub struct Cursor {
9 current_row: usize,
10 db_result: Rc<PgResult>,
11}
12
13impl Cursor {
14 pub(super) fn new(result: PgResult, conn: &mut RawConnection) -> crate::QueryResult<Cursor> {
15 let next_res = conn.get_next_result()?;
16 debug_assert!(next_res.is_none());
17 Ok(Self {
18 current_row: 0,
19 db_result: Rc::new(result),
20 })
21 }
22}
23
24impl ExactSizeIterator for Cursor {
25 fn len(&self) -> usize {
26 self.db_result.num_rows() - self.current_row
27 }
28}
29
30impl Iterator for Cursor {
31 type Item = crate::QueryResult<PgRow>;
32
33 fn next(&mut self) -> Option<Self::Item> {
34 if self.current_row < self.db_result.num_rows() {
35 let row = self.db_result.clone().get_row(self.current_row);
36 self.current_row += 1;
37 Some(Ok(row))
38 } else {
39 None
40 }
41 }
42
43 fn nth(&mut self, n: usize) -> Option<Self::Item> {
44 self.current_row = (self.current_row + n).min(self.db_result.num_rows());
45 self.next()
46 }
47
48 fn size_hint(&self) -> (usize, Option<usize>) {
49 let len = self.len();
50 (len, Some(len))
51 }
52
53 fn count(self) -> usize
54 where
55 Self: Sized,
56 {
57 self.len()
58 }
59}
60
61#[allow(missing_debug_implementations)]
64pub struct RowByRowCursor<'conn, 'query> {
65 first_row: bool,
66 db_result: Rc<PgResult>,
67 conn: &'conn mut super::ConnectionAndTransactionManager,
68 query: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
69}
70
71impl<'conn, 'query> RowByRowCursor<'conn, 'query> {
72 pub(super) fn new(
73 db_result: PgResult,
74 conn: &'conn mut super::ConnectionAndTransactionManager,
75 query: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
76 ) -> Self {
77 RowByRowCursor {
78 first_row: true,
79 db_result: Rc::new(db_result),
80 conn,
81 query,
82 }
83 }
84}
85
86impl Iterator for RowByRowCursor<'_, '_> {
87 type Item = crate::QueryResult<PgRow>;
88
89 fn next(&mut self) -> Option<Self::Item> {
90 if !self.first_row {
91 let get_next_result = super::update_transaction_manager_status(
92 self.conn.raw_connection.get_next_result(),
93 self.conn,
94 &|callback| self.query.instrumentation(callback),
95 false,
96 );
97 match get_next_result {
98 Ok(Some(res)) => {
99 if let Some(old_res) = Rc::get_mut(&mut self.db_result) {
101 *old_res = res;
102 } else {
103 self.db_result = Rc::new(res);
104 }
105 }
106 Ok(None) => {
107 return None;
108 }
109 Err(e) => return Some(Err(e)),
110 }
111 }
112 if self.db_result.num_rows() > 0 {
114 debug_assert_eq!(self.db_result.num_rows(), 1);
115 self.first_row = false;
116 Some(Ok(self.db_result.clone().get_row(0)))
117 } else {
118 None
119 }
120 }
121}
122
123impl Drop for RowByRowCursor<'_, '_> {
124 fn drop(&mut self) {
125 loop {
126 let res = super::update_transaction_manager_status(
127 self.conn.raw_connection.get_next_result(),
128 self.conn,
129 &|callback| self.query.instrumentation(callback),
130 false,
131 );
132 if matches!(res, Err(_) | Ok(None)) {
133 if res.is_ok() {
135 self.query.instrumentation(&mut |query| {
136 self.conn.instrumentation.on_connection_event(
137 crate::connection::InstrumentationEvent::FinishQuery {
138 query,
139 error: None,
140 },
141 );
142 });
143 }
144 break;
145 }
146 }
147 }
148}
149
#[cfg(test)]
mod tests {
    use crate::connection::DefaultLoadingMode;
    use crate::pg::PgRowByRowLoadingMode;

    // Exercises the cursor returned for `DefaultLoadingMode` three ways:
    // zipped against the expected data, collected into a `Vec`, and stepped
    // manually. It finally checks that the first row can still be read after
    // the cursor has been exhausted.
    #[diesel_test_helper::test]
    fn fun_with_row_iters() {
        crate::table! {
            #[allow(unused_parens)]
            users(id) {
                id -> Integer,
                name -> Text,
            }
        }

        use crate::connection::LoadConnection;
        use crate::deserialize::{FromSql, FromSqlRow};
        use crate::pg::Pg;
        use crate::prelude::*;
        use crate::row::{Field, Row};
        use crate::sql_types;

        // SQL type of the `(id, name)` rows selected below.
        type UserColumns = (sql_types::Integer, sql_types::Text);

        let connection = &mut crate::test_helpers::connection();

        let create_table =
            "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);";
        crate::sql_query(create_table).execute(connection).unwrap();

        let seed_rows = vec![
            (users::id.eq(1), users::name.eq("Sean")),
            (users::id.eq(2), users::name.eq("Tess")),
        ];
        crate::insert_into(users::table)
            .values(seed_rows)
            .execute(connection)
            .unwrap();

        let query = users::table.select((users::id, users::name));

        let expected = vec![(1, String::from("Sean")), (2, String::from("Tess"))];

        // 1) Deserialize each row while the cursor is being consumed.
        let zipped = LoadConnection::<DefaultLoadingMode>::load(connection, query)
            .unwrap()
            .zip(&expected);
        for (row, want) in zipped {
            let row = row.unwrap();

            let got =
                <(i32, String) as FromSqlRow<UserColumns, _>>::build_from_row(&row).unwrap();

            assert_eq!(&got, want);
        }

        // 2) Collect all rows first and deserialize them afterwards.
        {
            let collected: Vec<_> = LoadConnection::<DefaultLoadingMode>::load(connection, query)
                .unwrap()
                .collect();

            for (row, want) in collected.iter().zip(&expected) {
                let row = row.as_ref().unwrap();

                let got =
                    <(i32, String) as FromSqlRow<UserColumns, _>>::build_from_row(row).unwrap();

                assert_eq!(&got, want);
            }
        }

        // 3) Step the cursor by hand and read the raw field values.
        let mut cursor = LoadConnection::<DefaultLoadingMode>::load(connection, query).unwrap();

        let first_row = cursor.next().unwrap().unwrap();
        let (first_id, first_name) = (first_row.get(0).unwrap(), first_row.get(1).unwrap());
        let (first_id_raw, first_name_raw) = (first_id.value(), first_name.value());

        let second_row = cursor.next().unwrap().unwrap();
        let (second_id, second_name) = (second_row.get(0).unwrap(), second_row.get(1).unwrap());
        let (second_id_raw, second_name_raw) = (second_id.value(), second_name.value());

        assert!(cursor.next().is_none());

        assert_eq!(
            <i32 as FromSql<sql_types::Integer, Pg>>::from_nullable_sql(first_id_raw).unwrap(),
            expected[0].0
        );
        assert_eq!(
            <String as FromSql<sql_types::Text, Pg>>::from_nullable_sql(first_name_raw).unwrap(),
            expected[0].1
        );

        assert_eq!(
            <i32 as FromSql<sql_types::Integer, Pg>>::from_nullable_sql(second_id_raw).unwrap(),
            expected[1].0
        );
        assert_eq!(
            <String as FromSql<sql_types::Text, Pg>>::from_nullable_sql(second_name_raw).unwrap(),
            expected[1].1
        );

        // The first row must still be readable after the cursor is exhausted.
        let (first_id, first_name) = (first_row.get(0).unwrap(), first_row.get(1).unwrap());
        let (first_id_raw, first_name_raw) = (first_id.value(), first_name.value());

        assert_eq!(
            <i32 as FromSql<sql_types::Integer, Pg>>::from_nullable_sql(first_id_raw).unwrap(),
            expected[0].0
        );
        assert_eq!(
            <String as FromSql<sql_types::Text, Pg>>::from_nullable_sql(first_name_raw).unwrap(),
            expected[0].1
        );
    }

    // Loads the same column with both loading modes and checks that the
    // results agree with each other and with the inserted names.
    #[diesel_test_helper::test]
    fn loading_modes_return_the_same_result() {
        use crate::prelude::*;

        crate::table! {
            #[allow(unused_parens)]
            users(id) {
                id -> Integer,
                name -> Text,
            }
        }

        let connection = &mut crate::test_helpers::connection();

        let create_table =
            "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);";
        crate::sql_query(create_table).execute(connection).unwrap();

        let seed_rows = vec![
            (users::id.eq(1), users::name.eq("Sean")),
            (users::id.eq(2), users::name.eq("Tess")),
        ];
        crate::insert_into(users::table)
            .values(seed_rows)
            .execute(connection)
            .unwrap();

        let names_default: Vec<String> = users::table
            .select(users::name)
            .load_iter::<String, DefaultLoadingMode>(connection)
            .unwrap()
            .collect::<QueryResult<_>>()
            .unwrap();
        let names_row_by_row: Vec<String> = users::table
            .select(users::name)
            .load_iter::<String, PgRowByRowLoadingMode>(connection)
            .unwrap()
            .collect::<QueryResult<_>>()
            .unwrap();

        assert_eq!(names_default, names_row_by_row);
        assert_eq!(names_default, vec!["Sean", "Tess"]);
    }

    // Same scenario as `fun_with_row_iters`, driven through the cursor
    // returned for `PgRowByRowLoadingMode`.
    #[diesel_test_helper::test]
    fn fun_with_row_iters_row_by_row() {
        crate::table! {
            #[allow(unused_parens)]
            users(id) {
                id -> Integer,
                name -> Text,
            }
        }

        use crate::connection::LoadConnection;
        use crate::deserialize::{FromSql, FromSqlRow};
        use crate::pg::Pg;
        use crate::prelude::*;
        use crate::row::{Field, Row};
        use crate::sql_types;

        // SQL type of the `(id, name)` rows selected below.
        type UserColumns = (sql_types::Integer, sql_types::Text);

        let connection = &mut crate::test_helpers::connection();

        let create_table =
            "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);";
        crate::sql_query(create_table).execute(connection).unwrap();

        let seed_rows = vec![
            (users::id.eq(1), users::name.eq("Sean")),
            (users::id.eq(2), users::name.eq("Tess")),
        ];
        crate::insert_into(users::table)
            .values(seed_rows)
            .execute(connection)
            .unwrap();

        let query = users::table.select((users::id, users::name));

        let expected = vec![(1, String::from("Sean")), (2, String::from("Tess"))];

        // 1) Deserialize each row while the cursor is being consumed.
        let zipped = LoadConnection::<PgRowByRowLoadingMode>::load(connection, query)
            .unwrap()
            .zip(&expected);
        for (row, want) in zipped {
            let row = row.unwrap();

            let got =
                <(i32, String) as FromSqlRow<UserColumns, _>>::build_from_row(&row).unwrap();

            assert_eq!(&got, want);
        }

        // 2) Collect all rows first and deserialize them afterwards.
        {
            let collected: Vec<_> =
                LoadConnection::<PgRowByRowLoadingMode>::load(connection, query)
                    .unwrap()
                    .collect();

            for (row, want) in collected.iter().zip(&expected) {
                let row = row.as_ref().unwrap();

                let got =
                    <(i32, String) as FromSqlRow<UserColumns, _>>::build_from_row(row).unwrap();

                assert_eq!(&got, want);
            }
        }

        // 3) Step the cursor by hand and read the raw field values.
        let mut cursor =
            LoadConnection::<PgRowByRowLoadingMode>::load(connection, query).unwrap();

        let first_row = cursor.next().unwrap().unwrap();
        let (first_id, first_name) = (first_row.get(0).unwrap(), first_row.get(1).unwrap());
        let (first_id_raw, first_name_raw) = (first_id.value(), first_name.value());

        let second_row = cursor.next().unwrap().unwrap();
        let (second_id, second_name) = (second_row.get(0).unwrap(), second_row.get(1).unwrap());
        let (second_id_raw, second_name_raw) = (second_id.value(), second_name.value());

        assert!(cursor.next().is_none());

        assert_eq!(
            <i32 as FromSql<sql_types::Integer, Pg>>::from_nullable_sql(first_id_raw).unwrap(),
            expected[0].0
        );
        assert_eq!(
            <String as FromSql<sql_types::Text, Pg>>::from_nullable_sql(first_name_raw).unwrap(),
            expected[0].1
        );

        assert_eq!(
            <i32 as FromSql<sql_types::Integer, Pg>>::from_nullable_sql(second_id_raw).unwrap(),
            expected[1].0
        );
        assert_eq!(
            <String as FromSql<sql_types::Text, Pg>>::from_nullable_sql(second_name_raw).unwrap(),
            expected[1].1
        );

        // The first row must still be readable after the cursor is exhausted.
        let (first_id, first_name) = (first_row.get(0).unwrap(), first_row.get(1).unwrap());
        let (first_id_raw, first_name_raw) = (first_id.value(), first_name.value());

        assert_eq!(
            <i32 as FromSql<sql_types::Integer, Pg>>::from_nullable_sql(first_id_raw).unwrap(),
            expected[0].0
        );
        assert_eq!(
            <String as FromSql<sql_types::Text, Pg>>::from_nullable_sql(first_name_raw).unwrap(),
            expected[0].1
        );
    }
}