Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions api-description.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ servers:
tags:
- name: "Health"
description: "Health check"
- name: "Metrics"
description: "Prometheus scrape endpoint"
- name: "File upload"
description: "Upload files"
- name: "File download"
Expand All @@ -30,6 +32,37 @@ paths:
schema:
type: "string"
example: "OK"
/metrics:
get:
tags:
- "Metrics"
summary: "Prometheus text-format metrics"
description: |
Returns usage counters and gauges suitable for Prometheus scraping
by the Grafana instance on Scaleway. Intended to be reachable only
from the internal monitoring network; firewall or reverse-proxy
allow-list in front of Cryptify.

Exposed metrics:
* `cryptify_uploads_total{channel}` — counter of finalized uploads.
* `cryptify_upload_bytes_total{channel}` — counter of bytes.
* `cryptify_storage_bytes` — gauge, current disk usage.
* `cryptify_active_files` — gauge, current file count.
* `cryptify_expired_files_total` — counter of uploads purged
before finalization.

The `channel` label is derived from the `X-Cryptify-Source` header,
falling back to `Authorization`/`X-Api-Key` (→ `api`), then the
`Origin` header (`website` / `staging-website`), then `User-Agent`
(`outlook` / `thunderbird`), then `unknown`.
operationId: "metrics"
responses:
"200":
description: "Prometheus text exposition format"
content:
text/plain:
schema:
type: "string"
/fileupload/init:
post:
tags:
Expand Down
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct RawCryptifyConfig {
smtp_tls: Option<bool>,
allowed_origins: String,
pkg_url: String,
metrics_scan_interval_secs: Option<u64>,
chunk_size: Option<u64>,
session_ttl_secs: Option<u64>,
staging_mode: Option<bool>,
Expand All @@ -30,6 +31,7 @@ pub struct CryptifyConfig {
smtp_tls: bool,
allowed_origins: String,
pkg_url: String,
metrics_scan_interval_secs: u64,
chunk_size: u64,
session_ttl_secs: u64,
staging_mode: bool,
Expand All @@ -51,6 +53,7 @@ impl From<RawCryptifyConfig> for CryptifyConfig {
smtp_tls: config.smtp_tls.unwrap_or(true),
allowed_origins: config.allowed_origins,
pkg_url: config.pkg_url,
metrics_scan_interval_secs: config.metrics_scan_interval_secs.unwrap_or(60),
chunk_size: config.chunk_size.unwrap_or(5_000_000),
session_ttl_secs: config.session_ttl_secs.unwrap_or(3600),
staging_mode: config.staging_mode.unwrap_or(false),
Expand Down Expand Up @@ -99,6 +102,10 @@ impl CryptifyConfig {
&self.pkg_url
}

pub fn metrics_scan_interval_secs(&self) -> u64 {
self.metrics_scan_interval_secs
}

pub fn chunk_size(&self) -> u64 {
self.chunk_size
}
Expand All @@ -124,6 +131,7 @@ impl CryptifyConfig {
smtp_tls: false,
allowed_origins: String::new(),
pkg_url: String::new(),
metrics_scan_interval_secs: 60,
chunk_size: 5_000_000,
session_ttl_secs: 3600,
staging_mode,
Expand Down
1 change: 1 addition & 0 deletions src/email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ mod tests {
("phone".to_owned(), "+31123".to_owned()),
],
confirm: true,
source_channel: String::new(),
notify_recipients: true,
api_key_tenant: None,
api_key_validation_failed: false,
Expand Down
58 changes: 51 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
mod config;
mod email;
mod error;
mod metrics;
mod store;

use std::sync::Arc;
use std::time::Duration;

use crate::config::CryptifyConfig;
use crate::email::send_email;
use crate::error::{Error, PayloadTooLargeBody};
use crate::metrics::{detect_channel, storage_sampler, Metrics};
use crate::store::{
API_KEY_PER_UPLOAD_LIMIT, API_KEY_ROLLING_LIMIT, PER_UPLOAD_LIMIT, ROLLING_LIMIT,
ROLLING_WINDOW_SECS,
Expand Down Expand Up @@ -38,7 +43,6 @@ use rocket::http::Method;
use rocket_cors::{AllowedHeaders, AllowedOrigins, CorsOptions};

use serde::{Deserialize, Serialize};
use std::time::Duration;
use store::{FileState, LastChunkRecord, Store};

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -89,11 +93,35 @@ struct InitResponder {
cryptify_token: CryptifyToken,
}

/// Request guard that derives the traffic source channel from the request
/// headers for metrics labelling.
struct ClientHeaders {
channel: String,
}

#[rocket::async_trait]
impl<'r> FromRequest<'r> for ClientHeaders {
type Error = std::convert::Infallible;

async fn from_request(
request: &'r rocket::Request<'_>,
) -> rocket::request::Outcome<Self, Self::Error> {
rocket::request::Outcome::Success(ClientHeaders {
channel: detect_channel(request.headers()),
})
}
}

#[get("/health")]
fn health() -> &'static str {
"OK"
}

#[get("/metrics")]
fn metrics_endpoint(metrics: &State<Arc<Metrics>>) -> rocket::response::content::RawText<String> {
rocket::response::content::RawText(metrics.render())
}

/// Extract a `PG-…` bearer token from an Authorization header value, or
/// `None` for any other shape (missing, wrong scheme, non-PG prefix). Kept
/// as a pure helper so the parsing rules are unit-testable without HTTP.
Expand Down Expand Up @@ -258,6 +286,7 @@ async fn upload_init(
store: &State<Store>,
api_key: ApiKey,
request: Json<InitBody>,
client_headers: ClientHeaders,
) -> Result<InitResponder, Error> {
let current_time = chrono::offset::Utc::now().timestamp();

Expand Down Expand Up @@ -288,6 +317,7 @@ async fn upload_init(
sender: None,
sender_attributes: Vec::new(),
confirm: request.confirm,
source_channel: client_headers.channel,
notify_recipients: request.notify_recipients,
api_key_tenant: api_key.tenant,
api_key_validation_failed: api_key.validation_failed,
Expand Down Expand Up @@ -694,6 +724,7 @@ async fn upload_finalize(
config: &State<CryptifyConfig>,
store: &State<Store>,
vk: &State<Parameters<VerifyingKey>>,
metrics: &State<Arc<Metrics>>,
headers: FinalizeHeaders,
uuid: &str,
) -> Result<(), Error> {
Expand Down Expand Up @@ -792,6 +823,8 @@ async fn upload_finalize(
Error::InternalServerError(Some("could not send email".to_owned()))
})?;

metrics.record_upload(&state.source_channel, state.uploaded);

if let Some(key) = accounting_key {
store.record_upload(key, state.uploaded, now_secs);
}
Expand Down Expand Up @@ -1141,6 +1174,13 @@ pub fn build_rocket(figment: Figment, vk: Parameters<VerifyingKey>) -> Rocket<Bu
.to_cors()
.expect("unable to configure CORS");

let metrics = Arc::new(Metrics::new());
rocket::tokio::spawn(storage_sampler(
metrics.clone(),
std::path::PathBuf::from(config.data_dir()),
Duration::from_secs(config.metrics_scan_interval_secs()),
));

let pkg_client = PkgClient::new(config.pkg_url().to_string());

rocket
Expand All @@ -1149,6 +1189,7 @@ pub fn build_rocket(figment: Figment, vk: Parameters<VerifyingKey>) -> Rocket<Bu
"/",
routes![
health,
metrics_endpoint,
upload_init,
upload_chunk,
upload_finalize,
Expand All @@ -1158,11 +1199,13 @@ pub fn build_rocket(figment: Figment, vk: Parameters<VerifyingKey>) -> Rocket<Bu
],
)
.attach(AdHoc::config::<CryptifyConfig>())
.manage(Store::with_idle_ttl(std::time::Duration::from_secs(
config.session_ttl_secs(),
)))
.manage(Store::with_idle_ttl(
std::time::Duration::from_secs(config.session_ttl_secs()),
metrics.clone(),
))
.manage(vk)
.manage(pkg_client)
.manage(metrics)
}

#[launch]
Expand Down Expand Up @@ -1361,7 +1404,7 @@ mod tests {
let rocket = rocket::custom(figment)
.mount("/", routes![upload_init])
.attach(AdHoc::config::<CryptifyConfig>())
.manage(Store::new());
.manage(Store::new(Arc::new(Metrics::new())));

Client::tracked(rocket).await.expect("valid rocket")
}
Expand Down Expand Up @@ -1450,7 +1493,7 @@ mod tests {
let rocket = rocket::custom(figment)
.mount("/", routes![upload_init, upload_status])
.attach(AdHoc::config::<CryptifyConfig>())
.manage(Store::new());
.manage(Store::new(Arc::new(Metrics::new())));

Client::tracked(rocket).await.expect("valid rocket")
}
Expand Down Expand Up @@ -1497,7 +1540,7 @@ mod tests {
.attach(cors)
.mount("/", routes![upload_init, upload_status])
.attach(AdHoc::config::<CryptifyConfig>())
.manage(Store::new());
.manage(Store::new(Arc::new(Metrics::new())));

Client::tracked(rocket).await.expect("valid rocket")
}
Expand Down Expand Up @@ -1787,6 +1830,7 @@ mod tests {
sender: None,
sender_attributes: Vec::new(),
confirm: false,
source_channel: String::new(),
notify_recipients: true,
api_key_tenant: None,
api_key_validation_failed: false,
Expand Down
Loading