# This patch file was generated by NetBeans IDE # Following Index: paths are relative to: /home/oleksiys/Projects/0000/nfs-rpc # This patch can be applied using context Tools: Patch action on respective folder. # It uses platform neutral UTF-8 encoding and \n newlines. # Above lines and this line are ignored by the patching process. Index: nfs-rpc-common/src/main/java/code/google/nfs/rpc/benchmark/AbstractBenchmarkClient.java --- nfs-rpc-common/src/main/java/code/google/nfs/rpc/benchmark/AbstractBenchmarkClient.java Base (BASE) +++ nfs-rpc-common/src/main/java/code/google/nfs/rpc/benchmark/AbstractBenchmarkClient.java Locally Modified (Based On LOCAL) @@ -127,10 +127,10 @@ clientNums, timeout, codectype, requestSize, barrier, latch, endtime, benchmarkBeginTime); runnables.add(runnable); - Thread thread = new Thread(runnable, "benchmarkclient-" + i); - thread.start(); } + startRunnables(runnables); + latch.await(); // read results & add all @@ -271,4 +271,12 @@ int clientNums, int rpcTimeout, int codecType, int requestSize, CyclicBarrier barrier, CountDownLatch latch, long endTime, long startTime); + protected void startRunnables(List runnables) { + for (int i = 0; i < runnables.size(); i++) { + final ClientRunnable runnable = runnables.get(i); + Thread thread = new Thread(runnable, "benchmarkclient-" + i); + thread.start(); } + } + +} Index: nfs-rpc-grizzly/pom.xml --- nfs-rpc-grizzly/pom.xml Base (BASE) +++ nfs-rpc-grizzly/pom.xml Locally Modified (Based On LOCAL) @@ -46,8 +46,8 @@ org.glassfish.grizzly - grizzly-core - 2.1.2 + grizzly-framework + 2.2-SNAPSHOT com.google.protobuf Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/benchmark/GrizzlyBenchmarkServer.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/benchmark/GrizzlyBenchmarkServer.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/benchmark/GrizzlyBenchmarkServer.java Locally Modified (Based On LOCAL) @@ -1,4 +1,5 @@ package code.google.nfs.rpc.grizzly.benchmark; + /** * nfs-rpc * Apache License @@ -8,6 +9,7 @@ import code.google.nfs.rpc.benchmark.AbstractBenchmarkServer; import code.google.nfs.rpc.grizzly.server.GrizzlyServer; import code.google.nfs.rpc.server.Server; + /** * Grizzly RPC Benchmark Server * @@ -15,12 +17,14 @@ */ public class GrizzlyBenchmarkServer extends AbstractBenchmarkServer { - public static void main(String[] args) throws Exception{ + public static void main(String[] args) throws Exception { +// String[] myArgs = new String[] {"9090", "20", "100"}; new GrizzlyBenchmarkServer().run(args); + System.out.println("press enter to exit"); + System.in.read(); } public Server getServer() { return new GrizzlyServer(); } - } Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/benchmark/GrizzlySimpleBenchmarkClient.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/benchmark/GrizzlySimpleBenchmarkClient.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/benchmark/GrizzlySimpleBenchmarkClient.java Locally Modified (Based On LOCAL) @@ -1,4 +1,5 @@ package code.google.nfs.rpc.grizzly.benchmark; + /** * nfs-rpc * Apache License @@ -6,17 +7,21 @@ * http://code.google.com/p/nfs-rpc (c) 2011 */ import code.google.nfs.rpc.benchmark.AbstractSimpleProcessorBenchmarkClient; +import code.google.nfs.rpc.benchmark.ClientRunnable; import code.google.nfs.rpc.client.ClientFactory; import code.google.nfs.rpc.grizzly.client.GrizzlyClientFactory; +import java.util.List; +import org.glassfish.grizzly.threadpool.GrizzlyExecutorService; +import org.glassfish.grizzly.threadpool.ThreadPoolConfig; + /** * Grizzly Simple Benchmark Client * * @author bluedavy */ -public class GrizzlySimpleBenchmarkClient extends - AbstractSimpleProcessorBenchmarkClient { +public class GrizzlySimpleBenchmarkClient extends AbstractSimpleProcessorBenchmarkClient { - public static void main(String[] args) throws Exception{ + public static void main(String[] args) throws Exception { new GrizzlySimpleBenchmarkClient().run(args); } @@ -24,4 +29,19 @@ return GrizzlyClientFactory.getInstance(); } + @Override + protected void startRunnables(List runnables) { + ThreadPoolConfig tpc = ThreadPoolConfig.defaultConfig() + .copy() + .setPoolName("benchmarkclient") + .setMaxPoolSize(runnables.size()) + .setCorePoolSize(runnables.size()); + + GrizzlyExecutorService executorService = GrizzlyExecutorService.createInstance(tpc); + + for (int i = 0; i < runnables.size(); i++) { + ClientRunnable runnable = runnables.get(i); + executorService.execute(runnable); } + } +} Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClient.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClient.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClient.java Locally Modified (Based On LOCAL) @@ -1,4 +1,5 @@ package code.google.nfs.rpc.grizzly.client; + /** * nfs-rpc * Apache License @@ -10,6 +11,7 @@ import code.google.nfs.rpc.RequestWrapper; import code.google.nfs.rpc.client.AbstractClient; + /** * Grizzly Client * @@ -18,16 +20,12 @@ public class GrizzlyClient extends AbstractClient { private String targetIP; - private int targetPort; - private int connectTimeout; - private Connection connection; - private String clientKey; - public GrizzlyClient(String targetIP,int targetPort,int connectTimeout,Connection connection,String clientKey){ + public GrizzlyClient(String targetIP, int targetPort, int connectTimeout, Connection connection, String clientKey) { this.targetIP = targetIP; this.targetPort = targetPort; this.connectTimeout = connectTimeout; @@ -35,13 +33,13 @@ this.clientKey = clientKey; } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) public void sendRequest(RequestWrapper wrapper, int timeout) throws Exception { + System.out.println("sendRequest connection=" + connection + " tp=" + Thread.currentThread()); connection.write(wrapper, new CompletionHandler() { public void cancelled() { - } public void failed(Throwable throwable) { @@ -50,11 +48,9 @@ } public void completed(Object result) { - } public void updated(Object result) { - } }); } @@ -70,5 +66,4 @@ public int getConnectTimeout() { return connectTimeout; } - } Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClientFactory.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClientFactory.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClientFactory.java Locally Modified (Based On LOCAL) @@ -43,7 +43,7 @@ GrizzlyClientHandler handler = new GrizzlyClientHandler(); FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless(); filterChainBuilder.add(new TransportFilter()); - filterChainBuilder.add(new GrizzlyProtocolFilter()); + filterChainBuilder.add(new GrizzlyProtocolFilter(false)); filterChainBuilder.add(handler); final TCPNIOTransport transport = TCPNIOTransportBuilder.newInstance().build(); transport.setIOStrategy(SameThreadIOStrategy.getInstance()); Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClientHandler.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClientHandler.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/client/GrizzlyClientHandler.java Locally Modified (Based On LOCAL) @@ -1,4 +1,5 @@ package code.google.nfs.rpc.grizzly.client; + /** * nfs-rpc * Apache License @@ -13,6 +14,8 @@ import code.google.nfs.rpc.ResponseWrapper; import code.google.nfs.rpc.client.Client; +import java.util.List; + /** * Grizzly Client Handler * @@ -22,18 +25,34 @@ private Client client; - public void setClient(Client client){ + public void setClient(Client client) { this.client = client; } public NextAction handleRead(FilterChainContext ctx) throws IOException { + final Object message = ctx.getMessage(); + + IllegalStateException error = null; + try { - client.putResponse((ResponseWrapper) ctx.getMessage()); + if (message instanceof List) { + @SuppressWarnings("unchecked") + List responses = (List) message; + client.putResponses(responses); + } else if (message instanceof ResponseWrapper) { + ResponseWrapper response = (ResponseWrapper) message; + client.putResponse(response); + } else { + error = new IllegalStateException("receive message error,only support List || ResponseWrapper"); } - catch (Exception e) { - e.printStackTrace(); + } catch (Exception e) { + error = new IllegalStateException(e); } - return ctx.getStopAction(); + + if (error != null) { + throw error; } + return ctx.getStopAction(); } +} Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/serialize/GrizzlyByteBufferWrapper.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/serialize/GrizzlyByteBufferWrapper.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/serialize/GrizzlyByteBufferWrapper.java Locally Modified (Based On LOCAL) @@ -1,4 +1,5 @@ package code.google.nfs.rpc.grizzly.serialize; + /** * nfs-rpc * Apache License @@ -10,6 +11,7 @@ import org.glassfish.grizzly.memory.MemoryManager; import code.google.nfs.rpc.protocol.ByteBufferWrapper; + /** * Grizzly ByteBuffer Wrapper * @@ -18,26 +20,27 @@ public class GrizzlyByteBufferWrapper implements ByteBufferWrapper { private Buffer buffer; - private FilterChainContext ctx; - public GrizzlyByteBufferWrapper(FilterChainContext ctx){ + public GrizzlyByteBufferWrapper(FilterChainContext ctx) { this.ctx = ctx; } - public GrizzlyByteBufferWrapper(Buffer buffer){ + public GrizzlyByteBufferWrapper(Buffer buffer) { this.buffer = buffer; } public ByteBufferWrapper get(int capacity) { @SuppressWarnings("unchecked") MemoryManager memoryManager = ctx.getConnection().getTransport().getMemoryManager(); - buffer = memoryManager.allocate(capacity); + buffer = memoryManager.allocate(capacity * 2); buffer.allowBufferDispose(true); + +// System.err.println("allocate " + capacity); return this; } - public Buffer getBuffer(){ + public Buffer getBuffer() { return buffer; } @@ -58,11 +61,11 @@ } public int readerIndex() { - return 0; + return buffer.position(); } public void setReaderIndex(int readerIndex) { - // Don't need,so do nothing + buffer.position(readerIndex); } public void writeByte(byte data) { @@ -80,5 +83,4 @@ public void writeInt(int data) { buffer.putInt(data); } - } Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/serialize/GrizzlyProtocolFilter.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/serialize/GrizzlyProtocolFilter.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/serialize/GrizzlyProtocolFilter.java Locally Modified (Based On LOCAL) @@ -1,4 +1,5 @@ package code.google.nfs.rpc.grizzly.serialize; + /** * nfs-rpc * Apache License @@ -12,49 +13,69 @@ import org.glassfish.grizzly.Buffer; import org.glassfish.grizzly.filterchain.BaseFilter; import org.glassfish.grizzly.filterchain.FilterChainContext; +import org.glassfish.grizzly.filterchain.FilterChainEvent; import org.glassfish.grizzly.filterchain.NextAction; -import code.google.nfs.rpc.RequestWrapper; -import code.google.nfs.rpc.ResponseWrapper; import code.google.nfs.rpc.protocol.ProtocolUtils; +import java.util.ArrayList; +import java.util.List; +import org.glassfish.grizzly.Grizzly; +import org.glassfish.grizzly.attributes.Attribute; +import org.glassfish.grizzly.filterchain.TransportFilter; +import org.glassfish.grizzly.memory.CompositeBuffer; /** * Grizzly Protocol Decoder * * @author bluedavy */ -public class GrizzlyProtocolFilter extends BaseFilter{ +public class GrizzlyProtocolFilter extends BaseFilter { private static final Log LOGGER = LogFactory.getLog(GrizzlyProtocolFilter.class); + private static final Attribute OUTPUT_BUFFER_ATTR = + Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute( + "GrizzlyProtocolFilter.outputBuffer"); + + private final boolean isBufferingEnabled; + + public GrizzlyProtocolFilter(boolean isBufferingEnabled) { + this.isBufferingEnabled = isBufferingEnabled; + } + // decode object public NextAction handleRead(FilterChainContext ctx) throws IOException { - final Buffer buffer = ctx.getMessage(); - final int bufferLen = buffer.remaining(); + final Object message = ctx.getMessage(); + if (message instanceof IncompleteBufferHolder) { + return ctx.getStopAction(((IncompleteBufferHolder) message).buffer); + } + + final Buffer buffer = (Buffer) message; + +// final int bufferLen = buffer.remaining(); Object errorReturnObject = new Object(); GrizzlyByteBufferWrapper wrapper = new GrizzlyByteBufferWrapper(buffer); + try { - Object object = ProtocolUtils.decode(wrapper, errorReturnObject); - if(object == errorReturnObject){ + final List list = new ArrayList(); + Object object; + while ((object = ProtocolUtils.decode(wrapper, errorReturnObject)) + != errorReturnObject) { + list.add(object); + } + + if (list.isEmpty()) { return ctx.getStopAction(buffer); + } else { + final Buffer remainder = buffer.hasRemaining() + ? buffer.split(buffer.position()) : null; + buffer.dispose(); + + ctx.setMessage(list); + return ctx.getInvokeAction(new IncompleteBufferHolder(remainder)); } - else{ - ctx.setMessage(object); - int completeMessageLen = 0; - if(object instanceof RequestWrapper){ - completeMessageLen = ((RequestWrapper)object).getMessageLen(); - } - else{ - completeMessageLen = ((ResponseWrapper)object).getMessageLen(); - } - final Buffer remainder = bufferLen > completeMessageLen ? - buffer.split(completeMessageLen) : null; - buffer.tryDispose(); - return ctx.getInvokeAction(remainder); - } - } - catch (Exception e) { - LOGGER.error("decode message error",e); + } catch (Exception e) { + LOGGER.error("decode message error", e); throw new IOException(e); } } @@ -64,12 +85,49 @@ GrizzlyByteBufferWrapper wrapper = new GrizzlyByteBufferWrapper(ctx); try { ProtocolUtils.encode(ctx.getMessage(), wrapper); - ctx.setMessage(wrapper.getBuffer().flip()); + final Buffer buffer = wrapper.getBuffer(); + buffer.trim(); + + if (isBufferingEnabled) { + CompositeBuffer outputBuffer = OUTPUT_BUFFER_ATTR.get(ctx); + if (outputBuffer == null) { + outputBuffer = CompositeBuffer.newBuffer(ctx.getMemoryManager()); + outputBuffer.allowBufferDispose(true); + outputBuffer.allowInternalBuffersDispose(true); + OUTPUT_BUFFER_ATTR.set(ctx, outputBuffer); } - catch (Exception e) { - throw new IOException("encode message to byte error",e); - } + outputBuffer.append(buffer); + + return ctx.getStopAction(); + } else { + ctx.setMessage(buffer); return ctx.getInvokeAction(); } + } catch (Exception e) { + throw new IOException("encode message to byte error", e); } + } + + @Override + public NextAction handleEvent(final FilterChainContext ctx, + final FilterChainEvent event) throws IOException { + + if (event.type() == TransportFilter.FlushEvent.TYPE) { + final CompositeBuffer outputBuffer = OUTPUT_BUFFER_ATTR.remove(ctx); + ctx.write(outputBuffer); + } + + return ctx.getStopAction(); + } + + + private static class IncompleteBufferHolder { + + public IncompleteBufferHolder(Buffer buffer) { + this.buffer = buffer; + } + + private Buffer buffer; + } +} Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/server/GrizzlyServer.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/server/GrizzlyServer.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/server/GrizzlyServer.java Locally Modified (Based On LOCAL) @@ -1,11 +1,11 @@ package code.google.nfs.rpc.grizzly.server; + /** * nfs-rpc * Apache License * * http://code.google.com/p/nfs-rpc (c) 2011 */ -import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -21,6 +21,7 @@ import code.google.nfs.rpc.ProtocolFactory; import code.google.nfs.rpc.grizzly.serialize.GrizzlyProtocolFilter; import code.google.nfs.rpc.server.Server; + /** * Grizzly Server * @@ -29,46 +30,29 @@ public class GrizzlyServer implements Server { private static final Log LOGGER = LogFactory.getLog(GrizzlyServer.class); - private TCPNIOTransport transport = null; public void start(int listenPort, ExecutorService threadpool) throws Exception { FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless(); filterChainBuilder.add(new TransportFilter()); - filterChainBuilder.add(new GrizzlyProtocolFilter()); + filterChainBuilder.add(new GrizzlyProtocolFilter(true)); filterChainBuilder.add(new GrizzlyServerHandler()); TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance(); ThreadPoolConfig config = builder.getWorkerThreadPoolConfig(); - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)threadpool; + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) threadpool; config.setCorePoolSize(threadPoolExecutor.getCorePoolSize()).setMaxPoolSize(threadPoolExecutor.getMaximumPoolSize()).setPoolName("GRIZZLY-SERVER"); transport = builder.build(); transport.setIOStrategy(WorkerThreadIOStrategy.getInstance()); + transport.setProcessor(filterChainBuilder.build()); transport.bind(listenPort); - final TCPNIOTransport t = transport; - new Thread(new Runnable() { - public void run() { - try { - t.start(); + + transport.start(); + LOGGER.warn("server started,listen at: " + listenPort); } - catch (IOException e) { - e.printStackTrace(); - } - while(true){ - try { - Thread.sleep(Long.MAX_VALUE); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } - }).start(); - LOGGER.warn("server started,listen at: "+listenPort); - } public void stop() throws Exception { - if(transport != null){ + if (transport != null) { transport.stop(); LOGGER.warn("server stoped!"); } @@ -77,5 +61,4 @@ public void registerProcessor(Integer protocolType, String serviceName, Object serviceInstance) { ProtocolFactory.getServerHandler(protocolType).registerProcessor(serviceName, serviceInstance); } - } Index: nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/server/GrizzlyServerHandler.java --- nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/server/GrizzlyServerHandler.java Base (BASE) +++ nfs-rpc-grizzly/src/main/java/code/google/nfs/rpc/grizzly/server/GrizzlyServerHandler.java Locally Modified (Based On LOCAL) @@ -44,6 +44,7 @@ else{ handleOneRequest(ctx, (RequestWrapper)message); } + ctx.flush(null); return ctx.getStopAction(); }