Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,39 @@ To create an integer-range partitioned table
bigquery_require_partition_filter=True,
)

To create an external table backed by files in Google Cloud Storage:

.. code-block:: python

from google.cloud import bigquery

external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.PARQUET)
external_config.source_uris = ["gs://my-bucket/path/to/files/*"]

table = Table('mytable', ...,
prefixes=['EXTERNAL'],
bigquery_external_data_configuration=external_config,
)

To create an external table with hive partitioning:

.. code-block:: python

from google.cloud import bigquery

hive_partitioning = bigquery.HivePartitioningOptions()
hive_partitioning.source_uri_prefix = "gs://my-bucket/path/to"
hive_partitioning.require_partition_filter = True

external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.PARQUET)
external_config.source_uris = ["gs://my-bucket/path/to/field=*/*"]
external_config.hive_partitioning = hive_partitioning

table = Table('mytable', ...,
prefixes=['EXTERNAL'],
bigquery_external_data_configuration=external_config,
)


Threading and Multiprocessing
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
50 changes: 49 additions & 1 deletion sqlalchemy_bigquery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import datetime
from decimal import Decimal
import inspect
import random
import operator
import uuid

from google import auth
import google.api_core.exceptions
from google.cloud.bigquery import dbapi, ConnectionProperty
from google.cloud.bigquery import ExternalConfig, dbapi, ConnectionProperty
from google.cloud.bigquery.external_config import HivePartitioningOptions
from google.cloud.bigquery.table import (
RangePartitioning,
TableReference,
Expand Down Expand Up @@ -647,6 +649,9 @@ class BigQueryDDLCompiler(DDLCompiler):
"expiration_timestamp": datetime.datetime,
"require_partition_filter": bool,
"default_rounding_mode": str,
"format": str,
"hive_partition_uri_prefix": str,
"require_hive_partition_filter": bool,
}

# BigQuery has no support for foreign keys.
Expand All @@ -671,6 +676,20 @@ def get_column_specification(self, column, **kwargs):
)
return colspec

def create_table_suffix(self, table):
    """Return the clause emitted between the table name and column list.

    BigQuery DDL for an external table with hive partitioning requires
    ``WITH PARTITION COLUMNS`` before the column definitions; for every
    other table the suffix is empty.
    """
    config = table.dialect_options["bigquery"].get("external_data_configuration")
    if config is None or config.hive_partitioning is None:
        return ""
    return "WITH PARTITION COLUMNS"

def post_create_table(self, table):
"""
Constructs additional SQL clauses for table creation in BigQuery.
Expand Down Expand Up @@ -752,6 +771,34 @@ def post_create_table(self, table):
self._validate_option_value_type("description", description)
options["description"] = description

if external_config := bq_opts.get("external_data_configuration"):
self._raise_for_type(
"external_data_configuration", external_config, ExternalConfig
)
if not isinstance(external_config.source_uris, (list, str)):
raise TypeError(
"External table source_uris must be a list of strings"
" (or a single string for Bigtable)"
)
Comment on lines +778 to +782

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current type check for source_uris is not strict enough. It allows a list containing non-string elements (e.g., [1, 2, 3]), which would lead to invalid SQL being generated as the uris option in BigQuery expects an array of strings. It would be more robust to also validate the types of the elements when a list is provided.

            if isinstance(external_config.source_uris, list):
                if not all(isinstance(uri, str) for uri in external_config.source_uris):
                    raise TypeError(
                        "When providing a list for source_uris, all elements must be strings."
                    )
            elif not isinstance(external_config.source_uris, str):
                raise TypeError(
                    "External table source_uris must be a list of strings"
                    " (or a single string for Bigtable)"
                )

options["format"] = external_config.source_format
options["uris"] = external_config.source_uris
if partitioning := external_config.hive_partitioning:
self._raise_for_type(
"external_data_configuration.hive_partitioning",
partitioning,
HivePartitioningOptions,
)
options["hive_partition_uri_prefix"] = partitioning.source_uri_prefix
options[
"require_hive_partition_filter"
] = partitioning.require_partition_filter
for name, _ in inspect.getmembers_static(
external_config.options, lambda m: isinstance(m, property)
):
value = getattr(external_config.options, name)
if value is not None:
options[name] = value

for option in self.option_datatype_mapping:
if option in bq_opts:
options[option] = bq_opts.get(option)
Expand Down Expand Up @@ -978,6 +1025,7 @@ def _process_option_value(self, value):
float: lambda x: x,
bool: lambda x: "true" if x else "false",
datetime.datetime: lambda x: BQTimestamp.process_timestamp_literal(x),
list: lambda x: f'[{", ".join([self._process_option_value(item) for item in x])}]',
}

if (option_cast := option_casting.get(type(value))) is not None:
Expand Down
142 changes: 142 additions & 0 deletions tests/unit/test_table_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
import sqlalchemy

from google.cloud.bigquery import (
AvroOptions,
CSVOptions,
ExternalConfig,
ExternalSourceFormat,
HivePartitioningOptions,
ParquetOptions,
PartitionRange,
RangePartitioning,
TimePartitioning,
Expand Down Expand Up @@ -461,6 +467,142 @@ def test_table_all_dialect_option(faux_conn):
assert result == expected


def test_create_external_table(faux_conn):
    """A plain external table emits format and uris in its OPTIONS clause."""
    config = ExternalConfig(ExternalSourceFormat.AVRO)
    config.source_uris = ["gs://bucket-name/prefix/file.avro"]

    # The faux backend is SQLite, which cannot create external tables, so
    # execution is expected to fail; only the generated DDL is inspected.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            sqlalchemy.Column("string_col", sqlalchemy.String),
            sqlalchemy.Column("int_col", sqlalchemy.Integer),
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=config,
        )

    emitted_sql = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
    assert emitted_sql == (
        "CREATE EXTERNAL TABLE `some_table` ( `string_col` STRING, `int_col` INT64 )"
        " OPTIONS(format='AVRO', uris=['gs://bucket-name/prefix/file.avro'])"
    )


def test_create_external_table_hive_partitioning(faux_conn):
    """Hive partitioning adds WITH PARTITION COLUMNS and the hive options."""
    partitioning = HivePartitioningOptions()
    partitioning.source_uri_prefix = "gs://bucket-name/prefix"
    partitioning.require_partition_filter = False

    config = ExternalConfig(ExternalSourceFormat.PARQUET)
    config.source_uris = [
        "gs://bucket-name/prefix/string_col=A/*",
        "gs://bucket-name/prefix/string_col=B/*",
    ]
    config.hive_partitioning = partitioning

    # SQLite cannot create external tables; the statement fails at execution
    # time and only the captured DDL text is checked.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            sqlalchemy.Column("string_col", sqlalchemy.String),
            sqlalchemy.Column("int_col", sqlalchemy.Integer),
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=config,
        )

    emitted_sql = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
    assert emitted_sql == (
        "CREATE EXTERNAL TABLE `some_table` WITH PARTITION COLUMNS ( `string_col` STRING, `int_col` INT64 )"
        " OPTIONS(format='PARQUET', uris=['gs://bucket-name/prefix/string_col=A/*', 'gs://bucket-name/prefix/string_col=B/*'], hive_partition_uri_prefix='gs://bucket-name/prefix', require_hive_partition_filter=false)"
    )


def test_create_external_table_format_csv_options(faux_conn):
    """CSV-specific options are flattened into the OPTIONS clause."""
    config = ExternalConfig(ExternalSourceFormat.CSV)
    config.source_uris = ["gs://bucket-name/prefix/string_col=A/file.csv"]

    csv_options = CSVOptions()
    csv_options.skip_leading_rows = 1
    csv_options.preserve_ascii_control_characters = True
    config.csv_options = csv_options

    # SQLite cannot create external tables; only the emitted DDL matters here.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            sqlalchemy.Column("string_col", sqlalchemy.String),
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=config,
        )

    emitted_sql = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
    assert emitted_sql == (
        "CREATE EXTERNAL TABLE `some_table` ( `string_col` STRING )"
        " OPTIONS(format='CSV', uris=['gs://bucket-name/prefix/string_col=A/file.csv'], preserve_ascii_control_characters=true, skip_leading_rows=1)"
    )


def test_create_external_table_format_parquet_options(faux_conn):
    """Parquet-specific options are flattened into the OPTIONS clause."""
    config = ExternalConfig(ExternalSourceFormat.PARQUET)
    config.source_uris = ["gs://bucket-name/prefix/string_col=A/file.parquet"]

    parquet_options = ParquetOptions()
    parquet_options.enable_list_inference = True
    parquet_options.enum_as_string = False
    config.parquet_options = parquet_options

    # SQLite cannot create external tables; only the emitted DDL matters here.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            sqlalchemy.Column("string_col", sqlalchemy.String),
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=config,
        )

    emitted_sql = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
    assert emitted_sql == (
        "CREATE EXTERNAL TABLE `some_table` ( `string_col` STRING )"
        " OPTIONS(format='PARQUET', uris=['gs://bucket-name/prefix/string_col=A/file.parquet'], enable_list_inference=true, enum_as_string=false)"
    )


def test_create_external_table_format_avro_options(faux_conn):
    """Avro-specific options are flattened into the OPTIONS clause."""
    config = ExternalConfig(ExternalSourceFormat.AVRO)
    config.source_uris = ["gs://bucket-name/prefix/string_col=A/file.avro"]

    avro_options = AvroOptions()
    avro_options.use_avro_logical_types = True
    config.avro_options = avro_options

    # SQLite cannot create external tables; only the emitted DDL matters here.
    with pytest.raises(sqlite3.OperationalError):
        setup_table(
            faux_conn,
            "some_table",
            sqlalchemy.Column("string_col", sqlalchemy.String),
            prefixes=["EXTERNAL"],
            bigquery_external_data_configuration=config,
        )

    emitted_sql = " ".join(faux_conn.test_data["execute"][-1][0].strip().split())
    assert emitted_sql == (
        "CREATE EXTERNAL TABLE `some_table` ( `string_col` STRING )"
        " OPTIONS(format='AVRO', uris=['gs://bucket-name/prefix/string_col=A/file.avro'], use_avro_logical_types=true)"
    )


def test_validate_friendly_name_value_type(ddl_compiler):
# expect option value to be transformed as a string expression

Expand Down
Loading