```sql
SELECT
  offset,
  timestamp AS kafka_timestamp,
  current_timestamp() AS _ingested_at
FROM STREAM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribe => 'events-topic',
  startingOffsets => 'latest' -- or 'earliest'
);
```

**With a Databricks service credential** (recommended for managed auth):
```sql
FROM STREAM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribe => 'events-topic',
  serviceCredential => 'my-kafka-credential'
)
```

**With explicit SASL/SSL** (external clusters):
```sql
FROM STREAM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribe => 'events-topic',
  startingOffsets => 'latest',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.username` => secret('kafka', 'username'),
  `kafka.sasl.password` => secret('kafka', 'password')
)
```

### Kafka with Multiple Topics

```sql
FROM STREAM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribe => 'topic1,topic2,topic3',
  startingOffsets => 'latest'
)
```
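To subscribe by regex instead of an explicit list, `read_kafka` also accepts a `subscribePattern` parameter (a sketch; the topic-name prefix is illustrative):

```sql
FROM STREAM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribePattern => 'events-.*', -- matches every topic starting with "events-"
  startingOffsets => 'latest'
)
```

Use `subscribe` or `subscribePattern`, not both, in a single call.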

### Azure Event Hub

There is no `read_eventhub` TVF. Use `read_kafka` with the Kafka-compatible endpoint. The SAS connection string becomes the JAAS config password — pass it as a pipeline variable or via `secret()`.

```sql
CREATE OR REPLACE STREAMING TABLE bronze_eventhub_events AS
SELECT
  CAST(value AS STRING) AS event_body,
  timestamp AS event_time,
  partition,
  offset, -- over the Kafka endpoint, the offset stands in for the Event Hubs sequence number
  current_timestamp() AS _ingested_at
FROM STREAM read_kafka(
  bootstrapServers => '${eventhub_namespace}.servicebus.windows.net:9093',
  subscribe => '${eventhub_name}',
  startingOffsets => 'latest',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.jaas.config` => concat(
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="',
    '${eventhub_connection_string}',
    '";'
  )
);
```

### AWS Kinesis

**With explicit credentials** (using secrets):
```sql
CREATE OR REPLACE STREAMING TABLE bronze_kinesis_events AS
SELECT
  sequenceNumber,
  approximateArrivalTimestamp AS arrival_time,
  current_timestamp() AS _ingested_at
FROM STREAM read_kinesis(
  streamName => '${stream_name}',
  awsAccessKey => secret('kinesis', 'awsAccessKey'),
  awsSecretKey => secret('kinesis', 'awsSecretKey'),
  initialPosition => 'latest'
);
```

**With IAM role**:
```sql
FROM STREAM read_kinesis(
  streamName => '${stream_name}',
  initialPosition => 'latest',
  roleArn => 'arn:aws:iam::123456789012:role/MyRole',
  roleSessionName => 'my-pipeline-session'
)
```
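If the stream lives outside the workspace's default region, `read_kinesis` also takes a `region` parameter (a sketch, reusing the `${aws_region}` variable from earlier revisions of this example):

```sql
FROM STREAM read_kinesis(
  streamName => '${stream_name}',
  region => '${aws_region}', -- e.g. 'us-east-1'
  initialPosition => 'latest',
  roleArn => 'arn:aws:iam::123456789012:role/MyRole',
  roleSessionName => 'my-pipeline-session'
)
```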

### Parse JSON from Streaming Sources

```sql
FROM STREAM silver_kafka_parsed;
```
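A minimal parsing sketch, assuming a hypothetical bronze table named `bronze_kafka_events` that retains the raw `value` column, and an illustrative two-field schema (`from_json` yields NULL for rows that fail to parse):

```sql
CREATE OR REPLACE STREAMING TABLE silver_kafka_parsed AS
SELECT
  -- value arrives as BINARY; cast to STRING before parsing
  from_json(CAST(value AS STRING), 'event_id STRING, payload STRING') AS parsed,
  kafka_timestamp
FROM STREAM bronze_kafka_events;
```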

### Using Databricks Secrets

Use the `secret('scope', 'key')` SQL function to inject secrets into TVF parameters. The `{{secrets/scope/key}}` template syntax is **not valid** in SDP SQL.

**Kafka — service credential** (recommended):
```sql
serviceCredential => 'my-kafka-credential'
```

**Kafka — SASL/SSL** (external clusters):
```sql
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.username` => secret('kafka', 'username'),
`kafka.sasl.password` => secret('kafka', 'password')
```

**Kinesis — explicit credentials**:
```sql
awsAccessKey => secret('kinesis', 'awsAccessKey'),
awsSecretKey => secret('kinesis', 'awsSecretKey')
```

**Kinesis — IAM role**:
```sql
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'my-pipeline-session'
```

**Kinesis — environment variables**: no auth params needed if `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` are set on the cluster.
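In that case the call reduces to the stream name and starting position (sketch):

```sql
FROM STREAM read_kinesis(
  streamName => '${stream_name}',
  initialPosition => 'latest'
)
```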

### Using Pipeline Variables

Reference variables in SQL:
```sql
bootstrapServers => '${kafka_brokers}'
```

Define in pipeline configuration:
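A sketch of the corresponding pipeline settings (names illustrative); entries under the `configuration` object are exposed to SQL as `${...}` references:

```json
{
  "configuration": {
    "kafka_brokers": "broker1:9092,broker2:9092",
    "eventhub_name": "events-topic"
  }
}
```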
Common configuration keys (all values must be strings):

| Key | Description |
|-----|-------------|
| `spark.sql.shuffle.partitions` | Number of shuffle partitions (`"auto"` recommended) |
| `pipelines.numRetries` | Number of retries on transient failures |
| `pipelines.trigger.interval` | Trigger interval for continuous pipelines, e.g., `"1 hour"` |
| `pipelines.reset.allowed` | Set to `"false"` to preserve Delta table data on full refresh |
| `spark.databricks.delta.preview.enabled` | Enable Delta preview features (`"true"`) |
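For example, in the pipeline settings JSON, these keys go under the `configuration` object, with every value quoted as a string:

```json
{
  "configuration": {
    "spark.sql.shuffle.partitions": "auto",
    "pipelines.numRetries": "3",
    "pipelines.trigger.interval": "1 hour"
  }
}
```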

### `run_as` Object - Pipeline Execution Identity