Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class TransportConfiguration implements Serializable {

private String name;

private String lockCoordinator;

private String factoryClassName = "null";

private Map<String, Object> params;
Expand Down Expand Up @@ -413,6 +415,15 @@ public void decode(final ActiveMQBuffer buffer) {
}
}

public String getLockCoordinator() {
return lockCoordinator;
}

public TransportConfiguration setLockCoordinator(String lockCoordinator) {
this.lockCoordinator = lockCoordinator;
return this;
}

private static String replaceWildcardChars(final String str) {
return str.replace('.', '-');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface DistributedLock extends AutoCloseable {

String getLockId();

// TODO: A better name for this method would be isLockValid
boolean isHeldByCaller() throws UnavailableStateException;

boolean tryLock() throws UnavailableStateException, InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.activemq.artemis.utils.ClassloadingUtil;

public interface DistributedLockManager extends AutoCloseable {

static DistributedLockManager newInstanceOf(String className, Map<String, String> properties) throws Exception {
return (DistributedLockManager) ClassloadingUtil.getInstanceForParamsWithTypeCheck(className,
DistributedLockManager.class,
DistributedLockManager.class.getClassLoader(),
new Class[]{Map.class},
properties);
DistributedLockManagerFactory factory = Registry.getInstance().getFactoryWithClassName(className);
if (factory == null) {
throw new IllegalArgumentException(className + " not found");
}
return factory.build(properties);
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.lockmanager;

import java.util.Map;
import java.util.Set;

public interface DistributedLockManagerFactory {
DistributedLockManager build(Map<String, String> properties);

String getName();

String getImplName();

default Map<String, String> validateParameters(Map<String, String> config) {
config.forEach((parameterName, ignore) -> validateParameter(parameterName));
return config;
}

default String getParameterListAsString() {
return String.join(", ", getValidParametersList());
}

Set<String> getValidParametersList();

default void validateParameter(String parameterName) {
Set<String> validList = getValidParametersList();
if (!validList.contains(parameterName)) {
throw new IllegalArgumentException("Invalid parameter '" + parameterName + "'. Accepted parameters: " + String.join(", ", validList));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.lockmanager;

import java.util.HashMap;
import java.util.ServiceLoader;

public class Registry {

private final HashMap<String, DistributedLockManagerFactory> factories = new HashMap<>();
private final HashMap<String, DistributedLockManagerFactory> factoriesWithImpl = new HashMap<>();

private volatile boolean serviceLoaded = false;

private static final Registry INSTANCE = new Registry();

private Registry() {
}

public static Registry getInstance() {
return INSTANCE;
}

public synchronized void register(DistributedLockManagerFactory factory) {
factories.put(factory.getName().toLowerCase(), factory);
factoriesWithImpl.put(factory.getImplName(), factory);
}

public synchronized void unregisterWithType(String type) {
unregister(factories.get(type.toLowerCase()));
}

public synchronized void unregisterWithClassName(String name) {
unregister(factoriesWithImpl.get(name));
}

private void unregister(DistributedLockManagerFactory factory) {
if (factory != null) {
factories.remove(factory.getName());
factoriesWithImpl.remove(factory.getImplName());
}
}

public synchronized DistributedLockManagerFactory getFactoryWithClassName(String className) {
checkService();
DistributedLockManagerFactory factory = factoriesWithImpl.get(className);
if (factory == null) {
throw new IllegalArgumentException("factory " + className + " not found");
}
return factory;
}

public synchronized DistributedLockManagerFactory getFactory(String type) {
checkService();
return factories.get(type.toLowerCase());
}

public synchronized void checkService() {
if (serviceLoaded) {
return;
}
ServiceLoader<DistributedLockManagerFactory> services = ServiceLoader.load(DistributedLockManagerFactory.class);
services.forEach(this::register);
serviceLoaded = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,13 @@
import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.activemq.artemis.lockmanager.UnavailableStateException;

/**
* This is an implementation suitable to be used just on unit tests and it won't attempt to manage nor purge existing
* stale locks files. It's part of the tests life-cycle to properly set-up and tear-down the environment.
*/
public class FileBasedLockManager implements DistributedLockManager {

private final File locksFolder;
private final Map<String, FileDistributedLock> locks;
private boolean started;

public FileBasedLockManager(Map<String, String> args) {
this(new File(args.get("locks-folder")));
}

public FileBasedLockManager(File locksFolder) {
FileBasedLockManager(File locksFolder) {
Objects.requireNonNull(locksFolder);
if (!locksFolder.exists()) {
throw new IllegalStateException(locksFolder + " is supposed to already exists");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.lockmanager.file;

import java.io.File;
import java.util.Map;
import java.util.Set;

import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory;

/**
* Factory for creating file-based distributed lock managers.
* <p>
* This implementation uses the file system to manage distributed locks
* <p>
* Valid configuration parameters:
* <ul>
* <li><b>locks-folder</b> (required): Path to the directory where lock files will be created and managed.
* The directory must be created in advance before using this lock manager.</li>
* </ul>
*/
public class FileBasedLockManagerFactory implements DistributedLockManagerFactory {

private static final String LOCK_FOLDER = "locks-folder";

private static final Set<String> VALID_PARAMS = Set.of(LOCK_FOLDER);

@Override
public String getName() {
return "file";
}

@Override
public DistributedLockManager build(Map<String, String> config) {
config = validateParameters(config);
String folder = config.get(LOCK_FOLDER);
if (folder == null) {
throw new IllegalArgumentException("folder not passed as a parameter");
}
return new FileBasedLockManager(new File(folder));
}

@Override
public Set<String> getValidParametersList() {
return VALID_PARAMS;
}

@Override
public String getImplName() {
return FileBasedLockManager.class.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public boolean isHeldByCaller() {
}

@Override
public boolean tryLock() {
public synchronized boolean tryLock() {
checkNotClosed();
final FileLock fileLock = this.fileLock;
if (fileLock != null) {
Expand All @@ -88,7 +88,7 @@ public boolean tryLock() {
}

@Override
public void unlock() {
public synchronized void unlock() {
checkNotClosed();
final FileLock fileLock = this.fileLock;
if (fileLock != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.activemq.artemis.lockmanager.DistributedLock;
import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
Expand All @@ -42,7 +39,6 @@
import org.apache.curator.utils.DebugUtils;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public class CuratorDistributedLockManager implements DistributedLockManager, ConnectionStateListener {

Expand Down Expand Up @@ -97,41 +93,6 @@ public int hashCode() {
}
}

private static final String CONNECT_STRING_PARAM = "connect-string";
private static final String NAMESPACE_PARAM = "namespace";
private static final String SESSION_MS_PARAM = "session-ms";
private static final String SESSION_PERCENT_PARAM = "session-percent";
private static final String CONNECTION_MS_PARAM = "connection-ms";
private static final String RETRIES_PARAM = "retries";
private static final String RETRIES_MS_PARAM = "retries-ms";
private static final Set<String> VALID_PARAMS = Stream.of(
CONNECT_STRING_PARAM,
NAMESPACE_PARAM,
SESSION_MS_PARAM,
SESSION_PERCENT_PARAM,
CONNECTION_MS_PARAM,
RETRIES_PARAM,
RETRIES_MS_PARAM).collect(Collectors.toSet());
private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(","));
// It's 9 times the default ZK tick time ie 2000 ms
private static final String DEFAULT_SESSION_TIMEOUT_MS = Integer.toString(18_000);
private static final String DEFAULT_CONNECTION_TIMEOUT_MS = Integer.toString(8_000);
private static final String DEFAULT_RETRIES = Integer.toString(1);
private static final String DEFAULT_RETRIES_MS = Integer.toString(1000);
// why 1/3 of the session? https://cwiki.apache.org/confluence/display/CURATOR/TN14
private static final String DEFAULT_SESSION_PERCENT = Integer.toString(33);

private static Map<String, String> validateParameters(Map<String, String> config) {
config.forEach((parameterName, ignore) -> validateParameter(parameterName));
return config;
}

private static void validateParameter(String parameterName) {
if (!VALID_PARAMS.contains(parameterName)) {
throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + VALID_PARAMS_ON_ERROR);
}
}

private CuratorFramework client;
private final Map<PrimitiveId, CuratorDistributedPrimitive> primitives;
private List<UnavailableManagerListener> listeners;
Expand All @@ -146,21 +107,7 @@ private static void validateParameter(String parameterName) {
}
}

public CuratorDistributedLockManager(Map<String, String> config) {
this(validateParameters(config), true);
}

private CuratorDistributedLockManager(Map<String, String> config, boolean ignore) {
this(config.get(CONNECT_STRING_PARAM),
config.get(NAMESPACE_PARAM),
Integer.parseInt(config.getOrDefault(SESSION_MS_PARAM, DEFAULT_SESSION_TIMEOUT_MS)),
Integer.parseInt(config.getOrDefault(SESSION_PERCENT_PARAM, DEFAULT_SESSION_PERCENT)),
Integer.parseInt(config.getOrDefault(CONNECTION_MS_PARAM, DEFAULT_CONNECTION_TIMEOUT_MS)),
Integer.parseInt(config.getOrDefault(RETRIES_PARAM, DEFAULT_RETRIES)),
Integer.parseInt(config.getOrDefault(RETRIES_MS_PARAM, DEFAULT_RETRIES_MS)));
}

private CuratorDistributedLockManager(String connectString,
CuratorDistributedLockManager(String connectString,
String namespace,
int sessionMs,
int sessionPercent,
Expand Down
Loading