Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 142 additions & 55 deletions crates/trusted-server-adapter-fastly/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use trusted_server_core::ec::finalize::ec_finalize_response;
use trusted_server_core::ec::identify::{cors_preflight_identify, handle_identify};
use trusted_server_core::ec::kv::KvIdentityGraph;
use trusted_server_core::ec::partner::PartnerStore;
use trusted_server_core::ec::pull_sync::{
build_pull_sync_context, dispatch_pull_sync, PullSyncContext,
};
use trusted_server_core::ec::sync_pixel::{handle_sync, FastlyRateLimiter, RATE_COUNTER_NAME};
use trusted_server_core::ec::EcContext;
use trusted_server_core::error::TrustedServerError;
Expand All @@ -37,21 +40,26 @@ use trusted_server_core::settings_data::get_settings;
mod error;
use crate::error::to_error_response;

#[fastly::main]
fn main(req: Request) -> Result<Response, Error> {
fn main() -> Result<(), Error> {
init_logger();

let req = Request::from_client();

// Keep the health probe independent from settings loading and routing so
// readiness checks still get a cheap liveness response during startup.
if req.get_method() == Method::GET && req.get_path() == "/health" {
return Ok(Response::from_status(200).with_body_text_plain("ok"));
Response::from_status(200)
.with_body_text_plain("ok")
.send_to_client();
return Ok(());
}

let settings = match get_settings() {
Ok(s) => s,
Err(e) => {
log::error!("Failed to load settings: {:?}", e);
return Ok(to_error_response(&e));
to_error_response(&e).send_to_client();
return Ok(());
}
};
log::debug!("Settings {settings:?}");
Expand All @@ -63,24 +71,44 @@ fn main(req: Request) -> Result<Response, Error> {
Ok(r) => r,
Err(e) => {
log::error!("Failed to create integration registry: {:?}", e);
return Ok(to_error_response(&e));
to_error_response(&e).send_to_client();
return Ok(());
}
};

futures::executor::block_on(route_request(
let outcome = futures::executor::block_on(route_request(
&settings,
&orchestrator,
&integration_registry,
req,
))
))?;

let RouteOutcome {
response,
pull_sync_context,
} = outcome;

response.send_to_client();

if let Some(context) = pull_sync_context {
run_pull_sync_after_send(&settings, &context);
}

Ok(())
}

#[must_use]
struct RouteOutcome {
response: Response,
pull_sync_context: Option<PullSyncContext>,
}

async fn route_request(
settings: &Settings,
orchestrator: &AuctionOrchestrator,
integration_registry: &IntegrationRegistry,
mut req: Request,
) -> Result<Response, Error> {
) -> Result<RouteOutcome, Error> {
// Strip client-spoofable forwarded headers at the edge.
// On Fastly this service IS the first proxy — these headers from
// clients are untrusted and can hijack URL rewriting (see #409).
Expand All @@ -101,7 +129,10 @@ async fn route_request(
})
.unwrap_or_else(|e| to_error_response(&e));
finalize_response(settings, geo_info.as_ref(), &mut response);
return Ok(response);
return Ok(RouteOutcome {
response,
pull_sync_context: None,
});
}

let mut ec_context =
Expand All @@ -110,7 +141,10 @@ async fn route_request(
Err(err) => {
let mut response = to_error_response(&err);
finalize_response(settings, geo_info.as_ref(), &mut response);
return Ok(response);
return Ok(RouteOutcome {
response,
pull_sync_context: None,
});
}
};

Expand All @@ -125,79 +159,99 @@ async fn route_request(
&mut response,
);
finalize_response(settings, geo_info.as_ref(), &mut response);
return Ok(response);
return Ok(RouteOutcome {
response,
pull_sync_context: None,
});
}

// Get path and method for routing
let path = req.get_path().to_string();
let method = req.get_method().clone();

// Match known routes and handle them
let result = match (method, path.as_str()) {
let (result, organic_route) = match (method, path.as_str()) {
// Serve the tsjs library
(Method::GET, path) if path.starts_with("/static/tsjs=") => {
handle_tsjs_dynamic(&req, integration_registry)
(handle_tsjs_dynamic(&req, integration_registry), false)
}

// Discovery endpoint for trusted-server capabilities and JWKS
(Method::GET, "/.well-known/trusted-server.json") => {
handle_trusted_server_discovery(settings, req)
(handle_trusted_server_discovery(settings, req), false)
}

// Signature verification endpoint
(Method::POST, "/verify-signature") => handle_verify_signature(settings, req),
(Method::POST, "/verify-signature") => (handle_verify_signature(settings, req), false),

// Admin endpoints
// Keep in sync with Settings::ADMIN_ENDPOINTS in crates/trusted-server-core/src/settings.rs
(Method::POST, "/admin/keys/rotate") => handle_rotate_key(settings, req),
(Method::POST, "/admin/keys/deactivate") => handle_deactivate_key(settings, req),
(Method::POST, "/admin/partners/register") => {
require_partner_store(settings).and_then(|store| handle_register_partner(&store, req))
}

(Method::GET, "/sync") => require_identity_graph(settings).and_then(|kv| {
require_partner_store(settings).and_then(|partner_store| {
handle_sync(settings, &kv, &partner_store, &req, &mut ec_context)
})
}),
(Method::GET, "/identify") => require_identity_graph(settings).and_then(|kv| {
require_partner_store(settings).and_then(|partner_store| {
handle_identify(settings, &kv, &partner_store, &req, &ec_context)
})
}),
(Method::OPTIONS, "/identify") => cors_preflight_identify(settings, &req),
(Method::POST, "/admin/keys/rotate") => (handle_rotate_key(settings, req), false),
(Method::POST, "/admin/keys/deactivate") => (handle_deactivate_key(settings, req), false),
(Method::POST, "/admin/partners/register") => (
require_partner_store(settings).and_then(|store| handle_register_partner(&store, req)),
false,
),

(Method::GET, "/sync") => (
require_identity_graph(settings).and_then(|kv| {
require_partner_store(settings).and_then(|partner_store| {
handle_sync(settings, &kv, &partner_store, &req, &mut ec_context)
})
}),
false,
),
(Method::GET, "/identify") => (
require_identity_graph(settings).and_then(|kv| {
require_partner_store(settings).and_then(|partner_store| {
handle_identify(settings, &kv, &partner_store, &req, &ec_context)
})
}),
false,
),
(Method::OPTIONS, "/identify") => (cors_preflight_identify(settings, &req), false),

// Unified auction endpoint (returns creative HTML inline)
(Method::POST, "/auction") => {
let partner_store = require_partner_store(settings).ok();
handle_auction(
settings,
orchestrator,
kv_graph.as_ref(),
partner_store.as_ref(),
&ec_context,
req,
(
handle_auction(
settings,
orchestrator,
kv_graph.as_ref(),
partner_store.as_ref(),
&ec_context,
req,
)
.await,
false,
)
.await
}

// tsjs endpoints
(Method::GET, "/first-party/proxy") => handle_first_party_proxy(settings, req).await,
(Method::GET, "/first-party/click") => handle_first_party_click(settings, req).await,
(Method::GET, "/first-party/proxy") => {
(handle_first_party_proxy(settings, req).await, false)
}
(Method::GET, "/first-party/click") => {
(handle_first_party_click(settings, req).await, false)
}
(Method::GET, "/first-party/sign") | (Method::POST, "/first-party/sign") => {
handle_first_party_proxy_sign(settings, req).await
(handle_first_party_proxy_sign(settings, req).await, false)
}
(Method::POST, "/first-party/proxy-rebuild") => {
handle_first_party_proxy_rebuild(settings, req).await
(handle_first_party_proxy_rebuild(settings, req).await, false)
}
(m, path) if integration_registry.has_route(&m, path) => {
let result = integration_registry
.handle_proxy(&m, path, settings, kv_graph.as_ref(), &mut ec_context, req)
.await
.unwrap_or_else(|| {
Err(Report::new(TrustedServerError::BadRequest {
message: format!("Unknown integration route: {path}"),
}))
});
(result, true)
}
(m, path) if integration_registry.has_route(&m, path) => integration_registry
.handle_proxy(&m, path, settings, kv_graph.as_ref(), &mut ec_context, req)
.await
.unwrap_or_else(|| {
Err(Report::new(TrustedServerError::BadRequest {
message: format!("Unknown integration route: {path}"),
}))
}),

// No known route matched, proxy to publisher origin as fallback
_ => {
Expand All @@ -206,7 +260,7 @@ async fn route_request(
path
);

match handle_publisher_request(
let result = match handle_publisher_request(
settings,
integration_registry,
kv_graph.as_ref(),
Expand All @@ -218,10 +272,13 @@ async fn route_request(
log::error!("Failed to proxy to publisher origin: {:?}", e);
Err(e)
}
}
};
(result, true)
}
};

let route_succeeded = result.is_ok();

// Convert any errors to HTTP error responses
let mut response = result.unwrap_or_else(|e| to_error_response(&e));

Expand All @@ -235,13 +292,43 @@ async fn route_request(

finalize_response(settings, geo_info.as_ref(), &mut response);

Ok(response)
let pull_sync_context = if organic_route && route_succeeded {
build_pull_sync_context(&ec_context)
} else {
None
};

Ok(RouteOutcome {
response,
pull_sync_context,
})
}

fn maybe_identity_graph(settings: &Settings) -> Option<KvIdentityGraph> {
settings.ec.ec_store.as_ref().map(KvIdentityGraph::new)
}

fn run_pull_sync_after_send(settings: &Settings, context: &PullSyncContext) {
let kv = match require_identity_graph(settings) {
Ok(kv) => kv,
Err(err) => {
log::debug!("Pull sync: identity graph unavailable, skipping: {err:?}");
return;
}
};

let partner_store = match require_partner_store(settings) {
Ok(store) => store,
Err(err) => {
log::debug!("Pull sync: partner store unavailable, skipping: {err:?}");
return;
}
};

let limiter = FastlyRateLimiter::new(RATE_COUNTER_NAME);
dispatch_pull_sync(settings, &kv, &partner_store, &limiter, context);
}

/// Applies all standard response headers: geo, version, staging, and configured headers.
///
/// Called from every response path (including auth early-returns) so that all
Expand Down
21 changes: 21 additions & 0 deletions crates/trusted-server-core/src/ec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! - [`identify`] — Browser identity read endpoint (`GET /identify`)
//! - [`eids`] — Shared EID resolution and formatting helpers
//! - [`batch_sync`] — S2S batch sync endpoint (`POST /api/v1/sync`)
//! - [`pull_sync`] — Background pull-sync dispatcher for organic routes

pub mod admin;
pub mod batch_sync;
Expand All @@ -36,6 +37,7 @@ pub mod identify;
pub mod kv;
pub mod kv_types;
pub mod partner;
pub mod pull_sync;
pub mod sync_pixel;

use cookie::CookieJar;
Expand Down Expand Up @@ -362,6 +364,25 @@ impl EcContext {
geo_info: None,
}
}

/// Creates a test-only [`EcContext`] with explicit client IP.
#[cfg(test)]
#[must_use]
pub fn new_for_test_with_ip(
ec_value: Option<String>,
consent: ConsentContext,
client_ip: Option<String>,
) -> Self {
Self {
ec_was_present: ec_value.is_some(),
cookie_ec_value: ec_value.clone(),
ec_value,
ec_generated: false,
consent,
client_ip,
geo_info: None,
}
}
}

fn current_timestamp() -> u64 {
Expand Down
Loading
Loading