Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
<projectEmail>https://github.com/Commonjava/weft</projectEmail>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<plugin.jacoco.skip>false</plugin.jacoco.skip>
<o11yphantVersion>1.9.2</o11yphantVersion>
<weldVersion>5.1.7.Final</weldVersion>
<jakarta.annotation.version>3.0.0</jakarta.annotation.version>
<jakarta.inject.version>2.0.1</jakarta.inject.version>
Expand All @@ -60,20 +59,10 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.commonjava.util</groupId>
<artifactId>o11yphant-metrics-api</artifactId>
<version>${o11yphantVersion}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- metrics support-->
<dependency>
<groupId>org.commonjava.util</groupId>
<artifactId>o11yphant-metrics-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package org.commonjava.cdi.util.weft;

import org.commonjava.cdi.util.weft.exception.PoolOverloadException;
import org.commonjava.o11yphant.metrics.api.MetricRegistry;
import org.commonjava.o11yphant.metrics.api.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,18 +38,12 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.commonjava.o11yphant.metrics.util.NameUtils.name;

/**
* Created by jdcasey on 1/3/17.
*/
public class PoolWeftExecutorService
implements WeftExecutorService, ScheduledExecutorService
{
private static final String TIMER = "timer";

private static final String METER = "meter";

private static final int DEFAULT_THREAD_COUNT = 2;

private static final float DEFAULT_LOAD_FACTOR = 10f;
Expand All @@ -68,41 +60,33 @@ public class PoolWeftExecutorService

private final boolean loadSensitive;

private final MetricRegistry metricRegistry;

private final String metricPrefix;

private Set<ThreadContextualizer> contextualizers;

private final AtomicLong load = new AtomicLong( 0L );


public PoolWeftExecutorService( String name, ThreadPoolExecutor delegate )
{
this( name, delegate, DEFAULT_THREAD_COUNT, DEFAULT_LOAD_FACTOR, DEFAULT_LOAD_SENSITIVE, null, null,
this( name, delegate, DEFAULT_THREAD_COUNT, DEFAULT_LOAD_FACTOR, DEFAULT_LOAD_SENSITIVE,
Collections.emptySet() );
}

public PoolWeftExecutorService( final String name, ThreadPoolExecutor delegate, final Integer threadCount,
final Float maxLoadFactor, boolean loadSensitive,
final MetricRegistry metricRegistry, final String metricPrefix )
final Float maxLoadFactor, boolean loadSensitive )
{
this( name, delegate, threadCount, maxLoadFactor, loadSensitive, metricRegistry, metricPrefix,
this( name, delegate, threadCount, maxLoadFactor, loadSensitive,
Collections.emptySet() );
}

public PoolWeftExecutorService( final String name, ThreadPoolExecutor delegate, final Integer threadCount,
final Float maxLoadFactor, boolean loadSensitive,
final MetricRegistry metricRegistry, final String metricPrefix,
Iterable<ThreadContextualizer> contextualizers )
{
this.name = name;
this.delegate = delegate;
this.threadCount = threadCount;
this.maxLoadFactor = maxLoadFactor;
this.loadSensitive = loadSensitive;
this.metricRegistry = metricRegistry;
this.metricPrefix = metricPrefix;
this.contextualizers = new HashSet<>();
contextualizers.forEach( c -> this.contextualizers.add( c ) );
}
Expand Down Expand Up @@ -175,7 +159,7 @@ private void verifyLoad()
throw new PoolOverloadException( getName(), getLoadFactor(), getCurrentLoad(), maxLoadFactor, getThreadCount() );
}
}

@Override
public <T> Future<T> submit( Callable<T> callable )
{
Expand Down Expand Up @@ -313,52 +297,6 @@ private <T> ScheduledFuture<T> asScheduled( Function<ScheduledExecutorService, S
}
}

private <T> Callable<T> timeCallable( Callable<T> callable )
{
return (Callable<T>) ()->{
if( metricRegistry != null )
{
metricRegistry.meter( name( metricPrefix, "call", METER ) ).mark();
Timer.Context context = metricRegistry.timer( name( metricPrefix, "call", TIMER ) ).time();
try
{
return callable.call();
}
finally
{
context.stop();
}
}
else
{
return callable.call();
}
};
}

private Runnable timeRunnable( Runnable runnable )
{
return ()->{
if( metricRegistry != null )
{
metricRegistry.meter( name( metricPrefix, "run", METER ) ).mark();
Timer.Context context = metricRegistry.timer( name( metricPrefix, "run", TIMER ) ).time();
try
{
runnable.run();
}
finally
{
context.stop();
}
}
else
{
runnable.run();
}
};
}

private <T> Collection<Callable<T>> wrapAll( Collection<? extends Callable<T>> collection )
{
ThreadContext ctx = ThreadContext.getContext( false );
Expand All @@ -369,7 +307,7 @@ private <T> Collection<Callable<T>> wrapAll( Collection<? extends Callable<T>> c
setContext( extractedContext );
Logger logger = LoggerFactory.getLogger( getClass() );
logger.debug( "Using ThreadContext: {} (saving: {}) in {}", ctx, old, Thread.currentThread().getName() );
return timeCallable((Callable<T>) () -> {
return (Callable<T>) () -> {
try
{
return callable.call();
Expand All @@ -381,7 +319,7 @@ private <T> Collection<Callable<T>> wrapAll( Collection<? extends Callable<T>> c
clearBridgedContext();
load.decrementAndGet();
}
});
};
} ).collect( Collectors.toList() );
}

Expand All @@ -390,7 +328,7 @@ private Runnable wrapRunnable( Runnable runnable )
ThreadContext ctx = ThreadContext.getContext( false );
Map<String, Object> extractedContext = extractContext();
load.incrementAndGet();
return timeRunnable(()->{
return ()->{
ThreadContext old = ThreadContext.setContext( ctx );
setContext( extractedContext );
Logger logger = LoggerFactory.getLogger( getClass() );
Expand All @@ -407,15 +345,15 @@ private Runnable wrapRunnable( Runnable runnable )
clearBridgedContext();
load.decrementAndGet();
}
});
};
}

private <T> Callable<T> wrapCallable( Callable<T> callable )
{
ThreadContext ctx = ThreadContext.getContext( false );
Map<String, Object> extractedContext = extractContext();
load.incrementAndGet();
return timeCallable((Callable<T>) ()->{
return (Callable<T>) ()->{
ThreadContext old = ThreadContext.setContext( ctx );
setContext( extractedContext );
Logger logger = LoggerFactory.getLogger( getClass() );
Expand All @@ -431,7 +369,7 @@ private <T> Callable<T> wrapCallable( Callable<T> callable )
clearBridgedContext();
load.decrementAndGet();
}
});
};
}

private void clearBridgedContext()
Expand Down
44 changes: 4 additions & 40 deletions src/main/java/org/commonjava/cdi/util/weft/WeftPoolBoy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@
package org.commonjava.cdi.util.weft;

import org.commonjava.cdi.util.weft.config.WeftConfig;
import org.commonjava.o11yphant.metrics.api.Gauge;
import org.commonjava.o11yphant.metrics.api.MetricRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
Expand All @@ -36,11 +33,9 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.commonjava.cdi.util.weft.config.DefaultWeftConfig.DEFAULT_MAX_LOAD_FACTOR;
import static org.commonjava.cdi.util.weft.config.DefaultWeftConfig.DEFAULT_PRIORITY;
import static org.commonjava.cdi.util.weft.config.DefaultWeftConfig.DEFAULT_THREADS;
import static org.commonjava.o11yphant.metrics.util.NameUtils.name;

@ApplicationScoped
public class WeftPoolBoy
Expand All @@ -54,29 +49,14 @@ public class WeftPoolBoy
@Inject
private WeftConfig config;

@Inject
private Instance<MetricRegistry> metricRegistryInstance;

private MetricRegistry metricRegistry;

@Inject
private Instance<ThreadContextualizer> contextualizers;

protected WeftPoolBoy(){}

public WeftPoolBoy( WeftConfig config, MetricRegistry registry )
public WeftPoolBoy( WeftConfig config )
{
this.config = config;
this.metricRegistry = registry;
}

@PostConstruct
public void init()
{
if ( !metricRegistryInstance.isUnsatisfied() )
{
this.metricRegistry = metricRegistryInstance.get();
}
}

public WeftExecutorService getPool( final String key )
Expand Down Expand Up @@ -140,7 +120,7 @@ public synchronized WeftExecutorService getPool( final ExecutorConfig ec, final
{
int threadCount = ec.threads();
String name = ec.named();
if ( isBlank( name ) )
if ( name == null || name.trim().isEmpty() )
{
name = DUMMY_NAME;
}
Expand Down Expand Up @@ -215,34 +195,18 @@ else if ( threadCount > 0 )
svc = (ThreadPoolExecutor) Executors.newCachedThreadPool( fac );
}

String metricPrefix = name( config.getNodePrefix(), "weft.ThreadPoolExecutor", name );

service = new PoolWeftExecutorService( name, svc, threadCount, maxLoadFactor, loadSensitive, metricRegistry,
metricPrefix, contextualizers );
service = new PoolWeftExecutorService( name, svc, threadCount, maxLoadFactor, loadSensitive,
contextualizers );

// TODO: Wrapper ThreadPoolExecutor that wraps Runnables to store/copy MDC when it gets created/started.

addPool( service );
registerMetrics( metricPrefix, service );
}

return service;
}


private void registerMetrics( String prefix, WeftExecutorService pool )
{
if ( metricRegistry != null )
{
metricRegistry.register( name( prefix, "corePoolSize" ), (Gauge<Integer>) () -> pool.getCorePoolSize() );
metricRegistry.register( name( prefix, "activeThreads" ), (Gauge<Integer>) () -> pool.getActiveCount() );
metricRegistry.register( name( prefix, "loadFactor" ), (Gauge<Double>) () -> pool.getLoadFactor() );
metricRegistry.register( name( prefix, "currentLoad" ), (Gauge<Long>) () -> pool.getCurrentLoad() );

metricRegistry.registerHealthCheck( name( prefix, pool.getName() ), new WeftPoolHealthCheck( pool ) );
}
}

public Map<String, WeftExecutorService> getPools()
{
Map<String, WeftExecutorService> result = new HashMap<>( pools );
Expand Down
Loading