Changes from all commits (29 commits)
00e9a9c  Added PostgresSchemaRegistry.java (suddendust, Dec 28, 2025)
31846e9  Spotless (suddendust, Dec 28, 2025)
2fdbf0e  WIP (suddendust, Dec 29, 2025)
1727dd0  Spotless (suddendust, Dec 29, 2025)
a62fbc2  Remove unused method in SchemaRegistry (suddendust, Dec 29, 2025)
6b7595b  Remove unused method in ColumnMetadata (suddendust, Dec 29, 2025)
7b4ef2a  WIP (suddendust, Dec 29, 2025)
598cb25  WIP (suddendust, Dec 29, 2025)
9c173b9  Configure cache expiry and cooldown (suddendust, Dec 29, 2025)
7bf77c5  Added PostgresMetadataFetcherTest (suddendust, Dec 29, 2025)
6d03cd5  WIP (suddendust, Dec 29, 2025)
c3f5f7e  Added docs on thread safety (suddendust, Dec 29, 2025)
827381f  Added PostgresSchemaRegistryIntegrationTest.java (suddendust, Dec 29, 2025)
602037b  WIP (suddendust, Dec 29, 2025)
c8a53eb  WIP (suddendust, Dec 30, 2025)
31f16e2  Refactor (suddendust, Jan 4, 2026)
c139bee  Merge branch 'schema_cache' into pg_write_create (suddendust, Jan 4, 2026)
75150e3  Merge branch 'main' of github.com:hypertrace/document-store into sche… (suddendust, Jan 11, 2026)
5412f9b  Merge branch 'schema_cache' into pg_write_create (suddendust, Jan 12, 2026)
bf5ca5c  WIP (suddendust, Jan 12, 2026)
57b623a  Implement create for flat collections (suddendust, Jan 12, 2026)
233c9c4  Merge branch 'main' of github.com:hypertrace/document-store into pg_w… (suddendust, Jan 12, 2026)
9f8811e  Fix compilation issue (suddendust, Jan 12, 2026)
bfd6651  Refactor (suddendust, Jan 14, 2026)
70ec4b3  Enhanced CreateResult.java and others (suddendust, Jan 14, 2026)
6d1c277  Added more test cases (suddendust, Jan 14, 2026)
910ef8c  Spotless (suddendust, Jan 14, 2026)
3e2c178  WIP (suddendust, Jan 14, 2026)
9061e24  Add more test coverage (suddendust, Jan 14, 2026)
CreateResult.java

@@ -1,16 +1,24 @@
 package org.hypertrace.core.documentstore;
 
+import java.util.List;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 /*
  * Represent the result object for CREATE operation of document store APIs.
  * */
 @AllArgsConstructor
 @Getter
 public class CreateResult {
-  private boolean succeed;
+  private boolean isSucceed;
+  private boolean onRetry;
+  private List<String> skippedFields;
 
-  public CreateResult(boolean succeed) {
-    this.succeed = succeed;
+  public CreateResult(boolean isSucceed) {
+    this.isSucceed = isSucceed;
   }
 
-  public boolean isSucceed() {
-    return succeed;
+  public boolean isPartial() {
+    return skippedFields != null && !skippedFields.isEmpty();
   }
 }
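For orientation, here is a minimal caller-side sketch of the enhanced result type. The collection, key, and document handles are hypothetical placeholders; the getters (isSucceed(), isOnRetry(), getSkippedFields()) are the ones Lombok's @Getter derives from the fields above:

    // Hypothetical usage; "collection", "key", and "document" are placeholders.
    CreateResult result = collection.create(key, document);
    if (result.isSucceed() && result.isPartial()) {
      // The row was inserted, but some fields had no matching column and were skipped.
      LOGGER.warn("Partial create; skipped fields: {}", result.getSkippedFields());
    }
    if (!result.isSucceed() && result.isOnRetry()) {
      // The insert failed even after a schema refresh and a second attempt.
    }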
ColumnMetadata.java

@@ -18,4 +18,9 @@ public interface ColumnMetadata {
    * @return whether this column can be set to NULL
    */
   boolean isNullable();
+
+  /**
+   * @return whether this column is an array type
+   */
+  boolean isArray();
 }
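As a sketch of the extended contract, a Postgres column declared as tags text[] would be reported roughly as follows; the registryLookup helper is invented for illustration and is not part of the PR:

    // Illustrative expectations for a "tags text[]" column.
    ColumnMetadata column = registryLookup("tags"); // hypothetical lookup helper
    assert column.getName().equals("tags");
    assert column.isArray();    // the array dimension is flagged separately
    assert column.isNullable(); // nullability still describes the column itself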
FlatPostgresCollection.java

@@ -1,8 +1,21 @@
 package org.hypertrace.core.documentstore.postgres;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest;
@@ -15,12 +28,17 @@
 import org.hypertrace.core.documentstore.Filter;
 import org.hypertrace.core.documentstore.Key;
 import org.hypertrace.core.documentstore.UpdateResult;
+import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException;
 import org.hypertrace.core.documentstore.model.options.QueryOptions;
 import org.hypertrace.core.documentstore.model.options.UpdateOptions;
 import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
+import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata;
 import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
+import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType;
 import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer;
 import org.hypertrace.core.documentstore.query.Query;
+import org.postgresql.util.PSQLException;
+import org.postgresql.util.PSQLState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +52,7 @@
 public class FlatPostgresCollection extends PostgresCollection {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FlatPostgresCollection.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final String WRITE_NOT_SUPPORTED =
       "Write operations are not supported for flat collections yet!";
@@ -148,7 +167,7 @@ public void drop() {
 
   @Override
   public CreateResult create(Key key, Document document) throws IOException {
-    throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
+    return createWithRetry(key, document, false);
   }
 
   @Override
@@ -188,4 +207,264 @@ public CloseableIterator<Document> bulkUpdate(
       throws IOException {
     throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
   }
+
+  /* isRetry: whether this is a retry attempt */
+  private CreateResult createWithRetry(Key key, Document document, boolean isRetry)
+      throws IOException {
+    String tableName = tableIdentifier.getTableName();
+
+    List<String> skippedFields = new ArrayList<>();
+
+    try {
+      TypedDocument parsed = parseDocument(document, tableName, skippedFields);
+      // if there are no valid columns in the document
+      if (parsed.isEmpty()) {
+        LOGGER.warn("No valid columns found in the document for table: {}", tableName);
+        return new CreateResult(false, isRetry, skippedFields);
+      }
+
+      String sql = buildInsertSql(parsed.getColumns());
+      LOGGER.debug("Insert SQL: {}", sql);
+
+      int result = executeUpdate(sql, parsed);
+      LOGGER.debug("Create result: {}", result);
+      return new CreateResult(result > 0, isRetry, skippedFields);
+
+    } catch (PSQLException e) {
+      if (PSQLState.UNIQUE_VIOLATION.getState().equals(e.getSQLState())) {
+        throw new DuplicateDocumentException();
+      }
+      return handlePSQLExceptionForCreate(e, key, document, tableName, isRetry);
+    } catch (SQLException e) {
+      LOGGER.error("SQLException creating document. key: {} content: {}", key, document, e);
+      throw new IOException(e);
+    }
+  }
+
+  private TypedDocument parseDocument(
+      Document document, String tableName, List<String> skippedColumns) throws IOException {
+    JsonNode jsonNode = MAPPER.readTree(document.toJson());
+    TypedDocument typedDocument = new TypedDocument();
+
+    Iterator<Entry<String, JsonNode>> fields = jsonNode.fields();
+    while (fields.hasNext()) {
+      Entry<String, JsonNode> field = fields.next();
+      String fieldName = field.getKey();
+      JsonNode fieldValue = field.getValue();
+
+      Optional<PostgresColumnMetadata> columnMetadata =
+          schemaRegistry.getColumnOrRefresh(tableName, fieldName);
+
+      if (columnMetadata.isEmpty()) {
+        LOGGER.warn("Could not find column metadata for column: {}, skipping it", fieldName);
+        skippedColumns.add(fieldName);
+        continue;
+      }
+
+      PostgresDataType type = columnMetadata.get().getPostgresType();
+      boolean isArray = columnMetadata.get().isArray();
+
+      try {
+        Object value = extractValue(fieldValue, type, isArray);
+        typedDocument.add("\"" + fieldName + "\"", value, type, isArray);
+      } catch (Exception e) {
+        // If we fail to parse the value, we skip this field to write on a best-effort basis
+        LOGGER.warn(
+            "Could not parse value for column: {} with type: {}, skipping it. Error: {}",
+            fieldName,
+            type,
+            e.getMessage());
+        skippedColumns.add(fieldName);
+      }
+    }
+
+    return typedDocument;
+  }
+
+  private int executeUpdate(String sql, TypedDocument parsed) throws SQLException {
+    try (Connection conn = client.getPooledConnection();
+        PreparedStatement ps = conn.prepareStatement(sql)) {
+      int index = 1;
+      for (String column : parsed.getColumns()) {
+        setParameter(
+            conn,
+            ps,
+            index++,
+            parsed.getValue(column),
+            parsed.getType(column),
+            parsed.isArray(column));
+      }
+      return ps.executeUpdate();
+    }
+  }
+
+  private CreateResult handlePSQLExceptionForCreate(
+      PSQLException e, Key key, Document document, String tableName, boolean isRetry)
+      throws IOException {
+    if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) {
+      LOGGER.info(
+          "Schema mismatch detected (SQLState: {}), refreshing schema and retrying. key: {}",
+          e.getSQLState(),
+          key);
+      schemaRegistry.invalidate(tableName);
+      return createWithRetry(key, document, true);
+    }
+    LOGGER.error("SQLException creating document. key: {} content: {}", key, document, e);
+    throw new IOException(e);
+  }
+
+  /**
+   * Returns true if the SQL state indicates a schema mismatch, i.e. the column does not exist or
+   * the data type is mismatched.
+   */
+  private boolean shouldRefreshSchemaAndRetry(String sqlState) {
+    return PSQLState.UNDEFINED_COLUMN.getState().equals(sqlState)
+        || PSQLState.DATATYPE_MISMATCH.getState().equals(sqlState);
+  }
+
+  /**
+   * Typed document contains field information along with the field type. Uses LinkedHashMaps keyed
+   * by column name. LinkedHashMap preserves insertion order for consistent parameter binding.
+   */
+  private static class TypedDocument {
+    private final Map<String, Object> values = new LinkedHashMap<>();
+    private final Map<String, PostgresDataType> types = new LinkedHashMap<>();
+    private final Map<String, Boolean> arrays = new LinkedHashMap<>();
+
+    void add(String column, Object value, PostgresDataType type, boolean isArray) {
+      values.put(column, value);
+      types.put(column, type);
+      arrays.put(column, isArray);
+    }
+
+    boolean isEmpty() {
+      return values.isEmpty();
+    }
+
+    List<String> getColumns() {
+      return new ArrayList<>(values.keySet());
+    }
+
+    Object getValue(String column) {
+      return values.get(column);
+    }
+
+    PostgresDataType getType(String column) {
+      return types.get(column);
+    }
+
+    boolean isArray(String column) {
+      return arrays.getOrDefault(column, false);
+    }
+  }
+
+  private String buildInsertSql(List<String> columns) {
+    String columnList = String.join(", ", columns);
+    String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new));
+    return String.format(
+        "INSERT INTO %s (%s) VALUES (%s)", tableIdentifier, columnList, placeholders);
+  }
+
+  private Object extractValue(JsonNode node, PostgresDataType type, boolean isArray) {
+    if (node == null || node.isNull()) {
+      return null;
+    }
+
+    if (isArray) {
+      if (!node.isArray()) {
+        node = MAPPER.createArrayNode().add(node);
+      }
+      List<Object> values = new ArrayList<>();
+      for (JsonNode element : node) {
+        values.add(extractScalarValue(element, type));
+      }
+      return values.toArray();
+    }
+
+    return extractScalarValue(node, type);
+  }
+
+  private Object extractScalarValue(JsonNode node, PostgresDataType type) {
+    switch (type) {
+      case INTEGER:
+        return node.isNumber() ? node.intValue() : Integer.parseInt(node.asText());
+      case BIGINT:
+        return node.isNumber() ? node.longValue() : Long.parseLong(node.asText());
+      case REAL:
+        return node.isNumber() ? node.floatValue() : Float.parseFloat(node.asText());
+      case DOUBLE_PRECISION:
+        return node.isNumber() ? node.doubleValue() : Double.parseDouble(node.asText());
+      case BOOLEAN:
+        return node.isBoolean() ? node.booleanValue() : Boolean.parseBoolean(node.asText());
+      case TIMESTAMPTZ:
+        if (node.isTextual()) {
+          return Timestamp.from(Instant.parse(node.asText()));
+        } else if (node.isNumber()) {
+          return new Timestamp(node.longValue());
+        }
+        return null;
+      case DATE:
+        if (node.isTextual()) {
+          return Date.valueOf(node.asText());
+        }
+        return null;
+      case JSONB:
+        return node.toString();
+      default:
+        return node.asText();
+    }
+  }
+
+  private void setParameter(
+      Connection conn,
+      PreparedStatement ps,
+      int index,
+      Object value,
+      PostgresDataType type,
+      boolean isArray)
+      throws SQLException {
+    if (value == null) {
+      ps.setObject(index, null);
+      return;
+    }
+
+    if (isArray) {
+      Object[] arrayValues = (value instanceof Object[]) ? (Object[]) value : new Object[] {value};
+      java.sql.Array sqlArray = conn.createArrayOf(type.getSqlType(), arrayValues);
+      ps.setArray(index, sqlArray);
+      return;
+    }
+
+    switch (type) {
+      case INTEGER:
+        ps.setInt(index, (Integer) value);
+        break;
+      case BIGINT:
+        ps.setLong(index, (Long) value);
+        break;
+      case REAL:
+        ps.setFloat(index, (Float) value);
+        break;
+      case DOUBLE_PRECISION:
+        ps.setDouble(index, (Double) value);
+        break;
+      case BOOLEAN:
+        ps.setBoolean(index, (Boolean) value);
+        break;
+      case TEXT:
+        ps.setString(index, (String) value);
+        break;
+      case TIMESTAMPTZ:
+        ps.setTimestamp(index, (Timestamp) value);
+        break;
+      case DATE:
+        ps.setDate(index, (java.sql.Date) value);
+        break;
+      case JSONB:
+        ps.setObject(index, value, Types.OTHER);
+        break;
+      default:
+        ps.setString(index, value.toString());
+    }
+  }
 }
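To make the new write path concrete, a hedged end-to-end sketch follows. The table definition, JSON payload, and the collection and key handles are invented for illustration, and JSONDocument is assumed to be the document-store's JSON-backed Document implementation:

    // Assume a flat collection backed by:
    //   CREATE TABLE orders (id TEXT, quantity INT, tags TEXT[], created_at TIMESTAMPTZ);
    Document doc =
        new JSONDocument(
            "{\"id\": \"o-1\", \"quantity\": 2, \"tags\": [\"new\", \"bulk\"],"
                + " \"created_at\": \"2026-01-14T10:15:30Z\"}");
    CreateResult result = collection.create(key, doc); // "collection" and "key" are placeholders

    // parseDocument() resolves each field against the cached schema, so the statement becomes:
    //   INSERT INTO orders ("id", "quantity", "tags", "created_at") VALUES (?, ?, ?, ?)
    // bound via setString, setInt, createArrayOf("text", ...), and setTimestamp respectively.
    // A field with no matching column is skipped and surfaces in getSkippedFields();
    // an UNDEFINED_COLUMN or DATATYPE_MISMATCH error triggers one schema refresh and retry.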
PostgresMetadataFetcher.java

@@ -37,10 +37,16 @@ public Map<String, PostgresColumnMetadata> fetch(String tableName) {
         String columnName = rs.getString("column_name");
         String udtName = rs.getString("udt_name");
         boolean isNullable = "YES".equalsIgnoreCase(rs.getString("is_nullable"));
+        boolean isArray = udtName != null && udtName.startsWith("_");
+        String baseType = isArray ? udtName.substring(1) : udtName;
         metadataMap.put(
             columnName,
             new PostgresColumnMetadata(
-                columnName, mapToCanonicalType(udtName), mapToPostgresType(udtName), isNullable));
+                columnName,
+                mapToCanonicalType(baseType),
+                mapToPostgresType(baseType),
+                isNullable,
+                isArray));
       }
     }
     return metadataMap;
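The leading-underscore check relies on a Postgres naming convention: in information_schema.columns, the udt_name of an array column is the element type prefixed with "_", so text[] appears as _text and int8[] as _int8. A standalone sketch of the same decoding (the helper method is illustrative, not part of the PR):

    // Mirrors the fetch() logic above: "_text" -> ("text", true); "int8" -> ("int8", false).
    static Map.Entry<String, Boolean> decodeUdtName(String udtName) {
      boolean isArray = udtName != null && udtName.startsWith("_");
      String baseType = isArray ? udtName.substring(1) : udtName;
      return Map.entry(baseType, isArray);
    }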
PostgresColumnMetadata.java

@@ -15,6 +15,7 @@ public class PostgresColumnMetadata implements ColumnMetadata {
   private final DataType canonicalType;
   @Getter private final PostgresDataType postgresType;
   private final boolean nullable;
+  private final boolean isArray;
 
   @Override
   public String getName() {
@@ -30,4 +31,9 @@ public DataType getCanonicalType() {
   public boolean isNullable() {
     return nullable;
   }
+
+  @Override
+  public boolean isArray() {
+    return isArray;
+  }
 }
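A hedged construction example matching the fetcher call site above; the DataType.STRING constant is an assumption, while PostgresDataType.TEXT appears in the write path's switch statements:

    // Illustrative: metadata for a nullable "tags text[]" column. Argument order follows
    // the fetch() call site: name, canonical type, Postgres type, nullable, isArray.
    PostgresColumnMetadata tags =
        new PostgresColumnMetadata("tags", DataType.STRING, PostgresDataType.TEXT, true, true);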