```go
// Partial represents the intermediate result of a Map phase.
// For most functions, only Value is meaningful.
// For MEAN, both Value (sum) and Count are used.
type Partial[N Number] struct {
	Value N
	Count N
}
```
Map interface (replaces Func, used by data node, standalone, and TopN)
```go
// Map accumulates raw values and produces aggregation results.
// It serves as the local accumulator for raw data points.
type Map[N Number] interface {
	// In feeds a raw value into the accumulator.
	In(N)
	// Val returns the locally finalized aggregation result.
	// For MEAN, this computes sum/count. For others, same as Partial().Value.
	Val() N
	// Partial returns the intermediate result for the reduce phase.
	Partial() Partial[N]
	// Reset clears the accumulator for reuse.
	Reset()
}
```
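To make the contract concrete, here is a minimal sketch of a MEAN accumulator satisfying this interface. The `meanMap` name and the local `Number` constraint are illustrative assumptions, not the actual implementation:

```go
package main

import "fmt"

// Number stands in for the aggregation package's numeric constraint
// (assumed definition, for illustration only).
type Number interface{ ~int64 | ~float64 }

// Partial mirrors the intermediate type defined above.
type Partial[N Number] struct {
	Value N
	Count N
}

// meanMap is an illustrative Map[N] for MEAN: In accumulates,
// Val finalizes locally, Partial exports the raw state for reduce.
type meanMap[N Number] struct {
	sum, count N
}

func (m *meanMap[N]) In(v N) {
	m.sum += v
	m.count++
}

func (m *meanMap[N]) Val() N {
	if m.count == 0 {
		return 0
	}
	return m.sum / m.count
}

func (m *meanMap[N]) Partial() Partial[N] {
	return Partial[N]{Value: m.sum, Count: m.count}
}

func (m *meanMap[N]) Reset() {
	m.sum, m.count = 0, 0
}

func main() {
	m := &meanMap[int64]{}
	for _, v := range []int64{2, 4, 6} {
		m.In(v)
	}
	fmt.Println(m.Val())     // 4 (12 / 3)
	fmt.Println(m.Partial()) // {12 3}
}
```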
Reduce interface (new, for liaison node use)
```go
// Reduce combines intermediate results from Map phases into a final value.
type Reduce[N Number] interface {
	// Combine feeds an intermediate result from a Map phase.
	Combine(Partial[N])
	// Val returns the final aggregated value.
	Val() N
	// Reset clears the accumulator for reuse.
	Reset()
}
```
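A matching MEAN reducer could look like the following sketch. The `meanReduce` name and the local `Number`/`Partial` definitions are illustrative; only the `Combine`/`Val`/`Reset` contract comes from the interface above:

```go
package main

import "fmt"

// Number stands in for the aggregation package's numeric constraint.
type Number interface{ ~int64 | ~float64 }

// Partial mirrors the intermediate type: Value carries the sum for MEAN.
type Partial[N Number] struct {
	Value N
	Count N
}

// meanReduce is an illustrative Reduce[N] for MEAN: it sums both
// fields of each Partial and divides only at the very end.
type meanReduce[N Number] struct {
	totalSum, totalCount N
}

func (r *meanReduce[N]) Combine(p Partial[N]) {
	r.totalSum += p.Value
	r.totalCount += p.Count
}

func (r *meanReduce[N]) Val() N {
	if r.totalCount == 0 {
		return 0
	}
	return r.totalSum / r.totalCount
}

func (r *meanReduce[N]) Reset() {
	r.totalSum, r.totalCount = 0, 0
}

func main() {
	r := &meanReduce[int64]{}
	r.Combine(Partial[int64]{Value: 6, Count: 2})  // e.g. node 1 saw 2, 4
	r.Combine(Partial[int64]{Value: 10, Count: 1}) // e.g. node 2 saw 10
	fmt.Println(r.Val()) // 16 / 3 = 5 (integer division)
}
```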
```go
// PartialToFieldValues converts a Partial to field values for wire transport.
func PartialToFieldValues[N Number](p Partial[N]) ([]*modelv1.FieldValue, error)

// FieldValuesToPartial converts field values from wire transport to a Partial.
func FieldValuesToPartial[N Number](fvs []*modelv1.FieldValue) (Partial[N], error)
```
For non-MEAN functions, this produces a single FieldValue. For MEAN, it produces two (sum and count).
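A sketch of that asymmetry, using a plain `[]int64` as a stand-in for the real `[]*modelv1.FieldValue` wire type (the `encodePartial` helper and the function-name switch are illustrative assumptions):

```go
package main

import "fmt"

// Partial carries the intermediate state; Count is only used by MEAN.
type Partial struct {
	Value int64
	Count int64
}

// encodePartial sketches the wire encoding: MEAN ships both sum and
// count, every other function ships a single value. []int64 stands in
// for []*modelv1.FieldValue here.
func encodePartial(fn string, p Partial) []int64 {
	if fn == "MEAN" {
		return []int64{p.Value, p.Count}
	}
	return []int64{p.Value}
}

func main() {
	fmt.Println(encodePartial("SUM", Partial{Value: 42}))            // [42]
	fmt.Println(encodePartial("MEAN", Partial{Value: 12, Count: 3})) // [12 3]
}
```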
The `aggregationPlan` and its iterators (`aggGroupIterator`, `aggAllIterator`) use `Map[N]` instead of `Func[N]`:

- Standalone (single node): call `mapFunc.In(v)` for each raw value, then `mapFunc.Val()` for the final result. This is a drop-in replacement for the old `Func` (same `In`/`Val`/`Reset` contract).
- Distributed data node: call `mapFunc.In(v)` for each raw value, then `mapFunc.Partial()` to produce the intermediate result. Convert via `PartialToFieldValues` for the wire response.
The liaison-side aggregation plan and iterators use `Reduce[N]`:

- Receive intermediate results from data nodes
- Call `reduceFunc.Combine(partial)` for each intermediate
- Call `reduceFunc.Val()` to produce the final result
This likely means splitting the current aggregationPlan into two variants (map vs reduce) or parameterizing it with a mode flag, since the data node plan and liaison node plan serve different roles.
Replace `int64Func aggregation.Func[int64]` with `mapFunc aggregation.Map[int64]` on `topNAggregatorItem`. TopN feeds raw values into the aggregation, so it needs Map semantics (not Reduce). For COUNT, `Map.In()` correctly increments a counter rather than summing values:
```go
type topNAggregatorItem struct {
	mapFunc aggregation.Map[int64]
	// ... other fields unchanged
}
```
Usage changes:
- `exist.mapFunc.In(item.val)` instead of `exist.int64Func.In(item.val)` (same signature)
- `item.mapFunc.Val()` instead of `item.int64Func.Val()` (same signature)
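The COUNT distinction called out above can be shown with a minimal sketch; `countMap` is a hypothetical name, not the codebase's implementation:

```go
package main

import "fmt"

// countMap sketches COUNT's Map semantics: In() increments a counter
// and ignores the value itself, which is what the TopN aggregator
// needs when it feeds raw values in.
type countMap struct {
	n int64
}

func (c *countMap) In(_ int64) { c.n++ }
func (c *countMap) Val() int64 { return c.n }
func (c *countMap) Reset()     { c.n = 0 }

func main() {
	c := &countMap{}
	c.In(500) // the value is ignored...
	c.In(7)   // ...only the number of calls matters
	fmt.Println(c.Val()) // 2, not 507
}
```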
Execution Flow (distributed)
```mermaid
sequenceDiagram
    participant Client
    participant Liaison as Liaison_Node
    participant Data1 as Data_Node_1
    participant Data2 as Data_Node_2
    Client->>Liaison: QueryRequest with Agg
    Liaison->>Data1: InternalQueryRequest (push down agg)
    Liaison->>Data2: InternalQueryRequest (push down agg)
    Note over Data1: Map phase
    Data1->>Data1: mapFunc.In(rawValue) per data point
    Data1->>Data1: mapFunc.Partial() -> intermediate
    Data1-->>Liaison: InternalQueryResponse with Partial results
    Note over Data2: Map phase
    Data2->>Data2: mapFunc.In(rawValue) per data point
    Data2->>Data2: mapFunc.Partial() -> intermediate
    Data2-->>Liaison: InternalQueryResponse with Partial results
    Note over Liaison: Reduce phase
    Liaison->>Liaison: reduceFunc.Combine(partial1)
    Liaison->>Liaison: reduceFunc.Combine(partial2)
    Liaison->>Liaison: reduceFunc.Val() -> final result
    Liaison-->>Client: QueryResponse
```
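The whole sequence condenses into a runnable sketch, with two in-process "data nodes" computing MEAN partials and a "liaison" reducing them; `mapPhase` and `reducePhase` are illustrative stand-ins for the plan code:

```go
package main

import "fmt"

// Partial carries the two fields MEAN needs across the wire.
type Partial struct {
	Sum, Count int64
}

// mapPhase plays the data-node role: accumulate raw values locally
// and emit an intermediate Partial instead of a finished mean.
func mapPhase(values []int64) Partial {
	var p Partial
	for _, v := range values {
		p.Sum += v
		p.Count++
	}
	return p
}

// reducePhase plays the liaison role: combine every Partial,
// then finalize once all nodes have reported.
func reducePhase(partials []Partial) int64 {
	var sum, count int64
	for _, p := range partials {
		sum += p.Sum
		count += p.Count
	}
	if count == 0 {
		return 0
	}
	return sum / count
}

func main() {
	p1 := mapPhase([]int64{2, 4, 6})  // data node 1
	p2 := mapPhase([]int64{10, 20})   // data node 2
	mean := reducePhase([]Partial{p1, p2})
	fmt.Println(mean) // (12 + 30) / (3 + 2) = 42 / 5 = 8
}
```

Note that averaging the per-node means instead (4 and 15) would give 9.5 rather than the true mean 42/5; carrying sum and count separately is what makes MEAN push-down correct.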
Search before asking
Description
Current Architecture
The `Func[N]` interface (`In`, `Val`, `Reset`) is used as a monolithic accumulator everywhere, in both the standalone query path (`Analyze`) and the distributed query path (`DistributedAnalyze`). The distributed push-down is handled with hacks: `needCompletePushDownAgg` only supports MAX/MIN/SUM/COUNT (not MEAN).

New Interface Design
The design introduces (code shown in the sections above):

- An intermediate `Partial` type in aggregation.go
- A Map interface (replaces `Func`; used by data node, standalone, and TopN)
- A Reduce interface (new, for liaison node use)
- Factory functions
- Concrete implementations in function.go:
- SUM: `Partial()` returns `{Value: sum}`; `Combine()` sums the `Value` fields; `Val()` returns the total sum.
- COUNT: `Partial()` returns `{Value: count}`; `Combine()` sums the `Value` fields (same logic as the SUM reduce); `Val()` returns the total count.
- MAX: `Partial()` returns `{Value: max}`; `Combine()` keeps the maximum of the `Value` fields; `Val()` returns the global max.
- MIN: `Partial()` returns `{Value: min}`; `Combine()` keeps the minimum of the `Value` fields; `Val()` returns the global min.
- MEAN (the key case this enables): `Partial()` returns `{Value: sum, Count: count}`; `Combine()` sums both fields; `Val()` returns `totalSum / totalCount`.

Serialization Helpers in aggregation.go
For non-MEAN functions, this produces a single `FieldValue`. For MEAN, it produces two (sum and count).

Interface Usage by Context
| Context | Interface | Methods used |
| --- | --- | --- |
| Standalone query (`Analyze`) | `Map` | `In()`, `Val()`, `Reset()` |
| Distributed data node | `Map` | `In()`, `Partial()`, `Reset()` |
| Liaison node | `Reduce` | `Combine()`, `Val()`, `Reset()` |
| TopN post-processor | `Map` | `In()`, `Val()`, `Reset()` |

Usage Changes
Standalone / Data node: measure_plan_aggregation.go
Liaison node: measure_plan_aggregation.go
Distributed plan: measure_plan_distributed.go
- The `needCompletePushDownAgg` flag is no longer needed (all aggregation functions can now be pushed down)
- Deduplication (`deduplicateAggregatedDataPointsWithShard`) remains for replica handling

TopN post-processor: topn_post_processor.go
related to #13291
Use case
No response
Related issues
No response
Are you willing to submit a pull request to implement this on your own?
Code of Conduct