RFC: Log Streaming & SIEM Integration
Phase: 5 — Advanced Security & Enterprise
Priority: P3 — Medium
Estimated Effort: Medium
Depends on: Audit Logs (#505)
Problem Statement
Enterprise customers need to feed auth events into their existing security tooling (Datadog, Splunk, S3, SIEM platforms). The audit log system (#505) stores events in Authorizer's database, but enterprises need real-time streaming to external destinations for centralized monitoring and compliance. WorkOS Log Streams provides this capability.
Proposed Solution
1. Log Stream Configuration
New schema: internal/storage/schemas/log_stream.go
```go
type LogStream struct {
	ID              string `json:"id" gorm:"primaryKey;type:char(36)"`
	Name            string `json:"name" gorm:"type:varchar(256)"`
	DestinationType string `json:"destination_type" gorm:"type:varchar(50)"` // webhook | s3 | datadog | splunk
	Config          string `json:"config" gorm:"type:text"`                  // JSON destination-specific config
	EventFilter     string `json:"event_filter" gorm:"type:text"`            // JSON: which event types to stream
	IsActive        bool   `json:"is_active" gorm:"type:bool;default:true"`
	OrganizationID  string `json:"organization_id" gorm:"type:char(36);index"` // optional org scoping
	CreatedAt       int64  `json:"created_at" gorm:"autoCreateTime"`
	UpdatedAt       int64  `json:"updated_at" gorm:"autoUpdateTime"`
}
```

2. Destination Providers
Interface: internal/log_stream/
```go
type Destination interface {
	Send(ctx context.Context, events []AuditEvent) error
	HealthCheck(ctx context.Context) error
	Name() string
}
```

Supported destinations:
| Destination | Config Fields | Format |
|---|---|---|
| HTTP Webhook | url, headers, auth_header | JSON POST |
| AWS S3 | bucket, prefix, region, access_key, secret_key | JSONL files |
| Datadog | api_key, site (us1/eu1/etc.) | Datadog Logs API |
| Splunk HEC | url, token | Splunk HTTP Event Collector |
3. Structured Event Format
All destinations receive events in a consistent JSON format:
```json
{
  "id": "evt_a1b2c3d4",
  "timestamp": "2026-03-30T10:15:30Z",
  "type": "user.login_success",
  "version": "1.0",
  "actor": {
    "id": "usr_123",
    "type": "user",
    "email": "john@example.com",
    "ip_address": "203.0.113.42",
    "user_agent": "Mozilla/5.0..."
  },
  "target": {
    "type": "session",
    "id": "sess_456"
  },
  "metadata": {
    "method": "password",
    "mfa_used": true,
    "device_hash": "abc123..."
  },
  "organization_id": "org_789",
  "environment": {
    "authorizer_version": "2.0.0",
    "instance_id": "inst_..."
  }
}
```

4. Streaming Architecture
This builds on the audit log system (#505): the audit log's buffered channel feeds both database writes and active log streams:
```go
func (a *auditProvider) flushLoop() {
	ticker := time.NewTicker(a.flushInterval)
	defer ticker.Stop()
	batch := make([]AuditEvent, 0, batchSize)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		a.writeBatch(batch)  // write to database
		a.streamBatch(batch) // fan out to active log streams
		batch = batch[:0]
	}
	for {
		select {
		case event := <-a.eventChan:
			batch = append(batch, event)
			if len(batch) >= batchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}
```
```go
func (a *auditProvider) streamBatch(events []AuditEvent) {
	streams, err := a.store.ListActiveLogStreams(ctx)
	if err != nil {
		a.log.Error().Err(err).Msg("failed to list active log streams")
		return
	}
	for _, stream := range streams {
		// Filter events based on the stream's event filter
		filtered := filterEvents(events, stream.EventFilter)
		if len(filtered) == 0 {
			continue
		}
		// Send asynchronously with retry
		go a.sendWithRetry(stream, filtered)
	}
}
```

5. Batching and Retry
- Batch size: configurable (default: 100 events or 5 seconds, whichever comes first)
- Retry: exponential backoff (1s, 2s, 4s, 8s, max 60s), max 5 retries
- Dead letter: after max retries, log failure and store in dead letter table for manual retry
- Backpressure: if a destination is consistently slow, buffer up to the max buffer size, then drop the oldest events with a warning
```go
func (a *auditProvider) sendWithRetry(stream *schemas.LogStream, events []AuditEvent) {
	dest := a.getDestination(stream)
	for attempt := 0; attempt < maxRetries; attempt++ {
		if err := dest.Send(ctx, events); err == nil {
			return
		}
		time.Sleep(backoff(attempt))
	}
	// Dead letter: keep the batch for manual retry
	a.storeDeadLetter(stream.ID, events)
	a.log.Error().Str("stream", stream.Name).Msg("log stream delivery failed after retries")
}
```

6. GraphQL Admin API
```graphql
type LogStream {
  id: ID!
  name: String!
  destination_type: String!
  config: Map # masked sensitive fields
  event_filter: [String!]
  is_active: Boolean!
  organization_id: String
  created_at: Int64!
}

type Mutation {
  _create_log_stream(params: CreateLogStreamInput!): LogStream!
  _update_log_stream(params: UpdateLogStreamInput!): LogStream!
  _delete_log_stream(id: ID!): Response!
  _test_log_stream(id: ID!): Response! # Send a test event to verify connectivity
}

type Query {
  _log_streams(params: PaginatedInput): LogStreams!
  _log_stream(id: ID!): LogStream!
}

input CreateLogStreamInput {
  name: String!
  destination_type: String! # webhook | s3 | datadog | splunk
  config: Map! # destination-specific config
  event_filter: [String!] # empty = all events
  organization_id: String # optional
}
```

CLI Configuration Flags
```
--log-stream-batch-size=100      # Events per batch
--log-stream-flush-interval=5s   # Max time before flush
--log-stream-max-retries=5       # Retry attempts
--log-stream-buffer-size=10000   # Max buffered events before dropping
```
Testing Plan
- Unit tests for each destination provider (mock HTTP)
- Integration test: create stream → trigger events → verify delivery
- Test batching (events arrive in batches)
- Test retry with exponential backoff
- Test dead letter storage after max retries
- Test event filtering (only configured event types streamed)
- Test that _test_log_stream sends a test event