Skip to content

Commit a0d0299

Browse files
committed
feat: adding parallel processing
1 parent 77075f4 commit a0d0299

4 files changed

Lines changed: 208 additions & 56 deletions

File tree

cmd/gitops/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ func main() {
6969
Usage: "display unchanged secrets in the plan overview",
7070
EnvVars: []string{"GITOPS_SHOW_UNCHANGED"},
7171
},
72+
&cli.IntFlag{
73+
Name: "parallelism",
74+
Aliases: []string{"p"},
75+
Value: 5,
76+
Usage: "number of parallel operations for decrypting secrets and kubernetes operations",
77+
EnvVars: []string{"GITOPS_PARALLELISM"},
78+
},
7279
},
7380
Commands: []*cli.Command{
7481
{

internal/kubernetes/kubernetes.go

Lines changed: 75 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kubernetes
33
import (
44
"fmt"
55
"strings"
6+
"sync"
67

78
"github.com/TwiN/go-color"
89
"github.com/google/uuid"
@@ -122,6 +123,11 @@ func createKubernetesPlan(c *cli.Context) (*plan.Plan, error) {
122123
Items: []plan.PlanItem{},
123124
}
124125

126+
parallelism := c.Int("parallelism")
127+
if parallelism < 1 {
128+
parallelism = 1
129+
}
130+
125131
bar := progressbar.NewOptions(len(localSecrets),
126132
progressbar.OptionEnableColorCodes(true),
127133
progressbar.OptionShowBytes(false),
@@ -132,38 +138,79 @@ func createKubernetesPlan(c *cli.Context) (*plan.Plan, error) {
132138
progressbar.OptionSetDescription("[green][Syncing local state with cluster][reset]"),
133139
)
134140

135-
for _, localSecret := range localSecrets {
136-
bar.Add(1)
137-
// check for local secret in state
138-
// update ID in localSecret if secret exists in state
139-
// update hash in state if secret exists in state
140-
stateSecret := state.GetState().GetByPath(localSecret.Path)
141-
if stateSecret == nil {
142-
log.Trace("Secret ", localSecret.CombinedName(), " does not exist in state")
143-
localSecret.ID = uuid.New().String()
144-
stateSecret = state.GetState().Add(localSecret)
145-
} else {
146-
log.Trace("Secret ", localSecret.CombinedName(), " exists in state. Updating")
147-
stateSecret.Update(localSecret)
148-
}
149-
150-
planItem := plan.PlanItem{
151-
LocalSecret: localSecret,
152-
}
141+
// Create channels for parallel processing
142+
type planItemResult struct {
143+
item plan.PlanItem
144+
err error
145+
}
153146

154-
remoteSecret, err := k8s.GetSecret(localSecret, localSecret.Target)
155-
if err != nil {
156-
if k8sErrors.IsNotFound(err) {
157-
log.Trace("Secret ", localSecret.Name, " does not exist in Kubernetes cluster")
158-
} else {
159-
log.Error("Failed to get secret ", localSecret.Name, " from Kubernetes cluster")
160-
return nil, err
147+
secretChan := make(chan *secret.Secret, len(localSecrets))
148+
resultChan := make(chan planItemResult, len(localSecrets))
149+
150+
// Start worker goroutines
151+
var workerGroup sync.WaitGroup
152+
for i := 0; i < parallelism; i++ {
153+
workerGroup.Add(1)
154+
go func() {
155+
defer workerGroup.Done()
156+
for localSecret := range secretChan {
157+
// check for local secret in state
158+
// update ID in localSecret if secret exists in state
159+
// update hash in state if secret exists in state
160+
stateSecret := state.GetState().GetByPath(localSecret.Path)
161+
if stateSecret == nil {
162+
log.Trace("Secret ", localSecret.CombinedName(), " does not exist in state")
163+
localSecret.ID = uuid.New().String()
164+
stateSecret = state.GetState().Add(localSecret)
165+
} else {
166+
log.Trace("Secret ", localSecret.CombinedName(), " exists in state. Updating")
167+
stateSecret.Update(localSecret)
168+
}
169+
170+
planItem := plan.PlanItem{
171+
LocalSecret: localSecret,
172+
}
173+
174+
remoteSecret, err := k8s.GetSecret(localSecret, localSecret.Target)
175+
if err != nil {
176+
if k8sErrors.IsNotFound(err) {
177+
log.Trace("Secret ", localSecret.Name, " does not exist in Kubernetes cluster")
178+
} else {
179+
log.Error("Failed to get secret ", localSecret.Name, " from Kubernetes cluster")
180+
resultChan <- planItemResult{err: err}
181+
return
182+
}
183+
}
184+
185+
planItem.RemoteSecret = remoteSecret
186+
planItem.ComputeDiff()
187+
resultChan <- planItemResult{item: planItem, err: nil}
161188
}
189+
}()
190+
}
191+
192+
// Send work to workers
193+
go func() {
194+
for _, localSecret := range localSecrets {
195+
secretChan <- localSecret
162196
}
197+
close(secretChan)
198+
}()
163199

164-
planItem.RemoteSecret = remoteSecret
165-
planItem.ComputeDiff()
166-
p.AddItem(planItem)
200+
// Wait for all workers to finish and close result channel
201+
go func() {
202+
workerGroup.Wait()
203+
close(resultChan)
204+
}()
205+
206+
// Collect results
207+
for result := range resultChan {
208+
bar.Add(1)
209+
if result.err != nil {
210+
bar.Finish()
211+
return nil, result.err
212+
}
213+
p.AddItem(result.item)
167214
}
168215
bar.Finish()
169216
println("")

internal/plan/plan.go

Lines changed: 75 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package plan
22

33
import (
4+
"sync"
5+
46
log "github.com/sirupsen/logrus"
57

68
"github.com/mxcd/gitops-cli/internal/k8s"
79
"github.com/mxcd/gitops-cli/internal/secret"
10+
"github.com/mxcd/gitops-cli/internal/util"
811
)
912

1013
type Plan struct {
@@ -64,33 +67,83 @@ func (p *Plan) Execute() error {
6467
}
6568

6669
func executeKubernetesPlan(p *Plan) error {
70+
parallelism := util.GetCliContext().Int("parallelism")
71+
if parallelism < 1 {
72+
parallelism = 1
73+
}
74+
75+
// Filter items that need processing
76+
itemsToProcess := []PlanItem{}
6777
for _, item := range p.Items {
68-
if item.Diff.Equal {
78+
if !item.Diff.Equal {
79+
itemsToProcess = append(itemsToProcess, item)
80+
} else {
6981
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is equal, skipping...")
70-
continue
7182
}
72-
if item.Diff.Type == secret.SecretDiffTypeAdded {
73-
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is new, creating...")
74-
err := k8s.CreateSecret(item.LocalSecret, item.LocalSecret.Target)
75-
if err != nil {
76-
log.Error("Failed to create secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
77-
return err
78-
}
79-
} else if item.Diff.Type == secret.SecretDiffTypeChanged {
80-
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is modified, updating...")
81-
err := k8s.UpdateSecret(item.LocalSecret, item.LocalSecret.Target)
82-
if err != nil {
83-
log.Error("Failed to update secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
84-
return err
85-
}
86-
} else if item.Diff.Type == secret.SecretDiffTypeRemoved {
87-
log.Trace("Secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " is deleted, deleting...")
88-
err := k8s.DeleteSecret(item.RemoteSecret, item.RemoteSecret.Target)
89-
if err != nil {
90-
log.Error("Failed to delete secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " in cluster")
91-
return err
83+
}
84+
85+
if len(itemsToProcess) == 0 {
86+
return nil
87+
}
88+
89+
// Create channels for parallel processing
90+
itemChan := make(chan PlanItem, len(itemsToProcess))
91+
errorChan := make(chan error, len(itemsToProcess))
92+
93+
// Start worker goroutines
94+
var workerGroup sync.WaitGroup
95+
for i := 0; i < parallelism; i++ {
96+
workerGroup.Add(1)
97+
go func() {
98+
defer workerGroup.Done()
99+
for item := range itemChan {
100+
var err error
101+
if item.Diff.Type == secret.SecretDiffTypeAdded {
102+
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is new, creating...")
103+
err = k8s.CreateSecret(item.LocalSecret, item.LocalSecret.Target)
104+
if err != nil {
105+
log.Error("Failed to create secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
106+
}
107+
} else if item.Diff.Type == secret.SecretDiffTypeChanged {
108+
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is modified, updating...")
109+
err = k8s.UpdateSecret(item.LocalSecret, item.LocalSecret.Target)
110+
if err != nil {
111+
log.Error("Failed to update secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
112+
}
113+
} else if item.Diff.Type == secret.SecretDiffTypeRemoved {
114+
log.Trace("Secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " is deleted, deleting...")
115+
err = k8s.DeleteSecret(item.RemoteSecret, item.RemoteSecret.Target)
116+
if err != nil {
117+
log.Error("Failed to delete secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " in cluster")
118+
}
119+
}
120+
if err != nil {
121+
errorChan <- err
122+
}
92123
}
124+
}()
125+
}
126+
127+
// Send work to workers
128+
go func() {
129+
for _, item := range itemsToProcess {
130+
itemChan <- item
131+
}
132+
close(itemChan)
133+
}()
134+
135+
// Wait for all workers to finish
136+
go func() {
137+
workerGroup.Wait()
138+
close(errorChan)
139+
}()
140+
141+
// Check for errors
142+
for err := range errorChan {
143+
if err != nil {
144+
return err
93145
}
94146
}
147+
95148
return nil
96149
}

internal/secret/loader.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package secret
33
import (
44
"errors"
55
"strings"
6+
"sync"
67

78
"github.com/mxcd/gitops-cli/internal/util"
89
"github.com/schollz/progressbar/v3"
@@ -40,6 +41,11 @@ func LoadLocalSecretsLimited(targetTypeFilter SecretTargetType, directoryLimit s
4041
}
4142
secretFileNames = filteredFileNames
4243

44+
parallelism := util.GetCliContext().Int("parallelism")
45+
if parallelism < 1 {
46+
parallelism = 1
47+
}
48+
4349
secrets := []*Secret{}
4450
bar := progressbar.NewOptions(len(secretFileNames),
4551
progressbar.OptionEnableColorCodes(true),
@@ -50,19 +56,58 @@ func LoadLocalSecretsLimited(targetTypeFilter SecretTargetType, directoryLimit s
5056
progressbar.OptionSetPredictTime(false),
5157
progressbar.OptionSetDescription("[green][Loading local secrets][reset]"),
5258
)
53-
for _, secretFileName := range secretFileNames {
59+
60+
// Create channels for parallel processing
61+
type secretResult struct {
62+
secret *Secret
63+
err error
64+
}
65+
66+
secretChan := make(chan string, len(secretFileNames))
67+
resultChan := make(chan secretResult, len(secretFileNames))
68+
69+
// Start worker goroutines
70+
var workerGroup sync.WaitGroup
71+
for i := 0; i < parallelism; i++ {
72+
workerGroup.Add(1)
73+
go func() {
74+
defer workerGroup.Done()
75+
for secretFileName := range secretChan {
76+
secret, err := FromPath(secretFileName)
77+
resultChan <- secretResult{secret: secret, err: err}
78+
}
79+
}()
80+
}
81+
82+
// Send work to workers
83+
go func() {
84+
for _, secretFileName := range secretFileNames {
85+
secretChan <- secretFileName
86+
}
87+
close(secretChan)
88+
}()
89+
90+
// Wait for all workers to finish and close result channel
91+
go func() {
92+
workerGroup.Wait()
93+
close(resultChan)
94+
}()
95+
96+
// Collect results
97+
for result := range resultChan {
5498
bar.Add(1)
55-
secret, err := FromPath(secretFileName)
56-
if err != nil {
99+
if result.err != nil {
57100
bar.Finish()
58-
return nil, err
101+
return nil, result.err
59102
}
103+
104+
secret := result.secret
60105
if secret.TargetType != targetTypeFilter && targetTypeFilter != SecretTargetTypeAll {
61-
log.Trace("Skipping file due to targetType filter: ", secretFileName)
106+
log.Trace("Skipping file due to targetType filter: ", secret.Path)
62107
continue
63108
}
64109
if clusterLimit != "" && secret.Target != clusterLimit {
65-
log.Trace("Skipping file due to target filter: ", secretFileName)
110+
log.Trace("Skipping file due to target filter: ", secret.Path)
66111
continue
67112
}
68113
for _, s := range secrets {

0 commit comments

Comments
 (0)