feat: add BigLake Iceberg support for BigQuery analytics plugin #4750

caohy1988 wants to merge 6 commits into google:main
Conversation
Add `biglake_storage_uri` config option that, when set alongside `connection_id`, automatically creates BigLake managed Iceberg tables and replaces JSON schema fields with STRING (since BigLake Iceberg does not support JSON type). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1. Normalize connection_id to full resource path for BigLakeConfiguration
(projects/{project}/locations/{loc}/connections/{name}).
2. Skip time partitioning for BigLake Iceberg by default (preview feature);
add biglake_time_partitioning opt-in flag.
3. Document Storage Write API latency caveat for Iceberg metadata refresh
(~90 min for open-source engine visibility).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…connection_id normalization _normalize_biglake_connection_id() now correctly parses "project.location.connection" (e.g. "myproj.us.my-conn") in addition to the two-part "location.connection" and full resource path forms. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces support for BigLake managed Iceberg tables within the BigQuery analytics plugin. It enables users to configure their analytics to write data in Iceberg format to Google Cloud Storage, leveraging BigQuery's capabilities. The changes include schema adjustments for Iceberg compatibility, standardized connection handling, and flexible partitioning options.

Highlights
Activity
Response from ADK Triaging Agent

Hello @caohy1988, thank you for your contribution! To proceed with the review, could you please address the following points from our contribution guidelines:
Completing these steps will help us move forward with the review process. Thanks!
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Code Review
This pull request introduces support for BigLake Iceberg tables in the BigQuery analytics plugin, which is a valuable enhancement. The changes are well-implemented, including new configuration options, schema adjustments for Iceberg compatibility, and robust connection ID normalization. The accompanying unit tests are thorough and cover the new functionality comprehensively. I have one minor suggestion to remove a redundant validation check to improve code clarity. Overall, this is a high-quality contribution.
```python
tbl.clustering_fields = self.config.clustering_fields
tbl.labels = {_SCHEMA_VERSION_LABEL_KEY: _SCHEMA_VERSION}
if self.is_biglake:
    from google.cloud.bigquery.table import BigLakeConfiguration
```
This validation check for connection_id is redundant. An equivalent check is already performed in the __init__ method (lines 1956-1959). Failing early during plugin instantiation is preferable to failing during lazy setup, as it's easier to debug. Removing this duplicate check will make the code cleaner.
I looked into the Spark BigQuery connector path as a way to validate the “BigLake Iceberg supports high throughput streaming using the Storage Write API” claim. Short conclusion: yes, the Spark connector is a valid proof path for Storage Write API -> BigLake Iceberg, but it is not a good in-process replacement for the current Python plugin writer. Why:
Example shape:

```python
(
    df.write
      .format("bigquery")
      .option("writeMethod", "direct")
      .option("writeAtLeastOnce", "true")
      .mode("append")
      .save("project.dataset.biglake_iceberg_table")
)
```

Recommendation:
One additional caveat from the docs: even when streamed writes succeed, Iceberg metadata visibility for open-source engines may lag by up to ~90 minutes, so this should not be treated as immediate cross-engine freshness. Given that, I would not change the plugin implementation to Spark. I would treat Spark/Dataflow as:
After looking at the documented support surface and the current E2E result, my recommendation is to keep this PR as a minimal MVP for BigLake support. Recommended default behavior:
Why I think this is the right scope for this PR:
Why this split makes sense technically:
So for this PR, I would explicitly avoid expanding scope into:
Those can all be follow-up work if needed. For MVP, the cleanest path is:
That gives users a working feature now, keeps the PR minimal, and avoids overfitting to an undocumented / currently failing backend path.
I think this should be tracked as a Google-internal / product bug, separate from this PR. Reason:
I would recommend filing a product bug with a minimal repro like this:

Title:

Repro summary:
Questions for product team:
Given that, I would not block this PR on raw Storage Write API. I would keep the PR minimal and use:
That gives users a working MVP now, while the raw Storage Write API path is tracked as a separate product bug.
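For legacy streaming to work with the flattened STRING schema, each row's complex values have to be JSON-serialized before calling `insert_rows_json`. A minimal stand-alone sketch; the function name and exact conversion rules here are illustrative, not the plugin's actual code:

```python
import datetime
import json


def prepare_rows_json(rows):
    """Make rows safe for BigQuery's legacy streaming insert_rows_json():
    dict/list values become JSON strings, datetimes become ISO-8601 strings.
    Scalar values pass through unchanged.
    """
    prepared = []
    for row in rows:
        out = {}
        for key, value in row.items():
            if isinstance(value, (dict, list)):
                out[key] = json.dumps(value)
            elif isinstance(value, datetime.datetime):
                out[key] = value.isoformat()
            else:
                out[key] = value
        prepared.append(out)
    return prepared
```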
The Storage Write API v2 (Arrow format) cannot write to BigLake Iceberg tables due to internal `_colidentifier_iceberg_1` columns. Route BigLake writes to the legacy streaming API (`insert_rows_json`), which handles these transparently.

- Add LegacyStreamingBatchProcessor with same queue/batch interface
- BigLake: create LegacyStreamingBatchProcessor in `_get_loop_state()`
- Non-BigLake: unchanged, uses Storage Write API (BatchProcessor)
- Skip Arrow schema creation for BigLake (not needed)
- Update `_LoopState` to accept Union processor type
- Add 5 tests for legacy streaming processor and routing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
BigLake Iceberg tables cannot handle nested RECORD fields via any streaming API (both Storage Write API and legacy streaming fail with `_colidentifier_iceberg` errors on RECORD positions). Changes:

- `_replace_json_with_string` now also flattens RECORD/STRUCT fields to STRING (JSON-serialized) for BigLake Iceberg
- `LegacyStreamingBatchProcessor._prepare_rows_json` serializes dict/list values to JSON strings
- Updated E2E test scripts to verify flattened schema
- Local E2E test passes: 44 events, all 9 event types, all checks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
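A rough sketch of the flattening rule this commit describes, using plain dicts in place of `google.cloud.bigquery.SchemaField` objects; the helper name and field list are illustrative:

```python
def flatten_schema_for_biglake(schema):
    """Replace JSON and RECORD/STRUCT fields with STRING for BigLake Iceberg.

    BigLake Iceberg supports neither the JSON type nor streamed nested
    RECORDs, so both are stored as JSON-serialized STRING columns.
    """
    flattened = []
    for field in schema:
        if field["type"] in ("JSON", "RECORD", "STRUCT"):
            flattened.append({"name": field["name"], "type": "STRING"})
        else:
            flattened.append(dict(field))
    return flattened
```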
Summary
Adds BigLake Iceberg support to the BigQuery Agent Analytics Plugin, allowing users to log agent events to BigLake managed Apache Iceberg tables in BigQuery.
What changed
Schema & Table Creation:
- New `biglake_storage_uri` option on `BigQueryLoggerConfig`: when set (together with `connection_id`), the plugin automatically creates and configures a BigLake Iceberg table
- When `biglake_storage_uri` is set:
  - JSON fields (`content`, `attributes`, `latency_ms`, `content_parts.object_ref.details`) → STRING
  - RECORD fields (`content_parts`, `object_ref`) → STRING (JSON-serialized)
  - `file_format=PARQUET`, `table_format=ICEBERG`, `storage_uri`, and a normalized `connection_id` are set on the BigQuery table
- `connection_id` normalization: accepts 3 formats and normalizes to the full resource path:
  - `location.connection` → `projects/{project}/locations/{location}/connections/{connection}`
  - `project.location.connection` → `projects/{project}/locations/{location}/connections/{connection}`
  - `projects/P/locations/L/connections/C` → used as-is
- Time partitioning is skipped by default for BigLake Iceberg (preview feature); opt in with `biglake_time_partitioning=True`
- Raises `ValueError` if `biglake_storage_uri` is set without `connection_id`

Write Path:

- Non-BigLake tables: `storage_write_api` (unchanged, existing behavior)
- BigLake Iceberg tables: `legacy_streaming` (new `LegacyStreamingBatchProcessor`)
- Rationale: the Storage Write API fails against BigLake Iceberg tables (internal `_colidentifier_iceberg_*` errors). The fix is to flatten all complex types to STRING in the schema, then use legacy streaming, which handles the data correctly.
- `LegacyStreamingBatchProcessor` has the same queue/batch/flush/shutdown interface as `BatchProcessor`, using `client.insert_rows_json()` run in a ThreadPoolExecutor
- `_prepare_rows_json()` serializes dict/list values to JSON strings and datetime objects to ISO format

E2E Test Results

- Local agent test: ALL CHECKS PASSED (flattened STRING columns verified via `PARSE_JSON()`)
- Agent Engine test: expected failure; the remote Agent Engine installs `google-adk[bigquery]` from PyPI (the released version without the BigLake changes). It created a standard BQ table and logged 33 events via the Storage Write API. Will work once these changes are published.

Additional notes

- The pre-built views are not compatible with the flattened BigLake schema (queries need `PARSE_JSON()` on STRING columns instead of `JSON_VALUE()` on JSON columns); `create_views=False` is recommended.

Test plan

- New `TestBigLakeIceberg` test class covering:
  - the `is_biglake` property and `connection_id` validation
  - routing of BigLake writes to `LegacyStreamingBatchProcessor` (not `BatchProcessor`)
  - `_prepare_rows_json` serialization (datetime, dict, list, None)
  - the `insert_rows_json` write path

Related
🤖 Generated with Claude Code