Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ public IteratorResultSet(String query,
this.useInternalLimit = !query.contains("LIMIT");
}

public IteratorResultSet(String query,
ConnectionManager connectionManager,
long limit,
long startOffset,
Function<ResultSet, T> parsingFn,
Function<T, Boolean> predicateValidator
) {
this.query = query;
this.limit = limit;
this.parsingFn = parsingFn;
this.connectionManager = connectionManager;
this.predicateValidator = predicateValidator;
this.useInternalLimit = !query.contains("LIMIT");
}

@Override
public boolean hasNext() {
if(!records.isEmpty()) {
Expand Down Expand Up @@ -91,9 +106,10 @@ public T next() {

private void makeRequest() {
long countRes = 0;
String nextQuery="";
try (Connection connection = connectionManager.getConnection()) {
try(Statement statement = connection.createStatement()) {
String nextQuery = getQuery();
nextQuery = getQuery();
if(logger.isDebugEnabled()) {
logger.debug(nextQuery);
}
Expand All @@ -112,7 +128,7 @@ private void makeRequest() {
ended = true;
}
} catch (SQLException e) {
throw new RuntimeException(e);
logger.error("Cannot Execute the request {}",query);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public String createTableQuery() {
"data JSONB)";
}

public String createSpatialIndex() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_features_geometry ON "+this.getStoreTableName()+" USING GIST ("+GEOMETRY+");";
}

public String createParentIdIndex() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_features_parentId ON "+this.getStoreTableName()+" (parentId);";
}

public String insertFeatureQuery() {
return "INSERT INTO "+this.getStoreTableName()+" (parentId,"+GEOMETRY+", "+VALID_TIME+", data) VALUES (?,?,?,?)";
}
Expand Down Expand Up @@ -123,6 +131,10 @@ public String createValidTimeIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_valid_time_0_idx ON "+this.getStoreTableName()+ " using GIST (validTime)";
}

public String createLowerValidTimeIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_valid_time_lower_desc_idx ON "+this.getStoreTableName()+ " ((lower(validTime)) DESC, id DESC)";
}

public String createIdIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_id_idx ON "+this.getStoreTableName()+" (id)";
}
Expand All @@ -132,11 +144,7 @@ public String createTrigramExtensionQuery() {
}

public String createTrigramDescriptionFullTextIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_desc_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data->'properties'->>'description') gin_trgm_ops)";
}

public String createTrigramUidFullTextIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_uid_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data->'properties'->>'uid') gin_trgm_ops)";
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_desc_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data::text) gin_trgm_ops)";
}

public abstract String createSelectEntriesQuery(F filter, Set<VF> fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,26 @@ public String createTableQuery() {
}

public String createIndexQuery() {
return "CREATE INDEX IF NOT EXISTS " + this.getStoreTableName() + "_data_idx on " + this.getStoreTableName() + " USING GIN(data)";
return "CREATE INDEX IF NOT EXISTS " + this.getStoreTableName() + "_data_idx on " + this.getStoreTableName() + " USING GIN(data jsonb_ops)";
}

public String createUniqueIndexQuery() {
return "CREATE UNIQUE INDEX IF NOT EXISTS " + this.getStoreTableName() + "_data_output_idx ON " + this.getStoreTableName()
+ " USING BTREE((data->'name'), (data->'system@id'), (data->'validTime'))";
}

public String createDateRangeIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_date_range_idx ON " + this.getStoreTableName() +
" USING gist (int8range( " +
"(data->'validTime'->'begin')::bigint, " +
"(data->'validTime'->'end')::bigint" +
") )";
public String createImmutubleFunctionForValidTime() {
return "CREATE OR REPLACE FUNCTION parse_utc_timestamp(txt text) " +
"RETURNS timestamp " +
"IMMUTABLE " +
"LANGUAGE sql " +
"AS $$ " +
" SELECT (txt::timestamptz AT TIME ZONE 'UTC')::timestamp " +
"$$;";
}

public String createValidTimeBeginIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_date_range_begin_idx ON " + this.getStoreTableName() + " USING GIN((data->'validTime'->'begin'))";
}

public String createValidTimeEndIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_date_range_end_idx ON " + this.getStoreTableName() + " USING GIN((data->'validTime'->'end'))";
public String createValidTimeIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_date_range_begin_idx ON " + this.getStoreTableName() +" "+
"USING GIST (tsrange(parse_utc_timestamp(data->'validTime'->>'begin'),parse_utc_timestamp(data->'validTime'->>'end')))";
}

public String createTrigramExtensionQuery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public String createSelectEntriesQuery(FeatureFilter filter, Set<IFeatureStoreBa
.withFields(fields)
.withFeatureFilter(filter)
.withLimit(filter.getLimit())
.withOffset(0)
.build();
return selectEntriesFeatureQuery.toQuery();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public String createPhenomenonTimeIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_datastream_id_phenomenon_time_idx on "+this.getStoreTableName()+" ("+ DATASTREAM_ID + ", " + PHENOMENON_TIME +")";
}

public String createPhenomenonTimeSimpleIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_datastream_id_phenomenon_time_only_idx on "+this.getStoreTableName()+" (" + PHENOMENON_TIME +" ASC)";
}

public String createResultTimeIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_datastream_id_result_time_idx on "+this.getStoreTableName()+" ("+ DATASTREAM_ID + ", " + RESULT_TIME +")";
}
Expand Down Expand Up @@ -94,16 +98,16 @@ public String updateByIdQuery() {
}

public String getPhenomenonTimeRangeByDataStreamIdQuery(long dataStreamID) {
return "SELECT Min("+PHENOMENON_TIME+"),Max("+PHENOMENON_TIME+") FROM "+this.getStoreTableName()+" WHERE "+DATASTREAM_ID+" = "+dataStreamID;
return "SELECT Min("+PHENOMENON_TIME+"),Max("+PHENOMENON_TIME+") FROM "+this.getStoreTableName()+" WHERE "+DATASTREAM_ID+" = "+dataStreamID+" ";
}

public String getPhenomenonTimeRangeByDataStreamIdsQuery() {
public String getPhenomenonTimeRangeByDataStreamIdsQuery(String ids) {
return "SELECT " + DATASTREAM_ID + ", " +
" MIN(" + PHENOMENON_TIME + ") AS min, " +
" MAX(" + PHENOMENON_TIME + ") AS max " +
"FROM " + getStoreTableName() +
" WHERE " + DATASTREAM_ID + " = ANY (?) " +
"GROUP BY " + DATASTREAM_ID;
" WHERE " + DATASTREAM_ID + " IN ("+ ids +") " +
"GROUP BY " + DATASTREAM_ID+" ";
}

public String getBinCountByPhenomenontime(long seconds, List<Long> dsIds, List<Long> foiIds) {
Expand All @@ -128,16 +132,16 @@ public String getBinCountByPhenomenontime(long seconds, List<Long> dsIds, List<L
}

public String getResultTimeRangeByDataStreamIdQuery(long dataStreamID) {
return "SELECT Min("+RESULT_TIME+"),Max("+RESULT_TIME+") FROM "+this.getStoreTableName()+" WHERE "+DATASTREAM_ID+" = "+dataStreamID;
return "SELECT Min("+RESULT_TIME+"),Max("+RESULT_TIME+") FROM "+this.getStoreTableName()+" WHERE "+DATASTREAM_ID+" = "+dataStreamID+" LIMIT 1";
}

public String getResultTimeRangeByDataStreamIdsQuery() {
public String getResultTimeRangeByDataStreamIdsQuery(String ids) {
return "SELECT " + DATASTREAM_ID + ", " +
" MIN(" + RESULT_TIME + ") AS min, " +
" MAX(" + RESULT_TIME + ") AS max " +
"FROM " + getStoreTableName() +
" WHERE " + DATASTREAM_ID + " = ANY (?) " +
"GROUP BY " + DATASTREAM_ID;
" WHERE " + DATASTREAM_ID + " IN ("+ ids +") " +
"GROUP BY " + DATASTREAM_ID+"";
}

public String countByPhenomenonTimeRangeQuery(Instant min, Instant max) {
Expand Down Expand Up @@ -165,6 +169,7 @@ public String createSelectEntriesQuery(ObsFilter filter, Set<IObsStore.ObsField>
.withFields(fields)
.withObsFilter(filter)
.withLimit(filter.getLimit())
.withOffset(0)
.build();
return selectEntriesObsQuery.toQuery();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,10 @@ public String selectLastVersionByUidQuery(String uid, String timestamp) {
}

@Override
public String createUidUniqueIndexQuery() {
return "CREATE UNIQUE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_uid_idx ON "+this.getStoreTableName()+" " +
"((data->>'uniqueId'), "+VALID_TIME+")";
}

@Override
public String createTrigramDescriptionFullTextIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_desc_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data->>'description') gin_trgm_ops)";
}
@Override
public String createTrigramUidFullTextIndexQuery() {
return "CREATE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_uid_full_text_datastream_idx ON "+this.getStoreTableName()+" USING GIN ((data->>'uniqueId') gin_trgm_ops)";
}
public String createUidUniqueIndexQuery() {
return "CREATE UNIQUE INDEX IF NOT EXISTS "+this.getStoreTableName()+"_feature_uid_idx ON "+this.getStoreTableName()+" " +
"((data->>'uniqueId'), "+VALID_TIME+")";
}

public String addOrUpdateByIdQuery() {
return this.insertFeatureByIdQuery()+" ON CONFLICT ((data->>'uniqueId'), "+VALID_TIME +") DO "+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ protected void addCondition(String condition) {
filterQueryGenerator.addCondition(condition);
}

protected void orCondition(String condition) {
filterQueryGenerator.orCondition(condition);
}

public void setCommandStreamTableName(String commandStreamTableName) {
this.commandStreamTableName = commandStreamTableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.sensorhub.impl.datastore.postgis.builder.filter.datastream;

import org.sensorhub.api.common.BigId;
import org.sensorhub.api.datastore.FullTextFilter;
import org.sensorhub.api.datastore.TemporalFilter;
import org.sensorhub.api.datastore.obs.DataStreamFilter;
Expand Down Expand Up @@ -55,9 +56,18 @@ protected void handleId(long id) {

protected void handleOutputNames(SortedSet<String> names) {
if (names != null && !names.isEmpty()) {
addCondition("("+tableName+".data->>'outputName') in (" +
names.stream().map(name -> "'" + name + "'").collect(Collectors.joining(",")) +
")");
StringBuilder query = new StringBuilder();
boolean first = true;
String op = "";
for(String name: names) {
if(first) {
first = false;
}else {
op = " OR ";
}
query.append(op).append(tableName).append(".data @> '{\"outputName\": \"").append(name).append("\"}'");
}
addCondition(query.toString());
}
}

Expand Down Expand Up @@ -113,9 +123,13 @@ protected void handleSystemFilter(SystemFilter systemFilter) {
if(uid.contains("*")) {
operator = "ILIKE";
currentId = uid.replaceAll("\\*","%");
}

sb.append("(").append(tableName).append(".data->'system@id'->>'uniqueID') "+operator+" '").append(currentId).append("'");
// USE pgtrim index?
sb.append("(").append(tableName).append(".data->'system@id'->>'uniqueID') "+operator+" '").append(currentId).append("'");
} else {
// USE GIN index
sb.append("(").append(tableName).append(".data @> '{\"system@id\": {\"uniqueID\": \"").append(currentId).append("\"").append("}}'").append(")");
}
if(++i < uniqueIds.size()) {
sb.append(" OR ");
}
Expand All @@ -126,10 +140,18 @@ protected void handleSystemFilter(SystemFilter systemFilter) {

// handle internal IDS
if (systemFilter.getInternalIDs() != null && !systemFilter.getInternalIDs().isEmpty()) {
String sb = "(" + tableName + ".data->'system@id'->'internalID'->'id')::bigint in (" +
systemFilter.getInternalIDs().stream().map(bigId -> String.valueOf(bigId.getIdAsLong())).collect(Collectors.joining(",")) +
")";
addCondition(sb);
StringBuilder query = new StringBuilder();
boolean first = true;
String op = "";
for(BigId sysId: systemFilter.getInternalIDs()) {
if(first) {
first = false;
}else {
op = " OR ";
}
query.append(op).append(tableName).append(".data @> '{\"system@id\": {\"internalID\": {\"id\": ").append(sysId.getIdAsLong()).append("}}}'");
}
addCondition(query.toString());
}
}
if (systemFilter.getParentFilter() != null || systemFilter.getProcedureFilter() != null
Expand All @@ -150,7 +172,7 @@ protected void handleObsFilter(ObsFilter obsFilter) {
protected void handleObservedPropertiesFilter(SortedSet<String> properties) {
if(properties != null) {
StringBuilder sb = new StringBuilder();
sb.append("jsonb_path_exists(").append(tableName).append(".data, '$.** ? (");
sb.append(tableName).append(".data @? '$.** ? (");
boolean first=true;
for(String property : properties) {
if(!first) {
Expand All @@ -160,7 +182,7 @@ protected void handleObservedPropertiesFilter(SortedSet<String> properties) {
sb.append("@ == \"").append(property).append("\"");
first = false;
}
sb.append(")')");
sb.append(")'");
addCondition(sb.toString());
}
}
Expand All @@ -177,6 +199,5 @@ public static boolean hasOnlyInternalIds(DataStreamFilter dataStreamFilter) {
dataStreamFilter.getOutputNames() == null &&
dataStreamFilter.getObservedProperties() == null &&
(dataStreamFilter.getInternalIDs() != null && !dataStreamFilter.getInternalIDs().isEmpty()));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ protected void handleValidTimeFilter(TemporalFilter temporalFilter) {
String min = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMin());
String max = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMax());

String sb = "tsrange((" +
String sb = "tsrange(parse_utc_timestamp(" +
tableName +
".data->'validTime'->>'begin')::timestamp,(" +
".data->'validTime'->>'begin'),parse_utc_timestamp(" +
tableName +
".data->'validTime'->>'end')::timestamp)" +
".data->'validTime'->>'end'))" +
" "+PostgisUtils.getOperator(temporalFilter)+" " +
"'[" + min + "," + max + "]'::tsrange";
addCondition(sb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ protected void handleValidTimeFilter(TemporalFilter temporalFilter) {
String min = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMin());
String max = PostgisUtils.checkAndGetValidInstant(temporalFilter.getMax());

String sb = "tsrange((" +
String sb = "tsrange(parse_utc_timestamp(" +
tableName +
".data->'validTime'->>'begin')::timestamp,(" +
".data->'validTime'->>'begin'),parse_utc_timestamp(" +
tableName +
".data->'validTime'->>'end')::timestamp)" +
".data->'validTime'->>'end'))" +
" "+PostgisUtils.getOperator(temporalFilter)+" " +
"'[" + min + "," + max + "]'::tsrange";
addCondition(sb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.sensorhub.impl.datastore.postgis.utils.PostgisUtils;
import org.vast.ogc.gml.IFeature;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -65,13 +67,15 @@ public void handleValidTimeFilter(TemporalFilter temporalFilter, String rangeOpS

protected void handleFullTextFilter(FullTextFilter fullTextFilter) {
if (fullTextFilter != null) {
// can use directly ~* for fast lookup
// https://www.postgresql.org/docs/current/pgtrgm.html
if(fullTextFilter.getKeywords() != null) {
String sb = "(" + tableName + ".data->'properties'->>'description') ~* '(" +
fullTextFilter.getKeywords().stream().collect(Collectors.joining("|")) +
")'";
addCondition(sb);
List<String> sqlKeywords = fullTextFilter.getKeywords()
.stream()
.map(k -> {
String start = (k.startsWith("*") ? "" : "%");
String end = (k.endsWith("*") ? "" : "%");;
return tableName + ".data::text ILIKE '" + start + k.replaceAll("\\*", "%") + end + "'";
}).toList();
addCondition(" ( "+sqlKeywords.stream().collect(Collectors.joining(" OR "))+" ) ");
}
}
}
Expand Down
Loading
Loading