|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
18 | | -use std::sync::Arc; |
19 | | - |
20 | | -use crate::{ |
21 | | - errors::{PyDataFusionError, PyDataFusionResult}, |
22 | | - expr::PyExpr, |
23 | | -}; |
| 18 | +use crate::{errors::PyDataFusionResult, expr::PyExpr}; |
24 | 19 | use datafusion::logical_expr::conditional_expressions::CaseBuilder; |
25 | | -use parking_lot::{Mutex, MutexGuard}; |
| 20 | +use datafusion::prelude::Expr; |
26 | 21 | use pyo3::prelude::*; |
27 | 22 |
|
28 | | -struct CaseBuilderHandle<'a> { |
29 | | - guard: MutexGuard<'a, Option<CaseBuilder>>, |
30 | | - builder: Option<CaseBuilder>, |
31 | | -} |
32 | | - |
33 | | -impl<'a> CaseBuilderHandle<'a> { |
34 | | - fn new(mut guard: MutexGuard<'a, Option<CaseBuilder>>) -> PyDataFusionResult<Self> { |
35 | | - let builder = guard.take().ok_or_else(|| { |
36 | | - PyDataFusionError::Common("CaseBuilder has already been consumed".to_string()) |
37 | | - })?; |
38 | | - |
39 | | - Ok(Self { |
40 | | - guard, |
41 | | - builder: Some(builder), |
42 | | - }) |
43 | | - } |
44 | | - |
45 | | - fn builder_mut(&mut self) -> &mut CaseBuilder { |
46 | | - self.builder |
47 | | - .as_mut() |
48 | | - .expect("builder should be present while handle is alive") |
49 | | - } |
50 | | - |
51 | | - fn into_inner(mut self) -> CaseBuilder { |
52 | | - self.builder |
53 | | - .take() |
54 | | - .expect("builder should be present when consuming handle") |
55 | | - } |
56 | | -} |
57 | | - |
58 | | -impl Drop for CaseBuilderHandle<'_> { |
59 | | - fn drop(&mut self) { |
60 | | - if let Some(builder) = self.builder.take() { |
61 | | - *self.guard = Some(builder); |
62 | | - } |
63 | | - } |
64 | | -} |
65 | | - |
| 23 | +// TODO(tsaucer) replace this all with CaseBuilder after it implements Clone |
| 24 | +#[derive(Clone, Debug)] |
66 | 25 | #[pyclass(name = "CaseBuilder", module = "datafusion.expr", subclass, frozen)] |
67 | | -#[derive(Clone)] |
68 | 26 | pub struct PyCaseBuilder { |
69 | | - case_builder: Arc<Mutex<Option<CaseBuilder>>>, |
70 | | -} |
71 | | - |
72 | | -impl From<CaseBuilder> for PyCaseBuilder { |
73 | | - fn from(case_builder: CaseBuilder) -> PyCaseBuilder { |
74 | | - PyCaseBuilder { |
75 | | - case_builder: Arc::new(Mutex::new(Some(case_builder))), |
76 | | - } |
77 | | - } |
| 27 | + expr: Option<Expr>, |
| 28 | + when: Vec<Expr>, |
| 29 | + then: Vec<Expr>, |
78 | 30 | } |
79 | 31 |
|
| 32 | +#[pymethods] |
80 | 33 | impl PyCaseBuilder { |
81 | | - fn case_builder_handle(&self) -> PyDataFusionResult<CaseBuilderHandle<'_>> { |
82 | | - let guard = self.case_builder.lock(); |
83 | | - CaseBuilderHandle::new(guard) |
| 34 | + #[new] |
| 35 | + pub fn new(expr: Option<PyExpr>) -> Self { |
| 36 | + Self { |
| 37 | + expr: expr.map(Into::into), |
| 38 | + when: vec![], |
| 39 | + then: vec![], |
| 40 | + } |
84 | 41 | } |
85 | 42 |
|
86 | | - pub fn into_case_builder(self) -> PyDataFusionResult<CaseBuilder> { |
87 | | - let guard = self.case_builder.lock(); |
88 | | - CaseBuilderHandle::new(guard).map(CaseBuilderHandle::into_inner) |
89 | | - } |
90 | | -} |
| 43 | + pub fn when(&self, when: PyExpr, then: PyExpr) -> PyCaseBuilder { |
| 44 | + println!("when called {self:?}"); |
| 45 | + let mut case_builder = self.clone(); |
| 46 | + case_builder.when.push(when.into()); |
| 47 | + case_builder.then.push(then.into()); |
91 | 48 |
|
92 | | -#[pymethods] |
93 | | -impl PyCaseBuilder { |
94 | | - fn when(&self, when: PyExpr, then: PyExpr) -> PyDataFusionResult<PyCaseBuilder> { |
95 | | - let mut handle = self.case_builder_handle()?; |
96 | | - let next_builder = handle.builder_mut().when(when.expr, then.expr); |
97 | | - Ok(next_builder.into()) |
| 49 | + case_builder |
98 | 50 | } |
99 | 51 |
|
100 | 52 | fn otherwise(&self, else_expr: PyExpr) -> PyDataFusionResult<PyExpr> { |
101 | | - let mut handle = self.case_builder_handle()?; |
102 | | - match handle.builder_mut().otherwise(else_expr.expr) { |
103 | | - Ok(expr) => Ok(expr.clone().into()), |
104 | | - Err(err) => Err(err.into()), |
105 | | - } |
| 53 | + println!("otherwise called {self:?}"); |
| 54 | + let case_builder = CaseBuilder::new( |
| 55 | + self.expr.clone().map(Box::new), |
| 56 | + self.when.clone(), |
| 57 | + self.then.clone(), |
| 58 | + Some(Box::new(else_expr.into())), |
| 59 | + ); |
| 60 | + |
| 61 | + let expr = case_builder.end()?; |
| 62 | + |
| 63 | + Ok(expr.into()) |
106 | 64 | } |
107 | 65 |
|
108 | 66 | fn end(&self) -> PyDataFusionResult<PyExpr> { |
109 | | - let mut handle = self.case_builder_handle()?; |
110 | | - match handle.builder_mut().end() { |
111 | | - Ok(expr) => Ok(expr.clone().into()), |
112 | | - Err(err) => Err(err.into()), |
113 | | - } |
| 67 | + println!("end called {self:?}"); |
| 68 | + |
| 69 | + let case_builder = CaseBuilder::new( |
| 70 | + self.expr.clone().map(Box::new), |
| 71 | + self.when.clone(), |
| 72 | + self.then.clone(), |
| 73 | + None, |
| 74 | + ); |
| 75 | + |
| 76 | + let expr = case_builder.end()?; |
| 77 | + |
| 78 | + Ok(expr.into()) |
114 | 79 | } |
115 | 80 | } |
0 commit comments