Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.expressions;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Set;
import org.apache.iceberg.StructLike;
Expand Down Expand Up @@ -156,5 +157,17 @@ public <T> Boolean startsWith(Bound<T> valueExpr, Literal<T> lit) {
public <T> Boolean notStartsWith(Bound<T> valueExpr, Literal<T> lit) {
return !startsWith(valueExpr, lit);
}

@Override
public <T> Boolean stIntersects(Bound<T> valueExpr, Literal<ByteBuffer> literal) {
throw new UnsupportedOperationException(
"Evaluation of stIntersects against geometry/geography value is not implemented.");
}

@Override
public <T> Boolean stDisjoint(Bound<T> valueExpr, Literal<ByteBuffer> literal) {
throw new UnsupportedOperationException(
"Evaluation of stDisjoint against geometry/geography value is not implemented.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ enum Operation {
OR,
STARTS_WITH,
NOT_STARTS_WITH,
ST_INTERSECTS,
ST_DISJOINT,
COUNT,
COUNT_NULL,
COUNT_STAR,
Expand Down Expand Up @@ -91,6 +93,10 @@ public Operation negate() {
return Operation.NOT_STARTS_WITH;
case NOT_STARTS_WITH:
return Operation.STARTS_WITH;
case ST_INTERSECTS:
return Operation.ST_DISJOINT;
case ST_DISJOINT:
return Operation.ST_INTERSECTS;
default:
throw new IllegalArgumentException("No negation for operation: " + this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Literals.BoundingBoxLiteral;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.transforms.Transforms;
Expand Down Expand Up @@ -341,6 +342,8 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
case NOT_EQ:
case STARTS_WITH:
case NOT_STARTS_WITH:
case ST_INTERSECTS:
case ST_DISJOINT:
return new UnboundPredicate<>(
pred.op(), pred.term(), (T) sanitize(pred.literal(), now, today));
case IN:
Expand Down Expand Up @@ -441,6 +444,10 @@ public <T> String predicate(BoundPredicate<T> pred) {
return term + " STARTS WITH " + value((BoundLiteralPredicate<?>) pred);
case NOT_STARTS_WITH:
return term + " NOT STARTS WITH " + value((BoundLiteralPredicate<?>) pred);
case ST_INTERSECTS:
return "st_intersects(" + term + ", " + value((BoundLiteralPredicate<?>) pred) + ")";
case ST_DISJOINT:
return "st_disjoint(" + term + ", " + value((BoundLiteralPredicate<?>) pred) + ")";
default:
throw new UnsupportedOperationException(
"Cannot sanitize unsupported predicate type: " + pred.op());
Expand Down Expand Up @@ -493,6 +500,10 @@ public <T> String predicate(UnboundPredicate<T> pred) {
return term + " STARTS WITH " + sanitize(pred.literal(), nowMicros, today);
case NOT_STARTS_WITH:
return term + " NOT STARTS WITH " + sanitize(pred.literal(), nowMicros, today);
case ST_INTERSECTS:
return "st_intersects(" + term + ", " + sanitize(pred.literal(), nowMicros, today) + ")";
case ST_DISJOINT:
return "st_disjoint(" + term + ", " + sanitize(pred.literal(), nowMicros, today) + ")";
default:
throw new UnsupportedOperationException(
"Cannot sanitize unsupported predicate type: " + pred.op());
Expand Down Expand Up @@ -552,6 +563,9 @@ private static String sanitize(Type type, Object value, long now, int today) {
case BINARY:
// for boolean, uuid, decimal, fixed, unknown, and binary, match the string result
return sanitizeSimpleString(value.toString());
case GEOMETRY:
case GEOGRAPHY:
return "(bounding-box)";
}
throw new UnsupportedOperationException(
String.format("Cannot sanitize value for unsupported type %s: %s", type, value));
Expand Down Expand Up @@ -579,6 +593,8 @@ private static String sanitize(Literal<?> literal, long now, int today) {
return sanitizeNumber(((Literals.DoubleLiteral) literal).value(), "float");
} else if (literal instanceof Literals.VariantLiteral) {
return sanitizeVariant(((Literals.VariantLiteral) literal).value(), now, today);
} else if (literal instanceof BoundingBoxLiteral) {
return "(bounding-box)";
} else {
// for uuid, decimal, fixed and binary, match the string result
return sanitizeSimpleString(literal.value().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.expressions;

import java.nio.ByteBuffer;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -126,6 +127,16 @@ public <T> R notStartsWith(BoundReference<T> ref, Literal<T> lit) {
"notStartsWith expression is not supported by the visitor");
}

public <T> R stIntersects(BoundReference<T> ref, Literal<ByteBuffer> lit) {
throw new UnsupportedOperationException(
"stIntersects expression is not supported by the visitor");
}

public <T> R stDisjoint(BoundReference<T> ref, Literal<ByteBuffer> lit) {
throw new UnsupportedOperationException(
"stDisjoint expression is not supported by the visitor");
}

/**
* Handle a non-reference value in this visitor.
*
Expand All @@ -141,6 +152,7 @@ public <T> R handleNonReference(Bound<T> term) {
throw new ValidationException("Visitor %s does not support non-reference: %s", this, term);
}

@SuppressWarnings("unchecked")
@Override
public <T> R predicate(BoundPredicate<T> pred) {
if (!(pred.term() instanceof BoundReference)) {
Expand All @@ -166,6 +178,12 @@ public <T> R predicate(BoundPredicate<T> pred) {
return startsWith((BoundReference<T>) pred.term(), literalPred.literal());
case NOT_STARTS_WITH:
return notStartsWith((BoundReference<T>) pred.term(), literalPred.literal());
case ST_INTERSECTS:
return stIntersects(
(BoundReference<T>) pred.term(), (Literal<ByteBuffer>) literalPred.literal());
case ST_DISJOINT:
return stDisjoint(
(BoundReference<T>) pred.term(), (Literal<ByteBuffer>) literalPred.literal());
default:
throw new IllegalStateException(
"Invalid operation for BoundLiteralPredicate: " + pred.op());
Expand Down Expand Up @@ -266,6 +284,14 @@ public <T> R notStartsWith(Bound<T> expr, Literal<T> lit) {
throw new UnsupportedOperationException("Unsupported operation.");
}

public <T> R stIntersects(Bound<T> term, Literal<ByteBuffer> literal) {
throw new UnsupportedOperationException("ST_INTERSECTS is not supported by the visitor");
}

public <T> R stDisjoint(Bound<T> term, Literal<ByteBuffer> literal) {
throw new UnsupportedOperationException("ST_DISJOINT is not supported by the visitor");
}

@Override
public <T> R predicate(BoundPredicate<T> pred) {
if (pred.isLiteralPredicate()) {
Expand All @@ -287,6 +313,10 @@ public <T> R predicate(BoundPredicate<T> pred) {
return startsWith(pred.term(), literalPred.literal());
case NOT_STARTS_WITH:
return notStartsWith(pred.term(), literalPred.literal());
case ST_INTERSECTS:
return stIntersects(pred.term(), (Literal<ByteBuffer>) literalPred.literal());
case ST_DISJOINT:
return stDisjoint(pred.term(), (Literal<ByteBuffer>) literalPred.literal());
default:
throw new IllegalStateException(
"Invalid operation for BoundLiteralPredicate: " + pred.op());
Expand Down Expand Up @@ -318,7 +348,6 @@ public <T> R predicate(BoundPredicate<T> pred) {
"Invalid operation for BoundSetPredicate: " + pred.op());
}
}

throw new IllegalStateException("Unsupported bound predicate: " + pred.getClass().getName());
}

Expand Down Expand Up @@ -465,6 +494,10 @@ public <T> R predicate(BoundPredicate<T> pred) {
return startsWith(pred.term(), literalPred.literal());
case NOT_STARTS_WITH:
return notStartsWith(pred.term(), literalPred.literal());
case ST_INTERSECTS:
return stIntersects(pred.term(), (Literal<ByteBuffer>) literalPred.literal());
case ST_DISJOINT:
return stDisjoint(pred.term(), (Literal<ByteBuffer>) literalPred.literal());
default:
throw new IllegalStateException(
"Invalid operation for BoundLiteralPredicate: " + pred.op());
Expand Down Expand Up @@ -555,6 +588,14 @@ public <T> R startsWith(BoundTerm<T> term, Literal<T> lit) {
public <T> R notStartsWith(BoundTerm<T> term, Literal<T> lit) {
return null;
}

public <T> R stIntersects(BoundTerm<T> term, Literal<ByteBuffer> lit) {
return null;
}

public <T> R stDisjoint(BoundTerm<T> term, Literal<ByteBuffer> lit) {
return null;
}
}

/**
Expand Down
30 changes: 30 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/Expressions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.iceberg.expressions;

import java.nio.ByteBuffer;
import java.util.stream.Stream;
import org.apache.iceberg.expressions.Expression.Operation;
import org.apache.iceberg.geospatial.BoundingBox;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.transforms.Transform;
Expand Down Expand Up @@ -202,6 +204,24 @@ public static UnboundPredicate<String> notStartsWith(UnboundTerm<String> expr, S
return new UnboundPredicate<>(Expression.Operation.NOT_STARTS_WITH, expr, value);
}

public static UnboundPredicate<ByteBuffer> stIntersects(String name, BoundingBox value) {
return geospatialPredicate(Operation.ST_INTERSECTS, name, value);
}

public static UnboundPredicate<ByteBuffer> stIntersects(
UnboundTerm<ByteBuffer> expr, BoundingBox value) {
return geospatialPredicate(Operation.ST_INTERSECTS, expr, value);
}

public static UnboundPredicate<ByteBuffer> stDisjoint(String name, BoundingBox value) {
return geospatialPredicate(Operation.ST_DISJOINT, name, value);
}

public static UnboundPredicate<ByteBuffer> stDisjoint(
UnboundTerm<ByteBuffer> expr, BoundingBox value) {
return geospatialPredicate(Operation.ST_DISJOINT, expr, value);
}

public static <T> UnboundPredicate<T> in(String name, T... values) {
return predicate(Operation.IN, name, Lists.newArrayList(values));
}
Expand Down Expand Up @@ -280,6 +300,16 @@ public static <T> UnboundPredicate<T> predicate(Operation op, UnboundTerm<T> exp
return new UnboundPredicate<>(op, expr);
}

public static UnboundPredicate<ByteBuffer> geospatialPredicate(
Operation op, String name, BoundingBox value) {
return geospatialPredicate(op, ref(name), value);
}

public static UnboundPredicate<ByteBuffer> geospatialPredicate(
Operation op, UnboundTerm<ByteBuffer> expr, BoundingBox value) {
return new UnboundPredicate<>(op, expr, Literal.of(value));
}

public static True alwaysTrue() {
return True.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.geospatial.BoundingBox;
import org.apache.iceberg.geospatial.GeospatialPredicateEvaluators;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
Expand Down Expand Up @@ -471,6 +473,35 @@ public <T> Boolean notStartsWith(Bound<T> term, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean stIntersects(Bound<T> term, Literal<ByteBuffer> lit) {
T lower = lowerBound(term);
T upper = upperBound(term);

if (lower == null || upper == null) {
return ROWS_MIGHT_MATCH;
}

if (lit.value() != null && lower instanceof ByteBuffer && upper instanceof ByteBuffer) {
BoundingBox dataBox = BoundingBox.fromByteBuffers((ByteBuffer) lower, (ByteBuffer) upper);
BoundingBox queryBox = BoundingBox.fromByteBuffer(lit.value());

// If the data box and query box doesn't intersect, no records can match
GeospatialPredicateEvaluators.GeospatialPredicateEvaluator evaluator =
GeospatialPredicateEvaluators.create(term.ref().type());
if (!evaluator.intersects(dataBox, queryBox)) {
return ROWS_CANNOT_MATCH;
}
}

return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean stDisjoint(Bound<T> term, Literal<ByteBuffer> lit) {
return ROWS_MIGHT_MATCH;
}

private boolean mayContainNull(Integer id) {
return nullCounts == null || !nullCounts.containsKey(id) || nullCounts.get(id) != 0;
}
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/Literal.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.UUID;
import org.apache.iceberg.geospatial.BoundingBox;
import org.apache.iceberg.types.Type;

/**
Expand Down Expand Up @@ -71,6 +72,10 @@ static Literal<BigDecimal> of(BigDecimal value) {
return new Literals.DecimalLiteral(value);
}

static Literal<ByteBuffer> of(BoundingBox value) {
return new Literals.BoundingBoxLiteral(value);
}

/** Returns the value wrapped by this literal. */
T value();

Expand Down
44 changes: 44 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/Literals.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Comparator;
import java.util.Objects;
import java.util.UUID;
import org.apache.iceberg.geospatial.BoundingBox;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
import org.apache.iceberg.types.Comparators;
Expand Down Expand Up @@ -85,6 +86,8 @@ static <T> Literal<T> from(T value) {
return (Literal<T>) new Literals.DecimalLiteral((BigDecimal) value);
} else if (value instanceof Variant) {
return (Literal<T>) new Literals.VariantLiteral((Variant) value);
} else if (value instanceof BoundingBox) {
return (Literal<T>) new Literals.BoundingBoxLiteral((BoundingBox) value);
}

throw new IllegalArgumentException(
Expand Down Expand Up @@ -719,4 +722,45 @@ public String toString() {
return "X'" + BaseEncoding.base16().encode(bytes) + "'";
}
}

static class BoundingBoxLiteral extends BaseLiteral<ByteBuffer> {
private static final Comparator<ByteBuffer> CMP =
Comparators.<ByteBuffer>nullsFirst().thenComparing(Comparators.unsignedBytes());

BoundingBoxLiteral(BoundingBox value) {
super(value.toByteBuffer());
}

BoundingBoxLiteral(ByteBuffer value) {
super(value);
}

@Override
protected Type.TypeID typeId() {
return null;
}

@Override
public <T> Literal<T> to(Type type) {
if (type.typeId() != Type.TypeID.GEOMETRY && type.typeId() != Type.TypeID.GEOGRAPHY) {
return null;
}

return (Literal<T>) this;
}

@Override
public Comparator<ByteBuffer> comparator() {
return CMP;
}

Object writeReplace() throws ObjectStreamException {
return new SerializationProxies.BoundingBoxLiteralProxy(value());
}

@Override
public String toString() {
return String.valueOf(value());
}
}
}
Loading