Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
*/
package org.apache.accumulo.core.logging;

import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.Marker;
import org.slf4j.event.Level;
import org.slf4j.helpers.AbstractLogger;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand All @@ -37,9 +36,7 @@
* Logger that wraps another Logger and only emits a log message once per the supplied duration.
*
*/
public abstract class ConditionalLogger extends AbstractLogger {

private static final long serialVersionUID = 1L;
public abstract class ConditionalLogger {

/**
* A Logger implementation that will log a message at the supplied elevated level if it has not
Expand All @@ -49,30 +46,37 @@ public abstract class ConditionalLogger extends AbstractLogger {
*/
public static class EscalatingLogger extends DeduplicatingLogger {

private static final long serialVersionUID = 1L;
private final Level elevatedLevel;
private final ConditionalLogAction elevatedLogAction;

public EscalatingLogger(Logger log, Duration threshold, long maxCachedLogMessages,
Level elevatedLevel) {
ConditionalLogAction elevatedLogAction) {
super(log, threshold, maxCachedLogMessages);
this.elevatedLevel = elevatedLevel;
this.elevatedLogAction = requireNonNull(elevatedLogAction);
}

@Override
protected void handleNormalizedLoggingCall(Level level, Marker marker, String messagePattern,
Object[] arguments, Throwable throwable) {
public void trace(String format, Object... arguments) {
log(elevatedLogAction, Logger::trace, format, arguments);
}

if (arguments == null) {
arguments = new Object[0];
}
if (!condition.apply(messagePattern, Arrays.asList(arguments))) {
delegate.atLevel(level).addMarker(marker).setCause(throwable).log(messagePattern,
arguments);
} else {
delegate.atLevel(elevatedLevel).addMarker(marker).setCause(throwable).log(messagePattern,
arguments);
}
@Override
public void debug(String format, Object... arguments) {
log(elevatedLogAction, Logger::debug, format, arguments);
}

@Override
public void info(String format, Object... arguments) {
log(elevatedLogAction, Logger::info, format, arguments);
}

@Override
public void warn(String format, Object... arguments) {
log(elevatedLogAction, Logger::warn, format, arguments);
}

@Override
public void error(String format, Object... arguments) {
log(elevatedLogAction, Logger::error, format, arguments);
}

}
Expand All @@ -82,25 +86,29 @@ protected void handleNormalizedLoggingCall(Level level, Marker marker, String me
*/
public static class DeduplicatingLogger extends ConditionalLogger {

private static final long serialVersionUID = 1L;

public DeduplicatingLogger(Logger log, Duration threshold, long maxCachedLogMessages) {
super(log, new BiFunction<>() {
super(log, new BiPredicate<>() {

private final Cache<Pair<String,List<Object>>,Boolean> cache = Caffeine.newBuilder()
.expireAfterWrite(threshold).maximumSize(maxCachedLogMessages).build();

private final ConcurrentMap<Pair<String,List<Object>>,Boolean> cacheMap = cache.asMap();

/**
* Function that will return true if the message has not been seen in the supplied duration.
* Function that will return true if the message with the provided arguments (minus any
* included Throwable as the last argument) has not been seen in the supplied duration.
* Deduplication will only work if the arguments are of types that implement meaningful
* equals. This is not generally true of Throwables.
*
* @param msg log message
* @param args log message arguments
* @return true if message has not been seen in duration, else false.
*/
@Override
public Boolean apply(String msg, List<Object> args) {
public boolean test(String msg, List<Object> args) {
if (!args.isEmpty() && args.get(args.size() - 1) instanceof Throwable) {
args = args.subList(0, args.size() - 1);
}
return cacheMap.putIfAbsent(new Pair<>(msg, args), true) == null;
}

Expand All @@ -110,85 +118,60 @@ public Boolean apply(String msg, List<Object> args) {
}

protected final Logger delegate;
protected final BiFunction<String,List<Object>,Boolean> condition;

protected ConditionalLogger(Logger log, BiFunction<String,List<Object>,Boolean> condition) {
// this.delegate = new DelegateWrapper(log);
this.delegate = log;
this.condition = condition;
}

@Override
public boolean isTraceEnabled() {
return this.delegate.isTraceEnabled();
}

@Override
public boolean isTraceEnabled(Marker marker) {
return this.delegate.isTraceEnabled(marker);
}

@Override
public boolean isDebugEnabled() {
return this.delegate.isDebugEnabled();
}

@Override
public boolean isDebugEnabled(Marker marker) {
return this.delegate.isDebugEnabled(marker);
}
protected final BiPredicate<String,List<Object>> condition;

@Override
public boolean isInfoEnabled() {
return this.delegate.isInfoEnabled();
protected ConditionalLogger(Logger log, BiPredicate<String,List<Object>> condition) {
this.delegate = requireNonNull(log);
this.condition = requireNonNull(condition);
}

@Override
public boolean isInfoEnabled(Marker marker) {
return this.delegate.isInfoEnabled(marker);
@FunctionalInterface
public interface ConditionalLogAction {
void log(Logger logger, String format, Object... arguments);
}

@Override
public boolean isWarnEnabled() {
return this.delegate.isWarnEnabled();
}

@Override
public boolean isWarnEnabled(Marker marker) {
return this.delegate.isWarnEnabled(marker);
/**
* Conditionally executes the log action with the provided format string and arguments
*
* @param conditionTrueLogAction the log action to execute (e.g. Logger::warn, Logger::debug,
* etc.) when the condition is true (optional, may be null)
* @param conditionFalseLogAction the log action to execute (e.g. Logger::warn, Logger::debug,
* etc.) when the condition is false (optional, may be null)
* @param format the message format String for the logger
* @param arguments the arguments to the format String
*/
protected final void log(ConditionalLogAction conditionTrueLogAction,
ConditionalLogAction conditionFalseLogAction, String format, Object... arguments) {
if (arguments == null) {
arguments = new Object[0];
}
if (condition.test(format, Arrays.asList(arguments))) {
if (conditionTrueLogAction != null) {
conditionTrueLogAction.log(delegate, format, arguments);
}
} else if (conditionFalseLogAction != null) {
conditionFalseLogAction.log(delegate, format, arguments);
}
}

@Override
public boolean isErrorEnabled() {
return this.delegate.isErrorEnabled();
public void trace(String format, Object... arguments) {
log(Logger::trace, null, format, arguments);
}

@Override
public boolean isErrorEnabled(Marker marker) {
return this.delegate.isErrorEnabled(marker);
public void debug(String format, Object... arguments) {
log(Logger::debug, null, format, arguments);
}

@Override
public String getName() {
return this.delegate.getName();
public void info(String format, Object... arguments) {
log(Logger::info, null, format, arguments);
}

@Override
protected String getFullyQualifiedCallerName() {
return this.delegate.getName();
public void warn(String format, Object... arguments) {
log(Logger::warn, null, format, arguments);
}

@Override
protected void handleNormalizedLoggingCall(Level level, Marker marker, String messagePattern,
Object[] arguments, Throwable throwable) {

if (arguments == null) {
arguments = new Object[0];
}
if (condition.apply(messagePattern, Arrays.asList(arguments))) {
delegate.atLevel(level).addMarker(marker).setCause(throwable).log(messagePattern, arguments);
}

public void error(String format, Object... arguments) {
log(Logger::error, null, format, arguments);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
Expand Down Expand Up @@ -102,8 +101,8 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer {
private static final String PROP_PREFIX = Property.TABLE_ARBITRARY_PROP_PREFIX.getKey();

private static final Logger LOG = LoggerFactory.getLogger(HostRegexTableLoadBalancer.class);
private static final Logger MIGRATIONS_LOGGER =
new EscalatingLogger(LOG, Duration.ofMinutes(5), 1000, Level.WARN);
private static final EscalatingLogger MIGRATIONS_LOGGER =
new EscalatingLogger(LOG, Duration.ofMinutes(5), 1000, Logger::warn);
public static final String HOST_BALANCER_PREFIX = PROP_PREFIX + "balancer.host.regex.";
public static final String HOST_BALANCER_OOB_CHECK_KEY =
PROP_PREFIX + "balancer.host.regex.oob.period";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
public class DeduplicatingLoggerTest {

private static final Logger LOG = LoggerFactory.getLogger(DeduplicatingLoggerTest.class);
private static final Logger TEST_LOGGER =
private static final DeduplicatingLogger TEST_LOGGER =
new DeduplicatingLogger(LOG, Duration.ofMinutes(1), 100);

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class EscalatingLoggerTest {

private static final Logger LOG = LoggerFactory.getLogger(EscalatingLoggerTest.class);
private static final Logger TEST_LOGGER =
new EscalatingLogger(LOG, Duration.ofSeconds(3), 100, Level.WARN);
private static final EscalatingLogger TEST_LOGGER =
new EscalatingLogger(LOG, Duration.ofSeconds(3), 100, Logger::warn);

@Test
public void test() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.logging.ConditionalLogger.ConditionalLogAction;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
Expand Down Expand Up @@ -96,7 +97,6 @@
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand Down Expand Up @@ -705,16 +705,13 @@ private <T> void printStats(String logPrefix, ConcurrentHashMap<T,FailureCounts>
for (var key : failureCounts.keySet()) {
failureCounts.compute(key, (k, counts) -> {
if (counts != null) {
Level level;
ConditionalLogAction logAction = Logger::debug;
if (counts.failures > 0) {
level = Level.WARN;
logAction = Logger::warn;
} else if (logSuccessAtTrace) {
level = Level.TRACE;
} else {
level = Level.DEBUG;
logAction = Logger::trace;
}

LOG.atLevel(level).log("{} {} failures:{} successes:{} since last time this was logged ",
logAction.log(LOG, "{} {} failures:{} successes:{} since last time this was logged ",
logPrefix, k, counts.failures, counts.successes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.event.Level;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedSet;
Expand All @@ -115,8 +114,8 @@

abstract class TabletGroupWatcher extends AccumuloDaemonThread {

private static final Logger TABLET_UNLOAD_LOGGER =
new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, Level.INFO);
private static final EscalatingLogger TABLET_UNLOAD_LOGGER =
new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, Logger::info);
private final Manager manager;
private final TabletStateStore store;
private final TabletGroupWatcher dependentWatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
*/
public class Tablet extends TabletBase {
private static final Logger log = LoggerFactory.getLogger(Tablet.class);
private static final Logger CLOSING_STUCK_LOGGER =
private static final DeduplicatingLogger CLOSING_STUCK_LOGGER =
new DeduplicatingLogger(log, Duration.ofMinutes(5), 1000);

private final TabletServer tabletServer;
Expand Down