Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit 27abfc8

Browse files
authored
Use conc (#52805)
## Test plan <!-- All pull requests REQUIRE a test plan: https://docs.sourcegraph.com/dev/background-information/testing_principles -->
1 parent 60b0a48 commit 27abfc8

5 files changed

Lines changed: 18 additions & 76 deletions

File tree

internal/goroutine/BUILD.bazel

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/uploadstore/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/uploadstore/gcs_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (s *gcsStore) create(ctx context.Context, bucket gcsBucketHandle) error {
193193
}
194194

195195
func (s *gcsStore) deleteSources(ctx context.Context, bucket gcsBucketHandle, sources []string) error {
196-
return RunWorkersOverStrings(sources, func(index int, source string) error {
196+
return ForEachString(sources, func(index int, source string) error {
197197
if err := bucket.Object(source).Delete(ctx); err != nil {
198198
return errors.Wrap(err, "failed to delete source object")
199199
}

internal/uploadstore/pool.go

Lines changed: 13 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2,81 +2,24 @@ package uploadstore
22

33
import (
44
"runtime"
5-
"sync"
65

7-
"github.com/sourcegraph/sourcegraph/lib/errors"
6+
"github.com/sourcegraph/conc/pool"
87
)
98

10-
// poolWorker is a function invoked by RunWorkers that sends
11-
// any errors that occur during execution down a shared channel.
12-
type poolWorker func(errs chan<- error)
13-
14-
// runWorkersN invokes the given worker n times and collects the
15-
// errors from each invocation.
16-
func runWorkersN(n int, worker poolWorker) (err error) {
17-
errs := make(chan error, n)
18-
19-
var wg sync.WaitGroup
20-
for i := 0; i < n; i++ {
21-
wg.Add(1)
22-
go func() { worker(errs); wg.Done() }()
23-
}
24-
25-
go func() {
26-
wg.Wait()
27-
close(errs)
28-
}()
29-
30-
for e := range errs {
31-
if err == nil {
32-
err = e
33-
} else {
34-
err = errors.Append(err, e)
35-
}
36-
}
37-
38-
return err
39-
}
40-
41-
// RunWorkersOverStrings invokes the given worker once for each of the
42-
// given string values. The worker function will receive the index as well
43-
// as the string value as parameters. Workers will be invoked in a number
9+
// ForEachString invokes the given callback once for each of the
10+
// given string values. The callback function will receive the index as well
11+
// as the string value as parameters. Callbacks will be invoked in a number
4412
// of concurrent routines proportional to the maximum number of CPUs that
4513
// can be executing simultaneously.
46-
func RunWorkersOverStrings(values []string, worker func(index int, value string) error) error {
47-
return runWorkersOverStringsN(runtime.GOMAXPROCS(0), values, worker)
48-
}
49-
50-
// RunWorkersOverStrings invokes the given worker once for each of the
51-
// given string values. The worker function will receive the index as well
52-
// as the string value as parameters. Workers will be invoked in n concurrent
53-
// routines.
54-
func runWorkersOverStringsN(n int, values []string, worker func(index int, value string) error) error {
55-
return runWorkersN(n, indexedStringWorker(loadIndexedStringChannel(values), worker))
56-
}
57-
58-
type indexedString struct {
59-
index int
60-
value string
61-
}
62-
63-
func loadIndexedStringChannel(values []string) <-chan indexedString {
64-
ch := make(chan indexedString, len(values))
65-
defer close(ch)
66-
14+
func ForEachString(values []string, f func(index int, value string) error) error {
15+
p := pool.New().
16+
WithErrors().
17+
WithMaxGoroutines(runtime.GOMAXPROCS(0))
6718
for i, value := range values {
68-
ch <- indexedString{index: i, value: value}
69-
}
70-
71-
return ch
72-
}
73-
74-
func indexedStringWorker(ch <-chan indexedString, worker func(index int, value string) error) poolWorker {
75-
return func(errs chan<- error) {
76-
for value := range ch {
77-
if err := worker(value.index, value.value); err != nil {
78-
errs <- err
79-
}
80-
}
19+
i, value := i, value
20+
p.Go(func() error {
21+
return f(i, value)
22+
})
8123
}
24+
return p.Wait()
8225
}

internal/uploadstore/s3_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func (s *s3Store) Compose(ctx context.Context, destination string, sources ...st
200200
var m sync.Mutex
201201
etags := map[int]*string{}
202202

203-
if err := RunWorkersOverStrings(sources, func(index int, source string) error {
203+
if err := ForEachString(sources, func(index int, source string) error {
204204
partNumber := index + 1
205205

206206
copyResult, err := s.client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
@@ -331,7 +331,7 @@ func (s *s3Store) create(ctx context.Context) error {
331331
}
332332

333333
func (s *s3Store) deleteSources(ctx context.Context, bucket string, sources []string) error {
334-
return RunWorkersOverStrings(sources, func(index int, source string) error {
334+
return ForEachString(sources, func(index int, source string) error {
335335
if _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
336336
Bucket: aws.String(bucket),
337337
Key: aws.String(source),

0 commit comments

Comments
 (0)