|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 |
|
| 15 | +from __future__ import annotations |
| 16 | + |
| 17 | +import functools |
15 | 18 |
|
16 | 19 | from bigframes import dtypes |
| 20 | +from bigframes.core import agg_expressions, bigframe_node, expression, nodes |
| 21 | +from bigframes.core.rewrite import schema_binding |
| 22 | + |
| 23 | +IGNORED_NODES = ( |
| 24 | + nodes.SelectionNode, |
| 25 | + nodes.ReadLocalNode, |
| 26 | + nodes.ReadTableNode, |
| 27 | + nodes.ConcatNode, |
| 28 | + nodes.RandomSampleNode, |
| 29 | + nodes.FromRangeNode, |
| 30 | + nodes.PromoteOffsetsNode, |
| 31 | + nodes.ReversedNode, |
| 32 | + nodes.SliceNode, |
| 33 | + nodes.ResultNode, |
| 34 | +) |
| 35 | + |
| 36 | + |
| 37 | +def encode_type_refs(root: bigframe_node.BigFrameNode) -> str: |
| 38 | + return f"{root.reduce_up(_encode_type_refs_from_node):x}" |
| 39 | + |
| 40 | + |
| 41 | +def _encode_type_refs_from_node( |
| 42 | + node: bigframe_node.BigFrameNode, child_results: tuple[int, ...] |
| 43 | +) -> int: |
| 44 | + child_result = functools.reduce(lambda x, y: x | y, child_results, 0) |
| 45 | + |
| 46 | + curr_result = 0 |
| 47 | + if isinstance(node, nodes.FilterNode): |
| 48 | + curr_result = _encode_type_refs_from_expr(node.predicate, node.child) |
| 49 | + elif isinstance(node, nodes.ProjectionNode): |
| 50 | + for assignment in node.assignments: |
| 51 | + expr = assignment[0] |
| 52 | + if isinstance(expr, (expression.DerefOp)): |
| 53 | + # Ignore direct assignments in projection nodes. |
| 54 | + continue |
| 55 | + curr_result = curr_result | _encode_type_refs_from_expr( |
| 56 | + assignment[0], node.child |
| 57 | + ) |
| 58 | + elif isinstance(node, nodes.OrderByNode): |
| 59 | + for by in node.by: |
| 60 | + curr_result = curr_result | _encode_type_refs_from_expr( |
| 61 | + by.scalar_expression, node.child |
| 62 | + ) |
| 63 | + elif isinstance(node, nodes.JoinNode): |
| 64 | + for left, right in node.conditions: |
| 65 | + curr_result = ( |
| 66 | + curr_result |
| 67 | + | _encode_type_refs_from_expr(left, node.left_child) |
| 68 | + | _encode_type_refs_from_expr(right, node.right_child) |
| 69 | + ) |
| 70 | + elif isinstance(node, nodes.InNode): |
| 71 | + curr_result = _encode_type_refs_from_expr(node.left_col, node.left_child) |
| 72 | + elif isinstance(node, nodes.AggregateNode): |
| 73 | + for agg, _ in node.aggregations: |
| 74 | + curr_result = curr_result | _encode_type_refs_from_expr(agg, node.child) |
| 75 | + elif isinstance(node, nodes.WindowOpNode): |
| 76 | + for grouping_key in node.window_spec.grouping_keys: |
| 77 | + curr_result = curr_result | _encode_type_refs_from_expr( |
| 78 | + grouping_key, node.child |
| 79 | + ) |
| 80 | + for ordering_expr in node.window_spec.ordering: |
| 81 | + curr_result = curr_result | _encode_type_refs_from_expr( |
| 82 | + ordering_expr.scalar_expression, node.child |
| 83 | + ) |
| 84 | + for col_def in node.agg_exprs: |
| 85 | + curr_result = curr_result | _encode_type_refs_from_expr( |
| 86 | + col_def.expression, node.child |
| 87 | + ) |
| 88 | + elif isinstance(node, nodes.ExplodeNode): |
| 89 | + for col_id in node.column_ids: |
| 90 | + curr_result = curr_result | _encode_type_refs_from_expr(col_id, node.child) |
| 91 | + elif isinstance(node, IGNORED_NODES): |
| 92 | + # Do nothing |
| 93 | + pass |
| 94 | + else: |
| 95 | + # For unseen nodes, do not raise errors as this is the logging path, but |
| 96 | + # we should cover those nodes either in the branches above, or place them |
| 97 | + # in the IGNORED_NODES collection. |
| 98 | + pass |
| 99 | + |
| 100 | + return child_result | curr_result |
| 101 | + |
| 102 | + |
| 103 | +def _encode_type_refs_from_expr( |
| 104 | + expr: expression.Expression, child_node: bigframe_node.BigFrameNode |
| 105 | +) -> int: |
| 106 | + # TODO(b/409387790): Remove this branch once SQLGlot compiler fully replaces Ibis compiler |
| 107 | + if not expr.is_resolved: |
| 108 | + if isinstance(expr, agg_expressions.Aggregation): |
| 109 | + expr = schema_binding._bind_schema_to_aggregation_expr(expr, child_node) |
| 110 | + else: |
| 111 | + expr = expression.bind_schema_fields(expr, child_node.field_by_id) |
17 | 112 |
|
| 113 | + result = _get_dtype_mask(expr.output_type) |
| 114 | + for child_expr in expr.children: |
| 115 | + result = result | _encode_type_refs_from_expr(child_expr, child_node) |
18 | 116 |
|
19 | | -def _add_data_type(existing_types: int, curr_type: dtypes.Dtype) -> int: |
20 | | - return existing_types | _get_dtype_mask(curr_type) |
| 117 | + return result |
21 | 118 |
|
22 | 119 |
|
23 | | -def _get_dtype_mask(dtype: dtypes.Dtype) -> int: |
| 120 | +def _get_dtype_mask(dtype: dtypes.Dtype | None) -> int: |
| 121 | + if dtype is None: |
| 122 | + # If the dtype is not given, ignore |
| 123 | + return 0 |
24 | 124 | if dtype == dtypes.INT_DTYPE: |
25 | 125 | return 1 << 1 |
26 | 126 | if dtype == dtypes.FLOAT_DTYPE: |
|
0 commit comments