Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
@JsonSubTypes.Type(value = InsertOneCommand.class),
@JsonSubTypes.Type(value = UpdateManyCommand.class),
@JsonSubTypes.Type(value = UpdateOneCommand.class),
@JsonSubTypes.Type(value = AlterCollectionCommand.class),
// We have only collection resource that is used for API Tables
@JsonSubTypes.Type(value = AlterTableCommand.class),
@JsonSubTypes.Type(value = CreateIndexCommand.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public enum CommandName {
// they should not be DDL, they are not changing schema, we should add an CommandType.ADMIN for
// them ?

ALTER_COLLECTION(Names.ALTER_COLLECTION, CommandType.DDL, CommandTarget.COLLECTION),
ALTER_TABLE(Names.ALTER_TABLE, CommandType.DDL, CommandTarget.TABLE),
ALTER_TYPE(Names.ALTER_TYPE, CommandType.DDL, CommandTarget.TABLE),
COUNT_DOCUMENTS(Names.COUNT_DOCUMENTS, CommandType.DML, CommandTarget.COLLECTION),
Expand Down Expand Up @@ -107,6 +108,7 @@ public static List<CommandName> filterByTarget(CommandTarget target) {
}

public interface Names {
String ALTER_COLLECTION = "alterCollection";
String ALTER_TABLE = "alterTable";
String ALTER_TYPE = "alterType";
String COUNT_DOCUMENTS = "countDocuments";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.stargate.sgv2.jsonapi.api.model.command.impl;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.stargate.sgv2.jsonapi.api.model.command.CollectionCommand;
import io.stargate.sgv2.jsonapi.api.model.command.CommandName;
import jakarta.validation.Valid;
import javax.annotation.Nullable;
import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
import org.eclipse.microprofile.openapi.annotations.media.Schema;

@Schema(
description =
"Command that alters mutable settings of an existing collection. Currently supports enabling the 'lexical' feature.")
@JsonTypeName(CommandName.Names.ALTER_COLLECTION)
public record AlterCollectionCommand(
@Valid
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@Schema(
description =
"Lexical configuration to apply. Currently only enabling is supported ('enabled' must be true).",
type = SchemaType.OBJECT,
implementation = CreateCollectionCommand.Options.LexicalConfigDefinition.class)
CreateCollectionCommand.Options.LexicalConfigDefinition lexical)
implements CollectionCommand {

@Override
public CommandName commandName() {
return CommandName.ALTER_COLLECTION;
}

@Override
public boolean isForceSchemaRefresh() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.ConfigPreLoader;
import io.stargate.sgv2.jsonapi.api.model.command.*;
import io.stargate.sgv2.jsonapi.api.model.command.impl.AlterCollectionCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.AlterTableCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.CountDocumentsCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.CreateIndexCommand;
Expand Down Expand Up @@ -138,6 +139,7 @@ public CollectionResource(
InsertManyCommand.class,
UpdateManyCommand.class,
UpdateOneCommand.class,
AlterCollectionCommand.class,
// Table Only commands
AlterTableCommand.class,
CreateIndexCommand.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public enum Code implements ErrorCode<SchemaException> {
EXISTING_COLLECTION_DIFFERENT_SETTINGS,

EXISTING_TABLE_NOT_DATA_API_COLLECTION, // converted from ErrorCodeV1
INVALID_ALTER_COLLECTION_OPTIONS,
INVALID_CREATE_COLLECTION_OPTIONS,
INVALID_FORMAT_FOR_INDEX_CREATION_COLUMN,
INVALID_INDEXING_DEFINITION,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package io.stargate.sgv2.jsonapi.service.operation.collections;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.config.constants.TableCommentConstants;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.operation.Operation;
import io.stargate.sgv2.jsonapi.service.resolver.CreateCollectionCommandResolver;
import io.stargate.sgv2.jsonapi.service.schema.collections.CollectionLexicalConfig;
import io.stargate.sgv2.jsonapi.service.schema.collections.CollectionSchemaObject;
import java.time.Duration;
import java.util.function.Supplier;

/**
* Operation that enables the lexical feature on an existing collection by adding the {@code
* query_lexical_value} column, creating an analyzed SAI index on it, and updating the table
* "comment" JSON to record the new lexical config.
*
* <p>When {@link #noOp} is true the operation returns success without executing any DDL: this is
* the "already enabled with same settings" case.
*
* <p><b>No rollback on partial failure.</b> If e.g. ADD COLUMN succeeds but CREATE INDEX fails, the
* column is left in place and the failure is propagated to the caller. This matches {@link
* CreateCollectionOperation}'s behavior and is intentional:
*
* <ul>
* <li>Reverse DDL (DROP COLUMN, DROP INDEX) is itself fallible — a rollback that fails leaves the
* schema in a worse state than the original partial failure and obscures the root cause.
* <li>The operation is designed to be retry-safe: ADD COLUMN is skipped when the column already
* exists, CREATE INDEX uses {@code IF NOT EXISTS}, and ALTER TABLE WITH comment is naturally
* idempotent. Re-issuing the same {@code alterCollection} command after the underlying issue
* is resolved completes the unfinished steps without re-running the finished ones.
* <li>Users get a consistent mental model with {@code createCollection}, which has the same
* partial-failure semantics.
* </ul>
*/
public record AlterCollectionLexicalOperation(
CommandContext<CollectionSchemaObject> commandContext,
ObjectMapper objectMapper,
int ddlDelayMillis,
CollectionLexicalConfig newLexicalConfig,
boolean noOp)
implements Operation<CollectionSchemaObject> {

private static final CqlIdentifier COMMENT_OPTION = CqlIdentifier.fromInternal("comment");

private static final CqlIdentifier LEXICAL_COLUMN =
CqlIdentifier.fromInternal(DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME);

@Override
public Uni<Supplier<CommandResult>> execute(
RequestContext requestContext, QueryExecutor queryExecutor) {

if (noOp) {
// Type witness needed: Mutiny's item(T) and item(Supplier<? extends T>) overloads otherwise
// both match SchemaChangeResult (which is a Supplier<CommandResult>), and inference picks
// the wrong T.
return Uni.createFrom().<Supplier<CommandResult>>item(new SchemaChangeResult(true));
}

final CollectionSchemaObject schemaObject = commandContext.schemaObject();
final String keyspace = schemaObject.tableMetadata().getKeyspace().asInternal();
final String table = schemaObject.tableMetadata().getName().asInternal();

final String newComment;
try {
newComment = buildUpdatedComment(schemaObject);
} catch (JacksonException e) {
return Uni.createFrom().failure(e);
}

// Idempotent for retry after partial failure: skip ADD COLUMN if the column already exists.
final boolean columnAlreadyExists =
schemaObject.tableMetadata().getColumn(LEXICAL_COLUMN).isPresent();

SimpleStatement createIndexStmt =
CreateCollectionOperation.buildLexicalIndexStatement(
keyspace, table, newLexicalConfig, /* ifNotExists */ true);

// Cassandra does not accept bind parameters for table options like `comment`; embed the
// JSON directly with CQL single-quote escaping (matches
// CreateCollectionOperation.getCreateTable).
SimpleStatement alterCommentStmt =
SimpleStatement.newInstance(
"ALTER TABLE \"%s\".\"%s\" WITH comment = '%s'"
.formatted(keyspace, table, newComment.replace("'", "''")));

final Duration delay = Duration.ofMillis(ddlDelayMillis > 0 ? ddlDelayMillis : 100);

Uni<AsyncResultSet> pipeline;
if (columnAlreadyExists) {
pipeline = queryExecutor.executeCreateSchemaChange(requestContext, createIndexStmt);
} else {
SimpleStatement addColumnStmt =
SimpleStatement.newInstance(
"ALTER TABLE \"%s\".\"%s\" ADD %s text"
.formatted(keyspace, table, DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME));
pipeline =
queryExecutor
.executeCreateSchemaChange(requestContext, addColumnStmt)
.onItem()
.delayIt()
.by(delay)
.onItem()
.transformToUni(
r1 -> queryExecutor.executeCreateSchemaChange(requestContext, createIndexStmt));
}

return pipeline
.onItem()
.delayIt()
.by(delay)
.onItem()
.transformToUni(
r2 -> queryExecutor.executeCreateSchemaChange(requestContext, alterCommentStmt))
.map(r3 -> new SchemaChangeResult(true));
}

/**
* Reads the current table comment JSON and surgically replaces the {@code
* collection.options.lexical} sub-node, leaving all other options (vector / indexing / id /
* rerank / unknown fields) untouched.
*
* <p>The resolver guarantees we are operating on a V1-shaped comment (legacy/V0 collections are
* rejected before reaching the operation).
*/
private String buildUpdatedComment(CollectionSchemaObject schemaObject) throws JacksonException {
final Object commentObj = schemaObject.tableMetadata().getOptions().get(COMMENT_OPTION);
final String comment = commentObj == null ? null : commentObj.toString();
if (comment == null || comment.isBlank()) {
// Defensive: resolver should have rejected this case.
throw new IllegalStateException(
"Cannot alter collection: table comment is empty; expected V1 schema");
}

final ObjectNode rootNode = (ObjectNode) objectMapper.readTree(comment);
final ObjectNode collectionNode =
(ObjectNode) rootNode.get(TableCommentConstants.TOP_LEVEL_KEY);
if (collectionNode == null) {
// Defensive: resolver should have rejected this case.
throw new IllegalStateException(
"Cannot alter collection: comment does not have '"
+ TableCommentConstants.TOP_LEVEL_KEY
+ "' node");
}
ObjectNode optionsNode = (ObjectNode) collectionNode.get(TableCommentConstants.OPTIONS_KEY);
if (optionsNode == null) {
optionsNode = objectMapper.createObjectNode();
collectionNode.set(TableCommentConstants.OPTIONS_KEY, optionsNode);
}
CreateCollectionCommandResolver.addLexicalToOptionsNode(optionsNode, newLexicalConfig);
return objectMapper.writeValueAsString(rootNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.exception.*;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.DefaultDriverExceptionHandler;
import io.stargate.sgv2.jsonapi.service.operation.tables.CreateIndexExceptionHandler;
Expand Down Expand Up @@ -52,7 +53,9 @@ public RuntimeException handle(InvalidQueryException exception) {
if (exception
.getMessage()
.contains(
"analyzed size for column query_lexical_value exceeds the cumulative limit for index")) {
"analyzed size for column "
+ DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME
+ " exceeds the cumulative limit for index")) {
return DocumentException.Code.LEXICAL_CONTENT_TOO_LONG.get(errVars(schemaObject, exception));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.tracing.RequestTracing;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.config.DatabaseLimitsConfig;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.exception.DatabaseException;
import io.stargate.sgv2.jsonapi.exception.SchemaException;
import io.stargate.sgv2.jsonapi.service.cqldriver.CQLSessionCache;
Expand Down Expand Up @@ -526,7 +527,10 @@ public static SimpleStatement getCreateTable(
String comment,
CollectionLexicalConfig lexicalConfig) {
// The keyspace and table name are quoted to make it case-sensitive
final String lexicalField = lexicalConfig.enabled() ? " query_lexical_value text, " : "";
final String lexicalField =
lexicalConfig.enabled()
? " %s text, ".formatted(DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME)
: "";
if (vectorSearch) {
String createTableWithVector =
"CREATE TABLE IF NOT EXISTS \"%s\".\"%s\" ("
Expand Down Expand Up @@ -643,18 +647,31 @@ public List<SimpleStatement> getIndexStatements(
}

if (lexicalConfig.enabled()) {
var analyzerDef = lexicalConfig.analyzerDefinition();
// Note: needs to be either plain (unquoted) String (NOT quoted JSON String) OR JSON Object
final String analyzerString =
analyzerDef.isTextual() ? analyzerDef.asText() : analyzerDef.toString();
final String lexicalCreateStmt =
"""
%s "%s_query_lexical_value" ON "%s"."%s" (query_lexical_value)
USING 'StorageAttachedIndex' WITH OPTIONS = { 'index_analyzer': '%s' }
"""
.formatted(appender, table, keyspace, table, analyzerString);
statements.add(SimpleStatement.newInstance(lexicalCreateStmt));
statements.add(buildLexicalIndexStatement(keyspace, table, lexicalConfig, collectionExisted));
}
return statements;
}

/**
* Builds the {@code CREATE CUSTOM INDEX} statement for the lexical column, used both by
* createCollection (when the table is fresh or being recreated) and by alterCollection (when
* enabling lexical on an existing collection).
*
* @param ifNotExists when true, emits {@code IF NOT EXISTS} for idempotent retries
*/
public static SimpleStatement buildLexicalIndexStatement(
String keyspace, String table, CollectionLexicalConfig lexicalConfig, boolean ifNotExists) {
var analyzerDef = lexicalConfig.analyzerDefinition();
// Note: needs to be either plain (unquoted) String (NOT quoted JSON String) OR JSON Object
final String analyzerString =
analyzerDef.isTextual() ? analyzerDef.asText() : analyzerDef.toString();
final String lexicalCol = DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME;
final String prefix = ifNotExists ? "CREATE CUSTOM INDEX IF NOT EXISTS" : "CREATE CUSTOM INDEX";
return SimpleStatement.newInstance(
"""
%s "%s_%s" ON "%s"."%s" (%s)
USING 'StorageAttachedIndex' WITH OPTIONS = { 'index_analyzer': '%s' }
"""
.formatted(prefix, table, lexicalCol, keyspace, table, lexicalCol, analyzerString));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.exception.DocumentException;
import io.stargate.sgv2.jsonapi.exception.SchemaException;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor;
Expand Down Expand Up @@ -215,7 +216,7 @@ public String buildInsertQuery(boolean vectorEnabled) {
insertQuery.append(", query_vector_value");
}
if (lexicalEnabled) {
insertQuery.append(", query_lexical_value");
insertQuery.append(", ").append(DocumentConstants.Columns.LEXICAL_INDEX_COLUMN_NAME);
}

insertQuery.append(") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?");
Expand Down
Loading
Loading