HiveRelJsonReader.java (new file)
@@ -0,0 +1,68 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.optimizer.calcite;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.externalize.RelJsonReader;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveSqlOperatorTable;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;

import static org.apache.calcite.sql.util.SqlOperatorTables.chain;

/**
 * Reads a JSON plan and converts it to a Hive relational expression (RelNode).
 */
@InterfaceStability.Evolving
public class HiveRelJsonReader {
  private final RelOptCluster cluster;

  public HiveRelJsonReader(RelOptCluster cluster) {
    this.cluster = Objects.requireNonNull(cluster);
  }

  public RelNode readFile(Path path) throws IOException {
    return readJson(Files.readString(path, Charset.defaultCharset()));
  }

  public RelNode readJson(String json) throws IOException {
    HiveConf conf = cluster.getPlanner().getContext().unwrap(HiveConf.class);
    if (conf == null) {
      conf = new HiveConf();
    }
    RelOptSchema schema = HiveRelJsonSchemaReader.read(json, conf, cluster.getTypeFactory());
    RelJsonReader reader = new RelJsonReader(
        cluster,
        schema,
        null,
        t -> t.withOperatorTable(chain(new HiveSqlOperatorTable(), SqlStdOperatorTable.instance())));
    // At the moment we assume that the JSON plan always has a top-level field "CBOPlan"
    // that contains the actual plan that can be handled by RelJsonReader.
    return reader.read(new ObjectMapper().readTree(json).get("CBOPlan").toString());
  }
}
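
Note: a minimal usage sketch of the reader above, assuming a RelOptCluster obtained from the existing CBO machinery and a plan file that carries the "CBOPlan" wrapper mentioned in the comment; the helper class and its name are illustrative, not part of this patch.

import java.io.IOException;
import java.nio.file.Path;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelJsonReader;

/** Hypothetical caller: round-trips a serialized CBO plan back into a RelNode tree. */
class PlanReloadSketch {
  RelNode reload(RelOptCluster cluster, Path planFile) throws IOException {
    // The file must contain a JSON object with a top-level "CBOPlan" field,
    // which is the assumption readJson makes before delegating to RelJsonReader.
    return new HiveRelJsonReader(cluster).readFile(planFile);
  }
}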
HiveRelJsonSchemaReader.java (new file)
@@ -0,0 +1,148 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.optimizer.calcite;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.externalize.RelJson;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.TableConstraintsInfo;
import org.apache.hadoop.hive.ql.parse.QueryTables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Reads a JSON plan and converts it to a RelOptSchema by finding all tables
 * that are referenced in the plan. The schema is created exclusively from the
 * JSON input, so any information that is not there will not be available.
 */
@InterfaceStability.Evolving
public final class HiveRelJsonSchemaReader {

  private HiveRelJsonSchemaReader() {
    throw new IllegalStateException("Utility class");
  }

  /**
   * Reads the schema from the JSON input using the specified configuration and type factory.
   */
  public static RelOptSchema read(String jsonInput, HiveConf conf, RelDataTypeFactory typeFactory) throws IOException {
    JsonNode node = new ObjectMapper().readTree(jsonInput);
    Map<List<String>, TableInfo> tables = new HashMap<>();
    for (JsonNode scan : node.findParents("table")) {
      List<String> names = new ArrayList<>();
      for (JsonNode n : scan.get("table")) {
        names.add(n.asText());
      }
      RelDataType type = readType(typeFactory, scan.get("rowType"));
      JsonNode rowNode = scan.get("rowCount");
      double rowCount = rowNode != null ? rowNode.asDouble() : 100.0;
      tables.put(names, new TableInfo(type, rowCount));
    }
    return new MapRelOptSchema(conf, typeFactory, tables);
  }

  private static RelDataType readType(RelDataTypeFactory typeFactory, JsonNode typeNode) throws IOException {
    ObjectMapper typeMapper = new ObjectMapper();
    Object value;
    if (typeNode.getNodeType() == JsonNodeType.OBJECT) {
      value = typeMapper.treeToValue(typeNode, Map.class);
    } else if (typeNode.getNodeType() == JsonNodeType.ARRAY) {
      value = typeMapper.treeToValue(typeNode, List.class);
    } else {
      throw new IllegalStateException();
    }
    return RelJson.create().toType(typeFactory, value);
  }

  private static class TableInfo {
    private final RelDataType rowType;
    private final double rowCount;

    TableInfo(RelDataType rowType, double rowCount) {
      this.rowType = rowType;
      this.rowCount = rowCount;
    }
  }

  private static final class MapRelOptSchema implements RelOptSchema {
    private final HiveConf conf;
    private final RelDataTypeFactory typeFactory;
    private final Map<List<String>, TableInfo> tables;

    MapRelOptSchema(HiveConf conf, RelDataTypeFactory typeFactory, Map<List<String>, TableInfo> tables) {
      this.conf = conf;
      this.typeFactory = typeFactory;
      this.tables = tables;
    }

    @Override
    public RelOptTable getTableForMember(List<String> names) {
      TableInfo tableInfo = tables.get(names);
      if (tableInfo == null) {
        return null;
      }
      org.apache.hadoop.hive.metastore.api.Table mTable = new org.apache.hadoop.hive.metastore.api.Table();
      mTable.setDbName(names.get(0));
      mTable.setTableName(names.get(1));
      Table metadataTable = new Table(mTable);
      // Set the constraints info to empty since we cannot extract anything from the JSON,
      // and we want to avoid lookups in the metastore that would fail anyway,
      // given that the tables are not expected to exist.
      metadataTable.setTableConstraintsInfo(new TableConstraintsInfo());
      RelOptHiveTable optTable = new RelOptHiveTable(
          this,
          typeFactory,
          names,
          tableInfo.rowType,
          metadataTable,
          new ArrayList<>(),
          new ArrayList<>(),
          new ArrayList<>(),
          conf,
          new QueryTables(true),
          new HashMap<>(),
          new HashMap<>(),
          new AtomicInteger());
      optTable.setRowCount(tableInfo.rowCount);
      return optTable;
    }

    @Override
    public RelDataTypeFactory getTypeFactory() {
      return typeFactory;
    }

    @Override
    public void registerRules(RelOptPlanner planner) {
      // No need to register any rules in the planner
    }
  }
}
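
Note: a sketch of the smallest table-scan fragment the schema reader can consume. The "table", "rowType", and "rowCount" field names come from the code above; the "rels" envelope and the rowType encoding follow Calcite's RelJson conventions and should be treated as illustrative.

import org.apache.calcite.plan.RelOptSchema;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelJsonSchemaReader;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeFactory;

/** Hypothetical fragment: one scan of default.t1 with a single INTEGER column. */
class SchemaFromJsonSketch {
  static RelOptSchema example() throws Exception {
    // When "rowCount" is absent the reader falls back to 100.0.
    String json = "{\"rels\":[{"
        + "\"table\":[\"default\",\"t1\"],"
        + "\"rowType\":{\"fields\":[{\"type\":\"INTEGER\",\"nullable\":true,\"name\":\"a\"}]},"
        + "\"rowCount\":42.0}]}";
    return HiveRelJsonSchemaReader.read(json, new HiveConf(), new HiveTypeFactory());
  }
}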
HiveRexJsonBuilder.java (new file)
@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.optimizer.calcite;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.hadoop.hive.ql.parse.type.RexNodeExprFactory;

/**
 * Factory for row expressions created from JSON files.
 */
class HiveRexJsonBuilder extends RexBuilder {
  HiveRexJsonBuilder() {
    super(new HiveTypeFactory());
  }

  @Override
  public RexNode makeLiteral(Object value, RelDataType type, boolean allowCast, boolean trim) {
    // VARCHAR will always come back as CHAR if we don't allow a cast
    allowCast = SqlTypeName.VARCHAR.equals(type.getSqlTypeName()) || allowCast;
    if (SqlTypeName.CHAR_TYPES.contains(type.getSqlTypeName())) {
      value = RexNodeExprFactory.makeHiveUnicodeString((String) value);
    }
    return super.makeLiteral(value, type, allowCast, trim);
  }
}
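
Note: a sketch of the behavior the override compensates for, assuming RexBuilder's usual literal handling where string literals come back typed as CHAR unless a cast is allowed. The class below is illustrative and assumes the same package, since HiveRexJsonBuilder is package-private.

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;

class VarcharLiteralSketch {
  static RexNode varcharLiteral() {
    RexBuilder builder = new HiveRexJsonBuilder();
    RelDataType varchar10 = builder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR, 10);
    // Even though the caller passes allowCast=false, the override forces a cast
    // for VARCHAR so the literal keeps its type instead of collapsing to CHAR.
    return builder.makeLiteral("abc", varchar10, false, false);
  }
}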
HiveTypeFactory.java
@@ -20,8 +20,10 @@
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ConversionUtil;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.nio.charset.Charset;
import java.util.List;

public class HiveTypeFactory extends JavaTypeFactoryImpl {
@@ -51,4 +53,9 @@ public HiveTypeFactory() {
    }
    return null;
  }

  @Override
  public Charset getDefaultCharset() {
    return Charset.forName(ConversionUtil.NATIVE_UTF16_CHARSET_NAME);
  }
}
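
Note: a quick sketch of what the override changes, on the assumption that Calcite's default charset would otherwise be ISO-8859-1 while ConversionUtil.NATIVE_UTF16_CHARSET_NAME resolves to a native UTF-16 charset; this pairs with the UTF-16 literals built by RexNodeExprFactory.makeHiveUnicodeString in HiveRexJsonBuilder.

import java.nio.charset.Charset;

import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeFactory;

class DefaultCharsetSketch {
  public static void main(String[] args) {
    Charset cs = new HiveTypeFactory().getDefaultCharset();
    // Expected to print a UTF-16 charset (e.g. UTF-16LE), so string types created
    // by this factory can carry the unicode literals read from JSON plans.
    System.out.println(cs);
  }
}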
HiveAggregate.java
@@ -24,6 +24,7 @@
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.Aggregate;
@@ -32,12 +33,15 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelShuttle;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;

import com.google.common.collect.Sets;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter;

public class HiveAggregate extends Aggregate implements HiveRelNode {

@@ -51,6 +55,30 @@ public HiveAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
        groupSet, groupSets, aggCalls);
  }

  public HiveAggregate(RelInput input) {
    this(
        input.getCluster(),
        input.getTraitSet(),
        input.getInput(),
        input.getBitSet("group"),
        input.getBitSetList("groups"),
        input.getAggregateCalls("aggs")
            .stream()
            .map(call -> HiveAggregate.replaceAggFunction(input.getInput().getRowType(), call))
            .toList());
  }

  private static AggregateCall replaceAggFunction(RelDataType rowType, AggregateCall aggCall) {
    // Fix the return type of the agg function
    SqlAggFunction aggFunction = SqlFunctionConverter.getCalciteAggFn(
        aggCall.getAggregation().getName(),
        SqlTypeUtil.projectTypes(rowType, aggCall.getArgList()),
        aggCall.getType());
    return AggregateCall.create(aggFunction, aggCall.isDistinct(), aggCall.isApproximate(), aggCall.ignoreNulls(),
        aggCall.getArgList(), aggCall.filterArg, aggCall.distinctKeys, aggCall.getCollation(), aggCall.getType(),
        aggCall.getName());
  }

  @Override
  public Aggregate copy(RelTraitSet traitSet, RelNode input,
      ImmutableBitSet groupSet,
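
Note: the RelInput constructors added in this patch exist because, per Calcite's externalization contract, RelJsonReader instantiates each operator reflectively through a public constructor taking a RelInput; the aggregate calls are additionally rebuilt through SqlFunctionConverter since the serialized form carries only function names and types, not Hive's resolved return types. A tiny reflective check of the discoverable constructor (import path assumed from Hive's source layout):

import java.lang.reflect.Constructor;

import org.apache.calcite.rel.RelInput;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;

class RelInputConstructorCheck {
  public static void main(String[] args) throws NoSuchMethodException {
    // RelJsonReader needs a public HiveAggregate(RelInput) to deserialize the node.
    Constructor<HiveAggregate> ctor = HiveAggregate.class.getConstructor(RelInput.class);
    System.out.println(ctor);
  }
}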
HiveAntiJoin.java
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Sets;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
@@ -68,6 +69,15 @@ protected HiveAntiJoin(RelOptCluster cluster,
        this.getCondition(), joinKeyExprs, filterNulls, null);
  }

  public HiveAntiJoin(RelInput input) throws CalciteSemanticException {
    this(
        input.getCluster(),
        input.getTraitSet(),
        input.getInputs().get(0),
        input.getInputs().get(1),
        input.getExpression("condition"));
  }

  public RexNode getJoinFilter() {
    return joinFilter;
  }
HiveFilter.java
@@ -19,6 +19,7 @@

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.Filter;
@@ -71,6 +72,10 @@ public HiveFilter(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexN
    this.correlationInfos = new CorrelationInfoSupplier(getCondition());
  }

  public HiveFilter(RelInput input) {
    this(input.getCluster(), input.getTraitSet(), input.getInput(), input.getExpression("condition"));
  }

  @Override
  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
    assert traitSet.containsIfApplicable(HiveRelNode.CONVENTION);