import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TSimpleServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; import org.glassfish.grizzly.filterchain.FilterChainBuilder; import org.glassfish.grizzly.filterchain.TransportFilter; import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder; import org.glassfish.grizzly.strategies.LeaderFollowerNIOStrategy; import org.glassfish.grizzly.strategies.SameThreadIOStrategy; import org.glassfish.grizzly.thrift.ThriftFrameFilter; import org.glassfish.grizzly.thrift.ThriftServerFilter; import org.glassfish.grizzly.utils.LinkedTransferQueue; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import se.cgbystrom.netty.thrift.ThriftPipelineFactory; import se.cgbystrom.netty.thrift.ThriftServerHandler; import tutorial.Calculator; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author Bongjae Chang */ public class ThriftServerBenchmark { private final String host; private final int port; private final String serverType; private final int maxWorkerPoolSize; private final int minWorkerPoolSize; private ThriftServer thriftServer; // for 3M //private final static int size = 4 * 1024 * 1024; // for 3K //private final static int size = 4 * 1024; // for 300Bytes private final static int size = 4 * 102; private enum ServerType { TSERVER, TTHREADPOOLSERVER, TNONBLOCKINGSERVER, NETTY, GRIZZLY } private ThriftServerBenchmark(Builder builder) { host = builder.host; port = builder.port; serverType = builder.serverType; maxWorkerPoolSize = builder.maxWorkerPoolSize; minWorkerPoolSize = Runtime.getRuntime().availableProcessors() * 2; } public void startBenchmark(TProcessor processor) throws Exception { ThriftServer thriftServer = getThriftServer(serverType); System.out.println("Start the " + thriftServer); thriftServer.open(processor); } public void stopBenchmark() throws Exception { ThriftServer thriftServer = getThriftServer(serverType); thriftServer.close(); } private ThriftServer getThriftServer(String serverType) { ThriftServer thriftServer = this.thriftServer; if (thriftServer != null) return thriftServer; if (serverType == null) { throw new IllegalArgumentException(); } final String serverTypeUpperCase = serverType.toUpperCase(); if (serverTypeUpperCase.equals(ServerType.TSERVER.toString())) { thriftServer = new TThriftServer(); } else if (serverTypeUpperCase.equals(ServerType.TTHREADPOOLSERVER.toString())) { thriftServer = new TThreadPoolThriftServer(); } else if (serverTypeUpperCase.equals(ServerType.TNONBLOCKINGSERVER.toString())) { thriftServer = new TNonblockingThriftServer(); } else if (serverTypeUpperCase.equals(ServerType.NETTY.toString())) { thriftServer = new NettyThriftServer(); } else if (serverTypeUpperCase.equals(ServerType.GRIZZLY.toString())) { thriftServer = new GrizzlyThriftServer(); } else { throw new IllegalArgumentException("unknown server type"); } this.thriftServer = thriftServer; return thriftServer; } private interface ThriftServer { public void open(TProcessor processor) throws Exception; public void close() throws Exception; } private class GrizzlyThriftServer implements ThriftServer { private TCPNIOTransport transport; @SuppressWarnings("unchecked") @Override public void open(TProcessor processor) throws Exception { if (transport == null) { final FilterChainBuilder serverFilterChainBuilder = FilterChainBuilder.stateless(); // for BinaryProtocol //serverFilterChainBuilder.add(new TransportFilter()).add(new ThriftFrameFilter(size)).add(new ThriftServerFilter(processor, new TBinaryProtocol.Factory(), size)); // for CompactProtocol serverFilterChainBuilder.add(new TransportFilter()).add(new ThriftFrameFilter(size)).add(new ThriftServerFilter(processor, new TCompactProtocol.Factory(), size)); transport = TCPNIOTransportBuilder.newInstance().build(); transport.setProcessor(serverFilterChainBuilder.build()); // for LeaderFollower IO Strategy //transport.setIOStrategy(LeaderFollowerNIOStrategy.getInstance()); // for Same thread IO Strategy transport.setIOStrategy(SameThreadIOStrategy.getInstance()); transport.setSelectorRunnersCount(minWorkerPoolSize); java.util.concurrent.ThreadPoolExecutor executor = new java.util.concurrent.ThreadPoolExecutor(minWorkerPoolSize, maxWorkerPoolSize, 60 * 5, TimeUnit.SECONDS, new LinkedTransferQueue()); transport.setWorkerThreadPool(executor); if (host != null) { transport.bind(host, port); } else { transport.bind(port); } transport.start(); } } @Override public void close() throws Exception { if (transport != null) { transport.stop(); transport = null; } } } private class NettyThriftServer implements ThriftServer { private Channel channel; @Override public void open(TProcessor processor) throws Exception { if (channel == null) { ServerBootstrap serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // for Binaryprotocol //final ThriftServerHandler serverHandler = new ThriftServerHandler(processor, new TBinaryProtocol.Factory()); // for CompactProtocol final ThriftServerHandler serverHandler = new ThriftServerHandler(processor, new TCompactProtocol.Factory()); serverHandler.setResponseSize(size); serverBootstrap.setPipelineFactory(new ThriftPipelineFactory(serverHandler)); if (host != null) { channel = serverBootstrap.bind(new InetSocketAddress(host, port)); } else { channel = serverBootstrap.bind(new InetSocketAddress(port)); } } } @Override public void close() throws Exception { if (channel != null) { channel.close().awaitUninterruptibly(); channel = null; } } } private abstract class AbstractThriftServer implements ThriftServer { protected TServer server; protected TServerTransport serverTransport; @Override public void close() throws Exception { if (server != null) { server.stop(); server = null; } if (serverTransport != null) { serverTransport.close(); serverTransport = null; } } } private class TThriftServer extends AbstractThriftServer { @Override public void open(TProcessor processor) throws Exception { if (serverTransport == null) { if (host != null) { serverTransport = new TServerSocket(new InetSocketAddress(host, port)); } else { serverTransport = new TServerSocket(new InetSocketAddress(port)); } } if (server == null) { server = new TSimpleServer(new TServer.Args(serverTransport) .processor(processor) .transportFactory(new TFramedTransport.Factory()) .protocolFactory(new TCompactProtocol.Factory())); server.serve(); } } } private class TThreadPoolThriftServer extends AbstractThriftServer { @Override public void open(TProcessor processor) throws Exception { if (serverTransport == null) { if (host != null) { serverTransport = new TServerSocket(new InetSocketAddress(host, port)); } else { serverTransport = new TServerSocket(new InetSocketAddress(port)); } } if (server == null) { server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport) .minWorkerThreads(minWorkerPoolSize) .maxWorkerThreads(maxWorkerPoolSize) .processor(processor) .transportFactory(new TFramedTransport.Factory()) // for BinaryProtocol //.protocolFactory(new TBinaryProtocol.Factory())); // for CompactProtocol .protocolFactory(new TCompactProtocol.Factory())); server.serve(); } } } private class TNonblockingThriftServer extends AbstractThriftServer { @Override public void open(TProcessor processor) throws Exception { if (serverTransport == null) { if (host != null) { serverTransport = new TNonblockingServerSocket(new InetSocketAddress(host, port)); } else { serverTransport = new TNonblockingServerSocket(new InetSocketAddress(port)); } } if (server == null) { server = new TNonblockingServer(new TNonblockingServer.Args((TNonblockingServerTransport) serverTransport) .processor(processor) .transportFactory(new TFramedTransport.Factory()) .protocolFactory(new TCompactProtocol.Factory())); //.protocolFactory(new TBinaryProtocol.Factory())); server.serve(); } } } public static class Builder { //private String host = "localhost"; private String host = null; private int port = 9090; private int maxWorkerPoolSize = 2048; private String serverType = "grizzly"; public Builder host(String host) { this.host = host; return this; } public Builder port(int port) { this.port = port; return this; } public Builder serverType(String serverType) { this.serverType = serverType; return this; } public Builder maxWorkerPoolSize(int maxWorkerPoolSize) { this.maxWorkerPoolSize = maxWorkerPoolSize; return this; } public ThriftServerBenchmark build() { return new ThriftServerBenchmark(this); } } @SuppressWarnings("unchecked") public static void main(String[] args) { final int port; final String serverType; final int maxWorkerPoolSize; if (args.length != 3) { printUsages(); return; } try { port = Integer.parseInt(args[0]); } catch (NumberFormatException e) { printUsages(); return; } serverType = args[1]; try { maxWorkerPoolSize = Integer.parseInt(args[2]); } catch (NumberFormatException e) { printUsages(); return; } ThriftServerBenchmark test = new ThriftServerBenchmark.Builder() .port(port) .serverType(serverType) .maxWorkerPoolSize(maxWorkerPoolSize) .build(); try { test.startBenchmark(new Calculator.Processor(new CalculatorHandler())); } catch (Exception e) { e.printStackTrace(); } final int infiniteSleep = 1000000000; try { Thread.sleep(infiniteSleep); } catch (InterruptedException ignore) { } /* try { test.stopBenchmark(); } catch (Exception ignore) { } */ } private static void printUsages() { System.out.println("please enter port, server type and max worker pool size.\n"); System.out.println("server type: \"tserver\" or \"tthreadpoolserver\" or \"tnonblockingserver\" or \"netty\" or \"grizzly\""); System.out.println("ex) java -cp . ThriftServerBenchmark 9090 grizzly 2048"); } }