Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 73 additions & 14 deletions src/main/java/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,27 @@
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.text.MessageFormat;
import java.util.AbstractMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public final class Main {

private static void createAndStartGrpServer(int port) throws Exception {
ServerBuilder.forPort(port)
.addService(new MyTestServiceGrpc.MyTestServiceImplBase() {
@Override public void myTest(MyTestProto.Request request, StreamObserver<MyTestProto.Response> so) {
@Override
public void myTest(MyTestProto.Request request, StreamObserver<MyTestProto.Response> so) {
if (request.getValue() == 0) {
so.onNext(MyTestProto.Response.getDefaultInstance());
so.onCompleted();
Expand All @@ -36,18 +43,28 @@ private static void createAndStartGrpServer(int port) throws Exception {
.start();
}

private static ManagedChannel buildChannel(String host, int port, boolean scheduledService) {
private static EventLoopGroup createEventLoopGroup(String suffix) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("io-event-loop-%s").setDaemon(true).build();
EventLoopGroup eventLoopGroup;
Class<? extends Channel> channelType;
new ThreadFactoryBuilder().setNameFormat("io-event-loop-" + suffix + "-%s").setDaemon(true)
.build();

if (Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(1, threadFactory);
channelType = EpollSocketChannel.class;
return new EpollEventLoopGroup(1, threadFactory);
} else {
eventLoopGroup = new KQueueEventLoopGroup(1, threadFactory);
return new KQueueEventLoopGroup(1, threadFactory);
}
}

private static ManagedChannel buildChannel(String host, int port, boolean scheduledService,
EventLoopGroup eventLoopGroup) {

Class<? extends Channel> channelType;
if (eventLoopGroup instanceof EpollEventLoopGroup) {
channelType = EpollSocketChannel.class;
} else if (eventLoopGroup instanceof KQueueEventLoopGroup) {
channelType = KQueueSocketChannel.class;
} else {
throw new RuntimeException("unknown EventLoopGroup type");
}

System.out.println("Using " + channelType);
Expand All @@ -73,15 +90,29 @@ public static void main(String[] args) throws Exception {

createAndStartGrpServer(port);

ScheduledExecutorService metricsScheduler = Executors.newSingleThreadScheduledExecutor();

EventLoopGroup eventLoopGroupWithSchedulingLoops = createEventLoopGroup("scheduling");
final MyTestServiceGrpc.MyTestServiceBlockingStub stub = MyTestServiceGrpc.newBlockingStub(
buildChannel(host, port, false));
buildChannel(host, port, false, eventLoopGroupWithSchedulingLoops));

EventLoopGroup eventLoopGroupWithoutSchedulingLoops = createEventLoopGroup("no-scheduling");
final MyTestServiceGrpc.MyTestServiceBlockingStub stubWithScheduledExecutor = MyTestServiceGrpc.newBlockingStub(
buildChannel(host, port, true));
buildChannel(host, port, true, eventLoopGroupWithoutSchedulingLoops));

AtomicInteger pendingTasksWithSchedulingLoops = new AtomicInteger();
AtomicInteger pendingTasksWithoutSchedulingLoops = new AtomicInteger();

metricsScheduler.scheduleAtFixedRate(() -> countPendingTasks(List.of(
new AbstractMap.SimpleEntry<>(pendingTasksWithSchedulingLoops, eventLoopGroupWithSchedulingLoops),
new AbstractMap.SimpleEntry<>(pendingTasksWithoutSchedulingLoops, eventLoopGroupWithoutSchedulingLoops)))
, 10, 10,
TimeUnit.MILLISECONDS);


// Warm-up calls
MyTestProto.Request warmUpRequest = MyTestProto.Request.newBuilder().build();
for(int i = 0; i < 5; i++) {
for (int i = 0; i < 5; i++) {
stub.myTest(warmUpRequest);
stubWithScheduledExecutor.myTest(warmUpRequest);
}
Expand All @@ -92,7 +123,9 @@ public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("main-grpc-request-%d").build());

for(MyTestServiceGrpc.MyTestServiceBlockingStub currentStub : List.of(stub, stubWithScheduledExecutor, stub, stubWithScheduledExecutor, stub, stubWithScheduledExecutor, stub)) {
for (MyTestServiceGrpc.MyTestServiceBlockingStub currentStub : List.of(stub,
stubWithScheduledExecutor, stub, stubWithScheduledExecutor, stub, stubWithScheduledExecutor,
stub, stubWithScheduledExecutor)) {
long totalTimePerExperiments = 0;
for (int i = 0; i < 5; i++) {
CountDownLatch startLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -133,12 +166,38 @@ public static void main(String[] args) throws Exception {
finishLatch.await();

totalTimePerExperiments += totalTime.get();
System.out.println(MessageFormat.format("withScheduledService={0}, testNum={1}, maxTime={2}, minTime={3}, totalTime={4}", currentStub == stubWithScheduledExecutor, i, maxTime, minTime, totalTime));
System.out.println(MessageFormat.format(
"withScheduledService={0}, testNum={1}, maxTime={2}, minTime={3}, totalTime={4}",
currentStub == stubWithScheduledExecutor, i, maxTime, minTime, totalTime));
Thread.sleep(100);
}
System.out.println(MessageFormat.format("withScheduledService={0}, totalTimePerExperiments={1}", currentStub == stubWithScheduledExecutor, totalTimePerExperiments));
System.out.println(
MessageFormat.format("withScheduledService={0}, totalTimePerExperiments={1}",
currentStub == stubWithScheduledExecutor, totalTimePerExperiments));
Thread.sleep(1000);
}

System.out.println(
MessageFormat.format("pending tasks withoutScheduledService={0}, withScheduledService={1}",
pendingTasksWithSchedulingLoops.get(), pendingTasksWithoutSchedulingLoops.get()));

System.exit(0);
}

private static void countPendingTasks(
List<AbstractMap.Entry<AtomicInteger, EventLoopGroup>> pairs) {
for (AbstractMap.Entry<AtomicInteger, EventLoopGroup> pair : pairs) {
pair.getKey().getAndAccumulate(getPendingTasks(pair.getValue()), Math::max);
}
}

private static int getPendingTasks(EventLoopGroup loopGroup) {
int allPendingTasks = 0;
for (EventExecutor eventExecutor : loopGroup) {
if (eventExecutor instanceof SingleThreadEventExecutor executor) {
allPendingTasks += executor.pendingTasks();
}
}
return allPendingTasks;
}
}