Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nursery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type nursery struct {
limiter limiter
goRoutine chan Routine
routinesCount atomic.Int32
bufSize int
}

func newNursery() *nursery {
Expand All @@ -39,6 +40,7 @@ func newNursery() *nursery {
errors: make(chan error),
limiter: nil,
goRoutine: make(chan func() error),
bufSize: 0,
}

return n
Expand Down
13 changes: 13 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,16 @@ func WithMaxGoroutines(max int) BlockOption {
n.limiter = make(chan struct{}, max+1)
}
}

// WithOutputBuffer returns a nursery block option that sets the output buffer
// size for AsyncMap operations. This option is only used by AsyncMap and is
// ignored by other functions. If size is zero or negative, an unbuffered
// channel is used (default behavior).
func WithOutputBuffer(size int) BlockOption {
return func(n *nursery) {
if size < 0 {
size = 0
}
n.bufSize = size
}
}
105 changes: 105 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,108 @@ func Map2[K comparable, V any](input map[K]V, f func(context.Context, K, V) (K,
return nil
}, opts...)
}

// AsyncMap concurrently transforms an input sequence into an output sequence.
// The order of the output sequence is non-deterministic. Items are delivered as they finish.
//
// Within the parent nursery, a goroutine is launched which creates a nested nursery.
// The sequence is processed entirely within this nested nursery. By default, the parent
// nursery is used as the context for the nested nursery.
//
// If a transform function returns an error, that error is propagated to the parent nursery
// and the nested nursery's context is canceled, stopping processing of all remaining items.
//
// By default, AsyncMap uses an unbuffered channel for outputs. For high-throughput scenarios
// where producers might temporarily outpace consumers, use WithOutputBuffer to specify
// a buffer size.
//
// If consuming from the output sequence stops before it's fully drained (e.g.,
// breaking out of the range loop early), transformation goroutines may block indefinitely.
// To prevent this, cancel the parent nursery's context when done consuming. For multi-stage
// pipelines requiring finer control, provide a separate cancellable context via WithContext.
//
// Example:
//
// _ = Block(func(n Nursery) error {
// inputs := slices.Values([]int{1, 2, 3, 4, 5})
// outputs := conc.AsyncMap(n, inputs, func(_ context.Context, i int) (string, error) {
// return fmt.Sprintf("item-%d", i), nil
// })
//
// for output := range outputs {
// fmt.Println(output)
// }
// })
//
// Note: If supplying a WithContext option, it will override the default parent context.
// Ensure any custom context derives from the parent nursery context to maintain proper
// cancellation propagation.
func AsyncMap[I any, O any](
parent Nursery,
inputs iter.Seq[I],
transform func(context.Context, I) (O, error),
opts ...BlockOption,
) iter.Seq[O] {
// Create a temporary nursery to get correct output buffer size
tn := newNursery()
for _, opt := range opts {
opt(tn)
}
var outputs chan O
if tn.bufSize > 0 {
outputs = make(chan O, tn.bufSize)
} else {
outputs = make(chan O)
}
allOpts := append([]BlockOption{WithContext(parent)}, opts...)

// Core processing loop
process := func(nested Nursery) error {
done := nested.Done()
for input := range inputs {
select {
case <-done:
return nil
default:
input := input
nested.Go(func() error {
output, err := transform(nested, input)
if err != nil {
return err
}
select {
case outputs <- output:
return nil
case <-done:
return nested.Err()
}
})
}
}
return nil
}

// Launch
parent.Go(func() error {
defer func() {
close(outputs)
}()
if err := Block(process, allOpts...); err != nil {
return err
}
return nil
})

return chanSeq(outputs)
}

// chanSeq returns an iterator that yields the values of ch until it is closed.
func chanSeq[T any](ch <-chan T) iter.Seq[T] {
return func(yield func(T) bool) {
for v := range ch {
if !yield(v) {
return
}
}
}
}
Loading