2222import queue
2323import threading
2424import typing
25- from typing import Any , Iterator , Optional , Sequence , Tuple
25+ from typing import Any , Iterator , List , Literal , Optional , Sequence , Tuple , Union
2626
2727from google .cloud import bigquery_storage_v1
2828import google .cloud .bigquery as bq
2929import google .cloud .bigquery_storage_v1 .types as bq_storage_types
3030from google .protobuf import timestamp_pb2
3131import pyarrow as pa
3232
33+ import bigframes .constants
3334from bigframes .core import pyarrow_utils
3435import bigframes .core .schema
3536
3637if typing .TYPE_CHECKING :
3738 import bigframes .core .ordering as orderings
3839
3940
41+ def _resolve_standard_gcp_region (bq_region : str ):
42+ """
43+ Resolve bq regions to standardized
44+ """
45+ if bq_region .casefold () == "US" :
46+ return "us-central1"
47+ elif bq_region .casefold () == "EU" :
48+ return "europe-west4"
49+ return bq_region
50+
51+
def is_irc_table(table_id: str):
    """Whether a table id should be resolved through the Iceberg REST catalog.

    A 4-part id (project.catalog.namespace.table) is treated as a REST-catalog
    table; anything else is not.
    """
    # Four dot-separated parts <=> exactly three dots.
    return table_id.count(".") == 3
57+
58+
def is_compatible(
    data_region: Union[GcsRegion, BigQueryRegion], session_location: str
) -> bool:
    """Whether data in ``data_region`` is usable from a session at ``session_location``.

    Based on
    https://docs.cloud.google.com/bigquery/docs/locations#storage-location-considerations
    """
    if isinstance(data_region, BigQueryRegion):
        # Native BigQuery data: locations must match exactly.
        return data_region.name == session_location
    assert isinstance(data_region, GcsRegion)
    # TODO(b/463675088): Multi-regions don't yet support rest catalog tables
    if session_location in bigframes.constants.BIGQUERY_MULTIREGIONS:
        return False
    session_region = _resolve_standard_gcp_region(session_location)
    return session_region in data_region.included
71+
72+
def get_default_bq_region(data_region: Union[GcsRegion, BigQueryRegion]) -> str:
    """Pick a default BigQuery location for a session reading ``data_region``."""
    if isinstance(data_region, GcsRegion):
        # TODO: consider tracking and preferring the primary replica here.
        return data_region.included[0]
    if isinstance(data_region, BigQueryRegion):
        return data_region.name
79+
80+
@dataclasses.dataclass(frozen=True)
class BigQueryRegion:
    # A BigQuery location string (e.g. "us-central1", or a multi-region such
    # as "US"); compared directly against session locations in is_compatible.
    name: str
84+
85+
@dataclasses.dataclass(frozen=True)
class GcsRegion:
    # GCS region names; these may be multi-region names, so they should not be
    # compared with non-GCS locations.
    storage_regions: tuple[str, ...]
    # All included standard, specific regions (e.g. "us-east1"); comparable to
    # BigQuery regions (except the non-standard "US"/"EU" and omni regions).
    included: tuple[str, ...]
92+
93+
# The line between metadata and core fields is fuzzy: metadata fields are
# mostly optional or unreliable, while core identity/schema fields live on the
# table dataclasses themselves.
@dataclasses.dataclass(frozen=True)
class TableMetadata:
    # Where the data physically lives; used for location compatibility checks.
    location: Union[BigQueryRegion, GcsRegion]
    # Bug fix: the BigQuery API reports "MATERIALIZED_VIEW" (the previous
    # "MATERIALIZE_VIEW" literal was a typo and disagreed with the
    # is_physically_stored check). Annotation-only change, runtime-compatible.
    type: Literal["TABLE", "EXTERNAL", "VIEW", "MATERIALIZED_VIEW", "SNAPSHOT"]
    # Size metadata might be stale; don't use where strict correctness is needed.
    numBytes: Optional[int] = None
    numRows: Optional[int] = None
    created_time: Optional[datetime.datetime] = None
    modified_time: Optional[datetime.datetime] = None
104+
105+
@dataclasses.dataclass(frozen=True)
class GbqNativeTable:
    """A native BigQuery table/view/snapshot, identified by its
    project.dataset.table path, with its physical schema and metadata."""

    project_id: str = dataclasses.field()
    dataset_id: str = dataclasses.field()
    table_id: str = dataclasses.field()
    # BigQuery schema fields, possibly subset to the requested columns.
    physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
    metadata: TableMetadata = dataclasses.field()
    # NOTE(review): never populated by from_table/from_ref_and_schema below —
    # presumably set elsewhere by callers; confirm.
    partition_col: Optional[str] = None
    cluster_cols: typing.Optional[Tuple[str, ...]] = None
    primary_key: Optional[Tuple[str, ...]] = None

    @staticmethod
    def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqNativeTable:
        """Build a GbqNativeTable from a client-library ``bq.Table``.

        If ``columns`` is non-empty, ``physical_schema`` is restricted to
        those columns.
        """
        # Subsetting fields with columns can reduce cost of row-hash default ordering
        if columns:
            schema = tuple(item for item in table.schema if item.name in columns)
        else:
            schema = tuple(table.schema)

        metadata = TableMetadata(
            numBytes=table.num_bytes,
            numRows=table.num_rows,
            # NOTE(review): table.location is a plain string, but
            # TableMetadata.location declares BigQueryRegion | GcsRegion — the
            # type: ignore hides the mismatch; confirm downstream consumers
            # (is_compatible/get_default_bq_region) handle a bare string.
            location=table.location,  # type: ignore
            type=table.table_type,  # type: ignore
            created_time=table.created,
            modified_time=table.modified,
        )

        return GbqNativeTable(
            project_id=table.project,
            dataset_id=table.dataset_id,
            table_id=table.table_id,
            physical_schema=schema,
            cluster_cols=None
            if table.clustering_fields is None
            else tuple(table.clustering_fields),
            primary_key=tuple(_get_primary_keys(table)),
            metadata=metadata,
        )

    @staticmethod
    def from_ref_and_schema(
        table_ref: bq.TableReference,
        schema: Sequence[bq.SchemaField],
        location: str,
        table_type: Literal["TABLE"] = "TABLE",
        cluster_cols: Optional[Sequence[str]] = None,
    ) -> GbqNativeTable:
        """Build a GbqNativeTable from a table reference plus an explicit
        schema and location, without fetching the table."""
        return GbqNativeTable(
            project_id=table_ref.project,
            dataset_id=table_ref.dataset_id,
            table_id=table_ref.table_id,
            metadata=TableMetadata(location=BigQueryRegion(location), type=table_type),
            physical_schema=tuple(schema),
            cluster_cols=tuple(cluster_cols) if cluster_cols else None,
        )

    @property
    def is_physically_stored(self) -> bool:
        # True for types that own stored data (as opposed to logical views).
        return self.metadata.type in ["TABLE", "MATERIALIZED_VIEW"]

    def get_table_ref(self) -> bq.TableReference:
        """Return the client-library TableReference for this table."""
        return bq.TableReference(
            bq.DatasetReference(self.project_id, self.dataset_id), self.table_id
        )

    def get_full_id(self, quoted: bool = False) -> str:
        """Return the dotted 3-part id, backtick-quoting each part if ``quoted``."""
        if quoted:
            return f"`{self.project_id}`.`{self.dataset_id}`.`{self.table_id}`"
        return f"{self.project_id}.{self.dataset_id}.{self.table_id}"

    @property
    @functools.cache
    def schema_by_id(self):
        # Cached column-name -> SchemaField lookup.
        # NOTE(review): functools.cache on a method keys on ``self`` and keeps
        # every instance alive for the process lifetime (ruff B019); consider
        # functools.cached_property.
        return {col.name: col for col in self.physical_schema}
181+
182+
@dataclasses.dataclass(frozen=True)
class BiglakeIcebergTable:
    """A table resolved through the BigLake Iceberg REST catalog, identified
    by a 4-part project.catalog.namespace.table id."""

    project_id: str = dataclasses.field()
    catalog_id: str = dataclasses.field()
    namespace_id: str = dataclasses.field()
    table_id: str = dataclasses.field()
    physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
    cluster_cols: typing.Optional[Tuple[str, ...]]
    metadata: TableMetadata

    def get_full_id(self, quoted: bool = False) -> str:
        """Return the dotted 4-part id, backtick-quoting each part if ``quoted``."""
        if quoted:
            return f"`{self.project_id}`.`{self.catalog_id}`.`{self.namespace_id}`.`{self.table_id}`"
        return (
            f"{self.project_id}.{self.catalog_id}.{self.namespace_id}.{self.table_id}"
        )

    @property
    @functools.cache
    def schema_by_id(self):
        # Cached column-name -> SchemaField lookup.
        # NOTE(review): functools.cache on a method keys on ``self`` and keeps
        # every instance alive for the process lifetime (ruff B019).
        return {col.name: col for col in self.physical_schema}

    @property
    def partition_col(self) -> Optional[str]:
        # Always None; provided for attribute parity with GbqNativeTable.
        return None

    @property
    def primary_key(self) -> Optional[Tuple[str, ...]]:
        # Always None; provided for attribute parity with GbqNativeTable.
        return None
212+
92213
93214@dataclasses .dataclass (frozen = True )
94215class BigqueryDataSource :
@@ -104,13 +225,13 @@ def __post_init__(self):
104225 self .schema .names
105226 )
106227
107- table : GbqTable
228+ table : Union [ GbqNativeTable , BiglakeIcebergTable ]
108229 schema : bigframes .core .schema .ArraySchema
109230 at_time : typing .Optional [datetime .datetime ] = None
110231 # Added for backwards compatibility, not validated
111232 sql_predicate : typing .Optional [str ] = None
112233 ordering : typing .Optional [orderings .RowOrdering ] = None
113- # Optimization field
234+ # Optimization field, must be correct if set, don't put maybe-stale number here
114235 n_rows : Optional [int ] = None
115236
116237
@@ -188,6 +309,8 @@ def get_arrow_batches(
188309 project_id : str ,
189310 sample_rate : Optional [float ] = None ,
190311) -> ReadResult :
312+ assert isinstance (data .table , GbqNativeTable )
313+
191314 table_mod_options = {}
192315 read_options_dict : dict [str , Any ] = {"selected_fields" : list (columns )}
193316
@@ -245,3 +368,21 @@ def process_batch(pa_batch):
245368 return ReadResult (
246369 batches , session .estimated_row_count , session .estimated_total_bytes_scanned
247370 )
371+
372+
373+ def _get_primary_keys (
374+ table : bq .Table ,
375+ ) -> List [str ]:
376+ """Get primary keys from table if they are set."""
377+
378+ primary_keys : List [str ] = []
379+ if (
380+ (table_constraints := getattr (table , "table_constraints" , None )) is not None
381+ and (primary_key := table_constraints .primary_key ) is not None
382+ # This will be False for either None or empty list.
383+ # We want primary_keys = None if no primary keys are set.
384+ and (columns := primary_key .columns )
385+ ):
386+ primary_keys = columns if columns is not None else []
387+
388+ return primary_keys
0 commit comments