Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public JavaType handleUnknownTypeId(
int ix = baseCommand.indexOf("Command");
if (ix > 0) {
baseCommand = baseCommand.substring(0, ix) + " " + "Command";
} else {
// Also handle nested polymorphic operations like "AlterCollectionOperation" ->
// "AlterCollection Operation" so the error message reads more naturally.
int opIx = baseCommand.indexOf("Operation");
if (opIx > 0) {
baseCommand = baseCommand.substring(0, opIx) + " " + "Operation";
}
}

throw RequestException.Code.COMMAND_UNKNOWN.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
@JsonSubTypes.Type(value = InsertOneCommand.class),
@JsonSubTypes.Type(value = UpdateManyCommand.class),
@JsonSubTypes.Type(value = UpdateOneCommand.class),
@JsonSubTypes.Type(value = AlterCollectionCommand.class),
// We have only collection resource that is used for API Tables
@JsonSubTypes.Type(value = AlterTableCommand.class),
@JsonSubTypes.Type(value = CreateIndexCommand.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public enum CommandName {
// they should not be DDL, they are not changing schema, we should add an CommandType.ADMIN for
// them ?

ALTER_COLLECTION(Names.ALTER_COLLECTION, CommandType.DDL, CommandTarget.COLLECTION),
ALTER_TABLE(Names.ALTER_TABLE, CommandType.DDL, CommandTarget.TABLE),
ALTER_TYPE(Names.ALTER_TYPE, CommandType.DDL, CommandTarget.TABLE),
COUNT_DOCUMENTS(Names.COUNT_DOCUMENTS, CommandType.DML, CommandTarget.COLLECTION),
Expand Down Expand Up @@ -107,6 +108,7 @@ public static List<CommandName> filterByTarget(CommandTarget target) {
}

public interface Names {
String ALTER_COLLECTION = "alterCollection";
String ALTER_TABLE = "alterTable";
String ALTER_TYPE = "alterType";
String COUNT_DOCUMENTS = "countDocuments";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.stargate.sgv2.jsonapi.api.model.command.impl;

import com.fasterxml.jackson.annotation.JsonTypeName;
import io.stargate.sgv2.jsonapi.api.model.command.CollectionCommand;
import io.stargate.sgv2.jsonapi.api.model.command.CommandName;
import io.stargate.sgv2.jsonapi.api.model.command.NoOptionsCommand;
import org.eclipse.microprofile.openapi.annotations.media.Schema;

@Schema(
description =
"Command that alters mutable settings of an existing collection. Currently supports enabling the 'lexical' feature.")
@JsonTypeName(CommandName.Names.ALTER_COLLECTION)
public record AlterCollectionCommand(AlterCollectionOperation operation)
implements CollectionCommand, NoOptionsCommand {

@Override
public CommandName commandName() {
return CommandName.ALTER_COLLECTION;
}

@Override
public boolean isForceSchemaRefresh() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.stargate.sgv2.jsonapi.api.model.command.impl;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

/**
* Polymorphic operation payload for {@link AlterCollectionCommand}. Each operation is represented
* by a record implementing this interface; Jackson selects the concrete subtype by the wrapper key
* (e.g. {@code "enableLexical"}). Mirrors {@link AlterTableOperation}.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
@JsonSubTypes({@JsonSubTypes.Type(value = AlterCollectionOperationImpl.EnableLexical.class)})
public interface AlterCollectionOperation {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.stargate.sgv2.jsonapi.api.model.command.impl;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
import javax.annotation.Nullable;
import org.eclipse.microprofile.openapi.annotations.media.Schema;

/** Each operation that {@link AlterCollectionCommand} understands is represented by a record. */
public class AlterCollectionOperationImpl {

@Schema(description = "Operation to enable the lexical search feature on a collection.")
@JsonTypeName("enableLexical")
public record EnableLexical(
@Schema(
description =
"Analyzer to use for '$lexical' field: either String (name of a pre-defined analyzer), or JSON Object to specify custom one. Default: 'standard'.",
defaultValue = "standard",
oneOf = {String.class, Map.class})
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("analyzer")
@Nullable
JsonNode analyzerDef)
implements AlterCollectionOperation {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.ConfigPreLoader;
import io.stargate.sgv2.jsonapi.api.model.command.*;
import io.stargate.sgv2.jsonapi.api.model.command.impl.AlterCollectionCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.AlterTableCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.CountDocumentsCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.CreateIndexCommand;
Expand Down Expand Up @@ -138,6 +139,7 @@ public CollectionResource(
InsertManyCommand.class,
UpdateManyCommand.class,
UpdateOneCommand.class,
AlterCollectionCommand.class,
// Table Only commands
AlterTableCommand.class,
CreateIndexCommand.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public enum Code implements ErrorCode<SchemaException> {
EXISTING_COLLECTION_DIFFERENT_SETTINGS,

EXISTING_TABLE_NOT_DATA_API_COLLECTION, // converted from ErrorCodeV1
INVALID_ALTER_COLLECTION_OPTIONS,
INVALID_CREATE_COLLECTION_OPTIONS,
INVALID_FORMAT_FOR_INDEX_CREATION_COLUMN,
INVALID_INDEXING_DEFINITION,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package io.stargate.sgv2.jsonapi.service.operation.collections;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.config.constants.TableCommentConstants;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.operation.Operation;
import io.stargate.sgv2.jsonapi.service.resolver.CreateCollectionCommandResolver;
import io.stargate.sgv2.jsonapi.service.schema.collections.CollectionLexicalConfig;
import io.stargate.sgv2.jsonapi.service.schema.collections.CollectionSchemaObject;
import java.time.Duration;
import java.util.function.Supplier;

/**
* Operation that enables the lexical feature on an existing collection by adding the {@code
* query_lexical_value} column, creating an analyzed SAI index on it, and updating the table
* "comment" JSON to record the new lexical config.
*
* <p>When {@link #noOp} is true the operation returns success without executing any DDL: this is
* the "already enabled with same settings" case.
*
* <p><b>No rollback on partial failure.</b> If e.g. ADD COLUMN succeeds but CREATE INDEX fails, the
* column is left in place and the failure is propagated to the caller. This matches {@link
* CreateCollectionOperation}'s behavior and is intentional:
*
* <ul>
* <li>Reverse DDL (DROP COLUMN, DROP INDEX) is itself fallible — a rollback that fails leaves the
* schema in a worse state than the original partial failure and obscures the root cause.
* <li>The operation is designed to be retry-safe: ADD COLUMN is skipped when the column already
* exists, CREATE INDEX uses {@code IF NOT EXISTS}, and ALTER TABLE WITH comment is naturally
* idempotent. Re-issuing the same {@code alterCollection} command after the underlying issue
* is resolved completes the unfinished steps without re-running the finished ones.
* <li>Users get a consistent mental model with {@code createCollection}, which has the same
* partial-failure semantics.
* </ul>
*/
public record AlterCollectionLexicalOperation(
CommandContext<CollectionSchemaObject> commandContext,
ObjectMapper objectMapper,
int ddlDelayMillis,
CollectionLexicalConfig newLexicalConfig,
boolean noOp)
implements Operation<CollectionSchemaObject> {

private static final CqlIdentifier COMMENT_OPTION = CqlIdentifier.fromInternal("comment");

private static final CqlIdentifier LEXICAL_COLUMN =
CqlIdentifier.fromInternal(DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME);

@Override
public Uni<Supplier<CommandResult>> execute(
RequestContext requestContext, QueryExecutor queryExecutor) {

if (noOp) {
// Type witness needed: Mutiny's item(T) and item(Supplier<? extends T>) overloads otherwise
// both match SchemaChangeResult (which is a Supplier<CommandResult>), and inference picks
// the wrong T.
return Uni.createFrom().<Supplier<CommandResult>>item(new SchemaChangeResult(true));
}

final CollectionSchemaObject schemaObject = commandContext.schemaObject();
final String keyspace = schemaObject.tableMetadata().getKeyspace().asInternal();
final String table = schemaObject.tableMetadata().getName().asInternal();

final String newComment;
try {
newComment = buildUpdatedComment(schemaObject);
} catch (JacksonException e) {
return Uni.createFrom().failure(e);
}

// Idempotent for retry after partial failure: skip ADD COLUMN if the column already exists.
final boolean columnAlreadyExists =
schemaObject.tableMetadata().getColumn(LEXICAL_COLUMN).isPresent();

SimpleStatement createIndexStmt =
CreateCollectionOperation.buildLexicalIndexStatement(
keyspace, table, newLexicalConfig, /* ifNotExists */ true);

// Cassandra does not accept bind parameters for table options like `comment`; embed the
// JSON directly with CQL single-quote escaping (matches
// CreateCollectionOperation.getCreateTable).
SimpleStatement alterCommentStmt =
SimpleStatement.newInstance(
"ALTER TABLE \"%s\".\"%s\" WITH comment = '%s'"
.formatted(keyspace, table, newComment.replace("'", "''")));

final Duration delay = Duration.ofMillis(ddlDelayMillis > 0 ? ddlDelayMillis : 100);

Uni<AsyncResultSet> pipeline;
if (columnAlreadyExists) {
pipeline = queryExecutor.executeCreateSchemaChange(requestContext, createIndexStmt);
} else {
SimpleStatement addColumnStmt =
SimpleStatement.newInstance(
"ALTER TABLE \"%s\".\"%s\" ADD %s text"
.formatted(keyspace, table, DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME));
pipeline =
queryExecutor
.executeCreateSchemaChange(requestContext, addColumnStmt)
.onItem()
.delayIt()
.by(delay)
.onItem()
.transformToUni(
r1 -> queryExecutor.executeCreateSchemaChange(requestContext, createIndexStmt));
}

return pipeline
.onItem()
.delayIt()
.by(delay)
.onItem()
.transformToUni(
r2 -> queryExecutor.executeCreateSchemaChange(requestContext, alterCommentStmt))
.map(r3 -> new SchemaChangeResult(true));
}

/**
* Reads the current table comment JSON and surgically replaces the {@code
* collection.options.lexical} sub-node, leaving all other options (vector / indexing / id /
* rerank / unknown fields) untouched.
*
* <p>The resolver guarantees we are operating on a V1-shaped comment (legacy/V0 collections are
* rejected before reaching the operation).
*/
private String buildUpdatedComment(CollectionSchemaObject schemaObject) throws JacksonException {
final Object commentObj = schemaObject.tableMetadata().getOptions().get(COMMENT_OPTION);
final String comment = commentObj == null ? null : commentObj.toString();
if (comment == null || comment.isBlank()) {
// Defensive: resolver should have rejected this case.
throw new IllegalStateException(
"Cannot alter collection: table comment is empty; expected V1 schema");
}

final ObjectNode rootNode = (ObjectNode) objectMapper.readTree(comment);
final ObjectNode collectionNode =
(ObjectNode) rootNode.get(TableCommentConstants.TOP_LEVEL_KEY);
if (collectionNode == null) {
// Defensive: resolver should have rejected this case.
throw new IllegalStateException(
"Cannot alter collection: comment does not have '"
+ TableCommentConstants.TOP_LEVEL_KEY
+ "' node");
}
ObjectNode optionsNode = (ObjectNode) collectionNode.get(TableCommentConstants.OPTIONS_KEY);
if (optionsNode == null) {
optionsNode = objectMapper.createObjectNode();
collectionNode.set(TableCommentConstants.OPTIONS_KEY, optionsNode);
}
CreateCollectionCommandResolver.addLexicalToOptionsNode(optionsNode, newLexicalConfig);
return objectMapper.writeValueAsString(rootNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.exception.*;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.DefaultDriverExceptionHandler;
import io.stargate.sgv2.jsonapi.service.operation.tables.CreateIndexExceptionHandler;
Expand Down Expand Up @@ -52,7 +53,9 @@ public RuntimeException handle(InvalidQueryException exception) {
if (exception
.getMessage()
.contains(
"analyzed size for column query_lexical_value exceeds the cumulative limit for index")) {
"analyzed size for column "
+ DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME
+ " exceeds the cumulative limit for index")) {
return DocumentException.Code.LEXICAL_CONTENT_TOO_LONG.get(errVars(schemaObject, exception));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.tracing.RequestTracing;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.config.DatabaseLimitsConfig;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.exception.DatabaseException;
import io.stargate.sgv2.jsonapi.exception.SchemaException;
import io.stargate.sgv2.jsonapi.service.cqldriver.CQLSessionCache;
Expand Down Expand Up @@ -526,7 +527,10 @@ public static SimpleStatement getCreateTable(
String comment,
CollectionLexicalConfig lexicalConfig) {
// The keyspace and table name are quoted to make it case-sensitive
final String lexicalField = lexicalConfig.enabled() ? " query_lexical_value text, " : "";
final String lexicalField =
lexicalConfig.enabled()
? " %s text, ".formatted(DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME)
: "";
if (vectorSearch) {
String createTableWithVector =
"CREATE TABLE IF NOT EXISTS \"%s\".\"%s\" ("
Expand Down Expand Up @@ -643,18 +647,31 @@ public List<SimpleStatement> getIndexStatements(
}

if (lexicalConfig.enabled()) {
var analyzerDef = lexicalConfig.analyzerDefinition();
// Note: needs to be either plain (unquoted) String (NOT quoted JSON String) OR JSON Object
final String analyzerString =
analyzerDef.isTextual() ? analyzerDef.asText() : analyzerDef.toString();
final String lexicalCreateStmt =
"""
%s "%s_query_lexical_value" ON "%s"."%s" (query_lexical_value)
USING 'StorageAttachedIndex' WITH OPTIONS = { 'index_analyzer': '%s' }
"""
.formatted(appender, table, keyspace, table, analyzerString);
statements.add(SimpleStatement.newInstance(lexicalCreateStmt));
statements.add(buildLexicalIndexStatement(keyspace, table, lexicalConfig, collectionExisted));
}
return statements;
}

/**
* Builds the {@code CREATE CUSTOM INDEX} statement for the lexical column, used both by
* createCollection (when the table is fresh or being recreated) and by alterCollection (when
* enabling lexical on an existing collection).
*
* @param ifNotExists when true, emits {@code IF NOT EXISTS} for idempotent retries
*/
public static SimpleStatement buildLexicalIndexStatement(
String keyspace, String table, CollectionLexicalConfig lexicalConfig, boolean ifNotExists) {
var analyzerDef = lexicalConfig.analyzerDefinition();
// Note: needs to be either plain (unquoted) String (NOT quoted JSON String) OR JSON Object
final String analyzerString =
analyzerDef.isTextual() ? analyzerDef.asText() : analyzerDef.toString();
final String lexicalCol = DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME;
final String prefix = ifNotExists ? "CREATE CUSTOM INDEX IF NOT EXISTS" : "CREATE CUSTOM INDEX";
return SimpleStatement.newInstance(
"""
%s "%s_%s" ON "%s"."%s" (%s)
USING 'StorageAttachedIndex' WITH OPTIONS = { 'index_analyzer': '%s' }
"""
.formatted(prefix, table, lexicalCol, keyspace, table, lexicalCol, analyzerString));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.exception.DocumentException;
import io.stargate.sgv2.jsonapi.exception.SchemaException;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor;
Expand Down Expand Up @@ -215,7 +216,7 @@ public String buildInsertQuery(boolean vectorEnabled) {
insertQuery.append(", query_vector_value");
}
if (lexicalEnabled) {
insertQuery.append(", query_lexical_value");
insertQuery.append(", ").append(DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME);
}

insertQuery.append(") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?");
Expand Down
Loading
Loading