Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.spark.sql.catalyst.util;

import org.apache.spark.sql.catalyst.util.geo.GeometryModel;
import org.apache.spark.sql.catalyst.util.geo.WkbReader;
import org.apache.spark.sql.catalyst.util.geo.WkbWriter;
import org.apache.spark.unsafe.types.GeographyVal;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -77,6 +80,9 @@ public Geography copy() {

// Returns a Geography object with the specified SRID value by parsing the input WKB.
public static Geography fromWkb(byte[] wkb, int srid) {
WkbReader reader = new WkbReader(true);
reader.read(wkb); // Validate WKB with geography coordinate bounds.

byte[] bytes = new byte[HEADER_SIZE + wkb.length];
ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
Expand Down Expand Up @@ -118,19 +124,20 @@ public static Geography fromEwkt(byte[] ewkt) {

@Override
public byte[] toWkb() {
// This method returns only the WKB portion of the in-memory Geography representation.
// Note that the header is skipped, and that the WKB is returned as-is (little-endian).
return Arrays.copyOfRange(getBytes(), WKB_OFFSET, getBytes().length);
return toWkbInternal(DEFAULT_ENDIANNESS);
}

@Override
public byte[] toWkb(ByteOrder endianness) {
// The default endianness is Little Endian (NDR).
if (endianness == DEFAULT_ENDIANNESS) {
return toWkb();
} else {
throw new UnsupportedOperationException("Geography WKB endianness is not yet supported.");
}
return toWkbInternal(endianness);
}

/**
 * Re-encodes the WKB portion of this Geography in the requested byte order.
 * The bytes following the header are parsed by a geography-mode {@code WkbReader}
 * and the resulting model is serialized again by a {@code WkbWriter}.
 *
 * @param endianness byte order to use for the produced WKB
 * @return the WKB bytes written by the writer
 */
private byte[] toWkbInternal(ByteOrder endianness) {
  // Skip the header: the WKB payload starts at WKB_OFFSET and runs to the end.
  byte[] raw = getBytes();
  byte[] wkbPayload = Arrays.copyOfRange(raw, WKB_OFFSET, raw.length);
  GeometryModel parsed = new WkbReader(true).read(wkbPayload);
  return new WkbWriter().write(parsed, endianness);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,60 @@
import java.util.List;

/**
* Reader for parsing Well-Known Binary (WKB) format geometries.
* Reader for parsing Well-Known Binary (WKB) format geometries and geographies.
* This class implements the OGC Simple Features specification for WKB parsing.
* For geographies, coordinate bounds validation is enforced:
* - X (longitude) must be between -180 and 180 (inclusive),
* - Y (latitude) must be between -90 and 90 (inclusive).
* This class is not thread-safe. Create a new instance for each thread.
* This class should be catalyst-internal.
*/
public class WkbReader {
private ByteBuffer buffer;
private final int validationLevel;
private final boolean isGeography;
private byte[] currentWkb;

// Geography coordinate bounds.
private static final double MIN_LONGITUDE = -180.0;
private static final double MAX_LONGITUDE = 180.0;
private static final double MIN_LATITUDE = -90.0;
private static final double MAX_LATITUDE = 90.0;
// Default WKB reader settings.
private static final int DEFAULT_VALIDATION_LEVEL = 1; // basic validation

/**
* Constructor for WkbReader with default validation level (1 = basic validation).
* Constructor for WkbReader with default validation level (1 = basic validation)
* and geometry mode (no geography coordinate bounds checking).
*/
public WkbReader() {
this(1);
this(DEFAULT_VALIDATION_LEVEL, false);
}

/**
* Constructor for WkbReader with specified validation level.
* Constructor for WkbReader with specified validation level and geometry mode.
* @param validationLevel validation level (0 = no validation, 1 = basic validation)
*/
public WkbReader(int validationLevel) {
// Delegate to the two-argument constructor with geography mode disabled (isGeography = false).
this(validationLevel, false);
}

/**
* Creates a WkbReader that uses the default validation level (DEFAULT_VALIDATION_LEVEL)
* and the given geography mode.
* @param isGeography if true, the reader runs in geography mode, where longitude and
*                    latitude bounds are validated for parsed points
*/
public WkbReader(boolean isGeography) {
this(DEFAULT_VALIDATION_LEVEL, isGeography);
}

/**
* Creates a WkbReader with the specified validation level and geography mode.
* Both values are stored as-is; no range check is applied to validationLevel here.
* @param validationLevel validation level (0 = no validation, 1 = basic validation)
* @param isGeography if true, validates geography coordinate bounds for longitude and latitude;
*                    the bounds check is only applied when validationLevel is greater than 0
*/
public WkbReader(int validationLevel, boolean isGeography) {
this.validationLevel = validationLevel;
this.isGeography = isGeography;
}

// ========== Coordinate Validation Helpers ==========
Expand All @@ -69,6 +100,32 @@ private static boolean isValidCoordinateAllowEmpty(double value) {
return Double.isFinite(value) || Double.isNaN(value);
}

/**
 * Checks whether a longitude lies inside the closed geography range
 * [MIN_LONGITUDE, MAX_LONGITUDE], i.e. [-180, 180].
 * NaN fails the upper-bound comparison and is therefore reported as invalid.
 */
private static boolean isValidLongitude(double value) {
  if (value < MIN_LONGITUDE) {
    return false;
  }
  return value <= MAX_LONGITUDE;
}

/**
 * Checks whether a latitude lies inside the closed geography range
 * [MIN_LATITUDE, MAX_LATITUDE], i.e. [-90, 90].
 * NaN fails the upper-bound comparison and is therefore reported as invalid.
 */
private static boolean isValidLatitude(double value) {
  if (value < MIN_LATITUDE) {
    return false;
  }
  return value <= MAX_LATITUDE;
}

/**
 * Enforces geography coordinate bounds on a parsed point.
 * The check only runs when this reader is in geography mode, the validation level is
 * greater than 0, and the point is not empty; otherwise the method returns immediately.
 *
 * @param point the point whose X (longitude) and Y (latitude) values are checked
 * @param pos position reported in the exception when a bound is violated
 * @throws WkbParseException if X is outside [-180, 180] or Y is outside [-90, 90]
 */
private void validateGeographyBounds(Point point, long pos) {
  boolean checkDisabled = !isGeography || validationLevel <= 0;
  // point.isEmpty() is consulted only when the check is enabled, as in the combined condition.
  if (checkDisabled || point.isEmpty()) {
    return;
  }
  boolean withinBounds = isValidLongitude(point.getX()) && isValidLatitude(point.getY());
  if (!withinBounds) {
    throw new WkbParseException("Invalid coordinate value found", pos, currentWkb);
  }
}

/**
* Reads a geometry from WKB bytes.
*/
Expand Down Expand Up @@ -301,11 +358,14 @@ private GeometryModel readGeometryData(
* Reads a top-level point geometry (allows empty points with NaN coordinates).
*/
private Point readPoint(int srid, int dimensionCount, boolean hasZ, boolean hasM) {
long coordsStartPos = buffer.position();
double[] coords = new double[dimensionCount];
for (int i = 0; i < dimensionCount; i++) {
coords[i] = readDoubleAllowEmpty();
}
return new Point(coords, srid, hasZ, hasM);
Point point = new Point(coords, srid, hasZ, hasM);
validateGeographyBounds(point, coordsStartPos);
return point;
}

/**
Expand All @@ -314,11 +374,14 @@ private Point readPoint(int srid, int dimensionCount, boolean hasZ, boolean hasM
*/
private Point readInternalPoint(int srid, int dimensionCount, boolean hasZ,
boolean hasM) {
long coordsStartPos = buffer.position();
double[] coords = new double[dimensionCount];
for (int i = 0; i < dimensionCount; i++) {
coords[i] = readDoubleNoEmpty();
}
return new Point(coords, srid, hasZ, hasM);
Point point = new Point(coords, srid, hasZ, hasM);
validateGeographyBounds(point, coordsStartPos);
return point;
}

private LineString readLineString(int srid, int dimensionCount, boolean hasZ, boolean hasM) {
Expand Down
Loading