Skip to content

Commit f8cb120

Browse files
author
Michal Tichák
committed
[core] sorting active environmets in kafka plugin
1 parent 4636d9e commit f8cb120

File tree

2 files changed

+93
-1
lines changed

2 files changed

+93
-1
lines changed

core/integration/kafka/plugin.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"encoding/json"
3232
"errors"
3333
"net/url"
34+
"slices"
3435
"strconv"
3536
"strings"
3637
"sync"
@@ -304,9 +305,36 @@ func (p *Plugin) GetRunningEnvList() []*kafkapb.EnvInfo {
304305
for _, v := range p.envsInRunning {
305306
array = append(array, v)
306307
}
308+
309+
p.SortRunningEnvList(array)
310+
p.SortDetectorsOfRunningEnvs(array)
307311
return array
308312
}
309313

314+
// OCTRL-932 we sort the active envs here are grafana cannot do it
315+
// order is defined as:
316+
// 1) runs with more than 2 detectors first
317+
// 2) if there are more than 2 runs with more than 2 detectors, those with ITS should be first
318+
func (p *Plugin) SortRunningEnvList(activeEnvs []*kafkapb.EnvInfo) {
319+
slices.SortStableFunc(activeEnvs, func(a, b *kafkapb.EnvInfo) int {
320+
if len(a.Detectors) >= 2 && len(b.Detectors) >= 2 {
321+
if slices.Contains(a.Detectors, "ITS") {
322+
return -1
323+
}
324+
if slices.Contains(b.Detectors, "ITS") {
325+
return 1
326+
}
327+
}
328+
return len(b.Detectors) - len(a.Detectors)
329+
})
330+
}
331+
332+
func (p *Plugin) SortDetectorsOfRunningEnvs(activeEnvs []*kafkapb.EnvInfo) {
333+
for _, env := range activeEnvs {
334+
slices.Sort(env.Detectors)
335+
}
336+
}
337+
310338
func (p *Plugin) produceMessage(message []byte, topic string, envId string, call string) {
311339
log.WithField("call", call).
312340
WithField("partition", envId).
@@ -317,7 +345,6 @@ func (p *Plugin) produceMessage(message []byte, topic string, envId string, call
317345
Topic: topic,
318346
Value: message,
319347
})
320-
321348
if err != nil {
322349
log.WithField("call", call).
323350
WithField("partition", envId).
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2024 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package kafka
26+
27+
import (
28+
"testing"
29+
30+
kafkapb "github.com/AliceO2Group/Control/core/integration/kafka/protos"
31+
)
32+
33+
func TestSortActiveRuns(t *testing.T) {
34+
p := Plugin{}
35+
p.SortRunningEnvList(nil)
36+
37+
var envs []*kafkapb.EnvInfo
38+
envs = append(envs,
39+
&kafkapb.EnvInfo{EnvironmentId: "1", Detectors: []string{"first"}},
40+
&kafkapb.EnvInfo{EnvironmentId: "2", Detectors: []string{"second"}},
41+
&kafkapb.EnvInfo{EnvironmentId: "3", Detectors: []string{"ITS", "first"}},
42+
&kafkapb.EnvInfo{EnvironmentId: "4", Detectors: []string{"first", "second", "third"}})
43+
44+
p.SortRunningEnvList(envs)
45+
46+
if len(envs) != 4 {
47+
t.Error("wrong number of environments")
48+
}
49+
50+
if envs[0].EnvironmentId != "3" {
51+
t.Errorf("first should have been environment 3, but is %s with dets %v", envs[0].EnvironmentId, envs[0].Detectors)
52+
}
53+
54+
if envs[1].EnvironmentId != "4" {
55+
t.Errorf("second should have been environment 4, but is %s with dets %v", envs[1].EnvironmentId, envs[1].Detectors)
56+
}
57+
58+
if envs[2].EnvironmentId != "1" {
59+
t.Errorf("third should have been environment 1, but is %s with dets %v", envs[2].EnvironmentId, envs[2].Detectors)
60+
}
61+
62+
if envs[3].EnvironmentId != "2" {
63+
t.Errorf("fourth should have been environment 2, but is %s with dets %v", envs[3].EnvironmentId, envs[3].Detectors)
64+
}
65+
}

0 commit comments

Comments
 (0)