Skip to content

Commit 2ce3800

Browse files
committed
Add support for external table creation
1 parent c0c824b commit 2ce3800

File tree

3 files changed

+224
-1
lines changed

3 files changed

+224
-1
lines changed

README.rst

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,39 @@ To create an integer-range partitioned table
350350
bigquery_require_partition_filter=True,
351351
)
352352
353+
To create an external table backed by files in Google Cloud Storage:
354+
355+
.. code-block:: python
356+
357+
from google.cloud import bigquery
358+
359+
external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.PARQUET)
360+
external_config.source_uris = ["gs://my-bucket/path/to/files/*"]
361+
362+
table = Table('mytable', ...,
363+
prefixes=['EXTERNAL'],
364+
bigquery_external_data_configuration=external_config,
365+
)
366+
367+
To create an external table with hive partitioning:
368+
369+
.. code-block:: python
370+
371+
from google.cloud import bigquery
372+
373+
hive_partitioning = bigquery.HivePartitioningOptions()
374+
hive_partitioning.source_uri_prefix = "gs://my-bucket/path/to"
375+
hive_partitioning.require_partition_filter = True
376+
377+
external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.PARQUET)
378+
external_config.source_uris = ["gs://my-bucket/path/to/field=*/*"]
379+
external_config.hive_partitioning = hive_partitioning
380+
381+
table = Table('mytable', ...,
382+
prefixes=['EXTERNAL'],
383+
bigquery_external_data_configuration=external_config,
384+
)
385+
353386
354387
Threading and Multiprocessing
355388
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

sqlalchemy_bigquery/base.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121

2222
import datetime
2323
from decimal import Decimal
24+
import inspect
2425
import random
2526
import operator
2627
import uuid
2728

2829
from google import auth
2930
import google.api_core.exceptions
30-
from google.cloud.bigquery import dbapi, ConnectionProperty
31+
from google.cloud.bigquery import ExternalConfig, dbapi, ConnectionProperty
32+
from google.cloud.bigquery.external_config import HivePartitioningOptions
3133
from google.cloud.bigquery.table import (
3234
RangePartitioning,
3335
TableReference,
@@ -647,6 +649,9 @@ class BigQueryDDLCompiler(DDLCompiler):
647649
"expiration_timestamp": datetime.datetime,
648650
"require_partition_filter": bool,
649651
"default_rounding_mode": str,
652+
"format": str,
653+
"hive_partition_uri_prefix": str,
654+
"require_hive_partition_filter": bool,
650655
}
651656

652657
# BigQuery has no support for foreign keys.
@@ -671,6 +676,20 @@ def get_column_specification(self, column, **kwargs):
671676
)
672677
return colspec
673678

679+
def create_table_suffix(self, table):
    """
    Return the clause emitted between the table name and the column list.

    For an external table whose ``bigquery_external_data_configuration``
    carries hive partitioning options, this returns
    ``"WITH PARTITION COLUMNS"``; in every other case it returns an empty
    string so the CREATE TABLE statement is unchanged.

    NOTE(review): the compiled statement places this clause *before* the
    column definitions (``CREATE EXTERNAL TABLE t WITH PARTITION COLUMNS
    ( ... )``), while the BigQuery DDL grammar appears to list
    ``WITH PARTITION COLUMNS`` after the column list -- confirm against
    the BigQuery reference before relying on explicit columns together
    with hive partitioning.

    :param table: the table being created; its ``dialect_options["bigquery"]``
        mapping is consulted for ``external_data_configuration``.
    :return: ``"WITH PARTITION COLUMNS"`` or ``""``.
    """
    bq_opts = table.dialect_options["bigquery"]
    external_config = bq_opts.get("external_data_configuration")
    if not external_config:
        return ""
    # The suffix is rendered ahead of the trailing OPTIONS clause, where the
    # configuration's type is actually validated.  Read the attribute
    # defensively so a wrongly-typed value does not fail here with an
    # AttributeError and mask that clearer validation error.
    if getattr(external_config, "hive_partitioning", None) is not None:
        return "WITH PARTITION COLUMNS"
    return ""
692+
674693
def post_create_table(self, table):
675694
"""
676695
Constructs additional SQL clauses for table creation in BigQuery.
@@ -752,6 +771,34 @@ def post_create_table(self, table):
752771
self._validate_option_value_type("description", description)
753772
options["description"] = description
754773

774+
if external_config := bq_opts.get("external_data_configuration"):
775+
self._raise_for_type(
776+
"external_data_configuration", external_config, ExternalConfig
777+
)
778+
if not isinstance(external_config.source_uris, (list, str)):
779+
raise TypeError(
780+
"External table source_uris must be a list of strings"
781+
" (or a single string for Bigtable)"
782+
)
783+
options["format"] = external_config.source_format
784+
options["uris"] = external_config.source_uris
785+
if partitioning := external_config.hive_partitioning:
786+
self._raise_for_type(
787+
"external_data_configuration.hive_partitioning",
788+
partitioning,
789+
HivePartitioningOptions,
790+
)
791+
options["hive_partition_uri_prefix"] = partitioning.source_uri_prefix
792+
options[
793+
"require_hive_partition_filter"
794+
] = partitioning.require_partition_filter
795+
for name, _ in inspect.getmembers_static(
796+
external_config.options, lambda m: isinstance(m, property)
797+
):
798+
value = getattr(external_config.options, name)
799+
if value is not None:
800+
options[name] = value
801+
755802
for option in self.option_datatype_mapping:
756803
if option in bq_opts:
757804
options[option] = bq_opts.get(option)
@@ -978,6 +1025,7 @@ def _process_option_value(self, value):
9781025
float: lambda x: x,
9791026
bool: lambda x: "true" if x else "false",
9801027
datetime.datetime: lambda x: BQTimestamp.process_timestamp_literal(x),
1028+
list: lambda x: f'[{", ".join([self._process_option_value(item) for item in x])}]',
9811029
}
9821030

9831031
if (option_cast := option_casting.get(type(value))) is not None:

tests/unit/test_table_options.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@
2323
import sqlalchemy
2424

2525
from google.cloud.bigquery import (
26+
AvroOptions,
27+
CSVOptions,
28+
ExternalConfig,
29+
ExternalSourceFormat,
30+
HivePartitioningOptions,
31+
ParquetOptions,
2632
PartitionRange,
2733
RangePartitioning,
2834
TimePartitioning,
@@ -461,6 +467,142 @@ def test_table_all_dialect_option(faux_conn):
461467
assert result == expected
462468

463469

470+
def test_create_external_table(faux_conn):
    """An AVRO external config compiles to CREATE EXTERNAL TABLE with format/uris options."""
    avro_config = ExternalConfig(ExternalSourceFormat.AVRO)
    avro_config.source_uris = ["gs://bucket-name/prefix/file.avro"]
    columns = (
        sqlalchemy.Column("string_col", sqlalchemy.String),
        sqlalchemy.Column("int_col", sqlalchemy.Integer),
    )

    # SQLite does not support external tables, so executing the statement is
    # expected to fail; only the recorded SQL text is inspected afterwards.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            *columns,
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=avro_config,
        )

    recorded_sql = faux_conn.test_data["execute"][-1][0]
    # Collapse every run of whitespace so the comparison ignores line breaks.
    normalized_sql = " ".join(recorded_sql.split())

    assert normalized_sql == (
        "CREATE EXTERNAL TABLE `some_table` ( `string_col` STRING, `int_col` INT64 )"
        " OPTIONS(format='AVRO', uris=['gs://bucket-name/prefix/file.avro'])"
    )
492+
493+
494+
def test_create_external_table_hive_partitioning(faux_conn):
    """Hive partitioning adds WITH PARTITION COLUMNS plus the two hive options."""
    hive_opts = HivePartitioningOptions()
    hive_opts.source_uri_prefix = "gs://bucket-name/prefix"
    hive_opts.require_partition_filter = False

    parquet_config = ExternalConfig(ExternalSourceFormat.PARQUET)
    parquet_config.source_uris = [
        "gs://bucket-name/prefix/string_col=A/*",
        "gs://bucket-name/prefix/string_col=B/*",
    ]
    parquet_config.hive_partitioning = hive_opts

    columns = (
        sqlalchemy.Column("string_col", sqlalchemy.String),
        sqlalchemy.Column("int_col", sqlalchemy.Integer),
    )

    # SQLite does not support external tables, so executing the statement is
    # expected to fail; only the recorded SQL text is inspected afterwards.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            *columns,
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=parquet_config,
        )

    recorded_sql = faux_conn.test_data["execute"][-1][0]
    # Collapse every run of whitespace so the comparison ignores line breaks.
    normalized_sql = " ".join(recorded_sql.split())

    assert normalized_sql == (
        "CREATE EXTERNAL TABLE `some_table` WITH PARTITION COLUMNS ( `string_col` STRING, `int_col` INT64 )"
        " OPTIONS(format='PARQUET', uris=['gs://bucket-name/prefix/string_col=A/*', 'gs://bucket-name/prefix/string_col=B/*'], hive_partition_uri_prefix='gs://bucket-name/prefix', require_hive_partition_filter=false)"
    )
524+
525+
526+
def test_create_external_table_format_csv_options(faux_conn):
    """CSV-specific options that were set are rendered inside the OPTIONS clause."""
    csv_config = ExternalConfig(ExternalSourceFormat.CSV)
    csv_config.source_uris = ["gs://bucket-name/prefix/string_col=A/file.csv"]

    csv_config.csv_options = CSVOptions()
    csv_config.csv_options.skip_leading_rows = 1
    csv_config.csv_options.preserve_ascii_control_characters = True

    # SQLite does not support external tables, so executing the statement is
    # expected to fail; only the recorded SQL text is inspected afterwards.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            sqlalchemy.Column("string_col", sqlalchemy.String),
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=csv_config,
        )

    recorded_sql = faux_conn.test_data["execute"][-1][0]
    # Collapse every run of whitespace so the comparison ignores line breaks.
    normalized_sql = " ".join(recorded_sql.split())

    assert normalized_sql == (
        "CREATE EXTERNAL TABLE `some_table` ( `string_col` STRING )"
        " OPTIONS(format='CSV', uris=['gs://bucket-name/prefix/string_col=A/file.csv'], preserve_ascii_control_characters=true, skip_leading_rows=1)"
    )
551+
552+
553+
def test_create_external_table_format_parquet_options(faux_conn):
    """Parquet-specific options that were set are rendered inside the OPTIONS clause."""
    parquet_config = ExternalConfig(ExternalSourceFormat.PARQUET)
    parquet_config.source_uris = ["gs://bucket-name/prefix/string_col=A/file.parquet"]

    parquet_config.parquet_options = ParquetOptions()
    parquet_config.parquet_options.enable_list_inference = True
    parquet_config.parquet_options.enum_as_string = False

    # SQLite does not support external tables, so executing the statement is
    # expected to fail; only the recorded SQL text is inspected afterwards.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            sqlalchemy.Column("string_col", sqlalchemy.String),
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=parquet_config,
        )

    recorded_sql = faux_conn.test_data["execute"][-1][0]
    # Collapse every run of whitespace so the comparison ignores line breaks.
    normalized_sql = " ".join(recorded_sql.split())

    assert normalized_sql == (
        "CREATE EXTERNAL TABLE `some_table` ( `string_col` STRING )"
        " OPTIONS(format='PARQUET', uris=['gs://bucket-name/prefix/string_col=A/file.parquet'], enable_list_inference=true, enum_as_string=false)"
    )
578+
579+
580+
def test_create_external_table_format_avro_options(faux_conn):
    """Avro-specific options that were set are rendered inside the OPTIONS clause."""
    avro_config = ExternalConfig(ExternalSourceFormat.AVRO)
    avro_config.source_uris = ["gs://bucket-name/prefix/string_col=A/file.avro"]

    avro_config.avro_options = AvroOptions()
    avro_config.avro_options.use_avro_logical_types = True

    # SQLite does not support external tables, so executing the statement is
    # expected to fail; only the recorded SQL text is inspected afterwards.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            sqlalchemy.Column("string_col", sqlalchemy.String),
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=avro_config,
        )

    recorded_sql = faux_conn.test_data["execute"][-1][0]
    # Collapse every run of whitespace so the comparison ignores line breaks.
    normalized_sql = " ".join(recorded_sql.split())

    assert normalized_sql == (
        "CREATE EXTERNAL TABLE `some_table` ( `string_col` STRING )"
        " OPTIONS(format='AVRO', uris=['gs://bucket-name/prefix/string_col=A/file.avro'], use_avro_logical_types=true)"
    )
604+
605+
464606
def test_validate_friendly_name_value_type(ddl_compiler):
465607
# expect option value to be transformed as a string expression
466608

0 commit comments

Comments
 (0)