-
Notifications
You must be signed in to change notification settings - Fork 8
Migrate handler layer to HTTP types (PR 12) #624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
180471e
f63e5b2
020e88c
37c8fbf
7495d96
2c40d58
46e3360
2f40b4c
8210a85
99d7bee
a2597e5
d7a35a1
d8b267b
591b9b3
ce456a9
ed57b14
a8c5648
14e54c4
c682c6d
eec34fb
d6be0b2
b25bfd6
a641eb0
1ee695c
5b205bb
df6bc60
21ec187
e51a7d6
b4bda32
57d6bec
571656c
e271dce
7181a92
f4c4b57
b8c4daf
2bc167e
b458d64
089a805
882fd29
291ad66
ebf129b
2ff0ce9
ead539c
8bbfc74
b39cd79
d6a624a
986a1b2
86079c5
a03a765
ac79961
b96aec0
661e3df
774a07f
95ce45e
b10dcec
888170d
f856b68
eb12522
7fcb3b4
1844290
0132a36
ffa1174
fbbf767
b89a9e6
6fa8b38
e9ce63d
794b66d
e13537b
b0c6571
14f282b
34c44bd
04b9cda
2c0c4eb
f6b00c8
ec62970
27a0949
5b6555f
0a8915c
2f1cc97
ba141fa
7310198
a05189e
4617253
236eecf
cd68357
41cb0df
e437454
f9b4d62
1a0c0b6
49e3f1d
086b32c
a9dd665
7365ec4
079a97f
3924a98
dd6929c
2817761
ba9c608
4cd511c
d5f5c0d
f9df8da
1acbfa7
ae402ff
ff2e0cd
6a2ad3f
1a2cb46
e8c06e9
9a1fd41
cd9cda7
cf0df8e
3bb3006
ed3c161
7321e79
dfa0386
e0c9bb8
5f5b067
1d22f8c
8eb72ee
2986f33
caaface
76df6f8
2a908d4
65d1e2d
73734dd
c23663c
ca5585c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,21 @@ | ||
| use edgezero_core::body::Body as EdgeBody; | ||
| use edgezero_core::http::{ | ||
| header, HeaderName, HeaderValue, Method, Request as HttpRequest, Response as HttpResponse, | ||
| }; | ||
| use error_stack::Report; | ||
| use fastly::http::Method; | ||
| use fastly::{Request, Response}; | ||
| use fastly::http::Method as FastlyMethod; | ||
| use fastly::{Request as FastlyRequest, Response as FastlyResponse}; | ||
|
|
||
| use trusted_server_core::auction::endpoints::handle_auction; | ||
| use trusted_server_core::auction::{build_orchestrator, AuctionOrchestrator}; | ||
| use trusted_server_core::auth::enforce_basic_auth; | ||
| use trusted_server_core::compat; | ||
| use trusted_server_core::constants::{ | ||
| ENV_FASTLY_IS_STAGING, ENV_FASTLY_SERVICE_VERSION, HEADER_X_GEO_INFO_AVAILABLE, | ||
| HEADER_X_TS_ENV, HEADER_X_TS_VERSION, | ||
| }; | ||
| use trusted_server_core::error::TrustedServerError; | ||
| use trusted_server_core::error::{IntoHttpResponse, TrustedServerError}; | ||
| use trusted_server_core::geo::GeoInfo; | ||
| use trusted_server_core::http_util::sanitize_forwarded_headers; | ||
| use trusted_server_core::integrations::IntegrationRegistry; | ||
| use trusted_server_core::platform::RuntimeServices; | ||
| use trusted_server_core::proxy::{ | ||
|
|
@@ -41,20 +45,17 @@ use crate::platform::{build_runtime_services, open_kv_store, UnavailableKvStore} | |
|
|
||
| /// Entry point for the Fastly Compute program. | ||
| /// | ||
| /// Uses an undecorated `main()` with `Request::from_client()` instead of | ||
| /// `#[fastly::main]` so we can call `stream_to_client()` or `send_to_client()` | ||
| /// explicitly. `#[fastly::main]` is syntactic sugar that auto-calls | ||
| /// `send_to_client()` on the returned `Response`, which is incompatible with | ||
| /// streaming. | ||
| /// Uses an undecorated `main()` with `FastlyRequest::from_client()` instead of | ||
| /// `#[fastly::main]` so we can call `send_to_client()` explicitly when needed. | ||
| fn main() { | ||
| init_logger(); | ||
|
|
||
| let req = Request::from_client(); | ||
| let mut req = FastlyRequest::from_client(); | ||
|
|
||
| // Keep the health probe independent from settings loading and routing so | ||
| // readiness checks still get a cheap liveness response during startup. | ||
| if req.get_method() == Method::GET && req.get_path() == "/health" { | ||
| Response::from_status(200) | ||
| if req.get_method() == FastlyMethod::GET && req.get_path() == "/health" { | ||
| FastlyResponse::from_status(200) | ||
| .with_body_text_plain("ok") | ||
| .send_to_client(); | ||
| return; | ||
|
|
@@ -89,71 +90,63 @@ fn main() { | |
| } | ||
| }; | ||
|
|
||
| // Start with an unavailable KV slot. Consent-dependent routes lazily | ||
| // replace it with the configured store at dispatch time so unrelated | ||
| // routes stay available when consent persistence is misconfigured. | ||
| let kv_store = std::sync::Arc::new(UnavailableKvStore) | ||
| as std::sync::Arc<dyn trusted_server_core::platform::PlatformKvStore>; | ||
| // Strip client-spoofable forwarded headers at the edge before building | ||
| // any request-derived context or converting to the core HTTP types. | ||
| compat::sanitize_fastly_forwarded_headers(&mut req); | ||
|
|
||
| let runtime_services = build_runtime_services(&req, kv_store); | ||
| let http_req = compat::from_fastly_request(req); | ||
|
|
||
| // route_request may send the response directly (streaming path) or | ||
| // return it for us to send (buffered path). | ||
| if let Some(response) = futures::executor::block_on(route_request( | ||
| let mut response = futures::executor::block_on(route_request( | ||
| &settings, | ||
| &orchestrator, | ||
| &integration_registry, | ||
| &runtime_services, | ||
| req, | ||
| )) { | ||
| response.send_to_client(); | ||
| } | ||
| http_req, | ||
| )) | ||
| .unwrap_or_else(|e| http_error_response(&e)); | ||
|
|
||
| let geo_info = if response.status() == edgezero_core::http::StatusCode::UNAUTHORIZED { | ||
| None | ||
| } else { | ||
| runtime_services | ||
| .geo() | ||
| .lookup(runtime_services.client_info().client_ip) | ||
| .unwrap_or_else(|e| { | ||
| log::warn!("geo lookup failed: {e}"); | ||
| None | ||
| }) | ||
| }; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 thinking — 401 responses no longer carry geo headers — silent behavior change. Previously This may be intentional (avoid leaking geo to unauthenticated callers) but it's a silent change. Add a one-line comment to make the rationale explicit: // Skip geo lookup for 401s so geo headers are not exposed to unauthenticated
// callers. Authenticated routes do their own lookups for consent context.
let geo_info = if response.status() == StatusCode::UNAUTHORIZED { … }; |
||
|
|
||
| finalize_response(&settings, geo_info.as_ref(), &mut response); | ||
|
|
||
| compat::to_fastly_response(response).send_to_client(); | ||
| } | ||
|
|
||
| async fn route_request( | ||
| settings: &Settings, | ||
| orchestrator: &AuctionOrchestrator, | ||
| integration_registry: &IntegrationRegistry, | ||
| runtime_services: &RuntimeServices, | ||
| mut req: Request, | ||
| ) -> Option<Response> { | ||
| // Strip client-spoofable forwarded headers at the edge. | ||
| // On Fastly this service IS the first proxy — these headers from | ||
| // clients are untrusted and can hijack URL rewriting (see #409). | ||
| sanitize_forwarded_headers(&mut req); | ||
|
|
||
| // Look up geo info via the platform abstraction using the client IP | ||
| // already captured in RuntimeServices at the entry point. | ||
| let geo_info = runtime_services | ||
| .geo() | ||
| .lookup(runtime_services.client_info().client_ip) | ||
| .unwrap_or_else(|e| { | ||
| log::warn!("geo lookup failed: {e}"); | ||
| None | ||
| }); | ||
|
|
||
| req: HttpRequest, | ||
| ) -> Result<HttpResponse, Report<TrustedServerError>> { | ||
| // `get_settings()` should already have rejected invalid handler regexes. | ||
| // Keep this fallback so manually-constructed or otherwise unprepared | ||
| // settings still become an error response instead of panicking. | ||
| match enforce_basic_auth(settings, &req) { | ||
| Ok(Some(mut response)) => { | ||
| finalize_response(settings, geo_info.as_ref(), &mut response); | ||
| return Some(response); | ||
| } | ||
| Ok(Some(response)) => return Ok(response), | ||
| Ok(None) => {} | ||
| Err(e) => { | ||
| log::error!("Failed to evaluate basic auth: {:?}", e); | ||
| let mut response = to_error_response(&e); | ||
| finalize_response(settings, geo_info.as_ref(), &mut response); | ||
| return Some(response); | ||
| } | ||
| Err(e) => return Err(e), | ||
| } | ||
|
|
||
| // Get path and method for routing | ||
| let path = req.get_path().to_string(); | ||
| let method = req.get_method().clone(); | ||
| let path = req.uri().path().to_string(); | ||
| let method = req.method().clone(); | ||
|
|
||
| // Match known routes and handle them | ||
| let result = match (method, path.as_str()) { | ||
| match (method, path.as_str()) { | ||
| // Serve the tsjs library | ||
| (Method::GET, path) if path.starts_with("/static/tsjs=") => { | ||
| handle_tsjs_dynamic(&req, integration_registry) | ||
|
|
@@ -199,14 +192,24 @@ async fn route_request( | |
| (Method::POST, "/first-party/proxy-rebuild") => { | ||
| handle_first_party_proxy_rebuild(settings, runtime_services, req).await | ||
| } | ||
| (m, path) if integration_registry.has_route(&m, path) => integration_registry | ||
| .handle_proxy(&m, path, settings, runtime_services, req) | ||
| .await | ||
| .unwrap_or_else(|| { | ||
| Err(Report::new(TrustedServerError::BadRequest { | ||
| message: format!("Unknown integration route: {path}"), | ||
| })) | ||
| }), | ||
| (m, path) if integration_registry.has_route(&m, path) => { | ||
| // TODO(PR13): migrate integration trait to http types here | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 📝 note — |
||
| integration_registry | ||
| .handle_proxy( | ||
| &m, | ||
| path, | ||
| settings, | ||
| runtime_services, | ||
| compat::to_fastly_request(req), | ||
| ) | ||
| .await | ||
| .unwrap_or_else(|| { | ||
| Err(Report::new(TrustedServerError::BadRequest { | ||
| message: format!("Unknown integration route: {path}"), | ||
| })) | ||
| }) | ||
| .map(compat::from_fastly_response) | ||
| } | ||
|
|
||
| // No known route matched, proxy to publisher origin as fallback | ||
| _ => { | ||
|
|
@@ -216,64 +219,48 @@ async fn route_request( | |
| ); | ||
|
|
||
| match runtime_services_for_consent_route(settings, runtime_services) { | ||
| Ok(publisher_services) => { | ||
| match handle_publisher_request( | ||
| settings, | ||
| integration_registry, | ||
| &publisher_services, | ||
| req, | ||
| ) { | ||
| Ok(PublisherResponse::Stream { | ||
| mut response, | ||
| body, | ||
| params, | ||
| }) => { | ||
| // Streaming path: finalize headers, then stream body to client. | ||
| finalize_response(settings, geo_info.as_ref(), &mut response); | ||
| let mut streaming_body = response.stream_to_client(); | ||
| if let Err(e) = stream_publisher_body( | ||
| body, | ||
| &mut streaming_body, | ||
| ¶ms, | ||
| settings, | ||
| integration_registry, | ||
| ) { | ||
| // Headers already committed. Log and abort — client | ||
| // sees a truncated response. Standard proxy behavior. | ||
| log::error!("Streaming processing failed: {e:?}"); | ||
| drop(streaming_body); | ||
| } else if let Err(e) = streaming_body.finish() { | ||
| log::error!("Failed to finish streaming body: {e}"); | ||
| } | ||
| // Response already sent via stream_to_client() | ||
| return None; | ||
| } | ||
| Ok(PublisherResponse::PassThrough { mut response, body }) => { | ||
| // Binary pass-through: reattach body and send via send_to_client(). | ||
| // This preserves Content-Length and avoids chunked encoding overhead. | ||
| // Fastly streams the body from its internal buffer — no WASM | ||
| // memory buffering occurs. | ||
| response.set_body(body); | ||
| Ok(response) | ||
| } | ||
| Ok(PublisherResponse::Buffered(response)) => Ok(response), | ||
| Err(e) => { | ||
| log::error!("Failed to proxy to publisher origin: {:?}", e); | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
| Ok(publisher_services) => handle_publisher_request( | ||
| settings, | ||
| integration_registry, | ||
| &publisher_services, | ||
| req, | ||
| ) | ||
| .await | ||
| .and_then(|pub_response| { | ||
| resolve_publisher_response(pub_response, settings, integration_registry) | ||
| }), | ||
| Err(e) => Err(e), | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| // Convert any errors to HTTP error responses | ||
| let mut response = result.unwrap_or_else(|e| to_error_response(&e)); | ||
|
|
||
| finalize_response(settings, geo_info.as_ref(), &mut response); | ||
| } | ||
| } | ||
|
|
||
| Some(response) | ||
| fn resolve_publisher_response( | ||
| publisher_response: PublisherResponse, | ||
| settings: &Settings, | ||
| integration_registry: &IntegrationRegistry, | ||
| ) -> Result<HttpResponse, Report<TrustedServerError>> { | ||
| match publisher_response { | ||
| PublisherResponse::Buffered(response) => Ok(response), | ||
| PublisherResponse::Stream { | ||
| mut response, | ||
| body, | ||
| params, | ||
| } => { | ||
| let mut output = Vec::new(); | ||
| stream_publisher_body(body, &mut output, ¶ms, settings, integration_registry)?; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔧 wrench — Streaming regression: this arm now buffers the full pipeline output instead of streaming chunks to the client. Before this PR, the adapter handled After this PR,
Structural cause: Fix (preferred) — keep the streaming dispatch in the adapter: // Have route_request return Result<HandlerOutcome, …> where HandlerOutcome
// distinguishes streamed vs buffered responses, then in main():
match outcome {
HandlerOutcome::Streaming { mut response, body, params } => {
finalize_response(&settings, geo_info.as_ref(), &mut response);
let mut fastly_resp = compat::to_fastly_response_skeleton(response);
let mut sb = fastly_resp.stream_to_client();
if let Err(e) = stream_publisher_body(body, &mut sb, ¶ms, &settings, &integration_registry) {
log::error!("Streaming processing failed: {e:?}");
drop(sb);
} else if let Err(e) = sb.finish() {
log::error!("Failed to finish streaming body: {e}");
}
}
HandlerOutcome::Buffered(response) => {
compat::to_fastly_response(response).send_to_client();
}
}Alternative — fold this arm into No test today asserts streaming behavior — the publisher tests ( |
||
| response.headers_mut().insert( | ||
| header::CONTENT_LENGTH, | ||
| HeaderValue::from(output.len() as u64), | ||
| ); | ||
| *response.body_mut() = EdgeBody::from(output); | ||
| Ok(response) | ||
| } | ||
| PublisherResponse::PassThrough { mut response, body } => { | ||
| *response.body_mut() = body; | ||
| Ok(response) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn runtime_services_for_consent_route( | ||
|
|
@@ -302,21 +289,48 @@ fn runtime_services_for_consent_route( | |
| /// Header precedence (last write wins): geo headers are set first, then | ||
| /// version/staging, then operator-configured `settings.response_headers`. | ||
| /// This means operators can intentionally override any managed header. | ||
| fn finalize_response(settings: &Settings, geo_info: Option<&GeoInfo>, response: &mut Response) { | ||
| fn finalize_response(settings: &Settings, geo_info: Option<&GeoInfo>, response: &mut HttpResponse) { | ||
| if let Some(geo) = geo_info { | ||
| geo.set_response_headers(response); | ||
| } else { | ||
| response.set_header(HEADER_X_GEO_INFO_AVAILABLE, "false"); | ||
| response.headers_mut().insert( | ||
| HEADER_X_GEO_INFO_AVAILABLE, | ||
| HeaderValue::from_static("false"), | ||
| ); | ||
| } | ||
|
|
||
| if let Ok(v) = ::std::env::var(ENV_FASTLY_SERVICE_VERSION) { | ||
| response.set_header(HEADER_X_TS_VERSION, v); | ||
| if let Ok(value) = HeaderValue::from_str(&v) { | ||
| response.headers_mut().insert(HEADER_X_TS_VERSION, value); | ||
| } else { | ||
| log::warn!("Skipping invalid FASTLY_SERVICE_VERSION response header value"); | ||
| } | ||
| } | ||
| if ::std::env::var(ENV_FASTLY_IS_STAGING).as_deref() == Ok("1") { | ||
| response.set_header(HEADER_X_TS_ENV, "staging"); | ||
| response | ||
| .headers_mut() | ||
| .insert(HEADER_X_TS_ENV, HeaderValue::from_static("staging")); | ||
| } | ||
|
|
||
| for (key, value) in &settings.response_headers { | ||
| response.set_header(key, value); | ||
| let header_name = HeaderName::from_bytes(key.as_bytes()) | ||
| .expect("settings.response_headers validated at load time"); | ||
| let header_value = | ||
| HeaderValue::from_str(value).expect("settings.response_headers validated at load time"); | ||
| response.headers_mut().insert(header_name, header_value); | ||
| } | ||
| } | ||
|
|
||
| fn http_error_response(report: &Report<TrustedServerError>) -> HttpResponse { | ||
| let root_error = report.current_context(); | ||
| log::error!("Error occurred: {:?}", report); | ||
|
|
||
|
prk-Jr marked this conversation as resolved.
|
||
| let mut response = | ||
| HttpResponse::new(EdgeBody::from(format!("{}\n", root_error.user_message()))); | ||
| *response.status_mut() = root_error.status_code(); | ||
| response.headers_mut().insert( | ||
| header::CONTENT_TYPE, | ||
| HeaderValue::from_static("text/plain; charset=utf-8"), | ||
| ); | ||
| response | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.