-
Notifications
You must be signed in to change notification settings - Fork 9
feat: add findGrouped() to Adapter + ClickHouse implementation #119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -529,6 +529,13 @@ public function getIndexes(): array | |||||||||||||||||||
| 'lengths' => [], | ||||||||||||||||||||
| 'orders' => [], | ||||||||||||||||||||
| ], | ||||||||||||||||||||
| [ | ||||||||||||||||||||
| '$id' => '_key_resource_type', | ||||||||||||||||||||
| 'type' => Database::INDEX_KEY, | ||||||||||||||||||||
| 'attributes' => ['resourceType'], | ||||||||||||||||||||
| 'lengths' => [], | ||||||||||||||||||||
| 'orders' => [], | ||||||||||||||||||||
| ], | ||||||||||||||||||||
| ]; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
@@ -1079,6 +1086,178 @@ public function count(array $queries = [], ?int $max = null): int | |||||||||||||||||||
| return $trimmed !== '' ? (int) $trimmed : 0; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Allowed groupBy columns for findGrouped(). | ||||||||||||||||||||
| * | ||||||||||||||||||||
| * @var list<string> | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| private const FIND_GROUPED_ATTRIBUTES = ['event', 'userType', 'resourceType']; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Allowed interval values for findGrouped() mapped to ClickHouse date units. | ||||||||||||||||||||
| * | ||||||||||||||||||||
| * @var array<string, string> | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| private const FIND_GROUPED_INTERVALS = [ | ||||||||||||||||||||
| 'hour' => 'hour', | ||||||||||||||||||||
| 'day' => 'day', | ||||||||||||||||||||
| 'week' => 'week', | ||||||||||||||||||||
| 'month' => 'month', | ||||||||||||||||||||
| ]; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| private const FIND_GROUPED_DEFAULT_LIMIT = 25; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| private const FIND_GROUPED_MAX_LIMIT = 100; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Find logs aggregated into (groupValue, bucket, count) rows. | ||||||||||||||||||||
| * | ||||||||||||||||||||
| * Top-N groups are selected by `SUM(count)` desc (ties by groupValue asc), | ||||||||||||||||||||
| * then paginated via the supplied `Query::limit()` / `Query::offset()`. | ||||||||||||||||||||
| * Buckets are aligned to UTC via `toStartOfInterval(time, INTERVAL 1 <unit>, 'UTC')` | ||||||||||||||||||||
| * and zero-filled across the time range implied by the filter set. | ||||||||||||||||||||
| * | ||||||||||||||||||||
| * @param array<Query> $queries | ||||||||||||||||||||
| * @param string $groupBy | ||||||||||||||||||||
| * @param string $interval | ||||||||||||||||||||
| * @return array<int, array{value: string, bucket: string, count: int}> | ||||||||||||||||||||
| * @throws Exception | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| public function findGrouped(array $queries, string $groupBy, string $interval): array | ||||||||||||||||||||
| { | ||||||||||||||||||||
| if (!in_array($groupBy, self::FIND_GROUPED_ATTRIBUTES, true)) { | ||||||||||||||||||||
| $allowed = implode(', ', self::FIND_GROUPED_ATTRIBUTES); | ||||||||||||||||||||
| throw new Exception("Invalid groupBy '{$groupBy}'. Allowed: {$allowed}"); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (!array_key_exists($interval, self::FIND_GROUPED_INTERVALS)) { | ||||||||||||||||||||
| $allowed = implode(', ', array_keys(self::FIND_GROUPED_INTERVALS)); | ||||||||||||||||||||
| throw new Exception("Invalid interval '{$interval}'. Allowed: {$allowed}"); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $unit = self::FIND_GROUPED_INTERVALS[$interval]; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $this->validateAttributeName($groupBy); | ||||||||||||||||||||
| $escapedGroup = $this->escapeIdentifier($groupBy); | ||||||||||||||||||||
| $escapedTime = $this->escapeIdentifier('time'); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $tableName = $this->getTableName(); | ||||||||||||||||||||
| $escapedTable = $this->escapeIdentifier($this->database) . '.' . $this->escapeIdentifier($tableName); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $parsed = $this->parseQueries($queries); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $filters = $parsed['filters']; | ||||||||||||||||||||
| $tenantFilter = $this->getTenantFilter(); | ||||||||||||||||||||
| if ($tenantFilter !== '') { | ||||||||||||||||||||
| $filters[] = ltrim($tenantFilter, ' AND'); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $whereClause = empty($filters) ? '' : ' WHERE ' . implode(' AND ', $filters); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $params = $parsed['params']; | ||||||||||||||||||||
| unset($params['limit'], $params['offset']); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $limit = self::FIND_GROUPED_DEFAULT_LIMIT; | ||||||||||||||||||||
| if (isset($parsed['limit'])) { | ||||||||||||||||||||
| $limit = $parsed['limit']; | ||||||||||||||||||||
| if ($limit > self::FIND_GROUPED_MAX_LIMIT) { | ||||||||||||||||||||
| $limit = self::FIND_GROUPED_MAX_LIMIT; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| if ($limit < 1) { | ||||||||||||||||||||
| $limit = 1; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $offset = 0; | ||||||||||||||||||||
| if (isset($parsed['offset'])) { | ||||||||||||||||||||
| $offset = max(0, $parsed['offset']); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $params['group_limit'] = $limit; | ||||||||||||||||||||
| $params['group_offset'] = $offset; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $sql = " | ||||||||||||||||||||
| WITH | ||||||||||||||||||||
| top_groups AS ( | ||||||||||||||||||||
| SELECT value FROM ( | ||||||||||||||||||||
| SELECT {$escapedGroup} AS value, COUNT(*) AS sum_count | ||||||||||||||||||||
| FROM {$escapedTable}{$whereClause} | ||||||||||||||||||||
| GROUP BY value | ||||||||||||||||||||
| ORDER BY sum_count DESC, value ASC | ||||||||||||||||||||
| LIMIT {group_limit:UInt64} OFFSET {group_offset:UInt64} | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| ), | ||||||||||||||||||||
| bounds AS ( | ||||||||||||||||||||
| SELECT | ||||||||||||||||||||
| toStartOfInterval(MIN({$escapedTime}), INTERVAL 1 {$unit}, 'UTC') AS bucket_from, | ||||||||||||||||||||
| toStartOfInterval(MAX({$escapedTime}), INTERVAL 1 {$unit}, 'UTC') AS bucket_to | ||||||||||||||||||||
| FROM {$escapedTable}{$whereClause} | ||||||||||||||||||||
| ), | ||||||||||||||||||||
| buckets AS ( | ||||||||||||||||||||
| SELECT arrayJoin( | ||||||||||||||||||||
| arrayMap( | ||||||||||||||||||||
| i -> date_add({$unit}, i, bucket_from), | ||||||||||||||||||||
| range(0, toUInt64(dateDiff('{$unit}', bucket_from, bucket_to) + 1)) | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| ) AS bucket | ||||||||||||||||||||
| FROM bounds | ||||||||||||||||||||
| WHERE bucket_from IS NOT NULL AND bucket_to IS NOT NULL | ||||||||||||||||||||
| ), | ||||||||||||||||||||
| agg AS ( | ||||||||||||||||||||
| SELECT | ||||||||||||||||||||
| {$escapedGroup} AS value, | ||||||||||||||||||||
| toStartOfInterval({$escapedTime}, INTERVAL 1 {$unit}, 'UTC') AS bucket, | ||||||||||||||||||||
| COUNT(*) AS count | ||||||||||||||||||||
| FROM {$escapedTable}{$whereClause} | ||||||||||||||||||||
| GROUP BY value, bucket | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| SELECT | ||||||||||||||||||||
| g.value AS value, | ||||||||||||||||||||
| formatDateTime(b.bucket, '%Y-%m-%dT%H:%i:%S.000+00:00') AS bucket, | ||||||||||||||||||||
| toUInt64(coalesce(a.count, 0)) AS count | ||||||||||||||||||||
| FROM top_groups g | ||||||||||||||||||||
| CROSS JOIN buckets b | ||||||||||||||||||||
| LEFT JOIN agg a ON a.value = g.value AND a.bucket = b.bucket | ||||||||||||||||||||
| WHERE g.value IN (SELECT value FROM top_groups) | ||||||||||||||||||||
| ORDER BY value ASC, bucket ASC | ||||||||||||||||||||
|
Comment on lines
+1218
to
+1222
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
| FORMAT JSON | ||||||||||||||||||||
| "; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $result = $this->query($sql, $params); | ||||||||||||||||||||
| $decoded = json_decode($result, true); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (!is_array($decoded) || !isset($decoded['data']) || !is_array($decoded['data'])) { | ||||||||||||||||||||
| return []; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $rows = []; | ||||||||||||||||||||
| /** @var array<int, array<string, mixed>> $data */ | ||||||||||||||||||||
| $data = $decoded['data']; | ||||||||||||||||||||
| foreach ($data as $row) { | ||||||||||||||||||||
| if (!is_array($row)) { | ||||||||||||||||||||
| continue; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| $value = $row['value'] ?? ''; | ||||||||||||||||||||
| $bucket = $row['bucket'] ?? ''; | ||||||||||||||||||||
| $count = $row['count'] ?? 0; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (!is_string($value)) { | ||||||||||||||||||||
| $value = is_scalar($value) ? (string) $value : ''; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| if (!is_string($bucket)) { | ||||||||||||||||||||
| $bucket = is_scalar($bucket) ? (string) $bucket : ''; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| $rows[] = [ | ||||||||||||||||||||
| 'value' => $value, | ||||||||||||||||||||
| 'bucket' => $bucket, | ||||||||||||||||||||
| 'count' => is_numeric($count) ? (int) $count : 0, | ||||||||||||||||||||
| ]; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return $rows; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Parse Query objects into SQL components. | ||||||||||||||||||||
| * | ||||||||||||||||||||
|
|
||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aggCTE aggregates every distinct group value that matches$whereClause, not just thetop_groups. For high-cardinality columns likeresourceTypethis can mean scanning and grouping many more rows than needed. Adding an IN-subquery filter restricts the aggregation to only the top-N groups already selected, at the cost of one extra read of the smalltop_groupsresult.