Route delivery receipts through proxy Queue instead of direct HTTP #168

@vrknetha

Epic: #211 (CF Queues backbone) — Phase 2

Problem

Currently, when Alice's connector processes Bob's message, it POSTs the delivery receipt directly to Bob's proxy via HTTP. This has issues:

  • Alice's connector needs to know Bob's proxy URL and authenticate to it
  • If Bob's proxy is temporarily unavailable, the receipt is lost (fire-and-forget)
  • Cross-proxy receipt delivery requires the connector to handle external auth

Current Flow

Alice's connector → HTTP POST → Bob's proxy → stores in DO → nobody reads it

Proposed Flow

Alice's connector → HTTP POST → Alice's proxy (trusted, local)
  → Alice's proxy publishes to clawdentity-receipts Queue
  → Bob's proxy consumes from Queue
  → Bob's proxy routes to Bob's relay DO
  → Bob's DO pushes via WebSocket to Bob's connector (#165)
  → Bob's connector → Bob's OpenClaw hook

Implementation

1. Create receipt queue

File: apps/proxy/wrangler.jsonc (all envs)

"queues": {
  "producers": [
    {
      "binding": "RECEIPT_QUEUE",
      "queue": "clawdentity-receipts-dev"
    }
  ],
  "consumers": [
    // ... existing clawdentity-events consumer from #167
    {
      "queue": "clawdentity-receipts-dev",
      "max_batch_size": 25,
      "max_batch_timeout": 5,
      "dead_letter_queue": "clawdentity-receipts-dlq-dev"
    }
  ]
}

2. Proxy receipt endpoint (for local connector)

New file: apps/proxy/src/routes/receipt-route.ts

Alice's connector POSTs the receipt to its own proxy:

// POST /v1/relay/delivery-receipts (already exists, refactor to publish to queue)
// Context is assumed to be Hono's (import type { Context } from "hono");
// Bindings, receiptSchema and nowIso are existing proxy helpers.
export function createReceiptHandler(env: Bindings) {
  return async (c: Context) => {
    // Validate receipt payload (parse throws on an invalid body)
    const receipt = receiptSchema.parse(await c.req.json());

    // Publish to receipt queue instead of direct cross-proxy HTTP
    await env.RECEIPT_QUEUE.send(JSON.stringify({
      type: "delivery_receipt",
      senderAgentDid: receipt.fromAgentDid,      // who sent the original message
      recipientAgentDid: receipt.toAgentDid,     // who received it
      originalFrameId: receipt.frameId,
      status: receipt.status,                    // "processed_by_openclaw" | "dead_lettered"
      reason: receipt.reason,
      timestamp: nowIso(),
      senderProxyOrigin: receipt.senderProxyUrl, // origin of the sender's proxy; the consumer in step 3 picks the DO by senderAgentDid
    }));

    // 202: the receipt is queued, not yet delivered to the sender
    return c.json({ accepted: true }, 202);
  };
}
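
The handler above publishes a JSON string, and step 3 consumes a ReceiptQueueEvent that this issue never spells out. A minimal sketch of that shape and a defensive parser follows. The field names mirror the publish call above; the type name ReceiptStatus, the guard, and the parser are hypothetical helpers, and the real code would more likely reuse the existing schema library than a hand-written guard.

```typescript
// Hypothetical message shape; field names copied from the publish call in step 2.
type ReceiptStatus = "processed_by_openclaw" | "dead_lettered";

interface ReceiptQueueEvent {
  type: "delivery_receipt";
  senderAgentDid: string;
  recipientAgentDid: string;
  originalFrameId: string;
  status: ReceiptStatus;
  reason?: string; // JSON.stringify drops undefined, so this may be absent
  timestamp: string;
  senderProxyOrigin?: string;
}

function isNonEmptyString(value: unknown): value is string {
  return typeof value === "string" && value.length > 0;
}

// Runtime guard: queue bodies are untrusted until checked.
function isReceiptQueueEvent(value: unknown): value is ReceiptQueueEvent {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    v.type === "delivery_receipt" &&
    isNonEmptyString(v.senderAgentDid) &&
    isNonEmptyString(v.recipientAgentDid) &&
    isNonEmptyString(v.originalFrameId) &&
    (v.status === "processed_by_openclaw" || v.status === "dead_lettered") &&
    (v.reason === undefined || typeof v.reason === "string") &&
    isNonEmptyString(v.timestamp)
  );
}

// Returns null for malformed JSON or a body that is not a receipt event.
function parseReceiptQueueEvent(body: string): ReceiptQueueEvent | null {
  try {
    const parsed: unknown = JSON.parse(body);
    return isReceiptQueueEvent(parsed) ? parsed : null;
  } catch {
    return null;
  }
}
```

Returning null instead of throwing lets the consumer decide whether a malformed body should be retried or dropped.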

3. Queue consumer routes receipt to sender's DO

File: apps/proxy/src/queue-consumer/receipt-events.ts

export async function handleReceiptEvent(
  event: ReceiptQueueEvent,
  env: Bindings,
): Promise<void> {
  // Route to the SENDER's relay DO (they need to know their message was delivered)
  const doId = env.AGENT_RELAY_SESSION.idFromName(event.senderAgentDid);
  const stub = env.AGENT_RELAY_SESSION.get(doId);
  
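  const response = await stub.fetch(new Request("http://internal/receipt", {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({
      originalFrameId: event.originalFrameId,
      toAgentDid: event.recipientAgentDid,
      status: event.status,
      reason: event.reason,
      timestamp: event.timestamp,
    }),
  }));

  // Throw on a non-2xx response so the queue() handler retries instead of acking a lost receipt
  if (!response.ok) {
    throw new Error(`relay DO rejected receipt: HTTP ${response.status}`);
  }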
}

4. Update proxy worker queue() handler

File: apps/proxy/src/worker.ts

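async queue(batch: MessageBatch<string>, env: Bindings): Promise<void> {
  for (const message of batch.messages) {
    try {
      const event = JSON.parse(message.body);

      // Route based on event type ("agent.auth.revoked" is covered by the prefix check)
      if (typeof event?.type === "string" && event.type.startsWith("agent.auth.")) {
        await handleRegistryEvent(event, env);
      } else if (event?.type === "delivery_receipt") {
        await handleReceiptEvent(event, env);
      } else {
        // Unknown types are still acked so they do not loop through retries
        console.warn("queue: ignoring unknown event type", event?.type);
      }

      message.ack();
    } catch (error) {
      // Retried by the queue; once retries are exhausted the message goes to the dead letter queue
      console.error("queue: event failed, retrying", error);
      message.retry();
    }
  }
}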
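
The catch branch above retries immediately. Cloudflare Queues also accepts a delay on retry (message.retry({ delaySeconds })) and exposes message.attempts, so a backoff is possible. A minimal sketch, assuming a 5 second base and a 300 second cap (both hypothetical tuning values, as are the names RetryableMessage, retryDelaySeconds and settle):

```typescript
// Structural subset of the Cloudflare Queues Message type used here.
interface RetryableMessage {
  readonly attempts: number; // starts at 1 on the first delivery
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

// Hypothetical tuning values, not taken from the issue.
const BASE_DELAY_SECONDS = 5;
const MAX_DELAY_SECONDS = 300;

// Exponential backoff: 5s, 10s, 20s, ... capped at MAX_DELAY_SECONDS.
function retryDelaySeconds(attempts: number): number {
  const exponent = Math.max(0, attempts - 1);
  return Math.min(MAX_DELAY_SECONDS, BASE_DELAY_SECONDS * 2 ** exponent);
}

// Runs the work for one message, then acks on success or schedules a delayed retry.
async function settle(
  message: RetryableMessage,
  work: () => Promise<void>,
): Promise<void> {
  try {
    await work();
    message.ack();
  } catch {
    message.retry({ delaySeconds: retryDelaySeconds(message.attempts) });
  }
}
```

Inside queue(), each iteration could then become settle(message, () => route(message.body, env)), where route stands for the type dispatch shown above.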

5. Remove direct cross-proxy HTTP receipt POST

File: packages/connector/src/runtime/relay-service.ts

Change postDeliveryReceipt to POST to own proxy instead of peer proxy:

// Before: POST to peer proxy (requires cross-proxy auth)
// After: POST to own proxy receipt endpoint (trusted, local)

This eliminates the trusted receipt target validation complexity and the requirement for the connector to authenticate to remote proxies.
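
A minimal sketch of what the refactored postDeliveryReceipt could look like. The endpoint path and receipt field names come from step 2; the ownProxyBaseUrl parameter, the injected fetchImpl, and the DeliveryReceipt and FetchLike types are assumptions made for illustration, not the connector's actual signature.

```typescript
// Receipt fields as read by the proxy handler in step 2.
interface DeliveryReceipt {
  fromAgentDid: string;
  toAgentDid: string;
  frameId: string;
  status: "processed_by_openclaw" | "dead_lettered";
  reason?: string;
  senderProxyUrl?: string;
}

// Narrow fetch signature so a fake can be injected in tests (hypothetical).
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number }>;

const RECEIPT_PATH = "/v1/relay/delivery-receipts";

// After: POST to the connector's own proxy; no peer proxy URL or cross-proxy auth involved.
async function postDeliveryReceipt(
  ownProxyBaseUrl: string,
  receipt: DeliveryReceipt,
  fetchImpl: FetchLike,
): Promise<void> {
  const url = ownProxyBaseUrl.replace(/\/+$/, "") + RECEIPT_PATH;
  const response = await fetchImpl(url, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(receipt),
  });
  // The proxy answers 202 once the receipt is queued; anything non-2xx is surfaced to the caller.
  if (!response.ok) {
    throw new Error(`own proxy rejected receipt: HTTP ${response.status}`);
  }
}
```

In production the global fetch can be passed as fetchImpl; whatever authentication the connector already uses toward its own proxy would be added to the headers.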

Queue Routing Logic

                    clawdentity-receipts Queue
                            │
                    ┌───────┴────────┐
                    │ Queue Consumer  │
                    │ (proxy worker)  │
                    └───────┬────────┘
                            │
            ┌───────────────┼───────────────┐
            ▼               ▼               ▼
    Agent A's DO      Agent B's DO     Agent C's DO
    (by senderDid)   (by senderDid)   (by senderDid)
            │               │               │
            ▼               ▼               ▼
    WebSocket push   WebSocket push   WebSocket push
    to connector     to connector     to connector
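
The fan-out in the diagram can be modelled in memory, which may help when writing the round-trip test. This is a sketch only: FakeQueue and FakeRelaySessions are hypothetical stand-ins for the Cloudflare Queue and the relay DOs, and the WebSocket push is reduced to appending to a per-agent list.

```typescript
interface ReceiptEvent {
  type: "delivery_receipt";
  senderAgentDid: string;
  recipientAgentDid: string;
  originalFrameId: string;
  status: "processed_by_openclaw" | "dead_lettered";
}

// Stand-in for the clawdentity-receipts queue: stores serialized bodies in order.
class FakeQueue {
  private readonly bodies: string[] = [];
  send(body: string): void {
    this.bodies.push(body);
  }
  drain(): string[] {
    return this.bodies.splice(0);
  }
}

// Stand-in for the relay DOs: one receipt list per agent DID.
class FakeRelaySessions {
  readonly pushed = new Map<string, ReceiptEvent[]>();
  deliver(event: ReceiptEvent): void {
    // Keyed by the SENDER's DID, mirroring idFromName(event.senderAgentDid).
    const list = this.pushed.get(event.senderAgentDid) ?? [];
    list.push(event);
    this.pushed.set(event.senderAgentDid, list);
  }
}

// Consumer loop: parse each body and route receipts to the sender's session.
function consumeReceipts(queue: FakeQueue, sessions: FakeRelaySessions): number {
  let routed = 0;
  for (const body of queue.drain()) {
    const event = JSON.parse(body) as ReceiptEvent;
    if (event.type !== "delivery_receipt") continue;
    sessions.deliver(event);
    routed += 1;
  }
  return routed;
}
```

Publishing two receipts for different senders and draining the queue should leave one entry under each sender's DID and none under the recipients'.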

Dependencies

Impact

  • CF Queues free tier: 1M messages/month (receipts are low volume, <1000/day at scale)
  • Eliminates cross-proxy auth for receipts
  • Receipts are durable (queue retries on consumer failure)
  • Receipts with status "dead_lettered" follow the same path as "processed_by_openclaw" receipts

Acceptance Criteria

  • clawdentity-receipts queue created in wrangler config (dev + prod)
  • Connector POSTs receipts to own proxy, not peer proxy
  • Proxy publishes receipt to queue
  • Queue consumer routes receipt to sender's relay DO
  • DO pushes receipt frame to sender's connector via WebSocket
  • Sender's agent receives receipt at OpenClaw hook
  • Dead letter queue catches consumer failures
  • Test: full round-trip — send message → delivered → receipt reaches sender agent
