Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
*/
class FormulaChunkedOperator implements IterativeChunkedAggregationOperator {

private final GroupByChunkedOperator groupBy;
private final boolean delegateToBy;
private GroupByChunkedOperator groupBy;
private boolean delegateToBy;
private final String[] inputColumnNames;
private final String[] resultColumnNames;

Expand Down Expand Up @@ -494,4 +494,9 @@ private boolean[] makeObjectOrModifiedColumnsMask(@NotNull final ModifiedColumnS
}
return columnsMask;
}

/**
 * Replaces the values held in this operator's {@code groupBy} and {@code delegateToBy} fields.
 *
 * @param groupBy the group-by operator to store
 * @param delegateToBy the new value of the {@code delegateToBy} flag
 */
public void updateGroupBy(GroupByChunkedOperator groupBy, boolean delegateToBy) {
    this.delegateToBy = delegateToBy;
    this.groupBy = groupBy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -34,11 +35,15 @@ class FormulaMultiColumnChunkedOperator implements IterativeChunkedAggregationOp

private final QueryTable inputTable;

private final GroupByChunkedOperator groupBy;
private final boolean delegateToBy;
private GroupByOperator groupBy;
private boolean delegateToBy;
private final SelectColumn selectColumn;
private final WritableColumnSource<?> resultColumn;
private final String[] inputKeyColumns;
@Nullable
private final ColumnSource<Integer> formulaDepthSource;
@Nullable
private final Map<String, String> renames;

private ChunkSource<Values> formulaDataSource;

Expand All @@ -60,18 +65,24 @@ class FormulaMultiColumnChunkedOperator implements IterativeChunkedAggregationOp
* be false if {@code groupBy} is updated by the helper, or if this is not the first operator sharing
* {@code groupBy}.
* @param selectColumn The formula column that will produce the results
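* @param renames a map from input names in the groupBy operator (i.e. mangled names) to input column names in the
* formula, or {@code null} if the groupBy input names are used unchanged
* @param formulaDepthSource an optional column source made available to the formula under the name given by
* {@code AggregationProcessor.ROLLUP_FORMULA_DEPTH}; may be {@code null}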
*/
FormulaMultiColumnChunkedOperator(
@NotNull final QueryTable inputTable,
@NotNull final GroupByChunkedOperator groupBy,
@NotNull final GroupByOperator groupBy,
final boolean delegateToBy,
@NotNull final SelectColumn selectColumn,
@NotNull final String[] inputKeyColumns) {
@NotNull final String[] inputKeyColumns,
@Nullable Map<String, String> renames,
@Nullable final ColumnSource<Integer> formulaDepthSource) {
this.inputTable = inputTable;
this.groupBy = groupBy;
this.delegateToBy = delegateToBy;
this.selectColumn = selectColumn;
this.inputKeyColumns = inputKeyColumns;
this.renames = renames;
this.formulaDepthSource = formulaDepthSource;

resultColumn = ArrayBackedColumnSource.getMemoryColumnSource(
0, selectColumn.getReturnedType(), selectColumn.getReturnedComponentType());
Expand Down Expand Up @@ -199,7 +210,7 @@ public boolean modifyRowKeys(final SingletonContext context,

@Override
public boolean requiresRowKeys() {
// NOTE(review): the two return statements below appear to be the removed and the added line of a single
// diff hunk shown without +/- markers. As written, the second statement is unreachable; presumably only
// the second (which additionally asks groupBy whether it requires row keys) is meant to remain - confirm.
return delegateToBy;
return delegateToBy && groupBy.requiresRowKeys();
}

@Override
Expand All @@ -222,13 +233,16 @@ public void propagateInitialState(@NotNull final QueryTable resultTable, int sta
}

final Map<String, ColumnSource<?>> sourceColumns;
if (inputKeyColumns.length == 0) {
if (inputKeyColumns.length == 0 && formulaDepthSource == null && renames == null) {
// noinspection unchecked
sourceColumns = (Map<String, ColumnSource<?>>) groupBy.getInputResultColumns();
} else {
final Map<String, ColumnSource<?>> columnSourceMap = resultTable.getColumnSourceMap();
sourceColumns = new HashMap<>(groupBy.getInputResultColumns());
sourceColumns = new HashMap<>(groupBy.getInputResultColumns().size() + 1);
groupBy.getInputResultColumns()
.forEach((k, v) -> sourceColumns.put(renames == null ? k : renames.get(k), v));
Arrays.stream(inputKeyColumns).forEach(col -> sourceColumns.put(col, columnSourceMap.get(col)));
sourceColumns.put(AggregationProcessor.ROLLUP_FORMULA_DEPTH.name(), formulaDepthSource);
}
selectColumn.initInputs(resultTable.getRowSet(), sourceColumns);
formulaDataSource = selectColumn.getDataView();
Expand Down Expand Up @@ -263,8 +277,7 @@ public UnaryOperator<ModifiedColumnSet> initializeRefreshing(@NotNull final Quer
final String[] inputColumnNames = selectColumn.getColumns().toArray(String[]::new);
final ModifiedColumnSet inputMCS = inputTable.newModifiedColumnSet(inputColumnNames);
return inputToResultModifiedColumnSetFactory = input -> {
if (groupBy.getSomeKeyHasAddsOrRemoves() ||
(groupBy.getSomeKeyHasModifies() && input.containsAny(inputMCS))) {
if (groupBy.hasModifications(input.containsAny(inputMCS))) {
return resultMCS;
}
return ModifiedColumnSet.EMPTY;
Expand Down Expand Up @@ -408,4 +421,9 @@ public void close() {
private static long calculateContainingBlockLastKey(final long firstKey) {
    // Drop the remainder to land on the multiple of BLOCK_SIZE at the start of the block holding firstKey,
    // then step to the last key of that same block.
    final long blockFirstKey = firstKey - (firstKey % BLOCK_SIZE);
    return blockFirstKey + BLOCK_SIZE - 1;
}

/**
 * Replaces the values held in this operator's {@code groupBy} and {@code delegateToBy} fields.
 *
 * @param groupBy the group-by operator to store
 * @param delegateToBy the new value of the {@code delegateToBy} flag
 */
public void updateGroupBy(GroupByOperator groupBy, boolean delegateToBy) {
    this.delegateToBy = delegateToBy;
    this.groupBy = groupBy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl.by;

import io.deephaven.api.Pair;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.attributes.ChunkLengths;
import io.deephaven.chunk.attributes.ChunkPositions;
Expand All @@ -22,9 +23,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.*;
import java.util.function.UnaryOperator;

import static io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource.BLOCK_SIZE;
Expand All @@ -33,7 +32,7 @@
* An {@link IterativeChunkedAggregationOperator} used in the implementation of {@link Table#groupBy},
* {@link io.deephaven.api.agg.spec.AggSpecGroup}, and {@link io.deephaven.api.agg.Aggregation#AggGroup(String...)}.
*/
public final class GroupByChunkedOperator implements IterativeChunkedAggregationOperator {
public final class GroupByChunkedOperator implements GroupByOperator {

private final QueryTable inputTable;
private final boolean registeredWithHelper;
Expand All @@ -44,8 +43,8 @@ public final class GroupByChunkedOperator implements IterativeChunkedAggregation
private final ObjectArraySource<Object> addedBuilders;
private final ObjectArraySource<Object> removedBuilders;

private final String[] inputColumnNames;
private final Map<String, AggregateColumnSource<?, ?>> inputAggregatedColumns;
private final String[] inputColumnNamesForResults;
private final Map<String, AggregateColumnSource<?, ?>> resultAggregatedColumns;
private final ModifiedColumnSet aggregationInputsModifiedColumnSet;

Expand All @@ -56,34 +55,55 @@ public final class GroupByChunkedOperator implements IterativeChunkedAggregation
private boolean someKeyHasModifies;
private boolean initialized;

private MatchPair[] aggregatedColumnPairs;
private List<String> hiddenResults;

/**
* Construct a group-by operator for the given input table.
*
* @param inputTable the table we are aggregating
* @param registeredWithHelper true if we are registered with the helper (meaning we independently produce result
* columns), false otherwise. For a normal AggGroup this is true; for a group-by that is only part of an
* AggFormula this is false.
* @param exposeRowSetsAs the name of the column to expose the rowsets for each group as
* @param hiddenResults a list (possibly null or empty) of output column names that are not exposed to the helper
* @param aggregatedColumnPairs the list of input and output columns for this operation
*/
public GroupByChunkedOperator(
@NotNull final QueryTable inputTable,
final boolean registeredWithHelper,
@Nullable final String exposeRowSetsAs,
@Nullable final List<String> hiddenResults,
@NotNull final MatchPair... aggregatedColumnPairs) {
this.inputTable = inputTable;
this.registeredWithHelper = registeredWithHelper;
this.exposeRowSetsAs = exposeRowSetsAs;
this.hiddenResults = hiddenResults;
this.aggregatedColumnPairs = aggregatedColumnPairs;

live = inputTable.isRefreshing();
rowSets = new ObjectArraySource<>(WritableRowSet.class);
addedBuilders = new ObjectArraySource<>(Object.class);

inputAggregatedColumns = new LinkedHashMap<>(aggregatedColumnPairs.length);
resultAggregatedColumns = new LinkedHashMap<>(aggregatedColumnPairs.length);
final List<String> inputResultNameList = new ArrayList<>(aggregatedColumnPairs.length);
Arrays.stream(aggregatedColumnPairs).forEach(pair -> {
final AggregateColumnSource<?, ?> aggregateColumnSource =
AggregateColumnSource.make(inputTable.getColumnSource(pair.rightColumn()), rowSets);
inputAggregatedColumns.put(pair.rightColumn(), aggregateColumnSource);
resultAggregatedColumns.put(pair.leftColumn(), aggregateColumnSource);
if (hiddenResults == null || !hiddenResults.contains(pair.output().name())) {
resultAggregatedColumns.put(pair.output().name(), aggregateColumnSource);
inputResultNameList.add(pair.input().name());
}
});
inputColumnNamesForResults = inputResultNameList.toArray(String[]::new);

if (exposeRowSetsAs != null && resultAggregatedColumns.containsKey(exposeRowSetsAs)) {
throw new IllegalArgumentException(String.format(
"Exposing group RowSets as %s, but this conflicts with a requested grouped output column name",
exposeRowSetsAs));
}
inputColumnNames = MatchPair.getRightColumns(aggregatedColumnPairs);
final String[] inputColumnNames = MatchPair.getRightColumns(aggregatedColumnPairs);
if (live) {
aggregationInputsModifiedColumnSet = inputTable.newModifiedColumnSet(inputColumnNames);
removedBuilders = new ObjectArraySource<>(Object.class);
Expand Down Expand Up @@ -392,9 +412,7 @@ public void ensureCapacity(final long tableSize) {
return resultAggregatedColumns;
}

/**
 * Returns this operator's aggregated column sources keyed by input column name.
 *
 * @return the operator's own map (not a copy) from input column name to the matching output
 *         {@link ColumnSource}
 */
@Override
public Map<String, ? extends ColumnSource<?>> getInputResultColumns() {
    return this.inputAggregatedColumns;
}
Expand All @@ -416,6 +434,7 @@ public UnaryOperator<ModifiedColumnSet> initializeRefreshing(
initializeNewRowSetPreviousValues(resultTable.getRowSet());
return registeredWithHelper
? new InputToResultModifiedColumnSetFactory(resultTable,
inputColumnNamesForResults,
resultAggregatedColumns.keySet().toArray(String[]::new))
: null;
}
Expand All @@ -430,7 +449,7 @@ public UnaryOperator<ModifiedColumnSet> initializeRefreshing(
UnaryOperator<ModifiedColumnSet> makeInputToResultModifiedColumnSetFactory(
@NotNull final QueryTable resultTable,
@NotNull final String[] resultColumnNames) {
// NOTE(review): the two return statements below appear to be the removed and the added line of a single
// diff hunk shown without +/- markers. As written, the second statement is unreachable; presumably only
// the second (which also passes inputColumnNamesForResults to the factory) is meant to remain - confirm.
return new InputToResultModifiedColumnSetFactory(resultTable, resultColumnNames);
return new InputToResultModifiedColumnSetFactory(resultTable, inputColumnNamesForResults, resultColumnNames);
}

private class InputToResultModifiedColumnSetFactory implements UnaryOperator<ModifiedColumnSet> {
Expand All @@ -441,6 +460,7 @@ private class InputToResultModifiedColumnSetFactory implements UnaryOperator<Mod

private InputToResultModifiedColumnSetFactory(
@NotNull final QueryTable resultTable,
@NotNull final String[] inputColumnNames,
@NotNull final String[] resultAggregatedColumnNames) {
updateModifiedColumnSet = new ModifiedColumnSet(resultTable.getModifiedColumnSetForUpdates());

Expand Down Expand Up @@ -635,4 +655,86 @@ boolean getSomeKeyHasAddsOrRemoves() {
boolean getSomeKeyHasModifies() {
    // Plain accessor for the someKeyHasModifies flag.
    return this.someKeyHasModifies;
}

/**
 * Reports whether this operator has changes to propagate.
 *
 * @param columnsModified whether any of the relevant input columns were modified
 * @return {@code true} if some key had adds or removes, or if some key had modifies and
 *         {@code columnsModified} is {@code true}
 */
@Override
public boolean hasModifications(boolean columnsModified) {
    if (getSomeKeyHasAddsOrRemoves()) {
        return true;
    }
    return getSomeKeyHasModifies() && columnsModified;
}

/**
 * @return the column name under which the RowSet of each group is exposed, as supplied at construction;
 *         may be {@code null}
 */
public String getExposedRowSetsAs() {
    return this.exposeRowSetsAs;
}

/**
 * @return the input/output column pairs supplied at construction; this is the operator's own array, not a
 *         copy
 */
public MatchPair[] getAggregatedColumnPairs() {
    return this.aggregatedColumnPairs;
}

/**
 * @return the list of hidden result columns supplied at construction; this is the operator's own list, not
 *         a copy, and may be {@code null}
 */
public List<String> getHiddenResults() {
    return this.hiddenResults;
}

/**
 * An {@link IterativeChunkedAggregationOperator} that only publishes a pre-built map of result columns. All of
 * its chunk-processing callbacks are empty: the bucketed variants do nothing and the singleton variants return
 * {@code false}.
 */
private class ResultExtractor implements IterativeChunkedAggregationOperator {
    /** The columns reported by {@link #getResultColumns()}. */
    final Map<String, ? extends ColumnSource<?>> resultColumns;
    /** The input column names handed to the modified-column-set factory in {@link #initializeRefreshing}. */
    final String[] inputColumnNames;

    private ResultExtractor(
            Map<String, ? extends ColumnSource<?>> resultColumns,
            String[] inputColumnNames) {
        this.inputColumnNames = inputColumnNames;
        this.resultColumns = resultColumns;
    }

    @Override
    public Map<String, ? extends ColumnSource<?>> getResultColumns() {
        return this.resultColumns;
    }

    // Bucketed callbacks: intentionally empty.

    @Override
    public void addChunk(BucketedContext context, Chunk<? extends Values> values,
            LongChunk<? extends RowKeys> inputRowKeys, IntChunk<RowKeys> destinations,
            IntChunk<ChunkPositions> startPositions, IntChunk<ChunkLengths> length,
            WritableBooleanChunk<Values> stateModified) {
        // nothing to do
    }

    @Override
    public void removeChunk(BucketedContext context, Chunk<? extends Values> values,
            LongChunk<? extends RowKeys> inputRowKeys, IntChunk<RowKeys> destinations,
            IntChunk<ChunkPositions> startPositions, IntChunk<ChunkLengths> length,
            WritableBooleanChunk<Values> stateModified) {
        // nothing to do
    }

    // Singleton callbacks: always return false.

    @Override
    public boolean addChunk(SingletonContext context, int chunkSize, Chunk<? extends Values> values,
            LongChunk<? extends RowKeys> inputRowKeys, long destination) {
        return false;
    }

    @Override
    public boolean removeChunk(SingletonContext context, int chunkSize, Chunk<? extends Values> values,
            LongChunk<? extends RowKeys> inputRowKeys, long destination) {
        return false;
    }

    @Override
    public void ensureCapacity(long tableSize) {
        // nothing to do
    }

    @Override
    public void startTrackingPrevValues() {
        // nothing to do
    }

    @Override
    public UnaryOperator<ModifiedColumnSet> initializeRefreshing(@NotNull QueryTable resultTable,
            @NotNull LivenessReferent aggregationUpdateListener) {
        final String[] resultColumnNames = resultColumns.keySet().toArray(String[]::new);
        return new InputToResultModifiedColumnSetFactory(resultTable, inputColumnNames, resultColumnNames);
    }
}

/**
 * Builds an operator that exposes a selection of this operator's aggregated columns under new names.
 *
 * @param resultPairs for each requested result, the input column name to look up in this operator's
 *        input-keyed aggregated columns and the output name to publish it under
 * @return a {@code ResultExtractor} over the selected columns, in the iteration order of {@code resultPairs}
 */
@NotNull
public IterativeChunkedAggregationOperator resultExtractor(List<Pair> resultPairs) {
    final int pairCount = resultPairs.size();
    final String[] extractedInputNames = new String[pairCount];
    final Map<String, ColumnSource<?>> extractedColumns = new LinkedHashMap<>(pairCount);
    int nextIndex = 0;
    for (final Pair resultPair : resultPairs) {
        final String inputName = resultPair.input().name();
        extractedInputNames[nextIndex++] = inputName;
        extractedColumns.put(resultPair.output().name(), inputAggregatedColumns.get(inputName));
    }
    return new ResultExtractor(extractedColumns, extractedInputNames);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.by;

import io.deephaven.engine.table.ColumnSource;

import java.util.Map;

/**
 * An {@link IterativeChunkedAggregationOperator} that additionally exposes its output columns keyed by input
 * column name and can report whether it has changes to propagate.
 */
public interface GroupByOperator extends IterativeChunkedAggregationOperator {

    /**
     * Look up this operator's output columns by the name of the input column each was produced from.
     *
     * @return a map from input column name to the corresponding output {@link ColumnSource}
     */
    Map<String, ? extends ColumnSource<?>> getInputResultColumns();

    /**
     * Decide whether changes should be propagated, given whether the input columns were modified.
     *
     * @param columnsModified whether any of the input columns have been modified (as per the MCS)
     * @return {@code true} if this operator has modified its output (e.g., because of additions or
     *         modifications)
     */
    boolean hasModifications(boolean columnsModified);
}
Loading
Loading