RFC: Log Streaming & SIEM Integration #522

@lakhansamani


Phase: 5 — Advanced Security & Enterprise
Priority: P3 — Medium
Estimated Effort: Medium
Depends on: Audit Logs (#505)


Problem Statement

Enterprise customers need to feed auth events into their existing security tooling (Datadog, Splunk, S3, SIEM platforms). The audit log system (#505) stores events in Authorizer's database, but enterprises need real-time streaming to external destinations for centralized monitoring and compliance. WorkOS Log Streams provides this capability.


Proposed Solution

1. Log Stream Configuration

New schema: internal/storage/schemas/log_stream.go

```go
type LogStream struct {
    ID              string `json:"id" gorm:"primaryKey;type:char(36)"`
    Name            string `json:"name" gorm:"type:varchar(256)"`
    DestinationType string `json:"destination_type" gorm:"type:varchar(50)"`   // webhook | s3 | datadog | splunk
    Config          string `json:"config" gorm:"type:text"`                    // JSON destination-specific config
    EventFilter     string `json:"event_filter" gorm:"type:text"`              // JSON: which event types to stream
    IsActive        bool   `json:"is_active" gorm:"type:bool;default:true"`
    OrganizationID  string `json:"organization_id" gorm:"type:char(36);index"` // optional org scoping
    CreatedAt       int64  `json:"created_at" gorm:"autoCreateTime"`
    UpdatedAt       int64  `json:"updated_at" gorm:"autoUpdateTime"`
}
```

2. Destination Providers

Interface: internal/log_stream/

```go
type Destination interface {
    Send(ctx context.Context, events []AuditEvent) error
    HealthCheck(ctx context.Context) error
    Name() string
}
```

Supported destinations:

| Destination | Config Fields | Format |
| --- | --- | --- |
| HTTP Webhook | `url`, `headers`, `auth_header` | JSON POST |
| AWS S3 | `bucket`, `prefix`, `region`, `access_key`, `secret_key` | JSONL files |
| Datadog | `api_key`, `site` (us1/eu1/etc.) | Datadog Log API |
| Splunk HEC | `url`, `token` | Splunk HTTP Event Collector |

3. Structured Event Format

All destinations receive events in a consistent JSON format:

```json
{
    "id": "evt_a1b2c3d4",
    "timestamp": "2026-03-30T10:15:30Z",
    "type": "user.login_success",
    "version": "1.0",
    "actor": {
        "id": "usr_123",
        "type": "user",
        "email": "john@example.com",
        "ip_address": "203.0.113.42",
        "user_agent": "Mozilla/5.0..."
    },
    "target": {
        "type": "session",
        "id": "sess_456"
    },
    "metadata": {
        "method": "password",
        "mfa_used": true,
        "device_hash": "abc123..."
    },
    "organization_id": "org_789",
    "environment": {
        "authorizer_version": "2.0.0",
        "instance_id": "inst_..."
    }
}
```

4. Streaming Architecture

Streaming is built on top of the audit log system (#505): the audit log's buffered channel feeds both database writes and log streams.

```go
func (a *auditProvider) flushLoop() {
    ticker := time.NewTicker(a.flushInterval)
    defer ticker.Stop()
    batch := make([]AuditEvent, 0, batchSize)
    flush := func() {
        if len(batch) == 0 {
            return
        }
        // Write to database
        a.writeBatch(batch)
        // Fan out to active log streams
        a.streamBatch(batch)
        batch = batch[:0]
    }
    for {
        select {
        case event := <-a.eventChan:
            batch = append(batch, event)
            if len(batch) >= batchSize {
                flush()
            }
        case <-ticker.C:
            flush()
        }
    }
}
```

```go
func (a *auditProvider) streamBatch(events []AuditEvent) {
    ctx := context.Background()
    streams, err := a.store.ListActiveLogStreams(ctx)
    if err != nil {
        a.log.Error().Err(err).Msg("failed to list active log streams")
        return
    }
    for _, stream := range streams {
        // Filter events based on the stream's event filter
        filtered := filterEvents(events, stream.EventFilter)
        if len(filtered) == 0 {
            continue
        }
        // Send asynchronously with retry
        go a.sendWithRetry(stream, filtered)
    }
}
```

5. Batching and Retry

  • Batch size: configurable (default: 100 events or 5 seconds, whichever comes first)
  • Retry: exponential backoff (1s, 2s, 4s, 8s, ..., capped at 60s), max 5 retries
  • Dead letter: after max retries, log the failure and store the batch in a dead letter table for manual retry
  • Backpressure: if a destination is consistently slow, buffer up to the max buffer size, then drop the oldest events with a warning

```go
func (a *auditProvider) sendWithRetry(stream *schemas.LogStream, events []AuditEvent) {
    ctx := context.Background()
    dest := a.getDestination(stream)
    for attempt := 0; attempt < maxRetries; attempt++ {
        if err := dest.Send(ctx, events); err == nil {
            return
        }
        time.Sleep(backoff(attempt))
    }
    // All retries exhausted: dead-letter the batch for manual replay
    a.storeDeadLetter(stream.ID, events)
    a.log.Error().Str("stream", stream.Name).Msg("log stream delivery failed after retries")
}
```

6. GraphQL Admin API

```graphql
type LogStream {
    id: ID!
    name: String!
    destination_type: String!
    config: Map                        # masked sensitive fields
    event_filter: [String!]
    is_active: Boolean!
    organization_id: String
    created_at: Int64!
}

type Mutation {
    _create_log_stream(params: CreateLogStreamInput!): LogStream!
    _update_log_stream(params: UpdateLogStreamInput!): LogStream!
    _delete_log_stream(id: ID!): Response!
    _test_log_stream(id: ID!): Response!       # Send a test event to verify connectivity
}

type Query {
    _log_streams(params: PaginatedInput): LogStreams!
    _log_stream(id: ID!): LogStream!
}

input CreateLogStreamInput {
    name: String!
    destination_type: String!          # webhook | s3 | datadog | splunk
    config: Map!                       # destination-specific config
    event_filter: [String!]            # empty = all events
    organization_id: String            # optional
}
```

CLI Configuration Flags

```
--log-stream-batch-size=100                # Events per batch
--log-stream-flush-interval=5s             # Max time before flush
--log-stream-max-retries=5                 # Retry attempts
--log-stream-buffer-size=10000             # Max buffered events before dropping
```

Testing Plan

  • Unit tests for each destination provider (mock HTTP)
  • Integration test: create stream → trigger events → verify delivery
  • Test batching (events arrive in batches)
  • Test retry with exponential backoff
  • Test dead letter storage after max retries
  • Test event filtering (only configured event types streamed)
  • Test _test_log_stream sends test event
