Skip to content

Commit 7b283bf

Browse files
author
Michal Tichák
committed
[core] sorting active environmets in kafka plugin
1 parent 4636d9e commit 7b283bf

File tree

2 files changed

+99
-1
lines changed

2 files changed

+99
-1
lines changed

core/integration/kafka/plugin.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"encoding/json"
3232
"errors"
3333
"net/url"
34+
"slices"
3435
"strconv"
3536
"strings"
3637
"sync"
@@ -307,6 +308,29 @@ func (p *Plugin) GetRunningEnvList() []*kafkapb.EnvInfo {
307308
return array
308309
}
309310

311+
// OCTRL-932 we sort the active envs here are grafana cannot do it
312+
// order is defined as:
313+
// 1) runs with more than 2 detectors first
314+
// 2) if there are more than 2 runs with more than 2 detectors, those with ITS should be first
315+
func (p *Plugin) SortRunningEnvList(activeEnvs []*kafkapb.EnvInfo) {
316+
slices.SortStableFunc(activeEnvs, func(a, b *kafkapb.EnvInfo) int {
317+
if len(a.Detectors) >= 2 && len(b.Detectors) >= 2 {
318+
aHasITS := slices.Contains(a.Detectors, "ITS")
319+
bHasITS := slices.Contains(b.Detectors, "ITS")
320+
if aHasITS && bHasITS {
321+
return len(b.Detectors) - len(a.Detectors)
322+
}
323+
if aHasITS {
324+
return -1
325+
}
326+
if bHasITS {
327+
return 1
328+
}
329+
}
330+
return len(b.Detectors) - len(a.Detectors)
331+
})
332+
}
333+
310334
func (p *Plugin) produceMessage(message []byte, topic string, envId string, call string) {
311335
log.WithField("call", call).
312336
WithField("partition", envId).
@@ -317,7 +341,6 @@ func (p *Plugin) produceMessage(message []byte, topic string, envId string, call
317341
Topic: topic,
318342
Value: message,
319343
})
320-
321344
if err != nil {
322345
log.WithField("call", call).
323346
WithField("partition", envId).
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2024 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package kafka
26+
27+
import (
28+
"testing"
29+
30+
kafkapb "github.com/AliceO2Group/Control/core/integration/kafka/protos"
31+
)
32+
33+
func TestSortActiveRuns(t *testing.T) {
34+
p := Plugin{}
35+
p.SortRunningEnvList(nil)
36+
37+
var envs []*kafkapb.EnvInfo
38+
envs = append(envs,
39+
&kafkapb.EnvInfo{EnvironmentId: "1", Detectors: []string{"ITS", "first"}},
40+
&kafkapb.EnvInfo{EnvironmentId: "2", Detectors: []string{"first"}},
41+
&kafkapb.EnvInfo{EnvironmentId: "3", Detectors: []string{"ITS", "first", "second"}},
42+
&kafkapb.EnvInfo{EnvironmentId: "4", Detectors: []string{"second"}},
43+
&kafkapb.EnvInfo{EnvironmentId: "5", Detectors: []string{"first", "second"}},
44+
&kafkapb.EnvInfo{EnvironmentId: "6", Detectors: []string{"ITS", "second"}})
45+
46+
p.SortRunningEnvList(envs)
47+
48+
if len(envs) != 6 {
49+
t.Error("wrong number of environments")
50+
}
51+
52+
if envs[0].EnvironmentId != "3" {
53+
t.Errorf("first should have been environment 3, but is %s with dets %v", envs[0].EnvironmentId, envs[0].Detectors)
54+
}
55+
56+
if envs[1].EnvironmentId != "1" {
57+
t.Errorf("second should have been environment 1, but is %s with dets %v", envs[1].EnvironmentId, envs[1].Detectors)
58+
}
59+
60+
if envs[2].EnvironmentId != "6" {
61+
t.Errorf("third should have been environment 6, but is %s with dets %v", envs[2].EnvironmentId, envs[2].Detectors)
62+
}
63+
64+
if envs[3].EnvironmentId != "5" {
65+
t.Errorf("fourth should have been environment 5, but is %s with dets %v", envs[3].EnvironmentId, envs[3].Detectors)
66+
}
67+
68+
if envs[4].EnvironmentId != "2" {
69+
t.Errorf("fifth should have been environment 2, but is %s with dets %v", envs[4].EnvironmentId, envs[4].Detectors)
70+
}
71+
72+
if envs[5].EnvironmentId != "4" {
73+
t.Errorf("fifth should have been environment 4, but is %s with dets %v", envs[5].EnvironmentId, envs[5].Detectors)
74+
}
75+
}

0 commit comments

Comments
 (0)