@@ -0,0 +1,9 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: ShardTerms can now induce a leader election if needed
type: other # added, changed, fixed, deprecated, removed, dependency_update, security, other
authors:
- name: Houston Putman
nick: HoustonPutman
links:
- name: SOLR-18080
url: https://issues.apache.org/jira/browse/SOLR-18080
@@ -19,6 +19,7 @@

import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
@@ -52,15 +53,55 @@ public boolean onTermChanged(ShardTerms terms) {
if (solrCore.getCoreDescriptor() == null
|| solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
if (terms.haveHighestTermValue(coreNodeName)) return true;
if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
log.info(
"Start recovery on {} because core's term is less than leader's term", coreNodeName);
lastTermDoRecovery.set(terms.getTerm(coreNodeName));
solrCore
.getUpdateHandler()
.getSolrCoreState()
.doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());

// If we have the highest term, there is nothing to do
if (terms.haveHighestTermValue(coreNodeName)) {
return true;
}

long lastRecoveryTerm;
long newTerm;
synchronized (lastTermDoRecovery) {
lastRecoveryTerm = lastTermDoRecovery.get();
newTerm = terms.getTerm(coreNodeName);
if (lastRecoveryTerm < newTerm) {
lastTermDoRecovery.set(newTerm);
Contributor:

lastTermDoRecovery is set here, but it's possible recovery is deferred below because of the leader election now. Is that right? The old logic set it and then actually did recovery regardless. Reading this, it seems like lastTermDoRecovery can be set to the new term while actually doing recovery is skipped further down. So, if recovery is skipped, the term this was set to is incorrect based on the name?

Contributor Author:

Yeah, lastTermDoRecovery might be a bad name, but after leader election, recovery is guaranteed for these replicas at this term value. So while recovery is not explicitly being done here, we know that leader election will do the recovery. So lastTermDoRecovery is still technically correct, just assuming the leader election succeeds.
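To make the guarantee discussed above concrete, here is a toy sketch of the decision flow. It is only an illustration, not the Solr code: the class and its parameters are hypothetical, Solr types are replaced with primitives, and it assumes (as the author does) that giving up leadership leads to recovery via the ensuing election.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of the onTermChanged decision discussed above; not the actual Solr code. */
final class TermChangeSketch {
  private final AtomicLong lastTermDoRecovery = new AtomicLong(-1);

  /**
   * @param myTerm this replica's term from ShardTerms
   * @param haveHighestTerm whether this replica already holds the highest term
   * @param isLeader whether this replica is currently the shard leader
   * @param leaderStillHighest whether the current leader still holds the highest term
   * @return a description of the action taken
   */
  String onTermChanged(
      long myTerm, boolean haveHighestTerm, boolean isLeader, boolean leaderStillHighest) {
    if (haveHighestTerm) {
      return "nothing to do";
    }
    long lastRecoveryTerm = lastTermDoRecovery.get();
    if (lastRecoveryTerm < myTerm) {
      // Recorded up front, even if recovery is deferred to the election below.
      lastTermDoRecovery.set(myTerm);
    }
    if (isLeader) {
      // Giving up leadership triggers an election, and that path is expected to
      // put this core into recovery, so recovery is not started explicitly here.
      return "give up leadership; recovery happens via the election";
    } else if (lastRecoveryTerm < myTerm && leaderStillHighest) {
      return "start recovery from the current leader";
    }
    return "defer recovery until the leader election completes";
  }
}
```

In every branch the recovery that eventually runs targets the recorded term (or a later one), which is the author's argument for the field name still being accurate.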

}
}

if (coreDescriptor.getCloudDescriptor().isLeader()) {
log.warn(
"Removing {} leader as leader, since its term is no longer the highest. This will initiate recovery",
coreNodeName);
coreContainer.getZkController().giveupLeadership(coreDescriptor);
} else if (lastRecoveryTerm < newTerm) {
CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
Replica leaderReplica =
solrCore
.getCoreContainer()
.getZkController()
.getClusterState()
.getCollection(cloudDescriptor.getCollectionName())
.getSlice(cloudDescriptor.getShardId())
.getLeader();

// Only recover if the leader replica still has the highest term.
// If not, then the leader-election process will take care of recovery.
if (leaderReplica != null && terms.canBecomeLeader(leaderReplica.getName())) {
log.info(
"Start recovery on {} because core's term is less than leader's term", coreNodeName);
solrCore
.getUpdateHandler()
.getSolrCoreState()
.doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
} else {
if (log.isInfoEnabled()) {
log.info(
"Defer recovery on {} because leader-election will happen soon, old leader: {}",
coreNodeName,
leaderReplica == null ? null : leaderReplica.getName());
}
}
}
} catch (Exception e) {
if (log.isInfoEnabled()) {
@@ -205,41 +205,13 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
}
}

PeerSync.PeerSyncResult result = null;
boolean success = false;
try {
result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
PeerSync.PeerSyncResult result =
syncStrategy.sync(zkController, core, leaderProps, weAreReplacement, true);
success = result.isSuccess();
} catch (Exception e) {
log.error("Exception while trying to sync", e);
result = PeerSync.PeerSyncResult.failure();
}

UpdateLog ulog = core.getUpdateHandler().getUpdateLog();

if (!success) {
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
}
}

if (!hasRecentUpdates) {
// we failed sync, but we have no versions - we can't sync in that case
// - we were active
// before, so become leader anyway if no one else has any versions either
if (result.getOtherHasVersions().orElse(false)) {
log.info(
"We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
success = false;
} else {
log.info(
"We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
}
}
}

// solrcloud_debug
@@ -250,7 +222,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
try {
if (log.isDebugEnabled()) {
log.debug(
"{} synched {}",
"{} synced {}",
core.getCoreContainer().getZkController().getNodeName(),
searcher.count(new MatchAllDocsQuery()));
}
@@ -507,12 +479,10 @@ private void rejoinLeaderElection(SolrCore core) throws InterruptedException, Ke
return;
}

log.info("There may be a better leader candidate than us - going back into recovery");
log.info("There may be a better leader candidate than us - rejoining the election");

cancelElection();

core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());

leaderElector.joinElection(this, true);
}
}
@@ -19,6 +19,8 @@

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.solr.cloud.overseer.OverseerAction;
@@ -182,8 +184,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
.getClusterState()
.getCollection(collection)
.getSlice(shardId)
.getReplicas()
.size()
.getNumLeaderReplicas()
< 2) {
Replica leader = zkStateReader.getLeader(collection, shardId);
if (leader != null
@@ -239,6 +240,18 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
prs)
.persist(coll.getZNode(), zkStateReader.getZkClient());
}
try {
zkStateReader.waitForState(
collection,
10,
TimeUnit.SECONDS,
state -> {
Replica leader = state.getLeader(shardId);
return leader != null && id.equals(leader.getName());
});
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}

22 changes: 22 additions & 0 deletions solr/core/src/java/org/apache/solr/cloud/ShardTerms.java
@@ -124,6 +124,28 @@ private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecove
return replicasNeedingRecovery.contains(key);
}

public ShardTerms setHighestTerms(Set<String> highestTermKeys) {
long newMaxTerm = maxTerm + 1;
boolean keyFound = false;
HashMap<String, Long> newValues = new HashMap<>(values);
long nextHighestTerm = -1;
for (String key : values.keySet()) {
if (highestTermKeys.contains(key)) {
newValues.put(key, newMaxTerm);
keyFound = true;
} else {
nextHighestTerm = Math.max(nextHighestTerm, values.get(key));
}
}
// We only want to update if increasing the maxTerm makes an impact.
// If the nextHighestTerm is already < maxTerm, then upping the maxTerm doesn't do anything.
if (nextHighestTerm == maxTerm && keyFound) {
return new ShardTerms(newValues, version);
} else {
return null;
}
}

/**
* Return a new {@link ShardTerms} in which the highest terms are not zero
*
85 changes: 62 additions & 23 deletions solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -28,6 +28,7 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -40,6 +41,7 @@
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,14 +81,15 @@ private static class ShardCoreRequest extends ShardRequest {

public PeerSync.PeerSyncResult sync(
ZkController zkController, SolrCore core, ZkNodeProps leaderProps) {
return sync(zkController, core, leaderProps, false);
return sync(zkController, core, leaderProps, false, false);
}

public PeerSync.PeerSyncResult sync(
ZkController zkController,
SolrCore core,
ZkNodeProps leaderProps,
boolean peerSyncOnlyWithActive) {
boolean peerSyncOnlyWithActive,
boolean ignoreNoVersionsFailure) {
if (SKIP_AUTO_RECOVERY) {
return PeerSync.PeerSyncResult.success();
}
@@ -102,19 +105,16 @@ public PeerSync.PeerSyncResult sync(
log.info("Sync replicas to {}", ZkCoreNodeProps.getCoreUrl(leaderProps));
}

if (core.getUpdateHandler().getUpdateLog() == null) {
log.error("No UpdateLog found - cannot sync");
return PeerSync.PeerSyncResult.failure();
}

return syncReplicas(zkController, core, leaderProps, peerSyncOnlyWithActive);
return syncReplicas(
zkController, core, leaderProps, peerSyncOnlyWithActive, ignoreNoVersionsFailure);
}

private PeerSync.PeerSyncResult syncReplicas(
ZkController zkController,
SolrCore core,
ZkNodeProps leaderProps,
boolean peerSyncOnlyWithActive) {
boolean peerSyncOnlyWithActive,
boolean ignoreNoVersionsFailure) {
if (isClosed) {
log.info("We have been closed, won't sync with replicas");
return PeerSync.PeerSyncResult.failure();
@@ -129,9 +129,32 @@ private PeerSync.PeerSyncResult syncReplicas(

// first sync ourselves - we are the potential leader after all
try {
result =
syncWithReplicas(
zkController, core, leaderProps, collection, shardId, peerSyncOnlyWithActive);
if (core.getUpdateHandler().getUpdateLog() == null) {
log.error("No UpdateLog found - cannot sync");
result = PeerSync.PeerSyncResult.failure();
} else {
result =
syncWithReplicas(
zkController, core, leaderProps, collection, shardId, peerSyncOnlyWithActive);
}

if (!result.isSuccess() && ignoreNoVersionsFailure) {
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
}
}
// we failed sync, but we have no versions - we can't sync in that case
// - we were active before, so continue if no one else has any versions either
if (!hasRecentUpdates && !result.getOtherHasVersions().orElse(false)) {
log.info(
"We failed sync, but we have no versions - we can't sync in that case - so continue");
result = PeerSync.PeerSyncResult.success();
}
}
success = result.isSuccess();
} catch (Exception e) {
log.error("Sync Failed", e);
@@ -173,7 +196,7 @@ private PeerSync.PeerSyncResult syncWithReplicas(
String shardId,
boolean peerSyncOnlyWithActive)
throws Exception {
List<ZkCoreNodeProps> nodes =
List<Replica> replicas =
zkController
.getZkStateReader()
.getReplicaProps(
@@ -186,13 +209,13 @@ private PeerSync.PeerSyncResult syncWithReplicas(
return PeerSync.PeerSyncResult.failure();
}

if (nodes == null) {
if (replicas == null) {
// I have no replicas
return PeerSync.PeerSyncResult.success();
}

List<String> syncWith = new ArrayList<>(nodes.size());
for (ZkCoreNodeProps node : nodes) {
List<String> syncWith = new ArrayList<>(replicas.size());
for (Replica node : replicas) {
syncWith.add(node.getCoreUrl());
}

@@ -230,32 +253,48 @@ private void syncToMe(

// sync everyone else
// TODO: we should do this in parallel at least
List<ZkCoreNodeProps> nodes =
List<Replica> replicas =
zkController
.getZkStateReader()
.getReplicaProps(collection, shardId, cd.getCloudDescriptor().getCoreNodeName());
if (nodes == null) {
if (replicas == null) {
if (log.isInfoEnabled()) {
log.info("{} has no replicas", ZkCoreNodeProps.getCoreUrl(leaderProps));
}
return;
}

ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
for (ZkCoreNodeProps node : nodes) {
ZkShardTerms shardTerms = zkController.getShardTerms(collection, shardId);
for (Replica replica : replicas) {
try {
if (shardTerms.registered(replica.getName())
&& !shardTerms.canBecomeLeader(replica.getName())) {
if (log.isInfoEnabled()) {
log.info(
"{}: do NOT ask {} to sync, as it is not of the same shardTerm. Issue a recovery instead.",
ZkCoreNodeProps.getCoreUrl(leaderProps),
replica.getCoreUrl());
}
RecoveryRequest rr = new RecoveryRequest();
rr.leaderProps = leaderProps;
rr.baseUrl = replica.getBaseUrl();
rr.coreName = replica.getCoreName();
recoveryRequests.add(rr);
continue;
}
if (log.isInfoEnabled()) {
log.info(
"{}: try and ask {} to sync",
ZkCoreNodeProps.getCoreUrl(leaderProps),
node.getCoreUrl());
replica.getCoreUrl());
}

requestSync(
node.getBaseUrl(),
node.getCoreUrl(),
replica.getBaseUrl(),
replica.getCoreUrl(),
zkLeader.getCoreUrl(),
node.getCoreName(),
replica.getCoreName(),
nUpdates);

} catch (Exception e) {