Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.TableUtils;
import org.apache.beam.sdk.extensions.sql.impl.TableName;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
Expand Down Expand Up @@ -59,8 +61,14 @@ public void createTable(Table table) {
getProvider(table.getType()).createTable(table);
} else {
String identifier = getIdentifier(table);
Map<String, String> props =
TableUtils.getObjectMapper()
.convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {})
.entrySet().stream()
.filter(p -> !p.getKey().startsWith("beam."))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
try {
catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields());
catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields(), props);
} catch (TableAlreadyExistsException e) {
LOG.info(
"Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -56,6 +57,8 @@ class IcebergTable extends SchemaBaseBeamTable {
@VisibleForTesting static final String CATALOG_PROPERTIES_FIELD = "catalog_properties";
@VisibleForTesting static final String HADOOP_CONFIG_PROPERTIES_FIELD = "config_properties";
@VisibleForTesting static final String CATALOG_NAME_FIELD = "catalog_name";
static final String BEAM_WRITE_PROPERTY = "beam.write.";
static final String BEAM_READ_PROPERTY = "beam.read.";

@VisibleForTesting
static final String TRIGGERING_FREQUENCY_FIELD = "triggering_frequency_seconds";
Expand All @@ -71,9 +74,21 @@ class IcebergTable extends SchemaBaseBeamTable {
this.tableIdentifier = tableIdentifier;
this.catalogConfig = catalogConfig;
ObjectNode properties = table.getProperties();
if (properties.has(TRIGGERING_FREQUENCY_FIELD)) {
this.triggeringFrequency = properties.get(TRIGGERING_FREQUENCY_FIELD).asInt();
for (Map.Entry<String, JsonNode> property : properties.properties()) {
String name = property.getKey();
if (name.startsWith(BEAM_WRITE_PROPERTY)) {
String prop = name.substring(BEAM_WRITE_PROPERTY.length());
if (prop.equals(TRIGGERING_FREQUENCY_FIELD)) {
this.triggeringFrequency = property.getValue().asInt();
} else {
throw new IllegalArgumentException("Unknown Beam write property: " + name);
}
} else if (name.startsWith(BEAM_READ_PROPERTY)) {
// none supported yet
throw new IllegalArgumentException("Unknown Beam read property: " + name);
}
}

this.partitionFields = table.getPartitionFields();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void testSimpleInsertWithPartitionedFields() throws Exception {
+ ") \n"
+ "TYPE 'iceberg' \n"
+ "PARTITIONED BY('id', 'truncate(name, 3)') \n"
+ "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'";
+ "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'";
String insertStatement =
format("INSERT INTO %s \n", tableIdentifier)
+ "SELECT \n"
Expand Down Expand Up @@ -211,7 +211,7 @@ public void testSimpleInsertFlat() throws Exception {
+ " name VARCHAR \n "
+ ") \n"
+ "TYPE 'iceberg' \n"
+ "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'";
+ "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'";
String insertStatement =
format("INSERT INTO %s \n", tableIdentifier)
+ "SELECT \n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ public boolean dropNamespace(String namespace, boolean cascade) {
}

public void createTable(
String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
String tableIdentifier,
Schema tableSchema,
@Nullable List<String> partitionFields,
@Nullable Map<String, String> properties) {
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
Expand All @@ -153,7 +156,11 @@ public void createTable(
icebergIdentifier,
icebergSchema,
icebergSpec);
catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
if (properties != null) {
catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties);
} else {
catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
}
LOG.info("Successfully created table '{}'.", icebergIdentifier);
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(e);
Expand Down
75 changes: 75 additions & 0 deletions website/www/site/content/en/documentation/dsls/sql/ddl/alter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
---
type: languages
title: "Beam SQL DDL: Alter"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# ALTER statements

The **ALTER** statement modifies the definition of an existing Catalog or Table.
For supported tables (like Iceberg), this enables **schema and partition evolution**.

## ALTER CATALOG
Modifies an existing catalog's properties.

```sql
ALTER CATALOG catalog_name
[ SET ( 'key' = 'val', ... ) ]
[ ( RESET | UNSET ) ( 'key', ... ) ]
```
- **SET**: Adds new properties or updates existing ones.
- **RESET** / **UNSET**: Removes properties.

## ALTER TABLE
Modifies an existing table's properties and evolves its partition spec and schema.

```sql
ALTER TABLE table_name
[ ADD COLUMNS ( col_def, ... ) ]
[ DROP COLUMNS ( col_name, ... ) ]
[ ADD PARTITIONS ( partition_field, ... ) ]
[ DROP PARTITIONS ( partition_field, ... ) ]
[ SET ( 'key' = 'val', ... ) ]
[ ( RESET | UNSET ) ( 'key', ... ) ];
```

*Example 1: Add or remove columns*
```sql
-- Add columns
ALTER TABLE orders ADD COLUMNS (
customer_email VARCHAR,
shipping_region VARCHAR
);

-- Drop columns
ALTER TABLE orders DROP COLUMNS ( customer_email );
```

*Example 2: Modify partition spec*
```sql
-- Add a partition field
ALTER TABLE orders ADD PARTITIONS ( 'year(order_date)' );

-- Remove a partition field
ALTER TABLE orders DROP PARTITIONS ( 'region_id' );
```

*Example 3: Modify table properties*
```sql
ALTER TABLE orders SET (
'write.format.default' = 'orc',
'write.metadata.metrics.default' = 'full' );
ALTER TABLE orders RESET ( 'write.target-file-size-bytes' );
```
120 changes: 120 additions & 0 deletions website/www/site/content/en/documentation/dsls/sql/ddl/create.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
---
type: languages
title: "Beam SQL DDL: Create"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# CREATE statements

The **CREATE** command serves two potential functions depending on the connector:

- **Registration**: By default, it registers an existing external entity in the Beam SQL session.
- **Instantiation**: For supported connectors (e.g., Iceberg), it physically creates the entity
(e.g. namespace or table) in the underlying storage.

_**Note**: Creating a catalog or database does not automatically switch to it. Remember
to run [USE](/use) afterwards to set it as a default._

## `CREATE CATALOG`
Registers a new catalog instance.

```sql
CREATE CATALOG [ IF NOT EXISTS ] catalog_name
TYPE 'type_name'
[ PROPERTIES ( 'key' = 'value' [, ...] ) ]
```

_**Example**: Creating a Hadoop Catalog (Local Storage)_
```sql
CREATE CATALOG local_catalog
TYPE iceberg
PROPERTIES (
'type' = 'hadoop',
'warehouse' = 'file:///tmp/iceberg-warehouse'
)
```

_**Example**: Registering a BigLake Catalog (GCS)_
```sql
CREATE CATALOG prod_iceberg
TYPE iceberg
PROPERTIES (
'type' = 'rest',
'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
'warehouse' = 'gs://my-company-bucket/warehouse',
'header.x-goog-user-project' = 'my_prod_project',
'rest.auth.type' = 'org.apache.iceberg.gcp.auth.GoogleAuthManager',
'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
'rest-metrics-reporting-enabled' = 'false'
);
```

## `CREATE DATABASE`
Creates a new Database within the current Catalog (default), or the specified Catalog.
```sql
CREATE DATABASE [ IF NOT EXISTS ] [ catalog_name. ]database_name;
```

_**Example**: Create a database in the current active catalog_
```sql
USE CATALOG my_catalog;
CREATE DATABASE sales_data;
```

_**Example**: Create a database in a specified catalog (must be registered)_
```sql
CREATE DATABASE other_catalog.sales_data;
```

## `CREATE TABLE`
Creates a table within the currently active catalog and database. If the table name is fully qualified, the referenced catalog and database are used instead.

```sql
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] [ catalog. ][ db. ]table_name (
col_name col_type [ NOT NULL ] [ COMMENT 'col_comment' ],
...
)
TYPE 'type_name'
[ PARTITIONED BY ( 'partition_field' [, ... ] ) ]
[ COMMENT 'table_comment' ]
[ LOCATION 'location_uri' ]
[ TBLPROPERTIES 'properties_json_string' ];
```
- **TYPE**: the table type (e.g. `'iceberg'`, `'text'`, `'kafka'`)
- **PARTITIONED BY**: an ordered list of fields describing the partition spec.
- **LOCATION**: explicitly sets the location of the table (overriding the inferred `catalog.db.table_name` location)
- **TBLPROPERTIES**: configuration properties used when creating the table or setting up its IO connection.

_**Example**: Creating an Iceberg Table_
```sql
CREATE EXTERNAL TABLE prod_iceberg.sales_data.orders (
order_id BIGINT NOT NULL COMMENT 'Unique order identifier',
amount DECIMAL(10, 2),
order_date TIMESTAMP,
region_id VARCHAR
)
TYPE 'iceberg'
PARTITIONED BY ( 'region_id', 'day(order_date)' )
COMMENT 'Daily sales transactions'
TBLPROPERTIES '{
"write.format.default": "parquet",
"read.split.target-size": 268435456,
"beam.write.triggering_frequency_seconds": 60
}';
```
- This creates an Iceberg table named `orders` under the namespace `sales_data`, within the `prod_iceberg` catalog.
- The table is partitioned by `region_id`, then by the day value of `order_date` (using Iceberg's [hidden partitioning](https://iceberg.apache.org/docs/latest/partitioning/#icebergs-hidden-partitioning)).
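- The table is created with the Iceberg table properties `"write.format.default"` and `"read.split.target-size"`. The Beam property `"beam.write.triggering_frequency_seconds"` is not stored on the table; it sets the triggering frequency (in seconds) of Beam's Iceberg write.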
- Beam properties (prefixed with `"beam.write."` or `"beam.read."`) are intended for the relevant IOs.
50 changes: 50 additions & 0 deletions website/www/site/content/en/documentation/dsls/sql/ddl/drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
type: languages
title: "Beam SQL DDL: Drop"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# DROP statements
The **DROP** command serves two potential functions depending on the connector:

- **Unregistration**: unregisters an entity from the current Beam SQL session.
- **Deletion**: For supported connectors (like **Iceberg**), it **physically deletes** the entity
(e.g. namespace or table) in the underlying storage.

**Caution:** Physical deletion can be permanent.

## DROP CATALOG
Unregisters a catalog from Beam SQL. This does not destroy external data, only the link within the SQL session.

```sql
DROP CATALOG [ IF EXISTS ] catalog_name;
```

## DROP DATABASE
Unregisters a database from the current session. For supported connectors, this
will also **delete** the database from the external data source.

```sql
DROP DATABASE [ IF EXISTS ] database_name [ RESTRICT | CASCADE ];
```
- **RESTRICT** (Default): Fails if the database is not empty.
- **CASCADE**: Drops the database and all tables contained within it. **Use with caution.**

## DROP TABLE
Unregisters a table from the current session. For supported connectors, this
will also **delete** the table from the external data source.
```sql
DROP TABLE [ IF EXISTS ] table_name;
```
Loading
Loading