Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/stringarena",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v3//:apd", # keep
"@com_github_cockroachdb_errors//:errors",
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -339,7 +340,14 @@ func (s *ParallelUnorderedSynchronizer) workerSendErr(err error) {
// error on s.errCh, resulting in the first error pushed to be observed by the
// Next goroutine. Inputs are asynchronous so that the synchronizer is minimally
// affected by slow inputs.
//
// As a side effect, this goroutine's CPU time (grunning) is measured and
// emitted as a Metrics.RawSQLCPUTime entry on the drained metadata so that
// the gateway can attribute the upstream operator work driven via
// input.Root.Next() to the query's SQL CPU time.
func (s *ParallelUnorderedSynchronizer) workerRun(input colexecargs.OpWithMetaInfo, inputIdx int) {
var cpuStopWatch timeutil.CPUStopWatch
cpuStopWatch.Start()
select {
case <-s.blockWorkersCh:
case <-s.exitWorkersCh:
Expand Down Expand Up @@ -408,6 +416,15 @@ func (s *ParallelUnorderedSynchronizer) workerRun(input colexecargs.OpWithMetaIn
if input.MetadataSources != nil {
msg.meta = append(msg.meta, input.MetadataSources.DrainMeta()...)
}
// Emit this worker goroutine's grunning as metadata. The synchronizer
// drives upstream operator work inline via input.Root.Next(), so this
// goroutine's CPU includes that work.
if delta := cpuStopWatch.Stop(); delta > 0 {
grunningMeta := execinfrapb.ProducerMetadata{}
grunningMeta.Metrics = execinfrapb.GetMetricsMeta()
grunningMeta.Metrics.RawSQLCPUTime = int64(delta)
msg.meta = append(msg.meta, grunningMeta)
}
default:
s.workerSendErr(errors.AssertionFailedf("unhandled state in ParallelUnorderedSynchronizer input goroutine: %d", state))
return
Expand Down
32 changes: 25 additions & 7 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,26 @@ func TestParallelUnorderedSynchronizer(t *testing.T) {
}
}
if meta != nil {
streamingMeta = append(streamingMeta, *meta)
// Skip the always-on grunning emission from each worker
// goroutine; it is orthogonal to the test-induced metadata
// this test exercises.
if meta.Metrics == nil {
streamingMeta = append(streamingMeta, *meta)
}
continue
}
if b.Length() == 0 {
if terminationScenario == synchronizerGracefulTermination {
// Successful run, check that all inputs have returned metadata.
drainedMeta := s.DrainMeta()
meta := append(streamingMeta, drainedMeta...)
meta := append(streamingMeta, filterMetricsMeta(s.DrainMeta())...)
require.Equal(t, len(inputs), len(meta), "metadata length mismatch, returned metadata is: %v", meta)
}
break
}
batchesReturned++
if terminationScenario == synchronizerPrematureDrainMeta && batchesReturned < expectedBatchesReturned {
// Call DrainMeta before the input is finished.
drainedMeta := s.DrainMeta()
meta := append(streamingMeta, drainedMeta...)
meta := append(streamingMeta, filterMetricsMeta(s.DrainMeta())...)
// Make sure that all expected metadata is still propagated.
// Note that if the last input wasn't pre-emptied, then the
// error will not match.
Expand Down Expand Up @@ -239,7 +242,8 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
for {
if err := colexecerror.CatchVectorizedRuntimeError(func() {
_, meta := s.Next()
if meta != nil {
// Skip the always-on grunning emission from each worker goroutine.
if meta != nil && meta.Metrics == nil {
streamingMeta = append(streamingMeta, *meta)
}
}); err != nil {
Expand All @@ -254,11 +258,25 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
// properly drain their metadata sources. Notably, the error itself should
// not be propagated as metadata (i.e. we don't want it to be duplicated),
// but each input should produce a single metadata object.
require.Equal(t, len(inputs), len(streamingMeta)+len(s.DrainMeta()))
require.Equal(t, len(inputs), len(streamingMeta)+len(filterMetricsMeta(s.DrainMeta())))
// This is the crux of the test: assert that all inputs have finished.
require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs)))
}

// filterMetricsMeta returns meta with every Metrics-only entry removed, i.e.
// every entry whose Metrics field is set and whose Err field is nil. Tests use
// it to ignore the always-on per-goroutine grunning emissions when counting
// the metadata they induced themselves.
//
// The filtering is done in place: the returned slice shares its backing array
// with meta, so the contents of the argument are overwritten.
func filterMetricsMeta(meta []execinfrapb.ProducerMetadata) []execinfrapb.ProducerMetadata {
	kept := meta[:0]
	for i := range meta {
		// Keep anything that is not a pure Metrics record. The write position
		// never runs ahead of the read position i, so appending into the
		// shared backing array cannot clobber an entry not yet examined.
		if meta[i].Metrics == nil || meta[i].Err != nil {
			kept = append(kept, meta[i])
		}
	}
	return kept
}

func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
const numInputs = 6

Expand Down
15 changes: 9 additions & 6 deletions pkg/sql/colexecop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ type KVReader interface {
// GetConsumedRU returns the number of RUs that were consumed during the
// KV reads.
GetConsumedRU() uint64
// GetLocalKVCPUTime returns the CPU time (in nanoseconds) spent on the
// calling goroutine during KV operations. It is measured via grunning
// deltas around txn.Send() calls. It must be safe for concurrent use.
// It is used to calculate the SQL CPU time.
// GetLocalKVCPUTime returns the SQL goroutine CPU time (in nanoseconds)
// spent inside KV calls, measured via grunning deltas around txn.Send.
// This is the portion of SQL goroutine CPU that overlapped with KV work,
// not the CPU consumed on KV servers (see GetKVResponseCPUTime for that).
// It must be safe for concurrent use.
GetLocalKVCPUTime() int64
// GetKVResponseCPUTime returns the CPU time as reported by KV BatchResponses
// processed by the KVReader throughout its lifetime so far.
Expand Down Expand Up @@ -543,9 +544,11 @@ type VectorizedStatsCollector interface {
func NextNoMeta(op Operator) coldata.Batch {
b, meta := op.Next()
if meta != nil {
if buildutil.CrdbTestBuild && meta.RowNum != nil {
if buildutil.CrdbTestBuild && (meta.RowNum != nil || meta.Metrics != nil) {
// In test builds, the invariantsChecker can inject RowNum metadata
// in arbitrary Operator chains, so we'll just silently swallow it.
// in arbitrary Operator chains, and synchronizer / router / outbox
// wrappers may emit Metrics metadata carrying the always-on
// grunning measurement. Both are silently swallowed.
return NextNoMeta(op)
}
colexecerror.InternalError(errors.AssertionFailedf("non-nil metadata from %T: %v", op, meta))
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,9 @@ type cFetcher struct {
stableKVs bool
// bytesRead, kvPairsRead, kvCPUTime, localKVCPUTime, and
// batchRequestsIssued store the total number of bytes read, key-value
// pairs read, CPU time reported by KV BatchResponses, local KV CPU time,
// and BatchRequests issued, respectively, by this cFetcher throughout its
// pairs read, KV-server CPU time reported by BatchResponses, SQL goroutine
// CPU time spent inside KV calls (measured via grunning), and
// BatchRequests issued, respectively, by this cFetcher throughout its
// lifetime, for the case when the underlying row.KVFetcher has already been
// closed and nil-ed out.
//
Expand Down Expand Up @@ -1500,8 +1501,10 @@ func (cf *cFetcher) getKVCPUTime() int64 {
return cf.kvCPUTime
}

// getLocalKVCPUTime returns the CPU time spent on the calling goroutine during
// KV operations, as measured by grunning deltas around txn.Send() calls.
// getLocalKVCPUTime returns the SQL goroutine CPU time spent inside KV calls,
// measured via grunning deltas around txn.Send. This is the portion of SQL
// goroutine CPU that overlapped with KV work, not the CPU consumed on KV
// servers (see getKVCPUTime for that).
func (cf *cFetcher) getLocalKVCPUTime() int64 {
if cf.fetcher != nil {
return cf.fetcher.GetLocalKVCPUTime()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/colbatch_direct_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (s *ColBatchDirectScan) DrainMeta() []execinfrapb.ProducerMetadata {
meta.Metrics.BytesRead = s.GetBytesRead()
meta.Metrics.RowsRead = s.getRowsReadSinceLastMeta()
meta.Metrics.KVCPUTime = s.GetKVResponseCPUTime()
meta.Metrics.LocalKVCPUTime = s.GetLocalKVCPUTime()
meta.Metrics.StageID = s.stageID
trailingMeta = append(trailingMeta, *meta)
return trailingMeta
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (s *ColBatchScan) DrainMeta() []execinfrapb.ProducerMetadata {
meta.Metrics.BytesRead = s.GetBytesRead()
meta.Metrics.RowsRead = s.getRowsReadSinceLastMeta()
meta.Metrics.KVCPUTime = s.GetKVResponseCPUTime()
meta.Metrics.LocalKVCPUTime = s.GetLocalKVCPUTime()
meta.Metrics.StageID = s.stageID
trailingMeta = append(trailingMeta, *meta)
return trailingMeta
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func (s *ColIndexJoin) DrainMeta() []execinfrapb.ProducerMetadata {
meta.Metrics.BytesRead = s.GetBytesRead()
meta.Metrics.RowsRead = s.GetRowsRead()
meta.Metrics.KVCPUTime = s.GetKVResponseCPUTime()
meta.Metrics.LocalKVCPUTime = s.GetLocalKVCPUTime()
trailingMeta = append(trailingMeta, *meta)
if !s.flowCtx.Gateway {
if trace := tracing.SpanFromContext(s.Ctx).GetConfiguredRecording(); trace != nil {
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -81,6 +82,11 @@ type Outbox struct {
// A copy of Run's caller ctx, with no StreamID tag.
// Used to pass a clean context to the input.Next.
runnerCtx context.Context

// cpuStopWatch measures goroutine CPU time for the outbox goroutine.
// The zero value is valid; Stop() returns 0 if grunning is not
// supported or Start was not called.
cpuStopWatch timeutil.CPUStopWatch
}

// NewOutbox creates a new Outbox.
Expand Down Expand Up @@ -209,6 +215,7 @@ func (o *Outbox) Run(
flowCtxCancel context.CancelFunc,
connectionTimeout time.Duration,
) {
o.cpuStopWatch.Start()
flowCtx := ctx
// Derive a child context so that we can cancel all components rooted in
// this outbox.
Expand Down Expand Up @@ -423,6 +430,16 @@ func (o *Outbox) sendDrainedMetadata(
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta))
}
}
// Always-on: each outbox emits its goroutine's raw CPU time via Metrics
// metadata. The gateway sums all RawSQLCPUTime entries and subtracts
// total LocalKVCPUTime to derive SQL CPU.
if delta := o.cpuStopWatch.Stop(); delta > 0 {
sqlCPUMeta := execinfrapb.ProducerMetadata{}
sqlCPUMeta.Metrics = execinfrapb.GetMetricsMeta()
sqlCPUMeta.Metrics.RawSQLCPUTime = int64(delta)
msg.Data.Metadata = append(msg.Data.Metadata,
execinfrapb.LocalMetaToRemoteProducerMeta(ctx, sqlCPUMeta))
}
if !o.flowCtx.Gateway {
if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.RemoteProducerMetadata{
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/colflow/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
Expand Down Expand Up @@ -590,6 +591,8 @@ func (r *HashRouter) getDrainState() hashRouterDrainState {
// output calculated by hashing columns. Cancel the given context to terminate
// early.
func (r *HashRouter) Run(ctx context.Context) {
var cpuStopWatch timeutil.CPUStopWatch
cpuStopWatch.Start()
var span *tracing.Span
ctx, span = execinfra.ProcessorSpan(ctx, r.flowCtx, "hash router", r.processorID)
if span != nil {
Expand Down Expand Up @@ -671,6 +674,15 @@ func (r *HashRouter) Run(ctx context.Context) {
}
drainedMeta = append(drainedMeta, r.inputMetaInfo.MetadataSources.DrainMeta()...)
}
// Emit this goroutine's CPU time as metadata. The hash router goroutine
// executes upstream operators via Input.Next(), so its CPU includes all
// upstream operator work.
if delta := cpuStopWatch.Stop(); delta > 0 {
grunningMeta := execinfrapb.ProducerMetadata{}
grunningMeta.Metrics = execinfrapb.GetMetricsMeta()
grunningMeta.Metrics.RawSQLCPUTime = int64(delta)
drainedMeta = append(drainedMeta, grunningMeta)
}
// Non-blocking send of metadata so that one of the outputs can return it
// in DrainMeta.
r.waitForDrainedMetadata <- drainedMeta
Expand Down
25 changes: 20 additions & 5 deletions pkg/sql/colflow/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,10 +951,13 @@ func TestHashRouterOneOutput(t *testing.T) {
t.Fatal(err)
}
wg.Wait()
// Expect no metadata, this should be a successful run.
unexpectedMetadata := ro.DrainMeta()
if len(unexpectedMetadata) != 0 {
t.Fatalf("unexpected metadata when draining HashRouter output: %+v", unexpectedMetadata)
// Expect no metadata other than the always-on Metrics record
// carrying the router goroutine's RawSQLCPUTime.
for _, meta := range ro.DrainMeta() {
if meta.Metrics != nil {
continue
}
t.Fatalf("unexpected metadata when draining HashRouter output: %+v", meta)
}
if !mtc.skipExpSpillCheck {
// If len(sel) == 0, no items will have been enqueued so override an
Expand Down Expand Up @@ -1193,8 +1196,20 @@ func TestHashRouterRandom(t *testing.T) {

wg.Wait()
// The waitGroup protects metadataMu from concurrent access, so there's
// no need to lock the mutex here.
// no need to lock the mutex here. Filter out the always-on grunning
// emission from the HashRouter goroutine; it is orthogonal to the
// error metadata this test exercises.
metadata := metadataMu.metadata
for i := range metadata {
filtered := metadata[i][:0]
for _, m := range metadata[i] {
if m.Metrics != nil && m.Err == nil {
continue
}
filtered = append(filtered, m)
}
metadata[i] = filtered
}
checkMetadata := func(t *testing.T, expectedErrMsgs []string) {
t.Helper()
require.Equal(t, 1, len(metadata), "one output (the last to exit) should return metadata")
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,11 @@ func TestVectorizedFlowShutdown(t *testing.T) {
checkMetadata := func(receivedMeta []execinfrapb.ProducerMetadata) {
receivedMetaFromID := make([]bool, streamID)
for _, meta := range receivedMeta {
// Each outbox also emits an always-on Metrics record
// carrying RawSQLCPUTime; skip those.
if meta.Metrics != nil {
continue
}
require.NotNil(t, meta.Err)
id, err := strconv.Atoi(meta.Err.Error())
require.NoError(t, err)
Expand Down
Loading
Loading