Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,9 @@ index_summary_capacity:
# Min unit: m
index_summary_resize_interval: 60m

# How frequently (in seconds) to poll the index_events table for peer index status changes.
# index_status_poll_interval_in_seconds: 30

# Whether to, when doing sequential writing, fsync() at intervals in
# order to force the operating system to flush the dirty
# buffers. Enable this to avoid sudden dirty buffer flushing from
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,7 @@ public static void setClientMode(boolean clientMode)

public volatile String default_secondary_index = CassandraIndex.NAME;
public volatile boolean default_secondary_index_enabled = true;
public volatile int index_status_poll_interval_in_seconds = 30;

public volatile boolean uncompressed_tables_enabled = true;
public volatile boolean compact_tables_enabled = true;
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4710,6 +4710,16 @@ public static void setDefaultSecondaryIndexEnabled(boolean enabled)
conf.default_secondary_index_enabled = enabled;
}

public static int getIndexStatusPollInterval()
{
return conf.index_status_poll_interval_in_seconds;
}

public static void setIndexStatusPollInterval(int seconds)
{
conf.index_status_poll_interval_in_seconds = seconds;
}

public static boolean isTransientReplicationEnabled()
{
return conf.transient_replication_enabled;
Expand Down
233 changes: 224 additions & 9 deletions src/java/org/apache/cassandra/index/IndexStatusManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@

package org.apache.cassandra.index;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -36,6 +42,9 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.exceptions.ReadFailureException;
Expand All @@ -46,10 +55,12 @@
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JsonUtils;
Expand All @@ -70,6 +81,14 @@ public class IndexStatusManager

public static final IndexStatusManager instance = new IndexStatusManager();

private static final int MAX_GOSSIP_VALUE_SIZE = 65535;

public static final String TABLE_FALLBACK_MARKER = "TABLE_FALLBACK";

private volatile long lastPollTimestampMillis = 0;

private ScheduledFuture<?> pollFuture;

// executes index status propagation task asynchronously to avoid potential deadlock on SIM
private final ExecutorPlus statusPropagationExecutor = executorFactory().withJmxInternal()
.sequential("StatusPropagationExecutor");
Expand Down Expand Up @@ -163,7 +182,16 @@ public synchronized void receivePeerIndexStatus(InetAddressAndPort endpoint, Ver
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return;

Map<String, Index.Status> indexStatusMap = statusMapFromString(versionedValue);
Map<String, Index.Status> indexStatusMap;

if (versionedValue.value.equals(TABLE_FALLBACK_MARKER))
{
indexStatusMap = SystemDistributedKeyspace.allIndexStatusesForHost(StorageService.instance.getHostIdForEndpoint(endpoint));
}
else
{
indexStatusMap = statusMapFromString(versionedValue);
}

Map<String, Index.Status> oldStatus = peerIndexStatus.put(endpoint, indexStatusMap);
Map<String, Index.Status> updated = updatedIndexStatuses(oldStatus, indexStatusMap);
Expand Down Expand Up @@ -229,29 +257,65 @@ public synchronized void propagateLocalIndexStatus(String keyspace, String index
Map<String, Index.Status> statusMap = peerIndexStatus.computeIfAbsent(FBUtilities.getBroadcastAddressAndPort(),
k -> new HashMap<>());
String keyspaceIndex = identifier(keyspace, index);
UUID localHostId = StorageService.instance.getLocalHostUUID();
CassandraVersion minVersion = ClusterMetadata.current().directory.clusterMinVersion.cassandraVersion;

if (status == Index.Status.DROPPED)
{
statusMap.remove(keyspaceIndex);
if (localHostId != null)
statusPropagationExecutor.submit(() -> {
if (shouldWriteToIndexTables(minVersion))
{
SystemDistributedKeyspace.setIndexRemoved(localHostId, keyspace, index);
SystemDistributedKeyspace.recordIndexEvent(localHostId, keyspace, index, status);
}
});
}
else
{
statusMap.put(keyspaceIndex, status);
if (localHostId != null)
statusPropagationExecutor.submit(() -> {
if (shouldWriteToIndexTables(minVersion))
{
SystemDistributedKeyspace.updateIndexStatus(localHostId, keyspace, index, status);
SystemDistributedKeyspace.recordIndexEvent(localHostId, keyspace, index, status);
}
});
}

// Don't try and propagate if the gossiper isn't enabled. This is primarily for tests where the
// Gossiper has not been started. If we attempt to propagate when not started an exception is
// logged and this causes a number of dtests to fail.
if (Gossiper.instance.isEnabled())
// Only propagate via gossip when the gossiper is enabled and the cluster is not fully on 6.0+.
// Once all nodes are on 6.0+, index status is propagated via table polling instead.
if (Gossiper.instance.isEnabled() && !shouldWriteToIndexTables(minVersion))
{
// Versions 5.0.0 through 5.0.2 use a much more bloated format that duplicates keyspace names
// and writes full status names instead of their numeric codes. If the minimum cluster version is
// unknown or one of those 3 versions, continue to propagate the old format.
CassandraVersion minVersion = ClusterMetadata.current().directory.clusterMinVersion.cassandraVersion;

String newSerializedStatusMap = shouldWriteLegacyStatusFormat(minVersion) ? JsonUtils.writeAsJsonString(statusMap)
String newSerializedStatusMap = shouldWriteLegacyStatusFormat(minVersion) ? JsonUtils.writeAsJsonString(statusMap)
: toSerializedFormat(statusMap);

byte[] utf8Bytes = newSerializedStatusMap.getBytes(StandardCharsets.UTF_8);
String gossipPayload;

if (utf8Bytes.length > MAX_GOSSIP_VALUE_SIZE)
{
logger.error("Index status gossip payload size ({} bytes) exceeds limit ({} bytes), please consider removing unwanted indexes.",
utf8Bytes.length, MAX_GOSSIP_VALUE_SIZE);
gossipPayload = TABLE_FALLBACK_MARKER;
}
else
{
if (utf8Bytes.length > MAX_GOSSIP_VALUE_SIZE * 0.8)
logger.warn("Index status gossip payload size ({} bytes) approaching the limit ({} bytes), please consider removing unwanted indexes.",
utf8Bytes.length, MAX_GOSSIP_VALUE_SIZE);
gossipPayload = newSerializedStatusMap;
}

statusPropagationExecutor.submit(() -> {
// schedule gossiper update asynchronously to avoid potential deadlock when another thread is holding
// gossiper taskLock.
VersionedValue value = StorageService.instance.valueFactory.indexStatus(newSerializedStatusMap);
VersionedValue value = StorageService.instance.valueFactory.indexStatus(gossipPayload);
Gossiper.instance.addLocalApplicationState(ApplicationState.INDEX_STATUS, value);
});
}
Expand All @@ -267,6 +331,29 @@ private static boolean shouldWriteLegacyStatusFormat(CassandraVersion minVersion
return minVersion == null || (minVersion.major == 5 && minVersion.minor == 0 && minVersion.patch < 3);
}

private static boolean shouldWriteToIndexTables(CassandraVersion minVersion)
{
return minVersion != null && (minVersion.major >= 6);
}

@VisibleForTesting
static boolean shouldWriteToIndexTablesForTesting(CassandraVersion minVersion)
{
return shouldWriteToIndexTables(minVersion);
}

@VisibleForTesting
void processEventsForTesting(UntypedResultSet results)
{
processEvents(results);
}

@VisibleForTesting
void resetLastPollTimestamp()
{
lastPollTimestampMillis = 0;
}

/**
* Serializes as a JSON string the status of the indexes in the provided map.
* <p>
Expand Down Expand Up @@ -340,8 +427,136 @@ private String identifier(String keyspace, String index)
return keyspace + '.' + index;
}

/**
* Load index statuses from the system_distributed.index_build_status table on startup
* so that index statuses are known before gossip starts.
*/
public synchronized void loadIndexStatusesFromTable()
{
try
{
Map<UUID, Map<String, Index.Status>> allStatuses = SystemDistributedKeyspace.allIndexStatuses();
for (Map.Entry<UUID, Map<String, Index.Status>> entry : allStatuses.entrySet())
{
InetAddressAndPort endpoint = StorageService.instance.getEndpointForHostId(entry.getKey());
if (endpoint == null)
continue;

peerIndexStatus.putIfAbsent(endpoint, entry.getValue());
}
logger.info("Loaded index statuses from system table for {} peers", allStatuses.size());
}
catch (Exception e)
{
logger.warn("Unable to load index statuses from system table: {}", e.getMessage());
}
}

/**
* Refresh index statuses from the full table, overwriting any existing data.
*/
public synchronized void refreshFromFullTable()
{
try
{
Map<UUID, Map<String, Index.Status>> allStatuses = SystemDistributedKeyspace.allIndexStatuses();
for (Map.Entry<UUID, Map<String, Index.Status>> entry : allStatuses.entrySet())
{
InetAddressAndPort endpoint = StorageService.instance.getEndpointForHostId(entry.getKey());
if (endpoint == null)
continue;

peerIndexStatus.put(endpoint, entry.getValue());
}
logger.info("Refreshed index statuses from system table for {} peers", allStatuses.size());
}
catch (Exception e)
{
logger.warn("Unable to refresh index statuses from system table: {}", e.getMessage());
}
}

public void shutdownAndWait(long interval, TimeUnit unit) throws InterruptedException, TimeoutException
{
if (pollFuture != null)
pollFuture.cancel(false);

ExecutorUtils.shutdownAndWait(interval, unit, statusPropagationExecutor);
}

public void startPolling()
{
int intervalSeconds = DatabaseDescriptor.getIndexStatusPollInterval();
if (intervalSeconds <= 0)
return;

pollFuture = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(
this::pollIndexEvents,
intervalSeconds,
intervalSeconds,
TimeUnit.SECONDS);
}

private synchronized void pollIndexEvents()
{
if (lastPollTimestampMillis == 0)
{
refreshFromFullTable();
lastPollTimestampMillis = Clock.Global.currentTimeMillis();
}
else
{
try
{
String today = LocalDate.now(ZoneOffset.UTC).toString();
String lastPollDate = Instant.ofEpochMilli(lastPollTimestampMillis).atZone(ZoneOffset.UTC).toLocalDate().toString();

if (!today.equals(lastPollDate))
{
// midnight crossed so get remaining events from last day.
UntypedResultSet yesterdayResults = SystemDistributedKeyspace.queryIndexEvents(lastPollDate, lastPollTimestampMillis);
if (yesterdayResults != null)
processEvents(yesterdayResults);
}

UntypedResultSet todayResults = SystemDistributedKeyspace.queryIndexEvents(today, lastPollTimestampMillis);
if (todayResults != null)
processEvents(todayResults);

lastPollTimestampMillis = Clock.Global.currentTimeMillis();
}
catch (Exception e)
{
logger.warn("Unable to load index events from system table: {}", e.getMessage());
}
}
}

private void processEvents(UntypedResultSet results)
{
for (UntypedResultSet.Row row : results)
{
UUID hostId = row.getUUID("host_id");
if ((hostId == null) || hostId.equals(StorageService.instance.getLocalHostUUID()))
continue;

InetAddressAndPort endpoint = StorageService.instance.getEndpointForHostId(hostId);
if (endpoint == null)
continue;

String indexName = row.getString("index_name");
Index.Status status = Index.Status.valueOf(row.getString("event"));

if (status == Index.Status.DROPPED)
{
Map<String, Index.Status> statusMap = peerIndexStatus.get(endpoint);
if (statusMap != null)
statusMap.remove(indexName);
}
else
{
peerIndexStatus.computeIfAbsent(endpoint, k -> new HashMap<>()).put(indexName, status);
}
}
}
}
Loading