Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

35 changes: 29 additions & 6 deletions core/src/main/thrift/compaction-coordinator.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ enum TCompactionState {
SUCCEEDED
# Compactor should set state to FAILED when compaction job fails, message should be mandatory
FAILED
# Compactor should set state to CANCELLED to acknowledge that it has stopped compacting
# Compactor should set state to CANCELLED to acknowledge that it has stopped compacting
CANCELLED
}

Expand Down Expand Up @@ -79,12 +79,15 @@ service CompactionCoordinatorService {
*/
void compactionCompleted(
1:client.TInfo tinfo
2:security.TCredentials credentials
2:security.TCredentials credentials
3:string externalCompactionId
4:data.TKeyExtent extent
5:tabletserver.TCompactionStats stats
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
* Called by Compactor to get the next compaction job
*/
Expand All @@ -94,8 +97,11 @@ service CompactionCoordinatorService {
3:string groupName
4:string compactor
5:string externalCompactionId
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
* Called by Compactor to update the Coordinator with the state of the compaction
*/
Expand All @@ -105,8 +111,11 @@ service CompactionCoordinatorService {
3:string externalCompactionId
4:TCompactionStatusUpdate status
5:i64 timestamp
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
* Called by Compactor on unsuccessful completion of compaction job
*/
Expand All @@ -117,6 +126,9 @@ service CompactionCoordinatorService {
4:data.TKeyExtent extent
5:string exceptionClassName
6:TCompactionState failureState
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
Expand All @@ -125,6 +137,9 @@ service CompactionCoordinatorService {
TExternalCompactionMap getRunningCompactions(
1:client.TInfo tinfo
2:security.TCredentials credentials
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
Expand All @@ -134,6 +149,9 @@ service CompactionCoordinatorService {
map<string,TExternalCompactionList> getLongRunningCompactions(
1:client.TInfo tinfo
2:security.TCredentials credentials
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
Expand All @@ -142,14 +160,19 @@ service CompactionCoordinatorService {
TExternalCompactionMap getCompletedCompactions(
1:client.TInfo tinfo
2:security.TCredentials credentials
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

void cancel(
1:client.TInfo tinfo
2:security.TCredentials credentials
3:string externalCompactionId
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

}

service CompactorService {
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/thrift/manager.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ service FateService {
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

}

service ManagerClientService {
Expand Down Expand Up @@ -348,7 +348,7 @@ service ManagerClientService {
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

void tabletServerStopping(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand All @@ -369,7 +369,7 @@ service ManagerClientService {
2:client.ThriftNotActiveServiceException tnase
3:ThriftPropertyException tpe
)

void modifySystemProperties(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand Down Expand Up @@ -398,17 +398,17 @@ service ManagerClientService {
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

void removeResourceGroupNode(
1:client.TInfo tinfo
2:security.TCredentials credentials
3:string resourceGroup
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
3:client.ThriftResourceGroupNotExistsException rgne
3:client.ThriftResourceGroupNotExistsException rgne
)

void setResourceGroupProperty(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand All @@ -421,7 +421,7 @@ service ManagerClientService {
3:ThriftPropertyException tpe
4:client.ThriftResourceGroupNotExistsException rgne
)

void modifyResourceGroupProperties(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand Down Expand Up @@ -496,6 +496,7 @@ service ManagerClientService {
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftTableOperationException toe
3:client.ThriftNotActiveServiceException tnase
)

list<data.TKeyExtent> updateTabletMergeability(
Expand All @@ -506,12 +507,14 @@ service ManagerClientService {
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftTableOperationException toe
3:client.ThriftNotActiveServiceException tnase
)

i64 getManagerTimeNanos(
1:client.TInfo tinfo
2:security.TCredentials credentials
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,17 +347,13 @@ protected void updateAdvertiseAddress(HostAndPort thriftBindAddress) {
* advertise address based on the address to which the ThriftServer is bound
*
* @param supplier ThriftServer
* @param start true to start the server, else false
* @throws UnknownHostException thrown from ThriftServer when binding to bad address
*/
protected void updateThriftServer(ThriftServerSupplier supplier, boolean start)
throws UnknownHostException {
protected void updateThriftServer(ThriftServerSupplier supplier) throws UnknownHostException {
thriftServer = supplier.get();
if (start) {
thriftServer.startThriftServer("Thrift Client Server");
log.info("Starting {} Thrift server, listening on {}", this.getClass().getSimpleName(),
thriftServer.address);
}
thriftServer.startThriftServer("Thrift Client Server");
log.info("Starting {} Thrift server, listening on {}", this.getClass().getSimpleName(),
thriftServer.address);
updateAdvertiseAddress(thriftServer.address);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server;

/**
* This interface allows service implementations which support running multiple instances
* concurrently with only one active instance to report whether or not they are the active service.
*/
public interface HighlyAvailableService {

/**
* Is this service instance currently the active instance for the Accumulo cluster.
*
* @return True if the service is the active service, false otherwise.
*/
boolean isActiveService();

/**
* Is this service instance currently in the process of upgrading.
*
* @return True if the service is upgrading, false otherwise.
*/
default boolean isUpgrading() {
return false;
}

/**
* Get the name of the service
*
* @return service name
*/
default String getServiceName() {
return this.getClass().getSimpleName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;

import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
import org.apache.accumulo.server.HighlyAvailableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An {@link InvocationHandler} which checks to see if a {@link HighlyAvailableService} is the
* current active instance of that service, throwing {@link ThriftNotActiveServiceException} when it
* is not the current active instance.
*/
public class HighlyAvailableServiceInvocationHandler<I> implements InvocationHandler {
private static final Logger LOG =
LoggerFactory.getLogger(HighlyAvailableServiceInvocationHandler.class);

private final I instance;
private final HighlyAvailableService service;
private final Set<String> onewayMethods;

public HighlyAvailableServiceInvocationHandler(I instance, HighlyAvailableService service,
Set<String> onewayMethods) {
this.instance = Objects.requireNonNull(instance);
this.service = Objects.requireNonNull(service);
this.onewayMethods = onewayMethods;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

// If the service is upgrading, throw an exception
if (service.isUpgrading()) {
if (onewayMethods.contains(method.getName())) {
// if thrift one way method throws an exception it will just log an error
LOG.debug("Ignoring one way thrift call during upgrade : {} {}", method.getName(),
Arrays.asList(args));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to return on this line, or it will fall through and invoke the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, that would have spammed the logs w/ errors. Fixed in d5670e6

return null;
} else {
LOG.trace("Service can not be accessed while it is upgrading.");
throw new ThriftNotActiveServiceException(service.getServiceName(),
"Service can not be accessed while it is upgrading");
}
}

// If the service is not active, throw an exception
if (!service.isActiveService()) {
if (onewayMethods.contains(method.getName())) {
// if thrift one way method throws an exception it will just log an error
LOG.debug("Ignoring one way thrift call because not active : {} {}", method.getName(),
Arrays.asList(args));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to return on this line, or it will fall through and invoke the method.

return null;
} else {
LOG.trace("Denying access to RPC service as this instance is not the active instance.");
throw new ThriftNotActiveServiceException(service.getServiceName(),
"Denying access to RPC service as this instance is not the active instance");
}
}
try {
// Otherwise, call the real method
return method.invoke(instance, args);
} catch (InvocationTargetException ex) {
throw ex.getCause();
}
}
}
Loading