Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Window value aggregator for BOOL_AND window function.
*/
public class BoolAndValueAggregator implements WindowValueAggregator<Object> {
public class BoolAndWindowValueAggregator implements WindowValueAggregator<Object> {
private int _numFalse = 0;
private int _numTrue = 0;
private int _numNull = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Window value aggregator for BOOL_OR window function.
*/
public class BoolOrValueAggregator implements WindowValueAggregator<Object> {
public class BoolOrWindowValueAggregator implements WindowValueAggregator<Object> {
private int _numFalse = 0;
private int _numTrue = 0;
private int _numNull = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.operator.window.aggregate;

import java.util.ArrayDeque;
import javax.annotation.Nullable;


/**
* Window value aggregator for MAX window function that preserves the input type by using {@link Comparable} for
* comparisons. Used for types like BIG_DECIMAL that don't have a dedicated primitive-typed aggregator.
*/
public class MaxComparableWindowValueAggregator implements WindowValueAggregator<Object> {

private final boolean _supportRemoval;
private final ArrayDeque<Object> _deque = new ArrayDeque<>();
private Object _maxValue = null;

/**
* @param supportRemoval whether this window value aggregator should support removal of values. Some cases require
* only addition of values in which case this value aggregator will have O(1) space complexity;
* if {@code supportRemoval} is true, this value aggregator will have O(K) space complexity
* (where K is the max size of the window).
*/
public MaxComparableWindowValueAggregator(boolean supportRemoval) {
_supportRemoval = supportRemoval;
}

@Override
public void addValue(@Nullable Object value) {
if (value != null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not introduced in this PR, but how do we handle null first/last?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current implementation (both pre- and post-PR), nulls are simply skipped in addValue() / removeValue() - they don't participate in MIN/MAX (and other aggregations), which is standard SQL aggregate-over-nulls behavior. For other non aggregate window functions like FIRST_VALUE, LAST_VALUE, we support customizable null handling using the standard IGNORE NULLS / RESPECT NULLS modifiers - #14264. The actual null ordering gets pushed into the sort created below the window by the planner.

if (_supportRemoval) {
// Remove previously added elements if they're < than the current element since they're no longer useful
while (!_deque.isEmpty() && compare(_deque.peekLast(), value) < 0) {
_deque.pollLast();
}
_deque.addLast(value);
} else {
if (_maxValue == null || compare(value, _maxValue) > 0) {
_maxValue = value;
}
}
}
}

@Override
public void removeValue(@Nullable Object value) {
if (!_supportRemoval) {
throw new UnsupportedOperationException();
}

if (value != null) {
if (!_deque.isEmpty() && compare(_deque.peekFirst(), value) == 0) {
_deque.pollFirst();
}
}
}

@Nullable
@Override
public Object getCurrentAggregatedValue() {
if (_supportRemoval) {
return _deque.peekFirst();
} else {
return _maxValue;
}
}

@Override
public void clear() {
_deque.clear();
_maxValue = null;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static int compare(Object a, Object b) {
return ((Comparable) a).compareTo(b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Window value aggregator for MAX window function.
*/
public class MaxWindowValueAggregator implements WindowValueAggregator<Object> {
public class MaxDoubleWindowValueAggregator implements WindowValueAggregator<Object> {

private final boolean _supportRemoval;
private final DoubleArrayFIFOQueue _deque = new DoubleArrayFIFOQueue();
Expand All @@ -37,7 +37,7 @@ public class MaxWindowValueAggregator implements WindowValueAggregator<Object> {
* if {@code supportRemoval} is true, this value aggregator will have O(K) space complexity
* (where K is the max size of the window).
*/
public MaxWindowValueAggregator(boolean supportRemoval) {
public MaxDoubleWindowValueAggregator(boolean supportRemoval) {
_supportRemoval = supportRemoval;
}

Expand Down Expand Up @@ -73,6 +73,7 @@ public void removeValue(@Nullable Object value) {
}
}

@Nullable
@Override
public Object getCurrentAggregatedValue() {
if (_supportRemoval) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.operator.window.aggregate;

import it.unimi.dsi.fastutil.ints.IntArrayFIFOQueue;
import javax.annotation.Nullable;


/**
* Window value aggregator for MAX window function over INT types.
* Uses primitive {@code int} operations and {@link IntArrayFIFOQueue} to avoid boxing overhead and improve cache
* locality compared to the {@link Comparable}-based aggregator.
*/
public class MaxIntWindowValueAggregator implements WindowValueAggregator<Object> {

private final boolean _supportRemoval;
private final IntArrayFIFOQueue _deque = new IntArrayFIFOQueue();
private Integer _maxValue = null;

/**
* @param supportRemoval whether this window value aggregator should support removal of values. Some cases require
* only addition of values in which case this value aggregator will have O(1) space complexity;
* if {@code supportRemoval} is true, this value aggregator will have O(K) space complexity
* (where K is the max size of the window).
*/
public MaxIntWindowValueAggregator(boolean supportRemoval) {
_supportRemoval = supportRemoval;
}

@Override
public void addValue(@Nullable Object value) {
if (value != null) {
int intValue = ((Number) value).intValue();
if (_supportRemoval) {
// Remove previously added elements if they're < than the current element since they're no longer useful
while (!_deque.isEmpty() && _deque.lastInt() < intValue) {
_deque.dequeueLastInt();
}
_deque.enqueue(intValue);
} else {
if (_maxValue == null || intValue > _maxValue) {
_maxValue = intValue;
}
}
}
}

@Override
public void removeValue(@Nullable Object value) {
if (!_supportRemoval) {
throw new UnsupportedOperationException();
}

if (value != null) {
int intValue = ((Number) value).intValue();
if (!_deque.isEmpty() && _deque.firstInt() == intValue) {
_deque.dequeueInt();
}
}
}

@Nullable
@Override
public Object getCurrentAggregatedValue() {
if (_supportRemoval) {
if (_deque.isEmpty()) {
return null;
}
return _deque.firstInt();
} else {
return _maxValue;
}
}

@Override
public void clear() {
_deque.clear();
_maxValue = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.operator.window.aggregate;

import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
import javax.annotation.Nullable;


/**
* Window value aggregator for MAX window function over LONG types.
* Uses primitive {@code long} operations and {@link LongArrayFIFOQueue} to avoid boxing overhead and improve cache
* locality compared to the {@link Comparable}-based aggregator. Also avoids precision loss that occurs when converting
* large long values to double.
*/
public class MaxLongWindowValueAggregator implements WindowValueAggregator<Object> {

private final boolean _supportRemoval;
private final LongArrayFIFOQueue _deque = new LongArrayFIFOQueue();
private Long _maxValue = null;

/**
* @param supportRemoval whether this window value aggregator should support removal of values. Some cases require
* only addition of values in which case this value aggregator will have O(1) space complexity;
* if {@code supportRemoval} is true, this value aggregator will have O(K) space complexity
* (where K is the max size of the window).
*/
public MaxLongWindowValueAggregator(boolean supportRemoval) {
_supportRemoval = supportRemoval;
}

@Override
public void addValue(@Nullable Object value) {
if (value != null) {
long longValue = ((Number) value).longValue();
if (_supportRemoval) {
// Remove previously added elements if they're < than the current element since they're no longer useful
while (!_deque.isEmpty() && _deque.lastLong() < longValue) {
_deque.dequeueLastLong();
}
_deque.enqueue(longValue);
} else {
if (_maxValue == null || longValue > _maxValue) {
_maxValue = longValue;
}
}
}
}

@Override
public void removeValue(@Nullable Object value) {
if (!_supportRemoval) {
throw new UnsupportedOperationException();
}

if (value != null) {
long longValue = ((Number) value).longValue();
if (!_deque.isEmpty() && _deque.firstLong() == longValue) {
_deque.dequeueLong();
}
}
}

@Nullable
@Override
public Object getCurrentAggregatedValue() {
if (_supportRemoval) {
if (_deque.isEmpty()) {
return null;
}
return _deque.firstLong();
} else {
return _maxValue;
}
}

@Override
public void clear() {
_deque.clear();
_maxValue = null;
}
}
Loading
Loading