Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/Audit/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,34 @@ abstract public function find(array $queries = []): array;
* @throws \Exception
*/
abstract public function count(array $queries = [], ?int $max = null): int;

/**
* Find logs aggregated into time-bucketed, grouped counts.
*
* Produces the data behind charts of the form
* "events per <interval>, grouped by <attribute>". The result is a flat
* list of associative rows with the keys `value` (the group value, i.e. the
* value of the `$groupBy` attribute), `bucket` and `count`, ordered by
* `value` asc, then `bucket` asc; cloud-side consumers reshape the rows
* into nested groups.
*
* Adapters MUST:
* - reject `$groupBy` values outside {`event`, `userType`, `resourceType`}
* - reject `$interval` values outside {`hour`, `day`, `week`, `month`}
* - apply the same filters/tenant scoping as `find()` to `$queries`
* - apply `Query::limit()` / `Query::offset()` to the group set (top-N
* by `SUM(count)` desc, ties by group value asc), default limit 25,
* hard max 100
* - zero-fill every bucket between the smallest and largest bucket
* implied by the time filter for every returned group, with UTC-aligned
* boundaries
* - return the bucket as an ISO-8601 UTC string (matching `find()`'s
* formatting of `time`)
*
* @param array<\Utopia\Audit\Query> $queries Filter/limit/offset queries; callers should include a time BETWEEN filter
* @param string $groupBy One of `event`, `userType`, `resourceType`
* @param string $interval One of `hour`, `day`, `week`, `month`
* @return array<int, array{value: string, bucket: string, count: int}> One row per (group value, bucket) pair
*
* @throws \Exception
*/
abstract public function findGrouped(array $queries, string $groupBy, string $interval): array;
}
179 changes: 179 additions & 0 deletions src/Audit/Adapter/ClickHouse.php
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,13 @@ public function getIndexes(): array
'lengths' => [],
'orders' => [],
],
[
'$id' => '_key_resource_type',
'type' => Database::INDEX_KEY,
'attributes' => ['resourceType'],
'lengths' => [],
'orders' => [],
],
];
}

Expand Down Expand Up @@ -1079,6 +1086,178 @@ public function count(array $queries = [], ?int $max = null): int
return $trimmed !== '' ? (int) $trimmed : 0;
}

/**
* Allowed groupBy columns for findGrouped().
*
* Acts as an allow-list: findGrouped() throws for any `$groupBy` that is
* not listed here before the name is escaped and placed into the SQL.
*
* @var list<string>
*/
private const FIND_GROUPED_ATTRIBUTES = ['event', 'userType', 'resourceType'];

/**
* Allowed interval values for findGrouped() mapped to ClickHouse date units.
*
* Keys are the public `$interval` values; the mapped value is the unit
* keyword that findGrouped() interpolates into the SQL text.
*
* @var array<string, string>
*/
private const FIND_GROUPED_INTERVALS = [
'hour' => 'hour',
'day' => 'day',
'week' => 'week',
'month' => 'month',
];

/**
* Number of groups findGrouped() returns when the queries carry no limit.
*/
private const FIND_GROUPED_DEFAULT_LIMIT = 25;

/**
* Upper bound for the group limit; findGrouped() clamps larger limits to this value.
*/
private const FIND_GROUPED_MAX_LIMIT = 100;

/**
 * Find logs aggregated into (value, bucket, count) rows.
 *
 * The query is built from four CTEs:
 *  - `top_groups`: the page of group values, ranked by total row count desc
 *    (ties by value asc) and sliced with the limit/offset taken from
 *    `$queries`. The limit defaults to 25 and is clamped to 1..100; the
 *    offset is clamped to >= 0.
 *  - `bounds`: the first and last bucket that contain rows matching the
 *    filters (MIN/MAX of `time`, aligned to the interval in UTC).
 *  - `buckets`: every bucket between those two bounds, inclusive.
 *  - `agg`: the per-(value, bucket) counts for the selected groups.
 *
 * Every selected group is cross-joined with every bucket, so buckets with
 * no matching rows are returned with a count of 0. Note that the zero-fill
 * range is derived from the matching rows, not from the requested time
 * filter itself.
 *
 * @param array<Query> $queries Filter/limit/offset queries
 * @param string $groupBy One of self::FIND_GROUPED_ATTRIBUTES
 * @param string $interval One of the keys of self::FIND_GROUPED_INTERVALS
 * @return array<int, array{value: string, bucket: string, count: int}> Rows ordered by value asc, bucket asc;
 *                                                                      empty when the response cannot be decoded
 * @throws Exception When `$groupBy` or `$interval` is not an allowed value
 */
public function findGrouped(array $queries, string $groupBy, string $interval): array
{
    if (!in_array($groupBy, self::FIND_GROUPED_ATTRIBUTES, true)) {
        $allowed = implode(', ', self::FIND_GROUPED_ATTRIBUTES);
        throw new Exception("Invalid groupBy '{$groupBy}'. Allowed: {$allowed}");
    }

    if (!array_key_exists($interval, self::FIND_GROUPED_INTERVALS)) {
        $allowed = implode(', ', array_keys(self::FIND_GROUPED_INTERVALS));
        throw new Exception("Invalid interval '{$interval}'. Allowed: {$allowed}");
    }

    // Safe to interpolate into the SQL text: the value comes from the constant map above.
    $unit = self::FIND_GROUPED_INTERVALS[$interval];

    $this->validateAttributeName($groupBy);
    $escapedGroup = $this->escapeIdentifier($groupBy);
    $escapedTime = $this->escapeIdentifier('time');

    $tableName = $this->getTableName();
    $escapedTable = $this->escapeIdentifier($this->database) . '.' . $this->escapeIdentifier($tableName);

    $parsed = $this->parseQueries($queries);

    $filters = $parsed['filters'];
    $tenantFilter = $this->getTenantFilter();
    if ($tenantFilter !== '') {
        // Strip only a literal leading "AND". ltrim($tenantFilter, ' AND') would treat
        // ' AND' as a character mask and could also eat the start of the condition
        // if it ever began with 'A', 'N' or 'D'.
        $tenantCondition = preg_replace('/^\s*AND\s+/i', '', $tenantFilter) ?? $tenantFilter;
        $filters[] = trim($tenantCondition);
    }

    $whereClause = empty($filters) ? '' : ' WHERE ' . implode(' AND ', $filters);

    // Restrict the per-bucket aggregation to the selected groups so that
    // high-cardinality columns do not aggregate rows the final join discards.
    // The condition is appended with AND when filters already exist, so the
    // statement never ends up with two WHERE keywords.
    $topGroupCondition = "{$escapedGroup} IN (SELECT value FROM top_groups)";
    $aggWhereClause = $whereClause === ''
        ? " WHERE {$topGroupCondition}"
        : "{$whereClause} AND {$topGroupCondition}";

    // The row-level limit/offset parameters are replaced by group-level ones.
    $params = $parsed['params'];
    unset($params['limit'], $params['offset']);

    $limit = self::FIND_GROUPED_DEFAULT_LIMIT;
    if (isset($parsed['limit'])) {
        $limit = max(1, min(self::FIND_GROUPED_MAX_LIMIT, $parsed['limit']));
    }

    $offset = 0;
    if (isset($parsed['offset'])) {
        $offset = max(0, $parsed['offset']);
    }

    $params['group_limit'] = $limit;
    $params['group_offset'] = $offset;

    $sql = "
        WITH
        top_groups AS (
            SELECT value FROM (
                SELECT {$escapedGroup} AS value, COUNT(*) AS sum_count
                FROM {$escapedTable}{$whereClause}
                GROUP BY value
                ORDER BY sum_count DESC, value ASC
                LIMIT {group_limit:UInt64} OFFSET {group_offset:UInt64}
            )
        ),
        bounds AS (
            SELECT
                toStartOfInterval(MIN({$escapedTime}), INTERVAL 1 {$unit}, 'UTC') AS bucket_from,
                toStartOfInterval(MAX({$escapedTime}), INTERVAL 1 {$unit}, 'UTC') AS bucket_to
            FROM {$escapedTable}{$whereClause}
        ),
        buckets AS (
            SELECT arrayJoin(
                arrayMap(
                    i -> date_add({$unit}, i, bucket_from),
                    range(0, toUInt64(dateDiff('{$unit}', bucket_from, bucket_to) + 1))
                )
            ) AS bucket
            FROM bounds
            WHERE bucket_from IS NOT NULL AND bucket_to IS NOT NULL
        ),
        agg AS (
            SELECT
                {$escapedGroup} AS value,
                toStartOfInterval({$escapedTime}, INTERVAL 1 {$unit}, 'UTC') AS bucket,
                COUNT(*) AS count
            FROM {$escapedTable}{$aggWhereClause}
            GROUP BY value, bucket
        )
        SELECT
            g.value AS value,
            formatDateTime(b.bucket, '%Y-%m-%dT%H:%i:%S.000+00:00') AS bucket,
            toUInt64(coalesce(a.count, 0)) AS count
        FROM top_groups g
        CROSS JOIN buckets b
        LEFT JOIN agg a ON a.value = g.value AND a.bucket = b.bucket
        ORDER BY value ASC, bucket ASC
        FORMAT JSON
    ";

    $result = $this->query($sql, $params);
    $decoded = json_decode($result, true);

    // Best effort: a response without a usable `data` list yields no rows.
    if (!is_array($decoded) || !isset($decoded['data']) || !is_array($decoded['data'])) {
        return [];
    }

    $rows = [];
    /** @var array<int, array<string, mixed>> $data */
    $data = $decoded['data'];
    foreach ($data as $row) {
        if (!is_array($row)) {
            continue;
        }

        $value = $row['value'] ?? '';
        $bucket = $row['bucket'] ?? '';
        $count = $row['count'] ?? 0;

        // Normalise every field to the declared row shape.
        if (!is_string($value)) {
            $value = is_scalar($value) ? (string) $value : '';
        }
        if (!is_string($bucket)) {
            $bucket = is_scalar($bucket) ? (string) $bucket : '';
        }

        $rows[] = [
            'value' => $value,
            'bucket' => $bucket,
            // The count may arrive as a numeric string; anything non-numeric becomes 0.
            'count' => is_numeric($count) ? (int) $count : 0,
        ];
    }

    return $rows;
}

/**
* Parse Query objects into SQL components.
*
Expand Down
18 changes: 18 additions & 0 deletions src/Audit/Adapter/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -542,4 +542,22 @@ public function count(array $queries = [], ?int $max = null): int
);
});
}

/**
* Find logs aggregated into (groupValue, bucket, count) rows.
*
* Not implemented for the Database adapter — grouped time-bucketed
* aggregations are designed for analytical backends (ClickHouse). Callers
* relying on this method must use the ClickHouse adapter.
*
* This implementation ignores all of its arguments and always throws.
*
* @param array<\Utopia\Audit\Query> $queries Unused
* @param string $groupBy Unused
* @param string $interval Unused
* @return array<int, array{value: string, bucket: string, count: int}> Never returned; present to satisfy the adapter signature
* @throws \Exception Always, because the operation is unsupported by this adapter
*/
public function findGrouped(array $queries, string $groupBy, string $interval): array
{
throw new Exception('findGrouped is not supported by the Database adapter');
}
}
15 changes: 15 additions & 0 deletions src/Audit/Audit.php
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,19 @@ public function count(array $queries = [], ?int $max = null): int
{
return $this->adapter->count($queries, $max);
}

/**
* Find logs aggregated into (groupValue, bucket, count) rows.
*
* Forwards the call unchanged to the configured adapter and returns its
* result as-is; validation and aggregation are the adapter's job.
*
* @param array<Query> $queries Filter/limit/offset queries; callers should include a time BETWEEN filter
* @param string $groupBy One of `event`, `userType`, `resourceType`
* @param string $interval One of `hour`, `day`, `week`, `month`
* @return array<int, array{value: string, bucket: string, count: int}> Rows keyed by `value` (the group value), `bucket` and `count`
*
* @throws \Exception Whatever the adapter throws
*/
public function findGrouped(array $queries, string $groupBy, string $interval): array
{
return $this->adapter->findGrouped($queries, $groupBy, $interval);
}
}
Loading
Loading