Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

142 changes: 134 additions & 8 deletions apps/labrinth/src/routes/v3/analytics_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,24 @@ use rust_decimal::Decimal;
use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as _};

use crate::{
auth::{AuthenticationError, get_user_from_headers},
auth::{
AuthenticationError, checks::filter_visible_version_ids,
get_user_from_headers,
},
database::{
self, DBProject,
models::{
DBAffiliateCode, DBAffiliateCodeId, DBProjectId, DBUser, DBUserId,
DBVersionId,
DBVersion, DBVersionId,
},
redis::RedisPool,
},
models::{
ids::{AffiliateCodeId, ProjectId, VersionId},
pats::Scopes,
projects::ProjectStatus,
teams::ProjectPermissions,
threads::MessageBody,
v3::analytics::DownloadReason,
},
queue::session::AuthQueue,
Expand Down Expand Up @@ -255,17 +260,46 @@ pub const MAX_TIME_SLICES: usize = 1024;

/// Response for a [`GetRequest`].
#[derive(Debug, Default, Serialize, Deserialize, utoipa::ToSchema)]
pub struct FetchResponse {
pub struct GetResponse {
/// List of N [`TimeSlice`]s, where each slice represents an equal
/// time interval of metrics collection. The number of slices is determined
/// by [`GetRequest::time_range`].
pub metrics: Vec<TimeSlice>,
/// List of events associated with projects that were requested.
pub project_events: Vec<ProjectAnalyticsEvent>,
}

/// Single time interval of metrics collection.
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
pub struct TimeSlice(pub Vec<AnalyticsData>);

/// Notable update to a project which may reflect in analytics metrics.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct ProjectAnalyticsEvent {
/// ID of the event's project.
pub project_id: ProjectId,
/// When the event occurred.
pub timestamp: DateTime<Utc>,
#[serde(flatten)]
pub kind: ProjectAnalyticsEventKind,
}

#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProjectAnalyticsEventKind {
/// New version of this project was uploaded.
VersionUploaded {
version_id: VersionId,
version_name: String,
version_number: String,
},
/// Project changed status.
StatusChanged {
status_from: ProjectStatus,
status_to: ProjectStatus,
},
}

/// Metrics collected in a single [`TimeSlice`].
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(untagged)] // the presence of `source_project`, `source_affiliate_code` determines the kind
Expand Down Expand Up @@ -665,7 +699,7 @@ mod query {

/// Fetches analytics data for the authorized user's projects.
#[utoipa::path(
responses((status = OK, body = inline(FetchResponse))),
responses((status = OK, body = inline(GetResponse))),
)]
#[post("")]
pub async fn fetch_analytics(
Expand All @@ -675,7 +709,7 @@ pub async fn fetch_analytics(
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
clickhouse: web::Data<clickhouse::Client>,
) -> Result<web::Json<FetchResponse>, ApiError> {
) -> Result<web::Json<GetResponse>, ApiError> {
let (scopes, user) = get_user_from_headers(
&http_req,
&**pool,
Expand Down Expand Up @@ -768,6 +802,44 @@ pub async fn fetch_analytics(
.iter()
.map(|version| DBProjectId(version.mod_id))
.collect::<Vec<_>>();
let parent_version_data =
DBVersion::get_many(&parent_version_ids, &**pool, &redis).await?;
let visible_version_ids = filter_visible_version_ids(
parent_version_data
.iter()
.map(|version| &version.inner)
.collect(),
&Some(user.clone()),
&pool,
&redis,
)
.await?;
let mut project_events = parent_version_data
.iter()
.filter(|version| {
visible_version_ids.contains(&version.inner.id)
&& version.inner.date_published >= req.time_range.start
&& version.inner.date_published <= req.time_range.end
})
.map(|version| ProjectAnalyticsEvent {
project_id: version.inner.project_id.into(),
timestamp: version.inner.date_published,
kind: ProjectAnalyticsEventKind::VersionUploaded {
version_id: version.inner.id.into(),
version_name: version.inner.name.clone(),
version_number: version.inner.version_number.clone(),
},
})
.collect::<Vec<_>>();
project_events.extend(
fetch_project_status_change_events(
&project_ids,
&req.time_range,
&pool,
)
.await?,
);
project_events.sort_by_key(|event| event.timestamp);

let affiliate_code_ids =
DBAffiliateCode::get_by_affiliate(user.id.into(), &**pool)
Expand Down Expand Up @@ -1103,8 +1175,9 @@ pub async fn fetch_analytics(
}
}

Ok(web::Json(FetchResponse {
Ok(web::Json(GetResponse {
metrics: time_slices,
project_events,
}))
}

Expand Down Expand Up @@ -1195,6 +1268,57 @@ fn condense_country(country: String, count: u64) -> String {
}
}

async fn fetch_project_status_change_events(
project_ids: &[DBProjectId],
time_range: &TimeRange,
pool: &PgPool,
) -> Result<Vec<ProjectAnalyticsEvent>, ApiError> {
let project_id_values =
project_ids.iter().map(|id| id.0).collect::<Vec<_>>();

let rows = sqlx::query!(
r#"
SELECT
t.mod_id AS "project_id!",
tm.created,
tm.body AS "body: sqlx::types::Json<MessageBody>"
FROM threads_messages tm
INNER JOIN threads t ON t.id = tm.thread_id
WHERE
t.mod_id = ANY($1)
AND tm.body->>'type' = 'status_change'
AND tm.created BETWEEN $2 AND $3
"#,
&project_id_values,
time_range.start,
time_range.end,
)
.fetch_all(&**pool)
.await?;

Ok(rows
.into_iter()
.filter_map(|row| {
let MessageBody::StatusChange {
old_status,
new_status,
} = row.body.0
else {
return None;
};

Some(ProjectAnalyticsEvent {
project_id: DBProjectId(row.project_id).into(),
timestamp: row.created,
kind: ProjectAnalyticsEventKind::StatusChanged {
status_from: old_status,
status_to: new_status,
},
})
})
.collect())
}

struct QueryClickhouseContext<'a> {
clickhouse: &'a clickhouse::Client,
req: &'a GetRequest,
Expand Down Expand Up @@ -1395,7 +1519,7 @@ mod tests {
let test_project_2 = ProjectId(456);
let test_project_3 = ProjectId(789);

let src = FetchResponse {
let src = GetResponse {
metrics: vec![
TimeSlice(vec![
AnalyticsData::Project(ProjectAnalytics {
Expand All @@ -1422,6 +1546,7 @@ mod tests {
}),
})]),
],
project_events: vec![],
};
let target = json!({
"metrics": [
Expand All @@ -1446,7 +1571,8 @@ mod tests {
"revenue": "200.00",
}
]
]
],
"project_events": []
});

assert_eq!(serde_json::to_value(src).unwrap(), target);
Expand Down
Loading